diff --git a/swh/scheduler/tests/test_utils.py b/swh/scheduler/tests/test_utils.py
index fa9bb26..e518984 100644
--- a/swh/scheduler/tests/test_utils.py
+++ b/swh/scheduler/tests/test_utils.py
@@ -1,183 +1,191 @@
 # Copyright (C) 2017-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from datetime import timezone
 from unittest.mock import patch
 import uuid
 
 from swh.scheduler import model, utils
 
 from .common import LISTERS
 
 
 @patch("swh.scheduler.utils.datetime")
 def test_create_oneshot_task_dict_simple(mock_datetime):
     mock_datetime.now.return_value = "some-date"
 
     actual_task = utils.create_oneshot_task_dict("some-task-type")
 
     expected_task = {
         "policy": "oneshot",
         "type": "some-task-type",
         "next_run": "some-date",
         "arguments": {
             "args": [],
             "kwargs": {},
         },
     }
 
     assert actual_task == expected_task
     mock_datetime.now.assert_called_once_with(tz=timezone.utc)
 
 
 @patch("swh.scheduler.utils.datetime")
 def test_create_oneshot_task_dict_other_call(mock_datetime):
     mock_datetime.now.return_value = "some-other-date"
 
     actual_task = utils.create_oneshot_task_dict(
         "some-task-type", "arg0", "arg1", priority="high", other_stuff="normal"
     )
 
     expected_task = {
         "policy": "oneshot",
         "type": "some-task-type",
         "next_run": "some-other-date",
         "arguments": {
             "args": ("arg0", "arg1"),
             "kwargs": {"other_stuff": "normal"},
         },
         "priority": "high",
     }
 
     assert actual_task == expected_task
     mock_datetime.now.assert_called_once_with(tz=timezone.utc)
 
 
 @patch("swh.scheduler.utils.datetime")
 def test_create_task_dict(mock_datetime):
     mock_datetime.now.return_value = "date"
 
     actual_task = utils.create_task_dict(
         "task-type",
         "recurring",
         "arg0",
         "arg1",
         priority="low",
         other_stuff="normal",
         retries_left=3,
     )
 
     expected_task = {
         "policy": "recurring",
         "type": "task-type",
         "next_run": "date",
         "arguments": {
             "args": ("arg0", "arg1"),
             "kwargs": {"other_stuff": "normal"},
         },
         "priority": "low",
         "retries_left": 3,
     }
 
     assert actual_task == expected_task
     mock_datetime.now.assert_called_once_with(tz=timezone.utc)
 
 
 def test_create_origin_task_dict():
     lister = model.Lister(**LISTERS[1], id=uuid.uuid4())
     origin = model.ListedOrigin(
         lister_id=lister.id,
         url="http://example.com/",
         visit_type="git",
     )
 
     task = utils.create_origin_task_dict(origin, lister)
     assert task == {
         "type": "load-git",
         "arguments": {
             "args": [],
-            "kwargs": {"url": "http://example.com/", "lister_name": LISTERS[1]["name"]},
+            "kwargs": {
+                "url": "http://example.com/",
+                "lister_name": LISTERS[1]["name"],
+                "lister_instance_name": LISTERS[1]["instance_name"],
+            },
         },
     }
 
     loader_args = {"foo": "bar", "baz": {"foo": "bar"}}
 
     origin_w_args = model.ListedOrigin(
         lister_id=lister.id,
         url="http://example.com/svn/",
         visit_type="svn",
         extra_loader_arguments=loader_args,
     )
 
     task_w_args = utils.create_origin_task_dict(origin_w_args, lister)
     assert task_w_args == {
         "type": "load-svn",
         "arguments": {
             "args": [],
             "kwargs": {
                 "url": "http://example.com/svn/",
                 "lister_name": LISTERS[1]["name"],
+                "lister_instance_name": LISTERS[1]["instance_name"],
                 **loader_args,
             },
         },
     }
 
 
 def test_create_origin_task_dicts(swh_scheduler):
     listers = []
     for lister_args in LISTERS:
         listers.append(swh_scheduler.get_or_create_lister(**lister_args))
 
     origin1 = model.ListedOrigin(
         lister_id=listers[0].id,
         url="http://example.com/1",
         visit_type="git",
     )
     origin2 = model.ListedOrigin(
         lister_id=listers[0].id,
         url="http://example.com/2",
         visit_type="git",
     )
     origin3 = model.ListedOrigin(
         lister_id=listers[1].id,
         url="http://example.com/3",
         visit_type="git",
     )
 
     origins = [origin1, origin2, origin3]
 
     tasks = utils.create_origin_task_dicts(origins, swh_scheduler)
     assert tasks == [
         {
             "type": "load-git",
             "arguments": {
                 "args": [],
                 "kwargs": {
                     "url": "http://example.com/1",
                     "lister_name": LISTERS[0]["name"],
+                    "lister_instance_name": None,
                 },
             },
         },
         {
             "type": "load-git",
             "arguments": {
                 "args": [],
                 "kwargs": {
                     "url": "http://example.com/2",
                     "lister_name": LISTERS[0]["name"],
+                    "lister_instance_name": None,
                 },
             },
         },
         {
             "type": "load-git",
             "arguments": {
                 "args": [],
                 "kwargs": {
                     "url": "http://example.com/3",
                     "lister_name": LISTERS[1]["name"],
+                    "lister_instance_name": LISTERS[1]["instance_name"],
                 },
             },
         },
     ]
diff --git a/swh/scheduler/utils.py b/swh/scheduler/utils.py
index 2a0b698..3f726b8 100644
--- a/swh/scheduler/utils.py
+++ b/swh/scheduler/utils.py
@@ -1,119 +1,120 @@
 # Copyright (C) 2017-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from datetime import datetime, timezone
 from typing import Any, Dict, List
 
 from .interface import SchedulerInterface
 from .model import ListedOrigin, Lister
 
 
 def utcnow():
     return datetime.now(tz=timezone.utc)
 
 
 def get_task(task_name):
     """Retrieve task object in our application instance by its fully
     qualified python name.
 
     Args:
         task_name (str): task's name (e.g
                          swh.loader.git.tasks.LoadDiskGitRepository)
 
     Returns:
         Instance of task
 
     """
     from swh.scheduler.celery_backend.config import app
 
     for module in app.conf.CELERY_IMPORTS:
         __import__(module)
     return app.tasks[task_name]
 
 
 def create_task_dict(type, policy, *args, **kwargs):
     """Create a task with type and policy, scheduled for as soon as
     possible.
 
     Args:
         type (str): Type of oneshot task as per swh-scheduler's db
                     table task_type's column (Ex: load-git,
                     check-deposit)
         policy (str): oneshot or recurring policy
 
     Returns:
         Expected dictionary for the one-shot task scheduling api
         (swh.scheduler.backend.create_tasks)
 
     """
     task_extra = {}
     for extra_key in ["priority", "retries_left"]:
         if extra_key in kwargs:
             extra_val = kwargs.pop(extra_key)
             task_extra[extra_key] = extra_val
 
     task = {
         "policy": policy,
         "type": type,
         "next_run": utcnow(),
         "arguments": {
             "args": args if args else [],
             "kwargs": kwargs if kwargs else {},
         },
     }
     task.update(task_extra)
     return task
 
 
 def create_origin_task_dict(origin: ListedOrigin, lister: Lister) -> Dict[str, Any]:
     if origin.lister_id != lister.id:
         raise ValueError(
             "origin.lister_id and lister.id differ", origin.lister_id, lister.id
         )
     return {
         "type": f"load-{origin.visit_type}",
         "arguments": {
             "args": [],
             "kwargs": {
                 "url": origin.url,
                 "lister_name": lister.name,
+                "lister_instance_name": lister.instance_name or None,
                 **origin.extra_loader_arguments,
             },
         },
     }
 
 
 def create_origin_task_dicts(
     origins: List[ListedOrigin], scheduler: SchedulerInterface
 ) -> List[Dict[str, Any]]:
     """Returns a task dict for each origin, in the same order."""
 
     lister_ids = {o.lister_id for o in origins}
     listers = {
         lister.id: lister
         for lister in scheduler.get_listers_by_id(list(map(str, lister_ids)))
     }
 
     missing_lister_ids = lister_ids - set(listers)
     assert not missing_lister_ids, f"Missing listers: {missing_lister_ids}"
 
     return [create_origin_task_dict(o, listers[o.lister_id]) for o in origins]
 
 
 def create_oneshot_task_dict(type, *args, **kwargs):
     """Create a oneshot task scheduled for as soon as
     possible.
 
     Args:
         type (str): Type of oneshot task as per swh-scheduler's db
                     table task_type's column (Ex: load-git,
                     check-deposit)
 
     Returns:
         Expected dictionary for the one-shot task scheduling api
         (swh.scheduler.backend.create_tasks)
 
     """
     return create_task_dict(type, "oneshot", *args, **kwargs)