diff --git a/mypy.ini b/mypy.ini --- a/mypy.ini +++ b/mypy.ini @@ -8,6 +8,9 @@ [mypy-celery.*] ignore_missing_imports = True +[mypy-confluent_kafka.*] +ignore_missing_imports = True + [mypy-magic.*] ignore_missing_imports = True diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -3,4 +3,4 @@ swh.objstorage >= 0.0.43 swh.scheduler >= 0.0.47 swh.storage >= 0.0.189 -swh.journal >= 0.0.31 +swh.journal >= 0.1.0 diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,3 +1,4 @@ +confluent-kafka pytest pytest-postgresql hypothesis>=3.11.0 diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py --- a/swh/indexer/tests/test_cli.py +++ b/swh/indexer/tests/test_cli.py @@ -9,8 +9,9 @@ from unittest.mock import patch from click.testing import CliRunner +from confluent_kafka import Consumer, Producer -from swh.journal.tests.utils import FakeKafkaMessage, MockedKafkaConsumer +from swh.journal.serializers import value_to_kafka from swh.model.hashutil import hash_to_bytes from swh.indexer.cli import cli @@ -335,32 +336,40 @@ _assert_tasks_for_origins(tasks, [x * 2 for x in range(55)]) -def test_journal_client(storage, indexer_scheduler): +def test_journal_client( + storage, indexer_scheduler, kafka_prefix: str, kafka_server, consumer: Consumer +): """Test the 'swh indexer journal-client' cli tool.""" - message = FakeKafkaMessage( - "swh.journal.objects.origin_visit", - "bogus", - {"status": "full", "origin": {"url": "file://dev/0000",}}, + producer = Producer( + { + "bootstrap.servers": kafka_server, + "client.id": "test producer", + "acks": "all", + } ) - consumer = MockedKafkaConsumer([message]) - - with patch("swh.journal.client.Consumer", return_value=consumer): - result = invoke( - indexer_scheduler, - False, - [ - "journal-client", - "--stop-after-objects", - "1", - "--broker", - "192.0.2.1", - "--prefix", - "swh.journal.objects", - "--group-id", - "test-consumer", - ], - ) + STATUS = {"status": "full", "origin": {"url": "file://dev/0000",}} + producer.produce( + topic=kafka_prefix + ".origin_visit", + key=b"bogus", + value=value_to_kafka(STATUS), + ) + + result = invoke( + indexer_scheduler, + False, + [ + "journal-client", + "--stop-after-objects", + "1", + "--broker", + kafka_server, + "--prefix", + kafka_prefix, + "--group-id", + "test-consumer", + ], + ) # Check the output expected_output = "Done.\n"