swh_storage = <swh.storage.validate.ValidatingProxyStorage object at 0x7f4b199f5c50>
sample_data = {'authority': [{'metadata': {'location': 'France'}, 'type': 'deposit-client', 'url': 'http:///hal/inria'}], 'content':...384>, 'target': b'12345678901234567890', 'type': 'dir'}], 'id': b'4\x013B2S1\x000\xf51\xe62\xa73\xff7\xc3\xa90'}], ...}
def test_filtering_proxy_storage_clear(swh_storage, sample_data):
"""Clear operation on filter proxy
"""
threshold = 10
contents = sample_data["content"]
assert 0 < len(contents) < threshold
skipped_contents = sample_data["skipped_content"]
assert 0 < len(skipped_contents) < threshold
directories = sample_data["directory"]
assert 0 < len(directories) < threshold
revisions = sample_data["revision"]
assert 0 < len(revisions) < threshold
releases = sample_data["release"]
assert 0 < len(releases) < threshold
s = swh_storage.content_add(contents)
assert s["content:add"] == len(contents)
s = swh_storage.skipped_content_add(skipped_contents)
assert s == {
"skipped_content:add": len(directories),
}
s = swh_storage.directory_add(directories)
assert s == {
"directory:add": len(directories),
}
s = swh_storage.revision_add(revisions)
assert s == {
"revision:add": len(revisions),
}
assert len(swh_storage.objects_seen["content"]) == len(contents)
assert len(swh_storage.objects_seen["skipped_content"]) == len(skipped_contents)
assert len(swh_storage.objects_seen["directory"]) == len(directories)
assert len(swh_storage.objects_seen["revision"]) == len(revisions)
# clear only content from the buffer
> s = swh_storage.clear_buffers(["content"])
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_filter.py:167:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/swh/storage/validate.py:112: in clear_buffers
return self.storage.clear_buffers(object_types)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <swh.storage.filter.FilteringProxyStorage object at 0x7f4b199f5b00>
object_types = ['content']
def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
"""Clear objects from current buffer
"""
if object_types is None:
object_types = self.object_types
for object_type in object_types:
self.objects_seen[object_type] = set()
> return self.storage.clear_buffers(object_types)
E AttributeError: 'InMemoryStorage' object has no attribute 'clear_buffers'
.tox/py3/lib/python3.7/site-packages/swh/storage/filter.py:150: AttributeError
TEST RESULT
- Run At: Apr 9 2020, 3:28 PM