Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/core/tasks.py
- This file was added.
# Copyright (C) 2017 the Software Heritage developers | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
import abc | |||||
import random | |||||
from celery import group | |||||
from celery.app.task import TaskType | |||||
zack: This module looks like a good place to briefly describe the different listing… | |||||
Done Inline ActionsThis is still pending — as shown by the TODO marked right below this hunk — right? Do you prefer to merge leaving that pending or what? zack: This is still pending — as shown by the TODO marked right below this hunk — right?
Do you… | |||||
Done Inline ActionsI don't _want_ to leave it pending. I just don't have a good description of tasks to put in there. :) fiendish: I don't _want_ to leave it pending. I just don't have a good description of tasks to put in… | |||||
from swh.scheduler.task import Task | |||||
from .abstractattribute import AbstractAttribute | |||||
class AbstractTaskMeta(abc.ABCMeta, TaskType):
    """Metaclass combining ``abc.ABCMeta`` with Celery's ``TaskType``.

    It adds no behaviour of its own; it only merges the two metaclasses
    so that a single class can be declared with both (ListerTaskBase
    uses it as its metaclass).
    """
class ListerTaskBase(Task, metaclass=AbstractTaskMeta):
    """Base class of the lister tasks.

    A lister task describes how batches of repository information are
    periodically requested from a source code hosting service: it
    instantiates a Lister, which does one batch of work, at periodic
    intervals.

    Lister tasks come in two main flavours:

    1) discovery of new repositories;
    2) refresh of the list of already discovered repositories.

    When the hosting service is indexable (according to the requirements
    of SWHIndexingLister), the set of known repositories can optionally
    be partitioned into sub-sets to distribute the work. Indexing
    listers therefore have a third possible kind of task:

    3) discovery or refresh of a specific range of indices.
    """

    # Celery queue name for the task. AbstractAttribute is defined in a
    # sibling module; presumably concrete subclasses must override this
    # attribute -- confirm against abstractattribute.py.
    task_queue = AbstractAttribute('Celery Task queue name')

    @abc.abstractmethod
    def new_lister(self):
        """Build and return a fresh lister of the appropriate type."""

    @abc.abstractmethod
    def run(self):
        """Execute the task; concrete task classes supply the body."""
class IndexingDiscoveryListerTask(ListerTaskBase):
    """Task that lists from the lister's last known index onwards."""

    def run(self):
        """Run a new lister from its ``db_last_index()``, unbounded above.

        The lower bound is whatever the freshly created lister reports
        through ``db_last_index()``; ``max_index`` is passed as None.
        """
        discovery_lister = self.new_lister()
        first_index = discovery_lister.db_last_index()
        discovery_lister.run(min_index=first_index, max_index=None)
class IndexingRangeListerTask(ListerTaskBase):
    """Task that lists a caller-supplied range of indices."""

    def run(self, start, end):
        """Run a new lister over the given index range.

        Args:
            start: value handed to the lister as ``min_index``.
            end: value handed to the lister as ``max_index``.
        """
        self.new_lister().run(min_index=start, max_index=end)
class IndexingRefreshListerTask(ListerTaskBase):
    """Task that refreshes known indices by fanning out range sub-tasks."""

    # Argument handed to the lister's db_partition_indices() when asking
    # for the index ranges to refresh.
    GROUP_SPLIT = 10000

    def run(self):
        """Partition the indices and launch one range task per partition.

        The partitions returned by the lister are shuffled in place, then
        one IndexingRangeListerTask signature is built per (min, max)
        pair and the whole set is invoked as a Celery group.
        """
        partitions = self.new_lister().db_partition_indices(self.GROUP_SPLIT)
        # shuffle() works in place, so the lister is assumed to return a
        # mutable sequence here -- TODO confirm.
        random.shuffle(partitions)
        # NOTE(review): IndexingRangeListerTask does not implement
        # new_lister() in this module -- confirm that instantiating it
        # directly (rather than a concrete subclass) is intended.
        subtask = IndexingRangeListerTask()
        signatures = (subtask.s(low, high) for low, high in partitions)
        group(signatures)()
This module looks like a good place to briefly describe the different listing "disciplines" we support, one per task.