swh/journal/tests/test_write_replay.py
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import namedtuple

 from hypothesis import given, settings, HealthCheck
-from hypothesis.strategies import lists, one_of, composite
+from hypothesis.strategies import lists

-from swh.model.hashutil import MultiHash
+from swh.model.hypothesis_strategies import object_dicts
 from swh.storage.in_memory import Storage
-from swh.storage.tests.algos.test_snapshot import snapshots, origins
-from swh.storage.tests.generate_data_test import gen_raw_content
+from swh.storage import HashCollision

 from swh.journal.serializers import (
     key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)
 from swh.journal.direct_writer import DirectKafkaWriter
 from swh.journal.replay import StorageReplayer, OBJECT_TYPES

 FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'topic key value')


 class MockedDirectKafkaWriter(DirectKafkaWriter):
     def __init__(self):
         self._prefix = 'prefix'


 class MockedStorageReplayer(StorageReplayer):
     def __init__(self, object_types=OBJECT_TYPES):
         self._object_types = object_types
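Both subclasses exist only to bypass the parent constructors, which presumably set up real Kafka connections; the test then swaps their `send`/`poll` for in-process stand-ins. A minimal sketch of the wiring, using only names defined in this file:

    writer = MockedDirectKafkaWriter()    # no Kafka producer is created
    writer.send = send                    # messages land in the local `queue`
    replayer = MockedStorageReplayer()    # likewise, no Kafka consumer
    replayer.poll = poll                  # replays straight from `queue`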

-@composite
-def contents(draw):
-    """Generate valid and consistent content.
-
-    Context: Test purposes
-
-    Args:
-        **draw**: Used by hypothesis to generate data
-
-    Returns:
-        dict representing a content.
-    """
-    raw_content = draw(gen_raw_content())
-    return {
-        'data': raw_content,
-        'length': len(raw_content),
-        'status': 'visible',
-        **MultiHash.from_data(raw_content).digest()
-    }
-
-
-objects = lists(one_of(
-    origins().map(lambda x: ('origin', x)),
-    snapshots().map(lambda x: ('snapshot', x)),
-    contents().map(lambda x: ('content', x)),
-))
-
-
-@given(objects)
+@given(lists(object_dicts()))
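The hand-rolled `contents()` strategy and the local `objects` list are replaced by `object_dicts()` from swh.model, which draws `(object_type, dict)` pairs for every supported object type rather than just origins, snapshots and contents. A rough illustration of the shape it produces (drawn values vary per run, and `.example()` is only for interactive exploration):

    from swh.model.hypothesis_strategies import object_dicts

    obj_type, obj = object_dicts().example()
    # obj_type is a type name such as 'content' or 'origin_visit';
    # obj is a plain dict representation of that object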
 @settings(suppress_health_check=[HealthCheck.too_slow])
 def test_write_replay_same_order(objects):
     queue = []

     def send(topic, key, value):
         key = kafka_to_key(key_to_kafka(key))
         value = kafka_to_value(value_to_kafka(value))
         queue.append(FakeKafkaMessage(topic=topic, key=key, value=value))

     def poll():
         yield from queue
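Note that `send` does not simply enqueue the objects: the key and value are pushed through the same serializers used on the wire, so every generated object must also survive a Kafka-style encode/decode. A minimal sketch of that round-trip on a hand-picked value:

    from swh.journal.serializers import value_to_kafka, kafka_to_value

    value = {'status': 'visible', 'length': 3}
    assert kafka_to_value(value_to_kafka(value)) == value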
     storage1 = Storage()
     storage1.journal_writer = MockedDirectKafkaWriter()
     storage1.journal_writer.send = send

     for (obj_type, obj) in objects:
-        method = getattr(storage1, obj_type + '_add')
-        method([obj])
+        obj = obj.copy()
+        if obj_type == 'origin_visit':
+            origin_id = storage1.origin_add_one(obj.pop('origin'))
+            if 'visit' in obj:
+                del obj['visit']
+            storage1.origin_visit_add(origin_id, **obj)
+        else:
+            method = getattr(storage1, obj_type + '_add')
+            try:
+                method([obj])
+            except HashCollision:
+                pass
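Two accommodations for the broader strategy appear in this hunk: `origin_visit` dicts embed their origin (and possibly a `visit` id), so the origin is inserted first and the remaining fields passed to `origin_visit_add`; and since independently drawn contents can share a hash, a `HashCollision` raised by the `_add` method is deliberately swallowed, which is presumably what the TODO at the end of the file is meant to cover properly.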
     storage2 = Storage()
     replayer = MockedStorageReplayer()
     replayer.poll = poll
     replayer.fill(storage2)

-    for attr in ('_contents', '_directories', '_revisions', '_releases',
-                 '_snapshots', '_origin_visits', '_origins'):
-        assert getattr(storage1, attr) == getattr(storage2, attr), attr
+    for attr_name in ('_contents', '_directories', '_revisions', '_releases',
+                      '_snapshots', '_origin_visits', '_origins'):
+        assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \
+            attr_name
+
+
+# TODO: add test for hash collision
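One possible shape for the test this TODO asks for, strictly as a sketch: `two_colliding_contents` is a hypothetical helper (it would have to construct two distinct content dicts sharing at least one hash), and the only behaviour relied on is the `HashCollision` already caught above.

    import pytest

    def test_write_replay_hash_collision():
        # `two_colliding_contents` is hypothetical: two distinct content
        # dicts that share at least one hash value.
        content_a, content_b = two_colliding_contents()
        storage = Storage()
        storage.content_add([content_a])
        with pytest.raises(HashCollision):
            storage.content_add([content_b])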