kafka_prefix = 'inawwufgkf', kafka_consumer_group = 'test-consumer-inawwufgkf'
kafka_server = '127.0.0.1:41033'
def test_storage_play_anonymized(
kafka_prefix: str, kafka_consumer_group: str, kafka_server: str
):
"""Optimal replayer scenario.
This:
- writes objects to the topic
- replayer consumes objects from the topic and replay them
"""
writer_config = {
"cls": "kafka",
"brokers": [kafka_server],
"client_id": "kafka_writer",
"prefix": kafka_prefix,
"anonymize": True,
}
src_config: Dict[str, Any] = {"cls": "memory", "journal_writer": writer_config}
storage = get_storage(**src_config)
# Fill the src storage
nb_sent = 0
for obj_type, objs in TEST_OBJECTS.items():
if obj_type in ("origin_visit", "origin_visit_status"):
# these are unrelated with what we want to test here
continue
method = getattr(storage, obj_type + "_add")
method(objs)
nb_sent += len(objs)
# Fill a destination storage from Kafka **using anonymized topics**
dst_storage = get_storage(cls="memory")
replayer = JournalClient(
brokers=kafka_server,
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=nb_sent,
privileged=False,
)
worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
> nb_inserted = replayer.process(worker_fn)
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_replay.py:304:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:264: in process
batch_processed, at_eof = self.handle_messages(messages, worker_fn)
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:291: in handle_messages
worker_fn(dict(objects))
.tox/py3/lib/python3.7/site-packages/swh/storage/replay.py:61: in process_replay_objects
_insert_objects(object_type, objects, storage)
.tox/py3/lib/python3.7/site-packages/swh/storage/replay.py:139: in _insert_objects
method(object_converter_fn[object_type](o) for o in objects)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <swh.storage.in_memory.InMemoryStorage object at 0x7fcfff7ec240>
origins = <generator object _insert_objects.<locals>.<genexpr> at 0x7fcfff78ea20>
def origin_add(self, origins: List[Origin]) -> Dict[str, int]:
added = 0
for origin in origins:
if origin.url not in self._origins:
self.origin_add_one(origin)
added += 1
> self._cql_runner.increment_counter("origin", len(origins))
E TypeError: object of type 'generator' has no len()
.tox/py3/lib/python3.7/site-packages/swh/storage/in_memory.py:659: TypeError
TEST RESULT
TEST RESULT
- Run At: Aug 12 2020, 6:49 PM