diff --git a/swh/journal/client.py b/swh/journal/client.py
index b0b363d..9c8a97e 100644
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -1,277 +1,282 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from collections import defaultdict
 import logging
 import os
 import time
 from typing import Any, Dict, List, Optional, Set, Tuple, Union
 
 from confluent_kafka import Consumer, KafkaException, KafkaError
 
 from .serializers import kafka_to_value
 from swh.journal import DEFAULT_PREFIX
 
 logger = logging.getLogger(__name__)
 rdkafka_logger = logging.getLogger(__name__ + '.rdkafka')
 
 # Only accepted offset reset policies
 ACCEPTED_OFFSET_RESET = ['earliest', 'latest']
 
 # Only accepted object types
 ACCEPTED_OBJECT_TYPES = [
     'content',
     'directory',
     'revision',
     'release',
     'snapshot',
     'origin',
     'origin_visit'
 ]
 
 # Errors that Kafka raises too often and that are not useful; we lower their
 # log level to DEBUG instead of INFO.
 _SPAMMY_ERRORS = [
     KafkaError._NO_OFFSET,
 ]
 
 
 def _error_cb(error):
     if error.fatal():
         raise KafkaException(error)
     if error.code() in _SPAMMY_ERRORS:
         logger.debug('Received non-fatal kafka error: %s', error)
     else:
         logger.info('Received non-fatal kafka error: %s', error)
 
 
 def _on_commit(error, partitions):
     if error is not None:
         _error_cb(error)
 
 
 class JournalClient:
     """A base client for the Software Heritage journal.
 
     The current implementation of the journal uses Apache Kafka
     brokers to publish messages under a given topic prefix, with each
     object type using a specific topic under that prefix. If the `prefix`
     argument is None (the default), it takes the value
     `'swh.journal.objects'`.
 
     Clients subscribe to events specific to each object type as listed in the
     `object_types` argument (if unset, defaults to all accepted object types).
 
     Clients can be sharded by setting the `group_id` to a common
     value across instances. The journal will share the message
     throughput across the nodes sharing the same group_id.
 
     Messages are processed by the `worker_fn` callback passed to the `process`
-    method, in batches of maximum 20 messages (currently hardcoded). If set,
-    the processing stops after processing `stop_after_objects` messages in
-    total.
+    method, in batches of maximum `batch_size` messages (defaults to 200).
+
+    If set, the processing stops after processing `stop_after_objects` messages
+    in total.
 
     `stop_on_eof` stops the processing when the client has reached the end of
     each partition in turn.
 
     `auto_offset_reset` sets the behavior of the client when the consumer
     group initializes: `'earliest'` (the default) processes all objects
     since the inception of the topics; `'latest'` only processes objects
     published after the consumer group was created.
 
     Any other named argument is passed directly to KafkaConsumer().
 
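+    Example (an illustrative sketch; the broker address and group id below
+    are placeholders, not defaults)::
+
+        def worker_fn(objects):
+            for object_type, values in objects.items():
+                print(object_type, len(values))
+
+        client = JournalClient(
+            brokers='localhost:9092',
+            group_id='example-consumer-group',
+            object_types=['revision'],
+            batch_size=200,
+            stop_on_eof=True,
+        )
+        client.process(worker_fn)
+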
""" def __init__( self, brokers: Union[str, List[str]], group_id: str, prefix: Optional[str] = None, object_types: Optional[List[str]] = None, stop_after_objects: Optional[int] = None, + batch_size: int = 200, process_timeout: Optional[float] = None, auto_offset_reset: str = 'earliest', stop_on_eof: bool = False, **kwargs ): if prefix is None: prefix = DEFAULT_PREFIX if object_types is None: object_types = ACCEPTED_OBJECT_TYPES if auto_offset_reset not in ACCEPTED_OFFSET_RESET: raise ValueError( 'Option \'auto_offset_reset\' only accept %s, not %s' % (ACCEPTED_OFFSET_RESET, auto_offset_reset)) for object_type in object_types: if object_type not in ACCEPTED_OBJECT_TYPES: raise ValueError( 'Option \'object_types\' only accepts %s, not %s.' % (ACCEPTED_OBJECT_TYPES, object_type)) + if batch_size <= 0: + raise ValueError("Option 'batch_size' needs to be positive") + self.value_deserializer = kafka_to_value if isinstance(brokers, str): brokers = [brokers] debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG) if debug_logging and 'debug' not in kwargs: kwargs['debug'] = 'consumer' # Static group instance id management group_instance_id = os.environ.get('KAFKA_GROUP_INSTANCE_ID') if group_instance_id: kwargs['group.instance.id'] = group_instance_id if 'group.instance.id' in kwargs: # When doing static consumer group membership, set a higher default # session timeout. The session timeout is the duration after which # the broker considers that a consumer has left the consumer group # for good, and triggers a rebalance. Considering our current # processing pattern, 10 minutes gives the consumer ample time to # restart before that happens. if 'session.timeout.ms' not in kwargs: kwargs['session.timeout.ms'] = 10 * 60 * 1000 # 10 minutes if 'session.timeout.ms' in kwargs: # When the session timeout is set, rdkafka requires the max poll # interval to be set to a higher value; the max poll interval is # rdkafka's way of figuring out whether the client's message # processing thread has stalled: when the max poll interval lapses # between two calls to consumer.poll(), rdkafka leaves the consumer # group and terminates the connection to the brokers. # # We default to 1.5 times the session timeout if 'max.poll.interval.ms' not in kwargs: kwargs['max.poll.interval.ms'] = ( kwargs['session.timeout.ms'] // 2 * 3 ) consumer_settings = { **kwargs, 'bootstrap.servers': ','.join(brokers), 'auto.offset.reset': auto_offset_reset, 'group.id': group_id, 'on_commit': _on_commit, 'error_cb': _error_cb, 'enable.auto.commit': False, 'logger': rdkafka_logger, } self.stop_on_eof = stop_on_eof if self.stop_on_eof: consumer_settings['enable.partition.eof'] = True logger.debug('Consumer settings: %s', consumer_settings) self.consumer = Consumer(consumer_settings) topics = ['%s.%s' % (prefix, object_type) for object_type in object_types] logger.debug('Upstream topics: %s', self.consumer.list_topics(timeout=10)) logger.debug('Subscribing to: %s', topics) self.consumer.subscribe(topics=topics) self.stop_after_objects = stop_after_objects self.process_timeout = process_timeout self.eof_reached: Set[Tuple[str, str]] = set() + self.batch_size = batch_size self._object_types = object_types def process(self, worker_fn): """Polls Kafka for a batch of messages, and calls the worker_fn with these messages. Args: worker_fn Callable[Dict[str, List[dict]]]: Function called with the messages as argument. 
""" start_time = time.monotonic() total_objects_processed = 0 while True: # timeout for message poll timeout = 1.0 elapsed = time.monotonic() - start_time if self.process_timeout: # +0.01 to prevent busy-waiting on / spamming consumer.poll. # consumer.consume() returns shortly before X expired # (a matter of milliseconds), so after it returns a first # time, it would then be called with a timeout in the order # of milliseconds, therefore returning immediately, then be # called again, etc. if elapsed + 0.01 >= self.process_timeout: break timeout = self.process_timeout - elapsed - batch_size = 20 - + batch_size = self.batch_size if self.stop_after_objects: if total_objects_processed >= self.stop_after_objects: break # clamp batch size to avoid overrunning stop_after_objects batch_size = min( self.stop_after_objects-total_objects_processed, batch_size, ) messages = self.consumer.consume( timeout=timeout, num_messages=batch_size) if not messages: continue batch_processed, at_eof = self.handle_messages(messages, worker_fn) total_objects_processed += batch_processed if at_eof: break return total_objects_processed def handle_messages(self, messages, worker_fn): objects: Dict[str, List[Any]] = defaultdict(list) nb_processed = 0 for message in messages: error = message.error() if error is not None: if error.code() == KafkaError._PARTITION_EOF: self.eof_reached.add( (message.topic(), message.partition()) ) else: _error_cb(error) continue nb_processed += 1 object_type = message.topic().split('.')[-1] # Got a message from a topic we did not subscribe to. assert object_type in self._object_types, object_type objects[object_type].append(self.deserialize_message(message)) if objects: worker_fn(dict(objects)) self.consumer.commit() at_eof = (self.stop_on_eof and all( (tp.topic, tp.partition) in self.eof_reached for tp in self.consumer.assignment() )) return nb_processed, at_eof def deserialize_message(self, message): return self.value_deserializer(message.value()) def close(self): self.consumer.close() diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py index d47a941..954d36d 100644 --- a/swh/journal/tests/test_client.py +++ b/swh/journal/tests/test_client.py @@ -1,85 +1,138 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from subprocess import Popen -from typing import Tuple +from typing import Dict, List, Tuple from unittest.mock import MagicMock from confluent_kafka import Producer +import pytest from swh.model.hypothesis_strategies import revisions +from swh.model.model import Content from swh.journal.client import JournalClient from swh.journal.serializers import key_to_kafka, value_to_kafka def test_client( kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' producer = Producer({ 'bootstrap.servers': 'localhost:{}'.format(port), 'client.id': 'test producer', 'enable.idempotence': 'true', }) rev = revisions().example() # Fill Kafka producer.produce( topic=kafka_prefix + '.revision', key=key_to_kafka(rev.id), value=value_to_kafka(rev.to_dict()), ) producer.flush() client = JournalClient( brokers='localhost:%d' % kafka_server[1], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=1, ) worker_fn = MagicMock() client.process(worker_fn) 
     worker_fn.assert_called_once_with({'revision': [rev.to_dict()]})
 
 
 def test_client_eof(
         kafka_prefix: str,
         kafka_consumer_group: str,
         kafka_server: Tuple[Popen, int]):
     (_, port) = kafka_server
     kafka_prefix += '.swh.journal.objects'
 
     producer = Producer({
         'bootstrap.servers': 'localhost:{}'.format(port),
         'client.id': 'test producer',
         'enable.idempotence': 'true',
     })
 
     rev = revisions().example()
 
     # Fill Kafka
     producer.produce(
         topic=kafka_prefix + '.revision',
         key=key_to_kafka(rev.id),
         value=value_to_kafka(rev.to_dict()),
     )
     producer.flush()
 
     client = JournalClient(
         brokers='localhost:%d' % kafka_server[1],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=None,
         stop_on_eof=True,
     )
 
     worker_fn = MagicMock()
     client.process(worker_fn)
 
     worker_fn.assert_called_once_with({'revision': [rev.to_dict()]})
+
+
+@pytest.mark.parametrize("batch_size", [1, 5, 100])
+def test_client_batch_size(
+    kafka_prefix: str,
+    kafka_consumer_group: str,
+    kafka_server: Tuple[Popen, int],
+    batch_size: int,
+):
+    (_, port) = kafka_server
+    kafka_prefix += '.swh.journal.objects'
+
+    num_objects = 2 * batch_size + 1
+    assert num_objects < 256, "Too many objects, generation will fail"
+
+    producer = Producer({
+        'bootstrap.servers': 'localhost:{}'.format(port),
+        'client.id': 'test producer',
+        'enable.idempotence': 'true',
+    })
+
+    contents = [Content.from_data(bytes([i])) for i in range(num_objects)]
+
+    # Fill Kafka
+    for content in contents:
+        producer.produce(
+            topic=kafka_prefix + '.content',
+            key=key_to_kafka(content.sha1),
+            value=value_to_kafka(content.to_dict()),
+        )
+
+    producer.flush()
+
+    client = JournalClient(
+        brokers=['localhost:%d' % kafka_server[1]],
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_after_objects=num_objects,
+        batch_size=batch_size,
+    )
+
+    collected_output: List[Dict] = []
+
+    def worker_fn(objects):
+        received = objects['content']
+        assert len(received) <= batch_size
+        collected_output.extend(received)
+
+    client.process(worker_fn)
+
+    assert collected_output == [content.to_dict() for content in contents]
diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py
index 9e34487..0b8880e 100644
--- a/swh/journal/tests/utils.py
+++ b/swh/journal/tests/utils.py
@@ -1,75 +1,76 @@
 from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
 from swh.journal.writer.kafka import KafkaJournalWriter
 from swh.journal.serializers import (kafka_to_value, key_to_kafka,
                                      value_to_kafka)
 
 
 class FakeKafkaMessage:
     def __init__(self, topic, key, value):
         self._topic = topic
         self._key = key_to_kafka(key)
         self._value = value_to_kafka(value)
 
     def topic(self):
         return self._topic
 
     def value(self):
         return self._value
 
     def key(self):
         return self._key
 
     def error(self):
         return None
 
 
 class MockedKafkaWriter(KafkaJournalWriter):
     def __init__(self, queue):
         self._prefix = 'prefix'
         self.queue = queue
 
     def send(self, topic, key, value):
         msg = FakeKafkaMessage(topic=topic, key=key, value=value)
         self.queue.append(msg)
 
     def flush(self):
         pass
 
 
 class MockedKafkaConsumer:
     """Mimic the confluent_kafka.Consumer API, producing the messages stored
     in `queue`.
 
     You're only allowed to subscribe to topics in which the queue has
     messages.
""" def __init__(self, queue): self.queue = queue self.committed = False def consume(self, num_messages, timeout=None): L = self.queue[0:num_messages] self.queue[0:num_messages] = [] return L def commit(self): if self.queue == []: self.committed = True def list_topics(self, timeout=None): return set(message.topic() for message in self.queue) def subscribe(self, topics): unknown_topics = set(topics) - self.list_topics() if unknown_topics: raise ValueError('Unknown topics %s' % ', '.join(unknown_topics)) class MockedJournalClient(JournalClient): def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES): self._object_types = object_types self.consumer = MockedKafkaConsumer(queue) self.process_timeout = None self.stop_after_objects = None self.value_deserializer = kafka_to_value self.stop_on_eof = False + self.batch_size = 200