diff --git a/PKG-INFO b/PKG-INFO index 5ae2b97a1..0002faf7b 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.storage -Version: 0.0.85 +Version: 0.0.86 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/sql/archiver/swh-archiver-data.sql b/sql/archiver/swh-archiver-data.sql index 816a40aa7..e4a70a25e 100644 --- a/sql/archiver/swh-archiver-data.sql +++ b/sql/archiver/swh-archiver-data.sql @@ -1,3 +1,3 @@ -INSERT INTO archive(id) VALUES('uffizi'); -INSERT INTO archive(id) VALUES('banco'); -INSERT INTO archive(id) VALUES('azure'); +INSERT INTO archive(name) VALUES('uffizi'); +INSERT INTO archive(name) VALUES('banco'); +INSERT INTO archive(name) VALUES('azure'); diff --git a/sql/archiver/swh-archiver-func.sql b/sql/archiver/swh-archiver-func.sql index 625ed28bb..750bfae2c 100644 --- a/sql/archiver/swh-archiver-func.sql +++ b/sql/archiver/swh-archiver-func.sql @@ -1,127 +1,40 @@ -create or replace function swh_mktemp_content_archive() - returns void - language sql -as $$ - create temporary table tmp_content_archive ( - like content_archive including defaults - ) on commit drop; - alter table tmp_content_archive drop column copies; - alter table tmp_content_archive drop column num_present; -$$; - -COMMENT ON FUNCTION swh_mktemp_content_archive() IS 'Create temporary table content_archive'; - -create or replace function swh_content_archive_missing(backend_name text) - returns setof sha1 - language plpgsql -as $$ -begin - return query - select content_id - from tmp_content_archive tmp where exists ( - select 1 - from content_archive c - where tmp.content_id = c.content_id - and (not c.copies ? 
backend_name - or c.copies @> jsonb_build_object(backend_name, '{"status": "missing"}'::jsonb)) - ); -end -$$; - -COMMENT ON FUNCTION swh_content_archive_missing(text) IS 'Filter missing data from a specific backend'; - -create or replace function swh_content_archive_unknown() - returns setof sha1 - language plpgsql -as $$ -begin - return query - select content_id - from tmp_content_archive tmp where not exists ( - select 1 - from content_archive c - where tmp.content_id = c.content_id - ); -end -$$; - -COMMENT ON FUNCTION swh_content_archive_unknown() IS 'Retrieve list of unknown sha1s'; - -CREATE OR REPLACE FUNCTION count_copies(from_id bytea, to_id bytea) returns void language sql as $$ - with sample as ( - select content_id, copies from content_archive - where content_id > from_id and content_id <= to_id - ), data as ( - select substring(content_id from 19) as bucket, jbe.key as archive - from sample - join lateral jsonb_each(copies) jbe on true - where jbe.value->>'status' = 'present' - ), bucketed as ( - select bucket, archive, count(*) as count - from data - group by bucket, archive - ) update content_archive_counts cac set - count = cac.count + bucketed.count - from bucketed - where cac.archive = bucketed.archive and cac.bucket = bucketed.bucket; -$$; - -comment on function count_copies(bytea, bytea) is 'Count the objects between from_id and to_id, add the results to content_archive_counts'; - -CREATE OR REPLACE FUNCTION init_content_archive_counts() returns void language sql as $$ - insert into content_archive_counts ( - select id, decode(lpad(to_hex(bucket), 4, '0'), 'hex')::bucket as bucket, 0 as count - from archive join lateral generate_series(0, 65535) bucket on true - ) on conflict (archive, bucket) do nothing; -$$; - -comment on function init_content_archive_counts() is 'Initialize the content archive counts for the registered archives'; - -create type content_archive_count as ( - archive text, - count bigint -); - -create or replace function get_content_archive_counts() returns setof content_archive_count language sql as $$ - select archive, sum(count)::bigint - from content_archive_counts - group by archive - order by archive; -$$; - -comment on function get_content_archive_counts() is 'Get count for each archive'; - --- create a temporary table called tmp_TBLNAME, mimicking existing table --- TBLNAME -create or replace function swh_mktemp(tblname regclass) +create or replace function swh_mktemp_content() returns void language plpgsql as $$ -begin - execute format(' - create temporary table tmp_%1$I - (like %1$I including defaults) - on commit drop; - ', tblname); + begin + create temporary table tmp_content ( + sha1 sha1 not null + ) on commit drop; return; -end + end $$; -comment on function swh_mktemp(regclass) is 'Helper function to create a temporary table mimicking the existing one'; - --- Helper function to insert new entries in content_archive from a --- temporary table skipping duplicates. 
-create or replace function swh_content_archive_add() +create or replace function swh_content_copies_from_temp(archive_names text[]) returns void language plpgsql as $$ -begin - insert into content_archive (content_id, copies, num_present) - select distinct content_id, copies, num_present - from tmp_content_archive - on conflict(content_id) do nothing; - return; -end + begin + with existing_content_ids as ( + select id + from content + inner join tmp_content on content.sha1 = tmp_content.sha1 + ), created_content_ids as ( + insert into content (sha1) + select sha1 from tmp_content + on conflict do nothing + returning id + ), content_ids as ( + select * from existing_content_ids + union all + select * from created_content_ids + ), archive_ids as ( + select id from archive + where name = any(archive_names) + ) insert into content_copies (content_id, archive_id, mtime, status) + select content_ids.id, archive_ids.id, now(), 'present' + from content_ids cross join archive_ids + on conflict (content_id, archive_id) do update + set mtime = excluded.mtime, status = excluded.status; + end $$; - -comment on function swh_content_archive_add() is 'Helper function to insert new entry in content_archive'; diff --git a/sql/archiver/swh-archiver-schema.sql b/sql/archiver/swh-archiver-schema.sql index 1514d3e63..e5849585f 100644 --- a/sql/archiver/swh-archiver-schema.sql +++ b/sql/archiver/swh-archiver-schema.sql @@ -1,121 +1,63 @@ -- In order to archive the content of the object storage, add -- some tables to keep track of what has already been archived. create table dbversion ( version int primary key, release timestamptz, description text ); comment on table dbversion is 'Schema update tracking'; INSERT INTO dbversion(version, release, description) -VALUES(9, now(), 'Work In Progress'); +VALUES(10, now(), 'Work In Progress'); CREATE TABLE archive ( - id text PRIMARY KEY + id bigserial PRIMARY KEY, + name text not null ); -comment on table archive is 'Possible archives'; -comment on column archive.id is 'Short identifier for the archive'; +create unique index on archive(name); + +comment on table archive is 'The archives in which contents are stored'; +comment on column archive.id is 'Short identifier for archives'; +comment on column archive.name is 'Name of the archive'; CREATE TYPE archive_status AS ENUM ( 'missing', 'ongoing', 'present', 'corrupted' ); -comment on type archive_status is 'Status of a given archive'; +comment on type archive_status is 'Status of a given copy of a content'; -- a SHA1 checksum (not necessarily originating from Git) CREATE DOMAIN sha1 AS bytea CHECK (LENGTH(VALUE) = 20); -- a bucket for which we count items CREATE DOMAIN bucket AS bytea CHECK (LENGTH(VALUE) = 2); -CREATE TABLE content_archive ( - content_id sha1 primary key, - copies jsonb, - num_present int default null +create table content ( + id bigserial primary key, + sha1 sha1 not null ); -create index on content_archive(num_present); +comment on table content is 'All the contents being archived by Software Heritage'; +comment on column content.id is 'Short id for the content being archived'; +comment on column content.sha1 is 'SHA1 hash of the content being archived'; -comment on table content_archive is 'Referencing the status and whereabouts of a content'; -comment on column content_archive.content_id is 'content identifier'; -comment on column content_archive.copies is 'map archive_id -> { "status": archive_status, "mtime": epoch timestamp }'; -comment on column content_archive.num_present is 'Number of copies marked 
as present (cache updated via trigger)'; +create unique index on content(sha1); -CREATE TABLE content_archive_counts ( - archive text not null references archive(id), - bucket bucket not null, - count bigint, - primary key (archive, bucket) +create table content_copies ( + content_id bigint not null, -- references content(id) + archive_id bigint not null, -- references archive(id) + mtime timestamptz, + status archive_status not null, + primary key (content_id, archive_id) ); -comment on table content_archive_counts is 'Bucketed count of archive contents'; -comment on column content_archive_counts.archive is 'the archive for which we''re counting'; -comment on column content_archive_counts.bucket is 'the bucket of items we''re counting'; -comment on column content_archive_counts.count is 'the number of items counted in the given bucket'; - --- Keep the num_copies cache updated -CREATE FUNCTION update_num_present() RETURNS TRIGGER AS $$ - BEGIN - NEW.num_present := (select count(*) from jsonb_each(NEW.copies) where value->>'status' = 'present'); - RETURN new; - END; -$$ LANGUAGE PLPGSQL; - -CREATE TRIGGER update_num_present - BEFORE INSERT OR UPDATE OF copies ON content_archive - FOR EACH ROW - EXECUTE PROCEDURE update_num_present(); - --- keep the content_archive_counts updated -CREATE OR REPLACE FUNCTION update_content_archive_counts() RETURNS TRIGGER LANGUAGE PLPGSQL AS $$ - DECLARE - content_id sha1; - content_bucket bucket; - copies record; - old_row content_archive; - new_row content_archive; - BEGIN - -- default values for old or new row depending on trigger type - if tg_op = 'INSERT' then - old_row := (null::sha1, '{}'::jsonb, 0); - else - old_row := old; - end if; - if tg_op = 'DELETE' then - new_row := (null::sha1, '{}'::jsonb, 0); - else - new_row := new; - end if; - - -- get the content bucket - content_id := coalesce(old_row.content_id, new_row.content_id); - content_bucket := substring(content_id from 19)::bucket; - - -- compare copies present in old and new row for each archive type - FOR copies IN - select coalesce(o.key, n.key) as archive, o.value->>'status' as old_status, n.value->>'status' as new_status - from jsonb_each(old_row.copies) o full outer join lateral jsonb_each(new_row.copies) n on o.key = n.key - LOOP - -- the count didn't change - CONTINUE WHEN copies.old_status is not distinct from copies.new_status OR - (copies.old_status != 'present' AND copies.new_status != 'present'); - - update content_archive_counts cac - set count = count + (case when copies.old_status = 'present' then -1 else 1 end) - where archive = copies.archive and bucket = content_bucket; - END LOOP; - return null; - END; -$$; - -create trigger update_content_archive_counts - AFTER INSERT OR UPDATE OR DELETE ON content_archive - FOR EACH ROW - EXECUTE PROCEDURE update_content_archive_counts(); +comment on table content_copies is 'Tracking of all content copies in the archives'; +comment on column content_copies.mtime is 'Last update time of the copy'; +comment on column content_copies.status is 'Status of the copy'; diff --git a/swh.storage.egg-info/PKG-INFO b/swh.storage.egg-info/PKG-INFO index 5ae2b97a1..0002faf7b 100644 --- a/swh.storage.egg-info/PKG-INFO +++ b/swh.storage.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.storage -Version: 0.0.85 +Version: 0.0.86 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN 
Platform: UNKNOWN diff --git a/swh/storage/archiver/db.py b/swh/storage/archiver/db.py index 4b726a7eb..d9e5e4399 100644 --- a/swh/storage/archiver/db.py +++ b/swh/storage/archiver/db.py @@ -1,251 +1,224 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import datetime -import time - -from swh.model import hashutil from swh.storage.db import BaseDb, cursor_to_bytes, stored_procedure +def utcnow(): + return datetime.datetime.now(tz=datetime.timezone.utc) + + class ArchiverDb(BaseDb): """Proxy to the SWH's archiver DB """ def archive_ls(self, cur=None): """ Get all the archives registered on the server. Yields: a tuple (server_id, server_url) for each archive server. """ cur = self._cursor(cur) cur.execute("SELECT * FROM archive") yield from cursor_to_bytes(cur) def content_archive_get(self, content_id, cur=None): """ Get the archival status of a content in a specific server. Retrieve from the database the archival status of the given content in the given archive server. Args: content_id: the sha1 of the content. Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ - query = """SELECT content_id, - array( - SELECT key - FROM jsonb_each(copies) - WHERE value->>'status' = 'present' - ORDER BY key - ) AS present, - array( - SELECT key - FROM jsonb_each(copies) - WHERE value->>'status' = 'ongoing' - ORDER BY key - ) AS ongoing, - array( - SELECT value->'mtime' - FROM jsonb_each(copies) - WHERE value->>'status' = 'ongoing' - ORDER BY key - ) AS ongoing_mtime - FROM content_archive - WHERE content_id = %s - ORDER BY content_id + query = """select archive.name, status, mtime + from content_copies + left join archive on content_copies.archive_id = archive.id + where content_copies.content_id = ( + select id from content where sha1 = %s) """ cur = self._cursor(cur) cur.execute(query, (content_id,)) - row = cur.fetchone() - if not row: + rows = cur.fetchall() + if not rows: return None - content_id, present, ongoing, mtimes = row - return (content_id, present, dict(zip(ongoing, mtimes))) + present = [] + ongoing = {} + for archive, status, mtime in rows: + if status == 'present': + present.append(archive) + elif status == 'ongoing': + ongoing[archive] = mtime + return (content_id, present, ongoing) def content_archive_get_copies(self, last_content=None, limit=1000, cur=None): """Get the list of copies for `limit` contents starting after `last_content`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. 
""" - query = """SELECT content_id, - array( - SELECT key - FROM jsonb_each(copies) - WHERE value->>'status' = 'present' - ORDER BY key - ) AS present, - array( - SELECT key - FROM jsonb_each(copies) - WHERE value->>'status' = 'ongoing' - ORDER BY key - ) AS ongoing, - array( - SELECT value->'mtime' - FROM jsonb_each(copies) - WHERE value->>'status' = 'ongoing' - ORDER BY key - ) AS ongoing_mtime - FROM content_archive - WHERE content_id > %s - ORDER BY content_id - LIMIT %s - """ + vars = { + 'limit': limit, + } if last_content is None: - last_content = b'' + last_content_clause = 'true' + else: + last_content_clause = """content_id > ( + select id from content + where sha1 = %(last_content)s)""" + vars['last_content'] = last_content + + query = """select + (select sha1 from content where id = content_id), + array_agg((select name from archive + where id = archive_id)) + from content_copies + where status = 'present' and %s + group by content_id + order by content_id + limit %%(limit)s""" % last_content_clause cur = self._cursor(cur) - cur.execute(query, (last_content, limit)) - for content_id, present, ongoing, mtimes in cursor_to_bytes(cur): - yield (content_id, present, dict(zip(ongoing, mtimes))) + cur.execute(query, vars) + for content_id, present in cursor_to_bytes(cur): + yield (content_id, present, {}) def content_archive_get_unarchived_copies( self, retention_policy, last_content=None, limit=1000, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Yields only copies with number of present smaller than `retention policy`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. retention_policy: number of required present copies limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. 
""" - query = """SELECT content_id, - array( - SELECT key - FROM jsonb_each(copies) - WHERE value->>'status' = 'present' - ORDER BY key - ) AS present, - array( - SELECT key - FROM jsonb_each(copies) - WHERE value->>'status' = 'ongoing' - ORDER BY key - ) AS ongoing, - array( - SELECT value->'mtime' - FROM jsonb_each(copies) - WHERE value->>'status' = 'ongoing' - ORDER BY key - ) AS ongoing_mtime - FROM content_archive - WHERE content_id > %s AND num_present < %s - ORDER BY content_id - LIMIT %s - """ + vars = { + 'limit': limit, + 'retention_policy': retention_policy, + } if last_content is None: - last_content = b'' + last_content_clause = 'true' + else: + last_content_clause = """content_id > ( + select id from content + where sha1 = %(last_content)s)""" + vars['last_content'] = last_content + + query = """select + (select sha1 from content where id = content_id), + array_agg((select name from archive + where id = archive_id)) + from content_copies + where status = 'present' and %s + group by content_id + having count(archive_id) < %%(retention_policy)s + order by content_id + limit %%(limit)s""" % last_content_clause cur = self._cursor(cur) - cur.execute(query, (last_content, retention_policy, limit)) - for content_id, present, ongoing, mtimes in cursor_to_bytes(cur): - yield (content_id, present, dict(zip(ongoing, mtimes))) + cur.execute(query, vars) + for content_id, present in cursor_to_bytes(cur): + yield (content_id, present, {}) @stored_procedure('swh_mktemp_content_archive') def mktemp_content_archive(self, cur=None): """Trigger the creation of the temporary table tmp_content_archive during the lifetime of the transaction. """ pass @stored_procedure('swh_content_archive_add') def content_archive_add_from_temp(self, cur=None): """Add new content archive entries from temporary table. Use from archiver.storage module: self.db.mktemp_content_archive() # copy data over to the temp table self.db.copy_to([{'colname': id0}, {'colname': id1}], 'tmp_cache_content', ['colname'], cur) # insert into the main table self.db.add_content_archive_from_temp(cur) """ pass def content_archive_get_missing(self, backend_name, cur=None): """Retrieve the content missing from backend_name. """ cur = self._cursor(cur) cur.execute("select * from swh_content_archive_missing(%s)", (backend_name,)) yield from cursor_to_bytes(cur) def content_archive_get_unknown(self, cur=None): """Retrieve unknown sha1 from archiver db. """ cur = self._cursor(cur) cur.execute('select * from swh_content_archive_unknown()') yield from cursor_to_bytes(cur) def content_archive_update(self, content_id, archive_id, new_status=None, cur=None): """ Update the status of an archive content and set its mtime to Change the mtime of an archived content for the given archive and set it's mtime to the current time. Args: content_id (str): content sha1 archive_id (str): name of the archive new_status (str): one of 'missing', 'present' or 'ongoing'. this status will replace the previous one. If not given, the function only change the mtime of the content for the given archive. 
""" - if isinstance(content_id, bytes): - content_id = '\\x%s' % hashutil.hash_to_hex(content_id) - - if new_status is not None: - query = """UPDATE content_archive - SET copies=jsonb_set( - copies, '{%s}', - '{"status":"%s", "mtime":%d}' - ) - WHERE content_id='%s' - """ % (archive_id, - new_status, int(time.time()), - content_id) - else: - query = """ UPDATE content_archive - SET copies=jsonb_set(copies, '{%s,mtime}', '%d') - WHERE content_id='%s' - """ % (archive_id, int(time.time())) + assert isinstance(content_id, bytes) + assert new_status is not None + + query = """insert into content_copies (archive_id, content_id, status, mtime) + values ((select id from archive where name=%s), + (select id from content where sha1=%s), + %s, %s) + on conflict (archive_id, content_id) do + update set status = excluded.status, mtime = excluded.mtime + """ cur = self._cursor(cur) - cur.execute(query) + cur.execute(query, (archive_id, content_id, new_status, utcnow())) diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py index 9b0d2e1b2..f707d24c5 100644 --- a/swh/storage/archiver/director.py +++ b/swh/storage/archiver/director.py @@ -1,306 +1,307 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import click import sys from swh.core import config, utils from swh.model import hashutil from swh.objstorage import get_objstorage from swh.scheduler.utils import get_task from . import tasks # noqa from .storage import get_archiver_storage class ArchiverDirectorBase(config.SWHConfig, metaclass=abc.ABCMeta): """Abstract Director class An archiver director is in charge of dispatching batch of contents to archiver workers (for them to archive). Inherit from this class and provide: - ADDITIONAL_CONFIG: Some added configuration needed for the director to work - CONFIG_BASE_FILENAME: relative path to lookup for the configuration file - def get_contents_to_archive(self): Implementation method to read contents to archive """ DEFAULT_CONFIG = { 'batch_max_size': ('int', 1500), 'asynchronous': ('bool', True), 'archiver_storage': ('dict', { 'cls': 'db', 'args': { 'dbconn': 'dbname=softwareheritage-archiver-dev user=guest', }, }), } # Destined to be overridden by subclass ADDITIONAL_CONFIG = {} # We use the same configuration file as the worker CONFIG_BASE_FILENAME = 'archiver/worker' # The worker's task queue name to use TASK_NAME = None def __init__(self): """ Constructor of the archiver director. Args: db_conn_archiver: Either a libpq connection string, or a psycopg2 connection for the archiver db. config: optionnal additional configuration. Keys in the dict will override the one parsed from the configuration file. """ super().__init__() self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) self.archiver_storage = get_archiver_storage( **self.config['archiver_storage']) self.task = get_task(self.TASK_NAME) def run(self): """ Run the archiver director. The archiver director will check all the contents of the archiver database and do the required backup jobs. """ if self.config['asynchronous']: run_fn = self.run_async_worker else: run_fn = self.run_sync_worker for batch in self.read_batch_contents(): run_fn(batch) def run_async_worker(self, batch): """Produce a worker that will be added to the task queue. 
""" self.task.delay(batch=batch) def run_sync_worker(self, batch): """Run synchronously a worker on the given batch. """ self.task(batch=batch) def read_batch_contents(self): """ Create batch of contents that needs to be archived Yields: batch of sha1 that corresponds to contents that needs more archive copies. """ contents = [] for content in self.get_contents_to_archive(): contents.append(content) if len(contents) > self.config['batch_max_size']: yield contents contents = [] if len(contents) > 0: yield contents @abc.abstractmethod def get_contents_to_archive(self): """Retrieve generator of sha1 to archive Yields: sha1 to archive """ pass class ArchiverWithRetentionPolicyDirector(ArchiverDirectorBase): """Process the files in order to know which one is needed as backup. The archiver director processes the files in the local storage in order to know which one needs archival and it delegates this task to archiver workers. """ ADDITIONAL_CONFIG = { 'retention_policy': ('int', 2), } TASK_NAME = 'swh.storage.archiver.tasks.SWHArchiverWithRetentionPolicyTask' def get_contents_to_archive(self): """Create batch of contents that needs to be archived Yields: Datas about a content as a tuple (content_id, present_copies, ongoing_copies) where ongoing_copies is a dict mapping copy to mtime. """ last_content = None while True: archiver_contents = list( self.archiver_storage.content_archive_get_unarchived_copies( last_content=last_content, - retention_policy=self.config['retention_policy'])) + retention_policy=self.config['retention_policy'], + limit=self.config['batch_max_size'])) if not archiver_contents: return for content_id, _, _ in archiver_contents: last_content = content_id yield content_id def read_sha1_from_stdin(): """Read sha1 from stdin. """ for line in sys.stdin: sha1 = line.strip() try: yield hashutil.hash_to_bytes(sha1) except Exception: print("%s is not a valid sha1 hash, continuing" % repr(sha1), file=sys.stderr) continue class ArchiverStdinToBackendDirector(ArchiverDirectorBase): """A cloud archiver director in charge of reading contents and send them in batch in the cloud. The archiver director, in order: - Reads sha1 to send to a specific backend. - Checks if those sha1 are known in the archiver. If they are not, add them - if the sha1 are missing, they are sent for the worker to archive If the flag force_copy is set, this will force the copy to be sent for archive even though it has already been done. """ ADDITIONAL_CONFIG = { 'destination': ('str', 'azure'), 'force_copy': ('bool', False), 'source': ('str', 'uffizi'), 'storages': ('list[dict]', [ {'host': 'uffizi', 'cls': 'pathslicing', 'args': {'root': '/tmp/softwareheritage/objects', 'slicing': '0:2/2:4/4:6'}}, {'host': 'banco', 'cls': 'remote', 'args': {'base_url': 'http://banco:5003/'}} ]) } CONFIG_BASE_FILENAME = 'archiver/worker-to-backend' TASK_NAME = 'swh.storage.archiver.tasks.SWHArchiverToBackendTask' def __init__(self): super().__init__() self.destination = self.config['destination'] self.force_copy = self.config['force_copy'] self.objstorages = { storage['host']: get_objstorage(storage['cls'], storage['args']) for storage in self.config.get('storages', []) } # Fallback objstorage self.source = self.config['source'] def _add_unknown_content_ids(self, content_ids): """Check whether some content_id are unknown. If they are, add them to the archiver db. 
Args: content_ids: List of dict with one key content_id """ source_objstorage = self.objstorages[self.source] self.archiver_storage.content_archive_add( (h for h in content_ids if h in source_objstorage), sources_present=[self.source]) def get_contents_to_archive(self): gen_content_ids = ( ids for ids in utils.grouper(read_sha1_from_stdin(), self.config['batch_max_size'])) if self.force_copy: for content_ids in gen_content_ids: content_ids = list(content_ids) if not content_ids: continue # Add missing entries in archiver table self._add_unknown_content_ids(content_ids) print('Send %s contents to archive' % len(content_ids)) for content_id in content_ids: # force its status to missing self.archiver_storage.content_archive_update( content_id, self.destination, 'missing') yield content_id else: for content_ids in gen_content_ids: content_ids = list(content_ids) # Add missing entries in archiver table self._add_unknown_content_ids(content_ids) # Filter already copied data content_ids = list( self.archiver_storage.content_archive_get_missing( content_ids=content_ids, backend_name=self.destination)) if not content_ids: continue print('Send %s contents to archive' % len(content_ids)) for content in content_ids: yield content def run_async_worker(self, batch): """Produce a worker that will be added to the task queue. """ self.task.delay(destination=self.destination, batch=batch) def run_sync_worker(self, batch): """Run synchronously a worker on the given batch. """ self.task(destination=self.destination, batch=batch) @click.command() @click.option('--direct', is_flag=True, help="""The archiver sends content for backup to one storage.""") def launch(direct): if direct: archiver = ArchiverStdinToBackendDirector() else: archiver = ArchiverWithRetentionPolicyDirector() archiver.run() if __name__ == '__main__': launch() diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py index da1a03fa9..d48d83d33 100644 --- a/swh/storage/tests/test_archiver.py +++ b/swh/storage/tests/test_archiver.py @@ -1,483 +1,486 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import glob import tempfile import shutil import unittest import os import time -import json from nose.tools import istest from nose.plugins.attrib import attr -from swh.model import hashutil from swh.core.tests.db_testing import DbsTestFixture from swh.storage.archiver.storage import get_archiver_storage from swh.storage.archiver import ArchiverWithRetentionPolicyDirector from swh.storage.archiver import ArchiverWithRetentionPolicyWorker from swh.objstorage import get_objstorage from swh.objstorage.exc import ObjNotFoundError try: # objstorage > 0.17 from swh.objstorage.api.server import make_app as app from server_testing import ServerTestFixtureAsync as ServerTestFixture MIGRATED = True except ImportError: # objstorage <= 0.17 from swh.objstorage.api.server import app from server_testing import ServerTestFixture MIGRATED = False TEST_DIR = os.path.dirname(os.path.abspath(__file__)) TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata') @attr('db') class TestArchiver(DbsTestFixture, ServerTestFixture, unittest.TestCase): """ Test the objstorage archiver. 
""" TEST_DB_NAMES = [ 'softwareheritage-archiver-test', ] TEST_DB_DUMPS = [ os.path.join(TEST_DATA_DIR, 'dumps/swh-archiver.dump'), ] TEST_DB_DUMP_TYPES = [ 'pg_dump', ] def setUp(self): # Launch the backup server self.dest_root = tempfile.mkdtemp(prefix='remote') self.config = { 'cls': 'pathslicing', 'args': { 'root': self.dest_root, 'slicing': '0:2/2:4/4:6', } } if MIGRATED: self.app = app(self.config) else: self.app = app super().setUp() # Retrieve connection (depends on the order in TEST_DB_NAMES) self.conn = self.conns[0] # archiver db's connection self.cursor = self.cursors[0] # Create source storage self.src_root = tempfile.mkdtemp() src_config = { 'cls': 'pathslicing', 'args': { 'root': self.src_root, 'slicing': '0:2/2:4/4:6' } } self.src_storage = get_objstorage(**src_config) # Create destination storage dest_config = { 'cls': 'remote', 'args': { 'url': self.url() } } self.dest_storage = get_objstorage(**dest_config) # Keep mapped the id to the storages self.storages = { 'uffizi': self.src_storage, 'banco': self.dest_storage } # Override configurations src_archiver_conf = {'host': 'uffizi'} dest_archiver_conf = {'host': 'banco'} src_archiver_conf.update(src_config) dest_archiver_conf.update(dest_config) self.archiver_storages = [src_archiver_conf, dest_archiver_conf] self._override_director_config() self._override_worker_config() # Create the base archiver self.archiver = self._create_director() def tearDown(self): self.empty_tables() shutil.rmtree(self.src_root) shutil.rmtree(self.dest_root) super().tearDown() def empty_tables(self): # Remove all content - self.cursor.execute('DELETE FROM content_archive') + self.cursor.execute('DELETE FROM content') + self.cursor.execute('DELETE FROM content_copies') self.conn.commit() def _override_director_config(self, retention_policy=2): """ Override the default config of the Archiver director to allow the tests to use the *-test db instead of the default one as there is no configuration file for now. """ ArchiverWithRetentionPolicyDirector.parse_config_file = lambda obj, additional_configs: { # noqa 'archiver_storage': { 'cls': 'db', 'args': { 'dbconn': self.conn, }, }, 'batch_max_size': 5000, 'archival_max_age': 3600, 'retention_policy': retention_policy, 'asynchronous': False, } def _override_worker_config(self): """ Override the default config of the Archiver worker to allow the tests to use the *-test db instead of the default one as there is no configuration file for now. """ ArchiverWithRetentionPolicyWorker.parse_config_file = lambda obj, additional_configs: { # noqa 'retention_policy': 2, 'archival_max_age': 3600, 'archiver_storage': { 'cls': 'db', 'args': { 'dbconn': self.conn, }, }, 'storages': self.archiver_storages, 'source': 'uffizi', } def _create_director(self): return ArchiverWithRetentionPolicyDirector() def _create_worker(self, batch={}): return ArchiverWithRetentionPolicyWorker(batch) def _add_content(self, storage_name, content_data): """ Add really a content to the given objstorage This put an empty status for the added content. 
Args: storage_name: the concerned storage content_data: the data to insert with_row_insert: to insert a row entry in the db or not """ # Add the content to the storage obj_id = self.storages[storage_name].add(content_data) - db_obj_id = r'\x' + hashutil.hash_to_hex(obj_id) - self.cursor.execute(""" INSERT INTO content_archive - VALUES('%s', '{}') - """ % (db_obj_id)) + self.cursor.execute(""" INSERT INTO content (sha1) + VALUES (%s) + """, (obj_id,)) return obj_id def _update_status(self, obj_id, storage_name, status, date=None): """ Update the db status for the given id/storage_name. This does not create the content in the storage. """ - db_obj_id = r'\x' + hashutil.hash_to_hex(obj_id) + self.cursor.execute("""insert into archive (name) + values (%s) + on conflict do nothing""", (storage_name,)) + self.archiver.archiver_storage.content_archive_update( - db_obj_id, storage_name, status + obj_id, storage_name, status ) - def _add_dated_content(self, obj_id, copies={}): - """ Fully erase the previous copies field for the given content id - - This does not alter the contents into the objstorages. - """ - db_obj_id = r'\x' + hashutil.hash_to_hex(obj_id) - self.cursor.execute(""" UPDATE TABLE content_archive - SET copies='%s' - WHERE content_id='%s' - """ % (json.dumps(copies), db_obj_id)) - # Integration test @istest def archive_missing_content(self): """ Run archiver on a missing content should archive it. """ obj_data = b'archive_missing_content' obj_id = self._add_content('uffizi', obj_data) self._update_status(obj_id, 'uffizi', 'present') # Content is missing on banco (entry not present in the db) try: self.dest_storage.get(obj_id) except ObjNotFoundError: pass else: self.fail('Content should not be present before archival') self.archiver.run() # now the content should be present on remote objstorage remote_data = self.dest_storage.get(obj_id) self.assertEquals(obj_data, remote_data) @istest def archive_present_content(self): """ A content that is not 'missing' shouldn't be archived. """ obj_id = self._add_content('uffizi', b'archive_present_content') self._update_status(obj_id, 'uffizi', 'present') self._update_status(obj_id, 'banco', 'present') # After the run, the content should NOT be in the archive. # As the archiver believe it was already in. self.archiver.run() with self.assertRaises(ObjNotFoundError): self.dest_storage.get(obj_id) @istest def archive_already_enough(self): """ A content missing with enough copies shouldn't be archived. """ obj_id = self._add_content('uffizi', b'archive_alread_enough') self._update_status(obj_id, 'uffizi', 'present') self._override_director_config(retention_policy=1) director = self._create_director() # Obj is present in only one archive but only one copy is required. 
director.run() with self.assertRaises(ObjNotFoundError): self.dest_storage.get(obj_id) + @istest + def content_archive_get_copies(self): + self.assertCountEqual( + self.archiver.archiver_storage.content_archive_get_copies(), + [], + ) + obj_id = self._add_content('uffizi', b'archive_alread_enough') + self._update_status(obj_id, 'uffizi', 'present') + self.assertCountEqual( + self.archiver.archiver_storage.content_archive_get_copies(), + [(obj_id, ['uffizi'], {})], + ) + # Unit tests for archive worker def archival_elapsed(self, mtime): return self._create_worker()._is_archival_delay_elapsed(mtime) @istest def vstatus_ongoing_remaining(self): self.assertFalse(self.archival_elapsed(time.time())) @istest def vstatus_ongoing_elapsed(self): past_time = ( time.time() - self._create_worker().archival_max_age ) self.assertTrue(self.archival_elapsed(past_time)) def _status(self, status, mtime=None): """ Get a dict that match the copies structure """ return {'status': status, 'mtime': mtime or time.time()} @istest def need_archival_missing(self): """ A content should need archival when it is missing. """ status_copies = {'present': ['uffizi'], 'missing': ['banco']} worker = self._create_worker() self.assertEqual(worker.need_archival(status_copies), True) @istest def need_archival_present(self): """ A content present everywhere shouldn't need archival """ status_copies = {'present': ['uffizi', 'banco']} worker = self._create_worker() self.assertEqual(worker.need_archival(status_copies), False) def _compute_copies_status(self, status): """ A content with a given status should be detected correctly """ obj_id = self._add_content( 'banco', b'compute_copies_' + bytes(status, 'utf8')) self._update_status(obj_id, 'banco', status) worker = self._create_worker() self.assertIn('banco', worker.compute_copies( set(worker.objstorages), obj_id)[status]) @istest def compute_copies_present(self): """ A present content should be detected with correct status """ self._compute_copies_status('present') @istest def compute_copies_missing(self): """ A missing content should be detected with correct status """ self._compute_copies_status('missing') @istest def compute_copies_extra_archive(self): obj_id = self._add_content('banco', b'foobar') self._update_status(obj_id, 'banco', 'present') self._update_status(obj_id, 'random_archive', 'present') worker = self._create_worker() copies = worker.compute_copies(set(worker.objstorages), obj_id) self.assertEqual(copies['present'], {'banco'}) self.assertEqual(copies['missing'], {'uffizi'}) def _get_backups(self, present, missing): """ Return a list of the pair src/dest from the present and missing """ worker = self._create_worker() return list(worker.choose_backup_servers(present, missing)) @istest def choose_backup_servers(self): self.assertEqual(len(self._get_backups(['uffizi', 'banco'], [])), 0) self.assertEqual(len(self._get_backups(['uffizi'], ['banco'])), 1) # Even with more possible destinations, do not take more than the # retention_policy require self.assertEqual( len(self._get_backups(['uffizi'], ['banco', 's3'])), 1 ) class TestArchiverStorageStub(unittest.TestCase): def setUp(self): self.src_root = tempfile.mkdtemp(prefix='swh.storage.archiver.local') self.dest_root = tempfile.mkdtemp(prefix='swh.storage.archiver.remote') self.log_root = tempfile.mkdtemp(prefix='swh.storage.archiver.log') src_config = { 'cls': 'pathslicing', 'args': { 'root': self.src_root, 'slicing': '0:2/2:4/4:6' } } self.src_storage = get_objstorage(**src_config) # Create destination storage 
dest_config = { 'cls': 'pathslicing', 'args': { 'root': self.dest_root, 'slicing': '0:2/2:4/4:6' } } self.dest_storage = get_objstorage(**dest_config) self.config = { 'cls': 'stub', 'args': { 'archives': { 'present_archive': 'http://uffizi:5003', 'missing_archive': 'http://banco:5003', }, 'present': ['present_archive'], 'missing': ['missing_archive'], 'logfile_base': os.path.join(self.log_root, 'log_'), } } # Generated with: # # id_length = 20 # random.getrandbits(8 * id_length).to_bytes(id_length, 'big') # self.content_ids = [ b"\xc7\xc9\x8dlk!'k\x81+\xa9\xc1lg\xc2\xcbG\r`f", b'S\x03:\xc9\xd0\xa7\xf2\xcc\x8f\x86v$0\x8ccq\\\xe3\xec\x9d', b'\xca\x1a\x84\xcbi\xd6co\x14\x08\\8\x9e\xc8\xc2|\xd0XS\x83', b'O\xa9\xce(\xb4\x95_&\xd2\xa2e\x0c\x87\x8fw\xd0\xdfHL\xb2', b'\xaaa \xd1vB\x15\xbd\xf2\xf0 \xd7\xc4_\xf4\xb9\x8a;\xb4\xcc', ] self.archiver_storage = get_archiver_storage(**self.config) super().setUp() def tearDown(self): shutil.rmtree(self.src_root) shutil.rmtree(self.dest_root) shutil.rmtree(self.log_root) super().tearDown() @istest def archive_ls(self): self.assertCountEqual( self.archiver_storage.archive_ls(), self.config['args']['archives'].items() ) @istest def content_archive_get(self): for content_id in self.content_ids: self.assertEqual( self.archiver_storage.content_archive_get(content_id), (content_id, set(self.config['args']['present']), {}), ) @istest def content_archive_get_copies(self): self.assertCountEqual( self.archiver_storage.content_archive_get_copies(), [], ) @istest def content_archive_get_unarchived_copies(self): retention_policy = 2 self.assertCountEqual( self.archiver_storage.content_archive_get_unarchived_copies( retention_policy), [], ) @istest def content_archive_get_missing(self): self.assertCountEqual( self.archiver_storage.content_archive_get_missing( self.content_ids, 'missing_archive' ), self.content_ids, ) self.assertCountEqual( self.archiver_storage.content_archive_get_missing( self.content_ids, 'present_archive' ), [], ) with self.assertRaises(ValueError): list(self.archiver_storage.content_archive_get_missing( self.content_ids, 'unknown_archive' )) @istest def content_archive_get_unknown(self): self.assertCountEqual( self.archiver_storage.content_archive_get_unknown( self.content_ids, ), [], ) @istest def content_archive_update(self): for content_id in self.content_ids: self.archiver_storage.content_archive_update( content_id, 'present_archive', 'present') self.archiver_storage.content_archive_update( content_id, 'missing_archive', 'present') self.archiver_storage.close_logfile() # Make sure we created a logfile files = glob.glob('%s*' % self.config['args']['logfile_base']) self.assertEqual(len(files), 1) # make sure the logfile contains all our lines lines = open(files[0]).readlines() self.assertEqual(len(lines), 2 * len(self.content_ids)) diff --git a/version.txt b/version.txt index b5484ae0e..1e526dbce 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.85-0-g0965a85 \ No newline at end of file +v0.0.86-0-gcb855e0 \ No newline at end of file
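The new swh_mktemp_content() / swh_content_copies_from_temp() pair replaces the old jsonb-based swh_content_archive_add() flow, while the ArchiverDb helpers in this diff still point at the old swh_mktemp_content_archive / swh_content_archive_add procedures, so the new path is easiest to show against the database directly. A minimal sketch with plain psycopg2; the DSN, sha1 value and archive names are placeholders, and tmp_content is ON COMMIT DROP, so the whole sequence must run inside a single transaction:

    import psycopg2

    def register_present_copies(dsn, sha1s, archive_names):
        """Record every sha1 in `sha1s` as 'present' in each archive of
        `archive_names`, creating the content rows on the fly."""
        with psycopg2.connect(dsn) as db:
            with db.cursor() as cur:
                # 1. transaction-scoped temporary table (dropped on commit)
                cur.execute("select swh_mktemp_content()")
                # 2. stage the sha1s; psycopg2 adapts bytes to bytea
                cur.executemany("insert into tmp_content (sha1) values (%s)",
                                [(sha1,) for sha1 in sha1s])
                # 3. upsert one 'present' row per (content, archive) pair
                cur.execute("select swh_content_copies_from_temp(%s)",
                            (list(archive_names),))
        # leaving the connection context manager commits the transaction

    # hypothetical call:
    # register_present_copies('dbname=softwareheritage-archiver-dev',
    #                         [bytes.fromhex('34973274ccef6ab4dfaaf86599792fa9c3fe4689')],
    #                         ['uffizi', 'azure'])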
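The bucketed content_archive_counts table, its triggers and get_content_archive_counts() are dropped without a replacement in this diff. If per-archive totals are still wanted, a plain aggregate over content_copies provides them; the query below is an assumed ad-hoc equivalent against the new schema (a full scan, unlike the old incremental counters), not code from the changeset:

    import psycopg2

    PER_ARCHIVE_COUNTS = """
        select archive.name, count(*)
          from content_copies
          join archive on archive.id = content_copies.archive_id
         where content_copies.status = 'present'
         group by archive.name
         order by archive.name
    """

    def get_present_counts(dsn):
        """Rough stand-in for the removed get_content_archive_counts():
        number of contents recorded as 'present' in each archive."""
        with psycopg2.connect(dsn) as db, db.cursor() as cur:
            cur.execute(PER_ARCHIVE_COUNTS)
            return dict(cur.fetchall())  # e.g. {'banco': 12, 'uffizi': 37}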
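The tests drive the new tables through the archiver storage facade. A condensed round trip, assuming the db-backed ArchiverStorage forwards content_archive_update() and content_archive_get() to ArchiverDb the way the stub in the tests does, and that the 'banco' archive row and the content row already exist (content_archive_update's subselects return NULL otherwise, as the tests guard against):

    from swh.storage.archiver.storage import get_archiver_storage

    storage = get_archiver_storage(
        cls='db', args={'dbconn': 'dbname=softwareheritage-archiver-test'})

    obj_id = bytes.fromhex('34973274ccef6ab4dfaaf86599792fa9c3fe4689')

    # upsert the copy row: status 'present', mtime set to the current time
    storage.content_archive_update(obj_id, 'banco', 'present')

    # read it back: present copies as a list, ongoing copies as {name: mtime}
    content_id, present, ongoing = storage.content_archive_get(obj_id)
    assert present == ['banco'] and ongoing == {}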
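content_archive_get_unarchived_copies() now pages on the surrogate content.id (the sha1 passed as last_content is mapped through a subselect) and the director passes limit=batch_max_size explicitly. The paging loop it runs boils down to this sketch, written against an already-constructed archiver storage:

    def iter_under_replicated(archiver_storage, retention_policy=2,
                              batch_size=1000):
        """Yield the sha1 of every content with fewer than `retention_policy`
        copies marked 'present', one batch at a time, mirroring
        ArchiverWithRetentionPolicyDirector.get_contents_to_archive."""
        last_content = None
        while True:
            batch = list(archiver_storage.content_archive_get_unarchived_copies(
                last_content=last_content,
                retention_policy=retention_policy,
                limit=batch_size))
            if not batch:
                return
            for content_id, present, ongoing in batch:
                last_content = content_id  # sha1 of the last row seen
                yield content_id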
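swh_content_archive_missing() and swh_content_archive_unknown() are removed from swh-archiver-func.sql while ArchiverDb.content_archive_get_missing() and content_archive_get_unknown() still call them, so the stdin-to-backend director path appears to need a follow-up. Purely as an illustration of an equivalent lookup against the new tables (a simplification: any content without a 'present' row for the archive counts as missing), not code from this changeset:

    MISSING_IN_ARCHIVE = """
        select content.sha1
          from content
         where content.sha1 = any(%(sha1s)s)
           and not exists (
               select 1
                 from content_copies
                 join archive on archive.id = content_copies.archive_id
                where content_copies.content_id = content.id
                  and archive.name = %(archive)s
                  and content_copies.status = 'present')
    """

    def missing_in_archive(cur, sha1s, archive_name):
        """Of the given sha1s (bytes), return those with no 'present'
        copy recorded for `archive_name`."""
        cur.execute(MISSING_IN_ARCHIVE,
                    {'sha1s': list(sha1s), 'archive': archive_name})
        return [bytes(row[0]) for row in cur.fetchall()]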