Changeset View
Standalone View
swh/journal/tests/test_write_replay.py
Show All 10 Lines | |||||
from swh.model.hypothesis_strategies import object_dicts | from swh.model.hypothesis_strategies import object_dicts | ||||
from swh.storage.in_memory import Storage | from swh.storage.in_memory import Storage | ||||
from swh.storage import HashCollision | from swh.storage import HashCollision | ||||
from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES | from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES | ||||
from swh.journal.direct_writer import DirectKafkaWriter | from swh.journal.direct_writer import DirectKafkaWriter | ||||
from swh.journal.replay import process_replay_objects | from swh.journal.replay import process_replay_objects | ||||
from swh.journal.replay import process_replay_objects_content | |||||
from swh.journal.serializers import ( | from swh.journal.serializers import ( | ||||
key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value) | key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value) | ||||
# Minimal stand-ins for the kafka client's message and partition objects:
# just enough attributes (key/value, topic) for the replayer under test.
FakeKafkaMessage = namedtuple('FakeKafkaMessage', ['key', 'value'])
FakeKafkaPartition = namedtuple('FakeKafkaPartition', ['topic'])
class MockedKafkaWriter(DirectKafkaWriter):
    """Kafka writer whose ``send`` round-trips objects through the real
    (de)serializers, then appends them to an in-memory queue shared with
    a MockedKafkaConsumer instead of talking to a broker.

    The queue is a list of batches; each batch is a dict mapping a
    FakeKafkaPartition to the list of messages for that topic.
    """

    def __init__(self, queue):
        self._prefix = 'prefix'
        self.queue = queue

    def send(self, topic, key, value):
        # Serialize then deserialize, so the queued objects have gone
        # through exactly the same codecs as they would on a real broker.
        msg = FakeKafkaMessage(key=kafka_to_key(key_to_kafka(key)),
                               value=kafka_to_value(value_to_kafka(value)))
        partition = FakeKafkaPartition(topic)
        last_batch = self.queue[-1] if self.queue else None
        if last_batch is not None and set(last_batch) == {partition}:
            # Same object type as the previous message: group them.
            last_batch[partition].append(msg)
        else:
            self.queue.append({partition: [msg]})
class MockedKafkaConsumer:
    """Fake kafka consumer that hands out pre-filled message batches
    from an in-memory queue (the same list a MockedKafkaWriter fills).
    """

    def __init__(self, queue):
        self.committed = False
        self.queue = queue

    def poll(self):
        """Return and remove the oldest batch from the queue."""
        oldest = self.queue[0]
        del self.queue[0]
        return oldest

    def commit(self):
        """Record a successful commit once every batch was consumed."""
        if len(self.queue) == 0:
            self.committed = True
class MockedJournalClient(JournalClient):
    """JournalClient wired to a MockedKafkaConsumer instead of kafka."""

    def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
        # Deliberately bypass JournalClient.__init__: no broker to
        # connect to; only the consumer and the type filter are needed.
        self.consumer = MockedKafkaConsumer(queue)
        self._object_types = object_types
@given(lists(object_dicts(), min_size=1)) | @given(lists(object_dicts(), min_size=1)) | ||||
@settings(suppress_health_check=[HealthCheck.too_slow]) | @settings(suppress_health_check=[HealthCheck.too_slow]) | ||||
def test_write_replay_same_order_batches(objects): | def test_write_replay_same_order_batches(objects): | ||||
queue = [] | queue = [] | ||||
replayer = MockedJournalClient(queue) | replayer = MockedJournalClient(queue) | ||||
storage1 = Storage() | storage1 = Storage() | ||||
storage1.journal_writer = MockedKafkaWriter(queue) | storage1.journal_writer = MockedKafkaWriter(queue) | ||||
Not Done Inline Actionssame vlorentz: same | |||||
for (obj_type, obj) in objects: | for (obj_type, obj) in objects: | ||||
obj = obj.copy() | obj = obj.copy() | ||||
if obj_type == 'origin_visit': | if obj_type == 'origin_visit': | ||||
origin_id = storage1.origin_add_one(obj.pop('origin')) | origin_id = storage1.origin_add_one(obj.pop('origin')) | ||||
if 'visit' in obj: | if 'visit' in obj: | ||||
del obj['visit'] | del obj['visit'] | ||||
storage1.origin_visit_add(origin_id, **obj) | storage1.origin_visit_add(origin_id, **obj) | ||||
Show All 18 Lines | def test_write_replay_same_order_batches(objects): | ||||
for attr_name in ('_contents', '_directories', '_revisions', '_releases', | for attr_name in ('_contents', '_directories', '_revisions', '_releases', | ||||
'_snapshots', '_origin_visits', '_origins'): | '_snapshots', '_origin_visits', '_origins'): | ||||
assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \ | assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \ | ||||
attr_name | attr_name | ||||
# TODO: add test for hash collision | # TODO: add test for hash collision | ||||
@given(lists(object_dicts(), min_size=1))
@settings(suppress_health_check=[HealthCheck.too_slow])
def test_write_replay_content(objects):
    """Write content objects to a source storage, replay the journal
    messages into a second storage, and check both objstorages end up
    with the same state."""
    queue = []
    replayer = MockedJournalClient(queue)

    src_storage = Storage()
    src_storage.journal_writer = MockedKafkaWriter(queue)

    for obj_type, obj in objects:
        if obj_type != 'content':
            continue
        src_storage.content_add([obj.copy()])

    # Total number of messages the writer pushed onto the fake queue.
    nb_sent = sum(len(msgs)
                  for batch in queue
                  for msgs in batch.values())

    dst_storage = Storage()
    worker_fn = functools.partial(process_replay_objects_content,
                                  src=src_storage.objstorage,
                                  dst=dst_storage.objstorage)

    # Keep replaying until every sent message has been processed.
    nb_consumed = 0
    while nb_consumed < nb_sent:
        nb_consumed += replayer.process(worker_fn)

    assert src_storage.objstorage.state == dst_storage.objstorage.state
A writer is not a consumer. You would want to use MockConsumer instead.