Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/model.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
from typing import Any, Dict, List, Optional, Tuple, Union | from typing import Any, Dict, List, Optional, Tuple, Union | ||||
from uuid import UUID | from uuid import UUID | ||||
▲ Show 20 Lines • Show All 150 Lines • ▼ Show 20 Lines | class ListedOrigin(BaseSchedulerModel): | ||||
) | ) | ||||
last_seen = attr.ib( | last_seen = attr.ib( | ||||
type=Optional[datetime.datetime], | type=Optional[datetime.datetime], | ||||
validator=[type_validator()], | validator=[type_validator()], | ||||
default=None, | default=None, | ||||
metadata={"auto_now": True}, | metadata={"auto_now": True}, | ||||
) | ) | ||||
def as_task_dict(self): | |||||
return { | |||||
"type": f"load-{self.visit_type}", | |||||
"arguments": { | |||||
"args": [], | |||||
"kwargs": {"url": self.url, **self.extra_loader_arguments}, | |||||
}, | |||||
} | |||||
ListedOriginPageToken = Tuple[UUID, str] | ListedOriginPageToken = Tuple[UUID, str] | ||||
def convert_listed_origin_page_token( | def convert_listed_origin_page_token( | ||||
input: Union[None, ListedOriginPageToken, List[Union[UUID, str]]] | input: Union[None, ListedOriginPageToken, List[Union[UUID, str]]] | ||||
) -> Optional[ListedOriginPageToken]: | ) -> Optional[ListedOriginPageToken]: | ||||
if input is None: | if input is None: | ||||
Show All 22 Lines |