diff --git a/swh/model/model.py b/swh/model/model.py index ad3aa8b..f524e3f 100644 --- a/swh/model/model.py +++ b/swh/model/model.py @@ -1,1298 +1,1323 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ Implementation of Software Heritage's data model See :ref:`data-model` for an overview of the data model. The classes defined in this module are immutable `attrs objects `__ and enums. All classes define a ``from_dict`` class method and a ``to_dict`` method to convert between them and msgpack-serializable objects. """ from abc import ABCMeta, abstractmethod import datetime from enum import Enum import hashlib from typing import Any, Dict, Iterable, Optional, Tuple, TypeVar, Union import attr from attrs_strict import AttributeTypeError import dateutil.parser import iso8601 from typing_extensions import Final from . 
def hash_repr(h: Optional[bytes]) -> str:
    """Render a hash value for use in an attrs-generated ``repr``.

    This is passed as the ``repr=`` callable of the hash-valued ``attr.ib``
    fields of the model classes, several of which are declared ``Optional``,
    hence ``None`` must be accepted.

    Args:
        h: the raw hash, or ``None`` when the field is unset.

    Returns:
        The string ``"None"`` when ``h`` is ``None``; otherwise the text
        ``hash_to_bytes('<hex digest>')``, where the hex digest is produced
        by ``hash_to_hex``.
    """
    if h is None:
        return "None"
    # Show the hexadecimal form rather than the raw bytes literal.
    return f"hash_to_bytes('{hash_to_hex(h)}')"
def type_validator():
    """Build an attrs validator enforcing the attribute's declared type.

    Similar in purpose to ``attrs_strict.type_validator()``. The returned
    callable delegates to ``_check_type``, which first compares types for
    equality (the fast path) and then falls back to ``isinstance()``, so
    subclasses are still accepted.

    Returns:
        A ``validator(instance, attribute, value)`` callable that raises
        ``AttributeTypeError`` when ``value`` does not match
        ``attribute.type``, and returns ``None`` otherwise.
    """

    def validator(instance, attribute, value):
        # Nothing to report when the value conforms to the declared type.
        if _check_type(attribute.type, value):
            return
        raise AttributeTypeError(value, attribute)

    return validator
@classmethod
def from_fullname(cls, fullname: bytes):
    """Build a Person by splitting ``fullname`` into a name and an email.

    The fullname is expected to look like ``name <email>``. Everything
    before the first ``<`` is the name (stripped of surrounding
    whitespace); everything after it, up to the last ``>`` if there is one,
    is the email. Without any ``<``, the whole fullname is used as the name
    and the email is unset. Empty name or email values are stored as
    ``None``.

    The fullname itself is stored unchanged.

    Raises:
        TypeError: if ``fullname`` is ``None``.
    """
    if fullname is None:
        raise TypeError("fullname is None.")

    name: Optional[bytes]
    email: Optional[bytes]

    before, bracket, after = fullname.partition(b"<")
    if not bracket:
        # No opening bracket at all: the whole string is the name.
        name, email = fullname, None
    else:
        name = before.strip() if before else None
        # Cut at the *last* closing bracket; keep everything if none exists.
        address, closing, _ = after.rpartition(b">")
        email = address if closing else after

    return Person(name=name or None, email=email or None, fullname=fullname)
""" if "fullname" not in d: parts = [] if d["name"] is not None: parts.append(d["name"]) if d["email"] is not None: parts.append(b"".join([b"<", d["email"], b">"])) fullname = b" ".join(parts) d = {**d, "fullname": fullname} d = {"name": None, "email": None, **d} return super().from_dict(d) @attr.s(frozen=True, slots=True) class Timestamp(BaseModel): """Represents a naive timestamp from a VCS.""" object_type: Final = "timestamp" seconds = attr.ib(type=int, validator=type_validator()) microseconds = attr.ib(type=int, validator=type_validator()) @seconds.validator def check_seconds(self, attribute, value): """Check that seconds fit in a 64-bits signed integer.""" if not (-(2 ** 63) <= value < 2 ** 63): raise ValueError("Seconds must be a signed 64-bits integer.") @microseconds.validator def check_microseconds(self, attribute, value): """Checks that microseconds are positive and < 1000000.""" if not (0 <= value < 10 ** 6): raise ValueError("Microseconds must be in [0, 1000000[.") @attr.s(frozen=True, slots=True) class TimestampWithTimezone(BaseModel): """Represents a TZ-aware timestamp from a VCS.""" object_type: Final = "timestamp_with_timezone" timestamp = attr.ib(type=Timestamp, validator=type_validator()) offset = attr.ib(type=int, validator=type_validator()) negative_utc = attr.ib(type=bool, validator=type_validator()) @offset.validator def check_offset(self, attribute, value): """Checks the offset is a 16-bits signed integer (in theory, it should always be between -14 and +14 hours).""" if not (-(2 ** 15) <= value < 2 ** 15): # max 14 hours offset in theory, but you never know what # you'll find in the wild... 
@negative_utc.validator
def check_negative_utc(self, attribute, value):
    """Reject ``negative_utc=True`` combined with a non-zero offset.

    Args:
        attribute: the attrs attribute being validated (unused).
        value: the candidate ``negative_utc`` flag.

    Raises:
        ValueError: if ``value`` is truthy while ``self.offset`` is non-zero.
    """
    if self.offset and value:
        # Message previously read "True is offset=0" (typo).
        raise ValueError("negative_utc can only be True if offset=0")
def to_datetime(self) -> datetime.datetime:
    """Return this timestamp as a timezone-aware ``datetime``.

    The resulting datetime carries a fixed-offset timezone built from the
    recorded ``offset`` (in minutes).

    This conversion can be lossy: the ``negative_utc`` flag is ignored, as
    a datetime has no way to represent it. It may also fail due to type
    overflow.
    """
    seconds = self.timestamp.seconds
    fixed_tz = datetime.timezone(datetime.timedelta(minutes=self.offset))
    moment = datetime.datetime.fromtimestamp(seconds, fixed_tz)
    # Sub-second precision is stored separately, so apply it afterwards.
    return moment.replace(microsecond=self.timestamp.microseconds)
timezone.""" if value is not None and value.tzinfo is None: raise ValueError("date must be a timezone-aware datetime.") def to_dict(self): """Serializes the date as a string and omits the visit id if it is `None`.""" ov = super().to_dict() if ov["visit"] is None: del ov["visit"] return ov def unique_key(self) -> KeyType: return {"origin": self.origin, "date": str(self.date)} @attr.s(frozen=True, slots=True) class OriginVisitStatus(BaseModel): """Represents a visit update of an origin at a given point in time. """ object_type: Final = "origin_visit_status" origin = attr.ib(type=str, validator=type_validator()) visit = attr.ib(type=int, validator=type_validator()) date = attr.ib(type=datetime.datetime, validator=type_validator()) status = attr.ib( type=str, validator=attr.validators.in_( ["created", "ongoing", "full", "partial", "not_found", "failed"] ), ) - snapshot = attr.ib(type=Optional[Sha1Git], validator=type_validator()) + snapshot = attr.ib( + type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr + ) # Type is optional be to able to use it before adding it to the database model type = attr.ib(type=Optional[str], validator=type_validator(), default=None) metadata = attr.ib( type=Optional[ImmutableDict[str, object]], validator=type_validator(), converter=freeze_optional_dict, default=None, ) @date.validator def check_date(self, attribute, value): """Checks the date has a timezone.""" if value is not None and value.tzinfo is None: raise ValueError("date must be a timezone-aware datetime.") def unique_key(self) -> KeyType: return {"origin": self.origin, "visit": str(self.visit), "date": str(self.date)} class TargetType(Enum): """The type of content pointed to by a snapshot branch. 
class ObjectType(Enum):
    """Kind of object a release points to (usually a revision)."""

    CONTENT = "content"
    DIRECTORY = "directory"
    REVISION = "revision"
    RELEASE = "release"
    SNAPSHOT = "snapshot"

    def __repr__(self):
        # Compact ``ObjectType.MEMBER`` form instead of the default
        # ``<ObjectType.MEMBER: 'value'>`` enum repr.
        return "ObjectType." + self.name
for (name, branch) in d.pop("branches").items() ), **d, ) def swhid(self) -> CoreSWHID: """Returns a SWHID representing this object.""" return CoreSWHID(object_type=SwhidObjectType.SNAPSHOT, object_id=self.id) @attr.s(frozen=True, slots=True) class Release(HashableObject, BaseModel): object_type: Final = "release" name = attr.ib(type=bytes, validator=type_validator()) message = attr.ib(type=Optional[bytes], validator=type_validator()) - target = attr.ib(type=Optional[Sha1Git], validator=type_validator()) + target = attr.ib(type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr) target_type = attr.ib(type=ObjectType, validator=type_validator()) synthetic = attr.ib(type=bool, validator=type_validator()) author = attr.ib(type=Optional[Person], validator=type_validator(), default=None) date = attr.ib( type=Optional[TimestampWithTimezone], validator=type_validator(), default=None ) metadata = attr.ib( type=Optional[ImmutableDict[str, object]], validator=type_validator(), converter=freeze_optional_dict, default=None, ) - id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") + id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) def compute_hash(self) -> bytes: git_object = git_objects.release_git_object(self) return hashlib.new("sha1", git_object).digest() @author.validator def check_author(self, attribute, value): """If the author is `None`, checks the date is `None` too.""" if self.author is None and self.date is not None: raise ValueError("release date must be None if author is None.") def to_dict(self): rel = super().to_dict() if rel["metadata"] is None: del rel["metadata"] return rel @classmethod def from_dict(cls, d): d = d.copy() if d.get("author"): d["author"] = Person.from_dict(d["author"]) if d.get("date"): d["date"] = TimestampWithTimezone.from_dict(d["date"]) return cls(target_type=ObjectType(d.pop("target_type")), **d) def swhid(self) -> CoreSWHID: """Returns a SWHID representing this object.""" return 
def tuplify_extra_headers(value: Iterable):
    """Convert an iterable of two-item entries into a tuple of 2-tuples.

    Each element of ``value`` is unpacked into exactly two items and
    re-packed as a tuple, so lists of lists (or any other iterable of
    pairs) become an immutable tuple of tuples. It is used as the attrs
    converter for ``Revision.extra_headers``.
    """
    pairs = []
    for key, content in value:
        pairs.append((key, content))
    return tuple(pairs)
extra_headers from there if self.metadata: metadata = self.metadata if not self.extra_headers and "extra_headers" in metadata: (extra_headers, metadata) = metadata.copy_pop("extra_headers") object.__setattr__( self, "extra_headers", tuplify_extra_headers(extra_headers), ) attr.validate(self) object.__setattr__(self, "metadata", metadata) def compute_hash(self) -> bytes: git_object = git_objects.revision_git_object(self) return hashlib.new("sha1", git_object).digest() @classmethod def from_dict(cls, d): d = d.copy() date = d.pop("date") if date: date = TimestampWithTimezone.from_dict(date) committer_date = d.pop("committer_date") if committer_date: committer_date = TimestampWithTimezone.from_dict(committer_date) return cls( author=Person.from_dict(d.pop("author")), committer=Person.from_dict(d.pop("committer")), date=date, committer_date=committer_date, type=RevisionType(d.pop("type")), parents=tuple(d.pop("parents")), # for BW compat **d, ) def swhid(self) -> CoreSWHID: """Returns a SWHID representing this object.""" return CoreSWHID(object_type=SwhidObjectType.REVISION, object_id=self.id) def anonymize(self) -> "Revision": """Returns an anonymized version of the Revision object. Anonymization consists in replacing the author and committer with an anonymized Person object. 
""" return attr.evolve( self, author=self.author.anonymize(), committer=self.committer.anonymize() ) @attr.s(frozen=True, slots=True) class DirectoryEntry(BaseModel): object_type: Final = "directory_entry" name = attr.ib(type=bytes, validator=type_validator()) type = attr.ib(type=str, validator=attr.validators.in_(["file", "dir", "rev"])) - target = attr.ib(type=Sha1Git, validator=type_validator()) - perms = attr.ib(type=int, validator=type_validator(), converter=int) + target = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr) + perms = attr.ib(type=int, validator=type_validator(), converter=int, repr=oct) """Usually one of the values of `swh.model.from_disk.DentryPerms`.""" @attr.s(frozen=True, slots=True) class Directory(HashableObject, BaseModel): object_type: Final = "directory" entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=type_validator()) - id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") + id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) def compute_hash(self) -> bytes: git_object = git_objects.directory_git_object(self) return hashlib.new("sha1", git_object).digest() @classmethod def from_dict(cls, d): d = d.copy() return cls( entries=tuple( DirectoryEntry.from_dict(entry) for entry in d.pop("entries") ), **d, ) def swhid(self) -> CoreSWHID: """Returns a SWHID representing this object.""" return CoreSWHID(object_type=SwhidObjectType.DIRECTORY, object_id=self.id) @attr.s(frozen=True, slots=True) class BaseContent(BaseModel): status = attr.ib( type=str, validator=attr.validators.in_(["visible", "hidden", "absent"]) ) @staticmethod def _hash_data(data: bytes): """Hash some data, returning most of the fields of a content object""" d = MultiHash.from_data(data).digest() d["data"] = data d["length"] = len(data) return d @classmethod def from_dict(cls, d, use_subclass=True): if use_subclass: # Chooses a subclass to instantiate instead. 
if d["status"] == "absent": return SkippedContent.from_dict(d) else: return Content.from_dict(d) else: return super().from_dict(d) def get_hash(self, hash_name): if hash_name not in DEFAULT_ALGORITHMS: raise ValueError("{} is not a valid hash name.".format(hash_name)) return getattr(self, hash_name) def hashes(self) -> Dict[str, bytes]: """Returns a dictionary {hash_name: hash_value}""" return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS} @attr.s(frozen=True, slots=True) class Content(BaseContent): object_type: Final = "content" - sha1 = attr.ib(type=bytes, validator=type_validator()) - sha1_git = attr.ib(type=Sha1Git, validator=type_validator()) - sha256 = attr.ib(type=bytes, validator=type_validator()) - blake2s256 = attr.ib(type=bytes, validator=type_validator()) + sha1 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) + sha1_git = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr) + sha256 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) + blake2s256 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) length = attr.ib(type=int, validator=type_validator()) status = attr.ib( type=str, validator=attr.validators.in_(["visible", "hidden"]), default="visible", ) data = attr.ib(type=Optional[bytes], validator=type_validator(), default=None) ctime = attr.ib( type=Optional[datetime.datetime], validator=type_validator(), default=None, eq=False, ) @length.validator def check_length(self, attribute, value): """Checks the length is positive.""" if value < 0: raise ValueError("Length must be positive.") @ctime.validator def check_ctime(self, attribute, value): """Checks the ctime has a timezone.""" if value is not None and value.tzinfo is None: raise ValueError("ctime must be a timezone-aware datetime.") def to_dict(self): content = super().to_dict() if content["data"] is None: del content["data"] if content["ctime"] is None: del content["ctime"] return content @classmethod def from_data(cls, data, 
def with_data(self) -> "Content":
    """Return a content object whose ``data`` attribute is not ``None``.

    In this base implementation the object is returned as-is when it
    already holds its data. Subclasses may override this method to load
    the data lazily (e.g. from disk or an object storage).

    Raises:
        MissingData: if ``data`` is ``None``.
    """
    if self.data is not None:
        return self
    raise MissingData("Content data is None.")
@length.validator
def check_length(self, attribute, value):
    """Check that the length, when provided, is positive or -1.

    The ``length`` field is declared ``Optional[int]``, so ``None`` must
    pass validation instead of failing with a ``TypeError`` on the
    comparison below.

    Args:
        attribute: the attrs attribute being validated (unused).
        value: the candidate length, or ``None``.

    Raises:
        ValueError: if ``value`` is an integer lower than -1.
    """
    if value is not None and value < -1:
        raise ValueError("Length must be positive or -1.")
""" d = cls._hash_data(data) del d["data"] d["status"] = "absent" d["reason"] = reason d["ctime"] = ctime return cls(**d) @classmethod def from_dict(cls, d): d2 = d.copy() if d2.pop("data", None) is not None: raise ValueError('SkippedContent has no "data" attribute %r' % d) return super().from_dict(d2, use_subclass=False) def unique_key(self) -> KeyType: return self.hashes() class MetadataAuthorityType(Enum): DEPOSIT_CLIENT = "deposit_client" FORGE = "forge" REGISTRY = "registry" + def __repr__(self): + return f"MetadataAuthorityType.{self.name}" + @attr.s(frozen=True, slots=True) class MetadataAuthority(BaseModel): """Represents an entity that provides metadata about an origin or software artifact.""" object_type: Final = "metadata_authority" type = attr.ib(type=MetadataAuthorityType, validator=type_validator()) url = attr.ib(type=str, validator=type_validator()) metadata = attr.ib( type=Optional[ImmutableDict[str, Any]], default=None, validator=type_validator(), converter=freeze_optional_dict, ) def to_dict(self): d = super().to_dict() if d["metadata"] is None: del d["metadata"] return d @classmethod def from_dict(cls, d): d = { **d, "type": MetadataAuthorityType(d["type"]), } return super().from_dict(d) def unique_key(self) -> KeyType: return {"type": self.type.value, "url": self.url} @attr.s(frozen=True, slots=True) class MetadataFetcher(BaseModel): """Represents a software component used to fetch metadata from a metadata authority, and ingest them into the Software Heritage archive.""" object_type: Final = "metadata_fetcher" name = attr.ib(type=str, validator=type_validator()) version = attr.ib(type=str, validator=type_validator()) metadata = attr.ib( type=Optional[ImmutableDict[str, Any]], default=None, validator=type_validator(), converter=freeze_optional_dict, ) def to_dict(self): d = super().to_dict() if d["metadata"] is None: del d["metadata"] return d def unique_key(self) -> KeyType: return {"name": self.name, "version": self.version} def 
@attr.s(frozen=True, slots=True)
class RawExtrinsicMetadata(HashableObject, BaseModel):
    """Raw metadata blob attached to a target object.

    Besides the target, the source (discovery date, authority, fetcher) and
    the metadata itself, an instance may carry optional context fields
    (``origin``, ``visit``, ``snapshot``, ``release``, ``revision``, ``path``,
    ``directory``).  Each context field is only accepted for some target
    object types; the validators below raise :exc:`ValueError` otherwise.
    """

    object_type: Final = "raw_extrinsic_metadata"

    # target object
    target = attr.ib(type=ExtendedSWHID, validator=type_validator())

    # source
    discovery_date = attr.ib(type=datetime.datetime, converter=normalize_discovery_date)
    authority = attr.ib(type=MetadataAuthority, validator=type_validator())
    fetcher = attr.ib(type=MetadataFetcher, validator=type_validator())

    # the metadata itself
    format = attr.ib(type=str, validator=type_validator())
    metadata = attr.ib(type=bytes, validator=type_validator())

    # context
    origin = attr.ib(type=Optional[str], default=None, validator=type_validator())
    visit = attr.ib(type=Optional[int], default=None, validator=type_validator())
    snapshot = attr.ib(
        type=Optional[CoreSWHID], default=None, validator=type_validator()
    )
    release = attr.ib(
        type=Optional[CoreSWHID], default=None, validator=type_validator()
    )
    revision = attr.ib(
        type=Optional[CoreSWHID], default=None, validator=type_validator()
    )
    path = attr.ib(type=Optional[bytes], default=None, validator=type_validator())
    directory = attr.ib(
        type=Optional[CoreSWHID], default=None, validator=type_validator()
    )

    id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr)

    def compute_hash(self) -> bytes:
        """Return the SHA1 digest of this object's git-like manifest."""
        git_object = git_objects.raw_extrinsic_metadata_git_object(self)
        return hashlib.new("sha1", git_object).digest()

    def _check_context_target(self, context_name, value, allowed_target_types):
        """Raise ValueError if the target's type does not accept this context.

        Shared by all context validators so the check and its error message
        are written once.
        """
        if self.target.object_type not in allowed_target_types:
            raise ValueError(
                f"Unexpected '{context_name}' context for "
                f"{self.target.object_type.name.lower()} object: {value}"
            )

    @origin.validator
    def check_origin(self, attribute, value):
        if value is None:
            return

        self._check_context_target(
            "origin",
            value,
            (
                SwhidExtendedObjectType.SNAPSHOT,
                SwhidExtendedObjectType.RELEASE,
                SwhidExtendedObjectType.REVISION,
                SwhidExtendedObjectType.DIRECTORY,
                SwhidExtendedObjectType.CONTENT,
            ),
        )

        if value.startswith("swh:"):
            # Technically this is valid; but:
            # 1. SWHIDs are URIs, not URLs
            # 2. if a SWHID gets here, it's very likely to be a mistake
            #    (and we can remove this check if it turns out there is a
            #    legitimate use for it).
            raise ValueError(f"SWHID used as context origin URL: {value}")

    @visit.validator
    def check_visit(self, attribute, value):
        if value is None:
            return

        self._check_context_target(
            "visit",
            value,
            (
                SwhidExtendedObjectType.SNAPSHOT,
                SwhidExtendedObjectType.RELEASE,
                SwhidExtendedObjectType.REVISION,
                SwhidExtendedObjectType.DIRECTORY,
                SwhidExtendedObjectType.CONTENT,
            ),
        )

        if self.origin is None:
            raise ValueError("'origin' context must be set if 'visit' is.")

        if value <= 0:
            raise ValueError("Nonpositive visit id")

    @snapshot.validator
    def check_snapshot(self, attribute, value):
        if value is None:
            return

        self._check_context_target(
            "snapshot",
            value,
            (
                SwhidExtendedObjectType.RELEASE,
                SwhidExtendedObjectType.REVISION,
                SwhidExtendedObjectType.DIRECTORY,
                SwhidExtendedObjectType.CONTENT,
            ),
        )

        self._check_swhid(SwhidObjectType.SNAPSHOT, value)

    @release.validator
    def check_release(self, attribute, value):
        if value is None:
            return

        self._check_context_target(
            "release",
            value,
            (
                SwhidExtendedObjectType.REVISION,
                SwhidExtendedObjectType.DIRECTORY,
                SwhidExtendedObjectType.CONTENT,
            ),
        )

        self._check_swhid(SwhidObjectType.RELEASE, value)

    @revision.validator
    def check_revision(self, attribute, value):
        if value is None:
            return

        self._check_context_target(
            "revision",
            value,
            (SwhidExtendedObjectType.DIRECTORY, SwhidExtendedObjectType.CONTENT),
        )

        self._check_swhid(SwhidObjectType.REVISION, value)

    @path.validator
    def check_path(self, attribute, value):
        if value is None:
            return

        self._check_context_target(
            "path",
            value,
            (SwhidExtendedObjectType.DIRECTORY, SwhidExtendedObjectType.CONTENT),
        )

    @directory.validator
    def check_directory(self, attribute, value):
        if value is None:
            return

        self._check_context_target(
            "directory", value, (SwhidExtendedObjectType.CONTENT,)
        )

        self._check_swhid(SwhidObjectType.DIRECTORY, value)

    def _check_swhid(self, expected_object_type, swhid):
        """Raise ValueError unless ``swhid`` is a SWHID object of the expected type."""
        if isinstance(swhid, str):
            raise ValueError(f"Expected SWHID, got a string: {swhid}")

        if swhid.object_type != expected_object_type:
            raise ValueError(
                f"Expected SWHID type '{expected_object_type.name.lower()}', "
                f"got '{swhid.object_type.name.lower()}' in {swhid}"
            )

    def to_dict(self):
        """Convert to a dict, leaving out context keys whose value is None."""
        d = super().to_dict()

        context_keys = (
            "origin",
            "visit",
            "snapshot",
            "release",
            "revision",
            "directory",
            "path",
        )
        for context_key in context_keys:
            if d[context_key] is None:
                del d[context_key]
        return d

    @classmethod
    def from_dict(cls, d):
        """Build an instance from a dict, parsing SWHID strings and nested dicts.

        The input dict is copied, not modified.
        """
        d = {
            **d,
            "target": ExtendedSWHID.from_string(d["target"]),
            "authority": MetadataAuthority.from_dict(d["authority"]),
            "fetcher": MetadataFetcher.from_dict(d["fetcher"]),
        }

        swhid_keys = ("snapshot", "release", "revision", "directory")
        for swhid_key in swhid_keys:
            if d.get(swhid_key):
                d[swhid_key] = CoreSWHID.from_string(d[swhid_key])

        return super().from_dict(d)

    def swhid(self) -> ExtendedSWHID:
        """Returns a SWHID representing this RawExtrinsicMetadata object."""
        return ExtendedSWHID(
            object_type=SwhidExtendedObjectType.RAW_EXTRINSIC_METADATA,
            object_id=self.id,
        )


@attr.s(frozen=True, slots=True)
class ExtID(HashableObject, BaseModel):
    """Association between an external identifier and a target core SWHID."""

    object_type: Final = "extid"

    extid_type = attr.ib(type=str, validator=type_validator())
    extid = attr.ib(type=bytes, validator=type_validator())
    target = attr.ib(type=CoreSWHID, validator=type_validator())
    extid_version = attr.ib(type=int, validator=type_validator(), default=0)

    id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr)

    @classmethod
    def from_dict(cls, d):
        """Build an ExtID from a dict; ``extid_version`` defaults to 0.

        Any ``id`` key in the input is not passed to the constructor.
        """
        return cls(
            extid=d["extid"],
            extid_type=d["extid_type"],
            target=CoreSWHID.from_string(d["target"]),
            extid_version=d.get("extid_version", 0),
        )

    def compute_hash(self) -> bytes:
        """Return the SHA1 digest of this object's git-like manifest."""
        git_object = git_objects.extid_git_object(self)
        return hashlib.new("sha1", git_object).digest()
class ObjectType(enum.Enum):
    """Possible object types of a QualifiedSWHID or CoreSWHID.

    The values of each variant is what is used in the SWHID's string representation."""

    SNAPSHOT = "snp"
    REVISION = "rev"
    RELEASE = "rel"
    DIRECTORY = "dir"
    CONTENT = "cnt"


class ExtendedObjectType(enum.Enum):
    """Possible object types of an ExtendedSWHID.

    The variants are a superset of :class:`ObjectType`'s"""

    SNAPSHOT = "snp"
    REVISION = "rev"
    RELEASE = "rel"
    DIRECTORY = "dir"
    CONTENT = "cnt"
    ORIGIN = "ori"
    RAW_EXTRINSIC_METADATA = "emd"


SWHID_NAMESPACE = "swh"
SWHID_VERSION = 1
SWHID_TYPES = ["snp", "rel", "rev", "dir", "cnt"]
EXTENDED_SWHID_TYPES = SWHID_TYPES + ["ori", "emd"]
SWHID_SEP = ":"
SWHID_CTXT_SEP = ";"
SWHID_QUALIFIERS = {"origin", "anchor", "visit", "path", "lines"}

# Each named group becomes one key of the dict returned by _parse_swhid();
# the first four are also the keyword arguments of the SWHID classes.
SWHID_RE_RAW = (
    f"(?P<namespace>{SWHID_NAMESPACE})"
    f"{SWHID_SEP}(?P<scheme_version>{SWHID_VERSION})"
    f"{SWHID_SEP}(?P<object_type>{'|'.join(EXTENDED_SWHID_TYPES)})"
    f"{SWHID_SEP}(?P<object_id>[0-9a-f]{{40}})"
    f"({SWHID_CTXT_SEP}(?P<qualifiers>\\S+))?"
)
SWHID_RE = re.compile(SWHID_RE_RAW)


# type of the "object_type" attribute of the SWHID class; either
# ObjectType or ExtendedObjectType
_TObjectType = TypeVar("_TObjectType", ObjectType, ExtendedObjectType)

# the SWHID class itself (this is used so that X.from_string() can return X
# for all X subclass of _BaseSWHID)
_TSWHID = TypeVar("_TSWHID", bound="_BaseSWHID")
@attr.s(frozen=True, kw_only=True, repr=False)
class _BaseSWHID(Generic[_TObjectType]):
    """Common base class for CoreSWHID, QualifiedSWHID, and ExtendedSWHID.

    This is an "abstract" class and should not be instantiated directly;
    it only exists to deduplicate code between these three SWHID classes."""

    namespace = attr.ib(type=str, default=SWHID_NAMESPACE)
    """the namespace of the identifier, defaults to ``swh``"""

    scheme_version = attr.ib(type=int, default=SWHID_VERSION)
    """the scheme version of the identifier, defaults to 1"""

    # overridden by subclasses
    object_type: _TObjectType
    """the type of object the identifier points to"""

    object_id = attr.ib(type=bytes, validator=type_validator())
    """object's identifier"""

    @namespace.validator
    def check_namespace(self, attribute, value):
        if value != SWHID_NAMESPACE:
            raise ValidationError(
                "Invalid SWHID: invalid namespace: %(namespace)s",
                params={"namespace": value},
            )

    @scheme_version.validator
    def check_scheme_version(self, attribute, value):
        if value != SWHID_VERSION:
            raise ValidationError(
                "Invalid SWHID: invalid version: %(version)s", params={"version": value}
            )

    @object_id.validator
    def check_object_id(self, attribute, value):
        if len(value) != 20:
            raise ValidationError(
                "Invalid SWHID: invalid checksum: %(object_id)s",
                params={"object_id": hash_to_hex(value)},
            )

    def __str__(self) -> str:
        return SWHID_SEP.join(
            [
                self.namespace,
                str(self.scheme_version),
                self.object_type.value,
                hash_to_hex(self.object_id),
            ]
        )

    def __repr__(self) -> str:
        # ``!r`` instead of hand-written quotes: the result stays a valid
        # Python expression even if str(self) contains a quote or backslash
        # (possible in subclasses that append qualifiers).
        return f"{self.__class__.__name__}.from_string({str(self)!r})"

    @classmethod
    def from_string(cls: Type[_TSWHID], s: str) -> _TSWHID:
        """Parse ``s`` into an instance of ``cls``.

        Raises:
            swh.model.exceptions.ValidationError: if ``s`` is not a valid
              SWHID, or carries qualifiers.
        """
        parts = _parse_swhid(s)
        if parts.pop("qualifiers"):
            raise ValidationError(f"{cls.__name__} does not support qualifiers.")
        try:
            return cls(**parts)
        except ValueError as e:
            raise ValidationError(
                "ValueError: %(args)s", params={"args": e.args}
            ) from None


@attr.s(frozen=True, kw_only=True, repr=False)
class CoreSWHID(_BaseSWHID[ObjectType]):
    """
    Dataclass holding the relevant info associated to a SoftWare Heritage
    persistent IDentifier (SWHID).

    Unlike `QualifiedSWHID`, it is restricted to core SWHIDs, ie. SWHIDs
    with no qualifiers.

    Raises:
        swh.model.exceptions.ValidationError: In case of invalid object type or id

    To get the raw SWHID string from an instance of this class,
    use the :func:`str` function:

    >>> swhid = CoreSWHID(
    ...     object_type=ObjectType.CONTENT,
    ...     object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'),
    ... )
    >>> str(swhid)
    'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0'

    And vice-versa with :meth:`CoreSWHID.from_string`:

    >>> swhid == CoreSWHID.from_string(
    ...     "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0"
    ... )
    True
    """

    object_type = attr.ib(
        type=ObjectType, validator=type_validator(), converter=ObjectType
    )
    """the type of object the identifier points to"""

    def to_extended(self) -> ExtendedSWHID:
        """Converts this CoreSWHID into an ExtendedSWHID.

        As ExtendedSWHID is a superset of CoreSWHID, this is lossless."""
        return ExtendedSWHID(
            namespace=self.namespace,
            scheme_version=self.scheme_version,
            object_type=ExtendedObjectType(self.object_type.value),
            object_id=self.object_id,
        )
Unlike `QualifiedSWHID`, it is restricted to core SWHIDs, ie. SWHIDs with no qualifiers. Raises: swh.model.exceptions.ValidationError: In case of invalid object type or id To get the raw SWHID string from an instance of this class, use the :func:`str` function: >>> swhid = CoreSWHID( ... object_type=ObjectType.CONTENT, ... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'), ... ) >>> str(swhid) 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0' And vice-versa with :meth:`CoreSWHID.from_string`: >>> swhid == CoreSWHID.from_string( ... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0" ... ) True """ object_type = attr.ib( type=ObjectType, validator=type_validator(), converter=ObjectType ) """the type of object the identifier points to""" def to_extended(self) -> ExtendedSWHID: """Converts this CoreSWHID into an ExtendedSWHID. As ExtendedSWHID is a superset of CoreSWHID, this is lossless.""" return ExtendedSWHID( namespace=self.namespace, scheme_version=self.scheme_version, object_type=ExtendedObjectType(self.object_type.value), object_id=self.object_id, ) def _parse_core_swhid(swhid: Union[str, CoreSWHID, None]) -> Optional[CoreSWHID]: if swhid is None or isinstance(swhid, CoreSWHID): return swhid else: return CoreSWHID.from_string(swhid) def _parse_lines_qualifier( lines: Union[str, Tuple[int, Optional[int]], None] ) -> Optional[Tuple[int, Optional[int]]]: try: if lines is None or isinstance(lines, tuple): return lines elif "-" in lines: (from_, to) = lines.split("-", 2) return (int(from_), int(to)) else: return (int(lines), None) except ValueError: raise ValidationError( "Invalid format for the lines qualifier: %(lines)s", params={"lines": lines} ) def _parse_path_qualifier(path: Union[str, bytes, None]) -> Optional[bytes]: if path is None or isinstance(path, bytes): return path else: return urllib.parse.unquote_to_bytes(path) -@attr.s(frozen=True, kw_only=True) +@attr.s(frozen=True, kw_only=True, repr=False) class 
@attr.s(frozen=True, kw_only=True, repr=False)
class QualifiedSWHID(_BaseSWHID[ObjectType]):
    """
    Dataclass holding the relevant info associated to a SoftWare Heritage
    persistent IDentifier (SWHID)

    Raises:
        swh.model.exceptions.ValidationError: In case of invalid object type or id

    To get the raw SWHID string from an instance of this class,
    use the :func:`str` function:

    >>> swhid = QualifiedSWHID(
    ...     object_type=ObjectType.CONTENT,
    ...     object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'),
    ...     lines=(5, 10),
    ... )
    >>> str(swhid)
    'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0;lines=5-10'

    And vice-versa with :meth:`QualifiedSWHID.from_string`:

    >>> swhid == QualifiedSWHID.from_string(
    ...     "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0;lines=5-10"
    ... )
    True
    """

    object_type = attr.ib(
        type=ObjectType, validator=type_validator(), converter=ObjectType
    )
    """the type of object the identifier points to"""

    # qualifiers:

    origin = attr.ib(type=Optional[str], default=None, validator=type_validator())
    """the software origin where an object has been found or observed in the wild,
    as an URI"""

    visit = attr.ib(type=Optional[CoreSWHID], default=None, converter=_parse_core_swhid)
    """the core identifier of a snapshot corresponding to a specific visit
    of a repository containing the designated object"""

    anchor = attr.ib(
        type=Optional[CoreSWHID],
        default=None,
        validator=type_validator(),
        converter=_parse_core_swhid,
    )
    """a designated node in the Merkle DAG relative to which a path to the object
    is specified, as the core identifier of a directory, a revision, a release,
    or a snapshot"""

    path = attr.ib(
        type=Optional[bytes],
        default=None,
        validator=type_validator(),
        converter=_parse_path_qualifier,
    )
    """the absolute file path, from the root directory associated to the anchor node,
    to the object; when the anchor denotes a directory or a revision, and almost always
    when it’s a release, the root directory is uniquely determined;
    when the anchor denotes a snapshot, the root directory is the one pointed to by HEAD
    (possibly indirectly), and undefined if such a reference is missing"""

    lines = attr.ib(
        type=Optional[Tuple[int, Optional[int]]],
        default=None,
        validator=type_validator(),
        converter=_parse_lines_qualifier,
    )
    """lines: line number(s) of interest, usually within a content object"""

    @visit.validator
    def check_visit(self, attribute, value):
        if value and value.object_type != ObjectType.SNAPSHOT:
            raise ValidationError(
                "The 'visit' qualifier must be a 'snp' SWHID, not '%(type)s'",
                params={"type": value.object_type.value},
            )

    @anchor.validator
    def check_anchor(self, attribute, value):
        if value and value.object_type not in (
            ObjectType.DIRECTORY,
            ObjectType.REVISION,
            ObjectType.RELEASE,
            ObjectType.SNAPSHOT,
        ):
            # The message names the qualifier being validated ('anchor') and
            # uses a well-formed ``%(type)s`` placeholder.
            raise ValidationError(
                "The 'anchor' qualifier must be a 'dir', 'rev', 'rel', or 'snp' SWHID, "
                "not '%(type)s'",
                params={"type": value.object_type.value},
            )

    def qualifiers(self) -> Dict[str, str]:
        """Return the qualifiers that are set, as strings, keyed by name.

        ``;`` is percent-escaped in the origin, as it is the separator
        between qualifiers in the string form.
        """
        origin = self.origin
        if origin:
            unescaped_origin = origin
            origin = origin.replace(";", "%3B")
            assert urllib.parse.unquote_to_bytes(
                origin
            ) == urllib.parse.unquote_to_bytes(
                unescaped_origin
            ), "Escaping ';' in the origin qualifier corrupted the origin URL."

        d: Dict[str, Optional[str]] = {
            "origin": origin,
            "visit": str(self.visit) if self.visit else None,
            "anchor": str(self.anchor) if self.anchor else None,
            "path": (
                urllib.parse.quote_from_bytes(self.path)
                if self.path is not None
                else None
            ),
            "lines": (
                "-".join(str(line) for line in self.lines if line is not None)
                if self.lines
                else None
            ),
        }
        return {k: v for (k, v) in d.items() if v is not None}

    def __str__(self) -> str:
        # Core part first, then one ";key=value" segment per qualifier set.
        core = SWHID_SEP.join(
            [
                self.namespace,
                str(self.scheme_version),
                self.object_type.value,
                hash_to_hex(self.object_id),
            ]
        )
        return core + "".join(
            f"{SWHID_CTXT_SEP}{k}={v}" for (k, v) in self.qualifiers().items()
        )

    def __repr__(self) -> str:
        return super().__repr__()

    @classmethod
    def from_string(cls, s: str) -> QualifiedSWHID:
        """Parse ``s``, qualifiers included, into an instance of ``cls``.

        Raises:
            swh.model.exceptions.ValidationError: if ``s`` is not a valid
              SWHID or has a qualifier not in ``SWHID_QUALIFIERS``.
        """
        parts = _parse_swhid(s)
        qualifiers = parts.pop("qualifiers")
        invalid_qualifiers = set(qualifiers) - SWHID_QUALIFIERS
        if invalid_qualifiers:
            # sorted() keeps the message stable; set order is arbitrary.
            raise ValidationError(
                "Invalid qualifier(s): %(qualifiers)s",
                params={"qualifiers": ", ".join(sorted(invalid_qualifiers))},
            )
        try:
            return cls(**parts, **qualifiers)
        except ValueError as e:
            raise ValidationError(
                "ValueError: %(args)s", params={"args": e.args}
            ) from None
@attr.s(frozen=True, kw_only=True, repr=False)
class ExtendedSWHID(_BaseSWHID[ExtendedObjectType]):
    """
    Dataclass holding the relevant info associated to a SoftWare Heritage
    persistent IDentifier (SWHID).

    It extends  `CoreSWHID`, by allowing non-standard object types; and should
    only be used internally to Software Heritage.

    Raises:
        swh.model.exceptions.ValidationError: In case of invalid object type or id

    To get the raw SWHID string from an instance of this class,
    use the :func:`str` function:

    >>> swhid = ExtendedSWHID(
    ...     object_type=ExtendedObjectType.CONTENT,
    ...     object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'),
    ... )
    >>> str(swhid)
    'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0'

    And vice-versa with :meth:`ExtendedSWHID.from_string`:

    >>> swhid == ExtendedSWHID.from_string(
    ...     "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0"
    ... )
    True
    """

    object_type = attr.ib(
        type=ExtendedObjectType,
        validator=type_validator(),
        converter=ExtendedObjectType,
    )
    """the type of object the identifier points to"""


def _parse_swhid(swhid: str) -> Dict[str, Any]:
    """Parse a Software Heritage identifier (SWHID) from string (see:
    :ref:`persistent-identifiers`.)

    This is for internal use; use :meth:`CoreSWHID.from_string`,
    :meth:`QualifiedSWHID.from_string`, or :meth:`ExtendedSWHID.from_string` instead,
    as they perform validation and build a dataclass.

    Args:
        swhid (str): A persistent identifier

    Returns:
        a dict with one key per named group of ``SWHID_RE``; ``scheme_version``
        is converted to int, ``object_id`` to bytes, and ``qualifiers`` to a
        dict of strings (empty when there are none).

    Raises:
        swh.model.exceptions.ValidationError: if passed string is not a valid SWHID

    """
    m = SWHID_RE.fullmatch(swhid)
    if not m:
        raise ValidationError(
            "Invalid SWHID: invalid syntax: %(swhid)s", params={"swhid": swhid}
        )
    parts: Dict[str, Any] = m.groupdict()

    qualifiers_raw = parts["qualifiers"]
    parts["qualifiers"] = {}
    if qualifiers_raw:
        for qualifier in qualifiers_raw.split(SWHID_CTXT_SEP):
            try:
                # Unpacking fails with ValueError when there is no "=".
                k, v = qualifier.split("=", maxsplit=1)
            except ValueError:
                raise ValidationError(
                    "Invalid SWHID: invalid qualifier: %(qualifier)s",
                    params={"qualifier": qualifier},
                ) from None
            parts["qualifiers"][k] = v

    parts["scheme_version"] = int(parts["scheme_version"])
    parts["object_id"] = hash_to_bytes(parts["object_id"])
    return parts
import Any, List, Optional, Tuple, Union import attr from attrs_strict import AttributeTypeError +import dateutil from hypothesis import given from hypothesis.strategies import binary import pytest from swh.model.collections import ImmutableDict from swh.model.from_disk import DentryPerms from swh.model.hashutil import MultiHash, hash_to_bytes import swh.model.hypothesis_strategies as strategies +import swh.model.model from swh.model.model import ( BaseModel, Content, Directory, MetadataAuthority, MetadataAuthorityType, MetadataFetcher, MissingData, Origin, OriginVisit, OriginVisitStatus, Person, RawExtrinsicMetadata, Release, Revision, SkippedContent, Snapshot, TargetType, Timestamp, TimestampWithTimezone, type_validator, ) +import swh.model.swhids from swh.model.swhids import CoreSWHID, ExtendedSWHID, ObjectType from swh.model.tests.swh_model_data import TEST_OBJECTS from swh.model.tests.test_identifiers import ( TS_DATETIMES, TS_TIMEZONES, directory_example, metadata_example, release_example, revision_example, snapshot_example, ) EXAMPLE_HASH = hash_to_bytes("94a9ed024d3859793618152ea559a168bbcbb5e2") @given(strategies.objects()) def test_todict_inverse_fromdict(objtype_and_obj): (obj_type, obj) = objtype_and_obj if obj_type in ("origin", "origin_visit"): return obj_as_dict = obj.to_dict() obj_as_dict_copy = copy.deepcopy(obj_as_dict) # Check the composition of to_dict and from_dict is the identity assert obj == type(obj).from_dict(obj_as_dict) # Check from_dict() does not change the input dict assert obj_as_dict == obj_as_dict_copy # Check the composition of from_dict and to_dict is the identity assert obj_as_dict == type(obj).from_dict(obj_as_dict).to_dict() +@given(strategies.objects()) +def test_repr(objtype_and_obj): + """Checks every model object has a working repr(), and that it can be eval()uated + (so that printed objects can be copy-pasted to write test cases.)""" + (obj_type, obj) = objtype_and_obj + + r = repr(obj) + env = { + "tzutc": lambda: 
datetime.timezone.utc, + "tzfile": dateutil.tz.tzfile, + "hash_to_bytes": hash_to_bytes, + **swh.model.swhids.__dict__, + **swh.model.model.__dict__, + } + assert eval(r, env) == obj + + @attr.s class Cls1: pass @attr.s class Cls2(Cls1): pass _custom_namedtuple = collections.namedtuple("_custom_namedtuple", "a b") class _custom_tuple(tuple): pass # List of (type, valid_values, invalid_values) _TYPE_VALIDATOR_PARAMETERS: List[Tuple[Any, List[Any], List[Any]]] = [ # base types: ( bool, [True, False], [-1, 0, 1, 42, 1000, None, "123", 0.0, (), ("foo",), ImmutableDict()], ), ( int, [-1, 0, 1, 42, 1000, DentryPerms.directory, True, False], [None, "123", 0.0, (), ImmutableDict()], ), ( float, [-1.0, 0.0, 1.0, float("infinity"), float("NaN")], [True, False, None, 1, "1.2", (), ImmutableDict()], ), ( bytes, [b"", b"123"], [None, bytearray(b"\x12\x34"), "123", 0, 123, (), (1, 2, 3), ImmutableDict()], ), (str, ["", "123"], [None, b"123", b"", 0, (), (1, 2, 3), ImmutableDict()]), # unions: ( Optional[int], [None, -1, 0, 1, 42, 1000, DentryPerms.directory], ["123", 0.0, (), ImmutableDict()], ), ( Optional[bytes], [None, b"", b"123"], ["123", "", 0, (), (1, 2, 3), ImmutableDict()], ), ( Union[str, bytes], ["", "123", b"123", b""], [None, 0, (), (1, 2, 3), ImmutableDict()], ), ( Union[str, bytes, None], ["", "123", b"123", b"", None], [0, (), (1, 2, 3), ImmutableDict()], ), # tuples ( Tuple[str, str], [("foo", "bar"), ("", ""), _custom_namedtuple("", ""), _custom_tuple(("", ""))], [("foo",), ("foo", "bar", "baz"), ("foo", 42), (42, "foo")], ), ( Tuple[str, ...], [ ("foo",), ("foo", "bar"), ("", ""), ("foo", "bar", "baz"), _custom_namedtuple("", ""), _custom_tuple(("", "")), ], [("foo", 42), (42, "foo")], ), # composite generic: ( Tuple[Union[str, int], Union[str, int]], [("foo", "foo"), ("foo", 42), (42, "foo"), (42, 42)], [("foo", b"bar"), (b"bar", "foo")], ), ( Union[Tuple[str, str], Tuple[int, int]], [("foo", "foo"), (42, 42)], [("foo", b"bar"), (b"bar", "foo"), ("foo", 42), 
(42, "foo")], ), ( Tuple[Tuple[bytes, bytes], ...], [(), ((b"foo", b"bar"),), ((b"foo", b"bar"), (b"baz", b"qux"))], [((b"foo", "bar"),), ((b"foo", b"bar"), ("baz", b"qux"))], ), # standard types: ( datetime.datetime, [datetime.datetime.now(), datetime.datetime.now(tz=datetime.timezone.utc)], [None, 123], ), # ImmutableDict ( ImmutableDict[str, int], [ ImmutableDict(), ImmutableDict({"foo": 42}), ImmutableDict({"foo": 42, "bar": 123}), ], [ImmutableDict({"foo": "bar"}), ImmutableDict({42: 123})], ), # Any: (object, [-1, 0, 1, 42, 1000, None, "123", 0.0, (), ImmutableDict()], [],), (Any, [-1, 0, 1, 42, 1000, None, "123", 0.0, (), ImmutableDict()], [],), ( ImmutableDict[Any, int], [ ImmutableDict(), ImmutableDict({"foo": 42}), ImmutableDict({"foo": 42, "bar": 123}), ImmutableDict({42: 123}), ], [ImmutableDict({"foo": "bar"})], ), ( ImmutableDict[str, Any], [ ImmutableDict(), ImmutableDict({"foo": 42}), ImmutableDict({"foo": "bar"}), ImmutableDict({"foo": 42, "bar": 123}), ], [ImmutableDict({42: 123})], ), # attr objects: ( Timestamp, [Timestamp(seconds=123, microseconds=0),], [None, "2021-09-28T11:27:59", 123], ), (Cls1, [Cls1(), Cls2()], [None, b"abcd"],), # enums: ( TargetType, [TargetType.CONTENT, TargetType.ALIAS], ["content", "alias", 123, None], ), ] @pytest.mark.parametrize( "type_,value", [ pytest.param(type_, value, id=f"type={type_}, value={value}") for (type_, values, _) in _TYPE_VALIDATOR_PARAMETERS for value in values ], ) def test_type_validator_valid(type_, value): type_validator()(None, attr.ib(type=type_), value) @pytest.mark.parametrize( "type_,value", [ pytest.param(type_, value, id=f"type={type_}, value={value}") for (type_, _, values) in _TYPE_VALIDATOR_PARAMETERS for value in values ], ) def test_type_validator_invalid(type_, value): with pytest.raises(AttributeTypeError): type_validator()(None, attr.ib(type=type_), value) @pytest.mark.parametrize("object_type, objects", TEST_OBJECTS.items()) def test_swh_model_todict_fromdict(object_type, 
objects): """checks model objects in swh_model_data are in correct shape""" assert objects for obj in objects: # Check the composition of from_dict and to_dict is the identity obj_as_dict = obj.to_dict() assert obj == type(obj).from_dict(obj_as_dict) assert obj_as_dict == type(obj).from_dict(obj_as_dict).to_dict() def test_unique_key(): url = "http://example.org/" date = datetime.datetime.now(tz=datetime.timezone.utc) id_ = b"42" * 10 assert Origin(url=url).unique_key() == {"url": url} assert OriginVisit(origin=url, date=date, type="git").unique_key() == { "origin": url, "date": str(date), } assert OriginVisitStatus( origin=url, visit=42, date=date, status="created", snapshot=None ).unique_key() == {"origin": url, "visit": "42", "date": str(date),} assert Snapshot.from_dict({**snapshot_example, "id": id_}).unique_key() == id_ assert Release.from_dict({**release_example, "id": id_}).unique_key() == id_ assert Revision.from_dict({**revision_example, "id": id_}).unique_key() == id_ assert Directory.from_dict({**directory_example, "id": id_}).unique_key() == id_ assert ( RawExtrinsicMetadata.from_dict({**metadata_example, "id": id_}).unique_key() == id_ ) cont = Content.from_data(b"foo") assert cont.unique_key().hex() == "0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33" kwargs = { **cont.to_dict(), "reason": "foo", "status": "absent", } del kwargs["data"] assert SkippedContent(**kwargs).unique_key() == cont.hashes() # Anonymization @given(strategies.objects()) def test_anonymization(objtype_and_obj): (obj_type, obj) = objtype_and_obj def check_person(p): if p is not None: assert p.name is None assert p.email is None assert len(p.fullname) == 32 anon_obj = obj.anonymize() if obj_type == "person": assert anon_obj is not None check_person(anon_obj) elif obj_type == "release": assert anon_obj is not None check_person(anon_obj.author) elif obj_type == "revision": assert anon_obj is not None check_person(anon_obj.author) check_person(anon_obj.committer) else: assert anon_obj is 
None # Origin, OriginVisit, OriginVisitStatus @given(strategies.origins()) def test_todict_origins(origin): obj = origin.to_dict() assert "type" not in obj assert type(origin)(url=origin.url) == type(origin).from_dict(obj) @given(strategies.origin_visits()) def test_todict_origin_visits(origin_visit): obj = origin_visit.to_dict() assert origin_visit == type(origin_visit).from_dict(obj) def test_origin_visit_naive_datetime(): with pytest.raises(ValueError, match="must be a timezone-aware datetime"): OriginVisit( origin="http://foo/", date=datetime.datetime.now(), type="git", ) @given(strategies.origin_visit_statuses()) def test_todict_origin_visit_statuses(origin_visit_status): obj = origin_visit_status.to_dict() assert origin_visit_status == type(origin_visit_status).from_dict(obj) def test_origin_visit_status_naive_datetime(): with pytest.raises(ValueError, match="must be a timezone-aware datetime"): OriginVisitStatus( origin="http://foo/", visit=42, date=datetime.datetime.now(), status="ongoing", snapshot=None, ) # Timestamp @given(strategies.timestamps()) def test_timestamps_strategy(timestamp): attr.validate(timestamp) def test_timestamp_seconds(): attr.validate(Timestamp(seconds=0, microseconds=0)) with pytest.raises(AttributeTypeError): Timestamp(seconds="0", microseconds=0) attr.validate(Timestamp(seconds=2 ** 63 - 1, microseconds=0)) with pytest.raises(ValueError): Timestamp(seconds=2 ** 63, microseconds=0) attr.validate(Timestamp(seconds=-(2 ** 63), microseconds=0)) with pytest.raises(ValueError): Timestamp(seconds=-(2 ** 63) - 1, microseconds=0) def test_timestamp_microseconds(): attr.validate(Timestamp(seconds=0, microseconds=0)) with pytest.raises(AttributeTypeError): Timestamp(seconds=0, microseconds="0") attr.validate(Timestamp(seconds=0, microseconds=10 ** 6 - 1)) with pytest.raises(ValueError): Timestamp(seconds=0, microseconds=10 ** 6) with pytest.raises(ValueError): Timestamp(seconds=0, microseconds=-1) def test_timestamp_from_dict(): assert 
Timestamp.from_dict({"seconds": 10, "microseconds": 5}) with pytest.raises(AttributeTypeError): Timestamp.from_dict({"seconds": "10", "microseconds": 5}) with pytest.raises(AttributeTypeError): Timestamp.from_dict({"seconds": 10, "microseconds": "5"}) with pytest.raises(ValueError): Timestamp.from_dict({"seconds": 0, "microseconds": -1}) Timestamp.from_dict({"seconds": 0, "microseconds": 10 ** 6 - 1}) with pytest.raises(ValueError): Timestamp.from_dict({"seconds": 0, "microseconds": 10 ** 6}) # TimestampWithTimezone def test_timestampwithtimezone(): ts = Timestamp(seconds=0, microseconds=0) tstz = TimestampWithTimezone(timestamp=ts, offset=0, negative_utc=False) attr.validate(tstz) assert tstz.negative_utc is False attr.validate(TimestampWithTimezone(timestamp=ts, offset=10, negative_utc=False)) attr.validate(TimestampWithTimezone(timestamp=ts, offset=-10, negative_utc=False)) tstz = TimestampWithTimezone(timestamp=ts, offset=0, negative_utc=True) attr.validate(tstz) assert tstz.negative_utc is True with pytest.raises(AttributeTypeError): TimestampWithTimezone( timestamp=datetime.datetime.now(), offset=0, negative_utc=False ) with pytest.raises(AttributeTypeError): TimestampWithTimezone(timestamp=ts, offset="0", negative_utc=False) with pytest.raises(AttributeTypeError): TimestampWithTimezone(timestamp=ts, offset=1.0, negative_utc=False) with pytest.raises(AttributeTypeError): TimestampWithTimezone(timestamp=ts, offset=1, negative_utc=0) with pytest.raises(ValueError): TimestampWithTimezone(timestamp=ts, offset=1, negative_utc=True) with pytest.raises(ValueError): TimestampWithTimezone(timestamp=ts, offset=-1, negative_utc=True) def test_timestampwithtimezone_from_datetime(): tz = datetime.timezone(datetime.timedelta(minutes=+60)) date = datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=tz) tstz = TimestampWithTimezone.from_datetime(date) assert tstz == TimestampWithTimezone( timestamp=Timestamp(seconds=1582810759, microseconds=0,), offset=60, negative_utc=False, ) 
def test_timestampwithtimezone_from_naive_datetime():
    """from_datetime() must reject a datetime that carries no tzinfo."""
    date = datetime.datetime(2020, 2, 27, 14, 39, 19)

    with pytest.raises(ValueError, match="datetime without timezone"):
        TimestampWithTimezone.from_datetime(date)


def test_timestampwithtimezone_from_iso8601():
    """from_iso8601() keeps the microseconds and the +01:00 offset (60 min)."""
    date = "2020-02-27 14:39:19.123456+0100"

    tstz = TimestampWithTimezone.from_iso8601(date)

    assert tstz == TimestampWithTimezone(
        timestamp=Timestamp(seconds=1582810759, microseconds=123456,),
        offset=60,
        negative_utc=False,
    )


def test_timestampwithtimezone_from_iso8601_negative_utc():
    """A "-0000" offset is parsed as offset 0 with negative_utc set."""
    date = "2020-02-27 13:39:19-0000"

    tstz = TimestampWithTimezone.from_iso8601(date)

    assert tstz == TimestampWithTimezone(
        timestamp=Timestamp(seconds=1582810759, microseconds=0,),
        offset=0,
        negative_utc=True,
    )


@pytest.mark.parametrize("date", TS_DATETIMES)
@pytest.mark.parametrize("tz", TS_TIMEZONES)
@pytest.mark.parametrize("microsecond", [0, 1, 10, 100, 1000, 999999])
def test_timestampwithtimezone_to_datetime(date, tz, microsecond):
    """from_datetime() followed by to_datetime() gives back the same instant
    and the same UTC offset."""
    date = date.replace(tzinfo=tz, microsecond=microsecond)
    tstz = TimestampWithTimezone.from_datetime(date)

    assert tstz.to_datetime() == date
    assert tstz.to_datetime().utcoffset() == date.utcoffset()


def test_person_from_fullname():
    """The author should have name, email and fullname filled."""
    actual_person = Person.from_fullname(b"tony <ynot@dagobah>")
    assert actual_person == Person(
        fullname=b"tony <ynot@dagobah>", name=b"tony", email=b"ynot@dagobah",
    )


def test_person_from_fullname_no_email():
    """The author and fullname should be the same as the input (author)."""
    actual_person = Person.from_fullname(b"tony")
    assert actual_person == Person(fullname=b"tony", name=b"tony", email=None,)


def test_person_from_fullname_empty_person():
    """Empty person has only its fullname filled with the empty byte-string."""
    actual_person = Person.from_fullname(b"")
    assert actual_person == Person(fullname=b"", name=None, email=None,)


def test_git_author_line_to_author():
    """Person.from_fullname() on a range of regular and malformed author lines."""
    # edge case out of the way
    with pytest.raises(TypeError):
        Person.from_fullname(None)

    # Maps a raw author line to the Person it is expected to parse into; the
    # fullname is always the untouched input.
    tests = {
        b"a <b@c.com>": Person(name=b"a", email=b"b@c.com", fullname=b"a <b@c.com>",),
        b"<foo@bar.com>": Person(
            name=None, email=b"foo@bar.com", fullname=b"<foo@bar.com>",
        ),
        b"malformed <email": Person(
            name=b"malformed", email=b"email", fullname=b"malformed <email"
        ),
        b'malformed <"<br"@ckets>': Person(
            name=b"malformed",
            email=b'"<br"@ckets',
            fullname=b'malformed <"<br"@ckets>',
        ),
        b"trailing <sp@c.e> ": Person(
            name=b"trailing", email=b"sp@c.e", fullname=b"trailing <sp@c.e> ",
        ),
        b"no<sp@c.e>": Person(name=b"no", email=b"sp@c.e", fullname=b"no<sp@c.e>",),
        b" more   <sp@c.es>": Person(
            name=b"more", email=b"sp@c.es", fullname=b" more   <sp@c.es>",
        ),
        b" <>": Person(name=None, email=None, fullname=b" <>",),
    }

    # Sorted so that the cases are always checked in the same order.
    for person in sorted(tests):
        expected_person = tests[person]
        assert expected_person == Person.from_fullname(person)


# Content


def test_content_get_hash():
    """get_hash(name) returns the hash that was passed under that name."""
    hashes = dict(sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux")
    c = Content(length=42, status="visible", **hashes)
    for (hash_name, hash_) in hashes.items():
        assert c.get_hash(hash_name) == hash_


def test_content_hashes():
    """hashes() returns all four hashes as a dict."""
    hashes = dict(sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux")
    c = Content(length=42, status="visible", **hashes)
    assert c.hashes() == hashes


def test_content_data():
    """with_data() on a Content that already has data returns an equal object."""
    c = Content(
        length=42,
        status="visible",
        data=b"foo",
        sha1=b"foo",
        sha1_git=b"bar",
        sha256=b"baz",
        blake2s256=b"qux",
    )
    assert c.with_data() == c


def test_content_data_missing():
    """with_data() raises MissingData when the Content was built without data."""
    c = Content(
        length=42,
        status="visible",
        sha1=b"foo",
        sha1_git=b"bar",
        sha256=b"baz",
        blake2s256=b"qux",
    )
    with pytest.raises(MissingData):
        c.with_data()


@given(strategies.present_contents_d())
def test_content_from_dict(content_d):
    """ctime survives construction and a to_dict()/from_dict() round trip."""
    c = Content.from_data(**content_d)
    assert c
    assert c.ctime == content_d["ctime"]

    content_d2 = c.to_dict()
    c2 = Content.from_dict(content_d2)
    assert c2.ctime == c.ctime


def test_content_from_dict_str_ctime():
    """from_dict() accepts ctime as a timezone-aware ISO 8601 string."""
    # test with ctime as a string
    n = datetime.datetime(2020, 5, 6, 12, 34, tzinfo=datetime.timezone.utc)
    content_d = {
        "ctime": n.isoformat(),
        "data": b"",
        "length": 0,
        "sha1": b"\x00",
        "sha256": b"\x00",
        "sha1_git": b"\x00",
        "blake2s256": b"\x00",
    }
    c = Content.from_dict(content_d)
    assert c.ctime == n


def test_content_from_dict_str_naive_ctime():
    """from_dict() rejects a ctime string that has no timezone."""
    # test with ctime as a string, this time without timezone information
    n = datetime.datetime(2020, 5, 6, 12, 34)
    content_d = {
        "ctime": n.isoformat(),
        "data": b"",
        "length": 0,
        "sha1": b"\x00",
        "sha256": b"\x00",
        "sha1_git": b"\x00",
        "blake2s256": b"\x00",
    }
    with pytest.raises(ValueError, match="must be a timezone-aware datetime."):
        Content.from_dict(content_d)


@given(binary(max_size=4096))
def test_content_from_data(data):
    """Content.from_data() fills data, length, status and every MultiHash digest."""
    c = Content.from_data(data)
    assert c.data == data
    assert c.length == len(data)
    assert c.status == "visible"
    for key, value in MultiHash.from_data(data).digest().items():
        assert getattr(c, key) == value


@given(binary(max_size=4096))
def test_hidden_content_from_data(data):
    """Same as test_content_from_data, with an explicit "hidden" status."""
    c = Content.from_data(data, status="hidden")
    assert c.data == data
    assert c.length == len(data)
    assert c.status == "hidden"
    for key, value in MultiHash.from_data(data).digest().items():
        assert getattr(c, key) == value


def test_content_naive_datetime():
    """Content refuses a naive datetime as ctime."""
    c = Content.from_data(b"foo")
    with pytest.raises(ValueError, match="must be a timezone-aware datetime"):
        Content(
            **c.to_dict(), ctime=datetime.datetime.now(),
        )


# SkippedContent


@given(binary(max_size=4096))
def test_skipped_content_from_data(data):
    """SkippedContent.from_data() fills reason, length, status and the digests."""
    c = SkippedContent.from_data(data, reason="reason")
    assert c.reason == "reason"
    assert c.length == len(data)
    assert c.status == "absent"
    for key, value in MultiHash.from_data(data).digest().items():
        assert getattr(c, key) == value


@given(strategies.skipped_contents_d())
def test_skipped_content_origin_is_str(skipped_content_d):
    """The 'origin' of a SkippedContent may be a URL string, not an Origin."""
    assert SkippedContent.from_dict(skipped_content_d)

    skipped_content_d["origin"] = "http://path/to/origin"
    assert SkippedContent.from_dict(skipped_content_d)

    skipped_content_d["origin"] = Origin(url="http://path/to/origin")
    with pytest.raises(ValueError, match="origin"):
        SkippedContent.from_dict(skipped_content_d)


def test_skipped_content_naive_datetime():
    """SkippedContent refuses a naive datetime as ctime."""
    c = SkippedContent.from_data(b"foo", reason="reason")
    with pytest.raises(ValueError, match="must be a timezone-aware datetime"):
        SkippedContent(
            **c.to_dict(), ctime=datetime.datetime.now(),
        )


# Revision


def test_revision_extra_headers_no_headers():
    """Without extra headers, extra_headers is () and metadata is kept as given."""
    rev_dict = revision_example.copy()
    rev_dict.pop("id")
    rev = Revision.from_dict(rev_dict)
    rev_dict = attr.asdict(rev, recurse=False)

    rev_model = Revision(**rev_dict)
    assert rev_model.metadata is None
    assert rev_model.extra_headers == ()

    rev_dict["metadata"] = {
        "something": "somewhere",
        "some other thing": "stranger",
    }
    rev_model = Revision(**rev_dict)
    assert rev_model.metadata == rev_dict["metadata"]
    assert rev_model.extra_headers == ()


def test_revision_extra_headers_with_headers():
    """Headers passed as extra_headers are stored there and not in metadata."""
    rev_dict = revision_example.copy()
    rev_dict.pop("id")
    rev = Revision.from_dict(rev_dict)
    rev_dict = attr.asdict(rev, recurse=False)
    rev_dict["metadata"] = {
        "something": "somewhere",
        "some other thing": "stranger",
    }
    extra_headers = (
        (b"header1", b"value1"),
        (b"header2", b"42"),
        (b"header3", b"should I?\x00"),
        (b"header1", b"again"),
    )

    rev_dict["extra_headers"] = extra_headers
    rev_model = Revision(**rev_dict)
    assert "extra_headers" not in rev_model.metadata
    assert rev_model.extra_headers == extra_headers


def test_revision_extra_headers_in_metadata():
    """Headers found under metadata["extra_headers"] are moved to extra_headers."""
    rev_dict = revision_example.copy()
    rev_dict.pop("id")
    rev = Revision.from_dict(rev_dict)
    rev_dict = attr.asdict(rev, recurse=False)
    rev_dict["metadata"] = {
        "something": "somewhere",
        "some other thing": "stranger",
    }

    extra_headers = (
        (b"header1", b"value1"),
        (b"header2", b"42"),
        (b"header3", b"should I?\x00"),
        (b"header1", b"again"),
    )

    # check the bw-compat init hook does the job
    # ie. extra_headers are given in the metadata field
    rev_dict["metadata"]["extra_headers"] = extra_headers
    rev_model = Revision(**rev_dict)
    assert "extra_headers" not in rev_model.metadata
    assert rev_model.extra_headers == extra_headers


def test_revision_extra_headers_as_lists():
    """Headers given as lists of lists come out as a tuple of tuples."""
    rev_dict = revision_example.copy()
    rev_dict.pop("id")
    rev = Revision.from_dict(rev_dict)
    rev_dict = attr.asdict(rev, recurse=False)
    rev_dict["metadata"] = {}

    extra_headers = (
        (b"header1", b"value1"),
        (b"header2", b"42"),
        (b"header3", b"should I?\x00"),
        (b"header1", b"again"),
    )

    # check Revision.extra_headers tuplify does the job
    rev_dict["extra_headers"] = [list(x) for x in extra_headers]
    rev_model = Revision(**rev_dict)
    assert "extra_headers" not in rev_model.metadata
    assert rev_model.extra_headers == extra_headers


def test_revision_extra_headers_type_error():
    """Non-bytes header keys or values raise AttributeTypeError, whether they
    are passed as extra_headers or through metadata."""
    rev_dict = revision_example.copy()
    rev_dict.pop("id")
    rev = Revision.from_dict(rev_dict)
    orig_rev_dict = attr.asdict(rev, recurse=False)
    orig_rev_dict["metadata"] = {
        "something": "somewhere",
        "some other thing": "stranger",
    }
    # Each pair below has a str or int where bytes is expected.
    extra_headers = (
        ("header1", b"value1"),
        (b"header2", 42),
        ("header1", "again"),
    )
    # check headers one at a time
    #   if given as extra_header
    for extra_header in extra_headers:
        rev_dict = copy.deepcopy(orig_rev_dict)
        rev_dict["extra_headers"] = (extra_header,)
        with pytest.raises(AttributeTypeError):
            Revision(**rev_dict)
    #   if given as metadata
    for extra_header in extra_headers:
        rev_dict = copy.deepcopy(orig_rev_dict)
        rev_dict["metadata"]["extra_headers"] = (extra_header,)
        with pytest.raises(AttributeTypeError):
            Revision(**rev_dict)


def test_revision_extra_headers_from_dict():
    """Revision.from_dict() counterpart of the no-headers/with-headers tests."""
    rev_dict = revision_example.copy()
    rev_dict.pop("id")
    rev_model = Revision.from_dict(rev_dict)
    assert rev_model.metadata is None
    assert rev_model.extra_headers == ()

    rev_dict["metadata"] = {
        "something": "somewhere",
        "some other thing": "stranger",
    }
    rev_model = Revision.from_dict(rev_dict)
    assert rev_model.metadata == rev_dict["metadata"]
    assert rev_model.extra_headers == ()

    extra_headers = (
        (b"header1", b"value1"),
        (b"header2", b"42"),
        (b"header3", b"should I?\nmaybe\x00\xff"),
        (b"header1", b"again"),
    )
    rev_dict["extra_headers"] = extra_headers
    rev_model = Revision.from_dict(rev_dict)
    assert "extra_headers" not in rev_model.metadata
    assert rev_model.extra_headers == extra_headers


def test_revision_extra_headers_in_metadata_from_dict():
    """Revision.from_dict() also moves metadata["extra_headers"] out of metadata."""
    rev_dict = revision_example.copy()
    rev_dict.pop("id")

    rev_dict["metadata"] = {
        "something": "somewhere",
        "some other thing": "stranger",
    }
    extra_headers = (
        (b"header1", b"value1"),
        (b"header2", b"42"),
        (b"header3", b"should I?\nmaybe\x00\xff"),
        (b"header1", b"again"),
    )

    # check the bw-compat init hook does the job
    rev_dict["metadata"]["extra_headers"] = extra_headers
    rev_model = Revision.from_dict(rev_dict)
    assert "extra_headers" not in rev_model.metadata
    assert rev_model.extra_headers == extra_headers


def test_revision_extra_headers_as_lists_from_dict():
    """Revision.from_dict() converts list-of-lists headers to a tuple of tuples."""
    rev_dict = revision_example.copy()
    rev_dict.pop("id")
    rev_model = Revision.from_dict(rev_dict)
    rev_dict["metadata"] = {
        "something": "somewhere",
        "some other thing": "stranger",
    }
    extra_headers = (
        (b"header1", b"value1"),
        (b"header2", b"42"),
        (b"header3", b"should I?\nmaybe\x00\xff"),
        (b"header1", b"again"),
    )

    # check Revision.extra_headers converter does the job
    rev_dict["extra_headers"] = [list(x) for x in extra_headers]
    rev_model = Revision.from_dict(rev_dict)
    assert "extra_headers" not in rev_model.metadata
    assert rev_model.extra_headers == extra_headers


@given(strategies.objects(split_content=True))
def test_object_type(objtype_and_obj):
    """Each generated object reports the object_type it was generated under."""
    obj_type, obj = objtype_and_obj
    assert obj_type == obj.object_type


def test_object_type_is_final():
    """Walks the BaseModel class tree and checks that object_type values are
    unique and only defined on classes without subclasses."""
    object_types = set()

    def check_final(cls):
        if hasattr(cls, "object_type"):
            assert cls.object_type not in object_types
            object_types.add(cls.object_type)
        if cls.__subclasses__():
            assert not hasattr(cls, "object_type")
        for subcls in cls.__subclasses__():
            check_final(subcls)

    check_final(BaseModel)


# Shared fixtures for the RawExtrinsicMetadata tests below.
_metadata_authority = MetadataAuthority(
    type=MetadataAuthorityType.FORGE, url="https://forge.softwareheritage.org",
)
_metadata_fetcher = MetadataFetcher(name="test-fetcher", version="0.0.1",)
_content_swhid = ExtendedSWHID.from_string(
    "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2"
)
_origin_url = "https://forge.softwareheritage.org/source/swh-model.git"
_origin_swhid = ExtendedSWHID.from_string(
    "swh:1:ori:94a9ed024d3859793618152ea559a168bbcbb5e2"
)
_dummy_qualifiers = {"origin": "https://example.com", "lines": "42"}
_common_metadata_fields = dict(
    discovery_date=datetime.datetime(
        2021, 1, 29, 13, 57, 9, tzinfo=datetime.timezone.utc
    ),
    authority=_metadata_authority,
    fetcher=_metadata_fetcher,
    format="json",
    metadata=b'{"origin": "https://example.com", "lines": "42"}',
)


def test_metadata_valid():
    """Checks valid RawExtrinsicMetadata objects don't raise an error."""

    # Simplest case
    RawExtrinsicMetadata(target=_origin_swhid, **_common_metadata_fields)

    # Object with an SWHID
    RawExtrinsicMetadata(
        target=_content_swhid, **_common_metadata_fields,
    )


def test_metadata_to_dict():
    """Checks the dict produced by RawExtrinsicMetadata.to_dict(), and that
    from_dict() rebuilds an equal object from it."""

    common_fields = {
        "authority": {"type": "forge", "url": "https://forge.softwareheritage.org"},
        "fetcher": {"name": "test-fetcher", "version": "0.0.1",},
        "discovery_date": _common_metadata_fields["discovery_date"],
        "format": "json",
        "metadata": b'{"origin": "https://example.com", "lines": "42"}',
    }

    # Origin target, no context
    m = RawExtrinsicMetadata(target=_origin_swhid, **_common_metadata_fields,)
    assert m.to_dict() == {
        "target": str(_origin_swhid),
        "id": b"@j\xc9\x01\xbc\x1e#p*\xf3q9\xa7u\x97\x00\x14\x02xa",
        **common_fields,
    }
    assert RawExtrinsicMetadata.from_dict(m.to_dict()) == m

    # Content target, no context
    m = RawExtrinsicMetadata(target=_content_swhid, **_common_metadata_fields,)
    assert m.to_dict() == {
        "target": "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",
        "id": b"\xbc\xa3U\xddf\x19U\xc5\xd2\xd7\xdfK\xd7c\x1f\xa8\xfeh\x992",
        **common_fields,
    }
    assert RawExtrinsicMetadata.from_dict(m.to_dict()) == m

    # Content target with every context field set; hash_hex is the hex form
    # of hash_bin (b"ab" is 0x61 0x62).
    hash_hex = "6162" * 10
    hash_bin = b"ab" * 10
    m = RawExtrinsicMetadata(
        target=_content_swhid,
        **_common_metadata_fields,
        origin="https://example.org/",
        snapshot=CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=hash_bin),
        release=CoreSWHID(object_type=ObjectType.RELEASE, object_id=hash_bin),
        revision=CoreSWHID(object_type=ObjectType.REVISION, object_id=hash_bin),
        path=b"/foo/bar",
        directory=CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=hash_bin),
    )
    assert m.to_dict() == {
        "target": "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",
        "id": b"\x14l\xb0\x1f\xb9\xc0{)\xc7\x0f\xbd\xc0*,YZ\xf5C\xab\xfc",
        **common_fields,
        "origin": "https://example.org/",
        "snapshot": f"swh:1:snp:{hash_hex}",
        "release": f"swh:1:rel:{hash_hex}",
        "revision": f"swh:1:rev:{hash_hex}",
        "path": b"/foo/bar",
        "directory": f"swh:1:dir:{hash_hex}",
    }
    assert RawExtrinsicMetadata.from_dict(m.to_dict()) == m


def test_metadata_invalid_target():
    """Checks various invalid values for the 'target' field."""

    # SWHID passed as string instead of SWHID
    with pytest.raises(ValueError, match="target must be.*ExtendedSWHID"):
        RawExtrinsicMetadata(
            target="swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",
            **_common_metadata_fields,
        )


def test_metadata_naive_datetime():
    """RawExtrinsicMetadata refuses a naive datetime as discovery_date."""
    with pytest.raises(ValueError, match="must be a timezone-aware datetime"):
        RawExtrinsicMetadata(
            target=_origin_swhid,
            **{**_common_metadata_fields, "discovery_date": datetime.datetime.now()},
        )


def test_metadata_validate_context_origin():
    """Checks validation of RawExtrinsicMetadata.origin."""

    # Origins can't have an 'origin' context
    with pytest.raises(
        ValueError, match="Unexpected 'origin' context for origin object"
    ):
        RawExtrinsicMetadata(
            target=_origin_swhid, origin=_origin_url, **_common_metadata_fields,
        )

    # but all other types can
    RawExtrinsicMetadata(
        target=_content_swhid, origin=_origin_url, **_common_metadata_fields,
    )

    # SWHIDs aren't valid origin URLs
    with pytest.raises(ValueError, match="SWHID used as context origin URL"):
        RawExtrinsicMetadata(
            target=_content_swhid,
            origin="swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",
            **_common_metadata_fields,
        )


def test_metadata_validate_context_visit():
    """Checks validation of RawExtrinsicMetadata.visit."""

    # Origins can't have a 'visit' context
    with pytest.raises(
        ValueError, match="Unexpected 'visit' context for origin object"
    ):
        RawExtrinsicMetadata(
            target=_origin_swhid, visit=42, **_common_metadata_fields,
        )

    # but all other types can
    RawExtrinsicMetadata(
        target=_content_swhid, origin=_origin_url, visit=42, **_common_metadata_fields,
    )

    # Missing 'origin'
    with pytest.raises(ValueError, match="'origin' context must be set if 'visit' is"):
        RawExtrinsicMetadata(
            target=_content_swhid, visit=42, **_common_metadata_fields,
        )

    # visit id must be positive
    with pytest.raises(ValueError, match="Nonpositive visit id"):
        RawExtrinsicMetadata(
            target=_content_swhid,
            origin=_origin_url,
            visit=-42,
            **_common_metadata_fields,
        )


def test_metadata_validate_context_snapshot():
    """Checks validation of RawExtrinsicMetadata.snapshot."""

    # Origins can't have a 'snapshot' context
    with pytest.raises(
        ValueError, match="Unexpected 'snapshot' context for origin object"
    ):
        RawExtrinsicMetadata(
            target=_origin_swhid,
            snapshot=CoreSWHID(
                object_type=ObjectType.SNAPSHOT, object_id=EXAMPLE_HASH,
            ),
            **_common_metadata_fields,
        )

    # but content can
    RawExtrinsicMetadata(
        target=_content_swhid,
        snapshot=CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=EXAMPLE_HASH),
        **_common_metadata_fields,
    )

    # SWHID type doesn't match the expected type of this context key
    with pytest.raises(
        ValueError, match="Expected SWHID type 'snapshot', got 'content'"
    ):
        RawExtrinsicMetadata(
            target=_content_swhid,
            snapshot=CoreSWHID(object_type=ObjectType.CONTENT, object_id=EXAMPLE_HASH,),
            **_common_metadata_fields,
        )


def test_metadata_validate_context_release():
    """Checks validation of RawExtrinsicMetadata.release."""

    # Origins can't have a 'release' context
    with pytest.raises(
        ValueError, match="Unexpected 'release' context for origin object"
    ):
        RawExtrinsicMetadata(
            target=_origin_swhid,
            release=CoreSWHID(object_type=ObjectType.RELEASE, object_id=EXAMPLE_HASH,),
            **_common_metadata_fields,
        )

    # but content can
    RawExtrinsicMetadata(
        target=_content_swhid,
        release=CoreSWHID(object_type=ObjectType.RELEASE, object_id=EXAMPLE_HASH),
        **_common_metadata_fields,
    )

    # SWHID type doesn't match the expected type of this context key
    with pytest.raises(
        ValueError, match="Expected SWHID type 'release', got 'content'"
    ):
        RawExtrinsicMetadata(
            target=_content_swhid,
            release=CoreSWHID(object_type=ObjectType.CONTENT, object_id=EXAMPLE_HASH,),
            **_common_metadata_fields,
        )


def test_metadata_validate_context_revision():
    """Checks validation of RawExtrinsicMetadata.revision."""

    # Origins can't have a 'revision' context
    with pytest.raises(
        ValueError, match="Unexpected 'revision' context for origin object"
    ):
        RawExtrinsicMetadata(
            target=_origin_swhid,
            revision=CoreSWHID(
                object_type=ObjectType.REVISION, object_id=EXAMPLE_HASH,
            ),
            **_common_metadata_fields,
        )

    # but content can
    RawExtrinsicMetadata(
        target=_content_swhid,
        revision=CoreSWHID(object_type=ObjectType.REVISION, object_id=EXAMPLE_HASH),
        **_common_metadata_fields,
    )

    # SWHID type doesn't match the expected type of this context key
    with pytest.raises(
        ValueError, match="Expected SWHID type 'revision', got 'content'"
    ):
        RawExtrinsicMetadata(
            target=_content_swhid,
            revision=CoreSWHID(object_type=ObjectType.CONTENT, object_id=EXAMPLE_HASH,),
            **_common_metadata_fields,
        )


def test_metadata_validate_context_path():
    """Checks validation of RawExtrinsicMetadata.path."""

    # Origins can't have a 'path' context
    with pytest.raises(ValueError, match="Unexpected 'path' context for origin object"):
        RawExtrinsicMetadata(
            target=_origin_swhid, path=b"/foo/bar", **_common_metadata_fields,
        )

    # but content can
    RawExtrinsicMetadata(
        target=_content_swhid, path=b"/foo/bar", **_common_metadata_fields,
    )


def test_metadata_validate_context_directory():
    """Checks validation of RawExtrinsicMetadata.directory."""

    # Origins can't have a 'directory' context
    with pytest.raises(
        ValueError, match="Unexpected 'directory' context for origin object"
    ):
        RawExtrinsicMetadata(
            target=_origin_swhid,
            directory=CoreSWHID(
                object_type=ObjectType.DIRECTORY, object_id=EXAMPLE_HASH,
            ),
            **_common_metadata_fields,
        )

    # but content can
    RawExtrinsicMetadata(
        target=_content_swhid,
        directory=CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=EXAMPLE_HASH,),
        **_common_metadata_fields,
    )

    # SWHID type doesn't match the expected type of this context key
    with pytest.raises(
        ValueError, match="Expected SWHID type 'directory', got 'content'"
    ):
        RawExtrinsicMetadata(
            target=_content_swhid,
            directory=CoreSWHID(
                object_type=ObjectType.CONTENT, object_id=EXAMPLE_HASH,
            ),
            **_common_metadata_fields,
        )


def test_metadata_normalize_discovery_date():
    """discovery_date is type-checked, truncated to whole seconds and
    converted to UTC."""
    fields_copy = {**_common_metadata_fields}
    truncated_date = fields_copy.pop("discovery_date")
    assert truncated_date.microsecond == 0

    # Check for TypeError on disabled object type: we removed attrs_strict's
    # type_validator
    with pytest.raises(TypeError):
        RawExtrinsicMetadata(
            target=_content_swhid, discovery_date="not a datetime", **fields_copy
        )

    # Check for truncation to integral second
    date_with_us = truncated_date.replace(microsecond=42)
    md = RawExtrinsicMetadata(
        target=_content_swhid, discovery_date=date_with_us, **fields_copy,
    )

    assert md.discovery_date == truncated_date
    assert md.discovery_date.tzinfo == datetime.timezone.utc

    # Check that the timezone gets normalized. Timezones can be offset by a
    # non-integral number of seconds, so we need to handle that.
    timezone = datetime.timezone(offset=datetime.timedelta(hours=2))
    date_with_tz = truncated_date.astimezone(timezone)

    # Same instant, but no longer expressed in UTC
    assert date_with_tz.tzinfo != datetime.timezone.utc

    md = RawExtrinsicMetadata(
        target=_content_swhid, discovery_date=date_with_tz, **fields_copy,
    )

    assert md.discovery_date == truncated_date
    assert md.discovery_date.tzinfo == datetime.timezone.utc