Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_replay.py
Show First 20 Lines • Show All 51 Lines • ▼ Show 20 Lines | for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): | ||||
nb_visits += 1 | nb_visits += 1 | ||||
object_['visit'] = nb_visits | object_['visit'] = nb_visits | ||||
producer.send(topic, key=key, value=object_) | producer.send(topic, key=key, value=object_) | ||||
nb_sent += 1 | nb_sent += 1 | ||||
# Fill the storage from Kafka | # Fill the storage from Kafka | ||||
config = { | config = { | ||||
'brokers': 'localhost:%d' % kafka_server[1], | 'brokers': 'localhost:%d' % kafka_server[1], | ||||
'consumer_id': 'replayer', | 'group_id': 'replayer', | ||||
'topic_prefix': kafka_prefix, | 'prefix': kafka_prefix, | ||||
'max_messages': nb_sent, | 'max_messages': nb_sent, | ||||
} | } | ||||
replayer = JournalClient(**config) | replayer = JournalClient(**config) | ||||
worker_fn = functools.partial(process_replay_objects, storage=storage) | worker_fn = functools.partial(process_replay_objects, storage=storage) | ||||
nb_inserted = 0 | nb_inserted = 0 | ||||
while nb_inserted < nb_sent: | while nb_inserted < nb_sent: | ||||
nb_inserted += replayer.process(worker_fn) | nb_inserted += replayer.process(worker_fn) | ||||
assert nb_sent == nb_inserted | assert nb_sent == nb_inserted | ||||
Show All 33 Lines |