D1980.diff

diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,6 @@
# Add here external Python modules dependencies, one per line. Module names
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
-kafka-python >= 1.3
+confluent-kafka
msgpack
vcversioner
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -3,11 +3,13 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from collections import defaultdict
import logging
+import time
-from kafka import KafkaConsumer
+from confluent_kafka import Consumer, KafkaException
-from .serializers import kafka_to_key, kafka_to_value
+from .serializers import kafka_to_value
from swh.journal import DEFAULT_PREFIX
logger = logging.getLogger(__name__)
@@ -52,7 +54,8 @@
"""
def __init__(
self, brokers, group_id, prefix=None, object_types=None,
- max_messages=0, auto_offset_reset='earliest', **kwargs):
+ max_messages=0, process_timeout=0, auto_offset_reset='earliest',
+ **kwargs):
if prefix is None:
prefix = DEFAULT_PREFIX
if object_types is None:
@@ -68,21 +71,35 @@
'Option \'object_types\' only accepts %s.' %
ACCEPTED_OFFSET_RESET)
- self.consumer = KafkaConsumer(
- bootstrap_servers=brokers,
- key_deserializer=kafka_to_key,
- value_deserializer=kafka_to_value,
- auto_offset_reset=auto_offset_reset,
- enable_auto_commit=False,
- group_id=group_id,
- **kwargs)
+ self.value_deserializer = kafka_to_value
- self.consumer.subscribe(
- topics=['%s.%s' % (prefix, object_type)
- for object_type in object_types],
- )
+ if isinstance(brokers, str):
+ brokers = [brokers]
+
+ consumer_settings = {
+ **kwargs,
+ 'bootstrap.servers': ','.join(brokers),
+ 'auto.offset.reset': auto_offset_reset,
+ 'group.id': group_id,
+ 'enable.auto.commit': False,
+ }
+
+ logger.debug('Consumer settings: %s', consumer_settings)
+
+ self.consumer = Consumer(consumer_settings, logger=logger)
+
+ topics = ['%s.%s' % (prefix, object_type)
+ for object_type in object_types]
+
+ logger.debug('Upstream topics: %s',
+ self.consumer.list_topics(timeout=1))
+ logger.debug('Subscribing to: %s', topics)
+
+ self.consumer.subscribe(topics=topics)
self.max_messages = max_messages
+ self.process_timeout = process_timeout
+
self._object_types = object_types
def process(self, worker_fn):
@@ -94,16 +111,47 @@
the messages as
argument.
"""
+ start_time = time.monotonic()
nb_messages = 0
- polled = self.consumer.poll()
- for (partition, messages) in polled.items():
- object_type = partition.topic.split('.')[-1]
+
+ objects = defaultdict(list)
+
+ while True:
+ # timeout for message poll
+ timeout = 1.0
+
+ elapsed = time.monotonic() - start_time
+ if self.process_timeout:
+ if elapsed >= self.process_timeout:
+ break
+
+ timeout = self.process_timeout - elapsed
+
+ message = self.consumer.poll(timeout=timeout)
+ if not message:
+ continue
+
+ error = message.error()
+ if error is not None:
+ if error.fatal():
+ raise KafkaException(error)
+ logger.info('Received non-fatal kafka error: %s', error)
+ continue
+
+ nb_messages += 1
+
+ object_type = message.topic().split('.')[-1]
# Got a message from a topic we did not subscribe to.
assert object_type in self._object_types, object_type
- worker_fn({object_type: [msg.value for msg in messages]})
+ objects[object_type].append(
+ self.value_deserializer(message.value())
+ )
+
+ if nb_messages >= self.max_messages:
+ break
- nb_messages += len(messages)
+ worker_fn(dict(objects))
self.consumer.commit()
return nb_messages
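
For reference, a minimal self-contained sketch of the confluent-kafka poll/commit loop that the rewritten client follows; the broker address, group id and topic name below are placeholder values, not taken from this patch:

from confluent_kafka import Consumer, KafkaException

consumer = Consumer({
    'bootstrap.servers': '127.0.0.1:9092',   # placeholder broker
    'group.id': 'example-group',             # placeholder group id
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,             # commit manually, as the client does
})
consumer.subscribe(['swh.journal.objects.content'])   # placeholder topic

try:
    while True:
        message = consumer.poll(timeout=1.0)   # returns None when nothing arrives
        if message is None:
            continue
        error = message.error()
        if error is not None:
            if error.fatal():
                raise KafkaException(error)
            continue                           # skip non-fatal errors
        # key() and value() return raw bytes; deserialization (msgpack in
        # swh.journal) is up to the caller.
        print(message.topic(), message.value())
        consumer.commit()
finally:
    consumer.close()

Unlike kafka-python's KafkaConsumer, the confluent-kafka Consumer takes no deserializer arguments, which is why the client above keeps kafka_to_value around as self.value_deserializer and applies it itself.
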
diff --git a/swh/journal/direct_writer.py b/swh/journal/direct_writer.py
--- a/swh/journal/direct_writer.py
+++ b/swh/journal/direct_writer.py
@@ -5,7 +5,7 @@
import logging
-from kafka import KafkaProducer
+from confluent_kafka import Producer
from swh.model.hashutil import DEFAULT_ALGORITHMS
from swh.model.model import BaseModel
@@ -22,15 +22,23 @@
def __init__(self, brokers, prefix, client_id):
self._prefix = prefix
- self.producer = KafkaProducer(
- bootstrap_servers=brokers,
- key_serializer=key_to_kafka,
- value_serializer=value_to_kafka,
- client_id=client_id,
- )
+ self.producer = Producer({
+ 'bootstrap.servers': brokers,
+ 'client.id': client_id,
+ 'enable.idempotence': 'true',
+ })
def send(self, topic, key, value):
- self.producer.send(topic=topic, key=key, value=value)
+ self.producer.produce(
+ topic=topic,
+ key=key_to_kafka(key),
+ value=value_to_kafka(value),
+ )
+
+ def flush(self):
+ self.producer.flush()
+ # Need to service the callbacks regularly by calling poll
+ self.producer.poll(0)
def _get_key(self, object_type, object_):
if object_type in ('revision', 'release', 'directory', 'snapshot'):
@@ -62,7 +70,8 @@
assert 'id' not in object_
return object_
- def write_addition(self, object_type, object_):
+ def write_addition(self, object_type, object_, flush=True):
+ """Write a single object to the journal"""
if isinstance(object_, BaseModel):
object_ = object_.to_dict()
topic = '%s.%s' % (self._prefix, object_type)
@@ -71,8 +80,15 @@
logger.debug('topic: %s, key: %s, value: %s', topic, key, object_)
self.send(topic, key=key, value=object_)
+ if flush:
+ self.flush()
+
write_update = write_addition
- def write_additions(self, object_type, objects):
+ def write_additions(self, object_type, objects, flush=True):
+ """Write a set of objects to the journal"""
for object_ in objects:
- self.write_addition(object_type, object_)
+ self.write_addition(object_type, object_, flush=False)
+
+ if flush:
+ self.flush()
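
A minimal sketch of the produce/flush pattern the writer now uses, assuming a locally reachable broker; the broker address, client id, topic and payload are placeholders:

from confluent_kafka import Producer
import msgpack

producer = Producer({
    'bootstrap.servers': '127.0.0.1:9092',   # placeholder broker
    'client.id': 'example-writer',           # placeholder client id
    'enable.idempotence': 'true',
})

# The plain confluent-kafka Producer has no key/value serializer hooks, so the
# caller serializes explicitly before produce() (the writer above uses
# key_to_kafka and value_to_kafka for this).
producer.produce(
    topic='swh.journal.objects.content',     # placeholder topic
    key=msgpack.dumps(b'some-key'),
    value=msgpack.dumps({'status': 'visible'}),
)
producer.poll(0)    # serve delivery callbacks
producer.flush()    # block until queued messages are delivered

Since produce() only enqueues messages locally, flush() (or periodic poll()) is what actually guarantees delivery, hence the new flush() method and the flush=True arguments on write_addition/write_additions.
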
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -9,18 +9,19 @@
import random
import string
-from kafka import KafkaConsumer
+from confluent_kafka import Consumer
from subprocess import Popen
from typing import Tuple, Dict
from pathlib import Path
from pytest_kafka import (
- make_zookeeper_process, make_kafka_server, constants
+ make_zookeeper_process, make_kafka_server
)
from swh.model.hashutil import hash_to_bytes
-from swh.journal.serializers import kafka_to_key, kafka_to_value
+
+logger = logging.getLogger(__name__)
CONTENTS = [
@@ -176,8 +177,8 @@
os.path.dirname(__file__)
kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', scope='session')
-logger = logging.getLogger('kafka')
-logger.setLevel(logging.WARN)
+kafka_logger = logging.getLogger('kafka')
+kafka_logger.setLevel(logging.WARN)
@pytest.fixture(scope='function')
@@ -202,36 +203,33 @@
_, port = kafka_server
return {
**TEST_CONFIG,
- 'brokers': ['localhost:{}'.format(port)],
+ 'brokers': ['127.0.0.1:{}'.format(port)],
'prefix': kafka_prefix + '.swh.journal.objects',
}
@pytest.fixture
def consumer(
- kafka_server: Tuple[Popen, int], test_config: Dict) -> KafkaConsumer:
+ kafka_server: Tuple[Popen, int],
+ test_config: Dict,
+ kafka_prefix: str,
+) -> Consumer:
"""Get a connected Kafka consumer.
"""
+ _, kafka_port = kafka_server
+ consumer = Consumer({
+ 'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port),
+ 'auto.offset.reset': 'earliest',
+ 'enable.auto.commit': True,
+ 'group.id': "test-consumer-%s" % kafka_prefix,
+ })
+
kafka_topics = [
'%s.%s' % (test_config['prefix'], object_type)
- for object_type in test_config['object_types']]
- _, kafka_port = kafka_server
- consumer = KafkaConsumer(
- *kafka_topics,
- bootstrap_servers='localhost:{}'.format(kafka_port),
- consumer_timeout_ms=constants.DEFAULT_CONSUMER_TIMEOUT_MS,
- key_deserializer=kafka_to_key,
- value_deserializer=kafka_to_value,
- auto_offset_reset='earliest',
- enable_auto_commit=True,
- group_id="test-consumer"
- )
-
- # Enforce auto_offset_reset=earliest even if the consumer was created
- # too soon wrt the server.
- while len(consumer.assignment()) == 0:
- consumer.poll(timeout_ms=20)
- consumer.seek_to_beginning()
+ for object_type in test_config['object_types']
+ ]
+
+ consumer.subscribe(kafka_topics)
return consumer
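
An illustrative pytest fixture following the same shape as the consumer fixture above; the port fixture name and topic are placeholders, and closing the consumer on teardown is an addition made for the sake of the example:

import uuid

import pytest
from confluent_kafka import Consumer


@pytest.fixture
def example_consumer(kafka_port):        # hypothetical port fixture
    consumer = Consumer({
        'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port),
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': True,
        # a unique group id per test keeps offsets committed by one test
        # from hiding messages in the next
        'group.id': 'test-consumer-{}'.format(uuid.uuid4()),
    })
    consumer.subscribe(['example.topic'])   # placeholder topic
    yield consumer
    consumer.close()

The per-prefix group id in the real fixture serves the same purpose: each test starts with no committed offsets, so 'auto.offset.reset': 'earliest' takes effect.
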
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -4,6 +4,7 @@
# See top-level LICENSE file for more information
import functools
+import logging
import re
import tempfile
from subprocess import Popen
@@ -11,7 +12,7 @@
from unittest.mock import patch
from click.testing import CliRunner
-from kafka import KafkaProducer
+from confluent_kafka import Producer
import pytest
from swh.objstorage.backends.in_memory import InMemoryObjStorage
@@ -21,6 +22,9 @@
from swh.journal.serializers import key_to_kafka, value_to_kafka
+logger = logging.getLogger(__name__)
+
+
CLI_CONFIG = '''
storage:
cls: memory
@@ -52,7 +56,7 @@
config_fd.write(CLI_CONFIG)
config_fd.seek(0)
args = ['-C' + config_fd.name] + args
- result = runner.invoke(cli, args)
+ result = runner.invoke(cli, args, obj={'log_level': logging.DEBUG})
if not catch_exceptions and result.exception:
print(result.output)
raise result.exception
@@ -66,12 +70,11 @@
(_, port) = kafka_server
kafka_prefix += '.swh.journal.objects'
- producer = KafkaProducer(
- bootstrap_servers='localhost:{}'.format(port),
- key_serializer=key_to_kafka,
- value_serializer=value_to_kafka,
- client_id='test-producer',
- )
+ producer = Producer({
+ 'bootstrap.servers': 'localhost:{}'.format(port),
+ 'client.id': 'test-producer',
+ 'enable.idempotence': 'true',
+ })
snapshot = {'id': b'foo', 'branches': {
b'HEAD': {
@@ -79,12 +82,18 @@
'target': b'\x01'*20,
}
}}
- producer.send(
- topic=kafka_prefix+'.snapshot', key=snapshot['id'], value=snapshot)
+ producer.produce(
+ topic=kafka_prefix+'.snapshot',
+ key=key_to_kafka(snapshot['id']),
+ value=value_to_kafka(snapshot),
+ )
+ producer.flush()
+
+ logger.debug('Flushed producer')
result = invoke(False, [
'replay',
- '--broker', 'localhost:%d' % port,
+ '--broker', '127.0.0.1:%d' % port,
'--group-id', 'test-cli-consumer',
'--prefix', kafka_prefix,
'--max-messages', '1',
@@ -117,22 +126,25 @@
def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages):
- producer = KafkaProducer(
- bootstrap_servers='localhost:{}'.format(kafka_port),
- key_serializer=key_to_kafka,
- value_serializer=value_to_kafka,
- client_id='test-producer',
- )
+ producer = Producer({
+ 'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port),
+ 'client.id': 'test-producer',
+ 'enable.idempotence': 'true',
+ })
contents = {}
for i in range(10):
content = b'\x00'*19 + bytes([i])
sha1 = objstorages['src'].add(content)
contents[sha1] = content
- producer.send(topic=kafka_prefix+'.content', key=sha1, value={
- 'sha1': sha1,
- 'status': 'visible',
- })
+ producer.produce(
+ topic=kafka_prefix+'.content',
+ key=key_to_kafka(sha1),
+ value=key_to_kafka({
+ 'sha1': sha1,
+ 'status': 'visible',
+ }),
+ )
producer.flush()
@@ -153,7 +165,7 @@
result = invoke(False, [
'content-replay',
- '--broker', 'localhost:%d' % kafka_port,
+ '--broker', '127.0.0.1:%d' % kafka_port,
'--group-id', 'test-cli-consumer',
'--prefix', kafka_prefix,
'--max-messages', '10',
@@ -187,7 +199,7 @@
result = invoke(False, [
'content-replay',
- '--broker', 'localhost:%d' % kafka_port,
+ '--broker', '127.0.0.1:%d' % kafka_port,
'--group-id', 'test-cli-consumer',
'--prefix', kafka_prefix,
'--max-messages', '10',
diff --git a/swh/journal/tests/test_direct_writer.py b/swh/journal/tests/test_direct_writer.py
--- a/swh/journal/tests/test_direct_writer.py
+++ b/swh/journal/tests/test_direct_writer.py
@@ -5,26 +5,48 @@
from collections import defaultdict
import datetime
-import time
-from kafka import KafkaConsumer
+from confluent_kafka import Consumer, KafkaException
from subprocess import Popen
from typing import Tuple
from swh.storage import get_storage
from swh.journal.direct_writer import DirectKafkaWriter
-from swh.journal.serializers import value_to_kafka, kafka_to_value
+from swh.journal.serializers import (
+ kafka_to_key, kafka_to_value
+)
from .conftest import OBJECT_TYPE_KEYS
-def assert_written(consumer, kafka_prefix):
- time.sleep(0.1) # Without this, some messages are missing
-
+def assert_written(consumer, kafka_prefix, expected_messages):
consumed_objects = defaultdict(list)
- for msg in consumer:
- consumed_objects[msg.topic].append((msg.key, msg.value))
+
+ fetched_messages = 0
+ retries_left = 1000
+
+ while fetched_messages < expected_messages:
+ if retries_left == 0:
+ raise ValueError('Timed out fetching messages from kafka')
+
+ msg = consumer.poll(timeout=0.01)
+
+ if not msg:
+ retries_left -= 1
+ continue
+
+ error = msg.error()
+ if error is not None:
+ if error.fatal():
+ raise KafkaException(error)
+ retries_left -= 1
+ continue
+
+ fetched_messages += 1
+ consumed_objects[msg.topic()].append(
+ (kafka_to_key(msg.key()), kafka_to_value(msg.value()))
+ )
for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items():
topic = kafka_prefix + '.' + object_type
@@ -42,13 +64,13 @@
del value['ctime']
for object_ in objects:
- assert kafka_to_value(value_to_kafka(object_)) in values
+ assert object_ in values
def test_direct_writer(
kafka_prefix: str,
kafka_server: Tuple[Popen, int],
- consumer: KafkaConsumer):
+ consumer: Consumer):
kafka_prefix += '.swh.journal.objects'
config = {
@@ -59,6 +81,8 @@
writer = DirectKafkaWriter(**config)
+ expected_messages = 0
+
for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
for (num, object_) in enumerate(objects):
if object_type == 'origin_visit':
@@ -66,14 +90,15 @@
if object_type == 'content':
object_ = {**object_, 'ctime': datetime.datetime.now()}
writer.write_addition(object_type, object_)
+ expected_messages += 1
- assert_written(consumer, kafka_prefix)
+ assert_written(consumer, kafka_prefix, expected_messages)
def test_storage_direct_writer(
kafka_prefix: str,
kafka_server: Tuple[Popen, int],
- consumer: KafkaConsumer):
+ consumer: Consumer):
kafka_prefix += '.swh.journal.objects'
config = {
@@ -85,6 +110,8 @@
storage = get_storage('memory', {'journal_writer': {
'cls': 'kafka', 'args': config}})
+ expected_messages = 0
+
for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
method = getattr(storage, object_type + '_add')
if object_type in ('content', 'directory', 'revision', 'release',
@@ -92,15 +119,18 @@
if object_type == 'content':
objects = [{**obj, 'data': b''} for obj in objects]
method(objects)
+ expected_messages += len(objects)
elif object_type in ('origin_visit',):
for object_ in objects:
object_ = object_.copy()
origin_id = storage.origin_add_one(object_.pop('origin'))
del object_['type']
visit = method(origin=origin_id, date=object_.pop('date'))
+ expected_messages += 1
visit_id = visit['visit']
storage.origin_visit_update(origin_id, visit_id, **object_)
+ expected_messages += 1
else:
assert False, object_type
- assert_written(consumer, kafka_prefix)
+ assert_written(consumer, kafka_prefix, expected_messages)
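
The retry loop in assert_written generalizes to a small test helper; a sketch under the assumption that the caller knows how many messages to expect (names and the retry budget are illustrative):

from confluent_kafka import Consumer, KafkaException


def drain_messages(consumer: Consumer, expected: int, max_retries: int = 1000):
    """Poll until `expected` messages arrive or the retry budget runs out."""
    messages = []
    retries_left = max_retries
    while len(messages) < expected:
        if retries_left == 0:
            raise ValueError('Timed out fetching messages from kafka')
        msg = consumer.poll(timeout=0.01)
        if msg is None:
            retries_left -= 1
            continue
        error = msg.error()
        if error is not None:
            if error.fatal():
                raise KafkaException(error)
            retries_left -= 1
            continue
        messages.append(msg)
    return messages

Counting expected_messages on the producing side, combined with the writer's explicit flush, replaces the fixed time.sleep(0.1) the old assertion relied on.
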
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -10,7 +10,7 @@
from typing import Tuple
import dateutil
-from kafka import KafkaProducer
+from confluent_kafka import Producer
from hypothesis import strategies, given, settings
from swh.storage import get_storage
@@ -32,12 +32,11 @@
storage = get_storage('memory', {})
- producer = KafkaProducer(
- bootstrap_servers='localhost:{}'.format(port),
- key_serializer=key_to_kafka,
- value_serializer=value_to_kafka,
- client_id='test producer',
- )
+ producer = Producer({
+ 'bootstrap.servers': 'localhost:{}'.format(port),
+ 'client.id': 'test producer',
+ 'enable.idempotence': 'true',
+ })
now = datetime.datetime.now(tz=datetime.timezone.utc)
@@ -54,9 +53,14 @@
elif object_type == 'origin_visit':
nb_visits += 1
object_['visit'] = nb_visits
- producer.send(topic, key=key, value=object_)
+ producer.produce(
+ topic=topic, key=key_to_kafka(key),
+ value=value_to_kafka(object_),
+ )
nb_sent += 1
+ producer.flush()
+
# Fill the storage from Kafka
config = {
'brokers': 'localhost:%d' % kafka_server[1],
@@ -133,9 +137,7 @@
for visit in visits:
writer.send('origin_visit', 'foo', visit)
- queue_size = sum(len(partition)
- for batch in queue
- for partition in batch.values())
+ queue_size = len(queue)
storage = get_storage('memory', {})
worker_fn = functools.partial(process_replay_objects, storage=storage)
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -39,9 +39,7 @@
except HashCollision:
pass
- queue_size = sum(len(partition)
- for batch in queue
- for partition in batch.values())
+ queue_size = len(queue)
storage2 = Storage()
worker_fn = functools.partial(process_replay_objects, storage=storage2)
@@ -74,9 +72,7 @@
if obj_type == 'content':
storage1.content_add([obj])
- queue_size = sum(len(partition)
- for batch in queue
- for partition in batch.values())
+ queue_size = len(queue)
storage2 = Storage()
worker_fn = functools.partial(process_replay_objects_content,
diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py
--- a/swh/journal/tests/utils.py
+++ b/swh/journal/tests/utils.py
@@ -1,12 +1,26 @@
-from collections import namedtuple
-
from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
from swh.journal.direct_writer import DirectKafkaWriter
-from swh.journal.serializers import (
- key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)
+from swh.journal.serializers import (kafka_to_value, key_to_kafka,
+ value_to_kafka)
+
+
+class FakeKafkaMessage:
+ def __init__(self, topic, key, value):
+ self._topic = topic
+ self._key = key_to_kafka(key)
+ self._value = value_to_kafka(value)
+
+ def topic(self):
+ return self._topic
-FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'key value')
-FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic')
+ def value(self):
+ return self._value
+
+ def key(self):
+ return self._key
+
+ def error(self):
+ return None
class MockedKafkaWriter(DirectKafkaWriter):
@@ -15,15 +29,11 @@
self.queue = queue
def send(self, topic, key, value):
- key = kafka_to_key(key_to_kafka(key))
- value = kafka_to_value(value_to_kafka(value))
- partition = FakeKafkaPartition(topic)
- msg = FakeKafkaMessage(key=key, value=value)
- if self.queue and {partition} == set(self.queue[-1]):
- # The last message is of the same object type, groupping them
- self.queue[-1][partition].append(msg)
- else:
- self.queue.append({partition: [msg]})
+ msg = FakeKafkaMessage(topic=topic, key=key, value=value)
+ self.queue.append(msg)
+
+ def flush(self):
+ pass
class MockedKafkaConsumer:
@@ -31,7 +41,7 @@
self.queue = queue
self.committed = False
- def poll(self):
+ def poll(self, timeout=None):
return self.queue.pop(0)
def commit(self):
@@ -43,3 +53,6 @@
def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
self._object_types = object_types
self.consumer = MockedKafkaConsumer(queue)
+ self.process_timeout = 0
+ self.max_messages = 0
+ self.value_deserializer = kafka_to_value
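
To show how these test doubles fit together, a small illustrative snippet (assuming it runs where swh.journal.tests.utils is importable; the topic and payload are placeholder values):

from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter

queue = []
writer = MockedKafkaWriter(queue)
# send() serializes the key/value and appends a FakeKafkaMessage to the queue
writer.send('swh.journal.objects.content', b'some-key', {'status': 'visible'})

client = MockedJournalClient(queue)

def worker_fn(objects):
    # called with {'content': [{'status': 'visible'}]} for the message above
    print(objects)

# MockedKafkaConsumer.poll() pops from the shared queue, so process() sees the
# message written above; with max_messages = 0 it returns after this first one.
client.process(worker_fn)

Because FakeKafkaMessage now exposes topic(), key(), value() and error() as methods, it can stand in for a real confluent-kafka message in the client's poll loop, and dropping the per-partition grouping is what lets the queue become a flat list.
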
