swh/storage/archiver/worker.py
# Copyright (C) 2015  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import random
import logging
import time

from collections import defaultdict

from swh.core import hashutil
from swh.core import config
from swh.objstorage import get_objstorage
from swh.objstorage.exc import Error, ObjNotFoundError

from .storage import ArchiverStorage
from .copier import ArchiverCopier


logger = logging.getLogger('archiver.worker')

class ArchiverWorker(config.SWHConfig):
    """ Do the required backups on a given batch of contents.

    Process the content of a content batch in order to do the needed backups
    on the slave servers.
    """
    DEFAULT_CONFIG = {
        'retention_policy': ('int', 2),
        'archival_max_age': ('int', 3600),
        'dbconn': ('str', 'dbname=softwareheritage-archiver-dev'),
        'storages': ('dict', [
            {'host': 'uffizi',
             'cls': 'pathslicing',
             'args': {'root': '/tmp/softwareheritage/objects',
                      'slicing': '0:2/2:4/4:6'}},
            {'host': 'banco',
             'cls': 'remote',
             'args': {'base_url': 'http://banco:5003/'}}
        ])
    }

    CONFIG_BASE_FILENAME = 'archiver/worker'

olasd (inline comment, done): Please make that `archiver/worker`.
olasd (inline comment): Three arguments to be read from config.

    def __init__(self, batch):
""" Constructor of the ArchiverWorker class. | """ Constructor of the ArchiverWorker class. | ||||
Args: | Args: | ||||
batch: A batch of content, which is a dictionary that associates | batch: list of object's sha1 that potentially need archival. | ||||
a content's sha1 id to the list of servers where the content | |||||
is present. | |||||
archiver_args: The archiver's arguments to establish connection to | |||||
db. | |||||
master_objstorage_args: The master storage arguments. | |||||
slave_objstorages: A map that associates server_id to the remote | |||||
server. | |||||
config: Archiver_configuration. A dictionary that must contains | |||||
the following keys. | |||||
objstorage_path (string): the path of the objstorage of the | |||||
master. | |||||
batch_max_size (int): The number of content items that can be | |||||
given to the same archiver worker. | |||||
archival_max_age (int): Delay given to the worker to copy all | |||||
the files in a given batch. | |||||
retention_policy (int): Required number of copies for the | |||||
content to be considered safe. | |||||
asynchronous (boolean): Indicate whenever the archival should | |||||
run in asynchronous mode or not. | |||||
""" | """ | ||||
self.batch = batch | self.batch = batch | ||||
self.archiver_storage = ArchiverStorage(archiver_args) | config = self.parse_config_file() | ||||
self.slave_objstorages = slave_objstorages | self.retention_policy = config['retention_policy'] | ||||
self.config = config | self.archival_max_age = config['archival_max_age'] | ||||
if config['objstorage_type'] == 'local_objstorage': | self.archiver_db = ArchiverStorage(config['dbconn']) | ||||
master_objstorage = PathSlicingObjStorage(**master_objstorage_args) | self.objstorages = { | ||||
else: | storage['host']: get_objstorage(storage['cls'], storage['args']) | ||||
master_objstorage = RemoteObjStorage(**master_objstorage_args) | for storage in config.get('storages', []) | ||||
self.master_objstorage = master_objstorage | |||||
def _choose_backup_servers(self, allowed_storage, backup_number): | |||||
""" Choose the slave servers for archival. | |||||
Choose the given amount of servers among those which don't already | |||||
contain a copy of the content. | |||||
Args: | |||||
allowed_storage: servers when the content is not already present. | |||||
backup_number (int): The number of servers we have to choose in | |||||
order to fullfill the objective. | |||||
""" | |||||
# In case there is not enough backup servers to get all the backups | |||||
# we need, just do our best. | |||||
# Such situation should not happen. | |||||
backup_number = min(backup_number, len(allowed_storage)) | |||||
# TODO Find a better (or a good) policy to choose the backup servers. | |||||
# The random choice should be equivalently distributed between | |||||
# servers for a great amount of data, but don't take care of servers | |||||
# capacities. | |||||
return random.sample(allowed_storage, backup_number) | |||||
def _get_archival_status(self, content_id, server): | |||||
""" Get the archival status of the required content. | |||||
Attributes: | |||||
content_id (string): Sha1 of the content. | |||||
server: Tuple (archive_id, archive_url) of the archive server. | |||||
Returns: | |||||
A dictionary that contains all the required data : 'content_id', | |||||
'archive_id', 'status', and 'mtime' | |||||
""" | |||||
archive = server[0] | |||||
t, = list( | |||||
self.archiver_storage.content_archive_get(content_id) | |||||
) | |||||
status_for_archive = t[1].get(archive, {}) | |||||
return { | |||||
'content_id': content_id, | |||||
'archive_id': archive, | |||||
'status': status_for_archive.get('status', 'missing'), | |||||
'mtime': status_for_archive.get('mtime', 0), | |||||
} | } | ||||
def _content_archive_update(self, content_id, archive_id, | if len(self.objstorages) < self.retention_policy: | ||||
new_status=None): | raise ValueError('Retention policy is too high for the number of ' | ||||
""" Update the status of a archive content and set its mtime to now. | 'provided servers') | ||||

olasd (inline comment, done): I don't think add_config is needed.

    def run(self):
        """ Do the task expected from the archiver worker.

        Process the contents in the batch, ensure that the elements still
        need an archival, and spawn copiers to copy the files to each
        destination.
        """
        transfers = defaultdict(list)
        for obj_id in self.batch:
            # Get dict {'missing': [servers], 'present': [servers]}
            # for this content, ignoring those that don't need archival.
            copies = self._compute_copies(obj_id)
            if not self._need_archival(copies):
                continue
            present = copies.get('present', [])
            missing = copies.get('missing', [])
            if len(present) == 0:
                logger.critical('Content %s has been lost' % obj_id)
                continue
            # Choose randomly some servers to be used as srcs and dests.
            for src_dest in self._choose_backup_servers(present, missing):
                transfers[src_dest].append(obj_id)
        # Then run copiers for each of the required transfers.
        for (src, dest), content_ids in transfers.items():
            self.run_copier(self.objstorages[src],
                            self.objstorages[dest], content_ids)

olasd (inline comment, done): `transfers`

olasd (inline comment, done): I don't think this comment is necessary.
    def _compute_copies(self, content_id):
        """ From a content_id, return its present and missing copies.

        Returns:
            A dictionary with keys 'present' and 'missing' that are mapped to
            lists of copy ids, depending on whether the content is present
            or missing on that copy.
            The key 'ongoing' is associated with a dict that maps each copy
            name to the mtime of the ongoing status update.
        """
        copies = self.archiver_db.content_archive_get(content_id)
        _, present, ongoing = copies
        set_present, set_ongoing = set(present), set(ongoing)
        set_missing = set(self.objstorages) - set_present - set_ongoing
        return {'present': set_present, 'missing': set_missing,
                'ongoing': ongoing}

olasd (inline comment, done): We don't want an `ongoing` copy to end up as `present` here: that would mean that we might try to copy from a source that is not there. If the "virtual status" is not used elsewhere, you should probably just fold the timeout logic in this function.
    def _need_archival(self, content_data):
        """ Indicate whether the content needs to be archived.

        Args:
            content_data (dict): dict that contains two lists, 'present' and
                'missing', with the ids of the copies that have this status.
        Returns: True if there are not enough copies, False otherwise.
        """
        nb_presents = len(content_data.get('present', []))
        for copy, mtime in content_data.get('ongoing', {}).items():
            if not self._is_archival_delay_elapsed(mtime):
                nb_presents += 1
        return nb_presents < self.retention_policy

    def _is_archival_delay_elapsed(self, start_time):
        """ Indicate whether the archival delay is elapsed, given the
        start_time.

        Args:
            start_time (float): time at which the archival started.

        Returns:
            True if the archival delay is elapsed, False otherwise.
        """
        elapsed = time.time() - start_time
        return elapsed > self.archival_max_age

olasd (inline comment, done): spurious debug print
    def _choose_backup_servers(self, present, missing):
        """ Choose and yield the required number of source/destination pairs.

        For each required copy, choose a unique destination server among the
        missing copies and a source server among the present ones.

        Each destination server is unique, so after archival the retention
        policy requirement will be fulfilled. However, the source server may
        be used multiple times.

        Yields:
            tuple (source, destination) for each required copy.
        """
        # Transform from set to list to allow random selections
        missing = list(missing)
        present = list(present)
        nb_required = self.retention_policy - len(present)
        destinations = random.sample(missing, nb_required)
        sources = [random.choice(present) for dest in destinations]
        yield from zip(sources, destinations)
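
To make the pairing policy above concrete, here is a small standalone sketch of the same selection logic; the module-level function and the extra 'azure' copy name are hypothetical illustrations, not part of this changeset.

import random

def choose_backup_servers(present, missing, retention_policy=2):
    # Same policy as the method above: one unique destination per copy still
    # needed; sources may repeat among the present copies.
    present, missing = list(present), list(missing)
    nb_required = retention_policy - len(present)
    destinations = random.sample(missing, nb_required)
    sources = [random.choice(present) for _ in destinations]
    return list(zip(sources, destinations))

# Hypothetical example: one copy already on 'uffizi', none on 'banco' or on a
# made-up 'azure' server; with a retention policy of 2, one transfer suffices.
print(choose_backup_servers({'uffizi'}, {'banco', 'azure'}))
# Possible output: [('uffizi', 'banco')]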
    def run_copier(self, source, destination, content_ids):
        """ Run a copier in order to archive the given contents.

        Upload the given contents from the source to the destination.
        If the process fails, the whole content is considered uncopied
        and remains 'ongoing', waiting to be rescheduled as there is
        a delay.

        Args:
            source (ObjStorage): source storage to get the contents from.
            destination (ObjStorage): storage where the contents will be
                copied.
            content_ids: list of content ids to archive.
        """
        # Check if there is any error among the contents.
        content_status = self._get_contents_error(content_ids, source)

        # Iterate over the detected errors.
        for content_id, real_status in content_status.items():
            # Remove them from the to-archive list,
            # as they cannot be retrieved correctly.
            content_ids.remove(content_id)
            # Update their status to reflect their real state.
            self._content_archive_update(content_id, source,
                                         new_status=real_status)

        # Now perform the copy on the remaining contents
        ac = ArchiverCopier(source, destination, content_ids)
        if ac.run():
            # Once the archival is complete, update the database.
            for content_id in content_ids:
                self._content_archive_update(content_id, destination,
                                             new_status='present')
    def _get_contents_error(self, content_ids, storage):
        """ Indicate the error associated with a content, if any.

        Check the given contents on the given storage. If an error is
        detected, it will be reported through the returned dict.

        Args:
            content_ids: a list of content ids to check
            storage: the storage holding the contents to check.
        Returns:
            a dict mapping {content_id -> error_status} for each content_id
            with an error. The `error_status` result may be 'missing' or
            'corrupted'.
        """
        content_status = {}
        for content_id in content_ids:
            try:
                storage.check(content_id)
            except Error:
                content_status[content_id] = 'corrupted'
                logger.error('Content is corrupted: %s' % content_id)
            except ObjNotFoundError:
                content_status[content_id] = 'missing'
                logger.error('A content referenced as present is missing: %s'
                             % content_id)
        return content_status
    def _content_archive_update(self, content_id, archive_id,
                                new_status=None):
        """ Update the status of an archived content and set its mtime to now.

        Change the last modification time of an archived content and change
        its status to the given one.

        Args:
            content_id (string): The content id.
            archive_id (string): The id of the concerned archive.
            new_status (string): One of missing, ongoing or present, this
                status will replace the previous one. If not given, the
                function only changes the mtime of the content.
        """
        db_obj_id = r'\x' + hashutil.hash_to_hex(content_id)
        self.archiver_db.content_archive_update(
            db_obj_id,
            archive_id,
            new_status
        )
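
As a usage illustration, a minimal sketch of how a batch could be fed to this worker, assuming a readable archiver/worker configuration file (as expected by parse_config_file()) and an archiver database that knows the given contents; the sha1 values below are placeholder examples.

from swh.storage.archiver.worker import ArchiverWorker

# Placeholder sha1 ids of contents that may need additional copies.
batch = [
    bytes.fromhex('34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
    bytes.fromhex('d0b1c1f0f72d051b71d0f1f0c3c2f1e2d3c4b5a6'),
]

worker = ArchiverWorker(batch)  # reads retention policy, storages, dbconn from config
worker.run()                    # computes copies and spawns the required copiers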