diff --git a/swh/journal/tests/journal_data.py b/swh/journal/tests/journal_data.py
index 02bfd0e..9e8b242 100644
--- a/swh/journal/tests/journal_data.py
+++ b/swh/journal/tests/journal_data.py
@@ -1,153 +1,176 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Type
 
 from swh.model.hashutil import MultiHash, hash_to_bytes
 
 from swh.journal.serializers import ModelObject
-from swh.journal.writer.kafka import OBJECT_TYPES
+
+from swh.model.model import (
+    BaseModel,
+    Content,
+    Directory,
+    Origin,
+    OriginVisit,
+    Release,
+    Revision,
+    SkippedContent,
+    Snapshot,
+)
+
+
+OBJECT_TYPES: Dict[Type[BaseModel], str] = {
+    Content: "content",
+    Directory: "directory",
+    Origin: "origin",
+    OriginVisit: "origin_visit",
+    Release: "release",
+    Revision: "revision",
+    SkippedContent: "skipped_content",
+    Snapshot: "snapshot",
+}
 
 
 CONTENTS = [
     {**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",},
 ]
 
 duplicate_content1 = {
     "length": 4,
     "sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"),
     "sha1_git": b"another-foo",
     "blake2s256": b"another-bar",
     "sha256": b"another-baz",
     "status": "visible",
 }
 
 # Craft a sha1 collision
 duplicate_content2 = duplicate_content1.copy()
 sha1_array = bytearray(duplicate_content1["sha1_git"])
 sha1_array[0] += 1
 duplicate_content2["sha1_git"] = bytes(sha1_array)
 
 
 DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2]
 
 
 COMMITTERS = [
     {"fullname": b"foo", "name": b"foo", "email": b"",},
     {"fullname": b"bar", "name": b"bar", "email": b"",},
 ]
 
 DATES = [
     {
         "timestamp": {"seconds": 1234567891, "microseconds": 0,},
         "offset": 120,
         "negative_utc": False,
     },
     {
         "timestamp": {"seconds": 1234567892, "microseconds": 0,},
         "offset": 120,
         "negative_utc": False,
     },
 ]
 
 REVISIONS = [
     {
         "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"),
         "message": b"hello",
         "date": DATES[0],
         "committer": COMMITTERS[0],
         "author": COMMITTERS[0],
         "committer_date": DATES[0],
         "type": "git",
         "directory": b"\x01" * 20,
         "synthetic": False,
         "metadata": None,
         "parents": [],
     },
     {
         "id": hash_to_bytes("368a48fe15b7db2383775f97c6b247011b3f14f4"),
         "message": b"hello again",
         "date": DATES[1],
         "committer": COMMITTERS[1],
         "author": COMMITTERS[1],
         "committer_date": DATES[1],
         "type": "hg",
         "directory": b"\x02" * 20,
         "synthetic": False,
         "metadata": None,
         "parents": [],
     },
 ]
 
 RELEASES = [
     {
         "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
         "name": b"v0.0.1",
         "date": {
             "timestamp": {"seconds": 1234567890, "microseconds": 0,},
             "offset": 120,
             "negative_utc": False,
         },
         "author": COMMITTERS[0],
         "target_type": "revision",
         "target": b"\x04" * 20,
         "message": b"foo",
         "synthetic": False,
     },
 ]
 
 ORIGINS = [
     {"url": "https://somewhere.org/den/fox",},
     {"url": "https://overtherainbow.org/fox/den",},
 ]
 
 ORIGIN_VISITS = [
     {
         "origin": ORIGINS[0]["url"],
         "date": "2013-05-07 04:20:39.369271+00:00",
         "snapshot": None,  # TODO
         "status": "ongoing",  # TODO
         "metadata": {"foo": "bar"},
         "type": "git",
     },
     {
         "origin": ORIGINS[0]["url"],
         "date": "2018-11-27 17:20:39+00:00",
         "snapshot": None,  # TODO
         "status": "ongoing",  # TODO
         "metadata": {"baz": "qux"},
         "type": "git",
     },
 ]
 
 TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = {
     "content": CONTENTS,
     "directory": [],
     "origin": ORIGINS,
     "origin_visit": ORIGIN_VISITS,
     "release": RELEASES,
     "revision": REVISIONS,
     "snapshot": [],
     "skipped_content": [],
 }
 
 MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()}
 
 TEST_OBJECTS: Dict[str, List[ModelObject]] = {}
 
 for object_type, objects in TEST_OBJECT_DICTS.items():
     converted_objects: List[ModelObject] = []
     model = MODEL_OBJECTS[object_type]
 
     for (num, obj_d) in enumerate(objects):
         if object_type == "origin_visit":
             obj_d = {**obj_d, "visit": num}
         elif object_type == "content":
             obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now()}
 
         converted_objects.append(model.from_dict(obj_d))
 
     TEST_OBJECTS[object_type] = converted_objects
diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py
index 2a26e53..ac12f66 100644
--- a/swh/journal/writer/kafka.py
+++ b/swh/journal/writer/kafka.py
@@ -1,234 +1,211 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 import time
 
 from typing import Dict, Iterable, List, NamedTuple, Optional, Type
 
 from confluent_kafka import Producer, KafkaException
 
-from swh.model.model import (
-    BaseModel,
-    Content,
-    Directory,
-    Origin,
-    OriginVisit,
-    Release,
-    Revision,
-    SkippedContent,
-    Snapshot,
-)
-
 from swh.journal.serializers import (
     KeyType,
     ModelObject,
     object_key,
     pprint_key,
     key_to_kafka,
     value_to_kafka,
 )
 
 logger = logging.getLogger(__name__)
 
-OBJECT_TYPES: Dict[Type[BaseModel], str] = {
-    Content: "content",
-    Directory: "directory",
-    Origin: "origin",
-    OriginVisit: "origin_visit",
-    Release: "release",
-    Revision: "revision",
-    SkippedContent: "skipped_content",
-    Snapshot: "snapshot",
-}
-
 
 class DeliveryTag(NamedTuple):
     """Unique tag allowing us to check for a message delivery"""
 
     topic: str
     kafka_key: bytes
 
 
 class DeliveryFailureInfo(NamedTuple):
     """Verbose information for failed deliveries"""
 
     object_type: str
     key: KeyType
     message: str
     code: str
 
 
 def get_object_type(topic: str) -> str:
     """Get the object type from a topic string"""
     return topic.rsplit(".", 1)[-1]
 
 
 class KafkaDeliveryError(Exception):
     """Delivery failed on some kafka messages."""
 
     def __init__(self, message: str, delivery_failures: Iterable[DeliveryFailureInfo]):
         self.message = message
         self.delivery_failures = list(delivery_failures)
 
     def pretty_failures(self) -> str:
         return ", ".join(
             f"{f.object_type} {pprint_key(f.key)} ({f.message})"
             for f in self.delivery_failures
         )
 
     def __str__(self):
         return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])"
 
 
 class KafkaJournalWriter:
     """This class is instantiated and used by swh-storage to write incoming
     new objects to Kafka before adding them to the storage backend
     (eg. postgresql) itself.
 
     Args:
       brokers: list of broker addresses and ports
       prefix: the prefix used to build the topic names for objects
       client_id: the id of the writer sent to kafka
       producer_config: extra configuration keys passed to the `Producer`
       flush_timeout: timeout, in seconds, after which the `flush` operation
         will fail if some message deliveries are still pending.
       producer_class: override for the kafka producer class
 
     """
 
     def __init__(
         self,
         brokers: Iterable[str],
         prefix: str,
         client_id: str,
         producer_config: Optional[Dict] = None,
         flush_timeout: float = 120,
         producer_class: Type[Producer] = Producer,
     ):
         self._prefix = prefix
 
         if not producer_config:
             producer_config = {}
 
         if "message.max.bytes" not in producer_config:
             producer_config = {
                 "message.max.bytes": 100 * 1024 * 1024,
                 **producer_config,
             }
 
         self.producer = producer_class(
             {
                 "bootstrap.servers": ",".join(brokers),
                 "client.id": client_id,
                 "on_delivery": self._on_delivery,
                 "error_cb": self._error_cb,
                 "logger": logger,
                 "acks": "all",
                 **producer_config,
             }
         )
 
         # Delivery management
         self.flush_timeout = flush_timeout
 
         # delivery tag -> original object "key" mapping
         self.deliveries_pending: Dict[DeliveryTag, KeyType] = {}
 
         # List of (object_type, key, error_msg, error_name) for failed deliveries
         self.delivery_failures: List[DeliveryFailureInfo] = []
 
     def _error_cb(self, error):
         if error.fatal():
             raise KafkaException(error)
         logger.info("Received non-fatal kafka error: %s", error)
 
     def _on_delivery(self, error, message):
         (topic, key) = delivery_tag = DeliveryTag(message.topic(), message.key())
         sent_key = self.deliveries_pending.pop(delivery_tag, None)
 
         if error is not None:
             self.delivery_failures.append(
                 DeliveryFailureInfo(
                     get_object_type(topic), sent_key, error.str(), error.name()
                 )
             )
 
     def send(self, topic: str, key: KeyType, value):
         kafka_key = key_to_kafka(key)
         self.producer.produce(
             topic=topic, key=kafka_key, value=value_to_kafka(value),
         )
 
         self.deliveries_pending[DeliveryTag(topic, kafka_key)] = key
 
     def delivery_error(self, message) -> KafkaDeliveryError:
         """Get all failed deliveries, and clear them"""
         ret = self.delivery_failures
         self.delivery_failures = []
 
         while self.deliveries_pending:
             delivery_tag, orig_key = self.deliveries_pending.popitem()
             (topic, kafka_key) = delivery_tag
             ret.append(
                 DeliveryFailureInfo(
                     get_object_type(topic),
                     orig_key,
                     "No delivery before flush() timeout",
                     "SWH_FLUSH_TIMEOUT",
                 )
             )
 
         return KafkaDeliveryError(message, ret)
 
     def flush(self):
         start = time.monotonic()
 
         self.producer.flush(self.flush_timeout)
 
         while self.deliveries_pending:
             if time.monotonic() - start > self.flush_timeout:
                 break
             self.producer.poll(0.1)
 
         if self.deliveries_pending:
             # Delivery timeout
             raise self.delivery_error(
                 "flush() exceeded timeout (%ss)" % self.flush_timeout,
             )
         elif self.delivery_failures:
             raise self.delivery_error("Failed deliveries after flush()")
 
     def _sanitize_object(
         self, object_type: str, object_: ModelObject
     ) -> Dict[str, str]:
         dict_ = object_.to_dict()
         if object_type == "origin_visit":
             # :(
             dict_["date"] = str(dict_["date"])
         if object_type == "content":
             dict_.pop("data", None)
         return dict_
 
     def _write_addition(self, object_type: str, object_: ModelObject) -> None:
         """Write a single object to the journal"""
         topic = f"{self._prefix}.{object_type}"
         key = object_key(object_type, object_)
         dict_ = self._sanitize_object(object_type, object_)
         logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_)
         self.send(topic, key=key, value=dict_)
 
     def write_addition(self, object_type: str, object_: ModelObject) -> None:
         """Write a single object to the journal"""
         self._write_addition(object_type, object_)
         self.flush()
 
     write_update = write_addition
 
     def write_additions(self, object_type: str, objects: Iterable[ModelObject]) -> None:
         """Write a set of objects to the journal"""
         for object_ in objects:
             self._write_addition(object_type, object_)
 
         self.flush()
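
For reference, a minimal usage sketch of the writer defined above. The broker address, topic prefix, and client id are example values (not mandated by the patch), and TEST_OBJECTS comes from the test-data module patched in the first hunk:

    from swh.journal.tests.journal_data import TEST_OBJECTS
    from swh.journal.writer.kafka import KafkaDeliveryError, KafkaJournalWriter

    writer = KafkaJournalWriter(
        brokers=["localhost:9092"],    # example broker address
        prefix="swh.journal.objects",  # topics are named "<prefix>.<object_type>"
        client_id="example-writer",
    )

    try:
        # Serializes and produces one message per object, then flush()es;
        # flush() raises KafkaDeliveryError on failed or timed-out deliveries.
        writer.write_additions("origin", TEST_OBJECTS["origin"])
    except KafkaDeliveryError as e:
        print(e.pretty_failures())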