Page MenuHomeSoftware Heritage

D2199.diff
No OneTemporary

D2199.diff

diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -30,6 +30,17 @@
]
+def _error_cb(error):
+ if error.fatal():
+ raise KafkaException(error)
+ logger.info('Received non-fatal kafka error: %s', error)
+
+
+def _on_commit(error, partitions):
+ if error is not None:
+ _error_cb(error)
+
+
class JournalClient:
"""A base client for the Software Heritage journal.
@@ -81,8 +92,8 @@
'bootstrap.servers': ','.join(brokers),
'auto.offset.reset': auto_offset_reset,
'group.id': group_id,
- 'on_commit': self._on_commit,
- 'error_cb': self._error_cb,
+ 'on_commit': _on_commit,
+ 'error_cb': _error_cb,
'enable.auto.commit': False,
'logger': logger,
}
@@ -105,15 +116,6 @@
self._object_types = object_types
- def _error_cb(self, error):
- if error.fatal():
- raise KafkaException(error)
- logger.info('Received non-fatal kafka error: %s', error)
-
- def _on_commit(self, error, partitions):
- if error is not None:
- self._error_cb(error)
-
def process(self, worker_fn):
"""Polls Kafka for a batch of messages, and calls the worker_fn
with these messages.

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 4:32 PM (2 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3218247

Event Timeline