Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/backend.py
# Copyright (C) 2015-2018 The Software Heritage developers | # Copyright (C) 2015-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import json | import json | ||||
import logging | import logging | ||||
from arrow import Arrow, utcnow | from arrow import Arrow, utcnow | ||||
import psycopg2.pool | import psycopg2.pool | ||||
import psycopg2.extras | import psycopg2.extras | ||||
from typing import Any, Dict | |||||
from psycopg2.extensions import AsIs | from psycopg2.extensions import AsIs | ||||
from swh.core.db import BaseDb | from swh.core.db import BaseDb | ||||
from swh.core.db.common import db_transaction, db_transaction_generator | from swh.core.db.common import db_transaction | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
def adapt_arrow(arrow): | def adapt_arrow(arrow): | ||||
return AsIs("'%s'::timestamptz" % arrow.isoformat()) | return AsIs("'%s'::timestamptz" % arrow.isoformat()) | ||||
▲ Show 20 Lines • Show All 385 Lines • ▼ Show 20 Lines | def end_task_run(self, backend_id, status, metadata=None, timestamp=None, | ||||
timestamp = utcnow() | timestamp = utcnow() | ||||
cur.execute( | cur.execute( | ||||
'select * from swh_scheduler_end_task_run(%s, %s, %s, %s)', | 'select * from swh_scheduler_end_task_run(%s, %s, %s, %s)', | ||||
(backend_id, status, metadata, timestamp) | (backend_id, status, metadata, timestamp) | ||||
) | ) | ||||
return cur.fetchone() | return cur.fetchone() | ||||
@db_transaction_generator() | @db_transaction() | ||||
def filter_task_to_archive(self, after_ts, before_ts, limit=10, last_id=-1, | def filter_task_to_archive( | ||||
db=None, cur=None): | self, after_ts: str, before_ts: str, limit: int = 10, | ||||
vlorentz: `page_token` should be an optional string (both for consistency, and because we want it to be… | |||||
Done Inline Actions: yes, I saw it this morning when reading back your other diffs. I'm ultimately trying to add tests on the other diff and it takes me way too long. ardumont: yes, I saw it this morning when reading back your other diffs.
I'm ultimately trying to add… | |||||
"""Returns the list of task/task_run prior to a given date to archive. | last_id: int = -1, db=None, cur=None) -> Dict[str, Any]: | ||||
"""Compute the tasks to archive between after_ts and before_ts interval. | |||||
The result is paginated | |||||
""" | """ | ||||
last_task_run_id = None | tasks = [] | ||||
while True: | |||||
row = None | |||||
cur.execute( | cur.execute( | ||||
"select * from swh_scheduler_task_to_archive(%s, %s, %s, %s)", | "select * from swh_scheduler_task_to_archive(%s, %s, %s, %s)", | ||||
(after_ts, before_ts, last_id, limit) | (after_ts, before_ts, last_id, limit + 1) | ||||
) | ) | ||||
for row in cur: | for row in cur: | ||||
# nested type index does not accept bare values | # nested type index does not accept bare values | ||||
# transform it as a dict to comply with this | # transform it as a dict to comply with this | ||||
row['arguments']['args'] = { | row['arguments']['args'] = { | ||||
i: v for i, v in enumerate(row['arguments']['args']) | i: v for i, v in enumerate(row['arguments']['args']) | ||||
} | } | ||||
kwargs = row['arguments']['kwargs'] | kwargs = row['arguments']['kwargs'] | ||||
row['arguments']['kwargs'] = json.dumps(kwargs) | row['arguments']['kwargs'] = json.dumps(kwargs) | ||||
yield row | tasks.append(row) | ||||
if len(tasks) >= limit + 1: # remains data, add pagination information | |||||
result = { | |||||
'tasks': tasks[:limit], | |||||
'next_task_id': tasks[-1]['task_id'], | |||||
} | |||||
else: | |||||
result = { | |||||
'tasks': tasks | |||||
} | |||||
if not row: | return result | ||||
break | |||||
_id = row.get('task_id') | |||||
_task_run_id = row.get('task_run_id') | |||||
if last_id == _id and last_task_run_id == _task_run_id: | |||||
break | |||||
last_id = _id | |||||
last_task_run_id = _task_run_id | |||||
@db_transaction() | @db_transaction() | ||||
def delete_archived_tasks(self, task_ids, db=None, cur=None): | def delete_archived_tasks(self, task_ids, db=None, cur=None): | ||||
"""Delete archived tasks as much as possible. Only the task_ids whose | """Delete archived tasks as much as possible. Only the task_ids whose | ||||
complete associated task_run have been cleaned up will be. | complete associated task_run have been cleaned up will be. | ||||
""" | """ | ||||
_task_ids = _task_run_ids = [] | _task_ids = _task_run_ids = [] | ||||
Show All 38 Lines |
`page_token` should be an optional string (both for consistency, and because we want it to be an opaque token we can change at any time).