swh/journal/tests/test_write_replay.py
  ... 20 earlier lines not shown ...
 FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'key value')
 FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic')


 class MockedDirectKafkaWriter(DirectKafkaWriter):
     def __init__(self):
         self._prefix = 'prefix'
+        self.queue = []
+        self.committed = False
+
+    def send(self, topic, key, value):
+        key = kafka_to_key(key_to_kafka(key))
+        value = kafka_to_value(value_to_kafka(value))
+        partition = FakeKafkaPartition(topic)
+        msg = FakeKafkaMessage(key=key, value=value)
+        if self.queue and {partition} == set(self.queue[-1]):
+            # The last batch is for the same object type; group them.
+            self.queue[-1][partition].append(msg)
+        else:
+            self.queue.append({partition: [msg]})
+
+    def poll(self):
+        return self.queue.pop(0)
+
+    def commit(self):
+        if self.queue == []:
+            self.committed = True


 class MockedJournalClient(JournalClient):
     def __init__(self, object_types=ACCEPTED_OBJECT_TYPES):
         self._object_types = object_types
+        self.consumer = MockedDirectKafkaWriter()


-@given(lists(object_dicts(), min_size=1))
-@settings(suppress_health_check=[HealthCheck.too_slow])
-def test_write_replay_same_order(objects):
-    committed = False
-    queue = []
-
-    def send(topic, key, value):
-        key = kafka_to_key(key_to_kafka(key))
-        value = kafka_to_value(value_to_kafka(value))
-        queue.append({
-            FakeKafkaPartition(topic):
-            [FakeKafkaMessage(key=key, value=value)]
-        })
-
-    def poll():
-        return queue.pop(0)
-
-    def commit():
-        nonlocal committed
-        if queue == []:
-            committed = True
-
-    storage1 = Storage()
-    storage1.journal_writer = MockedDirectKafkaWriter()
-    storage1.journal_writer.send = send
-
-    for (obj_type, obj) in objects:
-        obj = obj.copy()
-        if obj_type == 'origin_visit':
-            origin_id = storage1.origin_add_one(obj.pop('origin'))
-            if 'visit' in obj:
-                del obj['visit']
-            storage1.origin_visit_add(origin_id, **obj)
-        else:
-            method = getattr(storage1, obj_type + '_add')
-            try:
-                method([obj])
-            except HashCollision:
-                pass
-
-    storage2 = Storage()
-    worker_fn = functools.partial(process_replay_objects, storage=storage2)
-    replayer = MockedJournalClient()
-    replayer.poll = poll
-    replayer.commit = commit
-
-    queue_size = len(queue)
-
-    nb_messages = 0
-    while nb_messages < queue_size:
-        nb_messages += replayer.process(worker_fn)
-    assert nb_messages == queue_size
-    assert committed
-
-    for attr_name in ('_contents', '_directories', '_revisions', '_releases',
-                      '_snapshots', '_origin_visits', '_origins'):
-        assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \
-            attr_name
-
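
The grouping logic in send() gives the queue the same shape a Kafka consumer's poll() returns, namely a mapping from partition to a list of messages. Below is a minimal standalone sketch of that batching behaviour; the topic names and keys are made up for illustration, and the serialization round-trip through key_to_kafka/value_to_kafka is omitted:

    from collections import namedtuple

    FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'key value')
    FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic')

    queue = []

    def send(topic, key, value):
        # Same grouping as MockedDirectKafkaWriter.send above.
        partition = FakeKafkaPartition(topic)
        msg = FakeKafkaMessage(key=key, value=value)
        if queue and {partition} == set(queue[-1]):
            # Last batch holds the same partition: append to it.
            queue[-1][partition].append(msg)
        else:
            # Partition (i.e. object type) changed: start a new batch.
            queue.append({partition: [msg]})

    # Hypothetical topics and keys, for illustration only.
    send('prefix.revision', b'rev1', {'id': b'rev1'})
    send('prefix.revision', b'rev2', {'id': b'rev2'})
    send('prefix.release', b'rel1', {'id': b'rel1'})

    assert len(queue) == 2  # [rev1, rev2] grouped together, then [rel1]
    assert [m.key for m in queue[0][FakeKafkaPartition('prefix.revision')]] \
        == [b'rev1', b'rev2']

Each poll() then hands the replayer exactly one such single-partition batch, which is what test_write_replay_same_order_batches below counts with queue_size.
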
 @given(lists(object_dicts(), min_size=1))
 @settings(suppress_health_check=[HealthCheck.too_slow])
 def test_write_replay_same_order_batches(objects):
-    committed = False
-    queue = []
-
-    def send(topic, key, value):
-        key = kafka_to_key(key_to_kafka(key))
-        value = kafka_to_value(value_to_kafka(value))
-        partition = FakeKafkaPartition(topic)
-        msg = FakeKafkaMessage(key=key, value=value)
-        if queue and {partition} == set(queue[-1]):
-            # The last message is of the same object type, groupping them
-            queue[-1][partition].append(msg)
-        else:
-            queue.append({
-                FakeKafkaPartition(topic): [msg]
-            })
-
-    def poll():
-        return queue.pop(0)
-
-    def commit():
-        nonlocal committed
-        if queue == []:
-            committed = True
+    replayer = MockedJournalClient()

     storage1 = Storage()
-    storage1.journal_writer = MockedDirectKafkaWriter()
-    storage1.journal_writer.send = send
+    storage1.journal_writer = replayer.consumer

     for (obj_type, obj) in objects:
         obj = obj.copy()
         if obj_type == 'origin_visit':
             origin_id = storage1.origin_add_one(obj.pop('origin'))
             if 'visit' in obj:
                 del obj['visit']
             storage1.origin_visit_add(origin_id, **obj)
         else:
             method = getattr(storage1, obj_type + '_add')
             try:
                 method([obj])
             except HashCollision:
                 pass

     queue_size = sum(len(partition)
-                     for batch in queue
+                     for batch in replayer.consumer.queue
                      for partition in batch.values())

     storage2 = Storage()
     worker_fn = functools.partial(process_replay_objects, storage=storage2)
-    replayer = MockedJournalClient()
-    replayer.poll = poll
-    replayer.commit = commit

     nb_messages = 0
     while nb_messages < queue_size:
         nb_messages += replayer.process(worker_fn)
-    assert committed
+    assert replayer.consumer.committed

     for attr_name in ('_contents', '_directories', '_revisions', '_releases',
                       '_snapshots', '_origin_visits', '_origins'):
         assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \
             attr_name

 # TODO: add test for hash collision
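
The mocks above also pin down the contract the test assumes for JournalClient.process(worker_fn): poll one batch from the consumer, hand the decoded objects to worker_fn grouped by object type, commit, and return the number of messages handled, so the while loop terminates once every batch has been replayed. A rough sketch of that contract, for illustration only (this is not swh.journal's actual implementation, and the topic-to-object-type mapping is an assumption):

    def process(self, worker_fn):
        # Approximation of the behaviour the mocks rely on.
        batch = self.consumer.poll()
        # Group message values by object type; the type is assumed here to
        # be the last dot-separated component of the topic name, e.g.
        # 'prefix.revision' -> 'revision'.
        objects = {
            partition.topic.split('.')[-1]: [msg.value for msg in messages]
            for partition, messages in batch.items()
        }
        worker_fn(objects)
        # MockedDirectKafkaWriter.commit() flips `committed` only once the
        # queue is fully drained, which is what the test asserts.
        self.consumer.commit()
        return sum(len(messages) for messages in batch.values())
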