Page MenuHomeSoftware Heritage

D5429.diff
No OneTemporary

D5429.diff

diff --git a/MANIFEST.in b/MANIFEST.in
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -3,3 +3,4 @@
include version.txt
include README.md
recursive-include swh py.typed
+recursive-include swh/counters/tests/data/ *
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -2,3 +2,4 @@
pytest-mock
confluent-kafka
pytest-redis
+requests_mock
diff --git a/swh/counters/__init__.py b/swh/counters/__init__.py
--- a/swh/counters/__init__.py
+++ b/swh/counters/__init__.py
@@ -9,13 +9,17 @@
from typing import TYPE_CHECKING, Any, Dict
if TYPE_CHECKING:
- from swh.counters.interface import CountersInterface
+ from swh.counters.interface import CountersInterface, HistoryInterface
COUNTERS_IMPLEMENTATIONS = {
"redis": ".redis.Redis",
"remote": ".api.client.RemoteCounters",
}
+HISTORY_IMPLEMENTATIONS = {
+ "prometheus": ".history.History",
+}
+
def get_counters(cls: str, **kwargs: Dict[str, Any]) -> CountersInterface:
"""Get an counters object of class `cls` with arguments `args`.
@@ -42,3 +46,30 @@
module = importlib.import_module(module_path, package=__package__)
Counters = getattr(module, class_name)
return Counters(**kwargs)
+
+
+def get_history(cls: str, **kwargs: Dict[str, Any]) -> HistoryInterface:
+ """Get a history object of class `cls` with arguments `kwargs`.
+
+ Args:
+ cls: history's class; currently only 'prometheus' is supported
+ kwargs: dictionary of arguments passed to the
+ history class constructor
+
+ Returns:
+ an instance of a swh.counters.history class (e.g. History for 'prometheus')
+
+ Raises:
+ ValueError if passed an unknown history class.
+ """
+ class_path = HISTORY_IMPLEMENTATIONS.get(cls)
+ if class_path is None:
+ raise ValueError(
+ "Unknown history class `%s`. Supported: %s"
+ % (cls, ", ".join(HISTORY_IMPLEMENTATIONS))
+ )
+
+ (module_path, class_name) = class_path.rsplit(".", 1)
+ module = importlib.import_module(module_path, package=__package__)
+ History = getattr(module, class_name)
+ return History(**kwargs)
diff --git a/swh/counters/api/server.py b/swh/counters/api/server.py
--- a/swh/counters/api/server.py
+++ b/swh/counters/api/server.py
@@ -8,8 +8,8 @@
from swh.core import config
from swh.core.api import RPCServerApp
-from swh.counters import get_counters
-from swh.counters.interface import CountersInterface
+from swh.counters import get_counters, get_history
+from swh.counters.interface import CountersInterface, HistoryInterface
logger = logging.getLogger(__name__)
@@ -26,6 +26,11 @@
backend_factory=lambda: get_counters(**config["counters"]),
)
+ app.add_backend_class(
+ backend_class=HistoryInterface,
+ backend_factory=lambda: get_history(**config["history"]),
+ )
+
handler = logging.StreamHandler()
app.logger.addHandler(handler)
diff --git a/swh/counters/history.py b/swh/counters/history.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/history.py
@@ -0,0 +1,122 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+import json
+import logging
+import time
+from typing import Dict, List, Optional
+
+import requests
+
+logger = logging.getLogger(__name__)
+
+
+class History:
+ """Manage the historical data of the counters"""
+
+ def __init__(
+ self,
+ prometheus_host: str,
+ prometheus_port: int,
+ live_data_start: int,
+ cache_base_directory: str,
+ interval: str = "12h",
+ prometheus_collection: str = "swh_archive_object_total",
+ query_range_uri="/api/v1/query_range",
+ labels: Dict[str, str] = {},
+ ):
+ self.prometheus_host = prometheus_host
+ self.prometheus_port = prometheus_port
+ self.cache_base_directory = cache_base_directory
+ self.live_data_start = live_data_start
+ self.interval = interval
+ self.prometheus_collection = prometheus_collection
+ self.query_range_uri = query_range_uri
+ self.labels = labels
+
+ def _validate_filename(self, filename: str):
+ if "/" in str(filename):
+ raise ValueError("filename must not contain path information")
+
+ def _compute_url(self, object: str, end: int,) -> str:
+ """Compute the api url to request data from, specific to a label.
+
+ Args:
+ object: object_type/label data (ex: content, revision, ...)
+ end: retrieve the data until this date (timestamp)
+
+ Returns:
+ The api url to fetch the label's data
+ """
+ labels = self.labels.copy()
+ labels["object_type"] = object
+ formated_labels = ",".join([f'{k}="{v}"' for k, v in labels.items()])
+
+ url = (
+ f"http://{self.prometheus_host}:{self.prometheus_port}/"
+ f"{self.query_range_uri}?query=sum({self.prometheus_collection}"
+ f"{{{formated_labels}}})&start={self.live_data_start}&end={end}&"
+ f"step={self.interval}"
+ )
+ return url
+
+ def get_history(self, cache_file: str) -> Dict:
+ self._validate_filename(cache_file)
+
+ path = f"{self.cache_base_directory}/{cache_file}"
+
+ with open(path, "r") as f:
+ return json.load(f)
+
+ def _adapt_format(self, item: List) -> List:
+ """Javascript expects timestamps to be in milliseconds
+ and counter values as floats
+
+ Args:
+ item: List of 2 elements, timestamp and counter
+
+ Returns:
+ Normalized list [timestamp in milliseconds, counter as float]
+
+ """
+ timestamp = int(item[0])
+ counter_value = item[1]
+ return [timestamp * 1000, float(counter_value)]
+
+ def _get_timestamp_history(self, object: str,) -> List:
+ """Return the live values of an object"""
+ result = []
+
+ now = int(time.time())
+
+ url = self._compute_url(object=object, end=now,)
+ response = requests.get(url)
+ if response.ok:
+ data = response.json()
+ # data answer format:
+ # {"status":"success","data":{"result":[{"values":[[1544586427,"5375557897"]... # noqa
+ # Prometheus-provided data has to be adapted to js expectations
+ result = [
+ self._adapt_format(i) for i in data["data"]["result"][0]["values"]
+ ]
+ return result
+
+ def refresh_history(
+ self, cache_file: str, objects: List[str], static_file: Optional[str] = None,
+ ):
+ self._validate_filename(cache_file)
+
+ if static_file is not None:
+ static_data = self.get_history(static_file)
+ else:
+ static_data = {}
+
+ # for live content, we retrieve the existing data and merge it with the new one
+ live_data = {}
+ for object in objects:
+ prometheus_data = self._get_timestamp_history(object=object)
+ live_data[object] = static_data.get(object, []) + prometheus_data
+
+ with open(f"{self.cache_base_directory}/{cache_file}", "w") as f:
+ f.write(json.dumps(live_data))
diff --git a/swh/counters/interface.py b/swh/counters/interface.py
--- a/swh/counters/interface.py
+++ b/swh/counters/interface.py
@@ -30,3 +30,18 @@
@remote_api_endpoint("counters")
def get_counters(self) -> Iterable[str]:
"""Return the list of managed counters"""
+ ...
+
+
+class HistoryInterface:
+ @remote_api_endpoint("/history")
+ def get_history(self, cache_file: str):
+ """Return the content of a history file previously created
+ by the refresh_history method"""
+
+ @remote_api_endpoint("/refresh_history")
+ def refresh_history(self, cache_file: str):
+ """Refresh the cache file containing the counters historical data.
+ It can be an aggregate of live data and static data stored on
+ a separate file"""
+ ...
diff --git a/swh/counters/tests/data/content.json b/swh/counters/tests/data/content.json
new file mode 100644
--- /dev/null
+++ b/swh/counters/tests/data/content.json
@@ -0,0 +1,32 @@
+{
+ "status": "success",
+ "data": {
+ "resultType": "matrix",
+ "result": [
+ {
+ "metric": {
+ "__name__": "swh_archive_object_total",
+ "col": "value",
+ "environment": "production",
+ "instance": "counters1.internal.softwareheritage.org",
+ "job": "swh-counters",
+ "object_type": "content"
+ },
+ "values": [
+ [
+ 100.1,
+ "10"
+ ],
+ [
+ 100.9,
+ "20"
+ ],
+ [
+ 110.101,
+ "30"
+ ]
+ ]
+ }
+ ]
+ }
+}
diff --git a/swh/counters/tests/data/foo.json b/swh/counters/tests/data/foo.json
new file mode 100644
--- /dev/null
+++ b/swh/counters/tests/data/foo.json
@@ -0,0 +1,4 @@
+{
+ "key1": "value1",
+ "key2": "value2"
+}
diff --git a/swh/counters/tests/data/revision.json b/swh/counters/tests/data/revision.json
new file mode 100644
--- /dev/null
+++ b/swh/counters/tests/data/revision.json
@@ -0,0 +1,32 @@
+{
+ "status": "success",
+ "data": {
+ "resultType": "matrix",
+ "result": [
+ {
+ "metric": {
+ "__name__": "swh_archive_object_total",
+ "col": "value",
+ "environment": "production",
+ "instance": "counters1.internal.softwareheritage.org",
+ "job": "swh-counters",
+ "object_type": "revision"
+ },
+ "values": [
+ [
+ 80.1,
+ "1"
+ ],
+ [
+ 90.9,
+ "2"
+ ],
+ [
+ 95.1,
+ "5"
+ ]
+ ]
+ }
+ ]
+ }
+}
diff --git a/swh/counters/tests/test_history.py b/swh/counters/tests/test_history.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/tests/test_history.py
@@ -0,0 +1,217 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import json
+import os
+from typing import List
+
+import pytest
+
+from swh.counters.history import History
+
+TEST_HISTORY_CONFIG = {
+ "prometheus_host": "prometheus",
+ "prometheus_port": 8888,
+ "prometheus_collection": "swh.collection",
+ "cache_base_directory": "/tmp",
+ "live_data_start": "10",
+ "interval": "20h",
+ "query_range_uri": "/my/uri",
+ "labels": {"label1": "value1", "label2": "value2"},
+}
+
+TEST_JSON = {"key1": "value1", "key2": "value2"}
+CACHED_DATA = {"content": [[10, 1.5], [12, 2.0]], "revision": [[11, 4], [13, 5]]}
+
+
+@pytest.fixture
+def history():
+ return History(**TEST_HISTORY_CONFIG)
+
+
+def test_history_compute_url(history):
+
+ end = 99
+ object_type = "content"
+
+ url = history._compute_url(object=object_type, end=end,)
+
+ assert url == (
+ f'http://{TEST_HISTORY_CONFIG["prometheus_host"]}:'
+ f'{TEST_HISTORY_CONFIG["prometheus_port"]}/'
+ f'{TEST_HISTORY_CONFIG["query_range_uri"]}?'
+ f'query=sum({TEST_HISTORY_CONFIG["prometheus_collection"]}'
+ f'{{label1="value1",label2="value2",'
+ f'object_type="{object_type}"}})&'
+ f'start={TEST_HISTORY_CONFIG["live_data_start"]}&end={end}'
+ f'&step={TEST_HISTORY_CONFIG["interval"]}'
+ )
+
+
+@pytest.mark.parametrize(
+ "source, expected", [([1, "10"], [1000, 10.0]), ([2, "10.1"], [2000, 10.1]),]
+)
+def test_history__adapt_format(history, source, expected):
+ result = history._adapt_format(source)
+
+ assert expected == result
+
+
+def test_history__validate_filename(history):
+ with pytest.raises(ValueError, match="path information"):
+ history._validate_filename("/test.json")
+
+ with pytest.raises(ValueError, match="path information"):
+ history._validate_filename("../../test.json")
+
+ history._validate_filename("test.json")
+
+
+def test_history_get_history(history, tmp_path):
+ history.cache_base_directory = tmp_path
+
+ json_file = "test.json"
+ full_path = f"{tmp_path}/{json_file}"
+
+ with open(full_path, "w") as f:
+ f.write(json.dumps(TEST_JSON))
+
+ result = history.get_history(json_file)
+ assert result == TEST_JSON
+
+
+def test_history_get_history_relative_path_failed(history):
+ with pytest.raises(ValueError, match="path information"):
+ history.get_history("/test.json")
+
+
+def test_history__get_timestamp_history(history, requests_mock, datadir, mocker):
+ object = "content"
+ end = 100
+ url = history._compute_url(object, end)
+
+ mock = mocker.patch("time.time")
+ mock.return_value = end
+
+ request_content_file = os.path.join(datadir, "content.json")
+ with open(request_content_file, "r") as f:
+ content = f.read()
+
+ requests_mock.get(
+ url, [{"content": bytes(content, "utf-8"), "status_code": 200},],
+ )
+
+ result = history._get_timestamp_history(object)
+
+ assert result == [[100000, 10.0], [100000, 20.0], [110000, 30.0]]
+
+
+def test_history__get_timestamp_history_request_failed(
+ history, requests_mock, datadir, mocker
+):
+ object = "content"
+ end = 100
+ url = history._compute_url(object, end)
+
+ mock = mocker.patch("time.time")
+ mock.return_value = end
+
+ requests_mock.get(
+ url, [{"content": None, "status_code": 503},],
+ )
+
+ result = history._get_timestamp_history(object)
+
+ assert result == []
+
+
+def _configure_request_mock(
+ history_object, mock, datadir, objects: List[str], end: int
+):
+ for object_type in objects:
+ url = history_object._compute_url(object_type, end)
+ request_content_file = os.path.join(datadir, f"{object_type}.json")
+ with open(request_content_file, "r") as f:
+ content = f.read()
+ mock.get(
+ url, [{"content": bytes(content, "utf-8"), "status_code": 200},],
+ )
+
+
+def test_history__refresh_history_with_static_data(
+ history, requests_mock, mocker, datadir, tmp_path
+):
+ """Test the generation of a cache file with an aggregation
+ of static data and live data from prometheus
+ """
+ objects = ["content", "revision"]
+ static_file_name = "static.json"
+ cache_file = "result.json"
+ end = 100
+
+ with open(f"{tmp_path}/{static_file_name}", "w") as f:
+ f.write(json.dumps(CACHED_DATA))
+
+ _configure_request_mock(history, requests_mock, datadir, objects, end)
+
+ mock = mocker.patch("time.time")
+ mock.return_value = end
+
+ history.cache_base_directory = tmp_path
+
+ history.refresh_history(
+ cache_file=cache_file, objects=objects, static_file=static_file_name
+ )
+
+ result_file = f"{tmp_path}/{cache_file}"
+ assert os.path.isfile(result_file)
+
+ expected_history = {
+ "content": [
+ [10, 1.5],
+ [12, 2.0],
+ [100000, 10.0],
+ [100000, 20.0],
+ [110000, 30.0],
+ ],
+ "revision": [[11, 4], [13, 5], [80000, 1.0], [90000, 2.0], [95000, 5.0]],
+ }
+
+ with open(result_file, "r") as f:
+ content = json.load(f)
+
+ assert expected_history == content
+
+
+def test_history__refresh_history_without_historical(
+ history, requests_mock, mocker, datadir, tmp_path
+):
+ """Test the generation of a cache file with only
+ live data from prometheus"""
+ objects = ["content", "revision"]
+ cache_file = "result.json"
+ end = 100
+
+ _configure_request_mock(history, requests_mock, datadir, objects, end)
+
+ mock = mocker.patch("time.time")
+ mock.return_value = end
+
+ history.cache_base_directory = tmp_path
+
+ history.refresh_history(cache_file=cache_file, objects=objects)
+
+ result_file = f"{tmp_path}/{cache_file}"
+ assert os.path.isfile(result_file)
+
+ expected_history = {
+ "content": [[100000, 10.0], [100000, 20.0], [110000, 30.0]],
+ "revision": [[80000, 1.0], [90000, 2.0], [95000, 5.0]],
+ }
+
+ with open(result_file, "r") as f:
+ content = json.load(f)
+
+ assert expected_history == content
diff --git a/swh/counters/tests/test_init.py b/swh/counters/tests/test_init.py
--- a/swh/counters/tests/test_init.py
+++ b/swh/counters/tests/test_init.py
@@ -7,8 +7,9 @@
import pytest
-from swh.counters import get_counters
+from swh.counters import get_counters, get_history
from swh.counters.api.client import RemoteCounters
+from swh.counters.history import History
from swh.counters.interface import CountersInterface
from swh.counters.redis import Redis
@@ -69,3 +70,21 @@
assert expected_signature == actual_signature, meth_name
assert missing_methods == []
+
+
+def test_get_history_failure():
+ with pytest.raises(ValueError, match="Unknown history class"):
+ get_history("unknown-history")
+
+
+def test_get_history():
+ concrete_history = get_history(
+ "prometheus",
+ **{
+ "prometheus_host": "",
+ "prometheus_port": "",
+ "live_data_start": "",
+ "cache_base_directory": "",
+ },
+ )
+ assert isinstance(concrete_history, History)

File Metadata

Mime Type
text/plain
Expires
Wed, Dec 18, 12:57 AM (2 d, 12 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3220134

Event Timeline