diff --git a/mypy.ini b/mypy.ini --- a/mypy.ini +++ b/mypy.ini @@ -8,6 +8,9 @@ [mypy-confluent_kafka.*] ignore_missing_imports = True +[mypy-msgpack.*] +ignore_missing_imports = True + [mypy-pkg_resources.*] ignore_missing_imports = True diff --git a/swh/counters/cli.py b/swh/counters/cli.py --- a/swh/counters/cli.py +++ b/swh/counters/cli.py @@ -45,26 +45,14 @@ @click.option( "--prefix", "-p", help="Topic prefix to use (e.g swh.journal.objects)", ) -@click.argument( - "journal_type", type=click.Choice(["keys", "messages"],), -) @click.pass_context -def journal_client(ctx, stop_after_objects, object_type, prefix, journal_type): +def journal_client(ctx, stop_after_objects, object_type, prefix): """Listens for new messages from the SWH Journal, and count them - if the 'journal_type' argument is 'keys', it will only count the distinct - keys for each listened topic, if it's 'messages', the messages - are deserialized to be able to count the distinct values - of internal properties of the objects. - `""" + """ import functools - from swh.journal.client import get_journal_client - from . 
import get_counters - from .journal_client import ( - process_journal_messages, - process_journal_messages_by_keys, - ) + from .journal_client import process_journal_messages config = ctx.obj["config"] journal_cfg = config["journal"] @@ -83,17 +71,9 @@ counters = get_counters(**config["counters"]) - if journal_type == "keys": - client = KeyOrientedJournalClient(**journal_cfg,) - worker_fn = functools.partial( - process_journal_messages_by_keys, counters=counters, - ) - elif journal_type == "messages": - client = get_journal_client(cls="kafka", **journal_cfg,) - worker_fn = functools.partial(process_journal_messages, counters=counters,) - - assert client is not None - assert worker_fn is not None + client = KeyOrientedJournalClient(**journal_cfg) + + worker_fn = functools.partial(process_journal_messages, counters=counters) nb_messages = 0 try: diff --git a/swh/counters/journal_client.py b/swh/counters/journal_client.py --- a/swh/counters/journal_client.py +++ b/swh/counters/journal_client.py @@ -3,27 +3,22 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict, Iterable - -from swh.counters.redis import Redis +from typing import Dict +import msgpack -def process_journal_messages_by_keys( - messages: Dict[str, Iterable[Any]], *, counters: Redis -) -> None: - """Count the number of different keys for a given message type""" - - for key in messages.keys(): - counters.add(key, messages[key]) +from swh.counters.redis import Redis def process_journal_messages( - messages: Dict[str, Iterable[Any]], *, counters: Redis + messages: Dict[str, Dict[bytes, bytes]], *, counters: Redis ) -> None: """Count the number of different values of an object's property. 
It allow for example to count the persons inside the Release (authors) and Revision (authors and committers) classes """ + for key in messages.keys(): + counters.add(key, messages[key]) if "revision" in messages: process_revisions(messages["revision"], counters) @@ -32,22 +27,24 @@ process_releases(messages["release"], counters) -def process_revisions(revisions: Iterable[Dict], counters: Redis): +def process_revisions(revisions: Dict[bytes, bytes], counters: Redis): """Count the number of different authors and committers on the revisions (in the person collection)""" persons = set() - for revision in revisions: + for revision_bytes in revisions.values(): + revision = msgpack.loads(revision_bytes) persons.add(revision["author"]["fullname"]) persons.add(revision["committer"]["fullname"]) counters.add("person", list(persons)) -def process_releases(releases: Iterable[Dict], counters: Redis): +def process_releases(releases: Dict[bytes, bytes], counters: Redis): """Count the number of different authors on the releases (in the person collection)""" persons = set() - for release in releases: + for release_bytes in releases.values(): + release = msgpack.loads(release_bytes) author = release.get("author") if author and "fullname" in author: persons.add(author["fullname"]) diff --git a/swh/counters/kafka_client.py b/swh/counters/kafka_client.py --- a/swh/counters/kafka_client.py +++ b/swh/counters/kafka_client.py @@ -4,7 +4,7 @@ # See top-level LICENSE file for more information from collections import defaultdict -from typing import Any, Dict, List +from typing import Dict from confluent_kafka import KafkaError @@ -12,13 +12,15 @@ class KeyOrientedJournalClient(JournalClient): - """Journal Client implementation which only uses the message keys. - This does not need to bother with the message deserialization (contrary - to `swh.journal.client.JournalClient`) + """Journal Client implementation which only decodes the message keys. 
+ This does not need to bother with the message deserialization (contrary + to :class:`swh.journal.client.JournalClient`) + Message values are still passed unparsed to ``worker_fn`` so that it can deserialize and use them if needed. """ def handle_messages(self, messages, worker_fn): - keys: Dict[str, List[Any]] = defaultdict(list) + objects: Dict[str, Dict[bytes, bytes]] = defaultdict(dict) nb_processed = 0 for message in messages: @@ -34,10 +36,10 @@ continue nb_processed += 1 object_type = message.topic().split(".")[-1] - keys[object_type].append(message.key()) + objects[object_type][message.key()] = message.value() - if keys: - worker_fn(dict(keys)) + if objects: + worker_fn(dict(objects)) self.consumer.commit() at_eof = self.stop_on_eof and all( diff --git a/swh/counters/tests/test_cli.py b/swh/counters/tests/test_cli.py --- a/swh/counters/tests/test_cli.py +++ b/swh/counters/tests/test_cli.py @@ -44,7 +44,7 @@ def test__journal_client__worker_function_invoked( mocker, kafka_server, kafka_prefix, journal_config, local_redis_host ): - mock = mocker.patch("swh.counters.journal_client.process_journal_messages_by_keys") + mock = mocker.patch("swh.counters.journal_client.process_journal_messages") producer = Producer( { @@ -60,14 +60,7 @@ invoke( False, # Missing --object-types (and no config key) will make the cli raise - [ - "journal-client", - "--stop-after-objects", - "1", - "--object-type", - "content", - "keys", - ], + ["journal-client", "--stop-after-objects", "1", "--object-type", "content",], journal_config, redis_host=local_redis_host, ) @@ -80,7 +73,7 @@ with pytest.raises(KeyError, match="journal"): invoke( catch_exceptions=False, - args=["journal-client", "--stop-after-objects", "1", "messages"], + args=["journal-client", "--stop-after-objects", "1"], config="", # missing config will make it raise redis_host=local_redis_host, ) @@ -112,7 +105,6 @@ kafka_prefix, "--object-type", "content", - "keys", ], config=yaml_cfg, # incomplete config will make the cli
raise redis_host=local_redis_host, @@ -143,7 +135,6 @@ "1", "--object-type", "content", - "messages", ], journal_cfg, redis_host=local_redis_host, @@ -163,23 +154,15 @@ invoke( False, # Missing --object-types (and no config key) will make the cli raise - ["journal-client", "--stop-after-objects", "1", "keys"], + ["journal-client", "--stop-after-objects", "1"], journal_cfg, redis_host=local_redis_host, ) -@pytest.mark.parametrize( - "message_handling, worker_fn", - [ - ("keys", "swh.counters.journal_client.process_journal_messages_by_keys"), - ("messages", "swh.counters.journal_client.process_journal_messages"), - ], -) -def test__journal_client__key_received( - mocker, kafka_server, local_redis_host, message_handling, worker_fn -): - mock = mocker.patch(worker_fn) +def test__journal_client__key_received(mocker, kafka_server, local_redis_host): + mock = mocker.patch("swh.counters.journal_client.process_journal_messages") + mock.return_value = 1 prefix = "swh.journal.objects" object_type = "content" @@ -211,7 +194,6 @@ object_type, "--prefix", prefix, - message_handling, ], journal_cfg, redis_host=local_redis_host, @@ -225,27 +207,3 @@ assert mock.call_args[0][0]["content"] assert len(mock.call_args[0][0]) == 1 assert object_type in mock.call_args[0][0].keys() - - -def test__journal_client__no_journal_type_argument_should_raise( - kafka_server, local_redis_host -): - journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( - broker=kafka_server, prefix="prefix", group_id="test-consumer" - ) - - with pytest.raises(SystemExit): - invoke( - False, - [ - "journal-client", - "--stop-after-objects", - "1", - "--object-type", - "object_type", - "--prefix", - "prefix", - ], - journal_cfg, - redis_host=local_redis_host, - ) diff --git a/swh/counters/tests/test_journal_client.py b/swh/counters/tests/test_journal_client.py --- a/swh/counters/tests/test_journal_client.py +++ b/swh/counters/tests/test_journal_client.py @@ -5,11 +5,10 @@ from typing import Dict, Optional -import 
pytest +import msgpack from swh.counters.journal_client import ( process_journal_messages, - process_journal_messages_by_keys, process_releases, process_revisions, ) @@ -51,7 +50,6 @@ author = Person(fullname=bytes(author_fullname, "utf-8"), name=None, email=None) release = Release( - id=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"), name=b"Release", message=b"Message", target=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"), @@ -63,7 +61,7 @@ return release.to_dict() -def _create_revision(author_fullname: str, committer_fullname: str) -> Revision: +def _create_revision(author_fullname: str, committer_fullname: str) -> Dict: """Use Revision.to_dict to be sure the names of the fields used to retrieve the author and the committer are correct""" revision = Revision( @@ -83,22 +81,28 @@ return revision.to_dict() -RELEASES = [ - _create_release(author_fullname="author 1"), - _create_release(author_fullname="author 2"), - _create_release(author_fullname=None), -] +RELEASES = { + rel["id"]: msgpack.dumps(rel) + for rel in [ + _create_release(author_fullname="author 1"), + _create_release(author_fullname="author 2"), + _create_release(author_fullname=None), + ] +} RELEASES_AUTHOR_FULLNAMES = {b"author 1", b"author 2"} -REVISIONS = [ - _create_revision(author_fullname="author 1", committer_fullname="committer 1"), - _create_revision(author_fullname="author 2", committer_fullname="committer 2"), - _create_revision(author_fullname="author 2", committer_fullname="committer 1"), - _create_revision(author_fullname="author 1", committer_fullname="committer 2"), -] +REVISIONS = { + rev["id"]: msgpack.dumps(rev) + for rev in [ + _create_revision(author_fullname="author 1", committer_fullname="committer 1"), + _create_revision(author_fullname="author 2", committer_fullname="committer 2"), + _create_revision(author_fullname="author 2", committer_fullname="committer 1"), + _create_revision(author_fullname="author 1", committer_fullname="committer 2"), + ] +} 
REVISIONS_AUTHOR_FULLNAMES = {b"author 1", b"author 2"} @@ -112,9 +116,12 @@ redis = Redis(host="localhost") - keys = {"coll1": [b"key1", b"key2"], "coll2": [b"key3", b"key4", b"key5"]} + keys = { + "coll1": {b"key1": b"value1", b"key2": b"value2"}, + "coll2": {b"key3": b"value3", b"key4": b"value4", b"key5": b"value5"}, + } - process_journal_messages_by_keys(messages=keys, counters=redis) + process_journal_messages(messages=keys, counters=redis) assert mock.call_count == 2 @@ -127,32 +134,6 @@ assert second_call_args[0][1] == keys["coll2"] -def test__journal_client__unsupported_messages_do_nothin(mocker): - mocks = _get_processing_method_mocks(mocker) - - messages = {"fake_type": [{"keys": "value"}]} - - process_journal_messages(messages=messages, counters=None) - - for mocked_method in mocks.keys(): - assert not mocks[mocked_method].called - - -@pytest.mark.parametrize("message_type", ["release", "revision"]) -def test__journal_client__right_method_per_message_type_is_called(mocker, message_type): - mocks = _get_processing_method_mocks(mocker) - - messages = {message_type: [{"k1": "v1"}, {"k2": "v2"}]} - - process_journal_messages(messages=messages, counters=None) - - for mocked_method in mocks.keys(): - if mocked_method == message_type: - assert mocks[mocked_method].call_count == 1 - else: - assert not mocks[mocked_method].called - - def test__journal_client_process_revisions(mocker): mock = mocker.patch("swh.counters.redis.Redis.add") @@ -163,7 +144,7 @@ assert mock.call_count == 1 first_call_args = mock.call_args_list[0] assert first_call_args[0][0] == "person" - assert first_call_args[0][1] == list(REVISIONS_PERSON_FULLNAMES) + assert sorted(first_call_args[0][1]) == sorted(REVISIONS_PERSON_FULLNAMES) def test__journal_client_process_releases(mocker): @@ -182,10 +163,13 @@ def test__journal_client_process_releases_without_authors(mocker): mock = mocker.patch("swh.counters.redis.Redis.add") - releases = [ - _create_release(author_fullname=None), - 
_create_release(author_fullname=None), - ] + releases = { + rel["id"]: msgpack.dumps(rel) + for rel in [ + _create_release(author_fullname=None), + _create_release(author_fullname=None), + ] + } redis = Redis(host="localhost")