diff --git a/PKG-INFO b/PKG-INFO
index f930310..d6afa6e 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,46 +1,46 @@
Metadata-Version: 2.1
Name: swh.model
-Version: 3.0.0
+Version: 3.1.0
Summary: Software Heritage data model
Home-page: https://forge.softwareheritage.org/diffusion/DMOD/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-model
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-model/
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: cli
Provides-Extra: testing-minimal
Provides-Extra: testing
License-File: LICENSE
License-File: AUTHORS
swh-model
=========
Implementation of the Data model of the Software Heritage project, used to
archive source code artifacts.
This module defines the notion of SoftWare Heritage persistent IDentifiers
(SWHIDs) and provides tools to compute them:
```sh
$ swh-identify fork.c kmod.c sched/deadline.c
swh:1:cnt:2e391c754ae730bd2d8520c2ab497c403220c6e3 fork.c
swh:1:cnt:0277d1216f80ae1adeed84a686ed34c9b2931fc2 kmod.c
swh:1:cnt:57b939c81bce5d06fa587df8915f05affbe22b82 sched/deadline.c
$ swh-identify --no-filename /usr/src/linux/kernel/
swh:1:dir:f9f858a48d663b3809c9e2f336412717496202ab
```
diff --git a/swh.model.egg-info/PKG-INFO b/swh.model.egg-info/PKG-INFO
index f930310..d6afa6e 100644
--- a/swh.model.egg-info/PKG-INFO
+++ b/swh.model.egg-info/PKG-INFO
@@ -1,46 +1,46 @@
Metadata-Version: 2.1
Name: swh.model
-Version: 3.0.0
+Version: 3.1.0
Summary: Software Heritage data model
Home-page: https://forge.softwareheritage.org/diffusion/DMOD/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-model
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-model/
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: cli
Provides-Extra: testing-minimal
Provides-Extra: testing
License-File: LICENSE
License-File: AUTHORS
swh-model
=========
Implementation of the Data model of the Software Heritage project, used to
archive source code artifacts.
This module defines the notion of SoftWare Heritage persistent IDentifiers
(SWHIDs) and provides tools to compute them:
```sh
$ swh-identify fork.c kmod.c sched/deadline.c
swh:1:cnt:2e391c754ae730bd2d8520c2ab497c403220c6e3 fork.c
swh:1:cnt:0277d1216f80ae1adeed84a686ed34c9b2931fc2 kmod.c
swh:1:cnt:57b939c81bce5d06fa587df8915f05affbe22b82 sched/deadline.c
$ swh-identify --no-filename /usr/src/linux/kernel/
swh:1:dir:f9f858a48d663b3809c9e2f336412717496202ab
```
diff --git a/swh/model/hashutil.py b/swh/model/hashutil.py
index eaacb23..86ecc6f 100644
--- a/swh/model/hashutil.py
+++ b/swh/model/hashutil.py
@@ -1,365 +1,365 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Module in charge of hashing function definitions. This is the base
module use to compute swh's hashes.
Only a subset of hashing algorithms is supported as defined in the
ALGORITHMS set. Any provided algorithms not in that list will result
in a ValueError explaining the error.
This module defines a MultiHash class to ease the softwareheritage
hashing algorithms computation. This allows to compute hashes from
file object, path, data using a similar interface as what the standard
hashlib module provides.
Basic usage examples:
- file object: MultiHash.from_file(
file_object, hash_names=DEFAULT_ALGORITHMS).digest()
- path (filepath): MultiHash.from_path(b'foo').hexdigest()
- data (bytes): MultiHash.from_data(b'foo').bytehexdigest()
"Complex" usage, defining a swh hashlib instance first:
- To compute the length as well, add "length" to the set of algorithms to
compute, for example:
.. code-block:: python
h = MultiHash(hash_names=set({'length'}).union(DEFAULT_ALGORITHMS))
with open(filepath, 'rb') as f:
h.update(f.read(HASH_BLOCK_SIZE))
hashes = h.digest() # returns a dict of {hash_algo_name: hash_in_bytes}
- Write a stream to disk while computing its hashes, for example:
.. code-block:: python
h = MultiHash(length=length)
with open(filepath, 'wb') as f:
for chunk in r.iter_content():  # r is a streaming response
h.update(chunk)
f.write(chunk)
hashes = h.hexdigest() # returns a dict of {hash_algo_name: hash_in_hex}
"""
import binascii
import functools
import hashlib
from io import BytesIO
import os
from typing import Callable, Dict, Optional
-ALGORITHMS = set(["sha1", "sha256", "sha1_git", "blake2s256", "blake2b512"])
+ALGORITHMS = set(["sha1", "sha256", "sha1_git", "blake2s256", "blake2b512", "md5"])
"""Hashing algorithms supported by this module"""
DEFAULT_ALGORITHMS = set(["sha1", "sha256", "sha1_git", "blake2s256"])
"""Algorithms computed by default when calling the functions from this module.
Subset of :const:`ALGORITHMS`.
"""
HASH_BLOCK_SIZE = 32768
"""Block size for streaming hash computations made in this module"""
_blake2_hash_cache = {} # type: Dict[str, Callable]
class MultiHash:
"""Hashutil class to support multiple hashes computation.
Args:
hash_names (set): Set of hash algorithms (+ optionally length)
to compute hashes (cf. DEFAULT_ALGORITHMS)
length (int): Length of the total sum of chunks to read
If "length" is provided as an algorithm name, the length is also
computed and returned.
"""
def __init__(self, hash_names=DEFAULT_ALGORITHMS, length=None):
self.state = {}
self.track_length = False
for name in hash_names:
if name == "length":
self.state["length"] = 0
self.track_length = True
else:
self.state[name] = _new_hash(name, length)
@classmethod
def from_state(cls, state, track_length):
ret = cls([])
ret.state = state
ret.track_length = track_length
return ret
@classmethod
def from_file(cls, fobj, hash_names=DEFAULT_ALGORITHMS, length=None):
ret = cls(length=length, hash_names=hash_names)
while True:
chunk = fobj.read(HASH_BLOCK_SIZE)
if not chunk:
break
ret.update(chunk)
return ret
@classmethod
def from_path(cls, path, hash_names=DEFAULT_ALGORITHMS):
length = os.path.getsize(path)
with open(path, "rb") as f:
ret = cls.from_file(f, hash_names=hash_names, length=length)
return ret
@classmethod
def from_data(cls, data, hash_names=DEFAULT_ALGORITHMS):
length = len(data)
fobj = BytesIO(data)
return cls.from_file(fobj, hash_names=hash_names, length=length)
def update(self, chunk):
for name, h in self.state.items():
if name == "length":
continue
h.update(chunk)
if self.track_length:
self.state["length"] += len(chunk)
def digest(self):
return {
name: h.digest() if name != "length" else h
for name, h in self.state.items()
}
def hexdigest(self):
return {
name: h.hexdigest() if name != "length" else h
for name, h in self.state.items()
}
def bytehexdigest(self):
return {
name: hash_to_bytehex(h.digest()) if name != "length" else h
for name, h in self.state.items()
}
def copy(self):
copied_state = {
name: h.copy() if name != "length" else h for name, h in self.state.items()
}
return self.from_state(copied_state, self.track_length)
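As an aside for readers of this hunk, here is a minimal usage sketch (not part of the patch; data and algorithm choices are illustrative) showing incremental hashing with a forked state, which relies on `from_state()` returning the rebuilt instance as fixed above:
```python
from swh.model.hashutil import MultiHash

h = MultiHash(hash_names={"sha1", "length"})
h.update(b"common prefix ")
fork = h.copy()           # snapshot of the current hashing state
h.update(b"variant A")
fork.update(b"variant B")
print(h.hexdigest())      # {'sha1': ..., 'length': 23}
print(fork.hexdigest())   # same prefix, different digest and length
```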
def _new_blake2_hash(algo):
"""Return a function that initializes a blake2 hash.
"""
if algo in _blake2_hash_cache:
return _blake2_hash_cache[algo]()
lalgo = algo.lower()
if not lalgo.startswith("blake2"):
raise ValueError("Algorithm %s is not a blake2 hash" % algo)
blake_family = lalgo[:7]
digest_size = None
if lalgo[7:]:
try:
digest_size, remainder = divmod(int(lalgo[7:]), 8)
except ValueError:
raise ValueError("Unknown digest size for algo %s" % algo) from None
if remainder:
raise ValueError(
"Digest size for algorithm %s must be a multiple of 8" % algo
)
if lalgo in hashlib.algorithms_available:
# Handle the case where OpenSSL ships the given algorithm
# (e.g. Python 3.5 on Debian 9 stretch)
_blake2_hash_cache[algo] = lambda: hashlib.new(lalgo)
else:
# Try using the built-in implementation for Python 3.6+
if blake_family in hashlib.algorithms_available:
blake2 = getattr(hashlib, blake_family)
else:
import pyblake2
blake2 = getattr(pyblake2, blake_family)
_blake2_hash_cache[algo] = lambda: blake2(digest_size=digest_size)
return _blake2_hash_cache[algo]()
def _new_hashlib_hash(algo):
"""Initialize a digest object from hashlib.
Handle the swh-specific names for the blake2-related algorithms
"""
if algo.startswith("blake2"):
return _new_blake2_hash(algo)
else:
return hashlib.new(algo)
def git_object_header(git_type: str, length: int) -> bytes:
"""Returns the header for a git object of the given type and length.
The header of a git object consists of:
- The type of the object (encoded in ASCII)
- One ASCII space (\x20)
- The length of the object (decimal encoded in ASCII)
- One NUL byte
Args:
git_type: the type of the git object (one of 'blob', 'tree', 'commit',
'tag', 'snapshot', 'raw_extrinsic_metadata', 'extid')
length: the length of the git object you're encoding
Returns:
the git object header, as bytes
"""
git_object_types = {
"blob",
"tree",
"commit",
"tag",
"snapshot",
"raw_extrinsic_metadata",
"extid",
}
if git_type not in git_object_types:
raise ValueError(
"Unexpected git object type %s, expected one of %s"
% (git_type, ", ".join(sorted(git_object_types)))
)
return ("%s %d\0" % (git_type, length)).encode("ascii")
def _new_hash(algo: str, length: Optional[int] = None):
"""Initialize a digest object (as returned by python's hashlib) for
the requested algorithm. See the constant ALGORITHMS for the list
of supported algorithms. If a git-specific hashing algorithm is
requested (e.g., "sha1_git"), the hashing object will be pre-fed
with the needed header; for this to work, length must be given.
Args:
algo (str): a hashing algorithm (one of ALGORITHMS)
length (int): the length of the hashed payload (needed for
git-specific algorithms)
Returns:
a hashutil.hash object
Raises:
ValueError if algo is unknown, or length is missing for a git-specific
hash.
"""
if algo not in ALGORITHMS:
raise ValueError(
"Unexpected hashing algorithm %s, expected one of %s"
% (algo, ", ".join(sorted(ALGORITHMS)))
)
if algo.endswith("_git"):
if length is None:
raise ValueError("Missing length for git hashing algorithm")
base_algo = algo[:-4]
h = _new_hashlib_hash(base_algo)
h.update(git_object_header("blob", length))
return h
return _new_hashlib_hash(algo)
def hash_git_data(data, git_type, base_algo="sha1"):
"""Hash the given data as a git object of type git_type.
Args:
data: a bytes object
git_type: the git object type
base_algo: the base hashing algorithm used (default: sha1)
Returns: the bytes digest of the data, hashed as a git object
Raises:
ValueError if the git_type is unexpected.
"""
h = _new_hashlib_hash(base_algo)
h.update(git_object_header(git_type, len(data)))
h.update(data)
return h.digest()
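With the default sha1 base algorithm, this reproduces git's own object ids; a small sketch (the sample data is illustrative):
```python
from swh.model.hashutil import hash_git_data, hash_to_hex

# Same digest as `git hash-object` on a file containing "hello\n"
digest = hash_git_data(b"hello\n", "blob")
assert hash_to_hex(digest) == "ce013625030ba8dba906f756967f9e9ca394464a"
```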
@functools.lru_cache()
def hash_to_hex(hash):
"""Converts a hash (in hex or bytes form) to its hexadecimal ascii form
Args:
hash (str or bytes): a :class:`bytes` hash or a :class:`str` containing
the hexadecimal form of the hash
Returns:
str: the hexadecimal form of the hash
"""
if isinstance(hash, str):
return hash
return binascii.hexlify(hash).decode("ascii")
@functools.lru_cache()
def hash_to_bytehex(hash):
"""Converts a hash to its hexadecimal bytes representation
Args:
hash (bytes): a :class:`bytes` hash
Returns:
bytes: the hexadecimal form of the hash, as :class:`bytes`
"""
return binascii.hexlify(hash)
@functools.lru_cache()
def hash_to_bytes(hash):
"""Converts a hash (in hex or bytes form) to its raw bytes form
Args:
hash (str or bytes): a :class:`bytes` hash or a :class:`str` containing
the hexadecimal form of the hash
Returns:
bytes: the :class:`bytes` form of the hash
"""
if isinstance(hash, bytes):
return hash
return bytes.fromhex(hash)
@functools.lru_cache()
def bytehex_to_hash(hex):
"""Converts a hexadecimal bytes representation of a hash to that hash
Args:
hex (bytes): a :class:`bytes` containing the hexadecimal form of the
hash encoded in ascii
Returns:
bytes: the :class:`bytes` form of the hash
"""
return hash_to_bytes(hex.decode())
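The four conversion helpers above round-trip with each other; a minimal sketch (the hash value is illustrative):
```python
from swh.model.hashutil import (
    bytehex_to_hash, hash_to_bytehex, hash_to_bytes, hash_to_hex,
)

hex_id = "8ff44f081d43176474b267de5451f2c2e88089d0"
raw = hash_to_bytes(hex_id)            # raw bytes form
assert hash_to_hex(raw) == hex_id      # back to a hex str
assert hash_to_bytehex(raw) == hex_id.encode("ascii")
assert bytehex_to_hash(hex_id.encode("ascii")) == raw
```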
diff --git a/swh/model/model.py b/swh/model/model.py
index 0167eae..735ce46 100644
--- a/swh/model/model.py
+++ b/swh/model/model.py
@@ -1,1233 +1,1338 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
Implementation of Software Heritage's data model
See :ref:`data-model` for an overview of the data model.
The classes defined in this module are immutable
`attrs objects <https://www.attrs.org/>`__ and enums.
All classes define a ``from_dict`` class method and a ``to_dict``
method to convert between them and msgpack-serializable objects.
"""
from abc import ABCMeta, abstractmethod
import datetime
from enum import Enum
import hashlib
from typing import Any, Dict, Iterable, Optional, Tuple, TypeVar, Union
import attr
-from attrs_strict import type_validator
+from attrs_strict import AttributeTypeError
import dateutil.parser
import iso8601
from typing_extensions import Final
from . import git_objects
from .collections import ImmutableDict
-from .hashutil import DEFAULT_ALGORITHMS, MultiHash
+from .hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_hex
from .swhids import CoreSWHID
from .swhids import ExtendedObjectType as SwhidExtendedObjectType
from .swhids import ExtendedSWHID
from .swhids import ObjectType as SwhidObjectType
class MissingData(Exception):
"""Raised by `Content.with_data` when it has no way of fetching the
data (but not when fetching the data fails)."""
pass
KeyType = Union[Dict[str, str], Dict[str, bytes], bytes]
"""The type returned by BaseModel.unique_key()."""
SHA1_SIZE = 20
# TODO: Limit this to 20 bytes
Sha1Git = bytes
Sha1 = bytes
KT = TypeVar("KT")
VT = TypeVar("VT")
+def hash_repr(h: bytes) -> str:
+ if h is None:
+ return "None"
+ else:
+ return f"hash_to_bytes('{hash_to_hex(h)}')"
+
+
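The intent of this new helper is that hash attributes print as evaluable expressions; a small sketch (assuming `hash_to_bytes` is in scope at eval time):
```python
from swh.model.hashutil import hash_to_bytes
from swh.model.model import hash_repr

h = hash_to_bytes("8ff44f081d43176474b267de5451f2c2e88089d0")
r = hash_repr(h)
assert r == "hash_to_bytes('8ff44f081d43176474b267de5451f2c2e88089d0')"
assert eval(r) == h  # the repr round-trips, given hash_to_bytes in scope
```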
def freeze_optional_dict(
d: Union[None, Dict[KT, VT], ImmutableDict[KT, VT]] # type: ignore
) -> Optional[ImmutableDict[KT, VT]]:
if isinstance(d, dict):
return ImmutableDict(d)
else:
return d
def dictify(value):
"Helper function used by BaseModel.to_dict()"
if isinstance(value, BaseModel):
return value.to_dict()
elif isinstance(value, (CoreSWHID, ExtendedSWHID)):
return str(value)
elif isinstance(value, Enum):
return value.value
elif isinstance(value, (dict, ImmutableDict)):
return {k: dictify(v) for k, v in value.items()}
elif isinstance(value, tuple):
return tuple(dictify(v) for v in value)
else:
return value
+def _check_type(type_, value):
+ if type_ is object or type_ is Any:
+ return True
+
+ origin = getattr(type_, "__origin__", None)
+
+ # Non-generic type, check it directly
+ if origin is None:
+ # This is functionally equivalent to using just this:
+ # return isinstance(value, type)
+ # but using type equality before isinstance allows very quick checks
+ # when the exact class is used (which is the overwhelming majority of cases)
+ # while still allowing subclasses to be used.
+ return type(value) == type_ or isinstance(value, type_)
+
+ # Check the type of the value itself
+ #
+ # For the same reason as above, this condition is functionally equivalent to:
+ # if origin is not Union and not isinstance(value, origin):
+ if origin is not Union and type(value) != origin and not isinstance(value, origin):
+ return False
+
+ # Then, if it's a container, check its items.
+ if origin is tuple:
+ args = type_.__args__
+ if len(args) == 2 and args[1] is Ellipsis:
+ # Infinite tuple
+ return all(_check_type(args[0], item) for item in value)
+ else:
+ # Finite tuple
+ if len(args) != len(value):
+ return False
+
+ return all(
+ _check_type(item_type, item) for (item_type, item) in zip(args, value)
+ )
+ elif origin is Union:
+ args = type_.__args__
+ return any(_check_type(variant, value) for variant in args)
+ elif origin is ImmutableDict:
+ (key_type, value_type) = type_.__args__
+ return all(
+ _check_type(key_type, key) and _check_type(value_type, value)
+ for (key, value) in value.items()
+ )
+ else:
+    # No need to check dict or list, because they are converted to ImmutableDict
+ # and tuple respectively.
+ raise NotImplementedError(f"Type-checking {type_}")
+
+
+def type_validator():
+ """Like attrs_strict.type_validator(), but stricter.
+
+    It is an attrs validator, which checks attributes have the specified type,
+    using type equality first (falling back to ``isinstance()`` for
+    subclasses), for improved performance.
+ """
+
+ def validator(instance, attribute, value):
+ if not _check_type(attribute.type, value):
+ raise AttributeTypeError(value, attribute)
+
+ return validator
+
+
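To see the stricter validator in action, here is a minimal sketch with a hypothetical attrs class (`Sample` is not part of swh.model):
```python
from typing import Optional

import attr
from attrs_strict import AttributeTypeError

from swh.model.model import type_validator  # the validator defined above

@attr.s(frozen=True, slots=True)
class Sample:
    name = attr.ib(type=bytes, validator=type_validator())
    size = attr.ib(type=Optional[int], validator=type_validator(), default=None)

Sample(name=b"ok", size=3)       # exact types: the fast path succeeds
try:
    Sample(name="not bytes")     # str is not bytes
except AttributeTypeError as e:
    print(e)
```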
ModelType = TypeVar("ModelType", bound="BaseModel")
class BaseModel:
"""Base class for SWH model classes.
Provides serialization/deserialization to/from Python dictionaries,
that are suitable for JSON/msgpack-like formats."""
__slots__ = ()
def to_dict(self):
"""Wrapper of `attr.asdict` that can be overridden by subclasses
that have special handling of some of the fields."""
return dictify(attr.asdict(self, recurse=False))
@classmethod
def from_dict(cls, d):
"""Takes a dictionary representing a tree of SWH objects, and
recursively builds the corresponding objects."""
return cls(**d)
def anonymize(self: ModelType) -> Optional[ModelType]:
"""Returns an anonymized version of the object, if needed.
If the object model does not need/support anonymization, returns None.
"""
return None
def unique_key(self) -> KeyType:
"""Returns a unique key for this object, that can be used for
deduplication."""
raise NotImplementedError(f"unique_key for {self}")
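As a usage sketch of the dict round-trip contract (illustrative only, using the Timestamp class defined later in this module):
```python
from swh.model.model import Timestamp

ts = Timestamp(seconds=1617868800, microseconds=0)
d = ts.to_dict()                      # {'seconds': 1617868800, 'microseconds': 0}
assert Timestamp.from_dict(d) == ts   # round-trips losslessly
```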
class HashableObject(metaclass=ABCMeta):
"""Mixin to automatically compute object identifier hash when
the associated model is instantiated."""
__slots__ = ()
id: Sha1Git
@abstractmethod
def compute_hash(self) -> bytes:
"""Derived model classes must implement this to compute
the object hash.
This method is called by the object initialization if the `id`
attribute is set to an empty value.
"""
pass
def __attrs_post_init__(self):
if not self.id:
obj_id = self.compute_hash()
object.__setattr__(self, "id", obj_id)
def unique_key(self) -> KeyType:
return self.id
@attr.s(frozen=True, slots=True)
class Person(BaseModel):
"""Represents the author/committer of a revision or release."""
object_type: Final = "person"
fullname = attr.ib(type=bytes, validator=type_validator())
name = attr.ib(type=Optional[bytes], validator=type_validator())
email = attr.ib(type=Optional[bytes], validator=type_validator())
@classmethod
def from_fullname(cls, fullname: bytes):
"""Returns a Person object, by guessing the name and email from the
fullname, in the ``name <email>`` format.
The fullname is left unchanged."""
if fullname is None:
raise TypeError("fullname is None.")
name: Optional[bytes]
email: Optional[bytes]
try:
open_bracket = fullname.index(b"<")
except ValueError:
name = fullname
email = None
else:
raw_name = fullname[:open_bracket]
raw_email = fullname[open_bracket + 1 :]
if not raw_name:
name = None
else:
name = raw_name.strip()
try:
close_bracket = raw_email.rindex(b">")
except ValueError:
email = raw_email
else:
email = raw_email[:close_bracket]
return Person(name=name or None, email=email or None, fullname=fullname,)
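A brief sketch of this parsing heuristic (the sample fullname is illustrative):
```python
from swh.model.model import Person

p = Person.from_fullname(b"Jane Doe <jane@example.org>")
assert p.name == b"Jane Doe"
assert p.email == b"jane@example.org"
assert p.fullname == b"Jane Doe <jane@example.org>"  # left unchanged
```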
def anonymize(self) -> "Person":
"""Returns an anonymized version of the Person object.
Anonymization is simply a Person whose fullname is the SHA256 hash of the
original fullname, with unset name and email.
"""
return Person(
fullname=hashlib.sha256(self.fullname).digest(), name=None, email=None,
)
@classmethod
def from_dict(cls, d):
"""
If the fullname is missing, construct a fullname
using the following heuristics: if the name value is None, we return the
email in angle brackets, else, we return the name, a space, and the email
in angle brackets.
"""
if "fullname" not in d:
parts = []
if d["name"] is not None:
parts.append(d["name"])
if d["email"] is not None:
parts.append(b"".join([b"<", d["email"], b">"]))
fullname = b" ".join(parts)
d = {**d, "fullname": fullname}
d = {"name": None, "email": None, **d}
return super().from_dict(d)
@attr.s(frozen=True, slots=True)
class Timestamp(BaseModel):
"""Represents a naive timestamp from a VCS."""
object_type: Final = "timestamp"
seconds = attr.ib(type=int, validator=type_validator())
microseconds = attr.ib(type=int, validator=type_validator())
@seconds.validator
def check_seconds(self, attribute, value):
"""Check that seconds fit in a 64-bits signed integer."""
if not (-(2 ** 63) <= value < 2 ** 63):
raise ValueError("Seconds must be a signed 64-bits integer.")
@microseconds.validator
def check_microseconds(self, attribute, value):
"""Checks that microseconds are positive and < 1000000."""
if not (0 <= value < 10 ** 6):
raise ValueError("Microseconds must be in [0, 1000000[.")
@attr.s(frozen=True, slots=True)
class TimestampWithTimezone(BaseModel):
"""Represents a TZ-aware timestamp from a VCS."""
object_type: Final = "timestamp_with_timezone"
timestamp = attr.ib(type=Timestamp, validator=type_validator())
offset = attr.ib(type=int, validator=type_validator())
negative_utc = attr.ib(type=bool, validator=type_validator())
@offset.validator
def check_offset(self, attribute, value):
"""Checks the offset is a 16-bits signed integer (in theory, it
should always be between -14 and +14 hours)."""
if not (-(2 ** 15) <= value < 2 ** 15):
# max 14 hours offset in theory, but you never know what
# you'll find in the wild...
raise ValueError("offset too large: %d minutes" % value)
@negative_utc.validator
def check_negative_utc(self, attribute, value):
if self.offset and value:
raise ValueError("negative_utc can only be True is offset=0")
@classmethod
def from_dict(cls, time_representation: Union[Dict, datetime.datetime, int]):
"""Builds a TimestampWithTimezone from any of the formats
accepted by :func:`swh.model.normalize_timestamp`."""
# TODO: this accepts way more types than just dicts; find a better
# name
negative_utc = False
if isinstance(time_representation, dict):
ts = time_representation["timestamp"]
if isinstance(ts, dict):
seconds = ts.get("seconds", 0)
microseconds = ts.get("microseconds", 0)
elif isinstance(ts, int):
seconds = ts
microseconds = 0
else:
raise ValueError(
f"TimestampWithTimezone.from_dict received non-integer timestamp "
f"member {ts!r}"
)
offset = time_representation["offset"]
if "negative_utc" in time_representation:
negative_utc = time_representation["negative_utc"]
if negative_utc is None:
negative_utc = False
elif isinstance(time_representation, datetime.datetime):
microseconds = time_representation.microsecond
if microseconds:
time_representation = time_representation.replace(microsecond=0)
seconds = int(time_representation.timestamp())
utcoffset = time_representation.utcoffset()
if utcoffset is None:
raise ValueError(
f"TimestampWithTimezone.from_dict received datetime without "
f"timezone: {time_representation}"
)
# convert the utcoffset (a timedelta) to an integer number of minutes
seconds_offset = utcoffset.total_seconds()
offset = int(seconds_offset) // 60
elif isinstance(time_representation, int):
seconds = time_representation
microseconds = 0
offset = 0
else:
raise ValueError(
f"TimestampWithTimezone.from_dict received non-integer timestamp: "
f"{time_representation!r}"
)
return cls(
timestamp=Timestamp(seconds=seconds, microseconds=microseconds),
offset=offset,
negative_utc=negative_utc,
)
@classmethod
def from_datetime(cls, dt: datetime.datetime):
return cls.from_dict(dt)
def to_datetime(self) -> datetime.datetime:
"""Convert to a datetime (with a timezone set to the recorded fixed UTC offset)
Beware that this conversion can be lossy: the negative_utc flag is not
taken into consideration (since it cannot be represented in a
datetime). Also note that it may fail due to type overflow.
"""
timestamp = datetime.datetime.fromtimestamp(
self.timestamp.seconds,
datetime.timezone(datetime.timedelta(minutes=self.offset)),
)
timestamp = timestamp.replace(microsecond=self.timestamp.microseconds)
return timestamp
@classmethod
def from_iso8601(cls, s):
"""Builds a TimestampWithTimezone from an ISO8601-formatted string.
"""
dt = iso8601.parse_date(s)
tstz = cls.from_datetime(dt)
if dt.tzname() == "-00:00":
tstz = attr.evolve(tstz, negative_utc=True)
return tstz
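A small sketch of the ISO8601 entry point (the timestamp value is illustrative):
```python
from swh.model.model import TimestampWithTimezone

tstz = TimestampWithTimezone.from_iso8601("2021-04-08T10:00:00+02:00")
assert tstz.timestamp.seconds == 1617868800  # 2021-04-08T08:00:00 UTC
assert tstz.offset == 120                    # minutes east of UTC
assert tstz.negative_utc is False
```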
@attr.s(frozen=True, slots=True)
class Origin(HashableObject, BaseModel):
"""Represents a software source: a VCS and an URL."""
object_type: Final = "origin"
url = attr.ib(type=str, validator=type_validator())
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"")
def unique_key(self) -> KeyType:
return {"url": self.url}
def compute_hash(self) -> bytes:
return hashlib.sha1(self.url.encode("utf-8")).digest()
def swhid(self) -> ExtendedSWHID:
"""Returns a SWHID representing this origin."""
return ExtendedSWHID(
object_type=SwhidExtendedObjectType.ORIGIN, object_id=self.id,
)
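Origins are identified by the SHA1 of their URL, computed automatically when `id` is left empty; a minimal sketch (the URL is illustrative):
```python
import hashlib

from swh.model.model import Origin

origin = Origin(url="https://example.org/repo.git")
assert origin.id == hashlib.sha1(origin.url.encode("utf-8")).digest()
print(origin.swhid())  # swh:1:ori:<40 hex digits>
```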
@attr.s(frozen=True, slots=True)
class OriginVisit(BaseModel):
"""Represents an origin visit with a given type at a given point in time, by a
SWH loader."""
object_type: Final = "origin_visit"
origin = attr.ib(type=str, validator=type_validator())
date = attr.ib(type=datetime.datetime, validator=type_validator())
type = attr.ib(type=str, validator=type_validator())
"""Should not be set before calling 'origin_visit_add()'."""
visit = attr.ib(type=Optional[int], validator=type_validator(), default=None)
@date.validator
def check_date(self, attribute, value):
"""Checks the date has a timezone."""
if value is not None and value.tzinfo is None:
raise ValueError("date must be a timezone-aware datetime.")
def to_dict(self):
"""Serializes the date as a string and omits the visit id if it is
`None`."""
ov = super().to_dict()
if ov["visit"] is None:
del ov["visit"]
return ov
def unique_key(self) -> KeyType:
return {"origin": self.origin, "date": str(self.date)}
@attr.s(frozen=True, slots=True)
class OriginVisitStatus(BaseModel):
"""Represents a visit update of an origin at a given point in time.
"""
object_type: Final = "origin_visit_status"
origin = attr.ib(type=str, validator=type_validator())
visit = attr.ib(type=int, validator=type_validator())
date = attr.ib(type=datetime.datetime, validator=type_validator())
status = attr.ib(
type=str,
validator=attr.validators.in_(
["created", "ongoing", "full", "partial", "not_found", "failed"]
),
)
- snapshot = attr.ib(type=Optional[Sha1Git], validator=type_validator())
+ snapshot = attr.ib(
+ type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr
+ )
# Type is optional to be able to use it before adding it to the database model
type = attr.ib(type=Optional[str], validator=type_validator(), default=None)
metadata = attr.ib(
type=Optional[ImmutableDict[str, object]],
validator=type_validator(),
converter=freeze_optional_dict,
default=None,
)
@date.validator
def check_date(self, attribute, value):
"""Checks the date has a timezone."""
if value is not None and value.tzinfo is None:
raise ValueError("date must be a timezone-aware datetime.")
def unique_key(self) -> KeyType:
return {"origin": self.origin, "visit": str(self.visit), "date": str(self.date)}
class TargetType(Enum):
"""The type of content pointed to by a snapshot branch. Usually a
revision or an alias."""
CONTENT = "content"
DIRECTORY = "directory"
REVISION = "revision"
RELEASE = "release"
SNAPSHOT = "snapshot"
ALIAS = "alias"
+ def __repr__(self):
+ return f"TargetType.{self.name}"
+
class ObjectType(Enum):
"""The type of content pointed to by a release. Usually a revision"""
CONTENT = "content"
DIRECTORY = "directory"
REVISION = "revision"
RELEASE = "release"
SNAPSHOT = "snapshot"
+ def __repr__(self):
+ return f"ObjectType.{self.name}"
+
@attr.s(frozen=True, slots=True)
class SnapshotBranch(BaseModel):
"""Represents one of the branches of a snapshot."""
object_type: Final = "snapshot_branch"
- target = attr.ib(type=bytes, validator=type_validator())
+ target = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr)
target_type = attr.ib(type=TargetType, validator=type_validator())
@target.validator
def check_target(self, attribute, value):
"""Checks the target type is not an alias, checks the target is a
valid sha1_git."""
if self.target_type != TargetType.ALIAS and self.target is not None:
if len(value) != 20:
raise ValueError("Wrong length for bytes identifier: %d" % len(value))
@classmethod
def from_dict(cls, d):
return cls(target=d["target"], target_type=TargetType(d["target_type"]))
@attr.s(frozen=True, slots=True)
class Snapshot(HashableObject, BaseModel):
"""Represents the full state of an origin at a given point in time."""
object_type: Final = "snapshot"
branches = attr.ib(
type=ImmutableDict[bytes, Optional[SnapshotBranch]],
validator=type_validator(),
converter=freeze_optional_dict,
)
- id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"")
+ id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr)
def compute_hash(self) -> bytes:
git_object = git_objects.snapshot_git_object(self)
return hashlib.new("sha1", git_object).digest()
@classmethod
def from_dict(cls, d):
d = d.copy()
return cls(
branches=ImmutableDict(
(name, SnapshotBranch.from_dict(branch) if branch else None)
for (name, branch) in d.pop("branches").items()
),
**d,
)
def swhid(self) -> CoreSWHID:
"""Returns a SWHID representing this object."""
return CoreSWHID(object_type=SwhidObjectType.SNAPSHOT, object_id=self.id)
@attr.s(frozen=True, slots=True)
class Release(HashableObject, BaseModel):
object_type: Final = "release"
name = attr.ib(type=bytes, validator=type_validator())
message = attr.ib(type=Optional[bytes], validator=type_validator())
- target = attr.ib(type=Optional[Sha1Git], validator=type_validator())
+ target = attr.ib(type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr)
target_type = attr.ib(type=ObjectType, validator=type_validator())
synthetic = attr.ib(type=bool, validator=type_validator())
author = attr.ib(type=Optional[Person], validator=type_validator(), default=None)
date = attr.ib(
type=Optional[TimestampWithTimezone], validator=type_validator(), default=None
)
metadata = attr.ib(
type=Optional[ImmutableDict[str, object]],
validator=type_validator(),
converter=freeze_optional_dict,
default=None,
)
- id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"")
+ id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr)
def compute_hash(self) -> bytes:
git_object = git_objects.release_git_object(self)
return hashlib.new("sha1", git_object).digest()
@author.validator
def check_author(self, attribute, value):
"""If the author is `None`, checks the date is `None` too."""
if self.author is None and self.date is not None:
raise ValueError("release date must be None if author is None.")
def to_dict(self):
rel = super().to_dict()
if rel["metadata"] is None:
del rel["metadata"]
return rel
@classmethod
def from_dict(cls, d):
d = d.copy()
if d.get("author"):
d["author"] = Person.from_dict(d["author"])
if d.get("date"):
d["date"] = TimestampWithTimezone.from_dict(d["date"])
return cls(target_type=ObjectType(d.pop("target_type")), **d)
def swhid(self) -> CoreSWHID:
"""Returns a SWHID representing this object."""
return CoreSWHID(object_type=SwhidObjectType.RELEASE, object_id=self.id)
def anonymize(self) -> "Release":
"""Returns an anonymized version of the Release object.
Anonymization consists in replacing the author with an anonymized Person object.
"""
author = self.author and self.author.anonymize()
return attr.evolve(self, author=author)
class RevisionType(Enum):
GIT = "git"
TAR = "tar"
DSC = "dsc"
SUBVERSION = "svn"
MERCURIAL = "hg"
CVS = "cvs"
BAZAAR = "bzr"
+ def __repr__(self):
+ return f"RevisionType.{self.name}"
+
def tuplify_extra_headers(value: Iterable):
return tuple((k, v) for k, v in value)
@attr.s(frozen=True, slots=True)
class Revision(HashableObject, BaseModel):
object_type: Final = "revision"
message = attr.ib(type=Optional[bytes], validator=type_validator())
author = attr.ib(type=Person, validator=type_validator())
committer = attr.ib(type=Person, validator=type_validator())
date = attr.ib(type=Optional[TimestampWithTimezone], validator=type_validator())
committer_date = attr.ib(
type=Optional[TimestampWithTimezone], validator=type_validator()
)
type = attr.ib(type=RevisionType, validator=type_validator())
- directory = attr.ib(type=Sha1Git, validator=type_validator())
+ directory = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr)
synthetic = attr.ib(type=bool, validator=type_validator())
metadata = attr.ib(
type=Optional[ImmutableDict[str, object]],
validator=type_validator(),
converter=freeze_optional_dict,
default=None,
)
parents = attr.ib(type=Tuple[Sha1Git, ...], validator=type_validator(), default=())
- id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"")
+ id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr)
extra_headers = attr.ib(
type=Tuple[Tuple[bytes, bytes], ...],
validator=type_validator(),
converter=tuplify_extra_headers,
default=(),
)
def __attrs_post_init__(self):
super().__attrs_post_init__()
# ensure metadata is a deep copy of whatever was given, and if needed
# extract extra_headers from there
if self.metadata:
metadata = self.metadata
if not self.extra_headers and "extra_headers" in metadata:
(extra_headers, metadata) = metadata.copy_pop("extra_headers")
object.__setattr__(
self, "extra_headers", tuplify_extra_headers(extra_headers),
)
attr.validate(self)
object.__setattr__(self, "metadata", metadata)
def compute_hash(self) -> bytes:
git_object = git_objects.revision_git_object(self)
return hashlib.new("sha1", git_object).digest()
@classmethod
def from_dict(cls, d):
d = d.copy()
date = d.pop("date")
if date:
date = TimestampWithTimezone.from_dict(date)
committer_date = d.pop("committer_date")
if committer_date:
committer_date = TimestampWithTimezone.from_dict(committer_date)
return cls(
author=Person.from_dict(d.pop("author")),
committer=Person.from_dict(d.pop("committer")),
date=date,
committer_date=committer_date,
type=RevisionType(d.pop("type")),
parents=tuple(d.pop("parents")), # for BW compat
**d,
)
def swhid(self) -> CoreSWHID:
"""Returns a SWHID representing this object."""
return CoreSWHID(object_type=SwhidObjectType.REVISION, object_id=self.id)
def anonymize(self) -> "Revision":
"""Returns an anonymized version of the Revision object.
Anonymization consists in replacing the author and committer with an anonymized
Person object.
"""
return attr.evolve(
self, author=self.author.anonymize(), committer=self.committer.anonymize()
)
@attr.s(frozen=True, slots=True)
class DirectoryEntry(BaseModel):
object_type: Final = "directory_entry"
name = attr.ib(type=bytes, validator=type_validator())
type = attr.ib(type=str, validator=attr.validators.in_(["file", "dir", "rev"]))
- target = attr.ib(type=Sha1Git, validator=type_validator())
- perms = attr.ib(type=int, validator=type_validator())
+ target = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr)
+ perms = attr.ib(type=int, validator=type_validator(), converter=int, repr=oct)
"""Usually one of the values of `swh.model.from_disk.DentryPerms`."""
+ @name.validator
+ def check_name(self, attribute, value):
+ if b"/" in value:
+            raise ValueError(f"{value!r} is not a valid directory entry name.")
+
@attr.s(frozen=True, slots=True)
class Directory(HashableObject, BaseModel):
object_type: Final = "directory"
entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=type_validator())
- id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"")
+ id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr)
def compute_hash(self) -> bytes:
git_object = git_objects.directory_git_object(self)
return hashlib.new("sha1", git_object).digest()
+ @entries.validator
+ def check_entries(self, attribute, value):
+ seen = set()
+ for entry in value:
+ if entry.name in seen:
+ raise ValueError(
+ "{self.swhid()} has duplicated entry name: {entry.name!r}"
+ )
+ seen.add(entry.name)
+
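The two new validators reject slashes in entry names and duplicated names within a directory; a short sketch (hashes and names are illustrative):
```python
from swh.model.hashutil import hash_to_bytes
from swh.model.model import Directory, DirectoryEntry

target = hash_to_bytes("8ff44f081d43176474b267de5451f2c2e88089d0")
entry = DirectoryEntry(name=b"README", type="file", target=target, perms=0o100644)

Directory(entries=(entry,))            # fine
try:
    Directory(entries=(entry, entry))  # duplicated entry name
except ValueError as e:
    print(e)
```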
@classmethod
def from_dict(cls, d):
d = d.copy()
return cls(
entries=tuple(
DirectoryEntry.from_dict(entry) for entry in d.pop("entries")
),
**d,
)
def swhid(self) -> CoreSWHID:
"""Returns a SWHID representing this object."""
return CoreSWHID(object_type=SwhidObjectType.DIRECTORY, object_id=self.id)
@attr.s(frozen=True, slots=True)
class BaseContent(BaseModel):
status = attr.ib(
type=str, validator=attr.validators.in_(["visible", "hidden", "absent"])
)
@staticmethod
def _hash_data(data: bytes):
"""Hash some data, returning most of the fields of a content object"""
d = MultiHash.from_data(data).digest()
d["data"] = data
d["length"] = len(data)
return d
@classmethod
def from_dict(cls, d, use_subclass=True):
if use_subclass:
# Chooses a subclass to instantiate instead.
if d["status"] == "absent":
return SkippedContent.from_dict(d)
else:
return Content.from_dict(d)
else:
return super().from_dict(d)
def get_hash(self, hash_name):
if hash_name not in DEFAULT_ALGORITHMS:
raise ValueError("{} is not a valid hash name.".format(hash_name))
return getattr(self, hash_name)
def hashes(self) -> Dict[str, bytes]:
"""Returns a dictionary {hash_name: hash_value}"""
return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS}
@attr.s(frozen=True, slots=True)
class Content(BaseContent):
object_type: Final = "content"
- sha1 = attr.ib(type=bytes, validator=type_validator())
- sha1_git = attr.ib(type=Sha1Git, validator=type_validator())
- sha256 = attr.ib(type=bytes, validator=type_validator())
- blake2s256 = attr.ib(type=bytes, validator=type_validator())
+ sha1 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr)
+ sha1_git = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr)
+ sha256 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr)
+ blake2s256 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr)
length = attr.ib(type=int, validator=type_validator())
status = attr.ib(
type=str,
validator=attr.validators.in_(["visible", "hidden"]),
default="visible",
)
data = attr.ib(type=Optional[bytes], validator=type_validator(), default=None)
ctime = attr.ib(
type=Optional[datetime.datetime],
validator=type_validator(),
default=None,
eq=False,
)
@length.validator
def check_length(self, attribute, value):
"""Checks the length is positive."""
if value < 0:
raise ValueError("Length must be positive.")
@ctime.validator
def check_ctime(self, attribute, value):
"""Checks the ctime has a timezone."""
if value is not None and value.tzinfo is None:
raise ValueError("ctime must be a timezone-aware datetime.")
def to_dict(self):
content = super().to_dict()
if content["data"] is None:
del content["data"]
if content["ctime"] is None:
del content["ctime"]
return content
@classmethod
def from_data(cls, data, status="visible", ctime=None) -> "Content":
"""Generate a Content from a given `data` byte string.
This populates the Content with the hashes and length for the data
passed as argument, as well as the data itself.
"""
d = cls._hash_data(data)
d["status"] = status
d["ctime"] = ctime
return cls(**d)
@classmethod
def from_dict(cls, d):
if isinstance(d.get("ctime"), str):
d = d.copy()
d["ctime"] = dateutil.parser.parse(d["ctime"])
return super().from_dict(d, use_subclass=False)
def with_data(self) -> "Content":
"""Loads the `data` attribute; meaning that it is guaranteed not to
be None after this call.
This call is almost a no-op, but subclasses may overload this method
to lazy-load data (e.g. from disk or objstorage)."""
if self.data is None:
raise MissingData("Content data is None.")
return self
def unique_key(self) -> KeyType:
return self.sha1 # TODO: use a dict of hashes
def swhid(self) -> CoreSWHID:
"""Returns a SWHID representing this object."""
return CoreSWHID(object_type=SwhidObjectType.CONTENT, object_id=self.sha1_git)
@attr.s(frozen=True, slots=True)
class SkippedContent(BaseContent):
object_type: Final = "skipped_content"
- sha1 = attr.ib(type=Optional[bytes], validator=type_validator())
- sha1_git = attr.ib(type=Optional[Sha1Git], validator=type_validator())
- sha256 = attr.ib(type=Optional[bytes], validator=type_validator())
- blake2s256 = attr.ib(type=Optional[bytes], validator=type_validator())
+ sha1 = attr.ib(type=Optional[bytes], validator=type_validator(), repr=hash_repr)
+ sha1_git = attr.ib(
+ type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr
+ )
+ sha256 = attr.ib(type=Optional[bytes], validator=type_validator(), repr=hash_repr)
+ blake2s256 = attr.ib(
+ type=Optional[bytes], validator=type_validator(), repr=hash_repr
+ )
length = attr.ib(type=Optional[int], validator=type_validator())
status = attr.ib(type=str, validator=attr.validators.in_(["absent"]))
reason = attr.ib(type=Optional[str], validator=type_validator(), default=None)
origin = attr.ib(type=Optional[str], validator=type_validator(), default=None)
ctime = attr.ib(
type=Optional[datetime.datetime],
validator=type_validator(),
default=None,
eq=False,
)
@reason.validator
def check_reason(self, attribute, value):
"""Checks the reason is full if status != absent."""
assert self.reason == value
if value is None:
raise ValueError("Must provide a reason if content is absent.")
@length.validator
def check_length(self, attribute, value):
"""Checks the length is positive or -1."""
if value < -1:
raise ValueError("Length must be positive or -1.")
@ctime.validator
def check_ctime(self, attribute, value):
"""Checks the ctime has a timezone."""
if value is not None and value.tzinfo is None:
raise ValueError("ctime must be a timezone-aware datetime.")
def to_dict(self):
content = super().to_dict()
if content["origin"] is None:
del content["origin"]
if content["ctime"] is None:
del content["ctime"]
return content
@classmethod
def from_data(
cls, data: bytes, reason: str, ctime: Optional[datetime.datetime] = None
) -> "SkippedContent":
"""Generate a SkippedContent from a given `data` byte string.
This populates the SkippedContent with the hashes and length for the
data passed as argument.
You can use `attr.evolve` on such a generated content to nullify some
of its attributes, e.g. for tests.
"""
d = cls._hash_data(data)
del d["data"]
d["status"] = "absent"
d["reason"] = reason
d["ctime"] = ctime
return cls(**d)
@classmethod
def from_dict(cls, d):
d2 = d.copy()
if d2.pop("data", None) is not None:
raise ValueError('SkippedContent has no "data" attribute %r' % d)
return super().from_dict(d2, use_subclass=False)
def unique_key(self) -> KeyType:
return self.hashes()
class MetadataAuthorityType(Enum):
DEPOSIT_CLIENT = "deposit_client"
FORGE = "forge"
REGISTRY = "registry"
+ def __repr__(self):
+ return f"MetadataAuthorityType.{self.name}"
+
@attr.s(frozen=True, slots=True)
class MetadataAuthority(BaseModel):
"""Represents an entity that provides metadata about an origin or
software artifact."""
object_type: Final = "metadata_authority"
type = attr.ib(type=MetadataAuthorityType, validator=type_validator())
url = attr.ib(type=str, validator=type_validator())
metadata = attr.ib(
type=Optional[ImmutableDict[str, Any]],
default=None,
validator=type_validator(),
converter=freeze_optional_dict,
)
def to_dict(self):
d = super().to_dict()
if d["metadata"] is None:
del d["metadata"]
return d
@classmethod
def from_dict(cls, d):
d = {
**d,
"type": MetadataAuthorityType(d["type"]),
}
return super().from_dict(d)
def unique_key(self) -> KeyType:
return {"type": self.type.value, "url": self.url}
@attr.s(frozen=True, slots=True)
class MetadataFetcher(BaseModel):
"""Represents a software component used to fetch metadata from a metadata
authority, and ingest them into the Software Heritage archive."""
object_type: Final = "metadata_fetcher"
name = attr.ib(type=str, validator=type_validator())
version = attr.ib(type=str, validator=type_validator())
metadata = attr.ib(
type=Optional[ImmutableDict[str, Any]],
default=None,
validator=type_validator(),
converter=freeze_optional_dict,
)
def to_dict(self):
d = super().to_dict()
if d["metadata"] is None:
del d["metadata"]
return d
def unique_key(self) -> KeyType:
return {"name": self.name, "version": self.version}
def normalize_discovery_date(value: Any) -> datetime.datetime:
if not isinstance(value, datetime.datetime):
raise TypeError("discovery_date must be a timezone-aware datetime.")
if value.tzinfo is None:
raise ValueError("discovery_date must be a timezone-aware datetime.")
# Normalize timezone to utc, and truncate microseconds to 0
return value.astimezone(datetime.timezone.utc).replace(microsecond=0)
@attr.s(frozen=True, slots=True)
class RawExtrinsicMetadata(HashableObject, BaseModel):
object_type: Final = "raw_extrinsic_metadata"
# target object
target = attr.ib(type=ExtendedSWHID, validator=type_validator())
# source
discovery_date = attr.ib(type=datetime.datetime, converter=normalize_discovery_date)
authority = attr.ib(type=MetadataAuthority, validator=type_validator())
fetcher = attr.ib(type=MetadataFetcher, validator=type_validator())
# the metadata itself
format = attr.ib(type=str, validator=type_validator())
metadata = attr.ib(type=bytes, validator=type_validator())
# context
origin = attr.ib(type=Optional[str], default=None, validator=type_validator())
visit = attr.ib(type=Optional[int], default=None, validator=type_validator())
snapshot = attr.ib(
type=Optional[CoreSWHID], default=None, validator=type_validator()
)
release = attr.ib(
type=Optional[CoreSWHID], default=None, validator=type_validator()
)
revision = attr.ib(
type=Optional[CoreSWHID], default=None, validator=type_validator()
)
path = attr.ib(type=Optional[bytes], default=None, validator=type_validator())
directory = attr.ib(
type=Optional[CoreSWHID], default=None, validator=type_validator()
)
- id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"")
+ id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr)
def compute_hash(self) -> bytes:
git_object = git_objects.raw_extrinsic_metadata_git_object(self)
return hashlib.new("sha1", git_object).digest()
@origin.validator
def check_origin(self, attribute, value):
if value is None:
return
if self.target.object_type not in (
SwhidExtendedObjectType.SNAPSHOT,
SwhidExtendedObjectType.RELEASE,
SwhidExtendedObjectType.REVISION,
SwhidExtendedObjectType.DIRECTORY,
SwhidExtendedObjectType.CONTENT,
):
raise ValueError(
f"Unexpected 'origin' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
if value.startswith("swh:"):
# Technically this is valid; but:
# 1. SWHIDs are URIs, not URLs
# 2. if a SWHID gets here, it's very likely to be a mistake
# (and we can remove this check if it turns out there is a
# legitimate use for it).
raise ValueError(f"SWHID used as context origin URL: {value}")
@visit.validator
def check_visit(self, attribute, value):
if value is None:
return
if self.target.object_type not in (
SwhidExtendedObjectType.SNAPSHOT,
SwhidExtendedObjectType.RELEASE,
SwhidExtendedObjectType.REVISION,
SwhidExtendedObjectType.DIRECTORY,
SwhidExtendedObjectType.CONTENT,
):
raise ValueError(
f"Unexpected 'visit' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
if self.origin is None:
raise ValueError("'origin' context must be set if 'visit' is.")
if value <= 0:
raise ValueError("Nonpositive visit id")
@snapshot.validator
def check_snapshot(self, attribute, value):
if value is None:
return
if self.target.object_type not in (
SwhidExtendedObjectType.RELEASE,
SwhidExtendedObjectType.REVISION,
SwhidExtendedObjectType.DIRECTORY,
SwhidExtendedObjectType.CONTENT,
):
raise ValueError(
f"Unexpected 'snapshot' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
self._check_swhid(SwhidObjectType.SNAPSHOT, value)
@release.validator
def check_release(self, attribute, value):
if value is None:
return
if self.target.object_type not in (
SwhidExtendedObjectType.REVISION,
SwhidExtendedObjectType.DIRECTORY,
SwhidExtendedObjectType.CONTENT,
):
raise ValueError(
f"Unexpected 'release' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
self._check_swhid(SwhidObjectType.RELEASE, value)
@revision.validator
def check_revision(self, attribute, value):
if value is None:
return
if self.target.object_type not in (
SwhidExtendedObjectType.DIRECTORY,
SwhidExtendedObjectType.CONTENT,
):
raise ValueError(
f"Unexpected 'revision' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
self._check_swhid(SwhidObjectType.REVISION, value)
@path.validator
def check_path(self, attribute, value):
if value is None:
return
if self.target.object_type not in (
SwhidExtendedObjectType.DIRECTORY,
SwhidExtendedObjectType.CONTENT,
):
raise ValueError(
f"Unexpected 'path' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
@directory.validator
def check_directory(self, attribute, value):
if value is None:
return
if self.target.object_type not in (SwhidExtendedObjectType.CONTENT,):
raise ValueError(
f"Unexpected 'directory' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
self._check_swhid(SwhidObjectType.DIRECTORY, value)
def _check_swhid(self, expected_object_type, swhid):
if isinstance(swhid, str):
raise ValueError(f"Expected SWHID, got a string: {swhid}")
if swhid.object_type != expected_object_type:
raise ValueError(
f"Expected SWHID type '{expected_object_type.name.lower()}', "
f"got '{swhid.object_type.name.lower()}' in {swhid}"
)
def to_dict(self):
d = super().to_dict()
context_keys = (
"origin",
"visit",
"snapshot",
"release",
"revision",
"directory",
"path",
)
for context_key in context_keys:
if d[context_key] is None:
del d[context_key]
return d
@classmethod
def from_dict(cls, d):
d = {
**d,
"target": ExtendedSWHID.from_string(d["target"]),
"authority": MetadataAuthority.from_dict(d["authority"]),
"fetcher": MetadataFetcher.from_dict(d["fetcher"]),
}
swhid_keys = ("snapshot", "release", "revision", "directory")
for swhid_key in swhid_keys:
if d.get(swhid_key):
d[swhid_key] = CoreSWHID.from_string(d[swhid_key])
return super().from_dict(d)
def swhid(self) -> ExtendedSWHID:
"""Returns a SWHID representing this RawExtrinsicMetadata object."""
return ExtendedSWHID(
object_type=SwhidExtendedObjectType.RAW_EXTRINSIC_METADATA,
object_id=self.id,
)
@attr.s(frozen=True, slots=True)
class ExtID(HashableObject, BaseModel):
object_type: Final = "extid"
extid_type = attr.ib(type=str, validator=type_validator())
extid = attr.ib(type=bytes, validator=type_validator())
target = attr.ib(type=CoreSWHID, validator=type_validator())
extid_version = attr.ib(type=int, validator=type_validator(), default=0)
- id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"")
+ id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr)
@classmethod
def from_dict(cls, d):
return cls(
extid=d["extid"],
extid_type=d["extid_type"],
target=CoreSWHID.from_string(d["target"]),
extid_version=d.get("extid_version", 0),
)
def compute_hash(self) -> bytes:
git_object = git_objects.extid_git_object(self)
return hashlib.new("sha1", git_object).digest()
diff --git a/swh/model/swhids.py b/swh/model/swhids.py
index ee1be20..b1283c1 100644
--- a/swh/model/swhids.py
+++ b/swh/model/swhids.py
@@ -1,457 +1,463 @@
# Copyright (C) 2015-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
Classes to represent :ref:`SWH persistent IDentifiers <persistent-identifiers>`.
:class:`CoreSWHID` represents a SWHID with no qualifier, and :class:`QualifiedSWHID`
represents a SWHID that may have qualifiers.
:class:`ExtendedSWHID` extends the definition of SWHID to other object types,
and is used internally in Software Heritage; it does not support qualifiers.
"""
from __future__ import annotations
import enum
import re
from typing import Any, Dict, Generic, Optional, Tuple, Type, TypeVar, Union
import urllib.parse
import attr
from attrs_strict import type_validator
from .exceptions import ValidationError
from .hashutil import hash_to_bytes, hash_to_hex
class ObjectType(enum.Enum):
"""Possible object types of a QualifiedSWHID or CoreSWHID.
The value of each variant is what is used in the SWHID's string representation."""
SNAPSHOT = "snp"
REVISION = "rev"
RELEASE = "rel"
DIRECTORY = "dir"
CONTENT = "cnt"
class ExtendedObjectType(enum.Enum):
"""Possible object types of an ExtendedSWHID.
The variants are a superset of :class:`ObjectType`'s"""
SNAPSHOT = "snp"
REVISION = "rev"
RELEASE = "rel"
DIRECTORY = "dir"
CONTENT = "cnt"
ORIGIN = "ori"
RAW_EXTRINSIC_METADATA = "emd"
SWHID_NAMESPACE = "swh"
SWHID_VERSION = 1
SWHID_TYPES = ["snp", "rel", "rev", "dir", "cnt"]
EXTENDED_SWHID_TYPES = SWHID_TYPES + ["ori", "emd"]
SWHID_SEP = ":"
SWHID_CTXT_SEP = ";"
SWHID_QUALIFIERS = {"origin", "anchor", "visit", "path", "lines"}
SWHID_RE_RAW = (
f"(?P{SWHID_NAMESPACE})"
f"{SWHID_SEP}(?P{SWHID_VERSION})"
f"{SWHID_SEP}(?P{'|'.join(EXTENDED_SWHID_TYPES)})"
f"{SWHID_SEP}(?P[0-9a-f]{{40}})"
f"({SWHID_CTXT_SEP}(?P\\S+))?"
)
SWHID_RE = re.compile(SWHID_RE_RAW)
# type of the "object_type" attribute of the SWHID class; either
# ObjectType or ExtendedObjectType
_TObjectType = TypeVar("_TObjectType", ObjectType, ExtendedObjectType)
# the SWHID class itself (this is used so that X.from_string() can return X
# for all X subclass of _BaseSWHID)
_TSWHID = TypeVar("_TSWHID", bound="_BaseSWHID")
-@attr.s(frozen=True, kw_only=True)
+@attr.s(frozen=True, kw_only=True, repr=False)
class _BaseSWHID(Generic[_TObjectType]):
"""Common base class for CoreSWHID, QualifiedSWHID, and ExtendedSWHID.
This is an "abstract" class and should not be instantiated directly;
it only exists to deduplicate code between these three SWHID classes."""
namespace = attr.ib(type=str, default=SWHID_NAMESPACE)
"""the namespace of the identifier, defaults to ``swh``"""
scheme_version = attr.ib(type=int, default=SWHID_VERSION)
"""the scheme version of the identifier, defaults to 1"""
# overridden by subclasses
object_type: _TObjectType
"""the type of object the identifier points to"""
object_id = attr.ib(type=bytes, validator=type_validator())
"""object's identifier"""
@namespace.validator
def check_namespace(self, attribute, value):
if value != SWHID_NAMESPACE:
raise ValidationError(
"Invalid SWHID: invalid namespace: %(namespace)s",
params={"namespace": value},
)
@scheme_version.validator
def check_scheme_version(self, attribute, value):
if value != SWHID_VERSION:
raise ValidationError(
"Invalid SWHID: invalid version: %(version)s", params={"version": value}
)
@object_id.validator
def check_object_id(self, attribute, value):
if len(value) != 20:
raise ValidationError(
"Invalid SWHID: invalid checksum: %(object_id)s",
params={"object_id": hash_to_hex(value)},
)
def __str__(self) -> str:
return SWHID_SEP.join(
[
self.namespace,
str(self.scheme_version),
self.object_type.value,
hash_to_hex(self.object_id),
]
)
+ def __repr__(self) -> str:
+ return f"{self.__class__.__name__}.from_string('{self}')"
+
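With ``repr=False`` on the attrs decorators and this ``__repr__``, SWHIDs now print as evaluable expressions; an illustrative sketch:
```python
from swh.model.swhids import CoreSWHID

swhid = CoreSWHID.from_string("swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0")
print(repr(swhid))
# CoreSWHID.from_string('swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0')
assert eval(repr(swhid)) == swhid  # the repr round-trips
```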
@classmethod
def from_string(cls: Type[_TSWHID], s: str) -> _TSWHID:
parts = _parse_swhid(s)
if parts.pop("qualifiers"):
raise ValidationError(f"{cls.__name__} does not support qualifiers.")
try:
return cls(**parts)
except ValueError as e:
raise ValidationError(
"ValueError: %(args)s", params={"args": e.args}
) from None
-@attr.s(frozen=True, kw_only=True)
+@attr.s(frozen=True, kw_only=True, repr=False)
class CoreSWHID(_BaseSWHID[ObjectType]):
"""
Dataclass holding the relevant info associated to a SoftWare Heritage
persistent IDentifier (SWHID).
Unlike `QualifiedSWHID`, it is restricted to core SWHIDs, i.e. SWHIDs
with no qualifiers.
Raises:
swh.model.exceptions.ValidationError: In case of invalid object type or id
To get the raw SWHID string from an instance of this class,
use the :func:`str` function:
>>> swhid = CoreSWHID(
... object_type=ObjectType.CONTENT,
... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'),
... )
>>> str(swhid)
'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0'
And vice-versa with :meth:`CoreSWHID.from_string`:
>>> swhid == CoreSWHID.from_string(
... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0"
... )
True
"""
object_type = attr.ib(
type=ObjectType, validator=type_validator(), converter=ObjectType
)
"""the type of object the identifier points to"""
def to_extended(self) -> ExtendedSWHID:
"""Converts this CoreSWHID into an ExtendedSWHID.
As ExtendedSWHID is a superset of CoreSWHID, this is lossless."""
return ExtendedSWHID(
namespace=self.namespace,
scheme_version=self.scheme_version,
object_type=ExtendedObjectType(self.object_type.value),
object_id=self.object_id,
)
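A minimal sketch of the conversion, assuming the classes above are importable from `swh.model.swhids`; the textual form is unchanged, only the enum type widens:

```python
from swh.model.swhids import CoreSWHID, ExtendedSWHID

core = CoreSWHID.from_string("swh:1:rev:" + "00" * 20)
ext = core.to_extended()
assert isinstance(ext, ExtendedSWHID)
assert str(ext) == str(core)  # same text; only the object_type enum changes
```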
def _parse_core_swhid(swhid: Union[str, CoreSWHID, None]) -> Optional[CoreSWHID]:
if swhid is None or isinstance(swhid, CoreSWHID):
return swhid
else:
return CoreSWHID.from_string(swhid)
def _parse_lines_qualifier(
lines: Union[str, Tuple[int, Optional[int]], None]
) -> Optional[Tuple[int, Optional[int]]]:
try:
if lines is None or isinstance(lines, tuple):
return lines
elif "-" in lines:
(from_, to) = lines.split("-", 2)
return (int(from_), int(to))
else:
return (int(lines), None)
except ValueError:
raise ValidationError(
"Invalid format for the lines qualifier: %(lines)s", params={"lines": lines}
)
def _parse_path_qualifier(path: Union[str, bytes, None]) -> Optional[bytes]:
if path is None or isinstance(path, bytes):
return path
else:
return urllib.parse.unquote_to_bytes(path)
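The two converters above let `QualifiedSWHID` accept either the raw strings parsed out of a SWHID or already-typed values. A quick sketch of their behavior, assuming both helpers are in scope:

```python
assert _parse_lines_qualifier("5-10") == (5, 10)
assert _parse_lines_qualifier("7") == (7, None)
assert _parse_lines_qualifier((5, 10)) == (5, 10)  # already a tuple: passed through
assert _parse_path_qualifier("/foo%20bar") == b"/foo bar"  # percent-decoded to bytes
assert _parse_path_qualifier(b"/raw") == b"/raw"  # already bytes: passed through
```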
-@attr.s(frozen=True, kw_only=True)
+@attr.s(frozen=True, kw_only=True, repr=False)
class QualifiedSWHID(_BaseSWHID[ObjectType]):
"""
Dataclass holding the relevant info associated to a SoftWare Heritage
persistent IDentifier (SWHID), along with optional qualifiers.
Raises:
swh.model.exceptions.ValidationError: In case of invalid object type or id
To get the raw SWHID string from an instance of this class,
use the :func:`str` function:
>>> swhid = QualifiedSWHID(
... object_type=ObjectType.CONTENT,
... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'),
... lines=(5, 10),
... )
>>> str(swhid)
'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0;lines=5-10'
And vice-versa with :meth:`QualifiedSWHID.from_string`:
>>> swhid == QualifiedSWHID.from_string(
... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0;lines=5-10"
... )
True
"""
object_type = attr.ib(
type=ObjectType, validator=type_validator(), converter=ObjectType
)
"""the type of object the identifier points to"""
# qualifiers:
origin = attr.ib(type=Optional[str], default=None, validator=type_validator())
"""the software origin where an object has been found or observed in the wild,
as an URI"""
visit = attr.ib(type=Optional[CoreSWHID], default=None, converter=_parse_core_swhid)
"""the core identifier of a snapshot corresponding to a specific visit
of a repository containing the designated object"""
anchor = attr.ib(
type=Optional[CoreSWHID],
default=None,
validator=type_validator(),
converter=_parse_core_swhid,
)
"""a designated node in the Merkle DAG relative to which a path to the object
is specified, as the core identifier of a directory, a revision, a release,
or a snapshot"""
path = attr.ib(
type=Optional[bytes],
default=None,
validator=type_validator(),
converter=_parse_path_qualifier,
)
"""the absolute file path, from the root directory associated to the anchor node,
to the object; when the anchor denotes a directory or a revision, and almost always
when it’s a release, the root directory is uniquely determined;
when the anchor denotes a snapshot, the root directory is the one pointed to by HEAD
(possibly indirectly), and undefined if such a reference is missing"""
lines = attr.ib(
type=Optional[Tuple[int, Optional[int]]],
default=None,
validator=type_validator(),
converter=_parse_lines_qualifier,
)
"""lines: line number(s) of interest, usually within a content object"""
@visit.validator
def check_visit(self, attribute, value):
if value and value.object_type != ObjectType.SNAPSHOT:
raise ValidationError(
"The 'visit' qualifier must be a 'snp' SWHID, not '%(type)s'",
params={"type": value.object_type.value},
)
@anchor.validator
def check_anchor(self, attribute, value):
if value and value.object_type not in (
ObjectType.DIRECTORY,
ObjectType.REVISION,
ObjectType.RELEASE,
ObjectType.SNAPSHOT,
):
raise ValidationError(
    "The 'anchor' qualifier must be a 'dir', 'rev', 'rel', or 'snp' SWHID, "
    "not '%(type)s'",
    params={"type": value.object_type.value},
)
def qualifiers(self) -> Dict[str, str]:
origin = self.origin
if origin:
unescaped_origin = origin
origin = origin.replace(";", "%3B")
assert urllib.parse.unquote_to_bytes(
origin
) == urllib.parse.unquote_to_bytes(
unescaped_origin
), "Escaping ';' in the origin qualifier corrupted the origin URL."
d: Dict[str, Optional[str]] = {
"origin": origin,
"visit": str(self.visit) if self.visit else None,
"anchor": str(self.anchor) if self.anchor else None,
"path": (
urllib.parse.quote_from_bytes(self.path)
if self.path is not None
else None
),
"lines": (
"-".join(str(line) for line in self.lines if line is not None)
if self.lines
else None
),
}
return {k: v for (k, v) in d.items() if v is not None}
def __str__(self) -> str:
swhid = SWHID_SEP.join(
[
self.namespace,
str(self.scheme_version),
self.object_type.value,
hash_to_hex(self.object_id),
]
)
qualifiers = self.qualifiers()
if qualifiers:
for k, v in qualifiers.items():
swhid += "%s%s=%s" % (SWHID_CTXT_SEP, k, v)
return swhid
+ def __repr__(self) -> str:
+ return super().__repr__()
+
@classmethod
def from_string(cls, s: str) -> QualifiedSWHID:
parts = _parse_swhid(s)
qualifiers = parts.pop("qualifiers")
invalid_qualifiers = set(qualifiers) - SWHID_QUALIFIERS
if invalid_qualifiers:
raise ValidationError(
"Invalid qualifier(s): %(qualifiers)s",
params={"qualifiers": ", ".join(invalid_qualifiers)},
)
try:
return QualifiedSWHID(**parts, **qualifiers)
except ValueError as e:
raise ValidationError(
"ValueError: %(args)s", params={"args": e.args}
) from None
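A minimal sketch tying the pieces together: `from_string()` feeds the raw qualifier strings through the converters above, and `qualifiers()` serializes them back (hash value illustrative):

```python
from swh.model.swhids import QualifiedSWHID

q = QualifiedSWHID.from_string(
    "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0"
    ";origin=https://example.com;lines=5-10"
)
assert q.lines == (5, 10)  # converted by _parse_lines_qualifier
assert q.qualifiers() == {"origin": "https://example.com", "lines": "5-10"}
```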
-@attr.s(frozen=True, kw_only=True)
+@attr.s(frozen=True, kw_only=True, repr=False)
class ExtendedSWHID(_BaseSWHID[ExtendedObjectType]):
"""
Dataclass holding the relevant info associated to a SoftWare Heritage
persistent IDentifier (SWHID).
It extends `CoreSWHID` by allowing non-standard object types, and should
only be used internally to Software Heritage.
Raises:
swh.model.exceptions.ValidationError: In case of invalid object type or id
To get the raw SWHID string from an instance of this class,
use the :func:`str` function:
>>> swhid = ExtendedSWHID(
... object_type=ExtendedObjectType.CONTENT,
... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'),
... )
>>> str(swhid)
'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0'
And vice-versa with :meth:`ExtendedSWHID.from_string`:
>>> swhid == ExtendedSWHID.from_string(
... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0"
... )
True
"""
object_type = attr.ib(
type=ExtendedObjectType,
validator=type_validator(),
converter=ExtendedObjectType,
)
"""the type of object the identifier points to"""
def _parse_swhid(swhid: str) -> Dict[str, Any]:
"""Parse a Software Heritage identifier (SWHID) from string (see:
:ref:`persistent-identifiers`.)
This is for internal use; use :meth:`CoreSWHID.from_string`,
:meth:`QualifiedSWHID.from_string`, or :meth:`ExtendedSWHID.from_string` instead,
as they perform validation and build a dataclass.
Args:
swhid (str): A persistent identifier
Raises:
swh.model.exceptions.ValidationError: if passed string is not a valid SWHID
"""
m = SWHID_RE.fullmatch(swhid)
if not m:
raise ValidationError(
"Invalid SWHID: invalid syntax: %(swhid)s", params={"swhid": swhid}
)
parts: Dict[str, Any] = m.groupdict()
qualifiers_raw = parts["qualifiers"]
parts["qualifiers"] = {}
if qualifiers_raw:
for qualifier in qualifiers_raw.split(SWHID_CTXT_SEP):
try:
k, v = qualifier.split("=", maxsplit=1)
parts["qualifiers"][k] = v
except ValueError:
raise ValidationError(
"Invalid SWHID: invalid qualifier: %(qualifier)s",
params={"qualifier": qualifier},
)
parts["scheme_version"] = int(parts["scheme_version"])
parts["object_id"] = hash_to_bytes(parts["object_id"])
return parts
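A sketch of the plain dict `_parse_swhid()` hands back to the constructors (values illustrative, helper assumed in scope):

```python
parts = _parse_swhid("swh:1:dir:" + "ab" * 20 + ";path=/src")
assert parts == {
    "namespace": "swh",
    "scheme_version": 1,
    "object_type": "dir",
    "object_id": bytes.fromhex("ab" * 20),
    "qualifiers": {"path": "/src"},
}
```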
diff --git a/swh/model/tests/test_hashutil.py b/swh/model/tests/test_hashutil.py
index 59787a2..c864bd8 100644
--- a/swh/model/tests/test_hashutil.py
+++ b/swh/model/tests/test_hashutil.py
@@ -1,350 +1,408 @@
# Copyright (C) 2015-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import contextlib
import hashlib
import io
import os
import tempfile
-import unittest
from unittest.mock import patch
+import pytest
+
from swh.model import hashutil
-from swh.model.hashutil import MultiHash
+from swh.model.hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_bytehex
@contextlib.contextmanager
def patch_blake2(function_name):
try:
with patch(function_name) as mock:
yield mock
finally:
# mocking blake2 inserts mock objects in the cache; we need
# to clean it before the next test runs
hashutil._blake2_hash_cache.clear()
-class BaseHashutil(unittest.TestCase):
- def setUp(self):
- # Reset function cache
- hashutil._blake2_hash_cache = {}
+@pytest.fixture(autouse=True)
+def blake2_hash_cache_reset():
+ # Reset function cache
+ hashutil._blake2_hash_cache = {}
+
- self.data = b"1984\n"
- self.hex_checksums = {
+@pytest.fixture
+def hash_test_data():
+ class HashTestData:
+
+ data = b"1984\n"
+ hex_checksums = {
"sha1": "62be35bf00ff0c624f4a621e2ea5595a049e0731",
"sha1_git": "568aaf43d83b2c3df8067f3bedbb97d83260be6d",
"sha256": "26602113b4b9afd9d55466b08580d3c2"
"4a9b50ee5b5866c0d91fab0e65907311",
"blake2s256": "63cfb259e1fdb485bc5c55749697a6b21ef31fb7445f6c78a"
"c9422f9f2dc8906",
}
- self.checksums = {
- type: bytes.fromhex(cksum) for type, cksum in self.hex_checksums.items()
+ checksums = {
+ type: bytes.fromhex(cksum) for type, cksum in hex_checksums.items()
}
- self.bytehex_checksums = {
- type: hashutil.hash_to_bytehex(cksum)
- for type, cksum in self.checksums.items()
+ bytehex_checksums = {
+ type: hashutil.hash_to_bytehex(cksum) for type, cksum in checksums.items()
}
- self.git_hex_checksums = {
- "blob": self.hex_checksums["sha1_git"],
+ git_hex_checksums = {
+ "blob": hex_checksums["sha1_git"],
"tree": "5b2e883aa33d2efab98442693ea4dd5f1b8871b0",
"commit": "79e4093542e72f0fcb7cbd75cb7d270f9254aa8f",
"tag": "d6bf62466f287b4d986c545890716ce058bddf67",
}
- self.git_checksums = {
- type: bytes.fromhex(cksum) for type, cksum in self.git_hex_checksums.items()
+ git_checksums = {
+ type: bytes.fromhex(cksum) for type, cksum in git_hex_checksums.items()
}
+ return HashTestData
-class MultiHashTest(BaseHashutil):
- def test_multi_hash_data(self):
- checksums = MultiHash.from_data(self.data).digest()
- self.assertEqual(checksums, self.checksums)
- self.assertFalse("length" in checksums)
- def test_multi_hash_data_with_length(self):
- expected_checksums = self.checksums.copy()
- expected_checksums["length"] = len(self.data)
+def test_multi_hash_data(hash_test_data):
+ checksums = MultiHash.from_data(hash_test_data.data).digest()
+ assert checksums == hash_test_data.checksums
+ assert "length" not in checksums
- algos = set(["length"]).union(hashutil.DEFAULT_ALGORITHMS)
- checksums = MultiHash.from_data(self.data, hash_names=algos).digest()
- self.assertEqual(checksums, expected_checksums)
- self.assertTrue("length" in checksums)
+def test_multi_hash_data_with_length(hash_test_data):
+ expected_checksums = hash_test_data.checksums.copy()
+ expected_checksums["length"] = len(hash_test_data.data)
- def test_multi_hash_data_unknown_hash(self):
- with self.assertRaises(ValueError) as cm:
- MultiHash.from_data(self.data, ["unknown-hash"])
+ algos = set(["length"]).union(hashutil.DEFAULT_ALGORITHMS)
+ checksums = MultiHash.from_data(hash_test_data.data, hash_names=algos).digest()
- self.assertIn("Unexpected hashing algorithm", cm.exception.args[0])
- self.assertIn("unknown-hash", cm.exception.args[0])
+ assert checksums == expected_checksums
+ assert "length" in checksums
- def test_multi_hash_file(self):
- fobj = io.BytesIO(self.data)
- checksums = MultiHash.from_file(fobj, length=len(self.data)).digest()
- self.assertEqual(checksums, self.checksums)
+def test_multi_hash_data_unknown_hash(hash_test_data):
+ with pytest.raises(ValueError, match="Unexpected hashing algorithm.*unknown-hash"):
+ MultiHash.from_data(hash_test_data.data, ["unknown-hash"])
- def test_multi_hash_file_hexdigest(self):
- fobj = io.BytesIO(self.data)
- length = len(self.data)
- checksums = MultiHash.from_file(fobj, length=length).hexdigest()
- self.assertEqual(checksums, self.hex_checksums)
- def test_multi_hash_file_bytehexdigest(self):
- fobj = io.BytesIO(self.data)
- length = len(self.data)
- checksums = MultiHash.from_file(fobj, length=length).bytehexdigest()
- self.assertEqual(checksums, self.bytehex_checksums)
+def test_multi_hash_file(hash_test_data):
+ fobj = io.BytesIO(hash_test_data.data)
- def test_multi_hash_file_missing_length(self):
- fobj = io.BytesIO(self.data)
- with self.assertRaises(ValueError) as cm:
- MultiHash.from_file(fobj, hash_names=["sha1_git"])
+ checksums = MultiHash.from_file(fobj, length=len(hash_test_data.data)).digest()
+ assert checksums == hash_test_data.checksums
- self.assertIn("Missing length", cm.exception.args[0])
- def test_multi_hash_path(self):
- with tempfile.NamedTemporaryFile(delete=False) as f:
- f.write(self.data)
+def test_multi_hash_file_hexdigest(hash_test_data):
+ fobj = io.BytesIO(hash_test_data.data)
+ length = len(hash_test_data.data)
+ checksums = MultiHash.from_file(fobj, length=length).hexdigest()
+ assert checksums == hash_test_data.hex_checksums
- hashes = MultiHash.from_path(f.name).digest()
- os.remove(f.name)
- self.assertEqual(self.checksums, hashes)
+def test_multi_hash_file_bytehexdigest(hash_test_data):
+ fobj = io.BytesIO(hash_test_data.data)
+ length = len(hash_test_data.data)
+ checksums = MultiHash.from_file(fobj, length=length).bytehexdigest()
+ assert checksums == hash_test_data.bytehex_checksums
-class Hashutil(BaseHashutil):
- def test_hash_git_data(self):
- checksums = {
- git_type: hashutil.hash_git_data(self.data, git_type)
- for git_type in self.git_checksums
- }
+def test_multi_hash_file_with_md5(hash_test_data):
+ fobj = io.BytesIO(hash_test_data.data)
- self.assertEqual(checksums, self.git_checksums)
-
- def test_hash_git_data_unknown_git_type(self):
- with self.assertRaises(ValueError) as cm:
- hashutil.hash_git_data(self.data, "unknown-git-type")
-
- self.assertIn("Unexpected git object type", cm.exception.args[0])
- self.assertIn("unknown-git-type", cm.exception.args[0])
-
- def test_hash_to_hex(self):
- for type in self.checksums:
- hex = self.hex_checksums[type]
- hash = self.checksums[type]
- self.assertEqual(hashutil.hash_to_hex(hex), hex)
- self.assertEqual(hashutil.hash_to_hex(hash), hex)
-
- def test_hash_to_bytes(self):
- for type in self.checksums:
- hex = self.hex_checksums[type]
- hash = self.checksums[type]
- self.assertEqual(hashutil.hash_to_bytes(hex), hash)
- self.assertEqual(hashutil.hash_to_bytes(hash), hash)
-
- def test_hash_to_bytehex(self):
- for algo in self.checksums:
- self.assertEqual(
- self.hex_checksums[algo].encode("ascii"),
- hashutil.hash_to_bytehex(self.checksums[algo]),
- )
-
- def test_bytehex_to_hash(self):
- for algo in self.checksums:
- self.assertEqual(
- self.checksums[algo],
- hashutil.bytehex_to_hash(self.hex_checksums[algo].encode()),
- )
-
- def test_new_hash_unsupported_hashing_algorithm(self):
- try:
- hashutil._new_hash("blake2:10")
- except ValueError as e:
- self.assertEqual(
- str(e),
- "Unexpected hashing algorithm blake2:10, "
- "expected one of blake2b512, blake2s256, "
- "sha1, sha1_git, sha256",
- )
-
- @patch("hashlib.new")
- def test_new_hash_blake2b_blake2b512_builtin(self, mock_hashlib_new):
- if "blake2b512" not in hashlib.algorithms_available:
- self.skipTest("blake2b512 not built-in")
- mock_hashlib_new.return_value = sentinel = object()
+ checksums = MultiHash.from_file(
+ fobj, hash_names=DEFAULT_ALGORITHMS | {"md5"}, length=len(hash_test_data.data)
+ ).digest()
+ md5sum = {"md5": hashlib.md5(hash_test_data.data).digest()}
+ assert checksums == {**hash_test_data.checksums, **md5sum}
- h = hashutil._new_hash("blake2b512")
- self.assertIs(h, sentinel)
- mock_hashlib_new.assert_called_with("blake2b512")
+def test_multi_hash_file_hexdigest_with_md5(hash_test_data):
+ fobj = io.BytesIO(hash_test_data.data)
+ length = len(hash_test_data.data)
+ checksums = MultiHash.from_file(
+ fobj, hash_names=DEFAULT_ALGORITHMS | {"md5"}, length=length
+ ).hexdigest()
+ md5sum = {"md5": hashlib.md5(hash_test_data.data).hexdigest()}
+ assert checksums == {**hash_test_data.hex_checksums, **md5sum}
- @patch("hashlib.new")
- def test_new_hash_blake2s_blake2s256_builtin(self, mock_hashlib_new):
- if "blake2s256" not in hashlib.algorithms_available:
- self.skipTest("blake2s256 not built-in")
- mock_hashlib_new.return_value = sentinel = object()
- h = hashutil._new_hash("blake2s256")
+def test_multi_hash_file_bytehexdigest_with_md5(hash_test_data):
+ fobj = io.BytesIO(hash_test_data.data)
+ length = len(hash_test_data.data)
+ checksums = MultiHash.from_file(
+ fobj, hash_names=DEFAULT_ALGORITHMS | {"md5"}, length=length
+ ).bytehexdigest()
+ md5sum = {"md5": hash_to_bytehex(hashlib.md5(hash_test_data.data).digest())}
+ assert checksums == {**hash_test_data.bytehex_checksums, **md5sum}
+
+
+def test_multi_hash_file_missing_length(hash_test_data):
+ fobj = io.BytesIO(hash_test_data.data)
+ with pytest.raises(ValueError, match="Missing length"):
+ MultiHash.from_file(fobj, hash_names=["sha1_git"])
+
+
+def test_multi_hash_path(hash_test_data):
+ with tempfile.NamedTemporaryFile(delete=False) as f:
+ f.write(hash_test_data.data)
+
+ hashes = MultiHash.from_path(f.name).digest()
+ os.remove(f.name)
+
+ assert hash_test_data.checksums == hashes
+
+
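The three new `*_with_md5` tests above exercise requesting algorithms beyond the default set. A minimal sketch of that usage, assuming `swh.model.hashutil` is importable:

```python
from swh.model.hashutil import DEFAULT_ALGORITHMS, MultiHash

digests = MultiHash.from_data(
    b"1984\n", hash_names=DEFAULT_ALGORITHMS | {"md5"}
).digest()
assert set(digests) == DEFAULT_ALGORITHMS | {"md5"}
```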
+def test_hash_git_data(hash_test_data):
+ checksums = {
+ git_type: hashutil.hash_git_data(hash_test_data.data, git_type)
+ for git_type in hash_test_data.git_checksums
+ }
+
+ assert checksums == hash_test_data.git_checksums
+
+
+def test_hash_git_data_unknown_git_type(hash_test_data):
+ with pytest.raises(
+ ValueError, match="Unexpected git object type.*unknown-git-type"
+ ):
+ hashutil.hash_git_data(hash_test_data.data, "unknown-git-type")
+
+
+def test_hash_to_hex(hash_test_data):
+ for type in hash_test_data.checksums:
+ hex = hash_test_data.hex_checksums[type]
+ hash = hash_test_data.checksums[type]
+ assert hashutil.hash_to_hex(hex) == hex
+ assert hashutil.hash_to_hex(hash) == hex
+
+
+def test_hash_to_bytes(hash_test_data):
+ for type in hash_test_data.checksums:
+ hex = hash_test_data.hex_checksums[type]
+ hash = hash_test_data.checksums[type]
+ assert hashutil.hash_to_bytes(hex) == hash
+ assert hashutil.hash_to_bytes(hash) == hash
+
+
+def test_hash_to_bytehex(hash_test_data):
+ for algo in hash_test_data.checksums:
+ hex_checksum = hash_test_data.hex_checksums[algo].encode("ascii")
+ assert hex_checksum == hashutil.hash_to_bytehex(hash_test_data.checksums[algo])
+
+
+def test_bytehex_to_hash(hash_test_data):
+ for algo in hash_test_data.checksums:
+ assert hash_test_data.checksums[algo] == hashutil.bytehex_to_hash(
+ hash_test_data.hex_checksums[algo].encode()
+ )
- self.assertIs(h, sentinel)
- mock_hashlib_new.assert_called_with("blake2s256")
- def test_new_hash_blake2b_builtin(self):
- removed_hash = False
+def test_new_hash_unsupported_hashing_algorithm():
+ expected_message = (
+ "Unexpected hashing algorithm blake2:10, "
+ "expected one of blake2b512, blake2s256, "
+ "md5, sha1, sha1_git, sha256"
+ )
+ with pytest.raises(ValueError, match=expected_message):
+ hashutil._new_hash("blake2:10")
- try:
- if "blake2b512" in hashlib.algorithms_available:
- removed_hash = True
- hashlib.algorithms_available.remove("blake2b512")
- if "blake2b" not in hashlib.algorithms_available:
- self.skipTest("blake2b not built in")
- with patch_blake2("hashlib.blake2b") as mock_blake2b:
- mock_blake2b.return_value = sentinel = object()
+@pytest.mark.skipif(
+ "blake2b512" not in hashlib.algorithms_available, reason="blake2b512 not built-in"
+)
+@patch("hashlib.new")
+def test_new_hash_blake2b_blake2b512_builtin(mock_hashlib_new):
+ mock_hashlib_new.return_value = sentinel = object()
- h = hashutil._new_hash("blake2b512")
+ h = hashutil._new_hash("blake2b512")
- self.assertIs(h, sentinel)
- mock_blake2b.assert_called_with(digest_size=512 // 8)
- finally:
- if removed_hash:
- hashlib.algorithms_available.add("blake2b512")
+ assert h is sentinel
+ mock_hashlib_new.assert_called_with("blake2b512")
- def test_new_hash_blake2s_builtin(self):
- removed_hash = False
- try:
- if "blake2s256" in hashlib.algorithms_available:
- removed_hash = True
- hashlib.algorithms_available.remove("blake2s256")
- if "blake2s" not in hashlib.algorithms_available:
- self.skipTest("blake2s not built in")
+@pytest.mark.skipif(
+ "blake2s256" not in hashlib.algorithms_available, reason="blake2s256 not built-in"
+)
+@patch("hashlib.new")
+def test_new_hash_blake2s_blake2s256_builtin(mock_hashlib_new):
+ mock_hashlib_new.return_value = sentinel = object()
- with patch_blake2("hashlib.blake2s") as mock_blake2s:
- mock_blake2s.return_value = sentinel = object()
+ h = hashutil._new_hash("blake2s256")
- h = hashutil._new_hash("blake2s256")
+ assert h is sentinel
+ mock_hashlib_new.assert_called_with("blake2s256")
- self.assertIs(h, sentinel)
- mock_blake2s.assert_called_with(digest_size=256 // 8)
- finally:
- if removed_hash:
- hashlib.algorithms_available.add("blake2s256")
- def test_new_hash_blake2b_pyblake2(self):
+@pytest.mark.skipif(
+ "blake2b" not in hashlib.algorithms_available, reason="blake2b not built-in"
+)
+def test_new_hash_blake2b_builtin():
+ removed_hash = False
+
+ try:
if "blake2b512" in hashlib.algorithms_available:
- self.skipTest("blake2b512 built in")
- if "blake2b" in hashlib.algorithms_available:
- self.skipTest("blake2b built in")
+ removed_hash = True
+ hashlib.algorithms_available.remove("blake2b512")
- with patch_blake2("pyblake2.blake2b") as mock_blake2b:
+ with patch_blake2("hashlib.blake2b") as mock_blake2b:
mock_blake2b.return_value = sentinel = object()
h = hashutil._new_hash("blake2b512")
- self.assertIs(h, sentinel)
+ assert h is sentinel
mock_blake2b.assert_called_with(digest_size=512 // 8)
+ finally:
+ if removed_hash:
+ hashlib.algorithms_available.add("blake2b512")
+
+
+@pytest.mark.skipif(
+ "blake2s" not in hashlib.algorithms_available, reason="blake2s not built-in"
+)
+def test_new_hash_blake2s_builtin():
+ removed_hash = False
- def test_new_hash_blake2s_pyblake2(self):
+ try:
if "blake2s256" in hashlib.algorithms_available:
- self.skipTest("blake2s256 built in")
- if "blake2s" in hashlib.algorithms_available:
- self.skipTest("blake2s built in")
+ removed_hash = True
+ hashlib.algorithms_available.remove("blake2s256")
- with patch_blake2("pyblake2.blake2s") as mock_blake2s:
+ with patch_blake2("hashlib.blake2s") as mock_blake2s:
mock_blake2s.return_value = sentinel = object()
h = hashutil._new_hash("blake2s256")
- self.assertIs(h, sentinel)
+ assert h is sentinel
mock_blake2s.assert_called_with(digest_size=256 // 8)
+ finally:
+ if removed_hash:
+ hashlib.algorithms_available.add("blake2s256")
+
+
+@pytest.mark.skipif(
+ "blake2b512" in hashlib.algorithms_available, reason="blake2b512 built-in"
+)
+@pytest.mark.skipif(
+ "blake2b" in hashlib.algorithms_available, reason="blake2b built-in"
+)
+def test_new_hash_blake2b_pyblake2():
+ with patch_blake2("pyblake2.blake2b") as mock_blake2b:
+ mock_blake2b.return_value = sentinel = object()
+
+ h = hashutil._new_hash("blake2b512")
+ assert h is sentinel
+ mock_blake2b.assert_called_with(digest_size=512 // 8)
-class HashlibGit(unittest.TestCase):
- def setUp(self):
- self.blob_data = b"42\n"
- self.tree_data = b"".join(
+@pytest.mark.skipif(
+ "blake2s256" in hashlib.algorithms_available, reason="blake2s256 built-in"
+)
+@pytest.mark.skipif(
+ "blake2s" in hashlib.algorithms_available, reason="blake2s built-in"
+)
+def test_new_hash_blake2s_pyblake2():
+ with patch_blake2("pyblake2.blake2s") as mock_blake2s:
+ mock_blake2s.return_value = sentinel = object()
+
+ h = hashutil._new_hash("blake2s256")
+
+ assert h is sentinel
+ mock_blake2s.assert_called_with(digest_size=256 // 8)
+
+
+@pytest.fixture
+def hashgit_test_data():
+ class HashGitTestData:
+ blob_data = b"42\n"
+
+ tree_data = b"".join(
[
b"40000 barfoo\0",
bytes.fromhex("c3020f6bf135a38c6df" "3afeb5fb38232c5e07087"),
b"100644 blah\0",
bytes.fromhex("63756ef0df5e4f10b6efa" "33cfe5c758749615f20"),
b"100644 hello\0",
bytes.fromhex("907b308167f0880fb2a" "5c0e1614bb0c7620f9dc3"),
]
)
- self.commit_data = b"""\
+ commit_data = b"""\
tree 1c61f7259dcb770f46b194d941df4f08ff0a3970
author Antoine R. Dumont (@ardumont) <antoine.romain.dumont@gmail.com> 1444054085 +0200
committer Antoine R. Dumont (@ardumont) <antoine.romain.dumont@gmail.com> 1444054085 +0200
initial
""" # noqa
- self.tag_data = """object 24d012aaec0bc5a4d2f62c56399053d6cc72a241
+
+ tag_data = """object 24d012aaec0bc5a4d2f62c56399053d6cc72a241
type commit
tag 0.0.1
tagger Antoine R. Dumont (@ardumont) <antoine.romain.dumont@gmail.com> 1444225145 +0200
blah
""".encode(
"utf-8"
) # NOQA
- self.checksums = {
+ checksums = {
"blob_sha1_git": bytes.fromhex(
"d81cc0710eb6cf9efd5b920a8453e1" "e07157b6cd"
),
"tree_sha1_git": bytes.fromhex(
"ac212302c45eada382b27bfda795db" "121dacdb1c"
),
"commit_sha1_git": bytes.fromhex(
"e960570b2e6e2798fa4cfb9af2c399" "d629189653"
),
"tag_sha1_git": bytes.fromhex(
"bc2b99ba469987bcf1272c189ed534" "e9e959f120"
),
}
- def test_unknown_header_type(self):
- with self.assertRaises(ValueError) as cm:
- hashutil.hash_git_data(b"any-data", "some-unknown-type")
+ return HashGitTestData
+
+
+def test_unknown_header_type():
+ with pytest.raises(ValueError, match="Unexpected git object type"):
+ hashutil.hash_git_data(b"any-data", "some-unknown-type")
+
+
+def test_hashdata_content(hashgit_test_data):
+ # when
+ actual_hash = hashutil.hash_git_data(hashgit_test_data.blob_data, git_type="blob")
+
+ # then
+ assert actual_hash == hashgit_test_data.checksums["blob_sha1_git"]
- self.assertIn("Unexpected git object type", cm.exception.args[0])
- def test_hashdata_content(self):
- # when
- actual_hash = hashutil.hash_git_data(self.blob_data, git_type="blob")
+def test_hashdata_tree(hashgit_test_data):
+ # when
+ actual_hash = hashutil.hash_git_data(hashgit_test_data.tree_data, git_type="tree")
- # then
- self.assertEqual(actual_hash, self.checksums["blob_sha1_git"])
+ # then
+ assert actual_hash == hashgit_test_data.checksums["tree_sha1_git"]
- def test_hashdata_tree(self):
- # when
- actual_hash = hashutil.hash_git_data(self.tree_data, git_type="tree")
- # then
- self.assertEqual(actual_hash, self.checksums["tree_sha1_git"])
+def test_hashdata_revision(hashgit_test_data):
+ # when
+ actual_hash = hashutil.hash_git_data(
+ hashgit_test_data.commit_data, git_type="commit"
+ )
- def test_hashdata_revision(self):
- # when
- actual_hash = hashutil.hash_git_data(self.commit_data, git_type="commit")
+ # then
+ assert actual_hash == hashgit_test_data.checksums["commit_sha1_git"]
- # then
- self.assertEqual(actual_hash, self.checksums["commit_sha1_git"])
- def test_hashdata_tag(self):
- # when
- actual_hash = hashutil.hash_git_data(self.tag_data, git_type="tag")
+def test_hashdata_tag(hashgit_test_data):
+ # when
+ actual_hash = hashutil.hash_git_data(hashgit_test_data.tag_data, git_type="tag")
- # then
- self.assertEqual(actual_hash, self.checksums["tag_sha1_git"])
+ # then
+ assert actual_hash == hashgit_test_data.checksums["tag_sha1_git"]
diff --git a/swh/model/tests/test_model.py b/swh/model/tests/test_model.py
index 781cfa4..47f6d3c 100644
--- a/swh/model/tests/test_model.py
+++ b/swh/model/tests/test_model.py
@@ -1,1090 +1,1317 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import collections
import copy
import datetime
+from typing import Any, List, Optional, Tuple, Union
import attr
from attrs_strict import AttributeTypeError
+import dateutil
from hypothesis import given
from hypothesis.strategies import binary
import pytest
+from swh.model.collections import ImmutableDict
+from swh.model.from_disk import DentryPerms
from swh.model.hashutil import MultiHash, hash_to_bytes
import swh.model.hypothesis_strategies as strategies
+import swh.model.model
from swh.model.model import (
BaseModel,
Content,
Directory,
+ DirectoryEntry,
MetadataAuthority,
MetadataAuthorityType,
MetadataFetcher,
MissingData,
Origin,
OriginVisit,
OriginVisitStatus,
Person,
RawExtrinsicMetadata,
Release,
Revision,
SkippedContent,
Snapshot,
+ TargetType,
Timestamp,
TimestampWithTimezone,
+ type_validator,
)
+import swh.model.swhids
from swh.model.swhids import CoreSWHID, ExtendedSWHID, ObjectType
from swh.model.tests.swh_model_data import TEST_OBJECTS
from swh.model.tests.test_identifiers import (
TS_DATETIMES,
TS_TIMEZONES,
directory_example,
metadata_example,
release_example,
revision_example,
snapshot_example,
)
EXAMPLE_HASH = hash_to_bytes("94a9ed024d3859793618152ea559a168bbcbb5e2")
@given(strategies.objects())
def test_todict_inverse_fromdict(objtype_and_obj):
(obj_type, obj) = objtype_and_obj
if obj_type in ("origin", "origin_visit"):
return
obj_as_dict = obj.to_dict()
obj_as_dict_copy = copy.deepcopy(obj_as_dict)
# Check the composition of to_dict and from_dict is the identity
assert obj == type(obj).from_dict(obj_as_dict)
# Check from_dict() does not change the input dict
assert obj_as_dict == obj_as_dict_copy
# Check the composition of from_dict and to_dict is the identity
assert obj_as_dict == type(obj).from_dict(obj_as_dict).to_dict()
+@given(strategies.objects())
+def test_repr(objtype_and_obj):
+ """Checks every model object has a working repr(), and that it can be eval()uated
+ (so that printed objects can be copy-pasted to write test cases.)"""
+ (obj_type, obj) = objtype_and_obj
+
+ r = repr(obj)
+ env = {
+ "tzutc": lambda: datetime.timezone.utc,
+ "tzfile": dateutil.tz.tzfile,
+ "hash_to_bytes": hash_to_bytes,
+ **swh.model.swhids.__dict__,
+ **swh.model.model.__dict__,
+ }
+ assert eval(r, env) == obj
+
+
+@attr.s
+class Cls1:
+ pass
+
+
+@attr.s
+class Cls2(Cls1):
+ pass
+
+
+_custom_namedtuple = collections.namedtuple("_custom_namedtuple", "a b")
+
+
+class _custom_tuple(tuple):
+ pass
+
+
+# List of (type, valid_values, invalid_values)
+_TYPE_VALIDATOR_PARAMETERS: List[Tuple[Any, List[Any], List[Any]]] = [
+ # base types:
+ (
+ bool,
+ [True, False],
+ [-1, 0, 1, 42, 1000, None, "123", 0.0, (), ("foo",), ImmutableDict()],
+ ),
+ (
+ int,
+ [-1, 0, 1, 42, 1000, DentryPerms.directory, True, False],
+ [None, "123", 0.0, (), ImmutableDict()],
+ ),
+ (
+ float,
+ [-1.0, 0.0, 1.0, float("infinity"), float("NaN")],
+ [True, False, None, 1, "1.2", (), ImmutableDict()],
+ ),
+ (
+ bytes,
+ [b"", b"123"],
+ [None, bytearray(b"\x12\x34"), "123", 0, 123, (), (1, 2, 3), ImmutableDict()],
+ ),
+ (str, ["", "123"], [None, b"123", b"", 0, (), (1, 2, 3), ImmutableDict()]),
+ # unions:
+ (
+ Optional[int],
+ [None, -1, 0, 1, 42, 1000, DentryPerms.directory],
+ ["123", 0.0, (), ImmutableDict()],
+ ),
+ (
+ Optional[bytes],
+ [None, b"", b"123"],
+ ["123", "", 0, (), (1, 2, 3), ImmutableDict()],
+ ),
+ (
+ Union[str, bytes],
+ ["", "123", b"123", b""],
+ [None, 0, (), (1, 2, 3), ImmutableDict()],
+ ),
+ (
+ Union[str, bytes, None],
+ ["", "123", b"123", b"", None],
+ [0, (), (1, 2, 3), ImmutableDict()],
+ ),
+ # tuples
+ (
+ Tuple[str, str],
+ [("foo", "bar"), ("", ""), _custom_namedtuple("", ""), _custom_tuple(("", ""))],
+ [("foo",), ("foo", "bar", "baz"), ("foo", 42), (42, "foo")],
+ ),
+ (
+ Tuple[str, ...],
+ [
+ ("foo",),
+ ("foo", "bar"),
+ ("", ""),
+ ("foo", "bar", "baz"),
+ _custom_namedtuple("", ""),
+ _custom_tuple(("", "")),
+ ],
+ [("foo", 42), (42, "foo")],
+ ),
+ # composite generic:
+ (
+ Tuple[Union[str, int], Union[str, int]],
+ [("foo", "foo"), ("foo", 42), (42, "foo"), (42, 42)],
+ [("foo", b"bar"), (b"bar", "foo")],
+ ),
+ (
+ Union[Tuple[str, str], Tuple[int, int]],
+ [("foo", "foo"), (42, 42)],
+ [("foo", b"bar"), (b"bar", "foo"), ("foo", 42), (42, "foo")],
+ ),
+ (
+ Tuple[Tuple[bytes, bytes], ...],
+ [(), ((b"foo", b"bar"),), ((b"foo", b"bar"), (b"baz", b"qux"))],
+ [((b"foo", "bar"),), ((b"foo", b"bar"), ("baz", b"qux"))],
+ ),
+ # standard types:
+ (
+ datetime.datetime,
+ [datetime.datetime.now(), datetime.datetime.now(tz=datetime.timezone.utc)],
+ [None, 123],
+ ),
+ # ImmutableDict
+ (
+ ImmutableDict[str, int],
+ [
+ ImmutableDict(),
+ ImmutableDict({"foo": 42}),
+ ImmutableDict({"foo": 42, "bar": 123}),
+ ],
+ [ImmutableDict({"foo": "bar"}), ImmutableDict({42: 123})],
+ ),
+ # Any:
+ (object, [-1, 0, 1, 42, 1000, None, "123", 0.0, (), ImmutableDict()], [],),
+ (Any, [-1, 0, 1, 42, 1000, None, "123", 0.0, (), ImmutableDict()], [],),
+ (
+ ImmutableDict[Any, int],
+ [
+ ImmutableDict(),
+ ImmutableDict({"foo": 42}),
+ ImmutableDict({"foo": 42, "bar": 123}),
+ ImmutableDict({42: 123}),
+ ],
+ [ImmutableDict({"foo": "bar"})],
+ ),
+ (
+ ImmutableDict[str, Any],
+ [
+ ImmutableDict(),
+ ImmutableDict({"foo": 42}),
+ ImmutableDict({"foo": "bar"}),
+ ImmutableDict({"foo": 42, "bar": 123}),
+ ],
+ [ImmutableDict({42: 123})],
+ ),
+ # attr objects:
+ (
+ Timestamp,
+ [Timestamp(seconds=123, microseconds=0),],
+ [None, "2021-09-28T11:27:59", 123],
+ ),
+ (Cls1, [Cls1(), Cls2()], [None, b"abcd"],),
+ # enums:
+ (
+ TargetType,
+ [TargetType.CONTENT, TargetType.ALIAS],
+ ["content", "alias", 123, None],
+ ),
+]
+
+
+@pytest.mark.parametrize(
+ "type_,value",
+ [
+ pytest.param(type_, value, id=f"type={type_}, value={value}")
+ for (type_, values, _) in _TYPE_VALIDATOR_PARAMETERS
+ for value in values
+ ],
+)
+def test_type_validator_valid(type_, value):
+ type_validator()(None, attr.ib(type=type_), value)
+
+
+@pytest.mark.parametrize(
+ "type_,value",
+ [
+ pytest.param(type_, value, id=f"type={type_}, value={value}")
+ for (type_, _, values) in _TYPE_VALIDATOR_PARAMETERS
+ for value in values
+ ],
+)
+def test_type_validator_invalid(type_, value):
+ with pytest.raises(AttributeTypeError):
+ type_validator()(None, attr.ib(type=type_), value)
+
+
@pytest.mark.parametrize("object_type, objects", TEST_OBJECTS.items())
def test_swh_model_todict_fromdict(object_type, objects):
"""checks model objects in swh_model_data are in correct shape"""
assert objects
for obj in objects:
# Check the composition of from_dict and to_dict is the identity
obj_as_dict = obj.to_dict()
assert obj == type(obj).from_dict(obj_as_dict)
assert obj_as_dict == type(obj).from_dict(obj_as_dict).to_dict()
def test_unique_key():
url = "http://example.org/"
date = datetime.datetime.now(tz=datetime.timezone.utc)
id_ = b"42" * 10
assert Origin(url=url).unique_key() == {"url": url}
assert OriginVisit(origin=url, date=date, type="git").unique_key() == {
"origin": url,
"date": str(date),
}
assert OriginVisitStatus(
origin=url, visit=42, date=date, status="created", snapshot=None
).unique_key() == {"origin": url, "visit": "42", "date": str(date),}
assert Snapshot.from_dict({**snapshot_example, "id": id_}).unique_key() == id_
assert Release.from_dict({**release_example, "id": id_}).unique_key() == id_
assert Revision.from_dict({**revision_example, "id": id_}).unique_key() == id_
assert Directory.from_dict({**directory_example, "id": id_}).unique_key() == id_
assert (
RawExtrinsicMetadata.from_dict({**metadata_example, "id": id_}).unique_key()
== id_
)
cont = Content.from_data(b"foo")
assert cont.unique_key().hex() == "0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"
kwargs = {
**cont.to_dict(),
"reason": "foo",
"status": "absent",
}
del kwargs["data"]
assert SkippedContent(**kwargs).unique_key() == cont.hashes()
# Anonymization
@given(strategies.objects())
def test_anonymization(objtype_and_obj):
(obj_type, obj) = objtype_and_obj
def check_person(p):
if p is not None:
assert p.name is None
assert p.email is None
assert len(p.fullname) == 32
anon_obj = obj.anonymize()
if obj_type == "person":
assert anon_obj is not None
check_person(anon_obj)
elif obj_type == "release":
assert anon_obj is not None
check_person(anon_obj.author)
elif obj_type == "revision":
assert anon_obj is not None
check_person(anon_obj.author)
check_person(anon_obj.committer)
else:
assert anon_obj is None
# Origin, OriginVisit, OriginVisitStatus
@given(strategies.origins())
def test_todict_origins(origin):
obj = origin.to_dict()
assert "type" not in obj
assert type(origin)(url=origin.url) == type(origin).from_dict(obj)
@given(strategies.origin_visits())
def test_todict_origin_visits(origin_visit):
obj = origin_visit.to_dict()
assert origin_visit == type(origin_visit).from_dict(obj)
def test_origin_visit_naive_datetime():
with pytest.raises(ValueError, match="must be a timezone-aware datetime"):
OriginVisit(
origin="http://foo/", date=datetime.datetime.now(), type="git",
)
@given(strategies.origin_visit_statuses())
def test_todict_origin_visit_statuses(origin_visit_status):
obj = origin_visit_status.to_dict()
assert origin_visit_status == type(origin_visit_status).from_dict(obj)
def test_origin_visit_status_naive_datetime():
with pytest.raises(ValueError, match="must be a timezone-aware datetime"):
OriginVisitStatus(
origin="http://foo/",
visit=42,
date=datetime.datetime.now(),
status="ongoing",
snapshot=None,
)
# Timestamp
@given(strategies.timestamps())
def test_timestamps_strategy(timestamp):
attr.validate(timestamp)
def test_timestamp_seconds():
attr.validate(Timestamp(seconds=0, microseconds=0))
with pytest.raises(AttributeTypeError):
Timestamp(seconds="0", microseconds=0)
attr.validate(Timestamp(seconds=2 ** 63 - 1, microseconds=0))
with pytest.raises(ValueError):
Timestamp(seconds=2 ** 63, microseconds=0)
attr.validate(Timestamp(seconds=-(2 ** 63), microseconds=0))
with pytest.raises(ValueError):
Timestamp(seconds=-(2 ** 63) - 1, microseconds=0)
def test_timestamp_microseconds():
attr.validate(Timestamp(seconds=0, microseconds=0))
with pytest.raises(AttributeTypeError):
Timestamp(seconds=0, microseconds="0")
attr.validate(Timestamp(seconds=0, microseconds=10 ** 6 - 1))
with pytest.raises(ValueError):
Timestamp(seconds=0, microseconds=10 ** 6)
with pytest.raises(ValueError):
Timestamp(seconds=0, microseconds=-1)
def test_timestamp_from_dict():
assert Timestamp.from_dict({"seconds": 10, "microseconds": 5})
with pytest.raises(AttributeTypeError):
Timestamp.from_dict({"seconds": "10", "microseconds": 5})
with pytest.raises(AttributeTypeError):
Timestamp.from_dict({"seconds": 10, "microseconds": "5"})
with pytest.raises(ValueError):
Timestamp.from_dict({"seconds": 0, "microseconds": -1})
Timestamp.from_dict({"seconds": 0, "microseconds": 10 ** 6 - 1})
with pytest.raises(ValueError):
Timestamp.from_dict({"seconds": 0, "microseconds": 10 ** 6})
# TimestampWithTimezone
def test_timestampwithtimezone():
ts = Timestamp(seconds=0, microseconds=0)
tstz = TimestampWithTimezone(timestamp=ts, offset=0, negative_utc=False)
attr.validate(tstz)
assert tstz.negative_utc is False
attr.validate(TimestampWithTimezone(timestamp=ts, offset=10, negative_utc=False))
attr.validate(TimestampWithTimezone(timestamp=ts, offset=-10, negative_utc=False))
tstz = TimestampWithTimezone(timestamp=ts, offset=0, negative_utc=True)
attr.validate(tstz)
assert tstz.negative_utc is True
with pytest.raises(AttributeTypeError):
TimestampWithTimezone(
timestamp=datetime.datetime.now(), offset=0, negative_utc=False
)
with pytest.raises(AttributeTypeError):
TimestampWithTimezone(timestamp=ts, offset="0", negative_utc=False)
with pytest.raises(AttributeTypeError):
TimestampWithTimezone(timestamp=ts, offset=1.0, negative_utc=False)
with pytest.raises(AttributeTypeError):
TimestampWithTimezone(timestamp=ts, offset=1, negative_utc=0)
with pytest.raises(ValueError):
TimestampWithTimezone(timestamp=ts, offset=1, negative_utc=True)
with pytest.raises(ValueError):
TimestampWithTimezone(timestamp=ts, offset=-1, negative_utc=True)
def test_timestampwithtimezone_from_datetime():
tz = datetime.timezone(datetime.timedelta(minutes=+60))
date = datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=tz)
tstz = TimestampWithTimezone.from_datetime(date)
assert tstz == TimestampWithTimezone(
timestamp=Timestamp(seconds=1582810759, microseconds=0,),
offset=60,
negative_utc=False,
)
def test_timestampwithtimezone_from_naive_datetime():
date = datetime.datetime(2020, 2, 27, 14, 39, 19)
with pytest.raises(ValueError, match="datetime without timezone"):
TimestampWithTimezone.from_datetime(date)
def test_timestampwithtimezone_from_iso8601():
date = "2020-02-27 14:39:19.123456+0100"
tstz = TimestampWithTimezone.from_iso8601(date)
assert tstz == TimestampWithTimezone(
timestamp=Timestamp(seconds=1582810759, microseconds=123456,),
offset=60,
negative_utc=False,
)
def test_timestampwithtimezone_from_iso8601_negative_utc():
date = "2020-02-27 13:39:19-0000"
tstz = TimestampWithTimezone.from_iso8601(date)
assert tstz == TimestampWithTimezone(
timestamp=Timestamp(seconds=1582810759, microseconds=0,),
offset=0,
negative_utc=True,
)
@pytest.mark.parametrize("date", TS_DATETIMES)
@pytest.mark.parametrize("tz", TS_TIMEZONES)
@pytest.mark.parametrize("microsecond", [0, 1, 10, 100, 1000, 999999])
def test_timestampwithtimezone_to_datetime(date, tz, microsecond):
date = date.replace(tzinfo=tz, microsecond=microsecond)
tstz = TimestampWithTimezone.from_datetime(date)
assert tstz.to_datetime() == date
assert tstz.to_datetime().utcoffset() == date.utcoffset()
def test_person_from_fullname():
"""The author should have name, email and fullname filled.
"""
actual_person = Person.from_fullname(b"tony <ynot@dagobah>")
assert actual_person == Person(
    fullname=b"tony <ynot@dagobah>", name=b"tony", email=b"ynot@dagobah",
)
def test_person_from_fullname_no_email():
"""The author and fullname should be the same as the input (author).
"""
actual_person = Person.from_fullname(b"tony")
assert actual_person == Person(fullname=b"tony", name=b"tony", email=None,)
def test_person_from_fullname_empty_person():
"""Empty person has only its fullname filled with the empty
byte-string.
"""
actual_person = Person.from_fullname(b"")
assert actual_person == Person(fullname=b"", name=None, email=None,)
def test_git_author_line_to_author():
# edge case out of the way
with pytest.raises(TypeError):
Person.from_fullname(None)
tests = {
b"a ": Person(name=b"a", email=b"b@c.com", fullname=b"a ",),
b"": Person(
name=None, email=b"foo@bar.com", fullname=b"",
),
b"malformed ': Person(
name=b"malformed",
email=b'"
',
),
b"trailing ": Person(
name=b"trailing", email=b"sp@c.e", fullname=b"trailing ",
),
b"no": Person(name=b"no", email=b"sp@c.e", fullname=b"no",),
b" more ": Person(
name=b"more", email=b"sp@c.es", fullname=b" more ",
),
b" <>": Person(name=None, email=None, fullname=b" <>",),
}
for person in sorted(tests):
expected_person = tests[person]
assert expected_person == Person.from_fullname(person)
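The table above pins down the parsing convention, roughly: everything before the first `<` is the (stripped) name, the text between that bracket and the last `>` is the email, and the fullname always keeps the input verbatim. A minimal sketch with a hypothetical author line:

```python
from swh.model.model import Person

# "Ada L. <ada@example.org>" is a made-up author line, not from the test data.
p = Person.from_fullname(b"Ada L. <ada@example.org>")
assert (p.name, p.email, p.fullname) == (
    b"Ada L.",
    b"ada@example.org",
    b"Ada L. <ada@example.org>",
)
```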
# Content
def test_content_get_hash():
hashes = dict(sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux")
c = Content(length=42, status="visible", **hashes)
for (hash_name, hash_) in hashes.items():
assert c.get_hash(hash_name) == hash_
def test_content_hashes():
hashes = dict(sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux")
c = Content(length=42, status="visible", **hashes)
assert c.hashes() == hashes
def test_content_data():
c = Content(
length=42,
status="visible",
data=b"foo",
sha1=b"foo",
sha1_git=b"bar",
sha256=b"baz",
blake2s256=b"qux",
)
assert c.with_data() == c
def test_content_data_missing():
c = Content(
length=42,
status="visible",
sha1=b"foo",
sha1_git=b"bar",
sha256=b"baz",
blake2s256=b"qux",
)
with pytest.raises(MissingData):
c.with_data()
@given(strategies.present_contents_d())
def test_content_from_dict(content_d):
c = Content.from_data(**content_d)
assert c
assert c.ctime == content_d["ctime"]
content_d2 = c.to_dict()
c2 = Content.from_dict(content_d2)
assert c2.ctime == c.ctime
def test_content_from_dict_str_ctime():
# test with ctime as a string
n = datetime.datetime(2020, 5, 6, 12, 34, tzinfo=datetime.timezone.utc)
content_d = {
"ctime": n.isoformat(),
"data": b"",
"length": 0,
"sha1": b"\x00",
"sha256": b"\x00",
"sha1_git": b"\x00",
"blake2s256": b"\x00",
}
c = Content.from_dict(content_d)
assert c.ctime == n
def test_content_from_dict_str_naive_ctime():
# test with ctime as a string
n = datetime.datetime(2020, 5, 6, 12, 34)
content_d = {
"ctime": n.isoformat(),
"data": b"",
"length": 0,
"sha1": b"\x00",
"sha256": b"\x00",
"sha1_git": b"\x00",
"blake2s256": b"\x00",
}
with pytest.raises(ValueError, match="must be a timezone-aware datetime."):
Content.from_dict(content_d)
@given(binary(max_size=4096))
def test_content_from_data(data):
c = Content.from_data(data)
assert c.data == data
assert c.length == len(data)
assert c.status == "visible"
for key, value in MultiHash.from_data(data).digest().items():
assert getattr(c, key) == value
@given(binary(max_size=4096))
def test_hidden_content_from_data(data):
c = Content.from_data(data, status="hidden")
assert c.data == data
assert c.length == len(data)
assert c.status == "hidden"
for key, value in MultiHash.from_data(data).digest().items():
assert getattr(c, key) == value
def test_content_naive_datetime():
c = Content.from_data(b"foo")
with pytest.raises(ValueError, match="must be a timezone-aware datetime"):
Content(
**c.to_dict(), ctime=datetime.datetime.now(),
)
# SkippedContent
@given(binary(max_size=4096))
def test_skipped_content_from_data(data):
c = SkippedContent.from_data(data, reason="reason")
assert c.reason == "reason"
assert c.length == len(data)
assert c.status == "absent"
for key, value in MultiHash.from_data(data).digest().items():
assert getattr(c, key) == value
@given(strategies.skipped_contents_d())
def test_skipped_content_origin_is_str(skipped_content_d):
assert SkippedContent.from_dict(skipped_content_d)
skipped_content_d["origin"] = "http://path/to/origin"
assert SkippedContent.from_dict(skipped_content_d)
skipped_content_d["origin"] = Origin(url="http://path/to/origin")
with pytest.raises(ValueError, match="origin"):
SkippedContent.from_dict(skipped_content_d)
def test_skipped_content_naive_datetime():
c = SkippedContent.from_data(b"foo", reason="reason")
with pytest.raises(ValueError, match="must be a timezone-aware datetime"):
SkippedContent(
**c.to_dict(), ctime=datetime.datetime.now(),
)
+# Directory
+
+
+def test_directory_entry_name_validation():
+ with pytest.raises(ValueError, match="valid directory entry name."):
+ DirectoryEntry(name=b"foo/", type="dir", target=b"\x00" * 20, perms=0),
+
+
+def test_directory_duplicate_entry_name():
+ entries = (
+ DirectoryEntry(name=b"foo", type="file", target=b"\x00" * 20, perms=0),
+ DirectoryEntry(name=b"foo", type="dir", target=b"\x01" * 20, perms=1),
+ )
+ with pytest.raises(ValueError, match="duplicated entry name"):
+ Directory(entries=entries)
+
+ entries = (
+ DirectoryEntry(name=b"foo", type="file", target=b"\x00" * 20, perms=0),
+ DirectoryEntry(name=b"foo", type="file", target=b"\x00" * 20, perms=0),
+ )
+ with pytest.raises(ValueError, match="duplicated entry name"):
+ Directory(entries=entries)
+
+
# Revision
def test_revision_extra_headers_no_headers():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev = Revision.from_dict(rev_dict)
rev_dict = attr.asdict(rev, recurse=False)
rev_model = Revision(**rev_dict)
assert rev_model.metadata is None
assert rev_model.extra_headers == ()
rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
rev_model = Revision(**rev_dict)
assert rev_model.metadata == rev_dict["metadata"]
assert rev_model.extra_headers == ()
def test_revision_extra_headers_with_headers():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev = Revision.from_dict(rev_dict)
rev_dict = attr.asdict(rev, recurse=False)
rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
extra_headers = (
(b"header1", b"value1"),
(b"header2", b"42"),
(b"header3", b"should I?\x00"),
(b"header1", b"again"),
)
rev_dict["extra_headers"] = extra_headers
rev_model = Revision(**rev_dict)
assert "extra_headers" not in rev_model.metadata
assert rev_model.extra_headers == extra_headers
def test_revision_extra_headers_in_metadata():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev = Revision.from_dict(rev_dict)
rev_dict = attr.asdict(rev, recurse=False)
rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
extra_headers = (
(b"header1", b"value1"),
(b"header2", b"42"),
(b"header3", b"should I?\x00"),
(b"header1", b"again"),
)
# check the bw-compat init hook does the job
# ie. extra_headers are given in the metadata field
rev_dict["metadata"]["extra_headers"] = extra_headers
rev_model = Revision(**rev_dict)
assert "extra_headers" not in rev_model.metadata
assert rev_model.extra_headers == extra_headers
def test_revision_extra_headers_as_lists():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev = Revision.from_dict(rev_dict)
rev_dict = attr.asdict(rev, recurse=False)
rev_dict["metadata"] = {}
extra_headers = (
(b"header1", b"value1"),
(b"header2", b"42"),
(b"header3", b"should I?\x00"),
(b"header1", b"again"),
)
# check Revision.extra_headers tuplify does the job
rev_dict["extra_headers"] = [list(x) for x in extra_headers]
rev_model = Revision(**rev_dict)
assert "extra_headers" not in rev_model.metadata
assert rev_model.extra_headers == extra_headers
def test_revision_extra_headers_type_error():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev = Revision.from_dict(rev_dict)
orig_rev_dict = attr.asdict(rev, recurse=False)
orig_rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
extra_headers = (
("header1", b"value1"),
(b"header2", 42),
("header1", "again"),
)
# check headers one at a time
# if given as extra_header
for extra_header in extra_headers:
rev_dict = copy.deepcopy(orig_rev_dict)
rev_dict["extra_headers"] = (extra_header,)
with pytest.raises(AttributeTypeError):
Revision(**rev_dict)
# if given as metadata
for extra_header in extra_headers:
rev_dict = copy.deepcopy(orig_rev_dict)
rev_dict["metadata"]["extra_headers"] = (extra_header,)
with pytest.raises(AttributeTypeError):
Revision(**rev_dict)
def test_revision_extra_headers_from_dict():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev_model = Revision.from_dict(rev_dict)
assert rev_model.metadata is None
assert rev_model.extra_headers == ()
rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
rev_model = Revision.from_dict(rev_dict)
assert rev_model.metadata == rev_dict["metadata"]
assert rev_model.extra_headers == ()
extra_headers = (
(b"header1", b"value1"),
(b"header2", b"42"),
(b"header3", b"should I?\nmaybe\x00\xff"),
(b"header1", b"again"),
)
rev_dict["extra_headers"] = extra_headers
rev_model = Revision.from_dict(rev_dict)
assert "extra_headers" not in rev_model.metadata
assert rev_model.extra_headers == extra_headers
def test_revision_extra_headers_in_metadata_from_dict():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
extra_headers = (
(b"header1", b"value1"),
(b"header2", b"42"),
(b"header3", b"should I?\nmaybe\x00\xff"),
(b"header1", b"again"),
)
# check the bw-compat init hook does the job
rev_dict["metadata"]["extra_headers"] = extra_headers
rev_model = Revision.from_dict(rev_dict)
assert "extra_headers" not in rev_model.metadata
assert rev_model.extra_headers == extra_headers
def test_revision_extra_headers_as_lists_from_dict():
rev_dict = revision_example.copy()
rev_dict.pop("id")
rev_model = Revision.from_dict(rev_dict)
rev_dict["metadata"] = {
"something": "somewhere",
"some other thing": "stranger",
}
extra_headers = (
(b"header1", b"value1"),
(b"header2", b"42"),
(b"header3", b"should I?\nmaybe\x00\xff"),
(b"header1", b"again"),
)
# check Revision.extra_headers converter does the job
rev_dict["extra_headers"] = [list(x) for x in extra_headers]
rev_model = Revision.from_dict(rev_dict)
assert "extra_headers" not in rev_model.metadata
assert rev_model.extra_headers == extra_headers
@given(strategies.objects(split_content=True))
def test_object_type(objtype_and_obj):
obj_type, obj = objtype_and_obj
assert obj_type == obj.object_type
def test_object_type_is_final():
object_types = set()
def check_final(cls):
if hasattr(cls, "object_type"):
assert cls.object_type not in object_types
object_types.add(cls.object_type)
if cls.__subclasses__():
assert not hasattr(cls, "object_type")
for subcls in cls.__subclasses__():
check_final(subcls)
check_final(BaseModel)
_metadata_authority = MetadataAuthority(
type=MetadataAuthorityType.FORGE, url="https://forge.softwareheritage.org",
)
_metadata_fetcher = MetadataFetcher(name="test-fetcher", version="0.0.1",)
_content_swhid = ExtendedSWHID.from_string(
"swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2"
)
_origin_url = "https://forge.softwareheritage.org/source/swh-model.git"
_origin_swhid = ExtendedSWHID.from_string(
"swh:1:ori:94a9ed024d3859793618152ea559a168bbcbb5e2"
)
_dummy_qualifiers = {"origin": "https://example.com", "lines": "42"}
_common_metadata_fields = dict(
discovery_date=datetime.datetime(
2021, 1, 29, 13, 57, 9, tzinfo=datetime.timezone.utc
),
authority=_metadata_authority,
fetcher=_metadata_fetcher,
format="json",
metadata=b'{"origin": "https://example.com", "lines": "42"}',
)
def test_metadata_valid():
"""Checks valid RawExtrinsicMetadata objects don't raise an error."""
# Simplest case
RawExtrinsicMetadata(target=_origin_swhid, **_common_metadata_fields)
# Object with an SWHID
RawExtrinsicMetadata(
target=_content_swhid, **_common_metadata_fields,
)
def test_metadata_to_dict():
"""Checks valid RawExtrinsicMetadata objects don't raise an error."""
common_fields = {
"authority": {"type": "forge", "url": "https://forge.softwareheritage.org"},
"fetcher": {"name": "test-fetcher", "version": "0.0.1",},
"discovery_date": _common_metadata_fields["discovery_date"],
"format": "json",
"metadata": b'{"origin": "https://example.com", "lines": "42"}',
}
m = RawExtrinsicMetadata(target=_origin_swhid, **_common_metadata_fields,)
assert m.to_dict() == {
"target": str(_origin_swhid),
"id": b"@j\xc9\x01\xbc\x1e#p*\xf3q9\xa7u\x97\x00\x14\x02xa",
**common_fields,
}
assert RawExtrinsicMetadata.from_dict(m.to_dict()) == m
m = RawExtrinsicMetadata(target=_content_swhid, **_common_metadata_fields,)
assert m.to_dict() == {
"target": "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",
"id": b"\xbc\xa3U\xddf\x19U\xc5\xd2\xd7\xdfK\xd7c\x1f\xa8\xfeh\x992",
**common_fields,
}
assert RawExtrinsicMetadata.from_dict(m.to_dict()) == m
hash_hex = "6162" * 10
hash_bin = b"ab" * 10
m = RawExtrinsicMetadata(
target=_content_swhid,
**_common_metadata_fields,
origin="https://example.org/",
snapshot=CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=hash_bin),
release=CoreSWHID(object_type=ObjectType.RELEASE, object_id=hash_bin),
revision=CoreSWHID(object_type=ObjectType.REVISION, object_id=hash_bin),
path=b"/foo/bar",
directory=CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=hash_bin),
)
assert m.to_dict() == {
"target": "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",
"id": b"\x14l\xb0\x1f\xb9\xc0{)\xc7\x0f\xbd\xc0*,YZ\xf5C\xab\xfc",
**common_fields,
"origin": "https://example.org/",
"snapshot": f"swh:1:snp:{hash_hex}",
"release": f"swh:1:rel:{hash_hex}",
"revision": f"swh:1:rev:{hash_hex}",
"path": b"/foo/bar",
"directory": f"swh:1:dir:{hash_hex}",
}
assert RawExtrinsicMetadata.from_dict(m.to_dict()) == m
def test_metadata_invalid_target():
"""Checks various invalid values for the 'target' field."""
# SWHID passed as string instead of SWHID
with pytest.raises(ValueError, match="target must be.*ExtendedSWHID"):
RawExtrinsicMetadata(
target="swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",
**_common_metadata_fields,
)
def test_metadata_naive_datetime():
with pytest.raises(ValueError, match="must be a timezone-aware datetime"):
RawExtrinsicMetadata(
target=_origin_swhid,
**{**_common_metadata_fields, "discovery_date": datetime.datetime.now()},
)
def test_metadata_validate_context_origin():
"""Checks validation of RawExtrinsicMetadata.origin."""
# Origins can't have an 'origin' context
with pytest.raises(
ValueError, match="Unexpected 'origin' context for origin object"
):
RawExtrinsicMetadata(
target=_origin_swhid, origin=_origin_url, **_common_metadata_fields,
)
# but all other types can
RawExtrinsicMetadata(
target=_content_swhid, origin=_origin_url, **_common_metadata_fields,
)
# SWHIDs aren't valid origin URLs
with pytest.raises(ValueError, match="SWHID used as context origin URL"):
RawExtrinsicMetadata(
target=_content_swhid,
origin="swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",
**_common_metadata_fields,
)
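

# The context tests below follow a common pattern: each context key is
# rejected on origin targets, accepted on content targets, and, when it
# holds a SWHID, checked against the expected SWHID object type.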


def test_metadata_validate_context_visit():
"""Checks validation of RawExtrinsicMetadata.visit."""
# Origins can't have a 'visit' context
with pytest.raises(
ValueError, match="Unexpected 'visit' context for origin object"
):
RawExtrinsicMetadata(
target=_origin_swhid, visit=42, **_common_metadata_fields,
)
# but all other types can
RawExtrinsicMetadata(
target=_content_swhid, origin=_origin_url, visit=42, **_common_metadata_fields,
)
# Missing 'origin'
with pytest.raises(ValueError, match="'origin' context must be set if 'visit' is"):
RawExtrinsicMetadata(
target=_content_swhid, visit=42, **_common_metadata_fields,
)
# visit id must be positive
with pytest.raises(ValueError, match="Nonpositive visit id"):
RawExtrinsicMetadata(
target=_content_swhid,
origin=_origin_url,
visit=-42,
**_common_metadata_fields,
)


def test_metadata_validate_context_snapshot():
"""Checks validation of RawExtrinsicMetadata.snapshot."""
# Origins can't have a 'snapshot' context
with pytest.raises(
ValueError, match="Unexpected 'snapshot' context for origin object"
):
RawExtrinsicMetadata(
target=_origin_swhid,
snapshot=CoreSWHID(
object_type=ObjectType.SNAPSHOT, object_id=EXAMPLE_HASH,
),
**_common_metadata_fields,
)
# but content can
RawExtrinsicMetadata(
target=_content_swhid,
snapshot=CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=EXAMPLE_HASH),
**_common_metadata_fields,
)
# SWHID type doesn't match the expected type of this context key
with pytest.raises(
ValueError, match="Expected SWHID type 'snapshot', got 'content'"
):
RawExtrinsicMetadata(
target=_content_swhid,
            snapshot=CoreSWHID(object_type=ObjectType.CONTENT, object_id=EXAMPLE_HASH),
**_common_metadata_fields,
)


def test_metadata_validate_context_release():
"""Checks validation of RawExtrinsicMetadata.release."""
# Origins can't have a 'release' context
with pytest.raises(
ValueError, match="Unexpected 'release' context for origin object"
):
RawExtrinsicMetadata(
target=_origin_swhid,
            release=CoreSWHID(object_type=ObjectType.RELEASE, object_id=EXAMPLE_HASH),
**_common_metadata_fields,
)
# but content can
RawExtrinsicMetadata(
target=_content_swhid,
release=CoreSWHID(object_type=ObjectType.RELEASE, object_id=EXAMPLE_HASH),
**_common_metadata_fields,
)
# SWHID type doesn't match the expected type of this context key
with pytest.raises(
ValueError, match="Expected SWHID type 'release', got 'content'"
):
RawExtrinsicMetadata(
target=_content_swhid,
            release=CoreSWHID(object_type=ObjectType.CONTENT, object_id=EXAMPLE_HASH),
**_common_metadata_fields,
)


def test_metadata_validate_context_revision():
"""Checks validation of RawExtrinsicMetadata.revision."""
# Origins can't have a 'revision' context
with pytest.raises(
ValueError, match="Unexpected 'revision' context for origin object"
):
RawExtrinsicMetadata(
target=_origin_swhid,
revision=CoreSWHID(
object_type=ObjectType.REVISION, object_id=EXAMPLE_HASH,
),
**_common_metadata_fields,
)
# but content can
RawExtrinsicMetadata(
target=_content_swhid,
revision=CoreSWHID(object_type=ObjectType.REVISION, object_id=EXAMPLE_HASH),
**_common_metadata_fields,
)
# SWHID type doesn't match the expected type of this context key
with pytest.raises(
ValueError, match="Expected SWHID type 'revision', got 'content'"
):
RawExtrinsicMetadata(
target=_content_swhid,
            revision=CoreSWHID(object_type=ObjectType.CONTENT, object_id=EXAMPLE_HASH),
**_common_metadata_fields,
)


def test_metadata_validate_context_path():
"""Checks validation of RawExtrinsicMetadata.path."""
# Origins can't have a 'path' context
with pytest.raises(ValueError, match="Unexpected 'path' context for origin object"):
RawExtrinsicMetadata(
target=_origin_swhid, path=b"/foo/bar", **_common_metadata_fields,
)
# but content can
RawExtrinsicMetadata(
target=_content_swhid, path=b"/foo/bar", **_common_metadata_fields,
)


def test_metadata_validate_context_directory():
"""Checks validation of RawExtrinsicMetadata.directory."""
# Origins can't have a 'directory' context
with pytest.raises(
ValueError, match="Unexpected 'directory' context for origin object"
):
RawExtrinsicMetadata(
target=_origin_swhid,
directory=CoreSWHID(
object_type=ObjectType.DIRECTORY, object_id=EXAMPLE_HASH,
),
**_common_metadata_fields,
)
# but content can
RawExtrinsicMetadata(
target=_content_swhid,
        directory=CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=EXAMPLE_HASH),
**_common_metadata_fields,
)
# SWHID type doesn't match the expected type of this context key
with pytest.raises(
ValueError, match="Expected SWHID type 'directory', got 'content'"
):
RawExtrinsicMetadata(
target=_content_swhid,
directory=CoreSWHID(
object_type=ObjectType.CONTENT, object_id=EXAMPLE_HASH,
),
**_common_metadata_fields,
)
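

# discovery_date is normalized on construction: microseconds are truncated
# and any timezone-aware datetime is converted to UTC.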
def test_metadata_normalize_discovery_date():
fields_copy = {**_common_metadata_fields}
truncated_date = fields_copy.pop("discovery_date")
assert truncated_date.microsecond == 0
    # Passing a value of the wrong type must still raise TypeError, even
    # though attrs_strict's type_validator was removed from this attribute
with pytest.raises(TypeError):
RawExtrinsicMetadata(
target=_content_swhid, discovery_date="not a datetime", **fields_copy
)
    # Check that microseconds are truncated (discovery_date is kept at
    # second precision)
date_with_us = truncated_date.replace(microsecond=42)
md = RawExtrinsicMetadata(
target=_content_swhid, discovery_date=date_with_us, **fields_copy,
)
assert md.discovery_date == truncated_date
assert md.discovery_date.tzinfo == datetime.timezone.utc
    # Check that the timezone gets normalized to UTC. Offsets are not always
    # a whole number of hours, so the conversion must not assume they are.
timezone = datetime.timezone(offset=datetime.timedelta(hours=2))
date_with_tz = truncated_date.astimezone(timezone)
assert date_with_tz.tzinfo != datetime.timezone.utc
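    # astimezone() preserves the instant, so once the constructor converts it
    # back to UTC the result must compare equal to truncated_date.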
md = RawExtrinsicMetadata(
target=_content_swhid, discovery_date=date_with_tz, **fields_copy,
)
assert md.discovery_date == truncated_date
assert md.discovery_date.tzinfo == datetime.timezone.utc