
diff --git a/swh/model/model.py b/swh/model/model.py
index c4f185f..ab11b8b 100644
--- a/swh/model/model.py
+++ b/swh/model/model.py
@@ -1,698 +1,902 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from abc import ABCMeta, abstractmethod
from copy import deepcopy
from enum import Enum
from hashlib import sha256
-from typing import Dict, Iterable, Optional, Tuple, TypeVar, Union
+from typing import Any, Dict, Iterable, Optional, Tuple, TypeVar, Union
from typing_extensions import Final
import attr
from attrs_strict import type_validator
import dateutil.parser
import iso8601
from .identifiers import (
normalize_timestamp,
directory_identifier,
revision_identifier,
release_identifier,
snapshot_identifier,
+ SWHID,
)
from .hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, MultiHash
class MissingData(Exception):
"""Raised by `Content.with_data` when it has no way of fetching the
data (but not when fetching the data fails)."""
pass
SHA1_SIZE = 20
# TODO: Limit this to 20 bytes
Sha1Git = bytes
def dictify(value):
"Helper function used by BaseModel.to_dict()"
if isinstance(value, BaseModel):
return value.to_dict()
elif isinstance(value, Enum):
return value.value
elif isinstance(value, dict):
return {k: dictify(v) for k, v in value.items()}
elif isinstance(value, tuple):
return tuple(dictify(v) for v in value)
else:
return value
ModelType = TypeVar("ModelType", bound="BaseModel")
class BaseModel:
"""Base class for SWH model classes.
Provides serialization/deserialization to/from Python dictionaries
that are suitable for JSON/msgpack-like formats."""
def to_dict(self):
"""Wrapper of `attr.asdict` that can be overridden by subclasses
that have special handling of some of the fields."""
return dictify(attr.asdict(self, recurse=False))
@classmethod
def from_dict(cls, d):
"""Takes a dictionary representing a tree of SWH objects, and
recursively builds the corresponding objects."""
return cls(**d)
def anonymize(self: ModelType) -> Optional[ModelType]:
"""Returns an anonymized version of the object, if needed.
If the object model does not need/support anonymization, returns None.
"""
return None
class HashableObject(metaclass=ABCMeta):
"""Mixin to automatically compute object identifier hash when
the associated model is instantiated."""
@staticmethod
@abstractmethod
def compute_hash(object_dict):
"""Derived model classes must implement this to compute
the object hash from its dict representation."""
pass
def __attrs_post_init__(self):
if not self.id:
obj_id = hash_to_bytes(self.compute_hash(self.to_dict()))
object.__setattr__(self, "id", obj_id)
@attr.s(frozen=True)
class Person(BaseModel):
"""Represents the author/committer of a revision or release."""
object_type: Final = "person"
fullname = attr.ib(type=bytes, validator=type_validator())
name = attr.ib(type=Optional[bytes], validator=type_validator())
email = attr.ib(type=Optional[bytes], validator=type_validator())
@classmethod
def from_fullname(cls, fullname: bytes):
"""Returns a Person object, by guessing the name and email from the
fullname, in the `name <email>` format.
The fullname is left unchanged."""
if fullname is None:
raise TypeError("fullname is None.")
name: Optional[bytes]
email: Optional[bytes]
try:
open_bracket = fullname.index(b"<")
except ValueError:
name = fullname
email = None
else:
raw_name = fullname[:open_bracket]
raw_email = fullname[open_bracket + 1 :]
if not raw_name:
name = None
else:
name = raw_name.strip()
try:
close_bracket = raw_email.rindex(b">")
except ValueError:
email = raw_email
else:
email = raw_email[:close_bracket]
return Person(name=name or None, email=email or None, fullname=fullname,)
def anonymize(self) -> "Person":
"""Returns an anonymized version of the Person object.
Anonymization is simply a Person whose fullname is the hash of the
original fullname, with name and email unset.
"""
return Person(fullname=sha256(self.fullname).digest(), name=None, email=None,)
@attr.s(frozen=True)
class Timestamp(BaseModel):
"""Represents a naive timestamp from a VCS."""
object_type: Final = "timestamp"
seconds = attr.ib(type=int, validator=type_validator())
microseconds = attr.ib(type=int, validator=type_validator())
@seconds.validator
def check_seconds(self, attribute, value):
"""Check that seconds fit in a 64-bits signed integer."""
if not (-(2 ** 63) <= value < 2 ** 63):
raise ValueError("Seconds must be a signed 64-bits integer.")
@microseconds.validator
def check_microseconds(self, attribute, value):
"""Checks that microseconds are positive and < 1000000."""
if not (0 <= value < 10 ** 6):
raise ValueError("Microseconds must be in [0, 1000000[.")
@attr.s(frozen=True)
class TimestampWithTimezone(BaseModel):
"""Represents a TZ-aware timestamp from a VCS."""
object_type: Final = "timestamp_with_timezone"
timestamp = attr.ib(type=Timestamp, validator=type_validator())
offset = attr.ib(type=int, validator=type_validator())
negative_utc = attr.ib(type=bool, validator=type_validator())
@offset.validator
def check_offset(self, attribute, value):
"""Checks the offset is a 16-bits signed integer (in theory, it
should always be between -14 and +14 hours)."""
if not (-(2 ** 15) <= value < 2 ** 15):
# max 14 hours offset in theory, but you never know what
# you'll find in the wild...
raise ValueError("offset too large: %d minutes" % value)
@negative_utc.validator
def check_negative_utc(self, attribute, value):
if self.offset and value:
raise ValueError("negative_utc can only be True is offset=0")
@classmethod
def from_dict(cls, obj: Union[Dict, datetime.datetime, int]):
"""Builds a TimestampWithTimezone from any of the formats
accepted by :func:`swh.model.normalize_timestamp`."""
# TODO: this accepts way more types than just dicts; find a better
# name
d = normalize_timestamp(obj)
return cls(
timestamp=Timestamp.from_dict(d["timestamp"]),
offset=d["offset"],
negative_utc=d["negative_utc"],
)
@classmethod
def from_datetime(cls, dt: datetime.datetime):
return cls.from_dict(dt)
@classmethod
def from_iso8601(cls, s):
"""Builds a TimestampWithTimezone from an ISO8601-formatted string.
"""
dt = iso8601.parse_date(s)
tstz = cls.from_datetime(dt)
if dt.tzname() == "-00:00":
tstz = attr.evolve(tstz, negative_utc=True)
return tstz
@attr.s(frozen=True)
class Origin(BaseModel):
"""Represents a software source: a VCS and an URL."""
object_type: Final = "origin"
url = attr.ib(type=str, validator=type_validator())
@attr.s(frozen=True)
class OriginVisit(BaseModel):
"""Represents an origin visit with a given type at a given point in time, by a
SWH loader."""
object_type: Final = "origin_visit"
origin = attr.ib(type=str, validator=type_validator())
date = attr.ib(type=datetime.datetime, validator=type_validator())
type = attr.ib(type=str, validator=type_validator())
"""Should not be set before calling 'origin_visit_add()'."""
visit = attr.ib(type=Optional[int], validator=type_validator(), default=None)
def to_dict(self):
"""Serializes the date as a string and omits the visit id if it is
`None`."""
ov = super().to_dict()
if ov["visit"] is None:
del ov["visit"]
return ov
@attr.s(frozen=True)
class OriginVisitStatus(BaseModel):
"""Represents a visit update of an origin at a given point in time.
"""
object_type: Final = "origin_visit_status"
origin = attr.ib(type=str, validator=type_validator())
visit = attr.ib(type=int, validator=type_validator())
date = attr.ib(type=datetime.datetime, validator=type_validator())
status = attr.ib(
type=str,
validator=attr.validators.in_(["created", "ongoing", "full", "partial"]),
)
snapshot = attr.ib(type=Optional[Sha1Git], validator=type_validator())
metadata = attr.ib(
type=Optional[Dict[str, object]], validator=type_validator(), default=None
)
class TargetType(Enum):
"""The type of content pointed to by a snapshot branch. Usually a
revision or an alias."""
CONTENT = "content"
DIRECTORY = "directory"
REVISION = "revision"
RELEASE = "release"
SNAPSHOT = "snapshot"
ALIAS = "alias"
class ObjectType(Enum):
"""The type of content pointed to by a release. Usually a revision"""
CONTENT = "content"
DIRECTORY = "directory"
REVISION = "revision"
RELEASE = "release"
SNAPSHOT = "snapshot"
@attr.s(frozen=True)
class SnapshotBranch(BaseModel):
"""Represents one of the branches of a snapshot."""
object_type: Final = "snapshot_branch"
target = attr.ib(type=bytes, validator=type_validator())
target_type = attr.ib(type=TargetType, validator=type_validator())
@target.validator
def check_target(self, attribute, value):
"""Checks the target type is not an alias, checks the target is a
valid sha1_git."""
if self.target_type != TargetType.ALIAS and self.target is not None:
if len(value) != 20:
raise ValueError("Wrong length for bytes identifier: %d" % len(value))
@classmethod
def from_dict(cls, d):
return cls(target=d["target"], target_type=TargetType(d["target_type"]))
@attr.s(frozen=True)
class Snapshot(BaseModel, HashableObject):
"""Represents the full state of an origin at a given point in time."""
object_type: Final = "snapshot"
branches = attr.ib(
type=Dict[bytes, Optional[SnapshotBranch]], validator=type_validator()
)
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"")
@staticmethod
def compute_hash(object_dict):
return snapshot_identifier(object_dict)
@classmethod
def from_dict(cls, d):
d = d.copy()
return cls(
branches={
name: SnapshotBranch.from_dict(branch) if branch else None
for (name, branch) in d.pop("branches").items()
},
**d,
)
@attr.s(frozen=True)
class Release(BaseModel, HashableObject):
object_type: Final = "release"
name = attr.ib(type=bytes, validator=type_validator())
message = attr.ib(type=Optional[bytes], validator=type_validator())
target = attr.ib(type=Optional[Sha1Git], validator=type_validator())
target_type = attr.ib(type=ObjectType, validator=type_validator())
synthetic = attr.ib(type=bool, validator=type_validator())
author = attr.ib(type=Optional[Person], validator=type_validator(), default=None)
date = attr.ib(
type=Optional[TimestampWithTimezone], validator=type_validator(), default=None
)
metadata = attr.ib(
type=Optional[Dict[str, object]], validator=type_validator(), default=None
)
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"")
@staticmethod
def compute_hash(object_dict):
return release_identifier(object_dict)
@author.validator
def check_author(self, attribute, value):
"""If the author is `None`, checks the date is `None` too."""
if self.author is None and self.date is not None:
raise ValueError("release date must be None if author is None.")
def to_dict(self):
rel = super().to_dict()
if rel["metadata"] is None:
del rel["metadata"]
return rel
@classmethod
def from_dict(cls, d):
d = d.copy()
if d.get("author"):
d["author"] = Person.from_dict(d["author"])
if d.get("date"):
d["date"] = TimestampWithTimezone.from_dict(d["date"])
return cls(target_type=ObjectType(d.pop("target_type")), **d)
def anonymize(self) -> "Release":
"""Returns an anonymized version of the Release object.
Anonymization consists in replacing the author with an anonymized Person object.
"""
author = self.author and self.author.anonymize()
return attr.evolve(self, author=author)
class RevisionType(Enum):
GIT = "git"
TAR = "tar"
DSC = "dsc"
SUBVERSION = "svn"
MERCURIAL = "hg"
def tuplify_extra_headers(value: Iterable) -> Tuple:
return tuple((k, v) for k, v in value)
@attr.s(frozen=True)
class Revision(BaseModel, HashableObject):
object_type: Final = "revision"
message = attr.ib(type=Optional[bytes], validator=type_validator())
author = attr.ib(type=Person, validator=type_validator())
committer = attr.ib(type=Person, validator=type_validator())
date = attr.ib(type=Optional[TimestampWithTimezone], validator=type_validator())
committer_date = attr.ib(
type=Optional[TimestampWithTimezone], validator=type_validator()
)
type = attr.ib(type=RevisionType, validator=type_validator())
directory = attr.ib(type=Sha1Git, validator=type_validator())
synthetic = attr.ib(type=bool, validator=type_validator())
metadata = attr.ib(
type=Optional[Dict[str, object]], validator=type_validator(), default=None
)
parents = attr.ib(type=Tuple[Sha1Git, ...], validator=type_validator(), default=())
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"")
extra_headers = attr.ib(
type=Tuple[Tuple[bytes, bytes], ...], # but it makes mypy sad
validator=type_validator(),
converter=tuplify_extra_headers, # type: ignore
default=(),
)
def __attrs_post_init__(self):
super().__attrs_post_init__()
# ensure metadata is a deep copy of whatever was given, and if needed
# extract extra_headers from there
if self.metadata:
metadata = deepcopy(self.metadata)
if not self.extra_headers and "extra_headers" in metadata:
object.__setattr__(
self,
"extra_headers",
tuplify_extra_headers(metadata.pop("extra_headers")),
)
attr.validate(self)
object.__setattr__(self, "metadata", metadata)
@staticmethod
def compute_hash(object_dict):
return revision_identifier(object_dict)
@classmethod
def from_dict(cls, d):
d = d.copy()
date = d.pop("date")
if date:
date = TimestampWithTimezone.from_dict(date)
committer_date = d.pop("committer_date")
if committer_date:
committer_date = TimestampWithTimezone.from_dict(committer_date)
return cls(
author=Person.from_dict(d.pop("author")),
committer=Person.from_dict(d.pop("committer")),
date=date,
committer_date=committer_date,
type=RevisionType(d.pop("type")),
parents=tuple(d.pop("parents")), # for BW compat
**d,
)
def anonymize(self) -> "Revision":
"""Returns an anonymized version of the Revision object.
Anonymization consists in replacing the author and committer with an anonymized
Person object.
"""
return attr.evolve(
self, author=self.author.anonymize(), committer=self.committer.anonymize()
)
@attr.s(frozen=True)
class DirectoryEntry(BaseModel):
object_type: Final = "directory_entry"
name = attr.ib(type=bytes, validator=type_validator())
type = attr.ib(type=str, validator=attr.validators.in_(["file", "dir", "rev"]))
target = attr.ib(type=Sha1Git, validator=type_validator())
perms = attr.ib(type=int, validator=type_validator())
"""Usually one of the values of `swh.model.from_disk.DentryPerms`."""
@attr.s(frozen=True)
class Directory(BaseModel, HashableObject):
object_type: Final = "directory"
entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=type_validator())
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"")
@staticmethod
def compute_hash(object_dict):
return directory_identifier(object_dict)
@classmethod
def from_dict(cls, d):
d = d.copy()
return cls(
entries=tuple(
DirectoryEntry.from_dict(entry) for entry in d.pop("entries")
),
**d,
)
@attr.s(frozen=True)
class BaseContent(BaseModel):
status = attr.ib(
type=str, validator=attr.validators.in_(["visible", "hidden", "absent"])
)
@staticmethod
def _hash_data(data: bytes):
"""Hash some data, returning most of the fields of a content object"""
d = MultiHash.from_data(data).digest()
d["data"] = data
d["length"] = len(data)
return d
@classmethod
def from_dict(cls, d, use_subclass=True):
if use_subclass:
# Chooses a subclass to instantiate instead.
if d["status"] == "absent":
return SkippedContent.from_dict(d)
else:
return Content.from_dict(d)
else:
return super().from_dict(d)
def get_hash(self, hash_name):
if hash_name not in DEFAULT_ALGORITHMS:
raise ValueError("{} is not a valid hash name.".format(hash_name))
return getattr(self, hash_name)
def hashes(self) -> Dict[str, bytes]:
"""Returns a dictionary {hash_name: hash_value}"""
return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS}
@attr.s(frozen=True)
class Content(BaseContent):
object_type: Final = "content"
sha1 = attr.ib(type=bytes, validator=type_validator())
sha1_git = attr.ib(type=Sha1Git, validator=type_validator())
sha256 = attr.ib(type=bytes, validator=type_validator())
blake2s256 = attr.ib(type=bytes, validator=type_validator())
length = attr.ib(type=int, validator=type_validator())
status = attr.ib(
type=str,
validator=attr.validators.in_(["visible", "hidden"]),
default="visible",
)
data = attr.ib(type=Optional[bytes], validator=type_validator(), default=None)
ctime = attr.ib(
type=Optional[datetime.datetime],
validator=type_validator(),
default=None,
eq=False,
)
@length.validator
def check_length(self, attribute, value):
"""Checks the length is positive."""
if value < 0:
raise ValueError("Length must be positive.")
def to_dict(self):
content = super().to_dict()
if content["data"] is None:
del content["data"]
return content
@classmethod
def from_data(cls, data, status="visible", ctime=None) -> "Content":
"""Generate a Content from a given `data` byte string.
This populates the Content with the hashes and length for the data
passed as argument, as well as the data itself.
"""
d = cls._hash_data(data)
d["status"] = status
d["ctime"] = ctime
return cls(**d)
@classmethod
def from_dict(cls, d):
if isinstance(d.get("ctime"), str):
d = d.copy()
d["ctime"] = dateutil.parser.parse(d["ctime"])
return super().from_dict(d, use_subclass=False)
def with_data(self) -> "Content":
"""Loads the `data` attribute; meaning that it is guaranteed not to
be None after this call.
This call is almost a no-op, but subclasses may overload this method
to lazy-load data (eg. from disk or objstorage)."""
if self.data is None:
raise MissingData("Content data is None.")
return self
@attr.s(frozen=True)
class SkippedContent(BaseContent):
object_type: Final = "skipped_content"
sha1 = attr.ib(type=Optional[bytes], validator=type_validator())
sha1_git = attr.ib(type=Optional[Sha1Git], validator=type_validator())
sha256 = attr.ib(type=Optional[bytes], validator=type_validator())
blake2s256 = attr.ib(type=Optional[bytes], validator=type_validator())
length = attr.ib(type=Optional[int], validator=type_validator())
status = attr.ib(type=str, validator=attr.validators.in_(["absent"]))
reason = attr.ib(type=Optional[str], validator=type_validator(), default=None)
origin = attr.ib(type=Optional[str], validator=type_validator(), default=None)
ctime = attr.ib(
type=Optional[datetime.datetime],
validator=type_validator(),
default=None,
eq=False,
)
@reason.validator
def check_reason(self, attribute, value):
"""Checks the reason is full if status != absent."""
assert self.reason == value
if value is None:
raise ValueError("Must provide a reason if content is absent.")
@length.validator
def check_length(self, attribute, value):
"""Checks the length is positive or -1."""
if value < -1:
raise ValueError("Length must be positive or -1.")
def to_dict(self):
content = super().to_dict()
if content["origin"] is None:
del content["origin"]
return content
@classmethod
def from_data(
cls, data: bytes, reason: str, ctime: Optional[datetime.datetime] = None
) -> "SkippedContent":
"""Generate a SkippedContent from a given `data` byte string.
This populates the SkippedContent with the hashes and length for the
data passed as argument.
You can use `attr.evolve` on such a generated content to nullify some
of its attributes, e.g. for tests.
"""
d = cls._hash_data(data)
del d["data"]
d["status"] = "absent"
d["reason"] = reason
d["ctime"] = ctime
return cls(**d)
@classmethod
def from_dict(cls, d):
d2 = d.copy()
if d2.pop("data", None) is not None:
raise ValueError('SkippedContent has no "data" attribute %r' % d)
return super().from_dict(d2, use_subclass=False)
+
+
+class MetadataAuthorityType(Enum):
+ DEPOSIT = "deposit"
+ FORGE = "forge"
+ REGISTRY = "registry"
+
+
+@attr.s(frozen=True)
+class MetadataAuthority(BaseModel):
+ """Represents an entity that provides metadata about an origin or
+ software artifact."""
+
+ type = attr.ib(type=MetadataAuthorityType, validator=type_validator())
+ url = attr.ib(type=str, validator=type_validator())
+ metadata = attr.ib(
+ type=Optional[Dict[str, Any]], default=None, validator=type_validator()
+ )
+
+
+@attr.s(frozen=True)
+class MetadataFetcher(BaseModel):
+ """Represents a software component used to fetch metadata from a metadata
+ authority, and ingest them into the Software Heritage archive."""
+
+ name = attr.ib(type=str, validator=type_validator())
+ version = attr.ib(type=str, validator=type_validator())
+ metadata = attr.ib(
+ type=Optional[Dict[str, Any]], default=None, validator=type_validator()
+ )
+
+
+class MetadataTargetType(Enum):
+ """The type of object extrinsic metadata refer to."""
+
+ CONTENT = "content"
+ DIRECTORY = "directory"
+ REVISION = "revision"
+ RELEASE = "release"
+ SNAPSHOT = "snapshot"
+ ORIGIN = "origin"
+
+
+@attr.s(frozen=True)
+class RawExtrinsicMetadata(BaseModel):
+ # target object
+ type = attr.ib(type=MetadataTargetType, validator=type_validator())
+ id = attr.ib(type=Union[str, SWHID], validator=type_validator())
+ """URL if type=MetadataTargetType.ORIGIN, else core SWHID"""
+
+ # source
+ discovery_date = attr.ib(type=datetime.datetime, validator=type_validator())
+ authority = attr.ib(type=MetadataAuthority, validator=type_validator())
+ fetcher = attr.ib(type=MetadataFetcher, validator=type_validator())
+
+ # the metadata itself
+ format = attr.ib(type=str, validator=type_validator())
+ metadata = attr.ib(type=bytes, validator=type_validator())
+
+ # context
+ origin = attr.ib(type=Optional[str], default=None, validator=type_validator())
+ visit = attr.ib(type=Optional[int], default=None, validator=type_validator())
+ snapshot = attr.ib(type=Optional[SWHID], default=None, validator=type_validator())
+ release = attr.ib(type=Optional[SWHID], default=None, validator=type_validator())
+ revision = attr.ib(type=Optional[SWHID], default=None, validator=type_validator())
+ path = attr.ib(type=Optional[bytes], default=None, validator=type_validator())
+ directory = attr.ib(type=Optional[SWHID], default=None, validator=type_validator())
+
+ @id.validator
+ def check_id(self, attribute, value):
+ if self.type == MetadataTargetType.ORIGIN:
+ if isinstance(value, SWHID) or value.startswith("swh:"):
+ raise ValueError(
+ "Got SWHID as id for origin metadata (expected an URL)."
+ )
+ else:
+ self._check_pid(self.type.value, value)
+
+ @origin.validator
+ def check_origin(self, attribute, value):
+ if value is None:
+ return
+
+ if self.type not in (
+ MetadataTargetType.SNAPSHOT,
+ MetadataTargetType.RELEASE,
+ MetadataTargetType.REVISION,
+ MetadataTargetType.DIRECTORY,
+ MetadataTargetType.CONTENT,
+ ):
+ raise ValueError(
+ f"Unexpected 'origin' context for {self.type.value} object: {value}"
+ )
+
+ if value.startswith("swh:"):
+ # Technically this is valid; but:
+ # 1. SWHIDs are URIs, not URLs
+ # 2. if a SWHID gets here, it's very likely to be a mistake
+ # (and we can remove this check if it turns out there is a
+ # legitimate use for it).
+ raise ValueError(f"SWHID used as context origin URL: {value}")
+
+ @visit.validator
+ def check_visit(self, attribute, value):
+ if value is None:
+ return
+
+ if self.type not in (
+ MetadataTargetType.SNAPSHOT,
+ MetadataTargetType.RELEASE,
+ MetadataTargetType.REVISION,
+ MetadataTargetType.DIRECTORY,
+ MetadataTargetType.CONTENT,
+ ):
+ raise ValueError(
+ f"Unexpected 'visit' context for {self.type.value} object: {value}"
+ )
+
+ if self.origin is None:
+ raise ValueError("'origin' context must be set if 'visit' is.")
+
+ if value <= 0:
+ raise ValueError("Nonpositive visit id")
+
+ @snapshot.validator
+ def check_snapshot(self, attribute, value):
+ if value is None:
+ return
+
+ if self.type not in (
+ MetadataTargetType.RELEASE,
+ MetadataTargetType.REVISION,
+ MetadataTargetType.DIRECTORY,
+ MetadataTargetType.CONTENT,
+ ):
+ raise ValueError(
+ f"Unexpected 'snapshot' context for {self.type.value} object: {value}"
+ )
+
+ self._check_pid("snapshot", value)
+
+ @release.validator
+ def check_release(self, attribute, value):
+ if value is None:
+ return
+
+ if self.type not in (
+ MetadataTargetType.REVISION,
+ MetadataTargetType.DIRECTORY,
+ MetadataTargetType.CONTENT,
+ ):
+ raise ValueError(
+ f"Unexpected 'release' context for {self.type.value} object: {value}"
+ )
+
+ self._check_pid("release", value)
+
+ @revision.validator
+ def check_revision(self, attribute, value):
+ if value is None:
+ return
+
+ if self.type not in (MetadataTargetType.DIRECTORY, MetadataTargetType.CONTENT,):
+ raise ValueError(
+ f"Unexpected 'revision' context for {self.type.value} object: {value}"
+ )
+
+ self._check_pid("revision", value)
+
+ @path.validator
+ def check_path(self, attribute, value):
+ if value is None:
+ return
+
+ if self.type not in (MetadataTargetType.DIRECTORY, MetadataTargetType.CONTENT,):
+ raise ValueError(
+ f"Unexpected 'path' context for {self.type.value} object: {value}"
+ )
+
+ @directory.validator
+ def check_directory(self, attribute, value):
+ if value is None:
+ return
+
+ if self.type not in (MetadataTargetType.CONTENT,):
+ raise ValueError(
+ f"Unexpected 'directory' context for {self.type.value} object: {value}"
+ )
+
+ self._check_pid("directory", value)
+
+ def _check_pid(self, expected_object_type, pid):
+ if isinstance(pid, str):
+ raise ValueError(f"Expected SWHID, got a string: {pid}")
+
+ if pid.object_type != expected_object_type:
+ raise ValueError(
+ f"Expected SWHID type '{expected_object_type}', "
+ f"got '{pid.object_type}' in {pid}"
+ )
+
+ if pid.metadata:
+ raise ValueError(f"Expected core SWHID, but got: {pid}")
diff --git a/swh/model/tests/test_model.py b/swh/model/tests/test_model.py
index edfc829..43c32a0 100644
--- a/swh/model/tests/test_model.py
+++ b/swh/model/tests/test_model.py
@@ -1,680 +1,1087 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import copy
import datetime
import attr
from attrs_strict import AttributeTypeError
from hypothesis import given
from hypothesis.strategies import binary
import pytest
from swh.model.model import (
BaseModel,
Content,
SkippedContent,
Directory,
Revision,
Release,
Snapshot,
Origin,
Timestamp,
TimestampWithTimezone,
MissingData,
Person,
+ RawExtrinsicMetadata,
+ MetadataTargetType,
+ MetadataAuthority,
+ MetadataAuthorityType,
+ MetadataFetcher,
)
from swh.model.hashutil import hash_to_bytes, MultiHash
import swh.model.hypothesis_strategies as strategies
from swh.model.identifiers import (
directory_identifier,
revision_identifier,
release_identifier,
snapshot_identifier,
+ parse_swhid,
+ SWHID,
)
from swh.model.tests.test_identifiers import (
directory_example,
revision_example,
release_example,
snapshot_example,
)
@given(strategies.objects())
def test_todict_inverse_fromdict(objtype_and_obj):
(obj_type, obj) = objtype_and_obj
if obj_type in ("origin", "origin_visit"):
return
obj_as_dict = obj.to_dict()
obj_as_dict_copy = copy.deepcopy(obj_as_dict)
# Check the composition of to_dict and from_dict is the identity
assert obj == type(obj).from_dict(obj_as_dict)
# Check from_dict() does not change the input dict
assert obj_as_dict == obj_as_dict_copy
# Check the composition of from_dict and to_dict is the identity
assert obj_as_dict == type(obj).from_dict(obj_as_dict).to_dict()
# Anonymization
@given(strategies.objects())
def test_anonymization(objtype_and_obj):
(obj_type, obj) = objtype_and_obj
def check_person(p):
if p is not None:
assert p.name is None
assert p.email is None
assert len(p.fullname) == 32
anon_obj = obj.anonymize()
if obj_type == "person":
assert anon_obj is not None
check_person(anon_obj)
elif obj_type == "release":
assert anon_obj is not None
check_person(anon_obj.author)
elif obj_type == "revision":
assert anon_obj is not None
check_person(anon_obj.author)
check_person(anon_obj.committer)
else:
assert anon_obj is None
# Origin, OriginVisit
@given(strategies.origins())
def test_todict_origins(origin):
obj = origin.to_dict()
assert "type" not in obj
assert type(origin)(url=origin.url) == type(origin).from_dict(obj)
@given(strategies.origin_visits())
def test_todict_origin_visits(origin_visit):
obj = origin_visit.to_dict()
assert origin_visit == type(origin_visit).from_dict(obj)
@given(strategies.origin_visit_statuses())
def test_todict_origin_visit_statuses(origin_visit_status):
obj = origin_visit_status.to_dict()
assert origin_visit_status == type(origin_visit_status).from_dict(obj)
# Timestamp
@given(strategies.timestamps())
def test_timestamps_strategy(timestamp):
attr.validate(timestamp)
def test_timestamp_seconds():
attr.validate(Timestamp(seconds=0, microseconds=0))
with pytest.raises(AttributeTypeError):
Timestamp(seconds="0", microseconds=0)
attr.validate(Timestamp(seconds=2 ** 63 - 1, microseconds=0))
with pytest.raises(ValueError):
Timestamp(seconds=2 ** 63, microseconds=0)
attr.validate(Timestamp(seconds=-(2 ** 63), microseconds=0))
with pytest.raises(ValueError):
Timestamp(seconds=-(2 ** 63) - 1, microseconds=0)
def test_timestamp_microseconds():
attr.validate(Timestamp(seconds=0, microseconds=0))
with pytest.raises(AttributeTypeError):
Timestamp(seconds=0, microseconds="0")
attr.validate(Timestamp(seconds=0, microseconds=10 ** 6 - 1))
with pytest.raises(ValueError):
Timestamp(seconds=0, microseconds=10 ** 6)
with pytest.raises(ValueError):
Timestamp(seconds=0, microseconds=-1)
def test_timestamp_from_dict():
assert Timestamp.from_dict({"seconds": 10, "microseconds": 5})
with pytest.raises(AttributeTypeError):
Timestamp.from_dict({"seconds": "10", "microseconds": 5})
with pytest.raises(AttributeTypeError):
Timestamp.from_dict({"seconds": 10, "microseconds": "5"})
with pytest.raises(ValueError):
Timestamp.from_dict({"seconds": 0, "microseconds": -1})
Timestamp.from_dict({"seconds": 0, "microseconds": 10 ** 6 - 1})
with pytest.raises(ValueError):
Timestamp.from_dict({"seconds": 0, "microseconds": 10 ** 6})
# TimestampWithTimezone
def test_timestampwithtimezone():
ts = Timestamp(seconds=0, microseconds=0)
tstz = TimestampWithTimezone(timestamp=ts, offset=0, negative_utc=False)
attr.validate(tstz)
assert tstz.negative_utc is False
attr.validate(TimestampWithTimezone(timestamp=ts, offset=10, negative_utc=False))
attr.validate(TimestampWithTimezone(timestamp=ts, offset=-10, negative_utc=False))
tstz = TimestampWithTimezone(timestamp=ts, offset=0, negative_utc=True)
attr.validate(tstz)
assert tstz.negative_utc is True
with pytest.raises(AttributeTypeError):
TimestampWithTimezone(
timestamp=datetime.datetime.now(), offset=0, negative_utc=False
)
with pytest.raises(AttributeTypeError):
TimestampWithTimezone(timestamp=ts, offset="0", negative_utc=False)
with pytest.raises(AttributeTypeError):
TimestampWithTimezone(timestamp=ts, offset=1.0, negative_utc=False)
with pytest.raises(AttributeTypeError):
TimestampWithTimezone(timestamp=ts, offset=1, negative_utc=0)
with pytest.raises(ValueError):
TimestampWithTimezone(timestamp=ts, offset=1, negative_utc=True)
with pytest.raises(ValueError):
TimestampWithTimezone(timestamp=ts, offset=-1, negative_utc=True)
def test_timestampwithtimezone_from_datetime():
tz = datetime.timezone(datetime.timedelta(minutes=+60))
date = datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=tz)
tstz = TimestampWithTimezone.from_datetime(date)
assert tstz == TimestampWithTimezone(
timestamp=Timestamp(seconds=1582810759, microseconds=0,),
offset=60,
negative_utc=False,
)
def test_timestampwithtimezone_from_iso8601():
date = "2020-02-27 14:39:19.123456+0100"
tstz = TimestampWithTimezone.from_iso8601(date)
assert tstz == TimestampWithTimezone(
timestamp=Timestamp(seconds=1582810759, microseconds=123456,),
offset=60,
negative_utc=False,
)
def test_timestampwithtimezone_from_iso8601_negative_utc():
date = "2020-02-27 13:39:19-0000"
tstz = TimestampWithTimezone.from_iso8601(date)
assert tstz == TimestampWithTimezone(
timestamp=Timestamp(seconds=1582810759, microseconds=0,),
offset=0,
negative_utc=True,
)
def test_person_from_fullname():
"""The author should have name, email and fullname filled.
"""
actual_person = Person.from_fullname(b"tony <ynot@dagobah>")
assert actual_person == Person(
fullname=b"tony <ynot@dagobah>", name=b"tony", email=b"ynot@dagobah",
)
def test_person_from_fullname_no_email():
"""The author and fullname should be the same as the input (author).
"""
actual_person = Person.from_fullname(b"tony")
assert actual_person == Person(fullname=b"tony", name=b"tony", email=None,)
def test_person_from_fullname_empty_person():
"""Empty person has only its fullname filled with the empty
byte-string.
"""
actual_person = Person.from_fullname(b"")
assert actual_person == Person(fullname=b"", name=None, email=None,)
def test_git_author_line_to_author():
# edge case out of the way
with pytest.raises(TypeError):
Person.from_fullname(None)
tests = {
b"a <b@c.com>": Person(name=b"a", email=b"b@c.com", fullname=b"a <b@c.com>",),
b"<foo@bar.com>": Person(
name=None, email=b"foo@bar.com", fullname=b"<foo@bar.com>",
),
b"malformed <email": Person(
name=b"malformed", email=b"email", fullname=b"malformed <email"
),
b'malformed <"<br"@ckets>': Person(
name=b"malformed",
email=b'"<br"@ckets',
fullname=b'malformed <"<br"@ckets>',
),
b"trailing <sp@c.e> ": Person(
name=b"trailing", email=b"sp@c.e", fullname=b"trailing <sp@c.e> ",
),
b"no<sp@c.e>": Person(name=b"no", email=b"sp@c.e", fullname=b"no<sp@c.e>",),
b" more <sp@c.es>": Person(
name=b"more", email=b"sp@c.es", fullname=b" more <sp@c.es>",
),
b" <>": Person(name=None, email=None, fullname=b" <>",),
}
for person in sorted(tests):
expected_person = tests[person]
assert expected_person == Person.from_fullname(person)
# Content
def test_content_get_hash():
hashes = dict(sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux")
c = Content(length=42, status="visible", **hashes)
for (hash_name, hash_) in hashes.items():
assert c.get_hash(hash_name) == hash_
def test_content_hashes():
hashes = dict(sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux")
c = Content(length=42, status="visible", **hashes)
assert c.hashes() == hashes
def test_content_data():
c = Content(
length=42,
status="visible",
data=b"foo",
sha1=b"foo",
sha1_git=b"bar",
sha256=b"baz",
blake2s256=b"qux",
)
assert c.with_data() == c
def test_content_data_missing():
c = Content(
length=42,
status="visible",
sha1=b"foo",
sha1_git=b"bar",
sha256=b"baz",
blake2s256=b"qux",
)
with pytest.raises(MissingData):
c.with_data()
@given(strategies.present_contents_d())
def test_content_from_dict(content_d):
c = Content.from_data(**content_d)
assert c
assert c.ctime == content_d["ctime"]
content_d2 = c.to_dict()
c2 = Content.from_dict(content_d2)
assert c2.ctime == c.ctime
def test_content_from_dict_str_ctime():
# test with ctime as a string
n = datetime.datetime(2020, 5, 6, 12, 34)
content_d = {
"ctime": n.isoformat(),
"data": b"",
"length": 0,
"sha1": b"\x00",
"sha256": b"\x00",
"sha1_git": b"\x00",
"blake2s256": b"\x00",
}
c = Content.from_dict(content_d)
assert c.ctime == n
@given(binary(max_size=4096))
def test_content_from_data(data):
c = Content.from_data(data)
assert c.data == data
assert c.length == len(data)
assert c.status == "visible"
for key, value in MultiHash.from_data(data).digest().items():
assert getattr(c, key) == value
@given(binary(max_size=4096))
def test_hidden_content_from_data(data):
c = Content.from_data(data, status="hidden")
assert c.data == data
assert c.length == len(data)
assert c.status == "hidden"
for key, value in MultiHash.from_data(data).digest().items():
assert getattr(c, key) == value
# SkippedContent
@given(binary(max_size=4096))
def test_skipped_content_from_data(data):
c = SkippedContent.from_data(data, reason="reason")
assert c.reason == "reason"
assert c.length == len(data)
assert c.status == "absent"
for key, value in MultiHash.from_data(data).digest().items():
assert getattr(c, key) == value
@given(strategies.skipped_contents_d())
def test_skipped_content_origin_is_str(skipped_content_d):
assert SkippedContent.from_dict(skipped_content_d)
skipped_content_d["origin"] = "http://path/to/origin"
assert SkippedContent.from_dict(skipped_content_d)
skipped_content_d["origin"] = Origin(url="http://path/to/origin")
with pytest.raises(ValueError, match="origin"):
SkippedContent.from_dict(skipped_content_d)
# Revision
def test_revision_extra_headers_no_headers():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev = Revision.from_dict(rev_dict)
rev_dict = attr.asdict(rev, recurse=False)
rev_model = Revision(**rev_dict)
assert rev_model.metadata is None
assert rev_model.extra_headers == ()
rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
rev_model = Revision(**rev_dict)
assert rev_model.metadata == rev_dict["metadata"]
assert rev_model.extra_headers == ()
def test_revision_extra_headers_with_headers():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev = Revision.from_dict(rev_dict)
rev_dict = attr.asdict(rev, recurse=False)
rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
extra_headers = (
(b"header1", b"value1"),
(b"header2", b"42"),
(b"header3", b"should I?\u0000"),
(b"header1", b"again"),
)
rev_dict["extra_headers"] = extra_headers
rev_model = Revision(**rev_dict)
assert "extra_headers" not in rev_model.metadata
assert rev_model.extra_headers == extra_headers
def test_revision_extra_headers_in_metadata():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev = Revision.from_dict(rev_dict)
rev_dict = attr.asdict(rev, recurse=False)
rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
extra_headers = (
(b"header1", b"value1"),
(b"header2", b"42"),
(b"header3", b"should I?\u0000"),
(b"header1", b"again"),
)
# check the bw-compat init hook does the job
# ie. extra_headers are given in the metadata field
rev_dict["metadata"]["extra_headers"] = extra_headers
rev_model = Revision(**rev_dict)
assert "extra_headers" not in rev_model.metadata
assert rev_model.extra_headers == extra_headers
def test_revision_extra_headers_as_lists():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev = Revision.from_dict(rev_dict)
rev_dict = attr.asdict(rev, recurse=False)
rev_dict["metadata"] = {}
extra_headers = (
(b"header1", b"value1"),
(b"header2", b"42"),
(b"header3", b"should I?\u0000"),
(b"header1", b"again"),
)
# check Revision.extra_headers tuplify does the job
rev_dict["extra_headers"] = [list(x) for x in extra_headers]
rev_model = Revision(**rev_dict)
assert "extra_headers" not in rev_model.metadata
assert rev_model.extra_headers == extra_headers
def test_revision_extra_headers_type_error():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev = Revision.from_dict(rev_dict)
orig_rev_dict = attr.asdict(rev, recurse=False)
orig_rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
extra_headers = (
("header1", b"value1"),
(b"header2", 42),
("header1", "again"),
)
# check headers one at a time
# if given as extra_header
for extra_header in extra_headers:
rev_dict = copy.deepcopy(orig_rev_dict)
rev_dict["extra_headers"] = (extra_header,)
with pytest.raises(AttributeTypeError):
Revision(**rev_dict)
# if given as metadata
for extra_header in extra_headers:
rev_dict = copy.deepcopy(orig_rev_dict)
rev_dict["metadata"]["extra_headers"] = (extra_header,)
with pytest.raises(AttributeTypeError):
Revision(**rev_dict)
def test_revision_extra_headers_from_dict():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev_model = Revision.from_dict(rev_dict)
assert rev_model.metadata is None
assert rev_model.extra_headers == ()
rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
rev_model = Revision.from_dict(rev_dict)
assert rev_model.metadata == rev_dict["metadata"]
assert rev_model.extra_headers == ()
extra_headers = (
(b"header1", b"value1"),
(b"header2", b"42"),
(b"header3", b"should I?\nmaybe\x00\xff"),
(b"header1", b"again"),
)
rev_dict["extra_headers"] = extra_headers
rev_model = Revision.from_dict(rev_dict)
assert "extra_headers" not in rev_model.metadata
assert rev_model.extra_headers == extra_headers
def test_revision_extra_headers_in_metadata_from_dict():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
extra_headers = (
(b"header1", b"value1"),
(b"header2", b"42"),
(b"header3", b"should I?\nmaybe\x00\xff"),
(b"header1", b"again"),
)
# check the bw-compat init hook does the job
rev_dict["metadata"]["extra_headers"] = extra_headers
rev_model = Revision.from_dict(rev_dict)
assert "extra_headers" not in rev_model.metadata
assert rev_model.extra_headers == extra_headers
def test_revision_extra_headers_as_lists_from_dict():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev_model = Revision.from_dict(rev_dict)
rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
extra_headers = (
(b"header1", b"value1"),
(b"header2", b"42"),
(b"header3", b"should I?\nmaybe\x00\xff"),
(b"header1", b"again"),
)
# check Revision.extra_headers converter does the job
rev_dict["extra_headers"] = [list(x) for x in extra_headers]
rev_model = Revision.from_dict(rev_dict)
assert "extra_headers" not in rev_model.metadata
assert rev_model.extra_headers == extra_headers
# ID computation
def test_directory_model_id_computation():
dir_dict = directory_example.copy()
del dir_dict["id"]
dir_id = hash_to_bytes(directory_identifier(dir_dict))
dir_model = Directory.from_dict(dir_dict)
assert dir_model.id == dir_id
def test_revision_model_id_computation():
rev_dict = revision_example.copy()
del rev_dict["id"]
rev_id = hash_to_bytes(revision_identifier(rev_dict))
rev_model = Revision.from_dict(rev_dict)
assert rev_model.id == rev_id
def test_revision_model_id_computation_with_no_date():
"""We can have revision with date to None
"""
rev_dict = revision_example.copy()
rev_dict["date"] = None
rev_dict["committer_date"] = None
del rev_dict["id"]
rev_id = hash_to_bytes(revision_identifier(rev_dict))
rev_model = Revision.from_dict(rev_dict)
assert rev_model.date is None
assert rev_model.committer_date is None
assert rev_model.id == rev_id
def test_release_model_id_computation():
rel_dict = release_example.copy()
del rel_dict["id"]
rel_id = hash_to_bytes(release_identifier(rel_dict))
rel_model = Release.from_dict(rel_dict)
assert isinstance(rel_model.date, TimestampWithTimezone)
assert rel_model.id == hash_to_bytes(rel_id)
def test_snapshot_model_id_computation():
snp_dict = snapshot_example.copy()
del snp_dict["id"]
snp_id = hash_to_bytes(snapshot_identifier(snp_dict))
snp_model = Snapshot.from_dict(snp_dict)
assert snp_model.id == snp_id
@given(strategies.objects(split_content=True))
def test_object_type(objtype_and_obj):
obj_type, obj = objtype_and_obj
assert obj_type == obj.object_type
def test_object_type_is_final():
object_types = set()
def check_final(cls):
if hasattr(cls, "object_type"):
assert cls.object_type not in object_types
object_types.add(cls.object_type)
if cls.__subclasses__():
assert not hasattr(cls, "object_type")
for subcls in cls.__subclasses__():
check_final(subcls)
check_final(BaseModel)
+
+
+_metadata_authority = MetadataAuthority(
+ type=MetadataAuthorityType.FORGE, url="https://forge.softwareheritage.org",
+)
+_metadata_fetcher = MetadataFetcher(name="test-fetcher", version="0.0.1",)
+_content_swhid = parse_swhid("swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2")
+_origin_url = "https://forge.softwareheritage.org/source/swh-model.git"
+_common_metadata_fields = dict(
+ discovery_date=datetime.datetime.now(),
+ authority=_metadata_authority,
+ fetcher=_metadata_fetcher,
+ format="json",
+ metadata=b'{"foo": "bar"}',
+)
+
+
+def test_metadata_valid():
+ """Checks valid RawExtrinsicMetadata objects don't raise an error."""
+
+ # Simplest case
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.ORIGIN, id=_origin_url, **_common_metadata_fields
+ )
+
+ # Object with an SWHID
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT, id=_content_swhid, **_common_metadata_fields
+ )
+
+
+def test_metadata_invalid_id():
+ """Checks various invalid values for the 'id' field."""
+
+ # SWHID for an origin
+ with pytest.raises(ValueError, match="expected an URL"):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.ORIGIN, id=_content_swhid, **_common_metadata_fields
+ )
+
+ # SWHID for an origin (even when passed as string)
+ with pytest.raises(ValueError, match="expected an URL"):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.ORIGIN,
+ id="swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",
+ **_common_metadata_fields,
+ )
+
+ # URL for a non-origin
+ with pytest.raises(ValueError, match="Expected SWHID, got a string"):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT, id=_origin_url, **_common_metadata_fields
+ )
+
+ # SWHID passed as string instead of SWHID
+ with pytest.raises(ValueError, match="Expected SWHID, got a string"):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT,
+ id="swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",
+ **_common_metadata_fields,
+ )
+
+ # Object type does not match the SWHID
+ with pytest.raises(
+ ValueError, match="Expected SWHID type 'revision', got 'content'"
+ ):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.REVISION,
+ id=_content_swhid,
+ **_common_metadata_fields,
+ )
+
+ # Non-core SWHID
+ with pytest.raises(ValueError, match="Expected core SWHID"):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT,
+ id=SWHID(
+ object_type="content",
+ object_id="94a9ed024d3859793618152ea559a168bbcbb5e2",
+ metadata={"foo": "bar"},
+ ),
+ **_common_metadata_fields,
+ )
+
+
+def test_metadata_validate_context_origin():
+ """Checks validation of RawExtrinsicMetadata.origin."""
+
+ # Origins can't have an 'origin' context
+ with pytest.raises(
+ ValueError, match="Unexpected 'origin' context for origin object"
+ ):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.ORIGIN,
+ id=_origin_url,
+ origin=_origin_url,
+ **_common_metadata_fields,
+ )
+
+ # but all other types can
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT,
+ id=_content_swhid,
+ origin=_origin_url,
+ **_common_metadata_fields,
+ )
+
+ # SWHIDs aren't valid origin URLs
+ with pytest.raises(ValueError, match="SWHID used as context origin URL"):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT,
+ id=_content_swhid,
+ origin="swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",
+ **_common_metadata_fields,
+ )
+
+
+def test_metadata_validate_context_visit():
+ """Checks validation of RawExtrinsicMetadata.visit."""
+
+ # Origins can't have a 'visit' context
+ with pytest.raises(
+ ValueError, match="Unexpected 'visit' context for origin object"
+ ):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.ORIGIN,
+ id=_origin_url,
+ visit=42,
+ **_common_metadata_fields,
+ )
+
+ # but all other types can
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT,
+ id=_content_swhid,
+ origin=_origin_url,
+ visit=42,
+ **_common_metadata_fields,
+ )
+
+ # Missing 'origin'
+ with pytest.raises(ValueError, match="'origin' context must be set if 'visit' is"):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT,
+ id=_content_swhid,
+ visit=42,
+ **_common_metadata_fields,
+ )
+
+ # visit id must be positive
+ with pytest.raises(ValueError, match="Nonpositive visit id"):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT,
+ id=_content_swhid,
+ origin=_origin_url,
+ visit=-42,
+ **_common_metadata_fields,
+ )
+
+
+def test_metadata_validate_context_snapshot():
+ """Checks validation of RawExtrinsicMetadata.snapshot."""
+
+ # Origins can't have a 'snapshot' context
+ with pytest.raises(
+ ValueError, match="Unexpected 'snapshot' context for origin object"
+ ):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.ORIGIN,
+ id=_origin_url,
+ snapshot=SWHID(
+ object_type="snapshot",
+ object_id="94a9ed024d3859793618152ea559a168bbcbb5e2",
+ ),
+ **_common_metadata_fields,
+ )
+
+ # but content can
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT,
+ id=_content_swhid,
+ snapshot=SWHID(
+ object_type="snapshot", object_id="94a9ed024d3859793618152ea559a168bbcbb5e2"
+ ),
+ **_common_metadata_fields,
+ )
+
+ # Non-core SWHID
+ with pytest.raises(ValueError, match="Expected core SWHID"):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT,
+ id=_content_swhid,
+ snapshot=SWHID(
+ object_type="snapshot",
+ object_id="94a9ed024d3859793618152ea559a168bbcbb5e2",
+ metadata={"foo": "bar"},
+ ),
+ **_common_metadata_fields,
+ )
+
+ # SWHID type doesn't match the expected type of this context key
+ with pytest.raises(
+ ValueError, match="Expected SWHID type 'snapshot', got 'content'"
+ ):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT,
+ id=_content_swhid,
+ snapshot=SWHID(
+ object_type="content",
+ object_id="94a9ed024d3859793618152ea559a168bbcbb5e2",
+ ),
+ **_common_metadata_fields,
+ )
+
+
+def test_metadata_validate_context_release():
+ """Checks validation of RawExtrinsicMetadata.release."""
+
+ # Origins can't have a 'release' context
+ with pytest.raises(
+ ValueError, match="Unexpected 'release' context for origin object"
+ ):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.ORIGIN,
+ id=_origin_url,
+ release=SWHID(
+ object_type="release",
+ object_id="94a9ed024d3859793618152ea559a168bbcbb5e2",
+ ),
+ **_common_metadata_fields,
+ )
+
+ # but content can
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT,
+ id=_content_swhid,
+ release=SWHID(
+ object_type="release", object_id="94a9ed024d3859793618152ea559a168bbcbb5e2"
+ ),
+ **_common_metadata_fields,
+ )
+
+ # Non-core SWHID
+ with pytest.raises(ValueError, match="Expected core SWHID"):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT,
+ id=_content_swhid,
+ release=SWHID(
+ object_type="release",
+ object_id="94a9ed024d3859793618152ea559a168bbcbb5e2",
+ metadata={"foo": "bar"},
+ ),
+ **_common_metadata_fields,
+ )
+
+ # SWHID type doesn't match the expected type of this context key
+ with pytest.raises(
+ ValueError, match="Expected SWHID type 'release', got 'content'"
+ ):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT,
+ id=_content_swhid,
+ release=SWHID(
+ object_type="content",
+ object_id="94a9ed024d3859793618152ea559a168bbcbb5e2",
+ ),
+ **_common_metadata_fields,
+ )
+
+
+def test_metadata_validate_context_revision():
+ """Checks validation of RawExtrinsicMetadata.revision."""
+
+ # Origins can't have a 'revision' context
+ with pytest.raises(
+ ValueError, match="Unexpected 'revision' context for origin object"
+ ):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.ORIGIN,
+ id=_origin_url,
+ revision=SWHID(
+ object_type="revision",
+ object_id="94a9ed024d3859793618152ea559a168bbcbb5e2",
+ ),
+ **_common_metadata_fields,
+ )
+
+ # but content can
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT,
+ id=_content_swhid,
+ revision=SWHID(
+ object_type="revision", object_id="94a9ed024d3859793618152ea559a168bbcbb5e2"
+ ),
+ **_common_metadata_fields,
+ )
+
+ # Non-core SWHID
+ with pytest.raises(ValueError, match="Expected core SWHID"):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT,
+ id=_content_swhid,
+ revision=SWHID(
+ object_type="revision",
+ object_id="94a9ed024d3859793618152ea559a168bbcbb5e2",
+ metadata={"foo": "bar"},
+ ),
+ **_common_metadata_fields,
+ )
+
+ # SWHID type doesn't match the expected type of this context key
+ with pytest.raises(
+ ValueError, match="Expected SWHID type 'revision', got 'content'"
+ ):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT,
+ id=_content_swhid,
+ revision=SWHID(
+ object_type="content",
+ object_id="94a9ed024d3859793618152ea559a168bbcbb5e2",
+ ),
+ **_common_metadata_fields,
+ )
+
+
+def test_metadata_validate_context_path():
+ """Checks validation of RawExtrinsicMetadata.path."""
+
+ # Origins can't have a 'path' context
+ with pytest.raises(ValueError, match="Unexpected 'path' context for origin object"):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.ORIGIN,
+ id=_origin_url,
+ path=b"/foo/bar",
+ **_common_metadata_fields,
+ )
+
+ # but content can
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT,
+ id=_content_swhid,
+ path=b"/foo/bar",
+ **_common_metadata_fields,
+ )
+
+
+def test_metadata_validate_context_directory():
+ """Checks validation of RawExtrinsicMetadata.directory."""
+
+ # Origins can't have a 'directory' context
+ with pytest.raises(
+ ValueError, match="Unexpected 'directory' context for origin object"
+ ):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.ORIGIN,
+ id=_origin_url,
+ directory=SWHID(
+ object_type="directory",
+ object_id="94a9ed024d3859793618152ea559a168bbcbb5e2",
+ ),
+ **_common_metadata_fields,
+ )
+
+ # but content can
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT,
+ id=_content_swhid,
+ directory=SWHID(
+ object_type="directory",
+ object_id="94a9ed024d3859793618152ea559a168bbcbb5e2",
+ ),
+ **_common_metadata_fields,
+ )
+
+ # Non-core SWHID
+ with pytest.raises(ValueError, match="Expected core SWHID"):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT,
+ id=_content_swhid,
+ directory=SWHID(
+ object_type="directory",
+ object_id="94a9ed024d3859793618152ea559a168bbcbb5e2",
+ metadata={"foo": "bar"},
+ ),
+ **_common_metadata_fields,
+ )
+
+ # SWHID type doesn't match the expected type of this context key
+ with pytest.raises(
+ ValueError, match="Expected SWHID type 'directory', got 'content'"
+ ):
+ RawExtrinsicMetadata(
+ type=MetadataTargetType.CONTENT,
+ id=_content_swhid,
+ directory=SWHID(
+ object_type="content",
+ object_id="94a9ed024d3859793618152ea559a168bbcbb5e2",
+ ),
+ **_common_metadata_fields,
+ )
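
For clarity, a short sketch of the distinction that _check_pid draws between a core SWHID and a SWHID carrying contextual metadata, reusing the SWHID values from the tests above; the variable names are illustrative.

from swh.model.identifiers import SWHID, parse_swhid

# A core SWHID only carries a namespace, version, object type and object
# id; _check_pid accepts these for the context fields.
core = parse_swhid("swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2")
assert not core.metadata

# A SWHID with contextual metadata attached is rejected by _check_pid
# with "Expected core SWHID".
non_core = SWHID(
    object_type="content",
    object_id="94a9ed024d3859793618152ea559a168bbcbb5e2",
    metadata={"foo": "bar"},
)
assert non_core.metadata
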
