diff --git a/mypy.ini b/mypy.ini index 7b769c0..9c4d728 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,21 +1,24 @@ [mypy] namespace_packages = True warn_unused_ignores = True # 3rd party libraries without stubs (yet) [mypy-confluent_kafka.*] ignore_missing_imports = True +[mypy-msgpack.*] +ignore_missing_imports = True + [mypy-pkg_resources.*] ignore_missing_imports = True [mypy-pytest.*] ignore_missing_imports = True [mypy-pytest_redis.*] ignore_missing_imports = True # [mypy-add_your_lib_here.*] # ignore_missing_imports = True diff --git a/swh/counters/cli.py b/swh/counters/cli.py index 1bd8980..4311af5 100644 --- a/swh/counters/cli.py +++ b/swh/counters/cli.py @@ -1,107 +1,87 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.counters.kafka_client import KeyOrientedJournalClient @swh_cli_group.group(name="counters", context_settings=CONTEXT_SETTINGS) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.pass_context def counters_cli_group(ctx, config_file): """Software Heritage Counters tools.""" from swh.core import config ctx.ensure_object(dict) conf = config.read(config_file) ctx.obj["config"] = conf @counters_cli_group.command("journal-client") @click.option( "--stop-after-objects", "-m", default=None, type=int, help="Maximum number of objects to replay. Default is to run forever.", ) @click.option( "--object-type", "-o", multiple=True, help="Default list of object types to subscribe to", ) @click.option( "--prefix", "-p", help="Topic prefix to use (e.g swh.journal.objects)", ) -@click.argument( - "journal_type", type=click.Choice(["keys", "messages"],), -) @click.pass_context -def journal_client(ctx, stop_after_objects, object_type, prefix, journal_type): +def journal_client(ctx, stop_after_objects, object_type, prefix): """Listens for new messages from the SWH Journal, and count them - if the 'journal_type' argument is 'keys', it will only count the distinct - keys for each listened topic, if it's 'messages', the messages - are deserialized to be able to count the distinct values - of internal properties of the objects. - `""" + """ import functools - from swh.journal.client import get_journal_client - from . import get_counters - from .journal_client import ( - process_journal_messages, - process_journal_messages_by_keys, - ) + from .journal_client import process_journal_messages config = ctx.obj["config"] journal_cfg = config["journal"] journal_cfg["object_types"] = object_type or journal_cfg.get("object_types", []) journal_cfg["prefix"] = prefix or journal_cfg.get("prefix") journal_cfg["stop_after_objects"] = stop_after_objects or journal_cfg.get( "stop_after_objects" ) if len(journal_cfg["object_types"]) == 0: raise ValueError("'object_types' must be specified by cli or configuration") if journal_cfg["prefix"] is None: raise ValueError("'prefix' must be specified by cli or configuration") counters = get_counters(**config["counters"]) - if journal_type == "keys": - client = KeyOrientedJournalClient(**journal_cfg,) - worker_fn = functools.partial( - process_journal_messages_by_keys, counters=counters, - ) - elif journal_type == "messages": - client = get_journal_client(cls="kafka", **journal_cfg,) - worker_fn = functools.partial(process_journal_messages, counters=counters,) - - assert client is not None - assert worker_fn is not None + client = KeyOrientedJournalClient(**journal_cfg) + + worker_fn = functools.partial(process_journal_messages, counters=counters) nb_messages = 0 try: nb_messages = client.process(worker_fn) print("Processed %d messages." % nb_messages) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: client.close() diff --git a/swh/counters/journal_client.py b/swh/counters/journal_client.py index 26f775a..07e0488 100644 --- a/swh/counters/journal_client.py +++ b/swh/counters/journal_client.py @@ -1,55 +1,52 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict, Iterable - -from swh.counters.redis import Redis +from typing import Dict +import msgpack -def process_journal_messages_by_keys( - messages: Dict[str, Iterable[Any]], *, counters: Redis -) -> None: - """Count the number of different keys for a given message type""" - - for key in messages.keys(): - counters.add(key, messages[key]) +from swh.counters.redis import Redis def process_journal_messages( - messages: Dict[str, Iterable[Any]], *, counters: Redis + messages: Dict[str, Dict[bytes, bytes]], *, counters: Redis ) -> None: """Count the number of different values of an object's property. It allow for example to count the persons inside the Release (authors) and Revision (authors and committers) classes """ + for key in messages.keys(): + counters.add(key, messages[key]) if "revision" in messages: process_revisions(messages["revision"], counters) if "release" in messages: process_releases(messages["release"], counters) -def process_revisions(revisions: Iterable[Dict], counters: Redis): +def process_revisions(revisions: Dict[bytes, bytes], counters: Redis): """Count the number of different authors and committers on the revisions (in the person collection)""" persons = set() - for revision in revisions: + for revision_bytes in revisions.values(): + revision = msgpack.loads(revision_bytes) persons.add(revision["author"]["fullname"]) persons.add(revision["committer"]["fullname"]) counters.add("person", list(persons)) -def process_releases(releases: Iterable[Dict], counters: Redis): +def process_releases(releases: Dict[bytes, bytes], counters: Redis): """Count the number of different authors on the releases (in the person collection)""" persons = set() - for release in releases: + for release_bytes in releases.values(): + release = msgpack.loads(release_bytes) author = release.get("author") if author and "fullname" in author: persons.add(author["fullname"]) counters.add("person", list(persons)) diff --git a/swh/counters/kafka_client.py b/swh/counters/kafka_client.py index 4c762e0..415bca4 100644 --- a/swh/counters/kafka_client.py +++ b/swh/counters/kafka_client.py @@ -1,48 +1,50 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict -from typing import Any, Dict, List +from typing import Dict from confluent_kafka import KafkaError from swh.journal.client import JournalClient, _error_cb class KeyOrientedJournalClient(JournalClient): - """Journal Client implementation which only uses the message keys. - This does not need to bother with the message deserialization (contrary - to `swh.journal.client.JournalClient`) + """Journal Client implementation which only decodes the message keys. + This does not need to bother with the message deserialization (contrary + to :class:`swh.journal.client.JournalClient`) + Message values are still passed unparsed to ``worker_fn`` so it can + deserialize and use it if needed. """ def handle_messages(self, messages, worker_fn): - keys: Dict[str, List[Any]] = defaultdict(list) + objects: Dict[str, Dict[bytes, bytes]] = defaultdict(dict) nb_processed = 0 for message in messages: error = message.error() if error is not None: if error.code() == KafkaError._PARTITION_EOF: self.eof_reached.add((message.topic(), message.partition())) else: _error_cb(error) continue if message.value() is None: # ignore message with no payload, these can be generated in tests continue nb_processed += 1 object_type = message.topic().split(".")[-1] - keys[object_type].append(message.key()) + objects[object_type][message.key()] = message.value() - if keys: - worker_fn(dict(keys)) + if objects: + worker_fn(dict(objects)) self.consumer.commit() at_eof = self.stop_on_eof and all( (tp.topic, tp.partition) in self.eof_reached for tp in self.consumer.assignment() ) return nb_processed, at_eof diff --git a/swh/counters/tests/test_cli.py b/swh/counters/tests/test_cli.py index e192d68..3e9cff9 100644 --- a/swh/counters/tests/test_cli.py +++ b/swh/counters/tests/test_cli.py @@ -1,251 +1,209 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import tempfile from click.testing import CliRunner from confluent_kafka import Producer import pytest import yaml from swh.counters.cli import counters_cli_group from swh.journal.serializers import value_to_kafka CLI_CONFIG = """ counters: cls: redis host: %(redis_host)s """ JOURNAL_OBJECTS_CONFIG_TEMPLATE = """ journal: brokers: - {broker} prefix: {prefix} group_id: {group_id} """ def invoke(catch_exceptions, args, config="", *, redis_host): runner = CliRunner() with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: config_fd.write((CLI_CONFIG + config) % {"redis_host": redis_host}) config_fd.seek(0) result = runner.invoke(counters_cli_group, ["-C" + config_fd.name] + args) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test__journal_client__worker_function_invoked( mocker, kafka_server, kafka_prefix, journal_config, local_redis_host ): - mock = mocker.patch("swh.counters.journal_client.process_journal_messages_by_keys") + mock = mocker.patch("swh.counters.journal_client.process_journal_messages") producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test-producer", "acks": "all", } ) topic = f"{kafka_prefix}.content" value = value_to_kafka({"key": "value"}) producer.produce(topic=topic, key=b"message1", value=value) invoke( False, # Missing --object-types (and no config key) will make the cli raise - [ - "journal-client", - "--stop-after-objects", - "1", - "--object-type", - "content", - "keys", - ], + ["journal-client", "--stop-after-objects", "1", "--object-type", "content",], journal_config, redis_host=local_redis_host, ) assert mock.call_count == 1 def test__journal_client__missing_main_journal_config_key(local_redis_host): """Missing configuration on journal should raise""" with pytest.raises(KeyError, match="journal"): invoke( catch_exceptions=False, - args=["journal-client", "--stop-after-objects", "1", "messages"], + args=["journal-client", "--stop-after-objects", "1"], config="", # missing config will make it raise redis_host=local_redis_host, ) def test__journal_client__missing_journal_config_keys(local_redis_host): """Missing configuration on mandatory journal keys should raise""" kafka_prefix = "swh.journal.objects" journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( broker="192.0.2.1", prefix=kafka_prefix, group_id="test-consumer" ) journal_config = yaml.safe_load(journal_objects_config) for key in journal_config["journal"].keys(): if key == "prefix": # optional continue cfg = copy.deepcopy(journal_config) del cfg["journal"][key] # make config incomplete yaml_cfg = yaml.dump(cfg) with pytest.raises(TypeError, match=f"{key}"): invoke( catch_exceptions=False, args=[ "journal-client", "--stop-after-objects", "1", "--prefix", kafka_prefix, "--object-type", "content", - "keys", ], config=yaml_cfg, # incomplete config will make the cli raise redis_host=local_redis_host, ) def test__journal_client__missing_prefix_config_key(kafka_server, local_redis_host): """Missing configuration on mandatory prefix key should raise""" journal_cfg_template = """ journal: brokers: - {broker} group_id: {group_id} """ journal_cfg = journal_cfg_template.format( broker=kafka_server, group_id="test-consumer" ) with pytest.raises(ValueError, match="prefix"): invoke( False, # Missing --prefix (and no config key) will make the cli raise [ "journal-client", "--stop-after-objects", "1", "--object-type", "content", - "messages", ], journal_cfg, redis_host=local_redis_host, ) def test__journal_client__missing_object_types_config_key( kafka_server, local_redis_host ): """Missing configuration on mandatory object-types key should raise""" journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( broker=kafka_server, prefix="swh.journal.objects", group_id="test-consumer" ) with pytest.raises(ValueError, match="object_types"): invoke( False, # Missing --object-types (and no config key) will make the cli raise - ["journal-client", "--stop-after-objects", "1", "keys"], + ["journal-client", "--stop-after-objects", "1"], journal_cfg, redis_host=local_redis_host, ) -@pytest.mark.parametrize( - "message_handling, worker_fn", - [ - ("keys", "swh.counters.journal_client.process_journal_messages_by_keys"), - ("messages", "swh.counters.journal_client.process_journal_messages"), - ], -) -def test__journal_client__key_received( - mocker, kafka_server, local_redis_host, message_handling, worker_fn -): - mock = mocker.patch(worker_fn) +def test__journal_client__key_received(mocker, kafka_server, local_redis_host): + mock = mocker.patch("swh.counters.journal_client.process_journal_messages") + mock.return_value = 1 prefix = "swh.journal.objects" object_type = "content" topic = prefix + "." + object_type producer = Producer( {"bootstrap.servers": kafka_server, "client.id": "testproducer", "acks": "all",} ) value = value_to_kafka({"key": "value"}) key = b"message key" # Ensure empty messages are ignored producer.produce(topic=topic, key=b"emptymessage", value=None) producer.produce(topic=topic, key=key, value=value) producer.flush() journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( broker=kafka_server, prefix=prefix, group_id="test-consumer" ) result = invoke( False, [ "journal-client", "--stop-after-objects", "1", "--object-type", object_type, "--prefix", prefix, - message_handling, ], journal_cfg, redis_host=local_redis_host, ) # Check the output expected_output = "Processed 1 messages.\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output assert mock.called assert mock.call_args[0][0]["content"] assert len(mock.call_args[0][0]) == 1 assert object_type in mock.call_args[0][0].keys() - - -def test__journal_client__no_journal_type_argument_should_raise( - kafka_server, local_redis_host -): - journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( - broker=kafka_server, prefix="prefix", group_id="test-consumer" - ) - - with pytest.raises(SystemExit): - invoke( - False, - [ - "journal-client", - "--stop-after-objects", - "1", - "--object-type", - "object_type", - "--prefix", - "prefix", - ], - journal_cfg, - redis_host=local_redis_host, - ) diff --git a/swh/counters/tests/test_journal_client.py b/swh/counters/tests/test_journal_client.py index 9f2b13f..910d610 100644 --- a/swh/counters/tests/test_journal_client.py +++ b/swh/counters/tests/test_journal_client.py @@ -1,197 +1,181 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict, Optional -import pytest +import msgpack from swh.counters.journal_client import ( process_journal_messages, - process_journal_messages_by_keys, process_releases, process_revisions, ) from swh.counters.redis import Redis from swh.model.hashutil import hash_to_bytes from swh.model.model import ( ObjectType, Person, Release, Revision, RevisionType, Timestamp, TimestampWithTimezone, ) PROCESSING_METHODS = { "release": "swh.counters.journal_client.process_releases", "revision": "swh.counters.journal_client.process_revisions", } DATE = TimestampWithTimezone( timestamp=Timestamp(seconds=0, microseconds=0), offset=0, negative_utc=False ) def _get_processing_method_mocks(mocker): return { message_type: mocker.patch(PROCESSING_METHODS[message_type]) for message_type in PROCESSING_METHODS.keys() } def _create_release(author_fullname: Optional[str]) -> Dict: """Use Release.to_dict to be sure the field's name used to retrieve the author is correct""" author = None if author_fullname: author = Person(fullname=bytes(author_fullname, "utf-8"), name=None, email=None) release = Release( - id=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"), name=b"Release", message=b"Message", target=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"), target_type=ObjectType.CONTENT, synthetic=True, author=author, ) return release.to_dict() -def _create_revision(author_fullname: str, committer_fullname: str) -> Revision: +def _create_revision(author_fullname: str, committer_fullname: str) -> Dict: """Use Revision.to_dict to be sure the names of the fields used to retrieve the author and the committer are correct""" revision = Revision( committer_date=DATE, date=None, type=RevisionType.GIT, parents=(), directory=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"), synthetic=True, message=None, author=Person(fullname=bytes(author_fullname, "utf-8"), name=None, email=None), committer=Person( fullname=bytes(committer_fullname, "utf-8"), name=None, email=None ), ) return revision.to_dict() -RELEASES = [ - _create_release(author_fullname="author 1"), - _create_release(author_fullname="author 2"), - _create_release(author_fullname=None), -] +RELEASES = { + rel["id"]: msgpack.dumps(rel) + for rel in [ + _create_release(author_fullname="author 1"), + _create_release(author_fullname="author 2"), + _create_release(author_fullname=None), + ] +} RELEASES_AUTHOR_FULLNAMES = {b"author 1", b"author 2"} -REVISIONS = [ - _create_revision(author_fullname="author 1", committer_fullname="committer 1"), - _create_revision(author_fullname="author 2", committer_fullname="committer 2"), - _create_revision(author_fullname="author 2", committer_fullname="committer 1"), - _create_revision(author_fullname="author 1", committer_fullname="committer 2"), -] +REVISIONS = { + rev["id"]: msgpack.dumps(rev) + for rev in [ + _create_revision(author_fullname="author 1", committer_fullname="committer 1"), + _create_revision(author_fullname="author 2", committer_fullname="committer 2"), + _create_revision(author_fullname="author 2", committer_fullname="committer 1"), + _create_revision(author_fullname="author 1", committer_fullname="committer 2"), + ] +} REVISIONS_AUTHOR_FULLNAMES = {b"author 1", b"author 2"} REVISIONS_COMMITTER_FULLNAMES = {b"committer 1", b"committer 2"} REVISIONS_PERSON_FULLNAMES = REVISIONS_AUTHOR_FULLNAMES | REVISIONS_COMMITTER_FULLNAMES def test__journal_client__all_keys(mocker): mock = mocker.patch("swh.counters.redis.Redis.add") redis = Redis(host="localhost") - keys = {"coll1": [b"key1", b"key2"], "coll2": [b"key3", b"key4", b"key5"]} + keys = { + "coll1": {b"key1": b"value1", b"key2": b"value2"}, + "coll2": {b"key3": b"value3", b"key4": b"value4", b"key5": b"value5"}, + } - process_journal_messages_by_keys(messages=keys, counters=redis) + process_journal_messages(messages=keys, counters=redis) assert mock.call_count == 2 first_call_args = mock.call_args_list[0] assert first_call_args[0][0] == "coll1" assert first_call_args[0][1] == keys["coll1"] second_call_args = mock.call_args_list[1] assert second_call_args[0][0] == "coll2" assert second_call_args[0][1] == keys["coll2"] -def test__journal_client__unsupported_messages_do_nothin(mocker): - mocks = _get_processing_method_mocks(mocker) - - messages = {"fake_type": [{"keys": "value"}]} - - process_journal_messages(messages=messages, counters=None) - - for mocked_method in mocks.keys(): - assert not mocks[mocked_method].called - - -@pytest.mark.parametrize("message_type", ["release", "revision"]) -def test__journal_client__right_method_per_message_type_is_called(mocker, message_type): - mocks = _get_processing_method_mocks(mocker) - - messages = {message_type: [{"k1": "v1"}, {"k2": "v2"}]} - - process_journal_messages(messages=messages, counters=None) - - for mocked_method in mocks.keys(): - if mocked_method == message_type: - assert mocks[mocked_method].call_count == 1 - else: - assert not mocks[mocked_method].called - - def test__journal_client_process_revisions(mocker): mock = mocker.patch("swh.counters.redis.Redis.add") redis = Redis(host="localhost") process_revisions(REVISIONS, redis) assert mock.call_count == 1 first_call_args = mock.call_args_list[0] assert first_call_args[0][0] == "person" - assert first_call_args[0][1] == list(REVISIONS_PERSON_FULLNAMES) + assert sorted(first_call_args[0][1]) == sorted(REVISIONS_PERSON_FULLNAMES) def test__journal_client_process_releases(mocker): mock = mocker.patch("swh.counters.redis.Redis.add") redis = Redis(host="localhost") process_releases(RELEASES, redis) assert mock.call_count == 1 first_call_args = mock.call_args_list[0] assert first_call_args[0][0] == "person" assert first_call_args[0][1] == list(RELEASES_AUTHOR_FULLNAMES) def test__journal_client_process_releases_without_authors(mocker): mock = mocker.patch("swh.counters.redis.Redis.add") - releases = [ - _create_release(author_fullname=None), - _create_release(author_fullname=None), - ] + releases = { + rel["id"]: msgpack.dumps(rel) + for rel in [ + _create_release(author_fullname=None), + _create_release(author_fullname=None), + ] + } redis = Redis(host="localhost") process_releases(releases, redis) assert mock.called == 1 first_call_args = mock.call_args_list[0] assert first_call_args[0][0] == "person" assert first_call_args[0][1] == []