Changeset View
Changeset View
Standalone View
Standalone View
swh/model/model.py
Show All 17 Lines | |||||
from abc import ABCMeta, abstractmethod | from abc import ABCMeta, abstractmethod | ||||
import collections | import collections | ||||
import datetime | import datetime | ||||
from enum import Enum | from enum import Enum | ||||
import hashlib | import hashlib | ||||
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, TypeVar, Union | from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, TypeVar, Union | ||||
import attr | import attr | ||||
from attr._make import _AndValidator | |||||
from attr.validators import and_ | |||||
from attrs_strict import AttributeTypeError | from attrs_strict import AttributeTypeError | ||||
import dateutil.parser | import dateutil.parser | ||||
import iso8601 | import iso8601 | ||||
from typing_extensions import Final | from typing_extensions import Final | ||||
from . import git_objects | from . import git_objects | ||||
from .collections import ImmutableDict | from .collections import ImmutableDict | ||||
from .hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_bytehex, hash_to_hex | from .hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_bytehex, hash_to_hex | ||||
▲ Show 20 Lines • Show All 54 Lines • ▼ Show 20 Lines | def dictify(value): | ||||
elif isinstance(value, (dict, ImmutableDict)): | elif isinstance(value, (dict, ImmutableDict)): | ||||
return {k: dictify(v) for k, v in value.items()} | return {k: dictify(v) for k, v in value.items()} | ||||
elif isinstance(value, tuple): | elif isinstance(value, tuple): | ||||
return tuple(dictify(v) for v in value) | return tuple(dictify(v) for v in value) | ||||
else: | else: | ||||
return value | return value | ||||
def _check_type(type_, value): | def generic_type_validator(instance, attribute, value): | ||||
if type_ is object or type_ is Any: | """validates the type of an attribute value whatever the attribute type""" | ||||
return True | raise NotImplementedError("generic type check should have been optimized") | ||||
if type_ is None: | |||||
return value is None | |||||
origin = getattr(type_, "__origin__", None) | def _true_validator(instance, attribute, value, expected_type=None, origin_value=None): | ||||
pass | |||||
# Non-generic type, check it directly | |||||
if origin is None: | def _none_validator(instance, attribute, value, expected_type=None, origin_value=None): | ||||
if value is not None: | |||||
if origin_value is None: | |||||
origin_value = value | |||||
raise AttributeTypeError(origin_value, attribute) | |||||
def _origin_type_validator( | |||||
instance, attribute, value, expected_type=None, origin_value=None | |||||
): | |||||
# This is functionally equivalent to using just this: | # This is functionally equivalent to using just this: | ||||
# return isinstance(value, type) | # return isinstance(value, type) | ||||
# but using type equality before isinstance allows very quick checks | # but using type equality before isinstance allows very quick checks | ||||
# when the exact class is used (which is the overwhelming majority of cases) | # when the exact class is used (which is the overwhelming majority of cases) | ||||
# while still allowing subclasses to be used. | # while still allowing subclasses to be used. | ||||
return type(value) == type_ or isinstance(value, type_) | if expected_type is None: | ||||
expected_type = attribute.type | |||||
if not (type(value) == expected_type or isinstance(value, expected_type)): | |||||
if origin_value is None: | |||||
origin_value = value | |||||
raise AttributeTypeError(origin_value, attribute) | |||||
def _tuple_infinite_validator( | |||||
instance, | |||||
attribute, | |||||
value, | |||||
expected_type=None, | |||||
origin_value=None, | |||||
): | |||||
type_ = type(value) | |||||
if origin_value is None: | |||||
origin_value = value | |||||
if type_ != tuple and not isinstance(value, tuple): | |||||
raise AttributeTypeError(origin_value, attribute) | |||||
if expected_type is None: | |||||
expected_type = attribute.type | |||||
args = expected_type.__args__ | |||||
# assert len(args) == 2 and args[1] is Ellipsis | |||||
expected_value_type = args[0] | |||||
validator = optimized_validator(expected_value_type) | |||||
for i in value: | |||||
validator( | |||||
instance, | |||||
attribute, | |||||
i, | |||||
expected_type=expected_value_type, | |||||
origin_value=origin_value, | |||||
) | |||||
def _tuple_bytes_bytes_validator( | |||||
instance, | |||||
attribute, | |||||
value, | |||||
expected_type=None, | |||||
origin_value=None, | |||||
): | |||||
type_ = type(value) | |||||
if type_ != tuple and not isinstance(value, tuple): | |||||
if origin_value is None: | |||||
origin_value = value | |||||
raise AttributeTypeError(origin_value, attribute) | |||||
if len(value) != 2: | |||||
if origin_value is None: | |||||
origin_value = value | |||||
raise AttributeTypeError(origin_value, attribute) | |||||
if type(value[0]) is not bytes or type(value[1]) is not bytes: | |||||
if origin_value is None: | |||||
origin_value = value | |||||
vlorentz: could you add tests for this? it seems uncovered | |||||
Done Inline ActionsGood point, fixing it in the next update. marmoute: Good point, fixing it in the next update. | |||||
raise AttributeTypeError(origin_value, attribute) | |||||
def _tuple_finite_validator( | |||||
instance, | |||||
attribute, | |||||
value, | |||||
expected_type=None, | |||||
origin_value=None, | |||||
): | |||||
# might be useful to optimise the sub-validator tuple, in practice, we only | |||||
# have [bytes, bytes] | |||||
type_ = type(value) | |||||
if origin_value is None: | |||||
origin_value = value | |||||
if type_ != tuple and not isinstance(value, tuple): | |||||
raise AttributeTypeError(origin_value, attribute) | |||||
if expected_type is None: | |||||
expected_type = attribute.type | |||||
args = expected_type.__args__ | |||||
# assert len(args) != 2 or args[1] is Ellipsis | |||||
if len(args) != len(value): | |||||
raise AttributeTypeError(origin_value, attribute) | |||||
for item_type, item in zip(args, value): | |||||
validator = optimized_validator(item_type) | |||||
validator( | |||||
instance, | |||||
attribute, | |||||
item, | |||||
expected_type=item_type, | |||||
origin_value=origin_value, | |||||
) | |||||
def _immutable_dict_validator( | |||||
instance, | |||||
attribute, | |||||
value, | |||||
expected_type=None, | |||||
origin_value=None, | |||||
): | |||||
value_type = type(value) | |||||
if origin_value is None: | |||||
origin_value = value | |||||
if value_type != ImmutableDict and not isinstance(value, ImmutableDict): | |||||
raise AttributeTypeError(origin_value, attribute) | |||||
if expected_type is None: | |||||
expected_type = attribute.type | |||||
(expected_key_type, expected_value_type) = expected_type.__args__ | |||||
key_validator = optimized_validator(expected_key_type) | |||||
value_validator = optimized_validator(expected_value_type) | |||||
for (item_key, item_value) in value.items(): | |||||
key_validator( | |||||
instance, | |||||
attribute, | |||||
item_key, | |||||
expected_type=expected_key_type, | |||||
origin_value=origin_value, | |||||
) | |||||
value_validator( | |||||
instance, | |||||
attribute, | |||||
item_value, | |||||
expected_type=expected_value_type, | |||||
origin_value=origin_value, | |||||
) | |||||
def optimized_validator(type_): | |||||
if type_ is object or type_ is Any: | |||||
return _true_validator | |||||
if type_ is None: | |||||
return _none_validator | |||||
origin = getattr(type_, "__origin__", None) | |||||
# Check the type of the value itself | # Non-generic type, check it directly | ||||
# | if origin is None: | ||||
# For the same reason as above, this condition is functionally equivalent to: | return _origin_type_validator | ||||
# if origin is not Union and not isinstance(value, origin): | |||||
if origin is not Union and type(value) != origin and not isinstance(value, origin): | |||||
return False | |||||
# Then, if it's a container, check its items. | # Then, if it's a container, check its items. | ||||
if origin is tuple: | if origin is tuple: | ||||
args = type_.__args__ | args = type_.__args__ | ||||
if len(args) == 2 and args[1] is Ellipsis: | if len(args) == 2 and args[1] is Ellipsis: | ||||
# Infinite tuple | # Infinite tuple | ||||
return all(_check_type(args[0], item) for item in value) | return _tuple_infinite_validator | ||||
elif args == (bytes, bytes): | |||||
return _tuple_bytes_bytes_validator | |||||
else: | else: | ||||
# Finite tuple | return _tuple_finite_validator | ||||
if len(args) != len(value): | |||||
return False | |||||
return all( | |||||
_check_type(item_type, item) for (item_type, item) in zip(args, value) | |||||
) | |||||
elif origin is Union: | elif origin is Union: | ||||
args = type_.__args__ | args = type_.__args__ | ||||
return any(_check_type(variant, value) for variant in args) | all_validators = tuple((optimized_validator(t), t) for t in args) | ||||
elif origin is ImmutableDict: | |||||
(key_type, value_type) = type_.__args__ | def union_validator( | ||||
return all( | instance, | ||||
_check_type(key_type, key) and _check_type(value_type, value) | attribute, | ||||
for (key, value) in value.items() | value, | ||||
expected_type=None, | |||||
origin_value=None, | |||||
): | |||||
if origin_value is None: | |||||
origin_value = value | |||||
for (validator, type_) in all_validators: | |||||
try: | |||||
validator( | |||||
instance, | |||||
attribute, | |||||
value, | |||||
expected_type=type_, | |||||
origin_value=origin_value, | |||||
) | ) | ||||
except AttributeTypeError: | |||||
pass | |||||
else: | |||||
break | |||||
else: | else: | ||||
raise AttributeTypeError(origin_value, attribute) | |||||
return union_validator | |||||
elif origin is ImmutableDict: | |||||
return _immutable_dict_validator | |||||
# No need to check dict or list. because they are converted to ImmutableDict | # No need to check dict or list. because they are converted to ImmutableDict | ||||
# and tuple respectively. | # and tuple respectively. | ||||
raise NotImplementedError(f"Type-checking {type_}") | raise NotImplementedError(f"Type-checking {type_}") | ||||
def type_validator(): | def optimize_all_validators(cls, old_fields): | ||||
"""Like attrs_strict.type_validator(), but stricter. | """process validators to turn them into a faster version … eventually""" | ||||
new_fields = [] | |||||
It is an attrs validator, which checks attributes have the specified type, | for f in old_fields: | ||||
using type equality instead of ``isinstance()``, for improved performance | validator = f.validator | ||||
""" | if validator is generic_type_validator: | ||||
validator = optimized_validator(f.type) | |||||
def validator(instance, attribute, value): | elif isinstance(validator, _AndValidator): | ||||
if not _check_type(attribute.type, value): | new_and = [] | ||||
raise AttributeTypeError(value, attribute) | for v in validator._validators: | ||||
if v is generic_type_validator: | |||||
v = optimized_validator(f.type) | |||||
new_and.append(v) | |||||
validator = and_(*new_and) | |||||
else: | |||||
validator = None | |||||
return validator | if validator is not None: | ||||
f = f.evolve(validator=validator) | |||||
new_fields.append(f) | |||||
return new_fields | |||||
ModelType = TypeVar("ModelType", bound="BaseModel") | ModelType = TypeVar("ModelType", bound="BaseModel") | ||||
class BaseModel: | class BaseModel: | ||||
"""Base class for SWH model classes. | """Base class for SWH model classes. | ||||
▲ Show 20 Lines • Show All 110 Lines • ▼ Show 20 Lines | def check(self) -> None: | ||||
self.raw_manifest is not None | self.raw_manifest is not None | ||||
and self.id == self._compute_hash_from_attributes() | and self.id == self._compute_hash_from_attributes() | ||||
): | ): | ||||
raise ValueError( | raise ValueError( | ||||
f"{self} has a non-none raw_manifest attribute, but does not need it." | f"{self} has a non-none raw_manifest attribute, but does not need it." | ||||
) | ) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class Person(BaseModel): | class Person(BaseModel): | ||||
"""Represents the author/committer of a revision or release.""" | """Represents the author/committer of a revision or release.""" | ||||
object_type: Final = "person" | object_type: Final = "person" | ||||
fullname = attr.ib(type=bytes, validator=type_validator()) | fullname = attr.ib(type=bytes, validator=generic_type_validator) | ||||
name = attr.ib(type=Optional[bytes], validator=type_validator(), eq=False) | name = attr.ib(type=Optional[bytes], validator=generic_type_validator, eq=False) | ||||
email = attr.ib(type=Optional[bytes], validator=type_validator(), eq=False) | email = attr.ib(type=Optional[bytes], validator=generic_type_validator, eq=False) | ||||
@classmethod | @classmethod | ||||
def from_fullname(cls, fullname: bytes): | def from_fullname(cls, fullname: bytes): | ||||
"""Returns a Person object, by guessing the name and email from the | """Returns a Person object, by guessing the name and email from the | ||||
fullname, in the `name <email>` format. | fullname, in the `name <email>` format. | ||||
The fullname is left unchanged.""" | The fullname is left unchanged.""" | ||||
if fullname is None: | if fullname is None: | ||||
▲ Show 20 Lines • Show All 57 Lines • ▼ Show 20 Lines | def from_dict(cls, d): | ||||
parts.append(b"".join([b"<", d["email"], b">"])) | parts.append(b"".join([b"<", d["email"], b">"])) | ||||
fullname = b" ".join(parts) | fullname = b" ".join(parts) | ||||
d = {**d, "fullname": fullname} | d = {**d, "fullname": fullname} | ||||
d = {"name": None, "email": None, **d} | d = {"name": None, "email": None, **d} | ||||
return super().from_dict(d) | return super().from_dict(d) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class Timestamp(BaseModel): | class Timestamp(BaseModel): | ||||
"""Represents a naive timestamp from a VCS.""" | """Represents a naive timestamp from a VCS.""" | ||||
object_type: Final = "timestamp" | object_type: Final = "timestamp" | ||||
seconds = attr.ib(type=int, validator=type_validator()) | seconds = attr.ib(type=int) | ||||
microseconds = attr.ib(type=int, validator=type_validator()) | microseconds = attr.ib(type=int) | ||||
@seconds.validator | @seconds.validator | ||||
def check_seconds(self, attribute, value): | def check_seconds(self, attribute, value): | ||||
"""Check that seconds fit in a 64-bits signed integer.""" | """Check that seconds fit in a 64-bits signed integer.""" | ||||
if value.__class__ is not int: | |||||
raise AttributeTypeError(value, attribute) | |||||
if not (-(2**63) <= value < 2**63): | if not (-(2**63) <= value < 2**63): | ||||
raise ValueError("Seconds must be a signed 64-bits integer.") | raise ValueError("Seconds must be a signed 64-bits integer.") | ||||
@microseconds.validator | @microseconds.validator | ||||
def check_microseconds(self, attribute, value): | def check_microseconds(self, attribute, value): | ||||
"""Checks that microseconds are positive and < 1000000.""" | """Checks that microseconds are positive and < 1000000.""" | ||||
if value.__class__ is not int: | |||||
raise AttributeTypeError(value, attribute) | |||||
if not (0 <= value < 10**6): | if not (0 <= value < 10**6): | ||||
raise ValueError("Microseconds must be in [0, 1000000[.") | raise ValueError("Microseconds must be in [0, 1000000[.") | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class TimestampWithTimezone(BaseModel): | class TimestampWithTimezone(BaseModel): | ||||
"""Represents a TZ-aware timestamp from a VCS.""" | """Represents a TZ-aware timestamp from a VCS.""" | ||||
object_type: Final = "timestamp_with_timezone" | object_type: Final = "timestamp_with_timezone" | ||||
timestamp = attr.ib(type=Timestamp, validator=type_validator()) | timestamp = attr.ib(type=Timestamp, validator=generic_type_validator) | ||||
offset_bytes = attr.ib(type=bytes, validator=type_validator()) | offset_bytes = attr.ib(type=bytes, validator=generic_type_validator) | ||||
"""Raw git representation of the timezone, as an offset from UTC. | """Raw git representation of the timezone, as an offset from UTC. | ||||
It should follow this format: ``+HHMM`` or ``-HHMM`` (including ``+0000`` and | It should follow this format: ``+HHMM`` or ``-HHMM`` (including ``+0000`` and | ||||
``-0000``). | ``-0000``). | ||||
However, when created from git objects, it must be the exact bytes used in the | However, when created from git objects, it must be the exact bytes used in the | ||||
original objects, so it may differ from this format when they do. | original objects, so it may differ from this format when they do. | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 172 Lines • ▼ Show 20 Lines | def offset_minutes(self): | ||||
>>> TimestampWithTimezone( | >>> TimestampWithTimezone( | ||||
... Timestamp(seconds=1642765364, microseconds=0), offset_bytes=b"+0530" | ... Timestamp(seconds=1642765364, microseconds=0), offset_bytes=b"+0530" | ||||
... ).offset_minutes() | ... ).offset_minutes() | ||||
330 | 330 | ||||
""" | """ | ||||
return self._parse_offset_bytes(self.offset_bytes) | return self._parse_offset_bytes(self.offset_bytes) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class Origin(HashableObject, BaseModel): | class Origin(HashableObject, BaseModel): | ||||
"""Represents a software source: a VCS and an URL.""" | """Represents a software source: a VCS and an URL.""" | ||||
object_type: Final = "origin" | object_type: Final = "origin" | ||||
url = attr.ib(type=str, validator=type_validator()) | url = attr.ib(type=str, validator=generic_type_validator) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | id = attr.ib(type=Sha1Git, validator=generic_type_validator, default=b"") | ||||
def unique_key(self) -> KeyType: | def unique_key(self) -> KeyType: | ||||
return {"url": self.url} | return {"url": self.url} | ||||
def _compute_hash_from_attributes(self) -> bytes: | def _compute_hash_from_attributes(self) -> bytes: | ||||
return _compute_hash_from_manifest(self.url.encode("utf-8")) | return _compute_hash_from_manifest(self.url.encode("utf-8")) | ||||
def swhid(self) -> ExtendedSWHID: | def swhid(self) -> ExtendedSWHID: | ||||
"""Returns a SWHID representing this origin.""" | """Returns a SWHID representing this origin.""" | ||||
return ExtendedSWHID( | return ExtendedSWHID( | ||||
object_type=SwhidExtendedObjectType.ORIGIN, | object_type=SwhidExtendedObjectType.ORIGIN, | ||||
object_id=self.id, | object_id=self.id, | ||||
) | ) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class OriginVisit(BaseModel): | class OriginVisit(BaseModel): | ||||
"""Represents an origin visit with a given type at a given point in time, by a | """Represents an origin visit with a given type at a given point in time, by a | ||||
SWH loader.""" | SWH loader.""" | ||||
object_type: Final = "origin_visit" | object_type: Final = "origin_visit" | ||||
origin = attr.ib(type=str, validator=type_validator()) | origin = attr.ib(type=str, validator=generic_type_validator) | ||||
date = attr.ib(type=datetime.datetime, validator=type_validator()) | date = attr.ib(type=datetime.datetime) | ||||
type = attr.ib(type=str, validator=type_validator()) | type = attr.ib(type=str, validator=generic_type_validator) | ||||
"""Should not be set before calling 'origin_visit_add()'.""" | """Should not be set before calling 'origin_visit_add()'.""" | ||||
visit = attr.ib(type=Optional[int], validator=type_validator(), default=None) | visit = attr.ib(type=Optional[int], validator=generic_type_validator, default=None) | ||||
@date.validator | @date.validator | ||||
def check_date(self, attribute, value): | def check_date(self, attribute, value): | ||||
"""Checks the date has a timezone.""" | """Checks the date has a timezone.""" | ||||
if value.__class__ is not datetime.datetime: | |||||
raise AttributeTypeError(value, attribute) | |||||
if value is not None and value.tzinfo is None: | if value is not None and value.tzinfo is None: | ||||
raise ValueError("date must be a timezone-aware datetime.") | raise ValueError("date must be a timezone-aware datetime.") | ||||
def to_dict(self): | def to_dict(self): | ||||
"""Serializes the date as a string and omits the visit id if it is | """Serializes the date as a string and omits the visit id if it is | ||||
`None`.""" | `None`.""" | ||||
ov = super().to_dict() | ov = super().to_dict() | ||||
if ov["visit"] is None: | if ov["visit"] is None: | ||||
del ov["visit"] | del ov["visit"] | ||||
return ov | return ov | ||||
def unique_key(self) -> KeyType: | def unique_key(self) -> KeyType: | ||||
return {"origin": self.origin, "date": str(self.date)} | return {"origin": self.origin, "date": str(self.date)} | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class OriginVisitStatus(BaseModel): | class OriginVisitStatus(BaseModel): | ||||
"""Represents a visit update of an origin at a given point in time.""" | """Represents a visit update of an origin at a given point in time.""" | ||||
object_type: Final = "origin_visit_status" | object_type: Final = "origin_visit_status" | ||||
origin = attr.ib(type=str, validator=type_validator()) | origin = attr.ib(type=str, validator=generic_type_validator) | ||||
visit = attr.ib(type=int, validator=type_validator()) | visit = attr.ib(type=int, validator=generic_type_validator) | ||||
date = attr.ib(type=datetime.datetime, validator=type_validator()) | date = attr.ib(type=datetime.datetime) | ||||
status = attr.ib( | status = attr.ib( | ||||
type=str, | type=str, | ||||
validator=attr.validators.in_( | validator=attr.validators.in_( | ||||
["created", "ongoing", "full", "partial", "not_found", "failed"] | ["created", "ongoing", "full", "partial", "not_found", "failed"] | ||||
), | ), | ||||
) | ) | ||||
snapshot = attr.ib( | snapshot = attr.ib( | ||||
type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr | type=Optional[Sha1Git], validator=generic_type_validator, repr=hash_repr | ||||
) | ) | ||||
# Type is optional be to able to use it before adding it to the database model | # Type is optional be to able to use it before adding it to the database model | ||||
type = attr.ib(type=Optional[str], validator=type_validator(), default=None) | type = attr.ib(type=Optional[str], validator=generic_type_validator, default=None) | ||||
metadata = attr.ib( | metadata = attr.ib( | ||||
type=Optional[ImmutableDict[str, object]], | type=Optional[ImmutableDict[str, object]], | ||||
validator=type_validator(), | validator=generic_type_validator, | ||||
converter=freeze_optional_dict, | converter=freeze_optional_dict, | ||||
default=None, | default=None, | ||||
) | ) | ||||
@date.validator | @date.validator | ||||
def check_date(self, attribute, value): | def check_date(self, attribute, value): | ||||
"""Checks the date has a timezone.""" | """Checks the date has a timezone.""" | ||||
if value.__class__ is not datetime.datetime: | |||||
raise AttributeTypeError(value, attribute) | |||||
if value is not None and value.tzinfo is None: | if value is not None and value.tzinfo is None: | ||||
raise ValueError("date must be a timezone-aware datetime.") | raise ValueError("date must be a timezone-aware datetime.") | ||||
def unique_key(self) -> KeyType: | def unique_key(self) -> KeyType: | ||||
return {"origin": self.origin, "visit": str(self.visit), "date": str(self.date)} | return {"origin": self.origin, "visit": str(self.visit), "date": str(self.date)} | ||||
class TargetType(Enum): | class TargetType(Enum): | ||||
Show All 19 Lines | class ObjectType(Enum): | ||||
REVISION = "revision" | REVISION = "revision" | ||||
RELEASE = "release" | RELEASE = "release" | ||||
SNAPSHOT = "snapshot" | SNAPSHOT = "snapshot" | ||||
def __repr__(self): | def __repr__(self): | ||||
return f"ObjectType.{self.name}" | return f"ObjectType.{self.name}" | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class SnapshotBranch(BaseModel): | class SnapshotBranch(BaseModel): | ||||
"""Represents one of the branches of a snapshot.""" | """Represents one of the branches of a snapshot.""" | ||||
object_type: Final = "snapshot_branch" | object_type: Final = "snapshot_branch" | ||||
target = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) | target = attr.ib(type=bytes, repr=hash_repr) | ||||
target_type = attr.ib(type=TargetType, validator=type_validator()) | target_type = attr.ib(type=TargetType, validator=generic_type_validator) | ||||
@target.validator | @target.validator | ||||
def check_target(self, attribute, value): | def check_target(self, attribute, value): | ||||
"""Checks the target type is not an alias, checks the target is a | """Checks the target type is not an alias, checks the target is a | ||||
valid sha1_git.""" | valid sha1_git.""" | ||||
if value.__class__ is not bytes: | |||||
raise AttributeTypeError(value, attribute) | |||||
if self.target_type != TargetType.ALIAS and self.target is not None: | if self.target_type != TargetType.ALIAS and self.target is not None: | ||||
if len(value) != 20: | if len(value) != 20: | ||||
raise ValueError("Wrong length for bytes identifier: %d" % len(value)) | raise ValueError("Wrong length for bytes identifier: %d" % len(value)) | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
return cls(target=d["target"], target_type=TargetType(d["target_type"])) | return cls(target=d["target"], target_type=TargetType(d["target_type"])) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class Snapshot(HashableObject, BaseModel): | class Snapshot(HashableObject, BaseModel): | ||||
"""Represents the full state of an origin at a given point in time.""" | """Represents the full state of an origin at a given point in time.""" | ||||
object_type: Final = "snapshot" | object_type: Final = "snapshot" | ||||
branches = attr.ib( | branches = attr.ib( | ||||
type=ImmutableDict[bytes, Optional[SnapshotBranch]], | type=ImmutableDict[bytes, Optional[SnapshotBranch]], | ||||
validator=type_validator(), | validator=generic_type_validator, | ||||
converter=freeze_optional_dict, | converter=freeze_optional_dict, | ||||
) | ) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) | id = attr.ib( | ||||
type=Sha1Git, validator=generic_type_validator, default=b"", repr=hash_repr | |||||
) | |||||
def _compute_hash_from_attributes(self) -> bytes: | def _compute_hash_from_attributes(self) -> bytes: | ||||
return _compute_hash_from_manifest( | return _compute_hash_from_manifest( | ||||
git_objects.snapshot_git_object(self, ignore_unresolved=True) | git_objects.snapshot_git_object(self, ignore_unresolved=True) | ||||
) | ) | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
d = d.copy() | d = d.copy() | ||||
return cls( | return cls( | ||||
branches=ImmutableDict( | branches=ImmutableDict( | ||||
(name, SnapshotBranch.from_dict(branch) if branch else None) | (name, SnapshotBranch.from_dict(branch) if branch else None) | ||||
for (name, branch) in d.pop("branches").items() | for (name, branch) in d.pop("branches").items() | ||||
), | ), | ||||
**d, | **d, | ||||
) | ) | ||||
def swhid(self) -> CoreSWHID: | def swhid(self) -> CoreSWHID: | ||||
"""Returns a SWHID representing this object.""" | """Returns a SWHID representing this object.""" | ||||
return CoreSWHID(object_type=SwhidObjectType.SNAPSHOT, object_id=self.id) | return CoreSWHID(object_type=SwhidObjectType.SNAPSHOT, object_id=self.id) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class Release(HashableObjectWithManifest, BaseModel): | class Release(HashableObjectWithManifest, BaseModel): | ||||
object_type: Final = "release" | object_type: Final = "release" | ||||
name = attr.ib(type=bytes, validator=type_validator()) | name = attr.ib(type=bytes, validator=generic_type_validator) | ||||
message = attr.ib(type=Optional[bytes], validator=type_validator()) | message = attr.ib(type=Optional[bytes], validator=generic_type_validator) | ||||
target = attr.ib(type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr) | target = attr.ib( | ||||
target_type = attr.ib(type=ObjectType, validator=type_validator()) | type=Optional[Sha1Git], validator=generic_type_validator, repr=hash_repr | ||||
synthetic = attr.ib(type=bool, validator=type_validator()) | ) | ||||
author = attr.ib(type=Optional[Person], validator=type_validator(), default=None) | target_type = attr.ib(type=ObjectType, validator=generic_type_validator) | ||||
synthetic = attr.ib(type=bool, validator=generic_type_validator) | |||||
author = attr.ib( | |||||
type=Optional[Person], validator=generic_type_validator, default=None | |||||
) | |||||
date = attr.ib( | date = attr.ib( | ||||
type=Optional[TimestampWithTimezone], validator=type_validator(), default=None | type=Optional[TimestampWithTimezone], | ||||
validator=generic_type_validator, | |||||
default=None, | |||||
) | ) | ||||
metadata = attr.ib( | metadata = attr.ib( | ||||
type=Optional[ImmutableDict[str, object]], | type=Optional[ImmutableDict[str, object]], | ||||
validator=type_validator(), | validator=generic_type_validator, | ||||
converter=freeze_optional_dict, | converter=freeze_optional_dict, | ||||
default=None, | default=None, | ||||
) | ) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) | id = attr.ib( | ||||
type=Sha1Git, validator=generic_type_validator, default=b"", repr=hash_repr | |||||
) | |||||
raw_manifest = attr.ib(type=Optional[bytes], default=None) | raw_manifest = attr.ib(type=Optional[bytes], default=None) | ||||
def _compute_hash_from_attributes(self) -> bytes: | def _compute_hash_from_attributes(self) -> bytes: | ||||
return _compute_hash_from_manifest(git_objects.release_git_object(self)) | return _compute_hash_from_manifest(git_objects.release_git_object(self)) | ||||
@author.validator | @author.validator | ||||
def check_author(self, attribute, value): | def check_author(self, attribute, value): | ||||
"""If the author is `None`, checks the date is `None` too.""" | """If the author is `None`, checks the date is `None` too.""" | ||||
Show All 40 Lines | class RevisionType(Enum): | ||||
def __repr__(self): | def __repr__(self): | ||||
return f"RevisionType.{self.name}" | return f"RevisionType.{self.name}" | ||||
def tuplify_extra_headers(value: Iterable): | def tuplify_extra_headers(value: Iterable): | ||||
return tuple((k, v) for k, v in value) | return tuple((k, v) for k, v in value) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class Revision(HashableObjectWithManifest, BaseModel): | class Revision(HashableObjectWithManifest, BaseModel): | ||||
object_type: Final = "revision" | object_type: Final = "revision" | ||||
message = attr.ib(type=Optional[bytes], validator=type_validator()) | message = attr.ib(type=Optional[bytes], validator=generic_type_validator) | ||||
author = attr.ib(type=Optional[Person], validator=type_validator()) | author = attr.ib(type=Optional[Person], validator=generic_type_validator) | ||||
committer = attr.ib(type=Optional[Person], validator=type_validator()) | committer = attr.ib(type=Optional[Person], validator=generic_type_validator) | ||||
date = attr.ib(type=Optional[TimestampWithTimezone], validator=type_validator()) | date = attr.ib( | ||||
type=Optional[TimestampWithTimezone], validator=generic_type_validator | |||||
) | |||||
committer_date = attr.ib( | committer_date = attr.ib( | ||||
type=Optional[TimestampWithTimezone], validator=type_validator() | type=Optional[TimestampWithTimezone], validator=generic_type_validator | ||||
) | ) | ||||
type = attr.ib(type=RevisionType, validator=type_validator()) | type = attr.ib(type=RevisionType, validator=generic_type_validator) | ||||
directory = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr) | directory = attr.ib(type=Sha1Git, validator=generic_type_validator, repr=hash_repr) | ||||
synthetic = attr.ib(type=bool, validator=type_validator()) | synthetic = attr.ib(type=bool, validator=generic_type_validator) | ||||
metadata = attr.ib( | metadata = attr.ib( | ||||
type=Optional[ImmutableDict[str, object]], | type=Optional[ImmutableDict[str, object]], | ||||
validator=type_validator(), | validator=generic_type_validator, | ||||
converter=freeze_optional_dict, | converter=freeze_optional_dict, | ||||
default=None, | default=None, | ||||
) | ) | ||||
parents = attr.ib(type=Tuple[Sha1Git, ...], validator=type_validator(), default=()) | parents = attr.ib( | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) | type=Tuple[Sha1Git, ...], validator=generic_type_validator, default=() | ||||
) | |||||
id = attr.ib( | |||||
type=Sha1Git, validator=generic_type_validator, default=b"", repr=hash_repr | |||||
) | |||||
extra_headers = attr.ib( | extra_headers = attr.ib( | ||||
type=Tuple[Tuple[bytes, bytes], ...], | type=Tuple[Tuple[bytes, bytes], ...], | ||||
validator=type_validator(), | validator=generic_type_validator, | ||||
converter=tuplify_extra_headers, | converter=tuplify_extra_headers, | ||||
default=(), | default=(), | ||||
) | ) | ||||
raw_manifest = attr.ib(type=Optional[bytes], default=None) | raw_manifest = attr.ib(type=Optional[bytes], default=None) | ||||
def __attrs_post_init__(self): | def __attrs_post_init__(self): | ||||
super().__attrs_post_init__() | super().__attrs_post_init__() | ||||
# ensure metadata is a deep copy of whatever was given, and if needed | # ensure metadata is a deep copy of whatever was given, and if needed | ||||
▲ Show 20 Lines • Show All 71 Lines • ▼ Show 20 Lines | def anonymize(self) -> "Revision": | ||||
author=None if self.author is None else self.author.anonymize(), | author=None if self.author is None else self.author.anonymize(), | ||||
committer=None if self.committer is None else self.committer.anonymize(), | committer=None if self.committer is None else self.committer.anonymize(), | ||||
) | ) | ||||
# Allowed values for DirectoryEntry.type: regular entry, sub-directory,
# or a pointer to a revision (e.g. a git submodule).
_DIR_ENTRY_TYPES = ["file", "dir", "rev"]


@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class DirectoryEntry(BaseModel):
    """A single entry of a directory: a named pointer to a content ("file"),
    directory ("dir"), or revision ("rev") object."""

    object_type: Final = "directory_entry"

    name = attr.ib(type=bytes)
    type = attr.ib(type=str, validator=attr.validators.in_(_DIR_ENTRY_TYPES))
    target = attr.ib(type=Sha1Git, validator=generic_type_validator, repr=hash_repr)
    perms = attr.ib(type=int, validator=generic_type_validator, converter=int, repr=oct)
    """Usually one of the values of `swh.model.from_disk.DentryPerms`."""

    @name.validator
    def check_name(self, attribute, value):
        # `name` has no generic field validator, so the exact-type check is
        # done here before the content check.
        if value.__class__ is not bytes:
            raise AttributeTypeError(value, attribute)
        # "/" would break the flat directory-entry namespace.
        if b"/" in value:
            raise ValueError(f"{value!r} is not a valid directory entry name.")
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class Directory(HashableObjectWithManifest, BaseModel): | class Directory(HashableObjectWithManifest, BaseModel): | ||||
object_type: Final = "directory" | object_type: Final = "directory" | ||||
entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=type_validator()) | entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=generic_type_validator) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) | id = attr.ib( | ||||
type=Sha1Git, validator=generic_type_validator, default=b"", repr=hash_repr | |||||
) | |||||
raw_manifest = attr.ib(type=Optional[bytes], default=None) | raw_manifest = attr.ib(type=Optional[bytes], default=None) | ||||
def _compute_hash_from_attributes(self) -> bytes: | def _compute_hash_from_attributes(self) -> bytes: | ||||
return _compute_hash_from_manifest(git_objects.directory_git_object(self)) | return _compute_hash_from_manifest(git_objects.directory_git_object(self)) | ||||
@entries.validator | @entries.validator | ||||
def check_entries(self, attribute, value): | def check_entries(self, attribute, value): | ||||
seen = set() | seen = set() | ||||
▲ Show 20 Lines • Show All 97 Lines • ▼ Show 20 Lines | ) -> Tuple[bool, "Directory"]: | ||||
# Finally, return the "fixed" the directory | # Finally, return the "fixed" the directory | ||||
dir_ = Directory( | dir_ = Directory( | ||||
entries=tuple(deduplicated_entries), id=id, raw_manifest=raw_manifest | entries=tuple(deduplicated_entries), id=id, raw_manifest=raw_manifest | ||||
) | ) | ||||
return (True, dir_) | return (True, dir_) | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class BaseContent(BaseModel): | class BaseContent(BaseModel): | ||||
status = attr.ib( | status = attr.ib( | ||||
type=str, validator=attr.validators.in_(["visible", "hidden", "absent"]) | type=str, validator=attr.validators.in_(["visible", "hidden", "absent"]) | ||||
) | ) | ||||
@staticmethod | @staticmethod | ||||
def _hash_data(data: bytes): | def _hash_data(data: bytes): | ||||
"""Hash some data, returning most of the fields of a content object""" | """Hash some data, returning most of the fields of a content object""" | ||||
Show All 19 Lines | def get_hash(self, hash_name): | ||||
raise ValueError("{} is not a valid hash name.".format(hash_name)) | raise ValueError("{} is not a valid hash name.".format(hash_name)) | ||||
return getattr(self, hash_name) | return getattr(self, hash_name) | ||||
def hashes(self) -> Dict[str, bytes]: | def hashes(self) -> Dict[str, bytes]: | ||||
"""Returns a dictionary {hash_name: hash_value}""" | """Returns a dictionary {hash_name: hash_value}""" | ||||
return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS} | return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS} | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class Content(BaseContent): | class Content(BaseContent): | ||||
object_type: Final = "content" | object_type: Final = "content" | ||||
sha1 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) | sha1 = attr.ib(type=bytes, validator=generic_type_validator, repr=hash_repr) | ||||
sha1_git = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr) | sha1_git = attr.ib(type=Sha1Git, validator=generic_type_validator, repr=hash_repr) | ||||
sha256 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) | sha256 = attr.ib(type=bytes, validator=generic_type_validator, repr=hash_repr) | ||||
blake2s256 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) | blake2s256 = attr.ib(type=bytes, validator=generic_type_validator, repr=hash_repr) | ||||
length = attr.ib(type=int, validator=type_validator()) | length = attr.ib(type=int) | ||||
status = attr.ib( | status = attr.ib( | ||||
type=str, | type=str, | ||||
validator=attr.validators.in_(["visible", "hidden"]), | validator=attr.validators.in_(["visible", "hidden"]), | ||||
default="visible", | default="visible", | ||||
) | ) | ||||
data = attr.ib(type=Optional[bytes], validator=type_validator(), default=None) | data = attr.ib(type=Optional[bytes], validator=generic_type_validator, default=None) | ||||
ctime = attr.ib( | ctime = attr.ib( | ||||
type=Optional[datetime.datetime], | type=Optional[datetime.datetime], | ||||
validator=type_validator(), | |||||
default=None, | default=None, | ||||
eq=False, | eq=False, | ||||
) | ) | ||||
    @length.validator
    def check_length(self, attribute, value):
        """Checks the length is positive."""
        # Exact-type check (not isinstance): also rejects bool, which is an
        # int subclass.
        if value.__class__ is not int:
            raise AttributeTypeError(value, attribute)
        if value < 0:
            raise ValueError("Length must be positive.")
    @ctime.validator
    def check_ctime(self, attribute, value):
        """Checks the ctime has a timezone."""
        # ctime is optional: None passes without further checks.
        if value is not None:
            if value.__class__ is not datetime.datetime:
                raise AttributeTypeError(value, attribute)
            if value.tzinfo is None:
                raise ValueError("ctime must be a timezone-aware datetime.")
def to_dict(self): | def to_dict(self): | ||||
content = super().to_dict() | content = super().to_dict() | ||||
if content["data"] is None: | if content["data"] is None: | ||||
del content["data"] | del content["data"] | ||||
if content["ctime"] is None: | if content["ctime"] is None: | ||||
del content["ctime"] | del content["ctime"] | ||||
return content | return content | ||||
Show All 30 Lines | class Content(BaseContent): | ||||
    def unique_key(self) -> KeyType:
        """Returns the key used to deduplicate this content."""
        return self.sha1  # TODO: use a dict of hashes
    def swhid(self) -> CoreSWHID:
        """Returns a SWHID representing this object."""
        # SWHIDs identify contents by their sha1_git, not their sha1.
        return CoreSWHID(object_type=SwhidObjectType.CONTENT, object_id=self.sha1_git)
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class SkippedContent(BaseContent): | class SkippedContent(BaseContent): | ||||
object_type: Final = "skipped_content" | object_type: Final = "skipped_content" | ||||
sha1 = attr.ib(type=Optional[bytes], validator=type_validator(), repr=hash_repr) | sha1 = attr.ib( | ||||
type=Optional[bytes], validator=generic_type_validator, repr=hash_repr | |||||
) | |||||
sha1_git = attr.ib( | sha1_git = attr.ib( | ||||
type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr | type=Optional[Sha1Git], validator=generic_type_validator, repr=hash_repr | ||||
) | |||||
sha256 = attr.ib( | |||||
type=Optional[bytes], validator=generic_type_validator, repr=hash_repr | |||||
) | ) | ||||
sha256 = attr.ib(type=Optional[bytes], validator=type_validator(), repr=hash_repr) | |||||
blake2s256 = attr.ib( | blake2s256 = attr.ib( | ||||
type=Optional[bytes], validator=type_validator(), repr=hash_repr | type=Optional[bytes], validator=generic_type_validator, repr=hash_repr | ||||
) | ) | ||||
length = attr.ib(type=Optional[int], validator=type_validator()) | length = attr.ib(type=Optional[int]) | ||||
status = attr.ib(type=str, validator=attr.validators.in_(["absent"])) | status = attr.ib(type=str, validator=attr.validators.in_(["absent"])) | ||||
reason = attr.ib(type=Optional[str], validator=type_validator(), default=None) | reason = attr.ib(type=Optional[str], default=None) | ||||
origin = attr.ib(type=Optional[str], validator=type_validator(), default=None) | origin = attr.ib(type=Optional[str], validator=generic_type_validator, default=None) | ||||
ctime = attr.ib( | ctime = attr.ib( | ||||
type=Optional[datetime.datetime], | type=Optional[datetime.datetime], | ||||
validator=type_validator(), | validator=generic_type_validator, | ||||
default=None, | default=None, | ||||
eq=False, | eq=False, | ||||
) | ) | ||||
    @reason.validator
    def check_reason(self, attribute, value):
        """Checks the reason is full if status != absent."""
        # Sanity check: the validator is handed the field's own value.
        assert self.reason == value
        if value is None:
            raise ValueError("Must provide a reason if content is absent.")
        elif value.__class__ is not str:
            raise AttributeTypeError(value, attribute)
    @length.validator
    def check_length(self, attribute, value):
        """Checks the length is positive or -1."""
        # NOTE(review): rejects None even though the field is typed
        # Optional[int] — confirm this is intended.
        if value.__class__ is not int:
            raise AttributeTypeError(value, attribute)
        elif value < -1:
            raise ValueError("Length must be positive or -1.")
    @ctime.validator
    def check_ctime(self, attribute, value):
        """Checks the ctime has a timezone."""
        # ctime is optional: None passes without further checks.
        if value is not None:
            if value.__class__ is not datetime.datetime:
                raise AttributeTypeError(value, attribute)
            elif value.tzinfo is None:
                raise ValueError("ctime must be a timezone-aware datetime.")
def to_dict(self): | def to_dict(self): | ||||
content = super().to_dict() | content = super().to_dict() | ||||
if content["origin"] is None: | if content["origin"] is None: | ||||
del content["origin"] | del content["origin"] | ||||
if content["ctime"] is None: | if content["ctime"] is None: | ||||
del content["ctime"] | del content["ctime"] | ||||
return content | return content | ||||
Show All 32 Lines | class MetadataAuthorityType(Enum): | ||||
DEPOSIT_CLIENT = "deposit_client" | DEPOSIT_CLIENT = "deposit_client" | ||||
FORGE = "forge" | FORGE = "forge" | ||||
REGISTRY = "registry" | REGISTRY = "registry" | ||||
def __repr__(self): | def __repr__(self): | ||||
return f"MetadataAuthorityType.{self.name}" | return f"MetadataAuthorityType.{self.name}" | ||||
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class MetadataAuthority(BaseModel):
    """Represents an entity that provides metadata about an origin or
    software artifact."""

    object_type: Final = "metadata_authority"

    type = attr.ib(type=MetadataAuthorityType, validator=generic_type_validator)
    url = attr.ib(type=str, validator=generic_type_validator)
    metadata = attr.ib(
        type=Optional[ImmutableDict[str, Any]],
        default=None,
        validator=generic_type_validator,
        converter=freeze_optional_dict,
    )

    def to_dict(self):
        """Serialize to a dict; the optional ``metadata`` field is omitted
        when unset."""
        d = super().to_dict()
        if d["metadata"] is None:
            del d["metadata"]
        return d

    @classmethod
    def from_dict(cls, d):
        """Deserialize from a dict, converting ``type`` back to its enum."""
        d = {
            **d,
            "type": MetadataAuthorityType(d["type"]),
        }
        return super().from_dict(d)

    def unique_key(self) -> KeyType:
        # An authority is identified by its (type, url) pair.
        return {"type": self.type.value, "url": self.url}
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class MetadataFetcher(BaseModel):
    """Represents a software component used to fetch metadata from a metadata
    authority, and ingest them into the Software Heritage archive."""

    object_type: Final = "metadata_fetcher"

    name = attr.ib(type=str, validator=generic_type_validator)
    version = attr.ib(type=str, validator=generic_type_validator)
    metadata = attr.ib(
        type=Optional[ImmutableDict[str, Any]],
        default=None,
        validator=generic_type_validator,
        converter=freeze_optional_dict,
    )

    def to_dict(self):
        """Serialize to a dict; the optional ``metadata`` field is omitted
        when unset."""
        d = super().to_dict()
        if d["metadata"] is None:
            del d["metadata"]
        return d

    def unique_key(self) -> KeyType:
        # A fetcher is identified by its (name, version) pair.
        return {"name": self.name, "version": self.version}
def normalize_discovery_date(value: Any) -> datetime.datetime:
    """Check *value* is a timezone-aware datetime, then normalize it:
    convert to UTC and truncate microseconds to zero.

    Raises:
        TypeError: if *value* is not a :class:`datetime.datetime`.
        ValueError: if *value* is a naive (timezone-less) datetime.
    """
    if isinstance(value, datetime.datetime):
        if value.tzinfo is None:
            raise ValueError("discovery_date must be a timezone-aware datetime.")
        # Normalize timezone to utc, and truncate microseconds to 0
        utc_value = value.astimezone(datetime.timezone.utc)
        return utc_value.replace(microsecond=0)
    raise TypeError("discovery_date must be a timezone-aware datetime.")
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) | ||||
class RawExtrinsicMetadata(HashableObject, BaseModel): | class RawExtrinsicMetadata(HashableObject, BaseModel): | ||||
object_type: Final = "raw_extrinsic_metadata" | object_type: Final = "raw_extrinsic_metadata" | ||||
# target object | # target object | ||||
target = attr.ib(type=ExtendedSWHID, validator=type_validator()) | target = attr.ib(type=ExtendedSWHID, validator=generic_type_validator) | ||||
# source | # source | ||||
discovery_date = attr.ib(type=datetime.datetime, converter=normalize_discovery_date) | discovery_date = attr.ib(type=datetime.datetime, converter=normalize_discovery_date) | ||||
authority = attr.ib(type=MetadataAuthority, validator=type_validator()) | authority = attr.ib(type=MetadataAuthority, validator=generic_type_validator) | ||||
fetcher = attr.ib(type=MetadataFetcher, validator=type_validator()) | fetcher = attr.ib(type=MetadataFetcher, validator=generic_type_validator) | ||||
# the metadata itself | # the metadata itself | ||||
format = attr.ib(type=str, validator=type_validator()) | format = attr.ib(type=str, validator=generic_type_validator) | ||||
metadata = attr.ib(type=bytes, validator=type_validator()) | metadata = attr.ib(type=bytes, validator=generic_type_validator) | ||||
# context | # context | ||||
origin = attr.ib(type=Optional[str], default=None, validator=type_validator()) | origin = attr.ib(type=Optional[str], default=None, validator=generic_type_validator) | ||||
visit = attr.ib(type=Optional[int], default=None, validator=type_validator()) | visit = attr.ib(type=Optional[int], default=None) | ||||
snapshot = attr.ib( | snapshot = attr.ib(type=Optional[CoreSWHID], default=None) | ||||
type=Optional[CoreSWHID], default=None, validator=type_validator() | release = attr.ib(type=Optional[CoreSWHID], default=None) | ||||
) | revision = attr.ib(type=Optional[CoreSWHID], default=None) | ||||
release = attr.ib( | path = attr.ib(type=Optional[bytes], default=None) | ||||
type=Optional[CoreSWHID], default=None, validator=type_validator() | directory = attr.ib(type=Optional[CoreSWHID], default=None) | ||||
) | |||||
revision = attr.ib( | |||||
type=Optional[CoreSWHID], default=None, validator=type_validator() | |||||
) | |||||
path = attr.ib(type=Optional[bytes], default=None, validator=type_validator()) | |||||
directory = attr.ib( | |||||
type=Optional[CoreSWHID], default=None, validator=type_validator() | |||||
) | |||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) | id = attr.ib( | ||||
type=Sha1Git, validator=generic_type_validator, default=b"", repr=hash_repr | |||||
) | |||||
    def _compute_hash_from_attributes(self) -> bytes:
        """Compute the intrinsic id from this object's git-object manifest."""
        return _compute_hash_from_manifest(
            git_objects.raw_extrinsic_metadata_git_object(self)
        )
    @origin.validator
    def check_origin(self, attribute, value):
        """Checks the 'origin' context is a URL string and is allowed for
        this target object type."""
        if value is None:
            return
        if value.__class__ is not str:
            raise AttributeTypeError(value, attribute)
        obj_type = self.target.object_type
        # Chained `is` tests against the enum singletons instead of tuple
        # membership — presumably for validator speed; confirm.
        if not (
            obj_type is SwhidExtendedObjectType.SNAPSHOT
            or obj_type is SwhidExtendedObjectType.RELEASE
            or obj_type is SwhidExtendedObjectType.REVISION
            or obj_type is SwhidExtendedObjectType.DIRECTORY
            or obj_type is SwhidExtendedObjectType.CONTENT
        ):
            raise ValueError(
                f"Unexpected 'origin' context for "
                f"{self.target.object_type.name.lower()} object: {value}"
            )
        if value.startswith("swh:"):
            # Technically this is valid; but:
            # 1. SWHIDs are URIs, not URLs
            # 2. if a SWHID gets here, it's very likely to be a mistake
            # (and we can remove this check if it turns out there is a
            # legitimate use for it).
            raise ValueError(f"SWHID used as context origin URL: {value}")
    @visit.validator
    def check_visit(self, attribute, value):
        """Checks the 'visit' context is a positive int, allowed for this
        target object type, and accompanied by an 'origin' context."""
        if value is None:
            return
        if value.__class__ is not int:
            raise AttributeTypeError(value, attribute)
        obj_type = self.target.object_type
        if not (
            obj_type is SwhidExtendedObjectType.SNAPSHOT
            or obj_type is SwhidExtendedObjectType.RELEASE
            or obj_type is SwhidExtendedObjectType.REVISION
            or obj_type is SwhidExtendedObjectType.DIRECTORY
            or obj_type is SwhidExtendedObjectType.CONTENT
        ):
            raise ValueError(
                f"Unexpected 'visit' context for "
                f"{self.target.object_type.name.lower()} object: {value}"
            )
        # A visit only makes sense relative to an origin.
        if self.origin is None:
            raise ValueError("'origin' context must be set if 'visit' is.")
        if value <= 0:
            raise ValueError("Nonpositive visit id")
    @snapshot.validator
    def check_snapshot(self, attribute, value):
        """Checks the 'snapshot' context is a snapshot SWHID allowed for
        this target object type."""
        if value is None:
            return
        if value.__class__ is not CoreSWHID:
            raise AttributeTypeError(value, attribute)
        obj_type = self.target.object_type
        if not (
            obj_type is SwhidExtendedObjectType.RELEASE
            or obj_type is SwhidExtendedObjectType.REVISION
            or obj_type is SwhidExtendedObjectType.DIRECTORY
            or obj_type is SwhidExtendedObjectType.CONTENT
        ):
            raise ValueError(
                f"Unexpected 'snapshot' context for "
                f"{self.target.object_type.name.lower()} object: {value}"
            )
        self._check_swhid(SwhidObjectType.SNAPSHOT, value)
    @release.validator
    def check_release(self, attribute, value):
        """Checks the 'release' context is a release SWHID allowed for
        this target object type."""
        if value is None:
            return
        if value.__class__ is not CoreSWHID:
            raise AttributeTypeError(value, attribute)
        obj_type = self.target.object_type
        if not (
            obj_type is SwhidExtendedObjectType.REVISION
            or obj_type is SwhidExtendedObjectType.DIRECTORY
            or obj_type is SwhidExtendedObjectType.CONTENT
        ):
            raise ValueError(
                f"Unexpected 'release' context for "
                f"{self.target.object_type.name.lower()} object: {value}"
            )
        self._check_swhid(SwhidObjectType.RELEASE, value)
    @revision.validator
    def check_revision(self, attribute, value):
        """Checks the 'revision' context is a revision SWHID allowed for
        this target object type."""
        if value is None:
            return
        if value.__class__ is not CoreSWHID:
            raise AttributeTypeError(value, attribute)

        obj_type = self.target.object_type
        if not (
            obj_type is SwhidExtendedObjectType.DIRECTORY
            or obj_type is SwhidExtendedObjectType.CONTENT
        ):
            raise ValueError(
                f"Unexpected 'revision' context for "
                f"{self.target.object_type.name.lower()} object: {value}"
            )
        self._check_swhid(SwhidObjectType.REVISION, value)
    @path.validator
    def check_path(self, attribute, value):
        """Checks the 'path' context is bytes and allowed for this target
        object type."""
        if value is None:
            return
        if value.__class__ is not bytes:
            raise AttributeTypeError(value, attribute)

        obj_type = self.target.object_type
        if not (
            obj_type is SwhidExtendedObjectType.DIRECTORY
            or obj_type is SwhidExtendedObjectType.CONTENT
        ):
            raise ValueError(
                f"Unexpected 'path' context for "
                f"{self.target.object_type.name.lower()} object: {value}"
            )
    @directory.validator
    def check_directory(self, attribute, value):
        """Checks the 'directory' context is a directory SWHID, only allowed
        when the target is a content object."""
        if value is None:
            return
        if value.__class__ is not CoreSWHID:
            raise AttributeTypeError(value, attribute)
        if self.target.object_type is not SwhidExtendedObjectType.CONTENT:
            raise ValueError(
                f"Unexpected 'directory' context for "
                f"{self.target.object_type.name.lower()} object: {value}"
            )
        self._check_swhid(SwhidObjectType.DIRECTORY, value)
    def _check_swhid(self, expected_object_type, swhid):
        """Checks *swhid* is a CoreSWHID of the expected object type."""
        if swhid.__class__ is not CoreSWHID:
            raise ValueError(f"Expected SWHID, got a {swhid.__class__}: {swhid}")
        if swhid.object_type != expected_object_type:
            raise ValueError(
                f"Expected SWHID type '{expected_object_type.name.lower()}', "
                f"got '{swhid.object_type.name.lower()}' in {swhid}"
            )
def to_dict(self): | def to_dict(self): | ||||
Show All 38 Lines | class RawExtrinsicMetadata(HashableObject, BaseModel): | ||||
def swhid(self) -> ExtendedSWHID: | def swhid(self) -> ExtendedSWHID: | ||||
"""Returns a SWHID representing this RawExtrinsicMetadata object.""" | """Returns a SWHID representing this RawExtrinsicMetadata object.""" | ||||
return ExtendedSWHID( | return ExtendedSWHID( | ||||
object_type=SwhidExtendedObjectType.RAW_EXTRINSIC_METADATA, | object_type=SwhidExtendedObjectType.RAW_EXTRINSIC_METADATA, | ||||
object_id=self.id, | object_id=self.id, | ||||
) | ) | ||||
@attr.s(frozen=True, slots=True)
class ExtID(HashableObject, BaseModel):
    """Maps an identifier computed by an external system (``extid``) to a
    software artifact in the archive, designated by ``target``.

    NOTE(review): class body continues below (``from_dict`` et al.).
    """

    object_type: Final = "extid"

    # Name of the external identifier scheme -- presumably e.g. a
    # VCS-specific hash kind; confirm against producers of ExtID objects.
    extid_type = attr.ib(type=str, validator=type_validator())
    # The external identifier itself, as raw bytes.
    extid = attr.ib(type=bytes, validator=type_validator())
    # SWHID of the archived object the external identifier maps to.
    target = attr.ib(type=CoreSWHID, validator=type_validator())
    # Version of the extid_type scheme; defaults to 0 (see from_dict below).
    extid_version = attr.ib(type=int, validator=type_validator(), default=0)
    # Intrinsic id of this ExtID object; empty default, presumably filled in
    # by the HashableObject machinery -- confirm. Rendered via hash_repr.
    id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr)
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
return cls( | return cls( | ||||
extid=d["extid"], | extid=d["extid"], | ||||
extid_type=d["extid_type"], | extid_type=d["extid_type"], | ||||
target=CoreSWHID.from_string(d["target"]), | target=CoreSWHID.from_string(d["target"]), | ||||
extid_version=d.get("extid_version", 0), | extid_version=d.get("extid_version", 0), | ||||
Show All 32 Lines |
Could you add tests for this? It seems to be uncovered.