diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -14,6 +14,9 @@
 [mypy-elasticsearch.*]
 ignore_missing_imports = True

+[mypy-humanize.*]
+ignore_missing_imports = True
+
 [mypy-kombu.*]
 ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,13 +2,13 @@
 # should match https://pypi.python.org/pypi names. For the full spec or
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html

-arrow
 attrs
 attrs-strict
 celery >= 4.3
 Click
 elasticsearch > 5.4
 flask
+humanize
 pika >= 1.1.0
 psycopg2
 pyyaml
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -8,14 +8,13 @@
 from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
 from uuid import UUID

-from arrow import Arrow, utcnow
 import attr
-from psycopg2.extensions import AsIs
 import psycopg2.extras
 import psycopg2.pool

 from swh.core.db import BaseDb
 from swh.core.db.common import db_transaction
+from swh.scheduler.utils import utcnow

 from .exc import StaleData
 from .model import (
@@ -28,12 +27,7 @@
 logger = logging.getLogger(__name__)


-def adapt_arrow(arrow):
-    return AsIs("'%s'::timestamptz" % arrow.isoformat())
-
-
 psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
-psycopg2.extensions.register_adapter(Arrow, adapt_arrow)
 psycopg2.extras.register_uuid()
diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py
--- a/swh/scheduler/celery_backend/listener.py
+++ b/swh/scheduler/celery_backend/listener.py
@@ -8,13 +8,13 @@
 import sys
 import time

-from arrow import utcnow
 import celery
 from celery.events import EventReceiver
 import click
 from kombu import Queue

 from swh.core.statsd import statsd
+from swh.scheduler.utils import utcnow


 class ReliableEventReceiver(EventReceiver):
diff --git a/swh/scheduler/celery_backend/pika_listener.py b/swh/scheduler/celery_backend/pika_listener.py
--- a/swh/scheduler/celery_backend/pika_listener.py
+++ b/swh/scheduler/celery_backend/pika_listener.py
@@ -3,7 +3,6 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

-import datetime
 import json
 import logging
 import sys
@@ -12,14 +11,11 @@
 from swh.core.statsd import statsd

 from swh.scheduler import get_scheduler
+from swh.scheduler.utils import utcnow

 logger = logging.getLogger(__name__)


-def utcnow():
-    return datetime.datetime.now(tz=datetime.timezone.utc)
-
-
 def get_listener(broker_url, queue_name, scheduler_backend):
     connection = pika.BlockingConnection(pika.URLParameters(broker_url))
     channel = connection.channel()
diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py
--- a/swh/scheduler/celery_backend/runner.py
+++ b/swh/scheduler/celery_backend/runner.py
@@ -5,13 +5,14 @@

 import logging

-import arrow
 from kombu.utils.uuid import uuid

 from swh.core.statsd import statsd
 from swh.scheduler import compute_nb_tasks_from, get_scheduler
+from swh.scheduler.utils import utcnow

 logger = logging.getLogger(__name__)

+
 # Max batch size for tasks
 MAX_NUM_TASKS = 10000
@@ -29,7 +30,7 @@
     {
         'task': the scheduler's task id,
         'backend_id': Celery's task id,
-        'scheduler': arrow.utcnow()
+        'scheduler': utcnow()
     }

     The result can be used to block-wait for the tasks' results::
@@ -98,7 +99,7 @@
             data = {
                 "task": task["id"],
                 "backend_id": backend_id,
-                "scheduled": arrow.utcnow(),
+                "scheduled": utcnow(),
             }
             backend_tasks.append(data)
diff --git a/swh/scheduler/cli/task.py b/swh/scheduler/cli/task.py
--- a/swh/scheduler/cli/task.py
+++ b/swh/scheduler/cli/task.py
@@ -20,34 +20,18 @@
 locale.setlocale(locale.LC_ALL, "")

-ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0]
-
-
-class DateTimeType(click.ParamType):
-    name = "time and date"
-
-    def convert(self, value, param, ctx):
-        import arrow
-
-        if not isinstance(value, arrow.Arrow):
-            value = arrow.get(value)
-
-        return value
-
-
-DATETIME = DateTimeType()

 CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
+DATETIME = click.DateTime()


 def format_dict(d):
+    """Recursively format date objects in the dict passed as argument"""
     import datetime

-    import arrow
-
     ret = {}
     for k, v in d.items():
-        if isinstance(v, (arrow.Arrow, datetime.date, datetime.datetime)):
-            v = arrow.get(v).format()
+        if isinstance(v, (datetime.date, datetime.datetime)):
+            v = v.isoformat()
         elif isinstance(v, dict):
             v = format_dict(v)
         ret[k] = v
@@ -96,7 +80,7 @@
     ... }
     >>> print(click.unstyle(pretty_print_task(task)))
     Task 1234
-      Next run: ... (2019-02-21 13:52:35+00:00)
+      Next run: ... (2019-02-21T13:52:35.407818)
       Interval: 1:00:00
       Type: test_task
       Policy: oneshot
@@ -110,7 +94,7 @@

     >>> print(click.unstyle(pretty_print_task(task, full=True)))
     Task 1234
-      Next run: ... (2019-02-21 13:52:35+00:00)
+      Next run: ... (2019-02-21T13:52:35.407818)
       Interval: 1:00:00
       Type: test_task
       Policy: oneshot
@@ -125,13 +109,13 @@
         key2: 42

     """
-    import arrow
+    import humanize

-    next_run = arrow.get(task["next_run"])
+    next_run = task["next_run"]
     lines = [
         "%s %s\n" % (click.style("Task", bold=True), task["id"]),
         click.style("  Next run: ", bold=True),
-        "%s (%s)" % (next_run.humanize(locale=ARROW_LOCALE), next_run.format()),
+        "%s (%s)" % (humanize.naturaldate(next_run), next_run.isoformat()),
         "\n",
         click.style("  Interval: ", bold=True),
         str(task["current_interval"]),
@@ -213,10 +197,10 @@
     import csv
     import json

-    import arrow
+    from swh.scheduler.utils import utcnow

     tasks = []
-    now = arrow.utcnow()
+    now = utcnow()
     scheduler = ctx.obj["scheduler"]
     if not scheduler:
         raise ValueError("Scheduler class (local/remote) must be instantiated")
@@ -230,7 +214,7 @@
             "args": args,
             "kwargs": kwargs,
         }
-        task["next_run"] = DATETIME.convert(task.get("next_run", now), None, None)
+        task["next_run"] = task.get("next_run", now)
         tasks.append(task)

     created = scheduler.create_tasks(tasks)
@@ -273,7 +257,7 @@
     Note: if the priority is not given, the task won't have the priority set,
     which is considered as the lowest priority level.
     """
-    import arrow
+    from swh.scheduler.utils import utcnow

     from .utils import parse_options
@@ -281,7 +265,7 @@
     if not scheduler:
         raise ValueError("Scheduler class (local/remote) must be instantiated")

-    now = arrow.utcnow()
+    now = utcnow()

     (args, kw) = parse_options(options)
     task = {
@@ -289,7 +273,7 @@
         "policy": policy,
         "priority": priority,
         "arguments": {"args": args, "kwargs": kw,},
-        "next_run": DATETIME.convert(next_run or now, None, None),
+        "next_run": next_run or now,
     }

     created = scheduler.create_tasks([task])
@@ -587,13 +571,13 @@
         swh-scheduler task respawn 1 3 12

     """
-    import arrow
+    from swh.scheduler.utils import utcnow

     scheduler = ctx.obj["scheduler"]
     if not scheduler:
         raise ValueError("Scheduler class (local/remote) must be instantiated")
     if next_run is None:
-        next_run = arrow.utcnow()
+        next_run = utcnow()
     output = []
     scheduler.set_status_tasks(
@@ -678,10 +662,9 @@
     """
     from itertools import groupby

-    import arrow
-
     from swh.core.utils import grouper
     from swh.scheduler.backend_es import ElasticSearchBackend
+    from swh.scheduler.utils import utcnow

     config = ctx.obj["config"]
     scheduler = ctx.obj["scheduler"]
@@ -699,7 +682,7 @@
         logger.info("**NO CLEANUP**")

     es_storage = ElasticSearchBackend(**config)
-    now = arrow.utcnow()
+    now = utcnow()

     # Default to archive tasks from a rolling month starting the week
     # prior to the current one
diff --git a/swh/scheduler/tests/es/test_cli_task.py b/swh/scheduler/tests/es/test_cli_task.py
--- a/swh/scheduler/tests/es/test_cli_task.py
+++ b/swh/scheduler/tests/es/test_cli_task.py
@@ -8,11 +8,11 @@
 import random
 import uuid

-import arrow
 from click.testing import CliRunner
 import pytest

 from swh.scheduler.cli import cli
+from swh.scheduler.utils import utcnow

 from ..common import TASK_TYPES, TEMPLATES, tasks_from_template
@@ -28,7 +28,7 @@
     for tt in TASK_TYPES.values():
         scheduler.create_task_type(tt)

-    next_run_start = arrow.utcnow().datetime - datetime.timedelta(days=1)
+    next_run_start = utcnow() - datetime.timedelta(days=1)

     recurring = tasks_from_template(template_git, next_run_start, 100)
     oneshots = tasks_from_template(
diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py
--- a/swh/scheduler/tests/test_cli.py
+++ b/swh/scheduler/tests/test_cli.py
@@ -16,7 +16,7 @@
 from swh.core.api.classes import stream_results
 from swh.model.model import Origin
 from swh.scheduler.cli import cli
-from swh.scheduler.utils import create_task_dict
+from swh.scheduler.utils import create_task_dict, utcnow
 from swh.storage import get_storage

 CLI_CONFIG = """
@@ -47,10 +47,10 @@
 def test_schedule_tasks(swh_scheduler):
     csv_data = (
         b'swh-test-ping;[["arg1", "arg2"]];{"key": "value"};'
-        + datetime.datetime.utcnow().isoformat().encode()
+        + utcnow().isoformat().encode()
         + b"\n"
         + b'swh-test-ping;[["arg3", "arg4"]];{"key": "value"};'
-        + datetime.datetime.utcnow().isoformat().encode()
+        + utcnow().isoformat().encode()
         + b"\n"
     )
     with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd:
@@ -63,7 +63,7 @@
 Created 2 tasks

 Task 1
-  Next run: just now \(.*\)
+  Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: recurring
@@ -73,7 +73,7 @@
     key: 'value'

 Task 2
-  Next run: just now \(.*\)
+  Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: recurring
@@ -114,7 +114,7 @@
 Created 1 tasks

 Task 1
-  Next run: just now \(.*\)
+  Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
@@ -139,7 +139,7 @@
 Created 1 tasks

 Task 1
-  Next run: just now \(.*\)
+  Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: recurring
@@ -177,7 +177,7 @@
 Found 1 swh-test-ping tasks

 Task 1
-  Next run: just now \(.*\)
+  Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
@@ -229,7 +229,7 @@
 Found 1 swh-test-ping tasks

 Task 2
-  Next run: just now \(.*\)
+  Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
@@ -261,7 +261,7 @@
 Found 2 swh-test-ping tasks

 Task 1
-  Next run: just now \(.*\)
+  Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
@@ -270,7 +270,7 @@
     key: 'value0'

 Task 2
-  Next run: just now \(.*\)
+  Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
@@ -279,7 +279,7 @@
     key: 'value1'

 Task 3
-  Next run: just now \(.*\)
+  Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
@@ -315,7 +315,7 @@
 Found 1 swh-test-ping tasks

 Task 2
-  Next run: in a day \(.*\)
+  Next run: tomorrow \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
@@ -342,7 +342,7 @@
 Found 2 tasks

 Task 1
-  Next run: in 3 days \(.*\)
+  Next run: .+ \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
@@ -353,7 +353,7 @@
     key: 'value1'

 Task 2
-  Next run: just now \(.*\)
+  Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
@@ -380,7 +380,7 @@
 Found 1 tasks

 Task 2
-  Next run: just now \(.*\)
+  Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
@@ -409,7 +409,7 @@
 Found 2 tasks

 Task 2
-  Next run: just now \(.*\)
+  Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
@@ -420,7 +420,7 @@
     key: 'value2'

 Task 3
-  Next run: just now \(.*\)
+  Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
@@ -449,7 +449,7 @@
 Found 2 tasks

 Task 1
-  Next run: just now \(.*\)
+  Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
@@ -460,7 +460,7 @@
     key: 'value1'

 Task 3
-  Next run: just now \(.*\)
+  Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
@@ -487,7 +487,7 @@
 Found 2 tasks

 Task 1
-  Next run: just now \(.*\)
+  Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
@@ -498,7 +498,7 @@
     key: 'value1'

 Task 2
-  Next run: just now \(.*\)
+  Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
@@ -536,7 +536,7 @@
 Found 1 tasks

 Task 2
-  Next run: just now \(.*\)
+  Next run: today \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
@@ -574,7 +574,7 @@
 Found 1 tasks

 Task 1
-  Next run: in 3 days \(.*\)
+  Next run: .+ \(.*\)
   Interval: 1 day, 0:00:00
   Type: swh-test-ping
   Policy: oneshot
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -11,16 +11,18 @@
 from typing import Any, Dict, List, Optional
 import uuid

-from arrow import utcnow
 import attr
 import pytest

 from swh.scheduler.exc import StaleData
 from swh.scheduler.interface import SchedulerInterface
 from swh.scheduler.model import ListedOrigin, ListedOriginPageToken
+from swh.scheduler.utils import utcnow

 from .common import LISTERS, TASK_TYPES, TEMPLATES, tasks_from_template

+ONEDAY = datetime.timedelta(days=1)
+

 def subdict(d, keys=None, excl=()):
     if keys is None:
@@ -394,10 +396,10 @@
         # no pagination scenario

         # retrieve tasks to archive
-        after = _time.shift(days=-1)
-        after_ts = after.format("YYYY-MM-DD")
-        before = utcnow().shift(days=1)
-        before_ts = before.format("YYYY-MM-DD")
+        after = _time - ONEDAY
+        after_ts = after.strftime("%Y-%m-%d")
+        before = utcnow() + ONEDAY
+        before_ts = before.strftime("%Y-%m-%d")
         tasks_result = swh_scheduler.filter_task_to_archive(
             after_ts=after_ts, before_ts=before_ts, limit=total_tasks
         )
diff --git a/swh/scheduler/utils.py b/swh/scheduler/utils.py
--- a/swh/scheduler/utils.py
+++ b/swh/scheduler/utils.py
@@ -7,6 +7,10 @@
 from datetime import datetime, timezone


+def utcnow():
+    return datetime.now(tz=timezone.utc)
+
+
 def get_task(task_name):
     """Retrieve task object in our application instance by its fully
     qualified python name.
@@ -50,7 +54,7 @@
     task = {
         "policy": policy,
         "type": type,
-        "next_run": datetime.now(tz=timezone.utc),
+        "next_run": utcnow(),
         "arguments": {
             "args": args if args else [],
             "kwargs": kwargs if kwargs else {},