diff --git a/mypy.ini b/mypy.ini --- a/mypy.ini +++ b/mypy.ini @@ -5,6 +5,9 @@ # 3rd party libraries without stubs (yet) +[mypy-confluent_kafka.*] +ignore_missing_imports = True + [mypy-pkg_resources.*] ignore_missing_imports = True diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,2 +1,3 @@ # Add here internal Software Heritage dependencies, one per line. swh.core[http] >= 0.3 # [http] is required by swh.core.pytest_plugin +swh.journal diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1 +1,2 @@ pytest +confluent-kafka diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html - diff --git a/setup.py b/setup.py --- a/setup.py +++ b/setup.py @@ -52,10 +52,10 @@ use_scm_version=True, extras_require={"testing": parse_requirements("test")}, include_package_data=True, - # entry_points=""" - # [swh.cli.subcommands] - # =swh..cli - # """, + entry_points=""" + [swh.cli.subcommands] + counters=swh.counters.cli + """, classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", diff --git a/swh/__init__.py b/swh/__init__.py --- a/swh/__init__.py +++ b/swh/__init__.py @@ -1,3 +1,8 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + from pkgutil import extend_path from typing import Iterable diff --git a/swh/counters/__init__.py b/swh/counters/__init__.py --- a/swh/counters/__init__.py +++ b/swh/counters/__init__.py @@ -0,0 +1,44 @@ +# Copyright 
# Registry of the available implementations: maps the public ``cls`` name to
# the package-relative dotted path of the class implementing it.
COUNTERS_IMPLEMENTATIONS = {
    "redis": ".redis.Counters",
    "remote": ".api.client.RemoteCounters",
}


def get_counters(cls: str, **kwargs: Any) -> "CountersInterface":
    """Instantiate the counters implementation registered under ``cls``.

    Args:
        cls: name of the implementation; one of the keys of
            ``COUNTERS_IMPLEMENTATIONS`` (currently ``"redis"`` or
            ``"remote"``)
        **kwargs: keyword arguments forwarded unchanged to the constructor
            of the selected counters class

    Returns:
        an instance of the selected counters class

    Raises:
        ValueError: if ``cls`` is not a known counters class
    """
    class_path = COUNTERS_IMPLEMENTATIONS.get(cls)
    if class_path is None:
        raise ValueError(
            "Unknown counters class `%s`. Supported: %s"
            % (cls, ", ".join(COUNTERS_IMPLEMENTATIONS))
        )

    # Registered paths start with a dot, so they are resolved relative to
    # this package; the backend module is only imported when it is requested.
    module_path, class_name = class_path.rsplit(".", 1)
    module = importlib.import_module(module_path, package=__package__)
    counters_class = getattr(module, class_name)
    return counters_class(**kwargs)
@counters_cli_group.command("journal-client")
@click.option(
    "--stop-after-objects",
    "-m",
    default=None,
    type=int,
    help="Maximum number of objects to replay. Default is to run forever.",
)
@click.option(
    "--object-type",
    "-o",
    multiple=True,
    help="Default list of object types to subscribe to",
)
@click.option(
    "--prefix", "-p", help="Topic prefix to use (e.g swh.journal.objects)",
)
@click.pass_context
def journal_client(ctx, stop_after_objects, object_type, prefix):
    """Listen for new messages from the SWH Journal and count them.

    Command-line options take precedence over the values found in the
    "journal" section of the configuration file.
    """
    # Imports are kept local so that loading the CLI group stays cheap.
    import functools

    from swh.counters.kafka_client import get_journal_client

    from . import get_counters
    from .journal_client import process_journal_messages

    config = ctx.obj["config"]
    journal_cfg = config["journal"]

    # Each setting comes from the command line when given there, otherwise
    # from the configuration file.
    journal_cfg["object_types"] = object_type or journal_cfg.get("object_types", [])
    journal_cfg["prefix"] = prefix or journal_cfg.get("prefix")
    journal_cfg["stop_after_objects"] = stop_after_objects or journal_cfg.get(
        "stop_after_objects"
    )

    # A truthiness test also covers an explicit null value in the
    # configuration, which would make len() fail with a TypeError.
    if not journal_cfg["object_types"]:
        raise ValueError("'object_types' must be specified by cli or configuration")

    if journal_cfg["prefix"] is None:
        raise ValueError("'prefix' must be specified by cli or configuration")

    client = get_journal_client(cls="kafka", **journal_cfg)
    counters = get_counters(**config["counters"])

    worker_fn = functools.partial(process_journal_messages, counters=counters)
    try:
        nb_messages = client.process(worker_fn)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop a client that runs forever.
        ctx.exit(0)
    else:
        click.echo("Processed %d messages." % nb_messages)
        click.echo("Done.")
    finally:
        # Always release the client, whatever the outcome.
        client.close()
class KeyOrientedJournalClient(JournalClient):
    """Journal client that hands message keys, not payloads, to the worker.

    The default implementation deserializes the messages before passing
    them to the worker function. The counters only use the message key,
    so the deserialization is not needed.
    """

    def handle_message(self, messages, worker_fn):
        """Collect the keys of a batch of messages and pass them to the worker.

        Args:
            messages: iterable of Kafka messages
            worker_fn: callable invoked at most once per batch with a dict
                mapping each object type (the last dot-separated component
                of the topic name) to the list of keys received for it; it
                is not called when the batch yields no key

        Returns:
            a ``(nb_processed, at_eof)`` tuple: the number of messages whose
            key was collected, and whether ``stop_on_eof`` is set and every
            assigned partition has reported its end
        """
        keys: Dict[str, List[Any]] = defaultdict(list)
        nb_processed = 0

        for message in messages:
            error = message.error()
            if error is not None:
                if error.code() == KafkaError._PARTITION_EOF:
                    # Not a failure: remember that this partition is drained.
                    self.eof_reached.add((message.topic(), message.partition()))
                else:
                    # NOTE(review): assumes `_error_cb` is reachable as an
                    # attribute of JournalClient - confirm against swh.journal.
                    JournalClient._error_cb(error)
                continue
            if message.value() is None:
                # ignore message with no payload, these can be generated in tests
                continue
            nb_processed += 1
            object_type = message.topic().split(".")[-1]
            # `key` is a method of the Kafka message: it must be called,
            # otherwise the bound method itself would be stored as the key.
            keys[object_type].append(message.key())

        if keys:
            worker_fn(dict(keys))
            # Offsets are committed only after the worker handled the batch.
            self.consumer.commit()

        at_eof = self.stop_on_eof and all(
            (tp.topic, tp.partition) in self.eof_reached
            for tp in self.consumer.assignment()
        )

        return nb_processed, at_eof

    # NOTE(review): the base JournalClient appears to dispatch batches to
    # `handle_messages` (plural) - confirm against swh.journal. The alias makes
    # this key-oriented handler the one that gets used in that case, while
    # keeping the original method name available.
    handle_messages = handle_message