Changeset View
Changeset View
Standalone View
Standalone View
swh/model/model.py
Show All 29 Lines | |||||
class MissingData(Exception):
    """Signals that `Content.with_data` has no way of fetching the data.

    It is not raised when fetching the data was attempted and failed."""
KeyType = Union[
    Dict[str, str],
    Dict[str, bytes],
    bytes,
]
"""The type returned by BaseModel.unique_key()."""

# Length in bytes of a SHA1 digest.
SHA1_SIZE = 20

# TODO: Limit this to 20 bytes
Sha1Git = bytes
Sha1 = bytes

KT = TypeVar("KT")
▲ Show 20 Lines • Show All 47 Lines • ▼ Show 20 Lines | class BaseModel: | ||||
def anonymize(self: ModelType) -> Optional[ModelType]:
    """Return an anonymized version of the object, or None.

    This default implementation always returns None, which stands for
    "this object model does not need/support anonymization".
    """
    return None
def unique_key(self) -> KeyType:
    """Return a key identifying this object uniquely, usable for
    deduplication.

    This default implementation always raises NotImplementedError."""
    message = f"unique_key for {self}"
    raise NotImplementedError(message)
class HashableObject(metaclass=ABCMeta):
    """Mixin that fills in the ``id`` attribute with the computed object
    hash when the associated model is instantiated without one."""

    @staticmethod
    @abstractmethod
    def compute_hash(object_dict):
        """Compute the object hash from its dict representation.

        Derived model classes must implement this."""

    def __attrs_post_init__(self):
        """Compute and store ``id`` when it was left empty at construction."""
        if self.id:
            return
        computed = self.compute_hash(self.to_dict())
        # Assign through object.__setattr__ rather than a plain attribute
        # assignment: the model classes in this file that use this mixin
        # are declared with attr.s(frozen=True).
        object.__setattr__(self, "id", hash_to_bytes(computed))

    def unique_key(self) -> KeyType:
        """Return the object's ``id`` as its deduplication key."""
        return self.id  # type: ignore
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Person(BaseModel): | class Person(BaseModel): | ||||
"""Represents the author/committer of a revision or release.""" | """Represents the author/committer of a revision or release.""" | ||||
object_type: Final = "person" | object_type: Final = "person" | ||||
fullname = attr.ib(type=bytes, validator=type_validator()) | fullname = attr.ib(type=bytes, validator=type_validator()) | ||||
▲ Show 20 Lines • Show All 121 Lines • ▼ Show 20 Lines | |||||
@attr.s(frozen=True)
class Origin(BaseModel):
    """Represents a software source, identified by its URL."""

    object_type: Final = "origin"

    url = attr.ib(type=str, validator=type_validator())

    def unique_key(self) -> KeyType:
        """Return the deduplication key: a one-entry dict holding the URL."""
        key = {"url": self.url}
        return key
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class OriginVisit(BaseModel): | class OriginVisit(BaseModel): | ||||
"""Represents an origin visit with a given type at a given point in time, by a | """Represents an origin visit with a given type at a given point in time, by a | ||||
SWH loader.""" | SWH loader.""" | ||||
object_type: Final = "origin_visit" | object_type: Final = "origin_visit" | ||||
Show All 12 Lines | class OriginVisit(BaseModel): | ||||
def to_dict(self):
    """Serialize through the parent class, leaving out the ``visit`` entry
    when it is `None`."""
    serialized = super().to_dict()
    if serialized["visit"] is None:
        serialized.pop("visit")
    return serialized
def unique_key(self) -> KeyType:
    """Return the deduplication key: the origin and the stringified date."""
    return dict(origin=self.origin, date=str(self.date))
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class OriginVisitStatus(BaseModel): | class OriginVisitStatus(BaseModel): | ||||
"""Represents a visit update of an origin at a given point in time. | """Represents a visit update of an origin at a given point in time. | ||||
""" | """ | ||||
object_type: Final = "origin_visit_status" | object_type: Final = "origin_visit_status" | ||||
Show All 15 Lines | class OriginVisitStatus(BaseModel): | ||||
) | ) | ||||
@date.validator
def check_date(self, attribute, value):
    """Check that the date, when set, carries a timezone.

    Raises:
        ValueError: if ``value`` is not None and its ``tzinfo`` is None.
    """
    if value is None:
        return
    if value.tzinfo is None:
        raise ValueError("date must be a timezone-aware datetime.")
def unique_key(self) -> KeyType:
    """Return the deduplication key: origin, stringified visit and
    stringified date."""
    return dict(
        origin=self.origin,
        visit=str(self.visit),
        date=str(self.date),
    )
class TargetType(Enum): | class TargetType(Enum): | ||||
"""The type of content pointed to by a snapshot branch. Usually a | """The type of content pointed to by a snapshot branch. Usually a | ||||
revision or an alias.""" | revision or an alias.""" | ||||
CONTENT = "content" | CONTENT = "content" | ||||
DIRECTORY = "directory" | DIRECTORY = "directory" | ||||
REVISION = "revision" | REVISION = "revision" | ||||
Show All 30 Lines | def check_target(self, attribute, value): | ||||
raise ValueError("Wrong length for bytes identifier: %d" % len(value)) | raise ValueError("Wrong length for bytes identifier: %d" % len(value)) | ||||
@classmethod
def from_dict(cls, d):
    """Build an instance from a dict holding ``target`` and ``target_type``
    entries; the latter is converted to a `TargetType` member."""
    target = d["target"]
    target_type = TargetType(d["target_type"])
    return cls(target=target, target_type=target_type)
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Snapshot(BaseModel, HashableObject): | class Snapshot(HashableObject, BaseModel): | ||||
"""Represents the full state of an origin at a given point in time.""" | """Represents the full state of an origin at a given point in time.""" | ||||
object_type: Final = "snapshot" | object_type: Final = "snapshot" | ||||
branches = attr.ib( | branches = attr.ib( | ||||
type=ImmutableDict[bytes, Optional[SnapshotBranch]], | type=ImmutableDict[bytes, Optional[SnapshotBranch]], | ||||
validator=type_validator(), | validator=type_validator(), | ||||
converter=freeze_optional_dict, | converter=freeze_optional_dict, | ||||
Show All 12 Lines | def from_dict(cls, d): | ||||
(name, SnapshotBranch.from_dict(branch) if branch else None) | (name, SnapshotBranch.from_dict(branch) if branch else None) | ||||
for (name, branch) in d.pop("branches").items() | for (name, branch) in d.pop("branches").items() | ||||
), | ), | ||||
**d, | **d, | ||||
) | ) | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Release(BaseModel, HashableObject): | class Release(HashableObject, BaseModel): | ||||
object_type: Final = "release" | object_type: Final = "release" | ||||
name = attr.ib(type=bytes, validator=type_validator()) | name = attr.ib(type=bytes, validator=type_validator()) | ||||
message = attr.ib(type=Optional[bytes], validator=type_validator()) | message = attr.ib(type=Optional[bytes], validator=type_validator()) | ||||
target = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | target = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | ||||
target_type = attr.ib(type=ObjectType, validator=type_validator()) | target_type = attr.ib(type=ObjectType, validator=type_validator()) | ||||
synthetic = attr.ib(type=bool, validator=type_validator()) | synthetic = attr.ib(type=bool, validator=type_validator()) | ||||
author = attr.ib(type=Optional[Person], validator=type_validator(), default=None) | author = attr.ib(type=Optional[Person], validator=type_validator(), default=None) | ||||
▲ Show 20 Lines • Show All 50 Lines • ▼ Show 20 Lines | class RevisionType(Enum): | ||||
MERCURIAL = "hg" | MERCURIAL = "hg" | ||||
def tuplify_extra_headers(value: Iterable):
    """Turn an iterable of 2-item sequences into a tuple of 2-tuples.

    Every item must unpack into exactly two elements."""
    pairs = []
    for key, header_value in value:
        pairs.append((key, header_value))
    return tuple(pairs)
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Revision(BaseModel, HashableObject): | class Revision(HashableObject, BaseModel): | ||||
object_type: Final = "revision" | object_type: Final = "revision" | ||||
message = attr.ib(type=Optional[bytes], validator=type_validator()) | message = attr.ib(type=Optional[bytes], validator=type_validator()) | ||||
author = attr.ib(type=Person, validator=type_validator()) | author = attr.ib(type=Person, validator=type_validator()) | ||||
committer = attr.ib(type=Person, validator=type_validator()) | committer = attr.ib(type=Person, validator=type_validator()) | ||||
date = attr.ib(type=Optional[TimestampWithTimezone], validator=type_validator()) | date = attr.ib(type=Optional[TimestampWithTimezone], validator=type_validator()) | ||||
committer_date = attr.ib( | committer_date = attr.ib( | ||||
type=Optional[TimestampWithTimezone], validator=type_validator() | type=Optional[TimestampWithTimezone], validator=type_validator() | ||||
▲ Show 20 Lines • Show All 73 Lines • ▼ Show 20 Lines | class DirectoryEntry(BaseModel): | ||||
name = attr.ib(type=bytes, validator=type_validator()) | name = attr.ib(type=bytes, validator=type_validator()) | ||||
type = attr.ib(type=str, validator=attr.validators.in_(["file", "dir", "rev"])) | type = attr.ib(type=str, validator=attr.validators.in_(["file", "dir", "rev"])) | ||||
target = attr.ib(type=Sha1Git, validator=type_validator()) | target = attr.ib(type=Sha1Git, validator=type_validator()) | ||||
perms = attr.ib(type=int, validator=type_validator()) | perms = attr.ib(type=int, validator=type_validator()) | ||||
"""Usually one of the values of `swh.model.from_disk.DentryPerms`.""" | """Usually one of the values of `swh.model.from_disk.DentryPerms`.""" | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Directory(BaseModel, HashableObject): | class Directory(HashableObject, BaseModel): | ||||
object_type: Final = "directory" | object_type: Final = "directory" | ||||
entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=type_validator()) | entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=type_validator()) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | ||||
@staticmethod
def compute_hash(object_dict):
    """Return the result of `directory_identifier` for the given dict
    representation."""
    identifier = directory_identifier(object_dict)
    return identifier
▲ Show 20 Lines • Show All 115 Lines • ▼ Show 20 Lines | def with_data(self) -> "Content": | ||||
be None after this call. | be None after this call. | ||||
This call is almost a no-op, but subclasses may overload this method | This call is almost a no-op, but subclasses may overload this method | ||||
to lazy-load data (eg. from disk or objstorage).""" | to lazy-load data (eg. from disk or objstorage).""" | ||||
if self.data is None: | if self.data is None: | ||||
raise MissingData("Content data is None.") | raise MissingData("Content data is None.") | ||||
return self | return self | ||||
def unique_key(self) -> KeyType:
    """Return the ``sha1`` attribute as deduplication key."""
    # TODO: use a dict of hashes
    key = self.sha1
    return key
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class SkippedContent(BaseContent): | class SkippedContent(BaseContent): | ||||
object_type: Final = "skipped_content" | object_type: Final = "skipped_content" | ||||
sha1 = attr.ib(type=Optional[bytes], validator=type_validator()) | sha1 = attr.ib(type=Optional[bytes], validator=type_validator()) | ||||
sha1_git = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | sha1_git = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | ||||
sha256 = attr.ib(type=Optional[bytes], validator=type_validator()) | sha256 = attr.ib(type=Optional[bytes], validator=type_validator()) | ||||
▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | class SkippedContent(BaseContent): | ||||
@classmethod
def from_dict(cls, d):
    """Build an instance from a dict, refusing any non-None ``data`` entry.

    The input dict is copied first, so the caller's dict is left untouched.

    Raises:
        ValueError: if ``d`` holds a ``data`` entry that is not None.
    """
    payload = d.copy()
    data = payload.pop("data", None)
    if data is not None:
        raise ValueError('SkippedContent has no "data" attribute %r' % d)
    return super().from_dict(payload, use_subclass=False)
def unique_key(self) -> KeyType:
    """Return the result of ``self.hashes()`` as deduplication key."""
    hashes = self.hashes()
    return hashes
class MetadataAuthorityType(Enum):
    """Enumeration of the supported kinds of metadata authority."""

    DEPOSIT_CLIENT = "deposit_client"
    FORGE = "forge"
    REGISTRY = "registry"
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
Show All 18 Lines | def to_dict(self): | ||||
del d["metadata"] | del d["metadata"] | ||||
return d | return d | ||||
@classmethod
def from_dict(cls, d):
    """Build an instance from a dict whose ``type`` entry holds the raw
    value of a `MetadataAuthorityType` member.

    The conversion is done on a fresh dict so that the caller's dict is
    not modified.

    Raises:
        KeyError: if ``d`` has no ``type`` entry.
        ValueError: if ``d["type"]`` is not a valid `MetadataAuthorityType`
            value.
    """
    converted = {**d, "type": MetadataAuthorityType(d["type"])}
    return super().from_dict(converted)
def unique_key(self) -> KeyType:
    """Return the deduplication key: the raw authority type value and the
    URL."""
    return dict(type=self.type.value, url=self.url)
@attr.s(frozen=True)
class MetadataFetcher(BaseModel):
    """Represents a software component used to fetch metadata from a metadata
    authority, and ingest them into the Software Heritage archive."""

    object_type: Final = "metadata_fetcher"

    name = attr.ib(type=str, validator=type_validator())
    version = attr.ib(type=str, validator=type_validator())

    # Optional mapping, passed through freeze_optional_dict on assignment.
    metadata = attr.ib(
        type=Optional[ImmutableDict[str, Any]],
        default=None,
        validator=type_validator(),
        converter=freeze_optional_dict,
    )

    def to_dict(self):
        """Serialize through the parent class, leaving out the ``metadata``
        entry when it is None."""
        serialized = super().to_dict()
        if serialized["metadata"] is None:
            serialized.pop("metadata")
        return serialized

    def unique_key(self) -> KeyType:
        """Return the deduplication key: the fetcher's name and version."""
        return dict(name=self.name, version=self.version)
class MetadataTargetType(Enum): | class MetadataTargetType(Enum): | ||||
"""The type of object extrinsic metadata refer to.""" | """The type of object extrinsic metadata refer to.""" | ||||
CONTENT = "content" | CONTENT = "content" | ||||
DIRECTORY = "directory" | DIRECTORY = "directory" | ||||
REVISION = "revision" | REVISION = "revision" | ||||
RELEASE = "release" | RELEASE = "release" | ||||
▲ Show 20 Lines • Show All 199 Lines • ▼ Show 20 Lines | def from_dict(cls, d): | ||||
d["id"] = parse_swhid(d["id"]) | d["id"] = parse_swhid(d["id"]) | ||||
swhid_keys = ("snapshot", "release", "revision", "directory") | swhid_keys = ("snapshot", "release", "revision", "directory") | ||||
for swhid_key in swhid_keys: | for swhid_key in swhid_keys: | ||||
if d.get(swhid_key): | if d.get(swhid_key): | ||||
d[swhid_key] = parse_swhid(d[swhid_key]) | d[swhid_key] = parse_swhid(d[swhid_key]) | ||||
return super().from_dict(d) | return super().from_dict(d) | ||||
def unique_key(self) -> KeyType:
    """Return the deduplication key.

    The key combines the raw target type, the stringified id, the
    authority's type and URL, the stringified discovery date, and the
    fetcher's name and version."""
    authority = self.authority
    fetcher = self.fetcher
    return dict(
        type=self.type.value,
        id=str(self.id),
        authority_type=authority.type.value,
        authority_url=authority.url,
        discovery_date=str(self.discovery_date),
        fetcher_name=fetcher.name,
        fetcher_version=fetcher.version,
    )