diff --git a/swh/archiver/tests/test_archiver.py b/swh/archiver/tests/test_archiver.py
index 710dfac..ea45df8 100644
--- a/swh/archiver/tests/test_archiver.py
+++ b/swh/archiver/tests/test_archiver.py
@@ -1,453 +1,429 @@
 # Copyright (C) 2015-2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 import glob
-import tempfile
+import os
 import shutil
+import tempfile
 import unittest
-import os
 
-from nose.tools import istest
 from nose.plugins.attrib import attr
 
-from swh.core.tests.db_testing import SingleDbTestFixture
-
-from swh.archiver.storage import get_archiver_storage
-
-from swh.archiver import ArchiverWithRetentionPolicyDirector
-from swh.archiver import ArchiverWithRetentionPolicyWorker
+from swh.archiver import (ArchiverWithRetentionPolicyDirector,
+                          ArchiverWithRetentionPolicyWorker)
 from swh.archiver.db import utcnow
-
+from swh.archiver.storage import get_archiver_storage
+from swh.core.tests.db_testing import SingleDbTestFixture
 from swh.objstorage import get_objstorage
 from swh.objstorage.exc import ObjNotFoundError
 
-
 DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
 
 
 @attr('db')
 class TestArchiver(SingleDbTestFixture, unittest.TestCase):
     """ Test the objstorage archiver. """
 
     TEST_DB_NAME = 'softwareheritage-archiver-test'
     TEST_DB_DUMP = os.path.join(DATA_DIR, 'dumps', 'swh-archiver.sql')
     TEST_DB_DUMP_TYPE = 'psql'
 
     def setUp(self):
         # Launch the backup server
         super().setUp()
 
         # Create source storage
         self.src_root = tempfile.mkdtemp()
         src_config = {
             'cls': 'pathslicing',
             'args': {
                 'root': self.src_root,
                 'slicing': '0:2/2:4/4:6'
             }
         }
         self.src_storage = get_objstorage(**src_config)
 
         self.dest_root = tempfile.mkdtemp(prefix='remote')
         dest_config = {
             'cls': 'pathslicing',
             'args': {
                 'root': self.dest_root,
                 'slicing': '0:2/2:4/4:6',
             }
         }
         self.dest_storage = get_objstorage(**dest_config)
 
         # Map the storage names to the storages themselves
         self.storages = {
             'uffizi': self.src_storage,
             'banco': self.dest_storage
         }
 
         # Override configurations
         src_archiver_conf = {'host': 'uffizi'}
         dest_archiver_conf = {'host': 'banco'}
         src_archiver_conf.update(src_config)
         dest_archiver_conf.update(dest_config)
         self.archiver_storages = [src_archiver_conf, dest_archiver_conf]
         self._override_director_config()
         self._override_worker_config()
 
         # Create the base archiver
         self.archiver = self._create_director()
 
     def tearDown(self):
         self.empty_tables()
         shutil.rmtree(self.src_root)
         shutil.rmtree(self.dest_root)
         super().tearDown()
 
     def empty_tables(self):
         # Remove all content
         self.cursor.execute('DELETE FROM content')
         self.cursor.execute('DELETE FROM content_copies')
         self.conn.commit()
 
     def _override_director_config(self, retention_policy=2):
         """ Override the default config of the Archiver director
         to allow the tests to use the *-test db instead of the default one,
         as there is no configuration file for now.
         """
         ArchiverWithRetentionPolicyDirector.parse_config_file = lambda obj, additional_configs: {  # noqa
             'archiver_storage': {
                 'cls': 'db',
                 'args': {
                     'dbconn': self.conn,
                 },
             },
             'batch_max_size': 5000,
             'archival_max_age': 3600,
             'retention_policy': retention_policy,
             'asynchronous': False,
             'max_queue_length': 100000,
             'queue_throttling_delay': 120,
         }
 
     def _override_worker_config(self):
         """ Override the default config of the Archiver worker
         to allow the tests to use the *-test db instead of the default one,
         as there is no configuration file for now.
         """
         ArchiverWithRetentionPolicyWorker.parse_config_file = lambda obj, additional_configs: {  # noqa
             'retention_policy': 2,
             'archival_max_age': 3600,
             'archiver_storage': {
                 'cls': 'db',
                 'args': {
                     'dbconn': self.conn,
                 },
             },
             'storages': self.archiver_storages,
             'source': 'uffizi',
             'sources': ['uffizi'],
         }
 
     def _create_director(self):
         return ArchiverWithRetentionPolicyDirector(start_id=None)
 
     def _create_worker(self, batch={}):
         return ArchiverWithRetentionPolicyWorker(batch)
 
     def _add_content(self, storage_name, content_data):
         """ Really add a content to the given objstorage
 
         This puts an empty status for the added content.
 
         Args:
             storage_name: the concerned storage
             content_data: the data to insert
 
         """
         # Add the content to the storage
         obj_id = self.storages[storage_name].add(content_data)
         self.cursor.execute(""" INSERT INTO content (sha1)
                             VALUES (%s)
                             """, (obj_id,))
         return obj_id
 
     def _update_status(self, obj_id, storage_name, status, date=None):
         """ Update the db status for the given id/storage_name.
 
         This does not create the content in the storage.
         """
         self.cursor.execute("""insert into archive (name)
                                values (%s)
                                on conflict do nothing""", (storage_name,))
 
         self.archiver.archiver_storage.content_archive_update(
             obj_id, storage_name, status
         )
 
     # Integration test
 
-    @istest
-    def archive_missing_content(self):
+    def test_archive_missing_content(self):
         """ Running the archiver on a missing content should archive it.
         """
         obj_data = b'archive_missing_content'
         obj_id = self._add_content('uffizi', obj_data)
         self._update_status(obj_id, 'uffizi', 'present')
         # Content is missing on banco (entry not present in the db)
         try:
             self.dest_storage.get(obj_id)
         except ObjNotFoundError:
             pass
         else:
             self.fail('Content should not be present before archival')
         self.archiver.run()
         # Now the content should be present on the remote objstorage
         remote_data = self.dest_storage.get(obj_id)
         self.assertEqual(obj_data, remote_data)
 
-    @istest
-    def archive_present_content(self):
+    def test_archive_present_content(self):
         """ A content that is not 'missing' shouldn't be archived.
         """
         obj_id = self._add_content('uffizi', b'archive_present_content')
         self._update_status(obj_id, 'uffizi', 'present')
         self._update_status(obj_id, 'banco', 'present')
         # After the run, the content should NOT be in the archive,
         # as the archiver believes it was already there.
         self.archiver.run()
         with self.assertRaises(ObjNotFoundError):
             self.dest_storage.get(obj_id)
 
-    @istest
-    def archive_already_enough(self):
+    def test_archive_already_enough(self):
         """ A content missing with enough copies shouldn't be archived.
         """
         obj_id = self._add_content('uffizi', b'archive_alread_enough')
         self._update_status(obj_id, 'uffizi', 'present')
         self._override_director_config(retention_policy=1)
         director = self._create_director()
         # Obj is present in only one archive but only one copy is required.
         director.run()
         with self.assertRaises(ObjNotFoundError):
             self.dest_storage.get(obj_id)
 
-    @istest
-    def content_archive_get_copies(self):
+    def test_content_archive_get_copies(self):
         self.assertCountEqual(
             self.archiver.archiver_storage.content_archive_get_copies(),
             [],
         )
 
         obj_id = self._add_content('uffizi', b'archive_alread_enough')
         self._update_status(obj_id, 'uffizi', 'present')
         self.assertCountEqual(
             self.archiver.archiver_storage.content_archive_get_copies(),
             [(obj_id, ['uffizi'], {})],
         )
 
     # Unit tests for archive worker
 
     def archival_elapsed(self, mtime):
         return self._create_worker()._is_archival_delay_elapsed(mtime)
 
-    @istest
-    def vstatus_ongoing_remaining(self):
+    def test_vstatus_ongoing_remaining(self):
         self.assertFalse(self.archival_elapsed(utcnow()))
 
-    @istest
-    def vstatus_ongoing_elapsed(self):
+    def test_vstatus_ongoing_elapsed(self):
         past_time = (utcnow() - datetime.timedelta(
             seconds=self._create_worker().archival_max_age))
         self.assertTrue(self.archival_elapsed(past_time))
 
-    @istest
-    def need_archival_missing(self):
+    def test_need_archival_missing(self):
         """ A content should need archival when it is missing.
         """
         status_copies = {'present': ['uffizi'], 'missing': ['banco']}
         worker = self._create_worker()
         self.assertEqual(worker.need_archival(status_copies), True)
 
-    @istest
-    def need_archival_present(self):
+    def test_need_archival_present(self):
         """ A content present everywhere shouldn't need archival.
         """
         status_copies = {'present': ['uffizi', 'banco']}
         worker = self._create_worker()
         self.assertEqual(worker.need_archival(status_copies), False)
 
     def _compute_copies_status(self, status):
         """ A content with a given status should be detected correctly.
         """
         obj_id = self._add_content(
             'banco', b'compute_copies_' + bytes(status, 'utf8'))
         self._update_status(obj_id, 'banco', status)
         worker = self._create_worker()
         self.assertIn('banco', worker.compute_copies(
             set(worker.objstorages), obj_id)[status])
 
-    @istest
-    def compute_copies_present(self):
+    def test_compute_copies_present(self):
         """ A present content should be detected with the correct status.
         """
         self._compute_copies_status('present')
 
-    @istest
-    def compute_copies_missing(self):
+    def test_compute_copies_missing(self):
         """ A missing content should be detected with the correct status.
         """
         self._compute_copies_status('missing')
 
-    @istest
-    def compute_copies_extra_archive(self):
+    def test_compute_copies_extra_archive(self):
         obj_id = self._add_content('banco', b'foobar')
         self._update_status(obj_id, 'banco', 'present')
         self._update_status(obj_id, 'random_archive', 'present')
         worker = self._create_worker()
         copies = worker.compute_copies(set(worker.objstorages), obj_id)
         self.assertEqual(copies['present'], {'banco'})
         self.assertEqual(copies['missing'], {'uffizi'})
 
     def _get_backups(self, present, missing):
         """ Return the list of (src, dest) pairs from the present and
         missing archives.
         """
         worker = self._create_worker()
         return list(worker.choose_backup_servers(present, missing))
 
-    @istest
-    def choose_backup_servers(self):
+    def test_choose_backup_servers(self):
         self.assertEqual(len(self._get_backups(['uffizi', 'banco'], [])), 0)
         self.assertEqual(len(self._get_backups(['uffizi'], ['banco'])), 1)
         # Even with more possible destinations, do not take more than the
         # retention_policy requires
         self.assertEqual(
             len(self._get_backups(['uffizi'], ['banco', 's3'])), 1
         )
 
 
 class TestArchiverStorageStub(unittest.TestCase):
     def setUp(self):
         self.src_root = tempfile.mkdtemp(prefix='swh.storage.archiver.local')
         self.dest_root = tempfile.mkdtemp(prefix='swh.storage.archiver.remote')
         self.log_root = tempfile.mkdtemp(prefix='swh.storage.archiver.log')
 
         src_config = {
             'cls': 'pathslicing',
             'args': {
                 'root': self.src_root,
                 'slicing': '0:2/2:4/4:6'
             }
         }
         self.src_storage = get_objstorage(**src_config)
 
         # Create destination storage
         dest_config = {
             'cls': 'pathslicing',
             'args': {
                 'root': self.dest_root,
                 'slicing': '0:2/2:4/4:6'
             }
         }
         self.dest_storage = get_objstorage(**dest_config)
 
         self.config = {
             'cls': 'stub',
             'args': {
                 'archives': {
                     'present_archive': 'http://uffizi:5003',
                     'missing_archive': 'http://banco:5003',
                 },
                 'present': ['present_archive'],
                 'missing': ['missing_archive'],
                 'logfile_base': os.path.join(self.log_root, 'log_'),
             }
         }
 
         # Generated with:
         #
         #     id_length = 20
         #     random.getrandbits(8 * id_length).to_bytes(id_length, 'big')
         #
         self.content_ids = [
             b"\xc7\xc9\x8dlk!'k\x81+\xa9\xc1lg\xc2\xcbG\r`f",
             b'S\x03:\xc9\xd0\xa7\xf2\xcc\x8f\x86v$0\x8ccq\\\xe3\xec\x9d',
             b'\xca\x1a\x84\xcbi\xd6co\x14\x08\\8\x9e\xc8\xc2|\xd0XS\x83',
             b'O\xa9\xce(\xb4\x95_&\xd2\xa2e\x0c\x87\x8fw\xd0\xdfHL\xb2',
             b'\xaaa \xd1vB\x15\xbd\xf2\xf0 \xd7\xc4_\xf4\xb9\x8a;\xb4\xcc',
         ]
 
         self.archiver_storage = get_archiver_storage(**self.config)
         super().setUp()
 
     def tearDown(self):
         shutil.rmtree(self.src_root)
         shutil.rmtree(self.dest_root)
         shutil.rmtree(self.log_root)
         super().tearDown()
 
-    @istest
-    def archive_ls(self):
+    def test_archive_ls(self):
         self.assertCountEqual(
             self.archiver_storage.archive_ls(),
             self.config['args']['archives'].items()
         )
 
-    @istest
-    def content_archive_get(self):
+    def test_content_archive_get(self):
         for content_id in self.content_ids:
             self.assertEqual(
                 self.archiver_storage.content_archive_get(content_id),
                 (content_id, set(self.config['args']['present']), {}),
             )
 
-    @istest
-    def content_archive_get_copies(self):
+    def test_content_archive_get_copies(self):
         self.assertCountEqual(
             self.archiver_storage.content_archive_get_copies(),
             [],
         )
 
-    @istest
-    def content_archive_get_unarchived_copies(self):
+    def test_content_archive_get_unarchived_copies(self):
         retention_policy = 2
         self.assertCountEqual(
             self.archiver_storage.content_archive_get_unarchived_copies(
                 retention_policy),
             [],
         )
 
-    @istest
-    def content_archive_get_missing(self):
+    def test_content_archive_get_missing(self):
         self.assertCountEqual(
             self.archiver_storage.content_archive_get_missing(
                 self.content_ids,
                 'missing_archive'
             ),
             self.content_ids,
         )
 
         self.assertCountEqual(
             self.archiver_storage.content_archive_get_missing(
                 self.content_ids,
                 'present_archive'
             ),
             [],
         )
 
         with self.assertRaises(ValueError):
             list(self.archiver_storage.content_archive_get_missing(
                 self.content_ids,
                 'unknown_archive'
             ))
 
-    @istest
-    def content_archive_get_unknown(self):
+    def test_content_archive_get_unknown(self):
         self.assertCountEqual(
             self.archiver_storage.content_archive_get_unknown(
                 self.content_ids,
             ),
             [],
         )
 
-    @istest
-    def content_archive_update(self):
+    def test_content_archive_update(self):
         for content_id in self.content_ids:
             self.archiver_storage.content_archive_update(
                 content_id, 'present_archive', 'present')
             self.archiver_storage.content_archive_update(
                 content_id, 'missing_archive', 'present')
 
         self.archiver_storage.close_logfile()
 
         # Make sure we created a logfile
         files = glob.glob('%s*' % self.config['args']['logfile_base'])
         self.assertEqual(len(files), 1)
 
         # Make sure the logfile contains all our lines
         lines = open(files[0]).readlines()
         self.assertEqual(len(lines), 2 * len(self.content_ids))
diff --git a/swh/archiver/tests/test_checker.py b/swh/archiver/tests/test_checker.py
index 1d92b5b..634067c 100644
--- a/swh/archiver/tests/test_checker.py
+++ b/swh/archiver/tests/test_checker.py
@@ -1,155 +1,147 @@
 # Copyright (C) 2015-2017  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import gzip
 import tempfile
 import unittest
 
-from nose.tools import istest
 from nose.plugins.attrib import attr
 
-from swh.objstorage.exc import ObjNotFoundError
 from swh.archiver.checker import RepairContentChecker
 from swh.model import hashutil
+from swh.objstorage.exc import ObjNotFoundError
 
 
 class MockBackupObjStorage():
 
     def __init__(self):
         self.values = {}
 
     def add(self, value, obj_id):
         self.values[obj_id] = value
 
     def get(self, obj_id):
         try:
             return self.values[obj_id]
         except KeyError:
             raise ObjNotFoundError(obj_id)
 
 
 @attr('fs')
 class TestRepairChecker(unittest.TestCase):
     """ Test the content integrity checker.
     """
 
     def setUp(self):
         super().setUp()
         self._alter_config()
         self.checker = RepairContentChecker()
         self.checker.backups = [MockBackupObjStorage(),
                                 MockBackupObjStorage()]
 
     def _alter_config(self):
         RepairContentChecker.parse_config_file = (
             lambda cls: {
                 'storage': {'cls': 'pathslicing',
                             'args': {'root': tempfile.mkdtemp(),
                                      'slicing': '0:2/2:4/4:6'}},
                 'batch_size': 1000,
                 'log_tag': 'objstorage_test',
                 'backup_storages': {}
             }
         )
 
     def _corrupt_content(self, obj_id):
         """ Make the given content invalid.
         """
         hex_obj_id = hashutil.hash_to_hex(obj_id)
         file_path = self.checker.objstorage._obj_path(hex_obj_id)
         with gzip.open(file_path, 'wb') as f:
             f.write(b'Unexpected content')
 
     def _is_corrupted(self, obj_id):
         """ Ensure the given object is corrupted.
         """
         return self.checker._check_content(obj_id) == 'corrupted'
 
     def _is_missing(self, obj_id):
         """ Ensure the given object is missing.
         """
         return self.checker._check_content(obj_id) == 'missing'
 
-    @istest
-    def check_valid_content(self):
+    def test_check_valid_content(self):
         # Check that a valid content is valid.
         content = b'check_valid_content'
         obj_id = self.checker.objstorage.add(content)
         self.assertFalse(self._is_corrupted(obj_id))
         self.assertFalse(self._is_missing(obj_id))
 
-    @istest
-    def check_corrupted_content(self):
+    def test_check_corrupted_content(self):
         # Check that an invalid content is noticed.
         content = b'check_corrupted_content'
         obj_id = self.checker.objstorage.add(content)
         self._corrupt_content(obj_id)
         self.assertTrue(self._is_corrupted(obj_id))
         self.assertFalse(self._is_missing(obj_id))
 
-    @istest
-    def check_missing_content(self):
+    def test_check_missing_content(self):
         obj_id = hashutil.hash_data(b'check_missing_content')['sha1']
         self.assertFalse(self._is_corrupted(obj_id))
         self.assertTrue(self._is_missing(obj_id))
 
-    @istest
-    def repair_content_present_first(self):
+    def test_repair_content_present_first(self):
         # Try to repair a content that is in the first backup storage.
         content = b'repair_content_present_first'
         obj_id = self.checker.objstorage.add(content)
         # Add the content to the mock
         self.checker.backups[0].add(content, obj_id)
         # Corrupt and repair it.
         self._corrupt_content(obj_id)
         self.assertTrue(self._is_corrupted(obj_id))
         self.checker.corrupted_content(obj_id)
         self.assertFalse(self._is_corrupted(obj_id))
 
-    @istest
-    def repair_content_present_second(self):
+    def test_repair_content_present_second(self):
         # Try to repair a content that is in the last backup storage.
         content = b'repair_content_present_first'
         obj_id = self.checker.objstorage.add(content)
         # Add the content to the mock
         self.checker.backups[-1].add(content, obj_id)
         # Corrupt and repair it.
         self._corrupt_content(obj_id)
         self.assertTrue(self._is_corrupted(obj_id))
         self.checker.corrupted_content(obj_id)
         self.assertFalse(self._is_corrupted(obj_id))
 
-    @istest
-    def repair_content_present_distributed(self):
+    def test_repair_content_present_distributed(self):
         # Try to repair two contents that are in separate backup storages.
         content1 = b'repair_content_present_distributed_2'
         content2 = b'repair_content_present_distributed_1'
         obj_id1 = self.checker.objstorage.add(content1)
         obj_id2 = self.checker.objstorage.add(content2)
         # Add the contents to the mocks.
         self.checker.backups[0].add(content1, obj_id1)
         self.checker.backups[1].add(content2, obj_id2)
         # Corrupt the contents
         self._corrupt_content(obj_id1)
         self._corrupt_content(obj_id2)
         self.assertTrue(self._is_corrupted(obj_id1))
         self.assertTrue(self._is_corrupted(obj_id2))
         # Repair them
         self.checker.corrupted_content(obj_id1)
         self.checker.corrupted_content(obj_id2)
         self.assertFalse(self._is_corrupted(obj_id1))
         self.assertFalse(self._is_corrupted(obj_id2))
 
-    @istest
-    def repair_content_missing(self):
+    def test_repair_content_missing(self):
         # Try to repair a content that is NOT in the backup storage.
         content = b'repair_content_missing'
         obj_id = self.checker.objstorage.add(content)
         # Corrupt the content
         self._corrupt_content(obj_id)
         self.assertTrue(self._is_corrupted(obj_id))
         # Try to repair it
         self.checker.corrupted_content(obj_id)
         self.assertTrue(self._is_corrupted(obj_id))
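
The recurring mechanical change in both files is the retirement of nose's @istest
decorator: every test method gains a test_ prefix, which is the naming convention
that plain unittest discovery keys on. A minimal sketch of the before/after
pattern, using an illustrative class name that is not part of this patch:

    import unittest


    class ExampleTest(unittest.TestCase):
        # Before, under nose, any method decorated with @istest was
        # collected regardless of its name:
        #
        #     from nose.tools import istest
        #
        #     @istest
        #     def archive_missing_content(self): ...
        #
        # After, standard unittest discovery only collects methods whose
        # names start with 'test', so the decorator can be dropped.
        def test_archive_missing_content(self):
            self.assertTrue(True)  # placeholder body for the sketch


    if __name__ == '__main__':
        unittest.main()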
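
Similarly, the _override_director_config, _override_worker_config and
_alter_config helpers above all rely on one technique: replacing
parse_config_file on the class with a lambda that returns an in-test dict, so
instances configure themselves without reading a configuration file. A
self-contained sketch of that pattern, with hypothetical names (FakeArchiver is
not from this patch):

    import unittest


    class FakeArchiver:
        # Illustrative stand-in for the archiver/checker classes, which
        # call their config parser at construction time.
        def __init__(self):
            self.config = self.parse_config_file()

        def parse_config_file(self):
            raise RuntimeError('would read a configuration file from disk')


    class TestConfigOverride(unittest.TestCase):
        def test_override(self):
            # Patch the parser at class level, as the tests above do,
            # so instantiation needs no file on disk.
            FakeArchiver.parse_config_file = lambda self: {
                'retention_policy': 2,
            }
            archiver = FakeArchiver()
            self.assertEqual(archiver.config['retention_policy'], 2)


    if __name__ == '__main__':
        unittest.main()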