Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_replay.py
Show All 13 Lines | |||||
from swh.journal.serializers import key_to_kafka, value_to_kafka | from swh.journal.serializers import key_to_kafka, value_to_kafka | ||||
from swh.journal.replay import StorageReplayer | from swh.journal.replay import StorageReplayer | ||||
from .conftest import OBJECT_TYPE_KEYS | from .conftest import OBJECT_TYPE_KEYS | ||||
def test_storage_play( | def test_storage_play( | ||||
kafka_prefix: str, | |||||
kafka_server: Tuple[Popen, int]): | kafka_server: Tuple[Popen, int]): | ||||
(_, port) = kafka_server | (_, port) = kafka_server | ||||
kafka_prefix += '.swh.journal.objects' | |||||
storage = get_storage('memory', {}) | storage = get_storage('memory', {}) | ||||
producer = KafkaProducer( | producer = KafkaProducer( | ||||
bootstrap_servers='localhost:{}'.format(port), | bootstrap_servers='localhost:{}'.format(port), | ||||
key_serializer=key_to_kafka, | key_serializer=key_to_kafka, | ||||
value_serializer=value_to_kafka, | value_serializer=value_to_kafka, | ||||
client_id='test producer', | client_id='test producer', | ||||
) | ) | ||||
# Fill Kafka | # Fill Kafka | ||||
nb_sent = 0 | nb_sent = 0 | ||||
for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): | for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): | ||||
topic = 'swh.journal.objects.' + object_type | topic = kafka_prefix + '.' + object_type | ||||
for object_ in objects: | for object_ in objects: | ||||
key = bytes(random.randint(0, 255) for _ in range(40)) | key = bytes(random.randint(0, 255) for _ in range(40)) | ||||
producer.send(topic, key=key, value=object_) | producer.send(topic, key=key, value=object_) | ||||
nb_sent += 1 | nb_sent += 1 | ||||
# Fill the storage from Kafka | # Fill the storage from Kafka | ||||
config = { | config = { | ||||
'brokers': 'localhost:%d' % kafka_server[1], | 'brokers': 'localhost:%d' % kafka_server[1], | ||||
'consumer_id': 'replayer', | 'consumer_id': 'replayer', | ||||
'prefix': 'swh.journal.objects', | 'prefix': kafka_prefix, | ||||
} | } | ||||
replayer = StorageReplayer(**config) | replayer = StorageReplayer(**config) | ||||
nb_inserted = replayer.fill(storage, max_messages=nb_sent) | nb_inserted = replayer.fill(storage, max_messages=nb_sent) | ||||
assert nb_sent == nb_inserted | assert nb_sent == nb_inserted | ||||
# Check the objects were actually inserted in the storage | # Check the objects were actually inserted in the storage | ||||
assert OBJECT_TYPE_KEYS['revision'][1] == \ | assert OBJECT_TYPE_KEYS['revision'][1] == \ | ||||
list(storage.revision_get( | list(storage.revision_get( | ||||
Show All 25 Lines |