diff --git a/PKG-INFO b/PKG-INFO index 444ecb7f3..fdfc22cea 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.storage -Version: 0.0.62 +Version: 0.0.63 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/sql/archiver/swh-archiver-func.sql b/sql/archiver/swh-archiver-func.sql index 551026278..511305637 100644 --- a/sql/archiver/swh-archiver-func.sql +++ b/sql/archiver/swh-archiver-func.sql @@ -1,31 +1,48 @@ create or replace function swh_mktemp_content_archive() returns void language sql as $$ create temporary table tmp_content_archive ( like content_archive including defaults ) on commit drop; alter table tmp_content_archive drop column copies; alter table tmp_content_archive drop column num_present; $$; COMMENT ON FUNCTION swh_mktemp_content_archive() IS 'Create temporary table content_archive'; create or replace function swh_content_archive_missing(backend_name text) returns setof sha1 language plpgsql as $$ begin return query select content_id from tmp_content_archive tmp where exists ( select 1 from content_archive c where tmp.content_id = c.content_id and (not c.copies ? backend_name or c.copies @> jsonb_build_object(backend_name, '{"status": "missing"}'::jsonb)) ); end $$; COMMENT ON FUNCTION swh_content_archive_missing(text) IS 'Filter missing data from a specific backend'; + +create or replace function swh_content_archive_unknown() + returns setof sha1 + language plpgsql +as $$ +begin + return query + select content_id + from tmp_content_archive tmp where not exists ( + select 1 + from content_archive c + where tmp.content_id = c.content_id + ); +end +$$; + +COMMENT ON FUNCTION swh_content_archive_unknown() IS 'Retrieve list of unknown sha1s'; diff --git a/sql/archiver/upgrades/005.sql b/sql/archiver/upgrades/005.sql new file mode 100644 index 000000000..bc50631c1 --- /dev/null +++ b/sql/archiver/upgrades/005.sql @@ -0,0 +1,24 @@ +-- SWH DB schema upgrade +-- from_version: 4 +-- to_version: 5 +-- description: List unknown sha1s from content_archive + +INSERT INTO dbversion(version, release, description) +VALUES(5, now(), 'Work In Progress'); + +create or replace function swh_content_archive_unknown() + returns setof sha1 + language plpgsql +as $$ +begin + return query + select content_id + from tmp_content_archive tmp where not exists ( + select 1 + from content_archive c + where tmp.content_id = c.content_id + ); +end +$$; + +COMMENT ON FUNCTION swh_content_archive_unknown() IS 'Retrieve list of unknown sha1'; diff --git a/swh.storage.egg-info/PKG-INFO b/swh.storage.egg-info/PKG-INFO index 444ecb7f3..fdfc22cea 100644 --- a/swh.storage.egg-info/PKG-INFO +++ b/swh.storage.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.storage -Version: 0.0.62 +Version: 0.0.63 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.storage.egg-info/SOURCES.txt b/swh.storage.egg-info/SOURCES.txt index 4d69a1e8a..76ba17fa6 100644 --- a/swh.storage.egg-info/SOURCES.txt +++ b/swh.storage.egg-info/SOURCES.txt @@ -1,166 +1,167 @@ .gitignore AUTHORS LICENSE MANIFEST.in Makefile Makefile.local README.db_testing README.dev requirements.txt setup.py version.txt bin/swh-storage-add-dir debian/changelog debian/compat debian/control debian/copyright debian/rules debian/source/format docs/archiver-blueprint.md docs/vault-blueprint.md sql/.gitignore sql/Makefile sql/TODO sql/clusters.dot sql/swh-data.sql sql/swh-func.sql sql/swh-init.sql sql/swh-schema.sql sql/archiver/Makefile sql/archiver/swh-archiver-data.sql sql/archiver/swh-archiver-func.sql sql/archiver/swh-archiver-schema.sql sql/archiver/upgrades/002.sql sql/archiver/upgrades/003.sql sql/archiver/upgrades/004.sql +sql/archiver/upgrades/005.sql sql/bin/db-upgrade sql/bin/dot_add_content sql/doc/json sql/doc/json/.gitignore sql/doc/json/Makefile sql/doc/json/entity.lister_metadata.schema.json sql/doc/json/entity.metadata.schema.json sql/doc/json/entity_history.lister_metadata.schema.json sql/doc/json/entity_history.metadata.schema.json sql/doc/json/fetch_history.result.schema.json sql/doc/json/list_history.result.schema.json sql/doc/json/listable_entity.list_params.schema.json sql/doc/json/origin_visit.metadata.json sql/doc/json/revision.metadata.schema.json sql/json/.gitignore sql/json/Makefile sql/json/entity.lister_metadata.schema.json sql/json/entity.metadata.schema.json sql/json/entity_history.lister_metadata.schema.json sql/json/entity_history.metadata.schema.json sql/json/fetch_history.result.schema.json sql/json/list_history.result.schema.json sql/json/listable_entity.list_params.schema.json sql/json/origin_visit.metadata.json sql/json/revision.metadata.schema.json sql/upgrades/015.sql sql/upgrades/016.sql sql/upgrades/017.sql sql/upgrades/018.sql sql/upgrades/019.sql sql/upgrades/020.sql sql/upgrades/021.sql sql/upgrades/022.sql sql/upgrades/023.sql sql/upgrades/024.sql sql/upgrades/025.sql sql/upgrades/026.sql sql/upgrades/027.sql sql/upgrades/028.sql sql/upgrades/029.sql sql/upgrades/030.sql sql/upgrades/032.sql sql/upgrades/033.sql sql/upgrades/034.sql sql/upgrades/035.sql sql/upgrades/036.sql sql/upgrades/037.sql sql/upgrades/038.sql sql/upgrades/039.sql sql/upgrades/040.sql sql/upgrades/041.sql sql/upgrades/042.sql sql/upgrades/043.sql sql/upgrades/044.sql sql/upgrades/045.sql sql/upgrades/046.sql sql/upgrades/047.sql sql/upgrades/048.sql sql/upgrades/049.sql sql/upgrades/050.sql sql/upgrades/051.sql sql/upgrades/052.sql sql/upgrades/053.sql sql/upgrades/054.sql sql/upgrades/055.sql sql/upgrades/056.sql sql/upgrades/057.sql sql/upgrades/058.sql sql/upgrades/059.sql sql/upgrades/060.sql sql/upgrades/061.sql sql/upgrades/062.sql sql/upgrades/063.sql sql/upgrades/064.sql sql/upgrades/065.sql sql/upgrades/066.sql sql/upgrades/067.sql sql/upgrades/068.sql sql/upgrades/069.sql sql/upgrades/070.sql sql/upgrades/071.sql sql/upgrades/072.sql sql/upgrades/073.sql sql/upgrades/074.sql sql/upgrades/075.sql sql/upgrades/076.sql sql/upgrades/077.sql sql/upgrades/078.sql sql/upgrades/079.sql sql/upgrades/080.sql sql/upgrades/081.sql sql/upgrades/082.sql sql/upgrades/083.sql sql/upgrades/084.sql swh.storage.egg-info/PKG-INFO swh.storage.egg-info/SOURCES.txt swh.storage.egg-info/dependency_links.txt swh.storage.egg-info/requires.txt swh.storage.egg-info/top_level.txt swh/storage/__init__.py swh/storage/common.py swh/storage/converters.py swh/storage/db.py swh/storage/exc.py swh/storage/listener.py swh/storage/storage.py swh/storage/api/__init__.py swh/storage/api/client.py swh/storage/api/server.py swh/storage/archiver/__init__.py swh/storage/archiver/copier.py swh/storage/archiver/db.py swh/storage/archiver/director.py swh/storage/archiver/storage.py swh/storage/archiver/tasks.py swh/storage/archiver/worker.py swh/storage/provenance/tasks.py swh/storage/tests/server_testing.py swh/storage/tests/test_api_client.py swh/storage/tests/test_archiver.py swh/storage/tests/test_converters.py swh/storage/tests/test_db.py swh/storage/tests/test_storage.py swh/storage/vault/cache.py swh/storage/vault/conf.yaml swh/storage/vault/cooker.py swh/storage/vault/api/client.py swh/storage/vault/api/cooking_tasks.py swh/storage/vault/api/server.py utils/dump_revisions.py utils/fix_revisions_from_dump.py \ No newline at end of file diff --git a/swh/storage/archiver/db.py b/swh/storage/archiver/db.py index a4611d96b..a18aabee4 100644 --- a/swh/storage/archiver/db.py +++ b/swh/storage/archiver/db.py @@ -1,249 +1,260 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import time from swh.core import hashutil from swh.storage.db import BaseDb, cursor_to_bytes, stored_procedure class ArchiverDb(BaseDb): """Proxy to the SWH's archiver DB """ def archive_ls(self, cur=None): """ Get all the archives registered on the server. Yields: a tuple (server_id, server_url) for each archive server. """ cur = self._cursor(cur) cur.execute("SELECT * FROM archive") yield from cursor_to_bytes(cur) def content_archive_get(self, content_id, cur=None): """ Get the archival status of a content in a specific server. Retrieve from the database the archival status of the given content in the given archive server. Args: content_id: the sha1 of the content. Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ query = """SELECT content_id, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'present' ORDER BY key ) AS present, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing, array( SELECT value->'mtime' FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing_mtime FROM content_archive WHERE content_id = %s ORDER BY content_id """ cur = self._cursor(cur) cur.execute(query, (content_id,)) row = cur.fetchone() if not row: return None content_id, present, ongoing, mtimes = row return (content_id, present, dict(zip(ongoing, mtimes))) def content_archive_get_copies(self, last_content=None, limit=1000, cur=None): """Get the list of copies for `limit` contents starting after `last_content`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ query = """SELECT content_id, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'present' ORDER BY key ) AS present, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing, array( SELECT value->'mtime' FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing_mtime FROM content_archive WHERE content_id > %s ORDER BY content_id LIMIT %s """ if last_content is None: last_content = b'' cur = self._cursor(cur) cur.execute(query, (last_content, limit)) for content_id, present, ongoing, mtimes in cursor_to_bytes(cur): yield (content_id, present, dict(zip(ongoing, mtimes))) def content_archive_get_unarchived_copies( self, retention_policy, last_content=None, limit=1000, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Yields only copies with number of present smaller than `retention policy`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. retention_policy: number of required present copies limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ query = """SELECT content_id, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'present' ORDER BY key ) AS present, array( SELECT key FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing, array( SELECT value->'mtime' FROM jsonb_each(copies) WHERE value->>'status' = 'ongoing' ORDER BY key ) AS ongoing_mtime FROM content_archive WHERE content_id > %s AND num_present < %s ORDER BY content_id LIMIT %s """ if last_content is None: last_content = b'' cur = self._cursor(cur) cur.execute(query, (last_content, retention_policy, limit)) for content_id, present, ongoing, mtimes in cursor_to_bytes(cur): yield (content_id, present, dict(zip(ongoing, mtimes))) @stored_procedure('swh_mktemp_content_archive') def mktemp_content_archive(self, cur=None): """Trigger the creation of the temporary table tmp_content_archive during the lifetime of the transaction. Use from archiver.storage module: self.db.mktemp_content_archive() # copy data over to the temp table self.db.copy_to([{'colname': id0}, {'colname': id1}], 'tmp_cache_content', ['colname'], cur) """ pass def content_archive_get_missing(self, backend_name, cur=None): """Retrieve the content missing from backend_name. """ cur = self._cursor(cur) cur.execute("select * from swh_content_archive_missing(%s)", (backend_name,)) yield from cursor_to_bytes(cur) + def content_archive_get_unknown(self, cur=None): + """Retrieve unknown sha1 from archiver db. + + """ + cur = self._cursor(cur) + cur.execute('select * from swh_content_archive_unknown()') + yield from cursor_to_bytes(cur) + def content_archive_insert(self, content_id, source, status, cur=None): """Insert a new entry in the db for the content_id. Args: content_id: content concerned source: name of the source status: the status of the content for that source """ if isinstance(content_id, bytes): content_id = '\\x%s' % hashutil.hash_to_hex(content_id) query = """INSERT INTO content_archive(content_id, copies, num_present) VALUES('%s', '{"%s": {"status": "%s", "mtime": %d}}', 1) """ % (content_id, source, status, int(time.time())) cur = self._cursor(cur) cur.execute(query) def content_archive_update(self, content_id, archive_id, new_status=None, cur=None): """ Update the status of an archive content and set its mtime to Change the mtime of an archived content for the given archive and set it's mtime to the current time. Args: content_id (str): content sha1 archive_id (str): name of the archive new_status (str): one of 'missing', 'present' or 'ongoing'. this status will replace the previous one. If not given, the function only change the mtime of the content for the given archive. """ + if isinstance(content_id, bytes): + content_id = '\\x%s' % hashutil.hash_to_hex(content_id) + if new_status is not None: query = """UPDATE content_archive SET copies=jsonb_set( copies, '{%s}', '{"status":"%s", "mtime":%d}' ) WHERE content_id='%s' """ % (archive_id, new_status, int(time.time()), content_id) else: query = """ UPDATE content_archive SET copies=jsonb_set(copies, '{%s,mtime}', '%d') WHERE content_id='%s' """ % (archive_id, int(time.time())) cur = self._cursor(cur) cur.execute(query) diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py index b6dfebe37..3661f9447 100644 --- a/swh/storage/archiver/director.py +++ b/swh/storage/archiver/director.py @@ -1,229 +1,301 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import click import sys from swh.core import config, utils, hashutil +from swh.objstorage import get_objstorage from swh.scheduler.celery_backend.config import app from . import tasks # noqa from .storage import ArchiverStorage class ArchiverDirectorBase(config.SWHConfig, metaclass=abc.ABCMeta): """Abstract Director class An archiver director is in charge of dispatching batch of contents to archiver workers (for them to archive). Inherit from this class and provide: - ADDITIONAL_CONFIG: Some added configuration needed for the director to work - CONFIG_BASE_FILENAME: relative path to lookup for the configuration file - def get_contents_to_archive(self): Implementation method to read contents to archive """ DEFAULT_CONFIG = { 'batch_max_size': ('int', 1500), 'asynchronous': ('bool', True), 'dbconn': ('str', 'dbname=softwareheritage-archiver-dev user=guest') } # Destined to be overridden by subclass ADDITIONAL_CONFIG = {} # We use the same configuration file as the worker CONFIG_BASE_FILENAME = 'archiver/worker' # The worker's task queue name to use TASK_NAME = None def __init__(self): """ Constructor of the archiver director. Args: db_conn_archiver: Either a libpq connection string, or a psycopg2 connection for the archiver db. config: optionnal additional configuration. Keys in the dict will override the one parsed from the configuration file. """ super().__init__() self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) self.archiver_storage = ArchiverStorage(self.config['dbconn']) def run(self): """ Run the archiver director. The archiver director will check all the contents of the archiver database and do the required backup jobs. """ if self.config['asynchronous']: run_fn = self.run_async_worker else: run_fn = self.run_sync_worker for batch in self.read_batch_contents(): run_fn(batch) def run_async_worker(self, batch): - """ Produce a worker that will be added to the task queue. + """Produce a worker that will be added to the task queue. + """ task = app.tasks[self.TASK_NAME] task.delay(batch=batch) def run_sync_worker(self, batch): - """ Run synchronously a worker on the given batch. + """Run synchronously a worker on the given batch. + """ task = app.tasks[self.TASK_NAME] task(batch=batch) def read_batch_contents(self): """ Create batch of contents that needs to be archived Yields: batch of sha1 that corresponds to contents that needs more archive copies. """ contents = [] for content in self.get_contents_to_archive(): contents.append(content) if len(contents) > self.config['batch_max_size']: yield contents contents = [] if len(contents) > 0: yield contents @abc.abstractmethod def get_contents_to_archive(self): """Retrieve generator of sha1 to archive Yields: sha1 to archive """ pass class ArchiverWithRetentionPolicyDirector(ArchiverDirectorBase): """Process the files in order to know which one is needed as backup. The archiver director processes the files in the local storage in order to know which one needs archival and it delegates this task to archiver workers. """ ADDITIONAL_CONFIG = { 'retention_policy': ('int', 2), } TASK_NAME = 'swh.storage.archiver.tasks.SWHArchiverWithRetentionPolicyTask' def get_contents_to_archive(self): """Create batch of contents that needs to be archived Yields: Datas about a content as a tuple (content_id, present_copies, ongoing_copies) where ongoing_copies is a dict mapping copy to mtime. """ last_content = None while True: archiver_contents = list( self.archiver_storage.content_archive_get_unarchived_copies( last_content=last_content, retention_policy=self.config['retention_policy'])) if not archiver_contents: return for content_id, _, _ in archiver_contents: last_content = content_id yield content_id def read_sha1_from_stdin(): """Read sha1 from stdin. """ for sha1 in sys.stdin: yield {'content_id': hashutil.hex_to_hash(sha1.rstrip())} class ArchiverStdinToBackendDirector(ArchiverDirectorBase): """A cloud archiver director in charge of reading contents and send them in batch in the cloud. - The archiver director processes the files in the local storage in - order to know which one needs archival and it delegates this task - to archiver workers. + The archiver director, in order: + - Reads sha1 to send to a specific backend. + - Checks if those sha1 are known in the archiver. If they are not, + add them + - if the sha1 are missing, they are sent for the worker to archive + + If the flag force_copy is set, this will force the copy to be sent + for archive even though it has already been done. """ ADDITIONAL_CONFIG = { 'destination': ('str', 'azure'), 'force_copy': ('bool', False), + 'source': ('str', 'uffizi'), + 'storages': ('list[dict]', + [ + {'host': 'uffizi', + 'cls': 'pathslicing', + 'args': {'root': '/tmp/softwareheritage/objects', + 'slicing': '0:2/2:4/4:6'}}, + {'host': 'banco', + 'cls': 'remote', + 'args': {'base_url': 'http://banco:5003/'}} + ]) } CONFIG_BASE_FILENAME = 'archiver/worker-to-backend' TASK_NAME = 'swh.storage.archiver.tasks.SWHArchiverToBackendTask' def __init__(self): super().__init__() self.destination = self.config['destination'] self.force_copy = self.config['force_copy'] + self.objstorages = { + storage['host']: get_objstorage(storage['cls'], storage['args']) + for storage in self.config.get('storages', []) + } + # Fallback objstorage + self.source = self.config['source'] + + def _add_unknown_content_ids(self, content_ids, source_objstorage): + """Check whether some content_id are unknown. + If they are, add them to the archiver db. + + Args: + content_ids: List of dict with one key content_id + + source_objstorage (ObjStorage): objstorage to check if + content_id is there + + """ + unknowns = self.archiver_storage.content_archive_get_unknown( + content_ids) + for unknown_id in unknowns: + print('unknown', unknown_id) + if unknown_id not in source_objstorage: + continue + self.archiver_storage.content_archive_insert( + unknown_id, self.source, 'present') def get_contents_to_archive(self): gen_content_ids = ( ids for ids in utils.grouper(read_sha1_from_stdin(), - self.config['batch_max_size']) - ) + self.config['batch_max_size'])) + source_objstorage = self.objstorages[self.source] if self.force_copy: for content_ids in gen_content_ids: content_ids = list(content_ids) if not content_ids: continue + # Add missing entries in archiver table + self._add_unknown_content_ids(content_ids, source_objstorage) + print('Send %s contents to archive' % len(content_ids)) for content in content_ids: - yield content['content_id'] + content_id = content['content_id'] + # force its status to missing + self.archiver_storage.content_archive_update( + content_id, self.destination, 'missing') + yield content_id else: for content_ids in gen_content_ids: + content_ids = list(content_ids) + + # Add missing entries in archiver table + self._add_unknown_content_ids(content_ids, source_objstorage) + + # Filter already copied data content_ids = list( self.archiver_storage.content_archive_get_missing( content_ids=content_ids, backend_name=self.destination)) if not content_ids: continue print('Send %s contents to archive' % len(content_ids)) for content in content_ids: yield content + def run_async_worker(self, batch): + """Produce a worker that will be added to the task queue. + + """ + task = app.tasks[self.TASK_NAME] + task.delay(destination=self.destination, batch=batch) + + def run_sync_worker(self, batch): + """Run synchronously a worker on the given batch. + + """ + task = app.tasks[self.TASK_NAME] + task(destination=self.destination, batch=batch) + @click.command() @click.option('--direct', is_flag=True, help="""The archiver sends content for backup to one storage.""") def launch(direct): if direct: archiver = ArchiverStdinToBackendDirector() else: archiver = ArchiverWithRetentionPolicyDirector() archiver.run() if __name__ == '__main__': launch() diff --git a/swh/storage/archiver/storage.py b/swh/storage/archiver/storage.py index 1336c17af..b207a7047 100644 --- a/swh/storage/archiver/storage.py +++ b/swh/storage/archiver/storage.py @@ -1,148 +1,168 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import psycopg2 from .db import ArchiverDb from swh.storage.common import db_transaction_generator, db_transaction from swh.storage.exc import StorageDBError class ArchiverStorage(): """SWH Archiver storage proxy, encompassing DB """ def __init__(self, db_conn): """ Args: db_conn: either a libpq connection string, or a psycopg2 connection """ try: if isinstance(db_conn, psycopg2.extensions.connection): self.db = ArchiverDb(db_conn) else: self.db = ArchiverDb.connect(db_conn) except psycopg2.OperationalError as e: raise StorageDBError(e) @db_transaction_generator def archive_ls(self, cur=None): """ Get all the archives registered on the server. Yields: a tuple (server_id, server_url) for each archive server. """ yield from self.db.archive_ls(cur) @db_transaction def content_archive_get(self, content_id, cur=None): """ Get the archival status of a content. Retrieve from the database the archival status of the given content Args: content_id: the sha1 of the content Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ return self.db.content_archive_get(content_id, cur) @db_transaction_generator def content_archive_get_copies(self, last_content=None, limit=1000, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ yield from self.db.content_archive_get_copies(last_content, limit, cur) @db_transaction_generator def content_archive_get_unarchived_copies( self, retention_policy, last_content=None, limit=1000, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Yields only copies with number of present smaller than `retention policy`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. retention_policy: number of required present copies limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ yield from self.db.content_archive_get_unarchived_copies( retention_policy, last_content, limit, cur) @db_transaction_generator def content_archive_get_missing(self, content_ids, backend_name, cur=None): - """Retrieve the list of missing copies from source_name. + """Retrieve missing sha1s from source_name. Args: content_ids ([sha1s]): list of sha1s to test source_name (str): Name of the backend to check for content Yields: - List of ids effectively missing from backend_name + missing sha1s from backend_name """ db = self.db db.mktemp_content_archive() db.copy_to(content_ids, 'tmp_content_archive', ['content_id'], cur) for content_id in db.content_archive_get_missing(backend_name, cur): yield content_id[0] + @db_transaction_generator + def content_archive_get_unknown(self, content_ids, cur=None): + """Retrieve unknown sha1s from content_archive. + + Args: + content_ids ([sha1s]): list of sha1s to test + + Yields: + Unknown sha1s from content_archive + + """ + db = self.db + + db.mktemp_content_archive() + + db.copy_to(content_ids, 'tmp_content_archive', ['content_id'], cur) + + for content_id in db.content_archive_get_unknown(cur): + yield content_id[0] + @db_transaction def content_archive_update(self, content_id, archive_id, new_status=None, cur=None): """ Update the status of an archive content and set its mtime to now Change the mtime of an archived content for the given archive and set it's mtime to the current time. Args: content_id (str): content sha1 archive_id (str): name of the archive new_status (str): one of 'missing', 'present' or 'ongoing'. this status will replace the previous one. If not given, the function only change the mtime of the content for the given archive. """ self.db.content_archive_update(content_id, archive_id, new_status, cur) @db_transaction def content_archive_insert(self, content_id, source, status, cur=None): """Insert a new entry in db about content_id. Args: content_id: content concerned source: name of the source status: the status of the content for that source """ self.db.content_archive_insert(content_id, source, status, cur) diff --git a/swh/storage/archiver/worker.py b/swh/storage/archiver/worker.py index 4f797e530..4cecc2c71 100644 --- a/swh/storage/archiver/worker.py +++ b/swh/storage/archiver/worker.py @@ -1,413 +1,378 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import logging import random import time from collections import defaultdict from swh.objstorage import get_objstorage from swh.core import hashutil, config from swh.objstorage.exc import Error, ObjNotFoundError from .storage import ArchiverStorage from .copier import ArchiverCopier logger = logging.getLogger('archiver.worker') class BaseArchiveWorker(config.SWHConfig, metaclass=abc.ABCMeta): """Base archive worker. Inherit from this class and override: - ADDITIONAL_CONFIG: Some added configuration needed for the director to work - CONFIG_BASE_FILENAME: relative path to lookup for the configuration file - def need_archival(self, content_data): Determine if a content needs archival or not - def choose_backup_servers(self, present, missing): Choose which backup server to send copies to """ DEFAULT_CONFIG = { 'dbconn': ('str', 'dbname=softwareheritage-archiver-dev'), - 'source': ('str', 'uffizi'), 'storages': ('list[dict]', [ {'host': 'uffizi', 'cls': 'pathslicing', 'args': {'root': '/tmp/softwareheritage/objects', 'slicing': '0:2/2:4/4:6'}}, {'host': 'banco', 'cls': 'remote', 'args': {'base_url': 'http://banco:5003/'}} ]) } ADDITIONAL_CONFIG = {} CONFIG_BASE_FILENAME = 'archiver/worker' objstorages = {} def __init__(self, batch): super().__init__() self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) self.batch = batch self.archiver_db = ArchiverStorage(self.config['dbconn']) self.objstorages = { storage['host']: get_objstorage(storage['cls'], storage['args']) for storage in self.config.get('storages', []) } self.set_objstorages = set(self.objstorages) - # Fallback objstorage - self.source = self.config['source'] def run(self): """Do the task expected from the archiver worker. Process the contents in self.batch, ensure that the elements still need an archival (using archiver db), and spawn copiers to copy files in each destination according to the archiver-worker's policy. """ transfers = defaultdict(list) for obj_id in self.batch: # Get dict {'missing': [servers], 'present': [servers]} # for contents ignoring those who don't need archival. copies = self.compute_copies(self.set_objstorages, obj_id) - if not copies: - # could happen if archiver db lags behind - copies = self.compute_fallback_copies( - self.source, self.set_objstorages, obj_id) - if not copies: - msg = 'Unknown content %s' % hashutil.hash_to_hex(obj_id) - logger.warning(msg) - continue + if not copies: # could not happen if using .director module + msg = 'Unknown content %s' % hashutil.hash_to_hex(obj_id) + logger.warning(msg) + continue if not self.need_archival(copies): continue present = copies.get('present', []) missing = copies.get('missing', []) if len(present) == 0: msg = 'Lost content %s' % hashutil.hash_to_hex(obj_id) logger.critical(msg) continue # Choose servers to be used as srcs and dests. for src_dest in self.choose_backup_servers(present, missing): transfers[src_dest].append(obj_id) # Then run copiers for each of the required transfers. for (src, dest), content_ids in transfers.items(): self.run_copier(src, dest, content_ids) - def compute_fallback_copies(self, source, set_objstorages, content_id): - """Compute fallback copies for content_id. - - Args: - source: the objstorage where the content_id is supposedly present - set_objstorages: the complete set of objstorages - content_id: the content concerned - - Returns: - A dictionary with keys 'present' and 'missing' that are - mapped to lists of copies ids depending on whenever the - content is present or missing on the copy. - - There is also the key 'ongoing' which is associated with a - dict that map to a copy name the mtime of the ongoing - status update. - - """ - if content_id not in self.objstorages[source]: - return None - - # insert a new entry about of the content_id's presence for that source - self.archiver_db.content_archive_insert( - content_id=content_id, source=self.source, status='present') - - # Now compute the fallback copies - set_present = {self.source} - set_missing = set_objstorages - set_present - return { - 'present': set_present, - 'missing': set_missing, - 'ongoing': {} - } - def compute_copies(self, set_objstorages, content_id): """From a content_id, return present and missing copies. Args: objstorages (set): objstorage's id name content_id: the content concerned Returns: A dictionary with keys 'present' and 'missing' that are mapped to lists of copies ids depending on whenever the content is present or missing on the copy. There is also the key 'ongoing' which is associated with a dict that map to a copy name the mtime of the ongoing status update. """ result = self.archiver_db.content_archive_get(content_id) if not result: return None _, present, ongoing = result set_present = set(present) set_ongoing = set(ongoing) set_missing = set_objstorages - set_present - set_ongoing return { 'present': set_present, 'missing': set_missing, 'ongoing': ongoing } def run_copier(self, source, destination, content_ids): """Run a copier in order to archive the given contents. Upload the given contents from the source to the destination. If the process fails, the whole content is considered uncopied and remains 'ongoing', waiting to be rescheduled as there is a delay. Args: source (str): source storage's identifier destination (str): destination storage's identifier content_ids ([sha1]): list of content ids to archive. """ # Check if there are any errors among the contents. content_status = self.get_contents_error(content_ids, source) # Iterates over the error detected. for content_id, real_status in content_status.items(): # Remove them from the to-archive list, # as they cannot be retrieved correctly. content_ids.remove(content_id) # Update their status to reflect their real state. - self.content_archive_update( + self.archiver_db.content_archive_update( content_id, archive_id=source, new_status=real_status) # Now perform the copy on the remaining contents ac = ArchiverCopier( source=self.objstorages[source], destination=self.objstorages[destination], content_ids=content_ids) if ac.run(): # Once the archival complete, update the database. for content_id in content_ids: - self.content_archive_update( + self.archiver_db.content_archive_update( content_id, archive_id=destination, new_status='present') def get_contents_error(self, content_ids, source_storage): """Indicates what is the error associated to a content when needed Check the given content on the given storage. If an error is detected, it will be reported through the returned dict. Args: content_ids ([sha1]): list of content ids to check source_storage (str): the source storage holding the contents to check. Returns: a dict that map {content_id -> error_status} for each content_id with an error. The `error_status` result may be 'missing' or 'corrupted'. """ content_status = {} storage = self.objstorages[source_storage] for content_id in content_ids: try: storage.check(content_id) except Error: content_status[content_id] = 'corrupted' logger.error('%s corrupted!' % hashutil.hash_to_hex( content_id)) except ObjNotFoundError: content_status[content_id] = 'missing' logger.error('%s missing!' % hashutil.hash_to_hex(content_id)) return content_status - def content_archive_update(self, content_id, archive_id, new_status=None): - """Update the status of a archive content and set its mtime to now. - - Change the last modification time of an archived content and change - its status to the given one. - - Args: - content_id (str): The content id. - archive_id (str): The id of the concerned archive. - new_status (str): One of missing, ongoing or present, this - status will replace the previous one. If not given, the - function only changes the mtime of the content. - """ - db_obj_id = r'\x' + hashutil.hash_to_hex(content_id) - self.archiver_db.content_archive_update( - db_obj_id, - archive_id, - new_status - ) - @abc.abstractmethod def need_archival(self, content_data): """Indicate if the content needs to be archived. Args: content_data (dict): dict that contains two lists 'present' and 'missing' with copies id corresponding to this status. Returns: True if there is not enough copies, False otherwise. """ pass @abc.abstractmethod def choose_backup_servers(self, present, missing): """Choose and yield the required amount of couple source/destination For each required copy, choose a unique destination server among the missing copies and a source server among the presents. + Args: + present: set of objstorage source name where the content + is present + missing: set of objstorage destination name where the + content is missing + Yields: tuple (source (str), destination (src)) for each required copy. """ pass class ArchiverWithRetentionPolicyWorker(BaseArchiveWorker): """ Do the required backups on a given batch of contents. Process the content of a content batch in order to do the needed backups on the slaves servers. """ ADDITIONAL_CONFIG = { 'retention_policy': ('int', 2), 'archival_max_age': ('int', 3600), } def __init__(self, batch): """ Constructor of the ArchiverWorker class. Args: batch: list of object's sha1 that potentially need archival. """ super().__init__(batch) config = self.config self.retention_policy = config['retention_policy'] self.archival_max_age = config['archival_max_age'] if len(self.objstorages) < self.retention_policy: raise ValueError('Retention policy is too high for the number of ' 'provided servers') def need_archival(self, content_data): """ Indicate if the content need to be archived. Args: content_data (dict): dict that contains two lists 'present' and 'missing' with copies id corresponding to this status. Returns: True if there is not enough copies, False otherwise. """ nb_presents = len(content_data.get('present', [])) for copy, mtime in content_data.get('ongoing', {}).items(): if not self._is_archival_delay_elasped(mtime): nb_presents += 1 return nb_presents < self.retention_policy def _is_archival_delay_elapsed(self, start_time): """ Indicates if the archival delay is elapsed given the start_time Args: start_time (float): time at which the archival started. Returns: True if the archival delay is elasped, False otherwise """ elapsed = time.time() - start_time return elapsed > self.archival_max_age def choose_backup_servers(self, present, missing): """Choose and yield the required amount of couple source/destination For each required copy, choose a unique destination server among the missing copies and a source server among the presents. Each destination server is unique so after archival, the retention policy requirement will be fulfilled. However, the source server may be used multiple times. + Args: + present: set of objstorage source name where the content + is present + missing: set of objstorage destination name where the + content is missing + Yields: tuple (source, destination) for each required copy. """ # Transform from set to list to allow random selections missing = list(missing) present = list(present) nb_required = self.retention_policy - len(present) destinations = random.sample(missing, nb_required) sources = [random.choice(present) for dest in destinations] yield from zip(sources, destinations) class ArchiverToBackendWorker(BaseArchiveWorker): - """Worker that send copies over from a source to another backend. + """Worker that sends copies over from a source to another backend. Process the content of a content batch from source objstorage to destination objstorage. """ CONFIG_BASE_FILENAME = 'archiver/worker-to-backend' - def __init__(self, batch): + def __init__(self, destination, batch): """Constructor of the ArchiverWorkerToBackend class. Args: - batch: list of object's sha1 that potentially need archival. + destination: where to copy the objects from + batch: sha1s to send to destination """ super().__init__(batch) - self.destination = self.config['destination'] + self.destination = destination def need_archival(self, content_data): """Indicate if the content needs to be archived. Args: content_data (dict): dict that contains 3 lists 'present', 'ongoing' and 'missing' with copies id corresponding to this status. Returns: True if we need to archive, False otherwise """ - if self.destination in content_data.get('missing', {}): - return True - return False + return self.destination in content_data.get('missing', {}) def choose_backup_servers(self, present, missing): - yield (random.choice(present), self.destination) + """The destination is fixed to the destination mentioned. + + The only variable here is the source of information that we + choose randomly in 'present'. + + Args: + present: set of objstorage source name where the content + is present + missing: set of objstorage destination name where the + content is missing + + Yields: + tuple (source, destination) for each required copy. + + """ + yield (random.choice(list(present)), self.destination) diff --git a/swh/storage/provenance/tasks.py b/swh/storage/provenance/tasks.py index 66cc4844f..007b04a53 100644 --- a/swh/storage/provenance/tasks.py +++ b/swh/storage/provenance/tasks.py @@ -1,115 +1,115 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from celery import group from swh.core import hashutil from swh.core.config import load_named_config from swh.scheduler.task import Task from swh.storage import get_storage BASE_CONFIG_PATH = 'storage/provenance_cache' DEFAULT_CONFIG = { 'storage_class': ('str', 'remote_storage'), 'storage_args': ('list[str]', [ 'http://localhost:5000/', ]), 'revision_packet_size': ('int', 100), } class PopulateCacheContentRevision(Task): """Populate the content -> revision provenance cache for some revisions""" task_queue = 'swh_populate_cache_content_revision' @property def config(self): if not hasattr(self, '__config'): self.__config = load_named_config(BASE_CONFIG_PATH, DEFAULT_CONFIG) return self.__config def run(self, revisions): """Cache the cache_content_revision table for the revisions provided. Args: revisions: List of revisions to cache populate. """ config = self.config storage = get_storage( config['storage_class'], config['storage_args'], ) storage.cache_content_revision_add( - hashutil.hash_to_hex(revision) for revision in revisions + hashutil.hex_to_hash(revision) for revision in revisions ) class PopulateCacheRevisionOrigin(Task): """Populate the revision -> origin provenance cache for one origin's visit""" task_queue = 'swh_populate_cache_revision_origin' @property def config(self): if not hasattr(self, '__config'): self.__config = load_named_config(BASE_CONFIG_PATH, DEFAULT_CONFIG) return self.__config def run(self, origin_id, visit_id): """Cache the cache_revision_origin for the given origin visit Args: origin_id: the origin id to cache visit_id: the visit id to cache This task also creates the revision cache tasks, as well as the task to cache the next origin visit available """ config = self.config storage = get_storage( config['storage_class'], config['storage_args'], ) packet_size = config['revision_packet_size'] pipelined_tasks = [] visits = sorted( visit['visit'] for visit in storage.origin_visit_get(origin_id) ) if visit_id in visits: revision_task = PopulateCacheContentRevision() new_revisions = [ hashutil.hash_to_hex(revision) for revision in storage.cache_revision_origin_add( origin_id, visit_id) ] if new_revisions: split_new_revisions = [ new_revisions[i:i + packet_size] for i in range(0, packet_size, len(new_revisions)) ] for packet in split_new_revisions: pipelined_tasks.append(revision_task.s(packet)) try: next_visit = min(visit for visit in visits if visit > visit_id) except ValueError: # no next visit, stop pipelining further visits pass else: pipelined_tasks.append(self.s(origin_id, next_visit)) if pipelined_tasks: group(pipelined_tasks).delay() diff --git a/version.txt b/version.txt index 08044c7b3..02bdb8ef9 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.62-0-gff87ac5 \ No newline at end of file +v0.0.63-0-g4d6d3bd \ No newline at end of file