Page Menu · Home · Software Heritage

D6352.id23100.diff
No One · Temporary

D6352.id23100.diff

diff --git a/swh/provenance/api/client.py b/swh/provenance/api/client.py
--- a/swh/provenance/api/client.py
+++ b/swh/provenance/api/client.py
@@ -20,8 +20,9 @@
from swh.core.api.serializers import encode_data_client as encode_data
from swh.core.api.serializers import msgpack_loads as decode_data
-from swh.provenance import get_provenance_storage
+from swh.core.statsd import statsd
+from .. import get_provenance_storage
from ..interface import ProvenanceStorageInterface
from .serializers import DECODERS, ENCODERS
from .server import ProvenanceStorageRabbitMQServer
@@ -32,6 +33,8 @@
)
LOGGER = logging.getLogger(__name__)
+STORAGE_DURATION_METRIC = "swh_provenance_storage_rabbitmq_duration_seconds"
+
class ResponseTimeout(Exception):
pass
@@ -66,67 +69,73 @@
@functools.wraps(meth) # Copy signature and doc
def meth_(*args, **kwargs):
- # Match arguments and parameters
- data = inspect.getcallargs(wrapped_meth, *args, **kwargs)
+ with statsd.timed(
+ metric=STORAGE_DURATION_METRIC, tags={"method": meth_name}
+ ):
+ # Match arguments and parameters
+ data = inspect.getcallargs(wrapped_meth, *args, **kwargs)
- # Remove arguments that should not be passed
- self = data.pop("self")
+ # Remove arguments that should not be passed
+ self = data.pop("self")
- # Call storage method with remaining arguments
- return getattr(self._storage, meth_name)(**data)
+ # Call storage method with remaining arguments
+ return getattr(self._storage, meth_name)(**data)
@functools.wraps(meth) # Copy signature and doc
def write_meth_(*args, **kwargs):
- # Match arguments and parameters
- post_data = inspect.getcallargs(wrapped_meth, *args, **kwargs)
-
- try:
- # Remove arguments that should not be passed
- self = post_data.pop("self")
- relation = post_data.pop("relation", None)
- assert len(post_data) == 1
- if relation is not None:
- items = [
- (src, rel.dst, rel.path)
- for src, dsts in next(iter(post_data.values())).items()
- for rel in dsts
- ]
- else:
- data = next(iter(post_data.values()))
- items = (
- list(data.items())
- if isinstance(data, dict)
- else list({(item,) for item in data})
- )
+ with statsd.timed(
+ metric=STORAGE_DURATION_METRIC, tags={"method": meth_name}
+ ):
+ # Match arguments and parameters
+ post_data = inspect.getcallargs(wrapped_meth, *args, **kwargs)
+
+ try:
+ # Remove arguments that should not be passed
+ self = post_data.pop("self")
+ relation = post_data.pop("relation", None)
+ assert len(post_data) == 1
+ if relation is not None:
+ items = [
+ (src, rel.dst, rel.path)
+ for src, dsts in next(iter(post_data.values())).items()
+ for rel in dsts
+ ]
+ else:
+ data = next(iter(post_data.values()))
+ items = (
+ list(data.items())
+ if isinstance(data, dict)
+ else list({(item,) for item in data})
+ )
- acks_expected = len(items)
- self._correlation_id = str(uuid.uuid4())
- exchange = ProvenanceStorageRabbitMQServer.get_exchange(
- meth_name, relation
- )
- for item in items:
- routing_key = ProvenanceStorageRabbitMQServer.get_routing_key(
- item, meth_name, relation
+ acks_expected = len(items)
+ self._correlation_id = str(uuid.uuid4())
+ exchange = ProvenanceStorageRabbitMQServer.get_exchange(
+ meth_name, relation
)
- # FIXME: this is running in a different thread! Hence, if
- # self._connection drops, there is no guarantee that the
- # request can be sent for the current elements. This
- # situation should be handled properly.
- self._connection.ioloop.add_callback_threadsafe(
- functools.partial(
- ProvenanceStorageRabbitMQClient.request,
- channel=self._channel,
- reply_to=self._callback_queue,
- exchange=exchange,
- routing_key=routing_key,
- correlation_id=self._correlation_id,
- data=item,
+ for item in items:
+ routing_key = ProvenanceStorageRabbitMQServer.get_routing_key(
+ item, meth_name, relation
)
- )
- return self.wait_for_acks(acks_expected)
- except BaseException as ex:
- self.request_termination(str(ex))
- return False
+ # FIXME: this is running in a different thread! Hence, if
+ # self._connection drops, there is no guarantee that the
+ # request can be sent for the current elements. This
+ # situation should be handled properly.
+ self._connection.ioloop.add_callback_threadsafe(
+ functools.partial(
+ ProvenanceStorageRabbitMQClient.request,
+ channel=self._channel,
+ reply_to=self._callback_queue,
+ exchange=exchange,
+ routing_key=routing_key,
+ correlation_id=self._correlation_id,
+ data=item,
+ )
+ )
+ return self.wait_for_acks(acks_expected)
+ except BaseException as ex:
+ self.request_termination(str(ex))
+ return False
if meth_name not in attributes:
attributes[meth_name] = (
diff --git a/swh/provenance/mongo/backend.py b/swh/provenance/mongo/backend.py
--- a/swh/provenance/mongo/backend.py
+++ b/swh/provenance/mongo/backend.py
@@ -10,6 +10,7 @@
from bson import ObjectId
import pymongo.database
+from swh.core.statsd import statsd
from swh.model.model import Sha1Git
from ..interface import (
@@ -20,14 +21,18 @@
RevisionData,
)
+STORAGE_DURATION_METRIC = "swh_provenance_storage_mongodb_duration_seconds"
+
class ProvenanceStorageMongoDb:
def __init__(self, db: pymongo.database.Database):
self.db = db
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"})
def close(self) -> None:
pass
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"})
def content_add(
self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]]
) -> bool:
@@ -57,6 +62,7 @@
)
return True
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_first"})
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
# get all the revisions
# iterate and find the earliest
@@ -85,6 +91,7 @@
)
return sorted(occurs, key=lambda x: (x.date, x.revision, x.origin, x.path))[0]
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_all"})
def content_find_all(
self, id: Sha1Git, limit: Optional[int] = None
) -> Generator[ProvenanceResult, None, None]:
@@ -142,6 +149,7 @@
)
yield from sorted(occurs, key=lambda x: (x.date, x.revision, x.origin, x.path))
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_get"})
def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
return {
x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
@@ -151,6 +159,7 @@
)
}
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_add"})
def directory_add(
self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]]
) -> bool:
@@ -173,6 +182,7 @@
self.db.directory.insert_one({"sha1": sha1, "ts": ts, "revision": {}})
return True
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_get"})
def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
return {
x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
@@ -182,6 +192,7 @@
)
}
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "entity_get_all"})
def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
return {
x["sha1"]
@@ -190,10 +201,12 @@
)
}
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_add"})
def location_add(self, paths: Iterable[bytes]) -> bool:
# TODO: implement this methods if path are to be stored in a separate collection
return True
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_get_all"})
def location_get_all(self) -> Set[bytes]:
contents = self.db.content.find({}, {"revision": 1, "_id": 0, "directory": 1})
paths: List[Iterable[bytes]] = []
@@ -206,6 +219,7 @@
paths.extend(value for _, value in each_dir["revision"].items())
return set(sum(paths, []))
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_add"})
def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool:
existing = {
x["sha1"]: x
@@ -219,6 +233,7 @@
self.db.origin.insert_one({"sha1": sha1, "url": url})
return True
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_get"})
def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
return {
x["sha1"]: x["url"]
@@ -227,6 +242,7 @@
)
}
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_add"})
def revision_add(
self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]
) -> bool:
@@ -268,6 +284,7 @@
)
return True
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_get"})
def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]:
return {
x["sha1"]: RevisionData(
@@ -283,6 +300,7 @@
)
}
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_add"})
def relation_add(
self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]]
) -> bool:
@@ -339,6 +357,7 @@
)
return True
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get"})
def relation_get(
self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False
) -> Dict[Sha1Git, Set[RelationData]]:
@@ -417,6 +436,7 @@
)
return result
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get_all"})
def relation_get_all(
self, relation: RelationType
) -> Dict[Sha1Git, Set[RelationData]]:
@@ -459,5 +479,6 @@
for src_sha1, denorm in src_objs.items()
}
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "with_path"})
def with_path(self) -> bool:
return True
diff --git a/swh/provenance/postgresql/provenance.py b/swh/provenance/postgresql/provenance.py
--- a/swh/provenance/postgresql/provenance.py
+++ b/swh/provenance/postgresql/provenance.py
@@ -14,6 +14,7 @@
from typing_extensions import Literal
from swh.core.db import BaseDb
+from swh.core.statsd import statsd
from swh.model.model import Sha1Git
from ..interface import (
@@ -26,6 +27,8 @@
LOGGER = logging.getLogger(__name__)
+STORAGE_DURATION_METRIC = "swh_provenance_storage_postgresql_duration_seconds"
+
class ProvenanceStoragePostgreSql:
def __init__(
@@ -60,14 +63,17 @@
def denormalized(self) -> bool:
return "denormalized" in self.flavor
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"})
def close(self) -> None:
pass
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"})
def content_add(
self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]]
) -> bool:
return self._entity_set_date("content", cnts)
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_first"})
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
sql = "SELECT * FROM swh_provenance_content_find_first(%s)"
with self.transaction(readonly=True) as cursor:
@@ -75,6 +81,7 @@
row = cursor.fetchone()
return ProvenanceResult(**row) if row is not None else None
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_all"})
def content_find_all(
self, id: Sha1Git, limit: Optional[int] = None
) -> Generator[ProvenanceResult, None, None]:
@@ -83,22 +90,27 @@
cursor.execute(query=sql, vars=(id, limit))
yield from (ProvenanceResult(**row) for row in cursor)
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_get"})
def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
return self._entity_get_date("content", ids)
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_add"})
def directory_add(
self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]]
) -> bool:
return self._entity_set_date("directory", dirs)
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_get"})
def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
return self._entity_get_date("directory", ids)
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "entity_get_all"})
def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
with self.transaction(readonly=True) as cursor:
cursor.execute(f"SELECT sha1 FROM {entity.value}")
return {row["sha1"] for row in cursor}
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_add"})
def location_add(self, paths: Iterable[bytes]) -> bool:
if not self.with_path():
return True
@@ -119,11 +131,13 @@
raise
return False
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_get_all"})
def location_get_all(self) -> Set[bytes]:
with self.transaction(readonly=True) as cursor:
cursor.execute("SELECT location.path AS path FROM location")
return {row["path"] for row in cursor}
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_add"})
def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool:
try:
if orgs:
@@ -143,6 +157,7 @@
raise
return False
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_get"})
def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
urls: Dict[Sha1Git, str] = {}
sha1s = tuple(ids)
@@ -159,6 +174,7 @@
urls.update((row["sha1"], row["url"]) for row in cursor)
return urls
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_add"})
def revision_add(
self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]
) -> bool:
@@ -188,6 +204,7 @@
raise
return False
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_get"})
def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]:
result: Dict[Sha1Git, RevisionData] = {}
sha1s = tuple(ids)
@@ -209,6 +226,7 @@
)
return result
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_add"})
def relation_add(
self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]]
) -> bool:
@@ -237,11 +255,13 @@
raise
return False
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get"})
def relation_get(
self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False
) -> Dict[Sha1Git, Set[RelationData]]:
return self._relation_get(relation, ids, reverse)
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get_all"})
def relation_get_all(
self, relation: RelationType
) -> Dict[Sha1Git, Set[RelationData]]:
@@ -321,5 +341,6 @@
result.setdefault(src, set()).add(RelationData(**row))
return result
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "with_path"})
def with_path(self) -> bool:
return "with-path" in self.flavor

File Metadata

Mime Type
text/plain
Expires
Dec 20 2024, 11:08 AM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3216008

Event Timeline