diff --git a/swh/model/model.py b/swh/model/model.py index b391719..b981b48 100644 --- a/swh/model/model.py +++ b/swh/model/model.py @@ -1,1540 +1,1562 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ Implementation of Software Heritage's data model See :ref:`data-model` for an overview of the data model. The classes defined in this module are immutable `attrs objects `__ and enums. All classes define a ``from_dict`` class method and a ``to_dict`` method to convert between them and msgpack-serializable objects. """ from abc import ABCMeta, abstractmethod import datetime from enum import Enum import hashlib from typing import Any, Dict, Iterable, Optional, Tuple, TypeVar, Union import attr from attrs_strict import AttributeTypeError import dateutil.parser import iso8601 from typing_extensions import Final from . import git_objects from .collections import ImmutableDict from .hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_hex from .swhids import CoreSWHID from .swhids import ExtendedObjectType as SwhidExtendedObjectType from .swhids import ExtendedSWHID from .swhids import ObjectType as SwhidObjectType class MissingData(Exception): """Raised by `Content.with_data` when it has no way of fetching the data (but not when fetching the data fails).""" pass KeyType = Union[Dict[str, str], Dict[str, bytes], bytes] """The type returned by BaseModel.unique_key().""" SHA1_SIZE = 20 _OFFSET_CHARS = frozenset(b"+-0123456789") # TODO: Limit this to 20 bytes Sha1Git = bytes Sha1 = bytes KT = TypeVar("KT") VT = TypeVar("VT") def hash_repr(h: bytes) -> str: if h is None: return "None" else: return f"hash_to_bytes('{hash_to_hex(h)}')" def freeze_optional_dict( d: Union[None, Dict[KT, VT], ImmutableDict[KT, VT]] # type: ignore ) -> Optional[ImmutableDict[KT, VT]]: if isinstance(d, dict): return ImmutableDict(d) else: return d def dictify(value): "Helper function used by BaseModel.to_dict()" if isinstance(value, BaseModel): return value.to_dict() elif isinstance(value, (CoreSWHID, ExtendedSWHID)): return str(value) elif isinstance(value, Enum): return value.value elif isinstance(value, (dict, ImmutableDict)): return {k: dictify(v) for k, v in value.items()} elif isinstance(value, tuple): return tuple(dictify(v) for v in value) else: return value def _check_type(type_, value): if type_ is object or type_ is Any: return True if type_ is None: return value is None origin = getattr(type_, "__origin__", None) # Non-generic type, check it directly if origin is None: # This is functionally equivalent to using just this: # return isinstance(value, type) # but using type equality before isinstance allows very quick checks # when the exact class is used (which is the overwhelming majority of cases) # while still allowing subclasses to be used. return type(value) == type_ or isinstance(value, type_) # Check the type of the value itself # # For the same reason as above, this condition is functionally equivalent to: # if origin is not Union and not isinstance(value, origin): if origin is not Union and type(value) != origin and not isinstance(value, origin): return False # Then, if it's a container, check its items. if origin is tuple: args = type_.__args__ if len(args) == 2 and args[1] is Ellipsis: # Infinite tuple return all(_check_type(args[0], item) for item in value) else: # Finite tuple if len(args) != len(value): return False return all( _check_type(item_type, item) for (item_type, item) in zip(args, value) ) elif origin is Union: args = type_.__args__ return any(_check_type(variant, value) for variant in args) elif origin is ImmutableDict: (key_type, value_type) = type_.__args__ return all( _check_type(key_type, key) and _check_type(value_type, value) for (key, value) in value.items() ) else: # No need to check dict or list. because they are converted to ImmutableDict # and tuple respectively. raise NotImplementedError(f"Type-checking {type_}") def type_validator(): """Like attrs_strict.type_validator(), but stricter. It is an attrs validator, which checks attributes have the specified type, using type equality instead of ``isinstance()``, for improved performance """ def validator(instance, attribute, value): if not _check_type(attribute.type, value): raise AttributeTypeError(value, attribute) return validator ModelType = TypeVar("ModelType", bound="BaseModel") class BaseModel: """Base class for SWH model classes. Provides serialization/deserialization to/from Python dictionaries, that are suitable for JSON/msgpack-like formats.""" __slots__ = () def to_dict(self): """Wrapper of `attr.asdict` that can be overridden by subclasses that have special handling of some of the fields.""" return dictify(attr.asdict(self, recurse=False)) @classmethod def from_dict(cls, d): """Takes a dictionary representing a tree of SWH objects, and recursively builds the corresponding objects.""" return cls(**d) def anonymize(self: ModelType) -> Optional[ModelType]: """Returns an anonymized version of the object, if needed. If the object model does not need/support anonymization, returns None. """ return None def unique_key(self) -> KeyType: """Returns a unique key for this object, that can be used for deduplication.""" raise NotImplementedError(f"unique_key for {self}") def check(self) -> None: """Performs internal consistency checks, and raises an error if one fails.""" attr.validate(self) def _compute_hash_from_manifest(manifest: bytes) -> Sha1Git: return hashlib.new("sha1", manifest).digest() class HashableObject(metaclass=ABCMeta): """Mixin to automatically compute object identifier hash when the associated model is instantiated.""" __slots__ = () id: Sha1Git def compute_hash(self) -> bytes: """Derived model classes must implement this to compute the object hash. This method is called by the object initialization if the `id` attribute is set to an empty value. """ return self._compute_hash_from_attributes() @abstractmethod def _compute_hash_from_attributes(self) -> Sha1Git: raise NotImplementedError(f"_compute_hash_from_attributes for {self}") def __attrs_post_init__(self): if not self.id: obj_id = self.compute_hash() object.__setattr__(self, "id", obj_id) def unique_key(self) -> KeyType: return self.id def check(self) -> None: super().check() # type: ignore if self.id != self.compute_hash(): raise ValueError("'id' does not match recomputed hash.") class HashableObjectWithManifest(HashableObject): """Derived class of HashableObject, for objects that may need to store verbatim git objects as ``raw_manifest`` to preserve original hashes.""" raw_manifest: Optional[bytes] = None """Stores the original content of git objects when they cannot be faithfully represented using only the other attributes. This should only be used as a last resort, and only set in the Git loader, for objects too corrupt to fit the data model.""" def to_dict(self): d = super().to_dict() if d["raw_manifest"] is None: del d["raw_manifest"] return d def compute_hash(self) -> bytes: """Derived model classes must implement this to compute the object hash. This method is called by the object initialization if the `id` attribute is set to an empty value. """ if self.raw_manifest is None: return super().compute_hash() else: return _compute_hash_from_manifest(self.raw_manifest) def check(self) -> None: super().check() if ( self.raw_manifest is not None and self.id == self._compute_hash_from_attributes() ): raise ValueError( f"{self} has a non-none raw_manifest attribute, but does not need it." ) @attr.s(frozen=True, slots=True) class Person(BaseModel): """Represents the author/committer of a revision or release.""" object_type: Final = "person" fullname = attr.ib(type=bytes, validator=type_validator()) name = attr.ib(type=Optional[bytes], validator=type_validator()) email = attr.ib(type=Optional[bytes], validator=type_validator()) @classmethod def from_fullname(cls, fullname: bytes): """Returns a Person object, by guessing the name and email from the fullname, in the `name ` format. The fullname is left unchanged.""" if fullname is None: raise TypeError("fullname is None.") name: Optional[bytes] email: Optional[bytes] try: open_bracket = fullname.index(b"<") except ValueError: name = fullname email = None else: raw_name = fullname[:open_bracket] raw_email = fullname[open_bracket + 1 :] if not raw_name: name = None else: name = raw_name.strip() try: close_bracket = raw_email.rindex(b">") except ValueError: email = raw_email else: email = raw_email[:close_bracket] return Person(name=name or None, email=email or None, fullname=fullname,) def anonymize(self) -> "Person": """Returns an anonymized version of the Person object. Anonymization is simply a Person which fullname is the hashed, with unset name or email. """ return Person( fullname=hashlib.sha256(self.fullname).digest(), name=None, email=None, ) @classmethod def from_dict(cls, d): """ If the fullname is missing, construct a fullname using the following heuristics: if the name value is None, we return the email in angle brackets, else, we return the name, a space, and the email in angle brackets. """ if "fullname" not in d: parts = [] if d["name"] is not None: parts.append(d["name"]) if d["email"] is not None: parts.append(b"".join([b"<", d["email"], b">"])) fullname = b" ".join(parts) d = {**d, "fullname": fullname} d = {"name": None, "email": None, **d} return super().from_dict(d) @attr.s(frozen=True, slots=True) class Timestamp(BaseModel): """Represents a naive timestamp from a VCS.""" object_type: Final = "timestamp" seconds = attr.ib(type=int, validator=type_validator()) microseconds = attr.ib(type=int, validator=type_validator()) @seconds.validator def check_seconds(self, attribute, value): """Check that seconds fit in a 64-bits signed integer.""" if not (-(2 ** 63) <= value < 2 ** 63): raise ValueError("Seconds must be a signed 64-bits integer.") @microseconds.validator def check_microseconds(self, attribute, value): """Checks that microseconds are positive and < 1000000.""" if not (0 <= value < 10 ** 6): raise ValueError("Microseconds must be in [0, 1000000[.") @attr.s(frozen=True, slots=True, init=False) class TimestampWithTimezone(BaseModel): """Represents a TZ-aware timestamp from a VCS.""" object_type: Final = "timestamp_with_timezone" timestamp = attr.ib(type=Timestamp, validator=type_validator()) offset = attr.ib(type=int, validator=type_validator()) negative_utc = attr.ib(type=bool, validator=type_validator()) offset_bytes = attr.ib(type=bytes, validator=type_validator()) """Raw git representation of the timezone, as an offset from UTC. It should follow this format: ``+HHMM`` or ``-HHMM`` (including ``+0000`` and ``-0000``). However, when created from git objects, it must be the exact bytes used in the original objects, so it may differ from this format when they do. """ def __init__( self, timestamp: Timestamp, offset: int = None, negative_utc: bool = None, offset_bytes: bytes = None, ): if offset_bytes is None: if offset is None: raise AttributeError("Neither 'offset' nor 'offset_bytes' was passed.") if negative_utc is None: raise AttributeError( "Neither 'negative_utc' nor 'offset_bytes' was passed." ) negative = offset < 0 or negative_utc (hours, minutes) = divmod(abs(offset), 60) offset_bytes = f"{'-' if negative else '+'}{hours:02}{minutes:02}".encode() else: offset = self._parse_offset_bytes(offset_bytes) negative_utc = offset == 0 and offset_bytes.startswith(b"-") self.__attrs_init__( # type: ignore timestamp=timestamp, offset=offset, negative_utc=negative_utc, offset_bytes=offset_bytes, ) @offset.validator def check_offset(self, attribute, value): """Checks the offset is a 16-bits signed integer (in theory, it should always be between -14 and +14 hours).""" if not (-(2 ** 15) <= value < 2 ** 15): # max 14 hours offset in theory, but you never know what # you'll find in the wild... raise ValueError("offset too large: %d minutes" % value) self._check_offsets_match() @negative_utc.validator def check_negative_utc(self, attribute, value): if self.offset and value: raise ValueError("negative_utc can only be True is offset=0") self._check_offsets_match() @offset_bytes.validator def check_offset_bytes(self, attribute, value): if not set(value) <= _OFFSET_CHARS: raise ValueError(f"invalid characters in offset_bytes: {value!r}") self._check_offsets_match() @staticmethod def _parse_offset_bytes(offset_bytes: bytes) -> int: """Parses an ``offset_bytes`` value (in Git's ``[+-]HHMM`` format), and returns the corresponding numeric values (in number of minutes). Tries to account for some mistakes in the format, to support incorrect Git implementations. >>> TimestampWithTimezone._parse_offset_bytes(b"+0000") 0 >>> TimestampWithTimezone._parse_offset_bytes(b"-0000") 0 >>> TimestampWithTimezone._parse_offset_bytes(b"+0200") 120 >>> TimestampWithTimezone._parse_offset_bytes(b"-0200") -120 >>> TimestampWithTimezone._parse_offset_bytes(b"+200") 120 >>> TimestampWithTimezone._parse_offset_bytes(b"-200") -120 >>> TimestampWithTimezone._parse_offset_bytes(b"+02") 120 >>> TimestampWithTimezone._parse_offset_bytes(b"-02") -120 >>> TimestampWithTimezone._parse_offset_bytes(b"+0010") 10 >>> TimestampWithTimezone._parse_offset_bytes(b"-0010") -10 >>> TimestampWithTimezone._parse_offset_bytes(b"+200000000000000000") 0 >>> TimestampWithTimezone._parse_offset_bytes(b"+0160") # 60 minutes... 0 """ offset_str = offset_bytes.decode() assert offset_str[0] in "+-" sign = int(offset_str[0] + "1") if len(offset_str) <= 3: hours = int(offset_str[1:]) minutes = 0 else: hours = int(offset_str[1:-2]) minutes = int(offset_str[-2:]) offset = sign * (hours * 60 + minutes) if (0 <= minutes <= 59) and (-(2 ** 15) <= offset < 2 ** 15): return offset else: # can't parse it to a reasonable value; give up and pretend it's UTC. return 0 def _check_offsets_match(self): offset = self._parse_offset_bytes(self.offset_bytes) if offset != self.offset: raise ValueError( f"offset_bytes ({self.offset_bytes!r}) does not match offset " f"{divmod(self.offset, 60)}" ) if offset == 0 and self.negative_utc != self.offset_bytes.startswith(b"-"): raise ValueError( f"offset_bytes ({self.offset_bytes!r}) does not match negative_utc " f"({self.negative_utc})" ) @classmethod def from_numeric_offset( cls, timestamp: Timestamp, offset: int, negative_utc: bool ) -> "TimestampWithTimezone": """Returns a :class:`TimestampWithTimezone` instance from the old dictionary format (with ``offset`` and ``negative_utc`` instead of ``offset_bytes``). """ negative = offset < 0 or negative_utc (hours, minutes) = divmod(abs(offset), 60) offset_bytes = f"{'-' if negative else '+'}{hours:02}{minutes:02}".encode() tstz = TimestampWithTimezone( timestamp=timestamp, offset_bytes=offset_bytes, offset=offset, negative_utc=negative_utc, ) assert tstz.offset == offset, (tstz.offset, offset) return tstz @classmethod def from_dict( cls, time_representation: Union[Dict, datetime.datetime, int] ) -> "TimestampWithTimezone": """Builds a TimestampWithTimezone from any of the formats accepted by :func:`swh.model.normalize_timestamp`.""" # TODO: this accept way more types than just dicts; find a better # name if isinstance(time_representation, dict): ts = time_representation["timestamp"] if isinstance(ts, dict): seconds = ts.get("seconds", 0) microseconds = ts.get("microseconds", 0) elif isinstance(ts, int): seconds = ts microseconds = 0 else: raise ValueError( f"TimestampWithTimezone.from_dict received non-integer timestamp " f"member {ts!r}" ) timestamp = Timestamp(seconds=seconds, microseconds=microseconds) if "offset_bytes" in time_representation: return TimestampWithTimezone( timestamp=timestamp, offset_bytes=time_representation["offset_bytes"], ) else: # old format offset = time_representation["offset"] negative_utc = time_representation.get("negative_utc") or False return cls.from_numeric_offset(timestamp, offset, negative_utc) elif isinstance(time_representation, datetime.datetime): # TODO: warn when using from_dict() on a datetime utcoffset = time_representation.utcoffset() time_representation = time_representation.astimezone(datetime.timezone.utc) microseconds = time_representation.microsecond if microseconds: time_representation = time_representation.replace(microsecond=0) seconds = int(time_representation.timestamp()) if utcoffset is None: raise ValueError( f"TimestampWithTimezone.from_dict received datetime without " f"timezone: {time_representation}" ) # utcoffset is an integer number of minutes seconds_offset = utcoffset.total_seconds() offset = int(seconds_offset) // 60 # TODO: warn if remainder is not zero return cls.from_numeric_offset( Timestamp(seconds=seconds, microseconds=microseconds), offset, False ) elif isinstance(time_representation, int): # TODO: warn when using from_dict() on an int seconds = time_representation timestamp = Timestamp(seconds=time_representation, microseconds=0) return TimestampWithTimezone(timestamp=timestamp, offset_bytes=b"+0000") else: raise ValueError( f"TimestampWithTimezone.from_dict received non-integer timestamp: " f"{time_representation!r}" ) @classmethod def from_datetime(cls, dt: datetime.datetime) -> "TimestampWithTimezone": return cls.from_dict(dt) def to_datetime(self) -> datetime.datetime: """Convert to a datetime (with a timezone set to the recorded fixed UTC offset) Beware that this conversion can be lossy: ``-0000`` and 'weird' offsets cannot be represented. Also note that it may fail due to type overflow. """ timestamp = datetime.datetime.fromtimestamp( self.timestamp.seconds, datetime.timezone(datetime.timedelta(minutes=self.offset)), ) timestamp = timestamp.replace(microsecond=self.timestamp.microseconds) return timestamp @classmethod def from_iso8601(cls, s): """Builds a TimestampWithTimezone from an ISO8601-formatted string. """ dt = iso8601.parse_date(s) tstz = cls.from_datetime(dt) if dt.tzname() == "-00:00": assert tstz.offset_bytes == b"+0000" tstz = attr.evolve(tstz, offset_bytes=b"-0000", negative_utc=True) return tstz + def offset_minutes(self): + """Returns the offset, as a number of minutes since UTC. + + >>> TimestampWithTimezone( + ... Timestamp(seconds=1642765364, microseconds=0), offset_bytes=b"+0000" + ... ).offset_minutes() + 0 + >>> TimestampWithTimezone( + ... Timestamp(seconds=1642765364, microseconds=0), offset_bytes=b"+0200" + ... ).offset_minutes() + 120 + >>> TimestampWithTimezone( + ... Timestamp(seconds=1642765364, microseconds=0), offset_bytes=b"-0200" + ... ).offset_minutes() + -120 + >>> TimestampWithTimezone( + ... Timestamp(seconds=1642765364, microseconds=0), offset_bytes=b"+0530" + ... ).offset_minutes() + 330 + """ + return self.offset + @attr.s(frozen=True, slots=True) class Origin(HashableObject, BaseModel): """Represents a software source: a VCS and an URL.""" object_type: Final = "origin" url = attr.ib(type=str, validator=type_validator()) id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") def unique_key(self) -> KeyType: return {"url": self.url} def _compute_hash_from_attributes(self) -> bytes: return _compute_hash_from_manifest(self.url.encode("utf-8")) def swhid(self) -> ExtendedSWHID: """Returns a SWHID representing this origin.""" return ExtendedSWHID( object_type=SwhidExtendedObjectType.ORIGIN, object_id=self.id, ) @attr.s(frozen=True, slots=True) class OriginVisit(BaseModel): """Represents an origin visit with a given type at a given point in time, by a SWH loader.""" object_type: Final = "origin_visit" origin = attr.ib(type=str, validator=type_validator()) date = attr.ib(type=datetime.datetime, validator=type_validator()) type = attr.ib(type=str, validator=type_validator()) """Should not be set before calling 'origin_visit_add()'.""" visit = attr.ib(type=Optional[int], validator=type_validator(), default=None) @date.validator def check_date(self, attribute, value): """Checks the date has a timezone.""" if value is not None and value.tzinfo is None: raise ValueError("date must be a timezone-aware datetime.") def to_dict(self): """Serializes the date as a string and omits the visit id if it is `None`.""" ov = super().to_dict() if ov["visit"] is None: del ov["visit"] return ov def unique_key(self) -> KeyType: return {"origin": self.origin, "date": str(self.date)} @attr.s(frozen=True, slots=True) class OriginVisitStatus(BaseModel): """Represents a visit update of an origin at a given point in time. """ object_type: Final = "origin_visit_status" origin = attr.ib(type=str, validator=type_validator()) visit = attr.ib(type=int, validator=type_validator()) date = attr.ib(type=datetime.datetime, validator=type_validator()) status = attr.ib( type=str, validator=attr.validators.in_( ["created", "ongoing", "full", "partial", "not_found", "failed"] ), ) snapshot = attr.ib( type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr ) # Type is optional be to able to use it before adding it to the database model type = attr.ib(type=Optional[str], validator=type_validator(), default=None) metadata = attr.ib( type=Optional[ImmutableDict[str, object]], validator=type_validator(), converter=freeze_optional_dict, default=None, ) @date.validator def check_date(self, attribute, value): """Checks the date has a timezone.""" if value is not None and value.tzinfo is None: raise ValueError("date must be a timezone-aware datetime.") def unique_key(self) -> KeyType: return {"origin": self.origin, "visit": str(self.visit), "date": str(self.date)} class TargetType(Enum): """The type of content pointed to by a snapshot branch. Usually a revision or an alias.""" CONTENT = "content" DIRECTORY = "directory" REVISION = "revision" RELEASE = "release" SNAPSHOT = "snapshot" ALIAS = "alias" def __repr__(self): return f"TargetType.{self.name}" class ObjectType(Enum): """The type of content pointed to by a release. Usually a revision""" CONTENT = "content" DIRECTORY = "directory" REVISION = "revision" RELEASE = "release" SNAPSHOT = "snapshot" def __repr__(self): return f"ObjectType.{self.name}" @attr.s(frozen=True, slots=True) class SnapshotBranch(BaseModel): """Represents one of the branches of a snapshot.""" object_type: Final = "snapshot_branch" target = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) target_type = attr.ib(type=TargetType, validator=type_validator()) @target.validator def check_target(self, attribute, value): """Checks the target type is not an alias, checks the target is a valid sha1_git.""" if self.target_type != TargetType.ALIAS and self.target is not None: if len(value) != 20: raise ValueError("Wrong length for bytes identifier: %d" % len(value)) @classmethod def from_dict(cls, d): return cls(target=d["target"], target_type=TargetType(d["target_type"])) @attr.s(frozen=True, slots=True) class Snapshot(HashableObject, BaseModel): """Represents the full state of an origin at a given point in time.""" object_type: Final = "snapshot" branches = attr.ib( type=ImmutableDict[bytes, Optional[SnapshotBranch]], validator=type_validator(), converter=freeze_optional_dict, ) id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) def _compute_hash_from_attributes(self) -> bytes: return _compute_hash_from_manifest(git_objects.snapshot_git_object(self)) @classmethod def from_dict(cls, d): d = d.copy() return cls( branches=ImmutableDict( (name, SnapshotBranch.from_dict(branch) if branch else None) for (name, branch) in d.pop("branches").items() ), **d, ) def swhid(self) -> CoreSWHID: """Returns a SWHID representing this object.""" return CoreSWHID(object_type=SwhidObjectType.SNAPSHOT, object_id=self.id) @attr.s(frozen=True, slots=True) class Release(HashableObjectWithManifest, BaseModel): object_type: Final = "release" name = attr.ib(type=bytes, validator=type_validator()) message = attr.ib(type=Optional[bytes], validator=type_validator()) target = attr.ib(type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr) target_type = attr.ib(type=ObjectType, validator=type_validator()) synthetic = attr.ib(type=bool, validator=type_validator()) author = attr.ib(type=Optional[Person], validator=type_validator(), default=None) date = attr.ib( type=Optional[TimestampWithTimezone], validator=type_validator(), default=None ) metadata = attr.ib( type=Optional[ImmutableDict[str, object]], validator=type_validator(), converter=freeze_optional_dict, default=None, ) id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) raw_manifest = attr.ib(type=Optional[bytes], default=None) def _compute_hash_from_attributes(self) -> bytes: return _compute_hash_from_manifest(git_objects.release_git_object(self)) @author.validator def check_author(self, attribute, value): """If the author is `None`, checks the date is `None` too.""" if self.author is None and self.date is not None: raise ValueError("release date must be None if author is None.") def to_dict(self): rel = super().to_dict() if rel["metadata"] is None: del rel["metadata"] return rel @classmethod def from_dict(cls, d): d = d.copy() if d.get("author"): d["author"] = Person.from_dict(d["author"]) if d.get("date"): d["date"] = TimestampWithTimezone.from_dict(d["date"]) return cls(target_type=ObjectType(d.pop("target_type")), **d) def swhid(self) -> CoreSWHID: """Returns a SWHID representing this object.""" return CoreSWHID(object_type=SwhidObjectType.RELEASE, object_id=self.id) def anonymize(self) -> "Release": """Returns an anonymized version of the Release object. Anonymization consists in replacing the author with an anonymized Person object. """ author = self.author and self.author.anonymize() return attr.evolve(self, author=author) class RevisionType(Enum): GIT = "git" TAR = "tar" DSC = "dsc" SUBVERSION = "svn" MERCURIAL = "hg" CVS = "cvs" BAZAAR = "bzr" def __repr__(self): return f"RevisionType.{self.name}" def tuplify_extra_headers(value: Iterable): return tuple((k, v) for k, v in value) @attr.s(frozen=True, slots=True) class Revision(HashableObjectWithManifest, BaseModel): object_type: Final = "revision" message = attr.ib(type=Optional[bytes], validator=type_validator()) author = attr.ib(type=Person, validator=type_validator()) committer = attr.ib(type=Person, validator=type_validator()) date = attr.ib(type=Optional[TimestampWithTimezone], validator=type_validator()) committer_date = attr.ib( type=Optional[TimestampWithTimezone], validator=type_validator() ) type = attr.ib(type=RevisionType, validator=type_validator()) directory = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr) synthetic = attr.ib(type=bool, validator=type_validator()) metadata = attr.ib( type=Optional[ImmutableDict[str, object]], validator=type_validator(), converter=freeze_optional_dict, default=None, ) parents = attr.ib(type=Tuple[Sha1Git, ...], validator=type_validator(), default=()) id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) extra_headers = attr.ib( type=Tuple[Tuple[bytes, bytes], ...], validator=type_validator(), converter=tuplify_extra_headers, default=(), ) raw_manifest = attr.ib(type=Optional[bytes], default=None) def __attrs_post_init__(self): super().__attrs_post_init__() # ensure metadata is a deep copy of whatever was given, and if needed # extract extra_headers from there if self.metadata: metadata = self.metadata if not self.extra_headers and "extra_headers" in metadata: (extra_headers, metadata) = metadata.copy_pop("extra_headers") object.__setattr__( self, "extra_headers", tuplify_extra_headers(extra_headers), ) attr.validate(self) object.__setattr__(self, "metadata", metadata) def _compute_hash_from_attributes(self) -> bytes: return _compute_hash_from_manifest(git_objects.revision_git_object(self)) @classmethod def from_dict(cls, d): d = d.copy() date = d.pop("date") if date: date = TimestampWithTimezone.from_dict(date) committer_date = d.pop("committer_date") if committer_date: committer_date = TimestampWithTimezone.from_dict(committer_date) return cls( author=Person.from_dict(d.pop("author")), committer=Person.from_dict(d.pop("committer")), date=date, committer_date=committer_date, type=RevisionType(d.pop("type")), parents=tuple(d.pop("parents")), # for BW compat **d, ) def swhid(self) -> CoreSWHID: """Returns a SWHID representing this object.""" return CoreSWHID(object_type=SwhidObjectType.REVISION, object_id=self.id) def anonymize(self) -> "Revision": """Returns an anonymized version of the Revision object. Anonymization consists in replacing the author and committer with an anonymized Person object. """ return attr.evolve( self, author=self.author.anonymize(), committer=self.committer.anonymize() ) @attr.s(frozen=True, slots=True) class DirectoryEntry(BaseModel): object_type: Final = "directory_entry" name = attr.ib(type=bytes, validator=type_validator()) type = attr.ib(type=str, validator=attr.validators.in_(["file", "dir", "rev"])) target = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr) perms = attr.ib(type=int, validator=type_validator(), converter=int, repr=oct) """Usually one of the values of `swh.model.from_disk.DentryPerms`.""" @name.validator def check_name(self, attribute, value): if b"/" in value: raise ValueError(f"{value!r} is not a valid directory entry name.") @attr.s(frozen=True, slots=True) class Directory(HashableObjectWithManifest, BaseModel): object_type: Final = "directory" entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=type_validator()) id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) raw_manifest = attr.ib(type=Optional[bytes], default=None) def _compute_hash_from_attributes(self) -> bytes: return _compute_hash_from_manifest(git_objects.directory_git_object(self)) @entries.validator def check_entries(self, attribute, value): seen = set() for entry in value: if entry.name in seen: raise ValueError( "{self.swhid()} has duplicated entry name: {entry.name!r}" ) seen.add(entry.name) @classmethod def from_dict(cls, d): d = d.copy() return cls( entries=tuple( DirectoryEntry.from_dict(entry) for entry in d.pop("entries") ), **d, ) def swhid(self) -> CoreSWHID: """Returns a SWHID representing this object.""" return CoreSWHID(object_type=SwhidObjectType.DIRECTORY, object_id=self.id) @attr.s(frozen=True, slots=True) class BaseContent(BaseModel): status = attr.ib( type=str, validator=attr.validators.in_(["visible", "hidden", "absent"]) ) @staticmethod def _hash_data(data: bytes): """Hash some data, returning most of the fields of a content object""" d = MultiHash.from_data(data).digest() d["data"] = data d["length"] = len(data) return d @classmethod def from_dict(cls, d, use_subclass=True): if use_subclass: # Chooses a subclass to instantiate instead. if d["status"] == "absent": return SkippedContent.from_dict(d) else: return Content.from_dict(d) else: return super().from_dict(d) def get_hash(self, hash_name): if hash_name not in DEFAULT_ALGORITHMS: raise ValueError("{} is not a valid hash name.".format(hash_name)) return getattr(self, hash_name) def hashes(self) -> Dict[str, bytes]: """Returns a dictionary {hash_name: hash_value}""" return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS} @attr.s(frozen=True, slots=True) class Content(BaseContent): object_type: Final = "content" sha1 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) sha1_git = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr) sha256 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) blake2s256 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) length = attr.ib(type=int, validator=type_validator()) status = attr.ib( type=str, validator=attr.validators.in_(["visible", "hidden"]), default="visible", ) data = attr.ib(type=Optional[bytes], validator=type_validator(), default=None) ctime = attr.ib( type=Optional[datetime.datetime], validator=type_validator(), default=None, eq=False, ) @length.validator def check_length(self, attribute, value): """Checks the length is positive.""" if value < 0: raise ValueError("Length must be positive.") @ctime.validator def check_ctime(self, attribute, value): """Checks the ctime has a timezone.""" if value is not None and value.tzinfo is None: raise ValueError("ctime must be a timezone-aware datetime.") def to_dict(self): content = super().to_dict() if content["data"] is None: del content["data"] if content["ctime"] is None: del content["ctime"] return content @classmethod def from_data(cls, data, status="visible", ctime=None) -> "Content": """Generate a Content from a given `data` byte string. This populates the Content with the hashes and length for the data passed as argument, as well as the data itself. """ d = cls._hash_data(data) d["status"] = status d["ctime"] = ctime return cls(**d) @classmethod def from_dict(cls, d): if isinstance(d.get("ctime"), str): d = d.copy() d["ctime"] = dateutil.parser.parse(d["ctime"]) return super().from_dict(d, use_subclass=False) def with_data(self) -> "Content": """Loads the `data` attribute; meaning that it is guaranteed not to be None after this call. This call is almost a no-op, but subclasses may overload this method to lazy-load data (eg. from disk or objstorage).""" if self.data is None: raise MissingData("Content data is None.") return self def unique_key(self) -> KeyType: return self.sha1 # TODO: use a dict of hashes def swhid(self) -> CoreSWHID: """Returns a SWHID representing this object.""" return CoreSWHID(object_type=SwhidObjectType.CONTENT, object_id=self.sha1_git) @attr.s(frozen=True, slots=True) class SkippedContent(BaseContent): object_type: Final = "skipped_content" sha1 = attr.ib(type=Optional[bytes], validator=type_validator(), repr=hash_repr) sha1_git = attr.ib( type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr ) sha256 = attr.ib(type=Optional[bytes], validator=type_validator(), repr=hash_repr) blake2s256 = attr.ib( type=Optional[bytes], validator=type_validator(), repr=hash_repr ) length = attr.ib(type=Optional[int], validator=type_validator()) status = attr.ib(type=str, validator=attr.validators.in_(["absent"])) reason = attr.ib(type=Optional[str], validator=type_validator(), default=None) origin = attr.ib(type=Optional[str], validator=type_validator(), default=None) ctime = attr.ib( type=Optional[datetime.datetime], validator=type_validator(), default=None, eq=False, ) @reason.validator def check_reason(self, attribute, value): """Checks the reason is full if status != absent.""" assert self.reason == value if value is None: raise ValueError("Must provide a reason if content is absent.") @length.validator def check_length(self, attribute, value): """Checks the length is positive or -1.""" if value < -1: raise ValueError("Length must be positive or -1.") @ctime.validator def check_ctime(self, attribute, value): """Checks the ctime has a timezone.""" if value is not None and value.tzinfo is None: raise ValueError("ctime must be a timezone-aware datetime.") def to_dict(self): content = super().to_dict() if content["origin"] is None: del content["origin"] if content["ctime"] is None: del content["ctime"] return content @classmethod def from_data( cls, data: bytes, reason: str, ctime: Optional[datetime.datetime] = None ) -> "SkippedContent": """Generate a SkippedContent from a given `data` byte string. This populates the SkippedContent with the hashes and length for the data passed as argument. You can use `attr.evolve` on such a generated content to nullify some of its attributes, e.g. for tests. """ d = cls._hash_data(data) del d["data"] d["status"] = "absent" d["reason"] = reason d["ctime"] = ctime return cls(**d) @classmethod def from_dict(cls, d): d2 = d.copy() if d2.pop("data", None) is not None: raise ValueError('SkippedContent has no "data" attribute %r' % d) return super().from_dict(d2, use_subclass=False) def unique_key(self) -> KeyType: return self.hashes() class MetadataAuthorityType(Enum): DEPOSIT_CLIENT = "deposit_client" FORGE = "forge" REGISTRY = "registry" def __repr__(self): return f"MetadataAuthorityType.{self.name}" @attr.s(frozen=True, slots=True) class MetadataAuthority(BaseModel): """Represents an entity that provides metadata about an origin or software artifact.""" object_type: Final = "metadata_authority" type = attr.ib(type=MetadataAuthorityType, validator=type_validator()) url = attr.ib(type=str, validator=type_validator()) metadata = attr.ib( type=Optional[ImmutableDict[str, Any]], default=None, validator=type_validator(), converter=freeze_optional_dict, ) def to_dict(self): d = super().to_dict() if d["metadata"] is None: del d["metadata"] return d @classmethod def from_dict(cls, d): d = { **d, "type": MetadataAuthorityType(d["type"]), } return super().from_dict(d) def unique_key(self) -> KeyType: return {"type": self.type.value, "url": self.url} @attr.s(frozen=True, slots=True) class MetadataFetcher(BaseModel): """Represents a software component used to fetch metadata from a metadata authority, and ingest them into the Software Heritage archive.""" object_type: Final = "metadata_fetcher" name = attr.ib(type=str, validator=type_validator()) version = attr.ib(type=str, validator=type_validator()) metadata = attr.ib( type=Optional[ImmutableDict[str, Any]], default=None, validator=type_validator(), converter=freeze_optional_dict, ) def to_dict(self): d = super().to_dict() if d["metadata"] is None: del d["metadata"] return d def unique_key(self) -> KeyType: return {"name": self.name, "version": self.version} def normalize_discovery_date(value: Any) -> datetime.datetime: if not isinstance(value, datetime.datetime): raise TypeError("discovery_date must be a timezone-aware datetime.") if value.tzinfo is None: raise ValueError("discovery_date must be a timezone-aware datetime.") # Normalize timezone to utc, and truncate microseconds to 0 return value.astimezone(datetime.timezone.utc).replace(microsecond=0) @attr.s(frozen=True, slots=True) class RawExtrinsicMetadata(HashableObject, BaseModel): object_type: Final = "raw_extrinsic_metadata" # target object target = attr.ib(type=ExtendedSWHID, validator=type_validator()) # source discovery_date = attr.ib(type=datetime.datetime, converter=normalize_discovery_date) authority = attr.ib(type=MetadataAuthority, validator=type_validator()) fetcher = attr.ib(type=MetadataFetcher, validator=type_validator()) # the metadata itself format = attr.ib(type=str, validator=type_validator()) metadata = attr.ib(type=bytes, validator=type_validator()) # context origin = attr.ib(type=Optional[str], default=None, validator=type_validator()) visit = attr.ib(type=Optional[int], default=None, validator=type_validator()) snapshot = attr.ib( type=Optional[CoreSWHID], default=None, validator=type_validator() ) release = attr.ib( type=Optional[CoreSWHID], default=None, validator=type_validator() ) revision = attr.ib( type=Optional[CoreSWHID], default=None, validator=type_validator() ) path = attr.ib(type=Optional[bytes], default=None, validator=type_validator()) directory = attr.ib( type=Optional[CoreSWHID], default=None, validator=type_validator() ) id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) def _compute_hash_from_attributes(self) -> bytes: return _compute_hash_from_manifest( git_objects.raw_extrinsic_metadata_git_object(self) ) @origin.validator def check_origin(self, attribute, value): if value is None: return if self.target.object_type not in ( SwhidExtendedObjectType.SNAPSHOT, SwhidExtendedObjectType.RELEASE, SwhidExtendedObjectType.REVISION, SwhidExtendedObjectType.DIRECTORY, SwhidExtendedObjectType.CONTENT, ): raise ValueError( f"Unexpected 'origin' context for " f"{self.target.object_type.name.lower()} object: {value}" ) if value.startswith("swh:"): # Technically this is valid; but: # 1. SWHIDs are URIs, not URLs # 2. if a SWHID gets here, it's very likely to be a mistake # (and we can remove this check if it turns out there is a # legitimate use for it). raise ValueError(f"SWHID used as context origin URL: {value}") @visit.validator def check_visit(self, attribute, value): if value is None: return if self.target.object_type not in ( SwhidExtendedObjectType.SNAPSHOT, SwhidExtendedObjectType.RELEASE, SwhidExtendedObjectType.REVISION, SwhidExtendedObjectType.DIRECTORY, SwhidExtendedObjectType.CONTENT, ): raise ValueError( f"Unexpected 'visit' context for " f"{self.target.object_type.name.lower()} object: {value}" ) if self.origin is None: raise ValueError("'origin' context must be set if 'visit' is.") if value <= 0: raise ValueError("Nonpositive visit id") @snapshot.validator def check_snapshot(self, attribute, value): if value is None: return if self.target.object_type not in ( SwhidExtendedObjectType.RELEASE, SwhidExtendedObjectType.REVISION, SwhidExtendedObjectType.DIRECTORY, SwhidExtendedObjectType.CONTENT, ): raise ValueError( f"Unexpected 'snapshot' context for " f"{self.target.object_type.name.lower()} object: {value}" ) self._check_swhid(SwhidObjectType.SNAPSHOT, value) @release.validator def check_release(self, attribute, value): if value is None: return if self.target.object_type not in ( SwhidExtendedObjectType.REVISION, SwhidExtendedObjectType.DIRECTORY, SwhidExtendedObjectType.CONTENT, ): raise ValueError( f"Unexpected 'release' context for " f"{self.target.object_type.name.lower()} object: {value}" ) self._check_swhid(SwhidObjectType.RELEASE, value) @revision.validator def check_revision(self, attribute, value): if value is None: return if self.target.object_type not in ( SwhidExtendedObjectType.DIRECTORY, SwhidExtendedObjectType.CONTENT, ): raise ValueError( f"Unexpected 'revision' context for " f"{self.target.object_type.name.lower()} object: {value}" ) self._check_swhid(SwhidObjectType.REVISION, value) @path.validator def check_path(self, attribute, value): if value is None: return if self.target.object_type not in ( SwhidExtendedObjectType.DIRECTORY, SwhidExtendedObjectType.CONTENT, ): raise ValueError( f"Unexpected 'path' context for " f"{self.target.object_type.name.lower()} object: {value}" ) @directory.validator def check_directory(self, attribute, value): if value is None: return if self.target.object_type not in (SwhidExtendedObjectType.CONTENT,): raise ValueError( f"Unexpected 'directory' context for " f"{self.target.object_type.name.lower()} object: {value}" ) self._check_swhid(SwhidObjectType.DIRECTORY, value) def _check_swhid(self, expected_object_type, swhid): if isinstance(swhid, str): raise ValueError(f"Expected SWHID, got a string: {swhid}") if swhid.object_type != expected_object_type: raise ValueError( f"Expected SWHID type '{expected_object_type.name.lower()}', " f"got '{swhid.object_type.name.lower()}' in {swhid}" ) def to_dict(self): d = super().to_dict() context_keys = ( "origin", "visit", "snapshot", "release", "revision", "directory", "path", ) for context_key in context_keys: if d[context_key] is None: del d[context_key] return d @classmethod def from_dict(cls, d): d = { **d, "target": ExtendedSWHID.from_string(d["target"]), "authority": MetadataAuthority.from_dict(d["authority"]), "fetcher": MetadataFetcher.from_dict(d["fetcher"]), } swhid_keys = ("snapshot", "release", "revision", "directory") for swhid_key in swhid_keys: if d.get(swhid_key): d[swhid_key] = CoreSWHID.from_string(d[swhid_key]) return super().from_dict(d) def swhid(self) -> ExtendedSWHID: """Returns a SWHID representing this RawExtrinsicMetadata object.""" return ExtendedSWHID( object_type=SwhidExtendedObjectType.RAW_EXTRINSIC_METADATA, object_id=self.id, ) @attr.s(frozen=True, slots=True) class ExtID(HashableObject, BaseModel): object_type: Final = "extid" extid_type = attr.ib(type=str, validator=type_validator()) extid = attr.ib(type=bytes, validator=type_validator()) target = attr.ib(type=CoreSWHID, validator=type_validator()) extid_version = attr.ib(type=int, validator=type_validator(), default=0) id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) @classmethod def from_dict(cls, d): return cls( extid=d["extid"], extid_type=d["extid_type"], target=CoreSWHID.from_string(d["target"]), extid_version=d.get("extid_version", 0), ) def _compute_hash_from_attributes(self) -> bytes: return _compute_hash_from_manifest(git_objects.extid_git_object(self))