diff --git a/PKG-INFO b/PKG-INFO index 832dc70..75f6eee 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.archiver -Version: 0.0.2 +Version: 0.0.3 Summary: Software Heritage archiver Home-page: https://forge.softwareheritage.org/diffusion/DARC/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/debian/control b/debian/control index a398553..7be675d 100644 --- a/debian/control +++ b/debian/control @@ -1,25 +1,25 @@ Source: swh-archiver Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python (>= 2), python3-all, python3-click, python3-nose, python3-psycopg2, python3-setuptools, - python3-swh.core (>= 0.0.28), + python3-swh.core (>= 0.0.37), python3-swh.journal (>= 0.0.2), python3-swh.model (>= 0.0.15), python3-swh.objstorage (>= 0.0.17), python3-swh.scheduler (>= 0.0.14), - python3-swh.storage (>= 0.0.88), + python3-swh.storage (>= 0.0.100), python3-vcversioner Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/source/swh-archiver/ Package: python3-swh.archiver Architecture: all Depends: ${misc:Depends}, ${python3:Depends} Description: Software Heritage Archiver diff --git a/docs/index.rst b/docs/index.rst index 8b64117..27f5748 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,15 +1,17 @@ +.. _swh-archiver: + Software Heritage - Development Documentation ============================================= .. toctree:: :maxdepth: 2 :caption: Contents: Indices and tables ================== * :ref:`genindex` * :ref:`modindex` * :ref:`search` diff --git a/requirements-swh.txt b/requirements-swh.txt index acb7bfd..8a31695 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,6 +1,6 @@ -swh.core >= 0.0.28 +swh.core >= 0.0.37 swh.journal >= 0.0.2 swh.model >= 0.0.15 swh.objstorage >= 0.0.17 swh.scheduler >= 0.0.14 -swh.storage >= 0.0.88 +swh.storage >= 0.0.100 diff --git a/swh.archiver.egg-info/PKG-INFO b/swh.archiver.egg-info/PKG-INFO index 832dc70..75f6eee 100644 --- a/swh.archiver.egg-info/PKG-INFO +++ b/swh.archiver.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.archiver -Version: 0.0.2 +Version: 0.0.3 Summary: Software Heritage archiver Home-page: https://forge.softwareheritage.org/diffusion/DARC/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.archiver.egg-info/requires.txt b/swh.archiver.egg-info/requires.txt index 539798d..46343db 100644 --- a/swh.archiver.egg-info/requires.txt +++ b/swh.archiver.egg-info/requires.txt @@ -1,10 +1,10 @@ click kafka-python psycopg2 -swh.core>=0.0.28 +swh.core>=0.0.37 swh.journal>=0.0.2 swh.model>=0.0.15 swh.objstorage>=0.0.17 swh.scheduler>=0.0.14 -swh.storage>=0.0.88 +swh.storage>=0.0.100 vcversioner diff --git a/swh/archiver/db.py b/swh/archiver/db.py index df26142..e07bf89 100644 --- a/swh/archiver/db.py +++ b/swh/archiver/db.py @@ -1,225 +1,224 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from swh.storage.db import BaseDb, cursor_to_bytes, stored_procedure def utcnow(): return datetime.datetime.now(tz=datetime.timezone.utc) class ArchiverDb(BaseDb): """Proxy to the SWH's archiver DB """ def 
archive_ls(self, cur=None): """ Get all the archives registered on the server. Yields: a tuple (server_id, server_url) for each archive server. """ cur = self._cursor(cur) cur.execute("SELECT * FROM archive") yield from cursor_to_bytes(cur) def content_archive_get(self, content_id, cur=None): """ Get the archival status of a content in a specific server. Retrieve from the database the archival status of the given content in the given archive server. Args: content_id: the sha1 of the content. Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ query = """select archive.name, status, mtime from content_copies left join archive on content_copies.archive_id = archive.id where content_copies.content_id = ( select id from content where sha1 = %s) """ cur = self._cursor(cur) cur.execute(query, (content_id,)) rows = cur.fetchall() if not rows: return None present = [] ongoing = {} for archive, status, mtime in rows: if status == 'present': present.append(archive) elif status == 'ongoing': ongoing[archive] = mtime return (content_id, present, ongoing) def content_archive_get_copies(self, last_content=None, limit=1000, cur=None): """Get the list of copies for `limit` contents starting after `last_content`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ vars = { 'limit': limit, } if last_content is None: last_content_clause = 'true' else: last_content_clause = """content_id > ( select id from content where sha1 = %(last_content)s)""" vars['last_content'] = last_content query = """select (select sha1 from content where id = content_id), array_agg((select name from archive where id = archive_id)) from content_copies where status = 'present' and %s group by content_id order by content_id limit %%(limit)s""" % last_content_clause cur = self._cursor(cur) cur.execute(query, vars) for content_id, present in cursor_to_bytes(cur): yield (content_id, present, {}) def content_archive_get_unarchived_copies( self, retention_policy, last_content=None, limit=1000, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Yields only copies with number of present smaller than `retention policy`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. retention_policy: number of required present copies limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. 
""" vars = { 'limit': limit, 'retention_policy': retention_policy, } if last_content is None: last_content_clause = 'true' else: last_content_clause = """content_id > ( select id from content where sha1 = %(last_content)s)""" vars['last_content'] = last_content query = """select (select sha1 from content where id = content_id), array_agg((select name from archive where id = archive_id)) from content_copies where status = 'present' and %s group by content_id having count(archive_id) < %%(retention_policy)s order by content_id limit %%(limit)s""" % last_content_clause cur = self._cursor(cur) cur.execute(query, vars) for content_id, present in cursor_to_bytes(cur): yield (content_id, present, {}) @stored_procedure('swh_mktemp_content_archive') def mktemp_content_archive(self, cur=None): """Trigger the creation of the temporary table tmp_content_archive during the lifetime of the transaction. """ pass @stored_procedure('swh_content_archive_add') def content_archive_add_from_temp(self, cur=None): """Add new content archive entries from temporary table. Use from archiver.storage module:: - - self.db.mktemp_content_archive() + db.mktemp_content_archive(cur) # copy data over to the temp table - self.db.copy_to([{'colname': id0}, {'colname': id1}], - 'tmp_cache_content', - ['colname'], cur) + db.copy_to([{'colname': id0}, {'colname': id1}], + 'tmp_cache_content', + ['colname'], cur) # insert into the main table - self.db.add_content_archive_from_temp(cur) + db.add_content_archive_from_temp(cur) """ pass def content_archive_get_missing(self, backend_name, cur=None): """Retrieve the content missing from backend_name. """ cur = self._cursor(cur) cur.execute("select * from swh_content_archive_missing(%s)", (backend_name,)) yield from cursor_to_bytes(cur) def content_archive_get_unknown(self, cur=None): """Retrieve unknown sha1 from archiver db. """ cur = self._cursor(cur) cur.execute('select * from swh_content_archive_unknown()') yield from cursor_to_bytes(cur) def content_archive_update(self, content_id, archive_id, new_status=None, cur=None): """ Update the status of an archive content and set its mtime to Change the mtime of an archived content for the given archive and set it's mtime to the current time. Args: content_id (str): content sha1 archive_id (str): name of the archive new_status (str): one of 'missing', 'present' or 'ongoing'. this status will replace the previous one. If not given, the function only change the mtime of the content for the given archive. 
""" assert isinstance(content_id, bytes) assert new_status is not None query = """insert into content_copies (archive_id, content_id, status, mtime) values ((select id from archive where name=%s), (select id from content where sha1=%s), %s, %s) on conflict (archive_id, content_id) do update set status = excluded.status, mtime = excluded.mtime """ cur = self._cursor(cur) cur.execute(query, (archive_id, content_id, new_status, utcnow())) diff --git a/swh/archiver/storage.py b/swh/archiver/storage.py index 9771058..2f662e8 100644 --- a/swh/archiver/storage.py +++ b/swh/archiver/storage.py @@ -1,361 +1,358 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import os import psycopg2 import time from .db import ArchiverDb from swh.model import hashutil from swh.storage.common import db_transaction_generator, db_transaction from swh.storage.exc import StorageDBError class ArchiverStorage(): """SWH Archiver storage proxy, encompassing DB """ def __init__(self, dbconn): """ Args: db_conn: either a libpq connection string, or a psycopg2 connection """ try: if isinstance(dbconn, psycopg2.extensions.connection): - self.db = ArchiverDb(dbconn) + self._db = ArchiverDb(dbconn) else: - self.db = ArchiverDb.connect(dbconn) + self._db = ArchiverDb.connect(dbconn) except psycopg2.OperationalError as e: raise StorageDBError(e) + def get_db(self): + return self._db + @db_transaction_generator - def archive_ls(self, cur=None): + def archive_ls(self, db=None, cur=None): """ Get all the archives registered on the server. Yields: a tuple (server_id, server_url) for each archive server. """ - yield from self.db.archive_ls(cur) + yield from db.archive_ls(cur) @db_transaction - def content_archive_get(self, content_id, cur=None): + def content_archive_get(self, content_id, db=None, cur=None): """ Get the archival status of a content. Retrieve from the database the archival status of the given content Args: content_id: the sha1 of the content Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ - return self.db.content_archive_get(content_id, cur) + return db.content_archive_get(content_id, cur) @db_transaction_generator def content_archive_get_copies(self, last_content=None, limit=1000, - cur=None): + db=None, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ - yield from self.db.content_archive_get_copies(last_content, limit, - cur) + yield from db.content_archive_get_copies(last_content, limit, cur) @db_transaction_generator def content_archive_get_unarchived_copies( self, retention_policy, last_content=None, - limit=1000, cur=None): + limit=1000, db=None, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Yields only copies with number of present smaller than `retention policy`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. retention_policy: number of required present copies limit: number of contents to retrieve. 
Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ - yield from self.db.content_archive_get_unarchived_copies( + yield from db.content_archive_get_unarchived_copies( retention_policy, last_content, limit, cur) @db_transaction_generator - def content_archive_get_missing(self, content_ids, backend_name, cur=None): + def content_archive_get_missing(self, content_ids, backend_name, db=None, + cur=None): """Retrieve missing sha1s from source_name. Args: content_ids ([sha1s]): list of sha1s to test source_name (str): Name of the backend to check for content Yields: missing sha1s from backend_name """ - db = self.db - - db.mktemp_content_archive() + db.mktemp_content_archive(cur) db.copy_to(content_ids, 'tmp_content_archive', ['content_id'], cur) for content_id in db.content_archive_get_missing(backend_name, cur): yield content_id[0] @db_transaction_generator - def content_archive_get_unknown(self, content_ids, cur=None): + def content_archive_get_unknown(self, content_ids, db=None, cur=None): """Retrieve unknown sha1s from content_archive. Args: content_ids ([sha1s]): list of sha1s to test Yields: Unknown sha1s from content_archive """ - db = self.db - - db.mktemp_content_archive() + db.mktemp_content_archive(cur) db.copy_to(content_ids, 'tmp_content_archive', ['content_id'], cur) for content_id in db.content_archive_get_unknown(cur): yield content_id[0] @db_transaction def content_archive_update(self, content_id, archive_id, - new_status=None, cur=None): + new_status=None, db=None, cur=None): """ Update the status of an archive content and set its mtime to now Change the mtime of an archived content for the given archive and set it's mtime to the current time. Args: content_id (str): content sha1 archive_id (str): name of the archive new_status (str): one of 'missing', 'present' or 'ongoing'. this status will replace the previous one. If not given, the function only change the mtime of the content for the given archive. """ - self.db.content_archive_update(content_id, archive_id, new_status, cur) + db.content_archive_update(content_id, archive_id, new_status, cur) @db_transaction def content_archive_add( - self, content_ids, sources_present, cur=None): + self, content_ids, sources_present, db=None, cur=None): """Insert a new entry in db about content_id. 
Args: content_ids ([bytes|str]): content identifiers sources_present ([str]): List of source names where contents are present """ - db = self.db - # Prepare copies dictionary copies = {} for source in sources_present: copies[source] = { "status": "present", "mtime": int(time.time()), } copies = json.dumps(copies) num_present = len(sources_present) - db.mktemp('content_archive') + db.mktemp('content_archive', cur) db.copy_to( ({'content_id': id, 'copies': copies, 'num_present': num_present} for id in content_ids), 'tmp_content_archive', ['content_id', 'copies', 'num_present'], cur) db.content_archive_add_from_temp(cur) class StubArchiverStorage(): def __init__(self, archives, present, missing, logfile_base): """ A stub storage for the archiver that doesn't write to disk Args: - archives: a dictionary mapping archive names to archive URLs - present: archives where the objects are all considered present - missing: archives where the objects are all considered missing - logfile_base: basename for the logfile """ self.archives = archives self.present = set(present) self.missing = set(missing) if set(archives) != self.present | self.missing: raise ValueError("Present and missing archives don't match") self.logfile_base = logfile_base self.__logfile = None def open_logfile(self): if self.__logfile: return logfile_name = "%s.%d" % (self.logfile_base, os.getpid()) self.__logfile = open(logfile_name, 'a') def close_logfile(self): if not self.__logfile: return self.__logfile.close() self.__logfile = None def archive_ls(self, cur=None): """ Get all the archives registered on the server. Yields: a tuple (server_id, server_url) for each archive server. """ yield from self.archives.items() def content_archive_get(self, content_id, cur=None): """ Get the archival status of a content. Retrieve from the database the archival status of the given content Args: content_id: the sha1 of the content Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ return (content_id, self.present, {}) def content_archive_get_copies(self, last_content=None, limit=1000, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ yield from [] def content_archive_get_unarchived_copies(self, retention_policy, last_content=None, limit=1000, cur=None): """ Get the list of copies for `limit` contents starting after `last_content`. Yields only copies with number of present smaller than `retention policy`. Args: last_content: sha1 of the last content retrieved. May be None to start at the beginning. retention_policy: number of required present copies limit: number of contents to retrieve. Can be None to retrieve all objects (will be slow). Yields: A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime. """ yield from [] def content_archive_get_missing(self, content_ids, backend_name, cur=None): """Retrieve missing sha1s from source_name. 
Args: content_ids ([sha1s]): list of sha1s to test source_name (str): Name of the backend to check for content Yields: missing sha1s from backend_name """ if backend_name in self.missing: yield from content_ids elif backend_name in self.present: yield from [] else: raise ValueError('Unknown backend `%s`' % backend_name) def content_archive_get_unknown(self, content_ids, cur=None): """Retrieve unknown sha1s from content_archive. Args: content_ids ([sha1s]): list of sha1s to test Yields: Unknown sha1s from content_archive """ yield from [] def content_archive_update(self, content_id, archive_id, new_status=None, cur=None): """ Update the status of an archive content and set its mtime to now Change the mtime of an archived content for the given archive and set it's mtime to the current time. Args: content_id (str): content sha1 archive_id (str): name of the archive new_status (str): one of 'missing', 'present' or 'ongoing'. this status will replace the previous one. If not given, the function only change the mtime of the content for the given archive. """ if not self.__logfile: self.open_logfile() print(time.time(), archive_id, new_status, hashutil.hash_to_hex(content_id), file=self.__logfile) def content_archive_add( self, content_ids, sources_present, cur=None): """Insert a new entry in db about content_id. Args: content_ids ([bytes|str]): content identifiers sources_present ([str]): List of source names where contents are present """ pass def get_archiver_storage(cls, args): """Instantiate an archiver database with the proper class and arguments""" if cls == 'db': return ArchiverStorage(**args) elif cls == 'stub': return StubArchiverStorage(**args) else: raise ValueError('Unknown Archiver Storage class `%s`' % cls) diff --git a/swh/archiver/tests/test_archiver.py b/swh/archiver/tests/test_archiver.py index 67cb0f7..fa64a2e 100644 --- a/swh/archiver/tests/test_archiver.py +++ b/swh/archiver/tests/test_archiver.py @@ -1,465 +1,453 @@ -# Copyright (C) 2015-2017 The Software Heritage developers +# Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import glob import tempfile import shutil import unittest import os from nose.tools import istest from nose.plugins.attrib import attr from swh.core.tests.db_testing import SingleDbTestFixture from swh.archiver.storage import get_archiver_storage from swh.archiver import ArchiverWithRetentionPolicyDirector from swh.archiver import ArchiverWithRetentionPolicyWorker from swh.archiver.db import utcnow from swh.objstorage import get_objstorage from swh.objstorage.exc import ObjNotFoundError -from swh.objstorage.api.server import make_app as app -from swh.storage.tests.server_testing import ServerTestFixtureAsync - TEST_DIR = os.path.dirname(os.path.abspath(__file__)) TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata') @attr('db') -class TestArchiver(SingleDbTestFixture, ServerTestFixtureAsync, - unittest.TestCase): +class TestArchiver(SingleDbTestFixture, unittest.TestCase): """ Test the objstorage archiver. 
""" TEST_DB_NAME = 'softwareheritage-archiver-test' TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh-archiver.dump') TEST_DB_DUMP_TYPE = 'pg_dump' def setUp(self): # Launch the backup server - self.dest_root = tempfile.mkdtemp(prefix='remote') - self.config = { - 'cls': 'pathslicing', - 'args': { - 'root': self.dest_root, - 'slicing': '0:2/2:4/4:6', - } - } - self.app = app(self.config) super().setUp() # Create source storage self.src_root = tempfile.mkdtemp() src_config = { 'cls': 'pathslicing', 'args': { 'root': self.src_root, 'slicing': '0:2/2:4/4:6' } } self.src_storage = get_objstorage(**src_config) - # Create destination storage + self.dest_root = tempfile.mkdtemp(prefix='remote') dest_config = { - 'cls': 'remote', + 'cls': 'pathslicing', 'args': { - 'url': self.url() + 'root': self.dest_root, + 'slicing': '0:2/2:4/4:6', } } self.dest_storage = get_objstorage(**dest_config) # Keep mapped the id to the storages self.storages = { 'uffizi': self.src_storage, 'banco': self.dest_storage } # Override configurations src_archiver_conf = {'host': 'uffizi'} dest_archiver_conf = {'host': 'banco'} src_archiver_conf.update(src_config) dest_archiver_conf.update(dest_config) self.archiver_storages = [src_archiver_conf, dest_archiver_conf] self._override_director_config() self._override_worker_config() # Create the base archiver self.archiver = self._create_director() def tearDown(self): self.empty_tables() shutil.rmtree(self.src_root) shutil.rmtree(self.dest_root) super().tearDown() def empty_tables(self): # Remove all content self.cursor.execute('DELETE FROM content') self.cursor.execute('DELETE FROM content_copies') self.conn.commit() def _override_director_config(self, retention_policy=2): """ Override the default config of the Archiver director to allow the tests to use the *-test db instead of the default one as there is no configuration file for now. """ ArchiverWithRetentionPolicyDirector.parse_config_file = lambda obj, additional_configs: { # noqa 'archiver_storage': { 'cls': 'db', 'args': { 'dbconn': self.conn, }, }, 'batch_max_size': 5000, 'archival_max_age': 3600, 'retention_policy': retention_policy, 'asynchronous': False, 'max_queue_length': 100000, 'queue_throttling_delay': 120, } def _override_worker_config(self): """ Override the default config of the Archiver worker to allow the tests to use the *-test db instead of the default one as there is no configuration file for now. """ ArchiverWithRetentionPolicyWorker.parse_config_file = lambda obj, additional_configs: { # noqa 'retention_policy': 2, 'archival_max_age': 3600, 'archiver_storage': { 'cls': 'db', 'args': { 'dbconn': self.conn, }, }, 'storages': self.archiver_storages, 'source': 'uffizi', 'sources': ['uffizi'], } def _create_director(self): return ArchiverWithRetentionPolicyDirector(start_id=None) def _create_worker(self, batch={}): return ArchiverWithRetentionPolicyWorker(batch) def _add_content(self, storage_name, content_data): """ Add really a content to the given objstorage This put an empty status for the added content. Args: storage_name: the concerned storage content_data: the data to insert with_row_insert: to insert a row entry in the db or not """ # Add the content to the storage obj_id = self.storages[storage_name].add(content_data) self.cursor.execute(""" INSERT INTO content (sha1) VALUES (%s) """, (obj_id,)) return obj_id def _update_status(self, obj_id, storage_name, status, date=None): """ Update the db status for the given id/storage_name. This does not create the content in the storage. 
""" self.cursor.execute("""insert into archive (name) values (%s) on conflict do nothing""", (storage_name,)) self.archiver.archiver_storage.content_archive_update( obj_id, storage_name, status ) # Integration test @istest def archive_missing_content(self): """ Run archiver on a missing content should archive it. """ obj_data = b'archive_missing_content' obj_id = self._add_content('uffizi', obj_data) self._update_status(obj_id, 'uffizi', 'present') # Content is missing on banco (entry not present in the db) try: self.dest_storage.get(obj_id) except ObjNotFoundError: pass else: self.fail('Content should not be present before archival') self.archiver.run() # now the content should be present on remote objstorage remote_data = self.dest_storage.get(obj_id) self.assertEquals(obj_data, remote_data) @istest def archive_present_content(self): """ A content that is not 'missing' shouldn't be archived. """ obj_id = self._add_content('uffizi', b'archive_present_content') self._update_status(obj_id, 'uffizi', 'present') self._update_status(obj_id, 'banco', 'present') # After the run, the content should NOT be in the archive. # As the archiver believe it was already in. self.archiver.run() with self.assertRaises(ObjNotFoundError): self.dest_storage.get(obj_id) @istest def archive_already_enough(self): """ A content missing with enough copies shouldn't be archived. """ obj_id = self._add_content('uffizi', b'archive_alread_enough') self._update_status(obj_id, 'uffizi', 'present') self._override_director_config(retention_policy=1) director = self._create_director() # Obj is present in only one archive but only one copy is required. director.run() with self.assertRaises(ObjNotFoundError): self.dest_storage.get(obj_id) @istest def content_archive_get_copies(self): self.assertCountEqual( self.archiver.archiver_storage.content_archive_get_copies(), [], ) obj_id = self._add_content('uffizi', b'archive_alread_enough') self._update_status(obj_id, 'uffizi', 'present') self.assertCountEqual( self.archiver.archiver_storage.content_archive_get_copies(), [(obj_id, ['uffizi'], {})], ) # Unit tests for archive worker def archival_elapsed(self, mtime): return self._create_worker()._is_archival_delay_elapsed(mtime) @istest def vstatus_ongoing_remaining(self): self.assertFalse(self.archival_elapsed(utcnow())) @istest def vstatus_ongoing_elapsed(self): past_time = (utcnow() - datetime.timedelta( seconds=self._create_worker().archival_max_age)) self.assertTrue(self.archival_elapsed(past_time)) @istest def need_archival_missing(self): """ A content should need archival when it is missing. 
""" status_copies = {'present': ['uffizi'], 'missing': ['banco']} worker = self._create_worker() self.assertEqual(worker.need_archival(status_copies), True) @istest def need_archival_present(self): """ A content present everywhere shouldn't need archival """ status_copies = {'present': ['uffizi', 'banco']} worker = self._create_worker() self.assertEqual(worker.need_archival(status_copies), False) def _compute_copies_status(self, status): """ A content with a given status should be detected correctly """ obj_id = self._add_content( 'banco', b'compute_copies_' + bytes(status, 'utf8')) self._update_status(obj_id, 'banco', status) worker = self._create_worker() self.assertIn('banco', worker.compute_copies( set(worker.objstorages), obj_id)[status]) @istest def compute_copies_present(self): """ A present content should be detected with correct status """ self._compute_copies_status('present') @istest def compute_copies_missing(self): """ A missing content should be detected with correct status """ self._compute_copies_status('missing') @istest def compute_copies_extra_archive(self): obj_id = self._add_content('banco', b'foobar') self._update_status(obj_id, 'banco', 'present') self._update_status(obj_id, 'random_archive', 'present') worker = self._create_worker() copies = worker.compute_copies(set(worker.objstorages), obj_id) self.assertEqual(copies['present'], {'banco'}) self.assertEqual(copies['missing'], {'uffizi'}) def _get_backups(self, present, missing): """ Return a list of the pair src/dest from the present and missing """ worker = self._create_worker() return list(worker.choose_backup_servers(present, missing)) @istest def choose_backup_servers(self): self.assertEqual(len(self._get_backups(['uffizi', 'banco'], [])), 0) self.assertEqual(len(self._get_backups(['uffizi'], ['banco'])), 1) # Even with more possible destinations, do not take more than the # retention_policy require self.assertEqual( len(self._get_backups(['uffizi'], ['banco', 's3'])), 1 ) class TestArchiverStorageStub(unittest.TestCase): def setUp(self): self.src_root = tempfile.mkdtemp(prefix='swh.storage.archiver.local') self.dest_root = tempfile.mkdtemp(prefix='swh.storage.archiver.remote') self.log_root = tempfile.mkdtemp(prefix='swh.storage.archiver.log') src_config = { 'cls': 'pathslicing', 'args': { 'root': self.src_root, 'slicing': '0:2/2:4/4:6' } } self.src_storage = get_objstorage(**src_config) # Create destination storage dest_config = { 'cls': 'pathslicing', 'args': { 'root': self.dest_root, 'slicing': '0:2/2:4/4:6' } } self.dest_storage = get_objstorage(**dest_config) self.config = { 'cls': 'stub', 'args': { 'archives': { 'present_archive': 'http://uffizi:5003', 'missing_archive': 'http://banco:5003', }, 'present': ['present_archive'], 'missing': ['missing_archive'], 'logfile_base': os.path.join(self.log_root, 'log_'), } } # Generated with: # # id_length = 20 # random.getrandbits(8 * id_length).to_bytes(id_length, 'big') # self.content_ids = [ b"\xc7\xc9\x8dlk!'k\x81+\xa9\xc1lg\xc2\xcbG\r`f", b'S\x03:\xc9\xd0\xa7\xf2\xcc\x8f\x86v$0\x8ccq\\\xe3\xec\x9d', b'\xca\x1a\x84\xcbi\xd6co\x14\x08\\8\x9e\xc8\xc2|\xd0XS\x83', b'O\xa9\xce(\xb4\x95_&\xd2\xa2e\x0c\x87\x8fw\xd0\xdfHL\xb2', b'\xaaa \xd1vB\x15\xbd\xf2\xf0 \xd7\xc4_\xf4\xb9\x8a;\xb4\xcc', ] self.archiver_storage = get_archiver_storage(**self.config) super().setUp() def tearDown(self): shutil.rmtree(self.src_root) shutil.rmtree(self.dest_root) shutil.rmtree(self.log_root) super().tearDown() @istest def archive_ls(self): self.assertCountEqual( 
self.archiver_storage.archive_ls(), self.config['args']['archives'].items() ) @istest def content_archive_get(self): for content_id in self.content_ids: self.assertEqual( self.archiver_storage.content_archive_get(content_id), (content_id, set(self.config['args']['present']), {}), ) @istest def content_archive_get_copies(self): self.assertCountEqual( self.archiver_storage.content_archive_get_copies(), [], ) @istest def content_archive_get_unarchived_copies(self): retention_policy = 2 self.assertCountEqual( self.archiver_storage.content_archive_get_unarchived_copies( retention_policy), [], ) @istest def content_archive_get_missing(self): self.assertCountEqual( self.archiver_storage.content_archive_get_missing( self.content_ids, 'missing_archive' ), self.content_ids, ) self.assertCountEqual( self.archiver_storage.content_archive_get_missing( self.content_ids, 'present_archive' ), [], ) with self.assertRaises(ValueError): list(self.archiver_storage.content_archive_get_missing( self.content_ids, 'unknown_archive' )) @istest def content_archive_get_unknown(self): self.assertCountEqual( self.archiver_storage.content_archive_get_unknown( self.content_ids, ), [], ) @istest def content_archive_update(self): for content_id in self.content_ids: self.archiver_storage.content_archive_update( content_id, 'present_archive', 'present') self.archiver_storage.content_archive_update( content_id, 'missing_archive', 'present') self.archiver_storage.close_logfile() # Make sure we created a logfile files = glob.glob('%s*' % self.config['args']['logfile_base']) self.assertEqual(len(files), 1) # make sure the logfile contains all our lines lines = open(files[0]).readlines() self.assertEqual(len(lines), 2 * len(self.content_ids)) diff --git a/version.txt b/version.txt index 0984193..c5e65c7 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.2-0-g39dd7db \ No newline at end of file +v0.0.3-0-g7d107d7 \ No newline at end of file
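The hunks above adapt ArchiverStorage to the db_transaction / db_transaction_generator decorators from swh.storage >= 0.0.100, which are expected to obtain the database handle through the new get_db() method and hand it to each decorated method as the `db` keyword argument (hence the `self.db` to `self._db` rename and the added `db=None, cur=None` parameters); callers of the storage API are unaffected. Below is a minimal usage sketch, not part of the patch, showing how the refactored storage is obtained through get_archiver_storage(); it reuses the stub configuration from TestArchiverStorageStub, and the logfile path and the commented-out libpq connection string are placeholders.

    from swh.archiver.storage import get_archiver_storage

    # Stub backend: same configuration as in TestArchiverStorageStub;
    # nothing touches the database, status updates go to a logfile.
    archiver_storage = get_archiver_storage('stub', {
        'archives': {
            'present_archive': 'http://uffizi:5003',
            'missing_archive': 'http://banco:5003',
        },
        'present': ['present_archive'],
        'missing': ['missing_archive'],
        'logfile_base': '/tmp/archiver-log_',  # placeholder path
    })

    # archive_ls() yields one (name, url) tuple per registered archive.
    for name, url in archiver_storage.archive_ls():
        print(name, url)

    # Database backend: 'dbconn' may be a libpq connection string or an
    # existing psycopg2 connection, as accepted by ArchiverStorage.__init__
    # (placeholder DSN shown).
    # archiver_storage = get_archiver_storage(
    #     'db', {'dbconn': 'dbname=softwareheritage-archiver-dev'})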