# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from collections.abc import Mapping
from typing import Dict, Generic, Iterable, Optional, Tuple, TypeVar, Union

KT = TypeVar("KT")
VT = TypeVar("VT")


class ImmutableDict(Mapping, Generic[KT, VT]):
    """A read-only mapping backed by a tuple of ``(key, value)`` pairs.

    Lookups scan the tuple linearly, so this is meant for small mappings.
    No ``__setitem__``/``__delitem__`` is defined, so item assignment and
    deletion raise ``TypeError``.

    Args:
        data: a mapping (``dict``, another ``ImmutableDict``, or any other
            ``Mapping``) or an iterable of ``(key, value)`` pairs.
            Defaults to an empty mapping.
    """

    data: Tuple[Tuple[KT, VT], ...]

    def __init__(
        self,
        data: Union[
            Iterable[Tuple[KT, VT]], "ImmutableDict[KT, VT]", Dict[KT, VT]
        ] = (),
    ):
        if isinstance(data, ImmutableDict):
            # The backing tuple is immutable, so it can be shared safely.
            self.data = data.data
        elif isinstance(data, Mapping):
            # Covers dict and any other mapping. Iterating a mapping
            # directly would only yield its keys, not (key, value) pairs.
            self.data = tuple(data.items())
        else:
            self.data = tuple(data)

    def __repr__(self):
        return f"ImmutableDict({dict(self.data)!r})"

    def __getitem__(self, key):
        for (k, v) in self.data:
            if k == key:
                return v
        raise KeyError(key)

    def __iter__(self):
        for (k, _) in self.data:
            yield k

    def __len__(self):
        return len(self.data)

    def __hash__(self):
        """Order-independent hash, consistent with ``Mapping.__eq__``.

        Raises:
            TypeError: if a key or a value is not hashable.
        """
        # Mapping.__eq__ compares dict(self.items()); hash the same view so
        # that equal mappings always hash equally.
        return hash(frozenset(dict(self.data).items()))

    def items(self):
        """Yields the ``(key, value)`` pairs, in insertion order."""
        yield from self.data

    def copy_pop(self, popped_key) -> Tuple[Optional[VT], "ImmutableDict[KT, VT]"]:
        """Returns a copy of this ImmutableDict without the given key,
        as well as the value associated to the key.

        If the key is absent, the returned value is ``None`` and the copy
        holds the same items as this object."""
        popped_value = None
        remaining = []
        for item in self.data:
            if item[0] == popped_key:
                popped_value = item[1]
            else:
                remaining.append(item)

        return (popped_value, ImmutableDict(remaining))
KT = TypeVar("KT")
VT = TypeVar("VT")


def freeze_optional_dict(
    d: Union[None, Dict[KT, VT], ImmutableDict[KT, VT]]  # type: ignore
) -> Optional[ImmutableDict[KT, VT]]:
    """Attribute converter: wrap a built-in ``dict`` in an ImmutableDict.

    Any other value (``None``, an existing ``ImmutableDict``, ...) is
    returned unchanged.
    """
    return ImmutableDict(d) if isinstance(d, dict) else d
class MetadataAuthorityType(Enum):
    """Enumeration of the kinds of metadata authority.

    Each member's value is the lowercase string form of its name.
    """

    DEPOSIT = "deposit"
    FORGE = "forge"
    REGISTRY = "registry"
@attr.s(frozen=True)
class MetadataAuthority(BaseModel):
    """Represents an entity that provides metadata about an origin or
    software artifact."""

    type = attr.ib(type=MetadataAuthorityType, validator=type_validator())
    url = attr.ib(type=str, validator=type_validator())
    # Optional free-form mapping; plain dicts are frozen by the converter.
    metadata = attr.ib(
        type=Optional[ImmutableDict[str, Any]],
        default=None,
        validator=type_validator(),
        converter=freeze_optional_dict,
    )


@attr.s(frozen=True)
class MetadataFetcher(BaseModel):
    """Represents a software component used to fetch metadata from a metadata
    authority, and ingest them into the Software Heritage archive."""

    name = attr.ib(type=str, validator=type_validator())
    version = attr.ib(type=str, validator=type_validator())
    # Optional free-form mapping; plain dicts are frozen by the converter.
    metadata = attr.ib(
        type=Optional[ImmutableDict[str, Any]],
        default=None,
        validator=type_validator(),
        converter=freeze_optional_dict,
    )


class MetadataTargetType(Enum):
    """The type of object extrinsic metadata refer to."""

    CONTENT = "content"
    DIRECTORY = "directory"
    REVISION = "revision"
    RELEASE = "release"
    SNAPSHOT = "snapshot"
    ORIGIN = "origin"


@attr.s(frozen=True)
class RawExtrinsicMetadata(BaseModel):
    """Raw metadata about a target object, plus its source and context.

    The validators below restrict which context fields may be set for each
    target type, and require SWHID-valued fields to be core SWHIDs of the
    expected object type.
    """

    # target object
    type = attr.ib(type=MetadataTargetType, validator=type_validator())
    # URL if type=MetadataTargetType.ORIGIN, else core SWHID
    id = attr.ib(type=Union[str, SWHID], validator=type_validator())

    # source
    discovery_date = attr.ib(type=datetime.datetime, validator=type_validator())
    authority = attr.ib(type=MetadataAuthority, validator=type_validator())
    fetcher = attr.ib(type=MetadataFetcher, validator=type_validator())

    # the metadata itself
    format = attr.ib(type=str, validator=type_validator())
    metadata = attr.ib(type=bytes, validator=type_validator())

    # context
    origin = attr.ib(type=Optional[str], default=None, validator=type_validator())
    visit = attr.ib(type=Optional[int], default=None, validator=type_validator())
    snapshot = attr.ib(type=Optional[SWHID], default=None, validator=type_validator())
    release = attr.ib(type=Optional[SWHID], default=None, validator=type_validator())
    revision = attr.ib(type=Optional[SWHID], default=None, validator=type_validator())
    path = attr.ib(type=Optional[bytes], default=None, validator=type_validator())
    directory = attr.ib(type=Optional[SWHID], default=None, validator=type_validator())

    @id.validator
    def check_id(self, attribute, value):
        """Origin targets need a non-SWHID id; other targets need a core
        SWHID whose object type matches the target type."""
        if self.type != MetadataTargetType.ORIGIN:
            self._check_pid(self.type.value, value)
            return

        if isinstance(value, SWHID) or value.startswith("swh:"):
            raise ValueError(
                "Got SWHID as id for origin metadata (expected an URL)."
            )

    @origin.validator
    def check_origin(self, attribute, value):
        """An 'origin' context is allowed for every target type except
        origins, and must not look like a SWHID."""
        if value is None:
            return

        allowed_targets = (
            MetadataTargetType.SNAPSHOT,
            MetadataTargetType.RELEASE,
            MetadataTargetType.REVISION,
            MetadataTargetType.DIRECTORY,
            MetadataTargetType.CONTENT,
        )
        if self.type not in allowed_targets:
            raise ValueError(
                f"Unexpected 'origin' context for {self.type.value} object: {value}"
            )

        if value.startswith("swh:"):
            # Technically this is valid; but:
            # 1. SWHIDs are URIs, not URLs
            # 2. if a SWHID gets here, it's very likely to be a mistake
            #    (and we can remove this check if it turns out there is a
            #    legitimate use for it).
            raise ValueError(f"SWHID used as context origin URL: {value}")

    @visit.validator
    def check_visit(self, attribute, value):
        """A 'visit' context needs a non-origin target, an 'origin' context,
        and a strictly positive visit id."""
        if value is None:
            return

        allowed_targets = (
            MetadataTargetType.SNAPSHOT,
            MetadataTargetType.RELEASE,
            MetadataTargetType.REVISION,
            MetadataTargetType.DIRECTORY,
            MetadataTargetType.CONTENT,
        )
        if self.type not in allowed_targets:
            raise ValueError(
                f"Unexpected 'visit' context for {self.type.value} object: {value}"
            )

        if self.origin is None:
            raise ValueError("'origin' context must be set if 'visit' is.")

        if value <= 0:
            raise ValueError("Nonpositive visit id")

    @snapshot.validator
    def check_snapshot(self, attribute, value):
        """A 'snapshot' context is allowed for release, revision, directory
        and content targets, and must be a core snapshot SWHID."""
        if value is None:
            return

        allowed_targets = (
            MetadataTargetType.RELEASE,
            MetadataTargetType.REVISION,
            MetadataTargetType.DIRECTORY,
            MetadataTargetType.CONTENT,
        )
        if self.type not in allowed_targets:
            raise ValueError(
                f"Unexpected 'snapshot' context for {self.type.value} object: {value}"
            )

        self._check_pid("snapshot", value)

    @release.validator
    def check_release(self, attribute, value):
        """A 'release' context is allowed for revision, directory and content
        targets, and must be a core release SWHID."""
        if value is None:
            return

        allowed_targets = (
            MetadataTargetType.REVISION,
            MetadataTargetType.DIRECTORY,
            MetadataTargetType.CONTENT,
        )
        if self.type not in allowed_targets:
            raise ValueError(
                f"Unexpected 'release' context for {self.type.value} object: {value}"
            )

        self._check_pid("release", value)

    @revision.validator
    def check_revision(self, attribute, value):
        """A 'revision' context is allowed for directory and content targets,
        and must be a core revision SWHID."""
        if value is None:
            return

        allowed_targets = (MetadataTargetType.DIRECTORY, MetadataTargetType.CONTENT)
        if self.type not in allowed_targets:
            raise ValueError(
                f"Unexpected 'revision' context for {self.type.value} object: {value}"
            )

        self._check_pid("revision", value)

    @path.validator
    def check_path(self, attribute, value):
        """A 'path' context is allowed for directory and content targets."""
        if value is None:
            return

        allowed_targets = (MetadataTargetType.DIRECTORY, MetadataTargetType.CONTENT)
        if self.type not in allowed_targets:
            raise ValueError(
                f"Unexpected 'path' context for {self.type.value} object: {value}"
            )

    @directory.validator
    def check_directory(self, attribute, value):
        """A 'directory' context is allowed for content targets only, and
        must be a core directory SWHID."""
        if value is None:
            return

        allowed_targets = (MetadataTargetType.CONTENT,)
        if self.type not in allowed_targets:
            raise ValueError(
                f"Unexpected 'directory' context for {self.type.value} object: {value}"
            )

        self._check_pid("directory", value)

    def _check_pid(self, expected_object_type, pid):
        """Raise ValueError unless ``pid`` is a SWHID object (not a string)
        of the expected object type and with no metadata attached."""
        if isinstance(pid, str):
            raise ValueError(f"Expected SWHID, got a string: {pid}")

        actual_object_type = pid.object_type
        if actual_object_type != expected_object_type:
            raise ValueError(
                f"Expected SWHID type '{expected_object_type}', "
                f"got '{pid.object_type}' in {pid}"
            )

        if pid.metadata:
            raise ValueError(f"Expected core SWHID, but got: {pid}")


# ---------------------------------------------------------------------------
# swh/model/tests/test_collections.py (new file carried by the same patch)
# ---------------------------------------------------------------------------
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import pytest

from swh.model.collections import ImmutableDict


def test_immutabledict_empty():
    """An argument-less ImmutableDict behaves like an empty mapping."""
    empty = ImmutableDict()

    assert empty == {}
    assert empty != {"foo": "bar"}

    assert list(empty) == []
    assert list(empty.items()) == []


def test_immutabledict_one_item():
    """Lookup, iteration and equality on a single-item ImmutableDict."""
    single = ImmutableDict({"foo": "bar"})

    assert single == {"foo": "bar"}
    assert single != {}

    assert single["foo"] == "bar"
    with pytest.raises(KeyError, match="bar"):
        single["bar"]

    assert list(single) == ["foo"]
    assert list(single.items()) == [("foo", "bar")]


def test_immutabledict_immutable():
    """Item assignment and deletion are rejected with TypeError."""
    frozen = ImmutableDict({"foo": "bar"})

    with pytest.raises(TypeError, match="item assignment"):
        frozen["bar"] = "baz"

    with pytest.raises(TypeError, match="item deletion"):
        del frozen["foo"]


def test_immutabledict_copy_pop():
    """copy_pop returns the popped value and a copy without that key."""
    source = ImmutableDict({"foo": "bar", "baz": "qux"})

    assert source.copy_pop("foo") == ("bar", ImmutableDict({"baz": "qux"}))
_metadata_authority = MetadataAuthority(
    type=MetadataAuthorityType.FORGE, url="https://forge.softwareheritage.org",
)
_metadata_fetcher = MetadataFetcher(name="test-fetcher", version="0.0.1",)
_swhid_hash = "94a9ed024d3859793618152ea559a168bbcbb5e2"
_content_swhid_str = f"swh:1:cnt:{_swhid_hash}"
_content_swhid = parse_swhid(_content_swhid_str)
_origin_url = "https://forge.softwareheritage.org/source/swh-model.git"
_common_metadata_fields = dict(
    # Timezone-aware on purpose: a naive now() is ambiguous across hosts.
    discovery_date=datetime.datetime.now(tz=datetime.timezone.utc),
    authority=_metadata_authority,
    fetcher=_metadata_fetcher,
    format="json",
    metadata=b'{"foo": "bar"}',
)


def _swhid(object_type, **kwargs):
    """Build a SWHID of the given object type on the shared test hash."""
    return SWHID(object_type=object_type, object_id=_swhid_hash, **kwargs)


def _metadata(target_type, target_id, **context):
    """Build a RawExtrinsicMetadata with the common source/payload fields."""
    return RawExtrinsicMetadata(
        type=target_type, id=target_id, **context, **_common_metadata_fields
    )


def _origin_metadata(**context):
    """Metadata targeting the test origin URL, with the given context."""
    return _metadata(MetadataTargetType.ORIGIN, _origin_url, **context)


def _content_metadata(**context):
    """Metadata targeting the test content SWHID, with the given context."""
    return _metadata(MetadataTargetType.CONTENT, _content_swhid, **context)


def _check_swhid_context(context_name):
    """Shared checks for a SWHID-valued context field.

    ``context_name`` is both the keyword argument of RawExtrinsicMetadata and
    the SWHID object type that the field is expected to hold.
    """
    # Origins can't have this context
    with pytest.raises(
        ValueError, match=f"Unexpected '{context_name}' context for origin object"
    ):
        _origin_metadata(**{context_name: _swhid(context_name)})

    # but content can
    _content_metadata(**{context_name: _swhid(context_name)})

    # Non-core SWHID
    with pytest.raises(ValueError, match="Expected core SWHID"):
        _content_metadata(
            **{context_name: _swhid(context_name, metadata={"foo": "bar"})}
        )

    # SWHID type doesn't match the expected type of this context key
    with pytest.raises(
        ValueError, match=f"Expected SWHID type '{context_name}', got 'content'"
    ):
        _content_metadata(**{context_name: _swhid("content")})


def test_metadata_valid():
    """Checks valid RawExtrinsicMetadata objects don't raise an error."""

    # Simplest case
    _origin_metadata()

    # Object with an SWHID
    _content_metadata()


def test_metadata_invalid_id():
    """Checks various invalid values for the 'id' field."""

    # SWHID for an origin
    with pytest.raises(ValueError, match="expected an URL"):
        _metadata(MetadataTargetType.ORIGIN, _content_swhid)

    # SWHID for an origin (even when passed as string)
    with pytest.raises(ValueError, match="expected an URL"):
        _metadata(MetadataTargetType.ORIGIN, _content_swhid_str)

    # URL for a non-origin
    with pytest.raises(ValueError, match="Expected SWHID, got a string"):
        _metadata(MetadataTargetType.CONTENT, _origin_url)

    # SWHID passed as string instead of SWHID
    with pytest.raises(ValueError, match="Expected SWHID, got a string"):
        _metadata(MetadataTargetType.CONTENT, _content_swhid_str)

    # Object type does not match the SWHID
    with pytest.raises(
        ValueError, match="Expected SWHID type 'revision', got 'content'"
    ):
        _metadata(MetadataTargetType.REVISION, _content_swhid)

    # Non-core SWHID
    with pytest.raises(ValueError, match="Expected core SWHID"):
        _metadata(
            MetadataTargetType.CONTENT, _swhid("content", metadata={"foo": "bar"})
        )


def test_metadata_validate_context_origin():
    """Checks validation of RawExtrinsicMetadata.origin."""

    # Origins can't have an 'origin' context
    with pytest.raises(
        ValueError, match="Unexpected 'origin' context for origin object"
    ):
        _origin_metadata(origin=_origin_url)

    # but all other types can
    _content_metadata(origin=_origin_url)

    # SWHIDs aren't valid origin URLs
    with pytest.raises(ValueError, match="SWHID used as context origin URL"):
        _content_metadata(origin=_content_swhid_str)


def test_metadata_validate_context_visit():
    """Checks validation of RawExtrinsicMetadata.visit."""

    # Origins can't have a 'visit' context
    with pytest.raises(
        ValueError, match="Unexpected 'visit' context for origin object"
    ):
        _origin_metadata(visit=42)

    # but all other types can
    _content_metadata(origin=_origin_url, visit=42)

    # Missing 'origin'
    with pytest.raises(ValueError, match="'origin' context must be set if 'visit' is"):
        _content_metadata(visit=42)

    # visit id must be positive
    with pytest.raises(ValueError, match="Nonpositive visit id"):
        _content_metadata(origin=_origin_url, visit=-42)


def test_metadata_validate_context_snapshot():
    """Checks validation of RawExtrinsicMetadata.snapshot."""
    _check_swhid_context("snapshot")


def test_metadata_validate_context_release():
    """Checks validation of RawExtrinsicMetadata.release."""
    _check_swhid_context("release")


def test_metadata_validate_context_revision():
    """Checks validation of RawExtrinsicMetadata.revision."""
    _check_swhid_context("revision")


def test_metadata_validate_context_path():
    """Checks validation of RawExtrinsicMetadata.path."""

    # Origins can't have a 'path' context
    with pytest.raises(ValueError, match="Unexpected 'path' context for origin object"):
        _origin_metadata(path=b"/foo/bar")

    # but content can
    _content_metadata(path=b"/foo/bar")


def test_metadata_validate_context_directory():
    """Checks validation of RawExtrinsicMetadata.directory."""
    _check_swhid_context("directory")