diff --git a/mypy.ini b/mypy.ini index 070dde4..e0fc4ad 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,36 +1,39 @@ [mypy] namespace_packages = True warn_unused_ignores = True # 3rd party libraries without stubs (yet) [mypy-arrow.*] ignore_missing_imports = True [mypy-celery.*] ignore_missing_imports = True +[mypy-confluent_kafka.*] +ignore_missing_imports = True + [mypy-elasticsearch.*] ignore_missing_imports = True [mypy-humanize.*] ignore_missing_imports = True [mypy-kombu.*] ignore_missing_imports = True [mypy-pika.*] ignore_missing_imports = True [mypy-pkg_resources.*] ignore_missing_imports = True [mypy-psycopg2.*] ignore_missing_imports = True [mypy-pytest.*] ignore_missing_imports = True [mypy-pytest_postgresql.*] ignore_missing_imports = True diff --git a/requirements-swh.txt b/requirements-swh.txt index 39bac35..d73d93e 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,2 +1,3 @@ swh.core[db,http] >= 0.5 swh.storage >= 0.11.1 +swh.journal diff --git a/swh/scheduler/cli/journal.py b/swh/scheduler/cli/journal.py new file mode 100644 index 0000000..f3c562c --- /dev/null +++ b/swh/scheduler/cli/journal.py @@ -0,0 +1,57 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +import click + +from . import cli + + +@cli.command("journal-client") +@click.pass_context +@click.option( + "--stop-after-objects", + "-m", + default=None, + type=int, + help="Maximum number of objects to replay. Default is to run forever.", +) +def visit_stats_journal_client(ctx, stop_after_objects): + from functools import partial + + from swh.journal.client import get_journal_client + from swh.scheduler.journal_client import process_journal_objects + + if not ctx.obj["scheduler"]: + raise ValueError("Scheduler class (local/remote) must be instantiated") + + scheduler = ctx.obj["scheduler"] + config = ctx.obj["config"] + + if "journal" not in config: + raise ValueError("Missing 'journal' configuration key") + + journal_cfg = config["journal"] + journal_cfg["stop_after_objects"] = stop_after_objects or journal_cfg.get( + "stop_after_objects" + ) + + client = get_journal_client( + cls="kafka", + object_types=["origin_visit_status"], + prefix="swh.journal.objects", + **journal_cfg, + ) + worker_fn = partial(process_journal_objects, scheduler=scheduler,) + nb_messages = 0 + try: + nb_messages = client.process(worker_fn) + print(f"Processed {nb_messages} message(s).") + except KeyboardInterrupt: + ctx.exit(0) + else: + print("Done.") + finally: + client.close() diff --git a/swh/scheduler/tests/test_cli_journal.py b/swh/scheduler/tests/test_cli_journal.py new file mode 100644 index 0000000..ae00645 --- /dev/null +++ b/swh/scheduler/tests/test_cli_journal.py @@ -0,0 +1,114 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import os +from typing import Dict, List + +from click.testing import CliRunner, Result +from confluent_kafka import Producer +import pytest +import yaml + +from swh.journal.serializers import value_to_kafka +from swh.scheduler import get_scheduler +from swh.scheduler.cli import cli +from swh.scheduler.tests.test_journal_client import VISIT_STATUSES1 + + +@pytest.fixture +def swh_scheduler_cfg(postgresql_scheduler, kafka_server): + """Journal client configuration ready""" + return { + "scheduler": {"cls": "local", "db": postgresql_scheduler.dsn,}, + "journal": { + "brokers": [kafka_server], + "group_id": "test-consume-visit-status", + }, + } + + +def _write_configuration_path(config: Dict, tmp_path: str) -> str: + config_path = os.path.join(str(tmp_path), "scheduler.yml") + with open(config_path, "w") as f: + f.write(yaml.dump(config)) + return config_path + + +@pytest.fixture +def swh_scheduler_cfg_path(swh_scheduler_cfg, tmp_path): + """Write scheduler configuration in temporary path and returns such path""" + return _write_configuration_path(swh_scheduler_cfg, tmp_path) + + +def invoke(args: List[str], config_path: str, catch_exceptions: bool = False) -> Result: + """Invoke swh scheduler journal subcommands + + """ + runner = CliRunner() + result = runner.invoke(cli, ["-C" + config_path] + args) + if not catch_exceptions and result.exception: + print(result.output) + raise result.exception + return result + + +def test_cli_journal_client_origin_visit_status_misconfiguration_no_scheduler( + swh_scheduler_cfg, tmp_path +): + config = swh_scheduler_cfg.copy() + config["scheduler"] = {"cls": "foo"} + config_path = _write_configuration_path(config, tmp_path) + with pytest.raises(ValueError, match="must be instantiated"): + invoke( + ["journal-client", "--stop-after-objects", "1",], config_path, + ) + + +def test_cli_journal_client_origin_visit_status_misconfiguration_missing_journal_conf( + swh_scheduler_cfg, tmp_path +): + config = swh_scheduler_cfg.copy() + config.pop("journal", None) + config_path = _write_configuration_path(config, tmp_path) + + with pytest.raises(ValueError, match="Missing 'journal'"): + invoke( + ["journal-client", "--stop-after-objects", "1",], config_path, + ) + + +def test_cli_journal_client_origin_visit_status( + swh_scheduler_cfg, swh_scheduler_cfg_path, +): + kafka_server = swh_scheduler_cfg["journal"]["brokers"][0] + swh_scheduler = get_scheduler(**swh_scheduler_cfg["scheduler"]) + producer = Producer( + { + "bootstrap.servers": kafka_server, + "client.id": "test visit-stats producer", + "acks": "all", + } + ) + visit_status = VISIT_STATUSES1[0] + + value = value_to_kafka(visit_status) + topic = "swh.journal.objects.origin_visit_status" + producer.produce(topic=topic, key=b"bogus-origin", value=value) + producer.flush() + + result = invoke( + ["journal-client", "--stop-after-objects", "1",], swh_scheduler_cfg_path, + ) + + # Check the output + expected_output = "Processed 1 message(s).\nDone.\n" + assert result.exit_code == 0, result.output + assert result.output == expected_output + + actual_visit_stats = swh_scheduler.origin_visit_stats_get( + visit_status["origin"], visit_status["type"] + ) + + assert actual_visit_stats is not None