diff --git a/PKG-INFO b/PKG-INFO index 0002faf7b..359299e12 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.storage -Version: 0.0.86 +Version: 0.0.87 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/debian/control b/debian/control index d58d24fbc..18b99f504 100644 --- a/debian/control +++ b/debian/control @@ -1,57 +1,57 @@ Source: swh-storage Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python, python3-all, python3-click, python3-dateutil, python3-flask, python3-nose, python3-psycopg2, python3-requests, python3-setuptools, python3-swh.core (>= 0.0.28~), python3-swh.model (>= 0.0.15~), python3-swh.objstorage (>= 0.0.17~), - python3-swh.scheduler (>= 0.0.11~), + python3-swh.scheduler (>= 0.0.14~), python3-aiohttp, python3-vcversioner Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/diffusion/DSTO/ Package: python3-swh.storage Architecture: all Depends: python3-swh.core (>= 0.0.28~), python3-swh.model (>= 0.0.15~), python3-swh.objstorage (>= 0.0.17~), ${misc:Depends}, ${python3:Depends} Description: Software Heritage storage utilities Package: python3-swh.storage.listener Architecture: all Depends: python3-swh.journal (>= 0.0.2~), python3-kafka (>= 1.3.1~), python3-swh.storage (= ${binary:Version}), ${misc:Depends}, ${python3:Depends} Description: Software Heritage storage listener Package: python3-swh.storage.archiver Architecture: all -Depends: python3-swh.scheduler (>= 0.0.11~), +Depends: python3-swh.scheduler (>= 0.0.14~), python3-swh.journal, python3-swh.storage (= ${binary:Version}), ${misc:Depends}, ${python3:Depends} Description: Software Heritage storage Archiver Package: python3-swh.storage.provenance Architecture: all -Depends: python3-swh.scheduler (>= 0.0.11~), +Depends: python3-swh.scheduler (>= 0.0.14~), python3-swh.storage (= ${binary:Version}), ${misc:Depends}, ${python3:Depends} Description: Software Heritage storage Provenance diff --git a/requirements-swh.txt b/requirements-swh.txt index 23b31d463..326583ec1 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,5 +1,5 @@ swh.core >= 0.0.28 swh.model >= 0.0.15 swh.objstorage >= 0.0.17 -swh.scheduler >= 0.0.11 +swh.scheduler >= 0.0.14 swh.journal >= 0.0.2 diff --git a/swh.storage.egg-info/PKG-INFO b/swh.storage.egg-info/PKG-INFO index 0002faf7b..359299e12 100644 --- a/swh.storage.egg-info/PKG-INFO +++ b/swh.storage.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.storage -Version: 0.0.86 +Version: 0.0.87 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.storage.egg-info/requires.txt b/swh.storage.egg-info/requires.txt index aea29cc6d..e0e8119c7 100644 --- a/swh.storage.egg-info/requires.txt +++ b/swh.storage.egg-info/requires.txt @@ -1,13 +1,13 @@ aiohttp click flask kafka psycopg2 python-dateutil python-fastimport swh.core>=0.0.28 swh.journal>=0.0.2 swh.model>=0.0.15 swh.objstorage>=0.0.17 -swh.scheduler>=0.0.11 +swh.scheduler>=0.0.14 vcversioner diff --git a/swh/storage/archiver/tasks.py b/swh/storage/archiver/tasks.py index 78549fc53..ccb0a2f63 100644 --- a/swh/storage/archiver/tasks.py +++ b/swh/storage/archiver/tasks.py @@ -1,28 +1,28 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.task import Task from .worker import ArchiverWithRetentionPolicyWorker from .worker import ArchiverToBackendWorker class SWHArchiverWithRetentionPolicyTask(Task): """Main task that archive a batch of content. """ task_queue = 'swh_storage_archive_worker' - def run(self, *args, **kwargs): + def run_task(self, *args, **kwargs): ArchiverWithRetentionPolicyWorker(*args, **kwargs).run() class SWHArchiverToBackendTask(Task): """Main task that archive a batch of content in the cloud. """ task_queue = 'swh_storage_archive_worker_to_backend' - def run(self, *args, **kwargs): + def run_task(self, *args, **kwargs): ArchiverToBackendWorker(*args, **kwargs).run() diff --git a/swh/storage/provenance/tasks.py b/swh/storage/provenance/tasks.py index a9b50c304..b1ed21fc5 100644 --- a/swh/storage/provenance/tasks.py +++ b/swh/storage/provenance/tasks.py @@ -1,112 +1,112 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from celery import group from swh.model import hashutil from swh.core.config import load_named_config from swh.scheduler.task import Task from swh.storage import get_storage BASE_CONFIG_PATH = 'storage/provenance_cache' DEFAULT_CONFIG = { 'storage': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5002/' }, }), 'revision_packet_size': ('int', 100), } class PopulateCacheContentRevision(Task): """Populate the content -> revision provenance cache for some revisions""" task_queue = 'swh_populate_cache_content_revision' @property def config(self): if not hasattr(self, '__config'): self.__config = load_named_config(BASE_CONFIG_PATH, DEFAULT_CONFIG) return self.__config - def run(self, revisions): + def run_task(self, revisions): """Cache the cache_content_revision table for the revisions provided. Args: revisions: List of revisions to cache populate. """ config = self.config storage = get_storage(**config['storage']) storage.cache_content_revision_add( hashutil.hash_to_bytes(revision) for revision in revisions ) class PopulateCacheRevisionOrigin(Task): """Populate the revision -> origin provenance cache for one origin's visit""" task_queue = 'swh_populate_cache_revision_origin' @property def config(self): if not hasattr(self, '__config'): self.__config = load_named_config(BASE_CONFIG_PATH, DEFAULT_CONFIG) return self.__config - def run(self, origin_id, visit_id): + def run_task(self, origin_id, visit_id): """Cache the cache_revision_origin for the given origin visit Args: origin_id: the origin id to cache visit_id: the visit id to cache This task also creates the revision cache tasks, as well as the task to cache the next origin visit available """ config = self.config storage = get_storage(**config['storage']) packet_size = config['revision_packet_size'] pipelined_tasks = [] visits = sorted( visit['visit'] for visit in storage.origin_visit_get(origin_id) ) if visit_id in visits: revision_task = PopulateCacheContentRevision() new_revisions = [ hashutil.hash_to_hex(revision) for revision in storage.cache_revision_origin_add( origin_id, visit_id) ] if new_revisions: split_new_revisions = [ new_revisions[i:i + packet_size] for i in range(0, packet_size, len(new_revisions)) ] for packet in split_new_revisions: pipelined_tasks.append(revision_task.s(packet)) try: next_visit = min(visit for visit in visits if visit > visit_id) except ValueError: # no next visit, stop pipelining further visits pass else: pipelined_tasks.append(self.s(origin_id, next_visit)) if pipelined_tasks: group(pipelined_tasks).delay() diff --git a/version.txt b/version.txt index 1e526dbce..91ab8c10f 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.86-0-gcb855e0 \ No newline at end of file +v0.0.87-0-g9c8455e \ No newline at end of file