Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/api/client.py
Show All 14 Lines | |||||
import pika | import pika | ||||
import pika.channel | import pika.channel | ||||
import pika.connection | import pika.connection | ||||
import pika.frame | import pika.frame | ||||
import pika.spec | import pika.spec | ||||
from swh.core.api.serializers import encode_data_client as encode_data | from swh.core.api.serializers import encode_data_client as encode_data | ||||
from swh.core.api.serializers import msgpack_loads as decode_data | from swh.core.api.serializers import msgpack_loads as decode_data | ||||
from swh.provenance import get_provenance_storage | from swh.core.statsd import statsd | ||||
from .. import get_provenance_storage | |||||
from ..interface import ProvenanceStorageInterface | from ..interface import ProvenanceStorageInterface | ||||
from .serializers import DECODERS, ENCODERS | from .serializers import DECODERS, ENCODERS | ||||
from .server import ProvenanceStorageRabbitMQServer | from .server import ProvenanceStorageRabbitMQServer | ||||
LOG_FORMAT = ( | LOG_FORMAT = ( | ||||
"%(levelname) -10s %(asctime)s %(name) -30s %(funcName) " | "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) " | ||||
"-35s %(lineno) -5d: %(message)s" | "-35s %(lineno) -5d: %(message)s" | ||||
) | ) | ||||
LOGGER = logging.getLogger(__name__) | LOGGER = logging.getLogger(__name__) | ||||
STORAGE_DURATION_METRIC = "swh_provenance_storage_rabbitmq_duration_seconds" | |||||
class ResponseTimeout(Exception): | class ResponseTimeout(Exception): | ||||
pass | pass | ||||
class TerminateSignal(Exception): | class TerminateSignal(Exception): | ||||
pass | pass | ||||
Show All 18 Lines | def __new__(cls, name, bases, attributes): | ||||
return super().__new__(cls, name, bases, attributes) | return super().__new__(cls, name, bases, attributes) | ||||
@staticmethod | @staticmethod | ||||
def __add_endpoint(meth_name, meth, attributes): | def __add_endpoint(meth_name, meth, attributes): | ||||
wrapped_meth = inspect.unwrap(meth) | wrapped_meth = inspect.unwrap(meth) | ||||
@functools.wraps(meth) # Copy signature and doc | @functools.wraps(meth) # Copy signature and doc | ||||
def meth_(*args, **kwargs): | def meth_(*args, **kwargs): | ||||
with statsd.timed( | |||||
metric=STORAGE_DURATION_METRIC, tags={"method": meth_name} | |||||
): | |||||
# Match arguments and parameters | # Match arguments and parameters | ||||
data = inspect.getcallargs(wrapped_meth, *args, **kwargs) | data = inspect.getcallargs(wrapped_meth, *args, **kwargs) | ||||
# Remove arguments that should not be passed | # Remove arguments that should not be passed | ||||
self = data.pop("self") | self = data.pop("self") | ||||
# Call storage method with remaining arguments | # Call storage method with remaining arguments | ||||
return getattr(self._storage, meth_name)(**data) | return getattr(self._storage, meth_name)(**data) | ||||
@functools.wraps(meth) # Copy signature and doc | @functools.wraps(meth) # Copy signature and doc | ||||
def write_meth_(*args, **kwargs): | def write_meth_(*args, **kwargs): | ||||
with statsd.timed( | |||||
metric=STORAGE_DURATION_METRIC, tags={"method": meth_name} | |||||
): | |||||
# Match arguments and parameters | # Match arguments and parameters | ||||
post_data = inspect.getcallargs(wrapped_meth, *args, **kwargs) | post_data = inspect.getcallargs(wrapped_meth, *args, **kwargs) | ||||
try: | try: | ||||
# Remove arguments that should not be passed | # Remove arguments that should not be passed | ||||
self = post_data.pop("self") | self = post_data.pop("self") | ||||
relation = post_data.pop("relation", None) | relation = post_data.pop("relation", None) | ||||
assert len(post_data) == 1 | assert len(post_data) == 1 | ||||
if relation is not None: | if relation is not None: | ||||
items = [ | items = [ | ||||
(src, rel.dst, rel.path) | (src, rel.dst, rel.path) | ||||
for src, dsts in next(iter(post_data.values())).items() | for src, dsts in next(iter(post_data.values())).items() | ||||
for rel in dsts | for rel in dsts | ||||
] | ] | ||||
else: | else: | ||||
data = next(iter(post_data.values())) | data = next(iter(post_data.values())) | ||||
items = ( | items = ( | ||||
list(data.items()) | list(data.items()) | ||||
if isinstance(data, dict) | if isinstance(data, dict) | ||||
else list({(item,) for item in data}) | else list({(item,) for item in data}) | ||||
) | ) | ||||
acks_expected = len(items) | acks_expected = len(items) | ||||
self._correlation_id = str(uuid.uuid4()) | self._correlation_id = str(uuid.uuid4()) | ||||
exchange = ProvenanceStorageRabbitMQServer.get_exchange( | exchange = ProvenanceStorageRabbitMQServer.get_exchange( | ||||
meth_name, relation | meth_name, relation | ||||
) | ) | ||||
for item in items: | for item in items: | ||||
routing_key = ProvenanceStorageRabbitMQServer.get_routing_key( | routing_key = ProvenanceStorageRabbitMQServer.get_routing_key( | ||||
item, meth_name, relation | item, meth_name, relation | ||||
) | ) | ||||
# FIXME: this is running in a different thread! Hence, if | # FIXME: this is running in a different thread! Hence, if | ||||
# self._connection drops, there is no guarantee that the | # self._connection drops, there is no guarantee that the | ||||
# request can be sent for the current elements. This | # request can be sent for the current elements. This | ||||
# situation should be handled properly. | # situation should be handled properly. | ||||
self._connection.ioloop.add_callback_threadsafe( | self._connection.ioloop.add_callback_threadsafe( | ||||
functools.partial( | functools.partial( | ||||
ProvenanceStorageRabbitMQClient.request, | ProvenanceStorageRabbitMQClient.request, | ||||
channel=self._channel, | channel=self._channel, | ||||
reply_to=self._callback_queue, | reply_to=self._callback_queue, | ||||
exchange=exchange, | exchange=exchange, | ||||
routing_key=routing_key, | routing_key=routing_key, | ||||
correlation_id=self._correlation_id, | correlation_id=self._correlation_id, | ||||
data=item, | data=item, | ||||
) | ) | ||||
) | ) | ||||
return self.wait_for_acks(acks_expected) | return self.wait_for_acks(acks_expected) | ||||
except BaseException as ex: | except BaseException as ex: | ||||
self.request_termination(str(ex)) | self.request_termination(str(ex)) | ||||
return False | return False | ||||
if meth_name not in attributes: | if meth_name not in attributes: | ||||
attributes[meth_name] = ( | attributes[meth_name] = ( | ||||
write_meth_ | write_meth_ | ||||
if ProvenanceStorageRabbitMQServer.is_write_method(meth_name) | if ProvenanceStorageRabbitMQServer.is_write_method(meth_name) | ||||
else meth_ | else meth_ | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 402 Lines • Show Last 20 Lines |