Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/client.py
Show All 12 Lines | |||||
from swh.journal import DEFAULT_PREFIX | from swh.journal import DEFAULT_PREFIX | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
# Only accepted offset reset policy accepted | # Only accepted offset reset policy accepted | ||||
ACCEPTED_OFFSET_RESET = ['earliest', 'latest'] | ACCEPTED_OFFSET_RESET = ['earliest', 'latest'] | ||||
# Only accepted object types | # Default object types, which are the node types in the SWH model | ||||
ACCEPTED_OBJECT_TYPES = [ | DEFAULT_OBJECT_TYPES = [ | ||||
'content', | 'content', | ||||
douardda: This default / accepted is unclear to me. At least before actually reading the code using… | |||||
'directory', | 'directory', | ||||
'revision', | 'revision', | ||||
'release', | 'release', | ||||
'snapshot', | 'snapshot', | ||||
'origin', | 'origin', | ||||
'origin_visit' | 'origin_visit' | ||||
] | ] | ||||
# Only accepted object types | |||||
ACCEPTED_OBJECT_TYPES = DEFAULT_OBJECT_TYPES + [ | |||||
'origin_intrinsic_metadata', | |||||
] | |||||
class JournalClient: | class JournalClient: | ||||
"""A base client for the Software Heritage journal. | """A base client for the Software Heritage journal. | ||||
The current implementation of the journal uses Apache Kafka | The current implementation of the journal uses Apache Kafka | ||||
brokers to publish messages under a given topic prefix, with each | brokers to publish messages under a given topic prefix, with each | ||||
object type using a specific topic under that prefix. If the 'prefix' | object type using a specific topic under that prefix. If the 'prefix' | ||||
argument is None (default value), it will take the default value | argument is None (default value), it will take the default value | ||||
'swh.journal.objects'. | 'swh.journal.objects'. | ||||
Clients subscribe to events specific to each object type as listed in the | Clients subscribe to events specific to each object type as listed in the | ||||
`object_types` argument (if unset, defaults to all accepted object types). | `object_types` argument (if unset, defaults to `DEFAULT_OBJECT_TYPES`, | ||||
all object types in the SWH model). | |||||
Clients can be sharded by setting the `group_id` to a common | Clients can be sharded by setting the `group_id` to a common | ||||
value across instances. The journal will share the message | value across instances. The journal will share the message | ||||
throughput across the nodes sharing the same group_id. | throughput across the nodes sharing the same group_id. | ||||
Messages are processed by the `worker_fn` callback passed to the | Messages are processed by the `worker_fn` callback passed to the | ||||
`process` method, in batches of maximum `max_messages`. | `process` method, in batches of maximum `max_messages`. | ||||
Any other named argument is passed directly to KafkaConsumer(). | Any other named argument is passed directly to KafkaConsumer(). | ||||
""" | """ | ||||
def __init__( | def __init__( | ||||
self, brokers, group_id, prefix=None, object_types=None, | self, brokers, group_id, prefix=None, object_types=None, | ||||
max_messages=0, process_timeout=0, auto_offset_reset='earliest', | max_messages=0, process_timeout=0, auto_offset_reset='earliest', | ||||
**kwargs): | **kwargs): | ||||
if prefix is None: | if prefix is None: | ||||
prefix = DEFAULT_PREFIX | prefix = DEFAULT_PREFIX | ||||
if object_types is None: | if object_types is None: | ||||
object_types = ACCEPTED_OBJECT_TYPES | object_types = DEFAULT_OBJECT_TYPES | ||||
if auto_offset_reset not in ACCEPTED_OFFSET_RESET: | if auto_offset_reset not in ACCEPTED_OFFSET_RESET: | ||||
raise ValueError( | raise ValueError( | ||||
'Option \'auto_offset_reset\' only accept %s, not %s' % | 'Option \'auto_offset_reset\' only accept %s, not %s' % | ||||
(ACCEPTED_OFFSET_RESET, auto_offset_reset)) | (ACCEPTED_OFFSET_RESET, auto_offset_reset)) | ||||
for object_type in object_types: | for object_type in object_types: | ||||
if object_type not in ACCEPTED_OBJECT_TYPES: | if object_type not in ACCEPTED_OBJECT_TYPES: | ||||
raise ValueError( | raise ValueError( | ||||
▲ Show 20 Lines • Show All 87 Lines • Show Last 20 Lines |
The distinction between "default" and "accepted" is unclear to me, at least before actually reading the code that uses these.
The comment is unclear, and should explain what 'default' stands for.