Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_archiver.py
Show All 10 Lines | |||||
from nose.tools import istest | from nose.tools import istest | ||||
from nose.plugins.attrib import attr | from nose.plugins.attrib import attr | ||||
from swh.core import hashutil | from swh.core import hashutil | ||||
from swh.core.tests.db_testing import DbsTestFixture | from swh.core.tests.db_testing import DbsTestFixture | ||||
from server_testing import ServerTestFixture | from server_testing import ServerTestFixture | ||||
from swh.storage import Storage | |||||
from swh.storage.archiver import ArchiverDirector, ArchiverWorker | from swh.storage.archiver import ArchiverDirector, ArchiverWorker | ||||
from swh.objstorage import get_objstorage | |||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
from swh.objstorage.api.client import RemoteObjStorage | |||||
from swh.objstorage.api.server import app | from swh.objstorage.api.server import app | ||||
TEST_DIR = os.path.dirname(os.path.abspath(__file__)) | TEST_DIR = os.path.dirname(os.path.abspath(__file__)) | ||||
TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata') | TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata') | ||||
@attr('db') | @attr('db') | ||||
class TestArchiver(DbsTestFixture, ServerTestFixture, | class TestArchiver(DbsTestFixture, ServerTestFixture, | ||||
unittest.TestCase): | unittest.TestCase): | ||||
""" Test the objstorage archiver. | """ Test the objstorage archiver. | ||||
""" | """ | ||||
TEST_DB_NAMES = [ | TEST_DB_NAMES = [ | ||||
'softwareheritage-test', | |||||
'softwareheritage-archiver-test', | 'softwareheritage-archiver-test', | ||||
] | ] | ||||
TEST_DB_DUMPS = [ | TEST_DB_DUMPS = [ | ||||
os.path.join(TEST_DATA_DIR, 'dumps/swh.dump'), | |||||
os.path.join(TEST_DATA_DIR, 'dumps/swh-archiver.dump'), | os.path.join(TEST_DATA_DIR, 'dumps/swh-archiver.dump'), | ||||
] | ] | ||||
TEST_DB_DUMP_TYPES = [ | TEST_DB_DUMP_TYPES = [ | ||||
'pg_dump', | 'pg_dump', | ||||
'pg_dump', | |||||
] | ] | ||||
def setUp(self): | def setUp(self): | ||||
# Launch the backup server | # Launch the backup server | ||||
self.backup_objroot = tempfile.mkdtemp(prefix='remote') | dest_root = tempfile.mkdtemp(prefix='remote') | ||||
self.config = { | self.config = { | ||||
'storage_base': self.backup_objroot, | 'storage_base': dest_root, | ||||
'storage_slicing': '0:2/2:4/4:6' | 'storage_slicing': '0:2/2:4/4:6' | ||||
} | } | ||||
self.app = app | self.app = app | ||||
super().setUp() | super().setUp() | ||||
# Retrieve connection (depends on the order in TEST_DB_NAMES) | # Retrieve connection (depends on the order in TEST_DB_NAMES) | ||||
self.conn_storage = self.conns[0] # db connection to storage | self.conn = self.conns[0] # archiver db's connection | ||||
self.conn = self.conns[1] # archiver db's connection | self.cursor = self.cursors[0] | ||||
self.cursor = self.cursors[1] | |||||
# a reader storage to check content has been archived | # Create source storage | ||||
self.remote_objstorage = RemoteObjStorage(self.url()) | src_root = tempfile.mkdtemp() | ||||
# Create the local storage. | src_config = {'cls': 'pathslicing', | ||||
self.objroot = tempfile.mkdtemp(prefix='local') | 'args': {'root': src_root, | ||||
# a writer storage to store content before archiving | 'slicing': '0:2/2:4/4:6'}} | ||||
self.storage = Storage(self.conn_storage, self.objroot) | self.src_storage = get_objstorage(**src_config) | ||||
# Create destination storage | |||||
dest_config = {'cls': 'remote', | |||||
'args': {'base_url': self.url()}} | |||||
self.dest_storage = get_objstorage(**dest_config) | |||||
# Keep mapped the id to the storages | |||||
self.storages = {'uffizi': self.src_storage, | |||||
'banco': self.dest_storage} | |||||
# Create the archiver itself | |||||
src_archiver_conf = {'host': 'uffizi'} | |||||
dest_archiver_conf = {'host': 'banco'} | |||||
src_archiver_conf.update(src_config) | |||||
dest_archiver_conf.update(dest_config) | |||||
self.archiver_storages = [src_archiver_conf, dest_archiver_conf] | |||||
self.archiver = self._create_director( | |||||
retention_policy=2, | |||||
storages=self.archiver_storages | |||||
) | |||||
# Create a base worker | |||||
self.archiver_worker = self._create_worker() | |||||
# Initializes and fill the tables. | # Initializes and fill the tables. | ||||
self.initialize_tables() | self.initialize_tables() | ||||
# Create the archiver | |||||
self.archiver = self.__create_director() | |||||
self.storage_data = ('banco', 'http://localhost:%s/' % self.port) | |||||
def tearDown(self): | def tearDown(self): | ||||
self.empty_tables() | self.empty_tables() | ||||
super().tearDown() | super().tearDown() | ||||
def initialize_tables(self): | def initialize_tables(self): | ||||
""" Initializes the database with a sample of items. | """ Initializes the database with a sample of items. | ||||
""" | """ | ||||
# Add an archive (update existing one for technical reason, | # Add an archive (update existing one for technical reason, | ||||
# altering enum cannot run in a transaction...) | # altering enum cannot run in a transaction...) | ||||
self.cursor.execute("""UPDATE archive | self.cursor.execute("""UPDATE archive | ||||
SET url='{}' | SET url='{}' | ||||
WHERE id='banco' | WHERE id='banco' | ||||
""".format(self.url())) | """.format(self.url())) | ||||
self.conn.commit() | self.conn.commit() | ||||
def empty_tables(self): | def empty_tables(self): | ||||
# Remove all content | # Remove all content | ||||
self.cursor.execute('DELETE FROM content_archive') | self.cursor.execute('DELETE FROM content_archive') | ||||
self.conn.commit() | self.conn.commit() | ||||
def __add_content(self, content_data, status='missing', date=None): | def _create_director(self, storages, batch_size=5000, | ||||
archival_max_age=3600, retention_policy=2, | |||||
asynchronous=False): | |||||
config = { | |||||
'storages': storages, | |||||
'batch_max_size': batch_size, | |||||
'archival_max_age': archival_max_age, | |||||
'retention_policy': retention_policy, | |||||
'asynchronous': asynchronous | |||||
} | |||||
return ArchiverDirector(self.conn, config) | |||||
def _add_content(self, storage_name, content_data): | |||||
""" Add really a content to the given objstorage | |||||
olasd: There's a typo here | |||||
This put an empty status for the added content. | |||||
""" | |||||
# Add the content to the storage | # Add the content to the storage | ||||
Done Inline ActionsIs there still a point in doing this override? olasd: Is there still a point in doing this override? | |||||
content = hashutil.hashdata(content_data) | obj_id = self.storages[storage_name].add(content_data) | ||||
content.update({'data': content_data}) | db_obj_id = r'\x' + hashutil.hash_to_hex(obj_id) | ||||
self.storage.content_add([content]) | |||||
# Then update database | |||||
content_id = r'\x' + hashutil.hash_to_hex(content['sha1']) | |||||
copies = {'banco': { | |||||
'status': status, | |||||
'mtime': date or int(time.time()) # if date is None, use now() | |||||
}} | |||||
self.cursor.execute("""INSERT INTO content_archive | self.cursor.execute(""" INSERT INTO content_archive | ||||
VALUES('%s'::sha1, '%s') | VALUES('%s', '{}') | ||||
""" % (content_id, json.dumps(copies))) | """ % (db_obj_id)) | ||||
return content['sha1'] | return obj_id | ||||
def __get_missing(self): | |||||
self.cursor.execute("""SELECT content_id | |||||
FROM content_archive | |||||
WHERE status='missing'""") | |||||
return self.cursor.fetchall() | |||||
def __create_director(self, batch_size=5000, archival_max_age=3600, | def _update_status(self, obj_id, storage_name, status, date=None): | ||||
retention_policy=1, asynchronous=False): | """ Update the db status for the given id/storage_name. | ||||
config = { | |||||
'objstorage_type': 'local_objstorage', | |||||
'objstorage_path': self.objroot, | |||||
'objstorage_slicing': '0:2/2:4/4:6', | |||||
'batch_max_size': batch_size, | This does not create the content in the storage. | ||||
'archival_max_age': archival_max_age, | """ | ||||
db_obj_id = r'\x' + hashutil.hash_to_hex(obj_id) | |||||
self.archiver.archiver_storage.content_archive_update( | |||||
db_obj_id, storage_name, status | |||||
) | |||||
def _add_dated_content(self, obj_id, copies={}): | |||||
""" Fully erase the previous copies field for the given content id | |||||
This does not alter the contents into the objstorages. | |||||
""" | |||||
db_obj_id = r'\x' + hashutil.hash_to_hex(obj_id) | |||||
self.cursor.execute(""" UPDATE TABLE content_archive | |||||
SET copies='%s' | |||||
WHERE content_id='%s' | |||||
""" % (json.dumps(copies), db_obj_id)) | |||||
def _create_worker(self, batch={}, retention_policy=2, | |||||
archival_max_age=3600): | |||||
archival_policy = { | |||||
'retention_policy': retention_policy, | 'retention_policy': retention_policy, | ||||
'asynchronous': asynchronous # Avoid depending on queue for tests. | 'archival_max_age': archival_max_age | ||||
} | } | ||||
director = ArchiverDirector(db_conn_archiver=self.conn, | return ArchiverWorker(batch, self.archiver_storages, | ||||
config=config) | self.conn, archival_policy) | ||||
return director | |||||
def __create_worker(self, batch={}, config={}): | |||||
mobjstorage_args = self.archiver.master_objstorage_args | |||||
if not config: | |||||
config = self.archiver.config | |||||
return ArchiverWorker(batch, | |||||
archiver_args=self.conn, | |||||
master_objstorage_args=mobjstorage_args, | |||||
slave_objstorages=[self.storage_data], | |||||
config=config) | |||||
# Integration test | # Integration test | ||||
@istest | @istest | ||||
def archive_missing_content(self): | def archive_missing_content(self): | ||||
""" Run archiver on a missing content should archive it. | """ Run archiver on a missing content should archive it. | ||||
""" | """ | ||||
content_data = b'archive_missing_content' | obj_data = b'archive_missing_content' | ||||
content_id = self.__add_content(content_data) | obj_id = self._add_content('uffizi', obj_data) | ||||
# before, the content should not be there | self._update_status(obj_id, 'uffizi', 'present') | ||||
# Content is missing on banco (entry not present in the db) | |||||
try: | try: | ||||
self.remote_objstorage.get(content_id) | self.dest_storage.get(obj_id) | ||||
except ObjNotFoundError: | except ObjNotFoundError: | ||||
pass | pass | ||||
else: | else: | ||||
self.fail('Content should not be present before archival') | self.fail('Content should not be present before archival') | ||||
self.archiver.run() | self.archiver.run() | ||||
# now the content should be present on remote objstorage | # now the content should be present on remote objstorage | ||||
remote_data = self.remote_objstorage.content_get(content_id) | remote_data = self.dest_storage.get(obj_id) | ||||
self.assertEquals(content_data, remote_data) | self.assertEquals(obj_data, remote_data) | ||||
@istest | @istest | ||||
def archive_present_content(self): | def archive_present_content(self): | ||||
""" A content that is not 'missing' shouldn't be archived. | """ A content that is not 'missing' shouldn't be archived. | ||||
""" | """ | ||||
id = self.__add_content(b'archive_present_content', status='present') | obj_id = self._add_content('uffizi', b'archive_present_content') | ||||
# After the run, the content should NOT be in the archive.* | self._update_status(obj_id, 'uffizi', 'present') | ||||
self._update_status(obj_id, 'banco', 'present') | |||||
# After the run, the content should NOT be in the archive. | |||||
# As the archiver believe it was already in. | |||||
self.archiver.run() | self.archiver.run() | ||||
with self.assertRaises(ObjNotFoundError): | with self.assertRaises(ObjNotFoundError): | ||||
self.remote_objstorage.get(id) | self.dest_storage.get(obj_id) | ||||
@istest | @istest | ||||
def archive_already_enough(self): | def archive_already_enough(self): | ||||
""" A content missing with enough copies shouldn't be archived. | """ A content missing with enough copies shouldn't be archived. | ||||
""" | """ | ||||
id = self.__add_content(b'archive_alread_enough') | obj_id = self._add_content('uffizi', b'archive_alread_enough') | ||||
director = self.__create_director(retention_policy=0) | self._update_status(obj_id, 'uffizi', 'present') | ||||
director = self._create_director(self.archiver_storages, | |||||
retention_policy=1) | |||||
# Obj is present in only one archive but only one copy is required. | |||||
director.run() | director.run() | ||||
with self.assertRaises(ObjNotFoundError): | with self.assertRaises(ObjNotFoundError): | ||||
self.remote_objstorage.get(id) | self.dest_storage.get(obj_id) | ||||
# Unit test for ArchiverDirector | # Unit tests for archive worker | ||||
def vstatus(self, status, mtime): | def vstatus(self, status, mtime): | ||||
return self.archiver.get_virtual_status(status, mtime) | return self.archiver_worker._get_virtual_status(status, mtime) | ||||
@istest | @istest | ||||
def vstatus_present(self): | def vstatus_present(self): | ||||
self.assertEquals( | self.assertEquals( | ||||
self.vstatus('present', None), | self.vstatus('present', None), | ||||
'present' | 'present' | ||||
) | ) | ||||
@istest | @istest | ||||
def vstatus_missing(self): | def vstatus_missing(self): | ||||
self.assertEquals( | self.assertEquals( | ||||
self.vstatus('missing', None), | self.vstatus('missing', None), | ||||
'missing' | 'missing' | ||||
) | ) | ||||
@istest | @istest | ||||
def vstatus_ongoing_remaining(self): | def vstatus_ongoing_remaining(self): | ||||
self.assertEquals( | self.assertEquals( | ||||
self.vstatus('ongoing', int(time.time())), | self.vstatus('ongoing', time.time()), | ||||
'present' | 'present' | ||||
) | ) | ||||
@istest | @istest | ||||
def vstatus_ongoing_elapsed(self): | def vstatus_ongoing_elapsed(self): | ||||
past_time = ( | past_time = ( | ||||
int(time.time()) - self.archiver.config['archival_max_age'] - 1 | time.time() - self.archiver_worker.archival_policy[ | ||||
'archival_max_age' | |||||
] - 1 | |||||
) | ) | ||||
self.assertEquals( | self.assertEquals( | ||||
self.vstatus('ongoing', past_time), | self.vstatus('ongoing', past_time), | ||||
'missing' | 'missing' | ||||
) | ) | ||||
# Unit tests for archive worker | def _status(self, status, mtime=None): | ||||
""" Get a dict that match the copies structure | |||||
""" | |||||
return {'status': status, 'mtime': mtime or time.time()} | |||||
@istest | @istest | ||||
def need_archival_missing(self): | def need_archival_missing(self): | ||||
""" A content should still need archival when it is missing. | """ A content should need archival when it is missing. | ||||
""" | """ | ||||
id = self.__add_content(b'need_archival_missing', status='missing') | status_copies = {'present': ['uffizi'], 'missing': ['banco']} | ||||
id = r'\x' + hashutil.hash_to_hex(id) | worker = self._create_worker({}, retention_policy=2) | ||||
worker = self.__create_worker() | self.assertEqual(worker._need_archival(status_copies), | ||||
self.assertEqual(worker.need_archival(id, self.storage_data), True) | True) | ||||
@istest | @istest | ||||
def need_archival_present(self): | def need_archival_present(self): | ||||
""" A content should still need archival when it is missing | """ A content present everywhere shouldn't need archival | ||||
""" | |||||
status_copies = {'present': ['uffizi', 'banco']} | |||||
worker = self._create_worker({}, retention_policy=2) | |||||
self.assertEqual(worker._need_archival(status_copies), | |||||
False) | |||||
def _compute_copies_status(self, status): | |||||
""" A content with a given status should be detected correctly | |||||
""" | """ | ||||
id = self.__add_content(b'need_archival_missing', status='present') | obj_id = self._add_content( | ||||
id = r'\x' + hashutil.hash_to_hex(id) | 'banco', b'compute_copies_' + bytes(status, 'utf8')) | ||||
worker = self.__create_worker() | self._update_status(obj_id, 'banco', status) | ||||
self.assertEqual(worker.need_archival(id, self.storage_data), False) | worker = self._create_worker() | ||||
self.assertIn('banco', worker._compute_copies(obj_id)[status]) | |||||
@istest | @istest | ||||
def need_archival_ongoing_remaining(self): | def compute_copies_present(self): | ||||
""" An ongoing archival with remaining time shouldnt need archival. | """ A present content should be detected with correct status | ||||
""" | """ | ||||
id = self.__add_content(b'need_archival_ongoing_remaining', | self._compute_copies_status('present') | ||||
status='ongoing') | |||||
id = r'\x' + hashutil.hash_to_hex(id) | |||||
worker = self.__create_worker() | |||||
self.assertEqual(worker.need_archival(id, self.storage_data), False) | |||||
@istest | @istest | ||||
def need_archival_ongoing_elasped(self): | def compute_copies_missing(self): | ||||
""" An ongoing archival with elapsed time should be scheduled again. | """ A missing content should be detected with correct status | ||||
""" | """ | ||||
id = self.__add_content( | self._compute_copies_status('missing') | ||||
b'archive_ongoing_elapsed', | |||||
status='ongoing', | def _get_backups(self, present, missing): | ||||
date=( | """ Return a list of the pair src/dest from the present and missing | ||||
int(time.time()) - self.archiver.config['archival_max_age'] - 1 | """ | ||||
) | worker = self._create_worker() | ||||
return list(worker._choose_backup_servers(present, missing)) | |||||
@istest | |||||
def choose_backup_servers(self): | |||||
self.assertEqual(len(self._get_backups(['uffizi', 'banco'], [])), 0) | |||||
self.assertEqual(len(self._get_backups(['uffizi'], ['banco'])), 1) | |||||
# Even with more possible destinations, do not take more than the | |||||
# retention_policy require | |||||
self.assertEqual( | |||||
len(self._get_backups(['uffizi'], ['banco', 's3'])), | |||||
1 | |||||
) | ) | ||||
id = r'\x' + hashutil.hash_to_hex(id) | |||||
worker = self.__create_worker() | |||||
self.assertEqual(worker.need_archival(id, self.storage_data), True) | |||||
@istest | |||||
def content_sorting_by_archiver(self): | |||||
""" Check that the content is correctly sorted. | |||||
""" | |||||
batch = { | |||||
'id1': { | |||||
'present': [('slave1', 'slave1_url')], | |||||
'missing': [] | |||||
}, | |||||
'id2': { | |||||
'present': [], | |||||
'missing': [('slave1', 'slave1_url')] | |||||
} | |||||
} | |||||
worker = self.__create_worker(batch=batch) | |||||
mapping = worker.sort_content_by_archive() | |||||
self.assertNotIn('id1', mapping[('slave1', 'slave1_url')]) | |||||
self.assertIn('id2', mapping[('slave1', 'slave1_url')]) |
There's a typo here