D4642.id16505.diff

diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -14,6 +14,9 @@
[mypy-elasticsearch.*]
ignore_missing_imports = True
+[mypy-humanize.*]
+ignore_missing_imports = True
+
[mypy-kombu.*]
ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,13 +2,13 @@
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
-arrow
attrs
attrs-strict
celery >= 4.3
Click
elasticsearch > 5.4
flask
+humanize
pika >= 1.1.0
psycopg2
pyyaml
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -8,14 +8,13 @@
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from uuid import UUID
-from arrow import Arrow, utcnow
import attr
-from psycopg2.extensions import AsIs
import psycopg2.extras
import psycopg2.pool
from swh.core.db import BaseDb
from swh.core.db.common import db_transaction
+from swh.scheduler.utils import utcnow
from .exc import StaleData
from .model import (
@@ -28,12 +27,7 @@
logger = logging.getLogger(__name__)
-def adapt_arrow(arrow):
- return AsIs("'%s'::timestamptz" % arrow.isoformat())
-
-
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
-psycopg2.extensions.register_adapter(Arrow, adapt_arrow)
psycopg2.extras.register_uuid()
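
A minimal sketch (not part of the patch) of why the custom Arrow adapter can be dropped: psycopg2 already ships an adapter for datetime objects, so the plain timezone-aware datetimes returned by the new utcnow() helper need no extra registration. The exact rendering below is illustrative::

    import datetime
    from psycopg2.extensions import adapt

    # psycopg2's built-in datetime adapter renders an aware datetime as an
    # ISO timestamp cast to timestamptz, much like the removed adapt_arrow().
    now = datetime.datetime.now(tz=datetime.timezone.utc)
    print(adapt(now).getquoted())
    # e.g. b"'2020-12-01T10:20:30.123456+00:00'::timestamptz"
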
diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py
--- a/swh/scheduler/celery_backend/listener.py
+++ b/swh/scheduler/celery_backend/listener.py
@@ -8,13 +8,13 @@
import sys
import time
-from arrow import utcnow
import celery
from celery.events import EventReceiver
import click
from kombu import Queue
from swh.core.statsd import statsd
+from swh.scheduler.utils import utcnow
class ReliableEventReceiver(EventReceiver):
diff --git a/swh/scheduler/celery_backend/pika_listener.py b/swh/scheduler/celery_backend/pika_listener.py
--- a/swh/scheduler/celery_backend/pika_listener.py
+++ b/swh/scheduler/celery_backend/pika_listener.py
@@ -3,7 +3,6 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import datetime
import json
import logging
import sys
@@ -12,14 +11,11 @@
from swh.core.statsd import statsd
from swh.scheduler import get_scheduler
+from swh.scheduler.utils import utcnow
logger = logging.getLogger(__name__)
-def utcnow():
- return datetime.datetime.now(tz=datetime.timezone.utc)
-
-
def get_listener(broker_url, queue_name, scheduler_backend):
connection = pika.BlockingConnection(pika.URLParameters(broker_url))
channel = connection.channel()
diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py
--- a/swh/scheduler/celery_backend/runner.py
+++ b/swh/scheduler/celery_backend/runner.py
@@ -5,13 +5,14 @@
import logging
-import arrow
from kombu.utils.uuid import uuid
from swh.core.statsd import statsd
from swh.scheduler import compute_nb_tasks_from, get_scheduler
+from swh.scheduler.utils import utcnow
logger = logging.getLogger(__name__)
+
# Max batch size for tasks
MAX_NUM_TASKS = 10000
@@ -29,7 +30,7 @@
{
'task': the scheduler's task id,
'backend_id': Celery's task id,
- 'scheduler': arrow.utcnow()
+ 'scheduler': utcnow()
}
The result can be used to block-wait for the tasks' results::
@@ -98,7 +99,7 @@
data = {
"task": task["id"],
"backend_id": backend_id,
- "scheduled": arrow.utcnow(),
+ "scheduled": utcnow(),
}
backend_tasks.append(data)
diff --git a/swh/scheduler/cli/task.py b/swh/scheduler/cli/task.py
--- a/swh/scheduler/cli/task.py
+++ b/swh/scheduler/cli/task.py
@@ -20,34 +20,18 @@
locale.setlocale(locale.LC_ALL, "")
-ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0]
-
-
-class DateTimeType(click.ParamType):
- name = "time and date"
-
- def convert(self, value, param, ctx):
- import arrow
-
- if not isinstance(value, arrow.Arrow):
- value = arrow.get(value)
-
- return value
-
-
-DATETIME = DateTimeType()
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
+DATETIME = click.DateTime()
def format_dict(d):
+ """Recursively format date objects in the dict passed as argument"""
import datetime
- import arrow
-
ret = {}
for k, v in d.items():
- if isinstance(v, (arrow.Arrow, datetime.date, datetime.datetime)):
- v = arrow.get(v).format()
+ if isinstance(v, (datetime.date, datetime.datetime)):
+ v = v.isoformat()
elif isinstance(v, dict):
v = format_dict(v)
ret[k] = v
@@ -96,7 +80,7 @@
... }
>>> print(click.unstyle(pretty_print_task(task)))
Task 1234
- Next run: ... (2019-02-21 13:52:35+00:00)
+ Next run: ... (2019-02-21T13:52:35.407818)
Interval: 1:00:00
Type: test_task
Policy: oneshot
@@ -110,7 +94,7 @@
<BLANKLINE>
>>> print(click.unstyle(pretty_print_task(task, full=True)))
Task 1234
- Next run: ... (2019-02-21 13:52:35+00:00)
+ Next run: ... (2019-02-21T13:52:35.407818)
Interval: 1:00:00
Type: test_task
Policy: oneshot
@@ -125,13 +109,13 @@
key2: 42
<BLANKLINE>
"""
- import arrow
+ import humanize
- next_run = arrow.get(task["next_run"])
+ next_run = task["next_run"]
lines = [
"%s %s\n" % (click.style("Task", bold=True), task["id"]),
click.style(" Next run: ", bold=True),
- "%s (%s)" % (next_run.humanize(locale=ARROW_LOCALE), next_run.format()),
+ "%s (%s)" % (humanize.naturaldate(next_run), next_run.isoformat()),
"\n",
click.style(" Interval: ", bold=True),
str(task["current_interval"]),
@@ -213,10 +197,10 @@
import csv
import json
- import arrow
+ from swh.scheduler.utils import utcnow
tasks = []
- now = arrow.utcnow()
+ now = utcnow()
scheduler = ctx.obj["scheduler"]
if not scheduler:
raise ValueError("Scheduler class (local/remote) must be instantiated")
@@ -230,7 +214,7 @@
"args": args,
"kwargs": kwargs,
}
- task["next_run"] = DATETIME.convert(task.get("next_run", now), None, None)
+ task["next_run"] = task.get("next_run", now)
tasks.append(task)
created = scheduler.create_tasks(tasks)
@@ -273,7 +257,7 @@
Note: if the priority is not given, the task won't have the priority set,
which is considered as the lowest priority level.
"""
- import arrow
+ from swh.scheduler.utils import utcnow
from .utils import parse_options
@@ -281,7 +265,7 @@
if not scheduler:
raise ValueError("Scheduler class (local/remote) must be instantiated")
- now = arrow.utcnow()
+ now = utcnow()
(args, kw) = parse_options(options)
task = {
@@ -289,7 +273,7 @@
"policy": policy,
"priority": priority,
"arguments": {"args": args, "kwargs": kw,},
- "next_run": DATETIME.convert(next_run or now, None, None),
+ "next_run": next_run or now,
}
created = scheduler.create_tasks([task])
@@ -587,13 +571,13 @@
swh-scheduler task respawn 1 3 12
"""
- import arrow
+ from swh.scheduler.utils import utcnow
scheduler = ctx.obj["scheduler"]
if not scheduler:
raise ValueError("Scheduler class (local/remote) must be instantiated")
if next_run is None:
- next_run = arrow.utcnow()
+ next_run = utcnow()
output = []
scheduler.set_status_tasks(
@@ -678,10 +662,9 @@
"""
from itertools import groupby
- import arrow
-
from swh.core.utils import grouper
from swh.scheduler.backend_es import ElasticSearchBackend
+ from swh.scheduler.utils import utcnow
config = ctx.obj["config"]
scheduler = ctx.obj["scheduler"]
@@ -699,7 +682,7 @@
logger.info("**NO CLEANUP**")
es_storage = ElasticSearchBackend(**config)
- now = arrow.utcnow()
+ now = utcnow()
# Default to archive tasks from a rolling month starting the week
# prior to the current one
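
A rough sketch (assumed humanize behavior, not part of the patch) of the new "Next run" rendering in pretty_print_task: humanize.naturaldate() produces coarse wording such as "today" or "tomorrow", which is why the doctests and test regexes change from arrow's "just now" / "in a day"::

    import datetime
    import humanize

    next_run = datetime.datetime.now(tz=datetime.timezone.utc)
    # Same formatting as the patched "Next run" line above.
    print("%s (%s)" % (humanize.naturaldate(next_run), next_run.isoformat()))
    # e.g. "today (2020-12-01T10:20:30.123456+00:00)"
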
diff --git a/swh/scheduler/tests/es/test_cli_task.py b/swh/scheduler/tests/es/test_cli_task.py
--- a/swh/scheduler/tests/es/test_cli_task.py
+++ b/swh/scheduler/tests/es/test_cli_task.py
@@ -8,11 +8,11 @@
import random
import uuid
-import arrow
from click.testing import CliRunner
import pytest
from swh.scheduler.cli import cli
+from swh.scheduler.utils import utcnow
from ..common import TASK_TYPES, TEMPLATES, tasks_from_template
@@ -28,7 +28,7 @@
for tt in TASK_TYPES.values():
scheduler.create_task_type(tt)
- next_run_start = arrow.utcnow().datetime - datetime.timedelta(days=1)
+ next_run_start = utcnow() - datetime.timedelta(days=1)
recurring = tasks_from_template(template_git, next_run_start, 100)
oneshots = tasks_from_template(
diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py
--- a/swh/scheduler/tests/test_cli.py
+++ b/swh/scheduler/tests/test_cli.py
@@ -16,7 +16,7 @@
from swh.core.api.classes import stream_results
from swh.model.model import Origin
from swh.scheduler.cli import cli
-from swh.scheduler.utils import create_task_dict
+from swh.scheduler.utils import create_task_dict, utcnow
from swh.storage import get_storage
CLI_CONFIG = """
@@ -47,10 +47,10 @@
def test_schedule_tasks(swh_scheduler):
csv_data = (
b'swh-test-ping;[["arg1", "arg2"]];{"key": "value"};'
- + datetime.datetime.utcnow().isoformat().encode()
+ + utcnow().isoformat().encode()
+ b"\n"
+ b'swh-test-ping;[["arg3", "arg4"]];{"key": "value"};'
- + datetime.datetime.utcnow().isoformat().encode()
+ + utcnow().isoformat().encode()
+ b"\n"
)
with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd:
@@ -63,7 +63,7 @@
Created 2 tasks
Task 1
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: recurring
@@ -73,7 +73,7 @@
key: 'value'
Task 2
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: recurring
@@ -114,7 +114,7 @@
Created 1 tasks
Task 1
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -139,7 +139,7 @@
Created 1 tasks
Task 1
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: recurring
@@ -177,7 +177,7 @@
Found 1 swh-test-ping tasks
Task 1
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -229,7 +229,7 @@
Found 1 swh-test-ping tasks
Task 2
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -261,7 +261,7 @@
Found 2 swh-test-ping tasks
Task 1
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -270,7 +270,7 @@
key: 'value0'
Task 2
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -279,7 +279,7 @@
key: 'value1'
Task 3
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -315,7 +315,7 @@
Found 1 swh-test-ping tasks
Task 2
- Next run: in a day \(.*\)
+ Next run: tomorrow \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -342,7 +342,7 @@
Found 2 tasks
Task 1
- Next run: in 3 days \(.*\)
+ Next run: .+ \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -353,7 +353,7 @@
key: 'value1'
Task 2
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -380,7 +380,7 @@
Found 1 tasks
Task 2
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -409,7 +409,7 @@
Found 2 tasks
Task 2
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -420,7 +420,7 @@
key: 'value2'
Task 3
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -449,7 +449,7 @@
Found 2 tasks
Task 1
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -460,7 +460,7 @@
key: 'value1'
Task 3
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -487,7 +487,7 @@
Found 2 tasks
Task 1
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -498,7 +498,7 @@
key: 'value1'
Task 2
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -536,7 +536,7 @@
Found 1 tasks
Task 2
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -574,7 +574,7 @@
Found 1 tasks
Task 1
- Next run: in 3 days \(.*\)
+ Next run: .+ \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -11,16 +11,18 @@
from typing import Any, Dict, List, Optional
import uuid
-from arrow import utcnow
import attr
import pytest
from swh.scheduler.exc import StaleData
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin, ListedOriginPageToken
+from swh.scheduler.utils import utcnow
from .common import LISTERS, TASK_TYPES, TEMPLATES, tasks_from_template
+ONEDAY = datetime.timedelta(days=1)
+
def subdict(d, keys=None, excl=()):
if keys is None:
@@ -394,10 +396,10 @@
# no pagination scenario
# retrieve tasks to archive
- after = _time.shift(days=-1)
- after_ts = after.format("YYYY-MM-DD")
- before = utcnow().shift(days=1)
- before_ts = before.format("YYYY-MM-DD")
+ after = _time - ONEDAY
+ after_ts = after.strftime("%Y-%m-%d")
+ before = utcnow() + ONEDAY
+ before_ts = before.strftime("%Y-%m-%d")
tasks_result = swh_scheduler.filter_task_to_archive(
after_ts=after_ts, before_ts=before_ts, limit=total_tasks
)
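
For reference, a small stdlib-only sketch mirroring the hunk above (values shown are illustrative): arrow's shift(days=±1) and format("YYYY-MM-DD") translate to timedelta arithmetic and strftime::

    import datetime

    ONEDAY = datetime.timedelta(days=1)
    _time = datetime.datetime.now(tz=datetime.timezone.utc)

    after_ts = (_time - ONEDAY).strftime("%Y-%m-%d")   # e.g. "2020-11-30"
    before_ts = (_time + ONEDAY).strftime("%Y-%m-%d")  # e.g. "2020-12-02"
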
diff --git a/swh/scheduler/utils.py b/swh/scheduler/utils.py
--- a/swh/scheduler/utils.py
+++ b/swh/scheduler/utils.py
@@ -7,6 +7,10 @@
from datetime import datetime, timezone
+def utcnow():
+ return datetime.now(tz=timezone.utc)
+
+
def get_task(task_name):
"""Retrieve task object in our application instance by its fully
qualified python name.
@@ -50,7 +54,7 @@
task = {
"policy": policy,
"type": type,
- "next_run": datetime.now(tz=timezone.utc),
+ "next_run": utcnow(),
"arguments": {
"args": args if args else [],
"kwargs": kwargs if kwargs else {},