Page Menu | Home | Software Heritage

Jenkins > .tox.py3.lib.python3.7.site-packages.swh.storage.tests.test_buffer::test_buffering_proxy_storage_clear
Failed

TEST RESULT

Run At
Apr 9 2020, 12:48 PM
Details
sample_data = {'authority': [{'metadata': {'location': 'France'}, 'type': 'deposit-client', 'url': 'http:///hal/inria'}], 'content':...384>, 'target': b'12345678901234567890', 'type': 'dir'}], 'id': b'4\x013B2S1\x000\xf51\xe62\xa73\xff7\xc3\xa90'}], ...} def test_buffering_proxy_storage_clear(sample_data): """Clear operation on buffer """ threshold = 10 contents = sample_data["content"] assert 0 < len(contents) < threshold skipped_contents = sample_data["skipped_content"] assert 0 < len(skipped_contents) < threshold directories = sample_data["directory"] assert 0 < len(directories) < threshold revisions = sample_data["revision"] assert 0 < len(revisions) < threshold releases = sample_data["release"] assert 0 < len(releases) < threshold storage = get_storage_with_buffer_config( min_batch_size={ "content": threshold, "skipped_content": threshold, "directory": threshold, "revision": threshold, "release": threshold, } ) s = storage.content_add(contents) assert s == {} s = storage.skipped_content_add(skipped_contents) assert s == {} s = storage.directory_add(directories) assert s == {} s = storage.revision_add(revisions) assert s == {} s = storage.release_add(releases) assert s == {} assert len(storage._objects["content"]) == len(contents) assert len(storage._objects["skipped_content"]) == len(skipped_contents) assert len(storage._objects["directory"]) == len(directories) assert len(storage._objects["revision"]) == len(revisions) assert len(storage._objects["release"]) == len(releases) # clear only content from the buffer > s = storage.clear_buffers(["content"]) .tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_buffer.py:385: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ .tox/py3/lib/python3.7/site-packages/swh/storage/validate.py:112: in clear_buffers return self.storage.clear_buffers(object_types) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <swh.storage.buffer.BufferingProxyStorage 
object at 0x7f7462b32630> object_types = ['content'] def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None: """Clear objects from current buffer. WARNING: data that has not been flushed to storage will be lost when this method is called. This should only be called when `flush` fails and you want to continue your processing. """ if object_types is None: object_types = self.object_types for object_type in object_types: q = self._objects[object_type] q.clear() > return self.storage.clear_buffers(object_types) E AttributeError: 'InMemoryStorage' object has no attribute 'clear_buffers' .tox/py3/lib/python3.7/site-packages/swh/storage/buffer.py:152: AttributeError