Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/writer/kafka.py
Show All 15 Lines | from swh.model.model import ( | ||||
Origin, | Origin, | ||||
OriginVisit, | OriginVisit, | ||||
Release, | Release, | ||||
Revision, | Revision, | ||||
SkippedContent, | SkippedContent, | ||||
Snapshot, | Snapshot, | ||||
) | ) | ||||
from swh.journal.serializers import key_to_kafka, value_to_kafka | from swh.journal.serializers import KeyType, key_to_kafka, value_to_kafka | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
OBJECT_TYPES: Dict[Type[BaseModel], str] = { | OBJECT_TYPES: Dict[Type[BaseModel], str] = { | ||||
Content: "content", | Content: "content", | ||||
Directory: "directory", | Directory: "directory", | ||||
Origin: "origin", | Origin: "origin", | ||||
OriginVisit: "origin_visit", | OriginVisit: "origin_visit", | ||||
# (The diff view collapses 41 lines above this method.)
def _error_cb(self, error):
    """Handle an error object reported for the producer.

    Errors whose ``fatal()`` returns true are re-raised wrapped in a
    ``KafkaException``; every other error is only logged at INFO level.

    Args:
        error: object exposing a ``fatal()`` method (presumably a
            ``confluent_kafka.KafkaError`` -- confirm against the caller).

    Raises:
        KafkaException: when ``error.fatal()`` is true.
    """
    if not error.fatal():
        # Non-fatal errors are reported but do not interrupt the writer.
        logger.info("Received non-fatal kafka error: %s", error)
        return
    raise KafkaException(error)
def _on_delivery(self, error, message): | def _on_delivery(self, error, message): | ||||
if error is not None: | if error is not None: | ||||
self._error_cb(error) | self._error_cb(error) | ||||
def send(self, topic: str, key: KeyType, value):
    """Serialize ``key`` and ``value`` and hand them to the producer.

    Args:
        topic: name of the topic to produce to.
        key: message key, converted with ``key_to_kafka``.
        value: message payload, converted with ``value_to_kafka``.
    """
    serialized_key = key_to_kafka(key)
    serialized_value = value_to_kafka(value)
    self.producer.produce(
        topic=topic, key=serialized_key, value=serialized_value,
    )

    # Callbacks only run from within poll(), so service them on every send.
    self.producer.poll(0)
def flush(self):
    """Call ``flush()`` on the underlying producer, discarding its result."""
    producer = self.producer
    producer.flush()
Show All 16 Lines | class KafkaJournalWriter: | ||||
@overload | @overload | ||||
def _get_key(self, object_type: str, object_: Origin) -> Dict[str, bytes]: | def _get_key(self, object_type: str, object_: Origin) -> Dict[str, bytes]: | ||||
... | ... | ||||
@overload | @overload | ||||
def _get_key(self, object_type: str, object_: OriginVisit) -> Dict[str, str]: | def _get_key(self, object_type: str, object_: OriginVisit) -> Dict[str, str]: | ||||
... | ... | ||||
def _get_key(self, object_type, object_): | def _get_key(self, object_type: str, object_) -> KeyType: | ||||
if object_type in ("revision", "release", "directory", "snapshot"): | if object_type in ("revision", "release", "directory", "snapshot"): | ||||
return object_.id | return object_.id | ||||
elif object_type == "content": | elif object_type == "content": | ||||
return object_.sha1 # TODO: use a dict of hashes | return object_.sha1 # TODO: use a dict of hashes | ||||
elif object_type == "skipped_content": | elif object_type == "skipped_content": | ||||
return {hash: getattr(object_, hash) for hash in DEFAULT_ALGORITHMS} | return {hash: getattr(object_, hash) for hash in DEFAULT_ALGORITHMS} | ||||
elif object_type == "origin": | elif object_type == "origin": | ||||
return {"url": object_.url} | return {"url": object_.url} | ||||
Show All 38 Lines |