sample_data = {'authority': [{'metadata': {'location': 'France'}, 'type': 'deposit-client', 'url': 'http:///hal/inria'}], 'content':...384>, 'target': b'12345678901234567890', 'type': 'dir'}], 'id': b'4\x013B2S1\x000\xf51\xe62\xa73\xff7\xc3\xa90'}], ...}
def test_buffering_proxy_storage_clear(sample_data):
"""Clear operation on buffer
"""
threshold = 10
contents = sample_data["content"]
assert 0 < len(contents) < threshold
skipped_contents = sample_data["skipped_content"]
assert 0 < len(skipped_contents) < threshold
directories = sample_data["directory"]
assert 0 < len(directories) < threshold
revisions = sample_data["revision"]
assert 0 < len(revisions) < threshold
releases = sample_data["release"]
assert 0 < len(releases) < threshold
storage = get_storage_with_buffer_config(
min_batch_size={
"content": threshold,
"skipped_content": threshold,
"directory": threshold,
"revision": threshold,
"release": threshold,
}
)
s = storage.content_add(contents)
assert s == {}
s = storage.skipped_content_add(skipped_contents)
assert s == {}
s = storage.directory_add(directories)
assert s == {}
s = storage.revision_add(revisions)
assert s == {}
s = storage.release_add(releases)
assert s == {}
assert len(storage._objects["content"]) == len(contents)
assert len(storage._objects["skipped_content"]) == len(skipped_contents)
assert len(storage._objects["directory"]) == len(directories)
assert len(storage._objects["revision"]) == len(revisions)
assert len(storage._objects["release"]) == len(releases)
# clear only content from the buffer
> s = storage.clear_buffers(["content"])
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_buffer.py:385:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/swh/storage/validate.py:112: in clear_buffers
return self.storage.clear_buffers(object_types)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <swh.storage.buffer.BufferingProxyStorage object at 0x7f7462b32630>
object_types = ['content']
def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
"""Clear objects from current buffer.
WARNING:
data that has not been flushed to storage will be lost when this
method is called. This should only be called when `flush` fails and
you want to continue your processing.
"""
if object_types is None:
object_types = self.object_types
for object_type in object_types:
q = self._objects[object_type]
q.clear()
> return self.storage.clear_buffers(object_types)
E AttributeError: 'InMemoryStorage' object has no attribute 'clear_buffers'
.tox/py3/lib/python3.7/site-packages/swh/storage/buffer.py:152: AttributeError
TEST RESULT
TEST RESULT
- Run At
- Apr 9 2020, 12:48 PM