diff --git a/swh/storage/proxies/buffer.py b/swh/storage/proxies/buffer.py
index 55572cb0..6a958bf0 100644
--- a/swh/storage/proxies/buffer.py
+++ b/swh/storage/proxies/buffer.py
@@ -1,216 +1,273 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from functools import partial
from typing import Dict, Iterable, Mapping, Sequence, Tuple
from typing_extensions import Literal
from swh.core.utils import grouper
-from swh.model.model import BaseModel, Content, Directory, Revision, SkippedContent
+from swh.model.model import (
+ BaseModel,
+ Content,
+ Directory,
+ Release,
+ Revision,
+ SkippedContent,
+)
from swh.storage import get_storage
from swh.storage.interface import StorageInterface
LObjectType = Literal[
"content",
"skipped_content",
"directory",
"revision",
"release",
"snapshot",
"extid",
]
OBJECT_TYPES: Tuple[LObjectType, ...] = (
"content",
"skipped_content",
"directory",
"revision",
"release",
"snapshot",
"extid",
)
DEFAULT_BUFFER_THRESHOLDS: Dict[str, int] = {
"content": 10000,
"content_bytes": 100 * 1024 * 1024,
"skipped_content": 10000,
"directory": 25000,
"directory_entries": 200000,
"revision": 100000,
"revision_parents": 200000,
+ "revision_bytes": 100 * 1024 * 1024,
"release": 100000,
+ "release_bytes": 100 * 1024 * 1024,
"snapshot": 25000,
"extid": 10000,
}
+def estimate_revision_size(revision: Revision) -> int:
+ """Estimate the size of a revision, by summing the size of variable length fields"""
+ s = 20 * len(revision.parents)
+
+ if revision.message:
+ s += len(revision.message)
+
+ s += len(revision.author.fullname)
+ s += len(revision.committer.fullname)
+ s += sum(len(h) + len(v) for h, v in revision.extra_headers)
+
+ return s
+
+
+def estimate_release_size(release: Release) -> int:
+ """Estimate the size of a release, by summing the size of variable length fields"""
+ s = 0
+ if release.message:
+ s += len(release.message)
+ if release.author:
+ s += len(release.author.fullname)
+
+ return s
+
+
class BufferingProxyStorage:
"""Storage implementation in charge of accumulating objects prior to
discussing with the "main" storage.
Deduplicates values based on a tuple of keys depending on the object type.
Sample configuration use case for buffering storage:
.. code-block:: yaml
storage:
cls: buffer
args:
storage:
cls: remote
args: http://storage.internal.staging.swh.network:5002/
min_batch_size:
content: 10000
content_bytes: 100000000
skipped_content: 10000
directory: 5000
directory_entries: 100000
revision: 1000
revision_parents: 2000
+ revision_bytes: 100000000
release: 10000
+ release_bytes: 100000000
snapshot: 5000
"""
def __init__(self, storage: Mapping, min_batch_size: Mapping = {}):
self.storage: StorageInterface = get_storage(**storage)
self._buffer_thresholds = {**DEFAULT_BUFFER_THRESHOLDS, **min_batch_size}
self._objects: Dict[LObjectType, Dict[Tuple[str, ...], BaseModel]] = {
k: {} for k in OBJECT_TYPES
}
self._contents_size: int = 0
self._directory_entries: int = 0
self._revision_parents: int = 0
+ self._revision_size: int = 0
+ self._release_size: int = 0
def __getattr__(self, key: str):
if key.endswith("_add"):
object_type = key.rsplit("_", 1)[0]
if object_type in OBJECT_TYPES:
return partial(self.object_add, object_type=object_type, keys=["id"],)
if key == "storage":
raise AttributeError(key)
return getattr(self.storage, key)
def content_add(self, contents: Sequence[Content]) -> Dict[str, int]:
"""Push contents to write to the storage in the buffer.
The following policies apply:
- if the buffer's object-count threshold is hit, flush contents to the storage.
- otherwise, if the total size of the buffered contents hits its threshold,
flush contents to the storage.
"""
stats = self.object_add(
contents,
object_type="content",
keys=["sha1", "sha1_git", "sha256", "blake2s256"],
)
if not stats:
# We did not flush based on number of objects; check total size
self._contents_size += sum(c.length for c in contents)
if self._contents_size >= self._buffer_thresholds["content_bytes"]:
return self.flush(["content"])
return stats
def skipped_content_add(self, contents: Sequence[SkippedContent]) -> Dict[str, int]:
return self.object_add(
contents,
object_type="skipped_content",
keys=["sha1", "sha1_git", "sha256", "blake2s256"],
)
def directory_add(self, directories: Sequence[Directory]) -> Dict[str, int]:
stats = self.object_add(directories, object_type="directory", keys=["id"])
if not stats:
# We did not flush based on number of objects; check the number of entries
self._directory_entries += sum(len(d.entries) for d in directories)
if self._directory_entries >= self._buffer_thresholds["directory_entries"]:
return self.flush(["content", "directory"])
return stats
def revision_add(self, revisions: Sequence[Revision]) -> Dict[str, int]:
stats = self.object_add(revisions, object_type="revision", keys=["id"])
if not stats:
- # We did not flush based on number of objects; check the number of parents
+ # We did not flush based on number of objects; check the number of
+ # parents and estimated size
self._revision_parents += sum(len(r.parents) for r in revisions)
- if self._revision_parents >= self._buffer_thresholds["revision_parents"]:
+ self._revision_size += sum(estimate_revision_size(r) for r in revisions)
+ if (
+ self._revision_parents >= self._buffer_thresholds["revision_parents"]
+ or self._revision_size >= self._buffer_thresholds["revision_bytes"]
+ ):
return self.flush(["content", "directory", "revision"])
return stats
+ def release_add(self, releases: Sequence[Release]) -> Dict[str, int]:
+ stats = self.object_add(releases, object_type="release", keys=["id"])
+
+ if not stats:
+ # We did not flush based on number of objects; check the estimated size
+ self._release_size += sum(estimate_release_size(r) for r in releases)
+ if self._release_size >= self._buffer_thresholds["release_bytes"]:
+ return self.flush(["content", "directory", "revision", "release"])
+
+ return stats
+
def object_add(
self,
objects: Sequence[BaseModel],
*,
object_type: LObjectType,
keys: Iterable[str],
) -> Dict[str, int]:
"""Push objects to write to the storage in the buffer. Flushes the
buffer to the storage if the threshold is hit.
"""
buffer_ = self._objects[object_type]
for obj in objects:
obj_key = tuple(getattr(obj, key) for key in keys)
buffer_[obj_key] = obj
if len(buffer_) >= self._buffer_thresholds[object_type]:
return self.flush()
return {}
def flush(
self, object_types: Sequence[LObjectType] = OBJECT_TYPES
) -> Dict[str, int]:
summary: Dict[str, int] = {}
def update_summary(stats):
for k, v in stats.items():
summary[k] = v + summary.get(k, 0)
for object_type in object_types:
buffer_ = self._objects[object_type]
batches = grouper(buffer_.values(), n=self._buffer_thresholds[object_type])
for batch in batches:
add_fn = getattr(self.storage, "%s_add" % object_type)
stats = add_fn(list(batch))
update_summary(stats)
# Flush underlying storage
stats = self.storage.flush(object_types)
update_summary(stats)
self.clear_buffers(object_types)
return summary
def clear_buffers(self, object_types: Sequence[LObjectType] = OBJECT_TYPES) -> None:
"""Clear objects from current buffer.
WARNING:
data that has not been flushed to storage will be lost when this
method is called. This should only be called when `flush` fails and
you want to continue your processing.
"""
for object_type in object_types:
buffer_ = self._objects[object_type]
buffer_.clear()
if object_type == "content":
self._contents_size = 0
elif object_type == "directory":
self._directory_entries = 0
elif object_type == "revision":
self._revision_parents = 0
+ self._revision_size = 0
+ elif object_type == "release":
+ self._release_size = 0
self.storage.clear_buffers(object_types)
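
In short, the diff above generalizes the existing size-based flush for contents (content_bytes) to revisions and releases. Here is a minimal, self-contained sketch of that count-or-bytes pattern; it is toy code for illustration only, not the swh API (TinyBuffer and all of its fields are hypothetical):

from typing import Dict, List, Tuple


class TinyBuffer:
    def __init__(self, max_count: int, max_bytes: int) -> None:
        self.max_count = max_count      # plays the role of e.g. "revision"
        self.max_bytes = max_bytes      # plays the role of e.g. "revision_bytes"
        self._buffer: Dict[Tuple[str, ...], bytes] = {}
        self._size = 0                  # running estimate, like _revision_size
        self.flushed: List[List[bytes]] = []

    def add(self, key: Tuple[str, ...], payload: bytes) -> None:
        self._buffer[key] = payload     # deduplicate on key, like object_add()
        self._size += len(payload)      # accumulate the estimated size
        # Flush on either threshold, mirroring the count-or-bytes test
        # in revision_add() and release_add():
        if len(self._buffer) >= self.max_count or self._size >= self.max_bytes:
            self.flush()

    def flush(self) -> None:
        if self._buffer:
            self.flushed.append(list(self._buffer.values()))
        self._buffer.clear()
        self._size = 0                  # counters reset, like clear_buffers()


buf = TinyBuffer(max_count=10, max_bytes=8)
buf.add(("a",), b"xxxx")                # 4 buffered bytes: no flush yet
buf.add(("b",), b"yyyy")                # 8 >= max_bytes: size-based flush
assert buf.flushed == [[b"xxxx", b"yyyy"]]

The real proxy keeps one such size counter per object type and resets it in clear_buffers(), exactly as the sketch's flush() does.
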
diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py
index 48584402..8f0f30ef 100644
--- a/swh/storage/tests/test_buffer.py
+++ b/swh/storage/tests/test_buffer.py
@@ -1,679 +1,729 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import Counter
from typing import Optional
from unittest.mock import Mock
from swh.storage import get_storage
-from swh.storage.proxies.buffer import BufferingProxyStorage
+from swh.storage.proxies.buffer import (
+ BufferingProxyStorage,
+ estimate_release_size,
+ estimate_revision_size,
+)
def get_storage_with_buffer_config(**buffer_config) -> BufferingProxyStorage:
steps = [
{"cls": "buffer", **buffer_config},
{"cls": "memory"},
]
ret = get_storage("pipeline", steps=steps)
assert isinstance(ret, BufferingProxyStorage)
return ret
def test_buffering_proxy_storage_content_threshold_not_hit(sample_data) -> None:
contents = sample_data.contents[:2]
contents_dict = [c.to_dict() for c in contents]
storage = get_storage_with_buffer_config(min_batch_size={"content": 10,})
s = storage.content_add(contents)
assert s == {}
# contents have not been written to storage
missing_contents = storage.content_missing(contents_dict)
assert set(missing_contents) == set([contents[0].sha1, contents[1].sha1])
s = storage.flush()
assert s == {
"content:add": 1 + 1,
"content:add:bytes": contents[0].length + contents[1].length,
}
missing_contents = storage.content_missing(contents_dict)
assert list(missing_contents) == []
def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data) -> None:
content = sample_data.content
content_dict = content.to_dict()
storage = get_storage_with_buffer_config(min_batch_size={"content": 1,})
s = storage.content_add([content])
assert s == {
"content:add": 1,
"content:add:bytes": content.length,
}
missing_contents = storage.content_missing([content_dict])
assert list(missing_contents) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_content_deduplicate(sample_data) -> None:
contents = sample_data.contents[:2]
storage = get_storage_with_buffer_config(min_batch_size={"content": 2,})
s = storage.content_add([contents[0], contents[0]])
assert s == {}
s = storage.content_add([contents[0]])
assert s == {}
s = storage.content_add([contents[1]])
assert s == {
"content:add": 1 + 1,
"content:add:bytes": contents[0].length + contents[1].length,
}
missing_contents = storage.content_missing([c.to_dict() for c in contents])
assert list(missing_contents) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_content_threshold_bytes_hit(sample_data) -> None:
contents = sample_data.contents[:2]
content_bytes_min_batch_size = 2
storage = get_storage_with_buffer_config(
min_batch_size={"content": 10, "content_bytes": content_bytes_min_batch_size,}
)
assert contents[0].length > content_bytes_min_batch_size
s = storage.content_add([contents[0]])
assert s == {
"content:add": 1,
"content:add:bytes": contents[0].length,
}
missing_contents = storage.content_missing([contents[0].to_dict()])
assert list(missing_contents) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_skipped_content_threshold_not_hit(sample_data) -> None:
contents = sample_data.skipped_contents
contents_dict = [c.to_dict() for c in contents]
storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 10,})
s = storage.skipped_content_add([contents[0], contents[1]])
assert s == {}
# contents have not been written to storage
missing_contents = storage.skipped_content_missing(contents_dict)
assert {c["sha1"] for c in missing_contents} == {c.sha1 for c in contents}
s = storage.flush()
assert s == {"skipped_content:add": 1 + 1}
missing_contents = storage.skipped_content_missing(contents_dict)
assert list(missing_contents) == []
def test_buffering_proxy_storage_skipped_content_threshold_nb_hit(sample_data) -> None:
contents = sample_data.skipped_contents
storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 1,})
s = storage.skipped_content_add([contents[0]])
assert s == {"skipped_content:add": 1}
missing_contents = storage.skipped_content_missing([contents[0].to_dict()])
assert list(missing_contents) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_skipped_content_deduplicate(sample_data):
contents = sample_data.skipped_contents[:2]
storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 2,})
s = storage.skipped_content_add([contents[0], contents[0]])
assert s == {}
s = storage.skipped_content_add([contents[0]])
assert s == {}
s = storage.skipped_content_add([contents[1]])
assert s == {
"skipped_content:add": 1 + 1,
}
missing_contents = storage.skipped_content_missing([c.to_dict() for c in contents])
assert list(missing_contents) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_extid_threshold_not_hit(sample_data) -> None:
extid = sample_data.extid1
storage = get_storage_with_buffer_config(min_batch_size={"extid": 10,})
s = storage.extid_add([extid])
assert s == {}
present_extids = storage.extid_get_from_target(
extid.target.object_type, [extid.target.object_id]
)
assert list(present_extids) == []
s = storage.flush()
assert s == {
"extid:add": 1,
}
present_extids = storage.extid_get_from_target(
extid.target.object_type, [extid.target.object_id]
)
assert list(present_extids) == [extid]
def test_buffering_proxy_storage_extid_threshold_hit(sample_data) -> None:
extid = sample_data.extid1
storage = get_storage_with_buffer_config(min_batch_size={"extid": 1,})
s = storage.extid_add([extid])
assert s == {
"extid:add": 1,
}
present_extids = storage.extid_get_from_target(
extid.target.object_type, [extid.target.object_id]
)
assert list(present_extids) == [extid]
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_extid_deduplicate(sample_data) -> None:
extids = sample_data.extids[:2]
storage = get_storage_with_buffer_config(min_batch_size={"extid": 2,})
s = storage.extid_add([extids[0], extids[0]])
assert s == {}
s = storage.extid_add([extids[0]])
assert s == {}
s = storage.extid_add([extids[1]])
assert s == {
"extid:add": 1 + 1,
}
for extid in extids:
present_extids = storage.extid_get_from_target(
extid.target.object_type, [extid.target.object_id]
)
assert list(present_extids) == [extid]
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data) -> None:
directory = sample_data.directory
storage = get_storage_with_buffer_config(min_batch_size={"directory": 10,})
s = storage.directory_add([directory])
assert s == {}
missing_directories = storage.directory_missing([directory.id])
assert list(missing_directories) == [directory.id]
s = storage.flush()
assert s == {
"directory:add": 1,
}
missing_directories = storage.directory_missing([directory.id])
assert list(missing_directories) == []
def test_buffering_proxy_storage_directory_threshold_hit(sample_data) -> None:
directory = sample_data.directory
storage = get_storage_with_buffer_config(min_batch_size={"directory": 1,})
s = storage.directory_add([directory])
assert s == {
"directory:add": 1,
}
missing_directories = storage.directory_missing([directory.id])
assert list(missing_directories) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_directory_deduplicate(sample_data) -> None:
directories = sample_data.directories[:2]
storage = get_storage_with_buffer_config(min_batch_size={"directory": 2,})
s = storage.directory_add([directories[0], directories[0]])
assert s == {}
s = storage.directory_add([directories[0]])
assert s == {}
s = storage.directory_add([directories[1]])
assert s == {
"directory:add": 1 + 1,
}
missing_directories = storage.directory_missing([d.id for d in directories])
assert list(missing_directories) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_directory_entries_threshold(sample_data) -> None:
directories = sample_data.directories
n_entries = sum(len(d.entries) for d in directories)
threshold = sum(len(d.entries) for d in directories[:-2])
# ensure the threshold is in the middle
assert 0 < threshold < n_entries
storage = get_storage_with_buffer_config(
min_batch_size={"directory_entries": threshold}
)
storage.storage = Mock(wraps=storage.storage)
for directory in directories:
storage.directory_add([directory])
storage.flush()
# We should have called the underlying directory_add at least twice, as
# we have hit the threshold for number of entries on directory n-2
method_calls = Counter(c[0] for c in storage.storage.method_calls)
assert method_calls["directory_add"] >= 2
def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data) -> None:
revision = sample_data.revision
storage = get_storage_with_buffer_config(min_batch_size={"revision": 10,})
s = storage.revision_add([revision])
assert s == {}
missing_revisions = storage.revision_missing([revision.id])
assert list(missing_revisions) == [revision.id]
s = storage.flush()
assert s == {
"revision:add": 1,
}
missing_revisions = storage.revision_missing([revision.id])
assert list(missing_revisions) == []
def test_buffering_proxy_storage_revision_threshold_hit(sample_data) -> None:
revision = sample_data.revision
storage = get_storage_with_buffer_config(min_batch_size={"revision": 1,})
s = storage.revision_add([revision])
assert s == {
"revision:add": 1,
}
missing_revisions = storage.revision_missing([revision.id])
assert list(missing_revisions) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_revision_deduplicate(sample_data) -> None:
revisions = sample_data.revisions[:2]
storage = get_storage_with_buffer_config(min_batch_size={"revision": 2,})
s = storage.revision_add([revisions[0], revisions[0]])
assert s == {}
s = storage.revision_add([revisions[0]])
assert s == {}
s = storage.revision_add([revisions[1]])
assert s == {
"revision:add": 1 + 1,
}
missing_revisions = storage.revision_missing([r.id for r in revisions])
assert list(missing_revisions) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_revision_parents_threshold(sample_data) -> None:
revisions = sample_data.revisions
n_parents = sum(len(r.parents) for r in revisions)
threshold = sum(len(r.parents) for r in revisions[:-2])
# ensure the threshold is in the middle
assert 0 < threshold < n_parents
storage = get_storage_with_buffer_config(
min_batch_size={"revision_parents": threshold}
)
storage.storage = Mock(wraps=storage.storage)
for revision in revisions:
storage.revision_add([revision])
storage.flush()
# We should have called the underlying revision_add at least twice, as
# we have hit the threshold for number of parents on revision n-2
method_calls = Counter(c[0] for c in storage.storage.method_calls)
assert method_calls["revision_add"] >= 2
+def test_buffering_proxy_storage_revision_size_threshold(sample_data) -> None:
+ revisions = sample_data.revisions
+ total_size = sum(estimate_revision_size(r) for r in revisions)
+ threshold = sum(estimate_revision_size(r) for r in revisions[:-2])
+
+ # ensure the threshold is in the middle
+ assert 0 < threshold < total_size
+
+ storage = get_storage_with_buffer_config(
+ min_batch_size={"revision_bytes": threshold}
+ )
+ storage.storage = Mock(wraps=storage.storage)
+
+ for revision in revisions:
+ storage.revision_add([revision])
+ storage.flush()
+
+ # We should have called the underlying revision_add at least twice, as
+ # we have hit the estimated size threshold on revision n-2
+ method_calls = Counter(c[0] for c in storage.storage.method_calls)
+ assert method_calls["revision_add"] >= 2
+
+
def test_buffering_proxy_storage_release_threshold_not_hit(sample_data) -> None:
releases = sample_data.releases
threshold = 10
assert len(releases) < threshold
storage = get_storage_with_buffer_config(
min_batch_size={"release": threshold,} # configuration set
)
s = storage.release_add(releases)
assert s == {}
release_ids = [r.id for r in releases]
missing_releases = storage.release_missing(release_ids)
assert list(missing_releases) == release_ids
s = storage.flush()
assert s == {
"release:add": len(releases),
}
missing_releases = storage.release_missing(release_ids)
assert list(missing_releases) == []
def test_buffering_proxy_storage_release_threshold_hit(sample_data) -> None:
releases = sample_data.releases
threshold = 2
assert len(releases) > threshold
storage = get_storage_with_buffer_config(
min_batch_size={"release": threshold,} # configuration set
)
s = storage.release_add(releases)
assert s == {
"release:add": len(releases),
}
release_ids = [r.id for r in releases]
missing_releases = storage.release_missing(release_ids)
assert list(missing_releases) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_release_deduplicate(sample_data) -> None:
releases = sample_data.releases[:2]
storage = get_storage_with_buffer_config(min_batch_size={"release": 2,})
s = storage.release_add([releases[0], releases[0]])
assert s == {}
s = storage.release_add([releases[0]])
assert s == {}
s = storage.release_add([releases[1]])
assert s == {
"release:add": 1 + 1,
}
missing_releases = storage.release_missing([r.id for r in releases])
assert list(missing_releases) == []
s = storage.flush()
assert s == {}
+def test_buffering_proxy_storage_release_size_threshold(sample_data) -> None:
+ releases = sample_data.releases
+ total_size = sum(estimate_release_size(r) for r in releases)
+ threshold = sum(estimate_release_size(r) for r in releases[:-2])
+
+ # ensure the threshold is in the middle
+ assert 0 < threshold < total_size
+
+ storage = get_storage_with_buffer_config(
+ min_batch_size={"release_bytes": threshold}
+ )
+ storage.storage = Mock(wraps=storage.storage)
+
+ for release in releases:
+ storage.release_add([release])
+ storage.flush()
+
+ # We should have called the underlying release_add at least twice, as
+ # we have hit the estimated size threshold on release n-2
+ method_calls = Counter(c[0] for c in storage.storage.method_calls)
+ assert method_calls["release_add"] >= 2
+
+
def test_buffering_proxy_storage_snapshot_threshold_not_hit(sample_data) -> None:
snapshots = sample_data.snapshots
threshold = 10
assert len(snapshots) < threshold
storage = get_storage_with_buffer_config(
min_batch_size={"snapshot": threshold,} # configuration set
)
s = storage.snapshot_add(snapshots)
assert s == {}
snapshot_ids = [r.id for r in snapshots]
missing_snapshots = storage.snapshot_missing(snapshot_ids)
assert list(missing_snapshots) == snapshot_ids
s = storage.flush()
assert s == {
"snapshot:add": len(snapshots),
}
missing_snapshots = storage.snapshot_missing(snapshot_ids)
assert list(missing_snapshots) == []
def test_buffering_proxy_storage_snapshot_threshold_hit(sample_data) -> None:
snapshots = sample_data.snapshots
threshold = 2
assert len(snapshots) > threshold
storage = get_storage_with_buffer_config(
min_batch_size={"snapshot": threshold,} # configuration set
)
s = storage.snapshot_add(snapshots)
assert s == {
"snapshot:add": len(snapshots),
}
snapshot_ids = [r.id for r in snapshots]
missing_snapshots = storage.snapshot_missing(snapshot_ids)
assert list(missing_snapshots) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_snapshot_deduplicate(sample_data) -> None:
snapshots = sample_data.snapshots[:2]
storage = get_storage_with_buffer_config(min_batch_size={"snapshot": 2,})
s = storage.snapshot_add([snapshots[0], snapshots[0]])
assert s == {}
s = storage.snapshot_add([snapshots[0]])
assert s == {}
s = storage.snapshot_add([snapshots[1]])
assert s == {
"snapshot:add": 1 + 1,
}
missing_snapshots = storage.snapshot_missing([r.id for r in snapshots])
assert list(missing_snapshots) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_clear(sample_data) -> None:
"""Clear operation on buffer
"""
threshold = 10
contents = sample_data.contents
assert 0 < len(contents) < threshold
skipped_contents = sample_data.skipped_contents
assert 0 < len(skipped_contents) < threshold
directories = sample_data.directories
assert 0 < len(directories) < threshold
revisions = sample_data.revisions
assert 0 < len(revisions) < threshold
releases = sample_data.releases
assert 0 < len(releases) < threshold
snapshots = sample_data.snapshots
assert 0 < len(snapshots) < threshold
storage = get_storage_with_buffer_config(
min_batch_size={
"content": threshold,
"skipped_content": threshold,
"directory": threshold,
"revision": threshold,
"release": threshold,
}
)
s = storage.content_add(contents)
assert s == {}
s = storage.skipped_content_add(skipped_contents)
assert s == {}
s = storage.directory_add(directories)
assert s == {}
s = storage.revision_add(revisions)
assert s == {}
s = storage.release_add(releases)
assert s == {}
s = storage.snapshot_add(snapshots)
assert s == {}
assert len(storage._objects["content"]) == len(contents)
assert len(storage._objects["skipped_content"]) == len(skipped_contents)
assert len(storage._objects["directory"]) == len(directories)
assert len(storage._objects["revision"]) == len(revisions)
assert len(storage._objects["release"]) == len(releases)
assert len(storage._objects["snapshot"]) == len(snapshots)
# clear only content from the buffer
s = storage.clear_buffers(["content"]) # type: ignore
assert s is None
# specific clear operation on specific object type content only touched
# them
assert len(storage._objects["content"]) == 0
assert len(storage._objects["skipped_content"]) == len(skipped_contents)
assert len(storage._objects["directory"]) == len(directories)
assert len(storage._objects["revision"]) == len(revisions)
assert len(storage._objects["release"]) == len(releases)
assert len(storage._objects["snapshot"]) == len(snapshots)
# clear current buffer from all object types
s = storage.clear_buffers() # type: ignore
assert s is None
assert len(storage._objects["content"]) == 0
assert len(storage._objects["skipped_content"]) == 0
assert len(storage._objects["directory"]) == 0
assert len(storage._objects["revision"]) == 0
assert len(storage._objects["release"]) == 0
assert len(storage._objects["snapshot"]) == 0
def test_buffer_proxy_with_default_args() -> None:
storage = get_storage_with_buffer_config()
assert storage is not None
def test_buffer_flush_stats(sample_data) -> None:
storage = get_storage_with_buffer_config()
s = storage.content_add(sample_data.contents)
assert s == {}
s = storage.skipped_content_add(sample_data.skipped_contents)
assert s == {}
s = storage.directory_add(sample_data.directories)
assert s == {}
s = storage.revision_add(sample_data.revisions)
assert s == {}
s = storage.release_add(sample_data.releases)
assert s == {}
s = storage.snapshot_add(sample_data.snapshots)
assert s == {}
# Flush all the things
s = storage.flush()
assert s["content:add"] > 0
assert s["content:add:bytes"] > 0
assert s["skipped_content:add"] > 0
assert s["directory:add"] > 0
assert s["revision:add"] > 0
assert s["release:add"] > 0
assert s["snapshot:add"] > 0
def test_buffer_operation_order(sample_data) -> None:
storage = get_storage_with_buffer_config()
# Wrap the inner storage in a mock to track all method calls.
storage.storage = mocked_storage = Mock(wraps=storage.storage)
# Simulate a loader: add contents, directories, revisions, releases, then
# snapshots.
storage.content_add(sample_data.contents)
storage.skipped_content_add(sample_data.skipped_contents)
storage.directory_add(sample_data.directories)
storage.revision_add(sample_data.revisions)
storage.release_add(sample_data.releases)
storage.snapshot_add(sample_data.snapshots)
# Check that nothing has been flushed yet
assert mocked_storage.method_calls == []
# Flush all the things
storage.flush()
methods_called = [c[0] for c in mocked_storage.method_calls]
prev = -1
for method in [
"content_add",
"skipped_content_add",
"directory_add",
"revision_add",
"release_add",
"snapshot_add",
"flush",
]:
try:
cur: Optional[int] = methods_called.index(method)
except ValueError:
cur = None
assert cur is not None, "Method %s not called" % method
assert cur > prev, "Method %s called out of order; all calls were: %s" % (
method,
methods_called,
)
prev = cur
def test_buffer_empty_batches() -> None:
"Flushing an empty buffer storage doesn't call any underlying _add method"
storage = get_storage_with_buffer_config()
storage.storage = mocked_storage = Mock(wraps=storage.storage)
storage.flush()
methods_called = {c[0] for c in mocked_storage.method_calls}
assert methods_called == {"flush", "clear_buffers"}
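
As a sanity check on the new estimators, the arithmetic of estimate_revision_size() can be traced by hand. The sketch below uses hypothetical stand-in dataclasses (FakePerson, FakeRevision) rather than swh.model.model.Revision, which requires many more mandatory fields to construct:

from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class FakePerson:
    fullname: bytes


@dataclass
class FakeRevision:
    parents: Tuple[bytes, ...]
    message: Optional[bytes]
    author: FakePerson
    committer: FakePerson
    extra_headers: Tuple[Tuple[bytes, bytes], ...]


rev = FakeRevision(
    parents=(b"\x00" * 20, b"\x01" * 20),           # 2 parents -> 2 * 20 = 40
    message=b"Fix the frobnicator",                  # 19 bytes
    author=FakePerson(b"Jane <jane@example.com>"),   # 23 bytes
    committer=FakePerson(b"Joe <joe@example.com>"),  # 21 bytes
    extra_headers=((b"gpgsig", b"..."),),            # 6 + 3 = 9 bytes
)

# The same summation as estimate_revision_size() in the diff:
s = 20 * len(rev.parents)
if rev.message:
    s += len(rev.message)
s += len(rev.author.fullname)
s += len(rev.committer.fullname)
s += sum(len(h) + len(v) for h, v in rev.extra_headers)

assert s == 40 + 19 + 23 + 21 + 9  # 112 estimated bytes
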