Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9749649
D4642.id16505.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
15 KB
Subscribers
None
D4642.id16505.diff
View Options
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -14,6 +14,9 @@
[mypy-elasticsearch.*]
ignore_missing_imports = True
+[mypy-humanize.*]
+ignore_missing_imports = True
+
[mypy-kombu.*]
ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,13 +2,13 @@
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
-arrow
attrs
attrs-strict
celery >= 4.3
Click
elasticsearch > 5.4
flask
+humanize
pika >= 1.1.0
psycopg2
pyyaml
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -8,14 +8,13 @@
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from uuid import UUID
-from arrow import Arrow, utcnow
import attr
-from psycopg2.extensions import AsIs
import psycopg2.extras
import psycopg2.pool
from swh.core.db import BaseDb
from swh.core.db.common import db_transaction
+from swh.scheduler.utils import utcnow
from .exc import StaleData
from .model import (
@@ -28,12 +27,7 @@
logger = logging.getLogger(__name__)
-def adapt_arrow(arrow):
- return AsIs("'%s'::timestamptz" % arrow.isoformat())
-
-
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
-psycopg2.extensions.register_adapter(Arrow, adapt_arrow)
psycopg2.extras.register_uuid()
diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py
--- a/swh/scheduler/celery_backend/listener.py
+++ b/swh/scheduler/celery_backend/listener.py
@@ -8,13 +8,13 @@
import sys
import time
-from arrow import utcnow
import celery
from celery.events import EventReceiver
import click
from kombu import Queue
from swh.core.statsd import statsd
+from swh.scheduler.utils import utcnow
class ReliableEventReceiver(EventReceiver):
diff --git a/swh/scheduler/celery_backend/pika_listener.py b/swh/scheduler/celery_backend/pika_listener.py
--- a/swh/scheduler/celery_backend/pika_listener.py
+++ b/swh/scheduler/celery_backend/pika_listener.py
@@ -3,7 +3,6 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import datetime
import json
import logging
import sys
@@ -12,14 +11,11 @@
from swh.core.statsd import statsd
from swh.scheduler import get_scheduler
+from swh.scheduler.utils import utcnow
logger = logging.getLogger(__name__)
-def utcnow():
- return datetime.datetime.now(tz=datetime.timezone.utc)
-
-
def get_listener(broker_url, queue_name, scheduler_backend):
connection = pika.BlockingConnection(pika.URLParameters(broker_url))
channel = connection.channel()
diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py
--- a/swh/scheduler/celery_backend/runner.py
+++ b/swh/scheduler/celery_backend/runner.py
@@ -5,13 +5,14 @@
import logging
-import arrow
from kombu.utils.uuid import uuid
from swh.core.statsd import statsd
from swh.scheduler import compute_nb_tasks_from, get_scheduler
+from swh.scheduler.utils import utcnow
logger = logging.getLogger(__name__)
+
# Max batch size for tasks
MAX_NUM_TASKS = 10000
@@ -29,7 +30,7 @@
{
'task': the scheduler's task id,
'backend_id': Celery's task id,
- 'scheduler': arrow.utcnow()
+ 'scheduler': utcnow()
}
The result can be used to block-wait for the tasks' results::
@@ -98,7 +99,7 @@
data = {
"task": task["id"],
"backend_id": backend_id,
- "scheduled": arrow.utcnow(),
+ "scheduled": utcnow(),
}
backend_tasks.append(data)
diff --git a/swh/scheduler/cli/task.py b/swh/scheduler/cli/task.py
--- a/swh/scheduler/cli/task.py
+++ b/swh/scheduler/cli/task.py
@@ -20,34 +20,18 @@
locale.setlocale(locale.LC_ALL, "")
-ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0]
-
-
-class DateTimeType(click.ParamType):
- name = "time and date"
-
- def convert(self, value, param, ctx):
- import arrow
-
- if not isinstance(value, arrow.Arrow):
- value = arrow.get(value)
-
- return value
-
-
-DATETIME = DateTimeType()
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
+DATETIME = click.DateTime()
def format_dict(d):
+ """Recursively format date objects in the dict passed as argument"""
import datetime
- import arrow
-
ret = {}
for k, v in d.items():
- if isinstance(v, (arrow.Arrow, datetime.date, datetime.datetime)):
- v = arrow.get(v).format()
+ if isinstance(v, (datetime.date, datetime.datetime)):
+ v = v.isoformat()
elif isinstance(v, dict):
v = format_dict(v)
ret[k] = v
@@ -96,7 +80,7 @@
... }
>>> print(click.unstyle(pretty_print_task(task)))
Task 1234
- Next run: ... (2019-02-21 13:52:35+00:00)
+ Next run: ... (2019-02-21T13:52:35.407818)
Interval: 1:00:00
Type: test_task
Policy: oneshot
@@ -110,7 +94,7 @@
<BLANKLINE>
>>> print(click.unstyle(pretty_print_task(task, full=True)))
Task 1234
- Next run: ... (2019-02-21 13:52:35+00:00)
+ Next run: ... (2019-02-21T13:52:35.407818)
Interval: 1:00:00
Type: test_task
Policy: oneshot
@@ -125,13 +109,13 @@
key2: 42
<BLANKLINE>
"""
- import arrow
+ import humanize
- next_run = arrow.get(task["next_run"])
+ next_run = task["next_run"]
lines = [
"%s %s\n" % (click.style("Task", bold=True), task["id"]),
click.style(" Next run: ", bold=True),
- "%s (%s)" % (next_run.humanize(locale=ARROW_LOCALE), next_run.format()),
+ "%s (%s)" % (humanize.naturaldate(next_run), next_run.isoformat()),
"\n",
click.style(" Interval: ", bold=True),
str(task["current_interval"]),
@@ -213,10 +197,10 @@
import csv
import json
- import arrow
+ from swh.scheduler.utils import utcnow
tasks = []
- now = arrow.utcnow()
+ now = utcnow()
scheduler = ctx.obj["scheduler"]
if not scheduler:
raise ValueError("Scheduler class (local/remote) must be instantiated")
@@ -230,7 +214,7 @@
"args": args,
"kwargs": kwargs,
}
- task["next_run"] = DATETIME.convert(task.get("next_run", now), None, None)
+ task["next_run"] = task.get("next_run", now)
tasks.append(task)
created = scheduler.create_tasks(tasks)
@@ -273,7 +257,7 @@
Note: if the priority is not given, the task won't have the priority set,
which is considered as the lowest priority level.
"""
- import arrow
+ from swh.scheduler.utils import utcnow
from .utils import parse_options
@@ -281,7 +265,7 @@
if not scheduler:
raise ValueError("Scheduler class (local/remote) must be instantiated")
- now = arrow.utcnow()
+ now = utcnow()
(args, kw) = parse_options(options)
task = {
@@ -289,7 +273,7 @@
"policy": policy,
"priority": priority,
"arguments": {"args": args, "kwargs": kw,},
- "next_run": DATETIME.convert(next_run or now, None, None),
+ "next_run": next_run or now,
}
created = scheduler.create_tasks([task])
@@ -587,13 +571,13 @@
swh-scheduler task respawn 1 3 12
"""
- import arrow
+ from swh.scheduler.utils import utcnow
scheduler = ctx.obj["scheduler"]
if not scheduler:
raise ValueError("Scheduler class (local/remote) must be instantiated")
if next_run is None:
- next_run = arrow.utcnow()
+ next_run = utcnow()
output = []
scheduler.set_status_tasks(
@@ -678,10 +662,9 @@
"""
from itertools import groupby
- import arrow
-
from swh.core.utils import grouper
from swh.scheduler.backend_es import ElasticSearchBackend
+ from swh.scheduler.utils import utcnow
config = ctx.obj["config"]
scheduler = ctx.obj["scheduler"]
@@ -699,7 +682,7 @@
logger.info("**NO CLEANUP**")
es_storage = ElasticSearchBackend(**config)
- now = arrow.utcnow()
+ now = utcnow()
# Default to archive tasks from a rolling month starting the week
# prior to the current one
diff --git a/swh/scheduler/tests/es/test_cli_task.py b/swh/scheduler/tests/es/test_cli_task.py
--- a/swh/scheduler/tests/es/test_cli_task.py
+++ b/swh/scheduler/tests/es/test_cli_task.py
@@ -8,11 +8,11 @@
import random
import uuid
-import arrow
from click.testing import CliRunner
import pytest
from swh.scheduler.cli import cli
+from swh.scheduler.utils import utcnow
from ..common import TASK_TYPES, TEMPLATES, tasks_from_template
@@ -28,7 +28,7 @@
for tt in TASK_TYPES.values():
scheduler.create_task_type(tt)
- next_run_start = arrow.utcnow().datetime - datetime.timedelta(days=1)
+ next_run_start = utcnow() - datetime.timedelta(days=1)
recurring = tasks_from_template(template_git, next_run_start, 100)
oneshots = tasks_from_template(
diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py
--- a/swh/scheduler/tests/test_cli.py
+++ b/swh/scheduler/tests/test_cli.py
@@ -16,7 +16,7 @@
from swh.core.api.classes import stream_results
from swh.model.model import Origin
from swh.scheduler.cli import cli
-from swh.scheduler.utils import create_task_dict
+from swh.scheduler.utils import create_task_dict, utcnow
from swh.storage import get_storage
CLI_CONFIG = """
@@ -47,10 +47,10 @@
def test_schedule_tasks(swh_scheduler):
csv_data = (
b'swh-test-ping;[["arg1", "arg2"]];{"key": "value"};'
- + datetime.datetime.utcnow().isoformat().encode()
+ + utcnow().isoformat().encode()
+ b"\n"
+ b'swh-test-ping;[["arg3", "arg4"]];{"key": "value"};'
- + datetime.datetime.utcnow().isoformat().encode()
+ + utcnow().isoformat().encode()
+ b"\n"
)
with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd:
@@ -63,7 +63,7 @@
Created 2 tasks
Task 1
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: recurring
@@ -73,7 +73,7 @@
key: 'value'
Task 2
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: recurring
@@ -114,7 +114,7 @@
Created 1 tasks
Task 1
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -139,7 +139,7 @@
Created 1 tasks
Task 1
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: recurring
@@ -177,7 +177,7 @@
Found 1 swh-test-ping tasks
Task 1
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -229,7 +229,7 @@
Found 1 swh-test-ping tasks
Task 2
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -261,7 +261,7 @@
Found 2 swh-test-ping tasks
Task 1
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -270,7 +270,7 @@
key: 'value0'
Task 2
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -279,7 +279,7 @@
key: 'value1'
Task 3
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -315,7 +315,7 @@
Found 1 swh-test-ping tasks
Task 2
- Next run: in a day \(.*\)
+ Next run: tomorrow \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -342,7 +342,7 @@
Found 2 tasks
Task 1
- Next run: in 3 days \(.*\)
+ Next run: .+ \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -353,7 +353,7 @@
key: 'value1'
Task 2
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -380,7 +380,7 @@
Found 1 tasks
Task 2
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -409,7 +409,7 @@
Found 2 tasks
Task 2
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -420,7 +420,7 @@
key: 'value2'
Task 3
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -449,7 +449,7 @@
Found 2 tasks
Task 1
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -460,7 +460,7 @@
key: 'value1'
Task 3
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -487,7 +487,7 @@
Found 2 tasks
Task 1
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -498,7 +498,7 @@
key: 'value1'
Task 2
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -536,7 +536,7 @@
Found 1 tasks
Task 2
- Next run: just now \(.*\)
+ Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
@@ -574,7 +574,7 @@
Found 1 tasks
Task 1
- Next run: in 3 days \(.*\)
+ Next run: .+ \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -11,16 +11,18 @@
from typing import Any, Dict, List, Optional
import uuid
-from arrow import utcnow
import attr
import pytest
from swh.scheduler.exc import StaleData
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin, ListedOriginPageToken
+from swh.scheduler.utils import utcnow
from .common import LISTERS, TASK_TYPES, TEMPLATES, tasks_from_template
+ONEDAY = datetime.timedelta(days=1)
+
def subdict(d, keys=None, excl=()):
if keys is None:
@@ -394,10 +396,10 @@
# no pagination scenario
# retrieve tasks to archive
- after = _time.shift(days=-1)
- after_ts = after.format("YYYY-MM-DD")
- before = utcnow().shift(days=1)
- before_ts = before.format("YYYY-MM-DD")
+ after = _time - ONEDAY
+ after_ts = after.strftime("%Y-%m-%d")
+ before = utcnow() + ONEDAY
+ before_ts = before.strftime("%Y-%m-%d")
tasks_result = swh_scheduler.filter_task_to_archive(
after_ts=after_ts, before_ts=before_ts, limit=total_tasks
)
diff --git a/swh/scheduler/utils.py b/swh/scheduler/utils.py
--- a/swh/scheduler/utils.py
+++ b/swh/scheduler/utils.py
@@ -7,6 +7,10 @@
from datetime import datetime, timezone
+def utcnow():
+ return datetime.now(tz=timezone.utc)
+
+
def get_task(task_name):
"""Retrieve task object in our application instance by its fully
qualified python name.
@@ -50,7 +54,7 @@
task = {
"policy": policy,
"type": type,
- "next_run": datetime.now(tz=timezone.utc),
+ "next_run": utcnow(),
"arguments": {
"args": args if args else [],
"kwargs": kwargs if kwargs else {},
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sun, Aug 24, 6:04 PM (1 d, 8 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3226186
Attached To
D4642: Replace usage of arrow datetime objects in favor of pure datetime ones
Event Timeline
Log In to Comment