Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_replay.py
Show All 32 Lines | producer = KafkaProducer( | ||||
value_serializer=value_to_kafka, | value_serializer=value_to_kafka, | ||||
client_id='test producer', | client_id='test producer', | ||||
) | ) | ||||
now = datetime.datetime.now(tz=datetime.timezone.utc) | now = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
# Fill Kafka | # Fill Kafka | ||||
nb_sent = 0 | nb_sent = 0 | ||||
nb_visits = 0 | |||||
for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): | for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): | ||||
topic = kafka_prefix + '.' + object_type | topic = kafka_prefix + '.' + object_type | ||||
for object_ in objects: | for object_ in objects: | ||||
key = bytes(random.randint(0, 255) for _ in range(40)) | key = bytes(random.randint(0, 255) for _ in range(40)) | ||||
object_ = object_.copy() | object_ = object_.copy() | ||||
if object_type == 'content': | if object_type == 'content': | ||||
object_['ctime'] = now | object_['ctime'] = now | ||||
elif object_type == 'origin_visit': | |||||
nb_visits += 1 | |||||
object_['visit'] = nb_visits | |||||
producer.send(topic, key=key, value=object_) | producer.send(topic, key=key, value=object_) | ||||
nb_sent += 1 | nb_sent += 1 | ||||
# Fill the storage from Kafka | # Fill the storage from Kafka | ||||
config = { | config = { | ||||
'brokers': 'localhost:%d' % kafka_server[1], | 'brokers': 'localhost:%d' % kafka_server[1], | ||||
'consumer_id': 'replayer', | 'consumer_id': 'replayer', | ||||
'prefix': kafka_prefix, | 'prefix': kafka_prefix, | ||||
Show All 37 Lines |