swh/storage/tests/test_archiver.py
# The first 10 lines of the file are not shown here; the code below relies
# at least on these standard-library imports:
import json
import os
import tempfile
import time
import unittest
from nose.tools import istest
from nose.plugins.attrib import attr
from swh.core import hashutil
from swh.core.tests.db_testing import DbsTestFixture
from server_testing import ServerTestFixture
from swh.storage.archiver import ArchiverDirector, ArchiverWorker
from swh.objstorage import get_objstorage
from swh.objstorage.exc import ObjNotFoundError
from swh.objstorage.api.server import app

TEST_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata')


@attr('db')
class TestArchiver(DbsTestFixture, ServerTestFixture,
                   unittest.TestCase):
    """ Test the objstorage archiver.
    """
    TEST_DB_NAMES = [
        'softwareheritage-archiver-test',
    ]
    TEST_DB_DUMPS = [
        os.path.join(TEST_DATA_DIR, 'dumps/swh-archiver.dump'),
    ]
    TEST_DB_DUMP_TYPES = [
        'pg_dump',
    ]

    def setUp(self):
        # Launch the backup server
        dest_root = tempfile.mkdtemp(prefix='remote')
        self.config = {
            'storage_base': dest_root,
            'storage_slicing': '0:2/2:4/4:6'
        }
        self.app = app
        super().setUp()

        # Retrieve the connection (depends on the order in TEST_DB_NAMES)
        self.conn = self.conns[0]  # archiver db's connection
        self.cursor = self.cursors[0]

        # Create the source storage
        src_root = tempfile.mkdtemp()
        src_config = {'cls': 'pathslicing',
                      'args': {'root': src_root,
                               'slicing': '0:2/2:4/4:6'}}
        self.src_storage = get_objstorage(**src_config)
        # Create the destination storage
        dest_config = {'cls': 'remote',
                       'args': {'base_url': self.url()}}
        self.dest_storage = get_objstorage(**dest_config)

        # Map the archive identifiers to their storages
        self.storages = {'uffizi': self.src_storage,
                         'banco': self.dest_storage}

        # Create the archiver itself
        src_archiver_conf = {'host': 'uffizi'}
        dest_archiver_conf = {'host': 'banco'}
        src_archiver_conf.update(src_config)
        dest_archiver_conf.update(dest_config)
        self.archiver_storages = [src_archiver_conf, dest_archiver_conf]
        self._override_config()  # Override the default config for the db conn
        self.archiver = self._create_director(
            retention_policy=2,
            storages=self.archiver_storages
        )

        # Initialize and fill the tables.
        self.initialize_tables()
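
    # In these tests, 'uffizi' plays the role of the source archive and
    # 'banco' the backup destination; both names are plain identifiers used
    # as keys in self.storages and in the content_archive copies field.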

    def tearDown(self):
        self.empty_tables()
        super().tearDown()

    def initialize_tables(self):
        """ Initializes the database with a sample of items.
        """
        # Add an archive (update the existing one for technical reasons:
        # altering an enum cannot run in a transaction...)
        self.cursor.execute("""UPDATE archive
                               SET url='{}'
                               WHERE id='banco'
                            """.format(self.url()))
        self.conn.commit()

    def empty_tables(self):
        # Remove all content
        self.cursor.execute('DELETE FROM content_archive')
        self.conn.commit()

    def _override_config(self):
        """ Override the default config of the archiver director and worker
        to allow the tests to use the *-test db instead of the default one,
        as there is no configuration file for now.

        Note that the default config file name is also overridden. If there
        is a file with this name in the configuration directories, the tests'
        behavior may be altered.
        """
        ArchiverWorker.parse_config_file = lambda obj: {
            'retention_policy': 2,
            'archival_max_age': 3600,
            'dbconn': self.conn,
            'storages': self.archiver_storages
        }
        ArchiverDirector.parse_config_file = lambda obj: {
            'dbconn': self.conn,
            'batch_max_size': 5000,
            'archival_max_age': 3600,
            'retention_policy': 2,
            'asynchronous': False
        }
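
    # (Presumably, these two overrides are what make the director and worker
    # built by _create_director/_create_worker below pick up this in-memory
    # test configuration instead of reading a configuration file from disk.)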

    def _create_director(self, storages, batch_size=5000,
                         archival_max_age=3600, retention_policy=2,
                         asynchronous=False):
        config = {
            'dbconn': ('str', self.conn),
            'batch_max_size': ('int', batch_size),
            'archival_max_age': ('int', archival_max_age),
            'retention_policy': ('int', retention_policy),
            'asynchronous': ('bool', asynchronous)
        }
        return ArchiverDirector(config)

    def _create_worker(self, batch={}, retention_policy=2,
                       archival_max_age=3600):
        # olasd (inline review comment): Is there still a point in doing this
        # override?
        ArchiverWorker.DEFAULT_CONFIG = {
            'retention_policy': ('int', retention_policy),
            'archival_max_age': ('int', archival_max_age),
            'dbconn': ('str', self.conn),
            'storages': ('dict', self.archiver_storages)
        }
        aw = ArchiverWorker(batch)
        return aw

    def _add_content(self, storage_name, content_data):
        """ Actually add a content to the given objstorage.

        This inserts a row with an empty copies field for the added content.
        """
        # Add the content to the storage
        obj_id = self.storages[storage_name].add(content_data)
        db_obj_id = r'\x' + hashutil.hash_to_hex(obj_id)
        self.cursor.execute(""" INSERT INTO content_archive
                                VALUES('%s', '{}')
                            """ % (db_obj_id))
        return obj_id

    def _update_status(self, obj_id, storage_name, status, date=None):
        """ Update the db status for the given id/storage_name.

        This does not create the content in the storage.
        """
        db_obj_id = r'\x' + hashutil.hash_to_hex(obj_id)
        self.archiver.archiver_storage.content_archive_update(
            db_obj_id, storage_name, status
        )

    def _add_dated_content(self, obj_id, copies={}):
        """ Fully erase the previous copies field for the given content id.

        This does not alter the contents in the objstorages.
        """
        db_obj_id = r'\x' + hashutil.hash_to_hex(obj_id)
        self.cursor.execute(""" UPDATE content_archive
                                SET copies='%s'
                                WHERE content_id='%s'
                            """ % (json.dumps(copies), db_obj_id))

    # Integration tests

    @istest
    def archive_missing_content(self):
        """ Running the archiver on a missing content should archive it.
        """
        obj_data = b'archive_missing_content'
        obj_id = self._add_content('uffizi', obj_data)
        self._update_status(obj_id, 'uffizi', 'present')
        # Content is missing on banco (entry not present in the db)
        try:
            self.dest_storage.get(obj_id)
        except ObjNotFoundError:
            pass
        else:
            self.fail('Content should not be present before archival')
        self.archiver.run()
        # Now the content should be present on the remote objstorage
        remote_data = self.dest_storage.get(obj_id)
        self.assertEquals(obj_data, remote_data)

    @istest
    def archive_present_content(self):
        """ A content that is not 'missing' shouldn't be archived.
        """
        obj_id = self._add_content('uffizi', b'archive_present_content')
        self._update_status(obj_id, 'uffizi', 'present')
        self._update_status(obj_id, 'banco', 'present')
        # After the run, the content should NOT be in the archive,
        # as the archiver believes it is already there.
        self.archiver.run()
        with self.assertRaises(ObjNotFoundError):
            self.dest_storage.get(obj_id)

    @istest
    def archive_already_enough(self):
        """ A missing content with enough copies shouldn't be archived.
        """
        obj_id = self._add_content('uffizi', b'archive_already_enough')
        self._update_status(obj_id, 'uffizi', 'present')
        director = self._create_director(self.archiver_storages,
                                         retention_policy=1)
        # The object is present in only one archive, but only one copy is
        # required.
        director.run()
        with self.assertRaises(ObjNotFoundError):
            self.dest_storage.get(obj_id)

    # Unit tests for the archiver worker

    def archival_elapsed(self, mtime):
        return self._create_worker()._is_archival_delay_elapsed(mtime)

    @istest
    def vstatus_ongoing_remaining(self):
        self.assertFalse(self.archival_elapsed(time.time()))

    @istest
    def vstatus_ongoing_elapsed(self):
        past_time = (
            time.time() - self._create_worker().archival_max_age
        )
        self.assertTrue(self.archival_elapsed(past_time))

    def _status(self, status, mtime=None):
        """ Get a dict that matches the copies structure.
        """
        return {'status': status, 'mtime': mtime or time.time()}

    @istest
    def need_archival_missing(self):
        """ A content should need archival when it is missing.
        """
        status_copies = {'present': ['uffizi'], 'missing': ['banco']}
        worker = self._create_worker({}, retention_policy=2)
        self.assertEqual(worker._need_archival(status_copies),
                         True)

    @istest
    def need_archival_present(self):
        """ A content present everywhere shouldn't need archival.
        """
        status_copies = {'present': ['uffizi', 'banco']}
        worker = self._create_worker({}, retention_policy=2)
        self.assertEqual(worker._need_archival(status_copies),
                         False)

    def _compute_copies_status(self, status):
        """ A content with the given status should be detected correctly.
        """
        obj_id = self._add_content(
            'banco', b'compute_copies_' + bytes(status, 'utf8'))
        self._update_status(obj_id, 'banco', status)
        worker = self._create_worker()
        self.assertIn('banco', worker._compute_copies(obj_id)[status])

    @istest
    def compute_copies_present(self):
        """ A present content should be detected with the correct status.
        """
        self._compute_copies_status('present')

    @istest
    def compute_copies_missing(self):
        """ A missing content should be detected with the correct status.
        """
        self._compute_copies_status('missing')

    def _get_backups(self, present, missing):
        """ Return the list of (src, dest) pairs chosen for the given present
        and missing archives.
        """
        worker = self._create_worker()
        return list(worker._choose_backup_servers(present, missing))

    @istest
    def choose_backup_servers(self):
        self.assertEqual(len(self._get_backups(['uffizi', 'banco'], [])), 0)
        self.assertEqual(len(self._get_backups(['uffizi'], ['banco'])), 1)
        # Even with more possible destinations, do not take more than the
        # retention_policy requires
        self.assertEqual(
            len(self._get_backups(['uffizi'], ['banco', 's3'])),
            1
        )