swh/storage/archiver/director.py
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging
import click
import time

from swh.core import hashutil, config | from swh.core import config
from swh.objstorage import PathSlicingObjStorage
from swh.objstorage.api.client import RemoteObjStorage
from swh.scheduler.celery_backend.config import app

from . import tasks  # NOQA
from .storage import ArchiverStorage

DEFAULT_CONFIG = {
    'objstorage_type': ('str', 'local_storage'),
    'objstorage_path': ('str', '/tmp/swh-storage/objects'),
    'objstorage_slicing': ('str', '0:2/2:4/4:6'),
    'objstorage_url': ('str', 'http://localhost:5003/'),
    'batch_max_size': ('int', 50),
    'archival_max_age': ('int', 3600),
    'retention_policy': ('int', 2),
    'asynchronous': ('bool', True),
    'dbconn': ('str', 'dbname=softwareheritage-archiver-dev user=guest')
}

task_name = 'swh.storage.archiver.tasks.SWHArchiverTask'
olasd: The type name should probably be `dict` rather than json, as json will never be used for that purpose anyway
logger = logging.getLogger()

class ArchiverDirector(): | class ArchiverDirector(config.SWHConfig):
"""Process the files in order to know which one is needed as backup.
[Done] olasd: The indentation of that dict looks broken :)
The archiver director processes the files in the local storage in order
to know which ones need archival, and delegates this task to
archiver workers.
Attributes: | """
master_objstorage: the local storage of the master server.
master_objstorage_args (dict): arguments of the master objstorage
initialization.
archiver_storage: a wrapper for archiver db operations. | DEFAULT_CONFIG = {
db_conn_archiver: Either a libpq connection string, | 'batch_max_size': ('int', 1500),
or a psycopg2 connection for the archiver db. | 'retention_policy': ('int', 2),
'asynchronous': ('bool', True),
slave_objstorages: Iterable of remote obj storages to the slave | 'dbconn': ('str', 'dbname=softwareheritage-archiver-dev user=guest')
servers used for backup. | }
[Done] olasd: Please make that `archiver/director` (therefore storing the configuration in /etc/softwareheritage/archiver/director.{ini,yml})
config: Archiver_configuration. A dictionary that must contain | CONFIG_BASE_FILENAME = 'archiver/director'
the following keys:
objstorage_type (str): type of objstorage used (local_storage
or remote_storage).
If the storage is local, the following keys must be present:
objstorage_path (str): master's objstorage path
objstorage_slicing (str): master's objstorage slicing
Otherwise, if it's a remote objstorage, the keys must be:
objstorage_url (str): url of the remote objstorage
batch_max_size (int): The number of content items that can be
given to the same archiver worker.
archival_max_age (int): Delay given to the worker to copy all
the files in a given batch.
retention_policy (int): Required number of copies for the
content to be considered safe.
asynchronous (boolean): Indicate whether the archival should
run in asynchronous mode or not.
"""
def __init__(self, db_conn_archiver, config): | def __init__(self, add_config):
""" Constructor of the archiver director.
Args:
db_conn_archiver: Either a libpq connection string,
or a psycopg2 connection for the archiver db.
config: Archiver_configuration. A dictionary that must contain | add_config: optional additional configuration. Keys in the dict will
the following keys: | override the ones parsed from the configuration file.
objstorage_type (str): type of objstorage used
(local_objstorage or remote_objstorage).
If the storage is local, the following keys must be present:
objstorage_path (str): master's objstorage path
objstorage_slicing (str): master's objstorage slicing
Otherwise, if it's a remote objstorage, the keys must be:
objstorage_url (str): url of the remote objstorage
batch_max_size (int): The number of content items that can be
given to the same archiver worker.
archival_max_age (int): Delay given to the worker to copy all
the files in a given batch.
retention_policy (int): Required number of copies for the
content to be considered safe.
asynchronous (boolean): Indicate whether the archival should
run in asynchronous mode or not.
"""
# Get the slave storages | self.config = self.parse_config_file(additional_configs=[add_config])
self.db_conn_archiver = db_conn_archiver | self.archiver_storage = ArchiverStorage(self.config['dbconn'])
self.archiver_storage = ArchiverStorage(db_conn_archiver)
self.slave_objstorages = {
id: url
for id, url
in self.archiver_storage.archive_ls()
}
# Check that there are enough backup servers for the retention policy
if config['retention_policy'] > len(self.slave_objstorages) + 1:
raise ValueError(
"Can't have a retention policy of %d with %d backup servers"
% (config['retention_policy'], len(self.slave_objstorages))
)
# Get the master storage that contains content to be archived
if config['objstorage_type'] == 'local_objstorage':
master_objstorage_args = {
'root': config['objstorage_path'],
'slicing': config['objstorage_slicing']
}
master_objstorage = PathSlicingObjStorage(
**master_objstorage_args
)
elif config['objstorage_type'] == 'remote_objstorage':
master_objstorage_args = {'base_url': config['objstorage_url']}
master_objstorage = RemoteObjStorage(**master_objstorage_args)
else:
raise ValueError(
'Unknown objstorage class `%s`' % config['objstorage_type']
)
self.master_objstorage = master_objstorage
self.master_objstorage_args = master_objstorage_args
# Keep the full configuration
self.config = config
def run(self):
""" Run the archiver director.
The archiver director will check all the contents of the archiver
database and do the required backup jobs.
"""
if self.config['asynchronous']:
run_fn = self.run_async_worker
else:
run_fn = self.run_sync_worker
for batch in self.get_unarchived_content(): | for batch in self.get_unarchived_content_batch():
run_fn(batch)
[Done] olasd: This function can be removed now.
def _worker_args(self, batch):
""" Generates a dict that contains the arguments for a worker.
"""
return {
'batch': batch
[Done] olasd: We do not want to pass configuration to workers via Celery, ever. Considering that, I think the add_config instance attribute should be removed. The only actual use of that is overriding the file configuration when running the director on the command line, and we can just override specific configuration items in the director class's __init__ method. This makes the "worker arguments" just a plain list of objects that need archiving. No need for a wrapping dict, or anything else.
}
def run_async_worker(self, batch):
""" Produce a worker that will be added to the task queue.
"""
[Done] olasd: The worker should really read all of this from its configuration rather than having it passed around as a message. All in all, all the director should enqueue is the list of objects that might need copies.
task = app.tasks[task_name]
task.delay(batch, | task.delay(**self._worker_args(batch))
[Done] olasd: `task.delay(batch)`
archiver_args=self.db_conn_archiver,
master_objstorage_args=self.master_objstorage_args,
slave_objstorages=self.slave_objstorages,
config=self.config)
def run_sync_worker(self, batch):
""" Run a worker synchronously on the given batch.
"""
task = app.tasks[task_name]
task(batch, | task(**self._worker_args(batch))
[Done] olasd: `task(batch)`
archiver_args=self.db_conn_archiver,
master_objstorage_args=self.master_objstorage_args,
slave_objstorages=self.slave_objstorages,
config=self.config)
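To make the inline suggestions above concrete (`task.delay(batch)` / `task(batch)`), here is a sketch of what the two entry points reduce to once the wrapping dict and the per-message configuration are dropped. This only illustrates the review comments; it is not code from the diff:

from swh.core import config
from swh.scheduler.celery_backend.config import app

task_name = 'swh.storage.archiver.tasks.SWHArchiverTask'

class ArchiverDirector(config.SWHConfig):
    # ... configuration handling and batch creation omitted ...

    def run_async_worker(self, batch):
        """ Enqueue a worker task; batch is a plain list of content sha1s. """
        task = app.tasks[task_name]
        task.delay(batch)

    def run_sync_worker(self, batch):
        """ Run a worker synchronously on the given batch. """
        task = app.tasks[task_name]
        task(batch)

The worker then reads everything else (retention policy, object storages, database connection) from its own configuration file instead of the Celery message.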
def get_unarchived_content(self): | def get_unarchived_content_batch(self):
""" Get contents that need to be archived. | """ Create batches of contents that need to be archived.
Yields:
A batch of contents. Batches are dictionaries which associate | batches of sha1s that correspond to contents that need more
a content id to the data about servers that contain it or not. | archive copies.
{'id1':
{'present': [('slave1', 'slave1_url')],
'missing': [('slave2', 'slave2_url'),
('slave3', 'slave3_url')]
},
'id2':
{'present': [],
'missing': [
('slave1', 'slave1_url'),
('slave2', 'slave2_url'),
('slave3', 'slave3_url')
]}
}
Where keys (idX) are sha1 of the content and (slaveX, slaveX_url)
are ids and urls of the storage slaves.
At least all the content that doesn't have enough copies on the
backup servers is distributed into these batches.
"""
contents = {} | contents = []
# Get the archives | for content in self._get_unarchived_content():
archives = dict(self.archiver_storage.archive_ls()) | contents.append(content)
# Get all the contents referenced into the archiver tables | if len(contents) >= self.config['batch_max_size']:
last_object = b''
while True:
archived_contents = list(
self.archiver_storage.content_archive_get_copies(last_object))
if not archived_contents:
break
for content_id, present, ongoing in archived_contents:
last_object = content_id
data = {
'present': set(present),
'missing': set(archives) - set(present) - set(ongoing),
}
for archive_id, mtime in ongoing.items():
status = self.get_virtual_status('ongoing', mtime)
data[status].add(archive_id)
if not data['missing']:
continue
contents[r'\x%s' % hashutil.hash_to_hex(content_id)] = {
k: [(archive_id, archives[archive_id]) for archive_id in v]
for k, v in data.items()
}
if len(contents) >= self.config['batch_max_size']:
yield contents
contents = {} | contents = []
if len(contents) > 0:
yield contents
def get_virtual_status(self, status, mtime): | def _get_unarchived_content(self):
""" Compute the virtual presence of a content. | """ Get all the content ids in the db that need more copies.
If the status is ongoing but the time is not elapsed, the archiver | Yields:
considers it will be present in the future, and so considers it as | sha1 of contents that need to be archived.
present.
However, if the time has elapsed, the copy may have failed, so consider
the content as missing.
Arguments:
status (string): One of ('present', 'missing', 'ongoing'). The
status of the content.
mtime (datetime): Time at which the content has been updated for
the last time.
Returns:
The virtual status of the studied content, which is 'present' or
'missing'.
Raises:
ValueError: if the status is not one of 'present', 'missing'
or 'ongoing'
"""
if status in ('present', 'missing'): | for content_id, present, _ongoing in self._get_all_contents():
return status | if len(present) < self.config['retention_policy']:
[Not done] olasd: This is okay as a first step but a tiny refactoring of the database should allow us to do that server-side rather than client-side.
[Not done] qcampos: That's probably a better idea. I can't get the correct request for that, however. What should I use in the WHERE section to count the number of present copies in the jsonb field?
[Not done] olasd: Let's look at that after this huge diff lands.
yield content_id
# If the status is 'ongoing' but there is still time, another worker
# may still be on the task.
if status == 'ongoing':
elapsed = int(time.time()) - mtime
if elapsed <= self.config['archival_max_age']:
return 'present'
else:
return 'missing'
else:
raise ValueError("status must be either 'present', 'missing' " | continue
"or 'ongoing'")
def _get_all_contents(self):
""" Get batches from the archiver db and yield them as a continuous stream.
Yields:
Data about a content as a tuple
(content_id, present_copies, ongoing_copies) where ongoing_copies
is a dict mapping copy to mtime.
"""
last_object = b''
while True:
archiver_contents = list(
self.archiver_storage.content_archive_get_copies(last_object)
)
if not archiver_contents:
return
for content in archiver_contents:
last_object = content[0]
yield content
@click.command()
@click.argument('config-path', required=1) | @click.option('--batch-size', help="Maximal number of objects in a batch")
[Done] olasd: This argument is not needed any longer: the configuration file can be loaded from the default Software Heritage paths, that is in /etc or the user home directory.
@click.option('--dbconn', default=DEFAULT_CONFIG['dbconn'][1], | @click.option('--retention-policy',
help="Minimal number of copies the archiver will create")
@click.option('--dbconn',
help="Connection string for the archiver database")
@click.option('--async/--sync', default=DEFAULT_CONFIG['asynchronous'][1], | @click.option('--async/--sync',
help="Indicates if the archiver should run asynchronously")
[Done] olasd: The default for the command-line director should be to read the config file. When you're setting defaults for the command-line args, you make the command line _always_ override the contents of the configuration file with the default configuration. What you should do is:
def launch(config_path, dbconn, async): | def launch(batch_size, retention_policy, dbconn, async):
# The configuration has the following priority:
# command line > file config > default config
cl_config = { | # Values are None if not provided
[Done] olasd: You should not need to do that any longer.
'dbconn': dbconn, | cl_config = create_conf(batch_size, retention_policy, dbconn, async)
'asynchronous': async | # Run the archiver with the overriding conf.
} | archiver = ArchiverDirector(cl_config)
[Done] olasd: There, you can just pass the command-line config.
conf = config.read(config_path, DEFAULT_CONFIG)
conf.update(cl_config)
# Create connection data and run the archiver.
archiver = ArchiverDirector(conf['dbconn'], conf)
logger.info("Starting an archival at", time.time())
archiver.run()
def create_conf(batch_size, retention_policy, dbconn, async):
""" Create a dict that contains only the arguments that were actually given.
"""
return dict(filter(lambda kv: kv[1] is not None,
(
('batch_max_size', batch_size),
('retention_policy', retention_policy),
('dbconn', dbconn),
('asynchronous', async)
)))
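As an aside on the defaulting issue raised in the comments above: the intent is that click options default to None and only the values actually provided on the command line override the configuration file. A sketch of that pattern follows; it is illustrative only, and the diff's `async` parameter is spelt out here because `async` is a reserved word in current Python:

def create_conf(batch_size, retention_policy, dbconn, asynchronous):
    """ Keep only the command-line arguments that were actually given. """
    candidates = {
        'batch_max_size': batch_size,
        'retention_policy': retention_policy,
        'dbconn': dbconn,
        'asynchronous': asynchronous,
    }
    # None means "not set on the command line": drop it so the value from
    # the configuration file (or DEFAULT_CONFIG) wins.
    return {k: v for k, v in candidates.items() if v is not None}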
[Done] olasd: I'm sorry, but I'd rather we removed all the command line arguments than adding this complexity. This function has you duplicate the configuration keys in three places (command line argument names, function argument names, string constants), and realistically it will never be used as we will deploy a configuration file with puppet anyway... Down the line, we should make sure that our "config module" can generate a command-line parser, rather than doing stuff ad-hoc everywhere. Can we please just scrap the command line parsing and finally land this diff?
if __name__ == '__main__':
launch()