def dictify(value):
    """Recursively convert ``value`` into plain Python data.

    Helper function used by ``BaseModel.to_dict()``:

    * model objects are replaced by the result of their ``to_dict()``;
    * enum members are replaced by their ``value``;
    * dicts and tuples are rebuilt with every item converted (keys are
      kept as they are);
    * anything else is returned unchanged.
    """
    if isinstance(value, BaseModel):
        return value.to_dict()
    if isinstance(value, Enum):
        return value.value
    if isinstance(value, dict):
        converted = {}
        for key, item in value.items():
            converted[key] = dictify(item)
        return converted
    if isinstance(value, tuple):
        return tuple(map(dictify, value))
    return value
@attr.s(frozen=True)
class Person(BaseModel):
    """Represents the author/committer of a revision or release."""

    object_type: Final = "person"

    fullname = attr.ib(type=bytes, validator=type_validator())
    name = attr.ib(type=Optional[bytes], validator=type_validator())
    email = attr.ib(type=Optional[bytes], validator=type_validator())

    @classmethod
    def from_fullname(cls, fullname: bytes):
        """Returns a Person object, by guessing the name and email from the
        fullname, in the `name <email>` format.

        The fullname is left unchanged.

        Parsing rules:

        * without any ``<``, the whole fullname is used as the name and the
          email is unset;
        * otherwise the name is what precedes the first ``<``, stripped of
          surrounding whitespace, and the email is what lies between that
          ``<`` and the last ``>`` (or everything after the ``<`` when there
          is no ``>``);
        * an empty name or email is stored as ``None``.

        Raises:
            TypeError: if ``fullname`` is ``None``.
        """
        if fullname is None:
            raise TypeError("fullname is None.")

        name: Optional[bytes]
        email: Optional[bytes]

        raw_name, open_bracket, raw_email = fullname.partition(b"<")
        if not open_bracket:
            # No email part at all: keep the fullname verbatim as the name.
            name = fullname
            email = None
        else:
            name = raw_name.strip()
            # Cut at the *last* closing bracket; keep everything when absent.
            before_close, close_bracket, _ = raw_email.rpartition(b">")
            email = before_close if close_bracket else raw_email

        # Build through ``cls`` so that subclasses get instances of their
        # own type from this alternate constructor.
        return cls(name=name or None, email=email or None, fullname=fullname)

    def anonymize(self) -> "Person":
        """Returns an anonymized version of the Person object.

        The anonymized Person has the SHA-256 digest of the original
        fullname as its fullname, and neither name nor email.
        """
        return Person(fullname=sha256(self.fullname).digest(), name=None, email=None,)
@attr.s(frozen=True)
class OriginVisit(BaseModel):
    """An origin visit with a given type at a given point in time, by a
    SWH loader."""

    object_type: Final = "origin_visit"

    origin = attr.ib(type=str, validator=type_validator())
    date = attr.ib(type=datetime.datetime, validator=type_validator())
    type = attr.ib(type=str, validator=type_validator())
    # Should not be set before calling 'origin_visit_add()'.
    visit = attr.ib(type=Optional[int], validator=type_validator(), default=None)

    def to_dict(self):
        """Returns the dict form of the visit, leaving out the ``visit`` key
        when the visit id is `None`."""
        serialized = super().to_dict()
        if serialized["visit"] is None:
            serialized.pop("visit")
        return serialized
@attr.s(frozen=True)
class SnapshotBranch(BaseModel):
    """Represents one of the branches of a snapshot."""

    object_type: Final = "snapshot_branch"

    target = attr.ib(type=bytes, validator=type_validator())
    target_type = attr.ib(type=TargetType, validator=type_validator())

    @target.validator
    def check_target(self, attribute, value):
        """Checks that the target of a non-alias branch has the size of a
        sha1_git.

        Targets of ``TargetType.ALIAS`` branches are exempt from the length
        check.

        Raises:
            ValueError: if a non-alias target is not ``SHA1_SIZE`` bytes long.
        """
        if (
            self.target_type != TargetType.ALIAS
            and value is not None
            and len(value) != SHA1_SIZE
        ):
            raise ValueError("Wrong length for bytes identifier: %d" % len(value))

    @classmethod
    def from_dict(cls, d):
        """Builds a branch from a dict with ``target`` and ``target_type``
        keys; ``target_type`` is converted to a :class:`TargetType`."""
        return cls(target=d["target"], target_type=TargetType(d["target_type"]))
@attr.s(frozen=True)
class Release(BaseModel, HashableObject):
    """Model class for a release."""

    object_type: Final = "release"

    name = attr.ib(type=bytes, validator=type_validator())
    message = attr.ib(type=Optional[bytes], validator=type_validator())
    target = attr.ib(type=Optional[Sha1Git], validator=type_validator())
    target_type = attr.ib(type=ObjectType, validator=type_validator())
    synthetic = attr.ib(type=bool, validator=type_validator())
    author = attr.ib(type=Optional[Person], validator=type_validator(), default=None)
    date = attr.ib(
        type=Optional[TimestampWithTimezone], validator=type_validator(), default=None
    )
    metadata = attr.ib(
        type=Optional[Dict[str, object]], validator=type_validator(), default=None
    )
    id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"")

    @staticmethod
    def compute_hash(object_dict):
        """Computes the release identifier from its dict representation."""
        return release_identifier(object_dict)

    @author.validator
    def check_author(self, attribute, value):
        """Rejects a release that has a date but no author."""
        if self.author is not None:
            return
        if self.date is not None:
            raise ValueError("release date must be None if author is None.")

    def to_dict(self):
        """Returns the dict form of the release, without the ``metadata``
        key when there is no metadata."""
        serialized = super().to_dict()
        if serialized["metadata"] is None:
            serialized.pop("metadata")
        return serialized

    @classmethod
    def from_dict(cls, d):
        """Builds a Release from its dict form, converting the nested
        author, date and target type; the input dict is not modified."""
        fields = d.copy()

        author = fields.get("author")
        if author:
            fields["author"] = Person.from_dict(author)

        date = fields.get("date")
        if date:
            fields["date"] = TimestampWithTimezone.from_dict(date)

        target_type = ObjectType(fields.pop("target_type"))
        return cls(target_type=target_type, **fields)

    def anonymize(self) -> "Release":
        """Returns a copy of the release whose author, if any, is replaced
        by its anonymized counterpart."""
        anonymized_author = self.author.anonymize() if self.author else self.author
        return attr.evolve(self, author=anonymized_author)
@attr.s(frozen=True)
class Directory(BaseModel, HashableObject):
    """Model class for a directory: a tuple of entries plus an identifier."""

    object_type: Final = "directory"

    entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=type_validator())
    id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"")

    @staticmethod
    def compute_hash(object_dict):
        """Computes the directory identifier from its dict representation."""
        return directory_identifier(object_dict)

    @classmethod
    def from_dict(cls, d):
        """Builds a Directory from its dict form, turning each item of
        ``entries`` into a DirectoryEntry; the input dict is not modified."""
        remaining = d.copy()
        raw_entries = remaining.pop("entries")
        entries = tuple(map(DirectoryEntry.from_dict, raw_entries))
        return cls(entries=entries, **remaining)
@attr.s(frozen=True)
class Content(BaseContent):
    """A content whose hashes and length are all known, with status
    ``visible`` or ``hidden``."""

    object_type: Final = "content"

    sha1 = attr.ib(type=bytes, validator=type_validator())
    sha1_git = attr.ib(type=Sha1Git, validator=type_validator())
    sha256 = attr.ib(type=bytes, validator=type_validator())
    blake2s256 = attr.ib(type=bytes, validator=type_validator())

    length = attr.ib(type=int, validator=type_validator())

    status = attr.ib(
        type=str,
        validator=attr.validators.in_(["visible", "hidden"]),
        default="visible",
    )

    data = attr.ib(type=Optional[bytes], validator=type_validator(), default=None)

    # Excluded from equality comparisons (eq=False).
    ctime = attr.ib(
        type=Optional[datetime.datetime],
        validator=type_validator(),
        default=None,
        eq=False,
    )

    @length.validator
    def check_length(self, attribute, value):
        """Checks the length is not negative."""
        if value < 0:
            raise ValueError("Length must be positive.")

    def to_dict(self):
        """Returns the dict form of the content, without the ``data`` key
        when no data is attached."""
        content = super().to_dict()
        if content["data"] is None:
            del content["data"]
        return content

    @classmethod
    def from_data(cls, data, status="visible", ctime=None) -> "Content":
        """Generate a Content from a given `data` byte string.

        This populates the Content with the hashes and length for the data
        passed as argument, as well as the data itself.
        """
        d = cls._hash_data(data)
        d["status"] = status
        d["ctime"] = ctime
        return cls(**d)

    @classmethod
    def from_dict(cls, d):
        """Builds a Content from its dict form.

        A ``ctime`` given as a string is parsed into a datetime first; the
        input dict is copied before being changed.
        """
        if isinstance(d.get("ctime"), str):
            d = d.copy()
            d["ctime"] = dateutil.parser.parse(d["ctime"])
        return super().from_dict(d, use_subclass=False)

    def with_data(self) -> "Content":
        """Loads the `data` attribute; meaning that it is guaranteed not to
        be None after this call.

        This call is almost a no-op, but subclasses may overload this method
        to lazy-load data (eg. from disk or objstorage).

        Raises:
            MissingData: if ``data`` is None.
        """
        if self.data is None:
            raise MissingData("Content data is None.")
        return self


@attr.s(frozen=True)
class SkippedContent(BaseContent):
    """A content with status ``absent``: no data is attached, and hashes
    and length may be unknown (``None``)."""

    object_type: Final = "skipped_content"

    sha1 = attr.ib(type=Optional[bytes], validator=type_validator())
    sha1_git = attr.ib(type=Optional[Sha1Git], validator=type_validator())
    sha256 = attr.ib(type=Optional[bytes], validator=type_validator())
    blake2s256 = attr.ib(type=Optional[bytes], validator=type_validator())

    length = attr.ib(type=Optional[int], validator=type_validator())

    status = attr.ib(type=str, validator=attr.validators.in_(["absent"]))
    reason = attr.ib(type=Optional[str], validator=type_validator(), default=None)

    origin = attr.ib(type=Optional[str], validator=type_validator(), default=None)

    # Excluded from equality comparisons (eq=False).
    ctime = attr.ib(
        type=Optional[datetime.datetime],
        validator=type_validator(),
        default=None,
        eq=False,
    )

    @reason.validator
    def check_reason(self, attribute, value):
        """Checks that a reason is provided.

        Raises:
            ValueError: if the reason is None.
        """
        # Sanity check: the validator must receive the attribute's own value.
        assert self.reason == value
        if value is None:
            raise ValueError("Must provide a reason if content is absent.")

    @length.validator
    def check_length(self, attribute, value):
        """Checks the length, when known, is positive or -1.

        ``length`` is declared optional, so None is accepted here instead of
        failing with a TypeError on the comparison.
        """
        if value is not None and value < -1:
            raise ValueError("Length must be positive or -1.")

    def to_dict(self):
        """Returns the dict form of the content, without the ``origin`` key
        when no origin is set."""
        content = super().to_dict()
        if content["origin"] is None:
            del content["origin"]
        return content

    @classmethod
    def from_data(
        cls, data: bytes, reason: str, ctime: Optional[datetime.datetime] = None
    ) -> "SkippedContent":
        """Generate a SkippedContent from a given `data` byte string.

        This populates the SkippedContent with the hashes and length for the
        data passed as argument.

        You can use `attr.evolve` on such a generated content to nullify some
        of its attributes, e.g. for tests.
        """
        d = cls._hash_data(data)
        # A skipped content never carries the data itself.
        del d["data"]
        d["status"] = "absent"
        d["reason"] = reason
        d["ctime"] = ctime
        return cls(**d)

    @classmethod
    def from_dict(cls, d):
        """Builds a SkippedContent from its dict form.

        Raises:
            ValueError: if the dict has a ``data`` entry that is not None.
        """
        d2 = d.copy()
        if d2.pop("data", None) is not None:
            raise ValueError('SkippedContent has no "data" attribute %r' % d)
        return super().from_dict(d2, use_subclass=False)
@id.validator
def check_id(self, attribute, value):
    """Validates ``id`` against the target ``type``.

    For every target type other than ORIGIN the id must pass
    ``_check_pid`` for that type. For ORIGIN the id must not be a SWHID
    object nor a string starting with ``swh:``.

    Raises:
        ValueError: if the id does not fit the target type.
    """
    if self.type != MetadataTargetType.ORIGIN:
        self._check_pid(self.type.value, value)
        return

    looks_like_swhid = isinstance(value, SWHID) or value.startswith("swh:")
    if looks_like_swhid:
        raise ValueError("Got SWHID as id for origin metadata (expected an URL).")
@visit.validator
def check_visit(self, attribute, value):
    """Validates the optional ``visit`` context.

    An unset visit is always accepted. Otherwise the target must be a
    snapshot, release, revision, directory or content, the ``origin``
    context must be set too, and the visit id must be strictly positive.

    Raises:
        ValueError: if any of these conditions is not met.
    """
    if value is None:
        return

    allowed_targets = (
        MetadataTargetType.SNAPSHOT,
        MetadataTargetType.RELEASE,
        MetadataTargetType.REVISION,
        MetadataTargetType.DIRECTORY,
        MetadataTargetType.CONTENT,
    )
    if self.type not in allowed_targets:
        raise ValueError(
            f"Unexpected 'visit' context for {self.type.value} object: {value}"
        )

    if self.origin is None:
        raise ValueError("'origin' context must be set if 'visit' is.")

    if not value > 0:
        raise ValueError("Nonpositive visit id")
if value is None: + return + + if self.type not in (MetadataTargetType.CONTENT,): + raise ValueError( + f"Unexpected 'directory' context for {self.type.value} object: {value}" + ) + + self._check_pid("directory", value) + + def _check_pid(self, expected_object_type, pid): + if isinstance(pid, str): + raise ValueError(f"Expected SWHID, got a string: {pid}") + + if pid.object_type != expected_object_type: + raise ValueError( + f"Expected SWHID type '{expected_object_type}', " + f"got '{pid.object_type}' in {pid}" + ) + + if pid.metadata: + raise ValueError(f"Expected core SWHID, but got: {pid}") diff --git a/swh/model/tests/test_model.py b/swh/model/tests/test_model.py index edfc829..43c32a0 100644 --- a/swh/model/tests/test_model.py +++ b/swh/model/tests/test_model.py @@ -1,680 +1,1087 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime import attr from attrs_strict import AttributeTypeError from hypothesis import given from hypothesis.strategies import binary import pytest from swh.model.model import ( BaseModel, Content, SkippedContent, Directory, Revision, Release, Snapshot, Origin, Timestamp, TimestampWithTimezone, MissingData, Person, + RawExtrinsicMetadata, + MetadataTargetType, + MetadataAuthority, + MetadataAuthorityType, + MetadataFetcher, ) from swh.model.hashutil import hash_to_bytes, MultiHash import swh.model.hypothesis_strategies as strategies from swh.model.identifiers import ( directory_identifier, revision_identifier, release_identifier, snapshot_identifier, + parse_swhid, + SWHID, ) from swh.model.tests.test_identifiers import ( directory_example, revision_example, release_example, snapshot_example, ) @given(strategies.objects()) def test_todict_inverse_fromdict(objtype_and_obj): (obj_type, obj) = objtype_and_obj if 
obj_type in ("origin", "origin_visit"): return obj_as_dict = obj.to_dict() obj_as_dict_copy = copy.deepcopy(obj_as_dict) # Check the composition of to_dict and from_dict is the identity assert obj == type(obj).from_dict(obj_as_dict) # Check from_dict() does not change the input dict assert obj_as_dict == obj_as_dict_copy # Check the composition of from_dict and to_dict is the identity assert obj_as_dict == type(obj).from_dict(obj_as_dict).to_dict() # Anonymization @given(strategies.objects()) def test_anonymization(objtype_and_obj): (obj_type, obj) = objtype_and_obj def check_person(p): if p is not None: assert p.name is None assert p.email is None assert len(p.fullname) == 32 anon_obj = obj.anonymize() if obj_type == "person": assert anon_obj is not None check_person(anon_obj) elif obj_type == "release": assert anon_obj is not None check_person(anon_obj.author) elif obj_type == "revision": assert anon_obj is not None check_person(anon_obj.author) check_person(anon_obj.committer) else: assert anon_obj is None # Origin, OriginVisit @given(strategies.origins()) def test_todict_origins(origin): obj = origin.to_dict() assert "type" not in obj assert type(origin)(url=origin.url) == type(origin).from_dict(obj) @given(strategies.origin_visits()) def test_todict_origin_visits(origin_visit): obj = origin_visit.to_dict() assert origin_visit == type(origin_visit).from_dict(obj) @given(strategies.origin_visit_statuses()) def test_todict_origin_visit_statuses(origin_visit_status): obj = origin_visit_status.to_dict() assert origin_visit_status == type(origin_visit_status).from_dict(obj) # Timestamp @given(strategies.timestamps()) def test_timestamps_strategy(timestamp): attr.validate(timestamp) def test_timestamp_seconds(): attr.validate(Timestamp(seconds=0, microseconds=0)) with pytest.raises(AttributeTypeError): Timestamp(seconds="0", microseconds=0) attr.validate(Timestamp(seconds=2 ** 63 - 1, microseconds=0)) with pytest.raises(ValueError): Timestamp(seconds=2 ** 63, 
microseconds=0) attr.validate(Timestamp(seconds=-(2 ** 63), microseconds=0)) with pytest.raises(ValueError): Timestamp(seconds=-(2 ** 63) - 1, microseconds=0) def test_timestamp_microseconds(): attr.validate(Timestamp(seconds=0, microseconds=0)) with pytest.raises(AttributeTypeError): Timestamp(seconds=0, microseconds="0") attr.validate(Timestamp(seconds=0, microseconds=10 ** 6 - 1)) with pytest.raises(ValueError): Timestamp(seconds=0, microseconds=10 ** 6) with pytest.raises(ValueError): Timestamp(seconds=0, microseconds=-1) def test_timestamp_from_dict(): assert Timestamp.from_dict({"seconds": 10, "microseconds": 5}) with pytest.raises(AttributeTypeError): Timestamp.from_dict({"seconds": "10", "microseconds": 5}) with pytest.raises(AttributeTypeError): Timestamp.from_dict({"seconds": 10, "microseconds": "5"}) with pytest.raises(ValueError): Timestamp.from_dict({"seconds": 0, "microseconds": -1}) Timestamp.from_dict({"seconds": 0, "microseconds": 10 ** 6 - 1}) with pytest.raises(ValueError): Timestamp.from_dict({"seconds": 0, "microseconds": 10 ** 6}) # TimestampWithTimezone def test_timestampwithtimezone(): ts = Timestamp(seconds=0, microseconds=0) tstz = TimestampWithTimezone(timestamp=ts, offset=0, negative_utc=False) attr.validate(tstz) assert tstz.negative_utc is False attr.validate(TimestampWithTimezone(timestamp=ts, offset=10, negative_utc=False)) attr.validate(TimestampWithTimezone(timestamp=ts, offset=-10, negative_utc=False)) tstz = TimestampWithTimezone(timestamp=ts, offset=0, negative_utc=True) attr.validate(tstz) assert tstz.negative_utc is True with pytest.raises(AttributeTypeError): TimestampWithTimezone( timestamp=datetime.datetime.now(), offset=0, negative_utc=False ) with pytest.raises(AttributeTypeError): TimestampWithTimezone(timestamp=ts, offset="0", negative_utc=False) with pytest.raises(AttributeTypeError): TimestampWithTimezone(timestamp=ts, offset=1.0, negative_utc=False) with pytest.raises(AttributeTypeError): 
TimestampWithTimezone(timestamp=ts, offset=1, negative_utc=0) with pytest.raises(ValueError): TimestampWithTimezone(timestamp=ts, offset=1, negative_utc=True) with pytest.raises(ValueError): TimestampWithTimezone(timestamp=ts, offset=-1, negative_utc=True) def test_timestampwithtimezone_from_datetime(): tz = datetime.timezone(datetime.timedelta(minutes=+60)) date = datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=tz) tstz = TimestampWithTimezone.from_datetime(date) assert tstz == TimestampWithTimezone( timestamp=Timestamp(seconds=1582810759, microseconds=0,), offset=60, negative_utc=False, ) def test_timestampwithtimezone_from_iso8601(): date = "2020-02-27 14:39:19.123456+0100" tstz = TimestampWithTimezone.from_iso8601(date) assert tstz == TimestampWithTimezone( timestamp=Timestamp(seconds=1582810759, microseconds=123456,), offset=60, negative_utc=False, ) def test_timestampwithtimezone_from_iso8601_negative_utc(): date = "2020-02-27 13:39:19-0000" tstz = TimestampWithTimezone.from_iso8601(date) assert tstz == TimestampWithTimezone( timestamp=Timestamp(seconds=1582810759, microseconds=0,), offset=0, negative_utc=True, ) def test_person_from_fullname(): """The author should have name, email and fullname filled. """ actual_person = Person.from_fullname(b"tony ") assert actual_person == Person( fullname=b"tony ", name=b"tony", email=b"ynot@dagobah", ) def test_person_from_fullname_no_email(): """The author and fullname should be the same as the input (author). """ actual_person = Person.from_fullname(b"tony") assert actual_person == Person(fullname=b"tony", name=b"tony", email=None,) def test_person_from_fullname_empty_person(): """Empty person has only its fullname filled with the empty byte-string. 
""" actual_person = Person.from_fullname(b"") assert actual_person == Person(fullname=b"", name=None, email=None,) def test_git_author_line_to_author(): # edge case out of the way with pytest.raises(TypeError): Person.from_fullname(None) tests = { b"a ": Person(name=b"a", email=b"b@c.com", fullname=b"a ",), b"": Person( name=None, email=b"foo@bar.com", fullname=b"", ), b"malformed ': Person( name=b"malformed", email=b'"', ), b"trailing ": Person( name=b"trailing", email=b"sp@c.e", fullname=b"trailing ", ), b"no": Person(name=b"no", email=b"sp@c.e", fullname=b"no",), b" more ": Person( name=b"more", email=b"sp@c.es", fullname=b" more ", ), b" <>": Person(name=None, email=None, fullname=b" <>",), } for person in sorted(tests): expected_person = tests[person] assert expected_person == Person.from_fullname(person) # Content def test_content_get_hash(): hashes = dict(sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux") c = Content(length=42, status="visible", **hashes) for (hash_name, hash_) in hashes.items(): assert c.get_hash(hash_name) == hash_ def test_content_hashes(): hashes = dict(sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux") c = Content(length=42, status="visible", **hashes) assert c.hashes() == hashes def test_content_data(): c = Content( length=42, status="visible", data=b"foo", sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux", ) assert c.with_data() == c def test_content_data_missing(): c = Content( length=42, status="visible", sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux", ) with pytest.raises(MissingData): c.with_data() @given(strategies.present_contents_d()) def test_content_from_dict(content_d): c = Content.from_data(**content_d) assert c assert c.ctime == content_d["ctime"] content_d2 = c.to_dict() c2 = Content.from_dict(content_d2) assert c2.ctime == c.ctime def test_content_from_dict_str_ctime(): # test with ctime as a string n = datetime.datetime(2020, 5, 6, 12, 34) content_d = { "ctime": 
n.isoformat(), "data": b"", "length": 0, "sha1": b"\x00", "sha256": b"\x00", "sha1_git": b"\x00", "blake2s256": b"\x00", } c = Content.from_dict(content_d) assert c.ctime == n @given(binary(max_size=4096)) def test_content_from_data(data): c = Content.from_data(data) assert c.data == data assert c.length == len(data) assert c.status == "visible" for key, value in MultiHash.from_data(data).digest().items(): assert getattr(c, key) == value @given(binary(max_size=4096)) def test_hidden_content_from_data(data): c = Content.from_data(data, status="hidden") assert c.data == data assert c.length == len(data) assert c.status == "hidden" for key, value in MultiHash.from_data(data).digest().items(): assert getattr(c, key) == value # SkippedContent @given(binary(max_size=4096)) def test_skipped_content_from_data(data): c = SkippedContent.from_data(data, reason="reason") assert c.reason == "reason" assert c.length == len(data) assert c.status == "absent" for key, value in MultiHash.from_data(data).digest().items(): assert getattr(c, key) == value @given(strategies.skipped_contents_d()) def test_skipped_content_origin_is_str(skipped_content_d): assert SkippedContent.from_dict(skipped_content_d) skipped_content_d["origin"] = "http://path/to/origin" assert SkippedContent.from_dict(skipped_content_d) skipped_content_d["origin"] = Origin(url="http://path/to/origin") with pytest.raises(ValueError, match="origin"): SkippedContent.from_dict(skipped_content_d) # Revision def test_revision_extra_headers_no_headers(): rev_dict = revision_example.copy() rev_dict.pop("id") rev = Revision.from_dict(rev_dict) rev_dict = attr.asdict(rev, recurse=False) rev_model = Revision(**rev_dict) assert rev_model.metadata is None assert rev_model.extra_headers == () rev_dict["metadata"] = { "something": "somewhere", "some other thing": "stranger", } rev_model = Revision(**rev_dict) assert rev_model.metadata == rev_dict["metadata"] assert rev_model.extra_headers == () def 
test_revision_extra_headers_with_headers(): rev_dict = revision_example.copy() rev_dict.pop("id") rev = Revision.from_dict(rev_dict) rev_dict = attr.asdict(rev, recurse=False) rev_dict["metadata"] = { "something": "somewhere", "some other thing": "stranger", } extra_headers = ( (b"header1", b"value1"), (b"header2", b"42"), (b"header3", b"should I?\u0000"), (b"header1", b"again"), ) rev_dict["extra_headers"] = extra_headers rev_model = Revision(**rev_dict) assert "extra_headers" not in rev_model.metadata assert rev_model.extra_headers == extra_headers def test_revision_extra_headers_in_metadata(): rev_dict = revision_example.copy() rev_dict.pop("id") rev = Revision.from_dict(rev_dict) rev_dict = attr.asdict(rev, recurse=False) rev_dict["metadata"] = { "something": "somewhere", "some other thing": "stranger", } extra_headers = ( (b"header1", b"value1"), (b"header2", b"42"), (b"header3", b"should I?\u0000"), (b"header1", b"again"), ) # check the bw-compat init hook does the job # ie. extra_headers are given in the metadata field rev_dict["metadata"]["extra_headers"] = extra_headers rev_model = Revision(**rev_dict) assert "extra_headers" not in rev_model.metadata assert rev_model.extra_headers == extra_headers def test_revision_extra_headers_as_lists(): rev_dict = revision_example.copy() rev_dict.pop("id") rev = Revision.from_dict(rev_dict) rev_dict = attr.asdict(rev, recurse=False) rev_dict["metadata"] = {} extra_headers = ( (b"header1", b"value1"), (b"header2", b"42"), (b"header3", b"should I?\u0000"), (b"header1", b"again"), ) # check Revision.extra_headers tuplify does the job rev_dict["extra_headers"] = [list(x) for x in extra_headers] rev_model = Revision(**rev_dict) assert "extra_headers" not in rev_model.metadata assert rev_model.extra_headers == extra_headers def test_revision_extra_headers_type_error(): rev_dict = revision_example.copy() rev_dict.pop("id") rev = Revision.from_dict(rev_dict) orig_rev_dict = attr.asdict(rev, recurse=False) 
orig_rev_dict["metadata"] = { "something": "somewhere", "some other thing": "stranger", } extra_headers = ( ("header1", b"value1"), (b"header2", 42), ("header1", "again"), ) # check headers one at a time # if given as extra_header for extra_header in extra_headers: rev_dict = copy.deepcopy(orig_rev_dict) rev_dict["extra_headers"] = (extra_header,) with pytest.raises(AttributeTypeError): Revision(**rev_dict) # if given as metadata for extra_header in extra_headers: rev_dict = copy.deepcopy(orig_rev_dict) rev_dict["metadata"]["extra_headers"] = (extra_header,) with pytest.raises(AttributeTypeError): Revision(**rev_dict) def test_revision_extra_headers_from_dict(): rev_dict = revision_example.copy() rev_dict.pop("id") rev_model = Revision.from_dict(rev_dict) assert rev_model.metadata is None assert rev_model.extra_headers == () rev_dict["metadata"] = { "something": "somewhere", "some other thing": "stranger", } rev_model = Revision.from_dict(rev_dict) assert rev_model.metadata == rev_dict["metadata"] assert rev_model.extra_headers == () extra_headers = ( (b"header1", b"value1"), (b"header2", b"42"), (b"header3", b"should I?\nmaybe\x00\xff"), (b"header1", b"again"), ) rev_dict["extra_headers"] = extra_headers rev_model = Revision.from_dict(rev_dict) assert "extra_headers" not in rev_model.metadata assert rev_model.extra_headers == extra_headers def test_revision_extra_headers_in_metadata_from_dict(): rev_dict = revision_example.copy() rev_dict.pop("id") rev_dict["metadata"] = { "something": "somewhere", "some other thing": "stranger", } extra_headers = ( (b"header1", b"value1"), (b"header2", b"42"), (b"header3", b"should I?\nmaybe\x00\xff"), (b"header1", b"again"), ) # check the bw-compat init hook does the job rev_dict["metadata"]["extra_headers"] = extra_headers rev_model = Revision.from_dict(rev_dict) assert "extra_headers" not in rev_model.metadata assert rev_model.extra_headers == extra_headers def test_revision_extra_headers_as_lists_from_dict(): rev_dict = 
revision_example.copy() rev_dict.pop("id") rev_model = Revision.from_dict(rev_dict) rev_dict["metadata"] = { "something": "somewhere", "some other thing": "stranger", } extra_headers = ( (b"header1", b"value1"), (b"header2", b"42"), (b"header3", b"should I?\nmaybe\x00\xff"), (b"header1", b"again"), ) # check Revision.extra_headers converter does the job rev_dict["extra_headers"] = [list(x) for x in extra_headers] rev_model = Revision.from_dict(rev_dict) assert "extra_headers" not in rev_model.metadata assert rev_model.extra_headers == extra_headers # ID computation def test_directory_model_id_computation(): dir_dict = directory_example.copy() del dir_dict["id"] dir_id = hash_to_bytes(directory_identifier(dir_dict)) dir_model = Directory.from_dict(dir_dict) assert dir_model.id == dir_id def test_revision_model_id_computation(): rev_dict = revision_example.copy() del rev_dict["id"] rev_id = hash_to_bytes(revision_identifier(rev_dict)) rev_model = Revision.from_dict(rev_dict) assert rev_model.id == rev_id def test_revision_model_id_computation_with_no_date(): """We can have revision with date to None """ rev_dict = revision_example.copy() rev_dict["date"] = None rev_dict["committer_date"] = None del rev_dict["id"] rev_id = hash_to_bytes(revision_identifier(rev_dict)) rev_model = Revision.from_dict(rev_dict) assert rev_model.date is None assert rev_model.committer_date is None assert rev_model.id == rev_id def test_release_model_id_computation(): rel_dict = release_example.copy() del rel_dict["id"] rel_id = hash_to_bytes(release_identifier(rel_dict)) rel_model = Release.from_dict(rel_dict) assert isinstance(rel_model.date, TimestampWithTimezone) assert rel_model.id == hash_to_bytes(rel_id) def test_snapshot_model_id_computation(): snp_dict = snapshot_example.copy() del snp_dict["id"] snp_id = hash_to_bytes(snapshot_identifier(snp_dict)) snp_model = Snapshot.from_dict(snp_dict) assert snp_model.id == snp_id @given(strategies.objects(split_content=True)) def 
test_object_type(objtype_and_obj): obj_type, obj = objtype_and_obj assert obj_type == obj.object_type def test_object_type_is_final(): object_types = set() def check_final(cls): if hasattr(cls, "object_type"): assert cls.object_type not in object_types object_types.add(cls.object_type) if cls.__subclasses__(): assert not hasattr(cls, "object_type") for subcls in cls.__subclasses__(): check_final(subcls) check_final(BaseModel) + + +_metadata_authority = MetadataAuthority( + type=MetadataAuthorityType.FORGE, url="https://forge.softwareheritage.org", +) +_metadata_fetcher = MetadataFetcher(name="test-fetcher", version="0.0.1",) +_content_swhid = parse_swhid("swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2") +_origin_url = "https://forge.softwareheritage.org/source/swh-model.git" +_common_metadata_fields = dict( + discovery_date=datetime.datetime.now(), + authority=_metadata_authority, + fetcher=_metadata_fetcher, + format="json", + metadata=b'{"foo": "bar"}', +) + + +def test_metadata_valid(): + """Checks valid RawExtrinsicMetadata objects don't raise an error.""" + + # Simplest case + RawExtrinsicMetadata( + type=MetadataTargetType.ORIGIN, id=_origin_url, **_common_metadata_fields + ) + + # Object with an SWHID + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, id=_content_swhid, **_common_metadata_fields + ) + + +def test_metadata_invalid_id(): + """Checks various invalid values for the 'id' field.""" + + # SWHID for an origin + with pytest.raises(ValueError, match="expected an URL"): + RawExtrinsicMetadata( + type=MetadataTargetType.ORIGIN, id=_content_swhid, **_common_metadata_fields + ) + + # SWHID for an origin (even when passed as string) + with pytest.raises(ValueError, match="expected an URL"): + RawExtrinsicMetadata( + type=MetadataTargetType.ORIGIN, + id="swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2", + **_common_metadata_fields, + ) + + # URL for a non-origin + with pytest.raises(ValueError, match="Expected SWHID, got a string"): + 
RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, id=_origin_url, **_common_metadata_fields + ) + + # SWHID passed as string instead of SWHID + with pytest.raises(ValueError, match="Expected SWHID, got a string"): + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id="swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2", + **_common_metadata_fields, + ) + + # Object type does not match the SWHID + with pytest.raises( + ValueError, match="Expected SWHID type 'revision', got 'content'" + ): + RawExtrinsicMetadata( + type=MetadataTargetType.REVISION, + id=_content_swhid, + **_common_metadata_fields, + ) + + # Non-core SWHID + with pytest.raises(ValueError, match="Expected core SWHID"): + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=SWHID( + object_type="content", + object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", + metadata={"foo": "bar"}, + ), + **_common_metadata_fields, + ) + + +def test_metadata_validate_context_origin(): + """Checks validation of RawExtrinsicMetadata.origin.""" + + # Origins can't have an 'origin' context + with pytest.raises( + ValueError, match="Unexpected 'origin' context for origin object" + ): + RawExtrinsicMetadata( + type=MetadataTargetType.ORIGIN, + id=_origin_url, + origin=_origin_url, + **_common_metadata_fields, + ) + + # but all other types can + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=_content_swhid, + origin=_origin_url, + **_common_metadata_fields, + ) + + # SWHIDs aren't valid origin URLs + with pytest.raises(ValueError, match="SWHID used as context origin URL"): + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=_content_swhid, + origin="swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2", + **_common_metadata_fields, + ) + + +def test_metadata_validate_context_visit(): + """Checks validation of RawExtrinsicMetadata.visit.""" + + # Origins can't have a 'visit' context + with pytest.raises( + ValueError, match="Unexpected 'visit' context for origin object" + 
): + RawExtrinsicMetadata( + type=MetadataTargetType.ORIGIN, + id=_origin_url, + visit=42, + **_common_metadata_fields, + ) + + # but all other types can + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=_content_swhid, + origin=_origin_url, + visit=42, + **_common_metadata_fields, + ) + + # Missing 'origin' + with pytest.raises(ValueError, match="'origin' context must be set if 'visit' is"): + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=_content_swhid, + visit=42, + **_common_metadata_fields, + ) + + # visit id must be positive + with pytest.raises(ValueError, match="Nonpositive visit id"): + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=_content_swhid, + origin=_origin_url, + visit=-42, + **_common_metadata_fields, + ) + + +def test_metadata_validate_context_snapshot(): + """Checks validation of RawExtrinsicMetadata.snapshot.""" + + # Origins can't have a 'snapshot' context + with pytest.raises( + ValueError, match="Unexpected 'snapshot' context for origin object" + ): + RawExtrinsicMetadata( + type=MetadataTargetType.ORIGIN, + id=_origin_url, + snapshot=SWHID( + object_type="snapshot", + object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", + ), + **_common_metadata_fields, + ) + + # but content can + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=_content_swhid, + snapshot=SWHID( + object_type="snapshot", object_id="94a9ed024d3859793618152ea559a168bbcbb5e2" + ), + **_common_metadata_fields, + ) + + # Non-core SWHID + with pytest.raises(ValueError, match="Expected core SWHID"): + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=_content_swhid, + snapshot=SWHID( + object_type="snapshot", + object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", + metadata={"foo": "bar"}, + ), + **_common_metadata_fields, + ) + + # SWHID type doesn't match the expected type of this context key + with pytest.raises( + ValueError, match="Expected SWHID type 'snapshot', got 'content'" + ): + 
RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=_content_swhid, + snapshot=SWHID( + object_type="content", + object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", + ), + **_common_metadata_fields, + ) + + +def test_metadata_validate_context_release(): + """Checks validation of RawExtrinsicMetadata.release.""" + + # Origins can't have a 'release' context + with pytest.raises( + ValueError, match="Unexpected 'release' context for origin object" + ): + RawExtrinsicMetadata( + type=MetadataTargetType.ORIGIN, + id=_origin_url, + release=SWHID( + object_type="release", + object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", + ), + **_common_metadata_fields, + ) + + # but content can + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=_content_swhid, + release=SWHID( + object_type="release", object_id="94a9ed024d3859793618152ea559a168bbcbb5e2" + ), + **_common_metadata_fields, + ) + + # Non-core SWHID + with pytest.raises(ValueError, match="Expected core SWHID"): + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=_content_swhid, + release=SWHID( + object_type="release", + object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", + metadata={"foo": "bar"}, + ), + **_common_metadata_fields, + ) + + # SWHID type doesn't match the expected type of this context key + with pytest.raises( + ValueError, match="Expected SWHID type 'release', got 'content'" + ): + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=_content_swhid, + release=SWHID( + object_type="content", + object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", + ), + **_common_metadata_fields, + ) + + +def test_metadata_validate_context_revision(): + """Checks validation of RawExtrinsicMetadata.revision.""" + + # Origins can't have a 'revision' context + with pytest.raises( + ValueError, match="Unexpected 'revision' context for origin object" + ): + RawExtrinsicMetadata( + type=MetadataTargetType.ORIGIN, + id=_origin_url, + revision=SWHID( + object_type="revision", + 
object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", + ), + **_common_metadata_fields, + ) + + # but content can + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=_content_swhid, + revision=SWHID( + object_type="revision", object_id="94a9ed024d3859793618152ea559a168bbcbb5e2" + ), + **_common_metadata_fields, + ) + + # Non-core SWHID + with pytest.raises(ValueError, match="Expected core SWHID"): + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=_content_swhid, + revision=SWHID( + object_type="revision", + object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", + metadata={"foo": "bar"}, + ), + **_common_metadata_fields, + ) + + # SWHID type doesn't match the expected type of this context key + with pytest.raises( + ValueError, match="Expected SWHID type 'revision', got 'content'" + ): + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=_content_swhid, + revision=SWHID( + object_type="content", + object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", + ), + **_common_metadata_fields, + ) + + +def test_metadata_validate_context_path(): + """Checks validation of RawExtrinsicMetadata.path.""" + + # Origins can't have a 'path' context + with pytest.raises(ValueError, match="Unexpected 'path' context for origin object"): + RawExtrinsicMetadata( + type=MetadataTargetType.ORIGIN, + id=_origin_url, + path=b"/foo/bar", + **_common_metadata_fields, + ) + + # but content can + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=_content_swhid, + path=b"/foo/bar", + **_common_metadata_fields, + ) + + +def test_metadata_validate_context_directory(): + """Checks validation of RawExtrinsicMetadata.directory.""" + + # Origins can't have a 'directory' context + with pytest.raises( + ValueError, match="Unexpected 'directory' context for origin object" + ): + RawExtrinsicMetadata( + type=MetadataTargetType.ORIGIN, + id=_origin_url, + directory=SWHID( + object_type="directory", + object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", + ), + 
**_common_metadata_fields, + ) + + # but content can + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=_content_swhid, + directory=SWHID( + object_type="directory", + object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", + ), + **_common_metadata_fields, + ) + + # Non-core SWHID + with pytest.raises(ValueError, match="Expected core SWHID"): + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=_content_swhid, + directory=SWHID( + object_type="directory", + object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", + metadata={"foo": "bar"}, + ), + **_common_metadata_fields, + ) + + # SWHID type doesn't match the expected type of this context key + with pytest.raises( + ValueError, match="Expected SWHID type 'directory', got 'content'" + ): + RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=_content_swhid, + directory=SWHID( + object_type="content", + object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", + ), + **_common_metadata_fields, + )