Page MenuHomeSoftware Heritage

D5572.diff
No OneTemporary

D5572.diff

diff --git a/swh/counters/cli.py b/swh/counters/cli.py
--- a/swh/counters/cli.py
+++ b/swh/counters/cli.py
@@ -45,14 +45,26 @@
@click.option(
"--prefix", "-p", help="Topic prefix to use (e.g swh.journal.objects)",
)
+@click.argument(
+ "journal_type", type=click.Choice(["keys", "messages"],),
+)
@click.pass_context
-def journal_client(ctx, stop_after_objects, object_type, prefix):
+def journal_client(ctx, stop_after_objects, object_type, prefix, journal_type):
"""Listens for new messages from the SWH Journal, and count them
+ if the 'journal_type' argument is 'keys', it will only count the distinct
+ keys for each listened topic, if it's 'messages', the messages
+ are deserialized to be able to count the distinct values
+ of internal properties of the objects.
`"""
import functools
+ from swh.journal.client import get_journal_client
+
from . import get_counters
- from .journal_client import process_journal_messages
+ from .journal_client import (
+ process_journal_messages,
+ process_journal_messages_by_keys,
+ )
config = ctx.obj["config"]
journal_cfg = config["journal"]
@@ -69,11 +81,20 @@
if journal_cfg["prefix"] is None:
raise ValueError("'prefix' must be specified by cli or configuration")
- client = KeyOrientedJournalClient(**journal_cfg,)
-
counters = get_counters(**config["counters"])
- worker_fn = functools.partial(process_journal_messages, counters=counters,)
+    if journal_type == "keys":
+        client = KeyOrientedJournalClient(**journal_cfg,)
+        worker_fn = functools.partial(
+            process_journal_messages_by_keys, counters=counters,
+        )
+    elif journal_type == "messages":
+        client = get_journal_client(cls="kafka", **journal_cfg,)
+        worker_fn = functools.partial(process_journal_messages, counters=counters,)
+    else:
+        # click.Choice already restricts CLI values; this guards direct callers,
+        # which previously hit an unbound `client` (asserts are also -O stripped)
+        raise ValueError(f"Unknown journal_type: {journal_type!r}")
+
nb_messages = 0
try:
nb_messages = client.process(worker_fn)
diff --git a/swh/counters/journal_client.py b/swh/counters/journal_client.py
--- a/swh/counters/journal_client.py
+++ b/swh/counters/journal_client.py
@@ -8,10 +8,48 @@
from swh.counters.redis import Redis
-def process_journal_messages(
+def process_journal_messages_by_keys(
messages: Dict[str, Iterable[Any]], *, counters: Redis
) -> None:
- """Worker function for `JournalClient.process(worker_fn)`"""
+ """Count the number of different keys for a given message type"""
for key in messages.keys():
counters.add(key, messages[key])
+
+
+def process_journal_messages(
+    messages: Dict[str, Iterable[Any]], *, counters: Redis
+) -> None:
+    """Count the number of different values of an object's property.
+
+    For example, it counts the distinct persons found in Release (authors)
+    and Revision (authors and committers) messages. Message types other
+    than "revision" and "release" are ignored.
+    """
+
+    if "revision" in messages:
+        process_revisions(messages["revision"], counters)
+
+    if "release" in messages:
+        process_releases(messages["release"], counters)
+
+def process_revisions(revisions: Iterable[Dict], counters: Redis) -> None:
+    """Count the number of different authors and committers on the
+    revisions (in the person collection).
+
+    Like process_releases, persons that are missing or have no fullname
+    are skipped instead of aborting the whole batch.
+    """
+    persons = set()
+    for revision in revisions:
+        for role in ("author", "committer"):
+            person = revision.get(role)
+            if person and "fullname" in person:
+                persons.add(person["fullname"])
+
+    counters.add("person", list(persons))
+
+
+def process_releases(releases: Iterable[Dict], counters: Redis) -> None:
+    """Count the number of different authors on the
+    releases (in the person collection).
+
+    Releases without an author (or without an author fullname) are skipped.
+    """
+    persons = set()
+    for release in releases:
+        author = release.get("author")
+        if author and "fullname" in author:
+            persons.add(author["fullname"])
+
+    counters.add("person", list(persons))
diff --git a/swh/counters/tests/conftest.py b/swh/counters/tests/conftest.py
--- a/swh/counters/tests/conftest.py
+++ b/swh/counters/tests/conftest.py
@@ -33,3 +33,8 @@
# Cleanup redis between 2 tests
rc = RedisClient(host=redis_proc.host, port=redis_proc.port)
rc.flushall()
+
+
+@pytest.fixture
+def local_redis_host(local_redis):
+ return f"{local_redis.host}:{local_redis.port}"
diff --git a/swh/counters/tests/test_cli.py b/swh/counters/tests/test_cli.py
--- a/swh/counters/tests/test_cli.py
+++ b/swh/counters/tests/test_cli.py
@@ -42,10 +42,9 @@
def test__journal_client__worker_function_invoked(
- mocker, kafka_server, kafka_prefix, journal_config
+ mocker, kafka_server, kafka_prefix, journal_config, local_redis_host
):
- mock = mocker.patch("swh.counters.journal_client.process_journal_messages")
- mock.return_value = 1
+ mock = mocker.patch("swh.counters.journal_client.process_journal_messages_by_keys")
producer = Producer(
{
@@ -61,26 +60,33 @@
invoke(
False,
# Missing --object-types (and no config key) will make the cli raise
- ["journal-client", "--stop-after-objects", "1", "--object-type", "content"],
+ [
+ "journal-client",
+ "--stop-after-objects",
+ "1",
+ "--object-type",
+ "content",
+ "keys",
+ ],
journal_config,
- redis_host="localhost",
+ redis_host=local_redis_host,
)
assert mock.call_count == 1
-def test__journal_client__missing_main_journal_config_key():
+def test__journal_client__missing_main_journal_config_key(local_redis_host):
"""Missing configuration on journal should raise"""
with pytest.raises(KeyError, match="journal"):
invoke(
catch_exceptions=False,
- args=["journal-client", "--stop-after-objects", "1",],
+ args=["journal-client", "--stop-after-objects", "1", "messages"],
config="", # missing config will make it raise
- redis_host=None,
+ redis_host=local_redis_host,
)
-def test__journal_client__missing_journal_config_keys():
+def test__journal_client__missing_journal_config_keys(local_redis_host):
"""Missing configuration on mandatory journal keys should raise"""
kafka_prefix = "swh.journal.objects"
journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
@@ -106,13 +112,14 @@
kafka_prefix,
"--object-type",
"content",
+ "keys",
],
config=yaml_cfg, # incomplete config will make the cli raise
- redis_host=None,
+ redis_host=local_redis_host,
)
-def test__journal_client__missing_prefix_config_key(kafka_server):
+def test__journal_client__missing_prefix_config_key(kafka_server, local_redis_host):
"""Missing configuration on mandatory prefix key should raise"""
journal_cfg_template = """
@@ -136,13 +143,16 @@
"1",
"--object-type",
"content",
+ "messages",
],
journal_cfg,
- redis_host=None,
+ redis_host=local_redis_host,
)
-def test__journal_client__missing_object_types_config_key(kafka_server):
+def test__journal_client__missing_object_types_config_key(
+ kafka_server, local_redis_host
+):
"""Missing configuration on mandatory object-types key should raise"""
journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
@@ -153,15 +163,23 @@
invoke(
False,
# Missing --object-types (and no config key) will make the cli raise
- ["journal-client", "--stop-after-objects", "1",],
+ ["journal-client", "--stop-after-objects", "1", "keys"],
journal_cfg,
- redis_host=None,
+ redis_host=local_redis_host,
)
-def test__journal_client__key_received(mocker, kafka_server):
- mock = mocker.patch("swh.counters.journal_client.process_journal_messages")
- mock.return_value = 1
+@pytest.mark.parametrize(
+ "message_handling, worker_fn",
+ [
+ ("keys", "swh.counters.journal_client.process_journal_messages_by_keys"),
+ ("messages", "swh.counters.journal_client.process_journal_messages"),
+ ],
+)
+def test__journal_client__key_received(
+ mocker, kafka_server, local_redis_host, message_handling, worker_fn
+):
+ mock = mocker.patch(worker_fn)
prefix = "swh.journal.objects"
object_type = "content"
@@ -193,16 +211,41 @@
object_type,
"--prefix",
prefix,
+ message_handling,
],
journal_cfg,
- redis_host=None,
+ redis_host=local_redis_host,
)
# Check the output
expected_output = "Processed 1 messages.\nDone.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
+ assert mock.called
assert mock.call_args[0][0]["content"]
assert len(mock.call_args[0][0]) == 1
assert object_type in mock.call_args[0][0].keys()
- assert key == mock.call_args[0][0][object_type][0]
+
+
+def test__journal_client__no_journal_type_argument_should_raise(
+ kafka_server, local_redis_host
+):
+ journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+ broker=kafka_server, prefix="prefix", group_id="test-consumer"
+ )
+
+ with pytest.raises(SystemExit):
+ invoke(
+ False,
+ [
+ "journal-client",
+ "--stop-after-objects",
+ "1",
+ "--object-type",
+ "object_type",
+ "--prefix",
+ "prefix",
+ ],
+ journal_cfg,
+ redis_host=local_redis_host,
+ )
diff --git a/swh/counters/tests/test_journal_client.py b/swh/counters/tests/test_journal_client.py
--- a/swh/counters/tests/test_journal_client.py
+++ b/swh/counters/tests/test_journal_client.py
@@ -3,8 +3,107 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from swh.counters.journal_client import process_journal_messages
+from typing import Dict, Optional
+
+import pytest
+
+from swh.counters.journal_client import (
+ process_journal_messages,
+ process_journal_messages_by_keys,
+ process_releases,
+ process_revisions,
+)
from swh.counters.redis import Redis
+from swh.model.hashutil import hash_to_bytes
+from swh.model.model import (
+ ObjectType,
+ Person,
+ Release,
+ Revision,
+ RevisionType,
+ Timestamp,
+ TimestampWithTimezone,
+)
+
+PROCESSING_METHODS = {
+ "release": "swh.counters.journal_client.process_releases",
+ "revision": "swh.counters.journal_client.process_revisions",
+}
+
+DATE = TimestampWithTimezone(
+ timestamp=Timestamp(seconds=0, microseconds=0), offset=0, negative_utc=False
+)
+
+
+def _get_processing_method_mocks(mocker):
+ return {
+ message_type: mocker.patch(PROCESSING_METHODS[message_type])
+ for message_type in PROCESSING_METHODS.keys()
+ }
+
+
+def _create_release(author_fullname: Optional[str]) -> Dict:
+ """Use Release.to_dict to be sure the field's name used to retrieve
+ the author is correct"""
+
+ author = None
+ if author_fullname:
+ author = Person(fullname=bytes(author_fullname, "utf-8"), name=None, email=None)
+
+ release = Release(
+ id=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"),
+ name=b"Release",
+ message=b"Message",
+ target=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"),
+ target_type=ObjectType.CONTENT,
+ synthetic=True,
+ author=author,
+ )
+
+ return release.to_dict()
+
+
+def _create_revision(author_fullname: str, committer_fullname: str) -> Dict:
+    """Use Revision.to_dict to be sure the names of the fields used to retrieve
+    the author and the committer are correct"""
+    revision = Revision(
+        committer_date=DATE,
+        date=None,
+        type=RevisionType.GIT,
+        parents=(),
+        directory=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"),
+        synthetic=True,
+        message=None,
+        author=Person(fullname=bytes(author_fullname, "utf-8"), name=None, email=None),
+        committer=Person(
+            fullname=bytes(committer_fullname, "utf-8"), name=None, email=None
+        ),
+    )
+
+    return revision.to_dict()
+
+
+RELEASES = [
+ _create_release(author_fullname="author 1"),
+ _create_release(author_fullname="author 2"),
+ _create_release(author_fullname=None),
+]
+
+
+RELEASES_AUTHOR_FULLNAMES = {b"author 1", b"author 2"}
+
+
+REVISIONS = [
+ _create_revision(author_fullname="author 1", committer_fullname="committer 1"),
+ _create_revision(author_fullname="author 2", committer_fullname="committer 2"),
+ _create_revision(author_fullname="author 2", committer_fullname="committer 1"),
+ _create_revision(author_fullname="author 1", committer_fullname="committer 2"),
+]
+
+
+REVISIONS_AUTHOR_FULLNAMES = {b"author 1", b"author 2"}
+REVISIONS_COMMITTER_FULLNAMES = {b"committer 1", b"committer 2"}
+REVISIONS_PERSON_FULLNAMES = REVISIONS_AUTHOR_FULLNAMES | REVISIONS_COMMITTER_FULLNAMES
def test__journal_client__all_keys(mocker):
@@ -15,7 +114,7 @@
keys = {"coll1": [b"key1", b"key2"], "coll2": [b"key3", b"key4", b"key5"]}
- process_journal_messages(messages=keys, counters=redis)
+ process_journal_messages_by_keys(messages=keys, counters=redis)
assert mock.call_count == 2
@@ -26,3 +125,73 @@
second_call_args = mock.call_args_list[1]
assert second_call_args[0][0] == "coll2"
assert second_call_args[0][1] == keys["coll2"]
+
+
+def test__journal_client__unsupported_messages_do_nothing(mocker):
+    mocks = _get_processing_method_mocks(mocker)
+
+    messages = {"fake_type": [{"keys": "value"}]}
+
+    process_journal_messages(messages=messages, counters=None)
+
+    for mocked_method in mocks.values():
+        assert not mocked_method.called
+
+
+@pytest.mark.parametrize("message_type", ["release", "revision"])
+def test__journal_client__right_method_per_message_type_is_called(mocker, message_type):
+ mocks = _get_processing_method_mocks(mocker)
+
+ messages = {message_type: [{"k1": "v1"}, {"k2": "v2"}]}
+
+ process_journal_messages(messages=messages, counters=None)
+
+ for mocked_method in mocks.keys():
+ if mocked_method == message_type:
+ assert mocks[mocked_method].call_count == 1
+ else:
+ assert not mocks[mocked_method].called
+
+
+def test__journal_client_process_revisions(mocker):
+    mock = mocker.patch("swh.counters.redis.Redis.add")
+
+    redis = Redis(host="localhost")
+
+    process_revisions(REVISIONS, redis)
+
+    assert mock.call_count == 1
+    first_call_args = mock.call_args_list[0]
+    assert first_call_args[0][0] == "person"
+    # list(set) order depends on hash randomization: compare contents only
+    assert sorted(first_call_args[0][1]) == sorted(REVISIONS_PERSON_FULLNAMES)
+
+
+def test__journal_client_process_releases(mocker):
+    mock = mocker.patch("swh.counters.redis.Redis.add")
+
+    redis = Redis(host="localhost")
+
+    process_releases(RELEASES, redis)
+
+    assert mock.call_count == 1
+    first_call_args = mock.call_args_list[0]
+    assert first_call_args[0][0] == "person"
+    # list(set) order depends on hash randomization: compare contents only
+    assert sorted(first_call_args[0][1]) == sorted(RELEASES_AUTHOR_FULLNAMES)
+
+
+def test__journal_client_process_releases_without_authors(mocker):
+    mock = mocker.patch("swh.counters.redis.Redis.add")
+
+    releases = [
+        _create_release(author_fullname=None),
+        _create_release(author_fullname=None),
+    ]
+
+    redis = Redis(host="localhost")
+
+    process_releases(releases, redis)
+
+    assert mock.call_count == 1
+    first_call_args = mock.call_args_list[0]
+    assert first_call_args[0][0] == "person"
+    assert first_call_args[0][1] == []

File Metadata

Mime Type
text/plain
Expires
Wed, Jul 2, 11:23 AM (1 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3220369

Event Timeline