diff --git a/mypy.ini b/mypy.ini index 5cc88a5..7b769c0 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,18 +1,21 @@ [mypy] namespace_packages = True warn_unused_ignores = True # 3rd party libraries without stubs (yet) [mypy-confluent_kafka.*] ignore_missing_imports = True [mypy-pkg_resources.*] ignore_missing_imports = True [mypy-pytest.*] ignore_missing_imports = True +[mypy-pytest_redis.*] +ignore_missing_imports = True + # [mypy-add_your_lib_here.*] # ignore_missing_imports = True diff --git a/requirements-test.txt b/requirements-test.txt index 2576dc1..d43a8d3 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,3 +1,4 @@ pytest pytest-mock confluent-kafka +pytest-redis diff --git a/requirements.txt b/requirements.txt index 1cde173..a9a9521 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html +redis diff --git a/swh/counters/journal_client.py b/swh/counters/journal_client.py index e5cb3cd..5c16c49 100644 --- a/swh/counters/journal_client.py +++ b/swh/counters/journal_client.py @@ -1,11 +1,17 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, Iterable +from swh.counters.redis import Redis -def process_journal_messages(messages: Dict[str, Iterable[Any]], *, counters): + +def process_journal_messages( + messages: Dict[str, Iterable[Any]], *, counters: Redis +) -> None: """Worker function for `JournalClient.process(worker_fn)`""" - pass + + for key in messages.keys(): + counters.add(key, messages[key]) diff --git a/swh/counters/redis.py b/swh/counters/redis.py index 8a5a5bd..beb1d64 100644 --- a/swh/counters/redis.py +++ b/swh/counters/redis.py @@ -1,20 +1,57 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import logging from typing import Any, Iterable +from redis.client import Redis as RedisClient +from redis.exceptions import ConnectionError + +DEFAULT_REDIS_PORT = 6379 + + +logger = logging.getLogger(__name__) + class Redis: + """Redis based implementation of the counters. + It uses one HyperLogLog collection per counter""" + + _redis_client = None + def __init__(self, host: str): - pass + host_port = host.split(":") + + if len(host_port) > 2: + raise ValueError("Invalid server url `%s`" % host) + + self.host = host_port[0] + self.port = int(host_port[1]) if len(host_port) > 1 else DEFAULT_REDIS_PORT + + @property + def redis_client(self) -> RedisClient: + + if self._redis_client is None: + self._redis_client = RedisClient(host=self.host, port=self.port) + + return self._redis_client def check(self): - pass + try: + return self.redis_client.ping() + except ConnectionError: + logger.exception("Unable to connect to the redis server") + return False def add(self, collection: str, keys: Iterable[Any]) -> None: - pass + redis = self.redis_client + pipeline = redis.pipeline(transaction=False) + + [pipeline.pfadd(collection, key) for key in keys] + + pipeline.execute() def get_count(self, collection: str) -> int: - pass + return self.redis_client.pfcount(collection) diff --git a/swh/counters/tests/test_cli.py b/swh/counters/tests/test_cli.py index 53f7c9d..88ed98f 100644 --- a/swh/counters/tests/test_cli.py +++ b/swh/counters/tests/test_cli.py @@ -1,209 +1,208 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import tempfile from click.testing import CliRunner from confluent_kafka import Producer import pytest import yaml from swh.counters.cli import counters_cli_group from swh.journal.serializers import value_to_kafka CLI_CONFIG = """ counters: cls: redis - host: - - %(redis_host)s + host: %(redis_host)s """ JOURNAL_OBJECTS_CONFIG_TEMPLATE = """ journal: brokers: - {broker} prefix: {prefix} group_id: {group_id} """ def invoke(catch_exceptions, args, config="", *, redis_host): runner = CliRunner() with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: config_fd.write((CLI_CONFIG + config) % {"redis_host": redis_host}) config_fd.seek(0) result = runner.invoke(counters_cli_group, ["-C" + config_fd.name] + args) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test__journal_client__worker_function_invoked( mocker, kafka_server, kafka_prefix, journal_config ): mock = mocker.patch("swh.counters.journal_client.process_journal_messages") mock.return_value = 1 producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test-producer", "acks": "all", } ) topic = f"{kafka_prefix}.content" value = value_to_kafka({"key": "value"}) producer.produce(topic=topic, key=b"message1", value=value) invoke( False, # Missing --object-types (and no config key) will make the cli raise ["journal-client", "--stop-after-objects", "1", "--object-type", "content"], journal_config, redis_host="localhost", ) assert mock.call_count == 1 def test__journal_client__missing_main_journal_config_key(): """Missing configuration on journal should raise""" with pytest.raises(KeyError, match="journal"): invoke( catch_exceptions=False, args=["journal-client", "--stop-after-objects", "1",], config="", # missing config will make it raise redis_host=None, ) def test__journal_client__missing_journal_config_keys(): """Missing configuration on mandatory journal keys should raise""" kafka_prefix = "swh.journal.objects" journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( broker="192.0.2.1", prefix=kafka_prefix, group_id="test-consumer" ) journal_config = yaml.safe_load(journal_objects_config) for key in journal_config["journal"].keys(): if key == "prefix": # optional continue cfg = copy.deepcopy(journal_config) del cfg["journal"][key] # make config incomplete yaml_cfg = yaml.dump(cfg) with pytest.raises(TypeError, match=f"{key}"): invoke( catch_exceptions=False, args=[ "journal-client", "--stop-after-objects", "1", "--prefix", kafka_prefix, "--object-type", "content", ], config=yaml_cfg, # incomplete config will make the cli raise redis_host=None, ) def test__journal_client__missing_prefix_config_key(kafka_server): """Missing configuration on mandatory prefix key should raise""" journal_cfg_template = """ journal: brokers: - {broker} group_id: {group_id} """ journal_cfg = journal_cfg_template.format( broker=kafka_server, group_id="test-consumer" ) with pytest.raises(ValueError, match="prefix"): invoke( False, # Missing --prefix (and no config key) will make the cli raise [ "journal-client", "--stop-after-objects", "1", "--object-type", "content", ], journal_cfg, redis_host=None, ) def test__journal_client__missing_object_types_config_key(kafka_server): """Missing configuration on mandatory object-types key should raise""" journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( broker=kafka_server, prefix="swh.journal.objects", group_id="test-consumer" ) with pytest.raises(ValueError, match="object_types"): invoke( False, # Missing --object-types (and no config key) will make the cli raise ["journal-client", "--stop-after-objects", "1",], journal_cfg, redis_host=None, ) def test__journal_client__key_received(mocker, kafka_server): mock = mocker.patch("swh.counters.journal_client.process_journal_messages") mock.return_value = 1 prefix = "swh.journal.objects" object_type = "content" topic = prefix + "." + object_type producer = Producer( {"bootstrap.servers": kafka_server, "client.id": "testproducer", "acks": "all",} ) value = value_to_kafka({"key": "value"}) key = b"message key" # Ensure empty messages are ignored producer.produce(topic=topic, key=b"emptymessage", value=None) producer.produce(topic=topic, key=key, value=value) producer.flush() journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( broker=kafka_server, prefix=prefix, group_id="test-consumer" ) result = invoke( False, [ "journal-client", "--stop-after-objects", "1", "--object-type", object_type, "--prefix", prefix, ], journal_cfg, redis_host=None, ) # Check the output expected_output = "Processed 1 messages.\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output assert mock.call_args[0][0]["content"] assert len(mock.call_args[0][0]) == 1 assert object_type in mock.call_args[0][0].keys() assert key == mock.call_args[0][0][object_type][0] diff --git a/swh/counters/tests/test_journal_client.py b/swh/counters/tests/test_journal_client.py new file mode 100644 index 0000000..806722d --- /dev/null +++ b/swh/counters/tests/test_journal_client.py @@ -0,0 +1,28 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from swh.counters.journal_client import process_journal_messages +from swh.counters.redis import Redis + + +def test__journal_client__all_keys(mocker): + + mock = mocker.patch("swh.counters.redis.Redis.add") + + redis = Redis(host="localhost") + + keys = {"coll1": [b"key1", b"key2"], "coll2": [b"key3", b"key4", b"key5"]} + + process_journal_messages(messages=keys, counters=redis) + + assert mock.call_count == 2 + + first_call_args = mock.call_args_list[0] + assert first_call_args[0][0] == "coll1" + assert first_call_args[0][1] == keys["coll1"] + + second_call_args = mock.call_args_list[1] + assert second_call_args[0][0] == "coll2" + assert second_call_args[0][1] == keys["coll2"] diff --git a/swh/counters/tests/test_redis.py b/swh/counters/tests/test_redis.py new file mode 100644 index 0000000..f9d78fe --- /dev/null +++ b/swh/counters/tests/test_redis.py @@ -0,0 +1,68 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest +from pytest_redis import factories + +from swh.counters.redis import DEFAULT_REDIS_PORT, Redis + +local_redis = factories.redis_proc(host="localhost") + + +def test__redis__constructor(): + r = Redis("fakehost") + assert r.host == "fakehost" + assert r.port == DEFAULT_REDIS_PORT + + r = Redis("host:11") + assert r.host == "host" + assert r.port == 11 + + with pytest.raises(ValueError, match="url"): + Redis("fake:host:port") + + +def test__redis__only_one_client_instantiation(mocker): + mock = mocker.patch("swh.counters.redis.RedisClient") + + r = Redis("redishost:1234") + + # ensure lazy loading + assert r._redis_client is None + + client = r.redis_client + + assert mock.call_count == 1 + args = mock.call_args[1] + assert args["host"] == "redishost" + assert args["port"] == 1234 + assert r._redis_client is not None + + client2 = r.redis_client + assert mock.call_count == 1 + assert client == client2 + + +def test__redis__ping_ko(): + r = Redis("wronghost") + assert r.check() is False + + +def test__redis__ping_ok(local_redis): + r = Redis("%s:%d" % (local_redis.host, local_redis.port)) + assert r.check() is True + + +def test__redis__collection(local_redis): + r = Redis("%s:%d" % (local_redis.host, local_redis.port)) + r.add("c1", [b"k1", b"k2", b"k3"]) + r.add("c2", [b"k1"]) + r.add("c3", [b"k2"]) + r.add("c3", [b"k5"]) + + assert 3 == r.get_count("c1") + assert 1 == r.get_count("c2") + assert 2 == r.get_count("c3") + assert 0 == r.get_count("c4")