Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_archiver.py
Show All 11 Lines | |||||
from datetime import datetime, timedelta | from datetime import datetime, timedelta | ||||
from swh.core import hashutil | from swh.core import hashutil | ||||
from swh.core.tests.db_testing import DbTestFixture | from swh.core.tests.db_testing import DbTestFixture | ||||
from server_testing import ServerTestFixture | from server_testing import ServerTestFixture | ||||
from swh.storage import Storage | from swh.storage import Storage | ||||
from swh.storage.exc import ObjNotFoundError | from swh.storage.exc import ObjNotFoundError | ||||
from swh.storage.archiver import ArchiverDirector | from swh.storage.archiver import ArchiverDirector, ArchiverWorker | ||||
from swh.storage.objstorage.api.client import RemoteObjStorage | from swh.storage.objstorage.api.client import RemoteObjStorage | ||||
from swh.storage.objstorage.api.server import app | from swh.storage.objstorage.api.server import app | ||||
TEST_DIR = os.path.dirname(os.path.abspath(__file__)) | TEST_DIR = os.path.dirname(os.path.abspath(__file__)) | ||||
TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata') | TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata') | ||||
@attr('db') | @attr('db') | ||||
class TestArchiver(DbTestFixture, ServerTestFixture, | class TestArchiver(DbTestFixture, ServerTestFixture, | ||||
unittest.TestCase): | unittest.TestCase): | ||||
""" Test the objstorage archiver. | """ Test the objstorage archiver. | ||||
""" | """ | ||||
TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh.dump') | TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh.dump') | ||||
def setUp(self): | def setUp(self): | ||||
# Launch the backup server | # Launch the backup server | ||||
self.backup_objroot = tempfile.mkdtemp() | self.backup_objroot = tempfile.mkdtemp(prefix='remote') | ||||
self.config = {'storage_base': self.backup_objroot, | self.config = {'storage_base': self.backup_objroot, | ||||
'storage_depth': 3} | 'storage_depth': 3} | ||||
self.app = app | self.app = app | ||||
super().setUp() | super().setUp() | ||||
# Launch a client to check objects presence | # Launch a client to check objects presence | ||||
print("url", self.url()) | print("url", self.url()) | ||||
self.remote_objstorage = RemoteObjStorage(self.url()) | self.remote_objstorage = RemoteObjStorage(self.url()) | ||||
# Create the local storage. | # Create the local storage. | ||||
self.objroot = tempfile.mkdtemp() | self.objroot = tempfile.mkdtemp(prefix='local') | ||||
self.storage = Storage(self.conn, self.objroot) | self.storage = Storage(self.conn, self.objroot) | ||||
# Initializes and fill the tables. | # Initializes and fill the tables. | ||||
self.initialize_tables() | self.initialize_tables() | ||||
# Create the archiver | # Create the archiver | ||||
self.archiver = self.__create_director() | self.archiver = self.__create_director() | ||||
self.storage_data = ('Local', 'http://localhost:%s/' % self.port) | |||||
    def tearDown(self):
        """Empty the test tables, then run the parent fixtures' teardown
        (server shutdown and database cleanup)."""
        self.empty_tables()
        super().tearDown()
def initialize_tables(self): | def initialize_tables(self): | ||||
""" Initializes the database with a sample of items. | """ Initializes the database with a sample of items. | ||||
""" | """ | ||||
# Add an archive | # Add an archive | ||||
Show All 33 Lines | def __create_director(self, batch_size=5000, archival_max_age=3600, | ||||
'batch_max_size': batch_size, | 'batch_max_size': batch_size, | ||||
'archival_max_age': archival_max_age, | 'archival_max_age': archival_max_age, | ||||
'retention_policy': retention_policy, | 'retention_policy': retention_policy, | ||||
'asynchronous': asynchronous # Avoid depending on queue for tests. | 'asynchronous': asynchronous # Avoid depending on queue for tests. | ||||
} | } | ||||
director = ArchiverDirector(self.conn, config) | director = ArchiverDirector(self.conn, config) | ||||
return director | return director | ||||
def __create_worker(self, batch={}, config={}): | |||||
mstorage_args = [self.archiver.master_storage.db.conn, | |||||
self.objroot] | |||||
slaves = [self.storage_data] | |||||
if not config: | |||||
config = self.archiver.config | |||||
return ArchiverWorker(batch, mstorage_args, slaves, config) | |||||
    # Integration test

    @istest
    def archive_missing_content(self):
        """ Run archiver on a missing content should archive it.
        """
        content_data = b'archive_missing_content'
        # Register a content flagged as 'missing' in the archiver tables.
        id = self.__add_content(content_data)
        # After the run, the content should be in the archive.
        self.archiver.run()
        remote_data = self.remote_objstorage.content_get(id)
        self.assertEquals(content_data, remote_data)
    @istest
    def archive_present_content(self):
        """ A content that is not 'missing' shouldn't be archived.
        """
        id = self.__add_content(b'archive_present_content', status='present')
        # After the run, the content should NOT be in the archive.
        self.archiver.run()
        # Fetching it from the backup server must therefore fail.
        with self.assertRaises(ObjNotFoundError):
            self.remote_objstorage.content_get(id)
    @istest
    def archive_already_enough(self):
        """ A content missing with enough copies shouldn't be archived.
        """
        id = self.__add_content(b'archive_alread_enough')
        # With a retention policy of 0, no additional copy is required.
        director = self.__create_director(retention_policy=0)
        director.run()
        # The content must not have been sent to the backup server.
        with self.assertRaises(ObjNotFoundError):
            self.remote_objstorage.content_get(id)
    # Unit test for ArchiverDirector

    def vstatus(self, status, mtime):
        """ Shortcut computing the director's virtual status of a copy."""
        return self.archiver.get_virtual_status(status, mtime)

    @istest
    def vstatus_present(self):
        # A 'present' copy stays 'present' regardless of its mtime.
        self.assertEquals(
            self.vstatus('present', None),
            'present'
        )

    @istest
    def vstatus_missing(self):
        # A 'missing' copy stays 'missing'.
        self.assertEquals(
            self.vstatus('missing', None),
            'missing'
        )

    @istest
    def vstatus_ongoing_remaining(self):
        # An ongoing archival younger than archival_max_age is treated
        # as 'present': no rescheduling needed yet.
        current_time = datetime.now()
        self.assertEquals(
            self.vstatus('ongoing', current_time),
            'present'
        )

    @istest
    def vstatus_ongoing_elapsed(self):
        # An ongoing archival older than archival_max_age is treated as
        # 'missing' again, so it gets rescheduled.
        past_time = datetime.now() - timedelta(
            seconds=self.archiver.config['archival_max_age'] + 1
        )
        self.assertEquals(
            self.vstatus('ongoing', past_time),
            'missing'
        )
    # Unit tests for archive worker

    @istest
    def need_archival_missing(self):
        """ A content should still need archival when it is missing.
        """
        id = self.__add_content(b'need_archival_missing', status='missing')
        # The worker API expects the hex id prefixed with a literal '\x'.
        id = r'\x' + hashutil.hash_to_hex(id)
        worker = self.__create_worker()
        self.assertEqual(worker.need_archival(id, self.storage_data), True)

    @istest
    def need_archival_present(self):
        """ A content that is already present shouldn't need archival.
        """
        id = self.__add_content(b'need_archival_missing', status='present')
        id = r'\x' + hashutil.hash_to_hex(id)
        worker = self.__create_worker()
        self.assertEqual(worker.need_archival(id, self.storage_data), False)

    @istest
    def need_archival_ongoing_remaining(self):
        """ An ongoing archival with remaining time shouldn't need archival.
        """
        id = self.__add_content(b'need_archival_ongoing_remaining',
                                status='ongoing', date="'%s'" % datetime.now())
        id = r'\x' + hashutil.hash_to_hex(id)
        worker = self.__create_worker()
        self.assertEqual(worker.need_archival(id, self.storage_data), False)
@istest | @istest | ||||
def archive_ongoing_elapsed(self): | def need_archival_ongoing_elasped(self): | ||||
""" A content that is ongoing but with elapsed time should | """ An ongoing archival with elapsed time should be scheduled again. | ||||
be rescheduled. | |||||
""" | """ | ||||
# Create an ongoing archive content with time elapsed by 1s. | |||||
id = self.__add_content( | id = self.__add_content( | ||||
b'archive_ongoing_elapsed', | b'archive_ongoing_elapsed', | ||||
status='ongoing', | status='ongoing', | ||||
date="'%s'" % (datetime.now() - timedelta( | date="'%s'" % (datetime.now() - timedelta( | ||||
seconds=self.archiver.archival_max_age + 1 | seconds=self.archiver.config['archival_max_age'] + 1 | ||||
)) | )) | ||||
) | ) | ||||
items = [x for batch in self.archiver.get_unarchived_content() | |||||
for x in batch] | |||||
id = r'\x' + hashutil.hash_to_hex(id) | id = r'\x' + hashutil.hash_to_hex(id) | ||||
self.assertIn(id, items) | worker = self.__create_worker() | ||||
self.assertEqual(worker.need_archival(id, self.storage_data), True) | |||||
    @istest
    def content_sorting_by_archiver(self):
        """ Check that the content is correctly sorted.
        """
        # id1 is already present on slave1; id2 is missing from it.
        batch = {
            'id1': {
                'present': [('slave1', 'slave1_url')],
                'missing': []
            },
            'id2': {
                'present': [],
                'missing': [('slave1', 'slave1_url')]
            }
        }
        worker = self.__create_worker(batch=batch)
        mapping = worker.sort_content_by_archive()
        # Only the missing content (id2) must be scheduled for slave1.
        self.assertNotIn('id1', mapping[('slave1', 'slave1_url')])
        self.assertIn('id2', mapping[('slave1', 'slave1_url')])
Reviewer note (ardumont): typo — "alread enough" should read "already enough". (In Emacs, C-t just before the offending character transposes it with the preceding one.)