Page Menu · Home · Software Heritage

D889.diff
No One · Temporary

D889.diff

diff --git a/swh/lister/bitbucket/tasks.py b/swh/lister/bitbucket/tasks.py
--- a/swh/lister/bitbucket/tasks.py
+++ b/swh/lister/bitbucket/tasks.py
@@ -2,26 +2,75 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from swh.lister.core.tasks import (IndexingDiscoveryListerTask,
- RangeListerTask,
- IndexingRefreshListerTask, ListerTaskBase)
+import random
+from celery import group
+
+from swh.scheduler.celery_backend.config import app
+from swh.scheduler.task import SWHTask
from .lister import BitBucketLister
+# Default ``split`` given to ``db_partition_indices`` by the full relister.
+GROUP_SPLIT = 10000
+
+
+def new_lister(api_baseurl='https://api.bitbucket.org/2.0'):
+    """Return a BitBucketLister for the API rooted at *api_baseurl*."""
+    return BitBucketLister(api_baseurl=api_baseurl)
+
-class BitBucketListerTask(ListerTaskBase):
- def new_lister(self, *, api_baseurl='https://api.bitbucket.org/2.0'):
- return BitBucketLister(api_baseurl=api_baseurl)
+@app.task(name='swh.lister.bitbucket.tasks.IncrementalBitBucketLister',
+          base=SWHTask, bind=True)
+def incremental_bitbucket_lister(self, **lister_args):
+    """Incremental BitBucket lister.
+
+    List from ``lister.db_last_index()`` onwards, with no upper bound, and
+    return the result of ``lister.run``. Keyword arguments are forwarded
+    to :func:`new_lister`.
+    """
+    self.log.debug('%s, lister_args=%s' % (
+        self.name, lister_args))
+    lister = new_lister(**lister_args)
+    result = lister.run(min_bound=lister.db_last_index(), max_bound=None)
+    self.log.debug('%s OK' % (self.name))
+    return result
-class IncrementalBitBucketLister(BitBucketListerTask,
- IndexingDiscoveryListerTask):
- task_queue = 'swh_lister_bitbucket_discover'
+@app.task(name='swh.lister.bitbucket.tasks.RangeBitBucketLister',
+          base=SWHTask, bind=True)
+def range_bitbucket_lister(self, start, end, **lister_args):
+    """Range BitBucket lister: list between the *start* and *end* bounds.
+
+    Return the result of ``lister.run``. Keyword arguments are forwarded
+    to :func:`new_lister`.
+    """
+    # Format both bounds with %s: %d would raise TypeError on a None or
+    # non-integer bound and fail the task before any listing happens.
+    self.log.debug('%s(start=%s, end=%s), lister_args=%s' % (
+        self.name, start, end, lister_args))
+    lister = new_lister(**lister_args)
+    result = lister.run(min_bound=start, max_bound=end)
+    self.log.debug('%s OK' % (self.name))
+    return result
-class RangeBitBucketLister(BitBucketListerTask, RangeListerTask):
- task_queue = 'swh_lister_bitbucket_refresh'
+@app.task(name='swh.lister.bitbucket.tasks.FullBitBucketRelister',
+          base=SWHTask, bind=True)
+def full_bitbucket_relister(self, split=None, **lister_args):
+    """Full BitBucket relister.
+
+    Partition the index space with ``db_partition_indices(split or
+    GROUP_SPLIT)`` and spawn, as one celery group, a
+    :func:`range_bitbucket_lister` subtask per partition in random order.
+    Remaining keyword arguments are forwarded to every lister.
+    """
+    self.log.debug('%s, lister_args=%s' % (
+        self.name, lister_args))
+    lister = new_lister(**lister_args)
+    ranges = lister.db_partition_indices(split or GROUP_SPLIT)
+    random.shuffle(ranges)
+    group(range_bitbucket_lister.s(minv, maxv, **lister_args)
+          for minv, maxv in ranges)()
+    self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
-class FullBitBucketRelister(BitBucketListerTask, IndexingRefreshListerTask):
- task_queue = 'swh_lister_bitbucket_refresh'
diff --git a/swh/lister/core/tasks.py b/swh/lister/core/tasks.py
deleted file mode 100644
--- a/swh/lister/core/tasks.py
+++ /dev/null
@@ -1,95 +0,0 @@
-# Copyright (C) 2017-2018 the Software Heritage developers
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import abc
-import random
-
-from celery import group
-
-from swh.scheduler.task import Task
-
-from .abstractattribute import AbstractAttribute
-
-
-class AbstractTaskMeta(abc.ABCMeta):
- pass
-
-
-class ListerTaskBase(Task, metaclass=AbstractTaskMeta):
- """Lister Tasks define the process of periodically requesting batches of
- repository information from source code hosting services. They
- instantiate Listers to do batches of work at periodic intervals.
-
- There are two main kinds of lister tasks:
-
- 1. Discovering new repositories.
- 2. Refreshing the list of already discovered repositories.
-
- If the hosting service is indexable (according to the requirements of
- :class:`SWHIndexingLister`), then we can optionally partition the
- set of known repositories into sub-sets to distribute the work.
-
- This means that there is a third possible Task type for Indexing
- Listers:
-
- 3. Discover or refresh a specific range of indices.
-
- """
- task_queue = AbstractAttribute('Celery Task queue name')
-
- @abc.abstractmethod
- def new_lister(self, **lister_args):
- """Return a new lister of the appropriate type.
- """
- pass
-
- @abc.abstractmethod
- def run_task(self, *, lister_args=None):
- pass
-
-
-# Paging/Indexing lister tasks derivatives
-# (cf. {github/bitbucket/gitlab}/tasks)
-
-
-class RangeListerTask(ListerTaskBase):
- """Range lister task.
-
- """
- def run_task(self, start, end, lister_args=None):
- if lister_args is None:
- lister_args = {}
- lister = self.new_lister(**lister_args)
- return lister.run(min_bound=start, max_bound=end)
-
-
-# Indexing Lister tasks derivatives (cf. {github/bitbucket}/tasks)
-
-
-class IndexingDiscoveryListerTask(ListerTaskBase):
- """Incremental indexing lister task.
-
- """
- def run_task(self, *, lister_args=None):
- if lister_args is None:
- lister_args = {}
- lister = self.new_lister(**lister_args)
- return lister.run(min_bound=lister.db_last_index(), max_bound=None)
-
-
-class IndexingRefreshListerTask(ListerTaskBase):
- """Full indexing lister task.
-
- """
- GROUP_SPLIT = 10000
-
- def run_task(self, *, lister_args=None):
- if lister_args is None:
- lister_args = {}
- lister = self.new_lister(**lister_args)
- ranges = lister.db_partition_indices(self.GROUP_SPLIT)
- random.shuffle(ranges)
- range_task = RangeListerTask()
- group(range_task.s(minv, maxv, lister_args)
- for minv, maxv in ranges)()
diff --git a/swh/lister/debian/tasks.py b/swh/lister/debian/tasks.py
--- a/swh/lister/debian/tasks.py
+++ b/swh/lister/debian/tasks.py
@@ -2,17 +2,21 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from swh.lister.core.tasks import ListerTaskBase
+from swh.scheduler.celery_backend.config import app
+from swh.scheduler.task import SWHTask
from .lister import DebianLister
-class DebianListerTask(ListerTaskBase):
- task_queue = 'swh_lister_debian'
-
- def new_lister(self):
- return DebianLister()
-
- def run_task(self, distribution):
- lister = self.new_lister()
- return lister.run(distribution)
+@app.task(name='swh.lister.debian.tasks.DebianListerTask',
+          base=SWHTask, bind=True)
+def debian_lister(self, distribution, **lister_args):
+    """Run a DebianLister over *distribution* and return its result.
+
+    Extra keyword arguments are forwarded to the DebianLister constructor.
+    """
+    self.log.debug('%s(distribution=%s), lister_args=%s' % (
+        self.name, distribution, lister_args))
+    result = DebianLister(**lister_args).run(distribution)
+    self.log.debug('%s OK' % (self.name))
+    return result
diff --git a/swh/lister/github/tasks.py b/swh/lister/github/tasks.py
--- a/swh/lister/github/tasks.py
+++ b/swh/lister/github/tasks.py
@@ -2,25 +2,76 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from swh.lister.core.tasks import (IndexingDiscoveryListerTask,
- RangeListerTask,
- IndexingRefreshListerTask, ListerTaskBase)
+import random
-from .lister import GitHubLister
+from celery import group
+from swh.scheduler.celery_backend.config import app
+from swh.scheduler.task import SWHTask
-class GitHubListerTask(ListerTaskBase):
- def new_lister(self, *, api_baseurl='https://api.github.com'):
- return GitHubLister(api_baseurl=api_baseurl)
+from swh.lister.github.lister import GitHubLister
+# Default ``split`` given to ``db_partition_indices`` by the full relister.
+GROUP_SPLIT = 10000
-class IncrementalGitHubLister(GitHubListerTask, IndexingDiscoveryListerTask):
- task_queue = 'swh_lister_github_discover'
+def new_lister(api_baseurl='https://api.github.com', **kw):
+    """Return a GitHubLister for *api_baseurl*, forwarding extra kwargs."""
+    return GitHubLister(api_baseurl=api_baseurl, **kw)
-class RangeGitHubLister(GitHubListerTask, RangeListerTask):
- task_queue = 'swh_lister_github_refresh'
+
+@app.task(name='swh.lister.github.tasks.IncrementalGitHubLister',
+          base=SWHTask, bind=True)
+def incremental_github_lister(self, **lister_args):
+    """Incremental GitHub lister.
+
+    List from ``lister.db_last_index()`` onwards, with no upper bound, and
+    return the result of ``lister.run``. Keyword arguments are forwarded
+    to :func:`new_lister`.
+    """
+    self.log.debug('%s, lister_args=%s' % (
+        self.name, lister_args))
+    lister = new_lister(**lister_args)
+    result = lister.run(min_bound=lister.db_last_index(), max_bound=None)
+    self.log.debug('%s OK' % (self.name))
+    return result
+
+
+@app.task(name='swh.lister.github.tasks.RangeGitHubLister',
+          base=SWHTask, bind=True)
+def range_github_lister(self, start, end, **lister_args):
+    """Range GitHub lister: list between the *start* and *end* bounds.
+
+    Return the result of ``lister.run``. Keyword arguments are forwarded
+    to :func:`new_lister`.
+    """
+    # Format both bounds with %s: %d would raise TypeError on a None or
+    # non-integer bound and fail the task before any listing happens.
+    self.log.debug('%s(start=%s, end=%s), lister_args=%s' % (
+        self.name, start, end, lister_args))
+    lister = new_lister(**lister_args)
+    result = lister.run(min_bound=start, max_bound=end)
+    self.log.debug('%s OK' % (self.name))
+    return result
+
+
+@app.task(name='swh.lister.github.tasks.FullGitHubRelister',
+          base=SWHTask, bind=True)
+def full_github_relister(self, split=None, **lister_args):
+    """Full GitHub relister.
+
+    Partition the index space with ``db_partition_indices(split or
+    GROUP_SPLIT)`` and spawn, as one celery group, a
+    :func:`range_github_lister` subtask per partition in random order.
+    Remaining keyword arguments are forwarded to every lister.
+    """
+    self.log.debug('%s, lister_args=%s' % (
+        self.name, lister_args))
+    lister = new_lister(**lister_args)
+    ranges = lister.db_partition_indices(split or GROUP_SPLIT)
+    random.shuffle(ranges)
+    group(range_github_lister.s(minv, maxv, **lister_args)
+          for minv, maxv in ranges)()
+    self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
-class FullGitHubRelister(GitHubListerTask, IndexingRefreshListerTask):
- task_queue = 'swh_lister_github_refresh'
diff --git a/swh/lister/gitlab/tasks.py b/swh/lister/gitlab/tasks.py
--- a/swh/lister/gitlab/tasks.py
+++ b/swh/lister/gitlab/tasks.py
@@ -6,58 +6,81 @@
from celery import group
+from swh.scheduler.celery_backend.config import app
+from swh.scheduler.task import SWHTask
+
from .. import utils
-from ..core.tasks import ListerTaskBase, RangeListerTask
from .lister import GitLabLister
-class GitLabListerTask(ListerTaskBase):
- def new_lister(self, *, api_baseurl='https://gitlab.com/api/v4',
- instance='gitlab', sort='asc', per_page=20):
- return GitLabLister(
- api_baseurl=api_baseurl, instance=instance, sort=sort)
-
-
-class RangeGitLabLister(GitLabListerTask, RangeListerTask):
- """Range GitLab lister (list available origins on specified range)
-
- """
- task_queue = 'swh_lister_gitlab_refresh'
-
-
-class FullGitLabRelister(GitLabListerTask):
- """Full GitLab lister (list all available origins from the api).
-
- """
- task_queue = 'swh_lister_gitlab_refresh'
-
- # nb pages
- nb_pages = 10
-
- def run_task(self, lister_args=None):
- if lister_args is None:
- lister_args = {}
- lister = self.new_lister(**lister_args)
- _, total_pages, _ = lister.get_pages_information()
- ranges = list(utils.split_range(total_pages, self.nb_pages))
- random.shuffle(ranges)
- range_task = RangeGitLabLister()
- group(range_task.s(minv, maxv, lister_args=lister_args)
- for minv, maxv in ranges)()
-
-
-class IncrementalGitLabLister(GitLabListerTask):
- """Incremental GitLab lister (list only new available origins).
-
- """
- task_queue = 'swh_lister_gitlab_discover'
-
- def run_task(self, lister_args=None):
- if lister_args is None:
- lister_args = {}
- lister_args['sort'] = 'desc'
- lister = self.new_lister(**lister_args)
- _, total_pages, _ = lister.get_pages_information()
- # stopping as soon as existing origins for that instance are detected
- return lister.run(min_bound=1, max_bound=total_pages,
- check_existence=True)
+# Default ``split`` given to ``utils.split_range`` by the full relister.
+NBPAGES = 10
+
+
+def new_lister(api_baseurl='https://gitlab.com/api/v4',
+               instance='gitlab', sort='asc', per_page=20):
+    """Return a GitLabLister built from *api_baseurl*, *instance*, *sort*."""
+    # NOTE(review): ``per_page`` is accepted but not forwarded to
+    # GitLabLister, so it currently has no effect -- confirm whether the
+    # lister supports it before wiring it through.
+    return GitLabLister(
+        api_baseurl=api_baseurl, instance=instance, sort=sort)
+
+
+@app.task(name='swh.lister.gitlab.tasks.IncrementalGitLabLister',
+          base=SWHTask, bind=True)
+def incremental_gitlab_lister(self, **lister_args):
+    """Incremental GitLab lister: list only newly available origins.
+
+    Pages are walked with ``sort='desc'`` (overriding any caller value)
+    and ``check_existence=True``; the result of ``lister.run`` is returned.
+    """
+    self.log.debug('%s, lister_args=%s' % (
+        self.name, lister_args))
+    lister_args['sort'] = 'desc'
+    lister = new_lister(**lister_args)
+    _, total_pages, _ = lister.get_pages_information()
+    # stopping as soon as existing origins for that instance are detected
+    result = lister.run(min_bound=1, max_bound=total_pages,
+                        check_existence=True)
+    self.log.debug('%s OK' % (self.name))
+    return result
+
+
+@app.task(name='swh.lister.gitlab.tasks.RangeGitLabLister',
+          base=SWHTask, bind=True)
+def range_gitlab_lister(self, start, end, **lister_args):
+    """Range GitLab lister: list available origins on the specified range.
+
+    Return the result of ``lister.run``. Keyword arguments are forwarded
+    to :func:`new_lister`.
+    """
+    # Format both bounds with %s: %d would raise TypeError on a None or
+    # non-integer bound and fail the task before any listing happens.
+    self.log.debug('%s(start=%s, end=%s), lister_args=%s' % (
+        self.name, start, end, lister_args))
+    lister = new_lister(**lister_args)
+    result = lister.run(min_bound=start, max_bound=end)
+    self.log.debug('%s OK' % (self.name))
+    return result
+
+
+@app.task(name='swh.lister.gitlab.tasks.FullGitLabRelister',
+          base=SWHTask, bind=True)
+def full_gitlab_relister(self, split=None, **lister_args):
+    """Full GitLab relister: list all available origins from the API.
+
+    Split the ``total_pages`` reported by ``get_pages_information`` with
+    ``utils.split_range(total_pages, split or NBPAGES)`` and spawn, as one
+    celery group, a :func:`range_gitlab_lister` subtask per range in
+    random order. Remaining keyword arguments are forwarded to every lister.
+    """
+    self.log.debug('%s, lister_args=%s' % (
+        self.name, lister_args))
+    lister = new_lister(**lister_args)
+    _, total_pages, _ = lister.get_pages_information()
+    ranges = list(utils.split_range(total_pages, split or NBPAGES))
+    random.shuffle(ranges)
+    group(range_gitlab_lister.s(minv, maxv, **lister_args)
+          for minv, maxv in ranges)()
+    self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
diff --git a/swh/lister/npm/tasks.py b/swh/lister/npm/tasks.py
--- a/swh/lister/npm/tasks.py
+++ b/swh/lister/npm/tasks.py
@@ -3,75 +3,84 @@
# See top-level LICENSE file for more information
from datetime import datetime
+from contextlib import contextmanager
+
+from swh.scheduler.celery_backend.config import app
+from swh.scheduler.task import SWHTask
-from swh.lister.core.tasks import ListerTaskBase
from swh.lister.npm.lister import NpmLister, NpmIncrementalLister
from swh.lister.npm.models import NpmVisitModel
-class _NpmListerTaskBase(ListerTaskBase):
-
- task_queue = 'swh_lister_npm_refresh'
-
- def _save_registry_state(self):
- """Query the root endpoint from the npm registry and
- backup values of interest for future listing
- """
- params = {'headers': self.lister.request_headers()}
- registry_state = \
- self.lister.session.get(self.lister.api_baseurl, **params)
- registry_state = registry_state.json()
- self.registry_state = {
- 'visit_date': datetime.now(),
- }
- for key in ('doc_count', 'doc_del_count', 'update_seq', 'purge_seq',
- 'disk_size', 'data_size', 'committed_update_seq',
- 'compacted_seq'):
- self.registry_state[key] = registry_state[key]
+@contextmanager
+def save_registry_state(lister):
+    """Record the npm registry state around a listing run.
+
+    Before the ``with`` body runs, query the registry root endpoint
+    (``lister.api_baseurl``) and keep the values of interest, stamped
+    with a ``visit_date``. Once the body completes without raising, store
+    them as a new ``NpmVisitModel`` row; if the body raises, the exception
+    propagates and nothing is stored.
+    """
+    params = {'headers': lister.request_headers()}
+    registry_state = lister.session.get(lister.api_baseurl, **params)
+    registry_state = registry_state.json()
+    keys = ('doc_count', 'doc_del_count', 'update_seq', 'purge_seq',
+            'disk_size', 'data_size', 'committed_update_seq',
+            'compacted_seq')
- def _store_registry_state(self):
- """Store the backup npm registry state to database.
- """
- npm_visit = NpmVisitModel(**self.registry_state)
- self.lister.db_session.add(npm_visit)
- self.lister.db_session.commit()
+    state = {key: registry_state[key] for key in keys}
+    state['visit_date'] = datetime.now()
+    yield
+    # Only reached when the ``with`` body did not raise. The state must be
+    # unpacked: the model takes its column values as keyword arguments.
+    npm_visit = NpmVisitModel(**state)
+    lister.db_session.add(npm_visit)
+    lister.db_session.commit()
-class NpmListerTask(_NpmListerTaskBase):
- """Full npm lister (list all available packages from the npm registry)
+def get_last_update_seq(lister):
+    """Get latest ``update_seq`` value for listing only updated packages.
+
+    Raises:
+        ValueError: if no npm registry listing was recorded beforehand.
"""
-
- def new_lister(self):
- return NpmLister()
-
- def run_task(self):
- self.lister = self.new_lister()
- self._save_registry_state()
- self.lister.run()
- self._store_registry_state()
-
-
-class NpmIncrementalListerTask(_NpmListerTaskBase):
- """Incremental npm lister (list all updated packages since the last listing)
- """
-
- def new_lister(self):
- return NpmIncrementalLister()
-
- def run_task(self):
- self.lister = self.new_lister()
- update_seq_start = self._get_last_update_seq()
- self._save_registry_state()
- self.lister.run(min_bound=update_seq_start)
- self._store_registry_state()
-
- def _get_last_update_seq(self):
- """Get latest ``update_seq`` value for listing only updated packages.
- """
- query = self.lister.db_session.query(NpmVisitModel.update_seq)
- row = query.order_by(NpmVisitModel.uid.desc()).first()
- if not row:
- raise ValueError('No npm registry listing previously performed ! '
- 'This is required prior to the execution of an '
- 'incremental listing.')
- return row[0]
+    query = lister.db_session.query(NpmVisitModel.update_seq)
+    row = query.order_by(NpmVisitModel.uid.desc()).first()
+    if not row:
+        raise ValueError('No npm registry listing previously performed ! '
+                         'This is required prior to the execution of an '
+                         'incremental listing.')
+    return row[0]
+
+
+@app.task(name='swh.lister.npm.tasks.NpmListerTask',
+          base=SWHTask, bind=True)
+def npm_lister(self, **lister_args):
+    """Full npm lister: list all available packages from the npm registry.
+
+    The registry state queried before the run is stored once it succeeds.
+    """
+    self.log.debug('%s, lister_args=%s' % (
+        self.name, lister_args))
+    lister = NpmLister(**lister_args)
+    with save_registry_state(lister):
+        lister.run()
+    self.log.debug('%s OK' % (self.name))
+
+
+@app.task(name='swh.lister.npm.tasks.NpmIncrementalListerTask',
+          base=SWHTask, bind=True)
+def npm_incremental_lister(self, **lister_args):
+    """Incremental npm lister: list packages updated since the last listing.
+
+    Listing starts from the ``update_seq`` of the latest recorded registry
+    state (highest ``uid``); ValueError is raised if there is none.
+    """
+    self.log.debug('%s, lister_args=%s' % (
+        self.name, lister_args))
+    lister = NpmIncrementalLister(**lister_args)
+    update_seq_start = get_last_update_seq(lister)
+    with save_registry_state(lister):
+        lister.run(min_bound=update_seq_start)
+    self.log.debug('%s OK' % (self.name))
diff --git a/swh/lister/pypi/tasks.py b/swh/lister/pypi/tasks.py
--- a/swh/lister/pypi/tasks.py
+++ b/swh/lister/pypi/tasks.py
@@ -2,19 +2,23 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from ..core.tasks import ListerTaskBase
-from .lister import PyPILister
+from swh.scheduler.task import SWHTask
+from swh.scheduler.celery_backend.config import app
+from .lister import PyPILister
-class PyPIListerTask(ListerTaskBase):
- """Full PyPI lister (list all available origins from the api).
- """
- task_queue = 'swh_lister_pypi_refresh'
+@app.task(name='swh.lister.pypi.tasks.PyPIListerTask',
+          base=SWHTask, bind=True)
+def pypi_lister(self, **lister_args):
+    """Full PyPI lister: list all available origins from the PyPI API.
+
+    Keyword arguments are handed unchanged to the PyPILister constructor.
+    """
+    task_name = self.name
+    self.log.debug('%s(), lister_args=%s' % (task_name, lister_args))
+    lister = PyPILister(**lister_args)
+    lister.run()
+    self.log.debug('%s OK' % (task_name))
- def new_lister(self):
- return PyPILister()
- def run_task(self):
- lister = self.new_lister()
- lister.run()

File Metadata

Mime Type
text/plain
Expires
Dec 20 2024, 7:55 AM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3220324

Event Timeline