Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/archiver/worker.py
# Copyright (C) 2015 The Software Heritage developers | # Copyright (C) 2015 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import random | import random | ||||
import logging | import logging | ||||
from datetime import datetime | |||||
from swh.objstorage import PathSlicingObjStorage | |||||
from swh.objstorage.api.client import RemoteObjStorage | |||||
from .storage import ArchiverStorage | from .storage import ArchiverStorage | ||||
from .copier import ArchiverCopier | from .copier import ArchiverCopier | ||||
from .. import get_storage | |||||
from datetime import datetime | |||||
logger = logging.getLogger() | logger = logging.getLogger() | ||||
class ArchiverWorker(): | class ArchiverWorker(): | ||||
""" Do the required backups on a given batch of contents. | """ Do the required backups on a given batch of contents. | ||||
Process the content of a content batch in order to do the needed backups on | Process the content of a content batch in order to do the needed backups on | ||||
the slaves servers. | the slaves servers. | ||||
Attributes: | Attributes: | ||||
batch: The content this worker has to archive, which is a dictionary | batch: The content this worker has to archive, which is a dictionary | ||||
that associates a content's sha1 id to the list of servers where | that associates a content's sha1 id to the list of servers where | ||||
the content is present or missing | the content is present or missing | ||||
(see ArchiverDirector::get_unarchived_content). | (see ArchiverDirector::get_unarchived_content). | ||||
master_storage_args: The connection argument to initialize the | master_objstorage_args: The connection argument to initialize the | ||||
master storage with the db connection url & the object storage | master storage with the db connection url & the object storage | ||||
path. | path. | ||||
slave_storages: A map that associates server_id to the remote server. | slave_objstorages: A map that associates server_id to the remote server | ||||
config: Archiver_configuration. A dictionary that must contains | config: Archiver_configuration. A dictionary that must contains | ||||
the following keys. | the following keys. | ||||
objstorage_path (string): the path of the objstorage of the | objstorage_path (string): the path of the objstorage of the | ||||
master. | master. | ||||
batch_max_size (int): The number of content items that can be | batch_max_size (int): The number of content items that can be | ||||
given to the same archiver worker. | given to the same archiver worker. | ||||
archival_max_age (int): Delay given to the worker to copy all | archival_max_age (int): Delay given to the worker to copy all | ||||
the files in a given batch. | the files in a given batch. | ||||
retention_policy (int): Required number of copies for the | retention_policy (int): Required number of copies for the | ||||
content to be considered safe. | content to be considered safe. | ||||
asynchronous (boolean): Indicate whenever the archival should | asynchronous (boolean): Indicate whenever the archival should | ||||
run in asynchronous mode or not. | run in asynchronous mode or not. | ||||
""" | """ | ||||
def __init__(self, batch, archiver_args, master_objstorage_args,
             slave_objstorages, config):
    """ Set up a worker in charge of archiving one batch of contents.

    Args:
        batch: A batch of content, which is a dictionary that associates
            a content's sha1 id to the list of servers where the content
            is present or missing.
        archiver_args: Arguments handed to ArchiverStorage to reach the
            archiver database.
        master_objstorage_args: Keyword arguments unpacked into the
            constructor of the master object storage.
        slave_objstorages: A map that associates server_id to the remote
            server; stored unchanged on the worker.
        config: Archiver configuration dictionary. This constructor reads
            the following key.
                objstorage_type (string): 'local_objstorage' selects a
                    PathSlicingObjStorage as master; any other value
                    selects a RemoteObjStorage.
            The rest of the archiver documents these additional keys.
                objstorage_path (string): the path of the objstorage of
                    the master.
                batch_max_size (int): The number of content items that
                    can be given to the same archiver worker.
                archival_max_age (int): Delay given to the worker to copy
                    all the files in a given batch.
                retention_policy (int): Required number of copies for the
                    content to be considered safe.
                asynchronous (boolean): Indicate whether the archival
                    should run in asynchronous mode or not.

    Raises:
        KeyError: if config has no 'objstorage_type' entry.
    """
    self.batch = batch
    self.archiver_storage = ArchiverStorage(archiver_args)
    self.slave_objstorages = slave_objstorages
    self.config = config

    # The master objstorage is either on the local filesystem or reached
    # through the remote API, depending on the configuration.
    use_local = config['objstorage_type'] == 'local_objstorage'
    objstorage_cls = PathSlicingObjStorage if use_local else RemoteObjStorage
    self.master_objstorage = objstorage_cls(**master_objstorage_args)
def _choose_backup_servers(self, allowed_storage, backup_number): | |||||
""" Choose the slave servers for archival. | """ Choose the slave servers for archival. | ||||
Choose the given amount of servers among those which don't already | Choose the given amount of servers among those which don't already | ||||
contain a copy of the content. | contain a copy of the content. | ||||
Args: | Args: | ||||
allowed_storage: servers when the content is not already present. | allowed_storage: servers when the content is not already present. | ||||
backup_number (int): The number of servers we have to choose in | backup_number (int): The number of servers we have to choose in | ||||
order to fullfill the objective. | order to fullfill the objective. | ||||
""" | """ | ||||
# In case there is not enough backup servers to get all the backups | # In case there is not enough backup servers to get all the backups | ||||
# we need, just do our best. | # we need, just do our best. | ||||
# TODO such situation can only be caused by an incorrect configuration | # Such situation should not happen. | ||||
# setting. Do a verification previously. | |||||
backup_number = min(backup_number, len(allowed_storage)) | backup_number = min(backup_number, len(allowed_storage)) | ||||
# TODO Find a better (or a good) policy to choose the backup servers. | # TODO Find a better (or a good) policy to choose the backup servers. | ||||
# The random choice should be equivalently distributed between | # The random choice should be equivalently distributed between | ||||
# servers for a great amount of data, but don't take care of servers | # servers for a great amount of data, but don't take care of servers | ||||
# capacities. | # capacities. | ||||
return random.sample(allowed_storage, backup_number) | return random.sample(allowed_storage, backup_number) | ||||
def __get_archival_status(self, content_id, server): | def _get_archival_status(self, content_id, server): | ||||
""" Get the archival status of the required content. | """ Get the archival status of the required content. | ||||
Attributes: | Attributes: | ||||
content_id (string): Sha1 of the content. | content_id (string): Sha1 of the content. | ||||
server: Tuple (archive_id, archive_url) of the archive server. | server: Tuple (archive_id, archive_url) of the archive server. | ||||
Returns: | Returns: | ||||
A dictionary that contains all the required data : 'content_id', | A dictionary that contains all the required data : 'content_id', | ||||
'archive_id', 'status', and 'mtime' | 'archive_id', 'status', and 'mtime' | ||||
""" | """ | ||||
t, = list( | t, = list( | ||||
self.archiver_storage.content_archive_get(content_id, server[0]) | self.archiver_storage.content_archive_get(content_id, server[0]) | ||||
) | ) | ||||
return { | return { | ||||
'content_id': t[0], | 'content_id': t[0], | ||||
'archive_id': t[1], | 'archive_id': t[1], | ||||
'status': t[2], | 'status': t[2], | ||||
'mtime': t[3] | 'mtime': t[3] | ||||
} | } | ||||
def __content_archive_update(self, content_id, archive_id, | def _content_archive_update(self, content_id, archive_id, | ||||
new_status=None): | new_status=None): | ||||
""" Update the status of a archive content and set it's mtime to now() | """ Update the status of a archive content and set it's mtime to now() | ||||
Change the last modification time of an archived content and change | Change the last modification time of an archived content and change | ||||
its status to the given one. | its status to the given one. | ||||
Args: | Args: | ||||
content_id (string): The content id. | content_id (string): The content id. | ||||
archive_id (string): The id of the concerned archive. | archive_id (string): The id of the concerned archive. | ||||
Show All 12 Lines | def need_archival(self, content, destination): | ||||
Filter function that returns True if a given content | Filter function that returns True if a given content | ||||
still require to be archived. | still require to be archived. | ||||
Args: | Args: | ||||
content (str): Sha1 of a content. | content (str): Sha1 of a content. | ||||
destination: Tuple (archive id, archive url). | destination: Tuple (archive id, archive url). | ||||
""" | """ | ||||
archival_status = self.__get_archival_status( | archival_status = self._get_archival_status( | ||||
content, | content, | ||||
destination | destination | ||||
) | ) | ||||
status = archival_status['status'] | status = archival_status['status'] | ||||
mtime = archival_status['mtime'] | mtime = archival_status['mtime'] | ||||
# If the archive is already present, no need to backup. | # If the archive is already present, no need to backup. | ||||
if status == 'present': | if status == 'present': | ||||
return False | return False | ||||
Show All 22 Lines | def sort_content_by_archive(self): | ||||
The created mapping. | The created mapping. | ||||
""" | """ | ||||
slaves_copy = {} | slaves_copy = {} | ||||
for content_id in self.batch: | for content_id in self.batch: | ||||
# Choose some servers to upload the content among the missing ones. | # Choose some servers to upload the content among the missing ones. | ||||
server_data = self.batch[content_id] | server_data = self.batch[content_id] | ||||
nb_present = len(server_data['present']) | nb_present = len(server_data['present']) | ||||
nb_backup = self.config['retention_policy'] - nb_present | nb_backup = self.config['retention_policy'] - nb_present | ||||
backup_servers = self.__choose_backup_servers( | backup_servers = self._choose_backup_servers( | ||||
server_data['missing'], | server_data['missing'], | ||||
nb_backup | nb_backup | ||||
) | ) | ||||
# Fill the map destination -> content to upload | # Fill the map destination -> content to upload | ||||
for server in backup_servers: | for server in backup_servers: | ||||
slaves_copy.setdefault(server, []).append(content_id) | slaves_copy.setdefault(server, []).append(content_id) | ||||
return slaves_copy | return slaves_copy | ||||
Show All 10 Lines | def run(self): | ||||
# job have been done by another worker. | # job have been done by another worker. | ||||
for destination in slaves_copy: | for destination in slaves_copy: | ||||
# list() is needed because filter's result will be consumed twice. | # list() is needed because filter's result will be consumed twice. | ||||
slaves_copy[destination] = list(filter( | slaves_copy[destination] = list(filter( | ||||
lambda content_id: self.need_archival(content_id, destination), | lambda content_id: self.need_archival(content_id, destination), | ||||
slaves_copy[destination] | slaves_copy[destination] | ||||
)) | )) | ||||
for content_id in slaves_copy[destination]: | for content_id in slaves_copy[destination]: | ||||
self.__content_archive_update(content_id, destination[0], | self._content_archive_update(content_id, destination[0], | ||||
new_status='ongoing') | new_status='ongoing') | ||||
# Spawn a copier for each destination | # Spawn a copier for each destination | ||||
for destination in slaves_copy: | for destination in slaves_copy: | ||||
try: | try: | ||||
self.run_copier(destination, slaves_copy[destination]) | self.run_copier(destination, slaves_copy[destination]) | ||||
except: | except: | ||||
logger.error('Unable to copy a batch to %s' % destination) | logger.error('Unable to copy a batch to %s' % destination) | ||||
def run_copier(self, destination, contents):
    """ Archive the given contents on one destination through a copier.

    An ArchiverCopier is built from the master objstorage and run once.
    When it reports success, every content is marked 'present' on the
    destination archive. When it reports failure nothing is updated: the
    contents keep their current status (the caller sets them to
    'ongoing') and wait to be rescheduled once the delay has elapsed.

    Args:
        destination: Tuple (archive_id, archive_url) of the destination.
        contents: List of contents to archive.
    """
    copier = ArchiverCopier(destination, contents, self.master_objstorage)
    copy_succeeded = copier.run()
    if copy_succeeded:
        # The whole batch reached the archive: record it in the database.
        for content_id in contents:
            self._content_archive_update(
                content_id,
                destination[0],
                new_status='present',
            )