diff --git a/swh/scheduler/simulator/origins.py b/swh/scheduler/simulator/origins.py index a0ec081..546bfc3 100644 --- a/swh/scheduler/simulator/origins.py +++ b/swh/scheduler/simulator/origins.py @@ -1,227 +1,227 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """This module implements a model of the frequency of updates of an origin and how long it takes to load it. For each origin, a commit frequency is chosen deterministically based on the hash of its URL and assume all origins were created on an arbitrary epoch. From this we compute a number of commits, that is the product of these two. And the run time of a load task is approximated as proportional to the number of commits since the previous visit of the origin (possibly 0).""" from datetime import datetime, timedelta, timezone import hashlib import logging from typing import Dict, Generator, Iterator, List, Optional, Tuple import uuid import attr from simpy import Event from swh.model.model import OriginVisitStatus from swh.scheduler.model import ListedOrigin from .common import Environment, Queue, Task, TaskEvent logger = logging.getLogger(__name__) _nb_generated_origins = 0 _visit_times: Dict[Tuple[str, str], datetime] = {} """Cache of the time of the last visit of (visit_type, origin_url), to spare an SQL query (high latency).""" def generate_listed_origin( lister_id: uuid.UUID, now: Optional[datetime] = None ) -> ListedOrigin: """Returns a globally unique new origin. Seed the `last_update` value according to the OriginModel and the passed timestamp. Arguments: lister: instance of the lister that generated this origin now: time of listing, to emulate last_update (defaults to :func:`datetime.now`) """ global _nb_generated_origins _nb_generated_origins += 1 assert _nb_generated_origins < 10 ** 6, "Too many origins!" if now is None: now = datetime.now(tz=timezone.utc) url = f"https://example.com/{_nb_generated_origins:06d}.git" - visit_type = "git" + visit_type = "test-git" origin = OriginModel(visit_type, url) return ListedOrigin( lister_id=lister_id, url=url, visit_type=visit_type, last_update=origin.get_last_update(now), ) class OriginModel: MIN_RUN_TIME = 0.5 """Minimal run time for a visit (retrieved from production data)""" MAX_RUN_TIME = 7200 """Max run time for a visit""" PER_COMMIT_RUN_TIME = 0.1 """Run time per commit""" EPOCH = datetime(2015, 9, 1, 0, 0, 0, tzinfo=timezone.utc) """The origin of all origins (at least according to Software Heritage)""" def __init__(self, type: str, origin: str): self.type = type self.origin = origin def seconds_between_commits(self): """Returns a random 'average time between two commits' of this origin, used to estimate the run time of a load task, and how much the loading architecture is lagging behind origin updates.""" n_bytes = 2 num_buckets = 2 ** (8 * n_bytes) # Deterministic seed to generate "random" characteristics of this origin bucket = int.from_bytes( hashlib.md5(self.origin.encode()).digest()[0:n_bytes], "little" ) # minimum: 1 second (bucket == 0) # max: 10 years (bucket == num_buckets - 1) ten_y = 10 * 365 * 24 * 3600 return ten_y ** (bucket / num_buckets) # return 1 + (ten_y - 1) * (bucket / (num_buckets - 1)) def get_last_update(self, now: datetime) -> datetime: """Get the last_update value for this origin. We assume that the origin had its first commit at `EPOCH`, and that one commit happened every `self.seconds_between_commits()`. This returns the last commit date before or equal to `now`. """ _, time_since_last_commit = divmod( (now - self.EPOCH).total_seconds(), self.seconds_between_commits() ) return now - timedelta(seconds=time_since_last_commit) def get_current_snapshot_id(self, now: datetime) -> bytes: """Get the current snapshot for this origin. To generate a snapshot id, we calculate the number of commits since the EPOCH, and hash it alongside the origin type and url. """ commits_since_epoch, _ = divmod( (now - self.EPOCH).total_seconds(), self.seconds_between_commits() ) return hashlib.sha1( f"{self.type} {self.origin} {commits_since_epoch}".encode() ).digest() def load_task_characteristics( self, now: datetime ) -> Tuple[float, str, Optional[bytes]]: """Returns the (run_time, end_status, snapshot id) of the next origin visit.""" current_snapshot = self.get_current_snapshot_id(now) key = (self.type, self.origin) last_visit = _visit_times.get(key, now - timedelta(days=365)) time_since_last_successful_run = now - last_visit _visit_times[key] = now seconds_between_commits = self.seconds_between_commits() seconds_since_last_successful = time_since_last_successful_run.total_seconds() n_commits = int(seconds_since_last_successful / seconds_between_commits) logger.debug( "%s characteristics %s origin=%s: Interval: %s, n_commits: %s", now, self.type, self.origin, timedelta(seconds=seconds_between_commits), n_commits, ) run_time = self.MIN_RUN_TIME + self.PER_COMMIT_RUN_TIME * n_commits if run_time > self.MAX_RUN_TIME: # Long visits usually fail return (self.MAX_RUN_TIME, "partial", None) else: return (run_time, "full", current_snapshot) def lister_process( env: Environment, lister_id: uuid.UUID ) -> Generator[Event, Event, None]: """Every hour, generate new origins and update the `last_update` field for the ones this process generated in the past""" NUM_NEW_ORIGINS = 100 origins: List[ListedOrigin] = [] while True: updated_origins = [] for origin in origins: model = OriginModel(origin.visit_type, origin.url) updated_origins.append( attr.evolve(origin, last_update=model.get_last_update(env.time)) ) origins = updated_origins origins.extend( generate_listed_origin(lister_id, now=env.time) for _ in range(NUM_NEW_ORIGINS) ) env.scheduler.record_listed_origins(origins) yield env.timeout(3600) def load_task_process( env: Environment, task: Task, status_queue: Queue ) -> Iterator[Event]: """A loading task. This pushes OriginVisitStatus objects to the status_queue to simulate the visible outcomes of the task. Uses the `load_task_duration` function to determine its run time. """ status = OriginVisitStatus( origin=task.origin, visit=42, type=task.visit_type, status="created", date=env.time, snapshot=None, ) logger.debug("%s task %s origin=%s: Start", env.time, task.visit_type, task.origin) yield status_queue.put(TaskEvent(task=task, status=status)) origin_model = OriginModel(task.visit_type, task.origin) (run_time, end_status, snapshot) = origin_model.load_task_characteristics(env.time) yield env.timeout(run_time) logger.debug("%s task %s origin=%s: End", env.time, task.visit_type, task.origin) yield status_queue.put( TaskEvent( task=task, status=attr.evolve( status, status=end_status, date=env.time, snapshot=snapshot ), ) ) env.report.record_visit( (task.visit_type, task.origin), run_time, end_status, snapshot ) diff --git a/swh/scheduler/tests/common.py b/swh/scheduler/tests/common.py index 6d5da0f..1dc8324 100644 --- a/swh/scheduler/tests/common.py +++ b/swh/scheduler/tests/common.py @@ -1,125 +1,125 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime from typing import Dict, List, Optional TEMPLATES = { - "git": { - "type": "load-git", + "test-git": { + "type": "load-test-git", "arguments": {"args": [], "kwargs": {},}, "next_run": None, }, - "hg": { - "type": "load-hg", + "test-hg": { + "type": "load-test-hg", "arguments": {"args": [], "kwargs": {},}, "next_run": None, "policy": "oneshot", }, } TASK_TYPES = { - "git": { - "type": "load-git", + "test-git": { + "type": "load-test-git", "description": "Update a git repository", "backend_name": "swh.loader.git.tasks.UpdateGitRepository", "default_interval": datetime.timedelta(days=64), "min_interval": datetime.timedelta(hours=12), "max_interval": datetime.timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": datetime.timedelta(hours=2), }, - "hg": { - "type": "load-hg", + "test-hg": { + "type": "load-test-hg", "description": "Update a mercurial repository", "backend_name": "swh.loader.mercurial.tasks.UpdateHgRepository", "default_interval": datetime.timedelta(days=64), "min_interval": datetime.timedelta(hours=12), "max_interval": datetime.timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": datetime.timedelta(hours=2), }, } def _task_from_template( template: Dict, next_run: datetime.datetime, priority: Optional[str], *args, **kwargs, ) -> Dict: ret = copy.deepcopy(template) ret["next_run"] = next_run if priority: ret["priority"] = priority if args: ret["arguments"]["args"] = list(args) if kwargs: ret["arguments"]["kwargs"] = kwargs return ret def tasks_from_template( template: Dict, max_timestamp: datetime.datetime, num: Optional[int] = None, priority: Optional[str] = None, num_priorities: Dict[Optional[str], int] = {}, ) -> List[Dict]: """Build ``num`` tasks from template """ assert bool(num) != bool(num_priorities), "mutually exclusive" if not num_priorities: assert num is not None # to please mypy num_priorities = {None: num} tasks: List[Dict] = [] for (priority, num) in num_priorities.items(): for _ in range(num): i = len(tasks) tasks.append( _task_from_template( template, max_timestamp - datetime.timedelta(microseconds=i), priority, "argument-%03d" % i, **{"kwarg%03d" % i: "bogus-kwarg"}, ) ) return tasks def tasks_with_priority_from_template( template: Dict, max_timestamp: datetime.datetime, num: int, priority: str ) -> List[Dict]: """Build tasks with priority from template """ return [ _task_from_template( template, max_timestamp - datetime.timedelta(microseconds=i), priority, "argument-%03d" % i, **{"kwarg%03d" % i: "bogus-kwarg"}, ) for i in range(num) ] LISTERS = ( {"name": "github"}, {"name": "gitlab", "instance_name": "gitlab"}, {"name": "gitlab", "instance_name": "freedesktop"}, {"name": "npm"}, {"name": "pypi"}, ) diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py index 4872516..3645ecd 100644 --- a/swh/scheduler/tests/conftest.py +++ b/swh/scheduler/tests/conftest.py @@ -1,72 +1,72 @@ # Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import os from typing import Dict, List from unittest.mock import patch import pytest from swh.scheduler.model import ListedOrigin, Lister from swh.scheduler.tests.common import LISTERS # make sure we are not fooled by CELERY_ config environment vars for var in [x for x in os.environ.keys() if x.startswith("CELERY")]: os.environ.pop(var) # test_cli tests depends on a en/C locale, so ensure it os.environ["LC_ALL"] = "C.UTF-8" @pytest.fixture def stored_lister(swh_scheduler) -> Lister: """Store a lister in the scheduler and return its information""" return swh_scheduler.get_or_create_lister(**LISTERS[0]) @pytest.fixture def visit_types() -> List[str]: """Possible visit types in `ListedOrigin`s""" - return ["git", "svn"] + return ["test-git", "test-svn"] @pytest.fixture def listed_origins_by_type( stored_lister: Lister, visit_types: List[str] ) -> Dict[str, List[ListedOrigin]]: """A fixed list of `ListedOrigin`s, for each `visit_type`.""" count_per_type = 1000 assert stored_lister.id return { visit_type: [ ListedOrigin( lister_id=stored_lister.id, url=f"https://{visit_type}.example.com/{i:04d}", visit_type=visit_type, last_update=datetime( 2020, 6, 15, 16, 0, 0, j * count_per_type + i, tzinfo=timezone.utc ), ) for i in range(count_per_type) ] for j, visit_type in enumerate(visit_types) } @pytest.fixture def listed_origins(listed_origins_by_type) -> List[ListedOrigin]: """Return a (fixed) set of listed origins""" return sum(listed_origins_by_type.values(), []) @pytest.fixture def storage(swh_storage): """An instance of in-memory storage that gets injected into the CLI functions.""" with patch("swh.storage.get_storage") as get_storage_mock: get_storage_mock.return_value = swh_storage yield swh_storage diff --git a/swh/scheduler/tests/test_cli_origin.py b/swh/scheduler/tests/test_cli_origin.py index 27d42f2..edf1197 100644 --- a/swh/scheduler/tests/test_cli_origin.py +++ b/swh/scheduler/tests/test_cli_origin.py @@ -1,157 +1,159 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Tuple import pytest from swh.scheduler.cli.origin import format_origins from swh.scheduler.tests.common import TASK_TYPES from swh.scheduler.tests.test_cli import invoke as basic_invoke def invoke(scheduler, args: Tuple[str, ...] = (), catch_exceptions: bool = False): return basic_invoke( scheduler, args=["origin", *args], catch_exceptions=catch_exceptions ) def test_cli_origin(swh_scheduler): """Check that swh scheduler origin returns its help text""" result = invoke(swh_scheduler) assert "Commands:" in result.stdout def test_format_origins_basic(listed_origins): listed_origins = listed_origins[:100] basic_output = list(format_origins(listed_origins)) # 1 header line + all origins assert len(basic_output) == len(listed_origins) + 1 no_header_output = list(format_origins(listed_origins, with_header=False)) assert basic_output[1:] == no_header_output def test_format_origins_fields_unknown(listed_origins): listed_origins = listed_origins[:10] it = format_origins(listed_origins, fields=["unknown_field"]) with pytest.raises(ValueError, match="unknown_field"): next(it) def test_format_origins_fields(listed_origins): listed_origins = listed_origins[:10] fields = ["lister_id", "url", "visit_type"] output = list(format_origins(listed_origins, fields=fields)) assert output[0] == ",".join(fields) for i, origin in enumerate(listed_origins): assert output[i + 1] == f"{origin.lister_id},{origin.url},{origin.visit_type}" def test_grab_next(swh_scheduler, listed_origins_by_type): NUM_RESULTS = 10 # Strict inequality to check that grab_next_visits doesn't return more # results than requested + + # XXX: should test all of 'listed_origins_by_type' here... visit_type = next(iter(listed_origins_by_type)) assert len(listed_origins_by_type[visit_type]) > NUM_RESULTS for origins in listed_origins_by_type.values(): swh_scheduler.record_listed_origins(origins) result = invoke(swh_scheduler, args=("grab-next", visit_type, str(NUM_RESULTS))) assert result.exit_code == 0 out_lines = result.stdout.splitlines() assert len(out_lines) == NUM_RESULTS + 1 fields = out_lines[0].split(",") returned_origins = [dict(zip(fields, line.split(","))) for line in out_lines[1:]] # Check that we've received origins we had listed in the first place assert set(origin["url"] for origin in returned_origins) <= set( origin.url for origin in listed_origins_by_type[visit_type] ) def test_schedule_next(swh_scheduler, listed_origins_by_type): for task_type in TASK_TYPES.values(): swh_scheduler.create_task_type(task_type) NUM_RESULTS = 10 # Strict inequality to check that grab_next_visits doesn't return more # results than requested visit_type = next(iter(listed_origins_by_type)) assert len(listed_origins_by_type[visit_type]) > NUM_RESULTS for origins in listed_origins_by_type.values(): swh_scheduler.record_listed_origins(origins) result = invoke(swh_scheduler, args=("schedule-next", visit_type, str(NUM_RESULTS))) assert result.exit_code == 0 # pull all tasks out of the scheduler tasks = swh_scheduler.search_tasks() assert len(tasks) == NUM_RESULTS scheduled_tasks = { (task["type"], task["arguments"]["kwargs"]["url"]) for task in tasks } all_possible_tasks = { (f"load-{origin.visit_type}", origin.url) for origin in listed_origins_by_type[visit_type] } assert scheduled_tasks <= all_possible_tasks def test_send_to_celery( mocker, swh_scheduler, swh_scheduler_celery_app, listed_origins_by_type, ): for task_type in TASK_TYPES.values(): swh_scheduler.create_task_type(task_type) visit_type = next(iter(listed_origins_by_type)) for origins in listed_origins_by_type.values(): swh_scheduler.record_listed_origins(origins) get_queue_length = mocker.patch( "swh.scheduler.celery_backend.config.get_queue_length" ) get_queue_length.return_value = None send_task = mocker.patch.object(swh_scheduler_celery_app, "send_task") send_task.return_value = None result = invoke(swh_scheduler, args=("send-to-celery", visit_type)) assert result.exit_code == 0 scheduled_tasks = { (call[0][0], call[1]["kwargs"]["url"]) for call in send_task.call_args_list } expected_tasks = { (TASK_TYPES[origin.visit_type]["backend_name"], origin.url) for origin in listed_origins_by_type[visit_type] } assert expected_tasks == scheduled_tasks def test_update_metrics(swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) assert swh_scheduler.get_metrics() == [] result = invoke(swh_scheduler, args=("update-metrics",)) assert result.exit_code == 0 assert swh_scheduler.get_metrics() != [] diff --git a/swh/scheduler/tests/test_common.py b/swh/scheduler/tests/test_common.py index b5331dc..2439052 100644 --- a/swh/scheduler/tests/test_common.py +++ b/swh/scheduler/tests/test_common.py @@ -1,55 +1,55 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from .common import TEMPLATES, tasks_from_template def test_tasks_from_template_no_priority(): nb_tasks = 3 - template = TEMPLATES["git"] + template = TEMPLATES["test-git"] next_run = datetime.datetime.utcnow() tasks = tasks_from_template(template, next_run, nb_tasks) assert len(tasks) == nb_tasks for i, t in enumerate(tasks): assert t["type"] == template["type"] assert t["arguments"] is not None assert t.get("policy") is None # not defined in template assert len(t["arguments"]["args"]) == 1 assert len(t["arguments"]["kwargs"].keys()) == 1 assert t["next_run"] == next_run - datetime.timedelta(microseconds=i) assert t.get("priority") is None def test_tasks_from_template_priority(): - template = TEMPLATES["hg"] + template = TEMPLATES["test-hg"] num_priorities = { None: 3, "high": 5, "normal": 3, "low": 2, } next_run = datetime.datetime.utcnow() tasks = tasks_from_template(template, next_run, num_priorities=num_priorities,) assert len(tasks) == sum(num_priorities.values()) repartition_priority = {k: 0 for k in num_priorities} for i, t in enumerate(tasks): assert t["type"] == template["type"] assert t["arguments"] is not None assert t["policy"] == template["policy"] assert len(t["arguments"]["args"]) == 1 assert len(t["arguments"]["kwargs"].keys()) == 1 assert t["next_run"] == next_run - datetime.timedelta(microseconds=i) priority = t.get("priority") assert priority in num_priorities repartition_priority[priority] += 1 assert repartition_priority == num_priorities diff --git a/swh/scheduler/tests/test_recurrent_visits.py b/swh/scheduler/tests/test_recurrent_visits.py index efa8d67..4d83547 100644 --- a/swh/scheduler/tests/test_recurrent_visits.py +++ b/swh/scheduler/tests/test_recurrent_visits.py @@ -1,202 +1,211 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import timedelta import logging from queue import Queue -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest from swh.scheduler.celery_backend.recurrent_visits import ( POLICY_ADDITIONAL_PARAMETERS, VisitSchedulerThreads, grab_next_visits_policy_weights, send_visits_for_visit_type, spawn_visit_scheduler_thread, terminate_visit_scheduler_threads, visit_scheduler_thread, ) from .test_cli import invoke TEST_MAX_QUEUE = 10000 MODULE_NAME = "swh.scheduler.celery_backend.recurrent_visits" def _compute_backend_name(visit_type: str) -> str: "Build a dummy reproducible backend name" return f"swh.loader.{visit_type}.tasks" @pytest.fixture def swh_scheduler(swh_scheduler): """Override default fixture of the scheduler to install some more task types.""" - for visit_type in ["git", "hg", "svn"]: + for visit_type in ["test-git", "test-hg", "test-svn"]: task_type = f"load-{visit_type}" swh_scheduler.create_task_type( { "type": task_type, "max_queue_length": TEST_MAX_QUEUE, "description": "The {} testing task".format(task_type), "backend_name": _compute_backend_name(visit_type), "default_interval": timedelta(days=1), "min_interval": timedelta(hours=6), "max_interval": timedelta(days=12), } ) return swh_scheduler def test_cli_schedule_recurrent_unknown_visit_type(swh_scheduler): """When passed an unknown visit type, the recurrent visit scheduler should refuse to start.""" with pytest.raises(ValueError, match="Unknown"): invoke( swh_scheduler, False, - ["schedule-recurrent", "--visit-type", "unknown", "--visit-type", "git"], + [ + "schedule-recurrent", + "--visit-type", + "unknown", + "--visit-type", + "test-git", + ], ) def test_cli_schedule_recurrent_noop(swh_scheduler, mocker): """When passing no visit types, the recurrent visit scheduler should start.""" spawn_visit_scheduler_thread = mocker.patch( f"{MODULE_NAME}.spawn_visit_scheduler_thread" ) spawn_visit_scheduler_thread.side_effect = SystemExit # The actual scheduling threads won't spawn, they'll immediately terminate. This # only exercises the logic to pull task types out of the database result = invoke(swh_scheduler, False, ["schedule-recurrent"]) assert result.exit_code == 0, result.output def test_recurrent_visit_scheduling( swh_scheduler, caplog, listed_origins_by_type, mocker, ): """Scheduling known tasks is ok.""" caplog.set_level(logging.DEBUG, MODULE_NAME) nb_origins = 1000 mock_celery_app = MagicMock() mock_available_slots = mocker.patch(f"{MODULE_NAME}.get_available_slots") mock_available_slots.return_value = nb_origins # Slots available in queue # Make sure the scheduler is properly configured in terms of visit/task types all_task_types = { task_type_d["type"]: task_type_d for task_type_d in swh_scheduler.get_task_types() } visit_types = list(listed_origins_by_type.keys()) assert len(visit_types) > 0 task_types = [] origins = [] for visit_type, _origins in listed_origins_by_type.items(): origins.extend(swh_scheduler.record_listed_origins(_origins)) task_type_name = f"load-{visit_type}" assert task_type_name in all_task_types.keys() task_type = all_task_types[task_type_name] task_type["visit_type"] = visit_type # we'll limit the orchestrator to the origins' type we know task_types.append(task_type) - for visit_type in ["git", "svn"]: + for visit_type in ["test-git", "test-svn"]: task_type = f"load-{visit_type}" send_visits_for_visit_type( swh_scheduler, mock_celery_app, visit_type, all_task_types[task_type] ) assert mock_available_slots.called, "The available slots functions should be called" records = [record.message for record in caplog.records] # Mapping over the dict ratio/policies entries can change overall order so let's # check the set of records expected_records = set() for task_type in task_types: visit_type = task_type["visit_type"] queue_name = task_type["backend_name"] msg = ( f"{nb_origins} available slots for visit type {visit_type} " f"in queue {queue_name}" ) expected_records.add(msg) for expected_record in expected_records: assert expected_record in set(records) +@patch.dict( + POLICY_ADDITIONAL_PARAMETERS, {"test-git": POLICY_ADDITIONAL_PARAMETERS["git"]} +) @pytest.mark.parametrize( "visit_type, tablesamples", - [("hg", {}), ("git", POLICY_ADDITIONAL_PARAMETERS["git"])], + [("test-hg", {}), ("test-git", POLICY_ADDITIONAL_PARAMETERS["git"])], ) def test_recurrent_visit_additional_parameters( swh_scheduler, mocker, visit_type, tablesamples ): """Testing additional policy parameters""" mock_grab_next_visits = mocker.patch.object(swh_scheduler, "grab_next_visits") mock_grab_next_visits.return_value = [] grab_next_visits_policy_weights(swh_scheduler, visit_type, 10) for call in mock_grab_next_visits.call_args_list: assert call[1].get("tablesample") == tablesamples.get( call[1]["policy"], {} ).get("tablesample") @pytest.fixture def scheduler_config(swh_scheduler_config): return {"scheduler": {"cls": "local", **swh_scheduler_config}, "celery": {}} def test_visit_scheduler_thread_unknown_task( swh_scheduler, scheduler_config, ): """Starting a thread with unknown task type reports the error""" unknown_visit_type = "unknown" command_queue = Queue() exc_queue = Queue() visit_scheduler_thread( scheduler_config, unknown_visit_type, command_queue, exc_queue ) assert command_queue.empty() is True assert exc_queue.empty() is False assert len(exc_queue.queue) == 1 result = exc_queue.queue.pop() assert result[0] == unknown_visit_type assert isinstance(result[1], ValueError) def test_spawn_visit_scheduler_thread_noop(scheduler_config, visit_types, mocker): """Spawning and terminating threads runs smoothly""" threads: VisitSchedulerThreads = {} exc_queue = Queue() mock_build_app = mocker.patch("swh.scheduler.celery_backend.config.build_app") mock_build_app.return_value = MagicMock() assert len(threads) == 0 for visit_type in visit_types: spawn_visit_scheduler_thread(threads, exc_queue, scheduler_config, visit_type) # This actually only checks the spawning and terminating logic is sound assert len(threads) == len(visit_types) actual_threads = terminate_visit_scheduler_threads(threads) assert not len(actual_threads) assert mock_build_app.called diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py index 3407084..748ccc6 100644 --- a/swh/scheduler/tests/test_scheduler.py +++ b/swh/scheduler/tests/test_scheduler.py @@ -1,1505 +1,1505 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import copy import datetime from datetime import timedelta import inspect import random from typing import Any, Dict, List, Optional, Tuple import uuid import attr from psycopg2.extras import execute_values import pytest from swh.model.hashutil import hash_to_bytes from swh.scheduler.exc import SchedulerException, StaleData, UnknownPolicy from swh.scheduler.interface import ListedOriginPageToken, SchedulerInterface from swh.scheduler.model import ( LastVisitStatus, ListedOrigin, OriginVisitStats, SchedulerMetrics, ) from swh.scheduler.utils import utcnow from .common import ( LISTERS, TASK_TYPES, TEMPLATES, tasks_from_template, tasks_with_priority_from_template, ) ONEDAY = timedelta(days=1) NUM_PRIORITY_TASKS = {None: 100, "high": 60, "normal": 30, "low": 20} def subdict(d, keys=None, excl=()): if keys is None: keys = [k for k in d.keys()] return {k: d[k] for k in keys if k not in excl} def metrics_sort_key(m: SchedulerMetrics) -> Tuple[uuid.UUID, str]: return (m.lister_id, m.visit_type) def assert_metrics_equal(left, right): assert sorted(left, key=metrics_sort_key) == sorted(right, key=metrics_sort_key) class TestScheduler: def test_interface(self, swh_scheduler): """Checks all methods of SchedulerInterface are implemented by this backend, and that they have the same signature.""" # Create an instance of the protocol (which cannot be instantiated # directly, so this creates a subclass, then instantiates it) interface = type("_", (SchedulerInterface,), {})() assert "create_task_type" in dir(interface) missing_methods = [] for meth_name in dir(interface): if meth_name.startswith("_"): continue interface_meth = getattr(interface, meth_name) try: concrete_meth = getattr(swh_scheduler, meth_name) except AttributeError: if not getattr(interface_meth, "deprecated_endpoint", False): # The backend is missing a (non-deprecated) endpoint missing_methods.append(meth_name) continue expected_signature = inspect.signature(interface_meth) actual_signature = inspect.signature(concrete_meth) assert expected_signature == actual_signature, meth_name assert missing_methods == [] def test_add_task_type(self, swh_scheduler): - tt = TASK_TYPES["git"] + tt = TASK_TYPES["test-git"] swh_scheduler.create_task_type(tt) assert tt == swh_scheduler.get_task_type(tt["type"]) - tt2 = TASK_TYPES["hg"] + tt2 = TASK_TYPES["test-hg"] swh_scheduler.create_task_type(tt2) assert tt == swh_scheduler.get_task_type(tt["type"]) assert tt2 == swh_scheduler.get_task_type(tt2["type"]) def test_create_task_type_idempotence(self, swh_scheduler): - tt = TASK_TYPES["git"] + tt = TASK_TYPES["test-git"] swh_scheduler.create_task_type(tt) swh_scheduler.create_task_type(tt) assert tt == swh_scheduler.get_task_type(tt["type"]) def test_get_task_types(self, swh_scheduler): - tt, tt2 = TASK_TYPES["git"], TASK_TYPES["hg"] + tt, tt2 = TASK_TYPES["test-git"], TASK_TYPES["test-hg"] swh_scheduler.create_task_type(tt) swh_scheduler.create_task_type(tt2) actual_task_types = swh_scheduler.get_task_types() assert tt in actual_task_types assert tt2 in actual_task_types def test_create_tasks(self, swh_scheduler): self._create_task_types(swh_scheduler) num_git = 100 - tasks_1 = tasks_from_template(TEMPLATES["git"], utcnow(), num_git) + tasks_1 = tasks_from_template(TEMPLATES["test-git"], utcnow(), num_git) tasks_2 = tasks_from_template( - TEMPLATES["hg"], utcnow(), num_priorities=NUM_PRIORITY_TASKS + TEMPLATES["test-hg"], utcnow(), num_priorities=NUM_PRIORITY_TASKS ) tasks = tasks_1 + tasks_2 # tasks are returned only once with their ids ret1 = swh_scheduler.create_tasks(tasks + tasks) set_ret1 = set([t["id"] for t in ret1]) # creating the same set result in the same ids ret = swh_scheduler.create_tasks(tasks) set_ret = set([t["id"] for t in ret]) # Idempotence results assert set_ret == set_ret1 assert len(ret) == len(ret1) ids = set() actual_priorities = defaultdict(int) for task, orig_task in zip(ret, tasks): task = copy.deepcopy(task) - task_type = TASK_TYPES[orig_task["type"].split("-")[-1]] + task_type = TASK_TYPES[orig_task["type"].split("-", 1)[-1]] assert task["id"] not in ids assert task["status"] == "next_run_not_scheduled" assert task["current_interval"] == task_type["default_interval"] assert task["policy"] == orig_task.get("policy", "recurring") priority = task.get("priority") actual_priorities[priority] += 1 assert task["retries_left"] == (task_type["num_retries"] or 0) ids.add(task["id"]) del task["id"] del task["status"] del task["current_interval"] del task["retries_left"] if "policy" not in orig_task: del task["policy"] if "priority" not in orig_task: del task["priority"] assert task == orig_task expected_priorities = NUM_PRIORITY_TASKS.copy() expected_priorities[None] += num_git assert dict(actual_priorities) == expected_priorities def test_peek_ready_tasks_no_priority(self, swh_scheduler): self._create_task_types(swh_scheduler) t = utcnow() - task_type = TEMPLATES["git"]["type"] - tasks = tasks_from_template(TEMPLATES["git"], t, 100) + task_type = TEMPLATES["test-git"]["type"] + tasks = tasks_from_template(TEMPLATES["test-git"], t, 100) random.shuffle(tasks) swh_scheduler.create_tasks(tasks) ready_tasks = swh_scheduler.peek_ready_tasks(task_type) assert len(ready_tasks) == len(tasks) for i in range(len(ready_tasks) - 1): assert ready_tasks[i]["next_run"] <= ready_tasks[i + 1]["next_run"] # Only get the first few ready tasks limit = random.randrange(5, 5 + len(tasks) // 2) ready_tasks_limited = swh_scheduler.peek_ready_tasks(task_type, num_tasks=limit) assert len(ready_tasks_limited) == limit assert ready_tasks_limited == ready_tasks[:limit] # Limit by timestamp max_ts = tasks[limit - 1]["next_run"] ready_tasks_timestamped = swh_scheduler.peek_ready_tasks( task_type, timestamp=max_ts ) for ready_task in ready_tasks_timestamped: assert ready_task["next_run"] <= max_ts # Make sure we get proper behavior for the first ready tasks assert ready_tasks[: len(ready_tasks_timestamped)] == ready_tasks_timestamped # Limit by both ready_tasks_both = swh_scheduler.peek_ready_tasks( task_type, timestamp=max_ts, num_tasks=limit // 3 ) assert len(ready_tasks_both) <= limit // 3 for ready_task in ready_tasks_both: assert ready_task["next_run"] <= max_ts assert ready_task in ready_tasks[: limit // 3] def test_peek_ready_tasks_returns_only_no_priority_tasks(self, swh_scheduler): """Peek ready tasks only return standard tasks (no priority)""" self._create_task_types(swh_scheduler) t = utcnow() - task_type = TEMPLATES["git"]["type"] + task_type = TEMPLATES["test-git"]["type"] # Create tasks with and without priorities tasks = tasks_from_template( - TEMPLATES["git"], t, num_priorities=NUM_PRIORITY_TASKS, + TEMPLATES["test-git"], t, num_priorities=NUM_PRIORITY_TASKS, ) count_priority = 0 for task in tasks: count_priority += 0 if task.get("priority") is None else 1 assert count_priority > 0, "Some created tasks should have some priority" random.shuffle(tasks) swh_scheduler.create_tasks(tasks) # take all available no priority tasks ready_tasks = swh_scheduler.peek_ready_tasks(task_type) assert len(ready_tasks) == len(tasks) - count_priority # No read task should have any priority for task in ready_tasks: assert task.get("priority") is None def test_grab_ready_tasks(self, swh_scheduler): self._create_task_types(swh_scheduler) t = utcnow() - task_type = TEMPLATES["git"]["type"] + task_type = TEMPLATES["test-git"]["type"] # Create tasks with and without priorities tasks = tasks_from_template( - TEMPLATES["git"], t, num_priorities=NUM_PRIORITY_TASKS + TEMPLATES["test-git"], t, num_priorities=NUM_PRIORITY_TASKS ) random.shuffle(tasks) swh_scheduler.create_tasks(tasks) first_ready_tasks = swh_scheduler.peek_ready_tasks(task_type, num_tasks=50) grabbed_tasks = swh_scheduler.grab_ready_tasks(task_type, num_tasks=50) first_ready_tasks.sort(key=lambda task: task["arguments"]["args"][0]) grabbed_tasks.sort(key=lambda task: task["arguments"]["args"][0]) for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks): assert peeked["status"] == "next_run_not_scheduled" del peeked["status"] assert grabbed["status"] == "next_run_scheduled" del grabbed["status"] assert peeked == grabbed priority = grabbed["priority"] assert priority == peeked["priority"] assert priority is None def test_grab_ready_priority_tasks(self, swh_scheduler): """check the grab and peek priority tasks endpoint behave as expected""" self._create_task_types(swh_scheduler) t = utcnow() - task_type = TEMPLATES["git"]["type"] + task_type = TEMPLATES["test-git"]["type"] num_tasks = 100 # Create tasks with and without priorities tasks0 = tasks_with_priority_from_template( - TEMPLATES["git"], t, num_tasks, "high", + TEMPLATES["test-git"], t, num_tasks, "high", ) tasks1 = tasks_with_priority_from_template( - TEMPLATES["hg"], t, num_tasks, "low", + TEMPLATES["test-hg"], t, num_tasks, "low", ) tasks2 = tasks_with_priority_from_template( - TEMPLATES["hg"], t, num_tasks, "normal", + TEMPLATES["test-hg"], t, num_tasks, "normal", ) tasks = tasks0 + tasks1 + tasks2 random.shuffle(tasks) swh_scheduler.create_tasks(tasks) ready_tasks = swh_scheduler.peek_ready_priority_tasks(task_type, num_tasks=50) grabbed_tasks = swh_scheduler.grab_ready_priority_tasks(task_type, num_tasks=50) ready_tasks.sort(key=lambda task: task["arguments"]["args"][0]) grabbed_tasks.sort(key=lambda task: task["arguments"]["args"][0]) for peeked, grabbed in zip(ready_tasks, grabbed_tasks): assert peeked["status"] == "next_run_not_scheduled" del peeked["status"] assert grabbed["status"] == "next_run_scheduled" del grabbed["status"] assert peeked == grabbed assert peeked["priority"] == grabbed["priority"] assert peeked["priority"] is not None def test_get_tasks(self, swh_scheduler): self._create_task_types(swh_scheduler) t = utcnow() - tasks = tasks_from_template(TEMPLATES["git"], t, 100) + tasks = tasks_from_template(TEMPLATES["test-git"], t, 100) tasks = swh_scheduler.create_tasks(tasks) random.shuffle(tasks) while len(tasks) > 1: length = random.randrange(1, len(tasks)) cur_tasks = sorted(tasks[:length], key=lambda x: x["id"]) tasks[:length] = [] ret = swh_scheduler.get_tasks(task["id"] for task in cur_tasks) # result is not guaranteed to be sorted ret.sort(key=lambda x: x["id"]) assert ret == cur_tasks def test_search_tasks(self, swh_scheduler): def make_real_dicts(lst): """RealDictRow is not a real dict.""" return [dict(d.items()) for d in lst] self._create_task_types(swh_scheduler) t = utcnow() - tasks = tasks_from_template(TEMPLATES["git"], t, 100) + tasks = tasks_from_template(TEMPLATES["test-git"], t, 100) tasks = swh_scheduler.create_tasks(tasks) assert make_real_dicts(swh_scheduler.search_tasks()) == make_real_dicts(tasks) def assert_filtered_task_ok( self, task: Dict[str, Any], after: datetime.datetime, before: datetime.datetime ) -> None: """Ensure filtered tasks have the right expected properties (within the range, recurring disabled, etc..) """ started = task["started"] date = started if started is not None else task["scheduled"] assert after <= date and date <= before if task["task_policy"] == "oneshot": assert task["task_status"] in ["completed", "disabled"] if task["task_policy"] == "recurring": assert task["task_status"] in ["disabled"] def test_filter_task_to_archive(self, swh_scheduler): """Filtering only list disabled recurring or completed oneshot tasks """ self._create_task_types(swh_scheduler) _time = utcnow() - recurring = tasks_from_template(TEMPLATES["git"], _time, 12) - oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) + recurring = tasks_from_template(TEMPLATES["test-git"], _time, 12) + oneshots = tasks_from_template(TEMPLATES["test-hg"], _time, 12) total_tasks = len(recurring) + len(oneshots) # simulate scheduling tasks pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) # we simulate the task are being done _tasks = [] for task in backend_tasks: t = swh_scheduler.end_task_run(task["backend_id"], status="eventful") _tasks.append(t) # Randomly update task's status per policy status_per_policy = {"recurring": 0, "oneshot": 0} status_choice = { # policy: [tuple (1-for-filtering, 'associated-status')] "recurring": [ (1, "disabled"), (0, "completed"), (0, "next_run_not_scheduled"), ], "oneshot": [ (0, "next_run_not_scheduled"), (1, "disabled"), (1, "completed"), ], } tasks_to_update = defaultdict(list) _task_ids = defaultdict(list) # randomize 'disabling' recurring task or 'complete' oneshot task for task in pending_tasks: policy = task["policy"] _task_ids[policy].append(task["id"]) status = random.choice(status_choice[policy]) if status[0] != 1: continue # elected for filtering status_per_policy[policy] += status[0] tasks_to_update[policy].append(task["id"]) swh_scheduler.disable_tasks(tasks_to_update["recurring"]) # hack: change the status to something else than completed/disabled swh_scheduler.set_status_tasks( _task_ids["oneshot"], status="next_run_not_scheduled" ) # complete the tasks to update swh_scheduler.set_status_tasks(tasks_to_update["oneshot"], status="completed") total_tasks_filtered = ( status_per_policy["recurring"] + status_per_policy["oneshot"] ) # no pagination scenario # retrieve tasks to archive after = _time - ONEDAY after_ts = after.strftime("%Y-%m-%d") before = utcnow() + ONEDAY before_ts = before.strftime("%Y-%m-%d") tasks_result = swh_scheduler.filter_task_to_archive( after_ts=after_ts, before_ts=before_ts, limit=total_tasks ) tasks_to_archive = tasks_result["tasks"] assert len(tasks_to_archive) == total_tasks_filtered assert tasks_result.get("next_page_token") is None actual_filtered_per_status = {"recurring": 0, "oneshot": 0} for task in tasks_to_archive: self.assert_filtered_task_ok(task, after, before) actual_filtered_per_status[task["task_policy"]] += 1 assert actual_filtered_per_status == status_per_policy # pagination scenario nb_tasks = 3 tasks_result = swh_scheduler.filter_task_to_archive( after_ts=after_ts, before_ts=before_ts, limit=nb_tasks ) tasks_to_archive2 = tasks_result["tasks"] assert len(tasks_to_archive2) == nb_tasks next_page_token = tasks_result["next_page_token"] assert next_page_token is not None all_tasks = tasks_to_archive2 while next_page_token is not None: # Retrieve paginated results tasks_result = swh_scheduler.filter_task_to_archive( after_ts=after_ts, before_ts=before_ts, limit=nb_tasks, page_token=next_page_token, ) tasks_to_archive2 = tasks_result["tasks"] assert len(tasks_to_archive2) <= nb_tasks all_tasks.extend(tasks_to_archive2) next_page_token = tasks_result.get("next_page_token") actual_filtered_per_status = {"recurring": 0, "oneshot": 0} for task in all_tasks: self.assert_filtered_task_ok(task, after, before) actual_filtered_per_status[task["task_policy"]] += 1 assert actual_filtered_per_status == status_per_policy def test_delete_archived_tasks(self, swh_scheduler): self._create_task_types(swh_scheduler) _time = utcnow() - recurring = tasks_from_template(TEMPLATES["git"], _time, 12) - oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) + recurring = tasks_from_template(TEMPLATES["test-git"], _time, 12) + oneshots = tasks_from_template(TEMPLATES["test-hg"], _time, 12) total_tasks = len(recurring) + len(oneshots) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) _tasks = [] percent = random.randint(0, 100) # random election removal boundary for task in backend_tasks: t = swh_scheduler.end_task_run(task["backend_id"], status="eventful") c = random.randint(0, 100) if c <= percent: _tasks.append({"task_id": t["task"], "task_run_id": t["id"]}) swh_scheduler.delete_archived_tasks(_tasks) all_tasks = [task["id"] for task in swh_scheduler.search_tasks()] tasks_count = len(all_tasks) tasks_run_count = len(swh_scheduler.get_task_runs(all_tasks)) assert tasks_count == total_tasks - len(_tasks) assert tasks_run_count == total_tasks - len(_tasks) def test_get_task_runs_no_task(self, swh_scheduler): """No task exist in the scheduler's db, get_task_runs() should always return an empty list. """ assert not swh_scheduler.get_task_runs(task_ids=()) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3)) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10) def test_get_task_runs_no_task_executed(self, swh_scheduler): """No task has been executed yet, get_task_runs() should always return an empty list. """ self._create_task_types(swh_scheduler) _time = utcnow() - recurring = tasks_from_template(TEMPLATES["git"], _time, 12) - oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) + recurring = tasks_from_template(TEMPLATES["test-git"], _time, 12) + oneshots = tasks_from_template(TEMPLATES["test-hg"], _time, 12) swh_scheduler.create_tasks(recurring + oneshots) assert not swh_scheduler.get_task_runs(task_ids=()) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3)) assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10) def test_get_task_runs_with_scheduled(self, swh_scheduler): """Some tasks have been scheduled but not executed yet, get_task_runs() should not return an empty list. limit should behave as expected. """ self._create_task_types(swh_scheduler) _time = utcnow() - recurring = tasks_from_template(TEMPLATES["git"], _time, 12) - oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) + recurring = tasks_from_template(TEMPLATES["test-git"], _time, 12) + oneshots = tasks_from_template(TEMPLATES["test-hg"], _time, 12) total_tasks = len(recurring) + len(oneshots) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) assert not swh_scheduler.get_task_runs(task_ids=[total_tasks + 1]) btask = backend_tasks[0] runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]]) assert len(runs) == 1 run = runs[0] assert subdict(run, excl=("id",)) == { "task": btask["task"], "backend_id": btask["backend_id"], "scheduled": btask["scheduled"], "started": None, "ended": None, "metadata": None, "status": "scheduled", } runs = swh_scheduler.get_task_runs( task_ids=[bt["task"] for bt in backend_tasks], limit=2 ) assert len(runs) == 2 runs = swh_scheduler.get_task_runs( task_ids=[bt["task"] for bt in backend_tasks] ) assert len(runs) == total_tasks keys = ("task", "backend_id", "scheduled") assert ( sorted([subdict(x, keys) for x in runs], key=lambda x: x["task"]) == backend_tasks ) def test_get_task_runs_with_executed(self, swh_scheduler): """Some tasks have been executed, get_task_runs() should not return an empty list. limit should behave as expected. """ self._create_task_types(swh_scheduler) _time = utcnow() - recurring = tasks_from_template(TEMPLATES["git"], _time, 12) - oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) + recurring = tasks_from_template(TEMPLATES["test-git"], _time, 12) + oneshots = tasks_from_template(TEMPLATES["test-hg"], _time, 12) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) btask = backend_tasks[0] ts = utcnow() swh_scheduler.start_task_run( btask["backend_id"], metadata={"something": "stupid"}, timestamp=ts ) runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]]) assert len(runs) == 1 assert subdict(runs[0], excl=("id")) == { "task": btask["task"], "backend_id": btask["backend_id"], "scheduled": btask["scheduled"], "started": ts, "ended": None, "metadata": {"something": "stupid"}, "status": "started", } ts2 = utcnow() swh_scheduler.end_task_run( btask["backend_id"], metadata={"other": "stuff"}, timestamp=ts2, status="eventful", ) runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]]) assert len(runs) == 1 assert subdict(runs[0], excl=("id")) == { "task": btask["task"], "backend_id": btask["backend_id"], "scheduled": btask["scheduled"], "started": ts, "ended": ts2, "metadata": {"something": "stupid", "other": "stuff"}, "status": "eventful", } def test_get_or_create_lister(self, swh_scheduler): db_listers = [] for lister_args in LISTERS: db_listers.append(swh_scheduler.get_or_create_lister(**lister_args)) for lister, lister_args in zip(db_listers, LISTERS): assert lister.name == lister_args["name"] assert lister.instance_name == lister_args.get("instance_name", "") lister_get_again = swh_scheduler.get_or_create_lister( lister.name, lister.instance_name ) assert lister == lister_get_again def test_get_lister(self, swh_scheduler): for lister_args in LISTERS: assert swh_scheduler.get_lister(**lister_args) is None db_listers = [] for lister_args in LISTERS: db_listers.append(swh_scheduler.get_or_create_lister(**lister_args)) for lister, lister_args in zip(db_listers, LISTERS): lister_get_again = swh_scheduler.get_lister( lister.name, lister.instance_name ) assert lister == lister_get_again def test_get_listers(self, swh_scheduler): assert swh_scheduler.get_listers() == [] db_listers = [] for lister_args in LISTERS: db_listers.append(swh_scheduler.get_or_create_lister(**lister_args)) assert swh_scheduler.get_listers() == db_listers def test_update_lister(self, swh_scheduler, stored_lister): lister = attr.evolve(stored_lister, current_state={"updated": "now"}) updated_lister = swh_scheduler.update_lister(lister) assert updated_lister.updated > lister.updated assert updated_lister == attr.evolve(lister, updated=updated_lister.updated) def test_update_lister_stale(self, swh_scheduler, stored_lister): swh_scheduler.update_lister(stored_lister) with pytest.raises(StaleData) as exc: swh_scheduler.update_lister(stored_lister) assert "state not updated" in exc.value.args[0] def test_record_listed_origins(self, swh_scheduler, listed_origins): ret = swh_scheduler.record_listed_origins(listed_origins) assert set(returned.url for returned in ret) == set( origin.url for origin in listed_origins ) assert all(origin.first_seen == origin.last_seen for origin in ret) def test_record_listed_origins_with_duplicate(self, swh_scheduler, listed_origins): # the duplicates must be in the same page to raise the "on conflict error" listed_origins.insert(0, listed_origins[0]) ret = swh_scheduler.record_listed_origins(listed_origins) # without the duplicate assert len(ret) == len(listed_origins) - 1 def test_record_listed_origins_upsert(self, swh_scheduler, listed_origins): # First, insert `cutoff` origins cutoff = 100 assert cutoff < len(listed_origins) ret = swh_scheduler.record_listed_origins(listed_origins[:cutoff]) assert len(ret) == cutoff # Then, insert all origins, including the `cutoff` first. ret = swh_scheduler.record_listed_origins(listed_origins) assert len(ret) == len(listed_origins) # Two different "first seen" values assert len(set(origin.first_seen for origin in ret)) == 2 # But a single "last seen" value assert len(set(origin.last_seen for origin in ret)) == 1 def test_get_listed_origins_exact(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) for i, origin in enumerate(listed_origins): ret = swh_scheduler.get_listed_origins( lister_id=origin.lister_id, url=origin.url ) assert ret.next_page_token is None assert len(ret.results) == 1 assert ret.results[0].lister_id == origin.lister_id assert ret.results[0].url == origin.url @pytest.mark.parametrize("num_origins,limit", [(20, 6), (5, 42), (20, 20)]) def test_get_listed_origins_limit( self, swh_scheduler, listed_origins, num_origins, limit ) -> None: added_origins = sorted( listed_origins[:num_origins], key=lambda o: (o.lister_id, o.url) ) swh_scheduler.record_listed_origins(added_origins) returned_origins: List[ListedOrigin] = [] call_count = 0 next_page_token: Optional[ListedOriginPageToken] = None while True: call_count += 1 ret = swh_scheduler.get_listed_origins( lister_id=listed_origins[0].lister_id, limit=limit, page_token=next_page_token, ) returned_origins.extend(ret.results) next_page_token = ret.next_page_token if next_page_token is None: break assert call_count == (num_origins // limit) + 1 assert len(returned_origins) == num_origins assert [(origin.lister_id, origin.url) for origin in returned_origins] == [ (origin.lister_id, origin.url) for origin in added_origins ] def test_get_listed_origins_all(self, swh_scheduler, listed_origins) -> None: swh_scheduler.record_listed_origins(listed_origins) ret = swh_scheduler.get_listed_origins(limit=len(listed_origins) + 1) assert ret.next_page_token is None assert len(ret.results) == len(listed_origins) def _grab_next_visits_setup(self, swh_scheduler, listed_origins_by_type): """Basic origins setup for scheduling policy tests""" visit_type = next(iter(listed_origins_by_type)) origins = listed_origins_by_type[visit_type][:100] assert len(origins) > 0 recorded_origins = swh_scheduler.record_listed_origins(origins) return visit_type, recorded_origins def _check_grab_next_visit_basic( self, swh_scheduler, visit_type, policy, expected, **kwargs ): """Calls grab_next_visits with the passed policy, and check that: - all the origins returned are the expected ones (in the same order) - no extra origins are returned - the last_scheduled field has been set properly. Pass the extra keyword arguments to the calls to grab_next_visits. Returns a timestamp greater than all `last_scheduled` values for the grabbed visits. """ assert len(expected) != 0 before = utcnow() ret = swh_scheduler.grab_next_visits( visit_type=visit_type, # Request one more than expected to check that no extra origin is returned count=len(expected) + 1, policy=policy, **kwargs, ) after = utcnow() assert ret == expected visit_stats_list = swh_scheduler.origin_visit_stats_get( [(origin.url, origin.visit_type) for origin in expected] ) assert len(visit_stats_list) == len(expected) for visit_stats in visit_stats_list: # Check that last_scheduled got updated assert before <= visit_stats.last_scheduled <= after # They should not be scheduled again ret = swh_scheduler.grab_next_visits( visit_type=visit_type, count=len(expected) + 1, policy=policy, **kwargs ) assert ret == [], "grab_next_visits returned already-scheduled origins" return after def _check_grab_next_visit( self, swh_scheduler, visit_type, policy, expected, **kwargs ): """Run the same check as _check_grab_next_visit_basic, but also checks the origin visits have been marked as scheduled, and are only re-scheduled a week later """ after = self._check_grab_next_visit_basic( swh_scheduler, visit_type, policy, expected, **kwargs ) # But a week, later, they should ret = swh_scheduler.grab_next_visits( visit_type=visit_type, count=len(expected) + 1, policy=policy, timestamp=after + timedelta(days=7), ) # We need to sort them because their 'last_scheduled' field is updated to # exactly the same value, so the order is not deterministic assert sorted(ret) == sorted( expected ), "grab_next_visits didn't reschedule visits after a week" def _prepare_oldest_scheduled_first_origins( self, swh_scheduler, listed_origins_by_type ): visit_type, origins = self._grab_next_visits_setup( swh_scheduler, listed_origins_by_type ) # Give all origins but one a last_scheduled date base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) visit_stats = [ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, last_snapshot=None, last_successful=None, last_visit=None, last_scheduled=base_date - timedelta(seconds=i), ) for i, origin in enumerate(origins[1:]) ] swh_scheduler.origin_visit_stats_upsert(visit_stats) # We expect to retrieve the origin with a NULL last_scheduled # as well as those with the oldest values (i.e. the last ones), in order. expected = [origins[0]] + origins[1:][::-1] return visit_type, origins, expected def test_grab_next_visits_oldest_scheduled_first( self, swh_scheduler, listed_origins_by_type, ): visit_type, origins, expected = self._prepare_oldest_scheduled_first_origins( swh_scheduler, listed_origins_by_type ) self._check_grab_next_visit( swh_scheduler, visit_type=visit_type, policy="oldest_scheduled_first", expected=expected, ) @pytest.mark.parametrize("which_cooldown", ("scheduled", "failed", "not_found")) @pytest.mark.parametrize("cooldown", (7, 15)) def test_grab_next_visits_cooldowns( self, swh_scheduler, listed_origins_by_type, which_cooldown, cooldown, ): visit_type, origins, expected = self._prepare_oldest_scheduled_first_origins( swh_scheduler, listed_origins_by_type ) after = self._check_grab_next_visit_basic( swh_scheduler, visit_type=visit_type, policy="oldest_scheduled_first", expected=expected, ) # Mark all the visits as scheduled, failed or notfound on the `after` timestamp ovs_args = { "last_visit": None, "last_visit_status": None, "last_scheduled": None, } if which_cooldown == "scheduled": ovs_args["last_scheduled"] = after else: ovs_args["last_visit"] = after ovs_args["last_visit_status"] = LastVisitStatus(which_cooldown) visit_stats = [ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, last_snapshot=None, last_successful=None, **ovs_args, ) for i, origin in enumerate(origins) ] swh_scheduler.origin_visit_stats_upsert(visit_stats) cooldown_td = timedelta(days=cooldown) cooldown_args = { "scheduled_cooldown": None, "failed_cooldown": None, "not_found_cooldown": None, } cooldown_args[f"{which_cooldown}_cooldown"] = cooldown_td ret = swh_scheduler.grab_next_visits( visit_type=visit_type, count=len(expected) + 1, policy="oldest_scheduled_first", timestamp=after + cooldown_td - timedelta(seconds=1), **cooldown_args, ) assert ret == [], f"{which_cooldown}_cooldown ignored" ret = swh_scheduler.grab_next_visits( visit_type=visit_type, count=len(expected) + 1, policy="oldest_scheduled_first", timestamp=after + cooldown_td + timedelta(seconds=1), **cooldown_args, ) assert sorted(ret) == sorted( expected ), "grab_next_visits didn't reschedule visits after the configured cooldown" def test_grab_next_visits_tablesample( self, swh_scheduler, listed_origins_by_type, ): visit_type, origins, expected = self._prepare_oldest_scheduled_first_origins( swh_scheduler, listed_origins_by_type ) ret = swh_scheduler.grab_next_visits( visit_type=visit_type, policy="oldest_scheduled_first", tablesample=50, count=len(expected), ) # Just a smoke test, not obvious how to test this more reliably assert ret is not None def test_grab_next_visits_never_visited_oldest_update_first( self, swh_scheduler, listed_origins_by_type, ): visit_type, origins = self._grab_next_visits_setup( swh_scheduler, listed_origins_by_type ) # Update known origins with a `last_update` field that we control base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) updated_origins = [ attr.evolve(origin, last_update=base_date - timedelta(seconds=i)) for i, origin in enumerate(origins) ] updated_origins = swh_scheduler.record_listed_origins(updated_origins) # We expect to retrieve origins with the oldest update date, that is # origins at the end of our updated_origins list. expected_origins = sorted(updated_origins, key=lambda o: o.last_update) self._check_grab_next_visit( swh_scheduler, visit_type=visit_type, policy="never_visited_oldest_update_first", expected=expected_origins, ) def test_grab_next_visits_already_visited_order_by_lag( self, swh_scheduler, listed_origins_by_type, ): visit_type, origins = self._grab_next_visits_setup( swh_scheduler, listed_origins_by_type ) # Update known origins with a `last_update` field that we control base_date = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) updated_origins = [ attr.evolve(origin, last_update=base_date - timedelta(seconds=i)) for i, origin in enumerate(origins) ] updated_origins = swh_scheduler.record_listed_origins(updated_origins) # Update the visit stats with a known visit at a controlled date for # half the origins. Pick the date in the middle of the # updated_origins' `last_update` range visit_date = updated_origins[len(updated_origins) // 2].last_update visited_origins = updated_origins[::2] visit_stats = [ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), last_successful=visit_date, last_visit=visit_date, ) for origin in visited_origins ] swh_scheduler.origin_visit_stats_upsert(visit_stats) # We expect to retrieve visited origins with the largest lag, but only # those which haven't been visited since their last update expected_origins = sorted( [origin for origin in visited_origins if origin.last_update > visit_date], key=lambda o: visit_date - o.last_update, ) self._check_grab_next_visit( swh_scheduler, visit_type=visit_type, policy="already_visited_order_by_lag", expected=expected_origins, ) def test_grab_next_visits_underflow(self, swh_scheduler, listed_origins_by_type): """Check that grab_next_visits works when there not enough origins in the database""" visit_type = next(iter(listed_origins_by_type)) # Only add 5 origins to the database origins = listed_origins_by_type[visit_type][:5] assert origins swh_scheduler.record_listed_origins(origins) ret = swh_scheduler.grab_next_visits( visit_type, len(origins) + 2, policy="oldest_scheduled_first" ) assert len(ret) == 5 def test_grab_next_visits_no_last_update_nor_visit_stats( self, swh_scheduler, listed_origins_by_type ): """grab_next_visits should retrieve tasks without last update (nor visit stats) """ visit_type = next(iter(listed_origins_by_type)) origins = [] for origin in listed_origins_by_type[visit_type]: origins.append( attr.evolve(origin, last_update=None) ) # void the last update so we are in the relevant context assert len(origins) > 0 swh_scheduler.record_listed_origins(origins) # Initially, we have no global queue position current_state = swh_scheduler.visit_scheduler_queue_position_get() assert current_state == {} # nor any visit statuses actual_visit_stats = swh_scheduler.origin_visit_stats_get( (o.url, o.visit_type) for o in origins ) assert len(actual_visit_stats) == 0 # Grab some new visits next_visits = swh_scheduler.grab_next_visits( visit_type, count=len(origins), policy="origins_without_last_update", ) # we do have the one without any last update assert len(next_visits) == len(origins) # Now the global state got updated current_state = swh_scheduler.visit_scheduler_queue_position_get() assert current_state[visit_type] is not None actual_visit_stats = swh_scheduler.origin_visit_stats_get( (o.url, o.visit_type) for o in next_visits ) # Visit stats got algo created assert len(actual_visit_stats) == len(origins) def test_grab_next_visits_no_last_update_with_visit_stats( self, swh_scheduler, listed_origins_by_type ): """grab_next_visits should retrieve tasks without last update""" visit_type = next(iter(listed_origins_by_type)) origins = [] for origin in listed_origins_by_type[visit_type]: origins.append( attr.evolve(origin, last_update=None) ) # void the last update so we are in the relevant context assert len(origins) > 0 swh_scheduler.record_listed_origins(origins) # Initially, we have no global queue position current_state = swh_scheduler.visit_scheduler_queue_position_get() assert current_state == {} # Simulate some of those origins have associated visit stats (some with an # existing queue position and some without any) visit_stats = ( [ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, last_successful=utcnow(), last_visit=utcnow(), next_visit_queue_position=int(24 * 3600 * random.uniform(-10, 1)), ) for origin in origins[:100] ] + [ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, last_successful=utcnow(), last_visit=utcnow(), next_visit_queue_position=int( 24 * 3600 * random.uniform(1, 10) ), # definitely > 0 ) for origin in origins[100:150] ] + [ OriginVisitStats( url=origin.url, visit_type=origin.visit_type, last_successful=utcnow(), last_visit=utcnow(), ) for origin in origins[150:] ] ) swh_scheduler.origin_visit_stats_upsert(visit_stats) # Grab next visits actual_visits = swh_scheduler.grab_next_visits( visit_type, count=len(origins), policy="origins_without_last_update", ) assert len(actual_visits) == len(origins) actual_visit_stats = swh_scheduler.origin_visit_stats_get( (o.url, o.visit_type) for o in actual_visits ) assert len(actual_visit_stats) == len(origins) current_state = swh_scheduler.visit_scheduler_queue_position_get() assert current_state == { visit_type: max( s.next_visit_queue_position for s in actual_visit_stats if s.next_visit_queue_position is not None ) } def test_grab_next_visits_unknown_policy(self, swh_scheduler): unknown_policy = "non_existing_policy" NUM_RESULTS = 5 with pytest.raises(UnknownPolicy, match=unknown_policy): swh_scheduler.grab_next_visits("type", NUM_RESULTS, policy=unknown_policy) def test_grab_next_visit_duplicates(self, swh_scheduler, listed_origins): """Checks grab_next_visits does not crash when there are rows with duplicated (origin_url, visit_type) in the database """ lister2 = swh_scheduler.get_or_create_lister(**LISTERS[1]) assert lister2.id != listed_origins[0].lister_id # Create two origins with the same url and visit_type, but different listers # (and also differing value for last_update so they are returned in # deterministic order) origin1 = attr.evolve( listed_origins[0], first_seen=utcnow(), last_seen=utcnow() ) origin2 = attr.evolve( origin1, lister_id=lister2.id, last_update=origin1.last_update + datetime.timedelta(seconds=10), ) origins = [origin1, origin2] recorded_origins = swh_scheduler.record_listed_origins(origins) expected_origins = sorted(recorded_origins, key=lambda o: o.last_update) self._check_grab_next_visit( swh_scheduler, visit_type=origin1.visit_type, policy="never_visited_oldest_update_first", expected=expected_origins, ) def _create_task_types(self, scheduler): for tt in TASK_TYPES.values(): scheduler.create_task_type(tt) def test_origin_visit_stats_get_empty(self, swh_scheduler) -> None: assert swh_scheduler.origin_visit_stats_get([]) == [] def test_origin_visit_stats_get_pagination(self, swh_scheduler) -> None: page_size = inspect.signature(execute_values).parameters["page_size"].default visit_stats = [ OriginVisitStats( url=f"https://example.com/origin-{i:03d}", visit_type="git", last_successful=utcnow(), last_visit=utcnow(), ) for i in range( page_size + 1 ) # Ensure overflow of the psycopg2.extras.execute_values page_size ] swh_scheduler.origin_visit_stats_upsert(visit_stats) assert set( swh_scheduler.origin_visit_stats_get( [(ovs.url, ovs.visit_type) for ovs in visit_stats] ) ) == set(visit_stats) def test_origin_visit_stats_upsert(self, swh_scheduler) -> None: eventful_date = utcnow() url = "https://github.com/test" visit_stats = OriginVisitStats( url=url, visit_type="git", last_successful=eventful_date, last_visit=eventful_date, ) swh_scheduler.origin_visit_stats_upsert([visit_stats]) swh_scheduler.origin_visit_stats_upsert([visit_stats]) assert swh_scheduler.origin_visit_stats_get([(url, "git")]) == [visit_stats] assert swh_scheduler.origin_visit_stats_get([(url, "svn")]) == [] new_visit_date = utcnow() visit_stats = OriginVisitStats( url=url, visit_type="git", last_successful=None, last_visit=new_visit_date, ) swh_scheduler.origin_visit_stats_upsert([visit_stats]) uneventful_visits = swh_scheduler.origin_visit_stats_get([(url, "git")]) expected_visit_stats = OriginVisitStats( url=url, visit_type="git", last_successful=eventful_date, last_visit=new_visit_date, ) assert uneventful_visits == [expected_visit_stats] def test_origin_visit_stats_upsert_with_snapshot(self, swh_scheduler) -> None: eventful_date = utcnow() url = "https://github.com/666/test" visit_stats = OriginVisitStats( url=url, visit_type="git", last_successful=eventful_date, last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), ) swh_scheduler.origin_visit_stats_upsert([visit_stats]) assert swh_scheduler.origin_visit_stats_get([(url, "git")]) == [visit_stats] assert swh_scheduler.origin_visit_stats_get([(url, "svn")]) == [] def test_origin_visit_stats_upsert_batch(self, swh_scheduler) -> None: """Batch upsert is ok""" visit_stats = [ OriginVisitStats( url="foo", visit_type="git", last_successful=utcnow(), last_snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), ), OriginVisitStats( url="bar", visit_type="git", last_visit=utcnow(), last_snapshot=hash_to_bytes("fffcc0710eb6cf9efd5b920a8453e1e07157bfff"), ), ] swh_scheduler.origin_visit_stats_upsert(visit_stats) for visit_stat in swh_scheduler.origin_visit_stats_get( [(vs.url, vs.visit_type) for vs in visit_stats] ): assert visit_stat is not None def test_origin_visit_stats_upsert_cardinality_failing(self, swh_scheduler) -> None: """Batch upsert does not support altering multiple times the same origin-visit-status """ with pytest.raises(SchedulerException, match="CardinalityViolation"): swh_scheduler.origin_visit_stats_upsert( [ OriginVisitStats( url="foo", visit_type="git", last_successful=None, last_visit=utcnow(), ), OriginVisitStats( url="foo", visit_type="git", last_successful=utcnow(), last_visit=None, ), ] ) def test_visit_scheduler_queue_position( self, swh_scheduler, listed_origins ) -> None: result = swh_scheduler.visit_scheduler_queue_position_get() assert result == {} expected_result = {} visit_types = set() for origin in listed_origins: visit_type = origin.visit_type if visit_type in visit_types: continue visit_types.add(visit_type) position = 42 swh_scheduler.visit_scheduler_queue_position_set(visit_type, position) expected_result[visit_type] = position result = swh_scheduler.visit_scheduler_queue_position_get() assert result == expected_result def test_metrics_origins_known(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) ret = swh_scheduler.update_metrics() assert sum(metric.origins_known for metric in ret) == len(listed_origins) def test_metrics_origins_enabled(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) disabled_origin = attr.evolve(listed_origins[0], enabled=False) swh_scheduler.record_listed_origins([disabled_origin]) ret = swh_scheduler.update_metrics(lister_id=disabled_origin.lister_id) for metric in ret: if metric.visit_type == disabled_origin.visit_type: # We disabled one of these origins assert metric.origins_known - metric.origins_enabled == 1 else: # But these are still all enabled assert metric.origins_known == metric.origins_enabled def test_metrics_origins_never_visited(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) # Pretend that we've recorded a visit on one origin visited_origin = listed_origins[0] swh_scheduler.origin_visit_stats_upsert( [ OriginVisitStats( url=visited_origin.url, visit_type=visited_origin.visit_type, last_successful=utcnow(), last_snapshot=hash_to_bytes( "d81cc0710eb6cf9efd5b920a8453e1e07157b6cd" ), ), ] ) ret = swh_scheduler.update_metrics(lister_id=visited_origin.lister_id) for metric in ret: if metric.visit_type == visited_origin.visit_type: # We visited one of these origins assert metric.origins_known - metric.origins_never_visited == 1 else: # But none of these have been visited assert metric.origins_known == metric.origins_never_visited def test_metrics_origins_with_pending_changes(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) # Pretend that we've recorded a visit on one origin, in the past with # respect to the "last update" time for the origin visited_origin = listed_origins[0] assert visited_origin.last_update is not None swh_scheduler.origin_visit_stats_upsert( [ OriginVisitStats( url=visited_origin.url, visit_type=visited_origin.visit_type, last_successful=visited_origin.last_update - timedelta(days=1), last_snapshot=hash_to_bytes( "d81cc0710eb6cf9efd5b920a8453e1e07157b6cd" ), ), ] ) ret = swh_scheduler.update_metrics(lister_id=visited_origin.lister_id) for metric in ret: if metric.visit_type == visited_origin.visit_type: # We visited one of these origins, in the past assert metric.origins_with_pending_changes == 1 else: # But none of these have been visited assert metric.origins_with_pending_changes == 0 def test_update_metrics_explicit_lister(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) fake_uuid = uuid.uuid4() assert all(fake_uuid != origin.lister_id for origin in listed_origins) ret = swh_scheduler.update_metrics(lister_id=fake_uuid) assert len(ret) == 0 def test_update_metrics_explicit_timestamp(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) ts = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) ret = swh_scheduler.update_metrics(timestamp=ts) assert all(metric.last_update == ts for metric in ret) def test_update_metrics_twice(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) ts = utcnow() ret = swh_scheduler.update_metrics(timestamp=ts) assert all(metric.last_update == ts for metric in ret) second_ts = ts + timedelta(seconds=1) ret = swh_scheduler.update_metrics(timestamp=second_ts) assert all(metric.last_update == second_ts for metric in ret) def test_get_metrics(self, swh_scheduler, listed_origins): swh_scheduler.record_listed_origins(listed_origins) updated = swh_scheduler.update_metrics() retrieved = swh_scheduler.get_metrics() assert_metrics_equal(updated, retrieved) def test_get_metrics_by_lister(self, swh_scheduler, listed_origins): lister_id = listed_origins[0].lister_id assert lister_id is not None swh_scheduler.record_listed_origins(listed_origins) updated = swh_scheduler.update_metrics() retrieved = swh_scheduler.get_metrics(lister_id=lister_id) assert len(retrieved) > 0 assert_metrics_equal( [metric for metric in updated if metric.lister_id == lister_id], retrieved ) def test_get_metrics_by_visit_type(self, swh_scheduler, listed_origins): visit_type = listed_origins[0].visit_type assert visit_type is not None swh_scheduler.record_listed_origins(listed_origins) updated = swh_scheduler.update_metrics() retrieved = swh_scheduler.get_metrics(visit_type=visit_type) assert len(retrieved) > 0 assert_metrics_equal( [metric for metric in updated if metric.visit_type == visit_type], retrieved )