diff --git a/swh/provenance/api/client.py b/swh/provenance/api/client.py --- a/swh/provenance/api/client.py +++ b/swh/provenance/api/client.py @@ -20,8 +20,9 @@ from swh.core.api.serializers import encode_data_client as encode_data from swh.core.api.serializers import msgpack_loads as decode_data -from swh.provenance import get_provenance_storage +from swh.core.statsd import statsd +from .. import get_provenance_storage from ..interface import ProvenanceStorageInterface from .serializers import DECODERS, ENCODERS from .server import ProvenanceStorageRabbitMQServer @@ -32,6 +33,8 @@ ) LOGGER = logging.getLogger(__name__) +STORAGE_DURATION_METRIC = "swh_provenance_storage_rabbitmq_duration_seconds" + class ResponseTimeout(Exception): pass @@ -66,67 +69,73 @@ @functools.wraps(meth) # Copy signature and doc def meth_(*args, **kwargs): - # Match arguments and parameters - data = inspect.getcallargs(wrapped_meth, *args, **kwargs) + with statsd.timed( + metric=STORAGE_DURATION_METRIC, tags={"method": meth_name} + ): + # Match arguments and parameters + data = inspect.getcallargs(wrapped_meth, *args, **kwargs) - # Remove arguments that should not be passed - self = data.pop("self") + # Remove arguments that should not be passed + self = data.pop("self") - # Call storage method with remaining arguments - return getattr(self._storage, meth_name)(**data) + # Call storage method with remaining arguments + return getattr(self._storage, meth_name)(**data) @functools.wraps(meth) # Copy signature and doc def write_meth_(*args, **kwargs): - # Match arguments and parameters - post_data = inspect.getcallargs(wrapped_meth, *args, **kwargs) - - try: - # Remove arguments that should not be passed - self = post_data.pop("self") - relation = post_data.pop("relation", None) - assert len(post_data) == 1 - if relation is not None: - items = [ - (src, rel.dst, rel.path) - for src, dsts in next(iter(post_data.values())).items() - for rel in dsts - ] - else: - data = next(iter(post_data.values())) - items = ( - list(data.items()) - if isinstance(data, dict) - else list({(item,) for item in data}) - ) + with statsd.timed( + metric=STORAGE_DURATION_METRIC, tags={"method": meth_name} + ): + # Match arguments and parameters + post_data = inspect.getcallargs(wrapped_meth, *args, **kwargs) + + try: + # Remove arguments that should not be passed + self = post_data.pop("self") + relation = post_data.pop("relation", None) + assert len(post_data) == 1 + if relation is not None: + items = [ + (src, rel.dst, rel.path) + for src, dsts in next(iter(post_data.values())).items() + for rel in dsts + ] + else: + data = next(iter(post_data.values())) + items = ( + list(data.items()) + if isinstance(data, dict) + else list({(item,) for item in data}) + ) - acks_expected = len(items) - self._correlation_id = str(uuid.uuid4()) - exchange = ProvenanceStorageRabbitMQServer.get_exchange( - meth_name, relation - ) - for item in items: - routing_key = ProvenanceStorageRabbitMQServer.get_routing_key( - item, meth_name, relation + acks_expected = len(items) + self._correlation_id = str(uuid.uuid4()) + exchange = ProvenanceStorageRabbitMQServer.get_exchange( + meth_name, relation ) - # FIXME: this is running in a different thread! Hence, if - # self._connection drops, there is no guarantee that the - # request can be sent for the current elements. This - # situation should be handled properly. - self._connection.ioloop.add_callback_threadsafe( - functools.partial( - ProvenanceStorageRabbitMQClient.request, - channel=self._channel, - reply_to=self._callback_queue, - exchange=exchange, - routing_key=routing_key, - correlation_id=self._correlation_id, - data=item, + for item in items: + routing_key = ProvenanceStorageRabbitMQServer.get_routing_key( + item, meth_name, relation ) - ) - return self.wait_for_acks(acks_expected) - except BaseException as ex: - self.request_termination(str(ex)) - return False + # FIXME: this is running in a different thread! Hence, if + # self._connection drops, there is no guarantee that the + # request can be sent for the current elements. This + # situation should be handled properly. + self._connection.ioloop.add_callback_threadsafe( + functools.partial( + ProvenanceStorageRabbitMQClient.request, + channel=self._channel, + reply_to=self._callback_queue, + exchange=exchange, + routing_key=routing_key, + correlation_id=self._correlation_id, + data=item, + ) + ) + return self.wait_for_acks(acks_expected) + except BaseException as ex: + self.request_termination(str(ex)) + return False if meth_name not in attributes: attributes[meth_name] = ( diff --git a/swh/provenance/mongo/backend.py b/swh/provenance/mongo/backend.py --- a/swh/provenance/mongo/backend.py +++ b/swh/provenance/mongo/backend.py @@ -11,6 +11,7 @@ import mongomock import pymongo +from swh.core.statsd import statsd from swh.model.model import Sha1Git from ..interface import ( @@ -21,6 +22,8 @@ RevisionData, ) +STORAGE_DURATION_METRIC = "swh_provenance_storage_mongodb_duration_seconds" + class ProvenanceStorageMongoDb: def __init__(self, engine: str, **kwargs): @@ -28,9 +31,11 @@ self.dbname = kwargs.pop("dbname") self.conn_args = kwargs + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"}) def close(self) -> None: self.db.client.close() + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"}) def content_add( self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]] ) -> bool: @@ -60,6 +65,7 @@ ) return True + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_first"}) def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: # get all the revisions # iterate and find the earliest @@ -88,6 +94,7 @@ ) return sorted(occurs, key=lambda x: (x.date, x.revision, x.origin, x.path))[0] + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_all"}) def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: @@ -145,6 +152,7 @@ ) yield from sorted(occurs, key=lambda x: (x.date, x.revision, x.origin, x.path)) + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_get"}) def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return { x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc) @@ -154,6 +162,7 @@ ) } + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_add"}) def directory_add( self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]] ) -> bool: @@ -176,6 +185,7 @@ self.db.directory.insert_one({"sha1": sha1, "ts": ts, "revision": {}}) return True + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_get"}) def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return { x["sha1"]: datetime.fromtimestamp(x["ts"], timezone.utc) @@ -185,6 +195,7 @@ ) } + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "entity_get_all"}) def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: return { x["sha1"] @@ -193,10 +204,12 @@ ) } + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_add"}) def location_add(self, paths: Iterable[bytes]) -> bool: # TODO: implement this methods if path are to be stored in a separate collection return True + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_get_all"}) def location_get_all(self) -> Set[bytes]: contents = self.db.content.find({}, {"revision": 1, "_id": 0, "directory": 1}) paths: List[Iterable[bytes]] = [] @@ -209,6 +222,7 @@ paths.extend(value for _, value in each_dir["revision"].items()) return set(sum(paths, [])) + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "open"}) def open(self) -> None: if self.engine == "mongomock": self.db = mongomock.MongoClient(**self.conn_args).get_database(self.dbname) @@ -216,6 +230,7 @@ # assume real MongoDB server by default self.db = pymongo.MongoClient(**self.conn_args).get_database(self.dbname) + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_add"}) def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool: existing = { x["sha1"]: x @@ -229,6 +244,7 @@ self.db.origin.insert_one({"sha1": sha1, "url": url}) return True + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_get"}) def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: return { x["sha1"]: x["url"] @@ -237,6 +253,7 @@ ) } + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_add"}) def revision_add( self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]] ) -> bool: @@ -278,6 +295,7 @@ ) return True + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_get"}) def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: return { x["sha1"]: RevisionData( @@ -293,6 +311,7 @@ ) } + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_add"}) def relation_add( self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]] ) -> bool: @@ -349,6 +368,7 @@ ) return True + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get"}) def relation_get( self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False ) -> Dict[Sha1Git, Set[RelationData]]: @@ -427,6 +447,7 @@ ) return result + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get_all"}) def relation_get_all( self, relation: RelationType ) -> Dict[Sha1Git, Set[RelationData]]: @@ -469,5 +490,6 @@ for src_sha1, denorm in src_objs.items() } + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "with_path"}) def with_path(self) -> bool: return True diff --git a/swh/provenance/postgresql/provenance.py b/swh/provenance/postgresql/provenance.py --- a/swh/provenance/postgresql/provenance.py +++ b/swh/provenance/postgresql/provenance.py @@ -14,6 +14,7 @@ from typing_extensions import Literal from swh.core.db import BaseDb +from swh.core.statsd import statsd from swh.model.model import Sha1Git from ..interface import ( @@ -26,6 +27,8 @@ LOGGER = logging.getLogger(__name__) +STORAGE_DURATION_METRIC = "swh_provenance_storage_postgresql_duration_seconds" + class ProvenanceStoragePostgreSql: def __init__(self, raise_on_commit: bool = False, **kwargs) -> None: @@ -55,14 +58,17 @@ def denormalized(self) -> bool: return "denormalized" in self.flavor + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"}) def close(self) -> None: self.conn.close() + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"}) def content_add( self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]] ) -> bool: return self._entity_set_date("content", cnts) + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_first"}) def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: sql = "SELECT * FROM swh_provenance_content_find_first(%s)" with self.transaction(readonly=True) as cursor: @@ -70,6 +76,7 @@ row = cursor.fetchone() return ProvenanceResult(**row) if row is not None else None + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_all"}) def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: @@ -78,22 +85,27 @@ cursor.execute(query=sql, vars=(id, limit)) yield from (ProvenanceResult(**row) for row in cursor) + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_get"}) def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return self._entity_get_date("content", ids) + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_add"}) def directory_add( self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]] ) -> bool: return self._entity_set_date("directory", dirs) + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_get"}) def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return self._entity_get_date("directory", ids) + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "entity_get_all"}) def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: with self.transaction(readonly=True) as cursor: cursor.execute(f"SELECT sha1 FROM {entity.value}") return {row["sha1"] for row in cursor} + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_add"}) def location_add(self, paths: Iterable[bytes]) -> bool: if not self.with_path(): return True @@ -114,11 +126,13 @@ raise return False + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_get_all"}) def location_get_all(self) -> Set[bytes]: with self.transaction(readonly=True) as cursor: cursor.execute("SELECT location.path AS path FROM location") return {row["path"] for row in cursor} + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_add"}) def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool: try: if orgs: @@ -138,12 +152,14 @@ raise return False + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "open"}) def open(self) -> None: self.conn = BaseDb.connect(**self.conn_args).conn BaseDb.adapt_conn(self.conn) with self.transaction() as cursor: cursor.execute("SET timezone TO 'UTC'") + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_get"}) def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: urls: Dict[Sha1Git, str] = {} sha1s = tuple(ids) @@ -160,6 +176,7 @@ urls.update((row["sha1"], row["url"]) for row in cursor) return urls + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_add"}) def revision_add( self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]] ) -> bool: @@ -189,6 +206,7 @@ raise return False + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_get"}) def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: result: Dict[Sha1Git, RevisionData] = {} sha1s = tuple(ids) @@ -210,6 +228,7 @@ ) return result + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_add"}) def relation_add( self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]] ) -> bool: @@ -238,11 +257,13 @@ raise return False + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get"}) def relation_get( self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False ) -> Dict[Sha1Git, Set[RelationData]]: return self._relation_get(relation, ids, reverse) + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get_all"}) def relation_get_all( self, relation: RelationType ) -> Dict[Sha1Git, Set[RelationData]]: @@ -322,5 +343,6 @@ result.setdefault(src, set()).add(RelationData(**row)) return result + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "with_path"}) def with_path(self) -> bool: return "with-path" in self.flavor