Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9342039
D4876.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Subscribers
None
D4876.diff
View Options
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -11,6 +11,9 @@
[mypy-celery.*]
ignore_missing_imports = True
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
[mypy-elasticsearch.*]
ignore_missing_imports = True
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,2 +1,3 @@
swh.core[db,http] >= 0.5
swh.storage >= 0.11.1
+swh.journal
diff --git a/swh/scheduler/cli/journal.py b/swh/scheduler/cli/journal.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/cli/journal.py
@@ -0,0 +1,69 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+import click
+
+from . import cli
+
+
+@cli.command("journal-client")
+@click.pass_context
+@click.option(
+    "--stop-after-objects",
+    "-m",
+    default=None,
+    type=int,
+    help="Maximum number of objects to replay. Default is to run forever.",
+)
+def visit_stats_journal_client(ctx, stop_after_objects):
+    """Replay origin_visit_status objects from the journal into the scheduler.
+
+    Messages from the ``swh.journal.objects.origin_visit_status`` topic are
+    passed to ``process_journal_objects`` to update the scheduler's origin
+    visit statistics. Kafka client settings are read from the ``journal``
+    key of the configuration file.
+    """
+    from functools import partial
+
+    from swh.journal.client import get_journal_client
+    from swh.scheduler.journal_client import process_journal_objects
+
+    scheduler = ctx.obj["scheduler"]
+    if not scheduler:
+        raise ValueError("Scheduler class (local/remote) must be instantiated")
+
+    config = ctx.obj["config"]
+    if "journal" not in config:
+        raise ValueError("Missing 'journal' configuration key")
+
+    # Copy so that the configuration shared through ctx.obj is not mutated.
+    journal_cfg = dict(config["journal"])
+    # A value given on the command line takes precedence over the config
+    # file; compare with None so that a falsy value such as 0 is not dropped.
+    if stop_after_objects is not None:
+        journal_cfg["stop_after_objects"] = stop_after_objects
+
+    # NOTE(review): a "cls", "object_types" or "prefix" entry in the journal
+    # configuration collides with the explicit arguments below (TypeError).
+    client = get_journal_client(
+        cls="kafka",
+        object_types=["origin_visit_status"],
+        prefix="swh.journal.objects",
+        **journal_cfg,
+    )
+    worker_fn = partial(process_journal_objects, scheduler=scheduler)
+    try:
+        nb_messages = client.process(worker_fn)
+        print(f"Processed {nb_messages} message(s).")
+    except KeyboardInterrupt:
+        # Ctrl-C is treated as a clean shutdown (exit status 0).
+        ctx.exit(0)
+    else:
+        print("Done.")
+    finally:
+        # Close the client whether processing completed, was interrupted
+        # or failed.
+        client.close()
diff --git a/swh/scheduler/tests/test_cli_journal.py b/swh/scheduler/tests/test_cli_journal.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_cli_journal.py
@@ -0,0 +1,123 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os
+from typing import Dict, List, Union
+
+from click.testing import CliRunner, Result
+from confluent_kafka import Producer
+import pytest
+import yaml
+
+from swh.journal.serializers import value_to_kafka
+from swh.scheduler import get_scheduler
+from swh.scheduler.cli import cli
+from swh.scheduler.tests.test_journal_client import VISIT_STATUSES1
+
+
+@pytest.fixture
+def swh_scheduler_cfg(postgresql_scheduler, kafka_server):
+    """Scheduler (local, test database) and journal (test kafka) configuration."""
+    return {
+        "scheduler": {"cls": "local", "db": postgresql_scheduler.dsn},
+        "journal": {
+            "brokers": [kafka_server],
+            "group_id": "test-consume-visit-status",
+        },
+    }
+
+
+def _write_configuration_path(
+    config: Dict, tmp_path: Union[str, "os.PathLike[str]"]
+) -> str:
+    """Dump ``config`` as YAML into ``<tmp_path>/scheduler.yml``; return that path.
+
+    ``tmp_path`` is typically pytest's ``tmp_path`` fixture (a ``pathlib.Path``),
+    hence the ``str`` conversion.
+    """
+    config_path = os.path.join(str(tmp_path), "scheduler.yml")
+    with open(config_path, "w") as f:
+        yaml.safe_dump(config, f)
+    return config_path
+
+
+@pytest.fixture
+def swh_scheduler_cfg_path(swh_scheduler_cfg, tmp_path):
+    """Write the scheduler configuration to a temporary file and return its path."""
+    return _write_configuration_path(swh_scheduler_cfg, tmp_path)
+
+
+def invoke(args: List[str], config_path: str, catch_exceptions: bool = False) -> Result:
+    """Run the ``swh scheduler`` cli on ``config_path`` with extra ``args``.
+
+    Unless ``catch_exceptions`` is True, an exception raised by the command is
+    re-raised (after printing the captured output) so tests can assert on it
+    with ``pytest.raises``.
+    """
+    runner = CliRunner()
+    result = runner.invoke(cli, ["-C" + config_path] + args)
+    if not catch_exceptions and result.exception:
+        print(result.output)
+        raise result.exception
+    return result
+
+
+def test_cli_journal_client_origin_visit_status_misconfiguration_no_scheduler(
+    swh_scheduler_cfg, tmp_path
+):
+    """The command refuses to run when no scheduler could be instantiated."""
+    config = swh_scheduler_cfg.copy()
+    config["scheduler"] = {"cls": "foo"}
+    config_path = _write_configuration_path(config, tmp_path)
+    with pytest.raises(ValueError, match="must be instantiated"):
+        invoke(["journal-client", "--stop-after-objects", "1"], config_path)
+
+
+def test_cli_journal_client_origin_visit_status_misconfiguration_missing_journal_conf(
+    swh_scheduler_cfg, tmp_path
+):
+    """The command refuses to run without a 'journal' configuration section."""
+    config = swh_scheduler_cfg.copy()
+    config.pop("journal", None)
+    config_path = _write_configuration_path(config, tmp_path)
+
+    with pytest.raises(ValueError, match="Missing 'journal'"):
+        invoke(["journal-client", "--stop-after-objects", "1"], config_path)
+
+
+def test_cli_journal_client_origin_visit_status(
+    swh_scheduler_cfg, swh_scheduler_cfg_path,
+):
+    """A visit status produced to kafka is consumed and recorded in visit stats."""
+    kafka_server = swh_scheduler_cfg["journal"]["brokers"][0]
+    swh_scheduler = get_scheduler(**swh_scheduler_cfg["scheduler"])
+    producer = Producer(
+        {
+            "bootstrap.servers": kafka_server,
+            "client.id": "test visit-stats producer",
+            "acks": "all",
+        }
+    )
+    visit_status = VISIT_STATUSES1[0]
+
+    value = value_to_kafka(visit_status)
+    topic = "swh.journal.objects.origin_visit_status"
+    producer.produce(topic=topic, key=b"bogus-origin", value=value)
+    producer.flush()
+
+    result = invoke(
+        ["journal-client", "--stop-after-objects", "1"], swh_scheduler_cfg_path
+    )
+
+    # Check the output
+    expected_output = "Processed 1 message(s).\nDone.\n"
+    assert result.exit_code == 0, result.output
+    assert result.output == expected_output
+
+    actual_visit_stats = swh_scheduler.origin_visit_stats_get(
+        visit_status["origin"], visit_status["type"]
+    )
+
+    assert actual_visit_stats is not None
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jul 3, 12:26 PM (2 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3217100
Attached To
D4876: scheduler.cli.journal: Add `swh scheduler journal visit-stats` cli
Event Timeline
Log In to Comment