Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9337420
D5229.id18761.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
21 KB
Subscribers
None
D5229.id18761.diff
View Options
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -5,6 +5,9 @@
# 3rd party libraries without stubs (yet)
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
[mypy-pkg_resources.*]
ignore_missing_imports = True
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,2 +1,3 @@
# Add here internal Software Heritage dependencies, one per line.
swh.core[http] >= 0.3 # [http] is required by swh.core.pytest_plugin
+swh.journal
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1 +1,3 @@
pytest
+pytest-mock
+confluent-kafka
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,3 @@
# Add here external Python modules dependencies, one per line. Module names
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
-
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -52,10 +52,10 @@
use_scm_version=True,
extras_require={"testing": parse_requirements("test")},
include_package_data=True,
- # entry_points="""
- # [swh.cli.subcommands]
- # <cli-name>=swh.<module>.cli
- # """,
+ entry_points="""
+ [swh.cli.subcommands]
+ counters=swh.counters.cli
+ """,
classifiers=[
"Programming Language :: Python :: 3",
"Intended Audience :: Developers",
diff --git a/swh/__init__.py b/swh/__init__.py
--- a/swh/__init__.py
+++ b/swh/__init__.py
@@ -1,3 +1,8 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
from pkgutil import extend_path
from typing import Iterable
diff --git a/swh/counters/__init__.py b/swh/counters/__init__.py
--- a/swh/counters/__init__.py
+++ b/swh/counters/__init__.py
@@ -0,0 +1,44 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from __future__ import annotations
+
+import importlib
+from typing import TYPE_CHECKING, Any, Dict
+
+if TYPE_CHECKING:
+ from swh.counters.interface import CountersInterface
+
+COUNTERS_IMPLEMENTATIONS = {
+ "redis": ".redis.Redis",
+ "remote": ".api.client.RemoteCounters",
+}
+
+
+def get_counters(cls: str, **kwargs: Dict[str, Any]) -> CountersInterface:
+ """Get a counters object of class `cls` with arguments `kwargs`.
+
+ Args:
+ cls: counters' class, either 'redis' or 'remote'
+ kwargs: keyword arguments passed to the
+ counters class constructor
+
+ Returns:
+ an instance of swh.counters' classes (either redis or remote)
+
+ Raises:
+ ValueError if passed an unknown counters class.
+ """
+ class_path = COUNTERS_IMPLEMENTATIONS.get(cls)
+ if class_path is None:
+ raise ValueError(
+ "Unknown counters class `%s`. Supported: %s"
+ % (cls, ", ".join(COUNTERS_IMPLEMENTATIONS))
+ )
+
+ (module_path, class_name) = class_path.rsplit(".", 1)
+ module = importlib.import_module(module_path, package=__package__)
+ Counters = getattr(module, class_name)
+ return Counters(**kwargs)
diff --git a/swh/counters/__init__.py b/swh/counters/api/__init__.py
copy from swh/counters/__init__.py
copy to swh/counters/api/__init__.py
diff --git a/swh/counters/api/client.py b/swh/counters/api/client.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/api/client.py
@@ -0,0 +1,14 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.core.api import RPCClient
+
+from ..interface import CountersInterface
+
+
+class RemoteCounters(RPCClient):
+ """Proxy to a remote counters API"""
+
+ backend_class = CountersInterface
diff --git a/swh/counters/cli.py b/swh/counters/cli.py
--- a/swh/counters/cli.py
+++ b/swh/counters/cli.py
@@ -1,19 +1,86 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
import click
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
+from swh.counters.kafka_client import KeyOrientedJournalClient
-@swh_cli_group.group(name="foo", context_settings=CONTEXT_SETTINGS)
+@swh_cli_group.group(name="counters", context_settings=CONTEXT_SETTINGS)
+@click.option(
+ "--config-file",
+ "-C",
+ default=None,
+ type=click.Path(exists=True, dir_okay=False,),
+ help="Configuration file.",
+)
@click.pass_context
-def foo_cli_group(ctx):
- """Foo main command.
- """
+def counters_cli_group(ctx, config_file):
+ """Software Heritage Counters tools."""
+ from swh.core import config
+
+ ctx.ensure_object(dict)
+ conf = config.read(config_file)
+ ctx.obj["config"] = conf
-@foo_cli_group.command()
-@click.option("--bar", help="Something")
+@counters_cli_group.command("journal-client")
+@click.option(
+ "--stop-after-objects",
+ "-m",
+ default=None,
+ type=int,
+ help="Maximum number of objects to replay. Default is to run forever.",
+)
+@click.option(
+ "--object-type",
+ "-o",
+ multiple=True,
+ help="Default list of object types to subscribe to",
+)
+@click.option(
+ "--prefix", "-p", help="Topic prefix to use (e.g swh.journal.objects)",
+)
@click.pass_context
-def bar(ctx, bar):
- """Do something."""
- click.echo("bar")
+def journal_client(ctx, stop_after_objects, object_type, prefix):
+ """Listens for new messages from the SWH Journal, and counts them
+ """
+ import functools
+
+ from . import get_counters
+ from .journal_client import process_journal_messages
+
+ config = ctx.obj["config"]
+ journal_cfg = config["journal"]
+
+ journal_cfg["object_types"] = object_type or journal_cfg.get("object_types", [])
+ journal_cfg["prefix"] = prefix or journal_cfg.get("prefix")
+ journal_cfg["stop_after_objects"] = stop_after_objects or journal_cfg.get(
+ "stop_after_objects"
+ )
+
+ if len(journal_cfg["object_types"]) == 0:
+ raise ValueError("'object_types' must be specified by cli or configuration")
+
+ if journal_cfg["prefix"] is None:
+ raise ValueError("'prefix' must be specified by cli or configuration")
+
+ client = KeyOrientedJournalClient(**journal_cfg,)
+
+ counters = get_counters(**config["counters"])
+
+ worker_fn = functools.partial(process_journal_messages, counters=counters,)
+ nb_messages = 0
+ try:
+ nb_messages = client.process(worker_fn)
+ print("Processed %d messages." % nb_messages)
+ except KeyboardInterrupt:
+ ctx.exit(0)
+ else:
+ print("Done.")
+ finally:
+ client.close()
diff --git a/swh/counters/interface.py b/swh/counters/interface.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/interface.py
@@ -0,0 +1,28 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, Iterable
+
+from swh.core.api import remote_api_endpoint
+
+
+class CountersInterface:
+ @remote_api_endpoint("check")
+ def check(self):
+ """Dedicated method to execute some specific check per implementation.
+ """
+ ...
+
+ @remote_api_endpoint("add")
+ def add(self, collection: str, keys: Iterable[Any]) -> None:
+ """Add the provided keys to the collection
+ Only count new keys.
+ """
+ ...
+
+ @remote_api_endpoint("get_count")
+ def get_count(self, collection: str) -> int:
+ """Return the number of keys for the provided collection"""
+ ...
diff --git a/swh/counters/journal_client.py b/swh/counters/journal_client.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/journal_client.py
@@ -0,0 +1,11 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, Dict, Iterable
+
+
+def process_journal_messages(messages: Dict[str, Iterable[Any]], *, counters):
+ """Worker function for `JournalClient.process(worker_fn)`"""
+ pass
diff --git a/swh/counters/kafka_client.py b/swh/counters/kafka_client.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/kafka_client.py
@@ -0,0 +1,48 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from collections import defaultdict
+from typing import Any, Dict, List
+
+from confluent_kafka import KafkaError
+
+from swh.journal.client import JournalClient, _error_cb
+
+
+class KeyOrientedJournalClient(JournalClient):
+ """Journal Client implementation which only uses the message keys.
+ This does not need to bother with the message deserialization (contrary
+ to `swh.journal.client.JournalClient`)
+ """
+
+ def handle_messages(self, messages, worker_fn):
+ keys: Dict[str, List[Any]] = defaultdict(list)
+ nb_processed = 0
+
+ for message in messages:
+ error = message.error()
+ if error is not None:
+ if error.code() == KafkaError._PARTITION_EOF:
+ self.eof_reached.add((message.topic(), message.partition()))
+ else:
+ _error_cb(error)
+ continue
+ if message.value() is None:
+ # ignore message with no payload, these can be generated in tests
+ continue
+ nb_processed += 1
+ object_type = message.topic().split(".")[-1]
+ keys[object_type].append(message.key())
+
+ if keys:
+ worker_fn(dict(keys))
+ self.consumer.commit()
+
+ at_eof = self.stop_on_eof and all(
+ (tp.topic, tp.partition) in self.eof_reached
+ for tp in self.consumer.assignment()
+ )
+
+ return nb_processed, at_eof
diff --git a/swh/counters/redis.py b/swh/counters/redis.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/redis.py
@@ -0,0 +1,20 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, Iterable
+
+
+class Redis:
+ def __init__(self, host: str):
+ pass
+
+ def check(self):
+ pass
+
+ def add(self, collection: str, keys: Iterable[Any]) -> None:
+ pass
+
+ def get_count(self, collection: str) -> int:
+ pass
diff --git a/swh/counters/tests/conftest.py b/swh/counters/tests/conftest.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/tests/conftest.py
@@ -0,0 +1,26 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+
+import pytest
+
+logger = logging.getLogger(__name__)
+
+
+JOURNAL_OBJECTS_CONFIG_TEMPLATE = """
+journal:
+ brokers:
+ - {broker}
+ prefix: {prefix}
+ group_id: {group_id}
+"""
+
+
+@pytest.fixture
+def journal_config(kafka_server, kafka_prefix) -> str:
+ return JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+ broker=kafka_server, group_id="test-consumer", prefix=kafka_prefix
+ )
diff --git a/swh/counters/tests/test_cli.py b/swh/counters/tests/test_cli.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/tests/test_cli.py
@@ -0,0 +1,209 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import copy
+import tempfile
+
+from click.testing import CliRunner
+from confluent_kafka import Producer
+import pytest
+import yaml
+
+from swh.counters.cli import counters_cli_group
+from swh.journal.serializers import value_to_kafka
+
+CLI_CONFIG = """
+counters:
+ cls: redis
+ host:
+ - %(redis_host)s
+"""
+
+JOURNAL_OBJECTS_CONFIG_TEMPLATE = """
+journal:
+ brokers:
+ - {broker}
+ prefix: {prefix}
+ group_id: {group_id}
+"""
+
+
+def invoke(catch_exceptions, args, config="", *, redis_host):
+ runner = CliRunner()
+ with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
+ config_fd.write((CLI_CONFIG + config) % {"redis_host": redis_host})
+ config_fd.seek(0)
+ result = runner.invoke(counters_cli_group, ["-C" + config_fd.name] + args)
+ if not catch_exceptions and result.exception:
+ print(result.output)
+ raise result.exception
+ return result
+
+
+def test__journal_client__worker_function_invoked(
+ mocker, kafka_server, kafka_prefix, journal_config
+):
+ mock = mocker.patch("swh.counters.journal_client.process_journal_messages")
+ mock.return_value = 1
+
+ producer = Producer(
+ {
+ "bootstrap.servers": kafka_server,
+ "client.id": "test-producer",
+ "acks": "all",
+ }
+ )
+ topic = f"{kafka_prefix}.content"
+ value = value_to_kafka({"key": "value"})
+ producer.produce(topic=topic, key=b"message1", value=value)
+
+ invoke(
+ False,
+ # --object-type is required here: without it (and no config key) the cli raises
+ ["journal-client", "--stop-after-objects", "1", "--object-type", "content"],
+ journal_config,
+ redis_host="localhost",
+ )
+
+ assert mock.call_count == 1
+
+
+def test__journal_client__missing_main_journal_config_key():
+ """Missing configuration on journal should raise"""
+ with pytest.raises(KeyError, match="journal"):
+ invoke(
+ catch_exceptions=False,
+ args=["journal-client", "--stop-after-objects", "1",],
+ config="", # missing config will make it raise
+ redis_host=None,
+ )
+
+
+def test__journal_client__missing_journal_config_keys():
+ """Missing configuration on mandatory journal keys should raise"""
+ kafka_prefix = "swh.journal.objects"
+ journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+ broker="192.0.2.1", prefix=kafka_prefix, group_id="test-consumer"
+ )
+ journal_config = yaml.safe_load(journal_objects_config)
+
+ for key in journal_config["journal"].keys():
+ if key == "prefix": # optional
+ continue
+ cfg = copy.deepcopy(journal_config)
+ del cfg["journal"][key] # make config incomplete
+ yaml_cfg = yaml.dump(cfg)
+
+ with pytest.raises(TypeError, match=f"{key}"):
+ invoke(
+ catch_exceptions=False,
+ args=[
+ "journal-client",
+ "--stop-after-objects",
+ "1",
+ "--prefix",
+ kafka_prefix,
+ "--object-type",
+ "content",
+ ],
+ config=yaml_cfg, # incomplete config will make the cli raise
+ redis_host=None,
+ )
+
+
+def test__journal_client__missing_prefix_config_key(kafka_server):
+ """Missing configuration on mandatory prefix key should raise"""
+
+ journal_cfg_template = """
+journal:
+ brokers:
+ - {broker}
+ group_id: {group_id}
+ """
+
+ journal_cfg = journal_cfg_template.format(
+ broker=kafka_server, group_id="test-consumer"
+ )
+
+ with pytest.raises(ValueError, match="prefix"):
+ invoke(
+ False,
+ # Missing --prefix (and no config key) will make the cli raise
+ [
+ "journal-client",
+ "--stop-after-objects",
+ "1",
+ "--object-type",
+ "content",
+ ],
+ journal_cfg,
+ redis_host=None,
+ )
+
+
+def test__journal_client__missing_object_types_config_key(kafka_server):
+ """Missing configuration on mandatory object-types key should raise"""
+
+ journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+ broker=kafka_server, prefix="swh.journal.objects", group_id="test-consumer"
+ )
+
+ with pytest.raises(ValueError, match="object_types"):
+ invoke(
+ False,
+ # Missing --object-type (and no config key) will make the cli raise
+ ["journal-client", "--stop-after-objects", "1",],
+ journal_cfg,
+ redis_host=None,
+ )
+
+
+def test__journal_client__key_received(mocker, kafka_server):
+ mock = mocker.patch("swh.counters.journal_client.process_journal_messages")
+ mock.return_value = 1
+
+ prefix = "swh.journal.objects"
+ object_type = "content"
+ topic = prefix + "." + object_type
+
+ producer = Producer(
+ {"bootstrap.servers": kafka_server, "client.id": "testproducer", "acks": "all",}
+ )
+
+ value = value_to_kafka({"key": "value"})
+ key = b"message key"
+
+ # Ensure empty messages are ignored
+ producer.produce(topic=topic, key=b"emptymessage", value=None)
+ producer.produce(topic=topic, key=key, value=value)
+ producer.flush()
+
+ journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+ broker=kafka_server, prefix=prefix, group_id="test-consumer"
+ )
+
+ result = invoke(
+ False,
+ [
+ "journal-client",
+ "--stop-after-objects",
+ "1",
+ "--object-type",
+ object_type,
+ "--prefix",
+ prefix,
+ ],
+ journal_cfg,
+ redis_host=None,
+ )
+
+ # Check the output
+ expected_output = "Processed 1 messages.\nDone.\n"
+ assert result.exit_code == 0, result.output
+ assert result.output == expected_output
+ assert mock.call_args[0][0]["content"]
+ assert len(mock.call_args[0][0]) == 1
+ assert object_type in mock.call_args[0][0].keys()
+ assert key == mock.call_args[0][0][object_type][0]
diff --git a/swh/counters/tests/test_init.py b/swh/counters/tests/test_init.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/tests/test_init.py
@@ -0,0 +1,71 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import inspect
+
+import pytest
+
+from swh.counters import get_counters
+from swh.counters.api.client import RemoteCounters
+from swh.counters.interface import CountersInterface
+from swh.counters.redis import Redis
+
+COUNTERS_IMPLEMENTATIONS = [
+ ("remote", RemoteCounters, {"url": "localhost"}),
+ ("redis", Redis, {"host": "localhost"}),
+]
+
+
+def test_get_counters_failure():
+ with pytest.raises(ValueError, match="Unknown counters class"):
+ get_counters("unknown-counters")
+
+
+@pytest.mark.parametrize("class_,expected_class,kwargs", COUNTERS_IMPLEMENTATIONS)
+def test_get_counters(mocker, class_, expected_class, kwargs):
+ if kwargs:
+ concrete_counters = get_counters(class_, **kwargs)
+ else:
+ concrete_counters = get_counters(class_)
+ assert isinstance(concrete_counters, expected_class)
+
+
+@pytest.mark.parametrize("class_,expected_class,kwargs", COUNTERS_IMPLEMENTATIONS)
+def test_types(mocker, class_, expected_class, kwargs):
+ """Checks all methods of CountersInterface are implemented by this
+ backend, and that they have the same signature.
+
+ """
+ # mocker.patch("swh.counters.redis.Redis")
+ if kwargs:
+ concrete_counters = get_counters(class_, **kwargs)
+ else:
+ concrete_counters = get_counters(class_)
+
+ # Create an instance of the protocol (which cannot be instantiated
+ # directly, so this creates a subclass, then instantiates it)
+ interface = type("_", (CountersInterface,), {})()
+
+ # Accumulated over all methods, so it must be initialized before the loop
+ missing_methods = []
+
+ for meth_name in dir(interface):
+ if meth_name.startswith("_"):
+ continue
+ interface_meth = getattr(interface, meth_name)
+ try:
+ concrete_meth = getattr(concrete_counters, meth_name)
+ except AttributeError:
+ if not getattr(interface_meth, "deprecated_endpoint", False):
+ # The backend is missing a (non-deprecated) endpoint
+ missing_methods.append(meth_name)
+ continue
+
+ expected_signature = inspect.signature(interface_meth)
+ actual_signature = inspect.signature(concrete_meth)
+
+ assert expected_signature == actual_signature, meth_name
+
+ assert missing_methods == []
diff --git a/swh/counters/tests/test_nothing.py b/swh/counters/tests/test_nothing.py
deleted file mode 100644
--- a/swh/counters/tests/test_nothing.py
+++ /dev/null
@@ -1,3 +0,0 @@
-def test_nothing():
- # Placeholder; remove this when we add actual tests
- pass
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Jul 3 2025, 8:04 AM (10 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3220043
Attached To
D5229: swh-counters: Implement the cli skeleton
Event Timeline
Log In to Comment