Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_replay.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import functools | import functools | ||||
import random | import random | ||||
from subprocess import Popen | from subprocess import Popen | ||||
from typing import Tuple | from typing import Tuple | ||||
import dateutil | import dateutil | ||||
from kafka import KafkaProducer | from confluent_kafka import Producer | ||||
from hypothesis import strategies, given, settings | from hypothesis import strategies, given, settings | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.in_memory import ENABLE_ORIGIN_IDS | from swh.storage.in_memory import ENABLE_ORIGIN_IDS | ||||
from swh.journal.client import JournalClient | from swh.journal.client import JournalClient | ||||
from swh.journal.serializers import key_to_kafka, value_to_kafka | from swh.journal.serializers import key_to_kafka, value_to_kafka | ||||
from swh.journal.replay import process_replay_objects, is_hash_in_bytearray | from swh.journal.replay import process_replay_objects, is_hash_in_bytearray | ||||
from .conftest import OBJECT_TYPE_KEYS | from .conftest import OBJECT_TYPE_KEYS | ||||
from .utils import MockedJournalClient, MockedKafkaWriter | from .utils import MockedJournalClient, MockedKafkaWriter | ||||
def test_storage_play( | def test_storage_play( | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_server: Tuple[Popen, int]): | kafka_server: Tuple[Popen, int]): | ||||
(_, port) = kafka_server | (_, port) = kafka_server | ||||
kafka_prefix += '.swh.journal.objects' | kafka_prefix += '.swh.journal.objects' | ||||
storage = get_storage('memory', {}) | storage = get_storage('memory', {}) | ||||
producer = KafkaProducer( | producer = Producer({ | ||||
bootstrap_servers='localhost:{}'.format(port), | 'bootstrap.servers': 'localhost:{}'.format(port), | ||||
key_serializer=key_to_kafka, | 'client.id': 'test producer', | ||||
value_serializer=value_to_kafka, | 'enable.idempotence': 'true', | ||||
client_id='test producer', | }) | ||||
) | |||||
now = datetime.datetime.now(tz=datetime.timezone.utc) | now = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
# Fill Kafka | # Fill Kafka | ||||
nb_sent = 0 | nb_sent = 0 | ||||
nb_visits = 0 | nb_visits = 0 | ||||
for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): | for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): | ||||
topic = kafka_prefix + '.' + object_type | topic = kafka_prefix + '.' + object_type | ||||
for object_ in objects: | for object_ in objects: | ||||
key = bytes(random.randint(0, 255) for _ in range(40)) | key = bytes(random.randint(0, 255) for _ in range(40)) | ||||
object_ = object_.copy() | object_ = object_.copy() | ||||
if object_type == 'content': | if object_type == 'content': | ||||
object_['ctime'] = now | object_['ctime'] = now | ||||
elif object_type == 'origin_visit': | elif object_type == 'origin_visit': | ||||
nb_visits += 1 | nb_visits += 1 | ||||
object_['visit'] = nb_visits | object_['visit'] = nb_visits | ||||
producer.send(topic, key=key, value=object_) | producer.produce( | ||||
topic=topic, key=key_to_kafka(key), | |||||
value=value_to_kafka(object_), | |||||
) | |||||
nb_sent += 1 | nb_sent += 1 | ||||
producer.flush() | |||||
vlorentz: missing `producer.flush()` | |||||
# Fill the storage from Kafka | # Fill the storage from Kafka | ||||
config = { | config = { | ||||
'brokers': 'localhost:%d' % kafka_server[1], | 'brokers': 'localhost:%d' % kafka_server[1], | ||||
'group_id': 'replayer', | 'group_id': 'replayer', | ||||
'prefix': kafka_prefix, | 'prefix': kafka_prefix, | ||||
'max_messages': nb_sent, | 'max_messages': nb_sent, | ||||
} | } | ||||
replayer = JournalClient(**config) | replayer = JournalClient(**config) | ||||
▲ Show 20 Lines • Show All 60 Lines • ▼ Show 20 Lines | def _test_write_replay_origin_visit(visits): | ||||
# the origin when needed (type is missing) | # the origin when needed (type is missing) | ||||
writer.send('origin', 'foo', { | writer.send('origin', 'foo', { | ||||
'url': 'http://example.com/', | 'url': 'http://example.com/', | ||||
'type': 'git', | 'type': 'git', | ||||
}) | }) | ||||
for visit in visits: | for visit in visits: | ||||
writer.send('origin_visit', 'foo', visit) | writer.send('origin_visit', 'foo', visit) | ||||
queue_size = sum(len(partition) | queue_size = len(queue) | ||||
Not Done · Inline Actions — vlorentz: Isn't this simplification working only as a side effect of the current implementation? | |||||
Done · Inline Actions — olasd: I don't understand this question. The whole point of this queue business was to mock what the upstream Kafka consumer produces, so we don't need to add a Kafka in the loop. With kafka-python, the consumer produces batches of messages grouped by partition; with confluent-kafka, it just produces a bunch of individual messages. | |||||
for batch in queue | |||||
for partition in batch.values()) | |||||
storage = get_storage('memory', {}) | storage = get_storage('memory', {}) | ||||
worker_fn = functools.partial(process_replay_objects, storage=storage) | worker_fn = functools.partial(process_replay_objects, storage=storage) | ||||
nb_messages = 0 | nb_messages = 0 | ||||
while nb_messages < queue_size: | while nb_messages < queue_size: | ||||
nb_messages += replayer.process(worker_fn) | nb_messages += replayer.process(worker_fn) | ||||
actual_visits = list(storage.origin_visit_get('http://example.com/')) | actual_visits = list(storage.origin_visit_get('http://example.com/')) | ||||
▲ Show 20 Lines • Show All 59 Lines • Show Last 20 Lines |
missing producer.flush()