Changeset View
Changeset View
Standalone View
Standalone View
swh/model/model.py
Show All 12 Lines | |||||
from attrs_strict import type_validator | from attrs_strict import type_validator | ||||
import dateutil.parser | import dateutil.parser | ||||
import iso8601 | import iso8601 | ||||
from typing_extensions import Final | from typing_extensions import Final | ||||
from .collections import ImmutableDict | from .collections import ImmutableDict | ||||
from .hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_bytes | from .hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_bytes | ||||
from .identifiers import ( | from .identifiers import ( | ||||
SWHID, | |||||
directory_identifier, | directory_identifier, | ||||
normalize_timestamp, | normalize_timestamp, | ||||
parse_swhid, | parse_swhid, | ||||
release_identifier, | release_identifier, | ||||
revision_identifier, | revision_identifier, | ||||
snapshot_identifier, | snapshot_identifier, | ||||
) | ) | ||||
from .swhid import SWHID, SWHIDObjectType | |||||
class MissingData(Exception): | class MissingData(Exception): | ||||
"""Raised by `Content.with_data` when it has no way of fetching the | """Raised by `Content.with_data` when it has no way of fetching the | ||||
data (but not when fetching the data fails).""" | data (but not when fetching the data fails).""" | ||||
pass | pass | ||||
▲ Show 20 Lines • Show All 232 Lines • ▼ Show 20 Lines | class Origin(BaseModel): | ||||
object_type: Final = "origin" | object_type: Final = "origin" | ||||
url = attr.ib(type=str, validator=type_validator()) | url = attr.ib(type=str, validator=type_validator()) | ||||
def unique_key(self) -> KeyType: | def unique_key(self) -> KeyType: | ||||
return {"url": self.url} | return {"url": self.url} | ||||
@property | |||||
def id(self) -> bytes: | |||||
return self.url.encode() | |||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True) | ||||
class OriginVisit(BaseModel): | class OriginVisit(BaseModel): | ||||
"""Represents an origin visit with a given type at a given point in time, by a | """Represents an origin visit with a given type at a given point in time, by a | ||||
SWH loader.""" | SWH loader.""" | ||||
object_type: Final = "origin_visit" | object_type: Final = "origin_visit" | ||||
▲ Show 20 Lines • Show All 416 Lines • ▼ Show 20 Lines | def with_data(self) -> "Content": | ||||
to lazy-load data (eg. from disk or objstorage).""" | to lazy-load data (eg. from disk or objstorage).""" | ||||
if self.data is None: | if self.data is None: | ||||
raise MissingData("Content data is None.") | raise MissingData("Content data is None.") | ||||
return self | return self | ||||
def unique_key(self) -> KeyType: | def unique_key(self) -> KeyType: | ||||
return self.sha1 # TODO: use a dict of hashes | return self.sha1 # TODO: use a dict of hashes | ||||
@property | |||||
def id(self): | |||||
return self.sha1_git | |||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True) | ||||
class SkippedContent(BaseContent): | class SkippedContent(BaseContent): | ||||
object_type: Final = "skipped_content" | object_type: Final = "skipped_content" | ||||
sha1 = attr.ib(type=Optional[bytes], validator=type_validator()) | sha1 = attr.ib(type=Optional[bytes], validator=type_validator()) | ||||
sha1_git = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | sha1_git = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | ||||
sha256 = attr.ib(type=Optional[bytes], validator=type_validator()) | sha256 = attr.ib(type=Optional[bytes], validator=type_validator()) | ||||
▲ Show 20 Lines • Show All 128 Lines • ▼ Show 20 Lines | def to_dict(self): | ||||
if d["metadata"] is None: | if d["metadata"] is None: | ||||
del d["metadata"] | del d["metadata"] | ||||
return d | return d | ||||
def unique_key(self) -> KeyType: | def unique_key(self) -> KeyType: | ||||
return {"name": self.name, "version": self.version} | return {"name": self.name, "version": self.version} | ||||
class MetadataTargetType(Enum): | |||||
"""The type of object extrinsic metadata refer to.""" | |||||
CONTENT = "content" | |||||
DIRECTORY = "directory" | |||||
REVISION = "revision" | |||||
RELEASE = "release" | |||||
SNAPSHOT = "snapshot" | |||||
ORIGIN = "origin" | |||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True) | ||||
class RawExtrinsicMetadata(BaseModel): | class RawExtrinsicMetadata(BaseModel): | ||||
object_type: Final = "raw_extrinsic_metadata" | object_type: Final = "raw_extrinsic_metadata" | ||||
# target object | # target object | ||||
type = attr.ib(type=MetadataTargetType, validator=type_validator()) | target = attr.ib(type=SWHID, validator=[type_validator()]) | ||||
target = attr.ib(type=Union[str, SWHID], validator=type_validator()) | |||||
"""URL if type=MetadataTargetType.ORIGIN, else core SWHID""" | |||||
# source | # source | ||||
discovery_date = attr.ib(type=datetime.datetime, validator=type_validator()) | discovery_date = attr.ib(type=datetime.datetime, validator=type_validator()) | ||||
authority = attr.ib(type=MetadataAuthority, validator=type_validator()) | authority = attr.ib(type=MetadataAuthority, validator=type_validator()) | ||||
fetcher = attr.ib(type=MetadataFetcher, validator=type_validator()) | fetcher = attr.ib(type=MetadataFetcher, validator=type_validator()) | ||||
# the metadata itself | # the metadata itself | ||||
format = attr.ib(type=str, validator=type_validator()) | format = attr.ib(type=str, validator=type_validator()) | ||||
metadata = attr.ib(type=bytes, validator=type_validator()) | metadata = attr.ib(type=bytes, validator=type_validator()) | ||||
# context | # context | ||||
origin = attr.ib(type=Optional[str], default=None, validator=type_validator()) | origin = attr.ib(type=Optional[SWHID], default=None, validator=[type_validator()]) | ||||
visit = attr.ib(type=Optional[int], default=None, validator=type_validator()) | visit = attr.ib(type=Optional[int], default=None, validator=[type_validator()]) | ||||
snapshot = attr.ib(type=Optional[SWHID], default=None, validator=type_validator()) | snapshot = attr.ib(type=Optional[SWHID], default=None, validator=[type_validator()]) | ||||
release = attr.ib(type=Optional[SWHID], default=None, validator=type_validator()) | release = attr.ib(type=Optional[SWHID], default=None, validator=[type_validator()]) | ||||
revision = attr.ib(type=Optional[SWHID], default=None, validator=type_validator()) | revision = attr.ib(type=Optional[SWHID], default=None, validator=[type_validator()]) | ||||
path = attr.ib(type=Optional[bytes], default=None, validator=type_validator()) | path = attr.ib(type=Optional[bytes], default=None, validator=[type_validator()]) | ||||
directory = attr.ib(type=Optional[SWHID], default=None, validator=type_validator()) | directory = attr.ib( | ||||
type=Optional[SWHID], default=None, validator=[type_validator()] | |||||
) | |||||
@property | |||||
def type(self): | |||||
return self.target.object_type | |||||
@target.validator | @target.validator | ||||
def check_target(self, attribute, value): | def check_target(self, attribute, value): | ||||
if self.type == MetadataTargetType.ORIGIN: | self._check_swhid(self.type, value) | ||||
if isinstance(value, SWHID) or value.startswith("swh:"): | |||||
raise ValueError( | |||||
"Got SWHID as target for origin metadata (expected an URL)." | |||||
) | |||||
else: | |||||
self._check_swhid(self.type.value, value) | |||||
@discovery_date.validator | @discovery_date.validator | ||||
def check_discovery_date(self, attribute, value): | def check_discovery_date(self, attribute, value): | ||||
"""Checks the discovery_date has a timezone.""" | """Checks the discovery_date has a timezone.""" | ||||
if value is not None and value.tzinfo is None: | if value is not None and value.tzinfo is None: | ||||
raise ValueError("discovery_date must be a timezone-aware datetime.") | raise ValueError("discovery_date must be a timezone-aware datetime.") | ||||
@origin.validator | @origin.validator | ||||
def check_origin(self, attribute, value): | def check_origin(self, attribute, value): | ||||
if value is None: | if value is None: | ||||
return | return | ||||
if self.type not in ( | if self.type == SWHIDObjectType.ORIGIN: | ||||
MetadataTargetType.SNAPSHOT, | raise ValueError(f"Unexpected 'origin' context for object: {value}") | ||||
MetadataTargetType.RELEASE, | |||||
MetadataTargetType.REVISION, | |||||
MetadataTargetType.DIRECTORY, | |||||
MetadataTargetType.CONTENT, | |||||
): | |||||
raise ValueError( | |||||
f"Unexpected 'origin' context for {self.type.value} object: {value}" | |||||
) | |||||
if value.startswith("swh:"): | if value.object_type != SWHIDObjectType.ORIGIN: | ||||
# Technically this is valid; but: | raise ValueError(f"Non origin SWHID used as context origin: {value}") | ||||
# 1. SWHIDs are URIs, not URLs | |||||
# 2. if a SWHID gets here, it's very likely to be a mistake | |||||
# (and we can remove this check if it turns out there is a | |||||
# legitimate use for it). | |||||
raise ValueError(f"SWHID used as context origin URL: {value}") | |||||
@visit.validator | @visit.validator | ||||
def check_visit(self, attribute, value): | def check_visit(self, attribute, value): | ||||
if value is None: | if value is None: | ||||
return | return | ||||
if self.type not in ( | if self.type not in ( | ||||
MetadataTargetType.SNAPSHOT, | SWHIDObjectType.SNAPSHOT, | ||||
MetadataTargetType.RELEASE, | SWHIDObjectType.RELEASE, | ||||
MetadataTargetType.REVISION, | SWHIDObjectType.REVISION, | ||||
MetadataTargetType.DIRECTORY, | SWHIDObjectType.DIRECTORY, | ||||
MetadataTargetType.CONTENT, | SWHIDObjectType.CONTENT, | ||||
): | ): | ||||
raise ValueError( | raise ValueError(f"Unexpected 'visit' context for object: {value}") | ||||
f"Unexpected 'visit' context for {self.type.value} object: {value}" | |||||
) | |||||
if self.origin is None: | if self.origin is None: | ||||
raise ValueError("'origin' context must be set if 'visit' is.") | raise ValueError("'origin' context must be set if 'visit' is.") | ||||
if value <= 0: | if value <= 0: | ||||
raise ValueError("Nonpositive visit id") | raise ValueError("Nonpositive visit id") | ||||
@snapshot.validator | @snapshot.validator | ||||
def check_snapshot(self, attribute, value): | def check_snapshot(self, attribute, value): | ||||
if value is None: | if value is None: | ||||
return | return | ||||
if self.type not in ( | if self.type not in ( | ||||
MetadataTargetType.RELEASE, | SWHIDObjectType.RELEASE, | ||||
MetadataTargetType.REVISION, | SWHIDObjectType.REVISION, | ||||
MetadataTargetType.DIRECTORY, | SWHIDObjectType.DIRECTORY, | ||||
MetadataTargetType.CONTENT, | SWHIDObjectType.CONTENT, | ||||
): | ): | ||||
raise ValueError( | raise ValueError(f"Unexpected 'snapshot' context for object: {value}") | ||||
f"Unexpected 'snapshot' context for {self.type.value} object: {value}" | |||||
) | |||||
self._check_swhid("snapshot", value) | self._check_swhid(SWHIDObjectType.SNAPSHOT, value) | ||||
@release.validator | @release.validator | ||||
def check_release(self, attribute, value): | def check_release(self, attribute, value): | ||||
if value is None: | if value is None: | ||||
return | return | ||||
if self.type not in ( | if self.type not in ( | ||||
MetadataTargetType.REVISION, | SWHIDObjectType.REVISION, | ||||
MetadataTargetType.DIRECTORY, | SWHIDObjectType.DIRECTORY, | ||||
MetadataTargetType.CONTENT, | SWHIDObjectType.CONTENT, | ||||
): | ): | ||||
raise ValueError( | raise ValueError(f"Unexpected 'release' context for object: {value}") | ||||
f"Unexpected 'release' context for {self.type.value} object: {value}" | |||||
) | |||||
self._check_swhid("release", value) | self._check_swhid(SWHIDObjectType.RELEASE, value) | ||||
@revision.validator | @revision.validator | ||||
def check_revision(self, attribute, value): | def check_revision(self, attribute, value): | ||||
if value is None: | if value is None: | ||||
return | return | ||||
if self.type not in (MetadataTargetType.DIRECTORY, MetadataTargetType.CONTENT,): | if self.type not in (SWHIDObjectType.DIRECTORY, SWHIDObjectType.CONTENT,): | ||||
raise ValueError( | raise ValueError(f"Unexpected 'revision' context for object: {value}") | ||||
f"Unexpected 'revision' context for {self.type.value} object: {value}" | |||||
) | |||||
self._check_swhid("revision", value) | self._check_swhid(SWHIDObjectType.REVISION, value) | ||||
@path.validator | @path.validator | ||||
def check_path(self, attribute, value): | def check_path(self, attribute, value): | ||||
if value is None: | if value is None: | ||||
return | return | ||||
if self.type not in (MetadataTargetType.DIRECTORY, MetadataTargetType.CONTENT,): | if self.type not in (SWHIDObjectType.DIRECTORY, SWHIDObjectType.CONTENT,): | ||||
raise ValueError( | raise ValueError(f"Unexpected 'path' context for object: {value}") | ||||
f"Unexpected 'path' context for {self.type.value} object: {value}" | |||||
) | |||||
@directory.validator | @directory.validator | ||||
def check_directory(self, attribute, value): | def check_directory(self, attribute, value): | ||||
if value is None: | if value is None: | ||||
return | return | ||||
if self.type not in (MetadataTargetType.CONTENT,): | if self.type not in (SWHIDObjectType.CONTENT,): | ||||
raise ValueError( | raise ValueError(f"Unexpected 'directory' context for object: {value}") | ||||
f"Unexpected 'directory' context for {self.type.value} object: {value}" | |||||
) | |||||
self._check_swhid("directory", value) | self._check_swhid(SWHIDObjectType.DIRECTORY, value) | ||||
def _check_swhid(self, expected_object_type, swhid): | def _check_swhid(self, expected_object_type, swhid): | ||||
if isinstance(swhid, str): | if isinstance(swhid, str): | ||||
raise ValueError(f"Expected SWHID, got a string: {swhid}") | raise ValueError(f"Expected SWHID, got a string: {swhid}") | ||||
if swhid.object_type != expected_object_type: | if swhid.object_type != expected_object_type: | ||||
raise ValueError( | raise ValueError( | ||||
f"Expected SWHID type '{expected_object_type}', " | f"Expected SWHID type '{expected_object_type.value}', " | ||||
f"got '{swhid.object_type}' in {swhid}" | f"got '{swhid.object_type.value}' in {swhid}" | ||||
) | ) | ||||
if swhid.metadata: | if swhid.metadata: | ||||
raise ValueError(f"Expected core SWHID, but got: {swhid}") | raise ValueError(f"Expected core SWHID, but got: {swhid}") | ||||
def to_dict(self): | def to_dict(self): | ||||
d = super().to_dict() | d = super().to_dict() | ||||
context_keys = ( | context_keys = ( | ||||
"origin", | "origin", | ||||
"visit", | "visit", | ||||
"snapshot", | "snapshot", | ||||
"release", | "release", | ||||
"revision", | "revision", | ||||
"directory", | "directory", | ||||
"path", | "path", | ||||
) | ) | ||||
for context_key in context_keys: | for context_key in context_keys: | ||||
if d[context_key] is None: | if d[context_key] is None: | ||||
del d[context_key] | del d[context_key] | ||||
return d | return d | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
d = { | d = { | ||||
**d, | **d, | ||||
"type": MetadataTargetType(d["type"]), | |||||
"authority": MetadataAuthority.from_dict(d["authority"]), | "authority": MetadataAuthority.from_dict(d["authority"]), | ||||
"fetcher": MetadataFetcher.from_dict(d["fetcher"]), | "fetcher": MetadataFetcher.from_dict(d["fetcher"]), | ||||
} | } | ||||
swhid_keys = ("target", "snapshot", "release", "revision", "directory") | |||||
if d["type"] != MetadataTargetType.ORIGIN: | for k in swhid_keys: | ||||
d["target"] = parse_swhid(d["target"]) | if k in d and isinstance(d[k], str): | ||||
d[k] = parse_swhid(d[k]) | |||||
swhid_keys = ("snapshot", "release", "revision", "directory") | |||||
for swhid_key in swhid_keys: | |||||
if d.get(swhid_key): | |||||
d[swhid_key] = parse_swhid(d[swhid_key]) | |||||
return super().from_dict(d) | return super().from_dict(d) | ||||
def unique_key(self) -> KeyType: | def unique_key(self) -> KeyType: | ||||
return { | return { | ||||
"type": self.type.value, | |||||
"target": str(self.target), | "target": str(self.target), | ||||
"authority_type": self.authority.type.value, | "authority_type": self.authority.type.value, | ||||
"authority_url": self.authority.url, | "authority_url": self.authority.url, | ||||
"discovery_date": str(self.discovery_date), | "discovery_date": str(self.discovery_date), | ||||
"fetcher_name": self.fetcher.name, | "fetcher_name": self.fetcher.name, | ||||
"fetcher_version": self.fetcher.version, | "fetcher_version": self.fetcher.version, | ||||
} | } |