Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_replay.py
Show First 20 Lines • Show All 70 Lines • ▼ Show 20 Lines | def test_storage_play( | ||||
producer.flush() | producer.flush() | ||||
# Fill the storage from Kafka | # Fill the storage from Kafka | ||||
replayer = JournalClient( | replayer = JournalClient( | ||||
brokers='localhost:%d' % kafka_server[1], | brokers='localhost:%d' % kafka_server[1], | ||||
group_id=kafka_consumer_group, | group_id=kafka_consumer_group, | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
max_messages=nb_sent, | stop_after_objects=nb_sent, | ||||
) | ) | ||||
worker_fn = functools.partial(process_replay_objects, storage=storage) | worker_fn = functools.partial(process_replay_objects, storage=storage) | ||||
nb_inserted = 0 | nb_inserted = 0 | ||||
while nb_inserted < nb_sent: | while nb_inserted < nb_sent: | ||||
nb_inserted += replayer.process(worker_fn) | nb_inserted += replayer.process(worker_fn) | ||||
assert nb_sent == nb_inserted | assert nb_sent == nb_inserted | ||||
# Check the objects were actually inserted in the storage | # Check the objects were actually inserted in the storage | ||||
▲ Show 20 Lines • Show All 53 Lines • ▼ Show 20 Lines | def _test_write_replay_origin_visit(visits): | ||||
writer.send('origin', 'foo', { | writer.send('origin', 'foo', { | ||||
'url': 'http://example.com/', | 'url': 'http://example.com/', | ||||
'type': 'git', | 'type': 'git', | ||||
}) | }) | ||||
for visit in visits: | for visit in visits: | ||||
writer.send('origin_visit', 'foo', visit) | writer.send('origin_visit', 'foo', visit) | ||||
queue_size = len(queue) | queue_size = len(queue) | ||||
assert replayer.max_messages is None | assert replayer.stop_after_objects is None | ||||
replayer.max_messages = queue_size | replayer.stop_after_objects = queue_size | ||||
storage = get_storage(**storage_config) | storage = get_storage(**storage_config) | ||||
worker_fn = functools.partial(process_replay_objects, storage=storage) | worker_fn = functools.partial(process_replay_objects, storage=storage) | ||||
nb_messages = 0 | |||||
while nb_messages < queue_size: | replayer.process(worker_fn) | ||||
nb_messages += replayer.process(worker_fn) | |||||
actual_visits = list(storage.origin_visit_get('http://example.com/')) | actual_visits = list(storage.origin_visit_get('http://example.com/')) | ||||
assert len(actual_visits) == len(visits), actual_visits | assert len(actual_visits) == len(visits), actual_visits | ||||
for vin, vout in zip(visits, actual_visits): | for vin, vout in zip(visits, actual_visits): | ||||
vin = vin.copy() | vin = vin.copy() | ||||
vout = vout.copy() | vout = vout.copy() | ||||
▲ Show 20 Lines • Show All 81 Lines • Show Last 20 Lines |