Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9313089
D5572.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
15 KB
Subscribers
None
D5572.diff
View Options
diff --git a/swh/counters/cli.py b/swh/counters/cli.py
--- a/swh/counters/cli.py
+++ b/swh/counters/cli.py
@@ -45,14 +45,26 @@
@click.option(
"--prefix", "-p", help="Topic prefix to use (e.g swh.journal.objects)",
)
+@click.argument(
+ "journal_type", type=click.Choice(["keys", "messages"],),
+)
@click.pass_context
-def journal_client(ctx, stop_after_objects, object_type, prefix):
+def journal_client(ctx, stop_after_objects, object_type, prefix, journal_type):
"""Listens for new messages from the SWH Journal, and count them
+ If the 'journal_type' argument is 'keys', it will only count the distinct
+ keys for each listened topic; if it's 'messages', the messages
+ are deserialized to be able to count the distinct values
+ of internal properties of the objects.
"""
import functools
+ from swh.journal.client import get_journal_client
+
from . import get_counters
- from .journal_client import process_journal_messages
+ from .journal_client import (
+ process_journal_messages,
+ process_journal_messages_by_keys,
+ )
config = ctx.obj["config"]
journal_cfg = config["journal"]
@@ -69,11 +81,20 @@
if journal_cfg["prefix"] is None:
raise ValueError("'prefix' must be specified by cli or configuration")
- client = KeyOrientedJournalClient(**journal_cfg,)
-
counters = get_counters(**config["counters"])
- worker_fn = functools.partial(process_journal_messages, counters=counters,)
+ if journal_type == "keys":
+ client = KeyOrientedJournalClient(**journal_cfg,)
+ worker_fn = functools.partial(
+ process_journal_messages_by_keys, counters=counters,
+ )
+ elif journal_type == "messages":
+ client = get_journal_client(cls="kafka", **journal_cfg,)
+ worker_fn = functools.partial(process_journal_messages, counters=counters,)
+
+ assert client is not None
+ assert worker_fn is not None
+
nb_messages = 0
try:
nb_messages = client.process(worker_fn)
diff --git a/swh/counters/journal_client.py b/swh/counters/journal_client.py
--- a/swh/counters/journal_client.py
+++ b/swh/counters/journal_client.py
@@ -8,10 +8,48 @@
from swh.counters.redis import Redis
-def process_journal_messages(
+def process_journal_messages_by_keys(
messages: Dict[str, Iterable[Any]], *, counters: Redis
) -> None:
- """Worker function for `JournalClient.process(worker_fn)`"""
+ """Count the number of different keys for a given message type"""
for key in messages.keys():
counters.add(key, messages[key])
+
+
+def process_journal_messages(
+ messages: Dict[str, Iterable[Any]], *, counters: Redis
+) -> None:
+ """Count the number of different values of an object's property.
+ It allows, for example, counting the persons inside the
+ Release (authors) and Revision (authors and committers) classes
+ """
+
+ if "revision" in messages:
+ process_revisions(messages["revision"], counters)
+
+ if "release" in messages:
+ process_releases(messages["release"], counters)
+
+
+def process_revisions(revisions: Iterable[Dict], counters: Redis):
+ """Count the number of different authors and committers on the
+ revisions (in the person collection)"""
+ persons = set()
+ for revision in revisions:
+ persons.add(revision["author"]["fullname"])
+ persons.add(revision["committer"]["fullname"])
+
+ counters.add("person", list(persons))
+
+
+def process_releases(releases: Iterable[Dict], counters: Redis):
+ """Count the number of different authors on the
+ releases (in the person collection)"""
+ persons = set()
+ for release in releases:
+ author = release.get("author")
+ if author and "fullname" in author:
+ persons.add(author["fullname"])
+
+ counters.add("person", list(persons))
diff --git a/swh/counters/tests/conftest.py b/swh/counters/tests/conftest.py
--- a/swh/counters/tests/conftest.py
+++ b/swh/counters/tests/conftest.py
@@ -33,3 +33,8 @@
# Cleanup redis between 2 tests
rc = RedisClient(host=redis_proc.host, port=redis_proc.port)
rc.flushall()
+
+
+@pytest.fixture
+def local_redis_host(local_redis):
+ return f"{local_redis.host}:{local_redis.port}"
diff --git a/swh/counters/tests/test_cli.py b/swh/counters/tests/test_cli.py
--- a/swh/counters/tests/test_cli.py
+++ b/swh/counters/tests/test_cli.py
@@ -42,10 +42,9 @@
def test__journal_client__worker_function_invoked(
- mocker, kafka_server, kafka_prefix, journal_config
+ mocker, kafka_server, kafka_prefix, journal_config, local_redis_host
):
- mock = mocker.patch("swh.counters.journal_client.process_journal_messages")
- mock.return_value = 1
+ mock = mocker.patch("swh.counters.journal_client.process_journal_messages_by_keys")
producer = Producer(
{
@@ -61,26 +60,33 @@
invoke(
False,
# Missing --object-types (and no config key) will make the cli raise
- ["journal-client", "--stop-after-objects", "1", "--object-type", "content"],
+ [
+ "journal-client",
+ "--stop-after-objects",
+ "1",
+ "--object-type",
+ "content",
+ "keys",
+ ],
journal_config,
- redis_host="localhost",
+ redis_host=local_redis_host,
)
assert mock.call_count == 1
-def test__journal_client__missing_main_journal_config_key():
+def test__journal_client__missing_main_journal_config_key(local_redis_host):
"""Missing configuration on journal should raise"""
with pytest.raises(KeyError, match="journal"):
invoke(
catch_exceptions=False,
- args=["journal-client", "--stop-after-objects", "1",],
+ args=["journal-client", "--stop-after-objects", "1", "messages"],
config="", # missing config will make it raise
- redis_host=None,
+ redis_host=local_redis_host,
)
-def test__journal_client__missing_journal_config_keys():
+def test__journal_client__missing_journal_config_keys(local_redis_host):
"""Missing configuration on mandatory journal keys should raise"""
kafka_prefix = "swh.journal.objects"
journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
@@ -106,13 +112,14 @@
kafka_prefix,
"--object-type",
"content",
+ "keys",
],
config=yaml_cfg, # incomplete config will make the cli raise
- redis_host=None,
+ redis_host=local_redis_host,
)
-def test__journal_client__missing_prefix_config_key(kafka_server):
+def test__journal_client__missing_prefix_config_key(kafka_server, local_redis_host):
"""Missing configuration on mandatory prefix key should raise"""
journal_cfg_template = """
@@ -136,13 +143,16 @@
"1",
"--object-type",
"content",
+ "messages",
],
journal_cfg,
- redis_host=None,
+ redis_host=local_redis_host,
)
-def test__journal_client__missing_object_types_config_key(kafka_server):
+def test__journal_client__missing_object_types_config_key(
+ kafka_server, local_redis_host
+):
"""Missing configuration on mandatory object-types key should raise"""
journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
@@ -153,15 +163,23 @@
invoke(
False,
# Missing --object-types (and no config key) will make the cli raise
- ["journal-client", "--stop-after-objects", "1",],
+ ["journal-client", "--stop-after-objects", "1", "keys"],
journal_cfg,
- redis_host=None,
+ redis_host=local_redis_host,
)
-def test__journal_client__key_received(mocker, kafka_server):
- mock = mocker.patch("swh.counters.journal_client.process_journal_messages")
- mock.return_value = 1
+@pytest.mark.parametrize(
+ "message_handling, worker_fn",
+ [
+ ("keys", "swh.counters.journal_client.process_journal_messages_by_keys"),
+ ("messages", "swh.counters.journal_client.process_journal_messages"),
+ ],
+)
+def test__journal_client__key_received(
+ mocker, kafka_server, local_redis_host, message_handling, worker_fn
+):
+ mock = mocker.patch(worker_fn)
prefix = "swh.journal.objects"
object_type = "content"
@@ -193,16 +211,41 @@
object_type,
"--prefix",
prefix,
+ message_handling,
],
journal_cfg,
- redis_host=None,
+ redis_host=local_redis_host,
)
# Check the output
expected_output = "Processed 1 messages.\nDone.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
+ assert mock.called
assert mock.call_args[0][0]["content"]
assert len(mock.call_args[0][0]) == 1
assert object_type in mock.call_args[0][0].keys()
- assert key == mock.call_args[0][0][object_type][0]
+
+
+def test__journal_client__no_journal_type_argument_should_raise(
+ kafka_server, local_redis_host
+):
+ journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
+ broker=kafka_server, prefix="prefix", group_id="test-consumer"
+ )
+
+ with pytest.raises(SystemExit):
+ invoke(
+ False,
+ [
+ "journal-client",
+ "--stop-after-objects",
+ "1",
+ "--object-type",
+ "object_type",
+ "--prefix",
+ "prefix",
+ ],
+ journal_cfg,
+ redis_host=local_redis_host,
+ )
diff --git a/swh/counters/tests/test_journal_client.py b/swh/counters/tests/test_journal_client.py
--- a/swh/counters/tests/test_journal_client.py
+++ b/swh/counters/tests/test_journal_client.py
@@ -3,8 +3,107 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from swh.counters.journal_client import process_journal_messages
+from typing import Dict, Optional
+
+import pytest
+
+from swh.counters.journal_client import (
+ process_journal_messages,
+ process_journal_messages_by_keys,
+ process_releases,
+ process_revisions,
+)
from swh.counters.redis import Redis
+from swh.model.hashutil import hash_to_bytes
+from swh.model.model import (
+ ObjectType,
+ Person,
+ Release,
+ Revision,
+ RevisionType,
+ Timestamp,
+ TimestampWithTimezone,
+)
+
+PROCESSING_METHODS = {
+ "release": "swh.counters.journal_client.process_releases",
+ "revision": "swh.counters.journal_client.process_revisions",
+}
+
+DATE = TimestampWithTimezone(
+ timestamp=Timestamp(seconds=0, microseconds=0), offset=0, negative_utc=False
+)
+
+
+def _get_processing_method_mocks(mocker):
+ return {
+ message_type: mocker.patch(PROCESSING_METHODS[message_type])
+ for message_type in PROCESSING_METHODS.keys()
+ }
+
+
+def _create_release(author_fullname: Optional[str]) -> Dict:
+ """Use Release.to_dict to be sure the field's name used to retrieve
+ the author is correct"""
+
+ author = None
+ if author_fullname:
+ author = Person(fullname=bytes(author_fullname, "utf-8"), name=None, email=None)
+
+ release = Release(
+ id=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"),
+ name=b"Release",
+ message=b"Message",
+ target=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"),
+ target_type=ObjectType.CONTENT,
+ synthetic=True,
+ author=author,
+ )
+
+ return release.to_dict()
+
+
+def _create_revision(author_fullname: str, committer_fullname: str) -> Revision:
+ """Use Revision.to_dict to be sure the names of the fields used to retrieve
+ the author and the committer are correct"""
+ revision = Revision(
+ committer_date=DATE,
+ date=None,
+ type=RevisionType.GIT,
+ parents=(),
+ directory=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"),
+ synthetic=True,
+ message=None,
+ author=Person(fullname=bytes(author_fullname, "utf-8"), name=None, email=None),
+ committer=Person(
+ fullname=bytes(committer_fullname, "utf-8"), name=None, email=None
+ ),
+ )
+
+ return revision.to_dict()
+
+
+RELEASES = [
+ _create_release(author_fullname="author 1"),
+ _create_release(author_fullname="author 2"),
+ _create_release(author_fullname=None),
+]
+
+
+RELEASES_AUTHOR_FULLNAMES = {b"author 1", b"author 2"}
+
+
+REVISIONS = [
+ _create_revision(author_fullname="author 1", committer_fullname="committer 1"),
+ _create_revision(author_fullname="author 2", committer_fullname="committer 2"),
+ _create_revision(author_fullname="author 2", committer_fullname="committer 1"),
+ _create_revision(author_fullname="author 1", committer_fullname="committer 2"),
+]
+
+
+REVISIONS_AUTHOR_FULLNAMES = {b"author 1", b"author 2"}
+REVISIONS_COMMITTER_FULLNAMES = {b"committer 1", b"committer 2"}
+REVISIONS_PERSON_FULLNAMES = REVISIONS_AUTHOR_FULLNAMES | REVISIONS_COMMITTER_FULLNAMES
def test__journal_client__all_keys(mocker):
@@ -15,7 +114,7 @@
keys = {"coll1": [b"key1", b"key2"], "coll2": [b"key3", b"key4", b"key5"]}
- process_journal_messages(messages=keys, counters=redis)
+ process_journal_messages_by_keys(messages=keys, counters=redis)
assert mock.call_count == 2
@@ -26,3 +125,73 @@
second_call_args = mock.call_args_list[1]
assert second_call_args[0][0] == "coll2"
assert second_call_args[0][1] == keys["coll2"]
+
+
+def test__journal_client__unsupported_messages_do_nothin(mocker):
+ mocks = _get_processing_method_mocks(mocker)
+
+ messages = {"fake_type": [{"keys": "value"}]}
+
+ process_journal_messages(messages=messages, counters=None)
+
+ for mocked_method in mocks.keys():
+ assert not mocks[mocked_method].called
+
+
+@pytest.mark.parametrize("message_type", ["release", "revision"])
+def test__journal_client__right_method_per_message_type_is_called(mocker, message_type):
+ mocks = _get_processing_method_mocks(mocker)
+
+ messages = {message_type: [{"k1": "v1"}, {"k2": "v2"}]}
+
+ process_journal_messages(messages=messages, counters=None)
+
+ for mocked_method in mocks.keys():
+ if mocked_method == message_type:
+ assert mocks[mocked_method].call_count == 1
+ else:
+ assert not mocks[mocked_method].called
+
+
+def test__journal_client_process_revisions(mocker):
+ mock = mocker.patch("swh.counters.redis.Redis.add")
+
+ redis = Redis(host="localhost")
+
+ process_revisions(REVISIONS, redis)
+
+ assert mock.call_count == 1
+ first_call_args = mock.call_args_list[0]
+ assert first_call_args[0][0] == "person"
+ assert first_call_args[0][1] == list(REVISIONS_PERSON_FULLNAMES)
+
+
+def test__journal_client_process_releases(mocker):
+ mock = mocker.patch("swh.counters.redis.Redis.add")
+
+ redis = Redis(host="localhost")
+
+ process_releases(RELEASES, redis)
+
+ assert mock.call_count == 1
+ first_call_args = mock.call_args_list[0]
+ assert first_call_args[0][0] == "person"
+ assert first_call_args[0][1] == list(RELEASES_AUTHOR_FULLNAMES)
+
+
+def test__journal_client_process_releases_without_authors(mocker):
+ mock = mocker.patch("swh.counters.redis.Redis.add")
+
+ releases = [
+ _create_release(author_fullname=None),
+ _create_release(author_fullname=None),
+ ]
+
+ redis = Redis(host="localhost")
+
+ process_releases(releases, redis)
+
+ assert mock.called == 1
+ first_call_args = mock.call_args_list[0]
+ assert first_call_args[0][0] == "person"
+ assert first_call_args[0][1] == []
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Wed, Jul 2, 11:23 AM (1 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3220369
Attached To
D5572: Implement the journal client counting an internal property of an object
Event Timeline
Log In to Comment