Page Menu · Home · Software Heritage

D3144.id11166.diff
No One · Temporary

D3144.id11166.diff

diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -8,6 +8,9 @@
[mypy-celery.*]
ignore_missing_imports = True
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
[mypy-magic.*]
ignore_missing_imports = True
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -3,4 +3,4 @@
swh.objstorage >= 0.0.43
swh.scheduler >= 0.0.47
swh.storage >= 0.0.189
-swh.journal >= 0.0.31
+swh.journal >= 0.1.0
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,3 +1,4 @@
+confluent-kafka
pytest
pytest-postgresql
hypothesis>=3.11.0
diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py
--- a/swh/indexer/tests/test_cli.py
+++ b/swh/indexer/tests/test_cli.py
@@ -9,8 +9,9 @@
from unittest.mock import patch
from click.testing import CliRunner
+from confluent_kafka import Consumer, Producer
-from swh.journal.tests.utils import FakeKafkaMessage, MockedKafkaConsumer
+from swh.journal.serializers import value_to_kafka
from swh.model.hashutil import hash_to_bytes
from swh.indexer.cli import cli
@@ -335,32 +336,40 @@
_assert_tasks_for_origins(tasks, [x * 2 for x in range(55)])
-def test_journal_client(storage, indexer_scheduler):
+def test_journal_client(
+ storage, indexer_scheduler, kafka_prefix: str, kafka_server, consumer: Consumer
+):
"""Test the 'swh indexer journal-client' cli tool."""
- message = FakeKafkaMessage(
- "swh.journal.objects.origin_visit",
- "bogus",
- {"status": "full", "origin": {"url": "file://dev/0000",}},
+ producer = Producer(
+ {
+ "bootstrap.servers": kafka_server,
+ "client.id": "test producer",
+ "acks": "all",
+ }
)
- consumer = MockedKafkaConsumer([message])
-
- with patch("swh.journal.client.Consumer", return_value=consumer):
- result = invoke(
- indexer_scheduler,
- False,
- [
- "journal-client",
- "--stop-after-objects",
- "1",
- "--broker",
- "192.0.2.1",
- "--prefix",
- "swh.journal.objects",
- "--group-id",
- "test-consumer",
- ],
- )
+ STATUS = {"status": "full", "origin": {"url": "file://dev/0000",}}
+ producer.produce(
+ topic=kafka_prefix + ".origin_visit",
+ key=b"bogus",
+ value=value_to_kafka(STATUS),
+ )
+
+ result = invoke(
+ indexer_scheduler,
+ False,
+ [
+ "journal-client",
+ "--stop-after-objects",
+ "1",
+ "--broker",
+ kafka_server,
+ "--prefix",
+ kafka_prefix,
+ "--group-id",
+ "test-consumer",
+ ],
+ )
# Check the output
expected_output = "Done.\n"

File Metadata

Mime Type
text/plain
Expires
Dec 21 2024, 10:30 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3219232

Event Timeline