Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/client.py
Show First 20 Lines • Show All 136 Lines • ▼ Show 20 Lines | def process(self, worker_fn): | ||||
Args: | Args: | ||||
worker_fn Callable[Dict[str, List[dict]]]: Function called with | worker_fn Callable[Dict[str, List[dict]]]: Function called with | ||||
the messages as | the messages as | ||||
argument. | argument. | ||||
""" | """ | ||||
start_time = time.monotonic() | start_time = time.monotonic() | ||||
nb_messages = 0 | nb_messages = 0 | ||||
objects = defaultdict(list) | |||||
while True: | while True: | ||||
# timeout for message poll | # timeout for message poll | ||||
timeout = 1.0 | timeout = 1.0 | ||||
elapsed = time.monotonic() - start_time | elapsed = time.monotonic() - start_time | ||||
if self.process_timeout: | if self.process_timeout: | ||||
if elapsed + 0.01 >= self.process_timeout: | if elapsed + 0.01 >= self.process_timeout: | ||||
break | break | ||||
timeout = self.process_timeout - elapsed | timeout = self.process_timeout - elapsed | ||||
num_messages = 20 | num_messages = 20 | ||||
if self.max_messages: | if self.max_messages: | ||||
if nb_messages >= self.max_messages: | if nb_messages >= self.max_messages: | ||||
break | break | ||||
num_messages = min(num_messages, self.max_messages-nb_messages) | num_messages = min(num_messages, self.max_messages-nb_messages) | ||||
messages = self.consumer.consume( | messages = self.consumer.consume( | ||||
timeout=timeout, num_messages=num_messages) | timeout=timeout, num_messages=num_messages) | ||||
if not messages: | if not messages: | ||||
continue | continue | ||||
nb_processed = self.handle_messages(messages, worker_fn) | |||||
nb_messages += nb_processed | |||||
douardda: no need for the nb_processed var here | |||||
return nb_messages | |||||
def handle_messages(self, messages, worker_fn): | |||||
objects = defaultdict(list) | |||||
nb_processed = 0 | |||||
for message in messages: | for message in messages: | ||||
error = message.error() | error = message.error() | ||||
if error is not None: | if error is not None: | ||||
_error_cb(error) | _error_cb(error) | ||||
continue | continue | ||||
nb_messages += 1 | nb_processed += 1 | ||||
object_type = message.topic().split('.')[-1] | object_type = message.topic().split('.')[-1] | ||||
# Got a message from a topic we did not subscribe to. | # Got a message from a topic we did not subscribe to. | ||||
assert object_type in self._object_types, object_type | assert object_type in self._object_types, object_type | ||||
objects[object_type].append( | objects[object_type].append(self.deserialize_message(message)) | ||||
self.value_deserializer(message.value()) | |||||
) | |||||
if objects: | if objects: | ||||
worker_fn(dict(objects)) | worker_fn(dict(objects)) | ||||
objects.clear() | |||||
self.consumer.commit() | self.consumer.commit() | ||||
return nb_messages | |||||
return nb_processed | |||||
def deserialize_message(self, message): | |||||
return self.value_deserializer(message.value()) | |||||
def close(self): | def close(self): | ||||
self.consumer.close() | self.consumer.close() |
no need for the nb_processed var here