D1980.diff
D1980: Migrate to confluent-kafka instead of kafka-python

diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,6 @@
# Add here external Python modules dependencies, one per line. Module names
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
-kafka-python >= 1.3
+confluent-kafka
msgpack
vcversioner
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -3,11 +3,13 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from collections import defaultdict
import logging
+import time
-from kafka import KafkaConsumer
+from confluent_kafka import Consumer, KafkaException
-from .serializers import kafka_to_key, kafka_to_value
+from .serializers import kafka_to_value
from swh.journal import DEFAULT_PREFIX
logger = logging.getLogger(__name__)
@@ -52,7 +54,8 @@
"""
def __init__(
self, brokers, group_id, prefix=None, object_types=None,
- max_messages=0, auto_offset_reset='earliest', **kwargs):
+ max_messages=0, process_timeout=0, auto_offset_reset='earliest',
+ **kwargs):
if prefix is None:
prefix = DEFAULT_PREFIX
if object_types is None:
@@ -68,21 +71,35 @@
'Option \'object_types\' only accepts %s.' %
ACCEPTED_OFFSET_RESET)
- self.consumer = KafkaConsumer(
- bootstrap_servers=brokers,
- key_deserializer=kafka_to_key,
- value_deserializer=kafka_to_value,
- auto_offset_reset=auto_offset_reset,
- enable_auto_commit=False,
- group_id=group_id,
- **kwargs)
+ self.value_deserializer = kafka_to_value
- self.consumer.subscribe(
- topics=['%s.%s' % (prefix, object_type)
- for object_type in object_types],
- )
+ if isinstance(brokers, str):
+ brokers = [brokers]
+
+ consumer_settings = {
+ **kwargs,
+ 'bootstrap.servers': ','.join(brokers),
+ 'auto.offset.reset': auto_offset_reset,
+ 'group.id': group_id,
+ 'enable.auto.commit': False,
+ }
+
+ logger.debug('Consumer settings: %s', consumer_settings)
+
+ self.consumer = Consumer(consumer_settings, logger=logger)
+
+ topics = ['%s.%s' % (prefix, object_type)
+ for object_type in object_types]
+
+ logger.debug('Upstream topics: %s',
+ self.consumer.list_topics(timeout=1))
+ logger.debug('Subscribing to: %s', topics)
+
+ self.consumer.subscribe(topics=topics)
self.max_messages = max_messages
+ self.process_timeout = process_timeout
+
self._object_types = object_types
def process(self, worker_fn):
@@ -94,16 +111,47 @@
the messages as
argument.
"""
+ start_time = time.monotonic()
nb_messages = 0
- polled = self.consumer.poll()
- for (partition, messages) in polled.items():
- object_type = partition.topic.split('.')[-1]
+
+ objects = defaultdict(list)
+
+ while True:
+ # timeout for message poll
+ timeout = 1.0
+
+ elapsed = time.monotonic() - start_time
+ if self.process_timeout:
+ if elapsed >= self.process_timeout:
+ break
+
+ timeout = self.process_timeout - elapsed
+
+ message = self.consumer.poll(timeout=timeout)
+ if not message:
+ continue
+
+ error = message.error()
+ if error is not None:
+ if error.fatal():
+ raise KafkaException(error)
+ logger.info('Received non-fatal kafka error: %s', error)
+ continue
+
+ nb_messages += 1
+
+ object_type = message.topic().split('.')[-1]
# Got a message from a topic we did not subscribe to.
assert object_type in self._object_types, object_type
- worker_fn({object_type: [msg.value for msg in messages]})
+ objects[object_type].append(
+ self.value_deserializer(message.value())
+ )
+
+ if nb_messages >= self.max_messages:
+ break
- nb_messages += len(messages)
+ worker_fn(dict(objects))
self.consumer.commit()
return nb_messages
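
For reference, a minimal usage sketch of the migrated client (not part of the
diff); the broker address, group id and object type below are illustrative:

    from swh.journal.client import JournalClient

    def worker_fn(all_objects):
        # all_objects maps object_type -> list of deserialized dicts
        for object_type, objects in all_objects.items():
            print(object_type, len(objects))

    client = JournalClient(
        brokers='127.0.0.1:9092',   # a str or a list is accepted after this change
        group_id='example-consumer',
        object_types=['revision'],
        max_messages=10,            # stop after ten messages...
        process_timeout=60,         # ...or after sixty seconds, whichever comes first
    )
    nb_messages = client.process(worker_fn)
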
diff --git a/swh/journal/direct_writer.py b/swh/journal/direct_writer.py
--- a/swh/journal/direct_writer.py
+++ b/swh/journal/direct_writer.py
@@ -5,7 +5,7 @@
import logging
-from kafka import KafkaProducer
+from confluent_kafka import Producer
from swh.model.hashutil import DEFAULT_ALGORITHMS
from swh.model.model import BaseModel
@@ -22,15 +22,23 @@
def __init__(self, brokers, prefix, client_id):
self._prefix = prefix
- self.producer = KafkaProducer(
- bootstrap_servers=brokers,
- key_serializer=key_to_kafka,
- value_serializer=value_to_kafka,
- client_id=client_id,
- )
+ self.producer = Producer({
+ 'bootstrap.servers': brokers,
+ 'client.id': client_id,
+ 'enable.idempotence': 'true',
+ })
def send(self, topic, key, value):
- self.producer.send(topic=topic, key=key, value=value)
+ self.producer.produce(
+ topic=topic,
+ key=key_to_kafka(key),
+ value=value_to_kafka(value),
+ )
+
+ def flush(self):
+ self.producer.flush()
+ # Need to service the callbacks regularly by calling poll
+ self.producer.poll(0)
def _get_key(self, object_type, object_):
if object_type in ('revision', 'release', 'directory', 'snapshot'):
@@ -62,7 +70,8 @@
assert 'id' not in object_
return object_
- def write_addition(self, object_type, object_):
+ def write_addition(self, object_type, object_, flush=True):
+ """Write a single object to the journal"""
if isinstance(object_, BaseModel):
object_ = object_.to_dict()
topic = '%s.%s' % (self._prefix, object_type)
@@ -71,8 +80,15 @@
logger.debug('topic: %s, key: %s, value: %s', topic, key, object_)
self.send(topic, key=key, value=object_)
+ if flush:
+ self.flush()
+
write_update = write_addition
- def write_additions(self, object_type, objects):
+ def write_additions(self, object_type, objects, flush=True):
+ """Write a set of objects to the journal"""
for object_ in objects:
- self.write_addition(object_type, object_)
+ self.write_addition(object_type, object_, flush=False)
+
+ if flush:
+ self.flush()
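
With confluent-kafka, Producer.produce() only enqueues the message; delivery
happens in the background and delivery reports are dispatched from poll() or
flush(), which is why the writer gains a flush() method above. A standalone
sketch of that model (not part of the diff; broker address and topic are
illustrative):

    from confluent_kafka import Producer

    def on_delivery(err, msg):
        # Invoked from poll()/flush(), never from produce() itself.
        if err is not None:
            print('delivery failed: %s' % err)
        else:
            print('delivered to %s [%d]' % (msg.topic(), msg.partition()))

    producer = Producer({
        'bootstrap.servers': '127.0.0.1:9092',
        'enable.idempotence': 'true',
    })
    producer.produce('example.topic', key=b'key', value=b'value',
                     on_delivery=on_delivery)
    producer.flush()  # blocks until outstanding deliveries complete or fail
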
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -9,18 +9,19 @@
import random
import string
-from kafka import KafkaConsumer
+from confluent_kafka import Consumer
from subprocess import Popen
from typing import Tuple, Dict
from pathlib import Path
from pytest_kafka import (
- make_zookeeper_process, make_kafka_server, constants
+ make_zookeeper_process, make_kafka_server
)
from swh.model.hashutil import hash_to_bytes
-from swh.journal.serializers import kafka_to_key, kafka_to_value
+
+logger = logging.getLogger(__name__)
CONTENTS = [
@@ -176,8 +177,8 @@
os.path.dirname(__file__)
kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', scope='session')
-logger = logging.getLogger('kafka')
-logger.setLevel(logging.WARN)
+kafka_logger = logging.getLogger('kafka')
+kafka_logger.setLevel(logging.WARN)
@pytest.fixture(scope='function')
@@ -202,36 +203,33 @@
_, port = kafka_server
return {
**TEST_CONFIG,
- 'brokers': ['localhost:{}'.format(port)],
+ 'brokers': ['127.0.0.1:{}'.format(port)],
'prefix': kafka_prefix + '.swh.journal.objects',
}
@pytest.fixture
def consumer(
- kafka_server: Tuple[Popen, int], test_config: Dict) -> KafkaConsumer:
+ kafka_server: Tuple[Popen, int],
+ test_config: Dict,
+ kafka_prefix: str,
+) -> Consumer:
"""Get a connected Kafka consumer.
"""
+ _, kafka_port = kafka_server
+ consumer = Consumer({
+ 'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port),
+ 'auto.offset.reset': 'earliest',
+ 'enable.auto.commit': True,
+ 'group.id': "test-consumer-%s" % kafka_prefix,
+ })
+
kafka_topics = [
'%s.%s' % (test_config['prefix'], object_type)
- for object_type in test_config['object_types']]
- _, kafka_port = kafka_server
- consumer = KafkaConsumer(
- *kafka_topics,
- bootstrap_servers='localhost:{}'.format(kafka_port),
- consumer_timeout_ms=constants.DEFAULT_CONSUMER_TIMEOUT_MS,
- key_deserializer=kafka_to_key,
- value_deserializer=kafka_to_value,
- auto_offset_reset='earliest',
- enable_auto_commit=True,
- group_id="test-consumer"
- )
-
- # Enforce auto_offset_reset=earliest even if the consumer was created
- # too soon wrt the server.
- while len(consumer.assignment()) == 0:
- consumer.poll(timeout_ms=20)
- consumer.seek_to_beginning()
+ for object_type in test_config['object_types']
+ ]
+
+ consumer.subscribe(kafka_topics)
return consumer
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -4,6 +4,7 @@
# See top-level LICENSE file for more information
import functools
+import logging
import re
import tempfile
from subprocess import Popen
@@ -11,7 +12,7 @@
from unittest.mock import patch
from click.testing import CliRunner
-from kafka import KafkaProducer
+from confluent_kafka import Producer
import pytest
from swh.objstorage.backends.in_memory import InMemoryObjStorage
@@ -21,6 +22,9 @@
from swh.journal.serializers import key_to_kafka, value_to_kafka
+logger = logging.getLogger(__name__)
+
+
CLI_CONFIG = '''
storage:
cls: memory
@@ -52,7 +56,7 @@
config_fd.write(CLI_CONFIG)
config_fd.seek(0)
args = ['-C' + config_fd.name] + args
- result = runner.invoke(cli, args)
+ result = runner.invoke(cli, args, obj={'log_level': logging.DEBUG})
if not catch_exceptions and result.exception:
print(result.output)
raise result.exception
@@ -66,12 +70,11 @@
(_, port) = kafka_server
kafka_prefix += '.swh.journal.objects'
- producer = KafkaProducer(
- bootstrap_servers='localhost:{}'.format(port),
- key_serializer=key_to_kafka,
- value_serializer=value_to_kafka,
- client_id='test-producer',
- )
+ producer = Producer({
+ 'bootstrap.servers': 'localhost:{}'.format(port),
+ 'client.id': 'test-producer',
+ 'enable.idempotence': 'true',
+ })
snapshot = {'id': b'foo', 'branches': {
b'HEAD': {
@@ -79,12 +82,18 @@
'target': b'\x01'*20,
}
}}
- producer.send(
- topic=kafka_prefix+'.snapshot', key=snapshot['id'], value=snapshot)
+ producer.produce(
+ topic=kafka_prefix+'.snapshot',
+ key=key_to_kafka(snapshot['id']),
+ value=value_to_kafka(snapshot),
+ )
+ producer.flush()
+
+ logger.debug('Flushed producer')
result = invoke(False, [
'replay',
- '--broker', 'localhost:%d' % port,
+ '--broker', '127.0.0.1:%d' % port,
'--group-id', 'test-cli-consumer',
'--prefix', kafka_prefix,
'--max-messages', '1',
@@ -117,22 +126,25 @@
def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages):
- producer = KafkaProducer(
- bootstrap_servers='localhost:{}'.format(kafka_port),
- key_serializer=key_to_kafka,
- value_serializer=value_to_kafka,
- client_id='test-producer',
- )
+ producer = Producer({
+ 'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port),
+ 'client.id': 'test-producer',
+ 'enable.idempotence': 'true',
+ })
contents = {}
for i in range(10):
content = b'\x00'*19 + bytes([i])
sha1 = objstorages['src'].add(content)
contents[sha1] = content
- producer.send(topic=kafka_prefix+'.content', key=sha1, value={
- 'sha1': sha1,
- 'status': 'visible',
- })
+ producer.produce(
+ topic=kafka_prefix+'.content',
+ key=key_to_kafka(sha1),
+ value=key_to_kafka({
+ 'sha1': sha1,
+ 'status': 'visible',
+ }),
+ )
producer.flush()
@@ -153,7 +165,7 @@
result = invoke(False, [
'content-replay',
- '--broker', 'localhost:%d' % kafka_port,
+ '--broker', '127.0.0.1:%d' % kafka_port,
'--group-id', 'test-cli-consumer',
'--prefix', kafka_prefix,
'--max-messages', '10',
@@ -187,7 +199,7 @@
result = invoke(False, [
'content-replay',
- '--broker', 'localhost:%d' % kafka_port,
+ '--broker', '127.0.0.1:%d' % kafka_port,
'--group-id', 'test-cli-consumer',
'--prefix', kafka_prefix,
'--max-messages', '10',
diff --git a/swh/journal/tests/test_direct_writer.py b/swh/journal/tests/test_direct_writer.py
--- a/swh/journal/tests/test_direct_writer.py
+++ b/swh/journal/tests/test_direct_writer.py
@@ -5,26 +5,48 @@
from collections import defaultdict
import datetime
-import time
-from kafka import KafkaConsumer
+from confluent_kafka import Consumer, KafkaException
from subprocess import Popen
from typing import Tuple
from swh.storage import get_storage
from swh.journal.direct_writer import DirectKafkaWriter
-from swh.journal.serializers import value_to_kafka, kafka_to_value
+from swh.journal.serializers import (
+ kafka_to_key, kafka_to_value
+)
from .conftest import OBJECT_TYPE_KEYS
-def assert_written(consumer, kafka_prefix):
- time.sleep(0.1) # Without this, some messages are missing
-
+def assert_written(consumer, kafka_prefix, expected_messages):
consumed_objects = defaultdict(list)
- for msg in consumer:
- consumed_objects[msg.topic].append((msg.key, msg.value))
+
+ fetched_messages = 0
+ retries_left = 1000
+
+ while fetched_messages < expected_messages:
+ if retries_left == 0:
+ raise ValueError('Timed out fetching messages from kafka')
+
+ msg = consumer.poll(timeout=0.01)
+
+ if not msg:
+ retries_left -= 1
+ continue
+
+ error = msg.error()
+ if error is not None:
+ if error.fatal():
+ raise KafkaException(error)
+ retries_left -= 1
+ continue
+
+ fetched_messages += 1
+ consumed_objects[msg.topic()].append(
+ (kafka_to_key(msg.key()), kafka_to_value(msg.value()))
+ )
for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items():
topic = kafka_prefix + '.' + object_type
@@ -42,13 +64,13 @@
del value['ctime']
for object_ in objects:
- assert kafka_to_value(value_to_kafka(object_)) in values
+ assert object_ in values
def test_direct_writer(
kafka_prefix: str,
kafka_server: Tuple[Popen, int],
- consumer: KafkaConsumer):
+ consumer: Consumer):
kafka_prefix += '.swh.journal.objects'
config = {
@@ -59,6 +81,8 @@
writer = DirectKafkaWriter(**config)
+ expected_messages = 0
+
for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
for (num, object_) in enumerate(objects):
if object_type == 'origin_visit':
@@ -66,14 +90,15 @@
if object_type == 'content':
object_ = {**object_, 'ctime': datetime.datetime.now()}
writer.write_addition(object_type, object_)
+ expected_messages += 1
- assert_written(consumer, kafka_prefix)
+ assert_written(consumer, kafka_prefix, expected_messages)
def test_storage_direct_writer(
kafka_prefix: str,
kafka_server: Tuple[Popen, int],
- consumer: KafkaConsumer):
+ consumer: Consumer):
kafka_prefix += '.swh.journal.objects'
config = {
@@ -85,6 +110,8 @@
storage = get_storage('memory', {'journal_writer': {
'cls': 'kafka', 'args': config}})
+ expected_messages = 0
+
for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
method = getattr(storage, object_type + '_add')
if object_type in ('content', 'directory', 'revision', 'release',
@@ -92,15 +119,18 @@
if object_type == 'content':
objects = [{**obj, 'data': b''} for obj in objects]
method(objects)
+ expected_messages += len(objects)
elif object_type in ('origin_visit',):
for object_ in objects:
object_ = object_.copy()
origin_id = storage.origin_add_one(object_.pop('origin'))
del object_['type']
visit = method(origin=origin_id, date=object_.pop('date'))
+ expected_messages += 1
visit_id = visit['visit']
storage.origin_visit_update(origin_id, visit_id, **object_)
+ expected_messages += 1
else:
assert False, object_type
- assert_written(consumer, kafka_prefix)
+ assert_written(consumer, kafka_prefix, expected_messages)
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -10,7 +10,7 @@
from typing import Tuple
import dateutil
-from kafka import KafkaProducer
+from confluent_kafka import Producer
from hypothesis import strategies, given, settings
from swh.storage import get_storage
@@ -32,12 +32,11 @@
storage = get_storage('memory', {})
- producer = KafkaProducer(
- bootstrap_servers='localhost:{}'.format(port),
- key_serializer=key_to_kafka,
- value_serializer=value_to_kafka,
- client_id='test producer',
- )
+ producer = Producer({
+ 'bootstrap.servers': 'localhost:{}'.format(port),
+ 'client.id': 'test producer',
+ 'enable.idempotence': 'true',
+ })
now = datetime.datetime.now(tz=datetime.timezone.utc)
@@ -54,9 +53,14 @@
elif object_type == 'origin_visit':
nb_visits += 1
object_['visit'] = nb_visits
- producer.send(topic, key=key, value=object_)
+ producer.produce(
+ topic=topic, key=key_to_kafka(key),
+ value=value_to_kafka(object_),
+ )
nb_sent += 1
+ producer.flush()
+
# Fill the storage from Kafka
config = {
'brokers': 'localhost:%d' % kafka_server[1],
@@ -133,9 +137,7 @@
for visit in visits:
writer.send('origin_visit', 'foo', visit)
- queue_size = sum(len(partition)
- for batch in queue
- for partition in batch.values())
+ queue_size = len(queue)
storage = get_storage('memory', {})
worker_fn = functools.partial(process_replay_objects, storage=storage)
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -39,9 +39,7 @@
except HashCollision:
pass
- queue_size = sum(len(partition)
- for batch in queue
- for partition in batch.values())
+ queue_size = len(queue)
storage2 = Storage()
worker_fn = functools.partial(process_replay_objects, storage=storage2)
@@ -74,9 +72,7 @@
if obj_type == 'content':
storage1.content_add([obj])
- queue_size = sum(len(partition)
- for batch in queue
- for partition in batch.values())
+ queue_size = len(queue)
storage2 = Storage()
worker_fn = functools.partial(process_replay_objects_content,
diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py
--- a/swh/journal/tests/utils.py
+++ b/swh/journal/tests/utils.py
@@ -1,12 +1,26 @@
-from collections import namedtuple
-
from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
from swh.journal.direct_writer import DirectKafkaWriter
-from swh.journal.serializers import (
- key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)
+from swh.journal.serializers import (kafka_to_value, key_to_kafka,
+ value_to_kafka)
+
+
+class FakeKafkaMessage:
+ def __init__(self, topic, key, value):
+ self._topic = topic
+ self._key = key_to_kafka(key)
+ self._value = value_to_kafka(value)
+
+ def topic(self):
+ return self._topic
-FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'key value')
-FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic')
+ def value(self):
+ return self._value
+
+ def key(self):
+ return self._key
+
+ def error(self):
+ return None
class MockedKafkaWriter(DirectKafkaWriter):
@@ -15,15 +29,11 @@
self.queue = queue
def send(self, topic, key, value):
- key = kafka_to_key(key_to_kafka(key))
- value = kafka_to_value(value_to_kafka(value))
- partition = FakeKafkaPartition(topic)
- msg = FakeKafkaMessage(key=key, value=value)
- if self.queue and {partition} == set(self.queue[-1]):
- # The last message is of the same object type, groupping them
- self.queue[-1][partition].append(msg)
- else:
- self.queue.append({partition: [msg]})
+ msg = FakeKafkaMessage(topic=topic, key=key, value=value)
+ self.queue.append(msg)
+
+ def flush(self):
+ pass
class MockedKafkaConsumer:
@@ -31,7 +41,7 @@
self.queue = queue
self.committed = False
- def poll(self):
+ def poll(self, timeout=None):
return self.queue.pop(0)
def commit(self):
@@ -43,3 +53,6 @@
def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
self._object_types = object_types
self.consumer = MockedKafkaConsumer(queue)
+ self.process_timeout = 0
+ self.max_messages = 0
+ self.value_deserializer = kafka_to_value
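
The fake above mimics the subset of the confluent_kafka Message accessors the
new client relies on (topic(), key(), value(), error()), storing already
serialized key and value bytes. A quick round-trip sketch (not part of the
diff; topic and payload are illustrative):

    from swh.journal.serializers import kafka_to_key, kafka_to_value
    from swh.journal.tests.utils import FakeKafkaMessage

    msg = FakeKafkaMessage(topic='swh.journal.objects.revision',
                           key=b'\x01' * 20,
                           value={'id': b'\x01' * 20})
    assert msg.error() is None
    assert msg.topic() == 'swh.journal.objects.revision'
    assert kafka_to_key(msg.key()) == b'\x01' * 20
    assert kafka_to_value(msg.value()) == {'id': b'\x01' * 20}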