Changeset View
Changeset View
Standalone View
Standalone View
swh/model/model.py
# Copyright (C) 2018-2020 The Software Heritage developers | # Copyright (C) 2018-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
""" | """ | ||||
Implementation of Software Heritage's data model | Implementation of Software Heritage's data model | ||||
See :ref:`data-model` for an overview of the data model. | See :ref:`data-model` for an overview of the data model. | ||||
Show All 17 Lines | |||||
from attr.validators import and_ | from attr.validators import and_ | ||||
from attrs_strict import AttributeTypeError | from attrs_strict import AttributeTypeError | ||||
import dateutil.parser | import dateutil.parser | ||||
import iso8601 | import iso8601 | ||||
from typing_extensions import Final | from typing_extensions import Final | ||||
from . import git_objects | from . import git_objects | ||||
from .collections import ImmutableDict | from .collections import ImmutableDict | ||||
from .hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_bytehex, hash_to_hex | from .hashutil import ( | ||||
ALGORITHMS, | |||||
DEFAULT_ALGORITHMS, | |||||
MultiHash, | |||||
hash_to_bytehex, | |||||
hash_to_hex, | |||||
) | |||||
from .swhids import CoreSWHID | from .swhids import CoreSWHID | ||||
from .swhids import ExtendedObjectType as SwhidExtendedObjectType | from .swhids import ExtendedObjectType as SwhidExtendedObjectType | ||||
from .swhids import ExtendedSWHID | from .swhids import ExtendedSWHID | ||||
from .swhids import ObjectType as SwhidObjectType | from .swhids import ObjectType as SwhidObjectType | ||||
class MissingData(Exception): | class MissingData(Exception): | ||||
"""Raised by `Content.with_data` when it has no way of fetching the | """Raised by `Content.with_data` when it has no way of fetching the | ||||
▲ Show 20 Lines • Show All 1,242 Lines • ▼ Show 20 Lines | |||||
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class BaseContent(BaseModel): | class BaseContent(BaseModel): | ||||
status = attr.ib( | status = attr.ib( | ||||
type=str, validator=attr.validators.in_(["visible", "hidden", "absent"]) | type=str, validator=attr.validators.in_(["visible", "hidden", "absent"]) | ||||
) | ) | ||||
@staticmethod | @staticmethod | ||||
def _hash_data(data: bytes): | def _hash_data(data: bytes, hash_names=DEFAULT_ALGORITHMS): | ||||
"""Hash some data, returning most of the fields of a content object""" | """Hash some data, returning most of the fields of a content object""" | ||||
d = MultiHash.from_data(data).digest() | d = MultiHash.from_data(data, hash_names).digest() | ||||
d["data"] = data | d["data"] = data | ||||
d["length"] = len(data) | d["length"] = len(data) | ||||
return d | return d | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d, use_subclass=True): | def from_dict(cls, d, use_subclass=True): | ||||
if use_subclass: | if use_subclass: | ||||
# Chooses a subclass to instantiate instead. | # Chooses a subclass to instantiate instead. | ||||
if d["status"] == "absent": | if d["status"] == "absent": | ||||
return SkippedContent.from_dict(d) | return SkippedContent.from_dict(d) | ||||
else: | else: | ||||
return Content.from_dict(d) | return Content.from_dict(d) | ||||
else: | else: | ||||
return super().from_dict(d) | return super().from_dict(d) | ||||
def get_hash(self, hash_name): | def get_hash(self, hash_name): | ||||
if hash_name not in DEFAULT_ALGORITHMS: | if hash_name not in ALGORITHMS: | ||||
raise ValueError("{} is not a valid hash name.".format(hash_name)) | raise ValueError("{} is not a valid hash name.".format(hash_name)) | ||||
return getattr(self, hash_name) | value = getattr(self, hash_name) | ||||
if value is None: | |||||
vlorentz: Please, that error message means nothing out of context. | |||||
raise ValueError("Content objects do not store {} hashes".format(hash_name)) | |||||
return value | |||||
def hashes(self) -> Dict[str, bytes]: | def hashes(self) -> Dict[str, bytes]: | ||||
"""Returns a dictionary {hash_name: hash_value}""" | """Returns a dictionary {hash_name: hash_value}""" | ||||
return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS} | return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS} | ||||
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class Content(BaseContent): | class Content(BaseContent): | ||||
object_type: Final = "content" | object_type: Final = "content" | ||||
sha1 = attr.ib(type=bytes, validator=generic_type_validator, repr=hash_repr) | sha1 = attr.ib(type=bytes, validator=generic_type_validator, repr=hash_repr) | ||||
sha1_git = attr.ib(type=Sha1Git, validator=generic_type_validator, repr=hash_repr) | sha1_git = attr.ib(type=Sha1Git, validator=generic_type_validator, repr=hash_repr) | ||||
sha256 = attr.ib(type=bytes, validator=generic_type_validator, repr=hash_repr) | sha256 = attr.ib(type=bytes, validator=generic_type_validator, repr=hash_repr) | ||||
blake2s256 = attr.ib(type=bytes, validator=generic_type_validator, repr=hash_repr) | blake2s256 = attr.ib(type=bytes, validator=generic_type_validator, repr=hash_repr) | ||||
length = attr.ib(type=int) | length = attr.ib(type=int) | ||||
status = attr.ib( | status = attr.ib( | ||||
type=str, | type=str, | ||||
validator=attr.validators.in_(["visible", "hidden"]), | validator=attr.validators.in_(["visible", "hidden"]), | ||||
default="visible", | default="visible", | ||||
) | ) | ||||
sha384 = attr.ib( | |||||
type=Optional[bytes], | |||||
validator=generic_type_validator, | |||||
repr=hash_repr, | |||||
default=None, | |||||
) | |||||
sha512 = attr.ib( | |||||
type=Optional[bytes], | |||||
validator=generic_type_validator, | |||||
repr=hash_repr, | |||||
default=None, | |||||
) | |||||
blake2s256 = attr.ib( | |||||
type=Optional[bytes], | |||||
validator=generic_type_validator, | |||||
repr=hash_repr, | |||||
default=None, | |||||
) | |||||
blake2b512 = attr.ib( | |||||
type=Optional[bytes], | |||||
validator=generic_type_validator, | |||||
repr=hash_repr, | |||||
default=None, | |||||
) | |||||
md5 = attr.ib( | |||||
type=Optional[bytes], | |||||
validator=generic_type_validator, | |||||
repr=hash_repr, | |||||
default=None, | |||||
) | |||||
data = attr.ib(type=Optional[bytes], validator=generic_type_validator, default=None) | data = attr.ib(type=Optional[bytes], validator=generic_type_validator, default=None) | ||||
ctime = attr.ib( | ctime = attr.ib( | ||||
type=Optional[datetime.datetime], | type=Optional[datetime.datetime], | ||||
default=None, | default=None, | ||||
eq=False, | eq=False, | ||||
) | ) | ||||
Show All 18 Lines | def to_dict(self): | ||||
content = super().to_dict() | content = super().to_dict() | ||||
if content["data"] is None: | if content["data"] is None: | ||||
del content["data"] | del content["data"] | ||||
if content["ctime"] is None: | if content["ctime"] is None: | ||||
del content["ctime"] | del content["ctime"] | ||||
return content | return content | ||||
@classmethod | @classmethod | ||||
def from_data(cls, data, status="visible", ctime=None) -> "Content": | def from_data( | ||||
cls, data, status="visible", ctime=None, hash_names=DEFAULT_ALGORITHMS | |||||
) -> "Content": | |||||
"""Generate a Content from a given `data` byte string. | """Generate a Content from a given `data` byte string. | ||||
This populates the Content with the hashes and length for the data | This populates the Content with the hashes and length for the data | ||||
passed as argument, as well as the data itself. | passed as argument, as well as the data itself. | ||||
""" | """ | ||||
d = cls._hash_data(data) | d = cls._hash_data(data, hash_names) | ||||
d["status"] = status | d["status"] = status | ||||
d["ctime"] = ctime | d["ctime"] = ctime | ||||
return cls(**d) | return cls(**d) | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
if isinstance(d.get("ctime"), str): | if isinstance(d.get("ctime"), str): | ||||
d = d.copy() | d = d.copy() | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | class SkippedContent(BaseContent): | ||||
origin = attr.ib(type=Optional[str], validator=generic_type_validator, default=None) | origin = attr.ib(type=Optional[str], validator=generic_type_validator, default=None) | ||||
ctime = attr.ib( | ctime = attr.ib( | ||||
type=Optional[datetime.datetime], | type=Optional[datetime.datetime], | ||||
validator=generic_type_validator, | validator=generic_type_validator, | ||||
default=None, | default=None, | ||||
eq=False, | eq=False, | ||||
) | ) | ||||
sha384 = attr.ib( | |||||
type=Optional[bytes], | |||||
validator=generic_type_validator, | |||||
repr=hash_repr, | |||||
default=None, | |||||
) | |||||
sha512 = attr.ib( | |||||
type=Optional[bytes], | |||||
validator=generic_type_validator, | |||||
repr=hash_repr, | |||||
default=None, | |||||
) | |||||
blake2s256 = attr.ib( | |||||
type=Optional[bytes], | |||||
validator=generic_type_validator, | |||||
repr=hash_repr, | |||||
default=None, | |||||
) | |||||
blake2b512 = attr.ib( | |||||
type=Optional[bytes], | |||||
validator=generic_type_validator, | |||||
repr=hash_repr, | |||||
default=None, | |||||
) | |||||
md5 = attr.ib( | |||||
type=Optional[bytes], | |||||
validator=generic_type_validator, | |||||
repr=hash_repr, | |||||
default=None, | |||||
) | |||||
@reason.validator | @reason.validator | ||||
def check_reason(self, attribute, value): | def check_reason(self, attribute, value): | ||||
"""Checks that a reason is provided, as required when content is absent.""" | """Checks that a reason is provided, as required when content is absent.""" | ||||
assert self.reason == value | assert self.reason == value | ||||
if value is None: | if value is None: | ||||
raise ValueError("Must provide a reason if content is absent.") | raise ValueError("Must provide a reason if content is absent.") | ||||
elif value.__class__ is not str: | elif value.__class__ is not str: | ||||
▲ Show 20 Lines • Show All 435 Lines • Show Last 20 Lines |
Please, that error message means nothing out of context.