Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/api/client.py
Show First 20 Lines • Show All 347 Lines • ▼ Show 20 Lines | class ProvenanceStorageRabbitMQClient(threading.Thread, metaclass=MetaRabbitMQClient): | ||||
def on_response( | def on_response( | ||||
self, | self, | ||||
channel: pika.channel.Channel, | channel: pika.channel.Channel, | ||||
deliver: pika.spec.Basic.Deliver, | deliver: pika.spec.Basic.Deliver, | ||||
properties: pika.spec.BasicProperties, | properties: pika.spec.BasicProperties, | ||||
body: bytes, | body: bytes, | ||||
) -> None: | ) -> None: | ||||
LOGGER.info( | LOGGER.debug( | ||||
"Received message # %s from %s: %s", | "Received message # %s from %s: %s", | ||||
deliver.delivery_tag, | deliver.delivery_tag, | ||||
properties.app_id, | properties.app_id, | ||||
body, | body, | ||||
) | ) | ||||
self._response_queue.put( | self._response_queue.put( | ||||
( | ( | ||||
properties.correlation_id, | properties.correlation_id, | ||||
decode_data(body, extra_decoders=self.extra_type_decoders), | decode_data(body, extra_decoders=self.extra_type_decoders), | ||||
) | ) | ||||
) | ) | ||||
LOGGER.info("Acknowledging message %s", deliver.delivery_tag) | LOGGER.debug("Acknowledging message %s", deliver.delivery_tag) | ||||
channel.basic_ack(delivery_tag=deliver.delivery_tag) | channel.basic_ack(delivery_tag=deliver.delivery_tag) | ||||
def stop_consuming(self) -> None: | def stop_consuming(self) -> None: | ||||
if self._channel: | if self._channel: | ||||
LOGGER.info("Sending a Basic.Cancel RPC command to RabbitMQ") | LOGGER.info("Sending a Basic.Cancel RPC command to RabbitMQ") | ||||
self._channel.basic_cancel(self._consumer_tag, self.on_cancel_ok) | self._channel.basic_cancel(self._consumer_tag, self.on_cancel_ok) | ||||
def on_cancel_ok(self, _unused_frame: pika.frame.Method) -> None: | def on_cancel_ok(self, _unused_frame: pika.frame.Method) -> None: | ||||
▲ Show 20 Lines • Show All 94 Lines • Show Last 20 Lines |