diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py
index 3ff22bdd..0389754d 100644
--- a/swh/storage/buffer.py
+++ b/swh/storage/buffer.py
@@ -1,174 +1,182 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from functools import partial
from typing import Dict, Iterable, Mapping, Sequence, Tuple
from typing_extensions import Literal
from swh.core.utils import grouper
from swh.model.model import BaseModel, Content, SkippedContent
from swh.storage import get_storage
from swh.storage.interface import StorageInterface
LObjectType = Literal[
- "content", "skipped_content", "directory", "revision", "release", "snapshot"
+ "content",
+ "skipped_content",
+ "directory",
+ "revision",
+ "release",
+ "snapshot",
+ "extid",
]
OBJECT_TYPES: Tuple[LObjectType, ...] = (
"content",
"skipped_content",
"directory",
"revision",
"release",
"snapshot",
+ "extid",
)
DEFAULT_BUFFER_THRESHOLDS: Dict[str, int] = {
"content": 10000,
"content_bytes": 100 * 1024 * 1024,
"skipped_content": 10000,
"directory": 25000,
"revision": 100000,
"release": 100000,
"snapshot": 25000,
+ "extid": 10000,
}
class BufferingProxyStorage:
"""Storage implementation in charge of accumulating objects prior to
discussing with the "main" storage.
Deduplicates values based on a tuple of keys depending on the object type.
Sample configuration use case for buffering storage:
.. code-block:: yaml
storage:
cls: buffer
args:
storage:
cls: remote
args: http://storage.internal.staging.swh.network:5002/
min_batch_size:
content: 10000
content_bytes: 100000000
skipped_content: 10000
directory: 5000
revision: 1000
release: 10000
snapshot: 5000
"""
def __init__(self, storage: Mapping, min_batch_size: Mapping = {}):
self.storage: StorageInterface = get_storage(**storage)
self._buffer_thresholds = {**DEFAULT_BUFFER_THRESHOLDS, **min_batch_size}
self._objects: Dict[LObjectType, Dict[Tuple[str, ...], BaseModel]] = {
k: {} for k in OBJECT_TYPES
}
self._contents_size: int = 0
def __getattr__(self, key: str):
if key.endswith("_add"):
object_type = key.rsplit("_", 1)[0]
if object_type in OBJECT_TYPES:
return partial(self.object_add, object_type=object_type, keys=["id"],)
if key == "storage":
raise AttributeError(key)
return getattr(self.storage, key)
def content_add(self, contents: Sequence[Content]) -> Dict:
"""Push contents to write to the storage in the buffer.
The following policies apply:
- if the buffer's count threshold is hit, flush the contents to the storage.
- otherwise, if the total size of the buffered contents hits the byte-size
threshold, flush the contents to the storage.
"""
stats = self.object_add(
contents,
object_type="content",
keys=["sha1", "sha1_git", "sha256", "blake2s256"],
)
if not stats: # We did not flush already
self._contents_size += sum(c.length for c in contents)
if self._contents_size >= self._buffer_thresholds["content_bytes"]:
return self.flush(["content"])
return stats
def skipped_content_add(self, contents: Sequence[SkippedContent]) -> Dict:
return self.object_add(
contents,
object_type="skipped_content",
keys=["sha1", "sha1_git", "sha256", "blake2s256"],
)
def object_add(
self,
objects: Sequence[BaseModel],
*,
object_type: LObjectType,
keys: Iterable[str],
) -> Dict[str, int]:
"""Push objects to write to the storage in the buffer. Flushes the
buffer to the storage if the threshold is hit.
"""
buffer_ = self._objects[object_type]
for obj in objects:
obj_key = tuple(getattr(obj, key) for key in keys)
buffer_[obj_key] = obj
if len(buffer_) >= self._buffer_thresholds[object_type]:
return self.flush()
return {}
def flush(
self, object_types: Sequence[LObjectType] = OBJECT_TYPES
) -> Dict[str, int]:
summary: Dict[str, int] = {}
def update_summary(stats):
for k, v in stats.items():
summary[k] = v + summary.get(k, 0)
for object_type in object_types:
buffer_ = self._objects[object_type]
batches = grouper(buffer_.values(), n=self._buffer_thresholds[object_type])
for batch in batches:
add_fn = getattr(self.storage, "%s_add" % object_type)
stats = add_fn(list(batch))
update_summary(stats)
# Flush underlying storage
stats = self.storage.flush(object_types)
update_summary(stats)
self.clear_buffers(object_types)
return summary
def clear_buffers(self, object_types: Sequence[LObjectType] = OBJECT_TYPES) -> None:
"""Clear objects from current buffer.
WARNING:
data that has not been flushed to storage will be lost when this
method is called. This should only be called when `flush` fails and
you want to continue your processing.
"""
for object_type in object_types:
buffer_ = self._objects[object_type]
buffer_.clear()
if object_type == "content":
self._contents_size = 0
self.storage.clear_buffers(object_types)
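For context on how the proxy above is wired up in practice, the tests below assemble it through the "pipeline" storage factory. The following is a minimal sketch of that setup and of the buffering behaviour; it is not part of this diff, and the in-memory backend, the threshold values and the use of Content.from_data are illustrative assumptions:

from swh.model.model import Content
from swh.storage import get_storage

# Assemble a buffering proxy in front of an in-memory backend, the same way
# get_storage_with_buffer_config() does in the tests below.  The thresholds,
# including the new "extid" one, are illustrative.
storage = get_storage(
    "pipeline",
    steps=[
        {
            "cls": "buffer",
            "min_batch_size": {
                "content": 10,
                "content_bytes": 100 * 1024 * 1024,
                "extid": 10,
            },
        },
        {"cls": "memory"},
    ],
)

# Content.from_data is assumed available from swh.model; any Content objects work.
contents = [Content.from_data(b"hello"), Content.from_data(b"world")]

stats = storage.content_add(contents)
assert stats == {}  # below both thresholds: nothing written to the backend yet

stats = storage.flush()  # pushes every buffered object type to the backend
assert stats["content:add"] == 2

Per the content_add docstring above, a flush of the content buffer is also triggered when the cumulative size of buffered contents crosses the "content_bytes" threshold, even if the count threshold has not been reached.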
diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py
index ee359ee7..090237f8 100644
--- a/swh/storage/tests/test_buffer.py
+++ b/swh/storage/tests/test_buffer.py
@@ -1,558 +1,622 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Optional
from unittest.mock import Mock
from swh.storage import get_storage
from swh.storage.buffer import BufferingProxyStorage
def get_storage_with_buffer_config(**buffer_config) -> BufferingProxyStorage:
steps = [
{"cls": "buffer", **buffer_config},
{"cls": "memory"},
]
ret = get_storage("pipeline", steps=steps)
assert isinstance(ret, BufferingProxyStorage)
return ret
def test_buffering_proxy_storage_content_threshold_not_hit(sample_data) -> None:
contents = sample_data.contents[:2]
contents_dict = [c.to_dict() for c in contents]
storage = get_storage_with_buffer_config(min_batch_size={"content": 10,})
s = storage.content_add(contents)
assert s == {}
# contents have not been written to storage
missing_contents = storage.content_missing(contents_dict)
assert set(missing_contents) == set([contents[0].sha1, contents[1].sha1])
s = storage.flush()
assert s == {
"content:add": 1 + 1,
"content:add:bytes": contents[0].length + contents[1].length,
}
missing_contents = storage.content_missing(contents_dict)
assert list(missing_contents) == []
def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data) -> None:
content = sample_data.content
content_dict = content.to_dict()
storage = get_storage_with_buffer_config(min_batch_size={"content": 1,})
s = storage.content_add([content])
assert s == {
"content:add": 1,
"content:add:bytes": content.length,
}
missing_contents = storage.content_missing([content_dict])
assert list(missing_contents) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_content_deduplicate(sample_data) -> None:
contents = sample_data.contents[:2]
storage = get_storage_with_buffer_config(min_batch_size={"content": 2,})
s = storage.content_add([contents[0], contents[0]])
assert s == {}
s = storage.content_add([contents[0]])
assert s == {}
s = storage.content_add([contents[1]])
assert s == {
"content:add": 1 + 1,
"content:add:bytes": contents[0].length + contents[1].length,
}
missing_contents = storage.content_missing([c.to_dict() for c in contents])
assert list(missing_contents) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_content_threshold_bytes_hit(sample_data) -> None:
contents = sample_data.contents[:2]
content_bytes_min_batch_size = 2
storage = get_storage_with_buffer_config(
min_batch_size={"content": 10, "content_bytes": content_bytes_min_batch_size,}
)
assert contents[0].length > content_bytes_min_batch_size
s = storage.content_add([contents[0]])
assert s == {
"content:add": 1,
"content:add:bytes": contents[0].length,
}
missing_contents = storage.content_missing([contents[0].to_dict()])
assert list(missing_contents) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_skipped_content_threshold_not_hit(sample_data) -> None:
contents = sample_data.skipped_contents
contents_dict = [c.to_dict() for c in contents]
storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 10,})
s = storage.skipped_content_add([contents[0], contents[1]])
assert s == {}
# contents have not been written to storage
missing_contents = storage.skipped_content_missing(contents_dict)
assert {c["sha1"] for c in missing_contents} == {c.sha1 for c in contents}
s = storage.flush()
assert s == {"skipped_content:add": 1 + 1}
missing_contents = storage.skipped_content_missing(contents_dict)
assert list(missing_contents) == []
def test_buffering_proxy_storage_skipped_content_threshold_nb_hit(sample_data) -> None:
contents = sample_data.skipped_contents
storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 1,})
s = storage.skipped_content_add([contents[0]])
assert s == {"skipped_content:add": 1}
missing_contents = storage.skipped_content_missing([contents[0].to_dict()])
assert list(missing_contents) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_skipped_content_deduplicate(sample_data):
contents = sample_data.skipped_contents[:2]
storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 2,})
s = storage.skipped_content_add([contents[0], contents[0]])
assert s == {}
s = storage.skipped_content_add([contents[0]])
assert s == {}
s = storage.skipped_content_add([contents[1]])
assert s == {
"skipped_content:add": 1 + 1,
}
missing_contents = storage.skipped_content_missing([c.to_dict() for c in contents])
assert list(missing_contents) == []
s = storage.flush()
assert s == {}
+def test_buffering_proxy_storage_extid_threshold_not_hit(sample_data) -> None:
+ extid = sample_data.extid1
+ storage = get_storage_with_buffer_config(min_batch_size={"extid": 10,})
+ s = storage.extid_add([extid])
+ assert s == {}
+
+ present_extids = storage.extid_get_from_target(
+ extid.target.object_type, [extid.target.object_id]
+ )
+ assert list(present_extids) == []
+
+ s = storage.flush()
+ assert s == {
+ "extid:add": 1,
+ }
+
+ present_extids = storage.extid_get_from_target(
+ extid.target.object_type, [extid.target.object_id]
+ )
+ assert list(present_extids) == [extid]
+
+
+def test_buffering_proxy_storage_extid_threshold_hit(sample_data) -> None:
+ extid = sample_data.extid1
+ storage = get_storage_with_buffer_config(min_batch_size={"extid": 1,})
+ s = storage.extid_add([extid])
+ assert s == {
+ "extid:add": 1,
+ }
+
+ present_extids = storage.extid_get_from_target(
+ extid.target.object_type, [extid.target.object_id]
+ )
+ assert list(present_extids) == [extid]
+
+ s = storage.flush()
+ assert s == {}
+
+
+def test_buffering_proxy_storage_extid_deduplicate(sample_data) -> None:
+ extids = sample_data.extids[:2]
+ storage = get_storage_with_buffer_config(min_batch_size={"extid": 2,})
+
+ s = storage.extid_add([extids[0], extids[0]])
+ assert s == {}
+
+ s = storage.extid_add([extids[0]])
+ assert s == {}
+
+ s = storage.extid_add([extids[1]])
+ assert s == {
+ "extid:add": 1 + 1,
+ }
+
+ for extid in extids:
+ present_extids = storage.extid_get_from_target(
+ extid.target.object_type, [extid.target.object_id]
+ )
+ assert list(present_extids) == [extid]
+
+ s = storage.flush()
+ assert s == {}
+
+
def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data) -> None:
directory = sample_data.directory
storage = get_storage_with_buffer_config(min_batch_size={"directory": 10,})
s = storage.directory_add([directory])
assert s == {}
missing_directories = storage.directory_missing([directory.id])
assert list(missing_directories) == [directory.id]
s = storage.flush()
assert s == {
"directory:add": 1,
}
missing_directories = storage.directory_missing([directory.id])
assert list(missing_directories) == []
def test_buffering_proxy_storage_directory_threshold_hit(sample_data) -> None:
directory = sample_data.directory
storage = get_storage_with_buffer_config(min_batch_size={"directory": 1,})
s = storage.directory_add([directory])
assert s == {
"directory:add": 1,
}
missing_directories = storage.directory_missing([directory.id])
assert list(missing_directories) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_directory_deduplicate(sample_data) -> None:
directories = sample_data.directories[:2]
storage = get_storage_with_buffer_config(min_batch_size={"directory": 2,})
s = storage.directory_add([directories[0], directories[0]])
assert s == {}
s = storage.directory_add([directories[0]])
assert s == {}
s = storage.directory_add([directories[1]])
assert s == {
"directory:add": 1 + 1,
}
missing_directories = storage.directory_missing([d.id for d in directories])
assert list(missing_directories) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data) -> None:
revision = sample_data.revision
storage = get_storage_with_buffer_config(min_batch_size={"revision": 10,})
s = storage.revision_add([revision])
assert s == {}
missing_revisions = storage.revision_missing([revision.id])
assert list(missing_revisions) == [revision.id]
s = storage.flush()
assert s == {
"revision:add": 1,
}
missing_revisions = storage.revision_missing([revision.id])
assert list(missing_revisions) == []
def test_buffering_proxy_storage_revision_threshold_hit(sample_data) -> None:
revision = sample_data.revision
storage = get_storage_with_buffer_config(min_batch_size={"revision": 1,})
s = storage.revision_add([revision])
assert s == {
"revision:add": 1,
}
missing_revisions = storage.revision_missing([revision.id])
assert list(missing_revisions) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_revision_deduplicate(sample_data) -> None:
revisions = sample_data.revisions[:2]
storage = get_storage_with_buffer_config(min_batch_size={"revision": 2,})
s = storage.revision_add([revisions[0], revisions[0]])
assert s == {}
s = storage.revision_add([revisions[0]])
assert s == {}
s = storage.revision_add([revisions[1]])
assert s == {
"revision:add": 1 + 1,
}
missing_revisions = storage.revision_missing([r.id for r in revisions])
assert list(missing_revisions) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_release_threshold_not_hit(sample_data) -> None:
releases = sample_data.releases
threshold = 10
assert len(releases) < threshold
storage = get_storage_with_buffer_config(
min_batch_size={"release": threshold,} # configuration set
)
s = storage.release_add(releases)
assert s == {}
release_ids = [r.id for r in releases]
missing_releases = storage.release_missing(release_ids)
assert list(missing_releases) == release_ids
s = storage.flush()
assert s == {
"release:add": len(releases),
}
missing_releases = storage.release_missing(release_ids)
assert list(missing_releases) == []
def test_buffering_proxy_storage_release_threshold_hit(sample_data) -> None:
releases = sample_data.releases
threshold = 2
assert len(releases) > threshold
storage = get_storage_with_buffer_config(
min_batch_size={"release": threshold,} # configuration set
)
s = storage.release_add(releases)
assert s == {
"release:add": len(releases),
}
release_ids = [r.id for r in releases]
missing_releases = storage.release_missing(release_ids)
assert list(missing_releases) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_release_deduplicate(sample_data) -> None:
releases = sample_data.releases[:2]
storage = get_storage_with_buffer_config(min_batch_size={"release": 2,})
s = storage.release_add([releases[0], releases[0]])
assert s == {}
s = storage.release_add([releases[0]])
assert s == {}
s = storage.release_add([releases[1]])
assert s == {
"release:add": 1 + 1,
}
missing_releases = storage.release_missing([r.id for r in releases])
assert list(missing_releases) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_snapshot_threshold_not_hit(sample_data) -> None:
snapshots = sample_data.snapshots
threshold = 10
assert len(snapshots) < threshold
storage = get_storage_with_buffer_config(
min_batch_size={"snapshot": threshold,} # configuration set
)
s = storage.snapshot_add(snapshots)
assert s == {}
snapshot_ids = [r.id for r in snapshots]
missing_snapshots = storage.snapshot_missing(snapshot_ids)
assert list(missing_snapshots) == snapshot_ids
s = storage.flush()
assert s == {
"snapshot:add": len(snapshots),
}
missing_snapshots = storage.snapshot_missing(snapshot_ids)
assert list(missing_snapshots) == []
def test_buffering_proxy_storage_snapshot_threshold_hit(sample_data) -> None:
snapshots = sample_data.snapshots
threshold = 2
assert len(snapshots) > threshold
storage = get_storage_with_buffer_config(
min_batch_size={"snapshot": threshold,} # configuration set
)
s = storage.snapshot_add(snapshots)
assert s == {
"snapshot:add": len(snapshots),
}
snapshot_ids = [r.id for r in snapshots]
missing_snapshots = storage.snapshot_missing(snapshot_ids)
assert list(missing_snapshots) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_snapshot_deduplicate(sample_data) -> None:
snapshots = sample_data.snapshots[:2]
storage = get_storage_with_buffer_config(min_batch_size={"snapshot": 2,})
s = storage.snapshot_add([snapshots[0], snapshots[0]])
assert s == {}
s = storage.snapshot_add([snapshots[0]])
assert s == {}
s = storage.snapshot_add([snapshots[1]])
assert s == {
"snapshot:add": 1 + 1,
}
missing_snapshots = storage.snapshot_missing([r.id for r in snapshots])
assert list(missing_snapshots) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_clear(sample_data) -> None:
"""Clear operation on buffer
"""
threshold = 10
contents = sample_data.contents
assert 0 < len(contents) < threshold
skipped_contents = sample_data.skipped_contents
assert 0 < len(skipped_contents) < threshold
directories = sample_data.directories
assert 0 < len(directories) < threshold
revisions = sample_data.revisions
assert 0 < len(revisions) < threshold
releases = sample_data.releases
assert 0 < len(releases) < threshold
snapshots = sample_data.snapshots
assert 0 < len(snapshots) < threshold
storage = get_storage_with_buffer_config(
min_batch_size={
"content": threshold,
"skipped_content": threshold,
"directory": threshold,
"revision": threshold,
"release": threshold,
}
)
s = storage.content_add(contents)
assert s == {}
s = storage.skipped_content_add(skipped_contents)
assert s == {}
s = storage.directory_add(directories)
assert s == {}
s = storage.revision_add(revisions)
assert s == {}
s = storage.release_add(releases)
assert s == {}
s = storage.snapshot_add(snapshots)
assert s == {}
assert len(storage._objects["content"]) == len(contents)
assert len(storage._objects["skipped_content"]) == len(skipped_contents)
assert len(storage._objects["directory"]) == len(directories)
assert len(storage._objects["revision"]) == len(revisions)
assert len(storage._objects["release"]) == len(releases)
assert len(storage._objects["snapshot"]) == len(snapshots)
# clear only content from the buffer
s = storage.clear_buffers(["content"]) # type: ignore
assert s is None
# the clear operation on a specific object type ("content") should only
# have touched that buffer
assert len(storage._objects["content"]) == 0
assert len(storage._objects["skipped_content"]) == len(skipped_contents)
assert len(storage._objects["directory"]) == len(directories)
assert len(storage._objects["revision"]) == len(revisions)
assert len(storage._objects["release"]) == len(releases)
assert len(storage._objects["snapshot"]) == len(snapshots)
# clear current buffer from all object types
s = storage.clear_buffers() # type: ignore
assert s is None
assert len(storage._objects["content"]) == 0
assert len(storage._objects["skipped_content"]) == 0
assert len(storage._objects["directory"]) == 0
assert len(storage._objects["revision"]) == 0
assert len(storage._objects["release"]) == 0
assert len(storage._objects["snapshot"]) == 0
def test_buffer_proxy_with_default_args() -> None:
storage = get_storage_with_buffer_config()
assert storage is not None
def test_buffer_flush_stats(sample_data) -> None:
storage = get_storage_with_buffer_config()
s = storage.content_add(sample_data.contents)
assert s == {}
s = storage.skipped_content_add(sample_data.skipped_contents)
assert s == {}
s = storage.directory_add(sample_data.directories)
assert s == {}
s = storage.revision_add(sample_data.revisions)
assert s == {}
s = storage.release_add(sample_data.releases)
assert s == {}
s = storage.snapshot_add(sample_data.snapshots)
assert s == {}
# Flush all the things
s = storage.flush()
assert s["content:add"] > 0
assert s["content:add:bytes"] > 0
assert s["skipped_content:add"] > 0
assert s["directory:add"] > 0
assert s["revision:add"] > 0
assert s["release:add"] > 0
assert s["snapshot:add"] > 0
def test_buffer_operation_order(sample_data) -> None:
storage = get_storage_with_buffer_config()
# Wrap the inner storage in a mock to track all method calls.
storage.storage = mocked_storage = Mock(wraps=storage.storage)
# Simulate a loader: add contents, directories, revisions, releases, then
# snapshots.
storage.content_add(sample_data.contents)
storage.skipped_content_add(sample_data.skipped_contents)
storage.directory_add(sample_data.directories)
storage.revision_add(sample_data.revisions)
storage.release_add(sample_data.releases)
storage.snapshot_add(sample_data.snapshots)
# Check that nothing has been flushed yet
assert mocked_storage.method_calls == []
# Flush all the things
storage.flush()
methods_called = [c[0] for c in mocked_storage.method_calls]
prev = -1
for method in [
"content_add",
"skipped_content_add",
"directory_add",
"revision_add",
"release_add",
"snapshot_add",
"flush",
]:
try:
cur: Optional[int] = methods_called.index(method)
except ValueError:
cur = None
assert cur is not None, "Method %s not called" % method
assert cur > prev, "Method %s called out of order; all calls were: %s" % (
method,
methods_called,
)
prev = cur
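Finally, the WARNING in the clear_buffers docstring above implies a specific loader-side pattern: buffered data is only discarded after a failed flush, when the caller explicitly chooses to continue. A minimal sketch of that pattern, assuming a `storage` built as in the earlier sketch (the broad exception handling and the decision to re-raise are illustrative):

try:
    stats = storage.flush()  # push everything still buffered to the backend
except Exception:
    # Anything still in the buffers has NOT reached the backend storage.
    # Dropping it here is a deliberate choice made in order to keep going.
    storage.clear_buffers()
    raise  # or swallow and continue, depending on the loader's policy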