Changeset View
Changeset View
Standalone View
Standalone View
swh/model/model.py
Show All 19 Lines | |||||
from .hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, MultiHash | from .hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, MultiHash | ||||
from .identifiers import ( | from .identifiers import ( | ||||
normalize_timestamp, | normalize_timestamp, | ||||
directory_identifier, | directory_identifier, | ||||
revision_identifier, | revision_identifier, | ||||
release_identifier, | release_identifier, | ||||
snapshot_identifier, | snapshot_identifier, | ||||
SWHID, | SWHID, | ||||
parse_swhid, | |||||
) | ) | ||||
class MissingData(Exception): | class MissingData(Exception): | ||||
"""Raised by `Content.with_data` when it has no way of fetching the | """Raised by `Content.with_data` when it has no way of fetching the | ||||
data (but not when fetching the data fails).""" | data (but not when fetching the data fails).""" | ||||
pass | pass | ||||
Show All 17 Lines | ) -> Optional[ImmutableDict[KT, VT]]: | ||||
else: | else: | ||||
return d | return d | ||||
def dictify(value): | def dictify(value): | ||||
"Helper function used by BaseModel.to_dict()" | "Helper function used by BaseModel.to_dict()" | ||||
if isinstance(value, BaseModel): | if isinstance(value, BaseModel): | ||||
return value.to_dict() | return value.to_dict() | ||||
elif isinstance(value, SWHID): | |||||
return str(value) | |||||
elif isinstance(value, Enum): | elif isinstance(value, Enum): | ||||
return value.value | return value.value | ||||
elif isinstance(value, (dict, ImmutableDict)): | elif isinstance(value, (dict, ImmutableDict)): | ||||
return {k: dictify(v) for k, v in value.items()} | return {k: dictify(v) for k, v in value.items()} | ||||
elif isinstance(value, tuple): | elif isinstance(value, tuple): | ||||
return tuple(dictify(v) for v in value) | return tuple(dictify(v) for v in value) | ||||
else: | else: | ||||
return value | return value | ||||
▲ Show 20 Lines • Show All 668 Lines • ▼ Show 20 Lines | class MetadataAuthority(BaseModel): | ||||
url = attr.ib(type=str, validator=type_validator()) | url = attr.ib(type=str, validator=type_validator()) | ||||
metadata = attr.ib( | metadata = attr.ib( | ||||
type=Optional[ImmutableDict[str, Any]], | type=Optional[ImmutableDict[str, Any]], | ||||
default=None, | default=None, | ||||
validator=type_validator(), | validator=type_validator(), | ||||
converter=freeze_optional_dict, | converter=freeze_optional_dict, | ||||
) | ) | ||||
def to_dict(self): | |||||
d = super().to_dict() | |||||
if d["metadata"] is None: | |||||
del d["metadata"] | |||||
return d | |||||
@classmethod | |||||
def from_dict(cls, d): | |||||
d["type"] = MetadataAuthorityType(d["type"]) | |||||
return super().from_dict(d) | |||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class MetadataFetcher(BaseModel): | class MetadataFetcher(BaseModel): | ||||
"""Represents a software component used to fetch metadata from a metadata | """Represents a software component used to fetch metadata from a metadata | ||||
authority, and ingest them into the Software Heritage archive.""" | authority, and ingest them into the Software Heritage archive.""" | ||||
name = attr.ib(type=str, validator=type_validator()) | name = attr.ib(type=str, validator=type_validator()) | ||||
version = attr.ib(type=str, validator=type_validator()) | version = attr.ib(type=str, validator=type_validator()) | ||||
metadata = attr.ib( | metadata = attr.ib( | ||||
type=Optional[ImmutableDict[str, Any]], | type=Optional[ImmutableDict[str, Any]], | ||||
default=None, | default=None, | ||||
validator=type_validator(), | validator=type_validator(), | ||||
converter=freeze_optional_dict, | converter=freeze_optional_dict, | ||||
) | ) | ||||
def to_dict(self): | |||||
d = super().to_dict() | |||||
if d["metadata"] is None: | |||||
del d["metadata"] | |||||
return d | |||||
class MetadataTargetType(Enum): | class MetadataTargetType(Enum): | ||||
"""The type of object extrinsic metadata refer to.""" | """The type of object extrinsic metadata refer to.""" | ||||
CONTENT = "content" | CONTENT = "content" | ||||
DIRECTORY = "directory" | DIRECTORY = "directory" | ||||
REVISION = "revision" | REVISION = "revision" | ||||
RELEASE = "release" | RELEASE = "release" | ||||
▲ Show 20 Lines • Show All 156 Lines • ▼ Show 20 Lines | def _check_pid(self, expected_object_type, pid): | ||||
if pid.object_type != expected_object_type: | if pid.object_type != expected_object_type: | ||||
raise ValueError( | raise ValueError( | ||||
f"Expected SWHID type '{expected_object_type}', " | f"Expected SWHID type '{expected_object_type}', " | ||||
f"got '{pid.object_type}' in {pid}" | f"got '{pid.object_type}' in {pid}" | ||||
) | ) | ||||
if pid.metadata: | if pid.metadata: | ||||
raise ValueError(f"Expected core SWHID, but got: {pid}") | raise ValueError(f"Expected core SWHID, but got: {pid}") | ||||
def to_dict(self): | |||||
d = super().to_dict() | |||||
context_keys = ( | |||||
"origin", | |||||
"visit", | |||||
"snapshot", | |||||
"release", | |||||
"revision", | |||||
"directory", | |||||
"path", | |||||
) | |||||
for context_key in context_keys: | |||||
if d[context_key] is None: | |||||
del d[context_key] | |||||
return d | |||||
@classmethod | |||||
def from_dict(cls, d): | |||||
d = { | |||||
**d, | |||||
"type": MetadataTargetType(d["type"]), | |||||
"authority": MetadataAuthority.from_dict(d["authority"]), | |||||
"fetcher": MetadataFetcher.from_dict(d["fetcher"]), | |||||
} | |||||
if d["type"] != MetadataTargetType.ORIGIN: | |||||
d["id"] = parse_swhid(d["id"]) | |||||
swhid_keys = ("snapshot", "release", "revision", "directory") | |||||
for swhid_key in swhid_keys: | |||||
if d.get(swhid_key): | |||||
d[swhid_key] = parse_swhid(d[swhid_key]) | |||||
return super().from_dict(d) |