diff --git a/docs/storage/remote.rst b/docs/storage/remote.rst
new file mode 100644
--- /dev/null
+++ b/docs/storage/remote.rst
@@ -0,0 +1,340 @@
+# ProvenanceStorageRabbitMQClient
+
+The remote storage client connects to a remote server through a RabbitMQ broker, and delegates all writing operations to it. Reading operations, however, are still done using a local storage object (i.e. `ProvenanceStorageMongoDb` or `ProvenanceStoragePostgreSql`). This speeds up the process by avoiding unnecessary RabbitMQ overhead for operations that are conflict-free.
+
+> :warning: **No check is done to guarantee that the local storage used by the client is the same as the one the server uses on its side, but it is assumed to be the case.**
+
+When a writing operation is invoked, the client splits the set of elements to be written by ID range, and sends a series of packages to the server (via the RabbitMQ broker). Each package consists of elements belonging to the same ID range but, to avoid sending huge packages, there might be more than one package per range. After this, the client blocks, waiting for the server to acknowledge the writing of all requested elements.
+
+To initialize a client object, two mandatory parameters must be provided:
+
+- `url`: the URL string of the broker where the server expects to receive the packages.
+- `storage_config`: a dictionary containing the storage configuration for the local storage object, as expected by `swh.provenance.get_provenance_storage`.
+
+Additionally, some optional parameters can be specified that may affect the performance of the remote storage as a whole:
+
+- `batch_size`: an integer specifying the maximum allowed amount of elements per package, after range splitting. Defaults to 100.
+- `prefetch_count`: an integer specifying how many ack packages are prefetched from the broker. Defaults to 100.
+- `wait_min`: a float specifying the minimum amount of seconds that the client should wait for the server's response before failing. Defaults to 60.
+- `wait_per_batch`: a float specifying the amount of seconds to wait per sent batch of items (i.e. per package). If `wait_per_batch * number_of_packages` is less than `wait_min`, the latter will be used instead. Defaults to 10.
+
+Like all `ProvenanceStorageInterface`-compliant objects, the remote storage client has to be opened before use, and closed once it is no longer needed, so that resources are properly released. It can be operated as a context manager using the keyword `with`, which takes care of opening/closing both the connection to the remote server and the underlying local storage object (see the sketch in the next section).
+
+
+## Client lifecycle
+
+Connecting to the remote server implies taking care of the RabbitMQ connection's lifecycle. To that end, an internal thread is launched, which will re-establish the connection in case of an error until an explicit disconnect request is made. The class `ProvenanceStorageRabbitMQClient` extends the standard `threading.Thread` class, thus the `run` method in the class is the target function of that thread. It loops indefinitely, calling the `connect` method and then the blocking `ioloop.start` method of the established connection. Interaction with this loop is done by setting callback functions.
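+
+From the user's perspective, this whole lifecycle is hidden behind `open`/`close` (or the context manager protocol). The following is a minimal usage sketch; the broker URL and the local storage configuration are placeholder values, and the `content_add` call merely illustrates a write request with a hypothetical 20-byte ID:
+
+```python
+from datetime import datetime, timezone
+
+from swh.provenance import get_provenance_storage
+
+# Placeholder configuration for the local storage used for reads.
+local_storage = {
+    "cls": "postgresql",
+    "db": {"host": "localhost", "dbname": "provenance"},
+}
+
+with get_provenance_storage(
+    cls="rabbitmq",
+    url="amqp://guest:guest@localhost:5672/%2f",  # placeholder broker URL
+    storage_config=local_storage,
+    batch_size=100,  # max elements per package
+    wait_min=60,     # min seconds to wait for acks
+) as storage:
+    # Write operations are split by ID range and sent through RabbitMQ;
+    # read operations go straight to the local storage object.
+    ok = storage.content_add({b"\x00" * 20: datetime.now(timezone.utc)})
+```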
+
+The following is a diagram of the interaction between the methods of the class:
+
+```graphviz
+digraph {
+  ProvenanceStorageRabbitMQClient
+
+  close[shape=record]
+  open[shape=record]
+  add[shape=record,label="write method"]
+  get[shape=record,label="read method"]
+
+  ProvenanceStorageInterface
+
+  start[shape=record]
+  stop[shape=record]
+
+  request[shape=record]
+  wait_for_acks[shape=record]
+  wait_for_response[shape=record]
+
+  subgraph cluster_connection_thread {
+    style=rounded
+    bgcolor=gray95
+    color=gray
+    labelloc=b
+
+    run[shape=record]
+
+    connect[shape=record]
+    on_connection_open[shape=record]
+    on_connection_open_error[shape=record]
+    on_connection_closed[shape=record]
+    close_connection[shape=record]
+    open_channel[shape=record]
+    on_channel_open[shape=record]
+    on_channel_closed[shape=record]
+    setup_queue[shape=record]
+    on_queue_declare_ok[shape=record]
+    on_basic_qos_ok[shape=record]
+    start_consuming[shape=record]
+    on_consumer_cancelled[shape=record]
+    on_response[shape=record]
+    stop_consuming[shape=record]
+    on_cancel_ok[shape=record]
+  }
+
+  ProvenanceStorageRabbitMQClient->{add,close,get,open}
+
+  close->{stop}
+  open->{start}
+
+  start->{run}
+  stop->{stop_consuming}
+
+  run->{connect,stop}
+
+  connect->{on_connection_open,on_connection_open_error,on_connection_closed}
+
+  on_connection_open->{open_channel}
+
+  open_channel->{on_channel_open}
+
+  on_cancel_ok->{on_channel_closed}
+  on_consumer_cancelled->{on_channel_closed}
+  on_channel_open->{setup_queue}
+
+  on_channel_closed->{close_connection}
+
+  setup_queue->{on_queue_declare_ok}
+
+  on_queue_declare_ok->{on_basic_qos_ok}
+
+  on_basic_qos_ok->{start_consuming}
+
+  start_consuming->{on_consumer_cancelled,on_response}
+
+  on_response->{wait_for_response}[label="_response_queue",arrowhead="none"]
+
+  stop_consuming->{on_cancel_ok}
+
+  add->{request,wait_for_acks}
+  wait_for_acks->{wait_for_response}
+
+  get->{ProvenanceStorageInterface}
+}
+
+```
+
+Every write method in the `ProvenanceStorageInterface` splits its input by ID range and sends a series of messages to the server by calling the `request` method. After that, it blocks waiting for all expected ack packages to arrive by calling the `wait_for_acks` method, which in turn calls `wait_for_response`. Calls to the write methods may come from distinct threads, thus the interaction between `on_response` and `wait_for_response` is done through a thread-safe `queue.Queue` structure, `_response_queue`.
+
+Below is a summary of the methods present in the diagram and their functionality, listed in alphabetical order:
+- `close_connection`: this method properly closes the connection to RabbitMQ.
+- `connect`: this method connects to RabbitMQ, returning the connection handle. When the connection is established, the `on_connection_open` callback method will be invoked by `pika`. If there is an error establishing the connection, the `on_connection_open_error` callback method will be invoked by `pika`. If the connection closes unexpectedly, the `on_connection_closed` callback method will be invoked by `pika`.
+- `on_basic_qos_ok`: this callback method is invoked by `pika` when the `Basic.QoS` RPC call made in `on_queue_declare_ok` has completed. At this point it is safe to start consuming messages, hence the `start_consuming` method is called, which will invoke the needed RPC commands to start the process.
+- `on_cancel_ok`: this callback method is invoked by `pika` when RabbitMQ acknowledges the cancellation of a consumer. At this point the channel close is requested, thus indirectly invoking the `on_channel_closed` callback method once the channel has been closed, which will in turn close the connection.
+- `on_channel_closed`: this callback method is invoked by `pika` when RabbitMQ unexpectedly closes the channel. Channels are usually closed if there is an attempt to do something that violates the protocol, such as re-declaring an exchange or queue with different parameters. In this case, the connection will be closed by invoking the `close_connection` method.
+- `on_channel_open`: this callback method is invoked by `pika` when the channel has been opened. The `on_channel_closed` callback is set here. Since the channel is now open, it is safe to set up the client's response queue on RabbitMQ by invoking the `setup_queue` method.
+- `on_connection_closed`: this callback method is invoked by `pika` when the connection to RabbitMQ is closed unexpectedly. Since it is unexpected, an attempt to reconnect to RabbitMQ will be made.
+- `on_connection_open`: this callback method is invoked by `pika` once the connection to RabbitMQ has been established. It proceeds to open the channel by calling the `open_channel` method.
+- `on_connection_open_error`: this callback method is invoked by `pika` if the connection to RabbitMQ cannot be established. Since it is unexpected, an attempt to reconnect to RabbitMQ will be made.
+- `on_consumer_cancelled`: this callback method is invoked by `pika` when RabbitMQ sends a `Basic.Cancel` for a consumer receiving messages. At this point the channel close is requested, thus indirectly invoking the `on_channel_closed` callback method once the channel has been closed, which will in turn close the connection.
+- `on_queue_declare_ok`: this callback method is invoked by `pika` when the `Queue.Declare` RPC call made in `setup_queue` has completed. This method sets up the consumer prefetch count by invoking the `Basic.QoS` RPC command. When it is completed, the `on_basic_qos_ok` method will be invoked by `pika`.
+- `on_response`: this callback method is invoked by `pika` when a message is delivered from RabbitMQ. The decoded response, together with its correlation ID, is enqueued in the internal `_response_queue`, so that the data is forwarded to the `wait_for_response` method, which might be running on a distinct thread. A `Basic.Ack` RPC command is issued to acknowledge the delivery of the message to RabbitMQ.
+- `open_channel`: this method opens a new channel with RabbitMQ by issuing the `Channel.Open` RPC command. When RabbitMQ responds that the channel is open, the `on_channel_open` callback will be invoked by `pika`.
+- `request`: this method sends a message to RabbitMQ by issuing a `Basic.Publish` RPC command. The body of the message is properly encoded, while the correlation ID and request key are used as passed by the calling method.
+- `run`: main method of the internal thread. It requests to open a connection to RabbitMQ by calling the `connect` method, and starts the internal `IOLoop` of the returned handle (a blocking operation). In case of failure, this method will indefinitely try to reconnect. When an explicit `TerminateSignal` is received, the `stop` method is invoked.
+- `setup_queue`: this method sets up an exclusive queue for the client on RabbitMQ by invoking the `Queue.Declare` RPC command. When it is completed, the `on_queue_declare_ok` method will be invoked by `pika`.
+- `start`: inherited from `threading.Thread`. It launches the internal thread with the `run` method as target.
+- `start_consuming`: this method sets up the consumer by first registering the `on_consumer_cancelled` callback, so that the client is notified if RabbitMQ cancels the consumer. It then issues the `Basic.Consume` RPC command, which returns the consumer tag that is used to uniquely identify the consumer with RabbitMQ. The `on_response` method is passed in as the callback `pika` will invoke when a message is fully received.
+- `stop`: this method cleanly shuts down the connection to RabbitMQ by calling the `stop_consuming` method. The `IOLoop` is started again because this method is invoked by raising a `TerminateSignal` exception. This exception stops the `IOLoop`, which needs to be running for `pika` to communicate with RabbitMQ. All of the commands issued prior to starting the `IOLoop` will be buffered but not processed.
+- `stop_consuming`: this method sends a `Basic.Cancel` RPC command. When RabbitMQ confirms the cancellation, the `on_cancel_ok` callback method will be invoked by `pika`, which will then close the channel and connection.
+- `wait_for_acks`: this method is invoked by every write method in the `ProvenanceStorageInterface`, after sending a series of write requests to the server. It will call `wait_for_response` until it receives all expected ack responses, or until the first timeout occurs. The timeout is calculated based on the number of expected acks and the class initialization parameters `wait_per_batch` and `wait_min` (see the sketch after this list).
+- `wait_for_response`: this method is called from `wait_for_acks` to retrieve ack packages from the internal `_response_queue`. The response correlation ID is used to validate that the received ack corresponds to the current write request. The method returns the decoded body of the received response.
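+
+The timeout computation in `wait_for_acks` follows directly from the initialization parameters and mirrors the logic in this patch's client code; the helper name `ack_timeout` below is ours, introduced only for illustration:
+
+```python
+def ack_timeout(
+    acks_expected: int,
+    batch_size: int = 100,
+    wait_per_batch: float = 10.0,
+    wait_min: float = 60.0,
+) -> float:
+    # (acks_expected / batch_size) approximates the number of sent packages.
+    return max((acks_expected / batch_size) * wait_per_batch, wait_min)
+
+assert ack_timeout(100) == 60.0    # one package: wait_min dominates
+assert ack_timeout(1200) == 120.0  # 12 packages * 10s each > wait_min
+```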
+
+
+# ProvenanceStorageRabbitMQServer
+
+The remote storage server is responsible for defining the ID range splitting policy for each entity and relation in the provenance solution. Based on this policy, it launches a series of worker processes that take care of actually processing the requests for each defined range in the partition. These processes are implemented in the `ProvenanceStorageRabbitMQWorker` class described below.
+
+To initialize a server object, two mandatory parameters must be provided:
+
+- `url`: the URL string of the broker where the server expects to receive the packages.
+- `storage_config`: a dictionary containing the storage configuration for the local storage object, as expected by `swh.provenance.get_provenance_storage`.
+
+Additionally, some optional parameters can be specified that may affect the performance of the remote storage as a whole:
+
+- `batch_size`: an integer specifying the maximum allowed amount of elements to be processed at a time, i.e. forwarded to the underlying storage object by each worker. Defaults to 100.
+- `prefetch_count`: an integer specifying how many packages are prefetched from the broker by each worker. Defaults to 100.
+
+On initialization, the server will create the necessary `ProvenanceStorageRabbitMQWorker` objects, forwarding them the parameters mentioned above, but it won't launch the underlying processes until it is explicitly started. To that end, `ProvenanceStorageRabbitMQServer` objects provide the following methods:
+- `start`: this method launches all the necessary worker subprocesses and ensures they are all in a proper consuming state before returning control to the caller.
+- `stop`: this method signals all worker subprocesses for termination and blocks until they all finish successfully.
+
+
+## ID range splitting policy
+
+The ID range splitting policy is defined as follows:
+- There is an exchange in the RabbitMQ broker for each entity in the provenance solution, plus an extra one to handle locations: `content`, `directory`, `location`, `origin`, and `revision`.
+- Any request to add an entity sends the necessary packages to the entity's associated exchange for proper routing: i.e. requests for `content_add` are handled by the `content` exchange.
+- Any request to add a relation entry is handled by the exchange of the source entity in the relation: i.e. requests for `relation_add` with `relation=CNT_EARLY_IN_REV` are handled by the `content` exchange.
+- ID range splitting is done by looking at the first hexadecimal digit in the SWHID of the entity, hence 16 possible ranges are defined for each operation associated with each entity. In the case of locations, a hex hash is calculated over their value.
+- Each exchange then handles 16 queues for each method associated with the exchange's entity, with a `direct` routing policy. For instance, the `content` exchange has 16 queues for each of the following methods: `content_add`, `relation_add` with `relation=CNT_EARLY_IN_REV`, and `relation_add` with `relation=CNT_IN_DIR` (i.e. a total of 48 queues).
+- For each exchange, 16 `ProvenanceStorageRabbitMQWorker` processes are launched, each of them taking care of one ID range for the associated entity.
+
+All in all, this gives a total of 80 `ProvenanceStorageRabbitMQWorker` processes (16 per exchange) and 160 RabbitMQ queues (48 for `content` and `revision`, 32 for `directory`, and 16 for `location` and `origin`). In this way, it is guaranteed that, regardless of the operation being performed, there will never be more than one process trying to write to a given ID range for a given entity, thus resolving all potential conflicts.
+
+Although the ID range splitting policy is defined on the server side, so that the server can properly configure and launch the necessary worker processes, it is the client that is responsible for actually splitting the input to each write method and sending the write requests to the proper queues, so that RabbitMQ routes them to the correct worker. For that, the server defines a series of static methods that allow querying the ID range splitting policy:
+- `get_binding_keys`: this method is meant to be used by the server workers. Given an exchange and a range ID, it yields all the RabbitMQ routing keys the worker process should bind to.
+- `get_exchange`: given the name of a write method in the `ProvenanceStorageInterface`, and an optional relation type, it returns the name of the exchange to which the writing request should be sent.
+- `get_exchanges`: this method yields the names of all the exchanges in the RabbitMQ broker.
+- `get_meth_name`: this method is meant to be used by the server workers. Given a binding key as returned by `get_binding_keys`, it returns the `ProvenanceStorageInterface` method associated with it.
+- `get_meth_names`: given an exchange name, it yields all the methods that are associated with it. In the case of `relation_add`, the method also returns the supported relation type.
+- `get_ranges`: given an exchange name, it yields the integer values of all supported range IDs (currently 0-15 for all exchanges). The idea behind this method is to allow defining a custom ID range split for each exchange.
+- `get_routing_key`: given the name of a write method in the `ProvenanceStorageInterface`, an optional relation type, and a tuple with the data to be passed to the method (first parameter), this method returns the routing key of the queue responsible for handling that tuple. It is assumed that the first value in the tuple is a `Sha1Git` ID.
+- `is_write_method`: given the name of a method in the `ProvenanceStorageInterface`, it decides whether it is a write method or not.
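+
+To make the routing policy concrete, the sketch below derives a plausible routing key for a `content_add` item. The actual key layout is internal to `get_routing_key` and not spelled out in this document, so the string format here is only an assumption; what matters is that the first hexadecimal digit of the `Sha1Git` ID selects one of the 16 ranges:
+
+```python
+from typing import Optional
+
+def routing_key_sketch(
+    meth_name: str, item: tuple, relation: Optional[str] = None
+) -> str:
+    # First element of the tuple is assumed to be a Sha1Git ID (20 bytes);
+    # its first hex digit selects one of the 16 ranges.
+    range_id = item[0].hex()[0]
+    suffix = f".{relation.lower()}" if relation else ""
+    return f"{meth_name}{suffix}.{range_id}"  # hypothetical key layout
+
+# An entity whose SWHID starts with 0xab... maps to range 'a'.
+sha1 = bytes.fromhex("ab" + "00" * 19)
+assert routing_key_sketch("content_add", (sha1, None)) == "content_add.a"
+```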
+
+
+# ProvenanceStorageRabbitMQWorker
+
+The remote storage worker consumes messages published in the RabbitMQ broker by the remote storage clients, and proceeds to perform the actual writing operations to a local storage object (i.e. `ProvenanceStorageMongoDb` or `ProvenanceStoragePostgreSql`). Each worker processes messages associated with a particular entity and range ID. It is the client's responsibility to properly split data across messages according to the remote storage server's policy.
+
+Since methods in the `ProvenanceStorageInterface` that operate over the same entity overlap, one worker may have to handle more than one method to guarantee conflict-free writes to the underlying storage. For instance, considering the `content` entity, for a given ID range the methods `content_add` and `relation_add` with `relation=CNT_EARLY_IN_REV` may conflict. It is the worker's responsibility to solve this kind of conflict.
+
+To initialize a worker object, four mandatory parameters must be provided:
+
+- `url`: the URL string of the broker where the server expects to receive the packages.
+- `exchange`: the RabbitMQ exchange to which the worker will subscribe. See `ProvenanceStorageRabbitMQServer`'s ID range splitting policy for further details.
+- `range`: the range ID the worker will be processing. See `ProvenanceStorageRabbitMQServer`'s ID range splitting policy for further details.
+- `storage_config`: a dictionary containing the storage configuration for the local storage object, as expected by `swh.provenance.get_provenance_storage`.
+
+Additionally, some optional parameters can be specified that may affect the performance of the remote storage as a whole:
+
+- `batch_size`: an integer specifying the maximum allowed amount of elements to be processed at a time, i.e. forwarded to the underlying storage object for writing. Defaults to 100.
+- `prefetch_count`: an integer specifying how many packages are prefetched from the broker. Defaults to 100.
+
+> :warning: **This class is not meant to be used directly but through an instance of `ProvenanceStorageRabbitMQServer`. The parameters `url`, `storage_config`, `batch_size` and `prefetch_count` above are forwarded as passed to the server on initialization. The additional arguments `exchange` and `range` are generated by the server based on its ID range splitting policy.**
+
+
+## Worker lifecycle
+
+All interaction between the provenance solution and a remote storage worker object happens through RabbitMQ packages. To maximize concurrency, each instance of the `ProvenanceStorageRabbitMQWorker` launches a distinct sub-process, hence avoiding unnecessary synchronization with other components of the solution. For this, `ProvenanceStorageRabbitMQWorker` extends `multiprocessing.Process` and only has a direct channel of communication to the master `ProvenanceStorageRabbitMQServer`, through `multiprocessing.Queue` structures. The entry point of the sub-process is the `run` method, which in turn launches a set of threads to handle the different provenance methods the worker needs to support, plus an extra thread to handle communication with the server object. RabbitMQ's lifecycle is taken care of in the main thread.
+
+The following is a diagram of the interaction between the methods of the class:
+
+```graphviz
+digraph {
+  ProvenanceStorageRabbitMQServer
+  ProvenanceStorageRabbitMQWorker
+
+  start[shape=record]
+  run[shape=record]
+
+  connect[shape=record]
+  on_connection_open[shape=record]
+  on_connection_open_error[shape=record]
+  on_connection_closed[shape=record]
+  close_connection[shape=record]
+  open_channel[shape=record]
+  on_channel_open[shape=record]
+  on_channel_closed[shape=record]
+
+  setup_exchange[shape=record]
+  on_exchange_declare_ok[shape=record]
+
+  setup_queues[shape=record]
+  on_queue_declare_ok[shape=record]
+  on_bind_ok[shape=record]
+  on_basic_qos_ok[shape=record]
+  start_consuming[shape=record]
+  on_consumer_cancelled[shape=record]
+  on_request[shape=record]
+  stop_consuming[shape=record]
+  on_cancel_ok[shape=record]
+
+  request_termination[shape=record]
+  stop[shape=record]
+
+  respond[shape=record]
+  get_conflicts_func[shape=record]
+
+  subgraph cluster_command_thread {
+    style=rounded
+    bgcolor=gray95
+    color=gray
+    labelloc=b
+
+    run_command_thread[shape=record]
+  }
+
+  subgraph cluster_request_thread {
+    style=rounded
+    bgcolor=gray95
+    color=gray
+    labelloc=b
+
+    ProvenanceStorageInterface
+
+    run_request_thread[shape=record]
+  }
+
+  ProvenanceStorageRabbitMQWorker->{start}
+
+  start->{run}
+  stop->{stop_consuming}
+
+  run->{connect,run_command_thread,run_request_thread,stop}
+
+  connect->{on_connection_open,on_connection_open_error,on_connection_closed}
+
+  on_connection_open->{open_channel}
+
+  open_channel->{on_channel_open}
+
+  on_cancel_ok->{on_channel_closed}
+  on_consumer_cancelled->{on_channel_closed}
+  on_channel_open->{setup_exchange}
+
+  on_channel_closed->{close_connection}
+
+  setup_exchange->{on_exchange_declare_ok}
+  on_exchange_declare_ok->{setup_queues}
+
+  setup_queues->{on_queue_declare_ok}
+  on_queue_declare_ok->{on_bind_ok}
+  on_bind_ok->{on_basic_qos_ok}
+  on_basic_qos_ok->{start_consuming}
+
+  start_consuming->{on_consumer_cancelled,on_request}
+  start_consuming->{ProvenanceStorageRabbitMQServer}[label=" signal",arrowhead="none"]
+
+  on_request->{run_request_thread}[label=" _request_queues",arrowhead="none"]
+
+  stop_consuming->{on_cancel_ok}
+
+  run_command_thread->{request_termination}
+  run_command_thread->{ProvenanceStorageRabbitMQServer}[label=" command",arrowhead="none"]
+
+  run_request_thread->{ProvenanceStorageInterface,get_conflicts_func,respond,request_termination}
+
+  request_termination->{run}[label="TerminateSignal",arrowhead="none"]
+}
+```
+
+There is a request thread for each `ProvenanceStorageInterface` method the worker needs to handle, each thread with its own provenance storage object (i.e. an exclusive connection). Each of these threads receives sets of parameters to be passed to its corresponding method and performs explicit conflict resolution, using the method-specific function returned by `get_conflicts_func`, prior to passing these parameters to the underlying storage.
+
+Below is a summary of the methods present in the diagram and their functionality, listed in alphabetical order:
+- `close_connection`: this method properly closes the connection to RabbitMQ.
+- `connect`: this method connects to RabbitMQ, returning the connection handle. When the connection is established, the `on_connection_open` callback method will be invoked by `pika`. If there is an error establishing the connection, the `on_connection_open_error` callback method will be invoked by `pika`. If the connection closes unexpectedly, the `on_connection_closed` callback method will be invoked by `pika`.
+- `on_basic_qos_ok`: this callback method is invoked by `pika` when the `Basic.QoS` RPC call made in `on_bind_ok` has completed. At this point it is safe to start consuming messages, hence the `start_consuming` method is called, which will invoke the needed RPC commands to start the process.
+- `on_bind_ok`: this callback method is invoked by `pika` when the `Queue.Bind` RPC call made in `on_queue_declare_ok` has completed. This method sets up the consumer prefetch count by invoking the `Basic.QoS` RPC command. When it is completed, the `on_basic_qos_ok` method will be invoked by `pika`.
+- `on_cancel_ok`: this method is invoked by `pika` when RabbitMQ acknowledges the cancellation of a consumer. At this point the channel close is requested, thus indirectly invoking the `on_channel_closed` callback method once the channel has been closed, which will in turn close the connection.
+- `on_channel_closed`: this callback method is invoked by `pika` when RabbitMQ unexpectedly closes the channel. Channels are usually closed if there is an attempt to do something that violates the protocol, such as re-declaring an exchange or queue with different parameters. In this case, the connection will be closed by invoking the `close_connection` method.
+- `on_channel_open`: this callback method is invoked by `pika` when the channel has been opened. The `on_channel_closed` callback is set here. Since the channel is now open, it is safe to declare the exchange to use by invoking the `setup_exchange` method.
+- `on_connection_closed`: this callback method is invoked by `pika` when the connection to RabbitMQ is closed unexpectedly. Since it is unexpected, an attempt to reconnect to RabbitMQ will be made.
+- `on_connection_open`: this callback method is called by `pika` once the connection to RabbitMQ has been established. It proceeds to open the channel by calling the `open_channel` method.
+- `on_connection_open_error`: this callback method is called by `pika` if the connection to RabbitMQ cannot be established. Since it is unexpected, an attempt to reconnect to RabbitMQ will be made.
+- `on_consumer_cancelled`: this callback method is invoked by `pika` when RabbitMQ sends a `Basic.Cancel` for a consumer receiving messages. At this point the channel close is requested, thus indirectly invoking the `on_channel_closed` callback method once the channel has been closed, which will in turn close the connection.
+- `on_exchange_declare_ok`: this callback method is invoked by `pika` when the `Exchange.Declare` RPC call made in `setup_exchange` has completed. At this point it is time to set up the queues for the different request handling threads. This is done by calling the `setup_queues` method.
+- `on_queue_declare_ok`: this callback method is invoked by `pika` when each `Queue.Declare` RPC call made in `setup_queues` has completed. Now it is time to bind the current queue and exchange together with the corresponding routing key by issuing the `Queue.Bind` RPC command. When this command is completed, the `on_bind_ok` method will be invoked by `pika`.
+- `on_request`: this callback method is invoked by `pika` when a message is delivered from RabbitMQ to any of the queues bound by the worker. The decoded request, together with its correlation ID and reply-to property, is enqueued in the corresponding internal `_request_queues` entry (the actual queue is identified by the message routing key), so that the data is forwarded to the thread that handles the particular method the message is associated with. A `Basic.Ack` RPC command is issued to acknowledge the delivery of the message to RabbitMQ.
+- `open_channel`: this method opens a new channel with RabbitMQ by issuing the `Channel.Open` RPC command. When RabbitMQ responds that the channel is open, the `on_channel_open` callback will be invoked by `pika`.
+- `request_termination`: this method sends a signal to the main thread of the process to cleanly release resources and terminate. This is done by setting a callback in the `IOLoop` that raises a `TerminateSignal`, which will eventually be handled by the `run` method.
+- `respond`: this method sends a message to RabbitMQ by issuing a `Basic.Publish` RPC command. The body of the message is properly encoded, while the correlation ID and request key are used as passed by the calling method.
+- `run`: main method of the process. It launches a thread to handle communication with the `ProvenanceStorageRabbitMQServer` object, targeting the `run_command_thread` method. Also, it launches one thread for each `ProvenanceStorageInterface` method the worker needs to handle, each targeting the `run_request_thread` method. Finally, it requests to open a connection to RabbitMQ by calling the `connect` method, and starts the internal `IOLoop` of the returned handle (a blocking operation). In case of failure, this method will indefinitely try to reconnect. When an explicit `TerminateSignal` is received, the `stop` method is invoked. All internal threads will be signalled for termination as well, and resources released.
+- `run_command_thread`: main method of the command thread. It receives external commands from the `ProvenanceStorageRabbitMQServer` object through a `multiprocessing.Queue` structure. The only supported command for now is for the worker to be signalled to terminate, in which case the `request_termination` method is invoked.
+- `run_request_thread`: main method of the request threads. The worker has one such thread per `ProvenanceStorageInterface` method it needs to handle. This method initializes its own storage object and interacts with the `on_request` callback through a `queue.Queue` structure, waiting for items to be passed to the storage method associated with the thread. When elements become available, it performs a conflict resolution step, resorting to the method-specific function returned by `get_conflicts_func`, and then forwards the items to the underlying storage object. If the storage method returns successfully, acknowledgements are sent back to each client through the `respond` method. If the call to the storage method fails, items are enqueued back to be retried in a future iteration. In case of an unexpected exception, the worker is signalled for termination by calling the `request_termination` method.
+- `setup_exchange`: this method sets up the exchange on RabbitMQ by invoking the `Exchange.Declare` RPC command. When it is completed, the `on_exchange_declare_ok` method will be invoked by `pika`.
+- `setup_queues`: this method sets up the necessary queues for the worker on RabbitMQ by invoking several `Queue.Declare` RPC commands (one per supported provenance storage method). When each command is completed, the `on_queue_declare_ok` method will be invoked by `pika`.
+- `start`: inherited from `multiprocessing.Process`. It launches the worker sub-process with the `run` method as target.
+- `start_consuming`: this method sets up the worker by first registering the `add_on_cancel_callback` callback, so that the object is notified if RabbitMQ cancels the consumer. It then issues one `Basic.Consume` RPC command per supported provenance storage method, each returning a consumer tag that is used to uniquely identify the consumer with RabbitMQ. The `on_request` method is passed in as the callback `pika` will invoke when a message is fully received. After setting up all consumers, a `CONSUMING` signal is sent to the `ProvenanceStorageRabbitMQServer` object through a `multiprocessing.Queue` structure.
+- `stop`: this method cleanly shuts down the connection to RabbitMQ by calling the `stop_consuming` method. The `IOLoop` is started again because this method is invoked by raising a `TerminateSignal` exception. This exception stops the `IOLoop`, which needs to be running for `pika` to communicate with RabbitMQ. All of the commands issued prior to starting the `IOLoop` will be buffered but not processed.
+- `stop_consuming`: this method sends a `Basic.Cancel` RPC command for each supported provenance storage method. When RabbitMQ confirms the cancellation, the `on_cancel_ok` callback method will be invoked by `pika`, which will then close the channel and connection.
+- `get_conflicts_func`: this method returns the conflict resolution function to be used, based on the provenance storage method (see the sketch below).
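+
+To make the conflict resolution step concrete, the sketch below mirrors the `resolve_dates` function that `get_conflicts_func` returns for `content_add` and `directory_add` (as defined in this patch): when the same ID appears several times in a batch, the earliest known date wins. The sample values are placeholders:
+
+```python
+from datetime import datetime, timezone
+
+def resolve_dates(dates):
+    """Merge duplicate IDs, keeping the earliest non-None date for each."""
+    result = {}
+    for row in dates:
+        sha1 = row[0]
+        date = row[1] if len(row) > 1 else None
+        known = result.setdefault(sha1, None)
+        if date is not None and (known is None or date < known):
+            result[sha1] = date
+    return result
+
+sha1 = b"\x00" * 20  # placeholder 20-byte ID
+d1 = datetime(2020, 1, 1, tzinfo=timezone.utc)
+d2 = datetime(2021, 1, 1, tzinfo=timezone.utc)
+assert resolve_dates([(sha1, d2), (sha1, d1), (sha1,)]) == {sha1: d1}
+```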
diff --git a/mypy.ini b/mypy.ini --- a/mypy.ini +++ b/mypy.ini @@ -17,6 +17,9 @@ [mypy-msgpack.*] ignore_missing_imports = True +[mypy-pika.*] +ignore_missing_imports = True + [mypy-pkg_resources.*] ignore_missing_imports = True diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ iso8601 methodtools mongomock +pika pymongo PyYAML types-click diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py --- a/swh/provenance/__init__.py +++ b/swh/provenance/__init__.py @@ -92,4 +92,12 @@ engine = kwargs.get("engine", "pymongo") return ProvenanceStorageMongoDb(engine=engine, **kwargs["db"]) + elif cls == "rabbitmq": + from .api.client import ProvenanceStorageRabbitMQClient + + rmq_storage = ProvenanceStorageRabbitMQClient(**kwargs) + if TYPE_CHECKING: + assert isinstance(rmq_storage, ProvenanceStorageInterface) + return rmq_storage + raise ValueError diff --git a/swh/provenance/api/client.py b/swh/provenance/api/client.py --- a/swh/provenance/api/client.py +++ b/swh/provenance/api/client.py @@ -2,3 +2,466 @@ # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information + +from __future__ import annotations + +import functools +import inspect +import logging +import queue +import threading +import time +from types import TracebackType +from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union +import uuid + +import pika +import pika.channel +import pika.connection +import pika.frame +import pika.spec + +from swh.core.api.serializers import encode_data_client as encode_data +from swh.core.api.serializers import msgpack_loads as decode_data +from swh.core.statsd import statsd + +from .. import get_provenance_storage +from ..interface import ProvenanceStorageInterface, RelationData, RelationType +from .serializers import DECODERS, ENCODERS +from .server import ProvenanceStorageRabbitMQServer + +LOG_FORMAT = ( + "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) " + "-35s %(lineno) -5d: %(message)s" +) +LOGGER = logging.getLogger(__name__) + +STORAGE_DURATION_METRIC = "swh_provenance_storage_rabbitmq_duration_seconds" + + +class ResponseTimeout(Exception): + pass + + +class TerminateSignal(Exception): + pass + + +def split_ranges( + data: Iterable[bytes], meth_name: str, relation: Optional[RelationType] = None +) -> Dict[str, List[Tuple[Any, ...]]]: + ranges: Dict[str, List[Tuple[Any, ...]]] = {} + if relation is not None: + assert isinstance(data, dict) + for src, dsts in data.items(): + key = ProvenanceStorageRabbitMQServer.get_routing_key( + (src,), meth_name, relation + ) + for rel in dsts: + assert isinstance(rel, RelationData) + ranges.setdefault(key, []).append((src, rel.dst, rel.path)) + else: + items: Union[List[Tuple[Any, Any]], List[Tuple[Any]]] + if isinstance(data, dict): + items = list(data.items()) + else: + items = list({(item,) for item in data}) + for item in items: + key = ProvenanceStorageRabbitMQServer.get_routing_key(item, meth_name) + ranges.setdefault(key, []).append(item) + return ranges + + +class MetaRabbitMQClient(type): + def __new__(cls, name, bases, attributes): + # For each method wrapped with @remote_api_endpoint in an API backend + # (eg. :class:`swh.indexer.storage.IndexerStorage`), add a new + # method in RemoteStorage, with the same documentation. + # + # Note that, despite the usage of decorator magic (eg. 
functools.wrap), + # this never actually calls an IndexerStorage method. + backend_class = attributes.get("backend_class", None) + for base in bases: + if backend_class is not None: + break + backend_class = getattr(base, "backend_class", None) + if backend_class: + for meth_name, meth in backend_class.__dict__.items(): + if hasattr(meth, "_endpoint_path"): + cls.__add_endpoint(meth_name, meth, attributes) + return super().__new__(cls, name, bases, attributes) + + @staticmethod + def __add_endpoint(meth_name, meth, attributes): + wrapped_meth = inspect.unwrap(meth) + + @functools.wraps(meth) # Copy signature and doc + def meth_(*args, **kwargs): + with statsd.timed( + metric=STORAGE_DURATION_METRIC, tags={"method": meth_name} + ): + # Match arguments and parameters + data = inspect.getcallargs(wrapped_meth, *args, **kwargs) + + # Remove arguments that should not be passed + self = data.pop("self") + + # Call storage method with remaining arguments + return getattr(self._storage, meth_name)(**data) + + @functools.wraps(meth) # Copy signature and doc + def write_meth_(*args, **kwargs): + with statsd.timed( + metric=STORAGE_DURATION_METRIC, tags={"method": meth_name} + ): + # Match arguments and parameters + post_data = inspect.getcallargs(wrapped_meth, *args, **kwargs) + + try: + # Remove arguments that should not be passed + self = post_data.pop("self") + relation = post_data.pop("relation", None) + assert len(post_data) == 1 + data = next(iter(post_data.values())) + + ranges = split_ranges(data, meth_name, relation) + acks_expected = sum(len(items) for items in ranges.values()) + self._correlation_id = str(uuid.uuid4()) + + exchange = ProvenanceStorageRabbitMQServer.get_exchange( + meth_name, relation + ) + for routing_key, items in ranges.items(): + batches = ( + items[idx : idx + self._batch_size] + for idx in range(0, len(items), self._batch_size) + ) + for batch in batches: + # FIXME: this is running in a different thread! Hence, if + # self._connection drops, there is no guarantee that the + # request can be sent for the current elements. This + # situation should be handled properly. + self._connection.ioloop.add_callback_threadsafe( + functools.partial( + ProvenanceStorageRabbitMQClient.request, + channel=self._channel, + reply_to=self._callback_queue, + exchange=exchange, + routing_key=routing_key, + correlation_id=self._correlation_id, + data=batch, + ) + ) + return self.wait_for_acks(acks_expected) + except BaseException as ex: + self.request_termination(str(ex)) + return False + + if meth_name not in attributes: + attributes[meth_name] = ( + write_meth_ + if ProvenanceStorageRabbitMQServer.is_write_method(meth_name) + else meth_ + ) + + +class ProvenanceStorageRabbitMQClient(threading.Thread, metaclass=MetaRabbitMQClient): + backend_class = ProvenanceStorageInterface + extra_type_decoders = DECODERS + extra_type_encoders = ENCODERS + + def __init__( + self, + url: str, + storage_config: Dict[str, Any], + batch_size: int = 100, + prefetch_count: int = 100, + wait_min: float = 60, + wait_per_batch: float = 10, + ) -> None: + """Setup the client object, passing in the URL we will use to connect to + RabbitMQ, and the connection information for the local storage object used + for read-only operations. 
+ + :param str url: The URL for connecting to RabbitMQ + :param dict storage_config: Configuration parameters for the underlying + ``ProvenanceStorage`` object expected by + ``swh.provenance.get_provenance_storage`` + :param int batch_size: Max amount of elements per package (after range + splitting) for writing operations + :param int prefetch_count: Prefetch value for the RabbitMQ connection when + receiving ack packages + :param float wait_min: Min waiting time for response on a writing operation, in + seconds + :param float wait_per_batch: Waiting time for response per batch of items on a + writing operation, in seconds + + """ + super().__init__() + + self._connection = None + self._callback_queue: Optional[str] = None + self._channel = None + self._closing = False + self._consumer_tag = None + self._consuming = False + self._correlation_id = str(uuid.uuid4()) + self._prefetch_count = prefetch_count + + self._batch_size = batch_size + self._response_queue: queue.Queue = queue.Queue() + self._storage = get_provenance_storage(**storage_config) + self._url = url + + self._wait_min = wait_min + self._wait_per_batch = wait_per_batch + + def __enter__(self) -> ProvenanceStorageInterface: + self.open() + assert isinstance(self, ProvenanceStorageInterface) + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + self.close() + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "open"}) + def open(self) -> None: + self.start() + while self._callback_queue is None: + time.sleep(0.1) + self._storage.open() + + @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"}) + def close(self) -> None: + assert self._connection is not None + self._connection.ioloop.add_callback_threadsafe(self.request_termination) + self.join() + self._storage.close() + + def request_termination(self, reason: str = "Normal shutdown") -> None: + assert self._connection is not None + + def termination_callback(): + raise TerminateSignal(reason) + + self._connection.ioloop.add_callback_threadsafe(termination_callback) + + def connect(self) -> pika.SelectConnection: + LOGGER.info("Connecting to %s", self._url) + return pika.SelectConnection( + parameters=pika.URLParameters(self._url), + on_open_callback=self.on_connection_open, + on_open_error_callback=self.on_connection_open_error, + on_close_callback=self.on_connection_closed, + ) + + def close_connection(self) -> None: + assert self._connection is not None + self._consuming = False + if self._connection.is_closing or self._connection.is_closed: + LOGGER.info("Connection is closing or already closed") + else: + LOGGER.info("Closing connection") + self._connection.close() + + def on_connection_open(self, _unused_connection: pika.SelectConnection) -> None: + LOGGER.info("Connection opened") + self.open_channel() + + def on_connection_open_error( + self, _unused_connection: pika.SelectConnection, err: Exception + ) -> None: + LOGGER.error("Connection open failed, reopening in 5 seconds: %s", err) + assert self._connection is not None + self._connection.ioloop.call_later(5, self._connection.ioloop.stop) + + def on_connection_closed(self, _unused_connection: pika.SelectConnection, reason): + assert self._connection is not None + self._channel = None + if self._closing: + self._connection.ioloop.stop() + else: + LOGGER.warning("Connection closed, reopening in 5 seconds: %s", reason) + self._connection.ioloop.call_later(5, 
self._connection.ioloop.stop) + + def open_channel(self) -> None: + LOGGER.info("Creating a new channel") + assert self._connection is not None + self._connection.channel(on_open_callback=self.on_channel_open) + + def on_channel_open(self, channel: pika.channel.Channel) -> None: + LOGGER.info("Channel opened") + self._channel = channel + LOGGER.info("Adding channel close callback") + assert self._channel is not None + self._channel.add_on_close_callback(callback=self.on_channel_closed) + self.setup_queue() + + def on_channel_closed( + self, channel: pika.channel.Channel, reason: Exception + ) -> None: + LOGGER.warning("Channel %i was closed: %s", channel, reason) + self.close_connection() + + def setup_queue(self) -> None: + LOGGER.info("Declaring callback queue") + assert self._channel is not None + self._channel.queue_declare( + queue="", exclusive=True, callback=self.on_queue_declare_ok + ) + + def on_queue_declare_ok(self, frame: pika.frame.Method) -> None: + LOGGER.info("Binding queue to default exchanger") + assert self._channel is not None + self._callback_queue = frame.method.queue + self._channel.basic_qos( + prefetch_count=self._prefetch_count, callback=self.on_basic_qos_ok + ) + + def on_basic_qos_ok(self, _unused_frame: pika.frame.Method) -> None: + LOGGER.info("QOS set to: %d", self._prefetch_count) + self.start_consuming() + + def start_consuming(self) -> None: + LOGGER.info("Issuing consumer related RPC commands") + LOGGER.info("Adding consumer cancellation callback") + assert self._channel is not None + self._channel.add_on_cancel_callback(callback=self.on_consumer_cancelled) + assert self._callback_queue is not None + self._consumer_tag = self._channel.basic_consume( + queue=self._callback_queue, on_message_callback=self.on_response + ) + self._consuming = True + + def on_consumer_cancelled(self, method_frame: pika.frame.Method) -> None: + LOGGER.info("Consumer was cancelled remotely, shutting down: %r", method_frame) + if self._channel: + self._channel.close() + + def on_response( + self, + channel: pika.channel.Channel, + deliver: pika.spec.Basic.Deliver, + properties: pika.spec.BasicProperties, + body: bytes, + ) -> None: + LOGGER.info( + "Received message # %s from %s: %s", + deliver.delivery_tag, + properties.app_id, + body, + ) + self._response_queue.put( + ( + properties.correlation_id, + decode_data(body, extra_decoders=self.extra_type_decoders), + ) + ) + LOGGER.info("Acknowledging message %s", deliver.delivery_tag) + channel.basic_ack(delivery_tag=deliver.delivery_tag) + + def stop_consuming(self) -> None: + if self._channel: + LOGGER.info("Sending a Basic.Cancel RPC command to RabbitMQ") + self._channel.basic_cancel(self._consumer_tag, self.on_cancel_ok) + + def on_cancel_ok(self, _unused_frame: pika.frame.Method) -> None: + self._consuming = False + LOGGER.info( + "RabbitMQ acknowledged the cancellation of the consumer: %s", + self._consumer_tag, + ) + LOGGER.info("Closing the channel") + assert self._channel is not None + self._channel.close() + + def run(self) -> None: + while not self._closing: + try: + self._connection = self.connect() + assert self._connection is not None + self._connection.ioloop.start() + except KeyboardInterrupt: + LOGGER.info("Connection closed by keyboard interruption, reopening") + if self._connection is not None: + self._connection.ioloop.stop() + except TerminateSignal as ex: + LOGGER.info("Termination requested: %s", ex) + self.stop() + if self._connection is not None and not self._connection.is_closed: + # Finish closing + 
self._connection.ioloop.start() + except BaseException as ex: + LOGGER.warning("Unexpected exception, terminating: %s", ex) + self.stop() + if self._connection is not None and not self._connection.is_closed: + # Finish closing + self._connection.ioloop.start() + LOGGER.info("Stopped") + + def stop(self) -> None: + assert self._connection is not None + if not self._closing: + self._closing = True + LOGGER.info("Stopping") + if self._consuming: + self.stop_consuming() + self._connection.ioloop.start() + else: + self._connection.ioloop.stop() + LOGGER.info("Stopped") + + @staticmethod + def request( + channel: pika.channel.Channel, + reply_to: str, + exchange: str, + routing_key: str, + correlation_id: str, + **kwargs, + ) -> None: + channel.basic_publish( + exchange=exchange, + routing_key=routing_key, + properties=pika.BasicProperties( + content_type="application/msgpack", + correlation_id=correlation_id, + reply_to=reply_to, + ), + body=encode_data( + kwargs, + extra_encoders=ProvenanceStorageRabbitMQClient.extra_type_encoders, + ), + ) + + def wait_for_acks(self, acks_expected: int) -> bool: + acks_received = 0 + timeout = max( + (acks_expected / self._batch_size) * self._wait_per_batch, + self._wait_min, + ) + while acks_received < acks_expected: + try: + acks_received += self.wait_for_response(timeout=timeout) + except ResponseTimeout: + LOGGER.warning( + "Timed out waiting for acks, %s received, %s expected", + acks_received, + acks_expected, + ) + return False + return acks_received == acks_expected + + def wait_for_response(self, timeout: float = 60.0) -> Any: + while True: + try: + correlation_id, response = self._response_queue.get(timeout=timeout) + if correlation_id == self._correlation_id: + return response + except queue.Empty: + raise ResponseTimeout diff --git a/swh/provenance/api/server.py b/swh/provenance/api/server.py --- a/swh/provenance/api/server.py +++ b/swh/provenance/api/server.py @@ -3,10 +3,640 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from collections import Counter +from datetime import datetime +from enum import Enum +import functools +import logging +import multiprocessing import os -from typing import Any, Dict, Optional +import queue +import threading +from typing import Any, Callable +from typing import Counter as TCounter +from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple, Union, cast + +import pika +import pika.channel +import pika.connection +import pika.exceptions +from pika.exchange_type import ExchangeType +import pika.frame +import pika.spec from swh.core import config +from swh.core.api.serializers import encode_data_client as encode_data +from swh.core.api.serializers import msgpack_loads as decode_data +from swh.model.model import Sha1Git + +from .. 
import get_provenance_storage +from ..interface import EntityType, RelationData, RelationType, RevisionData +from ..util import path_id +from .serializers import DECODERS, ENCODERS + +LOG_FORMAT = ( + "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) " + "-35s %(lineno) -5d: %(message)s" +) +LOGGER = logging.getLogger(__name__) + +TERMINATE = object() + + +class ServerCommand(Enum): + TERMINATE = "terminate" + CONSUMING = "consuming" + + +class TerminateSignal(BaseException): + pass + + +def resolve_dates( + dates: Iterable[Union[Tuple[Sha1Git, Optional[datetime]], Tuple[Sha1Git]]] +) -> Dict[Sha1Git, Optional[datetime]]: + result: Dict[Sha1Git, Optional[datetime]] = {} + for row in dates: + sha1 = row[0] + date = ( + cast(Tuple[Sha1Git, Optional[datetime]], row)[1] if len(row) > 1 else None + ) + known = result.setdefault(sha1, None) + if date is not None and (known is None or date < known): + result[sha1] = date + return result + + +def resolve_revision( + data: Iterable[Union[Tuple[Sha1Git, RevisionData], Tuple[Sha1Git]]] +) -> Dict[Sha1Git, RevisionData]: + result: Dict[Sha1Git, RevisionData] = {} + for row in data: + sha1 = row[0] + rev = ( + cast(Tuple[Sha1Git, RevisionData], row)[1] + if len(row) > 1 + else RevisionData(date=None, origin=None) + ) + known = result.setdefault(sha1, RevisionData(date=None, origin=None)) + value = known + if rev.date is not None and (known.date is None or rev.date < known.date): + value = RevisionData(date=rev.date, origin=value.origin) + if rev.origin is not None: + value = RevisionData(date=value.date, origin=rev.origin) + if value != known: + result[sha1] = value + return result + + +def resolve_relation( + data: Iterable[Tuple[Sha1Git, Sha1Git, bytes]] +) -> Dict[Sha1Git, Set[RelationData]]: + result: Dict[Sha1Git, Set[RelationData]] = {} + for src, dst, path in data: + result.setdefault(src, set()).add(RelationData(dst=dst, path=path)) + return result + + +class ProvenanceStorageRabbitMQWorker(multiprocessing.Process): + EXCHANGE_TYPE = ExchangeType.direct + extra_type_decoders = DECODERS + extra_type_encoders = ENCODERS + + def __init__( + self, + url: str, + exchange: str, + range: int, + storage_config: Dict[str, Any], + batch_size: int = 100, + prefetch_count: int = 100, + ) -> None: + """Setup the worker object, passing in the URL we will use to connect to + RabbitMQ, the exchange to use, the range id on which to operate, and the + connection information for the underlying local storage object. 
+ + :param str url: The URL for connecting to RabbitMQ + :param str exchange: The name of the RabbitMq exchange to use + :param str range: The ID range to operate on + :param dict storage_config: Configuration parameters for the underlying + ``ProvenanceStorage`` object expected by + ``swh.provenance.get_provenance_storage`` + :param int batch_size: Max amount of elements call to the underlying storage + :param int prefetch_count: Prefetch value for the RabbitMQ connection when + receiving messaged + + """ + super().__init__(name=f"{exchange}_{range:x}") + + self._connection = None + self._channel = None + self._closing = False + self._consumer_tag: Dict[str, str] = {} + self._consuming: Dict[str, bool] = {} + self._prefetch_count = prefetch_count + + self._url = url + self._exchange = exchange + self._binding_keys = list( + ProvenanceStorageRabbitMQServer.get_binding_keys(self._exchange, range) + ) + self._queues: Dict[str, str] = {} + self._storage_config = storage_config + self._batch_size = batch_size + + self.command: multiprocessing.Queue = multiprocessing.Queue() + self.signal: multiprocessing.Queue = multiprocessing.Queue() + + def connect(self) -> pika.SelectConnection: + LOGGER.info("Connecting to %s", self._url) + return pika.SelectConnection( + parameters=pika.URLParameters(self._url), + on_open_callback=self.on_connection_open, + on_open_error_callback=self.on_connection_open_error, + on_close_callback=self.on_connection_closed, + ) + + def close_connection(self) -> None: + assert self._connection is not None + self._consuming = {binding_key: False for binding_key in self._binding_keys} + if self._connection.is_closing or self._connection.is_closed: + LOGGER.info("Connection is closing or already closed") + else: + LOGGER.info("Closing connection") + self._connection.close() + + def on_connection_open(self, _unused_connection: pika.SelectConnection) -> None: + LOGGER.info("Connection opened") + self.open_channel() + + def on_connection_open_error( + self, _unused_connection: pika.SelectConnection, err: Exception + ) -> None: + LOGGER.error("Connection open failed, reopening in 5 seconds: %s", err) + assert self._connection is not None + self._connection.ioloop.call_later(5, self._connection.ioloop.stop) + + def on_connection_closed(self, _unused_connection: pika.SelectConnection, reason): + assert self._connection is not None + self._channel = None + if self._closing: + self._connection.ioloop.stop() + else: + LOGGER.warning("Connection closed, reopening in 5 seconds: %s", reason) + self._connection.ioloop.call_later(5, self._connection.ioloop.stop) + + def open_channel(self) -> None: + LOGGER.info("Creating a new channel") + assert self._connection is not None + self._connection.channel(on_open_callback=self.on_channel_open) + + def on_channel_open(self, channel: pika.channel.Channel) -> None: + LOGGER.info("Channel opened") + self._channel = channel + LOGGER.info("Adding channel close callback") + assert self._channel is not None + self._channel.add_on_close_callback(callback=self.on_channel_closed) + self.setup_exchange() + + def on_channel_closed( + self, channel: pika.channel.Channel, reason: Exception + ) -> None: + LOGGER.warning("Channel %i was closed: %s", channel, reason) + self.close_connection() + + def setup_exchange(self) -> None: + LOGGER.info("Declaring exchange %s", self._exchange) + assert self._channel is not None + self._channel.exchange_declare( + exchange=self._exchange, + exchange_type=self.EXCHANGE_TYPE, + callback=self.on_exchange_declare_ok, + ) + + 
def on_exchange_declare_ok(self, _unused_frame: pika.frame.Method) -> None: + LOGGER.info("Exchange declared: %s", self._exchange) + self.setup_queues() + + def setup_queues(self) -> None: + for binding_key in self._binding_keys: + LOGGER.info("Declaring queue %s", binding_key) + assert self._channel is not None + callback = functools.partial( + self.on_queue_declare_ok, + binding_key=binding_key, + ) + self._channel.queue_declare(queue=binding_key, callback=callback) + + def on_queue_declare_ok(self, frame: pika.frame.Method, binding_key: str) -> None: + LOGGER.info( + "Binding queue %s to exchange %s with routing key %s", + frame.method.queue, + self._exchange, + binding_key, + ) + assert self._channel is not None + callback = functools.partial(self.on_bind_ok, queue_name=frame.method.queue) + self._queues[binding_key] = frame.method.queue + self._channel.queue_bind( + queue=frame.method.queue, + exchange=self._exchange, + routing_key=binding_key, + callback=callback, + ) + + def on_bind_ok(self, _unused_frame: pika.frame.Method, queue_name: str) -> None: + LOGGER.info("Queue bound: %s", queue_name) + assert self._channel is not None + self._channel.basic_qos( + prefetch_count=self._prefetch_count, callback=self.on_basic_qos_ok + ) + + def on_basic_qos_ok(self, _unused_frame: pika.frame.Method) -> None: + LOGGER.info("QOS set to: %d", self._prefetch_count) + self.start_consuming() + + def start_consuming(self) -> None: + LOGGER.info("Issuing consumer related RPC commands") + LOGGER.info("Adding consumer cancellation callback") + assert self._channel is not None + self._channel.add_on_cancel_callback(callback=self.on_consumer_cancelled) + for binding_key in self._binding_keys: + self._consumer_tag[binding_key] = self._channel.basic_consume( + queue=self._queues[binding_key], on_message_callback=self.on_request + ) + self._consuming[binding_key] = True + self.signal.put(ServerCommand.CONSUMING) + + def on_consumer_cancelled(self, method_frame: pika.frame.Method) -> None: + LOGGER.info("Consumer was cancelled remotely, shutting down: %r", method_frame) + if self._channel: + self._channel.close() + + def on_request( + self, + channel: pika.channel.Channel, + deliver: pika.spec.Basic.Deliver, + properties: pika.spec.BasicProperties, + body: bytes, + ) -> None: + LOGGER.info( + "Received message # %s from %s: %s", + deliver.delivery_tag, + properties.app_id, + body, + ) + # XXX: for some reason this function is returning lists instead of tuples + # (the client send tuples) + batch = decode_data(data=body, extra_decoders=self.extra_type_decoders)["data"] + for item in batch: + self._request_queues[deliver.routing_key].put( + (tuple(item), (properties.correlation_id, properties.reply_to)) + ) + LOGGER.info("Acknowledging message %s", deliver.delivery_tag) + channel.basic_ack(delivery_tag=deliver.delivery_tag) + + def stop_consuming(self) -> None: + if self._channel: + LOGGER.info("Sending a Basic.Cancel RPC command to RabbitMQ") + for binding_key in self._binding_keys: + callback = functools.partial(self.on_cancel_ok, binding_key=binding_key) + self._channel.basic_cancel( + self._consumer_tag[binding_key], callback=callback + ) + + def on_cancel_ok(self, _unused_frame: pika.frame.Method, binding_key: str) -> None: + self._consuming[binding_key] = False + LOGGER.info( + "RabbitMQ acknowledged the cancellation of the consumer: %s", + self._consuming[binding_key], + ) + LOGGER.info("Closing the channel") + assert self._channel is not None + self._channel.close() + + def run(self) -> None: + 
+        self._command_thread = threading.Thread(target=self.run_command_thread)
+        self._command_thread.start()
+
+        self._request_queues: Dict[str, queue.Queue] = {}
+        self._request_threads: Dict[str, threading.Thread] = {}
+        for binding_key in self._binding_keys:
+            meth_name, relation = ProvenanceStorageRabbitMQServer.get_meth_name(
+                binding_key
+            )
+            self._request_queues[binding_key] = queue.Queue()
+            self._request_threads[binding_key] = threading.Thread(
+                target=self.run_request_thread,
+                args=(binding_key, meth_name, relation),
+            )
+            self._request_threads[binding_key].start()
+
+        while not self._closing:
+            try:
+                self._connection = self.connect()
+                assert self._connection is not None
+                self._connection.ioloop.start()
+            except KeyboardInterrupt:
+                LOGGER.info("Connection closed by keyboard interruption, reopening")
+                if self._connection is not None:
+                    self._connection.ioloop.stop()
+            except TerminateSignal as ex:
+                LOGGER.info("Termination requested: %s", ex)
+                self.stop()
+                if self._connection is not None and not self._connection.is_closed:
+                    # Finish closing
+                    self._connection.ioloop.start()
+            except BaseException as ex:
+                LOGGER.warning("Unexpected exception, terminating: %s", ex)
+                self.stop()
+                if self._connection is not None and not self._connection.is_closed:
+                    # Finish closing
+                    self._connection.ioloop.start()
+
+        for binding_key in self._binding_keys:
+            self._request_queues[binding_key].put(TERMINATE)
+        for binding_key in self._binding_keys:
+            self._request_threads[binding_key].join()
+        self._command_thread.join()
+        LOGGER.info("Stopped")
+
+    def run_command_thread(self) -> None:
+        while True:
+            try:
+                command = self.command.get()
+                if command == ServerCommand.TERMINATE:
+                    self.request_termination()
+                    break
+            except queue.Empty:
+                pass
+            except BaseException as ex:
+                self.request_termination(str(ex))
+                break
+
+    def request_termination(self, reason: str = "Normal shutdown") -> None:
+        assert self._connection is not None
+
+        def termination_callback():
+            raise TerminateSignal(reason)
+
+        self._connection.ioloop.add_callback_threadsafe(termination_callback)
+
+    def run_request_thread(
+        self, binding_key: str, meth_name: str, relation: Optional[RelationType]
+    ) -> None:
+        with get_provenance_storage(**self._storage_config) as storage:
+            request_queue = self._request_queues[binding_key]
+            merge_items = ProvenanceStorageRabbitMQWorker.get_conflicts_func(meth_name)
+            while True:
+                terminate = False
+                elements = []
+                while True:
+                    try:
+                        # TODO: consider reducing this timeout or removing it
+                        elem = request_queue.get(timeout=0.1)
+                        if elem is TERMINATE:
+                            terminate = True
+                            break
+                        elements.append(elem)
+                    except queue.Empty:
+                        break
+
+                    if len(elements) >= self._batch_size:
+                        break
+
+                if terminate:
+                    break
+
+                if not elements:
+                    continue
+
+                try:
+                    items, props = zip(*elements)
+                    acks_count: TCounter[Tuple[str, str]] = Counter(props)
+                    data = merge_items(items)
+
+                    args = (relation, data) if relation is not None else (data,)
+                    if getattr(storage, meth_name)(*args):
+                        for (correlation_id, reply_to), count in acks_count.items():
+                            # FIXME: this is running in a different thread! Hence, if
+                            # self._connection drops, there is no guarantee that the
+                            # response can be sent for the current elements. This
+                            # situation should be handled properly.
+                            assert self._connection is not None
+                            self._connection.ioloop.add_callback_threadsafe(
+                                functools.partial(
+                                    ProvenanceStorageRabbitMQWorker.respond,
+                                    channel=self._channel,
+                                    correlation_id=correlation_id,
+                                    reply_to=reply_to,
+                                    response=count,
+                                )
+                            )
+                    else:
+                        LOGGER.warning(
+                            "Unable to process elements for queue %s", binding_key
+                        )
+                        for elem in elements:
+                            request_queue.put(elem)
+                except BaseException as ex:
+                    self.request_termination(str(ex))
+                    break
+
+    def stop(self) -> None:
+        assert self._connection is not None
+        if not self._closing:
+            self._closing = True
+            LOGGER.info("Stopping")
+            if any(self._consuming.values()):
+                self.stop_consuming()
+                self._connection.ioloop.start()
+            else:
+                self._connection.ioloop.stop()
+            LOGGER.info("Stopped")
+
+    @staticmethod
+    def get_conflicts_func(meth_name: str) -> Callable[[Iterable[Any]], Any]:
+        if meth_name in ["content_add", "directory_add"]:
+            return resolve_dates
+        elif meth_name == "location_add":
+            return lambda data: set(data)  # just remove duplicates
+        elif meth_name == "origin_add":
+            return lambda data: dict(data)  # last processed value is good enough
+        elif meth_name == "revision_add":
+            return resolve_revision
+        elif meth_name == "relation_add":
+            return resolve_relation
+        else:
+            LOGGER.warning(
+                "Unexpected conflict resolution function request for method %s",
+                meth_name,
+            )
+            return lambda x: x
+
+    @staticmethod
+    def respond(
+        channel: pika.channel.Channel,
+        correlation_id: str,
+        reply_to: str,
+        response: Any,
+    ):
+        channel.basic_publish(
+            exchange="",
+            routing_key=reply_to,
+            properties=pika.BasicProperties(
+                content_type="application/msgpack",
+                correlation_id=correlation_id,
+            ),
+            body=encode_data(
+                response,
+                extra_encoders=ProvenanceStorageRabbitMQServer.extra_type_encoders,
+            ),
+        )
+
+
+class ProvenanceStorageRabbitMQServer:
+    extra_type_decoders = DECODERS
+    extra_type_encoders = ENCODERS
+
+    queue_count = 16
+
+    def __init__(
+        self,
+        url: str,
+        storage_config: Dict[str, Any],
+        batch_size: int = 100,
+        prefetch_count: int = 100,
+    ) -> None:
+        """Set up the server object, passing in the URL we will use to connect
+        to RabbitMQ, and the connection information for the underlying local
+        storage object.
+
+        :param str url: The URL for connecting to RabbitMQ
+        :param dict storage_config: Configuration parameters for the underlying
+            ``ProvenanceStorage`` object expected by
+            ``swh.provenance.get_provenance_storage``
+        :param int batch_size: Maximum number of elements per call to the
+            underlying storage
+        :param int prefetch_count: Prefetch value for the RabbitMQ connection
+            when receiving messages
+
+        """
+        self._workers: List[ProvenanceStorageRabbitMQWorker] = []
+        for exchange in ProvenanceStorageRabbitMQServer.get_exchanges():
+            for range in ProvenanceStorageRabbitMQServer.get_ranges(exchange):
+                worker = ProvenanceStorageRabbitMQWorker(
+                    url=url,
+                    exchange=exchange,
+                    range=range,
+                    storage_config=storage_config,
+                    batch_size=batch_size,
+                    prefetch_count=prefetch_count,
+                )
+                self._workers.append(worker)
+        self._running = False
+
+    def start(self) -> None:
+        if not self._running:
+            self._running = True
+            for worker in self._workers:
+                worker.start()
+            for worker in self._workers:
+                try:
+                    signal = worker.signal.get(timeout=60)
+                    assert signal == ServerCommand.CONSUMING
+                except queue.Empty:
+                    LOGGER.error(
+                        "Could not initialize worker %s. Leaving...", worker.name
+                    )
+                    self.stop()
+                    return
+            LOGGER.info("Start serving")
+
+    def stop(self) -> None:
+        if self._running:
+            for worker in self._workers:
+                worker.command.put(ServerCommand.TERMINATE)
+            for worker in self._workers:
+                worker.join()
+            LOGGER.info("Stop serving")
+            self._running = False
+
+    @staticmethod
+    def get_binding_keys(exchange: str, range: int) -> Generator[str, None, None]:
+        for meth_name, relation in ProvenanceStorageRabbitMQServer.get_meth_names(
+            exchange
+        ):
+            if relation is None:
+                yield f"{meth_name}.unknown.{range:x}".lower()
+            else:
+                yield f"{meth_name}.{relation.value}.{range:x}".lower()
+
+    @staticmethod
+    def get_exchange(meth_name: str, relation: Optional[RelationType] = None) -> str:
+        if meth_name == "relation_add":
+            assert relation is not None
+            split = relation.value
+        else:
+            split = meth_name
+        exchange, *_ = split.split("_")
+        return exchange
+
+    @staticmethod
+    def get_exchanges() -> Generator[str, None, None]:
+        yield from [entity.value for entity in EntityType] + ["location"]
+
+    @staticmethod
+    def get_meth_name(
+        binding_key: str,
+    ) -> Tuple[str, Optional[RelationType]]:
+        meth_name, relation, *_ = binding_key.split(".")
+        return meth_name, (RelationType(relation) if relation != "unknown" else None)
+
+    @staticmethod
+    def get_meth_names(
+        exchange: str,
+    ) -> Generator[Tuple[str, Optional[RelationType]], None, None]:
+        if exchange == EntityType.CONTENT.value:
+            yield from [
+                ("content_add", None),
+                ("relation_add", RelationType.CNT_EARLY_IN_REV),
+                ("relation_add", RelationType.CNT_IN_DIR),
+            ]
+        elif exchange == EntityType.DIRECTORY.value:
+            yield from [
+                ("directory_add", None),
+                ("relation_add", RelationType.DIR_IN_REV),
+            ]
+        elif exchange == EntityType.ORIGIN.value:
+            yield from [("origin_add", None)]
+        elif exchange == EntityType.REVISION.value:
+            yield from [
+                ("revision_add", None),
+                ("relation_add", RelationType.REV_BEFORE_REV),
+                ("relation_add", RelationType.REV_IN_ORG),
+            ]
+        elif exchange == "location":
+            yield "location_add", None
+
+    @staticmethod
+    def get_ranges(unused_exchange: str) -> Generator[int, None, None]:
+        # XXX: we might want to have a different range per exchange
+        yield from range(ProvenanceStorageRabbitMQServer.queue_count)
+
+    @staticmethod
+    def get_routing_key(
+        data: Tuple[bytes, ...], meth_name: str, relation: Optional[RelationType] = None
+    ) -> str:
+        hashid = path_id(data[0]) if meth_name.startswith("location") else data[0]
+        idx = int(hashid[0]) % ProvenanceStorageRabbitMQServer.queue_count
+        if relation is None:
+            return f"{meth_name}.unknown.{idx:x}".lower()
+        else:
+            return f"{meth_name}.{relation.value}.{idx:x}".lower()
+
+    @staticmethod
+    def is_write_method(meth_name: str) -> bool:
+        return "_add" in meth_name
 
 
 def load_and_check_config(
@@ -39,9 +669,13 @@
     if pcfg is None:
         raise KeyError("Missing 'provenance' configuration")
 
-    scfg: Optional[Dict[str, Any]] = pcfg.get("storage")
+    rcfg: Optional[Dict[str, Any]] = pcfg.get("rabbitmq")
+    if rcfg is None:
+        raise KeyError("Missing 'provenance.rabbitmq' configuration")
+
+    scfg: Optional[Dict[str, Any]] = rcfg.get("storage_config")
     if scfg is None:
-        raise KeyError("Missing 'provenance.storage' configuration")
+        raise KeyError("Missing 'provenance.rabbitmq.storage_config' configuration")
 
     if type == "local":
         cls = scfg.get("cls")
@@ -56,3 +690,9 @@
         raise KeyError("Invalid configuration; missing 'db' config entry")
 
     return cfg
+
+
+def make_server_from_configfile() -> ProvenanceStorageRabbitMQServer:
+    config_path = os.environ.get("SWH_CONFIG_FILENAME")
+    server_cfg = load_and_check_config(config_path)
+    return ProvenanceStorageRabbitMQServer(**server_cfg["provenance"]["rabbitmq"])
diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -35,25 +35,40 @@
             # Direct access Archive object
             "cls": "direct",
             "db": {
-                "host": "db.internal.softwareheritage.org",
+                "host": "belvedere.internal.softwareheritage.org",
+                "port": 5432,
                 "dbname": "softwareheritage",
                 "user": "guest",
             },
         },
         "storage": {
             # Local PostgreSQL Storage
-            "cls": "postgresql",
-            "db": {
-                "host": "localhost",
-                "user": "postgres",
-                "password": "postgres",
-                "dbname": "provenance",
-            },
+            # "cls": "postgresql",
+            # "db": {
+            #     "host": "localhost",
+            #     "user": "postgres",
+            #     "password": "postgres",
+            #     "dbname": "provenance",
+            # },
             # Local MongoDB Storage
             # "cls": "mongodb",
             # "db": {
             #     "dbname": "provenance",
             # },
+            # Remote RabbitMQ/PostgreSQL Storage
+            "cls": "rabbitmq",
+            "url": "amqp://localhost:5672/%2f",
+            "storage_config": {
+                "cls": "postgresql",
+                "db": {
+                    "host": "localhost",
+                    "user": "postgres",
+                    "password": "postgres",
+                    "dbname": "provenance",
+                },
+            },
+            "batch_size": 100,
+            "prefetch_count": 100,
        },
    }
 }
diff --git a/swh/provenance/util.py b/swh/provenance/util.py
--- a/swh/provenance/util.py
+++ b/swh/provenance/util.py
@@ -3,8 +3,13 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import hashlib
 import os
 
 
+def path_id(path: bytes) -> bytes:
+    return hashlib.sha1(path).digest()
+
+
 def path_normalize(path: bytes) -> bytes:
     return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path
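+
+
+# Note: path_id provides the stable hash that
+# ProvenanceStorageRabbitMQServer.get_routing_key uses to shard location
+# writes across queues; for a hypothetical path b"src/main.c":
+#
+#   idx = int(path_id(b"src/main.c")[0]) % ProvenanceStorageRabbitMQServer.queue_count
+#   routing_key = f"location_add.unknown.{idx:x}"
+#
+# yields one of "location_add.unknown.0" through "location_add.unknown.f",
+# so identical paths always route to the same worker queue.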