replayer_storage_and_client = (<swh.storage.in_memory.InMemoryStorage object at 0x7f88a2a3d630>, <swh.journal.client.JournalClient object at 0x7f88a2a3dc50>)
caplog = <_pytest.logging.LogCaptureFixture object at 0x7f88a193b828>
def test_storage_replayer(replayer_storage_and_client, caplog):
"""Optimal replayer scenario.
This:
- writes objects to a source storage
- replayer consumes objects from the topic and replays them
- a destination storage is filled from this
In the end, both storages should have the same content.
"""
src, replayer = replayer_storage_and_client
# Fill Kafka using a source storage
nb_sent = 0
for object_type, objects in TEST_OBJECTS.items():
method = getattr(src, object_type + "_add")
method(objects)
if object_type == "origin_visit":
nb_sent += len(objects) # origin-visit-add adds origin-visit-status as well
nb_sent += len(objects)
caplog.set_level(logging.ERROR, "swh.journal.replay")
# Fill the destination storage from Kafka
dst = get_storage(cls="memory")
worker_fn = functools.partial(process_replay_objects, storage=dst)
> nb_inserted = replayer.process(worker_fn)
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_replay.py:82:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:264: in process
batch_processed, at_eof = self.handle_messages(messages, worker_fn)
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:291: in handle_messages
worker_fn(dict(objects))
.tox/py3/lib/python3.7/site-packages/swh/storage/replay.py:61: in process_replay_objects
_insert_objects(object_type, objects, storage)
.tox/py3/lib/python3.7/site-packages/swh/storage/replay.py:139: in _insert_objects
method(object_converter_fn[object_type](o) for o in objects)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <swh.storage.in_memory.InMemoryStorage object at 0x7f88a2a386a0>
origins = <generator object _insert_objects.<locals>.<genexpr> at 0x7f88a18df660>
def origin_add(self, origins: List[Origin]) -> Dict[str, int]:
added = 0
for origin in origins:
if origin.url not in self._origins:
self.origin_add_one(origin)
added += 1
> self._cql_runner.increment_counter("origin", len(origins))
E TypeError: object of type 'generator' has no len()
.tox/py3/lib/python3.7/site-packages/swh/storage/in_memory.py:678: TypeError
TEST RESULT
TEST RESULT
- Run At
- Aug 12 2020, 6:44 PM