Changeset View
Changeset View
Standalone View
Standalone View
swh/model/model.py
Show First 20 Lines • Show All 106 Lines • ▼ Show 20 Lines | def unique_key(self) -> KeyType: | ||||
deduplication.""" | deduplication.""" | ||||
raise NotImplementedError(f"unique_key for {self}") | raise NotImplementedError(f"unique_key for {self}") | ||||
class HashableObject(metaclass=ABCMeta): | class HashableObject(metaclass=ABCMeta): | ||||
"""Mixin to automatically compute object identifier hash when | """Mixin to automatically compute object identifier hash when | ||||
the associated model is instantiated.""" | the associated model is instantiated.""" | ||||
@staticmethod | |||||
@abstractmethod | @abstractmethod | ||||
def compute_hash(object_dict): | def compute_hash(self) -> bytes: | ||||
"""Derived model classes must implement this to compute | """Derived model classes must implement this to compute | ||||
the object hash from its dict representation.""" | the object hash. | ||||
This method is called by the object initialization if the `id` | |||||
attribute is set to an empty value. | |||||
""" | |||||
pass | pass | ||||
def __attrs_post_init__(self): | def __attrs_post_init__(self): | ||||
if not self.id: | if not self.id: | ||||
obj_id = hash_to_bytes(self.compute_hash(self.to_dict())) | obj_id = self.compute_hash() | ||||
object.__setattr__(self, "id", obj_id) | object.__setattr__(self, "id", obj_id) | ||||
def unique_key(self) -> KeyType: | def unique_key(self) -> KeyType: | ||||
return self.id # type: ignore | return self.id # type: ignore | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Person(BaseModel): | class Person(BaseModel): | ||||
▲ Show 20 Lines • Show All 252 Lines • ▼ Show 20 Lines | class Snapshot(HashableObject, BaseModel): | ||||
branches = attr.ib( | branches = attr.ib( | ||||
type=ImmutableDict[bytes, Optional[SnapshotBranch]], | type=ImmutableDict[bytes, Optional[SnapshotBranch]], | ||||
validator=type_validator(), | validator=type_validator(), | ||||
converter=freeze_optional_dict, | converter=freeze_optional_dict, | ||||
) | ) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | ||||
@staticmethod | def compute_hash(self) -> bytes: | ||||
def compute_hash(object_dict): | return hash_to_bytes(snapshot_identifier(self.to_dict())) | ||||
return snapshot_identifier(object_dict) | |||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
d = d.copy() | d = d.copy() | ||||
return cls( | return cls( | ||||
branches=ImmutableDict( | branches=ImmutableDict( | ||||
(name, SnapshotBranch.from_dict(branch) if branch else None) | (name, SnapshotBranch.from_dict(branch) if branch else None) | ||||
for (name, branch) in d.pop("branches").items() | for (name, branch) in d.pop("branches").items() | ||||
Show All 18 Lines | class Release(HashableObject, BaseModel): | ||||
metadata = attr.ib( | metadata = attr.ib( | ||||
type=Optional[ImmutableDict[str, object]], | type=Optional[ImmutableDict[str, object]], | ||||
validator=type_validator(), | validator=type_validator(), | ||||
converter=freeze_optional_dict, | converter=freeze_optional_dict, | ||||
default=None, | default=None, | ||||
) | ) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | ||||
@staticmethod | def compute_hash(self) -> bytes: | ||||
def compute_hash(object_dict): | return hash_to_bytes(release_identifier(self.to_dict())) | ||||
return release_identifier(object_dict) | |||||
@author.validator | @author.validator | ||||
def check_author(self, attribute, value): | def check_author(self, attribute, value): | ||||
"""If the author is `None`, checks the date is `None` too.""" | """If the author is `None`, checks the date is `None` too.""" | ||||
if self.author is None and self.date is not None: | if self.author is None and self.date is not None: | ||||
raise ValueError("release date must be None if author is None.") | raise ValueError("release date must be None if author is None.") | ||||
def to_dict(self): | def to_dict(self): | ||||
▲ Show 20 Lines • Show All 70 Lines • ▼ Show 20 Lines | def __attrs_post_init__(self): | ||||
if not self.extra_headers and "extra_headers" in metadata: | if not self.extra_headers and "extra_headers" in metadata: | ||||
(extra_headers, metadata) = metadata.copy_pop("extra_headers") | (extra_headers, metadata) = metadata.copy_pop("extra_headers") | ||||
object.__setattr__( | object.__setattr__( | ||||
self, "extra_headers", tuplify_extra_headers(extra_headers), | self, "extra_headers", tuplify_extra_headers(extra_headers), | ||||
) | ) | ||||
attr.validate(self) | attr.validate(self) | ||||
object.__setattr__(self, "metadata", metadata) | object.__setattr__(self, "metadata", metadata) | ||||
@staticmethod | def compute_hash(self) -> bytes: | ||||
def compute_hash(object_dict): | return hash_to_bytes(revision_identifier(self.to_dict())) | ||||
return revision_identifier(object_dict) | |||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
d = d.copy() | d = d.copy() | ||||
date = d.pop("date") | date = d.pop("date") | ||||
if date: | if date: | ||||
date = TimestampWithTimezone.from_dict(date) | date = TimestampWithTimezone.from_dict(date) | ||||
Show All 35 Lines | |||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Directory(HashableObject, BaseModel): | class Directory(HashableObject, BaseModel): | ||||
object_type: Final = "directory" | object_type: Final = "directory" | ||||
entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=type_validator()) | entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=type_validator()) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | ||||
@staticmethod | def compute_hash(self) -> bytes: | ||||
def compute_hash(object_dict): | return hash_to_bytes(directory_identifier(self.to_dict())) | ||||
return directory_identifier(object_dict) | |||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
d = d.copy() | d = d.copy() | ||||
return cls( | return cls( | ||||
entries=tuple( | entries=tuple( | ||||
DirectoryEntry.from_dict(entry) for entry in d.pop("entries") | DirectoryEntry.from_dict(entry) for entry in d.pop("entries") | ||||
), | ), | ||||
▲ Show 20 Lines • Show All 487 Lines • Show Last 20 Lines |