diff --git a/swh/objstorage/replayer/tests/test_replay.py b/swh/objstorage/replayer/tests/test_replay.py
--- a/swh/objstorage/replayer/tests/test_replay.py
+++ b/swh/objstorage/replayer/tests/test_replay.py
@@ -5,16 +5,23 @@
 
 import functools
 
-from hypothesis import given, settings, HealthCheck
-from hypothesis.strategies import lists, sets
+from hypothesis import given, settings
+from hypothesis.strategies import sets
 
-from swh.model.hypothesis_strategies import present_contents, sha1
-from swh.objstorage import get_objstorage
+from swh.model.model import Content
+from swh.model.hypothesis_strategies import sha1
+from swh.objstorage.factory import get_objstorage
 from swh.objstorage.replayer.replay import (
     is_hash_in_bytearray,
     process_replay_objects_content,
 )
-from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter
+from swh.journal.writer.kafka import KafkaJournalWriter
+from swh.journal.client import JournalClient
+
+
+CONTENTS = [Content.from_data(f"foo{i}".encode()) for i in range(10)] + [
+    Content.from_data(f"forbidden foo{i}".encode(), status="hidden") for i in range(10)
+]
 
 
 @settings(max_examples=500)
@@ -30,39 +37,36 @@
         )
 
 
-@given(lists(present_contents(), min_size=1))
-@settings(suppress_health_check=[HealthCheck.too_slow])
-def test_replay_content(objects):
-
-    queue = []
-    replayer = MockedJournalClient(queue)
-    writer = MockedKafkaWriter(queue)
-
+def test_replay_content(kafka_server, kafka_prefix, kafka_consumer_group):
     objstorage1 = get_objstorage(cls="memory", args={})
     objstorage2 = get_objstorage(cls="memory", args={})
 
-    contents = []
-    for obj in objects:
-        objstorage1.add(obj.data)
-        contents.append(obj)
-        writer.write_addition("content", obj)
+    writer = KafkaJournalWriter(
+        brokers=[kafka_server],
+        client_id="kafka_writer",
+        prefix=kafka_prefix,
+        anonymize=False,
+    )
 
-    # Bail out early if we didn't insert any relevant objects...
-    queue_size = len(queue)
-    assert queue_size != 0, "No test objects found; hypothesis strategy bug?"
+    for content in CONTENTS:
+        objstorage1.add(content.data)
+        writer.write_addition("content", content)
 
-    assert replayer.stop_after_objects is None
-    replayer.stop_after_objects = queue_size
+    replayer = JournalClient(
+        brokers=kafka_server,
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_on_eof=True,
+        # stop_after_objects=len(objects),
+    )
 
     worker_fn = functools.partial(
         process_replay_objects_content, src=objstorage1, dst=objstorage2
     )
-
     replayer.process(worker_fn)
-
     # only content with status visible will be copied in storage2
     expected_objstorage_state = {
-        c.sha1: c.data for c in contents if c.status == "visible"
+        c.sha1: c.data for c in CONTENTS if c.status == "visible"
     }
 
     assert expected_objstorage_state == objstorage2.state