Changeset View
Changeset View
Standalone View
Standalone View
swh/model/model.py
Show All 15 Lines | |||||
from typing_extensions import Final | from typing_extensions import Final | ||||
from .collections import ImmutableDict | from .collections import ImmutableDict | ||||
from .hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_bytes | from .hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_bytes | ||||
from .identifiers import ( | from .identifiers import ( | ||||
directory_identifier, | directory_identifier, | ||||
normalize_timestamp, | normalize_timestamp, | ||||
origin_identifier, | origin_identifier, | ||||
parse_swhid, | |||||
release_identifier, | release_identifier, | ||||
revision_identifier, | revision_identifier, | ||||
snapshot_identifier, | snapshot_identifier, | ||||
) | ) | ||||
from .identifiers import CoreSWHID | |||||
from .identifiers import ExtendedObjectType as SwhidExtendedObjectType | from .identifiers import ExtendedObjectType as SwhidExtendedObjectType | ||||
from .identifiers import ExtendedSWHID | from .identifiers import ExtendedSWHID | ||||
from .identifiers import ObjectType as SwhidObjectType | from .identifiers import ObjectType as SwhidObjectType | ||||
from .identifiers import SWHID, CoreSWHID | |||||
class MissingData(Exception): | class MissingData(Exception): | ||||
"""Raised by `Content.with_data` when it has no way of fetching the | """Raised by `Content.with_data` when it has no way of fetching the | ||||
data (but not when fetching the data fails).""" | data (but not when fetching the data fails).""" | ||||
pass | pass | ||||
Show All 21 Lines | ) -> Optional[ImmutableDict[KT, VT]]: | ||||
else: | else: | ||||
return d | return d | ||||
def dictify(value): | def dictify(value): | ||||
"Helper function used by BaseModel.to_dict()" | "Helper function used by BaseModel.to_dict()" | ||||
if isinstance(value, BaseModel): | if isinstance(value, BaseModel): | ||||
return value.to_dict() | return value.to_dict() | ||||
elif isinstance(value, (SWHID, ExtendedSWHID)): | elif isinstance(value, (CoreSWHID, ExtendedSWHID)): | ||||
return str(value) | return str(value) | ||||
elif isinstance(value, Enum): | elif isinstance(value, Enum): | ||||
return value.value | return value.value | ||||
elif isinstance(value, (dict, ImmutableDict)): | elif isinstance(value, (dict, ImmutableDict)): | ||||
return {k: dictify(v) for k, v in value.items()} | return {k: dictify(v) for k, v in value.items()} | ||||
elif isinstance(value, tuple): | elif isinstance(value, tuple): | ||||
return tuple(dictify(v) for v in value) | return tuple(dictify(v) for v in value) | ||||
else: | else: | ||||
▲ Show 20 Lines • Show All 816 Lines • ▼ Show 20 Lines | class RawExtrinsicMetadata(BaseModel): | ||||
# the metadata itself | # the metadata itself | ||||
format = attr.ib(type=str, validator=type_validator()) | format = attr.ib(type=str, validator=type_validator()) | ||||
metadata = attr.ib(type=bytes, validator=type_validator()) | metadata = attr.ib(type=bytes, validator=type_validator()) | ||||
# context | # context | ||||
origin = attr.ib(type=Optional[str], default=None, validator=type_validator()) | origin = attr.ib(type=Optional[str], default=None, validator=type_validator()) | ||||
visit = attr.ib(type=Optional[int], default=None, validator=type_validator()) | visit = attr.ib(type=Optional[int], default=None, validator=type_validator()) | ||||
snapshot = attr.ib(type=Optional[SWHID], default=None, validator=type_validator()) | snapshot = attr.ib( | ||||
release = attr.ib(type=Optional[SWHID], default=None, validator=type_validator()) | type=Optional[CoreSWHID], default=None, validator=type_validator() | ||||
revision = attr.ib(type=Optional[SWHID], default=None, validator=type_validator()) | ) | ||||
release = attr.ib( | |||||
type=Optional[CoreSWHID], default=None, validator=type_validator() | |||||
) | |||||
revision = attr.ib( | |||||
type=Optional[CoreSWHID], default=None, validator=type_validator() | |||||
) | |||||
path = attr.ib(type=Optional[bytes], default=None, validator=type_validator()) | path = attr.ib(type=Optional[bytes], default=None, validator=type_validator()) | ||||
directory = attr.ib(type=Optional[SWHID], default=None, validator=type_validator()) | directory = attr.ib( | ||||
type=Optional[CoreSWHID], default=None, validator=type_validator() | |||||
) | |||||
@discovery_date.validator | @discovery_date.validator | ||||
def check_discovery_date(self, attribute, value): | def check_discovery_date(self, attribute, value): | ||||
"""Checks the discovery_date has a timezone.""" | """Checks the discovery_date has a timezone.""" | ||||
if value is not None and value.tzinfo is None: | if value is not None and value.tzinfo is None: | ||||
raise ValueError("discovery_date must be a timezone-aware datetime.") | raise ValueError("discovery_date must be a timezone-aware datetime.") | ||||
@origin.validator | @origin.validator | ||||
▲ Show 20 Lines • Show All 55 Lines • ▼ Show 20 Lines | def check_snapshot(self, attribute, value): | ||||
SwhidExtendedObjectType.DIRECTORY, | SwhidExtendedObjectType.DIRECTORY, | ||||
SwhidExtendedObjectType.CONTENT, | SwhidExtendedObjectType.CONTENT, | ||||
): | ): | ||||
raise ValueError( | raise ValueError( | ||||
f"Unexpected 'snapshot' context for " | f"Unexpected 'snapshot' context for " | ||||
f"{self.target.object_type.name.lower()} object: {value}" | f"{self.target.object_type.name.lower()} object: {value}" | ||||
) | ) | ||||
self._check_swhid("snapshot", value) | self._check_swhid(SwhidObjectType.SNAPSHOT, value) | ||||
@release.validator | @release.validator | ||||
def check_release(self, attribute, value): | def check_release(self, attribute, value): | ||||
if value is None: | if value is None: | ||||
return | return | ||||
if self.target.object_type not in ( | if self.target.object_type not in ( | ||||
SwhidExtendedObjectType.REVISION, | SwhidExtendedObjectType.REVISION, | ||||
SwhidExtendedObjectType.DIRECTORY, | SwhidExtendedObjectType.DIRECTORY, | ||||
SwhidExtendedObjectType.CONTENT, | SwhidExtendedObjectType.CONTENT, | ||||
): | ): | ||||
raise ValueError( | raise ValueError( | ||||
f"Unexpected 'release' context for " | f"Unexpected 'release' context for " | ||||
f"{self.target.object_type.name.lower()} object: {value}" | f"{self.target.object_type.name.lower()} object: {value}" | ||||
) | ) | ||||
self._check_swhid("release", value) | self._check_swhid(SwhidObjectType.RELEASE, value) | ||||
@revision.validator | @revision.validator | ||||
def check_revision(self, attribute, value): | def check_revision(self, attribute, value): | ||||
if value is None: | if value is None: | ||||
return | return | ||||
if self.target.object_type not in ( | if self.target.object_type not in ( | ||||
SwhidExtendedObjectType.DIRECTORY, | SwhidExtendedObjectType.DIRECTORY, | ||||
SwhidExtendedObjectType.CONTENT, | SwhidExtendedObjectType.CONTENT, | ||||
): | ): | ||||
raise ValueError( | raise ValueError( | ||||
f"Unexpected 'revision' context for " | f"Unexpected 'revision' context for " | ||||
f"{self.target.object_type.name.lower()} object: {value}" | f"{self.target.object_type.name.lower()} object: {value}" | ||||
) | ) | ||||
self._check_swhid("revision", value) | self._check_swhid(SwhidObjectType.REVISION, value) | ||||
@path.validator | @path.validator | ||||
def check_path(self, attribute, value): | def check_path(self, attribute, value): | ||||
if value is None: | if value is None: | ||||
return | return | ||||
if self.target.object_type not in ( | if self.target.object_type not in ( | ||||
SwhidExtendedObjectType.DIRECTORY, | SwhidExtendedObjectType.DIRECTORY, | ||||
Show All 10 Lines | def check_directory(self, attribute, value): | ||||
return | return | ||||
if self.target.object_type not in (SwhidExtendedObjectType.CONTENT,): | if self.target.object_type not in (SwhidExtendedObjectType.CONTENT,): | ||||
raise ValueError( | raise ValueError( | ||||
f"Unexpected 'directory' context for " | f"Unexpected 'directory' context for " | ||||
f"{self.target.object_type.name.lower()} object: {value}" | f"{self.target.object_type.name.lower()} object: {value}" | ||||
) | ) | ||||
self._check_swhid("directory", value) | self._check_swhid(SwhidObjectType.DIRECTORY, value) | ||||
def _check_swhid(self, expected_object_type, swhid): | def _check_swhid(self, expected_object_type, swhid): | ||||
if isinstance(swhid, str): | if isinstance(swhid, str): | ||||
raise ValueError(f"Expected SWHID, got a string: {swhid}") | raise ValueError(f"Expected SWHID, got a string: {swhid}") | ||||
if swhid.object_type != expected_object_type: | if swhid.object_type != expected_object_type: | ||||
raise ValueError( | raise ValueError( | ||||
f"Expected SWHID type '{expected_object_type}', " | f"Expected SWHID type '{expected_object_type.name.lower()}', " | ||||
f"got '{swhid.object_type}' in {swhid}" | f"got '{swhid.object_type.name.lower()}' in {swhid}" | ||||
) | ) | ||||
if swhid.metadata: | |||||
raise ValueError(f"Expected core SWHID, but got: {swhid}") | |||||
def to_dict(self): | def to_dict(self): | ||||
d = super().to_dict() | d = super().to_dict() | ||||
context_keys = ( | context_keys = ( | ||||
"origin", | "origin", | ||||
"visit", | "visit", | ||||
"snapshot", | "snapshot", | ||||
"release", | "release", | ||||
Show All 13 Lines | def from_dict(cls, d): | ||||
"target": ExtendedSWHID.from_string(d["target"]), | "target": ExtendedSWHID.from_string(d["target"]), | ||||
"authority": MetadataAuthority.from_dict(d["authority"]), | "authority": MetadataAuthority.from_dict(d["authority"]), | ||||
"fetcher": MetadataFetcher.from_dict(d["fetcher"]), | "fetcher": MetadataFetcher.from_dict(d["fetcher"]), | ||||
} | } | ||||
swhid_keys = ("snapshot", "release", "revision", "directory") | swhid_keys = ("snapshot", "release", "revision", "directory") | ||||
for swhid_key in swhid_keys: | for swhid_key in swhid_keys: | ||||
if d.get(swhid_key): | if d.get(swhid_key): | ||||
d[swhid_key] = parse_swhid(d[swhid_key]) | d[swhid_key] = CoreSWHID.from_string(d[swhid_key]) | ||||
return super().from_dict(d) | return super().from_dict(d) | ||||
def unique_key(self) -> KeyType: | def unique_key(self) -> KeyType: | ||||
return { | return { | ||||
"target": str(self.target), | "target": str(self.target), | ||||
"authority_type": self.authority.type.value, | "authority_type": self.authority.type.value, | ||||
"authority_url": self.authority.url, | "authority_url": self.authority.url, | ||||
"discovery_date": str(self.discovery_date), | "discovery_date": str(self.discovery_date), | ||||
"fetcher_name": self.fetcher.name, | "fetcher_name": self.fetcher.name, | ||||
"fetcher_version": self.fetcher.version, | "fetcher_version": self.fetcher.version, | ||||
} | } |