diff --git a/swh/model/model.py b/swh/model/model.py
--- a/swh/model/model.py
+++ b/swh/model/model.py
@@ -8,7 +8,7 @@
 from abc import ABCMeta, abstractmethod
 from enum import Enum
 from hashlib import sha256
-from typing import Dict, Optional, Tuple, TypeVar, Union
+from typing import Any, Dict, Optional, Tuple, TypeVar, Union
 from typing_extensions import Final
 
 import attr
@@ -22,6 +22,7 @@
     revision_identifier,
     release_identifier,
     snapshot_identifier,
+    PersistentId,
 )
 
 from .hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, MultiHash
@@ -670,3 +671,224 @@
         if d2.pop("data", None) is not None:
             raise ValueError('SkippedContent has no "data" attribute %r' % d)
         return super().from_dict(d2, use_subclass=False)
+
+
+class MetadataAuthorityType(Enum):
+    """The kind of entity a :class:`MetadataAuthority` is."""
+
+    DEPOSIT = "deposit"
+    FORGE = "forge"
+    REGISTRY = "registry"
+
+
+@attr.s(frozen=True)
+class MetadataAuthority(BaseModel):
+    """Represents an entity that provides metadata about an origin or
+    software artifact."""
+
+    type = attr.ib(type=MetadataAuthorityType, validator=type_validator())
+    url = attr.ib(type=str, validator=type_validator())
+    metadata = attr.ib(
+        type=Optional[Dict[str, Any]], default=None, validator=type_validator()
+    )
+
+
+@attr.s(frozen=True)
+class MetadataFetcher(BaseModel):
+    """Represents a software component used to fetch metadata from a metadata
+    authority, and ingest them into the Software Heritage archive."""
+
+    name = attr.ib(type=str, validator=type_validator())
+    version = attr.ib(type=str, validator=type_validator())
+    metadata = attr.ib(
+        type=Optional[Dict[str, Any]], default=None, validator=type_validator()
+    )
+
+
+class MetadataTargetType(Enum):
+    """The type of object extrinsic metadata refers to."""
+
+    CONTENT = "content"
+    DIRECTORY = "directory"
+    REVISION = "revision"
+    RELEASE = "release"
+    SNAPSHOT = "snapshot"
+    ORIGIN = "origin"
+
+
+# For each optional context field of RawExtrinsicMetadata, the target types
+# allowed to carry it. ORIGIN appears nowhere: origins accept no context.
+_CONTEXT_TARGET_TYPES: Dict[str, Tuple[MetadataTargetType, ...]] = {
+    "origin": (
+        MetadataTargetType.SNAPSHOT,
+        MetadataTargetType.RELEASE,
+        MetadataTargetType.REVISION,
+        MetadataTargetType.DIRECTORY,
+        MetadataTargetType.CONTENT,
+    ),
+    "visit": (
+        MetadataTargetType.SNAPSHOT,
+        MetadataTargetType.RELEASE,
+        MetadataTargetType.REVISION,
+        MetadataTargetType.DIRECTORY,
+        MetadataTargetType.CONTENT,
+    ),
+    "snapshot": (
+        MetadataTargetType.RELEASE,
+        MetadataTargetType.REVISION,
+        MetadataTargetType.DIRECTORY,
+        MetadataTargetType.CONTENT,
+    ),
+    "release": (
+        MetadataTargetType.REVISION,
+        MetadataTargetType.DIRECTORY,
+        MetadataTargetType.CONTENT,
+    ),
+    "revision": (MetadataTargetType.DIRECTORY, MetadataTargetType.CONTENT),
+    "path": (MetadataTargetType.DIRECTORY, MetadataTargetType.CONTENT),
+    "directory": (MetadataTargetType.CONTENT,),
+}
+
+
+@attr.s(frozen=True)
+class RawExtrinsicMetadata(BaseModel):
+    """Raw extrinsic metadata about an origin or a software artifact.
+
+    ``type`` and ``id`` identify the target object. The optional context
+    fields (``origin`` to ``directory``) are only accepted for the target
+    types listed in ``_CONTEXT_TARGET_TYPES``."""
+
+    # target object
+    type = attr.ib(type=MetadataTargetType, validator=type_validator())
+    # URL if type=MetadataTargetType.ORIGIN, else core SWHID
+    id = attr.ib(type=Union[str, PersistentId], validator=type_validator())
+
+    # source
+    discovery_date = attr.ib(type=datetime.datetime, validator=type_validator())
+    authority = attr.ib(type=MetadataAuthority, validator=type_validator())
+    fetcher = attr.ib(type=MetadataFetcher, validator=type_validator())
+
+    # the metadata itself
+    format = attr.ib(type=str, validator=type_validator())
+    metadata = attr.ib(type=bytes, validator=type_validator())
+
+    # context
+    origin = attr.ib(type=Optional[str], default=None, validator=type_validator())
+    visit = attr.ib(type=Optional[int], default=None, validator=type_validator())
+    snapshot = attr.ib(
+        type=Optional[PersistentId], default=None, validator=type_validator()
+    )
+    release = attr.ib(
+        type=Optional[PersistentId], default=None, validator=type_validator()
+    )
+    revision = attr.ib(
+        type=Optional[PersistentId], default=None, validator=type_validator()
+    )
+    path = attr.ib(type=Optional[bytes], default=None, validator=type_validator())
+    directory = attr.ib(
+        type=Optional[PersistentId], default=None, validator=type_validator()
+    )
+
+    @id.validator
+    def check_id(self, attribute, value):
+        """Reject SWHIDs as origin ids; for any other target type, require a
+        core SWHID whose object type matches ``self.type``."""
+        if self.type == MetadataTargetType.ORIGIN:
+            if isinstance(value, PersistentId) or value.startswith("swh:"):
+                raise ValueError(
+                    "Got SWHID as id for origin metadata (expected an URL)."
+                )
+        else:
+            self._check_pid(self.type.value, value)
+
+    @origin.validator
+    def check_origin(self, attribute, value):
+        """Validate the optional 'origin' context."""
+        if value is None:
+            return
+
+        self._check_context_target("origin", value)
+
+    @visit.validator
+    def check_visit(self, attribute, value):
+        """Validate the optional 'visit' context (must be a positive id)."""
+        if value is None:
+            return
+
+        self._check_context_target("visit", value)
+
+        if value <= 0:
+            raise ValueError("Nonpositive visit id")
+
+    @snapshot.validator
+    def check_snapshot(self, attribute, value):
+        """Validate the optional 'snapshot' context (a core snapshot SWHID)."""
+        if value is None:
+            return
+
+        self._check_context_target("snapshot", value)
+        self._check_pid("snapshot", value)
+
+    @release.validator
+    def check_release(self, attribute, value):
+        """Validate the optional 'release' context (a core release SWHID)."""
+        if value is None:
+            return
+
+        self._check_context_target("release", value)
+        self._check_pid("release", value)
+
+    @revision.validator
+    def check_revision(self, attribute, value):
+        """Validate the optional 'revision' context (a core revision SWHID)."""
+        if value is None:
+            return
+
+        self._check_context_target("revision", value)
+        self._check_pid("revision", value)
+
+    @path.validator
+    def check_path(self, attribute, value):
+        """Validate the optional 'path' context."""
+        if value is None:
+            return
+
+        self._check_context_target("path", value)
+
+    @directory.validator
+    def check_directory(self, attribute, value):
+        """Validate the optional 'directory' context (a core directory SWHID)."""
+        if value is None:
+            return
+
+        self._check_context_target("directory", value)
+        self._check_pid("directory", value)
+
+    def _check_context_target(self, context, value):
+        """Raise ValueError if objects of type ``self.type`` may not carry the
+        ``context`` field (see ``_CONTEXT_TARGET_TYPES``)."""
+        if self.type not in _CONTEXT_TARGET_TYPES[context]:
+            raise ValueError(
+                f"Unexpected '{context}' context for {self.type} object: {value}"
+            )
+
+    def _check_pid(self, expected_object_type, pid):
+        """Raise ValueError unless ``pid`` is a core SWHID (namespace ``swh``,
+        scheme version 1, empty ``metadata``) of type ``expected_object_type``."""
+        if isinstance(pid, str):
+            raise ValueError(f"Expected PersistentId, got a string: {pid}")
+
+        if pid.namespace != "swh":
+            raise ValueError(f"Unexpected PID namespace {pid.namespace} in {pid}")
+
+        if pid.scheme_version != 1:
+            raise ValueError(f"Unexpected SWHID version {pid.scheme_version} in {pid}")
+
+        if pid.object_type != expected_object_type:
+            raise ValueError(
+                f"Expected SWHID type '{expected_object_type}', "
+                f"got '{pid.object_type}' in {pid}"
+            )
+
+        if pid.metadata:
+            raise ValueError(f"Expected core SWHID, but got: {pid}")
diff --git a/swh/model/tests/test_model.py b/swh/model/tests/test_model.py
--- a/swh/model/tests/test_model.py
+++ b/swh/model/tests/test_model.py
@@ -25,6 +25,11 @@
     TimestampWithTimezone,
     MissingData,
     Person,
+    RawExtrinsicMetadata,
+    MetadataTargetType,
+    MetadataAuthority,
+    MetadataAuthorityType,
+    MetadataFetcher,
 )
 from swh.model.hashutil import hash_to_bytes, MultiHash
 import swh.model.hypothesis_strategies as strategies
@@ -33,6 +38,7 @@
     revision_identifier,
     release_identifier,
     snapshot_identifier,
+    parse_persistent_identifier,
 )
 from swh.model.tests.test_identifiers import (
     directory_example,
@@ -490,3 +496,107 @@
             check_final(subcls)
 
     check_final(BaseModel)
+
+
+_metadata_authority = MetadataAuthority(
+    type=MetadataAuthorityType.FORGE, url="https://forge.softwareheritage.org",
+)
+_metadata_fetcher = MetadataFetcher(name="test-fetcher", version="0.0.1",)
+_content_swhid = parse_persistent_identifier(
+    "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2"
+)
+
+
+def test_metadata_valid():
+    """Checks valid RawExtrinsicMetadata objects don't raise an error."""
+    discovery_date = datetime.datetime.now(tz=datetime.timezone.utc)
+
+    # Simplest case
+    RawExtrinsicMetadata(
+        type=MetadataTargetType.ORIGIN,
+        id="https://forge.softwareheritage.org/source/swh-model.git",
+        discovery_date=discovery_date,
+        authority=_metadata_authority,
+        fetcher=_metadata_fetcher,
+        format="json",
+        metadata=b'{"foo": "bar"}',
+    )
+
+    # Object with an SWHID
+    RawExtrinsicMetadata(
+        type=MetadataTargetType.CONTENT,
+        id=_content_swhid,
+        discovery_date=discovery_date,
+        authority=_metadata_authority,
+        fetcher=_metadata_fetcher,
+        format="json",
+        metadata=b'{"foo": "bar"}',
+    )
+
+
+def test_metadata_invalid_id():
+    """Checks various invalid values for the 'id' field."""
+
+    discovery_date = datetime.datetime.now(tz=datetime.timezone.utc)
+
+    # SWHID for an origin
+    with pytest.raises(ValueError, match="expected an URL"):
+        RawExtrinsicMetadata(
+            type=MetadataTargetType.ORIGIN,
+            id=_content_swhid,
+            discovery_date=discovery_date,
+            authority=_metadata_authority,
+            fetcher=_metadata_fetcher,
+            format="json",
+            metadata=b'{"foo": "bar"}',
+        )
+
+    # SWHID for an origin (even when passed as string)
+    with pytest.raises(ValueError, match="expected an URL"):
+        RawExtrinsicMetadata(
+            type=MetadataTargetType.ORIGIN,
+            id="swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",
+            discovery_date=discovery_date,
+            authority=_metadata_authority,
+            fetcher=_metadata_fetcher,
+            format="json",
+            metadata=b'{"foo": "bar"}',
+        )
+
+    # URL for a non-origin
+    with pytest.raises(ValueError, match="Expected PersistentId, got a string"):
+        RawExtrinsicMetadata(
+            type=MetadataTargetType.CONTENT,
+            id="https://forge.softwareheritage.org/source/swh-model.git",
+            discovery_date=discovery_date,
+            authority=_metadata_authority,
+            fetcher=_metadata_fetcher,
+            format="json",
+            metadata=b'{"foo": "bar"}',
+        )
+
+    # SWHID passed as string instead of PersistentId
+    with pytest.raises(ValueError, match="Expected PersistentId, got a string"):
+        RawExtrinsicMetadata(
+            type=MetadataTargetType.CONTENT,
+            id="swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",
+            discovery_date=discovery_date,
+            authority=_metadata_authority,
+            fetcher=_metadata_fetcher,
+            format="json",
+            metadata=b'{"foo": "bar"}',
+        )
+
+    # SWHID whose object type does not match the target type
+    with pytest.raises(
+        ValueError, match="Expected SWHID type 'revision', got 'content'"
+    ):
+        RawExtrinsicMetadata(
+            type=MetadataTargetType.REVISION,
+            id=_content_swhid,
+            discovery_date=discovery_date,
+            authority=_metadata_authority,
+            fetcher=_metadata_fetcher,
+            format="json",
+            metadata=b'{"foo": "bar"}',
+        )