diff --git a/swh/storage/archiver/copier.py b/swh/storage/archiver/copier.py
--- a/swh/storage/archiver/copier.py
+++ b/swh/storage/archiver/copier.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -19,20 +19,19 @@
         master_objstorage (ObjStorage): The master storage that contains
             the data the copier needs to archive.
     """
-    def __init__(self, destination, content, master_objstorage):
+    def __init__(self, destination_url, content, master_objstorage):
         """ Create a Copier for the archiver

         Args:
-            destination: A tuple (archive_name, archive_url) that represents a
-                remote object storage as in the 'archive' table.
+            destination_url: url of the archive server to which the content
+                will be copied.
             content: A list of sha1 that represents the content this copier
                 have to archive.
             master_storage (Storage): The master storage of the system that
                 contains the data to archive.
        """
-        _name, self.url = destination
        self.content_ids = content
-        self.server = RemoteObjStorage(self.url)
+        self.server = RemoteObjStorage(destination_url)
        self.master_objstorage = master_objstorage

    def run(self):
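For illustration (not part of the patch): the copier now takes the archive URL directly instead of an (archive_name, archive_url) tuple. A minimal usage sketch of the new signature; every value below is a placeholder, since a real caller (the ArchiverWorker) gets the URL from the slave mapping and the ids from its batch:

    from swh.objstorage.api.client import RemoteObjStorage
    from swh.storage.archiver.copier import ArchiverCopier

    # Placeholder master objstorage and destination, not values from this patch.
    master_objstorage = RemoteObjStorage('http://master.example.org:5002/')
    copier = ArchiverCopier(destination_url='http://backup.example.org:5003/',
                            content=['content-sha1-goes-here'],
                            master_objstorage=master_objstorage)
    copier.run()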
diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py
--- a/swh/storage/archiver/director.py
+++ b/swh/storage/archiver/director.py
@@ -42,7 +42,7 @@
     to know which one needs archival and it delegates this task to
     archiver workers.

     Attributes:
-        master_objstorage: the local storage of the master server.
+        master_objstorage: the local objstorage of the master server.
         master_objstorage_args (dict): arguments of the master objstorage
             initialization.
@@ -102,11 +102,7 @@
         # Get the slave storages
         self.db_conn_archiver = db_conn_archiver
         self.archiver_storage = ArchiverStorage(db_conn_archiver)
-        self.slave_objstorages = {
-            id: url
-            for id, url
-            in self.archiver_storage.archive_ls()
-        }
+        self.slave_objstorages = dict(self.archiver_storage.archive_ls())
         # Check that there is enough backup servers for the retention policy
         if config['retention_policy'] > len(self.slave_objstorages) + 1:
             raise ValueError(
@@ -147,7 +143,7 @@
         else:
             run_fn = self.run_sync_worker

-        for batch in self.get_unarchived_content():
+        for batch in self.get_unarchived_content_batch():
             run_fn(batch)

     def run_async_worker(self, batch):
@@ -155,9 +151,9 @@
         """
         task = app.tasks[task_name]
         task.delay(batch,
+                   slave_objstorages=self.slave_objstorages,
                    archiver_args=self.db_conn_archiver,
                    master_objstorage_args=self.master_objstorage_args,
-                   slave_objstorages=self.slave_objstorages,
                    config=self.config)

     def run_sync_worker(self, batch):
@@ -165,75 +161,86 @@
         """
         task = app.tasks[task_name]
         task(batch,
+             slave_objstorages=self.slave_objstorages,
             archiver_args=self.db_conn_archiver,
             master_objstorage_args=self.master_objstorage_args,
-            slave_objstorages=self.slave_objstorages,
             config=self.config)

-    def get_unarchived_content(self):
-        """ Get contents that need to be archived.
+    def get_unarchived_content_batch(self):
+        """ Get batches of contents that need to be archived.

         Yields:
             A batch of contents. Batches are dictionaries which associates
-            a content id to the data about servers that contains it or not.
-
-            {'id1':
-                {'present': [('slave1', 'slave1_url')],
-                 'missing': [('slave2', 'slave2_url'),
-                             ('slave3', 'slave3_url')]
-                },
-             'id2':
-                {'present': [],
-                 'missing': [
-                     ('slave1', 'slave1_url'),
-                     ('slave2', 'slave2_url'),
-                     ('slave3', 'slave3_url')
-                 ]}
+            a content sha1 to a list of server ids where it is already
+            present or where archival is ongoing.
+
+            {
+                'id1': ['slave1', 'slave3'],
+                'id2': ['slave2']
             }

-            Where keys (idX) are sha1 of the content and (slaveX, slaveX_url)
-            are ids and urls of the storage slaves.
+            Where keys (idX) are sha1 of the content and slaveX are ids of
+            the slave storages.

             At least all the content that don't have enough copies on the
             backups servers are distributed into these batches.
         """
         contents = {}
-        # Get the archives
-        archives = dict(self.archiver_storage.archive_ls())
-        # Get all the contents referenced into the archiver tables
-        last_object = b''
-        while True:
-            archived_contents = list(
-                self.archiver_storage.content_archive_get_copies(last_object))
+        for content_id, presents in self._get_unarchived_content():
+            contents[content_id] = presents
+
+            if len(contents) > self.config['batch_max_size']:
+                yield contents
+                contents = {}

-            if not archived_contents:
-                break
+        if len(contents) > 0:
+            yield contents

-            for content_id, present, ongoing in archived_contents:
-                last_object = content_id
-                data = {
-                    'present': set(present),
-                    'missing': set(archives) - set(present) - set(ongoing),
-                }
+    def _get_unarchived_content(self):
+        """ Get contents that need to be archived.

-                for archive_id, mtime in ongoing.items():
-                    status = self.get_virtual_status('ongoing', mtime)
-                    data[status].add(archive_id)
+        Yields:
+            Tuple that represents the archival status of contents that need
+            to be archived, as (content_id, archive_present), where
+            `archive_present` is the set of archive servers where the content
+            is already present.
+        """
+        # Iterates over the contents referenced into the archiver.
+        for content in self._get_all_contents():
+            content_id, present_copies, ongoing_copies = content
+            # Construct the present list
+            virtually_present_copies = {
+                ids
+                for ids in ongoing_copies
+                if self.get_virtual_status(
+                    'ongoing', ongoing_copies[ids]) == 'present'
+            }
+            presents = set(present_copies) | virtually_present_copies

-                if not data['missing']:
-                    continue
+            # If there is already enough copies, don't schedule it.
+            if len(presents) >= self.config['retention_policy']:
+                continue

-                contents[r'\x%s' % hashutil.hash_to_hex(content_id)] = {
-                    k: [(archive_id, archives[archive_id]) for archive_id in v]
-                    for k, v in data.items()
-                }
+            # Otherwise, yield it
+            yield r'\x%s' % hashutil.hash_to_hex(content_id), presents

-                if len(contents) >= self.config['batch_max_size']:
-                    yield contents
-                    contents = {}
+    def _get_all_contents(self):
+        """ Get batches from the archiver db and yield them as a continuous
+        stream.

-        if len(contents) > 0:
-            yield contents
+        Yields:
+            Data about a content as a tuple
+            (content_id, present_copies, ongoing_copies) where ongoing_copies
+            is a dict mapping copy to mtime.
+        """
+        last_object = b''
+        while True:
+            archiver_contents = list(
+                self.archiver_storage.content_archive_get_copies(last_object)
+            )
+            if not archiver_contents:
+                return
+            for content in archiver_contents:
+                last_object = content[0]
+                yield content

     def get_virtual_status(self, status, mtime):
         """ Compute the virtual presence of a content.
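For illustration (not part of the patch): the batching done in get_unarchived_content_batch is now separated from the filtering done in _get_unarchived_content. A small self-contained sketch of that batching behaviour, with a stand-in generator and made-up identifiers instead of the archiver db:

    def fake_unarchived_content():
        # Stand-in for _get_unarchived_content(): (content_id, present copies).
        yield r'\xaa', {'slave1'}
        yield r'\xbb', set()
        yield r'\xcc', {'slave2'}

    def batches(stream, batch_max_size=2):
        contents = {}
        for content_id, presents in stream:
            contents[content_id] = presents
            if len(contents) > batch_max_size:
                yield contents
                contents = {}
        if len(contents) > 0:
            yield contents

    for batch in batches(fake_unarchived_content()):
        print(batch)  # one dict per worker task, at most batch_max_size + 1 entries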
@@ -258,7 +265,7 @@
             ValueError: if the status is not one 'present', 'missing'
                 or 'ongoing'
         """
-        if status in ('present', 'missing'):
+        if status in {'present', 'missing'}:
             return status

         # If the status is 'ongoing' but there is still time, another worker
diff --git a/swh/storage/archiver/tasks.py b/swh/storage/archiver/tasks.py
--- a/swh/storage/archiver/tasks.py
+++ b/swh/storage/archiver/tasks.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -12,9 +12,10 @@
     """
     task_queue = 'swh_storage_archive_worker'

-    def run(self, batch, archiver_args, master_objstorage_args,
-            slave_objstorages, config):
-        aw = ArchiverWorker(batch, archiver_args, master_objstorage_args,
-                            slave_objstorages, config)
+    def run(self, batch, slave_objstorages, archiver_args,
+            master_objstorage_args, config):
+        aw = ArchiverWorker(batch, slave_objstorages,
+                            archiver_args, master_objstorage_args,
+                            config)
         if aw.run():
             self.log("Successful backup for a batch of size %s" % len(batch))
diff --git a/swh/storage/archiver/worker.py b/swh/storage/archiver/worker.py
--- a/swh/storage/archiver/worker.py
+++ b/swh/storage/archiver/worker.py
@@ -1,11 +1,9 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

-import random
 import logging
-import time

 from swh.objstorage import PathSlicingObjStorage
 from swh.objstorage.api.client import RemoteObjStorage
@@ -14,9 +12,6 @@

 from .copier import ArchiverCopier

-logger = logging.getLogger()
-
-
 class ArchiverWorker():
     """ Do the required backups on a given batch of contents.
@@ -26,42 +21,59 @@
     Attributes:
         batch: The content this worker has to archive, which is a dictionary
             that associates a content's sha1 id to the list of servers where
-            the content is present or missing
-            (see ArchiverDirector::get_unarchived_content).
-        master_objstorage_args: The connection argument to initialize the
-            master storage with the db connection url & the object storage
-            path.
+            the content is present.
+            (see ArchiverDirector::get_unarchived_content_batch).
         slave_objstorages: A map that associates server_id to the remote
             server
-        config: Archiver_configuration. A dictionary that must contains
-            the following keys.
-            objstorage_path (string): the path of the objstorage of the
-                master.
-            batch_max_size (int): The number of content items that can be
-                given to the same archiver worker.
-            archival_max_age (int): Delay given to the worker to copy all
-                the files in a given batch.
-            retention_policy (int): Required number of copies for the
-                content to be considered safe.
-            asynchronous (boolean): Indicate whenever the archival should
-                run in asynchronous mode or not.
+            url
+        archiver_storage: wrapper for archiver's db operations.
+        config: Archiver_configuration. A dictionary that must contain
+            the following keys:
+
+            objstorage_type (str): type of objstorage used
+                ('local_objstorage' or 'remote_objstorage').
+                If the storage is local, the following keys must be present:
+                    objstorage_path (str): master's objstorage path
+                    objstorage_slicing (str): master's objstorage slicing
+                Otherwise, if it's a remote objstorage, the keys must be:
+                    objstorage_url (str): url of the remote objstorage
+
+            batch_max_size (int): The number of content items that can be
+                given to the same archiver worker.
+            archival_max_age (int): Delay given to the worker to copy all
+                the files in a given batch.
+            retention_policy (int): Required number of copies for the
+                content to be considered safe.
+            asynchronous (boolean): Indicate whenever the archival should
+                run in asynchronous mode or not.
+        master_objstorage (ObjStorage): the master objstorage that contains
+            the data to archive.
     """
-    def __init__(self, batch, archiver_args, master_objstorage_args,
-                 slave_objstorages, config):
+    def __init__(self, batch, slave_objstorages,
+                 archiver_args, master_objstorage_args,
+                 config):
         """ Constructor of the ArchiverWorker class.

         Args:
             batch: A batch of content, which is a dictionary that associates
                 a content's sha1 id to the list of servers where the content
                 is present.
+            slave_objstorages: A map that associates server_id to the remote
+                server url.
             archiver_args: The archiver's arguments to establish connection
                 to db.
-            master_objstorage_args: The master storage arguments.
-            slave_objstorages: A map that associates server_id to the remote
-                server.
-            config: Archiver_configuration. A dictionary that must contains
-                the following keys.
-                objstorage_path (string): the path of the objstorage of the
-                    master.
+            master_objstorage_args: arguments needed for the master storage
+                initialization.
+            config: Archiver_configuration. A dictionary that must contain
+                the following keys:
+
+                objstorage_type (str): type of objstorage used
+                    ('local_objstorage' or 'remote_objstorage').
+                    If the storage is local, the following keys must be
+                    present:
+                        objstorage_path (str): master's objstorage path
+                        objstorage_slicing (str): master's objstorage slicing
+                    Otherwise, if it's a remote objstorage, the keys must be:
+                        objstorage_url (str): url of the remote objstorage
+                batch_max_size (int): The number of content items that can be
                 given to the same archiver worker.
             archival_max_age (int): Delay given to the worker to copy all
@@ -72,8 +84,8 @@
                 run in asynchronous mode or not.
         """
         self.batch = batch
-        self.archiver_storage = ArchiverStorage(archiver_args)
         self.slave_objstorages = slave_objstorages
+        self.archiver_storage = ArchiverStorage(archiver_args)
         self.config = config

         if config['objstorage_type'] == 'local_objstorage':
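For illustration (not part of the patch): the docstrings above distinguish a local from a remote master objstorage. This is roughly what the two configuration shapes could look like; all values are placeholders, only the key names come from the docstrings, and 'remote_objstorage' stands for any value other than 'local_objstorage', which is the only value the code checks explicitly:

    # Placeholder configurations illustrating the two objstorage_type branches.
    local_config = {
        'objstorage_type': 'local_objstorage',
        'objstorage_path': '/srv/softwareheritage/objects',
        'objstorage_slicing': '0:2/2:4/4:6',
        'batch_max_size': 50,
        'archival_max_age': 3600,
        'retention_policy': 2,
        'asynchronous': True,
    }

    remote_config = {
        'objstorage_type': 'remote_objstorage',
        'objstorage_url': 'http://master.example.org:5002/',
        'batch_max_size': 50,
        'archival_max_age': 3600,
        'retention_policy': 2,
        'asynchronous': True,
    }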
@@ -82,49 +94,55 @@
             master_objstorage = RemoteObjStorage(**master_objstorage_args)
         self.master_objstorage = master_objstorage

-    def _choose_backup_servers(self, allowed_storage, backup_number):
-        """ Choose the slave servers for archival.
-
-        Choose the given amount of servers among those which don't already
-        contain a copy of the content.
+    def run(self):
+        """ Do the task expected from the archiver worker.

-        Args:
-            allowed_storage: servers when the content is not already present.
-            backup_number (int): The number of servers we have to choose in
-                order to fullfill the objective.
+        Process the content in the batch, ensure that the elements still need
+        an archival, and spawn copiers to copy files to each destination.
         """
-        # In case there is not enough backup servers to get all the backups
-        # we need, just do our best.
-        # Such situation should not happen.
-        backup_number = min(backup_number, len(allowed_storage))
+        # Batch the contents to the required servers
+        contents = {}  # Map {archive_id -> [contents]}
+        for content_id, present_copies in self.batch.items():
+            backup_servers = self._select_archive_servers(present_copies)

-        # TODO Find a better (or a good) policy to choose the backup servers.
-        # The random choice should be equivalently distributed between
-        # servers for a great amount of data, but don't take care of servers
-        # capacities.
-        return random.sample(allowed_storage, backup_number)
+            for server in backup_servers:
+                # Add the content to the map {archive_id -> [content_ids]}
+                contents.setdefault(server, []).append(content_id)
+
+                # Set the archival status of the items to ongoing.
+                self._content_archive_update(
+                    content_id,
+                    server,
+                    new_status='ongoing'
+                )
+
+        # For each destination, spawn a copier
+        for destination, content_ids in contents.items():
+            server_url = self.slave_objstorages[destination]
+            try:
+                self.run_copier(server_url, content_ids)
+            except:
+                logging.error('Unable to copy a batch to %s' % destination)

-    def _get_archival_status(self, content_id, server):
-        """ Get the archival status of the required content.
+    def _select_archive_servers(self, already_present_copies):
+        """ Select some backup servers.
+
+        Select backup servers that do not contain the content among the
+        existing ones.
+
+        Args:
+            already_present_copies list(str): list of the servers that
+                already contain the object.

-        Attributes:
-            content_id (string): Sha1 of the content.
-            server: Tuple (archive_id, archive_url) of the archive server.
         Returns:
-            A dictionary that contains all the required data : 'content_id',
-            'archive_id', 'status', and 'mtime'
+            a list of enough backup server ids to respect the retention
+            policy.
         """
-        archive = server[0]
-        t, = list(
-            self.archiver_storage.content_archive_get(content_id)
-        )
-        status_for_archive = t[1].get(archive, {})
-        return {
-            'content_id': content_id,
-            'archive_id': archive,
-            'status': status_for_archive.get('status', 'missing'),
-            'mtime': status_for_archive.get('mtime', 0),
-        }
+        nb_missing = len(self.slave_objstorages) - len(already_present_copies)
+        allowed_servers = [server for server in self.slave_objstorages
+                           if server not in already_present_copies]
+        # Select any server for the backup
+        backup_servers = allowed_servers[:nb_missing]
+        return backup_servers

     def _content_archive_update(self, content_id, archive_id,
                                 new_status=None):
@@ -136,8 +154,8 @@
         Args:
             content_id (string): The content id.
             archive_id (string): The id of the concerned archive.
-            new_status (string): One of missing, ongoing or present, this
-                status will replace the previous one. If not given, the
+            new_status (string): One of 'missing', 'ongoing' or 'present',
+                this status will replace the previous one. If not given, the
                 function only changes the mtime of the content.
         """
         self.archiver_storage.content_archive_update(
@@ -146,91 +164,6 @@
             content_id,
             archive_id,
             new_status
         )

-    def need_archival(self, content, destination):
-        """ Indicates whenever a content need archivage.
-
-        Filter function that returns True if a given content
-        still require to be archived.
-
-        Args:
-            content (str): Sha1 of a content.
-            destination: Tuple (archive id, archive url).
-        """
-        archival_status = self._get_archival_status(
-            content,
-            destination
-        )
-        status = archival_status['status']
-        mtime = archival_status['mtime']
-        # If the archive is already present, no need to backup.
-        if status == 'present':
-            return False
-        # If the content is ongoing but still have time, there is
-        # another worker working on this content.
-        elif status == 'ongoing':
-            elapsed = int(time.time()) - mtime
-            if elapsed <= self.config['archival_max_age']:
-                return False
-        return True
-
-    def sort_content_by_archive(self):
-        """ Create a map {archive_server -> list of content)
-
-        Create a mapping that associate to a archive server all the
-        contents that needs to be archived in it by the current worker.
-
-        The map is in the form of :
-        {
-            (archive_1, archive_1_url): [content1, content2, content_3]
-            (archive_2, archive_2_url): [content1, content3]
-        }
-
-        Returns:
-            The created mapping.
-        """
-        slaves_copy = {}
-        for content_id in self.batch:
-            # Choose some servers to upload the content among the missing ones.
-            server_data = self.batch[content_id]
-            nb_present = len(server_data['present'])
-            nb_backup = self.config['retention_policy'] - nb_present
-            backup_servers = self._choose_backup_servers(
-                server_data['missing'],
-                nb_backup
-            )
-            # Fill the map destination -> content to upload
-            for server in backup_servers:
-                slaves_copy.setdefault(server, []).append(content_id)
-        return slaves_copy
-
-    def run(self):
-        """ Do the task expected from the archiver worker.
-
-        Process the content in the batch, ensure that the elements still need
-        an archival, and spawn copiers to copy files in each destinations.
-        """
-        # Get a map (archive -> [contents])
-        slaves_copy = self.sort_content_by_archive()
-
-        # At this point, re-check the archival status in order to know if the
-        # job have been done by another worker.
-        for destination in slaves_copy:
-            # list() is needed because filter's result will be consumed twice.
-            slaves_copy[destination] = list(filter(
-                lambda content_id: self.need_archival(content_id, destination),
-                slaves_copy[destination]
-            ))
-            for content_id in slaves_copy[destination]:
-                self._content_archive_update(content_id, destination[0],
-                                             new_status='ongoing')
-
-        # Spawn a copier for each destination
-        for destination in slaves_copy:
-            try:
-                self.run_copier(destination, slaves_copy[destination])
-            except:
-                logger.error('Unable to copy a batch to %s' % destination)
-
     def run_copier(self, destination, contents):
         """ Run a copier in order to archive the given contents
@@ -240,7 +173,8 @@
         a delay.

         Attributes:
-            destination: Tuple (archive_id, archive_url) of the destination.
+            destination: url of the destination server where the content will
+                be copied.
             contents: List of contents to archive.
         """
         ac = ArchiverCopier(destination, contents, self.master_objstorage)
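For illustration (not part of the patch): the selection policy that replaces the random sampling above is small enough to read inline. A standalone rendering of _select_archive_servers with made-up server names, where the real method takes these values from self.slave_objstorages and its argument:

    # Standalone sketch of the new selection logic; names are placeholders.
    slave_objstorages = {'slave1': 'http://slave1.example.org:5003/',
                         'slave2': 'http://slave2.example.org:5003/'}
    already_present_copies = ['slave1']

    nb_missing = len(slave_objstorages) - len(already_present_copies)
    allowed_servers = [server for server in slave_objstorages
                       if server not in already_present_copies]
    print(allowed_servers[:nb_missing])  # ['slave2']

Unlike the removed _choose_backup_servers, this choice is deterministic and effectively targets every slave that does not yet hold the content, rather than sampling only as many servers as the retention policy requires.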
""" ac = ArchiverCopier(destination, contents, self.master_objstorage) diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py --- a/swh/storage/tests/test_archiver.py +++ b/swh/storage/tests/test_archiver.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015 The Software Heritage developers +# Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -101,7 +101,7 @@ content_id = r'\x' + hashutil.hash_to_hex(content['sha1']) copies = {'banco': { 'status': status, - 'mtime': date or int(time.time()) # if date is None, use now() + 'mtime': date or time.time() # if date is None, use now() }} self.cursor.execute("""INSERT INTO content_archive VALUES('%s'::sha1, '%s') @@ -130,14 +130,16 @@ config=config) return director - def __create_worker(self, batch={}, config={}): + def __create_worker(self, batch={}, slave_objstorages=None, config={}): mobjstorage_args = self.archiver.master_objstorage_args if not config: config = self.archiver.config + if not slave_objstorages: + slave_objstorages = [self.storage_data] return ArchiverWorker(batch, archiver_args=self.conn, master_objstorage_args=mobjstorage_args, - slave_objstorages=[self.storage_data], + slave_objstorages=slave_objstorages, config=config) # Integration test @@ -201,14 +203,14 @@ @istest def vstatus_ongoing_remaining(self): self.assertEquals( - self.vstatus('ongoing', int(time.time())), + self.vstatus('ongoing', time.time()), 'present' ) @istest def vstatus_ongoing_elapsed(self): past_time = ( - int(time.time()) - self.archiver.config['archival_max_age'] - 1 + time.time() - self.archiver.config['archival_max_age'] - 1 ) self.assertEquals( self.vstatus('ongoing', past_time), @@ -216,65 +218,40 @@ ) # Unit tests for archive worker + def _initialize_testdata_worker(self): + storages = ['server1', 'server2'] + batch = { + 'id1': ['server1', 'server2'], + 'id2': ['server2'], + 'id3': [] + } + worker = self.__create_worker( + batch=batch, + slave_objstorages=storages, + config=self.archiver.config.copy().update(retention_policy=2) + ) + return storages, batch, worker @istest - def need_archival_missing(self): - """ A content should still need archival when it is missing. - """ - id = self.__add_content(b'need_archival_missing', status='missing') - id = r'\x' + hashutil.hash_to_hex(id) - worker = self.__create_worker() - self.assertEqual(worker.need_archival(id, self.storage_data), True) - - @istest - def need_archival_present(self): - """ A content should still need archival when it is missing - """ - id = self.__add_content(b'need_archival_missing', status='present') - id = r'\x' + hashutil.hash_to_hex(id) - worker = self.__create_worker() - self.assertEqual(worker.need_archival(id, self.storage_data), False) - - @istest - def need_archival_ongoing_remaining(self): - """ An ongoing archival with remaining time shouldnt need archival. 
+    def select_server_none(self):
+        """ Content with enough copies should not need any more
         """
-        id = self.__add_content(b'need_archival_ongoing_remaining',
-                                status='ongoing')
-        id = r'\x' + hashutil.hash_to_hex(id)
-        worker = self.__create_worker()
-        self.assertEqual(worker.need_archival(id, self.storage_data), False)
+        storages, batch, worker = self._initialize_testdata_worker()
+        mapping = worker._select_archive_servers(batch['id1'])
+        self.assertEquals(mapping, [])

     @istest
-    def need_archival_ongoing_elasped(self):
-        """ An ongoing archival with elapsed time should be scheduled again.
+    def select_server_single(self):
+        """ Content with not enough copies should need at least one
         """
-        id = self.__add_content(
-            b'archive_ongoing_elapsed',
-            status='ongoing',
-            date=(
-                int(time.time()) - self.archiver.config['archival_max_age'] - 1
-            )
-        )
-        id = r'\x' + hashutil.hash_to_hex(id)
-        worker = self.__create_worker()
-        self.assertEqual(worker.need_archival(id, self.storage_data), True)
+        storages, batch, worker = self._initialize_testdata_worker()
+        mapping = worker._select_archive_servers(batch['id2'])
+        self.assertEquals(mapping, ['server1'])

     @istest
-    def content_sorting_by_archiver(self):
-        """ Check that the content is correctly sorted.
+    def select_server_all(self):
+        """ Content with no copies will need at least `retention_policy` ones
         """
-        batch = {
-            'id1': {
-                'present': [('slave1', 'slave1_url')],
-                'missing': []
-            },
-            'id2': {
-                'present': [],
-                'missing': [('slave1', 'slave1_url')]
-            }
-        }
-        worker = self.__create_worker(batch=batch)
-        mapping = worker.sort_content_by_archive()
-        self.assertNotIn('id1', mapping[('slave1', 'slave1_url')])
-        self.assertIn('id2', mapping[('slave1', 'slave1_url')])
+        storages, batch, worker = self._initialize_testdata_worker()
+        mapping = worker._select_archive_servers(batch['id3'])
+        self.assertEquals(mapping, ['server1', 'server2'])
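For illustration (not part of the patch): the vstatus tests earlier in this file now pass float timestamps straight to get_virtual_status. A compact sketch of the timing rule those tests exercise; the 3600-second value is a placeholder for config['archival_max_age'], and the function only mirrors the director's behaviour for this example:

    import time

    ARCHIVAL_MAX_AGE = 3600  # placeholder; the real value comes from the config

    def virtual_status(status, mtime):
        # Mirrors ArchiverDirector.get_virtual_status for illustration only.
        if status in {'present', 'missing'}:
            return status
        # 'ongoing': another worker may still finish within the allowed delay.
        if time.time() - mtime <= ARCHIVAL_MAX_AGE:
            return 'present'
        return 'missing'

    assert virtual_status('ongoing', time.time()) == 'present'
    assert virtual_status('ongoing', time.time() - ARCHIVAL_MAX_AGE - 1) == 'missing'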