diff --git a/sql/updates/15.sql b/sql/updates/15.sql
new file mode 100644
--- /dev/null
+++ b/sql/updates/15.sql
@@ -0,0 +1,24 @@
+insert into dbversion (version, release, description)
+       values (15, now(), 'Work In Progress');
+
+create or replace function swh_scheduler_task_to_archive(
+    ts_after timestamptz, ts_before timestamptz, last_id bigint default -1,
+    lim bigint default 10)
+  returns setof task_record
+  language sql stable
+as $$
+   select t.id as task_id, t.policy as task_policy,
+          t.status as task_status, tr.id as task_run_id,
+          t.arguments, t.type, tr.backend_id, tr.metadata,
+          tr.scheduled, tr.started, tr.ended, tr.status as task_run_status
+   from task_run tr inner join task t on tr.task=t.id
+   where ((t.policy = 'oneshot' and t.status in ('completed', 'disabled')) or
+          (t.policy = 'recurring' and t.status = 'disabled')) and
+         ((ts_after <= tr.started and tr.started < ts_before) or
+          (tr.started is null and (ts_after <= tr.scheduled and tr.scheduled < ts_before))) and
+          t.id >= last_id
+   order by tr.task, tr.started
+   limit lim;
+$$;
+
+comment on function swh_scheduler_task_to_archive (timestamptz, timestamptz, bigint, bigint) is 'Read archivable tasks function';
diff --git a/swh/scheduler/api/client.py b/swh/scheduler/api/client.py
--- a/swh/scheduler/api/client.py
+++ b/swh/scheduler/api/client.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2018  The Software Heritage developers
+# Copyright (C) 2018-2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -93,13 +93,13 @@
             'timestamp': timestamp,
         })
 
-    def filter_task_to_archive(self, after_ts, before_ts, limit=10,
-                               last_id=-1):
+    def filter_task_to_archive(self, after_ts, before_ts,
+                               count=10, page_token=None):
         return self.post('filter_task_to_archive', {
             'after_ts': after_ts,
             'before_ts': before_ts,
-            'limit': limit,
-            'last_id': last_id,
+            'count': count,
+            'page_token': page_token,
         })
 
     def delete_archived_tasks(self, task_ids):
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2018  The Software Heritage developers
+# Copyright (C) 2015-2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -9,10 +9,12 @@
 from arrow import Arrow, utcnow
 import psycopg2.pool
 import psycopg2.extras
+
+from typing import Any, Dict, Optional
 from psycopg2.extensions import AsIs
 
 from swh.core.db import BaseDb
-from swh.core.db.common import db_transaction, db_transaction_generator
+from swh.core.db.common import db_transaction
 
 logger = logging.getLogger(__name__)
 
@@ -414,37 +416,55 @@
         )
         return cur.fetchone()
 
-    @db_transaction_generator()
-    def filter_task_to_archive(self, after_ts, before_ts, limit=10, last_id=-1,
-                               db=None, cur=None):
-        """Returns the list of task/task_run prior to a given date to archive.
+    @db_transaction()
+    def filter_task_to_archive(
+            self, after_ts: str, before_ts: str,
+            count: int = 10, page_token: Optional[int] = None,
+            db=None, cur=None) -> Dict[str, Any]:
+        """Compute the tasks to archive within the datetime interval
+        [after_ts, before_ts[. The method returns a paginated result.
+
+        Returns:
+            dict with the following keys:
+              - **next_page_token** (int, optional): opaque token to be used
+                as `page_token` to retrieve the next page of results. If
+                absent, there are no more pages to gather.
+              - **tasks**: list of task dictionaries with the following keys:
+
+                    **id** (str): origin task id
+                    **started** (Optional[datetime]): started date
+                    **scheduled** (datetime): scheduled date
+                    **arguments** (json dict): task's arguments
+                    ...
 
         """
-        last_task_run_id = None
-        while True:
-            row = None
-            cur.execute(
-                "select * from swh_scheduler_task_to_archive(%s, %s, %s, %s)",
-                (after_ts, before_ts, last_id, limit)
-            )
-            for row in cur:
-                # nested type index does not accept bare values
-                # transform it as a dict to comply with this
-                row['arguments']['args'] = {
-                    i: v for i, v in enumerate(row['arguments']['args'])
-                }
-                kwargs = row['arguments']['kwargs']
-                row['arguments']['kwargs'] = json.dumps(kwargs)
-                yield row
-
-            if not row:
-                break
-            _id = row.get('task_id')
-            _task_run_id = row.get('task_run_id')
-            if last_id == _id and last_task_run_id == _task_run_id:
-                break
-            last_id = _id
-            last_task_run_id = _task_run_id
+        last_id = -1 if page_token is None else page_token
+        tasks = []
+        cur.execute(
+            "select * from swh_scheduler_task_to_archive(%s, %s, %s, %s)",
+            (after_ts, before_ts, last_id, count + 1)
+        )
+        for row in cur:
+            # nested type index does not accept bare values
+            # transform it as a dict to comply with this
+            row['arguments']['args'] = {
+                i: v for i, v in enumerate(row['arguments']['args'])
+            }
+            kwargs = row['arguments']['kwargs']
+            row['arguments']['kwargs'] = json.dumps(kwargs)
+            tasks.append(row)
+
+        if len(tasks) >= count + 1:  # data remains, add pagination information
+            result = {
+                'tasks': tasks[:count],
+                'next_page_token': tasks[-1]['task_id'],
+            }
+        else:
+            result = {
+                'tasks': tasks
+            }
+
+        return result
 
     @db_transaction()
     def delete_archived_tasks(self, task_ids, db=None, cur=None):
diff --git a/swh/scheduler/cli/task.py b/swh/scheduler/cli/task.py
--- a/swh/scheduler/cli/task.py
+++ b/swh/scheduler/cli/task.py
@@ -537,19 +537,23 @@
         return es_client.compute_index_name(date.year, date.month)
 
     def index_data(before, last_id, batch_index):
-        tasks_in = scheduler.filter_task_to_archive(
-            after, before, last_id=last_id, limit=batch_index)
-        for index_name, tasks_group in itertools.groupby(
-                tasks_in, key=group_by_index_name):
-            log.debug('Index tasks to %s' % index_name)
-            if dry_run:
-                for task in tasks_group:
-                    yield task
-                continue
-
-            yield from es_client.streaming_bulk(
-                index_name, tasks_group, source=['task_id', 'task_run_id'],
-                chunk_size=bulk_index, log=log)
+        while last_id is not None:
+            result = scheduler.filter_task_to_archive(
+                after, before, page_token=last_id, count=batch_index)
+            tasks_in = result['tasks']
+            for index_name, tasks_group in itertools.groupby(
+                    tasks_in, key=group_by_index_name):
+                log.debug('Index tasks to %s' % index_name)
+                if dry_run:
+                    for task in tasks_group:
+                        yield task
+                    continue
+
+                yield from es_client.streaming_bulk(
+                    index_name, tasks_group, source=['task_id', 'task_run_id'],
+                    chunk_size=bulk_index, log=log)
+
+            last_id = result.get('next_page_token')
 
     gen = index_data(before, last_id=start_from, batch_index=batch_index)
     if cleanup:
diff --git a/swh/scheduler/sql/30-swh-schema.sql b/swh/scheduler/sql/30-swh-schema.sql
--- a/swh/scheduler/sql/30-swh-schema.sql
+++ b/swh/scheduler/sql/30-swh-schema.sql
@@ -11,7 +11,7 @@
 comment on column dbversion.description is 'Version description';
 
 insert into dbversion
        (version, release, description)
-       values (14, now(), 'Work In Progress');
+       values (15, now(), 'Work In Progress');
 
 create table task_type (
   type text primary key,
@@ -110,4 +110,3 @@
 comment on column task_run.scheduled is 'Scheduled run time for task';
 comment on column task_run.started is 'Task starting time';
 comment on column task_run.ended is 'Task ending time';
-
diff --git a/swh/scheduler/sql/40-swh-func.sql b/swh/scheduler/sql/40-swh-func.sql
--- a/swh/scheduler/sql/40-swh-func.sql
+++ b/swh/scheduler/sql/40-swh-func.sql
@@ -305,8 +305,9 @@
    from task_run tr inner join task t on tr.task=t.id
    where ((t.policy = 'oneshot' and t.status in ('completed', 'disabled')) or
           (t.policy = 'recurring' and t.status = 'disabled')) and
-         ((ts_after <= tr.started and tr.started < ts_before) or tr.started is null) and
-         t.id > last_id
+         ((ts_after <= tr.started and tr.started < ts_before) or
+          (tr.started is null and (ts_after <= tr.scheduled and tr.scheduled < ts_before))) and
+          t.id >= last_id
    order by tr.task, tr.started
    limit lim;
 $$;
@@ -405,4 +406,3 @@
   for each row
   when (new.status NOT IN ('scheduled', 'started'))
   execute procedure swh_scheduler_update_task_on_task_end ();
-
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2017-2018  The Software Heritage developers
+# Copyright (C) 2017-2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -7,10 +7,13 @@
 import datetime
 import random
 import uuid
+
 from collections import defaultdict
+from typing import Any, Dict
 
-import psycopg2
 from arrow import utcnow
+
+import psycopg2
 import pytest
 
@@ -309,6 +312,21 @@
         assert make_real_dicts(swh_scheduler.search_tasks()) \
             == make_real_dicts(tasks)
 
+    def assert_filtered_task_ok(
+            self, task: Dict[str, Any],
+            after: datetime.datetime,
+            before: datetime.datetime) -> None:
+        """Ensure filtered tasks match the requirements
+
+        """
+        started = task['started']
+        date = started if started is not None else task['scheduled']
+        assert after <= date and date <= before
+        if task['task_policy'] == 'oneshot':
+            assert task['task_status'] in ['completed', 'disabled']
+        if task['task_policy'] == 'recurring':
+            assert task['task_status'] in ['disabled']
+
     def test_filter_task_to_archive(self, swh_scheduler):
         """Filtering only list disabled recurring or completed oneshot tasks
 
@@ -371,16 +389,53 @@
         total_tasks_filtered = (status_per_policy['recurring'] +
                                 status_per_policy['oneshot'])
 
+        # no pagination scenario
+
         # retrieve tasks to archive
-        after = _time.shift(days=-1).format('YYYY-MM-DD')
-        before = utcnow().shift(days=1).format('YYYY-MM-DD')
-        tasks_to_archive = list(swh_scheduler.filter_task_to_archive(
-            after_ts=after, before_ts=before, limit=total_tasks))
+        after = _time.shift(days=-1)
+        after_ts = after.format('YYYY-MM-DD')
+        before = utcnow().shift(days=1)
+        before_ts = before.format('YYYY-MM-DD')
+        tasks_result = swh_scheduler.filter_task_to_archive(
+            after_ts=after_ts, before_ts=before_ts, count=total_tasks)
+
+        tasks_to_archive = tasks_result['tasks']
 
         assert len(tasks_to_archive) == total_tasks_filtered
+        assert tasks_result.get('next_page_token') is None
 
         actual_filtered_per_status = {'recurring': 0, 'oneshot': 0}
         for task in tasks_to_archive:
+            self.assert_filtered_task_ok(task, after, before)
             actual_filtered_per_status[task['task_policy']] += 1
 
         assert actual_filtered_per_status == status_per_policy
+
+        # pagination scenario
+
+        nb_tasks = 3
+        tasks_result = swh_scheduler.filter_task_to_archive(
+            after_ts=after_ts, before_ts=before_ts, count=nb_tasks)
+
+        tasks_to_archive2 = tasks_result['tasks']
+
+        assert len(tasks_to_archive2) == nb_tasks
+        next_id = tasks_result['next_page_token']
+        assert next_id is not None
+
+        all_tasks = tasks_to_archive2
+        while next_id is not None:  # Retrieve all results by pagination
+            tasks_result = swh_scheduler.filter_task_to_archive(
+                after_ts=after_ts, before_ts=before_ts, count=nb_tasks,
+                page_token=next_id)
+            tasks_to_archive2 = tasks_result['tasks']
+            assert len(tasks_to_archive2) <= nb_tasks
+            all_tasks.extend(tasks_to_archive2)
+            next_id = tasks_result.get('next_page_token')
+
+        actual_filtered_per_status = {'recurring': 0, 'oneshot': 0}
+        for task in all_tasks:
+            self.assert_filtered_task_ok(task, after, before)
+            actual_filtered_per_status[task['task_policy']] += 1
+
+        assert actual_filtered_per_status == status_per_policy
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -7,6 +7,7 @@
 deps =
   pytest-cov
   pifpaf
+  dev: ipdb
 setenv =
   LC_ALL=C.UTF-8
   LC_CTYPE=C.UTF-8