Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_write_replay.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import namedtuple | |||||
import functools | import functools | ||||
from hypothesis import given, settings, HealthCheck | from hypothesis import given, settings, HealthCheck | ||||
from hypothesis.strategies import lists | from hypothesis.strategies import lists | ||||
from swh.model.hypothesis_strategies import object_dicts | from swh.model.hypothesis_strategies import object_dicts | ||||
from swh.storage.in_memory import Storage | from swh.storage.in_memory import Storage | ||||
from swh.storage import HashCollision | from swh.storage import HashCollision | ||||
from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES | |||||
from swh.journal.direct_writer import DirectKafkaWriter | |||||
from swh.journal.replay import process_replay_objects | from swh.journal.replay import process_replay_objects | ||||
from swh.journal.replay import process_replay_objects_content | from swh.journal.replay import process_replay_objects_content | ||||
from swh.journal.serializers import ( | |||||
key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value) | |||||
# Lightweight stand-ins for the Kafka client's message/partition objects,
# exposing only the attributes the replay machinery actually reads.
FakeKafkaMessage = namedtuple('FakeKafkaMessage', ['key', 'value'])
FakeKafkaPartition = namedtuple('FakeKafkaPartition', ['topic'])
class MockedKafkaWriter(DirectKafkaWriter):
    """In-memory replacement for DirectKafkaWriter.

    Messages are round-tripped through the kafka (de)serializers and
    appended to ``queue`` as batches, one dict per topic partition.
    """

    def __init__(self, queue):
        self._prefix = 'prefix'
        self.queue = queue

    def send(self, topic, key, value):
        # Round-trip through the serializers so consumers see exactly what
        # a real Kafka broker would have delivered.
        decoded_key = kafka_to_key(key_to_kafka(key))
        decoded_value = kafka_to_value(value_to_kafka(value))
        partition = FakeKafkaPartition(topic)
        message = FakeKafkaMessage(key=decoded_key, value=decoded_value)
        last_batch = self.queue[-1] if self.queue else None
        if last_batch is not None and set(last_batch) == {partition}:
            # Last message has the same object type: grouping them.
            last_batch[partition].append(message)
        else:
            self.queue.append({partition: [message]})
class MockedKafkaConsumer:
    """Minimal mock of a Kafka consumer, fed from a plain list of batches."""

    def __init__(self, queue):
        self.queue = queue
        # Flips to True once commit() is called with nothing left to consume.
        self.committed = False

    def poll(self):
        # Deliver batches strictly in insertion (FIFO) order.
        return self.queue.pop(0)

    def commit(self):
        if not self.queue:
            self.committed = True
class MockedJournalClient(JournalClient):
    """JournalClient wired to a MockedKafkaConsumer instead of real Kafka.

    Deliberately skips JournalClient.__init__ so no broker connection is
    attempted; only the attributes the replay loop reads are set up.
    """

    def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
        self.consumer = MockedKafkaConsumer(queue)
        self._object_types = object_types
@given(lists(object_dicts(), min_size=1)) | @given(lists(object_dicts(), min_size=1)) | ||||
@settings(suppress_health_check=[HealthCheck.too_slow]) | @settings(suppress_health_check=[HealthCheck.too_slow]) | ||||
def test_write_replay_same_order_batches(objects): | def test_write_replay_same_order_batches(objects): | ||||
queue = [] | queue = [] | ||||
replayer = MockedJournalClient(queue) | replayer = MockedJournalClient(queue) | ||||
▲ Show 20 Lines • Show All 65 Lines • Show Last 20 Lines |