Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/client.py
# Copyright (C) 2017-2022 The Software Heritage developers | # Copyright (C) 2017-2022 The Software Heritage developers | ||||||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||||||
from collections import defaultdict | from collections import defaultdict | ||||||||
from importlib import import_module | from importlib import import_module | ||||||||
from itertools import cycle | |||||||||
import logging | import logging | ||||||||
import os | import os | ||||||||
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union | from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union | ||||||||
from confluent_kafka import Consumer, KafkaError, KafkaException | from confluent_kafka import Consumer, KafkaError, KafkaException | ||||||||
from swh.core.statsd import statsd | from swh.core.statsd import statsd | ||||||||
from swh.journal import DEFAULT_PREFIX | from swh.journal import DEFAULT_PREFIX | ||||||||
▲ Show 20 Lines • Show All 273 Lines • ▼ Show 20 Lines | def process(self, worker_fn): | ||||||||
if total_objects_processed >= self.stop_after_objects: | if total_objects_processed >= self.stop_after_objects: | ||||||||
break | break | ||||||||
# clamp batch size to avoid overrunning stop_after_objects | # clamp batch size to avoid overrunning stop_after_objects | ||||||||
batch_size = min( | batch_size = min( | ||||||||
self.stop_after_objects - total_objects_processed, | self.stop_after_objects - total_objects_processed, | ||||||||
batch_size, | batch_size, | ||||||||
) | ) | ||||||||
set_status("waiting") | set_status("waiting") | ||||||||
while True: | for i in cycle(reversed(range(10))): | ||||||||
ardumont: Not sure whether you need to wrap the range in a `list()` call, though.
```
In [2]: list… | |||||||||
Done Inline Actions: My first version did use the `range()` function, but I find the way I wrote it more explicit (I especially do not like having to use -1 as the stop limit). douardda: My first version did use the ``range()`` function, but I find it more explicit the way I wrote… | |||||||||
Not Done Inline Actions: Yeah, it's more readable this way. vlorentz: yeah, it's more readable this way | |||||||||
messages = self.consumer.consume( | messages = self.consumer.consume( | ||||||||
timeout=timeout, num_messages=batch_size | timeout=timeout, num_messages=batch_size | ||||||||
) | ) | ||||||||
if messages: | if messages: | ||||||||
break | break | ||||||||
# do check for an EOF condition iff we already consumed | |||||||||
# messages, otherwise we could detect an EOF condition | |||||||||
Done Inline Actions
ardumont: | |||||||||
# before messages had a chance to reach us (e.g. in tests) | |||||||||
if total_objects_processed > 0 and self.stop_on_eof and i == 0: | |||||||||
at_eof = all( | |||||||||
(tp.topic, tp.partition) in self.eof_reached | |||||||||
for tp in self.consumer.assignment() | |||||||||
) | |||||||||
if at_eof: | |||||||||
break | |||||||||
if messages: | |||||||||
set_status("processing") | set_status("processing") | ||||||||
batch_processed, at_eof = self.handle_messages(messages, worker_fn) | batch_processed, at_eof = self.handle_messages(messages, worker_fn) | ||||||||
set_status("idle") | set_status("idle") | ||||||||
# report the number of handled messages | # report the number of handled messages | ||||||||
statsd.increment(JOURNAL_MESSAGE_NUMBER_METRIC, value=batch_processed) | statsd.increment( | ||||||||
JOURNAL_MESSAGE_NUMBER_METRIC, value=batch_processed | |||||||||
) | |||||||||
total_objects_processed += batch_processed | total_objects_processed += batch_processed | ||||||||
if at_eof: | if at_eof: | ||||||||
break | break | ||||||||
return total_objects_processed | return total_objects_processed | ||||||||
def handle_messages(self, messages, worker_fn): | def handle_messages(self, messages, worker_fn): | ||||||||
objects: Dict[str, List[Any]] = defaultdict(list) | objects: Dict[str, List[Any]] = defaultdict(list) | ||||||||
nb_processed = 0 | nb_processed = 0 | ||||||||
Show All 36 Lines |
Not sure whether you need to wrap the range in a `list()` call, though.