diff --git a/PKG-INFO b/PKG-INFO
index 854b38bc..416480d2 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,250 +1,250 @@
Metadata-Version: 2.1
Name: swh.storage
-Version: 0.37.1
+Version: 0.38.0
Summary: Software Heritage storage manager
Home-page: https://forge.softwareheritage.org/diffusion/DSTO/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-storage
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-storage/
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: testing
Provides-Extra: journal
License-File: LICENSE
License-File: AUTHORS

swh-storage
===========

Abstraction layer over the archive, allowing access to all stored source code artifacts as well as their metadata.

See the [documentation](https://docs.softwareheritage.org/devel/swh-storage/index.html) for more details.

## Quick start

### Dependencies

The Python tests for this module include tests that cannot run without a local PostgreSQL database, so you need the PostgreSQL server executable on your machine (no need to have a running PostgreSQL server). They also expect a Cassandra server.

#### Debian-like host

```
$ sudo apt install libpq-dev postgresql-11 cassandra
```

#### Non Debian-like host

The tests expect the path to the `cassandra` executable either to be unspecified, in which case it is looked up at `/usr/sbin/cassandra`, or to be specified through the `SWH_CASSANDRA_BIN` environment variable.

Optionally, you can skip the Cassandra tests:

```
(swh) :~/swh-storage$ tox -- -m 'not cassandra'
```

### Installation

It is strongly recommended to use a virtualenv. In the following, we assume you work in a virtualenv named `swh`. See the [developer setup guide](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup) for more details on how to set up a working environment.

You can install the package directly from [pypi](https://pypi.org/p/swh.storage):

```
(swh) :~$ pip install swh.storage
[...]
```

Or from sources:

```
(swh) :~$ git clone https://forge.softwareheritage.org/source/swh-storage.git
[...]
(swh) :~$ cd swh-storage
(swh) :~/swh-storage$ pip install .
[...]
```

Then you can check that it is properly installed:

```
(swh) :~$ swh storage --help
Usage: swh storage [OPTIONS] COMMAND [ARGS]...

  Software Heritage Storage tools.

Options:
  -h, --help  Show this message and exit.

Commands:
  rpc-serve  Software Heritage Storage RPC server.
```

## Tests

The best way of running the Python tests for this module is to use [tox](https://tox.readthedocs.io/).

```
(swh) :~$ pip install tox
```

### tox

From the sources directory, simply use tox:

```
(swh) :~/swh-storage$ tox
[...]
========= 315 passed, 6 skipped, 15 warnings in 40.86 seconds ==========
_______________________________ summary ________________________________
flake8: commands succeeded
py3: commands succeeded
congratulations :)
```

Note: it is possible to set the `JAVA_HOME` environment variable to specify the version of the JVM to be used by Cassandra.
For example, at the time of writing, Cassandra does not support Java 14, so one may want to use Java 11 instead:

```
(swh) :~/swh-storage$ export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
(swh) :~/swh-storage$ tox
[...]
```

## Development

The storage server can be started locally. It requires a configuration file and a running PostgreSQL database.

### Sample configuration

A typical `storage.yml` configuration file is:

```
storage:
  cls: postgresql
  db: "dbname=softwareheritage-dev user= password="
  objstorage:
    cls: pathslicing
    root: /tmp/swh-storage/
    slicing: 0:2/2:4/4:6
```

This configuration uses:

- a local storage instance whose db connection goes to the local `softwareheritage-dev` instance,
- a local objstorage instance whose:
  - `root` path is /tmp/swh-storage,
  - slicing scheme is `0:2/2:4/4:6`.

The slicing scheme means that the content's identifier (sha1) determines where it is stored on disk: the first directory level is named after the first 2 hex characters of the identifier, the second level after the next 2, the third level after the next 2, and the file holding the raw content is named after the complete hash.

For example: 00062f8bd330715c4f819373653d97b3cd34394c will be stored at 00/06/2f/00062f8bd330715c4f819373653d97b3cd34394c

Note that the `root` path should exist on disk before starting the server.

### Starting the storage server

If the Python package has been properly installed (e.g. in a virtualenv), you should be able to use the command:

```
(swh) :~/swh-storage$ swh storage rpc-serve storage.yml
```

This runs a local swh-storage API server on port 5002.

```
(swh) :~/swh-storage$ curl http://127.0.0.1:5002
Software Heritage storage server

You have reached the Software Heritage storage server.
See its documentation and API for more information

```

### And then what?

In your upper layer ([loader-git](https://forge.softwareheritage.org/source/swh-loader-git/), [loader-svn](https://forge.softwareheritage.org/source/swh-loader-svn/), etc.), you can define a remote storage with this YAML configuration snippet:

```
storage:
  cls: remote
  url: http://localhost:5002/
```

You could also directly define a PostgreSQL storage with the following snippet:

```
storage:
  cls: postgresql
  db: service=swh-dev
  objstorage:
    cls: pathslicing
    root: /home/storage/swh-storage/
    slicing: 0:2/2:4/4:6
```

## Cassandra

As an alternative to PostgreSQL, swh-storage can use Cassandra as a database backend. It can be used like this:

```
storage:
  cls: cassandra
  hosts:
    - localhost
  objstorage:
    cls: pathslicing
    root: /home/storage/swh-storage/
    slicing: 0:2/2:4/4:6
```

The Cassandra swh-storage implementation supports both Cassandra >= 4.0-alpha2 and ScyllaDB >= 4.4 (and possibly earlier versions, but this is untested). While the main code supports both transparently, running tests or configuring the schema requires specific code when using ScyllaDB, enabled by setting the `SWH_USE_SCYLLADB=1` environment variable.

diff --git a/swh.storage.egg-info/PKG-INFO b/swh.storage.egg-info/PKG-INFO
index 854b38bc..416480d2 100644
--- a/swh.storage.egg-info/PKG-INFO
+++ b/swh.storage.egg-info/PKG-INFO
@@ -1,250 +1,250 @@
Metadata-Version: 2.1
Name: swh.storage
-Version: 0.37.1
+Version: 0.38.0
Summary: Software Heritage storage manager
Home-page: https://forge.softwareheritage.org/diffusion/DSTO/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-storage
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-storage/
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: testing
Provides-Extra: journal
License-File: LICENSE
License-File: AUTHORS

swh-storage
===========

Abstraction layer over the archive, allowing access to all stored source code artifacts as well as their metadata.

See the [documentation](https://docs.softwareheritage.org/devel/swh-storage/index.html) for more details.

## Quick start

### Dependencies

The Python tests for this module include tests that cannot run without a local PostgreSQL database, so you need the PostgreSQL server executable on your machine (no need to have a running PostgreSQL server). They also expect a Cassandra server.

#### Debian-like host

```
$ sudo apt install libpq-dev postgresql-11 cassandra
```

#### Non Debian-like host

The tests expect the path to the `cassandra` executable either to be unspecified, in which case it is looked up at `/usr/sbin/cassandra`, or to be specified through the `SWH_CASSANDRA_BIN` environment variable.

Optionally, you can skip the Cassandra tests:

```
(swh) :~/swh-storage$ tox -- -m 'not cassandra'
```

### Installation

It is strongly recommended to use a virtualenv. In the following, we assume you work in a virtualenv named `swh`.
See the [developer setup guide](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup) for more details on how to set up a working environment.

You can install the package directly from [pypi](https://pypi.org/p/swh.storage):

```
(swh) :~$ pip install swh.storage
[...]
```

Or from sources:

```
(swh) :~$ git clone https://forge.softwareheritage.org/source/swh-storage.git
[...]
(swh) :~$ cd swh-storage
(swh) :~/swh-storage$ pip install .
[...]
```

Then you can check that it is properly installed:

```
(swh) :~$ swh storage --help
Usage: swh storage [OPTIONS] COMMAND [ARGS]...

  Software Heritage Storage tools.

Options:
  -h, --help  Show this message and exit.

Commands:
  rpc-serve  Software Heritage Storage RPC server.
```

## Tests

The best way of running the Python tests for this module is to use [tox](https://tox.readthedocs.io/).

```
(swh) :~$ pip install tox
```

### tox

From the sources directory, simply use tox:

```
(swh) :~/swh-storage$ tox
[...]
========= 315 passed, 6 skipped, 15 warnings in 40.86 seconds ==========
_______________________________ summary ________________________________
flake8: commands succeeded
py3: commands succeeded
congratulations :)
```

Note: it is possible to set the `JAVA_HOME` environment variable to specify the version of the JVM to be used by Cassandra. For example, at the time of writing, Cassandra does not support Java 14, so one may want to use Java 11 instead:

```
(swh) :~/swh-storage$ export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
(swh) :~/swh-storage$ tox
[...]
```

## Development

The storage server can be started locally. It requires a configuration file and a running PostgreSQL database.

### Sample configuration

A typical `storage.yml` configuration file is:

```
storage:
  cls: postgresql
  db: "dbname=softwareheritage-dev user= password="
  objstorage:
    cls: pathslicing
    root: /tmp/swh-storage/
    slicing: 0:2/2:4/4:6
```

This configuration uses:

- a local storage instance whose db connection goes to the local `softwareheritage-dev` instance,
- a local objstorage instance whose:
  - `root` path is /tmp/swh-storage,
  - slicing scheme is `0:2/2:4/4:6`.

The slicing scheme means that the content's identifier (sha1) determines where it is stored on disk: the first directory level is named after the first 2 hex characters of the identifier, the second level after the next 2, the third level after the next 2, and the file holding the raw content is named after the complete hash.

For example: 00062f8bd330715c4f819373653d97b3cd34394c will be stored at 00/06/2f/00062f8bd330715c4f819373653d97b3cd34394c

Note that the `root` path should exist on disk before starting the server.

### Starting the storage server

If the Python package has been properly installed (e.g. in a virtualenv), you should be able to use the command:

```
(swh) :~/swh-storage$ swh storage rpc-serve storage.yml
```

This runs a local swh-storage API server on port 5002.

```
(swh) :~/swh-storage$ curl http://127.0.0.1:5002
Software Heritage storage server

You have reached the Software Heritage storage server.
See its documentation and API for more information

```

### And then what?

In your upper layer ([loader-git](https://forge.softwareheritage.org/source/swh-loader-git/), [loader-svn](https://forge.softwareheritage.org/source/swh-loader-svn/), etc.), you can define a remote storage with this YAML configuration snippet:

```
storage:
  cls: remote
  url: http://localhost:5002/
```

You could also directly define a PostgreSQL storage with the following snippet:

```
storage:
  cls: postgresql
  db: service=swh-dev
  objstorage:
    cls: pathslicing
    root: /home/storage/swh-storage/
    slicing: 0:2/2:4/4:6
```

## Cassandra

As an alternative to PostgreSQL, swh-storage can use Cassandra as a database backend. It can be used like this:

```
storage:
  cls: cassandra
  hosts:
    - localhost
  objstorage:
    cls: pathslicing
    root: /home/storage/swh-storage/
    slicing: 0:2/2:4/4:6
```

The Cassandra swh-storage implementation supports both Cassandra >= 4.0-alpha2 and ScyllaDB >= 4.4 (and possibly earlier versions, but this is untested). While the main code supports both transparently, running tests or configuring the schema requires specific code when using ScyllaDB, enabled by setting the `SWH_USE_SCYLLADB=1` environment variable.

diff --git a/swh/storage/proxies/buffer.py b/swh/storage/proxies/buffer.py
index 1320614e..f2df75f1 100644
--- a/swh/storage/proxies/buffer.py
+++ b/swh/storage/proxies/buffer.py
@@ -1,183 +1,320 @@
# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from functools import partial -from typing import Dict, Iterable, Mapping, Sequence, Tuple +import logging +from typing import Dict, Iterable, Mapping, Sequence, Tuple, cast from typing_extensions import Literal from swh.core.utils import grouper -from swh.model.model import BaseModel, Content, SkippedContent +from swh.model.model import ( + BaseModel, + Content, + Directory, + Release, + Revision, + SkippedContent, +) from swh.storage import get_storage from swh.storage.interface import StorageInterface +logger = logging.getLogger(__name__) + LObjectType = Literal[ "content", "skipped_content", "directory", "revision", "release", "snapshot", "extid", ] OBJECT_TYPES: Tuple[LObjectType, ...] = ( "content", "skipped_content", "directory", "revision", "release", "snapshot", "extid", ) DEFAULT_BUFFER_THRESHOLDS: Dict[str, int] = { "content": 10000, "content_bytes": 100 * 1024 * 1024, "skipped_content": 10000, "directory": 25000, + "directory_entries": 200000, "revision": 100000, + "revision_parents": 200000, + "revision_bytes": 100 * 1024 * 1024, "release": 100000, + "release_bytes": 100 * 1024 * 1024, "snapshot": 25000, "extid": 10000, } +def estimate_revision_size(revision: Revision) -> int: + """Estimate the size of a revision, by summing the size of variable length fields""" + s = 20 * len(revision.parents) + + if revision.message: + s += len(revision.message) + + s += len(revision.author.fullname) + s += len(revision.committer.fullname) + s += sum(len(h) + len(v) for h, v in revision.extra_headers) + + return s + + +def estimate_release_size(release: Release) -> int: + """Estimate the size of a release, by summing the size of variable length fields""" + s = 0 + if release.message: + s += len(release.message) + if release.author: + s += len(release.author.fullname) + + return s + + class BufferingProxyStorage: """Storage implementation in charge of accumulating objects prior to sending them to the "main" storage.
Deduplicates values based on a tuple of keys depending on the object type. Sample configuration use case for buffering storage: .. code-block:: yaml storage: cls: buffer args: storage: cls: remote args: http://storage.internal.staging.swh.network:5002/ min_batch_size: content: 10000 content_bytes: 100000000 skipped_content: 10000 directory: 5000 + directory_entries: 100000 revision: 1000 + revision_parents: 2000 + revision_bytes: 100000000 release: 10000 + release_bytes: 100000000 snapshot: 5000 """ def __init__(self, storage: Mapping, min_batch_size: Mapping = {}): self.storage: StorageInterface = get_storage(**storage) self._buffer_thresholds = {**DEFAULT_BUFFER_THRESHOLDS, **min_batch_size} self._objects: Dict[LObjectType, Dict[Tuple[str, ...], BaseModel]] = { k: {} for k in OBJECT_TYPES } self._contents_size: int = 0 + self._directory_entries: int = 0 + self._revision_parents: int = 0 + self._revision_size: int = 0 + self._release_size: int = 0 def __getattr__(self, key: str): if key.endswith("_add"): object_type = key.rsplit("_", 1)[0] if object_type in OBJECT_TYPES: return partial(self.object_add, object_type=object_type, keys=["id"],) if key == "storage": raise AttributeError(key) return getattr(self.storage, key) def content_add(self, contents: Sequence[Content]) -> Dict[str, int]: """Push contents to write to the storage in the buffer. The following policies apply: - if the buffer's threshold is hit, flush content to the storage. - otherwise, if the total size of the buffered contents hits its threshold, flush them to the storage. """ stats = self.object_add( contents, object_type="content", keys=["sha1", "sha1_git", "sha256", "blake2s256"], ) - if not stats: # We did not flush already + if not stats: + # We did not flush based on number of objects; check total size self._contents_size += sum(c.length for c in contents) if self._contents_size >= self._buffer_thresholds["content_bytes"]: return self.flush(["content"]) return stats def skipped_content_add(self, contents: Sequence[SkippedContent]) -> Dict[str, int]: return self.object_add( contents, object_type="skipped_content", keys=["sha1", "sha1_git", "sha256", "blake2s256"], ) + def directory_add(self, directories: Sequence[Directory]) -> Dict[str, int]: + stats = self.object_add(directories, object_type="directory", keys=["id"]) + + if not stats: + # We did not flush based on number of objects; check the number of entries + self._directory_entries += sum(len(d.entries) for d in directories) + if self._directory_entries >= self._buffer_thresholds["directory_entries"]: + return self.flush(["content", "directory"]) + + return stats + + def revision_add(self, revisions: Sequence[Revision]) -> Dict[str, int]: + stats = self.object_add(revisions, object_type="revision", keys=["id"]) + + if not stats: + # We did not flush based on number of objects; check the number of + # parents and estimated size + self._revision_parents += sum(len(r.parents) for r in revisions) + self._revision_size += sum(estimate_revision_size(r) for r in revisions) + if ( + self._revision_parents >= self._buffer_thresholds["revision_parents"] + or self._revision_size >= self._buffer_thresholds["revision_bytes"] + ): + return self.flush(["content", "directory", "revision"]) + + return stats + + def release_add(self, releases: Sequence[Release]) -> Dict[str, int]: + stats = self.object_add(releases, object_type="release", keys=["id"]) + + if not stats: + # We did not flush based on number of objects; check the estimated size + self._release_size +=
sum(estimate_release_size(r) for r in releases) + if self._release_size >= self._buffer_thresholds["release_bytes"]: + return self.flush(["content", "directory", "revision", "release"]) + + return stats + def object_add( self, objects: Sequence[BaseModel], *, object_type: LObjectType, keys: Iterable[str], ) -> Dict[str, int]: """Push objects to write to the storage in the buffer. Flushes the buffer to the storage if the threshold is hit. """ buffer_ = self._objects[object_type] for obj in objects: obj_key = tuple(getattr(obj, key) for key in keys) buffer_[obj_key] = obj if len(buffer_) >= self._buffer_thresholds[object_type]: return self.flush() return {} def flush( self, object_types: Sequence[LObjectType] = OBJECT_TYPES ) -> Dict[str, int]: summary: Dict[str, int] = {} def update_summary(stats): for k, v in stats.items(): summary[k] = v + summary.get(k, 0) for object_type in object_types: buffer_ = self._objects[object_type] + if not buffer_: + continue + + if logger.isEnabledFor(logging.DEBUG): + log = "Flushing %s objects of type %s" + log_args = [len(buffer_), object_type] + + if object_type == "content": + log += " (%s bytes)" + log_args.append( + sum(cast(Content, c).length for c in buffer_.values()) + ) + + elif object_type == "directory": + log += " (%s entries)" + log_args.append( + sum(len(cast(Directory, d).entries) for d in buffer_.values()) + ) + + elif object_type == "revision": + log += " (%s parents, %s estimated bytes)" + log_args.extend( + ( + sum( + len(cast(Revision, r).parents) for r in buffer_.values() + ), + sum( + estimate_revision_size(cast(Revision, r)) + for r in buffer_.values() + ), + ) + ) + + elif object_type == "release": + log += " (%s estimated bytes)" + log_args.append( + sum( + estimate_release_size(cast(Release, r)) + for r in buffer_.values() + ) + ) + + logger.debug(log, *log_args) + batches = grouper(buffer_.values(), n=self._buffer_thresholds[object_type]) for batch in batches: add_fn = getattr(self.storage, "%s_add" % object_type) stats = add_fn(list(batch)) update_summary(stats) # Flush underlying storage stats = self.storage.flush(object_types) update_summary(stats) self.clear_buffers(object_types) return summary def clear_buffers(self, object_types: Sequence[LObjectType] = OBJECT_TYPES) -> None: """Clear objects from current buffer. WARNING: data that has not been flushed to storage will be lost when this method is called. This should only be called when `flush` fails and you want to continue your processing. 
""" for object_type in object_types: buffer_ = self._objects[object_type] buffer_.clear() if object_type == "content": self._contents_size = 0 + elif object_type == "directory": + self._directory_entries = 0 + elif object_type == "revision": + self._revision_parents = 0 + self._revision_size = 0 + elif object_type == "release": + self._release_size = 0 self.storage.clear_buffers(object_types) diff --git a/swh/storage/proxies/filter.py b/swh/storage/proxies/filter.py index c28b3df3..14bff3cc 100644 --- a/swh/storage/proxies/filter.py +++ b/swh/storage/proxies/filter.py @@ -1,116 +1,145 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict, Iterable, List, Set -from swh.model.model import Content, Directory, Revision, Sha1Git, SkippedContent +from swh.model.model import ( + Content, + Directory, + Release, + Revision, + Sha1Git, + SkippedContent, +) from swh.storage import get_storage from swh.storage.interface import StorageInterface class FilteringProxyStorage: """Filtering Storage implementation. This is in charge of transparently filtering out known objects prior to adding them to storage. Sample configuration use case for filtering storage: .. code-block: yaml storage: cls: filter storage: cls: remote url: http://storage.internal.staging.swh.network:5002/ """ - object_types = ["content", "skipped_content", "directory", "revision"] + object_types = ["content", "skipped_content", "directory", "revision", "release"] def __init__(self, storage): self.storage: StorageInterface = get_storage(**storage) def __getattr__(self, key): if key == "storage": raise AttributeError(key) return getattr(self.storage, key) def content_add(self, content: List[Content]) -> Dict[str, int]: + empty_stat = { + "content:add": 0, + "content:add:bytes": 0, + } + if not content: + return empty_stat contents_to_add = self._filter_missing_contents(content) + if not contents_to_add: + return empty_stat return self.storage.content_add( [x for x in content if x.sha256 in contents_to_add] ) def skipped_content_add(self, content: List[SkippedContent]) -> Dict[str, int]: + empty_stat = {"skipped_content:add": 0} + if not content: + return empty_stat contents_to_add = self._filter_missing_skipped_contents(content) + if not contents_to_add and not any(c.sha1_git is None for c in content): + return empty_stat return self.storage.skipped_content_add( [x for x in content if x.sha1_git is None or x.sha1_git in contents_to_add] ) def directory_add(self, directories: List[Directory]) -> Dict[str, int]: + empty_stat = {"directory:add": 0} + if not directories: + return empty_stat missing_ids = self._filter_missing_ids("directory", (d.id for d in directories)) + if not missing_ids: + return empty_stat return self.storage.directory_add( [d for d in directories if d.id in missing_ids] ) def revision_add(self, revisions: List[Revision]) -> Dict[str, int]: + empty_stat = {"revision:add": 0} + if not revisions: + return empty_stat missing_ids = self._filter_missing_ids("revision", (r.id for r in revisions)) + if not missing_ids: + return empty_stat return self.storage.revision_add([r for r in revisions if r.id in missing_ids]) + def release_add(self, releases: List[Release]) -> Dict[str, int]: + empty_stat = {"release:add": 0} + if not releases: + return empty_stat + missing_ids = self._filter_missing_ids("release", 
(r.id for r in releases)) + if not missing_ids: + return empty_stat + return self.storage.release_add([r for r in releases if r.id in missing_ids]) + def _filter_missing_contents(self, contents: List[Content]) -> Set[bytes]: """Return only the content keys missing from swh storage Args: contents: List of contents whose sha256 hashes are checked for existence in swh storage """ missing_contents = [] for content in contents: missing_contents.append(content.hashes()) return set(self.storage.content_missing(missing_contents, key_hash="sha256",)) def _filter_missing_skipped_contents( self, contents: List[SkippedContent] ) -> Set[Sha1Git]: """Return only the content keys missing from swh storage Args: contents: List of skipped contents whose sha1_git hashes are checked for existence in swh storage """ missing_contents = [c.hashes() for c in contents if c.sha1_git is not None] ids = set() for c in self.storage.skipped_content_missing(missing_contents): if c is None or c.get("sha1_git") is None: continue ids.add(c["sha1_git"]) return ids def _filter_missing_ids(self, object_type: str, ids: Iterable[bytes]) -> Set[bytes]: """Filter missing ids from the storage for a given object type. Args: object_type: object type to use {revision, directory, release} ids: List of object_type ids Returns: Missing ids from the storage for object_type """ - missing_ids = [] - for id in ids: - missing_ids.append(id) - - fn_by_object_type = { - "revision": self.storage.revision_missing, - "directory": self.storage.directory_missing, - } - - fn = fn_by_object_type[object_type] - return set(fn(missing_ids)) + return set(getattr(self.storage, f"{object_type}_missing")(list(ids))) diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py index 0335c060..8f0f30ef 100644 --- a/swh/storage/tests/test_buffer.py +++ b/swh/storage/tests/test_buffer.py @@ -1,622 +1,729 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from collections import Counter from typing import Optional from unittest.mock import Mock from swh.storage import get_storage -from swh.storage.proxies.buffer import BufferingProxyStorage +from swh.storage.proxies.buffer import ( + BufferingProxyStorage, + estimate_release_size, + estimate_revision_size, +) def get_storage_with_buffer_config(**buffer_config) -> BufferingProxyStorage: steps = [ {"cls": "buffer", **buffer_config}, {"cls": "memory"}, ] ret = get_storage("pipeline", steps=steps) assert isinstance(ret, BufferingProxyStorage) return ret def test_buffering_proxy_storage_content_threshold_not_hit(sample_data) -> None: contents = sample_data.contents[:2] contents_dict = [c.to_dict() for c in contents] storage = get_storage_with_buffer_config(min_batch_size={"content": 10,}) s = storage.content_add(contents) assert s == {} # contents have not been written to storage missing_contents = storage.content_missing(contents_dict) assert set(missing_contents) == set([contents[0].sha1, contents[1].sha1]) s = storage.flush() assert s == { "content:add": 1 + 1, "content:add:bytes": contents[0].length + contents[1].length, } missing_contents = storage.content_missing(contents_dict) assert list(missing_contents) == [] def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data) -> None: content = sample_data.content content_dict = content.to_dict() storage = get_storage_with_buffer_config(min_batch_size={"content": 1,}) s = storage.content_add([content])
assert s == { "content:add": 1, "content:add:bytes": content.length, } missing_contents = storage.content_missing([content_dict]) assert list(missing_contents) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_content_deduplicate(sample_data) -> None: contents = sample_data.contents[:2] storage = get_storage_with_buffer_config(min_batch_size={"content": 2,}) s = storage.content_add([contents[0], contents[0]]) assert s == {} s = storage.content_add([contents[0]]) assert s == {} s = storage.content_add([contents[1]]) assert s == { "content:add": 1 + 1, "content:add:bytes": contents[0].length + contents[1].length, } missing_contents = storage.content_missing([c.to_dict() for c in contents]) assert list(missing_contents) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_content_threshold_bytes_hit(sample_data) -> None: contents = sample_data.contents[:2] content_bytes_min_batch_size = 2 storage = get_storage_with_buffer_config( min_batch_size={"content": 10, "content_bytes": content_bytes_min_batch_size,} ) assert contents[0].length > content_bytes_min_batch_size s = storage.content_add([contents[0]]) assert s == { "content:add": 1, "content:add:bytes": contents[0].length, } missing_contents = storage.content_missing([contents[0].to_dict()]) assert list(missing_contents) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_skipped_content_threshold_not_hit(sample_data) -> None: contents = sample_data.skipped_contents contents_dict = [c.to_dict() for c in contents] storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 10,}) s = storage.skipped_content_add([contents[0], contents[1]]) assert s == {} # contents have not been written to storage missing_contents = storage.skipped_content_missing(contents_dict) assert {c["sha1"] for c in missing_contents} == {c.sha1 for c in contents} s = storage.flush() assert s == {"skipped_content:add": 1 + 1} missing_contents = storage.skipped_content_missing(contents_dict) assert list(missing_contents) == [] def test_buffering_proxy_storage_skipped_content_threshold_nb_hit(sample_data) -> None: contents = sample_data.skipped_contents storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 1,}) s = storage.skipped_content_add([contents[0]]) assert s == {"skipped_content:add": 1} missing_contents = storage.skipped_content_missing([contents[0].to_dict()]) assert list(missing_contents) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_skipped_content_deduplicate(sample_data): contents = sample_data.skipped_contents[:2] storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 2,}) s = storage.skipped_content_add([contents[0], contents[0]]) assert s == {} s = storage.skipped_content_add([contents[0]]) assert s == {} s = storage.skipped_content_add([contents[1]]) assert s == { "skipped_content:add": 1 + 1, } missing_contents = storage.skipped_content_missing([c.to_dict() for c in contents]) assert list(missing_contents) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_extid_threshold_not_hit(sample_data) -> None: extid = sample_data.extid1 storage = get_storage_with_buffer_config(min_batch_size={"extid": 10,}) s = storage.extid_add([extid]) assert s == {} present_extids = storage.extid_get_from_target( extid.target.object_type, [extid.target.object_id] ) assert list(present_extids) == [] s = storage.flush() assert s == { "extid:add": 1, } present_extids = storage.extid_get_from_target( 
extid.target.object_type, [extid.target.object_id] ) assert list(present_extids) == [extid] def test_buffering_proxy_storage_extid_threshold_hit(sample_data) -> None: extid = sample_data.extid1 storage = get_storage_with_buffer_config(min_batch_size={"extid": 1,}) s = storage.extid_add([extid]) assert s == { "extid:add": 1, } present_extids = storage.extid_get_from_target( extid.target.object_type, [extid.target.object_id] ) assert list(present_extids) == [extid] s = storage.flush() assert s == {} def test_buffering_proxy_storage_extid_deduplicate(sample_data) -> None: extids = sample_data.extids[:2] storage = get_storage_with_buffer_config(min_batch_size={"extid": 2,}) s = storage.extid_add([extids[0], extids[0]]) assert s == {} s = storage.extid_add([extids[0]]) assert s == {} s = storage.extid_add([extids[1]]) assert s == { "extid:add": 1 + 1, } for extid in extids: present_extids = storage.extid_get_from_target( extid.target.object_type, [extid.target.object_id] ) assert list(present_extids) == [extid] s = storage.flush() assert s == {} def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data) -> None: directory = sample_data.directory storage = get_storage_with_buffer_config(min_batch_size={"directory": 10,}) s = storage.directory_add([directory]) assert s == {} missing_directories = storage.directory_missing([directory.id]) assert list(missing_directories) == [directory.id] s = storage.flush() assert s == { "directory:add": 1, } missing_directories = storage.directory_missing([directory.id]) assert list(missing_directories) == [] def test_buffering_proxy_storage_directory_threshold_hit(sample_data) -> None: directory = sample_data.directory storage = get_storage_with_buffer_config(min_batch_size={"directory": 1,}) s = storage.directory_add([directory]) assert s == { "directory:add": 1, } missing_directories = storage.directory_missing([directory.id]) assert list(missing_directories) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_directory_deduplicate(sample_data) -> None: directories = sample_data.directories[:2] storage = get_storage_with_buffer_config(min_batch_size={"directory": 2,}) s = storage.directory_add([directories[0], directories[0]]) assert s == {} s = storage.directory_add([directories[0]]) assert s == {} s = storage.directory_add([directories[1]]) assert s == { "directory:add": 1 + 1, } missing_directories = storage.directory_missing([d.id for d in directories]) assert list(missing_directories) == [] s = storage.flush() assert s == {} +def test_buffering_proxy_storage_directory_entries_threshold(sample_data) -> None: + directories = sample_data.directories + n_entries = sum(len(d.entries) for d in directories) + threshold = sum(len(d.entries) for d in directories[:-2]) + + # ensure the threshold is in the middle + assert 0 < threshold < n_entries + + storage = get_storage_with_buffer_config( + min_batch_size={"directory_entries": threshold} + ) + storage.storage = Mock(wraps=storage.storage) + + for directory in directories: + storage.directory_add([directory]) + storage.flush() + + # We should have called the underlying directory_add at least twice, as + # we have hit the threshold for number of entries on directory n-2 + method_calls = Counter(c[0] for c in storage.storage.method_calls) + assert method_calls["directory_add"] >= 2 + + def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data) -> None: revision = sample_data.revision storage = get_storage_with_buffer_config(min_batch_size={"revision": 10,}) s = 
storage.revision_add([revision]) assert s == {} missing_revisions = storage.revision_missing([revision.id]) assert list(missing_revisions) == [revision.id] s = storage.flush() assert s == { "revision:add": 1, } missing_revisions = storage.revision_missing([revision.id]) assert list(missing_revisions) == [] def test_buffering_proxy_storage_revision_threshold_hit(sample_data) -> None: revision = sample_data.revision storage = get_storage_with_buffer_config(min_batch_size={"revision": 1,}) s = storage.revision_add([revision]) assert s == { "revision:add": 1, } missing_revisions = storage.revision_missing([revision.id]) assert list(missing_revisions) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_revision_deduplicate(sample_data) -> None: revisions = sample_data.revisions[:2] storage = get_storage_with_buffer_config(min_batch_size={"revision": 2,}) s = storage.revision_add([revisions[0], revisions[0]]) assert s == {} s = storage.revision_add([revisions[0]]) assert s == {} s = storage.revision_add([revisions[1]]) assert s == { "revision:add": 1 + 1, } missing_revisions = storage.revision_missing([r.id for r in revisions]) assert list(missing_revisions) == [] s = storage.flush() assert s == {} +def test_buffering_proxy_storage_revision_parents_threshold(sample_data) -> None: + revisions = sample_data.revisions + n_parents = sum(len(r.parents) for r in revisions) + threshold = sum(len(r.parents) for r in revisions[:-2]) + + # ensure the threshold is in the middle + assert 0 < threshold < n_parents + + storage = get_storage_with_buffer_config( + min_batch_size={"revision_parents": threshold} + ) + storage.storage = Mock(wraps=storage.storage) + + for revision in revisions: + storage.revision_add([revision]) + storage.flush() + + # We should have called the underlying revision_add at least twice, as + # we have hit the threshold for number of parents on revision n-2 + method_calls = Counter(c[0] for c in storage.storage.method_calls) + assert method_calls["revision_add"] >= 2 + + +def test_buffering_proxy_storage_revision_size_threshold(sample_data) -> None: + revisions = sample_data.revisions + total_size = sum(estimate_revision_size(r) for r in revisions) + threshold = sum(estimate_revision_size(r) for r in revisions[:-2]) + + # ensure the threshold is in the middle + assert 0 < threshold < total_size + + storage = get_storage_with_buffer_config( + min_batch_size={"revision_bytes": threshold} + ) + storage.storage = Mock(wraps=storage.storage) + + for revision in revisions: + storage.revision_add([revision]) + storage.flush() + + # We should have called the underlying revision_add at least twice, as + # we have hit the threshold for estimated size on revision n-2 + method_calls = Counter(c[0] for c in storage.storage.method_calls) + assert method_calls["revision_add"] >= 2 + + def test_buffering_proxy_storage_release_threshold_not_hit(sample_data) -> None: releases = sample_data.releases threshold = 10 assert len(releases) < threshold storage = get_storage_with_buffer_config( min_batch_size={"release": threshold,} # configuration set ) s = storage.release_add(releases) assert s == {} release_ids = [r.id for r in releases] missing_releases = storage.release_missing(release_ids) assert list(missing_releases) == release_ids s = storage.flush() assert s == { "release:add": len(releases), } missing_releases = storage.release_missing(release_ids) assert list(missing_releases) == [] def test_buffering_proxy_storage_release_threshold_hit(sample_data) -> None: releases =
sample_data.releases threshold = 2 assert len(releases) > threshold storage = get_storage_with_buffer_config( min_batch_size={"release": threshold,} # configuration set ) s = storage.release_add(releases) assert s == { "release:add": len(releases), } release_ids = [r.id for r in releases] missing_releases = storage.release_missing(release_ids) assert list(missing_releases) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_release_deduplicate(sample_data) -> None: releases = sample_data.releases[:2] storage = get_storage_with_buffer_config(min_batch_size={"release": 2,}) s = storage.release_add([releases[0], releases[0]]) assert s == {} s = storage.release_add([releases[0]]) assert s == {} s = storage.release_add([releases[1]]) assert s == { "release:add": 1 + 1, } missing_releases = storage.release_missing([r.id for r in releases]) assert list(missing_releases) == [] s = storage.flush() assert s == {} +def test_buffering_proxy_storage_release_size_threshold(sample_data) -> None: + releases = sample_data.releases + total_size = sum(estimate_release_size(r) for r in releases) + threshold = sum(estimate_release_size(r) for r in releases[:-2]) + + # ensure the threshold is in the middle + assert 0 < threshold < total_size + + storage = get_storage_with_buffer_config( + min_batch_size={"release_bytes": threshold} + ) + storage.storage = Mock(wraps=storage.storage) + + for release in releases: + storage.release_add([release]) + storage.flush() + + # We should have called the underlying release_add at least twice, as + # we have hit the threshold for estimated size on release n-2 + method_calls = Counter(c[0] for c in storage.storage.method_calls) + assert method_calls["release_add"] >= 2 + + def test_buffering_proxy_storage_snapshot_threshold_not_hit(sample_data) -> None: snapshots = sample_data.snapshots threshold = 10 assert len(snapshots) < threshold storage = get_storage_with_buffer_config( min_batch_size={"snapshot": threshold,} # configuration set ) s = storage.snapshot_add(snapshots) assert s == {} snapshot_ids = [r.id for r in snapshots] missing_snapshots = storage.snapshot_missing(snapshot_ids) assert list(missing_snapshots) == snapshot_ids s = storage.flush() assert s == { "snapshot:add": len(snapshots), } missing_snapshots = storage.snapshot_missing(snapshot_ids) assert list(missing_snapshots) == [] def test_buffering_proxy_storage_snapshot_threshold_hit(sample_data) -> None: snapshots = sample_data.snapshots threshold = 2 assert len(snapshots) > threshold storage = get_storage_with_buffer_config( min_batch_size={"snapshot": threshold,} # configuration set ) s = storage.snapshot_add(snapshots) assert s == { "snapshot:add": len(snapshots), } snapshot_ids = [r.id for r in snapshots] missing_snapshots = storage.snapshot_missing(snapshot_ids) assert list(missing_snapshots) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_snapshot_deduplicate(sample_data) -> None: snapshots = sample_data.snapshots[:2] storage = get_storage_with_buffer_config(min_batch_size={"snapshot": 2,}) s = storage.snapshot_add([snapshots[0], snapshots[0]]) assert s == {} s = storage.snapshot_add([snapshots[0]]) assert s == {} s = storage.snapshot_add([snapshots[1]]) assert s == { "snapshot:add": 1 + 1, } missing_snapshots = storage.snapshot_missing([r.id for r in snapshots]) assert list(missing_snapshots) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_clear(sample_data) -> None: """Clear operation on buffer """ threshold = 10 contents =
sample_data.contents assert 0 < len(contents) < threshold skipped_contents = sample_data.skipped_contents assert 0 < len(skipped_contents) < threshold directories = sample_data.directories assert 0 < len(directories) < threshold revisions = sample_data.revisions assert 0 < len(revisions) < threshold releases = sample_data.releases assert 0 < len(releases) < threshold snapshots = sample_data.snapshots assert 0 < len(snapshots) < threshold storage = get_storage_with_buffer_config( min_batch_size={ "content": threshold, "skipped_content": threshold, "directory": threshold, "revision": threshold, "release": threshold, } ) s = storage.content_add(contents) assert s == {} s = storage.skipped_content_add(skipped_contents) assert s == {} s = storage.directory_add(directories) assert s == {} s = storage.revision_add(revisions) assert s == {} s = storage.release_add(releases) assert s == {} s = storage.snapshot_add(snapshots) assert s == {} assert len(storage._objects["content"]) == len(contents) assert len(storage._objects["skipped_content"]) == len(skipped_contents) assert len(storage._objects["directory"]) == len(directories) assert len(storage._objects["revision"]) == len(revisions) assert len(storage._objects["release"]) == len(releases) assert len(storage._objects["snapshot"]) == len(snapshots) # clear only content from the buffer s = storage.clear_buffers(["content"]) # type: ignore assert s is None # specific clear operation on specific object type content only touched # them assert len(storage._objects["content"]) == 0 assert len(storage._objects["skipped_content"]) == len(skipped_contents) assert len(storage._objects["directory"]) == len(directories) assert len(storage._objects["revision"]) == len(revisions) assert len(storage._objects["release"]) == len(releases) assert len(storage._objects["snapshot"]) == len(snapshots) # clear current buffer from all object types s = storage.clear_buffers() # type: ignore assert s is None assert len(storage._objects["content"]) == 0 assert len(storage._objects["skipped_content"]) == 0 assert len(storage._objects["directory"]) == 0 assert len(storage._objects["revision"]) == 0 assert len(storage._objects["release"]) == 0 assert len(storage._objects["snapshot"]) == 0 def test_buffer_proxy_with_default_args() -> None: storage = get_storage_with_buffer_config() assert storage is not None def test_buffer_flush_stats(sample_data) -> None: storage = get_storage_with_buffer_config() s = storage.content_add(sample_data.contents) assert s == {} s = storage.skipped_content_add(sample_data.skipped_contents) assert s == {} s = storage.directory_add(sample_data.directories) assert s == {} s = storage.revision_add(sample_data.revisions) assert s == {} s = storage.release_add(sample_data.releases) assert s == {} s = storage.snapshot_add(sample_data.snapshots) assert s == {} # Flush all the things s = storage.flush() assert s["content:add"] > 0 assert s["content:add:bytes"] > 0 assert s["skipped_content:add"] > 0 assert s["directory:add"] > 0 assert s["revision:add"] > 0 assert s["release:add"] > 0 assert s["snapshot:add"] > 0 def test_buffer_operation_order(sample_data) -> None: storage = get_storage_with_buffer_config() # Wrap the inner storage in a mock to track all method calls. storage.storage = mocked_storage = Mock(wraps=storage.storage) # Simulate a loader: add contents, directories, revisions, releases, then # snapshots. 
storage.content_add(sample_data.contents) storage.skipped_content_add(sample_data.skipped_contents) storage.directory_add(sample_data.directories) storage.revision_add(sample_data.revisions) storage.release_add(sample_data.releases) storage.snapshot_add(sample_data.snapshots) # Check that nothing has been flushed yet assert mocked_storage.method_calls == [] # Flush all the things storage.flush() methods_called = [c[0] for c in mocked_storage.method_calls] prev = -1 for method in [ "content_add", "skipped_content_add", "directory_add", "revision_add", "release_add", "snapshot_add", "flush", ]: try: cur: Optional[int] = methods_called.index(method) except ValueError: cur = None assert cur is not None, "Method %s not called" % method assert cur > prev, "Method %s called out of order; all calls were: %s" % ( method, methods_called, ) prev = cur + + +def test_buffer_empty_batches() -> None: + "Flushing an empty buffer storage doesn't call any underlying _add method" + storage = get_storage_with_buffer_config() + storage.storage = mocked_storage = Mock(wraps=storage.storage) + + storage.flush() + methods_called = {c[0] for c in mocked_storage.method_calls} + assert methods_called == {"flush", "clear_buffers"} diff --git a/swh/storage/tests/test_filter.py b/swh/storage/tests/test_filter.py index 13194454..0ffe9cea 100644 --- a/swh/storage/tests/test_filter.py +++ b/swh/storage/tests/test_filter.py @@ -1,130 +1,181 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from unittest.mock import Mock + import attr import pytest from swh.storage import get_storage @pytest.fixture def swh_storage(): storage_config = { "cls": "pipeline", "steps": [{"cls": "filter"}, {"cls": "memory"},], } return get_storage(**storage_config) def test_filtering_proxy_storage_content(swh_storage, sample_data): sample_content = sample_data.content content = swh_storage.content_get_data(sample_content.sha1) assert content is None s = swh_storage.content_add([sample_content]) assert s == { "content:add": 1, "content:add:bytes": sample_content.length, } content = swh_storage.content_get_data(sample_content.sha1) assert content is not None s = swh_storage.content_add([sample_content]) assert s == { "content:add": 0, "content:add:bytes": 0, } def test_filtering_proxy_storage_skipped_content(swh_storage, sample_data): sample_content = sample_data.skipped_content sample_content_dict = sample_content.to_dict() content = next(swh_storage.skipped_content_missing([sample_content_dict])) assert content["sha1"] == sample_content.sha1 s = swh_storage.skipped_content_add([sample_content]) assert s == { "skipped_content:add": 1, } content = list(swh_storage.skipped_content_missing([sample_content_dict])) assert content == [] s = swh_storage.skipped_content_add([sample_content]) assert s == { "skipped_content:add": 0, } def test_filtering_proxy_storage_skipped_content_missing_sha1_git( swh_storage, sample_data ): sample_contents = [ attr.evolve(c, sha1_git=None) for c in sample_data.skipped_contents ] sample_content, sample_content2 = [c.to_dict() for c in sample_contents[:2]] content = next(swh_storage.skipped_content_missing([sample_content])) assert content["sha1"] == sample_content["sha1"] s = swh_storage.skipped_content_add([sample_contents[0]]) assert s == { "skipped_content:add": 1, } content = 
list(swh_storage.skipped_content_missing([sample_content])) assert content == [] s = swh_storage.skipped_content_add([sample_contents[1]]) assert s == { "skipped_content:add": 1, } content = list(swh_storage.skipped_content_missing([sample_content2])) assert content == [] def test_filtering_proxy_storage_revision(swh_storage, sample_data): sample_revision = sample_data.revision revision = swh_storage.revision_get([sample_revision.id])[0] assert revision is None s = swh_storage.revision_add([sample_revision]) assert s == { "revision:add": 1, } revision = swh_storage.revision_get([sample_revision.id])[0] assert revision is not None s = swh_storage.revision_add([sample_revision]) assert s == { "revision:add": 0, } +def test_filtering_proxy_storage_release(swh_storage, sample_data): + sample_release = sample_data.release + + release = swh_storage.release_get([sample_release.id])[0] + assert release is None + + s = swh_storage.release_add([sample_release]) + assert s == { + "release:add": 1, + } + + release = swh_storage.release_get([sample_release.id])[0] + assert release is not None + + s = swh_storage.release_add([sample_release]) + assert s == { + "release:add": 0, + } + + def test_filtering_proxy_storage_directory(swh_storage, sample_data): sample_directory = sample_data.directory directory = list(swh_storage.directory_missing([sample_directory.id]))[0] assert directory s = swh_storage.directory_add([sample_directory]) assert s == { "directory:add": 1, } directory = list(swh_storage.directory_missing([sample_directory.id])) assert not directory s = swh_storage.directory_add([sample_directory]) assert s == { "directory:add": 0, } + + +def test_filtering_proxy_storage_empty_list(swh_storage, sample_data): + swh_storage.storage = mock_storage = Mock(wraps=swh_storage.storage) + + calls = 0 + for object_type in swh_storage.object_types: + calls += 1 + + method_name = f"{object_type}_add" + method = getattr(swh_storage, method_name) + one_object = getattr(sample_data, object_type) + + # Call with empty list: ensure underlying storage not called + method([]) + assert method_name not in {c[0] for c in mock_storage.method_calls} + mock_storage.reset_mock() + + # Call with an object: ensure underlying storage is called + method([one_object]) + assert method_name in {c[0] for c in mock_storage.method_calls} + mock_storage.reset_mock() + + # Call with the same object: ensure underlying storage is not called again + method([one_object]) + assert method_name not in {c[0] for c in mock_storage.method_calls} + mock_storage.reset_mock() + + assert calls > 0, "Empty list never tested"
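To make the buffering behavior exercised by these tests concrete, here is a minimal usage sketch (not part of this patch). It assumes `swh.storage` and `swh.model` are installed, and mirrors the pipeline configuration used by `get_storage_with_buffer_config` above, with a deliberately tiny `content_bytes` threshold so the second write triggers a flush:

```
# Minimal sketch: a buffer proxy in front of an in-memory backend.
from swh.model.model import Content
from swh.storage import get_storage

storage = get_storage(
    "pipeline",
    steps=[
        # flush once 10 objects or 10 bytes of content are buffered
        {"cls": "buffer", "min_batch_size": {"content": 10, "content_bytes": 10}},
        {"cls": "memory"},
    ],
)

c1 = Content.from_data(b"hello")   # 5 bytes: stays below the 10-byte threshold
c2 = Content.from_data(b"world!")  # 6 more bytes: total 11 >= 10, triggers a flush

print(storage.content_add([c1]))  # {} -- buffered only, nothing written yet
print(storage.content_add([c2]))  # {'content:add': 2, 'content:add:bytes': 11}
print(storage.flush())            # {} -- buffers were already flushed
```

The new `directory_entries`, `revision_parents`, `revision_bytes`, and `release_bytes` thresholds follow the same pattern: per-type counters accumulate until a threshold is crossed, at which point the proxy flushes that object type together with the object types that must be written before it.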