Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_write_replay.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import functools | import functools | ||||
from unittest.mock import patch | from unittest.mock import patch | ||||
import attr | import attr | ||||
from hypothesis import given, settings, HealthCheck | from hypothesis import given, settings, HealthCheck | ||||
from hypothesis.strategies import lists | from hypothesis.strategies import lists | ||||
from swh.model.hypothesis_strategies import object_dicts | from swh.model.hypothesis_strategies import object_dicts, present_contents | ||||
from swh.storage import get_storage, HashCollision | from swh.storage import get_storage, HashCollision | ||||
from swh.journal.replay import process_replay_objects | from swh.journal.replay import process_replay_objects | ||||
from swh.journal.replay import process_replay_objects_content | from swh.journal.replay import process_replay_objects_content | ||||
from .utils import MockedJournalClient, MockedKafkaWriter | from .utils import MockedJournalClient, MockedKafkaWriter | ||||
▲ Show 20 Lines • Show All 51 Lines • ▼ Show 20 Lines | for (obj_type, obj) in objects: | ||||
if obj_type == 'content' and obj.get('status') == 'absent': | if obj_type == 'content' and obj.get('status') == 'absent': | ||||
obj_type = 'skipped_content' | obj_type = 'skipped_content' | ||||
method = getattr(storage1, obj_type + '_add') | method = getattr(storage1, obj_type + '_add') | ||||
try: | try: | ||||
method([obj]) | method([obj]) | ||||
except HashCollision: | except HashCollision: | ||||
pass | pass | ||||
# Bail out early if we didn't insert any relevant objects... | |||||
queue_size = len(queue) | queue_size = len(queue) | ||||
assert queue_size != 0, "No test objects found; hypothesis strategy bug?" | |||||
assert replayer.max_messages is None | assert replayer.max_messages is None | ||||
replayer.max_messages = queue_size | replayer.max_messages = queue_size | ||||
storage2 = get_storage(**storage_config) | storage2 = get_storage(**storage_config) | ||||
worker_fn = functools.partial(process_replay_objects, storage=storage2) | worker_fn = functools.partial(process_replay_objects, storage=storage2) | ||||
nb_messages = 0 | nb_messages = 0 | ||||
while nb_messages < queue_size: | while nb_messages < queue_size: | ||||
nb_messages += replayer.process(worker_fn) | nb_messages += replayer.process(worker_fn) | ||||
Show All 18 Lines | for attr_name in ('_revisions', '_releases'): | ||||
items2 = {k: empty_person_name_email(v) | items2 = {k: empty_person_name_email(v) | ||||
for (k, v) in getattr(storage2, attr_name).items()} | for (k, v) in getattr(storage2, attr_name).items()} | ||||
assert items1 == items2, attr_name | assert items1 == items2, attr_name | ||||
# TODO: add test for hash collision | # TODO: add test for hash collision | ||||
@given(lists(object_dicts(), min_size=1)) | @given(lists(present_contents(), min_size=1)) | ||||
@settings(suppress_health_check=[HealthCheck.too_slow]) | @settings(suppress_health_check=[HealthCheck.too_slow]) | ||||
def test_write_replay_content(objects): | def test_write_replay_content(objects): | ||||
queue = [] | queue = [] | ||||
replayer = MockedJournalClient(queue) | replayer = MockedJournalClient(queue) | ||||
with patch('swh.journal.writer.inmemory.InMemoryJournalWriter', | with patch('swh.journal.writer.inmemory.InMemoryJournalWriter', | ||||
return_value=MockedKafkaWriter(queue)): | return_value=MockedKafkaWriter(queue)): | ||||
storage1 = get_storage(**storage_config) | storage1 = get_storage(**storage_config) | ||||
contents = [] | contents = [] | ||||
for (obj_type, obj) in objects: | for obj in objects: | ||||
obj = obj.copy() | obj = obj.to_dict() | ||||
if obj_type == 'content': | |||||
# avoid hash collision | |||||
if not storage1.content_find(obj): | |||||
if obj.get('status') != 'absent': | |||||
storage1.content_add([obj]) | storage1.content_add([obj]) | ||||
contents.append(obj) | contents.append(obj) | ||||
# Bail out early if we didn't insert any relevant objects... | |||||
queue_size = len(queue) | queue_size = len(queue) | ||||
assert queue_size != 0, "No test objects found; hypothesis strategy bug?" | |||||
assert replayer.max_messages is None | assert replayer.max_messages is None | ||||
replayer.max_messages = queue_size | replayer.max_messages = queue_size | ||||
storage2 = get_storage(**storage_config) | storage2 = get_storage(**storage_config) | ||||
objstorage1 = storage1.objstorage.objstorage | objstorage1 = storage1.objstorage.objstorage | ||||
objstorage2 = storage2.objstorage.objstorage | objstorage2 = storage2.objstorage.objstorage | ||||
Show All 13 Lines |