Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_buffer.py
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.storage import get_storage
def get_storage_with_buffer_config(**buffer_config):
    """Build a storage pipeline with a buffering proxy configured from
    ``buffer_config``.

    The pipeline is validate -> buffer -> in-memory backend, so tests can
    exercise the buffering behavior against a throwaway storage.

    Args:
        **buffer_config: keyword arguments forwarded into the ``buffer``
            step configuration (e.g. ``min_batch_size={...}``).

    Returns:
        A storage instance as returned by :func:`swh.storage.get_storage`.
    """
    storage_config = {
        'cls': 'pipeline',
        'steps': [
            {'cls': 'validate'},
            # buffer step receives the caller-provided thresholds
            {'cls': 'buffer', **buffer_config},
            {'cls': 'memory'},
        ]
    }
    return get_storage(**storage_config)
def test_buffering_proxy_storage_content_threshold_not_hit(sample_data): | def test_buffering_proxy_storage_content_threshold_not_hit(sample_data): | ||||
contents = sample_data['content'] | contents = sample_data['content'] | ||||
storage = BufferingProxyStorage( | storage = get_storage_with_buffer_config( | ||||
storage=storage_config, | |||||
min_batch_size={ | min_batch_size={ | ||||
'content': 10, | 'content': 10, | ||||
} | } | ||||
) | ) | ||||
s = storage.content_add([contents[0], contents[1]]) | s = storage.content_add([contents[0], contents[1]]) | ||||
assert s == {} | assert s == {} | ||||
# contents have not been written to storage | # contents have not been written to storage | ||||
Show All 10 Lines | def test_buffering_proxy_storage_content_threshold_not_hit(sample_data): | ||||
missing_contents = storage.content_missing( | missing_contents = storage.content_missing( | ||||
[contents[0], contents[1]]) | [contents[0], contents[1]]) | ||||
assert list(missing_contents) == [] | assert list(missing_contents) == [] | ||||
def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data):
    """With a count threshold of 1, a single content_add flushes
    immediately, so a subsequent flush() is a no-op."""
    contents = sample_data['content']
    storage = get_storage_with_buffer_config(
        min_batch_size={
            'content': 1,
        }
    )
    s = storage.content_add([contents[0]])
    # threshold reached on the first content: write-through happens now
    assert s == {
        'content:add': 1,
        'content:add:bytes': contents[0]['length'],
    }

    missing_contents = storage.content_missing([contents[0]])
    assert list(missing_contents) == []

    # buffer already drained: nothing left to flush
    s = storage.flush()
    assert s == {}
def test_buffering_proxy_storage_content_threshold_bytes_hit(sample_data):
    """Contents are written through as soon as the cumulative byte
    threshold is exceeded, even when the count threshold is not."""
    contents = sample_data['content']
    content_bytes_min_batch_size = 2
    storage = get_storage_with_buffer_config(
        min_batch_size={
            'content': 10,
            'content_bytes': content_bytes_min_batch_size,
        }
    )
    # sanity check: one content alone is enough to trip the byte threshold
    assert contents[0]['length'] > content_bytes_min_batch_size

    s = storage.content_add([contents[0]])
    assert s == {
        'content:add': 1,
        'content:add:bytes': contents[0]['length'],
    }

    missing_contents = storage.content_missing([contents[0]])
    assert list(missing_contents) == []

    # buffer already drained: nothing left to flush
    s = storage.flush()
    assert s == {}
def test_buffering_proxy_storage_skipped_content_threshold_not_hit( | def test_buffering_proxy_storage_skipped_content_threshold_not_hit( | ||||
sample_data): | sample_data): | ||||
contents = sample_data['skipped_content'] | contents = sample_data['skipped_content'] | ||||
storage = BufferingProxyStorage( | storage = get_storage_with_buffer_config( | ||||
storage=storage_config, | |||||
min_batch_size={ | min_batch_size={ | ||||
'skipped_content': 10, | 'skipped_content': 10, | ||||
} | } | ||||
) | ) | ||||
s = storage.skipped_content_add([contents[0], contents[1]]) | s = storage.skipped_content_add([contents[0], contents[1]]) | ||||
assert s == {} | assert s == {} | ||||
# contents have not been written to storage | # contents have not been written to storage | ||||
Show All 9 Lines | def test_buffering_proxy_storage_skipped_content_threshold_not_hit( | ||||
missing_contents = storage.skipped_content_missing( | missing_contents = storage.skipped_content_missing( | ||||
[contents[0], contents[1]]) | [contents[0], contents[1]]) | ||||
assert list(missing_contents) == [] | assert list(missing_contents) == [] | ||||
def test_buffering_proxy_storage_skipped_content_threshold_nb_hit(sample_data):
    """With a count threshold of 1, a single skipped_content_add flushes
    immediately, so a subsequent flush() is a no-op."""
    contents = sample_data['skipped_content']
    storage = get_storage_with_buffer_config(
        min_batch_size={
            'skipped_content': 1,
        }
    )
    s = storage.skipped_content_add([contents[0]])
    # threshold reached on the first object: write-through happens now
    assert s == {
        'skipped_content:add': 1
    }

    missing_contents = storage.skipped_content_missing([contents[0]])
    assert list(missing_contents) == []

    # buffer already drained: nothing left to flush
    s = storage.flush()
    assert s == {}
def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data):
    """Below the directory threshold, objects stay buffered (invisible to
    reads) until an explicit flush() writes them through."""
    directories = sample_data['directory']
    storage = get_storage_with_buffer_config(
        min_batch_size={
            'directory': 10,
        }
    )
    s = storage.directory_add([directories[0]])
    # under threshold: nothing written yet
    assert s == {}

    directory_id = directories[0]['id']
    # the buffered directory is still reported missing by the backend
    missing_directories = storage.directory_missing(
        [directory_id])
    assert list(missing_directories) == [directory_id]

    # explicit flush drains the buffer into the backend storage
    s = storage.flush()
    assert s == {
        'directory:add': 1,
    }

    missing_directories = storage.directory_missing(
        [directory_id])
    assert list(missing_directories) == []
def test_buffering_proxy_storage_directory_threshold_hit(sample_data):
    """With a directory threshold of 1, a single directory_add flushes
    immediately, so a subsequent flush() is a no-op."""
    directories = sample_data['directory']
    storage = get_storage_with_buffer_config(
        min_batch_size={
            'directory': 1,
        }
    )
    s = storage.directory_add([directories[0]])
    # threshold reached on the first directory: write-through happens now
    assert s == {
        'directory:add': 1,
    }

    missing_directories = storage.directory_missing(
        [directories[0]['id']])
    assert list(missing_directories) == []

    # buffer already drained: nothing left to flush
    s = storage.flush()
    assert s == {}
def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data):
    """Below the revision threshold, objects stay buffered (invisible to
    reads) until an explicit flush() writes them through."""
    revisions = sample_data['revision']
    storage = get_storage_with_buffer_config(
        min_batch_size={
            'revision': 10,
        }
    )
    s = storage.revision_add([revisions[0]])
    # under threshold: nothing written yet
    assert s == {}

    revision_id = revisions[0]['id']
    # the buffered revision is still reported missing by the backend
    missing_revisions = storage.revision_missing(
        [revision_id])
    assert list(missing_revisions) == [revision_id]

    # explicit flush drains the buffer into the backend storage
    s = storage.flush()
    assert s == {
        'revision:add': 1,
    }

    missing_revisions = storage.revision_missing(
        [revision_id])
    assert list(missing_revisions) == []
def test_buffering_proxy_storage_revision_threshold_hit(sample_data):
    """With a revision threshold of 1, a single revision_add flushes
    immediately, so a subsequent flush() is a no-op."""
    revisions = sample_data['revision']
    storage = get_storage_with_buffer_config(
        min_batch_size={
            'revision': 1,
        }
    )
    s = storage.revision_add([revisions[0]])
    # threshold reached on the first revision: write-through happens now
    assert s == {
        'revision:add': 1,
    }

    missing_revisions = storage.revision_missing(
        [revisions[0]['id']])
    assert list(missing_revisions) == []

    # buffer already drained: nothing left to flush
    s = storage.flush()
    assert s == {}
def test_buffering_proxy_storage_release_threshold_not_hit(sample_data): | def test_buffering_proxy_storage_release_threshold_not_hit(sample_data): | ||||
releases = sample_data['release'] | releases = sample_data['release'] | ||||
threshold = 10 | threshold = 10 | ||||
assert len(releases) < threshold | assert len(releases) < threshold | ||||
storage = BufferingProxyStorage( | storage = get_storage_with_buffer_config( | ||||
storage=storage_config, | |||||
min_batch_size={ | min_batch_size={ | ||||
'release': threshold, # configuration set | 'release': threshold, # configuration set | ||||
} | } | ||||
) | ) | ||||
s = storage.release_add(releases) | s = storage.release_add(releases) | ||||
assert s == {} | assert s == {} | ||||
release_ids = [r['id'] for r in releases] | release_ids = [r['id'] for r in releases] | ||||
Show All 9 Lines | def test_buffering_proxy_storage_release_threshold_not_hit(sample_data): | ||||
assert list(missing_releases) == [] | assert list(missing_releases) == [] | ||||
def test_buffering_proxy_storage_release_threshold_hit(sample_data):
    """Adding more releases than the threshold in one call triggers an
    immediate write-through of the whole batch."""
    releases = sample_data['release']
    threshold = 2
    # sanity check: the sample batch is large enough to trip the threshold
    assert len(releases) > threshold

    storage = get_storage_with_buffer_config(
        min_batch_size={
            'release': threshold,  # configuration set
        }
    )
    s = storage.release_add(releases)
    assert s == {
        'release:add': len(releases),
    }

    release_ids = [r['id'] for r in releases]
    missing_releases = storage.release_missing(release_ids)
    assert list(missing_releases) == []

    # buffer already drained: nothing left to flush
    s = storage.flush()
    assert s == {}