swh/storage/archiver/worker.py
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import random
import logging
import time

from collections import defaultdict

from swh.core import hashutil
from swh.objstorage import get_objstorage

from .storage import ArchiverStorage
from .copier import ArchiverCopier


logger = logging.getLogger('archiver.worker')


class ArchiverWorker():
    """ Do the required backups on a given batch of contents.

    Process the content of a content batch in order to do the needed
    backups on the slave servers.
    """
    def __init__(self, batch, storages, dbconn, archival_policy):
        """ Constructor of the ArchiverWorker class.

        Args:
            batch: A batch of content, which is a dictionary that
                associates a content's sha1 id to the list of servers
                where the content is present or missing.
            storages: List of storage definitions, each carrying the
                'host', 'cls' and 'args' keys used to build the object
                storages.
            dbconn: Connection argument for the archiver database.
            archival_policy: Dictionary holding at least the
                'retention_policy' and 'archival_max_age' keys.
        """
        self.batch = batch
        self.archival_policy = archival_policy

        self.archiver_db = ArchiverStorage(dbconn)
        self.objstorages = {
            storage['host']: get_objstorage(storage['cls'], storage['args'])
            for storage in storages
        }
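
    # For illustration, a sketch of the `storages` configuration the
    # constructor above expects, inferred from the dict comprehension;
    # the hostnames, classes and arguments are made-up examples, not
    # actual configuration values:
    #
    #     storages = [
    #         {'host': 'uffizi', 'cls': 'pathslicing',
    #          'args': {'root': '/srv/objects', 'slicing': '0:2/2:4/4:6'}},
    #         {'host': 'banco', 'cls': 'pathslicing',
    #          'args': {'root': '/srv/banco', 'slicing': '0:2/2:4/4:6'}},
    #     ]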

    def run(self):
        """ Do the task expected from the archiver worker.

        Process the contents in the batch, ensure that the elements
        still need an archival, and spawn copiers to copy files to each
        destination.
        """
        # defaultdict(list), so accessing a missing key automatically
        # creates an empty list for it.
        transfers = defaultdict(list)
        for obj_id in self.batch:
            # Get a dict {'missing': [servers], 'present': [servers]}
            # for each content, ignoring those that don't need archival.
            copies = self._compute_copies(obj_id)
            if not self._need_archival(copies):
                continue
            present = copies.get('present', [])
            missing = copies.get('missing', [])
            if len(present) == 0:
                logger.critical('Content %s has been lost' % obj_id)
                continue
            # Randomly choose some servers to be used as sources and
            # destinations.
            for src_dest in self._choose_backup_servers(present, missing):
                transfers[src_dest].append(obj_id)
        # Then run a copier for each of the required transfers.
        for (src, dest), content_ids in transfers.items():
            self.run_copier(self.objstorages[src],
                            self.objstorages[dest], content_ids)
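
    # The `transfers` mapping built in run() groups content ids by
    # (source, destination) pair; e.g., with hypothetical server names:
    #
    #     {('uffizi', 'banco'): [content1, content2],
    #      ('uffizi', 'azure'): [content1, content3]}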

    def _compute_copies(self, content_id):
        """ From a content_id, return its present and missing copies.

        Returns:
            A dictionary with keys 'present' and 'missing' that map to
            lists of copy ids, depending on whether the content is
            present or missing on each copy.
        """
        # content_archive_get returns a (content_id, present, ongoing)
        # triple; only the copy names and the ongoing mtimes are needed.
        copies = self.archiver_db.content_archive_get(content_id)
        _, present, ongoing = copies
        # Initialize the archival status with all known present copies
        content_data = {'present': set(present), 'missing': set()}
        # Add data about the ongoing items
        for copy, mtime in ongoing.items():
            content_data[
                self._get_virtual_status('ongoing', mtime)
            ].add(copy)
        # Add to the archival status data about servers that were not
        # in the db; they are missing.
        content_data['missing'].update(
            set(self.objstorages.keys()) - set(content_data['present'])
        )
        return content_data

    def _get_virtual_status(self, status, mtime):
        """ Compute the virtual presence of a content.

        If the status is 'ongoing' but the delay is not elapsed, the
        archiver considers the content will be present in the future,
        and thus reports it as present.
        However, if the delay is elapsed, the copy may have failed, so
        the content is considered missing.

        Arguments:
            status (string): One of ('present', 'missing', 'ongoing').
                The status of the content.
            mtime: Time (Unix timestamp) at which the content was last
                updated.

        Returns:
            The virtual status of the content, which is 'present' or
            'missing'.

        Raises:
            ValueError: if the status is not one of 'present', 'missing'
                or 'ongoing'.
        """
        if status in ('present', 'missing'):
            return status

        # If the status is 'ongoing' but there is still time, another
        # worker may still be working on this content.
        if status == 'ongoing':
            elapsed = time.time() - mtime
            if elapsed <= self.archival_policy['archival_max_age']:
                return 'present'
            else:
                return 'missing'
        else:
            raise ValueError("status must be either 'present', 'missing' "
                             "or 'ongoing'")

    def _need_archival(self, content_data):
        """ Indicate whether the content needs to be archived.

        Args:
            content_data (dict): dict that contains two lists, 'present'
                and 'missing', with the copy ids corresponding to each
                status.
        Returns:
            True if there are not enough copies, False otherwise.
        """
        nb_present = len(content_data.get('present', []))
        retention_policy = self.archival_policy['retention_policy']
        return nb_present < retention_policy

    def _choose_backup_servers(self, present, missing):
        """ Choose and yield the required amount of (source, destination)
        couples.

        For each required copy, choose a unique destination server among
        the missing copies and a source server among the present ones.

        Each destination server is unique, so after archival the
        retention policy requirement will be fulfilled. However, the
        source server may be used multiple times.

        Yields:
            tuple (source, destination) for each required copy.
        """
        # Transform from set to list to allow random selections
        missing = list(missing)
        present = list(present)
        nb_required = self.archival_policy['retention_policy'] - len(present)
        destinations = random.sample(missing, nb_required)
        sources = [random.choice(present) for dest in destinations]
        yield from zip(sources, destinations)
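
    # For illustration: with retention_policy = 2, one present copy and
    # two missing ones (hypothetical server names),
    #
    #     present = {'uffizi'}, missing = {'banco', 'azure'}
    #
    # nb_required is 1, so a single destination is sampled from the
    # missing copies and paired with a random source, yielding for
    # instance ('uffizi', 'banco').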

    def run_copier(self, source, destination, content_ids):
        """ Run a copier in order to archive the given contents.

        Upload the given contents from the source to the destination.
        If the process fails, the whole batch is considered uncopied
        and remains 'ongoing', waiting to be rescheduled, as there is
        a delay.

        Args:
            source (ObjStorage): source storage to get the contents from.
            destination (ObjStorage): storage where the contents will be
                copied.
            content_ids: list of content ids to archive.
        """
        ac = ArchiverCopier(source, destination, content_ids)
        if ac.run():
            # Once the archival is complete, update the database.
            for content_id in content_ids:
                self._content_archive_update(content_id, destination,
                                             new_status='present')

    def _content_archive_update(self, content_id, archive_id,
                                new_status=None):
        """ Update the status of an archived content and set its mtime
        to now.

        Change the last modification time of an archived content and
        change its status to the given one.

        Args:
            content_id (string): The content id.
            archive_id (string): The id of the concerned archive.
            new_status (string): One of 'missing', 'ongoing' or
                'present'; this status will replace the previous one.
                If not given, the function only changes the mtime of
                the content.
        """
        db_obj_id = r'\x' + hashutil.hash_to_hex(content_id)
        self.archiver_db.content_archive_update(
            db_obj_id,
            archive_id,
            new_status
        )
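
# A minimal usage sketch, assuming a batch produced by the archiver
# director (a dict mapping a content's sha1 to the servers where it is
# present or missing) and a `storages` list shaped as illustrated after
# __init__ above; the db connection string and policy values below are
# made-up examples:
#
#     archival_policy = {'retention_policy': 2, 'archival_max_age': 3600}
#     worker = ArchiverWorker(batch, storages,
#                             'dbname=softwareheritage-archiver',
#                             archival_policy)
#     worker.run()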