Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9338850
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
19 KB
Subscribers
None
View Options
diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py
index 2771aca..64df67a 100644
--- a/swh/scheduler/cli/origin.py
+++ b/swh/scheduler/cli/origin.py
@@ -1,102 +1,136 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
from typing import TYPE_CHECKING, Iterable, List, Optional
import click
from . import cli
if TYPE_CHECKING:
from ..model import ListedOrigin
@cli.group("origin")
@click.pass_context
def origin(ctx):
    """Manipulate listed origins."""
    scheduler = ctx.obj["scheduler"]
    # A scheduler backend is mandatory for every subcommand in this group.
    if not scheduler:
        raise ValueError("Scheduler class (local/remote) must be instantiated")
def format_origins(
    origins: List[ListedOrigin],
    fields: Optional[List[str]] = None,
    with_header: bool = True,
) -> Iterable[str]:
    """Format a list of origins as CSV.

    Arguments:
      origins: list of origins to output
      fields: optional list of fields to output (defaults to all fields)
      with_header: if True, output a CSV header.
    """
    import csv
    from io import StringIO

    import attr

    from ..model import ListedOrigin

    known_fields = [f.name for f in attr.fields(ListedOrigin)]
    selected = fields or known_fields

    unknown = set(selected) - set(known_fields)
    if unknown:
        raise ValueError("Unknown ListedOrigin field(s): %s" % ", ".join(unknown))

    buffer = StringIO()
    writer = csv.writer(buffer)

    def render_row(values):
        """Format a single CSV row, then reset the shared buffer so it stays
        reasonably sized."""
        writer.writerow(values)
        buffer.seek(0)
        row = buffer.read().rstrip()
        buffer.seek(0)
        buffer.truncate()
        return row

    if with_header:
        yield render_row(selected)
    for origin in origins:
        yield render_row(str(getattr(origin, name)) for name in selected)
@origin.command("grab-next")
@click.option(
    "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy"
)
@click.option(
    "--fields", "-f", default=None, help="Listed origin fields to print on output"
)
@click.option(
    "--with-header/--without-header",
    is_flag=True,
    default=True,
    help="Print the CSV header?",
)
@click.argument("count", type=int)
@click.pass_context
def grab_next(ctx, policy: str, fields: Optional[str], with_header: bool, count: int):
    """Grab the next COUNT origins to visit from the listed origins table."""
    # --fields is a single comma-separated string on the command line
    parsed_fields: Optional[List[str]] = fields.split(",") if fields else None

    scheduler = ctx.obj["scheduler"]
    grabbed = scheduler.grab_next_visits(count, policy=policy)
    for line in format_origins(grabbed, fields=parsed_fields, with_header=with_header):
        click.echo(line)
+
+
@origin.command("schedule-next")
@click.option(
    "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy"
)
@click.argument("count", type=int)
@click.pass_context
def schedule_next(ctx, policy: str, count: int):
    """Send the next COUNT origin visits to the scheduler as one-shot tasks."""
    from ..utils import utcnow
    from .task import pretty_print_task

    scheduler = ctx.obj["scheduler"]

    task_dicts = []
    for origin in scheduler.grab_next_visits(count, policy=policy):
        # Build one immediate, single-attempt task per grabbed origin.
        task = origin.as_task_dict()
        task["policy"] = "oneshot"
        task["next_run"] = utcnow()
        task["retries_left"] = 1
        task_dicts.append(task)

    created = scheduler.create_tasks(task_dicts)

    # Summary line first (its trailing \n leaves a blank separator line),
    # then one pretty-printed entry per created task.
    output = ["Created %d tasks\n" % len(created)]
    output.extend(pretty_print_task(task) for task in created)

    click.echo_via_pager("\n".join(output))
diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py
index f404ff9..d425b9e 100644
--- a/swh/scheduler/model.py
+++ b/swh/scheduler/model.py
@@ -1,235 +1,244 @@
# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from typing import Any, Dict, List, Optional, Tuple, Union
from uuid import UUID
import attr
import attr.converters
from attrs_strict import type_validator
def check_timestamptz(value) -> None:
    """Checks the date has a timezone.

    Accepts ``None`` (unset) silently; raises ValueError for a naive datetime.
    """
    if value is None:
        return
    if value.tzinfo is None:
        raise ValueError("date must be a timezone-aware datetime.")
@attr.s
class BaseSchedulerModel:
    """Base class for database-backed objects.

    These database-backed objects are defined through attrs-based attributes
    that match the columns of the database 1:1. This is a (very) lightweight
    ORM.

    These attrs-based attributes have metadata specific to the functionality
    expected from these fields in the database:

     - `primary_key`: the column is a primary key; it should be filtered out
       when doing an `update` of the object
     - `auto_primary_key`: the column is a primary key, which is automatically handled
       by the database. It will not be inserted to. This must be matched with a
       database-side default value.
     - `auto_now_add`: the column is a timestamp that is set to the current time when
       the object is inserted, and never updated afterwards. This must be matched with
       a database-side default value.
     - `auto_now`: the column is a timestamp that is set to the current time when
       the object is inserted or updated.
    """

    # Lazily-computed per-class caches, filled by the classmethods below.
    _pk_cols: Optional[Tuple[str, ...]] = None
    _select_cols: Optional[Tuple[str, ...]] = None
    _insert_cols_and_metavars: Optional[Tuple[Tuple[str, ...], Tuple[str, ...]]] = None

    @classmethod
    def primary_key_columns(cls) -> Tuple[str, ...]:
        """Get the primary key columns for this object type"""
        if cls._pk_cols is None:
            # Both explicit and database-generated primary keys count here.
            pk_flags = ("auto_primary_key", "primary_key")
            cls._pk_cols = tuple(
                sorted(
                    field.name
                    for field in attr.fields(cls)
                    if any(field.metadata.get(flag) for flag in pk_flags)
                )
            )
        return cls._pk_cols

    @classmethod
    def select_columns(cls) -> Tuple[str, ...]:
        """Get all the database columns needed for a `select` on this object type"""
        if cls._select_cols is None:
            cls._select_cols = tuple(
                sorted(field.name for field in attr.fields(cls))
            )
        return cls._select_cols

    @classmethod
    def insert_columns_and_metavars(cls) -> Tuple[Tuple[str, ...], Tuple[str, ...]]:
        """Get the database columns and metavars needed for an `insert` or `update` on
        this object type.

        This implements support for the `auto_*` field metadata attributes.
        """
        if cls._insert_cols_and_metavars is None:
            pairs: List[Tuple[str, str]] = []
            for field in attr.fields(cls):
                metadata = field.metadata
                if metadata.get("auto_now_add") or metadata.get("auto_primary_key"):
                    # Value is generated database-side; never inserted to.
                    continue
                if metadata.get("auto_now"):
                    # Timestamp refreshed database-side on every write.
                    pairs.append((field.name, "now()"))
                else:
                    pairs.append((field.name, f"%({field.name})s"))
            pairs.sort()
            cols, metavars = zip(*pairs)
            cls._insert_cols_and_metavars = cols, metavars
        return cls._insert_cols_and_metavars
@attr.s
class Lister(BaseSchedulerModel):
    """A lister instance registered with the scheduler database."""

    # Lister identity -- presumably (name, instance_name) identifies a
    # deployed lister; confirm against the database schema.
    name = attr.ib(type=str, validator=[type_validator()])
    instance_name = attr.ib(type=str, validator=[type_validator()])

    # Populated by database
    id = attr.ib(
        type=Optional[UUID],
        validator=type_validator(),
        default=None,
        metadata={"auto_primary_key": True},
    )

    # Free-form dict -- presumably lister checkpoint/state between runs;
    # verify against the lister implementations.
    current_state = attr.ib(
        type=Dict[str, Any], validator=[type_validator()], factory=dict
    )

    # Timestamps maintained database-side (see the auto_now_add/auto_now
    # metadata flags documented on BaseSchedulerModel).
    created = attr.ib(
        type=Optional[datetime.datetime],
        validator=[type_validator()],
        default=None,
        metadata={"auto_now_add": True},
    )
    updated = attr.ib(
        type=Optional[datetime.datetime],
        validator=[type_validator()],
        default=None,
        metadata={"auto_now": True},
    )
@attr.s
class ListedOrigin(BaseSchedulerModel):
    """Basic information about a listed origin, output by a lister"""

    # Composite primary key: (lister_id, url, visit_type).
    lister_id = attr.ib(
        type=UUID, validator=[type_validator()], metadata={"primary_key": True}
    )
    url = attr.ib(
        type=str, validator=[type_validator()], metadata={"primary_key": True}
    )
    visit_type = attr.ib(
        type=str, validator=[type_validator()], metadata={"primary_key": True}
    )

    # Extra kwargs forwarded to the loader task, alongside the url
    # (see as_task_dict below).
    extra_loader_arguments = attr.ib(
        type=Dict[str, str], validator=[type_validator()], factory=dict
    )

    # NOTE(review): presumably "last update upstream" vs "last time a visit
    # was scheduled" -- confirm semantics against the scheduler backend.
    last_update = attr.ib(
        type=Optional[datetime.datetime], validator=[type_validator()], default=None,
    )
    last_scheduled = attr.ib(
        type=Optional[datetime.datetime], validator=[type_validator()], default=None,
    )

    enabled = attr.ib(type=bool, validator=[type_validator()], default=True)

    # Timestamps maintained database-side (auto_now_add / auto_now metadata).
    first_seen = attr.ib(
        type=Optional[datetime.datetime],
        validator=[type_validator()],
        default=None,
        metadata={"auto_now_add": True},
    )
    last_seen = attr.ib(
        type=Optional[datetime.datetime],
        validator=[type_validator()],
        default=None,
        metadata={"auto_now": True},
    )

    def as_task_dict(self):
        """Return a scheduler task dict to load this origin.

        The task type is derived from the visit type (``load-<visit_type>``);
        the origin url and any extra loader arguments are passed as kwargs.
        """
        return {
            "type": f"load-{self.visit_type}",
            "arguments": {
                "args": [],
                "kwargs": {"url": self.url, **self.extra_loader_arguments},
            },
        }
+
ListedOriginPageToken = Tuple[UUID, str]


def convert_listed_origin_page_token(
    input: Union[None, ListedOriginPageToken, List[Union[UUID, str]]]
) -> Optional[ListedOriginPageToken]:
    """Coerce a page token into its canonical tuple form.

    ``None`` and tuples pass through unchanged; a two-element list (as
    produced by deserialization) is type-checked and converted.
    """
    if input is None or isinstance(input, tuple):
        return input
    lister_id, url = input
    assert isinstance(lister_id, UUID)
    assert isinstance(url, str)
    return (lister_id, url)
@attr.s
class PaginatedListedOriginList(BaseSchedulerModel):
    """A list of listed origins, with a continuation token"""

    origins = attr.ib(type=List[ListedOrigin], validator=[type_validator()])

    # Presumably None when there is no further page -- confirm with the
    # backend. The converter normalizes a deserialized [UUID, str] list
    # back into the canonical tuple form.
    next_page_token = attr.ib(
        type=Optional[ListedOriginPageToken],
        validator=[type_validator()],
        converter=convert_listed_origin_page_token,
        default=None,
    )
@attr.s(frozen=True, slots=True)
class OriginVisitStats(BaseSchedulerModel):
    """Represents an aggregated origin visits view.
    """

    # Primary key: (url, visit_type).
    url = attr.ib(
        type=str, validator=[type_validator()], metadata={"primary_key": True}
    )
    visit_type = attr.ib(
        type=str, validator=[type_validator()], metadata={"primary_key": True}
    )

    # All three timestamps must be timezone-aware when set; enforced by the
    # check_timestamptz validators below.
    last_eventful = attr.ib(
        type=Optional[datetime.datetime], validator=type_validator()
    )
    last_uneventful = attr.ib(
        type=Optional[datetime.datetime], validator=type_validator()
    )
    last_failed = attr.ib(type=Optional[datetime.datetime], validator=type_validator())

    @last_eventful.validator
    def check_last_eventful(self, attribute, value):
        check_timestamptz(value)

    @last_uneventful.validator
    def check_last_uneventful(self, attribute, value):
        check_timestamptz(value)

    @last_failed.validator
    def check_last_failed(self, attribute, value):
        check_timestamptz(value)
diff --git a/swh/scheduler/tests/test_cli_origin.py b/swh/scheduler/tests/test_cli_origin.py
index 87812ad..b570484 100644
--- a/swh/scheduler/tests/test_cli_origin.py
+++ b/swh/scheduler/tests/test_cli_origin.py
@@ -1,76 +1,103 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Tuple
import pytest
from swh.scheduler.cli.origin import format_origins
+from swh.scheduler.tests.common import TASK_TYPES
from swh.scheduler.tests.test_cli import invoke as basic_invoke
def invoke(scheduler, args: Tuple[str, ...] = (), catch_exceptions: bool = False):
    """Run the ``swh scheduler origin`` CLI with the given subcommand args."""
    full_args = ["origin", *args]
    return basic_invoke(scheduler, args=full_args, catch_exceptions=catch_exceptions)
def test_cli_origin(swh_scheduler):
    """Check that swh scheduler origin returns its help text"""
    # No subcommand: click prints the group help, which lists its commands.
    assert "Commands:" in invoke(swh_scheduler).stdout
def test_format_origins_basic(listed_origins):
    """Default formatting: one header line plus one line per origin."""
    origins = listed_origins[:100]

    with_header = list(format_origins(origins))
    # 1 header line + all origins
    assert len(with_header) == len(origins) + 1

    # Disabling the header must only drop the first line.
    without_header = list(format_origins(origins, with_header=False))
    assert with_header[1:] == without_header
def test_format_origins_fields_unknown(listed_origins):
    """An unknown field name is rejected with a ValueError."""
    origins = listed_origins[:10]

    # format_origins is a generator: validation only runs on first next().
    rows = format_origins(origins, fields=["unknown_field"])
    with pytest.raises(ValueError, match="unknown_field"):
        next(rows)
def test_format_origins_fields(listed_origins):
    """Selecting fields restricts and orders the CSV columns."""
    origins = listed_origins[:10]
    fields = ["lister_id", "url", "visit_type"]

    output = list(format_origins(origins, fields=fields))

    # Header first, then the selected fields of each origin, comma-joined.
    assert output[0] == ",".join(fields)
    for row, origin in zip(output[1:], origins):
        assert row == f"{origin.lister_id},{origin.url},{origin.visit_type}"
def test_grab_next(swh_scheduler, listed_origins):
    """grab-next prints a CSV of origins taken from the recorded ones."""
    num_origins = 10
    assert len(listed_origins) >= num_origins

    swh_scheduler.record_listed_origins(listed_origins)

    result = invoke(swh_scheduler, args=("grab-next", str(num_origins)))
    assert result.exit_code == 0

    header, *rows = result.stdout.splitlines()
    assert len(rows) == num_origins

    fields = header.split(",")
    returned_origins = [dict(zip(fields, row.split(","))) for row in rows]

    # Check that we've received origins we had listed in the first place
    known_urls = {origin.url for origin in listed_origins}
    assert {origin["url"] for origin in returned_origins} <= known_urls
+
+
def test_schedule_next(swh_scheduler, listed_origins):
    """schedule-next creates one one-shot task per grabbed origin."""
    # Task types must be registered before create_tasks can be called.
    for task_type in TASK_TYPES.values():
        swh_scheduler.create_task_type(task_type)

    num_origins = 10
    assert len(listed_origins) >= num_origins

    swh_scheduler.record_listed_origins(listed_origins)

    result = invoke(swh_scheduler, args=("schedule-next", str(num_origins)))
    assert result.exit_code == 0

    # pull all tasks out of the scheduler
    tasks = swh_scheduler.search_tasks()
    assert len(tasks) == num_origins

    # Every created task must match some recorded origin's (type, url).
    scheduled_tasks = {
        (task["type"], task["arguments"]["kwargs"]["url"]) for task in tasks
    }
    all_possible_tasks = {
        (f"load-{origin.visit_type}", origin.url) for origin in listed_origins
    }
    assert scheduled_tasks <= all_possible_tasks
diff --git a/swh/scheduler/tests/test_model.py b/swh/scheduler/tests/test_model.py
index 47bb618..1293e64 100644
--- a/swh/scheduler/tests/test_model.py
+++ b/swh/scheduler/tests/test_model.py
@@ -1,94 +1,123 @@
-# Copyright (C) 2020 The Software Heritage developers
+# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
+import uuid
import attr
from swh.scheduler import model
def test_select_columns():
    @attr.s
    class TestModel(model.BaseSchedulerModel):
        id = attr.ib(type=str)
        test1 = attr.ib(type=str)
        a_first_attr = attr.ib(type=str)

        @property
        def test2(self):
            """This property should not show up in the extracted columns"""
            return self.test1

    # Only attrs fields are returned, sorted alphabetically.
    assert TestModel.select_columns() == ("a_first_attr", "id", "test1")
def test_insert_columns():
    @attr.s
    class TestModel(model.BaseSchedulerModel):
        id = attr.ib(type=str)
        test1 = attr.ib(type=str)

        @property
        def test2(self):
            """This property should not show up in the extracted columns"""
            return self.test1

    # Each plain column maps to a %(name)s metavar.
    assert TestModel.insert_columns_and_metavars() == (
        ("id", "test1"),
        ("%(id)s", "%(test1)s"),
    )
def test_insert_columns_auto_now_add():
    @attr.s
    class TestModel(model.BaseSchedulerModel):
        id = attr.ib(type=str)
        test1 = attr.ib(type=str)
        added = attr.ib(type=datetime.datetime, metadata={"auto_now_add": True})

    # auto_now_add columns are excluded from inserts entirely.
    assert TestModel.insert_columns_and_metavars() == (
        ("id", "test1"),
        ("%(id)s", "%(test1)s"),
    )
def test_insert_columns_auto_now():
    @attr.s
    class TestModel(model.BaseSchedulerModel):
        id = attr.ib(type=str)
        test1 = attr.ib(type=str)
        updated = attr.ib(type=datetime.datetime, metadata={"auto_now": True})

    # auto_now columns are inserted with a database-side now().
    assert TestModel.insert_columns_and_metavars() == (
        ("id", "test1", "updated"),
        ("%(id)s", "%(test1)s", "now()"),
    )
def test_insert_columns_primary_key():
    @attr.s
    class TestModel(model.BaseSchedulerModel):
        id = attr.ib(type=str, metadata={"auto_primary_key": True})
        test1 = attr.ib(type=str)

    # Database-generated primary keys are never inserted to.
    assert TestModel.insert_columns_and_metavars() == (("test1",), ("%(test1)s",))
def test_insert_primary_key():
    @attr.s
    class TestModel(model.BaseSchedulerModel):
        id = attr.ib(type=str, metadata={"auto_primary_key": True})
        test1 = attr.ib(type=str)

    assert TestModel.primary_key_columns() == ("id",)

    # Composite primary keys are returned sorted.
    @attr.s
    class TestModel2(model.BaseSchedulerModel):
        col1 = attr.ib(type=str, metadata={"primary_key": True})
        col2 = attr.ib(type=str, metadata={"primary_key": True})
        test1 = attr.ib(type=str)

    assert TestModel2.primary_key_columns() == ("col1", "col2")
+
+
def test_listed_origin_as_task_dict():
    # Without extra loader arguments, only the url ends up in the kwargs.
    origin = model.ListedOrigin(
        lister_id=uuid.uuid4(), url="http://example.com/", visit_type="git",
    )

    task = origin.as_task_dict()
    assert task == {
        "type": "load-git",
        "arguments": {"args": [], "kwargs": {"url": "http://example.com/"}},
    }

    # Extra loader arguments are merged into the kwargs next to the url.
    origin_w_args = model.ListedOrigin(
        lister_id=uuid.uuid4(),
        url="http://example.com/svn/",
        visit_type="svn",
        extra_loader_arguments={"foo": "bar"},
    )

    task_w_args = origin_w_args.as_task_dict()
    assert task_w_args == {
        "type": "load-svn",
        "arguments": {
            "args": [],
            "kwargs": {"url": "http://example.com/svn/", "foo": "bar"},
        },
    }
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Jul 4 2025, 9:10 AM (6 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3248221
Attached To
rDSCH Scheduling utilities
Event Timeline
Log In to Comment