diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py
--- a/swh/indexer/tests/test_cli.py
+++ b/swh/indexer/tests/test_cli.py
@@ -3,13 +3,14 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import datetime
 from functools import reduce
 import re
 from typing import Any, Dict, List
 from unittest.mock import patch
 
 from click.testing import CliRunner
-from confluent_kafka import Consumer, Producer
+from confluent_kafka import Consumer
 import pytest
 
 from swh.indexer.cli import indexer_cli_group
@@ -18,8 +19,9 @@
     OriginIntrinsicMetadataRow,
     RevisionIntrinsicMetadataRow,
 )
-from swh.journal.serializers import value_to_kafka
+from swh.journal.writer import get_journal_writer
 from swh.model.hashutil import hash_to_bytes
+from swh.model.model import OriginVisitStatus
 
 
 def fill_idx_storage(idx_storage: IndexerStorageInterface, nb_rows: int) -> List[int]:
@@ -366,6 +368,10 @@
     _assert_tasks_for_origins(tasks, [x * 2 for x in range(55)])
 
 
+def now():
+    return datetime.datetime.now(tz=datetime.timezone.utc)
+
+
 def test_cli_journal_client(
     cli_runner,
     swh_config,
@@ -375,20 +381,62 @@
     consumer: Consumer,
 ):
     """Test the 'swh indexer journal-client' cli tool."""
-    producer = Producer(
-        {
-            "bootstrap.servers": kafka_server,
-            "client.id": "test producer",
-            "acks": "all",
-        }
+    journal_writer = get_journal_writer(
+        "kafka",
+        brokers=[kafka_server],
+        prefix=kafka_prefix,
+        client_id="test producer",
+        value_sanitizer=lambda object_type, value: value,
+        flush_timeout=3,  # fail early if something is going wrong
     )
 
-    STATUS = {"status": "full", "origin": {"url": "file://dev/0000",}}
-    producer.produce(
-        topic=f"{kafka_prefix}.origin_visit_status",
-        key=b"bogus",
-        value=value_to_kafka(STATUS),
-    )
+    visit_statuses = [
+        OriginVisitStatus(
+            origin="file:///dev/zero",
+            visit=1,
+            date=now(),
+            status="full",
+            snapshot=None,
+        ),
+        OriginVisitStatus(
+            origin="file:///dev/foobar",
+            visit=2,
+            date=now(),
+            status="full",
+            snapshot=None,
+        ),
+        OriginVisitStatus(
+            origin="file:///tmp/spamegg",
+            visit=3,
+            date=now(),
+            status="full",
+            snapshot=None,
+        ),
+        OriginVisitStatus(
+            origin="file:///dev/0002",
+            visit=6,
+            date=now(),
+            status="full",
+            snapshot=None,
+        ),
+        OriginVisitStatus(  # will be filtered out due to its 'partial' status
+            origin="file:///dev/0000",
+            visit=4,
+            date=now(),
+            status="partial",
+            snapshot=None,
+        ),
+        OriginVisitStatus(  # will be filtered out due to its 'ongoing' status
+            origin="file:///dev/0001",
+            visit=5,
+            date=now(),
+            status="ongoing",
+            snapshot=None,
+        ),
+    ]
+
+    journal_writer.write_additions("origin_visit_status", visit_statuses)
+    visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"]
 
     result = cli_runner.invoke(
         indexer_cli_group,
@@ -396,14 +444,16 @@
             "-C",
             swh_config,
             "journal-client",
-            "--stop-after-objects",
-            "1",
             "--broker",
             kafka_server,
             "--prefix",
             kafka_prefix,
             "--group-id",
             "test-consumer",
+            "--stop-after-objects",
+            len(visit_statuses),
+            "--origin-metadata-task-type",
+            "index-origin-metadata",
         ],
         catch_exceptions=False,
    )
@@ -414,9 +464,20 @@
     assert result.output == expected_output
 
     # Check scheduled tasks
-    tasks = indexer_scheduler.search_tasks()
-    assert len(tasks) == 1
-    _assert_tasks_for_origins(tasks, [0])
+    tasks = indexer_scheduler.search_tasks(task_type="index-origin-metadata")
+
+    # The origins may be scheduled in several batches, but there cannot be more
+    # tasks than "full" origin-visit-statuses written to the journal
+    assert len(tasks) <= len(visit_statuses_full)
+
+    actual_origins = []
+    for task in tasks:
+        actual_task = dict(task)
+        assert actual_task["type"] == "index-origin-metadata"
+        scheduled_origins = actual_task["arguments"]["args"][0]
+        actual_origins.extend(scheduled_origins)
+
+    assert set(actual_origins) == {vs.origin for vs in visit_statuses_full}
 
 
 def test_cli_journal_client_without_brokers(
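
For context, the journal writer introduced above serializes each OriginVisitStatus to a dict and publishes it on the f"{kafka_prefix}.origin_visit_status" topic, which is what the journal-client subcommand then consumes. The following read-back sketch is not part of the change; it assumes the same kafka_server / kafka_prefix / visit_statuses names from the test above, and "readback" is an arbitrary, hypothetical consumer group id. Only stock confluent_kafka and swh.journal.serializers calls are used:

    from confluent_kafka import Consumer

    from swh.journal.serializers import kafka_to_value

    # Sketch only: read back the origin_visit_status messages written by
    # journal_writer.write_additions() in the test above.
    consumer = Consumer(
        {
            "bootstrap.servers": kafka_server,
            "group.id": "readback",  # hypothetical group id
            "auto.offset.reset": "earliest",
        }
    )
    consumer.subscribe([f"{kafka_prefix}.origin_visit_status"])

    messages = []
    while len(messages) < len(visit_statuses):
        msg = consumer.poll(timeout=1.0)
        if msg is None or msg.error():
            continue  # no message yet, or a transient broker event
        messages.append(kafka_to_value(msg.value()))
    consumer.close()

    # The journal writer serialized each model object to a dict, so each
    # payload carries the same fields as the OriginVisitStatus instances.
    assert {m["origin"] for m in messages} == {vs.origin for vs in visit_statuses}

Decoding with kafka_to_value mirrors the deserialization the journal client itself performs before batching origins into scheduler tasks.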