Page MenuHomeSoftware Heritage

D4876.diff
No OneTemporary

D4876.diff

diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -11,6 +11,9 @@
[mypy-celery.*]
ignore_missing_imports = True
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
[mypy-elasticsearch.*]
ignore_missing_imports = True
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,2 +1,3 @@
swh.core[db,http] >= 0.5
swh.storage >= 0.11.1
+swh.journal
diff --git a/swh/scheduler/cli/journal.py b/swh/scheduler/cli/journal.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/cli/journal.py
@@ -0,0 +1,57 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+import click
+
+from . import cli
+
+
+@cli.command("journal-client")
+@click.pass_context
+@click.option(
+    "--stop-after-objects",
+    "-m",
+    default=None,
+    type=int,
+    help="Maximum number of objects to replay. Default is to run forever.",
+)
+def visit_stats_journal_client(ctx, stop_after_objects):
+    """Consume origin_visit_status objects from the journal.
+
+    Messages read from the journal are handed to process_journal_objects
+    together with the configured scheduler. The configuration file must
+    provide a 'journal' section holding the journal client settings.
+    """
+    from functools import partial
+
+    from swh.journal.client import get_journal_client
+    from swh.scheduler.journal_client import process_journal_objects
+
+    scheduler = ctx.obj["scheduler"]
+    if not scheduler:
+        raise ValueError("Scheduler class (local/remote) must be instantiated")
+
+    config = ctx.obj["config"]
+    if "journal" not in config:
+        raise ValueError("Missing 'journal' configuration key")
+
+    # Work on a copy: the configuration stored in ctx.obj is shared, so it
+    # must not be altered by the command-line override below.
+    journal_cfg = dict(config["journal"])
+    # The command-line value wins; otherwise fall back on the configuration.
+    journal_cfg["stop_after_objects"] = stop_after_objects or journal_cfg.get(
+        "stop_after_objects"
+    )
+
+    # These are only defaults. Passing them as explicit keyword arguments next
+    # to **journal_cfg would raise "TypeError: got multiple values for keyword
+    # argument" as soon as the configuration sets one of them.
+    journal_cfg.setdefault("cls", "kafka")
+    journal_cfg.setdefault("prefix", "swh.journal.objects")
+    # This command only processes visit statuses, so the object types are
+    # imposed regardless of what the configuration says.
+    journal_cfg["object_types"] = ["origin_visit_status"]
+
+    client = get_journal_client(**journal_cfg)
+    worker_fn = partial(process_journal_objects, scheduler=scheduler)
+    try:
+        nb_messages = client.process(worker_fn)
+        print(f"Processed {nb_messages} message(s).")
+    except KeyboardInterrupt:
+        # Interrupting the client by hand is a normal way to stop it.
+        ctx.exit(0)
+    else:
+        print("Done.")
+    finally:
+        # Always release the journal client, whichever way processing ended.
+        client.close()
diff --git a/swh/scheduler/tests/test_cli_journal.py b/swh/scheduler/tests/test_cli_journal.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_cli_journal.py
@@ -0,0 +1,114 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os
+from typing import Dict, List
+
+from click.testing import CliRunner, Result
+from confluent_kafka import Producer
+import pytest
+import yaml
+
+from swh.journal.serializers import value_to_kafka
+from swh.scheduler import get_scheduler
+from swh.scheduler.cli import cli
+from swh.scheduler.tests.test_journal_client import VISIT_STATUSES1
+
+
+@pytest.fixture
+def swh_scheduler_cfg(postgresql_scheduler, kafka_server):
+    """Configuration with a local scheduler section and a journal section.
+
+    The scheduler database comes from the ``postgresql_scheduler`` fixture and
+    the only journal broker is the one given by the ``kafka_server`` fixture.
+    """
+    scheduler_section = {"cls": "local", "db": postgresql_scheduler.dsn}
+    journal_section = {
+        "brokers": [kafka_server],
+        "group_id": "test-consume-visit-status",
+    }
+    return {"scheduler": scheduler_section, "journal": journal_section}
+
+
+def _write_configuration_path(config: Dict, tmp_path: str) -> str:
+    """Dump ``config`` as YAML into ``scheduler.yml`` under ``tmp_path``.
+
+    Returns the path of the file that was written.
+    """
+    destination = os.path.join(str(tmp_path), "scheduler.yml")
+    with open(destination, "w") as stream:
+        serialized = yaml.dump(config)
+        stream.write(serialized)
+    return destination
+
+
+@pytest.fixture
+def swh_scheduler_cfg_path(swh_scheduler_cfg, tmp_path):
+    """Path of a file under ``tmp_path`` holding ``swh_scheduler_cfg``."""
+    config_path = _write_configuration_path(swh_scheduler_cfg, tmp_path)
+    return config_path
+
+
+def invoke(args: List[str], config_path: str, catch_exceptions: bool = False) -> Result:
+    """Run the scheduler cli with ``args``, passing ``config_path`` through ``-C``.
+
+    Unless ``catch_exceptions`` is true, an exception recorded on the result is
+    re-raised after the captured output has been printed. Otherwise the result
+    is returned as is.
+    """
+    command_line = ["-C" + config_path, *args]
+    result = CliRunner().invoke(cli, command_line)
+    failure = result.exception
+    if failure and not catch_exceptions:
+        # Show what the command printed before surfacing the failure.
+        print(result.output)
+        raise failure
+    return result
+
+
+def test_cli_journal_client_origin_visit_status_misconfiguration_no_scheduler(
+    swh_scheduler_cfg, tmp_path
+):
+    """The journal client raises when the scheduler section uses cls "foo"."""
+    # Same configuration, with the scheduler section swapped for a bogus one.
+    config = {**swh_scheduler_cfg, "scheduler": {"cls": "foo"}}
+    config_path = _write_configuration_path(config, tmp_path)
+    cli_args = ["journal-client", "--stop-after-objects", "1"]
+    with pytest.raises(ValueError, match="must be instantiated"):
+        invoke(cli_args, config_path)
+
+
+def test_cli_journal_client_origin_visit_status_misconfiguration_missing_journal_conf(
+    swh_scheduler_cfg, tmp_path
+):
+    """The journal client raises when the configuration has no journal section."""
+    # Same configuration, minus the top-level "journal" entry.
+    config = {
+        section: content
+        for section, content in swh_scheduler_cfg.items()
+        if section != "journal"
+    }
+    config_path = _write_configuration_path(config, tmp_path)
+    cli_args = ["journal-client", "--stop-after-objects", "1"]
+
+    with pytest.raises(ValueError, match="Missing 'journal'"):
+        invoke(cli_args, config_path)
+
+
+def test_cli_journal_client_origin_visit_status(
+    swh_scheduler_cfg, swh_scheduler_cfg_path,
+):
+    """Publish one visit status, run the client, then query the visit stats."""
+    broker = swh_scheduler_cfg["journal"]["brokers"][0]
+    swh_scheduler = get_scheduler(**swh_scheduler_cfg["scheduler"])
+    producer_settings = {
+        "bootstrap.servers": broker,
+        "client.id": "test visit-stats producer",
+        "acks": "all",
+    }
+    producer = Producer(producer_settings)
+    visit_status = VISIT_STATUSES1[0]
+
+    # Publish a single message on the topic the journal client reads from.
+    producer.produce(
+        topic="swh.journal.objects.origin_visit_status",
+        key=b"bogus-origin",
+        value=value_to_kafka(visit_status),
+    )
+    producer.flush()
+
+    cli_args = ["journal-client", "--stop-after-objects", "1"]
+    result = invoke(cli_args, swh_scheduler_cfg_path)
+
+    # The command must succeed and report exactly one processed message.
+    assert result.exit_code == 0, result.output
+    assert result.output == "Processed 1 message(s).\nDone.\n"
+
+    # Visit stats must now exist for the origin and visit type just published.
+    origin, visit_type = visit_status["origin"], visit_status["type"]
+    actual_visit_stats = swh_scheduler.origin_visit_stats_get(origin, visit_type)
+
+    assert actual_visit_stats is not None

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 12:26 PM (2 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3217100

Event Timeline