Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/archiver/worker.py
# Copyright (C) 2015 The Software Heritage developers | # Copyright (C) 2015 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import random | import random | ||||
import logging | |||||
from .copier import ArchiverCopier | from .copier import ArchiverCopier | ||||
from .. import get_storage | from .. import get_storage | ||||
from datetime import datetime | from datetime import datetime | ||||
logger = logging.getLogger() | |||||
class ArchiverWorker(): | class ArchiverWorker(): | ||||
""" Do the required backups on a given batch of contents. | """ Do the required backups on a given batch of contents. | ||||
Process the content of a content batch in order to do the needed backups on | Process the content of a content batch in order to do the needed backups on | ||||
the slaves servers. | the slaves servers. | ||||
Attributes: | Attributes: | ||||
batch: The content this worker has to archive, which is a dictionary | batch: The content this worker has to archive, which is a dictionary | ||||
that associates a content's sha1 id to the list of servers where | that associates a content's sha1 id to the list of servers where | ||||
the content is present or missing | the content is present or missing | ||||
(see ArchiverDirector::get_unarchived_content). | (see ArchiverDirector::get_unarchived_content). | ||||
master_storage_args: The connection argument to initialize the | master_storage_args: The connection argument to initialize the | ||||
master storage where is the content location. | master storage with the db connection url & the object storage | ||||
path. | |||||
ardumont: s/where is the content location/(db connection url, content location)/ | |||||
slave_storages: A map that associates server_id to the remote server. | slave_storages: A map that associates server_id to the remote server. | ||||
retention_policy: The required number of copies for a content to be | config: Archiver_configuration. A dictionary that must contains | ||||
Done Inline Actions ardumont: For the sake of coherence with the rest of your diff, you forgot to adapt the docstring of the config argument. | |||||
considered safe. | the following keys. | ||||
objstorage_path (string): the path of the objstorage of the | |||||
master. | |||||
batch_max_size (int): The number of content items that can be | |||||
given to the same archiver worker. | |||||
archival_max_age (int): Delay given to the worker to copy all | |||||
the files in a given batch. | |||||
retention_policy (int): Required number of copies for the | |||||
content to be considered safe. | |||||
asynchronous (boolean): Indicate whenever the archival should | |||||
run in asynchronous mode or not. | |||||
""" | """ | ||||
def __init__(self, batch, master_storage_args, | def __init__(self, batch, master_storage_args, slave_storages, config): | ||||
slave_storages, retention_policy): | |||||
""" Constructor of the ArchiverWorker class. | """ Constructor of the ArchiverWorker class. | ||||
Args: | Args: | ||||
batch: A batch of content, which is a dictionnary that associates | batch: A batch of content, which is a dictionary that associates | ||||
a content's sha1 id to the list of servers where the content | a content's sha1 id to the list of servers where the content | ||||
is present. | is present. | ||||
master_storage: The master storage where is the whole content. | master_storage_args: The master storage arguments. | ||||
Done Inline Actions ardumont: arguments. | |||||
slave_storages: A map that associates server_id to the remote | slave_storages: A map that associates server_id to the remote | ||||
server. | server. | ||||
retention_policy: The required number of copies for a content to | config: Archiver_configuration. A dictionary that must contains | ||||
be considered safe. | the following keys. | ||||
objstorage_path (string): the path of the objstorage of the | |||||
master. | |||||
batch_max_size (int): The number of content items that can be | |||||
given to the same archiver worker. | |||||
archival_max_age (int): Delay given to the worker to copy all | |||||
the files in a given batch. | |||||
retention_policy (int): Required number of copies for the | |||||
content to be considered safe. | |||||
asynchronous (boolean): Indicate whenever the archival should | |||||
run in asynchronous mode or not. | |||||
""" | """ | ||||
self.batch = batch | self.batch = batch | ||||
self.master_storage = get_storage('local_storage', master_storage_args) | self.master_storage = get_storage('local_storage', master_storage_args) | ||||
self.slave_storages = slave_storages | self.slave_storages = slave_storages | ||||
self.retention_policy = retention_policy | self.config = config | ||||
def __choose_backup_servers(self, allowed_storage, backup_number): | def __choose_backup_servers(self, allowed_storage, backup_number): | ||||
""" Choose the slave servers for archival. | """ Choose the slave servers for archival. | ||||
Choose the given amount of servers among those which don't already | Choose the given amount of servers among those which don't already | ||||
contain a copy of the content. | contain a copy of the content. | ||||
Args: | Args: | ||||
Show All 9 Lines | def __choose_backup_servers(self, allowed_storage, backup_number): | ||||
# TODO Find a better (or a good) policy to choose the backup servers. | # TODO Find a better (or a good) policy to choose the backup servers. | ||||
# The random choice should be equivalently distributed between | # The random choice should be equivalently distributed between | ||||
# servers for a great amount of data, but don't take care of servers | # servers for a great amount of data, but don't take care of servers | ||||
# capacities. | # capacities. | ||||
return random.sample(allowed_storage, backup_number) | return random.sample(allowed_storage, backup_number) | ||||
def __get_archival_status(self, content_id, server): | def __get_archival_status(self, content_id, server): | ||||
""" Get the archival status of the required content. | |||||
Attributes: | |||||
content_id (string): Sha1 of the content. | |||||
server: Tuple (archive_id, archive_url) of the archive server. | |||||
Returns: | |||||
A dictionary that contains all the required data : 'content_id', | |||||
'archive_id', 'status', and 'mtime' | |||||
""" | |||||
t, = list( | t, = list( | ||||
self.master_storage.db.content_archive_get(content_id, server[0]) | self.master_storage.db.content_archive_get(content_id, server[0]) | ||||
) | ) | ||||
return { | return { | ||||
'content_id': t[0], | 'content_id': t[0], | ||||
'archive_id': t[1], | 'archive_id': t[1], | ||||
'status': t[2], | 'status': t[2], | ||||
'mtime': t[3] | 'mtime': t[3] | ||||
} | } | ||||
def __content_archive_update(self, content_id, archive_id, | def __content_archive_update(self, content_id, archive_id, | ||||
new_status=None): | new_status=None): | ||||
""" Update the status of a archive content and set it's mtime to now() | |||||
Change the last modification time of an archived content and change | |||||
its status to the given one. | |||||
Args: | |||||
content_id (string): The content id. | |||||
archive_id (string): The id of the concerned archive. | |||||
new_status (string): One of missing, ongoing or present, this | |||||
status will replace the previous one. If not given, the | |||||
function only changes the mtime of the content. | |||||
""" | |||||
self.master_storage.db.content_archive_update( | self.master_storage.db.content_archive_update( | ||||
content_id, | content_id, | ||||
archive_id, | archive_id, | ||||
new_status | new_status | ||||
) | ) | ||||
def run(self): | def need_archival(self, content, destination): | ||||
""" Do the task expected from the archiver worker. | |||||
Process the content in the batch, ensure that the elements still need | |||||
an archival, and spawn copiers to copy files in each destinations. | |||||
""" | |||||
def content_filter(content, destination): | |||||
""" Indicates whenever a content need archivage. | """ Indicates whenever a content need archivage. | ||||
Filter function that returns True if a given content | Filter function that returns True if a given content | ||||
still require to be archived. | still require to be archived. | ||||
Args: | Args: | ||||
content (str): Sha1 of a content. | content (str): Sha1 of a content. | ||||
destination: Tuple (archive id, archive url). | destination: Tuple (archive id, archive url). | ||||
""" | """ | ||||
archival_status = self.__get_archival_status( | archival_status = self.__get_archival_status( | ||||
content, | content, | ||||
destination | destination | ||||
) | ) | ||||
if archival_status: | |||||
status = archival_status['status'] | status = archival_status['status'] | ||||
mtime = archival_status['mtime'] | |||||
# If the archive is already present, no need to backup. | # If the archive is already present, no need to backup. | ||||
if status == 'present': | if status == 'present': | ||||
return False | return False | ||||
# If the content is ongoing but still have time, there is | # If the content is ongoing but still have time, there is | ||||
# another worker working on this content. | # another worker working on this content. | ||||
elif status == 'ongoing': | elif status == 'ongoing': | ||||
elapsed = datetime.now() - archival_status['mtime']\ | mtime = mtime.replace(tzinfo=None) | ||||
.total_seconds() | elapsed = (datetime.now() - mtime).total_seconds() | ||||
if elapsed < self.master_storage.archival_max_age: | if elapsed <= self.config['archival_max_age']: | ||||
return False | return False | ||||
return True | return True | ||||
else: | |||||
# TODO this is an error case, the content should always exists. | |||||
return None | |||||
def sort_content_by_archive(self): | |||||
""" Create a map {archive_server -> list of content) | |||||
Create a mapping that associate to a archive server all the | |||||
contents that needs to be archived in it by the current worker. | |||||
The map is in the form of : | |||||
{ | |||||
(archive_1, archive_1_url): [content1, content2, content_3] | |||||
(archive_2, archive_2_url): [content1, content3] | |||||
} | |||||
Returns: | |||||
The created mapping. | |||||
""" | |||||
slaves_copy = {} | slaves_copy = {} | ||||
for content_id in self.batch: | for content_id in self.batch: | ||||
# Choose some servers to upload the content | # Choose some servers to upload the content among the missing ones. | ||||
server_data = self.batch[content_id] | server_data = self.batch[content_id] | ||||
nb_present = len(server_data['present']) | |||||
nb_backup = self.config['retention_policy'] - nb_present | |||||
backup_servers = self.__choose_backup_servers( | backup_servers = self.__choose_backup_servers( | ||||
server_data['missing'], | server_data['missing'], | ||||
self.retention_policy - len(server_data['present']) | nb_backup | ||||
) | ) | ||||
# Fill the map destination -> content to upload | # Fill the map destination -> content to upload | ||||
for server in backup_servers: | for server in backup_servers: | ||||
slaves_copy.setdefault(server, []).append(content_id) | slaves_copy.setdefault(server, []).append(content_id) | ||||
return slaves_copy | |||||
# At this point, check the archival status of the content in order to | def run(self): | ||||
# know if it is still needed. | """ Do the task expected from the archiver worker. | ||||
Process the content in the batch, ensure that the elements still need | |||||
an archival, and spawn copiers to copy files in each destinations. | |||||
""" | |||||
# Get a map (archive -> [contents]) | |||||
slaves_copy = self.sort_content_by_archive() | |||||
# At this point, re-check the archival status in order to know if the | |||||
# job have been done by another worker. | |||||
for destination in slaves_copy: | for destination in slaves_copy: | ||||
contents = [] | # list() is needed because filter's result will be consumed twice. | ||||
for content in slaves_copy[destination]: | slaves_copy[destination] = list(filter( | ||||
if content_filter(content, destination): | lambda content_id: self.need_archival(content_id, destination), | ||||
self.__content_archive_update(content, destination[0], | slaves_copy[destination] | ||||
)) | |||||
for content_id in slaves_copy[destination]: | |||||
self.__content_archive_update(content_id, destination[0], | |||||
new_status='ongoing') | new_status='ongoing') | ||||
contents.append(content) | |||||
slaves_copy[destination] = contents | |||||
# Spawn a copier for each destination that will copy all the | # Spawn a copier for each destination | ||||
# needed content. | |||||
for destination in slaves_copy: | for destination in slaves_copy: | ||||
ac = ArchiverCopier( | try: | ||||
destination, slaves_copy[destination], | self.run_copier(destination, slaves_copy[destination]) | ||||
self.master_storage) | except: | ||||
logger.error('Unable to copy a batch to %s' % destination) | |||||
Done Inline Actions ardumont: You might want to add to the docstring the fact that if the copier fails to run (returns False), we consider the content as not copied (because no update takes place). | |||||
def run_copier(self, destination, contents): | |||||
""" Run a copier in order to archive the given contents | |||||
Done Inline Actions ardumont: ...archive_url) of the destination. | |||||
Upload the given contents to the given archive. | |||||
Done Inline Actions ardumont: List of contents | |||||
If the process fail, the whole content is considered uncopied | |||||
and remains 'ongoing', waiting to be rescheduled as there is | |||||
a delay. | |||||
Attributes: | |||||
destination: Tuple (archive_id, archive_url) of the destination. | |||||
contents: List of contents to archive. | |||||
""" | |||||
ac = ArchiverCopier(destination, contents, self.master_storage) | |||||
if ac.run(): | if ac.run(): | ||||
# Once the archival complete, update the database. | # Once the archival complete, update the database. | ||||
for content_id in slaves_copy[destination]: | for content_id in contents: | ||||
self.__content_archive_update(content_id, destination[0], | self.__content_archive_update(content_id, destination[0], | ||||
new_status='present') | new_status='present') | ||||
Done Inline Actions ardumont: If one copier fails badly enough (e.g. by raising an exception), the remaining copiers won't even be triggered. |
s/where is the content location/(db connection url, content location)/