diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py --- a/swh/storage/archiver/director.py +++ b/swh/storage/archiver/director.py @@ -4,13 +4,13 @@ # See top-level LICENSE file for more information import swh +import logging from datetime import datetime -from swh.core import hashutil, config +from swh.core import hashutil from swh.scheduler.celery_backend.config import app from . import tasks # NOQA -from ..db import cursor_to_bytes DEFAULT_CONFIG = { @@ -26,6 +26,8 @@ task_name = 'swh.storage.archiver.tasks.SWHArchiverTask' +logger = logging.getLogger() + class ArchiverDirector(): """Process the files in order to know which one is needed as backup. @@ -38,12 +40,18 @@ master_storage: the local storage of the master server. slave_storages: Iterable of remote obj storages to the slaves servers used for backup. - batch_max_size: The number of content items that can be given - to the same archiver worker. - archival_max_age: Delay given to the worker to copy all the files - in a given batch. - retention_policy: Required number of copies for the content to - be considered safe. + config: Archiver_configuration. A dictionary that must contain + the following keys. + objstorage_path (string): the path of the objstorage of the + master. + batch_max_size (int): The number of content items that can be + given to the same archiver worker. + archival_max_age (int): Delay given to the worker to copy all + the files in a given batch. + retention_policy (int): Required number of copies for the + content to be considered safe. + asynchronous (boolean): Indicate whenever the archival should + run in asynchronous mode or not. """ def __init__(self, db_conn, config): @@ -52,13 +60,13 @@ Args: db_conn: db_conn: Either a libpq connection string, or a psycopg2 connection. - config: Archiver_configuration. A dictionnary that must contains + config: Archiver_configuration. A dictionary that must contains the following keys. objstorage_path (string): the path of the objstorage of the master. batch_max_size (int): The number of content items that can be given to the same archiver worker. - archival_max_age (int): Delay given to the worker to cpy all + archival_max_age (int): Delay given to the worker to copy all the files in a given batch. retention_policy (int): Required number of copies for the content to be considered safe. @@ -81,10 +89,7 @@ self.master_storage = master_storage self.slave_storages = slaves - self.batch_max_size = config['batch_max_size'] - self.archival_max_age = config['archival_max_age'] - self.retention_policy = config['retention_policy'] - self.is_asynchronous = config['asynchronous'] + self.config = config def run(self): """ Run the archiver director. @@ -92,9 +97,10 @@ The archiver director will check all the contents of the archiver database and do the required backup jobs. """ - run_fn = (self.run_async_worker - if self.is_asynchronous - else self.run_sync_worker) + if self.config['asynchronous']: + run_fn = self.run_async_worker + else: + run_fn = self.run_sync_worker for batch in self.get_unarchived_content(): run_fn(batch) @@ -103,20 +109,20 @@ """ task = app.tasks[task_name] task.delay(batch, self.master_storage_args, - self.slave_storages, self.retention_policy) + self.slave_storages, self.config['retention_policy']) def run_sync_worker(self, batch): """ Run synchronously a worker on the given batch. 
""" task = app.tasks[task_name] task(batch, self.master_storage_args, - self.slave_storages, self.retention_policy) + self.slave_storages, self.config) def get_unarchived_content(self): """ get all the contents that needs to be archived. Yields: - A batch of contents. Batches are dictionnaries which associates + A batch of contents. Batches are dictionaries which associates a content id to the data about servers that contains it or not. {'id1': @@ -130,7 +136,7 @@ ('slave1', 'slave1_url'), ('slave2', 'slave2_url'), ('slave3', 'slave3_url') - ] + ]} } Where keys (idX) are sha1 of the content and (slaveX, slaveX_url) @@ -142,103 +148,71 @@ # Get the data about each content referenced into the archiver. missing_copy = {} for content_id in self.master_storage.db.content_archive_ls(): - # Do some initializations db_content_id = '\\x' + hashutil.hash_to_hex(content_id[0]) - # Query in order to know in which servers the content is saved. + # Fetch the datas about archival status of the content backups = self.master_storage.db.content_archive_get( content=db_content_id ) for _content_id, server_id, status, mtime in backups: - - # If the content is ongoing but still have time, there is - # another worker working on this content. - if status == 'ongoing': - mtime = mtime.replace(tzinfo=None) - elapsed = (datetime.now() - mtime).total_seconds() - if elapsed < self.archival_max_age: - continue + virtual_status = self.get_virtual_status(status, mtime) server_data = (server_id, self.slave_storages[server_id]) + missing_copy.setdefault( db_content_id, {'present': [], 'missing': []} - ).setdefault(status, []).append(server_data) + ).setdefault(virtual_status, []).append(server_data) # Check the content before archival. - # TODO catch exception and try to restore the file from an - # archive? - self.master_storage.objstorage.check(content_id[0]) - - if len(missing_copy) >= self.batch_max_size: + try: + self.master_storage.objstorage.check(content_id[0]) + except Exception as e: + # Exception can be Error or ObjNotFoundError. + logger.error(e) + # TODO Do something to restore the content? + + if len(missing_copy) >= self.config['batch_max_size']: yield missing_copy missing_copy = {} if len(missing_copy) > 0: yield missing_copy + def get_virtual_status(self, status, mtime): + """ Compute the virtual presence of a content. -def initialize_content_archive(db, sample_size, names=['Local']): - """ Initialize the content_archive table with a sample. - - From the content table, get a sample of id, and fill the - content_archive table with those id in order to create a test sample - for the archiver. - - Args: - db: The database of the storage. - sample_size (int): The size of the sample to create. - names: A list of archive names. Those archives must already exists. - Archival status of the archives content will be erased on db. - - Returns: - Tha amount of entry created. - """ - with db.transaction() as cur: - cur.execute('DELETE FROM content_archive') - - with db.transaction() as cur: - cur.execute('SELECT sha1 from content limit %d' % sample_size) - ids = list(cursor_to_bytes(cur)) - - for id, in ids: - tid = r'\x' + hashutil.hash_to_hex(id) + If the status is ongoing but the time is not elasped, the archiver + consider it will be present in the futur, and so consider it as + present. + However, if the time is elasped, the copy may have failed, so consider + the content as missing. 
- with db.transaction() as cur: - for name in names: - s = """INSERT INTO content_archive - VALUES('%s'::sha1, '%s', 'missing', now()) - """ % (tid, name) - cur.execute(s) + Arguments: + status (string): One of ('present', 'missing', 'ongoing'). The + status of the content. + mtime (datetime): Time at which the content have been updated for + the last time. - print('Initialized database with', sample_size * len(names), 'items') - return sample_size * len(names) + Returns: + The virtual status of the studied content, which is 'present' or + 'missing'. - -def add_content_to_objstore(director, source, content_ids): - """ Fill the objstore according to the database - - Get the current status of the database and fill the objstorage of the - master storage according to these data. - Content are fetched from the source, which is a storage. - - Args: - director (ArchiverDirector): The archiver director containing - the master storage to fill. - source (Storage): A storage that contains the content for all the - ids in content_ids. - content_ids: A list of ids that should be added to the master object - storage. - """ - for res in source.content_get(content_ids): - content_data = res['data'] - director.master_storage.objstorage.add_bytes(content_data) - - -if __name__ == '__main__': - import sys - - conf = config.read(sys.argv[1], DEFAULT_CONFIG) - cstring = 'dbname={} user={}'.format(conf['dbname'], conf['user']) - - director = ArchiverDirector(cstring, conf) - director.run() + Raises: + ValueError: if the status is not one 'present', 'missing' + or 'ongoing' + """ + if status in ('present', 'missing'): + return status + + # If the status is 'ongoing' but there is still time, another worker + # may still be on the task. + if status == 'ongoing': + mtime = mtime.replace(tzinfo=None) + elapsed = (datetime.now() - mtime).total_seconds() + if elapsed <= self.config['archival_max_age']: + return 'present' + else: + return 'missing' + else: + raise ValueError("status must be either 'present', 'missing' " + "or 'ongoing'") diff --git a/swh/storage/archiver/tasks.py b/swh/storage/archiver/tasks.py --- a/swh/storage/archiver/tasks.py +++ b/swh/storage/archiver/tasks.py @@ -13,8 +13,8 @@ task_queue = 'swh_storage_archive_worker' def run(self, batch, master_storage_args, - slave_storages, retention_policy): + slave_storages, config): aw = ArchiverWorker(batch, master_storage_args, - slave_storages, retention_policy) + slave_storages, config) if aw.run(): self.log("Successful backup for a batch of size %s" % len(batch)) diff --git a/swh/storage/archiver/worker.py b/swh/storage/archiver/worker.py --- a/swh/storage/archiver/worker.py +++ b/swh/storage/archiver/worker.py @@ -4,14 +4,17 @@ # See top-level LICENSE file for more information import random +import logging from .copier import ArchiverCopier from .. import get_storage from datetime import datetime +logger = logging.getLogger() -class ArchiverWorker(): # This class should probably extend a Celery Task. + +class ArchiverWorker(): """ Do the required backups on a given batch of contents. Process the content of a content batch in order to do the needed backups on @@ -23,29 +26,49 @@ the content is present or missing (see ArchiverDirector::get_unarchived_content). master_storage_args: The connection argument to initialize the - master storage where is the content location. + master storage with the db connection url & the object storage + path. slave_storages: A map that associates server_id to the remote server. 
- retention_policy: The required number of copies for a content to be - considered safe. + config: Archiver_configuration. A dictionary that must contains + the following keys. + objstorage_path (string): the path of the objstorage of the + master. + batch_max_size (int): The number of content items that can be + given to the same archiver worker. + archival_max_age (int): Delay given to the worker to copy all + the files in a given batch. + retention_policy (int): Required number of copies for the + content to be considered safe. + asynchronous (boolean): Indicate whenever the archival should + run in asynchronous mode or not. """ - def __init__(self, batch, master_storage_args, - slave_storages, retention_policy): + def __init__(self, batch, master_storage_args, slave_storages, config): """ Constructor of the ArchiverWorker class. Args: - batch: A batch of content, which is a dictionnary that associates + batch: A batch of content, which is a dictionary that associates a content's sha1 id to the list of servers where the content is present. - master_storage: The master storage where is the whole content. + master_storage_args: The master storage arguments. slave_storages: A map that associates server_id to the remote server. - retention_policy: The required number of copies for a content to - be considered safe. + config: Archiver_configuration. A dictionary that must contains + the following keys. + objstorage_path (string): the path of the objstorage of the + master. + batch_max_size (int): The number of content items that can be + given to the same archiver worker. + archival_max_age (int): Delay given to the worker to copy all + the files in a given batch. + retention_policy (int): Required number of copies for the + content to be considered safe. + asynchronous (boolean): Indicate whenever the archival should + run in asynchronous mode or not. """ self.batch = batch self.master_storage = get_storage('local_storage', master_storage_args) self.slave_storages = slave_storages - self.retention_policy = retention_policy + self.config = config def __choose_backup_servers(self, allowed_storage, backup_number): """ Choose the slave servers for archival. @@ -71,6 +94,15 @@ return random.sample(allowed_storage, backup_number) def __get_archival_status(self, content_id, server): + """ Get the archival status of the required content. + + Attributes: + content_id (string): Sha1 of the content. + server: Tuple (archive_id, archive_url) of the archive server. + Returns: + A dictionary that contains all the required data : 'content_id', + 'archive_id', 'status', and 'mtime' + """ t, = list( self.master_storage.db.content_archive_get(content_id, server[0]) ) @@ -95,90 +127,113 @@ status will replace the previous one. If not given, the function only changes the mtime of the content. """ - query = """UPDATE content_archive - SET %(fields)s - WHERE content_id='%(content_id)s' - and archive_id='%(archive_id)s' - """ - fields = [] - if new_status: - fields.append("status='%s'" % new_status) - fields.append("mtime=now()") - - d = {'fields': ', '.join(fields), - 'content_id': content_id, - 'archive_id': archive_id} - - with self.master_storage.db.transaction() as cur: - cur.execute(query % d) - - def run(self): - """ Do the task expected from the archiver worker. - - Process the content in the batch, ensure that the elements still need - an archival, and spawn copiers to copy files in each destinations. 
- """ + self.master_storage.db.content_archive_update( + content_id, + archive_id, + new_status + ) - def content_filter(content, destination): - """ Indicates whenever a content need archivage. + def need_archival(self, content, destination): + """ Indicates whenever a content need archivage. - Filter function that returns True if a given content - still require to be archived. + Filter function that returns True if a given content + still require to be archived. - Args: - content (str): - """ - archival_status = self.__get_archival_status( - content, - destination - ) - if archival_status: - status = archival_status['status'] - # If the archive is already present, no need to backup. - if status == 'present': - return False - # If the content is ongoing but still have time, there is - # another worker working on this content. - elif status == 'ongoing': - elapsed = datetime.now() - archival_status['mtime']\ - .total_seconds() - if elapsed < self.master_storage.archival_max_age: - return False - return True - else: - # TODO this is an error case, the content should always exists. - return None + Args: + content (str): Sha1 of a content. + destination: Tuple (archive id, archive url). + """ + archival_status = self.__get_archival_status( + content, + destination + ) + status = archival_status['status'] + mtime = archival_status['mtime'] + # If the archive is already present, no need to backup. + if status == 'present': + return False + # If the content is ongoing but still have time, there is + # another worker working on this content. + elif status == 'ongoing': + mtime = mtime.replace(tzinfo=None) + elapsed = (datetime.now() - mtime).total_seconds() + if elapsed <= self.config['archival_max_age']: + return False + return True + + def sort_content_by_archive(self): + """ Create a map {archive_server -> list of content) + + Create a mapping that associate to a archive server all the + contents that needs to be archived in it by the current worker. + + The map is in the form of : + { + (archive_1, archive_1_url): [content1, content2, content_3] + (archive_2, archive_2_url): [content1, content3] + } + Returns: + The created mapping. + """ slaves_copy = {} for content_id in self.batch: - # Choose some servers to upload the content + # Choose some servers to upload the content among the missing ones. server_data = self.batch[content_id] - + nb_present = len(server_data['present']) + nb_backup = self.config['retention_policy'] - nb_present backup_servers = self.__choose_backup_servers( server_data['missing'], - self.retention_policy - len(server_data['present']) + nb_backup ) # Fill the map destination -> content to upload for server in backup_servers: slaves_copy.setdefault(server, []).append(content_id) + return slaves_copy + + def run(self): + """ Do the task expected from the archiver worker. + + Process the content in the batch, ensure that the elements still need + an archival, and spawn copiers to copy files in each destinations. + """ + # Get a map (archive -> [contents]) + slaves_copy = self.sort_content_by_archive() - # At this point, check the archival status of the content in order to - # know if it is still needed. + # At this point, re-check the archival status in order to know if the + # job have been done by another worker. 
         for destination in slaves_copy:
-            contents = []
-            for content in slaves_copy[destination]:
-                if content_filter(content, destination):
-                    contents.append(content)
-            slaves_copy[destination] = contents
-
-        # Spawn a copier for each destination that will copy all the
-        # needed content.
+            # list() is needed because filter's result will be consumed twice.
+            slaves_copy[destination] = list(filter(
+                lambda content_id: self.need_archival(content_id, destination),
+                slaves_copy[destination]
+            ))
+            for content_id in slaves_copy[destination]:
+                self.__content_archive_update(content_id, destination[0],
+                                              new_status='ongoing')
+
+        # Spawn a copier for each destination
         for destination in slaves_copy:
-            ac = ArchiverCopier(
-                destination, slaves_copy[destination],
-                self.master_storage)
-            if ac.run():
-                # Once the archival complete, update the database.
-                for content_id in slaves_copy[destination]:
-                    self.__content_archive_update(content_id, destination[0],
-                                                  new_status='present')
+            try:
+                self.run_copier(destination, slaves_copy[destination])
+            except Exception:
+                logger.error('Unable to copy a batch to %s' % destination)
+
+    def run_copier(self, destination, contents):
+        """ Run a copier in order to archive the given contents.
+
+        Upload the given contents to the given archive.
+        If the process fails, the whole batch is considered uncopied
+        and remains 'ongoing', waiting to be rescheduled once the
+        delay has elapsed.
+
+        Args:
+            destination: Tuple (archive_id, archive_url) of the destination.
+            contents: List of contents to archive.
+        """
+        ac = ArchiverCopier(destination, contents, self.master_storage)
+        if ac.run():
+            # Once the archival is complete, update the database.
+            for content_id in contents:
+                self.__content_archive_update(content_id, destination[0],
+                                              new_status='present')
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -683,3 +683,34 @@
         cur = self._cursor(cur)
         cur.execute(query)
         yield from cursor_to_bytes(cur)
+
+    def content_archive_update(self, content_id, archive_id,
+                               new_status=None, cur=None):
+        """ Update an archived content's status and set its mtime to now().
+
+        Change the last modification time of an archived content and change
+        its status to the given one.
+
+        Args:
+            content_id (string): The content id.
+            archive_id (string): The id of the concerned archive.
+            new_status (string): One of missing, ongoing or present, this
+                status will replace the previous one. If not given, the
+                function only changes the mtime of the content.
+ """ + query = """UPDATE content_archive + SET %(fields)s + WHERE content_id='%(content_id)s' + and archive_id='%(archive_id)s' + """ + fields = [] + if new_status: + fields.append("status='%s'" % new_status) + fields.append("mtime=now()") + + d = {'fields': ', '.join(fields), + 'content_id': content_id, + 'archive_id': archive_id} + + cur = self._cursor(cur) + cur.execute(query % d) diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py --- a/swh/storage/tests/test_archiver.py +++ b/swh/storage/tests/test_archiver.py @@ -17,7 +17,7 @@ from swh.storage import Storage from swh.storage.exc import ObjNotFoundError -from swh.storage.archiver import ArchiverDirector +from swh.storage.archiver import ArchiverDirector, ArchiverWorker from swh.storage.objstorage.api.client import RemoteObjStorage from swh.storage.objstorage.api.server import app @@ -35,7 +35,7 @@ def setUp(self): # Launch the backup server - self.backup_objroot = tempfile.mkdtemp() + self.backup_objroot = tempfile.mkdtemp(prefix='remote') self.config = {'storage_base': self.backup_objroot, 'storage_depth': 3} self.app = app @@ -45,13 +45,15 @@ print("url", self.url()) self.remote_objstorage = RemoteObjStorage(self.url()) # Create the local storage. - self.objroot = tempfile.mkdtemp() + self.objroot = tempfile.mkdtemp(prefix='local') self.storage = Storage(self.conn, self.objroot) # Initializes and fill the tables. self.initialize_tables() # Create the archiver self.archiver = self.__create_director() + self.storage_data = ('Local', 'http://localhost:%s/' % self.port) + def tearDown(self): self.empty_tables() super().tearDown() @@ -101,6 +103,16 @@ director = ArchiverDirector(self.conn, config) return director + def __create_worker(self, batch={}, config={}): + mstorage_args = [self.archiver.master_storage.db.conn, + self.objroot] + slaves = [self.storage_data] + if not config: + config = self.archiver.config + return ArchiverWorker(batch, mstorage_args, slaves, config) + + # Integration test + @istest def archive_missing_content(self): """ Run archiver on a missing content should archive it. @@ -123,41 +135,112 @@ self.remote_objstorage.content_get(id) @istest - def archive_ongoing_remaining(self): - """ A content that is ongoing and still have some time - to be archived should not be rescheduled. + def archive_already_enough(self): + """ A content missing with enough copies shouldn't be archived. 
""" - id = self.__add_content(b'archive_ongoing_remaining', status='ongoing') - items = [x for batch in self.archiver.get_unarchived_content() - for x in batch] + id = self.__add_content(b'archive_alread_enough') + director = self.__create_director(retention_policy=0) + director.run() + with self.assertRaises(ObjNotFoundError): + self.remote_objstorage.content_get(id) + + # Unit test for ArchiverDirector + + def vstatus(self, status, mtime): + return self.archiver.get_virtual_status(status, mtime) + + @istest + def vstatus_present(self): + self.assertEquals( + self.vstatus('present', None), + 'present' + ) + + @istest + def vstatus_missing(self): + self.assertEquals( + self.vstatus('missing', None), + 'missing' + ) + + @istest + def vstatus_ongoing_remaining(self): + current_time = datetime.now() + self.assertEquals( + self.vstatus('ongoing', current_time), + 'present' + ) + + @istest + def vstatus_ongoing_elapsed(self): + past_time = datetime.now() - timedelta( + seconds=self.archiver.config['archival_max_age'] + 1 + ) + self.assertEquals( + self.vstatus('ongoing', past_time), + 'missing' + ) + + # Unit tests for archive worker + + @istest + def need_archival_missing(self): + """ A content should still need archival when it is missing. + """ + id = self.__add_content(b'need_archival_missing', status='missing') id = r'\x' + hashutil.hash_to_hex(id) - self.assertNotIn(id, items) + worker = self.__create_worker() + self.assertEqual(worker.need_archival(id, self.storage_data), True) @istest - def archive_ongoing_elapsed(self): - """ A content that is ongoing but with elapsed time should - be rescheduled. + def need_archival_present(self): + """ A content should still need archival when it is missing + """ + id = self.__add_content(b'need_archival_missing', status='present') + id = r'\x' + hashutil.hash_to_hex(id) + worker = self.__create_worker() + self.assertEqual(worker.need_archival(id, self.storage_data), False) + + @istest + def need_archival_ongoing_remaining(self): + """ An ongoing archival with remaining time shouldnt need archival. + """ + id = self.__add_content(b'need_archival_ongoing_remaining', + status='ongoing', date="'%s'" % datetime.now()) + id = r'\x' + hashutil.hash_to_hex(id) + worker = self.__create_worker() + self.assertEqual(worker.need_archival(id, self.storage_data), False) + + @istest + def need_archival_ongoing_elasped(self): + """ An ongoing archival with elapsed time should be scheduled again. """ - # Create an ongoing archive content with time elapsed by 1s. id = self.__add_content( b'archive_ongoing_elapsed', status='ongoing', date="'%s'" % (datetime.now() - timedelta( - seconds=self.archiver.archival_max_age + 1 + seconds=self.archiver.config['archival_max_age'] + 1 )) ) - items = [x for batch in self.archiver.get_unarchived_content() - for x in batch] id = r'\x' + hashutil.hash_to_hex(id) - self.assertIn(id, items) + worker = self.__create_worker() + self.assertEqual(worker.need_archival(id, self.storage_data), True) @istest - def archive_already_enough(self): - """ A content missing should not be archived if there - is already enough copies. + def content_sorting_by_archiver(self): + """ Check that the content is correctly sorted. 
""" - id = self.__add_content(b'archive_alread_enough') - director = self.__create_director(retention_policy=0) - director.run() - with self.assertRaises(ObjNotFoundError): - self.remote_objstorage.content_get(id) + batch = { + 'id1': { + 'present': [('slave1', 'slave1_url')], + 'missing': [] + }, + 'id2': { + 'present': [], + 'missing': [('slave1', 'slave1_url')] + } + } + worker = self.__create_worker(batch=batch) + mapping = worker.sort_content_by_archive() + self.assertNotIn('id1', mapping[('slave1', 'slave1_url')]) + self.assertIn('id2', mapping[('slave1', 'slave1_url')])