diff --git a/mypy.ini b/mypy.ini --- a/mypy.ini +++ b/mypy.ini @@ -5,6 +5,9 @@ # 3rd party libraries without stubs (yet) +[mypy-confluent_kafka.*] +ignore_missing_imports = True + [mypy-pkg_resources.*] ignore_missing_imports = True diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,2 +1,3 @@ # Add here internal Software Heritage dependencies, one per line. swh.core[http] >= 0.3 # [http] is required by swh.core.pytest_plugin +swh.journal diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1 +1,3 @@ pytest +pytest-mock +confluent-kafka diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html - diff --git a/setup.py b/setup.py --- a/setup.py +++ b/setup.py @@ -52,10 +52,10 @@ use_scm_version=True, extras_require={"testing": parse_requirements("test")}, include_package_data=True, - # entry_points=""" - # [swh.cli.subcommands] - # =swh..cli - # """, + entry_points=""" + [swh.cli.subcommands] + counters=swh.counters.cli + """, classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", diff --git a/swh/__init__.py b/swh/__init__.py --- a/swh/__init__.py +++ b/swh/__init__.py @@ -1,3 +1,8 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + from pkgutil import extend_path from typing import Iterable diff --git a/swh/counters/__init__.py b/swh/counters/__init__.py --- a/swh/counters/__init__.py +++ b/swh/counters/__init__.py @@ -0,0 +1,44 @@ 
+# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from __future__ import annotations + +import importlib +from typing import TYPE_CHECKING, Any, Dict + +if TYPE_CHECKING: + from swh.counters.interface import CountersInterface + +COUNTERS_IMPLEMENTATIONS = { + "redis": ".redis.Redis", + "remote": ".api.client.RemoteCounters", +} + + +def get_counters(cls: str, **kwargs: Dict[str, Any]) -> CountersInterface: + """Get a counters object of class `cls` with arguments `kwargs`. + + Args: + cls: counters class, either 'redis' or 'remote' + kwargs: keyword arguments passed to the + counters class constructor + + Returns: + an instance of one of swh.counters' classes (either redis or remote) + + Raises: + ValueError if passed an unknown counters class. + """ + class_path = COUNTERS_IMPLEMENTATIONS.get(cls) + if class_path is None: + raise ValueError( + "Unknown counters class `%s`. 
Supported: %s" + % (cls, ", ".join(COUNTERS_IMPLEMENTATIONS)) + ) + + (module_path, class_name) = class_path.rsplit(".", 1) + module = importlib.import_module(module_path, package=__package__) + Counters = getattr(module, class_name) + return Counters(**kwargs) diff --git a/swh/counters/__init__.py b/swh/counters/api/__init__.py copy from swh/counters/__init__.py copy to swh/counters/api/__init__.py diff --git a/swh/counters/api/client.py b/swh/counters/api/client.py new file mode 100644 --- /dev/null +++ b/swh/counters/api/client.py @@ -0,0 +1,14 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from swh.core.api import RPCClient + +from ..interface import CountersInterface + + +class RemoteCounters(RPCClient): + """Proxy to a remote counters API""" + + backend_class = CountersInterface diff --git a/swh/counters/cli.py b/swh/counters/cli.py --- a/swh/counters/cli.py +++ b/swh/counters/cli.py @@ -1,19 +1,86 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + import click from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group +from swh.counters.kafka_client import KeyOrientedJournalClient -@swh_cli_group.group(name="foo", context_settings=CONTEXT_SETTINGS) +@swh_cli_group.group(name="counters", context_settings=CONTEXT_SETTINGS) +@click.option( + "--config-file", + "-C", + default=None, + type=click.Path(exists=True, dir_okay=False,), + help="Configuration file.", +) @click.pass_context -def foo_cli_group(ctx): - """Foo main command. 
- """ +def counters_cli_group(ctx, config_file): + """Software Heritage Counters tools.""" + from swh.core import config + + ctx.ensure_object(dict) + conf = config.read(config_file) + ctx.obj["config"] = conf -@foo_cli_group.command() -@click.option("--bar", help="Something") +@counters_cli_group.command("journal-client") +@click.option( + "--stop-after-objects", + "-m", + default=None, + type=int, + help="Maximum number of objects to replay. Default is to run forever.", +) +@click.option( + "--object-type", + "-o", + multiple=True, + help="Default list of object types to subscribe to", +) +@click.option( + "--prefix", "-p", help="Topic prefix to use (e.g swh.journal.objects)", +) @click.pass_context -def bar(ctx, bar): - """Do something.""" - click.echo("bar") +def journal_client(ctx, stop_after_objects, object_type, prefix): + """Listens for new messages from the SWH Journal, and counts them + """ + import functools + + from . import get_counters + from .journal_client import process_journal_messages + + config = ctx.obj["config"] + journal_cfg = config["journal"] + + journal_cfg["object_types"] = object_type or journal_cfg.get("object_types", []) + journal_cfg["prefix"] = prefix or journal_cfg.get("prefix") + journal_cfg["stop_after_objects"] = stop_after_objects or journal_cfg.get( + "stop_after_objects" + ) + + if len(journal_cfg["object_types"]) == 0: + raise ValueError("'object_types' must be specified by cli or configuration") + + if journal_cfg["prefix"] is None: + raise ValueError("'prefix' must be specified by cli or configuration") + + client = KeyOrientedJournalClient(**journal_cfg,) + + counters = get_counters(**config["counters"]) + + worker_fn = functools.partial(process_journal_messages, counters=counters,) + nb_messages = 0 + try: + nb_messages = client.process(worker_fn) + print("Processed %d messages." 
% nb_messages) + except KeyboardInterrupt: + ctx.exit(0) + else: + print("Done.") + finally: + client.close() diff --git a/swh/counters/interface.py b/swh/counters/interface.py new file mode 100644 --- /dev/null +++ b/swh/counters/interface.py @@ -0,0 +1,28 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Any, Iterable + +from swh.core.api import remote_api_endpoint + + +class CountersInterface: + @remote_api_endpoint("check") + def check(self): + """Dedicated method to execute some specific check per implementation. + """ + ... + + @remote_api_endpoint("add") + def add(self, collection: str, keys: Iterable[Any]) -> None: + """Add the provided keys to the collection + Only count new keys. + """ + ... + + @remote_api_endpoint("get_count") + def get_count(self, collection: str) -> int: + """Return the number of keys for the provided collection""" + ... 
diff --git a/swh/counters/journal_client.py b/swh/counters/journal_client.py new file mode 100644 --- /dev/null +++ b/swh/counters/journal_client.py @@ -0,0 +1,11 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Any, Dict, Iterable + + +def process_journal_messages(messages: Dict[str, Iterable[Any]], *, counters): + """Worker function for `JournalClient.process(worker_fn)`""" + pass diff --git a/swh/counters/kafka_client.py b/swh/counters/kafka_client.py new file mode 100644 --- /dev/null +++ b/swh/counters/kafka_client.py @@ -0,0 +1,48 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from collections import defaultdict +from typing import Any, Dict, List + +from confluent_kafka import KafkaError + +from swh.journal.client import JournalClient, _error_cb + + +class KeyOrientedJournalClient(JournalClient): + """Journal Client implementation which only uses the message keys. 
+ This does not need to bother with the message deserialization (contrary + to `swh.journal.client.JournalClient`) + """ + + def handle_messages(self, messages, worker_fn): + keys: Dict[str, List[Any]] = defaultdict(list) + nb_processed = 0 + + for message in messages: + error = message.error() + if error is not None: + if error.code() == KafkaError._PARTITION_EOF: + self.eof_reached.add((message.topic(), message.partition())) + else: + _error_cb(error) + continue + if message.value() is None: + # ignore message with no payload, these can be generated in tests + continue + nb_processed += 1 + object_type = message.topic().split(".")[-1] + keys[object_type].append(message.key()) + + if keys: + worker_fn(dict(keys)) + self.consumer.commit() + + at_eof = self.stop_on_eof and all( + (tp.topic, tp.partition) in self.eof_reached + for tp in self.consumer.assignment() + ) + + return nb_processed, at_eof diff --git a/swh/counters/redis.py b/swh/counters/redis.py new file mode 100644 --- /dev/null +++ b/swh/counters/redis.py @@ -0,0 +1,20 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Any, Iterable + + +class Redis: + def __init__(self, host: str): + pass + + def check(self): + pass + + def add(self, collection: str, keys: Iterable[Any]) -> None: + pass + + def get_count(self, collection: str) -> int: + pass diff --git a/swh/counters/tests/conftest.py b/swh/counters/tests/conftest.py new file mode 100644 --- /dev/null +++ b/swh/counters/tests/conftest.py @@ -0,0 +1,26 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging + +import pytest + 
+logger = logging.getLogger(__name__) + + +JOURNAL_OBJECTS_CONFIG_TEMPLATE = """ +journal: + brokers: + - {broker} + prefix: {prefix} + group_id: {group_id} +""" + + +@pytest.fixture +def journal_config(kafka_server, kafka_prefix) -> str: + return JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( + broker=kafka_server, group_id="test-consumer", prefix=kafka_prefix + ) diff --git a/swh/counters/tests/test_cli.py b/swh/counters/tests/test_cli.py new file mode 100644 --- /dev/null +++ b/swh/counters/tests/test_cli.py @@ -0,0 +1,209 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import copy +import tempfile + +from click.testing import CliRunner +from confluent_kafka import Producer +import pytest +import yaml + +from swh.counters.cli import counters_cli_group +from swh.journal.serializers import value_to_kafka + +CLI_CONFIG = """ +counters: + cls: redis + host: + - %(redis_host)s +""" + +JOURNAL_OBJECTS_CONFIG_TEMPLATE = """ +journal: + brokers: + - {broker} + prefix: {prefix} + group_id: {group_id} +""" + + +def invoke(catch_exceptions, args, config="", *, redis_host): + runner = CliRunner() + with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: + config_fd.write((CLI_CONFIG + config) % {"redis_host": redis_host}) + config_fd.seek(0) + result = runner.invoke(counters_cli_group, ["-C" + config_fd.name] + args) + if not catch_exceptions and result.exception: + print(result.output) + raise result.exception + return result + + +def test__journal_client__worker_function_invoked( + mocker, kafka_server, kafka_prefix, journal_config +): + mock = mocker.patch("swh.counters.journal_client.process_journal_messages") + mock.return_value = 1 + + producer = Producer( + { + "bootstrap.servers": kafka_server, + "client.id": "test-producer", + "acks": "all", + } + ) + topic 
= f"{kafka_prefix}.content" + value = value_to_kafka({"key": "value"}) + producer.produce(topic=topic, key=b"message1", value=value) + + invoke( + False, + # --object-type must be passed: without it (and with no config key) the cli raises + ["journal-client", "--stop-after-objects", "1", "--object-type", "content"], + journal_config, + redis_host="localhost", + ) + + assert mock.call_count == 1 + + +def test__journal_client__missing_main_journal_config_key(): + """Missing configuration on journal should raise""" + with pytest.raises(KeyError, match="journal"): + invoke( + catch_exceptions=False, + args=["journal-client", "--stop-after-objects", "1",], + config="", # missing config will make it raise + redis_host=None, + ) + + +def test__journal_client__missing_journal_config_keys(): + """Missing configuration on mandatory journal keys should raise""" + kafka_prefix = "swh.journal.objects" + journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( + broker="192.0.2.1", prefix=kafka_prefix, group_id="test-consumer" + ) + journal_config = yaml.safe_load(journal_objects_config) + + for key in journal_config["journal"].keys(): + if key == "prefix": # provided via --prefix below + continue + cfg = copy.deepcopy(journal_config) + del cfg["journal"][key] # make config incomplete + yaml_cfg = yaml.dump(cfg) + + with pytest.raises(TypeError, match=f"{key}"): + invoke( + catch_exceptions=False, + args=[ + "journal-client", + "--stop-after-objects", + "1", + "--prefix", + kafka_prefix, + "--object-type", + "content", + ], + config=yaml_cfg, # incomplete config will make the cli raise + redis_host=None, + ) + + +def test__journal_client__missing_prefix_config_key(kafka_server): + """Missing configuration on mandatory prefix key should raise""" + + journal_cfg_template = """ +journal: + brokers: + - {broker} + group_id: {group_id} + """ + + journal_cfg = journal_cfg_template.format( + broker=kafka_server, group_id="test-consumer" + ) + + with pytest.raises(ValueError, match="prefix"): + invoke( + False, 
+ # Missing --prefix (and no config key) will make the cli raise + [ + "journal-client", + "--stop-after-objects", + "1", + "--object-type", + "content", + ], + journal_cfg, + redis_host=None, + ) + + +def test__journal_client__missing_object_types_config_key(kafka_server): + """Missing configuration on mandatory object-types key should raise""" + + journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( + broker=kafka_server, prefix="swh.journal.objects", group_id="test-consumer" + ) + + with pytest.raises(ValueError, match="object_types"): + invoke( + False, + # Missing --object-types (and no config key) will make the cli raise + ["journal-client", "--stop-after-objects", "1",], + journal_cfg, + redis_host=None, + ) + + +def test__journal_client__key_received(mocker, kafka_server): + mock = mocker.patch("swh.counters.journal_client.process_journal_messages") + mock.return_value = 1 + + prefix = "swh.journal.objects" + object_type = "content" + topic = prefix + "." + object_type + + producer = Producer( + {"bootstrap.servers": kafka_server, "client.id": "testproducer", "acks": "all",} + ) + + value = value_to_kafka({"key": "value"}) + key = b"message key" + + # Ensure empty messages are ignored + producer.produce(topic=topic, key=b"emptymessage", value=None) + producer.produce(topic=topic, key=key, value=value) + producer.flush() + + journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( + broker=kafka_server, prefix=prefix, group_id="test-consumer" + ) + + result = invoke( + False, + [ + "journal-client", + "--stop-after-objects", + "1", + "--object-type", + object_type, + "--prefix", + prefix, + ], + journal_cfg, + redis_host=None, + ) + + # Check the output + expected_output = "Processed 1 messages.\nDone.\n" + assert result.exit_code == 0, result.output + assert result.output == expected_output + assert mock.call_args[0][0]["content"] + assert len(mock.call_args[0][0]) == 1 + assert object_type in mock.call_args[0][0].keys() + assert key == 
mock.call_args[0][0][object_type][0] diff --git a/swh/counters/tests/test_init.py b/swh/counters/tests/test_init.py new file mode 100644 --- /dev/null +++ b/swh/counters/tests/test_init.py @@ -0,0 +1,71 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import inspect + +import pytest + +from swh.counters import get_counters +from swh.counters.api.client import RemoteCounters +from swh.counters.interface import CountersInterface +from swh.counters.redis import Redis + +COUNTERS_IMPLEMENTATIONS = [ + ("remote", RemoteCounters, {"url": "localhost"}), + ("redis", Redis, {"host": "localhost"}), +] + + +def test_get_counters_failure(): + with pytest.raises(ValueError, match="Unknown counters class"): + get_counters("unknown-counters") + + +@pytest.mark.parametrize("class_,expected_class,kwargs", COUNTERS_IMPLEMENTATIONS) +def test_get_counters(mocker, class_, expected_class, kwargs): + if kwargs: + concrete_counters = get_counters(class_, **kwargs) + else: + concrete_counters = get_counters(class_) + assert isinstance(concrete_counters, expected_class) + + +@pytest.mark.parametrize("class_,expected_class,kwargs", COUNTERS_IMPLEMENTATIONS) +def test_types(mocker, class_, expected_class, kwargs): + """Checks all methods of CountersInterface are implemented by this + backend, and that they have the same signature. 
+ + """ + # mocker.patch("swh.counters.redis.Redis") + if kwargs: + concrete_counters = get_counters(class_, **kwargs) + else: + concrete_counters = get_counters(class_) + + # Create an instance of the protocol (which cannot be instantiated + # directly, so this creates a subclass, then instantiates it) + interface = type("_", (CountersInterface,), {})() + + for meth_name in dir(interface): + if meth_name.startswith("_"): + continue + interface_meth = getattr(interface, meth_name) + + missing_methods = [] + + try: + concrete_meth = getattr(concrete_counters, meth_name) + except AttributeError: + if not getattr(interface_meth, "deprecated_endpoint", False): + # The backend is missing a (non-deprecated) endpoint + missing_methods.append(meth_name) + continue + + expected_signature = inspect.signature(interface_meth) + actual_signature = inspect.signature(concrete_meth) + + assert expected_signature == actual_signature, meth_name + + assert missing_methods == [] diff --git a/swh/counters/tests/test_nothing.py b/swh/counters/tests/test_nothing.py deleted file mode 100644 --- a/swh/counters/tests/test_nothing.py +++ /dev/null @@ -1,3 +0,0 @@ -def test_nothing(): - # Placeholder; remove this when we add actual tests - pass