Changeset View
Standalone View
swh/journal/client.py
# Copyright (C) 2017 The Software Heritage developers | # Copyright (C) 2017 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import logging | import logging | ||||
import os | import os | ||||
import time | |||||
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union | from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union | ||||
from confluent_kafka import Consumer, KafkaError, KafkaException | from confluent_kafka import Consumer, KafkaError, KafkaException | ||||
from swh.journal import DEFAULT_PREFIX | from swh.journal import DEFAULT_PREFIX | ||||
from .serializers import kafka_to_value | from .serializers import kafka_to_value | ||||
▲ Show 20 Lines • Show All 197 Lines • ▼ Show 20 Lines | ): | ||||
f"Topic(s) for object types {','.join(unknown_types)} " | f"Topic(s) for object types {','.join(unknown_types)} " | ||||
"are unknown on the kafka broker" | "are unknown on the kafka broker" | ||||
) | ) | ||||
logger.debug(f"Upstream topics: {existing_topics}") | logger.debug(f"Upstream topics: {existing_topics}") | ||||
self.subscribe() | self.subscribe() | ||||
self.stop_after_objects = stop_after_objects | self.stop_after_objects = stop_after_objects | ||||
self.process_timeout = process_timeout | |||||
self.eof_reached: Set[Tuple[str, str]] = set() | self.eof_reached: Set[Tuple[str, str]] = set() | ||||
self.batch_size = batch_size | self.batch_size = batch_size | ||||
if process_timeout is not None: | |||||
raise DeprecationWarning( | |||||
"'process_timeout' argument is not supported anymore by " | |||||
"JournalClient; please remove it from your configuration.", | |||||
vlorentz: this should be an exception IMO | |||||
Done Inline Actions — douardda: you are probably right, will do
) | |||||
def subscribe(self): | def subscribe(self): | ||||
"""Subscribe to topics listed in self.subscription | """Subscribe to topics listed in self.subscription | ||||
This can be overridden if you need, for instance, to manually assign partitions. | This can be overridden if you need, for instance, to manually assign partitions. | ||||
""" | """ | ||||
logger.debug(f"Subscribing to: {self.subscription}") | logger.debug(f"Subscribing to: {self.subscription}") | ||||
self.consumer.subscribe(topics=self.subscription) | self.consumer.subscribe(topics=self.subscription) | ||||
def process(self, worker_fn): | def process(self, worker_fn): | ||||
"""Polls Kafka for a batch of messages, and calls the worker_fn | """Polls Kafka for a batch of messages, and calls the worker_fn | ||||
with these messages. | with these messages. | ||||
Args: | Args: | ||||
worker_fn Callable[Dict[str, List[dict]]]: Function called with | worker_fn Callable[Dict[str, List[dict]]]: Function called with | ||||
the messages as | the messages as | ||||
argument. | argument. | ||||
""" | """ | ||||
start_time = time.monotonic() | |||||
total_objects_processed = 0 | total_objects_processed = 0 | ||||
while True: | |||||
# timeout for message poll | # timeout for message poll | ||||
timeout = 1.0 | timeout = 1.0 | ||||
elapsed = time.monotonic() - start_time | while True: | ||||
if self.process_timeout: | |||||
# +0.01 to prevent busy-waiting on / spamming consumer.poll. | |||||
# consumer.consume() returns shortly before X expired | |||||
# (a matter of milliseconds), so after it returns a first | |||||
# time, it would then be called with a timeout in the order | |||||
# of milliseconds, therefore returning immediately, then be | |||||
# called again, etc. | |||||
if elapsed + 0.01 >= self.process_timeout: | |||||
break | |||||
timeout = self.process_timeout - elapsed | |||||
batch_size = self.batch_size | batch_size = self.batch_size | ||||
if self.stop_after_objects: | if self.stop_after_objects: | ||||
if total_objects_processed >= self.stop_after_objects: | if total_objects_processed >= self.stop_after_objects: | ||||
break | break | ||||
# clamp batch size to avoid overrunning stop_after_objects | # clamp batch size to avoid overrunning stop_after_objects | ||||
batch_size = min( | batch_size = min( | ||||
self.stop_after_objects - total_objects_processed, batch_size, | self.stop_after_objects - total_objects_processed, batch_size, | ||||
▲ Show 20 Lines • Show All 52 Lines • Show Last 20 Lines |
this should be an exception IMO