diff --git a/swh/counters/api/server.py b/swh/counters/api/server.py index 27e5ac3..9834e44 100644 --- a/swh/counters/api/server.py +++ b/swh/counters/api/server.py @@ -1,86 +1,118 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os from typing import Any, Dict from swh.core import config from swh.core.api import RPCServerApp from swh.counters import get_counters from swh.counters.interface import CountersInterface logger = logging.getLogger(__name__) app = None def make_app(config: Dict[str, Any]) -> RPCServerApp: """Initialize the remote api application. """ app = RPCServerApp( __name__, backend_class=CountersInterface, backend_factory=lambda: get_counters(**config["counters"]), ) handler = logging.StreamHandler() app.logger.addHandler(handler) + app.config["counters"] = get_counters(**config["counters"]) + app.add_url_rule("/", "index", index) + app.add_url_rule("/metrics", "metrics", get_metrics) return app def index(): return "SWH Counters API server" def load_and_check_config(config_file: str) -> Dict[str, Any]: """Check the minimal configuration is set to run the api or raise an error explanation. Args: config_file: Path to the configuration file to load type: configuration type. For 'local' type, more checks are done. Raises: Error if the setup is not as expected Returns: configuration as a dict """ if not config_file: raise EnvironmentError("Configuration file must be defined") if not os.path.exists(config_file): raise FileNotFoundError("Configuration file %s does not exist" % (config_file,)) cfg = config.read(config_file) if "counters" not in cfg: raise KeyError("Missing 'counters' configuration") return cfg def make_app_from_configfile(): """Run the WSGI app from the webserver, loading the configuration from a configuration file. SWH_CONFIG_FILENAME environment variable defines the configuration path to load. """ global app if app is None: config_file = os.environ.get("SWH_CONFIG_FILENAME") api_cfg = load_and_check_config(config_file) app = make_app(api_cfg) return app + + +def get_metrics(): + """expose the counters values in a prometheus format + + detailed format: + # HELP swh_archive_object_total Software Heritage Archive object counters + # TYPE swh_archive_object_total gauge + swh_archive_object_total{col="value",object_type=""} + ... + """ + + response = [ + "# HELP swh_archive_object_total Software Heritage Archive object counters", + "# TYPE swh_archive_object_total gauge", + ] + counters = app.config["counters"] + + for collection in counters.get_counters(): + collection_name = collection.decode("utf-8") + value = counters.get_count(collection) + line = 'swh_archive_object_total{col="value", object_type="%s"} %s' % ( + collection_name, + value, + ) + response.append(line) + response.append("") + + return "\n".join(response) diff --git a/swh/counters/interface.py b/swh/counters/interface.py index 5f53eb9..ce136c6 100644 --- a/swh/counters/interface.py +++ b/swh/counters/interface.py @@ -1,28 +1,32 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Iterable from swh.core.api import remote_api_endpoint class CountersInterface: @remote_api_endpoint("check") def check(self): """Dedicated method to execute some specific check per implementation. """ ... @remote_api_endpoint("add") def add(self, collection: str, keys: Iterable[Any]) -> None: """Add the provided keys to the collection Only count new keys. """ ... @remote_api_endpoint("count") def get_count(self, collection: str) -> int: """Return the number of keys for the provided collection""" ... + + @remote_api_endpoint("counters") + def get_counters(self) -> Iterable[str]: + """Return the list of managed counters""" diff --git a/swh/counters/redis.py b/swh/counters/redis.py index beb1d64..3862759 100644 --- a/swh/counters/redis.py +++ b/swh/counters/redis.py @@ -1,57 +1,60 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from typing import Any, Iterable from redis.client import Redis as RedisClient from redis.exceptions import ConnectionError DEFAULT_REDIS_PORT = 6379 logger = logging.getLogger(__name__) class Redis: """Redis based implementation of the counters. It uses one HyperLogLog collection per counter""" _redis_client = None def __init__(self, host: str): host_port = host.split(":") if len(host_port) > 2: raise ValueError("Invalid server url `%s`" % host) self.host = host_port[0] self.port = int(host_port[1]) if len(host_port) > 1 else DEFAULT_REDIS_PORT @property def redis_client(self) -> RedisClient: if self._redis_client is None: self._redis_client = RedisClient(host=self.host, port=self.port) return self._redis_client def check(self): try: return self.redis_client.ping() except ConnectionError: logger.exception("Unable to connect to the redis server") return False def add(self, collection: str, keys: Iterable[Any]) -> None: redis = self.redis_client pipeline = redis.pipeline(transaction=False) [pipeline.pfadd(collection, key) for key in keys] pipeline.execute() def get_count(self, collection: str) -> int: return self.redis_client.pfcount(collection) + + def get_counters(self) -> Iterable[str]: + return self.redis_client.keys() diff --git a/swh/counters/tests/conftest.py b/swh/counters/tests/conftest.py index 517251b..1ca0da7 100644 --- a/swh/counters/tests/conftest.py +++ b/swh/counters/tests/conftest.py @@ -1,26 +1,35 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import pytest +from redis import Redis as RedisClient logger = logging.getLogger(__name__) JOURNAL_OBJECTS_CONFIG_TEMPLATE = """ journal: brokers: - {broker} prefix: {prefix} group_id: {group_id} """ @pytest.fixture def journal_config(kafka_server, kafka_prefix) -> str: return JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( broker=kafka_server, group_id="test-consumer", prefix=kafka_prefix ) + + +@pytest.fixture +def local_redis(redis_proc): + yield redis_proc + # Cleanup redis between 2 tests + rc = RedisClient(host=redis_proc.host, port=redis_proc.port) + rc.flushall() diff --git a/swh/counters/tests/test_redis.py b/swh/counters/tests/test_redis.py index f9d78fe..704b7c2 100644 --- a/swh/counters/tests/test_redis.py +++ b/swh/counters/tests/test_redis.py @@ -1,68 +1,80 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest -from pytest_redis import factories +from redis import Redis as RedisClient from swh.counters.redis import DEFAULT_REDIS_PORT, Redis -local_redis = factories.redis_proc(host="localhost") - def test__redis__constructor(): r = Redis("fakehost") assert r.host == "fakehost" assert r.port == DEFAULT_REDIS_PORT r = Redis("host:11") assert r.host == "host" assert r.port == 11 with pytest.raises(ValueError, match="url"): Redis("fake:host:port") def test__redis__only_one_client_instantiation(mocker): mock = mocker.patch("swh.counters.redis.RedisClient") r = Redis("redishost:1234") # ensure lazy loading assert r._redis_client is None client = r.redis_client assert mock.call_count == 1 args = mock.call_args[1] assert args["host"] == "redishost" assert args["port"] == 1234 assert r._redis_client is not None client2 = r.redis_client assert mock.call_count == 1 assert client == client2 def test__redis__ping_ko(): r = Redis("wronghost") assert r.check() is False def test__redis__ping_ok(local_redis): r = Redis("%s:%d" % (local_redis.host, local_redis.port)) assert r.check() is True def test__redis__collection(local_redis): r = Redis("%s:%d" % (local_redis.host, local_redis.port)) r.add("c1", [b"k1", b"k2", b"k3"]) r.add("c2", [b"k1"]) r.add("c3", [b"k2"]) r.add("c3", [b"k5"]) assert 3 == r.get_count("c1") assert 1 == r.get_count("c2") assert 2 == r.get_count("c3") assert 0 == r.get_count("c4") + + +def test__redis__collections(local_redis): + client = RedisClient(host=local_redis.host, port=local_redis.port) + client.pfadd("counter1", b"k1") + client.pfadd("counter2", b"k2") + + r = Redis("%s:%d" % (local_redis.host, local_redis.port)) + + counters = r.get_counters() + + assert 2 == len(counters) + assert b"counter1" in counters + assert b"counter2" in counters diff --git a/swh/counters/tests/test_server.py b/swh/counters/tests/test_server.py index 80eb964..7fe4468 100644 --- a/swh/counters/tests/test_server.py +++ b/swh/counters/tests/test_server.py @@ -1,99 +1,145 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import re from typing import Any, Dict import pytest +from redis import Redis as RedisClient import yaml from swh.core.api import RPCServerApp from swh.counters.api import server from swh.counters.api.server import load_and_check_config, make_app_from_configfile def teardown_function(): # Ensure there is no configuration loaded from a previous test - server.api = None + server.app = None @pytest.fixture def swh_counters_server_config() -> Dict[str, Any]: - return {"counters": {"cls": "redis", "hosts": "redis",}} + return {"counters": {"cls": "redis", "host": "redis",}} @pytest.fixture def swh_counters_server_config_on_disk( tmp_path, monkeypatch, swh_counters_server_config ) -> str: return _environment_config_file(tmp_path, monkeypatch, swh_counters_server_config) def write_config_file(tmpdir, config_dict: Dict, name: str = "config.yml") -> str: """Prepare configuration file in `$tmpdir/name` with content `content`. Args: tmpdir (LocalPath): root directory content: Content of the file either as string or as a dict. If a dict, converts the dict into a yaml string. name: configuration filename Returns path of the configuration file prepared. """ config_path = tmpdir / name config_path.write_text(yaml.dump(config_dict), encoding="utf-8") # pytest on python3.5 does not support LocalPath manipulation, so # convert path to string return str(config_path) def _environment_config_file(tmp_path, monkeypatch, content): conf_path = write_config_file(tmp_path, content) monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path) @pytest.mark.parametrize("config_file", [None, ""]) def test_load_and_check_config_no_configuration(config_file): """Inexistent configuration files raises""" with pytest.raises(EnvironmentError, match="Configuration file must be defined"): load_and_check_config(config_file) def test_load_and_check_config_inexistent_file(): config_path = "/some/inexistent/config.yml" expected_error = f"Configuration file {config_path} does not exist" with pytest.raises(EnvironmentError, match=expected_error): load_and_check_config(config_path) def test_load_and_check_config_wrong_configuration(tmpdir): """Wrong configuration raises""" config_path = write_config_file(tmpdir, {"something": "useless"}) with pytest.raises(KeyError, match="Missing 'counters' configuration"): load_and_check_config(config_path) def test_server_make_app_from_config_file(swh_counters_server_config_on_disk): app = make_app_from_configfile() assert app is not None assert isinstance(app, RPCServerApp) app2 = make_app_from_configfile() assert app is app2 def test_server_index(swh_counters_server_config_on_disk, mocker): """Test the result of the main page""" app = make_app_from_configfile() app.config["TESTING"] = True tc = app.test_client() r = tc.get("/") assert 200 == r.status_code assert b"SWH Counters" in r.get_data() + + +def test_server_metrics(local_redis, tmp_path, monkeypatch): + """Test the metrics generation""" + + rc = RedisClient(host=local_redis.host, port=local_redis.port) + data = { + "col1": 1, + "col2": 4, + "col3": 6, + "col4": 10, + } + + for coll in data.keys(): + for i in range(0, data[coll]): + rc.pfadd(coll, i) + + cfg = { + "counters": {"cls": "redis", "host": f"{local_redis.host}:{local_redis.port}"} + } + _environment_config_file(tmp_path, monkeypatch, cfg) + + app = make_app_from_configfile() + app.config["TESTING"] = True + tc = app.test_client() + + r = tc.get("/metrics") + + assert 200 == r.status_code + + response = r.get_data().decode("utf-8") + + assert "HELP" in response + assert "TYPE" in response + + for collection in data.keys(): + obj_type = f'object_type="{collection}"' + assert obj_type in response + + pattern = r'swh_archive_object_total{col="value", object_type="%s"} (\d+)' % ( + collection + ) + m = re.search(pattern, response) + assert data[collection] == int(m.group(1))