Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/client.py
Show First 20 Lines • Show All 264 Lines • ▼ Show 20 Lines | def handle_messages(self, messages, worker_fn): | ||||
for message in messages: | for message in messages: | ||||
error = message.error() | error = message.error() | ||||
if error is not None: | if error is not None: | ||||
if error.code() == KafkaError._PARTITION_EOF: | if error.code() == KafkaError._PARTITION_EOF: | ||||
self.eof_reached.add((message.topic(), message.partition())) | self.eof_reached.add((message.topic(), message.partition())) | ||||
else: | else: | ||||
_error_cb(error) | _error_cb(error) | ||||
continue | continue | ||||
if message.value() is None: | |||||
anlambert: can be / cannot be ? | |||||
# ignore message with no payload, these can be generated in tests | |||||
continue | |||||
nb_processed += 1 | nb_processed += 1 | ||||
object_type = message.topic().split(".")[-1] | object_type = message.topic().split(".")[-1] | ||||
objects[object_type].append(self.deserialize_message(message)) | objects[object_type].append(self.deserialize_message(message)) | ||||
if objects: | if objects: | ||||
worker_fn(dict(objects)) | worker_fn(dict(objects)) | ||||
self.consumer.commit() | self.consumer.commit() | ||||
Show All 12 Lines |
can be / cannot be ?