Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_buffer.py
# Copyright (C) 2019-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.storage import get_storage
from swh.storage.buffer import BufferingProxyStorage
def get_storage_with_buffer_config(**buffer_config) -> BufferingProxyStorage:
    """Instantiate a pipeline storage: a buffering proxy wrapping an
    in-memory backend, configured with the given buffer settings.

    Args:
        **buffer_config: keyword configuration forwarded to the buffer
            proxy step (e.g. ``min_batch_size={...}``).

    Returns:
        The buffering proxy at the head of the pipeline.
    """
    steps = [
        {"cls": "buffer", **buffer_config},
        {"cls": "memory"},
    ]
    ret = get_storage("pipeline", steps=steps)
    # get_storage is declared to return a generic storage interface;
    # narrow it so callers (and mypy) see the buffer-specific API.
    assert isinstance(ret, BufferingProxyStorage)
    return ret
def test_buffering_proxy_storage_content_threshold_not_hit(sample_data) -> None:
    """Contents below the batch threshold stay buffered until flush()."""
    contents = sample_data.contents[:2]
    contents_dict = [c.to_dict() for c in contents]

    storage = get_storage_with_buffer_config(min_batch_size={"content": 10})
    s = storage.content_add(contents)
    assert s == {}

    # contents have not been written to the backend storage yet
    missing_contents = storage.content_missing(contents_dict)
    assert set(missing_contents) == {contents[0].sha1, contents[1].sha1}

    # flushing writes the buffered contents through and reports the stats
    s = storage.flush()
    assert s == {
        "content:add": 1 + 1,
        "content:add:bytes": contents[0].length + contents[1].length,
    }

    missing_contents = storage.content_missing(contents_dict)
    assert list(missing_contents) == []
def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data) -> None:
    """Hitting the content-count threshold writes through immediately."""
    content = sample_data.content
    content_dict = content.to_dict()

    storage = get_storage_with_buffer_config(min_batch_size={"content": 1})
    s = storage.content_add([content])
    assert s == {
        "content:add": 1,
        "content:add:bytes": content.length,
    }

    missing_contents = storage.content_missing([content_dict])
    assert list(missing_contents) == []

    # nothing left in the buffer after the write-through
    s = storage.flush()
    assert s == {}
def test_buffering_proxy_storage_content_deduplicate(sample_data) -> None:
    """Duplicate contents count once toward the batch threshold."""
    contents = sample_data.contents[:2]
    storage = get_storage_with_buffer_config(min_batch_size={"content": 2})

    # the same content added twice (in one call or across calls) is
    # buffered once, so the threshold of 2 is not reached yet
    s = storage.content_add([contents[0], contents[0]])
    assert s == {}

    s = storage.content_add([contents[0]])
    assert s == {}

    # a second distinct content reaches the threshold and triggers the write
    s = storage.content_add([contents[1]])
    assert s == {
        "content:add": 1 + 1,
        "content:add:bytes": contents[0].length + contents[1].length,
    }

    missing_contents = storage.content_missing([c.to_dict() for c in contents])
    assert list(missing_contents) == []

    s = storage.flush()
    assert s == {}
def test_buffering_proxy_storage_content_threshold_bytes_hit(sample_data) -> None:
    """Hitting the cumulated-bytes threshold writes through immediately,
    even when the content-count threshold is not reached."""
    contents = sample_data.contents[:2]
    content_bytes_min_batch_size = 2
    storage = get_storage_with_buffer_config(
        min_batch_size={
            "content": 10,
            "content_bytes": content_bytes_min_batch_size,
        }
    )

    # sanity check: a single content is enough to exceed the bytes threshold
    assert contents[0].length > content_bytes_min_batch_size

    s = storage.content_add([contents[0]])
    assert s == {
        "content:add": 1,
        "content:add:bytes": contents[0].length,
    }

    missing_contents = storage.content_missing([contents[0].to_dict()])
    assert list(missing_contents) == []

    s = storage.flush()
    assert s == {}
def test_buffering_proxy_storage_skipped_content_threshold_not_hit(sample_data) -> None:
    """Skipped contents below the threshold stay buffered until flush()."""
    contents = sample_data.skipped_contents
    contents_dict = [c.to_dict() for c in contents]
    storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 10})

    s = storage.skipped_content_add([contents[0], contents[1]])
    assert s == {}

    # skipped contents have not been written to the backend storage yet
    missing_contents = storage.skipped_content_missing(contents_dict)
    assert {c["sha1"] for c in missing_contents} == {c.sha1 for c in contents}

    s = storage.flush()
    assert s == {"skipped_content:add": 1 + 1}

    missing_contents = storage.skipped_content_missing(contents_dict)
    assert list(missing_contents) == []
def test_buffering_proxy_storage_skipped_content_threshold_nb_hit(sample_data): | def test_buffering_proxy_storage_skipped_content_threshold_nb_hit(sample_data) -> None: | ||||
contents = sample_data.skipped_contents | contents = sample_data.skipped_contents | ||||
storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 1,}) | storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 1,}) | ||||
s = storage.skipped_content_add([contents[0]]) | s = storage.skipped_content_add([contents[0]]) | ||||
assert s == {"skipped_content:add": 1} | assert s == {"skipped_content:add": 1} | ||||
missing_contents = storage.skipped_content_missing([contents[0].to_dict()]) | missing_contents = storage.skipped_content_missing([contents[0].to_dict()]) | ||||
assert list(missing_contents) == [] | assert list(missing_contents) == [] | ||||
Show All 19 Lines | def test_buffering_proxy_storage_skipped_content_deduplicate(sample_data): | ||||
missing_contents = storage.skipped_content_missing([c.to_dict() for c in contents]) | missing_contents = storage.skipped_content_missing([c.to_dict() for c in contents]) | ||||
assert list(missing_contents) == [] | assert list(missing_contents) == [] | ||||
s = storage.flush() | s = storage.flush() | ||||
assert s == {} | assert s == {} | ||||
def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data) -> None:
    """Directories below the batch threshold stay buffered until flush()."""
    directory = sample_data.directory
    storage = get_storage_with_buffer_config(min_batch_size={"directory": 10})

    s = storage.directory_add([directory])
    assert s == {}

    # not written through yet
    missing_directories = storage.directory_missing([directory.id])
    assert list(missing_directories) == [directory.id]

    s = storage.flush()
    assert s == {
        "directory:add": 1,
    }

    missing_directories = storage.directory_missing([directory.id])
    assert list(missing_directories) == []
def test_buffering_proxy_storage_directory_threshold_hit(sample_data) -> None:
    """Hitting the directory threshold writes through immediately."""
    directory = sample_data.directory
    storage = get_storage_with_buffer_config(min_batch_size={"directory": 1})

    s = storage.directory_add([directory])
    assert s == {
        "directory:add": 1,
    }

    missing_directories = storage.directory_missing([directory.id])
    assert list(missing_directories) == []

    # buffer is empty after the write-through
    s = storage.flush()
    assert s == {}
def test_buffering_proxy_storage_directory_deduplicate(sample_data) -> None:
    """Duplicate directories count once toward the batch threshold."""
    directories = sample_data.directories[:2]
    storage = get_storage_with_buffer_config(min_batch_size={"directory": 2})

    # the same directory added repeatedly is buffered only once
    s = storage.directory_add([directories[0], directories[0]])
    assert s == {}

    s = storage.directory_add([directories[0]])
    assert s == {}

    # a second distinct directory reaches the threshold
    s = storage.directory_add([directories[1]])
    assert s == {
        "directory:add": 1 + 1,
    }

    missing_directories = storage.directory_missing([d.id for d in directories])
    assert list(missing_directories) == []

    s = storage.flush()
    assert s == {}
def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data) -> None:
    """Revisions below the batch threshold stay buffered until flush()."""
    revision = sample_data.revision
    storage = get_storage_with_buffer_config(min_batch_size={"revision": 10})

    s = storage.revision_add([revision])
    assert s == {}

    # not written through yet
    missing_revisions = storage.revision_missing([revision.id])
    assert list(missing_revisions) == [revision.id]

    s = storage.flush()
    assert s == {
        "revision:add": 1,
    }

    missing_revisions = storage.revision_missing([revision.id])
    assert list(missing_revisions) == []
def test_buffering_proxy_storage_revision_threshold_hit(sample_data) -> None:
    """Hitting the revision threshold writes through immediately."""
    revision = sample_data.revision
    storage = get_storage_with_buffer_config(min_batch_size={"revision": 1})

    s = storage.revision_add([revision])
    assert s == {
        "revision:add": 1,
    }

    missing_revisions = storage.revision_missing([revision.id])
    assert list(missing_revisions) == []

    # buffer is empty after the write-through
    s = storage.flush()
    assert s == {}
def test_buffering_proxy_storage_revision_deduplicate(sample_data) -> None:
    """Duplicate revisions count once toward the batch threshold."""
    revisions = sample_data.revisions[:2]
    storage = get_storage_with_buffer_config(min_batch_size={"revision": 2})

    # the same revision added repeatedly is buffered only once
    s = storage.revision_add([revisions[0], revisions[0]])
    assert s == {}

    s = storage.revision_add([revisions[0]])
    assert s == {}

    # a second distinct revision reaches the threshold
    s = storage.revision_add([revisions[1]])
    assert s == {
        "revision:add": 1 + 1,
    }

    missing_revisions = storage.revision_missing([r.id for r in revisions])
    assert list(missing_revisions) == []

    s = storage.flush()
    assert s == {}
def test_buffering_proxy_storage_release_threshold_not_hit(sample_data) -> None:
    """Releases below the batch threshold stay buffered until flush()."""
    releases = sample_data.releases
    threshold = 10

    # sanity check: the sample set is smaller than the threshold
    assert len(releases) < threshold

    storage = get_storage_with_buffer_config(
        min_batch_size={"release": threshold}  # configuration set
    )
    s = storage.release_add(releases)
    assert s == {}

    # not written through yet
    release_ids = [r.id for r in releases]
    missing_releases = storage.release_missing(release_ids)
    assert list(missing_releases) == release_ids

    s = storage.flush()
    assert s == {
        "release:add": len(releases),
    }

    missing_releases = storage.release_missing(release_ids)
    assert list(missing_releases) == []
def test_buffering_proxy_storage_release_threshold_hit(sample_data) -> None:
    """Exceeding the release threshold writes through immediately."""
    releases = sample_data.releases
    threshold = 2

    # sanity check: the sample set exceeds the threshold
    assert len(releases) > threshold

    storage = get_storage_with_buffer_config(
        min_batch_size={"release": threshold}  # configuration set
    )

    s = storage.release_add(releases)
    assert s == {
        "release:add": len(releases),
    }

    release_ids = [r.id for r in releases]
    missing_releases = storage.release_missing(release_ids)
    assert list(missing_releases) == []

    s = storage.flush()
    assert s == {}
def test_buffering_proxy_storage_release_deduplicate(sample_data) -> None:
    """Duplicate releases count once toward the batch threshold."""
    releases = sample_data.releases[:2]
    storage = get_storage_with_buffer_config(min_batch_size={"release": 2})

    # the same release added repeatedly is buffered only once
    s = storage.release_add([releases[0], releases[0]])
    assert s == {}

    s = storage.release_add([releases[0]])
    assert s == {}

    # a second distinct release reaches the threshold
    s = storage.release_add([releases[1]])
    assert s == {
        "release:add": 1 + 1,
    }

    missing_releases = storage.release_missing([r.id for r in releases])
    assert list(missing_releases) == []

    s = storage.flush()
    assert s == {}
def test_buffering_proxy_storage_clear(sample_data): | def test_buffering_proxy_storage_clear(sample_data) -> None: | ||||
"""Clear operation on buffer | """Clear operation on buffer | ||||
""" | """ | ||||
threshold = 10 | threshold = 10 | ||||
contents = sample_data.contents | contents = sample_data.contents | ||||
assert 0 < len(contents) < threshold | assert 0 < len(contents) < threshold | ||||
skipped_contents = sample_data.skipped_contents | skipped_contents = sample_data.skipped_contents | ||||
assert 0 < len(skipped_contents) < threshold | assert 0 < len(skipped_contents) < threshold | ||||
Show All 27 Lines | def test_buffering_proxy_storage_clear(sample_data) -> None: | ||||
assert len(storage._objects["content"]) == len(contents) | assert len(storage._objects["content"]) == len(contents) | ||||
assert len(storage._objects["skipped_content"]) == len(skipped_contents) | assert len(storage._objects["skipped_content"]) == len(skipped_contents) | ||||
assert len(storage._objects["directory"]) == len(directories) | assert len(storage._objects["directory"]) == len(directories) | ||||
assert len(storage._objects["revision"]) == len(revisions) | assert len(storage._objects["revision"]) == len(revisions) | ||||
assert len(storage._objects["release"]) == len(releases) | assert len(storage._objects["release"]) == len(releases) | ||||
# clear only content from the buffer | # clear only content from the buffer | ||||
s = storage.clear_buffers(["content"]) | s = storage.clear_buffers(["content"]) # type: ignore | ||||
assert s is None | assert s is None | ||||
# specific clear operation on specific object type content only touched | # specific clear operation on specific object type content only touched | ||||
# them | # them | ||||
assert len(storage._objects["content"]) == 0 | assert len(storage._objects["content"]) == 0 | ||||
assert len(storage._objects["skipped_content"]) == len(skipped_contents) | assert len(storage._objects["skipped_content"]) == len(skipped_contents) | ||||
assert len(storage._objects["directory"]) == len(directories) | assert len(storage._objects["directory"]) == len(directories) | ||||
assert len(storage._objects["revision"]) == len(revisions) | assert len(storage._objects["revision"]) == len(revisions) | ||||
assert len(storage._objects["release"]) == len(releases) | assert len(storage._objects["release"]) == len(releases) | ||||
# clear current buffer from all object types | # clear current buffer from all object types | ||||
s = storage.clear_buffers() | s = storage.clear_buffers() # type: ignore | ||||
assert s is None | assert s is None | ||||
assert len(storage._objects["content"]) == 0 | assert len(storage._objects["content"]) == 0 | ||||
assert len(storage._objects["skipped_content"]) == 0 | assert len(storage._objects["skipped_content"]) == 0 | ||||
assert len(storage._objects["directory"]) == 0 | assert len(storage._objects["directory"]) == 0 | ||||
assert len(storage._objects["revision"]) == 0 | assert len(storage._objects["revision"]) == 0 | ||||
assert len(storage._objects["release"]) == 0 | assert len(storage._objects["release"]) == 0 | ||||
def test_buffer_proxy_with_default_args() -> None:
    """The buffering proxy can be instantiated with no explicit config."""
    storage = get_storage_with_buffer_config()
    assert storage is not None