Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/client.py
Show All 26 Lines | |||||
] | ] | ||||
class JournalClient: | class JournalClient: | ||||
"""A base client for the Software Heritage journal. | """A base client for the Software Heritage journal. | ||||
The current implementation of the journal uses Apache Kafka | The current implementation of the journal uses Apache Kafka | ||||
brokers to publish messages under a given topic prefix, with each | brokers to publish messages under a given topic prefix, with each | ||||
object type using a specific topic under that prefix. | object type using a specific topic under that prefix. If the 'prefix' | ||||
argument is None (default value), it will take the default value | |||||
'swh.journal.objects'. | |||||
Clients subscribe to events specific to each object type by using | Clients subscribe to events specific to each object type by using | ||||
the `object_types` configuration variable. | the `object_types` configuration variable. | ||||
Clients can be sharded by setting the `client_id` to a common | Clients can be sharded by setting the `group_id` to a common | ||||
value across instances. The journal will share the message | value across instances. The journal will share the message | ||||
throughput across the nodes sharing the same client_id. | throughput across the nodes sharing the same group_id. | ||||
Messages are processed by the `process_objects` method in batches | Messages are processed by the `process_objects` method in batches | ||||
of maximum `max_messages`. | of maximum `max_messages`. | ||||
""" | """ | ||||
def __init__( | def __init__( | ||||
self, brokers, group_id, prefix=DEFAULT_PREFIX, | self, brokers, group_id, prefix=None, | ||||
object_types=ACCEPTED_OBJECT_TYPES, | object_types=ACCEPTED_OBJECT_TYPES, | ||||
max_messages=0, auto_offset_reset='earliest'): | max_messages=0, auto_offset_reset='earliest'): | ||||
if prefix is None: | |||||
prefix = DEFAULT_PREFIX | |||||
if auto_offset_reset not in ACCEPTED_OFFSET_RESET: | if auto_offset_reset not in ACCEPTED_OFFSET_RESET: | ||||
raise ValueError( | raise ValueError( | ||||
'Option \'auto_offset_reset\' only accept %s.' % | 'Option \'auto_offset_reset\' only accept %s.' % | ||||
ACCEPTED_OFFSET_RESET) | ACCEPTED_OFFSET_RESET) | ||||
for object_type in object_types: | for object_type in object_types: | ||||
if object_type not in ACCEPTED_OBJECT_TYPES: | if object_type not in ACCEPTED_OBJECT_TYPES: | ||||
raise ValueError( | raise ValueError( | ||||
▲ Show 20 Lines • Show All 42 Lines • Show Last 20 Lines |