Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_write_replay.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import functools | import functools | ||||
from unittest.mock import patch | from unittest.mock import patch | ||||
import attr | import attr | ||||
from hypothesis import given, settings, HealthCheck | from hypothesis import given, settings, HealthCheck | ||||
from hypothesis.strategies import lists | from hypothesis.strategies import lists | ||||
from swh.model.hypothesis_strategies import object_dicts, present_contents | from swh.model.hypothesis_strategies import object_dicts, present_contents | ||||
from swh.model.model import Origin | |||||
from swh.storage import get_storage, HashCollision | from swh.storage import get_storage, HashCollision | ||||
from swh.journal.replay import process_replay_objects | from swh.journal.replay import ( | ||||
from swh.journal.replay import process_replay_objects_content | process_replay_objects, process_replay_objects_content, object_converter_fn | ||||
) | |||||
from .utils import MockedJournalClient, MockedKafkaWriter | from .utils import MockedJournalClient, MockedKafkaWriter | ||||
storage_config = { | storage_config = { | ||||
'cls': 'pipeline', | 'cls': 'pipeline', | ||||
'steps': [ | 'steps': [ | ||||
{'cls': 'validate'}, | |||||
{'cls': 'memory', 'journal_writer': {'cls': 'memory'}}, | {'cls': 'memory', 'journal_writer': {'cls': 'memory'}}, | ||||
] | ] | ||||
} | } | ||||
def empty_person_name_email(rev_or_rel): | def empty_person_name_email(rev_or_rel): | ||||
"""Empties the 'name' and 'email' fields of the author/committer fields | """Empties the 'name' and 'email' fields of the author/committer fields | ||||
of a revision or release; leaving only the fullname.""" | of a revision or release; leaving only the fullname.""" | ||||
Show All 25 Lines | |||||
def test_write_replay_same_order_batches(objects): | def test_write_replay_same_order_batches(objects): | ||||
queue = [] | queue = [] | ||||
replayer = MockedJournalClient(queue) | replayer = MockedJournalClient(queue) | ||||
with patch('swh.journal.writer.inmemory.InMemoryJournalWriter', | with patch('swh.journal.writer.inmemory.InMemoryJournalWriter', | ||||
return_value=MockedKafkaWriter(queue)): | return_value=MockedKafkaWriter(queue)): | ||||
storage1 = get_storage(**storage_config) | storage1 = get_storage(**storage_config) | ||||
# Write objects to storage1 | |||||
for (obj_type, obj) in objects: | for (obj_type, obj) in objects: | ||||
obj = obj.copy() | obj = obj.copy() | ||||
if obj_type == 'origin_visit': | if obj_type == 'origin_visit': | ||||
storage1.origin_add_one({'url': obj['origin']}) | storage1.origin_add_one(Origin(url=obj['origin'])) | ||||
storage1.origin_visit_upsert([obj]) | storage1.origin_visit_upsert([obj]) | ||||
else: | else: | ||||
if obj_type == 'content' and obj.get('status') == 'absent': | if obj_type == 'content' and obj.get('status') == 'absent': | ||||
obj_type = 'skipped_content' | obj_type = 'skipped_content' | ||||
method = getattr(storage1, obj_type + '_add') | method = getattr(storage1, obj_type + '_add') | ||||
try: | try: | ||||
method([obj]) | method([object_converter_fn[obj_type](obj)]) | ||||
except HashCollision: | except HashCollision: | ||||
pass | pass | ||||
# Bail out early if we didn't insert any relevant objects... | # Bail out early if we didn't insert any relevant objects... | ||||
queue_size = len(queue) | queue_size = len(queue) | ||||
assert queue_size != 0, "No test objects found; hypothesis strategy bug?" | assert queue_size != 0, "No test objects found; hypothesis strategy bug?" | ||||
assert replayer.stop_after_objects is None | assert replayer.stop_after_objects is None | ||||
Show All 37 Lines | def test_write_replay_content(objects): | ||||
replayer = MockedJournalClient(queue) | replayer = MockedJournalClient(queue) | ||||
with patch('swh.journal.writer.inmemory.InMemoryJournalWriter', | with patch('swh.journal.writer.inmemory.InMemoryJournalWriter', | ||||
return_value=MockedKafkaWriter(queue)): | return_value=MockedKafkaWriter(queue)): | ||||
storage1 = get_storage(**storage_config) | storage1 = get_storage(**storage_config) | ||||
contents = [] | contents = [] | ||||
for obj in objects: | for obj in objects: | ||||
obj = obj.to_dict() | |||||
storage1.content_add([obj]) | storage1.content_add([obj]) | ||||
contents.append(obj) | contents.append(obj) | ||||
# Bail out early if we didn't insert any relevant objects... | # Bail out early if we didn't insert any relevant objects... | ||||
queue_size = len(queue) | queue_size = len(queue) | ||||
assert queue_size != 0, "No test objects found; hypothesis strategy bug?" | assert queue_size != 0, "No test objects found; hypothesis strategy bug?" | ||||
assert replayer.stop_after_objects is None | assert replayer.stop_after_objects is None | ||||
replayer.stop_after_objects = queue_size | replayer.stop_after_objects = queue_size | ||||
storage2 = get_storage(**storage_config) | storage2 = get_storage(**storage_config) | ||||
objstorage1 = storage1.objstorage.objstorage | objstorage1 = storage1.objstorage.objstorage | ||||
objstorage2 = storage2.objstorage.objstorage | objstorage2 = storage2.objstorage.objstorage | ||||
worker_fn = functools.partial(process_replay_objects_content, | worker_fn = functools.partial(process_replay_objects_content, | ||||
src=objstorage1, | src=objstorage1, | ||||
dst=objstorage2) | dst=objstorage2) | ||||
replayer.process(worker_fn) | replayer.process(worker_fn) | ||||
# only content with status visible will be copied in storage2 | # only content with status visible will be copied in storage2 | ||||
expected_objstorage_state = { | expected_objstorage_state = { | ||||
c['sha1']: c['data'] for c in contents if c['status'] == 'visible' | c.sha1: c.data for c in contents if c.status == 'visible' | ||||
} | } | ||||
assert expected_objstorage_state == objstorage2.state | assert expected_objstorage_state == objstorage2.state |