diff --git a/swh/indexer/cli.py b/swh/indexer/cli.py
--- a/swh/indexer/cli.py
+++ b/swh/indexer/cli.py
@@ -1,9 +1,9 @@
-# Copyright (C) 2019-2020  The Software Heritage developers
+# Copyright (C) 2019-2022  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from typing import Iterator
+from typing import Callable, Dict, Iterator, List, Optional
 
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
@@ -213,6 +213,12 @@
 @indexer_cli_group.command("journal-client")
+@click.argument(
+    "indexer",
+    type=click.Choice(["origin-intrinsic-metadata"]),
+    required=False
+    # TODO: remove required=False once the deprecated no-argument form is removed
+)
 @click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API")
 @click.option(
     "--origin-metadata-task-type",
@@ -236,18 +242,21 @@
 @click.pass_context
 def journal_client(
     ctx,
-    scheduler_url,
-    origin_metadata_task_type,
-    brokers,
-    prefix,
-    group_id,
-    stop_after_objects,
+    indexer: Optional[str],
+    scheduler_url: str,
+    origin_metadata_task_type: str,
+    brokers: List[str],
+    prefix: str,
+    group_id: str,
+    stop_after_objects: Optional[int],
 ):
-    """Listens for new objects from the SWH Journal, and schedules tasks
-    to run relevant indexers (currently, only origin-intrinsic-metadata)
-    on these new objects."""
+    """Listens for new objects from the SWH Journal, and either schedules tasks
+    to run relevant indexers (currently, only origin-intrinsic-metadata) on them,
+    or, when an indexer name is given as argument, runs that indexer directly."""
     import functools
+    import warnings
 
+    from swh.indexer.indexer import ObjectsDict
     from swh.indexer.journal_client import process_journal_objects
     from swh.journal.client import get_journal_client
     from swh.scheduler import get_scheduler
@@ -268,22 +277,41 @@
     )
     stop_after_objects = stop_after_objects or journal_cfg.get("stop_after_objects")
 
+    worker_fn: Callable[[ObjectsDict], Dict]
+
+    if indexer is None:
+        warnings.warn(
+            "'swh indexer journal-client' with no argument creates scheduler tasks "
+            "to index, rather than indexing directly.",
+            DeprecationWarning,
+        )
+        object_types = ["origin_visit_status"]
+        worker_fn = functools.partial(
+            process_journal_objects,
+            scheduler=scheduler,
+            task_names={
+                "origin_metadata": origin_metadata_task_type,
+            },
+        )
+    elif indexer == "origin-intrinsic-metadata":
+        from swh.indexer.metadata import OriginMetadataIndexer
+
+        object_types = ["origin_visit_status"]
+        idx = OriginMetadataIndexer()
+        idx.catch_exceptions = False  # don't commit offsets if indexation fails
+        worker_fn = idx.process_journal_objects
+    else:
+        raise click.ClickException(f"Unknown indexer: {indexer}")
+
     client = get_journal_client(
         cls="kafka",
         brokers=brokers,
         prefix=prefix,
         group_id=group_id,
-        object_types=["origin_visit_status"],
+        object_types=object_types,
         stop_after_objects=stop_after_objects,
     )
 
-    worker_fn = functools.partial(
-        process_journal_objects,
-        scheduler=scheduler,
-        task_names={
-            "origin_metadata": origin_metadata_task_type,
-        },
-    )
     try:
         client.process(worker_fn)
     except KeyboardInterrupt:
diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -541,13 +541,25 @@
     def process_journal_objects(self, objects: ObjectsDict) -> Dict:
-        """Worker function for ``JournalClient``. Expects ``objects`` to have a single
-        key, either ``origin`` or ``"origin_visit_status"``."""
-        assert set(objects) == {"origin"}
+        """Worker function for ``JournalClient``. Expects ``objects`` to have one
+        or both of the keys ``origin`` and ``origin_visit_status``."""
+        # TODO: add support for subscribing to other topics? Currently, this is
+        # not implemented because no indexer would use it.
+        assert set(objects) <= {"origin", "origin_visit_status"}
 
-        origins = [Origin(url=origin["url"]) for origin in objects.get("origin", [])]
+        origins = [
+            Origin(url=status["origin"])
+            for status in objects.get("origin_visit_status", [])
+            if status["status"] == "full"
+        ] + [Origin(url=origin["url"]) for origin in objects.get("origin", [])]
 
         summary: Dict[str, Any] = {"status": "uneventful"}
         try:
-            results = self.index_list(origins)
+            results = self.index_list(
+                origins,
+                check_origin_known=False,
+                # no need to check they exist, as we just received either an origin
+                # or a visit status, which cannot be created by swh-storage unless
+                # the origin already exists
+            )
         except Exception:
             if not self.catch_exceptions:
                 raise
diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py
--- a/swh/indexer/metadata.py
+++ b/swh/indexer/metadata.py
@@ -325,19 +325,22 @@
         self.revision_metadata_indexer = RevisionMetadataIndexer(config=config)
 
     def index_list(
-        self, origins: List[Origin], **kwargs
+        self, origins: List[Origin], check_origin_known: bool = True, **kwargs
     ) -> List[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]]:
         head_rev_ids = []
         origins_with_head = []
 
         # Filter out origins not in the storage
-        known_origins = list(
-            call_with_batches(
-                self.storage.origin_get,
-                [origin.url for origin in origins],
-                ORIGIN_GET_BATCH_SIZE,
+        if check_origin_known:
+            known_origins = list(
+                call_with_batches(
+                    self.storage.origin_get,
+                    [origin.url for origin in origins],
+                    ORIGIN_GET_BATCH_SIZE,
+                )
             )
-        )
+        else:
+            known_origins = list(origins)
 
         for origin in known_origins:
             if origin is None:
diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py
--- a/swh/indexer/tests/test_cli.py
+++ b/swh/indexer/tests/test_cli.py
@@ -23,6 +23,8 @@
 from swh.model.hashutil import hash_to_bytes
 from swh.model.model import OriginVisitStatus
 
+from .utils import REVISION
+
 
 def fill_idx_storage(idx_storage: IndexerStorageInterface, nb_rows: int) -> List[int]:
     tools: List[Dict[str, Any]] = [
@@ -400,7 +402,7 @@
     return datetime.datetime.now(tz=datetime.timezone.utc)
 
 
-def test_cli_journal_client(
+def test_cli_journal_client_schedule(
     cli_runner,
     swh_config,
     indexer_scheduler,
@@ -523,3 +525,129 @@
         ],
         catch_exceptions=False,
     )
+
+
+def test_cli_journal_client_index(
+    cli_runner,
+    swh_config,
+    kafka_prefix: str,
+    kafka_server,
+    consumer: Consumer,
+    idx_storage,
+    storage,
+    mocker,
+    swh_indexer_config,
+):
+    """Test the 'swh indexer journal-client' cli tool in direct indexing mode."""
+    journal_writer = get_journal_writer(
+        "kafka",
+        brokers=[kafka_server],
+        prefix=kafka_prefix,
+        client_id="test producer",
+        value_sanitizer=lambda object_type, value: value,
+        flush_timeout=3,  # fail early if something goes wrong
+    )
+
+    visit_statuses = [
+        OriginVisitStatus(
+            origin="file:///dev/zero",
+            visit=1,
+            date=now(),
+            status="full",
+            snapshot=None,
+        ),
+        OriginVisitStatus(
+            origin="file:///dev/foobar",
+            visit=2,
+            date=now(),
+            status="full",
+            snapshot=None,
+        ),
+        OriginVisitStatus(
+            origin="file:///tmp/spamegg",
+            visit=3,
+            date=now(),
+            status="full",
+            snapshot=None,
+        ),
+        OriginVisitStatus(
+            origin="file:///dev/0002",
+            visit=6,
+            date=now(),
+            status="full",
+            snapshot=None,
+        ),
+        OriginVisitStatus(  # will be filtered out due to its 'partial' status
+            origin="file:///dev/0000",
+            visit=4,
+            date=now(),
+            status="partial",
+            snapshot=None,
+        ),
+        OriginVisitStatus(  # will be filtered out due to its 'ongoing' status
+            origin="file:///dev/0001",
+            visit=5,
+            date=now(),
+            status="ongoing",
+            snapshot=None,
+        ),
+    ]
+
+    journal_writer.write_additions("origin_visit_status", visit_statuses)
+    visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"]
+    storage.revision_add([REVISION])
+
+    mocker.patch(
+        "swh.indexer.origin_head.OriginHeadIndexer.index",
+        return_value=[{"revision_id": REVISION.id}],
+    )
+
+    mocker.patch(
+        "swh.indexer.metadata.RevisionMetadataIndexer.index",
+        return_value=[
+            RevisionIntrinsicMetadataRow(
+                id=REVISION.id,
+                indexer_configuration_id=1,
+                mappings=["cff"],
+                metadata={"foo": "bar"},
+            )
+        ],
+    )
+    result = cli_runner.invoke(
+        indexer_cli_group,
+        [
+            "-C",
+            swh_config,
+            "journal-client",
+            "origin-intrinsic-metadata",
+            "--broker",
+            kafka_server,
+            "--prefix",
+            kafka_prefix,
+            "--group-id",
+            "test-consumer",
+            "--stop-after-objects",
+            len(visit_statuses),
+        ],
+        catch_exceptions=False,
+    )
+
+    # Check the output
+    expected_output = "Done.\n"
+    assert result.exit_code == 0, result.output
+    assert result.output == expected_output
+
+    results = idx_storage.origin_intrinsic_metadata_get(
+        [status.origin for status in visit_statuses]
+    )
+    expected_results = [
+        OriginIntrinsicMetadataRow(
+            id=status.origin,
+            from_revision=REVISION.id,
+            tool={"id": 1, **swh_indexer_config["tools"]},
+            mappings=["cff"],
+            metadata={"foo": "bar"},
+        )
+        for status in sorted(visit_statuses_full, key=lambda r: r.origin)
+    ]
+    assert sorted(results, key=lambda r: r.id) == expected_results
diff --git a/swh/indexer/tests/test_indexer.py b/swh/indexer/tests/test_indexer.py
--- a/swh/indexer/tests/test_indexer.py
+++ b/swh/indexer/tests/test_indexer.py
@@ -93,22 +93,36 @@
 
     assert indexer.run([b"foo"]) == {"status": "failed"}
 
+    assert indexer.process_journal_objects({"revision": [REVISION.to_dict()]}) == {
+        "status": "failed"
+    }
+
     indexer.catch_exceptions = False
 
     with pytest.raises(_TestException):
         indexer.run([b"foo"])
 
+    with pytest.raises(_TestException):
+        indexer.process_journal_objects({"revision": [REVISION.to_dict()]})
+
 
 def test_origin_indexer_catch_exceptions():
     indexer = CrashingOriginIndexer(config=BASE_TEST_CONFIG)
 
     assert indexer.run(["http://example.org"]) == {"status": "failed"}
 
+    assert indexer.process_journal_objects(
+        {"origin": [{"url": "http://example.org"}]}
+    ) == {"status": "failed"}
+
     indexer.catch_exceptions = False
 
     with pytest.raises(_TestException):
         indexer.run(["http://example.org"])
 
+    with pytest.raises(_TestException):
+        indexer.process_journal_objects({"origin": [{"url": "http://example.org"}]})
+
 
 def test_content_partition_indexer_catch_exceptions():
     indexer = CrashingContentPartitionIndexer(
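
Note for reviewers: the dispatch added in swh/indexer/cli.py amounts to the wiring sketched below, equivalent to running `swh indexer journal-client origin-intrinsic-metadata`. This is a minimal sketch, not the patch itself: the broker address, topic prefix, consumer group id, and object count are illustrative placeholders, and a configured storage/indexer storage (normally supplied via the configuration file) is assumed.

    # Sketch of the direct-indexing path added by this patch (placeholder values).
    from swh.indexer.metadata import OriginMetadataIndexer
    from swh.journal.client import get_journal_client

    idx = OriginMetadataIndexer()
    idx.catch_exceptions = False  # let failures propagate so offsets are not committed

    client = get_journal_client(
        cls="kafka",
        brokers=["localhost:9092"],  # placeholder broker
        prefix="swh.journal.objects",  # placeholder topic prefix
        group_id="test-consumer",  # placeholder consumer group
        object_types=["origin_visit_status"],
        stop_after_objects=1000,  # placeholder; omit to run indefinitely
    )
    client.process(idx.process_journal_objects)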
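
Similarly, the reworked `process_journal_objects` in swh/indexer/indexer.py accepts batches shaped as below. This is a hedged sketch continuing from the one above: real journal messages carry more fields (visit, date, snapshot, ...), only the keys the new code reads are shown, and the URLs are invented. Only visit statuses with status "full" yield origins to index; "partial" and "ongoing" ones are skipped, as exercised by the new test.

    # Hedged sketch of a batch passed to process_journal_objects by the client.
    batch = {
        "origin_visit_status": [
            {"origin": "file:///dev/zero", "status": "full"},     # indexed
            {"origin": "file:///dev/0000", "status": "partial"},  # skipped
        ],
        "origin": [
            {"url": "https://example.org/repo.git"},  # indexed unconditionally
        ],
    }
    summary = idx.process_journal_objects(batch)  # e.g. {"status": "eventful", ...}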