D6165.diff
diff --git a/docs/storage/remote.rst b/docs/storage/remote.rst
new file mode 100644
--- /dev/null
+++ b/docs/storage/remote.rst
@@ -0,0 +1,724 @@
+:orphan:
+
+ProvenanceStorageRabbitMQClient
+===============================
+
+The remote storage client connects to a remote server through a RabbitMQ
+broker, and delegates all writing operations to it. Reading operations,
+however, are still performed against a local storage object (i.e.
+``ProvenanceStorageMongoDb`` or ``ProvenanceStoragePostgreSql``). This
+speeds up the process by avoiding unnecessary RabbitMQ overhead for
+operations that are conflict-free.
+
+.. warning::
+
+ No check is done to guarantee that the local storage used by the
+ client is the same as the one the server uses on its side, but it is
+ assumed to be the case.
+
+When a writing operation is invoked, the client splits the set of
+elements to be written by ID range, and sends a series of packages to
+the server (via the RabbitMQ broker). Each package consists of elements
+belonging to the same ID range but, to avoid sending huge packages,
+there may be more than one package per range. After this, the client
+blocks waiting for the server to acknowledge the writing of all
+requested elements.
+
+Initializing a client object requires two mandatory parameters:
+
+- ``url``: the URL string of the broker where the server expects to
+ receive the packages.
+- ``storage_config``: a dictionary containing the storage configuration
+ for the local storage object, as expected by
+ ``swh.provenance.get_provenance_storage``.
+
+Additionally, some optional parameters can be specified that may affect
+the performance of the remote storage as a whole:
+
+- ``batch_size``: an integer specifying the maximum allowed number of
+  elements per package, after range splitting. Defaults to 100.
+- ``prefetch_count``: an integer specifying how many ack packages are
+  prefetched from the broker. Defaults to 100.
+- ``wait_min``: a float specifying the minimum number of seconds that
+  the client should wait for the server’s response before failing.
+  Defaults to 60.
+- ``wait_per_batch``: a float specifying the number of seconds to wait
+  per sent batch of items (i.e. package). If
+  ``wait_per_batch * number_of_packages`` is less than ``wait_min``,
+  the latter will be used instead. Defaults to 10.
+
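+As an illustration, a client can be obtained through
+``swh.provenance.get_provenance_storage`` along these lines (a sketch:
+the broker URL and the local storage configuration are hypothetical
+values):
+
+.. code-block:: python
+
+   from swh.provenance import get_provenance_storage
+
+   storage = get_provenance_storage(
+       cls="rabbitmq",
+       url="amqp://guest:guest@localhost:5672/%2f",
+       storage_config={
+           "cls": "postgresql",
+           "db": {"host": "localhost", "dbname": "provenance"},
+       },
+       batch_size=100,
+       prefetch_count=100,
+   )
+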
+As with all ``ProvenanceStorageInterface``-compliant objects, the remote
+storage client has to be opened before use, and closed once it is no
+longer needed, to properly release resources. It can be operated as a
+context manager using the keyword ``with``. This takes care of actually
+opening/closing both the connection to the remote server and the
+underlying local storage object.
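+
+A minimal usage sketch (``storage`` as obtained above; ``cnt_dates``
+stands for a hypothetical mapping of ``Sha1Git`` IDs to dates, and
+``content_get`` for a read method of the interface):
+
+.. code-block:: python
+
+   with storage as provenance:
+       # writes are delegated to the server through RabbitMQ
+       provenance.content_add(cnt_dates)
+       # reads are served directly by the local storage object
+       dates = provenance.content_get(list(cnt_dates))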
+
+Client lifecycle
+----------------
+
+Connecting to the remote server object implies taking care of the
+RabbitMQ connection’s lifecycle. To that end, an internal thread is
+launched, which will re-establish the connection in case of an error
+until an explicit disconnect request is made. The class
+``ProvenanceStorageRabbitMQClient`` extends the standard
+``threading.Thread`` class, thus the ``run`` method in the class is the
+target function of that thread: it loops indefinitely, calling the
+``connect`` method and then the blocking ``ioloop.start`` method of the
+established connection. Interaction with this loop is done by setting
+callback functions.
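+
+Condensed from the implementation included later in this diff, the
+reconnection loop has roughly this shape:
+
+.. code-block:: python
+
+   def run(self) -> None:
+       while not self._closing:
+           try:
+               self._connection = self.connect()  # pika.SelectConnection
+               self._connection.ioloop.start()    # blocking event loop
+           except TerminateSignal:
+               self.stop()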
+
+The following is a diagram of the interaction between the methods of the
+class:
+
+.. graphviz::
+
+ digraph {
+ ProvenanceStorageRabbitMQClient
+
+ close[shape=record]
+ open[shape=record]
+ add[shape=record,label="write method"]
+ get[shape=record,label="read method"]
+
+ ProvenanceStorageInterface
+
+ start[shape=record]
+ stop[shape=record]
+
+ request[shape=record]
+ wait_for_acks[shape=record]
+ wait_for_response[shape=record]
+
+ subgraph cluster_connection_thread {
+ style=rounded
+ bgcolor=gray95
+ color=gray
+ labelloc=b
+
+ run[shape=record]
+
+ connect[shape=record]
+ on_connection_open[shape=record]
+ on_connection_open_error[shape=record]
+ on_connection_closed[shape=record]
+ close_connection[shape=record]
+ open_channel[shape=record]
+ on_channel_open[shape=record]
+ on_channel_closed[shape=record]
+ setup_queue[shape=record]
+ on_queue_declare_ok[shape=record]
+ on_basic_qos_ok[shape=record]
+ start_consuming[shape=record]
+ on_consumer_cancelled[shape=record]
+ on_response[shape=record]
+ stop_consuming[shape=record]
+ on_cancel_ok[shape=record]
+ }
+
+ ProvenanceStorageRabbitMQClient->{add,close,get,open}
+
+ close->{stop}
+ open->{start}
+
+ start->{run}
+ stop->{stop_consuming}
+
+ run->{connect,stop}
+
+ connect->{on_connection_open,on_connection_open_error,on_connection_closed}
+
+ on_connection_open->{open_channel}
+
+ open_channel->{on_channel_open}
+
+ on_cancel_ok->{on_channel_closed}
+ on_consumer_cancelled->{on_channel_closed}
+ on_channel_open->{setup_queue}
+
+ on_channel_closed->{close_connection}
+
+ setup_queue->{on_queue_declare_ok}
+
+ on_queue_declare_ok->{on_basic_qos_ok}
+
+ on_basic_qos_ok->{start_consuming}
+
+ start_consuming->{on_consumer_cancelled,on_response}
+
+ on_response->{wait_for_response}[label="_response_queue",arrowhead="none"]
+
+ stop_consuming->{on_cancel_ok}
+
+ add->{request,wait_for_acks}
+ wait_for_acks->{wait_for_response}
+
+ get->{ProvenanceStorageInterface}
+ }
+
+Every write method in the ``ProvenanceStorageInterface`` performs the
+splitting by ID range and sends a series of messages to the server by
+calling the ``request`` method. After that, it blocks waiting for all
+necessary ack packages to arrive by calling the ``wait_for_acks``
+method, which in turn calls ``wait_for_response``. Calls to the write
+methods may come from distinct threads, thus the interaction between
+``on_response`` and ``wait_for_response`` is done through a thread-safe
+``queue.Queue`` structure, ``_response_queue``.
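+
+Condensed from ``wait_for_response`` in the client code later in this
+diff: ack packages whose correlation ID does not match the current
+request are discarded, and a ``ResponseTimeout`` is raised if the queue
+stays empty for too long.
+
+.. code-block:: python
+
+   while True:
+       try:
+           correlation_id, response = self._response_queue.get(timeout=timeout)
+           if correlation_id == self._correlation_id:
+               return response
+       except queue.Empty:
+           raise ResponseTimeout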
+
+Below is a summary of the methods present in the diagram and their
+functionality. Methods are listed in alphabetical order:
+
+- ``close_connection``: this method properly closes the connection to
+ RabbitMQ.
+- ``connect``: this method connects to RabbitMQ, returning the
+ connection handle. When the connection is established, the
+ ``on_connection_open`` callback method will be invoked by ``pika``.
+ If there is an error establishing the connection, the
+ ``on_connection_open_error`` callback method will be invoked by
+ ``pika``. If the connection closes unexpectedly, the
+ ``on_connection_closed`` callback method will be invoked by ``pika``.
+- ``on_basic_qos_ok``: this callback method is invoked by ``pika`` when
+ the ``Basic.QoS`` RPC call made in ``on_queue_declare_ok`` has
+ completed. At this point it is safe to start consuming messages,
+ hence the ``start_consuming`` method is called, which will invoke the
+ needed RPC commands to start the process.
+- ``on_cancel_ok``: this callback method is invoked by ``pika`` when
+  RabbitMQ acknowledges the cancellation of a consumer. At this point
+  the channel close is requested, thus indirectly invoking the
+  ``on_channel_closed`` callback method once the channel has been
+  closed, which will in turn close the connection.
+- ``on_channel_closed``: this callback method is invoked by ``pika``
+  when RabbitMQ unexpectedly closes the channel. Channels are usually
+  closed if there is an attempt to do something that violates the
+  protocol, such as re-declaring an exchange or queue with different
+  parameters. In this case, the connection will be closed by invoking
+  the ``close_connection`` method.
+- ``on_channel_open``: this callback method is invoked by ``pika`` when
+ the channel has been opened. The ``on_channel_closed`` callback is
+ set here. Since the channel is now open, it is safe to set up the
+ client’s response queue on RabbitMQ by invoking the ``setup_queue``
+ method.
+- ``on_connection_closed``: this callback method is invoked by ``pika``
+  when the connection to RabbitMQ is closed unexpectedly. Since it is
+  unexpected, an attempt to reconnect to RabbitMQ will be made.
+- ``on_connection_open``: this callback method is invoked by ``pika``
+ once the connection to RabbitMQ has been established. It proceeds to
+ open the channel by calling the ``open_channel`` method.
+- ``on_connection_open_error``: this callback method is invoked by
+  ``pika`` if the connection to RabbitMQ can’t be established. Since it
+  is unexpected, an attempt to reconnect to RabbitMQ will be made.
+- ``on_consumer_cancelled``: this callback method is invoked by
+  ``pika`` when RabbitMQ sends a ``Basic.Cancel`` for a consumer
+  receiving messages. At this point the channel close is requested,
+  thus indirectly invoking the ``on_channel_closed`` callback method
+  once the channel has been closed, which will in turn close the
+  connection.
+- ``on_queue_declare_ok``: this callback method is invoked by ``pika``
+ when the ``Queue.Declare`` RPC call made in ``setup_queue`` has
+ completed. This method sets up the consumer prefetch count by
+ invoking the ``Basic.QoS`` RPC command. When it is completed, the
+ ``on_basic_qos_ok`` method will be invoked by ``pika``.
+- ``on_response``: this callback method is invoked by ``pika`` when a
+  message is delivered from RabbitMQ. The decoded response together
+  with its correlation ID is enqueued in the internal
+  ``_response_queue``, so that the data is forwarded to the
+  ``wait_for_response`` method, which might be running on a distinct
+  thread. A ``Basic.Ack`` RPC command is issued to acknowledge the
+  delivery of the message to RabbitMQ.
+- ``open_channel``: this method opens a new channel with RabbitMQ by
+ issuing the ``Channel.Open`` RPC command. When RabbitMQ responds that
+ the channel is open, the ``on_channel_open`` callback will be invoked
+ by ``pika``.
+- ``request``: this method sends a message to RabbitMQ by issuing a
+ ``Basic.Publish`` RPC command. The body of the message is properly
+ encoded, while correlation ID and request key are used as passed by
+ the calling method.
+- ``run``: main method of the internal thread. It requests to open a
+ connection to RabbitMQ by calling the ``connect`` method, and starts
+ the internal ``IOLoop`` of the returned handle (blocking operation).
+ In case of failure, this method will indefinitely try to reconnect.
+ When an explicit ``TerminateSignal`` is received, the ``stop`` method
+ is invoked.
+- ``setup_queue``: this method sets up an exclusive queue for the
+ client on RabbitMQ by invoking the ``Queue.Declare`` RPC command.
+ When it is completed, the ``on_queue_declare_ok`` method will be
+ invoked by ``pika``.
+- ``start``: inherited from ``threading.Thread``. It launches the
+ internal thread with method ``run`` as target.
+- ``start_consuming``: this method sets up the consumer by first
+ registering the ``on_consumer_cancelled`` callback, so that the
+ client is notified if RabbitMQ cancels the consumer. It then issues
+ the ``Basic.Consume`` RPC command which returns the consumer tag that
+ is used to uniquely identify the consumer with RabbitMQ. The
+ ``on_response`` method is passed in as a callback ``pika`` will
+ invoke when a message is fully received.
+- ``stop``: this method cleanly shuts down the connection to RabbitMQ
+  by calling the ``stop_consuming`` method. The ``IOLoop`` is started
+  again because this method is invoked by raising a ``TerminateSignal``
+  exception, and that exception stops the ``IOLoop``, which needs to be
+  running for ``pika`` to communicate with RabbitMQ. All of the
+  commands issued prior to starting the ``IOLoop`` will be buffered but
+  not processed.
+- ``stop_consuming``: this method sends a ``Basic.Cancel`` RPC command.
+  When RabbitMQ confirms the cancellation, the ``on_cancel_ok``
+  callback method will be invoked by ``pika``, which will then close
+  the channel and connection.
+- ``wait_for_acks``: this method is invoked by every write method in
+  the ``ProvenanceStorageInterface``, after sending a series of write
+  requests to the server. It will call ``wait_for_response`` until it
+  receives all expected ack responses, or until it hits the first
+  timeout. The timeout is calculated from the number of expected acks
+  and the class initialization parameters ``wait_per_batch`` and
+  ``wait_min`` (see the sketch after this list).
+- ``wait_for_response``: this method is called from ``wait_for_acks``
+  to retrieve ack packages from the internal ``_response_queue``. The
+  response correlation ID is used to validate that the received ack
+  corresponds to the current write request. The method returns the
+  decoded body of the received response.
+
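+For reference, the ack timeout used by ``wait_for_acks`` is computed as
+follows (taken from the client code later in this diff):
+
+.. code-block:: python
+
+   timeout = max(
+       (acks_expected / self._batch_size) * self._wait_per_batch,
+       self._wait_min,
+   )
+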
+ProvenanceStorageRabbitMQServer
+===============================
+
+The remote storage server is responsible for defining the ID range
+splitting policy for each entity and relation in the provenance
+solution. Based on this policy, it will launch a series of worker
+processes that will take care of actually processing the requests for
+each defined range in the partition. These processes are implemented in
+the ``ProvenanceStorageRabbitMQWorker`` described below.
+
+Initializing a server object requires two mandatory parameters:
+
+- ``url``: the URL string of the broker where the server expects to
+ receive the packages.
+- ``storage_config``: a dictionary containing the storage configuration
+ for the local storage object, as expected by
+ ``swh.provenance.get_provenance_storage``.
+
+Additionally, some optional parameters can be specified that may affect
+the performance of the remote storage as a whole:
+
+- ``batch_size``: an integer specifying the maximum allowed number of
+  elements to be processed at a time, i.e. forwarded to the underlying
+  storage object by each worker. Defaults to 100.
+- ``prefetch_count``: an integer specifying how many packages are
+  prefetched from the broker by each worker. Defaults to 100.
+
+On initialization, the server will create the necessary
+``ProvenanceStorageRabbitMQWorker`` objects, forwarding to them the
+parameters mentioned above, but it won’t launch these underlying
+processes until it is explicitly started. To that end, the
+``ProvenanceStorageRabbitMQServer`` objects provide the following
+methods:
+
+- ``start``: this method launches all the necessary worker subprocesses
+  and ensures they are all in a proper consuming state before returning
+  control to the caller.
+- ``stop``: this method signals all worker subprocesses for termination
+  and blocks until they all finish successfully.
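+
+A sketch of the server lifecycle (constructor parameters as described
+above; the broker URL and storage configuration are hypothetical
+values):
+
+.. code-block:: python
+
+   from swh.provenance.api.server import ProvenanceStorageRabbitMQServer
+
+   server = ProvenanceStorageRabbitMQServer(
+       url="amqp://guest:guest@localhost:5672/%2f",
+       storage_config={"cls": "postgresql", "db": {"dbname": "provenance"}},
+   )
+   server.start()  # returns once every worker is consuming
+   try:
+       ...  # serve until shutdown is requested
+   finally:
+       server.stop()  # signal workers and wait for them to finish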
+
+ID range splitting policy
+-------------------------
+
+The ID range splitting policy is defined as follows:
+
+- There is an exchange in the RabbitMQ broker for each entity in the
+ provenance solution, plus an extra one to handle locations:
+ ``content``, ``directory``, ``location``, ``origin``, and
+ ``revision``.
+- Any request to add an entity should send the necessary packages to
+  the entity’s associated exchange for proper routing: e.g. requests
+  for ``content_add`` will be handled by the ``content`` exchange.
+- Any request to add a relation entry is handled by the exchange of the
+  source entity in the relation: e.g. requests for ``relation_add``
+  with ``relation=CNT_EARLY_IN_REV`` will be handled by the ``content``
+  exchange.
+- ID range splitting is done by looking at the first hexadecimal digit
+  in the SWHID of the entity, hence 16 possible ranges are defined for
+  each operation associated with each entity. In the case of locations,
+  a hex hash is calculated over their value (see the sketch after this
+  list).
+- Each exchange then handles 16 queues for each method associated with
+  the exchange’s entity, with a ``direct`` routing policy. For
+  instance, the ``content`` exchange has 16 queues associated with each
+  of the following methods: ``content_add``, ``relation_add`` with
+  ``relation=CNT_EARLY_IN_REV``, and ``relation_add`` with
+  ``relation=CNT_IN_DIR`` (i.e. a total of 48 queues).
+- For each exchange, 16 ``ProvenanceStorageRabbitMQWorker`` processes
+ are launched, each of them taking care of one ID range for the
+ associated entity.
+
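+A sketch of the range computation this policy implies (``hash_to_hex``
+is from ``swh.model.hashutil``; the helper name is hypothetical):
+
+.. code-block:: python
+
+   from swh.model.hashutil import hash_to_hex
+
+   def range_of(sha1_git: bytes) -> int:
+       # first hex digit of the ID, i.e. an integer in 0-15
+       return int(hash_to_hex(sha1_git)[0], 16)
+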
+All in all, this gives a total of 80 ``ProvenanceStorageRabbitMQWorker``
+processes (16 per exchange) and 160 RabbitMQ queues (48 for ``content``
+and ``revision``, 32 for ``directory``, and 16 for ``location`` and
+``origin``). In this way, it is guaranteed that, regardless of the
+operation being performed, there is never more than one process trying
+to write on a given ID range for a given entity, thus resolving all
+potential conflicts.
+
+Although the ID range splitting policy is defined on the server side, so
+that it can properly configure and launch the necessary worker
+processes, it is the client that is responsible for actually splitting
+the input to each write method and sending the write requests to the
+proper queues, for RabbitMQ to route them to the correct worker. For
+that, the server defines a series of static methods that allow querying
+the ID range splitting policy:
+
+- ``get_binding_keys``: this method is meant to be used by the server
+ workers. Given an exchange and a range ID, it yields all the RabbitMQ
+ routing keys the worker process should bind to.
+- ``get_exchange``: given the name of a write method in the
+  ``ProvenanceStorageInterface``, and an optional relation type, it
+  returns the name of the exchange to which the writing request should
+  be sent.
+- ``get_exchanges``: this method yields the names of all the exchanges
+ in the RabbitMQ broker.
+- ``get_meth_name``: this method is meant to be used by the server
+  workers. Given a binding key as returned by ``get_binding_keys``, it
+  returns the ``ProvenanceStorageInterface`` method associated with it.
+- ``get_meth_names``: given an exchange name, it yields all the methods
+  that are associated with it. In the case of ``relation_add``, the
+  method also returns the supported relation type.
+- ``get_ranges``: given an exchange name, it yields the integer value
+  of all supported range IDs (currently 0-15 for all exchanges). The
+  idea behind this method is to allow defining a custom ID range split
+  for each exchange.
+- ``get_routing_key``: given the name of a write method in the
+  ``ProvenanceStorageInterface``, an optional relation type, and a
+  tuple with the data to be passed to the method (first parameter),
+  this method returns the routing key of the queue responsible for
+  handling that tuple. It is assumed that the first value in the tuple
+  is a ``Sha1Git`` ID (see the example after this list).
+- ``is_write_method``: given the name of a method in the
+  ``ProvenanceStorageInterface``, it decides whether it is a write
+  method or not.
+
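+For illustration, the client-side resolution of where a ``content_add``
+request for a given ID should go combines these helpers roughly as
+follows (a sketch following the usage in the client code later in this
+diff; ``sha1_git`` stands for a hypothetical ``Sha1Git`` identifier):
+
+.. code-block:: python
+
+   from swh.provenance.api.server import ProvenanceStorageRabbitMQServer
+
+   exchange = ProvenanceStorageRabbitMQServer.get_exchange("content_add")
+   routing_key = ProvenanceStorageRabbitMQServer.get_routing_key(
+       sha1_git, "content_add"
+   )
+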
+ProvenanceStorageRabbitMQWorker
+===============================
+
+The remote storage worker consumes messages published in the RabbitMQ
+broker by the remote storage clients, and proceeds to perform the
+actual writing operations to a local storage object (i.e.
+``ProvenanceStorageMongoDb`` or ``ProvenanceStoragePostgreSql``). Each
+worker processes messages associated with a particular entity and range
+ID. It is the client’s responsibility to properly split data across
+messages according to the remote storage server’s policy.
+
+Since there is overlap between methods in the
+``ProvenanceStorageInterface`` operating over the same entity, one
+worker may have to handle more than one method to guarantee
+conflict-free writings to the underlying storage. For instance,
+consider the ``content`` entity: for a given ID range, methods
+``content_add`` and ``relation_add`` with ``relation=CNT_EARLY_IN_REV``
+may conflict. It is the worker’s responsibility to resolve this kind of
+conflict, as the sketch below illustrates.
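+
+Date insertions illustrate the resolution step: concurrent requests may
+carry different dates for the same ID, and only the earliest one should
+be kept. A condensed version of the ``resolve_dates`` helper from the
+server code later in this diff (assuming plain ``(id, date)`` pairs):
+
+.. code-block:: python
+
+   from datetime import datetime
+   from typing import Dict, Iterable, Optional, Tuple
+
+   def resolve_dates(
+       dates: Iterable[Tuple[bytes, Optional[datetime]]]
+   ) -> Dict[bytes, Optional[datetime]]:
+       result: Dict[bytes, Optional[datetime]] = {}
+       for sha1, date in dates:
+           known = result.setdefault(sha1, None)
+           # keep the earliest known date for each ID
+           if date is not None and (known is None or date < known):
+               result[sha1] = date
+       return result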
+
+Initializing a worker object requires four mandatory parameters:
+
+- ``url``: the URL string of the broker where the server expects to
+ receive the packages.
+- ``exchange``: the RabbitMQ exchange to which the worker will
+ subscribe. See ``ProvenanceStorageRabbitMQServer``\ ’s ID range
+ splitting policy for further details.
+- ``range``: the range ID the worker will be processing. See
+ ``ProvenanceStorageRabbitMQServer``\ ’s ID range splitting policy for
+ further details.
+- ``storage_config``: a dictionary containing the storage configuration
+ for the local storage object, as expected by
+ ``swh.provenance.get_provenance_storage``.
+
+Additionally, some optional parameters can be specified that may affect
+the performance of the remote storage as a whole:
+
+- ``batch_size``: an integer specifying the maximum allowed number of
+  elements to be processed at a time, i.e. forwarded to the underlying
+  storage object for writing. Defaults to 100.
+- ``prefetch_count``: an integer specifying how many packages are
+  prefetched from the broker. Defaults to 100.
+
+.. warning::
+
+   This class is not meant to be used directly but through an instance
+   of ``ProvenanceStorageRabbitMQServer``. The parameters ``url``,
+   ``storage_config``, ``batch_size`` and ``prefetch_count`` above are
+   forwarded by the server as passed to it on initialization. The
+   additional arguments ``exchange`` and ``range`` are generated by the
+   server based on its ID range splitting policy.
+
+Worker lifecycle
+----------------
+
+All interaction between the provenance solution and a remote storage
+worker object happens through RabbitMQ packages. To maximize
+concurrency, each instance of the ``ProvenanceStorageRabbitMQWorker``
+launches a distinct sub-process, hence avoiding unnecessary
+synchronization with other components in the solution. For this,
+``ProvenanceStorageRabbitMQWorker`` extends ``multiprocessing.Process``
+and only has a direct channel of communication to the master
+``ProvenanceStorageRabbitMQServer``, through ``multiprocessing.Queue``
+structures. The entry point of the sub-process is the method ``run``,
+which will in turn launch a set of threads to handle the different
+provenance methods the worker needs to support, plus an extra thread to
+handle communication with the server object. RabbitMQ’s lifecycle is
+taken care of in the main thread.
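+
+A hypothetical shape of the worker’s entry point, based on the
+description of ``run`` below (the thread targets and their arguments
+are assumptions; the actual implementation is not part of this
+excerpt):
+
+.. code-block:: python
+
+   import threading
+
+   def run(self) -> None:
+       # one thread for server commands, one per supported storage method
+       threading.Thread(target=self.run_command_thread).start()
+       for binding_key in self._binding_keys:
+           threading.Thread(
+               target=self.run_request_thread, args=(binding_key,)
+           ).start()
+       # the RabbitMQ lifecycle is handled in the main thread
+       while not self._closing:
+           try:
+               self._connection = self.connect()
+               self._connection.ioloop.start()  # blocking event loop
+           except TerminateSignal:
+               self.stop()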
+
+The following is a diagram of the interaction between the methods of the
+class:
+
+.. graphviz::
+
+ digraph {
+ ProvenanceStorageRabbitMQServer
+ ProvenanceStorageRabbitMQWorker
+
+ start[shape=record]
+ run[shape=record]
+
+ connect[shape=record]
+ on_connection_open[shape=record]
+ on_connection_open_error[shape=record]
+ on_connection_closed[shape=record]
+ close_connection[shape=record]
+ open_channel[shape=record]
+ on_channel_open[shape=record]
+ on_channel_closed[shape=record]
+
+ setup_exchange[shape=record]
+ on_exchange_declare_ok[shape=record]
+
+ setup_queues[shape=record]
+ on_queue_declare_ok[shape=record]
+ on_bind_ok[shape=record]
+ on_basic_qos_ok[shape=record]
+ start_consuming[shape=record]
+ on_consumer_cancelled[shape=record]
+ on_request[shape=record]
+ stop_consuming[shape=record]
+ on_cancel_ok[shape=record]
+
+ request_termination[shape=record]
+ stop[shape=record]
+
+ respond[shape=record]
+ get_conflicts_func[shape=record]
+
+ subgraph cluster_command_thread {
+ style=rounded
+ bgcolor=gray95
+ color=gray
+ labelloc=b
+
+ run_command_thread[shape=record]
+ }
+
+ subgraph cluster_request_thread {
+ style=rounded
+ bgcolor=gray95
+ color=gray
+ labelloc=b
+
+ ProvenanceStorageInterface
+
+ run_request_thread[shape=record]
+ }
+
+ ProvenanceStorageRabbitMQWorker->{start}
+
+ start->{run}
+ stop->{stop_consuming}
+
+ run->{connect,run_command_thread,run_request_thread,stop}
+
+ connect->{on_connection_open,on_connection_open_error,on_connection_closed}
+
+ on_connection_open->{open_channel}
+
+ open_channel->{on_channel_open}
+
+ on_cancel_ok->{on_channel_closed}
+ on_consumer_cancelled->{on_channel_closed}
+ on_channel_open->{setup_exchange}
+
+ on_channel_closed->{close_connection}
+
+ setup_exchange->{on_exchange_declare_ok}
+ on_exchange_declare_ok->{setup_queues}
+
+ setup_queues->{on_queue_declare_ok}
+ on_queue_declare_ok->{on_bind_ok}
+ on_bind_ok->{on_basic_qos_ok}
+ on_basic_qos_ok->{start_consuming}
+
+ start_consuming->{on_consumer_cancelled,on_request}
+ start_consuming->{ProvenanceStorageRabbitMQServer}[label=" signal",arrowhead="none"]
+
+ on_request->{run_request_thread}[label=" _request_queues",arrowhead="none"]
+
+ stop_consuming->{on_cancel_ok}
+
+ run_command_thread->{request_termination}
+ run_command_thread->{ProvenanceStorageRabbitMQServer}[label=" command",arrowhead="none"]
+
+ run_request_thread->{ProvenanceStorageInterface,get_conflicts_func,respond,request_termination}
+
+ request_termination->{run}[label="TerminateSignal",arrowhead="none"]
+ }
+
+There is a request thread for each ``ProvenanceStorageInterface`` method
+the worker needs to handle, each thread with its own provenance storage
+object (i.e. an exclusive connection). Each of these threads receives
+sets of parameters to be passed to its corresponding method and
+performs explicit conflict resolution, using the method-specific
+function returned by ``get_conflicts_func``, prior to passing these
+parameters to the underlying storage.
+
+Below is a summary of the methods present in the diagram and their
+functionality. Methods are listed in alphabetical order:
+
+- ``close_connection``: this method properly closes the connection to
+ RabbitMQ.
+- ``connect``: this method connects to RabbitMQ, returning the
+ connection handle. When the connection is established, the
+ ``on_connection_open`` callback method will be invoked by ``pika``.
+ If there is an error establishing the connection, the
+ ``on_connection_open_error`` callback method will be invoked by
+ ``pika``. If the connection closes unexpectedly, the
+ ``on_connection_closed`` callback method will be invoked by ``pika``.
+- ``get_conflicts_func``: this method returns the conflict resolution
+  function to be used, based on the provenance storage method.
+- ``on_basic_qos_ok``: this callback method is invoked by ``pika`` when
+ the ``Basic.QoS`` RPC call made in ``on_bind_ok`` has completed. At
+ this point it is safe to start consuming messages, hence the
+ ``start_consuming`` method is called, which will invoke the needed
+ RPC commands to start the process.
+- ``on_bind_ok``: this callback method is invoked by ``pika`` when the
+ ``Queue.Bind`` RPC call made in ``on_queue_declare_ok`` has
+ completed. This method sets up the consumer prefetch count by
+ invoking the ``Basic.QoS`` RPC command. When it is completed, the
+ ``on_basic_qos_ok`` method will be invoked by ``pika``.
+- ``on_cancel_ok``: this callback method is invoked by ``pika`` when
+  RabbitMQ acknowledges the cancellation of a consumer. At this point
+  the channel close is requested, thus indirectly invoking the
+  ``on_channel_closed`` callback method once the channel has been
+  closed, which will in turn close the connection.
+- ``on_channel_closed``: this callback method is invoked by ``pika``
+  when RabbitMQ unexpectedly closes the channel. Channels are usually
+  closed if there is an attempt to do something that violates the
+  protocol, such as re-declaring an exchange or queue with different
+  parameters. In this case, the connection will be closed by invoking
+  the ``close_connection`` method.
+- ``on_channel_open``: this callback method is invoked by ``pika`` when
+ the channel has been opened. The ``on_channel_closed`` callback is
+ set here. Since the channel is now open, it is safe to declare the
+ exchange to use by invoking the ``setup_exchange`` method.
+- ``on_connection_closed``: this callback method is invoked by ``pika``
+  when the connection to RabbitMQ is closed unexpectedly. Since it is
+  unexpected, an attempt to reconnect to RabbitMQ will be made.
+- ``on_connection_open``: this callback method is called by ``pika``
+ once the connection to RabbitMQ has been established. It proceeds to
+ open the channel by calling the ``open_channel`` method.
+- ``on_connection_open_error``: this callback method is called by
+  ``pika`` if the connection to RabbitMQ can’t be established. Since it
+  is unexpected, an attempt to reconnect to RabbitMQ will be made.
+- ``on_consumer_cancelled``: this callback method is invoked by
+  ``pika`` when RabbitMQ sends a ``Basic.Cancel`` for a consumer
+  receiving messages. At this point the channel close is requested,
+  thus indirectly invoking the ``on_channel_closed`` callback method
+  once the channel has been closed, which will in turn close the
+  connection.
+- ``on_exchange_declare_ok``: this callback method is invoked by
+ ``pika`` when the ``Exchange.Declare`` RPC call made in
+ ``setup_exchange`` has completed. At this point it is time to set up
+ the queues for the different request handling threads. This is done
+ by calling the ``setup_queues`` method.
+- ``on_queue_declare_ok``: this callback method is invoked by ``pika``
+ when each ``Queue.Declare`` RPC call made in ``setup_queues`` has
+ completed. Now it is time to bind the current queue and exchange
+ together with the correspondent routing key by issuing the
+ ``Queue.Bind`` RPC command. When this command is completed, the
+ ``on_bind_ok`` method will be invoked by ``pika``.
+- ``on_request``: this callback method is invoked by ``pika`` when a
+  message is delivered from RabbitMQ to any of the queues bound by the
+  worker. The decoded request together with its correlation ID and
+  reply-to property are enqueued in the corresponding internal
+  ``_request_queues`` entry (the actual queue is identified by the
+  message routing key), so that the data is forwarded to the thread
+  that handles the particular method the message is associated with. A
+  ``Basic.Ack`` RPC command is issued to acknowledge the delivery of
+  the message to RabbitMQ.
+- ``open_channel``: this method opens a new channel with RabbitMQ by
+ issuing the ``Channel.Open`` RPC command. When RabbitMQ responds that
+ the channel is open, the ``on_channel_open`` callback will be invoked
+ by ``pika``.
+- ``request_termination``: this method sends a signal to the main
+  thread of the process to cleanly release resources and terminate.
+  This is done by setting a callback in the ``IOLoop`` that raises a
+  ``TerminateSignal``, which will eventually be handled by the ``run``
+  method.
+- ``respond``: this method sends a message to RabbitMQ by issuing a
+  ``Basic.Publish`` RPC command. The body of the message is properly
+  encoded, while the correlation ID and request key are used as passed
+  by the calling method.
+- ``run``: main method of the process. It launches a thread to handle
+  communication with the ``ProvenanceStorageRabbitMQServer`` object,
+  targeting the ``run_command_thread`` method. It also launches one
+  thread for each ``ProvenanceStorageInterface`` method the worker
+  needs to handle, each targeting the ``run_request_thread`` method.
+  Finally, it requests to open a connection to RabbitMQ by calling the
+  ``connect`` method, and starts the internal ``IOLoop`` of the
+  returned handle (blocking operation). In case of failure, this method
+  will indefinitely try to reconnect. When an explicit
+  ``TerminateSignal`` is received, the ``stop`` method is invoked. All
+  internal threads will be signalled for termination as well, and
+  resources released.
+- ``run_command_thread``: main method of the command thread. It
+  receives external commands from the
+  ``ProvenanceStorageRabbitMQServer`` object through a
+  ``multiprocessing.Queue`` structure. The only supported command for
+  now is for the worker to be signalled to terminate, in which case the
+  ``request_termination`` method is invoked.
+- ``run_request_thread``: main method of the request threads. The
+  worker has one such thread per ``ProvenanceStorageInterface`` method
+  it needs to handle. This method will initialize its own storage
+  object and interact with the ``on_request`` callback through a
+  ``queue.Queue`` structure, waiting for items to be passed to the
+  storage method associated with the thread. When elements become
+  available, it will perform a conflict resolution step by resorting to
+  the method-specific function returned by ``get_conflicts_func``.
+  After this, it will forward the items to the underlying storage
+  object. If the storage method returns successfully, acknowledgements
+  are sent back to each client through the ``respond`` method. If the
+  call to the storage method fails, items will be enqueued back and
+  retried in a future iteration. In case of an unexpected exception,
+  the worker is signalled for termination by calling the
+  ``request_termination`` method.
+- ``setup_exchange``: this method sets up the exchange on RabbitMQ by
+ invoking the ``Exchange.Declare`` RPC command. When it is completed,
+ the ``on_exchange_declare_ok`` method will be invoked by ``pika``.
+- ``setup_queues``: this method sets up the necessary queues for the
+  worker on RabbitMQ by invoking several ``Queue.Declare`` RPC commands
+  (one per supported provenance storage method). When each command is
+  completed, the ``on_queue_declare_ok`` method will be invoked by
+  ``pika``.
+- ``start``: inherited from ``multiprocessing.Process``. It launches
+ the worker sub-process with method ``run`` as target.
+- ``start_consuming``: this method sets up the worker by first
+  registering the ``on_consumer_cancelled`` callback (via
+  ``add_on_cancel_callback``), so that the worker is notified if
+  RabbitMQ cancels a consumer. It then issues one ``Basic.Consume`` RPC
+  command per supported provenance storage method, each of which
+  returns a consumer tag that is used to uniquely identify that
+  consumer with RabbitMQ. The ``on_request`` method is passed in as the
+  callback ``pika`` will invoke when a message is fully received. After
+  setting up all consumers, a ``CONSUMING`` signal is sent to the
+  ``ProvenanceStorageRabbitMQServer`` object through a
+  ``multiprocessing.Queue`` structure.
+- ``stop``: this method cleanly shuts down the connection to RabbitMQ
+  by calling the ``stop_consuming`` method. The ``IOLoop`` is started
+  again because this method is invoked by raising a ``TerminateSignal``
+  exception, and that exception stops the ``IOLoop``, which needs to be
+  running for ``pika`` to communicate with RabbitMQ. All of the
+  commands issued prior to starting the ``IOLoop`` will be buffered but
+  not processed.
+- ``stop_consuming``: this method sends a ``Basic.Cancel`` RPC command
+  for each supported provenance storage method. When RabbitMQ confirms
+  the cancellation, the ``on_cancel_ok`` callback method will be
+  invoked by ``pika``, which will then close the channel and
+  connection.
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -17,6 +17,9 @@
[mypy-msgpack.*]
ignore_missing_imports = True
+[mypy-pika.*]
+ignore_missing_imports = True
+
[mypy-pkg_resources.*]
ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,6 +5,7 @@
iso8601
methodtools
mongomock
+pika
pymongo
PyYAML
types-click
diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -92,4 +92,12 @@
engine = kwargs.get("engine", "pymongo")
return ProvenanceStorageMongoDb(engine=engine, **kwargs["db"])
+ elif cls == "rabbitmq":
+ from .api.client import ProvenanceStorageRabbitMQClient
+
+ rmq_storage = ProvenanceStorageRabbitMQClient(**kwargs)
+ if TYPE_CHECKING:
+ assert isinstance(rmq_storage, ProvenanceStorageInterface)
+ return rmq_storage
+
raise ValueError
diff --git a/swh/provenance/api/client.py b/swh/provenance/api/client.py
--- a/swh/provenance/api/client.py
+++ b/swh/provenance/api/client.py
@@ -2,3 +2,469 @@
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+
+from __future__ import annotations
+
+import functools
+import inspect
+import logging
+import queue
+import threading
+import time
+from types import TracebackType
+from typing import Any, Dict, Iterable, Optional, Set, Tuple, Type, Union
+import uuid
+
+import pika
+import pika.channel
+import pika.connection
+import pika.frame
+import pika.spec
+
+from swh.core.api.serializers import encode_data_client as encode_data
+from swh.core.api.serializers import msgpack_loads as decode_data
+from swh.core.statsd import statsd
+
+from .. import get_provenance_storage
+from ..interface import ProvenanceStorageInterface, RelationData, RelationType
+from .serializers import DECODERS, ENCODERS
+from .server import ProvenanceStorageRabbitMQServer
+
+LOG_FORMAT = (
+ "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) "
+ "-35s %(lineno) -5d: %(message)s"
+)
+LOGGER = logging.getLogger(__name__)
+
+STORAGE_DURATION_METRIC = "swh_provenance_storage_rabbitmq_duration_seconds"
+
+
+class ResponseTimeout(Exception):
+ pass
+
+
+class TerminateSignal(Exception):
+ pass
+
+
+def split_ranges(
+ data: Iterable[bytes], meth_name: str, relation: Optional[RelationType] = None
+) -> Dict[str, Set[Tuple[Any, ...]]]:
+ ranges: Dict[str, Set[Tuple[Any, ...]]] = {}
+ if relation is not None:
+ assert isinstance(data, dict), "Relation data must be provided in a dictionary"
+ for src, dsts in data.items():
+ key = ProvenanceStorageRabbitMQServer.get_routing_key(
+ src, meth_name, relation
+ )
+ for rel in dsts:
+ assert isinstance(
+ rel, RelationData
+ ), "Values in the dictionary must be RelationData structures"
+ ranges.setdefault(key, set()).add((src, rel.dst, rel.path))
+ else:
+ items: Union[Set[Tuple[bytes, Any]], Set[Tuple[bytes]]]
+ if isinstance(data, dict):
+ items = set(data.items())
+ else:
+ items = {(item,) for item in data}
+ for id, *rest in items:
+ key = ProvenanceStorageRabbitMQServer.get_routing_key(id, meth_name)
+ ranges.setdefault(key, set()).add((id, *rest))
+ return ranges
+
+
+class MetaRabbitMQClient(type):
+ def __new__(cls, name, bases, attributes):
+ # For each method wrapped with @remote_api_endpoint in an API backend
+ # (eg. :class:`swh.indexer.storage.IndexerStorage`), add a new
+ # method in RemoteStorage, with the same documentation.
+ #
+ # Note that, despite the usage of decorator magic (eg. functools.wrap),
+ # this never actually calls an IndexerStorage method.
+ backend_class = attributes.get("backend_class", None)
+ for base in bases:
+ if backend_class is not None:
+ break
+ backend_class = getattr(base, "backend_class", None)
+ if backend_class:
+ for meth_name, meth in backend_class.__dict__.items():
+ if hasattr(meth, "_endpoint_path"):
+ cls.__add_endpoint(meth_name, meth, attributes)
+ return super().__new__(cls, name, bases, attributes)
+
+ @staticmethod
+ def __add_endpoint(meth_name, meth, attributes):
+ wrapped_meth = inspect.unwrap(meth)
+
+ @functools.wraps(meth) # Copy signature and doc
+ def meth_(*args, **kwargs):
+ with statsd.timed(
+ metric=STORAGE_DURATION_METRIC, tags={"method": meth_name}
+ ):
+ # Match arguments and parameters
+ data = inspect.getcallargs(wrapped_meth, *args, **kwargs)
+
+ # Remove arguments that should not be passed
+ self = data.pop("self")
+
+ # Call storage method with remaining arguments
+ return getattr(self._storage, meth_name)(**data)
+
+ @functools.wraps(meth) # Copy signature and doc
+ def write_meth_(*args, **kwargs):
+ with statsd.timed(
+ metric=STORAGE_DURATION_METRIC, tags={"method": meth_name}
+ ):
+ # Match arguments and parameters
+ post_data = inspect.getcallargs(wrapped_meth, *args, **kwargs)
+
+ try:
+ # Remove arguments that should not be passed
+ self = post_data.pop("self")
+ relation = post_data.pop("relation", None)
+ assert len(post_data) == 1
+ data = next(iter(post_data.values()))
+
+ ranges = split_ranges(data, meth_name, relation)
+ acks_expected = sum(len(items) for items in ranges.values())
+ self._correlation_id = str(uuid.uuid4())
+
+ exchange = ProvenanceStorageRabbitMQServer.get_exchange(
+ meth_name, relation
+ )
+ for routing_key, items in ranges.items():
+ items_list = list(items)
+ batches = (
+ items_list[idx : idx + self._batch_size]
+ for idx in range(0, len(items_list), self._batch_size)
+ )
+ for batch in batches:
+ # FIXME: this is running in a different thread! Hence, if
+ # self._connection drops, there is no guarantee that the
+ # request can be sent for the current elements. This
+ # situation should be handled properly.
+ self._connection.ioloop.add_callback_threadsafe(
+ functools.partial(
+ ProvenanceStorageRabbitMQClient.request,
+ channel=self._channel,
+ reply_to=self._callback_queue,
+ exchange=exchange,
+ routing_key=routing_key,
+ correlation_id=self._correlation_id,
+ data=batch,
+ )
+ )
+ return self.wait_for_acks(acks_expected)
+ except BaseException as ex:
+ self.request_termination(str(ex))
+ return False
+
+ if meth_name not in attributes:
+ attributes[meth_name] = (
+ write_meth_
+ if ProvenanceStorageRabbitMQServer.is_write_method(meth_name)
+ else meth_
+ )
+
+
+class ProvenanceStorageRabbitMQClient(threading.Thread, metaclass=MetaRabbitMQClient):
+ backend_class = ProvenanceStorageInterface
+ extra_type_decoders = DECODERS
+ extra_type_encoders = ENCODERS
+
+ def __init__(
+ self,
+ url: str,
+ storage_config: Dict[str, Any],
+ batch_size: int = 100,
+ prefetch_count: int = 100,
+ wait_min: float = 60,
+ wait_per_batch: float = 10,
+ ) -> None:
+ """Setup the client object, passing in the URL we will use to connect to
+ RabbitMQ, and the connection information for the local storage object used
+ for read-only operations.
+
+ :param str url: The URL for connecting to RabbitMQ
+ :param dict storage_config: Configuration parameters for the underlying
+ ``ProvenanceStorage`` object expected by
+ ``swh.provenance.get_provenance_storage``
+ :param int batch_size: Max amount of elements per package (after range
+ splitting) for writing operations
+ :param int prefetch_count: Prefetch value for the RabbitMQ connection when
+ receiving ack packages
+ :param float wait_min: Min waiting time for response on a writing operation, in
+ seconds
+ :param float wait_per_batch: Waiting time for response per batch of items on a
+ writing operation, in seconds
+
+ """
+ super().__init__()
+
+ self._connection = None
+ self._callback_queue: Optional[str] = None
+ self._channel = None
+ self._closing = False
+ self._consumer_tag = None
+ self._consuming = False
+ self._correlation_id = str(uuid.uuid4())
+ self._prefetch_count = prefetch_count
+
+ self._batch_size = batch_size
+ self._response_queue: queue.Queue = queue.Queue()
+ self._storage = get_provenance_storage(**storage_config)
+ self._url = url
+
+ self._wait_min = wait_min
+ self._wait_per_batch = wait_per_batch
+
+ def __enter__(self) -> ProvenanceStorageInterface:
+ self.open()
+ assert isinstance(self, ProvenanceStorageInterface)
+ return self
+
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> None:
+ self.close()
+
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "open"})
+ def open(self) -> None:
+ self.start()
+ while self._callback_queue is None:
+ time.sleep(0.1)
+ self._storage.open()
+
+ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"})
+ def close(self) -> None:
+ assert self._connection is not None
+ self._connection.ioloop.add_callback_threadsafe(self.request_termination)
+ self.join()
+ self._storage.close()
+
+ def request_termination(self, reason: str = "Normal shutdown") -> None:
+ assert self._connection is not None
+
+ def termination_callback():
+ raise TerminateSignal(reason)
+
+ self._connection.ioloop.add_callback_threadsafe(termination_callback)
+
+ def connect(self) -> pika.SelectConnection:
+ LOGGER.info("Connecting to %s", self._url)
+ return pika.SelectConnection(
+ parameters=pika.URLParameters(self._url),
+ on_open_callback=self.on_connection_open,
+ on_open_error_callback=self.on_connection_open_error,
+ on_close_callback=self.on_connection_closed,
+ )
+
+ def close_connection(self) -> None:
+ assert self._connection is not None
+ self._consuming = False
+ if self._connection.is_closing or self._connection.is_closed:
+ LOGGER.info("Connection is closing or already closed")
+ else:
+ LOGGER.info("Closing connection")
+ self._connection.close()
+
+ def on_connection_open(self, _unused_connection: pika.SelectConnection) -> None:
+ LOGGER.info("Connection opened")
+ self.open_channel()
+
+ def on_connection_open_error(
+ self, _unused_connection: pika.SelectConnection, err: Exception
+ ) -> None:
+ LOGGER.error("Connection open failed, reopening in 5 seconds: %s", err)
+ assert self._connection is not None
+ self._connection.ioloop.call_later(5, self._connection.ioloop.stop)
+
+ def on_connection_closed(self, _unused_connection: pika.SelectConnection, reason):
+ assert self._connection is not None
+ self._channel = None
+ if self._closing:
+ self._connection.ioloop.stop()
+ else:
+ LOGGER.warning("Connection closed, reopening in 5 seconds: %s", reason)
+ self._connection.ioloop.call_later(5, self._connection.ioloop.stop)
+
+ def open_channel(self) -> None:
+ LOGGER.info("Creating a new channel")
+ assert self._connection is not None
+ self._connection.channel(on_open_callback=self.on_channel_open)
+
+ def on_channel_open(self, channel: pika.channel.Channel) -> None:
+ LOGGER.info("Channel opened")
+ self._channel = channel
+ LOGGER.info("Adding channel close callback")
+ assert self._channel is not None
+ self._channel.add_on_close_callback(callback=self.on_channel_closed)
+ self.setup_queue()
+
+ def on_channel_closed(
+ self, channel: pika.channel.Channel, reason: Exception
+ ) -> None:
+ LOGGER.warning("Channel %i was closed: %s", channel, reason)
+ self.close_connection()
+
+ def setup_queue(self) -> None:
+ LOGGER.info("Declaring callback queue")
+ assert self._channel is not None
+ self._channel.queue_declare(
+ queue="", exclusive=True, callback=self.on_queue_declare_ok
+ )
+
+ def on_queue_declare_ok(self, frame: pika.frame.Method) -> None:
+ LOGGER.info("Binding queue to default exchanger")
+ assert self._channel is not None
+ self._callback_queue = frame.method.queue
+ self._channel.basic_qos(
+ prefetch_count=self._prefetch_count, callback=self.on_basic_qos_ok
+ )
+
+ def on_basic_qos_ok(self, _unused_frame: pika.frame.Method) -> None:
+ LOGGER.info("QOS set to: %d", self._prefetch_count)
+ self.start_consuming()
+
+ def start_consuming(self) -> None:
+ LOGGER.info("Issuing consumer related RPC commands")
+ LOGGER.info("Adding consumer cancellation callback")
+ assert self._channel is not None
+ self._channel.add_on_cancel_callback(callback=self.on_consumer_cancelled)
+ assert self._callback_queue is not None
+ self._consumer_tag = self._channel.basic_consume(
+ queue=self._callback_queue, on_message_callback=self.on_response
+ )
+ self._consuming = True
+
+ def on_consumer_cancelled(self, method_frame: pika.frame.Method) -> None:
+ LOGGER.info("Consumer was cancelled remotely, shutting down: %r", method_frame)
+ if self._channel:
+ self._channel.close()
+
+ def on_response(
+ self,
+ channel: pika.channel.Channel,
+ deliver: pika.spec.Basic.Deliver,
+ properties: pika.spec.BasicProperties,
+ body: bytes,
+ ) -> None:
+ LOGGER.info(
+ "Received message # %s from %s: %s",
+ deliver.delivery_tag,
+ properties.app_id,
+ body,
+ )
+ self._response_queue.put(
+ (
+ properties.correlation_id,
+ decode_data(body, extra_decoders=self.extra_type_decoders),
+ )
+ )
+ LOGGER.info("Acknowledging message %s", deliver.delivery_tag)
+ channel.basic_ack(delivery_tag=deliver.delivery_tag)
+
+ def stop_consuming(self) -> None:
+ if self._channel:
+ LOGGER.info("Sending a Basic.Cancel RPC command to RabbitMQ")
+ self._channel.basic_cancel(self._consumer_tag, self.on_cancel_ok)
+
+ def on_cancel_ok(self, _unused_frame: pika.frame.Method) -> None:
+ self._consuming = False
+ LOGGER.info(
+ "RabbitMQ acknowledged the cancellation of the consumer: %s",
+ self._consumer_tag,
+ )
+ LOGGER.info("Closing the channel")
+ assert self._channel is not None
+ self._channel.close()
+
+ def run(self) -> None:
+ while not self._closing:
+ try:
+ self._connection = self.connect()
+ assert self._connection is not None
+ self._connection.ioloop.start()
+ except KeyboardInterrupt:
+ LOGGER.info("Connection closed by keyboard interruption, reopening")
+ if self._connection is not None:
+ self._connection.ioloop.stop()
+ except TerminateSignal as ex:
+ LOGGER.info("Termination requested: %s", ex)
+ self.stop()
+ if self._connection is not None and not self._connection.is_closed:
+ # Finish closing
+ self._connection.ioloop.start()
+ except BaseException as ex:
+ LOGGER.warning("Unexpected exception, terminating: %s", ex)
+ self.stop()
+ if self._connection is not None and not self._connection.is_closed:
+ # Finish closing
+ self._connection.ioloop.start()
+ LOGGER.info("Stopped")
+
+ def stop(self) -> None:
+ assert self._connection is not None
+ if not self._closing:
+ self._closing = True
+ LOGGER.info("Stopping")
+ if self._consuming:
+ self.stop_consuming()
+ self._connection.ioloop.start()
+ else:
+ self._connection.ioloop.stop()
+ LOGGER.info("Stopped")
+
+ @staticmethod
+ def request(
+ channel: pika.channel.Channel,
+ reply_to: str,
+ exchange: str,
+ routing_key: str,
+ correlation_id: str,
+ **kwargs,
+ ) -> None:
+ channel.basic_publish(
+ exchange=exchange,
+ routing_key=routing_key,
+ properties=pika.BasicProperties(
+ content_type="application/msgpack",
+ correlation_id=correlation_id,
+ reply_to=reply_to,
+ ),
+ body=encode_data(
+ kwargs,
+ extra_encoders=ProvenanceStorageRabbitMQClient.extra_type_encoders,
+ ),
+ )
+
+ def wait_for_acks(self, acks_expected: int) -> bool:
+ acks_received = 0
+ timeout = max(
+ (acks_expected / self._batch_size) * self._wait_per_batch,
+ self._wait_min,
+ )
+ while acks_received < acks_expected:
+ try:
+ acks_received += self.wait_for_response(timeout=timeout)
+ except ResponseTimeout:
+ LOGGER.warning(
+ "Timed out waiting for acks, %s received, %s expected",
+ acks_received,
+ acks_expected,
+ )
+ return False
+ return acks_received == acks_expected
+
+ def wait_for_response(self, timeout: float = 60.0) -> Any:
+ while True:
+ try:
+ correlation_id, response = self._response_queue.get(timeout=timeout)
+ if correlation_id == self._correlation_id:
+ return response
+ except queue.Empty:
+ raise ResponseTimeout
diff --git a/swh/provenance/api/server.py b/swh/provenance/api/server.py
--- a/swh/provenance/api/server.py
+++ b/swh/provenance/api/server.py
@@ -3,10 +3,660 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from collections import Counter
+from datetime import datetime
+from enum import Enum
+import functools
+import logging
+import multiprocessing
import os
-from typing import Any, Dict, Optional
+import queue
+import threading
+from typing import Any, Callable
+from typing import Counter as TCounter
+from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple, Union, cast
+
+import pika
+import pika.channel
+import pika.connection
+import pika.exceptions
+from pika.exchange_type import ExchangeType
+import pika.frame
+import pika.spec
from swh.core import config
+from swh.core.api.serializers import encode_data_client as encode_data
+from swh.core.api.serializers import msgpack_loads as decode_data
+from swh.model.hashutil import hash_to_hex
+from swh.model.model import Sha1Git
+
+from .. import get_provenance_storage
+from ..interface import EntityType, RelationData, RelationType, RevisionData
+from ..util import path_id
+from .serializers import DECODERS, ENCODERS
+
+LOG_FORMAT = (
+ "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) "
+ "-35s %(lineno) -5d: %(message)s"
+)
+LOGGER = logging.getLogger(__name__)
+
+TERMINATE = object()
+
+
+class ServerCommand(Enum):
+ TERMINATE = "terminate"
+ CONSUMING = "consuming"
+
+
+class TerminateSignal(BaseException):
+ pass
+
+
+def resolve_dates(
+ dates: Iterable[Union[Tuple[Sha1Git, Optional[datetime]], Tuple[Sha1Git]]]
+) -> Dict[Sha1Git, Optional[datetime]]:
+ result: Dict[Sha1Git, Optional[datetime]] = {}
+ for row in dates:
+ sha1 = row[0]
+ date = (
+ cast(Tuple[Sha1Git, Optional[datetime]], row)[1] if len(row) > 1 else None
+ )
+ known = result.setdefault(sha1, None)
+ if date is not None and (known is None or date < known):
+ result[sha1] = date
+ return result
+
+
+def resolve_revision(
+ data: Iterable[Union[Tuple[Sha1Git, RevisionData], Tuple[Sha1Git]]]
+) -> Dict[Sha1Git, RevisionData]:
+ result: Dict[Sha1Git, RevisionData] = {}
+ for row in data:
+ sha1 = row[0]
+ rev = (
+ cast(Tuple[Sha1Git, RevisionData], row)[1]
+ if len(row) > 1
+ else RevisionData(date=None, origin=None)
+ )
+ known = result.setdefault(sha1, RevisionData(date=None, origin=None))
+ value = known
+ if rev.date is not None and (known.date is None or rev.date < known.date):
+ value = RevisionData(date=rev.date, origin=value.origin)
+ if rev.origin is not None:
+ value = RevisionData(date=value.date, origin=rev.origin)
+ if value != known:
+ result[sha1] = value
+ return result
+
+
+def resolve_relation(
+ data: Iterable[Tuple[Sha1Git, Sha1Git, bytes]]
+) -> Dict[Sha1Git, Set[RelationData]]:
+ result: Dict[Sha1Git, Set[RelationData]] = {}
+ for src, dst, path in data:
+ result.setdefault(src, set()).add(RelationData(dst=dst, path=path))
+ return result
+
+
+class ProvenanceStorageRabbitMQWorker(multiprocessing.Process):
+ EXCHANGE_TYPE = ExchangeType.direct
+ extra_type_decoders = DECODERS
+ extra_type_encoders = ENCODERS
+
+ def __init__(
+ self,
+ url: str,
+ exchange: str,
+ range: int,
+ storage_config: Dict[str, Any],
+ batch_size: int = 100,
+ prefetch_count: int = 100,
+ ) -> None:
+ """Setup the worker object, passing in the URL we will use to connect to
+ RabbitMQ, the exchange to use, the range id on which to operate, and the
+ connection information for the underlying local storage object.
+
+ :param str url: The URL for connecting to RabbitMQ
+        :param str exchange: The name of the RabbitMQ exchange to use
+        :param int range: The ID range to operate on
+ :param dict storage_config: Configuration parameters for the underlying
+ ``ProvenanceStorage`` object expected by
+ ``swh.provenance.get_provenance_storage``
+        :param int batch_size: Max number of elements per call to the underlying storage
+        :param int prefetch_count: Prefetch value for the RabbitMQ connection when
+            receiving messages
+
+ """
+ super().__init__(name=f"{exchange}_{range:x}")
+
+ self._connection = None
+ self._channel = None
+ self._closing = False
+ self._consumer_tag: Dict[str, str] = {}
+ self._consuming: Dict[str, bool] = {}
+ self._prefetch_count = prefetch_count
+
+ self._url = url
+ self._exchange = exchange
+ self._binding_keys = list(
+ ProvenanceStorageRabbitMQServer.get_binding_keys(self._exchange, range)
+ )
+ self._queues: Dict[str, str] = {}
+ self._storage_config = storage_config
+ self._batch_size = batch_size
+
+ self.command: multiprocessing.Queue = multiprocessing.Queue()
+ self.signal: multiprocessing.Queue = multiprocessing.Queue()
+
+ def connect(self) -> pika.SelectConnection:
+ LOGGER.info("Connecting to %s", self._url)
+ return pika.SelectConnection(
+ parameters=pika.URLParameters(self._url),
+ on_open_callback=self.on_connection_open,
+ on_open_error_callback=self.on_connection_open_error,
+ on_close_callback=self.on_connection_closed,
+ )
+
+ def close_connection(self) -> None:
+ assert self._connection is not None
+ self._consuming = {binding_key: False for binding_key in self._binding_keys}
+ if self._connection.is_closing or self._connection.is_closed:
+ LOGGER.info("Connection is closing or already closed")
+ else:
+ LOGGER.info("Closing connection")
+ self._connection.close()
+
+ def on_connection_open(self, _unused_connection: pika.SelectConnection) -> None:
+ LOGGER.info("Connection opened")
+ self.open_channel()
+
+ def on_connection_open_error(
+ self, _unused_connection: pika.SelectConnection, err: Exception
+ ) -> None:
+ LOGGER.error("Connection open failed, reopening in 5 seconds: %s", err)
+ assert self._connection is not None
+ self._connection.ioloop.call_later(5, self._connection.ioloop.stop)
+
+ def on_connection_closed(self, _unused_connection: pika.SelectConnection, reason):
+ assert self._connection is not None
+ self._channel = None
+ if self._closing:
+ self._connection.ioloop.stop()
+ else:
+ LOGGER.warning("Connection closed, reopening in 5 seconds: %s", reason)
+ self._connection.ioloop.call_later(5, self._connection.ioloop.stop)
+
+ def open_channel(self) -> None:
+ LOGGER.info("Creating a new channel")
+ assert self._connection is not None
+ self._connection.channel(on_open_callback=self.on_channel_open)
+
+ def on_channel_open(self, channel: pika.channel.Channel) -> None:
+ LOGGER.info("Channel opened")
+ self._channel = channel
+ LOGGER.info("Adding channel close callback")
+ assert self._channel is not None
+ self._channel.add_on_close_callback(callback=self.on_channel_closed)
+ self.setup_exchange()
+
+ def on_channel_closed(
+ self, channel: pika.channel.Channel, reason: Exception
+ ) -> None:
+ LOGGER.warning("Channel %i was closed: %s", channel, reason)
+ self.close_connection()
+
+ def setup_exchange(self) -> None:
+ LOGGER.info("Declaring exchange %s", self._exchange)
+ assert self._channel is not None
+ self._channel.exchange_declare(
+ exchange=self._exchange,
+ exchange_type=self.EXCHANGE_TYPE,
+ callback=self.on_exchange_declare_ok,
+ )
+
+ def on_exchange_declare_ok(self, _unused_frame: pika.frame.Method) -> None:
+ LOGGER.info("Exchange declared: %s", self._exchange)
+ self.setup_queues()
+
+ def setup_queues(self) -> None:
+ for binding_key in self._binding_keys:
+ LOGGER.info("Declaring queue %s", binding_key)
+ assert self._channel is not None
+ callback = functools.partial(
+ self.on_queue_declare_ok,
+ binding_key=binding_key,
+ )
+ self._channel.queue_declare(queue=binding_key, callback=callback)
+
+ def on_queue_declare_ok(self, frame: pika.frame.Method, binding_key: str) -> None:
+ LOGGER.info(
+ "Binding queue %s to exchange %s with routing key %s",
+ frame.method.queue,
+ self._exchange,
+ binding_key,
+ )
+ assert self._channel is not None
+ callback = functools.partial(self.on_bind_ok, queue_name=frame.method.queue)
+ self._queues[binding_key] = frame.method.queue
+ self._channel.queue_bind(
+ queue=frame.method.queue,
+ exchange=self._exchange,
+ routing_key=binding_key,
+ callback=callback,
+ )
+
+ def on_bind_ok(self, _unused_frame: pika.frame.Method, queue_name: str) -> None:
+ LOGGER.info("Queue bound: %s", queue_name)
+ assert self._channel is not None
+ self._channel.basic_qos(
+ prefetch_count=self._prefetch_count, callback=self.on_basic_qos_ok
+ )
+
+ def on_basic_qos_ok(self, _unused_frame: pika.frame.Method) -> None:
+ LOGGER.info("QOS set to: %d", self._prefetch_count)
+ self.start_consuming()
+
+ def start_consuming(self) -> None:
+ LOGGER.info("Issuing consumer related RPC commands")
+ LOGGER.info("Adding consumer cancellation callback")
+ assert self._channel is not None
+ self._channel.add_on_cancel_callback(callback=self.on_consumer_cancelled)
+ for binding_key in self._binding_keys:
+ self._consumer_tag[binding_key] = self._channel.basic_consume(
+ queue=self._queues[binding_key], on_message_callback=self.on_request
+ )
+ self._consuming[binding_key] = True
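+        # Notify the parent server process that this worker is up and running
+        # (see ``ProvenanceStorageRabbitMQServer.start``).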
+ self.signal.put(ServerCommand.CONSUMING)
+
+ def on_consumer_cancelled(self, method_frame: pika.frame.Method) -> None:
+ LOGGER.info("Consumer was cancelled remotely, shutting down: %r", method_frame)
+ if self._channel:
+ self._channel.close()
+
+ def on_request(
+ self,
+ channel: pika.channel.Channel,
+ deliver: pika.spec.Basic.Deliver,
+ properties: pika.spec.BasicProperties,
+ body: bytes,
+ ) -> None:
+ LOGGER.info(
+ "Received message # %s from %s: %s",
+ deliver.delivery_tag,
+ properties.app_id,
+ body,
+ )
+        # XXX: for some reason the decoding step below returns lists instead
+        # of tuples (the client sends tuples)
+ batch = decode_data(data=body, extra_decoders=self.extra_type_decoders)["data"]
+ for item in batch:
+ self._request_queues[deliver.routing_key].put(
+ (tuple(item), (properties.correlation_id, properties.reply_to))
+ )
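+        # Messages are acknowledged as soon as their items are handed over to
+        # the per-queue request threads; the actual write to storage happens
+        # later, in ``run_request_thread``.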
+ LOGGER.info("Acknowledging message %s", deliver.delivery_tag)
+ channel.basic_ack(delivery_tag=deliver.delivery_tag)
+
+ def stop_consuming(self) -> None:
+ if self._channel:
+ LOGGER.info("Sending a Basic.Cancel RPC command to RabbitMQ")
+ for binding_key in self._binding_keys:
+ callback = functools.partial(self.on_cancel_ok, binding_key=binding_key)
+ self._channel.basic_cancel(
+ self._consumer_tag[binding_key], callback=callback
+ )
+
+ def on_cancel_ok(self, _unused_frame: pika.frame.Method, binding_key: str) -> None:
+ self._consuming[binding_key] = False
+        LOGGER.info(
+            "RabbitMQ acknowledged the cancellation of the consumer for %s",
+            binding_key,
+        )
+ LOGGER.info("Closing the channel")
+ assert self._channel is not None
+ self._channel.close()
+
+ def run(self) -> None:
+ self._command_thread = threading.Thread(target=self.run_command_thread)
+ self._command_thread.start()
+
+ self._request_queues: Dict[str, queue.Queue] = {}
+ self._request_threads: Dict[str, threading.Thread] = {}
+ for binding_key in self._binding_keys:
+ meth_name, relation = ProvenanceStorageRabbitMQServer.get_meth_name(
+ binding_key
+ )
+ self._request_queues[binding_key] = queue.Queue()
+ self._request_threads[binding_key] = threading.Thread(
+ target=self.run_request_thread,
+ args=(binding_key, meth_name, relation),
+ )
+ self._request_threads[binding_key].start()
+
+ while not self._closing:
+ try:
+ self._connection = self.connect()
+ assert self._connection is not None
+ self._connection.ioloop.start()
+ except KeyboardInterrupt:
+ LOGGER.info("Connection closed by keyboard interruption, reopening")
+ if self._connection is not None:
+ self._connection.ioloop.stop()
+ except TerminateSignal as ex:
+ LOGGER.info("Termination requested: %s", ex)
+ self.stop()
+ if self._connection is not None and not self._connection.is_closed:
+ # Finish closing
+ self._connection.ioloop.start()
+ except BaseException as ex:
+ LOGGER.warning("Unexpected exception, terminating: %s", ex)
+ self.stop()
+ if self._connection is not None and not self._connection.is_closed:
+ # Finish closing
+ self._connection.ioloop.start()
+
+ for binding_key in self._binding_keys:
+ self._request_queues[binding_key].put(TERMINATE)
+ for binding_key in self._binding_keys:
+ self._request_threads[binding_key].join()
+ self._command_thread.join()
+ LOGGER.info("Stopped")
+
+ def run_command_thread(self) -> None:
+ while True:
+ try:
+ command = self.command.get()
+ if command == ServerCommand.TERMINATE:
+ self.request_termination()
+ break
+ except queue.Empty:
+ pass
+ except BaseException as ex:
+ self.request_termination(str(ex))
+ break
+
+ def request_termination(self, reason: str = "Normal shutdown") -> None:
+ assert self._connection is not None
+
+ def termination_callback():
+ raise TerminateSignal(reason)
+
+ self._connection.ioloop.add_callback_threadsafe(termination_callback)
+
+ def run_request_thread(
+ self, binding_key: str, meth_name: str, relation: Optional[RelationType]
+ ) -> None:
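+        # Per-queue worker thread: drain up to ``batch_size`` items from the
+        # request queue (waiting at most 0.1 seconds for each), merge
+        # conflicting entries, issue a single call to the underlying storage,
+        # and schedule one response per requesting client on the ioloop, with
+        # the number of items processed on that client's behalf.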
+ with get_provenance_storage(**self._storage_config) as storage:
+ request_queue = self._request_queues[binding_key]
+ merge_items = ProvenanceStorageRabbitMQWorker.get_conflicts_func(meth_name)
+ while True:
+ terminate = False
+ elements = []
+ while True:
+ try:
+ # TODO: consider reducing this timeout or removing it
+ elem = request_queue.get(timeout=0.1)
+ if elem is TERMINATE:
+ terminate = True
+ break
+ elements.append(elem)
+ except queue.Empty:
+ break
+
+ if len(elements) >= self._batch_size:
+ break
+
+ if terminate:
+ break
+
+ if not elements:
+ continue
+
+ try:
+ items, props = zip(*elements)
+ acks_count: TCounter[Tuple[str, str]] = Counter(props)
+ data = merge_items(items)
+
+ args = (relation, data) if relation is not None else (data,)
+ if getattr(storage, meth_name)(*args):
+ for (correlation_id, reply_to), count in acks_count.items():
+ # FIXME: this is running in a different thread! Hence, if
+ # self._connection drops, there is no guarantee that the
+ # response can be sent for the current elements. This
+ # situation should be handled properly.
+ assert self._connection is not None
+ self._connection.ioloop.add_callback_threadsafe(
+ functools.partial(
+ ProvenanceStorageRabbitMQWorker.respond,
+ channel=self._channel,
+ correlation_id=correlation_id,
+ reply_to=reply_to,
+ response=count,
+ )
+ )
+ else:
+ LOGGER.warning(
+ "Unable to process elements for queue %s", binding_key
+ )
+ for elem in elements:
+ request_queue.put(elem)
+ except BaseException as ex:
+ self.request_termination(str(ex))
+ break
+
+ def stop(self) -> None:
+ assert self._connection is not None
+ if not self._closing:
+ self._closing = True
+ LOGGER.info("Stopping")
+            if any(self._consuming.values()):
+ self.stop_consuming()
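+                # Restart the ioloop so the pending Basic.Cancel RPCs can
+                # complete; it stops once the connection teardown finishes.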
+ self._connection.ioloop.start()
+ else:
+ self._connection.ioloop.stop()
+ LOGGER.info("Stopped")
+
+ @staticmethod
+ def get_conflicts_func(meth_name: str) -> Callable[[Iterable[Any]], Any]:
+ if meth_name in ["content_add", "directory_add"]:
+ return resolve_dates
+ elif meth_name == "location_add":
+ return lambda data: set(data) # just remove duplicates
+ elif meth_name == "origin_add":
+ return lambda data: dict(data) # last processed value is good enough
+ elif meth_name == "revision_add":
+ return resolve_revision
+ elif meth_name == "relation_add":
+ return resolve_relation
+ else:
+ LOGGER.warning(
+ "Unexpected conflict resolution function request for method %s",
+ meth_name,
+ )
+ return lambda x: x
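+
+    # For instance (hypothetical item shapes): the ``origin_add`` resolver
+    # keeps the last value seen for each id, while ``location_add`` merely
+    # deduplicates:
+    #
+    #   get_conflicts_func("origin_add")([(b"id", "url1"), (b"id", "url2")])
+    #   # -> {b"id": "url2"}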
+
+ @staticmethod
+ def respond(
+ channel: pika.channel.Channel,
+ correlation_id: str,
+ reply_to: str,
+ response: Any,
+ ):
+ channel.basic_publish(
+ exchange="",
+ routing_key=reply_to,
+ properties=pika.BasicProperties(
+ content_type="application/msgpack",
+ correlation_id=correlation_id,
+ ),
+ body=encode_data(
+ response,
+ extra_encoders=ProvenanceStorageRabbitMQServer.extra_type_encoders,
+ ),
+ )
+
+
+class ProvenanceStorageRabbitMQServer:
+ extra_type_decoders = DECODERS
+ extra_type_encoders = ENCODERS
+
+ queue_count = 16
+
+ def __init__(
+ self,
+ url: str,
+ storage_config: Dict[str, Any],
+ batch_size: int = 100,
+ prefetch_count: int = 100,
+ ) -> None:
+ """Setup the server object, passing in the URL we will use to connect to
+ RabbitMQ, and the connection information for the underlying local storage
+ object.
+
+ :param str url: The URL for connecting to RabbitMQ
+ :param dict storage_config: Configuration parameters for the underlying
+ ``ProvenanceStorage`` object expected by
+ ``swh.provenance.get_provenance_storage``
+        :param int batch_size: Maximum number of elements per call to the
+          underlying storage
+        :param int prefetch_count: Prefetch value for the RabbitMQ connection when
+          receiving messages
+
+ """
+ self._workers: List[ProvenanceStorageRabbitMQWorker] = []
+ for exchange in ProvenanceStorageRabbitMQServer.get_exchanges():
+ for range in ProvenanceStorageRabbitMQServer.get_ranges(exchange):
+ worker = ProvenanceStorageRabbitMQWorker(
+ url=url,
+ exchange=exchange,
+ range=range,
+ storage_config=storage_config,
+ batch_size=batch_size,
+ prefetch_count=prefetch_count,
+ )
+ self._workers.append(worker)
+ self._running = False
+
+ def start(self) -> None:
+ if not self._running:
+ self._running = True
+ for worker in self._workers:
+ worker.start()
+ for worker in self._workers:
+ try:
+ signal = worker.signal.get(timeout=60)
+ assert signal == ServerCommand.CONSUMING
+ except queue.Empty:
+ LOGGER.error(
+ "Could not initialize worker %s. Leaving...", worker.name
+ )
+ self.stop()
+ return
+ LOGGER.info("Start serving")
+
+ def stop(self) -> None:
+ if self._running:
+ for worker in self._workers:
+ worker.command.put(ServerCommand.TERMINATE)
+ for worker in self._workers:
+ worker.join()
+ LOGGER.info("Stop serving")
+ self._running = False
+
+ @staticmethod
+ def get_binding_keys(exchange: str, range: int) -> Generator[str, None, None]:
+ for meth_name, relation in ProvenanceStorageRabbitMQServer.get_meth_names(
+ exchange
+ ):
+ if relation is None:
+ assert (
+ meth_name != "relation_add"
+ ), "'relation_add' requires 'relation' to be provided"
+ yield f"{meth_name}.unknown.{range:x}".lower()
+ else:
+ assert (
+ meth_name == "relation_add"
+ ), f"'{meth_name}' requires 'relation' to be None"
+ yield f"{meth_name}.{relation.value}.{range:x}".lower()
+
+ @staticmethod
+ def get_exchange(meth_name: str, relation: Optional[RelationType] = None) -> str:
+ if meth_name == "relation_add":
+ assert (
+ relation is not None
+ ), "'relation_add' requires 'relation' to be provided"
+ split = relation.value
+ else:
+ assert relation is None, f"'{meth_name}' requires 'relation' to be None"
+ split = meth_name
+ exchange, *_ = split.split("_")
+ return exchange
+
+ @staticmethod
+ def get_exchanges() -> Generator[str, None, None]:
+ yield from [entity.value for entity in EntityType] + ["location"]
+
+ @staticmethod
+ def get_meth_name(
+ binding_key: str,
+ ) -> Tuple[str, Optional[RelationType]]:
+ meth_name, relation, *_ = binding_key.split(".")
+ return meth_name, (RelationType(relation) if relation != "unknown" else None)
+
+ @staticmethod
+ def get_meth_names(
+ exchange: str,
+ ) -> Generator[Tuple[str, Optional[RelationType]], None, None]:
+ if exchange == EntityType.CONTENT.value:
+ yield from [
+ ("content_add", None),
+ ("relation_add", RelationType.CNT_EARLY_IN_REV),
+ ("relation_add", RelationType.CNT_IN_DIR),
+ ]
+ elif exchange == EntityType.DIRECTORY.value:
+ yield from [
+ ("directory_add", None),
+ ("relation_add", RelationType.DIR_IN_REV),
+ ]
+ elif exchange == EntityType.ORIGIN.value:
+ yield from [("origin_add", None)]
+ elif exchange == EntityType.REVISION.value:
+ yield from [
+ ("revision_add", None),
+ ("relation_add", RelationType.REV_BEFORE_REV),
+ ("relation_add", RelationType.REV_IN_ORG),
+ ]
+ elif exchange == "location":
+ yield "location_add", None
+
+ @staticmethod
+ def get_ranges(unused_exchange: str) -> Generator[int, None, None]:
+        # XXX: we might want a different number of ranges per exchange
+ yield from range(ProvenanceStorageRabbitMQServer.queue_count)
+
+ @staticmethod
+ def get_routing_key(
+ item: bytes, meth_name: str, relation: Optional[RelationType] = None
+ ) -> str:
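+        # Routing is determined by the first hex digit of the item's hash
+        # (the SHA1 of the path for ``location_add`` items) modulo
+        # ``queue_count``, so writes for the same ID range always reach the
+        # same worker. A minimal sketch (hypothetical 20-byte id):
+        #
+        #   get_routing_key(bytes.fromhex("3a" + "00" * 19), "content_add")
+        #   # -> "content_add.unknown.3"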
+ hashid = (
+ path_id(item).hex()
+ if meth_name.startswith("location")
+ else hash_to_hex(item)
+ )
+ idx = int(hashid[0], 16) % ProvenanceStorageRabbitMQServer.queue_count
+ if relation is None:
+ assert (
+ meth_name != "relation_add"
+ ), "'relation_add' requires 'relation' to be provided"
+ return f"{meth_name}.unknown.{idx:x}".lower()
+ else:
+ assert (
+ meth_name == "relation_add"
+ ), f"'{meth_name}' requires 'relation' to be None"
+ return f"{meth_name}.{relation.value}.{idx:x}".lower()
+
+ @staticmethod
+ def is_write_method(meth_name: str) -> bool:
+ return "_add" in meth_name
@@ -39,9 +689,13 @@ def load_and_check_config(
if pcfg is None:
raise KeyError("Missing 'provenance' configuration")
- scfg: Optional[Dict[str, Any]] = pcfg.get("storage")
+ rcfg: Optional[Dict[str, Any]] = pcfg.get("rabbitmq")
+ if rcfg is None:
+ raise KeyError("Missing 'provenance.rabbitmq' configuration")
+
+ scfg: Optional[Dict[str, Any]] = rcfg.get("storage_config")
if scfg is None:
- raise KeyError("Missing 'provenance.storage' configuration")
+ raise KeyError("Missing 'provenance.rabbitmq.storage_config' configuration")
if type == "local":
cls = scfg.get("cls")
@@ -56,3 +710,9 @@
raise KeyError("Invalid configuration; missing 'db' config entry")
return cfg
+
+
+def make_server_from_configfile() -> ProvenanceStorageRabbitMQServer:
+ config_path = os.environ.get("SWH_CONFIG_FILENAME")
+ server_cfg = load_and_check_config(config_path)
+ return ProvenanceStorageRabbitMQServer(**server_cfg["provenance"]["rabbitmq"])
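+
+# The configuration file pointed to by SWH_CONFIG_FILENAME is expected to look
+# roughly like this (placeholder values; keys as checked by
+# ``load_and_check_config`` above):
+#
+#   provenance:
+#     rabbitmq:
+#       url: amqp://localhost:5672/%2f
+#       batch_size: 100
+#       prefetch_count: 100
+#       storage_config:
+#         cls: postgresql
+#         db:
+#           host: localhost
+#           user: postgres
+#           password: postgres
+#           dbname: provenance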
diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -35,25 +35,40 @@
# Direct access Archive object
"cls": "direct",
"db": {
- "host": "db.internal.softwareheritage.org",
+ "host": "belvedere.internal.softwareheritage.org",
+ "port": 5432,
"dbname": "softwareheritage",
"user": "guest",
},
},
"storage": {
# Local PostgreSQL Storage
- "cls": "postgresql",
- "db": {
- "host": "localhost",
- "user": "postgres",
- "password": "postgres",
- "dbname": "provenance",
- },
+ # "cls": "postgresql",
+ # "db": {
+ # "host": "localhost",
+ # "user": "postgres",
+ # "password": "postgres",
+ # "dbname": "provenance",
+ # },
# Local MongoDB Storage
# "cls": "mongodb",
# "db": {
# "dbname": "provenance",
# },
+ # Remote RabbitMQ/PostgreSQL Storage
+ "cls": "rabbitmq",
+ "url": "amqp://localhost:5672/%2f",
+ "storage_config": {
+ "cls": "postgresql",
+ "db": {
+ "host": "localhost",
+ "user": "postgres",
+ "password": "postgres",
+ "dbname": "provenance",
+ },
+ },
+ "batch_size": 100,
+ "prefetch_count": 100,
},
}
}
diff --git a/swh/provenance/util.py b/swh/provenance/util.py
--- a/swh/provenance/util.py
+++ b/swh/provenance/util.py
@@ -3,8 +3,13 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import hashlib
import os
+def path_id(path: bytes) -> bytes:
+ return hashlib.sha1(path).digest()
+
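+# For example, ``path_id(b"foo/bar")`` returns the 20-byte SHA1 digest of the
+# path; the RabbitMQ API uses it to compute routing keys for ``location_add``
+# items (see ``get_routing_key`` in the server module).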
+
def path_normalize(path: bytes) -> bytes:
return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -33,3 +33,38 @@
mypy
commands =
mypy swh
+
+# build documentation outside swh-environment using the current
+# git HEAD of swh-docs, is executed on CI for each diff to prevent
+# breaking doc build
+[testenv:sphinx]
+whitelist_externals = make
+usedevelop = true
+extras =
+ testing
+deps =
+ # fetch and install swh-docs in develop mode
+ -e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs
+setenv =
+ SWH_PACKAGE_DOC_TOX_BUILD = 1
+ # turn warnings into errors
+ SPHINXOPTS = -W
+commands =
+ make -I ../.tox/sphinx/src/swh-docs/swh/ -C docs
+
+# build documentation only inside swh-environment using local state
+# of swh-docs package
+[testenv:sphinx-dev]
+whitelist_externals = make
+usedevelop = true
+extras =
+ testing
+deps =
+ # install swh-docs in develop mode
+ -e ../swh-docs
+setenv =
+ SWH_PACKAGE_DOC_TOX_BUILD = 1
+ # turn warnings into errors
+ SPHINXOPTS = -W
+commands =
+ make -I ../.tox/sphinx-dev/src/swh-docs/swh/ -C docs