diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1 +1,2 @@ pytest +pytest-postgresql diff --git a/swh/archiver/tasks.py b/swh/archiver/tasks.py --- a/swh/archiver/tasks.py +++ b/swh/archiver/tasks.py @@ -3,26 +3,27 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.scheduler.task import Task +from swh.scheduler.celery_backend.config import app + from .worker import ArchiverWithRetentionPolicyWorker from .worker import ArchiverToBackendWorker -class SWHArchiverWithRetentionPolicyTask(Task): - """Main task that archive a batch of content. - - """ - task_queue = 'swh_storage_archive_worker' - - def run_task(self, *args, **kwargs): - ArchiverWithRetentionPolicyWorker(*args, **kwargs).run() +@app.task(name='swh.archiver.tasks.SWHArchiverWithRetentionPolicyTask', + bind=True) +def archive_with_retention(self, *args, **kwargs): + self.log.debug('%s, args=%s, kwargs=%s' % ( + self.name, args, kwargs)) + ArchiverWithRetentionPolicyWorker(*args, **kwargs).run() + self.log.debug('%s OK' % (self.name)) -class SWHArchiverToBackendTask(Task): +@app.task(name='swh.archiver.tasks.SWHArchiverToBackendTask', + bind=True) +def archive_to_backend(self, *args, **kwargs): """Main task that archive a batch of content in the cloud. - """ - task_queue = 'swh_storage_archive_worker_to_backend' - - def run_task(self, *args, **kwargs): - ArchiverToBackendWorker(*args, **kwargs).run() + self.log.debug('%s, args=%s, kwargs=%s' % ( + self.name, args, kwargs)) + ArchiverToBackendWorker(*args, **kwargs).run() + self.log.debug('%s OK' % (self.name)) diff --git a/swh/archiver/tests/conftest.py b/swh/archiver/tests/conftest.py new file mode 100644 --- /dev/null +++ b/swh/archiver/tests/conftest.py @@ -0,0 +1,151 @@ +import os +import glob +import tempfile +import shutil + +import pytest + +from swh.core.utils import numfile_sortkey as sortkey +from swh.objstorage import get_objstorage +from swh.scheduler.tests.conftest import * # noqa +from swh.archiver.storage import get_archiver_storage +from swh.archiver import (ArchiverWithRetentionPolicyDirector, + ArchiverWithRetentionPolicyWorker) +from swh.archiver.tests import SQL_DIR + + +DUMP_FILES = os.path.join(SQL_DIR, '*.sql') + + +@pytest.fixture(scope='session') +def celery_includes(): + return [ + 'swh.archiver.tasks', + ] + + +@pytest.fixture +def swh_archiver_db(postgresql): + all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) + + cursor = postgresql.cursor() + for fname in all_dump_files: + with open(fname) as fobj: + cursor.execute(fobj.read()) + postgresql.commit() + return postgresql + + +@pytest.fixture +def swh_archiver(swh_archiver_db): + + # Create source storage + src_root = tempfile.mkdtemp() + src_config = { + 'cls': 'pathslicing', + 'args': { + 'root': src_root, + 'slicing': '0:2/2:4/4:6' + } + } + src_storage = get_objstorage(**src_config) + + dest_root = tempfile.mkdtemp() + dest_config = { + 'cls': 'pathslicing', + 'args': { + 'root': dest_root, + 'slicing': '0:2/2:4/4:6', + } + } + dest_storage = get_objstorage(**dest_config) + + # Keep mapped the id to the storages + storages = { + 'src': src_storage, # uffizi + 'dest': dest_storage # banco + } + cursor = swh_archiver_db.cursor() + for storage in storages: + cursor.execute("INSERT INTO archive(name) VALUES(%s)", (storage,)) + swh_archiver_db.commit() + + # Override configurations + src_archiver_conf = {'host': 'src'} + dest_archiver_conf = 
{'host': 'dest'} + src_archiver_conf.update(src_config) + dest_archiver_conf.update(dest_config) + archiver_storages = [src_archiver_conf, dest_archiver_conf] + + def parse_config_file(obj, additional_configs): + return { # noqa + 'archiver_storage': { + 'cls': 'db', + 'args': { + 'dbconn': swh_archiver_db, + }, + }, + 'retention_policy': 2, + 'archival_max_age': 3600, + 'batch_max_size': 5000, + 'asynchronous': False, + 'max_queue_length': 100000, + 'queue_throttling_delay': 120, + } + + orig_director_cfg = ArchiverWithRetentionPolicyDirector.parse_config_file + ArchiverWithRetentionPolicyDirector.parse_config_file = ( + parse_config_file) + + def parse_config_file(obj, additional_configs): + return { # noqa + 'archiver_storage': { + 'cls': 'db', + 'args': { + 'dbconn': swh_archiver_db, + }, + }, + 'retention_policy': 2, + 'archival_max_age': 3600, + 'storages': archiver_storages, + 'source': 'src', + 'sources': ['src'], + } + orig_worker_cfg = ArchiverWithRetentionPolicyWorker.parse_config_file + ArchiverWithRetentionPolicyWorker.parse_config_file = ( + parse_config_file) + + # Create the base archiver + archiver = ArchiverWithRetentionPolicyDirector(start_id=None) + try: + yield archiver, storages + finally: + ArchiverWithRetentionPolicyDirector.parse_config_file = ( + orig_director_cfg) + ArchiverWithRetentionPolicyWorker.parse_config_file = ( + orig_worker_cfg) + shutil.rmtree(src_root) + shutil.rmtree(dest_root) + + +@pytest.fixture +def swh_archiver_storage(swh_archiver): + + log_root = tempfile.mkdtemp() + + config = { + 'cls': 'stub', + 'args': { + 'archives': { + 'present_archive': 'http://src:5003', + 'missing_archive': 'http://dest:5003', + }, + 'present': ['present_archive'], + 'missing': ['missing_archive'], + 'logfile_base': os.path.join(log_root, 'log_'), + } + } + try: + yield get_archiver_storage(**config) + finally: + shutil.rmtree(log_root) diff --git a/swh/archiver/tests/test_archiver.py b/swh/archiver/tests/test_archiver.py --- a/swh/archiver/tests/test_archiver.py +++ b/swh/archiver/tests/test_archiver.py @@ -1,428 +1,212 @@ -# Copyright (C) 2015-2018 The Software Heritage developers +# Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime -import glob -import os -import shutil -import tempfile -import unittest - +from contextlib import contextmanager import pytest -from swh.archiver import (ArchiverWithRetentionPolicyDirector, - ArchiverWithRetentionPolicyWorker) +from swh.archiver import ArchiverWithRetentionPolicyWorker from swh.archiver.db import utcnow -from swh.archiver.storage import get_archiver_storage -from swh.core.tests.db_testing import SingleDbTestFixture -from swh.objstorage import get_objstorage from swh.objstorage.exc import ObjNotFoundError -from swh.archiver.tests import SQL_DIR -from swh.scheduler.tests.celery_testing import CeleryTestFixture -@pytest.mark.db -class TestArchiver(CeleryTestFixture, SingleDbTestFixture, unittest.TestCase): - """ Test the objstorage archiver. +def add_content(cursor, storage, content_data): + """ Add really a content to the given objstorage + + This put an empty status for the added content. 
+
+    Args:
+        cursor: cursor on the archiver database
+        storage: the objstorage the content is added to
+        content_data: the data to insert
+
+    """
+    # Add the content to the storage
+    obj_id = storage.add(content_data)
+    cursor.execute(""" INSERT INTO content (sha1)
+                       VALUES (%s)
+                   """, (obj_id,))
+    return obj_id
+
+
+def update_status(cursor, archiver, obj_id, storage_name, status, date=None):
+    """ Update the db status for the given id/storage_name.
+
+    This does not create the content in the storage.
+    """
+    cursor.execute("""insert into archive (name)
+                      values (%s)
+                      on conflict do nothing""", (storage_name,))
+
+    archiver.archiver_storage.content_archive_update(
+        obj_id, storage_name, status
+    )
+
+
+# Integration test
+def test_archive_missing_content(swh_archiver_db, swh_archiver):
+    """ Running the archiver on a missing content should archive it.
+    """
+    archiver, storages = swh_archiver
+    cursor = swh_archiver_db.cursor()
+    obj_data = b'archive_missing_content'
+    obj_id = add_content(cursor, storages['src'], obj_data)
+
+    update_status(cursor, archiver, obj_id, 'src', 'present')
+    # Content is missing on dest (entry not present in the db)
+    with pytest.raises(ObjNotFoundError):
+        storages['dest'].get(obj_id)
+
+    archiver.run()
+    # Now the content should be present on the remote objstorage
+    remote_data = storages['dest'].get(obj_id)
+    assert obj_data == remote_data
+
+
+def test_archive_present_content(swh_archiver_db, swh_archiver):
+    """ A content that is not 'missing' shouldn't be archived. """
+    archiver, storages = swh_archiver
+    cursor = swh_archiver_db.cursor()
+    obj_data = b'archive_present_content'
+    obj_id = add_content(cursor, storages['src'], obj_data)
+
+    update_status(cursor, archiver, obj_id, 'src', 'present')
+    update_status(cursor, archiver, obj_id, 'dest', 'present')
+
+    # After the run, the content should NOT be in the archive,
+    # as the archiver believes it is already archived there.
+ archiver.run() + with pytest.raises(ObjNotFoundError): + storages['dest'].get(obj_id) - TEST_DB_NAME = 'softwareheritage-archiver-test' - TEST_DB_DUMP = os.path.join(SQL_DIR, '*.sql') - - def setUp(self): - # Launch the backup server - super().setUp() - - # Create source storage - self.src_root = tempfile.mkdtemp() - src_config = { - 'cls': 'pathslicing', - 'args': { - 'root': self.src_root, - 'slicing': '0:2/2:4/4:6' - } - } - self.src_storage = get_objstorage(**src_config) - - self.dest_root = tempfile.mkdtemp(prefix='remote') - dest_config = { - 'cls': 'pathslicing', - 'args': { - 'root': self.dest_root, - 'slicing': '0:2/2:4/4:6', - } - } - self.dest_storage = get_objstorage(**dest_config) - - # Keep mapped the id to the storages - self.storages = { - 'uffizi': self.src_storage, - 'banco': self.dest_storage - } - - # Override configurations - src_archiver_conf = {'host': 'uffizi'} - dest_archiver_conf = {'host': 'banco'} - src_archiver_conf.update(src_config) - dest_archiver_conf.update(dest_config) - self.archiver_storages = [src_archiver_conf, dest_archiver_conf] - self._override_director_config() - self._override_worker_config() - # Create the base archiver - self.archiver = self._create_director() - - def tearDown(self): - self.empty_tables() - shutil.rmtree(self.src_root) - shutil.rmtree(self.dest_root) - super().tearDown() - - def empty_tables(self): - # Remove all content - self.cursor.execute('DELETE FROM content') - self.cursor.execute('DELETE FROM content_copies') - self.conn.commit() - - def _override_director_config(self, retention_policy=2): - """ Override the default config of the Archiver director - to allow the tests to use the *-test db instead of the default one as - there is no configuration file for now. - """ - ArchiverWithRetentionPolicyDirector.parse_config_file = lambda obj, additional_configs: { # noqa - 'archiver_storage': { - 'cls': 'db', - 'args': { - 'dbconn': self.conn, - }, - }, - 'batch_max_size': 5000, - 'archival_max_age': 3600, - 'retention_policy': retention_policy, - 'asynchronous': False, - 'max_queue_length': 100000, - 'queue_throttling_delay': 120, - } - - def _override_worker_config(self): - """ Override the default config of the Archiver worker - to allow the tests to use the *-test db instead of the default one as - there is no configuration file for now. - """ - ArchiverWithRetentionPolicyWorker.parse_config_file = lambda obj, additional_configs: { # noqa - 'retention_policy': 2, - 'archival_max_age': 3600, - 'archiver_storage': { - 'cls': 'db', - 'args': { - 'dbconn': self.conn, - }, - }, - 'storages': self.archiver_storages, - 'source': 'uffizi', - 'sources': ['uffizi'], - } - - def _create_director(self): - return ArchiverWithRetentionPolicyDirector(start_id=None) - - def _create_worker(self, batch={}): - return ArchiverWithRetentionPolicyWorker(batch) - - def _add_content(self, storage_name, content_data): - """ Add really a content to the given objstorage - - This put an empty status for the added content. - - Args: - storage_name: the concerned storage - content_data: the data to insert - with_row_insert: to insert a row entry in the db or not - - """ - # Add the content to the storage - obj_id = self.storages[storage_name].add(content_data) - self.cursor.execute(""" INSERT INTO content (sha1) - VALUES (%s) - """, (obj_id,)) - return obj_id - - def _update_status(self, obj_id, storage_name, status, date=None): - """ Update the db status for the given id/storage_name. - - This does not create the content in the storage. 
- """ - self.cursor.execute("""insert into archive (name) - values (%s) - on conflict do nothing""", (storage_name,)) - - self.archiver.archiver_storage.content_archive_update( - obj_id, storage_name, status - ) - - # Integration test - def test_archive_missing_content(self): - """ Run archiver on a missing content should archive it. - """ - obj_data = b'archive_missing_content' - obj_id = self._add_content('uffizi', obj_data) - self._update_status(obj_id, 'uffizi', 'present') - # Content is missing on banco (entry not present in the db) - try: - self.dest_storage.get(obj_id) - except ObjNotFoundError: - pass - else: - self.fail('Content should not be present before archival') - self.archiver.run() - # now the content should be present on remote objstorage - remote_data = self.dest_storage.get(obj_id) - self.assertEqual(obj_data, remote_data) - - def test_archive_present_content(self): - """ A content that is not 'missing' shouldn't be archived. - """ - obj_id = self._add_content('uffizi', b'archive_present_content') - self._update_status(obj_id, 'uffizi', 'present') - self._update_status(obj_id, 'banco', 'present') - # After the run, the content should NOT be in the archive. - # As the archiver believe it was already in. - self.archiver.run() - with self.assertRaises(ObjNotFoundError): - self.dest_storage.get(obj_id) - - def test_archive_already_enough(self): - """ A content missing with enough copies shouldn't be archived. - """ - obj_id = self._add_content('uffizi', b'archive_alread_enough') - self._update_status(obj_id, 'uffizi', 'present') - self._override_director_config(retention_policy=1) - director = self._create_director() + +@contextmanager +def override_config(obj, **kw): + orig = obj.config.copy() + obj.config.update(kw) + try: + yield + finally: + obj.config = orig + + +def test_archive_already_enough(swh_archiver_db, swh_archiver): + """ A content missing with enough copies shouldn't be archived. + """ + archiver, storages = swh_archiver + cursor = swh_archiver_db.cursor() + obj_data = b'archive_alread_enough' + obj_id = add_content(cursor, storages['src'], obj_data) + + update_status(cursor, archiver, obj_id, 'src', 'present') + + with override_config(archiver, retention_policy=1): # Obj is present in only one archive but only one copy is required. - director.run() - with self.assertRaises(ObjNotFoundError): - self.dest_storage.get(obj_id) - - def test_content_archive_get_copies(self): - self.assertCountEqual( - self.archiver.archiver_storage.content_archive_get_copies(), - [], - ) - obj_id = self._add_content('uffizi', b'archive_alread_enough') - self._update_status(obj_id, 'uffizi', 'present') - self.assertCountEqual( - self.archiver.archiver_storage.content_archive_get_copies(), - [(obj_id, ['uffizi'], {})], - ) - - # Unit tests for archive worker - - def archival_elapsed(self, mtime): - return self._create_worker()._is_archival_delay_elapsed(mtime) - - def test_vstatus_ongoing_remaining(self): - self.assertFalse(self.archival_elapsed(utcnow())) - - def test_vstatus_ongoing_elapsed(self): - past_time = (utcnow() - - datetime.timedelta( - seconds=self._create_worker().archival_max_age)) - self.assertTrue(self.archival_elapsed(past_time)) - - def test_need_archival_missing(self): - """ A content should need archival when it is missing. 
- """ - status_copies = {'present': ['uffizi'], 'missing': ['banco']} - worker = self._create_worker() - self.assertEqual(worker.need_archival(status_copies), - True) - - def test_need_archival_present(self): - """ A content present everywhere shouldn't need archival - """ - status_copies = {'present': ['uffizi', 'banco']} - worker = self._create_worker() - self.assertEqual(worker.need_archival(status_copies), - False) - - def _compute_copies_status(self, status): - """ A content with a given status should be detected correctly - """ - obj_id = self._add_content( - 'banco', b'compute_copies_' + bytes(status, 'utf8')) - self._update_status(obj_id, 'banco', status) - worker = self._create_worker() - self.assertIn('banco', worker.compute_copies( - set(worker.objstorages), obj_id)[status]) - - def test_compute_copies_present(self): - """ A present content should be detected with correct status - """ - self._compute_copies_status('present') - - def test_compute_copies_missing(self): - """ A missing content should be detected with correct status - """ - self._compute_copies_status('missing') - - def test_compute_copies_extra_archive(self): - obj_id = self._add_content('banco', b'foobar') - self._update_status(obj_id, 'banco', 'present') - self._update_status(obj_id, 'random_archive', 'present') - worker = self._create_worker() - copies = worker.compute_copies(set(worker.objstorages), obj_id) - self.assertEqual(copies['present'], {'banco'}) - self.assertEqual(copies['missing'], {'uffizi'}) - - def _get_backups(self, present, missing): - """ Return a list of the pair src/dest from the present and missing - """ - worker = self._create_worker() - return list(worker.choose_backup_servers(present, missing)) - - def test_choose_backup_servers(self): - self.assertEqual(len(self._get_backups(['uffizi', 'banco'], [])), 0) - self.assertEqual(len(self._get_backups(['uffizi'], ['banco'])), 1) - # Even with more possible destinations, do not take more than the - # retention_policy require - self.assertEqual( - len(self._get_backups(['uffizi'], ['banco', 's3'])), - 1 - ) - - -class TestArchiverStorageStub(unittest.TestCase): - def setUp(self): - self.src_root = tempfile.mkdtemp(prefix='swh.storage.archiver.local') - self.dest_root = tempfile.mkdtemp(prefix='swh.storage.archiver.remote') - self.log_root = tempfile.mkdtemp(prefix='swh.storage.archiver.log') - - src_config = { - 'cls': 'pathslicing', - 'args': { - 'root': self.src_root, - 'slicing': '0:2/2:4/4:6' - } - } - self.src_storage = get_objstorage(**src_config) - - # Create destination storage - dest_config = { - 'cls': 'pathslicing', - 'args': { - 'root': self.dest_root, - 'slicing': '0:2/2:4/4:6' - } - } - self.dest_storage = get_objstorage(**dest_config) - - self.config = { - 'cls': 'stub', - 'args': { - 'archives': { - 'present_archive': 'http://uffizi:5003', - 'missing_archive': 'http://banco:5003', - }, - 'present': ['present_archive'], - 'missing': ['missing_archive'], - 'logfile_base': os.path.join(self.log_root, 'log_'), - } - } - - # Generated with: - # - # id_length = 20 - # random.getrandbits(8 * id_length).to_bytes(id_length, 'big') - # - self.content_ids = [ - b"\xc7\xc9\x8dlk!'k\x81+\xa9\xc1lg\xc2\xcbG\r`f", - b'S\x03:\xc9\xd0\xa7\xf2\xcc\x8f\x86v$0\x8ccq\\\xe3\xec\x9d', - b'\xca\x1a\x84\xcbi\xd6co\x14\x08\\8\x9e\xc8\xc2|\xd0XS\x83', - b'O\xa9\xce(\xb4\x95_&\xd2\xa2e\x0c\x87\x8fw\xd0\xdfHL\xb2', - b'\xaaa \xd1vB\x15\xbd\xf2\xf0 \xd7\xc4_\xf4\xb9\x8a;\xb4\xcc', - ] - - self.archiver_storage = get_archiver_storage(**self.config) - 
super().setUp() - - def tearDown(self): - shutil.rmtree(self.src_root) - shutil.rmtree(self.dest_root) - shutil.rmtree(self.log_root) - super().tearDown() - - def test_archive_ls(self): - self.assertCountEqual( - self.archiver_storage.archive_ls(), - self.config['args']['archives'].items() - ) - - def test_content_archive_get(self): - for content_id in self.content_ids: - self.assertEqual( - self.archiver_storage.content_archive_get(content_id), - (content_id, set(self.config['args']['present']), {}), - ) - - def test_content_archive_get_copies(self): - self.assertCountEqual( - self.archiver_storage.content_archive_get_copies(), - [], - ) - - def test_content_archive_get_unarchived_copies(self): - retention_policy = 2 - self.assertCountEqual( - self.archiver_storage.content_archive_get_unarchived_copies( - retention_policy), - [], - ) - - def test_content_archive_get_missing(self): - self.assertCountEqual( - self.archiver_storage.content_archive_get_missing( - self.content_ids, - 'missing_archive' - ), - self.content_ids, - ) - - self.assertCountEqual( - self.archiver_storage.content_archive_get_missing( - self.content_ids, - 'present_archive' - ), - [], - ) - - with self.assertRaises(ValueError): - list(self.archiver_storage.content_archive_get_missing( - self.content_ids, - 'unknown_archive' - )) - - def test_content_archive_get_unknown(self): - self.assertCountEqual( - self.archiver_storage.content_archive_get_unknown( - self.content_ids, - ), - [], - ) - - def test_content_archive_update(self): - for content_id in self.content_ids: - self.archiver_storage.content_archive_update( - content_id, 'present_archive', 'present') - self.archiver_storage.content_archive_update( - content_id, 'missing_archive', 'present') - - self.archiver_storage.close_logfile() - - # Make sure we created a logfile - files = glob.glob('%s*' % self.config['args']['logfile_base']) - self.assertEqual(len(files), 1) - - # make sure the logfile contains all our lines - lines = open(files[0]).readlines() - self.assertEqual(len(lines), 2 * len(self.content_ids)) + archiver.run() + with pytest.raises(ObjNotFoundError): + storages['dest'].get(obj_id) + + +def test_content_archive_get_copies(swh_archiver_db, swh_archiver): + archiver, storages = swh_archiver + assert not list(archiver.archiver_storage.content_archive_get_copies()) + + cursor = swh_archiver_db.cursor() + obj_id = add_content(cursor, storages['src'], b'archive_alread_enough') + update_status(cursor, archiver, obj_id, 'src', 'present') + assert list(archiver.archiver_storage.content_archive_get_copies()) == \ + [(obj_id, ['src'], {})] + + +# Unit tests for archive worker + +def create_worker(batch={}): + return ArchiverWithRetentionPolicyWorker(batch) + + +def archival_elapsed(mtime): + return create_worker()._is_archival_delay_elapsed(mtime) + + +def test_vstatus_ongoing_remaining(swh_archiver): + assert not archival_elapsed(utcnow()) + + +def test_vstatus_ongoing_elapsed(swh_archiver): + past_time = (utcnow() + - datetime.timedelta( + seconds=create_worker().archival_max_age)) + assert archival_elapsed(past_time) + + +def test_need_archival_missing(swh_archiver): + """ A content should need archival when it is missing. 
+    """
+    status_copies = {'present': ['uffizi'], 'missing': ['banco']}
+    worker = create_worker()
+    assert worker.need_archival(status_copies) is True
+
+
+def test_need_archival_present(swh_archiver):
+    """ A content present everywhere shouldn't need archival.
+    """
+    status_copies = {'present': ['uffizi', 'banco']}
+    worker = create_worker()
+    assert worker.need_archival(status_copies) is False
+
+
+def compute_copies_status(cursor, archiver, storage, status):
+    """ A content with a given status should be detected correctly.
+    """
+    obj_id = add_content(
+        cursor, storage, b'compute_copies_' + bytes(status, 'utf8'))
+    update_status(cursor, archiver, obj_id, 'dest', status)
+    worker = create_worker()
+    assert 'dest' in worker.compute_copies(
+        set(worker.objstorages), obj_id)[status]
+
+
+def test_compute_copies_present(swh_archiver, swh_archiver_db):
+    """ A present content should be detected with the correct status.
+    """
+    archiver, storages = swh_archiver
+    cursor = swh_archiver_db.cursor()
+    compute_copies_status(cursor, archiver, storages['dest'], 'present')
+
+
+def test_compute_copies_missing(swh_archiver, swh_archiver_db):
+    """ A missing content should be detected with the correct status.
+    """
+    archiver, storages = swh_archiver
+    cursor = swh_archiver_db.cursor()
+    compute_copies_status(cursor, archiver, storages['dest'], 'missing')
+
+
+def test_compute_copies_extra_archive(swh_archiver, swh_archiver_db):
+    archiver, storages = swh_archiver
+    cursor = swh_archiver_db.cursor()
+    obj_id = add_content(cursor, storages['dest'], b'foobar')
+    update_status(cursor, archiver, obj_id, 'dest', 'present')
+    update_status(cursor, archiver, obj_id, 'random_archive', 'present')
+    worker = create_worker()
+    copies = worker.compute_copies(set(worker.objstorages), obj_id)
+    assert copies['present'] == {'dest'}
+    assert copies['missing'] == {'src'}
+
+
+def get_backups(present=(), missing=()):
+    """ Return the list of (src, dest) pairs chosen from the present and
+    missing sets.
+    """
+    worker = create_worker()
+    return list(worker.choose_backup_servers(present, missing))
+
+
+def test_choose_backup_servers(swh_archiver, swh_archiver_db):
+    assert len(get_backups(['src', 'dest'])) == 0
+    assert len(get_backups(['src'], ['dest'])) == 1
+    # Even with more possible destinations, do not take more than the
+    # retention_policy requires
+    assert len(get_backups(['src'], ['dest', 's3'])) == 1
diff --git a/swh/archiver/tests/test_archiver_storage.py b/swh/archiver/tests/test_archiver_storage.py
new file mode 100644
--- /dev/null
+++ b/swh/archiver/tests/test_archiver_storage.py
@@ -0,0 +1,79 @@
+# Copyright (C) 2015-2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+from glob import glob
+
+
+# Generated with:
+#
+# id_length = 20
+# random.getrandbits(8 * id_length).to_bytes(id_length, 'big')
+#
+content_ids = [
+    b"\xc7\xc9\x8dlk!'k\x81+\xa9\xc1lg\xc2\xcbG\r`f",
+    b'S\x03:\xc9\xd0\xa7\xf2\xcc\x8f\x86v$0\x8ccq\\\xe3\xec\x9d',
+    b'\xca\x1a\x84\xcbi\xd6co\x14\x08\\8\x9e\xc8\xc2|\xd0XS\x83',
+    b'O\xa9\xce(\xb4\x95_&\xd2\xa2e\x0c\x87\x8fw\xd0\xdfHL\xb2',
+    b'\xaaa \xd1vB\x15\xbd\xf2\xf0 \xd7\xc4_\xf4\xb9\x8a;\xb4\xcc',
+]
+
+
+def test_archive_ls(swh_archiver_storage):
+    assert dict(swh_archiver_storage.archive_ls()) == {
+        'present_archive': 'http://src:5003',
+        'missing_archive': 'http://dest:5003'}
+
+
+def test_content_archive_get(swh_archiver_storage):
+    for content_id in content_ids:
+        assert swh_archiver_storage.content_archive_get(content_id) == \
+            (content_id, {'present_archive'}, {})
+
+
+def test_content_archive_get_copies(swh_archiver_storage):
+    assert list(swh_archiver_storage.content_archive_get_copies()) == []
+
+
+def test_content_archive_get_unarchived_copies(swh_archiver_storage):
+    retention_policy = 2
+    assert list(swh_archiver_storage.content_archive_get_unarchived_copies(
+        retention_policy)) == []
+
+
+def test_content_archive_get_missing(swh_archiver_storage):
+    assert list(swh_archiver_storage.content_archive_get_missing(
+        content_ids, 'missing_archive')) == content_ids
+
+    assert list(swh_archiver_storage.content_archive_get_missing(
+        content_ids, 'present_archive')) == []
+
+    with pytest.raises(ValueError):
+        list(swh_archiver_storage.content_archive_get_missing(
+            content_ids, 'unknown_archive'))
+
+
+def test_content_archive_get_unknown(swh_archiver_storage):
+    assert list(swh_archiver_storage.content_archive_get_unknown(
+        content_ids)) == []
+
+
+def test_content_archive_update(swh_archiver_storage):
+    for content_id in content_ids:
+        swh_archiver_storage.content_archive_update(
+            content_id, 'present_archive', 'present')
+        swh_archiver_storage.content_archive_update(
+            content_id, 'missing_archive', 'present')
+
+    swh_archiver_storage.close_logfile()
+
+    # Make sure we created a logfile
+    logfile_base = swh_archiver_storage.logfile_base
+    files = glob('%s*' % logfile_base)
+    assert len(files) == 1
+
+    # Make sure the logfile contains all our lines
+    lines = open(files[0]).readlines()
+    assert len(lines) == (2 * len(content_ids))
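
Note on task coverage: the new conftest registers swh.archiver.tasks with the celery test harness (celery_includes), but no test in this diff goes through the task layer itself. A minimal sketch of such a test is shown below; it assumes the swh_app and celery_session_worker fixtures exposed by swh.scheduler.tests.conftest (re-exported by the star import in conftest.py), so fixture names and call details may need adjusting.

    def test_archive_with_retention_task(swh_app, celery_session_worker,
                                         swh_archiver):
        # Hypothetical task-level check: send an empty batch through celery
        # and verify the task completes; worker-side configuration is
        # patched by the swh_archiver fixture.
        res = swh_app.send_task(
            'swh.archiver.tasks.SWHArchiverWithRetentionPolicyTask', ({},))
        assert res
        res.wait()
        assert res.successful()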