diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py
--- a/swh/indexer/tests/test_cli.py
+++ b/swh/indexer/tests/test_cli.py
@@ -3,6 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import datetime
 from functools import reduce
 import re
 from typing import Any, Dict, List
@@ -18,8 +19,9 @@
     OriginIntrinsicMetadataRow,
     RevisionIntrinsicMetadataRow,
 )
-from swh.journal.serializers import value_to_kafka
+from swh.journal.serializers import key_to_kafka, value_to_kafka
 from swh.model.hashutil import hash_to_bytes
+from swh.model.model import OriginVisitStatus
 
 
 def fill_idx_storage(idx_storage: IndexerStorageInterface, nb_rows: int) -> List[int]:
@@ -366,6 +368,10 @@
     _assert_tasks_for_origins(tasks, [x * 2 for x in range(55)])
 
 
+def now():
+    return datetime.datetime.now(tz=datetime.timezone.utc)
+
+
 def test_cli_journal_client(
     cli_runner,
     swh_config,
@@ -383,12 +389,61 @@
         }
     )
 
-    STATUS = {"status": "full", "origin": {"url": "file://dev/0000",}}
-    producer.produce(
-        topic=f"{kafka_prefix}.origin_visit_status",
-        key=b"bogus",
-        value=value_to_kafka(STATUS),
-    )
+    visit_statuses = [
+        OriginVisitStatus(
+            origin="file:///dev/zero",
+            visit=1,
+            date=now(),
+            status="full",
+            snapshot=None,
+        ),
+        OriginVisitStatus(
+            origin="file:///dev/foobar",
+            visit=2,
+            date=now(),
+            status="full",
+            snapshot=None,
+        ),
+        OriginVisitStatus(
+            origin="file:///tmp/spamegg",
+            visit=3,
+            date=now(),
+            status="full",
+            snapshot=None,
+        ),
+        OriginVisitStatus(  # will be filtered out due to its 'partial' status
+            origin="file:///dev/0000",
+            visit=4,
+            date=now(),
+            status="partial",
+            snapshot=None,
+        ),
+        OriginVisitStatus(  # will be filtered out due to its 'ongoing' status
+            origin="file:///dev/0001",
+            visit=5,
+            date=now(),
+            status="ongoing",
+            snapshot=None,
+        ),
+        OriginVisitStatus(
+            origin="file:///dev/0002",
+            visit=6,
+            date=now(),
+            status="full",
+            snapshot=None,
+        ),
+    ]
+
+    topic = f"{kafka_prefix}.origin_visit_status"
+    for visit_status in visit_statuses:
+        visit_status_d = visit_status.to_dict()
+        visit_status_d.pop("date")  # to avoid serialization issue about datetime
+
+        key = key_to_kafka(visit_status_d)
+        value = value_to_kafka(visit_status.to_dict())
+        # Producer.produce returns None (delivery is asynchronous); flush() below
+        # guarantees all messages are delivered before the CLI consumes them.
+        producer.produce(topic=topic, key=key, value=value)
+
+    producer.flush()
 
     result = cli_runner.invoke(
         indexer_cli_group,
@@ -396,14 +451,16 @@
         "-C",
         swh_config,
         "journal-client",
-        "--stop-after-objects",
-        "1",
         "--broker",
         kafka_server,
         "--prefix",
         kafka_prefix,
         "--group-id",
         "test-consumer",
+        "--stop-after-objects",
+        "3",
+        "--origin-metadata-task-type",
+        "index-origin-metadata",
     ],
     catch_exceptions=False,
 )
@@ -414,9 +471,11 @@
     assert result.output == expected_output
 
     # Check scheduled tasks
-    tasks = indexer_scheduler.search_tasks()
-    assert len(tasks) == 1
-    _assert_tasks_for_origins(tasks, [0])
+    tasks = indexer_scheduler.search_tasks(task_type="index-origin-metadata")
+    # more than 3 visit statuses with status "full" exist in the journal
+    assert len([vs for vs in visit_statuses if vs.status == "full"]) > 3
+    # but --stop-after-objects is 3, so exactly 3 tasks should be scheduled
+    assert len(tasks) == 3
 
 
 def test_cli_journal_client_without_brokers(