Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/storage/rabbitmq/server.py
- This file was moved from swh/provenance/api/server.py.
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import Counter | from collections import Counter | ||||
from datetime import datetime | from datetime import datetime | ||||
from enum import Enum | from enum import Enum | ||||
import functools | import functools | ||||
import logging | import logging | ||||
import multiprocessing | import multiprocessing | ||||
import os | import os | ||||
import queue | import queue | ||||
import threading | import threading | ||||
from typing import Any, Callable | from typing import Any, Callable | ||||
from typing import Counter as TCounter | from typing import Counter as TCounter | ||||
from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple, Union, cast | from typing import Dict, Iterable, Iterator, List, Optional, Set, Tuple, Union, cast | ||||
import pika | import pika | ||||
import pika.channel | import pika.channel | ||||
import pika.connection | import pika.connection | ||||
import pika.exceptions | import pika.exceptions | ||||
from pika.exchange_type import ExchangeType | from pika.exchange_type import ExchangeType | ||||
import pika.frame | import pika.frame | ||||
import pika.spec | import pika.spec | ||||
from swh.core import config | from swh.core import config | ||||
from swh.core.api.serializers import encode_data_client as encode_data | from swh.core.api.serializers import encode_data_client as encode_data | ||||
from swh.core.api.serializers import msgpack_loads as decode_data | from swh.core.api.serializers import msgpack_loads as decode_data | ||||
from swh.model.hashutil import hash_to_hex | from swh.model.hashutil import hash_to_hex | ||||
from swh.model.model import Sha1Git | from swh.model.model import Sha1Git | ||||
from swh.provenance.storage.interface import ( | |||||
from ..interface import ( | |||||
DirectoryData, | DirectoryData, | ||||
EntityType, | EntityType, | ||||
RelationData, | RelationData, | ||||
RelationType, | RelationType, | ||||
RevisionData, | RevisionData, | ||||
) | ) | ||||
from ..util import path_id | from swh.provenance.util import path_id | ||||
from .serializers import DECODERS, ENCODERS | from .serializers import DECODERS, ENCODERS | ||||
LOG_FORMAT = ( | LOG_FORMAT = ( | ||||
"%(levelname) -10s %(asctime)s %(name) -30s %(funcName) " | "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) " | ||||
"-35s %(lineno) -5d: %(message)s" | "-35s %(lineno) -5d: %(message)s" | ||||
) | ) | ||||
LOGGER = logging.getLogger(__name__) | LOGGER = logging.getLogger(__name__) | ||||
▲ Show 20 Lines • Show All 486 Lines • ▼ Show 20 Lines | ) -> None: | ||||
:param str url: The URL for connecting to RabbitMQ | :param str url: The URL for connecting to RabbitMQ | ||||
:param dict storage_config: Configuration parameters for the underlying | :param dict storage_config: Configuration parameters for the underlying | ||||
``ProvenanceStorage`` object expected by | ``ProvenanceStorage`` object expected by | ||||
``swh.provenance.get_provenance_storage`` | ``swh.provenance.get_provenance_storage`` | ||||
:param int batch_size: Max amount of elements call to the underlying storage | :param int batch_size: Max amount of elements call to the underlying storage | ||||
:param int prefetch_count: Prefetch value for the RabbitMQ connection when | :param int prefetch_count: Prefetch value for the RabbitMQ connection when | ||||
receiving messaged | receiving messaged | ||||
""" | """ | ||||
self._workers: List[ProvenanceStorageRabbitMQWorker] = [] | self._workers: List[ProvenanceStorageRabbitMQWorker] = [] | ||||
for exchange in ProvenanceStorageRabbitMQServer.get_exchanges(): | for exchange in ProvenanceStorageRabbitMQServer.get_exchanges(): | ||||
for range in ProvenanceStorageRabbitMQServer.get_ranges(exchange): | for range in ProvenanceStorageRabbitMQServer.get_ranges(exchange): | ||||
worker = ProvenanceStorageRabbitMQWorker( | worker = ProvenanceStorageRabbitMQWorker( | ||||
url=url, | url=url, | ||||
exchange=exchange, | exchange=exchange, | ||||
range=range, | range=range, | ||||
Show All 26 Lines | def stop(self) -> None: | ||||
for worker in self._workers: | for worker in self._workers: | ||||
worker.command.put(ServerCommand.TERMINATE) | worker.command.put(ServerCommand.TERMINATE) | ||||
for worker in self._workers: | for worker in self._workers: | ||||
worker.join() | worker.join() | ||||
LOGGER.info("Stop serving") | LOGGER.info("Stop serving") | ||||
self._running = False | self._running = False | ||||
@staticmethod | @staticmethod | ||||
def get_binding_keys(exchange: str, range: int) -> Generator[str, None, None]: | def get_binding_keys(exchange: str, range: int) -> Iterator[str]: | ||||
for meth_name, relation in ProvenanceStorageRabbitMQServer.get_meth_names( | for meth_name, relation in ProvenanceStorageRabbitMQServer.get_meth_names( | ||||
exchange | exchange | ||||
): | ): | ||||
if relation is None: | if relation is None: | ||||
assert ( | assert ( | ||||
meth_name != "relation_add" | meth_name != "relation_add" | ||||
), "'relation_add' requires 'relation' to be provided" | ), "'relation_add' requires 'relation' to be provided" | ||||
yield f"{meth_name}.unknown.{range:x}".lower() | yield f"{meth_name}.unknown.{range:x}".lower() | ||||
Show All 12 Lines | def get_exchange(meth_name: str, relation: Optional[RelationType] = None) -> str: | ||||
split = relation.value | split = relation.value | ||||
else: | else: | ||||
assert relation is None, f"'{meth_name}' requires 'relation' to be None" | assert relation is None, f"'{meth_name}' requires 'relation' to be None" | ||||
split = meth_name | split = meth_name | ||||
exchange, *_ = split.split("_") | exchange, *_ = split.split("_") | ||||
return exchange | return exchange | ||||
@staticmethod | @staticmethod | ||||
def get_exchanges() -> Generator[str, None, None]: | def get_exchanges() -> Iterator[str]: | ||||
yield from [entity.value for entity in EntityType] + ["location"] | yield from [entity.value for entity in EntityType] + ["location"] | ||||
@staticmethod | @staticmethod | ||||
def get_meth_name( | def get_meth_name( | ||||
binding_key: str, | binding_key: str, | ||||
) -> Tuple[str, Optional[RelationType]]: | ) -> Tuple[str, Optional[RelationType]]: | ||||
meth_name, relation, *_ = binding_key.split(".") | meth_name, relation, *_ = binding_key.split(".") | ||||
return meth_name, (RelationType(relation) if relation != "unknown" else None) | return meth_name, (RelationType(relation) if relation != "unknown" else None) | ||||
@staticmethod | @staticmethod | ||||
def get_meth_names( | def get_meth_names( | ||||
exchange: str, | exchange: str, | ||||
) -> Generator[Tuple[str, Optional[RelationType]], None, None]: | ) -> Iterator[Tuple[str, Optional[RelationType]]]: | ||||
if exchange == EntityType.CONTENT.value: | if exchange == EntityType.CONTENT.value: | ||||
yield from [ | yield from [ | ||||
("content_add", None), | ("content_add", None), | ||||
("relation_add", RelationType.CNT_EARLY_IN_REV), | ("relation_add", RelationType.CNT_EARLY_IN_REV), | ||||
("relation_add", RelationType.CNT_IN_DIR), | ("relation_add", RelationType.CNT_IN_DIR), | ||||
] | ] | ||||
elif exchange == EntityType.DIRECTORY.value: | elif exchange == EntityType.DIRECTORY.value: | ||||
yield from [ | yield from [ | ||||
("directory_add", None), | ("directory_add", None), | ||||
("relation_add", RelationType.DIR_IN_REV), | ("relation_add", RelationType.DIR_IN_REV), | ||||
] | ] | ||||
elif exchange == EntityType.ORIGIN.value: | elif exchange == EntityType.ORIGIN.value: | ||||
yield from [("origin_add", None)] | yield from [("origin_add", None)] | ||||
elif exchange == EntityType.REVISION.value: | elif exchange == EntityType.REVISION.value: | ||||
yield from [ | yield from [ | ||||
("revision_add", None), | ("revision_add", None), | ||||
("relation_add", RelationType.REV_BEFORE_REV), | ("relation_add", RelationType.REV_BEFORE_REV), | ||||
("relation_add", RelationType.REV_IN_ORG), | ("relation_add", RelationType.REV_IN_ORG), | ||||
] | ] | ||||
elif exchange == "location": | elif exchange == "location": | ||||
yield "location_add", None | yield "location_add", None | ||||
@staticmethod | @staticmethod | ||||
def get_ranges(unused_exchange: str) -> Generator[int, None, None]: | def get_ranges(unused_exchange: str) -> Iterator[int]: | ||||
# XXX: we might want to have a different range per exchange | # XXX: we might want to have a different range per exchange | ||||
yield from range(ProvenanceStorageRabbitMQServer.queue_count) | yield from range(ProvenanceStorageRabbitMQServer.queue_count) | ||||
@staticmethod | @staticmethod | ||||
def get_routing_key( | def get_routing_key( | ||||
item: bytes, meth_name: str, relation: Optional[RelationType] = None | item: bytes, meth_name: str, relation: Optional[RelationType] = None | ||||
) -> str: | ) -> str: | ||||
hashid = ( | hashid = ( | ||||
▲ Show 20 Lines • Show All 78 Lines • Show Last 20 Lines |