diff --git a/PKG-INFO b/PKG-INFO
index 92d127e..4a1f9f1 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,42 +1,42 @@
Metadata-Version: 2.1
Name: swh.model
-Version: 6.5.1
+Version: 6.6.0
Summary: Software Heritage data model
Home-page: https://forge.softwareheritage.org/diffusion/DMOD/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-model
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-model/
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: cli
Provides-Extra: testing-minimal
Provides-Extra: testing
License-File: LICENSE
License-File: AUTHORS
swh-model
=========
Implementation of the Data model of the Software Heritage project, used to
archive source code artifacts.
This module defines the notion of SoftWare Heritage persistent IDentifiers
(SWHIDs) and provides tools to compute them:
```sh
$ swh-identify fork.c kmod.c sched/deadline.c
swh:1:cnt:2e391c754ae730bd2d8520c2ab497c403220c6e3 fork.c
swh:1:cnt:0277d1216f80ae1adeed84a686ed34c9b2931fc2 kmod.c
swh:1:cnt:57b939c81bce5d06fa587df8915f05affbe22b82 sched/deadline.c
$ swh-identify --no-filename /usr/src/linux/kernel/
swh:1:dir:f9f858a48d663b3809c9e2f336412717496202ab
```
diff --git a/swh.model.egg-info/PKG-INFO b/swh.model.egg-info/PKG-INFO
index 92d127e..4a1f9f1 100644
--- a/swh.model.egg-info/PKG-INFO
+++ b/swh.model.egg-info/PKG-INFO
@@ -1,42 +1,42 @@
Metadata-Version: 2.1
Name: swh.model
-Version: 6.5.1
+Version: 6.6.0
Summary: Software Heritage data model
Home-page: https://forge.softwareheritage.org/diffusion/DMOD/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-model
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-model/
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: cli
Provides-Extra: testing-minimal
Provides-Extra: testing
License-File: LICENSE
License-File: AUTHORS
swh-model
=========
Implementation of the Data model of the Software Heritage project, used to
archive source code artifacts.
This module defines the notion of SoftWare Heritage persistent IDentifiers
(SWHIDs) and provides tools to compute them:
```sh
$ swh-identify fork.c kmod.c sched/deadline.c
swh:1:cnt:2e391c754ae730bd2d8520c2ab497c403220c6e3 fork.c
swh:1:cnt:0277d1216f80ae1adeed84a686ed34c9b2931fc2 kmod.c
swh:1:cnt:57b939c81bce5d06fa587df8915f05affbe22b82 sched/deadline.c
$ swh-identify --no-filename /usr/src/linux/kernel/
swh:1:dir:f9f858a48d663b3809c9e2f336412717496202ab
```
diff --git a/swh/model/from_disk.py b/swh/model/from_disk.py
index 8795b1f..8bd7f5d 100644
--- a/swh/model/from_disk.py
+++ b/swh/model/from_disk.py
@@ -1,592 +1,592 @@
-# Copyright (C) 2017-2020 The Software Heritage developers
+# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Conversion from filesystem tree to SWH objects.
This module allows reading a tree of directories and files from a local
filesystem, and convert them to in-memory data structures, which can then
be exported to SWH data model objects, as defined in :mod:`swh.model.model`.
"""
import datetime
import enum
import fnmatch
import glob
import os
import re
import stat
from typing import Any, Iterable, Iterator, List, Optional, Pattern, Tuple
import attr
from attrs_strict import type_validator
from typing_extensions import Final
from . import model
from .exceptions import InvalidDirectoryPath
from .git_objects import directory_entry_sort_key
from .hashutil import MultiHash, hash_to_hex
from .merkle import MerkleLeaf, MerkleNode
from .swhids import CoreSWHID, ObjectType
@attr.s(frozen=True, slots=True)
class DiskBackedContent(model.BaseContent):
"""Content-like class, which allows lazy-loading data from the disk."""
object_type: Final = "content_file"
sha1 = attr.ib(type=bytes, validator=type_validator())
sha1_git = attr.ib(type=model.Sha1Git, validator=type_validator())
sha256 = attr.ib(type=bytes, validator=type_validator())
blake2s256 = attr.ib(type=bytes, validator=type_validator())
length = attr.ib(type=int, validator=type_validator())
status = attr.ib(
type=str,
validator=attr.validators.in_(["visible", "hidden"]),
default="visible",
)
ctime = attr.ib(
type=Optional[datetime.datetime],
validator=type_validator(),
default=None,
eq=False,
)
path = attr.ib(type=Optional[bytes], default=None)
@classmethod
def from_dict(cls, d):
return cls(**d)
def __attrs_post_init__(self):
if self.path is None:
raise TypeError("path must not be None.")
def with_data(self) -> model.Content:
args = self.to_dict()
del args["path"]
assert self.path is not None
with open(self.path, "rb") as fd:
return model.Content.from_dict({**args, "data": fd.read()})
class DentryPerms(enum.IntEnum):
"""Admissible permissions for directory entries."""
content = 0o100644
"""Content"""
executable_content = 0o100755
"""Executable content (e.g. executable script)"""
symlink = 0o120000
"""Symbolic link"""
directory = 0o040000
"""Directory"""
revision = 0o160000
"""Revision (e.g. submodule)"""
def mode_to_perms(mode):
"""Convert a file mode to a permission compatible with Software Heritage
directory entries
Args:
mode (int): a file mode as returned by :func:`os.stat` in
:attr:`os.stat_result.st_mode`
Returns:
DentryPerms: one of the following values:
:const:`DentryPerms.content`: plain file
:const:`DentryPerms.executable_content`: executable file
:const:`DentryPerms.symlink`: symbolic link
:const:`DentryPerms.directory`: directory
"""
if stat.S_ISLNK(mode):
return DentryPerms.symlink
if stat.S_ISDIR(mode):
return DentryPerms.directory
else:
# file is executable in any way
if mode & (0o111):
return DentryPerms.executable_content
else:
return DentryPerms.content
class Content(MerkleLeaf):
"""Representation of a Software Heritage content as a node in a Merkle tree.
The current Merkle hash for the Content nodes is the `sha1_git`, which
makes it consistent with what :class:`Directory` uses for its own hash
computation.
"""
__slots__ = [] # type: List[str]
object_type: Final = "content"
@classmethod
def from_bytes(cls, *, mode, data):
"""Convert data (raw :class:`bytes`) to a Software Heritage content entry
Args:
mode (int): a file mode (passed to :func:`mode_to_perms`)
data (bytes): raw contents of the file
"""
ret = MultiHash.from_data(data).digest()
ret["length"] = len(data)
ret["perms"] = mode_to_perms(mode)
ret["data"] = data
ret["status"] = "visible"
return cls(ret)
@classmethod
def from_symlink(cls, *, path, mode):
"""Convert a symbolic link to a Software Heritage content entry"""
return cls.from_bytes(mode=mode, data=os.readlink(path))
@classmethod
def from_file(cls, *, path, max_content_length=None):
"""Compute the Software Heritage content entry corresponding to an
on-disk file.
The returned dictionary contains keys useful for both:
- loading the content in the archive (hashes, `length`)
- using the content as a directory entry in a directory
Args:
save_path (bool): add the file path to the entry
max_content_length (Optional[int]): if given, all contents larger
than this will be skipped.
"""
file_stat = os.lstat(path)
mode = file_stat.st_mode
length = file_stat.st_size
too_large = max_content_length is not None and length > max_content_length
if stat.S_ISLNK(mode):
# Symbolic link: return a file whose contents are the link target
if too_large:
# Unlike large contents, we can't stream symlinks to
# MultiHash, and we don't want to fit them in memory if
# they exceed max_content_length either.
# Thankfully, this should not happen for reasonable values of
# max_content_length because of OS/filesystem limitations,
# so let's just raise an error.
raise Exception(f"Symlink too large ({length} bytes)")
return cls.from_symlink(path=path, mode=mode)
elif not stat.S_ISREG(mode):
# not a regular file: return the empty file instead
return cls.from_bytes(mode=mode, data=b"")
if too_large:
skip_reason = "Content too large"
else:
skip_reason = None
hashes = MultiHash.from_path(path).digest()
if skip_reason:
ret = {
**hashes,
"status": "absent",
"reason": skip_reason,
}
else:
ret = {
**hashes,
"status": "visible",
}
ret["path"] = path
ret["perms"] = mode_to_perms(mode)
ret["length"] = length
obj = cls(ret)
return obj
def swhid(self) -> CoreSWHID:
"""Return node identifier as a SWHID"""
return CoreSWHID(object_type=ObjectType.CONTENT, object_id=self.hash)
def __repr__(self):
return "Content(id=%s)" % hash_to_hex(self.hash)
def compute_hash(self):
return self.data["sha1_git"]
def to_model(self) -> model.BaseContent:
"""Builds a `model.BaseContent` object based on this leaf."""
data = self.get_data().copy()
data.pop("perms", None)
if data["status"] == "absent":
data.pop("path", None)
return model.SkippedContent.from_dict(data)
elif "data" in data:
return model.Content.from_dict(data)
else:
return DiskBackedContent.from_dict(data)
def accept_all_directories(dirpath: str, dirname: str, entries: Iterable[Any]) -> bool:
"""Default filter for :func:`Directory.from_disk` accepting all
directories
Args:
dirname (bytes): directory name
entries (list): directory entries
"""
return True
def ignore_empty_directories(
dirpath: str, dirname: str, entries: Iterable[Any]
) -> bool:
"""Filter for :func:`directory_to_objects` ignoring empty directories
Args:
dirname (bytes): directory name
entries (list): directory entries
Returns:
True if the directory is not empty, false if the directory is empty
"""
return bool(entries)
def ignore_named_directories(names, *, case_sensitive=True):
"""Filter for :func:`directory_to_objects` to ignore directories named one
of names.
Args:
names (list of bytes): names to ignore
case_sensitive (bool): whether to do the filtering in a case sensitive
way
Returns:
a directory filter for :func:`directory_to_objects`
"""
if not case_sensitive:
names = [name.lower() for name in names]
def named_filter(
dirpath: str,
dirname: str,
entries: Iterable[Any],
names: Iterable[Any] = names,
case_sensitive: bool = case_sensitive,
):
if case_sensitive:
return dirname not in names
else:
return dirname.lower() not in names
return named_filter
# TODO: `extract_regex_objs` has been copied and adapted from `swh.scanner`.
# In the future `swh.scanner` should use the `swh.model` version and remove its own.
def extract_regex_objs(
root_path: bytes, patterns: Iterable[bytes]
) -> Iterator[Pattern[bytes]]:
"""Generates a regex object for each pattern given in input and checks if
the path is a subdirectory or relative to the root path.
Args:
root_path (bytes): path to the root directory
patterns (list of byte): shell patterns to match
Yields:
an SRE_Pattern object
"""
absolute_root_path = os.path.abspath(root_path)
for pattern in patterns:
if os.path.isabs(pattern):
pattern = os.path.relpath(pattern, root_path)
# python 3.10 has a `root_dir` argument for glob, but not the previous
# version. So we adjust the pattern
test_pattern = os.path.join(absolute_root_path, pattern)
for path in glob.glob(test_pattern):
if os.path.isabs(path) and not path.startswith(absolute_root_path):
error_msg = (
b'The path "' + path + b'" is not a subdirectory or relative '
b'to the root directory path: "' + root_path + b'"'
)
raise InvalidDirectoryPath(error_msg)
regex = fnmatch.translate((pattern.decode()))
yield re.compile(regex.encode())
def ignore_directories_patterns(root_path: bytes, patterns: Iterable[bytes]):
"""Filter for :func:`directory_to_objects` to ignore directories
matching certain patterns.
Args:
root_path (bytes): path of the root directory
patterns (list of bytes): patterns to ignore
Returns:
a directory filter for :func:`directory_to_objects`
"""
sre_patterns = set(extract_regex_objs(root_path, patterns))
def pattern_filter(
dirpath: bytes,
dirname: bytes,
entries: Iterable[Any],
patterns: Iterable[Any] = sre_patterns,
root_path: bytes = os.path.abspath(root_path),
):
full_path = os.path.abspath(dirpath)
relative_path = os.path.relpath(full_path, root_path)
return not any([pattern.match(relative_path) for pattern in patterns])
return pattern_filter
def iter_directory(
directory,
) -> Tuple[List[model.Content], List[model.SkippedContent], List[model.Directory]]:
"""Return the directory listing from a disk-memory directory instance.
Raises:
TypeError in case an unexpected object type is listed.
Returns:
Tuple of respectively iterable of content, skipped content and directories.
"""
contents: List[model.Content] = []
skipped_contents: List[model.SkippedContent] = []
directories: List[model.Directory] = []
for obj in directory.iter_tree():
obj = obj.to_model()
obj_type = obj.object_type
if obj_type in (model.Content.object_type, DiskBackedContent.object_type):
# FIXME: read the data from disk later (when the
# storage buffer is flushed).
obj = obj.with_data()
contents.append(obj)
elif obj_type == model.SkippedContent.object_type:
skipped_contents.append(obj)
elif obj_type == model.Directory.object_type:
directories.append(obj)
else:
raise TypeError(f"Unexpected object type from disk: {obj}")
return contents, skipped_contents, directories
class Directory(MerkleNode):
"""Representation of a Software Heritage directory as a node in a Merkle Tree.
This class can be used to generate, from an on-disk directory, all the
objects that need to be sent to the Software Heritage archive.
The :func:`from_disk` constructor allows you to generate the data structure
from a directory on disk. The resulting :class:`Directory` can then be
manipulated as a dictionary, using the path as key.
The :func:`collect` method is used to retrieve all the objects that need to
be added to the Software Heritage archive since the last collection, by
class (contents and directories).
When using the dict-like methods to update the contents of the directory,
the affected levels of hierarchy are reset and can be collected again using
the same method. This enables the efficient collection of updated nodes,
for instance when the client is applying diffs.
"""
__slots__ = ["__entries", "__model_object"]
object_type: Final = "directory"
@classmethod
def from_disk(
cls, *, path, dir_filter=accept_all_directories, max_content_length=None
):
"""Compute the Software Heritage objects for a given directory tree
Args:
path (bytes): the directory to traverse
data (bool): whether to add the data to the content objects
save_path (bool): whether to add the path to the content objects
dir_filter (function): a filter to ignore some directories by
name or contents. Takes two arguments: dirname and entries, and
returns True if the directory should be added, False if the
directory should be ignored.
max_content_length (Optional[int]): if given, all contents larger
than this will be skipped.
"""
top_path = path
dirs = {}
for root, dentries, fentries in os.walk(top_path, topdown=False):
entries = {}
# Join fentries and dentries in the same processing, as symbolic
# links to directories appear in dentries...
for name in fentries + dentries:
path = os.path.join(root, name)
if not os.path.isdir(path) or os.path.islink(path):
content = Content.from_file(
path=path, max_content_length=max_content_length
)
entries[name] = content
else:
if dir_filter(path, name, dirs[path].entries):
entries[name] = dirs[path]
dirs[root] = cls({"name": os.path.basename(root), "path": root})
dirs[root].update(entries)
return dirs[top_path]
def __init__(self, data=None):
super().__init__(data=data)
self.__entries = None
self.__model_object = None
def invalidate_hash(self):
self.__entries = None
self.__model_object = None
super().invalidate_hash()
@staticmethod
def child_to_directory_entry(name, child):
if child.object_type == "directory":
return {
"type": "dir",
"perms": DentryPerms.directory,
"target": child.hash,
"name": name,
}
elif child.object_type == "content":
return {
"type": "file",
"perms": child.data["perms"],
"target": child.hash,
"name": name,
}
else:
raise ValueError(f"unknown child {child}")
def get_data(self, **kwargs):
return {
"id": self.hash,
"entries": self.entries,
}
@property
def entries(self):
"""Child nodes, sorted by name in the same way
:func:`swh.model.git_objects.directory_git_object` does."""
if self.__entries is None:
self.__entries = sorted(
(
self.child_to_directory_entry(name, child)
for name, child in self.items()
),
key=directory_entry_sort_key,
)
return self.__entries
def swhid(self) -> CoreSWHID:
"""Return node identifier as a SWHID"""
return CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=self.hash)
def compute_hash(self):
return self.to_model().id
def to_model(self) -> model.Directory:
"""Builds a `model.Directory` object based on this node;
ignoring its children."""
if self.__model_object is None:
DirectoryEntry = model.DirectoryEntry
entries = []
for name, child in self.items():
if child.object_type == "directory":
e = DirectoryEntry(
type="dir",
perms=DentryPerms.directory,
target=child.hash,
name=name,
)
elif child.object_type == "content":
e = DirectoryEntry(
type="file",
perms=child.data["perms"],
target=child.hash,
name=name,
)
else:
raise ValueError(f"unknown child {child}")
entries.append(e)
entries.sort(key=directory_entry_sort_key)
self.__model_object = model.Directory(entries=tuple(entries))
return self.__model_object
def __getitem__(self, key):
if not isinstance(key, bytes):
raise ValueError("Can only get a bytes from Directory")
# Convenience shortcut
if key == b"":
return self
if b"/" not in key:
return super().__getitem__(key)
else:
key1, key2 = key.split(b"/", 1)
return self.__getitem__(key1)[key2]
def __setitem__(self, key, value):
if not isinstance(key, bytes):
raise ValueError("Can only set a bytes Directory entry")
if not isinstance(value, (Content, Directory)):
raise ValueError(
"Can only set a Directory entry to a Content or " "Directory"
)
if key == b"":
raise ValueError("Directory entry must have a name")
if b"\x00" in key:
raise ValueError("Directory entry name must not contain nul bytes")
if b"/" not in key:
return super().__setitem__(key, value)
else:
key1, key2 = key.rsplit(b"/", 1)
self[key1].__setitem__(key2, value)
def __delitem__(self, key):
if not isinstance(key, bytes):
raise ValueError("Can only delete a bytes Directory entry")
if b"/" not in key:
super().__delitem__(key)
else:
key1, key2 = key.rsplit(b"/", 1)
del self[key1][key2]
def __contains__(self, key):
if b"/" not in key:
return super().__contains__(key)
else:
key1, key2 = key.split(b"/", 1)
return super().__contains__(key1) and self[key1].__contains__(key2)
def __repr__(self):
return "Directory(id=%s, entries=[%s])" % (
hash_to_hex(self.hash),
", ".join(str(entry) for entry in self),
)
diff --git a/swh/model/merkle.py b/swh/model/merkle.py
index ab6b8ea..b224840 100644
--- a/swh/model/merkle.py
+++ b/swh/model/merkle.py
@@ -1,315 +1,233 @@
-# Copyright (C) 2017-2020 The Software Heritage developers
+# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Merkle tree data structure"""
-import abc
-from collections.abc import Mapping
-from typing import Dict, Iterator, List, Set
-
-
-def deep_update(left, right):
- """Recursively update the left mapping with deeply nested values from the right
- mapping.
-
- This function is useful to merge the results of several calls to
- :func:`MerkleNode.collect`.
-
- Arguments:
- left: a mapping (modified by the update operation)
- right: a mapping
-
- Returns:
- the left mapping, updated with nested values from the right mapping
-
- Example:
- >>> a = {
- ... 'key1': {
- ... 'key2': {
- ... 'key3': 'value1/2/3',
- ... },
- ... },
- ... }
- >>> deep_update(a, {
- ... 'key1': {
- ... 'key2': {
- ... 'key4': 'value1/2/4',
- ... },
- ... },
- ... }) == {
- ... 'key1': {
- ... 'key2': {
- ... 'key3': 'value1/2/3',
- ... 'key4': 'value1/2/4',
- ... },
- ... },
- ... }
- True
- >>> deep_update(a, {
- ... 'key1': {
- ... 'key2': {
- ... 'key3': 'newvalue1/2/3',
- ... },
- ... },
- ... }) == {
- ... 'key1': {
- ... 'key2': {
- ... 'key3': 'newvalue1/2/3',
- ... 'key4': 'value1/2/4',
- ... },
- ... },
- ... }
- True
+from __future__ import annotations
- """
- for key, rvalue in right.items():
- if isinstance(rvalue, Mapping):
- new_lvalue = deep_update(left.get(key, {}), rvalue)
- left[key] = new_lvalue
- else:
- left[key] = rvalue
- return left
+import abc
+from typing import Any, Dict, Iterator, List, Set
class MerkleNode(dict, metaclass=abc.ABCMeta):
"""Representation of a node in a Merkle Tree.
A (generalized) `Merkle Tree`_ is a tree in which every node is labeled
with a hash of its own data and the hash of its children.
.. _Merkle Tree: https://en.wikipedia.org/wiki/Merkle_tree
In pseudocode::
node.hash = hash(node.data
+ sum(child.hash for child in node.children))
This class efficiently implements the Merkle Tree data structure on top of
a Python :class:`dict`, minimizing hash computations and new data
collections when updating nodes.
Node data is stored in the :attr:`data` attribute, while (named) children
are stored as items of the underlying dictionary.
Addition, update and removal of objects are instrumented to automatically
invalidate the hashes of the current node as well as its registered
parents; It also resets the collection status of the objects so the updated
objects can be collected.
The collection of updated data from the tree is implemented through the
:func:`collect` function and associated helpers.
"""
__slots__ = ["parents", "data", "__hash", "collected"]
data: Dict
"""data associated to the current node"""
parents: List
"""known parents of the current node"""
collected: bool
"""whether the current node has been collected"""
def __init__(self, data=None):
super().__init__()
self.parents = []
self.data = data
self.__hash = None
self.collected = False
def __eq__(self, other):
return (
isinstance(other, MerkleNode)
and super().__eq__(other)
and self.data == other.data
)
def __ne__(self, other):
return not self.__eq__(other)
def invalidate_hash(self):
"""Invalidate the cached hash of the current node."""
if not self.__hash:
return
self.__hash = None
self.collected = False
for parent in self.parents:
parent.invalidate_hash()
- def update_hash(self, *, force=False):
+ def update_hash(self, *, force=False) -> Any:
"""Recursively compute the hash of the current node.
Args:
force (bool): invalidate the cache and force the computation for
this node and all children.
"""
if self.__hash and not force:
return self.__hash
if force:
self.invalidate_hash()
for child in self.values():
child.update_hash(force=force)
self.__hash = self.compute_hash()
return self.__hash
@property
- def hash(self):
+ def hash(self) -> Any:
"""The hash of the current node, as calculated by
:func:`compute_hash`.
"""
return self.update_hash()
+ def __hash__(self):
+ return hash(self.hash)
+
@abc.abstractmethod
- def compute_hash(self):
+ def compute_hash(self) -> Any:
"""Compute the hash of the current node.
The hash should depend on the data of the node, as well as on hashes
of the children nodes.
"""
raise NotImplementedError("Must implement compute_hash method")
def __setitem__(self, name, new_child):
"""Add a child, invalidating the current hash"""
self.invalidate_hash()
super().__setitem__(name, new_child)
new_child.parents.append(self)
def __delitem__(self, name):
"""Remove a child, invalidating the current hash"""
if name in self:
self.invalidate_hash()
self[name].parents.remove(self)
super().__delitem__(name)
else:
raise KeyError(name)
def update(self, new_children):
"""Add several named children from a dictionary"""
if not new_children:
return
self.invalidate_hash()
for name, new_child in new_children.items():
new_child.parents.append(self)
if name in self:
self[name].parents.remove(self)
super().update(new_children)
def get_data(self, **kwargs):
"""Retrieve and format the collected data for the current node, for use by
:func:`collect`.
Can be overridden, for instance when you want the collected data to
contain information about the child nodes.
Arguments:
kwargs: allow subclasses to alter behaviour depending on how
:func:`collect` is called.
Returns:
data formatted for :func:`collect`
"""
return self.data
- def collect_node(self, **kwargs):
- """Collect the data for the current node, for use by :func:`collect`.
-
- Arguments:
- kwargs: passed as-is to :func:`get_data`.
-
- Returns:
- A :class:`dict` compatible with :func:`collect`.
- """
+ def collect_node(self) -> Set[MerkleNode]:
+ """Collect the current node if it has not been yet, for use by :func:`collect`."""
if not self.collected:
self.collected = True
- return {self.object_type: {self.hash: self.get_data(**kwargs)}}
+ return {self}
else:
- return {}
+ return set()
- def collect(self, **kwargs):
- """Collect the data for all nodes in the subtree rooted at `self`.
-
- The data is deduplicated by type and by hash.
-
- Arguments:
- kwargs: passed as-is to :func:`get_data`.
+ def collect(self) -> Set[MerkleNode]:
+ """Collect the added and modified nodes in the subtree rooted at `self`
+ since the last collect operation.
Returns:
- A :class:`dict` with the following structure::
-
- {
- 'typeA': {
- node1.hash: node1.get_data(),
- node2.hash: node2.get_data(),
- },
- 'typeB': {
- node3.hash: node3.get_data(),
- ...
- },
- ...
- }
+ A :class:`set` of collected nodes
"""
- ret = self.collect_node(**kwargs)
+ ret = self.collect_node()
for child in self.values():
- deep_update(ret, child.collect(**kwargs))
+ ret.update(child.collect())
return ret
def reset_collect(self):
"""Recursively unmark collected nodes in the subtree rooted at `self`.
This lets the caller use :func:`collect` again.
"""
self.collected = False
for child in self.values():
child.reset_collect()
- def iter_tree(self, dedup=True) -> Iterator["MerkleNode"]:
+ def iter_tree(self, dedup=True) -> Iterator[MerkleNode]:
"""Yields all children nodes, recursively. Common nodes are deduplicated
by default (deduplication can be turned off setting the given argument
'dedup' to False).
"""
yield from self._iter_tree(set(), dedup)
- def _iter_tree(self, seen: Set[bytes], dedup) -> Iterator["MerkleNode"]:
+ def _iter_tree(self, seen: Set[bytes], dedup) -> Iterator[MerkleNode]:
if self.hash not in seen:
if dedup:
seen.add(self.hash)
yield self
for child in self.values():
yield from child._iter_tree(seen=seen, dedup=dedup)
class MerkleLeaf(MerkleNode):
"""A leaf to a Merkle tree.
A Merkle leaf is simply a Merkle node with children disabled.
"""
__slots__ = [] # type: List[str]
def __setitem__(self, name, child):
raise ValueError("%s is a leaf" % self.__class__.__name__)
def __getitem__(self, name):
raise ValueError("%s is a leaf" % self.__class__.__name__)
def __delitem__(self, name):
raise ValueError("%s is a leaf" % self.__class__.__name__)
def update(self, new_children):
"""Children update operation. Disabled for leaves."""
raise ValueError("%s is a leaf" % self.__class__.__name__)
diff --git a/swh/model/model.py b/swh/model/model.py
index df4b800..29c7d6e 100644
--- a/swh/model/model.py
+++ b/swh/model/model.py
@@ -1,1882 +1,1890 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
Implementation of Software Heritage's data model
See :ref:`data-model` for an overview of the data model.
The classes defined in this module are immutable
`attrs objects `__ and enums.
All classes define a ``from_dict`` class method and a ``to_dict``
method to convert between them and msgpack-serializable objects.
"""
from abc import ABCMeta, abstractmethod
import collections
import datetime
from enum import Enum
import hashlib
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, TypeVar, Union
import attr
from attr._make import _AndValidator
from attr.validators import and_
from attrs_strict import AttributeTypeError
import dateutil.parser
import iso8601
from typing_extensions import Final
from . import git_objects
from .collections import ImmutableDict
from .hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_bytehex, hash_to_hex
from .swhids import CoreSWHID
from .swhids import ExtendedObjectType as SwhidExtendedObjectType
from .swhids import ExtendedSWHID
from .swhids import ObjectType as SwhidObjectType
class MissingData(Exception):
"""Raised by `Content.with_data` when it has no way of fetching the
data (but not when fetching the data fails)."""
pass
KeyType = Union[Dict[str, str], Dict[str, bytes], bytes]
"""The type returned by BaseModel.unique_key()."""
SHA1_SIZE = 20
_OFFSET_CHARS = frozenset(b"+-0123456789")
# TODO: Limit this to 20 bytes
Sha1Git = bytes
Sha1 = bytes
KT = TypeVar("KT")
VT = TypeVar("VT")
def hash_repr(h: bytes) -> str:
if h is None:
return "None"
else:
return f"hash_to_bytes('{hash_to_hex(h)}')"
def freeze_optional_dict(
d: Union[None, Dict[KT, VT], ImmutableDict[KT, VT]] # type: ignore
) -> Optional[ImmutableDict[KT, VT]]:
if isinstance(d, dict):
return ImmutableDict(d)
else:
return d
def dictify(value):
"Helper function used by BaseModel.to_dict()"
if isinstance(value, BaseModel):
return value.to_dict()
elif isinstance(value, (CoreSWHID, ExtendedSWHID)):
return str(value)
elif isinstance(value, Enum):
return value.value
elif isinstance(value, (dict, ImmutableDict)):
return {k: dictify(v) for k, v in value.items()}
elif isinstance(value, tuple):
return tuple(dictify(v) for v in value)
else:
return value
def generic_type_validator(instance, attribute, value):
"""validates the type of an attribute value whatever the attribute type"""
raise NotImplementedError("generic type check should have been optimized")
def _true_validator(instance, attribute, value, expected_type=None, origin_value=None):
pass
def _none_validator(instance, attribute, value, expected_type=None, origin_value=None):
if value is not None:
if origin_value is None:
origin_value = value
raise AttributeTypeError(origin_value, attribute)
def _origin_type_validator(
instance, attribute, value, expected_type=None, origin_value=None
):
# This is functionally equivalent to using just this:
# return isinstance(value, type)
# but using type equality before isinstance allows very quick checks
# when the exact class is used (which is the overwhelming majority of cases)
# while still allowing subclasses to be used.
if expected_type is None:
expected_type = attribute.type
if not (type(value) == expected_type or isinstance(value, expected_type)):
if origin_value is None:
origin_value = value
raise AttributeTypeError(origin_value, attribute)
def _tuple_infinite_validator(
instance,
attribute,
value,
expected_type=None,
origin_value=None,
):
type_ = type(value)
if origin_value is None:
origin_value = value
if type_ != tuple and not isinstance(value, tuple):
raise AttributeTypeError(origin_value, attribute)
if expected_type is None:
expected_type = attribute.type
args = expected_type.__args__
# assert len(args) == 2 and args[1] is Ellipsis
expected_value_type = args[0]
validator = optimized_validator(expected_value_type)
for i in value:
validator(
instance,
attribute,
i,
expected_type=expected_value_type,
origin_value=origin_value,
)
def _tuple_bytes_bytes_validator(
instance,
attribute,
value,
expected_type=None,
origin_value=None,
):
type_ = type(value)
if type_ != tuple and not isinstance(value, tuple):
if origin_value is None:
origin_value = value
raise AttributeTypeError(origin_value, attribute)
if len(value) != 2:
if origin_value is None:
origin_value = value
raise AttributeTypeError(origin_value, attribute)
if type(value[0]) is not bytes or type(value[1]) is not bytes:
if origin_value is None:
origin_value = value
raise AttributeTypeError(origin_value, attribute)
def _tuple_finite_validator(
instance,
attribute,
value,
expected_type=None,
origin_value=None,
):
# might be useful to optimise the sub-validator tuple, in practice, we only
# have [bytes, bytes]
type_ = type(value)
if origin_value is None:
origin_value = value
if type_ != tuple and not isinstance(value, tuple):
raise AttributeTypeError(origin_value, attribute)
if expected_type is None:
expected_type = attribute.type
args = expected_type.__args__
# assert len(args) != 2 or args[1] is Ellipsis
if len(args) != len(value):
raise AttributeTypeError(origin_value, attribute)
for item_type, item in zip(args, value):
validator = optimized_validator(item_type)
validator(
instance,
attribute,
item,
expected_type=item_type,
origin_value=origin_value,
)
def _immutable_dict_validator(
instance,
attribute,
value,
expected_type=None,
origin_value=None,
):
value_type = type(value)
if origin_value is None:
origin_value = value
if value_type != ImmutableDict and not isinstance(value, ImmutableDict):
raise AttributeTypeError(origin_value, attribute)
if expected_type is None:
expected_type = attribute.type
(expected_key_type, expected_value_type) = expected_type.__args__
key_validator = optimized_validator(expected_key_type)
value_validator = optimized_validator(expected_value_type)
for (item_key, item_value) in value.items():
key_validator(
instance,
attribute,
item_key,
expected_type=expected_key_type,
origin_value=origin_value,
)
value_validator(
instance,
attribute,
item_value,
expected_type=expected_value_type,
origin_value=origin_value,
)
def optimized_validator(type_):
if type_ is object or type_ is Any:
return _true_validator
if type_ is None:
return _none_validator
origin = getattr(type_, "__origin__", None)
# Non-generic type, check it directly
if origin is None:
return _origin_type_validator
# Then, if it's a container, check its items.
if origin is tuple:
args = type_.__args__
if len(args) == 2 and args[1] is Ellipsis:
# Infinite tuple
return _tuple_infinite_validator
elif args == (bytes, bytes):
return _tuple_bytes_bytes_validator
else:
return _tuple_finite_validator
elif origin is Union:
args = type_.__args__
all_validators = tuple((optimized_validator(t), t) for t in args)
def union_validator(
instance,
attribute,
value,
expected_type=None,
origin_value=None,
):
if origin_value is None:
origin_value = value
for (validator, type_) in all_validators:
try:
validator(
instance,
attribute,
value,
expected_type=type_,
origin_value=origin_value,
)
except AttributeTypeError:
pass
else:
break
else:
raise AttributeTypeError(origin_value, attribute)
return union_validator
elif origin is ImmutableDict:
return _immutable_dict_validator
# No need to check dict or list. because they are converted to ImmutableDict
# and tuple respectively.
raise NotImplementedError(f"Type-checking {type_}")
def optimize_all_validators(cls, old_fields):
"""process validators to turn them into a faster version … eventually"""
new_fields = []
for f in old_fields:
validator = f.validator
if validator is generic_type_validator:
validator = optimized_validator(f.type)
elif isinstance(validator, _AndValidator):
new_and = []
for v in validator._validators:
if v is generic_type_validator:
v = optimized_validator(f.type)
new_and.append(v)
validator = and_(*new_and)
else:
validator = None
if validator is not None:
f = f.evolve(validator=validator)
new_fields.append(f)
- return new_fields
+ if attr.__version__ < "21.3.0":
+ # https://github.com/python-attrs/attrs/issues/821
+ from attr._make import _make_attr_tuple_class
+
+ attr_names = [f.name for f in new_fields]
+ AttrsClass = _make_attr_tuple_class(cls.__name__, attr_names)
+ return AttrsClass(new_fields)
+ else:
+ return new_fields
ModelType = TypeVar("ModelType", bound="BaseModel")
class BaseModel:
"""Base class for SWH model classes.
Provides serialization/deserialization to/from Python dictionaries,
that are suitable for JSON/msgpack-like formats."""
__slots__ = ()
def to_dict(self):
"""Wrapper of `attr.asdict` that can be overridden by subclasses
that have special handling of some of the fields."""
return dictify(attr.asdict(self, recurse=False))
@classmethod
def from_dict(cls, d):
"""Takes a dictionary representing a tree of SWH objects, and
recursively builds the corresponding objects."""
return cls(**d)
def anonymize(self: ModelType) -> Optional[ModelType]:
"""Returns an anonymized version of the object, if needed.
If the object model does not need/support anonymization, returns None.
"""
return None
def unique_key(self) -> KeyType:
"""Returns a unique key for this object, that can be used for
deduplication."""
raise NotImplementedError(f"unique_key for {self}")
def check(self) -> None:
"""Performs internal consistency checks, and raises an error if one fails."""
# without the type-ignore comment below, attr >= 22.1.0 causes mypy to report:
# Argument 1 has incompatible type "BaseModel"; expected "AttrsInstance"
attr.validate(self) # type: ignore[arg-type]
def _compute_hash_from_manifest(manifest: bytes) -> Sha1Git:
return hashlib.new("sha1", manifest).digest()
class HashableObject(metaclass=ABCMeta):
"""Mixin to automatically compute object identifier hash when
the associated model is instantiated."""
__slots__ = ()
id: Sha1Git
def compute_hash(self) -> bytes:
"""Derived model classes must implement this to compute
the object hash.
This method is called by the object initialization if the `id`
attribute is set to an empty value.
"""
return self._compute_hash_from_attributes()
@abstractmethod
def _compute_hash_from_attributes(self) -> Sha1Git:
raise NotImplementedError(f"_compute_hash_from_attributes for {self}")
def __attrs_post_init__(self):
if not self.id:
obj_id = self.compute_hash()
object.__setattr__(self, "id", obj_id)
def unique_key(self) -> KeyType:
return self.id
def check(self) -> None:
super().check() # type: ignore
if self.id != self.compute_hash():
raise ValueError("'id' does not match recomputed hash.")
class HashableObjectWithManifest(HashableObject):
"""Derived class of HashableObject, for objects that may need to store
verbatim git objects as ``raw_manifest`` to preserve original hashes."""
__slots__ = ()
raw_manifest: Optional[bytes] = None
"""Stores the original content of git objects when they cannot be faithfully
represented using only the other attributes.
This should only be used as a last resort, and only set in the Git loader,
for objects too corrupt to fit the data model."""
def to_dict(self):
d = super().to_dict()
if d["raw_manifest"] is None:
del d["raw_manifest"]
return d
def compute_hash(self) -> bytes:
"""Derived model classes must implement this to compute
the object hash.
This method is called by the object initialization if the `id`
attribute is set to an empty value.
"""
if self.raw_manifest is None:
return super().compute_hash() # calls self._compute_hash_from_attributes()
else:
return _compute_hash_from_manifest(self.raw_manifest)
def check(self) -> None:
super().check()
if (
self.raw_manifest is not None
and self.id == self._compute_hash_from_attributes()
):
raise ValueError(
f"{self} has a non-none raw_manifest attribute, but does not need it."
)
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class Person(BaseModel):
"""Represents the author/committer of a revision or release."""
object_type: Final = "person"
fullname = attr.ib(type=bytes, validator=generic_type_validator)
name = attr.ib(type=Optional[bytes], validator=generic_type_validator, eq=False)
email = attr.ib(type=Optional[bytes], validator=generic_type_validator, eq=False)
@classmethod
def from_fullname(cls, fullname: bytes):
"""Returns a Person object, by guessing the name and email from the
fullname, in the `name ` format.
The fullname is left unchanged."""
if fullname is None:
raise TypeError("fullname is None.")
name: Optional[bytes]
email: Optional[bytes]
try:
open_bracket = fullname.index(b"<")
except ValueError:
name = fullname
email = None
else:
raw_name = fullname[:open_bracket]
raw_email = fullname[open_bracket + 1 :]
if not raw_name:
name = None
else:
name = raw_name.strip()
try:
close_bracket = raw_email.rindex(b">")
except ValueError:
email = raw_email
else:
email = raw_email[:close_bracket]
return Person(
name=name or None,
email=email or None,
fullname=fullname,
)
def anonymize(self) -> "Person":
"""Returns an anonymized version of the Person object.
Anonymization is simply a Person which fullname is the hashed, with unset name
or email.
"""
return Person(
fullname=hashlib.sha256(self.fullname).digest(),
name=None,
email=None,
)
@classmethod
def from_dict(cls, d):
"""
If the fullname is missing, construct a fullname
using the following heuristics: if the name value is None, we return the
email in angle brackets, else, we return the name, a space, and the email
in angle brackets.
"""
if "fullname" not in d:
parts = []
if d["name"] is not None:
parts.append(d["name"])
if d["email"] is not None:
parts.append(b"".join([b"<", d["email"], b">"]))
fullname = b" ".join(parts)
d = {**d, "fullname": fullname}
d = {"name": None, "email": None, **d}
return super().from_dict(d)
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class Timestamp(BaseModel):
"""Represents a naive timestamp from a VCS."""
object_type: Final = "timestamp"
seconds = attr.ib(type=int)
microseconds = attr.ib(type=int)
@seconds.validator
def check_seconds(self, attribute, value):
"""Check that seconds fit in a 64-bits signed integer."""
if value.__class__ is not int:
raise AttributeTypeError(value, attribute)
if not (-(2**63) <= value < 2**63):
raise ValueError("Seconds must be a signed 64-bits integer.")
@microseconds.validator
def check_microseconds(self, attribute, value):
"""Checks that microseconds are positive and < 1000000."""
if value.__class__ is not int:
raise AttributeTypeError(value, attribute)
if not (0 <= value < 10**6):
raise ValueError("Microseconds must be in [0, 1000000[.")
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class TimestampWithTimezone(BaseModel):
"""Represents a TZ-aware timestamp from a VCS."""
object_type: Final = "timestamp_with_timezone"
timestamp = attr.ib(type=Timestamp, validator=generic_type_validator)
offset_bytes = attr.ib(type=bytes, validator=generic_type_validator)
"""Raw git representation of the timezone, as an offset from UTC.
It should follow this format: ``+HHMM`` or ``-HHMM`` (including ``+0000`` and
``-0000``).
However, when created from git objects, it must be the exact bytes used in the
original objects, so it may differ from this format when they do.
"""
@classmethod
def from_numeric_offset(
cls, timestamp: Timestamp, offset: int, negative_utc: bool
) -> "TimestampWithTimezone":
"""Returns a :class:`TimestampWithTimezone` instance from the old dictionary
format (with ``offset`` and ``negative_utc`` instead of ``offset_bytes``).
"""
negative = offset < 0 or negative_utc
(hours, minutes) = divmod(abs(offset), 60)
offset_bytes = f"{'-' if negative else '+'}{hours:02}{minutes:02}".encode()
tstz = TimestampWithTimezone(timestamp=timestamp, offset_bytes=offset_bytes)
assert tstz.offset_minutes() == offset, (tstz.offset_minutes(), offset)
return tstz
@classmethod
def from_dict(
cls, time_representation: Union[Dict, datetime.datetime, int]
) -> "TimestampWithTimezone":
"""Builds a TimestampWithTimezone from any of the formats
accepted by :func:`swh.model.normalize_timestamp`."""
# TODO: this accept way more types than just dicts; find a better
# name
if isinstance(time_representation, dict):
ts = time_representation["timestamp"]
if isinstance(ts, dict):
seconds = ts.get("seconds", 0)
microseconds = ts.get("microseconds", 0)
elif isinstance(ts, int):
seconds = ts
microseconds = 0
else:
raise ValueError(
f"TimestampWithTimezone.from_dict received non-integer timestamp "
f"member {ts!r}"
)
timestamp = Timestamp(seconds=seconds, microseconds=microseconds)
if "offset_bytes" in time_representation:
return cls(
timestamp=timestamp,
offset_bytes=time_representation["offset_bytes"],
)
else:
# old format
offset = time_representation["offset"]
negative_utc = time_representation.get("negative_utc") or False
return cls.from_numeric_offset(timestamp, offset, negative_utc)
elif isinstance(time_representation, datetime.datetime):
# TODO: warn when using from_dict() on a datetime
utcoffset = time_representation.utcoffset()
time_representation = time_representation.astimezone(datetime.timezone.utc)
microseconds = time_representation.microsecond
if microseconds:
time_representation = time_representation.replace(microsecond=0)
seconds = int(time_representation.timestamp())
if utcoffset is None:
raise ValueError(
f"TimestampWithTimezone.from_dict received datetime without "
f"timezone: {time_representation}"
)
# utcoffset is an integer number of minutes
seconds_offset = utcoffset.total_seconds()
offset = int(seconds_offset) // 60
# TODO: warn if remainder is not zero
return cls.from_numeric_offset(
Timestamp(seconds=seconds, microseconds=microseconds), offset, False
)
elif isinstance(time_representation, int):
# TODO: warn when using from_dict() on an int
seconds = time_representation
timestamp = Timestamp(seconds=time_representation, microseconds=0)
return cls(timestamp=timestamp, offset_bytes=b"+0000")
else:
raise ValueError(
f"TimestampWithTimezone.from_dict received non-integer timestamp: "
f"{time_representation!r}"
)
@classmethod
def from_datetime(cls, dt: datetime.datetime) -> "TimestampWithTimezone":
return cls.from_dict(dt)
def to_datetime(self) -> datetime.datetime:
"""Convert to a datetime (with a timezone set to the recorded fixed UTC offset)
Beware that this conversion can be lossy: ``-0000`` and 'weird' offsets
cannot be represented. Also note that it may fail due to type overflow.
"""
timestamp = datetime.datetime.fromtimestamp(
self.timestamp.seconds,
datetime.timezone(datetime.timedelta(minutes=self.offset_minutes())),
)
timestamp = timestamp.replace(microsecond=self.timestamp.microseconds)
return timestamp
@classmethod
def from_iso8601(cls, s):
"""Builds a TimestampWithTimezone from an ISO8601-formatted string."""
dt = iso8601.parse_date(s)
tstz = cls.from_datetime(dt)
if dt.tzname() == "-00:00":
assert tstz.offset_bytes == b"+0000"
tstz = attr.evolve(tstz, offset_bytes=b"-0000")
return tstz
@staticmethod
def _parse_offset_bytes(offset_bytes: bytes) -> int:
"""Parses an ``offset_bytes`` value (in Git's ``[+-]HHMM`` format),
and returns the corresponding numeric values (in number of minutes).
Tries to account for some mistakes in the format, to support incorrect
Git implementations.
>>> TimestampWithTimezone._parse_offset_bytes(b"+0000")
0
>>> TimestampWithTimezone._parse_offset_bytes(b"-0000")
0
>>> TimestampWithTimezone._parse_offset_bytes(b"+0200")
120
>>> TimestampWithTimezone._parse_offset_bytes(b"-0200")
-120
>>> TimestampWithTimezone._parse_offset_bytes(b"+200")
120
>>> TimestampWithTimezone._parse_offset_bytes(b"-200")
-120
>>> TimestampWithTimezone._parse_offset_bytes(b"+02")
120
>>> TimestampWithTimezone._parse_offset_bytes(b"-02")
-120
>>> TimestampWithTimezone._parse_offset_bytes(b"+0010")
10
>>> TimestampWithTimezone._parse_offset_bytes(b"-0010")
-10
>>> TimestampWithTimezone._parse_offset_bytes(b"+200000000000000000")
0
>>> TimestampWithTimezone._parse_offset_bytes(b"+0160") # 60 minutes...
0
"""
offset_str = offset_bytes.decode()
assert offset_str[0] in "+-"
sign = int(offset_str[0] + "1")
if len(offset_str) <= 3:
hours = int(offset_str[1:])
minutes = 0
else:
hours = int(offset_str[1:-2])
minutes = int(offset_str[-2:])
offset = sign * (hours * 60 + minutes)
if (0 <= minutes <= 59) and (-(2**15) <= offset < 2**15):
return offset
else:
# can't parse it to a reasonable value; give up and pretend it's UTC.
return 0
def offset_minutes(self):
"""Returns the offset, as a number of minutes since UTC.
>>> TimestampWithTimezone(
... Timestamp(seconds=1642765364, microseconds=0), offset_bytes=b"+0000"
... ).offset_minutes()
0
>>> TimestampWithTimezone(
... Timestamp(seconds=1642765364, microseconds=0), offset_bytes=b"+0200"
... ).offset_minutes()
120
>>> TimestampWithTimezone(
... Timestamp(seconds=1642765364, microseconds=0), offset_bytes=b"-0200"
... ).offset_minutes()
-120
>>> TimestampWithTimezone(
... Timestamp(seconds=1642765364, microseconds=0), offset_bytes=b"+0530"
... ).offset_minutes()
330
"""
return self._parse_offset_bytes(self.offset_bytes)
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class Origin(HashableObject, BaseModel):
"""Represents a software source: a VCS and an URL."""
object_type: Final = "origin"
url = attr.ib(type=str, validator=generic_type_validator)
id = attr.ib(type=Sha1Git, validator=generic_type_validator, default=b"")
def unique_key(self) -> KeyType:
return {"url": self.url}
def _compute_hash_from_attributes(self) -> bytes:
return _compute_hash_from_manifest(self.url.encode("utf-8"))
def swhid(self) -> ExtendedSWHID:
"""Returns a SWHID representing this origin."""
return ExtendedSWHID(
object_type=SwhidExtendedObjectType.ORIGIN,
object_id=self.id,
)
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class OriginVisit(BaseModel):
"""Represents an origin visit with a given type at a given point in time, by a
SWH loader."""
object_type: Final = "origin_visit"
origin = attr.ib(type=str, validator=generic_type_validator)
date = attr.ib(type=datetime.datetime)
type = attr.ib(type=str, validator=generic_type_validator)
"""Should not be set before calling 'origin_visit_add()'."""
visit = attr.ib(type=Optional[int], validator=generic_type_validator, default=None)
@date.validator
def check_date(self, attribute, value):
"""Checks the date has a timezone."""
if value.__class__ is not datetime.datetime:
raise AttributeTypeError(value, attribute)
if value is not None and value.tzinfo is None:
raise ValueError("date must be a timezone-aware datetime.")
def to_dict(self):
"""Serializes the date as a string and omits the visit id if it is
`None`."""
ov = super().to_dict()
if ov["visit"] is None:
del ov["visit"]
return ov
def unique_key(self) -> KeyType:
return {"origin": self.origin, "date": str(self.date)}
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class OriginVisitStatus(BaseModel):
"""Represents a visit update of an origin at a given point in time."""
object_type: Final = "origin_visit_status"
origin = attr.ib(type=str, validator=generic_type_validator)
visit = attr.ib(type=int, validator=generic_type_validator)
date = attr.ib(type=datetime.datetime)
status = attr.ib(
type=str,
validator=attr.validators.in_(
["created", "ongoing", "full", "partial", "not_found", "failed"]
),
)
snapshot = attr.ib(
type=Optional[Sha1Git], validator=generic_type_validator, repr=hash_repr
)
# Type is optional be to able to use it before adding it to the database model
type = attr.ib(type=Optional[str], validator=generic_type_validator, default=None)
metadata = attr.ib(
type=Optional[ImmutableDict[str, object]],
validator=generic_type_validator,
converter=freeze_optional_dict,
default=None,
)
@date.validator
def check_date(self, attribute, value):
"""Checks the date has a timezone."""
if value.__class__ is not datetime.datetime:
raise AttributeTypeError(value, attribute)
if value is not None and value.tzinfo is None:
raise ValueError("date must be a timezone-aware datetime.")
def unique_key(self) -> KeyType:
return {"origin": self.origin, "visit": str(self.visit), "date": str(self.date)}
class TargetType(Enum):
"""The type of content pointed to by a snapshot branch. Usually a
revision or an alias."""
CONTENT = "content"
DIRECTORY = "directory"
REVISION = "revision"
RELEASE = "release"
SNAPSHOT = "snapshot"
ALIAS = "alias"
def __repr__(self):
return f"TargetType.{self.name}"
class ObjectType(Enum):
"""The type of content pointed to by a release. Usually a revision"""
CONTENT = "content"
DIRECTORY = "directory"
REVISION = "revision"
RELEASE = "release"
SNAPSHOT = "snapshot"
def __repr__(self):
return f"ObjectType.{self.name}"
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class SnapshotBranch(BaseModel):
"""Represents one of the branches of a snapshot."""
object_type: Final = "snapshot_branch"
target = attr.ib(type=bytes, repr=hash_repr)
target_type = attr.ib(type=TargetType, validator=generic_type_validator)
@target.validator
def check_target(self, attribute, value):
"""Checks the target type is not an alias, checks the target is a
valid sha1_git."""
if value.__class__ is not bytes:
raise AttributeTypeError(value, attribute)
if self.target_type != TargetType.ALIAS and self.target is not None:
if len(value) != 20:
raise ValueError("Wrong length for bytes identifier: %d" % len(value))
@classmethod
def from_dict(cls, d):
return cls(target=d["target"], target_type=TargetType(d["target_type"]))
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class Snapshot(HashableObject, BaseModel):
"""Represents the full state of an origin at a given point in time."""
object_type: Final = "snapshot"
branches = attr.ib(
type=ImmutableDict[bytes, Optional[SnapshotBranch]],
validator=generic_type_validator,
converter=freeze_optional_dict,
)
id = attr.ib(
type=Sha1Git, validator=generic_type_validator, default=b"", repr=hash_repr
)
def _compute_hash_from_attributes(self) -> bytes:
return _compute_hash_from_manifest(
git_objects.snapshot_git_object(self, ignore_unresolved=True)
)
@classmethod
def from_dict(cls, d):
d = d.copy()
return cls(
branches=ImmutableDict(
(name, SnapshotBranch.from_dict(branch) if branch else None)
for (name, branch) in d.pop("branches").items()
),
**d,
)
def swhid(self) -> CoreSWHID:
"""Returns a SWHID representing this object."""
return CoreSWHID(object_type=SwhidObjectType.SNAPSHOT, object_id=self.id)
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class Release(HashableObjectWithManifest, BaseModel):
object_type: Final = "release"
name = attr.ib(type=bytes, validator=generic_type_validator)
message = attr.ib(type=Optional[bytes], validator=generic_type_validator)
target = attr.ib(
type=Optional[Sha1Git], validator=generic_type_validator, repr=hash_repr
)
target_type = attr.ib(type=ObjectType, validator=generic_type_validator)
synthetic = attr.ib(type=bool, validator=generic_type_validator)
author = attr.ib(
type=Optional[Person], validator=generic_type_validator, default=None
)
date = attr.ib(
type=Optional[TimestampWithTimezone],
validator=generic_type_validator,
default=None,
)
metadata = attr.ib(
type=Optional[ImmutableDict[str, object]],
validator=generic_type_validator,
converter=freeze_optional_dict,
default=None,
)
id = attr.ib(
type=Sha1Git, validator=generic_type_validator, default=b"", repr=hash_repr
)
raw_manifest = attr.ib(type=Optional[bytes], default=None)
def _compute_hash_from_attributes(self) -> bytes:
return _compute_hash_from_manifest(git_objects.release_git_object(self))
@author.validator
def check_author(self, attribute, value):
"""If the author is `None`, checks the date is `None` too."""
if self.author is None and self.date is not None:
raise ValueError("release date must be None if author is None.")
def to_dict(self):
rel = super().to_dict()
if rel["metadata"] is None:
del rel["metadata"]
return rel
@classmethod
def from_dict(cls, d):
d = d.copy()
if d.get("author"):
d["author"] = Person.from_dict(d["author"])
if d.get("date"):
d["date"] = TimestampWithTimezone.from_dict(d["date"])
return cls(target_type=ObjectType(d.pop("target_type")), **d)
def swhid(self) -> CoreSWHID:
"""Returns a SWHID representing this object."""
return CoreSWHID(object_type=SwhidObjectType.RELEASE, object_id=self.id)
def anonymize(self) -> "Release":
"""Returns an anonymized version of the Release object.
Anonymization consists in replacing the author with an anonymized Person object.
"""
author = self.author and self.author.anonymize()
return attr.evolve(self, author=author)
class RevisionType(Enum):
GIT = "git"
TAR = "tar"
DSC = "dsc"
SUBVERSION = "svn"
MERCURIAL = "hg"
CVS = "cvs"
BAZAAR = "bzr"
def __repr__(self):
return f"RevisionType.{self.name}"
def tuplify_extra_headers(value: Iterable):
return tuple((k, v) for k, v in value)
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class Revision(HashableObjectWithManifest, BaseModel):
object_type: Final = "revision"
message = attr.ib(type=Optional[bytes], validator=generic_type_validator)
author = attr.ib(type=Optional[Person], validator=generic_type_validator)
committer = attr.ib(type=Optional[Person], validator=generic_type_validator)
date = attr.ib(
type=Optional[TimestampWithTimezone], validator=generic_type_validator
)
committer_date = attr.ib(
type=Optional[TimestampWithTimezone], validator=generic_type_validator
)
type = attr.ib(type=RevisionType, validator=generic_type_validator)
directory = attr.ib(type=Sha1Git, validator=generic_type_validator, repr=hash_repr)
synthetic = attr.ib(type=bool, validator=generic_type_validator)
metadata = attr.ib(
type=Optional[ImmutableDict[str, object]],
validator=generic_type_validator,
converter=freeze_optional_dict,
default=None,
)
parents = attr.ib(
type=Tuple[Sha1Git, ...], validator=generic_type_validator, default=()
)
id = attr.ib(
type=Sha1Git, validator=generic_type_validator, default=b"", repr=hash_repr
)
extra_headers = attr.ib(
type=Tuple[Tuple[bytes, bytes], ...],
validator=generic_type_validator,
converter=tuplify_extra_headers,
default=(),
)
raw_manifest = attr.ib(type=Optional[bytes], default=None)
def __attrs_post_init__(self):
super().__attrs_post_init__()
# ensure metadata is a deep copy of whatever was given, and if needed
# extract extra_headers from there
if self.metadata:
metadata = self.metadata
if not self.extra_headers and "extra_headers" in metadata:
(extra_headers, metadata) = metadata.copy_pop("extra_headers")
object.__setattr__(
self,
"extra_headers",
tuplify_extra_headers(extra_headers),
)
attr.validate(self)
object.__setattr__(self, "metadata", metadata)
def _compute_hash_from_attributes(self) -> bytes:
return _compute_hash_from_manifest(git_objects.revision_git_object(self))
@author.validator
def check_author(self, attribute, value):
"""If the author is `None`, checks the date is `None` too."""
if self.author is None and self.date is not None:
raise ValueError("revision date must be None if author is None.")
@committer.validator
def check_committer(self, attribute, value):
"""If the committer is `None`, checks the committer_date is `None` too."""
if self.committer is None and self.committer_date is not None:
raise ValueError(
"revision committer_date must be None if committer is None."
)
@classmethod
def from_dict(cls, d):
d = d.copy()
date = d.pop("date")
if date:
date = TimestampWithTimezone.from_dict(date)
committer_date = d.pop("committer_date")
if committer_date:
committer_date = TimestampWithTimezone.from_dict(committer_date)
author = d.pop("author")
if author:
author = Person.from_dict(author)
committer = d.pop("committer")
if committer:
committer = Person.from_dict(committer)
return cls(
author=author,
committer=committer,
date=date,
committer_date=committer_date,
type=RevisionType(d.pop("type")),
parents=tuple(d.pop("parents")), # for BW compat
**d,
)
def swhid(self) -> CoreSWHID:
"""Returns a SWHID representing this object."""
return CoreSWHID(object_type=SwhidObjectType.REVISION, object_id=self.id)
def anonymize(self) -> "Revision":
"""Returns an anonymized version of the Revision object.
Anonymization consists in replacing the author and committer with an anonymized
Person object.
"""
return attr.evolve(
self,
author=None if self.author is None else self.author.anonymize(),
committer=None if self.committer is None else self.committer.anonymize(),
)
_DIR_ENTRY_TYPES = ["file", "dir", "rev"]
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class DirectoryEntry(BaseModel):
object_type: Final = "directory_entry"
name = attr.ib(type=bytes)
type = attr.ib(type=str, validator=attr.validators.in_(_DIR_ENTRY_TYPES))
target = attr.ib(type=Sha1Git, validator=generic_type_validator, repr=hash_repr)
perms = attr.ib(type=int, validator=generic_type_validator, converter=int, repr=oct)
"""Usually one of the values of `swh.model.from_disk.DentryPerms`."""
@name.validator
def check_name(self, attribute, value):
if value.__class__ is not bytes:
raise AttributeTypeError(value, attribute)
if b"/" in value:
raise ValueError(f"{value!r} is not a valid directory entry name.")
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class Directory(HashableObjectWithManifest, BaseModel):
object_type: Final = "directory"
entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=generic_type_validator)
id = attr.ib(
type=Sha1Git, validator=generic_type_validator, default=b"", repr=hash_repr
)
raw_manifest = attr.ib(type=Optional[bytes], default=None)
def _compute_hash_from_attributes(self) -> bytes:
return _compute_hash_from_manifest(git_objects.directory_git_object(self))
@entries.validator
def check_entries(self, attribute, value):
seen = set()
for entry in value:
if entry.name in seen:
# Cannot use self.swhid() here, self.id may be None
raise ValueError(
f"swh:1:dir:{hash_to_hex(self.id)} has duplicated entry name: "
f"{entry.name!r}"
)
seen.add(entry.name)
@classmethod
def from_dict(cls, d):
d = d.copy()
return cls(
entries=tuple(
DirectoryEntry.from_dict(entry) for entry in d.pop("entries")
),
**d,
)
def swhid(self) -> CoreSWHID:
"""Returns a SWHID representing this object."""
return CoreSWHID(object_type=SwhidObjectType.DIRECTORY, object_id=self.id)
@classmethod
def from_possibly_duplicated_entries(
cls,
*,
entries: Tuple[DirectoryEntry, ...],
id: Sha1Git = b"",
raw_manifest: Optional[bytes] = None,
) -> Tuple[bool, "Directory"]:
"""Constructs a ``Directory`` object from a list of entries that may contain
duplicated names.
This is required to represent legacy objects, that were ingested in the
storage database before this check was added.
As it is impossible for a ``Directory`` instances to have more than one entry
with a given names, this function computes a ``raw_manifest`` and renames one of
the entries before constructing the ``Directory``.
Returns:
``(is_corrupt, directory)`` where ``is_corrupt`` is True iff some
entry names were indeed duplicated
"""
# First, try building a Directory object normally without any extra computation,
# which works the overwhelming majority of the time:
try:
return (False, Directory(entries=entries, id=id, raw_manifest=raw_manifest))
except ValueError:
pass
# If it fails:
# 1. compute a raw_manifest if there isn't already one:
if raw_manifest is None:
# invalid_directory behaves like a Directory object, but without the
# duplicated entry check; which allows computing its raw_manifest
invalid_directory = type("", (), {})()
invalid_directory.entries = entries
raw_manifest = git_objects.directory_git_object(invalid_directory)
# 2. look for duplicated entries:
entries_by_name: Dict[
bytes, Dict[str, List[DirectoryEntry]]
] = collections.defaultdict(lambda: collections.defaultdict(list))
for entry in entries:
entries_by_name[entry.name][entry.type].append(entry)
# 3. strip duplicates
deduplicated_entries = []
for entry_lists in entries_by_name.values():
# We could pick one entry at random to keep the original name; but we try to
# "minimize" the impact, by preserving entries of type "rev" first
# (because renaming them would likely break git submodules entirely
# when this directory is written to disk),
# then entries of type "dir" (because renaming them affects the path
# of every file in the dir, instead of just one "cnt").
dir_entry_types = ("rev", "dir", "file")
assert set(dir_entry_types) == set(_DIR_ENTRY_TYPES)
picked_winner = False # when True, all future entries must be renamed
for type_ in dir_entry_types:
for entry in entry_lists[type_]:
if not picked_winner:
# this is the "most important" entry according to this
# heuristic; it gets to keep its name.
deduplicated_entries.append(entry)
picked_winner = True
else:
# the heuristic already found an entry more important than
# this one; so this one must be renamed to something.
# we pick the beginning of its hash, it should be good enough
# to avoid any conflict.
new_name = (
entry.name + b"_" + hash_to_bytehex(entry.target)[0:10]
)
renamed_entry = attr.evolve(entry, name=new_name)
deduplicated_entries.append(renamed_entry)
# Finally, return the "fixed" the directory
dir_ = Directory(
entries=tuple(deduplicated_entries), id=id, raw_manifest=raw_manifest
)
return (True, dir_)
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class BaseContent(BaseModel):
status = attr.ib(
type=str, validator=attr.validators.in_(["visible", "hidden", "absent"])
)
@staticmethod
def _hash_data(data: bytes):
"""Hash some data, returning most of the fields of a content object"""
d = MultiHash.from_data(data).digest()
d["data"] = data
d["length"] = len(data)
return d
@classmethod
def from_dict(cls, d, use_subclass=True):
if use_subclass:
# Chooses a subclass to instantiate instead.
if d["status"] == "absent":
return SkippedContent.from_dict(d)
else:
return Content.from_dict(d)
else:
return super().from_dict(d)
def get_hash(self, hash_name):
if hash_name not in DEFAULT_ALGORITHMS:
raise ValueError("{} is not a valid hash name.".format(hash_name))
return getattr(self, hash_name)
def hashes(self) -> Dict[str, bytes]:
"""Returns a dictionary {hash_name: hash_value}"""
return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS}
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class Content(BaseContent):
object_type: Final = "content"
sha1 = attr.ib(type=bytes, validator=generic_type_validator, repr=hash_repr)
sha1_git = attr.ib(type=Sha1Git, validator=generic_type_validator, repr=hash_repr)
sha256 = attr.ib(type=bytes, validator=generic_type_validator, repr=hash_repr)
blake2s256 = attr.ib(type=bytes, validator=generic_type_validator, repr=hash_repr)
length = attr.ib(type=int)
status = attr.ib(
type=str,
validator=attr.validators.in_(["visible", "hidden"]),
default="visible",
)
data = attr.ib(type=Optional[bytes], validator=generic_type_validator, default=None)
ctime = attr.ib(
type=Optional[datetime.datetime],
default=None,
eq=False,
)
@length.validator
def check_length(self, attribute, value):
"""Checks the length is positive."""
if value.__class__ is not int:
raise AttributeTypeError(value, attribute)
if value < 0:
raise ValueError("Length must be positive.")
@ctime.validator
def check_ctime(self, attribute, value):
"""Checks the ctime has a timezone."""
if value is not None:
if value.__class__ is not datetime.datetime:
raise AttributeTypeError(value, attribute)
if value.tzinfo is None:
raise ValueError("ctime must be a timezone-aware datetime.")
def to_dict(self):
content = super().to_dict()
if content["data"] is None:
del content["data"]
if content["ctime"] is None:
del content["ctime"]
return content
@classmethod
def from_data(cls, data, status="visible", ctime=None) -> "Content":
"""Generate a Content from a given `data` byte string.
This populates the Content with the hashes and length for the data
passed as argument, as well as the data itself.
"""
d = cls._hash_data(data)
d["status"] = status
d["ctime"] = ctime
return cls(**d)
@classmethod
def from_dict(cls, d):
if isinstance(d.get("ctime"), str):
d = d.copy()
d["ctime"] = dateutil.parser.parse(d["ctime"])
return super().from_dict(d, use_subclass=False)
def with_data(self) -> "Content":
"""Loads the `data` attribute; meaning that it is guaranteed not to
be None after this call.
This call is almost a no-op, but subclasses may overload this method
to lazy-load data (eg. from disk or objstorage)."""
if self.data is None:
raise MissingData("Content data is None.")
return self
def unique_key(self) -> KeyType:
return self.sha1 # TODO: use a dict of hashes
def swhid(self) -> CoreSWHID:
"""Returns a SWHID representing this object."""
return CoreSWHID(object_type=SwhidObjectType.CONTENT, object_id=self.sha1_git)
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class SkippedContent(BaseContent):
object_type: Final = "skipped_content"
sha1 = attr.ib(
type=Optional[bytes], validator=generic_type_validator, repr=hash_repr
)
sha1_git = attr.ib(
type=Optional[Sha1Git], validator=generic_type_validator, repr=hash_repr
)
sha256 = attr.ib(
type=Optional[bytes], validator=generic_type_validator, repr=hash_repr
)
blake2s256 = attr.ib(
type=Optional[bytes], validator=generic_type_validator, repr=hash_repr
)
length = attr.ib(type=Optional[int])
status = attr.ib(type=str, validator=attr.validators.in_(["absent"]))
reason = attr.ib(type=Optional[str], default=None)
origin = attr.ib(type=Optional[str], validator=generic_type_validator, default=None)
ctime = attr.ib(
type=Optional[datetime.datetime],
validator=generic_type_validator,
default=None,
eq=False,
)
@reason.validator
def check_reason(self, attribute, value):
"""Checks the reason is full if status != absent."""
assert self.reason == value
if value is None:
raise ValueError("Must provide a reason if content is absent.")
elif value.__class__ is not str:
raise AttributeTypeError(value, attribute)
@length.validator
def check_length(self, attribute, value):
"""Checks the length is positive or -1."""
if value.__class__ is not int:
raise AttributeTypeError(value, attribute)
elif value < -1:
raise ValueError("Length must be positive or -1.")
@ctime.validator
def check_ctime(self, attribute, value):
"""Checks the ctime has a timezone."""
if value is not None:
if value.__class__ is not datetime.datetime:
raise AttributeTypeError(value, attribute)
elif value.tzinfo is None:
raise ValueError("ctime must be a timezone-aware datetime.")
def to_dict(self):
content = super().to_dict()
if content["origin"] is None:
del content["origin"]
if content["ctime"] is None:
del content["ctime"]
return content
@classmethod
def from_data(
cls, data: bytes, reason: str, ctime: Optional[datetime.datetime] = None
) -> "SkippedContent":
"""Generate a SkippedContent from a given `data` byte string.
This populates the SkippedContent with the hashes and length for the
data passed as argument.
You can use `attr.evolve` on such a generated content to nullify some
of its attributes, e.g. for tests.
"""
d = cls._hash_data(data)
del d["data"]
d["status"] = "absent"
d["reason"] = reason
d["ctime"] = ctime
return cls(**d)
@classmethod
def from_dict(cls, d):
d2 = d.copy()
if d2.pop("data", None) is not None:
raise ValueError('SkippedContent has no "data" attribute %r' % d)
return super().from_dict(d2, use_subclass=False)
def unique_key(self) -> KeyType:
return self.hashes()
class MetadataAuthorityType(Enum):
DEPOSIT_CLIENT = "deposit_client"
FORGE = "forge"
REGISTRY = "registry"
def __repr__(self):
return f"MetadataAuthorityType.{self.name}"
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class MetadataAuthority(BaseModel):
"""Represents an entity that provides metadata about an origin or
software artifact."""
object_type: Final = "metadata_authority"
type = attr.ib(type=MetadataAuthorityType, validator=generic_type_validator)
url = attr.ib(type=str, validator=generic_type_validator)
metadata = attr.ib(
type=Optional[ImmutableDict[str, Any]],
default=None,
validator=generic_type_validator,
converter=freeze_optional_dict,
)
def to_dict(self):
d = super().to_dict()
if d["metadata"] is None:
del d["metadata"]
return d
@classmethod
def from_dict(cls, d):
d = {
**d,
"type": MetadataAuthorityType(d["type"]),
}
return super().from_dict(d)
def unique_key(self) -> KeyType:
return {"type": self.type.value, "url": self.url}
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class MetadataFetcher(BaseModel):
"""Represents a software component used to fetch metadata from a metadata
authority, and ingest them into the Software Heritage archive."""
object_type: Final = "metadata_fetcher"
name = attr.ib(type=str, validator=generic_type_validator)
version = attr.ib(type=str, validator=generic_type_validator)
metadata = attr.ib(
type=Optional[ImmutableDict[str, Any]],
default=None,
validator=generic_type_validator,
converter=freeze_optional_dict,
)
def to_dict(self):
d = super().to_dict()
if d["metadata"] is None:
del d["metadata"]
return d
def unique_key(self) -> KeyType:
return {"name": self.name, "version": self.version}
def normalize_discovery_date(value: Any) -> datetime.datetime:
if not isinstance(value, datetime.datetime):
raise TypeError("discovery_date must be a timezone-aware datetime.")
if value.tzinfo is None:
raise ValueError("discovery_date must be a timezone-aware datetime.")
# Normalize timezone to utc, and truncate microseconds to 0
return value.astimezone(datetime.timezone.utc).replace(microsecond=0)
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class RawExtrinsicMetadata(HashableObject, BaseModel):
object_type: Final = "raw_extrinsic_metadata"
# target object
target = attr.ib(type=ExtendedSWHID, validator=generic_type_validator)
# source
discovery_date = attr.ib(type=datetime.datetime, converter=normalize_discovery_date)
authority = attr.ib(type=MetadataAuthority, validator=generic_type_validator)
fetcher = attr.ib(type=MetadataFetcher, validator=generic_type_validator)
# the metadata itself
format = attr.ib(type=str, validator=generic_type_validator)
metadata = attr.ib(type=bytes, validator=generic_type_validator)
# context
origin = attr.ib(type=Optional[str], default=None, validator=generic_type_validator)
visit = attr.ib(type=Optional[int], default=None)
snapshot = attr.ib(type=Optional[CoreSWHID], default=None)
release = attr.ib(type=Optional[CoreSWHID], default=None)
revision = attr.ib(type=Optional[CoreSWHID], default=None)
path = attr.ib(type=Optional[bytes], default=None)
directory = attr.ib(type=Optional[CoreSWHID], default=None)
id = attr.ib(
type=Sha1Git, validator=generic_type_validator, default=b"", repr=hash_repr
)
def _compute_hash_from_attributes(self) -> bytes:
return _compute_hash_from_manifest(
git_objects.raw_extrinsic_metadata_git_object(self)
)
@origin.validator
def check_origin(self, attribute, value):
if value is None:
return
if value.__class__ is not str:
raise AttributeTypeError(value, attribute)
obj_type = self.target.object_type
if not (
obj_type is SwhidExtendedObjectType.SNAPSHOT
or obj_type is SwhidExtendedObjectType.RELEASE
or obj_type is SwhidExtendedObjectType.REVISION
or obj_type is SwhidExtendedObjectType.DIRECTORY
or obj_type is SwhidExtendedObjectType.CONTENT
):
raise ValueError(
f"Unexpected 'origin' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
if value.startswith("swh:"):
# Technically this is valid; but:
# 1. SWHIDs are URIs, not URLs
# 2. if a SWHID gets here, it's very likely to be a mistake
# (and we can remove this check if it turns out there is a
# legitimate use for it).
raise ValueError(f"SWHID used as context origin URL: {value}")
@visit.validator
def check_visit(self, attribute, value):
if value is None:
return
if value.__class__ is not int:
raise AttributeTypeError(value, attribute)
obj_type = self.target.object_type
if not (
obj_type is SwhidExtendedObjectType.SNAPSHOT
or obj_type is SwhidExtendedObjectType.RELEASE
or obj_type is SwhidExtendedObjectType.REVISION
or obj_type is SwhidExtendedObjectType.DIRECTORY
or obj_type is SwhidExtendedObjectType.CONTENT
):
raise ValueError(
f"Unexpected 'visit' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
if self.origin is None:
raise ValueError("'origin' context must be set if 'visit' is.")
if value <= 0:
raise ValueError("Nonpositive visit id")
@snapshot.validator
def check_snapshot(self, attribute, value):
if value is None:
return
if value.__class__ is not CoreSWHID:
raise AttributeTypeError(value, attribute)
obj_type = self.target.object_type
if not (
obj_type is SwhidExtendedObjectType.RELEASE
or obj_type is SwhidExtendedObjectType.REVISION
or obj_type is SwhidExtendedObjectType.DIRECTORY
or obj_type is SwhidExtendedObjectType.CONTENT
):
raise ValueError(
f"Unexpected 'snapshot' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
if value.object_type != SwhidObjectType.SNAPSHOT:
raise ValueError(
f"Expected SWHID type 'snapshot', "
f"got '{value.object_type.name.lower()}' in {value}"
)
@release.validator
def check_release(self, attribute, value):
if value is None:
return
if value.__class__ is not CoreSWHID:
raise AttributeTypeError(value, attribute)
obj_type = self.target.object_type
if not (
obj_type is SwhidExtendedObjectType.REVISION
or obj_type is SwhidExtendedObjectType.DIRECTORY
or obj_type is SwhidExtendedObjectType.CONTENT
):
raise ValueError(
f"Unexpected 'release' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
if value.object_type != SwhidObjectType.RELEASE:
raise ValueError(
f"Expected SWHID type 'release', "
f"got '{value.object_type.name.lower()}' in {value}"
)
@revision.validator
def check_revision(self, attribute, value):
if value is None:
return
if value.__class__ is not CoreSWHID:
raise AttributeTypeError(value, attribute)
obj_type = self.target.object_type
if not (
obj_type is SwhidExtendedObjectType.DIRECTORY
or obj_type is SwhidExtendedObjectType.CONTENT
):
raise ValueError(
f"Unexpected 'revision' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
if value.object_type != SwhidObjectType.REVISION:
raise ValueError(
f"Expected SWHID type 'revision', "
f"got '{value.object_type.name.lower()}' in {value}"
)
@path.validator
def check_path(self, attribute, value):
if value is None:
return
if value.__class__ is not bytes:
raise AttributeTypeError(value, attribute)
obj_type = self.target.object_type
if not (
obj_type is SwhidExtendedObjectType.DIRECTORY
or obj_type is SwhidExtendedObjectType.CONTENT
):
raise ValueError(
f"Unexpected 'path' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
@directory.validator
def check_directory(self, attribute, value):
if value is None:
return
if value.__class__ is not CoreSWHID:
raise AttributeTypeError(value, attribute)
if self.target.object_type is not SwhidExtendedObjectType.CONTENT:
raise ValueError(
f"Unexpected 'directory' context for "
f"{self.target.object_type.name.lower()} object: {value}"
)
if value.object_type != SwhidObjectType.DIRECTORY:
raise ValueError(
f"Expected SWHID type 'directory', "
f"got '{value.object_type.name.lower()}' in {value}"
)
def to_dict(self):
d = super().to_dict()
context_keys = (
"origin",
"visit",
"snapshot",
"release",
"revision",
"directory",
"path",
)
for context_key in context_keys:
if d[context_key] is None:
del d[context_key]
return d
@classmethod
def from_dict(cls, d):
if "type" in d:
# Convert from old schema
type_ = d.pop("type")
if type_ == "origin":
d["target"] = str(Origin(d["target"]).swhid())
d = {
**d,
"target": ExtendedSWHID.from_string(d["target"]),
"authority": MetadataAuthority.from_dict(d["authority"]),
"fetcher": MetadataFetcher.from_dict(d["fetcher"]),
}
swhid_keys = ("snapshot", "release", "revision", "directory")
for swhid_key in swhid_keys:
if d.get(swhid_key):
d[swhid_key] = CoreSWHID.from_string(d[swhid_key])
return super().from_dict(d)
def swhid(self) -> ExtendedSWHID:
"""Returns a SWHID representing this RawExtrinsicMetadata object."""
return ExtendedSWHID(
object_type=SwhidExtendedObjectType.RAW_EXTRINSIC_METADATA,
object_id=self.id,
)
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class ExtID(HashableObject, BaseModel):
object_type: Final = "extid"
extid_type = attr.ib(type=str, validator=generic_type_validator)
extid = attr.ib(type=bytes, validator=generic_type_validator)
target = attr.ib(type=CoreSWHID, validator=generic_type_validator)
extid_version = attr.ib(type=int, validator=generic_type_validator, default=0)
id = attr.ib(
type=Sha1Git, validator=generic_type_validator, default=b"", repr=hash_repr
)
@classmethod
def from_dict(cls, d):
return cls(
extid=d["extid"],
extid_type=d["extid_type"],
target=CoreSWHID.from_string(d["target"]),
extid_version=d.get("extid_version", 0),
)
def _compute_hash_from_attributes(self) -> bytes:
return _compute_hash_from_manifest(git_objects.extid_git_object(self))
# Note: we need the type ignore stanza here because mypy cannot figure that all
# subclasses of BaseModel do have an object_type attribute, even if BaseModel
# itself does not (because these are Final)
SWH_MODEL_OBJECT_TYPES: Dict[str, Type[BaseModel]] = {
cls.object_type: cls # type: ignore
for cls in (
Person,
Timestamp,
TimestampWithTimezone,
Origin,
OriginVisit,
OriginVisitStatus,
Snapshot,
SnapshotBranch,
Release,
Revision,
Directory,
DirectoryEntry,
Content,
SkippedContent,
MetadataAuthority,
MetadataFetcher,
RawExtrinsicMetadata,
ExtID,
)
}
diff --git a/swh/model/tests/test_from_disk.py b/swh/model/tests/test_from_disk.py
index b7674d4..c07fef6 100644
--- a/swh/model/tests/test_from_disk.py
+++ b/swh/model/tests/test_from_disk.py
@@ -1,1001 +1,1008 @@
-# Copyright (C) 2017-2020 The Software Heritage developers
+# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
import os
import tarfile
import tempfile
from typing import ClassVar, Optional
import unittest
import pytest
from swh.model import from_disk, model
from swh.model.from_disk import Content, DentryPerms, Directory, DiskBackedContent
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
TEST_DATA = os.path.join(os.path.dirname(__file__), "data")
class ModeToPerms(unittest.TestCase):
def setUp(self):
super().setUp()
# Generate a full permissions map
self.perms_map = {}
# Symlinks
for i in range(0o120000, 0o127777 + 1):
self.perms_map[i] = DentryPerms.symlink
# Directories
for i in range(0o040000, 0o047777 + 1):
self.perms_map[i] = DentryPerms.directory
# Other file types: socket, regular file, block device, character
# device, fifo all map to regular files
for ft in [0o140000, 0o100000, 0o060000, 0o020000, 0o010000]:
for i in range(ft, ft + 0o7777 + 1):
if i & 0o111:
# executable bits are set
self.perms_map[i] = DentryPerms.executable_content
else:
self.perms_map[i] = DentryPerms.content
def test_exhaustive_mode_to_perms(self):
for fmode, perm in self.perms_map.items():
self.assertEqual(perm, from_disk.mode_to_perms(fmode))
class TestDiskBackedContent(unittest.TestCase):
def test_with_data(self):
expected_content = model.Content(
length=42,
status="visible",
data=b"foo bar",
sha1=b"foo",
sha1_git=b"bar",
sha256=b"baz",
blake2s256=b"qux",
)
with tempfile.NamedTemporaryFile(mode="w+b") as fd:
content = DiskBackedContent(
length=42,
status="visible",
path=fd.name,
sha1=b"foo",
sha1_git=b"bar",
sha256=b"baz",
blake2s256=b"qux",
)
fd.write(b"foo bar")
fd.seek(0)
content_with_data = content.with_data()
assert expected_content == content_with_data
def test_lazy_data(self):
with tempfile.NamedTemporaryFile(mode="w+b") as fd:
fd.write(b"foo")
fd.seek(0)
content = DiskBackedContent(
length=42,
status="visible",
path=fd.name,
sha1=b"foo",
sha1_git=b"bar",
sha256=b"baz",
blake2s256=b"qux",
)
fd.write(b"bar")
fd.seek(0)
content_with_data = content.with_data()
fd.write(b"baz")
fd.seek(0)
assert content_with_data.data == b"bar"
def test_with_data_cannot_read(self):
with tempfile.NamedTemporaryFile(mode="w+b") as fd:
content = DiskBackedContent(
length=42,
status="visible",
path=fd.name,
sha1=b"foo",
sha1_git=b"bar",
sha256=b"baz",
blake2s256=b"qux",
)
with pytest.raises(OSError):
content.with_data()
def test_missing_path(self):
with pytest.raises(TypeError):
DiskBackedContent(
length=42,
status="visible",
sha1=b"foo",
sha1_git=b"bar",
sha256=b"baz",
blake2s256=b"qux",
)
with pytest.raises(TypeError):
DiskBackedContent(
length=42,
status="visible",
path=None,
sha1=b"foo",
sha1_git=b"bar",
sha256=b"baz",
blake2s256=b"qux",
)
class DataMixin:
maxDiff = None # type: ClassVar[Optional[int]]
def setUp(self):
self.tmpdir = tempfile.TemporaryDirectory(prefix="swh.model.from_disk")
self.tmpdir_name = os.fsencode(self.tmpdir.name)
self.contents = {
b"file": {
"data": b"42\n",
"sha1": hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"),
"sha256": hash_to_bytes(
"084c799cd551dd1d8d5c5f9a5d593b2e"
"931f5e36122ee5c793c1d08a19839cc0"
),
"sha1_git": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
"blake2s256": hash_to_bytes(
"d5fe1939576527e42cfd76a9455a2432"
"fe7f56669564577dd93c4280e76d661d"
),
"length": 3,
"mode": 0o100644,
},
}
self.symlinks = {
b"symlink": {
"data": b"target",
"blake2s256": hash_to_bytes(
"595d221b30fdd8e10e2fdf18376e688e"
"9f18d56fd9b6d1eb6a822f8c146c6da6"
),
"sha1": hash_to_bytes("0e8a3ad980ec179856012b7eecf4327e99cd44cd"),
"sha1_git": hash_to_bytes("1de565933b05f74c75ff9a6520af5f9f8a5a2f1d"),
"sha256": hash_to_bytes(
"34a04005bcaf206eec990bd9637d9fdb"
"6725e0a0c0d4aebf003f17f4c956eb5c"
),
"length": 6,
"perms": DentryPerms.symlink,
}
}
self.specials = {
b"fifo": os.mkfifo,
}
self.empty_content = {
"data": b"",
"length": 0,
"blake2s256": hash_to_bytes(
"69217a3079908094e11121d042354a7c" "1f55b6482ca1a51e1b250dfd1ed0eef9"
),
"sha1": hash_to_bytes("da39a3ee5e6b4b0d3255bfef95601890afd80709"),
"sha1_git": hash_to_bytes("e69de29bb2d1d6434b8b29ae775ad8c2e48c5391"),
"sha256": hash_to_bytes(
"e3b0c44298fc1c149afbf4c8996fb924" "27ae41e4649b934ca495991b7852b855"
),
"perms": DentryPerms.content,
}
self.empty_directory = {
"id": hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"),
"entries": [],
}
# Generated with generate_testdata_from_disk
self.tarball_contents = {
b"": {
"entries": [
{
"name": b"bar",
"perms": DentryPerms.directory,
"target": hash_to_bytes(
"3c1f578394f4623f74a0ba7fe761729f59fc6ec4"
),
"type": "dir",
},
{
"name": b"empty-folder",
"perms": DentryPerms.directory,
"target": hash_to_bytes(
"4b825dc642cb6eb9a060e54bf8d69288fbee4904"
),
"type": "dir",
},
{
"name": b"foo",
"perms": DentryPerms.directory,
"target": hash_to_bytes(
"2b41c40f0d1fbffcba12497db71fba83fcca96e5"
),
"type": "dir",
},
{
"name": b"link-to-another-quote",
"perms": DentryPerms.symlink,
"target": hash_to_bytes(
"7d5c08111e21c8a9f71540939998551683375fad"
),
"type": "file",
},
{
"name": b"link-to-binary",
"perms": DentryPerms.symlink,
"target": hash_to_bytes(
"e86b45e538d9b6888c969c89fbd22a85aa0e0366"
),
"type": "file",
},
{
"name": b"link-to-foo",
"perms": DentryPerms.symlink,
"target": hash_to_bytes(
"19102815663d23f8b75a47e7a01965dcdc96468c"
),
"type": "file",
},
{
"name": b"some-binary",
"perms": DentryPerms.executable_content,
"target": hash_to_bytes(
"68769579c3eaadbe555379b9c3538e6628bae1eb"
),
"type": "file",
},
],
"id": hash_to_bytes("e8b0f1466af8608c8a3fb9879db172b887e80759"),
},
b"bar": {
"entries": [
{
"name": b"barfoo",
"perms": DentryPerms.directory,
"target": hash_to_bytes(
"c3020f6bf135a38c6df3afeb5fb38232c5e07087"
),
"type": "dir",
}
],
"id": hash_to_bytes("3c1f578394f4623f74a0ba7fe761729f59fc6ec4"),
},
b"bar/barfoo": {
"entries": [
{
"name": b"another-quote.org",
"perms": DentryPerms.content,
"target": hash_to_bytes(
"133693b125bad2b4ac318535b84901ebb1f6b638"
),
"type": "file",
}
],
"id": hash_to_bytes("c3020f6bf135a38c6df3afeb5fb38232c5e07087"),
},
b"bar/barfoo/another-quote.org": {
"blake2s256": hash_to_bytes(
"d26c1cad82d43df0bffa5e7be11a60e3"
"4adb85a218b433cbce5278b10b954fe8"
),
"length": 72,
"perms": DentryPerms.content,
"sha1": hash_to_bytes("90a6138ba59915261e179948386aa1cc2aa9220a"),
"sha1_git": hash_to_bytes("133693b125bad2b4ac318535b84901ebb1f6b638"),
"sha256": hash_to_bytes(
"3db5ae168055bcd93a4d08285dc99ffe"
"e2883303b23fac5eab850273a8ea5546"
),
},
b"empty-folder": {
"entries": [],
"id": hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"),
},
b"foo": {
"entries": [
{
"name": b"barfoo",
"perms": DentryPerms.symlink,
"target": hash_to_bytes(
"8185dfb2c0c2c597d16f75a8a0c37668567c3d7e"
),
"type": "file",
},
{
"name": b"quotes.md",
"perms": DentryPerms.content,
"target": hash_to_bytes(
"7c4c57ba9ff496ad179b8f65b1d286edbda34c9a"
),
"type": "file",
},
{
"name": b"rel-link-to-barfoo",
"perms": DentryPerms.symlink,
"target": hash_to_bytes(
"acac326ddd63b0bc70840659d4ac43619484e69f"
),
"type": "file",
},
],
"id": hash_to_bytes("2b41c40f0d1fbffcba12497db71fba83fcca96e5"),
},
b"foo/barfoo": {
"blake2s256": hash_to_bytes(
"e1252f2caa4a72653c4efd9af871b62b"
"f2abb7bb2f1b0e95969204bd8a70d4cd"
),
"data": b"bar/barfoo",
"length": 10,
"perms": DentryPerms.symlink,
"sha1": hash_to_bytes("9057ee6d0162506e01c4d9d5459a7add1fedac37"),
"sha1_git": hash_to_bytes("8185dfb2c0c2c597d16f75a8a0c37668567c3d7e"),
"sha256": hash_to_bytes(
"29ad3f5725321b940332c78e403601af"
"ff61daea85e9c80b4a7063b6887ead68"
),
},
b"foo/quotes.md": {
"blake2s256": hash_to_bytes(
"bf7ce4fe304378651ee6348d3e9336ed"
"5ad603d33e83c83ba4e14b46f9b8a80b"
),
"length": 66,
"perms": DentryPerms.content,
"sha1": hash_to_bytes("1bf0bb721ac92c18a19b13c0eb3d741cbfadebfc"),
"sha1_git": hash_to_bytes("7c4c57ba9ff496ad179b8f65b1d286edbda34c9a"),
"sha256": hash_to_bytes(
"caca942aeda7b308859eb56f909ec96d"
"07a499491690c453f73b9800a93b1659"
),
},
b"foo/rel-link-to-barfoo": {
"blake2s256": hash_to_bytes(
"d9c327421588a1cf61f316615005a2e9"
"c13ac3a4e96d43a24138d718fa0e30db"
),
"data": b"../bar/barfoo",
"length": 13,
"perms": DentryPerms.symlink,
"sha1": hash_to_bytes("dc51221d308f3aeb2754db48391b85687c2869f4"),
"sha1_git": hash_to_bytes("acac326ddd63b0bc70840659d4ac43619484e69f"),
"sha256": hash_to_bytes(
"8007d20db2af40435f42ddef4b8ad76b"
"80adbec26b249fdf0473353f8d99df08"
),
},
b"link-to-another-quote": {
"blake2s256": hash_to_bytes(
"2d0e73cea01ba949c1022dc10c8a43e6"
"6180639662e5dc2737b843382f7b1910"
),
"data": b"bar/barfoo/another-quote.org",
"length": 28,
"perms": DentryPerms.symlink,
"sha1": hash_to_bytes("cbeed15e79599c90de7383f420fed7acb48ea171"),
"sha1_git": hash_to_bytes("7d5c08111e21c8a9f71540939998551683375fad"),
"sha256": hash_to_bytes(
"e6e17d0793aa750a0440eb9ad5b80b25"
"8076637ef0fb68f3ac2e59e4b9ac3ba6"
),
},
b"link-to-binary": {
"blake2s256": hash_to_bytes(
"9ce18b1adecb33f891ca36664da676e1"
"2c772cc193778aac9a137b8dc5834b9b"
),
"data": b"some-binary",
"length": 11,
"perms": DentryPerms.symlink,
"sha1": hash_to_bytes("d0248714948b3a48a25438232a6f99f0318f59f1"),
"sha1_git": hash_to_bytes("e86b45e538d9b6888c969c89fbd22a85aa0e0366"),
"sha256": hash_to_bytes(
"14126e97d83f7d261c5a6889cee73619"
"770ff09e40c5498685aba745be882eff"
),
},
b"link-to-foo": {
"blake2s256": hash_to_bytes(
"08d6cad88075de8f192db097573d0e82"
"9411cd91eb6ec65e8fc16c017edfdb74"
),
"data": b"foo",
"length": 3,
"perms": DentryPerms.symlink,
"sha1": hash_to_bytes("0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"),
"sha1_git": hash_to_bytes("19102815663d23f8b75a47e7a01965dcdc96468c"),
"sha256": hash_to_bytes(
"2c26b46b68ffc68ff99b453c1d304134"
"13422d706483bfa0f98a5e886266e7ae"
),
},
b"some-binary": {
"blake2s256": hash_to_bytes(
"922e0f7015035212495b090c27577357"
"a740ddd77b0b9e0cd23b5480c07a18c6"
),
"length": 5,
"perms": DentryPerms.executable_content,
"sha1": hash_to_bytes("0bbc12d7f4a2a15b143da84617d95cb223c9b23c"),
"sha1_git": hash_to_bytes("68769579c3eaadbe555379b9c3538e6628bae1eb"),
"sha256": hash_to_bytes(
"bac650d34a7638bb0aeb5342646d24e3"
"b9ad6b44c9b383621faa482b990a367d"
),
},
}
def tearDown(self):
self.tmpdir.cleanup()
def assertContentEqual(self, left, right, *, check_path=False): # noqa
if not isinstance(left, Content):
raise ValueError("%s is not a Content" % left)
if isinstance(right, Content):
right = right.get_data()
# Compare dictionaries
keys = DEFAULT_ALGORITHMS | {
"length",
"perms",
}
if check_path:
keys |= {"path"}
failed = []
for key in keys:
try:
lvalue = left.data[key]
if key == "perms" and "perms" not in right:
rvalue = from_disk.mode_to_perms(right["mode"])
else:
rvalue = right[key]
except KeyError:
failed.append(key)
continue
if lvalue != rvalue:
failed.append(key)
if failed:
raise self.failureException(
"Content mismatched:\n"
+ "\n".join(
"content[%s] = %r != %r" % (key, left.data.get(key), right.get(key))
for key in failed
)
)
def assertDirectoryEqual(self, left, right): # NoQA
if not isinstance(left, Directory):
raise ValueError("%s is not a Directory" % left)
if isinstance(right, Directory):
right = right.get_data()
assert left.entries == right["entries"]
assert left.hash == right["id"]
assert left.to_model() == model.Directory.from_dict(right)
def make_contents(self, directory):
for filename, content in self.contents.items():
path = os.path.join(directory, filename)
with open(path, "wb") as f:
f.write(content["data"])
os.chmod(path, content["mode"])
def make_symlinks(self, directory):
for filename, symlink in self.symlinks.items():
path = os.path.join(directory, filename)
os.symlink(symlink["data"], path)
def make_specials(self, directory):
for filename, fn in self.specials.items():
path = os.path.join(directory, filename)
fn(path)
def make_from_tarball(self, directory):
tarball = os.path.join(TEST_DATA, "dir-folders", "sample-folder.tgz")
with tarfile.open(tarball, "r:gz") as f:
f.extractall(os.fsdecode(directory))
class TestContent(DataMixin, unittest.TestCase):
def setUp(self):
super().setUp()
def test_data_to_content(self):
for filename, content in self.contents.items():
conv_content = Content.from_bytes(
mode=content["mode"], data=content["data"]
)
self.assertContentEqual(conv_content, content)
self.assertIn(hash_to_hex(conv_content.hash), repr(conv_content))
def test_content_swhid(self):
for _, content in self.contents.items():
content_res = Content.from_bytes(mode=content["mode"], data=content["data"])
content_swhid = "swh:1:cnt:" + hash_to_hex(content["sha1_git"])
assert str(content_res.swhid()) == content_swhid
class TestDirectory(DataMixin, unittest.TestCase):
def setUp(self):
super().setUp()
def test_directory_swhid(self):
directory_swhid = "swh:1:dir:" + hash_to_hex(self.empty_directory["id"])
directory = Directory.from_disk(path=self.tmpdir_name)
assert str(directory.swhid()) == directory_swhid
class SymlinkToContent(DataMixin, unittest.TestCase):
def setUp(self):
super().setUp()
self.make_symlinks(self.tmpdir_name)
def test_symlink_to_content(self):
for filename, symlink in self.symlinks.items():
path = os.path.join(self.tmpdir_name, filename)
perms = 0o120000
conv_content = Content.from_symlink(path=path, mode=perms)
self.assertContentEqual(conv_content, symlink)
def test_symlink_to_base_model(self):
for filename, symlink in self.symlinks.items():
path = os.path.join(self.tmpdir_name, filename)
perms = 0o120000
model_content = Content.from_symlink(path=path, mode=perms).to_model()
right = symlink.copy()
for key in ("perms", "path", "mode"):
right.pop(key, None)
right["status"] = "visible"
assert model_content == model.Content.from_dict(right)
class FileToContent(DataMixin, unittest.TestCase):
def setUp(self):
super().setUp()
self.make_contents(self.tmpdir_name)
self.make_symlinks(self.tmpdir_name)
self.make_specials(self.tmpdir_name)
def test_symlink_to_content(self):
for filename, symlink in self.symlinks.items():
path = os.path.join(self.tmpdir_name, filename)
conv_content = Content.from_file(path=path)
self.assertContentEqual(conv_content, symlink)
def test_file_to_content(self):
for filename, content in self.contents.items():
path = os.path.join(self.tmpdir_name, filename)
conv_content = Content.from_file(path=path)
self.assertContentEqual(conv_content, content)
def test_special_to_content(self):
for filename in self.specials:
path = os.path.join(self.tmpdir_name, filename)
conv_content = Content.from_file(path=path)
self.assertContentEqual(conv_content, self.empty_content)
for path in ["/dev/null", "/dev/zero"]:
path = os.path.join(self.tmpdir_name, filename)
conv_content = Content.from_file(path=path)
self.assertContentEqual(conv_content, self.empty_content)
def test_symlink_to_content_model(self):
for filename, symlink in self.symlinks.items():
path = os.path.join(self.tmpdir_name, filename)
model_content = Content.from_file(path=path).to_model()
right = symlink.copy()
for key in ("perms", "path", "mode"):
right.pop(key, None)
right["status"] = "visible"
assert model_content == model.Content.from_dict(right)
def test_file_to_content_model(self):
for filename, content in self.contents.items():
path = os.path.join(self.tmpdir_name, filename)
model_content = Content.from_file(path=path).to_model()
right = content.copy()
for key in ("perms", "mode"):
right.pop(key, None)
assert model_content.with_data() == model.Content.from_dict(right)
right["path"] = path
del right["data"]
assert model_content == DiskBackedContent.from_dict(right)
def test_special_to_content_model(self):
for filename in self.specials:
path = os.path.join(self.tmpdir_name, filename)
model_content = Content.from_file(path=path).to_model()
right = self.empty_content.copy()
for key in ("perms", "path", "mode"):
right.pop(key, None)
right["status"] = "visible"
assert model_content == model.Content.from_dict(right)
for path in ["/dev/null", "/dev/zero"]:
model_content = Content.from_file(path=path).to_model()
right = self.empty_content.copy()
for key in ("perms", "path", "mode"):
right.pop(key, None)
right["status"] = "visible"
assert model_content == model.Content.from_dict(right)
def test_symlink_max_length(self):
for max_content_length in [4, 10]:
for filename, symlink in self.symlinks.items():
path = os.path.join(self.tmpdir_name, filename)
content = Content.from_file(path=path)
if content.data["length"] > max_content_length:
with pytest.raises(Exception, match="too large"):
Content.from_file(
path=path, max_content_length=max_content_length
)
else:
limited_content = Content.from_file(
path=path, max_content_length=max_content_length
)
assert content == limited_content
def test_file_max_length(self):
for max_content_length in [2, 4]:
for filename, content in self.contents.items():
path = os.path.join(self.tmpdir_name, filename)
content = Content.from_file(path=path)
limited_content = Content.from_file(
path=path, max_content_length=max_content_length
)
assert content.data["length"] == limited_content.data["length"]
assert content.data["status"] == "visible"
if content.data["length"] > max_content_length:
assert limited_content.data["status"] == "absent"
assert limited_content.data["reason"] == "Content too large"
else:
assert limited_content.data["status"] == "visible"
def test_special_file_max_length(self):
for max_content_length in [None, 0, 1]:
for filename in self.specials:
path = os.path.join(self.tmpdir_name, filename)
content = Content.from_file(path=path)
limited_content = Content.from_file(
path=path, max_content_length=max_content_length
)
assert limited_content == content
def test_file_to_content_with_path(self):
for filename, content in self.contents.items():
content_w_path = content.copy()
path = os.path.join(self.tmpdir_name, filename)
content_w_path["path"] = path
conv_content = Content.from_file(path=path)
self.assertContentEqual(conv_content, content_w_path, check_path=True)
@pytest.mark.fs
class DirectoryToObjects(DataMixin, unittest.TestCase):
def setUp(self):
super().setUp()
contents = os.path.join(self.tmpdir_name, b"contents")
os.mkdir(contents)
self.make_contents(contents)
symlinks = os.path.join(self.tmpdir_name, b"symlinks")
os.mkdir(symlinks)
self.make_symlinks(symlinks)
specials = os.path.join(self.tmpdir_name, b"specials")
os.mkdir(specials)
self.make_specials(specials)
empties = os.path.join(self.tmpdir_name, b"empty1", b"empty2")
os.makedirs(empties)
+ def check_collect(
+ self, directory, expected_directory_count, expected_content_count
+ ):
+ objs = directory.collect()
+ contents = []
+ directories = []
+ for obj in objs:
+ if isinstance(obj, Content):
+ contents.append(obj)
+ elif isinstance(obj, Directory):
+ directories.append(obj)
+
+ self.assertEqual(len(directories), expected_directory_count)
+ self.assertEqual(len(contents), expected_content_count)
+
def test_directory_to_objects(self):
directory = Directory.from_disk(path=self.tmpdir_name)
for name, value in self.contents.items():
self.assertContentEqual(directory[b"contents/" + name], value)
for name, value in self.symlinks.items():
self.assertContentEqual(directory[b"symlinks/" + name], value)
for name in self.specials:
self.assertContentEqual(
directory[b"specials/" + name],
self.empty_content,
)
self.assertEqual(
directory[b"empty1/empty2"].get_data(),
self.empty_directory,
)
# Raise on non existent file
with self.assertRaisesRegex(KeyError, "b'nonexistent'"):
directory[b"empty1/nonexistent"]
# Raise on non existent directory
with self.assertRaisesRegex(KeyError, "b'nonexistentdir'"):
directory[b"nonexistentdir/file"]
- objs = directory.collect()
-
- self.assertCountEqual(["content", "directory"], objs)
-
- self.assertEqual(len(objs["directory"]), 6)
- self.assertEqual(
- len(objs["content"]), len(self.contents) + len(self.symlinks) + 1
+ self.check_collect(
+ directory,
+ expected_directory_count=6,
+ expected_content_count=len(self.contents) + len(self.symlinks) + 1,
)
def test_directory_to_objects_ignore_empty(self):
directory = Directory.from_disk(
path=self.tmpdir_name, dir_filter=from_disk.ignore_empty_directories
)
for name, value in self.contents.items():
self.assertContentEqual(directory[b"contents/" + name], value)
for name, value in self.symlinks.items():
self.assertContentEqual(directory[b"symlinks/" + name], value)
for name in self.specials:
self.assertContentEqual(
directory[b"specials/" + name],
self.empty_content,
)
# empty directories have been ignored recursively
with self.assertRaisesRegex(KeyError, "b'empty1'"):
directory[b"empty1"]
with self.assertRaisesRegex(KeyError, "b'empty1'"):
directory[b"empty1/empty2"]
- objs = directory.collect()
-
- self.assertCountEqual(["content", "directory"], objs)
-
- self.assertEqual(len(objs["directory"]), 4)
- self.assertEqual(
- len(objs["content"]), len(self.contents) + len(self.symlinks) + 1
+ self.check_collect(
+ directory,
+ expected_directory_count=4,
+ expected_content_count=len(self.contents) + len(self.symlinks) + 1,
)
def test_directory_to_objects_ignore_name(self):
directory = Directory.from_disk(
path=self.tmpdir_name,
dir_filter=from_disk.ignore_named_directories([b"symlinks"]),
)
for name, value in self.contents.items():
self.assertContentEqual(directory[b"contents/" + name], value)
for name in self.specials:
self.assertContentEqual(
directory[b"specials/" + name],
self.empty_content,
)
self.assertEqual(
directory[b"empty1/empty2"].get_data(),
self.empty_directory,
)
with self.assertRaisesRegex(KeyError, "b'symlinks'"):
directory[b"symlinks"]
- objs = directory.collect()
-
- self.assertCountEqual(["content", "directory"], objs)
-
- self.assertEqual(len(objs["directory"]), 5)
- self.assertEqual(len(objs["content"]), len(self.contents) + 1)
+ self.check_collect(
+ directory,
+ expected_directory_count=5,
+ expected_content_count=len(self.contents) + 1,
+ )
def test_directory_to_objects_ignore_name_case(self):
directory = Directory.from_disk(
path=self.tmpdir_name,
dir_filter=from_disk.ignore_named_directories(
[b"symLiNks"], case_sensitive=False
),
)
for name, value in self.contents.items():
self.assertContentEqual(directory[b"contents/" + name], value)
for name in self.specials:
self.assertContentEqual(
directory[b"specials/" + name],
self.empty_content,
)
self.assertEqual(
directory[b"empty1/empty2"].get_data(),
self.empty_directory,
)
with self.assertRaisesRegex(KeyError, "b'symlinks'"):
directory[b"symlinks"]
- objs = directory.collect()
-
- self.assertCountEqual(["content", "directory"], objs)
-
- self.assertEqual(len(objs["directory"]), 5)
- self.assertEqual(len(objs["content"]), len(self.contents) + 1)
+ self.check_collect(
+ directory,
+ expected_directory_count=5,
+ expected_content_count=len(self.contents) + 1,
+ )
def test_directory_entry_order(self):
with tempfile.TemporaryDirectory() as dirname:
dirname = os.fsencode(dirname)
open(os.path.join(dirname, b"foo."), "a")
open(os.path.join(dirname, b"foo0"), "a")
os.mkdir(os.path.join(dirname, b"foo"))
directory = Directory.from_disk(path=dirname)
assert [entry["name"] for entry in directory.entries] == [
b"foo.",
b"foo",
b"foo0",
]
@pytest.mark.fs
class TarballTest(DataMixin, unittest.TestCase):
def setUp(self):
super().setUp()
self.make_from_tarball(self.tmpdir_name)
def test_contents_match(self):
directory = Directory.from_disk(
path=os.path.join(self.tmpdir_name, b"sample-folder")
)
for name, expected in self.tarball_contents.items():
obj = directory[name]
if isinstance(obj, Content):
self.assertContentEqual(obj, expected)
elif isinstance(obj, Directory):
self.assertDirectoryEqual(obj, expected)
else:
raise self.failureException("Unknown type for %s" % obj)
class TarballIterDirectory(DataMixin, unittest.TestCase):
def setUp(self):
super().setUp()
self.make_from_tarball(self.tmpdir_name)
def test_iter_directory(self):
"""Iter from_disk.directory should yield the full arborescence tree"""
directory = Directory.from_disk(
path=os.path.join(self.tmpdir_name, b"sample-folder")
)
contents, skipped_contents, directories = from_disk.iter_directory(directory)
expected_nb = defaultdict(int)
for name in self.tarball_contents.keys():
obj = directory[name]
expected_nb[obj.object_type] += 1
assert len(contents) == expected_nb["content"] and len(contents) > 0
assert len(skipped_contents) == 0
assert len(directories) == expected_nb["directory"] and len(directories) > 0
class DirectoryManipulation(DataMixin, unittest.TestCase):
def test_directory_access_nested(self):
d = Directory()
d[b"a"] = Directory()
d[b"a/b"] = Directory()
self.assertEqual(d[b"a/b"].get_data(), self.empty_directory)
def test_directory_del_nested(self):
d = Directory()
d[b"a"] = Directory()
d[b"a/b"] = Directory()
with self.assertRaisesRegex(KeyError, "b'c'"):
del d[b"a/b/c"]
with self.assertRaisesRegex(KeyError, "b'level2'"):
del d[b"a/level2/c"]
del d[b"a/b"]
self.assertEqual(d[b"a"].get_data(), self.empty_directory)
def test_directory_access_self(self):
d = Directory()
self.assertIs(d, d[b""])
self.assertIs(d, d[b"/"])
self.assertIs(d, d[b"//"])
def test_directory_access_wrong_type(self):
d = Directory()
with self.assertRaisesRegex(ValueError, "bytes from Directory"):
d["foo"]
with self.assertRaisesRegex(ValueError, "bytes from Directory"):
d[42]
def test_directory_repr(self):
entries = [b"a", b"b", b"c"]
d = Directory()
for entry in entries:
d[entry] = Directory()
r = repr(d)
self.assertIn(hash_to_hex(d.hash), r)
for entry in entries:
self.assertIn(str(entry), r)
def test_directory_set_wrong_type_name(self):
d = Directory()
with self.assertRaisesRegex(ValueError, "bytes Directory entry"):
d["foo"] = Directory()
with self.assertRaisesRegex(ValueError, "bytes Directory entry"):
d[42] = Directory()
def test_directory_set_nul_in_name(self):
d = Directory()
with self.assertRaisesRegex(ValueError, "nul bytes"):
d[b"\x00\x01"] = Directory()
def test_directory_set_empty_name(self):
d = Directory()
with self.assertRaisesRegex(ValueError, "must have a name"):
d[b""] = Directory()
with self.assertRaisesRegex(ValueError, "must have a name"):
d[b"/"] = Directory()
def test_directory_set_wrong_type(self):
d = Directory()
with self.assertRaisesRegex(ValueError, "Content or Directory"):
d[b"entry"] = object()
def test_directory_del_wrong_type(self):
d = Directory()
with self.assertRaisesRegex(ValueError, "bytes Directory entry"):
del d["foo"]
with self.assertRaisesRegex(ValueError, "bytes Directory entry"):
del d[42]
def test_directory_contains(self):
d = Directory()
d[b"a"] = Directory()
d[b"a/b"] = Directory()
d[b"a/b/c"] = Directory()
d[b"a/b/c/d"] = Content()
self.assertIn(b"a", d)
self.assertIn(b"a/b", d)
self.assertIn(b"a/b/c", d)
self.assertIn(b"a/b/c/d", d)
self.assertNotIn(b"b", d)
self.assertNotIn(b"b/c", d)
self.assertNotIn(b"b/c/d", d)
diff --git a/swh/model/tests/test_merkle.py b/swh/model/tests/test_merkle.py
index 52edb2c..a852541 100644
--- a/swh/model/tests/test_merkle.py
+++ b/swh/model/tests/test_merkle.py
@@ -1,267 +1,262 @@
-# Copyright (C) 2017-2020 The Software Heritage developers
+# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from swh.model import merkle
class MerkleTestNode(merkle.MerkleNode):
object_type = "tested_merkle_node_type"
def __init__(self, data):
super().__init__(data)
self.compute_hash_called = 0
- def compute_hash(self):
+ def compute_hash(self) -> bytes:
self.compute_hash_called += 1
child_data = [child + b"=" + self[child].hash for child in sorted(self)]
-
- return b"hash(" + b", ".join([self.data["value"]] + child_data) + b")"
+ return b"hash(" + b", ".join([self.data.get("value", b"")] + child_data) + b")"
class MerkleTestLeaf(merkle.MerkleLeaf):
object_type = "tested_merkle_leaf_type"
def __init__(self, data):
super().__init__(data)
self.compute_hash_called = 0
def compute_hash(self):
self.compute_hash_called += 1
- return b"hash(" + self.data["value"] + b")"
+ return b"hash(" + self.data.get("value", b"") + b")"
class TestMerkleLeaf(unittest.TestCase):
def setUp(self):
self.data = {"value": b"value"}
self.instance = MerkleTestLeaf(self.data)
def test_equality(self):
leaf1 = MerkleTestLeaf(self.data)
leaf2 = MerkleTestLeaf(self.data)
leaf3 = MerkleTestLeaf({})
self.assertEqual(leaf1, leaf2)
self.assertNotEqual(leaf1, leaf3)
def test_hash(self):
self.assertEqual(self.instance.compute_hash_called, 0)
instance_hash = self.instance.hash
self.assertEqual(self.instance.compute_hash_called, 1)
instance_hash2 = self.instance.hash
self.assertEqual(self.instance.compute_hash_called, 1)
self.assertEqual(instance_hash, instance_hash2)
def test_data(self):
self.assertEqual(self.instance.get_data(), self.data)
def test_collect(self):
collected = self.instance.collect()
self.assertEqual(
collected,
- {
- self.instance.object_type: {
- self.instance.hash: self.instance.get_data(),
- },
- },
+ {self.instance},
)
collected2 = self.instance.collect()
- self.assertEqual(collected2, {})
+ self.assertEqual(collected2, set())
self.instance.reset_collect()
collected3 = self.instance.collect()
self.assertEqual(collected, collected3)
def test_leaf(self):
with self.assertRaisesRegex(ValueError, "is a leaf"):
self.instance[b"key1"] = "Test"
with self.assertRaisesRegex(ValueError, "is a leaf"):
del self.instance[b"key1"]
with self.assertRaisesRegex(ValueError, "is a leaf"):
self.instance[b"key1"]
with self.assertRaisesRegex(ValueError, "is a leaf"):
self.instance.update(self.data)
class TestMerkleNode(unittest.TestCase):
maxDiff = None
def setUp(self):
self.root = MerkleTestNode({"value": b"root"})
self.nodes = {b"root": self.root}
for i in (b"a", b"b", b"c"):
value = b"root/" + i
node = MerkleTestNode(
{
"value": value,
}
)
self.root[i] = node
self.nodes[value] = node
for j in (b"a", b"b", b"c"):
value2 = value + b"/" + j
node2 = MerkleTestNode(
{
"value": value2,
}
)
node[j] = node2
self.nodes[value2] = node2
for k in (b"a", b"b", b"c"):
value3 = value2 + b"/" + j
node3 = MerkleTestNode(
{
"value": value3,
}
)
node2[j] = node3
self.nodes[value3] = node3
def test_equality(self):
- node1 = merkle.MerkleNode({"foo": b"bar"})
- node2 = merkle.MerkleNode({"foo": b"bar"})
- node3 = merkle.MerkleNode({})
+ node1 = MerkleTestNode({"value": b"bar"})
+ node2 = MerkleTestNode({"value": b"bar"})
+ node3 = MerkleTestNode({})
self.assertEqual(node1, node2)
self.assertNotEqual(node1, node3, node1 == node3)
- node1["foo"] = node3
+ node1[b"a"] = node3
self.assertNotEqual(node1, node2)
- node2["foo"] = node3
+ node2[b"a"] = node3
self.assertEqual(node1, node2)
def test_hash(self):
for node in self.nodes.values():
self.assertEqual(node.compute_hash_called, 0)
# Root hash will compute hash for all the nodes
hash = self.root.hash
for node in self.nodes.values():
self.assertEqual(node.compute_hash_called, 1)
self.assertIn(node.data["value"], hash)
# Should use the cached value
hash2 = self.root.hash
self.assertEqual(hash, hash2)
for node in self.nodes.values():
self.assertEqual(node.compute_hash_called, 1)
# Should still use the cached value
hash3 = self.root.update_hash(force=False)
self.assertEqual(hash, hash3)
for node in self.nodes.values():
self.assertEqual(node.compute_hash_called, 1)
# Force update of the cached value for a deeply nested node
self.root[b"a"][b"b"].update_hash(force=True)
for key, node in self.nodes.items():
# update_hash rehashes all children
if key.startswith(b"root/a/b"):
self.assertEqual(node.compute_hash_called, 2)
else:
self.assertEqual(node.compute_hash_called, 1)
hash4 = self.root.hash
self.assertEqual(hash, hash4)
for key, node in self.nodes.items():
# update_hash also invalidates all parents
if key in (b"root", b"root/a") or key.startswith(b"root/a/b"):
self.assertEqual(node.compute_hash_called, 2)
else:
self.assertEqual(node.compute_hash_called, 1)
def test_collect(self):
collected = self.root.collect()
- self.assertEqual(len(collected[self.root.object_type]), len(self.nodes))
+ self.assertEqual(collected, set(self.nodes.values()))
for node in self.nodes.values():
self.assertTrue(node.collected)
collected2 = self.root.collect()
- self.assertEqual(collected2, {})
+ self.assertEqual(collected2, set())
def test_iter_tree_with_deduplication(self):
nodes = list(self.root.iter_tree())
self.assertCountEqual(nodes, self.nodes.values())
def test_iter_tree_without_deduplication(self):
# duplicate existing hash in merkle tree
self.root[b"d"] = MerkleTestNode({"value": b"root/c/c/c"})
nodes_dedup = list(self.root.iter_tree())
nodes = list(self.root.iter_tree(dedup=False))
assert nodes != nodes_dedup
assert len(nodes) == len(nodes_dedup) + 1
def test_get(self):
for key in (b"a", b"b", b"c"):
self.assertEqual(self.root[key], self.nodes[b"root/" + key])
with self.assertRaisesRegex(KeyError, "b'nonexistent'"):
self.root[b"nonexistent"]
def test_del(self):
hash_root = self.root.hash
hash_a = self.nodes[b"root/a"].hash
del self.root[b"a"][b"c"]
hash_root2 = self.root.hash
hash_a2 = self.nodes[b"root/a"].hash
self.assertNotEqual(hash_root, hash_root2)
self.assertNotEqual(hash_a, hash_a2)
self.assertEqual(self.nodes[b"root/a/c"].parents, [])
with self.assertRaisesRegex(KeyError, "b'nonexistent'"):
del self.root[b"nonexistent"]
def test_update(self):
hash_root = self.root.hash
hash_b = self.root[b"b"].hash
new_children = {
b"c": MerkleTestNode({"value": b"root/b/new_c"}),
b"d": MerkleTestNode({"value": b"root/b/d"}),
}
# collect all nodes
self.root.collect()
self.root[b"b"].update(new_children)
# Ensure everyone got reparented
self.assertEqual(new_children[b"c"].parents, [self.root[b"b"]])
self.assertEqual(new_children[b"d"].parents, [self.root[b"b"]])
self.assertEqual(self.nodes[b"root/b/c"].parents, [])
hash_root2 = self.root.hash
self.assertNotEqual(hash_root, hash_root2)
self.assertIn(b"root/b/new_c", hash_root2)
self.assertIn(b"root/b/d", hash_root2)
hash_b2 = self.root[b"b"].hash
self.assertNotEqual(hash_b, hash_b2)
for key, node in self.nodes.items():
if key in (b"root", b"root/b"):
self.assertEqual(node.compute_hash_called, 2)
else:
self.assertEqual(node.compute_hash_called, 1)
# Ensure we collected root, root/b, and both new children
collected_after_update = self.root.collect()
- self.assertCountEqual(
- collected_after_update[MerkleTestNode.object_type],
- [
- self.nodes[b"root"].hash,
- self.nodes[b"root/b"].hash,
- new_children[b"c"].hash,
- new_children[b"d"].hash,
- ],
+ self.assertEqual(
+ collected_after_update,
+ {
+ self.nodes[b"root"],
+ self.nodes[b"root/b"],
+ new_children[b"c"],
+ new_children[b"d"],
+ },
)
# test that noop updates doesn't invalidate anything
self.root[b"a"][b"b"].update({})
- self.assertEqual(self.root.collect(), {})
+ self.assertEqual(self.root.collect(), set())