Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_write_replay.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import functools | import functools | ||||
from unittest.mock import patch | from unittest.mock import patch | ||||
import attr | import attr | ||||
from hypothesis import given, settings, HealthCheck | from hypothesis import given, settings, HealthCheck | ||||
from hypothesis.strategies import lists | from hypothesis.strategies import lists | ||||
from swh.model.hypothesis_strategies import object_dicts, present_contents | from swh.model.hypothesis_strategies import ( | ||||
object_dicts, present_contents | |||||
) | |||||
from swh.model.model import Origin | from swh.model.model import Origin | ||||
from swh.storage import get_storage, HashCollision | from swh.storage import get_storage, HashCollision | ||||
from swh.journal.replay import ( | from swh.journal.replay import ( | ||||
process_replay_objects, process_replay_objects_content, object_converter_fn | process_replay_objects, process_replay_objects_content, object_converter_fn | ||||
) | ) | ||||
from .utils import MockedJournalClient, MockedKafkaWriter | from .utils import MockedJournalClient, MockedKafkaWriter | ||||
storage_config = { | storage_config = { | ||||
'cls': 'pipeline', | 'cls': 'memory', | ||||
'steps': [ | 'journal_writer': {'cls': 'memory'}, | ||||
{'cls': 'memory', 'journal_writer': {'cls': 'memory'}}, | |||||
] | |||||
} | } | ||||
def empty_person_name_email(rev_or_rel): | def empty_person_name_email(rev_or_rel): | ||||
"""Empties the 'name' and 'email' fields of the author/committer fields | """Empties the 'name' and 'email' fields of the author/committer fields | ||||
of a revision or release; leaving only the fullname.""" | of a revision or release; leaving only the fullname.""" | ||||
if getattr(rev_or_rel, 'author', None): | if getattr(rev_or_rel, 'author', None): | ||||
rev_or_rel = attr.evolve( | rev_or_rel = attr.evolve( | ||||
Show All 13 Lines | if getattr(rev_or_rel, 'committer', None): | ||||
name=b'', | name=b'', | ||||
email=b'', | email=b'', | ||||
) | ) | ||||
) | ) | ||||
return rev_or_rel | return rev_or_rel | ||||
@given(lists(object_dicts(), min_size=1)) | @given(lists(object_dicts(), min_size=1)) | ||||
ardumont: It'd be great to be able to use the `objects()` generator here.
But we can't as it's generated… | |||||
@settings(suppress_health_check=[HealthCheck.too_slow]) | @settings(suppress_health_check=[HealthCheck.too_slow]) | ||||
def test_write_replay_same_order_batches(objects): | def test_write_replay_same_order_batches(objects): | ||||
queue = [] | queue = [] | ||||
replayer = MockedJournalClient(queue) | replayer = MockedJournalClient(queue) | ||||
with patch('swh.journal.writer.inmemory.InMemoryJournalWriter', | with patch('swh.journal.writer.inmemory.InMemoryJournalWriter', | ||||
return_value=MockedKafkaWriter(queue)): | return_value=MockedKafkaWriter(queue)): | ||||
storage1 = get_storage(**storage_config) | storage1 = get_storage(**storage_config) | ||||
# Write objects to storage1 | # Write objects to storage1 | ||||
for (obj_type, obj) in objects: | for (obj_type, obj) in objects: | ||||
obj = obj.copy() | if obj_type == 'content' and obj.get('status') == 'absent': | ||||
obj_type = 'skipped_content' | |||||
obj = object_converter_fn[obj_type](obj) | |||||
if obj_type == 'origin_visit': | if obj_type == 'origin_visit': | ||||
storage1.origin_add_one(Origin(url=obj['origin'])) | storage1.origin_add_one(Origin(url=obj.origin)) | ||||
storage1.origin_visit_upsert([obj]) | storage1.origin_visit_upsert([obj]) | ||||
else: | else: | ||||
if obj_type == 'content' and obj.get('status') == 'absent': | |||||
obj_type = 'skipped_content' | |||||
method = getattr(storage1, obj_type + '_add') | method = getattr(storage1, obj_type + '_add') | ||||
try: | try: | ||||
method([object_converter_fn[obj_type](obj)]) | method([obj]) | ||||
except HashCollision: | except HashCollision: | ||||
pass | pass | ||||
# Bail out early if we didn't insert any relevant objects... | # Bail out early if we didn't insert any relevant objects... | ||||
queue_size = len(queue) | queue_size = len(queue) | ||||
assert queue_size != 0, "No test objects found; hypothesis strategy bug?" | assert queue_size != 0, "No test objects found; hypothesis strategy bug?" | ||||
assert replayer.stop_after_objects is None | assert replayer.stop_after_objects is None | ||||
▲ Show 20 Lines • Show All 72 Lines • Show Last 20 Lines |
It'd be great to be able to use the objects() generator here.
But we can't, as it's generated without consistency with the other types yet (e.g. Origin and OriginVisit, for one).
So using it makes the test fail.