diff --git a/PKG-INFO b/PKG-INFO
index 41a768eb..ed3ea4e0 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,10 +1,10 @@
Metadata-Version: 1.0
Name: swh.storage
-Version: 0.0.37
+Version: 0.0.38
Summary: Software Heritage storage manager
Home-page: https://forge.softwareheritage.org/diffusion/DSTO/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Description: UNKNOWN
Platform: UNKNOWN
diff --git a/debian/control b/debian/control
index 2a9e2bd6..8c14e79d 100644
--- a/debian/control
+++ b/debian/control
@@ -1,22 +1,24 @@
Source: swh-storage
Maintainer: Software Heritage developers <swh-devel@inria.fr>
Section: python
Priority: optional
Build-Depends: debhelper (>= 9),
dh-python,
python3-all,
python3-dateutil,
python3-flask,
python3-nose,
python3-psycopg2,
python3-requests,
python3-setuptools,
python3-swh.core (>= 0.0.17~),
- python3-vcversioner
+ python3-vcversioner,
+ python3-swh.scheduler,
+ python3-click
Standards-Version: 3.9.6
-Homepage: https://forge.softwareheritage.org/diffusion/DCORE/
+Homepage: https://forge.softwareheritage.org/diffusion/DSTO/
Package: python3-swh.storage
Architecture: all
Depends: python3-swh.core (>= 0.0.17~), ${misc:Depends}, ${python3:Depends}
Description: Software Heritage storage utilities
diff --git a/doc/archiver-blueprint.md b/docs/archiver-blueprint.md
similarity index 100%
rename from doc/archiver-blueprint.md
rename to docs/archiver-blueprint.md
diff --git a/requirements.txt b/requirements.txt
index 4f10697e..faeaec3b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,12 +1,15 @@
dateutil
psycopg2
vcversioner
# remote storage API client
requests
# remote storage API server
flask
# Internal dependencies
swh.core >= 0.0.17
+
+click
+swh.scheduler
diff --git a/setup.py b/setup.py
index 292e7c22..145e3590 100644
--- a/setup.py
+++ b/setup.py
@@ -1,34 +1,41 @@
#!/usr/bin/env python3
from setuptools import setup
def parse_requirements():
requirements = []
with open('requirements.txt') as f:
for line in f.readlines():
line = line.strip()
if not line or line.startswith('#'):
continue
requirements.append(line)
return requirements
setup(
name='swh.storage',
description='Software Heritage storage manager',
author='Software Heritage developers',
author_email='swh-devel@inria.fr',
url='https://forge.softwareheritage.org/diffusion/DSTO/',
- packages=['swh.storage', 'swh.storage.api', 'swh.storage.tests'],
+ packages=[
+ 'swh.storage',
+ 'swh.storage.archiver',
+ 'swh.storage.api',
+ 'swh.storage.objstorage',
+ 'swh.storage.objstorage.api',
+ 'swh.storage.tests',
+ ],
scripts=[
'bin/swh-objstorage-add-dir',
'bin/swh-objstorage-fsck',
'bin/swh-storage-add-dir',
],
install_requires=parse_requirements(),
setup_requires=['vcversioner'],
vcversioner={},
include_package_data=True,
)
diff --git a/sql/swh-schema.sql b/sql/swh-schema.sql
index b7a8307a..1704d680 100644
--- a/sql/swh-schema.sql
+++ b/sql/swh-schema.sql
@@ -1,428 +1,454 @@
---
--- Software Heritage Data Model
---
-- drop schema if exists swh cascade;
-- create schema swh;
-- set search_path to swh;
create table dbversion
(
version int primary key,
release timestamptz,
description text
);
insert into dbversion(version, release, description)
- values(68, now(), 'Work In Progress');
+ values(69, now(), 'Work In Progress');
-- a SHA1 checksum (not necessarily originating from Git)
create domain sha1 as bytea check (length(value) = 20);
-- a Git object ID, i.e., a SHA1 checksum
create domain sha1_git as bytea check (length(value) = 20);
-- a SHA256 checksum
create domain sha256 as bytea check (length(value) = 32);
-- UNIX path (absolute, relative, individual path component, etc.)
create domain unix_path as bytea;
-- a set of UNIX-like access permissions, as manipulated by, e.g., chmod
create domain file_perms as int;
create type content_status as enum ('absent', 'visible', 'hidden');
-- Checksums about actual file content. Note that the content itself is not
-- stored in the DB, but on external (key-value) storage. A single checksum is
-- used as key there, but the other can be used to verify that we do not inject
-- content collisions not knowingly.
create table content
(
sha1 sha1 primary key,
sha1_git sha1_git not null,
sha256 sha256 not null,
length bigint not null,
ctime timestamptz not null default now(),
-- creation time, i.e. time of (first) injection into the storage
status content_status not null default 'visible',
object_id bigserial
);
create unique index on content(sha1_git);
create unique index on content(sha256);
create index on content(ctime); -- TODO use a BRIN index here (postgres >= 9.5)
-- Entities constitute a typed hierarchy of organization, hosting
-- facilities, groups, people and software projects.
--
-- Examples of entities: Software Heritage, Debian, GNU, GitHub,
-- Apache, The Linux Foundation, the Debian Python Modules Team, the
-- torvalds GitHub user, the torvalds/linux GitHub project.
--
-- The data model is hierarchical (via the parent attribute) and might
-- store sub-branches of existing entities. The key feature of an
-- entity is might be *listed* (if it is available in listable_entity)
-- to retrieve information about its content, i.e: sub-entities,
-- projects, origins.
-- Types of entities.
--
-- - organization: a root entity, usually backed by a non-profit, a
-- company, or another kind of "association". (examples: Software
-- Heritage, Debian, GNU, GitHub)
--
-- - group_of_entities: used for hierarchies, doesn't need to have a
-- concrete existence. (examples: GNU hosting facilities, Debian
-- hosting facilities, GitHub users, ...)
--
-- - hosting: a hosting facility, can usually be listed to generate
-- other data. (examples: GitHub git hosting, alioth.debian.org,
-- snapshot.debian.org)
--
-- - group_of_persons: an entity representing a group of
-- persons. (examples: a GitHub organization, a Debian team)
--
-- - person: an entity representing a person. (examples:
-- a GitHub user, a Debian developer)
--
-- - project: an entity representing a software project. (examples: a
-- GitHub project, Apache httpd, a Debian source package, ...)
create type entity_type as enum (
'organization',
'group_of_entities',
'hosting',
'group_of_persons',
'person',
'project'
);
-- The history of entities. Allows us to keep historical metadata
-- about entities. The temporal invariant is the uuid. Root
-- organization uuids are manually generated (and available in
-- swh-data.sql).
--
-- For generated entities (generated = true), we can provide
-- generation_metadata to allow listers to retrieve the uuids of previous
-- iterations of the entity.
--
-- Inactive entities that have been active in the past (active =
-- false) should register the timestamp at which we saw them
-- deactivate, in a new entry of entity_history.
create table entity_history
(
id bigserial primary key,
uuid uuid,
parent uuid, -- should reference entity_history(uuid)
name text not null,
type entity_type not null,
description text,
homepage text,
active boolean not null, -- whether the entity was seen on the last listing
generated boolean not null, -- whether this entity has been generated by a lister
lister_metadata jsonb, -- lister-specific metadata, used for queries
metadata jsonb,
validity timestamptz[] -- timestamps at which we have seen this entity
);
create index on entity_history(uuid);
create index on entity_history(name);
-- The entity table provides a view of the latest information on a
-- given entity. It is updated via a trigger on entity_history.
create table entity
(
uuid uuid primary key,
parent uuid references entity(uuid) deferrable initially deferred,
name text not null,
type entity_type not null,
description text,
homepage text,
active boolean not null, -- whether the entity was seen on the last listing
generated boolean not null, -- whether this entity has been generated by a lister
lister_metadata jsonb, -- lister-specific metadata, used for queries
metadata jsonb,
last_seen timestamptz, -- last listing time or disappearance time for active=false
last_id bigint references entity_history(id) -- last listing id
);
create index on entity(name);
create index on entity using gin(lister_metadata jsonb_path_ops);
-- Register the equivalence between two entities. Allows sideways
-- navigation in the entity table
create table entity_equivalence
(
entity1 uuid references entity(uuid),
entity2 uuid references entity(uuid),
primary key (entity1, entity2),
constraint order_entities check (entity1 < entity2)
);
-- Register a lister for a specific entity.
create table listable_entity
(
uuid uuid references entity(uuid) primary key,
enabled boolean not null default true, -- do we list this entity automatically?
list_engine text, -- crawler to be used to list entity's content
list_url text, -- root URL to start the listing
list_params jsonb, -- org-specific listing parameter
latest_list timestamptz -- last time the entity's content has been listed
);
-- Log of all entity listings (i.e., entity crawling) that have been
-- done in the past, or are still ongoing.
create table list_history
(
id bigserial primary key,
entity uuid references listable_entity(uuid),
date timestamptz not null,
status boolean, -- true if and only if the listing has been successful
result jsonb, -- more detailed return value, depending on status
stdout text,
stderr text,
duration interval -- fetch duration of NULL if still ongoing
);
-- An origin is a place, identified by an URL, where software can be found. We
-- support different kinds of origins, e.g., git and other VCS repositories,
-- web pages that list tarballs URLs (e.g., http://www.kernel.org), indirect
-- tarball URLs (e.g., http://www.example.org/latest.tar.gz), etc. The key
-- feature of an origin is that it can be *fetched* (wget, git clone, svn
-- checkout, etc.) to retrieve all the contained software.
create table origin
(
id bigserial primary key,
type text, -- TODO use an enum here (?)
url text not null,
lister uuid references listable_entity(uuid),
project uuid references entity(uuid)
);
create index on origin(type, url);
-- Content we have seen but skipped for some reason. This table is
-- separate from the content table as we might not have the sha1
-- checksum of that data (for instance when we inject git
-- repositories, objects that are too big will be skipped here, and we
-- will only know their sha1_git). 'reason' contains the reason the
-- content was skipped. origin is a nullable column allowing to find
-- out which origin contains that skipped content.
create table skipped_content
(
sha1 sha1,
sha1_git sha1_git,
sha256 sha256,
length bigint not null,
ctime timestamptz not null default now(),
status content_status not null default 'absent',
reason text not null,
origin bigint references origin(id),
object_id bigserial,
unique (sha1, sha1_git, sha256)
);
-- those indexes support multiple NULL values.
create unique index on skipped_content(sha1);
create unique index on skipped_content(sha1_git);
create unique index on skipped_content(sha256);
-- Log of all origin fetches (i.e., origin crawling) that have been done in the
-- past, or are still ongoing. Similar to list_history, but for origins.
create table fetch_history
(
id bigserial primary key,
origin bigint references origin(id),
date timestamptz not null,
status boolean, -- true if and only if the fetch has been successful
result jsonb, -- more detailed returned values, times, etc...
stdout text,
stderr text, -- null when status is true, filled otherwise
duration interval -- fetch duration of NULL if still ongoing
);
-- A file-system directory. A directory is a list of directory entries (see
-- tables: directory_entry_{dir,file}).
--
-- To list the contents of a directory:
-- 1. list the contained directory_entry_dir using array dir_entries
-- 2. list the contained directory_entry_file using array file_entries
-- 3. list the contained directory_entry_rev using array rev_entries
-- 4. UNION
--
-- Synonyms/mappings:
-- * git: tree
create table directory
(
id sha1_git primary key,
dir_entries bigint[], -- sub-directories, reference directory_entry_dir
file_entries bigint[], -- contained files, reference directory_entry_file
rev_entries bigint[], -- mounted revisions, reference directory_entry_rev
object_id bigserial -- short object identifier
);
create index on directory using gin (dir_entries);
create index on directory using gin (file_entries);
create index on directory using gin (rev_entries);
-- A directory entry pointing to a sub-directory.
create table directory_entry_dir
(
id bigserial primary key,
target sha1_git, -- id of target directory
name unix_path, -- path name, relative to containing dir
perms file_perms -- unix-like permissions
);
create unique index on directory_entry_dir(target, name, perms);
-- A directory entry pointing to a file.
create table directory_entry_file
(
id bigserial primary key,
target sha1_git, -- id of target file
name unix_path, -- path name, relative to containing dir
perms file_perms -- unix-like permissions
);
create unique index on directory_entry_file(target, name, perms);
-- A directory entry pointing to a revision.
create table directory_entry_rev
(
id bigserial primary key,
target sha1_git, -- id of target revision
name unix_path, -- path name, relative to containing dir
perms file_perms -- unix-like permissions
);
create unique index on directory_entry_rev(target, name, perms);
create table person
(
id bigserial primary key,
fullname bytea not null, -- freeform specification; what is actually used in the checksums
-- will usually be of the form 'name <email>'
name bytea, -- advisory: not null if we managed to parse a name
email bytea -- advisory: not null if we managed to parse an email
);
create unique index on person(fullname);
create index on person(name);
create index on person(email);
create type revision_type as enum ('git', 'tar', 'dsc', 'svn');
-- the data object types stored in our data model
create type object_type as enum ('content', 'directory', 'revision', 'release');
-- A snapshot of a software project at a specific point in time.
--
-- Synonyms/mappings:
-- * git / subversion / etc: commit
-- * tarball: a specific tarball
--
-- Revisions are organized as DAGs. Each revision points to 0, 1, or more (in
-- case of merges) parent revisions. Each revision points to a directory, i.e.,
-- a file-system tree containing files and directories.
create table revision
(
id sha1_git primary key,
date timestamptz,
date_offset smallint,
date_neg_utc_offset boolean,
committer_date timestamptz,
committer_date_offset smallint,
committer_date_neg_utc_offset boolean,
type revision_type not null,
directory sha1_git, -- file-system tree
message bytea,
author bigint references person(id),
committer bigint references person(id),
metadata jsonb, -- extra metadata (tarball checksums, extra commit information, etc...)
synthetic boolean not null default false, -- true if synthetic (cf. swh-loader-tar)
object_id bigserial
);
create index on revision(directory);
-- either this table or the sha1_git[] column on the revision table
create table revision_history
(
id sha1_git references revision(id),
parent_id sha1_git,
parent_rank int not null default 0,
-- parent position in merge commits, 0-based
primary key (id, parent_rank)
);
create index on revision_history(parent_id);
-- The timestamps at which Software Heritage has made a visit of the given origin.
create table origin_visit
(
origin bigint not null references origin(id),
visit bigint not null,
date timestamptz not null,
primary key (origin, visit)
);
create index on origin_visit(date);
-- The content of software origins is indexed starting from top-level pointers
-- called "branches". Every time we fetch some origin we store in this table
-- where the branches pointed to at fetch time.
--
-- Synonyms/mappings:
-- * git: ref (in the "git update-ref" sense)
create table occurrence_history
(
origin bigint references origin(id) not null,
branch bytea not null, -- e.g., b"master" (for VCS), or b"sid" (for Debian)
target sha1_git not null, -- ref target, e.g., commit id
target_type object_type not null, -- ref target type
object_id bigserial not null, -- short object identifier
visits bigint[] not null, -- the visits where that occurrence was valid. References
-- origin_visit(visit), where o_h.origin = origin_visit.origin.
primary key (object_id)
);
create index on occurrence_history(target, target_type);
create index on occurrence_history(origin, branch);
create unique index on occurrence_history(origin, branch, target, target_type);
-- Materialized view of occurrence_history, storing the *current* value of each
-- branch, as last seen by SWH.
create table occurrence
(
origin bigint references origin(id) not null,
branch bytea not null,
target sha1_git not null,
target_type object_type not null,
primary key(origin, branch)
);
-- A "memorable" point in the development history of a project.
--
-- Synonyms/mappings:
-- * git: tag (of the annotated kind, otherwise they are just references)
-- * tarball: the release version number
create table release
(
id sha1_git primary key,
target sha1_git,
target_type object_type,
date timestamptz,
date_offset smallint,
date_neg_utc_offset boolean,
name bytea,
comment bytea,
author bigint references person(id),
synthetic boolean not null default false, -- true if synthetic (cf. swh-loader-tar)
object_id bigserial
);
create index on release(target, target_type);
+
+
+-- In order to archive the content of the object storage, add
+-- some tables to keep track of what has already been archived.
+
+CREATE DOMAIN archive_id AS TEXT;
+
+CREATE TABLE archives (
+ id archive_id PRIMARY KEY,
+ url TEXT
+);
+
+CREATE TYPE archive_status AS ENUM (
+ 'missing',
+ 'ongoing',
+ 'present'
+);
+
+CREATE TABLE content_archive (
+ content_id sha1 REFERENCES content(sha1),
+ archive_id archive_id REFERENCES archives(id),
+ status archive_status,
+ mtime timestamptz,
+ PRIMARY KEY (content_id, archive_id)
+);
+
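The new archives and content_archive tables track, for each (content, archive) pair, whether a copy is missing, ongoing or present. As a rough sketch of how they can be used (not part of this diff; the connection string and the retention policy of 2 copies are assumptions), under-replicated contents can be listed with psycopg2, which is already a dependency:

import psycopg2

RETENTION_POLICY = 2  # assumed number of required copies

with psycopg2.connect('dbname=softwareheritage-dev') as db:
    with db.cursor() as cur:
        # count the 'present' copies per content and keep the ones below the policy
        cur.execute("""
            select encode(content_id, 'hex')
            from content_archive
            group by content_id
            having sum(case when status = 'present' then 1 else 0 end) < %s
        """, (RETENTION_POLICY,))
        for (content_hex,) in cur:
            print(content_hex)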
diff --git a/sql/upgrades/069.sql b/sql/upgrades/069.sql
new file mode 100644
index 00000000..6190c88d
--- /dev/null
+++ b/sql/upgrades/069.sql
@@ -0,0 +1,28 @@
+-- SWH DB schema upgrade
+-- from_version: 68
+-- to_version: 69
+-- description: add tables for the archiver.
+
+insert into dbversion(version, release, description)
+ values(69, now(), 'Work In Progress');
+
+CREATE DOMAIN archive_id AS TEXT;
+
+CREATE TABLE archives (
+ id archive_id PRIMARY KEY,
+ url TEXT
+);
+
+CREATE TYPE archive_status AS ENUM (
+ 'missing',
+ 'ongoing',
+ 'present'
+);
+
+CREATE TABLE content_archive (
+ content_id sha1 REFERENCES content(sha1),
+ archive_id archive_id REFERENCES archives(id),
+ status archive_status,
+ mtime timestamptz,
+ PRIMARY KEY (content_id, archive_id)
+);
diff --git a/swh.storage.egg-info/PKG-INFO b/swh.storage.egg-info/PKG-INFO
index 41a768eb..ed3ea4e0 100644
--- a/swh.storage.egg-info/PKG-INFO
+++ b/swh.storage.egg-info/PKG-INFO
@@ -1,10 +1,10 @@
Metadata-Version: 1.0
Name: swh.storage
-Version: 0.0.37
+Version: 0.0.38
Summary: Software Heritage storage manager
Home-page: https://forge.softwareheritage.org/diffusion/DSTO/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Description: UNKNOWN
Platform: UNKNOWN
diff --git a/swh.storage.egg-info/SOURCES.txt b/swh.storage.egg-info/SOURCES.txt
index a774771a..2dc3b4fc 100644
--- a/swh.storage.egg-info/SOURCES.txt
+++ b/swh.storage.egg-info/SOURCES.txt
@@ -1,126 +1,141 @@
.gitignore
AUTHORS
LICENSE
MANIFEST.in
Makefile
Makefile.local
README.db_testing
README.dev
requirements.txt
setup.py
version.txt
bin/swh-objstorage-add-dir
bin/swh-objstorage-fsck
bin/swh-storage-add-dir
debian/changelog
debian/compat
debian/control
debian/copyright
debian/rules
debian/source/format
-doc/archiver-blueprint.md
+docs/archiver-blueprint.md
sql/.gitignore
sql/Makefile
sql/TODO
sql/clusters.dot
sql/swh-data.sql
sql/swh-func.sql
sql/swh-init.sql
sql/swh-schema.sql
sql/bin/db-upgrade
sql/bin/dot_add_content
sql/doc/json
sql/doc/json/.gitignore
sql/doc/json/Makefile
sql/doc/json/entity.lister_metadata.schema.json
sql/doc/json/entity.metadata.schema.json
sql/doc/json/entity_history.lister_metadata.schema.json
sql/doc/json/entity_history.metadata.schema.json
sql/doc/json/fetch_history.result.schema.json
sql/doc/json/list_history.result.schema.json
sql/doc/json/listable_entity.list_params.schema.json
sql/doc/json/revision.metadata.schema.json
sql/json/.gitignore
sql/json/Makefile
sql/json/entity.lister_metadata.schema.json
sql/json/entity.metadata.schema.json
sql/json/entity_history.lister_metadata.schema.json
sql/json/entity_history.metadata.schema.json
sql/json/fetch_history.result.schema.json
sql/json/list_history.result.schema.json
sql/json/listable_entity.list_params.schema.json
sql/json/revision.metadata.schema.json
sql/upgrades/015.sql
sql/upgrades/016.sql
sql/upgrades/017.sql
sql/upgrades/018.sql
sql/upgrades/019.sql
sql/upgrades/020.sql
sql/upgrades/021.sql
sql/upgrades/022.sql
sql/upgrades/023.sql
sql/upgrades/024.sql
sql/upgrades/025.sql
sql/upgrades/026.sql
sql/upgrades/027.sql
sql/upgrades/028.sql
sql/upgrades/029.sql
sql/upgrades/030.sql
sql/upgrades/032.sql
sql/upgrades/033.sql
sql/upgrades/034.sql
sql/upgrades/035.sql
sql/upgrades/036.sql
sql/upgrades/037.sql
sql/upgrades/038.sql
sql/upgrades/039.sql
sql/upgrades/040.sql
sql/upgrades/041.sql
sql/upgrades/042.sql
sql/upgrades/043.sql
sql/upgrades/044.sql
sql/upgrades/045.sql
sql/upgrades/046.sql
sql/upgrades/047.sql
sql/upgrades/048.sql
sql/upgrades/049.sql
sql/upgrades/050.sql
sql/upgrades/051.sql
sql/upgrades/052.sql
sql/upgrades/053.sql
sql/upgrades/054.sql
sql/upgrades/055.sql
sql/upgrades/056.sql
sql/upgrades/057.sql
sql/upgrades/058.sql
sql/upgrades/059.sql
sql/upgrades/060.sql
sql/upgrades/061.sql
sql/upgrades/062.sql
sql/upgrades/063.sql
sql/upgrades/064.sql
sql/upgrades/065.sql
sql/upgrades/066.sql
sql/upgrades/067.sql
sql/upgrades/068.sql
+sql/upgrades/069.sql
swh.storage.egg-info/PKG-INFO
swh.storage.egg-info/SOURCES.txt
swh.storage.egg-info/dependency_links.txt
swh.storage.egg-info/requires.txt
swh.storage.egg-info/top_level.txt
swh/storage/__init__.py
swh/storage/converters.py
swh/storage/db.py
swh/storage/exc.py
-swh/storage/objstorage.py
swh/storage/storage.py
swh/storage/api/__init__.py
swh/storage/api/client.py
+swh/storage/api/common.py
swh/storage/api/server.py
+swh/storage/archiver/__init__.py
+swh/storage/archiver/copier.py
+swh/storage/archiver/director.py
+swh/storage/archiver/tasks.py
+swh/storage/archiver/worker.py
+swh/storage/objstorage/__init__.py
+swh/storage/objstorage/objstorage.py
+swh/storage/objstorage/api/__init__.py
+swh/storage/objstorage/api/client.py
+swh/storage/objstorage/api/server.py
+swh/storage/tests/manual_test_archiver.py
+swh/storage/tests/server_testing.py
swh/storage/tests/test_api_client.py
+swh/storage/tests/test_archiver.py
swh/storage/tests/test_converters.py
swh/storage/tests/test_db.py
swh/storage/tests/test_objstorage.py
+swh/storage/tests/test_objstorage_api.py
swh/storage/tests/test_storage.py
utils/dump_revisions.py
utils/fix_revisions_from_dump.py
\ No newline at end of file
diff --git a/swh.storage.egg-info/requires.txt b/swh.storage.egg-info/requires.txt
index 8655d947..0ae88685 100644
--- a/swh.storage.egg-info/requires.txt
+++ b/swh.storage.egg-info/requires.txt
@@ -1,6 +1,8 @@
+click
dateutil
flask
psycopg2
requests
swh.core>=0.0.17
+swh.scheduler
vcversioner
diff --git a/swh/storage/api/client.py b/swh/storage/api/client.py
index 937d7f6a..f8f9ad20 100644
--- a/swh/storage/api/client.py
+++ b/swh/storage/api/client.py
@@ -1,199 +1,178 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pickle
-
import requests
from requests.exceptions import ConnectionError
-from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder
-from swh.storage.exc import StorageAPIError
-
-
-def encode_data(data):
- try:
- return msgpack_dumps(data)
- except OverflowError as e:
- raise ValueError('Limits were reached. Please, check your input.\n' +
- str(e))
-
-
-def decode_response(response):
- content_type = response.headers['content-type']
-
- if content_type.startswith('application/x-msgpack'):
- r = msgpack_loads(response.content)
- elif content_type.startswith('application/json'):
- r = response.json(cls=SWHJSONDecoder)
- else:
- raise ValueError('Wrong content type `%s` for API response'
- % content_type)
- return r
+from ..exc import StorageAPIError
+from ..api.common import (decode_response,
+ encode_data_client as encode_data)
class RemoteStorage():
"""Proxy to a remote storage API"""
def __init__(self, base_url):
self.base_url = base_url
self.session = requests.Session()
def url(self, endpoint):
return '%s%s' % (self.base_url, endpoint)
def post(self, endpoint, data):
try:
response = self.session.post(
self.url(endpoint),
data=encode_data(data),
headers={'content-type': 'application/x-msgpack'},
)
except ConnectionError as e:
print(str(e))
raise StorageAPIError(e)
# XXX: this breaks language-independence and should be
# replaced by proper unserialization
if response.status_code == 400:
raise pickle.loads(decode_response(response))
return decode_response(response)
def get(self, endpoint, data=None):
try:
response = self.session.get(
self.url(endpoint),
params=data,
)
except ConnectionError as e:
print(str(e))
raise StorageAPIError(e)
if response.status_code == 404:
return None
# XXX: this breaks language-independence and should be
# replaced by proper unserialization
if response.status_code == 400:
raise pickle.loads(decode_response(response))
else:
return decode_response(response)
def content_add(self, content):
return self.post('content/add', {'content': content})
def content_missing(self, content, key_hash='sha1'):
return self.post('content/missing', {'content': content,
'key_hash': key_hash})
def content_missing_per_sha1(self, contents):
return self.post('content/missing/sha1', {'contents': contents})
def content_get(self, content):
return self.post('content/data', {'content': content})
def content_find(self, content):
return self.post('content/present', {'content': content})
def content_find_occurrence(self, content):
return self.post('content/occurrence', {'content': content})
def directory_add(self, directories):
return self.post('directory/add', {'directories': directories})
def directory_missing(self, directories):
return self.post('directory/missing', {'directories': directories})
def directory_get(self, directories):
return self.post('directory', dict(directories=directories))
def directory_ls(self, directory, recursive=False):
return self.get('directory/ls', {'directory': directory,
'recursive': recursive})
def revision_get(self, revisions):
return self.post('revision', {'revisions': revisions})
def revision_get_by(self, origin_id, branch_name, timestamp, limit=None):
return self.post('revision/by', dict(origin_id=origin_id,
branch_name=branch_name,
timestamp=timestamp,
limit=limit))
def revision_log(self, revisions, limit=None):
return self.post('revision/log', {'revisions': revisions,
'limit': limit})
def revision_shortlog(self, revisions, limit=None):
return self.post('revision/shortlog', {'revisions': revisions,
'limit': limit})
def revision_add(self, revisions):
return self.post('revision/add', {'revisions': revisions})
def revision_missing(self, revisions):
return self.post('revision/missing', {'revisions': revisions})
def release_add(self, releases):
return self.post('release/add', {'releases': releases})
def release_get(self, releases):
return self.post('release', {'releases': releases})
def release_get_by(self, origin_id, limit=None):
return self.post('release/by', dict(origin_id=origin_id,
limit=limit))
def release_missing(self, releases):
return self.post('release/missing', {'releases': releases})
def object_find_by_sha1_git(self, ids):
return self.post('object/find_by_sha1_git', {'ids': ids})
def occurrence_get(self, origin_id):
return self.post('occurrence', {'origin_id': origin_id})
def occurrence_add(self, occurrences):
return self.post('occurrence/add', {'occurrences': occurrences})
def origin_get(self, origin):
return self.post('origin/get', {'origin': origin})
def origin_add_one(self, origin):
return self.post('origin/add', {'origin': origin})
def person_get(self, person):
return self.post('person', {'person': person})
def fetch_history_start(self, origin_id):
return self.post('fetch_history/start', {'origin_id': origin_id})
def fetch_history_end(self, fetch_history_id, data):
return self.post('fetch_history/end',
{'fetch_history_id': fetch_history_id,
'data': data})
def fetch_history_get(self, fetch_history_id):
return self.get('fetch_history', {'id': fetch_history_id})
def entity_add(self, entities):
return self.post('entity/add', {'entities': entities})
def entity_get(self, uuid):
return self.post('entity/get', {'uuid': uuid})
def entity_get_one(self, uuid):
return self.get('entity', {'uuid': uuid})
def entity_get_from_lister_metadata(self, entities):
return self.post('entity/from_lister_metadata', {'entities': entities})
def stat_counters(self):
return self.get('stat/counters')
def directory_entry_get_by_path(self, directory, paths):
return self.post('directory/path', dict(directory=directory,
paths=paths))
diff --git a/swh/storage/api/common.py b/swh/storage/api/common.py
new file mode 100644
index 00000000..328d8260
--- /dev/null
+++ b/swh/storage/api/common.py
@@ -0,0 +1,69 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import json
+import pickle
+
+from flask import Request, Response
+
+from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder
+
+
+class BytesRequest(Request):
+ """Request with proper escaping of arbitrary byte sequences."""
+ encoding = 'utf-8'
+ encoding_errors = 'surrogateescape'
+
+
+def encode_data_server(data):
+ return Response(
+ msgpack_dumps(data),
+ mimetype='application/x-msgpack',
+ )
+
+
+def encode_data_client(data):
+ try:
+ return msgpack_dumps(data)
+ except OverflowError as e:
+ raise ValueError('Limits were reached. Please, check your input.\n' +
+ str(e))
+
+
+def decode_request(request):
+ content_type = request.mimetype
+ data = request.get_data()
+
+ if content_type == 'application/x-msgpack':
+ r = msgpack_loads(data)
+ elif content_type == 'application/json':
+ r = json.loads(data, cls=SWHJSONDecoder)
+ else:
+ raise ValueError('Wrong content type `%s` for API request'
+ % content_type)
+
+ return r
+
+
+def decode_response(response):
+ content_type = response.headers['content-type']
+
+ if content_type.startswith('application/x-msgpack'):
+ r = msgpack_loads(response.content)
+ elif content_type.startswith('application/json'):
+ r = response.json(cls=SWHJSONDecoder)
+ else:
+ raise ValueError('Wrong content type `%s` for API response'
+ % content_type)
+
+ return r
+
+
+def error_handler(exception, encoder):
+ # XXX: this breaks language-independence and should be
+ # replaced by proper serialization of errors
+ response = encoder(pickle.dumps(exception))
+ response.status_code = 400
+ return response
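These helpers are now shared between the RemoteStorage client and the Flask-based servers. A small round-trip sketch (assuming a throwaway Flask app; the payload is arbitrary) showing the client-side encoder feeding the server-side decoder:

from flask import Flask, request
from swh.storage.api.common import encode_data_client, decode_request

app = Flask(__name__)
payload = {'contents': [b'\x12' * 20], 'key_hash': 'sha1'}

# what RemoteStorage.post() puts on the wire
body = encode_data_client(payload)

# what a server-side view sees and decodes
with app.test_request_context('/content/missing', method='POST', data=body,
                              content_type='application/x-msgpack'):
    print(decode_request(request) == payload)  # expected: True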
diff --git a/swh/storage/api/server.py b/swh/storage/api/server.py
index bce183e7..324e9dd1 100644
--- a/swh/storage/api/server.py
+++ b/swh/storage/api/server.py
@@ -1,278 +1,255 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import logging
-import pickle
+import click
-from flask import Flask, Request, Response, g, request
+from flask import Flask, g, request
from swh.core import config
-from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder
from swh.storage import Storage
+from swh.storage.api.common import (BytesRequest, decode_request,
+ error_handler,
+ encode_data_server as encode_data)
DEFAULT_CONFIG = {
'db': ('str', 'dbname=softwareheritage-dev'),
'storage_base': ('str', '/tmp/swh-storage/test'),
}
-class BytesRequest(Request):
- """Request with proper escaping of arbitrary byte sequences."""
- encoding = 'utf-8'
- encoding_errors = 'surrogateescape'
-
-
app = Flask(__name__)
app.request_class = BytesRequest
-def encode_data(data):
- return Response(
- msgpack_dumps(data),
- mimetype='application/x-msgpack',
- )
-
-
-def decode_request(request):
- content_type = request.mimetype
- data = request.get_data()
-
- if content_type == 'application/x-msgpack':
- r = msgpack_loads(data)
- elif content_type == 'application/json':
- r = json.loads(data, cls=SWHJSONDecoder)
- else:
- raise ValueError('Wrong content type `%s` for API request'
- % content_type)
-
- return r
-
-
@app.errorhandler(Exception)
-def error_handler(exception):
- # XXX: this breaks language-independence and should be
- # replaced by proper serialization of errors
- response = encode_data(pickle.dumps(exception))
- response.status_code = 400
- return response
+def my_error_handler(exception):
+ return error_handler(exception, encode_data)
@app.before_request
def before_request():
g.storage = Storage(app.config['db'], app.config['storage_base'])
@app.route('/')
def index():
return 'Hello'
@app.route('/content/missing', methods=['POST'])
def content_missing():
return encode_data(g.storage.content_missing(**decode_request(request)))
@app.route('/content/missing/sha1', methods=['POST'])
def content_missing_per_sha1():
return encode_data(g.storage.content_missing_per_sha1(
**decode_request(request)))
@app.route('/content/present', methods=['POST'])
def content_find():
return encode_data(g.storage.content_find(**decode_request(request)))
@app.route('/content/occurrence', methods=['POST'])
def content_find_occurrence():
res = g.storage.content_find_occurrence(**decode_request(request))
return encode_data(res)
@app.route('/content/add', methods=['POST'])
def content_add():
return encode_data(g.storage.content_add(**decode_request(request)))
@app.route('/content/data', methods=['POST'])
def content_get():
return encode_data(g.storage.content_get(**decode_request(request)))
@app.route('/directory', methods=['POST'])
def directory_get():
return encode_data(g.storage.directory_get(**decode_request(request)))
@app.route('/directory/missing', methods=['POST'])
def directory_missing():
return encode_data(g.storage.directory_missing(**decode_request(request)))
@app.route('/directory/add', methods=['POST'])
def directory_add():
return encode_data(g.storage.directory_add(**decode_request(request)))
@app.route('/directory/path', methods=['POST'])
def directory_entry_get_by_path():
return encode_data(g.storage.directory_entry_get_by_path(
**decode_request(request)))
@app.route('/directory/ls', methods=['GET'])
def directory_ls():
dir = request.args['directory'].encode('utf-8', 'surrogateescape')
rec = json.loads(request.args.get('recursive', 'False').lower())
return encode_data(g.storage.directory_ls(dir, recursive=rec))
@app.route('/revision/add', methods=['POST'])
def revision_add():
return encode_data(g.storage.revision_add(**decode_request(request)))
@app.route('/revision', methods=['POST'])
def revision_get():
return encode_data(g.storage.revision_get(**decode_request(request)))
@app.route('/revision/by', methods=['POST'])
def revision_get_by():
return encode_data(g.storage.revision_get_by(**decode_request(request)))
@app.route('/revision/log', methods=['POST'])
def revision_log():
return encode_data(g.storage.revision_log(**decode_request(request)))
@app.route('/revision/shortlog', methods=['POST'])
def revision_shortlog():
return encode_data(g.storage.revision_shortlog(**decode_request(request)))
@app.route('/revision/missing', methods=['POST'])
def revision_missing():
return encode_data(g.storage.revision_missing(**decode_request(request)))
@app.route('/release/add', methods=['POST'])
def release_add():
return encode_data(g.storage.release_add(**decode_request(request)))
@app.route('/release', methods=['POST'])
def release_get():
return encode_data(g.storage.release_get(**decode_request(request)))
@app.route('/release/by', methods=['POST'])
def release_get_by():
return encode_data(g.storage.release_get_by(**decode_request(request)))
@app.route('/release/missing', methods=['POST'])
def release_missing():
return encode_data(g.storage.release_missing(**decode_request(request)))
@app.route('/object/find_by_sha1_git', methods=['POST'])
def object_find_by_sha1_git():
return encode_data(g.storage.object_find_by_sha1_git(
**decode_request(request)))
@app.route('/occurrence', methods=['POST'])
def occurrence_get():
return encode_data(g.storage.occurrence_get(**decode_request(request)))
@app.route('/occurrence/add', methods=['POST'])
def occurrence_add():
return encode_data(g.storage.occurrence_add(**decode_request(request)))
@app.route('/origin/get', methods=['POST'])
def origin_get():
return encode_data(g.storage.origin_get(**decode_request(request)))
@app.route('/origin/add', methods=['POST'])
def origin_add_one():
return encode_data(g.storage.origin_add_one(**decode_request(request)))
@app.route('/person', methods=['POST'])
def person_get():
return encode_data(g.storage.person_get(**decode_request(request)))
@app.route('/fetch_history', methods=['GET'])
def fetch_history_get():
return encode_data(g.storage.fetch_history_get(request.args['id']))
@app.route('/fetch_history/start', methods=['POST'])
def fetch_history_start():
return encode_data(
g.storage.fetch_history_start(**decode_request(request)))
@app.route('/fetch_history/end', methods=['POST'])
def fetch_history_end():
return encode_data(
g.storage.fetch_history_end(**decode_request(request)))
@app.route('/entity/add', methods=['POST'])
def entity_add():
return encode_data(
g.storage.entity_add(**decode_request(request)))
@app.route('/entity/get', methods=['POST'])
def entity_get():
return encode_data(
g.storage.entity_get(**decode_request(request)))
@app.route('/entity', methods=['GET'])
def entity_get_one():
return encode_data(g.storage.entity_get_one(request.args['uuid']))
@app.route('/entity/from_lister_metadata', methods=['POST'])
def entity_from_lister_metadata():
return encode_data(
g.storage.entity_get_from_lister_metadata(**decode_request(request)))
@app.route('/stat/counters', methods=['GET'])
def stat_counters():
return encode_data(g.storage.stat_counters())
def run_from_webserver(environ, start_response):
"""Run the WSGI app from the webserver, loading the configuration."""
config_path = '/etc/softwareheritage/storage/storage.ini'
app.config.update(config.read(config_path, DEFAULT_CONFIG))
handler = logging.StreamHandler()
app.logger.addHandler(handler)
return app(environ, start_response)
-if __name__ == '__main__':
- import sys
+@click.command()
+@click.argument('config-path', required=1)
+@click.option('--host', default='0.0.0.0', help="Host to run the server")
+@click.option('--port', default=5000, type=click.INT,
+ help="Binding port of the server")
+@click.option('--debug/--nodebug', default=True,
+ help="Indicates if the server should run in debug mode")
+def launch(config_path, host, port, debug):
+ app.config.update(config.read(config_path, DEFAULT_CONFIG))
+ app.run(host, port=int(port), debug=bool(debug))
- app.config.update(config.read(sys.argv[1], DEFAULT_CONFIG))
- host = sys.argv[2] if len(sys.argv) >= 3 else '127.0.0.1'
- app.run(host, debug=True)
+if __name__ == '__main__':
+ launch()
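Besides the run_from_webserver WSGI entry point, the server now exposes a click command. It can be driven from the command line (python3 -m swh.storage.api.server <config> ...) or programmatically; the config path, host and port below are placeholders:

from swh.storage.api.server import launch

# roughly equivalent to:
#   python3 -m swh.storage.api.server storage.ini --host 127.0.0.1 --port 5002 --nodebug
launch(['/etc/softwareheritage/storage/storage.ini',
        '--host', '127.0.0.1', '--port', '5002', '--nodebug'],
       standalone_mode=False)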
diff --git a/swh/storage/archiver/__init__.py b/swh/storage/archiver/__init__.py
new file mode 100644
index 00000000..3507b4ea
--- /dev/null
+++ b/swh/storage/archiver/__init__.py
@@ -0,0 +1,3 @@
+from .director import ArchiverDirector # NOQA
+from .worker import ArchiverWorker # NOQA
+from .copier import ArchiverCopier # NOQA
diff --git a/swh/storage/archiver/copier.py b/swh/storage/archiver/copier.py
new file mode 100644
index 00000000..988cfaf9
--- /dev/null
+++ b/swh/storage/archiver/copier.py
@@ -0,0 +1,60 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.core import hashutil
+from ..objstorage.api.client import RemoteObjStorage
+
+
+class ArchiverCopier():
+ """ This archiver copy some files into a remote objstorage
+ in order to get a backup.
+
+ Attributes:
+ content_ids: A list of sha1's that represents the content this copier
+ has to archive.
+ server (RemoteObjStorage): The remote object storage that is used to
+ backup content.
+ master_storage (Storage): The master storage that contains the data
+ the copier needs to archive.
+ """
+ def __init__(self, destination, content, master_storage):
+ """ Create a Copier for the archiver
+
+ Args:
+ destination: A tuple (archive_name, archive_url) that represents a
+ remote object storage as in the 'archives' table.
+ content: A list of sha1 that represents the content this copier
+ has to archive.
+ master_storage (Storage): The master storage of the system that
+ contains the data to archive.
+ """
+ _name, self.url = destination
+ self.content_ids = content
+ self.server = RemoteObjStorage(self.url)
+ self.master_storage = master_storage
+
+ def run(self):
+ """ Do the copy on the backup storage.
+
+ Run the archiver copier in order to copy the required content
+ into the current destination.
+ The contents which correspond to the sha1s in self.content_ids
+ will be fetched from the master_storage and then copied into
+ the backup object storage.
+
+ Returns:
+ A boolean that indicates if the whole content has been copied.
+ """
+ self.content_ids = list(map(lambda x: hashutil.hex_to_hash(x[2:]),
+ self.content_ids))
+ contents = self.master_storage.content_get(self.content_ids)
+ try:
+ for content in contents:
+ content_data = content['data']
+ self.server.content_add(content_data)
+ except:
+ return False
+
+ return True
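A hypothetical standalone use of the copier (normally it is driven by the worker); the archive name, URL and content id below are invented, and the content ids use the '\x'-prefixed hex form the director yields:

from swh.storage import get_storage
from swh.storage.archiver import ArchiverCopier

master_storage = get_storage('local_storage',
                             ['dbname=softwareheritage-dev',
                              '/tmp/swh-storage/objects'])
destination = ('banco', 'http://banco.example.org:5003/')
content_ids = ['\\x' + '00' * 20]  # placeholder sha1

copier = ArchiverCopier(destination, content_ids, master_storage)
if not copier.run():
    print('copy towards %s failed, contents stay ongoing' % destination[0])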
diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py
new file mode 100644
index 00000000..74003f0d
--- /dev/null
+++ b/swh/storage/archiver/director.py
@@ -0,0 +1,243 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import swh
+import logging
+import click
+
+from datetime import datetime
+
+from swh.core import hashutil, config
+from swh.scheduler.celery_backend.config import app
+from . import tasks # NOQA
+
+
+DEFAULT_CONFIG = {
+ 'objstorage_path': ('str', '/tmp/swh-storage/objects'),
+ 'batch_max_size': ('int', 50),
+ 'archival_max_age': ('int', 3600),
+ 'retention_policy': ('int', 2),
+ 'asynchronous': ('bool', True),
+
+ 'dbconn': ('str', 'dbname=softwareheritage-dev user=guest')
+}
+
+task_name = 'swh.storage.archiver.tasks.SWHArchiverTask'
+
+logger = logging.getLogger()
+
+
+class ArchiverDirector():
+ """Process the files in order to know which one is needed as backup.
+
+ The archiver director processes the files in the local storage in order
+ to know which ones need archival and delegates this task to
+ archiver workers.
+
+ Attributes:
+ master_storage: the local storage of the master server.
+ slave_storages: Iterable of remote obj storages on the slave servers
+ used for backup.
+ config: Archiver_configuration. A dictionary that must contain
+ the following keys.
+ objstorage_path (string): the path of the objstorage of the
+ master.
+ batch_max_size (int): The number of content items that can be
+ given to the same archiver worker.
+ archival_max_age (int): Delay given to the worker to copy all
+ the files in a given batch.
+ retention_policy (int): Required number of copies for the
+ content to be considered safe.
+ asynchronous (boolean): Indicate whether the archival should
+ run in asynchronous mode or not.
+ """
+
+ def __init__(self, db_conn, config):
+ """ Constructor of the archiver director.
+
+ Args:
+ db_conn: Either a libpq connection string,
+ or a psycopg2 connection.
+ config: Archiver_configuration. A dictionary that must contain
+ the following keys.
+ objstorage_path (string): the path of the objstorage of the
+ master.
+ batch_max_size (int): The number of content items that can be
+ given to the same archiver worker.
+ archival_max_age (int): Delay given to the worker to copy all
+ the files in a given batch.
+ retention_policy (int): Required number of copies for the
+ content to be considered safe.
+ asynchronous (boolean): Indicate whether the archival should
+ run in asynchronous mode or not.
+ """
+ # Get the local storage of the master and remote ones for the slaves.
+ self.master_storage_args = [db_conn, config['objstorage_path']]
+ master_storage = swh.storage.get_storage('local_storage',
+ self.master_storage_args)
+ slaves = {
+ id: url
+ for id, url
+ in master_storage.db.archive_ls()
+ }
+
+ # TODO Database should be initialized somehow before going in
+ # production. For now, assumes that the database contains
+ # data for all the current content.
+
+ self.master_storage = master_storage
+ self.slave_storages = slaves
+ self.config = config
+
+ def run(self):
+ """ Run the archiver director.
+
+ The archiver director will check all the contents of the archiver
+ database and do the required backup jobs.
+ """
+ if self.config['asynchronous']:
+ run_fn = self.run_async_worker
+ else:
+ run_fn = self.run_sync_worker
+ for batch in self.get_unarchived_content():
+ run_fn(batch)
+
+ def run_async_worker(self, batch):
+ """ Produce a worker that will be added to the task queue.
+ """
+ task = app.tasks[task_name]
+ task.delay(batch, self.master_storage_args,
+ self.slave_storages, self.config['retention_policy'])
+
+ def run_sync_worker(self, batch):
+ """ Run synchronously a worker on the given batch.
+ """
+ task = app.tasks[task_name]
+ task(batch, self.master_storage_args,
+ self.slave_storages, self.config)
+
+ def get_unarchived_content(self):
+ """ get all the contents that needs to be archived.
+
+ Yields:
+ A batch of contents. Batches are dictionaries which associate
+ a content id to the data about servers that contain it or not.
+
+ {'id1':
+ {'present': [('slave1', 'slave1_url')],
+ 'missing': [('slave2', 'slave2_url'),
+ ('slave3', 'slave3_url')]
+ },
+ 'id2':
+ {'present': [],
+ 'missing': [
+ ('slave1', 'slave1_url'),
+ ('slave2', 'slave2_url'),
+ ('slave3', 'slave3_url')
+ ]}
+ }
+
+ Where keys (idX) are sha1 of the content and (slaveX, slaveX_url)
+ are ids and urls of the storage slaves.
+
+ At least all the contents that don't have enough copies on the
+ backup servers are distributed into these batches.
+ """
+ # Get the data about each content referenced into the archiver.
+ missing_copy = {}
+ for content_id in self.master_storage.db.content_archive_ls():
+ db_content_id = '\\x' + hashutil.hash_to_hex(content_id[0])
+
+ # Fetch the data about the archival status of the content
+ backups = self.master_storage.db.content_archive_get(
+ content=db_content_id
+ )
+ for _content_id, server_id, status, mtime in backups:
+ virtual_status = self.get_virtual_status(status, mtime)
+ server_data = (server_id, self.slave_storages[server_id])
+
+ missing_copy.setdefault(
+ db_content_id,
+ {'present': [], 'missing': []}
+ ).setdefault(virtual_status, []).append(server_data)
+
+ # Check the content before archival.
+ try:
+ self.master_storage.objstorage.check(content_id[0])
+ except Exception as e:
+ # Exception can be Error or ObjNotFoundError.
+ logger.error(e)
+ # TODO Do something to restore the content?
+
+ if len(missing_copy) >= self.config['batch_max_size']:
+ yield missing_copy
+ missing_copy = {}
+
+ if len(missing_copy) > 0:
+ yield missing_copy
+
+ def get_virtual_status(self, status, mtime):
+ """ Compute the virtual presence of a content.
+
+ If the status is ongoing but the time is not elapsed, the archiver
+ considers it will be present in the future, and so considers it as
+ present.
+ However, if the time is elapsed, the copy may have failed, so consider
+ the content as missing.
+
+ Arguments:
+ status (string): One of ('present', 'missing', 'ongoing'). The
+ status of the content.
+ mtime (datetime): Time at which the content has been updated for
+ the last time.
+
+ Returns:
+ The virtual status of the studied content, which is 'present' or
+ 'missing'.
+
+ Raises:
+ ValueError: if the status is not one of 'present', 'missing'
+ or 'ongoing'
+ """
+ if status in ('present', 'missing'):
+ return status
+
+ # If the status is 'ongoing' but there is still time, another worker
+ # may still be on the task.
+ if status == 'ongoing':
+ mtime = mtime.replace(tzinfo=None)
+ elapsed = (datetime.now() - mtime).total_seconds()
+ if elapsed <= self.config['archival_max_age']:
+ return 'present'
+ else:
+ return 'missing'
+ else:
+ raise ValueError("status must be either 'present', 'missing' "
+ "or 'ongoing'")
+
+
+@click.command()
+@click.argument('config-path', required=1)
+@click.option('--dbconn', default=DEFAULT_CONFIG['dbconn'][1],
+ help="Connection string for the database")
+@click.option('--async/--sync', default=DEFAULT_CONFIG['asynchronous'][1],
+ help="Indicates if the archiver should run asynchronously")
+def launch(config_path, dbconn, async):
+ # The configuration has the following priority:
+ # command line > file config > default config
+ cl_config = {
+ 'dbconn': dbconn,
+ 'asynchronous': async
+ }
+ conf = config.read(config_path, DEFAULT_CONFIG)
+ conf.update(cl_config)
+ # Create connection data and run the archiver.
+ archiver = ArchiverDirector(conf['dbconn'], conf)
+ logger.info("Starting an archival at", datetime.now())
+ archiver.run()
+
+
+if __name__ == '__main__':
+ launch()
diff --git a/swh/storage/archiver/tasks.py b/swh/storage/archiver/tasks.py
new file mode 100644
index 00000000..0b7ce61e
--- /dev/null
+++ b/swh/storage/archiver/tasks.py
@@ -0,0 +1,20 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.scheduler.task import Task
+from .worker import ArchiverWorker
+
+
+class SWHArchiverTask(Task):
+ """ Main task that archive a batch of content.
+ """
+ task_queue = 'swh_storage_archive_worker'
+
+ def run(self, batch, master_storage_args,
+ slave_storages, config):
+ aw = ArchiverWorker(batch, master_storage_args,
+ slave_storages, config)
+ if aw.run():
+ self.log("Successful backup for a batch of size %s" % len(batch))
diff --git a/swh/storage/archiver/worker.py b/swh/storage/archiver/worker.py
new file mode 100644
index 00000000..8fda96bc
--- /dev/null
+++ b/swh/storage/archiver/worker.py
@@ -0,0 +1,239 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import random
+import logging
+
+from .copier import ArchiverCopier
+from .. import get_storage
+
+from datetime import datetime
+
+logger = logging.getLogger()
+
+
+class ArchiverWorker():
+ """ Do the required backups on a given batch of contents.
+
+ Process the content of a content batch in order to do the needed backups on
+ the slave servers.
+
+ Attributes:
+ batch: The content this worker has to archive, which is a dictionary
+ that associates a content's sha1 id to the list of servers where
+ the content is present or missing
+ (see ArchiverDirector::get_unarchived_content).
+ master_storage_args: The connection argument to initialize the
+ master storage with the db connection url & the object storage
+ path.
+ slave_storages: A map that associates server_id to the remote server.
+ config: Archiver_configuration. A dictionary that must contain
+ the following keys.
+ objstorage_path (string): the path of the objstorage of the
+ master.
+ batch_max_size (int): The number of content items that can be
+ given to the same archiver worker.
+ archival_max_age (int): Delay given to the worker to copy all
+ the files in a given batch.
+ retention_policy (int): Required number of copies for the
+ content to be considered safe.
+ asynchronous (boolean): Indicate whether the archival should
+ run in asynchronous mode or not.
+ """
+ def __init__(self, batch, master_storage_args, slave_storages, config):
+ """ Constructor of the ArchiverWorker class.
+
+ Args:
+ batch: A batch of content, which is a dictionary that associates
+ a content's sha1 id to the list of servers where the content
+ is present.
+ master_storage_args: The master storage arguments.
+ slave_storages: A map that associates server_id to the remote
+ server.
+ config: Archiver_configuration. A dictionary that must contain
+ the following keys.
+ objstorage_path (string): the path of the objstorage of the
+ master.
+ batch_max_size (int): The number of content items that can be
+ given to the same archiver worker.
+ archival_max_age (int): Delay given to the worker to copy all
+ the files in a given batch.
+ retention_policy (int): Required number of copies for the
+ content to be considered safe.
+ asynchronous (boolean): Indicate whether the archival should
+ run in asynchronous mode or not.
+ """
+ self.batch = batch
+ self.master_storage = get_storage('local_storage', master_storage_args)
+ self.slave_storages = slave_storages
+ self.config = config
+
+ def __choose_backup_servers(self, allowed_storage, backup_number):
+ """ Choose the slave servers for archival.
+
+ Choose the given amount of servers among those which don't already
+ contain a copy of the content.
+
+ Args:
+ allowed_storage: servers where the content is not already present.
+ backup_number (int): The number of servers we have to choose in
+ order to fulfill the objective.
+ """
+ # In case there is not enough backup servers to get all the backups
+ # we need, just do our best.
+ # TODO such situation can only be caused by an incorrect configuration
+ # setting. Do a verification previously.
+ backup_number = min(backup_number, len(allowed_storage))
+
+ # TODO Find a better (or a good) policy to choose the backup servers.
+ # The random choice should be equivalently distributed between
+ # servers for a great amount of data, but doesn't take server
+ # capacities into account.
+ return random.sample(allowed_storage, backup_number)
+
+ def __get_archival_status(self, content_id, server):
+ """ Get the archival status of the required content.
+
+ Attributes:
+ content_id (string): Sha1 of the content.
+ server: Tuple (archive_id, archive_url) of the archive server.
+ Returns:
+ A dictionary that contains all the required data : 'content_id',
+ 'archive_id', 'status', and 'mtime'
+ """
+ t, = list(
+ self.master_storage.db.content_archive_get(content_id, server[0])
+ )
+ return {
+ 'content_id': t[0],
+ 'archive_id': t[1],
+ 'status': t[2],
+ 'mtime': t[3]
+ }
+
+ def __content_archive_update(self, content_id, archive_id,
+ new_status=None):
+ """ Update the status of a archive content and set it's mtime to now()
+
+ Change the last modification time of an archived content and change
+ its status to the given one.
+
+ Args:
+ content_id (string): The content id.
+ archive_id (string): The id of the concerned archive.
+ new_status (string): One of missing, ongoing or present, this
+ status will replace the previous one. If not given, the
+ function only changes the mtime of the content.
+ """
+ self.master_storage.db.content_archive_update(
+ content_id,
+ archive_id,
+ new_status
+ )
+
+ def need_archival(self, content, destination):
+ """ Indicates whenever a content need archivage.
+
+ Filter function that returns True if a given content
+ still needs to be archived.
+
+ Args:
+ content (str): Sha1 of a content.
+ destination: Tuple (archive id, archive url).
+ """
+ archival_status = self.__get_archival_status(
+ content,
+ destination
+ )
+ status = archival_status['status']
+ mtime = archival_status['mtime']
+ # If the archive is already present, no need to backup.
+ if status == 'present':
+ return False
+ # If the content is ongoing but still has time, there is
+ # another worker working on this content.
+ elif status == 'ongoing':
+ mtime = mtime.replace(tzinfo=None)
+ elapsed = (datetime.now() - mtime).total_seconds()
+ if elapsed <= self.config['archival_max_age']:
+ return False
+ return True
+
+ def sort_content_by_archive(self):
+ """ Create a map {archive_server -> list of content)
+
+ Create a mapping that associate to a archive server all the
+ contents that needs to be archived in it by the current worker.
+
+ The map is in the form of :
+ {
+ (archive_1, archive_1_url): [content1, content2, content_3]
+ (archive_2, archive_2_url): [content1, content3]
+ }
+
+ Returns:
+ The created mapping.
+ """
+ slaves_copy = {}
+ for content_id in self.batch:
+ # Choose some servers to upload the content among the missing ones.
+ server_data = self.batch[content_id]
+ nb_present = len(server_data['present'])
+ nb_backup = self.config['retention_policy'] - nb_present
+ backup_servers = self.__choose_backup_servers(
+ server_data['missing'],
+ nb_backup
+ )
+ # Fill the map destination -> content to upload
+ for server in backup_servers:
+ slaves_copy.setdefault(server, []).append(content_id)
+ return slaves_copy
+
+ def run(self):
+ """ Do the task expected from the archiver worker.
+
+ Process the contents in the batch, ensure that the elements still need
+ to be archived, and spawn copiers to copy the files to each destination.
+ """
+ # Get a map (archive -> [contents])
+ slaves_copy = self.sort_content_by_archive()
+
+ # At this point, re-check the archival status in order to know if the
+ # job has already been done by another worker.
+ for destination in slaves_copy:
+ # list() is needed because filter's result will be consumed twice.
+ slaves_copy[destination] = list(filter(
+ lambda content_id: self.need_archival(content_id, destination),
+ slaves_copy[destination]
+ ))
+ for content_id in slaves_copy[destination]:
+ self.__content_archive_update(content_id, destination[0],
+ new_status='ongoing')
+
+ # Spawn a copier for each destination
+ for destination in slaves_copy:
+ try:
+ self.run_copier(destination, slaves_copy[destination])
+ except Exception:
+ logger.error('Unable to copy a batch to %s' % destination)
+
+ def run_copier(self, destination, contents):
+ """ Run a copier in order to archive the given contents
+
+ Upload the given contents to the given archive.
+ If the process fails, the contents are considered uncopied
+ and remain 'ongoing', waiting to be rescheduled once the
+ delay has elapsed.
+
+ Args:
+ destination: Tuple (archive_id, archive_url) of the destination.
+ contents: List of contents to archive.
+ """
+ ac = ArchiverCopier(destination, contents, self.master_storage)
+ if ac.run():
+ # Once the archival is complete, update the database.
+ for content_id in contents:
+ self.__content_archive_update(content_id, destination[0],
+ new_status='present')
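
For orientation, here is a minimal sketch of how this worker can be driven, based on the constructor and config keys documented above; the batch layout, connection string and storage paths are illustrative assumptions, not values taken from this diff.

    # Illustrative sketch only; arguments are inferred from the docstrings above.
    from swh.storage.archiver import ArchiverWorker

    config = {
        'objstorage_path': '/tmp/swh-storage/objects/',  # placeholder path
        'batch_max_size': 50,
        'archival_max_age': 3600,
        'retention_policy': 2,
        'asynchronous': False,
    }
    # The batch maps a content id to the archives where it is present/missing.
    content_id = r'\x34973274ccef6ab4dfaaf86599792fa9c3fe4689'
    batch = {
        content_id: {
            'present': [('archive1', 'http://archive1.example.org/')],
            'missing': [('archive2', 'http://archive2.example.org/')],
        },
    }
    master_storage_args = ['dbname=softwareheritage-dev',  # placeholder dsn
                           config['objstorage_path']]
    slave_storages = [('archive2', 'http://archive2.example.org/')]

    worker = ArchiverWorker(batch, master_storage_args, slave_storages, config)
    worker.run()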
diff --git a/swh/storage/converters.py b/swh/storage/converters.py
index 4f82550c..9bf2a35a 100644
--- a/swh/storage/converters.py
+++ b/swh/storage/converters.py
@@ -1,360 +1,360 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import codecs
import datetime
import numbers
DEFAULT_AUTHOR = {
'fullname': None,
'name': None,
'email': None,
}
DEFAULT_DATE = {
'timestamp': None,
'offset': 0,
'neg_utc_offset': None,
}
def backslashescape_errors(exception):
if isinstance(exception, UnicodeDecodeError):
bad_data = exception.object[exception.start:exception.end]
escaped = ''.join(r'\x%02x' % x for x in bad_data)
return escaped, exception.end
return codecs.backslashreplace_errors(exception)
codecs.register_error('backslashescape', backslashescape_errors)
def decode_with_escape(value):
"""Decode a bytestring as utf-8, escaping the bytes of invalid utf-8 sequences
as \\x<hex value>. We also escape NUL bytes as they are invalid in JSON
strings.
"""
# escape backslashes
value = value.replace(b'\\', b'\\\\')
value = value.replace(b'\x00', b'\\x00')
return value.decode('utf-8', 'backslashescape')
def encode_with_unescape(value):
"""Encode an unicode string containing \\x<hex> backslash escapes"""
slices = []
start = 0
odd_backslashes = False
i = 0
while i < len(value):
if value[i] == '\\':
odd_backslashes = not odd_backslashes
else:
if odd_backslashes:
if value[i] != 'x':
raise ValueError('invalid escape for %r at position %d' %
(value, i-1))
slices.append(
value[start:i-1].replace('\\\\', '\\').encode('utf-8')
)
slices.append(bytes.fromhex(value[i+1:i+3]))
odd_backslashes = False
start = i = i + 3
continue
i += 1
slices.append(
value[start:i].replace('\\\\', '\\').encode('utf-8')
)
return b''.join(slices)
def author_to_db(author):
"""Convert a swh-model author to its DB representation.
Args: a swh-model compatible author
Returns:
a dict containing three keys: author, fullname and email
"""
if author is None:
return DEFAULT_AUTHOR
return author
def db_to_author(id, fullname, name, email):
"""Convert the DB representation of an author to a swh-model author.
Args:
id (long): the author's identifier
fullname (bytes): the author's fullname
name (bytes): the author's name
email (bytes): the author's email
Returns:
a dict with four keys: id, fullname, name and email, or None if the id
is None
"""
if id is None:
return None
return {
'id': id,
'fullname': fullname,
'name': name,
'email': email,
}
def git_headers_to_db(git_headers):
"""Convert git headers to their database representation.
We convert the bytes to unicode by decoding them into utf-8 and replacing
invalid utf-8 sequences with backslash escapes.
"""
ret = []
for key, values in git_headers:
if isinstance(values, list):
ret.append([key, [decode_with_escape(value) for value in values]])
else:
ret.append([key, decode_with_escape(values)])
return ret
def db_to_git_headers(db_git_headers):
ret = []
for key, values in db_git_headers:
if isinstance(values, list):
ret.append([key, [encode_with_unescape(value)
for value in values]])
else:
ret.append([key, encode_with_unescape(values)])
return ret
def db_to_date(date, offset, neg_utc_offset):
"""Convert the DB representation of a date to a swh-model compatible date.
Args:
date (datetime.datetime): a date pulled out of the database
offset (int): an integer number of minutes representing an UTC offset
neg_utc_offset (boolean): whether a UTC offset is negative
Returns:
a dict with three keys:
timestamp: a timestamp from UTC
offset: the number of minutes since UTC
negative_utc: whether a null UTC offset is negative
"""
if date is None:
return None
return {
'timestamp': date.timestamp(),
'offset': offset,
'negative_utc': neg_utc_offset,
}
def date_to_db(date_offset):
"""Convert a swh-model date_offset to its DB representation.
Args: a swh-model compatible date_offset
Returns:
a dict with three keys:
timestamp: a date in ISO format
offset: the UTC offset in minutes
neg_utc_offset: a boolean indicating whether a null offset is
negative or positive.
"""
if date_offset is None:
return DEFAULT_DATE
if isinstance(date_offset, numbers.Real):
date_offset = datetime.datetime.fromtimestamp(date_offset,
tz=datetime.timezone.utc)
if isinstance(date_offset, datetime.datetime):
timestamp = date_offset
utcoffset = date_offset.utcoffset()
offset = int(utcoffset.total_seconds()) // 60
neg_utc_offset = False if offset == 0 else None
else:
if isinstance(date_offset['timestamp'], numbers.Real):
timestamp = datetime.datetime.fromtimestamp(
date_offset['timestamp'], tz=datetime.timezone.utc)
else:
timestamp = date_offset['timestamp']
offset = date_offset['offset']
neg_utc_offset = date_offset.get('negative_utc', None)
return {
'timestamp': timestamp.isoformat(),
'offset': offset,
'neg_utc_offset': neg_utc_offset,
}
def revision_to_db(revision):
"""Convert a swh-model revision to its database representation.
"""
author = author_to_db(revision['author'])
date = date_to_db(revision['date'])
committer = author_to_db(revision['committer'])
committer_date = date_to_db(revision['committer_date'])
metadata = revision['metadata']
- if metadata and 'extra_git_headers' in metadata:
+ if metadata and 'extra_headers' in metadata:
metadata = metadata.copy()
- extra_git_headers = git_headers_to_db(metadata['extra_git_headers'])
- metadata['extra_git_headers'] = extra_git_headers
+ extra_headers = git_headers_to_db(metadata['extra_headers'])
+ metadata['extra_headers'] = extra_headers
return {
'id': revision['id'],
'author_fullname': author['fullname'],
'author_name': author['name'],
'author_email': author['email'],
'date': date['timestamp'],
'date_offset': date['offset'],
'date_neg_utc_offset': date['neg_utc_offset'],
'committer_fullname': committer['fullname'],
'committer_name': committer['name'],
'committer_email': committer['email'],
'committer_date': committer_date['timestamp'],
'committer_date_offset': committer_date['offset'],
'committer_date_neg_utc_offset': committer_date['neg_utc_offset'],
'type': revision['type'],
'directory': revision['directory'],
'message': revision['message'],
'metadata': metadata,
'synthetic': revision['synthetic'],
'parents': [
{
'id': revision['id'],
'parent_id': parent,
'parent_rank': i,
} for i, parent in enumerate(revision['parents'])
],
}
def db_to_revision(db_revision):
"""Convert a database representation of a revision to its swh-model
representation."""
author = db_to_author(
db_revision['author_id'],
db_revision['author_fullname'],
db_revision['author_name'],
db_revision['author_email'],
)
date = db_to_date(
db_revision['date'],
db_revision['date_offset'],
db_revision['date_neg_utc_offset'],
)
committer = db_to_author(
db_revision['committer_id'],
db_revision['committer_fullname'],
db_revision['committer_name'],
db_revision['committer_email'],
)
committer_date = db_to_date(
db_revision['committer_date'],
db_revision['committer_date_offset'],
db_revision['committer_date_neg_utc_offset']
)
metadata = db_revision['metadata']
- if metadata and 'extra_git_headers' in metadata:
- extra_git_headers = db_to_git_headers(metadata['extra_git_headers'])
- metadata['extra_git_headers'] = extra_git_headers
+ if metadata and 'extra_headers' in metadata:
+ extra_headers = db_to_git_headers(metadata['extra_headers'])
+ metadata['extra_headers'] = extra_headers
parents = []
if 'parents' in db_revision:
for parent in db_revision['parents']:
if parent:
parents.append(parent)
return {
'id': db_revision['id'],
'author': author,
'date': date,
'committer': committer,
'committer_date': committer_date,
'type': db_revision['type'],
'directory': db_revision['directory'],
'message': db_revision['message'],
'metadata': metadata,
'synthetic': db_revision['synthetic'],
'parents': parents,
}
def release_to_db(release):
"""Convert a swh-model release to its database representation.
"""
author = author_to_db(release['author'])
date = date_to_db(release['date'])
return {
'id': release['id'],
'author_fullname': author['fullname'],
'author_name': author['name'],
'author_email': author['email'],
'date': date['timestamp'],
'date_offset': date['offset'],
'date_neg_utc_offset': date['neg_utc_offset'],
'name': release['name'],
'target': release['target'],
'target_type': release['target_type'],
'comment': release['message'],
'synthetic': release['synthetic'],
}
def db_to_release(db_release):
"""Convert a database representation of a release to its swh-model
representation.
"""
author = db_to_author(
db_release['author_id'],
db_release['author_fullname'],
db_release['author_name'],
db_release['author_email'],
)
date = db_to_date(
db_release['date'],
db_release['date_offset'],
db_release['date_neg_utc_offset']
)
return {
'author': author,
'date': date,
'id': db_release['id'],
'name': db_release['name'],
'message': db_release['comment'],
'synthetic': db_release['synthetic'],
'target': db_release['target'],
'target_type': db_release['target_type'],
}
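
As a quick illustration of the date handling above, here is a minimal sketch of the round trip between the swh-model representation and the database representation (the concrete values are arbitrary):

    import datetime

    from swh.storage import converters

    swh_date = {
        'timestamp': 1445365461,   # seconds since the epoch
        'offset': 120,             # UTC offset in minutes
        'negative_utc': None,
    }
    db_date = converters.date_to_db(swh_date)
    # db_date == {'timestamp': <ISO 8601 UTC string>, 'offset': 120,
    #             'neg_utc_offset': None}

    back = converters.db_to_date(
        datetime.datetime(2015, 10, 20, 18, 24, 21,
                          tzinfo=datetime.timezone.utc),
        120,
        False,
    )
    # back == {'timestamp': <seconds since the epoch>, 'offset': 120,
    #          'negative_utc': False}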
diff --git a/swh/storage/db.py b/swh/storage/db.py
index 08597a66..14fb23f7 100644
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -1,626 +1,716 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import binascii
import datetime
import functools
import json
import psycopg2
import psycopg2.extras
import tempfile
from contextlib import contextmanager
TMP_CONTENT_TABLE = 'tmp_content'
psycopg2.extras.register_uuid()
def stored_procedure(stored_proc):
"""decorator to execute remote stored procedure, specified as argument
Generally, the body of the decorated function should be empty. If it is
not, the stored procedure will be executed first; the function body then.
"""
def wrap(meth):
@functools.wraps(meth)
def _meth(self, *args, **kwargs):
cur = kwargs.get('cur', None)
self._cursor(cur).execute('SELECT %s()' % stored_proc)
meth(self, *args, **kwargs)
return _meth
return wrap
def jsonize(value):
"""Convert a value to a psycopg2 JSON object if necessary"""
if isinstance(value, dict):
return psycopg2.extras.Json(value)
return value
def entry_to_bytes(entry):
"""Convert an entry coming from the database to bytes"""
if isinstance(entry, memoryview):
return entry.tobytes()
if isinstance(entry, list):
return [entry_to_bytes(value) for value in entry]
return entry
def line_to_bytes(line):
"""Convert a line coming from the database to bytes"""
if isinstance(line, dict):
return {k: entry_to_bytes(v) for k, v in line.items()}
return line.__class__(entry_to_bytes(entry) for entry in line)
def cursor_to_bytes(cursor):
"""Yield all the data from a cursor as bytes"""
yield from (line_to_bytes(line) for line in cursor)
class Db:
"""Proxy to the SWH DB, with wrappers around stored procedures
"""
@classmethod
def connect(cls, *args, **kwargs):
"""factory method to create a DB proxy
Accepts all arguments of psycopg2.connect; only some specific
possibilities are reported below.
Args:
connstring: libpq2 connection string
"""
conn = psycopg2.connect(*args, **kwargs)
return cls(conn)
def _cursor(self, cur_arg):
"""get a cursor: from cur_arg if given, or a fresh one otherwise
meant to avoid boilerplate if/then/else in methods that proxy stored
procedures
"""
if cur_arg is not None:
return cur_arg
# elif self.cur is not None:
# return self.cur
else:
return self.conn.cursor()
def __init__(self, conn):
"""create a DB proxy
Args:
conn: psycopg2 connection to the SWH DB
"""
self.conn = conn
@contextmanager
def transaction(self):
"""context manager to execute within a DB transaction
Yields:
a psycopg2 cursor
"""
with self.conn.cursor() as cur:
try:
yield cur
self.conn.commit()
except:
if not self.conn.closed:
self.conn.rollback()
raise
def mktemp(self, tblname, cur=None):
self._cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,))
def mktemp_dir_entry(self, entry_type, cur=None):
self._cursor(cur).execute('SELECT swh_mktemp_dir_entry(%s)',
(('directory_entry_%s' % entry_type),))
@stored_procedure('swh_mktemp_revision')
def mktemp_revision(self, cur=None): pass
@stored_procedure('swh_mktemp_release')
def mktemp_release(self, cur=None): pass
@stored_procedure('swh_mktemp_occurrence_history')
def mktemp_occurrence_history(self, cur=None): pass
@stored_procedure('swh_mktemp_entity_lister')
def mktemp_entity_lister(self, cur=None): pass
@stored_procedure('swh_mktemp_entity_history')
def mktemp_entity_history(self, cur=None): pass
@stored_procedure('swh_mktemp_bytea')
def mktemp_bytea(self, cur=None): pass
def copy_to(self, items, tblname, columns, cur=None, item_cb=None):
def escape(data):
if data is None:
return ''
if isinstance(data, bytes):
return '\\x%s' % binascii.hexlify(data).decode('ascii')
elif isinstance(data, str):
return '"%s"' % data.replace('"', '""')
elif isinstance(data, datetime.datetime):
# We escape twice to make sure the string generated by
# isoformat gets escaped
return escape(data.isoformat())
elif isinstance(data, dict):
return escape(json.dumps(data))
elif isinstance(data, list):
return escape("{%s}" % ','.join(escape(d) for d in data))
elif isinstance(data, psycopg2.extras.Range):
# We escape twice here too, so that we make sure
# everything gets passed to copy properly
return escape(
'%s%s,%s%s' % (
'[' if data.lower_inc else '(',
'-infinity' if data.lower_inf else escape(data.lower),
'infinity' if data.upper_inf else escape(data.upper),
']' if data.upper_inc else ')',
)
)
else:
# We don't escape here to make sure we pass literals properly
return str(data)
with tempfile.TemporaryFile('w+') as f:
for d in items:
if item_cb is not None:
item_cb(d)
line = [escape(d.get(k)) for k in columns]
f.write(','.join(line))
f.write('\n')
f.seek(0)
self._cursor(cur).copy_expert('COPY %s (%s) FROM STDIN CSV' % (
tblname, ', '.join(columns)), f)
@stored_procedure('swh_content_add')
def content_add_from_temp(self, cur=None): pass
@stored_procedure('swh_directory_add')
def directory_add_from_temp(self, cur=None): pass
@stored_procedure('swh_skipped_content_add')
def skipped_content_add_from_temp(self, cur=None): pass
@stored_procedure('swh_revision_add')
def revision_add_from_temp(self, cur=None): pass
@stored_procedure('swh_release_add')
def release_add_from_temp(self, cur=None): pass
@stored_procedure('swh_occurrence_history_add')
def occurrence_history_add_from_temp(self, cur=None): pass
@stored_procedure('swh_entity_history_add')
def entity_history_add_from_temp(self, cur=None): pass
def store_tmp_bytea(self, ids, cur=None):
"""Store the given identifiers in a new tmp_bytea table"""
cur = self._cursor(cur)
self.mktemp_bytea(cur)
self.copy_to(({'id': elem} for elem in ids), 'tmp_bytea',
['id'], cur)
def content_missing_from_temp(self, cur=None):
cur = self._cursor(cur)
cur.execute("""SELECT sha1, sha1_git, sha256
FROM swh_content_missing()""")
yield from cursor_to_bytes(cur)
def content_missing_per_sha1_from_temp(self, cur=None):
cur = self._cursor(cur)
cur.execute("""SELECT *
FROM swh_content_missing_per_sha1()""")
yield from cursor_to_bytes(cur)
def skipped_content_missing_from_temp(self, cur=None):
cur = self._cursor(cur)
cur.execute("""SELECT sha1, sha1_git, sha256
FROM swh_skipped_content_missing()""")
yield from cursor_to_bytes(cur)
def occurrence_get(self, origin_id, cur=None):
"""Retrieve latest occurrence's information by origin_id.
"""
cur = self._cursor(cur)
cur.execute("""SELECT origin, branch, target, target_type,
(select max(date) from origin_visit
where origin=%s) as date
FROM occurrence
WHERE origin=%s
""",
(origin_id, origin_id))
yield from cursor_to_bytes(cur)
def content_find(self, sha1=None, sha1_git=None, sha256=None, cur=None):
"""Find the content optionally on a combination of the following
checksums sha1, sha1_git or sha256.
Args:
sha1: sha1 content
sha1_git: the sha1 of the content, computed `a la git`
sha256: sha256 content
Returns:
The triplet (sha1, sha1_git, sha256) if found or None.
"""
cur = self._cursor(cur)
cur.execute("""SELECT sha1, sha1_git, sha256, length, ctime, status
FROM swh_content_find(%s, %s, %s)
LIMIT 1""", (sha1, sha1_git, sha256))
content = line_to_bytes(cur.fetchone())
if set(content) == {None}:
return None
else:
return content
def content_find_occurrence(self, sha1, cur=None):
"""Find one content's occurrence.
Args:
sha1: sha1 content
cur: cursor to use
Returns:
One occurrence for that particular sha1
"""
cur = self._cursor(cur)
cur.execute("""SELECT origin_type, origin_url, branch, target, target_type, path
FROM swh_content_find_occurrence(%s)
LIMIT 1""",
(sha1, ))
return line_to_bytes(cur.fetchone())
def directory_get_from_temp(self, cur=None):
cur = self._cursor(cur)
cur.execute('''SELECT id, file_entries, dir_entries, rev_entries
FROM swh_directory_get()''')
yield from cursor_to_bytes(cur)
def directory_missing_from_temp(self, cur=None):
cur = self._cursor(cur)
cur.execute('SELECT * FROM swh_directory_missing()')
yield from cursor_to_bytes(cur)
def directory_walk_one(self, directory, cur=None):
cur = self._cursor(cur)
cur.execute('SELECT * FROM swh_directory_walk_one(%s)', (directory,))
yield from cursor_to_bytes(cur)
def directory_walk(self, directory, cur=None):
cur = self._cursor(cur)
cur.execute('SELECT * FROM swh_directory_walk(%s)', (directory,))
yield from cursor_to_bytes(cur)
def revision_missing_from_temp(self, cur=None):
cur = self._cursor(cur)
cur.execute('SELECT id FROM swh_revision_missing() as r(id)')
yield from cursor_to_bytes(cur)
revision_add_cols = [
'id', 'date', 'date_offset', 'date_neg_utc_offset', 'committer_date',
'committer_date_offset', 'committer_date_neg_utc_offset', 'type',
'directory', 'message', 'author_fullname', 'author_name',
'author_email', 'committer_fullname', 'committer_name',
'committer_email', 'metadata', 'synthetic',
]
revision_get_cols = revision_add_cols + [
'author_id', 'committer_id', 'parents']
def revision_get_from_temp(self, cur=None):
cur = self._cursor(cur)
query = 'SELECT %s FROM swh_revision_get()' % (
', '.join(self.revision_get_cols))
cur.execute(query)
yield from cursor_to_bytes(cur)
def revision_log(self, root_revisions, limit=None, cur=None):
cur = self._cursor(cur)
query = """SELECT %s
FROM swh_revision_log(%%s, %%s)
""" % ', '.join(self.revision_get_cols)
cur.execute(query, (root_revisions, limit))
yield from cursor_to_bytes(cur)
revision_shortlog_cols = ['id', 'parents']
def revision_shortlog(self, root_revisions, limit=None, cur=None):
cur = self._cursor(cur)
query = """SELECT %s
FROM swh_revision_list(%%s, %%s)
""" % ', '.join(self.revision_shortlog_cols)
cur.execute(query, (root_revisions, limit))
yield from cursor_to_bytes(cur)
def release_missing_from_temp(self, cur=None):
cur = self._cursor(cur)
cur.execute('SELECT id FROM swh_release_missing() as r(id)')
yield from cursor_to_bytes(cur)
object_find_by_sha1_git_cols = ['sha1_git', 'type', 'id', 'object_id']
def object_find_by_sha1_git(self, ids, cur=None):
cur = self._cursor(cur)
self.store_tmp_bytea(ids, cur)
query = 'select %s from swh_object_find_by_sha1_git()' % (
', '.join(self.object_find_by_sha1_git_cols)
)
cur.execute(query)
yield from cursor_to_bytes(cur)
def stat_counters(self, cur=None):
cur = self._cursor(cur)
cur.execute('SELECT * FROM swh_stat_counters()')
yield from cur
fetch_history_cols = ['origin', 'date', 'status', 'result', 'stdout',
'stderr', 'duration']
def create_fetch_history(self, fetch_history, cur=None):
"""Create a fetch_history entry with the data in fetch_history"""
cur = self._cursor(cur)
query = '''INSERT INTO fetch_history (%s)
VALUES (%s) RETURNING id''' % (
','.join(self.fetch_history_cols),
','.join(['%s'] * len(self.fetch_history_cols))
)
cur.execute(query, [fetch_history.get(col) for col in
self.fetch_history_cols])
return cur.fetchone()[0]
def get_fetch_history(self, fetch_history_id, cur=None):
"""Get a fetch_history entry with the given id"""
cur = self._cursor(cur)
query = '''SELECT %s FROM fetch_history WHERE id=%%s''' % (
', '.join(self.fetch_history_cols),
)
cur.execute(query, (fetch_history_id,))
data = cur.fetchone()
if not data:
return None
ret = {'id': fetch_history_id}
for i, col in enumerate(self.fetch_history_cols):
ret[col] = data[i]
return ret
def update_fetch_history(self, fetch_history, cur=None):
"""Update the fetch_history entry from the data in fetch_history"""
cur = self._cursor(cur)
query = '''UPDATE fetch_history
SET %s
WHERE id=%%s''' % (
','.join('%s=%%s' % col for col in self.fetch_history_cols)
)
cur.execute(query, [jsonize(fetch_history.get(col)) for col in
self.fetch_history_cols + ['id']])
base_entity_cols = ['uuid', 'parent', 'name', 'type',
'description', 'homepage', 'active',
'generated', 'lister_metadata',
'metadata']
entity_cols = base_entity_cols + ['last_seen', 'last_id']
entity_history_cols = base_entity_cols + ['id', 'validity']
def origin_add(self, type, url, cur=None):
"""Insert a new origin and return the new identifier."""
insert = """INSERT INTO origin (type, url) values (%s, %s)
RETURNING id"""
cur.execute(insert, (type, url))
return cur.fetchone()[0]
def origin_get_with(self, type, url, cur=None):
"""Retrieve the origin id from its type and url if found."""
cur = self._cursor(cur)
query = """SELECT id, type, url, lister, project
FROM origin
WHERE type=%s AND url=%s"""
cur.execute(query, (type, url))
data = cur.fetchone()
if data:
return line_to_bytes(data)
return None
def origin_get(self, id, cur=None):
"""Retrieve the origin per its identifier.
"""
cur = self._cursor(cur)
query = "SELECT id, type, url, lister, project FROM origin WHERE id=%s"
cur.execute(query, (id,))
data = cur.fetchone()
if data:
return line_to_bytes(data)
return None
person_cols = ['fullname', 'name', 'email']
person_get_cols = person_cols + ['id']
def person_add(self, person, cur=None):
"""Add a person identified by its name and email.
Returns:
The new person's id
"""
cur = self._cursor(cur)
query_new_person = '''\
INSERT INTO person(%s)
VALUES (%s)
RETURNING id''' % (
', '.join(self.person_cols),
', '.join('%s' for i in range(len(self.person_cols)))
)
cur.execute(query_new_person,
[person[col] for col in self.person_cols])
return cur.fetchone()[0]
def person_get(self, ids, cur=None):
"""Retrieve the persons identified by the list of ids.
"""
cur = self._cursor(cur)
query = """SELECT %s
FROM person
WHERE id IN %%s""" % ', '.join(self.person_get_cols)
cur.execute(query, (tuple(ids),))
yield from cursor_to_bytes(cur)
release_add_cols = [
'id', 'target', 'target_type', 'date', 'date_offset',
'date_neg_utc_offset', 'name', 'comment', 'synthetic',
'author_fullname', 'author_name', 'author_email',
]
release_get_cols = release_add_cols + ['author_id']
def release_get_from_temp(self, cur=None):
cur = self._cursor(cur)
query = '''
SELECT %s
FROM swh_release_get()
''' % ', '.join(self.release_get_cols)
cur.execute(query)
yield from cursor_to_bytes(cur)
def release_get_by(self,
origin_id,
limit=None,
cur=None):
"""Retrieve a release by occurrence criterion (only origin right now)
Args:
- origin_id: The origin to look for.
"""
cur = self._cursor(cur)
query = """
SELECT %s
FROM swh_release_get_by(%%s)
LIMIT %%s
""" % ', '.join(self.release_get_cols)
cur.execute(query, (origin_id, limit))
yield from cursor_to_bytes(cur)
def revision_get_by(self,
origin_id,
branch_name,
datetime,
limit=None,
cur=None):
"""Retrieve a revision by occurrence criterion.
Args:
- origin_id: The origin to look for
- branch_name: the branch name to look for
- datetime: the lower bound of timerange to look for.
- limit: limit number of results to return
The upper bound being now.
"""
cur = self._cursor(cur)
if branch_name and isinstance(branch_name, str):
branch_name = branch_name.encode('utf-8')
query = '''
SELECT %s
FROM swh_revision_get_by(%%s, %%s, %%s)
LIMIT %%s
''' % ', '.join(self.revision_get_cols)
cur.execute(query, (origin_id, branch_name, datetime, limit))
yield from cursor_to_bytes(cur)
def directory_entry_get_by_path(self, directory, paths, cur=None):
"""Retrieve a directory entry by path.
"""
cur = self._cursor(cur)
cur.execute("""SELECT dir_id, type, target, name, perms, status, sha1,
sha1_git, sha256
FROM swh_find_directory_entry_by_path(%s, %s)""",
(directory, paths))
data = cur.fetchone()
if set(data) == {None}:
return None
return line_to_bytes(data)
def entity_get(self, uuid, cur=None):
"""Retrieve the entity and its parent hierarchy chain per uuid.
"""
cur = self._cursor(cur)
cur.execute("""SELECT %s
FROM swh_entity_get(%%s)""" % (
', '.join(self.entity_cols)),
(uuid, ))
yield from cursor_to_bytes(cur)
def entity_get_one(self, uuid, cur=None):
"""Retrieve a single entity given its uuid.
"""
cur = self._cursor(cur)
cur.execute("""SELECT %s
FROM entity
WHERE uuid = %%s""" % (
', '.join(self.entity_cols)),
(uuid, ))
data = cur.fetchone()
if not data:
return None
return line_to_bytes(data)
+
+ def archive_ls(self, cur=None):
+ """ Get all the archives registered on the server.
+
+ Yields:
+ a tuple (server_id, server_url) for each archive server.
+ """
+ cur = self._cursor(cur)
+ cur.execute("""SELECT id, url
+ FROM archives
+ """)
+ yield from cursor_to_bytes(cur)
+
+ def content_archive_ls(self, cur=None):
+ """ Get the archival status of the content
+
+ Get an iterable over all the content that is referenced
+ in a backup server.
+
+ Yields:
+ the sha1 of each content referenced at least one time
+ in the database of archiveal status.
+ """
+ cur = self._cursor(cur)
+ cur.execute("""SELECT DISTINCT content_id
+ FROM content_archive""")
+ yield from cursor_to_bytes(cur)
+
+ def content_archive_get(self, content=None, archive=None, cur=None):
+ """ Get the archival status of a content in a specific server.
+
+ Retrieve from the database the archival status of the given content
+ in the given archive server.
+
+ Args:
+ content: the sha1 of the content. May be None for any id.
+ archive: the database id of the server we're looking into;
+ may be None for any server.
+
+ Yields:
+ A tuple (content_id, archive_id, status, mtime) for each matching row.
+ """
+ query = """SELECT content_id, archive_id, status, mtime
+ FROM content_archive
+ """
+ conditions = []
+ if content:
+ conditions.append("content_id='%s'" % content)
+ if archive:
+ conditions.append("archive_id='%s'" % archive)
+
+ if conditions:
+ query = """%s
+ WHERE %s
+ """ % (query, ' and '.join(conditions))
+
+ cur = self._cursor(cur)
+ cur.execute(query)
+ yield from cursor_to_bytes(cur)
+
+ def content_archive_update(self, content_id, archive_id,
+ new_status=None, cur=None):
+ """ Update the status of a archive content and set it's mtime to now()
+
+ Change the last modification time of an archived content and change
+ its status to the given one.
+
+ Args:
+ content_id (string): The content id.
+ archive_id (string): The id of the concerned archive.
+ new_status (string): One of missing, ongoing or present, this
+ status will replace the previous one. If not given, the
+ function only changes the mtime of the content.
+ """
+ query = """UPDATE content_archive
+ SET %(fields)s
+ WHERE content_id='%(content_id)s'
+ and archive_id='%(archive_id)s'
+ """
+ fields = []
+ if new_status:
+ fields.append("status='%s'" % new_status)
+ fields.append("mtime=now()")
+
+ d = {'fields': ', '.join(fields),
+ 'content_id': content_id,
+ 'archive_id': archive_id}
+
+ cur = self._cursor(cur)
+ cur.execute(query % d)
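
A minimal sketch of how these new helpers can be exercised from a Db proxy; the connection string, archive id and content id below are placeholders:

    from swh.storage.db import Db

    db = Db.connect('dbname=softwareheritage-dev')   # placeholder dsn

    for server_id, server_url in db.archive_ls():
        print(server_id, server_url)

    content_id = r'\x34973274ccef6ab4dfaaf86599792fa9c3fe4689'
    for content, archive, status, mtime in db.content_archive_get(
            content=content_id, archive='Local'):
        print(content, archive, status, mtime)

    db.content_archive_update(content_id, 'Local', new_status='ongoing')
    db.conn.commit()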
diff --git a/swh/storage/objstorage/__init__.py b/swh/storage/objstorage/__init__.py
new file mode 100644
index 00000000..6ce4572f
--- /dev/null
+++ b/swh/storage/objstorage/__init__.py
@@ -0,0 +1 @@
+from .objstorage import ObjStorage, DIR_MODE, FILE_MODE # NOQA
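
Since objstorage.py moves under a package in this diff, the re-export above keeps the former import path working; a quick sanity sketch:

    from swh.storage.objstorage import ObjStorage, DIR_MODE, FILE_MODE
    from swh.storage.objstorage.objstorage import ObjStorage as _ObjStorage

    assert ObjStorage is _ObjStorage           # same class, re-exported
    assert DIR_MODE == 0o755 and FILE_MODE == 0o644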
diff --git a/swh/storage/objstorage/api/__init__.py b/swh/storage/objstorage/api/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/swh/storage/objstorage/api/client.py b/swh/storage/objstorage/api/client.py
new file mode 100644
index 00000000..8f030865
--- /dev/null
+++ b/swh/storage/objstorage/api/client.py
@@ -0,0 +1,92 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+import pickle
+
+import requests
+
+from requests.exceptions import ConnectionError
+from ...exc import StorageAPIError
+from ...api.common import (decode_response,
+ encode_data_client as encode_data)
+
+
+class RemoteObjStorage():
+ """ Proxy to a remote object storage.
+
+ This class allows connecting to an object storage server over
+ the HTTP protocol.
+
+ Attributes:
+ base_url (string): The url of the server to connect to. Must end
+ with a '/'.
+ session: The session to send requests.
+ """
+ def __init__(self, base_url):
+ self.base_url = base_url
+ self.session = requests.Session()
+
+ def url(self, endpoint):
+ return '%s%s' % (self.base_url, endpoint)
+
+ def post(self, endpoint, data):
+ try:
+ response = self.session.post(
+ self.url(endpoint),
+ data=encode_data(data),
+ headers={'content-type': 'application/x-msgpack'},
+ )
+ except ConnectionError as e:
+ print(str(e))
+ raise StorageAPIError(e)
+
+ # XXX: this breaks language-independence and should be
+ # replaced by proper unserialization
+ if response.status_code == 400:
+ raise pickle.loads(decode_response(response))
+
+ return decode_response(response)
+
+ def content_add(self, bytes, obj_id=None):
+ """ Add a new object to the object storage.
+
+ Args:
+ bytes: content of the object to be added to the storage.
+ obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When
+ given, obj_id will be trusted to match bytes. If missing,
+ obj_id will be computed on the fly.
+
+ """
+ return self.post('content/add', {'bytes': bytes, 'obj_id': obj_id})
+
+ def content_get(self, obj_id):
+ """ Retrieve the content of a given object.
+
+ Args:
+ obj_id: The id of the object.
+
+ Returns:
+ The content of the requested objects as bytes.
+
+ Raises:
+ ObjNotFoundError: if the requested object is missing
+ """
+ return self.post('content/get', {'obj_id': obj_id})
+
+ def content_check(self, obj_id):
+ """ Integrity check for a given object
+
+ verify that the file object is in place, and that the gzipped content
+ matches the object id
+
+ Args:
+ obj_id: The id of the object.
+
+ Raises:
+ ObjNotFoundError: if the requested object is missing
+ Error: if the requested object is corrupt
+ """
+ self.post('content/check', {'obj_id': obj_id})
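
A minimal usage sketch, assuming an object storage API server (added below in this diff) is listening on the URL given here:

    from swh.storage.objstorage.api.client import RemoteObjStorage

    objstorage = RemoteObjStorage('http://localhost:5000/')  # note trailing '/'
    obj_id = objstorage.content_add(b'some content')  # id computed server-side
    assert objstorage.content_get(obj_id) == b'some content'
    objstorage.content_check(obj_id)   # raises if the object is missing/corrupt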
diff --git a/swh/storage/objstorage/api/server.py b/swh/storage/objstorage/api/server.py
new file mode 100644
index 00000000..030a721d
--- /dev/null
+++ b/swh/storage/objstorage/api/server.py
@@ -0,0 +1,74 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import click
+
+from flask import Flask, g, request
+
+from swh.core import config
+from swh.storage.objstorage import ObjStorage
+from swh.storage.api.common import (BytesRequest, decode_request,
+ error_handler,
+ encode_data_server as encode_data)
+
+DEFAULT_CONFIG = {
+ 'storage_base': ('str', '/tmp/swh-storage/objects/'),
+ 'storage_depth': ('int', 3)
+}
+
+app = Flask(__name__)
+app.request_class = BytesRequest
+
+
+@app.errorhandler(Exception)
+def my_error_handler(exception):
+ return error_handler(exception, encode_data)
+
+
+@app.before_request
+def before_request():
+ g.objstorage = ObjStorage(app.config['storage_base'],
+ app.config['storage_depth'])
+
+
+@app.route('/')
+def index():
+ return "Helloworld!"
+
+
+@app.route('/content')
+def content():
+ return str(list(g.objstorage))
+
+
+@app.route('/content/add', methods=['POST'])
+def add_bytes():
+ return encode_data(g.objstorage.add_bytes(**decode_request(request)))
+
+
+@app.route('/content/get', methods=['POST'])
+def get_bytes():
+ return encode_data(g.objstorage.get_bytes(**decode_request(request)))
+
+
+@app.route('/content/check', methods=['POST'])
+def check():
+ return encode_data(g.objstorage.check(**decode_request(request)))
+
+
+@click.command()
+@click.argument('config-path', required=1)
+@click.option('--host', default='0.0.0.0', help="Host to run the server")
+@click.option('--port', default=5000, type=click.INT,
+ help="Binding port of the server")
+@click.option('--debug/--nodebug', default=True,
+ help="Indicates if the server should run in debug mode")
+def launch(config_path, host, port, debug):
+ app.config.update(config.read(config_path, DEFAULT_CONFIG))
+ app.run(host, port=int(port), debug=bool(debug))
+
+
+if __name__ == '__main__':
+ launch()
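
For completeness, a sketch of running this API without the click entry point; the storage path is a placeholder and must already exist with the layout ObjStorage expects:

    from swh.storage.objstorage.api.server import app

    app.config.update({
        'storage_base': '/tmp/swh-storage/objects/',   # placeholder, must exist
        'storage_depth': 3,
    })
    app.run('127.0.0.1', port=5012, debug=True)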
diff --git a/swh/storage/objstorage.py b/swh/storage/objstorage/objstorage.py
similarity index 99%
rename from swh/storage/objstorage.py
rename to swh/storage/objstorage/objstorage.py
index 66eba6f0..1f78de0c 100644
--- a/swh/storage/objstorage.py
+++ b/swh/storage/objstorage/objstorage.py
@@ -1,386 +1,386 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import gzip
import os
import shutil
import tempfile
from contextlib import contextmanager
-from .exc import ObjNotFoundError, Error
+from ..exc import ObjNotFoundError, Error
from swh.core import hashutil
ID_HASH_ALGO = 'sha1'
# ID_HASH_ALGO = 'sha1_git'
GZIP_BUFSIZ = 1048576
DIR_MODE = 0o755
FILE_MODE = 0o644
def _obj_dir(hex_obj_id, root_dir, depth):
"""compute the storage directory of an object
Args:
hex_obj_id: object id as hexlified string
root_dir: object storage root directory
depth: slicing depth of object IDs in the storage
see also: `_obj_path`
"""
if len(hex_obj_id) < depth * 2:
raise ValueError('object id "%s" is too short for slicing at depth %d'
% (hex_obj_id, depth))
# compute [depth] substrings of [obj_id], each of length 2, starting from
# the beginning
id_steps = [hex_obj_id[i*2:i*2+2] for i in range(0, depth)]
steps = [root_dir] + id_steps
return os.path.join(*steps)
def _obj_path(hex_obj_id, root_dir, depth):
"""similar to `obj_dir`, but also include the actual object file name in the
returned path
"""
return os.path.join(_obj_dir(hex_obj_id, root_dir, depth), hex_obj_id)
@contextmanager
def _write_obj_file(hex_obj_id, root_dir, depth):
"""context manager for writing object files to the object storage
During writing data are written to a temporary file, which is atomically
renamed to the right file name after closing. This context manager also
takes care of (gzip) compressing the data on the fly.
Yields:
a file-like object open for writing bytes
Sample usage:
with _write_obj_file(hex_obj_id, root_dir, depth) as f:
f.write(obj_data)
"""
dir = _obj_dir(hex_obj_id, root_dir, depth)
if not os.path.isdir(dir):
os.makedirs(dir, DIR_MODE, exist_ok=True)
path = os.path.join(dir, hex_obj_id)
(tmp, tmp_path) = tempfile.mkstemp(suffix='.tmp', prefix='hex_obj_id.',
dir=dir)
tmp_f = os.fdopen(tmp, 'wb')
with gzip.GzipFile(filename=tmp_path, fileobj=tmp_f) as f:
yield f
tmp_f.close()
os.chmod(tmp_path, FILE_MODE)
os.rename(tmp_path, path)
class ObjStorage:
"""high-level API to manipulate the Software Heritage object storage
Conceptually, the object storage offers 4 methods:
- add() add a new object, returning an object id
- __contains__() check if an object is present, by object id
- get() retrieve the content of an object, by object id
- check() check the integrity of an object, by object id
Variants of the above methods are implemented by this class, depending on
how the content of an object is specified (bytes, file-like object, etc.).
On disk, an object storage is a directory tree containing files named after
their object IDs. An object ID is a checksum of its content, depending on
the value of the ID_HASH_ALGO constant (see hashutil for its meaning).
To avoid directories that contain too many files, the object storage has a
given depth (default: 3). Each depth level consumes two characters of the
object id. So for instance a file with (git) SHA1 of
34973274ccef6ab4dfaaf86599792fa9c3fe4689 will be stored in an object
storage configured at depth 3 at
34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689.
The actual files in the storage are stored in gzipped compressed format.
Each file can hence be self-verified (on the shell) with something like:
actual_id=34973274ccef6ab4dfaaf86599792fa9c3fe4689
expected_id=$(zcat $filename | sha1sum | cut -f 1 -d' ')
if [ $actual_id != $expected_id ] ; then
echo "AYEEE, invalid object $actual_id /o\"
fi
"""
def __init__(self, root, depth=3):
"""create a proxy object to the object storage
Args:
root: object storage root directory
depth: slicing depth of object IDs in the storage
"""
if not os.path.isdir(root):
raise ValueError('obj storage root "%s" is not a directory'
% root)
self._root_dir = root
self._depth = depth
self._temp_dir = os.path.join(root, 'tmp')
if not os.path.isdir(self._temp_dir):
os.makedirs(self._temp_dir, DIR_MODE, exist_ok=True)
def __obj_dir(self, hex_obj_id):
"""_obj_dir wrapper using this storage configuration"""
return _obj_dir(hex_obj_id, self._root_dir, self._depth)
def __obj_path(self, hex_obj_id):
"""_obj_path wrapper using this storage configuration"""
return _obj_path(hex_obj_id, self._root_dir, self._depth)
def __contains__(self, obj_id):
"""check whether a given object id is present in the storage or not
Return:
True iff the object id is present in the storage
"""
hex_obj_id = hashutil.hash_to_hex(obj_id)
return os.path.exists(_obj_path(hex_obj_id, self._root_dir,
self._depth))
def add_bytes(self, bytes, obj_id=None):
"""add a new object to the object storage
Args:
bytes: content of the object to be added to the storage
obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When
given, obj_id will be trusted to match bytes. If missing,
obj_id will be computed on the fly.
"""
if obj_id is None:
# missing checksum, compute it in memory and write to file
h = hashutil._new_hash(ID_HASH_ALGO, len(bytes))
h.update(bytes)
obj_id = h.digest()
if obj_id in self:
return obj_id
hex_obj_id = hashutil.hash_to_hex(obj_id)
# object is either absent, or present but overwrite is requested
with _write_obj_file(hex_obj_id,
root_dir=self._root_dir,
depth=self._depth) as f:
f.write(bytes)
return obj_id
def add_file(self, f, length, obj_id=None):
"""similar to `add_bytes`, but add the content of file-like object f to the
object storage
add_file will read the file content only once, and avoid storing all of
it in memory
"""
if obj_id is None:
# unknown object id: work on temp file, compute checksum as we go,
# mv temp file into place
(tmp, tmp_path) = tempfile.mkstemp(dir=self._temp_dir)
try:
t = os.fdopen(tmp, 'wb')
tz = gzip.GzipFile(fileobj=t)
sums = hashutil._hash_file_obj(f, length,
algorithms=[ID_HASH_ALGO],
chunk_cb=lambda b: tz.write(b))
tz.close()
t.close()
obj_id = sums[ID_HASH_ALGO]
if obj_id in self:
return obj_id
hex_obj_id = hashutil.hash_to_hex(obj_id)
dir = self.__obj_dir(hex_obj_id)
if not os.path.isdir(dir):
os.makedirs(dir, DIR_MODE, exist_ok=True)
path = os.path.join(dir, hex_obj_id)
os.chmod(tmp_path, FILE_MODE)
os.rename(tmp_path, path)
finally:
if os.path.exists(tmp_path):
os.unlink(tmp_path)
else:
# known object id: write to .new file, rename
if obj_id in self:
return obj_id
hex_obj_id = hashutil.hash_to_hex(obj_id)
with _write_obj_file(hex_obj_id,
root_dir=self._root_dir,
depth=self._depth) as obj:
shutil.copyfileobj(f, obj)
return obj_id
@contextmanager
def get_file_obj(self, obj_id):
"""context manager to read the content of an object
Args:
obj_id: object id
Yields:
a file-like object open for reading (bytes)
Raises:
ObjNotFoundError: if the requested object is missing
Sample usage:
with objstorage.get_file_obj(obj_id) as f:
do_something(f.read())
"""
if obj_id not in self:
raise ObjNotFoundError(obj_id)
hex_obj_id = hashutil.hash_to_hex(obj_id)
path = self.__obj_path(hex_obj_id)
with gzip.GzipFile(path, 'rb') as f:
yield f
def get_bytes(self, obj_id):
"""retrieve the content of a given object
Args:
obj_id: object id
Returns:
the content of the requested objects as bytes
Raises:
ObjNotFoundError: if the requested object is missing
"""
with self.get_file_obj(obj_id) as f:
return f.read()
def _get_file_path(self, obj_id):
"""retrieve the path of a given object in the objects storage
Note that the path point to a gzip-compressed file, so you need
gzip.open() or equivalent to get the actual object content.
Args:
obj_id: object id
Returns:
a file path pointing into the object storage
Raises:
ObjNotFoundError: if the requested object is missing
"""
if obj_id not in self:
raise ObjNotFoundError(obj_id)
hex_obj_id = hashutil.hash_to_hex(obj_id)
return self.__obj_path(hex_obj_id)
def check(self, obj_id):
"""integrity check for a given object
verify that the file object is in place, and that the gzipped content
matches the object id
Args:
obj_id: object id
Raises:
ObjNotFoundError: if the requested object is missing
Error: if the requested object is corrupt
"""
if obj_id not in self:
raise ObjNotFoundError(obj_id)
hex_obj_id = hashutil.hash_to_hex(obj_id)
try:
with gzip.open(self.__obj_path(hex_obj_id)) as f:
length = None
if ID_HASH_ALGO.endswith('_git'):
# if the hashing algorithm is git-like, we need to know the
# content size to hash on the fly. Do a first pass here to
# compute the size
length = 0
while True:
chunk = f.read(GZIP_BUFSIZ)
length += len(chunk)
if not chunk:
break
f.rewind()
checksums = hashutil._hash_file_obj(f, length,
algorithms=[ID_HASH_ALGO])
actual_obj_id = checksums[ID_HASH_ALGO]
if obj_id != actual_obj_id:
raise Error('corrupt object %s should have id %s' %
(obj_id, actual_obj_id))
except (OSError, IOError):
# IOError is for compatibility with older python versions
raise Error('corrupt object %s is not a gzip file' % obj_id)
def __iter__(self):
"""iterate over the object identifiers currently available in the storage
Warning: with the current implementation of the object storage, this
method will walk the filesystem to list objects, meaning that listing
all objects will be very slow for large storages. You almost certainly
don't want to use this method in production.
Return:
iterator over object IDs
"""
def obj_iterator():
# XXX hackish: it does not verify that the depth of found files
# matches the slicing depth of the storage
for root, _dirs, files in os.walk(self._root_dir):
for f in files:
yield bytes.fromhex(f)
return obj_iterator()
def __len__(self):
"""compute the number of objects available in the storage
Warning: this currently uses `__iter__`, its warning about bad
performances applies
Return:
number of objects contained in the storage
"""
return sum(1 for i in self)
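
A minimal local usage sketch of the class above; the root directory is created on the fly here just for illustration:

    import tempfile

    from swh.storage.objstorage import ObjStorage

    root = tempfile.mkdtemp()
    storage = ObjStorage(root, depth=3)

    obj_id = storage.add_bytes(b'hello object storage')
    assert obj_id in storage
    assert storage.get_bytes(obj_id) == b'hello object storage'
    storage.check(obj_id)   # raises Error if the gzipped content is corrupt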
diff --git a/swh/storage/tests/manual_test_archiver.py b/swh/storage/tests/manual_test_archiver.py
new file mode 100644
index 00000000..26d4f4cc
--- /dev/null
+++ b/swh/storage/tests/manual_test_archiver.py
@@ -0,0 +1,95 @@
+import string
+import random
+
+from swh.core import hashutil
+from swh.storage import Storage
+from swh.storage.db import cursor_to_bytes
+
+from swh.storage.archiver import ArchiverDirector
+
+
+def rs(size=6, chars=string.ascii_uppercase + string.ascii_lowercase):
+ return ''.join(random.choice(chars) for _ in range(size))
+
+
+def mc(data):
+ data = bytes(data, 'utf8')
+ content = hashutil.hashdata(data)
+ content.update({'data': data})
+ return content
+
+
+def initialize_content_archive(db, sample_size, names=['Local']):
+ """ Initialize the content_archive table with a sample.
+
+ From the content table, get a sample of ids, and fill the
+ content_archive table with those ids in order to create a test sample
+ for the archiver.
+
+ Args:
+ db: The database of the storage.
+ sample_size (int): The size of the sample to create.
+ names: A list of archive names. Those archives must already exist.
+ The archival status of the archives' content will be erased in the db.
+
+ Returns:
+ The number of entries created.
+ """
+ with db.transaction() as cur:
+ cur.execute('DELETE FROM content_archive')
+
+ with db.transaction() as cur:
+ cur.execute('SELECT sha1 from content limit %d' % sample_size)
+ ids = list(cursor_to_bytes(cur))
+
+ for id, in ids:
+ tid = r'\x' + hashutil.hash_to_hex(id)
+
+ with db.transaction() as cur:
+ for name in names:
+ s = """INSERT INTO content_archive
+ VALUES('%s'::sha1, '%s', 'missing', now())
+ """ % (tid, name)
+ cur.execute(s)
+
+ print('Initialized database with', sample_size * len(names), 'items')
+ return sample_size * len(names)
+
+
+def clean():
+ # Clean all
+ with loc.db.transaction() as cur:
+ cur.execute('delete from content_archive')
+ cur.execute('delete from content')
+ import os
+ os.system("rm -r /tmp/swh/storage-dev/2/*")
+
+
+CONTENT_SIZE = 10
+
+if __name__ == '__main__':
+ random.seed(0)
+ # Local database
+ dbname = 'softwareheritage-dev'
+ user = 'qcampos'
+ cstring = 'dbname=%s user=%s' % (dbname, user)
+ # Archiver config
+ config = {
+ 'objstorage_path': '/tmp/swh/storage-dev/2',
+ 'archival_max_age': 3600,
+ 'batch_max_size': 10,
+ 'retention_policy': 1,
+ 'asynchronous': False
+ }
+
+ # Grand-palais's storage
+ loc = Storage(cstring, config['objstorage_path'])
+
+ # Add the content
+ l = [mc(rs(100)) for _ in range(CONTENT_SIZE)]
+ loc.content_add(l)
+ initialize_content_archive(loc.db, CONTENT_SIZE, ['petit-palais'])
+
+ # Launch the archiver
+ archiver = ArchiverDirector(cstring, config)
+ archiver.run()
diff --git a/swh/storage/tests/test_api_client.py b/swh/storage/tests/server_testing.py
similarity index 59%
copy from swh/storage/tests/test_api_client.py
copy to swh/storage/tests/server_testing.py
index 2af6b875..61277d30 100644
--- a/swh/storage/tests/test_api_client.py
+++ b/swh/storage/tests/server_testing.py
@@ -1,77 +1,80 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import multiprocessing
import socket
import time
-import unittest
+
from urllib.request import urlopen
-from swh.storage.tests.test_storage import AbstractTestStorage
-from swh.storage.api.client import RemoteStorage
-from swh.storage.api.server import app
+class ServerTestFixture():
+ """ Base class for http client/server testing.
+
+ Mix this in a test class in order to have access to an http flask
+ server running in the background.
+
+ Note that the subclass should define a dictionary in self.config
+ that contains the flask server config, and a flask application in
+ self.app that corresponds to the type of server the tested client
+ needs.
-class TestRemoteStorage(AbstractTestStorage, unittest.TestCase):
- """Test the remote storage API.
+ To ensure test isolation, each test will run in a different server
+ and a different directory.
- This class doesn't define any tests as we want identical
- functionality between local and remote storage. All the tests are
- therefore defined in AbstractTestStorage.
+ In order to work correctly, the subclass must call the parent class's
+ setUp() and tearDown() methods.
"""
def setUp(self):
super().setUp()
-
self.start_server()
- self.storage = RemoteStorage(self.url())
def tearDown(self):
self.stop_server()
-
super().tearDown()
def url(self):
return 'http://127.0.0.1:%d/' % self.port
def start_server(self):
- """Spawn the API server using multiprocessing"""
+ """ Spawn the API server using multiprocessing.
+ """
self.process = None
# WSGI app configuration
- self.app = app
- self.app.config['db'] = 'dbname=%s' % self.dbname
- self.app.config['storage_base'] = self.objroot
-
+ for key, value in self.config.items():
+ self.app.config[key] = value
# Get an available port number
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('127.0.0.1', 0))
self.port = sock.getsockname()[1]
sock.close()
- # We need a worker function for multiprocessing
+ # Worker function for multiprocessing
def worker(app, port):
return app.run(port=port, use_reloader=False)
self.process = multiprocessing.Process(
target=worker, args=(self.app, self.port)
)
self.process.start()
- # Wait max. 5 seconds for server to spawn
+ # Wait max 5 seconds for server to spawn
i = 0
while i < 20:
try:
urlopen(self.url())
except Exception:
i += 1
time.sleep(0.25)
else:
- break
+ return
def stop_server(self):
- """Terminate the API server"""
+ """ Terminate the API server's process.
+ """
if self.process:
self.process.terminate()
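
A sketch of how this fixture is meant to be mixed into a test case; the database name is a placeholder, and the config keys are the ones the storage API server reads in this diff:

    import tempfile
    import unittest

    from swh.storage.api.server import app
    from swh.storage.tests.server_testing import ServerTestFixture


    class ApiServerSmokeTest(ServerTestFixture, unittest.TestCase):
        def setUp(self):
            self.config = {'db': 'dbname=softwareheritage-test',  # placeholder
                           'storage_base': tempfile.mkdtemp()}
            self.app = app
            super().setUp()        # spawns the flask server in a subprocess

        def tearDown(self):
            super().tearDown()     # terminates the server process

        def test_server_is_up(self):
            # the running server is reachable at self.url()
            self.assertTrue(self.url().startswith('http://127.0.0.1:'))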
diff --git a/swh/storage/tests/test_api_client.py b/swh/storage/tests/test_api_client.py
index 2af6b875..10d2125d 100644
--- a/swh/storage/tests/test_api_client.py
+++ b/swh/storage/tests/test_api_client.py
@@ -1,77 +1,36 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import multiprocessing
-import socket
-import time
import unittest
-from urllib.request import urlopen
+import tempfile
from swh.storage.tests.test_storage import AbstractTestStorage
+from swh.storage.tests.server_testing import ServerTestFixture
from swh.storage.api.client import RemoteStorage
from swh.storage.api.server import app
-class TestRemoteStorage(AbstractTestStorage, unittest.TestCase):
+class TestRemoteStorage(AbstractTestStorage, ServerTestFixture,
+ unittest.TestCase):
"""Test the remote storage API.
This class doesn't define any tests as we want identical
functionality between local and remote storage. All the tests are
therefore defined in AbstractTestStorage.
"""
def setUp(self):
+ # ServerTestFixture needs self.objroot for its
+ # setUp() method, but this field is defined in
+ # AbstractTestStorage's setUp().
+ # To avoid confusion, override self.objroot with
+ # one chosen in this class.
+ storage_base = tempfile.mkdtemp()
+ self.config = {'db': 'dbname=%s' % self.dbname,
+ 'storage_base': storage_base}
+ self.app = app
super().setUp()
-
- self.start_server()
self.storage = RemoteStorage(self.url())
-
- def tearDown(self):
- self.stop_server()
-
- super().tearDown()
-
- def url(self):
- return 'http://127.0.0.1:%d/' % self.port
-
- def start_server(self):
- """Spawn the API server using multiprocessing"""
- self.process = None
-
- # WSGI app configuration
- self.app = app
- self.app.config['db'] = 'dbname=%s' % self.dbname
- self.app.config['storage_base'] = self.objroot
-
- # Get an available port number
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.bind(('127.0.0.1', 0))
- self.port = sock.getsockname()[1]
- sock.close()
-
- # We need a worker function for multiprocessing
- def worker(app, port):
- return app.run(port=port, use_reloader=False)
-
- self.process = multiprocessing.Process(
- target=worker, args=(self.app, self.port)
- )
- self.process.start()
-
- # Wait max. 5 seconds for server to spawn
- i = 0
- while i < 20:
- try:
- urlopen(self.url())
- except Exception:
- i += 1
- time.sleep(0.25)
- else:
- break
-
- def stop_server(self):
- """Terminate the API server"""
- if self.process:
- self.process.terminate()
+ self.objroot = storage_base
diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py
new file mode 100644
index 00000000..cf05270d
--- /dev/null
+++ b/swh/storage/tests/test_archiver.py
@@ -0,0 +1,246 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import tempfile
+import unittest
+import os
+
+from nose.tools import istest
+from nose.plugins.attrib import attr
+from datetime import datetime, timedelta
+
+from swh.core import hashutil
+from swh.core.tests.db_testing import DbTestFixture
+from server_testing import ServerTestFixture
+
+from swh.storage import Storage
+from swh.storage.exc import ObjNotFoundError
+from swh.storage.archiver import ArchiverDirector, ArchiverWorker
+from swh.storage.objstorage.api.client import RemoteObjStorage
+from swh.storage.objstorage.api.server import app
+
+TEST_DIR = os.path.dirname(os.path.abspath(__file__))
+TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata')
+
+
+@attr('db')
+class TestArchiver(DbTestFixture, ServerTestFixture,
+ unittest.TestCase):
+ """ Test the objstorage archiver.
+ """
+
+ TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh.dump')
+
+ def setUp(self):
+ # Launch the backup server
+ self.backup_objroot = tempfile.mkdtemp(prefix='remote')
+ self.config = {'storage_base': self.backup_objroot,
+ 'storage_depth': 3}
+ self.app = app
+ super().setUp()
+
+ # Launch a client to check objects presence
+ print("url", self.url())
+ self.remote_objstorage = RemoteObjStorage(self.url())
+ # Create the local storage.
+ self.objroot = tempfile.mkdtemp(prefix='local')
+ self.storage = Storage(self.conn, self.objroot)
+ # Initialize and fill the tables.
+ self.initialize_tables()
+ # Create the archiver
+ self.archiver = self.__create_director()
+
+ self.storage_data = ('Local', 'http://localhost:%s/' % self.port)
+
+ def tearDown(self):
+ self.empty_tables()
+ super().tearDown()
+
+ def initialize_tables(self):
+ """ Initializes the database with a sample of items.
+ """
+ # Add an archive
+ self.cursor.execute("""INSERT INTO archives(id, url)
+ VALUES('Local', 'http://localhost:{}/')
+ """.format(self.port))
+ self.conn.commit()
+
+ def empty_tables(self):
+ # Remove all content
+ self.cursor.execute('DELETE FROM content_archive')
+ self.cursor.execute('DELETE FROM archives')
+ self.conn.commit()
+
+ def __add_content(self, content_data, status='missing', date='now()'):
+ # Add the content
+ content = hashutil.hashdata(content_data)
+ content.update({'data': content_data})
+ self.storage.content_add([content])
+ # Then update database
+ content_id = r'\x' + hashutil.hash_to_hex(content['sha1'])
+ self.cursor.execute("""INSERT INTO content_archive
+ VALUES('%s'::sha1, 'Local', '%s', %s)
+ """ % (content_id, status, date))
+ return content['sha1']
+
+ def __get_missing(self):
+ self.cursor.execute("""SELECT content_id
+ FROM content_archive
+ WHERE status='missing'""")
+ return self.cursor.fetchall()
+
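+    # Helper: build an ArchiverDirector bound to the test database and the
+    # local objstorage; asynchronous=False keeps runs synchronous so the
+    # tests do not depend on a task queue.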
+ def __create_director(self, batch_size=5000, archival_max_age=3600,
+ retention_policy=1, asynchronous=False):
+ config = {
+ 'objstorage_path': self.objroot,
+ 'batch_max_size': batch_size,
+ 'archival_max_age': archival_max_age,
+ 'retention_policy': retention_policy,
+ 'asynchronous': asynchronous # Avoid depending on queue for tests.
+ }
+ director = ArchiverDirector(self.conn, config)
+ return director
+
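+    # Helper: build an ArchiverWorker over a given batch, reusing the
+    # director's master storage connection and the single 'Local' slave
+    # declared in setUp.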
+ def __create_worker(self, batch={}, config={}):
+ mstorage_args = [self.archiver.master_storage.db.conn,
+ self.objroot]
+ slaves = [self.storage_data]
+ if not config:
+ config = self.archiver.config
+ return ArchiverWorker(batch, mstorage_args, slaves, config)
+
+ # Integration test
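+    # These tests run the director end to end and check the resulting
+    # copies (or their absence) through the RemoteObjStorage client.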
+
+ @istest
+ def archive_missing_content(self):
+ """ Run archiver on a missing content should archive it.
+ """
+ content_data = b'archive_missing_content'
+ id = self.__add_content(content_data)
+ # After the run, the content should be in the archive.
+ self.archiver.run()
+ remote_data = self.remote_objstorage.content_get(id)
+ self.assertEquals(content_data, remote_data)
+
+ @istest
+ def archive_present_content(self):
+ """ A content that is not 'missing' shouldn't be archived.
+ """
+ id = self.__add_content(b'archive_present_content', status='present')
+        # After the run, the content should NOT be in the archive.
+ self.archiver.run()
+ with self.assertRaises(ObjNotFoundError):
+ self.remote_objstorage.content_get(id)
+
+ @istest
+ def archive_already_enough(self):
+ """ A content missing with enough copies shouldn't be archived.
+ """
+ id = self.__add_content(b'archive_alread_enough')
+ director = self.__create_director(retention_policy=0)
+ director.run()
+ with self.assertRaises(ObjNotFoundError):
+ self.remote_objstorage.content_get(id)
+
+ # Unit test for ArchiverDirector
+
+ def vstatus(self, status, mtime):
+ return self.archiver.get_virtual_status(status, mtime)
+
+ @istest
+ def vstatus_present(self):
+ self.assertEquals(
+ self.vstatus('present', None),
+ 'present'
+ )
+
+ @istest
+ def vstatus_missing(self):
+ self.assertEquals(
+ self.vstatus('missing', None),
+ 'missing'
+ )
+
+ @istest
+ def vstatus_ongoing_remaining(self):
+ current_time = datetime.now()
+ self.assertEquals(
+ self.vstatus('ongoing', current_time),
+ 'present'
+ )
+
+ @istest
+ def vstatus_ongoing_elapsed(self):
+ past_time = datetime.now() - timedelta(
+ seconds=self.archiver.config['archival_max_age'] + 1
+ )
+ self.assertEquals(
+ self.vstatus('ongoing', past_time),
+ 'missing'
+ )
+
+ # Unit tests for archive worker
+
+ @istest
+ def need_archival_missing(self):
+ """ A content should still need archival when it is missing.
+ """
+ id = self.__add_content(b'need_archival_missing', status='missing')
+ id = r'\x' + hashutil.hash_to_hex(id)
+ worker = self.__create_worker()
+ self.assertEqual(worker.need_archival(id, self.storage_data), True)
+
+ @istest
+ def need_archival_present(self):
+ """ A content should still need archival when it is missing
+ """
+        id = self.__add_content(b'need_archival_present', status='present')
+ id = r'\x' + hashutil.hash_to_hex(id)
+ worker = self.__create_worker()
+ self.assertEqual(worker.need_archival(id, self.storage_data), False)
+
+ @istest
+ def need_archival_ongoing_remaining(self):
+ """ An ongoing archival with remaining time shouldnt need archival.
+ """
+ id = self.__add_content(b'need_archival_ongoing_remaining',
+ status='ongoing', date="'%s'" % datetime.now())
+ id = r'\x' + hashutil.hash_to_hex(id)
+ worker = self.__create_worker()
+ self.assertEqual(worker.need_archival(id, self.storage_data), False)
+
+ @istest
+    def need_archival_ongoing_elapsed(self):
+ """ An ongoing archival with elapsed time should be scheduled again.
+ """
+ id = self.__add_content(
+ b'archive_ongoing_elapsed',
+ status='ongoing',
+ date="'%s'" % (datetime.now() - timedelta(
+ seconds=self.archiver.config['archival_max_age'] + 1
+ ))
+ )
+ id = r'\x' + hashutil.hash_to_hex(id)
+ worker = self.__create_worker()
+ self.assertEqual(worker.need_archival(id, self.storage_data), True)
+
+ @istest
+ def content_sorting_by_archiver(self):
+ """ Check that the content is correctly sorted.
+ """
+ batch = {
+ 'id1': {
+ 'present': [('slave1', 'slave1_url')],
+ 'missing': []
+ },
+ 'id2': {
+ 'present': [],
+ 'missing': [('slave1', 'slave1_url')]
+ }
+ }
+ worker = self.__create_worker(batch=batch)
+ mapping = worker.sort_content_by_archive()
+ self.assertNotIn('id1', mapping[('slave1', 'slave1_url')])
+ self.assertIn('id2', mapping[('slave1', 'slave1_url')])
diff --git a/swh/storage/tests/test_objstorage_api.py b/swh/storage/tests/test_objstorage_api.py
new file mode 100644
index 00000000..284e1834
--- /dev/null
+++ b/swh/storage/tests/test_objstorage_api.py
@@ -0,0 +1,83 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import tempfile
+import unittest
+
+from nose.tools import istest
+from nose.plugins.attrib import attr
+
+from swh.core import hashutil
+from swh.storage.exc import ObjNotFoundError, Error
+from swh.storage.tests.server_testing import ServerTestFixture
+from swh.storage.objstorage.objstorage import _obj_path
+from swh.storage.objstorage.api.client import RemoteObjStorage
+from swh.storage.objstorage.api.server import app
+
+
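+# These tests drive the RemoteObjStorage client against a temporary
+# instance of the objstorage API server, backed by a throwaway directory.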
+@attr('db')
+class TestRemoteObjStorage(ServerTestFixture, unittest.TestCase):
+ """ Test the remote archive API.
+ """
+
+ def setUp(self):
+ self.config = {'storage_base': tempfile.mkdtemp(),
+ 'storage_depth': 3}
+ self.app = app
+ super().setUp()
+ self.objstorage = RemoteObjStorage(self.url())
+
+ def tearDown(self):
+ super().tearDown()
+
+ @istest
+ def content_add(self):
+ content = bytes('Test content', 'utf8')
+ id = self.objstorage.content_add(content)
+ self.assertEquals(self.objstorage.content_get(id), content)
+
+ @istest
+ def content_get_present(self):
+ content = bytes('content_get_present', 'utf8')
+ content_hash = hashutil.hashdata(content)
+ id = self.objstorage.content_add(content)
+ self.assertEquals(content_hash['sha1'], id)
+
+ @istest
+ def content_get_missing(self):
+ content = bytes('content_get_missing', 'utf8')
+ content_hash = hashutil.hashdata(content)
+ with self.assertRaises(ObjNotFoundError):
+ self.objstorage.content_get(content_hash['sha1'])
+
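+    # content_check_invalid corrupts the first byte of the stored object on
+    # disk (located with _obj_path) before asking the server to check it,
+    # and expects an Error.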
+ @istest
+ def content_check_invalid(self):
+ content = bytes('content_check_invalid', 'utf8')
+ id = self.objstorage.content_add(content)
+ path = _obj_path(hashutil.hash_to_hex(id),
+ self.app.config['storage_base'],
+ self.app.config['storage_depth'])
+ content = list(content)
+ with open(path, 'bw') as f:
+ content[0] = (content[0] + 1) % 128
+ f.write(bytes(content))
+ with self.assertRaises(Error):
+ self.objstorage.content_check(id)
+
+ @istest
+ def content_check_valid(self):
+ content = bytes('content_check_valid', 'utf8')
+ id = self.objstorage.content_add(content)
+ try:
+ self.objstorage.content_check(id)
+        except Exception:
+ self.fail('Integrity check failed')
+
+ @istest
+ def content_check_missing(self):
+        content = bytes('content_check_missing', 'utf8')
+ content_hash = hashutil.hashdata(content)
+ with self.assertRaises(ObjNotFoundError):
+ self.objstorage.content_check(content_hash['sha1'])
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
index df8332b9..79ba446d 100644
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -1,1471 +1,1471 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import os
import psycopg2
import shutil
import tempfile
import unittest
from uuid import UUID
from unittest.mock import patch
from nose.tools import istest
from nose.plugins.attrib import attr
from swh.core.tests.db_testing import DbTestFixture
from swh.core.hashutil import hex_to_hash
from swh.storage import Storage
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata')
@attr('db')
class AbstractTestStorage(DbTestFixture):
"""Base class for Storage testing.
This class is used as-is to test local storage (see TestStorage
below) and remote storage (see TestRemoteStorage in
test_remote_storage.py).
We need to have the two classes inherit from this base class
separately to avoid nosetests running the tests from the base
class twice.
"""
TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh.dump')
def setUp(self):
super().setUp()
self.maxDiff = None
self.objroot = tempfile.mkdtemp()
self.storage = Storage(self.conn, self.objroot)
self.cont = {
'data': b'42\n',
'length': 3,
'sha1': hex_to_hash(
'34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
'sha1_git': hex_to_hash(
'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
'sha256': hex_to_hash(
'673650f936cb3b0a2f93ce09d81be107'
'48b1b203c19e8176b4eefc1964a0cf3a'),
}
self.cont2 = {
'data': b'4242\n',
'length': 5,
'sha1': hex_to_hash(
'61c2b3a30496d329e21af70dd2d7e097046d07b7'),
'sha1_git': hex_to_hash(
'36fade77193cb6d2bd826161a0979d64c28ab4fa'),
'sha256': hex_to_hash(
'859f0b154fdb2d630f45e1ecae4a8629'
'15435e663248bb8461d914696fc047cd'),
}
self.missing_cont = {
'data': b'missing\n',
'length': 8,
'sha1': hex_to_hash(
'f9c24e2abb82063a3ba2c44efd2d3c797f28ac90'),
'sha1_git': hex_to_hash(
'33e45d56f88993aae6a0198013efa80716fd8919'),
'sha256': hex_to_hash(
'6bbd052ab054ef222c1c87be60cd191a'
'ddedd24cc882d1f5f7f7be61dc61bb3a'),
}
self.skipped_cont = {
'length': 1024 * 1024 * 200,
'sha1_git': hex_to_hash(
'33e45d56f88993aae6a0198013efa80716fd8920'),
'reason': 'Content too long',
'status': 'absent',
}
self.skipped_cont2 = {
'length': 1024 * 1024 * 300,
'sha1_git': hex_to_hash(
'33e45d56f88993aae6a0198013efa80716fd8921'),
'reason': 'Content too long',
'status': 'absent',
}
self.dir = {
'id': b'4\x013\x422\x531\x000\xf51\xe62\xa73\xff7\xc3\xa90',
'entries': [
{
'name': b'foo',
'type': 'file',
'target': self.cont['sha1_git'],
'perms': 0o644,
},
{
'name': b'bar\xc3',
'type': 'dir',
'target': b'12345678901234567890',
'perms': 0o2000,
},
],
}
self.dir2 = {
'id': b'4\x013\x422\x531\x000\xf51\xe62\xa73\xff7\xc3\xa95',
'entries': [
{
'name': b'oof',
'type': 'file',
'target': self.cont2['sha1_git'],
'perms': 0o644,
}
],
}
self.dir3 = {
'id': hex_to_hash('33e45d56f88993aae6a0198013efa80716fd8921'),
'entries': [
{
'name': b'foo',
'type': 'file',
'target': self.cont['sha1_git'],
'perms': 0o644,
},
{
'name': b'bar',
'type': 'dir',
'target': b'12345678901234560000',
'perms': 0o2000,
},
{
'name': b'hello',
'type': 'file',
'target': b'12345678901234567890',
'perms': 0o644,
},
],
}
self.minus_offset = datetime.timezone(datetime.timedelta(minutes=-120))
self.plus_offset = datetime.timezone(datetime.timedelta(minutes=120))
self.revision = {
'id': b'56789012345678901234',
'message': b'hello',
'author': {
'name': b'Nicolas Dandrimont',
'email': b'nicolas@example.com',
'fullname': b'Nicolas Dandrimont <nicolas@example.com> ',
},
'date': {
'timestamp': 1234567890,
'offset': 120,
'negative_utc': None,
},
'committer': {
'name': b'St\xc3fano Zacchiroli',
'email': b'stefano@example.com',
'fullname': b'St\xc3fano Zacchiroli <stefano@example.com>'
},
'committer_date': {
'timestamp': 1123456789,
'offset': 0,
'negative_utc': True,
},
'parents': [b'01234567890123456789', b'23434512345123456789'],
'type': 'git',
'directory': self.dir['id'],
'metadata': {
'checksums': {
'sha1': 'tarball-sha1',
'sha256': 'tarball-sha256',
},
'signed-off-by': 'some-dude',
- 'extra_git_headers': [
+ 'extra_headers': [
['gpgsig', b'test123'],
['mergetags', [b'foo\\bar', b'\x22\xaf\x89\x80\x01\x00']],
],
},
'synthetic': True
}
self.revision2 = {
'id': b'87659012345678904321',
'message': b'hello again',
'author': {
'name': b'Roberto Dicosmo',
'email': b'roberto@example.com',
'fullname': b'Roberto Dicosmo <roberto@example.com>',
},
'date': {
'timestamp': 1234567843.22,
'offset': -720,
'negative_utc': None,
},
'committer': {
'name': b'tony',
'email': b'ar@dumont.fr',
'fullname': b'tony <ar@dumont.fr>',
},
'committer_date': {
'timestamp': 1123456789,
'offset': 0,
'negative_utc': False,
},
'parents': [b'01234567890123456789'],
'type': 'git',
'directory': self.dir2['id'],
'metadata': None,
'synthetic': False
}
self.revision3 = {
'id': hex_to_hash('7026b7c1a2af56521e951c01ed20f255fa054238'),
'message': b'a simple revision with no parents this time',
'author': {
'name': b'Roberto Dicosmo',
'email': b'roberto@example.com',
'fullname': b'Roberto Dicosmo <roberto@example.com>',
},
'date': {
'timestamp': 1234567843.22,
'offset': -720,
'negative_utc': None,
},
'committer': {
'name': b'tony',
'email': b'ar@dumont.fr',
'fullname': b'tony <ar@dumont.fr>',
},
'committer_date': {
'timestamp': 1127351742,
'offset': 0,
'negative_utc': False,
},
'parents': [],
'type': 'git',
'directory': self.dir2['id'],
'metadata': None,
'synthetic': True
}
self.revision4 = {
'id': hex_to_hash('368a48fe15b7db2383775f97c6b247011b3f14f4'),
'message': b'parent of self.revision2',
'author': {
'name': b'me',
'email': b'me@soft.heri',
'fullname': b'me <me@soft.heri>',
},
'date': {
'timestamp': 1244567843.22,
'offset': -720,
'negative_utc': None,
},
'committer': {
'name': b'committer-dude',
'email': b'committer@dude.com',
'fullname': b'committer-dude <committer@dude.com>',
},
'committer_date': {
'timestamp': 1244567843.22,
'offset': -720,
'negative_utc': None,
},
'parents': [self.revision3['id']],
'type': 'git',
'directory': self.dir['id'],
'metadata': None,
'synthetic': False
}
self.origin = {
'url': 'file:///dev/null',
'type': 'git',
}
self.origin2 = {
'url': 'file:///dev/zero',
'type': 'git',
}
self.occurrence = {
'branch': b'master',
'target': b'67890123456789012345',
'target_type': 'revision',
'date': datetime.datetime(2015, 1, 1, 23, 0, 0,
tzinfo=datetime.timezone.utc),
}
self.occurrence2 = {
'branch': b'master',
'target': self.revision2['id'],
'target_type': 'revision',
'date': datetime.datetime(2015, 1, 1, 23, 0, 0,
tzinfo=datetime.timezone.utc),
}
self.release = {
'id': b'87659012345678901234',
'name': b'v0.0.1',
'author': {
'name': b'olasd',
'email': b'nic@olasd.fr',
'fullname': b'olasd <nic@olasd.fr>',
},
'date': {
'timestamp': 1234567890,
'offset': 42,
'negative_utc': None,
},
'target': b'43210987654321098765',
'target_type': 'revision',
'message': b'synthetic release',
'synthetic': True,
}
self.release2 = {
'id': b'56789012348765901234',
'name': b'v0.0.2',
'author': {
'name': b'tony',
'email': b'ar@dumont.fr',
'fullname': b'tony <ar@dumont.fr>',
},
'date': {
'timestamp': 1634366813,
'offset': -120,
'negative_utc': None,
},
'target': b'432109\xa9765432\xc309\x00765',
'target_type': 'revision',
'message': b'v0.0.2\nMisc performance improvments + bug fixes',
'synthetic': False
}
self.release3 = {
'id': b'87659012345678904321',
'name': b'v0.0.2',
'author': {
'name': b'tony',
'email': b'tony@ardumont.fr',
'fullname': b'tony <tony@ardumont.fr>',
},
'date': {
'timestamp': 1634336813,
'offset': 0,
'negative_utc': False,
},
'target': self.revision2['id'],
'target_type': 'revision',
'message': b'yet another synthetic release',
'synthetic': True,
}
self.fetch_history_date = datetime.datetime(
2015, 1, 2, 21, 0, 0,
tzinfo=datetime.timezone.utc)
self.fetch_history_end = datetime.datetime(
2015, 1, 2, 23, 0, 0,
tzinfo=datetime.timezone.utc)
self.fetch_history_duration = (self.fetch_history_end -
self.fetch_history_date)
self.fetch_history_data = {
'status': True,
'result': {'foo': 'bar'},
'stdout': 'blabla',
'stderr': 'blablabla',
}
self.entity1 = {
'uuid': UUID('f96a7ec1-0058-4920-90cc-7327e4b5a4bf'),
# GitHub users
'parent': UUID('ad6df473-c1d2-4f40-bc58-2b091d4a750e'),
'name': 'github:user:olasd',
'type': 'person',
'description': 'Nicolas Dandrimont',
'homepage': 'http://example.com',
'active': True,
'generated': True,
'lister_metadata': {
# swh.lister.github
'lister': '34bd6b1b-463f-43e5-a697-785107f598e4',
'id': 12877,
'type': 'user',
'last_activity': '2015-11-03',
},
'metadata': None,
'validity': [
datetime.datetime(2015, 11, 3, 11, 0, 0,
tzinfo=datetime.timezone.utc),
]
}
self.entity1_query = {
'lister': '34bd6b1b-463f-43e5-a697-785107f598e4',
'id': 12877,
'type': 'user',
}
self.entity2 = {
'uuid': UUID('3903d075-32d6-46d4-9e29-0aef3612c4eb'),
# GitHub users
'parent': UUID('ad6df473-c1d2-4f40-bc58-2b091d4a750e'),
'name': 'github:user:zacchiro',
'type': 'person',
'description': 'Stefano Zacchiroli',
'homepage': 'http://example.com',
'active': True,
'generated': True,
'lister_metadata': {
# swh.lister.github
'lister': '34bd6b1b-463f-43e5-a697-785107f598e4',
'id': 216766,
'type': 'user',
'last_activity': '2015-11-03',
},
'metadata': None,
'validity': [
datetime.datetime(2015, 11, 3, 11, 0, 0,
tzinfo=datetime.timezone.utc),
]
}
self.entity3 = {
'uuid': UUID('111df473-c1d2-4f40-bc58-2b091d4a7111'),
# GitHub users
'parent': UUID('222df473-c1d2-4f40-bc58-2b091d4a7222'),
'name': 'github:user:ardumont',
'type': 'person',
'description': 'Antoine R. Dumont a.k.a tony',
'homepage': 'https://ardumont.github.io',
'active': True,
'generated': True,
'lister_metadata': {
'lister': '34bd6b1b-463f-43e5-a697-785107f598e4',
'id': 666,
'type': 'user',
'last_activity': '2016-01-15',
},
'metadata': None,
'validity': [
datetime.datetime(2015, 11, 3, 11, 0, 0,
tzinfo=datetime.timezone.utc),
]
}
self.entity4 = {
'uuid': UUID('222df473-c1d2-4f40-bc58-2b091d4a7222'),
# GitHub users
'parent': None,
'name': 'github:user:ToNyX',
'type': 'person',
'description': 'ToNyX',
'homepage': 'https://ToNyX.github.io',
'active': True,
'generated': True,
'lister_metadata': {
'lister': '34bd6b1b-463f-43e5-a697-785107f598e4',
'id': 999,
'type': 'user',
'last_activity': '2015-12-24',
},
'metadata': None,
'validity': [
datetime.datetime(2015, 11, 3, 11, 0, 0,
tzinfo=datetime.timezone.utc),
]
}
self.entity2_query = {
'lister_metadata': {
'lister': '34bd6b1b-463f-43e5-a697-785107f598e4',
'id': 216766,
'type': 'user',
},
}
def tearDown(self):
shutil.rmtree(self.objroot)
self.cursor.execute("""SELECT table_name FROM information_schema.tables
WHERE table_schema = %s""", ('public',))
tables = set(table for (table,) in self.cursor.fetchall())
tables -= {'dbversion', 'entity', 'entity_history', 'listable_entity'}
for table in tables:
self.cursor.execute('truncate table %s cascade' % table)
self.cursor.execute('delete from entity where generated=true')
self.cursor.execute('delete from entity_history where generated=true')
self.conn.commit()
super().tearDown()
@istest
def content_add(self):
cont = self.cont
self.storage.content_add([cont])
if hasattr(self.storage, 'objstorage'):
self.assertIn(cont['sha1'], self.storage.objstorage)
self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status'
' FROM content WHERE sha1 = %s',
(cont['sha1'],))
datum = self.cursor.fetchone()
self.assertEqual(
(datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(),
datum[3], datum[4]),
(cont['sha1'], cont['sha1_git'], cont['sha256'],
cont['length'], 'visible'))
@istest
def content_add_collision(self):
cont1 = self.cont
# create (corrupted) content with same sha1{,_git} but != sha256
cont1b = cont1.copy()
sha256_array = bytearray(cont1b['sha256'])
sha256_array[0] += 1
cont1b['sha256'] = bytes(sha256_array)
with self.assertRaises(psycopg2.IntegrityError):
self.storage.content_add([cont1, cont1b])
@istest
def skipped_content_add(self):
cont = self.skipped_cont
cont2 = self.skipped_cont2
self.storage.content_add([cont])
self.storage.content_add([cont2])
self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status,'
'reason FROM skipped_content ORDER BY sha1_git')
datum = self.cursor.fetchone()
self.assertEqual(
(datum[0], datum[1].tobytes(), datum[2],
datum[3], datum[4], datum[5]),
(None, cont['sha1_git'], None,
cont['length'], 'absent', 'Content too long'))
datum2 = self.cursor.fetchone()
self.assertEqual(
(datum2[0], datum2[1].tobytes(), datum2[2],
datum2[3], datum2[4], datum2[5]),
(None, cont2['sha1_git'], None,
cont2['length'], 'absent', 'Content too long'))
@istest
def content_missing(self):
cont2 = self.cont2
missing_cont = self.missing_cont
self.storage.content_add([cont2])
gen = self.storage.content_missing([cont2, missing_cont])
self.assertEqual(list(gen), [missing_cont['sha1']])
@istest
def content_missing_per_sha1(self):
# given
cont2 = self.cont2
missing_cont = self.missing_cont
self.storage.content_add([cont2])
# when
gen = self.storage.content_missing_per_sha1([cont2['sha1'],
missing_cont['sha1']])
# then
self.assertEqual(list(gen), [missing_cont['sha1']])
@istest
def directory_get(self):
# given
init_missing = list(self.storage.directory_missing([self.dir['id']]))
self.assertEqual([self.dir['id']], init_missing)
self.storage.directory_add([self.dir])
# when
actual_dirs = list(self.storage.directory_get([self.dir['id']]))
self.assertEqual(len(actual_dirs), 1)
dir0 = actual_dirs[0]
self.assertEqual(dir0['id'], self.dir['id'])
# ids are generated, so their values are non-deterministic
self.assertEqual(len(dir0['file_entries']), 1)
self.assertEqual(len(dir0['dir_entries']), 1)
self.assertIsNone(dir0['rev_entries'])
after_missing = list(self.storage.directory_missing([self.dir['id']]))
self.assertEqual([], after_missing)
@istest
def directory_add(self):
init_missing = list(self.storage.directory_missing([self.dir['id']]))
self.assertEqual([self.dir['id']], init_missing)
self.storage.directory_add([self.dir])
stored_data = list(self.storage.directory_ls(self.dir['id']))
data_to_store = [{
'dir_id': self.dir['id'],
'type': ent['type'],
'target': ent['target'],
'name': ent['name'],
'perms': ent['perms'],
'status': None,
'sha1': None,
'sha1_git': None,
'sha256': None,
}
for ent in sorted(self.dir['entries'], key=lambda ent: ent['name'])
]
self.assertEqual(data_to_store, stored_data)
after_missing = list(self.storage.directory_missing([self.dir['id']]))
self.assertEqual([], after_missing)
@istest
def directory_entry_get_by_path(self):
# given
init_missing = list(self.storage.directory_missing([self.dir3['id']]))
self.assertEqual([self.dir3['id']], init_missing)
self.storage.directory_add([self.dir3])
expected_entries = [
{
'dir_id': self.dir3['id'],
'name': b'foo',
'type': 'file',
'target': self.cont['sha1_git'],
'sha1': None,
'sha1_git': None,
'sha256': None,
'status': None,
'perms': 0o644,
},
{
'dir_id': self.dir3['id'],
'name': b'bar',
'type': 'dir',
'target': b'12345678901234560000',
'sha1': None,
'sha1_git': None,
'sha256': None,
'status': None,
'perms': 0o2000,
},
{
'dir_id': self.dir3['id'],
'name': b'hello',
'type': 'file',
'target': b'12345678901234567890',
'sha1': None,
'sha1_git': None,
'sha256': None,
'status': None,
'perms': 0o644,
},
]
# when (all must be found here)
for entry, expected_entry in zip(self.dir3['entries'],
expected_entries):
actual_entry = self.storage.directory_entry_get_by_path(
self.dir3['id'],
[entry['name']])
self.assertEqual(actual_entry, expected_entry)
# when (nothing should be found here since self.dir is not persisted.)
for entry in self.dir['entries']:
actual_entry = self.storage.directory_entry_get_by_path(
self.dir['id'],
[entry['name']])
self.assertIsNone(actual_entry)
@istest
def revision_add(self):
init_missing = self.storage.revision_missing([self.revision['id']])
self.assertEqual([self.revision['id']], list(init_missing))
self.storage.revision_add([self.revision])
end_missing = self.storage.revision_missing([self.revision['id']])
self.assertEqual([], list(end_missing))
@istest
def revision_log(self):
# given
# self.revision4 -is-child-of-> self.revision3
self.storage.revision_add([self.revision3,
self.revision4])
# when
actual_results = list(self.storage.revision_log(
[self.revision4['id']]))
# hack: ids generated
for actual_result in actual_results:
del actual_result['author']['id']
del actual_result['committer']['id']
self.assertEqual(len(actual_results), 2) # rev4 -child-> rev3
self.assertEquals(actual_results[0], self.revision4)
self.assertEquals(actual_results[1], self.revision3)
@istest
def revision_log_with_limit(self):
# given
# self.revision4 -is-child-of-> self.revision3
self.storage.revision_add([self.revision3,
self.revision4])
actual_results = list(self.storage.revision_log(
[self.revision4['id']], 1))
# hack: ids generated
for actual_result in actual_results:
del actual_result['author']['id']
del actual_result['committer']['id']
self.assertEqual(len(actual_results), 1)
self.assertEquals(actual_results[0], self.revision4)
@staticmethod
def _short_revision(revision):
return [revision['id'], revision['parents']]
@istest
def revision_shortlog(self):
# given
# self.revision4 -is-child-of-> self.revision3
self.storage.revision_add([self.revision3,
self.revision4])
# when
actual_results = list(self.storage.revision_shortlog(
[self.revision4['id']]))
self.assertEqual(len(actual_results), 2) # rev4 -child-> rev3
self.assertEquals(list(actual_results[0]),
self._short_revision(self.revision4))
self.assertEquals(list(actual_results[1]),
self._short_revision(self.revision3))
@istest
def revision_shortlog_with_limit(self):
# given
# self.revision4 -is-child-of-> self.revision3
self.storage.revision_add([self.revision3,
self.revision4])
actual_results = list(self.storage.revision_shortlog(
[self.revision4['id']], 1))
self.assertEqual(len(actual_results), 1)
self.assertEquals(list(actual_results[0]),
self._short_revision(self.revision4))
@istest
def revision_get(self):
self.storage.revision_add([self.revision])
actual_revisions = list(self.storage.revision_get(
[self.revision['id'], self.revision2['id']]))
# when
del actual_revisions[0]['author']['id'] # hack: ids are generated
del actual_revisions[0]['committer']['id']
self.assertEqual(len(actual_revisions), 2)
self.assertEqual(actual_revisions[0], self.revision)
self.assertIsNone(actual_revisions[1])
@istest
def revision_get_no_parents(self):
self.storage.revision_add([self.revision3])
get = list(self.storage.revision_get([self.revision3['id']]))
self.assertEqual(len(get), 1)
self.assertEqual(get[0]['parents'], []) # no parents on this one
@istest
def revision_get_by(self):
# given
self.storage.content_add([self.cont2])
self.storage.directory_add([self.dir2]) # point to self.cont
self.storage.revision_add([self.revision2]) # points to self.dir
origin_id = self.storage.origin_add_one(self.origin2)
# occurrence2 points to 'revision2' with branch 'master', we
# need to point to the right origin
occurrence2 = self.occurrence2.copy()
occurrence2.update({'origin': origin_id})
self.storage.occurrence_add([occurrence2])
# we want only revision 2
expected_revisions = list(self.storage.revision_get(
[self.revision2['id']]))
# when
actual_results = list(self.storage.revision_get_by(
origin_id,
occurrence2['branch'],
None))
self.assertEqual(actual_results[0], expected_revisions[0])
# when (with no branch filtering, it's still ok)
actual_results = list(self.storage.revision_get_by(
origin_id,
None,
None))
self.assertEqual(actual_results[0], expected_revisions[0])
@istest
def revision_get_by_multiple_occurrence(self):
# 2 occurrences pointing to 2 different revisions
# each occurrence has a 1-hour delta
# the api must return the revision whose occurrence is the nearest.
# given
self.storage.content_add([self.cont2])
self.storage.directory_add([self.dir2])
self.storage.revision_add([self.revision2, self.revision3])
origin_id = self.storage.origin_add_one(self.origin2)
# occurrence2 points to 'revision2' with branch 'master', we
# need to point to the right origin
occurrence2 = self.occurrence2.copy()
occurrence2.update({'origin': origin_id,
'date': occurrence2['date']})
dt = datetime.timedelta(days=1)
occurrence3 = self.occurrence2.copy()
occurrence3.update({'origin': origin_id,
'date': occurrence3['date'] + dt,
'target': self.revision3['id']})
# 2 occurrences on same revision with lower validity date with 1h delta
self.storage.occurrence_add([occurrence2])
self.storage.occurrence_add([occurrence3])
# when
actual_results0 = list(self.storage.revision_get_by(
origin_id,
occurrence2['branch'],
occurrence2['date']))
# hack: ids are generated
del actual_results0[0]['author']['id']
del actual_results0[0]['committer']['id']
self.assertEquals(len(actual_results0), 1)
self.assertEqual(actual_results0, [self.revision2])
# when
actual_results1 = list(self.storage.revision_get_by(
origin_id,
occurrence2['branch'],
occurrence2['date'] + dt/3)) # closer to occurrence2
# hack: ids are generated
del actual_results1[0]['author']['id']
del actual_results1[0]['committer']['id']
self.assertEquals(len(actual_results1), 1)
self.assertEqual(actual_results1, [self.revision2])
# when
actual_results2 = list(self.storage.revision_get_by(
origin_id,
occurrence2['branch'],
occurrence2['date'] + 2*dt/3)) # closer to occurrence3
del actual_results2[0]['author']['id']
del actual_results2[0]['committer']['id']
self.assertEquals(len(actual_results2), 1)
self.assertEqual(actual_results2, [self.revision3])
# when
actual_results3 = list(self.storage.revision_get_by(
origin_id,
occurrence3['branch'],
occurrence3['date']))
# hack: ids are generated
del actual_results3[0]['author']['id']
del actual_results3[0]['committer']['id']
self.assertEquals(len(actual_results3), 1)
self.assertEqual(actual_results3, [self.revision3])
# when
actual_results4 = list(self.storage.revision_get_by(
origin_id,
None,
None))
for actual_result in actual_results4:
del actual_result['author']['id']
del actual_result['committer']['id']
self.assertEquals(len(actual_results4), 2)
self.assertCountEqual(actual_results4,
[self.revision3, self.revision2])
@istest
def release_add(self):
init_missing = self.storage.release_missing([self.release['id'],
self.release2['id']])
self.assertEqual([self.release['id'], self.release2['id']],
list(init_missing))
self.storage.release_add([self.release, self.release2])
end_missing = self.storage.release_missing([self.release['id'],
self.release2['id']])
self.assertEqual([], list(end_missing))
@istest
def release_get(self):
# given
self.storage.release_add([self.release, self.release2])
# when
actual_releases = list(self.storage.release_get([self.release['id'],
self.release2['id']]))
# then
for actual_release in actual_releases:
del actual_release['author']['id'] # hack: ids are generated
self.assertEquals([self.release, self.release2],
[actual_releases[0], actual_releases[1]])
@istest
def release_get_by(self):
# given
self.storage.revision_add([self.revision2]) # points to self.dir
self.storage.release_add([self.release3])
origin_id = self.storage.origin_add_one(self.origin2)
# occurrence2 points to 'revision2' with branch 'master', we
# need to point to the right origin
occurrence2 = self.occurrence2.copy()
occurrence2.update({'origin': origin_id})
self.storage.occurrence_add([occurrence2])
# we want only revision 2
expected_releases = list(self.storage.release_get(
[self.release3['id']]))
# when
actual_results = list(self.storage.release_get_by(
occurrence2['origin']))
# then
self.assertEqual(actual_results[0], expected_releases[0])
@istest
def origin_add_one(self):
origin0 = self.storage.origin_get(self.origin)
self.assertIsNone(origin0)
id = self.storage.origin_add_one(self.origin)
actual_origin = self.storage.origin_get({'url': self.origin['url'],
'type': self.origin['type']})
self.assertEqual(actual_origin['id'], id)
id2 = self.storage.origin_add_one(self.origin)
self.assertEqual(id, id2)
@istest
def origin_get(self):
self.assertIsNone(self.storage.origin_get(self.origin))
id = self.storage.origin_add_one(self.origin)
# lookup per type and url (returns id)
actual_origin0 = self.storage.origin_get({'url': self.origin['url'],
'type': self.origin['type']})
self.assertEqual(actual_origin0['id'], id)
# lookup per id (returns dict)
actual_origin1 = self.storage.origin_get({'id': id})
self.assertEqual(actual_origin1, {'id': id,
'type': self.origin['type'],
'url': self.origin['url'],
'lister': None,
'project': None})
@istest
def occurrence_add(self):
origin_id = self.storage.origin_add_one(self.origin2)
revision = self.revision.copy()
revision['id'] = self.occurrence['target']
self.storage.revision_add([revision])
occur = self.occurrence
occur['origin'] = origin_id
self.storage.occurrence_add([occur])
self.storage.occurrence_add([occur])
test_query = '''
with indiv_occurrences as (
select origin, branch, target, target_type, unnest(visits) as visit
from occurrence_history
)
select origin, branch, target, target_type, date
from indiv_occurrences
left join origin_visit using(origin, visit)
order by origin, date'''
self.cursor.execute(test_query)
ret = self.cursor.fetchall()
self.assertEqual(len(ret), 1)
self.assertEqual(
(ret[0][0], ret[0][1].tobytes(), ret[0][2].tobytes(),
ret[0][3], ret[0][4]),
(occur['origin'], occur['branch'], occur['target'],
occur['target_type'], occur['date']))
orig_date = occur['date']
occur['date'] += datetime.timedelta(hours=10)
self.storage.occurrence_add([occur])
self.cursor.execute(test_query)
ret = self.cursor.fetchall()
self.assertEqual(len(ret), 2)
self.assertEqual(
(ret[0][0], ret[0][1].tobytes(), ret[0][2].tobytes(),
ret[0][3], ret[0][4]),
(occur['origin'], occur['branch'], occur['target'],
occur['target_type'], orig_date))
self.assertEqual(
(ret[1][0], ret[1][1].tobytes(), ret[1][2].tobytes(),
ret[1][3], ret[1][4]),
(occur['origin'], occur['branch'], occur['target'],
occur['target_type'], occur['date']))
@istest
def occurrence_get(self):
# given
origin_id = self.storage.origin_add_one(self.origin2)
revision = self.revision.copy()
revision['id'] = self.occurrence['target']
self.storage.revision_add([revision])
occur = self.occurrence
occur['origin'] = origin_id
self.storage.occurrence_add([occur])
self.storage.occurrence_add([occur])
# when
actual_occurrence = list(self.storage.occurrence_get(origin_id))
# then
expected_occur = occur.copy()
del expected_occur['date']
self.assertEquals(len(actual_occurrence), 1)
self.assertEquals(actual_occurrence[0], expected_occur)
@istest
def content_find_occurrence_with_present_content(self):
# 1. with something to find
# given
self.storage.content_add([self.cont2])
self.storage.directory_add([self.dir2]) # point to self.cont
self.storage.revision_add([self.revision2]) # points to self.dir
origin_id = self.storage.origin_add_one(self.origin2)
occurrence = self.occurrence2
occurrence.update({'origin': origin_id})
self.storage.occurrence_add([occurrence])
# when
occ = self.storage.content_find_occurrence(
{'sha1': self.cont2['sha1']})
# then
self.assertEquals(occ['origin_type'], self.origin2['type'])
self.assertEquals(occ['origin_url'], self.origin2['url'])
self.assertEquals(occ['branch'], self.occurrence2['branch'])
self.assertEquals(occ['target'], self.revision2['id'])
self.assertEquals(occ['target_type'], self.occurrence2['target_type'])
self.assertEquals(occ['path'], self.dir2['entries'][0]['name'])
occ2 = self.storage.content_find_occurrence(
{'sha1_git': self.cont2['sha1_git']})
self.assertEquals(occ2['origin_type'], self.origin2['type'])
self.assertEquals(occ2['origin_url'], self.origin2['url'])
self.assertEquals(occ2['branch'], self.occurrence2['branch'])
self.assertEquals(occ2['target'], self.revision2['id'])
self.assertEquals(occ2['target_type'], self.occurrence2['target_type'])
self.assertEquals(occ2['path'], self.dir2['entries'][0]['name'])
occ3 = self.storage.content_find_occurrence(
{'sha256': self.cont2['sha256']})
self.assertEquals(occ3['origin_type'], self.origin2['type'])
self.assertEquals(occ3['origin_url'], self.origin2['url'])
self.assertEquals(occ3['branch'], self.occurrence2['branch'])
self.assertEquals(occ3['target'], self.revision2['id'])
self.assertEquals(occ3['target_type'], self.occurrence2['target_type'])
self.assertEquals(occ3['path'], self.dir2['entries'][0]['name'])
@istest
def content_find_occurrence_with_non_present_content(self):
# 1. with something that does not exist
missing_cont = self.missing_cont
occ = self.storage.content_find_occurrence(
{'sha1': missing_cont['sha1']})
self.assertEquals(occ, None,
"Content does not exist so no occurrence")
# 2. with something that does not exist
occ = self.storage.content_find_occurrence(
{'sha1_git': missing_cont['sha1_git']})
self.assertEquals(occ, None,
"Content does not exist so no occurrence")
# 3. with something that does not exist
occ = self.storage.content_find_occurrence(
{'sha256': missing_cont['sha256']})
self.assertEquals(occ, None,
"Content does not exist so no occurrence")
@istest
def content_find_occurrence_bad_input(self):
# 1. with bad input
with self.assertRaises(ValueError) as cm:
self.storage.content_find_occurrence({}) # empty is bad
self.assertIn('content keys', cm.exception.args[0])
# 2. with bad input
with self.assertRaises(ValueError) as cm:
self.storage.content_find_occurrence(
{'unknown-sha1': 'something'}) # not the right key
self.assertIn('content keys', cm.exception.args[0])
@istest
def entity_get_from_lister_metadata(self):
self.storage.entity_add([self.entity1])
fetched_entities = list(
self.storage.entity_get_from_lister_metadata(
[self.entity1_query, self.entity2_query]))
# Entity 1 should have full metadata, with last_seen/last_id instead
# of validity
entity1 = self.entity1.copy()
entity1['last_seen'] = entity1['validity'][0]
del fetched_entities[0]['last_id']
del entity1['validity']
# Entity 2 should have no metadata
entity2 = {
'uuid': None,
'lister_metadata': self.entity2_query.copy(),
}
self.assertEquals(fetched_entities, [entity1, entity2])
@istest
def entity_get_from_lister_metadata_twice(self):
self.storage.entity_add([self.entity1])
fetched_entities1 = list(
self.storage.entity_get_from_lister_metadata(
[self.entity1_query]))
fetched_entities2 = list(
self.storage.entity_get_from_lister_metadata(
[self.entity1_query]))
self.assertEquals(fetched_entities1, fetched_entities2)
@istest
def entity_get(self):
# given
self.storage.entity_add([self.entity4])
self.storage.entity_add([self.entity3])
# when: entity3 -child-of-> entity4
actual_entity3 = list(self.storage.entity_get(self.entity3['uuid']))
self.assertEquals(len(actual_entity3), 2)
# remove dynamic data (modified by db)
entity3 = self.entity3.copy()
entity4 = self.entity4.copy()
del entity3['validity']
del entity4['validity']
del actual_entity3[0]['last_seen']
del actual_entity3[0]['last_id']
del actual_entity3[1]['last_seen']
del actual_entity3[1]['last_id']
self.assertEquals(actual_entity3, [entity3, entity4])
# when: entity4 only child
actual_entity4 = list(self.storage.entity_get(self.entity4['uuid']))
self.assertEquals(len(actual_entity4), 1)
# remove dynamic data (modified by db)
entity4 = self.entity4.copy()
del entity4['validity']
del actual_entity4[0]['last_id']
del actual_entity4[0]['last_seen']
self.assertEquals(actual_entity4, [entity4])
@istest
def entity_get_one(self):
# given
self.storage.entity_add([self.entity3, self.entity4])
# when: entity3 -child-of-> entity4
actual_entity3 = self.storage.entity_get_one(self.entity3['uuid'])
# remove dynamic data (modified by db)
entity3 = self.entity3.copy()
del entity3['validity']
del actual_entity3['last_seen']
del actual_entity3['last_id']
self.assertEquals(actual_entity3, entity3)
@istest
def stat_counters(self):
expected_keys = ['content', 'directory', 'directory_entry_dir',
'occurrence', 'origin', 'person', 'revision']
counters = self.storage.stat_counters()
self.assertTrue(set(expected_keys) <= set(counters))
self.assertIsInstance(counters[expected_keys[0]], int)
@istest
def content_find_with_present_content(self):
# 1. with something to find
cont = self.cont
self.storage.content_add([cont])
actually_present = self.storage.content_find({'sha1': cont['sha1']})
actually_present.pop('ctime')
self.assertEqual(actually_present, {
'sha1': cont['sha1'],
'sha256': cont['sha256'],
'sha1_git': cont['sha1_git'],
'length': cont['length'],
'status': 'visible'
})
# 2. with something to find
actually_present = self.storage.content_find(
{'sha1_git': cont['sha1_git']})
actually_present.pop('ctime')
self.assertEqual(actually_present, {
'sha1': cont['sha1'],
'sha256': cont['sha256'],
'sha1_git': cont['sha1_git'],
'length': cont['length'],
'status': 'visible'
})
# 3. with something to find
actually_present = self.storage.content_find(
{'sha256': cont['sha256']})
actually_present.pop('ctime')
self.assertEqual(actually_present, {
'sha1': cont['sha1'],
'sha256': cont['sha256'],
'sha1_git': cont['sha1_git'],
'length': cont['length'],
'status': 'visible'
})
# 4. with something to find
actually_present = self.storage.content_find(
{'sha1': cont['sha1'],
'sha1_git': cont['sha1_git'],
'sha256': cont['sha256']})
actually_present.pop('ctime')
self.assertEqual(actually_present, {
'sha1': cont['sha1'],
'sha256': cont['sha256'],
'sha1_git': cont['sha1_git'],
'length': cont['length'],
'status': 'visible'
})
@istest
def content_find_with_non_present_content(self):
# 1. with something that does not exist
missing_cont = self.missing_cont
actually_present = self.storage.content_find(
{'sha1': missing_cont['sha1']})
self.assertIsNone(actually_present)
# 2. with something that does not exist
actually_present = self.storage.content_find(
{'sha1_git': missing_cont['sha1_git']})
self.assertIsNone(actually_present)
# 3. with something that does not exist
actually_present = self.storage.content_find(
{'sha256': missing_cont['sha256']})
self.assertIsNone(actually_present)
@istest
def content_find_bad_input(self):
# 1. with bad input
with self.assertRaises(ValueError):
self.storage.content_find({}) # empty is bad
# 2. with bad input
with self.assertRaises(ValueError):
self.storage.content_find(
{'unknown-sha1': 'something'}) # not the right key
@istest
def object_find_by_sha1_git(self):
sha1_gits = [b'00000000000000000000']
expected = {
b'00000000000000000000': [],
}
self.storage.content_add([self.cont])
sha1_gits.append(self.cont['sha1_git'])
expected[self.cont['sha1_git']] = [{
'sha1_git': self.cont['sha1_git'],
'type': 'content',
'id': self.cont['sha1'],
}]
self.storage.directory_add([self.dir])
sha1_gits.append(self.dir['id'])
expected[self.dir['id']] = [{
'sha1_git': self.dir['id'],
'type': 'directory',
'id': self.dir['id'],
}]
self.storage.revision_add([self.revision])
sha1_gits.append(self.revision['id'])
expected[self.revision['id']] = [{
'sha1_git': self.revision['id'],
'type': 'revision',
'id': self.revision['id'],
}]
self.storage.release_add([self.release])
sha1_gits.append(self.release['id'])
expected[self.release['id']] = [{
'sha1_git': self.release['id'],
'type': 'release',
'id': self.release['id'],
}]
ret = self.storage.object_find_by_sha1_git(sha1_gits)
for val in ret.values():
for obj in val:
del obj['object_id']
self.assertEqual(expected, ret)
class TestStorage(AbstractTestStorage, unittest.TestCase):
"""Test the local storage"""
# Can only be tested with local storage as you can't mock
# datetimes for the remote server
@istest
def fetch_history(self):
origin = self.storage.origin_add_one(self.origin)
with patch('datetime.datetime'):
datetime.datetime.now.return_value = self.fetch_history_date
fetch_history_id = self.storage.fetch_history_start(origin)
datetime.datetime.now.assert_called_with(tz=datetime.timezone.utc)
with patch('datetime.datetime'):
datetime.datetime.now.return_value = self.fetch_history_end
self.storage.fetch_history_end(fetch_history_id,
self.fetch_history_data)
fetch_history = self.storage.fetch_history_get(fetch_history_id)
expected_fetch_history = self.fetch_history_data.copy()
expected_fetch_history['id'] = fetch_history_id
expected_fetch_history['origin'] = origin
expected_fetch_history['date'] = self.fetch_history_date
expected_fetch_history['duration'] = self.fetch_history_duration
self.assertEqual(expected_fetch_history, fetch_history)
@istest
def person_get(self):
# given
person0 = {
'fullname': b'bob <alice@bob>',
'name': b'bob',
'email': b'alice@bob',
}
id0 = self.storage._person_add(person0)
person1 = {
'fullname': b'tony <tony@bob>',
'name': b'tony',
'email': b'tony@bob',
}
id1 = self.storage._person_add(person1)
# when
actual_persons = self.storage.person_get([id0, id1])
# given (person injection through release for example)
self.assertEqual(
list(actual_persons), [
{
'id': id0,
'fullname': person0['fullname'],
'name': person0['name'],
'email': person0['email'],
},
{
'id': id1,
'fullname': person1['fullname'],
'name': person1['name'],
'email': person1['email'],
},
])
diff --git a/version.txt b/version.txt
index 33a7290e..8504c2ad 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.37-0-g24979b7
\ No newline at end of file
+v0.0.38-0-ge522d1e
\ No newline at end of file