Page MenuHomeSoftware Heritage

D5229.id18761.diff
No OneTemporary

D5229.id18761.diff

diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -5,6 +5,9 @@
# 3rd party libraries without stubs (yet)
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
[mypy-pkg_resources.*]
ignore_missing_imports = True
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,2 +1,3 @@
# Add here internal Software Heritage dependencies, one per line.
swh.core[http] >= 0.3 # [http] is required by swh.core.pytest_plugin
+swh.journal
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1 +1,3 @@
pytest
+pytest-mock
+confluent-kafka
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,3 @@
# Add here external Python modules dependencies, one per line. Module names
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
-
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -52,10 +52,10 @@
use_scm_version=True,
extras_require={"testing": parse_requirements("test")},
include_package_data=True,
- # entry_points="""
- # [swh.cli.subcommands]
- # <cli-name>=swh.<module>.cli
- # """,
+ entry_points="""
+ [swh.cli.subcommands]
+ counters=swh.counters.cli
+ """,
classifiers=[
"Programming Language :: Python :: 3",
"Intended Audience :: Developers",
diff --git a/swh/__init__.py b/swh/__init__.py
--- a/swh/__init__.py
+++ b/swh/__init__.py
@@ -1,3 +1,8 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
from pkgutil import extend_path
from typing import Iterable
diff --git a/swh/counters/__init__.py b/swh/counters/__init__.py
--- a/swh/counters/__init__.py
+++ b/swh/counters/__init__.py
@@ -0,0 +1,44 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from __future__ import annotations
+
+import importlib
+from typing import TYPE_CHECKING, Any, Dict
+
+if TYPE_CHECKING:
+ from swh.counters.interface import CountersInterface
+
+COUNTERS_IMPLEMENTATIONS = {
+ "redis": ".redis.Redis",
+ "remote": ".api.client.RemoteCounters",
+}
+
+
+def get_counters(cls: str, **kwargs: Dict[str, Any]) -> CountersInterface:
+ """Get a counters object of class `cls` with arguments `kwargs`.
+
+ Args:
+ cls: counters' class, either 'redis' or 'remote'
+ kwargs: keyword arguments passed to the
+ counters class constructor
+
+ Returns:
+ an instance of swh.counters's classes (either redis or remote)
+
+ Raises:
+ ValueError if passed an unknown counters class.
+ """
+ class_path = COUNTERS_IMPLEMENTATIONS.get(cls)
+ if class_path is None:
+ raise ValueError(
+ "Unknown counters class `%s`. Supported: %s"
+ % (cls, ", ".join(COUNTERS_IMPLEMENTATIONS))
+ )
+
+ (module_path, class_name) = class_path.rsplit(".", 1)
+ module = importlib.import_module(module_path, package=__package__)
+ Counters = getattr(module, class_name)
+ return Counters(**kwargs)
diff --git a/swh/counters/__init__.py b/swh/counters/api/__init__.py
copy from swh/counters/__init__.py
copy to swh/counters/api/__init__.py
diff --git a/swh/counters/api/client.py b/swh/counters/api/client.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/api/client.py
@@ -0,0 +1,14 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.core.api import RPCClient
+
+from ..interface import CountersInterface
+
+
+class RemoteCounters(RPCClient):
+ """Proxy to a remote counters API"""
+
+ backend_class = CountersInterface
diff --git a/swh/counters/cli.py b/swh/counters/cli.py
--- a/swh/counters/cli.py
+++ b/swh/counters/cli.py
@@ -1,19 +1,86 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
import click
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
+from swh.counters.kafka_client import KeyOrientedJournalClient
-@swh_cli_group.group(name="foo", context_settings=CONTEXT_SETTINGS)
+@swh_cli_group.group(name="counters", context_settings=CONTEXT_SETTINGS)
+@click.option(
+ "--config-file",
+ "-C",
+ default=None,
+ type=click.Path(exists=True, dir_okay=False,),
+ help="Configuration file.",
+)
@click.pass_context
-def foo_cli_group(ctx):
- """Foo main command.
- """
+def counters_cli_group(ctx, config_file):
+ """Software Heritage Counters tools."""
+ from swh.core import config
+
+ ctx.ensure_object(dict)
+ conf = config.read(config_file)
+ ctx.obj["config"] = conf
-@foo_cli_group.command()
-@click.option("--bar", help="Something")
+@counters_cli_group.command("journal-client")
+@click.option(
+ "--stop-after-objects",
+ "-m",
+ default=None,
+ type=int,
+ help="Maximum number of objects to replay. Default is to run forever.",
+)
+@click.option(
+ "--object-type",
+ "-o",
+ multiple=True,
+ help="Default list of object types to subscribe to",
+)
+@click.option(
+ "--prefix", "-p", help="Topic prefix to use (e.g swh.journal.objects)",
+)
@click.pass_context
-def bar(ctx, bar):
- """Do something."""
- click.echo("bar")
+def journal_client(ctx, stop_after_objects, object_type, prefix):
+ """Listens for new messages from the SWH Journal, and counts them
+ """
+ import functools
+
+ from . import get_counters
+ from .journal_client import process_journal_messages
+
+ config = ctx.obj["config"]
+ journal_cfg = config["journal"]
+
+ journal_cfg["object_types"] = object_type or journal_cfg.get("object_types", [])
+ journal_cfg["prefix"] = prefix or journal_cfg.get("prefix")
+ journal_cfg["stop_after_objects"] = stop_after_objects or journal_cfg.get(
+ "stop_after_objects"
+ )
+
+ if len(journal_cfg["object_types"]) == 0:
+ raise ValueError("'object_types' must be specified by cli or configuration")
+
+ if journal_cfg["prefix"] is None:
+ raise ValueError("'prefix' must be specified by cli or configuration")
+
+ client = KeyOrientedJournalClient(**journal_cfg,)
+
+ counters = get_counters(**config["counters"])
+
+ worker_fn = functools.partial(process_journal_messages, counters=counters,)
+ nb_messages = 0
+ try:
+ nb_messages = client.process(worker_fn)
+ print("Processed %d messages." % nb_messages)
+ except KeyboardInterrupt:
+ ctx.exit(0)
+ else:
+ print("Done.")
+ finally:
+ client.close()
diff --git a/swh/counters/interface.py b/swh/counters/interface.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/interface.py
@@ -0,0 +1,28 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, Iterable
+
+from swh.core.api import remote_api_endpoint
+
+
+class CountersInterface:
+ @remote_api_endpoint("check")
+ def check(self):
+ """Dedicated method to execute some specific check per implementation.
+ """
+ ...
+
+ @remote_api_endpoint("add")
+ def add(self, collection: str, keys: Iterable[Any]) -> None:
+ """Add the provided keys to the collection
+ Only count new keys.
+ """
+ ...
+
+ @remote_api_endpoint("get_count")
+ def get_count(self, collection: str) -> int:
+ """Return the number of keys for the provided collection"""
+ ...
diff --git a/swh/counters/journal_client.py b/swh/counters/journal_client.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/journal_client.py
@@ -0,0 +1,11 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, Dict, Iterable
+
+
+def process_journal_messages(messages: Dict[str, Iterable[Any]], *, counters):
+ """Worker function for `JournalClient.process(worker_fn)`"""
+ pass
diff --git a/swh/counters/kafka_client.py b/swh/counters/kafka_client.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/kafka_client.py
@@ -0,0 +1,48 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from collections import defaultdict
+from typing import Any, Dict, List
+
+from confluent_kafka import KafkaError
+
+from swh.journal.client import JournalClient, _error_cb
+
+
+class KeyOrientedJournalClient(JournalClient):
+ """Journal Client implementation which only uses the message keys.
+ This does not need to bother with the message deserialization (contrary
+ to `swh.journal.client.JournalClient`)
+ """
+
+ def handle_messages(self, messages, worker_fn):
+ keys: Dict[str, List[Any]] = defaultdict(list)
+ nb_processed = 0
+
+ for message in messages:
+ error = message.error()
+ if error is not None:
+ if error.code() == KafkaError._PARTITION_EOF:
+ self.eof_reached.add((message.topic(), message.partition()))
+ else:
+ _error_cb(error)
+ continue
+ if message.value() is None:
+ # ignore message with no payload, these can be generated in tests
+ continue
+ nb_processed += 1
+ object_type = message.topic().split(".")[-1]
+ keys[object_type].append(message.key())
+
+ if keys:
+ worker_fn(dict(keys))
+ self.consumer.commit()
+
+ at_eof = self.stop_on_eof and all(
+ (tp.topic, tp.partition) in self.eof_reached
+ for tp in self.consumer.assignment()
+ )
+
+ return nb_processed, at_eof
diff --git a/swh/counters/redis.py b/swh/counters/redis.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/redis.py
@@ -0,0 +1,20 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, Iterable
+
+
+class Redis:
+ def __init__(self, host: str):
+ pass
+
+ def check(self):
+ pass
+
+ def add(self, collection: str, keys: Iterable[Any]) -> None:
+ pass
+
+ def get_count(self, collection: str) -> int:
+ pass
diff --git a/swh/counters/tests/conftest.py b/swh/counters/tests/conftest.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/tests/conftest.py
@@ -0,0 +1,26 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+
+import pytest
+
+logger = logging.getLogger(__name__)
+
+
+JOURNAL_OBJECTS_CONFIG_TEMPLATE = """
+journal:
+ brokers:
+ - {broker}
+ prefix: {prefix}
+ group_id: {group_id}
+"""
+
+
+@pytest.fixture
+def journal_config(kafka_server, kafka_prefix) -> str:
+ return JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+ broker=kafka_server, group_id="test-consumer", prefix=kafka_prefix
+ )
diff --git a/swh/counters/tests/test_cli.py b/swh/counters/tests/test_cli.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/tests/test_cli.py
@@ -0,0 +1,209 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import copy
+import tempfile
+
+from click.testing import CliRunner
+from confluent_kafka import Producer
+import pytest
+import yaml
+
+from swh.counters.cli import counters_cli_group
+from swh.journal.serializers import value_to_kafka
+
+CLI_CONFIG = """
+counters:
+ cls: redis
+ host:
+ - %(redis_host)s
+"""
+
+JOURNAL_OBJECTS_CONFIG_TEMPLATE = """
+journal:
+ brokers:
+ - {broker}
+ prefix: {prefix}
+ group_id: {group_id}
+"""
+
+
+def invoke(catch_exceptions, args, config="", *, redis_host):
+ runner = CliRunner()
+ with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
+ config_fd.write((CLI_CONFIG + config) % {"redis_host": redis_host})
+ config_fd.seek(0)
+ result = runner.invoke(counters_cli_group, ["-C" + config_fd.name] + args)
+ if not catch_exceptions and result.exception:
+ print(result.output)
+ raise result.exception
+ return result
+
+
+def test__journal_client__worker_function_invoked(
+ mocker, kafka_server, kafka_prefix, journal_config
+):
+ mock = mocker.patch("swh.counters.journal_client.process_journal_messages")
+ mock.return_value = 1
+
+ producer = Producer(
+ {
+ "bootstrap.servers": kafka_server,
+ "client.id": "test-producer",
+ "acks": "all",
+ }
+ )
+ topic = f"{kafka_prefix}.content"
+ value = value_to_kafka({"key": "value"})
+ producer.produce(topic=topic, key=b"message1", value=value)
+
+ invoke(
+ False,
+ # --object-type is given on the cli, so the journal client is expected to run
+ ["journal-client", "--stop-after-objects", "1", "--object-type", "content"],
+ journal_config,
+ redis_host="localhost",
+ )
+
+ assert mock.call_count == 1
+
+
+def test__journal_client__missing_main_journal_config_key():
+ """Missing configuration on journal should raise"""
+ with pytest.raises(KeyError, match="journal"):
+ invoke(
+ catch_exceptions=False,
+ args=["journal-client", "--stop-after-objects", "1",],
+ config="", # missing config will make it raise
+ redis_host=None,
+ )
+
+
+def test__journal_client__missing_journal_config_keys():
+ """Missing configuration on mandatory journal keys should raise"""
+ kafka_prefix = "swh.journal.objects"
+ journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+ broker="192.0.2.1", prefix=kafka_prefix, group_id="test-consumer"
+ )
+ journal_config = yaml.safe_load(journal_objects_config)
+
+ for key in journal_config["journal"].keys():
+ if key == "prefix": # optional
+ continue
+ cfg = copy.deepcopy(journal_config)
+ del cfg["journal"][key] # make config incomplete
+ yaml_cfg = yaml.dump(cfg)
+
+ with pytest.raises(TypeError, match=f"{key}"):
+ invoke(
+ catch_exceptions=False,
+ args=[
+ "journal-client",
+ "--stop-after-objects",
+ "1",
+ "--prefix",
+ kafka_prefix,
+ "--object-type",
+ "content",
+ ],
+ config=yaml_cfg, # incomplete config will make the cli raise
+ redis_host=None,
+ )
+
+
+def test__journal_client__missing_prefix_config_key(kafka_server):
+ """Missing configuration on mandatory prefix key should raise"""
+
+ journal_cfg_template = """
+journal:
+ brokers:
+ - {broker}
+ group_id: {group_id}
+ """
+
+ journal_cfg = journal_cfg_template.format(
+ broker=kafka_server, group_id="test-consumer"
+ )
+
+ with pytest.raises(ValueError, match="prefix"):
+ invoke(
+ False,
+ # Missing --prefix (and no config key) will make the cli raise
+ [
+ "journal-client",
+ "--stop-after-objects",
+ "1",
+ "--object-type",
+ "content",
+ ],
+ journal_cfg,
+ redis_host=None,
+ )
+
+
+def test__journal_client__missing_object_types_config_key(kafka_server):
+ """Missing configuration on mandatory object-types key should raise"""
+
+ journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+ broker=kafka_server, prefix="swh.journal.objects", group_id="test-consumer"
+ )
+
+ with pytest.raises(ValueError, match="object_types"):
+ invoke(
+ False,
+ # Missing --object-type (and no object_types config key) will make the cli raise
+ ["journal-client", "--stop-after-objects", "1",],
+ journal_cfg,
+ redis_host=None,
+ )
+
+
+def test__journal_client__key_received(mocker, kafka_server):
+ mock = mocker.patch("swh.counters.journal_client.process_journal_messages")
+ mock.return_value = 1
+
+ prefix = "swh.journal.objects"
+ object_type = "content"
+ topic = prefix + "." + object_type
+
+ producer = Producer(
+ {"bootstrap.servers": kafka_server, "client.id": "testproducer", "acks": "all",}
+ )
+
+ value = value_to_kafka({"key": "value"})
+ key = b"message key"
+
+ # Ensure empty messages are ignored
+ producer.produce(topic=topic, key=b"emptymessage", value=None)
+ producer.produce(topic=topic, key=key, value=value)
+ producer.flush()
+
+ journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+ broker=kafka_server, prefix=prefix, group_id="test-consumer"
+ )
+
+ result = invoke(
+ False,
+ [
+ "journal-client",
+ "--stop-after-objects",
+ "1",
+ "--object-type",
+ object_type,
+ "--prefix",
+ prefix,
+ ],
+ journal_cfg,
+ redis_host=None,
+ )
+
+ # Check the output
+ expected_output = "Processed 1 messages.\nDone.\n"
+ assert result.exit_code == 0, result.output
+ assert result.output == expected_output
+ assert mock.call_args[0][0]["content"]
+ assert len(mock.call_args[0][0]) == 1
+ assert object_type in mock.call_args[0][0].keys()
+ assert key == mock.call_args[0][0][object_type][0]
diff --git a/swh/counters/tests/test_init.py b/swh/counters/tests/test_init.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/tests/test_init.py
@@ -0,0 +1,71 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import inspect
+
+import pytest
+
+from swh.counters import get_counters
+from swh.counters.api.client import RemoteCounters
+from swh.counters.interface import CountersInterface
+from swh.counters.redis import Redis
+
+COUNTERS_IMPLEMENTATIONS = [
+ ("remote", RemoteCounters, {"url": "localhost"}),
+ ("redis", Redis, {"host": "localhost"}),
+]
+
+
+def test_get_counters_failure():
+ with pytest.raises(ValueError, match="Unknown counters class"):
+ get_counters("unknown-counters")
+
+
+@pytest.mark.parametrize("class_,expected_class,kwargs", COUNTERS_IMPLEMENTATIONS)
+def test_get_counters(mocker, class_, expected_class, kwargs):
+ if kwargs:
+ concrete_counters = get_counters(class_, **kwargs)
+ else:
+ concrete_counters = get_counters(class_)
+ assert isinstance(concrete_counters, expected_class)
+
+
+@pytest.mark.parametrize("class_,expected_class,kwargs", COUNTERS_IMPLEMENTATIONS)
+def test_types(mocker, class_, expected_class, kwargs):
+ """Checks all methods of CountersInterface are implemented by this
+ backend, and that they have the same signature.
+
+ """
+ # mocker.patch("swh.counters.redis.Redis")
+ if kwargs:
+ concrete_counters = get_counters(class_, **kwargs)
+ else:
+ concrete_counters = get_counters(class_)
+
+ # Create an instance of the protocol (which cannot be instantiated
+ # directly, so this creates a subclass, then instantiates it)
+ interface = type("_", (CountersInterface,), {})()
+
+ for meth_name in dir(interface):
+ if meth_name.startswith("_"):
+ continue
+ interface_meth = getattr(interface, meth_name)
+
+ missing_methods = []
+
+ try:
+ concrete_meth = getattr(concrete_counters, meth_name)
+ except AttributeError:
+ if not getattr(interface_meth, "deprecated_endpoint", False):
+ # The backend is missing a (non-deprecated) endpoint
+ missing_methods.append(meth_name)
+ continue
+
+ expected_signature = inspect.signature(interface_meth)
+ actual_signature = inspect.signature(concrete_meth)
+
+ assert expected_signature == actual_signature, meth_name
+
+ assert missing_methods == []
diff --git a/swh/counters/tests/test_nothing.py b/swh/counters/tests/test_nothing.py
deleted file mode 100644
--- a/swh/counters/tests/test_nothing.py
+++ /dev/null
@@ -1,3 +0,0 @@
-def test_nothing():
- # Placeholder; remove this when we add actual tests
- pass

File Metadata

Mime Type
text/plain
Expires
Jul 3 2025, 8:04 AM (10 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3220043

Event Timeline