diff --git a/swh/counters/cli.py b/swh/counters/cli.py --- a/swh/counters/cli.py +++ b/swh/counters/cli.py @@ -45,14 +45,26 @@ @click.option( "--prefix", "-p", help="Topic prefix to use (e.g swh.journal.objects)", ) +@click.argument( + "journal_type", type=click.Choice(["keys", "messages"],), +) @click.pass_context -def journal_client(ctx, stop_after_objects, object_type, prefix): - """Listens for new messages from the SWH Journal, and count them - `""" +def journal_client(ctx, stop_after_objects, object_type, prefix, journal_type): + """Listens for new messages from the SWH Journal, and counts them. + If the 'journal_type' argument is 'keys', only the distinct keys of each + topic listened to are counted; if it is 'messages', the messages are + deserialized so that the distinct values of internal properties of the + objects (e.g. persons) can be counted. + """ import functools + from swh.journal.client import get_journal_client + from . import get_counters - from .journal_client import process_journal_messages + from .journal_client import ( + process_journal_messages, + process_journal_messages_by_keys, + ) config = ctx.obj["config"] journal_cfg = config["journal"] @@ -69,11 +81,20 @@ if journal_cfg["prefix"] is None: raise ValueError("'prefix' must be specified by cli or configuration") - client = KeyOrientedJournalClient(**journal_cfg,) - counters = get_counters(**config["counters"]) - worker_fn = functools.partial(process_journal_messages, counters=counters,) + if journal_type == "keys": + client = KeyOrientedJournalClient(**journal_cfg,) + worker_fn = functools.partial( + process_journal_messages_by_keys, counters=counters, + ) + elif journal_type == "messages": + client = get_journal_client(cls="kafka", **journal_cfg,) + worker_fn = functools.partial(process_journal_messages, counters=counters,) + else: + # Unreachable through click.Choice; keeps client/worker_fn always bound. + raise ValueError(f"Unsupported journal type: {journal_type!r}") + nb_messages = 0 try: nb_messages = client.process(worker_fn) diff --git a/swh/counters/journal_client.py b/swh/counters/journal_client.py --- a/swh/counters/journal_client.py +++
b/swh/counters/journal_client.py @@ -8,10 +8,48 @@ from swh.counters.redis import Redis -def process_journal_messages( +def process_journal_messages_by_keys( messages: Dict[str, Iterable[Any]], *, counters: Redis ) -> None: - """Worker function for `JournalClient.process(worker_fn)`""" + """Count the number of different keys for a given message type""" for key in messages.keys(): counters.add(key, messages[key]) + + +def process_journal_messages( + messages: Dict[str, Iterable[Any]], *, counters: Redis +) -> None: + """Count the number of different values of an object's property. + It allows, for example, counting the persons referenced by + releases (authors) and revisions (authors and committers). + """ + + if "revision" in messages: + process_revisions(messages["revision"], counters) + + if "release" in messages: + process_releases(messages["release"], counters) + + +def process_revisions(revisions: Iterable[Dict], counters: Redis) -> None: + """Count the number of different authors and committers on the + revisions (in the person collection)""" + persons = set() + for revision in revisions: + persons.add(revision["author"]["fullname"]) + persons.add(revision["committer"]["fullname"]) + + counters.add("person", list(persons)) + + +def process_releases(releases: Iterable[Dict], counters: Redis) -> None: + """Count the number of different authors on the + releases (in the person collection)""" + persons = set() + for release in releases: + author = release.get("author") + if author and "fullname" in author: + persons.add(author["fullname"]) + + counters.add("person", list(persons)) diff --git a/swh/counters/tests/conftest.py b/swh/counters/tests/conftest.py --- a/swh/counters/tests/conftest.py +++ b/swh/counters/tests/conftest.py @@ -33,3 +33,8 @@ # Cleanup redis between 2 tests rc = RedisClient(host=redis_proc.host, port=redis_proc.port) rc.flushall() + + +@pytest.fixture +def local_redis_host(local_redis): + return f"{local_redis.host}:{local_redis.port}" diff --git
a/swh/counters/tests/test_cli.py b/swh/counters/tests/test_cli.py --- a/swh/counters/tests/test_cli.py +++ b/swh/counters/tests/test_cli.py @@ -42,10 +42,9 @@ def test__journal_client__worker_function_invoked( - mocker, kafka_server, kafka_prefix, journal_config + mocker, kafka_server, kafka_prefix, journal_config, local_redis_host ): - mock = mocker.patch("swh.counters.journal_client.process_journal_messages") - mock.return_value = 1 + mock = mocker.patch("swh.counters.journal_client.process_journal_messages_by_keys") producer = Producer( { @@ -61,26 +60,33 @@ invoke( False, # Missing --object-types (and no config key) will make the cli raise - ["journal-client", "--stop-after-objects", "1", "--object-type", "content"], + [ + "journal-client", + "--stop-after-objects", + "1", + "--object-type", + "content", + "keys", + ], journal_config, - redis_host="localhost", + redis_host=local_redis_host, ) assert mock.call_count == 1 -def test__journal_client__missing_main_journal_config_key(): +def test__journal_client__missing_main_journal_config_key(local_redis_host): """Missing configuration on journal should raise""" with pytest.raises(KeyError, match="journal"): invoke( catch_exceptions=False, - args=["journal-client", "--stop-after-objects", "1",], + args=["journal-client", "--stop-after-objects", "1", "messages"], config="", # missing config will make it raise - redis_host=None, + redis_host=local_redis_host, ) -def test__journal_client__missing_journal_config_keys(): +def test__journal_client__missing_journal_config_keys(local_redis_host): """Missing configuration on mandatory journal keys should raise""" kafka_prefix = "swh.journal.objects" journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( @@ -106,13 +112,14 @@ kafka_prefix, "--object-type", "content", + "keys", ], config=yaml_cfg, # incomplete config will make the cli raise - redis_host=None, + redis_host=local_redis_host, ) -def test__journal_client__missing_prefix_config_key(kafka_server): +def
test__journal_client__missing_prefix_config_key(kafka_server, local_redis_host): """Missing configuration on mandatory prefix key should raise""" journal_cfg_template = """ @@ -136,13 +143,16 @@ "1", "--object-type", "content", + "messages", ], journal_cfg, - redis_host=None, + redis_host=local_redis_host, ) -def test__journal_client__missing_object_types_config_key(kafka_server): +def test__journal_client__missing_object_types_config_key( + kafka_server, local_redis_host +): """Missing configuration on mandatory object-types key should raise""" journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( @@ -153,15 +163,23 @@ invoke( False, # Missing --object-types (and no config key) will make the cli raise - ["journal-client", "--stop-after-objects", "1",], + ["journal-client", "--stop-after-objects", "1", "keys"], journal_cfg, - redis_host=None, + redis_host=local_redis_host, ) -def test__journal_client__key_received(mocker, kafka_server): - mock = mocker.patch("swh.counters.journal_client.process_journal_messages") - mock.return_value = 1 +@pytest.mark.parametrize( + "message_handling, worker_fn", + [ + ("keys", "swh.counters.journal_client.process_journal_messages_by_keys"), + ("messages", "swh.counters.journal_client.process_journal_messages"), + ], +) +def test__journal_client__key_received( + mocker, kafka_server, local_redis_host, message_handling, worker_fn +): + mock = mocker.patch(worker_fn) prefix = "swh.journal.objects" object_type = "content" @@ -193,16 +211,44 @@ object_type, "--prefix", prefix, + message_handling, ], journal_cfg, - redis_host=None, + redis_host=local_redis_host, ) # Check the output expected_output = "Processed 1 messages.\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output + assert mock.called assert mock.call_args[0][0]["content"] assert len(mock.call_args[0][0]) == 1 assert object_type in mock.call_args[0][0].keys() - assert key == mock.call_args[0][0][object_type][0] + # only the keys-oriented client passes the raw message keys to the worker + if message_handling == "keys": + assert key == mock.call_args[0][0][object_type][0] + + +def
test__journal_client__no_journal_type_argument_should_raise( + kafka_server, local_redis_host +): + journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( + broker=kafka_server, prefix="prefix", group_id="test-consumer" + ) + + with pytest.raises(SystemExit): + invoke( + False, + [ + "journal-client", + "--stop-after-objects", + "1", + "--object-type", + "object_type", + "--prefix", + "prefix", + ], + journal_cfg, + redis_host=local_redis_host, + ) diff --git a/swh/counters/tests/test_journal_client.py b/swh/counters/tests/test_journal_client.py --- a/swh/counters/tests/test_journal_client.py +++ b/swh/counters/tests/test_journal_client.py @@ -3,8 +3,107 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.counters.journal_client import process_journal_messages +from typing import Dict, Optional + +import pytest + +from swh.counters.journal_client import ( + process_journal_messages, + process_journal_messages_by_keys, + process_releases, + process_revisions, +) from swh.counters.redis import Redis +from swh.model.hashutil import hash_to_bytes +from swh.model.model import ( + ObjectType, + Person, + Release, + Revision, + RevisionType, + Timestamp, + TimestampWithTimezone, +) + +PROCESSING_METHODS = { + "release": "swh.counters.journal_client.process_releases", + "revision": "swh.counters.journal_client.process_revisions", +} + +DATE = TimestampWithTimezone( + timestamp=Timestamp(seconds=0, microseconds=0), offset=0, negative_utc=False +) + + +def _get_processing_method_mocks(mocker): + return { + message_type: mocker.patch(PROCESSING_METHODS[message_type]) + for message_type in PROCESSING_METHODS.keys() + } + + +def _create_release(author_fullname: Optional[str]) -> Dict: + """Use Release.to_dict to be sure the field's name used to retrieve + the author is correct""" + + author = None + if author_fullname: + author = Person(fullname=bytes(author_fullname, "utf-8"), name=None,
email=None) + + release = Release( + id=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"), + name=b"Release", + message=b"Message", + target=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"), + target_type=ObjectType.CONTENT, + synthetic=True, + author=author, + ) + + return release.to_dict() + + +def _create_revision(author_fullname: str, committer_fullname: str) -> Dict: + """Use Revision.to_dict to be sure the names of the fields used to retrieve + the author and the committer are correct""" + revision = Revision( + committer_date=DATE, + date=None, + type=RevisionType.GIT, + parents=(), + directory=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"), + synthetic=True, + message=None, + author=Person(fullname=bytes(author_fullname, "utf-8"), name=None, email=None), + committer=Person( + fullname=bytes(committer_fullname, "utf-8"), name=None, email=None + ), + ) + + return revision.to_dict() + + +RELEASES = [ + _create_release(author_fullname="author 1"), + _create_release(author_fullname="author 2"), + _create_release(author_fullname=None), +] + + +RELEASES_AUTHOR_FULLNAMES = {b"author 1", b"author 2"} + + +REVISIONS = [ + _create_revision(author_fullname="author 1", committer_fullname="committer 1"), + _create_revision(author_fullname="author 2", committer_fullname="committer 2"), + _create_revision(author_fullname="author 2", committer_fullname="committer 1"), + _create_revision(author_fullname="author 1", committer_fullname="committer 2"), +] + + +REVISIONS_AUTHOR_FULLNAMES = {b"author 1", b"author 2"} +REVISIONS_COMMITTER_FULLNAMES = {b"committer 1", b"committer 2"} +REVISIONS_PERSON_FULLNAMES = REVISIONS_AUTHOR_FULLNAMES | REVISIONS_COMMITTER_FULLNAMES def test__journal_client__all_keys(mocker): @@ -15,7 +114,7 @@ keys = {"coll1": [b"key1", b"key2"], "coll2": [b"key3", b"key4", b"key5"]} - process_journal_messages(messages=keys, counters=redis) + process_journal_messages_by_keys(messages=keys, counters=redis) assert mock.call_count
== 2 @@ -26,3 +125,73 @@ second_call_args = mock.call_args_list[1] assert second_call_args[0][0] == "coll2" assert second_call_args[0][1] == keys["coll2"] + + +def test__journal_client__unsupported_messages_do_nothing(mocker): + mocks = _get_processing_method_mocks(mocker) + + messages = {"fake_type": [{"keys": "value"}]} + + process_journal_messages(messages=messages, counters=None) + + for mocked_method in mocks.keys(): + assert not mocks[mocked_method].called + + +@pytest.mark.parametrize("message_type", ["release", "revision"]) +def test__journal_client__right_method_per_message_type_is_called(mocker, message_type): + mocks = _get_processing_method_mocks(mocker) + + messages = {message_type: [{"k1": "v1"}, {"k2": "v2"}]} + + process_journal_messages(messages=messages, counters=None) + + for mocked_method in mocks.keys(): + if mocked_method == message_type: + assert mocks[mocked_method].call_count == 1 + else: + assert not mocks[mocked_method].called + + +def test__journal_client_process_revisions(mocker): + mock = mocker.patch("swh.counters.redis.Redis.add") + + redis = Redis(host="localhost") + + process_revisions(REVISIONS, redis) + + assert mock.call_count == 1 + first_call_args = mock.call_args_list[0] + assert first_call_args[0][0] == "person" + assert sorted(first_call_args[0][1]) == sorted(REVISIONS_PERSON_FULLNAMES) + + +def test__journal_client_process_releases(mocker): + mock = mocker.patch("swh.counters.redis.Redis.add") + + redis = Redis(host="localhost") + + process_releases(RELEASES, redis) + + assert mock.call_count == 1 + first_call_args = mock.call_args_list[0] + assert first_call_args[0][0] == "person" + assert sorted(first_call_args[0][1]) == sorted(RELEASES_AUTHOR_FULLNAMES) + + +def test__journal_client_process_releases_without_authors(mocker): + mock = mocker.patch("swh.counters.redis.Redis.add") + + releases = [ + _create_release(author_fullname=None), + _create_release(author_fullname=None), + ] + + redis = Redis(host="localhost") + +
process_releases(releases, redis) + + assert mock.call_count == 1 + first_call_args = mock.call_args_list[0] + assert first_call_args[0][0] == "person" + assert first_call_args[0][1] == []