diff --git a/swh/model/model.py b/swh/model/model.py index 8910db8..b77bfa6 100644 --- a/swh/model/model.py +++ b/swh/model/model.py @@ -1,1638 +1,1644 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ Implementation of Software Heritage's data model See :ref:`data-model` for an overview of the data model. The classes defined in this module are immutable `attrs objects `__ and enums. All classes define a ``from_dict`` class method and a ``to_dict`` method to convert between them and msgpack-serializable objects. """ from abc import ABCMeta, abstractmethod import collections import datetime from enum import Enum import hashlib from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, TypeVar, Union import attr from attrs_strict import AttributeTypeError import dateutil.parser import iso8601 from typing_extensions import Final from . import git_objects from .collections import ImmutableDict from .hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_bytehex, hash_to_hex from .swhids import CoreSWHID from .swhids import ExtendedObjectType as SwhidExtendedObjectType from .swhids import ExtendedSWHID from .swhids import ObjectType as SwhidObjectType class MissingData(Exception): """Raised by `Content.with_data` when it has no way of fetching the data (but not when fetching the data fails).""" pass KeyType = Union[Dict[str, str], Dict[str, bytes], bytes] """The type returned by BaseModel.unique_key().""" SHA1_SIZE = 20 _OFFSET_CHARS = frozenset(b"+-0123456789") # TODO: Limit this to 20 bytes Sha1Git = bytes Sha1 = bytes KT = TypeVar("KT") VT = TypeVar("VT") def hash_repr(h: bytes) -> str: if h is None: return "None" else: return f"hash_to_bytes('{hash_to_hex(h)}')" def freeze_optional_dict( d: Union[None, Dict[KT, VT], ImmutableDict[KT, VT]] # type: ignore ) -> Optional[ImmutableDict[KT, VT]]: if isinstance(d, dict): return ImmutableDict(d) else: return d def dictify(value): "Helper function used by BaseModel.to_dict()" if isinstance(value, BaseModel): return value.to_dict() elif isinstance(value, (CoreSWHID, ExtendedSWHID)): return str(value) elif isinstance(value, Enum): return value.value elif isinstance(value, (dict, ImmutableDict)): return {k: dictify(v) for k, v in value.items()} elif isinstance(value, tuple): return tuple(dictify(v) for v in value) else: return value def _check_type(type_, value): if type_ is object or type_ is Any: return True if type_ is None: return value is None origin = getattr(type_, "__origin__", None) # Non-generic type, check it directly if origin is None: # This is functionally equivalent to using just this: # return isinstance(value, type) # but using type equality before isinstance allows very quick checks # when the exact class is used (which is the overwhelming majority of cases) # while still allowing subclasses to be used. return type(value) == type_ or isinstance(value, type_) # Check the type of the value itself # # For the same reason as above, this condition is functionally equivalent to: # if origin is not Union and not isinstance(value, origin): if origin is not Union and type(value) != origin and not isinstance(value, origin): return False # Then, if it's a container, check its items. 
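    # For example, with the generic types used by this module's attributes,
    # the container branches below behave like this (illustrative calls):
    #
    #   _check_type(Tuple[int, ...], (1, 2, 3))       # True: "infinite" tuple
    #   _check_type(Tuple[str, int], ("a", 1))        # True: finite tuple
    #   _check_type(Union[str, bytes], b"x")          # True: matching variant
    #   _check_type(ImmutableDict[str, int],
    #               ImmutableDict({"a": 1}))          # True: keys and values match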
if origin is tuple: args = type_.__args__ if len(args) == 2 and args[1] is Ellipsis: # Infinite tuple return all(_check_type(args[0], item) for item in value) else: # Finite tuple if len(args) != len(value): return False return all( _check_type(item_type, item) for (item_type, item) in zip(args, value) ) elif origin is Union: args = type_.__args__ return any(_check_type(variant, value) for variant in args) elif origin is ImmutableDict: (key_type, value_type) = type_.__args__ return all( _check_type(key_type, key) and _check_type(value_type, value) for (key, value) in value.items() ) else: # No need to check dict or list. because they are converted to ImmutableDict # and tuple respectively. raise NotImplementedError(f"Type-checking {type_}") def type_validator(): """Like attrs_strict.type_validator(), but stricter. It is an attrs validator, which checks attributes have the specified type, using type equality instead of ``isinstance()``, for improved performance """ def validator(instance, attribute, value): if not _check_type(attribute.type, value): raise AttributeTypeError(value, attribute) return validator ModelType = TypeVar("ModelType", bound="BaseModel") class BaseModel: """Base class for SWH model classes. Provides serialization/deserialization to/from Python dictionaries, that are suitable for JSON/msgpack-like formats.""" __slots__ = () def to_dict(self): """Wrapper of `attr.asdict` that can be overridden by subclasses that have special handling of some of the fields.""" return dictify(attr.asdict(self, recurse=False)) @classmethod def from_dict(cls, d): """Takes a dictionary representing a tree of SWH objects, and recursively builds the corresponding objects.""" return cls(**d) def anonymize(self: ModelType) -> Optional[ModelType]: """Returns an anonymized version of the object, if needed. If the object model does not need/support anonymization, returns None. """ return None def unique_key(self) -> KeyType: """Returns a unique key for this object, that can be used for deduplication.""" raise NotImplementedError(f"unique_key for {self}") def check(self) -> None: """Performs internal consistency checks, and raises an error if one fails.""" # without the type-ignore comment below, attr >= 22.1.0 causes mypy to report: # Argument 1 has incompatible type "BaseModel"; expected "AttrsInstance" attr.validate(self) # type: ignore[arg-type] def _compute_hash_from_manifest(manifest: bytes) -> Sha1Git: return hashlib.new("sha1", manifest).digest() class HashableObject(metaclass=ABCMeta): """Mixin to automatically compute object identifier hash when the associated model is instantiated.""" __slots__ = () id: Sha1Git def compute_hash(self) -> bytes: """Derived model classes must implement this to compute the object hash. This method is called by the object initialization if the `id` attribute is set to an empty value. 
""" return self._compute_hash_from_attributes() @abstractmethod def _compute_hash_from_attributes(self) -> Sha1Git: raise NotImplementedError(f"_compute_hash_from_attributes for {self}") def __attrs_post_init__(self): if not self.id: obj_id = self.compute_hash() object.__setattr__(self, "id", obj_id) def unique_key(self) -> KeyType: return self.id def check(self) -> None: super().check() # type: ignore if self.id != self.compute_hash(): raise ValueError("'id' does not match recomputed hash.") class HashableObjectWithManifest(HashableObject): """Derived class of HashableObject, for objects that may need to store verbatim git objects as ``raw_manifest`` to preserve original hashes.""" __slots__ = () raw_manifest: Optional[bytes] = None """Stores the original content of git objects when they cannot be faithfully represented using only the other attributes. This should only be used as a last resort, and only set in the Git loader, for objects too corrupt to fit the data model.""" def to_dict(self): d = super().to_dict() if d["raw_manifest"] is None: del d["raw_manifest"] return d def compute_hash(self) -> bytes: """Derived model classes must implement this to compute the object hash. This method is called by the object initialization if the `id` attribute is set to an empty value. """ if self.raw_manifest is None: return super().compute_hash() # calls self._compute_hash_from_attributes() else: return _compute_hash_from_manifest(self.raw_manifest) def check(self) -> None: super().check() if ( self.raw_manifest is not None and self.id == self._compute_hash_from_attributes() ): raise ValueError( f"{self} has a non-none raw_manifest attribute, but does not need it." ) @attr.s(frozen=True, slots=True) class Person(BaseModel): """Represents the author/committer of a revision or release.""" object_type: Final = "person" fullname = attr.ib(type=bytes, validator=type_validator()) name = attr.ib(type=Optional[bytes], validator=type_validator(), eq=False) email = attr.ib(type=Optional[bytes], validator=type_validator(), eq=False) @classmethod def from_fullname(cls, fullname: bytes): """Returns a Person object, by guessing the name and email from the fullname, in the `name ` format. The fullname is left unchanged.""" if fullname is None: raise TypeError("fullname is None.") name: Optional[bytes] email: Optional[bytes] try: open_bracket = fullname.index(b"<") except ValueError: name = fullname email = None else: raw_name = fullname[:open_bracket] raw_email = fullname[open_bracket + 1 :] if not raw_name: name = None else: name = raw_name.strip() try: close_bracket = raw_email.rindex(b">") except ValueError: email = raw_email else: email = raw_email[:close_bracket] return Person( name=name or None, email=email or None, fullname=fullname, ) def anonymize(self) -> "Person": """Returns an anonymized version of the Person object. Anonymization is simply a Person which fullname is the hashed, with unset name or email. """ return Person( fullname=hashlib.sha256(self.fullname).digest(), name=None, email=None, ) @classmethod def from_dict(cls, d): """ If the fullname is missing, construct a fullname using the following heuristics: if the name value is None, we return the email in angle brackets, else, we return the name, a space, and the email in angle brackets. 
""" if "fullname" not in d: parts = [] if d["name"] is not None: parts.append(d["name"]) if d["email"] is not None: parts.append(b"".join([b"<", d["email"], b">"])) fullname = b" ".join(parts) d = {**d, "fullname": fullname} d = {"name": None, "email": None, **d} return super().from_dict(d) @attr.s(frozen=True, slots=True) class Timestamp(BaseModel): """Represents a naive timestamp from a VCS.""" object_type: Final = "timestamp" seconds = attr.ib(type=int, validator=type_validator()) microseconds = attr.ib(type=int, validator=type_validator()) @seconds.validator def check_seconds(self, attribute, value): """Check that seconds fit in a 64-bits signed integer.""" if not (-(2**63) <= value < 2**63): raise ValueError("Seconds must be a signed 64-bits integer.") @microseconds.validator def check_microseconds(self, attribute, value): """Checks that microseconds are positive and < 1000000.""" if not (0 <= value < 10**6): raise ValueError("Microseconds must be in [0, 1000000[.") @attr.s(frozen=True, slots=True) class TimestampWithTimezone(BaseModel): """Represents a TZ-aware timestamp from a VCS.""" object_type: Final = "timestamp_with_timezone" timestamp = attr.ib(type=Timestamp, validator=type_validator()) offset_bytes = attr.ib(type=bytes, validator=type_validator()) """Raw git representation of the timezone, as an offset from UTC. It should follow this format: ``+HHMM`` or ``-HHMM`` (including ``+0000`` and ``-0000``). However, when created from git objects, it must be the exact bytes used in the original objects, so it may differ from this format when they do. """ @classmethod def from_numeric_offset( cls, timestamp: Timestamp, offset: int, negative_utc: bool ) -> "TimestampWithTimezone": """Returns a :class:`TimestampWithTimezone` instance from the old dictionary format (with ``offset`` and ``negative_utc`` instead of ``offset_bytes``). 
""" negative = offset < 0 or negative_utc (hours, minutes) = divmod(abs(offset), 60) offset_bytes = f"{'-' if negative else '+'}{hours:02}{minutes:02}".encode() tstz = TimestampWithTimezone(timestamp=timestamp, offset_bytes=offset_bytes) assert tstz.offset_minutes() == offset, (tstz.offset_minutes(), offset) return tstz @classmethod def from_dict( cls, time_representation: Union[Dict, datetime.datetime, int] ) -> "TimestampWithTimezone": """Builds a TimestampWithTimezone from any of the formats accepted by :func:`swh.model.normalize_timestamp`.""" # TODO: this accept way more types than just dicts; find a better # name if isinstance(time_representation, dict): ts = time_representation["timestamp"] if isinstance(ts, dict): seconds = ts.get("seconds", 0) microseconds = ts.get("microseconds", 0) elif isinstance(ts, int): seconds = ts microseconds = 0 else: raise ValueError( f"TimestampWithTimezone.from_dict received non-integer timestamp " f"member {ts!r}" ) timestamp = Timestamp(seconds=seconds, microseconds=microseconds) if "offset_bytes" in time_representation: return cls( timestamp=timestamp, offset_bytes=time_representation["offset_bytes"], ) else: # old format offset = time_representation["offset"] negative_utc = time_representation.get("negative_utc") or False return cls.from_numeric_offset(timestamp, offset, negative_utc) elif isinstance(time_representation, datetime.datetime): # TODO: warn when using from_dict() on a datetime utcoffset = time_representation.utcoffset() time_representation = time_representation.astimezone(datetime.timezone.utc) microseconds = time_representation.microsecond if microseconds: time_representation = time_representation.replace(microsecond=0) seconds = int(time_representation.timestamp()) if utcoffset is None: raise ValueError( f"TimestampWithTimezone.from_dict received datetime without " f"timezone: {time_representation}" ) # utcoffset is an integer number of minutes seconds_offset = utcoffset.total_seconds() offset = int(seconds_offset) // 60 # TODO: warn if remainder is not zero return cls.from_numeric_offset( Timestamp(seconds=seconds, microseconds=microseconds), offset, False ) elif isinstance(time_representation, int): # TODO: warn when using from_dict() on an int seconds = time_representation timestamp = Timestamp(seconds=time_representation, microseconds=0) return cls(timestamp=timestamp, offset_bytes=b"+0000") else: raise ValueError( f"TimestampWithTimezone.from_dict received non-integer timestamp: " f"{time_representation!r}" ) @classmethod def from_datetime(cls, dt: datetime.datetime) -> "TimestampWithTimezone": return cls.from_dict(dt) def to_datetime(self) -> datetime.datetime: """Convert to a datetime (with a timezone set to the recorded fixed UTC offset) Beware that this conversion can be lossy: ``-0000`` and 'weird' offsets cannot be represented. Also note that it may fail due to type overflow. 
""" timestamp = datetime.datetime.fromtimestamp( self.timestamp.seconds, datetime.timezone(datetime.timedelta(minutes=self.offset_minutes())), ) timestamp = timestamp.replace(microsecond=self.timestamp.microseconds) return timestamp @classmethod def from_iso8601(cls, s): """Builds a TimestampWithTimezone from an ISO8601-formatted string.""" dt = iso8601.parse_date(s) tstz = cls.from_datetime(dt) if dt.tzname() == "-00:00": assert tstz.offset_bytes == b"+0000" tstz = attr.evolve(tstz, offset_bytes=b"-0000") return tstz @staticmethod def _parse_offset_bytes(offset_bytes: bytes) -> int: """Parses an ``offset_bytes`` value (in Git's ``[+-]HHMM`` format), and returns the corresponding numeric values (in number of minutes). Tries to account for some mistakes in the format, to support incorrect Git implementations. >>> TimestampWithTimezone._parse_offset_bytes(b"+0000") 0 >>> TimestampWithTimezone._parse_offset_bytes(b"-0000") 0 >>> TimestampWithTimezone._parse_offset_bytes(b"+0200") 120 >>> TimestampWithTimezone._parse_offset_bytes(b"-0200") -120 >>> TimestampWithTimezone._parse_offset_bytes(b"+200") 120 >>> TimestampWithTimezone._parse_offset_bytes(b"-200") -120 >>> TimestampWithTimezone._parse_offset_bytes(b"+02") 120 >>> TimestampWithTimezone._parse_offset_bytes(b"-02") -120 >>> TimestampWithTimezone._parse_offset_bytes(b"+0010") 10 >>> TimestampWithTimezone._parse_offset_bytes(b"-0010") -10 >>> TimestampWithTimezone._parse_offset_bytes(b"+200000000000000000") 0 >>> TimestampWithTimezone._parse_offset_bytes(b"+0160") # 60 minutes... 0 """ offset_str = offset_bytes.decode() assert offset_str[0] in "+-" sign = int(offset_str[0] + "1") if len(offset_str) <= 3: hours = int(offset_str[1:]) minutes = 0 else: hours = int(offset_str[1:-2]) minutes = int(offset_str[-2:]) offset = sign * (hours * 60 + minutes) if (0 <= minutes <= 59) and (-(2**15) <= offset < 2**15): return offset else: # can't parse it to a reasonable value; give up and pretend it's UTC. return 0 def offset_minutes(self): """Returns the offset, as a number of minutes since UTC. >>> TimestampWithTimezone( ... Timestamp(seconds=1642765364, microseconds=0), offset_bytes=b"+0000" ... ).offset_minutes() 0 >>> TimestampWithTimezone( ... Timestamp(seconds=1642765364, microseconds=0), offset_bytes=b"+0200" ... ).offset_minutes() 120 >>> TimestampWithTimezone( ... Timestamp(seconds=1642765364, microseconds=0), offset_bytes=b"-0200" ... ).offset_minutes() -120 >>> TimestampWithTimezone( ... Timestamp(seconds=1642765364, microseconds=0), offset_bytes=b"+0530" ... 
).offset_minutes() 330 """ return self._parse_offset_bytes(self.offset_bytes) @attr.s(frozen=True, slots=True) class Origin(HashableObject, BaseModel): """Represents a software source: a VCS and an URL.""" object_type: Final = "origin" url = attr.ib(type=str, validator=type_validator()) id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") def unique_key(self) -> KeyType: return {"url": self.url} def _compute_hash_from_attributes(self) -> bytes: return _compute_hash_from_manifest(self.url.encode("utf-8")) def swhid(self) -> ExtendedSWHID: """Returns a SWHID representing this origin.""" return ExtendedSWHID( object_type=SwhidExtendedObjectType.ORIGIN, object_id=self.id, ) @attr.s(frozen=True, slots=True) class OriginVisit(BaseModel): """Represents an origin visit with a given type at a given point in time, by a SWH loader.""" object_type: Final = "origin_visit" origin = attr.ib(type=str, validator=type_validator()) date = attr.ib(type=datetime.datetime, validator=type_validator()) type = attr.ib(type=str, validator=type_validator()) """Should not be set before calling 'origin_visit_add()'.""" visit = attr.ib(type=Optional[int], validator=type_validator(), default=None) @date.validator def check_date(self, attribute, value): """Checks the date has a timezone.""" if value is not None and value.tzinfo is None: raise ValueError("date must be a timezone-aware datetime.") def to_dict(self): """Serializes the date as a string and omits the visit id if it is `None`.""" ov = super().to_dict() if ov["visit"] is None: del ov["visit"] return ov def unique_key(self) -> KeyType: return {"origin": self.origin, "date": str(self.date)} @attr.s(frozen=True, slots=True) class OriginVisitStatus(BaseModel): """Represents a visit update of an origin at a given point in time.""" object_type: Final = "origin_visit_status" origin = attr.ib(type=str, validator=type_validator()) visit = attr.ib(type=int, validator=type_validator()) date = attr.ib(type=datetime.datetime, validator=type_validator()) status = attr.ib( type=str, validator=attr.validators.in_( ["created", "ongoing", "full", "partial", "not_found", "failed"] ), ) snapshot = attr.ib( type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr ) # Type is optional be to able to use it before adding it to the database model type = attr.ib(type=Optional[str], validator=type_validator(), default=None) metadata = attr.ib( type=Optional[ImmutableDict[str, object]], validator=type_validator(), converter=freeze_optional_dict, default=None, ) @date.validator def check_date(self, attribute, value): """Checks the date has a timezone.""" if value is not None and value.tzinfo is None: raise ValueError("date must be a timezone-aware datetime.") def unique_key(self) -> KeyType: return {"origin": self.origin, "visit": str(self.visit), "date": str(self.date)} class TargetType(Enum): """The type of content pointed to by a snapshot branch. Usually a revision or an alias.""" CONTENT = "content" DIRECTORY = "directory" REVISION = "revision" RELEASE = "release" SNAPSHOT = "snapshot" ALIAS = "alias" def __repr__(self): return f"TargetType.{self.name}" class ObjectType(Enum): """The type of content pointed to by a release. 
Usually a revision""" CONTENT = "content" DIRECTORY = "directory" REVISION = "revision" RELEASE = "release" SNAPSHOT = "snapshot" def __repr__(self): return f"ObjectType.{self.name}" @attr.s(frozen=True, slots=True) class SnapshotBranch(BaseModel): """Represents one of the branches of a snapshot.""" object_type: Final = "snapshot_branch" target = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) target_type = attr.ib(type=TargetType, validator=type_validator()) @target.validator def check_target(self, attribute, value): """Checks the target type is not an alias, checks the target is a valid sha1_git.""" if self.target_type != TargetType.ALIAS and self.target is not None: if len(value) != 20: raise ValueError("Wrong length for bytes identifier: %d" % len(value)) @classmethod def from_dict(cls, d): return cls(target=d["target"], target_type=TargetType(d["target_type"])) @attr.s(frozen=True, slots=True) class Snapshot(HashableObject, BaseModel): """Represents the full state of an origin at a given point in time.""" object_type: Final = "snapshot" branches = attr.ib( type=ImmutableDict[bytes, Optional[SnapshotBranch]], validator=type_validator(), converter=freeze_optional_dict, ) id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) def _compute_hash_from_attributes(self) -> bytes: return _compute_hash_from_manifest( git_objects.snapshot_git_object(self, ignore_unresolved=True) ) @classmethod def from_dict(cls, d): d = d.copy() return cls( branches=ImmutableDict( (name, SnapshotBranch.from_dict(branch) if branch else None) for (name, branch) in d.pop("branches").items() ), **d, ) def swhid(self) -> CoreSWHID: """Returns a SWHID representing this object.""" return CoreSWHID(object_type=SwhidObjectType.SNAPSHOT, object_id=self.id) @attr.s(frozen=True, slots=True) class Release(HashableObjectWithManifest, BaseModel): object_type: Final = "release" name = attr.ib(type=bytes, validator=type_validator()) message = attr.ib(type=Optional[bytes], validator=type_validator()) target = attr.ib(type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr) target_type = attr.ib(type=ObjectType, validator=type_validator()) synthetic = attr.ib(type=bool, validator=type_validator()) author = attr.ib(type=Optional[Person], validator=type_validator(), default=None) date = attr.ib( type=Optional[TimestampWithTimezone], validator=type_validator(), default=None ) metadata = attr.ib( type=Optional[ImmutableDict[str, object]], validator=type_validator(), converter=freeze_optional_dict, default=None, ) id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) raw_manifest = attr.ib(type=Optional[bytes], default=None) def _compute_hash_from_attributes(self) -> bytes: return _compute_hash_from_manifest(git_objects.release_git_object(self)) @author.validator def check_author(self, attribute, value): """If the author is `None`, checks the date is `None` too.""" if self.author is None and self.date is not None: raise ValueError("release date must be None if author is None.") def to_dict(self): rel = super().to_dict() if rel["metadata"] is None: del rel["metadata"] return rel @classmethod def from_dict(cls, d): d = d.copy() if d.get("author"): d["author"] = Person.from_dict(d["author"]) if d.get("date"): d["date"] = TimestampWithTimezone.from_dict(d["date"]) return cls(target_type=ObjectType(d.pop("target_type")), **d) def swhid(self) -> CoreSWHID: """Returns a SWHID representing this object.""" return CoreSWHID(object_type=SwhidObjectType.RELEASE, 
object_id=self.id) def anonymize(self) -> "Release": """Returns an anonymized version of the Release object. Anonymization consists in replacing the author with an anonymized Person object. """ author = self.author and self.author.anonymize() return attr.evolve(self, author=author) class RevisionType(Enum): GIT = "git" TAR = "tar" DSC = "dsc" SUBVERSION = "svn" MERCURIAL = "hg" CVS = "cvs" BAZAAR = "bzr" def __repr__(self): return f"RevisionType.{self.name}" def tuplify_extra_headers(value: Iterable): return tuple((k, v) for k, v in value) @attr.s(frozen=True, slots=True) class Revision(HashableObjectWithManifest, BaseModel): object_type: Final = "revision" message = attr.ib(type=Optional[bytes], validator=type_validator()) author = attr.ib(type=Optional[Person], validator=type_validator()) committer = attr.ib(type=Optional[Person], validator=type_validator()) date = attr.ib(type=Optional[TimestampWithTimezone], validator=type_validator()) committer_date = attr.ib( type=Optional[TimestampWithTimezone], validator=type_validator() ) type = attr.ib(type=RevisionType, validator=type_validator()) directory = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr) synthetic = attr.ib(type=bool, validator=type_validator()) metadata = attr.ib( type=Optional[ImmutableDict[str, object]], validator=type_validator(), converter=freeze_optional_dict, default=None, ) parents = attr.ib(type=Tuple[Sha1Git, ...], validator=type_validator(), default=()) id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) extra_headers = attr.ib( type=Tuple[Tuple[bytes, bytes], ...], validator=type_validator(), converter=tuplify_extra_headers, default=(), ) raw_manifest = attr.ib(type=Optional[bytes], default=None) def __attrs_post_init__(self): super().__attrs_post_init__() # ensure metadata is a deep copy of whatever was given, and if needed # extract extra_headers from there if self.metadata: metadata = self.metadata if not self.extra_headers and "extra_headers" in metadata: (extra_headers, metadata) = metadata.copy_pop("extra_headers") object.__setattr__( self, "extra_headers", tuplify_extra_headers(extra_headers), ) attr.validate(self) object.__setattr__(self, "metadata", metadata) def _compute_hash_from_attributes(self) -> bytes: return _compute_hash_from_manifest(git_objects.revision_git_object(self)) @author.validator def check_author(self, attribute, value): """If the author is `None`, checks the date is `None` too.""" if self.author is None and self.date is not None: raise ValueError("revision date must be None if author is None.") @committer.validator def check_committer(self, attribute, value): """If the committer is `None`, checks the committer_date is `None` too.""" if self.committer is None and self.committer_date is not None: raise ValueError( "revision committer_date must be None if committer is None." 
) @classmethod def from_dict(cls, d): d = d.copy() date = d.pop("date") if date: date = TimestampWithTimezone.from_dict(date) committer_date = d.pop("committer_date") if committer_date: committer_date = TimestampWithTimezone.from_dict(committer_date) author = d.pop("author") if author: author = Person.from_dict(author) committer = d.pop("committer") if committer: committer = Person.from_dict(committer) return cls( author=author, committer=committer, date=date, committer_date=committer_date, type=RevisionType(d.pop("type")), parents=tuple(d.pop("parents")), # for BW compat **d, ) def swhid(self) -> CoreSWHID: """Returns a SWHID representing this object.""" return CoreSWHID(object_type=SwhidObjectType.REVISION, object_id=self.id) def anonymize(self) -> "Revision": """Returns an anonymized version of the Revision object. Anonymization consists in replacing the author and committer with an anonymized Person object. """ return attr.evolve( self, author=None if self.author is None else self.author.anonymize(), committer=None if self.committer is None else self.committer.anonymize(), ) _DIR_ENTRY_TYPES = ["file", "dir", "rev"] @attr.s(frozen=True, slots=True) class DirectoryEntry(BaseModel): object_type: Final = "directory_entry" name = attr.ib(type=bytes, validator=type_validator()) type = attr.ib(type=str, validator=attr.validators.in_(_DIR_ENTRY_TYPES)) target = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr) perms = attr.ib(type=int, validator=type_validator(), converter=int, repr=oct) """Usually one of the values of `swh.model.from_disk.DentryPerms`.""" @name.validator def check_name(self, attribute, value): if b"/" in value: raise ValueError(f"{value!r} is not a valid directory entry name.") @attr.s(frozen=True, slots=True) class Directory(HashableObjectWithManifest, BaseModel): object_type: Final = "directory" entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=type_validator()) id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) raw_manifest = attr.ib(type=Optional[bytes], default=None) def _compute_hash_from_attributes(self) -> bytes: return _compute_hash_from_manifest(git_objects.directory_git_object(self)) @entries.validator def check_entries(self, attribute, value): seen = set() for entry in value: if entry.name in seen: # Cannot use self.swhid() here, self.id may be None raise ValueError( f"swh:1:dir:{hash_to_hex(self.id)} has duplicated entry name: " f"{entry.name!r}" ) seen.add(entry.name) @classmethod def from_dict(cls, d): d = d.copy() return cls( entries=tuple( DirectoryEntry.from_dict(entry) for entry in d.pop("entries") ), **d, ) def swhid(self) -> CoreSWHID: """Returns a SWHID representing this object.""" return CoreSWHID(object_type=SwhidObjectType.DIRECTORY, object_id=self.id) @classmethod def from_possibly_duplicated_entries( cls, *, entries: Tuple[DirectoryEntry, ...], id: Sha1Git = b"", raw_manifest: Optional[bytes] = None, ) -> Tuple[bool, "Directory"]: """Constructs a ``Directory`` object from a list of entries that may contain duplicated names. This is required to represent legacy objects, that were ingested in the storage database before this check was added. As it is impossible for a ``Directory`` instances to have more than one entry with a given names, this function computes a ``raw_manifest`` and renames one of the entries before constructing the ``Directory``. 
Returns: ``(is_corrupt, directory)`` where ``is_corrupt`` is True iff some entry names were indeed duplicated """ # First, try building a Directory object normally without any extra computation, # which works the overwhelming majority of the time: try: return (False, Directory(entries=entries, id=id, raw_manifest=raw_manifest)) except ValueError: pass # If it fails: # 1. compute a raw_manifest if there isn't already one: if raw_manifest is None: # invalid_directory behaves like a Directory object, but without the # duplicated entry check; which allows computing its raw_manifest invalid_directory = type("", (), {})() invalid_directory.entries = entries raw_manifest = git_objects.directory_git_object(invalid_directory) # 2. look for duplicated entries: entries_by_name: Dict[ bytes, Dict[str, List[DirectoryEntry]] ] = collections.defaultdict(lambda: collections.defaultdict(list)) for entry in entries: entries_by_name[entry.name][entry.type].append(entry) # 3. strip duplicates deduplicated_entries = [] for entry_lists in entries_by_name.values(): # We could pick one entry at random to keep the original name; but we try to # "minimize" the impact, by preserving entries of type "rev" first # (because renaming them would likely break git submodules entirely # when this directory is written to disk), # then entries of type "dir" (because renaming them affects the path # of every file in the dir, instead of just one "cnt"). dir_entry_types = ("rev", "dir", "file") assert set(dir_entry_types) == set(_DIR_ENTRY_TYPES) picked_winner = False # when True, all future entries must be renamed for type_ in dir_entry_types: for entry in entry_lists[type_]: if not picked_winner: # this is the "most important" entry according to this # heuristic; it gets to keep its name. deduplicated_entries.append(entry) picked_winner = True else: # the heuristic already found an entry more important than # this one; so this one must be renamed to something. # we pick the beginning of its hash, it should be good enough # to avoid any conflict. new_name = ( entry.name + b"_" + hash_to_bytehex(entry.target)[0:10] ) renamed_entry = attr.evolve(entry, name=new_name) deduplicated_entries.append(renamed_entry) # Finally, return the "fixed" the directory dir_ = Directory( entries=tuple(deduplicated_entries), id=id, raw_manifest=raw_manifest ) return (True, dir_) @attr.s(frozen=True, slots=True) class BaseContent(BaseModel): status = attr.ib( type=str, validator=attr.validators.in_(["visible", "hidden", "absent"]) ) @staticmethod def _hash_data(data: bytes): """Hash some data, returning most of the fields of a content object""" d = MultiHash.from_data(data).digest() d["data"] = data d["length"] = len(data) return d @classmethod def from_dict(cls, d, use_subclass=True): if use_subclass: # Chooses a subclass to instantiate instead. 
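            # For example, {"status": "absent", ...} is dispatched to
            # SkippedContent.from_dict(), while {"status": "visible", ...} and
            # {"status": "hidden", ...} go to Content.from_dict().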
if d["status"] == "absent": return SkippedContent.from_dict(d) else: return Content.from_dict(d) else: return super().from_dict(d) def get_hash(self, hash_name): if hash_name not in DEFAULT_ALGORITHMS: raise ValueError("{} is not a valid hash name.".format(hash_name)) return getattr(self, hash_name) def hashes(self) -> Dict[str, bytes]: """Returns a dictionary {hash_name: hash_value}""" return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS} @attr.s(frozen=True, slots=True) class Content(BaseContent): object_type: Final = "content" sha1 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) sha1_git = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr) sha256 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) blake2s256 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) length = attr.ib(type=int, validator=type_validator()) status = attr.ib( type=str, validator=attr.validators.in_(["visible", "hidden"]), default="visible", ) data = attr.ib(type=Optional[bytes], validator=type_validator(), default=None) ctime = attr.ib( type=Optional[datetime.datetime], validator=type_validator(), default=None, eq=False, ) @length.validator def check_length(self, attribute, value): """Checks the length is positive.""" if value < 0: raise ValueError("Length must be positive.") @ctime.validator def check_ctime(self, attribute, value): """Checks the ctime has a timezone.""" if value is not None and value.tzinfo is None: raise ValueError("ctime must be a timezone-aware datetime.") def to_dict(self): content = super().to_dict() if content["data"] is None: del content["data"] if content["ctime"] is None: del content["ctime"] return content @classmethod def from_data(cls, data, status="visible", ctime=None) -> "Content": """Generate a Content from a given `data` byte string. This populates the Content with the hashes and length for the data passed as argument, as well as the data itself. """ d = cls._hash_data(data) d["status"] = status d["ctime"] = ctime return cls(**d) @classmethod def from_dict(cls, d): if isinstance(d.get("ctime"), str): d = d.copy() d["ctime"] = dateutil.parser.parse(d["ctime"]) return super().from_dict(d, use_subclass=False) def with_data(self) -> "Content": """Loads the `data` attribute; meaning that it is guaranteed not to be None after this call. This call is almost a no-op, but subclasses may overload this method to lazy-load data (eg. 
from disk or objstorage).""" if self.data is None: raise MissingData("Content data is None.") return self def unique_key(self) -> KeyType: return self.sha1 # TODO: use a dict of hashes def swhid(self) -> CoreSWHID: """Returns a SWHID representing this object.""" return CoreSWHID(object_type=SwhidObjectType.CONTENT, object_id=self.sha1_git) @attr.s(frozen=True, slots=True) class SkippedContent(BaseContent): object_type: Final = "skipped_content" sha1 = attr.ib(type=Optional[bytes], validator=type_validator(), repr=hash_repr) sha1_git = attr.ib( type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr ) sha256 = attr.ib(type=Optional[bytes], validator=type_validator(), repr=hash_repr) blake2s256 = attr.ib( type=Optional[bytes], validator=type_validator(), repr=hash_repr ) length = attr.ib(type=Optional[int], validator=type_validator()) status = attr.ib(type=str, validator=attr.validators.in_(["absent"])) reason = attr.ib(type=Optional[str], validator=type_validator(), default=None) origin = attr.ib(type=Optional[str], validator=type_validator(), default=None) ctime = attr.ib( type=Optional[datetime.datetime], validator=type_validator(), default=None, eq=False, ) @reason.validator def check_reason(self, attribute, value): """Checks the reason is full if status != absent.""" assert self.reason == value if value is None: raise ValueError("Must provide a reason if content is absent.") @length.validator def check_length(self, attribute, value): """Checks the length is positive or -1.""" if value < -1: raise ValueError("Length must be positive or -1.") @ctime.validator def check_ctime(self, attribute, value): """Checks the ctime has a timezone.""" if value is not None and value.tzinfo is None: raise ValueError("ctime must be a timezone-aware datetime.") def to_dict(self): content = super().to_dict() if content["origin"] is None: del content["origin"] if content["ctime"] is None: del content["ctime"] return content @classmethod def from_data( cls, data: bytes, reason: str, ctime: Optional[datetime.datetime] = None ) -> "SkippedContent": """Generate a SkippedContent from a given `data` byte string. This populates the SkippedContent with the hashes and length for the data passed as argument. You can use `attr.evolve` on such a generated content to nullify some of its attributes, e.g. for tests. 
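
        For example:

        >>> skipped = SkippedContent.from_data(b"foo", reason="license issue")
        >>> (skipped.status, skipped.length)
        ('absent', 3)
        >>> attr.evolve(skipped, sha256=None).sha256 is None
        True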
""" d = cls._hash_data(data) del d["data"] d["status"] = "absent" d["reason"] = reason d["ctime"] = ctime return cls(**d) @classmethod def from_dict(cls, d): d2 = d.copy() if d2.pop("data", None) is not None: raise ValueError('SkippedContent has no "data" attribute %r' % d) return super().from_dict(d2, use_subclass=False) def unique_key(self) -> KeyType: return self.hashes() class MetadataAuthorityType(Enum): DEPOSIT_CLIENT = "deposit_client" FORGE = "forge" REGISTRY = "registry" def __repr__(self): return f"MetadataAuthorityType.{self.name}" @attr.s(frozen=True, slots=True) class MetadataAuthority(BaseModel): """Represents an entity that provides metadata about an origin or software artifact.""" object_type: Final = "metadata_authority" type = attr.ib(type=MetadataAuthorityType, validator=type_validator()) url = attr.ib(type=str, validator=type_validator()) metadata = attr.ib( type=Optional[ImmutableDict[str, Any]], default=None, validator=type_validator(), converter=freeze_optional_dict, ) def to_dict(self): d = super().to_dict() if d["metadata"] is None: del d["metadata"] return d @classmethod def from_dict(cls, d): d = { **d, "type": MetadataAuthorityType(d["type"]), } return super().from_dict(d) def unique_key(self) -> KeyType: return {"type": self.type.value, "url": self.url} @attr.s(frozen=True, slots=True) class MetadataFetcher(BaseModel): """Represents a software component used to fetch metadata from a metadata authority, and ingest them into the Software Heritage archive.""" object_type: Final = "metadata_fetcher" name = attr.ib(type=str, validator=type_validator()) version = attr.ib(type=str, validator=type_validator()) metadata = attr.ib( type=Optional[ImmutableDict[str, Any]], default=None, validator=type_validator(), converter=freeze_optional_dict, ) def to_dict(self): d = super().to_dict() if d["metadata"] is None: del d["metadata"] return d def unique_key(self) -> KeyType: return {"name": self.name, "version": self.version} def normalize_discovery_date(value: Any) -> datetime.datetime: if not isinstance(value, datetime.datetime): raise TypeError("discovery_date must be a timezone-aware datetime.") if value.tzinfo is None: raise ValueError("discovery_date must be a timezone-aware datetime.") # Normalize timezone to utc, and truncate microseconds to 0 return value.astimezone(datetime.timezone.utc).replace(microsecond=0) @attr.s(frozen=True, slots=True) class RawExtrinsicMetadata(HashableObject, BaseModel): object_type: Final = "raw_extrinsic_metadata" # target object target = attr.ib(type=ExtendedSWHID, validator=type_validator()) # source discovery_date = attr.ib(type=datetime.datetime, converter=normalize_discovery_date) authority = attr.ib(type=MetadataAuthority, validator=type_validator()) fetcher = attr.ib(type=MetadataFetcher, validator=type_validator()) # the metadata itself format = attr.ib(type=str, validator=type_validator()) metadata = attr.ib(type=bytes, validator=type_validator()) # context origin = attr.ib(type=Optional[str], default=None, validator=type_validator()) visit = attr.ib(type=Optional[int], default=None, validator=type_validator()) snapshot = attr.ib( type=Optional[CoreSWHID], default=None, validator=type_validator() ) release = attr.ib( type=Optional[CoreSWHID], default=None, validator=type_validator() ) revision = attr.ib( type=Optional[CoreSWHID], default=None, validator=type_validator() ) path = attr.ib(type=Optional[bytes], default=None, validator=type_validator()) directory = attr.ib( type=Optional[CoreSWHID], default=None, 
validator=type_validator() ) id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) def _compute_hash_from_attributes(self) -> bytes: return _compute_hash_from_manifest( git_objects.raw_extrinsic_metadata_git_object(self) ) @origin.validator def check_origin(self, attribute, value): if value is None: return if self.target.object_type not in ( SwhidExtendedObjectType.SNAPSHOT, SwhidExtendedObjectType.RELEASE, SwhidExtendedObjectType.REVISION, SwhidExtendedObjectType.DIRECTORY, SwhidExtendedObjectType.CONTENT, ): raise ValueError( f"Unexpected 'origin' context for " f"{self.target.object_type.name.lower()} object: {value}" ) if value.startswith("swh:"): # Technically this is valid; but: # 1. SWHIDs are URIs, not URLs # 2. if a SWHID gets here, it's very likely to be a mistake # (and we can remove this check if it turns out there is a # legitimate use for it). raise ValueError(f"SWHID used as context origin URL: {value}") @visit.validator def check_visit(self, attribute, value): if value is None: return if self.target.object_type not in ( SwhidExtendedObjectType.SNAPSHOT, SwhidExtendedObjectType.RELEASE, SwhidExtendedObjectType.REVISION, SwhidExtendedObjectType.DIRECTORY, SwhidExtendedObjectType.CONTENT, ): raise ValueError( f"Unexpected 'visit' context for " f"{self.target.object_type.name.lower()} object: {value}" ) if self.origin is None: raise ValueError("'origin' context must be set if 'visit' is.") if value <= 0: raise ValueError("Nonpositive visit id") @snapshot.validator def check_snapshot(self, attribute, value): if value is None: return if self.target.object_type not in ( SwhidExtendedObjectType.RELEASE, SwhidExtendedObjectType.REVISION, SwhidExtendedObjectType.DIRECTORY, SwhidExtendedObjectType.CONTENT, ): raise ValueError( f"Unexpected 'snapshot' context for " f"{self.target.object_type.name.lower()} object: {value}" ) self._check_swhid(SwhidObjectType.SNAPSHOT, value) @release.validator def check_release(self, attribute, value): if value is None: return if self.target.object_type not in ( SwhidExtendedObjectType.REVISION, SwhidExtendedObjectType.DIRECTORY, SwhidExtendedObjectType.CONTENT, ): raise ValueError( f"Unexpected 'release' context for " f"{self.target.object_type.name.lower()} object: {value}" ) self._check_swhid(SwhidObjectType.RELEASE, value) @revision.validator def check_revision(self, attribute, value): if value is None: return if self.target.object_type not in ( SwhidExtendedObjectType.DIRECTORY, SwhidExtendedObjectType.CONTENT, ): raise ValueError( f"Unexpected 'revision' context for " f"{self.target.object_type.name.lower()} object: {value}" ) self._check_swhid(SwhidObjectType.REVISION, value) @path.validator def check_path(self, attribute, value): if value is None: return if self.target.object_type not in ( SwhidExtendedObjectType.DIRECTORY, SwhidExtendedObjectType.CONTENT, ): raise ValueError( f"Unexpected 'path' context for " f"{self.target.object_type.name.lower()} object: {value}" ) @directory.validator def check_directory(self, attribute, value): if value is None: return if self.target.object_type not in (SwhidExtendedObjectType.CONTENT,): raise ValueError( f"Unexpected 'directory' context for " f"{self.target.object_type.name.lower()} object: {value}" ) self._check_swhid(SwhidObjectType.DIRECTORY, value) def _check_swhid(self, expected_object_type, swhid): if isinstance(swhid, str): raise ValueError(f"Expected SWHID, got a string: {swhid}") if swhid.object_type != expected_object_type: raise ValueError( f"Expected SWHID type 
'{expected_object_type.name.lower()}', " f"got '{swhid.object_type.name.lower()}' in {swhid}" ) def to_dict(self): d = super().to_dict() context_keys = ( "origin", "visit", "snapshot", "release", "revision", "directory", "path", ) for context_key in context_keys: if d[context_key] is None: del d[context_key] return d @classmethod def from_dict(cls, d): + if "type" in d: + # Convert from old schema + type_ = d.pop("type") + if type_ == "origin": + d["target"] = str(Origin(d["target"]).swhid()) + d = { **d, "target": ExtendedSWHID.from_string(d["target"]), "authority": MetadataAuthority.from_dict(d["authority"]), "fetcher": MetadataFetcher.from_dict(d["fetcher"]), } swhid_keys = ("snapshot", "release", "revision", "directory") for swhid_key in swhid_keys: if d.get(swhid_key): d[swhid_key] = CoreSWHID.from_string(d[swhid_key]) return super().from_dict(d) def swhid(self) -> ExtendedSWHID: """Returns a SWHID representing this RawExtrinsicMetadata object.""" return ExtendedSWHID( object_type=SwhidExtendedObjectType.RAW_EXTRINSIC_METADATA, object_id=self.id, ) @attr.s(frozen=True, slots=True) class ExtID(HashableObject, BaseModel): object_type: Final = "extid" extid_type = attr.ib(type=str, validator=type_validator()) extid = attr.ib(type=bytes, validator=type_validator()) target = attr.ib(type=CoreSWHID, validator=type_validator()) extid_version = attr.ib(type=int, validator=type_validator(), default=0) id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) @classmethod def from_dict(cls, d): return cls( extid=d["extid"], extid_type=d["extid_type"], target=CoreSWHID.from_string(d["target"]), extid_version=d.get("extid_version", 0), ) def _compute_hash_from_attributes(self) -> bytes: return _compute_hash_from_manifest(git_objects.extid_git_object(self)) # Note: we need the type ignore stanza here because mypy cannot figure that all # subclasses of BaseModel do have an object_type attribute, even if BaseModel # itself does not (because these are Final) SWH_MODEL_OBJECT_TYPES: Dict[str, Type[BaseModel]] = { cls.object_type: cls # type: ignore for cls in ( Person, Timestamp, TimestampWithTimezone, Origin, OriginVisit, OriginVisitStatus, Snapshot, SnapshotBranch, Release, Revision, Directory, DirectoryEntry, Content, SkippedContent, MetadataAuthority, MetadataFetcher, RawExtrinsicMetadata, ExtID, ) } diff --git a/swh/model/tests/test_model.py b/swh/model/tests/test_model.py index 4540c43..1fd832a 100644 --- a/swh/model/tests/test_model.py +++ b/swh/model/tests/test_model.py @@ -1,1786 +1,1821 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import collections import copy import datetime import hashlib from typing import Any, List, Optional, Tuple, Union import attr from attrs_strict import AttributeTypeError import dateutil from hypothesis import given from hypothesis.strategies import binary, none import pytest from swh.model.collections import ImmutableDict from swh.model.from_disk import DentryPerms import swh.model.git_objects from swh.model.hashutil import MultiHash, hash_to_bytes import swh.model.hypothesis_strategies as strategies import swh.model.model from swh.model.model import ( BaseModel, Content, Directory, DirectoryEntry, MetadataAuthority, MetadataAuthorityType, MetadataFetcher, MissingData, Origin, OriginVisit, OriginVisitStatus, Person, 
RawExtrinsicMetadata, Release, Revision, SkippedContent, Snapshot, TargetType, Timestamp, TimestampWithTimezone, type_validator, ) import swh.model.swhids from swh.model.swhids import CoreSWHID, ExtendedSWHID, ObjectType from swh.model.tests.swh_model_data import TEST_OBJECTS from swh.model.tests.test_identifiers import ( TS_DATETIMES, TS_TIMEZONES, directory_example, metadata_example, release_example, revision_example, snapshot_example, ) EXAMPLE_HASH = hash_to_bytes("94a9ed024d3859793618152ea559a168bbcbb5e2") @given(strategies.objects()) def test_todict_inverse_fromdict(objtype_and_obj): (obj_type, obj) = objtype_and_obj if obj_type in ("origin", "origin_visit"): return obj_as_dict = obj.to_dict() obj_as_dict_copy = copy.deepcopy(obj_as_dict) # Check the composition of to_dict and from_dict is the identity assert obj == type(obj).from_dict(obj_as_dict) # Check from_dict() does not change the input dict assert obj_as_dict == obj_as_dict_copy # Check the composition of from_dict and to_dict is the identity assert obj_as_dict == type(obj).from_dict(obj_as_dict).to_dict() @given(strategies.objects()) def test_repr(objtype_and_obj): """Checks every model object has a working repr(), and that it can be eval()uated (so that printed objects can be copy-pasted to write test cases.)""" (obj_type, obj) = objtype_and_obj r = repr(obj) env = { "tzutc": lambda: datetime.timezone.utc, "tzfile": dateutil.tz.tzfile, "hash_to_bytes": hash_to_bytes, **swh.model.swhids.__dict__, **swh.model.model.__dict__, } assert eval(r, env) == obj @attr.s class Cls1: pass @attr.s class Cls2(Cls1): pass _custom_namedtuple = collections.namedtuple("_custom_namedtuple", "a b") class _custom_tuple(tuple): pass # List of (type, valid_values, invalid_values) _TYPE_VALIDATOR_PARAMETERS: List[Tuple[Any, List[Any], List[Any]]] = [ # base types: ( bool, [True, False], [-1, 0, 1, 42, 1000, None, "123", 0.0, (), ("foo",), ImmutableDict()], ), ( int, [-1, 0, 1, 42, 1000, DentryPerms.directory, True, False], [None, "123", 0.0, (), ImmutableDict()], ), ( float, [-1.0, 0.0, 1.0, float("infinity"), float("NaN")], [True, False, None, 1, "1.2", (), ImmutableDict()], ), ( bytes, [b"", b"123"], [None, bytearray(b"\x12\x34"), "123", 0, 123, (), (1, 2, 3), ImmutableDict()], ), (str, ["", "123"], [None, b"123", b"", 0, (), (1, 2, 3), ImmutableDict()]), (None, [None], [b"", b"123", "", "foo", 0, 123, ImmutableDict(), float("NaN")]), # unions: ( Optional[int], [None, -1, 0, 1, 42, 1000, DentryPerms.directory], ["123", 0.0, (), ImmutableDict()], ), ( Optional[bytes], [None, b"", b"123"], ["123", "", 0, (), (1, 2, 3), ImmutableDict()], ), ( Union[str, bytes], ["", "123", b"123", b""], [None, 0, (), (1, 2, 3), ImmutableDict()], ), ( Union[str, bytes, None], ["", "123", b"123", b"", None], [0, (), (1, 2, 3), ImmutableDict()], ), # tuples ( Tuple[str, str], [("foo", "bar"), ("", ""), _custom_namedtuple("", ""), _custom_tuple(("", ""))], [("foo",), ("foo", "bar", "baz"), ("foo", 42), (42, "foo")], ), ( Tuple[str, ...], [ ("foo",), ("foo", "bar"), ("", ""), ("foo", "bar", "baz"), _custom_namedtuple("", ""), _custom_tuple(("", "")), ], [("foo", 42), (42, "foo")], ), # composite generic: ( Tuple[Union[str, int], Union[str, int]], [("foo", "foo"), ("foo", 42), (42, "foo"), (42, 42)], [("foo", b"bar"), (b"bar", "foo")], ), ( Union[Tuple[str, str], Tuple[int, int]], [("foo", "foo"), (42, 42)], [("foo", b"bar"), (b"bar", "foo"), ("foo", 42), (42, "foo")], ), ( Tuple[Tuple[bytes, bytes], ...], [(), ((b"foo", b"bar"),), ((b"foo", b"bar"), (b"baz", b"qux"))], 
[((b"foo", "bar"),), ((b"foo", b"bar"), ("baz", b"qux"))], ), # standard types: ( datetime.datetime, [ datetime.datetime(2021, 12, 15, 12, 59, 27), datetime.datetime(2021, 12, 15, 12, 59, 27, tzinfo=datetime.timezone.utc), ], [None, 123], ), # ImmutableDict ( ImmutableDict[str, int], [ ImmutableDict(), ImmutableDict({"foo": 42}), ImmutableDict({"foo": 42, "bar": 123}), ], [ImmutableDict({"foo": "bar"}), ImmutableDict({42: 123})], ), # Any: ( object, [-1, 0, 1, 42, 1000, None, "123", 0.0, (), ImmutableDict()], [], ), ( Any, [-1, 0, 1, 42, 1000, None, "123", 0.0, (), ImmutableDict()], [], ), ( ImmutableDict[Any, int], [ ImmutableDict(), ImmutableDict({"foo": 42}), ImmutableDict({"foo": 42, "bar": 123}), ImmutableDict({42: 123}), ], [ImmutableDict({"foo": "bar"})], ), ( ImmutableDict[str, Any], [ ImmutableDict(), ImmutableDict({"foo": 42}), ImmutableDict({"foo": "bar"}), ImmutableDict({"foo": 42, "bar": 123}), ], [ImmutableDict({42: 123})], ), # attr objects: ( Timestamp, [ Timestamp(seconds=123, microseconds=0), ], [None, "2021-09-28T11:27:59", 123], ), ( Cls1, [Cls1(), Cls2()], [None, b"abcd"], ), # enums: ( TargetType, [TargetType.CONTENT, TargetType.ALIAS], ["content", "alias", 123, None], ), ] @pytest.mark.parametrize( "type_,value", [ pytest.param(type_, value, id=f"type={type_}, value={value}") for (type_, values, _) in _TYPE_VALIDATOR_PARAMETERS for value in values ], ) def test_type_validator_valid(type_, value): type_validator()(None, attr.ib(type=type_), value) @pytest.mark.parametrize( "type_,value", [ pytest.param(type_, value, id=f"type={type_}, value={value}") for (type_, _, values) in _TYPE_VALIDATOR_PARAMETERS for value in values ], ) def test_type_validator_invalid(type_, value): with pytest.raises(AttributeTypeError): type_validator()(None, attr.ib(type=type_), value) @pytest.mark.parametrize("object_type, objects", TEST_OBJECTS.items()) def test_swh_model_todict_fromdict(object_type, objects): """checks model objects in swh_model_data are in correct shape""" assert objects for obj in objects: # Check the composition of from_dict and to_dict is the identity obj_as_dict = obj.to_dict() assert obj == type(obj).from_dict(obj_as_dict) assert obj_as_dict == type(obj).from_dict(obj_as_dict).to_dict() def test_unique_key(): url = "http://example.org/" date = datetime.datetime.now(tz=datetime.timezone.utc) id_ = b"42" * 10 assert Origin(url=url).unique_key() == {"url": url} assert OriginVisit(origin=url, date=date, type="git").unique_key() == { "origin": url, "date": str(date), } assert OriginVisitStatus( origin=url, visit=42, date=date, status="created", snapshot=None ).unique_key() == { "origin": url, "visit": "42", "date": str(date), } assert Snapshot.from_dict({**snapshot_example, "id": id_}).unique_key() == id_ assert Release.from_dict({**release_example, "id": id_}).unique_key() == id_ assert Revision.from_dict({**revision_example, "id": id_}).unique_key() == id_ assert Directory.from_dict({**directory_example, "id": id_}).unique_key() == id_ assert ( RawExtrinsicMetadata.from_dict({**metadata_example, "id": id_}).unique_key() == id_ ) cont = Content.from_data(b"foo") assert cont.unique_key().hex() == "0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33" kwargs = { **cont.to_dict(), "reason": "foo", "status": "absent", } del kwargs["data"] assert SkippedContent(**kwargs).unique_key() == cont.hashes() # Anonymization @given(strategies.objects()) def test_anonymization(objtype_and_obj): (obj_type, obj) = objtype_and_obj def check_person(p): if p is not None: assert p.name is None assert 
p.email is None assert len(p.fullname) == 32 anon_obj = obj.anonymize() if obj_type == "person": assert anon_obj is not None check_person(anon_obj) elif obj_type == "release": assert anon_obj is not None check_person(anon_obj.author) elif obj_type == "revision": assert anon_obj is not None check_person(anon_obj.author) check_person(anon_obj.committer) else: assert anon_obj is None # Origin, OriginVisit, OriginVisitStatus @given(strategies.origins()) def test_todict_origins(origin): obj = origin.to_dict() assert "type" not in obj assert type(origin)(url=origin.url) == type(origin).from_dict(obj) @given(strategies.origin_visits()) def test_todict_origin_visits(origin_visit): obj = origin_visit.to_dict() assert origin_visit == type(origin_visit).from_dict(obj) def test_origin_visit_naive_datetime(): with pytest.raises(ValueError, match="must be a timezone-aware datetime"): OriginVisit( origin="http://foo/", date=datetime.datetime.now(), type="git", ) @given(strategies.origin_visit_statuses()) def test_todict_origin_visit_statuses(origin_visit_status): obj = origin_visit_status.to_dict() assert origin_visit_status == type(origin_visit_status).from_dict(obj) def test_origin_visit_status_naive_datetime(): with pytest.raises(ValueError, match="must be a timezone-aware datetime"): OriginVisitStatus( origin="http://foo/", visit=42, date=datetime.datetime.now(), status="ongoing", snapshot=None, ) # Timestamp @given(strategies.timestamps()) def test_timestamps_strategy(timestamp): attr.validate(timestamp) def test_timestamp_seconds(): attr.validate(Timestamp(seconds=0, microseconds=0)) with pytest.raises(AttributeTypeError): Timestamp(seconds="0", microseconds=0) attr.validate(Timestamp(seconds=2**63 - 1, microseconds=0)) with pytest.raises(ValueError): Timestamp(seconds=2**63, microseconds=0) attr.validate(Timestamp(seconds=-(2**63), microseconds=0)) with pytest.raises(ValueError): Timestamp(seconds=-(2**63) - 1, microseconds=0) def test_timestamp_microseconds(): attr.validate(Timestamp(seconds=0, microseconds=0)) with pytest.raises(AttributeTypeError): Timestamp(seconds=0, microseconds="0") attr.validate(Timestamp(seconds=0, microseconds=10**6 - 1)) with pytest.raises(ValueError): Timestamp(seconds=0, microseconds=10**6) with pytest.raises(ValueError): Timestamp(seconds=0, microseconds=-1) def test_timestamp_from_dict(): assert Timestamp.from_dict({"seconds": 10, "microseconds": 5}) with pytest.raises(AttributeTypeError): Timestamp.from_dict({"seconds": "10", "microseconds": 5}) with pytest.raises(AttributeTypeError): Timestamp.from_dict({"seconds": 10, "microseconds": "5"}) with pytest.raises(ValueError): Timestamp.from_dict({"seconds": 0, "microseconds": -1}) Timestamp.from_dict({"seconds": 0, "microseconds": 10**6 - 1}) with pytest.raises(ValueError): Timestamp.from_dict({"seconds": 0, "microseconds": 10**6}) # TimestampWithTimezone def test_timestampwithtimezone(): ts = Timestamp(seconds=0, microseconds=0) tstz = TimestampWithTimezone(timestamp=ts, offset_bytes=b"+0000") attr.validate(tstz) assert tstz.offset_minutes() == 0 assert tstz.offset_bytes == b"+0000" tstz = TimestampWithTimezone(timestamp=ts, offset_bytes=b"+0010") attr.validate(tstz) assert tstz.offset_minutes() == 10 assert tstz.offset_bytes == b"+0010" tstz = TimestampWithTimezone(timestamp=ts, offset_bytes=b"-0010") attr.validate(tstz) assert tstz.offset_minutes() == -10 assert tstz.offset_bytes == b"-0010" tstz = TimestampWithTimezone(timestamp=ts, offset_bytes=b"-0000") attr.validate(tstz) assert tstz.offset_minutes() == 0 
    assert tstz.offset_bytes == b"-0000"

    tstz = TimestampWithTimezone(timestamp=ts, offset_bytes=b"-1030")
    attr.validate(tstz)
    assert tstz.offset_minutes() == -630
    assert tstz.offset_bytes == b"-1030"

    tstz = TimestampWithTimezone(timestamp=ts, offset_bytes=b"+1320")
    attr.validate(tstz)
    assert tstz.offset_minutes() == 800
    assert tstz.offset_bytes == b"+1320"

    tstz = TimestampWithTimezone(timestamp=ts, offset_bytes=b"+200")
    attr.validate(tstz)
    assert tstz.offset_minutes() == 120
    assert tstz.offset_bytes == b"+200"

    tstz = TimestampWithTimezone(timestamp=ts, offset_bytes=b"+02")
    attr.validate(tstz)
    assert tstz.offset_minutes() == 120
    assert tstz.offset_bytes == b"+02"

    tstz = TimestampWithTimezone(timestamp=ts, offset_bytes=b"+2000000000")
    attr.validate(tstz)
    assert tstz.offset_minutes() == 0
    assert tstz.offset_bytes == b"+2000000000"

    with pytest.raises(AttributeTypeError):
        TimestampWithTimezone(timestamp=datetime.datetime.now(), offset_bytes=b"+0000")

    with pytest.raises((AttributeTypeError, TypeError)):
        TimestampWithTimezone(timestamp=ts, offset_bytes=0)


def test_timestampwithtimezone_from_datetime():
    # Typical case
    tz = datetime.timezone(datetime.timedelta(minutes=+60))
    date = datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=tz)

    tstz = TimestampWithTimezone.from_datetime(date)

    assert tstz == TimestampWithTimezone(
        timestamp=Timestamp(
            seconds=1582810759,
            microseconds=0,
        ),
        offset_bytes=b"+0100",
    )

    # Typical case (close to epoch)
    tz = datetime.timezone(datetime.timedelta(minutes=+60))
    date = datetime.datetime(1970, 1, 1, 1, 0, 5, tzinfo=tz)

    tstz = TimestampWithTimezone.from_datetime(date)

    assert tstz == TimestampWithTimezone(
        timestamp=Timestamp(
            seconds=5,
            microseconds=0,
        ),
        offset_bytes=b"+0100",
    )

    # non-integer number of seconds before UNIX epoch
    date = datetime.datetime(
        1969, 12, 31, 23, 59, 59, 100000, tzinfo=datetime.timezone.utc
    )

    tstz = TimestampWithTimezone.from_datetime(date)

    assert tstz == TimestampWithTimezone(
        timestamp=Timestamp(
            seconds=-1,
            microseconds=100000,
        ),
        offset_bytes=b"+0000",
    )

    # non-integer number of seconds in both the timestamp and the offset
    tz = datetime.timezone(datetime.timedelta(microseconds=-600000))
    date = datetime.datetime(1969, 12, 31, 23, 59, 59, 600000, tzinfo=tz)

    tstz = TimestampWithTimezone.from_datetime(date)

    assert tstz == TimestampWithTimezone(
        timestamp=Timestamp(
            seconds=0,
            microseconds=200000,
        ),
        offset_bytes=b"+0000",
    )

    # timezone offset with non-integer number of seconds, for dates before epoch:
    # we round down to the previous second, so this denotes the same instant as
    # 1969-12-31T23:59:59.100000Z
    tz = datetime.timezone(datetime.timedelta(microseconds=900000))
    date = datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=tz)

    tstz = TimestampWithTimezone.from_datetime(date)

    assert tstz == TimestampWithTimezone(
        timestamp=Timestamp(
            seconds=-1,
            microseconds=100000,
        ),
        offset_bytes=b"+0000",
    )


def test_timestampwithtimezone_from_naive_datetime():
    date = datetime.datetime(2020, 2, 27, 14, 39, 19)

    with pytest.raises(ValueError, match="datetime without timezone"):
        TimestampWithTimezone.from_datetime(date)


def test_timestampwithtimezone_from_iso8601():
    date = "2020-02-27 14:39:19.123456+0100"

    tstz = TimestampWithTimezone.from_iso8601(date)

    assert tstz == TimestampWithTimezone(
        timestamp=Timestamp(
            seconds=1582810759,
            microseconds=123456,
        ),
        offset_bytes=b"+0100",
    )


def test_timestampwithtimezone_from_iso8601_negative_utc():
    date = "2020-02-27 13:39:19-0000"

    tstz = TimestampWithTimezone.from_iso8601(date)

    assert tstz == TimestampWithTimezone(
        timestamp=Timestamp(
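            # 13:39:19-0000 denotes the same instant as 14:39:19+0100 in the
            # test above, hence the identical seconds value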
            seconds=1582810759,
            microseconds=0,
        ),
        offset_bytes=b"-0000",
    )


@pytest.mark.parametrize("date", TS_DATETIMES)
@pytest.mark.parametrize("tz", TS_TIMEZONES)
@pytest.mark.parametrize("microsecond", [0, 1, 10, 100, 1000, 999999])
def test_timestampwithtimezone_to_datetime(date, tz, microsecond):
    date = date.replace(tzinfo=tz, microsecond=microsecond)
    tstz = TimestampWithTimezone.from_datetime(date)

    assert tstz.to_datetime() == date
    assert tstz.to_datetime().utcoffset() == date.utcoffset()


def test_person_from_fullname():
    """The author should have name, email and fullname filled."""
    actual_person = Person.from_fullname(b"tony <ynot@dagobah>")
    assert actual_person == Person(
        fullname=b"tony <ynot@dagobah>",
        name=b"tony",
        email=b"ynot@dagobah",
    )


def test_person_from_fullname_no_email():
    """The author and fullname should be the same as the input (author)."""
    actual_person = Person.from_fullname(b"tony")
    assert actual_person == Person(
        fullname=b"tony",
        name=b"tony",
        email=None,
    )


def test_person_from_fullname_empty_person():
    """Empty person has only its fullname filled with the empty
    byte-string."""
    actual_person = Person.from_fullname(b"")
    assert actual_person == Person(
        fullname=b"",
        name=None,
        email=None,
    )


def test_git_author_line_to_author():
    # edge case out of the way
    with pytest.raises(TypeError):
        Person.from_fullname(None)

    tests = {
        b"a <b@c.com>": Person(
            name=b"a",
            email=b"b@c.com",
            fullname=b"a <b@c.com>",
        ),
        b"<foo@bar.com>": Person(
            name=None,
            email=b"foo@bar.com",
            fullname=b"<foo@bar.com>",
        ),
        b'malformed <"<br@ckets>"': Person(
            name=b"malformed",
            email=b'"<br@ckets>"',
            fullname=b'malformed <"<br@ckets>"',
        ),
        b"trailing <sp@c.e> ": Person(
            name=b"trailing",
            email=b"sp@c.e",
            fullname=b"trailing <sp@c.e> ",
        ),
        b"no<sp@c.e>": Person(
            name=b"no",
            email=b"sp@c.e",
            fullname=b"no<sp@c.e>",
        ),
        b" more <sp@c.es>": Person(
            name=b"more",
            email=b"sp@c.es",
            fullname=b" more <sp@c.es>",
        ),
        b" <>": Person(
            name=None,
            email=None,
            fullname=b" <>",
        ),
    }

    for person in sorted(tests):
        expected_person = tests[person]
        assert expected_person == Person.from_fullname(person)


def test_person_comparison():
    """Check only the fullname attribute is used to compare Person objects"""
    person = Person(fullname=b"p1", name=None, email=None)
    assert attr.evolve(person, name=b"toto") == person
    assert attr.evolve(person, email=b"toto@example.com") == person

    person = Person(fullname=b"", name=b"toto", email=b"toto@example.com")
    assert attr.evolve(person, fullname=b"dude") != person


# Content


def test_content_get_hash():
    hashes = dict(sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux")
    c = Content(length=42, status="visible", **hashes)
    for (hash_name, hash_) in hashes.items():
        assert c.get_hash(hash_name) == hash_


def test_content_hashes():
    hashes = dict(sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux")
    c = Content(length=42, status="visible", **hashes)
    assert c.hashes() == hashes


def test_content_data():
    c = Content(
        length=42,
        status="visible",
        data=b"foo",
        sha1=b"foo",
        sha1_git=b"bar",
        sha256=b"baz",
        blake2s256=b"qux",
    )
    assert c.with_data() == c


def test_content_data_missing():
    c = Content(
        length=42,
        status="visible",
        sha1=b"foo",
        sha1_git=b"bar",
        sha256=b"baz",
        blake2s256=b"qux",
    )
    with pytest.raises(MissingData):
        c.with_data()


@given(strategies.present_contents_d())
def test_content_from_dict(content_d):
    c = Content.from_data(**content_d)
    assert c
    assert c.ctime == content_d["ctime"]

    content_d2 = c.to_dict()
    c2 = Content.from_dict(content_d2)
    assert c2.ctime == c.ctime


def test_content_from_dict_str_ctime():
    # test with ctime as a string
    n = datetime.datetime(2020, 5, 6, 12, 34, tzinfo=datetime.timezone.utc)
    content_d = {
        "ctime": n.isoformat(),
        "data": b"",
        "length": 0,
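        # one-byte digest placeholders; this test only exercises ctime parsing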
"sha1": b"\x00", "sha256": b"\x00", "sha1_git": b"\x00", "blake2s256": b"\x00", } c = Content.from_dict(content_d) assert c.ctime == n def test_content_from_dict_str_naive_ctime(): # test with ctime as a string n = datetime.datetime(2020, 5, 6, 12, 34) content_d = { "ctime": n.isoformat(), "data": b"", "length": 0, "sha1": b"\x00", "sha256": b"\x00", "sha1_git": b"\x00", "blake2s256": b"\x00", } with pytest.raises(ValueError, match="must be a timezone-aware datetime."): Content.from_dict(content_d) @given(binary(max_size=4096)) def test_content_from_data(data): c = Content.from_data(data) assert c.data == data assert c.length == len(data) assert c.status == "visible" for key, value in MultiHash.from_data(data).digest().items(): assert getattr(c, key) == value @given(binary(max_size=4096)) def test_hidden_content_from_data(data): c = Content.from_data(data, status="hidden") assert c.data == data assert c.length == len(data) assert c.status == "hidden" for key, value in MultiHash.from_data(data).digest().items(): assert getattr(c, key) == value def test_content_naive_datetime(): c = Content.from_data(b"foo") with pytest.raises(ValueError, match="must be a timezone-aware datetime"): Content( **c.to_dict(), ctime=datetime.datetime.now(), ) @given(strategies.present_contents()) def test_content_git_roundtrip(content): assert content.data is not None raw = swh.model.git_objects.content_git_object(content) sha1_git = hashlib.new("sha1", raw).digest() assert content.sha1_git == sha1_git # SkippedContent @given(binary(max_size=4096)) def test_skipped_content_from_data(data): c = SkippedContent.from_data(data, reason="reason") assert c.reason == "reason" assert c.length == len(data) assert c.status == "absent" for key, value in MultiHash.from_data(data).digest().items(): assert getattr(c, key) == value @given(strategies.skipped_contents_d()) def test_skipped_content_origin_is_str(skipped_content_d): assert SkippedContent.from_dict(skipped_content_d) skipped_content_d["origin"] = "http://path/to/origin" assert SkippedContent.from_dict(skipped_content_d) skipped_content_d["origin"] = Origin(url="http://path/to/origin") with pytest.raises(ValueError, match="origin"): SkippedContent.from_dict(skipped_content_d) def test_skipped_content_naive_datetime(): c = SkippedContent.from_data(b"foo", reason="reason") with pytest.raises(ValueError, match="must be a timezone-aware datetime"): SkippedContent( **c.to_dict(), ctime=datetime.datetime.now(), ) # Directory @given(strategies.directories(raw_manifest=none())) def test_directory_check(directory): directory.check() directory2 = attr.evolve(directory, id=b"\x00" * 20) with pytest.raises(ValueError, match="does not match recomputed hash"): directory2.check() directory2 = attr.evolve( directory, raw_manifest=swh.model.git_objects.directory_git_object(directory) ) with pytest.raises( ValueError, match="non-none raw_manifest attribute, but does not need it." 
): directory2.check() @given(strategies.directories(raw_manifest=none())) def test_directory_raw_manifest(directory): assert "raw_manifest" not in directory.to_dict() raw_manifest = b"foo" id_ = hashlib.new("sha1", raw_manifest).digest() directory2 = attr.evolve(directory, raw_manifest=raw_manifest) assert directory2.to_dict()["raw_manifest"] == raw_manifest with pytest.raises(ValueError, match="does not match recomputed hash"): directory2.check() directory2 = attr.evolve(directory, raw_manifest=raw_manifest, id=id_) assert directory2.id is not None assert directory2.id == id_ != directory.id assert directory2.to_dict()["raw_manifest"] == raw_manifest directory2.check() def test_directory_entry_name_validation(): with pytest.raises(ValueError, match="valid directory entry name."): DirectoryEntry(name=b"foo/", type="dir", target=b"\x00" * 20, perms=0), def test_directory_duplicate_entry_name(): entries = ( DirectoryEntry(name=b"foo", type="file", target=b"\x00" * 20, perms=0), DirectoryEntry(name=b"foo", type="dir", target=b"\x01" * 20, perms=1), ) with pytest.raises(ValueError, match="duplicated entry name"): Directory(entries=entries) entries = ( DirectoryEntry(name=b"foo", type="file", target=b"\x00" * 20, perms=0), DirectoryEntry(name=b"foo", type="file", target=b"\x00" * 20, perms=0), ) with pytest.raises(ValueError, match="duplicated entry name"): Directory(entries=entries) @given(strategies.directories()) def test_directory_from_possibly_duplicated_entries__no_duplicates(directory): """ Directory.from_possibly_duplicated_entries should return the directory unchanged if it has no duplicated entry name. """ assert (False, directory) == Directory.from_possibly_duplicated_entries( id=directory.id, entries=directory.entries, raw_manifest=directory.raw_manifest ) assert (False, directory) == Directory.from_possibly_duplicated_entries( entries=directory.entries, raw_manifest=directory.raw_manifest ) @pytest.mark.parametrize("rev_first", [True, False]) def test_directory_from_possibly_duplicated_entries__rev_and_dir(rev_first): entries = ( DirectoryEntry(name=b"foo", type="dir", target=b"\x01" * 20, perms=1), DirectoryEntry(name=b"foo", type="rev", target=b"\x00" * 20, perms=0), ) if rev_first: entries = tuple(reversed(entries)) (is_corrupt, dir_) = Directory.from_possibly_duplicated_entries(entries=entries) assert is_corrupt assert dir_.entries == ( DirectoryEntry(name=b"foo", type="rev", target=b"\x00" * 20, perms=0), DirectoryEntry( name=b"foo_0101010101", type="dir", target=b"\x01" * 20, perms=1 ), ) # order is independent of 'rev_first' because it is always sorted in git order assert dir_.raw_manifest == ( # fmt: off b"tree 52\x00" + b"0 foo\x00" + b"\x00" * 20 + b"1 foo\x00" + b"\x01" * 20 # fmt: on ) @pytest.mark.parametrize("file_first", [True, False]) def test_directory_from_possibly_duplicated_entries__file_and_dir(file_first): entries = ( DirectoryEntry(name=b"foo", type="dir", target=b"\x01" * 20, perms=1), DirectoryEntry(name=b"foo", type="file", target=b"\x00" * 20, perms=0), ) if file_first: entries = tuple(reversed(entries)) (is_corrupt, dir_) = Directory.from_possibly_duplicated_entries(entries=entries) assert is_corrupt assert dir_.entries == ( DirectoryEntry(name=b"foo", type="dir", target=b"\x01" * 20, perms=1), DirectoryEntry( name=b"foo_0000000000", type="file", target=b"\x00" * 20, perms=0 ), ) # order is independent of 'file_first' because it is always sorted in git order assert dir_.raw_manifest == ( # fmt: off b"tree 52\x00" + b"0 foo\x00" + b"\x00" * 20 + b"1 
foo\x00" + b"\x01" * 20 # fmt: on ) def test_directory_from_possibly_duplicated_entries__two_files1(): entries = ( DirectoryEntry(name=b"foo", type="file", target=b"\x01" * 20, perms=1), DirectoryEntry(name=b"foo", type="file", target=b"\x00" * 20, perms=0), ) (is_corrupt, dir_) = Directory.from_possibly_duplicated_entries(entries=entries) assert is_corrupt assert dir_.entries == ( DirectoryEntry(name=b"foo", type="file", target=b"\x01" * 20, perms=1), DirectoryEntry( name=b"foo_0000000000", type="file", target=b"\x00" * 20, perms=0 ), ) assert dir_.raw_manifest == ( # fmt: off b"tree 52\x00" + b"1 foo\x00" + b"\x01" * 20 + b"0 foo\x00" + b"\x00" * 20 # fmt: on ) def test_directory_from_possibly_duplicated_entries__two_files2(): """ Same as above, but entries are in a different order (and order matters to break the tie) """ entries = ( DirectoryEntry(name=b"foo", type="file", target=b"\x00" * 20, perms=0), DirectoryEntry(name=b"foo", type="file", target=b"\x01" * 20, perms=1), ) (is_corrupt, dir_) = Directory.from_possibly_duplicated_entries(entries=entries) assert is_corrupt assert dir_.entries == ( DirectoryEntry(name=b"foo", type="file", target=b"\x00" * 20, perms=0), DirectoryEntry( name=b"foo_0101010101", type="file", target=b"\x01" * 20, perms=1 ), ) assert dir_.raw_manifest == ( # fmt: off b"tree 52\x00" + b"0 foo\x00" + b"\x00" * 20 + b"1 foo\x00" + b"\x01" * 20 # fmt: on ) def test_directory_from_possibly_duplicated_entries__preserve_manifest(): entries = ( DirectoryEntry(name=b"foo", type="dir", target=b"\x01" * 20, perms=1), DirectoryEntry(name=b"foo", type="rev", target=b"\x00" * 20, perms=0), ) (is_corrupt, dir_) = Directory.from_possibly_duplicated_entries( entries=entries, raw_manifest=b"blah" ) assert is_corrupt assert dir_.entries == ( DirectoryEntry(name=b"foo", type="rev", target=b"\x00" * 20, perms=0), DirectoryEntry( name=b"foo_0101010101", type="dir", target=b"\x01" * 20, perms=1 ), ) assert dir_.raw_manifest == b"blah" # Release @given(strategies.releases(raw_manifest=none())) def test_release_check(release): release.check() release2 = attr.evolve(release, id=b"\x00" * 20) with pytest.raises(ValueError, match="does not match recomputed hash"): release2.check() release2 = attr.evolve( release, raw_manifest=swh.model.git_objects.release_git_object(release) ) with pytest.raises( ValueError, match="non-none raw_manifest attribute, but does not need it." ): release2.check() @given(strategies.releases(raw_manifest=none())) def test_release_raw_manifest(release): raw_manifest = b"foo" id_ = hashlib.new("sha1", raw_manifest).digest() release2 = attr.evolve(release, raw_manifest=raw_manifest) assert release2.to_dict()["raw_manifest"] == raw_manifest with pytest.raises(ValueError, match="does not match recomputed hash"): release2.check() release2 = attr.evolve(release, raw_manifest=raw_manifest, id=id_) assert release2.id is not None assert release2.id == id_ != release.id assert release2.to_dict()["raw_manifest"] == raw_manifest release2.check() # Revision @given(strategies.revisions(raw_manifest=none())) def test_revision_check(revision): revision.check() revision2 = attr.evolve(revision, id=b"\x00" * 20) with pytest.raises(ValueError, match="does not match recomputed hash"): revision2.check() revision2 = attr.evolve( revision, raw_manifest=swh.model.git_objects.revision_git_object(revision) ) with pytest.raises( ValueError, match="non-none raw_manifest attribute, but does not need it." 
): revision2.check() @given(strategies.revisions(raw_manifest=none())) def test_revision_raw_manifest(revision): raw_manifest = b"foo" id_ = hashlib.new("sha1", raw_manifest).digest() revision2 = attr.evolve(revision, raw_manifest=raw_manifest) assert revision2.to_dict()["raw_manifest"] == raw_manifest with pytest.raises(ValueError, match="does not match recomputed hash"): revision2.check() revision2 = attr.evolve(revision, raw_manifest=raw_manifest, id=id_) assert revision2.id is not None assert revision2.id == id_ != revision.id assert revision2.to_dict()["raw_manifest"] == raw_manifest revision2.check() def test_revision_extra_headers_no_headers(): rev_dict = revision_example.copy() rev_dict.pop("id") rev = Revision.from_dict(rev_dict) rev_dict = attr.asdict(rev, recurse=False) rev_model = Revision(**rev_dict) assert rev_model.metadata is None assert rev_model.extra_headers == () rev_dict["metadata"] = { "something": "somewhere", "some other thing": "stranger", } rev_model = Revision(**rev_dict) assert rev_model.metadata == rev_dict["metadata"] assert rev_model.extra_headers == () def test_revision_extra_headers_with_headers(): rev_dict = revision_example.copy() rev_dict.pop("id") rev = Revision.from_dict(rev_dict) rev_dict = attr.asdict(rev, recurse=False) rev_dict["metadata"] = { "something": "somewhere", "some other thing": "stranger", } extra_headers = ( (b"header1", b"value1"), (b"header2", b"42"), (b"header3", b"should I?\x00"), (b"header1", b"again"), ) rev_dict["extra_headers"] = extra_headers rev_model = Revision(**rev_dict) assert "extra_headers" not in rev_model.metadata assert rev_model.extra_headers == extra_headers def test_revision_extra_headers_in_metadata(): rev_dict = revision_example.copy() rev_dict.pop("id") rev = Revision.from_dict(rev_dict) rev_dict = attr.asdict(rev, recurse=False) rev_dict["metadata"] = { "something": "somewhere", "some other thing": "stranger", } extra_headers = ( (b"header1", b"value1"), (b"header2", b"42"), (b"header3", b"should I?\x00"), (b"header1", b"again"), ) # check the bw-compat init hook does the job # ie. 
extra_headers are given in the metadata field rev_dict["metadata"]["extra_headers"] = extra_headers rev_model = Revision(**rev_dict) assert "extra_headers" not in rev_model.metadata assert rev_model.extra_headers == extra_headers def test_revision_extra_headers_as_lists(): rev_dict = revision_example.copy() rev_dict.pop("id") rev = Revision.from_dict(rev_dict) rev_dict = attr.asdict(rev, recurse=False) rev_dict["metadata"] = {} extra_headers = ( (b"header1", b"value1"), (b"header2", b"42"), (b"header3", b"should I?\x00"), (b"header1", b"again"), ) # check Revision.extra_headers tuplify does the job rev_dict["extra_headers"] = [list(x) for x in extra_headers] rev_model = Revision(**rev_dict) assert "extra_headers" not in rev_model.metadata assert rev_model.extra_headers == extra_headers def test_revision_extra_headers_type_error(): rev_dict = revision_example.copy() rev_dict.pop("id") rev = Revision.from_dict(rev_dict) orig_rev_dict = attr.asdict(rev, recurse=False) orig_rev_dict["metadata"] = { "something": "somewhere", "some other thing": "stranger", } extra_headers = ( ("header1", b"value1"), (b"header2", 42), ("header1", "again"), ) # check headers one at a time # if given as extra_header for extra_header in extra_headers: rev_dict = copy.deepcopy(orig_rev_dict) rev_dict["extra_headers"] = (extra_header,) with pytest.raises(AttributeTypeError): Revision(**rev_dict) # if given as metadata for extra_header in extra_headers: rev_dict = copy.deepcopy(orig_rev_dict) rev_dict["metadata"]["extra_headers"] = (extra_header,) with pytest.raises(AttributeTypeError): Revision(**rev_dict) def test_revision_extra_headers_from_dict(): rev_dict = revision_example.copy() rev_dict.pop("id") rev_model = Revision.from_dict(rev_dict) assert rev_model.metadata is None assert rev_model.extra_headers == () rev_dict["metadata"] = { "something": "somewhere", "some other thing": "stranger", } rev_model = Revision.from_dict(rev_dict) assert rev_model.metadata == rev_dict["metadata"] assert rev_model.extra_headers == () extra_headers = ( (b"header1", b"value1"), (b"header2", b"42"), (b"header3", b"should I?\nmaybe\x00\xff"), (b"header1", b"again"), ) rev_dict["extra_headers"] = extra_headers rev_model = Revision.from_dict(rev_dict) assert "extra_headers" not in rev_model.metadata assert rev_model.extra_headers == extra_headers def test_revision_extra_headers_in_metadata_from_dict(): rev_dict = revision_example.copy() rev_dict.pop("id") rev_dict["metadata"] = { "something": "somewhere", "some other thing": "stranger", } extra_headers = ( (b"header1", b"value1"), (b"header2", b"42"), (b"header3", b"should I?\nmaybe\x00\xff"), (b"header1", b"again"), ) # check the bw-compat init hook does the job rev_dict["metadata"]["extra_headers"] = extra_headers rev_model = Revision.from_dict(rev_dict) assert "extra_headers" not in rev_model.metadata assert rev_model.extra_headers == extra_headers def test_revision_extra_headers_as_lists_from_dict(): rev_dict = revision_example.copy() rev_dict.pop("id") rev_model = Revision.from_dict(rev_dict) rev_dict["metadata"] = { "something": "somewhere", "some other thing": "stranger", } extra_headers = ( (b"header1", b"value1"), (b"header2", b"42"), (b"header3", b"should I?\nmaybe\x00\xff"), (b"header1", b"again"), ) # check Revision.extra_headers converter does the job rev_dict["extra_headers"] = [list(x) for x in extra_headers] rev_model = Revision.from_dict(rev_dict) assert "extra_headers" not in rev_model.metadata assert rev_model.extra_headers == extra_headers def 
test_revision_no_author_or_committer_from_dict(): rev_dict = revision_example.copy() rev_dict["author"] = rev_dict["date"] = None rev_dict["committer"] = rev_dict["committer_date"] = None rev_model = Revision.from_dict(rev_dict) assert rev_model.to_dict() == { **rev_dict, "parents": tuple(rev_dict["parents"]), "extra_headers": (), "metadata": None, } def test_revision_none_author_or_committer(): rev_dict = revision_example.copy() rev_dict["author"] = None with pytest.raises(ValueError, match=".*date must be None if author is None.*"): Revision.from_dict(rev_dict) rev_dict = revision_example.copy() rev_dict["committer"] = None with pytest.raises( ValueError, match=".*committer_date must be None if committer is None.*" ): Revision.from_dict(rev_dict) @given(strategies.objects(split_content=True)) def test_object_type(objtype_and_obj): obj_type, obj = objtype_and_obj assert obj_type == obj.object_type def test_object_type_is_final(): object_types = set() def check_final(cls): if hasattr(cls, "object_type"): assert cls.object_type not in object_types object_types.add(cls.object_type) if cls.__subclasses__(): assert not hasattr(cls, "object_type") for subcls in cls.__subclasses__(): check_final(subcls) check_final(BaseModel) _metadata_authority = MetadataAuthority( type=MetadataAuthorityType.FORGE, url="https://forge.softwareheritage.org", ) _metadata_fetcher = MetadataFetcher( name="test-fetcher", version="0.0.1", ) _content_swhid = ExtendedSWHID.from_string( "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2" ) _origin_url = "https://forge.softwareheritage.org/source/swh-model.git" _origin_swhid = ExtendedSWHID.from_string( - "swh:1:ori:94a9ed024d3859793618152ea559a168bbcbb5e2" + "swh:1:ori:433b4f5612f0720ed51fa7aeaf43a3625870057b" ) _dummy_qualifiers = {"origin": "https://example.com", "lines": "42"} _common_metadata_fields = dict( discovery_date=datetime.datetime( 2021, 1, 29, 13, 57, 9, tzinfo=datetime.timezone.utc ), authority=_metadata_authority, fetcher=_metadata_fetcher, format="json", metadata=b'{"origin": "https://example.com", "lines": "42"}', ) def test_metadata_valid(): """Checks valid RawExtrinsicMetadata objects don't raise an error.""" # Simplest case RawExtrinsicMetadata(target=_origin_swhid, **_common_metadata_fields) # Object with an SWHID RawExtrinsicMetadata( target=_content_swhid, **_common_metadata_fields, ) +def test_metadata_from_old_dict(): + common_fields = { + "authority": {"type": "forge", "url": "https://forge.softwareheritage.org"}, + "fetcher": { + "name": "test-fetcher", + "version": "0.0.1", + }, + "discovery_date": _common_metadata_fields["discovery_date"], + "format": "json", + "metadata": b'{"origin": "https://example.com", "lines": "42"}', + } + + m = RawExtrinsicMetadata( + target=_origin_swhid, + **_common_metadata_fields, + ) + assert ( + RawExtrinsicMetadata.from_dict( + {"id": m.id, "target": _origin_url, "type": "origin", **common_fields} + ) + == m + ) + + m = RawExtrinsicMetadata( + target=_content_swhid, + **_common_metadata_fields, + ) + assert ( + RawExtrinsicMetadata.from_dict( + {"target": str(_content_swhid), "type": "content", **common_fields} + ) + == m + ) + + def test_metadata_to_dict(): """Checks valid RawExtrinsicMetadata objects don't raise an error.""" common_fields = { "authority": {"type": "forge", "url": "https://forge.softwareheritage.org"}, "fetcher": { "name": "test-fetcher", "version": "0.0.1", }, "discovery_date": _common_metadata_fields["discovery_date"], "format": "json", "metadata": b'{"origin": "https://example.com", 
"lines": "42"}', } m = RawExtrinsicMetadata( target=_origin_swhid, **_common_metadata_fields, ) assert m.to_dict() == { "target": str(_origin_swhid), - "id": b"@j\xc9\x01\xbc\x1e#p*\xf3q9\xa7u\x97\x00\x14\x02xa", + "id": b"\xa3)q\x0f\xf7p\xc7\xb0\\O\xe8\x84\x83Z\xb0]\x81\xe9\x95\x13", **common_fields, } assert RawExtrinsicMetadata.from_dict(m.to_dict()) == m m = RawExtrinsicMetadata( target=_content_swhid, **_common_metadata_fields, ) assert m.to_dict() == { "target": "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2", "id": b"\xbc\xa3U\xddf\x19U\xc5\xd2\xd7\xdfK\xd7c\x1f\xa8\xfeh\x992", **common_fields, } assert RawExtrinsicMetadata.from_dict(m.to_dict()) == m hash_hex = "6162" * 10 hash_bin = b"ab" * 10 m = RawExtrinsicMetadata( target=_content_swhid, **_common_metadata_fields, origin="https://example.org/", snapshot=CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=hash_bin), release=CoreSWHID(object_type=ObjectType.RELEASE, object_id=hash_bin), revision=CoreSWHID(object_type=ObjectType.REVISION, object_id=hash_bin), path=b"/foo/bar", directory=CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=hash_bin), ) assert m.to_dict() == { "target": "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2", "id": b"\x14l\xb0\x1f\xb9\xc0{)\xc7\x0f\xbd\xc0*,YZ\xf5C\xab\xfc", **common_fields, "origin": "https://example.org/", "snapshot": f"swh:1:snp:{hash_hex}", "release": f"swh:1:rel:{hash_hex}", "revision": f"swh:1:rev:{hash_hex}", "path": b"/foo/bar", "directory": f"swh:1:dir:{hash_hex}", } assert RawExtrinsicMetadata.from_dict(m.to_dict()) == m def test_metadata_invalid_target(): """Checks various invalid values for the 'target' field.""" # SWHID passed as string instead of SWHID with pytest.raises(ValueError, match="target must be.*ExtendedSWHID"): RawExtrinsicMetadata( target="swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2", **_common_metadata_fields, ) def test_metadata_naive_datetime(): with pytest.raises(ValueError, match="must be a timezone-aware datetime"): RawExtrinsicMetadata( target=_origin_swhid, **{**_common_metadata_fields, "discovery_date": datetime.datetime.now()}, ) def test_metadata_validate_context_origin(): """Checks validation of RawExtrinsicMetadata.origin.""" # Origins can't have an 'origin' context with pytest.raises( ValueError, match="Unexpected 'origin' context for origin object" ): RawExtrinsicMetadata( target=_origin_swhid, origin=_origin_url, **_common_metadata_fields, ) # but all other types can RawExtrinsicMetadata( target=_content_swhid, origin=_origin_url, **_common_metadata_fields, ) # SWHIDs aren't valid origin URLs with pytest.raises(ValueError, match="SWHID used as context origin URL"): RawExtrinsicMetadata( target=_content_swhid, origin="swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2", **_common_metadata_fields, ) def test_metadata_validate_context_visit(): """Checks validation of RawExtrinsicMetadata.visit.""" # Origins can't have a 'visit' context with pytest.raises( ValueError, match="Unexpected 'visit' context for origin object" ): RawExtrinsicMetadata( target=_origin_swhid, visit=42, **_common_metadata_fields, ) # but all other types can RawExtrinsicMetadata( target=_content_swhid, origin=_origin_url, visit=42, **_common_metadata_fields, ) # Missing 'origin' with pytest.raises(ValueError, match="'origin' context must be set if 'visit' is"): RawExtrinsicMetadata( target=_content_swhid, visit=42, **_common_metadata_fields, ) # visit id must be positive with pytest.raises(ValueError, match="Nonpositive visit id"): RawExtrinsicMetadata( 
target=_content_swhid, origin=_origin_url, visit=-42, **_common_metadata_fields, ) def test_metadata_validate_context_snapshot(): """Checks validation of RawExtrinsicMetadata.snapshot.""" # Origins can't have a 'snapshot' context with pytest.raises( ValueError, match="Unexpected 'snapshot' context for origin object" ): RawExtrinsicMetadata( target=_origin_swhid, snapshot=CoreSWHID( object_type=ObjectType.SNAPSHOT, object_id=EXAMPLE_HASH, ), **_common_metadata_fields, ) # but content can RawExtrinsicMetadata( target=_content_swhid, snapshot=CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=EXAMPLE_HASH), **_common_metadata_fields, ) # SWHID type doesn't match the expected type of this context key with pytest.raises( ValueError, match="Expected SWHID type 'snapshot', got 'content'" ): RawExtrinsicMetadata( target=_content_swhid, snapshot=CoreSWHID( object_type=ObjectType.CONTENT, object_id=EXAMPLE_HASH, ), **_common_metadata_fields, ) def test_metadata_validate_context_release(): """Checks validation of RawExtrinsicMetadata.release.""" # Origins can't have a 'release' context with pytest.raises( ValueError, match="Unexpected 'release' context for origin object" ): RawExtrinsicMetadata( target=_origin_swhid, release=CoreSWHID( object_type=ObjectType.RELEASE, object_id=EXAMPLE_HASH, ), **_common_metadata_fields, ) # but content can RawExtrinsicMetadata( target=_content_swhid, release=CoreSWHID(object_type=ObjectType.RELEASE, object_id=EXAMPLE_HASH), **_common_metadata_fields, ) # SWHID type doesn't match the expected type of this context key with pytest.raises( ValueError, match="Expected SWHID type 'release', got 'content'" ): RawExtrinsicMetadata( target=_content_swhid, release=CoreSWHID( object_type=ObjectType.CONTENT, object_id=EXAMPLE_HASH, ), **_common_metadata_fields, ) def test_metadata_validate_context_revision(): """Checks validation of RawExtrinsicMetadata.revision.""" # Origins can't have a 'revision' context with pytest.raises( ValueError, match="Unexpected 'revision' context for origin object" ): RawExtrinsicMetadata( target=_origin_swhid, revision=CoreSWHID( object_type=ObjectType.REVISION, object_id=EXAMPLE_HASH, ), **_common_metadata_fields, ) # but content can RawExtrinsicMetadata( target=_content_swhid, revision=CoreSWHID(object_type=ObjectType.REVISION, object_id=EXAMPLE_HASH), **_common_metadata_fields, ) # SWHID type doesn't match the expected type of this context key with pytest.raises( ValueError, match="Expected SWHID type 'revision', got 'content'" ): RawExtrinsicMetadata( target=_content_swhid, revision=CoreSWHID( object_type=ObjectType.CONTENT, object_id=EXAMPLE_HASH, ), **_common_metadata_fields, ) def test_metadata_validate_context_path(): """Checks validation of RawExtrinsicMetadata.path.""" # Origins can't have a 'path' context with pytest.raises(ValueError, match="Unexpected 'path' context for origin object"): RawExtrinsicMetadata( target=_origin_swhid, path=b"/foo/bar", **_common_metadata_fields, ) # but content can RawExtrinsicMetadata( target=_content_swhid, path=b"/foo/bar", **_common_metadata_fields, ) def test_metadata_validate_context_directory(): """Checks validation of RawExtrinsicMetadata.directory.""" # Origins can't have a 'directory' context with pytest.raises( ValueError, match="Unexpected 'directory' context for origin object" ): RawExtrinsicMetadata( target=_origin_swhid, directory=CoreSWHID( object_type=ObjectType.DIRECTORY, object_id=EXAMPLE_HASH, ), **_common_metadata_fields, ) # but content can RawExtrinsicMetadata( target=_content_swhid, 
        directory=CoreSWHID(
            object_type=ObjectType.DIRECTORY,
            object_id=EXAMPLE_HASH,
        ),
        **_common_metadata_fields,
    )

    # SWHID type doesn't match the expected type of this context key
    with pytest.raises(
        ValueError, match="Expected SWHID type 'directory', got 'content'"
    ):
        RawExtrinsicMetadata(
            target=_content_swhid,
            directory=CoreSWHID(
                object_type=ObjectType.CONTENT,
                object_id=EXAMPLE_HASH,
            ),
            **_common_metadata_fields,
        )


def test_metadata_normalize_discovery_date():
    fields_copy = {**_common_metadata_fields}
    truncated_date = fields_copy.pop("discovery_date")
    assert truncated_date.microsecond == 0

    # Check that a value of the wrong type raises TypeError, even though we
    # replaced attrs_strict's type_validator with a custom check
    with pytest.raises(TypeError):
        RawExtrinsicMetadata(
            target=_content_swhid, discovery_date="not a datetime", **fields_copy
        )

    # Check for truncation to integral second
    date_with_us = truncated_date.replace(microsecond=42)
    md = RawExtrinsicMetadata(
        target=_content_swhid,
        discovery_date=date_with_us,
        **fields_copy,
    )

    assert md.discovery_date == truncated_date
    assert md.discovery_date.tzinfo == datetime.timezone.utc

    # Check that the timezone gets normalized to UTC while denoting the same
    # instant
    timezone = datetime.timezone(offset=datetime.timedelta(hours=2))
    date_with_tz = truncated_date.astimezone(timezone)
    assert date_with_tz.tzinfo != datetime.timezone.utc
    md = RawExtrinsicMetadata(
        target=_content_swhid,
        discovery_date=date_with_tz,
        **fields_copy,
    )

    assert md.discovery_date == truncated_date
    assert md.discovery_date.tzinfo == datetime.timezone.utc
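

# A minimal illustrative sketch, not part of the upstream suite: since
# discovery_date is normalized to UTC, two datetimes denoting the same instant
# in different timezones must yield equal objects, and therefore equal ids.
# The test name and the +02:00 zone are arbitrary choices for this example.
def test_metadata_discovery_date_timezone_equivalence():
    fields_copy = {**_common_metadata_fields}
    date_utc = fields_copy.pop("discovery_date")
    # the same instant, expressed with a +02:00 offset
    date_plus2 = date_utc.astimezone(datetime.timezone(datetime.timedelta(hours=2)))

    md1 = RawExtrinsicMetadata(
        target=_content_swhid, discovery_date=date_utc, **fields_copy
    )
    md2 = RawExtrinsicMetadata(
        target=_content_swhid, discovery_date=date_plus2, **fields_copy
    )

    assert md1 == md2
    assert md1.id == md2.id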