Page MenuHomeSoftware Heritage

D1340.id4310.diff
No OneTemporary

D1340.id4310.diff

diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -6,6 +6,8 @@
import os
import pytest
import logging
+import random
+import string
from kafka import KafkaProducer, KafkaConsumer
from subprocess import Popen
@@ -132,17 +134,6 @@
}
-TEST_CONFIG = {
- 'temporary_prefix': 'swh.tmp_journal.new',
- 'final_prefix': 'swh.journal.objects',
- 'consumer_id': 'swh.journal.publisher',
- 'publisher_id': 'swh.journal.publisher',
- 'object_types': OBJECT_TYPE_KEYS.keys(),
- 'max_messages': 1, # will read 1 message and stops
- 'storage': {'cls': 'memory', 'args': {}}
-}
-
-
class JournalPublisherTest(JournalPublisher):
"""A journal publisher which override the default configuration
parsing setup.
@@ -177,22 +168,45 @@
# Those defines fixtures
-zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN)
+zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN, scope='session')
os.environ['KAFKA_LOG4J_OPTS'] = \
'-Dlog4j.configuration=file:%s/log4j.properties' % \
os.path.dirname(__file__)
-kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc')
+kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', scope='session')
logger = logging.getLogger('kafka')
logger.setLevel(logging.WARN)
+@pytest.fixture(scope='function')
+def kafka_prefix():
+ return ''.join(random.choice(string.ascii_lowercase) for _ in range(10))
+
+
+TEST_CONFIG = {
+ 'temporary_prefix': 'swh.tmp_journal.new',
+ 'final_prefix': 'swh.journal.objects',
+ 'consumer_id': 'swh.journal.publisher',
+ 'publisher_id': 'swh.journal.publisher',
+ 'object_types': OBJECT_TYPE_KEYS.keys(),
+ 'max_messages': 1, # will read 1 message and stops
+ 'storage': {'cls': 'memory', 'args': {}},
+}
+
+
@pytest.fixture
-def test_config():
+def test_config(kafka_server: Tuple[Popen, int],
+ kafka_prefix: str):
"""Test configuration needed for publisher/producer/consumer
"""
- return TEST_CONFIG
+ _, port = kafka_server
+ return {
+ **TEST_CONFIG,
+ 'brokers': ['localhost:{}'.format(port)],
+ 'temporary_prefix': kafka_prefix + '.swh.tmp_journal.new',
+ 'final_prefix': kafka_prefix + '.swh.journal.objects',
+ }
@pytest.fixture
@@ -219,8 +233,9 @@
"""Get a connected Kafka consumer.
"""
- kafka_topics = ['%s.%s' % (test_config['final_prefix'], object_type)
- for object_type in test_config['object_types']]
+ kafka_topics = [
+ '%s.%s' % (test_config['final_prefix'], object_type)
+ for object_type in test_config['object_types']]
_, kafka_port = kafka_server
consumer = KafkaConsumer(
*kafka_topics,
@@ -251,7 +266,5 @@
"""
# consumer and producer of the publisher needs to discuss with the
# right instance
- _, port = kafka_server
- test_config['brokers'] = ['localhost:{}'.format(port)]
publisher = JournalPublisherTest(test_config)
return publisher
diff --git a/swh/journal/tests/test_direct_writer.py b/swh/journal/tests/test_direct_writer.py
--- a/swh/journal/tests/test_direct_writer.py
+++ b/swh/journal/tests/test_direct_writer.py
@@ -18,7 +18,7 @@
from .conftest import OBJECT_TYPE_KEYS
-def assert_written(consumer):
+def assert_written(consumer, kafka_prefix):
time.sleep(0.1) # Without this, some messages are missing
consumed_objects = defaultdict(list)
@@ -26,7 +26,7 @@
consumed_objects[msg.topic].append((msg.key, msg.value))
for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items():
- topic = 'swh.journal.objects.%s' % object_type
+ topic = kafka_prefix + '.' + object_type
(keys, values) = zip(*consumed_objects[topic])
if key_name:
assert list(keys) == [object_[key_name] for object_ in objects]
@@ -42,13 +42,15 @@
def test_direct_writer(
+ kafka_prefix: str,
kafka_server: Tuple[Popen, int],
consumer_from_publisher: KafkaConsumer):
+ kafka_prefix += '.swh.journal.objects'
config = {
'brokers': 'localhost:%d' % kafka_server[1],
'client_id': 'direct_writer',
- 'prefix': 'swh.journal.objects',
+ 'prefix': kafka_prefix,
}
writer = DirectKafkaWriter(**config)
@@ -59,17 +61,19 @@
object_ = {**object_, 'visit': num}
writer.write_addition(object_type, object_)
- assert_written(consumer_from_publisher)
+ assert_written(consumer_from_publisher, kafka_prefix)
def test_storage_direct_writer(
+ kafka_prefix: str,
kafka_server: Tuple[Popen, int],
consumer_from_publisher: KafkaConsumer):
+ kafka_prefix += '.swh.journal.objects'
config = {
'brokers': 'localhost:%d' % kafka_server[1],
'client_id': 'direct_writer',
- 'prefix': 'swh.journal.objects',
+ 'prefix': kafka_prefix,
}
storage = get_storage('memory', {'journal_writer': {
@@ -92,4 +96,4 @@
else:
assert False, object_type
- assert_written(consumer_from_publisher)
+ assert_written(consumer_from_publisher, kafka_prefix)
diff --git a/swh/journal/tests/test_publisher_kafka.py b/swh/journal/tests/test_publisher_kafka.py
--- a/swh/journal/tests/test_publisher_kafka.py
+++ b/swh/journal/tests/test_publisher_kafka.py
@@ -11,12 +11,13 @@
from swh.journal.serializers import value_to_kafka, kafka_to_value
from swh.journal.publisher import JournalPublisher
-from .conftest import TEST_CONFIG, OBJECT_TYPE_KEYS
+from .conftest import OBJECT_TYPE_KEYS
def assert_publish_ok(publisher: JournalPublisher,
consumer_from_publisher: KafkaConsumer,
producer_to_publisher: KafkaProducer,
+ test_config: dict,
object_type: str):
"""Assert that publishing object in the publisher is reified and
published in output topics.
@@ -43,7 +44,7 @@
# send message to the publisher
for obj in objects:
producer_to_publisher.send(
- '%s.%s' % (TEST_CONFIG['temporary_prefix'], object_type),
+ '%s.%s' % (test_config['temporary_prefix'], object_type),
obj
)
@@ -53,7 +54,7 @@
publisher.poll(max_messages=1)
# then (client reads from the messages from output topic)
- expected_topic = '%s.%s' % (TEST_CONFIG['final_prefix'], object_type)
+ expected_topic = '%s.%s' % (test_config['final_prefix'], object_type)
expected_msgs = [
(
object_[object_key_id],
@@ -69,6 +70,7 @@
def test_publish(
publisher: JournalPublisher,
kafka_server: Tuple[Popen, int],
+ test_config: dict,
consumer_from_publisher: KafkaConsumer,
producer_to_publisher: KafkaProducer):
"""
@@ -88,4 +90,4 @@
for object_type in object_types:
assert_publish_ok(
publisher, consumer_from_publisher, producer_to_publisher,
- object_type)
+ test_config, object_type)
diff --git a/swh/journal/tests/test_publisher_no_kafka.py b/swh/journal/tests/test_publisher_no_kafka.py
--- a/swh/journal/tests/test_publisher_no_kafka.py
+++ b/swh/journal/tests/test_publisher_no_kafka.py
@@ -128,20 +128,20 @@
pass
-def test_check_config_ok():
+def test_check_config_ok(test_config):
"""Instantiate a publisher with the right config is fine
"""
- publisher = JournalPublisherCheckTest(TEST_CONFIG)
+ publisher = JournalPublisherCheckTest(test_config)
assert publisher is not None
-def test_check_config_ko():
+def test_check_config_ko(test_config):
"""Instantiate a publisher with the wrong config should raise
"""
for k in MANDATORY_KEYS:
- conf = TEST_CONFIG.copy()
+ conf = test_config.copy()
conf.pop(k)
with pytest.raises(ValueError) as e:
JournalPublisherCheckTest(conf)
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -19,8 +19,10 @@
def test_storage_play(
+ kafka_prefix: str,
kafka_server: Tuple[Popen, int]):
(_, port) = kafka_server
+ kafka_prefix += '.swh.journal.objects'
storage = get_storage('memory', {})
@@ -34,7 +36,7 @@
# Fill Kafka
nb_sent = 0
for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
- topic = 'swh.journal.objects.' + object_type
+ topic = kafka_prefix + '.' + object_type
for object_ in objects:
key = bytes(random.randint(0, 255) for _ in range(40))
producer.send(topic, key=key, value=object_)
@@ -44,7 +46,7 @@
config = {
'brokers': 'localhost:%d' % kafka_server[1],
'consumer_id': 'replayer',
- 'prefix': 'swh.journal.objects',
+ 'prefix': kafka_prefix,
}
replayer = StorageReplayer(**config)
nb_inserted = replayer.fill(storage, max_messages=nb_sent)

File Metadata

Mime Type
text/plain
Expires
Mar 17 2025, 7:01 PM (7 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3218392

Event Timeline