diff --git a/debian/control b/debian/control
--- a/debian/control
+++ b/debian/control
@@ -15,7 +15,7 @@
                python3-swh.core (>= 0.0.28~),
                python3-swh.model (>= 0.0.13~),
                python3-swh.objstorage (>= 0.0.17~),
-               python3-swh.scheduler,
+               python3-swh.scheduler (>= 0.0.11~),
                python3-vcversioner
 Standards-Version: 3.9.6
 Homepage: https://forge.softwareheritage.org/diffusion/DSTO/
@@ -39,7 +39,8 @@
 
 Package: python3-swh.storage.archiver
 Architecture: all
-Depends: python3-swh.scheduler,
+Depends: python3-swh.scheduler (>= 0.0.11~),
+         python3-swh.journal,
          python3-swh.storage (= ${binary:Version}),
          ${misc:Depends},
          ${python3:Depends}
@@ -47,7 +48,7 @@
 
 Package: python3-swh.storage.provenance
 Architecture: all
-Depends: python3-swh.scheduler,
+Depends: python3-swh.scheduler (>= 0.0.11~),
          python3-swh.storage (= ${binary:Version}),
          ${misc:Depends},
          ${python3:Depends}
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,4 +1,4 @@
 swh.core >= 0.0.28
 swh.model >= 0.0.13
 swh.objstorage >= 0.0.17
-swh.scheduler
+swh.scheduler >= 0.0.11
diff --git a/sql/archiver/swh-archiver-func.sql b/sql/archiver/swh-archiver-func.sql
--- a/sql/archiver/swh-archiver-func.sql
+++ b/sql/archiver/swh-archiver-func.sql
@@ -90,3 +90,38 @@
 $$;
 
 comment on function get_content_archive_counts() is 'Get count for each archive';
+
+-- create a temporary table called tmp_TBLNAME, mimicking existing table
+-- TBLNAME
+create or replace function swh_mktemp(tblname regclass)
+    returns void
+    language plpgsql
+as $$
+begin
+    execute format('
+        create temporary table tmp_%1$I
+            (like %1$I including defaults)
+            on commit drop;
+        ', tblname);
+    return;
+end
+$$;
+
+comment on function swh_mktemp(regclass) is 'Helper function to create a temporary table mimicking the existing one';
+
+-- Helper function to insert new entries in content_archive from a
+-- temporary table skipping duplicates.
+create or replace function swh_content_archive_add()
+    returns void
+    language plpgsql
+as $$
+begin
+    insert into content_archive (content_id, copies, num_present)
+        select distinct content_id, copies, num_present
+        from tmp_content_archive
+        on conflict(content_id) do nothing;
+    return;
+end
+$$;
+
+comment on function swh_content_archive_add() is 'Helper function to insert new entry in content_archive';
diff --git a/sql/archiver/swh-archiver-schema.sql b/sql/archiver/swh-archiver-schema.sql
--- a/sql/archiver/swh-archiver-schema.sql
+++ b/sql/archiver/swh-archiver-schema.sql
@@ -11,7 +11,7 @@
 comment on table dbversion is 'Schema update tracking';
 
 INSERT INTO dbversion(version, release, description)
-VALUES(8, now(), 'Work In Progress');
+VALUES(9, now(), 'Work In Progress');
 
 CREATE TABLE archive (
   id text PRIMARY KEY
diff --git a/sql/archiver/upgrades/009.sql b/sql/archiver/upgrades/009.sql
new file mode 100644
--- /dev/null
+++ b/sql/archiver/upgrades/009.sql
@@ -0,0 +1,42 @@
+-- SWH Archiver DB schema upgrade
+-- from_version: 8
+-- to_version: 9
+-- description: Add helper functions to create temporary table and insert new entries in content_archive table
+
+insert into dbversion(version, release, description)
+values(9, now(), 'Work In Progress');
+
+-- create a temporary table called tmp_TBLNAME, mimicking existing
+-- table TBLNAME
+create or replace function swh_mktemp(tblname regclass)
+    returns void
+    language plpgsql
+as $$
+begin
+    execute format('
+        create temporary table tmp_%1$I
+            (like %1$I including defaults)
+            on commit drop;
+        ', tblname);
+    return;
+end
+$$;
+
+comment on function swh_mktemp(regclass) is 'Helper function to create a temporary table mimicking the existing one';
+
+-- Helper function to insert new entries in content_archive from a
+-- temporary table skipping duplicates.
+create or replace function swh_content_archive_add()
+    returns void
+    language plpgsql
+as $$
+begin
+    insert into content_archive (content_id, copies, num_present)
+        select distinct content_id, copies, num_present
+        from tmp_content_archive
+        on conflict(content_id) do nothing;
+    return;
+end
+$$;
+
+comment on function swh_content_archive_add() is 'Helper function to insert new entry in content_archive';
diff --git a/swh/storage/archiver/db.py b/swh/storage/archiver/db.py
--- a/swh/storage/archiver/db.py
+++ b/swh/storage/archiver/db.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2016 The Software Heritage developers
+# Copyright (C) 2015-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -177,12 +177,21 @@
         """Trigger the creation of the temporary table tmp_content_archive
         during the lifetime of the transaction.
 
+        """
+        pass
+
+    @stored_procedure('swh_content_archive_add')
+    def content_archive_add_from_temp(self, cur=None):
+        """Add new content archive entries from temporary table.
+
         Use from archiver.storage module:
 
             self.db.mktemp_content_archive()
             # copy data over to the temp table
            self.db.copy_to([{'colname': id0}, {'colname': id1}],
                            'tmp_cache_content', ['colname'], cur)
+            # insert into the main table
+            self.db.content_archive_add_from_temp(cur)
 
         """
         pass
@@ -204,24 +213,6 @@
             cur.execute('select * from swh_content_archive_unknown()')
             yield from cursor_to_bytes(cur)
 
-    def content_archive_insert(self, content_id, source, status, cur=None):
-        """Insert a new entry in the db for the content_id.
-
-        Args:
-            content_id: content concerned
-            source: name of the source
-            status: the status of the content for that source
-
-        """
-        if isinstance(content_id, bytes):
-            content_id = '\\x%s' % hashutil.hash_to_hex(content_id)
-
-        query = """INSERT INTO content_archive(content_id, copies, num_present)
-                   VALUES('%s', '{"%s": {"status": "%s", "mtime": %d}}', 1)
-                """ % (content_id, source, status, int(time.time()))
-        cur = self._cursor(cur)
-        cur.execute(query)
-
     def content_archive_update(self, content_id, archive_id, new_status=None,
                                cur=None):
         """ Update the status of an archive content and set its mtime to
diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py
--- a/swh/storage/archiver/director.py
+++ b/swh/storage/archiver/director.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2016 The Software Heritage developers
+# Copyright (C) 2015-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -9,7 +9,7 @@
 
 from swh.core import config, utils, hashutil
 from swh.objstorage import get_objstorage
-from swh.scheduler.celery_backend.config import app
+from swh.scheduler.utils import get_task
 
 from . import tasks  # noqa
 from .storage import ArchiverStorage
@@ -59,6 +59,7 @@
         self.config = self.parse_config_file(
             additional_configs=[self.ADDITIONAL_CONFIG])
         self.archiver_storage = ArchiverStorage(self.config['dbconn'])
+        self.task = get_task(self.TASK_NAME)
 
     def run(self):
         """ Run the archiver director.
@@ -78,15 +79,13 @@
         """Produce a worker that will be added to the task queue.
 
         """
-        task = app.tasks[self.TASK_NAME]
-        task.delay(batch=batch)
+        self.task.delay(batch=batch)
 
     def run_sync_worker(self, batch):
         """Run synchronously a worker on the given batch.
 
         """
-        task = app.tasks[self.TASK_NAME]
-        task(batch=batch)
+        self.task(batch=batch)
 
     def read_batch_contents(self):
         """ Create batch of contents that needs to be archived
@@ -203,6 +202,8 @@
         }
         # Fallback objstorage
         self.source = self.config['source']
+        # Where the content is missing
+        self.sources_missing = set(self.objstorages.keys()) - set([self.source])  # noqa
 
     def _add_unknown_content_ids(self, content_ids, source_objstorage):
         """Check whether some content_id are unknown.
@@ -215,13 +216,12 @@
             content_id is there
 
         """
-        unknowns = self.archiver_storage.content_archive_get_unknown(
-            content_ids)
-        for unknown_id in unknowns:
-            if unknown_id not in source_objstorage:
-                continue
-            self.archiver_storage.content_archive_insert(
-                unknown_id, self.source, 'present')
+        self.archiver_storage.content_archive_add(
+            (h['content_id']
+             for h in content_ids
+             if h['content_id'] in source_objstorage),
+            sources_present=[self.source],
+            sources_missing=self.sources_missing)
 
     def get_contents_to_archive(self):
         gen_content_ids = (
@@ -273,15 +273,13 @@
         """Produce a worker that will be added to the task queue.
 
         """
-        task = app.tasks[self.TASK_NAME]
-        task.delay(destination=self.destination, batch=batch)
+        self.task.delay(destination=self.destination, batch=batch)
 
     def run_sync_worker(self, batch):
         """Run synchronously a worker on the given batch.
 
""" - task = app.tasks[self.TASK_NAME] - task(destination=self.destination, batch=batch) + self.task(destination=self.destination, batch=batch) @click.command() diff --git a/swh/storage/archiver/storage.py b/swh/storage/archiver/storage.py --- a/swh/storage/archiver/storage.py +++ b/swh/storage/archiver/storage.py @@ -1,9 +1,11 @@ -# Copyright (C) 2016 The Software Heritage developers +# Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import json import psycopg2 +import time from .db import ArchiverDb @@ -156,13 +158,43 @@ self.db.content_archive_update(content_id, archive_id, new_status, cur) @db_transaction - def content_archive_insert(self, content_id, source, status, cur=None): + def content_archive_add( + self, content_ids, sources_present, sources_missing, cur=None): """Insert a new entry in db about content_id. Args: - content_id: content concerned - source: name of the source - status: the status of the content for that source + content_ids ([bytes|str]): content identifiers + sources_present ([str]): List of source names where + contents are present + sources_missing ([str]): List of sources names where + contents are missing """ - self.db.content_archive_insert(content_id, source, status, cur) + db = self.db + + # Prepare copies dictionary + copies = {} + for source in sources_present: + copies[source] = { + "status": "present", + "mtime": int(time.time()), + } + + for source in sources_missing: + copies[source] = { + "status": "missing", + } + + copies = json.dumps(copies) + num_present = len(sources_present) + + db.mktemp('content_archive') + db.copy_to( + ({'content_id': id, + 'copies': copies, + 'num_present': num_present} + for id in content_ids), + 'tmp_content_archive', + ['content_id', 'copies', 'num_present'], + cur) + db.content_archive_add_from_temp(cur) diff --git a/swh/storage/archiver/updater.py b/swh/storage/archiver/updater.py new file mode 100644 --- /dev/null +++ b/swh/storage/archiver/updater.py @@ -0,0 +1,49 @@ +# Copyright (C) 2017 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging + +from swh.journal.client import SWHJournalClient + +from .storage import ArchiverStorage + + +class SWHArchiverContentUpdater(SWHJournalClient): + """Client in charge to update new content in the content_archiver db. 
+
+    """
+    CONFIG_BASE_FILENAME = 'archiver/content_updater'
+
+    ADDITIONAL_CONFIG = {
+        'archiver_storage_conn': (
+            'str', 'dbname=softwareheritage-archiver-dev user=guest'),
+        'sources_missing': ('list[str]', ['banco', 'azure']),
+        'sources_present': ('list[str]', ['uffizi'])
+    }
+
+    def __init__(self):
+        # Only interested in content here so override the configuration
+        super().__init__(extra_configuration={'object_types': ['content']})
+
+        self.sources_present = self.config['sources_present']
+        self.sources_missing = self.config['sources_missing']
+
+        self.archiver_storage = ArchiverStorage(
+            self.config['archiver_storage_conn'])
+
+    def process_objects(self, messages):
+        self.archiver_storage.content_archive_add(
+            (c[b'sha1'] for c in messages['content']),
+            self.sources_present, self.sources_missing)
+
+
+if __name__ == '__main__':
+    logging.basicConfig(
+        level=logging.INFO,
+        format='%(asctime)s %(process)d %(levelname)s %(message)s'
+    )
+
+    content_updater = SWHArchiverContentUpdater()
+    content_updater.process()
diff --git a/swh/storage/archiver/worker.py b/swh/storage/archiver/worker.py
--- a/swh/storage/archiver/worker.py
+++ b/swh/storage/archiver/worker.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2016 The Software Heritage developers
+# Copyright (C) 2015-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -14,7 +14,7 @@
 from swh.core import hashutil, config, utils
 from swh.objstorage import get_objstorage
 from swh.objstorage.exc import Error, ObjNotFoundError
-from swh.scheduler.celery_backend.config import app
+from swh.scheduler.utils import get_task
 
 from .storage import ArchiverStorage
 from .copier import ArchiverCopier
@@ -368,7 +368,7 @@
         self.destination = destination
         next_task = self.config['next_task']
         destination_queue = next_task['queue']
-        self.task_destination = app.tasks[destination_queue]
+        self.task_destination = get_task(destination_queue)
         self.batch_size = int(next_task['batch_size'])
 
     def need_archival(self, content_data):
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -182,14 +182,14 @@
             self._cursor(cur).copy_expert('COPY %s (%s) FROM STDIN CSV' % (
                 tblname, ', '.join(columns)), f)
 
+    def mktemp(self, tblname, cur=None):
+        self._cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,))
+
 
 class Db(BaseDb):
     """Proxy to the SWH DB, with wrappers around stored procedures
 
     """
-    def mktemp(self, tblname, cur=None):
-        self._cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,))
-
     def mktemp_dir_entry(self, entry_type, cur=None):
         self._cursor(cur).execute('SELECT swh_mktemp_dir_entry(%s)',
                                   (('directory_entry_%s' % entry_type),))
diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py
--- a/swh/storage/tests/test_archiver.py
+++ b/swh/storage/tests/test_archiver.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -300,24 +300,3 @@
             len(self._get_backups(['uffizi'], ['banco', 's3'])),
             1
         )
-
-        # This cannot be tested with ArchiverWithRetentionPolicyDirector
-        # (it reads from archiver db)
-        # @istest
-        # def archive_missing_content__without_row_entry_in_archive_db(self):
-        #     """ Run archiver on a missing content should archive it.
-        #     """
-        #     obj_data = b'archive_missing_content_without_row_entry_in_archive_db'
-        #     obj_id = self._add_content('uffizi', obj_data)
-        #     # One entry in archiver db but no status about its whereabouts
-        #     # Content is actually missing on banco but present on uffizi
-        #     try:
-        #         self.dest_storage.get(obj_id)
-        #     except ObjNotFoundError:
-        #         pass
-        #     else:
-        #         self.fail('Content should not be present before archival')
-        #     self.archiver.run()
-        #     # now the content should be present on remote objstorage
-        #     remote_data = self.dest_storage.get(obj_id)
-        #     self.assertEquals(obj_data, remote_data)
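
For reference, a minimal sketch of how the new bulk-insertion path is meant to be driven, mirroring what the director's _add_unknown_content_ids and the updater's process_objects do. The connection string, hash value and source names are illustrative assumptions, not values mandated by this change:

    # Illustrative sketch only: connection string, hash and source names
    # below are assumptions.
    from swh.storage.archiver.storage import ArchiverStorage

    archiver_storage = ArchiverStorage(
        'dbname=softwareheritage-archiver-dev user=guest')

    # Register a batch of content ids as present on 'uffizi' and missing on
    # 'banco' and 'azure'; rows already in content_archive are skipped by
    # swh_content_archive_add's "on conflict ... do nothing" clause.
    content_ids = [bytes.fromhex('34973274ccef6ab4dfaaf86599792fa9c3fe4689')]
    archiver_storage.content_archive_add(
        content_ids,
        sources_present=['uffizi'],
        sources_missing=['banco', 'azure'])

Both callers funnel into this single content_archive_add call, which is what the swh_mktemp / swh_content_archive_add helpers introduced in upgrades/009.sql exist to support.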