# swh/provenance/api/client.py
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import functools | |||||
import inspect | |||||
# import json | |||||
import logging | |||||
import queue | |||||
import time | |||||
from typing import Any, Optional | |||||
# from typing import Dict | |||||
import uuid | |||||
import pika | |||||
import pika.channel | |||||
import pika.connection | |||||
# from pika.exchange_type import ExchangeType | |||||
import pika.frame | |||||
import pika.spec | |||||
from swh.core.api import RPCClient | from swh.core.api import RPCClient | ||||
from swh.core.api.serializers import encode_data_client as encode_data | |||||
from swh.core.api.serializers import msgpack_loads as decode_data | |||||
from swh.provenance import get_provenance_storage | |||||
from ..interface import ProvenanceStorageInterface | from ..interface import ProvenanceStorageInterface | ||||
from .serializers import DECODERS, ENCODERS | from .serializers import DECODERS, ENCODERS | ||||
from .server import ProvenanceStorageRabbitMQServer | |||||
# Verbose log-record layout: level, timestamp, logger name, function name,
# and line number, padded into fixed-width columns.
LOG_FORMAT = (
    "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) "
    "-35s %(lineno) -5d: %(message)s"
)

# Module-level logger, named after this module (PEP 282 convention).
LOGGER = logging.getLogger(__name__)
class RemoteProvenanceStorage(RPCClient):
    """Proxy to a remote provenance storage API.

    All endpoint methods are generated by ``RPCClient`` from the
    ``@remote_api_endpoint``-decorated methods of ``backend_class``.
    """

    # Interface whose remote-endpoint methods are mirrored as RPC stubs.
    backend_class = ProvenanceStorageInterface
    # Extra (de)serializers for provenance-specific types sent over the wire.
    extra_type_decoders = DECODERS
    extra_type_encoders = ENCODERS
class MetaRabbitMQClient(type):
    """Metaclass that auto-generates client endpoint methods.

    For every method decorated with ``@remote_api_endpoint`` on the
    ``backend_class`` attribute (looked up on the class being created, then
    on its bases), a stub with the same name, signature and docstring is
    injected into the class under construction:

    * read methods are forwarded to the local ``self._storage`` object;
    * write methods are published to the RabbitMQ server, one message per
      item, and the client then waits for the matching acks.
    """

    def __new__(cls, name, bases, attributes):
        # For each method wrapped with @remote_api_endpoint in the API
        # backend, add a new method with the same documentation to the class
        # being created.
        #
        # Note that, despite the usage of decorator magic (eg. functools.wrap),
        # this never actually calls a backend method directly.
        backend_class = attributes.get("backend_class", None)
        for base in bases:
            # Stop at the first base that provides a backend_class (or keep
            # the one declared directly on the class, if any).
            if backend_class is not None:
                break
            backend_class = getattr(base, "backend_class", None)
        if backend_class:
            for meth_name, meth in backend_class.__dict__.items():
                # _endpoint_path is set by the @remote_api_endpoint decorator.
                if hasattr(meth, "_endpoint_path"):
                    cls.__add_endpoint(meth_name, meth, attributes)
        return super().__new__(cls, name, bases, attributes)

    @staticmethod
    def __add_endpoint(meth_name, meth, attributes):
        """Inject a read or write stub for ``meth`` into ``attributes``."""
        wrapped_meth = inspect.unwrap(meth)

        @functools.wraps(meth)  # Copy signature and doc
        def meth_(*args, **kwargs):
            # Read path: match arguments and parameters against the
            # interface signature.
            data = inspect.getcallargs(wrapped_meth, *args, **kwargs)
            # Remove arguments that should not be passed
            self = data.pop("self")
            # Call the local storage method with remaining arguments
            return getattr(self._storage, meth_name)(**data)

        @functools.wraps(meth)  # Copy signature and doc
        def write_meth_(*args, **kwargs):
            # Write path: match arguments and parameters against the
            # interface signature.
            post_data = inspect.getcallargs(wrapped_meth, *args, **kwargs)
            # Remove arguments that should not be passed
            self = post_data.pop("self")
            relation = post_data.pop("relation", None)
            # After popping self/relation, exactly one payload argument
            # (the data set/dict to write) must remain.
            assert len(post_data) == 1
            if relation is not None:
                # Relation rows are flattened to (src, dst, path) tuples.
                data = [
                    (row.src, row.dst, row.path)
                    for row in next(iter(post_data.values()))
                ]
            else:
                # Non-relation payloads are dicts; send (key, value) pairs.
                data = list(next(iter(post_data.values())).items())
            acks_expected = len(data)
            # One correlation id covers the whole batch so acks can be
            # matched back to this request.
            correlation_id = str(uuid.uuid4())
            for item in data:
                routing_key = ProvenanceStorageRabbitMQServer.get_routing_key(
                    meth_name, relation, item
                )
                self.request(routing_key, data=item, correlation_id=correlation_id)
            return self.wait_for_acks(acks_expected)

        if meth_name not in attributes:
            # Write methods (as classified by the server) publish to the
            # queue; everything else is served by the local storage.
            attributes[meth_name] = (
                write_meth_
                if ProvenanceStorageRabbitMQServer.is_write_method(meth_name)
                else meth_
            )
class ConfigurationError(Exception):
    """Raised when the client cannot obtain a usable storage configuration."""
class ResponseTimeout(Exception):
    """Raised when waiting for a server response exceeds the allowed time."""
class ProvenanceStorageRabbitMQClient(metaclass=MetaRabbitMQClient):
    """RabbitMQ-based provenance storage client.

    Endpoint methods are injected by :class:`MetaRabbitMQClient` from
    ``backend_class``: reads go to the local ``self._storage`` object
    (built from the configuration fetched from the server), writes are
    published to the server and acknowledged asynchronously.

    NOTE(review): this client appears to be single-threaded — pika's
    ``BlockingConnection`` is not thread-safe; confirm callers never share
    an instance across threads.
    """

    # Interface whose remote-endpoint methods are mirrored as stubs.
    backend_class = ProvenanceStorageInterface
    # Extra (de)serializers for provenance-specific types on the wire.
    extra_type_decoders = DECODERS
    extra_type_encoders = ENCODERS

    def __init__(self, url: str, **kwargs) -> None:
        """Connect to RabbitMQ at ``url`` and bootstrap the local storage.

        Args:
            url: AMQP URL of the RabbitMQ broker.

        Raises:
            ConfigurationError: if the server does not answer the
                ``get_storage_config`` request within the default timeout.
        """
        self.conn = pika.BlockingConnection(pika.connection.URLParameters(url))
        self.channel = self.conn.channel()
        # Exclusive, broker-named queue dedicated to replies for this client.
        result = self.channel.queue_declare(queue="", exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True,
        )
        # Decoded (correlation_id, payload) pairs handed over by on_response.
        self.response_queue: queue.Queue = queue.Queue()
        # Get storage configuration from server and build the local storage
        # object used to serve read endpoints.
        self.request("get_storage_config")
        try:
            self._storage = get_provenance_storage(**self.wait_for_response())
        except ResponseTimeout:
            LOGGER.warning("Timed out waiting for response on get_storage_config")
            raise ConfigurationError

    def on_response(
        self,
        channel: pika.channel.Channel,
        method: pika.spec.Basic.Deliver,
        props: pika.spec.BasicProperties,
        body: bytes,
    ) -> None:
        """Consumer callback: decode a reply and enqueue it for the waiter."""
        self.response_queue.put(
            (
                props.correlation_id,
                decode_data(body, extra_decoders=self.extra_type_decoders),
            )
        )

    def request(
        self, routing_key: str, correlation_id: Optional[str] = None, **kwargs
    ) -> Any:
        """Publish ``kwargs`` to ``routing_key``, tagged for reply matching.

        Args:
            routing_key: destination queue on the default exchange.
            correlation_id: id used to match replies; a fresh UUID is
                generated when not supplied.
        """
        self.response = None
        self.correlation_id = (
            correlation_id if correlation_id is not None else str(uuid.uuid4())
        )
        self.channel.basic_publish(
            exchange="",
            routing_key=routing_key,
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.correlation_id,
            ),
            body=encode_data(kwargs, extra_encoders=self.extra_type_encoders),
        )

    def wait_for_acks(self, acks_expected: int) -> bool:
        """Wait until ``acks_expected`` acks arrive; False on timeout."""
        acks_received = 0
        while acks_received < acks_expected:
            try:
                # Each response carries the number of items acknowledged.
                acks_received += self.wait_for_response()
            except ResponseTimeout:
                LOGGER.warning(
                    "Timed out waiting for acks, %s received, %s expected",
                    acks_received,
                    acks_expected,
                )
                return False
        return acks_received == acks_expected

    def wait_for_response(self, timeout: float = 60.0) -> Any:
        """Block until a reply matching the current correlation id arrives.

        Args:
            timeout: maximum number of seconds to wait.

        Raises:
            ResponseTimeout: if no matching reply arrives before ``timeout``.
        """
        start = time.monotonic()
        while True:
            try:
                correlation_id, response = self.response_queue.get(block=False)
                if correlation_id == self.correlation_id:
                    return response
                # NOTE(review): replies with a stale correlation id are
                # silently dropped here — confirm this is intended.
            except queue.Empty:
                # BUGFIX: remaining time is timeout minus *elapsed* time.
                # The previous expression `timeout - (start - time.monotonic())`
                # evaluated to timeout + elapsed, so the wait grew past the
                # deadline instead of shrinking toward it.
                # Always make time_limit >= 1 second so pika can make progress.
                time_limit = max(timeout - (time.monotonic() - start), 1.0)
                self.conn.process_data_events(time_limit=time_limit)
            if self.response_queue.empty() and time.monotonic() > start + timeout:
                raise ResponseTimeout
# class ProvenanceStorageRabbitMQClient(metaclass=MetaRabbitMQClient): | |||||
# """This is an example publisher that will handle unexpected interactions | |||||
# with RabbitMQ such as channel and connection closures. | |||||
# If RabbitMQ closes the connection, it will reopen it. You should | |||||
# look at the output, as there are limited reasons why the connection may | |||||
# be closed, which usually are tied to permission related issues or | |||||
# socket timeouts. | |||||
# It uses delivery confirmations and illustrates one way to keep track of | |||||
# messages that have been sent and if they've been confirmed by RabbitMQ. | |||||
# """ | |||||
# EXCHANGE_TYPE = ExchangeType.topic | |||||
# backend_class = ProvenanceStorageInterface | |||||
# extra_type_decoders = DECODERS | |||||
# extra_type_encoders = ENCODERS | |||||
# def __init__(self, url: str, storage_config: Dict[str, Any]) -> None: | |||||
# """Setup the example publisher object, passing in the URL we will use | |||||
# to connect to RabbitMQ. | |||||
# :param str url: The URL for connecting to RabbitMQ | |||||
# :param str routing_key: The routing key name from which this worker will | |||||
# consume messages | |||||
# :param str storage_config: Configuration parameters for the underlying | |||||
# ``ProvenanceStorage`` object | |||||
# """ | |||||
# self.should_reconnect = False | |||||
# self.was_consuming = False | |||||
# self._connection = None | |||||
# self._channel = None | |||||
# self._closing = False | |||||
# self._consumer_tag = None | |||||
# self._consuming = False | |||||
# self._prefetch_count = 100 | |||||
# self._url = url | |||||
# self._storage = get_provenance_storage(**storage_config) | |||||
# self._response_queue: queue.Queue = queue.Queue() | |||||
# self._mutex = threading.Lock() | |||||
# self._consumer_thread = threading.Thread(target=self.run) | |||||
# self._consumer_thread.start() | |||||
# # def __del__(self) -> None: | |||||
# # self.stop() | |||||
# # self._consumer_thread.join() | |||||
# def connect(self) -> pika.SelectConnection: | |||||
# """This method connects to RabbitMQ, returning the connection handle. | |||||
# When the connection is established, the on_connection_open method | |||||
# will be invoked by pika. | |||||
# :rtype: pika.SelectConnection | |||||
# """ | |||||
# LOGGER.info("Connecting to %s", self._url) | |||||
# LOGGER.debug("Acquiring mutex to establish connection") | |||||
# self._mutex.acquire() | |||||
# return pika.SelectConnection( | |||||
# parameters=pika.URLParameters(self._url), | |||||
# on_open_callback=self.on_connection_open, | |||||
# on_open_error_callback=self.on_connection_open_error, | |||||
# on_close_callback=self.on_connection_closed, | |||||
# ) | |||||
# def close_connection(self) -> None: | |||||
# assert self._connection is not None | |||||
# self._consuming = False | |||||
# if self._connection.is_closing or self._connection.is_closed: | |||||
# LOGGER.info("Connection is closing or already closed") | |||||
# else: | |||||
# LOGGER.info("Closing connection") | |||||
# self._connection.close() | |||||
# def on_connection_open(self, _unused_connection: pika.SelectConnection) -> None: | |||||
# """This method is called by pika once the connection to RabbitMQ has | |||||
# been established. It passes the handle to the connection object in | |||||
# case we need it, but in this case, we'll just mark it unused. | |||||
# :param pika.SelectConnection _unused_connection: The connection | |||||
# """ | |||||
# LOGGER.info("Connection opened") | |||||
# self.open_channel() | |||||
# def on_connection_open_error( | |||||
# self, _unused_connection: pika.SelectConnection, err: Exception | |||||
# ) -> None: | |||||
# """This method is called by pika if the connection to RabbitMQ | |||||
# can't be established. | |||||
# :param pika.SelectConnection _unused_connection: The connection | |||||
# :param Exception err: The error | |||||
# """ | |||||
# LOGGER.error("Connection open failed: %s", err) | |||||
# self._mutex.release() | |||||
# LOGGER.debug("Mutex released due to connection error") | |||||
# self.reconnect() | |||||
# def on_connection_closed(self, _unused_connection: pika.SelectConnection, reason): | |||||
# """This method is invoked by pika when the connection to RabbitMQ is | |||||
# closed unexpectedly. Since it is unexpected, we will reconnect to | |||||
# RabbitMQ if it disconnects. | |||||
# :param pika.connection.Connection connection: The closed connection obj | |||||
# :param Exception reason: exception representing reason for loss of | |||||
# connection. | |||||
# """ | |||||
# self._channel = None | |||||
# if self._closing: | |||||
# assert self._connection is not None | |||||
# self._connection.ioloop.stop() | |||||
# else: | |||||
# LOGGER.warning("Connection closed, reconnect necessary: %s", reason) | |||||
# self.reconnect() | |||||
# def reconnect(self) -> None: | |||||
# """Will be invoked if the connection can't be opened or is | |||||
# closed. Indicates that a reconnect is necessary then stops the | |||||
# ioloop. | |||||
# """ | |||||
# self.should_reconnect = True | |||||
# self.stop() | |||||
# def open_channel(self) -> None: | |||||
# """Open a new channel with RabbitMQ by issuing the Channel.Open RPC | |||||
# command. When RabbitMQ responds that the channel is open, the | |||||
# on_channel_open callback will be invoked by pika. | |||||
# """ | |||||
# LOGGER.info("Creating a new channel") | |||||
# assert self._connection is not None | |||||
# self._connection.channel(on_open_callback=self.on_channel_open) | |||||
# def on_channel_open(self, channel: pika.channel.Channel) -> None: | |||||
# """This method is invoked by pika when the channel has been opened. | |||||
# The channel object is passed in so we can make use of it. | |||||
# Since the channel is now open, we'll declare the exchange to use. | |||||
# :param pika.channel.Channel channel: The channel object | |||||
# """ | |||||
# LOGGER.info("Channel opened") | |||||
# self._channel = channel | |||||
# self.add_on_channel_close_callback() | |||||
# self.setup_queue() | |||||
# def add_on_channel_close_callback(self) -> None: | |||||
# """This method tells pika to call the on_channel_closed method if | |||||
# RabbitMQ unexpectedly closes the channel. | |||||
# """ | |||||
# LOGGER.info("Adding channel close callback") | |||||
# assert self._channel is not None | |||||
# self._channel.add_on_close_callback(callback=self.on_channel_closed) | |||||
# def on_channel_closed( | |||||
# self, channel: pika.channel.Channel, reason: Exception | |||||
# ) -> None: | |||||
# """Invoked by pika when RabbitMQ unexpectedly closes the channel. | |||||
# Channels are usually closed if you attempt to do something that | |||||
# violates the protocol, such as re-declare an exchange or queue with | |||||
# different parameters. In this case, we'll close the connection | |||||
# to shutdown the object. | |||||
# :param pika.channel.Channel: The closed channel | |||||
# :param Exception reason: why the channel was closed | |||||
# """ | |||||
# LOGGER.warning("Channel %i was closed: %s", channel, reason) | |||||
# self.close_connection() | |||||
# def setup_queue(self) -> None: | |||||
# """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC | |||||
# command. When it is complete, the on_queue_declareok method will | |||||
# be invoked by pika. | |||||
# """ | |||||
# LOGGER.info("Declaring callback queue") | |||||
# assert self._channel is not None | |||||
# self._channel.queue_declare( | |||||
# queue="", exclusive=True, callback=self.on_queue_declareok | |||||
# ) | |||||
# def on_queue_declareok(self, frame: pika.frame.Method) -> None: | |||||
# """Method invoked by pika when the Queue.Declare RPC call made in | |||||
# setup_queue has completed. In this method we will bind the queue | |||||
# and exchange together with the routing key by issuing the Queue.Bind | |||||
# RPC command. When this command is complete, the on_bindok method will | |||||
# be invoked by pika. | |||||
# :param pika.frame.Method method_frame: The Queue.DeclareOk frame | |||||
# 4 | |||||
# """ | |||||
# LOGGER.info("Binding queue to default exchanger") | |||||
# self._callback_queue = frame.method.queue | |||||
# # assert self._channel is not None | |||||
# # self._channel.queue_bind( | |||||
# # queue=self._callback_queue, exchange="", callback=self.on_bindok | |||||
# # ) | |||||
# # def on_bindok(self, _unused_frame: pika.frame.Method) -> None: | |||||
# # """Invoked by pika when the Queue.Bind method has completed. At this | |||||
# # point we will set the prefetch count for the channel. | |||||
# # :param pika.frame.Method _unused_frame: The Queue.BindOk response frame | |||||
# # :param str|unicode queue_name: The name of the queue to declare | |||||
# # """ | |||||
# # LOGGER.info("Queue bound") | |||||
# # self.set_qos() | |||||
# # def set_qos(self) -> None: | |||||
# """This method sets up the consumer prefetch to only be delivered | |||||
# one message at a time. The consumer must acknowledge this message | |||||
# before RabbitMQ will deliver another one. You should experiment | |||||
# with different prefetch values to achieve desired performance. | |||||
# """ | |||||
# assert self._channel is not None | |||||
# self._channel.basic_qos( | |||||
# prefetch_count=self._prefetch_count, callback=self.on_basic_qos_ok | |||||
# ) | |||||
# def on_basic_qos_ok(self, _unused_frame: pika.frame.Method) -> None: | |||||
# """Invoked by pika when the Basic.QoS method has completed. At this | |||||
# point we will start consuming messages by calling start_consuming | |||||
# which will invoke the needed RPC commands to start the process. | |||||
# :param pika.frame.Method _unused_frame: The Basic.QosOk response frame | |||||
# """ | |||||
# LOGGER.info("QOS set to: %d", self._prefetch_count) | |||||
# self._mutex.release() | |||||
# LOGGER.debug("Mutex released with connection successfully established") | |||||
# self.start_consuming() | |||||
# def start_consuming(self) -> None: | |||||
# """This method sets up the consumer by first calling | |||||
# add_on_cancel_callback so that the object is notified if RabbitMQ | |||||
# cancels the consumer. It then issues the Basic.Consume RPC command | |||||
# which returns the consumer tag that is used to uniquely identify the | |||||
# consumer with RabbitMQ. We keep the value to use it when we want to | |||||
# cancel consuming. The on_message method is passed in as a callback pika | |||||
# will invoke when a message is fully received. | |||||
# """ | |||||
# LOGGER.info("Issuing consumer related RPC commands") | |||||
# assert self._channel is not None | |||||
# self.add_on_cancel_callback() | |||||
# # Get storage configuration from server. | |||||
# # if self._storage is None: | |||||
# # self.request("get_storage_config") | |||||
# # try: | |||||
# # self._storage = get_provenance_storage(**self.wait_for_response()) | |||||
# # except ResponseTimeout: | |||||
# # LOGGER.warning( | |||||
# # "Timed out waiting for response on get_storage_config" | |||||
# # ) | |||||
# # raise ConfigurationError | |||||
# self._consumer_tag = self._channel.basic_consume( | |||||
# queue=self._callback_queue, on_message_callback=self.on_message | |||||
# ) | |||||
# self.was_consuming = True | |||||
# self._consuming = True | |||||
# def add_on_cancel_callback(self) -> None: | |||||
# """Add a callback that will be invoked if RabbitMQ cancels the consumer | |||||
# for some reason. If RabbitMQ does cancel the consumer, | |||||
# on_consumer_cancelled will be invoked by pika. | |||||
# """ | |||||
# LOGGER.info("Adding consumer cancellation callback") | |||||
# assert self._channel is not None | |||||
# self._channel.add_on_cancel_callback(callback=self.on_consumer_cancelled) | |||||
# def on_consumer_cancelled(self, method_frame: pika.frame.Method) -> None: | |||||
# """Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer | |||||
# receiving messages. | |||||
# :param pika.frame.Method method_frame: The Basic.Cancel frame | |||||
# """ | |||||
# LOGGER.info( | |||||
# "Consumer was cancelled remotely, shutting down: %r", method_frame | |||||
# ) | |||||
# if self._channel: | |||||
# self._channel.close() | |||||
# def on_message( | |||||
# self, | |||||
# _unused_channel: pika.channel.Channel, | |||||
# basic_deliver: pika.spec.Basic.Deliver, | |||||
# properties: pika.spec.BasicProperties, | |||||
# body: bytes, | |||||
# ) -> None: | |||||
# """Invoked by pika when a message is delivered from RabbitMQ. The | |||||
# channel is passed for your convenience. The basic_deliver object that | |||||
# is passed in carries the exchange, routing key, delivery tag and | |||||
# a redelivered flag for the message. The properties passed in is an | |||||
# instance of BasicProperties with the message properties and the body | |||||
# is the message that was sent. | |||||
# :param pika.channel.Channel _unused_channel: The channel object | |||||
# :param pika.spec.Basic.Deliver: basic_deliver method | |||||
# :param pika.spec.BasicProperties: properties | |||||
# :param bytes body: The message body | |||||
# """ | |||||
# LOGGER.info( | |||||
# "Received message # %s from %s: %s", | |||||
# basic_deliver.delivery_tag, | |||||
# properties.app_id, | |||||
# body, | |||||
# ) | |||||
# self._response_queue.put( | |||||
# ( | |||||
# properties.correlation_id, | |||||
# decode_data(body, extra_decoders=self.extra_type_decoders), | |||||
# ) | |||||
# ) | |||||
# self.acknowledge_message(delivery_tag=basic_deliver.delivery_tag) | |||||
# def acknowledge_message(self, delivery_tag: int) -> None: | |||||
# """Acknowledge the message delivery from RabbitMQ by sending a | |||||
# Basic.Ack RPC method for the delivery tag. | |||||
# :param int delivery_tag: The delivery tag from the Basic.Deliver frame | |||||
# """ | |||||
# LOGGER.info("Acknowledging message %s", delivery_tag) | |||||
# assert self._channel is not None | |||||
# self._channel.basic_ack(delivery_tag=delivery_tag) | |||||
# def stop_consuming(self) -> None: | |||||
# """Tell RabbitMQ that you would like to stop consuming by sending the | |||||
# Basic.Cancel RPC command. | |||||
# """ | |||||
# if self._channel: | |||||
# LOGGER.info("Sending a Basic.Cancel RPC command to RabbitMQ") | |||||
# self._channel.basic_cancel(self._consumer_tag, self.on_cancelok) | |||||
# def on_cancelok(self, _unused_frame: pika.frame.Method) -> None: | |||||
# """This method is invoked by pika when RabbitMQ acknowledges the | |||||
# cancellation of a consumer. At this point we will close the channel. | |||||
# This will invoke the on_channel_closed method once the channel has been | |||||
# closed, which will in-turn close the connection. | |||||
# :param pika.frame.Method _unused_frame: The Basic.CancelOk frame | |||||
# :param str|unicode consumer_tag: Tag of the consumer to be stopped | |||||
# """ | |||||
# self._consuming = False | |||||
# LOGGER.info( | |||||
# "RabbitMQ acknowledged the cancellation of the consumer: %s", | |||||
# self._consumer_tag, | |||||
# ) | |||||
# self.close_channel() | |||||
# def close_channel(self) -> None: | |||||
# """Call to close the channel with RabbitMQ cleanly by issuing the | |||||
# Channel.Close RPC command. | |||||
# """ | |||||
# LOGGER.info("Closing the channel") | |||||
# assert self._channel is not None | |||||
# self._channel.close() | |||||
# def run(self) -> None: | |||||
# """Run the example code by connecting and then starting the IOLoop.""" | |||||
# while not self._closing: | |||||
# try: | |||||
# self._connection = self.connect() | |||||
# assert self._connection is not None | |||||
# self._connection.ioloop.start() | |||||
# except KeyboardInterrupt: | |||||
# self.stop() | |||||
# if self._connection is not None and not self._connection.is_closed: | |||||
# # Finish closing | |||||
# self._connection.ioloop.start() | |||||
# LOGGER.info("Stopped") | |||||
# def stop(self) -> None: | |||||
# """Cleanly shutdown the connection to RabbitMQ by stopping the consumer | |||||
# with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok | |||||
# will be invoked by pika, which will then closing the channel and | |||||
# connection. The IOLoop is started again because this method is invoked | |||||
# when CTRL-C is pressed raising a KeyboardInterrupt exception. This | |||||
# exception stops the IOLoop which needs to be running for pika to | |||||
# communicate with RabbitMQ. All of the commands issued prior to starting | |||||
# the IOLoop will be buffered but not processed. | |||||
# """ | |||||
# assert self._connection is not None | |||||
# if not self._closing: | |||||
# self._closing = True | |||||
# LOGGER.info("Stopping") | |||||
# if self._consuming: | |||||
# self.stop_consuming() | |||||
# self._connection.ioloop.start() | |||||
# else: | |||||
# self._connection.ioloop.stop() | |||||
# LOGGER.info("Stopped") | |||||
# def request( | |||||
# self, routing_key: str, correlation_id: Optional[str] = None, **kwargs | |||||
# ) -> Any: | |||||
# LOGGER.debug("Acquiring mutex to send request on %s", routing_key) | |||||
# self._mutex.acquire() | |||||
# assert self._channel is not None | |||||
# self._correlation_id = ( | |||||
# correlation_id if correlation_id is not None else str(uuid.uuid4()) | |||||
# ) | |||||
# self._channel.basic_publish( | |||||
# exchange="", | |||||
# routing_key=routing_key, | |||||
# properties=pika.BasicProperties( | |||||
# reply_to=self._callback_queue, | |||||
# correlation_id=self._correlation_id, | |||||
# ), | |||||
# body=encode_data(kwargs, extra_encoders=self.extra_type_encoders), | |||||
# ) | |||||
# self._mutex.release() | |||||
# LOGGER.debug("Mutex released after sending request") | |||||
# def wait_for_acks(self, acks_expected: int) -> bool: | |||||
# acks_received = 0 | |||||
# while acks_received < acks_expected: | |||||
# try: | |||||
# acks_received += self.wait_for_response() | |||||
# except ResponseTimeout: | |||||
# LOGGER.warning( | |||||
# "Timed out waiting for acks, %s received, %s expected", | |||||
# acks_received, | |||||
# acks_expected, | |||||
# ) | |||||
# return False | |||||
# return acks_received == acks_expected | |||||
# def wait_for_response(self, timeout: float = 60.0) -> Any: | |||||
# start = time.monotonic() | |||||
# while True: | |||||
# try: | |||||
# correlation_id, response = self._response_queue.get(block=False) | |||||
# if correlation_id == self._correlation_id: | |||||
# return response | |||||
# except queue.Empty: | |||||
# pass | |||||
# if self._response_queue.empty() and time.monotonic() > start + timeout: | |||||
# raise ResponseTimeout |