Changeset View
Changeset View
Standalone View
Standalone View
swh/model/model.py
# Copyright (C) 2018-2019 The Software Heritage developers | # Copyright (C) 2018-2020 The Software Heritage developers | ||||
ardumont: copyright header ¯\_(ツ)_/¯
| |||||
Done Inline Actions thx douardda: thx | |||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
from abc import ABCMeta, abstractmethod | from abc import ABCMeta, abstractmethod | ||||
from enum import Enum | from enum import Enum | ||||
from hashlib import sha256 | from hashlib import sha256 | ||||
from typing import Dict, Optional, Tuple, TypeVar, Union | from typing import Dict, Optional, Tuple, TypeVar, Union | ||||
from typing_extensions import Final | |||||
import attr | import attr | ||||
from attrs_strict import type_validator | from attrs_strict import type_validator | ||||
import dateutil.parser | import dateutil.parser | ||||
import iso8601 | import iso8601 | ||||
from .identifiers import ( | from .identifiers import ( | ||||
normalize_timestamp, | normalize_timestamp, | ||||
▲ Show 20 Lines • Show All 75 Lines • ▼ Show 20 Lines | def __attrs_post_init__(self): | ||||
if not self.id: | if not self.id: | ||||
obj_id = hash_to_bytes(self.compute_hash(self.to_dict())) | obj_id = hash_to_bytes(self.compute_hash(self.to_dict())) | ||||
object.__setattr__(self, "id", obj_id) | object.__setattr__(self, "id", obj_id) | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Person(BaseModel): | class Person(BaseModel): | ||||
"""Represents the author/committer of a revision or release.""" | """Represents the author/committer of a revision or release.""" | ||||
Not Done Inline Actions I would rather use Final from typing-extensions to avoid repeating the literal string. tag: Final = "person" anlambert: I would rather use `Final` from [[ https://mypy.readthedocs.io/en/stable/final_attrs.html#final… | |||||
Not Done Inline Actions That's even more than that, that will even make it unmodifiable [1]. We can also add its type Final[str]. So that suits us even better ;) [1] Quoting the linked documentation: You can use the typing_extensions.Final qualifier to indicate that a name or attribute should not be reassigned, redefined, or overridden. This is often useful for module and class level constants as a way to prevent unintended modification. Mypy will prevent further assignments to final names... ardumont: That's even more than that, that will even make it unmodifiable [1]. We can also add its type… | |||||
Done Inline Actions The advantage of Literal is it's a PEP (and has been integrated in py3.8). I have no strong opinion on this myself, I'll do whatever we decide. douardda: The advantage of Literal is it's a PEP (and has been integrated in py3.8).
I have no strong… | |||||
Not Done Inline Actions Final is also a PEP and has also been integrated in Python 3.8 ;-) anlambert: `Final` is also a [[ https://www.python.org/dev/peps/pep-0591/ | PEP ]] and has also been… | |||||
object_type: Final = "person" | |||||
fullname = attr.ib(type=bytes, validator=type_validator()) | fullname = attr.ib(type=bytes, validator=type_validator()) | ||||
name = attr.ib(type=Optional[bytes], validator=type_validator()) | name = attr.ib(type=Optional[bytes], validator=type_validator()) | ||||
email = attr.ib(type=Optional[bytes], validator=type_validator()) | email = attr.ib(type=Optional[bytes], validator=type_validator()) | ||||
@classmethod | @classmethod | ||||
def from_fullname(cls, fullname: bytes): | def from_fullname(cls, fullname: bytes): | ||||
"""Returns a Person object, by guessing the name and email from the | """Returns a Person object, by guessing the name and email from the | ||||
fullname, in the `name <email>` format. | fullname, in the `name <email>` format. | ||||
Show All 36 Lines | def anonymize(self) -> "Person": | ||||
""" | """ | ||||
return Person(fullname=sha256(self.fullname).digest(), name=None, email=None,) | return Person(fullname=sha256(self.fullname).digest(), name=None, email=None,) | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Timestamp(BaseModel): | class Timestamp(BaseModel): | ||||
"""Represents a naive timestamp from a VCS.""" | """Represents a naive timestamp from a VCS.""" | ||||
object_type: Final = "timestamp" | |||||
seconds = attr.ib(type=int, validator=type_validator()) | seconds = attr.ib(type=int, validator=type_validator()) | ||||
microseconds = attr.ib(type=int, validator=type_validator()) | microseconds = attr.ib(type=int, validator=type_validator()) | ||||
@seconds.validator | @seconds.validator | ||||
def check_seconds(self, attribute, value): | def check_seconds(self, attribute, value): | ||||
"""Check that seconds fit in a 64-bits signed integer.""" | """Check that seconds fit in a 64-bits signed integer.""" | ||||
if not (-(2 ** 63) <= value < 2 ** 63): | if not (-(2 ** 63) <= value < 2 ** 63): | ||||
raise ValueError("Seconds must be a signed 64-bits integer.") | raise ValueError("Seconds must be a signed 64-bits integer.") | ||||
@microseconds.validator | @microseconds.validator | ||||
def check_microseconds(self, attribute, value): | def check_microseconds(self, attribute, value): | ||||
"""Checks that microseconds are positive and < 1000000.""" | """Checks that microseconds are positive and < 1000000.""" | ||||
if not (0 <= value < 10 ** 6): | if not (0 <= value < 10 ** 6): | ||||
raise ValueError("Microseconds must be in [0, 1000000[.") | raise ValueError("Microseconds must be in [0, 1000000[.") | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class TimestampWithTimezone(BaseModel): | class TimestampWithTimezone(BaseModel): | ||||
"""Represents a TZ-aware timestamp from a VCS.""" | """Represents a TZ-aware timestamp from a VCS.""" | ||||
object_type: Final = "timestamp_with_timezone" | |||||
timestamp = attr.ib(type=Timestamp, validator=type_validator()) | timestamp = attr.ib(type=Timestamp, validator=type_validator()) | ||||
offset = attr.ib(type=int, validator=type_validator()) | offset = attr.ib(type=int, validator=type_validator()) | ||||
negative_utc = attr.ib(type=bool, validator=type_validator()) | negative_utc = attr.ib(type=bool, validator=type_validator()) | ||||
@offset.validator | @offset.validator | ||||
def check_offset(self, attribute, value): | def check_offset(self, attribute, value): | ||||
"""Checks the offset is a 16-bits signed integer (in theory, it | """Checks the offset is a 16-bits signed integer (in theory, it | ||||
should always be between -14 and +14 hours).""" | should always be between -14 and +14 hours).""" | ||||
Show All 34 Lines | def from_iso8601(cls, s): | ||||
tstz = attr.evolve(tstz, negative_utc=True) | tstz = attr.evolve(tstz, negative_utc=True) | ||||
return tstz | return tstz | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Origin(BaseModel): | class Origin(BaseModel): | ||||
"""Represents a software source: a VCS and an URL.""" | """Represents a software source: a VCS and an URL.""" | ||||
object_type: Final = "origin" | |||||
url = attr.ib(type=str, validator=type_validator()) | url = attr.ib(type=str, validator=type_validator()) | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class OriginVisit(BaseModel): | class OriginVisit(BaseModel): | ||||
"""Represents a visit of an origin at a given point in time, by a | """Represents a visit of an origin at a given point in time, by a | ||||
SWH loader.""" | SWH loader.""" | ||||
object_type: Final = "origin_visit" | |||||
origin = attr.ib(type=str, validator=type_validator()) | origin = attr.ib(type=str, validator=type_validator()) | ||||
date = attr.ib(type=datetime.datetime, validator=type_validator()) | date = attr.ib(type=datetime.datetime, validator=type_validator()) | ||||
type = attr.ib(type=str, validator=type_validator()) | type = attr.ib(type=str, validator=type_validator()) | ||||
"""Should not be set before calling 'origin_visit_add()'.""" | """Should not be set before calling 'origin_visit_add()'.""" | ||||
visit = attr.ib(type=Optional[int], validator=type_validator(), default=None) | visit = attr.ib(type=Optional[int], validator=type_validator(), default=None) | ||||
status = attr.ib(type=Optional[str], validator=type_validator(), default=None) | status = attr.ib(type=Optional[str], validator=type_validator(), default=None) | ||||
snapshot = attr.ib(type=Optional[Sha1Git], validator=type_validator(), default=None) | snapshot = attr.ib(type=Optional[Sha1Git], validator=type_validator(), default=None) | ||||
Show All 11 Lines | |||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class OriginVisitStatus(BaseModel): | class OriginVisitStatus(BaseModel): | ||||
"""Represents a visit update of an origin at a given point in time. | """Represents a visit update of an origin at a given point in time. | ||||
""" | """ | ||||
object_type: Final = "origin_visit_status" | |||||
origin = attr.ib(type=str, validator=type_validator()) | origin = attr.ib(type=str, validator=type_validator()) | ||||
visit = attr.ib(type=int, validator=type_validator()) | visit = attr.ib(type=int, validator=type_validator()) | ||||
date = attr.ib(type=datetime.datetime, validator=type_validator()) | date = attr.ib(type=datetime.datetime, validator=type_validator()) | ||||
status = attr.ib( | status = attr.ib( | ||||
type=str, | type=str, | ||||
validator=attr.validators.in_(["created", "ongoing", "full", "partial"]), | validator=attr.validators.in_(["created", "ongoing", "full", "partial"]), | ||||
) | ) | ||||
Show All 24 Lines | class ObjectType(Enum): | ||||
RELEASE = "release" | RELEASE = "release" | ||||
SNAPSHOT = "snapshot" | SNAPSHOT = "snapshot" | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class SnapshotBranch(BaseModel): | class SnapshotBranch(BaseModel): | ||||
"""Represents one of the branches of a snapshot.""" | """Represents one of the branches of a snapshot.""" | ||||
object_type: Final = "snapshot_branch" | |||||
target = attr.ib(type=bytes, validator=type_validator()) | target = attr.ib(type=bytes, validator=type_validator()) | ||||
target_type = attr.ib(type=TargetType, validator=type_validator()) | target_type = attr.ib(type=TargetType, validator=type_validator()) | ||||
@target.validator | @target.validator | ||||
def check_target(self, attribute, value): | def check_target(self, attribute, value): | ||||
"""Checks the target type is not an alias, checks the target is a | """Checks the target type is not an alias, checks the target is a | ||||
valid sha1_git.""" | valid sha1_git.""" | ||||
if self.target_type != TargetType.ALIAS and self.target is not None: | if self.target_type != TargetType.ALIAS and self.target is not None: | ||||
if len(value) != 20: | if len(value) != 20: | ||||
raise ValueError("Wrong length for bytes identifier: %d" % len(value)) | raise ValueError("Wrong length for bytes identifier: %d" % len(value)) | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
return cls(target=d["target"], target_type=TargetType(d["target_type"])) | return cls(target=d["target"], target_type=TargetType(d["target_type"])) | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Snapshot(BaseModel, HashableObject): | class Snapshot(BaseModel, HashableObject): | ||||
"""Represents the full state of an origin at a given point in time.""" | """Represents the full state of an origin at a given point in time.""" | ||||
object_type: Final = "snapshot" | |||||
branches = attr.ib( | branches = attr.ib( | ||||
type=Dict[bytes, Optional[SnapshotBranch]], validator=type_validator() | type=Dict[bytes, Optional[SnapshotBranch]], validator=type_validator() | ||||
) | ) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | ||||
@staticmethod | @staticmethod | ||||
def compute_hash(object_dict): | def compute_hash(object_dict): | ||||
return snapshot_identifier(object_dict) | return snapshot_identifier(object_dict) | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
d = d.copy() | d = d.copy() | ||||
return cls( | return cls( | ||||
branches={ | branches={ | ||||
name: SnapshotBranch.from_dict(branch) if branch else None | name: SnapshotBranch.from_dict(branch) if branch else None | ||||
for (name, branch) in d.pop("branches").items() | for (name, branch) in d.pop("branches").items() | ||||
}, | }, | ||||
**d, | **d, | ||||
) | ) | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Release(BaseModel, HashableObject): | class Release(BaseModel, HashableObject): | ||||
object_type: Final = "release" | |||||
name = attr.ib(type=bytes, validator=type_validator()) | name = attr.ib(type=bytes, validator=type_validator()) | ||||
message = attr.ib(type=Optional[bytes], validator=type_validator()) | message = attr.ib(type=Optional[bytes], validator=type_validator()) | ||||
target = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | target = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | ||||
target_type = attr.ib(type=ObjectType, validator=type_validator()) | target_type = attr.ib(type=ObjectType, validator=type_validator()) | ||||
synthetic = attr.ib(type=bool, validator=type_validator()) | synthetic = attr.ib(type=bool, validator=type_validator()) | ||||
author = attr.ib(type=Optional[Person], validator=type_validator(), default=None) | author = attr.ib(type=Optional[Person], validator=type_validator(), default=None) | ||||
date = attr.ib( | date = attr.ib( | ||||
type=Optional[TimestampWithTimezone], validator=type_validator(), default=None | type=Optional[TimestampWithTimezone], validator=type_validator(), default=None | ||||
▲ Show 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | class RevisionType(Enum): | ||||
TAR = "tar" | TAR = "tar" | ||||
DSC = "dsc" | DSC = "dsc" | ||||
SUBVERSION = "svn" | SUBVERSION = "svn" | ||||
MERCURIAL = "hg" | MERCURIAL = "hg" | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Revision(BaseModel, HashableObject): | class Revision(BaseModel, HashableObject): | ||||
object_type: Final = "revision" | |||||
message = attr.ib(type=Optional[bytes], validator=type_validator()) | message = attr.ib(type=Optional[bytes], validator=type_validator()) | ||||
author = attr.ib(type=Person, validator=type_validator()) | author = attr.ib(type=Person, validator=type_validator()) | ||||
committer = attr.ib(type=Person, validator=type_validator()) | committer = attr.ib(type=Person, validator=type_validator()) | ||||
date = attr.ib(type=Optional[TimestampWithTimezone], validator=type_validator()) | date = attr.ib(type=Optional[TimestampWithTimezone], validator=type_validator()) | ||||
committer_date = attr.ib( | committer_date = attr.ib( | ||||
type=Optional[TimestampWithTimezone], validator=type_validator() | type=Optional[TimestampWithTimezone], validator=type_validator() | ||||
) | ) | ||||
type = attr.ib(type=RevisionType, validator=type_validator()) | type = attr.ib(type=RevisionType, validator=type_validator()) | ||||
Show All 38 Lines | def anonymize(self) -> "Revision": | ||||
""" | """ | ||||
return attr.evolve( | return attr.evolve( | ||||
self, author=self.author.anonymize(), committer=self.committer.anonymize() | self, author=self.author.anonymize(), committer=self.committer.anonymize() | ||||
) | ) | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class DirectoryEntry(BaseModel): | class DirectoryEntry(BaseModel): | ||||
object_type: Final = "directory_entry" | |||||
name = attr.ib(type=bytes, validator=type_validator()) | name = attr.ib(type=bytes, validator=type_validator()) | ||||
type = attr.ib(type=str, validator=attr.validators.in_(["file", "dir", "rev"])) | type = attr.ib(type=str, validator=attr.validators.in_(["file", "dir", "rev"])) | ||||
target = attr.ib(type=Sha1Git, validator=type_validator()) | target = attr.ib(type=Sha1Git, validator=type_validator()) | ||||
perms = attr.ib(type=int, validator=type_validator()) | perms = attr.ib(type=int, validator=type_validator()) | ||||
"""Usually one of the values of `swh.model.from_disk.DentryPerms`.""" | """Usually one of the values of `swh.model.from_disk.DentryPerms`.""" | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Directory(BaseModel, HashableObject): | class Directory(BaseModel, HashableObject): | ||||
object_type: Final = "directory" | |||||
entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=type_validator()) | entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=type_validator()) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | ||||
@staticmethod | @staticmethod | ||||
def compute_hash(object_dict): | def compute_hash(object_dict): | ||||
return directory_identifier(object_dict) | return directory_identifier(object_dict) | ||||
@classmethod | @classmethod | ||||
Show All 40 Lines | class BaseContent(BaseModel): | ||||
def hashes(self) -> Dict[str, bytes]: | def hashes(self) -> Dict[str, bytes]: | ||||
"""Returns a dictionary {hash_name: hash_value}""" | """Returns a dictionary {hash_name: hash_value}""" | ||||
return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS} | return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS} | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Content(BaseContent): | class Content(BaseContent): | ||||
object_type: Final = "content" | |||||
sha1 = attr.ib(type=bytes, validator=type_validator()) | sha1 = attr.ib(type=bytes, validator=type_validator()) | ||||
sha1_git = attr.ib(type=Sha1Git, validator=type_validator()) | sha1_git = attr.ib(type=Sha1Git, validator=type_validator()) | ||||
sha256 = attr.ib(type=bytes, validator=type_validator()) | sha256 = attr.ib(type=bytes, validator=type_validator()) | ||||
blake2s256 = attr.ib(type=bytes, validator=type_validator()) | blake2s256 = attr.ib(type=bytes, validator=type_validator()) | ||||
length = attr.ib(type=int, validator=type_validator()) | length = attr.ib(type=int, validator=type_validator()) | ||||
status = attr.ib( | status = attr.ib( | ||||
▲ Show 20 Lines • Show All 50 Lines • ▼ Show 20 Lines | def with_data(self) -> "Content": | ||||
to lazy-load data (eg. from disk or objstorage).""" | to lazy-load data (eg. from disk or objstorage).""" | ||||
if self.data is None: | if self.data is None: | ||||
raise MissingData("Content data is None.") | raise MissingData("Content data is None.") | ||||
return self | return self | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class SkippedContent(BaseContent): | class SkippedContent(BaseContent): | ||||
object_type: Final = "skipped_content" | |||||
sha1 = attr.ib(type=Optional[bytes], validator=type_validator()) | sha1 = attr.ib(type=Optional[bytes], validator=type_validator()) | ||||
sha1_git = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | sha1_git = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | ||||
sha256 = attr.ib(type=Optional[bytes], validator=type_validator()) | sha256 = attr.ib(type=Optional[bytes], validator=type_validator()) | ||||
blake2s256 = attr.ib(type=Optional[bytes], validator=type_validator()) | blake2s256 = attr.ib(type=Optional[bytes], validator=type_validator()) | ||||
length = attr.ib(type=Optional[int], validator=type_validator()) | length = attr.ib(type=Optional[int], validator=type_validator()) | ||||
status = attr.ib(type=str, validator=attr.validators.in_(["absent"])) | status = attr.ib(type=str, validator=attr.validators.in_(["absent"])) | ||||
▲ Show 20 Lines • Show All 55 Lines • Show Last 20 Lines |
copyright header ¯\_(ツ)_/¯