D6165.diff

diff --git a/docs/storage/remote.rst b/docs/storage/remote.rst
new file mode 100644
--- /dev/null
+++ b/docs/storage/remote.rst
@@ -0,0 +1,724 @@
+:orphan:
+
+ProvenanceStorageRabbitMQClient
+===============================
+
+The remote storage client connects to a remote server through a RabbitMQ
+broker, and delegates all write operations to it. Read operations,
+however, are still performed using a local storage object (i.e.
+``ProvenanceStorageMongoDb`` or ``ProvenanceStoragePostgreSql``). This
+speeds up the process by avoiding unnecessary RabbitMQ overhead for
+operations that are conflict-free.
+
+.. warning::
+
+ No check is done to guarantee that the local storage used by the
+ client is the same as the one the server uses on its side, but it is
+ assumed to be the case.
+
+When a write operation is invoked, the client splits the set of
+elements to be written by ID range, and sends a series of packages to the
+server (via the RabbitMQ broker). Each package consists of elements
+belonging to the same ID range but, to avoid sending huge packages,
+there might be more than one package per range. After this, the client
+blocks waiting for the server to acknowledge the writing of all
+requested elements.
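+
+Conceptually, the write path looks like the following sketch
+(simplified from the client code added in this diff; ``chunked`` is a
+hypothetical helper yielding consecutive slices of at most
+``batch_size`` items):
+
+.. code-block:: python
+
+   # Simplified write path (see write_meth_ in swh/provenance/api/client.py)
+   def write(meth_name, relation, data, batch_size):
+       ranges = split_ranges(data, meth_name, relation)  # routing key -> items
+       acks_expected = sum(len(items) for items in ranges.values())
+       exchange = ProvenanceStorageRabbitMQServer.get_exchange(meth_name, relation)
+       for routing_key, items in ranges.items():
+           for batch in chunked(list(items), batch_size):
+               # one package per batch of at most batch_size elements
+               request(exchange=exchange, routing_key=routing_key, data=batch)
+       return wait_for_acks(acks_expected)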
+
+To initialize a client object, two mandatory parameters must be
+provided:
+
+- ``url``: the URL string of the broker where the server expects to
+ receive the packages.
+- ``storage_config``: a dictionary containing the storage configuration
+ for the local storage object, as expected by
+ ``swh.provenance.get_provenance_storage``.
+
+Additionally, some optional parameters can be specified that may affect
+the performance of the remote storage as a whole:
+
+- ``batch_size``: an integer specifying the maximum allowed number of
+ elements per package, after range splitting. Defaults to 100.
+- ``prefetch_count``: an integer specifying how many ack packages are
+ prefetched from the broker. Defaults to 100.
+- ``wait_min``: a float specifying the minimum number of seconds that
+ the client should wait for the server’s response before failing.
+ Defaults to 60.
+- ``wait_per_batch``: a float specifying the number of seconds to wait
+ per sent batch of items (i.e. package). If
+ ``wait_per_batch * number_of_packages`` is less than ``wait_min``,
+ the latter will be used instead. Defaults to 10.
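+
+For instance, a client could be obtained through
+``swh.provenance.get_provenance_storage`` roughly as follows (a minimal
+sketch: the broker URL and the local storage configuration shown here
+are illustrative assumptions):
+
+.. code-block:: python
+
+   from swh.provenance import get_provenance_storage
+
+   storage = get_provenance_storage(
+       cls="rabbitmq",
+       url="amqp://guest:guest@localhost:5672/%2f",
+       storage_config={"cls": "postgresql", "db": {"dbname": "provenance"}},
+       batch_size=100,
+       prefetch_count=100,
+   )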
+
+Like all ``ProvenanceStorageInterface``-compliant objects, the remote
+storage client has to be opened before it can be used, and closed once
+it is no longer needed, to properly release resources. It can be
+operated as a context manager using the keyword ``with``. This will take
+care of actually opening/closing both the connection to the remote
+server and the underlying local storage object.
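+
+A minimal usage sketch (``config`` stands for the parameters shown
+above, and the method calls are only illustrative):
+
+.. code-block:: python
+
+   # Hypothetical usage as a context manager; both the RabbitMQ connection
+   # and the local storage object are opened on entry and closed on exit.
+   with get_provenance_storage(cls="rabbitmq", **config) as storage:
+       storage.content_add(...)  # write: forwarded to the remote server
+       storage.content_get(...)  # read: served by the local storage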
+
+Client lifecycle
+----------------
+
+Connecting to the remote server object implies taking care of the
+RabbitMQ connection’s lifecycle. To that end, an internal thread is
+launched, which will re-establish the connection in case of an error
+until an explicit disconnect request is made. The class
+``ProvenanceStorageRabbitMQClient`` extends the standard
+``threading.Thread`` class, thus the ``run`` method in the class is the
+target function of that thread. It loops indefinitely, calling the
+``connect`` method and then the blocking ``ioloop.start`` method of the
+established connection. Interaction with this loop is done by setting
+callback functions.
+
+The following is a diagram of the interaction between the methods of the
+class:
+
+.. graphviz::
+
+ digraph {
+ ProvenanceStorageRabbitMQClient
+
+ close[shape=record]
+ open[shape=record]
+ add[shape=record,label="write method"]
+ get[shape=record,label="read method"]
+
+ ProvenanceStorageInterface
+
+ start[shape=record]
+ stop[shape=record]
+
+ request[shape=record]
+ wait_for_acks[shape=record]
+ wait_for_response[shape=record]
+
+ subgraph cluster_connection_thread {
+ style=rounded
+ bgcolor=gray95
+ color=gray
+ labelloc=b
+
+ run[shape=record]
+
+ connect[shape=record]
+ on_connection_open[shape=record]
+ on_connection_open_error[shape=record]
+ on_connection_closed[shape=record]
+ close_connection[shape=record]
+ open_channel[shape=record]
+ on_channel_open[shape=record]
+ on_channel_closed[shape=record]
+ setup_queue[shape=record]
+ on_queue_declare_ok[shape=record]
+ on_basic_qos_ok[shape=record]
+ start_consuming[shape=record]
+ on_consumer_cancelled[shape=record]
+ on_response[shape=record]
+ stop_consuming[shape=record]
+ on_cancel_ok[shape=record]
+ }
+
+ ProvenanceStorageRabbitMQClient->{add,close,get,open}
+
+ close->{stop}
+ open->{start}
+
+ start->{run}
+ stop->{stop_consuming}
+
+ run->{connect,stop}
+
+ connect->{on_connection_open,on_connection_open_error,on_connection_closed}
+
+ on_connection_open->{open_channel}
+
+ open_channel->{on_channel_open}
+
+ on_cancel_ok->{on_channel_closed}
+ on_consumer_cancelled->{on_channel_closed}
+ on_channel_open->{setup_queue}
+
+ on_channel_closed->{close_connection}
+
+ setup_queue->{on_queue_declare_ok}
+
+ on_queue_declare_ok->{on_basic_qos_ok}
+
+ on_basic_qos_ok->{start_consuming}
+
+ start_consuming->{on_consumer_cancelled,on_response}
+
+ on_response->{wait_for_response}[label="_response_queue",arrowhead="none"]
+
+ stop_consuming->{on_cancel_ok}
+
+ add->{request,wait_for_acks}
+ wait_for_acks->{wait_for_response}
+
+ get->{ProvenanceStorageInterface}
+ }
+
+Every write method in the ``ProvenanceStorageInterface`` performs the
+splitting by ID range and sends a series of messages to the server by
+calling the ``request`` method. After that, it blocks waiting for all
+necessary ack packages to arrive by calling the ``wait_for_acks``
+method, which in turn calls ``wait_for_response``. Calls to the write
+methods may come from distinct threads, thus the interaction between
+``on_response`` and ``wait_for_response`` is done through a thread-safe
+``queue.Queue`` structure, ``_response_queue``.
+
+Below is a summary of the methods present in the diagram and their
+functionality. Methods are listed in alphabetical order:
+
+- ``close_connection``: this method properly closes the connection to
+ RabbitMQ.
+- ``connect``: this method connects to RabbitMQ, returning the
+ connection handle. When the connection is established, the
+ ``on_connection_open`` callback method will be invoked by ``pika``.
+ If there is an error establishing the connection, the
+ ``on_connection_open_error`` callback method will be invoked by
+ ``pika``. If the connection closes unexpectedly, the
+ ``on_connection_closed`` callback method will be invoked by ``pika``.
+- ``on_basic_qos_ok``: this callback method is invoked by ``pika`` when
+ the ``Basic.QoS`` RPC call made in ``on_queue_declare_ok`` has
+ completed. At this point it is safe to start consuming messages,
+ hence the ``start_consuming`` method is called, which will invoke the
+ needed RPC commands to start the process.
+- ``on_cancel_ok``: this callback method is invoked by ``pika`` when
+ RabbitMQ acknowledges the cancellation of a consumer. At this point
+ the channel close is requested, thus indirectly invoking the
+ ``on_channel_closed`` callback method once the channel has been
+ closed, which will in turn close the connection.
+- ``on_channel_closed``: this callback method is invoked by ``pika``
+ when RabbitMQ unexpectedly closes the channel. Channels are usually
+ closed if there is an attempt to do something that violates the
+ protocol, such as re-declaring an exchange or queue with different
+ parameters. In this case, the connection will be closed by invoking
+ the ``close_connection`` method.
+- ``on_channel_open``: this callback method is invoked by ``pika`` when
+ the channel has been opened. The ``on_channel_closed`` callback is
+ set here. Since the channel is now open, it is safe to set up the
+ client’s response queue on RabbitMQ by invoking the ``setup_queue``
+ method.
+- ``on_connection_closed``: this callback method is invoked by ``pika``
+ when the connection to RabbitMQ is closed unexpectedly. Since it is
+ unexpected, an attempt to reconnect to RabbitMQ will be done.
+- ``on_connection_open``: this callback method is invoked by ``pika``
+ once the connection to RabbitMQ has been established. It proceeds to
+ open the channel by calling the ``open_channel`` method.
+- ``on_connection_open_error``: this callback method is invoked by
+ ``pika`` if the connection to RabbitMQ can’t be established. Since it
+ is unexpected, an attempt to reconnect to RabbitMQ will be done.
+- ``on_consumer_cancelled``: this callback method is invoked by
+ ``pika`` when RabbitMQ sends a ``Basic.Cancel`` for a consumer
+ receiving messages. At this point the channel close is requested,
+ thus indirectly invoking the ``on_channel_closed`` callback method
+ once the channel has been closed, which will in turn close the
+ connection.
+- ``on_queue_declare_ok``: this callback method is invoked by ``pika``
+ when the ``Queue.Declare`` RPC call made in ``setup_queue`` has
+ completed. This method sets up the consumer prefetch count by
+ invoking the ``Basic.QoS`` RPC command. When it is completed, the
+ ``on_basic_qos_ok`` method will be invoked by ``pika``.
+- ``on_response``: this callback method is invoked by ``pika`` when a
+ message is delivered from RabbitMQ. The decoded response together
+ with its correlation ID is enqueued in the internal
+ ``_response_queue``, so that the data is forwarded to the
+ ``wait_for_response`` method, which might be running on a distinct
+ thread. A ``Basic.Ack`` RPC command is issued to acknowledge the
+ delivery of the message to RabbitMQ.
+- ``open_channel``: this method opens a new channel with RabbitMQ by
+ issuing the ``Channel.Open`` RPC command. When RabbitMQ responds that
+ the channel is open, the ``on_channel_open`` callback will be invoked
+ by ``pika``.
+- ``request``: this method sends a message to RabbitMQ by issuing a
+ ``Basic.Publish`` RPC command. The body of the message is properly
+ encoded, while the correlation ID and request key are used as passed
+ by the calling method.
+- ``run``: main method of the internal thread. It requests to open a
+ connection to RabbitMQ by calling the ``connect`` method, and starts
+ the internal ``IOLoop`` of the returned handle (blocking operation).
+ In case of failure, this method will indefinitely try to reconnect.
+ When an explicit ``TerminateSignal`` is received, the ``stop`` method
+ is invoked.
+- ``setup_queue``: this method sets up an exclusive queue for the
+ client on RabbitMQ by invoking the ``Queue.Declare`` RPC command.
+ When it is completed, the ``on_queue_declare_ok`` method will be
+ invoked by ``pika``.
+- ``start``: inherited from ``threading.Thread``. It launches the
+ internal thread with method ``run`` as target.
+- ``start_consuming``: this method sets up the consumer by first
+ registering the ``on_consumer_cancelled`` callback, so that the
+ client is notified if RabbitMQ cancels the consumer. It then issues
+ the ``Basic.Consume`` RPC command which returns the consumer tag that
+ is used to uniquely identify the consumer with RabbitMQ. The
+ ``on_response`` method is passed in as the callback that ``pika`` will
+ invoke when a message is fully received.
+- ``stop``: this method cleanly shuts down the connection to RabbitMQ
+ by calling the ``stop_consuming`` method. The ``IOLoop`` is started
+ again because this method is invoked by raising a ``TerminateSignal``
+ exception. This exception stops the ``IOLoop``, which needs to be
+ running for ``pika`` to communicate with RabbitMQ. All of the
+ commands issued prior to starting the ``IOLoop`` will be buffered but
+ not processed.
+- ``stop_consuming``: this method sends a ``Basic.Cancel`` RPC command.
+ When RabbitMQ confirms the cancellation, the ``on_cancel_ok``
+ callback method will be invoked by ``pika``, which will then close
+ the channel and connection.
+- ``wait_for_acks``: this method is invoked by every write method in
+ the ``ProvenanceStorageInterface``, after sending a series of write
+ requests to the server. It will call ``wait_for_response`` until it
+ receives all expected ack responses, or until it receives the first
+ timeout. The timeout is calculated based on the number of expected
+ acks and the class initialization parameters ``wait_per_batch`` and
+ ``wait_min``, as shown in the example after this list.
+- ``wait_for_response``: this method is called from ``wait_for_acks``
+ to retrieve ack packages from the internal ``_response_queue``. The
+ response correlation ID is used to validate that the received ack
+ corresponds to the current write request. The method returns the
+ decoded body of the received response.
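+
+The following sketch shows the timeout computation performed by
+``wait_for_acks`` (mirroring the client code added in this diff), with
+two illustrative workloads:
+
+.. code-block:: python
+
+   batch_size, wait_per_batch, wait_min = 100, 10.0, 60.0
+
+   acks_expected = 2500  # total number of elements sent for writing
+   timeout = max((acks_expected / batch_size) * wait_per_batch, wait_min)
+   # (2500 / 100) * 10 = 250 -> timeout = 250 seconds
+   # with only 300 elements: (300 / 100) * 10 = 30 < 60 -> timeout = 60 seconds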
+
+ProvenanceStorageRabbitMQServer
+===============================
+
+The remote storage server is responsible for defining the ID range
+splitting policy for each entity and relation in the provenance
+solution. Based on this policy, it will launch a series of worker
+processes that will take care of actually processing the requests for
+each defined range in the partition. These processes are implemented in
+the ``ProvenanceStorageRabbitMQWorker`` described below.
+
+To initialize a server object, two mandatory parameters must be
+provided:
+
+- ``url``: the URL string of the broker where the server expects to
+ receive the packages.
+- ``storage_config``: a dictionary containing the storage configuration
+ for the local storage object, as expected by
+ ``swh.provenance.get_provenance_storage``.
+
+Additionally, some optional parameters can be specified that may affect
+the performance of the remote storage as a whole:
+
+- ``batch_size``: an integer specifying the maximum allowed number of
+ elements to be processed at a time, i.e. forwarded to the underlying
+ storage object by each worker. Defaults to 100.
+- ``prefetch_count``: an integer specifying how many packages are
+ prefetched from the broker by each worker. Defaults to 100.
+
+On initialization, the server will create the necessary
+``ProvenanceStorageRabbitMQWorker`` objects, forwarding to them the
+parameters mentioned above, but it won’t launch these underlying
+processes until it is explicitly started. To that end, the
+``ProvenanceStorageRabbitMQServer`` objects provide the following
+methods:
+
+- ``start``: this method launches all the necessary worker
+ subprocesses and ensures they all are in a proper consuming state
+ before returning control to the caller.
+- ``stop``: this method signals all worker subprocesses for
+ termination and blocks until they all finish successfully.
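+
+A minimal usage sketch (the broker URL and storage configuration are
+illustrative assumptions):
+
+.. code-block:: python
+
+   from swh.provenance.api.server import ProvenanceStorageRabbitMQServer
+
+   server = ProvenanceStorageRabbitMQServer(
+       url="amqp://guest:guest@localhost:5672/%2f",
+       storage_config={"cls": "postgresql", "db": {"dbname": "provenance"}},
+   )
+   server.start()  # launch the workers; returns once they are all consuming
+   try:
+       ...  # serve requests until shutdown is requested
+   finally:
+       server.stop()  # signal the workers for termination and wait for them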
+
+ID range splitting policy
+-------------------------
+
+The ID range splitting policy is defined as follows:
+
+- There is an exchange in the RabbitMQ broker for each entity in the
+ provenance solution, plus an extra one to handle locations:
+ ``content``, ``directory``, ``location``, ``origin``, and
+ ``revision``.
+- Any request to add an entity should send the necessary packages to
+ the entity’s associated exchange for proper routing: e.g. requests
+ for ``content_add`` will be handled by the ``content`` exchange.
+- Any request to add a relation entry is handled by the exchange of the
+ source entity in the relation: e.g. requests for ``relation_add``
+ with ``relation=CNT_EARLY_IN_REV`` will be handled by the ``content``
+ exchange.
+- ID range splitting is done by looking at the first byte in the SWHID
+ of the entity (i.e. a hex value), hence 16 possible ranges are
+ defined for each operation associated with each entity. In the case
+ of locations, a hex hash is calculated over their value.
+- Each exchange then handles 16 queues for each method associated with
+ the exchange’s entity, with a ``direct`` routing policy. For
+ instance, the ``content`` exchange has 16 queues associated with each
+ of the following methods: ``content_add``, ``relation_add`` with
+ ``relation=CNT_EARLY_IN_REV``, and ``relation_add`` with
+ ``relation=CNT_IN_DIR`` (i.e. a total of 48 queues).
+- For each exchange, 16 ``ProvenanceStorageRabbitMQWorker`` processes
+ are launched, each of them taking care of one ID range for the
+ associated entity.
+
+All in all, this gives a total of 80 ``ProvenanceStorageRabbitMQWorker``
+processes (16 per exchange) and 160 RabbitMQ queues (48 for ``content``
+and ``revision``, 32 for ``directory``, and 16 for ``location`` and
+``origin``). In this way, it is guaranteed that, regardless of the
+operation being performed, there will never be more than one process
+trying to write on a given ID range for a given entity, thus resolving
+all potential conflicts.
+
+Although the ID range splitting policy is defined on the server side, so
+that it can properly configure and launch the necessary worker
+processes, it is the client that is responsible for actually splitting
+the input to each write method and sending the write requests to the
+proper queues, for RabbitMQ to route them to the correct worker. For
+that, the server defines a series of static methods that allow querying
+the ID range splitting policy (illustrated after the list below):
+
+- ``get_binding_keys``: this method is meant to be used by the server
+ workers. Given an exchange and a range ID, it yields all the RabbitMQ
+ routing keys the worker process should bind to.
+- ``get_exchange``: given the name of a write method in the
+ ``ProvenanceStorageInterface``, and an optional relation type, it
+ returns the name of the exchange to which the writing request should
+ be sent.
+- ``get_exchanges``: this method yields the names of all the exchanges
+ in the RabbitMQ broker.
+- ``get_meth_name``: this method is meant to be used by the server
+ workers. Given a binding key as returned by ``get_binding_keys``, it
+ returns the ``ProvenanceStorageInterface`` method associated with it.
+- ``get_meth_names``: given an exchange name, it yields all the methods
+ that are associated with it. In the case of ``relation_add``, the
+ method also returns the supported relation type.
+- ``get_ranges``: given an exchange name, it yields the integer value
+ of all supported range IDs (currently 0-15 for all exchanges). The
+ idea behind this method is to allow defining a custom ID range split
+ for each exchange.
+- ``get_routing_key``: given the name of a write method in the
+ ``ProvenanceStorageInterface``, an optional relation type, and a
+ tuple with the data to be passed to the method (first parameter),
+ this method returns the routing key of the queue responsible for
+ handling that tuple. It is assumed that the first value in the tuple
+ is a ``Sha1Git`` ID.
+- ``is_write_method``: given the name of a method in the
+ ``ProvenanceStorageInterface``, it decides if it is a write method or
+ not.
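+
+For illustration, the following sketch mirrors how the client code in
+this diff queries the policy (the SHA1 value is a made-up example and
+the results are only indicative):
+
+.. code-block:: python
+
+   from swh.provenance.api.server import ProvenanceStorageRabbitMQServer
+   from swh.provenance.interface import RelationType
+
+   sha1 = bytes.fromhex("a7" + "00" * 19)  # Sha1Git in the 0xa ID range
+
+   ProvenanceStorageRabbitMQServer.is_write_method("content_add")  # True
+
+   # exchange handling relation_add for CNT_EARLY_IN_REV (source: content)
+   exchange = ProvenanceStorageRabbitMQServer.get_exchange(
+       "relation_add", RelationType.CNT_EARLY_IN_REV
+   )
+
+   # routing key for this element, as computed by split_ranges in client.py
+   key = ProvenanceStorageRabbitMQServer.get_routing_key(sha1, "content_add")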
+
+ProvenanceStorageRabbitMQWorker
+===============================
+
+The remote storage worker consumes messages published in the RabbitMQ
+broker by the remote storage clients, and proceeds to perform the
+actual write operations to a local storage object (i.e.
+``ProvenanceStorageMongoDb`` or ``ProvenanceStoragePostgreSql``). Each
+worker processes messages associated with a particular entity and range
+ID. It is the client’s responsibility to properly split data across
+messages according to the remote storage server policy.
+
+Since there is overlap between methods in the
+``ProvenanceInterface`` operating over the same entity, one worker may
+have to handle more than one method to guarantee conflict-free writes
+to the underlying storage. For instance, considering the ``content``
+entity, for a given ID range, methods ``content_add`` and
+``relation_add`` with ``relation=CNT_EARLY_IN_REV`` may conflict. It is
+the worker’s responsibility to resolve this kind of conflict.
+
+To initialize a worker object, four mandatory parameters must be
+provided:
+
+- ``url``: the URL string of the broker where the server expects to
+ receive the packages.
+- ``exchange``: the RabbitMQ exchange to which the worker will
+ subscribe. See ``ProvenanceStorageRabbitMQServer``\ ’s ID range
+ splitting policy for further details.
+- ``range``: the range ID the worker will be processing. See
+ ``ProvenanceStorageRabbitMQServer``\ ’s ID range splitting policy for
+ further details.
+- ``storage_config``: a dictionary containing the storage configuration
+ for the local storage object, as expected by
+ ``swh.provenance.get_provenance_storage``.
+
+Additionally, some optional parameters can be specified that may affect
+the performance of the remote storage as a whole:
+
+- ``batch_size``: an integer specifying the maximum allowed number of
+ elements to be processed at a time, i.e. forwarded to the underlying
+ storage object for writing. Defaults to 100.
+- ``prefetch_count``: an integer specifying how many packages are
+ prefetched from the broker. Defaults to 100.
+
+.. warning::
+
+ This class is not meant to be used directly but through an instance
+ of ``ProvenanceStorageRabbitMQServer``. The parameters ``url``,
+ ``storage_config``, ``batch_size`` and ``prefetch_count`` above are
+ forwarded as passed to the server on initialization. Additional
+ arguments ``exchange`` and ``range`` are generated by the server
+ based on its ID range splitting policy.
+
+Worker lifecycle
+----------------
+
+All interaction between the provenance solution and a remote storage
+worker object happens through RabbitMQ packages. To maximize
+concurrency, each instance of the ``ProvenanceStorageRabbitMQWorker``
+launches a distinct sub-process, hence avoiding unnecessary
+synchronization with other components of the solution. For this,
+``ProvenanceStorageRabbitMQWorker`` extends ``multiprocessing.Process``
+and only has a direct channel of communication to the master
+``ProvenanceStorageRabbitMQServer``, through ``multiprocessing.Queue``
+structures. The entry point of the sub-process is the method
+``run``, which will in turn launch a number of threads to handle the
+different provenance methods the worker needs to support, and an extra
+thread to handle communication with the server object. RabbitMQ’s
+lifecycle is taken care of in the main thread.
+
+The following is a diagram of the interaction between the methods of the
+class:
+
+.. graphviz::
+
+ digraph {
+ ProvenanceStorageRabbitMQServer
+ ProvenanceStorageRabbitMQWorker
+
+ start[shape=record]
+ run[shape=record]
+
+ connect[shape=record]
+ on_connection_open[shape=record]
+ on_connection_open_error[shape=record]
+ on_connection_closed[shape=record]
+ close_connection[shape=record]
+ open_channel[shape=record]
+ on_channel_open[shape=record]
+ on_channel_closed[shape=record]
+
+ setup_exchange[shape=record]
+ on_exchange_declare_ok[shape=record]
+
+ setup_queues[shape=record]
+ on_queue_declare_ok[shape=record]
+ on_bind_ok[shape=record]
+ on_basic_qos_ok[shape=record]
+ start_consuming[shape=record]
+ on_consumer_cancelled[shape=record]
+ on_request[shape=record]
+ stop_consuming[shape=record]
+ on_cancel_ok[shape=record]
+
+ request_termination[shape=record]
+ stop[shape=record]
+
+ respond[shape=record]
+ get_conflicts_func[shape=record]
+
+ subgraph cluster_command_thread {
+ style=rounded
+ bgcolor=gray95
+ color=gray
+ labelloc=b
+
+ run_command_thread[shape=record]
+ }
+
+ subgraph cluster_request_thread {
+ style=rounded
+ bgcolor=gray95
+ color=gray
+ labelloc=b
+
+ ProvenanceStorageInterface
+
+ run_request_thread[shape=record]
+ }
+
+ ProvenanceStorageRabbitMQWorker->{start}
+
+ start->{run}
+ stop->{stop_consuming}
+
+ run->{connect,run_command_thread,run_request_thread,stop}
+
+ connect->{on_connection_open,on_connection_open_error,on_connection_closed}
+
+ on_connection_open->{open_channel}
+
+ open_channel->{on_channel_open}
+
+ on_cancel_ok->{on_channel_closed}
+ on_consumer_cancelled->{on_channel_closed}
+ on_channel_open->{setup_exchange}
+
+ on_channel_closed->{close_connection}
+
+ setup_exchange->{on_exchange_declare_ok}
+ on_exchange_declare_ok->{setup_queues}
+
+ setup_queues->{on_queue_declare_ok}
+ on_queue_declare_ok->{on_bind_ok}
+ on_bind_ok->{on_basic_qos_ok}
+ on_basic_qos_ok->{start_consuming}
+
+ start_consuming->{on_consumer_cancelled,on_request}
+ start_consuming->{ProvenanceStorageRabbitMQServer}[label=" signal",arrowhead="none"]
+
+ on_request->{run_request_thread}[label=" _request_queues",arrowhead="none"]
+
+ stop_consuming->{on_cancel_ok}
+
+ run_command_thread->{request_termination}
+ run_command_thread->{ProvenanceStorageRabbitMQServer}[label=" command",arrowhead="none"]
+
+ run_request_thread->{ProvenanceStorageInterface,get_conflicts_func,respond,request_termination}
+
+ request_termination->{run}[label="TerminateSignal",arrowhead="none"]
+ }
+
+There is a request thread for each ``ProvenanceStorageInterface`` method
+the worker needs to handle, each thread with its own provenance storage
+object (i.e. an exclusive connection). Each of these threads will
+receive sets of parameters to be passed to their corresponding method,
+and perform explicit conflict resolution by using the method-specific
+function returned by ``get_conflicts_func``, prior to passing these
+parameters to the underlying storage.
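+
+For instance, for date-setting methods the conflict-resolution step
+collapses duplicate IDs by keeping the earliest known date, as in the
+following sketch (mirroring the ``resolve_dates`` helper added in this
+diff):
+
+.. code-block:: python
+
+   from datetime import datetime
+
+   # rows may be (sha1,) or (sha1, date); the earliest non-None date wins
+   dates = [(b"id1", datetime(2021, 9, 1)), (b"id1", datetime(2021, 8, 1)), (b"id2",)]
+   result = {}
+   for row in dates:
+       sha1, date = row[0], row[1] if len(row) > 1 else None
+       known = result.setdefault(sha1, None)
+       if date is not None and (known is None or date < known):
+           result[sha1] = date
+   # result: {b"id1": datetime(2021, 8, 1), b"id2": None}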
+
+Below is a summary of the methods present in the diagram and their
+functionality. Methods are listed in alphabetical order:
+
+- ``close_connection``: this method properly closes the connection to
+ RabbitMQ.
+- ``connect``: this method connects to RabbitMQ, returning the
+ connection handle. When the connection is established, the
+ ``on_connection_open`` callback method will be invoked by ``pika``.
+ If there is an error establishing the connection, the
+ ``on_connection_open_error`` callback method will be invoked by
+ ``pika``. If the connection closes unexpectedly, the
+ ``on_connection_closed`` callback method will be invoked by ``pika``.
+- ``on_basic_qos_ok``: this callback method is invoked by ``pika`` when
+ the ``Basic.QoS`` RPC call made in ``on_bind_ok`` has completed. At
+ this point it is safe to start consuming messages, hence the
+ ``start_consuming`` method is called, which will invoke the needed
+ RPC commands to start the process.
+- ``on_bind_ok``: this callback method is invoked by ``pika`` when the
+ ``Queue.Bind`` RPC call made in ``on_queue_declare_ok`` has
+ completed. This method sets up the consumer prefetch count by
+ invoking the ``Basic.QoS`` RPC command. When it is completed, the
+ ``on_basic_qos_ok`` method will be invoked by ``pika``.
+- ``on_cancel_ok``: this callback method is invoked by ``pika`` when
+ RabbitMQ acknowledges the cancellation of a consumer. At this point
+ the channel close is requested, thus indirectly invoking the
+ ``on_channel_closed`` callback method once the channel has been
+ closed, which will in turn close the connection.
+- ``on_channel_closed``: this callback method is invoked by ``pika``
+ when RabbitMQ unexpectedly closes the channel. Channels are usually
+ closed if there is an attempt to do something that violates the
+ protocol, such as re-declaring an exchange or queue with different
+ parameters. In this case, the connection will be closed by invoking
+ the ``close_connection`` method.
+- ``on_channel_open``: this callback method is invoked by ``pika`` when
+ the channel has been opened. The ``on_channel_closed`` callback is
+ set here. Since the channel is now open, it is safe to declare the
+ exchange to use by invoking the ``setup_exchange`` method.
+- ``on_connection_closed``: this callback method is invoked by ``pika``
+ when the connection to RabbitMQ is closed unexpectedly. Since it is
+ unexpected, an attempt to reconnect to RabbitMQ will be done.
+- ``on_connection_open``: this callback method is called by ``pika``
+ once the connection to RabbitMQ has been established. It proceeds to
+ open the channel by calling the ``open_channel`` method.
+- ``on_connection_open_error``: this callback method is called by
+ ``pika`` if the connection to RabbitMQ can’t be established. Since it
+ is unexpected, an attempt to reconnect to RabbitMQ will be done.
+- ``on_consumer_cancelled``: this callback method is invoked by
+ ``pika`` when RabbitMQ sends a ``Basic.Cancel`` for a consumer
+ receiving messages. At this point the channel close is requested,
+ thus indirectly invoking the ``on_channel_closed`` callback method
+ once the channel has been closed, which will in turn close the
+ connection.
+- ``on_exchange_declare_ok``: this callback method is invoked by
+ ``pika`` when the ``Exchange.Declare`` RPC call made in
+ ``setup_exchange`` has completed. At this point it is time to set up
+ the queues for the different request handling threads. This is done
+ by calling the ``setup_queues`` method.
+- ``on_queue_declare_ok``: this callback method is invoked by ``pika``
+ when each ``Queue.Declare`` RPC call made in ``setup_queues`` has
+ completed. Now it is time to bind the current queue and exchange
+ together with the corresponding routing key by issuing the
+ ``Queue.Bind`` RPC command. When this command is completed, the
+ ``on_bind_ok`` method will be invoked by ``pika``.
+- ``on_request``: this callback method is invoked by ``pika`` when a
+ message is delivered from RabbitMQ to any of the queues bound by the
+ worker. The decoded request together with its correlation ID and
+ reply-to property is enqueued in the corresponding internal
+ ``_request_queues`` entry (the actual queue is identified by the
+ message routing key), so that the data is forwarded to the thread
+ that handles the particular method the message is associated with. A
+ ``Basic.Ack`` RPC command is issued to acknowledge the delivery of
+ the message to RabbitMQ.
+- ``open_channel``: this method opens a new channel with RabbitMQ by
+ issuing the ``Channel.Open`` RPC command. When RabbitMQ responds that
+ the channel is open, the ``on_channel_open`` callback will be invoked
+ by ``pika``.
+- ``request_termination``: this method sends a signal to the main
+ thread of the process to cleanly release resources and terminate.
+ This is done by setting a callback in the ``IOLoop`` that raises a
+ ``TerminateSignal``, which will eventually be handled by the ``run``
+ method.
+- ``respond``: this method sends a message to RabbitMQ by issuing a
+ ``Basic.Publish`` RPC command. The body of the message is properly
+ encoded, while the correlation ID and request key are used as passed
+ by the calling method.
+- ``run``: main method of the process. It launches a thread to handle
+ communication with the ``ProvenanceStorageRabbitMQServer`` object,
+ targeting the ``run_command_thread`` method. Also, it launches one
+ thread for each ``ProvenanceStorageInterface`` method the worker
+ needs to handle, each targeting the ``run_request_thread`` method.
+ Finally, it requests to open a connection to RabbitMQ by calling the
+ ``connect`` method, and starts the internal ``IOLoop`` of the
+ returned handle (blocking operation). In case of failure, this method
+ will indefinitely try to reconnect. When an explicit
+ ``TerminateSignal`` is received, the ``stop`` method is invoked. All
+ internal threads will be signalled for termination as well, and
+ resources released.
+- ``run_command_thread``: main method of the command thread. It
+ receives external commands from the
+ ``ProvenanceStorageRabbitMQServer`` object through a
+ ``multiprocessing.Queue`` structure. The only supported command for
+ now is for the worker to be signalled to terminate, in which case the
+ ``request_termination`` method is invoked.
+- ``run_request_thread``: main method of the request threads. The
+ worker has one such thread per ``ProvenanceStorageInterface`` method
+ it needs to handle. This method will initialize its own storage
+ object and interact with the ``on_request`` method through a
+ ``queue.Queue`` structure, waiting for items to be passed to the
+ storage method associated with the thread. When elements become
+ available, it will perform a conflict resolution step by resorting to
+ the method-specific function returned by ``get_conflicts_func``.
+ After this it will forward the items to the underlying storage
+ object. If the storage method returns successfully,
+ acknowledgements are sent back to each client through the ``respond``
+ method. If the call to the storage method fails, items will be
+ enqueued again to be retried in a future iteration. In case of an
+ unexpected exception, the worker is signalled for termination by
+ calling the ``request_termination`` method.
+- ``setup_exchange``: this method sets up the exchange on RabbitMQ by
+ invoking the ``Exchange.Declare`` RPC command. When it is completed,
+ the ``on_exchange_declare_ok`` method will be invoked by ``pika``.
+- ``setup_queues``: this method sets up the necessary queues for the
+ worker on RabbitMQ by invoking several ``Queue.Declare`` RPC commands
+ (one per supported provenance storage method). When each command is
+ completed, the ``on_queue_declare_ok`` method will be invoked by
+ ``pika``.
+- ``start``: inherited from ``multiprocessing.Process``. It launches
+ the worker sub-process with method ``run`` as target.
+- ``start_consuming``: this method sets up the worker by first
+ registering the ``add_on_cancel_callback`` callback, so that the
+ object is notified if RabbitMQ cancels the consumer. It then issues
+ one ``Basic.Consume`` RPC command per supported provenance storage
+ method, each of which returns a consumer tag that is used to uniquely
+ identify the consumer with RabbitMQ. The ``on_request`` method is
+ passed in as the callback that ``pika`` will invoke when a message is
+ fully received. After setting up all consumers, a ``CONSUMING``
+ signal is sent to the ``ProvenanceStorageRabbitMQServer`` object
+ through a ``multiprocessing.Queue`` structure.
+- ``stop``: this method cleanly shuts down the connection to RabbitMQ
+ by calling the ``stop_consuming`` method. The ``IOLoop`` is started
+ again because this method is invoked by raising a ``TerminateSignal``
+ exception. This exception stops the ``IOLoop``, which needs to be
+ running for ``pika`` to communicate with RabbitMQ. All of the
+ commands issued prior to starting the ``IOLoop`` will be buffered but
+ not processed.
+- ``stop_consuming``: this method sends a ``Basic.Cancel`` RPC command
+ for each supported provenance storage method. When RabbitMQ confirms
+ the cancellation, the ``on_cancel_ok`` callback method will be
+ invoked by ``pika``, which will then close the channel and
+ connection.
+- ``get_conflicts_func``: this method returns the conflict resolution
+ function to be used based on the provenance storage method.
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -17,6 +17,9 @@
[mypy-msgpack.*]
ignore_missing_imports = True
+[mypy-pika.*]
+ignore_missing_imports = True
+
[mypy-pkg_resources.*]
ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,6 +5,7 @@
iso8601
methodtools
mongomock
+pika
pymongo
PyYAML
types-click
diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -92,4 +92,12 @@
engine = kwargs.get("engine", "pymongo")
return ProvenanceStorageMongoDb(engine=engine, **kwargs["db"])
+ elif cls == "rabbitmq":
+ from .api.client import ProvenanceStorageRabbitMQClient
+
+ rmq_storage = ProvenanceStorageRabbitMQClient(**kwargs)
+ if TYPE_CHECKING:
+ assert isinstance(rmq_storage, ProvenanceStorageInterface)
+ return rmq_storage
+
raise ValueError
diff --git a/swh/provenance/api/client.py b/swh/provenance/api/client.py
--- a/swh/provenance/api/client.py
+++ b/swh/provenance/api/client.py
@@ -2,3 +2,469 @@
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+
+from __future__ import annotations
+
+import functools
+import inspect
+import logging
+import queue
+import threading
+import time
+from types import TracebackType
+from typing import Any, Dict, Iterable, Optional, Set, Tuple, Type, Union
+import uuid
+
+import pika
+import pika.channel
+import pika.connection
+import pika.frame
+import pika.spec
+
+from swh.core.api.serializers import encode_data_client as encode_data
+from swh.core.api.serializers import msgpack_loads as decode_data
+from swh.core.statsd import statsd
+
+from .. import get_provenance_storage
+from ..interface import ProvenanceStorageInterface, RelationData, RelationType
+from .serializers import DECODERS, ENCODERS
+from .server import ProvenanceStorageRabbitMQServer
+
+LOG_FORMAT = (
+ "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) "
+ "-35s %(lineno) -5d: %(message)s"
+)
+LOGGER = logging.getLogger(__name__)
+
+STORAGE_DURATION_METRIC = "swh_provenance_storage_rabbitmq_duration_seconds"
+
+
+class ResponseTimeout(Exception):
+ pass
+
+
+class TerminateSignal(Exception):
+ pass
+
+
+def split_ranges(
+ data: Iterable[bytes], meth_name: str, relation: Optional[RelationType] = None
+) -> Dict[str, Set[Tuple[Any, ...]]]:
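+ """Split the input data by ID range: return a mapping from routing key
+ to the set of argument tuples to be sent to that range's queue."""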
+ ranges: Dict[str, Set[Tuple[Any, ...]]] = {}
+ if relation is not None:
+ assert isinstance(data, dict), "Relation data must be provided in a dictionary"
+ for src, dsts in data.items():
+ key = ProvenanceStorageRabbitMQServer.get_routing_key(
+ src, meth_name, relation
+ )
+ for rel in dsts:
+ assert isinstance(
+ rel, RelationData
+ ), "Values in the dictionary must be RelationData structures"
+ ranges.setdefault(key, set()).add((src, rel.dst, rel.path))
+ else:
+ items: Union[Set[Tuple[bytes, Any]], Set[Tuple[bytes]]]
+ if isinstance(data, dict):
+ items = set(data.items())
+ else:
+ items = {(item,) for item in data}
+ for id, *rest in items:
+ key = ProvenanceStorageRabbitMQServer.get_routing_key(id, meth_name)
+ ranges.setdefault(key, set()).add((id, *rest))
+ return ranges
+
+
+class MetaRabbitMQClient(type):
+ def __new__(cls, name, bases, attributes):
+ # For each method wrapped with @remote_api_endpoint in the API backend
+ # (here :class:`ProvenanceStorageInterface`), add a new method to the
+ # client class, with the same documentation.
+ #
+ # Note that, despite the usage of decorator magic (e.g. functools.wrap),
+ # this never actually calls the backend method directly.
+ backend_class = attributes.get("backend_class", None)
+ for base in bases:
+ if backend_class is not None:
+ break
+ backend_class = getattr(base, "backend_class", None)
+ if backend_class:
+ for meth_name, meth in backend_class.__dict__.items():
+ if hasattr(meth, "_endpoint_path"):
+ cls.__add_endpoint(meth_name, meth, attributes)
+ return super().__new__(cls, name, bases, attributes)
+
+ @staticmethod
+ def __add_endpoint(meth_name, meth, attributes):
+ wrapped_meth = inspect.unwrap(meth)
+
+ @functools.wraps(meth) # Copy signature and doc
+ def meth_(*args, **kwargs):
+ with statsd.timed(
+ metric=STORAGE_DURATION_METRIC, tags={"method": meth_name}
+ ):
+ # Match arguments and parameters
+ data = inspect.getcallargs(wrapped_meth, *args, **kwargs)
+
+ # Remove arguments that should not be passed
+ self = data.pop("self")
+
+ # Call storage method with remaining arguments
+ return getattr(self._storage, meth_name)(**data)
+
+ @functools.wraps(meth) # Copy signature and doc
+ def write_meth_(*args, **kwargs):
+ with statsd.timed(
+ metric=STORAGE_DURATION_METRIC, tags={"method": meth_name}
+ ):
+ # Match arguments and parameters
+ post_data = inspect.getcallargs(wrapped_meth, *args, **kwargs)
+
+ try:
+ # Remove arguments that should not be passed
+ self = post_data.pop("self")
+ relation = post_data.pop("relation", None)
+ assert len(post_data) == 1
+ data = next(iter(post_data.values()))
+
+ ranges = split_ranges(data, meth_name, relation)
+ acks_expected = sum(len(items) for items in ranges.values())
+ self._correlation_id = str(uuid.uuid4())
+
+ exchange = ProvenanceStorageRabbitMQServer.get_exchange(
+ meth_name, relation
+ )
+ for routing_key, items in ranges.items():
+ items_list = list(items)
+ batches = (
+ items_list[idx : idx + self._batch_size]
+ for idx in range(0, len(items_list), self._batch_size)
+ )
+ for batch in batches:
+ # FIXME: this is running in a different thread! Hence, if
+ # self._connection drops, there is no guarantee that the
+ # request can be sent for the current elements. This
+ # situation should be handled properly.
+ self._connection.ioloop.add_callback_threadsafe(
+ functools.partial(
+ ProvenanceStorageRabbitMQClient.request,
+ channel=self._channel,
+ reply_to=self._callback_queue,
+ exchange=exchange,
+ routing_key=routing_key,
+ correlation_id=self._correlation_id,
+ data=batch,
+ )
+ )
+ return self.wait_for_acks(acks_expected)
+ except BaseException as ex:
+ self.request_termination(str(ex))
+ return False
+
+ if meth_name not in attributes:
+ attributes[meth_name] = (
+ write_meth_
+ if ProvenanceStorageRabbitMQServer.is_write_method(meth_name)
+ else meth_
+ )
+
+
+class ProvenanceStorageRabbitMQClient(threading.Thread, metaclass=MetaRabbitMQClient):
+ backend_class = ProvenanceStorageInterface
+ extra_type_decoders = DECODERS
+ extra_type_encoders = ENCODERS
+
+ def __init__(
+ self,
+ url: str,
+ storage_config: Dict[str, Any],
+ batch_size: int = 100,
+ prefetch_count: int = 100,
+ wait_min: float = 60,
+ wait_per_batch: float = 10,
+ ) -> None:
+ """Setup the client object, passing in the URL we will use to connect to
+ RabbitMQ, and the connection information for the local storage object used
+ for read-only operations.
+
+ :param str url: The URL for connecting to RabbitMQ
+ :param dict storage_config: Configuration parameters for the underlying
+ ``ProvenanceStorage`` object expected by
+ ``swh.provenance.get_provenance_storage``
+ :param int batch_size: Max amount of elements per package (after range
+ splitting) for writing operations
+ :param int prefetch_count: Prefetch value for the RabbitMQ connection when
+ receiving ack packages
+ :param float wait_min: Min waiting time for response on a writing operation, in
+ seconds
+ :param float wait_per_batch: Waiting time for response per batch of items on a
+ writing operation, in seconds
+
+ """
+ super().__init__()
+
+ self._connection = None
+ self._callback_queue: Optional[str] = None
+ self._channel = None
+ self._closing = False
+ self._consumer_tag = None
+ self._consuming = False
+ self._correlation_id = str(uuid.uuid4())
+ self._prefetch_count = prefetch_count
+
+ self._batch_size = batch_size
+ self._response_queue: queue.Queue = queue.Queue()
+ self._storage = get_provenance_storage(**storage_config)
+ self._url = url
+
+ self._wait_min = wait_min
+ self._wait_per_batch = wait_per_batch
+
+ def __enter__(self) -> ProvenanceStorageInterface:
+ self.open()
+ assert isinstance(self, ProvenanceStorageInterface)
+ return self
+
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> None:
+ self.close()
+
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "open"})
+ def open(self) -> None:
+ self.start()
+ while self._callback_queue is None:
+ time.sleep(0.1)
+ self._storage.open()
+
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"})
+ def close(self) -> None:
+ assert self._connection is not None
+ self._connection.ioloop.add_callback_threadsafe(self.request_termination)
+ self.join()
+ self._storage.close()
+
+ def request_termination(self, reason: str = "Normal shutdown") -> None:
+ assert self._connection is not None
+
+ def termination_callback():
+ raise TerminateSignal(reason)
+
+ self._connection.ioloop.add_callback_threadsafe(termination_callback)
+
+ def connect(self) -> pika.SelectConnection:
+ LOGGER.info("Connecting to %s", self._url)
+ return pika.SelectConnection(
+ parameters=pika.URLParameters(self._url),
+ on_open_callback=self.on_connection_open,
+ on_open_error_callback=self.on_connection_open_error,
+ on_close_callback=self.on_connection_closed,
+ )
+
+ def close_connection(self) -> None:
+ assert self._connection is not None
+ self._consuming = False
+ if self._connection.is_closing or self._connection.is_closed:
+ LOGGER.info("Connection is closing or already closed")
+ else:
+ LOGGER.info("Closing connection")
+ self._connection.close()
+
+ def on_connection_open(self, _unused_connection: pika.SelectConnection) -> None:
+ LOGGER.info("Connection opened")
+ self.open_channel()
+
+ def on_connection_open_error(
+ self, _unused_connection: pika.SelectConnection, err: Exception
+ ) -> None:
+ LOGGER.error("Connection open failed, reopening in 5 seconds: %s", err)
+ assert self._connection is not None
+ self._connection.ioloop.call_later(5, self._connection.ioloop.stop)
+
+ def on_connection_closed(self, _unused_connection: pika.SelectConnection, reason):
+ assert self._connection is not None
+ self._channel = None
+ if self._closing:
+ self._connection.ioloop.stop()
+ else:
+ LOGGER.warning("Connection closed, reopening in 5 seconds: %s", reason)
+ self._connection.ioloop.call_later(5, self._connection.ioloop.stop)
+
+ def open_channel(self) -> None:
+ LOGGER.info("Creating a new channel")
+ assert self._connection is not None
+ self._connection.channel(on_open_callback=self.on_channel_open)
+
+ def on_channel_open(self, channel: pika.channel.Channel) -> None:
+ LOGGER.info("Channel opened")
+ self._channel = channel
+ LOGGER.info("Adding channel close callback")
+ assert self._channel is not None
+ self._channel.add_on_close_callback(callback=self.on_channel_closed)
+ self.setup_queue()
+
+ def on_channel_closed(
+ self, channel: pika.channel.Channel, reason: Exception
+ ) -> None:
+ LOGGER.warning("Channel %i was closed: %s", channel, reason)
+ self.close_connection()
+
+ def setup_queue(self) -> None:
+ LOGGER.info("Declaring callback queue")
+ assert self._channel is not None
+ self._channel.queue_declare(
+ queue="", exclusive=True, callback=self.on_queue_declare_ok
+ )
+
+ def on_queue_declare_ok(self, frame: pika.frame.Method) -> None:
+ LOGGER.info("Binding queue to default exchanger")
+ assert self._channel is not None
+ self._callback_queue = frame.method.queue
+ self._channel.basic_qos(
+ prefetch_count=self._prefetch_count, callback=self.on_basic_qos_ok
+ )
+
+ def on_basic_qos_ok(self, _unused_frame: pika.frame.Method) -> None:
+ LOGGER.info("QOS set to: %d", self._prefetch_count)
+ self.start_consuming()
+
+ def start_consuming(self) -> None:
+ LOGGER.info("Issuing consumer related RPC commands")
+ LOGGER.info("Adding consumer cancellation callback")
+ assert self._channel is not None
+ self._channel.add_on_cancel_callback(callback=self.on_consumer_cancelled)
+ assert self._callback_queue is not None
+ self._consumer_tag = self._channel.basic_consume(
+ queue=self._callback_queue, on_message_callback=self.on_response
+ )
+ self._consuming = True
+
+ def on_consumer_cancelled(self, method_frame: pika.frame.Method) -> None:
+ LOGGER.info("Consumer was cancelled remotely, shutting down: %r", method_frame)
+ if self._channel:
+ self._channel.close()
+
+ def on_response(
+ self,
+ channel: pika.channel.Channel,
+ deliver: pika.spec.Basic.Deliver,
+ properties: pika.spec.BasicProperties,
+ body: bytes,
+ ) -> None:
+ LOGGER.info(
+ "Received message # %s from %s: %s",
+ deliver.delivery_tag,
+ properties.app_id,
+ body,
+ )
+ self._response_queue.put(
+ (
+ properties.correlation_id,
+ decode_data(body, extra_decoders=self.extra_type_decoders),
+ )
+ )
+ LOGGER.info("Acknowledging message %s", deliver.delivery_tag)
+ channel.basic_ack(delivery_tag=deliver.delivery_tag)
+
+ def stop_consuming(self) -> None:
+ if self._channel:
+ LOGGER.info("Sending a Basic.Cancel RPC command to RabbitMQ")
+ self._channel.basic_cancel(self._consumer_tag, self.on_cancel_ok)
+
+ def on_cancel_ok(self, _unused_frame: pika.frame.Method) -> None:
+ self._consuming = False
+ LOGGER.info(
+ "RabbitMQ acknowledged the cancellation of the consumer: %s",
+ self._consumer_tag,
+ )
+ LOGGER.info("Closing the channel")
+ assert self._channel is not None
+ self._channel.close()
+
+ def run(self) -> None:
+ while not self._closing:
+ try:
+ self._connection = self.connect()
+ assert self._connection is not None
+ self._connection.ioloop.start()
+ except KeyboardInterrupt:
+ LOGGER.info("Connection closed by keyboard interruption, reopening")
+ if self._connection is not None:
+ self._connection.ioloop.stop()
+ except TerminateSignal as ex:
+ LOGGER.info("Termination requested: %s", ex)
+ self.stop()
+ if self._connection is not None and not self._connection.is_closed:
+ # Finish closing
+ self._connection.ioloop.start()
+ except BaseException as ex:
+ LOGGER.warning("Unexpected exception, terminating: %s", ex)
+ self.stop()
+ if self._connection is not None and not self._connection.is_closed:
+ # Finish closing
+ self._connection.ioloop.start()
+ LOGGER.info("Stopped")
+
+ def stop(self) -> None:
+ assert self._connection is not None
+ if not self._closing:
+ self._closing = True
+ LOGGER.info("Stopping")
+ if self._consuming:
+ self.stop_consuming()
+ self._connection.ioloop.start()
+ else:
+ self._connection.ioloop.stop()
+ LOGGER.info("Stopped")
+
+ @staticmethod
+ def request(
+ channel: pika.channel.Channel,
+ reply_to: str,
+ exchange: str,
+ routing_key: str,
+ correlation_id: str,
+ **kwargs,
+ ) -> None:
+ channel.basic_publish(
+ exchange=exchange,
+ routing_key=routing_key,
+ properties=pika.BasicProperties(
+ content_type="application/msgpack",
+ correlation_id=correlation_id,
+ reply_to=reply_to,
+ ),
+ body=encode_data(
+ kwargs,
+ extra_encoders=ProvenanceStorageRabbitMQClient.extra_type_encoders,
+ ),
+ )
+
+ def wait_for_acks(self, acks_expected: int) -> bool:
+ acks_received = 0
+ timeout = max(
+ (acks_expected / self._batch_size) * self._wait_per_batch,
+ self._wait_min,
+ )
+ while acks_received < acks_expected:
+ try:
+ acks_received += self.wait_for_response(timeout=timeout)
+ except ResponseTimeout:
+ LOGGER.warning(
+ "Timed out waiting for acks, %s received, %s expected",
+ acks_received,
+ acks_expected,
+ )
+ return False
+ return acks_received == acks_expected
+
+ def wait_for_response(self, timeout: float = 60.0) -> Any:
+ while True:
+ try:
+ correlation_id, response = self._response_queue.get(timeout=timeout)
+ if correlation_id == self._correlation_id:
+ return response
+ except queue.Empty:
+ raise ResponseTimeout
diff --git a/swh/provenance/api/server.py b/swh/provenance/api/server.py
--- a/swh/provenance/api/server.py
+++ b/swh/provenance/api/server.py
@@ -3,10 +3,660 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from collections import Counter
+from datetime import datetime
+from enum import Enum
+import functools
+import logging
+import multiprocessing
import os
-from typing import Any, Dict, Optional
+import queue
+import threading
+from typing import Any, Callable
+from typing import Counter as TCounter
+from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple, Union, cast
+
+import pika
+import pika.channel
+import pika.connection
+import pika.exceptions
+from pika.exchange_type import ExchangeType
+import pika.frame
+import pika.spec
from swh.core import config
+from swh.core.api.serializers import encode_data_client as encode_data
+from swh.core.api.serializers import msgpack_loads as decode_data
+from swh.model.hashutil import hash_to_hex
+from swh.model.model import Sha1Git
+
+from .. import get_provenance_storage
+from ..interface import EntityType, RelationData, RelationType, RevisionData
+from ..util import path_id
+from .serializers import DECODERS, ENCODERS
+
+LOG_FORMAT = (
+ "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) "
+ "-35s %(lineno) -5d: %(message)s"
+)
+LOGGER = logging.getLogger(__name__)
+
+TERMINATE = object()
+
+
+class ServerCommand(Enum):
+ TERMINATE = "terminate"
+ CONSUMING = "consuming"
+
+
+class TerminateSignal(BaseException):
+ pass
+
+
+def resolve_dates(
+ dates: Iterable[Union[Tuple[Sha1Git, Optional[datetime]], Tuple[Sha1Git]]]
+) -> Dict[Sha1Git, Optional[datetime]]:
+ result: Dict[Sha1Git, Optional[datetime]] = {}
+ for row in dates:
+ sha1 = row[0]
+ date = (
+ cast(Tuple[Sha1Git, Optional[datetime]], row)[1] if len(row) > 1 else None
+ )
+ known = result.setdefault(sha1, None)
+ if date is not None and (known is None or date < known):
+ result[sha1] = date
+ return result
+
+
+def resolve_revision(
+ data: Iterable[Union[Tuple[Sha1Git, RevisionData], Tuple[Sha1Git]]]
+) -> Dict[Sha1Git, RevisionData]:
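+ """Merge revision rows per sha1: keep the earliest known date and the
+ last non-None origin seen for each revision."""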
+ result: Dict[Sha1Git, RevisionData] = {}
+ for row in data:
+ sha1 = row[0]
+ rev = (
+ cast(Tuple[Sha1Git, RevisionData], row)[1]
+ if len(row) > 1
+ else RevisionData(date=None, origin=None)
+ )
+ known = result.setdefault(sha1, RevisionData(date=None, origin=None))
+ value = known
+ if rev.date is not None and (known.date is None or rev.date < known.date):
+ value = RevisionData(date=rev.date, origin=value.origin)
+ if rev.origin is not None:
+ value = RevisionData(date=value.date, origin=rev.origin)
+ if value != known:
+ result[sha1] = value
+ return result
+
+
+def resolve_relation(
+ data: Iterable[Tuple[Sha1Git, Sha1Git, bytes]]
+) -> Dict[Sha1Git, Set[RelationData]]:
+ result: Dict[Sha1Git, Set[RelationData]] = {}
+ for src, dst, path in data:
+ result.setdefault(src, set()).add(RelationData(dst=dst, path=path))
+ return result
+
+
+class ProvenanceStorageRabbitMQWorker(multiprocessing.Process):
+ EXCHANGE_TYPE = ExchangeType.direct
+ extra_type_decoders = DECODERS
+ extra_type_encoders = ENCODERS
+
+ def __init__(
+ self,
+ url: str,
+ exchange: str,
+ range: int,
+ storage_config: Dict[str, Any],
+ batch_size: int = 100,
+ prefetch_count: int = 100,
+ ) -> None:
+ """Setup the worker object, passing in the URL we will use to connect to
+ RabbitMQ, the exchange to use, the range id on which to operate, and the
+ connection information for the underlying local storage object.
+
+ :param str url: The URL for connecting to RabbitMQ
+ :param str exchange: The name of the RabbitMQ exchange to use
+ :param int range: The ID range to operate on
+ :param dict storage_config: Configuration parameters for the underlying
+ ``ProvenanceStorage`` object expected by
+ ``swh.provenance.get_provenance_storage``
+ :param int batch_size: Max amount of elements per call to the underlying
+ storage
+ :param int prefetch_count: Prefetch value for the RabbitMQ connection when
+ receiving messages
+
+ """
+ super().__init__(name=f"{exchange}_{range:x}")
+
+ self._connection = None
+ self._channel = None
+ self._closing = False
+ self._consumer_tag: Dict[str, str] = {}
+ self._consuming: Dict[str, bool] = {}
+ self._prefetch_count = prefetch_count
+
+ self._url = url
+ self._exchange = exchange
+ self._binding_keys = list(
+ ProvenanceStorageRabbitMQServer.get_binding_keys(self._exchange, range)
+ )
+ self._queues: Dict[str, str] = {}
+ self._storage_config = storage_config
+ self._batch_size = batch_size
+
+ self.command: multiprocessing.Queue = multiprocessing.Queue()
+ self.signal: multiprocessing.Queue = multiprocessing.Queue()
+
+ def connect(self) -> pika.SelectConnection:
+ LOGGER.info("Connecting to %s", self._url)
+ return pika.SelectConnection(
+ parameters=pika.URLParameters(self._url),
+ on_open_callback=self.on_connection_open,
+ on_open_error_callback=self.on_connection_open_error,
+ on_close_callback=self.on_connection_closed,
+ )
+
+ def close_connection(self) -> None:
+ assert self._connection is not None
+ self._consuming = {binding_key: False for binding_key in self._binding_keys}
+ if self._connection.is_closing or self._connection.is_closed:
+ LOGGER.info("Connection is closing or already closed")
+ else:
+ LOGGER.info("Closing connection")
+ self._connection.close()
+
+ def on_connection_open(self, _unused_connection: pika.SelectConnection) -> None:
+ LOGGER.info("Connection opened")
+ self.open_channel()
+
+ def on_connection_open_error(
+ self, _unused_connection: pika.SelectConnection, err: Exception
+ ) -> None:
+ LOGGER.error("Connection open failed, reopening in 5 seconds: %s", err)
+ assert self._connection is not None
+ self._connection.ioloop.call_later(5, self._connection.ioloop.stop)
+
+ def on_connection_closed(self, _unused_connection: pika.SelectConnection, reason):
+ assert self._connection is not None
+ self._channel = None
+ if self._closing:
+ self._connection.ioloop.stop()
+ else:
+ LOGGER.warning("Connection closed, reopening in 5 seconds: %s", reason)
+ self._connection.ioloop.call_later(5, self._connection.ioloop.stop)
+
+ def open_channel(self) -> None:
+ LOGGER.info("Creating a new channel")
+ assert self._connection is not None
+ self._connection.channel(on_open_callback=self.on_channel_open)
+
+ def on_channel_open(self, channel: pika.channel.Channel) -> None:
+ LOGGER.info("Channel opened")
+ self._channel = channel
+ LOGGER.info("Adding channel close callback")
+ assert self._channel is not None
+ self._channel.add_on_close_callback(callback=self.on_channel_closed)
+ self.setup_exchange()
+
+ def on_channel_closed(
+ self, channel: pika.channel.Channel, reason: Exception
+ ) -> None:
+ LOGGER.warning("Channel %i was closed: %s", channel, reason)
+ self.close_connection()
+
+ def setup_exchange(self) -> None:
+ LOGGER.info("Declaring exchange %s", self._exchange)
+ assert self._channel is not None
+ self._channel.exchange_declare(
+ exchange=self._exchange,
+ exchange_type=self.EXCHANGE_TYPE,
+ callback=self.on_exchange_declare_ok,
+ )
+
+ def on_exchange_declare_ok(self, _unused_frame: pika.frame.Method) -> None:
+ LOGGER.info("Exchange declared: %s", self._exchange)
+ self.setup_queues()
+
+ def setup_queues(self) -> None:
+ for binding_key in self._binding_keys:
+ LOGGER.info("Declaring queue %s", binding_key)
+ assert self._channel is not None
+ callback = functools.partial(
+ self.on_queue_declare_ok,
+ binding_key=binding_key,
+ )
+ self._channel.queue_declare(queue=binding_key, callback=callback)
+
+ def on_queue_declare_ok(self, frame: pika.frame.Method, binding_key: str) -> None:
+ LOGGER.info(
+ "Binding queue %s to exchange %s with routing key %s",
+ frame.method.queue,
+ self._exchange,
+ binding_key,
+ )
+ assert self._channel is not None
+ callback = functools.partial(self.on_bind_ok, queue_name=frame.method.queue)
+ self._queues[binding_key] = frame.method.queue
+ self._channel.queue_bind(
+ queue=frame.method.queue,
+ exchange=self._exchange,
+ routing_key=binding_key,
+ callback=callback,
+ )
+
+ def on_bind_ok(self, _unused_frame: pika.frame.Method, queue_name: str) -> None:
+ LOGGER.info("Queue bound: %s", queue_name)
+ assert self._channel is not None
+ self._channel.basic_qos(
+ prefetch_count=self._prefetch_count, callback=self.on_basic_qos_ok
+ )
+
+ def on_basic_qos_ok(self, _unused_frame: pika.frame.Method) -> None:
+ LOGGER.info("QOS set to: %d", self._prefetch_count)
+ self.start_consuming()
+
+ def start_consuming(self) -> None:
+ LOGGER.info("Issuing consumer related RPC commands")
+ LOGGER.info("Adding consumer cancellation callback")
+ assert self._channel is not None
+ self._channel.add_on_cancel_callback(callback=self.on_consumer_cancelled)
+ for binding_key in self._binding_keys:
+ self._consumer_tag[binding_key] = self._channel.basic_consume(
+ queue=self._queues[binding_key], on_message_callback=self.on_request
+ )
+ self._consuming[binding_key] = True
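+ # Notify the parent server that every queue is now being consumed.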
+ self.signal.put(ServerCommand.CONSUMING)
+
+ def on_consumer_cancelled(self, method_frame: pika.frame.Method) -> None:
+ LOGGER.info("Consumer was cancelled remotely, shutting down: %r", method_frame)
+ if self._channel:
+ self._channel.close()
+
+ def on_request(
+ self,
+ channel: pika.channel.Channel,
+ deliver: pika.spec.Basic.Deliver,
+ properties: pika.spec.BasicProperties,
+ body: bytes,
+ ) -> None:
+ LOGGER.info(
+ "Received message # %s from %s: %s",
+ deliver.delivery_tag,
+ properties.app_id,
+ body,
+ )
+ # NOTE: msgpack serializes tuples as arrays, so the decoded batch contains
+ # lists instead of the tuples the client sent; tuple(item) below restores them
+ batch = decode_data(data=body, extra_decoders=self.extra_type_decoders)["data"]
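+ # Dispatch each element to the per-binding-key worker queue together with
+ # the client's reply metadata, so the response can be routed back.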
+ for item in batch:
+ self._request_queues[deliver.routing_key].put(
+ (tuple(item), (properties.correlation_id, properties.reply_to))
+ )
+ LOGGER.info("Acknowledging message %s", deliver.delivery_tag)
+ channel.basic_ack(delivery_tag=deliver.delivery_tag)
+
+ def stop_consuming(self) -> None:
+ if self._channel:
+ LOGGER.info("Sending a Basic.Cancel RPC command to RabbitMQ")
+ for binding_key in self._binding_keys:
+ callback = functools.partial(self.on_cancel_ok, binding_key=binding_key)
+ self._channel.basic_cancel(
+ self._consumer_tag[binding_key], callback=callback
+ )
+
+ def on_cancel_ok(self, _unused_frame: pika.frame.Method, binding_key: str) -> None:
+ self._consuming[binding_key] = False
+ LOGGER.info(
+ "RabbitMQ acknowledged the cancellation of the consumer: %s", binding_key
+ )
+ # only close the channel once every consumer's cancellation has been
+ # acknowledged, otherwise the pending Basic.Cancel RPCs would be aborted
+ if not any(self._consuming.values()):
+ LOGGER.info("Closing the channel")
+ assert self._channel is not None
+ self._channel.close()
+
+ def run(self) -> None:
+ self._command_thread = threading.Thread(target=self.run_command_thread)
+ self._command_thread.start()
+
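+ # One request thread per binding key: it drains the matching queue and
+ # performs the actual writes against the local storage.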
+ self._request_queues: Dict[str, queue.Queue] = {}
+ self._request_threads: Dict[str, threading.Thread] = {}
+ for binding_key in self._binding_keys:
+ meth_name, relation = ProvenanceStorageRabbitMQServer.get_meth_name(
+ binding_key
+ )
+ self._request_queues[binding_key] = queue.Queue()
+ self._request_threads[binding_key] = threading.Thread(
+ target=self.run_request_thread,
+ args=(binding_key, meth_name, relation),
+ )
+ self._request_threads[binding_key].start()
+
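+ # Main connection loop: keep reconnecting until stop() sets self._closing
+ # (via a TerminateSignal raised in the ioloop).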
+ while not self._closing:
+ try:
+ self._connection = self.connect()
+ assert self._connection is not None
+ self._connection.ioloop.start()
+ except KeyboardInterrupt:
+ LOGGER.info("Connection closed by keyboard interruption, reopening")
+ if self._connection is not None:
+ self._connection.ioloop.stop()
+ except TerminateSignal as ex:
+ LOGGER.info("Termination requested: %s", ex)
+ self.stop()
+ if self._connection is not None and not self._connection.is_closed:
+ # Finish closing
+ self._connection.ioloop.start()
+ except BaseException as ex:
+ LOGGER.warning("Unexpected exception, terminating: %s", ex)
+ self.stop()
+ if self._connection is not None and not self._connection.is_closed:
+ # Finish closing
+ self._connection.ioloop.start()
+
+ for binding_key in self._binding_keys:
+ self._request_queues[binding_key].put(TERMINATE)
+ for binding_key in self._binding_keys:
+ self._request_threads[binding_key].join()
+ self._command_thread.join()
+ LOGGER.info("Stopped")
+
+ def run_command_thread(self) -> None:
+ while True:
+ try:
+ command = self.command.get()
+ if command == ServerCommand.TERMINATE:
+ self.request_termination()
+ break
+ except queue.Empty:
+ pass
+ except BaseException as ex:
+ self.request_termination(str(ex))
+ break
+
+ def request_termination(self, reason: str = "Normal shutdown") -> None:
+ assert self._connection is not None
+
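+ # Raise inside the ioloop thread so that ioloop.start() in run() unwinds
+ # and performs the actual shutdown.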
+ def termination_callback():
+ raise TerminateSignal(reason)
+
+ self._connection.ioloop.add_callback_threadsafe(termination_callback)
+
+ def run_request_thread(
+ self, binding_key: str, meth_name: str, relation: Optional[RelationType]
+ ) -> None:
+ with get_provenance_storage(**self._storage_config) as storage:
+ request_queue = self._request_queues[binding_key]
+ merge_items = ProvenanceStorageRabbitMQWorker.get_conflicts_func(meth_name)
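+ # Batch loop: gather up to batch_size elements (or until the queue goes
+ # momentarily empty), merge conflicting entries, then write them to the
+ # local storage in a single call.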
+ while True:
+ terminate = False
+ elements = []
+ while True:
+ try:
+ # TODO: consider reducing this timeout or removing it
+ elem = request_queue.get(timeout=0.1)
+ if elem is TERMINATE:
+ terminate = True
+ break
+ elements.append(elem)
+ except queue.Empty:
+ break
+
+ if len(elements) >= self._batch_size:
+ break
+
+ if terminate:
+ break
+
+ if not elements:
+ continue
+
+ try:
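+ # Group replies per client: each (correlation_id, reply_to) pair gets
+ # a single response carrying the number of elements it contributed.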
+ items, props = zip(*elements)
+ acks_count: TCounter[Tuple[str, str]] = Counter(props)
+ data = merge_items(items)
+
+ args = (relation, data) if relation is not None else (data,)
+ if getattr(storage, meth_name)(*args):
+ for (correlation_id, reply_to), count in acks_count.items():
+ # FIXME: this is running in a different thread! Hence, if
+ # self._connection drops, there is no guarantee that the
+ # response can be sent for the current elements. This
+ # situation should be handled properly.
+ assert self._connection is not None
+ self._connection.ioloop.add_callback_threadsafe(
+ functools.partial(
+ ProvenanceStorageRabbitMQWorker.respond,
+ channel=self._channel,
+ correlation_id=correlation_id,
+ reply_to=reply_to,
+ response=count,
+ )
+ )
+ else:
+ LOGGER.warning(
+ "Unable to process elements for queue %s", binding_key
+ )
+ for elem in elements:
+ request_queue.put(elem)
+ except BaseException as ex:
+ self.request_termination(str(ex))
+ break
+
+ def stop(self) -> None:
+ assert self._connection is not None
+ if not self._closing:
+ self._closing = True
+ LOGGER.info("Stopping")
+ # check the values, not the keys: the dict always has one entry per
+ # binding key, but the flags flip to False once consuming stops
+ if any(self._consuming.values()):
+ self.stop_consuming()
+ self._connection.ioloop.start()
+ else:
+ self._connection.ioloop.stop()
+ LOGGER.info("Stopped")
+
+ @staticmethod
+ def get_conflicts_func(meth_name: str) -> Callable[[Iterable[Any]], Any]:
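+ # Choose how duplicate/conflicting items within a batch are merged before
+ # being written to the underlying storage.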
+ if meth_name in ["content_add", "directory_add"]:
+ return resolve_dates
+ elif meth_name == "location_add":
+ return lambda data: set(data) # just remove duplicates
+ elif meth_name == "origin_add":
+ return lambda data: dict(data) # last processed value is good enough
+ elif meth_name == "revision_add":
+ return resolve_revision
+ elif meth_name == "relation_add":
+ return resolve_relation
+ else:
+ LOGGER.warning(
+ "Unexpected conflict resolution function request for method %s",
+ meth_name,
+ )
+ return lambda x: x
+
+ @staticmethod
+ def respond(
+ channel: pika.channel.Channel,
+ correlation_id: str,
+ reply_to: str,
+ response: Any,
+ ):
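+ # RPC-style reply: publish on the default exchange directly to the
+ # client's callback queue, tagged with its correlation id.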
+ channel.basic_publish(
+ exchange="",
+ routing_key=reply_to,
+ properties=pika.BasicProperties(
+ content_type="application/msgpack",
+ correlation_id=correlation_id,
+ ),
+ body=encode_data(
+ response,
+ extra_encoders=ProvenanceStorageRabbitMQServer.extra_type_encoders,
+ ),
+ )
+
+
+class ProvenanceStorageRabbitMQServer:
+ extra_type_decoders = DECODERS
+ extra_type_encoders = ENCODERS
+
+ queue_count = 16
+
+ def __init__(
+ self,
+ url: str,
+ storage_config: Dict[str, Any],
+ batch_size: int = 100,
+ prefetch_count: int = 100,
+ ) -> None:
+ """Setup the server object, passing in the URL we will use to connect to
+ RabbitMQ, and the connection information for the underlying local storage
+ object.
+
+ :param str url: The URL for connecting to RabbitMQ
+ :param dict storage_config: Configuration parameters for the underlying
+ ``ProvenanceStorage`` object expected by
+ ``swh.provenance.get_provenance_storage``
+ :param int batch_size: Max amount of elements call to the underlying storage
+ :param int prefetch_count: Prefetch value for the RabbitMQ connection when
+ receiving messaged
+
+ """
+ self._workers: List[ProvenanceStorageRabbitMQWorker] = []
+ for exchange in ProvenanceStorageRabbitMQServer.get_exchanges():
+ for range in ProvenanceStorageRabbitMQServer.get_ranges(exchange):
+ worker = ProvenanceStorageRabbitMQWorker(
+ url=url,
+ exchange=exchange,
+ range=range,
+ storage_config=storage_config,
+ batch_size=batch_size,
+ prefetch_count=prefetch_count,
+ )
+ self._workers.append(worker)
+ self._running = False
+
+ def start(self) -> None:
+ if not self._running:
+ self._running = True
+ for worker in self._workers:
+ worker.start()
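+ # Wait (up to 60 seconds each) for every worker to report that all its
+ # consumers are up before declaring the server started.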
+ for worker in self._workers:
+ try:
+ signal = worker.signal.get(timeout=60)
+ assert signal == ServerCommand.CONSUMING
+ except queue.Empty:
+ LOGGER.error(
+ "Could not initialize worker %s. Leaving...", worker.name
+ )
+ self.stop()
+ return
+ LOGGER.info("Start serving")
+
+ def stop(self) -> None:
+ if self._running:
+ for worker in self._workers:
+ worker.command.put(ServerCommand.TERMINATE)
+ for worker in self._workers:
+ worker.join()
+ LOGGER.info("Stop serving")
+ self._running = False
+
+ @staticmethod
+ def get_binding_keys(exchange: str, range: int) -> Generator[str, None, None]:
+ for meth_name, relation in ProvenanceStorageRabbitMQServer.get_meth_names(
+ exchange
+ ):
+ if relation is None:
+ assert (
+ meth_name != "relation_add"
+ ), "'relation_add' requires 'relation' to be provided"
+ yield f"{meth_name}.unknown.{range:x}".lower()
+ else:
+ assert (
+ meth_name == "relation_add"
+ ), f"'{meth_name}' requires 'relation' to be None"
+ yield f"{meth_name}.{relation.value}.{range:x}".lower()
+
+ @staticmethod
+ def get_exchange(meth_name: str, relation: Optional[RelationType] = None) -> str:
+ if meth_name == "relation_add":
+ assert (
+ relation is not None
+ ), "'relation_add' requires 'relation' to be provided"
+ split = relation.value
+ else:
+ assert relation is None, f"'{meth_name}' requires 'relation' to be None"
+ split = meth_name
+ exchange, *_ = split.split("_")
+ return exchange
+
+ @staticmethod
+ def get_exchanges() -> Generator[str, None, None]:
+ yield from [entity.value for entity in EntityType] + ["location"]
+
+ @staticmethod
+ def get_meth_name(
+ binding_key: str,
+ ) -> Tuple[str, Optional[RelationType]]:
+ meth_name, relation, *_ = binding_key.split(".")
+ return meth_name, (RelationType(relation) if relation != "unknown" else None)
+
+ @staticmethod
+ def get_meth_names(
+ exchange: str,
+ ) -> Generator[Tuple[str, Optional[RelationType]], None, None]:
+ if exchange == EntityType.CONTENT.value:
+ yield from [
+ ("content_add", None),
+ ("relation_add", RelationType.CNT_EARLY_IN_REV),
+ ("relation_add", RelationType.CNT_IN_DIR),
+ ]
+ elif exchange == EntityType.DIRECTORY.value:
+ yield from [
+ ("directory_add", None),
+ ("relation_add", RelationType.DIR_IN_REV),
+ ]
+ elif exchange == EntityType.ORIGIN.value:
+ yield from [("origin_add", None)]
+ elif exchange == EntityType.REVISION.value:
+ yield from [
+ ("revision_add", None),
+ ("relation_add", RelationType.REV_BEFORE_REV),
+ ("relation_add", RelationType.REV_IN_ORG),
+ ]
+ elif exchange == "location":
+ yield "location_add", None
+
+ @staticmethod
+ def get_ranges(unused_exchange: str) -> Generator[int, None, None]:
+ # XXX: we might want to have a different range per exchange
+ yield from range(ProvenanceStorageRabbitMQServer.queue_count)
+
+ @staticmethod
+ def get_routing_key(
+ item: bytes, meth_name: str, relation: Optional[RelationType] = None
+ ) -> str:
+ hashid = (
+ path_id(item).hex()
+ if meth_name.startswith("location")
+ else hash_to_hex(item)
+ )
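+ # The first hex digit of the hash picks one of the queue_count (16) ID
+ # ranges, which determines the queue this item is routed to.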
+ idx = int(hashid[0], 16) % ProvenanceStorageRabbitMQServer.queue_count
+ if relation is None:
+ assert (
+ meth_name != "relation_add"
+ ), "'relation_add' requires 'relation' to be provided"
+ return f"{meth_name}.unknown.{idx:x}".lower()
+ else:
+ assert (
+ meth_name == "relation_add"
+ ), f"'{meth_name}' requires 'relation' to be None"
+ return f"{meth_name}.{relation.value}.{idx:x}".lower()
+
+ @staticmethod
+ def is_write_method(meth_name: str) -> bool:
+ return "_add" in meth_name
def load_and_check_config(
@@ -39,9 +689,13 @@
if pcfg is None:
raise KeyError("Missing 'provenance' configuration")
- scfg: Optional[Dict[str, Any]] = pcfg.get("storage")
+ rcfg: Optional[Dict[str, Any]] = pcfg.get("rabbitmq")
+ if rcfg is None:
+ raise KeyError("Missing 'provenance.rabbitmq' configuration")
+
+ scfg: Optional[Dict[str, Any]] = rcfg.get("storage_config")
if scfg is None:
- raise KeyError("Missing 'provenance.storage' configuration")
+ raise KeyError("Missing 'provenance.rabbitmq.storage_config' configuration")
if type == "local":
cls = scfg.get("cls")
@@ -56,3 +710,9 @@
raise KeyError("Invalid configuration; missing 'db' config entry")
return cfg
+
+
+def make_server_from_configfile() -> ProvenanceStorageRabbitMQServer:
+ config_path = os.environ.get("SWH_CONFIG_FILENAME")
+ server_cfg = load_and_check_config(config_path)
+ return ProvenanceStorageRabbitMQServer(**server_cfg["provenance"]["rabbitmq"])
diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -35,25 +35,40 @@
# Direct access Archive object
"cls": "direct",
"db": {
- "host": "db.internal.softwareheritage.org",
+ "host": "belvedere.internal.softwareheritage.org",
+ "port": 5432,
"dbname": "softwareheritage",
"user": "guest",
},
},
"storage": {
# Local PostgreSQL Storage
- "cls": "postgresql",
- "db": {
- "host": "localhost",
- "user": "postgres",
- "password": "postgres",
- "dbname": "provenance",
- },
+ # "cls": "postgresql",
+ # "db": {
+ # "host": "localhost",
+ # "user": "postgres",
+ # "password": "postgres",
+ # "dbname": "provenance",
+ # },
# Local MongoDB Storage
# "cls": "mongodb",
# "db": {
# "dbname": "provenance",
# },
+ # Remote RabbitMQ/PostgreSQL Storage
+ "cls": "rabbitmq",
+ "url": "amqp://localhost:5672/%2f",
+ "storage_config": {
+ "cls": "postgresql",
+ "db": {
+ "host": "localhost",
+ "user": "postgres",
+ "password": "postgres",
+ "dbname": "provenance",
+ },
+ },
+ "batch_size": 100,
+ "prefetch_count": 100,
},
}
}
diff --git a/swh/provenance/util.py b/swh/provenance/util.py
--- a/swh/provenance/util.py
+++ b/swh/provenance/util.py
@@ -3,8 +3,13 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import hashlib
import os
+def path_id(path: bytes) -> bytes:
+ return hashlib.sha1(path).digest()
+
+
def path_normalize(path: bytes) -> bytes:
return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -33,3 +33,38 @@
mypy
commands =
mypy swh
+
+# build documentation outside swh-environment using the current
+# git HEAD of swh-docs, is executed on CI for each diff to prevent
+# breaking doc build
+[testenv:sphinx]
+whitelist_externals = make
+usedevelop = true
+extras =
+ testing
+deps =
+ # fetch and install swh-docs in develop mode
+ -e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs
+setenv =
+ SWH_PACKAGE_DOC_TOX_BUILD = 1
+ # turn warnings into errors
+ SPHINXOPTS = -W
+commands =
+ make -I ../.tox/sphinx/src/swh-docs/swh/ -C docs
+
+# build documentation only inside swh-environment using local state
+# of swh-docs package
+[testenv:sphinx-dev]
+whitelist_externals = make
+usedevelop = true
+extras =
+ testing
+deps =
+ # install swh-docs in develop mode
+ -e ../swh-docs
+setenv =
+ SWH_PACKAGE_DOC_TOX_BUILD = 1
+ # turn warnings into errors
+ SPHINXOPTS = -W
+commands =
+ make -I ../.tox/sphinx-dev/src/swh-docs/swh/ -C docs
