diff --git a/PKG-INFO b/PKG-INFO index 41a768eb..ed3ea4e0 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.storage -Version: 0.0.37 +Version: 0.0.38 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/doc/archiver-blueprint.md b/docs/archiver-blueprint.md similarity index 100% rename from doc/archiver-blueprint.md rename to docs/archiver-blueprint.md diff --git a/requirements.txt b/requirements.txt index 4f10697e..faeaec3b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,12 +1,15 @@ dateutil psycopg2 vcversioner # remote storage API client requests # remote storage API server flask # Internal dependencies swh.core >= 0.0.17 + +click +swh.scheduler diff --git a/setup.py b/setup.py index 292e7c22..145e3590 100644 --- a/setup.py +++ b/setup.py @@ -1,34 +1,41 @@ #!/usr/bin/env python3 from setuptools import setup def parse_requirements(): requirements = [] with open('requirements.txt') as f: for line in f.readlines(): line = line.strip() if not line or line.startswith('#'): continue requirements.append(line) return requirements setup( name='swh.storage', description='Software Heritage storage manager', author='Software Heritage developers', author_email='swh-devel@inria.fr', url='https://forge.softwareheritage.org/diffusion/DSTO/', - packages=['swh.storage', 'swh.storage.api', 'swh.storage.tests'], + packages=[ + 'swh.storage', + 'swh.storage.archiver', + 'swh.storage.api', + 'swh.storage.objstorage', + 'swh.storage.objstorage.api', + 'swh.storage.tests', + ], scripts=[ 'bin/swh-objstorage-add-dir', 'bin/swh-objstorage-fsck', 'bin/swh-storage-add-dir', ], install_requires=parse_requirements(), setup_requires=['vcversioner'], vcversioner={}, include_package_data=True, ) diff --git a/sql/swh-schema.sql b/sql/swh-schema.sql index b7a8307a..1704d680 100644 --- a/sql/swh-schema.sql +++ b/sql/swh-schema.sql @@ -1,428 +1,454 @@ --- --- Software Heritage Data Model --- -- drop schema if exists swh cascade; -- create schema swh; -- set search_path to swh; create table dbversion ( version int primary key, release timestamptz, description text ); insert into dbversion(version, release, description) - values(68, now(), 'Work In Progress'); + values(69, now(), 'Work In Progress'); -- a SHA1 checksum (not necessarily originating from Git) create domain sha1 as bytea check (length(value) = 20); -- a Git object ID, i.e., a SHA1 checksum create domain sha1_git as bytea check (length(value) = 20); -- a SHA256 checksum create domain sha256 as bytea check (length(value) = 32); -- UNIX path (absolute, relative, individual path component, etc.) create domain unix_path as bytea; -- a set of UNIX-like access permissions, as manipulated by, e.g., chmod create domain file_perms as int; create type content_status as enum ('absent', 'visible', 'hidden'); -- Checksums about actual file content. Note that the content itself is not -- stored in the DB, but on external (key-value) storage. A single checksum is -- used as key there, but the other can be used to verify that we do not inject -- content collisions not knowingly. create table content ( sha1 sha1 primary key, sha1_git sha1_git not null, sha256 sha256 not null, length bigint not null, ctime timestamptz not null default now(), -- creation time, i.e. time of (first) injection into the storage status content_status not null default 'visible', object_id bigserial ); create unique index on content(sha1_git); create unique index on content(sha256); create index on content(ctime); -- TODO use a BRIN index here (postgres >= 9.5) -- Entities constitute a typed hierarchy of organization, hosting -- facilities, groups, people and software projects. -- -- Examples of entities: Software Heritage, Debian, GNU, GitHub, -- Apache, The Linux Foundation, the Debian Python Modules Team, the -- torvalds GitHub user, the torvalds/linux GitHub project. -- -- The data model is hierarchical (via the parent attribute) and might -- store sub-branches of existing entities. The key feature of an -- entity is might be *listed* (if it is available in listable_entity) -- to retrieve information about its content, i.e: sub-entities, -- projects, origins. -- Types of entities. -- -- - organization: a root entity, usually backed by a non-profit, a -- company, or another kind of "association". (examples: Software -- Heritage, Debian, GNU, GitHub) -- -- - group_of_entities: used for hierarchies, doesn't need to have a -- concrete existence. (examples: GNU hosting facilities, Debian -- hosting facilities, GitHub users, ...) -- -- - hosting: a hosting facility, can usually be listed to generate -- other data. (examples: GitHub git hosting, alioth.debian.org, -- snapshot.debian.org) -- -- - group_of_persons: an entity representing a group of -- persons. (examples: a GitHub organization, a Debian team) -- -- - person: an entity representing a person. (examples: -- a GitHub user, a Debian developer) -- -- - project: an entity representing a software project. (examples: a -- GitHub project, Apache httpd, a Debian source package, ...) create type entity_type as enum ( 'organization', 'group_of_entities', 'hosting', 'group_of_persons', 'person', 'project' ); -- The history of entities. Allows us to keep historical metadata -- about entities. The temporal invariant is the uuid. Root -- organization uuids are manually generated (and available in -- swh-data.sql). -- -- For generated entities (generated = true), we can provide -- generation_metadata to allow listers to retrieve the uuids of previous -- iterations of the entity. -- -- Inactive entities that have been active in the past (active = -- false) should register the timestamp at which we saw them -- deactivate, in a new entry of entity_history. create table entity_history ( id bigserial primary key, uuid uuid, parent uuid, -- should reference entity_history(uuid) name text not null, type entity_type not null, description text, homepage text, active boolean not null, -- whether the entity was seen on the last listing generated boolean not null, -- whether this entity has been generated by a lister lister_metadata jsonb, -- lister-specific metadata, used for queries metadata jsonb, validity timestamptz[] -- timestamps at which we have seen this entity ); create index on entity_history(uuid); create index on entity_history(name); -- The entity table provides a view of the latest information on a -- given entity. It is updated via a trigger on entity_history. create table entity ( uuid uuid primary key, parent uuid references entity(uuid) deferrable initially deferred, name text not null, type entity_type not null, description text, homepage text, active boolean not null, -- whether the entity was seen on the last listing generated boolean not null, -- whether this entity has been generated by a lister lister_metadata jsonb, -- lister-specific metadata, used for queries metadata jsonb, last_seen timestamptz, -- last listing time or disappearance time for active=false last_id bigint references entity_history(id) -- last listing id ); create index on entity(name); create index on entity using gin(lister_metadata jsonb_path_ops); -- Register the equivalence between two entities. Allows sideways -- navigation in the entity table create table entity_equivalence ( entity1 uuid references entity(uuid), entity2 uuid references entity(uuid), primary key (entity1, entity2), constraint order_entities check (entity1 < entity2) ); -- Register a lister for a specific entity. create table listable_entity ( uuid uuid references entity(uuid) primary key, enabled boolean not null default true, -- do we list this entity automatically? list_engine text, -- crawler to be used to list entity's content list_url text, -- root URL to start the listing list_params jsonb, -- org-specific listing parameter latest_list timestamptz -- last time the entity's content has been listed ); -- Log of all entity listings (i.e., entity crawling) that have been -- done in the past, or are still ongoing. create table list_history ( id bigserial primary key, entity uuid references listable_entity(uuid), date timestamptz not null, status boolean, -- true if and only if the listing has been successful result jsonb, -- more detailed return value, depending on status stdout text, stderr text, duration interval -- fetch duration of NULL if still ongoing ); -- An origin is a place, identified by an URL, where software can be found. We -- support different kinds of origins, e.g., git and other VCS repositories, -- web pages that list tarballs URLs (e.g., http://www.kernel.org), indirect -- tarball URLs (e.g., http://www.example.org/latest.tar.gz), etc. The key -- feature of an origin is that it can be *fetched* (wget, git clone, svn -- checkout, etc.) to retrieve all the contained software. create table origin ( id bigserial primary key, type text, -- TODO use an enum here (?) url text not null, lister uuid references listable_entity(uuid), project uuid references entity(uuid) ); create index on origin(type, url); -- Content we have seen but skipped for some reason. This table is -- separate from the content table as we might not have the sha1 -- checksum of that data (for instance when we inject git -- repositories, objects that are too big will be skipped here, and we -- will only know their sha1_git). 'reason' contains the reason the -- content was skipped. origin is a nullable column allowing to find -- out which origin contains that skipped content. create table skipped_content ( sha1 sha1, sha1_git sha1_git, sha256 sha256, length bigint not null, ctime timestamptz not null default now(), status content_status not null default 'absent', reason text not null, origin bigint references origin(id), object_id bigserial, unique (sha1, sha1_git, sha256) ); -- those indexes support multiple NULL values. create unique index on skipped_content(sha1); create unique index on skipped_content(sha1_git); create unique index on skipped_content(sha256); -- Log of all origin fetches (i.e., origin crawling) that have been done in the -- past, or are still ongoing. Similar to list_history, but for origins. create table fetch_history ( id bigserial primary key, origin bigint references origin(id), date timestamptz not null, status boolean, -- true if and only if the fetch has been successful result jsonb, -- more detailed returned values, times, etc... stdout text, stderr text, -- null when status is true, filled otherwise duration interval -- fetch duration of NULL if still ongoing ); -- A file-system directory. A directory is a list of directory entries (see -- tables: directory_entry_{dir,file}). -- -- To list the contents of a directory: -- 1. list the contained directory_entry_dir using array dir_entries -- 2. list the contained directory_entry_file using array file_entries -- 3. list the contained directory_entry_rev using array rev_entries -- 4. UNION -- -- Synonyms/mappings: -- * git: tree create table directory ( id sha1_git primary key, dir_entries bigint[], -- sub-directories, reference directory_entry_dir file_entries bigint[], -- contained files, reference directory_entry_file rev_entries bigint[], -- mounted revisions, reference directory_entry_rev object_id bigserial -- short object identifier ); create index on directory using gin (dir_entries); create index on directory using gin (file_entries); create index on directory using gin (rev_entries); -- A directory entry pointing to a sub-directory. create table directory_entry_dir ( id bigserial primary key, target sha1_git, -- id of target directory name unix_path, -- path name, relative to containing dir perms file_perms -- unix-like permissions ); create unique index on directory_entry_dir(target, name, perms); -- A directory entry pointing to a file. create table directory_entry_file ( id bigserial primary key, target sha1_git, -- id of target file name unix_path, -- path name, relative to containing dir perms file_perms -- unix-like permissions ); create unique index on directory_entry_file(target, name, perms); -- A directory entry pointing to a revision. create table directory_entry_rev ( id bigserial primary key, target sha1_git, -- id of target revision name unix_path, -- path name, relative to containing dir perms file_perms -- unix-like permissions ); create unique index on directory_entry_rev(target, name, perms); create table person ( id bigserial primary key, fullname bytea not null, -- freeform specification; what is actually used in the checksums -- will usually be of the form 'name ' name bytea, -- advisory: not null if we managed to parse a name email bytea -- advisory: not null if we managed to parse an email ); create unique index on person(fullname); create index on person(name); create index on person(email); create type revision_type as enum ('git', 'tar', 'dsc', 'svn'); -- the data object types stored in our data model create type object_type as enum ('content', 'directory', 'revision', 'release'); -- A snapshot of a software project at a specific point in time. -- -- Synonyms/mappings: -- * git / subversion / etc: commit -- * tarball: a specific tarball -- -- Revisions are organized as DAGs. Each revision points to 0, 1, or more (in -- case of merges) parent revisions. Each revision points to a directory, i.e., -- a file-system tree containing files and directories. create table revision ( id sha1_git primary key, date timestamptz, date_offset smallint, date_neg_utc_offset boolean, committer_date timestamptz, committer_date_offset smallint, committer_date_neg_utc_offset boolean, type revision_type not null, directory sha1_git, -- file-system tree message bytea, author bigint references person(id), committer bigint references person(id), metadata jsonb, -- extra metadata (tarball checksums, extra commit information, etc...) synthetic boolean not null default false, -- true if synthetic (cf. swh-loader-tar) object_id bigserial ); create index on revision(directory); -- either this table or the sha1_git[] column on the revision table create table revision_history ( id sha1_git references revision(id), parent_id sha1_git, parent_rank int not null default 0, -- parent position in merge commits, 0-based primary key (id, parent_rank) ); create index on revision_history(parent_id); -- The timestamps at which Software Heritage has made a visit of the given origin. create table origin_visit ( origin bigint not null references origin(id), visit bigint not null, date timestamptz not null, primary key (origin, visit) ); create index on origin_visit(date); -- The content of software origins is indexed starting from top-level pointers -- called "branches". Every time we fetch some origin we store in this table -- where the branches pointed to at fetch time. -- -- Synonyms/mappings: -- * git: ref (in the "git update-ref" sense) create table occurrence_history ( origin bigint references origin(id) not null, branch bytea not null, -- e.g., b"master" (for VCS), or b"sid" (for Debian) target sha1_git not null, -- ref target, e.g., commit id target_type object_type not null, -- ref target type object_id bigserial not null, -- short object identifier visits bigint[] not null, -- the visits where that occurrence was valid. References -- origin_visit(visit), where o_h.origin = origin_visit.origin. primary key (object_id) ); create index on occurrence_history(target, target_type); create index on occurrence_history(origin, branch); create unique index on occurrence_history(origin, branch, target, target_type); -- Materialized view of occurrence_history, storing the *current* value of each -- branch, as last seen by SWH. create table occurrence ( origin bigint references origin(id) not null, branch bytea not null, target sha1_git not null, target_type object_type not null, primary key(origin, branch) ); -- A "memorable" point in the development history of a project. -- -- Synonyms/mappings: -- * git: tag (of the annotated kind, otherwise they are just references) -- * tarball: the release version number create table release ( id sha1_git primary key, target sha1_git, target_type object_type, date timestamptz, date_offset smallint, date_neg_utc_offset boolean, name bytea, comment bytea, author bigint references person(id), synthetic boolean not null default false, -- true if synthetic (cf. swh-loader-tar) object_id bigserial ); create index on release(target, target_type); + + +-- In order to archive the content of the object storage, add +-- some tables to keep trace of what have already been archived. + +CREATE DOMAIN archive_id AS TEXT; + +CREATE TABLE archives ( + id archive_id PRIMARY KEY, + url TEXT +); + +CREATE TYPE archive_status AS ENUM ( + 'missing', + 'ongoing', + 'present' +); + +CREATE TABLE content_archive ( + content_id sha1 REFERENCES content(sha1), + archive_id archive_id REFERENCES archives(id), + status archive_status, + mtime timestamptz, + PRIMARY KEY (content_id, archive_id) +); + diff --git a/sql/upgrades/069.sql b/sql/upgrades/069.sql new file mode 100644 index 00000000..6190c88d --- /dev/null +++ b/sql/upgrades/069.sql @@ -0,0 +1,28 @@ +-- SWH DB schema upgrade +-- from_version: 68 +-- to_version: 69 +-- description: add tables for the archiver. + +insert into dbversion(version, release, description) + values(69, now(), 'Work In Progress'); + +CREATE DOMAIN archive_id AS TEXT; + +CREATE TABLE archives ( + id archive_id PRIMARY KEY, + url TEXT +); + +CREATE TYPE archive_status AS ENUM ( + 'missing', + 'ongoing', + 'present' +); + +CREATE TABLE content_archive ( + content_id sha1 REFERENCES content(sha1), + archive_id archive_id REFERENCES archives(id), + status archive_status, + mtime timestamptz, + PRIMARY KEY (content_id, archive_id) +); diff --git a/swh.storage.egg-info/PKG-INFO b/swh.storage.egg-info/PKG-INFO index 41a768eb..ed3ea4e0 100644 --- a/swh.storage.egg-info/PKG-INFO +++ b/swh.storage.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.storage -Version: 0.0.37 +Version: 0.0.38 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.storage.egg-info/SOURCES.txt b/swh.storage.egg-info/SOURCES.txt index a774771a..2dc3b4fc 100644 --- a/swh.storage.egg-info/SOURCES.txt +++ b/swh.storage.egg-info/SOURCES.txt @@ -1,126 +1,141 @@ .gitignore AUTHORS LICENSE MANIFEST.in Makefile Makefile.local README.db_testing README.dev requirements.txt setup.py version.txt bin/swh-objstorage-add-dir bin/swh-objstorage-fsck bin/swh-storage-add-dir debian/changelog debian/compat debian/control debian/copyright debian/rules debian/source/format -doc/archiver-blueprint.md +docs/archiver-blueprint.md sql/.gitignore sql/Makefile sql/TODO sql/clusters.dot sql/swh-data.sql sql/swh-func.sql sql/swh-init.sql sql/swh-schema.sql sql/bin/db-upgrade sql/bin/dot_add_content sql/doc/json sql/doc/json/.gitignore sql/doc/json/Makefile sql/doc/json/entity.lister_metadata.schema.json sql/doc/json/entity.metadata.schema.json sql/doc/json/entity_history.lister_metadata.schema.json sql/doc/json/entity_history.metadata.schema.json sql/doc/json/fetch_history.result.schema.json sql/doc/json/list_history.result.schema.json sql/doc/json/listable_entity.list_params.schema.json sql/doc/json/revision.metadata.schema.json sql/json/.gitignore sql/json/Makefile sql/json/entity.lister_metadata.schema.json sql/json/entity.metadata.schema.json sql/json/entity_history.lister_metadata.schema.json sql/json/entity_history.metadata.schema.json sql/json/fetch_history.result.schema.json sql/json/list_history.result.schema.json sql/json/listable_entity.list_params.schema.json sql/json/revision.metadata.schema.json sql/upgrades/015.sql sql/upgrades/016.sql sql/upgrades/017.sql sql/upgrades/018.sql sql/upgrades/019.sql sql/upgrades/020.sql sql/upgrades/021.sql sql/upgrades/022.sql sql/upgrades/023.sql sql/upgrades/024.sql sql/upgrades/025.sql sql/upgrades/026.sql sql/upgrades/027.sql sql/upgrades/028.sql sql/upgrades/029.sql sql/upgrades/030.sql sql/upgrades/032.sql sql/upgrades/033.sql sql/upgrades/034.sql sql/upgrades/035.sql sql/upgrades/036.sql sql/upgrades/037.sql sql/upgrades/038.sql sql/upgrades/039.sql sql/upgrades/040.sql sql/upgrades/041.sql sql/upgrades/042.sql sql/upgrades/043.sql sql/upgrades/044.sql sql/upgrades/045.sql sql/upgrades/046.sql sql/upgrades/047.sql sql/upgrades/048.sql sql/upgrades/049.sql sql/upgrades/050.sql sql/upgrades/051.sql sql/upgrades/052.sql sql/upgrades/053.sql sql/upgrades/054.sql sql/upgrades/055.sql sql/upgrades/056.sql sql/upgrades/057.sql sql/upgrades/058.sql sql/upgrades/059.sql sql/upgrades/060.sql sql/upgrades/061.sql sql/upgrades/062.sql sql/upgrades/063.sql sql/upgrades/064.sql sql/upgrades/065.sql sql/upgrades/066.sql sql/upgrades/067.sql sql/upgrades/068.sql +sql/upgrades/069.sql swh.storage.egg-info/PKG-INFO swh.storage.egg-info/SOURCES.txt swh.storage.egg-info/dependency_links.txt swh.storage.egg-info/requires.txt swh.storage.egg-info/top_level.txt swh/storage/__init__.py swh/storage/converters.py swh/storage/db.py swh/storage/exc.py -swh/storage/objstorage.py swh/storage/storage.py swh/storage/api/__init__.py swh/storage/api/client.py +swh/storage/api/common.py swh/storage/api/server.py +swh/storage/archiver/__init__.py +swh/storage/archiver/copier.py +swh/storage/archiver/director.py +swh/storage/archiver/tasks.py +swh/storage/archiver/worker.py +swh/storage/objstorage/__init__.py +swh/storage/objstorage/objstorage.py +swh/storage/objstorage/api/__init__.py +swh/storage/objstorage/api/client.py +swh/storage/objstorage/api/server.py +swh/storage/tests/manual_test_archiver.py +swh/storage/tests/server_testing.py swh/storage/tests/test_api_client.py +swh/storage/tests/test_archiver.py swh/storage/tests/test_converters.py swh/storage/tests/test_db.py swh/storage/tests/test_objstorage.py +swh/storage/tests/test_objstorage_api.py swh/storage/tests/test_storage.py utils/dump_revisions.py utils/fix_revisions_from_dump.py \ No newline at end of file diff --git a/swh.storage.egg-info/requires.txt b/swh.storage.egg-info/requires.txt index 8655d947..0ae88685 100644 --- a/swh.storage.egg-info/requires.txt +++ b/swh.storage.egg-info/requires.txt @@ -1,6 +1,8 @@ +click dateutil flask psycopg2 requests swh.core>=0.0.17 +swh.scheduler vcversioner diff --git a/swh/storage/api/client.py b/swh/storage/api/client.py index 937d7f6a..f8f9ad20 100644 --- a/swh/storage/api/client.py +++ b/swh/storage/api/client.py @@ -1,199 +1,178 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pickle - import requests from requests.exceptions import ConnectionError -from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder -from swh.storage.exc import StorageAPIError - - -def encode_data(data): - try: - return msgpack_dumps(data) - except OverflowError as e: - raise ValueError('Limits were reached. Please, check your input.\n' + - str(e)) - - -def decode_response(response): - content_type = response.headers['content-type'] - - if content_type.startswith('application/x-msgpack'): - r = msgpack_loads(response.content) - elif content_type.startswith('application/json'): - r = response.json(cls=SWHJSONDecoder) - else: - raise ValueError('Wrong content type `%s` for API response' - % content_type) - return r +from ..exc import StorageAPIError +from ..api.common import (decode_response, + encode_data_client as encode_data) class RemoteStorage(): """Proxy to a remote storage API""" def __init__(self, base_url): self.base_url = base_url self.session = requests.Session() def url(self, endpoint): return '%s%s' % (self.base_url, endpoint) def post(self, endpoint, data): try: response = self.session.post( self.url(endpoint), data=encode_data(data), headers={'content-type': 'application/x-msgpack'}, ) except ConnectionError as e: print(str(e)) raise StorageAPIError(e) # XXX: this breaks language-independence and should be # replaced by proper unserialization if response.status_code == 400: raise pickle.loads(decode_response(response)) return decode_response(response) def get(self, endpoint, data=None): try: response = self.session.get( self.url(endpoint), params=data, ) except ConnectionError as e: print(str(e)) raise StorageAPIError(e) if response.status_code == 404: return None # XXX: this breaks language-independence and should be # replaced by proper unserialization if response.status_code == 400: raise pickle.loads(decode_response(response)) else: return decode_response(response) def content_add(self, content): return self.post('content/add', {'content': content}) def content_missing(self, content, key_hash='sha1'): return self.post('content/missing', {'content': content, 'key_hash': key_hash}) def content_missing_per_sha1(self, contents): return self.post('content/missing/sha1', {'contents': contents}) def content_get(self, content): return self.post('content/data', {'content': content}) def content_find(self, content): return self.post('content/present', {'content': content}) def content_find_occurrence(self, content): return self.post('content/occurrence', {'content': content}) def directory_add(self, directories): return self.post('directory/add', {'directories': directories}) def directory_missing(self, directories): return self.post('directory/missing', {'directories': directories}) def directory_get(self, directories): return self.post('directory', dict(directories=directories)) def directory_ls(self, directory, recursive=False): return self.get('directory/ls', {'directory': directory, 'recursive': recursive}) def revision_get(self, revisions): return self.post('revision', {'revisions': revisions}) def revision_get_by(self, origin_id, branch_name, timestamp, limit=None): return self.post('revision/by', dict(origin_id=origin_id, branch_name=branch_name, timestamp=timestamp, limit=limit)) def revision_log(self, revisions, limit=None): return self.post('revision/log', {'revisions': revisions, 'limit': limit}) def revision_shortlog(self, revisions, limit=None): return self.post('revision/shortlog', {'revisions': revisions, 'limit': limit}) def revision_add(self, revisions): return self.post('revision/add', {'revisions': revisions}) def revision_missing(self, revisions): return self.post('revision/missing', {'revisions': revisions}) def release_add(self, releases): return self.post('release/add', {'releases': releases}) def release_get(self, releases): return self.post('release', {'releases': releases}) def release_get_by(self, origin_id, limit=None): return self.post('release/by', dict(origin_id=origin_id, limit=limit)) def release_missing(self, releases): return self.post('release/missing', {'releases': releases}) def object_find_by_sha1_git(self, ids): return self.post('object/find_by_sha1_git', {'ids': ids}) def occurrence_get(self, origin_id): return self.post('occurrence', {'origin_id': origin_id}) def occurrence_add(self, occurrences): return self.post('occurrence/add', {'occurrences': occurrences}) def origin_get(self, origin): return self.post('origin/get', {'origin': origin}) def origin_add_one(self, origin): return self.post('origin/add', {'origin': origin}) def person_get(self, person): return self.post('person', {'person': person}) def fetch_history_start(self, origin_id): return self.post('fetch_history/start', {'origin_id': origin_id}) def fetch_history_end(self, fetch_history_id, data): return self.post('fetch_history/end', {'fetch_history_id': fetch_history_id, 'data': data}) def fetch_history_get(self, fetch_history_id): return self.get('fetch_history', {'id': fetch_history_id}) def entity_add(self, entities): return self.post('entity/add', {'entities': entities}) def entity_get(self, uuid): return self.post('entity/get', {'uuid': uuid}) def entity_get_one(self, uuid): return self.get('entity', {'uuid': uuid}) def entity_get_from_lister_metadata(self, entities): return self.post('entity/from_lister_metadata', {'entities': entities}) def stat_counters(self): return self.get('stat/counters') def directory_entry_get_by_path(self, directory, paths): return self.post('directory/path', dict(directory=directory, paths=paths)) diff --git a/swh/storage/api/common.py b/swh/storage/api/common.py new file mode 100644 index 00000000..328d8260 --- /dev/null +++ b/swh/storage/api/common.py @@ -0,0 +1,69 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import json +import pickle + +from flask import Request, Response + +from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder + + +class BytesRequest(Request): + """Request with proper escaping of arbitrary byte sequences.""" + encoding = 'utf-8' + encoding_errors = 'surrogateescape' + + +def encode_data_server(data): + return Response( + msgpack_dumps(data), + mimetype='application/x-msgpack', + ) + + +def encode_data_client(data): + try: + return msgpack_dumps(data) + except OverflowError as e: + raise ValueError('Limits were reached. Please, check your input.\n' + + str(e)) + + +def decode_request(request): + content_type = request.mimetype + data = request.get_data() + + if content_type == 'application/x-msgpack': + r = msgpack_loads(data) + elif content_type == 'application/json': + r = json.loads(data, cls=SWHJSONDecoder) + else: + raise ValueError('Wrong content type `%s` for API request' + % content_type) + + return r + + +def decode_response(response): + content_type = response.headers['content-type'] + + if content_type.startswith('application/x-msgpack'): + r = msgpack_loads(response.content) + elif content_type.startswith('application/json'): + r = response.json(cls=SWHJSONDecoder) + else: + raise ValueError('Wrong content type `%s` for API response' + % content_type) + + return r + + +def error_handler(exception, encoder): + # XXX: this breaks language-independence and should be + # replaced by proper serialization of errors + response = encoder(pickle.dumps(exception)) + response.status_code = 400 + return response diff --git a/swh/storage/api/server.py b/swh/storage/api/server.py index bce183e7..324e9dd1 100644 --- a/swh/storage/api/server.py +++ b/swh/storage/api/server.py @@ -1,278 +1,255 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import logging -import pickle +import click -from flask import Flask, Request, Response, g, request +from flask import Flask, g, request from swh.core import config -from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder from swh.storage import Storage +from swh.storage.api.common import (BytesRequest, decode_request, + error_handler, + encode_data_server as encode_data) DEFAULT_CONFIG = { 'db': ('str', 'dbname=softwareheritage-dev'), 'storage_base': ('str', '/tmp/swh-storage/test'), } -class BytesRequest(Request): - """Request with proper escaping of arbitrary byte sequences.""" - encoding = 'utf-8' - encoding_errors = 'surrogateescape' - - app = Flask(__name__) app.request_class = BytesRequest -def encode_data(data): - return Response( - msgpack_dumps(data), - mimetype='application/x-msgpack', - ) - - -def decode_request(request): - content_type = request.mimetype - data = request.get_data() - - if content_type == 'application/x-msgpack': - r = msgpack_loads(data) - elif content_type == 'application/json': - r = json.loads(data, cls=SWHJSONDecoder) - else: - raise ValueError('Wrong content type `%s` for API request' - % content_type) - - return r - - @app.errorhandler(Exception) -def error_handler(exception): - # XXX: this breaks language-independence and should be - # replaced by proper serialization of errors - response = encode_data(pickle.dumps(exception)) - response.status_code = 400 - return response +def my_error_handler(exception): + return error_handler(exception, encode_data) @app.before_request def before_request(): g.storage = Storage(app.config['db'], app.config['storage_base']) @app.route('/') def index(): return 'Hello' @app.route('/content/missing', methods=['POST']) def content_missing(): return encode_data(g.storage.content_missing(**decode_request(request))) @app.route('/content/missing/sha1', methods=['POST']) def content_missing_per_sha1(): return encode_data(g.storage.content_missing_per_sha1( **decode_request(request))) @app.route('/content/present', methods=['POST']) def content_find(): return encode_data(g.storage.content_find(**decode_request(request))) @app.route('/content/occurrence', methods=['POST']) def content_find_occurrence(): res = g.storage.content_find_occurrence(**decode_request(request)) return encode_data(res) @app.route('/content/add', methods=['POST']) def content_add(): return encode_data(g.storage.content_add(**decode_request(request))) @app.route('/content/data', methods=['POST']) def content_get(): return encode_data(g.storage.content_get(**decode_request(request))) @app.route('/directory', methods=['POST']) def directory_get(): return encode_data(g.storage.directory_get(**decode_request(request))) @app.route('/directory/missing', methods=['POST']) def directory_missing(): return encode_data(g.storage.directory_missing(**decode_request(request))) @app.route('/directory/add', methods=['POST']) def directory_add(): return encode_data(g.storage.directory_add(**decode_request(request))) @app.route('/directory/path', methods=['POST']) def directory_entry_get_by_path(): return encode_data(g.storage.directory_entry_get_by_path( **decode_request(request))) @app.route('/directory/ls', methods=['GET']) def directory_ls(): dir = request.args['directory'].encode('utf-8', 'surrogateescape') rec = json.loads(request.args.get('recursive', 'False').lower()) return encode_data(g.storage.directory_ls(dir, recursive=rec)) @app.route('/revision/add', methods=['POST']) def revision_add(): return encode_data(g.storage.revision_add(**decode_request(request))) @app.route('/revision', methods=['POST']) def revision_get(): return encode_data(g.storage.revision_get(**decode_request(request))) @app.route('/revision/by', methods=['POST']) def revision_get_by(): return encode_data(g.storage.revision_get_by(**decode_request(request))) @app.route('/revision/log', methods=['POST']) def revision_log(): return encode_data(g.storage.revision_log(**decode_request(request))) @app.route('/revision/shortlog', methods=['POST']) def revision_shortlog(): return encode_data(g.storage.revision_shortlog(**decode_request(request))) @app.route('/revision/missing', methods=['POST']) def revision_missing(): return encode_data(g.storage.revision_missing(**decode_request(request))) @app.route('/release/add', methods=['POST']) def release_add(): return encode_data(g.storage.release_add(**decode_request(request))) @app.route('/release', methods=['POST']) def release_get(): return encode_data(g.storage.release_get(**decode_request(request))) @app.route('/release/by', methods=['POST']) def release_get_by(): return encode_data(g.storage.release_get_by(**decode_request(request))) @app.route('/release/missing', methods=['POST']) def release_missing(): return encode_data(g.storage.release_missing(**decode_request(request))) @app.route('/object/find_by_sha1_git', methods=['POST']) def object_find_by_sha1_git(): return encode_data(g.storage.object_find_by_sha1_git( **decode_request(request))) @app.route('/occurrence', methods=['POST']) def occurrence_get(): return encode_data(g.storage.occurrence_get(**decode_request(request))) @app.route('/occurrence/add', methods=['POST']) def occurrence_add(): return encode_data(g.storage.occurrence_add(**decode_request(request))) @app.route('/origin/get', methods=['POST']) def origin_get(): return encode_data(g.storage.origin_get(**decode_request(request))) @app.route('/origin/add', methods=['POST']) def origin_add_one(): return encode_data(g.storage.origin_add_one(**decode_request(request))) @app.route('/person', methods=['POST']) def person_get(): return encode_data(g.storage.person_get(**decode_request(request))) @app.route('/fetch_history', methods=['GET']) def fetch_history_get(): return encode_data(g.storage.fetch_history_get(request.args['id'])) @app.route('/fetch_history/start', methods=['POST']) def fetch_history_start(): return encode_data( g.storage.fetch_history_start(**decode_request(request))) @app.route('/fetch_history/end', methods=['POST']) def fetch_history_end(): return encode_data( g.storage.fetch_history_end(**decode_request(request))) @app.route('/entity/add', methods=['POST']) def entity_add(): return encode_data( g.storage.entity_add(**decode_request(request))) @app.route('/entity/get', methods=['POST']) def entity_get(): return encode_data( g.storage.entity_get(**decode_request(request))) @app.route('/entity', methods=['GET']) def entity_get_one(): return encode_data(g.storage.entity_get_one(request.args['uuid'])) @app.route('/entity/from_lister_metadata', methods=['POST']) def entity_from_lister_metadata(): return encode_data( g.storage.entity_get_from_lister_metadata(**decode_request(request))) @app.route('/stat/counters', methods=['GET']) def stat_counters(): return encode_data(g.storage.stat_counters()) def run_from_webserver(environ, start_response): """Run the WSGI app from the webserver, loading the configuration.""" config_path = '/etc/softwareheritage/storage/storage.ini' app.config.update(config.read(config_path, DEFAULT_CONFIG)) handler = logging.StreamHandler() app.logger.addHandler(handler) return app(environ, start_response) -if __name__ == '__main__': - import sys +@click.command() +@click.argument('config-path', required=1) +@click.option('--host', default='0.0.0.0', help="Host to run the server") +@click.option('--port', default=5000, type=click.INT, + help="Binding port of the server") +@click.option('--debug/--nodebug', default=True, + help="Indicates if the server should run in debug mode") +def launch(config_path, host, port, debug): + app.config.update(config.read(config_path, DEFAULT_CONFIG)) + app.run(host, port=int(port), debug=bool(debug)) - app.config.update(config.read(sys.argv[1], DEFAULT_CONFIG)) - host = sys.argv[2] if len(sys.argv) >= 3 else '127.0.0.1' - app.run(host, debug=True) +if __name__ == '__main__': + launch() diff --git a/swh/storage/archiver/__init__.py b/swh/storage/archiver/__init__.py new file mode 100644 index 00000000..3507b4ea --- /dev/null +++ b/swh/storage/archiver/__init__.py @@ -0,0 +1,3 @@ +from .director import ArchiverDirector # NOQA +from .worker import ArchiverWorker # NOQA +from .copier import ArchiverCopier # NOQA diff --git a/swh/storage/archiver/copier.py b/swh/storage/archiver/copier.py new file mode 100644 index 00000000..988cfaf9 --- /dev/null +++ b/swh/storage/archiver/copier.py @@ -0,0 +1,60 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from swh.core import hashutil +from ..objstorage.api.client import RemoteObjStorage + + +class ArchiverCopier(): + """ This archiver copy some files into a remote objstorage + in order to get a backup. + + Attributes: + content_ids: A list of sha1's that represents the content this copier + has to archive. + server (RemoteArchive): The remote object storage that is used to + backup content. + master_storage (Storage): The master storage that contains the data + the copier needs to archive. + """ + def __init__(self, destination, content, master_storage): + """ Create a Copier for the archiver + + Args: + destination: A tuple (archive_name, archive_url) that represents a + remote object storage as in the 'archives' table. + content: A list of sha1 that represents the content this copier + have to archive. + master_storage (Storage): The master storage of the system that + contains the data to archive. + """ + _name, self.url = destination + self.content_ids = content + self.server = RemoteObjStorage(self.url) + self.master_storage = master_storage + + def run(self): + """ Do the copy on the backup storage. + + Run the archiver copier in order to copy the required content + into the current destination. + The content which corresponds to the sha1 in self.content_ids + will be fetched from the master_storage and then copied into + the backup object storage. + + Returns: + A boolean that indicates if the whole content have been copied. + """ + self.content_ids = list(map(lambda x: hashutil.hex_to_hash(x[2:]), + self.content_ids)) + contents = self.master_storage.content_get(self.content_ids) + try: + for content in contents: + content_data = content['data'] + self.server.content_add(content_data) + except: + return False + + return True diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py new file mode 100644 index 00000000..74003f0d --- /dev/null +++ b/swh/storage/archiver/director.py @@ -0,0 +1,243 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import swh +import logging +import click + +from datetime import datetime + +from swh.core import hashutil, config +from swh.scheduler.celery_backend.config import app +from . import tasks # NOQA + + +DEFAULT_CONFIG = { + 'objstorage_path': ('str', '/tmp/swh-storage/objects'), + 'batch_max_size': ('int', 50), + 'archival_max_age': ('int', 3600), + 'retention_policy': ('int', 2), + 'asynchronous': ('bool', True), + + 'dbconn': ('str', 'dbname=softwareheritage-dev user=guest') +} + +task_name = 'swh.storage.archiver.tasks.SWHArchiverTask' + +logger = logging.getLogger() + + +class ArchiverDirector(): + """Process the files in order to know which one is needed as backup. + + The archiver director processes the files in the local storage in order + to know which one needs archival and it delegates this task to + archiver workers. + + Attributes: + master_storage: the local storage of the master server. + slave_storages: Iterable of remote obj storages to the slaves servers + used for backup. + config: Archiver_configuration. A dictionary that must contain + the following keys. + objstorage_path (string): the path of the objstorage of the + master. + batch_max_size (int): The number of content items that can be + given to the same archiver worker. + archival_max_age (int): Delay given to the worker to copy all + the files in a given batch. + retention_policy (int): Required number of copies for the + content to be considered safe. + asynchronous (boolean): Indicate whenever the archival should + run in asynchronous mode or not. + """ + + def __init__(self, db_conn, config): + """ Constructor of the archiver director. + + Args: + db_conn: db_conn: Either a libpq connection string, + or a psycopg2 connection. + config: Archiver_configuration. A dictionary that must contains + the following keys. + objstorage_path (string): the path of the objstorage of the + master. + batch_max_size (int): The number of content items that can be + given to the same archiver worker. + archival_max_age (int): Delay given to the worker to copy all + the files in a given batch. + retention_policy (int): Required number of copies for the + content to be considered safe. + asynchronous (boolean): Indicate whenever the archival should + run in asynchronous mode or not. + """ + # Get the local storage of the master and remote ones for the slaves. + self.master_storage_args = [db_conn, config['objstorage_path']] + master_storage = swh.storage.get_storage('local_storage', + self.master_storage_args) + slaves = { + id: url + for id, url + in master_storage.db.archive_ls() + } + + # TODO Database should be initialized somehow before going in + # production. For now, assumes that the database contains + # datas for all the current content. + + self.master_storage = master_storage + self.slave_storages = slaves + self.config = config + + def run(self): + """ Run the archiver director. + + The archiver director will check all the contents of the archiver + database and do the required backup jobs. + """ + if self.config['asynchronous']: + run_fn = self.run_async_worker + else: + run_fn = self.run_sync_worker + for batch in self.get_unarchived_content(): + run_fn(batch) + + def run_async_worker(self, batch): + """ Produce a worker that will be added to the task queue. + """ + task = app.tasks[task_name] + task.delay(batch, self.master_storage_args, + self.slave_storages, self.config['retention_policy']) + + def run_sync_worker(self, batch): + """ Run synchronously a worker on the given batch. + """ + task = app.tasks[task_name] + task(batch, self.master_storage_args, + self.slave_storages, self.config) + + def get_unarchived_content(self): + """ get all the contents that needs to be archived. + + Yields: + A batch of contents. Batches are dictionaries which associates + a content id to the data about servers that contains it or not. + + {'id1': + {'present': [('slave1', 'slave1_url')], + 'missing': [('slave2', 'slave2_url'), + ('slave3', 'slave3_url')] + }, + 'id2': + {'present': [], + 'missing': [ + ('slave1', 'slave1_url'), + ('slave2', 'slave2_url'), + ('slave3', 'slave3_url') + ]} + } + + Where keys (idX) are sha1 of the content and (slaveX, slaveX_url) + are ids and urls of the storage slaves. + + At least all the content that don't have enough copies on the + backups servers are distributed into these batches. + """ + # Get the data about each content referenced into the archiver. + missing_copy = {} + for content_id in self.master_storage.db.content_archive_ls(): + db_content_id = '\\x' + hashutil.hash_to_hex(content_id[0]) + + # Fetch the datas about archival status of the content + backups = self.master_storage.db.content_archive_get( + content=db_content_id + ) + for _content_id, server_id, status, mtime in backups: + virtual_status = self.get_virtual_status(status, mtime) + server_data = (server_id, self.slave_storages[server_id]) + + missing_copy.setdefault( + db_content_id, + {'present': [], 'missing': []} + ).setdefault(virtual_status, []).append(server_data) + + # Check the content before archival. + try: + self.master_storage.objstorage.check(content_id[0]) + except Exception as e: + # Exception can be Error or ObjNotFoundError. + logger.error(e) + # TODO Do something to restore the content? + + if len(missing_copy) >= self.config['batch_max_size']: + yield missing_copy + missing_copy = {} + + if len(missing_copy) > 0: + yield missing_copy + + def get_virtual_status(self, status, mtime): + """ Compute the virtual presence of a content. + + If the status is ongoing but the time is not elasped, the archiver + consider it will be present in the futur, and so consider it as + present. + However, if the time is elasped, the copy may have failed, so consider + the content as missing. + + Arguments: + status (string): One of ('present', 'missing', 'ongoing'). The + status of the content. + mtime (datetime): Time at which the content have been updated for + the last time. + + Returns: + The virtual status of the studied content, which is 'present' or + 'missing'. + + Raises: + ValueError: if the status is not one 'present', 'missing' + or 'ongoing' + """ + if status in ('present', 'missing'): + return status + + # If the status is 'ongoing' but there is still time, another worker + # may still be on the task. + if status == 'ongoing': + mtime = mtime.replace(tzinfo=None) + elapsed = (datetime.now() - mtime).total_seconds() + if elapsed <= self.config['archival_max_age']: + return 'present' + else: + return 'missing' + else: + raise ValueError("status must be either 'present', 'missing' " + "or 'ongoing'") + + +@click.command() +@click.argument('config-path', required=1) +@click.option('--dbconn', default=DEFAULT_CONFIG['dbconn'][1], + help="Connection string for the database") +@click.option('--async/--sync', default=DEFAULT_CONFIG['asynchronous'][1], + help="Indicates if the archiver should run asynchronously") +def launch(config_path, dbconn, async): + # The configuration have following priority : + # command line > file config > default config + cl_config = { + 'dbconn': dbconn, + 'asynchronous': async + } + conf = config.read(config_path, DEFAULT_CONFIG) + conf.update(cl_config) + # Create connection data and run the archiver. + archiver = ArchiverDirector(conf['dbconn'], conf) + logger.info("Starting an archival at", datetime.now()) + archiver.run() + + +if __name__ == '__main__': + launch() diff --git a/swh/storage/archiver/tasks.py b/swh/storage/archiver/tasks.py new file mode 100644 index 00000000..0b7ce61e --- /dev/null +++ b/swh/storage/archiver/tasks.py @@ -0,0 +1,20 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from swh.scheduler.task import Task +from .worker import ArchiverWorker + + +class SWHArchiverTask(Task): + """ Main task that archive a batch of content. + """ + task_queue = 'swh_storage_archive_worker' + + def run(self, batch, master_storage_args, + slave_storages, config): + aw = ArchiverWorker(batch, master_storage_args, + slave_storages, config) + if aw.run(): + self.log("Successful backup for a batch of size %s" % len(batch)) diff --git a/swh/storage/archiver/worker.py b/swh/storage/archiver/worker.py new file mode 100644 index 00000000..8fda96bc --- /dev/null +++ b/swh/storage/archiver/worker.py @@ -0,0 +1,239 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import random +import logging + +from .copier import ArchiverCopier +from .. import get_storage + +from datetime import datetime + +logger = logging.getLogger() + + +class ArchiverWorker(): + """ Do the required backups on a given batch of contents. + + Process the content of a content batch in order to do the needed backups on + the slaves servers. + + Attributes: + batch: The content this worker has to archive, which is a dictionary + that associates a content's sha1 id to the list of servers where + the content is present or missing + (see ArchiverDirector::get_unarchived_content). + master_storage_args: The connection argument to initialize the + master storage with the db connection url & the object storage + path. + slave_storages: A map that associates server_id to the remote server. + config: Archiver_configuration. A dictionary that must contains + the following keys. + objstorage_path (string): the path of the objstorage of the + master. + batch_max_size (int): The number of content items that can be + given to the same archiver worker. + archival_max_age (int): Delay given to the worker to copy all + the files in a given batch. + retention_policy (int): Required number of copies for the + content to be considered safe. + asynchronous (boolean): Indicate whenever the archival should + run in asynchronous mode or not. + """ + def __init__(self, batch, master_storage_args, slave_storages, config): + """ Constructor of the ArchiverWorker class. + + Args: + batch: A batch of content, which is a dictionary that associates + a content's sha1 id to the list of servers where the content + is present. + master_storage_args: The master storage arguments. + slave_storages: A map that associates server_id to the remote + server. + config: Archiver_configuration. A dictionary that must contains + the following keys. + objstorage_path (string): the path of the objstorage of the + master. + batch_max_size (int): The number of content items that can be + given to the same archiver worker. + archival_max_age (int): Delay given to the worker to copy all + the files in a given batch. + retention_policy (int): Required number of copies for the + content to be considered safe. + asynchronous (boolean): Indicate whenever the archival should + run in asynchronous mode or not. + """ + self.batch = batch + self.master_storage = get_storage('local_storage', master_storage_args) + self.slave_storages = slave_storages + self.config = config + + def __choose_backup_servers(self, allowed_storage, backup_number): + """ Choose the slave servers for archival. + + Choose the given amount of servers among those which don't already + contain a copy of the content. + + Args: + allowed_storage: servers when the content is not already present. + backup_number (int): The number of servers we have to choose in + order to fullfill the objective. + """ + # In case there is not enough backup servers to get all the backups + # we need, just do our best. + # TODO such situation can only be caused by an incorrect configuration + # setting. Do a verification previously. + backup_number = min(backup_number, len(allowed_storage)) + + # TODO Find a better (or a good) policy to choose the backup servers. + # The random choice should be equivalently distributed between + # servers for a great amount of data, but don't take care of servers + # capacities. + return random.sample(allowed_storage, backup_number) + + def __get_archival_status(self, content_id, server): + """ Get the archival status of the required content. + + Attributes: + content_id (string): Sha1 of the content. + server: Tuple (archive_id, archive_url) of the archive server. + Returns: + A dictionary that contains all the required data : 'content_id', + 'archive_id', 'status', and 'mtime' + """ + t, = list( + self.master_storage.db.content_archive_get(content_id, server[0]) + ) + return { + 'content_id': t[0], + 'archive_id': t[1], + 'status': t[2], + 'mtime': t[3] + } + + def __content_archive_update(self, content_id, archive_id, + new_status=None): + """ Update the status of a archive content and set it's mtime to now() + + Change the last modification time of an archived content and change + its status to the given one. + + Args: + content_id (string): The content id. + archive_id (string): The id of the concerned archive. + new_status (string): One of missing, ongoing or present, this + status will replace the previous one. If not given, the + function only changes the mtime of the content. + """ + self.master_storage.db.content_archive_update( + content_id, + archive_id, + new_status + ) + + def need_archival(self, content, destination): + """ Indicates whenever a content need archivage. + + Filter function that returns True if a given content + still require to be archived. + + Args: + content (str): Sha1 of a content. + destination: Tuple (archive id, archive url). + """ + archival_status = self.__get_archival_status( + content, + destination + ) + status = archival_status['status'] + mtime = archival_status['mtime'] + # If the archive is already present, no need to backup. + if status == 'present': + return False + # If the content is ongoing but still have time, there is + # another worker working on this content. + elif status == 'ongoing': + mtime = mtime.replace(tzinfo=None) + elapsed = (datetime.now() - mtime).total_seconds() + if elapsed <= self.config['archival_max_age']: + return False + return True + + def sort_content_by_archive(self): + """ Create a map {archive_server -> list of content) + + Create a mapping that associate to a archive server all the + contents that needs to be archived in it by the current worker. + + The map is in the form of : + { + (archive_1, archive_1_url): [content1, content2, content_3] + (archive_2, archive_2_url): [content1, content3] + } + + Returns: + The created mapping. + """ + slaves_copy = {} + for content_id in self.batch: + # Choose some servers to upload the content among the missing ones. + server_data = self.batch[content_id] + nb_present = len(server_data['present']) + nb_backup = self.config['retention_policy'] - nb_present + backup_servers = self.__choose_backup_servers( + server_data['missing'], + nb_backup + ) + # Fill the map destination -> content to upload + for server in backup_servers: + slaves_copy.setdefault(server, []).append(content_id) + return slaves_copy + + def run(self): + """ Do the task expected from the archiver worker. + + Process the content in the batch, ensure that the elements still need + an archival, and spawn copiers to copy files in each destinations. + """ + # Get a map (archive -> [contents]) + slaves_copy = self.sort_content_by_archive() + + # At this point, re-check the archival status in order to know if the + # job have been done by another worker. + for destination in slaves_copy: + # list() is needed because filter's result will be consumed twice. + slaves_copy[destination] = list(filter( + lambda content_id: self.need_archival(content_id, destination), + slaves_copy[destination] + )) + for content_id in slaves_copy[destination]: + self.__content_archive_update(content_id, destination[0], + new_status='ongoing') + + # Spawn a copier for each destination + for destination in slaves_copy: + try: + self.run_copier(destination, slaves_copy[destination]) + except: + logger.error('Unable to copy a batch to %s' % destination) + + def run_copier(self, destination, contents): + """ Run a copier in order to archive the given contents + + Upload the given contents to the given archive. + If the process fail, the whole content is considered uncopied + and remains 'ongoing', waiting to be rescheduled as there is + a delay. + + Attributes: + destination: Tuple (archive_id, archive_url) of the destination. + contents: List of contents to archive. + """ + ac = ArchiverCopier(destination, contents, self.master_storage) + if ac.run(): + # Once the archival complete, update the database. + for content_id in contents: + self.__content_archive_update(content_id, destination[0], + new_status='present') diff --git a/swh/storage/converters.py b/swh/storage/converters.py index 4f82550c..9bf2a35a 100644 --- a/swh/storage/converters.py +++ b/swh/storage/converters.py @@ -1,360 +1,360 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import codecs import datetime import numbers DEFAULT_AUTHOR = { 'fullname': None, 'name': None, 'email': None, } DEFAULT_DATE = { 'timestamp': None, 'offset': 0, 'neg_utc_offset': None, } def backslashescape_errors(exception): if isinstance(exception, UnicodeDecodeError): bad_data = exception.object[exception.start:exception.end] escaped = ''.join(r'\x%02x' % x for x in bad_data) return escaped, exception.end return codecs.backslashreplace_errors(exception) codecs.register_error('backslashescape', backslashescape_errors) def decode_with_escape(value): """Decode a bytestring as utf-8, escaping the bytes of invalid utf-8 sequences as \\x. We also escape NUL bytes as they are invalid in JSON strings. """ # escape backslashes value = value.replace(b'\\', b'\\\\') value = value.replace(b'\x00', b'\\x00') return value.decode('utf-8', 'backslashescape') def encode_with_unescape(value): """Encode an unicode string containing \\x backslash escapes""" slices = [] start = 0 odd_backslashes = False i = 0 while i < len(value): if value[i] == '\\': odd_backslashes = not odd_backslashes else: if odd_backslashes: if value[i] != 'x': raise ValueError('invalid escape for %r at position %d' % (value, i-1)) slices.append( value[start:i-1].replace('\\\\', '\\').encode('utf-8') ) slices.append(bytes.fromhex(value[i+1:i+3])) odd_backslashes = False start = i = i + 3 continue i += 1 slices.append( value[start:i].replace('\\\\', '\\').encode('utf-8') ) return b''.join(slices) def author_to_db(author): """Convert a swh-model author to its DB representation. Args: a swh-model compatible author Returns: a dict containing three keys: author, fullname and email """ if author is None: return DEFAULT_AUTHOR return author def db_to_author(id, fullname, name, email): """Convert the DB representation of an author to a swh-model author. Args: id (long): the author's identifier fullname (bytes): the author's fullname name (bytes): the author's name email (bytes): the author's email Returns: a dict with four keys: id, fullname, name and email, or None if the id is None """ if id is None: return None return { 'id': id, 'fullname': fullname, 'name': name, 'email': email, } def git_headers_to_db(git_headers): """Convert git headers to their database representation. We convert the bytes to unicode by decoding them into utf-8 and replacing invalid utf-8 sequences with backslash escapes. """ ret = [] for key, values in git_headers: if isinstance(values, list): ret.append([key, [decode_with_escape(value) for value in values]]) else: ret.append([key, decode_with_escape(values)]) return ret def db_to_git_headers(db_git_headers): ret = [] for key, values in db_git_headers: if isinstance(values, list): ret.append([key, [encode_with_unescape(value) for value in values]]) else: ret.append([key, encode_with_unescape(values)]) return ret def db_to_date(date, offset, neg_utc_offset): """Convert the DB representation of a date to a swh-model compatible date. Args: date (datetime.datetime): a date pulled out of the database offset (int): an integer number of minutes representing an UTC offset neg_utc_offset (boolean): whether an utc offset is negative Returns: a dict with three keys: timestamp: a timestamp from UTC offset: the number of minutes since UTC negative_utc: whether a null UTC offset is negative """ if date is None: return None return { 'timestamp': date.timestamp(), 'offset': offset, 'negative_utc': neg_utc_offset, } def date_to_db(date_offset): """Convert a swh-model date_offset to its DB representation. Args: a swh-model compatible date_offset Returns: a dict with three keys: timestamp: a date in ISO format offset: the UTC offset in minutes neg_utc_offset: a boolean indicating whether a null offset is negative or positive. """ if date_offset is None: return DEFAULT_DATE if isinstance(date_offset, numbers.Real): date_offset = datetime.datetime.fromtimestamp(date_offset, tz=datetime.timezone.utc) if isinstance(date_offset, datetime.datetime): timestamp = date_offset utcoffset = date_offset.utcoffset() offset = int(utcoffset.total_seconds()) // 60 neg_utc_offset = False if offset == 0 else None else: if isinstance(date_offset['timestamp'], numbers.Real): timestamp = datetime.datetime.fromtimestamp( date_offset['timestamp'], tz=datetime.timezone.utc) else: timestamp = date_offset['timestamp'] offset = date_offset['offset'] neg_utc_offset = date_offset.get('negative_utc', None) return { 'timestamp': timestamp.isoformat(), 'offset': offset, 'neg_utc_offset': neg_utc_offset, } def revision_to_db(revision): """Convert a swh-model revision to its database representation. """ author = author_to_db(revision['author']) date = date_to_db(revision['date']) committer = author_to_db(revision['committer']) committer_date = date_to_db(revision['committer_date']) metadata = revision['metadata'] - if metadata and 'extra_git_headers' in metadata: + if metadata and 'extra_headers' in metadata: metadata = metadata.copy() - extra_git_headers = git_headers_to_db(metadata['extra_git_headers']) - metadata['extra_git_headers'] = extra_git_headers + extra_headers = git_headers_to_db(metadata['extra_headers']) + metadata['extra_headers'] = extra_headers return { 'id': revision['id'], 'author_fullname': author['fullname'], 'author_name': author['name'], 'author_email': author['email'], 'date': date['timestamp'], 'date_offset': date['offset'], 'date_neg_utc_offset': date['neg_utc_offset'], 'committer_fullname': committer['fullname'], 'committer_name': committer['name'], 'committer_email': committer['email'], 'committer_date': committer_date['timestamp'], 'committer_date_offset': committer_date['offset'], 'committer_date_neg_utc_offset': committer_date['neg_utc_offset'], 'type': revision['type'], 'directory': revision['directory'], 'message': revision['message'], 'metadata': metadata, 'synthetic': revision['synthetic'], 'parents': [ { 'id': revision['id'], 'parent_id': parent, 'parent_rank': i, } for i, parent in enumerate(revision['parents']) ], } def db_to_revision(db_revision): """Convert a database representation of a revision to its swh-model representation.""" author = db_to_author( db_revision['author_id'], db_revision['author_fullname'], db_revision['author_name'], db_revision['author_email'], ) date = db_to_date( db_revision['date'], db_revision['date_offset'], db_revision['date_neg_utc_offset'], ) committer = db_to_author( db_revision['committer_id'], db_revision['committer_fullname'], db_revision['committer_name'], db_revision['committer_email'], ) committer_date = db_to_date( db_revision['committer_date'], db_revision['committer_date_offset'], db_revision['committer_date_neg_utc_offset'] ) metadata = db_revision['metadata'] - if metadata and 'extra_git_headers' in metadata: - extra_git_headers = db_to_git_headers(metadata['extra_git_headers']) - metadata['extra_git_headers'] = extra_git_headers + if metadata and 'extra_headers' in metadata: + extra_headers = db_to_git_headers(metadata['extra_headers']) + metadata['extra_headers'] = extra_headers parents = [] if 'parents' in db_revision: for parent in db_revision['parents']: if parent: parents.append(parent) return { 'id': db_revision['id'], 'author': author, 'date': date, 'committer': committer, 'committer_date': committer_date, 'type': db_revision['type'], 'directory': db_revision['directory'], 'message': db_revision['message'], 'metadata': metadata, 'synthetic': db_revision['synthetic'], 'parents': parents, } def release_to_db(release): """Convert a swh-model release to its database representation. """ author = author_to_db(release['author']) date = date_to_db(release['date']) return { 'id': release['id'], 'author_fullname': author['fullname'], 'author_name': author['name'], 'author_email': author['email'], 'date': date['timestamp'], 'date_offset': date['offset'], 'date_neg_utc_offset': date['neg_utc_offset'], 'name': release['name'], 'target': release['target'], 'target_type': release['target_type'], 'comment': release['message'], 'synthetic': release['synthetic'], } def db_to_release(db_release): """Convert a database representation of a release to its swh-model representation. """ author = db_to_author( db_release['author_id'], db_release['author_fullname'], db_release['author_name'], db_release['author_email'], ) date = db_to_date( db_release['date'], db_release['date_offset'], db_release['date_neg_utc_offset'] ) return { 'author': author, 'date': date, 'id': db_release['id'], 'name': db_release['name'], 'message': db_release['comment'], 'synthetic': db_release['synthetic'], 'target': db_release['target'], 'target_type': db_release['target_type'], } diff --git a/swh/storage/db.py b/swh/storage/db.py index 08597a66..14fb23f7 100644 --- a/swh/storage/db.py +++ b/swh/storage/db.py @@ -1,626 +1,716 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import binascii import datetime import functools import json import psycopg2 import psycopg2.extras import tempfile from contextlib import contextmanager TMP_CONTENT_TABLE = 'tmp_content' psycopg2.extras.register_uuid() def stored_procedure(stored_proc): """decorator to execute remote stored procedure, specified as argument Generally, the body of the decorated function should be empty. If it is not, the stored procedure will be executed first; the function body then. """ def wrap(meth): @functools.wraps(meth) def _meth(self, *args, **kwargs): cur = kwargs.get('cur', None) self._cursor(cur).execute('SELECT %s()' % stored_proc) meth(self, *args, **kwargs) return _meth return wrap def jsonize(value): """Convert a value to a psycopg2 JSON object if necessary""" if isinstance(value, dict): return psycopg2.extras.Json(value) return value def entry_to_bytes(entry): """Convert an entry coming from the database to bytes""" if isinstance(entry, memoryview): return entry.tobytes() if isinstance(entry, list): return [entry_to_bytes(value) for value in entry] return entry def line_to_bytes(line): """Convert a line coming from the database to bytes""" if isinstance(line, dict): return {k: entry_to_bytes(v) for k, v in line.items()} return line.__class__(entry_to_bytes(entry) for entry in line) def cursor_to_bytes(cursor): """Yield all the data from a cursor as bytes""" yield from (line_to_bytes(line) for line in cursor) class Db: """Proxy to the SWH DB, with wrappers around stored procedures """ @classmethod def connect(cls, *args, **kwargs): """factory method to create a DB proxy Accepts all arguments of psycopg2.connect; only some specific possibilities are reported below. Args: connstring: libpq2 connection string """ conn = psycopg2.connect(*args, **kwargs) return cls(conn) def _cursor(self, cur_arg): """get a cursor: from cur_arg if given, or a fresh one otherwise meant to avoid boilerplate if/then/else in methods that proxy stored procedures """ if cur_arg is not None: return cur_arg # elif self.cur is not None: # return self.cur else: return self.conn.cursor() def __init__(self, conn): """create a DB proxy Args: conn: psycopg2 connection to the SWH DB """ self.conn = conn @contextmanager def transaction(self): """context manager to execute within a DB transaction Yields: a psycopg2 cursor """ with self.conn.cursor() as cur: try: yield cur self.conn.commit() except: if not self.conn.closed: self.conn.rollback() raise def mktemp(self, tblname, cur=None): self._cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,)) def mktemp_dir_entry(self, entry_type, cur=None): self._cursor(cur).execute('SELECT swh_mktemp_dir_entry(%s)', (('directory_entry_%s' % entry_type),)) @stored_procedure('swh_mktemp_revision') def mktemp_revision(self, cur=None): pass @stored_procedure('swh_mktemp_release') def mktemp_release(self, cur=None): pass @stored_procedure('swh_mktemp_occurrence_history') def mktemp_occurrence_history(self, cur=None): pass @stored_procedure('swh_mktemp_entity_lister') def mktemp_entity_lister(self, cur=None): pass @stored_procedure('swh_mktemp_entity_history') def mktemp_entity_history(self, cur=None): pass @stored_procedure('swh_mktemp_bytea') def mktemp_bytea(self, cur=None): pass def copy_to(self, items, tblname, columns, cur=None, item_cb=None): def escape(data): if data is None: return '' if isinstance(data, bytes): return '\\x%s' % binascii.hexlify(data).decode('ascii') elif isinstance(data, str): return '"%s"' % data.replace('"', '""') elif isinstance(data, datetime.datetime): # We escape twice to make sure the string generated by # isoformat gets escaped return escape(data.isoformat()) elif isinstance(data, dict): return escape(json.dumps(data)) elif isinstance(data, list): return escape("{%s}" % ','.join(escape(d) for d in data)) elif isinstance(data, psycopg2.extras.Range): # We escape twice here too, so that we make sure # everything gets passed to copy properly return escape( '%s%s,%s%s' % ( '[' if data.lower_inc else '(', '-infinity' if data.lower_inf else escape(data.lower), 'infinity' if data.upper_inf else escape(data.upper), ']' if data.upper_inc else ')', ) ) else: # We don't escape here to make sure we pass literals properly return str(data) with tempfile.TemporaryFile('w+') as f: for d in items: if item_cb is not None: item_cb(d) line = [escape(d.get(k)) for k in columns] f.write(','.join(line)) f.write('\n') f.seek(0) self._cursor(cur).copy_expert('COPY %s (%s) FROM STDIN CSV' % ( tblname, ', '.join(columns)), f) @stored_procedure('swh_content_add') def content_add_from_temp(self, cur=None): pass @stored_procedure('swh_directory_add') def directory_add_from_temp(self, cur=None): pass @stored_procedure('swh_skipped_content_add') def skipped_content_add_from_temp(self, cur=None): pass @stored_procedure('swh_revision_add') def revision_add_from_temp(self, cur=None): pass @stored_procedure('swh_release_add') def release_add_from_temp(self, cur=None): pass @stored_procedure('swh_occurrence_history_add') def occurrence_history_add_from_temp(self, cur=None): pass @stored_procedure('swh_entity_history_add') def entity_history_add_from_temp(self, cur=None): pass def store_tmp_bytea(self, ids, cur=None): """Store the given identifiers in a new tmp_bytea table""" cur = self._cursor(cur) self.mktemp_bytea(cur) self.copy_to(({'id': elem} for elem in ids), 'tmp_bytea', ['id'], cur) def content_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("""SELECT sha1, sha1_git, sha256 FROM swh_content_missing()""") yield from cursor_to_bytes(cur) def content_missing_per_sha1_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("""SELECT * FROM swh_content_missing_per_sha1()""") yield from cursor_to_bytes(cur) def skipped_content_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("""SELECT sha1, sha1_git, sha256 FROM swh_skipped_content_missing()""") yield from cursor_to_bytes(cur) def occurrence_get(self, origin_id, cur=None): """Retrieve latest occurrence's information by origin_id. """ cur = self._cursor(cur) cur.execute("""SELECT origin, branch, target, target_type, (select max(date) from origin_visit where origin=%s) as date FROM occurrence WHERE origin=%s """, (origin_id, origin_id)) yield from cursor_to_bytes(cur) def content_find(self, sha1=None, sha1_git=None, sha256=None, cur=None): """Find the content optionally on a combination of the following checksums sha1, sha1_git or sha256. Args: sha1: sha1 content git_sha1: the sha1 computed `a la git` sha1 of the content sha256: sha256 content Returns: The triplet (sha1, sha1_git, sha256) if found or None. """ cur = self._cursor(cur) cur.execute("""SELECT sha1, sha1_git, sha256, length, ctime, status FROM swh_content_find(%s, %s, %s) LIMIT 1""", (sha1, sha1_git, sha256)) content = line_to_bytes(cur.fetchone()) if set(content) == {None}: return None else: return content def content_find_occurrence(self, sha1, cur=None): """Find one content's occurrence. Args: sha1: sha1 content cur: cursor to use Returns: One occurrence for that particular sha1 """ cur = self._cursor(cur) cur.execute("""SELECT origin_type, origin_url, branch, target, target_type, path FROM swh_content_find_occurrence(%s) LIMIT 1""", (sha1, )) return line_to_bytes(cur.fetchone()) def directory_get_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute('''SELECT id, file_entries, dir_entries, rev_entries FROM swh_directory_get()''') yield from cursor_to_bytes(cur) def directory_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute('SELECT * FROM swh_directory_missing()') yield from cursor_to_bytes(cur) def directory_walk_one(self, directory, cur=None): cur = self._cursor(cur) cur.execute('SELECT * FROM swh_directory_walk_one(%s)', (directory,)) yield from cursor_to_bytes(cur) def directory_walk(self, directory, cur=None): cur = self._cursor(cur) cur.execute('SELECT * FROM swh_directory_walk(%s)', (directory,)) yield from cursor_to_bytes(cur) def revision_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute('SELECT id FROM swh_revision_missing() as r(id)') yield from cursor_to_bytes(cur) revision_add_cols = [ 'id', 'date', 'date_offset', 'date_neg_utc_offset', 'committer_date', 'committer_date_offset', 'committer_date_neg_utc_offset', 'type', 'directory', 'message', 'author_fullname', 'author_name', 'author_email', 'committer_fullname', 'committer_name', 'committer_email', 'metadata', 'synthetic', ] revision_get_cols = revision_add_cols + [ 'author_id', 'committer_id', 'parents'] def revision_get_from_temp(self, cur=None): cur = self._cursor(cur) query = 'SELECT %s FROM swh_revision_get()' % ( ', '.join(self.revision_get_cols)) cur.execute(query) yield from cursor_to_bytes(cur) def revision_log(self, root_revisions, limit=None, cur=None): cur = self._cursor(cur) query = """SELECT %s FROM swh_revision_log(%%s, %%s) """ % ', '.join(self.revision_get_cols) cur.execute(query, (root_revisions, limit)) yield from cursor_to_bytes(cur) revision_shortlog_cols = ['id', 'parents'] def revision_shortlog(self, root_revisions, limit=None, cur=None): cur = self._cursor(cur) query = """SELECT %s FROM swh_revision_list(%%s, %%s) """ % ', '.join(self.revision_shortlog_cols) cur.execute(query, (root_revisions, limit)) yield from cursor_to_bytes(cur) def release_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute('SELECT id FROM swh_release_missing() as r(id)') yield from cursor_to_bytes(cur) object_find_by_sha1_git_cols = ['sha1_git', 'type', 'id', 'object_id'] def object_find_by_sha1_git(self, ids, cur=None): cur = self._cursor(cur) self.store_tmp_bytea(ids, cur) query = 'select %s from swh_object_find_by_sha1_git()' % ( ', '.join(self.object_find_by_sha1_git_cols) ) cur.execute(query) yield from cursor_to_bytes(cur) def stat_counters(self, cur=None): cur = self._cursor(cur) cur.execute('SELECT * FROM swh_stat_counters()') yield from cur fetch_history_cols = ['origin', 'date', 'status', 'result', 'stdout', 'stderr', 'duration'] def create_fetch_history(self, fetch_history, cur=None): """Create a fetch_history entry with the data in fetch_history""" cur = self._cursor(cur) query = '''INSERT INTO fetch_history (%s) VALUES (%s) RETURNING id''' % ( ','.join(self.fetch_history_cols), ','.join(['%s'] * len(self.fetch_history_cols)) ) cur.execute(query, [fetch_history.get(col) for col in self.fetch_history_cols]) return cur.fetchone()[0] def get_fetch_history(self, fetch_history_id, cur=None): """Get a fetch_history entry with the given id""" cur = self._cursor(cur) query = '''SELECT %s FROM fetch_history WHERE id=%%s''' % ( ', '.join(self.fetch_history_cols), ) cur.execute(query, (fetch_history_id,)) data = cur.fetchone() if not data: return None ret = {'id': fetch_history_id} for i, col in enumerate(self.fetch_history_cols): ret[col] = data[i] return ret def update_fetch_history(self, fetch_history, cur=None): """Update the fetch_history entry from the data in fetch_history""" cur = self._cursor(cur) query = '''UPDATE fetch_history SET %s WHERE id=%%s''' % ( ','.join('%s=%%s' % col for col in self.fetch_history_cols) ) cur.execute(query, [jsonize(fetch_history.get(col)) for col in self.fetch_history_cols + ['id']]) base_entity_cols = ['uuid', 'parent', 'name', 'type', 'description', 'homepage', 'active', 'generated', 'lister_metadata', 'metadata'] entity_cols = base_entity_cols + ['last_seen', 'last_id'] entity_history_cols = base_entity_cols + ['id', 'validity'] def origin_add(self, type, url, cur=None): """Insert a new origin and return the new identifier.""" insert = """INSERT INTO origin (type, url) values (%s, %s) RETURNING id""" cur.execute(insert, (type, url)) return cur.fetchone()[0] def origin_get_with(self, type, url, cur=None): """Retrieve the origin id from its type and url if found.""" cur = self._cursor(cur) query = """SELECT id, type, url, lister, project FROM origin WHERE type=%s AND url=%s""" cur.execute(query, (type, url)) data = cur.fetchone() if data: return line_to_bytes(data) return None def origin_get(self, id, cur=None): """Retrieve the origin per its identifier. """ cur = self._cursor(cur) query = "SELECT id, type, url, lister, project FROM origin WHERE id=%s" cur.execute(query, (id,)) data = cur.fetchone() if data: return line_to_bytes(data) return None person_cols = ['fullname', 'name', 'email'] person_get_cols = person_cols + ['id'] def person_add(self, person, cur=None): """Add a person identified by its name and email. Returns: The new person's id """ cur = self._cursor(cur) query_new_person = '''\ INSERT INTO person(%s) VALUES (%s) RETURNING id''' % ( ', '.join(self.person_cols), ', '.join('%s' for i in range(len(self.person_cols))) ) cur.execute(query_new_person, [person[col] for col in self.person_cols]) return cur.fetchone()[0] def person_get(self, ids, cur=None): """Retrieve the persons identified by the list of ids. """ cur = self._cursor(cur) query = """SELECT %s FROM person WHERE id IN %%s""" % ', '.join(self.person_get_cols) cur.execute(query, (tuple(ids),)) yield from cursor_to_bytes(cur) release_add_cols = [ 'id', 'target', 'target_type', 'date', 'date_offset', 'date_neg_utc_offset', 'name', 'comment', 'synthetic', 'author_fullname', 'author_name', 'author_email', ] release_get_cols = release_add_cols + ['author_id'] def release_get_from_temp(self, cur=None): cur = self._cursor(cur) query = ''' SELECT %s FROM swh_release_get() ''' % ', '.join(self.release_get_cols) cur.execute(query) yield from cursor_to_bytes(cur) def release_get_by(self, origin_id, limit=None, cur=None): """Retrieve a release by occurrence criterion (only origin right now) Args: - origin_id: The origin to look for. """ cur = self._cursor(cur) query = """ SELECT %s FROM swh_release_get_by(%%s) LIMIT %%s """ % ', '.join(self.release_get_cols) cur.execute(query, (origin_id, limit)) yield from cursor_to_bytes(cur) def revision_get_by(self, origin_id, branch_name, datetime, limit=None, cur=None): """Retrieve a revision by occurrence criterion. Args: - origin_id: The origin to look for - branch_name: the branch name to look for - datetime: the lower bound of timerange to look for. - limit: limit number of results to return The upper bound being now. """ cur = self._cursor(cur) if branch_name and isinstance(branch_name, str): branch_name = branch_name.encode('utf-8') query = ''' SELECT %s FROM swh_revision_get_by(%%s, %%s, %%s) LIMIT %%s ''' % ', '.join(self.revision_get_cols) cur.execute(query, (origin_id, branch_name, datetime, limit)) yield from cursor_to_bytes(cur) def directory_entry_get_by_path(self, directory, paths, cur=None): """Retrieve a directory entry by path. """ cur = self._cursor(cur) cur.execute("""SELECT dir_id, type, target, name, perms, status, sha1, sha1_git, sha256 FROM swh_find_directory_entry_by_path(%s, %s)""", (directory, paths)) data = cur.fetchone() if set(data) == {None}: return None return line_to_bytes(data) def entity_get(self, uuid, cur=None): """Retrieve the entity and its parent hierarchy chain per uuid. """ cur = self._cursor(cur) cur.execute("""SELECT %s FROM swh_entity_get(%%s)""" % ( ', '.join(self.entity_cols)), (uuid, )) yield from cursor_to_bytes(cur) def entity_get_one(self, uuid, cur=None): """Retrieve a single entity given its uuid. """ cur = self._cursor(cur) cur.execute("""SELECT %s FROM entity WHERE uuid = %%s""" % ( ', '.join(self.entity_cols)), (uuid, )) data = cur.fetchone() if not data: return None return line_to_bytes(data) + + def archive_ls(self, cur=None): + """ Get all the archives registered on the server. + + Yields: + a tuple (server_id, server_url) for each archive server. + """ + cur = self._cursor(cur) + cur.execute("""SELECT id, url + FROM archives + """) + yield from cursor_to_bytes(cur) + + def content_archive_ls(self, cur=None): + """ Get the archival status of the content + + Get an iterable over all the content that is referenced + in a backup server. + + Yields: + the sha1 of each content referenced at least one time + in the database of archiveal status. + """ + cur = self._cursor(cur) + cur.execute("""SELECT DISTINCT content_id + FROM content_archive""") + yield from cursor_to_bytes(cur) + + def content_archive_get(self, content=None, archive=None, cur=None): + """ Get the archival status of a content in a specific server. + + Retreive from the database the archival status of the given content + in the given archive server. + + Args: + content: the sha1 of the content. May be None for any id. + archive: the database id of the server we're looking into + may be None for any server. + + Yields: + A tuple (content_id, server_id, archival status, mtime, tzinfo). + """ + query = """SELECT content_id, archive_id, status, mtime + FROM content_archive + """ + conditions = [] + if content: + conditions.append("content_id='%s'" % content) + if archive: + conditions.append("archive_id='%s'" % archive) + + if conditions: + query = """%s + WHERE %s + """ % (query, ' and '.join(conditions)) + + cur = self._cursor(cur) + cur.execute(query) + yield from cursor_to_bytes(cur) + + def content_archive_update(self, content_id, archive_id, + new_status=None, cur=None): + """ Update the status of a archive content and set it's mtime to now() + + Change the last modification time of an archived content and change + its status to the given one. + + Args: + content_id (string): The content id. + archive_id (string): The id of the concerned archive. + new_status (string): One of missing, ongoing or present, this + status will replace the previous one. If not given, the + function only changes the mtime of the content. + """ + query = """UPDATE content_archive + SET %(fields)s + WHERE content_id='%(content_id)s' + and archive_id='%(archive_id)s' + """ + fields = [] + if new_status: + fields.append("status='%s'" % new_status) + fields.append("mtime=now()") + + d = {'fields': ', '.join(fields), + 'content_id': content_id, + 'archive_id': archive_id} + + cur = self._cursor(cur) + cur.execute(query % d) diff --git a/swh/storage/objstorage/__init__.py b/swh/storage/objstorage/__init__.py new file mode 100644 index 00000000..6ce4572f --- /dev/null +++ b/swh/storage/objstorage/__init__.py @@ -0,0 +1 @@ +from .objstorage import ObjStorage, DIR_MODE, FILE_MODE # NOQA diff --git a/swh/storage/objstorage/api/__init__.py b/swh/storage/objstorage/api/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/swh/storage/objstorage/api/client.py b/swh/storage/objstorage/api/client.py new file mode 100644 index 00000000..8f030865 --- /dev/null +++ b/swh/storage/objstorage/api/client.py @@ -0,0 +1,92 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +import pickle + +import requests + +from requests.exceptions import ConnectionError +from ...exc import StorageAPIError +from ...api.common import (decode_response, + encode_data_client as encode_data) + + +class RemoteObjStorage(): + """ Proxy to a remote object storage. + + This class allows to connect to an object storage server via + http protocol. + + Attributes: + base_url (string): The url of the server to connect. Must end + with a '/' + session: The session to send requests. + """ + def __init__(self, base_url): + self.base_url = base_url + self.session = requests.Session() + + def url(self, endpoint): + return '%s%s' % (self.base_url, endpoint) + + def post(self, endpoint, data): + try: + response = self.session.post( + self.url(endpoint), + data=encode_data(data), + headers={'content-type': 'application/x-msgpack'}, + ) + except ConnectionError as e: + print(str(e)) + raise StorageAPIError(e) + + # XXX: this breaks language-independence and should be + # replaced by proper unserialization + if response.status_code == 400: + raise pickle.loads(decode_response(response)) + + return decode_response(response) + + def content_add(self, bytes, obj_id=None): + """ Add a new object to the object storage. + + Args: + bytes: content of the object to be added to the storage. + obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When + given, obj_id will be trusted to match bytes. If missing, + obj_id will be computed on the fly. + + """ + return self.post('content/add', {'bytes': bytes, 'obj_id': obj_id}) + + def content_get(self, obj_id): + """ Retrieve the content of a given object. + + Args: + obj_id: The id of the object. + + Returns: + The content of the requested objects as bytes. + + Raises: + ObjNotFoundError: if the requested object is missing + """ + return self.post('content/get', {'obj_id': obj_id}) + + def content_check(self, obj_id): + """ Integrity check for a given object + + verify that the file object is in place, and that the gzipped content + matches the object id + + Args: + obj_id: The id of the object. + + Raises: + ObjNotFoundError: if the requested object is missing + Error: if the requested object is corrupt + """ + self.post('content/check', {'obj_id': obj_id}) diff --git a/swh/storage/objstorage/api/server.py b/swh/storage/objstorage/api/server.py new file mode 100644 index 00000000..030a721d --- /dev/null +++ b/swh/storage/objstorage/api/server.py @@ -0,0 +1,74 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import click + +from flask import Flask, g, request + +from swh.core import config +from swh.storage.objstorage import ObjStorage +from swh.storage.api.common import (BytesRequest, decode_request, + error_handler, + encode_data_server as encode_data) + +DEFAULT_CONFIG = { + 'storage_base': ('str', '/tmp/swh-storage/objects/'), + 'storage_depth': ('int', 3) +} + +app = Flask(__name__) +app.request_class = BytesRequest + + +@app.errorhandler(Exception) +def my_error_handler(exception): + return error_handler(exception, encode_data) + + +@app.before_request +def before_request(): + g.objstorage = ObjStorage(app.config['storage_base'], + app.config['storage_depth']) + + +@app.route('/') +def index(): + return "Helloworld!" + + +@app.route('/content') +def content(): + return str(list(g.storage)) + + +@app.route('/content/add', methods=['POST']) +def add_bytes(): + return encode_data(g.objstorage.add_bytes(**decode_request(request))) + + +@app.route('/content/get', methods=['POST']) +def get_bytes(): + return encode_data(g.objstorage.get_bytes(**decode_request(request))) + + +@app.route('/content/check', methods=['POST']) +def check(): + return encode_data(g.objstorage.check(**decode_request(request))) + + +@click.command() +@click.argument('config-path', required=1) +@click.option('--host', default='0.0.0.0', help="Host to run the server") +@click.option('--port', default=5000, type=click.INT, + help="Binding port of the server") +@click.option('--debug/--nodebug', default=True, + help="Indicates if the server should run in debug mode") +def launch(config_path, host, port, debug): + app.config.update(config.read(config_path, DEFAULT_CONFIG)) + app.run(host, port=int(port), debug=bool(debug)) + + +if __name__ == '__main__': + launch() diff --git a/swh/storage/objstorage.py b/swh/storage/objstorage/objstorage.py similarity index 99% rename from swh/storage/objstorage.py rename to swh/storage/objstorage/objstorage.py index 66eba6f0..1f78de0c 100644 --- a/swh/storage/objstorage.py +++ b/swh/storage/objstorage/objstorage.py @@ -1,386 +1,386 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import gzip import os import shutil import tempfile from contextlib import contextmanager -from .exc import ObjNotFoundError, Error +from ..exc import ObjNotFoundError, Error from swh.core import hashutil ID_HASH_ALGO = 'sha1' # ID_HASH_ALGO = 'sha1_git' GZIP_BUFSIZ = 1048576 DIR_MODE = 0o755 FILE_MODE = 0o644 def _obj_dir(hex_obj_id, root_dir, depth): """compute the storage directory of an object Args: hex_obj_id: object id as hexlified string root_dir: object storage root directory depth: slicing depth of object IDs in the storage see also: `_obj_path` """ if len(hex_obj_id) < depth * 2: raise ValueError('object id "%s" is too short for slicing at depth %d' % (hex_obj_id, depth)) # compute [depth] substrings of [obj_id], each of length 2, starting from # the beginning id_steps = [hex_obj_id[i*2:i*2+2] for i in range(0, depth)] steps = [root_dir] + id_steps return os.path.join(*steps) def _obj_path(hex_obj_id, root_dir, depth): """similar to `obj_dir`, but also include the actual object file name in the returned path """ return os.path.join(_obj_dir(hex_obj_id, root_dir, depth), hex_obj_id) @contextmanager def _write_obj_file(hex_obj_id, root_dir, depth): """context manager for writing object files to the object storage During writing data are written to a temporary file, which is atomically renamed to the right file name after closing. This context manager also takes care of (gzip) compressing the data on the fly. Yields: a file-like object open for writing bytes Sample usage: with _write_obj_file(hex_obj_id, root_dir, depth) as f: f.write(obj_data) """ dir = _obj_dir(hex_obj_id, root_dir, depth) if not os.path.isdir(dir): os.makedirs(dir, DIR_MODE, exist_ok=True) path = os.path.join(dir, hex_obj_id) (tmp, tmp_path) = tempfile.mkstemp(suffix='.tmp', prefix='hex_obj_id.', dir=dir) tmp_f = os.fdopen(tmp, 'wb') with gzip.GzipFile(filename=tmp_path, fileobj=tmp_f) as f: yield f tmp_f.close() os.chmod(tmp_path, FILE_MODE) os.rename(tmp_path, path) class ObjStorage: """high-level API to manipulate the Software Heritage object storage Conceptually, the object storage offers 4 methods: - add() add a new object, returning an object id - __contains__() check if an object is present, by object id - get() retrieve the content of an object, by object id - check() check the integrity of an object, by object id Variants of the above methods are implemented by this class, depending on how the content of an object is specified (bytes, file-like object, etc.). On disk, an object storage is a directory tree containing files named after their object IDs. An object ID is a checksum of its content, depending on the value of the ID_HASH_ALGO constant (see hashutil for its meaning). To avoid directories that contain too many files, the object storage has a given depth (default: 3). Each depth level consumes two characters of the object id. So for instance a file with (git) SHA1 of 34973274ccef6ab4dfaaf86599792fa9c3fe4689 will be stored in an object storage configured at depth 3 at 34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689. The actual files in the storage are stored in gzipped compressed format. Each file can hence be self-verified (on the shell) with something like: actual_id=34973274ccef6ab4dfaaf86599792fa9c3fe4689 expected_id=$(zcat $filename | sha1sum | cut -f 1 -d' ') if [ $actual_id != $expected_id ] ; then echo "AYEEE, invalid object $actual_id /o\" fi """ def __init__(self, root, depth=3): """create a proxy object to the object storage Args: root: object storage root directory depth: slicing depth of object IDs in the storage """ if not os.path.isdir(root): raise ValueError('obj storage root "%s" is not a directory' % root) self._root_dir = root self._depth = depth self._temp_dir = os.path.join(root, 'tmp') if not os.path.isdir(self._temp_dir): os.makedirs(self._temp_dir, DIR_MODE, exist_ok=True) def __obj_dir(self, hex_obj_id): """_obj_dir wrapper using this storage configuration""" return _obj_dir(hex_obj_id, self._root_dir, self._depth) def __obj_path(self, hex_obj_id): """_obj_path wrapper using this storage configuration""" return _obj_path(hex_obj_id, self._root_dir, self._depth) def __contains__(self, obj_id): """check whether a given object id is present in the storage or not Return: True iff the object id is present in the storage """ hex_obj_id = hashutil.hash_to_hex(obj_id) return os.path.exists(_obj_path(hex_obj_id, self._root_dir, self._depth)) def add_bytes(self, bytes, obj_id=None): """add a new object to the object storage Args: bytes: content of the object to be added to the storage obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When given, obj_id will be trusted to match bytes. If missing, obj_id will be computed on the fly. """ if obj_id is None: # missing checksum, compute it in memory and write to file h = hashutil._new_hash(ID_HASH_ALGO, len(bytes)) h.update(bytes) obj_id = h.digest() if obj_id in self: return obj_id hex_obj_id = hashutil.hash_to_hex(obj_id) # object is either absent, or present but overwrite is requested with _write_obj_file(hex_obj_id, root_dir=self._root_dir, depth=self._depth) as f: f.write(bytes) return obj_id def add_file(self, f, length, obj_id=None): """similar to `add_bytes`, but add the content of file-like object f to the object storage add_file will read the file content only once, and avoid storing all of it in memory """ if obj_id is None: # unknkown object id: work on temp file, compute checksum as we go, # mv temp file into place (tmp, tmp_path) = tempfile.mkstemp(dir=self._temp_dir) try: t = os.fdopen(tmp, 'wb') tz = gzip.GzipFile(fileobj=t) sums = hashutil._hash_file_obj(f, length, algorithms=[ID_HASH_ALGO], chunk_cb=lambda b: tz.write(b)) tz.close() t.close() obj_id = sums[ID_HASH_ALGO] if obj_id in self: return obj_id hex_obj_id = hashutil.hash_to_hex(obj_id) dir = self.__obj_dir(hex_obj_id) if not os.path.isdir(dir): os.makedirs(dir, DIR_MODE, exist_ok=True) path = os.path.join(dir, hex_obj_id) os.chmod(tmp_path, FILE_MODE) os.rename(tmp_path, path) finally: if os.path.exists(tmp_path): os.unlink(tmp_path) else: # known object id: write to .new file, rename if obj_id in self: return obj_id hex_obj_id = hashutil.hash_to_hex(obj_id) with _write_obj_file(hex_obj_id, root_dir=self._root_dir, depth=self._depth) as obj: shutil.copyfileobj(f, obj) return obj_id @contextmanager def get_file_obj(self, obj_id): """context manager to read the content of an object Args: obj_id: object id Yields: a file-like object open for reading (bytes) Raises: ObjNotFoundError: if the requested object is missing Sample usage: with objstorage.get_file_obj(obj_id) as f: do_something(f.read()) """ if obj_id not in self: raise ObjNotFoundError(obj_id) hex_obj_id = hashutil.hash_to_hex(obj_id) path = self.__obj_path(hex_obj_id) with gzip.GzipFile(path, 'rb') as f: yield f def get_bytes(self, obj_id): """retrieve the content of a given object Args: obj_id: object id Returns: the content of the requested objects as bytes Raises: ObjNotFoundError: if the requested object is missing """ with self.get_file_obj(obj_id) as f: return f.read() def _get_file_path(self, obj_id): """retrieve the path of a given object in the objects storage Note that the path point to a gzip-compressed file, so you need gzip.open() or equivalent to get the actual object content. Args: obj_id: object id Returns: a file path pointing into the object storage Raises: ObjNotFoundError: if the requested object is missing """ if obj_id not in self: raise ObjNotFoundError(obj_id) hex_obj_id = hashutil.hash_to_hex(obj_id) return self.__obj_path(hex_obj_id) def check(self, obj_id): """integrity check for a given object verify that the file object is in place, and that the gzipped content matches the object id Args: obj_id: object id Raises: ObjNotFoundError: if the requested object is missing Error: if the requested object is corrupt """ if obj_id not in self: raise ObjNotFoundError(obj_id) hex_obj_id = hashutil.hash_to_hex(obj_id) try: with gzip.open(self.__obj_path(hex_obj_id)) as f: length = None if ID_HASH_ALGO.endswith('_git'): # if the hashing algorithm is git-like, we need to know the # content size to hash on the fly. Do a first pass here to # compute the size length = 0 while True: chunk = f.read(GZIP_BUFSIZ) length += len(chunk) if not chunk: break f.rewind() checksums = hashutil._hash_file_obj(f, length, algorithms=[ID_HASH_ALGO]) actual_obj_id = checksums[ID_HASH_ALGO] if obj_id != actual_obj_id: raise Error('corrupt object %s should have id %s' % (obj_id, actual_obj_id)) except (OSError, IOError): # IOError is for compatibility with older python versions raise Error('corrupt object %s is not a gzip file' % obj_id) def __iter__(self): """iterate over the object identifiers currently available in the storage Warning: with the current implementation of the object storage, this method will walk the filesystem to list objects, meaning that listing all objects will be very slow for large storages. You almost certainly don't want to use this method in production. Return: iterator over object IDs """ def obj_iterator(): # XXX hackish: it does not verify that the depth of found files # matches the slicing depth of the storage for root, _dirs, files in os.walk(self._root_dir): for f in files: yield bytes.fromhex(f) return obj_iterator() def __len__(self): """compute the number of objects available in the storage Warning: this currently uses `__iter__`, its warning about bad performances applies Return: number of objects contained in the storage """ return sum(1 for i in self) diff --git a/swh/storage/tests/manual_test_archiver.py b/swh/storage/tests/manual_test_archiver.py new file mode 100644 index 00000000..26d4f4cc --- /dev/null +++ b/swh/storage/tests/manual_test_archiver.py @@ -0,0 +1,95 @@ +import string +import random + +from swh.core import hashutil +from swh.storage import Storage +from swh.storage.db import cursor_to_bytes + +from swh.storage.archiver import ArchiverDirector + + +def rs(size=6, chars=string.ascii_uppercase + string.ascii_lowercase): + return ''.join(random.choice(chars) for _ in range(size)) + + +def mc(data): + data = bytes(data, 'utf8') + content = hashutil.hashdata(data) + content.update({'data': data}) + return content + + +def initialize_content_archive(db, sample_size, names=['Local']): + """ Initialize the content_archive table with a sample. + + From the content table, get a sample of id, and fill the + content_archive table with those id in order to create a test sample + for the archiver. + + Args: + db: The database of the storage. + sample_size (int): The size of the sample to create. + names: A list of archive names. Those archives must already exists. + Archival status of the archives content will be erased on db. + + Returns: + Tha amount of entry created. + """ + with db.transaction() as cur: + cur.execute('DELETE FROM content_archive') + + with db.transaction() as cur: + cur.execute('SELECT sha1 from content limit %d' % sample_size) + ids = list(cursor_to_bytes(cur)) + + for id, in ids: + tid = r'\x' + hashutil.hash_to_hex(id) + + with db.transaction() as cur: + for name in names: + s = """INSERT INTO content_archive + VALUES('%s'::sha1, '%s', 'missing', now()) + """ % (tid, name) + cur.execute(s) + + print('Initialized database with', sample_size * len(names), 'items') + return sample_size * len(names) + + +def clean(): + # Clean all + with loc.db.transaction() as cur: + cur.execute('delete from content_archive') + cur.execute('delete from content') + import os + os.system("rm -r /tmp/swh/storage-dev/2/*") + + +CONTENT_SIZE = 10 + +if __name__ == '__main__': + random.seed(0) + # Local database + dbname = 'softwareheritage-dev' + user = 'qcampos' + cstring = 'dbname=%s user=%s' % (dbname, user) + # Archiver config + config = { + 'objstorage_path': '/tmp/swh/storage-dev/2', + 'archival_max_age': 3600, + 'batch_max_size': 10, + 'retention_policy': 1, + 'asynchronous': False + } + + # Grand-palais's storage + loc = Storage(cstring, config['objstorage_path']) + + # Add the content + l = [mc(rs(100)) for _ in range(CONTENT_SIZE)] + loc.content_add(l) + initialize_content_archive(loc.db, CONTENT_SIZE, ['petit-palais']) + + # Launch the archiver + archiver = ArchiverDirector(cstring, config) + archiver.run() diff --git a/swh/storage/tests/test_api_client.py b/swh/storage/tests/server_testing.py similarity index 59% copy from swh/storage/tests/test_api_client.py copy to swh/storage/tests/server_testing.py index 2af6b875..61277d30 100644 --- a/swh/storage/tests/test_api_client.py +++ b/swh/storage/tests/server_testing.py @@ -1,77 +1,80 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import multiprocessing import socket import time -import unittest + from urllib.request import urlopen -from swh.storage.tests.test_storage import AbstractTestStorage -from swh.storage.api.client import RemoteStorage -from swh.storage.api.server import app +class ServerTestFixture(): + """ Base class for http client/server testing. + + Mix this in a test class in order to have access to an http flask + server running in background. + + Note that the subclass should define a dictionary in self.config + that contains the flask server config. + And a flask application in self.app that corresponds to the type of + server the tested client needs. -class TestRemoteStorage(AbstractTestStorage, unittest.TestCase): - """Test the remote storage API. + To ensure test isolation, each test will run in a different server + and a different repertory. - This class doesn't define any tests as we want identical - functionality between local and remote storage. All the tests are - therefore defined in AbstractTestStorage. + In order to correctly work, the subclass must call the parents class's + setUp() and tearDown() methods. """ def setUp(self): super().setUp() - self.start_server() - self.storage = RemoteStorage(self.url()) def tearDown(self): self.stop_server() - super().tearDown() def url(self): return 'http://127.0.0.1:%d/' % self.port def start_server(self): - """Spawn the API server using multiprocessing""" + """ Spawn the API server using multiprocessing. + """ self.process = None # WSGI app configuration - self.app = app - self.app.config['db'] = 'dbname=%s' % self.dbname - self.app.config['storage_base'] = self.objroot - + for key, value in self.config.items(): + self.app.config[key] = value # Get an available port number sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('127.0.0.1', 0)) self.port = sock.getsockname()[1] sock.close() - # We need a worker function for multiprocessing + # Worker function for multiprocessing def worker(app, port): return app.run(port=port, use_reloader=False) self.process = multiprocessing.Process( target=worker, args=(self.app, self.port) ) self.process.start() - # Wait max. 5 seconds for server to spawn + # Wait max 5 seconds for server to spawn i = 0 while i < 20: try: urlopen(self.url()) except Exception: i += 1 time.sleep(0.25) else: - break + return def stop_server(self): - """Terminate the API server""" + """ Terminate the API server's process. + """ if self.process: self.process.terminate() diff --git a/swh/storage/tests/test_api_client.py b/swh/storage/tests/test_api_client.py index 2af6b875..10d2125d 100644 --- a/swh/storage/tests/test_api_client.py +++ b/swh/storage/tests/test_api_client.py @@ -1,77 +1,36 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import multiprocessing -import socket -import time import unittest -from urllib.request import urlopen +import tempfile from swh.storage.tests.test_storage import AbstractTestStorage +from swh.storage.tests.server_testing import ServerTestFixture from swh.storage.api.client import RemoteStorage from swh.storage.api.server import app -class TestRemoteStorage(AbstractTestStorage, unittest.TestCase): +class TestRemoteStorage(AbstractTestStorage, ServerTestFixture, + unittest.TestCase): """Test the remote storage API. This class doesn't define any tests as we want identical functionality between local and remote storage. All the tests are therefore defined in AbstractTestStorage. """ def setUp(self): + # ServerTestFixture needs to have self.objroot for + # setUp() method, but this field is defined in + # AbstractTestStorage's setUp() + # To avoid confusion, override the self.objroot to a + # one choosen in this class. + storage_base = tempfile.mkdtemp() + self.config = {'db': 'dbname=%s' % self.dbname, + 'storage_base': storage_base} + self.app = app super().setUp() - - self.start_server() self.storage = RemoteStorage(self.url()) - - def tearDown(self): - self.stop_server() - - super().tearDown() - - def url(self): - return 'http://127.0.0.1:%d/' % self.port - - def start_server(self): - """Spawn the API server using multiprocessing""" - self.process = None - - # WSGI app configuration - self.app = app - self.app.config['db'] = 'dbname=%s' % self.dbname - self.app.config['storage_base'] = self.objroot - - # Get an available port number - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(('127.0.0.1', 0)) - self.port = sock.getsockname()[1] - sock.close() - - # We need a worker function for multiprocessing - def worker(app, port): - return app.run(port=port, use_reloader=False) - - self.process = multiprocessing.Process( - target=worker, args=(self.app, self.port) - ) - self.process.start() - - # Wait max. 5 seconds for server to spawn - i = 0 - while i < 20: - try: - urlopen(self.url()) - except Exception: - i += 1 - time.sleep(0.25) - else: - break - - def stop_server(self): - """Terminate the API server""" - if self.process: - self.process.terminate() + self.objroot = storage_base diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py new file mode 100644 index 00000000..cf05270d --- /dev/null +++ b/swh/storage/tests/test_archiver.py @@ -0,0 +1,246 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import tempfile +import unittest +import os + +from nose.tools import istest +from nose.plugins.attrib import attr +from datetime import datetime, timedelta + +from swh.core import hashutil +from swh.core.tests.db_testing import DbTestFixture +from server_testing import ServerTestFixture + +from swh.storage import Storage +from swh.storage.exc import ObjNotFoundError +from swh.storage.archiver import ArchiverDirector, ArchiverWorker +from swh.storage.objstorage.api.client import RemoteObjStorage +from swh.storage.objstorage.api.server import app + +TEST_DIR = os.path.dirname(os.path.abspath(__file__)) +TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata') + + +@attr('db') +class TestArchiver(DbTestFixture, ServerTestFixture, + unittest.TestCase): + """ Test the objstorage archiver. + """ + + TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh.dump') + + def setUp(self): + # Launch the backup server + self.backup_objroot = tempfile.mkdtemp(prefix='remote') + self.config = {'storage_base': self.backup_objroot, + 'storage_depth': 3} + self.app = app + super().setUp() + + # Launch a client to check objects presence + print("url", self.url()) + self.remote_objstorage = RemoteObjStorage(self.url()) + # Create the local storage. + self.objroot = tempfile.mkdtemp(prefix='local') + self.storage = Storage(self.conn, self.objroot) + # Initializes and fill the tables. + self.initialize_tables() + # Create the archiver + self.archiver = self.__create_director() + + self.storage_data = ('Local', 'http://localhost:%s/' % self.port) + + def tearDown(self): + self.empty_tables() + super().tearDown() + + def initialize_tables(self): + """ Initializes the database with a sample of items. + """ + # Add an archive + self.cursor.execute("""INSERT INTO archives(id, url) + VALUES('Local', 'http://localhost:{}/') + """.format(self.port)) + self.conn.commit() + + def empty_tables(self): + # Remove all content + self.cursor.execute('DELETE FROM content_archive') + self.cursor.execute('DELETE FROM archives') + self.conn.commit() + + def __add_content(self, content_data, status='missing', date='now()'): + # Add the content + content = hashutil.hashdata(content_data) + content.update({'data': content_data}) + self.storage.content_add([content]) + # Then update database + content_id = r'\x' + hashutil.hash_to_hex(content['sha1']) + self.cursor.execute("""INSERT INTO content_archive + VALUES('%s'::sha1, 'Local', '%s', %s) + """ % (content_id, status, date)) + return content['sha1'] + + def __get_missing(self): + self.cursor.execute("""SELECT content_id + FROM content_archive + WHERE status='missing'""") + return self.cursor.fetchall() + + def __create_director(self, batch_size=5000, archival_max_age=3600, + retention_policy=1, asynchronous=False): + config = { + 'objstorage_path': self.objroot, + 'batch_max_size': batch_size, + 'archival_max_age': archival_max_age, + 'retention_policy': retention_policy, + 'asynchronous': asynchronous # Avoid depending on queue for tests. + } + director = ArchiverDirector(self.conn, config) + return director + + def __create_worker(self, batch={}, config={}): + mstorage_args = [self.archiver.master_storage.db.conn, + self.objroot] + slaves = [self.storage_data] + if not config: + config = self.archiver.config + return ArchiverWorker(batch, mstorage_args, slaves, config) + + # Integration test + + @istest + def archive_missing_content(self): + """ Run archiver on a missing content should archive it. + """ + content_data = b'archive_missing_content' + id = self.__add_content(content_data) + # After the run, the content should be in the archive. + self.archiver.run() + remote_data = self.remote_objstorage.content_get(id) + self.assertEquals(content_data, remote_data) + + @istest + def archive_present_content(self): + """ A content that is not 'missing' shouldn't be archived. + """ + id = self.__add_content(b'archive_present_content', status='present') + # After the run, the content should NOT be in the archive.* + self.archiver.run() + with self.assertRaises(ObjNotFoundError): + self.remote_objstorage.content_get(id) + + @istest + def archive_already_enough(self): + """ A content missing with enough copies shouldn't be archived. + """ + id = self.__add_content(b'archive_alread_enough') + director = self.__create_director(retention_policy=0) + director.run() + with self.assertRaises(ObjNotFoundError): + self.remote_objstorage.content_get(id) + + # Unit test for ArchiverDirector + + def vstatus(self, status, mtime): + return self.archiver.get_virtual_status(status, mtime) + + @istest + def vstatus_present(self): + self.assertEquals( + self.vstatus('present', None), + 'present' + ) + + @istest + def vstatus_missing(self): + self.assertEquals( + self.vstatus('missing', None), + 'missing' + ) + + @istest + def vstatus_ongoing_remaining(self): + current_time = datetime.now() + self.assertEquals( + self.vstatus('ongoing', current_time), + 'present' + ) + + @istest + def vstatus_ongoing_elapsed(self): + past_time = datetime.now() - timedelta( + seconds=self.archiver.config['archival_max_age'] + 1 + ) + self.assertEquals( + self.vstatus('ongoing', past_time), + 'missing' + ) + + # Unit tests for archive worker + + @istest + def need_archival_missing(self): + """ A content should still need archival when it is missing. + """ + id = self.__add_content(b'need_archival_missing', status='missing') + id = r'\x' + hashutil.hash_to_hex(id) + worker = self.__create_worker() + self.assertEqual(worker.need_archival(id, self.storage_data), True) + + @istest + def need_archival_present(self): + """ A content should still need archival when it is missing + """ + id = self.__add_content(b'need_archival_missing', status='present') + id = r'\x' + hashutil.hash_to_hex(id) + worker = self.__create_worker() + self.assertEqual(worker.need_archival(id, self.storage_data), False) + + @istest + def need_archival_ongoing_remaining(self): + """ An ongoing archival with remaining time shouldnt need archival. + """ + id = self.__add_content(b'need_archival_ongoing_remaining', + status='ongoing', date="'%s'" % datetime.now()) + id = r'\x' + hashutil.hash_to_hex(id) + worker = self.__create_worker() + self.assertEqual(worker.need_archival(id, self.storage_data), False) + + @istest + def need_archival_ongoing_elasped(self): + """ An ongoing archival with elapsed time should be scheduled again. + """ + id = self.__add_content( + b'archive_ongoing_elapsed', + status='ongoing', + date="'%s'" % (datetime.now() - timedelta( + seconds=self.archiver.config['archival_max_age'] + 1 + )) + ) + id = r'\x' + hashutil.hash_to_hex(id) + worker = self.__create_worker() + self.assertEqual(worker.need_archival(id, self.storage_data), True) + + @istest + def content_sorting_by_archiver(self): + """ Check that the content is correctly sorted. + """ + batch = { + 'id1': { + 'present': [('slave1', 'slave1_url')], + 'missing': [] + }, + 'id2': { + 'present': [], + 'missing': [('slave1', 'slave1_url')] + } + } + worker = self.__create_worker(batch=batch) + mapping = worker.sort_content_by_archive() + self.assertNotIn('id1', mapping[('slave1', 'slave1_url')]) + self.assertIn('id2', mapping[('slave1', 'slave1_url')]) diff --git a/swh/storage/tests/test_objstorage_api.py b/swh/storage/tests/test_objstorage_api.py new file mode 100644 index 00000000..284e1834 --- /dev/null +++ b/swh/storage/tests/test_objstorage_api.py @@ -0,0 +1,83 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import tempfile +import unittest + +from nose.tools import istest +from nose.plugins.attrib import attr + +from swh.core import hashutil +from swh.storage.exc import ObjNotFoundError, Error +from swh.storage.tests.server_testing import ServerTestFixture +from swh.storage.objstorage.objstorage import _obj_path +from swh.storage.objstorage.api.client import RemoteObjStorage +from swh.storage.objstorage.api.server import app + + +@attr('db') +class TestRemoteObjStorage(ServerTestFixture, unittest.TestCase): + """ Test the remote archive API. + """ + + def setUp(self): + self.config = {'storage_base': tempfile.mkdtemp(), + 'storage_depth': 3} + self.app = app + super().setUp() + self.objstorage = RemoteObjStorage(self.url()) + + def tearDown(self): + super().tearDown() + + @istest + def content_add(self): + content = bytes('Test content', 'utf8') + id = self.objstorage.content_add(content) + self.assertEquals(self.objstorage.content_get(id), content) + + @istest + def content_get_present(self): + content = bytes('content_get_present', 'utf8') + content_hash = hashutil.hashdata(content) + id = self.objstorage.content_add(content) + self.assertEquals(content_hash['sha1'], id) + + @istest + def content_get_missing(self): + content = bytes('content_get_missing', 'utf8') + content_hash = hashutil.hashdata(content) + with self.assertRaises(ObjNotFoundError): + self.objstorage.content_get(content_hash['sha1']) + + @istest + def content_check_invalid(self): + content = bytes('content_check_invalid', 'utf8') + id = self.objstorage.content_add(content) + path = _obj_path(hashutil.hash_to_hex(id), + self.app.config['storage_base'], + self.app.config['storage_depth']) + content = list(content) + with open(path, 'bw') as f: + content[0] = (content[0] + 1) % 128 + f.write(bytes(content)) + with self.assertRaises(Error): + self.objstorage.content_check(id) + + @istest + def content_check_valid(self): + content = bytes('content_check_valid', 'utf8') + id = self.objstorage.content_add(content) + try: + self.objstorage.content_check(id) + except: + self.fail('Integrity check failed') + + @istest + def content_check_missing(self): + content = bytes('content_check_valid', 'utf8') + content_hash = hashutil.hashdata(content) + with self.assertRaises(ObjNotFoundError): + self.objstorage.content_check(content_hash['sha1']) diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py index df8332b9..79ba446d 100644 --- a/swh/storage/tests/test_storage.py +++ b/swh/storage/tests/test_storage.py @@ -1,1471 +1,1471 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import os import psycopg2 import shutil import tempfile import unittest from uuid import UUID from unittest.mock import patch from nose.tools import istest from nose.plugins.attrib import attr from swh.core.tests.db_testing import DbTestFixture from swh.core.hashutil import hex_to_hash from swh.storage import Storage TEST_DIR = os.path.dirname(os.path.abspath(__file__)) TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata') @attr('db') class AbstractTestStorage(DbTestFixture): """Base class for Storage testing. This class is used as-is to test local storage (see TestStorage below) and remote storage (see TestRemoteStorage in test_remote_storage.py. We need to have the two classes inherit from this base class separately to avoid nosetests running the tests from the base class twice. """ TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh.dump') def setUp(self): super().setUp() self.maxDiff = None self.objroot = tempfile.mkdtemp() self.storage = Storage(self.conn, self.objroot) self.cont = { 'data': b'42\n', 'length': 3, 'sha1': hex_to_hash( '34973274ccef6ab4dfaaf86599792fa9c3fe4689'), 'sha1_git': hex_to_hash( 'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), 'sha256': hex_to_hash( '673650f936cb3b0a2f93ce09d81be107' '48b1b203c19e8176b4eefc1964a0cf3a'), } self.cont2 = { 'data': b'4242\n', 'length': 5, 'sha1': hex_to_hash( '61c2b3a30496d329e21af70dd2d7e097046d07b7'), 'sha1_git': hex_to_hash( '36fade77193cb6d2bd826161a0979d64c28ab4fa'), 'sha256': hex_to_hash( '859f0b154fdb2d630f45e1ecae4a8629' '15435e663248bb8461d914696fc047cd'), } self.missing_cont = { 'data': b'missing\n', 'length': 8, 'sha1': hex_to_hash( 'f9c24e2abb82063a3ba2c44efd2d3c797f28ac90'), 'sha1_git': hex_to_hash( '33e45d56f88993aae6a0198013efa80716fd8919'), 'sha256': hex_to_hash( '6bbd052ab054ef222c1c87be60cd191a' 'ddedd24cc882d1f5f7f7be61dc61bb3a'), } self.skipped_cont = { 'length': 1024 * 1024 * 200, 'sha1_git': hex_to_hash( '33e45d56f88993aae6a0198013efa80716fd8920'), 'reason': 'Content too long', 'status': 'absent', } self.skipped_cont2 = { 'length': 1024 * 1024 * 300, 'sha1_git': hex_to_hash( '33e45d56f88993aae6a0198013efa80716fd8921'), 'reason': 'Content too long', 'status': 'absent', } self.dir = { 'id': b'4\x013\x422\x531\x000\xf51\xe62\xa73\xff7\xc3\xa90', 'entries': [ { 'name': b'foo', 'type': 'file', 'target': self.cont['sha1_git'], 'perms': 0o644, }, { 'name': b'bar\xc3', 'type': 'dir', 'target': b'12345678901234567890', 'perms': 0o2000, }, ], } self.dir2 = { 'id': b'4\x013\x422\x531\x000\xf51\xe62\xa73\xff7\xc3\xa95', 'entries': [ { 'name': b'oof', 'type': 'file', 'target': self.cont2['sha1_git'], 'perms': 0o644, } ], } self.dir3 = { 'id': hex_to_hash('33e45d56f88993aae6a0198013efa80716fd8921'), 'entries': [ { 'name': b'foo', 'type': 'file', 'target': self.cont['sha1_git'], 'perms': 0o644, }, { 'name': b'bar', 'type': 'dir', 'target': b'12345678901234560000', 'perms': 0o2000, }, { 'name': b'hello', 'type': 'file', 'target': b'12345678901234567890', 'perms': 0o644, }, ], } self.minus_offset = datetime.timezone(datetime.timedelta(minutes=-120)) self.plus_offset = datetime.timezone(datetime.timedelta(minutes=120)) self.revision = { 'id': b'56789012345678901234', 'message': b'hello', 'author': { 'name': b'Nicolas Dandrimont', 'email': b'nicolas@example.com', 'fullname': b'Nicolas Dandrimont ', }, 'date': { 'timestamp': 1234567890, 'offset': 120, 'negative_utc': None, }, 'committer': { 'name': b'St\xc3fano Zacchiroli', 'email': b'stefano@example.com', 'fullname': b'St\xc3fano Zacchiroli ' }, 'committer_date': { 'timestamp': 1123456789, 'offset': 0, 'negative_utc': True, }, 'parents': [b'01234567890123456789', b'23434512345123456789'], 'type': 'git', 'directory': self.dir['id'], 'metadata': { 'checksums': { 'sha1': 'tarball-sha1', 'sha256': 'tarball-sha256', }, 'signed-off-by': 'some-dude', - 'extra_git_headers': [ + 'extra_headers': [ ['gpgsig', b'test123'], ['mergetags', [b'foo\\bar', b'\x22\xaf\x89\x80\x01\x00']], ], }, 'synthetic': True } self.revision2 = { 'id': b'87659012345678904321', 'message': b'hello again', 'author': { 'name': b'Roberto Dicosmo', 'email': b'roberto@example.com', 'fullname': b'Roberto Dicosmo ', }, 'date': { 'timestamp': 1234567843.22, 'offset': -720, 'negative_utc': None, }, 'committer': { 'name': b'tony', 'email': b'ar@dumont.fr', 'fullname': b'tony ', }, 'committer_date': { 'timestamp': 1123456789, 'offset': 0, 'negative_utc': False, }, 'parents': [b'01234567890123456789'], 'type': 'git', 'directory': self.dir2['id'], 'metadata': None, 'synthetic': False } self.revision3 = { 'id': hex_to_hash('7026b7c1a2af56521e951c01ed20f255fa054238'), 'message': b'a simple revision with no parents this time', 'author': { 'name': b'Roberto Dicosmo', 'email': b'roberto@example.com', 'fullname': b'Roberto Dicosmo ', }, 'date': { 'timestamp': 1234567843.22, 'offset': -720, 'negative_utc': None, }, 'committer': { 'name': b'tony', 'email': b'ar@dumont.fr', 'fullname': b'tony ', }, 'committer_date': { 'timestamp': 1127351742, 'offset': 0, 'negative_utc': False, }, 'parents': [], 'type': 'git', 'directory': self.dir2['id'], 'metadata': None, 'synthetic': True } self.revision4 = { 'id': hex_to_hash('368a48fe15b7db2383775f97c6b247011b3f14f4'), 'message': b'parent of self.revision2', 'author': { 'name': b'me', 'email': b'me@soft.heri', 'fullname': b'me ', }, 'date': { 'timestamp': 1244567843.22, 'offset': -720, 'negative_utc': None, }, 'committer': { 'name': b'committer-dude', 'email': b'committer@dude.com', 'fullname': b'committer-dude ', }, 'committer_date': { 'timestamp': 1244567843.22, 'offset': -720, 'negative_utc': None, }, 'parents': [self.revision3['id']], 'type': 'git', 'directory': self.dir['id'], 'metadata': None, 'synthetic': False } self.origin = { 'url': 'file:///dev/null', 'type': 'git', } self.origin2 = { 'url': 'file:///dev/zero', 'type': 'git', } self.occurrence = { 'branch': b'master', 'target': b'67890123456789012345', 'target_type': 'revision', 'date': datetime.datetime(2015, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc), } self.occurrence2 = { 'branch': b'master', 'target': self.revision2['id'], 'target_type': 'revision', 'date': datetime.datetime(2015, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc), } self.release = { 'id': b'87659012345678901234', 'name': b'v0.0.1', 'author': { 'name': b'olasd', 'email': b'nic@olasd.fr', 'fullname': b'olasd ', }, 'date': { 'timestamp': 1234567890, 'offset': 42, 'negative_utc': None, }, 'target': b'43210987654321098765', 'target_type': 'revision', 'message': b'synthetic release', 'synthetic': True, } self.release2 = { 'id': b'56789012348765901234', 'name': b'v0.0.2', 'author': { 'name': b'tony', 'email': b'ar@dumont.fr', 'fullname': b'tony ', }, 'date': { 'timestamp': 1634366813, 'offset': -120, 'negative_utc': None, }, 'target': b'432109\xa9765432\xc309\x00765', 'target_type': 'revision', 'message': b'v0.0.2\nMisc performance improvments + bug fixes', 'synthetic': False } self.release3 = { 'id': b'87659012345678904321', 'name': b'v0.0.2', 'author': { 'name': b'tony', 'email': b'tony@ardumont.fr', 'fullname': b'tony ', }, 'date': { 'timestamp': 1634336813, 'offset': 0, 'negative_utc': False, }, 'target': self.revision2['id'], 'target_type': 'revision', 'message': b'yet another synthetic release', 'synthetic': True, } self.fetch_history_date = datetime.datetime( 2015, 1, 2, 21, 0, 0, tzinfo=datetime.timezone.utc) self.fetch_history_end = datetime.datetime( 2015, 1, 2, 23, 0, 0, tzinfo=datetime.timezone.utc) self.fetch_history_duration = (self.fetch_history_end - self.fetch_history_date) self.fetch_history_data = { 'status': True, 'result': {'foo': 'bar'}, 'stdout': 'blabla', 'stderr': 'blablabla', } self.entity1 = { 'uuid': UUID('f96a7ec1-0058-4920-90cc-7327e4b5a4bf'), # GitHub users 'parent': UUID('ad6df473-c1d2-4f40-bc58-2b091d4a750e'), 'name': 'github:user:olasd', 'type': 'person', 'description': 'Nicolas Dandrimont', 'homepage': 'http://example.com', 'active': True, 'generated': True, 'lister_metadata': { # swh.lister.github 'lister': '34bd6b1b-463f-43e5-a697-785107f598e4', 'id': 12877, 'type': 'user', 'last_activity': '2015-11-03', }, 'metadata': None, 'validity': [ datetime.datetime(2015, 11, 3, 11, 0, 0, tzinfo=datetime.timezone.utc), ] } self.entity1_query = { 'lister': '34bd6b1b-463f-43e5-a697-785107f598e4', 'id': 12877, 'type': 'user', } self.entity2 = { 'uuid': UUID('3903d075-32d6-46d4-9e29-0aef3612c4eb'), # GitHub users 'parent': UUID('ad6df473-c1d2-4f40-bc58-2b091d4a750e'), 'name': 'github:user:zacchiro', 'type': 'person', 'description': 'Stefano Zacchiroli', 'homepage': 'http://example.com', 'active': True, 'generated': True, 'lister_metadata': { # swh.lister.github 'lister': '34bd6b1b-463f-43e5-a697-785107f598e4', 'id': 216766, 'type': 'user', 'last_activity': '2015-11-03', }, 'metadata': None, 'validity': [ datetime.datetime(2015, 11, 3, 11, 0, 0, tzinfo=datetime.timezone.utc), ] } self.entity3 = { 'uuid': UUID('111df473-c1d2-4f40-bc58-2b091d4a7111'), # GitHub users 'parent': UUID('222df473-c1d2-4f40-bc58-2b091d4a7222'), 'name': 'github:user:ardumont', 'type': 'person', 'description': 'Antoine R. Dumont a.k.a tony', 'homepage': 'https://ardumont.github.io', 'active': True, 'generated': True, 'lister_metadata': { 'lister': '34bd6b1b-463f-43e5-a697-785107f598e4', 'id': 666, 'type': 'user', 'last_activity': '2016-01-15', }, 'metadata': None, 'validity': [ datetime.datetime(2015, 11, 3, 11, 0, 0, tzinfo=datetime.timezone.utc), ] } self.entity4 = { 'uuid': UUID('222df473-c1d2-4f40-bc58-2b091d4a7222'), # GitHub users 'parent': None, 'name': 'github:user:ToNyX', 'type': 'person', 'description': 'ToNyX', 'homepage': 'https://ToNyX.github.io', 'active': True, 'generated': True, 'lister_metadata': { 'lister': '34bd6b1b-463f-43e5-a697-785107f598e4', 'id': 999, 'type': 'user', 'last_activity': '2015-12-24', }, 'metadata': None, 'validity': [ datetime.datetime(2015, 11, 3, 11, 0, 0, tzinfo=datetime.timezone.utc), ] } self.entity2_query = { 'lister_metadata': { 'lister': '34bd6b1b-463f-43e5-a697-785107f598e4', 'id': 216766, 'type': 'user', }, } def tearDown(self): shutil.rmtree(self.objroot) self.cursor.execute("""SELECT table_name FROM information_schema.tables WHERE table_schema = %s""", ('public',)) tables = set(table for (table,) in self.cursor.fetchall()) tables -= {'dbversion', 'entity', 'entity_history', 'listable_entity'} for table in tables: self.cursor.execute('truncate table %s cascade' % table) self.cursor.execute('delete from entity where generated=true') self.cursor.execute('delete from entity_history where generated=true') self.conn.commit() super().tearDown() @istest def content_add(self): cont = self.cont self.storage.content_add([cont]) if hasattr(self.storage, 'objstorage'): self.assertIn(cont['sha1'], self.storage.objstorage) self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status' ' FROM content WHERE sha1 = %s', (cont['sha1'],)) datum = self.cursor.fetchone() self.assertEqual( (datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(), datum[3], datum[4]), (cont['sha1'], cont['sha1_git'], cont['sha256'], cont['length'], 'visible')) @istest def content_add_collision(self): cont1 = self.cont # create (corrupted) content with same sha1{,_git} but != sha256 cont1b = cont1.copy() sha256_array = bytearray(cont1b['sha256']) sha256_array[0] += 1 cont1b['sha256'] = bytes(sha256_array) with self.assertRaises(psycopg2.IntegrityError): self.storage.content_add([cont1, cont1b]) @istest def skipped_content_add(self): cont = self.skipped_cont cont2 = self.skipped_cont2 self.storage.content_add([cont]) self.storage.content_add([cont2]) self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status,' 'reason FROM skipped_content ORDER BY sha1_git') datum = self.cursor.fetchone() self.assertEqual( (datum[0], datum[1].tobytes(), datum[2], datum[3], datum[4], datum[5]), (None, cont['sha1_git'], None, cont['length'], 'absent', 'Content too long')) datum2 = self.cursor.fetchone() self.assertEqual( (datum2[0], datum2[1].tobytes(), datum2[2], datum2[3], datum2[4], datum2[5]), (None, cont2['sha1_git'], None, cont2['length'], 'absent', 'Content too long')) @istest def content_missing(self): cont2 = self.cont2 missing_cont = self.missing_cont self.storage.content_add([cont2]) gen = self.storage.content_missing([cont2, missing_cont]) self.assertEqual(list(gen), [missing_cont['sha1']]) @istest def content_missing_per_sha1(self): # given cont2 = self.cont2 missing_cont = self.missing_cont self.storage.content_add([cont2]) # when gen = self.storage.content_missing_per_sha1([cont2['sha1'], missing_cont['sha1']]) # then self.assertEqual(list(gen), [missing_cont['sha1']]) @istest def directory_get(self): # given init_missing = list(self.storage.directory_missing([self.dir['id']])) self.assertEqual([self.dir['id']], init_missing) self.storage.directory_add([self.dir]) # when actual_dirs = list(self.storage.directory_get([self.dir['id']])) self.assertEqual(len(actual_dirs), 1) dir0 = actual_dirs[0] self.assertEqual(dir0['id'], self.dir['id']) # ids are generated so non deterministic value self.assertEqual(len(dir0['file_entries']), 1) self.assertEqual(len(dir0['dir_entries']), 1) self.assertIsNone(dir0['rev_entries']) after_missing = list(self.storage.directory_missing([self.dir['id']])) self.assertEqual([], after_missing) @istest def directory_add(self): init_missing = list(self.storage.directory_missing([self.dir['id']])) self.assertEqual([self.dir['id']], init_missing) self.storage.directory_add([self.dir]) stored_data = list(self.storage.directory_ls(self.dir['id'])) data_to_store = [{ 'dir_id': self.dir['id'], 'type': ent['type'], 'target': ent['target'], 'name': ent['name'], 'perms': ent['perms'], 'status': None, 'sha1': None, 'sha1_git': None, 'sha256': None, } for ent in sorted(self.dir['entries'], key=lambda ent: ent['name']) ] self.assertEqual(data_to_store, stored_data) after_missing = list(self.storage.directory_missing([self.dir['id']])) self.assertEqual([], after_missing) @istest def directory_entry_get_by_path(self): # given init_missing = list(self.storage.directory_missing([self.dir3['id']])) self.assertEqual([self.dir3['id']], init_missing) self.storage.directory_add([self.dir3]) expected_entries = [ { 'dir_id': self.dir3['id'], 'name': b'foo', 'type': 'file', 'target': self.cont['sha1_git'], 'sha1': None, 'sha1_git': None, 'sha256': None, 'status': None, 'perms': 0o644, }, { 'dir_id': self.dir3['id'], 'name': b'bar', 'type': 'dir', 'target': b'12345678901234560000', 'sha1': None, 'sha1_git': None, 'sha256': None, 'status': None, 'perms': 0o2000, }, { 'dir_id': self.dir3['id'], 'name': b'hello', 'type': 'file', 'target': b'12345678901234567890', 'sha1': None, 'sha1_git': None, 'sha256': None, 'status': None, 'perms': 0o644, }, ] # when (all must be found here) for entry, expected_entry in zip(self.dir3['entries'], expected_entries): actual_entry = self.storage.directory_entry_get_by_path( self.dir3['id'], [entry['name']]) self.assertEqual(actual_entry, expected_entry) # when (nothing should be found here since self.dir is not persisted.) for entry in self.dir['entries']: actual_entry = self.storage.directory_entry_get_by_path( self.dir['id'], [entry['name']]) self.assertIsNone(actual_entry) @istest def revision_add(self): init_missing = self.storage.revision_missing([self.revision['id']]) self.assertEqual([self.revision['id']], list(init_missing)) self.storage.revision_add([self.revision]) end_missing = self.storage.revision_missing([self.revision['id']]) self.assertEqual([], list(end_missing)) @istest def revision_log(self): # given # self.revision4 -is-child-of-> self.revision3 self.storage.revision_add([self.revision3, self.revision4]) # when actual_results = list(self.storage.revision_log( [self.revision4['id']])) # hack: ids generated for actual_result in actual_results: del actual_result['author']['id'] del actual_result['committer']['id'] self.assertEqual(len(actual_results), 2) # rev4 -child-> rev3 self.assertEquals(actual_results[0], self.revision4) self.assertEquals(actual_results[1], self.revision3) @istest def revision_log_with_limit(self): # given # self.revision4 -is-child-of-> self.revision3 self.storage.revision_add([self.revision3, self.revision4]) actual_results = list(self.storage.revision_log( [self.revision4['id']], 1)) # hack: ids generated for actual_result in actual_results: del actual_result['author']['id'] del actual_result['committer']['id'] self.assertEqual(len(actual_results), 1) self.assertEquals(actual_results[0], self.revision4) @staticmethod def _short_revision(revision): return [revision['id'], revision['parents']] @istest def revision_shortlog(self): # given # self.revision4 -is-child-of-> self.revision3 self.storage.revision_add([self.revision3, self.revision4]) # when actual_results = list(self.storage.revision_shortlog( [self.revision4['id']])) self.assertEqual(len(actual_results), 2) # rev4 -child-> rev3 self.assertEquals(list(actual_results[0]), self._short_revision(self.revision4)) self.assertEquals(list(actual_results[1]), self._short_revision(self.revision3)) @istest def revision_shortlog_with_limit(self): # given # self.revision4 -is-child-of-> self.revision3 self.storage.revision_add([self.revision3, self.revision4]) actual_results = list(self.storage.revision_shortlog( [self.revision4['id']], 1)) self.assertEqual(len(actual_results), 1) self.assertEquals(list(actual_results[0]), self._short_revision(self.revision4)) @istest def revision_get(self): self.storage.revision_add([self.revision]) actual_revisions = list(self.storage.revision_get( [self.revision['id'], self.revision2['id']])) # when del actual_revisions[0]['author']['id'] # hack: ids are generated del actual_revisions[0]['committer']['id'] self.assertEqual(len(actual_revisions), 2) self.assertEqual(actual_revisions[0], self.revision) self.assertIsNone(actual_revisions[1]) @istest def revision_get_no_parents(self): self.storage.revision_add([self.revision3]) get = list(self.storage.revision_get([self.revision3['id']])) self.assertEqual(len(get), 1) self.assertEqual(get[0]['parents'], []) # no parents on this one @istest def revision_get_by(self): # given self.storage.content_add([self.cont2]) self.storage.directory_add([self.dir2]) # point to self.cont self.storage.revision_add([self.revision2]) # points to self.dir origin_id = self.storage.origin_add_one(self.origin2) # occurrence2 points to 'revision2' with branch 'master', we # need to point to the right origin occurrence2 = self.occurrence2.copy() occurrence2.update({'origin': origin_id}) self.storage.occurrence_add([occurrence2]) # we want only revision 2 expected_revisions = list(self.storage.revision_get( [self.revision2['id']])) # when actual_results = list(self.storage.revision_get_by( origin_id, occurrence2['branch'], None)) self.assertEqual(actual_results[0], expected_revisions[0]) # when (with no branch filtering, it's still ok) actual_results = list(self.storage.revision_get_by( origin_id, None, None)) self.assertEqual(actual_results[0], expected_revisions[0]) @istest def revision_get_by_multiple_occurrence(self): # 2 occurrences pointing to 2 different revisions # each occurence have 1 hour delta # the api must return the revision whose occurrence is the nearest. # given self.storage.content_add([self.cont2]) self.storage.directory_add([self.dir2]) self.storage.revision_add([self.revision2, self.revision3]) origin_id = self.storage.origin_add_one(self.origin2) # occurrence2 points to 'revision2' with branch 'master', we # need to point to the right origin occurrence2 = self.occurrence2.copy() occurrence2.update({'origin': origin_id, 'date': occurrence2['date']}) dt = datetime.timedelta(days=1) occurrence3 = self.occurrence2.copy() occurrence3.update({'origin': origin_id, 'date': occurrence3['date'] + dt, 'target': self.revision3['id']}) # 2 occurrences on same revision with lower validity date with 1h delta self.storage.occurrence_add([occurrence2]) self.storage.occurrence_add([occurrence3]) # when actual_results0 = list(self.storage.revision_get_by( origin_id, occurrence2['branch'], occurrence2['date'])) # hack: ids are generated del actual_results0[0]['author']['id'] del actual_results0[0]['committer']['id'] self.assertEquals(len(actual_results0), 1) self.assertEqual(actual_results0, [self.revision2]) # when actual_results1 = list(self.storage.revision_get_by( origin_id, occurrence2['branch'], occurrence2['date'] + dt/3)) # closer to occurrence2 # hack: ids are generated del actual_results1[0]['author']['id'] del actual_results1[0]['committer']['id'] self.assertEquals(len(actual_results1), 1) self.assertEqual(actual_results1, [self.revision2]) # when actual_results2 = list(self.storage.revision_get_by( origin_id, occurrence2['branch'], occurrence2['date'] + 2*dt/3)) # closer to occurrence3 del actual_results2[0]['author']['id'] del actual_results2[0]['committer']['id'] self.assertEquals(len(actual_results2), 1) self.assertEqual(actual_results2, [self.revision3]) # when actual_results3 = list(self.storage.revision_get_by( origin_id, occurrence3['branch'], occurrence3['date'])) # hack: ids are generated del actual_results3[0]['author']['id'] del actual_results3[0]['committer']['id'] self.assertEquals(len(actual_results3), 1) self.assertEqual(actual_results3, [self.revision3]) # when actual_results4 = list(self.storage.revision_get_by( origin_id, None, None)) for actual_result in actual_results4: del actual_result['author']['id'] del actual_result['committer']['id'] self.assertEquals(len(actual_results4), 2) self.assertCountEqual(actual_results4, [self.revision3, self.revision2]) @istest def release_add(self): init_missing = self.storage.release_missing([self.release['id'], self.release2['id']]) self.assertEqual([self.release['id'], self.release2['id']], list(init_missing)) self.storage.release_add([self.release, self.release2]) end_missing = self.storage.release_missing([self.release['id'], self.release2['id']]) self.assertEqual([], list(end_missing)) @istest def release_get(self): # given self.storage.release_add([self.release, self.release2]) # when actual_releases = list(self.storage.release_get([self.release['id'], self.release2['id']])) # then for actual_release in actual_releases: del actual_release['author']['id'] # hack: ids are generated self.assertEquals([self.release, self.release2], [actual_releases[0], actual_releases[1]]) @istest def release_get_by(self): # given self.storage.revision_add([self.revision2]) # points to self.dir self.storage.release_add([self.release3]) origin_id = self.storage.origin_add_one(self.origin2) # occurrence2 points to 'revision2' with branch 'master', we # need to point to the right origin occurrence2 = self.occurrence2.copy() occurrence2.update({'origin': origin_id}) self.storage.occurrence_add([occurrence2]) # we want only revision 2 expected_releases = list(self.storage.release_get( [self.release3['id']])) # when actual_results = list(self.storage.release_get_by( occurrence2['origin'])) # then self.assertEqual(actual_results[0], expected_releases[0]) @istest def origin_add_one(self): origin0 = self.storage.origin_get(self.origin) self.assertIsNone(origin0) id = self.storage.origin_add_one(self.origin) actual_origin = self.storage.origin_get({'url': self.origin['url'], 'type': self.origin['type']}) self.assertEqual(actual_origin['id'], id) id2 = self.storage.origin_add_one(self.origin) self.assertEqual(id, id2) @istest def origin_get(self): self.assertIsNone(self.storage.origin_get(self.origin)) id = self.storage.origin_add_one(self.origin) # lookup per type and url (returns id) actual_origin0 = self.storage.origin_get({'url': self.origin['url'], 'type': self.origin['type']}) self.assertEqual(actual_origin0['id'], id) # lookup per id (returns dict) actual_origin1 = self.storage.origin_get({'id': id}) self.assertEqual(actual_origin1, {'id': id, 'type': self.origin['type'], 'url': self.origin['url'], 'lister': None, 'project': None}) @istest def occurrence_add(self): origin_id = self.storage.origin_add_one(self.origin2) revision = self.revision.copy() revision['id'] = self.occurrence['target'] self.storage.revision_add([revision]) occur = self.occurrence occur['origin'] = origin_id self.storage.occurrence_add([occur]) self.storage.occurrence_add([occur]) test_query = ''' with indiv_occurrences as ( select origin, branch, target, target_type, unnest(visits) as visit from occurrence_history ) select origin, branch, target, target_type, date from indiv_occurrences left join origin_visit using(origin, visit) order by origin, date''' self.cursor.execute(test_query) ret = self.cursor.fetchall() self.assertEqual(len(ret), 1) self.assertEqual( (ret[0][0], ret[0][1].tobytes(), ret[0][2].tobytes(), ret[0][3], ret[0][4]), (occur['origin'], occur['branch'], occur['target'], occur['target_type'], occur['date'])) orig_date = occur['date'] occur['date'] += datetime.timedelta(hours=10) self.storage.occurrence_add([occur]) self.cursor.execute(test_query) ret = self.cursor.fetchall() self.assertEqual(len(ret), 2) self.assertEqual( (ret[0][0], ret[0][1].tobytes(), ret[0][2].tobytes(), ret[0][3], ret[0][4]), (occur['origin'], occur['branch'], occur['target'], occur['target_type'], orig_date)) self.assertEqual( (ret[1][0], ret[1][1].tobytes(), ret[1][2].tobytes(), ret[1][3], ret[1][4]), (occur['origin'], occur['branch'], occur['target'], occur['target_type'], occur['date'])) @istest def occurrence_get(self): # given origin_id = self.storage.origin_add_one(self.origin2) revision = self.revision.copy() revision['id'] = self.occurrence['target'] self.storage.revision_add([revision]) occur = self.occurrence occur['origin'] = origin_id self.storage.occurrence_add([occur]) self.storage.occurrence_add([occur]) # when actual_occurrence = list(self.storage.occurrence_get(origin_id)) # then expected_occur = occur.copy() del expected_occur['date'] self.assertEquals(len(actual_occurrence), 1) self.assertEquals(actual_occurrence[0], expected_occur) @istest def content_find_occurrence_with_present_content(self): # 1. with something to find # given self.storage.content_add([self.cont2]) self.storage.directory_add([self.dir2]) # point to self.cont self.storage.revision_add([self.revision2]) # points to self.dir origin_id = self.storage.origin_add_one(self.origin2) occurrence = self.occurrence2 occurrence.update({'origin': origin_id}) self.storage.occurrence_add([occurrence]) # when occ = self.storage.content_find_occurrence( {'sha1': self.cont2['sha1']}) # then self.assertEquals(occ['origin_type'], self.origin2['type']) self.assertEquals(occ['origin_url'], self.origin2['url']) self.assertEquals(occ['branch'], self.occurrence2['branch']) self.assertEquals(occ['target'], self.revision2['id']) self.assertEquals(occ['target_type'], self.occurrence2['target_type']) self.assertEquals(occ['path'], self.dir2['entries'][0]['name']) occ2 = self.storage.content_find_occurrence( {'sha1_git': self.cont2['sha1_git']}) self.assertEquals(occ2['origin_type'], self.origin2['type']) self.assertEquals(occ2['origin_url'], self.origin2['url']) self.assertEquals(occ2['branch'], self.occurrence2['branch']) self.assertEquals(occ2['target'], self.revision2['id']) self.assertEquals(occ2['target_type'], self.occurrence2['target_type']) self.assertEquals(occ2['path'], self.dir2['entries'][0]['name']) occ3 = self.storage.content_find_occurrence( {'sha256': self.cont2['sha256']}) self.assertEquals(occ3['origin_type'], self.origin2['type']) self.assertEquals(occ3['origin_url'], self.origin2['url']) self.assertEquals(occ3['branch'], self.occurrence2['branch']) self.assertEquals(occ3['target'], self.revision2['id']) self.assertEquals(occ3['target_type'], self.occurrence2['target_type']) self.assertEquals(occ3['path'], self.dir2['entries'][0]['name']) @istest def content_find_occurrence_with_non_present_content(self): # 1. with something that does not exist missing_cont = self.missing_cont occ = self.storage.content_find_occurrence( {'sha1': missing_cont['sha1']}) self.assertEquals(occ, None, "Content does not exist so no occurrence") # 2. with something that does not exist occ = self.storage.content_find_occurrence( {'sha1_git': missing_cont['sha1_git']}) self.assertEquals(occ, None, "Content does not exist so no occurrence") # 3. with something that does not exist occ = self.storage.content_find_occurrence( {'sha256': missing_cont['sha256']}) self.assertEquals(occ, None, "Content does not exist so no occurrence") @istest def content_find_occurrence_bad_input(self): # 1. with bad input with self.assertRaises(ValueError) as cm: self.storage.content_find_occurrence({}) # empty is bad self.assertIn('content keys', cm.exception.args[0]) # 2. with bad input with self.assertRaises(ValueError) as cm: self.storage.content_find_occurrence( {'unknown-sha1': 'something'}) # not the right key self.assertIn('content keys', cm.exception.args[0]) @istest def entity_get_from_lister_metadata(self): self.storage.entity_add([self.entity1]) fetched_entities = list( self.storage.entity_get_from_lister_metadata( [self.entity1_query, self.entity2_query])) # Entity 1 should have full metadata, with last_seen/last_id instead # of validity entity1 = self.entity1.copy() entity1['last_seen'] = entity1['validity'][0] del fetched_entities[0]['last_id'] del entity1['validity'] # Entity 2 should have no metadata entity2 = { 'uuid': None, 'lister_metadata': self.entity2_query.copy(), } self.assertEquals(fetched_entities, [entity1, entity2]) @istest def entity_get_from_lister_metadata_twice(self): self.storage.entity_add([self.entity1]) fetched_entities1 = list( self.storage.entity_get_from_lister_metadata( [self.entity1_query])) fetched_entities2 = list( self.storage.entity_get_from_lister_metadata( [self.entity1_query])) self.assertEquals(fetched_entities1, fetched_entities2) @istest def entity_get(self): # given self.storage.entity_add([self.entity4]) self.storage.entity_add([self.entity3]) # when: entity3 -child-of-> entity4 actual_entity3 = list(self.storage.entity_get(self.entity3['uuid'])) self.assertEquals(len(actual_entity3), 2) # remove dynamic data (modified by db) entity3 = self.entity3.copy() entity4 = self.entity4.copy() del entity3['validity'] del entity4['validity'] del actual_entity3[0]['last_seen'] del actual_entity3[0]['last_id'] del actual_entity3[1]['last_seen'] del actual_entity3[1]['last_id'] self.assertEquals(actual_entity3, [entity3, entity4]) # when: entity4 only child actual_entity4 = list(self.storage.entity_get(self.entity4['uuid'])) self.assertEquals(len(actual_entity4), 1) # remove dynamic data (modified by db) entity4 = self.entity4.copy() del entity4['validity'] del actual_entity4[0]['last_id'] del actual_entity4[0]['last_seen'] self.assertEquals(actual_entity4, [entity4]) @istest def entity_get_one(self): # given self.storage.entity_add([self.entity3, self.entity4]) # when: entity3 -child-of-> entity4 actual_entity3 = self.storage.entity_get_one(self.entity3['uuid']) # remove dynamic data (modified by db) entity3 = self.entity3.copy() del entity3['validity'] del actual_entity3['last_seen'] del actual_entity3['last_id'] self.assertEquals(actual_entity3, entity3) @istest def stat_counters(self): expected_keys = ['content', 'directory', 'directory_entry_dir', 'occurrence', 'origin', 'person', 'revision'] counters = self.storage.stat_counters() self.assertTrue(set(expected_keys) <= set(counters)) self.assertIsInstance(counters[expected_keys[0]], int) @istest def content_find_with_present_content(self): # 1. with something to find cont = self.cont self.storage.content_add([cont]) actually_present = self.storage.content_find({'sha1': cont['sha1']}) actually_present.pop('ctime') self.assertEqual(actually_present, { 'sha1': cont['sha1'], 'sha256': cont['sha256'], 'sha1_git': cont['sha1_git'], 'length': cont['length'], 'status': 'visible' }) # 2. with something to find actually_present = self.storage.content_find( {'sha1_git': cont['sha1_git']}) actually_present.pop('ctime') self.assertEqual(actually_present, { 'sha1': cont['sha1'], 'sha256': cont['sha256'], 'sha1_git': cont['sha1_git'], 'length': cont['length'], 'status': 'visible' }) # 3. with something to find actually_present = self.storage.content_find( {'sha256': cont['sha256']}) actually_present.pop('ctime') self.assertEqual(actually_present, { 'sha1': cont['sha1'], 'sha256': cont['sha256'], 'sha1_git': cont['sha1_git'], 'length': cont['length'], 'status': 'visible' }) # 4. with something to find actually_present = self.storage.content_find( {'sha1': cont['sha1'], 'sha1_git': cont['sha1_git'], 'sha256': cont['sha256']}) actually_present.pop('ctime') self.assertEqual(actually_present, { 'sha1': cont['sha1'], 'sha256': cont['sha256'], 'sha1_git': cont['sha1_git'], 'length': cont['length'], 'status': 'visible' }) @istest def content_find_with_non_present_content(self): # 1. with something that does not exist missing_cont = self.missing_cont actually_present = self.storage.content_find( {'sha1': missing_cont['sha1']}) self.assertIsNone(actually_present) # 2. with something that does not exist actually_present = self.storage.content_find( {'sha1_git': missing_cont['sha1_git']}) self.assertIsNone(actually_present) # 3. with something that does not exist actually_present = self.storage.content_find( {'sha256': missing_cont['sha256']}) self.assertIsNone(actually_present) @istest def content_find_bad_input(self): # 1. with bad input with self.assertRaises(ValueError): self.storage.content_find({}) # empty is bad # 2. with bad input with self.assertRaises(ValueError): self.storage.content_find( {'unknown-sha1': 'something'}) # not the right key @istest def object_find_by_sha1_git(self): sha1_gits = [b'00000000000000000000'] expected = { b'00000000000000000000': [], } self.storage.content_add([self.cont]) sha1_gits.append(self.cont['sha1_git']) expected[self.cont['sha1_git']] = [{ 'sha1_git': self.cont['sha1_git'], 'type': 'content', 'id': self.cont['sha1'], }] self.storage.directory_add([self.dir]) sha1_gits.append(self.dir['id']) expected[self.dir['id']] = [{ 'sha1_git': self.dir['id'], 'type': 'directory', 'id': self.dir['id'], }] self.storage.revision_add([self.revision]) sha1_gits.append(self.revision['id']) expected[self.revision['id']] = [{ 'sha1_git': self.revision['id'], 'type': 'revision', 'id': self.revision['id'], }] self.storage.release_add([self.release]) sha1_gits.append(self.release['id']) expected[self.release['id']] = [{ 'sha1_git': self.release['id'], 'type': 'release', 'id': self.release['id'], }] ret = self.storage.object_find_by_sha1_git(sha1_gits) for val in ret.values(): for obj in val: del obj['object_id'] self.assertEqual(expected, ret) class TestStorage(AbstractTestStorage, unittest.TestCase): """Test the local storage""" # Can only be tested with local storage as you can't mock # datetimes for the remote server @istest def fetch_history(self): origin = self.storage.origin_add_one(self.origin) with patch('datetime.datetime'): datetime.datetime.now.return_value = self.fetch_history_date fetch_history_id = self.storage.fetch_history_start(origin) datetime.datetime.now.assert_called_with(tz=datetime.timezone.utc) with patch('datetime.datetime'): datetime.datetime.now.return_value = self.fetch_history_end self.storage.fetch_history_end(fetch_history_id, self.fetch_history_data) fetch_history = self.storage.fetch_history_get(fetch_history_id) expected_fetch_history = self.fetch_history_data.copy() expected_fetch_history['id'] = fetch_history_id expected_fetch_history['origin'] = origin expected_fetch_history['date'] = self.fetch_history_date expected_fetch_history['duration'] = self.fetch_history_duration self.assertEqual(expected_fetch_history, fetch_history) @istest def person_get(self): # given person0 = { 'fullname': b'bob ', 'name': b'bob', 'email': b'alice@bob', } id0 = self.storage._person_add(person0) person1 = { 'fullname': b'tony ', 'name': b'tony', 'email': b'tony@bob', } id1 = self.storage._person_add(person1) # when actual_persons = self.storage.person_get([id0, id1]) # given (person injection through release for example) self.assertEqual( list(actual_persons), [ { 'id': id0, 'fullname': person0['fullname'], 'name': person0['name'], 'email': person0['email'], }, { 'id': id1, 'fullname': person1['fullname'], 'name': person1['name'], 'email': person1['email'], }, ]) diff --git a/version.txt b/version.txt index 33a7290e..8504c2ad 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.37-0-g24979b7 \ No newline at end of file +v0.0.38-0-ge522d1e \ No newline at end of file