diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,6 @@
 # Add here external Python modules dependencies, one per line. Module names
 # should match https://pypi.python.org/pypi names. For the full spec or
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
-kafka-python >= 1.3
+confluent-kafka
 msgpack
 vcversioner
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -3,11 +3,13 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from collections import defaultdict
 import logging
+import time
 
-from kafka import KafkaConsumer
+from confluent_kafka import Consumer, KafkaException
 
-from .serializers import kafka_to_key, kafka_to_value
+from .serializers import kafka_to_value
 from swh.journal import DEFAULT_PREFIX
 
 logger = logging.getLogger(__name__)
@@ -52,7 +54,8 @@
     """
     def __init__(
            self, brokers, group_id, prefix=None, object_types=None,
-            max_messages=0, auto_offset_reset='earliest', **kwargs):
+            max_messages=0, process_timeout=0, auto_offset_reset='earliest',
+            **kwargs):
         if prefix is None:
             prefix = DEFAULT_PREFIX
         if object_types is None:
@@ -68,21 +71,35 @@
                 'Option \'object_types\' only accepts %s.' %
                 ACCEPTED_OFFSET_RESET)
 
-        self.consumer = KafkaConsumer(
-            bootstrap_servers=brokers,
-            key_deserializer=kafka_to_key,
-            value_deserializer=kafka_to_value,
-            auto_offset_reset=auto_offset_reset,
-            enable_auto_commit=False,
-            group_id=group_id,
-            **kwargs)
+        self.value_deserializer = kafka_to_value
 
-        self.consumer.subscribe(
-            topics=['%s.%s' % (prefix, object_type)
-                    for object_type in object_types],
-        )
+        if isinstance(brokers, str):
+            brokers = [brokers]
+
+        consumer_settings = {
+            **kwargs,
+            'bootstrap.servers': ','.join(brokers),
+            'auto.offset.reset': auto_offset_reset,
+            'group.id': group_id,
+            'enable.auto.commit': False,
+        }
+
+        logger.debug('Consumer settings: %s', consumer_settings)
+
+        self.consumer = Consumer(consumer_settings, logger=logger)
+
+        topics = ['%s.%s' % (prefix, object_type)
+                  for object_type in object_types]
+
+        logger.debug('Upstream topics: %s',
+                     self.consumer.list_topics(timeout=1))
+        logger.debug('Subscribing to: %s', topics)
+
+        self.consumer.subscribe(topics=topics)
 
         self.max_messages = max_messages
+        self.process_timeout = process_timeout
+
         self._object_types = object_types
 
     def process(self, worker_fn):
@@ -94,16 +111,47 @@
                                                        the messages as
                                                        argument.
         """
+        start_time = time.monotonic()
         nb_messages = 0
-        polled = self.consumer.poll()
-        for (partition, messages) in polled.items():
-            object_type = partition.topic.split('.')[-1]
+
+        objects = defaultdict(list)
+
+        while True:
+            # timeout for message poll
+            timeout = 1.0
+
+            elapsed = time.monotonic() - start_time
+            if self.process_timeout:
+                if elapsed >= self.process_timeout:
+                    break
+
+                timeout = self.process_timeout - elapsed
+
+            message = self.consumer.poll(timeout=timeout)
+            if not message:
+                continue
+
+            error = message.error()
+            if error is not None:
+                if error.fatal():
+                    raise KafkaException(error)
+                logger.info('Received non-fatal kafka error: %s', error)
+                continue
+
+            nb_messages += 1
+
+            object_type = message.topic().split('.')[-1]
             # Got a message from a topic we did not subscribe to.
             assert object_type in self._object_types, object_type
 
-            worker_fn({object_type: [msg.value for msg in messages]})
+            objects[object_type].append(
+                self.value_deserializer(message.value())
+            )
+
+            if nb_messages >= self.max_messages:
+                break
 
-            nb_messages += len(messages)
+        worker_fn(dict(objects))
 
         self.consumer.commit()
         return nb_messages
diff --git a/swh/journal/direct_writer.py b/swh/journal/direct_writer.py
--- a/swh/journal/direct_writer.py
+++ b/swh/journal/direct_writer.py
@@ -5,7 +5,7 @@
 
 import logging
 
-from kafka import KafkaProducer
+from confluent_kafka import Producer
 
 from swh.model.hashutil import DEFAULT_ALGORITHMS
 from swh.model.model import BaseModel
@@ -22,15 +22,23 @@
     def __init__(self, brokers, prefix, client_id):
         self._prefix = prefix
 
-        self.producer = KafkaProducer(
-            bootstrap_servers=brokers,
-            key_serializer=key_to_kafka,
-            value_serializer=value_to_kafka,
-            client_id=client_id,
-        )
+        self.producer = Producer({
+            'bootstrap.servers': brokers,
+            'client.id': client_id,
+            'enable.idempotence': 'true',
+        })
 
     def send(self, topic, key, value):
-        self.producer.send(topic=topic, key=key, value=value)
+        self.producer.produce(
+            topic=topic,
+            key=key_to_kafka(key),
+            value=value_to_kafka(value),
+        )
+
+    def flush(self):
+        self.producer.flush()
+        # Need to service the callbacks regularly by calling poll
+        self.producer.poll(0)
 
     def _get_key(self, object_type, object_):
         if object_type in ('revision', 'release', 'directory', 'snapshot'):
@@ -62,7 +70,8 @@
             assert 'id' not in object_
         return object_
 
-    def write_addition(self, object_type, object_):
+    def write_addition(self, object_type, object_, flush=True):
+        """Write a single object to the journal"""
         if isinstance(object_, BaseModel):
             object_ = object_.to_dict()
         topic = '%s.%s' % (self._prefix, object_type)
@@ -71,8 +80,15 @@
         logger.debug('topic: %s, key: %s, value: %s', topic, key, object_)
         self.send(topic, key=key, value=object_)
 
+        if flush:
+            self.flush()
+
     write_update = write_addition
 
-    def write_additions(self, object_type, objects):
+    def write_additions(self, object_type, objects, flush=True):
+        """Write a set of objects to the journal"""
         for object_ in objects:
-            self.write_addition(object_type, object_)
+            self.write_addition(object_type, object_, flush=False)
+
+        if flush:
+            self.flush()
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -9,18 +9,19 @@
 import random
 import string
 
-from kafka import KafkaConsumer
+from confluent_kafka import Consumer
 
 from subprocess import Popen
 from typing import Tuple, Dict
 
 from pathlib import Path
 from pytest_kafka import (
-    make_zookeeper_process, make_kafka_server, constants
+    make_zookeeper_process, make_kafka_server
 )
 
 from swh.model.hashutil import hash_to_bytes
-from swh.journal.serializers import kafka_to_key, kafka_to_value
+
+logger = logging.getLogger(__name__)
 
 
 CONTENTS = [
@@ -176,8 +177,8 @@
     os.path.dirname(__file__)
 kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', scope='session')
 
-logger = logging.getLogger('kafka')
-logger.setLevel(logging.WARN)
+kafka_logger = logging.getLogger('kafka')
+kafka_logger.setLevel(logging.WARN)
 
 
 @pytest.fixture(scope='function')
@@ -202,36 +203,33 @@
     _, port = kafka_server
     return {
         **TEST_CONFIG,
-        'brokers': ['localhost:{}'.format(port)],
+        'brokers': ['127.0.0.1:{}'.format(port)],
         'prefix': kafka_prefix + '.swh.journal.objects',
     }
 
 
 @pytest.fixture
 def consumer(
-        kafka_server: Tuple[Popen, int], test_config: Dict) -> KafkaConsumer:
+    kafka_server: Tuple[Popen, int],
+    test_config: Dict,
+    kafka_prefix: str,
+) -> Consumer:
     """Get a connected Kafka consumer.
 
     """
+    _, kafka_port = kafka_server
+    consumer = Consumer({
+        'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port),
+        'auto.offset.reset': 'earliest',
+        'enable.auto.commit': True,
+        'group.id': "test-consumer-%s" % kafka_prefix,
+    })
+
     kafka_topics = [
         '%s.%s' % (test_config['prefix'], object_type)
-        for object_type in test_config['object_types']]
-    _, kafka_port = kafka_server
-    consumer = KafkaConsumer(
-        *kafka_topics,
-        bootstrap_servers='localhost:{}'.format(kafka_port),
-        consumer_timeout_ms=constants.DEFAULT_CONSUMER_TIMEOUT_MS,
-        key_deserializer=kafka_to_key,
-        value_deserializer=kafka_to_value,
-        auto_offset_reset='earliest',
-        enable_auto_commit=True,
-        group_id="test-consumer"
-    )
-
-    # Enforce auto_offset_reset=earliest even if the consumer was created
-    # too soon wrt the server.
-    while len(consumer.assignment()) == 0:
-        consumer.poll(timeout_ms=20)
-    consumer.seek_to_beginning()
+        for object_type in test_config['object_types']
+    ]
+
+    consumer.subscribe(kafka_topics)
 
     return consumer
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -4,6 +4,7 @@
 # See top-level LICENSE file for more information
 
 import functools
+import logging
 import re
 import tempfile
 from subprocess import Popen
@@ -11,7 +12,7 @@
 from unittest.mock import patch
 
 from click.testing import CliRunner
-from kafka import KafkaProducer
+from confluent_kafka import Producer
 import pytest
 
 from swh.objstorage.backends.in_memory import InMemoryObjStorage
@@ -21,6 +22,9 @@
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 
 
+logger = logging.getLogger(__name__)
+
+
 CLI_CONFIG = '''
 storage:
     cls: memory
@@ -52,7 +56,7 @@
         config_fd.write(CLI_CONFIG)
         config_fd.seek(0)
         args = ['-C' + config_fd.name] + args
-        result = runner.invoke(cli, args)
+        result = runner.invoke(cli, args, obj={'log_level': logging.DEBUG})
     if not catch_exceptions and result.exception:
         print(result.output)
         raise result.exception
@@ -66,12 +70,11 @@
     (_, port) = kafka_server
     kafka_prefix += '.swh.journal.objects'
 
-    producer = KafkaProducer(
-        bootstrap_servers='localhost:{}'.format(port),
-        key_serializer=key_to_kafka,
-        value_serializer=value_to_kafka,
-        client_id='test-producer',
-    )
+    producer = Producer({
+        'bootstrap.servers': 'localhost:{}'.format(port),
+        'client.id': 'test-producer',
+        'enable.idempotence': 'true',
+    })
 
     snapshot = {'id': b'foo', 'branches': {
         b'HEAD': {
@@ -79,12 +82,18 @@
             'target': b'\x01'*20,
         }
     }}
-    producer.send(
-        topic=kafka_prefix+'.snapshot', key=snapshot['id'], value=snapshot)
+    producer.produce(
+        topic=kafka_prefix+'.snapshot',
+        key=key_to_kafka(snapshot['id']),
+        value=value_to_kafka(snapshot),
+    )
+    producer.flush()
+
+    logger.debug('Flushed producer')
 
     result = invoke(False, [
         'replay',
-        '--broker', 'localhost:%d' % port,
+        '--broker', '127.0.0.1:%d' % port,
         '--group-id', 'test-cli-consumer',
         '--prefix', kafka_prefix,
         '--max-messages', '1',
@@ -117,22 +126,25 @@
 
 
 def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages):
-    producer = KafkaProducer(
-        bootstrap_servers='localhost:{}'.format(kafka_port),
-        key_serializer=key_to_kafka,
-        value_serializer=value_to_kafka,
-        client_id='test-producer',
-    )
+    producer = Producer({
+        'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port),
+        'client.id': 'test-producer',
+        'enable.idempotence': 'true',
+    })
 
     contents = {}
     for i in range(10):
         content = b'\x00'*19 + bytes([i])
         sha1 = objstorages['src'].add(content)
         contents[sha1] = content
-        producer.send(topic=kafka_prefix+'.content', key=sha1, value={
-            'sha1': sha1,
-            'status': 'visible',
-        })
+        producer.produce(
+            topic=kafka_prefix+'.content',
+            key=key_to_kafka(sha1),
+            value=key_to_kafka({
+                'sha1': sha1,
+                'status': 'visible',
+            }),
+        )
 
     producer.flush()
 
@@ -153,7 +165,7 @@
 
     result = invoke(False, [
         'content-replay',
-        '--broker', 'localhost:%d' % kafka_port,
+        '--broker', '127.0.0.1:%d' % kafka_port,
         '--group-id', 'test-cli-consumer',
         '--prefix', kafka_prefix,
         '--max-messages', '10',
@@ -187,7 +199,7 @@
 
     result = invoke(False, [
         'content-replay',
-        '--broker', 'localhost:%d' % kafka_port,
+        '--broker', '127.0.0.1:%d' % kafka_port,
         '--group-id', 'test-cli-consumer',
         '--prefix', kafka_prefix,
         '--max-messages', '10',
diff --git a/swh/journal/tests/test_direct_writer.py b/swh/journal/tests/test_direct_writer.py
--- a/swh/journal/tests/test_direct_writer.py
+++ b/swh/journal/tests/test_direct_writer.py
@@ -5,26 +5,48 @@
 
 from collections import defaultdict
 import datetime
-import time
 
-from kafka import KafkaConsumer
+from confluent_kafka import Consumer, KafkaException
 from subprocess import Popen
 from typing import Tuple
 
 from swh.storage import get_storage
 
 from swh.journal.direct_writer import DirectKafkaWriter
-from swh.journal.serializers import value_to_kafka, kafka_to_value
+from swh.journal.serializers import (
+    kafka_to_key, kafka_to_value
+)
 
 from .conftest import OBJECT_TYPE_KEYS
 
 
-def assert_written(consumer, kafka_prefix):
-    time.sleep(0.1)  # Without this, some messages are missing
-
+def assert_written(consumer, kafka_prefix, expected_messages):
     consumed_objects = defaultdict(list)
-    for msg in consumer:
-        consumed_objects[msg.topic].append((msg.key, msg.value))
+
+    fetched_messages = 0
+    retries_left = 1000
+
+    while fetched_messages < expected_messages:
+        if retries_left == 0:
+            raise ValueError('Timed out fetching messages from kafka')
+
+        msg = consumer.poll(timeout=0.01)
+
+        if not msg:
+            retries_left -= 1
+            continue
+
+        error = msg.error()
+        if error is not None:
+            if error.fatal():
+                raise KafkaException(error)
+            retries_left -= 1
+            continue
+
+        fetched_messages += 1
+        consumed_objects[msg.topic()].append(
+            (kafka_to_key(msg.key()), kafka_to_value(msg.value()))
+        )
 
     for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items():
         topic = kafka_prefix + '.' + object_type
@@ -42,13 +64,13 @@
             del value['ctime']
 
         for object_ in objects:
-            assert kafka_to_value(value_to_kafka(object_)) in values
+            assert object_ in values
 
 
 def test_direct_writer(
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int],
-        consumer: KafkaConsumer):
+        consumer: Consumer):
     kafka_prefix += '.swh.journal.objects'
 
     config = {
@@ -59,6 +81,8 @@
 
     writer = DirectKafkaWriter(**config)
 
+    expected_messages = 0
+
     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
         for (num, object_) in enumerate(objects):
             if object_type == 'origin_visit':
@@ -66,14 +90,15 @@
             if object_type == 'content':
                 object_ = {**object_, 'ctime': datetime.datetime.now()}
             writer.write_addition(object_type, object_)
+            expected_messages += 1
 
-    assert_written(consumer, kafka_prefix)
+    assert_written(consumer, kafka_prefix, expected_messages)
 
 
 def test_storage_direct_writer(
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int],
-        consumer: KafkaConsumer):
+        consumer: Consumer):
     kafka_prefix += '.swh.journal.objects'
 
     config = {
@@ -85,6 +110,8 @@
     storage = get_storage('memory', {'journal_writer': {
         'cls': 'kafka', 'args': config}})
 
+    expected_messages = 0
+
     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
         method = getattr(storage, object_type + '_add')
         if object_type in ('content', 'directory', 'revision', 'release',
@@ -92,15 +119,18 @@
             if object_type == 'content':
                 objects = [{**obj, 'data': b''} for obj in objects]
             method(objects)
+            expected_messages += len(objects)
         elif object_type in ('origin_visit',):
             for object_ in objects:
                 object_ = object_.copy()
                 origin_id = storage.origin_add_one(object_.pop('origin'))
                 del object_['type']
                 visit = method(origin=origin_id, date=object_.pop('date'))
+                expected_messages += 1
                 visit_id = visit['visit']
                 storage.origin_visit_update(origin_id, visit_id, **object_)
+                expected_messages += 1
         else:
             assert False, object_type
 
-    assert_written(consumer, kafka_prefix)
+    assert_written(consumer, kafka_prefix, expected_messages)
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -10,7 +10,7 @@
 from typing import Tuple
 
 import dateutil
-from kafka import KafkaProducer
+from confluent_kafka import Producer
 from hypothesis import strategies, given, settings
 
 from swh.storage import get_storage
@@ -32,12 +32,11 @@
 
     storage = get_storage('memory', {})
 
-    producer = KafkaProducer(
-        bootstrap_servers='localhost:{}'.format(port),
-        key_serializer=key_to_kafka,
-        value_serializer=value_to_kafka,
-        client_id='test producer',
-    )
+    producer = Producer({
+        'bootstrap.servers': 'localhost:{}'.format(port),
+        'client.id': 'test producer',
+        'enable.idempotence': 'true',
+    })
 
     now = datetime.datetime.now(tz=datetime.timezone.utc)
 
@@ -54,9 +53,14 @@
             elif object_type == 'origin_visit':
                 nb_visits += 1
                 object_['visit'] = nb_visits
-            producer.send(topic, key=key, value=object_)
+            producer.produce(
+                topic=topic, key=key_to_kafka(key),
+                value=value_to_kafka(object_),
+            )
             nb_sent += 1
 
+    producer.flush()
+
     # Fill the storage from Kafka
     config = {
         'brokers': 'localhost:%d' % kafka_server[1],
@@ -133,9 +137,7 @@
     for visit in visits:
         writer.send('origin_visit', 'foo', visit)
 
-    queue_size = sum(len(partition)
-                     for batch in queue
-                     for partition in batch.values())
+    queue_size = len(queue)
 
     storage = get_storage('memory', {})
     worker_fn = functools.partial(process_replay_objects, storage=storage)
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -39,9 +39,7 @@
         except HashCollision:
             pass
 
-    queue_size = sum(len(partition)
-                     for batch in queue
-                     for partition in batch.values())
+    queue_size = len(queue)
 
     storage2 = Storage()
     worker_fn = functools.partial(process_replay_objects, storage=storage2)
@@ -74,9 +72,7 @@
         if obj_type == 'content':
             storage1.content_add([obj])
 
-    queue_size = sum(len(partition)
-                     for batch in queue
-                     for partition in batch.values())
+    queue_size = len(queue)
 
     storage2 = Storage()
     worker_fn = functools.partial(process_replay_objects_content,
diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py
--- a/swh/journal/tests/utils.py
+++ b/swh/journal/tests/utils.py
@@ -1,12 +1,26 @@
-from collections import namedtuple
-
 from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
 from swh.journal.direct_writer import DirectKafkaWriter
-from swh.journal.serializers import (
-    key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)
+from swh.journal.serializers import (kafka_to_value, key_to_kafka,
+                                     value_to_kafka)
+
+
+class FakeKafkaMessage:
+    def __init__(self, topic, key, value):
+        self._topic = topic
+        self._key = key_to_kafka(key)
+        self._value = value_to_kafka(value)
+
+    def topic(self):
+        return self._topic
 
-FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'key value')
-FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic')
+    def value(self):
+        return self._value
+
+    def key(self):
+        return self._key
+
+    def error(self):
+        return None
 
 
 class MockedKafkaWriter(DirectKafkaWriter):
@@ -15,15 +29,11 @@
         self.queue = queue
 
     def send(self, topic, key, value):
-        key = kafka_to_key(key_to_kafka(key))
-        value = kafka_to_value(value_to_kafka(value))
-        partition = FakeKafkaPartition(topic)
-        msg = FakeKafkaMessage(key=key, value=value)
-        if self.queue and {partition} == set(self.queue[-1]):
-            # The last message is of the same object type, groupping them
-            self.queue[-1][partition].append(msg)
-        else:
-            self.queue.append({partition: [msg]})
+        msg = FakeKafkaMessage(topic=topic, key=key, value=value)
+        self.queue.append(msg)
+
+    def flush(self):
+        pass
 
 
 class MockedKafkaConsumer:
@@ -31,7 +41,7 @@
         self.queue = queue
         self.committed = False
 
-    def poll(self):
+    def poll(self, timeout=None):
         return self.queue.pop(0)
 
     def commit(self):
@@ -43,3 +53,6 @@
     def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
         self._object_types = object_types
         self.consumer = MockedKafkaConsumer(queue)
+        self.process_timeout = 0
+        self.max_messages = 0
+        self.value_deserializer = kafka_to_value