Changeset View
Changeset View
Standalone View
Standalone View
swh/model/model.py
Show First 20 Lines • Show All 141 Lines • ▼ Show 20 Lines | elif origin is ImmutableDict: | ||||
for (key, value) in value.items() | for (key, value) in value.items() | ||||
) | ) | ||||
else: | else: | ||||
# No need to check dict or list. because they are converted to ImmutableDict | # No need to check dict or list. because they are converted to ImmutableDict | ||||
# and tuple respectively. | # and tuple respectively. | ||||
raise NotImplementedError(f"Type-checking {type_}") | raise NotImplementedError(f"Type-checking {type_}") | ||||
def generic_type_validator(instance, attribute, value): | |||||
"""validates the type of an attribute value whatever the attribute type""" | |||||
if not _check_type(attribute.type, value): | |||||
raise AttributeTypeError(value, attribute) | |||||
def type_validator(): | def type_validator(): | ||||
"""Like attrs_strict.type_validator(), but stricter. | """Like attrs_strict.type_validator(), but stricter. | ||||
It is an attrs validator, which checks attributes have the specified type, | It is an attrs validator, which checks attributes have the specified type, | ||||
using type equality instead of ``isinstance()``, for improved performance | using type equality instead of ``isinstance()``, for improved performance | ||||
""" | """ | ||||
return generic_type_validator | |||||
def validator(instance, attribute, value): | |||||
if not _check_type(attribute.type, value): | |||||
raise AttributeTypeError(value, attribute) | |||||
return validator | def optimize_all_validators(cls, old_fields): | ||||
"""process validators to turn them into a faster version … eventually""" | |||||
new_fields = [] | |||||
for f in old_fields: | |||||
if f.validator is generic_type_validator: | |||||
f = f.evolve(validator=generic_type_validator) | |||||
new_fields.append(f) | |||||
return new_fields | |||||
ModelType = TypeVar("ModelType", bound="BaseModel") | ModelType = TypeVar("ModelType", bound="BaseModel") | ||||
class BaseModel: | class BaseModel: | ||||
"""Base class for SWH model classes. | """Base class for SWH model classes. | ||||
▲ Show 20 Lines • Show All 110 Lines • ▼ Show 20 Lines | def check(self) -> None: | ||||
self.raw_manifest is not None | self.raw_manifest is not None | ||||
and self.id == self._compute_hash_from_attributes() | and self.id == self._compute_hash_from_attributes() | ||||
): | ): | ||||
raise ValueError( | raise ValueError( | ||||
f"{self} has a non-none raw_manifest attribute, but does not need it." | f"{self} has a non-none raw_manifest attribute, but does not need it." | ||||
) | ) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class Person(BaseModel): | class Person(BaseModel): | ||||
"""Represents the author/committer of a revision or release.""" | """Represents the author/committer of a revision or release.""" | ||||
object_type: Final = "person" | object_type: Final = "person" | ||||
fullname = attr.ib(type=bytes, validator=type_validator()) | fullname = attr.ib(type=bytes, validator=type_validator()) | ||||
name = attr.ib(type=Optional[bytes], validator=type_validator(), eq=False) | name = attr.ib(type=Optional[bytes], validator=type_validator(), eq=False) | ||||
email = attr.ib(type=Optional[bytes], validator=type_validator(), eq=False) | email = attr.ib(type=Optional[bytes], validator=type_validator(), eq=False) | ||||
▲ Show 20 Lines • Show All 65 Lines • ▼ Show 20 Lines | def from_dict(cls, d): | ||||
parts.append(b"".join([b"<", d["email"], b">"])) | parts.append(b"".join([b"<", d["email"], b">"])) | ||||
fullname = b" ".join(parts) | fullname = b" ".join(parts) | ||||
d = {**d, "fullname": fullname} | d = {**d, "fullname": fullname} | ||||
d = {"name": None, "email": None, **d} | d = {"name": None, "email": None, **d} | ||||
return super().from_dict(d) | return super().from_dict(d) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class Timestamp(BaseModel): | class Timestamp(BaseModel): | ||||
"""Represents a naive timestamp from a VCS.""" | """Represents a naive timestamp from a VCS.""" | ||||
object_type: Final = "timestamp" | object_type: Final = "timestamp" | ||||
seconds = attr.ib(type=int, validator=type_validator()) | seconds = attr.ib(type=int, validator=type_validator()) | ||||
microseconds = attr.ib(type=int, validator=type_validator()) | microseconds = attr.ib(type=int, validator=type_validator()) | ||||
@seconds.validator | @seconds.validator | ||||
def check_seconds(self, attribute, value): | def check_seconds(self, attribute, value): | ||||
"""Check that seconds fit in a 64-bits signed integer.""" | """Check that seconds fit in a 64-bits signed integer.""" | ||||
if not (-(2**63) <= value < 2**63): | if not (-(2**63) <= value < 2**63): | ||||
raise ValueError("Seconds must be a signed 64-bits integer.") | raise ValueError("Seconds must be a signed 64-bits integer.") | ||||
@microseconds.validator | @microseconds.validator | ||||
def check_microseconds(self, attribute, value): | def check_microseconds(self, attribute, value): | ||||
"""Checks that microseconds are positive and < 1000000.""" | """Checks that microseconds are positive and < 1000000.""" | ||||
if not (0 <= value < 10**6): | if not (0 <= value < 10**6): | ||||
raise ValueError("Microseconds must be in [0, 1000000[.") | raise ValueError("Microseconds must be in [0, 1000000[.") | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class TimestampWithTimezone(BaseModel): | class TimestampWithTimezone(BaseModel): | ||||
"""Represents a TZ-aware timestamp from a VCS.""" | """Represents a TZ-aware timestamp from a VCS.""" | ||||
object_type: Final = "timestamp_with_timezone" | object_type: Final = "timestamp_with_timezone" | ||||
timestamp = attr.ib(type=Timestamp, validator=type_validator()) | timestamp = attr.ib(type=Timestamp, validator=type_validator()) | ||||
offset_bytes = attr.ib(type=bytes, validator=type_validator()) | offset_bytes = attr.ib(type=bytes, validator=type_validator()) | ||||
▲ Show 20 Lines • Show All 180 Lines • ▼ Show 20 Lines | def offset_minutes(self): | ||||
>>> TimestampWithTimezone( | >>> TimestampWithTimezone( | ||||
... Timestamp(seconds=1642765364, microseconds=0), offset_bytes=b"+0530" | ... Timestamp(seconds=1642765364, microseconds=0), offset_bytes=b"+0530" | ||||
... ).offset_minutes() | ... ).offset_minutes() | ||||
330 | 330 | ||||
""" | """ | ||||
return self._parse_offset_bytes(self.offset_bytes) | return self._parse_offset_bytes(self.offset_bytes) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class Origin(HashableObject, BaseModel): | class Origin(HashableObject, BaseModel): | ||||
"""Represents a software source: a VCS and an URL.""" | """Represents a software source: a VCS and an URL.""" | ||||
object_type: Final = "origin" | object_type: Final = "origin" | ||||
url = attr.ib(type=str, validator=type_validator()) | url = attr.ib(type=str, validator=type_validator()) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | ||||
def unique_key(self) -> KeyType: | def unique_key(self) -> KeyType: | ||||
return {"url": self.url} | return {"url": self.url} | ||||
def _compute_hash_from_attributes(self) -> bytes: | def _compute_hash_from_attributes(self) -> bytes: | ||||
return _compute_hash_from_manifest(self.url.encode("utf-8")) | return _compute_hash_from_manifest(self.url.encode("utf-8")) | ||||
def swhid(self) -> ExtendedSWHID: | def swhid(self) -> ExtendedSWHID: | ||||
"""Returns a SWHID representing this origin.""" | """Returns a SWHID representing this origin.""" | ||||
return ExtendedSWHID( | return ExtendedSWHID( | ||||
object_type=SwhidExtendedObjectType.ORIGIN, | object_type=SwhidExtendedObjectType.ORIGIN, | ||||
object_id=self.id, | object_id=self.id, | ||||
) | ) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class OriginVisit(BaseModel): | class OriginVisit(BaseModel): | ||||
"""Represents an origin visit with a given type at a given point in time, by a | """Represents an origin visit with a given type at a given point in time, by a | ||||
SWH loader.""" | SWH loader.""" | ||||
object_type: Final = "origin_visit" | object_type: Final = "origin_visit" | ||||
origin = attr.ib(type=str, validator=type_validator()) | origin = attr.ib(type=str, validator=type_validator()) | ||||
date = attr.ib(type=datetime.datetime, validator=type_validator()) | date = attr.ib(type=datetime.datetime, validator=type_validator()) | ||||
Show All 14 Lines | def to_dict(self): | ||||
if ov["visit"] is None: | if ov["visit"] is None: | ||||
del ov["visit"] | del ov["visit"] | ||||
return ov | return ov | ||||
def unique_key(self) -> KeyType: | def unique_key(self) -> KeyType: | ||||
return {"origin": self.origin, "date": str(self.date)} | return {"origin": self.origin, "date": str(self.date)} | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class OriginVisitStatus(BaseModel): | class OriginVisitStatus(BaseModel): | ||||
"""Represents a visit update of an origin at a given point in time.""" | """Represents a visit update of an origin at a given point in time.""" | ||||
object_type: Final = "origin_visit_status" | object_type: Final = "origin_visit_status" | ||||
origin = attr.ib(type=str, validator=type_validator()) | origin = attr.ib(type=str, validator=type_validator()) | ||||
visit = attr.ib(type=int, validator=type_validator()) | visit = attr.ib(type=int, validator=type_validator()) | ||||
▲ Show 20 Lines • Show All 49 Lines • ▼ Show 20 Lines | class ObjectType(Enum): | ||||
REVISION = "revision" | REVISION = "revision" | ||||
RELEASE = "release" | RELEASE = "release" | ||||
SNAPSHOT = "snapshot" | SNAPSHOT = "snapshot" | ||||
def __repr__(self): | def __repr__(self): | ||||
return f"ObjectType.{self.name}" | return f"ObjectType.{self.name}" | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class SnapshotBranch(BaseModel): | class SnapshotBranch(BaseModel): | ||||
"""Represents one of the branches of a snapshot.""" | """Represents one of the branches of a snapshot.""" | ||||
object_type: Final = "snapshot_branch" | object_type: Final = "snapshot_branch" | ||||
target = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) | target = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) | ||||
target_type = attr.ib(type=TargetType, validator=type_validator()) | target_type = attr.ib(type=TargetType, validator=type_validator()) | ||||
@target.validator | @target.validator | ||||
def check_target(self, attribute, value): | def check_target(self, attribute, value): | ||||
"""Checks the target type is not an alias, checks the target is a | """Checks the target type is not an alias, checks the target is a | ||||
valid sha1_git.""" | valid sha1_git.""" | ||||
if self.target_type != TargetType.ALIAS and self.target is not None: | if self.target_type != TargetType.ALIAS and self.target is not None: | ||||
if len(value) != 20: | if len(value) != 20: | ||||
raise ValueError("Wrong length for bytes identifier: %d" % len(value)) | raise ValueError("Wrong length for bytes identifier: %d" % len(value)) | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
return cls(target=d["target"], target_type=TargetType(d["target_type"])) | return cls(target=d["target"], target_type=TargetType(d["target_type"])) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class Snapshot(HashableObject, BaseModel): | class Snapshot(HashableObject, BaseModel): | ||||
"""Represents the full state of an origin at a given point in time.""" | """Represents the full state of an origin at a given point in time.""" | ||||
object_type: Final = "snapshot" | object_type: Final = "snapshot" | ||||
branches = attr.ib( | branches = attr.ib( | ||||
type=ImmutableDict[bytes, Optional[SnapshotBranch]], | type=ImmutableDict[bytes, Optional[SnapshotBranch]], | ||||
validator=type_validator(), | validator=type_validator(), | ||||
Show All 17 Lines | def from_dict(cls, d): | ||||
**d, | **d, | ||||
) | ) | ||||
def swhid(self) -> CoreSWHID: | def swhid(self) -> CoreSWHID: | ||||
"""Returns a SWHID representing this object.""" | """Returns a SWHID representing this object.""" | ||||
return CoreSWHID(object_type=SwhidObjectType.SNAPSHOT, object_id=self.id) | return CoreSWHID(object_type=SwhidObjectType.SNAPSHOT, object_id=self.id) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class Release(HashableObjectWithManifest, BaseModel): | class Release(HashableObjectWithManifest, BaseModel): | ||||
object_type: Final = "release" | object_type: Final = "release" | ||||
name = attr.ib(type=bytes, validator=type_validator()) | name = attr.ib(type=bytes, validator=type_validator()) | ||||
message = attr.ib(type=Optional[bytes], validator=type_validator()) | message = attr.ib(type=Optional[bytes], validator=type_validator()) | ||||
target = attr.ib(type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr) | target = attr.ib(type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr) | ||||
target_type = attr.ib(type=ObjectType, validator=type_validator()) | target_type = attr.ib(type=ObjectType, validator=type_validator()) | ||||
synthetic = attr.ib(type=bool, validator=type_validator()) | synthetic = attr.ib(type=bool, validator=type_validator()) | ||||
▲ Show 20 Lines • Show All 59 Lines • ▼ Show 20 Lines | class RevisionType(Enum): | ||||
def __repr__(self): | def __repr__(self): | ||||
return f"RevisionType.{self.name}" | return f"RevisionType.{self.name}" | ||||
def tuplify_extra_headers(value: Iterable): | def tuplify_extra_headers(value: Iterable): | ||||
return tuple((k, v) for k, v in value) | return tuple((k, v) for k, v in value) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class Revision(HashableObjectWithManifest, BaseModel): | class Revision(HashableObjectWithManifest, BaseModel): | ||||
object_type: Final = "revision" | object_type: Final = "revision" | ||||
message = attr.ib(type=Optional[bytes], validator=type_validator()) | message = attr.ib(type=Optional[bytes], validator=type_validator()) | ||||
author = attr.ib(type=Optional[Person], validator=type_validator()) | author = attr.ib(type=Optional[Person], validator=type_validator()) | ||||
committer = attr.ib(type=Optional[Person], validator=type_validator()) | committer = attr.ib(type=Optional[Person], validator=type_validator()) | ||||
date = attr.ib(type=Optional[TimestampWithTimezone], validator=type_validator()) | date = attr.ib(type=Optional[TimestampWithTimezone], validator=type_validator()) | ||||
committer_date = attr.ib( | committer_date = attr.ib( | ||||
▲ Show 20 Lines • Show All 95 Lines • ▼ Show 20 Lines | def anonymize(self) -> "Revision": | ||||
author=None if self.author is None else self.author.anonymize(), | author=None if self.author is None else self.author.anonymize(), | ||||
committer=None if self.committer is None else self.committer.anonymize(), | committer=None if self.committer is None else self.committer.anonymize(), | ||||
) | ) | ||||
_DIR_ENTRY_TYPES = ["file", "dir", "rev"] | _DIR_ENTRY_TYPES = ["file", "dir", "rev"] | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class DirectoryEntry(BaseModel): | class DirectoryEntry(BaseModel): | ||||
object_type: Final = "directory_entry" | object_type: Final = "directory_entry" | ||||
name = attr.ib(type=bytes, validator=type_validator()) | name = attr.ib(type=bytes, validator=type_validator()) | ||||
type = attr.ib(type=str, validator=attr.validators.in_(_DIR_ENTRY_TYPES)) | type = attr.ib(type=str, validator=attr.validators.in_(_DIR_ENTRY_TYPES)) | ||||
target = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr) | target = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr) | ||||
perms = attr.ib(type=int, validator=type_validator(), converter=int, repr=oct) | perms = attr.ib(type=int, validator=type_validator(), converter=int, repr=oct) | ||||
"""Usually one of the values of `swh.model.from_disk.DentryPerms`.""" | """Usually one of the values of `swh.model.from_disk.DentryPerms`.""" | ||||
@name.validator | @name.validator | ||||
def check_name(self, attribute, value): | def check_name(self, attribute, value): | ||||
if b"/" in value: | if b"/" in value: | ||||
raise ValueError(f"{value!r} is not a valid directory entry name.") | raise ValueError(f"{value!r} is not a valid directory entry name.") | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class Directory(HashableObjectWithManifest, BaseModel): | class Directory(HashableObjectWithManifest, BaseModel): | ||||
object_type: Final = "directory" | object_type: Final = "directory" | ||||
entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=type_validator()) | entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=type_validator()) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) | id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) | ||||
raw_manifest = attr.ib(type=Optional[bytes], default=None) | raw_manifest = attr.ib(type=Optional[bytes], default=None) | ||||
def _compute_hash_from_attributes(self) -> bytes: | def _compute_hash_from_attributes(self) -> bytes: | ||||
▲ Show 20 Lines • Show All 102 Lines • ▼ Show 20 Lines | ) -> Tuple[bool, "Directory"]: | ||||
# Finally, return the "fixed" the directory | # Finally, return the "fixed" the directory | ||||
dir_ = Directory( | dir_ = Directory( | ||||
entries=tuple(deduplicated_entries), id=id, raw_manifest=raw_manifest | entries=tuple(deduplicated_entries), id=id, raw_manifest=raw_manifest | ||||
) | ) | ||||
return (True, dir_) | return (True, dir_) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class BaseContent(BaseModel): | class BaseContent(BaseModel): | ||||
status = attr.ib( | status = attr.ib( | ||||
type=str, validator=attr.validators.in_(["visible", "hidden", "absent"]) | type=str, validator=attr.validators.in_(["visible", "hidden", "absent"]) | ||||
) | ) | ||||
@staticmethod | @staticmethod | ||||
def _hash_data(data: bytes): | def _hash_data(data: bytes): | ||||
"""Hash some data, returning most of the fields of a content object""" | """Hash some data, returning most of the fields of a content object""" | ||||
Show All 19 Lines | def get_hash(self, hash_name): | ||||
raise ValueError("{} is not a valid hash name.".format(hash_name)) | raise ValueError("{} is not a valid hash name.".format(hash_name)) | ||||
return getattr(self, hash_name) | return getattr(self, hash_name) | ||||
def hashes(self) -> Dict[str, bytes]: | def hashes(self) -> Dict[str, bytes]: | ||||
"""Returns a dictionary {hash_name: hash_value}""" | """Returns a dictionary {hash_name: hash_value}""" | ||||
return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS} | return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS} | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class Content(BaseContent): | class Content(BaseContent): | ||||
object_type: Final = "content" | object_type: Final = "content" | ||||
sha1 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) | sha1 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) | ||||
sha1_git = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr) | sha1_git = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr) | ||||
sha256 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) | sha256 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) | ||||
blake2s256 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) | blake2s256 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) | ||||
▲ Show 20 Lines • Show All 66 Lines • ▼ Show 20 Lines | class Content(BaseContent): | ||||
def unique_key(self) -> KeyType: | def unique_key(self) -> KeyType: | ||||
return self.sha1 # TODO: use a dict of hashes | return self.sha1 # TODO: use a dict of hashes | ||||
def swhid(self) -> CoreSWHID: | def swhid(self) -> CoreSWHID: | ||||
"""Returns a SWHID representing this object.""" | """Returns a SWHID representing this object.""" | ||||
return CoreSWHID(object_type=SwhidObjectType.CONTENT, object_id=self.sha1_git) | return CoreSWHID(object_type=SwhidObjectType.CONTENT, object_id=self.sha1_git) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class SkippedContent(BaseContent): | class SkippedContent(BaseContent): | ||||
object_type: Final = "skipped_content" | object_type: Final = "skipped_content" | ||||
sha1 = attr.ib(type=Optional[bytes], validator=type_validator(), repr=hash_repr) | sha1 = attr.ib(type=Optional[bytes], validator=type_validator(), repr=hash_repr) | ||||
sha1_git = attr.ib( | sha1_git = attr.ib( | ||||
type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr | type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr | ||||
) | ) | ||||
sha256 = attr.ib(type=Optional[bytes], validator=type_validator(), repr=hash_repr) | sha256 = attr.ib(type=Optional[bytes], validator=type_validator(), repr=hash_repr) | ||||
▲ Show 20 Lines • Show All 76 Lines • ▼ Show 20 Lines | class MetadataAuthorityType(Enum): | ||||
DEPOSIT_CLIENT = "deposit_client" | DEPOSIT_CLIENT = "deposit_client" | ||||
FORGE = "forge" | FORGE = "forge" | ||||
REGISTRY = "registry" | REGISTRY = "registry" | ||||
def __repr__(self): | def __repr__(self): | ||||
return f"MetadataAuthorityType.{self.name}" | return f"MetadataAuthorityType.{self.name}" | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class MetadataAuthority(BaseModel): | class MetadataAuthority(BaseModel): | ||||
"""Represents an entity that provides metadata about an origin or | """Represents an entity that provides metadata about an origin or | ||||
software artifact.""" | software artifact.""" | ||||
object_type: Final = "metadata_authority" | object_type: Final = "metadata_authority" | ||||
type = attr.ib(type=MetadataAuthorityType, validator=type_validator()) | type = attr.ib(type=MetadataAuthorityType, validator=type_validator()) | ||||
url = attr.ib(type=str, validator=type_validator()) | url = attr.ib(type=str, validator=type_validator()) | ||||
Show All 17 Lines | def from_dict(cls, d): | ||||
"type": MetadataAuthorityType(d["type"]), | "type": MetadataAuthorityType(d["type"]), | ||||
} | } | ||||
return super().from_dict(d) | return super().from_dict(d) | ||||
def unique_key(self) -> KeyType: | def unique_key(self) -> KeyType: | ||||
return {"type": self.type.value, "url": self.url} | return {"type": self.type.value, "url": self.url} | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class MetadataFetcher(BaseModel): | class MetadataFetcher(BaseModel): | ||||
"""Represents a software component used to fetch metadata from a metadata | """Represents a software component used to fetch metadata from a metadata | ||||
authority, and ingest them into the Software Heritage archive.""" | authority, and ingest them into the Software Heritage archive.""" | ||||
object_type: Final = "metadata_fetcher" | object_type: Final = "metadata_fetcher" | ||||
name = attr.ib(type=str, validator=type_validator()) | name = attr.ib(type=str, validator=type_validator()) | ||||
version = attr.ib(type=str, validator=type_validator()) | version = attr.ib(type=str, validator=type_validator()) | ||||
Show All 20 Lines | def normalize_discovery_date(value: Any) -> datetime.datetime: | ||||
if value.tzinfo is None: | if value.tzinfo is None: | ||||
raise ValueError("discovery_date must be a timezone-aware datetime.") | raise ValueError("discovery_date must be a timezone-aware datetime.") | ||||
# Normalize timezone to utc, and truncate microseconds to 0 | # Normalize timezone to utc, and truncate microseconds to 0 | ||||
return value.astimezone(datetime.timezone.utc).replace(microsecond=0) | return value.astimezone(datetime.timezone.utc).replace(microsecond=0) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class RawExtrinsicMetadata(HashableObject, BaseModel): | class RawExtrinsicMetadata(HashableObject, BaseModel): | ||||
object_type: Final = "raw_extrinsic_metadata" | object_type: Final = "raw_extrinsic_metadata" | ||||
# target object | # target object | ||||
target = attr.ib(type=ExtendedSWHID, validator=type_validator()) | target = attr.ib(type=ExtendedSWHID, validator=type_validator()) | ||||
# source | # source | ||||
discovery_date = attr.ib(type=datetime.datetime, converter=normalize_discovery_date) | discovery_date = attr.ib(type=datetime.datetime, converter=normalize_discovery_date) | ||||
▲ Show 20 Lines • Show All 206 Lines • ▼ Show 20 Lines | class RawExtrinsicMetadata(HashableObject, BaseModel): | ||||
def swhid(self) -> ExtendedSWHID: | def swhid(self) -> ExtendedSWHID: | ||||
"""Returns a SWHID representing this RawExtrinsicMetadata object.""" | """Returns a SWHID representing this RawExtrinsicMetadata object.""" | ||||
return ExtendedSWHID( | return ExtendedSWHID( | ||||
object_type=SwhidExtendedObjectType.RAW_EXTRINSIC_METADATA, | object_type=SwhidExtendedObjectType.RAW_EXTRINSIC_METADATA, | ||||
object_id=self.id, | object_id=self.id, | ||||
) | ) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class ExtID(HashableObject, BaseModel): | class ExtID(HashableObject, BaseModel): | ||||
object_type: Final = "extid" | object_type: Final = "extid" | ||||
extid_type = attr.ib(type=str, validator=type_validator()) | extid_type = attr.ib(type=str, validator=type_validator()) | ||||
extid = attr.ib(type=bytes, validator=type_validator()) | extid = attr.ib(type=bytes, validator=type_validator()) | ||||
target = attr.ib(type=CoreSWHID, validator=type_validator()) | target = attr.ib(type=CoreSWHID, validator=type_validator()) | ||||
extid_version = attr.ib(type=int, validator=type_validator(), default=0) | extid_version = attr.ib(type=int, validator=type_validator(), default=0) | ||||
▲ Show 20 Lines • Show All 41 Lines • Show Last 20 Lines |