diff --git a/swh/provenance/api/client.py b/swh/provenance/api/client.py index 117fff6..b07b495 100644 --- a/swh/provenance/api/client.py +++ b/swh/provenance/api/client.py @@ -1,470 +1,470 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations import functools import inspect import logging import queue import threading import time from types import TracebackType from typing import Any, Dict, Iterable, Optional, Set, Tuple, Type, Union import uuid import pika import pika.channel import pika.connection import pika.frame import pika.spec from swh.core.api.serializers import encode_data_client as encode_data from swh.core.api.serializers import msgpack_loads as decode_data from swh.core.statsd import statsd from .. import get_provenance_storage from ..interface import ProvenanceStorageInterface, RelationData, RelationType from .serializers import DECODERS, ENCODERS from .server import ProvenanceStorageRabbitMQServer LOG_FORMAT = ( "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) " "-35s %(lineno) -5d: %(message)s" ) LOGGER = logging.getLogger(__name__) STORAGE_DURATION_METRIC = "swh_provenance_storage_rabbitmq_duration_seconds" class ResponseTimeout(Exception): pass class TerminateSignal(Exception): pass def split_ranges( data: Iterable[bytes], meth_name: str, relation: Optional[RelationType] = None ) -> Dict[str, Set[Tuple[Any, ...]]]: ranges: Dict[str, Set[Tuple[Any, ...]]] = {} if relation is not None: assert isinstance(data, dict), "Relation data must be provided in a dictionary" for src, dsts in data.items(): key = ProvenanceStorageRabbitMQServer.get_routing_key( src, meth_name, relation ) for rel in dsts: assert isinstance( rel, RelationData ), "Values in the dictionary must be RelationData structures" ranges.setdefault(key, set()).add((src, rel.dst, rel.path)) else: items: Union[Set[Tuple[bytes, Any]], Set[Tuple[bytes]]] if isinstance(data, dict): items = set(data.items()) else: items = {(item,) for item in data} for id, *rest in items: key = ProvenanceStorageRabbitMQServer.get_routing_key(id, meth_name) ranges.setdefault(key, set()).add((id, *rest)) return ranges class MetaRabbitMQClient(type): def __new__(cls, name, bases, attributes): # For each method wrapped with @remote_api_endpoint in an API backend # (eg. :class:`swh.indexer.storage.IndexerStorage`), add a new # method in RemoteStorage, with the same documentation. # # Note that, despite the usage of decorator magic (eg. functools.wrap), # this never actually calls an IndexerStorage method. backend_class = attributes.get("backend_class", None) for base in bases: if backend_class is not None: break backend_class = getattr(base, "backend_class", None) if backend_class: for meth_name, meth in backend_class.__dict__.items(): if hasattr(meth, "_endpoint_path"): cls.__add_endpoint(meth_name, meth, attributes) return super().__new__(cls, name, bases, attributes) @staticmethod def __add_endpoint(meth_name, meth, attributes): wrapped_meth = inspect.unwrap(meth) @functools.wraps(meth) # Copy signature and doc def meth_(*args, **kwargs): with statsd.timed( metric=STORAGE_DURATION_METRIC, tags={"method": meth_name} ): # Match arguments and parameters data = inspect.getcallargs(wrapped_meth, *args, **kwargs) # Remove arguments that should not be passed self = data.pop("self") # Call storage method with remaining arguments return getattr(self._storage, meth_name)(**data) @functools.wraps(meth) # Copy signature and doc def write_meth_(*args, **kwargs): with statsd.timed( metric=STORAGE_DURATION_METRIC, tags={"method": meth_name} ): # Match arguments and parameters post_data = inspect.getcallargs(wrapped_meth, *args, **kwargs) try: # Remove arguments that should not be passed self = post_data.pop("self") relation = post_data.pop("relation", None) assert len(post_data) == 1 data = next(iter(post_data.values())) ranges = split_ranges(data, meth_name, relation) acks_expected = sum(len(items) for items in ranges.values()) self._correlation_id = str(uuid.uuid4()) exchange = ProvenanceStorageRabbitMQServer.get_exchange( meth_name, relation ) for routing_key, items in ranges.items(): items_list = list(items) batches = ( items_list[idx : idx + self._batch_size] for idx in range(0, len(items_list), self._batch_size) ) for batch in batches: # FIXME: this is running in a different thread! Hence, if # self._connection drops, there is no guarantee that the # request can be sent for the current elements. This # situation should be handled properly. self._connection.ioloop.add_callback_threadsafe( functools.partial( ProvenanceStorageRabbitMQClient.request, channel=self._channel, reply_to=self._callback_queue, exchange=exchange, routing_key=routing_key, correlation_id=self._correlation_id, data=batch, ) ) return self.wait_for_acks(acks_expected) except BaseException as ex: self.request_termination(str(ex)) return False if meth_name not in attributes: attributes[meth_name] = ( write_meth_ if ProvenanceStorageRabbitMQServer.is_write_method(meth_name) else meth_ ) class ProvenanceStorageRabbitMQClient(threading.Thread, metaclass=MetaRabbitMQClient): backend_class = ProvenanceStorageInterface extra_type_decoders = DECODERS extra_type_encoders = ENCODERS def __init__( self, url: str, storage_config: Dict[str, Any], batch_size: int = 100, prefetch_count: int = 100, wait_min: float = 60, wait_per_batch: float = 10, ) -> None: """Setup the client object, passing in the URL we will use to connect to RabbitMQ, and the connection information for the local storage object used for read-only operations. :param str url: The URL for connecting to RabbitMQ :param dict storage_config: Configuration parameters for the underlying ``ProvenanceStorage`` object expected by ``swh.provenance.get_provenance_storage`` :param int batch_size: Max amount of elements per package (after range splitting) for writing operations :param int prefetch_count: Prefetch value for the RabbitMQ connection when receiving ack packages :param float wait_min: Min waiting time for response on a writing operation, in seconds :param float wait_per_batch: Waiting time for response per batch of items on a writing operation, in seconds """ super().__init__() self._connection = None self._callback_queue: Optional[str] = None self._channel = None self._closing = False self._consumer_tag = None self._consuming = False self._correlation_id = str(uuid.uuid4()) self._prefetch_count = prefetch_count self._batch_size = batch_size self._response_queue: queue.Queue = queue.Queue() self._storage = get_provenance_storage(**storage_config) self._url = url self._wait_min = wait_min self._wait_per_batch = wait_per_batch def __enter__(self) -> ProvenanceStorageInterface: self.open() assert isinstance(self, ProvenanceStorageInterface) return self def __exit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: self.close() @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "open"}) def open(self) -> None: self.start() while self._callback_queue is None: time.sleep(0.1) self._storage.open() @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"}) def close(self) -> None: assert self._connection is not None self._connection.ioloop.add_callback_threadsafe(self.request_termination) self.join() self._storage.close() def request_termination(self, reason: str = "Normal shutdown") -> None: assert self._connection is not None def termination_callback(): raise TerminateSignal(reason) self._connection.ioloop.add_callback_threadsafe(termination_callback) def connect(self) -> pika.SelectConnection: LOGGER.info("Connecting to %s", self._url) return pika.SelectConnection( parameters=pika.URLParameters(self._url), on_open_callback=self.on_connection_open, on_open_error_callback=self.on_connection_open_error, on_close_callback=self.on_connection_closed, ) def close_connection(self) -> None: assert self._connection is not None self._consuming = False if self._connection.is_closing or self._connection.is_closed: LOGGER.info("Connection is closing or already closed") else: LOGGER.info("Closing connection") self._connection.close() def on_connection_open(self, _unused_connection: pika.SelectConnection) -> None: LOGGER.info("Connection opened") self.open_channel() def on_connection_open_error( self, _unused_connection: pika.SelectConnection, err: Exception ) -> None: LOGGER.error("Connection open failed, reopening in 5 seconds: %s", err) assert self._connection is not None self._connection.ioloop.call_later(5, self._connection.ioloop.stop) def on_connection_closed(self, _unused_connection: pika.SelectConnection, reason): assert self._connection is not None self._channel = None if self._closing: self._connection.ioloop.stop() else: LOGGER.warning("Connection closed, reopening in 5 seconds: %s", reason) self._connection.ioloop.call_later(5, self._connection.ioloop.stop) def open_channel(self) -> None: LOGGER.info("Creating a new channel") assert self._connection is not None self._connection.channel(on_open_callback=self.on_channel_open) def on_channel_open(self, channel: pika.channel.Channel) -> None: LOGGER.info("Channel opened") self._channel = channel LOGGER.info("Adding channel close callback") assert self._channel is not None self._channel.add_on_close_callback(callback=self.on_channel_closed) self.setup_queue() def on_channel_closed( self, channel: pika.channel.Channel, reason: Exception ) -> None: LOGGER.warning("Channel %i was closed: %s", channel, reason) self.close_connection() def setup_queue(self) -> None: LOGGER.info("Declaring callback queue") assert self._channel is not None self._channel.queue_declare( queue="", exclusive=True, callback=self.on_queue_declare_ok ) def on_queue_declare_ok(self, frame: pika.frame.Method) -> None: LOGGER.info("Binding queue to default exchanger") assert self._channel is not None self._callback_queue = frame.method.queue self._channel.basic_qos( prefetch_count=self._prefetch_count, callback=self.on_basic_qos_ok ) def on_basic_qos_ok(self, _unused_frame: pika.frame.Method) -> None: LOGGER.info("QOS set to: %d", self._prefetch_count) self.start_consuming() def start_consuming(self) -> None: LOGGER.info("Issuing consumer related RPC commands") LOGGER.info("Adding consumer cancellation callback") assert self._channel is not None self._channel.add_on_cancel_callback(callback=self.on_consumer_cancelled) assert self._callback_queue is not None self._consumer_tag = self._channel.basic_consume( queue=self._callback_queue, on_message_callback=self.on_response ) self._consuming = True def on_consumer_cancelled(self, method_frame: pika.frame.Method) -> None: LOGGER.info("Consumer was cancelled remotely, shutting down: %r", method_frame) if self._channel: self._channel.close() def on_response( self, channel: pika.channel.Channel, deliver: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, body: bytes, ) -> None: - LOGGER.info( + LOGGER.debug( "Received message # %s from %s: %s", deliver.delivery_tag, properties.app_id, body, ) self._response_queue.put( ( properties.correlation_id, decode_data(body, extra_decoders=self.extra_type_decoders), ) ) - LOGGER.info("Acknowledging message %s", deliver.delivery_tag) + LOGGER.debug("Acknowledging message %s", deliver.delivery_tag) channel.basic_ack(delivery_tag=deliver.delivery_tag) def stop_consuming(self) -> None: if self._channel: LOGGER.info("Sending a Basic.Cancel RPC command to RabbitMQ") self._channel.basic_cancel(self._consumer_tag, self.on_cancel_ok) def on_cancel_ok(self, _unused_frame: pika.frame.Method) -> None: self._consuming = False LOGGER.info( "RabbitMQ acknowledged the cancellation of the consumer: %s", self._consumer_tag, ) LOGGER.info("Closing the channel") assert self._channel is not None self._channel.close() def run(self) -> None: while not self._closing: try: self._connection = self.connect() assert self._connection is not None self._connection.ioloop.start() except KeyboardInterrupt: LOGGER.info("Connection closed by keyboard interruption, reopening") if self._connection is not None: self._connection.ioloop.stop() except TerminateSignal as ex: LOGGER.info("Termination requested: %s", ex) self.stop() if self._connection is not None and not self._connection.is_closed: # Finish closing self._connection.ioloop.start() except BaseException as ex: LOGGER.warning("Unexpected exception, terminating: %s", ex) self.stop() if self._connection is not None and not self._connection.is_closed: # Finish closing self._connection.ioloop.start() LOGGER.info("Stopped") def stop(self) -> None: assert self._connection is not None if not self._closing: self._closing = True LOGGER.info("Stopping") if self._consuming: self.stop_consuming() self._connection.ioloop.start() else: self._connection.ioloop.stop() LOGGER.info("Stopped") @staticmethod def request( channel: pika.channel.Channel, reply_to: str, exchange: str, routing_key: str, correlation_id: str, **kwargs, ) -> None: channel.basic_publish( exchange=exchange, routing_key=routing_key, properties=pika.BasicProperties( content_type="application/msgpack", correlation_id=correlation_id, reply_to=reply_to, ), body=encode_data( kwargs, extra_encoders=ProvenanceStorageRabbitMQClient.extra_type_encoders, ), ) def wait_for_acks(self, acks_expected: int) -> bool: acks_received = 0 timeout = max( (acks_expected / self._batch_size) * self._wait_per_batch, self._wait_min, ) while acks_received < acks_expected: try: acks_received += self.wait_for_response(timeout=timeout) except ResponseTimeout: LOGGER.warning( "Timed out waiting for acks, %s received, %s expected", acks_received, acks_expected, ) return False return acks_received == acks_expected def wait_for_response(self, timeout: float = 60.0) -> Any: while True: try: correlation_id, response = self._response_queue.get(timeout=timeout) if correlation_id == self._correlation_id: return response except queue.Empty: raise ResponseTimeout diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py index e14e94e..5c7414e 100644 --- a/swh/provenance/origin.py +++ b/swh/provenance/origin.py @@ -1,107 +1,123 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import logging + +from datetime import datetime + from itertools import islice from typing import Generator, Iterable, Iterator, List, Optional, Tuple from swh.core.statsd import statsd from swh.model.model import Sha1Git from .archive import ArchiveInterface from .graph import HistoryGraph from .interface import ProvenanceInterface from .model import OriginEntry, RevisionEntry + ORIGIN_DURATION_METRIC = "swh_provenance_origin_revision_layer_duration_seconds" +LOG_FORMAT = ( + "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) " + "-35s %(lineno) -5d: %(message)s" +) + +LOGGER = logging.getLogger(__name__) + class CSVOriginIterator: """Iterator over origin visit statuses typically present in the given CSV file. The input is an iterator that produces 2 elements per row: (url, snap) where: - url: is the origin url of the visit - snap: sha1_git of the snapshot pointed by the visit status """ def __init__( self, statuses: Iterable[Tuple[str, Sha1Git]], limit: Optional[int] = None, ) -> None: self.statuses: Iterator[Tuple[str, Sha1Git]] if limit is not None: self.statuses = islice(statuses, limit) else: self.statuses = iter(statuses) def __iter__(self) -> Generator[OriginEntry, None, None]: return (OriginEntry(url, snapshot) for url, snapshot in self.statuses) @statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "main"}) def origin_add( provenance: ProvenanceInterface, archive: ArchiveInterface, origins: List[OriginEntry], commit: bool = True, ) -> None: for origin in origins: proceed_origin(provenance, archive, origin) if commit: provenance.flush() + @statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "proceed_origin"}) def proceed_origin( - provenance: ProvenanceInterface, - archive: ArchiveInterface, - origin: OriginEntry) -> None: + provenance: ProvenanceInterface, archive: ArchiveInterface, origin: OriginEntry +) -> None: + LOGGER.info("Processing origin %s", origin.url) + start = datetime.now() provenance.origin_add(origin) origin.retrieve_revisions(archive) for revision in origin.revisions: if not provenance.revision_is_head(revision): graph = HistoryGraph(archive, revision) origin_add_revision(provenance, origin, graph) # head is treated separately check_preferred_origin(provenance, origin, revision) provenance.revision_add_to_origin(origin, revision) + end = datetime.now() + LOGGER.info("Processed origin %s in %s", origin.url, (end - start)) @statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "process_revision"}) def origin_add_revision( provenance: ProvenanceInterface, origin: OriginEntry, graph: HistoryGraph, ) -> None: visited = {graph.head} # head's history should be recursively iterated starting from its parents stack = list(graph.parents[graph.head]) while stack: current = stack.pop() check_preferred_origin(provenance, origin, current) # create a link between it and the head, and recursively walk its history provenance.revision_add_before_revision(graph.head, current) visited.add(current) for parent in graph.parents[current]: if parent not in visited: stack.append(parent) @statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "check_preferred_origin"}) def check_preferred_origin( provenance: ProvenanceInterface, origin: OriginEntry, revision: RevisionEntry, ) -> None: # if the revision has no preferred origin just set the given origin as the # preferred one. TODO: this should be improved in the future! preferred = provenance.revision_get_preferred_origin(revision) if preferred is None: provenance.revision_set_preferred_origin(origin, revision) diff --git a/swh/provenance/tools/origins/client.py b/swh/provenance/tools/origins/client.py index f897a4f..3d6309a 100755 --- a/swh/provenance/tools/origins/client.py +++ b/swh/provenance/tools/origins/client.py @@ -1,105 +1,108 @@ #!/usr/bin/env python import logging import logging.handlers import multiprocessing import os import sys import time from typing import Any, Callable, Dict, List, Optional import yaml import zmq from swh.core import config from swh.model.hashutil import hash_to_bytes from swh.provenance import get_archive, get_provenance from swh.provenance.origin import OriginEntry, origin_add CONFIG_ENVVAR = "SWH_CONFIG_FILENAME" DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None) class Client(multiprocessing.Process): def __init__( self, conf: Dict[str, Any], group: None = None, target: Optional[Callable[..., Any]] = ..., name: Optional[str] = ..., ) -> None: super().__init__(group=group, target=target, name=name) self.archive_conf = conf["archive"] self.storage_conf = conf["storage"] self.url = f"tcp://{conf['org_server']['host']}:{conf['org_server']['port']}" logging.info(f"Client {self.name} created") def run(self): logging.info(f"Client {self.name} started") # XXX: should we reconnect on each iteration to save resources? archive = get_archive(**self.archive_conf) context = zmq.Context() socket: zmq.Socket = context.socket(zmq.REQ) socket.connect(self.url) with get_provenance(**self.storage_conf) as provenance: while True: socket.send(b"NEXT") response = socket.recv_json() if response is None: break batch = [] for origin in response: batch.append( OriginEntry(origin["url"], hash_to_bytes(origin["snapshot"])) ) origin_add(provenance, archive, batch) logging.info(f"Client {self.name} stopped") if __name__ == "__main__": + + logging.basicConfig(level=logging.INFO) + # Check parameters if len(sys.argv) != 2: print("usage: client ") exit(-1) processes = int(sys.argv[1]) config_file = None # TODO: add as a cli option if ( config_file is None and DEFAULT_PATH is not None and config.config_exists(DEFAULT_PATH) ): config_file = DEFAULT_PATH if config_file is None or not os.path.exists(config_file): print("No configuration provided") exit(-1) conf = yaml.safe_load(open(config_file, "rb"))["provenance"] # Start counter start = time.time() # Launch as many clients as requested clients: List[Client] = [] for idx in range(processes): logging.info(f"MAIN: launching process {idx}") client = Client(conf, name=f"worker{idx}") client.start() clients.append(client) # Wait for all processes to complete their work for client in clients: logging.info(f"MAIN: waiting for process {client.name} to finish") client.join() logging.info(f"MAIN: process {client.name} finished executing") # Stop counter and report elapsed time stop = time.time() print("Elapsed time:", stop - start, "seconds")