Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7123159
D5429.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
17 KB
Subscribers
None
D5429.diff
View Options
diff --git a/MANIFEST.in b/MANIFEST.in
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -3,3 +3,4 @@
include version.txt
include README.md
recursive-include swh py.typed
+recursive-include swh/counters/tests/data/ *
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -2,3 +2,4 @@
pytest-mock
confluent-kafka
pytest-redis
+requests_mock
diff --git a/swh/counters/__init__.py b/swh/counters/__init__.py
--- a/swh/counters/__init__.py
+++ b/swh/counters/__init__.py
@@ -9,13 +9,17 @@
from typing import TYPE_CHECKING, Any, Dict
if TYPE_CHECKING:
- from swh.counters.interface import CountersInterface
+ from swh.counters.interface import CountersInterface, HistoryInterface
COUNTERS_IMPLEMENTATIONS = {
"redis": ".redis.Redis",
"remote": ".api.client.RemoteCounters",
}
+HISTORY_IMPLEMENTATIONS = {
+ "prometheus": ".history.History",
+}
+
def get_counters(cls: str, **kwargs: Dict[str, Any]) -> CountersInterface:
"""Get an counters object of class `cls` with arguments `args`.
@@ -42,3 +46,30 @@
module = importlib.import_module(module_path, package=__package__)
Counters = getattr(module, class_name)
return Counters(**kwargs)
+
+
+def get_history(cls: str, **kwargs: Dict[str, Any]) -> HistoryInterface:
+ """Get a history object of class `cls` with arguments `kwargs`.
+
+ Args:
+        cls: history's class; currently only 'prometheus' is supported
+ kwargs: dictionary of arguments passed to the
+          history class constructor
+
+ Returns:
+        an instance of one of swh.counters.history's classes
+
+ Raises:
+ ValueError if passed an unknown history class.
+ """
+ class_path = HISTORY_IMPLEMENTATIONS.get(cls)
+ if class_path is None:
+ raise ValueError(
+ "Unknown history class `%s`. Supported: %s"
+ % (cls, ", ".join(HISTORY_IMPLEMENTATIONS))
+ )
+
+ (module_path, class_name) = class_path.rsplit(".", 1)
+ module = importlib.import_module(module_path, package=__package__)
+ History = getattr(module, class_name)
+ return History(**kwargs)
diff --git a/swh/counters/api/server.py b/swh/counters/api/server.py
--- a/swh/counters/api/server.py
+++ b/swh/counters/api/server.py
@@ -8,8 +8,8 @@
from swh.core import config
from swh.core.api import RPCServerApp
-from swh.counters import get_counters
-from swh.counters.interface import CountersInterface
+from swh.counters import get_counters, get_history
+from swh.counters.interface import CountersInterface, HistoryInterface
logger = logging.getLogger(__name__)
@@ -26,6 +26,11 @@
backend_factory=lambda: get_counters(**config["counters"]),
)
+ app.add_backend_class(
+ backend_class=HistoryInterface,
+ backend_factory=lambda: get_history(**config["history"]),
+ )
+
handler = logging.StreamHandler()
app.logger.addHandler(handler)
diff --git a/swh/counters/history.py b/swh/counters/history.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/history.py
@@ -0,0 +1,122 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+import json
+import logging
+import time
+from typing import Dict, List, Optional
+
+import requests
+
+logger = logging.getLogger(__name__)
+
+
+class History:
+ """Manage the historical data of the counters"""
+
+ def __init__(
+ self,
+ prometheus_host: str,
+ prometheus_port: int,
+ live_data_start: int,
+ cache_base_directory: str,
+ interval: str = "12h",
+ prometheus_collection: str = "swh_archive_object_total",
+ query_range_uri="/api/v1/query_range",
+ labels: Dict[str, str] = {},
+ ):
+ self.prometheus_host = prometheus_host
+ self.prometheus_port = prometheus_port
+ self.cache_base_directory = cache_base_directory
+ self.live_data_start = live_data_start
+ self.interval = interval
+ self.prometheus_collection = prometheus_collection
+ self.query_range_uri = query_range_uri
+ self.labels = labels
+
+ def _validate_filename(self, filename: str):
+ if "/" in str(filename):
+ raise ValueError("filename must not contain path information")
+
+ def _compute_url(self, object: str, end: int,) -> str:
+ """Compute the api url to request data from, specific to a label.
+
+ Args:
+ object: object_type/label data (ex: content, revision, ...)
+ end: retrieve the data until this date (timestamp)
+
+ Returns:
+ The api url to fetch the label's data
+ """
+ labels = self.labels.copy()
+ labels["object_type"] = object
+ formated_labels = ",".join([f'{k}="{v}"' for k, v in labels.items()])
+
+ url = (
+ f"http://{self.prometheus_host}:{self.prometheus_port}/"
+ f"{self.query_range_uri}?query=sum({self.prometheus_collection}"
+ f"{{{formated_labels}}})&start={self.live_data_start}&end={end}&"
+ f"step={self.interval}"
+ )
+ return url
+
+ def get_history(self, cache_file: str) -> Dict:
+ self._validate_filename(cache_file)
+
+ path = f"{self.cache_base_directory}/{cache_file}"
+
+ with open(path, "r") as f:
+ return json.load(f)
+
+ def _adapt_format(self, item: List) -> List:
+ """Javascript expects timestamps to be in milliseconds
+ and counter values as floats
+
+        Args:
+ item: List of 2 elements, timestamp and counter
+
+        Returns:
+ Normalized tuple (timestamp in js expected time, counter as float)
+
+ """
+ timestamp = int(item[0])
+ counter_value = item[1]
+ return [timestamp * 1000, float(counter_value)]
+
+ def _get_timestamp_history(self, object: str,) -> List:
+ """Return the live values of an object"""
+ result = []
+
+ now = int(time.time())
+
+ url = self._compute_url(object=object, end=now,)
+ response = requests.get(url)
+ if response.ok:
+ data = response.json()
+ # data answer format:
+ # {"status":"success","data":{"result":[{"values":[[1544586427,"5375557897"]... # noqa
+ # Prometheus-provided data has to be adapted to js expectations
+ result = [
+ self._adapt_format(i) for i in data["data"]["result"][0]["values"]
+ ]
+ return result
+
+ def refresh_history(
+ self, cache_file: str, objects: List[str], static_file: Optional[str] = None,
+ ):
+ self._validate_filename(cache_file)
+
+ if static_file is not None:
+ static_data = self.get_history(static_file)
+ else:
+ static_data = {}
+
+        # for live content, we retrieve existing data and merge it with the new one
+ live_data = {}
+ for object in objects:
+ prometheus_data = self._get_timestamp_history(object=object)
+ live_data[object] = static_data.get(object, []) + prometheus_data
+
+ with open(f"{self.cache_base_directory}/{cache_file}", "w") as f:
+ f.write(json.dumps(live_data))
diff --git a/swh/counters/interface.py b/swh/counters/interface.py
--- a/swh/counters/interface.py
+++ b/swh/counters/interface.py
@@ -30,3 +30,18 @@
@remote_api_endpoint("counters")
def get_counters(self) -> Iterable[str]:
"""Return the list of managed counters"""
+ ...
+
+
+class HistoryInterface:
+ @remote_api_endpoint("/history")
+ def get_history(self, cache_file: str):
+ """Return the content of an history file previously created
+ by the refresh_counters method"""
+
+ @remote_api_endpoint("/refresh_history")
+ def refresh_history(self, cache_file: str):
+ """Refresh the cache file containing the counters historical data.
+ It can be an aggregate of live data and static data stored on
+ a separate file"""
+ ...
diff --git a/swh/counters/tests/data/content.json b/swh/counters/tests/data/content.json
new file mode 100644
--- /dev/null
+++ b/swh/counters/tests/data/content.json
@@ -0,0 +1,32 @@
+{
+ "status": "success",
+ "data": {
+ "resultType": "matrix",
+ "result": [
+ {
+ "metric": {
+ "__name__": "swh_archive_object_total",
+ "col": "value",
+ "environment": "production",
+ "instance": "counters1.internal.softwareheritage.org",
+ "job": "swh-counters",
+ "object_type": "content"
+ },
+ "values": [
+ [
+ 100.1,
+ "10"
+ ],
+ [
+ 100.9,
+ "20"
+ ],
+ [
+ 110.101,
+ "30"
+ ]
+ ]
+ }
+ ]
+ }
+}
diff --git a/swh/counters/tests/data/foo.json b/swh/counters/tests/data/foo.json
new file mode 100644
--- /dev/null
+++ b/swh/counters/tests/data/foo.json
@@ -0,0 +1,4 @@
+{
+ "key1": "value1",
+ "key2": "value2"
+}
diff --git a/swh/counters/tests/data/revision.json b/swh/counters/tests/data/revision.json
new file mode 100644
--- /dev/null
+++ b/swh/counters/tests/data/revision.json
@@ -0,0 +1,32 @@
+{
+ "status": "success",
+ "data": {
+ "resultType": "matrix",
+ "result": [
+ {
+ "metric": {
+ "__name__": "swh_archive_object_total",
+ "col": "value",
+ "environment": "production",
+ "instance": "counters1.internal.softwareheritage.org",
+ "job": "swh-counters",
+ "object_type": "revision"
+ },
+ "values": [
+ [
+ 80.1,
+ "1"
+ ],
+ [
+ 90.9,
+ "2"
+ ],
+ [
+ 95.1,
+ "5"
+ ]
+ ]
+ }
+ ]
+ }
+}
diff --git a/swh/counters/tests/test_history.py b/swh/counters/tests/test_history.py
new file mode 100644
--- /dev/null
+++ b/swh/counters/tests/test_history.py
@@ -0,0 +1,217 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import json
+import os
+from typing import List
+
+import pytest
+
+from swh.counters.history import History
+
+TEST_HISTORY_CONFIG = {
+ "prometheus_host": "prometheus",
+ "prometheus_port": 8888,
+ "prometheus_collection": "swh.collection",
+ "cache_base_directory": "/tmp",
+ "live_data_start": "10",
+ "interval": "20h",
+ "query_range_uri": "/my/uri",
+ "labels": {"label1": "value1", "label2": "value2"},
+}
+
+TEST_JSON = {"key1": "value1", "key2": "value2"}
+CACHED_DATA = {"content": [[10, 1.5], [12, 2.0]], "revision": [[11, 4], [13, 5]]}
+
+
+@pytest.fixture
+def history():
+ return History(**TEST_HISTORY_CONFIG)
+
+
+def test_history_compute_url(history):
+
+ end = 99
+ object_type = "content"
+
+ url = history._compute_url(object=object_type, end=end,)
+
+ assert url == (
+ f'http://{TEST_HISTORY_CONFIG["prometheus_host"]}:'
+ f'{TEST_HISTORY_CONFIG["prometheus_port"]}/'
+ f'{TEST_HISTORY_CONFIG["query_range_uri"]}?'
+ f'query=sum({TEST_HISTORY_CONFIG["prometheus_collection"]}'
+ f'{{label1="value1",label2="value2",'
+ f'object_type="{object_type}"}})&'
+ f'start={TEST_HISTORY_CONFIG["live_data_start"]}&end={end}'
+ f'&step={TEST_HISTORY_CONFIG["interval"]}'
+ )
+
+
+@pytest.mark.parametrize(
+ "source, expected", [([1, "10"], [1000, 10.0]), ([2, "10.1"], [2000, 10.1]),]
+)
+def test_history__adapt_format(history, source, expected):
+ result = history._adapt_format(source)
+
+ assert expected == result
+
+
+def test_history__validate_filename(history):
+ with pytest.raises(ValueError, match="path information"):
+ history._validate_filename("/test.json")
+
+ with pytest.raises(ValueError, match="path information"):
+ history._validate_filename("../../test.json")
+
+ history._validate_filename("test.json")
+
+
+def test_history_get_history(history, tmp_path):
+ history.cache_base_directory = tmp_path
+
+ json_file = "test.json"
+ full_path = f"{tmp_path}/{json_file}"
+
+ with open(full_path, "w") as f:
+ f.write(json.dumps(TEST_JSON))
+
+ result = history.get_history(json_file)
+ assert result == TEST_JSON
+
+
+def test_history_get_history_relative_path_failed(history):
+ with pytest.raises(ValueError, match="path information"):
+ history.get_history("/test.json")
+
+
+def test_history__get_timestamp_history(history, requests_mock, datadir, mocker):
+ object = "content"
+ end = 100
+ url = history._compute_url(object, end)
+
+ mock = mocker.patch("time.time")
+ mock.return_value = end
+
+ request_content_file = os.path.join(datadir, "content.json")
+ with open(request_content_file, "r") as f:
+ content = f.read()
+
+ requests_mock.get(
+ url, [{"content": bytes(content, "utf-8"), "status_code": 200},],
+ )
+
+ result = history._get_timestamp_history(object)
+
+ assert result == [[100000, 10.0], [100000, 20.0], [110000, 30.0]]
+
+
+def test_history__get_timestamp_history_request_failed(
+ history, requests_mock, datadir, mocker
+):
+ object = "content"
+ end = 100
+ url = history._compute_url(object, end)
+
+ mock = mocker.patch("time.time")
+ mock.return_value = end
+
+ requests_mock.get(
+ url, [{"content": None, "status_code": 503},],
+ )
+
+ result = history._get_timestamp_history(object)
+
+ assert result == []
+
+
+def _configure_request_mock(
+ history_object, mock, datadir, objects: List[str], end: int
+):
+ for object_type in objects:
+ url = history_object._compute_url(object_type, end)
+ request_content_file = os.path.join(datadir, f"{object_type}.json")
+ with open(request_content_file, "r") as f:
+ content = f.read()
+ mock.get(
+ url, [{"content": bytes(content, "utf-8"), "status_code": 200},],
+ )
+
+
+def test_history__refresh_history_with_static_data(
+ history, requests_mock, mocker, datadir, tmp_path
+):
+ """Test the generation of a cache file with an aggregation
+ of static data and live data from prometheus
+ """
+ objects = ["content", "revision"]
+ static_file_name = "static.json"
+ cache_file = "result.json"
+ end = 100
+
+ with open(f"{tmp_path}/{static_file_name}", "w") as f:
+ f.write(json.dumps(CACHED_DATA))
+
+ _configure_request_mock(history, requests_mock, datadir, objects, end)
+
+ mock = mocker.patch("time.time")
+ mock.return_value = end
+
+ history.cache_base_directory = tmp_path
+
+ history.refresh_history(
+ cache_file=cache_file, objects=objects, static_file=static_file_name
+ )
+
+ result_file = f"{tmp_path}/{cache_file}"
+ assert os.path.isfile(result_file)
+
+ expected_history = {
+ "content": [
+ [10, 1.5],
+ [12, 2.0],
+ [100000, 10.0],
+ [100000, 20.0],
+ [110000, 30.0],
+ ],
+ "revision": [[11, 4], [13, 5], [80000, 1.0], [90000, 2.0], [95000, 5.0]],
+ }
+
+ with open(result_file, "r") as f:
+ content = json.load(f)
+
+ assert expected_history == content
+
+
+def test_history__refresh_history_without_historical(
+ history, requests_mock, mocker, datadir, tmp_path
+):
+ """Test the generation of a cache file with only
+ live data from prometheus"""
+ objects = ["content", "revision"]
+ cache_file = "result.json"
+ end = 100
+
+ _configure_request_mock(history, requests_mock, datadir, objects, end)
+
+ mock = mocker.patch("time.time")
+ mock.return_value = end
+
+ history.cache_base_directory = tmp_path
+
+ history.refresh_history(cache_file=cache_file, objects=objects)
+
+ result_file = f"{tmp_path}/{cache_file}"
+ assert os.path.isfile(result_file)
+
+ expected_history = {
+ "content": [[100000, 10.0], [100000, 20.0], [110000, 30.0]],
+ "revision": [[80000, 1.0], [90000, 2.0], [95000, 5.0]],
+ }
+
+ with open(result_file, "r") as f:
+ content = json.load(f)
+
+ assert expected_history == content
diff --git a/swh/counters/tests/test_init.py b/swh/counters/tests/test_init.py
--- a/swh/counters/tests/test_init.py
+++ b/swh/counters/tests/test_init.py
@@ -7,8 +7,9 @@
import pytest
-from swh.counters import get_counters
+from swh.counters import get_counters, get_history
from swh.counters.api.client import RemoteCounters
+from swh.counters.history import History
from swh.counters.interface import CountersInterface
from swh.counters.redis import Redis
@@ -69,3 +70,21 @@
assert expected_signature == actual_signature, meth_name
assert missing_methods == []
+
+
+def test_get_history_failure():
+ with pytest.raises(ValueError, match="Unknown history class"):
+ get_history("unknown-history")
+
+
+def test_get_history():
+ concrete_history = get_history(
+ "prometheus",
+ **{
+ "prometheus_host": "",
+ "prometheus_port": "",
+ "live_data_start": "",
+ "cache_base_directory": "",
+ },
+ )
+ assert isinstance(concrete_history, History)
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Wed, Dec 18, 12:57 AM (2 d, 6 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3220134
Attached To
D5429: Manage and expose the historical data
Event Timeline
Log In to Comment