diff --git a/docs/storage/remote.rst b/docs/storage/remote.rst new file mode 100644 --- /dev/null +++ b/docs/storage/remote.rst @@ -0,0 +1,720 @@ +ProvenanceStorageRabbitMQClient +=============================== + +The remote storage client connects to a remote server using a RabbitMQ +broker, and delegates all writing operations to it. However, reading +operations are still done using a local storage object (ie. +``ProvenanceStorageMongoDb`` or ``ProvenanceStoragePostgreSql``). This +is done to speed up the process by avoiding the unnecessary RabbitMQ +overhead for operations that are conflict-free. + + :warning: **No check is done to guarantee that the local storage used + by the client is the same as the one the server uses on its side, but + it is assumed to be the case.** + +When a writing operation is invoked, the client splits the set of +elements to be written by ID range, and send a series of packages to the +server (via the RabbitMQ broker). Each package consists of elements +belonging to the same ID range but, to avoid sending huge packages, +there might be more than one package per range. After this, the client +blocks waiting for the server to acknowledge the writing of all +requested elements. + +To initialize a client object it is required to provide two mandatory +parameters: + +- ``url``: the URL string of the broker where the server expects to + receive the packages. +- ``storage_config``: a dictionary containing the storage configuration + for the local storage object, as expected by + ``swh.provenance.get_provenance_storage``. + +Additionally, some optional parameter can be specified that may affect +the performance of the remote storage as a whole: + +- ``batch_size``: an integer specifying the maximum allowed amount of + elements per package, after range splitting. Default to 100. +- ``prefetch_count``: an integer specifying how many ack packages are + prefetched from the broker. Default to 100. 
+- ``wait_min``: a float specifying the minimum amount of seconds that + the client should wait for the server’s response before failing. + Default to 60. +- ``wait_per_batch``: a float specifying the amount of seconds to wait + per sent batch of items (ie. package). If + ``wait_per_batch * number_of_packages`` is less than ``wait_min``, + the latter will be used instead. Default to 10. + +Like all ``ProvenanceStorageInterface`` compliant objects, the remote +storage client has to be opened before being able to use it, and closed +after it is no longer needed to properly release resources. It can be +operated as a context manager using the keyword ``with``. This will take +care of actually opening/closing both the connection to the remote +server as well as the underlying local storage objects. + +Client lifecycle +---------------- + +Connecting to the remote server object implies taking care of the +RabbitMQ’s lifecycle. To that end, an internal thread is launched, which +will re-establish the connection in case of an error until an explicit +disconnect request is made. The class +``ProvenanceStorageRabbitMQClient`` extends the standard +``threading.Thread`` class, thus the ``run`` method in the class is the +target function of such thread, that will loop indefinitely calling the +``connect`` method and the blocking ``ioloop.start`` method of the +established connection. Interaction with such loop is done by setting +callback functions. + +The following is a diagram of the interaction between the methods of the +class: + +..
code:: graphviz + + digraph { + ProvenanceStorageRabbitMQClient + + close[shape=record] + open[shape=record] + add[shape=record,label="write method"] + get[shape=record,label="read method"] + + ProvenanceStorageInterface + + start[shape=record] + stop[shape=record] + + request[shape=record] + wait_for_acks[shape=record] + wait_for_response[shape=record] + + subgraph cluster_connection_thread { + style=rounded + bgcolor=gray95 + color=gray + labelloc=b + + run[shape=record] + + connect[shape=record] + on_connection_open[shape=record] + on_connection_open_error[shape=record] + on_connection_closed[shape=record] + close_connection[shape=record] + open_channel[shape=record] + on_channel_open[shape=record] + on_channel_closed[shape=record] + setup_queue[shape=record] + on_queue_declare_ok[shape=record] + on_basic_qos_ok[shape=record] + start_consuming[shape=record] + on_consumer_cancelled[shape=record] + on_response[shape=record] + stop_consuming[shape=record] + on_cancel_ok[shape=record] + } + + ProvenanceStorageRabbitMQClient->{add,close,get,open} + + close->{stop} + open->{start} + + start->{run} + stop->{stop_consuming} + + run->{connect,stop} + + connect->{on_connection_open,on_connection_open_error,on_connection_closed} + + on_connection_open->{open_channel} + + open_channel->{on_channel_open} + + on_cancel_ok->{on_channel_closed} + on_consumer_cancelled->{on_channel_closed} + on_channel_open->{setup_queue} + + on_channel_closed->{close_connection} + + setup_queue->{on_queue_declare_ok} + + on_queue_declare_ok->{on_basic_qos_ok} + + on_basic_qos_ok->{start_consuming} + + start_consuming->{on_consumer_cancelled,on_response} + + on_response->{wait_for_response}[label="_response_queue",arrowhead="none"] + + stop_consuming->{on_cancel_ok} + + add->{request,wait_for_acks} + wait_for_acks->{wait_for_response} + + get->{ProvenanceStorageInterface} + } + +Every write method in the ``ProvenanceStorageInterface`` performs +splitting by ID range and send a series of messages 
to the server by +calling the ``request`` method. After that, it blocks waiting for all +necessary ack packages to arrive by calling the ``wait_for_acks`` +method, which in turn calls ``wait_for_response``. Calls to the write +methods may come from distinct threads, thus the interaction between +``on_response`` and ``wait_for_response`` is done with a thread-safe +``queue.Queue`` structure ``_response_queue``. + +Below there is a summary of the methods present in the diagram and their +functionality. Methods are listed in alphabetical order: + +- ``close_connection``: this method properly closes the connection to + RabbitMQ. +- ``connect``: this method connects to RabbitMQ, returning the + connection handle. When the connection is established, the + ``on_connection_open`` callback method will be invoked by ``pika``. + If there is an error establishing the connection, the + ``on_connection_open_error`` callback method will be invoked by + ``pika``. If the connection closes unexpectedly, the + ``on_connection_closed`` callback method will be invoked by ``pika``. +- ``on_basic_qos_ok``: this callback method is invoked by ``pika`` when + the ``Basic.QoS`` RPC call made in ``on_queue_declare_ok`` has + completed. At this point it is safe to start consuming messages, + hence the ``start_consuming`` method is called, which will invoke the + needed RPC commands to start the process. +- ``on_cancel_ok``: this callback method is invoked by ``pika`` when + RabbitMQ acknowledges the cancellation of a consumer. At this point + the channel close is requested, thus indirectly invoking the + ``on_channel_closed`` callback method once the channel has been + closed, which will in turn close the connection. +- ``on_channel_closed``: this callback method is invoked by ``pika`` + when RabbitMQ unexpectedly closes the channel.
Channels are usually + closed if there is an attempt to do something that violates the + protocol, such as re-declare an exchange or queue with different + parameters. In this case, the connection will be closed by invoking + the ``close_connection``. +- ``on_channel_open``: this callback method is invoked by ``pika`` when + the channel has been opened. The ``on_channel_closed`` callback is + set here. Since the channel is now open, it is safe to set up the + client’s response queue on RabbitMQ by invoking the ``setup_queue`` + method. +- ``on_connection_closed``: this callback method is invoked by ``pika`` + when the connection to RabbitMQ is closed unexpectedly. Since it is + unexpected, an attempt to reconnect to RabbitMQ will be done. +- ``on_connection_open``: this callback method is invoked by ``pika`` + once the connection to RabbitMQ has been established. It proceeds to + open the channel by calling the ``open_channel`` method. +- ``on_connection_open_error``: this callback method is invoked by + ``pika`` if the connection to RabbitMQ can’t be established. Since it + is unexpected, an attempt to reconnect to RabbitMQ will be done. +- ``on_consumer_cancelled``: this callback methods is invoked by + ``pika`` when RabbitMQ sends a ``Basic.Cancel`` for a consumer + receiving messages. At this point the channel close is requested, + thus indirectly invoking the ``on_channel_closed`` callback method + once the channel has been closed, which will in-turn close the + connection. +- ``on_queue_declare_ok``: this callback method is invoked by ``pika`` + when the ``Queue.Declare`` RPC call made in ``setup_queue`` has + completed. This method sets up the consumer prefetch count by + invoking the ``Basic.QoS`` RPC command. When it is completed, the + ``on_basic_qos_ok`` method will be invoked by ``pika``. +- ``on_response``: this callback method is invoked by ``pika`` when a + message is delivered from RabbitMQ. 
The decoded response together + with its correlation ID is enqueued in the internal + ``_response_queue``, so that the data is forwarded to the + ``wait_for_response`` method, that might be running on a distinct + thread. A ``Basic.Ack`` RPC command is issued to acknowledge the + delivery of the message to RabbitMQ. +- ``open_channel``: this method opens a new channel with RabbitMQ by + issuing the ``Channel.Open`` RPC command. When RabbitMQ responds that + the channel is open, the ``on_channel_open`` callback will be invoked + by ``pika``. +- ``request``: this methods send a message to RabbitMQ by issuing a + ``Basic.Publish`` RPC command. The body of the message is properly + encoded, while correlation ID and request key are used as passed by + the calling method. +- ``run``: main method of the internal thread. It requests to open a + connection to RabbitMQ by calling the ``connect`` method, and starts + the internal ``IOLoop`` of the returned handle (blocking operation). + In case of failure, this method will indefinitely try to reconnect. + When an explicit ``TerminateSignal`` is received, the ``stop`` method + is invoked. +- ``setup_queue``: this methods sets up an exclusive queue for the + client on RabbitMQ by invoking the ``Queue.Declare`` RPC command. + When it is completed, the ``on_queue_declare_ok`` method will be + invoked by ``pika``. +- ``start``: inherited from ``threading.Thread``. It launches the + internal thread with method ``run`` as target. +- ``start_consuming``: this method sets up the consumer by first + registering the ``on_consumer_cancelled`` callback, so that the + client is notified if RabbitMQ cancels the consumer. It then issues + the ``Basic.Consume`` RPC command which returns the consumer tag that + is used to uniquely identify the consumer with RabbitMQ. The + ``on_response`` method is passed in as a callback ``pika`` will + invoke when a message is fully received. 
+- ``stop``: this method cleanly shuts down the connection to RabbitMQ by + calling the ``stop_consuming`` method. The ``IOLoop`` is started + again because this method is invoked by raising a ``TerminateSignal`` + exception. This exception stops the ``IOLoop`` which needs to be + running for ``pika`` to communicate with RabbitMQ. All of the + commands issued prior to starting the ``IOLoop`` will be buffered but + not processed. +- ``stop_consuming``: this method sends a ``Basic.Cancel`` RPC command. + When RabbitMQ confirms the cancellation, the ``on_cancel_ok`` + callback method will be invoked by ``pika``, which will then close + the channel and connection. +- ``wait_for_acks``: this method is invoked by every write method in + the ``ProvenanceStorageInterface``, after sending a series of write + requests to the server. It will call ``wait_for_response`` until it + receives all expected ack responses, or until it receives the first + timeout. The timeout is calculated based on the number of expected + acks and class initialization parameters ``wait_per_batch`` and + ``wait_min``. +- ``wait_for_response``: this method is called from ``wait_for_acks`` + to retrieve ack packages from the internal ``_response_queue``. The + response correlation ID is used to validate that the received ack + corresponds to the current write request. The method returns the + decoded body of the received response. + +ProvenanceStorageRabbitMQServer +=============================== + +The remote storage server is responsible for defining the ID range +splitting policy for each entity and relation in the provenance +solution. Based on this policy, it will launch a series of worker +processes that will take care of actually processing the requests for +each defined range in the partition. These processes are implemented in +the ``ProvenanceStorageRabbitMQWorker`` described below.
+ +To initialize a server object it is required to provide two mandatory +parameters: + +- ``url``: the URL string of the broker where the server expects to + receive the packages. +- ``storage_config``: a dictionary containing the storage configuration + for the local storage object, as expected by + ``swh.provenance.get_provenance_storage``. + +Additionally, some optional parameters can be specified that may affect +the performance of the remote storage as a whole: + +- ``batch_size``: an integer specifying the maximum allowed amount of + elements to be processed at a time, ie. forwarded to the underlying + storage object by each worker. Default to 100. +- ``prefetch_count``: an integer specifying how many packages are + prefetched from the broker by each worker. Default to 100. + +On initialization, the server will create the necessary +``ProvenanceStorageRabbitMQWorker`` objects, forwarding to them the parameters +mentioned above, but it won’t launch these underlying processes until it +is explicitly started. To that end, the +``ProvenanceStorageRabbitMQServer`` objects provide the following +methods: + +- ``start``: this method launches all the necessary worker + subprocesses and ensures they all are in a proper consuming state before + returning control to the caller. +- ``stop``: this method signals all + worker subprocesses for termination and blocks until they all finish + successfully. + +ID range splitting policy: +-------------------------- + +The ID range splitting policy is defined as follows: + +- There is an exchange in the RabbitMQ broker for each entity in the + provenance solution, plus an extra one to handle locations: + ``content``, ``directory``, ``location``, ``origin``, and + ``revision``. +- Any request to add an entity should send the necessary packages to + the entity’s associated exchange for proper routing: ie. requests for + ``content_add`` will be handled by the ``content`` exchange.
+- Any request to add a relation entry is handled by the exchange of the + source entity in the relation: ie. requests for ``relation_add`` with + ``relation=CNT_EARLY_IN_REV`` will be handled by the ``content`` + exchange. +- ID range splitting is done by looking at the first byte in the SWHID + of the entity (ie. a hex value), hence 16 possible ranges are defined + for each operation associated to each entity. In the case of + locations, a hex hash is calculated over its value. +- Each exchange then handles 16 queues for each method associated to + the exchange’s entity, with a ``direct`` routing policy. For + instance, the ``content`` exchange has 16 queues associated to each + of the following methods: ``content_add``, ``relation_add`` with + ``relation=CNT_EARLY_IN_REV``, and ``relation_add`` with + ``relation=CNT_IN_DIR`` (ie. a total of 48 queues). +- For each exchange, 16 ``ProvenanceStorageRabbitMQWorker`` processes + are launched, each of them taking care of one ID range for the + associated entity. + +All in all, this gives a total of 80 ``ProvenanceStorageRabbitMQWorker`` +processes (16 per exchange) and 160 RabbitMQ queues (48 each for ``content`` +and ``revision``, 32 for ``directory``, and 16 each for ``location`` and +``origin``). In this way, it is guaranteed that, regardless of the +operation being performed, there would never be more than one process +trying to write on a given ID range for a given entity, thus resolving +all potential conflicts. + +Although the ID range splitting policy is defined on the server side, so +it can properly configure and launch the necessary worker processes, it +is the client that is responsible for actually splitting the input to each +write method and sending the write requests to the proper queues for +RabbitMQ to route them to the correct worker.
For that, the server defines +a series of static methods that allow querying the ID range splitting +policy: + +- ``get_binding_keys``: this method is meant to be used by the server + workers. Given an exchange and a range ID, it yields all the RabbitMQ + routing keys the worker process should bind to. +- ``get_exchange``: given the name of a write method in the + ``ProvenanceStorageInterface``, and an optional relation type, it + returns the name of the exchange to which the writing request should + be sent. +- ``get_exchanges``: this method yields the names of all the exchanges + in the RabbitMQ broker. +- ``get_meth_name``: this method is meant to be used by the server + workers. Given a binding key as returned by ``get_binding_keys``, it + returns the ``ProvenanceStorageInterface`` method associated to it. +- ``get_meth_names``: given an exchange name, it yields all the methods + that are associated to it. In case of ``relation_add``, the method + also returns the supported relation type. +- ``get_ranges``: given an exchange name, it yields the integer value + of all supported range IDs (currently 0-15 for all exchanges). The + idea behind this method is to allow defining a custom ID range split + for each exchange. +- ``get_routing_key``: given the name of a write method in the + ``ProvenanceStorageInterface``, an optional relation type, and a + tuple with the data to be passed to the method (first parameter), + this method returns the routing key of the queue responsible to + handle that tuple. It is assumed that the first value in the tuple is + a ``Sha1Git`` ID. +- ``is_write_method``: given the name of a method in the + ``ProvenanceStorageInterface``, it decides if it is a write method or + not. + +ProvenanceStorageRabbitMQWorker +=============================== + +The remote storage worker consumes messages published in the RabbitMQ +broker by the remote storage clients, and proceeds to perform the actual +writing operations to a local storage object (ie.
+``ProvenanceStorageMongoDb`` or ``ProvenanceStoragePostgreSql``). Each +worker processes messages associated to a particular entity and range ID. +It is the client’s responsibility to properly split data across messages +according to the remote storage server policy. + +Since there is overlapping between methods in the +``ProvenanceStorageInterface`` operating over the same entity, one worker may +have to handle more than one method to guarantee conflict-free writings +to the underlying storage. For instance, consider the ``content`` +entity: for a given ID range, methods ``content_add`` and +``relation_add`` with ``relation=CNT_EARLY_IN_REV`` may conflict. It is the +worker’s responsibility to resolve this kind of conflict. + +To initialize a worker object it is required to provide four mandatory +parameters: + +- ``url``: the URL string of the broker where the server expects to + receive the packages. +- ``exchange``: the RabbitMQ exchange to which the worker will + subscribe. See ``ProvenanceStorageRabbitMQServer``\ ’s ID range + splitting policy for further details. +- ``range``: the range ID the worker will be processing. See + ``ProvenanceStorageRabbitMQServer``\ ’s ID range splitting policy for + further details. +- ``storage_config``: a dictionary containing the storage configuration + for the local storage object, as expected by + ``swh.provenance.get_provenance_storage``. + +Additionally, some optional parameters can be specified that may affect +the performance of the remote storage as a whole: + +- ``batch_size``: an integer specifying the maximum allowed amount of + elements to be processed at a time, ie. forwarded to the underlying + storage object for writing. Default to 100. +- ``prefetch_count``: an integer specifying how many packages are + prefetched from the broker. Default to 100. + +.. + + :warning: **This class is not meant to be used directly but through + an instance of ``ProvenanceStorageRabbitMQServer``.
The parameters + ``url``, ``storage_config``, ``batch_size`` and ``prefetch_count`` + above are forwarded as passed to the server on initialization. + Additional arguments ``exchange`` and ``range`` are generated by the + server based on its ID range splitting policy.** + +Worker lifecycle +---------------- + +All interaction between the provenance solution and a remote storage +worker object happens through RabbitMQ packages. To maximize +concurrency, each instance of the ``ProvenanceStorageRabbitMQWorker`` +launches a distinct sub-process, hence avoiding unnecessary +synchronization with other components on the solution. For this, +``ProvenanceStorageRabbitMQWorker`` extends ``multiprocessing.Process`` +and only has a direct channel of communication to the master +``ProvenanceStorageRabbitMQServer``, through ``multiprocessing.Queue`` +structures. Then, the entry point of the sub-process is the method +``run`` which will in-turn launch a bunch of threads to handle the +different provenance methods the worker needs to support, and an extra +thread to handle communication with the server object. RabbitMQ’s +lifecycle will be taken care of in the main thread. + +The following is a diagram of the interaction between the methods of the +class: + +.. 
code:: graphviz + + digraph { + ProvenanceStorageRabbitMQServer + ProvenanceStorageRabbitMQWorker + + start[shape=record] + run[shape=record] + + connect[shape=record] + on_connection_open[shape=record] + on_connection_open_error[shape=record] + on_connection_closed[shape=record] + close_connection[shape=record] + open_channel[shape=record] + on_channel_open[shape=record] + on_channel_closed[shape=record] + + setup_exchange[shape=record] + on_exchange_declare_ok[shape=record] + + setup_queues[shape=record] + on_queue_declare_ok[shape=record] + on_bind_ok[shape=record] + on_basic_qos_ok[shape=record] + start_consuming[shape=record] + on_consumer_cancelled[shape=record] + on_request[shape=record] + stop_consuming[shape=record] + on_cancel_ok[shape=record] + + request_termination[shape=record] + stop[shape=record] + + respond[shape=record] + get_conflicts_func[shape=record] + + subgraph cluster_command_thread { + style=rounded + bgcolor=gray95 + color=gray + labelloc=b + + run_command_thread[shape=record] + } + + subgraph cluster_request_thread { + style=rounded + bgcolor=gray95 + color=gray + labelloc=b + + ProvenanceStorageInterface + + run_request_thread[shape=record] + } + + ProvenanceStorageRabbitMQWorker->{start} + + start->{run} + stop->{stop_consuming} + + run->{connect,run_command_thread,run_request_thread,stop} + + connect->{on_connection_open,on_connection_open_error,on_connection_closed} + + on_connection_open->{open_channel} + + open_channel->{on_channel_open} + + on_cancel_ok->{on_channel_closed} + on_consumer_cancelled->{on_channel_closed} + on_channel_open->{setup_exchange} + + on_channel_closed->{close_connection} + + setup_exchange->{on_exchange_declare_ok} + on_exchange_declare_ok->{setup_queues} + + setup_queues->{on_queue_declare_ok} + on_queue_declare_ok->{on_bind_ok} + on_bind_ok->{on_basic_qos_ok} + on_basic_qos_ok->{start_consuming} + + start_consuming->{on_consumer_cancelled,on_request} + 
start_consuming->{ProvenanceStorageRabbitMQServer}[label=" signal",arrowhead="none"] + + on_request->{run_request_thread}[label=" _request_queues",arrowhead="none"] + + stop_consuming->{on_cancel_ok} + + run_command_thread->{request_termination} + run_command_thread->{ProvenanceStorageRabbitMQServer}[label=" command",arrowhead="none"] + + run_request_thread->{ProvenanceStorageInterface,get_conflicts_func,respond,request_termination} + + request_termination->{run}[label="TerminateSignal",arrowhead="none"] + } + +There is a request thread for each ``ProvenanceStorageInterface`` method +the worker needs to handle, each thread with its own provenance storage +object (ie. exclusive connection). Each of these threads will receive +sets of parameters to be passed to their correspondent method and +perform explicit conflict resolution by using the method-specific +function returned by ``get_conflicts_func``, prior to passing these +parameters to the underlying storage. + +Below there is a summary of the methods present in the diagram and their +functionality. Methods are listed in alphabetic order: + +- ``close_connection``: this method properly closes the connection to + RabbitMQ. +- ``connect``: this method connects to RabbitMQ, returning the + connection handle. When the connection is established, the + ``on_connection_open`` callback method will be invoked by ``pika``. + If there is an error establishing the connection, the + ``on_connection_open_error`` callback method will be invoked by + ``pika``. If the connection closes unexpectedly, the + ``on_connection_closed`` callback method will be invoked by ``pika``. +- ``on_basic_qos_ok``: this callback method is invoked by ``pika`` when + the ``Basic.QoS`` RPC call made in ``on_bind_ok`` has completed. At + this point it is safe to start consuming messages, hence the + ``start_consuming`` method is called, which will invoke the needed + RPC commands to start the process. 
+- ``on_bind_ok``:this callback method is invoked by ``pika`` when the + ``Queue.Bind`` RPC call made in ``on_queue_declare_ok`` has + completed. This method sets up the consumer prefetch count by + invoking the ``Basic.QoS`` RPC command. When it is completed, the + ``on_basic_qos_ok`` method will be invoked by ``pika``. +- ``on_cancel_ok``: this method is invoked by ``pika`` when RabbitMQ + acknowledges the cancellation of a consumer. At this point the + channel close is requested, thus indirectly invoking the + ``on_channel_closed`` callback method once the channel has been + closed, which will in-turn close the connection. +- ``on_channel_closed``: this callback method is invoked by ``pika`` + when RabbitMQ unexpectedly closes the channel. Channels are usually + closed if there is an attempt to do something that violates the + protocol, such as re-declare an exchange or queue with different + parameters. In this case, the connection will be closed by invoking + the ``close_connection``. +- ``on_channel_open``: this callback method is invoked by ``pika`` when + the channel has been opened. The ``on_channel_closed`` callback is + set here. Since the channel is now open, it is safe to declare the + exchange to use by invoking the ``setup_exchange`` method. +- ``on_connection_closed``: this callback method is invoked by ``pika`` + when the connection to RabbitMQ is closed unexpectedly. Since it is + unexpected, an attempt to reconnect to RabbitMQ will be done. +- ``on_connection_open``: this callback method is called by ``pika`` + once the connection to RabbitMQ has been established. It proceeds to + open the channel by calling the ``open_channel`` method. +- ``on_connection_open_error``: this callback method is called by + ``pika`` if the connection to RabbitMQ can’t be established. Since it + is unexpected, an attempt to reconnect to RabbitMQ will be done. 
+- ``on_consumer_cancelled``: this callback method is invoked by + ``pika`` when RabbitMQ sends a ``Basic.Cancel`` for a consumer + receiving messages. At this point the channel close is requested, + thus indirectly invoking the ``on_channel_closed`` callback method + once the channel has been closed, which will in-turn close the + connection. +- ``on_exchange_declare_ok``: this callback method is invoked by + ``pika`` when the ``Exchange.Declare`` RPC call made in + ``setup_exchange`` has completed. At this point it is time to set up + the queues for the different request handling threads. This is done + by calling the ``setup_queues`` method. +- ``on_queue_declare_ok``: this callback method is invoked by ``pika`` + when each ``Queue.Declare`` RPC call made in ``setup_queues`` has + completed. Now it is time to bind the current queue and exchange + together with the correspondent routing key by issuing the + ``Queue.Bind`` RPC command. When this command is completed, the + ``on_bind_ok`` method will be invoked by ``pika``. +- ``on_request``: this callback method is invoked by ``pika`` when a + message is delivered from RabbitMQ to any of the queues bound by the + worker. The decoded request together with its correlation ID and + reply-to property are enqueued in the correspondent internal + ``_request_queues`` (the actual queue is identified by the message + routing key), so that the data is forwarded to the thread that + handles the particular method the message is associated to. A + ``Basic.Ack`` RPC command is issued to acknowledge the delivery of + the message to RabbitMQ. +- ``open_channel``: this method opens a new channel with RabbitMQ by + issuing the ``Channel.Open`` RPC command. When RabbitMQ responds that + the channel is open, the ``on_channel_open`` callback will be invoked + by ``pika``. +- ``request_termination``: this method sends a signal to the main thread + of the process to cleanly release resources and terminate.
This is + done by setting a callback in the ``IOLoop`` that raises a + ``TerminateSignal``, which will eventually be handled by the ``run`` + method. +- ``respond``: this method sends a message to RabbitMQ by issuing a + ``Basic.Publish`` RPC command. The body of the message is properly + encoded, while correlation ID and request key are used as passed by + the calling method. +- ``run``: main method of the process. It launches a thread to handle + communication with the ``ProvenanceStorageRabbitMQServer`` object, + targeting the ``run_command_thread`` method. Also, it launches one + thread for each ``ProvenanceStorageInterface`` method the worker + needs to handle, each targeting the ``run_request_thread`` method. + Finally, it requests to open a connection to RabbitMQ by calling the + ``connect`` method, and starts the internal ``IOLoop`` of the + returned handle (blocking operation). In case of failure, this method + will indefinitely try to reconnect. When an explicit + ``TerminateSignal`` is received, the ``stop`` method is invoked. All + internal threads will be signalled for termination as well, and + resources released. +- ``run_command_thread``: main method of the command thread. It + receives external commands from the + ``ProvenanceStorageRabbitMQServer`` object through a + ``multiprocessing.Queue`` structure. The only supported command for + now is for the worker to be signalled to terminate, in which case the + ``request_termination`` method is invoked. +- ``run_request_thread``: main method of the request threads. The + worker has one such thread per ``ProvenanceStorageInterface`` method + it needs to handle. This method will initialize its own storage + object and interact with the ``on_request`` callback through a ``queue.Queue`` + structure, waiting for items to be passed to the storage method + associated with the thread.
When elements become available, it will + perform a conflict resolution step by resorting to the + method-specific function returned by ``get_conflicts_func``. After + this it will forward the items to the underlying storage object. If + the storage method returns successfully, acknowledgements are sent + back to each client through the ``respond`` method. If the call to the + storage method fails, items will be enqueued back to retry in a + future iteration. In case of an unexpected exception, the worker is + signalled for termination by calling the ``request_termination`` + method. +- ``setup_exchange``: this method sets up the exchange on RabbitMQ by + invoking the ``Exchange.Declare`` RPC command. When it is completed, + the ``on_exchange_declare_ok`` method will be invoked by ``pika``. +- ``setup_queues``: this method sets up the necessary queues for the + worker on RabbitMQ by invoking several ``Queue.Declare`` RPC commands + (one per supported provenance storage method). When each command is + completed, the ``on_queue_declare_ok`` method will be invoked by + ``pika``. +- ``start``: inherited from ``multiprocessing.Process``. It launches + the worker sub-process with method ``run`` as target. +- ``start_consuming``: this method sets up the worker by first + registering the ``on_consumer_cancelled`` callback, so that the + object is notified if RabbitMQ cancels the consumer. It then issues + one ``Basic.Consume`` RPC command per supported provenance storage + method, each of which returns the consumer tag that is used to uniquely + identify the consumer with RabbitMQ. The ``on_request`` method is + passed in as a callback ``pika`` will invoke when a message is fully + received. After setting up all consumers, a ``CONSUMING`` signal is + sent to the ``ProvenanceStorageRabbitMQServer`` object through a + ``multiprocessing.Queue`` structure. +- ``stop``: this method cleanly shuts down the connection to RabbitMQ by + calling the ``stop_consuming`` method.
The ``IOLoop`` is started + again because this method is invoked by raising a ``TerminateSignal`` + exception. This exception stops the ``IOLoop`` which needs to be + running for ``pika`` to communicate with RabbitMQ. All of the + commands issued prior to starting the ``IOLoop`` will be buffered but + not processed. +- ``stop_consuming``: this method sends a ``Basic.Cancel`` RPC command + for each supported provenance storage method. When RabbitMQ confirms + the cancellation, the ``on_cancel_ok`` callback methods will be + invoked by ``pika``, which will then close the channel and + connection. +- ``get_conflicts_func``: this method returns the conflict resolution + function to be used based on the provenance storage method. diff --git a/mypy.ini b/mypy.ini --- a/mypy.ini +++ b/mypy.ini @@ -17,6 +17,9 @@ [mypy-msgpack.*] ignore_missing_imports = True +[mypy-pika.*] +ignore_missing_imports = True + [mypy-pkg_resources.*] ignore_missing_imports = True diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ iso8601 methodtools mongomock +pika pymongo PyYAML types-click diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py --- a/swh/provenance/__init__.py +++ b/swh/provenance/__init__.py @@ -92,4 +92,12 @@ engine = kwargs.get("engine", "pymongo") return ProvenanceStorageMongoDb(engine=engine, **kwargs["db"]) + elif cls == "rabbitmq": + from .api.client import ProvenanceStorageRabbitMQClient + + rmq_storage = ProvenanceStorageRabbitMQClient(**kwargs) + if TYPE_CHECKING: + assert isinstance(rmq_storage, ProvenanceStorageInterface) + return rmq_storage + raise ValueError diff --git a/swh/provenance/api/client.py b/swh/provenance/api/client.py --- a/swh/provenance/api/client.py +++ b/swh/provenance/api/client.py @@ -2,3 +2,466 @@ # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for 
more information + +from __future__ import annotations + +import functools +import inspect +import logging +import queue +import threading +import time +from types import TracebackType +from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union +import uuid + +import pika +import pika.channel +import pika.connection +import pika.frame +import pika.spec + +from swh.core.api.serializers import encode_data_client as encode_data +from swh.core.api.serializers import msgpack_loads as decode_data +from swh.core.statsd import statsd + +from .. import get_provenance_storage +from ..interface import ProvenanceStorageInterface, RelationData, RelationType +from .serializers import DECODERS, ENCODERS +from .server import ProvenanceStorageRabbitMQServer + +LOG_FORMAT = ( + "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) " + "-35s %(lineno) -5d: %(message)s" +) +LOGGER = logging.getLogger(__name__) + +STORAGE_DURATION_METRIC = "swh_provenance_storage_rabbitmq_duration_seconds" + + +class ResponseTimeout(Exception): + pass + + +class TerminateSignal(Exception): + pass + + +def split_ranges( + data: Iterable[bytes], meth_name: str, relation: Optional[RelationType] = None +) -> Dict[str, List[Tuple[Any, ...]]]: + ranges: Dict[str, List[Tuple[Any, ...]]] = {} + if relation is not None: + assert isinstance(data, dict) + for src, dsts in data.items(): + key = ProvenanceStorageRabbitMQServer.get_routing_key( + (src,), meth_name, relation + ) + for rel in dsts: + assert isinstance(rel, RelationData) + ranges.setdefault(key, []).append((src, rel.dst, rel.path)) + else: + items: Union[List[Tuple[Any, Any]], List[Tuple[Any]]] + if isinstance(data, dict): + items = list(data.items()) + else: + items = list({(item,) for item in data}) + for item in items: + key = ProvenanceStorageRabbitMQServer.get_routing_key(item, meth_name) + ranges.setdefault(key, []).append(item) + return ranges + + +class MetaRabbitMQClient(type): + def __new__(cls, name, bases, attributes): + # 
For each method wrapped with @remote_api_endpoint in an API backend + # (eg. :class:`swh.indexer.storage.IndexerStorage`), add a new + # method in RemoteStorage, with the same documentation. + # + # Note that, despite the usage of decorator magic (eg. functools.wrap), + # this never actually calls an IndexerStorage method. + backend_class = attributes.get("backend_class", None) + for base in bases: + if backend_class is not None: + break + backend_class = getattr(base, "backend_class", None) + if backend_class: + for meth_name, meth in backend_class.__dict__.items(): + if hasattr(meth, "_endpoint_path"): + cls.__add_endpoint(meth_name, meth, attributes) + return super().__new__(cls, name, bases, attributes) + + @staticmethod + def __add_endpoint(meth_name, meth, attributes): + wrapped_meth = inspect.unwrap(meth) + + @functools.wraps(meth) # Copy signature and doc + def meth_(*args, **kwargs): + with statsd.timed( + metric=STORAGE_DURATION_METRIC, tags={"method": meth_name} + ): + # Match arguments and parameters + data = inspect.getcallargs(wrapped_meth, *args, **kwargs) + + # Remove arguments that should not be passed + self = data.pop("self") + + # Call storage method with remaining arguments + return getattr(self._storage, meth_name)(**data) + + @functools.wraps(meth) # Copy signature and doc + def write_meth_(*args, **kwargs): + with statsd.timed( + metric=STORAGE_DURATION_METRIC, tags={"method": meth_name} + ): + # Match arguments and parameters + post_data = inspect.getcallargs(wrapped_meth, *args, **kwargs) + + try: + # Remove arguments that should not be passed + self = post_data.pop("self") + relation = post_data.pop("relation", None) + assert len(post_data) == 1 + data = next(iter(post_data.values())) + + ranges = split_ranges(data, meth_name, relation) + acks_expected = sum(len(items) for items in ranges.values()) + self._correlation_id = str(uuid.uuid4()) + + exchange = ProvenanceStorageRabbitMQServer.get_exchange( + meth_name, relation + ) + for 
routing_key, items in ranges.items(): + batches = ( + items[idx : idx + self._batch_size] + for idx in range(0, len(items), self._batch_size) + ) + for batch in batches: + # FIXME: this is running in a different thread! Hence, if + # self._connection drops, there is no guarantee that the + # request can be sent for the current elements. This + # situation should be handled properly. + self._connection.ioloop.add_callback_threadsafe( + functools.partial( + ProvenanceStorageRabbitMQClient.request, + channel=self._channel, + reply_to=self._callback_queue, + exchange=exchange, + routing_key=routing_key, + correlation_id=self._correlation_id, + data=batch, + ) + ) + return self.wait_for_acks(acks_expected) + except BaseException as ex: + self.request_termination(str(ex)) + return False + + if meth_name not in attributes: + attributes[meth_name] = ( + write_meth_ + if ProvenanceStorageRabbitMQServer.is_write_method(meth_name) + else meth_ + ) + + +class ProvenanceStorageRabbitMQClient(threading.Thread, metaclass=MetaRabbitMQClient): + backend_class = ProvenanceStorageInterface + extra_type_decoders = DECODERS + extra_type_encoders = ENCODERS + + def __init__( + self, + url: str, + storage_config: Dict[str, Any], + batch_size: int = 100, + prefetch_count: int = 100, + wait_min: float = 60, + wait_per_batch: float = 10, + ) -> None: + """Setup the client object, passing in the URL we will use to connect to + RabbitMQ, and the connection information for the local storage object used + for read-only operations. 
+ + :param str url: The URL for connecting to RabbitMQ + :param dict storage_config: Configuration parameters for the underlying + ``ProvenanceStorage`` object expected by + ``swh.provenance.get_provenance_storage`` + :param int batch_size: Max amount of elements per package (after range + splitting) for writing operations + :param int prefetch_count: Prefetch value for the RabbitMQ connection when + receiving ack packages + :param float wait_min: Min waiting time for response on a writing operation, in + seconds + :param float wait_per_batch: Waiting time for response per batch of items on a + writing operation, in seconds + + """ + super().__init__() + + self._connection = None + self._callback_queue: Optional[str] = None + self._channel = None + self._closing = False + self._consumer_tag = None + self._consuming = False + self._correlation_id = str(uuid.uuid4()) + self._prefetch_count = prefetch_count + + self._batch_size = batch_size + self._response_queue: queue.Queue = queue.Queue() + self._storage = get_provenance_storage(**storage_config) + self._url = url + + self._wait_min = wait_min + self._wait_per_batch = wait_per_batch + + def __enter__(self) -> ProvenanceStorageInterface: + self.open() + assert isinstance(self, ProvenanceStorageInterface) + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + self.close() + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "open"}) + def open(self) -> None: + self.start() + while self._callback_queue is None: + time.sleep(0.1) + self._storage.open() + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"}) + def close(self) -> None: + assert self._connection is not None + self._connection.ioloop.add_callback_threadsafe(self.request_termination) + self.join() + self._storage.close() + + def request_termination(self, reason: str = "Normal shutdown") -> None: + assert 
self._connection is not None + + def termination_callback(): + raise TerminateSignal(reason) + + self._connection.ioloop.add_callback_threadsafe(termination_callback) + + def connect(self) -> pika.SelectConnection: + LOGGER.info("Connecting to %s", self._url) + return pika.SelectConnection( + parameters=pika.URLParameters(self._url), + on_open_callback=self.on_connection_open, + on_open_error_callback=self.on_connection_open_error, + on_close_callback=self.on_connection_closed, + ) + + def close_connection(self) -> None: + assert self._connection is not None + self._consuming = False + if self._connection.is_closing or self._connection.is_closed: + LOGGER.info("Connection is closing or already closed") + else: + LOGGER.info("Closing connection") + self._connection.close() + + def on_connection_open(self, _unused_connection: pika.SelectConnection) -> None: + LOGGER.info("Connection opened") + self.open_channel() + + def on_connection_open_error( + self, _unused_connection: pika.SelectConnection, err: Exception + ) -> None: + LOGGER.error("Connection open failed, reopening in 5 seconds: %s", err) + assert self._connection is not None + self._connection.ioloop.call_later(5, self._connection.ioloop.stop) + + def on_connection_closed(self, _unused_connection: pika.SelectConnection, reason): + assert self._connection is not None + self._channel = None + if self._closing: + self._connection.ioloop.stop() + else: + LOGGER.warning("Connection closed, reopening in 5 seconds: %s", reason) + self._connection.ioloop.call_later(5, self._connection.ioloop.stop) + + def open_channel(self) -> None: + LOGGER.info("Creating a new channel") + assert self._connection is not None + self._connection.channel(on_open_callback=self.on_channel_open) + + def on_channel_open(self, channel: pika.channel.Channel) -> None: + LOGGER.info("Channel opened") + self._channel = channel + LOGGER.info("Adding channel close callback") + assert self._channel is not None + 
self._channel.add_on_close_callback(callback=self.on_channel_closed) + self.setup_queue() + + def on_channel_closed( + self, channel: pika.channel.Channel, reason: Exception + ) -> None: + LOGGER.warning("Channel %i was closed: %s", channel, reason) + self.close_connection() + + def setup_queue(self) -> None: + LOGGER.info("Declaring callback queue") + assert self._channel is not None + self._channel.queue_declare( + queue="", exclusive=True, callback=self.on_queue_declare_ok + ) + + def on_queue_declare_ok(self, frame: pika.frame.Method) -> None: + LOGGER.info("Binding queue to default exchanger") + assert self._channel is not None + self._callback_queue = frame.method.queue + self._channel.basic_qos( + prefetch_count=self._prefetch_count, callback=self.on_basic_qos_ok + ) + + def on_basic_qos_ok(self, _unused_frame: pika.frame.Method) -> None: + LOGGER.info("QOS set to: %d", self._prefetch_count) + self.start_consuming() + + def start_consuming(self) -> None: + LOGGER.info("Issuing consumer related RPC commands") + LOGGER.info("Adding consumer cancellation callback") + assert self._channel is not None + self._channel.add_on_cancel_callback(callback=self.on_consumer_cancelled) + assert self._callback_queue is not None + self._consumer_tag = self._channel.basic_consume( + queue=self._callback_queue, on_message_callback=self.on_response + ) + self._consuming = True + + def on_consumer_cancelled(self, method_frame: pika.frame.Method) -> None: + LOGGER.info("Consumer was cancelled remotely, shutting down: %r", method_frame) + if self._channel: + self._channel.close() + + def on_response( + self, + channel: pika.channel.Channel, + deliver: pika.spec.Basic.Deliver, + properties: pika.spec.BasicProperties, + body: bytes, + ) -> None: + LOGGER.info( + "Received message # %s from %s: %s", + deliver.delivery_tag, + properties.app_id, + body, + ) + self._response_queue.put( + ( + properties.correlation_id, + decode_data(body, extra_decoders=self.extra_type_decoders), + ) + ) 
+ LOGGER.info("Acknowledging message %s", deliver.delivery_tag) + channel.basic_ack(delivery_tag=deliver.delivery_tag) + + def stop_consuming(self) -> None: + if self._channel: + LOGGER.info("Sending a Basic.Cancel RPC command to RabbitMQ") + self._channel.basic_cancel(self._consumer_tag, self.on_cancel_ok) + + def on_cancel_ok(self, _unused_frame: pika.frame.Method) -> None: + self._consuming = False + LOGGER.info( + "RabbitMQ acknowledged the cancellation of the consumer: %s", + self._consumer_tag, + ) + LOGGER.info("Closing the channel") + assert self._channel is not None + self._channel.close() + + def run(self) -> None: + while not self._closing: + try: + self._connection = self.connect() + assert self._connection is not None + self._connection.ioloop.start() + except KeyboardInterrupt: + LOGGER.info("Connection closed by keyboard interruption, reopening") + if self._connection is not None: + self._connection.ioloop.stop() + except TerminateSignal as ex: + LOGGER.info("Termination requested: %s", ex) + self.stop() + if self._connection is not None and not self._connection.is_closed: + # Finish closing + self._connection.ioloop.start() + except BaseException as ex: + LOGGER.warning("Unexpected exception, terminating: %s", ex) + self.stop() + if self._connection is not None and not self._connection.is_closed: + # Finish closing + self._connection.ioloop.start() + LOGGER.info("Stopped") + + def stop(self) -> None: + assert self._connection is not None + if not self._closing: + self._closing = True + LOGGER.info("Stopping") + if self._consuming: + self.stop_consuming() + self._connection.ioloop.start() + else: + self._connection.ioloop.stop() + LOGGER.info("Stopped") + + @staticmethod + def request( + channel: pika.channel.Channel, + reply_to: str, + exchange: str, + routing_key: str, + correlation_id: str, + **kwargs, + ) -> None: + channel.basic_publish( + exchange=exchange, + routing_key=routing_key, + properties=pika.BasicProperties( + 
content_type="application/msgpack", + correlation_id=correlation_id, + reply_to=reply_to, + ), + body=encode_data( + kwargs, + extra_encoders=ProvenanceStorageRabbitMQClient.extra_type_encoders, + ), + ) + + def wait_for_acks(self, acks_expected: int) -> bool: + acks_received = 0 + timeout = max( + (acks_expected / self._batch_size) * self._wait_per_batch, + self._wait_min, + ) + while acks_received < acks_expected: + try: + acks_received += self.wait_for_response(timeout=timeout) + except ResponseTimeout: + LOGGER.warning( + "Timed out waiting for acks, %s received, %s expected", + acks_received, + acks_expected, + ) + return False + return acks_received == acks_expected + + def wait_for_response(self, timeout: float = 60.0) -> Any: + while True: + try: + correlation_id, response = self._response_queue.get(timeout=timeout) + if correlation_id == self._correlation_id: + return response + except queue.Empty: + raise ResponseTimeout diff --git a/swh/provenance/api/server.py b/swh/provenance/api/server.py --- a/swh/provenance/api/server.py +++ b/swh/provenance/api/server.py @@ -3,10 +3,640 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from collections import Counter +from datetime import datetime +from enum import Enum +import functools +import logging +import multiprocessing import os -from typing import Any, Dict, Optional +import queue +import threading +from typing import Any, Callable +from typing import Counter as TCounter +from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple, Union, cast + +import pika +import pika.channel +import pika.connection +import pika.exceptions +from pika.exchange_type import ExchangeType +import pika.frame +import pika.spec from swh.core import config +from swh.core.api.serializers import encode_data_client as encode_data +from swh.core.api.serializers import msgpack_loads as decode_data +from swh.model.model import Sha1Git + +from .. 
import get_provenance_storage +from ..interface import EntityType, RelationData, RelationType, RevisionData +from ..util import path_id +from .serializers import DECODERS, ENCODERS + +LOG_FORMAT = ( + "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) " + "-35s %(lineno) -5d: %(message)s" +) +LOGGER = logging.getLogger(__name__) + +TERMINATE = object() + + +class ServerCommand(Enum): + TERMINATE = "terminate" + CONSUMING = "consuming" + + +class TerminateSignal(BaseException): + pass + + +def resolve_dates( + dates: Iterable[Union[Tuple[Sha1Git, Optional[datetime]], Tuple[Sha1Git]]] +) -> Dict[Sha1Git, Optional[datetime]]: + result: Dict[Sha1Git, Optional[datetime]] = {} + for row in dates: + sha1 = row[0] + date = ( + cast(Tuple[Sha1Git, Optional[datetime]], row)[1] if len(row) > 1 else None + ) + known = result.setdefault(sha1, None) + if date is not None and (known is None or date < known): + result[sha1] = date + return result + + +def resolve_revision( + data: Iterable[Union[Tuple[Sha1Git, RevisionData], Tuple[Sha1Git]]] +) -> Dict[Sha1Git, RevisionData]: + result: Dict[Sha1Git, RevisionData] = {} + for row in data: + sha1 = row[0] + rev = ( + cast(Tuple[Sha1Git, RevisionData], row)[1] + if len(row) > 1 + else RevisionData(date=None, origin=None) + ) + known = result.setdefault(sha1, RevisionData(date=None, origin=None)) + value = known + if rev.date is not None and (known.date is None or rev.date < known.date): + value = RevisionData(date=rev.date, origin=value.origin) + if rev.origin is not None: + value = RevisionData(date=value.date, origin=rev.origin) + if value != known: + result[sha1] = value + return result + + +def resolve_relation( + data: Iterable[Tuple[Sha1Git, Sha1Git, bytes]] +) -> Dict[Sha1Git, Set[RelationData]]: + result: Dict[Sha1Git, Set[RelationData]] = {} + for src, dst, path in data: + result.setdefault(src, set()).add(RelationData(dst=dst, path=path)) + return result + + +class 
ProvenanceStorageRabbitMQWorker(multiprocessing.Process): + EXCHANGE_TYPE = ExchangeType.direct + extra_type_decoders = DECODERS + extra_type_encoders = ENCODERS + + def __init__( + self, + url: str, + exchange: str, + range: int, + storage_config: Dict[str, Any], + batch_size: int = 100, + prefetch_count: int = 100, + ) -> None: + """Setup the worker object, passing in the URL we will use to connect to + RabbitMQ, the exchange to use, the range id on which to operate, and the + connection information for the underlying local storage object. + + :param str url: The URL for connecting to RabbitMQ + :param str exchange: The name of the RabbitMq exchange to use + :param str range: The ID range to operate on + :param dict storage_config: Configuration parameters for the underlying + ``ProvenanceStorage`` object expected by + ``swh.provenance.get_provenance_storage`` + :param int batch_size: Max amount of elements call to the underlying storage + :param int prefetch_count: Prefetch value for the RabbitMQ connection when + receiving messaged + + """ + super().__init__(name=f"{exchange}_{range:x}") + + self._connection = None + self._channel = None + self._closing = False + self._consumer_tag: Dict[str, str] = {} + self._consuming: Dict[str, bool] = {} + self._prefetch_count = prefetch_count + + self._url = url + self._exchange = exchange + self._binding_keys = list( + ProvenanceStorageRabbitMQServer.get_binding_keys(self._exchange, range) + ) + self._queues: Dict[str, str] = {} + self._storage_config = storage_config + self._batch_size = batch_size + + self.command: multiprocessing.Queue = multiprocessing.Queue() + self.signal: multiprocessing.Queue = multiprocessing.Queue() + + def connect(self) -> pika.SelectConnection: + LOGGER.info("Connecting to %s", self._url) + return pika.SelectConnection( + parameters=pika.URLParameters(self._url), + on_open_callback=self.on_connection_open, + on_open_error_callback=self.on_connection_open_error, + 
on_close_callback=self.on_connection_closed, + ) + + def close_connection(self) -> None: + assert self._connection is not None + self._consuming = {binding_key: False for binding_key in self._binding_keys} + if self._connection.is_closing or self._connection.is_closed: + LOGGER.info("Connection is closing or already closed") + else: + LOGGER.info("Closing connection") + self._connection.close() + + def on_connection_open(self, _unused_connection: pika.SelectConnection) -> None: + LOGGER.info("Connection opened") + self.open_channel() + + def on_connection_open_error( + self, _unused_connection: pika.SelectConnection, err: Exception + ) -> None: + LOGGER.error("Connection open failed, reopening in 5 seconds: %s", err) + assert self._connection is not None + self._connection.ioloop.call_later(5, self._connection.ioloop.stop) + + def on_connection_closed(self, _unused_connection: pika.SelectConnection, reason): + assert self._connection is not None + self._channel = None + if self._closing: + self._connection.ioloop.stop() + else: + LOGGER.warning("Connection closed, reopening in 5 seconds: %s", reason) + self._connection.ioloop.call_later(5, self._connection.ioloop.stop) + + def open_channel(self) -> None: + LOGGER.info("Creating a new channel") + assert self._connection is not None + self._connection.channel(on_open_callback=self.on_channel_open) + + def on_channel_open(self, channel: pika.channel.Channel) -> None: + LOGGER.info("Channel opened") + self._channel = channel + LOGGER.info("Adding channel close callback") + assert self._channel is not None + self._channel.add_on_close_callback(callback=self.on_channel_closed) + self.setup_exchange() + + def on_channel_closed( + self, channel: pika.channel.Channel, reason: Exception + ) -> None: + LOGGER.warning("Channel %i was closed: %s", channel, reason) + self.close_connection() + + def setup_exchange(self) -> None: + LOGGER.info("Declaring exchange %s", self._exchange) + assert self._channel is not None + 
self._channel.exchange_declare( + exchange=self._exchange, + exchange_type=self.EXCHANGE_TYPE, + callback=self.on_exchange_declare_ok, + ) + + def on_exchange_declare_ok(self, _unused_frame: pika.frame.Method) -> None: + LOGGER.info("Exchange declared: %s", self._exchange) + self.setup_queues() + + def setup_queues(self) -> None: + for binding_key in self._binding_keys: + LOGGER.info("Declaring queue %s", binding_key) + assert self._channel is not None + callback = functools.partial( + self.on_queue_declare_ok, + binding_key=binding_key, + ) + self._channel.queue_declare(queue=binding_key, callback=callback) + + def on_queue_declare_ok(self, frame: pika.frame.Method, binding_key: str) -> None: + LOGGER.info( + "Binding queue %s to exchange %s with routing key %s", + frame.method.queue, + self._exchange, + binding_key, + ) + assert self._channel is not None + callback = functools.partial(self.on_bind_ok, queue_name=frame.method.queue) + self._queues[binding_key] = frame.method.queue + self._channel.queue_bind( + queue=frame.method.queue, + exchange=self._exchange, + routing_key=binding_key, + callback=callback, + ) + + def on_bind_ok(self, _unused_frame: pika.frame.Method, queue_name: str) -> None: + LOGGER.info("Queue bound: %s", queue_name) + assert self._channel is not None + self._channel.basic_qos( + prefetch_count=self._prefetch_count, callback=self.on_basic_qos_ok + ) + + def on_basic_qos_ok(self, _unused_frame: pika.frame.Method) -> None: + LOGGER.info("QOS set to: %d", self._prefetch_count) + self.start_consuming() + + def start_consuming(self) -> None: + LOGGER.info("Issuing consumer related RPC commands") + LOGGER.info("Adding consumer cancellation callback") + assert self._channel is not None + self._channel.add_on_cancel_callback(callback=self.on_consumer_cancelled) + for binding_key in self._binding_keys: + self._consumer_tag[binding_key] = self._channel.basic_consume( + queue=self._queues[binding_key], on_message_callback=self.on_request + ) + 
self._consuming[binding_key] = True + self.signal.put(ServerCommand.CONSUMING) + + def on_consumer_cancelled(self, method_frame: pika.frame.Method) -> None: + LOGGER.info("Consumer was cancelled remotely, shutting down: %r", method_frame) + if self._channel: + self._channel.close() + + def on_request( + self, + channel: pika.channel.Channel, + deliver: pika.spec.Basic.Deliver, + properties: pika.spec.BasicProperties, + body: bytes, + ) -> None: + LOGGER.info( + "Received message # %s from %s: %s", + deliver.delivery_tag, + properties.app_id, + body, + ) + # XXX: for some reason this function is returning lists instead of tuples + # (the client send tuples) + batch = decode_data(data=body, extra_decoders=self.extra_type_decoders)["data"] + for item in batch: + self._request_queues[deliver.routing_key].put( + (tuple(item), (properties.correlation_id, properties.reply_to)) + ) + LOGGER.info("Acknowledging message %s", deliver.delivery_tag) + channel.basic_ack(delivery_tag=deliver.delivery_tag) + + def stop_consuming(self) -> None: + if self._channel: + LOGGER.info("Sending a Basic.Cancel RPC command to RabbitMQ") + for binding_key in self._binding_keys: + callback = functools.partial(self.on_cancel_ok, binding_key=binding_key) + self._channel.basic_cancel( + self._consumer_tag[binding_key], callback=callback + ) + + def on_cancel_ok(self, _unused_frame: pika.frame.Method, binding_key: str) -> None: + self._consuming[binding_key] = False + LOGGER.info( + "RabbitMQ acknowledged the cancellation of the consumer: %s", + self._consuming[binding_key], + ) + LOGGER.info("Closing the channel") + assert self._channel is not None + self._channel.close() + + def run(self) -> None: + self._command_thread = threading.Thread(target=self.run_command_thread) + self._command_thread.start() + + self._request_queues: Dict[str, queue.Queue] = {} + self._request_threads: Dict[str, threading.Thread] = {} + for binding_key in self._binding_keys: + meth_name, relation = 
ProvenanceStorageRabbitMQServer.get_meth_name( + binding_key + ) + self._request_queues[binding_key] = queue.Queue() + self._request_threads[binding_key] = threading.Thread( + target=self.run_request_thread, + args=(binding_key, meth_name, relation), + ) + self._request_threads[binding_key].start() + + while not self._closing: + try: + self._connection = self.connect() + assert self._connection is not None + self._connection.ioloop.start() + except KeyboardInterrupt: + LOGGER.info("Connection closed by keyboard interruption, reopening") + if self._connection is not None: + self._connection.ioloop.stop() + except TerminateSignal as ex: + LOGGER.info("Termination requested: %s", ex) + self.stop() + if self._connection is not None and not self._connection.is_closed: + # Finish closing + self._connection.ioloop.start() + except BaseException as ex: + LOGGER.warning("Unexpected exception, terminating: %s", ex) + self.stop() + if self._connection is not None and not self._connection.is_closed: + # Finish closing + self._connection.ioloop.start() + + for binding_key in self._binding_keys: + self._request_queues[binding_key].put(TERMINATE) + for binding_key in self._binding_keys: + self._request_threads[binding_key].join() + self._command_thread.join() + LOGGER.info("Stopped") + + def run_command_thread(self) -> None: + while True: + try: + command = self.command.get() + if command == ServerCommand.TERMINATE: + self.request_termination() + break + except queue.Empty: + pass + except BaseException as ex: + self.request_termination(str(ex)) + break + + def request_termination(self, reason: str = "Normal shutdown") -> None: + assert self._connection is not None + + def termination_callback(): + raise TerminateSignal(reason) + + self._connection.ioloop.add_callback_threadsafe(termination_callback) + + def run_request_thread( + self, binding_key: str, meth_name: str, relation: Optional[RelationType] + ) -> None: + with get_provenance_storage(**self._storage_config) as storage: 
+ request_queue = self._request_queues[binding_key] + merge_items = ProvenanceStorageRabbitMQWorker.get_conflicts_func(meth_name) + while True: + terminate = False + elements = [] + while True: + try: + # TODO: consider reducing this timeout or removing it + elem = request_queue.get(timeout=0.1) + if elem is TERMINATE: + terminate = True + break + elements.append(elem) + except queue.Empty: + break + + if len(elements) >= self._batch_size: + break + + if terminate: + break + + if not elements: + continue + + try: + items, props = zip(*elements) + acks_count: TCounter[Tuple[str, str]] = Counter(props) + data = merge_items(items) + + args = (relation, data) if relation is not None else (data,) + if getattr(storage, meth_name)(*args): + for (correlation_id, reply_to), count in acks_count.items(): + # FIXME: this is running in a different thread! Hence, if + # self._connection drops, there is no guarantee that the + # response can be sent for the current elements. This + # situation should be handled properly. 
+ assert self._connection is not None + self._connection.ioloop.add_callback_threadsafe( + functools.partial( + ProvenanceStorageRabbitMQWorker.respond, + channel=self._channel, + correlation_id=correlation_id, + reply_to=reply_to, + response=count, + ) + ) + else: + LOGGER.warning( + "Unable to process elements for queue %s", binding_key + ) + for elem in elements: + request_queue.put(elem) + except BaseException as ex: + self.request_termination(str(ex)) + break + + def stop(self) -> None: + assert self._connection is not None + if not self._closing: + self._closing = True + LOGGER.info("Stopping") + if any(self._consuming): + self.stop_consuming() + self._connection.ioloop.start() + else: + self._connection.ioloop.stop() + LOGGER.info("Stopped") + + @staticmethod + def get_conflicts_func(meth_name: str) -> Callable[[Iterable[Any]], Any]: + if meth_name in ["content_add", "directory_add"]: + return resolve_dates + elif meth_name == "location_add": + return lambda data: set(data) # just remove duplicates + elif meth_name == "origin_add": + return lambda data: dict(data) # last processed value is good enough + elif meth_name == "revision_add": + return resolve_revision + elif meth_name == "relation_add": + return resolve_relation + else: + LOGGER.warning( + "Unexpected conflict resolution function request for method %s", + meth_name, + ) + return lambda x: x + + @staticmethod + def respond( + channel: pika.channel.Channel, + correlation_id: str, + reply_to: str, + response: Any, + ): + channel.basic_publish( + exchange="", + routing_key=reply_to, + properties=pika.BasicProperties( + content_type="application/msgpack", + correlation_id=correlation_id, + ), + body=encode_data( + response, + extra_encoders=ProvenanceStorageRabbitMQServer.extra_type_encoders, + ), + ) + + +class ProvenanceStorageRabbitMQServer: + extra_type_decoders = DECODERS + extra_type_encoders = ENCODERS + + queue_count = 16 + + def __init__( + self, + url: str, + storage_config: Dict[str, Any], + 
class ProvenanceStorageRabbitMQServer:
    """Server side of the RabbitMQ-backed remote provenance storage.

    One ``ProvenanceStorageRabbitMQWorker`` is created per (exchange, range)
    pair.  The server itself only starts the workers, waits for them to
    report that they are consuming, and terminates them on request.  The
    static helpers define how exchanges, binding keys and routing keys are
    named.
    """

    extra_type_decoders = DECODERS
    extra_type_encoders = ENCODERS

    # Number of ID ranges per exchange; also the modulus used for routing.
    queue_count = 16

    def __init__(
        self,
        url: str,
        storage_config: Dict[str, Any],
        batch_size: int = 100,
        prefetch_count: int = 100,
    ) -> None:
        """Setup the server object, passing in the URL we will use to connect to
        RabbitMQ, and the connection information for the underlying local storage
        object.

        :param str url: The URL for connecting to RabbitMQ
        :param dict storage_config: Configuration parameters for the underlying
            ``ProvenanceStorage`` object expected by
            ``swh.provenance.get_provenance_storage``
        :param int batch_size: Max amount of elements per call to the underlying
            storage
        :param int prefetch_count: Prefetch value for the RabbitMQ connection when
            receiving messages

        """
        self._workers: List[ProvenanceStorageRabbitMQWorker] = []
        for exchange in ProvenanceStorageRabbitMQServer.get_exchanges():
            # ``range_id`` rather than ``range``: do not shadow the builtin.
            for range_id in ProvenanceStorageRabbitMQServer.get_ranges(exchange):
                worker = ProvenanceStorageRabbitMQWorker(
                    url=url,
                    exchange=exchange,
                    range=range_id,
                    storage_config=storage_config,
                    batch_size=batch_size,
                    prefetch_count=prefetch_count,
                )
                self._workers.append(worker)
        self._running = False

    def start(self) -> None:
        """Start every worker and wait until each reports it is consuming.

        Does nothing when the server is already running.  Each worker gets up
        to 60 seconds to post ``ServerCommand.CONSUMING`` on its ``signal``
        queue.  If a worker times out or posts anything else, an error is
        logged, all workers are stopped and the method returns.
        """
        if self._running:
            return
        self._running = True
        for worker in self._workers:
            worker.start()
        for worker in self._workers:
            try:
                status = worker.signal.get(timeout=60)
            except queue.Empty:
                status = None
            # Explicit check instead of ``assert``: it must survive ``-O`` and
            # must not leave the other workers running on an unexpected signal.
            if status != ServerCommand.CONSUMING:
                LOGGER.error(
                    "Could not initialize worker %s. Leaving...", worker.name
                )
                self.stop()
                return
        LOGGER.info("Start serving")

    def stop(self) -> None:
        """Ask every worker to terminate and wait for all of them to finish.

        Does nothing when the server is not running.
        """
        if self._running:
            # Send all termination requests first so workers stop in parallel.
            for worker in self._workers:
                worker.command.put(ServerCommand.TERMINATE)
            for worker in self._workers:
                worker.join()
            LOGGER.info("Stop serving")
            self._running = False

    @staticmethod
    def _format_key(
        meth_name: str, relation: Optional[RelationType], idx: int
    ) -> str:
        """Build ``<meth_name>.<relation value or "unknown">.<idx in hex>``.

        Shared by binding keys and routing keys so both always use the same
        format.  The result is lower-cased.
        """
        relation_part = "unknown" if relation is None else relation.value
        return f"{meth_name}.{relation_part}.{idx:x}".lower()

    @staticmethod
    def get_binding_keys(exchange: str, range: int) -> Generator[str, None, None]:
        """Yield one binding key per method served by ``exchange`` for ``range``."""
        for meth_name, relation in ProvenanceStorageRabbitMQServer.get_meth_names(
            exchange
        ):
            yield ProvenanceStorageRabbitMQServer._format_key(
                meth_name, relation, range
            )

    @staticmethod
    def get_exchange(meth_name: str, relation: Optional[RelationType] = None) -> str:
        """Return the exchange name for ``meth_name`` (and ``relation``).

        The exchange is the text before the first ``_`` of the relation value
        for ``relation_add``, or of the method name otherwise.
        """
        if meth_name == "relation_add":
            # ``relation`` is mandatory for ``relation_add``.
            assert relation is not None
            split = relation.value
        else:
            split = meth_name
        exchange, *_ = split.split("_")
        return exchange

    @staticmethod
    def get_exchanges() -> Generator[str, None, None]:
        """Yield every exchange name: each ``EntityType`` value, then ``location``."""
        yield from [entity.value for entity in EntityType] + ["location"]

    @staticmethod
    def get_meth_name(
        binding_key: str,
    ) -> Tuple[str, Optional[RelationType]]:
        """Split a binding key back into its method name and relation.

        The relation is ``None`` when the key carries the ``unknown`` marker.
        """
        meth_name, relation, *_ = binding_key.split(".")
        return meth_name, (RelationType(relation) if relation != "unknown" else None)

    @staticmethod
    def get_meth_names(
        exchange: str,
    ) -> Generator[Tuple[str, Optional[RelationType]], None, None]:
        """Yield the ``(method name, relation)`` pairs served by ``exchange``.

        An exchange name that matches none of the branches yields nothing.
        """
        if exchange == EntityType.CONTENT.value:
            yield from [
                ("content_add", None),
                ("relation_add", RelationType.CNT_EARLY_IN_REV),
                ("relation_add", RelationType.CNT_IN_DIR),
            ]
        elif exchange == EntityType.DIRECTORY.value:
            yield from [
                ("directory_add", None),
                ("relation_add", RelationType.DIR_IN_REV),
            ]
        elif exchange == EntityType.ORIGIN.value:
            yield from [("origin_add", None)]
        elif exchange == EntityType.REVISION.value:
            yield from [
                ("revision_add", None),
                ("relation_add", RelationType.REV_BEFORE_REV),
                ("relation_add", RelationType.REV_IN_ORG),
            ]
        elif exchange == "location":
            yield "location_add", None

    @staticmethod
    def get_ranges(unused_exchange: str) -> Generator[int, None, None]:
        """Yield the range indexes ``0 .. queue_count - 1``; the argument is ignored."""
        # XXX: we might want to have a different range per exchange
        yield from range(ProvenanceStorageRabbitMQServer.queue_count)

    @staticmethod
    def get_routing_key(
        data: Tuple[bytes, ...], meth_name: str, relation: Optional[RelationType] = None
    ) -> str:
        """Return the routing key for one element ``data`` of a write call.

        The range index is the first byte of the element's ID modulo
        ``queue_count``.  The ID is ``data[0]``, except for methods whose name
        starts with ``location``, where it is ``path_id(data[0])``.
        """
        hashid = path_id(data[0]) if meth_name.startswith("location") else data[0]
        idx = int(hashid[0]) % ProvenanceStorageRabbitMQServer.queue_count
        return ProvenanceStorageRabbitMQServer._format_key(meth_name, relation, idx)

    @staticmethod
    def is_write_method(meth_name: str) -> bool:
        """Tell whether ``meth_name`` contains the ``_add`` marker."""
        return "_add" in meth_name


def make_server_from_configfile() -> ProvenanceStorageRabbitMQServer:
    """Build a server from the file named by ``SWH_CONFIG_FILENAME``.

    The ``provenance.rabbitmq`` section of the loaded configuration is passed
    as keyword arguments to ``ProvenanceStorageRabbitMQServer``.
    """
    config_path = os.environ.get("SWH_CONFIG_FILENAME")
    server_cfg = load_and_check_config(config_path)
    return ProvenanceStorageRabbitMQServer(**server_cfg["provenance"]["rabbitmq"])
"belvedere.internal.softwareheritage.org", + "port": 5432, "dbname": "softwareheritage", "user": "guest", }, }, "storage": { # Local PostgreSQL Storage - "cls": "postgresql", - "db": { - "host": "localhost", - "user": "postgres", - "password": "postgres", - "dbname": "provenance", - }, + # "cls": "postgresql", + # "db": { + # "host": "localhost", + # "user": "postgres", + # "password": "postgres", + # "dbname": "provenance", + # }, # Local MongoDB Storage # "cls": "mongodb", # "db": { # "dbname": "provenance", # }, + # Remote RabbitMQ/PostgreSQL Storage + "cls": "rabbitmq", + "url": "amqp://localhost:5672/%2f", + "storage_config": { + "cls": "postgresql", + "db": { + "host": "localhost", + "user": "postgres", + "password": "postgres", + "dbname": "provenance", + }, + }, + "batch_size": 100, + "prefetch_count": 100, }, } } diff --git a/swh/provenance/util.py b/swh/provenance/util.py --- a/swh/provenance/util.py +++ b/swh/provenance/util.py @@ -3,8 +3,13 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import hashlib import os +def path_id(path: bytes) -> bytes: + return hashlib.sha1(path).digest() + + def path_normalize(path: bytes) -> bytes: return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path