diff --git a/swh/storage/proxies/buffer.py b/swh/storage/proxies/buffer.py index c770946d..55572cb0 100644 --- a/swh/storage/proxies/buffer.py +++ b/swh/storage/proxies/buffer.py @@ -1,200 +1,216 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from functools import partial from typing import Dict, Iterable, Mapping, Sequence, Tuple from typing_extensions import Literal from swh.core.utils import grouper -from swh.model.model import BaseModel, Content, Directory, SkippedContent +from swh.model.model import BaseModel, Content, Directory, Revision, SkippedContent from swh.storage import get_storage from swh.storage.interface import StorageInterface LObjectType = Literal[ "content", "skipped_content", "directory", "revision", "release", "snapshot", "extid", ] OBJECT_TYPES: Tuple[LObjectType, ...] = ( "content", "skipped_content", "directory", "revision", "release", "snapshot", "extid", ) DEFAULT_BUFFER_THRESHOLDS: Dict[str, int] = { "content": 10000, "content_bytes": 100 * 1024 * 1024, "skipped_content": 10000, "directory": 25000, "directory_entries": 200000, "revision": 100000, + "revision_parents": 200000, "release": 100000, "snapshot": 25000, "extid": 10000, } class BufferingProxyStorage: """Storage implementation in charge of accumulating objects prior to discussing with the "main" storage. Deduplicates values based on a tuple of keys depending on the object type. Sample configuration use case for buffering storage: .. code-block:: yaml storage: cls: buffer args: storage: cls: remote args: http://storage.internal.staging.swh.network:5002/ min_batch_size: content: 10000 content_bytes: 100000000 skipped_content: 10000 directory: 5000 directory_entries: 100000 revision: 1000 + revision_parents: 2000 release: 10000 snapshot: 5000 """ def __init__(self, storage: Mapping, min_batch_size: Mapping = {}): self.storage: StorageInterface = get_storage(**storage) self._buffer_thresholds = {**DEFAULT_BUFFER_THRESHOLDS, **min_batch_size} self._objects: Dict[LObjectType, Dict[Tuple[str, ...], BaseModel]] = { k: {} for k in OBJECT_TYPES } self._contents_size: int = 0 self._directory_entries: int = 0 + self._revision_parents: int = 0 def __getattr__(self, key: str): if key.endswith("_add"): object_type = key.rsplit("_", 1)[0] if object_type in OBJECT_TYPES: return partial(self.object_add, object_type=object_type, keys=["id"],) if key == "storage": raise AttributeError(key) return getattr(self.storage, key) def content_add(self, contents: Sequence[Content]) -> Dict[str, int]: """Push contents to write to the storage in the buffer. Following policies apply: - if the buffer's threshold is hit, flush content to the storage. - otherwise, if the total size of buffered contents's threshold is hit, flush content to the storage. """ stats = self.object_add( contents, object_type="content", keys=["sha1", "sha1_git", "sha256", "blake2s256"], ) if not stats: # We did not flush based on number of objects; check total size self._contents_size += sum(c.length for c in contents) if self._contents_size >= self._buffer_thresholds["content_bytes"]: return self.flush(["content"]) return stats def skipped_content_add(self, contents: Sequence[SkippedContent]) -> Dict[str, int]: return self.object_add( contents, object_type="skipped_content", keys=["sha1", "sha1_git", "sha256", "blake2s256"], ) def directory_add(self, directories: Sequence[Directory]) -> Dict[str, int]: stats = self.object_add(directories, object_type="directory", keys=["id"]) if not stats: # We did not flush based on number of objects; check the number of entries self._directory_entries += sum(len(d.entries) for d in directories) if self._directory_entries >= self._buffer_thresholds["directory_entries"]: return self.flush(["content", "directory"]) return stats + def revision_add(self, revisions: Sequence[Revision]) -> Dict[str, int]: + stats = self.object_add(revisions, object_type="revision", keys=["id"]) + + if not stats: + # We did not flush based on number of objects; check the number of parents + self._revision_parents += sum(len(r.parents) for r in revisions) + if self._revision_parents >= self._buffer_thresholds["revision_parents"]: + return self.flush(["content", "directory", "revision"]) + + return stats + def object_add( self, objects: Sequence[BaseModel], *, object_type: LObjectType, keys: Iterable[str], ) -> Dict[str, int]: """Push objects to write to the storage in the buffer. Flushes the buffer to the storage if the threshold is hit. """ buffer_ = self._objects[object_type] for obj in objects: obj_key = tuple(getattr(obj, key) for key in keys) buffer_[obj_key] = obj if len(buffer_) >= self._buffer_thresholds[object_type]: return self.flush() return {} def flush( self, object_types: Sequence[LObjectType] = OBJECT_TYPES ) -> Dict[str, int]: summary: Dict[str, int] = {} def update_summary(stats): for k, v in stats.items(): summary[k] = v + summary.get(k, 0) for object_type in object_types: buffer_ = self._objects[object_type] batches = grouper(buffer_.values(), n=self._buffer_thresholds[object_type]) for batch in batches: add_fn = getattr(self.storage, "%s_add" % object_type) stats = add_fn(list(batch)) update_summary(stats) # Flush underlying storage stats = self.storage.flush(object_types) update_summary(stats) self.clear_buffers(object_types) return summary def clear_buffers(self, object_types: Sequence[LObjectType] = OBJECT_TYPES) -> None: """Clear objects from current buffer. WARNING: data that has not been flushed to storage will be lost when this method is called. This should only be called when `flush` fails and you want to continue your processing. """ for object_type in object_types: buffer_ = self._objects[object_type] buffer_.clear() if object_type == "content": self._contents_size = 0 elif object_type == "directory": self._directory_entries = 0 + elif object_type == "revision": + self._revision_parents = 0 self.storage.clear_buffers(object_types) diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py index c41af2dd..48584402 100644 --- a/swh/storage/tests/test_buffer.py +++ b/swh/storage/tests/test_buffer.py @@ -1,656 +1,679 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import Counter from typing import Optional from unittest.mock import Mock from swh.storage import get_storage from swh.storage.proxies.buffer import BufferingProxyStorage def get_storage_with_buffer_config(**buffer_config) -> BufferingProxyStorage: steps = [ {"cls": "buffer", **buffer_config}, {"cls": "memory"}, ] ret = get_storage("pipeline", steps=steps) assert isinstance(ret, BufferingProxyStorage) return ret def test_buffering_proxy_storage_content_threshold_not_hit(sample_data) -> None: contents = sample_data.contents[:2] contents_dict = [c.to_dict() for c in contents] storage = get_storage_with_buffer_config(min_batch_size={"content": 10,}) s = storage.content_add(contents) assert s == {} # contents have not been written to storage missing_contents = storage.content_missing(contents_dict) assert set(missing_contents) == set([contents[0].sha1, contents[1].sha1]) s = storage.flush() assert s == { "content:add": 1 + 1, "content:add:bytes": contents[0].length + contents[1].length, } missing_contents = storage.content_missing(contents_dict) assert list(missing_contents) == [] def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data) -> None: content = sample_data.content content_dict = content.to_dict() storage = get_storage_with_buffer_config(min_batch_size={"content": 1,}) s = storage.content_add([content]) assert s == { "content:add": 1, "content:add:bytes": content.length, } missing_contents = storage.content_missing([content_dict]) assert list(missing_contents) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_content_deduplicate(sample_data) -> None: contents = sample_data.contents[:2] storage = get_storage_with_buffer_config(min_batch_size={"content": 2,}) s = storage.content_add([contents[0], contents[0]]) assert s == {} s = storage.content_add([contents[0]]) assert s == {} s = storage.content_add([contents[1]]) assert s == { "content:add": 1 + 1, "content:add:bytes": contents[0].length + contents[1].length, } missing_contents = storage.content_missing([c.to_dict() for c in contents]) assert list(missing_contents) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_content_threshold_bytes_hit(sample_data) -> None: contents = sample_data.contents[:2] content_bytes_min_batch_size = 2 storage = get_storage_with_buffer_config( min_batch_size={"content": 10, "content_bytes": content_bytes_min_batch_size,} ) assert contents[0].length > content_bytes_min_batch_size s = storage.content_add([contents[0]]) assert s == { "content:add": 1, "content:add:bytes": contents[0].length, } missing_contents = storage.content_missing([contents[0].to_dict()]) assert list(missing_contents) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_skipped_content_threshold_not_hit(sample_data) -> None: contents = sample_data.skipped_contents contents_dict = [c.to_dict() for c in contents] storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 10,}) s = storage.skipped_content_add([contents[0], contents[1]]) assert s == {} # contents have not been written to storage missing_contents = storage.skipped_content_missing(contents_dict) assert {c["sha1"] for c in missing_contents} == {c.sha1 for c in contents} s = storage.flush() assert s == {"skipped_content:add": 1 + 1} missing_contents = storage.skipped_content_missing(contents_dict) assert list(missing_contents) == [] def test_buffering_proxy_storage_skipped_content_threshold_nb_hit(sample_data) -> None: contents = sample_data.skipped_contents storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 1,}) s = storage.skipped_content_add([contents[0]]) assert s == {"skipped_content:add": 1} missing_contents = storage.skipped_content_missing([contents[0].to_dict()]) assert list(missing_contents) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_skipped_content_deduplicate(sample_data): contents = sample_data.skipped_contents[:2] storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 2,}) s = storage.skipped_content_add([contents[0], contents[0]]) assert s == {} s = storage.skipped_content_add([contents[0]]) assert s == {} s = storage.skipped_content_add([contents[1]]) assert s == { "skipped_content:add": 1 + 1, } missing_contents = storage.skipped_content_missing([c.to_dict() for c in contents]) assert list(missing_contents) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_extid_threshold_not_hit(sample_data) -> None: extid = sample_data.extid1 storage = get_storage_with_buffer_config(min_batch_size={"extid": 10,}) s = storage.extid_add([extid]) assert s == {} present_extids = storage.extid_get_from_target( extid.target.object_type, [extid.target.object_id] ) assert list(present_extids) == [] s = storage.flush() assert s == { "extid:add": 1, } present_extids = storage.extid_get_from_target( extid.target.object_type, [extid.target.object_id] ) assert list(present_extids) == [extid] def test_buffering_proxy_storage_extid_threshold_hit(sample_data) -> None: extid = sample_data.extid1 storage = get_storage_with_buffer_config(min_batch_size={"extid": 1,}) s = storage.extid_add([extid]) assert s == { "extid:add": 1, } present_extids = storage.extid_get_from_target( extid.target.object_type, [extid.target.object_id] ) assert list(present_extids) == [extid] s = storage.flush() assert s == {} def test_buffering_proxy_storage_extid_deduplicate(sample_data) -> None: extids = sample_data.extids[:2] storage = get_storage_with_buffer_config(min_batch_size={"extid": 2,}) s = storage.extid_add([extids[0], extids[0]]) assert s == {} s = storage.extid_add([extids[0]]) assert s == {} s = storage.extid_add([extids[1]]) assert s == { "extid:add": 1 + 1, } for extid in extids: present_extids = storage.extid_get_from_target( extid.target.object_type, [extid.target.object_id] ) assert list(present_extids) == [extid] s = storage.flush() assert s == {} def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data) -> None: directory = sample_data.directory storage = get_storage_with_buffer_config(min_batch_size={"directory": 10,}) s = storage.directory_add([directory]) assert s == {} missing_directories = storage.directory_missing([directory.id]) assert list(missing_directories) == [directory.id] s = storage.flush() assert s == { "directory:add": 1, } missing_directories = storage.directory_missing([directory.id]) assert list(missing_directories) == [] def test_buffering_proxy_storage_directory_threshold_hit(sample_data) -> None: directory = sample_data.directory storage = get_storage_with_buffer_config(min_batch_size={"directory": 1,}) s = storage.directory_add([directory]) assert s == { "directory:add": 1, } missing_directories = storage.directory_missing([directory.id]) assert list(missing_directories) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_directory_deduplicate(sample_data) -> None: directories = sample_data.directories[:2] storage = get_storage_with_buffer_config(min_batch_size={"directory": 2,}) s = storage.directory_add([directories[0], directories[0]]) assert s == {} s = storage.directory_add([directories[0]]) assert s == {} s = storage.directory_add([directories[1]]) assert s == { "directory:add": 1 + 1, } missing_directories = storage.directory_missing([d.id for d in directories]) assert list(missing_directories) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_directory_entries_threshold(sample_data) -> None: directories = sample_data.directories n_entries = sum(len(d.entries) for d in directories) threshold = sum(len(d.entries) for d in directories[:-2]) # ensure the threshold is in the middle assert 0 < threshold < n_entries storage = get_storage_with_buffer_config( min_batch_size={"directory_entries": threshold} ) storage.storage = Mock(wraps=storage.storage) for directory in directories: storage.directory_add([directory]) storage.flush() # We should have called the underlying directory_add at least twice, as # we have hit the threshold for number of entries on directory n-2 method_calls = Counter(c[0] for c in storage.storage.method_calls) assert method_calls["directory_add"] >= 2 def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data) -> None: revision = sample_data.revision storage = get_storage_with_buffer_config(min_batch_size={"revision": 10,}) s = storage.revision_add([revision]) assert s == {} missing_revisions = storage.revision_missing([revision.id]) assert list(missing_revisions) == [revision.id] s = storage.flush() assert s == { "revision:add": 1, } missing_revisions = storage.revision_missing([revision.id]) assert list(missing_revisions) == [] def test_buffering_proxy_storage_revision_threshold_hit(sample_data) -> None: revision = sample_data.revision storage = get_storage_with_buffer_config(min_batch_size={"revision": 1,}) s = storage.revision_add([revision]) assert s == { "revision:add": 1, } missing_revisions = storage.revision_missing([revision.id]) assert list(missing_revisions) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_revision_deduplicate(sample_data) -> None: revisions = sample_data.revisions[:2] storage = get_storage_with_buffer_config(min_batch_size={"revision": 2,}) s = storage.revision_add([revisions[0], revisions[0]]) assert s == {} s = storage.revision_add([revisions[0]]) assert s == {} s = storage.revision_add([revisions[1]]) assert s == { "revision:add": 1 + 1, } missing_revisions = storage.revision_missing([r.id for r in revisions]) assert list(missing_revisions) == [] s = storage.flush() assert s == {} +def test_buffering_proxy_storage_revision_parents_threshold(sample_data) -> None: + revisions = sample_data.revisions + n_parents = sum(len(r.parents) for r in revisions) + threshold = sum(len(r.parents) for r in revisions[:-2]) + + # ensure the threshold is in the middle + assert 0 < threshold < n_parents + + storage = get_storage_with_buffer_config( + min_batch_size={"revision_parents": threshold} + ) + storage.storage = Mock(wraps=storage.storage) + + for revision in revisions: + storage.revision_add([revision]) + storage.flush() + + # We should have called the underlying revision_add at least twice, as + # we have hit the threshold for number of parents on revision n-2 + method_calls = Counter(c[0] for c in storage.storage.method_calls) + assert method_calls["revision_add"] >= 2 + + def test_buffering_proxy_storage_release_threshold_not_hit(sample_data) -> None: releases = sample_data.releases threshold = 10 assert len(releases) < threshold storage = get_storage_with_buffer_config( min_batch_size={"release": threshold,} # configuration set ) s = storage.release_add(releases) assert s == {} release_ids = [r.id for r in releases] missing_releases = storage.release_missing(release_ids) assert list(missing_releases) == release_ids s = storage.flush() assert s == { "release:add": len(releases), } missing_releases = storage.release_missing(release_ids) assert list(missing_releases) == [] def test_buffering_proxy_storage_release_threshold_hit(sample_data) -> None: releases = sample_data.releases threshold = 2 assert len(releases) > threshold storage = get_storage_with_buffer_config( min_batch_size={"release": threshold,} # configuration set ) s = storage.release_add(releases) assert s == { "release:add": len(releases), } release_ids = [r.id for r in releases] missing_releases = storage.release_missing(release_ids) assert list(missing_releases) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_release_deduplicate(sample_data) -> None: releases = sample_data.releases[:2] storage = get_storage_with_buffer_config(min_batch_size={"release": 2,}) s = storage.release_add([releases[0], releases[0]]) assert s == {} s = storage.release_add([releases[0]]) assert s == {} s = storage.release_add([releases[1]]) assert s == { "release:add": 1 + 1, } missing_releases = storage.release_missing([r.id for r in releases]) assert list(missing_releases) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_snapshot_threshold_not_hit(sample_data) -> None: snapshots = sample_data.snapshots threshold = 10 assert len(snapshots) < threshold storage = get_storage_with_buffer_config( min_batch_size={"snapshot": threshold,} # configuration set ) s = storage.snapshot_add(snapshots) assert s == {} snapshot_ids = [r.id for r in snapshots] missing_snapshots = storage.snapshot_missing(snapshot_ids) assert list(missing_snapshots) == snapshot_ids s = storage.flush() assert s == { "snapshot:add": len(snapshots), } missing_snapshots = storage.snapshot_missing(snapshot_ids) assert list(missing_snapshots) == [] def test_buffering_proxy_storage_snapshot_threshold_hit(sample_data) -> None: snapshots = sample_data.snapshots threshold = 2 assert len(snapshots) > threshold storage = get_storage_with_buffer_config( min_batch_size={"snapshot": threshold,} # configuration set ) s = storage.snapshot_add(snapshots) assert s == { "snapshot:add": len(snapshots), } snapshot_ids = [r.id for r in snapshots] missing_snapshots = storage.snapshot_missing(snapshot_ids) assert list(missing_snapshots) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_snapshot_deduplicate(sample_data) -> None: snapshots = sample_data.snapshots[:2] storage = get_storage_with_buffer_config(min_batch_size={"snapshot": 2,}) s = storage.snapshot_add([snapshots[0], snapshots[0]]) assert s == {} s = storage.snapshot_add([snapshots[0]]) assert s == {} s = storage.snapshot_add([snapshots[1]]) assert s == { "snapshot:add": 1 + 1, } missing_snapshots = storage.snapshot_missing([r.id for r in snapshots]) assert list(missing_snapshots) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_clear(sample_data) -> None: """Clear operation on buffer """ threshold = 10 contents = sample_data.contents assert 0 < len(contents) < threshold skipped_contents = sample_data.skipped_contents assert 0 < len(skipped_contents) < threshold directories = sample_data.directories assert 0 < len(directories) < threshold revisions = sample_data.revisions assert 0 < len(revisions) < threshold releases = sample_data.releases assert 0 < len(releases) < threshold snapshots = sample_data.snapshots assert 0 < len(snapshots) < threshold storage = get_storage_with_buffer_config( min_batch_size={ "content": threshold, "skipped_content": threshold, "directory": threshold, "revision": threshold, "release": threshold, } ) s = storage.content_add(contents) assert s == {} s = storage.skipped_content_add(skipped_contents) assert s == {} s = storage.directory_add(directories) assert s == {} s = storage.revision_add(revisions) assert s == {} s = storage.release_add(releases) assert s == {} s = storage.snapshot_add(snapshots) assert s == {} assert len(storage._objects["content"]) == len(contents) assert len(storage._objects["skipped_content"]) == len(skipped_contents) assert len(storage._objects["directory"]) == len(directories) assert len(storage._objects["revision"]) == len(revisions) assert len(storage._objects["release"]) == len(releases) assert len(storage._objects["snapshot"]) == len(snapshots) # clear only content from the buffer s = storage.clear_buffers(["content"]) # type: ignore assert s is None # specific clear operation on specific object type content only touched # them assert len(storage._objects["content"]) == 0 assert len(storage._objects["skipped_content"]) == len(skipped_contents) assert len(storage._objects["directory"]) == len(directories) assert len(storage._objects["revision"]) == len(revisions) assert len(storage._objects["release"]) == len(releases) assert len(storage._objects["snapshot"]) == len(snapshots) # clear current buffer from all object types s = storage.clear_buffers() # type: ignore assert s is None assert len(storage._objects["content"]) == 0 assert len(storage._objects["skipped_content"]) == 0 assert len(storage._objects["directory"]) == 0 assert len(storage._objects["revision"]) == 0 assert len(storage._objects["release"]) == 0 assert len(storage._objects["snapshot"]) == 0 def test_buffer_proxy_with_default_args() -> None: storage = get_storage_with_buffer_config() assert storage is not None def test_buffer_flush_stats(sample_data) -> None: storage = get_storage_with_buffer_config() s = storage.content_add(sample_data.contents) assert s == {} s = storage.skipped_content_add(sample_data.skipped_contents) assert s == {} s = storage.directory_add(sample_data.directories) assert s == {} s = storage.revision_add(sample_data.revisions) assert s == {} s = storage.release_add(sample_data.releases) assert s == {} s = storage.snapshot_add(sample_data.snapshots) assert s == {} # Flush all the things s = storage.flush() assert s["content:add"] > 0 assert s["content:add:bytes"] > 0 assert s["skipped_content:add"] > 0 assert s["directory:add"] > 0 assert s["revision:add"] > 0 assert s["release:add"] > 0 assert s["snapshot:add"] > 0 def test_buffer_operation_order(sample_data) -> None: storage = get_storage_with_buffer_config() # Wrap the inner storage in a mock to track all method calls. storage.storage = mocked_storage = Mock(wraps=storage.storage) # Simulate a loader: add contents, directories, revisions, releases, then # snapshots. storage.content_add(sample_data.contents) storage.skipped_content_add(sample_data.skipped_contents) storage.directory_add(sample_data.directories) storage.revision_add(sample_data.revisions) storage.release_add(sample_data.releases) storage.snapshot_add(sample_data.snapshots) # Check that nothing has been flushed yet assert mocked_storage.method_calls == [] # Flush all the things storage.flush() methods_called = [c[0] for c in mocked_storage.method_calls] prev = -1 for method in [ "content_add", "skipped_content_add", "directory_add", "revision_add", "release_add", "snapshot_add", "flush", ]: try: cur: Optional[int] = methods_called.index(method) except ValueError: cur = None assert cur is not None, "Method %s not called" % method assert cur > prev, "Method %s called out of order; all calls were: %s" % ( method, methods_called, ) prev = cur def test_buffer_empty_batches() -> None: "Flushing an empty buffer storage doesn't call any underlying _add method" storage = get_storage_with_buffer_config() storage.storage = mocked_storage = Mock(wraps=storage.storage) storage.flush() methods_called = {c[0] for c in mocked_storage.method_calls} assert methods_called == {"flush", "clear_buffers"}