diff --git a/MANIFEST.in b/MANIFEST.in
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -3,3 +3,4 @@
 include version.txt
 include README.md
 recursive-include swh py.typed
+recursive-include swh/counters/tests/data/ *
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -2,3 +2,4 @@
 pytest-mock
 confluent-kafka
 pytest-redis
+requests_mock
diff --git a/swh/counters/__init__.py b/swh/counters/__init__.py
--- a/swh/counters/__init__.py
+++ b/swh/counters/__init__.py
@@ -9,13 +9,17 @@
 from typing import TYPE_CHECKING, Any, Dict
 
 if TYPE_CHECKING:
-    from swh.counters.interface import CountersInterface
+    from swh.counters.interface import CountersInterface, HistoryInterface
 
 COUNTERS_IMPLEMENTATIONS = {
     "redis": ".redis.Redis",
     "remote": ".api.client.RemoteCounters",
 }
 
+HISTORY_IMPLEMENTATIONS = {
+    "prometheus": ".history.History",
+}
+
 
 def get_counters(cls: str, **kwargs: Dict[str, Any]) -> CountersInterface:
     """Get an counters object of class `cls` with arguments `args`.
@@ -42,3 +46,30 @@
     module = importlib.import_module(module_path, package=__package__)
     Counters = getattr(module, class_name)
     return Counters(**kwargs)
+
+
+def get_history(cls: str, **kwargs: Dict[str, Any]) -> HistoryInterface:
+    """Get a history object of class `cls` with arguments `kwargs`.
+
+    Args:
+        cls: history class; only 'prometheus' is currently supported
+        kwargs: dictionary of arguments passed to the
+            history class constructor
+
+    Returns:
+        an instance of the swh.counters.history.History class
+
+    Raises:
+        ValueError if passed an unknown history class.
+    """
+    class_path = HISTORY_IMPLEMENTATIONS.get(cls)
+    if class_path is None:
+        raise ValueError(
+            "Unknown history class `%s`. Supported: %s"
+            % (cls, ", ".join(HISTORY_IMPLEMENTATIONS))
+        )
+
+    (module_path, class_name) = class_path.rsplit(".", 1)
+    module = importlib.import_module(module_path, package=__package__)
+    History = getattr(module, class_name)
+    return History(**kwargs)
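
For reference, the new `get_history` factory mirrors the existing `get_counters`
one. A minimal usage sketch; the host, port and paths are illustrative values,
not part of this change:

    from swh.counters import get_history

    history = get_history(
        "prometheus",
        prometheus_host="prometheus.example.org",   # illustrative
        prometheus_port=9090,
        live_data_start=1544543227,                 # timestamp of the first historical point
        cache_base_directory="/var/cache/swh-counters",
    )

    # Any other class name raises:
    # ValueError: Unknown history class `foo`. Supported: prometheus
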
diff --git a/swh/counters/api/history.py b/swh/counters/api/history.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/api/history.py
@@ -0,0 +1,124 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+import json
+import time
+from typing import Dict, List
+
+import requests
+
+
+def load_history_data(history_data_file: str) -> Dict:
+    """Load the history from a history_data_file
+
+    Args:
+        history_data_file: Path to history file to load from
+
+    Returns:
+        dict mapping a label (origin, revision, content, ...) to its list of
+        history points (timestamp, counter)
+
+    """
+    with open(history_data_file, "r") as f:
+        return json.load(f)
+
+
+def compute_url(
+    server: str,
+    port: int,
+    collection: str,
+    labels: Dict[str, str],
+    object: str,
+    start: int,
+    end: int,
+    step: str,
+) -> str:
+    """Compute the API URL to request data from, specific to a label.
+
+    Args:
+        server: Prometheus server
+        port: Prometheus server port
+        collection: the Prometheus collection to query
+        labels: additional labels to filter on (e.g. environment:production)
+        object: object_type/label data (e.g. content, revision, ...)
+        start: retrieve the data from this date (timestamp)
+        end: retrieve the data until this date (timestamp)
+        step: the interval between two data points (e.g. 12h)
+
+    Returns:
+        The API URL to fetch the label's data
+    """
+
+    labels["object_type"] = object
+    formatted_labels = ",".join([f'{k}="{v}"' for k, v in labels.items()])
+
+    url = (
+        f"http://{server}:{port}/api/v1/query_range?"
+        f"query=sum({collection}{{{formatted_labels}}})"
+        f"&start={start}&end={end}&step={step}"
+    )
+    return url
+
+
+def _adapt_format(item: List) -> List:
+    """JavaScript expects timestamps to be in milliseconds
+       and counter values as floats
+
+    Args:
+        item: List of 2 elements, timestamp and counter
+
+    Returns:
+        Normalized list [timestamp in milliseconds, counter as float]
+
+    """
+    timestamp = int(item[0])
+    counter_value = item[1]
+    return [timestamp * 1000, float(counter_value)]
+
+
+def get_timestamp_history(
+    server: str,
+    port: int,
+    object: str,
+    labels: Dict[str, str],
+    start: int,
+    step: str,
+    collection: str = "swh_archive_object_total",
+) -> List:
+    """Given a label, retrieve its associated graph data.
+
+    Args:
+        server: Prometheus server
+        port: Prometheus server port
+        collection: the Prometheus collection to query
+        labels: the filter to apply on the collection
+        start: the beginning of the data window
+        step: the interval between the points
+
+    Returns:
+        The label's graph data from the Prometheus server.
+
+    """
+    result = []
+
+    now = int(time.time())
+
+    url = compute_url(
+        server=server,
+        port=port,
+        object=object,
+        collection=collection,
+        labels=labels,
+        start=start,
+        end=now,
+        step=step,
+    )
+    response = requests.get(url)
+    if response.ok:
+        data = response.json()
+        # data answer format:
+        # {"status":"success","data":{"result":[{"values":[[1544586427,"5375557897"]...  # noqa
+        # Prometheus-provided data has to be adapted to js expectations
+        result = [_adapt_format(i) for i in data["data"]["result"][0]["values"]]
+    return result
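
As a concrete illustration of the URL `compute_url` builds (all values below are
made up):

    from swh.counters.api.history import compute_url

    url = compute_url(
        server="prometheus.example.org",
        port=9090,
        collection="swh_archive_object_total",
        labels={"environment": "production"},
        object="content",
        start=1544543227,
        end=1613000000,
        step="12h",
    )
    # http://prometheus.example.org:9090/api/v1/query_range?
    #     query=sum(swh_archive_object_total{environment="production",object_type="content"})
    #     &start=1544543227&end=1613000000&step=12h
    # (returned as a single line; wrapped here for readability)
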
diff --git a/swh/counters/api/server.py b/swh/counters/api/server.py
--- a/swh/counters/api/server.py
+++ b/swh/counters/api/server.py
@@ -8,8 +8,8 @@
 from swh.core import config
 from swh.core.api import RPCServerApp
 
-from swh.counters import get_counters
-from swh.counters.interface import CountersInterface
+from swh.counters import get_counters, get_history
+from swh.counters.interface import CountersInterface, HistoryInterface
 
 logger = logging.getLogger(__name__)
 
@@ -26,6 +26,11 @@
         backend_factory=lambda: get_counters(**config["counters"]),
     )
 
+    app.add_backend_class(
+        backend_class=HistoryInterface,
+        backend_factory=lambda: get_history(**config["history"]),
+    )
+
     handler = logging.StreamHandler()
     app.logger.addHandler(handler)
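
The server now expects a `history` section next to the existing `counters` one;
`config["history"]` is passed verbatim to `get_history`, so it carries the `cls`
key plus the History constructor arguments. A sketch of the expected shape, shown
as a Python dict with illustrative values (the service loads its configuration
through swh.core.config):

    config = {
        # pre-existing "counters" section, unchanged
        "history": {
            "cls": "prometheus",
            "prometheus_host": "prometheus.example.org",
            "prometheus_port": 9090,
            "live_data_start": 1544543227,
            "cache_base_directory": "/var/cache/swh-counters",
        },
    }
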
diff --git a/swh/counters/history.py b/swh/counters/history.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/history.py
@@ -0,0 +1,122 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+import json
+import logging
+import time
+from typing import Dict, List
+
+import requests
+
+logger = logging.getLogger(__name__)
+
+
+class History:
+    """Manage the historical data of the counters"""
+
+    def __init__(
+        self,
+        prometheus_host: str,
+        prometheus_port: int,
+        live_data_start: int,
+        cache_base_directory: str,
+        interval: str = "12h",
+        prometheus_collection: str = "swh_archive_object_total",
+        query_range_uri="/api/v1/query_range",
+        labels: Dict[str, str] = {},
+    ):
+        self.prometheus_host = prometheus_host
+        self.prometheus_port = prometheus_port
+        self.cache_base_directory = cache_base_directory
+        self.live_data_start = live_data_start
+        self.interval = interval
+        self.prometheus_collection = prometheus_collection
+        self.query_range_uri = query_range_uri
+        self.labels = labels
+
+    def _validate_filename(self, filename: str):
+        if "/" in str(filename):
+            raise ValueError("filename must not contain path information")
+
+    def _compute_url(self, object: str, end: int,) -> str:
+        """Compute the API URL to request data from, specific to a label.
+
+        Args:
+            object: object_type/label data (e.g. content, revision, ...)
+            end: retrieve the data until this date (timestamp)
+
+        Returns:
+            The API URL to fetch the label's data
+        """
+        labels = self.labels.copy()
+        labels["object_type"] = object
+        formatted_labels = ",".join([f'{k}="{v}"' for k, v in labels.items()])
+
+        url = (
+            f"http://{self.prometheus_host}:{self.prometheus_port}/"
+            f"{self.query_range_uri}?query=sum({self.prometheus_collection}"
+            f"{{{formatted_labels}}})&start={self.live_data_start}&end={end}&"
+            f"step={self.interval}"
+        )
+        return url
+
+    def get_history(self, static_file: str) -> Dict:
+        self._validate_filename(static_file)
+
+        path = f"{self.cache_base_directory}/{static_file}"
+
+        with open(path, "r") as f:
+            return json.load(f)
+
+    def _adapt_format(self, item: List) -> List:
+        """JavaScript expects timestamps to be in milliseconds
+           and counter values as floats
+
+        Args:
+            item: List of 2 elements, timestamp and counter
+
+        Returns:
+            Normalized list [timestamp in milliseconds, counter as float]
+
+        """
+        timestamp = int(item[0])
+        counter_value = item[1]
+        return [timestamp * 1000, float(counter_value)]
+
+    def _get_timestamp_history(self, object: str,) -> List:
+        """Return the live values of an object"""
+        result = []
+
+        now = int(time.time())
+
+        url = self._compute_url(object=object, end=now,)
+        response = requests.get(url)
+        if response.ok:
+            data = response.json()
+            # data answer format:
+            # {"status":"success","data":{"result":[{"values":[[1544586427,"5375557897"]...  # noqa
+            # Prometheus-provided data has to be adapted to js expectations
+            result = [
+                self._adapt_format(i) for i in data["data"]["result"][0]["values"]
+            ]
+        return result
+
+    def refresh_history(
+        self, cache_file: str, objects: List[str], static_file: str = "",
+    ):
+        self._validate_filename(cache_file)
+
+        if static_file:
+            static_data = self.get_history(static_file)
+        else:
+            static_data = {}
+
+        # for live content, retrieve the existing data and merge it with the new one
+        live_data = {}
+        for object in objects:
+            prometheus_data = self._get_timestamp_history(object=object)
+            live_data[object] = static_data.get(object, []) + prometheus_data
+
+        with open(f"{self.cache_base_directory}/{cache_file}", "w") as f:
+            f.write(json.dumps(live_data))
diff --git a/swh/counters/interface.py b/swh/counters/interface.py
--- a/swh/counters/interface.py
+++ b/swh/counters/interface.py
@@ -30,3 +30,18 @@
     @remote_api_endpoint("counters")
     def get_counters(self) -> Iterable[str]:
         """Return the list of managed counters"""
+        ...
+
+
+class HistoryInterface:
+    @remote_api_endpoint("/history")
+    def get_history(self, cache_file: str):
+        """Return the content of a history file previously created
+           by the refresh_history method"""
+
+    @remote_api_endpoint("/refresh_history")
+    def refresh_history(self, cache_file: str):
+        """Refresh the cache file containing the counters' historical data.
+           It can be an aggregate of live data and static data stored in
+           a separate file"""
+        ...
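
To illustrate how the class is meant to be driven (file names and values are made
up; the static file, when given, must already exist in the cache directory):

    from swh.counters.history import History

    history = History(
        prometheus_host="prometheus.example.org",
        prometheus_port=9090,
        live_data_start=1544543227,
        cache_base_directory="/var/cache/swh-counters",
        labels={"environment": "production"},
    )

    # Fetch live points from Prometheus, prepend the static history from
    # "static_history.json" and write the merged result to "history.json".
    history.refresh_history(
        cache_file="history.json",
        objects=["content", "revision"],
        static_file="static_history.json",
    )

    # The cache file maps each object type to [timestamp_ms, count] points, e.g.
    # {"content": [[1544586427000, 5375557897.0], ...], "revision": [...]}
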
diff --git a/swh/counters/tests/data/content.json b/swh/counters/tests/data/content.json
new file mode 100644
--- /dev/null
+++ b/swh/counters/tests/data/content.json
@@ -0,0 +1,32 @@
+{
+    "status": "success",
+    "data": {
+        "resultType": "matrix",
+        "result": [
+            {
+                "metric": {
+                    "__name__": "swh_archive_object_total",
+                    "col": "value",
+                    "environment": "production",
+                    "instance": "counters1.internal.softwareheritage.org",
+                    "job": "swh-counters",
+                    "object_type": "content"
+                },
+                "values": [
+                    [
+                        100.1,
+                        "10"
+                    ],
+                    [
+                        100.9,
+                        "20"
+                    ],
+                    [
+                        110.101,
+                        "30"
+                    ]
+                ]
+            }
+        ]
+    }
+}
diff --git a/swh/counters/tests/data/foo.json b/swh/counters/tests/data/foo.json
new file mode 100644
--- /dev/null
+++ b/swh/counters/tests/data/foo.json
@@ -0,0 +1,4 @@
+{
+    "key1": "value1",
+    "key2": "value2"
+}
diff --git a/swh/counters/tests/data/revision.json b/swh/counters/tests/data/revision.json
new file mode 100644
--- /dev/null
+++ b/swh/counters/tests/data/revision.json
@@ -0,0 +1,32 @@
+{
+    "status": "success",
+    "data": {
+        "resultType": "matrix",
+        "result": [
+            {
+                "metric": {
+                    "__name__": "swh_archive_object_total",
+                    "col": "value",
+                    "environment": "production",
+                    "instance": "counters1.internal.softwareheritage.org",
+                    "job": "swh-counters",
+                    "object_type": "revision"
+                },
+                "values": [
+                    [
+                        80.1,
+                        "1"
+                    ],
+                    [
+                        90.9,
+                        "2"
+                    ],
+                    [
+                        95.1,
+                        "5"
+                    ]
+                ]
+            }
+        ]
+    }
+}
diff --git a/swh/counters/tests/test_history.py b/swh/counters/tests/test_history.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/tests/test_history.py
@@ -0,0 +1,220 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import json
+import os
+
+import pytest
+
+from swh.counters.api.history import load_history_data
+from swh.counters.history import History
+
+TEST_HISTORY_CONFIG = {
+    "prometheus_host": "prometheus",
+    "prometheus_port": 8888,
+    "prometheus_collection": "swh.collection",
+    "cache_base_directory": "/tmp",
+    "live_data_start": "10",
+    "interval": "20h",
+    "query_range_uri": "/my/uri",
+    "labels": {"label1": "value1", "label2": "value2"},
+}
+
+TEST_JSON = {"key1": "value1", "key2": "value2"}
+CACHED_DATA = {"content": [[10, 1.5], [12, 2.0]], "revision": [[11, 4], [13, 5]]}
+
+
+@pytest.fixture
+def history():
+    return History(**TEST_HISTORY_CONFIG)
+
+
+def test_history_load_data(datadir):
+    datatest = os.path.join(datadir, "foo.json")
+    result = load_history_data(datatest)
+
+    assert result == {"key1": "value1", "key2": "value2"}
+
+
+def test_history_compute_url(history):
+
+    end = 99
+    object_type = "content"
+
+    url = history._compute_url(object=object_type, end=end,)
+
+    assert url == (
+        f'http://{TEST_HISTORY_CONFIG["prometheus_host"]}:'
+        f'{TEST_HISTORY_CONFIG["prometheus_port"]}/'
+        f'{TEST_HISTORY_CONFIG["query_range_uri"]}?'
+        f'query=sum({TEST_HISTORY_CONFIG["prometheus_collection"]}'
+        f'{{label1="value1",label2="value2",'
+        f'object_type="{object_type}"}})&'
+        f'start={TEST_HISTORY_CONFIG["live_data_start"]}&end={end}'
+        f'&step={TEST_HISTORY_CONFIG["interval"]}'
+    )
+
+
+@pytest.mark.parametrize(
+    "source, expected", [([1, "10"], [1000, 10.0]), ([2, "10.1"], [2000, 10.1]),]
+)
+def test_history__adapt_format(history, source, expected):
+    result = history._adapt_format(source)
+
+    assert expected == result
+
+
+def test_history__validate_filename(history):
+    with pytest.raises(ValueError, match="path information"):
+        history._validate_filename("/test.json")
+
+    with pytest.raises(ValueError, match="path information"):
+        history._validate_filename("../../test.json")
+
+    history._validate_filename("test.json")
+
+
+def test_history_get_history(history, tmp_path):
+    history.cache_base_directory = tmp_path
+
+    json_file = "test.json"
+    full_path = f"{tmp_path}/{json_file}"
+
+    with open(full_path, "w") as f:
+        f.write(json.dumps(TEST_JSON))
+
+    result = history.get_history(json_file)
+    assert result == TEST_JSON
+
+
+def test_history_get_history_relative_path_failed(history):
+    with pytest.raises(ValueError, match="path information"):
+        history.get_history("/test.json")
+
+
+def test_history__get_timestamp_history(history, requests_mock, datadir, mocker):
+    object = "content"
+    end = 100
+    url = history._compute_url(object, end)
+
+    mock = mocker.patch("time.time")
+    mock.return_value = end
+
+    request_content_file = os.path.join(datadir, "content.json")
+    with open(request_content_file, "r") as f:
+        content = f.read()
+
+    requests_mock.get(
+        url, [{"content": bytes(content, "utf-8"), "status_code": 200},],
+    )
+
+    result = history._get_timestamp_history(object)
+
+    assert result == [[100000, 10.0], [100000, 20.0], [110000, 30.0]]
+
+
+def test_history__get_timestamp_history_request_failed(
+    history, requests_mock, datadir, mocker
+):
+    object = "content"
+    end = 100
+    url = history._compute_url(object, end)
+
+    mock = mocker.patch("time.time")
+    mock.return_value = end
+
+    requests_mock.get(
+        url, [{"content": None, "status_code": 503},],
+    )
+
+    result = history._get_timestamp_history(object)
+
+    assert result == []
+
+
+def test_history__refresh_history_with_historical(
+    history, requests_mock, mocker, datadir, tmp_path
+):
+    objects = ["content", "revision"]
+    static_file_name = "static.json"
+    cache_file = "result.json"
+    end = 100
+
+    with open(f"{tmp_path}/{static_file_name}", "w") as f:
+        f.write(json.dumps(CACHED_DATA))
+
+    for object_type in objects:
+        url = history._compute_url(object_type, end)
+        request_content_file = os.path.join(datadir, f"{object_type}.json")
+        with open(request_content_file, "r") as f:
+            content = f.read()
+        requests_mock.get(
+            url, [{"content": bytes(content, "utf-8"), "status_code": 200},],
+        )
+
+    mock = mocker.patch("time.time")
+    mock.return_value = end
+
+    history.cache_base_directory = tmp_path
+
+    history.refresh_history(
+        cache_file=cache_file, objects=objects, static_file=static_file_name
+    )
+
+    result_file = f"{tmp_path}/{cache_file}"
+    assert os.path.isfile(result_file)
+
+    expected = {
+        "content": [
+            [10, 1.5],
+            [12, 2.0],
+            [100000, 10.0],
+            [100000, 20.0],
+            [110000, 30.0],
+        ],
+        "revision": [[11, 4], [13, 5], [80000, 1.0], [90000, 2.0], [95000, 5.0]],
+    }
+
+    with open(result_file, "r") as f:
+        content = json.load(f)
+
+    assert expected == content
+
+
+def test_history__refresh_history_without_historical(
+    history, requests_mock, mocker, datadir, tmp_path
+):
+    objects = ["content", "revision"]
+    cache_file = "result.json"
+    end = 100
+
+    for object_type in objects:
+        url = history._compute_url(object_type, end)
+        request_content_file = os.path.join(datadir, f"{object_type}.json")
+        with open(request_content_file, "r") as f:
+            content = f.read()
+        requests_mock.get(
+            url, [{"content": bytes(content, "utf-8"), "status_code": 200},],
+        )
+
+    mock = mocker.patch("time.time")
+    mock.return_value = end
+
+    history.cache_base_directory = tmp_path
+
+    history.refresh_history(cache_file=cache_file, objects=objects)
+
+    result_file = f"{tmp_path}/{cache_file}"
+    assert os.path.isfile(result_file)
+
+    expected = {
+        "content": [[100000, 10.0], [100000, 20.0], [110000, 30.0]],
+        "revision": [[80000, 1.0], [90000, 2.0], [95000, 5.0]],
+    }
+
+    with open(result_file, "r") as f:
+        content = json.load(f)
+
+    assert expected == content
diff --git a/swh/counters/tests/test_init.py b/swh/counters/tests/test_init.py
--- a/swh/counters/tests/test_init.py
+++ b/swh/counters/tests/test_init.py
@@ -7,8 +7,9 @@
 
 import pytest
 
-from swh.counters import get_counters
+from swh.counters import get_counters, get_history
 from swh.counters.api.client import RemoteCounters
+from swh.counters.history import History
 from swh.counters.interface import CountersInterface
 from swh.counters.redis import Redis
 
@@ -69,3 +70,21 @@
         assert expected_signature == actual_signature, meth_name
 
     assert missing_methods == []
+
+
+def test_get_history_failure():
+    with pytest.raises(ValueError, match="Unknown history class"):
+        get_history("unknown-history")
+
+
+def test_get_history():
+    concrete_history = get_history(
+        "prometheus",
+        **{
+            "prometheus_host": "",
+            "prometheus_port": "",
+            "live_data_start": "",
+            "cache_base_directory": "",
+        },
+    )
+    assert isinstance(concrete_history, History)
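
The tests above rely on a `datadir` fixture resolving to
swh/counters/tests/data/. If such a fixture is not already provided by an
existing conftest or pytest plugin (an assumption worth checking), a minimal
sketch would be:

    # swh/counters/tests/conftest.py -- only needed if `datadir` is not
    # already provided elsewhere (assumption).
    import os

    import pytest


    @pytest.fixture
    def datadir(request):
        # "data" directory sitting next to the test module
        return os.path.join(os.path.dirname(str(request.fspath)), "data")
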