Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_scheduler.py
# Copyright (C) 2017-2018 The Software Heritage developers | # Copyright (C) 2017-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import copy | import copy | ||||
import datetime | import datetime | ||||
import random | import random | ||||
import uuid | import uuid | ||||
from collections import defaultdict | from collections import defaultdict | ||||
from typing import Any, Dict | |||||
import psycopg2 | |||||
from arrow import utcnow | from arrow import utcnow | ||||
import psycopg2 | |||||
import pytest | import pytest | ||||
TASK_TYPES = { | TASK_TYPES = { | ||||
'git': { | 'git': { | ||||
'type': 'update-git', | 'type': 'update-git', | ||||
'description': 'Update a git repository', | 'description': 'Update a git repository', | ||||
'backend_name': 'swh.loader.git.tasks.UpdateGitRepository', | 'backend_name': 'swh.loader.git.tasks.UpdateGitRepository', | ||||
▲ Show 20 Lines • Show All 282 Lines • ▼ Show 20 Lines | def test_search_tasks(self, swh_scheduler): | ||||
return [dict(d.items()) for d in l] | return [dict(d.items()) for d in l] | ||||
self._create_task_types(swh_scheduler) | self._create_task_types(swh_scheduler) | ||||
t = utcnow() | t = utcnow() | ||||
tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) | tasks = self._tasks_from_template(TEMPLATES['git'], t, 100) | ||||
tasks = swh_scheduler.create_tasks(tasks) | tasks = swh_scheduler.create_tasks(tasks) | ||||
assert make_real_dicts(swh_scheduler.search_tasks()) \ | assert make_real_dicts(swh_scheduler.search_tasks()) \ | ||||
== make_real_dicts(tasks) | == make_real_dicts(tasks) | ||||
def assert_filtered_task_ok(
        self, task: Dict[str, Any],
        after: datetime.datetime,
        before: datetime.datetime) -> None:
    """Check that a task returned by the archive filter is acceptable.

    The task's reference date is its ``started`` value, or its
    ``scheduled`` value when ``started`` is None. That date must lie
    within the inclusive range [after, before].

    The status is then checked against the task policy: a ``oneshot``
    task must be ``completed`` or ``disabled``, a ``recurring`` task must
    be ``disabled``. Tasks with any other policy get no status check.

    Raises:
        AssertionError: when one of the above expectations does not hold.

    """
    reference_date = task['started']
    if reference_date is None:
        # Task never started: fall back on its scheduling date.
        reference_date = task['scheduled']
    assert after <= reference_date <= before

    policy = task['task_policy']
    if policy == 'oneshot':
        allowed_statuses = ('completed', 'disabled')
    elif policy == 'recurring':
        allowed_statuses = ('disabled',)
    else:
        # No status expectation for other policies.
        return
    assert task['task_status'] in allowed_statuses
def test_filter_task_to_archive(self, swh_scheduler): | def test_filter_task_to_archive(self, swh_scheduler): | ||||
"""Filtering only list disabled recurring or completed oneshot tasks | """Filtering only list disabled recurring or completed oneshot tasks | ||||
""" | """ | ||||
self._create_task_types(swh_scheduler) | self._create_task_types(swh_scheduler) | ||||
_time = utcnow() | _time = utcnow() | ||||
recurring = self._tasks_from_template(TEMPLATES['git'], _time, 12) | recurring = self._tasks_from_template(TEMPLATES['git'], _time, 12) | ||||
oneshots = self._tasks_from_template(TEMPLATES['hg'], _time, 12) | oneshots = self._tasks_from_template(TEMPLATES['hg'], _time, 12) | ||||
▲ Show 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | def test_filter_task_to_archive(self, swh_scheduler): | ||||
_task_ids['oneshot'], status='next_run_not_scheduled') | _task_ids['oneshot'], status='next_run_not_scheduled') | ||||
# complete the tasks to update | # complete the tasks to update | ||||
swh_scheduler.set_status_tasks( | swh_scheduler.set_status_tasks( | ||||
tasks_to_update['oneshot'], status='completed') | tasks_to_update['oneshot'], status='completed') | ||||
total_tasks_filtered = (status_per_policy['recurring'] + | total_tasks_filtered = (status_per_policy['recurring'] + | ||||
status_per_policy['oneshot']) | status_per_policy['oneshot']) | ||||
# no pagination scenario | |||||
# retrieve tasks to archive | # retrieve tasks to archive | ||||
after = _time.shift(days=-1).format('YYYY-MM-DD') | after = _time.shift(days=-1) | ||||
before = utcnow().shift(days=1).format('YYYY-MM-DD') | after_ts = after.format('YYYY-MM-DD') | ||||
tasks_to_archive = list(swh_scheduler.filter_task_to_archive( | before = utcnow().shift(days=1) | ||||
after_ts=after, before_ts=before, limit=total_tasks)) | before_ts = before.format('YYYY-MM-DD') | ||||
tasks_result = swh_scheduler.filter_task_to_archive( | |||||
after_ts=after_ts, before_ts=before_ts, count=total_tasks) | |||||
tasks_to_archive = tasks_result['tasks'] | |||||
assert len(tasks_to_archive) == total_tasks_filtered | assert len(tasks_to_archive) == total_tasks_filtered | ||||
assert tasks_result.get('next_page_token') is None | |||||
actual_filtered_per_status = {'recurring': 0, 'oneshot': 0} | actual_filtered_per_status = {'recurring': 0, 'oneshot': 0} | ||||
for task in tasks_to_archive: | for task in tasks_to_archive: | ||||
self.assert_filtered_task_ok(task, after, before) | |||||
actual_filtered_per_status[task['task_policy']] += 1 | |||||
assert actual_filtered_per_status == status_per_policy | |||||
# pagination scenario | |||||
nb_tasks = 3 | |||||
tasks_result = swh_scheduler.filter_task_to_archive( | |||||
after_ts=after_ts, before_ts=before_ts, count=nb_tasks) | |||||
tasks_to_archive2 = tasks_result['tasks'] | |||||
assert len(tasks_to_archive2) == nb_tasks | |||||
next_id = tasks_result['next_page_token'] | |||||
assert next_id is not None | |||||
all_tasks = tasks_to_archive2 | |||||
while next_id is not None: # Retrieve all results by pagination | |||||
tasks_result = swh_scheduler.filter_task_to_archive( | |||||
after_ts=after_ts, before_ts=before_ts, count=nb_tasks, | |||||
page_token=next_id) | |||||
tasks_to_archive2 = tasks_result['tasks'] | |||||
assert len(tasks_to_archive2) <= nb_tasks | |||||
all_tasks.extend(tasks_to_archive2) | |||||
next_id = tasks_result.get('next_page_token') | |||||
actual_filtered_per_status = {'recurring': 0, 'oneshot': 0} | |||||
for task in all_tasks: | |||||
self.assert_filtered_task_ok(task, after, before) | |||||
actual_filtered_per_status[task['task_policy']] += 1 | actual_filtered_per_status[task['task_policy']] += 1 | ||||
assert actual_filtered_per_status == status_per_policy | assert actual_filtered_per_status == status_per_policy | ||||
def test_delete_archived_tasks(self, swh_scheduler): | def test_delete_archived_tasks(self, swh_scheduler): | ||||
self._create_task_types(swh_scheduler) | self._create_task_types(swh_scheduler) | ||||
_time = utcnow() | _time = utcnow() | ||||
recurring = self._tasks_from_template( | recurring = self._tasks_from_template( | ||||
▲ Show 20 Lines • Show All 205 Lines • Show Last 20 Lines |