Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7124046
D6352.id23100.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
17 KB
Subscribers
None
D6352.id23100.diff
View Options
diff --git a/swh/provenance/api/client.py b/swh/provenance/api/client.py
--- a/swh/provenance/api/client.py
+++ b/swh/provenance/api/client.py
@@ -20,8 +20,9 @@
from swh.core.api.serializers import encode_data_client as encode_data
from swh.core.api.serializers import msgpack_loads as decode_data
-from swh.provenance import get_provenance_storage
+from swh.core.statsd import statsd
+from .. import get_provenance_storage
from ..interface import ProvenanceStorageInterface
from .serializers import DECODERS, ENCODERS
from .server import ProvenanceStorageRabbitMQServer
@@ -32,6 +33,8 @@
)
LOGGER = logging.getLogger(__name__)
+STORAGE_DURATION_METRIC = "swh_provenance_storage_rabbitmq_duration_seconds"
+
class ResponseTimeout(Exception):
pass
@@ -66,67 +69,73 @@
@functools.wraps(meth) # Copy signature and doc
def meth_(*args, **kwargs):
- # Match arguments and parameters
- data = inspect.getcallargs(wrapped_meth, *args, **kwargs)
+ with statsd.timed(
+ metric=STORAGE_DURATION_METRIC, tags={"method": meth_name}
+ ):
+ # Match arguments and parameters
+ data = inspect.getcallargs(wrapped_meth, *args, **kwargs)
- # Remove arguments that should not be passed
- self = data.pop("self")
+ # Remove arguments that should not be passed
+ self = data.pop("self")
- # Call storage method with remaining arguments
- return getattr(self._storage, meth_name)(**data)
+ # Call storage method with remaining arguments
+ return getattr(self._storage, meth_name)(**data)
@functools.wraps(meth) # Copy signature and doc
def write_meth_(*args, **kwargs):
- # Match arguments and parameters
- post_data = inspect.getcallargs(wrapped_meth, *args, **kwargs)
-
- try:
- # Remove arguments that should not be passed
- self = post_data.pop("self")
- relation = post_data.pop("relation", None)
- assert len(post_data) == 1
- if relation is not None:
- items = [
- (src, rel.dst, rel.path)
- for src, dsts in next(iter(post_data.values())).items()
- for rel in dsts
- ]
- else:
- data = next(iter(post_data.values()))
- items = (
- list(data.items())
- if isinstance(data, dict)
- else list({(item,) for item in data})
- )
+ with statsd.timed(
+ metric=STORAGE_DURATION_METRIC, tags={"method": meth_name}
+ ):
+ # Match arguments and parameters
+ post_data = inspect.getcallargs(wrapped_meth, *args, **kwargs)
+
+ try:
+ # Remove arguments that should not be passed
+ self = post_data.pop("self")
+ relation = post_data.pop("relation", None)
+ assert len(post_data) == 1
+ if relation is not None:
+ items = [
+ (src, rel.dst, rel.path)
+ for src, dsts in next(iter(post_data.values())).items()
+ for rel in dsts
+ ]
+ else:
+ data = next(iter(post_data.values()))
+ items = (
+ list(data.items())
+ if isinstance(data, dict)
+ else list({(item,) for item in data})
+ )
- acks_expected = len(items)
- self._correlation_id = str(uuid.uuid4())
- exchange = ProvenanceStorageRabbitMQServer.get_exchange(
- meth_name, relation
- )
- for item in items:
- routing_key = ProvenanceStorageRabbitMQServer.get_routing_key(
- item, meth_name, relation
+ acks_expected = len(items)
+ self._correlation_id = str(uuid.uuid4())
+ exchange = ProvenanceStorageRabbitMQServer.get_exchange(
+ meth_name, relation
)
- # FIXME: this is running in a different thread! Hence, if
- # self._connection drops, there is no guarantee that the
- # request can be sent for the current elements. This
- # situation should be handled properly.
- self._connection.ioloop.add_callback_threadsafe(
- functools.partial(
- ProvenanceStorageRabbitMQClient.request,
- channel=self._channel,
- reply_to=self._callback_queue,
- exchange=exchange,
- routing_key=routing_key,
- correlation_id=self._correlation_id,
- data=item,
+ for item in items:
+ routing_key = ProvenanceStorageRabbitMQServer.get_routing_key(
+ item, meth_name, relation
)
- )
- return self.wait_for_acks(acks_expected)
- except BaseException as ex:
- self.request_termination(str(ex))
- return False
+ # FIXME: this is running in a different thread! Hence, if
+ # self._connection drops, there is no guarantee that the
+ # request can be sent for the current elements. This
+ # situation should be handled properly.
+ self._connection.ioloop.add_callback_threadsafe(
+ functools.partial(
+ ProvenanceStorageRabbitMQClient.request,
+ channel=self._channel,
+ reply_to=self._callback_queue,
+ exchange=exchange,
+ routing_key=routing_key,
+ correlation_id=self._correlation_id,
+ data=item,
+ )
+ )
+ return self.wait_for_acks(acks_expected)
+ except BaseException as ex:
+ self.request_termination(str(ex))
+ return False
if meth_name not in attributes:
attributes[meth_name] = (
diff --git a/swh/provenance/mongo/backend.py b/swh/provenance/mongo/backend.py
--- a/swh/provenance/mongo/backend.py
+++ b/swh/provenance/mongo/backend.py
@@ -10,6 +10,7 @@
from bson import ObjectId
import pymongo.database
+from swh.core.statsd import statsd
from swh.model.model import Sha1Git
from ..interface import (
@@ -20,14 +21,18 @@
RevisionData,
)
+STORAGE_DURATION_METRIC = "swh_provenance_storage_mongodb_duration_seconds"
+
class ProvenanceStorageMongoDb:
def __init__(self, db: pymongo.database.Database):
self.db = db
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"})
def close(self) -> None:
pass
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"})
def content_add(
self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]]
) -> bool:
@@ -57,6 +62,7 @@
)
return True
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_first"})
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
# get all the revisions
# iterate and find the earliest
@@ -85,6 +91,7 @@
)
return sorted(occurs, key=lambda x: (x.date, x.revision, x.origin, x.path))[0]
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_all"})
def content_find_all(
self, id: Sha1Git, limit: Optional[int] = None
) -> Generator[ProvenanceResult, None, None]:
@@ -142,6 +149,7 @@
)
yield from sorted(occurs, key=lambda x: (x.date, x.revision, x.origin, x.path))
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_get"})
def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
return {
x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
@@ -151,6 +159,7 @@
)
}
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_add"})
def directory_add(
self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]]
) -> bool:
@@ -173,6 +182,7 @@
self.db.directory.insert_one({"sha1": sha1, "ts": ts, "revision": {}})
return True
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_get"})
def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
return {
x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc)
@@ -182,6 +192,7 @@
)
}
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "entity_get_all"})
def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
return {
x["sha1"]
@@ -190,10 +201,12 @@
)
}
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_add"})
def location_add(self, paths: Iterable[bytes]) -> bool:
# TODO: implement this method if paths are to be stored in a separate collection
return True
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_get_all"})
def location_get_all(self) -> Set[bytes]:
contents = self.db.content.find({}, {"revision": 1, "_id": 0, "directory": 1})
paths: List[Iterable[bytes]] = []
@@ -206,6 +219,7 @@
paths.extend(value for _, value in each_dir["revision"].items())
return set(sum(paths, []))
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_add"})
def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool:
existing = {
x["sha1"]: x
@@ -219,6 +233,7 @@
self.db.origin.insert_one({"sha1": sha1, "url": url})
return True
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_get"})
def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
return {
x["sha1"]: x["url"]
@@ -227,6 +242,7 @@
)
}
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_add"})
def revision_add(
self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]
) -> bool:
@@ -268,6 +284,7 @@
)
return True
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_get"})
def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]:
return {
x["sha1"]: RevisionData(
@@ -283,6 +300,7 @@
)
}
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_add"})
def relation_add(
self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]]
) -> bool:
@@ -339,6 +357,7 @@
)
return True
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get"})
def relation_get(
self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False
) -> Dict[Sha1Git, Set[RelationData]]:
@@ -417,6 +436,7 @@
)
return result
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get_all"})
def relation_get_all(
self, relation: RelationType
) -> Dict[Sha1Git, Set[RelationData]]:
@@ -459,5 +479,6 @@
for src_sha1, denorm in src_objs.items()
}
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "with_path"})
def with_path(self) -> bool:
return True
diff --git a/swh/provenance/postgresql/provenance.py b/swh/provenance/postgresql/provenance.py
--- a/swh/provenance/postgresql/provenance.py
+++ b/swh/provenance/postgresql/provenance.py
@@ -14,6 +14,7 @@
from typing_extensions import Literal
from swh.core.db import BaseDb
+from swh.core.statsd import statsd
from swh.model.model import Sha1Git
from ..interface import (
@@ -26,6 +27,8 @@
LOGGER = logging.getLogger(__name__)
+STORAGE_DURATION_METRIC = "swh_provenance_storage_postgresql_duration_seconds"
+
class ProvenanceStoragePostgreSql:
def __init__(
@@ -60,14 +63,17 @@
def denormalized(self) -> bool:
return "denormalized" in self.flavor
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"})
def close(self) -> None:
pass
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"})
def content_add(
self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]]
) -> bool:
return self._entity_set_date("content", cnts)
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_first"})
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
sql = "SELECT * FROM swh_provenance_content_find_first(%s)"
with self.transaction(readonly=True) as cursor:
@@ -75,6 +81,7 @@
row = cursor.fetchone()
return ProvenanceResult(**row) if row is not None else None
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_all"})
def content_find_all(
self, id: Sha1Git, limit: Optional[int] = None
) -> Generator[ProvenanceResult, None, None]:
@@ -83,22 +90,27 @@
cursor.execute(query=sql, vars=(id, limit))
yield from (ProvenanceResult(**row) for row in cursor)
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_get"})
def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
return self._entity_get_date("content", ids)
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_add"})
def directory_add(
self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]]
) -> bool:
return self._entity_set_date("directory", dirs)
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_get"})
def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
return self._entity_get_date("directory", ids)
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "entity_get_all"})
def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
with self.transaction(readonly=True) as cursor:
cursor.execute(f"SELECT sha1 FROM {entity.value}")
return {row["sha1"] for row in cursor}
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_add"})
def location_add(self, paths: Iterable[bytes]) -> bool:
if not self.with_path():
return True
@@ -119,11 +131,13 @@
raise
return False
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_get_all"})
def location_get_all(self) -> Set[bytes]:
with self.transaction(readonly=True) as cursor:
cursor.execute("SELECT location.path AS path FROM location")
return {row["path"] for row in cursor}
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_add"})
def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool:
try:
if orgs:
@@ -143,6 +157,7 @@
raise
return False
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_get"})
def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
urls: Dict[Sha1Git, str] = {}
sha1s = tuple(ids)
@@ -159,6 +174,7 @@
urls.update((row["sha1"], row["url"]) for row in cursor)
return urls
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_add"})
def revision_add(
self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]
) -> bool:
@@ -188,6 +204,7 @@
raise
return False
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_get"})
def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]:
result: Dict[Sha1Git, RevisionData] = {}
sha1s = tuple(ids)
@@ -209,6 +226,7 @@
)
return result
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_add"})
def relation_add(
self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]]
) -> bool:
@@ -237,11 +255,13 @@
raise
return False
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get"})
def relation_get(
self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False
) -> Dict[Sha1Git, Set[RelationData]]:
return self._relation_get(relation, ids, reverse)
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get_all"})
def relation_get_all(
self, relation: RelationType
) -> Dict[Sha1Git, Set[RelationData]]:
@@ -321,5 +341,6 @@
result.setdefault(src, set()).add(RelationData(**row))
return result
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "with_path"})
def with_path(self) -> bool:
return "with-path" in self.flavor
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Dec 20 2024, 11:08 AM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3216008
Attached To
D6352: Add StatsD support to provenance storage implementations
Event Timeline
Log In to Comment