diff --git a/swh/storage/archiver/tasks.py b/swh/storage/archiver/tasks.py
index 78549fc5..ccb0a2f6 100644
--- a/swh/storage/archiver/tasks.py
+++ b/swh/storage/archiver/tasks.py
@@ -1,28 +1,28 @@
 # Copyright (C) 2015-2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from swh.scheduler.task import Task

 from .worker import ArchiverWithRetentionPolicyWorker
 from .worker import ArchiverToBackendWorker


 class SWHArchiverWithRetentionPolicyTask(Task):
     """Main task that archive a batch of content.

     """
     task_queue = 'swh_storage_archive_worker'

-    def run(self, *args, **kwargs):
+    def run_task(self, *args, **kwargs):
         ArchiverWithRetentionPolicyWorker(*args, **kwargs).run()


 class SWHArchiverToBackendTask(Task):
     """Main task that archive a batch of content in the cloud.

     """
     task_queue = 'swh_storage_archive_worker_to_backend'

-    def run(self, *args, **kwargs):
+    def run_task(self, *args, **kwargs):
         ArchiverToBackendWorker(*args, **kwargs).run()
diff --git a/swh/storage/provenance/tasks.py b/swh/storage/provenance/tasks.py
index a9b50c30..b1ed21fc 100644
--- a/swh/storage/provenance/tasks.py
+++ b/swh/storage/provenance/tasks.py
@@ -1,112 +1,112 @@
 # Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from celery import group

 from swh.model import hashutil
 from swh.core.config import load_named_config
 from swh.scheduler.task import Task
 from swh.storage import get_storage

 BASE_CONFIG_PATH = 'storage/provenance_cache'
 DEFAULT_CONFIG = {
     'storage': ('dict', {
         'cls': 'remote',
         'args': {
             'url': 'http://localhost:5002/'
         },
     }),
     'revision_packet_size': ('int', 100),
 }


 class PopulateCacheContentRevision(Task):
     """Populate the content -> revision provenance cache for some revisions"""

     task_queue = 'swh_populate_cache_content_revision'

     @property
     def config(self):
         if not hasattr(self, '__config'):
             self.__config = load_named_config(BASE_CONFIG_PATH, DEFAULT_CONFIG)
         return self.__config

-    def run(self, revisions):
+    def run_task(self, revisions):
         """Cache the cache_content_revision table for the revisions provided.

         Args:
             revisions: List of revisions to cache populate.

         """
         config = self.config
         storage = get_storage(**config['storage'])

         storage.cache_content_revision_add(
             hashutil.hash_to_bytes(revision) for revision in revisions
         )


 class PopulateCacheRevisionOrigin(Task):
     """Populate the revision -> origin provenance cache for one origin's visit"""

     task_queue = 'swh_populate_cache_revision_origin'

     @property
     def config(self):
         if not hasattr(self, '__config'):
             self.__config = load_named_config(BASE_CONFIG_PATH, DEFAULT_CONFIG)
         return self.__config

-    def run(self, origin_id, visit_id):
+    def run_task(self, origin_id, visit_id):
         """Cache the cache_revision_origin for the given origin visit

         Args:
             origin_id: the origin id to cache
             visit_id: the visit id to cache

         This task also creates the revision cache tasks, as well as the
         task to cache the next origin visit available

         """
         config = self.config
         storage = get_storage(**config['storage'])
         packet_size = config['revision_packet_size']

         pipelined_tasks = []

         visits = sorted(
             visit['visit'] for visit in storage.origin_visit_get(origin_id)
         )

         if visit_id in visits:
             revision_task = PopulateCacheContentRevision()
             new_revisions = [
                 hashutil.hash_to_hex(revision)
                 for revision in storage.cache_revision_origin_add(
                     origin_id, visit_id)
             ]

             if new_revisions:
                 split_new_revisions = [
                     new_revisions[i:i + packet_size]
                     for i in range(0, packet_size, len(new_revisions))
                 ]
                 for packet in split_new_revisions:
                     pipelined_tasks.append(revision_task.s(packet))

             try:
                 next_visit = min(visit for visit in visits if visit > visit_id)
             except ValueError:
                 # no next visit, stop pipelining further visits
                 pass
             else:
                 pipelined_tasks.append(self.s(origin_id, next_visit))

         if pipelined_tasks:
             group(pipelined_tasks).delay()