diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -16,7 +16,7 @@
 import pytest
 
 from swh.objstorage.backends.in_memory import InMemoryObjStorage
-from swh.storage.in_memory import InMemoryStorage
+from swh.storage import get_storage
 
 from swh.journal.cli import cli
 from swh.journal.serializers import key_to_kafka, value_to_kafka
@@ -42,9 +42,15 @@
 
 @pytest.fixture
 def storage():
-    """An instance of swh.storage.in_memory.InMemoryStorage that gets injected
-    into the CLI functions."""
-    storage = InMemoryStorage()
+    """An swh-storage object that gets injected into the CLI functions."""
+    storage_config = {
+        'cls': 'pipeline',
+        'steps': [
+            {'cls': 'validate'},
+            {'cls': 'memory'},
+        ]
+    }
+    storage = get_storage(**storage_config)
     with patch('swh.journal.cli.get_storage') as get_storage_mock:
         get_storage_mock.return_value = storage
         yield storage
@@ -64,7 +70,7 @@
 
 
 def test_replay(
-        storage: InMemoryStorage,
+        storage,
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int]):
     (_, port) = kafka_server
@@ -154,7 +160,7 @@
 @_patch_objstorages(['src', 'dst'])
 def test_replay_content(
         objstorages,
-        storage: InMemoryStorage,
+        storage,
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int]):
     (_, kafka_port) = kafka_server
@@ -182,7 +188,7 @@
 @_patch_objstorages(['src', 'dst'])
 def test_replay_content_exclude(
         objstorages,
-        storage: InMemoryStorage,
+        storage,
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int]):
     (_, kafka_port) = kafka_server
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -104,7 +104,8 @@
         consumer: Consumer):
     kafka_prefix += '.swh.journal.objects'
 
-    config = {
+    writer_config = {
+        'cls': 'kafka',
         'brokers': ['localhost:%d' % kafka_server[1]],
         'client_id': 'kafka_writer',
         'prefix': kafka_prefix,
@@ -112,10 +113,15 @@
             'message.max.bytes': 100000000,
         }
     }
+    storage_config = {
+        'cls': 'pipeline',
+        'steps': [
+            {'cls': 'validate'},
+            {'cls': 'memory', 'journal_writer': writer_config},
+        ]
+    }
 
-    storage = get_storage('memory', journal_writer={
-        'cls': 'kafka', **config,
-    })
+    storage = get_storage(**storage_config)
 
     expected_messages = 0
 
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -24,13 +24,22 @@
 from .utils import MockedJournalClient, MockedKafkaWriter
 
 
+storage_config = {
+    'cls': 'pipeline',
+    'steps': [
+        {'cls': 'validate'},
+        {'cls': 'memory'},
+    ]
+}
+
+
 def test_storage_play(
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int]):
     (_, port) = kafka_server
     kafka_prefix += '.swh.journal.objects'
 
-    storage = get_storage('memory')
+    storage = get_storage(**storage_config)
 
     producer = Producer({
         'bootstrap.servers': 'localhost:{}'.format(port),
@@ -140,7 +149,7 @@
     assert replayer.max_messages == 0
     replayer.max_messages = queue_size
 
-    storage = get_storage('memory')
+    storage = get_storage(**storage_config)
     worker_fn = functools.partial(process_replay_objects, storage=storage)
     nb_messages = 0
     while nb_messages < queue_size:
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -4,14 +4,14 @@
 # See top-level LICENSE file for more information
 
 import functools
+from unittest.mock import patch
 
 import attr
 from hypothesis import given, settings, HealthCheck
 from hypothesis.strategies import lists
 
 from swh.model.hypothesis_strategies import object_dicts
-from swh.storage.in_memory import InMemoryStorage
-from swh.storage import HashCollision
+from swh.storage import get_storage, HashCollision
 
 from swh.journal.replay import process_replay_objects
 from swh.journal.replay import process_replay_objects_content
@@ -19,6 +19,15 @@
 from .utils import MockedJournalClient, MockedKafkaWriter
 
 
+storage_config = {
+    'cls': 'pipeline',
+    'steps': [
+        {'cls': 'validate'},
+        {'cls': 'memory', 'journal_writer': {'cls': 'memory'}},
+    ]
+}
+
+
 def empty_person_name_email(rev_or_rel):
     """Empties the 'name' and 'email' fields of the author/committer fields
     of a revision or release; leaving only the fullname."""
@@ -51,8 +60,9 @@
     queue = []
     replayer = MockedJournalClient(queue)
 
-    storage1 = InMemoryStorage()
-    storage1.journal_writer = MockedKafkaWriter(queue)
+    with patch('swh.journal.writer.inmemory.InMemoryJournalWriter',
+               return_value=MockedKafkaWriter(queue)):
+        storage1 = get_storage(**storage_config)
 
     for (obj_type, obj) in objects:
         obj = obj.copy()
@@ -72,7 +82,7 @@
     assert replayer.max_messages == 0
     replayer.max_messages = queue_size
 
-    storage2 = InMemoryStorage()
+    storage2 = get_storage(**storage_config)
     worker_fn = functools.partial(process_replay_objects, storage=storage2)
     nb_messages = 0
     while nb_messages < queue_size:
@@ -110,8 +120,9 @@
     queue = []
     replayer = MockedJournalClient(queue)
 
-    storage1 = InMemoryStorage()
-    storage1.journal_writer = MockedKafkaWriter(queue)
+    with patch('swh.journal.writer.inmemory.InMemoryJournalWriter',
+               return_value=MockedKafkaWriter(queue)):
+        storage1 = get_storage(**storage_config)
 
     contents = []
     for (obj_type, obj) in objects:
@@ -127,7 +138,7 @@
     assert replayer.max_messages == 0
     replayer.max_messages = queue_size
 
-    storage2 = InMemoryStorage()
+    storage2 = get_storage(**storage_config)
     worker_fn = functools.partial(process_replay_objects_content,
                                   src=storage1.objstorage,
                                   dst=storage2.objstorage)
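
For reference, the shared configuration these tests switch to builds a storage through swh.storage.get_storage with a 'pipeline' of storage classes (a 'validate' proxy stacked in front of the 'memory' backend) instead of instantiating InMemoryStorage directly. The following is a minimal standalone sketch of that pattern, assuming only that swh.storage is installed and registers the 'pipeline', 'validate' and 'memory' classes used in the diff; it is illustrative, not part of the patch.

# Sketch of the get_storage('pipeline', ...) pattern used by the tests above.
from swh.storage import get_storage

storage_config = {
    'cls': 'pipeline',
    'steps': [
        {'cls': 'validate'},  # validation proxy applied before the backend
        {'cls': 'memory'},    # in-memory backend, replacing InMemoryStorage()
    ]
}

# Expands to get_storage(cls='pipeline', steps=[...]), as in the patch;
# the result behaves like any other swh-storage instance.
storage = get_storage(**storage_config)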