diff --git a/swh/model/model.py b/swh/model/model.py
index 735ce46..53c874e 100644
--- a/swh/model/model.py
+++ b/swh/model/model.py
@@ -1,1338 +1,1386 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
Implementation of Software Heritage's data model
See :ref:`data-model` for an overview of the data model.
The classes defined in this module are immutable
`attrs objects `__ and enums.
All classes define a ``from_dict`` class method and a ``to_dict``
method to convert between them and msgpack-serializable objects.
"""
from abc import ABCMeta, abstractmethod
import datetime
from enum import Enum
import hashlib
from typing import Any, Dict, Iterable, Optional, Tuple, TypeVar, Union
import attr
from attrs_strict import AttributeTypeError
import dateutil.parser
import iso8601
from typing_extensions import Final
from . import git_objects
from .collections import ImmutableDict
from .hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_hex
from .swhids import CoreSWHID
from .swhids import ExtendedObjectType as SwhidExtendedObjectType
from .swhids import ExtendedSWHID
from .swhids import ObjectType as SwhidObjectType
class MissingData(Exception):
"""Raised by `Content.with_data` when it has no way of fetching the
data (but not when fetching the data fails)."""
pass
KeyType = Union[Dict[str, str], Dict[str, bytes], bytes]
"""The type returned by BaseModel.unique_key()."""
SHA1_SIZE = 20
+_OFFSET_CHARS = frozenset(b"+-0123456789")
+
# TODO: Limit this to 20 bytes
Sha1Git = bytes
Sha1 = bytes
KT = TypeVar("KT")
VT = TypeVar("VT")
def hash_repr(h: bytes) -> str:
if h is None:
return "None"
else:
return f"hash_to_bytes('{hash_to_hex(h)}')"
def freeze_optional_dict(
d: Union[None, Dict[KT, VT], ImmutableDict[KT, VT]] # type: ignore
) -> Optional[ImmutableDict[KT, VT]]:
if isinstance(d, dict):
return ImmutableDict(d)
else:
return d
def dictify(value):
"Helper function used by BaseModel.to_dict()"
if isinstance(value, BaseModel):
return value.to_dict()
elif isinstance(value, (CoreSWHID, ExtendedSWHID)):
return str(value)
elif isinstance(value, Enum):
return value.value
elif isinstance(value, (dict, ImmutableDict)):
return {k: dictify(v) for k, v in value.items()}
elif isinstance(value, tuple):
return tuple(dictify(v) for v in value)
else:
return value
def _check_type(type_, value):
if type_ is object or type_ is Any:
return True
origin = getattr(type_, "__origin__", None)
# Non-generic type, check it directly
if origin is None:
# This is functionally equivalent to using just this:
# return isinstance(value, type)
# but using type equality before isinstance allows very quick checks
# when the exact class is used (which is the overwhelming majority of cases)
# while still allowing subclasses to be used.
return type(value) == type_ or isinstance(value, type_)
# Check the type of the value itself
#
# For the same reason as above, this condition is functionally equivalent to:
# if origin is not Union and not isinstance(value, origin):
if origin is not Union and type(value) != origin and not isinstance(value, origin):
return False
# Then, if it's a container, check its items.
if origin is tuple:
args = type_.__args__
if len(args) == 2 and args[1] is Ellipsis:
# Infinite tuple
return all(_check_type(args[0], item) for item in value)
else:
# Finite tuple
if len(args) != len(value):
return False
return all(
_check_type(item_type, item) for (item_type, item) in zip(args, value)
)
elif origin is Union:
args = type_.__args__
return any(_check_type(variant, value) for variant in args)
elif origin is ImmutableDict:
(key_type, value_type) = type_.__args__
return all(
_check_type(key_type, key) and _check_type(value_type, value)
for (key, value) in value.items()
)
else:
# No need to check dict or list. because they are converted to ImmutableDict
# and tuple respectively.
raise NotImplementedError(f"Type-checking {type_}")
def type_validator():
"""Like attrs_strict.type_validator(), but stricter.
It is an attrs validator, which checks attributes have the specified type,
using type equality instead of ``isinstance()``, for improved performance
"""
def validator(instance, attribute, value):
if not _check_type(attribute.type, value):
raise AttributeTypeError(value, attribute)
return validator
ModelType = TypeVar("ModelType", bound="BaseModel")
class BaseModel:
"""Base class for SWH model classes.
Provides serialization/deserialization to/from Python dictionaries,
that are suitable for JSON/msgpack-like formats."""
__slots__ = ()
def to_dict(self):
"""Wrapper of `attr.asdict` that can be overridden by subclasses
that have special handling of some of the fields."""
return dictify(attr.asdict(self, recurse=False))
@classmethod
def from_dict(cls, d):
"""Takes a dictionary representing a tree of SWH objects, and
recursively builds the corresponding objects."""
return cls(**d)
def anonymize(self: ModelType) -> Optional[ModelType]:
"""Returns an anonymized version of the object, if needed.
If the object model does not need/support anonymization, returns None.
"""
return None
def unique_key(self) -> KeyType:
"""Returns a unique key for this object, that can be used for
deduplication."""
raise NotImplementedError(f"unique_key for {self}")
class HashableObject(metaclass=ABCMeta):
"""Mixin to automatically compute object identifier hash when
the associated model is instantiated."""
__slots__ = ()
id: Sha1Git
@abstractmethod
def compute_hash(self) -> bytes:
"""Derived model classes must implement this to compute
the object hash.
This method is called by the object initialization if the `id`
attribute is set to an empty value.
"""
pass
def __attrs_post_init__(self):
if not self.id:
obj_id = self.compute_hash()
object.__setattr__(self, "id", obj_id)
def unique_key(self) -> KeyType:
return self.id
@attr.s(frozen=True, slots=True)
class Person(BaseModel):
"""Represents the author/committer of a revision or release."""
object_type: Final = "person"
fullname = attr.ib(type=bytes, validator=type_validator())
name = attr.ib(type=Optional[bytes], validator=type_validator())
email = attr.ib(type=Optional[bytes], validator=type_validator())
@classmethod
def from_fullname(cls, fullname: bytes):
"""Returns a Person object, by guessing the name and email from the
fullname, in the `name ` format.
The fullname is left unchanged."""
if fullname is None:
raise TypeError("fullname is None.")
name: Optional[bytes]
email: Optional[bytes]
try:
open_bracket = fullname.index(b"<")
except ValueError:
name = fullname
email = None
else:
raw_name = fullname[:open_bracket]
raw_email = fullname[open_bracket + 1 :]
if not raw_name:
name = None
else:
name = raw_name.strip()
try:
close_bracket = raw_email.rindex(b">")
except ValueError:
email = raw_email
else:
email = raw_email[:close_bracket]
return Person(name=name or None, email=email or None, fullname=fullname,)
def anonymize(self) -> "Person":
"""Returns an anonymized version of the Person object.
Anonymization is simply a Person which fullname is the hashed, with unset name
or email.
"""
return Person(
fullname=hashlib.sha256(self.fullname).digest(), name=None, email=None,
)
@classmethod
def from_dict(cls, d):
"""
If the fullname is missing, construct a fullname
using the following heuristics: if the name value is None, we return the
email in angle brackets, else, we return the name, a space, and the email
in angle brackets.
"""
if "fullname" not in d:
parts = []
if d["name"] is not None:
parts.append(d["name"])
if d["email"] is not None:
parts.append(b"".join([b"<", d["email"], b">"]))
fullname = b" ".join(parts)
d = {**d, "fullname": fullname}
d = {"name": None, "email": None, **d}
return super().from_dict(d)
@attr.s(frozen=True, slots=True)
class Timestamp(BaseModel):
"""Represents a naive timestamp from a VCS."""
object_type: Final = "timestamp"
seconds = attr.ib(type=int, validator=type_validator())
microseconds = attr.ib(type=int, validator=type_validator())
@seconds.validator
def check_seconds(self, attribute, value):
"""Check that seconds fit in a 64-bits signed integer."""
if not (-(2 ** 63) <= value < 2 ** 63):
raise ValueError("Seconds must be a signed 64-bits integer.")
@microseconds.validator
def check_microseconds(self, attribute, value):
"""Checks that microseconds are positive and < 1000000."""
if not (0 <= value < 10 ** 6):
raise ValueError("Microseconds must be in [0, 1000000[.")
@attr.s(frozen=True, slots=True)
class TimestampWithTimezone(BaseModel):
"""Represents a TZ-aware timestamp from a VCS."""
object_type: Final = "timestamp_with_timezone"
timestamp = attr.ib(type=Timestamp, validator=type_validator())
offset = attr.ib(type=int, validator=type_validator())
negative_utc = attr.ib(type=bool, validator=type_validator())
+ offset_bytes = attr.ib(type=bytes, validator=type_validator())
+ """Raw git representation of the timezone, as an offset from UTC.
+ It should follow this format: ``+HHMM`` or ``-HHMM`` (including ``+0000`` and
+ ``-0000``).
+
+ However, when created from git objects, it must be the exact bytes used in the
+ original objects, so it may differ from this format when they do.
+ """
+
@offset.validator
def check_offset(self, attribute, value):
"""Checks the offset is a 16-bits signed integer (in theory, it
should always be between -14 and +14 hours)."""
if not (-(2 ** 15) <= value < 2 ** 15):
# max 14 hours offset in theory, but you never know what
# you'll find in the wild...
raise ValueError("offset too large: %d minutes" % value)
+ self._check_offsets_match()
+
@negative_utc.validator
def check_negative_utc(self, attribute, value):
if self.offset and value:
raise ValueError("negative_utc can only be True is offset=0")
+ self._check_offsets_match()
+
+ @offset_bytes.default
+ def _default_offset_bytes(self):
+ negative = self.offset < 0 or self.negative_utc
+ (hours, minutes) = divmod(abs(self.offset), 60)
+ return f"{'-' if negative else '+'}{hours:02}{minutes:02}".encode()
+
+ @offset_bytes.validator
+ def check_offset_bytes(self, attribute, value):
+ if not set(value) <= _OFFSET_CHARS:
+ raise ValueError(f"invalid characters in offset_bytes: {value!r}")
+
+ self._check_offsets_match()
+
+ def _check_offsets_match(self):
+ offset_str = self.offset_bytes.decode()
+ assert offset_str[0] in "+-"
+ sign = int(offset_str[0] + "1")
+ hours = int(offset_str[1:-2])
+ minutes = int(offset_str[-2:])
+ offset = sign * (hours * 60 + minutes)
+ if offset != self.offset:
+ raise ValueError(
+ f"offset_bytes ({self.offset_bytes!r}) does not match offset "
+ f"{divmod(self.offset, 60)}"
+ )
+
+ if offset == 0 and self.negative_utc != self.offset_bytes.startswith(b"-"):
+ raise ValueError(
+ f"offset_bytes ({self.offset_bytes!r}) does not match negative_utc "
+ f"({self.negative_utc})"
+ )
+
@classmethod
def from_dict(cls, time_representation: Union[Dict, datetime.datetime, int]):
"""Builds a TimestampWithTimezone from any of the formats
accepted by :func:`swh.model.normalize_timestamp`."""
# TODO: this accept way more types than just dicts; find a better
# name
negative_utc = False
if isinstance(time_representation, dict):
ts = time_representation["timestamp"]
if isinstance(ts, dict):
seconds = ts.get("seconds", 0)
microseconds = ts.get("microseconds", 0)
elif isinstance(ts, int):
seconds = ts
microseconds = 0
else:
raise ValueError(
f"TimestampWithTimezone.from_dict received non-integer timestamp "
f"member {ts!r}"
)
offset = time_representation["offset"]
if "negative_utc" in time_representation:
negative_utc = time_representation["negative_utc"]
if negative_utc is None:
negative_utc = False
elif isinstance(time_representation, datetime.datetime):
microseconds = time_representation.microsecond
if microseconds:
time_representation = time_representation.replace(microsecond=0)
seconds = int(time_representation.timestamp())
utcoffset = time_representation.utcoffset()
if utcoffset is None:
raise ValueError(
f"TimestampWithTimezone.from_dict received datetime without "
f"timezone: {time_representation}"
)
# utcoffset is an integer number of minutes
seconds_offset = utcoffset.total_seconds()
offset = int(seconds_offset) // 60
elif isinstance(time_representation, int):
seconds = time_representation
microseconds = 0
offset = 0
else:
raise ValueError(
f"TimestampWithTimezone.from_dict received non-integer timestamp: "
f"{time_representation!r}"
)
return cls(
timestamp=Timestamp(seconds=seconds, microseconds=microseconds),
offset=offset,
negative_utc=negative_utc,
)
@classmethod
def from_datetime(cls, dt: datetime.datetime):
return cls.from_dict(dt)
def to_datetime(self) -> datetime.datetime:
"""Convert to a datetime (with a timezone set to the recorded fixed UTC offset)
Beware that this conversion can be lossy: the negative_utc flag is not
taken into consideration (since it cannot be represented in a
datetime). Also note that it may fail due to type overflow.
"""
timestamp = datetime.datetime.fromtimestamp(
self.timestamp.seconds,
datetime.timezone(datetime.timedelta(minutes=self.offset)),
)
timestamp = timestamp.replace(microsecond=self.timestamp.microseconds)
return timestamp
@classmethod
def from_iso8601(cls, s):
"""Builds a TimestampWithTimezone from an ISO8601-formatted string.
"""
dt = iso8601.parse_date(s)
tstz = cls.from_datetime(dt)
if dt.tzname() == "-00:00":
- tstz = attr.evolve(tstz, negative_utc=True)
+ assert tstz.offset_bytes == b"+0000"
+ tstz = attr.evolve(tstz, negative_utc=True, offset_bytes=b"-0000")
return tstz
@attr.s(frozen=True, slots=True)
class Origin(HashableObject, BaseModel):
"""Represents a software source: a VCS and an URL."""
object_type: Final = "origin"
url = attr.ib(type=str, validator=type_validator())
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"")
def unique_key(self) -> KeyType:
return {"url": self.url}
def compute_hash(self) -> bytes:
return hashlib.sha1(self.url.encode("utf-8")).digest()
def swhid(self) -> ExtendedSWHID:
"""Returns a SWHID representing this origin."""
return ExtendedSWHID(
object_type=SwhidExtendedObjectType.ORIGIN, object_id=self.id,
)
@attr.s(frozen=True, slots=True)
class OriginVisit(BaseModel):
"""Represents an origin visit with a given type at a given point in time, by a
SWH loader."""
object_type: Final = "origin_visit"
origin = attr.ib(type=str, validator=type_validator())
date = attr.ib(type=datetime.datetime, validator=type_validator())
type = attr.ib(type=str, validator=type_validator())
"""Should not be set before calling 'origin_visit_add()'."""
visit = attr.ib(type=Optional[int], validator=type_validator(), default=None)
@date.validator
def check_date(self, attribute, value):
"""Checks the date has a timezone."""
if value is not None and value.tzinfo is None:
raise ValueError("date must be a timezone-aware datetime.")
def to_dict(self):
"""Serializes the date as a string and omits the visit id if it is
`None`."""
ov = super().to_dict()
if ov["visit"] is None:
del ov["visit"]
return ov
def unique_key(self) -> KeyType:
return {"origin": self.origin, "date": str(self.date)}
@attr.s(frozen=True, slots=True)
class OriginVisitStatus(BaseModel):
"""Represents a visit update of an origin at a given point in time.
"""
object_type: Final = "origin_visit_status"
origin = attr.ib(type=str, validator=type_validator())
visit = attr.ib(type=int, validator=type_validator())
date = attr.ib(type=datetime.datetime, validator=type_validator())
status = attr.ib(
type=str,
validator=attr.validators.in_(
["created", "ongoing", "full", "partial", "not_found", "failed"]
),
)
snapshot = attr.ib(
type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr
)
# Type is optional be to able to use it before adding it to the database model
type = attr.ib(type=Optional[str], validator=type_validator(), default=None)
metadata = attr.ib(
type=Optional[ImmutableDict[str, object]],
validator=type_validator(),
converter=freeze_optional_dict,
default=None,
)
@date.validator
def check_date(self, attribute, value):
"""Checks the date has a timezone."""
if value is not None and value.tzinfo is None:
raise ValueError("date must be a timezone-aware datetime.")
def unique_key(self) -> KeyType:
return {"origin": self.origin, "visit": str(self.visit), "date": str(self.date)}
class TargetType(Enum):
"""The type of content pointed to by a snapshot branch. Usually a
revision or an alias."""
CONTENT = "content"
DIRECTORY = "directory"
REVISION = "revision"
RELEASE = "release"
SNAPSHOT = "snapshot"
ALIAS = "alias"
def __repr__(self):
return f"TargetType.{self.name}"
class ObjectType(Enum):
"""The type of content pointed to by a release. Usually a revision"""
CONTENT = "content"
DIRECTORY = "directory"
REVISION = "revision"
RELEASE = "release"
SNAPSHOT = "snapshot"
def __repr__(self):
return f"ObjectType.{self.name}"
@attr.s(frozen=True, slots=True)
class SnapshotBranch(BaseModel):
"""Represents one of the branches of a snapshot."""
object_type: Final = "snapshot_branch"
target = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr)
target_type = attr.ib(type=TargetType, validator=type_validator())
@target.validator
def check_target(self, attribute, value):
"""Checks the target type is not an alias, checks the target is a
valid sha1_git."""
if self.target_type != TargetType.ALIAS and self.target is not None:
if len(value) != 20:
raise ValueError("Wrong length for bytes identifier: %d" % len(value))
@classmethod
def from_dict(cls, d):
return cls(target=d["target"], target_type=TargetType(d["target_type"]))
@attr.s(frozen=True, slots=True)
class Snapshot(HashableObject, BaseModel):
"""Represents the full state of an origin at a given point in time."""
object_type: Final = "snapshot"
branches = attr.ib(
type=ImmutableDict[bytes, Optional[SnapshotBranch]],
validator=type_validator(),
converter=freeze_optional_dict,
)
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr)
def compute_hash(self) -> bytes:
git_object = git_objects.snapshot_git_object(self)
return hashlib.new("sha1", git_object).digest()
@classmethod
def from_dict(cls, d):
d = d.copy()
return cls(
branches=ImmutableDict(
(name, SnapshotBranch.from_dict(branch) if branch else None)
for (name, branch) in d.pop("branches").items()
),
**d,
)
def swhid(self) -> CoreSWHID:
"""Returns a SWHID representing this object."""
return CoreSWHID(object_type=SwhidObjectType.SNAPSHOT, object_id=self.id)
@attr.s(frozen=True, slots=True)
class Release(HashableObject, BaseModel):
object_type: Final = "release"
name = attr.ib(type=bytes, validator=type_validator())
message = attr.ib(type=Optional[bytes], validator=type_validator())
target = attr.ib(type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr)
target_type = attr.ib(type=ObjectType, validator=type_validator())
synthetic = attr.ib(type=bool, validator=type_validator())
author = attr.ib(type=Optional[Person], validator=type_validator(), default=None)
date = attr.ib(
type=Optional[TimestampWithTimezone], validator=type_validator(), default=None
)
metadata = attr.ib(
type=Optional[ImmutableDict[str, object]],
validator=type_validator(),
converter=freeze_optional_dict,
default=None,
)
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr)
def compute_hash(self) -> bytes:
git_object = git_objects.release_git_object(self)
return hashlib.new("sha1", git_object).digest()
@author.validator
def check_author(self, attribute, value):
"""If the author is `None`, checks the date is `None` too."""
if self.author is None and self.date is not None:
raise ValueError("release date must be None if author is None.")
def to_dict(self):
rel = super().to_dict()
if rel["metadata"] is None:
del rel["metadata"]
return rel
@classmethod
def from_dict(cls, d):
d = d.copy()
if d.get("author"):
d["author"] = Person.from_dict(d["author"])
if d.get("date"):
d["date"] = TimestampWithTimezone.from_dict(d["date"])
return cls(target_type=ObjectType(d.pop("target_type")), **d)
def swhid(self) -> CoreSWHID:
"""Returns a SWHID representing this object."""
return CoreSWHID(object_type=SwhidObjectType.RELEASE, object_id=self.id)
def anonymize(self) -> "Release":
"""Returns an anonymized version of the Release object.
Anonymization consists in replacing the author with an anonymized Person object.
"""
author = self.author and self.author.anonymize()
return attr.evolve(self, author=author)
class RevisionType(Enum):
GIT = "git"
TAR = "tar"
DSC = "dsc"
SUBVERSION = "svn"
MERCURIAL = "hg"
CVS = "cvs"
BAZAAR = "bzr"
def __repr__(self):
return f"RevisionType.{self.name}"
def tuplify_extra_headers(value: Iterable):
return tuple((k, v) for k, v in value)
@attr.s(frozen=True, slots=True)
class Revision(HashableObject, BaseModel):
object_type: Final = "revision"
message = attr.ib(type=Optional[bytes], validator=type_validator())
author = attr.ib(type=Person, validator=type_validator())
committer = attr.ib(type=Person, validator=type_validator())
date = attr.ib(type=Optional[TimestampWithTimezone], validator=type_validator())
committer_date = attr.ib(
type=Optional[TimestampWithTimezone], validator=type_validator()
)
type = attr.ib(type=RevisionType, validator=type_validator())
directory = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr)
synthetic = attr.ib(type=bool, validator=type_validator())
metadata = attr.ib(
type=Optional[ImmutableDict[str, object]],
validator=type_validator(),
converter=freeze_optional_dict,
default=None,
)
parents = attr.ib(type=Tuple[Sha1Git, ...], validator=type_validator(), default=())
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr)
extra_headers = attr.ib(
type=Tuple[Tuple[bytes, bytes], ...],
validator=type_validator(),
converter=tuplify_extra_headers,
default=(),
)
def __attrs_post_init__(self):
super().__attrs_post_init__()
# ensure metadata is a deep copy of whatever was given, and if needed
# extract extra_headers from there
if self.metadata:
metadata = self.metadata
if not self.extra_headers and "extra_headers" in metadata:
(extra_headers, metadata) = metadata.copy_pop("extra_headers")
object.__setattr__(
self, "extra_headers", tuplify_extra_headers(extra_headers),
)
attr.validate(self)
object.__setattr__(self, "metadata", metadata)
def compute_hash(self) -> bytes:
git_object = git_objects.revision_git_object(self)
return hashlib.new("sha1", git_object).digest()
@classmethod
def from_dict(cls, d):
d = d.copy()
date = d.pop("date")
if date:
date = TimestampWithTimezone.from_dict(date)
committer_date = d.pop("committer_date")
if committer_date:
committer_date = TimestampWithTimezone.from_dict(committer_date)
return cls(
author=Person.from_dict(d.pop("author")),
committer=Person.from_dict(d.pop("committer")),
date=date,
committer_date=committer_date,
type=RevisionType(d.pop("type")),
parents=tuple(d.pop("parents")), # for BW compat
**d,
)
def swhid(self) -> CoreSWHID:
"""Returns a SWHID representing this object."""
return CoreSWHID(object_type=SwhidObjectType.REVISION, object_id=self.id)
def anonymize(self) -> "Revision":
"""Returns an anonymized version of the Revision object.
Anonymization consists in replacing the author and committer with an anonymized
Person object.
"""
return attr.evolve(
self, author=self.author.anonymize(), committer=self.committer.anonymize()
)
@attr.s(frozen=True, slots=True)
class DirectoryEntry(BaseModel):
object_type: Final = "directory_entry"
name = attr.ib(type=bytes, validator=type_validator())
type = attr.ib(type=str, validator=attr.validators.in_(["file", "dir", "rev"]))
target = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr)
perms = attr.ib(type=int, validator=type_validator(), converter=int, repr=oct)
"""Usually one of the values of `swh.model.from_disk.DentryPerms`."""
@name.validator
def check_name(self, attribute, value):
if b"/" in value:
raise ValueError("{value!r} is not a valid directory entry name.")
@attr.s(frozen=True, slots=True)
class Directory(HashableObject, BaseModel):
object_type: Final = "directory"
entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=type_validator())
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr)
def compute_hash(self) -> bytes:
git_object = git_objects.directory_git_object(self)
return hashlib.new("sha1", git_object).digest()
@entries.validator
def check_entries(self, attribute, value):
seen = set()
for entry in value:
if entry.name in seen:
raise ValueError(
"{self.swhid()} has duplicated entry name: {entry.name!r}"
)
seen.add(entry.name)
@classmethod
def from_dict(cls, d):
d = d.copy()
return cls(
entries=tuple(
DirectoryEntry.from_dict(entry) for entry in d.pop("entries")
),
**d,
)
def swhid(self) -> CoreSWHID:
"""Returns a SWHID representing this object."""
return CoreSWHID(object_type=SwhidObjectType.DIRECTORY, object_id=self.id)
@attr.s(frozen=True, slots=True)
class BaseContent(BaseModel):
status = attr.ib(
type=str, validator=attr.validators.in_(["visible", "hidden", "absent"])
)
@staticmethod
def _hash_data(data: bytes):
"""Hash some data, returning most of the fields of a content object"""
d = MultiHash.from_data(data).digest()
d["data"] = data
d["length"] = len(data)
return d
@classmethod
def from_dict(cls, d, use_subclass=True):
if use_subclass:
# Chooses a subclass to instantiate instead.
if d["status"] == "absent":
return SkippedContent.from_dict(d)
else:
return Content.from_dict(d)
else:
return super().from_dict(d)
def get_hash(self, hash_name):
if hash_name not in DEFAULT_ALGORITHMS:
raise ValueError("{} is not a valid hash name.".format(hash_name))
return getattr(self, hash_name)
def hashes(self) -> Dict[str, bytes]:
"""Returns a dictionary {hash_name: hash_value}"""
return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS}
@attr.s(frozen=True, slots=True)
class Content(BaseContent):
object_type: Final = "content"
sha1 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr)
sha1_git = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr)
sha256 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr)
blake2s256 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr)
length = attr.ib(type=int, validator=type_validator())
status = attr.ib(
type=str,
validator=attr.validators.in_(["visible", "hidden"]),
default="visible",
)
data = attr.ib(type=Optional[bytes], validator=type_validator(), default=None)
ctime = attr.ib(
type=Optional[datetime.datetime],
validator=type_validator(),
default=None,
eq=False,
)
@length.validator
def check_length(self, attribute, value):
"""Checks the length is positive."""
if value < 0:
raise ValueError("Length must be positive.")
@ctime.validator
def check_ctime(self, attribute, value):
"""Checks the ctime has a timezone."""
if value is not None and value.tzinfo is None:
raise ValueError("ctime must be a timezone-aware datetime.")
def to_dict(self):
content = super().to_dict()
if content["data"] is None:
del content["data"]
if content["ctime"] is None:
del content["ctime"]
return content
@classmethod
def from_data(cls, data, status="visible", ctime=None) -> "Content":
"""Generate a Content from a given `data` byte string.
This populates the Content with the hashes and length for the data
passed as argument, as well as the data itself.
"""
d = cls._hash_data(data)
d["status"] = status
d["ctime"] = ctime
return cls(**d)
@classmethod
def from_dict(cls, d):
if isinstance(d.get("ctime"), str):
d = d.copy()
d["ctime"] = dateutil.parser.parse(d["ctime"])
return super().from_dict(d, use_subclass=False)
def with_data(self) -> "Content":
"""Loads the `data` attribute; meaning that it is guaranteed not to
be None after this call.
This call is almost a no-op, but subclasses may overload this method
to lazy-load data (eg. from disk or objstorage)."""
if self.data is None:
raise MissingData("Content data is None.")
return self
def unique_key(self) -> KeyType:
return self.sha1 # TODO: use a dict of hashes
def swhid(self) -> CoreSWHID:
"""Returns a SWHID representing this object."""
return CoreSWHID(object_type=SwhidObjectType.CONTENT, object_id=self.sha1_git)
@attr.s(frozen=True, slots=True)
class SkippedContent(BaseContent):
object_type: Final = "skipped_content"
sha1 = attr.ib(type=Optional[bytes], validator=type_validator(), repr=hash_repr)
sha1_git = attr.ib(
type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr
)
sha256 = attr.ib(type=Optional[bytes], validator=type_validator(), repr=hash_repr)
blake2s256 = attr.ib(
type=Optional[bytes], validator=type_validator(), repr=hash_repr
)
length = attr.ib(type=Optional[int], validator=type_validator())
status = attr.ib(type=str, validator=attr.validators.in_(["absent"]))
reason = attr.ib(type=Optional[str], validator=type_validator(), default=None)
origin = attr.ib(type=Optional[str], validator=type_validator(), default=None)
ctime = attr.ib(
type=Optional[datetime.datetime],
validator=type_validator(),
default=None,
eq=False,
)
@reason.validator
def check_reason(self, attribute, value):
"""Checks the reason is full if status != absent."""
assert self.reason == value
if value is None:
raise ValueError("Must provide a reason if content is absent.")
@length.validator
def check_length(self, attribute, value):
"""Checks the length is positive or -1."""
if value < -1:
raise ValueError("Length must be positive or -1.")
@ctime.validator
def check_ctime(self, attribute, value):
"""Checks the ctime has a timezone."""
if value is not None and value.tzinfo is None:
raise ValueError("ctime must be a timezone-aware datetime.")
def to_dict(self):
content = super().to_dict()
if content["origin"] is None:
del content["origin"]
if content["ctime"] is None:
del content["ctime"]
return content
@classmethod
def from_data(
cls, data: bytes, reason: str, ctime: Optional[datetime.datetime] = None
) -> "SkippedContent":
"""Generate a SkippedContent from a given `data` byte string.
This populates the SkippedContent with the hashes and length for the
data passed as argument.
You can use `attr.evolve` on such a generated content to nullify some
of its attributes, e.g. for tests.
"""
d = cls._hash_data(data)
del d["data"]
d["status"] = "absent"
d["reason"] = reason
d["ctime"] = ctime
return cls(**d)
@classmethod
def from_dict(cls, d):
d2 = d.copy()
if d2.pop("data", None) is not None:
raise ValueError('SkippedContent has no "data" attribute %r' % d)
return super().from_dict(d2, use_subclass=False)
def unique_key(self) -> KeyType:
return self.hashes()
class MetadataAuthorityType(Enum):
DEPOSIT_CLIENT = "deposit_client"
FORGE = "forge"
REGISTRY = "registry"
def __repr__(self):
return f"MetadataAuthorityType.{self.name}"
@attr.s(frozen=True, slots=True)
class MetadataAuthority(BaseModel):
"""Represents an entity that provides metadata about an origin or
software artifact."""
object_type: Final = "metadata_authority"
type = attr.ib(type=MetadataAuthorityType, validator=type_validator())
url = attr.ib(type=str, validator=type_validator())
metadata = attr.ib(
type=Optional[ImmutableDict[str, Any]],
default=None,
validator=type_validator(),
converter=freeze_optional_dict,
)
def to_dict(self):
d = super().to_dict()
if d["metadata"] is None:
del d["metadata"]
return d
@classmethod
def from_dict(cls, d):
d = {
**d,
"type": MetadataAuthorityType(d["type"]),
}
return super().from_dict(d)
def unique_key(self) -> KeyType:
return {"type": self.type.value, "url": self.url}
@attr.s(frozen=True, slots=True)
class MetadataFetcher(BaseModel):
"""Represents a software component used to fetch metadata from a metadata
authority, and ingest them into the Software Heritage archive."""
object_type: Final = "metadata_fetcher"
name = attr.ib(type=str, validator=type_validator())
version = attr.ib(type=str, validator=type_validator())
metadata = attr.ib(
type=Optional[ImmutableDict[str, Any]],
default=None,
validator=type_validator(),
converter=freeze_optional_dict,
)
def to_dict(self):
d = super().to_dict()
if d["metadata"] is None:
del d["metadata"]
return d
def unique_key(self) -> KeyType:
return {"name": self.name, "version": self.version}
def normalize_discovery_date(value: Any) -> datetime.datetime:
if not isinstance(value, datetime.datetime):
raise TypeError("discovery_date must be a timezone-aware datetime.")
if value.tzinfo is None:
raise ValueError("discovery_date must be a timezone-aware datetime.")
# Normalize timezone to utc, and truncate microseconds to 0
return value.astimezone(datetime.timezone.utc).replace(microsecond=0)
@attr.s(frozen=True, slots=True)
class RawExtrinsicMetadata(HashableObject, BaseModel):
object_type: Final = "raw_extrinsic_metadata"
# target object
target = attr.ib(type=ExtendedSWHID, validator=type_validator())
# source
discovery_date = attr.ib(type=datetime.datetime, converter=normalize_discovery_date)
authority = attr.ib(type=MetadataAuthority, validator=type_validator())
fetcher = attr.ib(type=MetadataFetcher, validator=type_validator())
# the metadata itself
format = attr.ib(type=str, validator=type_validator())
metadata = attr.ib(type=bytes, validator=type_validator())
# context
origin = attr.ib(type=Optional[str], default=None, validator=type_validator())
visit = attr.ib(type=Optional[int], default=None, validator=type_validator())
snapshot = attr.ib(
type=Optional[CoreSWHID], default=None, validator=type_validator()
)
release = attr.ib(
type=Optional[CoreSWHID], default=None, validator=type_validator()
)
revision = attr.ib(
type=Optional[CoreSWHID], default=None, validator=type_validator()
)
path = attr.ib(type=Optional[bytes], default=None, validator=type_validator())
directory = attr.ib(
type=Optional[CoreSWHID], default=None, validator=type_validator()
)
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr)
def compute_hash(self) -> bytes:
git_object = git_objects.raw_extrinsic_metadata_git_object(self)
return hashlib.new("sha1", git_object).digest()
@origin.validator
def check_origin(self, attribute, value):
if value is None:
return
if self.target.object_type not in (
SwhidExtendedObjectType.SNAPSHOT,
SwhidExtendedObjectType.RELEASE,
SwhidExtendedObjectType.REVISION,
SwhidExtendedObjectType.DIRECTORY,
SwhidExtendedObjectType.CONTENT,
):
raise ValueError(
f"Unexpected 'origin' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
if value.startswith("swh:"):
# Technically this is valid; but:
# 1. SWHIDs are URIs, not URLs
# 2. if a SWHID gets here, it's very likely to be a mistake
# (and we can remove this check if it turns out there is a
# legitimate use for it).
raise ValueError(f"SWHID used as context origin URL: {value}")
@visit.validator
def check_visit(self, attribute, value):
if value is None:
return
if self.target.object_type not in (
SwhidExtendedObjectType.SNAPSHOT,
SwhidExtendedObjectType.RELEASE,
SwhidExtendedObjectType.REVISION,
SwhidExtendedObjectType.DIRECTORY,
SwhidExtendedObjectType.CONTENT,
):
raise ValueError(
f"Unexpected 'visit' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
if self.origin is None:
raise ValueError("'origin' context must be set if 'visit' is.")
if value <= 0:
raise ValueError("Nonpositive visit id")
@snapshot.validator
def check_snapshot(self, attribute, value):
if value is None:
return
if self.target.object_type not in (
SwhidExtendedObjectType.RELEASE,
SwhidExtendedObjectType.REVISION,
SwhidExtendedObjectType.DIRECTORY,
SwhidExtendedObjectType.CONTENT,
):
raise ValueError(
f"Unexpected 'snapshot' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
self._check_swhid(SwhidObjectType.SNAPSHOT, value)
@release.validator
def check_release(self, attribute, value):
if value is None:
return
if self.target.object_type not in (
SwhidExtendedObjectType.REVISION,
SwhidExtendedObjectType.DIRECTORY,
SwhidExtendedObjectType.CONTENT,
):
raise ValueError(
f"Unexpected 'release' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
self._check_swhid(SwhidObjectType.RELEASE, value)
@revision.validator
def check_revision(self, attribute, value):
if value is None:
return
if self.target.object_type not in (
SwhidExtendedObjectType.DIRECTORY,
SwhidExtendedObjectType.CONTENT,
):
raise ValueError(
f"Unexpected 'revision' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
self._check_swhid(SwhidObjectType.REVISION, value)
@path.validator
def check_path(self, attribute, value):
if value is None:
return
if self.target.object_type not in (
SwhidExtendedObjectType.DIRECTORY,
SwhidExtendedObjectType.CONTENT,
):
raise ValueError(
f"Unexpected 'path' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
@directory.validator
def check_directory(self, attribute, value):
if value is None:
return
if self.target.object_type not in (SwhidExtendedObjectType.CONTENT,):
raise ValueError(
f"Unexpected 'directory' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
self._check_swhid(SwhidObjectType.DIRECTORY, value)
def _check_swhid(self, expected_object_type, swhid):
if isinstance(swhid, str):
raise ValueError(f"Expected SWHID, got a string: {swhid}")
if swhid.object_type != expected_object_type:
raise ValueError(
f"Expected SWHID type '{expected_object_type.name.lower()}', "
f"got '{swhid.object_type.name.lower()}' in {swhid}"
)
def to_dict(self):
d = super().to_dict()
context_keys = (
"origin",
"visit",
"snapshot",
"release",
"revision",
"directory",
"path",
)
for context_key in context_keys:
if d[context_key] is None:
del d[context_key]
return d
@classmethod
def from_dict(cls, d):
d = {
**d,
"target": ExtendedSWHID.from_string(d["target"]),
"authority": MetadataAuthority.from_dict(d["authority"]),
"fetcher": MetadataFetcher.from_dict(d["fetcher"]),
}
swhid_keys = ("snapshot", "release", "revision", "directory")
for swhid_key in swhid_keys:
if d.get(swhid_key):
d[swhid_key] = CoreSWHID.from_string(d[swhid_key])
return super().from_dict(d)
def swhid(self) -> ExtendedSWHID:
"""Returns a SWHID representing this RawExtrinsicMetadata object."""
return ExtendedSWHID(
object_type=SwhidExtendedObjectType.RAW_EXTRINSIC_METADATA,
object_id=self.id,
)
@attr.s(frozen=True, slots=True)
class ExtID(HashableObject, BaseModel):
object_type: Final = "extid"
extid_type = attr.ib(type=str, validator=type_validator())
extid = attr.ib(type=bytes, validator=type_validator())
target = attr.ib(type=CoreSWHID, validator=type_validator())
extid_version = attr.ib(type=int, validator=type_validator(), default=0)
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr)
@classmethod
def from_dict(cls, d):
return cls(
extid=d["extid"],
extid_type=d["extid_type"],
target=CoreSWHID.from_string(d["target"]),
extid_version=d.get("extid_version", 0),
)
def compute_hash(self) -> bytes:
git_object = git_objects.extid_git_object(self)
return hashlib.new("sha1", git_object).digest()
diff --git a/swh/model/tests/test_identifiers.py b/swh/model/tests/test_identifiers.py
index 188c584..2501e34 100644
--- a/swh/model/tests/test_identifiers.py
+++ b/swh/model/tests/test_identifiers.py
@@ -1,1195 +1,1210 @@
# Copyright (C) 2015-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import hashlib
from typing import Dict
import unittest
import pytest
from swh.model import git_objects, hashutil
from swh.model.hashutil import hash_to_bytes as _x
from swh.model.model import (
Content,
Directory,
ExtID,
Origin,
RawExtrinsicMetadata,
Release,
Revision,
Snapshot,
TimestampWithTimezone,
)
def remove_id(d: Dict) -> Dict:
"""Returns a (shallow) copy of a dict with the 'id' key removed."""
d = d.copy()
if "id" in d:
del d["id"]
return d
class UtilityFunctionsDateOffset(unittest.TestCase):
def setUp(self):
self.dates = {
b"1448210036": {"seconds": 1448210036, "microseconds": 0,},
b"1448210036.002342": {"seconds": 1448210036, "microseconds": 2342,},
b"1448210036.12": {"seconds": 1448210036, "microseconds": 120000,},
}
self.offsets = {
0: b"+0000",
-630: b"-1030",
800: b"+1320",
}
def test_format_date(self):
for date_repr, date in self.dates.items():
self.assertEqual(git_objects.format_date(date), date_repr)
def test_format_offset(self):
for offset, res in self.offsets.items():
self.assertEqual(git_objects.format_offset(offset), res)
content_example = {
"status": "visible",
"length": 5,
"data": b"1984\n",
"ctime": datetime.datetime(2015, 11, 22, 16, 33, 56, tzinfo=datetime.timezone.utc),
}
class ContentIdentifier(unittest.TestCase):
def setUp(self):
self.content_id = hashutil.MultiHash.from_data(content_example["data"]).digest()
def test_content_identifier(self):
self.assertEqual(
Content.from_data(content_example["data"]).hashes(), self.content_id
)
directory_example = {
"id": _x("d7ed3d2c31d608823be58b1cbe57605310615231"),
"entries": [
{
"type": "file",
"perms": 33188,
"name": b"README",
"target": _x("37ec8ea2110c0b7a32fbb0e872f6e7debbf95e21"),
},
{
"type": "file",
"perms": 33188,
"name": b"Rakefile",
"target": _x("3bb0e8592a41ae3185ee32266c860714980dbed7"),
},
{
"type": "dir",
"perms": 16384,
"name": b"app",
"target": _x("61e6e867f5d7ba3b40540869bc050b0c4fed9e95"),
},
{
"type": "file",
"perms": 33188,
"name": b"1.megabyte",
"target": _x("7c2b2fbdd57d6765cdc9d84c2d7d333f11be7fb3"),
},
{
"type": "dir",
"perms": 16384,
"name": b"config",
"target": _x("591dfe784a2e9ccc63aaba1cb68a765734310d98"),
},
{
"type": "dir",
"perms": 16384,
"name": b"public",
"target": _x("9588bf4522c2b4648bfd1c61d175d1f88c1ad4a5"),
},
{
"type": "file",
"perms": 33188,
"name": b"development.sqlite3",
"target": _x("e69de29bb2d1d6434b8b29ae775ad8c2e48c5391"),
},
{
"type": "dir",
"perms": 16384,
"name": b"doc",
"target": _x("154705c6aa1c8ead8c99c7915373e3c44012057f"),
},
{
"type": "dir",
"perms": 16384,
"name": b"db",
"target": _x("85f157bdc39356b7bc7de9d0099b4ced8b3b382c"),
},
{
"type": "dir",
"perms": 16384,
"name": b"log",
"target": _x("5e3d3941c51cce73352dff89c805a304ba96fffe"),
},
{
"type": "dir",
"perms": 16384,
"name": b"script",
"target": _x("1b278423caf176da3f3533592012502aa10f566c"),
},
{
"type": "dir",
"perms": 16384,
"name": b"test",
"target": _x("035f0437c080bfd8711670b3e8677e686c69c763"),
},
{
"type": "dir",
"perms": 16384,
"name": b"vendor",
"target": _x("7c0dc9ad978c1af3f9a4ce061e50f5918bd27138"),
},
{
"type": "rev",
"perms": 57344,
"name": b"will_paginate",
"target": _x("3d531e169db92a16a9a8974f0ae6edf52e52659e"),
},
# in git order, the dir named "order" should be between the files
# named "order." and "order0"
{
"type": "dir",
"perms": 16384,
"name": b"order",
"target": _x("62cdb7020ff920e5aa642c3d4066950dd1f01f4d"),
},
{
"type": "file",
"perms": 16384,
"name": b"order.",
"target": _x("0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"),
},
{
"type": "file",
"perms": 16384,
"name": b"order0",
"target": _x("bbe960a25ea311d21d40669e93df2003ba9b90a2"),
},
],
}
class DirectoryIdentifier(unittest.TestCase):
def setUp(self):
self.directory = directory_example
self.empty_directory = {
"id": "4b825dc642cb6eb9a060e54bf8d69288fbee4904",
"entries": [],
}
def test_dir_identifier(self):
self.assertEqual(Directory.from_dict(self.directory).id, self.directory["id"])
self.assertEqual(
Directory.from_dict(remove_id(self.directory)).id, self.directory["id"],
)
def test_dir_identifier_entry_order(self):
# Reverse order of entries, check the id is still the same.
directory = {"entries": reversed(self.directory["entries"])}
self.assertEqual(
Directory.from_dict(remove_id(directory)).id, self.directory["id"],
)
def test_dir_identifier_empty_directory(self):
self.assertEqual(
Directory.from_dict(remove_id(self.empty_directory)).id,
_x(self.empty_directory["id"]),
)
linus_tz = datetime.timezone(datetime.timedelta(minutes=-420))
revision_example = {
"id": _x("bc0195aad0daa2ad5b0d76cce22b167bc3435590"),
"directory": _x("85a74718d377195e1efd0843ba4f3260bad4fe07"),
"parents": [_x("01e2d0627a9a6edb24c37db45db5ecb31e9de808")],
"author": {
"name": b"Linus Torvalds",
"email": b"torvalds@linux-foundation.org",
"fullname": b"Linus Torvalds ",
},
"date": datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz),
"committer": {
"name": b"Linus Torvalds",
"email": b"torvalds@linux-foundation.org",
"fullname": b"Linus Torvalds ",
},
"committer_date": datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz),
"message": b"Linux 4.2-rc2\n",
"type": "git",
"synthetic": False,
}
class RevisionIdentifier(unittest.TestCase):
def setUp(self):
gpgsig = b"""\
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.13 (Darwin)
iQIcBAABAgAGBQJVJcYsAAoJEBiY3kIkQRNJVAUQAJ8/XQIfMqqC5oYeEFfHOPYZ
L7qy46bXHVBa9Qd8zAJ2Dou3IbI2ZoF6/Et89K/UggOycMlt5FKV/9toWyuZv4Po
L682wonoxX99qvVTHo6+wtnmYO7+G0f82h+qHMErxjP+I6gzRNBvRr+SfY7VlGdK
wikMKOMWC5smrScSHITnOq1Ews5pe3N7qDYMzK0XVZmgDoaem4RSWMJs4My/qVLN
e0CqYWq2A22GX7sXl6pjneJYQvcAXUX+CAzp24QnPSb+Q22Guj91TcxLFcHCTDdn
qgqMsEyMiisoglwrCbO+D+1xq9mjN9tNFWP66SQ48mrrHYTBV5sz9eJyDfroJaLP
CWgbDTgq6GzRMehHT3hXfYS5NNatjnhkNISXR7pnVP/obIi/vpWh5ll6Gd8q26z+
a/O41UzOaLTeNI365MWT4/cnXohVLRG7iVJbAbCxoQmEgsYMRc/pBAzWJtLfcB2G
jdTswYL6+MUdL8sB9pZ82D+BP/YAdHe69CyTu1lk9RT2pYtI/kkfjHubXBCYEJSG
+VGllBbYG6idQJpyrOYNRJyrDi9yvDJ2W+S0iQrlZrxzGBVGTB/y65S8C+2WTBcE
lf1Qb5GDsQrZWgD+jtWTywOYHtCBwyCKSAXxSARMbNPeak9WPlcW/Jmu+fUcMe2x
dg1KdHOa34shrKDaOVzW
=od6m
-----END PGP SIGNATURE-----"""
self.revision = revision_example
self.revision_none_metadata = {
"id": _x("bc0195aad0daa2ad5b0d76cce22b167bc3435590"),
"directory": _x("85a74718d377195e1efd0843ba4f3260bad4fe07"),
"parents": [_x("01e2d0627a9a6edb24c37db45db5ecb31e9de808")],
"author": {
"name": b"Linus Torvalds",
"email": b"torvalds@linux-foundation.org",
},
"date": datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz),
"committer": {
"name": b"Linus Torvalds",
"email": b"torvalds@linux-foundation.org",
},
"committer_date": datetime.datetime(
2015, 7, 12, 15, 10, 30, tzinfo=linus_tz
),
"message": b"Linux 4.2-rc2\n",
"type": "git",
"synthetic": False,
"metadata": None,
}
self.synthetic_revision = {
"id": _x("b2a7e1260492e344fab3cbf91bc13c91e05426fd"),
"author": {
"name": b"Software Heritage",
"email": b"robot@softwareheritage.org",
},
"date": {
"timestamp": {"seconds": 1437047495},
"offset": 0,
"negative_utc": False,
},
"type": "tar",
"committer": {
"name": b"Software Heritage",
"email": b"robot@softwareheritage.org",
},
"committer_date": 1437047495,
"synthetic": True,
"parents": [],
"message": b"synthetic revision message\n",
"directory": _x("d11f00a6a0fea6055341d25584b5a96516c0d2b8"),
"metadata": {
"original_artifact": [
{
"archive_type": "tar",
"name": "gcc-5.2.0.tar.bz2",
"sha1_git": "39d281aff934d44b439730057e55b055e206a586",
"sha1": "fe3f5390949d47054b613edc36c557eb1d51c18e",
"sha256": "5f835b04b5f7dd4f4d2dc96190ec1621b8d89f"
"2dc6f638f9f8bc1b1014ba8cad",
}
]
},
}
# cat commit.txt | git hash-object -t commit --stdin
self.revision_with_extra_headers = {
"id": _x("010d34f384fa99d047cdd5e2f41e56e5c2feee45"),
"directory": _x("85a74718d377195e1efd0843ba4f3260bad4fe07"),
"parents": [_x("01e2d0627a9a6edb24c37db45db5ecb31e9de808")],
"author": {
"name": b"Linus Torvalds",
"email": b"torvalds@linux-foundation.org",
"fullname": b"Linus Torvalds ",
},
"date": datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz),
"committer": {
"name": b"Linus Torvalds",
"email": b"torvalds@linux-foundation.org",
"fullname": b"Linus Torvalds ",
},
"committer_date": datetime.datetime(
2015, 7, 12, 15, 10, 30, tzinfo=linus_tz
),
"message": b"Linux 4.2-rc2\n",
"type": "git",
"synthetic": False,
"extra_headers": (
(b"svn-repo-uuid", b"046f1af7-66c2-d61b-5410-ce57b7db7bff"),
(b"svn-revision", b"10"),
),
}
self.revision_with_gpgsig = {
"id": _x("44cc742a8ca17b9c279be4cc195a93a6ef7a320e"),
"directory": _x("b134f9b7dc434f593c0bab696345548b37de0558"),
"parents": [
_x("689664ae944b4692724f13b709a4e4de28b54e57"),
_x("c888305e1efbaa252d01b4e5e6b778f865a97514"),
],
"author": {
"name": b"Jiang Xin",
"email": b"worldhello.net@gmail.com",
"fullname": b"Jiang Xin ",
},
"date": {"timestamp": 1428538899, "offset": 480,},
"committer": {"name": b"Jiang Xin", "email": b"worldhello.net@gmail.com",},
"committer_date": {"timestamp": 1428538899, "offset": 480,},
"extra_headers": ((b"gpgsig", gpgsig),),
"message": b"""Merge branch 'master' of git://github.com/alexhenrie/git-po
* 'master' of git://github.com/alexhenrie/git-po:
l10n: ca.po: update translation
""",
"type": "git",
"synthetic": False,
}
self.revision_no_message = {
"id": _x("4cfc623c9238fa92c832beed000ce2d003fd8333"),
"directory": _x("b134f9b7dc434f593c0bab696345548b37de0558"),
"parents": [
_x("689664ae944b4692724f13b709a4e4de28b54e57"),
_x("c888305e1efbaa252d01b4e5e6b778f865a97514"),
],
"author": {
"name": b"Jiang Xin",
"email": b"worldhello.net@gmail.com",
"fullname": b"Jiang Xin ",
},
"date": {"timestamp": 1428538899, "offset": 480,},
"committer": {"name": b"Jiang Xin", "email": b"worldhello.net@gmail.com",},
"committer_date": {"timestamp": 1428538899, "offset": 480,},
"message": None,
"type": "git",
"synthetic": False,
}
self.revision_empty_message = {
"id": _x("7442cd78bd3b4966921d6a7f7447417b7acb15eb"),
"directory": _x("b134f9b7dc434f593c0bab696345548b37de0558"),
"parents": [
_x("689664ae944b4692724f13b709a4e4de28b54e57"),
_x("c888305e1efbaa252d01b4e5e6b778f865a97514"),
],
"author": {
"name": b"Jiang Xin",
"email": b"worldhello.net@gmail.com",
"fullname": b"Jiang Xin ",
},
"date": {"timestamp": 1428538899, "offset": 480,},
"committer": {"name": b"Jiang Xin", "email": b"worldhello.net@gmail.com",},
"committer_date": {"timestamp": 1428538899, "offset": 480,},
"message": b"",
"type": "git",
"synthetic": False,
}
self.revision_only_fullname = {
"id": _x("010d34f384fa99d047cdd5e2f41e56e5c2feee45"),
"directory": _x("85a74718d377195e1efd0843ba4f3260bad4fe07"),
"parents": [_x("01e2d0627a9a6edb24c37db45db5ecb31e9de808")],
"author": {"fullname": b"Linus Torvalds ",},
"date": datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz),
"committer": {
"fullname": b"Linus Torvalds ",
},
"committer_date": datetime.datetime(
2015, 7, 12, 15, 10, 30, tzinfo=linus_tz
),
"message": b"Linux 4.2-rc2\n",
"type": "git",
"synthetic": False,
"extra_headers": (
(b"svn-repo-uuid", b"046f1af7-66c2-d61b-5410-ce57b7db7bff"),
(b"svn-revision", b"10"),
),
}
def test_revision_identifier(self):
self.assertEqual(
Revision.from_dict(self.revision).id, self.revision["id"],
)
self.assertEqual(
Revision.from_dict(remove_id(self.revision)).id, self.revision["id"],
)
def test_revision_identifier_none_metadata(self):
self.assertEqual(
Revision.from_dict(remove_id(self.revision_none_metadata)).id,
self.revision_none_metadata["id"],
)
def test_revision_identifier_synthetic(self):
self.assertEqual(
Revision.from_dict(remove_id(self.synthetic_revision)).id,
self.synthetic_revision["id"],
)
def test_revision_identifier_with_extra_headers(self):
self.assertEqual(
Revision.from_dict(remove_id(self.revision_with_extra_headers)).id,
self.revision_with_extra_headers["id"],
)
def test_revision_identifier_with_gpgsig(self):
self.assertEqual(
Revision.from_dict(remove_id(self.revision_with_gpgsig)).id,
self.revision_with_gpgsig["id"],
)
def test_revision_identifier_no_message(self):
self.assertEqual(
Revision.from_dict(remove_id(self.revision_no_message)).id,
self.revision_no_message["id"],
)
def test_revision_identifier_empty_message(self):
self.assertEqual(
Revision.from_dict(remove_id(self.revision_empty_message)).id,
self.revision_empty_message["id"],
)
def test_revision_identifier_only_fullname(self):
self.assertEqual(
Revision.from_dict(remove_id(self.revision_only_fullname)).id,
self.revision_only_fullname["id"],
)
release_example = {
"id": _x("2b10839e32c4c476e9d94492756bb1a3e1ec4aa8"),
"target": _x("741b2252a5e14d6c60a913c77a6099abe73a854a"),
"target_type": "revision",
"name": b"v2.6.14",
"author": {
"name": b"Linus Torvalds",
"email": b"torvalds@g5.osdl.org",
"fullname": b"Linus Torvalds ",
},
"date": datetime.datetime(2005, 10, 27, 17, 2, 33, tzinfo=linus_tz),
"message": b"""\
Linux 2.6.14 release
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.1 (GNU/Linux)
iD8DBQBDYWq6F3YsRnbiHLsRAmaeAJ9RCez0y8rOBbhSv344h86l/VVcugCeIhO1
wdLOnvj91G4wxYqrvThthbE=
=7VeT
-----END PGP SIGNATURE-----
""",
"synthetic": False,
}
class ReleaseIdentifier(unittest.TestCase):
def setUp(self):
linus_tz = datetime.timezone(datetime.timedelta(minutes=-420))
self.release = release_example
self.release_no_author = {
"id": _x("26791a8bcf0e6d33f43aef7682bdb555236d56de"),
"target": _x("9ee1c939d1cb936b1f98e8d81aeffab57bae46ab"),
"target_type": "revision",
"name": b"v2.6.12",
"message": b"""\
This is the final 2.6.12 release
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.2.4 (GNU/Linux)
iD8DBQBCsykyF3YsRnbiHLsRAvPNAJ482tCZwuxp/bJRz7Q98MHlN83TpACdHr37
o6X/3T+vm8K3bf3driRr34c=
=sBHn
-----END PGP SIGNATURE-----
""",
"synthetic": False,
}
self.release_no_message = {
"id": _x("b6f4f446715f7d9543ef54e41b62982f0db40045"),
"target": _x("9ee1c939d1cb936b1f98e8d81aeffab57bae46ab"),
"target_type": "revision",
"name": b"v2.6.12",
"author": {"name": b"Linus Torvalds", "email": b"torvalds@g5.osdl.org",},
"date": datetime.datetime(2005, 10, 27, 17, 2, 33, tzinfo=linus_tz),
"message": None,
"synthetic": False,
}
self.release_empty_message = {
"id": _x("71a0aea72444d396575dc25ac37fec87ee3c6492"),
"target": _x("9ee1c939d1cb936b1f98e8d81aeffab57bae46ab"),
"target_type": "revision",
"name": b"v2.6.12",
"author": {"name": b"Linus Torvalds", "email": b"torvalds@g5.osdl.org",},
"date": datetime.datetime(2005, 10, 27, 17, 2, 33, tzinfo=linus_tz),
"message": b"",
"synthetic": False,
}
self.release_negative_utc = {
"id": _x("97c8d2573a001f88e72d75f596cf86b12b82fd01"),
"name": b"20081029",
"target": _x("54e9abca4c77421e2921f5f156c9fe4a9f7441c7"),
"target_type": "revision",
"date": {
"timestamp": {"seconds": 1225281976},
"offset": 0,
"negative_utc": True,
},
"author": {"name": b"Otavio Salvador", "email": b"otavio@debian.org",},
"synthetic": False,
"message": b"tagging version 20081029\n\nr56558\n",
}
self.release_newline_in_author = {
"author": {
"email": b"esycat@gmail.com",
"fullname": b"Eugene Janusov\n",
"name": b"Eugene Janusov\n",
},
"date": {
"negative_utc": None,
"offset": 600,
"timestamp": {"microseconds": 0, "seconds": 1377480558,},
},
"id": _x("5c98f559d034162de22d3ebeb95433e6f8885231"),
"message": b"Release of v0.3.2.",
"name": b"0.3.2",
"synthetic": False,
"target": _x("c06aa3d93b78a2865c4935170030f8c2d7396fd3"),
"target_type": "revision",
}
self.release_snapshot_target = dict(self.release)
self.release_snapshot_target["target_type"] = "snapshot"
self.release_snapshot_target["id"] = _x(
"c29c3ddcc6769a04e54dd69d63a6fdcbc566f850"
)
def test_release_identifier(self):
self.assertEqual(
Release.from_dict(self.release).id, self.release["id"],
)
self.assertEqual(
Release.from_dict(remove_id(self.release)).id, self.release["id"],
)
def test_release_identifier_no_author(self):
self.assertEqual(
Release.from_dict(remove_id(self.release_no_author)).id,
self.release_no_author["id"],
)
def test_release_identifier_no_message(self):
self.assertEqual(
Release.from_dict(remove_id(self.release_no_message)).id,
self.release_no_message["id"],
)
def test_release_identifier_empty_message(self):
self.assertEqual(
Release.from_dict(remove_id(self.release_empty_message)).id,
self.release_empty_message["id"],
)
def test_release_identifier_negative_utc(self):
self.assertEqual(
Release.from_dict(remove_id(self.release_negative_utc)).id,
self.release_negative_utc["id"],
)
def test_release_identifier_newline_in_author(self):
self.assertEqual(
Release.from_dict(remove_id(self.release_newline_in_author)).id,
self.release_newline_in_author["id"],
)
def test_release_identifier_snapshot_target(self):
self.assertEqual(
Release.from_dict(self.release_snapshot_target).id,
self.release_snapshot_target["id"],
)
snapshot_example = {
"id": _x("6e65b86363953b780d92b0a928f3e8fcdd10db36"),
"branches": {
b"directory": {
"target": _x("1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8"),
"target_type": "directory",
},
b"content": {
"target": _x("fe95a46679d128ff167b7c55df5d02356c5a1ae1"),
"target_type": "content",
},
b"alias": {"target": b"revision", "target_type": "alias",},
b"revision": {
"target": _x("aafb16d69fd30ff58afdd69036a26047f3aebdc6"),
"target_type": "revision",
},
b"release": {
"target": _x("7045404f3d1c54e6473c71bbb716529fbad4be24"),
"target_type": "release",
},
b"snapshot": {
"target": _x("1a8893e6a86f444e8be8e7bda6cb34fb1735a00e"),
"target_type": "snapshot",
},
b"dangling": None,
},
}
class SnapshotIdentifier(unittest.TestCase):
def setUp(self):
super().setUp()
self.empty = {
"id": _x("1a8893e6a86f444e8be8e7bda6cb34fb1735a00e"),
"branches": {},
}
self.dangling_branch = {
"id": _x("c84502e821eb21ed84e9fd3ec40973abc8b32353"),
"branches": {b"HEAD": None,},
}
self.unresolved = {
"id": _x("84b4548ea486e4b0a7933fa541ff1503a0afe1e0"),
"branches": {b"foo": {"target": b"bar", "target_type": "alias",},},
}
self.all_types = snapshot_example
def test_empty_snapshot(self):
self.assertEqual(
Snapshot.from_dict(remove_id(self.empty)).id, self.empty["id"],
)
def test_dangling_branch(self):
self.assertEqual(
Snapshot.from_dict(remove_id(self.dangling_branch)).id,
self.dangling_branch["id"],
)
def test_unresolved(self):
with self.assertRaisesRegex(ValueError, "b'foo' -> b'bar'"):
Snapshot.from_dict(remove_id(self.unresolved))
def test_all_types(self):
self.assertEqual(
Snapshot.from_dict(remove_id(self.all_types)).id, self.all_types["id"],
)
authority_example = {
"type": "forge",
"url": "https://forge.softwareheritage.org/",
}
fetcher_example = {
"name": "swh-phabricator-metadata-fetcher",
"version": "0.0.1",
}
metadata_example = {
"target": "swh:1:cnt:568aaf43d83b2c3df8067f3bedbb97d83260be6d",
"discovery_date": datetime.datetime(
2021, 1, 25, 11, 27, 51, tzinfo=datetime.timezone.utc
),
"authority": authority_example,
"fetcher": fetcher_example,
"format": "json",
"metadata": b'{"foo": "bar"}',
}
class RawExtrinsicMetadataIdentifier(unittest.TestCase):
def setUp(self):
super().setUp()
self.minimal = metadata_example
self.maximal = {
**self.minimal,
"origin": "https://forge.softwareheritage.org/source/swh-model/",
"visit": 42,
"snapshot": "swh:1:snp:" + "00" * 20,
"release": "swh:1:rel:" + "01" * 20,
"revision": "swh:1:rev:" + "02" * 20,
"path": b"/abc/def",
"directory": "swh:1:dir:" + "03" * 20,
}
def test_minimal(self):
git_object = (
b"raw_extrinsic_metadata 210\0"
b"target swh:1:cnt:568aaf43d83b2c3df8067f3bedbb97d83260be6d\n"
b"discovery_date 1611574071\n"
b"authority forge https://forge.softwareheritage.org/\n"
b"fetcher swh-phabricator-metadata-fetcher 0.0.1\n"
b"format json\n"
b"\n"
b'{"foo": "bar"}'
)
self.assertEqual(
git_objects.raw_extrinsic_metadata_git_object(
RawExtrinsicMetadata.from_dict(self.minimal)
),
git_object,
)
self.assertEqual(
RawExtrinsicMetadata.from_dict(self.minimal).id,
hashlib.sha1(git_object).digest(),
)
self.assertEqual(
RawExtrinsicMetadata.from_dict(self.minimal).id,
_x("5c13f20ba336e44549baf3d7b9305b027ec9f43d"),
)
def test_maximal(self):
git_object = (
b"raw_extrinsic_metadata 533\0"
b"target swh:1:cnt:568aaf43d83b2c3df8067f3bedbb97d83260be6d\n"
b"discovery_date 1611574071\n"
b"authority forge https://forge.softwareheritage.org/\n"
b"fetcher swh-phabricator-metadata-fetcher 0.0.1\n"
b"format json\n"
b"origin https://forge.softwareheritage.org/source/swh-model/\n"
b"visit 42\n"
b"snapshot swh:1:snp:0000000000000000000000000000000000000000\n"
b"release swh:1:rel:0101010101010101010101010101010101010101\n"
b"revision swh:1:rev:0202020202020202020202020202020202020202\n"
b"path /abc/def\n"
b"directory swh:1:dir:0303030303030303030303030303030303030303\n"
b"\n"
b'{"foo": "bar"}'
)
self.assertEqual(
git_objects.raw_extrinsic_metadata_git_object(
RawExtrinsicMetadata.from_dict(self.maximal)
),
git_object,
)
self.assertEqual(
RawExtrinsicMetadata.from_dict(self.maximal).id,
hashlib.sha1(git_object).digest(),
)
self.assertEqual(
RawExtrinsicMetadata.from_dict(self.maximal).id,
_x("f96966e1093d15236a31fde07e47d5b1c9428049"),
)
def test_nonascii_path(self):
metadata = {
**self.minimal,
"path": b"/ab\nc/d\xf0\x9f\xa4\xb7e\x00f",
}
git_object = (
b"raw_extrinsic_metadata 231\0"
b"target swh:1:cnt:568aaf43d83b2c3df8067f3bedbb97d83260be6d\n"
b"discovery_date 1611574071\n"
b"authority forge https://forge.softwareheritage.org/\n"
b"fetcher swh-phabricator-metadata-fetcher 0.0.1\n"
b"format json\n"
b"path /ab\n"
b" c/d\xf0\x9f\xa4\xb7e\x00f\n"
b"\n"
b'{"foo": "bar"}'
)
self.assertEqual(
git_objects.raw_extrinsic_metadata_git_object(
RawExtrinsicMetadata.from_dict(metadata)
),
git_object,
)
self.assertEqual(
RawExtrinsicMetadata.from_dict(metadata).id,
hashlib.sha1(git_object).digest(),
)
self.assertEqual(
RawExtrinsicMetadata.from_dict(metadata).id,
_x("7cc83fd1912176510c083f5df43f01b09af4b333"),
)
def test_timezone_insensitive(self):
"""Checks the timezone of the datetime.datetime does not affect the
hashed git_object."""
utc_plus_one = datetime.timezone(datetime.timedelta(hours=1))
metadata = {
**self.minimal,
"discovery_date": datetime.datetime(
2021, 1, 25, 12, 27, 51, tzinfo=utc_plus_one,
),
}
self.assertEqual(
git_objects.raw_extrinsic_metadata_git_object(
RawExtrinsicMetadata.from_dict(self.minimal)
),
git_objects.raw_extrinsic_metadata_git_object(
RawExtrinsicMetadata.from_dict(metadata)
),
)
self.assertEqual(
RawExtrinsicMetadata.from_dict(self.minimal).id,
RawExtrinsicMetadata.from_dict(metadata).id,
)
self.assertEqual(
RawExtrinsicMetadata.from_dict(metadata).id,
_x("5c13f20ba336e44549baf3d7b9305b027ec9f43d"),
)
def test_microsecond_insensitive(self):
"""Checks the microseconds of the datetime.datetime does not affect the
hashed manifest."""
metadata = {
**self.minimal,
"discovery_date": datetime.datetime(
2021, 1, 25, 11, 27, 51, 123456, tzinfo=datetime.timezone.utc,
),
}
self.assertEqual(
git_objects.raw_extrinsic_metadata_git_object(
RawExtrinsicMetadata.from_dict(self.minimal)
),
git_objects.raw_extrinsic_metadata_git_object(
RawExtrinsicMetadata.from_dict(metadata)
),
)
self.assertEqual(
RawExtrinsicMetadata.from_dict(self.minimal).id,
RawExtrinsicMetadata.from_dict(metadata).id,
)
self.assertEqual(
RawExtrinsicMetadata.from_dict(metadata).id,
_x("5c13f20ba336e44549baf3d7b9305b027ec9f43d"),
)
def test_noninteger_timezone(self):
"""Checks the discovery_date is translated to UTC before truncating
microseconds"""
tz = datetime.timezone(datetime.timedelta(microseconds=-42))
metadata = {
**self.minimal,
"discovery_date": datetime.datetime(
2021, 1, 25, 11, 27, 50, 1_000_000 - 42, tzinfo=tz,
),
}
self.assertEqual(
git_objects.raw_extrinsic_metadata_git_object(
RawExtrinsicMetadata.from_dict(self.minimal)
),
git_objects.raw_extrinsic_metadata_git_object(
RawExtrinsicMetadata.from_dict(metadata)
),
)
self.assertEqual(
RawExtrinsicMetadata.from_dict(self.minimal).id,
RawExtrinsicMetadata.from_dict(metadata).id,
)
self.assertEqual(
RawExtrinsicMetadata.from_dict(metadata).id,
_x("5c13f20ba336e44549baf3d7b9305b027ec9f43d"),
)
def test_negative_timestamp(self):
metadata = {
**self.minimal,
"discovery_date": datetime.datetime(
1960, 1, 25, 11, 27, 51, tzinfo=datetime.timezone.utc,
),
}
git_object = (
b"raw_extrinsic_metadata 210\0"
b"target swh:1:cnt:568aaf43d83b2c3df8067f3bedbb97d83260be6d\n"
b"discovery_date -313504329\n"
b"authority forge https://forge.softwareheritage.org/\n"
b"fetcher swh-phabricator-metadata-fetcher 0.0.1\n"
b"format json\n"
b"\n"
b'{"foo": "bar"}'
)
self.assertEqual(
git_objects.raw_extrinsic_metadata_git_object(
RawExtrinsicMetadata.from_dict(metadata)
),
git_object,
)
self.assertEqual(
RawExtrinsicMetadata.from_dict(metadata).id,
hashlib.sha1(git_object).digest(),
)
self.assertEqual(
RawExtrinsicMetadata.from_dict(metadata).id,
_x("895d0821a2991dd376ddc303424aceb7c68280f9"),
)
def test_epoch(self):
metadata = {
**self.minimal,
"discovery_date": datetime.datetime(
1970, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc,
),
}
git_object = (
b"raw_extrinsic_metadata 201\0"
b"target swh:1:cnt:568aaf43d83b2c3df8067f3bedbb97d83260be6d\n"
b"discovery_date 0\n"
b"authority forge https://forge.softwareheritage.org/\n"
b"fetcher swh-phabricator-metadata-fetcher 0.0.1\n"
b"format json\n"
b"\n"
b'{"foo": "bar"}'
)
self.assertEqual(
git_objects.raw_extrinsic_metadata_git_object(
RawExtrinsicMetadata.from_dict(metadata)
),
git_object,
)
self.assertEqual(
RawExtrinsicMetadata.from_dict(metadata).id,
hashlib.sha1(git_object).digest(),
)
self.assertEqual(
RawExtrinsicMetadata.from_dict(metadata).id,
_x("27a53df54ace35ebd910493cdc70b334d6b7cb88"),
)
def test_negative_epoch(self):
metadata = {
**self.minimal,
"discovery_date": datetime.datetime(
1969, 12, 31, 23, 59, 59, 1, tzinfo=datetime.timezone.utc,
),
}
git_object = (
b"raw_extrinsic_metadata 202\0"
b"target swh:1:cnt:568aaf43d83b2c3df8067f3bedbb97d83260be6d\n"
b"discovery_date -1\n"
b"authority forge https://forge.softwareheritage.org/\n"
b"fetcher swh-phabricator-metadata-fetcher 0.0.1\n"
b"format json\n"
b"\n"
b'{"foo": "bar"}'
)
self.assertEqual(
git_objects.raw_extrinsic_metadata_git_object(
RawExtrinsicMetadata.from_dict(metadata)
),
git_object,
)
self.assertEqual(
RawExtrinsicMetadata.from_dict(metadata).id,
hashlib.sha1(git_object).digest(),
)
self.assertEqual(
RawExtrinsicMetadata.from_dict(metadata).id,
_x("be7154a8fd49d87f81547ea634d1e2152907d089"),
)
origin_example = {
"url": "https://github.com/torvalds/linux",
}
class OriginIdentifier(unittest.TestCase):
def test_content_identifier(self):
self.assertEqual(
Origin.from_dict(origin_example).id,
_x("b63a575fe3faab7692c9f38fb09d4bb45651bb0f"),
)
TS_DICTS = [
(
{"timestamp": 12345, "offset": 0},
{
"timestamp": {"seconds": 12345, "microseconds": 0},
"offset": 0,
"negative_utc": False,
+ "offset_bytes": b"+0000",
},
),
(
{"timestamp": 12345, "offset": 0, "negative_utc": False},
{
"timestamp": {"seconds": 12345, "microseconds": 0},
"offset": 0,
"negative_utc": False,
+ "offset_bytes": b"+0000",
},
),
(
{"timestamp": 12345, "offset": 0, "negative_utc": False},
{
"timestamp": {"seconds": 12345, "microseconds": 0},
"offset": 0,
"negative_utc": False,
+ "offset_bytes": b"+0000",
},
),
(
{"timestamp": 12345, "offset": 0, "negative_utc": None},
{
"timestamp": {"seconds": 12345, "microseconds": 0},
"offset": 0,
"negative_utc": False,
+ "offset_bytes": b"+0000",
},
),
(
{"timestamp": {"seconds": 12345}, "offset": 0, "negative_utc": None},
{
"timestamp": {"seconds": 12345, "microseconds": 0},
"offset": 0,
"negative_utc": False,
+ "offset_bytes": b"+0000",
},
),
(
{
"timestamp": {"seconds": 12345, "microseconds": 0},
"offset": 0,
"negative_utc": None,
},
{
"timestamp": {"seconds": 12345, "microseconds": 0},
"offset": 0,
"negative_utc": False,
+ "offset_bytes": b"+0000",
},
),
(
{
"timestamp": {"seconds": 12345, "microseconds": 100},
"offset": 0,
"negative_utc": None,
},
{
"timestamp": {"seconds": 12345, "microseconds": 100},
"offset": 0,
"negative_utc": False,
+ "offset_bytes": b"+0000",
},
),
(
{"timestamp": 12345, "offset": 0, "negative_utc": True},
{
"timestamp": {"seconds": 12345, "microseconds": 0},
"offset": 0,
"negative_utc": True,
+ "offset_bytes": b"-0000",
},
),
(
{"timestamp": 12345, "offset": 0, "negative_utc": None},
{
"timestamp": {"seconds": 12345, "microseconds": 0},
"offset": 0,
"negative_utc": False,
+ "offset_bytes": b"+0000",
},
),
]
@pytest.mark.parametrize("dict_input,expected", TS_DICTS)
def test_normalize_timestamp_dict(dict_input, expected):
assert TimestampWithTimezone.from_dict(dict_input).to_dict() == expected
TS_DICTS_INVALID_TIMESTAMP = [
{"timestamp": 1.2, "offset": 0},
{"timestamp": "1", "offset": 0},
# these below should really also trigger a ValueError...
# {"timestamp": {"seconds": "1"}, "offset": 0},
# {"timestamp": {"seconds": 1.2}, "offset": 0},
# {"timestamp": {"seconds": 1.2}, "offset": 0},
]
@pytest.mark.parametrize("dict_input", TS_DICTS_INVALID_TIMESTAMP)
def test_normalize_timestamp_dict_invalid_timestamp(dict_input):
with pytest.raises(ValueError, match="non-integer timestamp"):
TimestampWithTimezone.from_dict(dict_input)
UTC = datetime.timezone.utc
TS_TIMEZONES = [
datetime.timezone.min,
datetime.timezone(datetime.timedelta(hours=-1)),
UTC,
datetime.timezone(datetime.timedelta(minutes=+60)),
datetime.timezone.max,
]
TS_TZ_EXPECTED = [-1439, -60, 0, 60, 1439]
+TS_TZ_BYTES_EXPECTED = [b"-2359", b"-0100", b"+0000", b"+0100", b"+2359"]
TS_DATETIMES = [
datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=UTC),
datetime.datetime(2120, 12, 31, 23, 59, 59, tzinfo=UTC),
datetime.datetime(1610, 5, 14, 15, 43, 0, tzinfo=UTC),
]
TS_DT_EXPECTED = [1582814359, 4765132799, -11348929020]
@pytest.mark.parametrize("date, seconds", zip(TS_DATETIMES, TS_DT_EXPECTED))
-@pytest.mark.parametrize("tz, offset", zip(TS_TIMEZONES, TS_TZ_EXPECTED))
+@pytest.mark.parametrize(
+ "tz, offset, offset_bytes", zip(TS_TIMEZONES, TS_TZ_EXPECTED, TS_TZ_BYTES_EXPECTED)
+)
@pytest.mark.parametrize("microsecond", [0, 1, 10, 100, 1000, 999999])
-def test_normalize_timestamp_datetime(date, seconds, tz, offset, microsecond):
+def test_normalize_timestamp_datetime(
+ date, seconds, tz, offset, offset_bytes, microsecond
+):
date = date.astimezone(tz).replace(microsecond=microsecond)
assert TimestampWithTimezone.from_dict(date).to_dict() == {
"timestamp": {"seconds": seconds, "microseconds": microsecond},
"offset": offset,
"negative_utc": False,
+ "offset_bytes": offset_bytes,
}
def test_extid_identifier_bwcompat():
extid_dict = {
"extid_type": "test-type",
"extid": b"extid",
"target": "swh:1:dir:" + "00" * 20,
}
assert ExtID.from_dict(extid_dict).id == _x(
"b9295e1931c31e40a7e3e1e967decd1c89426455"
)
assert (
ExtID.from_dict({**extid_dict, "extid_version": 0}).id
== ExtID.from_dict(extid_dict).id
)
assert (
ExtID.from_dict({**extid_dict, "extid_version": 1}).id
!= ExtID.from_dict(extid_dict).id
)
diff --git a/swh/model/tests/test_model.py b/swh/model/tests/test_model.py
index 47f6d3c..50cfa10 100644
--- a/swh/model/tests/test_model.py
+++ b/swh/model/tests/test_model.py
@@ -1,1317 +1,1323 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import collections
import copy
import datetime
from typing import Any, List, Optional, Tuple, Union
import attr
from attrs_strict import AttributeTypeError
import dateutil
from hypothesis import given
from hypothesis.strategies import binary
import pytest
from swh.model.collections import ImmutableDict
from swh.model.from_disk import DentryPerms
from swh.model.hashutil import MultiHash, hash_to_bytes
import swh.model.hypothesis_strategies as strategies
import swh.model.model
from swh.model.model import (
BaseModel,
Content,
Directory,
DirectoryEntry,
MetadataAuthority,
MetadataAuthorityType,
MetadataFetcher,
MissingData,
Origin,
OriginVisit,
OriginVisitStatus,
Person,
RawExtrinsicMetadata,
Release,
Revision,
SkippedContent,
Snapshot,
TargetType,
Timestamp,
TimestampWithTimezone,
type_validator,
)
import swh.model.swhids
from swh.model.swhids import CoreSWHID, ExtendedSWHID, ObjectType
from swh.model.tests.swh_model_data import TEST_OBJECTS
from swh.model.tests.test_identifiers import (
TS_DATETIMES,
TS_TIMEZONES,
directory_example,
metadata_example,
release_example,
revision_example,
snapshot_example,
)
EXAMPLE_HASH = hash_to_bytes("94a9ed024d3859793618152ea559a168bbcbb5e2")
@given(strategies.objects())
def test_todict_inverse_fromdict(objtype_and_obj):
(obj_type, obj) = objtype_and_obj
if obj_type in ("origin", "origin_visit"):
return
obj_as_dict = obj.to_dict()
obj_as_dict_copy = copy.deepcopy(obj_as_dict)
# Check the composition of to_dict and from_dict is the identity
assert obj == type(obj).from_dict(obj_as_dict)
# Check from_dict() does not change the input dict
assert obj_as_dict == obj_as_dict_copy
# Check the composition of from_dict and to_dict is the identity
assert obj_as_dict == type(obj).from_dict(obj_as_dict).to_dict()
@given(strategies.objects())
def test_repr(objtype_and_obj):
"""Checks every model object has a working repr(), and that it can be eval()uated
(so that printed objects can be copy-pasted to write test cases.)"""
(obj_type, obj) = objtype_and_obj
r = repr(obj)
env = {
"tzutc": lambda: datetime.timezone.utc,
"tzfile": dateutil.tz.tzfile,
"hash_to_bytes": hash_to_bytes,
**swh.model.swhids.__dict__,
**swh.model.model.__dict__,
}
assert eval(r, env) == obj
@attr.s
class Cls1:
pass
@attr.s
class Cls2(Cls1):
pass
_custom_namedtuple = collections.namedtuple("_custom_namedtuple", "a b")
class _custom_tuple(tuple):
pass
# List of (type, valid_values, invalid_values)
_TYPE_VALIDATOR_PARAMETERS: List[Tuple[Any, List[Any], List[Any]]] = [
# base types:
(
bool,
[True, False],
[-1, 0, 1, 42, 1000, None, "123", 0.0, (), ("foo",), ImmutableDict()],
),
(
int,
[-1, 0, 1, 42, 1000, DentryPerms.directory, True, False],
[None, "123", 0.0, (), ImmutableDict()],
),
(
float,
[-1.0, 0.0, 1.0, float("infinity"), float("NaN")],
[True, False, None, 1, "1.2", (), ImmutableDict()],
),
(
bytes,
[b"", b"123"],
[None, bytearray(b"\x12\x34"), "123", 0, 123, (), (1, 2, 3), ImmutableDict()],
),
(str, ["", "123"], [None, b"123", b"", 0, (), (1, 2, 3), ImmutableDict()]),
# unions:
(
Optional[int],
[None, -1, 0, 1, 42, 1000, DentryPerms.directory],
["123", 0.0, (), ImmutableDict()],
),
(
Optional[bytes],
[None, b"", b"123"],
["123", "", 0, (), (1, 2, 3), ImmutableDict()],
),
(
Union[str, bytes],
["", "123", b"123", b""],
[None, 0, (), (1, 2, 3), ImmutableDict()],
),
(
Union[str, bytes, None],
["", "123", b"123", b"", None],
[0, (), (1, 2, 3), ImmutableDict()],
),
# tuples
(
Tuple[str, str],
[("foo", "bar"), ("", ""), _custom_namedtuple("", ""), _custom_tuple(("", ""))],
[("foo",), ("foo", "bar", "baz"), ("foo", 42), (42, "foo")],
),
(
Tuple[str, ...],
[
("foo",),
("foo", "bar"),
("", ""),
("foo", "bar", "baz"),
_custom_namedtuple("", ""),
_custom_tuple(("", "")),
],
[("foo", 42), (42, "foo")],
),
# composite generic:
(
Tuple[Union[str, int], Union[str, int]],
[("foo", "foo"), ("foo", 42), (42, "foo"), (42, 42)],
[("foo", b"bar"), (b"bar", "foo")],
),
(
Union[Tuple[str, str], Tuple[int, int]],
[("foo", "foo"), (42, 42)],
[("foo", b"bar"), (b"bar", "foo"), ("foo", 42), (42, "foo")],
),
(
Tuple[Tuple[bytes, bytes], ...],
[(), ((b"foo", b"bar"),), ((b"foo", b"bar"), (b"baz", b"qux"))],
[((b"foo", "bar"),), ((b"foo", b"bar"), ("baz", b"qux"))],
),
# standard types:
(
datetime.datetime,
[datetime.datetime.now(), datetime.datetime.now(tz=datetime.timezone.utc)],
[None, 123],
),
# ImmutableDict
(
ImmutableDict[str, int],
[
ImmutableDict(),
ImmutableDict({"foo": 42}),
ImmutableDict({"foo": 42, "bar": 123}),
],
[ImmutableDict({"foo": "bar"}), ImmutableDict({42: 123})],
),
# Any:
(object, [-1, 0, 1, 42, 1000, None, "123", 0.0, (), ImmutableDict()], [],),
(Any, [-1, 0, 1, 42, 1000, None, "123", 0.0, (), ImmutableDict()], [],),
(
ImmutableDict[Any, int],
[
ImmutableDict(),
ImmutableDict({"foo": 42}),
ImmutableDict({"foo": 42, "bar": 123}),
ImmutableDict({42: 123}),
],
[ImmutableDict({"foo": "bar"})],
),
(
ImmutableDict[str, Any],
[
ImmutableDict(),
ImmutableDict({"foo": 42}),
ImmutableDict({"foo": "bar"}),
ImmutableDict({"foo": 42, "bar": 123}),
],
[ImmutableDict({42: 123})],
),
# attr objects:
(
Timestamp,
[Timestamp(seconds=123, microseconds=0),],
[None, "2021-09-28T11:27:59", 123],
),
(Cls1, [Cls1(), Cls2()], [None, b"abcd"],),
# enums:
(
TargetType,
[TargetType.CONTENT, TargetType.ALIAS],
["content", "alias", 123, None],
),
]
@pytest.mark.parametrize(
"type_,value",
[
pytest.param(type_, value, id=f"type={type_}, value={value}")
for (type_, values, _) in _TYPE_VALIDATOR_PARAMETERS
for value in values
],
)
def test_type_validator_valid(type_, value):
type_validator()(None, attr.ib(type=type_), value)
@pytest.mark.parametrize(
"type_,value",
[
pytest.param(type_, value, id=f"type={type_}, value={value}")
for (type_, _, values) in _TYPE_VALIDATOR_PARAMETERS
for value in values
],
)
def test_type_validator_invalid(type_, value):
with pytest.raises(AttributeTypeError):
type_validator()(None, attr.ib(type=type_), value)
@pytest.mark.parametrize("object_type, objects", TEST_OBJECTS.items())
def test_swh_model_todict_fromdict(object_type, objects):
"""checks model objects in swh_model_data are in correct shape"""
assert objects
for obj in objects:
# Check the composition of from_dict and to_dict is the identity
obj_as_dict = obj.to_dict()
assert obj == type(obj).from_dict(obj_as_dict)
assert obj_as_dict == type(obj).from_dict(obj_as_dict).to_dict()
def test_unique_key():
url = "http://example.org/"
date = datetime.datetime.now(tz=datetime.timezone.utc)
id_ = b"42" * 10
assert Origin(url=url).unique_key() == {"url": url}
assert OriginVisit(origin=url, date=date, type="git").unique_key() == {
"origin": url,
"date": str(date),
}
assert OriginVisitStatus(
origin=url, visit=42, date=date, status="created", snapshot=None
).unique_key() == {"origin": url, "visit": "42", "date": str(date),}
assert Snapshot.from_dict({**snapshot_example, "id": id_}).unique_key() == id_
assert Release.from_dict({**release_example, "id": id_}).unique_key() == id_
assert Revision.from_dict({**revision_example, "id": id_}).unique_key() == id_
assert Directory.from_dict({**directory_example, "id": id_}).unique_key() == id_
assert (
RawExtrinsicMetadata.from_dict({**metadata_example, "id": id_}).unique_key()
== id_
)
cont = Content.from_data(b"foo")
assert cont.unique_key().hex() == "0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"
kwargs = {
**cont.to_dict(),
"reason": "foo",
"status": "absent",
}
del kwargs["data"]
assert SkippedContent(**kwargs).unique_key() == cont.hashes()
# Anonymization
@given(strategies.objects())
def test_anonymization(objtype_and_obj):
(obj_type, obj) = objtype_and_obj
def check_person(p):
if p is not None:
assert p.name is None
assert p.email is None
assert len(p.fullname) == 32
anon_obj = obj.anonymize()
if obj_type == "person":
assert anon_obj is not None
check_person(anon_obj)
elif obj_type == "release":
assert anon_obj is not None
check_person(anon_obj.author)
elif obj_type == "revision":
assert anon_obj is not None
check_person(anon_obj.author)
check_person(anon_obj.committer)
else:
assert anon_obj is None
# Origin, OriginVisit, OriginVisitStatus
@given(strategies.origins())
def test_todict_origins(origin):
obj = origin.to_dict()
assert "type" not in obj
assert type(origin)(url=origin.url) == type(origin).from_dict(obj)
@given(strategies.origin_visits())
def test_todict_origin_visits(origin_visit):
obj = origin_visit.to_dict()
assert origin_visit == type(origin_visit).from_dict(obj)
def test_origin_visit_naive_datetime():
with pytest.raises(ValueError, match="must be a timezone-aware datetime"):
OriginVisit(
origin="http://foo/", date=datetime.datetime.now(), type="git",
)
@given(strategies.origin_visit_statuses())
def test_todict_origin_visit_statuses(origin_visit_status):
obj = origin_visit_status.to_dict()
assert origin_visit_status == type(origin_visit_status).from_dict(obj)
def test_origin_visit_status_naive_datetime():
with pytest.raises(ValueError, match="must be a timezone-aware datetime"):
OriginVisitStatus(
origin="http://foo/",
visit=42,
date=datetime.datetime.now(),
status="ongoing",
snapshot=None,
)
# Timestamp
@given(strategies.timestamps())
def test_timestamps_strategy(timestamp):
attr.validate(timestamp)
def test_timestamp_seconds():
attr.validate(Timestamp(seconds=0, microseconds=0))
with pytest.raises(AttributeTypeError):
Timestamp(seconds="0", microseconds=0)
attr.validate(Timestamp(seconds=2 ** 63 - 1, microseconds=0))
with pytest.raises(ValueError):
Timestamp(seconds=2 ** 63, microseconds=0)
attr.validate(Timestamp(seconds=-(2 ** 63), microseconds=0))
with pytest.raises(ValueError):
Timestamp(seconds=-(2 ** 63) - 1, microseconds=0)
def test_timestamp_microseconds():
attr.validate(Timestamp(seconds=0, microseconds=0))
with pytest.raises(AttributeTypeError):
Timestamp(seconds=0, microseconds="0")
attr.validate(Timestamp(seconds=0, microseconds=10 ** 6 - 1))
with pytest.raises(ValueError):
Timestamp(seconds=0, microseconds=10 ** 6)
with pytest.raises(ValueError):
Timestamp(seconds=0, microseconds=-1)
def test_timestamp_from_dict():
assert Timestamp.from_dict({"seconds": 10, "microseconds": 5})
with pytest.raises(AttributeTypeError):
Timestamp.from_dict({"seconds": "10", "microseconds": 5})
with pytest.raises(AttributeTypeError):
Timestamp.from_dict({"seconds": 10, "microseconds": "5"})
with pytest.raises(ValueError):
Timestamp.from_dict({"seconds": 0, "microseconds": -1})
Timestamp.from_dict({"seconds": 0, "microseconds": 10 ** 6 - 1})
with pytest.raises(ValueError):
Timestamp.from_dict({"seconds": 0, "microseconds": 10 ** 6})
# TimestampWithTimezone
def test_timestampwithtimezone():
ts = Timestamp(seconds=0, microseconds=0)
tstz = TimestampWithTimezone(timestamp=ts, offset=0, negative_utc=False)
attr.validate(tstz)
assert tstz.negative_utc is False
+ assert tstz.offset_bytes == b"+0000"
- attr.validate(TimestampWithTimezone(timestamp=ts, offset=10, negative_utc=False))
+ tstz = TimestampWithTimezone(timestamp=ts, offset=10, negative_utc=False)
+ attr.validate(tstz)
+ assert tstz.offset_bytes == b"+0010"
- attr.validate(TimestampWithTimezone(timestamp=ts, offset=-10, negative_utc=False))
+ tstz = TimestampWithTimezone(timestamp=ts, offset=-10, negative_utc=False)
+ attr.validate(tstz)
+ assert tstz.offset_bytes == b"-0010"
tstz = TimestampWithTimezone(timestamp=ts, offset=0, negative_utc=True)
attr.validate(tstz)
assert tstz.negative_utc is True
+ assert tstz.offset_bytes == b"-0000"
with pytest.raises(AttributeTypeError):
TimestampWithTimezone(
timestamp=datetime.datetime.now(), offset=0, negative_utc=False
)
- with pytest.raises(AttributeTypeError):
+ with pytest.raises((AttributeTypeError, TypeError)):
TimestampWithTimezone(timestamp=ts, offset="0", negative_utc=False)
with pytest.raises(AttributeTypeError):
TimestampWithTimezone(timestamp=ts, offset=1.0, negative_utc=False)
with pytest.raises(AttributeTypeError):
TimestampWithTimezone(timestamp=ts, offset=1, negative_utc=0)
with pytest.raises(ValueError):
TimestampWithTimezone(timestamp=ts, offset=1, negative_utc=True)
with pytest.raises(ValueError):
TimestampWithTimezone(timestamp=ts, offset=-1, negative_utc=True)
def test_timestampwithtimezone_from_datetime():
tz = datetime.timezone(datetime.timedelta(minutes=+60))
date = datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=tz)
tstz = TimestampWithTimezone.from_datetime(date)
assert tstz == TimestampWithTimezone(
timestamp=Timestamp(seconds=1582810759, microseconds=0,),
offset=60,
negative_utc=False,
)
def test_timestampwithtimezone_from_naive_datetime():
date = datetime.datetime(2020, 2, 27, 14, 39, 19)
with pytest.raises(ValueError, match="datetime without timezone"):
TimestampWithTimezone.from_datetime(date)
def test_timestampwithtimezone_from_iso8601():
date = "2020-02-27 14:39:19.123456+0100"
tstz = TimestampWithTimezone.from_iso8601(date)
assert tstz == TimestampWithTimezone(
timestamp=Timestamp(seconds=1582810759, microseconds=123456,),
offset=60,
negative_utc=False,
)
def test_timestampwithtimezone_from_iso8601_negative_utc():
date = "2020-02-27 13:39:19-0000"
tstz = TimestampWithTimezone.from_iso8601(date)
assert tstz == TimestampWithTimezone(
timestamp=Timestamp(seconds=1582810759, microseconds=0,),
offset=0,
negative_utc=True,
)
@pytest.mark.parametrize("date", TS_DATETIMES)
@pytest.mark.parametrize("tz", TS_TIMEZONES)
@pytest.mark.parametrize("microsecond", [0, 1, 10, 100, 1000, 999999])
def test_timestampwithtimezone_to_datetime(date, tz, microsecond):
date = date.replace(tzinfo=tz, microsecond=microsecond)
tstz = TimestampWithTimezone.from_datetime(date)
assert tstz.to_datetime() == date
assert tstz.to_datetime().utcoffset() == date.utcoffset()
def test_person_from_fullname():
"""The author should have name, email and fullname filled.
"""
actual_person = Person.from_fullname(b"tony ")
assert actual_person == Person(
fullname=b"tony ", name=b"tony", email=b"ynot@dagobah",
)
def test_person_from_fullname_no_email():
"""The author and fullname should be the same as the input (author).
"""
actual_person = Person.from_fullname(b"tony")
assert actual_person == Person(fullname=b"tony", name=b"tony", email=None,)
def test_person_from_fullname_empty_person():
"""Empty person has only its fullname filled with the empty
byte-string.
"""
actual_person = Person.from_fullname(b"")
assert actual_person == Person(fullname=b"", name=None, email=None,)
def test_git_author_line_to_author():
# edge case out of the way
with pytest.raises(TypeError):
Person.from_fullname(None)
tests = {
b"a ": Person(name=b"a", email=b"b@c.com", fullname=b"a ",),
b"": Person(
name=None, email=b"foo@bar.com", fullname=b"",
),
b"malformed ': Person(
name=b"malformed",
email=b'"
',
),
b"trailing ": Person(
name=b"trailing", email=b"sp@c.e", fullname=b"trailing ",
),
b"no": Person(name=b"no", email=b"sp@c.e", fullname=b"no",),
b" more ": Person(
name=b"more", email=b"sp@c.es", fullname=b" more ",
),
b" <>": Person(name=None, email=None, fullname=b" <>",),
}
for person in sorted(tests):
expected_person = tests[person]
assert expected_person == Person.from_fullname(person)
# Content
def test_content_get_hash():
hashes = dict(sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux")
c = Content(length=42, status="visible", **hashes)
for (hash_name, hash_) in hashes.items():
assert c.get_hash(hash_name) == hash_
def test_content_hashes():
hashes = dict(sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux")
c = Content(length=42, status="visible", **hashes)
assert c.hashes() == hashes
def test_content_data():
c = Content(
length=42,
status="visible",
data=b"foo",
sha1=b"foo",
sha1_git=b"bar",
sha256=b"baz",
blake2s256=b"qux",
)
assert c.with_data() == c
def test_content_data_missing():
c = Content(
length=42,
status="visible",
sha1=b"foo",
sha1_git=b"bar",
sha256=b"baz",
blake2s256=b"qux",
)
with pytest.raises(MissingData):
c.with_data()
@given(strategies.present_contents_d())
def test_content_from_dict(content_d):
c = Content.from_data(**content_d)
assert c
assert c.ctime == content_d["ctime"]
content_d2 = c.to_dict()
c2 = Content.from_dict(content_d2)
assert c2.ctime == c.ctime
def test_content_from_dict_str_ctime():
# test with ctime as a string
n = datetime.datetime(2020, 5, 6, 12, 34, tzinfo=datetime.timezone.utc)
content_d = {
"ctime": n.isoformat(),
"data": b"",
"length": 0,
"sha1": b"\x00",
"sha256": b"\x00",
"sha1_git": b"\x00",
"blake2s256": b"\x00",
}
c = Content.from_dict(content_d)
assert c.ctime == n
def test_content_from_dict_str_naive_ctime():
# test with ctime as a string
n = datetime.datetime(2020, 5, 6, 12, 34)
content_d = {
"ctime": n.isoformat(),
"data": b"",
"length": 0,
"sha1": b"\x00",
"sha256": b"\x00",
"sha1_git": b"\x00",
"blake2s256": b"\x00",
}
with pytest.raises(ValueError, match="must be a timezone-aware datetime."):
Content.from_dict(content_d)
@given(binary(max_size=4096))
def test_content_from_data(data):
c = Content.from_data(data)
assert c.data == data
assert c.length == len(data)
assert c.status == "visible"
for key, value in MultiHash.from_data(data).digest().items():
assert getattr(c, key) == value
@given(binary(max_size=4096))
def test_hidden_content_from_data(data):
c = Content.from_data(data, status="hidden")
assert c.data == data
assert c.length == len(data)
assert c.status == "hidden"
for key, value in MultiHash.from_data(data).digest().items():
assert getattr(c, key) == value
def test_content_naive_datetime():
c = Content.from_data(b"foo")
with pytest.raises(ValueError, match="must be a timezone-aware datetime"):
Content(
**c.to_dict(), ctime=datetime.datetime.now(),
)
# SkippedContent
@given(binary(max_size=4096))
def test_skipped_content_from_data(data):
c = SkippedContent.from_data(data, reason="reason")
assert c.reason == "reason"
assert c.length == len(data)
assert c.status == "absent"
for key, value in MultiHash.from_data(data).digest().items():
assert getattr(c, key) == value
@given(strategies.skipped_contents_d())
def test_skipped_content_origin_is_str(skipped_content_d):
assert SkippedContent.from_dict(skipped_content_d)
skipped_content_d["origin"] = "http://path/to/origin"
assert SkippedContent.from_dict(skipped_content_d)
skipped_content_d["origin"] = Origin(url="http://path/to/origin")
with pytest.raises(ValueError, match="origin"):
SkippedContent.from_dict(skipped_content_d)
def test_skipped_content_naive_datetime():
c = SkippedContent.from_data(b"foo", reason="reason")
with pytest.raises(ValueError, match="must be a timezone-aware datetime"):
SkippedContent(
**c.to_dict(), ctime=datetime.datetime.now(),
)
# Directory
def test_directory_entry_name_validation():
with pytest.raises(ValueError, match="valid directory entry name."):
DirectoryEntry(name=b"foo/", type="dir", target=b"\x00" * 20, perms=0),
def test_directory_duplicate_entry_name():
entries = (
DirectoryEntry(name=b"foo", type="file", target=b"\x00" * 20, perms=0),
DirectoryEntry(name=b"foo", type="dir", target=b"\x01" * 20, perms=1),
)
with pytest.raises(ValueError, match="duplicated entry name"):
Directory(entries=entries)
entries = (
DirectoryEntry(name=b"foo", type="file", target=b"\x00" * 20, perms=0),
DirectoryEntry(name=b"foo", type="file", target=b"\x00" * 20, perms=0),
)
with pytest.raises(ValueError, match="duplicated entry name"):
Directory(entries=entries)
# Revision
def test_revision_extra_headers_no_headers():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev = Revision.from_dict(rev_dict)
rev_dict = attr.asdict(rev, recurse=False)
rev_model = Revision(**rev_dict)
assert rev_model.metadata is None
assert rev_model.extra_headers == ()
rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
rev_model = Revision(**rev_dict)
assert rev_model.metadata == rev_dict["metadata"]
assert rev_model.extra_headers == ()
def test_revision_extra_headers_with_headers():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev = Revision.from_dict(rev_dict)
rev_dict = attr.asdict(rev, recurse=False)
rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
extra_headers = (
(b"header1", b"value1"),
(b"header2", b"42"),
(b"header3", b"should I?\x00"),
(b"header1", b"again"),
)
rev_dict["extra_headers"] = extra_headers
rev_model = Revision(**rev_dict)
assert "extra_headers" not in rev_model.metadata
assert rev_model.extra_headers == extra_headers
def test_revision_extra_headers_in_metadata():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev = Revision.from_dict(rev_dict)
rev_dict = attr.asdict(rev, recurse=False)
rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
extra_headers = (
(b"header1", b"value1"),
(b"header2", b"42"),
(b"header3", b"should I?\x00"),
(b"header1", b"again"),
)
# check the bw-compat init hook does the job
# ie. extra_headers are given in the metadata field
rev_dict["metadata"]["extra_headers"] = extra_headers
rev_model = Revision(**rev_dict)
assert "extra_headers" not in rev_model.metadata
assert rev_model.extra_headers == extra_headers
def test_revision_extra_headers_as_lists():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev = Revision.from_dict(rev_dict)
rev_dict = attr.asdict(rev, recurse=False)
rev_dict["metadata"] = {}
extra_headers = (
(b"header1", b"value1"),
(b"header2", b"42"),
(b"header3", b"should I?\x00"),
(b"header1", b"again"),
)
# check Revision.extra_headers tuplify does the job
rev_dict["extra_headers"] = [list(x) for x in extra_headers]
rev_model = Revision(**rev_dict)
assert "extra_headers" not in rev_model.metadata
assert rev_model.extra_headers == extra_headers
def test_revision_extra_headers_type_error():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev = Revision.from_dict(rev_dict)
orig_rev_dict = attr.asdict(rev, recurse=False)
orig_rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
extra_headers = (
("header1", b"value1"),
(b"header2", 42),
("header1", "again"),
)
# check headers one at a time
# if given as extra_header
for extra_header in extra_headers:
rev_dict = copy.deepcopy(orig_rev_dict)
rev_dict["extra_headers"] = (extra_header,)
with pytest.raises(AttributeTypeError):
Revision(**rev_dict)
# if given as metadata
for extra_header in extra_headers:
rev_dict = copy.deepcopy(orig_rev_dict)
rev_dict["metadata"]["extra_headers"] = (extra_header,)
with pytest.raises(AttributeTypeError):
Revision(**rev_dict)
def test_revision_extra_headers_from_dict():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev_model = Revision.from_dict(rev_dict)
assert rev_model.metadata is None
assert rev_model.extra_headers == ()
rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
rev_model = Revision.from_dict(rev_dict)
assert rev_model.metadata == rev_dict["metadata"]
assert rev_model.extra_headers == ()
extra_headers = (
(b"header1", b"value1"),
(b"header2", b"42"),
(b"header3", b"should I?\nmaybe\x00\xff"),
(b"header1", b"again"),
)
rev_dict["extra_headers"] = extra_headers
rev_model = Revision.from_dict(rev_dict)
assert "extra_headers" not in rev_model.metadata
assert rev_model.extra_headers == extra_headers
def test_revision_extra_headers_in_metadata_from_dict():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
extra_headers = (
(b"header1", b"value1"),
(b"header2", b"42"),
(b"header3", b"should I?\nmaybe\x00\xff"),
(b"header1", b"again"),
)
# check the bw-compat init hook does the job
rev_dict["metadata"]["extra_headers"] = extra_headers
rev_model = Revision.from_dict(rev_dict)
assert "extra_headers" not in rev_model.metadata
assert rev_model.extra_headers == extra_headers
def test_revision_extra_headers_as_lists_from_dict():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev_model = Revision.from_dict(rev_dict)
rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
extra_headers = (
(b"header1", b"value1"),
(b"header2", b"42"),
(b"header3", b"should I?\nmaybe\x00\xff"),
(b"header1", b"again"),
)
# check Revision.extra_headers converter does the job
rev_dict["extra_headers"] = [list(x) for x in extra_headers]
rev_model = Revision.from_dict(rev_dict)
assert "extra_headers" not in rev_model.metadata
assert rev_model.extra_headers == extra_headers
@given(strategies.objects(split_content=True))
def test_object_type(objtype_and_obj):
obj_type, obj = objtype_and_obj
assert obj_type == obj.object_type
def test_object_type_is_final():
object_types = set()
def check_final(cls):
if hasattr(cls, "object_type"):
assert cls.object_type not in object_types
object_types.add(cls.object_type)
if cls.__subclasses__():
assert not hasattr(cls, "object_type")
for subcls in cls.__subclasses__():
check_final(subcls)
check_final(BaseModel)
_metadata_authority = MetadataAuthority(
type=MetadataAuthorityType.FORGE, url="https://forge.softwareheritage.org",
)
_metadata_fetcher = MetadataFetcher(name="test-fetcher", version="0.0.1",)
_content_swhid = ExtendedSWHID.from_string(
"swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2"
)
_origin_url = "https://forge.softwareheritage.org/source/swh-model.git"
_origin_swhid = ExtendedSWHID.from_string(
"swh:1:ori:94a9ed024d3859793618152ea559a168bbcbb5e2"
)
_dummy_qualifiers = {"origin": "https://example.com", "lines": "42"}
_common_metadata_fields = dict(
discovery_date=datetime.datetime(
2021, 1, 29, 13, 57, 9, tzinfo=datetime.timezone.utc
),
authority=_metadata_authority,
fetcher=_metadata_fetcher,
format="json",
metadata=b'{"origin": "https://example.com", "lines": "42"}',
)
def test_metadata_valid():
"""Checks valid RawExtrinsicMetadata objects don't raise an error."""
# Simplest case
RawExtrinsicMetadata(target=_origin_swhid, **_common_metadata_fields)
# Object with an SWHID
RawExtrinsicMetadata(
target=_content_swhid, **_common_metadata_fields,
)
def test_metadata_to_dict():
"""Checks valid RawExtrinsicMetadata objects don't raise an error."""
common_fields = {
"authority": {"type": "forge", "url": "https://forge.softwareheritage.org"},
"fetcher": {"name": "test-fetcher", "version": "0.0.1",},
"discovery_date": _common_metadata_fields["discovery_date"],
"format": "json",
"metadata": b'{"origin": "https://example.com", "lines": "42"}',
}
m = RawExtrinsicMetadata(target=_origin_swhid, **_common_metadata_fields,)
assert m.to_dict() == {
"target": str(_origin_swhid),
"id": b"@j\xc9\x01\xbc\x1e#p*\xf3q9\xa7u\x97\x00\x14\x02xa",
**common_fields,
}
assert RawExtrinsicMetadata.from_dict(m.to_dict()) == m
m = RawExtrinsicMetadata(target=_content_swhid, **_common_metadata_fields,)
assert m.to_dict() == {
"target": "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",
"id": b"\xbc\xa3U\xddf\x19U\xc5\xd2\xd7\xdfK\xd7c\x1f\xa8\xfeh\x992",
**common_fields,
}
assert RawExtrinsicMetadata.from_dict(m.to_dict()) == m
hash_hex = "6162" * 10
hash_bin = b"ab" * 10
m = RawExtrinsicMetadata(
target=_content_swhid,
**_common_metadata_fields,
origin="https://example.org/",
snapshot=CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=hash_bin),
release=CoreSWHID(object_type=ObjectType.RELEASE, object_id=hash_bin),
revision=CoreSWHID(object_type=ObjectType.REVISION, object_id=hash_bin),
path=b"/foo/bar",
directory=CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=hash_bin),
)
assert m.to_dict() == {
"target": "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",
"id": b"\x14l\xb0\x1f\xb9\xc0{)\xc7\x0f\xbd\xc0*,YZ\xf5C\xab\xfc",
**common_fields,
"origin": "https://example.org/",
"snapshot": f"swh:1:snp:{hash_hex}",
"release": f"swh:1:rel:{hash_hex}",
"revision": f"swh:1:rev:{hash_hex}",
"path": b"/foo/bar",
"directory": f"swh:1:dir:{hash_hex}",
}
assert RawExtrinsicMetadata.from_dict(m.to_dict()) == m
def test_metadata_invalid_target():
"""Checks various invalid values for the 'target' field."""
# SWHID passed as string instead of SWHID
with pytest.raises(ValueError, match="target must be.*ExtendedSWHID"):
RawExtrinsicMetadata(
target="swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",
**_common_metadata_fields,
)
def test_metadata_naive_datetime():
with pytest.raises(ValueError, match="must be a timezone-aware datetime"):
RawExtrinsicMetadata(
target=_origin_swhid,
**{**_common_metadata_fields, "discovery_date": datetime.datetime.now()},
)
def test_metadata_validate_context_origin():
"""Checks validation of RawExtrinsicMetadata.origin."""
# Origins can't have an 'origin' context
with pytest.raises(
ValueError, match="Unexpected 'origin' context for origin object"
):
RawExtrinsicMetadata(
target=_origin_swhid, origin=_origin_url, **_common_metadata_fields,
)
# but all other types can
RawExtrinsicMetadata(
target=_content_swhid, origin=_origin_url, **_common_metadata_fields,
)
# SWHIDs aren't valid origin URLs
with pytest.raises(ValueError, match="SWHID used as context origin URL"):
RawExtrinsicMetadata(
target=_content_swhid,
origin="swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",
**_common_metadata_fields,
)
def test_metadata_validate_context_visit():
"""Checks validation of RawExtrinsicMetadata.visit."""
# Origins can't have a 'visit' context
with pytest.raises(
ValueError, match="Unexpected 'visit' context for origin object"
):
RawExtrinsicMetadata(
target=_origin_swhid, visit=42, **_common_metadata_fields,
)
# but all other types can
RawExtrinsicMetadata(
target=_content_swhid, origin=_origin_url, visit=42, **_common_metadata_fields,
)
# Missing 'origin'
with pytest.raises(ValueError, match="'origin' context must be set if 'visit' is"):
RawExtrinsicMetadata(
target=_content_swhid, visit=42, **_common_metadata_fields,
)
# visit id must be positive
with pytest.raises(ValueError, match="Nonpositive visit id"):
RawExtrinsicMetadata(
target=_content_swhid,
origin=_origin_url,
visit=-42,
**_common_metadata_fields,
)
def test_metadata_validate_context_snapshot():
"""Checks validation of RawExtrinsicMetadata.snapshot."""
# Origins can't have a 'snapshot' context
with pytest.raises(
ValueError, match="Unexpected 'snapshot' context for origin object"
):
RawExtrinsicMetadata(
target=_origin_swhid,
snapshot=CoreSWHID(
object_type=ObjectType.SNAPSHOT, object_id=EXAMPLE_HASH,
),
**_common_metadata_fields,
)
# but content can
RawExtrinsicMetadata(
target=_content_swhid,
snapshot=CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=EXAMPLE_HASH),
**_common_metadata_fields,
)
# SWHID type doesn't match the expected type of this context key
with pytest.raises(
ValueError, match="Expected SWHID type 'snapshot', got 'content'"
):
RawExtrinsicMetadata(
target=_content_swhid,
snapshot=CoreSWHID(object_type=ObjectType.CONTENT, object_id=EXAMPLE_HASH,),
**_common_metadata_fields,
)
def test_metadata_validate_context_release():
"""Checks validation of RawExtrinsicMetadata.release."""
# Origins can't have a 'release' context
with pytest.raises(
ValueError, match="Unexpected 'release' context for origin object"
):
RawExtrinsicMetadata(
target=_origin_swhid,
release=CoreSWHID(object_type=ObjectType.RELEASE, object_id=EXAMPLE_HASH,),
**_common_metadata_fields,
)
# but content can
RawExtrinsicMetadata(
target=_content_swhid,
release=CoreSWHID(object_type=ObjectType.RELEASE, object_id=EXAMPLE_HASH),
**_common_metadata_fields,
)
# SWHID type doesn't match the expected type of this context key
with pytest.raises(
ValueError, match="Expected SWHID type 'release', got 'content'"
):
RawExtrinsicMetadata(
target=_content_swhid,
release=CoreSWHID(object_type=ObjectType.CONTENT, object_id=EXAMPLE_HASH,),
**_common_metadata_fields,
)
def test_metadata_validate_context_revision():
"""Checks validation of RawExtrinsicMetadata.revision."""
# Origins can't have a 'revision' context
with pytest.raises(
ValueError, match="Unexpected 'revision' context for origin object"
):
RawExtrinsicMetadata(
target=_origin_swhid,
revision=CoreSWHID(
object_type=ObjectType.REVISION, object_id=EXAMPLE_HASH,
),
**_common_metadata_fields,
)
# but content can
RawExtrinsicMetadata(
target=_content_swhid,
revision=CoreSWHID(object_type=ObjectType.REVISION, object_id=EXAMPLE_HASH),
**_common_metadata_fields,
)
# SWHID type doesn't match the expected type of this context key
with pytest.raises(
ValueError, match="Expected SWHID type 'revision', got 'content'"
):
RawExtrinsicMetadata(
target=_content_swhid,
revision=CoreSWHID(object_type=ObjectType.CONTENT, object_id=EXAMPLE_HASH,),
**_common_metadata_fields,
)
def test_metadata_validate_context_path():
"""Checks validation of RawExtrinsicMetadata.path."""
# Origins can't have a 'path' context
with pytest.raises(ValueError, match="Unexpected 'path' context for origin object"):
RawExtrinsicMetadata(
target=_origin_swhid, path=b"/foo/bar", **_common_metadata_fields,
)
# but content can
RawExtrinsicMetadata(
target=_content_swhid, path=b"/foo/bar", **_common_metadata_fields,
)
def test_metadata_validate_context_directory():
"""Checks validation of RawExtrinsicMetadata.directory."""
# Origins can't have a 'directory' context
with pytest.raises(
ValueError, match="Unexpected 'directory' context for origin object"
):
RawExtrinsicMetadata(
target=_origin_swhid,
directory=CoreSWHID(
object_type=ObjectType.DIRECTORY, object_id=EXAMPLE_HASH,
),
**_common_metadata_fields,
)
# but content can
RawExtrinsicMetadata(
target=_content_swhid,
directory=CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=EXAMPLE_HASH,),
**_common_metadata_fields,
)
# SWHID type doesn't match the expected type of this context key
with pytest.raises(
ValueError, match="Expected SWHID type 'directory', got 'content'"
):
RawExtrinsicMetadata(
target=_content_swhid,
directory=CoreSWHID(
object_type=ObjectType.CONTENT, object_id=EXAMPLE_HASH,
),
**_common_metadata_fields,
)
def test_metadata_normalize_discovery_date():
fields_copy = {**_common_metadata_fields}
truncated_date = fields_copy.pop("discovery_date")
assert truncated_date.microsecond == 0
# Check for TypeError on disabled object type: we removed attrs_strict's
# type_validator
with pytest.raises(TypeError):
RawExtrinsicMetadata(
target=_content_swhid, discovery_date="not a datetime", **fields_copy
)
# Check for truncation to integral second
date_with_us = truncated_date.replace(microsecond=42)
md = RawExtrinsicMetadata(
target=_content_swhid, discovery_date=date_with_us, **fields_copy,
)
assert md.discovery_date == truncated_date
assert md.discovery_date.tzinfo == datetime.timezone.utc
# Check that the timezone gets normalized. Timezones can be offset by a
# non-integral number of seconds, so we need to handle that.
timezone = datetime.timezone(offset=datetime.timedelta(hours=2))
date_with_tz = truncated_date.astimezone(timezone)
assert date_with_tz.tzinfo != datetime.timezone.utc
md = RawExtrinsicMetadata(
target=_content_swhid, discovery_date=date_with_tz, **fields_copy,
)
assert md.discovery_date == truncated_date
assert md.discovery_date.tzinfo == datetime.timezone.utc