swh/storage/archiver/director.py
-# Copyright (C) 2015  The Software Heritage developers
+# Copyright (C) 2015-2016  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

-import swh
 import logging
 import click

 from datetime import datetime

 from swh.core import hashutil, config
+from swh.objstorage import PathSlicingObjStorage
+from swh.objstorage.api.client import RemoteObjStorage
 from swh.scheduler.celery_backend.config import app

 from . import tasks  # NOQA
 from .storage import ArchiverStorage


 DEFAULT_CONFIG = {
+    'objstorage_type': ('str', 'local_objstorage'),
     'objstorage_path': ('str', '/tmp/swh-storage/objects'),
+    'objstorage_slicing': ('str', '0:2/2:4/4:6'),
+    'objstorage_url': ('str', 'http://localhost:5003/'),
     'batch_max_size': ('int', 50),
     'archival_max_age': ('int', 3600),
     'retention_policy': ('int', 2),
     'asynchronous': ('bool', True),
-    'dbconn': ('str', 'dbname=softwareheritage-archiver-dev user=guest'),
-    'dbconn_storage': ('str', 'dbname=softwareheritage-dev user=guest')
+    'dbconn': ('str', 'dbname=softwareheritage-archiver-dev user=guest')
 }
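
Note that each DEFAULT_CONFIG entry is a ('type', default) pair rather than a bare value: config.read(config_path, DEFAULT_CONFIG) (see launch() below) merges values parsed from the file at config-path over these defaults, coercing each raw value to the declared type. A minimal sketch of that merge, assuming only the 'str'/'int'/'bool' types used above (helper names are hypothetical, not swh.core.config's actual internals):

    def coerce(type_name, raw):
        # Coerce a raw string read from the config file to the declared type.
        if type_name == 'int':
            return int(raw)
        if type_name == 'bool':
            return raw.lower() in ('true', '1', 'yes')
        return raw  # 'str' values are kept as-is

    def read_config(file_values, default_config):
        # file_values: raw strings parsed from the file at config-path.
        conf = {}
        for key, (type_name, default) in default_config.items():
            if key in file_values:
                conf[key] = coerce(type_name, file_values[key])
            else:
                conf[key] = default
        return conf
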
 task_name = 'swh.storage.archiver.tasks.SWHArchiverTask'

 logger = logging.getLogger()


 class ArchiverDirector():
     """Process the files in order to know which one is needed as backup.

     The archiver director processes the files in the local storage in order
     to know which one needs archival and it delegates this task to
     archiver workers.

     Attributes:
-        master_storage: the local storage of the master server.
-        slave_storages: Iterable of remote obj storages to the slaves servers
-            used for backup.
+        master_objstorage: the local objstorage of the master server.
+        master_objstorage_args (dict): arguments of the master objstorage
+            initialization.
+        archiver_storage: a wrapper for archiver db operations.
+        db_conn_archiver: Either a libpq connection string,
+            or a psycopg2 connection for the archiver db.
+        slave_objstorages: Iterable of remote objstorages to the slave
+            servers used for backup.
         config: Archiver_configuration. A dictionary that must contain
-            the following keys.
-            objstorage_path (string): master's objstorage path
+            the following keys:
+
+            objstorage_type (str): type of objstorage used
+                (local_objstorage or remote_objstorage).
+            If the objstorage is local, these keys must also be present:
+                objstorage_path (str): master's objstorage path
+                objstorage_slicing (str): master's objstorage slicing
+            Otherwise, if it is a remote objstorage, this key is required:
+                objstorage_url (str): url of the remote objstorage
+
             batch_max_size (int): The number of content items that can be
                 given to the same archiver worker.
             archival_max_age (int): Delay given to the worker to copy all
                 the files in a given batch.
             retention_policy (int): Required number of copies for the
                 content to be considered safe.
             asynchronous (boolean): Indicates whether the archival should
                 run in asynchronous mode.
     """

-    def __init__(self, db_conn_archiver, db_conn_storage, config):
+    def __init__(self, db_conn_archiver, config):
         """ Constructor of the archiver director.

         Args:
             db_conn_archiver: Either a libpq connection string,
-                or a psycopg2 connection for the archiver db connection.
-            db_conn_storage: Either a libpq connection string,
-                or a psycopg2 connection for the db storage connection.
+                or a psycopg2 connection for the archiver db.
             config: Archiver_configuration. A dictionary that must contain
-                the following keys.
-                objstorage_path (string): master's objstorage path
+                the following keys:
+
+                objstorage_type (str): type of objstorage used
+                    (local_objstorage or remote_objstorage).
+                If the objstorage is local, these keys must also be present:
+                    objstorage_path (str): master's objstorage path
+                    objstorage_slicing (str): master's objstorage slicing
+                Otherwise, if it is a remote objstorage, this key is required:
+                    objstorage_url (str): url of the remote objstorage
+
                 batch_max_size (int): The number of content items that can be
                     given to the same archiver worker.
                 archival_max_age (int): Delay given to the worker to copy all
                     the files in a given batch.
                 retention_policy (int): Required number of copies for the
                     content to be considered safe.
                 asynchronous (boolean): Indicates whether the archival should
                     run in asynchronous mode.
         """
-        # Get the local storage of the master and remote ones for the slaves.
+        # Get the slave storages
         self.db_conn_archiver = db_conn_archiver
         self.archiver_storage = ArchiverStorage(db_conn_archiver)
-
-        self.master_storage_args = [db_conn_storage, config['objstorage_path']]
-        master_storage = swh.storage.get_storage('local_storage',
-                                                 self.master_storage_args)
-        slaves = {
+        self.slave_objstorages = {
             id: url
             for id, url
             in self.archiver_storage.archive_ls()
         }
+        # Check that there are enough backup servers for the retention policy
+        if config['retention_policy'] > len(self.slave_objstorages) + 1:
+            raise ValueError(
+                "Can't have a retention policy of %d with %d backup servers"
+                % (config['retention_policy'], len(self.slave_objstorages))
+            )

-        # TODO Database should be initialized somehow before going in
-        # production. For now, assumes that the database contains
-        # data for all the current content.
+        # Get the master objstorage that contains content to be archived
+        if config['objstorage_type'] == 'local_objstorage':
+            master_objstorage_args = {
+                'root': config['objstorage_path'],
+                'slicing': config['objstorage_slicing']
+            }
+            master_objstorage = PathSlicingObjStorage(
+                **master_objstorage_args
+            )
+        elif config['objstorage_type'] == 'remote_objstorage':
+            master_objstorage_args = {'base_url': config['objstorage_url']}
+            master_objstorage = RemoteObjStorage(**master_objstorage_args)
+        else:
+            raise ValueError(
+                'Unknown objstorage class `%s`' % config['objstorage_type']
+            )
+        self.master_objstorage = master_objstorage
+        self.master_objstorage_args = master_objstorage_args

-        self.master_storage = master_storage
-        self.slave_storages = slaves
+        # Keep the full configuration
         self.config = config
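
The retention-policy check added above bounds the policy by the number of copies that can possibly exist: the master plus one per registered slave. A short worked example, with made-up slave entries:

    # Hypothetical: two slave objstorages registered in the archiver db.
    slave_objstorages = {1: 'http://slave1:5003/', 2: 'http://slave2:5003/'}
    max_copies = len(slave_objstorages) + 1  # the master counts as one copy

    for retention_policy in (2, 3, 4):
        accepted = retention_policy <= max_copies
        print(retention_policy, 'ok' if accepted else 'ValueError')
    # -> 2 ok, 3 ok, 4 ValueError
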
     def run(self):
         """ Run the archiver director.

         The archiver director will check all the contents of the archiver
         database and do the required backup jobs.
         """
         if self.config['asynchronous']:
             run_fn = self.run_async_worker
         else:
             run_fn = self.run_sync_worker
         for batch in self.get_unarchived_content():
             run_fn(batch)
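
For reference, a minimal synchronous driver built from the pieces above; everything comes from DEFAULT_CONFIG's default values (a sketch, assuming a reachable archiver database and a populated local objstorage):

    # Flatten DEFAULT_CONFIG's ('type', default) pairs into plain values.
    conf = {key: default for key, (_, default) in DEFAULT_CONFIG.items()}
    conf['asynchronous'] = False  # use run_sync_worker: batches run in-process

    director = ArchiverDirector(conf['dbconn'], conf)
    director.run()
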
     def run_async_worker(self, batch):
         """ Produce a worker that will be added to the task queue.
         """
         task = app.tasks[task_name]
         task.delay(batch,
                    archiver_args=self.db_conn_archiver,
-                   master_storage_args=self.master_storage_args,
-                   slave_storages=self.slave_storages,
+                   master_objstorage_args=self.master_objstorage_args,
+                   slave_objstorages=self.slave_objstorages,
                    config=self.config)

     def run_sync_worker(self, batch):
         """ Run a worker synchronously on the given batch.
         """
         task = app.tasks[task_name]
         task(batch,
              archiver_args=self.db_conn_archiver,
-             master_storage_args=self.master_storage_args,
-             slave_storages=self.slave_storages,
+             master_objstorage_args=self.master_objstorage_args,
+             slave_objstorages=self.slave_objstorages,
              config=self.config)
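
The only difference between the two paths is Celery's calling convention: task.delay(...) serializes the arguments and enqueues them for a remote worker, while task(...) runs the same task body in the current process. This is also why the director passes master_objstorage_args (a plain dict) rather than the objstorage instance itself: the worker can rebuild the objstorage from serializable arguments. A sketch of the receiving side, assuming tasks.py (not shown here) defines the task roughly like this, with a hypothetical ArchiverWorker doing the actual copies:

    from celery import Task

    class SWHArchiverTask(Task):
        # Sketch only; the real class lives in swh.storage.archiver.tasks.
        def run(self, batch, archiver_args, master_objstorage_args,
                slave_objstorages, config):
            worker = ArchiverWorker(  # hypothetical worker class
                batch,
                archiver_args=archiver_args,
                master_objstorage_args=master_objstorage_args,
                slave_objstorages=slave_objstorages,
                config=config)
            worker.run()
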
     def get_unarchived_content(self):
         """ Get contents that need to be archived.

         Yields:
             A batch of contents. Batches are dictionaries that associate
             a content id with data about which servers hold a copy of it
             and which do not.

 [... 24 unchanged lines elided ...]

             db_content_id = '\\x' + hashutil.hash_to_hex(content_id[0])

             # Fetch the data about archival status of the content
             backups = self.archiver_storage.content_archive_get(
                 content=db_content_id
             )
             for _content_id, server_id, status, mtime in backups:
                 virtual_status = self.get_virtual_status(status, mtime)
-                server_data = (server_id, self.slave_storages[server_id])
+                server_data = (server_id,
+                               self.slave_objstorages[server_id])
                 missing_copy.setdefault(
                     db_content_id,
                     {'present': [], 'missing': []}
                 ).setdefault(virtual_status, []).append(server_data)

             # Check the content before archival.
             try:
-                self.master_storage.objstorage.check(content_id[0])
+                self.master_objstorage.check(content_id[0])
             except Exception as e:
                 # Exception can be Error or ObjNotFoundError.
                 logger.error(e)
                 # TODO Do something to restore the content?

             if len(missing_copy) >= self.config['batch_max_size']:
                 yield missing_copy
                 missing_copy = {}
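
Concretely, each yielded batch maps a hex-encoded content id to the slave servers that already hold a copy ('present') and those that still need one ('missing'). An illustrative batch, with made-up ids and urls:

    content_id = '\\x' + 'aa' * 20   # made-up sha1, in the db's hex format
    batch = {
        content_id: {
            'present': [(1, 'http://slave1:5003/')],  # already archived here
            'missing': [(2, 'http://slave2:5003/')],  # needs a copy here
        }
    }
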
 [... 40 unchanged lines elided: the rest of get_unarchived_content and the
      body of get_virtual_status(self, status, mtime) ...]

         raise ValueError("status must be either 'present', 'missing' "
                          "or 'ongoing'")
 @click.command()
 @click.argument('config-path', required=1)
 @click.option('--dbconn', default=DEFAULT_CONFIG['dbconn'][1],
               help="Connection string for the archiver database")
-@click.option('--dbconn-storage', default=DEFAULT_CONFIG['dbconn_storage'][1],
-              help="Connection string for the storage database")
 @click.option('--async/--sync', default=DEFAULT_CONFIG['asynchronous'][1],
               help="Indicates if the archiver should run asynchronously")
 def launch(config_path, dbconn, dbconn_storage, async):

ardumont: You forgot to remove the argument dbconn_storage here.

     # The configuration has the following priority:
     # command line > file config > default config
     cl_config = {
         'dbconn': dbconn,
-        'dbconn_storage': dbconn_storage,
         'asynchronous': async
     }
     conf = config.read(config_path, DEFAULT_CONFIG)
     conf.update(cl_config)

     # Create connection data and run the archiver.
-    archiver = ArchiverDirector(conf['dbconn'], conf['dbconn_storage'], conf)
+    archiver = ArchiverDirector(conf['dbconn'], conf)
     logger.info("Starting an archival at %s", datetime.now())
     archiver.run()


 if __name__ == '__main__':
     launch()
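
With the package installed, the director can then be launched as a module, e.g. python3 -m swh.storage.archiver.director /path/to/archiver-config --sync (the config path is illustrative; --sync overrides the file's asynchronous setting, since command-line values take priority as noted above).
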