Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7123987
D889.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
18 KB
Subscribers
None
D889.diff
View Options
diff --git a/swh/lister/bitbucket/tasks.py b/swh/lister/bitbucket/tasks.py
--- a/swh/lister/bitbucket/tasks.py
+++ b/swh/lister/bitbucket/tasks.py
@@ -2,26 +2,51 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from swh.lister.core.tasks import (IndexingDiscoveryListerTask,
- RangeListerTask,
- IndexingRefreshListerTask, ListerTaskBase)
+import random
+from celery import group
+
+from swh.scheduler.celery_backend.config import app
+from swh.scheduler.task import SWHTask
from .lister import BitBucketLister
+GROUP_SPLIT = 10000
+
+
+def new_lister(api_baseurl='https://api.bitbucket.org/2.0'):
+ return BitBucketLister(api_baseurl=api_baseurl)
+
-class BitBucketListerTask(ListerTaskBase):
- def new_lister(self, *, api_baseurl='https://api.bitbucket.org/2.0'):
- return BitBucketLister(api_baseurl=api_baseurl)
+@app.task(name='swh.lister.bitbucket.tasks.IncrementalBitBucketLister',
+ base=SWHTask, bind=True)
+def incremental_bitbucket_lister(self, **lister_args):
+ self.log.debug('%s, lister_args=%s' % (
+ self.name, lister_args))
+ lister = new_lister(**lister_args)
+ lister.run(min_bound=lister.db_last_index(), max_bound=None)
+ self.log.debug('%s OK' % (self.name))
-class IncrementalBitBucketLister(BitBucketListerTask,
- IndexingDiscoveryListerTask):
- task_queue = 'swh_lister_bitbucket_discover'
+@app.task(name='swh.lister.bitbucket.tasks.RangeBitBucketLister',
+ base=SWHTask, bind=True)
+def range_bitbucket_lister(self, start, end, **lister_args):
+ self.log.debug('%s(start=%s, end=%d), lister_args=%s' % (
+ self.name, start, end, lister_args))
+ lister = new_lister(**lister_args)
+ lister.run(min_bound=start, max_bound=end)
+ self.log.debug('%s OK' % (self.name))
-class RangeBitBucketLister(BitBucketListerTask, RangeListerTask):
- task_queue = 'swh_lister_bitbucket_refresh'
+@app.task(name='swh.lister.bitbucket.tasks.FullBitBucketRelister',
+ base=SWHTask, bind=True)
+def full_bitbucket_relister(self, split=None, **lister_args):
+ self.log.debug('%s, lister_args=%s' % (
+ self.name, lister_args))
+ lister = new_lister(**lister_args)
+ ranges = lister.db_partition_indices(split or GROUP_SPLIT)
+ random.shuffle(ranges)
+ group(range_bitbucket_lister.s(minv, maxv, **lister_args)
+ for minv, maxv in ranges)()
+ self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
-class FullBitBucketRelister(BitBucketListerTask, IndexingRefreshListerTask):
- task_queue = 'swh_lister_bitbucket_refresh'
diff --git a/swh/lister/core/tasks.py b/swh/lister/core/tasks.py
deleted file mode 100644
--- a/swh/lister/core/tasks.py
+++ /dev/null
@@ -1,95 +0,0 @@
-# Copyright (C) 2017-2018 the Software Heritage developers
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import abc
-import random
-
-from celery import group
-
-from swh.scheduler.task import Task
-
-from .abstractattribute import AbstractAttribute
-
-
-class AbstractTaskMeta(abc.ABCMeta):
- pass
-
-
-class ListerTaskBase(Task, metaclass=AbstractTaskMeta):
- """Lister Tasks define the process of periodically requesting batches of
- repository information from source code hosting services. They
- instantiate Listers to do batches of work at periodic intervals.
-
- There are two main kinds of lister tasks:
-
- 1. Discovering new repositories.
- 2. Refreshing the list of already discovered repositories.
-
- If the hosting service is indexable (according to the requirements of
- :class:`SWHIndexingLister`), then we can optionally partition the
- set of known repositories into sub-sets to distribute the work.
-
- This means that there is a third possible Task type for Indexing
- Listers:
-
- 3. Discover or refresh a specific range of indices.
-
- """
- task_queue = AbstractAttribute('Celery Task queue name')
-
- @abc.abstractmethod
- def new_lister(self, **lister_args):
- """Return a new lister of the appropriate type.
- """
- pass
-
- @abc.abstractmethod
- def run_task(self, *, lister_args=None):
- pass
-
-
-# Paging/Indexing lister tasks derivatives
-# (cf. {github/bitbucket/gitlab}/tasks)
-
-
-class RangeListerTask(ListerTaskBase):
- """Range lister task.
-
- """
- def run_task(self, start, end, lister_args=None):
- if lister_args is None:
- lister_args = {}
- lister = self.new_lister(**lister_args)
- return lister.run(min_bound=start, max_bound=end)
-
-
-# Indexing Lister tasks derivatives (cf. {github/bitbucket}/tasks)
-
-
-class IndexingDiscoveryListerTask(ListerTaskBase):
- """Incremental indexing lister task.
-
- """
- def run_task(self, *, lister_args=None):
- if lister_args is None:
- lister_args = {}
- lister = self.new_lister(**lister_args)
- return lister.run(min_bound=lister.db_last_index(), max_bound=None)
-
-
-class IndexingRefreshListerTask(ListerTaskBase):
- """Full indexing lister task.
-
- """
- GROUP_SPLIT = 10000
-
- def run_task(self, *, lister_args=None):
- if lister_args is None:
- lister_args = {}
- lister = self.new_lister(**lister_args)
- ranges = lister.db_partition_indices(self.GROUP_SPLIT)
- random.shuffle(ranges)
- range_task = RangeListerTask()
- group(range_task.s(minv, maxv, lister_args)
- for minv, maxv in ranges)()
diff --git a/swh/lister/debian/tasks.py b/swh/lister/debian/tasks.py
--- a/swh/lister/debian/tasks.py
+++ b/swh/lister/debian/tasks.py
@@ -2,17 +2,16 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from swh.lister.core.tasks import ListerTaskBase
+from swh.scheduler.celery_backend.config import app
+from swh.scheduler.task import SWHTask
from .lister import DebianLister
-class DebianListerTask(ListerTaskBase):
- task_queue = 'swh_lister_debian'
-
- def new_lister(self):
- return DebianLister()
-
- def run_task(self, distribution):
- lister = self.new_lister()
- return lister.run(distribution)
+@app.task(name='swh.lister.debian.tasks.DebianListerTask',
+ base=SWHTask, bind=True)
+def debian_lister(self, distribution, **lister_args):
+ self.log.debug('%s, lister_args=%s' % (
+ self.name, lister_args))
+ DebianLister(**lister_args).run(distribution)
+ self.log.debug('%s OK' % (self.name))
diff --git a/swh/lister/github/tasks.py b/swh/lister/github/tasks.py
--- a/swh/lister/github/tasks.py
+++ b/swh/lister/github/tasks.py
@@ -2,25 +2,52 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from swh.lister.core.tasks import (IndexingDiscoveryListerTask,
- RangeListerTask,
- IndexingRefreshListerTask, ListerTaskBase)
+import random
-from .lister import GitHubLister
+from celery import group
+from swh.scheduler.celery_backend.config import app
+from swh.scheduler.task import SWHTask
-class GitHubListerTask(ListerTaskBase):
- def new_lister(self, *, api_baseurl='https://api.github.com'):
- return GitHubLister(api_baseurl=api_baseurl)
+from swh.lister.github.lister import GitHubLister
+GROUP_SPLIT = 10000
-class IncrementalGitHubLister(GitHubListerTask, IndexingDiscoveryListerTask):
- task_queue = 'swh_lister_github_discover'
+def new_lister(api_baseurl='https://api.github.com', **kw):
+ return GitHubLister(api_baseurl=api_baseurl, **kw)
-class RangeGitHubLister(GitHubListerTask, RangeListerTask):
- task_queue = 'swh_lister_github_refresh'
+
+@app.task(name='swh.lister.github.tasks.IncrementalGitHubLister',
+ base=SWHTask, bind=True)
+def incremental_github_lister(self, **lister_args):
+ self.log.debug('%s, lister_args=%s' % (
+ self.name, lister_args))
+ lister = new_lister(**lister_args)
+ lister.run(min_bound=lister.db_last_index(), max_bound=None)
+ self.log.debug('%s OK' % (self.name))
+
+
+@app.task(name='swh.lister.github.tasks.RangeGitHubLister',
+ base=SWHTask, bind=True)
+def range_github_lister(self, start, end, **lister_args):
+ self.log.debug('%s(start=%s, end=%d), lister_args=%s' % (
+ self.name, start, end, lister_args))
+ lister = new_lister(**lister_args)
+ lister.run(min_bound=start, max_bound=end)
+ self.log.debug('%s OK' % (self.name))
+
+
+@app.task(name='swh.lister.github.tasks.FullGitHubRelister',
+ base=SWHTask, bind=True)
+def full_github_relister(self, split=None, **lister_args):
+ self.log.debug('%s, lister_args=%s' % (
+ self.name, lister_args))
+ lister = new_lister(**lister_args)
+ ranges = lister.db_partition_indices(split or GROUP_SPLIT)
+ random.shuffle(ranges)
+ group(range_github_lister.s(minv, maxv, **lister_args)
+ for minv, maxv in ranges)()
+ self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
-class FullGitHubRelister(GitHubListerTask, IndexingRefreshListerTask):
- task_queue = 'swh_lister_github_refresh'
diff --git a/swh/lister/gitlab/tasks.py b/swh/lister/gitlab/tasks.py
--- a/swh/lister/gitlab/tasks.py
+++ b/swh/lister/gitlab/tasks.py
@@ -6,58 +6,54 @@
from celery import group
+from swh.scheduler.celery_backend.config import app
+from swh.scheduler.task import SWHTask
+
from .. import utils
-from ..core.tasks import ListerTaskBase, RangeListerTask
from .lister import GitLabLister
-class GitLabListerTask(ListerTaskBase):
- def new_lister(self, *, api_baseurl='https://gitlab.com/api/v4',
- instance='gitlab', sort='asc', per_page=20):
- return GitLabLister(
- api_baseurl=api_baseurl, instance=instance, sort=sort)
-
-
-class RangeGitLabLister(GitLabListerTask, RangeListerTask):
- """Range GitLab lister (list available origins on specified range)
-
- """
- task_queue = 'swh_lister_gitlab_refresh'
-
-
-class FullGitLabRelister(GitLabListerTask):
- """Full GitLab lister (list all available origins from the api).
-
- """
- task_queue = 'swh_lister_gitlab_refresh'
-
- # nb pages
- nb_pages = 10
-
- def run_task(self, lister_args=None):
- if lister_args is None:
- lister_args = {}
- lister = self.new_lister(**lister_args)
- _, total_pages, _ = lister.get_pages_information()
- ranges = list(utils.split_range(total_pages, self.nb_pages))
- random.shuffle(ranges)
- range_task = RangeGitLabLister()
- group(range_task.s(minv, maxv, lister_args=lister_args)
- for minv, maxv in ranges)()
-
-
-class IncrementalGitLabLister(GitLabListerTask):
- """Incremental GitLab lister (list only new available origins).
-
- """
- task_queue = 'swh_lister_gitlab_discover'
-
- def run_task(self, lister_args=None):
- if lister_args is None:
- lister_args = {}
- lister_args['sort'] = 'desc'
- lister = self.new_lister(**lister_args)
- _, total_pages, _ = lister.get_pages_information()
- # stopping as soon as existing origins for that instance are detected
- return lister.run(min_bound=1, max_bound=total_pages,
- check_existence=True)
+NBPAGES = 10
+
+
+def new_lister(api_baseurl='https://gitlab.com/api/v4',
+ instance='gitlab', sort='asc', per_page=20):
+ return GitLabLister(
+ api_baseurl=api_baseurl, instance=instance, sort=sort)
+
+
+@app.task(name='swh.lister.gitlab.tasks.IncrementalGitLabLister',
+ base=SWHTask, bind=True)
+def incremental_gitlab_lister(self, **lister_args):
+ self.log.debug('%s, lister_args=%s' % (
+ self.name, lister_args))
+ lister_args['sort'] = 'desc'
+ lister = new_lister(**lister_args)
+ total_pages = lister.get_pages_information()[1]
+ # stopping as soon as existing origins for that instance are detected
+ lister.run(min_bound=1, max_bound=total_pages, check_existence=True)
+ self.log.debug('%s OK' % (self.name))
+
+
+@app.task(name='swh.lister.gitlab.tasks.RangeGitLabLister',
+ base=SWHTask, bind=True)
+def range_gitlab_lister(self, start, end, **lister_args):
+ self.log.debug('%s(start=%s, end=%d), lister_args=%s' % (
+ self.name, start, end, lister_args))
+ lister = new_lister(**lister_args)
+ lister.run(min_bound=start, max_bound=end)
+ self.log.debug('%s OK' % (self.name))
+
+
+@app.task(name='swh.lister.gitlab.tasks.FullGitLabRelister',
+ base=SWHTask, bind=True)
+def full_gitlab_relister(self, **lister_args):
+ self.log.debug('%s, lister_args=%s' % (
+ self.name, lister_args))
+ lister = new_lister(**lister_args)
+ _, total_pages, _ = lister.get_pages_information()
+ ranges = list(utils.split_range(total_pages, NBPAGES))
+ random.shuffle(ranges)
+ group(range_gitlab_lister.s(minv, maxv, **lister_args)
+ for minv, maxv in ranges)()
+ self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges)))
diff --git a/swh/lister/npm/tasks.py b/swh/lister/npm/tasks.py
--- a/swh/lister/npm/tasks.py
+++ b/swh/lister/npm/tasks.py
@@ -3,75 +3,62 @@
# See top-level LICENSE file for more information
from datetime import datetime
+from contextlib import contextmanager
+
+from swh.scheduler.celery_backend.config import app
+from swh.scheduler.task import SWHTask
-from swh.lister.core.tasks import ListerTaskBase
from swh.lister.npm.lister import NpmLister, NpmIncrementalLister
from swh.lister.npm.models import NpmVisitModel
-class _NpmListerTaskBase(ListerTaskBase):
-
- task_queue = 'swh_lister_npm_refresh'
-
- def _save_registry_state(self):
- """Query the root endpoint from the npm registry and
- backup values of interest for future listing
- """
- params = {'headers': self.lister.request_headers()}
- registry_state = \
- self.lister.session.get(self.lister.api_baseurl, **params)
- registry_state = registry_state.json()
- self.registry_state = {
- 'visit_date': datetime.now(),
- }
- for key in ('doc_count', 'doc_del_count', 'update_seq', 'purge_seq',
- 'disk_size', 'data_size', 'committed_update_seq',
- 'compacted_seq'):
- self.registry_state[key] = registry_state[key]
+@contextmanager
+def save_registry_state(lister):
+ params = {'headers': lister.request_headers()}
+ registry_state = lister.session.get(lister.api_baseurl, **params)
+ registry_state = registry_state.json()
+ keys = ('doc_count', 'doc_del_count', 'update_seq', 'purge_seq',
+ 'disk_size', 'data_size', 'committed_update_seq',
+ 'compacted_seq')
- def _store_registry_state(self):
- """Store the backup npm registry state to database.
- """
- npm_visit = NpmVisitModel(**self.registry_state)
- self.lister.db_session.add(npm_visit)
- self.lister.db_session.commit()
+ state = {key: registry_state[key] for key in keys}
+ state['visit_date'] = datetime.now()
+ yield
+ npm_visit = NpmVisitModel(**state)
+ lister.db_session.add(npm_visit)
+ lister.db_session.commit()
-class NpmListerTask(_NpmListerTaskBase):
- """Full npm lister (list all available packages from the npm registry)
+def get_last_update_seq(lister):
+ """Get latest ``update_seq`` value for listing only updated packages.
"""
-
- def new_lister(self):
- return NpmLister()
-
- def run_task(self):
- self.lister = self.new_lister()
- self._save_registry_state()
- self.lister.run()
- self._store_registry_state()
-
-
-class NpmIncrementalListerTask(_NpmListerTaskBase):
- """Incremental npm lister (list all updated packages since the last listing)
- """
-
- def new_lister(self):
- return NpmIncrementalLister()
-
- def run_task(self):
- self.lister = self.new_lister()
- update_seq_start = self._get_last_update_seq()
- self._save_registry_state()
- self.lister.run(min_bound=update_seq_start)
- self._store_registry_state()
-
- def _get_last_update_seq(self):
- """Get latest ``update_seq`` value for listing only updated packages.
- """
- query = self.lister.db_session.query(NpmVisitModel.update_seq)
- row = query.order_by(NpmVisitModel.uid.desc()).first()
- if not row:
- raise ValueError('No npm registry listing previously performed ! '
- 'This is required prior to the execution of an '
- 'incremental listing.')
- return row[0]
+ query = lister.db_session.query(NpmVisitModel.update_seq)
+ row = query.order_by(NpmVisitModel.uid.desc()).first()
+ if not row:
+ raise ValueError('No npm registry listing previously performed ! '
+ 'This is required prior to the execution of an '
+ 'incremental listing.')
+ return row[0]
+
+
+@app.task(name='swh.lister.npm.tasks.NpmListerTask',
+ base=SWHTask, bind=True)
+def npm_lister(self, **lister_args):
+ self.log.debug('%s, lister_args=%s' % (
+ self.name, lister_args))
+ lister = NpmLister(**lister_args)
+ with save_registry_state(lister):
+ lister.run()
+ self.log.debug('%s OK' % (self.name))
+
+
+@app.task(name='swh.lister.npm.tasks.NpmIncrementalListerTask',
+ base=SWHTask, bind=True)
+def npm_incremental_lister(self, **lister_args):
+ self.log.debug('%s, lister_args=%s' % (
+ self.name, lister_args))
+ lister = NpmIncrementalLister(**lister_args)
+ update_seq_start = get_last_update_seq(lister)
+ with save_registry_state(lister):
+ lister.run(min_bound=update_seq_start)
+ self.log.debug('%s OK' % (self.name))
diff --git a/swh/lister/pypi/tasks.py b/swh/lister/pypi/tasks.py
--- a/swh/lister/pypi/tasks.py
+++ b/swh/lister/pypi/tasks.py
@@ -2,19 +2,18 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from ..core.tasks import ListerTaskBase
-from .lister import PyPILister
+from swh.scheduler.task import SWHTask
+from swh.scheduler.celery_backend.config import app
+from .lister import PyPILister
-class PyPIListerTask(ListerTaskBase):
- """Full PyPI lister (list all available origins from the api).
- """
- task_queue = 'swh_lister_pypi_refresh'
+@app.task(name='swh.lister.pypi.tasks.PyPIListerTask',
+ base=SWHTask, bind=True)
+def pypi_lister(self, **lister_args):
+ self.log.debug('%s(), lister_args=%s' % (
+ self.name, lister_args))
+ PyPILister(**lister_args).run()
+ self.log.debug('%s OK' % (self.name))
- def new_lister(self):
- return PyPILister()
- def run_task(self):
- lister = self.new_lister()
- lister.run()
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Dec 20 2024, 7:55 AM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3220324
Attached To
D889: Heavy refactor of the task system
Event Timeline
Log In to Comment