diff --git a/swh/model/model.py b/swh/model/model.py --- a/swh/model/model.py +++ b/swh/model/model.py @@ -8,7 +8,7 @@ from abc import ABCMeta, abstractmethod from enum import Enum from hashlib import sha256 -from typing import Dict, Optional, Tuple, TypeVar, Union +from typing import Any, Dict, Optional, Tuple, TypeVar, Union from typing_extensions import Final import attr @@ -22,6 +22,7 @@ revision_identifier, release_identifier, snapshot_identifier, + PersistentId, ) from .hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, MultiHash @@ -670,3 +671,191 @@ if d2.pop("data", None) is not None: raise ValueError('SkippedContent has no "data" attribute %r' % d) return super().from_dict(d2, use_subclass=False) + + +class MetadataAuthorityType(Enum): + DEPOSIT = "deposit" + FORGE = "forge" + REGISTRY = "registry" + + +@attr.s(frozen=True) +class MetadataAuthority(BaseModel): + """Represents an entity that provides metadata about an origin or + software artifact.""" + + type = attr.ib(type=MetadataAuthorityType) + url = attr.ib(type=str) + metadata = attr.ib(type=Optional[Dict[str, Any]]) + + +@attr.s(frozen=True) +class MetadataFetcher(BaseModel): + """Represents a software component used to fetch metadata from a metadata + authority, and ingest them into the Software Heritage archive.""" + + name = attr.ib(type=str) + version = attr.ib(type=str) + metadata = attr.ib(type=Optional[Dict[str, Any]]) + + +class MetadataTargetType(Enum): + """The type of object extrinsic metadata refer to.""" + + CONTENT = "content" + DIRECTORY = "directory" + REVISION = "revision" + RELEASE = "release" + SNAPSHOT = "snapshot" + ORIGIN = "origin" + + +@attr.s(frozen=True) +class RawExtrinsicMetadata(BaseModel): + # target object + type = attr.ib(type=MetadataTargetType) + id = attr.ib(type=str) + """URL if type=MetadataTargetType.ORIGIN, else core SWHID""" + + # context + origin = attr.ib(type=Optional[str]) + visit = attr.ib(type=int) + snapshot = attr.ib(type=Optional[PersistentId]) + release = attr.ib(type=Optional[PersistentId]) + revision = attr.ib(type=Optional[PersistentId]) + path = attr.ib(type=Optional[bytes]) + directory = attr.ib(type=Optional[PersistentId]) + + # source + discovery_date = attr.ib(type=datetime.datetime) + authority = attr.ib(type=MetadataAuthority) + fetcher = attr.ib(type=MetadataFetcher) + + # the metadata itself + format = attr.ib(type=str) + metadata = attr.ib(type=bytes) + + @id.validator + def check_id(self, attribute, value): + if self.type == MetadataTargetType.ORIGIN: + return + + self._check_pid(self.type, value) + + @origin.validator + def check_origin(self, attribute, value): + if value is None: + return + + if self.type not in ( + MetadataTargetType.SNAPSHOT, + MetadataTargetType.RELEASE, + MetadataTargetType.REVISION, + MetadataTargetType.DIRECTORY, + MetadataTargetType.CONTENT, + ): + raise ValueError( + f"Unexpected 'origin' context for {self.type} object: {value}" + ) + + @visit.validator + def check_visit(self, attribute, value): + if value is None: + return + + if self.type not in ( + MetadataTargetType.SNAPSHOT, + MetadataTargetType.RELEASE, + MetadataTargetType.REVISION, + MetadataTargetType.DIRECTORY, + MetadataTargetType.CONTENT, + ): + raise ValueError( + f"Unexpected 'visit' context for {self.type} object: {value}" + ) + + if value <= 0: + raise ValueError("Nonpositive visit id") + + @snapshot.validator + def check_snapshot(self, attribute, value): + if value is None: + return + + if self.type not in ( + MetadataTargetType.RELEASE, + MetadataTargetType.REVISION, + MetadataTargetType.DIRECTORY, + MetadataTargetType.CONTENT, + ): + raise ValueError( + f"Unexpected 'snapshot' context for {self.type} object: {value}" + ) + + self._check_pid("snapshot", value) + + @release.validator + def check_release(self, attribute, value): + if value is None: + return + + if self.type not in ( + MetadataTargetType.REVISION, + MetadataTargetType.DIRECTORY, + MetadataTargetType.CONTENT, + ): + raise ValueError( + f"Unexpected 'release' context for {self.type} object: {value}" + ) + + self._check_pid("release", value) + + @revision.validator + def check_revision(self, attribute, value): + if value is None: + return + + if self.type not in (MetadataTargetType.DIRECTORY, MetadataTargetType.CONTENT,): + raise ValueError( + f"Unexpected 'revision' context for {self.type} object: {value}" + ) + + self._check_pid("revision", value) + + @path.validator + def check_path(self, attribute, value): + if value is None: + return + + if self.type not in (MetadataTargetType.DIRECTORY, MetadataTargetType.CONTENT,): + raise ValueError( + f"Unexpected 'path' context for {self.type} object: {value}" + ) + + @directory.validator + def check_directory(self, attribute, value): + if value is None: + return + + if self.type not in (MetadataTargetType.CONTENT,): + raise ValueError( + f"Unexpected 'directory' context for {self.type} object: {value}" + ) + + self._check_pid("directory", value) + + def _check_pid(self, expected_object_type, pid): + if pid.namespace != "swh": + raise ValueError(f"Unexpected PID namespace {pid.namespace} in {pid}") + + if pid.scheme_version != "1": + raise ValueError(f"Unexpected SWHID version {pid.scheme_version} in {pid}") + + if pid.object_type != expected_object_type: + raise ValueError( + f"Expected SWHID type {expected_object_type}, " + f"got {pid.object_type} in {pid}" + ) + + if pid.metadata: + raise ValueError(f"Expected core SWHID, but got: {pid}")