Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/archiver/worker.py
Show All 17 Lines | class ArchiverWorker(): # This class should probably extend a Celery Task. | ||||
the slaves servers. | the slaves servers. | ||||
Attributes: | Attributes: | ||||
batch: The content this worker has to archive, which is a dictionary | batch: The content this worker has to archive, which is a dictionary | ||||
that associates a content's sha1 id to the list of servers where | that associates a content's sha1 id to the list of servers where | ||||
the content is present or missing | the content is present or missing | ||||
(see ArchiverDirector::get_unarchived_content). | (see ArchiverDirector::get_unarchived_content). | ||||
master_storage_args: The connection argument to initialize the | master_storage_args: The connection argument to initialize the | ||||
master storage where is the content location. | master storage where is the content location. | ||||
ardumont: s/where is the content location/(db connection url, content location)/ | |||||
slave_storages: A map that associates server_id to the remote server. | slave_storages: A map that associates server_id to the remote server. | ||||
retention_policy: The required number of copies for a content to be | retention_policy: The required number of copies for a content to be | ||||
Done. Inline Actions: For the sake of coherence with the rest of your diff, you forgot to adapt the docstring of the config argument. ardumont: For the sake of coherence with the rest of your diff, you forgot to adapt the docstring on… | |||||
considered safe. | considered safe. | ||||
""" | """ | ||||
def __init__(self, batch, master_storage_args, | def __init__(self, batch, master_storage_args, | ||||
slave_storages, retention_policy): | slave_storages, retention_policy): | ||||
""" Constructor of the ArchiverWorker class. | """ Constructor of the ArchiverWorker class. | ||||
Args: | Args: | ||||
batch: A batch of content, which is a dictionnary that associates | batch: A batch of content, which is a dictionnary that associates | ||||
a content's sha1 id to the list of servers where the content | a content's sha1 id to the list of servers where the content | ||||
is present. | is present. | ||||
master_storage: The master storage where is the whole content. | master_storage: The master storage where is the whole content. | ||||
Done. Inline Actions: arguments. ardumont: arguments. | |||||
slave_storages: A map that associates server_id to the remote | slave_storages: A map that associates server_id to the remote | ||||
server. | server. | ||||
retention_policy: The required number of copies for a content to | retention_policy: The required number of copies for a content to | ||||
be considered safe. | be considered safe. | ||||
""" | """ | ||||
self.batch = batch | self.batch = batch | ||||
self.master_storage = get_storage('local_storage', master_storage_args) | self.master_storage = get_storage('local_storage', master_storage_args) | ||||
self.slave_storages = slave_storages | self.slave_storages = slave_storages | ||||
▲ Show 20 Lines • Show All 73 Lines • ▼ Show 20 Lines | def run(self): | ||||
def content_filter(content, destination): | def content_filter(content, destination): | ||||
""" Indicates whenever a content need archivage. | """ Indicates whenever a content need archivage. | ||||
Filter function that returns True if a given content | Filter function that returns True if a given content | ||||
still require to be archived. | still require to be archived. | ||||
Args: | Args: | ||||
content (str): | content (str): Sha1 of a content. | ||||
destination: Tuple of (archive id, archive url). | |||||
""" | """ | ||||
archival_status = self.__get_archival_status( | archival_status = self.__get_archival_status( | ||||
content, | content, | ||||
destination | destination | ||||
) | ) | ||||
if archival_status: | if archival_status: | ||||
status = archival_status['status'] | status = archival_status['status'] | ||||
# If the archive is already present, no need to backup. | # If the archive is already present, no need to backup. | ||||
Show All 25 Lines | def run(self): | ||||
slaves_copy.setdefault(server, []).append(content_id) | slaves_copy.setdefault(server, []).append(content_id) | ||||
# At this point, check the archival status of the content in order to | # At this point, check the archival status of the content in order to | ||||
# know if it is still needed. | # know if it is still needed. | ||||
for destination in slaves_copy: | for destination in slaves_copy: | ||||
contents = [] | contents = [] | ||||
for content in slaves_copy[destination]: | for content in slaves_copy[destination]: | ||||
if content_filter(content, destination): | if content_filter(content, destination): | ||||
self.__content_archive_update(content, destination[0], | |||||
new_status='ongoing') | |||||
contents.append(content) | contents.append(content) | ||||
slaves_copy[destination] = contents | slaves_copy[destination] = contents | ||||
# Spawn a copier for each destination that will copy all the | # Spawn a copier for each destination that will copy all the | ||||
# needed content. | # needed content. | ||||
for destination in slaves_copy: | for destination in slaves_copy: | ||||
ac = ArchiverCopier( | ac = ArchiverCopier( | ||||
destination, slaves_copy[destination], | destination, slaves_copy[destination], | ||||
self.master_storage) | self.master_storage) | ||||
if ac.run(): | if ac.run(): | ||||
# Once the archival complete, update the database. | # Once the archival complete, update the database. | ||||
for content_id in slaves_copy[destination]: | for content_id in slaves_copy[destination]: | ||||
self.__content_archive_update(content_id, destination[0], | self.__content_archive_update(content_id, destination[0], | ||||
new_status='present') | new_status='present') | ||||
Done. Inline Actions: ...archive_url) of the destination. ardumont: ...archive_url) of the destination. | |||||
Done. Inline Actions: List of contents. ardumont: List of contents | |||||
Done. Inline Actions: You might want to add to the docstring the fact that if the copier fails to run (returns False), we consider the content as not copied (because no update takes place). ardumont: You might want to add in the docstring the fact that if the copier fails to run (returns False)… | |||||
Done. Inline Actions: If one copier fails badly enough (e.g. by raising an exception), the remaining copiers won't even be triggered. ardumont: If one copier fails significantly badly (e.g. by throwing something), the remaining copier… |
s/where is the content location/(db connection url, content location)/