sample_data = <swh.storage.tests.storage_data.StorageData object at 0x7f1bf97305f8>
def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data) -> None:
content = sample_data.content
content_dict = content.to_dict()
storage = get_storage_with_buffer_config(min_batch_size={"content": 1,})
s = storage.content_add([content])
assert s == {
"content:add": 1,
"content:add:bytes": content.length,
}
missing_contents = storage.content_missing([content_dict])
> assert list(missing_contents) == []
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_buffer.py:64:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <swh.storage.in_memory.InMemoryStorage object at 0x7f1bf9730470>
contents = [{'blake2s256': b"\xd5\xfe\x199We'\xe4,\xfdv\xa9EZ$2\xfe\x7fVf\x95dW}\xd9<B\x80\xe7mf\x1d", 'data': b'42\n', 'length': 3, 'sha1': b'4\x972t\xcc\xefj\xb4\xdf\xaa\xf8e\x99y/\xa9\xc3\xfeF\x89', ...}]
key_hash = 'sha1'
@timed
def content_missing(
self, contents: List[Dict[str, Any]], key_hash: str = "sha1"
) -> Iterable[bytes]:
if key_hash not in DEFAULT_ALGORITHMS:
raise StorageArgumentException(
"key_hash should be one of {','.join(DEFAULT_ALGORITHMS)}"
)
contents_with_all_hashes = []
contents_with_missing_hashes = []
for content in contents:
if DEFAULT_ALGORITHMS <= set(content):
contents_with_all_hashes.append(content)
else:
contents_with_missing_hashes.append(content)
# These contents can be queried efficiently directly in the main table
> for content in self._cql_runner.content_missing_from_all_hashes(
contents_with_all_hashes
):
E AttributeError: 'InMemoryCqlRunner' object has no attribute 'content_missing_from_all_hashes'
.tox/py3/lib/python3.7/site-packages/swh/storage/cassandra/storage.py:426: AttributeError
TEST RESULT
TEST RESULT
- Run At
- Oct 18 2021, 1:45 PM