Changeset View
Changeset View
Standalone View
Standalone View
swh/model/model.py
Show All 23 Lines | |||||
import attr | import attr | ||||
from attrs_strict import AttributeTypeError | from attrs_strict import AttributeTypeError | ||||
import dateutil.parser | import dateutil.parser | ||||
import iso8601 | import iso8601 | ||||
from typing_extensions import Final | from typing_extensions import Final | ||||
from . import git_objects | from . import git_objects | ||||
from .collections import ImmutableDict | from .collections import ImmutableDict | ||||
from .hashutil import DEFAULT_ALGORITHMS, MultiHash | from .hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_hex | ||||
from .swhids import CoreSWHID | from .swhids import CoreSWHID | ||||
from .swhids import ExtendedObjectType as SwhidExtendedObjectType | from .swhids import ExtendedObjectType as SwhidExtendedObjectType | ||||
from .swhids import ExtendedSWHID | from .swhids import ExtendedSWHID | ||||
from .swhids import ObjectType as SwhidObjectType | from .swhids import ObjectType as SwhidObjectType | ||||
class MissingData(Exception): | class MissingData(Exception): | ||||
"""Raised by `Content.with_data` when it has no way of fetching the | """Raised by `Content.with_data` when it has no way of fetching the | ||||
Show All 12 Lines | |||||
Sha1Git = bytes | Sha1Git = bytes | ||||
Sha1 = bytes | Sha1 = bytes | ||||
KT = TypeVar("KT") | KT = TypeVar("KT") | ||||
VT = TypeVar("VT") | VT = TypeVar("VT") | ||||
def hash_repr(h: bytes) -> str: | |||||
if h is None: | |||||
return "None" | |||||
else: | |||||
return f"hash_to_bytes('{hash_to_hex(h)}')" | |||||
def freeze_optional_dict( | def freeze_optional_dict( | ||||
d: Union[None, Dict[KT, VT], ImmutableDict[KT, VT]] # type: ignore | d: Union[None, Dict[KT, VT], ImmutableDict[KT, VT]] # type: ignore | ||||
) -> Optional[ImmutableDict[KT, VT]]: | ) -> Optional[ImmutableDict[KT, VT]]: | ||||
if isinstance(d, dict): | if isinstance(d, dict): | ||||
return ImmutableDict(d) | return ImmutableDict(d) | ||||
else: | else: | ||||
return d | return d | ||||
▲ Show 20 Lines • Show All 417 Lines • ▼ Show 20 Lines | class OriginVisitStatus(BaseModel): | ||||
date = attr.ib(type=datetime.datetime, validator=type_validator()) | date = attr.ib(type=datetime.datetime, validator=type_validator()) | ||||
status = attr.ib( | status = attr.ib( | ||||
type=str, | type=str, | ||||
validator=attr.validators.in_( | validator=attr.validators.in_( | ||||
["created", "ongoing", "full", "partial", "not_found", "failed"] | ["created", "ongoing", "full", "partial", "not_found", "failed"] | ||||
), | ), | ||||
) | ) | ||||
snapshot = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | snapshot = attr.ib( | ||||
type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr | |||||
) | |||||
# Type is optional be to able to use it before adding it to the database model | # Type is optional be to able to use it before adding it to the database model | ||||
type = attr.ib(type=Optional[str], validator=type_validator(), default=None) | type = attr.ib(type=Optional[str], validator=type_validator(), default=None) | ||||
metadata = attr.ib( | metadata = attr.ib( | ||||
type=Optional[ImmutableDict[str, object]], | type=Optional[ImmutableDict[str, object]], | ||||
validator=type_validator(), | validator=type_validator(), | ||||
converter=freeze_optional_dict, | converter=freeze_optional_dict, | ||||
default=None, | default=None, | ||||
) | ) | ||||
Show All 14 Lines | class TargetType(Enum): | ||||
CONTENT = "content" | CONTENT = "content" | ||||
DIRECTORY = "directory" | DIRECTORY = "directory" | ||||
REVISION = "revision" | REVISION = "revision" | ||||
RELEASE = "release" | RELEASE = "release" | ||||
SNAPSHOT = "snapshot" | SNAPSHOT = "snapshot" | ||||
ALIAS = "alias" | ALIAS = "alias" | ||||
def __repr__(self): | |||||
return f"TargetType.{self.name}" | |||||
class ObjectType(Enum): | class ObjectType(Enum): | ||||
"""The type of content pointed to by a release. Usually a revision""" | """The type of content pointed to by a release. Usually a revision""" | ||||
CONTENT = "content" | CONTENT = "content" | ||||
DIRECTORY = "directory" | DIRECTORY = "directory" | ||||
REVISION = "revision" | REVISION = "revision" | ||||
RELEASE = "release" | RELEASE = "release" | ||||
SNAPSHOT = "snapshot" | SNAPSHOT = "snapshot" | ||||
def __repr__(self): | |||||
return f"ObjectType.{self.name}" | |||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True) | ||||
class SnapshotBranch(BaseModel): | class SnapshotBranch(BaseModel): | ||||
"""Represents one of the branches of a snapshot.""" | """Represents one of the branches of a snapshot.""" | ||||
object_type: Final = "snapshot_branch" | object_type: Final = "snapshot_branch" | ||||
target = attr.ib(type=bytes, validator=type_validator()) | target = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) | ||||
target_type = attr.ib(type=TargetType, validator=type_validator()) | target_type = attr.ib(type=TargetType, validator=type_validator()) | ||||
@target.validator | @target.validator | ||||
def check_target(self, attribute, value): | def check_target(self, attribute, value): | ||||
"""Checks the target type is not an alias, checks the target is a | """Checks the target type is not an alias, checks the target is a | ||||
valid sha1_git.""" | valid sha1_git.""" | ||||
if self.target_type != TargetType.ALIAS and self.target is not None: | if self.target_type != TargetType.ALIAS and self.target is not None: | ||||
if len(value) != 20: | if len(value) != 20: | ||||
Show All 10 Lines | class Snapshot(HashableObject, BaseModel): | ||||
object_type: Final = "snapshot" | object_type: Final = "snapshot" | ||||
branches = attr.ib( | branches = attr.ib( | ||||
type=ImmutableDict[bytes, Optional[SnapshotBranch]], | type=ImmutableDict[bytes, Optional[SnapshotBranch]], | ||||
validator=type_validator(), | validator=type_validator(), | ||||
converter=freeze_optional_dict, | converter=freeze_optional_dict, | ||||
) | ) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) | ||||
def compute_hash(self) -> bytes: | def compute_hash(self) -> bytes: | ||||
git_object = git_objects.snapshot_git_object(self) | git_object = git_objects.snapshot_git_object(self) | ||||
return hashlib.new("sha1", git_object).digest() | return hashlib.new("sha1", git_object).digest() | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
d = d.copy() | d = d.copy() | ||||
Show All 11 Lines | |||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True) | ||||
class Release(HashableObject, BaseModel): | class Release(HashableObject, BaseModel): | ||||
object_type: Final = "release" | object_type: Final = "release" | ||||
name = attr.ib(type=bytes, validator=type_validator()) | name = attr.ib(type=bytes, validator=type_validator()) | ||||
message = attr.ib(type=Optional[bytes], validator=type_validator()) | message = attr.ib(type=Optional[bytes], validator=type_validator()) | ||||
target = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | target = attr.ib(type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr) | ||||
target_type = attr.ib(type=ObjectType, validator=type_validator()) | target_type = attr.ib(type=ObjectType, validator=type_validator()) | ||||
synthetic = attr.ib(type=bool, validator=type_validator()) | synthetic = attr.ib(type=bool, validator=type_validator()) | ||||
author = attr.ib(type=Optional[Person], validator=type_validator(), default=None) | author = attr.ib(type=Optional[Person], validator=type_validator(), default=None) | ||||
date = attr.ib( | date = attr.ib( | ||||
type=Optional[TimestampWithTimezone], validator=type_validator(), default=None | type=Optional[TimestampWithTimezone], validator=type_validator(), default=None | ||||
) | ) | ||||
metadata = attr.ib( | metadata = attr.ib( | ||||
type=Optional[ImmutableDict[str, object]], | type=Optional[ImmutableDict[str, object]], | ||||
validator=type_validator(), | validator=type_validator(), | ||||
converter=freeze_optional_dict, | converter=freeze_optional_dict, | ||||
default=None, | default=None, | ||||
) | ) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) | ||||
def compute_hash(self) -> bytes: | def compute_hash(self) -> bytes: | ||||
git_object = git_objects.release_git_object(self) | git_object = git_objects.release_git_object(self) | ||||
return hashlib.new("sha1", git_object).digest() | return hashlib.new("sha1", git_object).digest() | ||||
@author.validator | @author.validator | ||||
def check_author(self, attribute, value): | def check_author(self, attribute, value): | ||||
"""If the author is `None`, checks the date is `None` too.""" | """If the author is `None`, checks the date is `None` too.""" | ||||
Show All 32 Lines | class RevisionType(Enum): | ||||
GIT = "git" | GIT = "git" | ||||
TAR = "tar" | TAR = "tar" | ||||
DSC = "dsc" | DSC = "dsc" | ||||
SUBVERSION = "svn" | SUBVERSION = "svn" | ||||
MERCURIAL = "hg" | MERCURIAL = "hg" | ||||
CVS = "cvs" | CVS = "cvs" | ||||
BAZAAR = "bzr" | BAZAAR = "bzr" | ||||
def __repr__(self): | |||||
return f"RevisionType.{self.name}" | |||||
def tuplify_extra_headers(value: Iterable): | def tuplify_extra_headers(value: Iterable): | ||||
return tuple((k, v) for k, v in value) | return tuple((k, v) for k, v in value) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True) | ||||
class Revision(HashableObject, BaseModel): | class Revision(HashableObject, BaseModel): | ||||
object_type: Final = "revision" | object_type: Final = "revision" | ||||
message = attr.ib(type=Optional[bytes], validator=type_validator()) | message = attr.ib(type=Optional[bytes], validator=type_validator()) | ||||
author = attr.ib(type=Person, validator=type_validator()) | author = attr.ib(type=Person, validator=type_validator()) | ||||
committer = attr.ib(type=Person, validator=type_validator()) | committer = attr.ib(type=Person, validator=type_validator()) | ||||
date = attr.ib(type=Optional[TimestampWithTimezone], validator=type_validator()) | date = attr.ib(type=Optional[TimestampWithTimezone], validator=type_validator()) | ||||
committer_date = attr.ib( | committer_date = attr.ib( | ||||
type=Optional[TimestampWithTimezone], validator=type_validator() | type=Optional[TimestampWithTimezone], validator=type_validator() | ||||
) | ) | ||||
type = attr.ib(type=RevisionType, validator=type_validator()) | type = attr.ib(type=RevisionType, validator=type_validator()) | ||||
directory = attr.ib(type=Sha1Git, validator=type_validator()) | directory = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr) | ||||
synthetic = attr.ib(type=bool, validator=type_validator()) | synthetic = attr.ib(type=bool, validator=type_validator()) | ||||
metadata = attr.ib( | metadata = attr.ib( | ||||
type=Optional[ImmutableDict[str, object]], | type=Optional[ImmutableDict[str, object]], | ||||
validator=type_validator(), | validator=type_validator(), | ||||
converter=freeze_optional_dict, | converter=freeze_optional_dict, | ||||
default=None, | default=None, | ||||
) | ) | ||||
parents = attr.ib(type=Tuple[Sha1Git, ...], validator=type_validator(), default=()) | parents = attr.ib(type=Tuple[Sha1Git, ...], validator=type_validator(), default=()) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) | ||||
extra_headers = attr.ib( | extra_headers = attr.ib( | ||||
type=Tuple[Tuple[bytes, bytes], ...], | type=Tuple[Tuple[bytes, bytes], ...], | ||||
validator=type_validator(), | validator=type_validator(), | ||||
converter=tuplify_extra_headers, | converter=tuplify_extra_headers, | ||||
default=(), | default=(), | ||||
) | ) | ||||
def __attrs_post_init__(self): | def __attrs_post_init__(self): | ||||
▲ Show 20 Lines • Show All 51 Lines • ▼ Show 20 Lines | |||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True) | ||||
class DirectoryEntry(BaseModel): | class DirectoryEntry(BaseModel): | ||||
object_type: Final = "directory_entry" | object_type: Final = "directory_entry" | ||||
name = attr.ib(type=bytes, validator=type_validator()) | name = attr.ib(type=bytes, validator=type_validator()) | ||||
type = attr.ib(type=str, validator=attr.validators.in_(["file", "dir", "rev"])) | type = attr.ib(type=str, validator=attr.validators.in_(["file", "dir", "rev"])) | ||||
target = attr.ib(type=Sha1Git, validator=type_validator()) | target = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr) | ||||
perms = attr.ib(type=int, validator=type_validator(), converter=int) | perms = attr.ib(type=int, validator=type_validator(), converter=int, repr=oct) | ||||
"""Usually one of the values of `swh.model.from_disk.DentryPerms`.""" | """Usually one of the values of `swh.model.from_disk.DentryPerms`.""" | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True) | ||||
class Directory(HashableObject, BaseModel): | class Directory(HashableObject, BaseModel): | ||||
object_type: Final = "directory" | object_type: Final = "directory" | ||||
entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=type_validator()) | entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=type_validator()) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) | ||||
def compute_hash(self) -> bytes: | def compute_hash(self) -> bytes: | ||||
git_object = git_objects.directory_git_object(self) | git_object = git_objects.directory_git_object(self) | ||||
return hashlib.new("sha1", git_object).digest() | return hashlib.new("sha1", git_object).digest() | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
d = d.copy() | d = d.copy() | ||||
▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | def hashes(self) -> Dict[str, bytes]: | ||||
"""Returns a dictionary {hash_name: hash_value}""" | """Returns a dictionary {hash_name: hash_value}""" | ||||
return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS} | return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS} | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True) | ||||
class Content(BaseContent): | class Content(BaseContent): | ||||
object_type: Final = "content" | object_type: Final = "content" | ||||
sha1 = attr.ib(type=bytes, validator=type_validator()) | sha1 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) | ||||
sha1_git = attr.ib(type=Sha1Git, validator=type_validator()) | sha1_git = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr) | ||||
sha256 = attr.ib(type=bytes, validator=type_validator()) | sha256 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) | ||||
blake2s256 = attr.ib(type=bytes, validator=type_validator()) | blake2s256 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) | ||||
length = attr.ib(type=int, validator=type_validator()) | length = attr.ib(type=int, validator=type_validator()) | ||||
status = attr.ib( | status = attr.ib( | ||||
type=str, | type=str, | ||||
validator=attr.validators.in_(["visible", "hidden"]), | validator=attr.validators.in_(["visible", "hidden"]), | ||||
default="visible", | default="visible", | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 63 Lines • ▼ Show 20 Lines | def swhid(self) -> CoreSWHID: | ||||
"""Returns a SWHID representing this object.""" | """Returns a SWHID representing this object.""" | ||||
return CoreSWHID(object_type=SwhidObjectType.CONTENT, object_id=self.sha1_git) | return CoreSWHID(object_type=SwhidObjectType.CONTENT, object_id=self.sha1_git) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True) | ||||
class SkippedContent(BaseContent): | class SkippedContent(BaseContent): | ||||
object_type: Final = "skipped_content" | object_type: Final = "skipped_content" | ||||
sha1 = attr.ib(type=Optional[bytes], validator=type_validator()) | sha1 = attr.ib(type=Optional[bytes], validator=type_validator(), repr=hash_repr) | ||||
sha1_git = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | sha1_git = attr.ib( | ||||
sha256 = attr.ib(type=Optional[bytes], validator=type_validator()) | type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr | ||||
blake2s256 = attr.ib(type=Optional[bytes], validator=type_validator()) | ) | ||||
sha256 = attr.ib(type=Optional[bytes], validator=type_validator(), repr=hash_repr) | |||||
blake2s256 = attr.ib( | |||||
type=Optional[bytes], validator=type_validator(), repr=hash_repr | |||||
) | |||||
length = attr.ib(type=Optional[int], validator=type_validator()) | length = attr.ib(type=Optional[int], validator=type_validator()) | ||||
status = attr.ib(type=str, validator=attr.validators.in_(["absent"])) | status = attr.ib(type=str, validator=attr.validators.in_(["absent"])) | ||||
reason = attr.ib(type=Optional[str], validator=type_validator(), default=None) | reason = attr.ib(type=Optional[str], validator=type_validator(), default=None) | ||||
origin = attr.ib(type=Optional[str], validator=type_validator(), default=None) | origin = attr.ib(type=Optional[str], validator=type_validator(), default=None) | ||||
▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | def unique_key(self) -> KeyType: | ||||
return self.hashes() | return self.hashes() | ||||
class MetadataAuthorityType(Enum): | class MetadataAuthorityType(Enum): | ||||
DEPOSIT_CLIENT = "deposit_client" | DEPOSIT_CLIENT = "deposit_client" | ||||
FORGE = "forge" | FORGE = "forge" | ||||
REGISTRY = "registry" | REGISTRY = "registry" | ||||
def __repr__(self): | |||||
return f"MetadataAuthorityType.{self.name}" | |||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True) | ||||
class MetadataAuthority(BaseModel): | class MetadataAuthority(BaseModel): | ||||
"""Represents an entity that provides metadata about an origin or | """Represents an entity that provides metadata about an origin or | ||||
software artifact.""" | software artifact.""" | ||||
object_type: Final = "metadata_authority" | object_type: Final = "metadata_authority" | ||||
▲ Show 20 Lines • Show All 89 Lines • ▼ Show 20 Lines | class RawExtrinsicMetadata(HashableObject, BaseModel): | ||||
revision = attr.ib( | revision = attr.ib( | ||||
type=Optional[CoreSWHID], default=None, validator=type_validator() | type=Optional[CoreSWHID], default=None, validator=type_validator() | ||||
) | ) | ||||
path = attr.ib(type=Optional[bytes], default=None, validator=type_validator()) | path = attr.ib(type=Optional[bytes], default=None, validator=type_validator()) | ||||
directory = attr.ib( | directory = attr.ib( | ||||
type=Optional[CoreSWHID], default=None, validator=type_validator() | type=Optional[CoreSWHID], default=None, validator=type_validator() | ||||
) | ) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) | ||||
def compute_hash(self) -> bytes: | def compute_hash(self) -> bytes: | ||||
git_object = git_objects.raw_extrinsic_metadata_git_object(self) | git_object = git_objects.raw_extrinsic_metadata_git_object(self) | ||||
return hashlib.new("sha1", git_object).digest() | return hashlib.new("sha1", git_object).digest() | ||||
@origin.validator | @origin.validator | ||||
def check_origin(self, attribute, value): | def check_origin(self, attribute, value): | ||||
if value is None: | if value is None: | ||||
▲ Show 20 Lines • Show All 175 Lines • ▼ Show 20 Lines | |||||
class ExtID(HashableObject, BaseModel): | class ExtID(HashableObject, BaseModel): | ||||
object_type: Final = "extid" | object_type: Final = "extid" | ||||
extid_type = attr.ib(type=str, validator=type_validator()) | extid_type = attr.ib(type=str, validator=type_validator()) | ||||
extid = attr.ib(type=bytes, validator=type_validator()) | extid = attr.ib(type=bytes, validator=type_validator()) | ||||
target = attr.ib(type=CoreSWHID, validator=type_validator()) | target = attr.ib(type=CoreSWHID, validator=type_validator()) | ||||
extid_version = attr.ib(type=int, validator=type_validator(), default=0) | extid_version = attr.ib(type=int, validator=type_validator(), default=0) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
return cls( | return cls( | ||||
extid=d["extid"], | extid=d["extid"], | ||||
extid_type=d["extid_type"], | extid_type=d["extid_type"], | ||||
target=CoreSWHID.from_string(d["target"]), | target=CoreSWHID.from_string(d["target"]), | ||||
extid_version=d.get("extid_version", 0), | extid_version=d.get("extid_version", 0), | ||||
) | ) | ||||
def compute_hash(self) -> bytes: | def compute_hash(self) -> bytes: | ||||
git_object = git_objects.extid_git_object(self) | git_object = git_objects.extid_git_object(self) | ||||
return hashlib.new("sha1", git_object).digest() | return hashlib.new("sha1", git_object).digest() |