diff --git a/swh/storage/archiver/copier.py b/swh/storage/archiver/copier.py
--- a/swh/storage/archiver/copier.py
+++ b/swh/storage/archiver/copier.py
@@ -16,10 +16,10 @@
             has to archive.
         server (RemoteArchive): The remote object storage that is used to
             backup content.
-        master_storage (Storage): The master storage that contains the data
-            the copier needs to archive.
+        master_objstorage (ObjStorage): The master storage that contains the
+            data the copier needs to archive.
     """
-    def __init__(self, destination, content, master_storage):
+    def __init__(self, destination, content, master_objstorage):
         """ Create a Copier for the archiver

         Args:
@@ -33,7 +33,7 @@
         _name, self.url = destination
         self.content_ids = content
         self.server = RemoteObjStorage(self.url)
-        self.master_storage = master_storage
+        self.master_objstorage = master_objstorage

     def run(self):
         """ Do the copy on the backup storage.
@@ -47,14 +47,12 @@
         Returns:
             A boolean that indicates if the whole content have been copied.
         """
-        self.content_ids = list(map(lambda x: hashutil.hex_to_hash(x[2:]),
-                                    self.content_ids))
-        contents = self.master_storage.content_get(self.content_ids)
+        self.content_ids = map(lambda x: hashutil.hex_to_hash(x[2:]),
+                               self.content_ids)
         try:
-            for content in contents:
-                content_data = content['data']
-                self.server.content_add(content_data)
+            for content_id in self.content_ids:
+                content = self.master_objstorage.get(content_id)
+                self.server.content_add(content, content_id)
         except:
             return False
-
         return True
diff --git a/swh/storage/archiver/director.py b/swh/storage/archiver/director.py
--- a/swh/storage/archiver/director.py
+++ b/swh/storage/archiver/director.py
@@ -1,15 +1,16 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

-import swh
 import logging
 import click

 from datetime import datetime

 from swh.core import hashutil, config
+from swh.objstorage import PathSlicingObjStorage
+from swh.objstorage.api.client import RemoteObjStorage
 from swh.scheduler.celery_backend.config import app

 from . import tasks  # NOQA

@@ -17,14 +18,17 @@

 DEFAULT_CONFIG = {
+    'objstorage_type': ('str', 'local_objstorage'),
     'objstorage_path': ('str', '/tmp/swh-storage/objects'),
+    'objstorage_slicing': ('str', '0:2/2:4/4:6'),
+    'objstorage_url': ('str', 'http://localhost:5003/'),
+
     'batch_max_size': ('int', 50),
     'archival_max_age': ('int', 3600),
     'retention_policy': ('int', 2),
     'asynchronous': ('bool', True),

-    'dbconn': ('str', 'dbname=softwareheritage-archiver-dev user=guest'),
-    'dbconn_storage': ('str', 'dbname=softwareheritage-dev user=guest')
+    'dbconn': ('str', 'dbname=softwareheritage-archiver-dev user=guest')
 }

 task_name = 'swh.storage.archiver.tasks.SWHArchiverTask'
@@ -39,34 +43,54 @@
     to know which one needs archival and it delegates this task to
     archiver workers.

     Attributes:
-        master_storage: the local storage of the master server.
-        slave_storages: Iterable of remote obj storages to the slaves servers
-            used for backup.
+        master_objstorage: the master objstorage that contains the content
+            to be archived.
+        master_objstorage_args (dict): arguments of the master objstorage
+            initialization.
+
+        archiver_storage: a wrapper for archiver db operations.
+        db_conn_archiver: Either a libpq connection string,
+            or a psycopg2 connection for the archiver db.
+
+        slave_objstorages: Iterable of remote obj storages to the slaves
+            servers used for backup.
+
         config: Archiver_configuration. A dictionary that must contain
-            the following keys.
-            objstorage_path (string): master's objstorage path
-
-            batch_max_size (int): The number of content items that can be
-                given to the same archiver worker.
-            archival_max_age (int): Delay given to the worker to copy all
-                the files in a given batch.
-            retention_policy (int): Required number of copies for the
-                content to be considered safe.
-            asynchronous (boolean): Indicate whenever the archival should
-                run in asynchronous mode or not.
+            the following keys:
+
+            objstorage_type (str): type of objstorage used (local_objstorage
+                or remote_objstorage).
+                If the storage is local, the following keys must be present:
+                    objstorage_path (str): master's objstorage path
+                    objstorage_slicing (str): master's objstorage slicing
+                Otherwise, if it's a remote objstorage, the keys must be:
+                    objstorage_url (str): url of the remote objstorage
+
+            batch_max_size (int): The number of content items that can be
+                given to the same archiver worker.
+            archival_max_age (int): Delay given to the worker to copy all
+                the files in a given batch.
+            retention_policy (int): Required number of copies for the
+                content to be considered safe.
+            asynchronous (boolean): Indicate whether the archival should
+                run in asynchronous mode or not.
     """
-    def __init__(self, db_conn_archiver, db_conn_storage, config):
+    def __init__(self, db_conn_archiver, config):
         """ Constructor of the archiver director.

         Args:
             db_conn_archiver: Either a libpq connection string,
-                or a psycopg2 connection for the archiver db connection.
-            db_conn_storage: Either a libpq connection string,
-                or a psycopg2 connection for the db storage connection.
+                or a psycopg2 connection for the archiver db.
             config: Archiver_configuration. A dictionary that must contain
-                the following keys.
-                objstorage_path (string): master's objstorage path
+                the following keys:
+
+                objstorage_type (str): type of objstorage used
+                    (local_objstorage or remote_objstorage).
+                If the storage is local, the following keys must be present:
+                    objstorage_path (str): master's objstorage path
+                    objstorage_slicing (str): master's objstorage slicing
+                Otherwise, if it's a remote objstorage, the keys must be:
+                    objstorage_url (str): url of the remote objstorage
+
                 batch_max_size (int): The number of content items that can be
                     given to the same archiver worker.
                 archival_max_age (int): Delay given to the worker to copy all
@@ -76,25 +100,41 @@
                 asynchronous (boolean): Indicate whenever the archival should
                     run in asynchronous mode or not.
         """
-        # Get the local storage of the master and remote ones for the slaves.
+        # Get the slave storages
         self.db_conn_archiver = db_conn_archiver
         self.archiver_storage = ArchiverStorage(db_conn_archiver)
-
-        self.master_storage_args = [db_conn_storage, config['objstorage_path']]
-        master_storage = swh.storage.get_storage('local_storage',
-                                                 self.master_storage_args)
-        slaves = {
+        self.slave_objstorages = {
             id: url
             for id, url in self.archiver_storage.archive_ls()
         }
+        # Check that there are enough backup servers for the retention policy
+        if config['retention_policy'] > len(self.slave_objstorages) + 1:
+            raise ValueError(
+                "Can't have a retention policy of %d with %d backup servers"
+                % (config['retention_policy'], len(self.slave_objstorages))
+            )

-        # TODO Database should be initialized somehow before going in
-        # production. For now, assumes that the database contains
-        # data for all the current content.
+        # Get the master storage that contains content to be archived
+        if config['objstorage_type'] == 'local_objstorage':
+            master_objstorage_args = {
+                'root': config['objstorage_path'],
+                'slicing': config['objstorage_slicing']
+            }
+            master_objstorage = PathSlicingObjStorage(
+                **master_objstorage_args
+            )
+        elif config['objstorage_type'] == 'remote_objstorage':
+            master_objstorage_args = {'base_url': config['objstorage_url']}
+            master_objstorage = RemoteObjStorage(**master_objstorage_args)
+        else:
+            raise ValueError(
+                'Unknown objstorage class `%s`' % config['objstorage_type']
+            )
+        self.master_objstorage = master_objstorage
+        self.master_objstorage_args = master_objstorage_args

-        self.master_storage = master_storage
-        self.slave_storages = slaves
+        # Keep the full configuration
         self.config = config

     def run(self):
@@ -107,6 +147,7 @@
             run_fn = self.run_async_worker
         else:
             run_fn = self.run_sync_worker
+
         for batch in self.get_unarchived_content():
             run_fn(batch)

@@ -116,8 +157,8 @@
         task = app.tasks[task_name]
         task.delay(batch,
                    archiver_args=self.db_conn_archiver,
-                   master_storage_args=self.master_storage_args,
-                   slave_storages=self.slave_storages,
+                   master_objstorage_args=self.master_objstorage_args,
+                   slave_objstorages=self.slave_objstorages,
                    config=self.config)

     def run_sync_worker(self, batch):
@@ -126,8 +167,8 @@
         task = app.tasks[task_name]
         task(batch,
              archiver_args=self.db_conn_archiver,
-             master_storage_args=self.master_storage_args,
-             slave_storages=self.slave_storages,
+             master_objstorage_args=self.master_objstorage_args,
+             slave_objstorages=self.slave_objstorages,
              config=self.config)

     def get_unarchived_content(self):
@@ -168,7 +209,7 @@
         )
         for _content_id, server_id, status, mtime in backups:
             virtual_status = self.get_virtual_status(status, mtime)
-            server_data = (server_id, self.slave_storages[server_id])
+            server_data = (server_id, self.slave_objstorages[server_id])

             missing_copy.setdefault(
                 db_content_id,
@@ -177,7 +218,7 @@

             # Check the content before archival.
             try:
-                self.master_storage.objstorage.check(content_id[0])
+                self.master_objstorage.check(content_id[0])
             except Exception as e:
                 # Exception can be Error or ObjNotFoundError.
                 logger.error(e)
@@ -234,8 +275,6 @@
 @click.argument('config-path', required=1)
 @click.option('--dbconn', default=DEFAULT_CONFIG['dbconn'][1],
               help="Connection string for the archiver database")
-@click.option('--dbconn-storage', default=DEFAULT_CONFIG['dbconn_storage'][1],
-              help="Connection string for the storage database")
 @click.option('--async/--sync', default=DEFAULT_CONFIG['asynchronous'][1],
               help="Indicates if the archiver should run asynchronously")
-def launch(config_path, dbconn, dbconn_storage, async):
+def launch(config_path, dbconn, async):
@@ -243,13 +282,12 @@
     # command line > file config > default config
     cl_config = {
         'dbconn': dbconn,
-        'dbconn_storage': dbconn_storage,
         'asynchronous': async
     }
     conf = config.read(config_path, DEFAULT_CONFIG)
     conf.update(cl_config)

     # Create connection data and run the archiver.
-    archiver = ArchiverDirector(conf['dbconn'], conf['dbconn_storage'], conf)
+    archiver = ArchiverDirector(conf['dbconn'], conf)
     logger.info("Starting an archival at", datetime.now())
     archiver.run()
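For reference, a minimal sketch of how the refactored director would be driven after this change, using the keys introduced in DEFAULT_CONFIG above. The connection string, path, slicing and URL are the defaults from this patch and purely illustrative; ArchiverDirector is assumed importable from swh.storage.archiver.director, matching the file layout of this diff.

    from swh.storage.archiver.director import ArchiverDirector

    config = {
        'objstorage_type': 'local_objstorage',          # or 'remote_objstorage'
        'objstorage_path': '/tmp/swh-storage/objects',  # local objstorage only
        'objstorage_slicing': '0:2/2:4/4:6',            # local objstorage only
        'objstorage_url': 'http://localhost:5003/',     # remote objstorage only
        'batch_max_size': 50,
        'archival_max_age': 3600,
        'retention_policy': 2,
        'asynchronous': False,  # run the archival synchronously
    }

    archiver = ArchiverDirector(
        db_conn_archiver='dbname=softwareheritage-archiver-dev user=guest',
        config=config)
    archiver.run()

The director reads the slave objstorages from the archiver database and, per the new check in its constructor, refuses to start if the retention policy exceeds the number of available copies (slaves plus the master).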
diff --git a/swh/storage/archiver/tasks.py b/swh/storage/archiver/tasks.py
--- a/swh/storage/archiver/tasks.py
+++ b/swh/storage/archiver/tasks.py
@@ -12,9 +12,9 @@
     """
     task_queue = 'swh_storage_archive_worker'

-    def run(self, batch, archiver_args, master_storage_args, slave_storages,
-            config):
-        aw = ArchiverWorker(batch, archiver_args, master_storage_args,
-                            slave_storages, config)
+    def run(self, batch, archiver_args, master_objstorage_args,
+            slave_objstorages, config):
+        aw = ArchiverWorker(batch, archiver_args, master_objstorage_args,
+                            slave_objstorages, config)
         if aw.run():
             self.log("Successful backup for a batch of size %s" % len(batch))
diff --git a/swh/storage/archiver/worker.py b/swh/storage/archiver/worker.py
--- a/swh/storage/archiver/worker.py
+++ b/swh/storage/archiver/worker.py
@@ -6,11 +6,14 @@
 import random
 import logging

+from datetime import datetime
+
+from swh.objstorage import PathSlicingObjStorage
+from swh.objstorage.api.client import RemoteObjStorage
+
 from .storage import ArchiverStorage
 from .copier import ArchiverCopier

-from .. import get_storage
-from datetime import datetime

 logger = logging.getLogger()
@@ -26,10 +29,10 @@
             that associates a content's sha1 id to the list of servers where
             the content is present or missing
             (see ArchiverDirector::get_unarchived_content).
-        master_storage_args: The connection argument to initialize the
+        master_objstorage_args: The connection argument to initialize the
             master storage with the db connection url & the object storage
             path.
-        slave_storages: A map that associates server_id to the remote server.
+        slave_objstorages: A map that associates server_id to the remote server
         config: Archiver_configuration. A dictionary that must contains
             the following keys.
             objstorage_path (string): the path of the objstorage of the
@@ -43,8 +46,8 @@
             asynchronous (boolean): Indicate whenever the archival should
                 run in asynchronous mode or not.
     """
-    def __init__(self, batch, archiver_args, master_storage_args,
-                 slave_storages, config):
+    def __init__(self, batch, archiver_args, master_objstorage_args,
+                 slave_objstorages, config):
         """ Constructor of the ArchiverWorker class.

         Args:
@@ -53,8 +56,8 @@
                 is present.
             archiver_args: The archiver's arguments to establish connection to
                 db.
-            master_storage_args: The master storage arguments.
-            slave_storages: A map that associates server_id to the remote
+            master_objstorage_args: The master storage arguments.
+            slave_objstorages: A map that associates server_id to the remote
                 server.
             config: Archiver_configuration. A dictionary that must contains
                 the following keys.
@@ -71,11 +74,16 @@
         """
         self.batch = batch
         self.archiver_storage = ArchiverStorage(archiver_args)
-        self.master_storage = get_storage('local_storage', master_storage_args)
-        self.slave_storages = slave_storages
+        self.slave_objstorages = slave_objstorages
         self.config = config

-    def __choose_backup_servers(self, allowed_storage, backup_number):
+        if config['objstorage_type'] == 'local_objstorage':
+            master_objstorage = PathSlicingObjStorage(**master_objstorage_args)
+        else:
+            master_objstorage = RemoteObjStorage(**master_objstorage_args)
+        self.master_objstorage = master_objstorage
+
+    def _choose_backup_servers(self, allowed_storage, backup_number):
         """ Choose the slave servers for archival.

         Choose the given amount of servers among those which don't already
@@ -88,8 +96,7 @@
         """
         # In case there is not enough backup servers to get all the backups
         # we need, just do our best.
-        # TODO such situation can only be caused by an incorrect configuration
-        # setting. Do a verification previously.
+        # Such a situation should not happen.
         backup_number = min(backup_number, len(allowed_storage))

         # TODO Find a better (or a good) policy to choose the backup servers.
@@ -98,7 +105,7 @@
         # capacities.
         return random.sample(allowed_storage, backup_number)

-    def __get_archival_status(self, content_id, server):
+    def _get_archival_status(self, content_id, server):
         """ Get the archival status of the required content.

         Attributes:
@@ -118,8 +125,8 @@
             'mtime': t[3]
         }

-    def __content_archive_update(self, content_id, archive_id,
-                                 new_status=None):
+    def _content_archive_update(self, content_id, archive_id,
+                                new_status=None):
         """ Update the status of a archive content and set it's mtime to now()

         Change the last modification time of an archived content and change
@@ -148,7 +155,7 @@
             content (str): Sha1 of a content.
             destination: Tuple (archive id, archive url).
         """
-        archival_status = self.__get_archival_status(
+        archival_status = self._get_archival_status(
             content,
             destination
         )
@@ -187,7 +194,7 @@
             server_data = self.batch[content_id]
             nb_present = len(server_data['present'])
             nb_backup = self.config['retention_policy'] - nb_present
-            backup_servers = self.__choose_backup_servers(
+            backup_servers = self._choose_backup_servers(
                 server_data['missing'],
                 nb_backup
             )
@@ -214,8 +221,8 @@
                     slaves_copy[destination]
                 ))
             for content_id in slaves_copy[destination]:
-                self.__content_archive_update(content_id, destination[0],
-                                              new_status='ongoing')
+                self._content_archive_update(content_id, destination[0],
+                                             new_status='ongoing')

         # Spawn a copier for each destination
         for destination in slaves_copy:
@@ -236,9 +243,9 @@
             destination: Tuple (archive_id, archive_url) of the destination.
             contents: List of contents to archive.
         """
-        ac = ArchiverCopier(destination, contents, self.master_storage)
+        ac = ArchiverCopier(destination, contents, self.master_objstorage)
         if ac.run():
             # Once the archival complete, update the database.
             for content_id in contents:
-                self.__content_archive_update(content_id, destination[0],
-                                              new_status='present')
+                self._content_archive_update(content_id, destination[0],
+                                             new_status='present')
diff --git a/swh/storage/tests/test_archiver.py b/swh/storage/tests/test_archiver.py
--- a/swh/storage/tests/test_archiver.py
+++ b/swh/storage/tests/test_archiver.py
@@ -112,29 +112,27 @@
     def __create_director(self, batch_size=5000, archival_max_age=3600,
                           retention_policy=1, asynchronous=False):
         config = {
+            'objstorage_type': 'local_objstorage',
             'objstorage_path': self.objroot,
+            'objstorage_slicing': '0:2/2:4/4:6',
+
             'batch_max_size': batch_size,
             'archival_max_age': archival_max_age,
             'retention_policy': retention_policy,
             'asynchronous': asynchronous  # Avoid depending on queue for tests.
         }
         director = ArchiverDirector(db_conn_archiver=self.conn,
-                                    db_conn_storage=self.conn_storage,
                                     config=config)
         return director

     def __create_worker(self, batch={}, config={}):
-        mstorage_args = [
-            self.archiver.master_storage.db.conn,  # master storage db
-                                                   # connection
-            self.objroot  # object storage path
-        ]
+        mobjstorage_args = self.archiver.master_objstorage_args
         if not config:
             config = self.archiver.config
         return ArchiverWorker(batch,
                               archiver_args=self.conn,
-                              master_storage_args=mstorage_args,
-                              slave_storages=[self.storage_data],
+                              master_objstorage_args=mobjstorage_args,
+                              slave_objstorages=[self.storage_data],
                               config=config)

     # Integration test
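The same master_objstorage_args mapping travels from the director to the celery task and on to ArchiverWorker, which rebuilds the master objstorage from it. A sketch of the two shapes it can take, mirroring the constructor calls in this patch; the root, slicing and URL values are illustrative only.

    from swh.objstorage import PathSlicingObjStorage
    from swh.objstorage.api.client import RemoteObjStorage

    # objstorage_type == 'local_objstorage': rebuild a path-sliced object
    # storage from the master's root directory and slicing specification.
    master_objstorage_args = {
        'root': '/tmp/swh-storage/objects',
        'slicing': '0:2/2:4/4:6',
    }
    master_objstorage = PathSlicingObjStorage(**master_objstorage_args)

    # objstorage_type == 'remote_objstorage': talk to the master objstorage
    # through its HTTP API instead of reading the files directly.
    master_objstorage_args = {'base_url': 'http://localhost:5003/'}
    master_objstorage = RemoteObjStorage(**master_objstorage_args)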