diff --git a/sql/archiver/Makefile b/sql/archiver/Makefile new file mode 100644 index 00000000..021dcb30 --- /dev/null +++ b/sql/archiver/Makefile @@ -0,0 +1,41 @@ +# Depends: postgresql-client, postgresql-autodoc + +DBNAME = softwareheritage-archiver-dev +DOCDIR = autodoc + +SQL_INIT = ../swh-init.sql +SQL_SCHEMA = swh-archiver-schema.sql +SQL_DATA = swh-archiver-data.sql +SQLS = $(SQL_INIT) $(SQL_SCHEMA) $(SQL_DATA) + +PSQL_BIN = psql +PSQL_FLAGS = --single-transaction --echo-all -X +PSQL = $(PSQL_BIN) $(PSQL_FLAGS) + + +all: + +createdb: createdb-stamp +createdb-stamp: $(SQL_INIT) + createdb $(DBNAME) + touch $@ + +filldb: filldb-stamp +filldb-stamp: createdb-stamp + cat $(SQLS) | $(PSQL) $(DBNAME) + touch $@ + +dropdb: + -dropdb $(DBNAME) + +dumpdb: swh-archiver.dump +swh-archiver.dump: filldb-stamp + pg_dump -Fc $(DBNAME) > $@ + +clean: + rm -rf *-stamp $(DOCDIR)/ + +distclean: clean dropdb + rm -f swh-archiver.dump + +.PHONY: all createdb filldb dropdb dumpdb clean distclean diff --git a/sql/archiver/swh-archiver-data.sql b/sql/archiver/swh-archiver-data.sql new file mode 100644 index 00000000..a156bd37 --- /dev/null +++ b/sql/archiver/swh-archiver-data.sql @@ -0,0 +1,2 @@ +INSERT INTO archive(id, url) +VALUES('Banco', 'http://banco.softwareheritage.org:5003/'); diff --git a/sql/archiver/swh-archiver-schema.sql b/sql/archiver/swh-archiver-schema.sql new file mode 100644 index 00000000..87df5dc2 --- /dev/null +++ b/sql/archiver/swh-archiver-schema.sql @@ -0,0 +1,50 @@ +-- In order to archive the content of the object storage, add +-- some tables to keep track of what has already been archived. + +create table dbversion +( + version int primary key, + release timestamptz, + description text +); + +comment on table dbversion is 'Schema update tracking'; + +INSERT INTO dbversion(version, release, description) +VALUES(1, now(), 'Work In Progress'); + +CREATE DOMAIN archive_id AS TEXT; + +CREATE TABLE archive ( + id archive_id PRIMARY KEY, + url TEXT +); + +comment on table archive is 'Possible archives'; +comment on column archive.id is 'Short identifier for the archive'; +comment on column archive.url is 'URL identifying the archiver API'; + +CREATE TYPE archive_status AS ENUM ( + 'missing', + 'ongoing', + 'present' +); + +comment on type archive_status is 'Status of a given archive'; + +-- a SHA1 checksum (not necessarily originating from Git) +CREATE DOMAIN sha1 AS bytea CHECK (LENGTH(VALUE) = 20); + +CREATE TABLE content_archive ( + content_id sha1, + archive_id archive_id REFERENCES archive(id), + status archive_status, + mtime timestamptz, + PRIMARY KEY (content_id, archive_id) +); + +comment on table content_archive is 'Referencing the status and whereabouts of a content'; +comment on column content_archive.content_id is 'content identifier'; +comment on column content_archive.archive_id is 'content whereabouts'; +comment on column content_archive.status is 'content status'; +comment on column content_archive.mtime is 'last time the content was stored'; diff --git a/sql/swh-schema.sql b/sql/swh-schema.sql index c6698a2e..c5b3c4c7 100644 --- a/sql/swh-schema.sql +++ b/sql/swh-schema.sql @@ -1,453 +1,428 @@ --- --- Software Heritage Data Model --- -- drop schema if exists swh cascade; -- create schema swh; -- set search_path to swh; create table dbversion ( version int primary key, release timestamptz, description text ); insert into dbversion(version, release, description) values(69, now(), 'Work In Progress'); -- a SHA1 checksum (not necessarily originating from Git) create domain sha1 as bytea check
(length(value) = 20); -- a Git object ID, i.e., a SHA1 checksum create domain sha1_git as bytea check (length(value) = 20); -- a SHA256 checksum create domain sha256 as bytea check (length(value) = 32); -- UNIX path (absolute, relative, individual path component, etc.) create domain unix_path as bytea; -- a set of UNIX-like access permissions, as manipulated by, e.g., chmod create domain file_perms as int; create type content_status as enum ('absent', 'visible', 'hidden'); -- Checksums about actual file content. Note that the content itself is not -- stored in the DB, but on external (key-value) storage. A single checksum is -- used as key there, but the other can be used to verify that we do not inject -- content collisions not knowingly. create table content ( sha1 sha1 primary key, sha1_git sha1_git not null, sha256 sha256 not null, length bigint not null, ctime timestamptz not null default now(), -- creation time, i.e. time of (first) injection into the storage status content_status not null default 'visible', object_id bigserial ); create unique index on content(sha1_git); create unique index on content(sha256); create index on content(ctime); -- TODO use a BRIN index here (postgres >= 9.5) -- Entities constitute a typed hierarchy of organization, hosting -- facilities, groups, people and software projects. -- -- Examples of entities: Software Heritage, Debian, GNU, GitHub, -- Apache, The Linux Foundation, the Debian Python Modules Team, the -- torvalds GitHub user, the torvalds/linux GitHub project. -- -- The data model is hierarchical (via the parent attribute) and might -- store sub-branches of existing entities. The key feature of an -- entity is might be *listed* (if it is available in listable_entity) -- to retrieve information about its content, i.e: sub-entities, -- projects, origins. -- Types of entities. -- -- - organization: a root entity, usually backed by a non-profit, a -- company, or another kind of "association". (examples: Software -- Heritage, Debian, GNU, GitHub) -- -- - group_of_entities: used for hierarchies, doesn't need to have a -- concrete existence. (examples: GNU hosting facilities, Debian -- hosting facilities, GitHub users, ...) -- -- - hosting: a hosting facility, can usually be listed to generate -- other data. (examples: GitHub git hosting, alioth.debian.org, -- snapshot.debian.org) -- -- - group_of_persons: an entity representing a group of -- persons. (examples: a GitHub organization, a Debian team) -- -- - person: an entity representing a person. (examples: -- a GitHub user, a Debian developer) -- -- - project: an entity representing a software project. (examples: a -- GitHub project, Apache httpd, a Debian source package, ...) create type entity_type as enum ( 'organization', 'group_of_entities', 'hosting', 'group_of_persons', 'person', 'project' ); -- The history of entities. Allows us to keep historical metadata -- about entities. The temporal invariant is the uuid. Root -- organization uuids are manually generated (and available in -- swh-data.sql). -- -- For generated entities (generated = true), we can provide -- generation_metadata to allow listers to retrieve the uuids of previous -- iterations of the entity. -- -- Inactive entities that have been active in the past (active = -- false) should register the timestamp at which we saw them -- deactivate, in a new entry of entity_history. 
create table entity_history ( id bigserial primary key, uuid uuid, parent uuid, -- should reference entity_history(uuid) name text not null, type entity_type not null, description text, homepage text, active boolean not null, -- whether the entity was seen on the last listing generated boolean not null, -- whether this entity has been generated by a lister lister_metadata jsonb, -- lister-specific metadata, used for queries metadata jsonb, validity timestamptz[] -- timestamps at which we have seen this entity ); create index on entity_history(uuid); create index on entity_history(name); -- The entity table provides a view of the latest information on a -- given entity. It is updated via a trigger on entity_history. create table entity ( uuid uuid primary key, parent uuid references entity(uuid) deferrable initially deferred, name text not null, type entity_type not null, description text, homepage text, active boolean not null, -- whether the entity was seen on the last listing generated boolean not null, -- whether this entity has been generated by a lister lister_metadata jsonb, -- lister-specific metadata, used for queries metadata jsonb, last_seen timestamptz, -- last listing time or disappearance time for active=false last_id bigint references entity_history(id) -- last listing id ); create index on entity(name); create index on entity using gin(lister_metadata jsonb_path_ops); -- Register the equivalence between two entities. Allows sideways -- navigation in the entity table create table entity_equivalence ( entity1 uuid references entity(uuid), entity2 uuid references entity(uuid), primary key (entity1, entity2), constraint order_entities check (entity1 < entity2) ); -- Register a lister for a specific entity. create table listable_entity ( uuid uuid references entity(uuid) primary key, enabled boolean not null default true, -- do we list this entity automatically? list_engine text, -- crawler to be used to list entity's content list_url text, -- root URL to start the listing list_params jsonb, -- org-specific listing parameter latest_list timestamptz -- last time the entity's content has been listed ); -- Log of all entity listings (i.e., entity crawling) that have been -- done in the past, or are still ongoing. create table list_history ( id bigserial primary key, entity uuid references listable_entity(uuid), date timestamptz not null, status boolean, -- true if and only if the listing has been successful result jsonb, -- more detailed return value, depending on status stdout text, stderr text, duration interval -- fetch duration of NULL if still ongoing ); -- An origin is a place, identified by an URL, where software can be found. We -- support different kinds of origins, e.g., git and other VCS repositories, -- web pages that list tarballs URLs (e.g., http://www.kernel.org), indirect -- tarball URLs (e.g., http://www.example.org/latest.tar.gz), etc. The key -- feature of an origin is that it can be *fetched* (wget, git clone, svn -- checkout, etc.) to retrieve all the contained software. create table origin ( id bigserial primary key, type text, -- TODO use an enum here (?) url text not null, lister uuid references listable_entity(uuid), project uuid references entity(uuid) ); create index on origin(type, url); -- Content we have seen but skipped for some reason. 
This table is -- separate from the content table as we might not have the sha1 -- checksum of that data (for instance when we inject git -- repositories, objects that are too big will be skipped here, and we -- will only know their sha1_git). 'reason' contains the reason the -- content was skipped. origin is a nullable column allowing to find -- out which origin contains that skipped content. create table skipped_content ( sha1 sha1, sha1_git sha1_git, sha256 sha256, length bigint not null, ctime timestamptz not null default now(), status content_status not null default 'absent', reason text not null, origin bigint references origin(id), object_id bigserial, unique (sha1, sha1_git, sha256) ); -- those indexes support multiple NULL values. create unique index on skipped_content(sha1); create unique index on skipped_content(sha1_git); create unique index on skipped_content(sha256); -- Log of all origin fetches (i.e., origin crawling) that have been done in the -- past, or are still ongoing. Similar to list_history, but for origins. create table fetch_history ( id bigserial primary key, origin bigint references origin(id), date timestamptz not null, status boolean, -- true if and only if the fetch has been successful result jsonb, -- more detailed returned values, times, etc... stdout text, stderr text, -- null when status is true, filled otherwise duration interval -- fetch duration of NULL if still ongoing ); -- A file-system directory. A directory is a list of directory entries (see -- tables: directory_entry_{dir,file}). -- -- To list the contents of a directory: -- 1. list the contained directory_entry_dir using array dir_entries -- 2. list the contained directory_entry_file using array file_entries -- 3. list the contained directory_entry_rev using array rev_entries -- 4. UNION -- -- Synonyms/mappings: -- * git: tree create table directory ( id sha1_git primary key, dir_entries bigint[], -- sub-directories, reference directory_entry_dir file_entries bigint[], -- contained files, reference directory_entry_file rev_entries bigint[], -- mounted revisions, reference directory_entry_rev object_id bigserial -- short object identifier ); create index on directory using gin (dir_entries); create index on directory using gin (file_entries); create index on directory using gin (rev_entries); -- A directory entry pointing to a sub-directory. create table directory_entry_dir ( id bigserial primary key, target sha1_git, -- id of target directory name unix_path, -- path name, relative to containing dir perms file_perms -- unix-like permissions ); create unique index on directory_entry_dir(target, name, perms); -- A directory entry pointing to a file. create table directory_entry_file ( id bigserial primary key, target sha1_git, -- id of target file name unix_path, -- path name, relative to containing dir perms file_perms -- unix-like permissions ); create unique index on directory_entry_file(target, name, perms); -- A directory entry pointing to a revision. 
create table directory_entry_rev ( id bigserial primary key, target sha1_git, -- id of target revision name unix_path, -- path name, relative to containing dir perms file_perms -- unix-like permissions ); create unique index on directory_entry_rev(target, name, perms); create table person ( id bigserial primary key, fullname bytea not null, -- freeform specification; what is actually used in the checksums -- will usually be of the form 'name ' name bytea, -- advisory: not null if we managed to parse a name email bytea -- advisory: not null if we managed to parse an email ); create unique index on person(fullname); create index on person(name); create index on person(email); create type revision_type as enum ('git', 'tar', 'dsc', 'svn'); -- the data object types stored in our data model create type object_type as enum ('content', 'directory', 'revision', 'release'); -- A snapshot of a software project at a specific point in time. -- -- Synonyms/mappings: -- * git / subversion / etc: commit -- * tarball: a specific tarball -- -- Revisions are organized as DAGs. Each revision points to 0, 1, or more (in -- case of merges) parent revisions. Each revision points to a directory, i.e., -- a file-system tree containing files and directories. create table revision ( id sha1_git primary key, date timestamptz, date_offset smallint, date_neg_utc_offset boolean, committer_date timestamptz, committer_date_offset smallint, committer_date_neg_utc_offset boolean, type revision_type not null, directory sha1_git, -- file-system tree message bytea, author bigint references person(id), committer bigint references person(id), metadata jsonb, -- extra metadata (tarball checksums, extra commit information, etc...) synthetic boolean not null default false, -- true if synthetic (cf. swh-loader-tar) object_id bigserial ); create index on revision(directory); -- either this table or the sha1_git[] column on the revision table create table revision_history ( id sha1_git references revision(id), parent_id sha1_git, parent_rank int not null default 0, -- parent position in merge commits, 0-based primary key (id, parent_rank) ); create index on revision_history(parent_id); -- The timestamps at which Software Heritage has made a visit of the given origin. create table origin_visit ( origin bigint not null references origin(id), visit bigint not null, date timestamptz not null, primary key (origin, visit) ); create index on origin_visit(date); -- The content of software origins is indexed starting from top-level pointers -- called "branches". Every time we fetch some origin we store in this table -- where the branches pointed to at fetch time. -- -- Synonyms/mappings: -- * git: ref (in the "git update-ref" sense) create table occurrence_history ( origin bigint references origin(id) not null, branch bytea not null, -- e.g., b"master" (for VCS), or b"sid" (for Debian) target sha1_git not null, -- ref target, e.g., commit id target_type object_type not null, -- ref target type object_id bigserial not null, -- short object identifier visits bigint[] not null, -- the visits where that occurrence was valid. References -- origin_visit(visit), where o_h.origin = origin_visit.origin. primary key (object_id) ); create index on occurrence_history(target, target_type); create index on occurrence_history(origin, branch); create unique index on occurrence_history(origin, branch, target, target_type); -- Materialized view of occurrence_history, storing the *current* value of each -- branch, as last seen by SWH. 
create table occurrence ( origin bigint references origin(id) not null, branch bytea not null, target sha1_git not null, target_type object_type not null, primary key(origin, branch) ); -- A "memorable" point in the development history of a project. -- -- Synonyms/mappings: -- * git: tag (of the annotated kind, otherwise they are just references) -- * tarball: the release version number create table release ( id sha1_git primary key, target sha1_git, target_type object_type, date timestamptz, date_offset smallint, date_neg_utc_offset boolean, name bytea, comment bytea, author bigint references person(id), synthetic boolean not null default false, -- true if synthetic (cf. swh-loader-tar) object_id bigserial ); create index on release(target, target_type); - - --- In order to archive the content of the object storage, add --- some tables to keep trace of what have already been archived. - -CREATE DOMAIN archive_id AS TEXT; - -CREATE TABLE archives ( - id archive_id PRIMARY KEY, - url TEXT -); - -CREATE TYPE archive_status AS ENUM ( - 'missing', - 'ongoing', - 'present' -); - -CREATE TABLE content_archive ( - content_id sha1 REFERENCES content(sha1), - archive_id archive_id REFERENCES archives(id), - status archive_status, - mtime timestamptz, - PRIMARY KEY (content_id, archive_id) -); diff --git a/sql/upgrades/070.sql b/sql/upgrades/070.sql new file mode 100644 index 00000000..df84de53 --- /dev/null +++ b/sql/upgrades/070.sql @@ -0,0 +1,15 @@ +-- SWH DB schema upgrade +-- from_version: 69 +-- to_version: 70 +-- description: Drop the archiver-related tables from the main schema (moved to their own database) + +insert into dbversion(version, release, description) + values(70, now(), 'Work In Progress'); + +drop table content_archive; + +drop table archives; + +drop type archive_status; + +drop domain archive_id; diff --git a/swh/storage/archiver/copier.py b/swh/storage/archiver/copier.py index 1484dbe6..3669c762 100644 --- a/swh/storage/archiver/copier.py +++ b/swh/storage/archiver/copier.py @@ -1,60 +1,60 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.core import hashutil from swh.objstorage.api.client import RemoteObjStorage class ArchiverCopier(): """ This archiver copy some files into a remote objstorage in order to get a backup. Attributes: content_ids: A list of sha1's that represents the content this copier has to archive. server (RemoteArchive): The remote object storage that is used to backup content. master_storage (Storage): The master storage that contains the data the copier needs to archive. """ def __init__(self, destination, content, master_storage): """ Create a Copier for the archiver Args: destination: A tuple (archive_name, archive_url) that represents a - remote object storage as in the 'archives' table. + remote object storage as in the 'archive' table. content: A list of sha1 that represents the content this copier have to archive. master_storage (Storage): The master storage of the system that contains the data to archive. """ _name, self.url = destination self.content_ids = content self.server = RemoteObjStorage(self.url) self.master_storage = master_storage def run(self): """ Do the copy on the backup storage. Run the archiver copier in order to copy the required content into the current destination.
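For orientation, the content_archive table now lives in the standalone archiver database created by sql/archiver/swh-archiver-schema.sql (while sql/upgrades/070.sql drops the old copies from the main schema). A minimal, illustrative sketch of inspecting it directly with psycopg2; the DSN matches the Makefile's DBNAME and the retention threshold of 2 is an assumption — the director itself goes through ArchiverStorage rather than issuing this SQL:

```python
# Illustrative query against the new archiver database; the DSN and the
# retention threshold are assumptions, not values enforced by this patch.
import psycopg2


def contents_missing_copies(dsn='dbname=softwareheritage-archiver-dev',
                            retention_policy=2):
    """Yield content ids with fewer than `retention_policy` rows whose
    status is 'present' in content_archive."""
    with psycopg2.connect(dsn) as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT content_id
                FROM content_archive
                GROUP BY content_id
                HAVING sum(CASE WHEN status = 'present' THEN 1 ELSE 0 END)
                       < %s
            """, (retention_policy,))
            yield from (bytes(row[0]) for row in cur)
```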
The content which corresponds to the sha1 in self.content_ids will be fetched from the master_storage and then copied into the backup object storage. Returns: A boolean that indicates if the whole content have been copied. """ self.content_ids = list(map(lambda x: hashutil.hex_to_hash(x[2:]), self.content_ids)) contents = self.master_storage.content_get(self.content_ids) try: for content in contents: content_data = content['data'] self.server.content_add(content_data) except: return False return True diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py index 15f26ff2..8740f27a 100644 --- a/swh/storage/archiver/director.py +++ b/swh/storage/archiver/director.py @@ -1,243 +1,258 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import swh import logging import click from datetime import datetime from swh.core import hashutil, config from swh.scheduler.celery_backend.config import app + from . import tasks # NOQA +from .storage import ArchiverStorage DEFAULT_CONFIG = { 'objstorage_path': ('str', '/tmp/swh-storage/objects'), 'batch_max_size': ('int', 50), 'archival_max_age': ('int', 3600), 'retention_policy': ('int', 2), 'asynchronous': ('bool', True), - 'dbconn': ('str', 'dbname=softwareheritage-dev user=guest') + 'dbconn': ('str', 'dbname=softwareheritage-archiver-dev user=guest'), + 'dbconn_storage': ('str', 'dbname=softwareheritage-dev user=guest') } task_name = 'swh.storage.archiver.tasks.SWHArchiverTask' logger = logging.getLogger() class ArchiverDirector(): """Process the files in order to know which one is needed as backup. The archiver director processes the files in the local storage in order to know which one needs archival and it delegates this task to archiver workers. - Attributes: master_storage: the local storage of the master server. slave_storages: Iterable of remote obj storages to the slaves servers used for backup. config: Archiver_configuration. A dictionary that must contain the following keys. - objstorage_path (string): the path of the objstorage of the - master. + objstorage_path (string): master's objstorage path + batch_max_size (int): The number of content items that can be given to the same archiver worker. archival_max_age (int): Delay given to the worker to copy all the files in a given batch. retention_policy (int): Required number of copies for the content to be considered safe. asynchronous (boolean): Indicate whenever the archival should run in asynchronous mode or not. """ - def __init__(self, db_conn, config): + def __init__(self, db_conn_archiver, db_conn_storage, config): """ Constructor of the archiver director. Args: - db_conn: db_conn: Either a libpq connection string, - or a psycopg2 connection. - config: Archiver_configuration. A dictionary that must contains + db_conn_archiver: Either a libpq connection string, + or a psycopg2 connection for the archiver db connection. + db_conn_storage: Either a libpq connection string, + or a psycopg2 connection for the db storage connection. + config: Archiver_configuration. A dictionary that must contain the following keys. - objstorage_path (string): the path of the objstorage of the - master. + objstorage_path (string): master's objstorage path batch_max_size (int): The number of content items that can be given to the same archiver worker. 
archival_max_age (int): Delay given to the worker to copy all the files in a given batch. retention_policy (int): Required number of copies for the content to be considered safe. asynchronous (boolean): Indicate whenever the archival should run in asynchronous mode or not. """ # Get the local storage of the master and remote ones for the slaves. - self.master_storage_args = [db_conn, config['objstorage_path']] + self.db_conn_archiver = db_conn_archiver + self.archiver_storage = ArchiverStorage(db_conn_archiver) + + self.master_storage_args = [db_conn_storage, config['objstorage_path']] master_storage = swh.storage.get_storage('local_storage', self.master_storage_args) slaves = { id: url for id, url - in master_storage.db.archive_ls() + in self.archiver_storage.archive_ls() } # TODO Database should be initialized somehow before going in # production. For now, assumes that the database contains - # datas for all the current content. + # data for all the current content. self.master_storage = master_storage self.slave_storages = slaves self.config = config def run(self): """ Run the archiver director. The archiver director will check all the contents of the archiver database and do the required backup jobs. """ if self.config['asynchronous']: run_fn = self.run_async_worker else: run_fn = self.run_sync_worker for batch in self.get_unarchived_content(): run_fn(batch) def run_async_worker(self, batch): """ Produce a worker that will be added to the task queue. """ task = app.tasks[task_name] - task.delay(batch, self.master_storage_args, - self.slave_storages, self.config) + task.delay(batch, + archiver_args=self.db_conn_archiver, + master_storage_args=self.master_storage_args, + slave_storages=self.slave_storages, + config=self.config) def run_sync_worker(self, batch): """ Run synchronously a worker on the given batch. """ task = app.tasks[task_name] - task(batch, self.master_storage_args, - self.slave_storages, self.config) + task(batch, + archiver_args=self.db_conn_archiver, + master_storage_args=self.master_storage_args, + slave_storages=self.slave_storages, + config=self.config) def get_unarchived_content(self): - """ get all the contents that needs to be archived. + """ Get contents that need to be archived. Yields: A batch of contents. Batches are dictionaries which associates a content id to the data about servers that contains it or not. {'id1': {'present': [('slave1', 'slave1_url')], 'missing': [('slave2', 'slave2_url'), ('slave3', 'slave3_url')] }, 'id2': {'present': [], 'missing': [ ('slave1', 'slave1_url'), ('slave2', 'slave2_url'), ('slave3', 'slave3_url') ]} } Where keys (idX) are sha1 of the content and (slaveX, slaveX_url) are ids and urls of the storage slaves. At least all the content that don't have enough copies on the backups servers are distributed into these batches. """ # Get the data about each content referenced into the archiver. 
missing_copy = {} - for content_id in self.master_storage.db.content_archive_ls(): + for content_id in self.archiver_storage.content_archive_ls(): db_content_id = '\\x' + hashutil.hash_to_hex(content_id[0]) # Fetch the datas about archival status of the content - backups = self.master_storage.db.content_archive_get( + backups = self.archiver_storage.content_archive_get( content=db_content_id ) for _content_id, server_id, status, mtime in backups: virtual_status = self.get_virtual_status(status, mtime) server_data = (server_id, self.slave_storages[server_id]) missing_copy.setdefault( db_content_id, {'present': [], 'missing': []} ).setdefault(virtual_status, []).append(server_data) # Check the content before archival. try: self.master_storage.objstorage.check(content_id[0]) except Exception as e: # Exception can be Error or ObjNotFoundError. logger.error(e) # TODO Do something to restore the content? if len(missing_copy) >= self.config['batch_max_size']: yield missing_copy missing_copy = {} if len(missing_copy) > 0: yield missing_copy def get_virtual_status(self, status, mtime): """ Compute the virtual presence of a content. If the status is ongoing but the time is not elasped, the archiver consider it will be present in the futur, and so consider it as present. However, if the time is elasped, the copy may have failed, so consider the content as missing. Arguments: status (string): One of ('present', 'missing', 'ongoing'). The status of the content. mtime (datetime): Time at which the content have been updated for the last time. Returns: The virtual status of the studied content, which is 'present' or 'missing'. Raises: ValueError: if the status is not one 'present', 'missing' or 'ongoing' """ if status in ('present', 'missing'): return status # If the status is 'ongoing' but there is still time, another worker # may still be on the task. if status == 'ongoing': mtime = mtime.replace(tzinfo=None) elapsed = (datetime.now() - mtime).total_seconds() if elapsed <= self.config['archival_max_age']: return 'present' else: return 'missing' else: raise ValueError("status must be either 'present', 'missing' " "or 'ongoing'") @click.command() @click.argument('config-path', required=1) @click.option('--dbconn', default=DEFAULT_CONFIG['dbconn'][1], - help="Connection string for the database") + help="Connection string for the archiver database") +@click.option('--dbconn-storage', default=DEFAULT_CONFIG['dbconn_storage'][1], + help="Connection string for the storage database") @click.option('--async/--sync', default=DEFAULT_CONFIG['asynchronous'][1], help="Indicates if the archiver should run asynchronously") -def launch(config_path, dbconn, async): +def launch(config_path, dbconn, dbconn_storage, async): # The configuration have following priority : # command line > file config > default config cl_config = { 'dbconn': dbconn, + 'dbconn_storage': dbconn_storage, 'asynchronous': async } conf = config.read(config_path, DEFAULT_CONFIG) conf.update(cl_config) # Create connection data and run the archiver. 
- archiver = ArchiverDirector(conf['dbconn'], conf) + archiver = ArchiverDirector(conf['dbconn'], conf['dbconn_storage'], conf) logger.info("Starting an archival at", datetime.now()) archiver.run() if __name__ == '__main__': launch() diff --git a/swh/storage/archiver/storage.py b/swh/storage/archiver/storage.py new file mode 100644 index 00000000..028304cc --- /dev/null +++ b/swh/storage/archiver/storage.py @@ -0,0 +1,89 @@ +# Copyright (C) 2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import psycopg2 + +from ..common import db_transaction_generator +from ..db import Db +from ..exc import StorageDBError + + +class ArchiverStorage(): + """SWH Archiver storage proxy, encompassing DB + + """ + def __init__(self, db_conn): + """ + Args: + db_conn: either a libpq connection string, or a psycopg2 connection + + """ + try: + if isinstance(db_conn, psycopg2.extensions.connection): + self.db = Db(db_conn) + else: + self.db = Db.connect(db_conn) + except psycopg2.OperationalError as e: + raise StorageDBError(e) + + @db_transaction_generator + def archive_ls(self, cur=None): + """ Get all the archives registered on the server. + + Yields: + a tuple (server_id, server_url) for each archive server. + """ + yield from self.db.archive_ls(cur) + + @db_transaction_generator + def content_archive_ls(self, cur=None): + """ Get the archival status of the content + + Get an iterable over all the content that is referenced + in a backup server. + + Yields: + the sha1 of each content referenced at least one time + in the database of archival status. + + """ + yield from self.db.content_archive_ls(cur) + + @db_transaction_generator + def content_archive_get(self, content=None, archive=None, cur=None): + """ Get the archival status of a content in a specific server. + + Retrieve from the database the archival status of the given content + in the given archive server. + + Args: + content: the sha1 of the content. May be None for any id. + archive: the database id of the server we're looking into + may be None for any server. + + Yields: + A tuple (content_id, server_id, archival status, mtime, tzinfo). + """ + yield from self.db.content_archive_get(content, archive, cur) + + @db_transaction_generator + def content_archive_update(self, content_id, archive_id, + new_status=None, cur=None): + """ Update the status of an archived content and set its mtime to now() + + Change the last modification time of an archived content and change + its status to the given one. + + Args: + content_id (string): The content id. + archive_id (string): The id of the concerned archive. + new_status (string): One of missing, ongoing or present, this + status will replace the previous one. If not given, the + function only changes the mtime of the content.
+ """ + yield from self.db.content_archive_update(content_id, + archive_id, + new_status, + cur) diff --git a/swh/storage/archiver/tasks.py b/swh/storage/archiver/tasks.py index 0b7ce61e..439aae0d 100644 --- a/swh/storage/archiver/tasks.py +++ b/swh/storage/archiver/tasks.py @@ -1,20 +1,20 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.task import Task from .worker import ArchiverWorker class SWHArchiverTask(Task): """ Main task that archive a batch of content. """ task_queue = 'swh_storage_archive_worker' - def run(self, batch, master_storage_args, - slave_storages, config): - aw = ArchiverWorker(batch, master_storage_args, + def run(self, batch, archiver_args, master_storage_args, slave_storages, + config): + aw = ArchiverWorker(batch, archiver_args, master_storage_args, slave_storages, config) if aw.run(): self.log("Successful backup for a batch of size %s" % len(batch)) diff --git a/swh/storage/archiver/worker.py b/swh/storage/archiver/worker.py index 8fda96bc..a14476cc 100644 --- a/swh/storage/archiver/worker.py +++ b/swh/storage/archiver/worker.py @@ -1,239 +1,244 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random import logging +from .storage import ArchiverStorage from .copier import ArchiverCopier from .. import get_storage from datetime import datetime logger = logging.getLogger() class ArchiverWorker(): """ Do the required backups on a given batch of contents. Process the content of a content batch in order to do the needed backups on the slaves servers. Attributes: batch: The content this worker has to archive, which is a dictionary that associates a content's sha1 id to the list of servers where the content is present or missing (see ArchiverDirector::get_unarchived_content). master_storage_args: The connection argument to initialize the master storage with the db connection url & the object storage path. slave_storages: A map that associates server_id to the remote server. config: Archiver_configuration. A dictionary that must contains the following keys. objstorage_path (string): the path of the objstorage of the master. batch_max_size (int): The number of content items that can be given to the same archiver worker. archival_max_age (int): Delay given to the worker to copy all the files in a given batch. retention_policy (int): Required number of copies for the content to be considered safe. asynchronous (boolean): Indicate whenever the archival should run in asynchronous mode or not. """ - def __init__(self, batch, master_storage_args, slave_storages, config): + def __init__(self, batch, archiver_args, master_storage_args, + slave_storages, config): """ Constructor of the ArchiverWorker class. Args: batch: A batch of content, which is a dictionary that associates a content's sha1 id to the list of servers where the content is present. + archiver_args: The archiver's arguments to establish connection to + db. master_storage_args: The master storage arguments. slave_storages: A map that associates server_id to the remote server. config: Archiver_configuration. A dictionary that must contains the following keys. 
objstorage_path (string): the path of the objstorage of the master. batch_max_size (int): The number of content items that can be given to the same archiver worker. archival_max_age (int): Delay given to the worker to copy all the files in a given batch. retention_policy (int): Required number of copies for the content to be considered safe. asynchronous (boolean): Indicate whenever the archival should run in asynchronous mode or not. """ self.batch = batch + self.archiver_storage = ArchiverStorage(archiver_args) self.master_storage = get_storage('local_storage', master_storage_args) self.slave_storages = slave_storages self.config = config def __choose_backup_servers(self, allowed_storage, backup_number): """ Choose the slave servers for archival. Choose the given amount of servers among those which don't already contain a copy of the content. Args: allowed_storage: servers when the content is not already present. backup_number (int): The number of servers we have to choose in order to fullfill the objective. """ # In case there is not enough backup servers to get all the backups # we need, just do our best. # TODO such situation can only be caused by an incorrect configuration # setting. Do a verification previously. backup_number = min(backup_number, len(allowed_storage)) # TODO Find a better (or a good) policy to choose the backup servers. # The random choice should be equivalently distributed between # servers for a great amount of data, but don't take care of servers # capacities. return random.sample(allowed_storage, backup_number) def __get_archival_status(self, content_id, server): """ Get the archival status of the required content. Attributes: content_id (string): Sha1 of the content. server: Tuple (archive_id, archive_url) of the archive server. Returns: A dictionary that contains all the required data : 'content_id', 'archive_id', 'status', and 'mtime' """ t, = list( - self.master_storage.db.content_archive_get(content_id, server[0]) + self.archiver_storage.content_archive_get(content_id, server[0]) ) return { 'content_id': t[0], 'archive_id': t[1], 'status': t[2], 'mtime': t[3] } def __content_archive_update(self, content_id, archive_id, new_status=None): """ Update the status of a archive content and set it's mtime to now() Change the last modification time of an archived content and change its status to the given one. Args: content_id (string): The content id. archive_id (string): The id of the concerned archive. new_status (string): One of missing, ongoing or present, this status will replace the previous one. If not given, the function only changes the mtime of the content. """ - self.master_storage.db.content_archive_update( + self.archiver_storage.content_archive_update( content_id, archive_id, new_status ) def need_archival(self, content, destination): """ Indicates whenever a content need archivage. Filter function that returns True if a given content still require to be archived. Args: content (str): Sha1 of a content. destination: Tuple (archive id, archive url). """ archival_status = self.__get_archival_status( content, destination ) status = archival_status['status'] mtime = archival_status['mtime'] # If the archive is already present, no need to backup. if status == 'present': return False # If the content is ongoing but still have time, there is # another worker working on this content. 
elif status == 'ongoing': mtime = mtime.replace(tzinfo=None) elapsed = (datetime.now() - mtime).total_seconds() if elapsed <= self.config['archival_max_age']: return False return True def sort_content_by_archive(self): """ Create a map {archive_server -> list of content) Create a mapping that associate to a archive server all the contents that needs to be archived in it by the current worker. The map is in the form of : { (archive_1, archive_1_url): [content1, content2, content_3] (archive_2, archive_2_url): [content1, content3] } Returns: The created mapping. """ slaves_copy = {} for content_id in self.batch: # Choose some servers to upload the content among the missing ones. server_data = self.batch[content_id] nb_present = len(server_data['present']) nb_backup = self.config['retention_policy'] - nb_present backup_servers = self.__choose_backup_servers( server_data['missing'], nb_backup ) # Fill the map destination -> content to upload for server in backup_servers: slaves_copy.setdefault(server, []).append(content_id) return slaves_copy def run(self): """ Do the task expected from the archiver worker. Process the content in the batch, ensure that the elements still need an archival, and spawn copiers to copy files in each destinations. """ # Get a map (archive -> [contents]) slaves_copy = self.sort_content_by_archive() # At this point, re-check the archival status in order to know if the # job have been done by another worker. for destination in slaves_copy: # list() is needed because filter's result will be consumed twice. slaves_copy[destination] = list(filter( lambda content_id: self.need_archival(content_id, destination), slaves_copy[destination] )) for content_id in slaves_copy[destination]: self.__content_archive_update(content_id, destination[0], new_status='ongoing') # Spawn a copier for each destination for destination in slaves_copy: try: self.run_copier(destination, slaves_copy[destination]) except: logger.error('Unable to copy a batch to %s' % destination) def run_copier(self, destination, contents): """ Run a copier in order to archive the given contents Upload the given contents to the given archive. If the process fail, the whole content is considered uncopied and remains 'ongoing', waiting to be rescheduled as there is a delay. Attributes: destination: Tuple (archive_id, archive_url) of the destination. contents: List of contents to archive. """ ac = ArchiverCopier(destination, contents, self.master_storage) if ac.run(): # Once the archival complete, update the database. 
for content_id in contents: self.__content_archive_update(content_id, destination[0], new_status='present') diff --git a/swh/storage/common.py b/swh/storage/common.py new file mode 100644 index 00000000..69364b3f --- /dev/null +++ b/swh/storage/common.py @@ -0,0 +1,32 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import functools + + +def db_transaction(meth): + """decorator to execute Storage methods within DB transactions + + The decorated method must accept a `cur` keyword argument + """ + @functools.wraps(meth) + def _meth(self, *args, **kwargs): + with self.db.transaction() as cur: + return meth(self, *args, cur=cur, **kwargs) + return _meth + + +def db_transaction_generator(meth): + """decorator to execute Storage methods within DB transactions, while + returning a generator + + The decorated method must accept a `cur` keyword argument + + """ + @functools.wraps(meth) + def _meth(self, *args, **kwargs): + with self.db.transaction() as cur: + yield from meth(self, *args, cur=cur, **kwargs) + return _meth diff --git a/swh/storage/db.py b/swh/storage/db.py index 430ebf61..2af32e17 100644 --- a/swh/storage/db.py +++ b/swh/storage/db.py @@ -1,738 +1,738 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import binascii import datetime import functools import json import psycopg2 import psycopg2.extras import tempfile from contextlib import contextmanager TMP_CONTENT_TABLE = 'tmp_content' psycopg2.extras.register_uuid() def stored_procedure(stored_proc): """decorator to execute remote stored procedure, specified as argument Generally, the body of the decorated function should be empty. If it is not, the stored procedure will be executed first; the function body then. """ def wrap(meth): @functools.wraps(meth) def _meth(self, *args, **kwargs): cur = kwargs.get('cur', None) self._cursor(cur).execute('SELECT %s()' % stored_proc) meth(self, *args, **kwargs) return _meth return wrap def jsonize(value): """Convert a value to a psycopg2 JSON object if necessary""" if isinstance(value, dict): return psycopg2.extras.Json(value) return value def entry_to_bytes(entry): """Convert an entry coming from the database to bytes""" if isinstance(entry, memoryview): return entry.tobytes() if isinstance(entry, list): return [entry_to_bytes(value) for value in entry] return entry def line_to_bytes(line): """Convert a line coming from the database to bytes""" if isinstance(line, dict): return {k: entry_to_bytes(v) for k, v in line.items()} return line.__class__(entry_to_bytes(entry) for entry in line) def cursor_to_bytes(cursor): """Yield all the data from a cursor as bytes""" yield from (line_to_bytes(line) for line in cursor) class Db: """Proxy to the SWH DB, with wrappers around stored procedures """ @classmethod def connect(cls, *args, **kwargs): """factory method to create a DB proxy Accepts all arguments of psycopg2.connect; only some specific possibilities are reported below. 
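Returning briefly to the new swh/storage/common.py module above: the two decorators it introduces are the ones previously defined inline in swh/storage/storage.py (removed further down), and they simply wrap a method call in a transaction opened on self.db. A small self-contained illustration; FakeDb is a stand-in for the real Db proxy and is not part of the patch:

```python
# Toy illustration of the decorators now shared via swh.storage.common.
# FakeDb is a stand-in for the real Db proxy and is not part of the patch.
from contextlib import contextmanager

from swh.storage.common import db_transaction, db_transaction_generator


class FakeDb:
    @contextmanager
    def transaction(self):
        print('BEGIN')
        yield 'fake-cursor'
        print('COMMIT')


class Example:
    def __init__(self):
        self.db = FakeDb()

    @db_transaction
    def one(self, cur=None):
        # runs inside a single transaction, cur is injected by the decorator
        return 'got %s' % cur

    @db_transaction_generator
    def many(self, n, cur=None):
        # the transaction stays open until the generator is exhausted
        yield from range(n)


ex = Example()
print(ex.one())          # BEGIN / COMMIT wrap this single call
print(list(ex.many(3)))  # generator-returning methods only execute (and
                         # commit) once they are actually consumed
```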
Args: connstring: libpq2 connection string """ conn = psycopg2.connect(*args, **kwargs) return cls(conn) def _cursor(self, cur_arg): """get a cursor: from cur_arg if given, or a fresh one otherwise meant to avoid boilerplate if/then/else in methods that proxy stored procedures """ if cur_arg is not None: return cur_arg # elif self.cur is not None: # return self.cur else: return self.conn.cursor() def __init__(self, conn): """create a DB proxy Args: conn: psycopg2 connection to the SWH DB """ self.conn = conn @contextmanager def transaction(self): """context manager to execute within a DB transaction Yields: a psycopg2 cursor """ with self.conn.cursor() as cur: try: yield cur self.conn.commit() except: if not self.conn.closed: self.conn.rollback() raise def mktemp(self, tblname, cur=None): self._cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,)) def mktemp_dir_entry(self, entry_type, cur=None): self._cursor(cur).execute('SELECT swh_mktemp_dir_entry(%s)', (('directory_entry_%s' % entry_type),)) @stored_procedure('swh_mktemp_revision') def mktemp_revision(self, cur=None): pass @stored_procedure('swh_mktemp_release') def mktemp_release(self, cur=None): pass @stored_procedure('swh_mktemp_occurrence_history') def mktemp_occurrence_history(self, cur=None): pass @stored_procedure('swh_mktemp_entity_lister') def mktemp_entity_lister(self, cur=None): pass @stored_procedure('swh_mktemp_entity_history') def mktemp_entity_history(self, cur=None): pass @stored_procedure('swh_mktemp_bytea') def mktemp_bytea(self, cur=None): pass def copy_to(self, items, tblname, columns, cur=None, item_cb=None): def escape(data): if data is None: return '' if isinstance(data, bytes): return '\\x%s' % binascii.hexlify(data).decode('ascii') elif isinstance(data, str): return '"%s"' % data.replace('"', '""') elif isinstance(data, datetime.datetime): # We escape twice to make sure the string generated by # isoformat gets escaped return escape(data.isoformat()) elif isinstance(data, dict): return escape(json.dumps(data)) elif isinstance(data, list): return escape("{%s}" % ','.join(escape(d) for d in data)) elif isinstance(data, psycopg2.extras.Range): # We escape twice here too, so that we make sure # everything gets passed to copy properly return escape( '%s%s,%s%s' % ( '[' if data.lower_inc else '(', '-infinity' if data.lower_inf else escape(data.lower), 'infinity' if data.upper_inf else escape(data.upper), ']' if data.upper_inc else ')', ) ) else: # We don't escape here to make sure we pass literals properly return str(data) with tempfile.TemporaryFile('w+') as f: for d in items: if item_cb is not None: item_cb(d) line = [escape(d.get(k)) for k in columns] f.write(','.join(line)) f.write('\n') f.seek(0) self._cursor(cur).copy_expert('COPY %s (%s) FROM STDIN CSV' % ( tblname, ', '.join(columns)), f) @stored_procedure('swh_content_add') def content_add_from_temp(self, cur=None): pass @stored_procedure('swh_directory_add') def directory_add_from_temp(self, cur=None): pass @stored_procedure('swh_skipped_content_add') def skipped_content_add_from_temp(self, cur=None): pass @stored_procedure('swh_revision_add') def revision_add_from_temp(self, cur=None): pass @stored_procedure('swh_release_add') def release_add_from_temp(self, cur=None): pass @stored_procedure('swh_occurrence_history_add') def occurrence_history_add_from_temp(self, cur=None): pass @stored_procedure('swh_entity_history_add') def entity_history_add_from_temp(self, cur=None): pass def store_tmp_bytea(self, ids, cur=None): """Store the given identifiers in a 
new tmp_bytea table""" cur = self._cursor(cur) self.mktemp_bytea(cur) self.copy_to(({'id': elem} for elem in ids), 'tmp_bytea', ['id'], cur) def content_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("""SELECT sha1, sha1_git, sha256 FROM swh_content_missing()""") yield from cursor_to_bytes(cur) def content_missing_per_sha1_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("""SELECT * FROM swh_content_missing_per_sha1()""") yield from cursor_to_bytes(cur) def skipped_content_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute("""SELECT sha1, sha1_git, sha256 FROM swh_skipped_content_missing()""") yield from cursor_to_bytes(cur) def occurrence_get(self, origin_id, cur=None): """Retrieve latest occurrence's information by origin_id. """ cur = self._cursor(cur) cur.execute("""SELECT origin, branch, target, target_type, (select max(date) from origin_visit where origin=%s) as date FROM occurrence WHERE origin=%s """, (origin_id, origin_id)) yield from cursor_to_bytes(cur) def content_find(self, sha1=None, sha1_git=None, sha256=None, cur=None): """Find the content optionally on a combination of the following checksums sha1, sha1_git or sha256. Args: sha1: sha1 content git_sha1: the sha1 computed `a la git` sha1 of the content sha256: sha256 content Returns: The triplet (sha1, sha1_git, sha256) if found or None. """ cur = self._cursor(cur) cur.execute("""SELECT sha1, sha1_git, sha256, length, ctime, status FROM swh_content_find(%s, %s, %s) LIMIT 1""", (sha1, sha1_git, sha256)) content = line_to_bytes(cur.fetchone()) if set(content) == {None}: return None else: return content def content_find_occurrence(self, sha1, cur=None): """Find one content's occurrence. Args: sha1: sha1 content cur: cursor to use Returns: One occurrence for that particular sha1 """ cur = self._cursor(cur) cur.execute("""SELECT origin_type, origin_url, branch, target, target_type, path FROM swh_content_find_occurrence(%s) LIMIT 1""", (sha1, )) return line_to_bytes(cur.fetchone()) def directory_get_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute('''SELECT id, file_entries, dir_entries, rev_entries FROM swh_directory_get()''') yield from cursor_to_bytes(cur) def directory_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute('SELECT * FROM swh_directory_missing()') yield from cursor_to_bytes(cur) def directory_walk_one(self, directory, cur=None): cur = self._cursor(cur) cur.execute('SELECT * FROM swh_directory_walk_one(%s)', (directory,)) yield from cursor_to_bytes(cur) def directory_walk(self, directory, cur=None): cur = self._cursor(cur) cur.execute('SELECT * FROM swh_directory_walk(%s)', (directory,)) yield from cursor_to_bytes(cur) def revision_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute('SELECT id FROM swh_revision_missing() as r(id)') yield from cursor_to_bytes(cur) revision_add_cols = [ 'id', 'date', 'date_offset', 'date_neg_utc_offset', 'committer_date', 'committer_date_offset', 'committer_date_neg_utc_offset', 'type', 'directory', 'message', 'author_fullname', 'author_name', 'author_email', 'committer_fullname', 'committer_name', 'committer_email', 'metadata', 'synthetic', ] revision_get_cols = revision_add_cols + [ 'author_id', 'committer_id', 'parents'] origin_visit_get_cols = [ 'origin', 'visit', 'date' ] def origin_visit_get(self, origin_id, cur=None): """Retrieve occurrence's history information by origin_id. 
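As a usage note on the Db proxy methods in this file (left unchanged by the patch): they all take an optional cur argument and are normally driven through the transaction() context manager. A hedged sketch using content_find as shown above; the DSN and the sha1 value are made-up example values:

```python
# Hedged usage sketch of swh.storage.db.Db; DSN and sha1 are example values.
from swh.storage.db import Db

db = Db.connect('dbname=softwareheritage-dev')
sha1 = bytes.fromhex('34973274ccef6ab4dfaaf86599792fa9c3fe4689')

with db.transaction() as cur:
    found = db.content_find(sha1=sha1, cur=cur)

if found is None:
    print('content unknown')
else:
    _sha1, _sha1_git, _sha256, length, ctime, status = found
    print('%d bytes, status %s' % (length, status))
```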
Args: origin_id: The occurrence's origin Yields: The occurrence's history visits """ cur = self._cursor(cur) cur.execute( 'SELECT origin, visit, date FROM origin_visit where origin=%s', (origin_id, )) yield from cursor_to_bytes(cur) def revision_get_from_temp(self, cur=None): cur = self._cursor(cur) query = 'SELECT %s FROM swh_revision_get()' % ( ', '.join(self.revision_get_cols)) cur.execute(query) yield from cursor_to_bytes(cur) def revision_log(self, root_revisions, limit=None, cur=None): cur = self._cursor(cur) query = """SELECT %s FROM swh_revision_log(%%s, %%s) """ % ', '.join(self.revision_get_cols) cur.execute(query, (root_revisions, limit)) yield from cursor_to_bytes(cur) revision_shortlog_cols = ['id', 'parents'] def revision_shortlog(self, root_revisions, limit=None, cur=None): cur = self._cursor(cur) query = """SELECT %s FROM swh_revision_list(%%s, %%s) """ % ', '.join(self.revision_shortlog_cols) cur.execute(query, (root_revisions, limit)) yield from cursor_to_bytes(cur) def release_missing_from_temp(self, cur=None): cur = self._cursor(cur) cur.execute('SELECT id FROM swh_release_missing() as r(id)') yield from cursor_to_bytes(cur) object_find_by_sha1_git_cols = ['sha1_git', 'type', 'id', 'object_id'] def object_find_by_sha1_git(self, ids, cur=None): cur = self._cursor(cur) self.store_tmp_bytea(ids, cur) query = 'select %s from swh_object_find_by_sha1_git()' % ( ', '.join(self.object_find_by_sha1_git_cols) ) cur.execute(query) yield from cursor_to_bytes(cur) def stat_counters(self, cur=None): cur = self._cursor(cur) cur.execute('SELECT * FROM swh_stat_counters()') yield from cur fetch_history_cols = ['origin', 'date', 'status', 'result', 'stdout', 'stderr', 'duration'] def create_fetch_history(self, fetch_history, cur=None): """Create a fetch_history entry with the data in fetch_history""" cur = self._cursor(cur) query = '''INSERT INTO fetch_history (%s) VALUES (%s) RETURNING id''' % ( ','.join(self.fetch_history_cols), ','.join(['%s'] * len(self.fetch_history_cols)) ) cur.execute(query, [fetch_history.get(col) for col in self.fetch_history_cols]) return cur.fetchone()[0] def get_fetch_history(self, fetch_history_id, cur=None): """Get a fetch_history entry with the given id""" cur = self._cursor(cur) query = '''SELECT %s FROM fetch_history WHERE id=%%s''' % ( ', '.join(self.fetch_history_cols), ) cur.execute(query, (fetch_history_id,)) data = cur.fetchone() if not data: return None ret = {'id': fetch_history_id} for i, col in enumerate(self.fetch_history_cols): ret[col] = data[i] return ret def update_fetch_history(self, fetch_history, cur=None): """Update the fetch_history entry from the data in fetch_history""" cur = self._cursor(cur) query = '''UPDATE fetch_history SET %s WHERE id=%%s''' % ( ','.join('%s=%%s' % col for col in self.fetch_history_cols) ) cur.execute(query, [jsonize(fetch_history.get(col)) for col in self.fetch_history_cols + ['id']]) base_entity_cols = ['uuid', 'parent', 'name', 'type', 'description', 'homepage', 'active', 'generated', 'lister_metadata', 'metadata'] entity_cols = base_entity_cols + ['last_seen', 'last_id'] entity_history_cols = base_entity_cols + ['id', 'validity'] def origin_add(self, type, url, cur=None): """Insert a new origin and return the new identifier.""" insert = """INSERT INTO origin (type, url) values (%s, %s) RETURNING id""" cur.execute(insert, (type, url)) return cur.fetchone()[0] def origin_get_with(self, type, url, cur=None): """Retrieve the origin id from its type and url if found.""" cur = self._cursor(cur) query = """SELECT 
id, type, url, lister, project FROM origin WHERE type=%s AND url=%s""" cur.execute(query, (type, url)) data = cur.fetchone() if data: return line_to_bytes(data) return None def origin_get(self, id, cur=None): """Retrieve the origin per its identifier. """ cur = self._cursor(cur) query = "SELECT id, type, url, lister, project FROM origin WHERE id=%s" cur.execute(query, (id,)) data = cur.fetchone() if data: return line_to_bytes(data) return None person_cols = ['fullname', 'name', 'email'] person_get_cols = person_cols + ['id'] def person_add(self, person, cur=None): """Add a person identified by its name and email. Returns: The new person's id """ cur = self._cursor(cur) query_new_person = '''\ INSERT INTO person(%s) VALUES (%s) RETURNING id''' % ( ', '.join(self.person_cols), ', '.join('%s' for i in range(len(self.person_cols))) ) cur.execute(query_new_person, [person[col] for col in self.person_cols]) return cur.fetchone()[0] def person_get(self, ids, cur=None): """Retrieve the persons identified by the list of ids. """ cur = self._cursor(cur) query = """SELECT %s FROM person WHERE id IN %%s""" % ', '.join(self.person_get_cols) cur.execute(query, (tuple(ids),)) yield from cursor_to_bytes(cur) release_add_cols = [ 'id', 'target', 'target_type', 'date', 'date_offset', 'date_neg_utc_offset', 'name', 'comment', 'synthetic', 'author_fullname', 'author_name', 'author_email', ] release_get_cols = release_add_cols + ['author_id'] def release_get_from_temp(self, cur=None): cur = self._cursor(cur) query = ''' SELECT %s FROM swh_release_get() ''' % ', '.join(self.release_get_cols) cur.execute(query) yield from cursor_to_bytes(cur) def release_get_by(self, origin_id, limit=None, cur=None): """Retrieve a release by occurrence criterion (only origin right now) Args: - origin_id: The origin to look for. """ cur = self._cursor(cur) query = """ SELECT %s FROM swh_release_get_by(%%s) LIMIT %%s """ % ', '.join(self.release_get_cols) cur.execute(query, (origin_id, limit)) yield from cursor_to_bytes(cur) def revision_get_by(self, origin_id, branch_name, datetime, limit=None, cur=None): """Retrieve a revision by occurrence criterion. Args: - origin_id: The origin to look for - branch_name: the branch name to look for - datetime: the lower bound of timerange to look for. - limit: limit number of results to return The upper bound being now. """ cur = self._cursor(cur) if branch_name and isinstance(branch_name, str): branch_name = branch_name.encode('utf-8') query = ''' SELECT %s FROM swh_revision_get_by(%%s, %%s, %%s) LIMIT %%s ''' % ', '.join(self.revision_get_cols) cur.execute(query, (origin_id, branch_name, datetime, limit)) yield from cursor_to_bytes(cur) def directory_entry_get_by_path(self, directory, paths, cur=None): """Retrieve a directory entry by path. """ cur = self._cursor(cur) cur.execute("""SELECT dir_id, type, target, name, perms, status, sha1, sha1_git, sha256 FROM swh_find_directory_entry_by_path(%s, %s)""", (directory, paths)) data = cur.fetchone() if set(data) == {None}: return None return line_to_bytes(data) def entity_get(self, uuid, cur=None): """Retrieve the entity and its parent hierarchy chain per uuid. """ cur = self._cursor(cur) cur.execute("""SELECT %s FROM swh_entity_get(%%s)""" % ( ', '.join(self.entity_cols)), (uuid, )) yield from cursor_to_bytes(cur) def entity_get_one(self, uuid, cur=None): """Retrieve a single entity given its uuid. 
""" cur = self._cursor(cur) cur.execute("""SELECT %s FROM entity WHERE uuid = %%s""" % ( ', '.join(self.entity_cols)), (uuid, )) data = cur.fetchone() if not data: return None return line_to_bytes(data) def archive_ls(self, cur=None): """ Get all the archives registered on the server. Yields: a tuple (server_id, server_url) for each archive server. """ cur = self._cursor(cur) cur.execute("""SELECT id, url - FROM archives + FROM archive """) yield from cursor_to_bytes(cur) def content_archive_ls(self, cur=None): """ Get the archival status of the content Get an iterable over all the content that is referenced in a backup server. Yields: the sha1 of each content referenced at least one time in the database of archiveal status. """ cur = self._cursor(cur) cur.execute("""SELECT DISTINCT content_id FROM content_archive""") yield from cursor_to_bytes(cur) def content_archive_get(self, content=None, archive=None, cur=None): """ Get the archival status of a content in a specific server. Retreive from the database the archival status of the given content in the given archive server. Args: content: the sha1 of the content. May be None for any id. archive: the database id of the server we're looking into may be None for any server. Yields: A tuple (content_id, server_id, archival status, mtime, tzinfo). """ query = """SELECT content_id, archive_id, status, mtime FROM content_archive """ conditions = [] if content: conditions.append("content_id='%s'" % content) if archive: conditions.append("archive_id='%s'" % archive) if conditions: query = """%s WHERE %s """ % (query, ' and '.join(conditions)) cur = self._cursor(cur) cur.execute(query) yield from cursor_to_bytes(cur) def content_archive_update(self, content_id, archive_id, new_status=None, cur=None): """ Update the status of a archive content and set it's mtime to now() Change the last modification time of an archived content and change its status to the given one. Args: content_id (string): The content id. archive_id (string): The id of the concerned archive. new_status (string): One of missing, ongoing or present, this status will replace the previous one. If not given, the function only changes the mtime of the content. """ query = """UPDATE content_archive SET %(fields)s WHERE content_id='%(content_id)s' and archive_id='%(archive_id)s' """ fields = [] if new_status: fields.append("status='%s'" % new_status) fields.append("mtime=now()") d = {'fields': ', '.join(fields), 'content_id': content_id, 'archive_id': archive_id} cur = self._cursor(cur) cur.execute(query % d) diff --git a/swh/storage/storage.py b/swh/storage/storage.py index 9816250c..c3ab3dc0 100644 --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -1,1113 +1,1087 @@ -# Copyright (C) 2015 The Software Heritage developers +# Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import datetime -import functools import itertools import dateutil.parser import psycopg2 from . 
import converters +from .common import db_transaction_generator, db_transaction from .db import Db from .exc import StorageDBError from swh.core.hashutil import ALGORITHMS from swh.objstorage import PathSlicingObjStorage from swh.objstorage.exc import ObjNotFoundError # Max block size of contents to return BULK_BLOCK_CONTENT_LEN_MAX = 10000 -def db_transaction(meth): - """decorator to execute Storage methods within DB transactions - - The decorated method must accept a `cur` keyword argument - """ - @functools.wraps(meth) - def _meth(self, *args, **kwargs): - with self.db.transaction() as cur: - return meth(self, *args, cur=cur, **kwargs) - return _meth - - -def db_transaction_generator(meth): - """decorator to execute Storage methods within DB transactions, while - returning a generator - - The decorated method must accept a `cur` keyword argument - - """ - @functools.wraps(meth) - def _meth(self, *args, **kwargs): - with self.db.transaction() as cur: - yield from meth(self, *args, cur=cur, **kwargs) - return _meth - - class Storage(): """SWH storage proxy, encompassing DB and object storage """ def __init__(self, db_conn, obj_root): """ Args: db_conn: either a libpq connection string, or a psycopg2 connection obj_root: path to the root of the object storage """ try: if isinstance(db_conn, psycopg2.extensions.connection): self.db = Db(db_conn) else: self.db = Db.connect(db_conn) except psycopg2.OperationalError as e: raise StorageDBError(e) # TODO this needs to be configured self.objstorage = PathSlicingObjStorage(obj_root, slicing='0:2/2:4/4:6') def content_add(self, content): """Add content blobs to the storage Note: in case of DB errors, objects might have already been added to the object storage and will not be removed. Since addition to the object storage is idempotent, that should not be a problem. Args: content: iterable of dictionaries representing individual pieces of content to add. 
Each dictionary has the following keys: - data (bytes): the actual content - length (int): content length (default: -1) - one key for each checksum algorithm in swh.core.hashutil.ALGORITHMS, mapped to the corresponding checksum - status (str): one of visible, hidden, absent - reason (str): if status = absent, the reason why - origin (int): if status = absent, the origin we saw the content in """ db = self.db content_by_status = defaultdict(list) for d in content: if 'status' not in d: d['status'] = 'visible' if 'length' not in d: d['length'] = -1 content_by_status[d['status']].append(d) content_with_data = content_by_status['visible'] content_without_data = content_by_status['absent'] missing_content = set(self.content_missing(content_with_data)) missing_skipped = set( sha1_git for sha1, sha1_git, sha256 in self.skipped_content_missing(content_without_data)) with db.transaction() as cur: if missing_content: # create temporary table for metadata injection db.mktemp('content', cur) def add_to_objstorage(cont): self.objstorage.add(cont['data'], obj_id=cont['sha1']) content_filtered = (cont for cont in content_with_data if cont['sha1'] in missing_content) db.copy_to(content_filtered, 'tmp_content', ['sha1', 'sha1_git', 'sha256', 'length', 'status'], cur, item_cb=add_to_objstorage) # move metadata in place db.content_add_from_temp(cur) if missing_skipped: missing_filtered = (cont for cont in content_without_data if cont['sha1_git'] in missing_skipped) db.mktemp('skipped_content', cur) db.copy_to(missing_filtered, 'tmp_skipped_content', ['sha1', 'sha1_git', 'sha256', 'length', 'reason', 'status', 'origin'], cur) # move metadata in place db.skipped_content_add_from_temp(cur) def content_get(self, content): """Retrieve in bulk contents and their data. Args: content: iterables of sha1 Returns: Generates streams of contents as dict with their raw data: - sha1: sha1's content - data: bytes data of the content Raises: ValueError in case of too much contents are required. cf. BULK_BLOCK_CONTENT_LEN_MAX """ # FIXME: Improve on server module to slice the result if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: raise ValueError( "Send at maximum %s contents." % BULK_BLOCK_CONTENT_LEN_MAX) for obj_id in content: try: data = self.objstorage.get(obj_id) except ObjNotFoundError: yield None continue yield {'sha1': obj_id, 'data': data} @db_transaction_generator def content_missing(self, content, key_hash='sha1', cur=None): """List content missing from storage Args: content: iterable of dictionaries containing one key for each checksum algorithm in swh.core.hashutil.ALGORITHMS, mapped to the corresponding checksum, and a length key mapped to the content length. key_hash: the name of the hash used as key (default: 'sha1') Returns: an iterable of `key_hash`es missing from the storage Raises: TODO: an exception when we get a hash collision. """ db = self.db keys = ['sha1', 'sha1_git', 'sha256'] if key_hash not in keys: raise ValueError("key_hash should be one of %s" % keys) key_hash_idx = keys.index(key_hash) # Create temporary table for metadata injection db.mktemp('content', cur) db.copy_to(content, 'tmp_content', keys + ['length'], cur) for obj in db.content_missing_from_temp(cur): yield obj[key_hash_idx] @db_transaction_generator def content_missing_per_sha1(self, contents, cur=None): """List content missing from storage based only on sha1. Args: contents: Iterable of sha1 to check for absence. Returns: an iterable of `sha1`s missing from the storage. Raises: TODO: an exception when we get a hash collision. 
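Example (illustrative sketch only, not part of the API; `storage` stands for any Storage instance and the ids are ordinary 20-byte sha1 digests):

    import hashlib

    ids = [hashlib.sha1(b'some blob').digest(),
           hashlib.sha1(b'another blob').digest()]
    # Only the sha1s absent from the content table are yielded back.
    missing = list(storage.content_missing_per_sha1(ids))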
""" db = self.db db.store_tmp_bytea(contents, cur) for obj in db.content_missing_per_sha1_from_temp(cur): yield obj[0] @db_transaction_generator def skipped_content_missing(self, content, cur=None): """List skipped_content missing from storage Args: content: iterable of dictionaries containing the data for each checksum algorithm. Returns: an iterable of signatures missing from the storage """ keys = ['sha1', 'sha1_git', 'sha256'] db = self.db db.mktemp('skipped_content', cur) db.copy_to(content, 'tmp_skipped_content', keys + ['length', 'reason'], cur) yield from db.skipped_content_missing_from_temp(cur) @db_transaction def content_find(self, content, cur=None): """Find a content hash in db. Args: content: a dictionary representing one content hash, mapping checksum algorithm names (see swh.core.hashutil.ALGORITHMS) to checksum values Returns: a triplet (sha1, sha1_git, sha256) if the content exist or None otherwise. Raises: ValueError in case the key of the dictionary is not sha1, sha1_git nor sha256. """ db = self.db if not set(content).intersection(ALGORITHMS): raise ValueError('content keys must contain at least one of: ' 'sha1, sha1_git, sha256') c = db.content_find(sha1=content.get('sha1'), sha1_git=content.get('sha1_git'), sha256=content.get('sha256'), cur=cur) if c: keys = ['sha1', 'sha1_git', 'sha256', 'length', 'ctime', 'status'] return dict(zip(keys, c)) return None @db_transaction def content_find_occurrence(self, content, cur=None): """Find the content's occurrence. Args: content: a dictionary entry representing one content hash. The dictionary key is one of swh.core.hashutil.ALGORITHMS. The value mapped to the corresponding checksum. Returns: The occurrence of the content. Raises: ValueError in case the key of the dictionary is not sha1, sha1_git nor sha256. """ db = self.db c = self.content_find(content) if not c: return None sha1 = c['sha1'] found_occ = db.content_find_occurrence(sha1, cur=cur) if found_occ: keys = ['origin_type', 'origin_url', 'branch', 'target', 'target_type', 'path'] return dict(zip(keys, found_occ)) return None def directory_add(self, directories): """Add directories to the storage Args: directories: iterable of dictionaries representing the individual directories to add. Each dict has the following keys: - id (sha1_git): the id of the directory to add - entries (list): list of dicts for each entry in the directory. 
Each dict has the following keys: - name (bytes) - type (one of 'file', 'dir', 'rev'): type of the directory entry (file, directory, revision) - target (sha1_git): id of the object pointed at by the directory entry - perms (int): entry permissions """ dirs = set() dir_entries = { 'file': defaultdict(list), 'dir': defaultdict(list), 'rev': defaultdict(list), } for cur_dir in directories: dir_id = cur_dir['id'] dirs.add(dir_id) for src_entry in cur_dir['entries']: entry = src_entry.copy() entry['dir_id'] = dir_id dir_entries[entry['type']][dir_id].append(entry) dirs_missing = set(self.directory_missing(dirs)) if not dirs_missing: return db = self.db with db.transaction() as cur: # Copy directory ids dirs_missing_dict = ({'id': dir} for dir in dirs_missing) db.mktemp('directory', cur) db.copy_to(dirs_missing_dict, 'tmp_directory', ['id'], cur) # Copy entries for entry_type, entry_list in dir_entries.items(): entries = itertools.chain.from_iterable( entries_for_dir for dir_id, entries_for_dir in entry_list.items() if dir_id in dirs_missing) db.mktemp_dir_entry(entry_type) db.copy_to( entries, 'tmp_directory_entry_%s' % entry_type, ['target', 'name', 'perms', 'dir_id'], cur, ) # Do the final copy db.directory_add_from_temp(cur) @db_transaction_generator def directory_missing(self, directories, cur): """List directories missing from storage Args: an iterable of directory ids Returns: a list of missing directory ids """ db = self.db # Create temporary table for metadata injection db.mktemp('directory', cur) directories_dicts = ({'id': dir} for dir in directories) db.copy_to(directories_dicts, 'tmp_directory', ['id'], cur) for obj in db.directory_missing_from_temp(cur): yield obj[0] @db_transaction_generator def directory_get(self, directories, cur=None): """Get information on directories. Args: - directories: an iterable of directory ids Returns: List of directories as dict with keys and associated values. """ db = self.db keys = ('id', 'dir_entries', 'file_entries', 'rev_entries') db.mktemp('directory', cur) db.copy_to(({'id': dir_id} for dir_id in directories), 'tmp_directory', ['id'], cur) dirs = db.directory_get_from_temp(cur) for line in dirs: yield dict(zip(keys, line)) @db_transaction_generator def directory_ls(self, directory, recursive=False, cur=None): """Get entries for one directory. Args: - directory: the directory to list entries from. - recursive: if flag on, this list recursively from this directory. Returns: List of entries for such directory. """ db = self.db keys = ['dir_id', 'type', 'target', 'name', 'perms', 'status', 'sha1', 'sha1_git', 'sha256'] if recursive: res_gen = db.directory_walk(directory) else: res_gen = db.directory_walk_one(directory) for line in res_gen: yield dict(zip(keys, line)) @db_transaction def directory_entry_get_by_path(self, directory, paths, cur=None): """Get the directory entry (either file or dir) from directory with path. Args: - directory: sha1 of the top level directory - paths: path to lookup from the top level directory. From left (top) to right (bottom). Returns: The corresponding directory entry if found, None otherwise. """ db = self.db keys = ('dir_id', 'type', 'target', 'name', 'perms', 'status', 'sha1', 'sha1_git', 'sha256') res = db.directory_entry_get_by_path(directory, paths, cur) if res: return dict(zip(keys, res)) def revision_add(self, revisions): """Add revisions to the storage Args: revisions: iterable of dictionaries representing the individual revisions to add. 
Each dict has the following keys: - id (sha1_git): id of the revision to add - date (datetime.DateTime): date the revision was written - date_offset (int): offset from UTC in minutes the revision was written - date_neg_utc_offset (boolean): whether a null date_offset represents a negative UTC offset - committer_date (datetime.DateTime): date the revision got added to the origin - committer_date_offset (int): offset from UTC in minutes the revision was added to the origin - committer_date_neg_utc_offset (boolean): whether a null committer_date_offset represents a negative UTC offset - type (one of 'git', 'tar'): type of the revision added - directory (sha1_git): the directory the revision points at - message (bytes): the message associated with the revision - author_name (bytes): the name of the revision author - author_email (bytes): the email of the revision author - committer_name (bytes): the name of the revision committer - committer_email (bytes): the email of the revision committer - metadata (jsonb): extra information as dictionary - synthetic (bool): revision's nature (tarball, directory creates synthetic revision) - parents (list of sha1_git): the parents of this revision """ db = self.db revisions_missing = set(self.revision_missing( set(revision['id'] for revision in revisions))) if not revisions_missing: return with db.transaction() as cur: db.mktemp_revision(cur) revisions_filtered = ( converters.revision_to_db(revision) for revision in revisions if revision['id'] in revisions_missing) parents_filtered = [] db.copy_to( revisions_filtered, 'tmp_revision', db.revision_add_cols, cur, lambda rev: parents_filtered.extend(rev['parents'])) db.revision_add_from_temp(cur) db.copy_to(parents_filtered, 'revision_history', ['id', 'parent_id', 'parent_rank'], cur) @db_transaction_generator def revision_missing(self, revisions, cur=None): """List revisions missing from storage Args: an iterable of revision ids Returns: a list of missing revision ids """ db = self.db db.store_tmp_bytea(revisions, cur) for obj in db.revision_missing_from_temp(cur): yield obj[0] @db_transaction_generator def revision_get(self, revisions, cur): """Get all revisions from storage Args: an iterable of revision ids Returns: an iterable of revisions as dictionaries (or None if the revision doesn't exist) """ db = self.db db.store_tmp_bytea(revisions, cur) for line in self.db.revision_get_from_temp(cur): data = converters.db_to_revision( dict(zip(db.revision_get_cols, line)) ) if not data['type']: yield None continue yield data @db_transaction_generator def revision_log(self, revisions, limit=None, cur=None): """Fetch revision entry from the given root revisions. Args: - revisions: array of root revision to lookup - limit: limitation on the output result. Default to null. Yields: List of revision log from such revisions root. """ db = self.db for line in db.revision_log(revisions, limit, cur): data = converters.db_to_revision( dict(zip(db.revision_get_cols, line)) ) if not data['type']: yield None continue yield data @db_transaction_generator def revision_shortlog(self, revisions, limit=None, cur=None): """Fetch the shortlog for the given revisions Args: revisions: list of root revisions to lookup limit: depth limitation for the output Yields: a list of (id, parents) tuples. 
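Example (illustrative sketch only; `storage` stands for any Storage instance and `root` for a known 20-byte sha1_git revision id):

    for rev_id, parents in storage.revision_shortlog([root], limit=100):
        print(rev_id, parents)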
""" db = self.db yield from db.revision_shortlog(revisions, limit, cur) @db_transaction_generator def revision_log_by(self, origin_id, branch_name=None, timestamp=None, limit=None, cur=None): """Fetch revision entry from the actual origin_id's latest revision. Args: - origin_id: the origin id from which deriving the revision - branch_name: (optional) occurrence's branch name - timestamp: (optional) occurrence's time - limit: (optional) depth limitation for the output. Default to None. Yields: The revision log starting from the revision derived from the (origin, branch_name, timestamp) combination if any. Returns the [] if no revision matching this combination is found. """ db = self.db # Retrieve the revision by criterion revisions = list(db.revision_get_by( origin_id, branch_name, timestamp, limit=1)) if not revisions: return None revision_id = revisions[0][0] # otherwise, retrieve the revision log from that revision yield from self.revision_log([revision_id], limit) def release_add(self, releases): """Add releases to the storage Args: releases: iterable of dictionaries representing the individual releases to add. Each dict has the following keys: - id (sha1_git): id of the release to add - revision (sha1_git): id of the revision the release points to - date (datetime.DateTime): the date the release was made - date_offset (int): offset from UTC in minutes the release was made - date_neg_utc_offset (boolean): whether a null date_offset represents a negative UTC offset - name (bytes): the name of the release - comment (bytes): the comment associated with the release - author_name (bytes): the name of the release author - author_email (bytes): the email of the release author """ db = self.db release_ids = set(release['id'] for release in releases) releases_missing = set(self.release_missing(release_ids)) if not releases_missing: return with db.transaction() as cur: db.mktemp_release(cur) releases_filtered = ( converters.release_to_db(release) for release in releases if release['id'] in releases_missing ) db.copy_to(releases_filtered, 'tmp_release', db.release_add_cols, cur) db.release_add_from_temp(cur) @db_transaction_generator def release_missing(self, releases, cur=None): """List releases missing from storage Args: an iterable of release ids Returns: a list of missing release ids """ db = self.db # Create temporary table for metadata injection db.store_tmp_bytea(releases, cur) for obj in db.release_missing_from_temp(cur): yield obj[0] @db_transaction_generator def release_get(self, releases, cur=None): """Given a list of sha1, return the releases's information Args: releases: list of sha1s Returns: Generates the list of releases dict with the following keys: - id: origin's id - revision: origin's type - url: origin's url - lister: lister's uuid - project: project's uuid (FIXME, retrieve this information) Raises: ValueError if the keys does not match (url and type) nor id. """ db = self.db # Create temporary table for metadata injection db.store_tmp_bytea(releases, cur) for release in db.release_get_from_temp(cur): yield converters.db_to_release( dict(zip(db.release_get_cols, release)) ) @db_transaction def occurrence_add(self, occurrences, cur=None): """Add occurrences to the storage Args: occurrences: iterable of dictionaries representing the individual occurrences to add. 
Each dict has the following keys: - origin (int): id of the origin corresponding to the occurrence - branch (str): the reference name of the occurrence - target (sha1_git): the id of the object pointed to by the occurrence - target_type (str): the type of object pointed to by the occurrence - date (datetime.DateTime): the validity date for the given occurrence """ db = self.db processed = [] for occurrence in occurrences: if isinstance(occurrence['date'], str): occurrence['date'] = dateutil.parser.parse(occurrence['date']) processed.append(occurrence) db.mktemp_occurrence_history(cur) db.copy_to(processed, 'tmp_occurrence_history', ['origin', 'branch', 'target', 'target_type', 'date'], cur) db.occurrence_history_add_from_temp(cur) @db_transaction_generator def occurrence_get(self, origin_id, cur=None): """Retrieve occurrence information per origin_id. Args: origin_id: The occurrence's origin. Yields: List of occurrences matching criterion. """ db = self.db for line in db.occurrence_get(origin_id, cur): yield { 'origin': line[0], 'branch': line[1], 'target': line[2], 'target_type': line[3], } @db_transaction_generator def origin_visit_get(self, origin, cur=None): """Retrieve origin's visit dates. Args: origin: The occurrence's origin (identifier). Yields: List of visits. """ db = self.db for line in db.origin_visit_get(origin, cur): data = dict(zip(self.db.origin_visit_get_cols, line)) yield data @db_transaction_generator def revision_get_by(self, origin_id, branch_name=None, timestamp=None, limit=None, cur=None): """Given an origin_id, retrieve occurrences' list per given criterions. Args: origin_id: The origin to filter on. branch_name: (optional) branch name. timestamp: (optional) time. limit: (optional) limit Yields: List of occurrences matching the criterions or None if nothing is found. """ for line in self.db.revision_get_by(origin_id, branch_name, timestamp, limit=limit, cur=cur): data = converters.db_to_revision( dict(zip(self.db.revision_get_cols, line)) ) if not data['type']: yield None continue yield data def release_get_by(self, origin_id, limit=None): """Given an origin id, return all the tag objects pointing to heads of origin_id. Args: origin_id: the origin to filter on. limit: None by default Yields: List of releases matching the criterions or None if nothing is found. """ for line in self.db.release_get_by(origin_id, limit=limit): data = converters.db_to_release( dict(zip(self.db.release_get_cols, line)) ) yield data @db_transaction def object_find_by_sha1_git(self, ids, cur=None): """Return the objects found with the given ids. Args: ids: a generator of sha1_gits Returns: a dict mapping the id to the list of objects found. Each object found is itself a dict with keys: sha1_git: the input id type: the type of object found id: the id of the object found object_id: the numeric id of the object found. """ db = self.db ret = {id: [] for id in ids} for retval in db.object_find_by_sha1_git(ids): if retval[1]: ret[retval[0]].append(dict(zip(db.object_find_by_sha1_git_cols, retval))) return ret @db_transaction def origin_get(self, origin, cur=None): """Return the origin either identified by its id or its tuple (type, url). Args: origin: dictionary representing the individual origin to find. This dict has either the keys type and url: - type (FIXME: enum TBD): the origin type ('git', 'wget', ...) 
- url (bytes): the url the origin points to either the id: - id: the origin id Returns: the origin dict with the keys: - id: origin's id - type: origin's type - url: origin's url - lister: lister's uuid - project: project's uuid (FIXME, retrieve this information) Raises: ValueError if the keys does not match (url and type) nor id. """ db = self.db keys = ['id', 'type', 'url', 'lister', 'project'] origin_id = origin.get('id') if origin_id: # check lookup per id first ori = db.origin_get(origin_id, cur) elif 'type' in origin and 'url' in origin: # or lookup per type, url ori = db.origin_get_with(origin['type'], origin['url'], cur) else: # unsupported lookup raise ValueError('Origin must have either id or (type and url).') if ori: return dict(zip(keys, ori)) return None @db_transaction def _person_add(self, person, cur=None): """Add a person in storage. BEWARE: Internal function for now. Do not do anything fancy in case a person already exists. Please adapt code if more checks are needed. Args: person dictionary with keys name and email. Returns: Id of the new person. """ db = self.db return db.person_add(person) @db_transaction_generator def person_get(self, person, cur=None): """Return the persons identified by their ids. Args: person: array of ids. Returns: The array of persons corresponding of the ids. """ db = self.db for person in db.person_get(person): yield dict(zip(db.person_get_cols, person)) @db_transaction def origin_add_one(self, origin, cur=None): """Add origin to the storage Args: origin: dictionary representing the individual origin to add. This dict has the following keys: - type (FIXME: enum TBD): the origin type ('git', 'wget', ...) - url (bytes): the url the origin points to Returns: the id of the added origin, or of the identical one that already exists. """ db = self.db data = db.origin_get_with(origin['type'], origin['url'], cur) if data: return data[0] return db.origin_add(origin['type'], origin['url'], cur) @db_transaction def fetch_history_start(self, origin_id, cur=None): """Add an entry for origin origin_id in fetch_history. Returns the id of the added fetch_history entry """ fetch_history = { 'origin': origin_id, 'date': datetime.datetime.now(tz=datetime.timezone.utc), } return self.db.create_fetch_history(fetch_history, cur) @db_transaction def fetch_history_end(self, fetch_history_id, data, cur=None): """Close the fetch_history entry with id `fetch_history_id`, replacing its data with `data`. """ now = datetime.datetime.now(tz=datetime.timezone.utc) fetch_history = self.db.get_fetch_history(fetch_history_id, cur) if not fetch_history: raise ValueError('No fetch_history with id %d' % fetch_history_id) fetch_history['duration'] = now - fetch_history['date'] fetch_history.update(data) self.db.update_fetch_history(fetch_history, cur) @db_transaction def fetch_history_get(self, fetch_history_id, cur=None): """Get the fetch_history entry with id `fetch_history_id`. """ return self.db.get_fetch_history(fetch_history_id, cur) @db_transaction def entity_add(self, entities, cur=None): """Add the given entitites to the database (in entity_history). 
Args: - entities: iterable of dictionaries containing the following keys: - uuid (uuid): id of the entity - parent (uuid): id of the parent entity - name (str): name of the entity - type (str): type of entity (one of 'organization', 'group_of_entities', 'hosting', 'group_of_persons', 'person', 'project') - description (str, optional): description of the entity - homepage (str): url of the entity's homepage - active (bool): whether the entity is active - generated (bool): whether the entity was generated - lister_metadata (dict): lister-specific entity metadata - metadata (dict): other metadata for the entity - validity (datetime.DateTime array): timestamps at which we listed the entity. """ db = self.db cols = list(db.entity_history_cols) cols.remove('id') db.mktemp_entity_history() db.copy_to(entities, 'tmp_entity_history', cols, cur) db.entity_history_add_from_temp() @db_transaction_generator def entity_get_from_lister_metadata(self, entities, cur=None): """Fetch entities from the database, matching with the lister and associated metadata. Args: entities: iterable of dictionaries containing the lister metadata to look for. Useful keys are 'lister', 'type', 'id', ... Returns: A generator of fetched entities with all their attributes. If no match was found, the returned entity is None. """ db = self.db db.mktemp_entity_lister(cur) mapped_entities = [] for i, entity in enumerate(entities): mapped_entity = { 'id': i, 'lister_metadata': entity, } mapped_entities.append(mapped_entity) db.copy_to(mapped_entities, 'tmp_entity_lister', ['id', 'lister_metadata'], cur) cur.execute('''select id, %s from swh_entity_from_tmp_entity_lister() order by id''' % ','.join(db.entity_cols)) for id, *entity_vals in cur: fetched_entity = dict(zip(db.entity_cols, entity_vals)) if fetched_entity['uuid']: yield fetched_entity else: yield { 'uuid': None, 'lister_metadata': entities[i], } @db_transaction_generator def entity_get(self, uuid, cur=None): """Returns the list of entity per its uuid identifier and also its parent hierarchy. Args: uuid: entity's identifier Returns: List of entities starting with entity with uuid and the parent hierarchy from such entity. """ db = self.db for entity in db.entity_get(uuid, cur): yield dict(zip(db.entity_cols, entity)) @db_transaction def entity_get_one(self, uuid, cur=None): """Returns one entity using its uuid identifier. 
Args: uuid: entity's identifier Returns: the object corresponding to the given entity """ db = self.db entity = db.entity_get_one(uuid, cur) if entity: return dict(zip(db.entity_cols, entity)) else: return None @db_transaction def stat_counters(self, cur=None): """compute statistics about the number of tuples in various tables Returns: a dictionary mapping textual labels (e.g., content) to integer values (e.g., the number of tuples in table content) """ return {k: v for (k, v) in self.db.stat_counters()} diff --git a/swh/storage/tests/manual_test_archiver.py b/swh/storage/tests/manual_test_archiver.py index 26d4f4cc..8ab77bb2 100644 --- a/swh/storage/tests/manual_test_archiver.py +++ b/swh/storage/tests/manual_test_archiver.py @@ -1,95 +1,96 @@ import string import random from swh.core import hashutil from swh.storage import Storage from swh.storage.db import cursor_to_bytes from swh.storage.archiver import ArchiverDirector def rs(size=6, chars=string.ascii_uppercase + string.ascii_lowercase): return ''.join(random.choice(chars) for _ in range(size)) def mc(data): data = bytes(data, 'utf8') content = hashutil.hashdata(data) content.update({'data': data}) return content def initialize_content_archive(db, sample_size, names=['Local']): """ Initialize the content_archive table with a sample. From the content table, get a sample of id, and fill the content_archive table with those id in order to create a test sample for the archiver. Args: db: The database of the storage. sample_size (int): The size of the sample to create. names: A list of archive names. Those archives must already exists. Archival status of the archives content will be erased on db. Returns: Tha amount of entry created. """ with db.transaction() as cur: cur.execute('DELETE FROM content_archive') with db.transaction() as cur: cur.execute('SELECT sha1 from content limit %d' % sample_size) ids = list(cursor_to_bytes(cur)) for id, in ids: tid = r'\x' + hashutil.hash_to_hex(id) with db.transaction() as cur: for name in names: s = """INSERT INTO content_archive VALUES('%s'::sha1, '%s', 'missing', now()) """ % (tid, name) cur.execute(s) print('Initialized database with', sample_size * len(names), 'items') return sample_size * len(names) def clean(): # Clean all with loc.db.transaction() as cur: cur.execute('delete from content_archive') cur.execute('delete from content') import os os.system("rm -r /tmp/swh/storage-dev/2/*") CONTENT_SIZE = 10 if __name__ == '__main__': random.seed(0) - # Local database - dbname = 'softwareheritage-dev' - user = 'qcampos' - cstring = 'dbname=%s user=%s' % (dbname, user) + + # Local databases + cstring_archiver = 'service=swh-archiver-dev' + cstring_storage = 'service=swh-dev' + # Archiver config config = { 'objstorage_path': '/tmp/swh/storage-dev/2', 'archival_max_age': 3600, 'batch_max_size': 10, 'retention_policy': 1, 'asynchronous': False } # Grand-palais's storage - loc = Storage(cstring, config['objstorage_path']) + loc = Storage(cstring_storage, config['objstorage_path']) # Add the content l = [mc(rs(100)) for _ in range(CONTENT_SIZE)] loc.content_add(l) initialize_content_archive(loc.db, CONTENT_SIZE, ['petit-palais']) # Launch the archiver - archiver = ArchiverDirector(cstring, config) + archiver = ArchiverDirector(cstring_archiver, cstring_storage, config) archiver.run() diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py index c26ef864..9c44515d 100644 --- a/swh/storage/tests/test_archiver.py +++ b/swh/storage/tests/test_archiver.py @@ -1,245 +1,277 @@ # 
Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tempfile import unittest import os from nose.tools import istest from nose.plugins.attrib import attr from datetime import datetime, timedelta from swh.core import hashutil -from swh.core.tests.db_testing import DbTestFixture +from swh.core.tests.db_testing import DbsTestFixture from server_testing import ServerTestFixture from swh.storage import Storage from swh.storage.archiver import ArchiverDirector, ArchiverWorker from swh.objstorage.exc import ObjNotFoundError from swh.objstorage.api.client import RemoteObjStorage from swh.objstorage.api.server import app TEST_DIR = os.path.dirname(os.path.abspath(__file__)) TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata') @attr('db') -class TestArchiver(DbTestFixture, ServerTestFixture, +class TestArchiver(DbsTestFixture, ServerTestFixture, unittest.TestCase): """ Test the objstorage archiver. """ - TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh.dump') + TEST_DB_NAMES = [ + 'softwareheritage-test', + 'softwareheritage-archiver-test', + ] + TEST_DB_DUMPS = [ + os.path.join(TEST_DATA_DIR, 'dumps/swh.dump'), + os.path.join(TEST_DATA_DIR, 'dumps/swh-archiver.dump'), + ] + TEST_DB_DUMP_TYPES = [ + 'pg_dump', + 'pg_dump', + ] def setUp(self): # Launch the backup server self.backup_objroot = tempfile.mkdtemp(prefix='remote') - self.config = {'storage_base': self.backup_objroot, - 'storage_slicing': '0:2/2:4/4:6'} + self.config = { + 'storage_base': self.backup_objroot, + 'storage_slicing': '0:2/2:4/4:6' + } self.app = app super().setUp() - # Launch a client to check objects presence + # Retrieve connection (depends on the order in TEST_DB_NAMES) + self.conn_storage = self.conns[0] # db connection to storage + self.conn = self.conns[1] # archiver db's connection + self.cursor = self.cursors[1] + # a reader storage to check content has been archived self.remote_objstorage = RemoteObjStorage(self.url()) # Create the local storage. self.objroot = tempfile.mkdtemp(prefix='local') - self.storage = Storage(self.conn, self.objroot) + # a writer storage to store content before archiving + self.storage = Storage(self.conn_storage, self.objroot) # Initializes and fill the tables. self.initialize_tables() # Create the archiver self.archiver = self.__create_director() self.storage_data = ('Local', 'http://localhost:%s/' % self.port) def tearDown(self): self.empty_tables() super().tearDown() def initialize_tables(self): """ Initializes the database with a sample of items. 
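Concretely, this only registers the backup server started in setUp as an archive named 'Local' (see the INSERT below); the content_archive rows themselves are added per test through __add_content.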
""" # Add an archive - self.cursor.execute("""INSERT INTO archives(id, url) - VALUES('Local', 'http://localhost:{}/') - """.format(self.port)) + self.cursor.execute("""INSERT INTO archive(id, url) + VALUES('Local', '{}') + """.format(self.url())) self.conn.commit() def empty_tables(self): # Remove all content self.cursor.execute('DELETE FROM content_archive') - self.cursor.execute('DELETE FROM archives') + self.cursor.execute('DELETE FROM archive where id=\'Local\'') self.conn.commit() def __add_content(self, content_data, status='missing', date='now()'): # Add the content content = hashutil.hashdata(content_data) content.update({'data': content_data}) self.storage.content_add([content]) # Then update database content_id = r'\x' + hashutil.hash_to_hex(content['sha1']) self.cursor.execute("""INSERT INTO content_archive VALUES('%s'::sha1, 'Local', '%s', %s) """ % (content_id, status, date)) return content['sha1'] def __get_missing(self): self.cursor.execute("""SELECT content_id FROM content_archive WHERE status='missing'""") return self.cursor.fetchall() def __create_director(self, batch_size=5000, archival_max_age=3600, retention_policy=1, asynchronous=False): config = { 'objstorage_path': self.objroot, 'batch_max_size': batch_size, 'archival_max_age': archival_max_age, 'retention_policy': retention_policy, 'asynchronous': asynchronous # Avoid depending on queue for tests. } - director = ArchiverDirector(self.conn, config) + director = ArchiverDirector(db_conn_archiver=self.conn, + db_conn_storage=self.conn_storage, + config=config) return director def __create_worker(self, batch={}, config={}): - mstorage_args = [self.archiver.master_storage.db.conn, - self.objroot] - slaves = [self.storage_data] + mstorage_args = [ + self.archiver.master_storage.db.conn, # master storage db + # connection + self.objroot # object storage path + ] if not config: config = self.archiver.config - return ArchiverWorker(batch, mstorage_args, slaves, config) + return ArchiverWorker(batch, + archiver_args=self.conn, + master_storage_args=mstorage_args, + slave_storages=[self.storage_data], + config=config) # Integration test @istest def archive_missing_content(self): """ Run archiver on a missing content should archive it. """ content_data = b'archive_missing_content' - id = self.__add_content(content_data) - # After the run, the content should be in the archive. + content_id = self.__add_content(content_data) + # before, the content should not be there + try: + self.remote_objstorage.content_get(content_id) + except: + pass self.archiver.run() - remote_data = self.remote_objstorage.content_get(id) + # now the content should be present on remote objstorage + remote_data = self.remote_objstorage.content_get(content_id) + # After the run, the content should be archived after the archiver run. self.assertEquals(content_data, remote_data) @istest def archive_present_content(self): """ A content that is not 'missing' shouldn't be archived. """ id = self.__add_content(b'archive_present_content', status='present') # After the run, the content should NOT be in the archive.* self.archiver.run() with self.assertRaises(ObjNotFoundError): self.remote_objstorage.content_get(id) @istest def archive_already_enough(self): """ A content missing with enough copies shouldn't be archived. 
""" id = self.__add_content(b'archive_alread_enough') director = self.__create_director(retention_policy=0) director.run() with self.assertRaises(ObjNotFoundError): self.remote_objstorage.content_get(id) # Unit test for ArchiverDirector def vstatus(self, status, mtime): return self.archiver.get_virtual_status(status, mtime) @istest def vstatus_present(self): self.assertEquals( self.vstatus('present', None), 'present' ) @istest def vstatus_missing(self): self.assertEquals( self.vstatus('missing', None), 'missing' ) @istest def vstatus_ongoing_remaining(self): current_time = datetime.now() self.assertEquals( self.vstatus('ongoing', current_time), 'present' ) @istest def vstatus_ongoing_elapsed(self): past_time = datetime.now() - timedelta( seconds=self.archiver.config['archival_max_age'] + 1 ) self.assertEquals( self.vstatus('ongoing', past_time), 'missing' ) # Unit tests for archive worker @istest def need_archival_missing(self): """ A content should still need archival when it is missing. """ id = self.__add_content(b'need_archival_missing', status='missing') id = r'\x' + hashutil.hash_to_hex(id) worker = self.__create_worker() self.assertEqual(worker.need_archival(id, self.storage_data), True) @istest def need_archival_present(self): """ A content should still need archival when it is missing """ id = self.__add_content(b'need_archival_missing', status='present') id = r'\x' + hashutil.hash_to_hex(id) worker = self.__create_worker() self.assertEqual(worker.need_archival(id, self.storage_data), False) @istest def need_archival_ongoing_remaining(self): """ An ongoing archival with remaining time shouldnt need archival. """ id = self.__add_content(b'need_archival_ongoing_remaining', status='ongoing', date="'%s'" % datetime.now()) id = r'\x' + hashutil.hash_to_hex(id) worker = self.__create_worker() self.assertEqual(worker.need_archival(id, self.storage_data), False) @istest def need_archival_ongoing_elasped(self): """ An ongoing archival with elapsed time should be scheduled again. """ id = self.__add_content( b'archive_ongoing_elapsed', status='ongoing', date="'%s'" % (datetime.now() - timedelta( seconds=self.archiver.config['archival_max_age'] + 1 )) ) id = r'\x' + hashutil.hash_to_hex(id) worker = self.__create_worker() self.assertEqual(worker.need_archival(id, self.storage_data), True) @istest def content_sorting_by_archiver(self): """ Check that the content is correctly sorted. """ batch = { 'id1': { 'present': [('slave1', 'slave1_url')], 'missing': [] }, 'id2': { 'present': [], 'missing': [('slave1', 'slave1_url')] } } worker = self.__create_worker(batch=batch) mapping = worker.sort_content_by_archive() self.assertNotIn('id1', mapping[('slave1', 'slave1_url')]) self.assertIn('id2', mapping[('slave1', 'slave1_url')])