diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -14,5 +14,8 @@
 [mypy-pytest.*]
 ignore_missing_imports = True
 
+[mypy-pytest_redis.*]
+ignore_missing_imports = True
+
 # [mypy-add_your_lib_here.*]
 # ignore_missing_imports = True
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,3 +1,4 @@
 pytest
 pytest-mock
 confluent-kafka
+pytest-redis
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
 # Add here external Python modules dependencies, one per line. Module names
 # should match https://pypi.python.org/pypi names. For the full spec or
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
+redis
diff --git a/swh/counters/journal_client.py b/swh/counters/journal_client.py
--- a/swh/counters/journal_client.py
+++ b/swh/counters/journal_client.py
@@ -5,7 +5,15 @@
 
 from typing import Any, Dict, Iterable
 
+from swh.counters.redis import Redis
 
-def process_journal_messages(messages: Dict[str, Iterable[Any]], *, counters):
+
+def process_journal_messages(
+    messages: Dict[str, Iterable[Any]], *, counters: Redis
+) -> None:
     """Worker function for `JournalClient.process(worker_fn)`"""
-    pass
+
+    # Each message key names a counter collection; its values are the keys
+    # to add to that collection.
+    for collection, keys in messages.items():
+        counters.add(collection, keys)
diff --git a/swh/counters/redis.py b/swh/counters/redis.py
--- a/swh/counters/redis.py
+++ b/swh/counters/redis.py
@@ -3,18 +3,83 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from typing import Any, Iterable
+import logging
+from typing import Any, Iterable, Optional
 
+from redis.client import Redis as RedisClient
+from redis.exceptions import ConnectionError as RedisConnectionError
+
+DEFAULT_REDIS_PORT = 6379
+
+
+logger = logging.getLogger(__name__)
+
 
 class Redis:
+    """Redis based implementation of the counters.
+
+    It uses one HyperLogLog collection per counter (``PFADD``/``PFCOUNT``),
+    so counts are estimates of the number of distinct keys added.
+    """
+
+    _redis_client: Optional[RedisClient] = None
+
     def __init__(self, host: str):
-        pass
+        """Parse ``host``, given as ``hostname`` or ``hostname:port``.
+
+        The redis client is only instantiated on first access to
+        :attr:`redis_client`.
+
+        Raises:
+            ValueError: if ``host`` contains more than one ``:`` or the
+                port is not an integer.
+        """
+        host_port = host.split(":")
+
+        if len(host_port) > 2:
+            raise ValueError("Invalid server url `%s`" % host)
+
+        self.host = host_port[0]
+        if len(host_port) == 1:
+            self.port = DEFAULT_REDIS_PORT
+        else:
+            try:
+                self.port = int(host_port[1])
+            except ValueError as err:
+                raise ValueError("Invalid server url `%s`" % host) from err
+
+    @property
+    def redis_client(self) -> RedisClient:
+        """Return the redis client, instantiating it on first access."""
+        if self._redis_client is None:
+            self._redis_client = RedisClient(host=self.host, port=self.port)
+
+        return self._redis_client
 
-    def check(self):
-        pass
+    def check(self) -> bool:
+        """Return whether the redis server answers a ``PING``.
+
+        Connection errors are logged and reported as ``False``.
+        """
+        try:
+            return self.redis_client.ping()
+        except RedisConnectionError:
+            logger.exception("Unable to connect to the redis server")
+            return False
 
     def add(self, collection: str, keys: Iterable[Any]) -> None:
-        pass
+        """Add each element of ``keys`` to the HyperLogLog ``collection``.
+
+        One ``PFADD`` is queued per key and the whole batch is sent in a
+        single non-transactional pipeline.
+        """
+        pipeline = self.redis_client.pipeline(transaction=False)
+
+        for key in keys:
+            pipeline.pfadd(collection, key)
+
+        pipeline.execute()
 
     def get_count(self, collection: str) -> int:
-        pass
+        """Return the estimated number of distinct keys in ``collection``."""
+        return self.redis_client.pfcount(collection)
diff --git a/swh/counters/tests/test_cli.py b/swh/counters/tests/test_cli.py
--- a/swh/counters/tests/test_cli.py
+++ b/swh/counters/tests/test_cli.py
@@ -17,8 +17,7 @@
 CLI_CONFIG = """
 counters:
   cls: redis
-  host:
-    - %(redis_host)s
+  host: %(redis_host)s
 """
 
 JOURNAL_OBJECTS_CONFIG_TEMPLATE = """
diff --git a/swh/counters/tests/test_journal_client.py b/swh/counters/tests/test_journal_client.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/tests/test_journal_client.py
@@ -0,0 +1,28 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.counters.journal_client import process_journal_messages
+from swh.counters.redis import Redis
+
+
+def test__journal_client__all_keys(mocker):
+
+    mock = mocker.patch("swh.counters.redis.Redis.add")
+
+    redis = Redis(host="localhost")
+
+    keys = {"coll1": [b"key1", b"key2"], "coll2": [b"key3", b"key4", b"key5"]}
+
+    process_journal_messages(messages=keys, counters=redis)
+
+    assert mock.call_count == 2
+
+    first_call_args = mock.call_args_list[0]
+    assert first_call_args[0][0] == "coll1"
+    assert first_call_args[0][1] == keys["coll1"]
+
+    second_call_args = mock.call_args_list[1]
+    assert second_call_args[0][0] == "coll2"
+    assert second_call_args[0][1] == keys["coll2"]
diff --git a/swh/counters/tests/test_redis.py b/swh/counters/tests/test_redis.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/tests/test_redis.py
@@ -0,0 +1,71 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+from pytest_redis import factories
+
+from swh.counters.redis import DEFAULT_REDIS_PORT, Redis
+
+local_redis = factories.redis_proc(host="localhost")
+
+
+def test__redis__constructor():
+    r = Redis("fakehost")
+    assert r.host == "fakehost"
+    assert r.port == DEFAULT_REDIS_PORT
+
+    r = Redis("host:11")
+    assert r.host == "host"
+    assert r.port == 11
+
+    with pytest.raises(ValueError, match="url"):
+        Redis("fake:host:port")
+
+    with pytest.raises(ValueError, match="url"):
+        Redis("host:notaport")
+
+
+def test__redis__only_one_client_instantiation(mocker):
+    mock = mocker.patch("swh.counters.redis.RedisClient")
+
+    r = Redis("redishost:1234")
+
+    # ensure lazy loading
+    assert r._redis_client is None
+
+    client = r.redis_client
+
+    assert mock.call_count == 1
+    args = mock.call_args[1]
+    assert args["host"] == "redishost"
+    assert args["port"] == 1234
+    assert r._redis_client is not None
+
+    client2 = r.redis_client
+    assert mock.call_count == 1
+    assert client == client2
+
+
+def test__redis__ping_ko():
+    r = Redis("wronghost")
+    assert r.check() is False
+
+
+def test__redis__ping_ok(local_redis):
+    r = Redis("%s:%d" % (local_redis.host, local_redis.port))
+    assert r.check() is True
+
+
+def test__redis__collection(local_redis):
+    r = Redis("%s:%d" % (local_redis.host, local_redis.port))
+    r.add("c1", [b"k1", b"k2", b"k3"])
+    r.add("c2", [b"k1"])
+    r.add("c3", [b"k2"])
+    r.add("c3", [b"k5"])
+
+    assert 3 == r.get_count("c1")
+    assert 1 == r.get_count("c2")
+    assert 2 == r.get_count("c3")
+    assert 0 == r.get_count("c4")