Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_replay.py
Show First 20 Lines • Show All 50 Lines • ▼ Show 20 Lines | for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): | ||||
object_['visit'] = nb_visits | object_['visit'] = nb_visits | ||||
producer.send(topic, key=key, value=object_) | producer.send(topic, key=key, value=object_) | ||||
nb_sent += 1 | nb_sent += 1 | ||||
# Fill the storage from Kafka | # Fill the storage from Kafka | ||||
config = { | config = { | ||||
'brokers': 'localhost:%d' % kafka_server[1], | 'brokers': 'localhost:%d' % kafka_server[1], | ||||
'consumer_id': 'replayer', | 'consumer_id': 'replayer', | ||||
'prefix': kafka_prefix, | 'topic_prefix': kafka_prefix, | ||||
'max_messages': nb_sent, | |||||
} | } | ||||
replayer = StorageReplayer(**config) | replayer = StorageReplayer(**config, storage=storage) | ||||
nb_inserted = replayer.fill(storage, max_messages=nb_sent) | nb_inserted = replayer.process() | ||||
assert nb_sent == nb_inserted | assert nb_sent == nb_inserted | ||||
# Check the objects were actually inserted in the storage | # Check the objects were actually inserted in the storage | ||||
assert OBJECT_TYPE_KEYS['revision'][1] == \ | assert OBJECT_TYPE_KEYS['revision'][1] == \ | ||||
list(storage.revision_get( | list(storage.revision_get( | ||||
[rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]])) | [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]])) | ||||
assert OBJECT_TYPE_KEYS['release'][1] == \ | assert OBJECT_TYPE_KEYS['release'][1] == \ | ||||
list(storage.release_get( | list(storage.release_get( | ||||
Show All 26 Lines |