
diff --git a/swh/storage/proxies/buffer.py b/swh/storage/proxies/buffer.py
index 55572cb0..6a958bf0 100644
--- a/swh/storage/proxies/buffer.py
+++ b/swh/storage/proxies/buffer.py
@@ -1,216 +1,273 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from functools import partial
from typing import Dict, Iterable, Mapping, Sequence, Tuple
from typing_extensions import Literal
from swh.core.utils import grouper
-from swh.model.model import BaseModel, Content, Directory, Revision, SkippedContent
+from swh.model.model import (
+ BaseModel,
+ Content,
+ Directory,
+ Release,
+ Revision,
+ SkippedContent,
+)
from swh.storage import get_storage
from swh.storage.interface import StorageInterface
LObjectType = Literal[
"content",
"skipped_content",
"directory",
"revision",
"release",
"snapshot",
"extid",
]
OBJECT_TYPES: Tuple[LObjectType, ...] = (
"content",
"skipped_content",
"directory",
"revision",
"release",
"snapshot",
"extid",
)
DEFAULT_BUFFER_THRESHOLDS: Dict[str, int] = {
"content": 10000,
"content_bytes": 100 * 1024 * 1024,
"skipped_content": 10000,
"directory": 25000,
"directory_entries": 200000,
"revision": 100000,
"revision_parents": 200000,
+ "revision_bytes": 100 * 1024 * 1024,
"release": 100000,
+ "release_bytes": 100 * 1024 * 1024,
"snapshot": 25000,
"extid": 10000,
}
+def estimate_revision_size(revision: Revision) -> int:
+ """Estimate the size of a revision, by summing the size of variable length fields"""
+ s = 20 * len(revision.parents)
+
+ if revision.message:
+ s += len(revision.message)
+
+ s += len(revision.author.fullname)
+ s += len(revision.committer.fullname)
+ s += sum(len(h) + len(v) for h, v in revision.extra_headers)
+
+ return s
+
+
+def estimate_release_size(release: Release) -> int:
+ """Estimate the size of a release, by summing the size of variable length fields"""
+ s = 0
+ if release.message:
+ s += len(release.message)
+ if release.author:
+ s += len(release.author.fullname)
+
+ return s
+
+
class BufferingProxyStorage:
"""Storage implementation in charge of accumulating objects prior to
sending them to the "main" storage.
Deduplicates values based on a tuple of keys depending on the object type.
Sample configuration use case for buffering storage:
.. code-block:: yaml
storage:
cls: buffer
args:
storage:
cls: remote
args: http://storage.internal.staging.swh.network:5002/
min_batch_size:
content: 10000
content_bytes: 100000000
skipped_content: 10000
directory: 5000
directory_entries: 100000
revision: 1000
revision_parents: 2000
+ revision_bytes: 100000000
release: 10000
+ release_bytes: 100000000
snapshot: 5000
"""
def __init__(self, storage: Mapping, min_batch_size: Mapping = {}):
self.storage: StorageInterface = get_storage(**storage)
self._buffer_thresholds = {**DEFAULT_BUFFER_THRESHOLDS, **min_batch_size}
self._objects: Dict[LObjectType, Dict[Tuple[str, ...], BaseModel]] = {
k: {} for k in OBJECT_TYPES
}
self._contents_size: int = 0
self._directory_entries: int = 0
self._revision_parents: int = 0
+ self._revision_size: int = 0
+ self._release_size: int = 0
def __getattr__(self, key: str):
if key.endswith("_add"):
object_type = key.rsplit("_", 1)[0]
if object_type in OBJECT_TYPES:
return partial(self.object_add, object_type=object_type, keys=["id"],)
if key == "storage":
raise AttributeError(key)
return getattr(self.storage, key)
def content_add(self, contents: Sequence[Content]) -> Dict[str, int]:
"""Push contents to write to the storage in the buffer.
Following policies apply:
- if the buffer's threshold is hit, flush content to the storage.
- otherwise, if the total size of buffered contents hits its threshold,
flush content to the storage.
"""
stats = self.object_add(
contents,
object_type="content",
keys=["sha1", "sha1_git", "sha256", "blake2s256"],
)
if not stats:
# We did not flush based on number of objects; check total size
self._contents_size += sum(c.length for c in contents)
if self._contents_size >= self._buffer_thresholds["content_bytes"]:
return self.flush(["content"])
return stats
def skipped_content_add(self, contents: Sequence[SkippedContent]) -> Dict[str, int]:
return self.object_add(
contents,
object_type="skipped_content",
keys=["sha1", "sha1_git", "sha256", "blake2s256"],
)
def directory_add(self, directories: Sequence[Directory]) -> Dict[str, int]:
stats = self.object_add(directories, object_type="directory", keys=["id"])
if not stats:
# We did not flush based on number of objects; check the number of entries
self._directory_entries += sum(len(d.entries) for d in directories)
if self._directory_entries >= self._buffer_thresholds["directory_entries"]:
return self.flush(["content", "directory"])
return stats
def revision_add(self, revisions: Sequence[Revision]) -> Dict[str, int]:
stats = self.object_add(revisions, object_type="revision", keys=["id"])
if not stats:
- # We did not flush based on number of objects; check the number of parents
+ # We did not flush based on number of objects; check the number of
+ # parents and estimated size
self._revision_parents += sum(len(r.parents) for r in revisions)
- if self._revision_parents >= self._buffer_thresholds["revision_parents"]:
+ self._revision_size += sum(estimate_revision_size(r) for r in revisions)
+ if (
+ self._revision_parents >= self._buffer_thresholds["revision_parents"]
+ or self._revision_size >= self._buffer_thresholds["revision_bytes"]
+ ):
return self.flush(["content", "directory", "revision"])
return stats
+ def release_add(self, releases: Sequence[Release]) -> Dict[str, int]:
+ stats = self.object_add(releases, object_type="release", keys=["id"])
+
+ if not stats:
+ # We did not flush based on number of objects; check the estimated size
+ self._release_size += sum(estimate_release_size(r) for r in releases)
+ if self._release_size >= self._buffer_thresholds["release_bytes"]:
+ return self.flush(["content", "directory", "revision", "release"])
+
+ return stats
+
def object_add(
self,
objects: Sequence[BaseModel],
*,
object_type: LObjectType,
keys: Iterable[str],
) -> Dict[str, int]:
"""Push objects to write to the storage in the buffer. Flushes the
buffer to the storage if the threshold is hit.
"""
buffer_ = self._objects[object_type]
for obj in objects:
obj_key = tuple(getattr(obj, key) for key in keys)
buffer_[obj_key] = obj
if len(buffer_) >= self._buffer_thresholds[object_type]:
return self.flush()
return {}
def flush(
self, object_types: Sequence[LObjectType] = OBJECT_TYPES
) -> Dict[str, int]:
summary: Dict[str, int] = {}
def update_summary(stats):
for k, v in stats.items():
summary[k] = v + summary.get(k, 0)
for object_type in object_types:
buffer_ = self._objects[object_type]
batches = grouper(buffer_.values(), n=self._buffer_thresholds[object_type])
for batch in batches:
add_fn = getattr(self.storage, "%s_add" % object_type)
stats = add_fn(list(batch))
update_summary(stats)
# Flush underlying storage
stats = self.storage.flush(object_types)
update_summary(stats)
self.clear_buffers(object_types)
return summary
def clear_buffers(self, object_types: Sequence[LObjectType] = OBJECT_TYPES) -> None:
"""Clear objects from current buffer.
WARNING:
data that has not been flushed to storage will be lost when this
method is called. This should only be called when `flush` fails and
you want to continue your processing.
"""
for object_type in object_types:
buffer_ = self._objects[object_type]
buffer_.clear()
if object_type == "content":
self._contents_size = 0
elif object_type == "directory":
self._directory_entries = 0
elif object_type == "revision":
self._revision_parents = 0
+ self._revision_size = 0
+ elif object_type == "release":
+ self._release_size = 0
self.storage.clear_buffers(object_types)
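For reference, estimate_revision_size counts 20 bytes per parent plus the lengths of the message, the author and committer fullnames, and the extra headers, while estimate_release_size counts the message and author fullname lengths; these byte totals are compared against the new revision_bytes and release_bytes thresholds. Below is a minimal configuration sketch (an illustration, not part of the patch; the in-memory backend and the concrete numbers are placeholders) showing the new thresholds alongside the existing count-based ones:

from swh.storage import get_storage

storage = get_storage(
    "pipeline",
    steps=[
        {
            "cls": "buffer",
            "min_batch_size": {
                # existing count-based thresholds
                "revision": 1000,
                "revision_parents": 2000,
                "release": 10000,
                # size-based thresholds introduced by this patch
                "revision_bytes": 100_000_000,
                "release_bytes": 100_000_000,
            },
        },
        # placeholder backend; a real deployment would use cls: remote
        {"cls": "memory"},
    ],
)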
diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py
index 48584402..8f0f30ef 100644
--- a/swh/storage/tests/test_buffer.py
+++ b/swh/storage/tests/test_buffer.py
@@ -1,679 +1,729 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import Counter
from typing import Optional
from unittest.mock import Mock
from swh.storage import get_storage
-from swh.storage.proxies.buffer import BufferingProxyStorage
+from swh.storage.proxies.buffer import (
+ BufferingProxyStorage,
+ estimate_release_size,
+ estimate_revision_size,
+)
def get_storage_with_buffer_config(**buffer_config) -> BufferingProxyStorage:
steps = [
{"cls": "buffer", **buffer_config},
{"cls": "memory"},
]
ret = get_storage("pipeline", steps=steps)
assert isinstance(ret, BufferingProxyStorage)
return ret
def test_buffering_proxy_storage_content_threshold_not_hit(sample_data) -> None:
contents = sample_data.contents[:2]
contents_dict = [c.to_dict() for c in contents]
storage = get_storage_with_buffer_config(min_batch_size={"content": 10,})
s = storage.content_add(contents)
assert s == {}
# contents have not been written to storage
missing_contents = storage.content_missing(contents_dict)
assert set(missing_contents) == set([contents[0].sha1, contents[1].sha1])
s = storage.flush()
assert s == {
"content:add": 1 + 1,
"content:add:bytes": contents[0].length + contents[1].length,
}
missing_contents = storage.content_missing(contents_dict)
assert list(missing_contents) == []
def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data) -> None:
content = sample_data.content
content_dict = content.to_dict()
storage = get_storage_with_buffer_config(min_batch_size={"content": 1,})
s = storage.content_add([content])
assert s == {
"content:add": 1,
"content:add:bytes": content.length,
}
missing_contents = storage.content_missing([content_dict])
assert list(missing_contents) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_content_deduplicate(sample_data) -> None:
contents = sample_data.contents[:2]
storage = get_storage_with_buffer_config(min_batch_size={"content": 2,})
s = storage.content_add([contents[0], contents[0]])
assert s == {}
s = storage.content_add([contents[0]])
assert s == {}
s = storage.content_add([contents[1]])
assert s == {
"content:add": 1 + 1,
"content:add:bytes": contents[0].length + contents[1].length,
}
missing_contents = storage.content_missing([c.to_dict() for c in contents])
assert list(missing_contents) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_content_threshold_bytes_hit(sample_data) -> None:
contents = sample_data.contents[:2]
content_bytes_min_batch_size = 2
storage = get_storage_with_buffer_config(
min_batch_size={"content": 10, "content_bytes": content_bytes_min_batch_size,}
)
assert contents[0].length > content_bytes_min_batch_size
s = storage.content_add([contents[0]])
assert s == {
"content:add": 1,
"content:add:bytes": contents[0].length,
}
missing_contents = storage.content_missing([contents[0].to_dict()])
assert list(missing_contents) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_skipped_content_threshold_not_hit(sample_data) -> None:
contents = sample_data.skipped_contents
contents_dict = [c.to_dict() for c in contents]
storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 10,})
s = storage.skipped_content_add([contents[0], contents[1]])
assert s == {}
# contents have not been written to storage
missing_contents = storage.skipped_content_missing(contents_dict)
assert {c["sha1"] for c in missing_contents} == {c.sha1 for c in contents}
s = storage.flush()
assert s == {"skipped_content:add": 1 + 1}
missing_contents = storage.skipped_content_missing(contents_dict)
assert list(missing_contents) == []
def test_buffering_proxy_storage_skipped_content_threshold_nb_hit(sample_data) -> None:
contents = sample_data.skipped_contents
storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 1,})
s = storage.skipped_content_add([contents[0]])
assert s == {"skipped_content:add": 1}
missing_contents = storage.skipped_content_missing([contents[0].to_dict()])
assert list(missing_contents) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_skipped_content_deduplicate(sample_data):
contents = sample_data.skipped_contents[:2]
storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 2,})
s = storage.skipped_content_add([contents[0], contents[0]])
assert s == {}
s = storage.skipped_content_add([contents[0]])
assert s == {}
s = storage.skipped_content_add([contents[1]])
assert s == {
"skipped_content:add": 1 + 1,
}
missing_contents = storage.skipped_content_missing([c.to_dict() for c in contents])
assert list(missing_contents) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_extid_threshold_not_hit(sample_data) -> None:
extid = sample_data.extid1
storage = get_storage_with_buffer_config(min_batch_size={"extid": 10,})
s = storage.extid_add([extid])
assert s == {}
present_extids = storage.extid_get_from_target(
extid.target.object_type, [extid.target.object_id]
)
assert list(present_extids) == []
s = storage.flush()
assert s == {
"extid:add": 1,
}
present_extids = storage.extid_get_from_target(
extid.target.object_type, [extid.target.object_id]
)
assert list(present_extids) == [extid]
def test_buffering_proxy_storage_extid_threshold_hit(sample_data) -> None:
extid = sample_data.extid1
storage = get_storage_with_buffer_config(min_batch_size={"extid": 1,})
s = storage.extid_add([extid])
assert s == {
"extid:add": 1,
}
present_extids = storage.extid_get_from_target(
extid.target.object_type, [extid.target.object_id]
)
assert list(present_extids) == [extid]
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_extid_deduplicate(sample_data) -> None:
extids = sample_data.extids[:2]
storage = get_storage_with_buffer_config(min_batch_size={"extid": 2,})
s = storage.extid_add([extids[0], extids[0]])
assert s == {}
s = storage.extid_add([extids[0]])
assert s == {}
s = storage.extid_add([extids[1]])
assert s == {
"extid:add": 1 + 1,
}
for extid in extids:
present_extids = storage.extid_get_from_target(
extid.target.object_type, [extid.target.object_id]
)
assert list(present_extids) == [extid]
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data) -> None:
directory = sample_data.directory
storage = get_storage_with_buffer_config(min_batch_size={"directory": 10,})
s = storage.directory_add([directory])
assert s == {}
missing_directories = storage.directory_missing([directory.id])
assert list(missing_directories) == [directory.id]
s = storage.flush()
assert s == {
"directory:add": 1,
}
missing_directories = storage.directory_missing([directory.id])
assert list(missing_directories) == []
def test_buffering_proxy_storage_directory_threshold_hit(sample_data) -> None:
directory = sample_data.directory
storage = get_storage_with_buffer_config(min_batch_size={"directory": 1,})
s = storage.directory_add([directory])
assert s == {
"directory:add": 1,
}
missing_directories = storage.directory_missing([directory.id])
assert list(missing_directories) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_directory_deduplicate(sample_data) -> None:
directories = sample_data.directories[:2]
storage = get_storage_with_buffer_config(min_batch_size={"directory": 2,})
s = storage.directory_add([directories[0], directories[0]])
assert s == {}
s = storage.directory_add([directories[0]])
assert s == {}
s = storage.directory_add([directories[1]])
assert s == {
"directory:add": 1 + 1,
}
missing_directories = storage.directory_missing([d.id for d in directories])
assert list(missing_directories) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_directory_entries_threshold(sample_data) -> None:
directories = sample_data.directories
n_entries = sum(len(d.entries) for d in directories)
threshold = sum(len(d.entries) for d in directories[:-2])
# ensure the threshold is in the middle
assert 0 < threshold < n_entries
storage = get_storage_with_buffer_config(
min_batch_size={"directory_entries": threshold}
)
storage.storage = Mock(wraps=storage.storage)
for directory in directories:
storage.directory_add([directory])
storage.flush()
# We should have called the underlying directory_add at least twice, as
# we have hit the threshold for number of entries on directory n-2
method_calls = Counter(c[0] for c in storage.storage.method_calls)
assert method_calls["directory_add"] >= 2
def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data) -> None:
revision = sample_data.revision
storage = get_storage_with_buffer_config(min_batch_size={"revision": 10,})
s = storage.revision_add([revision])
assert s == {}
missing_revisions = storage.revision_missing([revision.id])
assert list(missing_revisions) == [revision.id]
s = storage.flush()
assert s == {
"revision:add": 1,
}
missing_revisions = storage.revision_missing([revision.id])
assert list(missing_revisions) == []
def test_buffering_proxy_storage_revision_threshold_hit(sample_data) -> None:
revision = sample_data.revision
storage = get_storage_with_buffer_config(min_batch_size={"revision": 1,})
s = storage.revision_add([revision])
assert s == {
"revision:add": 1,
}
missing_revisions = storage.revision_missing([revision.id])
assert list(missing_revisions) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_revision_deduplicate(sample_data) -> None:
revisions = sample_data.revisions[:2]
storage = get_storage_with_buffer_config(min_batch_size={"revision": 2,})
s = storage.revision_add([revisions[0], revisions[0]])
assert s == {}
s = storage.revision_add([revisions[0]])
assert s == {}
s = storage.revision_add([revisions[1]])
assert s == {
"revision:add": 1 + 1,
}
missing_revisions = storage.revision_missing([r.id for r in revisions])
assert list(missing_revisions) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_revision_parents_threshold(sample_data) -> None:
revisions = sample_data.revisions
n_parents = sum(len(r.parents) for r in revisions)
threshold = sum(len(r.parents) for r in revisions[:-2])
# ensure the threshold is in the middle
assert 0 < threshold < n_parents
storage = get_storage_with_buffer_config(
min_batch_size={"revision_parents": threshold}
)
storage.storage = Mock(wraps=storage.storage)
for revision in revisions:
storage.revision_add([revision])
storage.flush()
# We should have called the underlying revision_add at least twice, as
# we have hit the threshold for number of parents on revision n-2
method_calls = Counter(c[0] for c in storage.storage.method_calls)
assert method_calls["revision_add"] >= 2
+def test_buffering_proxy_storage_revision_size_threshold(sample_data) -> None:
+ revisions = sample_data.revisions
+ total_size = sum(estimate_revision_size(r) for r in revisions)
+ threshold = sum(estimate_revision_size(r) for r in revisions[:-2])
+
+ # ensure the threshold is in the middle
+ assert 0 < threshold < total_size
+
+ storage = get_storage_with_buffer_config(
+ min_batch_size={"revision_bytes": threshold}
+ )
+ storage.storage = Mock(wraps=storage.storage)
+
+ for revision in revisions:
+ storage.revision_add([revision])
+ storage.flush()
+
+ # We should have called the underlying revision_add at least twice, as
+ # we have hit the threshold for the estimated size of revisions at revision n-2
+ method_calls = Counter(c[0] for c in storage.storage.method_calls)
+ assert method_calls["revision_add"] >= 2
+
+
def test_buffering_proxy_storage_release_threshold_not_hit(sample_data) -> None:
releases = sample_data.releases
threshold = 10
assert len(releases) < threshold
storage = get_storage_with_buffer_config(
min_batch_size={"release": threshold,} # configuration set
)
s = storage.release_add(releases)
assert s == {}
release_ids = [r.id for r in releases]
missing_releases = storage.release_missing(release_ids)
assert list(missing_releases) == release_ids
s = storage.flush()
assert s == {
"release:add": len(releases),
}
missing_releases = storage.release_missing(release_ids)
assert list(missing_releases) == []
def test_buffering_proxy_storage_release_threshold_hit(sample_data) -> None:
releases = sample_data.releases
threshold = 2
assert len(releases) > threshold
storage = get_storage_with_buffer_config(
min_batch_size={"release": threshold,} # configuration set
)
s = storage.release_add(releases)
assert s == {
"release:add": len(releases),
}
release_ids = [r.id for r in releases]
missing_releases = storage.release_missing(release_ids)
assert list(missing_releases) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_release_deduplicate(sample_data) -> None:
releases = sample_data.releases[:2]
storage = get_storage_with_buffer_config(min_batch_size={"release": 2,})
s = storage.release_add([releases[0], releases[0]])
assert s == {}
s = storage.release_add([releases[0]])
assert s == {}
s = storage.release_add([releases[1]])
assert s == {
"release:add": 1 + 1,
}
missing_releases = storage.release_missing([r.id for r in releases])
assert list(missing_releases) == []
s = storage.flush()
assert s == {}
+def test_buffering_proxy_storage_release_size_threshold(sample_data) -> None:
+ releases = sample_data.releases
+ total_size = sum(estimate_release_size(r) for r in releases)
+ threshold = sum(estimate_release_size(r) for r in releases[:-2])
+
+ # ensure the threshold is in the middle
+ assert 0 < threshold < total_size
+
+ storage = get_storage_with_buffer_config(
+ min_batch_size={"release_bytes": threshold}
+ )
+ storage.storage = Mock(wraps=storage.storage)
+
+ for release in releases:
+ storage.release_add([release])
+ storage.flush()
+
+ # We should have called the underlying release_add at least twice, as
+ # we have hit the threshold for the estimated size of releases at release n-2
+ method_calls = Counter(c[0] for c in storage.storage.method_calls)
+ assert method_calls["release_add"] >= 2
+
+
def test_buffering_proxy_storage_snapshot_threshold_not_hit(sample_data) -> None:
snapshots = sample_data.snapshots
threshold = 10
assert len(snapshots) < threshold
storage = get_storage_with_buffer_config(
min_batch_size={"snapshot": threshold,} # configuration set
)
s = storage.snapshot_add(snapshots)
assert s == {}
snapshot_ids = [r.id for r in snapshots]
missing_snapshots = storage.snapshot_missing(snapshot_ids)
assert list(missing_snapshots) == snapshot_ids
s = storage.flush()
assert s == {
"snapshot:add": len(snapshots),
}
missing_snapshots = storage.snapshot_missing(snapshot_ids)
assert list(missing_snapshots) == []
def test_buffering_proxy_storage_snapshot_threshold_hit(sample_data) -> None:
snapshots = sample_data.snapshots
threshold = 2
assert len(snapshots) > threshold
storage = get_storage_with_buffer_config(
min_batch_size={"snapshot": threshold,} # configuration set
)
s = storage.snapshot_add(snapshots)
assert s == {
"snapshot:add": len(snapshots),
}
snapshot_ids = [r.id for r in snapshots]
missing_snapshots = storage.snapshot_missing(snapshot_ids)
assert list(missing_snapshots) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_snapshot_deduplicate(sample_data) -> None:
snapshots = sample_data.snapshots[:2]
storage = get_storage_with_buffer_config(min_batch_size={"snapshot": 2,})
s = storage.snapshot_add([snapshots[0], snapshots[0]])
assert s == {}
s = storage.snapshot_add([snapshots[0]])
assert s == {}
s = storage.snapshot_add([snapshots[1]])
assert s == {
"snapshot:add": 1 + 1,
}
missing_snapshots = storage.snapshot_missing([r.id for r in snapshots])
assert list(missing_snapshots) == []
s = storage.flush()
assert s == {}
def test_buffering_proxy_storage_clear(sample_data) -> None:
"""Clear operation on buffer
"""
threshold = 10
contents = sample_data.contents
assert 0 < len(contents) < threshold
skipped_contents = sample_data.skipped_contents
assert 0 < len(skipped_contents) < threshold
directories = sample_data.directories
assert 0 < len(directories) < threshold
revisions = sample_data.revisions
assert 0 < len(revisions) < threshold
releases = sample_data.releases
assert 0 < len(releases) < threshold
snapshots = sample_data.snapshots
assert 0 < len(snapshots) < threshold
storage = get_storage_with_buffer_config(
min_batch_size={
"content": threshold,
"skipped_content": threshold,
"directory": threshold,
"revision": threshold,
"release": threshold,
}
)
s = storage.content_add(contents)
assert s == {}
s = storage.skipped_content_add(skipped_contents)
assert s == {}
s = storage.directory_add(directories)
assert s == {}
s = storage.revision_add(revisions)
assert s == {}
s = storage.release_add(releases)
assert s == {}
s = storage.snapshot_add(snapshots)
assert s == {}
assert len(storage._objects["content"]) == len(contents)
assert len(storage._objects["skipped_content"]) == len(skipped_contents)
assert len(storage._objects["directory"]) == len(directories)
assert len(storage._objects["revision"]) == len(revisions)
assert len(storage._objects["release"]) == len(releases)
assert len(storage._objects["snapshot"]) == len(snapshots)
# clear only content from the buffer
s = storage.clear_buffers(["content"]) # type: ignore
assert s is None
# specific clear operation on specific object type content only touched
# them
assert len(storage._objects["content"]) == 0
assert len(storage._objects["skipped_content"]) == len(skipped_contents)
assert len(storage._objects["directory"]) == len(directories)
assert len(storage._objects["revision"]) == len(revisions)
assert len(storage._objects["release"]) == len(releases)
assert len(storage._objects["snapshot"]) == len(snapshots)
# clear current buffer from all object types
s = storage.clear_buffers() # type: ignore
assert s is None
assert len(storage._objects["content"]) == 0
assert len(storage._objects["skipped_content"]) == 0
assert len(storage._objects["directory"]) == 0
assert len(storage._objects["revision"]) == 0
assert len(storage._objects["release"]) == 0
assert len(storage._objects["snapshot"]) == 0
def test_buffer_proxy_with_default_args() -> None:
storage = get_storage_with_buffer_config()
assert storage is not None
def test_buffer_flush_stats(sample_data) -> None:
storage = get_storage_with_buffer_config()
s = storage.content_add(sample_data.contents)
assert s == {}
s = storage.skipped_content_add(sample_data.skipped_contents)
assert s == {}
s = storage.directory_add(sample_data.directories)
assert s == {}
s = storage.revision_add(sample_data.revisions)
assert s == {}
s = storage.release_add(sample_data.releases)
assert s == {}
s = storage.snapshot_add(sample_data.snapshots)
assert s == {}
# Flush all the things
s = storage.flush()
assert s["content:add"] > 0
assert s["content:add:bytes"] > 0
assert s["skipped_content:add"] > 0
assert s["directory:add"] > 0
assert s["revision:add"] > 0
assert s["release:add"] > 0
assert s["snapshot:add"] > 0
def test_buffer_operation_order(sample_data) -> None:
storage = get_storage_with_buffer_config()
# Wrap the inner storage in a mock to track all method calls.
storage.storage = mocked_storage = Mock(wraps=storage.storage)
# Simulate a loader: add contents, directories, revisions, releases, then
# snapshots.
storage.content_add(sample_data.contents)
storage.skipped_content_add(sample_data.skipped_contents)
storage.directory_add(sample_data.directories)
storage.revision_add(sample_data.revisions)
storage.release_add(sample_data.releases)
storage.snapshot_add(sample_data.snapshots)
# Check that nothing has been flushed yet
assert mocked_storage.method_calls == []
# Flush all the things
storage.flush()
methods_called = [c[0] for c in mocked_storage.method_calls]
prev = -1
for method in [
"content_add",
"skipped_content_add",
"directory_add",
"revision_add",
"release_add",
"snapshot_add",
"flush",
]:
try:
cur: Optional[int] = methods_called.index(method)
except ValueError:
cur = None
assert cur is not None, "Method %s not called" % method
assert cur > prev, "Method %s called out of order; all calls were: %s" % (
method,
methods_called,
)
prev = cur
def test_buffer_empty_batches() -> None:
"Flushing an empty buffer storage doesn't call any underlying _add method"
storage = get_storage_with_buffer_config()
storage.storage = mocked_storage = Mock(wraps=storage.storage)
storage.flush()
methods_called = {c[0] for c in mocked_storage.method_calls}
assert methods_called == {"flush", "clear_buffers"}
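To make the behaviour exercised by test_buffering_proxy_storage_revision_size_threshold concrete, here is a rough loader-side sketch (an illustration, not part of the patch): revisions are buffered until their estimated size crosses revision_bytes, at which point revision_add returns non-empty flush statistics. The revisions iterable is an assumption standing in for Revision objects produced by a loader.

from swh.storage import get_storage
from swh.storage.proxies.buffer import estimate_revision_size

storage = get_storage(
    "pipeline",
    steps=[
        # flush revisions once roughly 1 MiB worth of them is buffered
        {"cls": "buffer", "min_batch_size": {"revision_bytes": 1024 * 1024}},
        {"cls": "memory"},  # placeholder backend
    ],
)

buffered_bytes = 0
for revision in revisions:  # assumption: swh.model.model.Revision objects
    stats = storage.revision_add([revision])
    buffered_bytes += estimate_revision_size(revision)
    if stats:
        # A non-empty dict means the proxy just flushed revisions (together
        # with the contents and directories buffered before them).
        print("flushed after ~%d estimated bytes: %r" % (buffered_bytes, stats))
        buffered_bytes = 0

# Push whatever remained below the thresholds.
storage.flush()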
