diff --git a/swh/counters/api/server.py b/swh/counters/api/server.py index 717615f..09a2e9b 100644 --- a/swh/counters/api/server.py +++ b/swh/counters/api/server.py @@ -1,140 +1,138 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os from typing import Any, Dict, Optional from flask import abort, jsonify from swh.core import config from swh.core.api import RPCServerApp from swh.counters import get_counters, get_history from swh.counters.interface import CountersInterface, HistoryInterface logger = logging.getLogger(__name__) app: Optional[RPCServerApp] = None def make_app(config: Dict[str, Any]) -> RPCServerApp: - """Initialize the remote api application. - - """ + """Initialize the remote api application.""" app = RPCServerApp( __name__, backend_class=CountersInterface, backend_factory=lambda: get_counters(**config["counters"]), ) handler = logging.StreamHandler() app.logger.addHandler(handler) app.config["counters"] = get_counters(**config["counters"]) if "history" in config: app.add_backend_class( backend_class=HistoryInterface, backend_factory=lambda: get_history(**config["history"]), ) app.config["history"] = get_history(**config["history"]) app.add_url_rule( "/counters_history/", "history", get_history_file_content ) app.add_url_rule("/", "index", index) app.add_url_rule("/metrics", "metrics", get_metrics) return app def index(): return "SWH Counters API server" def load_and_check_config(config_file: str) -> Dict[str, Any]: """Check the minimal configuration is set to run the api or raise an error explanation. Args: config_file: Path to the configuration file to load type: configuration type. For 'local' type, more checks are done. 
Raises: Error if the setup is not as expected Returns: configuration as a dict """ if not config_file: raise EnvironmentError("Configuration file must be defined") if not os.path.exists(config_file): raise FileNotFoundError("Configuration file %s does not exist" % (config_file,)) cfg = config.read(config_file) if "counters" not in cfg: raise KeyError("Missing 'counters' configuration") return cfg def make_app_from_configfile(): """Run the WSGI app from the webserver, loading the configuration from - a configuration file. + a configuration file. - SWH_CONFIG_FILENAME environment variable defines the - configuration path to load. + SWH_CONFIG_FILENAME environment variable defines the + configuration path to load. """ global app if app is None: config_file = os.environ.get("SWH_CONFIG_FILENAME") api_cfg = load_and_check_config(config_file) app = make_app(api_cfg) return app def get_metrics(): """expose the counters values in a prometheus format - detailed format: - # HELP swh_archive_object_total Software Heritage Archive object counters - # TYPE swh_archive_object_total gauge - swh_archive_object_total{col="value",object_type=""} - ... + detailed format: + # HELP swh_archive_object_total Software Heritage Archive object counters + # TYPE swh_archive_object_total gauge + swh_archive_object_total{col="value",object_type=""} + ... 
""" response = [ "# HELP swh_archive_object_total Software Heritage Archive object counters", "# TYPE swh_archive_object_total gauge", ] counters = app.config["counters"] for collection in counters.get_counters(): collection_name = collection.decode("utf-8") value = counters.get_count(collection) line = 'swh_archive_object_total{col="value", object_type="%s"} %s' % ( collection_name, value, ) response.append(line) response.append("") return "\n".join(response) def get_history_file_content(filename: str): assert app is not None try: content = app.config["history"].get_history(filename) except FileNotFoundError: abort(404) return jsonify(content) diff --git a/swh/counters/cli.py b/swh/counters/cli.py index 4311af5..82a4131 100644 --- a/swh/counters/cli.py +++ b/swh/counters/cli.py @@ -1,87 +1,91 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.counters.kafka_client import KeyOrientedJournalClient @swh_cli_group.group(name="counters", context_settings=CONTEXT_SETTINGS) @click.option( "--config-file", "-C", default=None, - type=click.Path(exists=True, dir_okay=False,), + type=click.Path( + exists=True, + dir_okay=False, + ), help="Configuration file.", ) @click.pass_context def counters_cli_group(ctx, config_file): """Software Heritage Counters tools.""" from swh.core import config ctx.ensure_object(dict) conf = config.read(config_file) ctx.obj["config"] = conf @counters_cli_group.command("journal-client") @click.option( "--stop-after-objects", "-m", default=None, type=int, help="Maximum number of objects to replay. 
Default is to run forever.", ) @click.option( "--object-type", "-o", multiple=True, help="Default list of object types to subscribe to", ) @click.option( - "--prefix", "-p", help="Topic prefix to use (e.g swh.journal.objects)", + "--prefix", + "-p", + help="Topic prefix to use (e.g swh.journal.objects)", ) @click.pass_context def journal_client(ctx, stop_after_objects, object_type, prefix): - """Listens for new messages from the SWH Journal, and count them - """ + """Listens for new messages from the SWH Journal, and count them""" import functools from . import get_counters from .journal_client import process_journal_messages config = ctx.obj["config"] journal_cfg = config["journal"] journal_cfg["object_types"] = object_type or journal_cfg.get("object_types", []) journal_cfg["prefix"] = prefix or journal_cfg.get("prefix") journal_cfg["stop_after_objects"] = stop_after_objects or journal_cfg.get( "stop_after_objects" ) if len(journal_cfg["object_types"]) == 0: raise ValueError("'object_types' must be specified by cli or configuration") if journal_cfg["prefix"] is None: raise ValueError("'prefix' must be specified by cli or configuration") counters = get_counters(**config["counters"]) client = KeyOrientedJournalClient(**journal_cfg) worker_fn = functools.partial(process_journal_messages, counters=counters) nb_messages = 0 try: nb_messages = client.process(worker_fn) print("Processed %d messages." 
% nb_messages) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: client.close() diff --git a/swh/counters/history.py b/swh/counters/history.py index 00e60bb..d7f9753 100644 --- a/swh/counters/history.py +++ b/swh/counters/history.py @@ -1,127 +1,140 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import logging import os import time from typing import Dict, List, Optional import requests logger = logging.getLogger(__name__) class History: """Manage the historical data of the counters""" def __init__( self, prometheus_host: str, prometheus_port: int, live_data_start: int, cache_base_directory: str, interval: str = "12h", prometheus_collection: str = "swh_archive_object_total", query_range_uri="/api/v1/query_range", labels: Dict[str, str] = {}, ): self.prometheus_host = prometheus_host self.prometheus_port = prometheus_port self.cache_base_directory = cache_base_directory self.live_data_start = live_data_start self.interval = interval self.prometheus_collection = prometheus_collection self.query_range_uri = query_range_uri self.labels = labels def _validate_filename(self, filename: str): if "/" in str(filename): raise ValueError("filename must not contain path information") - def _compute_url(self, object: str, end: int,) -> str: + def _compute_url( + self, + object: str, + end: int, + ) -> str: """Compute the api url to request data from, specific to a label. Args: object: object_type/label data (ex: content, revision, ...) 
end: retrieve the data until this date (timestamp) Returns: The api url to fetch the label's data """ labels = self.labels.copy() labels["object_type"] = object formated_labels = ",".join([f'{k}="{v}"' for k, v in labels.items()]) url = ( f"http://{self.prometheus_host}:{self.prometheus_port}/" f"{self.query_range_uri}?query=sum({self.prometheus_collection}" f"{{{formated_labels}}})&start={self.live_data_start}&end={end}&" f"step={self.interval}" ) return url def get_history(self, cache_file: str) -> Dict: self._validate_filename(cache_file) path = f"{self.cache_base_directory}/{cache_file}" with open(path, "r") as f: return json.load(f) def _adapt_format(self, item: List) -> List: """Javascript expects timestamps to be in milliseconds and counter values as floats Args item: List of 2 elements, timestamp and counter Return: Normalized tuple (timestamp in js expected time, counter as float) """ timestamp = int(item[0]) counter_value = item[1] return [timestamp * 1000, float(counter_value)] - def _get_timestamp_history(self, object: str,) -> List: + def _get_timestamp_history( + self, + object: str, + ) -> List: """Return the live values of an object""" result = [] now = int(time.time()) - url = self._compute_url(object=object, end=now,) + url = self._compute_url( + object=object, + end=now, + ) response = requests.get(url) if response.ok: data = response.json() # data answer format: # {"status":"success","data":{"result":[{"values":[[1544586427,"5375557897"]... 
# noqa # Prometheus-provided data has to be adapted to js expectations result = [ self._adapt_format(i) for i in data["data"]["result"][0]["values"] ] return result def refresh_history( - self, cache_file: str, objects: List[str], static_file: Optional[str] = None, + self, + cache_file: str, + objects: List[str], + static_file: Optional[str] = None, ): self._validate_filename(cache_file) if static_file is not None: static_data = self.get_history(static_file) else: static_data = {} # for live content, we retrieve existing data and merges with the new one live_data = {} for object in objects: prometheus_data = self._get_timestamp_history(object=object) live_data[object] = static_data.get(object, []) + prometheus_data target_file = f"{self.cache_base_directory}/{cache_file}" tmp_file = f"{target_file}.tmp" with open(tmp_file, "w") as f: f.write(json.dumps(live_data)) os.rename(tmp_file, target_file) diff --git a/swh/counters/in_memory.py b/swh/counters/in_memory.py index d4c4889..dae8646 100644 --- a/swh/counters/in_memory.py +++ b/swh/counters/in_memory.py @@ -1,31 +1,31 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from typing import Any, Dict, Iterable, List class InMemory: """InMemory implementation of the counters. 
- Naive implementation using a Dict[str, Set]""" + Naive implementation using a Dict[str, Set]""" def __init__(self): self.counters = defaultdict(set) def check(self): return "OK" def add(self, collection: str, keys: Iterable[Any]) -> None: for value in keys: self.counters[collection].add(value) def get_count(self, collection: str) -> int: return len(self.counters.get(collection, [])) def get_counts(self, collections: List[str]) -> Dict[str, int]: return {coll: self.get_count(coll) for coll in collections} def get_counters(self) -> Iterable[str]: return list(self.counters.keys()) diff --git a/swh/counters/interface.py b/swh/counters/interface.py index ac7049d..14219a1 100644 --- a/swh/counters/interface.py +++ b/swh/counters/interface.py @@ -1,52 +1,51 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, Iterable, List from swh.core.api import remote_api_endpoint class CountersInterface: @remote_api_endpoint("check") def check(self): - """Dedicated method to execute some specific check per implementation. - """ + """Dedicated method to execute some specific check per implementation.""" ... @remote_api_endpoint("add") def add(self, collection: str, keys: Iterable[Any]) -> None: """Add the provided keys to the collection - Only count new keys. + Only count new keys. """ ... @remote_api_endpoint("count") def get_count(self, collection: str) -> int: """Return the number of keys for the provided collection""" ... @remote_api_endpoint("counts") def get_counts(self, collections: List[str]) -> Dict[str, int]: """Return the number of keys for the provided collection""" ... @remote_api_endpoint("counters") def get_counters(self) -> Iterable[str]: """Return the list of managed counters""" ... 
class HistoryInterface: @remote_api_endpoint("history") def get_history(self, cache_file: str): """Return the content of an history file previously created - by the refresh_counters method""" + by the refresh_counters method""" @remote_api_endpoint("refresh_history") def refresh_history(self, cache_file: str): """Refresh the cache file containing the counters historical data. - It can be an aggregate of live data and static data stored on - a separate file""" + It can be an aggregate of live data and static data stored on + a separate file""" ... diff --git a/swh/counters/journal_client.py b/swh/counters/journal_client.py index 66942b4..a91c64f 100644 --- a/swh/counters/journal_client.py +++ b/swh/counters/journal_client.py @@ -1,52 +1,52 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict import msgpack from swh.counters.interface import CountersInterface def process_journal_messages( messages: Dict[str, Dict[bytes, bytes]], *, counters: CountersInterface ) -> None: """Count the number of different values of an object's property. 
- It allow for example to count the persons inside the - Release (authors) and Revision (authors and committers) classes + It allow for example to count the persons inside the + Release (authors) and Revision (authors and committers) classes """ for key in messages.keys(): counters.add(key, messages[key]) if "revision" in messages: process_revisions(messages["revision"], counters) if "release" in messages: process_releases(messages["release"], counters) def process_revisions(revisions: Dict[bytes, bytes], counters: CountersInterface): """Count the number of different authors and committers on the - revisions (in the person collection)""" + revisions (in the person collection)""" persons = set() for revision_bytes in revisions.values(): revision = msgpack.loads(revision_bytes) persons.add(revision["author"]["fullname"]) persons.add(revision["committer"]["fullname"]) counters.add("person", list(persons)) def process_releases(releases: Dict[bytes, bytes], counters: CountersInterface): """Count the number of different authors on the - releases (in the person collection)""" + releases (in the person collection)""" persons = set() for release_bytes in releases.values(): release = msgpack.loads(release_bytes) author = release.get("author") if author and "fullname" in author: persons.add(author["fullname"]) counters.add("person", list(persons)) diff --git a/swh/counters/redis.py b/swh/counters/redis.py index 982e25a..e3fdc95 100644 --- a/swh/counters/redis.py +++ b/swh/counters/redis.py @@ -1,63 +1,63 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from typing import Any, Dict, Iterable, List from redis.client import Redis as RedisClient from redis.exceptions import ConnectionError DEFAULT_REDIS_PORT = 6379 logger = logging.getLogger(__name__) class Redis: 
"""Redis based implementation of the counters. - It uses one HyperLogLog collection per counter""" + It uses one HyperLogLog collection per counter""" _redis_client = None def __init__(self, host: str): host_port = host.split(":") if len(host_port) > 2: raise ValueError("Invalid server url `%s`" % host) self.host = host_port[0] self.port = int(host_port[1]) if len(host_port) > 1 else DEFAULT_REDIS_PORT @property def redis_client(self) -> RedisClient: if self._redis_client is None: self._redis_client = RedisClient(host=self.host, port=self.port) return self._redis_client def check(self): try: return self.redis_client.ping() except ConnectionError: logger.exception("Unable to connect to the redis server") return False def add(self, collection: str, keys: Iterable[Any]) -> None: redis = self.redis_client pipeline = redis.pipeline(transaction=False) [pipeline.pfadd(collection, key) for key in keys] pipeline.execute() def get_count(self, collection: str) -> int: return self.redis_client.pfcount(collection) def get_counts(self, collections: List[str]) -> Dict[str, int]: return {coll: self.get_count(coll) for coll in collections} def get_counters(self) -> Iterable[str]: return self.redis_client.keys() diff --git a/swh/counters/tests/test_cli.py b/swh/counters/tests/test_cli.py index 3e9cff9..526d132 100644 --- a/swh/counters/tests/test_cli.py +++ b/swh/counters/tests/test_cli.py @@ -1,209 +1,219 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import tempfile from click.testing import CliRunner from confluent_kafka import Producer import pytest import yaml from swh.counters.cli import counters_cli_group from swh.journal.serializers import value_to_kafka CLI_CONFIG = """ counters: cls: redis host: %(redis_host)s """ JOURNAL_OBJECTS_CONFIG_TEMPLATE = """ journal: 
brokers: - {broker} prefix: {prefix} group_id: {group_id} """ def invoke(catch_exceptions, args, config="", *, redis_host): runner = CliRunner() with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: config_fd.write((CLI_CONFIG + config) % {"redis_host": redis_host}) config_fd.seek(0) result = runner.invoke(counters_cli_group, ["-C" + config_fd.name] + args) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test__journal_client__worker_function_invoked( mocker, kafka_server, kafka_prefix, journal_config, local_redis_host ): mock = mocker.patch("swh.counters.journal_client.process_journal_messages") producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test-producer", "acks": "all", } ) topic = f"{kafka_prefix}.content" value = value_to_kafka({"key": "value"}) producer.produce(topic=topic, key=b"message1", value=value) invoke( False, # Missing --object-types (and no config key) will make the cli raise - ["journal-client", "--stop-after-objects", "1", "--object-type", "content",], + [ + "journal-client", + "--stop-after-objects", + "1", + "--object-type", + "content", + ], journal_config, redis_host=local_redis_host, ) assert mock.call_count == 1 def test__journal_client__missing_main_journal_config_key(local_redis_host): """Missing configuration on journal should raise""" with pytest.raises(KeyError, match="journal"): invoke( catch_exceptions=False, args=["journal-client", "--stop-after-objects", "1"], config="", # missing config will make it raise redis_host=local_redis_host, ) def test__journal_client__missing_journal_config_keys(local_redis_host): """Missing configuration on mandatory journal keys should raise""" kafka_prefix = "swh.journal.objects" journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( broker="192.0.2.1", prefix=kafka_prefix, group_id="test-consumer" ) journal_config = yaml.safe_load(journal_objects_config) for key in 
journal_config["journal"].keys(): if key == "prefix": # optional continue cfg = copy.deepcopy(journal_config) del cfg["journal"][key] # make config incomplete yaml_cfg = yaml.dump(cfg) with pytest.raises(TypeError, match=f"{key}"): invoke( catch_exceptions=False, args=[ "journal-client", "--stop-after-objects", "1", "--prefix", kafka_prefix, "--object-type", "content", ], config=yaml_cfg, # incomplete config will make the cli raise redis_host=local_redis_host, ) def test__journal_client__missing_prefix_config_key(kafka_server, local_redis_host): """Missing configuration on mandatory prefix key should raise""" journal_cfg_template = """ journal: brokers: - {broker} group_id: {group_id} """ journal_cfg = journal_cfg_template.format( broker=kafka_server, group_id="test-consumer" ) with pytest.raises(ValueError, match="prefix"): invoke( False, # Missing --prefix (and no config key) will make the cli raise [ "journal-client", "--stop-after-objects", "1", "--object-type", "content", ], journal_cfg, redis_host=local_redis_host, ) def test__journal_client__missing_object_types_config_key( kafka_server, local_redis_host ): """Missing configuration on mandatory object-types key should raise""" journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( broker=kafka_server, prefix="swh.journal.objects", group_id="test-consumer" ) with pytest.raises(ValueError, match="object_types"): invoke( False, # Missing --object-types (and no config key) will make the cli raise ["journal-client", "--stop-after-objects", "1"], journal_cfg, redis_host=local_redis_host, ) def test__journal_client__key_received(mocker, kafka_server, local_redis_host): mock = mocker.patch("swh.counters.journal_client.process_journal_messages") mock.return_value = 1 prefix = "swh.journal.objects" object_type = "content" topic = prefix + "." 
+ object_type producer = Producer( - {"bootstrap.servers": kafka_server, "client.id": "testproducer", "acks": "all",} + { + "bootstrap.servers": kafka_server, + "client.id": "testproducer", + "acks": "all", + } ) value = value_to_kafka({"key": "value"}) key = b"message key" # Ensure empty messages are ignored producer.produce(topic=topic, key=b"emptymessage", value=None) producer.produce(topic=topic, key=key, value=value) producer.flush() journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( broker=kafka_server, prefix=prefix, group_id="test-consumer" ) result = invoke( False, [ "journal-client", "--stop-after-objects", "1", "--object-type", object_type, "--prefix", prefix, ], journal_cfg, redis_host=local_redis_host, ) # Check the output expected_output = "Processed 1 messages.\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output assert mock.called assert mock.call_args[0][0]["content"] assert len(mock.call_args[0][0]) == 1 assert object_type in mock.call_args[0][0].keys() diff --git a/swh/counters/tests/test_history.py b/swh/counters/tests/test_history.py index 48e3abd..46746f8 100644 --- a/swh/counters/tests/test_history.py +++ b/swh/counters/tests/test_history.py @@ -1,217 +1,233 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import os from typing import List import pytest from swh.counters.history import History TEST_HISTORY_CONFIG = { "prometheus_host": "prometheus", "prometheus_port": 8888, "prometheus_collection": "swh.collection", "cache_base_directory": "/tmp", "live_data_start": "10", "interval": "20h", "query_range_uri": "/my/uri", "labels": {"label1": "value1", "label2": "value2"}, } TEST_JSON = {"key1": "value1", "key2": "value2"} CACHED_DATA = {"content": [[10, 1.5], [12, 2.0]], "revision": [[11, 4], 
[13, 5]]} @pytest.fixture def history(): return History(**TEST_HISTORY_CONFIG) def test_history_compute_url(history): end = 99 object_type = "content" - url = history._compute_url(object=object_type, end=end,) + url = history._compute_url( + object=object_type, + end=end, + ) assert url == ( f'http://{TEST_HISTORY_CONFIG["prometheus_host"]}:' f'{TEST_HISTORY_CONFIG["prometheus_port"]}/' f'{TEST_HISTORY_CONFIG["query_range_uri"]}?' f'query=sum({TEST_HISTORY_CONFIG["prometheus_collection"]}' f'{{label1="value1",label2="value2",' f'object_type="{object_type}"}})&' f'start={TEST_HISTORY_CONFIG["live_data_start"]}&end={end}' f'&step={TEST_HISTORY_CONFIG["interval"]}' ) @pytest.mark.parametrize( - "source, expected", [([1, "10"], [1000, 10.0]), ([2, "10.1"], [2000, 10.1]),] + "source, expected", + [ + ([1, "10"], [1000, 10.0]), + ([2, "10.1"], [2000, 10.1]), + ], ) def test_history__adapt_format(history, source, expected): result = history._adapt_format(source) assert expected == result def test_history__validate_filename(history): with pytest.raises(ValueError, match="path information"): history._validate_filename("/test.json") with pytest.raises(ValueError, match="path information"): history._validate_filename("../../test.json") history._validate_filename("test.json") def test_history_get_history(history, tmp_path): history.cache_base_directory = tmp_path json_file = "test.json" full_path = f"{tmp_path}/{json_file}" with open(full_path, "w") as f: f.write(json.dumps(TEST_JSON)) result = history.get_history(json_file) assert result == TEST_JSON def test_history_get_history_relative_path_failed(history): with pytest.raises(ValueError, match="path information"): history.get_history("/test.json") def test_history__get_timestamp_history(history, requests_mock, datadir, mocker): object = "content" end = 100 url = history._compute_url(object, end) mock = mocker.patch("time.time") mock.return_value = end request_content_file = os.path.join(datadir, "content.json") with 
open(request_content_file, "r") as f: content = f.read() requests_mock.get( - url, [{"content": bytes(content, "utf-8"), "status_code": 200},], + url, + [ + {"content": bytes(content, "utf-8"), "status_code": 200}, + ], ) result = history._get_timestamp_history(object) assert result == [[100000, 10.0], [100000, 20.0], [110000, 30.0]] def test_history__get_timestamp_history_request_failed( history, requests_mock, datadir, mocker ): object = "content" end = 100 url = history._compute_url(object, end) mock = mocker.patch("time.time") mock.return_value = end requests_mock.get( - url, [{"content": None, "status_code": 503},], + url, + [ + {"content": None, "status_code": 503}, + ], ) result = history._get_timestamp_history(object) assert result == [] def _configure_request_mock( history_object, mock, datadir, objects: List[str], end: int ): for object_type in objects: url = history_object._compute_url(object_type, end) request_content_file = os.path.join(datadir, f"{object_type}.json") with open(request_content_file, "r") as f: content = f.read() mock.get( - url, [{"content": bytes(content, "utf-8"), "status_code": 200},], + url, + [ + {"content": bytes(content, "utf-8"), "status_code": 200}, + ], ) def test_history__refresh_history_with_static_data( history, requests_mock, mocker, datadir, tmp_path ): """Test the generation of a cache file with an aggregation - of static data and live data from prometheus + of static data and live data from prometheus """ objects = ["content", "revision"] static_file_name = "static.json" cache_file = "result.json" end = 100 with open(f"{tmp_path}/{static_file_name}", "w") as f: f.write(json.dumps(CACHED_DATA)) _configure_request_mock(history, requests_mock, datadir, objects, end) mock = mocker.patch("time.time") mock.return_value = end history.cache_base_directory = tmp_path history.refresh_history( cache_file=cache_file, objects=objects, static_file=static_file_name ) result_file = f"{tmp_path}/{cache_file}" assert 
os.path.isfile(result_file) expected_history = { "content": [ [10, 1.5], [12, 2.0], [100000, 10.0], [100000, 20.0], [110000, 30.0], ], "revision": [[11, 4], [13, 5], [80000, 1.0], [90000, 2.0], [95000, 5.0]], } with open(result_file, "r") as f: content = json.load(f) assert expected_history == content def test_history__refresh_history_without_historical( history, requests_mock, mocker, datadir, tmp_path ): """Test the generation of a cache file with only - live data from prometheus""" + live data from prometheus""" objects = ["content", "revision"] cache_file = "result.json" end = 100 _configure_request_mock(history, requests_mock, datadir, objects, end) mock = mocker.patch("time.time") mock.return_value = end history.cache_base_directory = tmp_path history.refresh_history(cache_file=cache_file, objects=objects) result_file = f"{tmp_path}/{cache_file}" assert os.path.isfile(result_file) expected_history = { "content": [[100000, 10.0], [100000, 20.0], [110000, 30.0]], "revision": [[80000, 1.0], [90000, 2.0], [95000, 5.0]], } with open(result_file, "r") as f: content = json.load(f) assert expected_history == content diff --git a/swh/counters/tests/test_journal_client.py b/swh/counters/tests/test_journal_client.py index 9369b9f..400814e 100644 --- a/swh/counters/tests/test_journal_client.py +++ b/swh/counters/tests/test_journal_client.py @@ -1,147 +1,147 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from typing import Dict, Optional import msgpack from swh.counters.journal_client import ( process_journal_messages, process_releases, process_revisions, ) from swh.counters.redis import Redis from swh.model.hashutil import hash_to_bytes from swh.model.model import ( ObjectType, Person, Release, Revision, RevisionType, TimestampWithTimezone, ) DATE = 
TimestampWithTimezone.from_datetime( datetime.datetime(2022, 1, 11, 0, 0, 0, tzinfo=datetime.timezone.utc) ) def _create_release(author_fullname: Optional[str]) -> Dict: """Use Release.to_dict to be sure the field's name used to retrieve - the author is correct""" + the author is correct""" author = None if author_fullname: author = Person(fullname=bytes(author_fullname, "utf-8"), name=None, email=None) release = Release( name=b"Release", message=b"Message", target=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"), target_type=ObjectType.CONTENT, synthetic=True, author=author, ) return release.to_dict() def _create_revision(author_fullname: str, committer_fullname: str) -> Dict: """Use Revision.to_dict to be sure the names of the fields used to retrieve - the author and the committer are correct""" + the author and the committer are correct""" revision = Revision( committer_date=DATE, date=None, type=RevisionType.GIT, parents=(), directory=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"), synthetic=True, message=None, author=Person(fullname=bytes(author_fullname, "utf-8"), name=None, email=None), committer=Person( fullname=bytes(committer_fullname, "utf-8"), name=None, email=None ), ) return revision.to_dict() RELEASES = { rel["id"]: msgpack.dumps(rel) for rel in [ _create_release(author_fullname="author 1"), _create_release(author_fullname="author 2"), _create_release(author_fullname=None), ] } RELEASES_AUTHOR_FULLNAMES = {b"author 1", b"author 2"} REVISIONS = { rev["id"]: msgpack.dumps(rev) for rev in [ _create_revision(author_fullname="author 1", committer_fullname="committer 1"), _create_revision(author_fullname="author 2", committer_fullname="committer 2"), _create_revision(author_fullname="author 2", committer_fullname="committer 1"), _create_revision(author_fullname="author 1", committer_fullname="committer 2"), ] } REVISIONS_AUTHOR_FULLNAMES = {b"author 1", b"author 2"} REVISIONS_COMMITTER_FULLNAMES = {b"committer 1", b"committer 2"} 
REVISIONS_PERSON_FULLNAMES = REVISIONS_AUTHOR_FULLNAMES | REVISIONS_COMMITTER_FULLNAMES


def test_journal_client_all_keys(local_redis_host):
    """Each collection is counted with the number of keys it received."""
    redis = Redis(host=local_redis_host)

    keys = {
        "coll1": {b"key1": b"value1", b"key2": b"value2"},
        "coll2": {b"key3": b"value3", b"key4": b"value4", b"key5": b"value5"},
    }

    process_journal_messages(messages=keys, counters=redis)

    assert redis.get_counts(redis.get_counters()) == {b"coll1": 2, b"coll2": 3}


def test_journal_client_process_revisions(local_redis_host):
    """Only a ``person`` counter is created, sized by the distinct fullnames
    listed in REVISIONS_PERSON_FULLNAMES."""
    redis = Redis(host=local_redis_host)

    process_revisions(REVISIONS, redis)

    assert redis.get_counts(redis.get_counters()) == {
        b"person": len(REVISIONS_PERSON_FULLNAMES)
    }


def test_journal_client_process_releases(local_redis_host):
    """Only a ``person`` counter is created, sized by the distinct fullnames
    listed in RELEASES_AUTHOR_FULLNAMES."""
    redis = Redis(host=local_redis_host)

    process_releases(RELEASES, redis)

    assert redis.get_counts(redis.get_counters()) == {
        b"person": len(RELEASES_AUTHOR_FULLNAMES)
    }


def test_journal_client_process_releases_without_authors(local_redis_host):
    """Releases without author do not create any counter."""
    releases = {
        rel["id"]: msgpack.dumps(rel)
        for rel in [
            _create_release(author_fullname=None),
            _create_release(author_fullname=None),
        ]
    }

    redis = Redis(host=local_redis_host)

    process_releases(releases, redis)

    assert redis.get_counts(redis.get_counters()) == {}


# ---------------------------------------------------------------------------
# Patch boundary: what follows belongs to swh/counters/tests/test_server.py
#
# diff --git a/swh/counters/tests/test_server.py b/swh/counters/tests/test_server.py
# index 76986ca..3969098 100644
# --- a/swh/counters/tests/test_server.py
# +++ b/swh/counters/tests/test_server.py
# @@ -1,194 +1,199 @@
# ---------------------------------------------------------------------------

# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import json
import re
from typing import Any, Dict

import pytest
from redis import Redis as RedisClient
import yaml

from swh.core.api import RPCServerApp
from swh.counters.api import server
from swh.counters.api.server import load_and_check_config, make_app_from_configfile


def teardown_function():
    """Reset the module-level application cached in ``server.app``."""
    # Ensure there is no configuration loaded from a previous test
    server.app = None


@pytest.fixture
def swh_counters_server_config() -> Dict[str, Any]:
    """Minimal server configuration: a redis counters backend."""
    return {
        "counters": {
            "cls": "redis",
            "host": "redis",
        }
    }


@pytest.fixture
def swh_counters_server_config_on_disk(
    tmp_path, monkeypatch, swh_counters_server_config
) -> str:
    """Write the server configuration to disk, export its location through
    SWH_CONFIG_FILENAME and return the path of the written file."""
    return _environment_config_file(tmp_path, monkeypatch, swh_counters_server_config)


@pytest.fixture
def history_test_client(tmp_path, monkeypatch):
    """Test client of an application configured with both a ``counters`` and
    a ``history`` section."""
    cfg = {
        "counters": {"cls": "redis", "host": "redis:6379"},
        "history": {
            "cls": "prometheus",
            "prometheus_host": "prometheus",
            "prometheus_port": "9090",
            "live_data_start": "0",
            "cache_base_directory": "/tmp",
        },
    }
    _environment_config_file(tmp_path, monkeypatch, cfg)

    app = make_app_from_configfile()
    app.config["TESTING"] = True

    return app.test_client()


def write_config_file(tmpdir, config_dict: Dict, name: str = "config.yml") -> str:
    """Prepare the configuration file ``tmpdir / name`` holding ``config_dict``
    serialized as yaml.

    Args:
        tmpdir: root directory; must support the ``/`` operator and expose
            ``write_text`` (the tests below pass pytest's ``tmp_path`` or
            ``tmpdir`` fixtures)
        config_dict: configuration to dump with ``yaml.dump``
        name: configuration filename

    Returns:
        path of the configuration file prepared, as a string

    """
    config_path = tmpdir / name
    config_path.write_text(yaml.dump(config_dict), encoding="utf-8")
    # pytest on python3.5 does not support LocalPath manipulation, so
    # convert path to string
    return str(config_path)


def _environment_config_file(tmp_path, monkeypatch, content) -> str:
    """Write ``content`` to a configuration file under ``tmp_path`` and point
    the SWH_CONFIG_FILENAME environment variable at it.

    Returns:
        path of the written configuration file. It is returned so that
        callers forwarding this helper's result (such as the
        ``swh_counters_server_config_on_disk`` fixture, annotated ``-> str``)
        get the path rather than None.

    """
    conf_path = write_config_file(tmp_path, content)
    monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path)
    return conf_path


@pytest.mark.parametrize("config_file", [None, ""])
def test_load_and_check_config_no_configuration(config_file):
    """A missing (None or empty) configuration file path raises"""
    with pytest.raises(EnvironmentError, match="Configuration file must be defined"):
        load_and_check_config(config_file)


def test_load_and_check_config_inexistent_file():
    """A path to a nonexistent configuration file raises"""
    config_path = "/some/inexistent/config.yml"
    expected_error = f"Configuration file {config_path} does not exist"
    # ``match`` is a regular expression: escape the message so that the dots
    # and slashes of the path are matched literally
    with pytest.raises(EnvironmentError, match=re.escape(expected_error)):
        load_and_check_config(config_path)


def test_load_and_check_config_wrong_configuration(tmpdir):
    """Wrong configuration raises"""
    config_path = write_config_file(tmpdir, {"something": "useless"})
    with pytest.raises(KeyError, match="Missing 'counters' configuration"):
        load_and_check_config(config_path)


def test_server_make_app_from_config_file(swh_counters_server_config_on_disk):
    """The application is built once and the same object is returned by
    subsequent calls"""
    app = make_app_from_configfile()

    assert app is not None
    assert isinstance(app, RPCServerApp)

    app2 = make_app_from_configfile()
    assert app is app2


def test_server_index(swh_counters_server_config_on_disk, mocker):
    """Test the result of the main page"""

    app = make_app_from_configfile()
    app.config["TESTING"] = True
    tc = app.test_client()

    r = tc.get("/")

    assert 200 == r.status_code
    assert b"SWH Counters" in r.get_data()


def test_server_metrics(local_redis, tmp_path, monkeypatch):
    """Test the metrics generation"""

    rc = RedisClient(host=local_redis.host, port=local_redis.port)
    data = {
        "col1": 1,
        "col2": 4,
        "col3": 6,
        "col4": 10,
    }

    # add ``nb_elements`` distinct elements to each collection
    for coll, nb_elements in data.items():
        for i in range(nb_elements):
            rc.pfadd(coll, i)

    cfg = {
        "counters": {"cls": "redis", "host": f"{local_redis.host}:{local_redis.port}"}
    }
    _environment_config_file(tmp_path, monkeypatch, cfg)

    app = make_app_from_configfile()
    app.config["TESTING"] = True
    tc = app.test_client()

    r = tc.get("/metrics")

    assert 200 == r.status_code

    response = r.get_data().decode("utf-8")

    assert "HELP" in response
    assert "TYPE" in response

    for collection, expected_count in data.items():
        obj_type = f'object_type="{collection}"'
        assert obj_type in response

        pattern = r'swh_archive_object_total{col="value", object_type="%s"} (\d+)' % (
            collection
        )
        m = re.search(pattern, response)
        # fail with a readable message instead of an AttributeError on None
        assert m is not None, f"no metric line found for collection {collection}"
        assert expected_count == int(m.group(1))


def test_server_counters_history(history_test_client, mocker):
    """Test the counters history file download"""

    expected_result = {"content": [[1, 1], [2, 2]]}
    mock = mocker.patch("swh.counters.history.History.get_history")
    mock.return_value = expected_result

    r = history_test_client.get("/counters_history/test.json")

    assert 200 == r.status_code

    response = r.get_data().decode("utf-8")
    response_json = json.loads(response)

    assert response_json == expected_result
    assert "application/json" == r.headers["Content-Type"]


def test_server_counters_history_file_not_found(history_test_client, mocker):
    """Ensure a 404 is returned when the file doesn't exist"""

    mock = mocker.patch("swh.counters.history.History.get_history")
    mock.side_effect = FileNotFoundError

    r = history_test_client.get("/counters_history/fake.json")

    assert 404 == r.status_code