D1340.id4310.diff
Attached to D1340: Reuse the same Kafka instance across tests.
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -6,6 +6,8 @@
import os
import pytest
import logging
+import random
+import string
from kafka import KafkaProducer, KafkaConsumer
from subprocess import Popen
@@ -132,17 +134,6 @@
}
-TEST_CONFIG = {
- 'temporary_prefix': 'swh.tmp_journal.new',
- 'final_prefix': 'swh.journal.objects',
- 'consumer_id': 'swh.journal.publisher',
- 'publisher_id': 'swh.journal.publisher',
- 'object_types': OBJECT_TYPE_KEYS.keys(),
- 'max_messages': 1, # will read 1 message and stop
- 'storage': {'cls': 'memory', 'args': {}}
-}
-
-
class JournalPublisherTest(JournalPublisher):
"""A journal publisher which override the default configuration
parsing setup.
@@ -177,22 +168,45 @@
# Those defines fixtures
-zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN)
+zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN, scope='session')
os.environ['KAFKA_LOG4J_OPTS'] = \
'-Dlog4j.configuration=file:%s/log4j.properties' % \
os.path.dirname(__file__)
-kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc')
+kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', scope='session')
logger = logging.getLogger('kafka')
logger.setLevel(logging.WARN)
+@pytest.fixture(scope='function')
+def kafka_prefix():
+ return ''.join(random.choice(string.ascii_lowercase) for _ in range(10))
+
+
+TEST_CONFIG = {
+ 'temporary_prefix': 'swh.tmp_journal.new',
+ 'final_prefix': 'swh.journal.objects',
+ 'consumer_id': 'swh.journal.publisher',
+ 'publisher_id': 'swh.journal.publisher',
+ 'object_types': OBJECT_TYPE_KEYS.keys(),
+ 'max_messages': 1, # will read 1 message and stop
+ 'storage': {'cls': 'memory', 'args': {}},
+}
+
+
@pytest.fixture
-def test_config():
+def test_config(kafka_server: Tuple[Popen, int],
+ kafka_prefix: str):
"""Test configuration needed for publisher/producer/consumer
"""
- return TEST_CONFIG
+ _, port = kafka_server
+ return {
+ **TEST_CONFIG,
+ 'brokers': ['localhost:{}'.format(port)],
+ 'temporary_prefix': kafka_prefix + '.swh.tmp_journal.new',
+ 'final_prefix': kafka_prefix + '.swh.journal.objects',
+ }
@pytest.fixture
@@ -219,8 +233,9 @@
"""Get a connected Kafka consumer.
"""
- kafka_topics = ['%s.%s' % (test_config['final_prefix'], object_type)
- for object_type in test_config['object_types']]
+ kafka_topics = [
+ '%s.%s' % (test_config['final_prefix'], object_type)
+ for object_type in test_config['object_types']]
_, kafka_port = kafka_server
consumer = KafkaConsumer(
*kafka_topics,
@@ -251,7 +266,5 @@
"""
# the publisher's consumer and producer need to talk to the
# right instance
- _, port = kafka_server
- test_config['brokers'] = ['localhost:{}'.format(port)]
publisher = JournalPublisherTest(test_config)
return publisher
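
Taken together, the conftest.py changes above implement one pattern: the expensive Zookeeper/Kafka processes become session-scoped and are shared by every test, while each test gets a fresh random kafka_prefix and a test_config built on top of it, so tests never read each other's topics. A minimal sketch of that pattern, with hypothetical fixture names standing in for the real ones:

import random
import string

import pytest


@pytest.fixture(scope='session')
def shared_server():
    # Stands in for the session-scoped zookeeper_proc/kafka_server
    # fixtures: the expensive process is started once per session and
    # shared by every test. The (process, port) pair is faked here.
    yield (None, 9092)


@pytest.fixture(scope='function')
def topic_prefix():
    # A fresh 10-character random prefix per test, as in kafka_prefix
    # above, so tests sharing one broker never see each other's topics.
    return ''.join(random.choice(string.ascii_lowercase) for _ in range(10))


@pytest.fixture
def config(shared_server, topic_prefix):
    # Mirrors the reworked test_config: overlay the per-session broker
    # port and the per-test topic prefixes on a static base dict.
    _, port = shared_server
    return {
        'brokers': ['localhost:{}'.format(port)],
        'temporary_prefix': topic_prefix + '.swh.tmp_journal.new',
        'final_prefix': topic_prefix + '.swh.journal.objects',
    }


def test_prefixes_are_isolated(config, topic_prefix):
    assert config['final_prefix'] == topic_prefix + '.swh.journal.objects'
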
diff --git a/swh/journal/tests/test_direct_writer.py b/swh/journal/tests/test_direct_writer.py
--- a/swh/journal/tests/test_direct_writer.py
+++ b/swh/journal/tests/test_direct_writer.py
@@ -18,7 +18,7 @@
from .conftest import OBJECT_TYPE_KEYS
-def assert_written(consumer):
+def assert_written(consumer, kafka_prefix):
time.sleep(0.1) # Without this, some messages are missing
consumed_objects = defaultdict(list)
@@ -26,7 +26,7 @@
consumed_objects[msg.topic].append((msg.key, msg.value))
for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items():
- topic = 'swh.journal.objects.%s' % object_type
+ topic = kafka_prefix + '.' + object_type
(keys, values) = zip(*consumed_objects[topic])
if key_name:
assert list(keys) == [object_[key_name] for object_ in objects]
@@ -42,13 +42,15 @@
def test_direct_writer(
+ kafka_prefix: str,
kafka_server: Tuple[Popen, int],
consumer_from_publisher: KafkaConsumer):
+ kafka_prefix += '.swh.journal.objects'
config = {
'brokers': 'localhost:%d' % kafka_server[1],
'client_id': 'direct_writer',
- 'prefix': 'swh.journal.objects',
+ 'prefix': kafka_prefix,
}
writer = DirectKafkaWriter(**config)
@@ -59,17 +61,19 @@
object_ = {**object_, 'visit': num}
writer.write_addition(object_type, object_)
- assert_written(consumer_from_publisher)
+ assert_written(consumer_from_publisher, kafka_prefix)
def test_storage_direct_writer(
+ kafka_prefix: str,
kafka_server: Tuple[Popen, int],
consumer_from_publisher: KafkaConsumer):
+ kafka_prefix += '.swh.journal.objects'
config = {
'brokers': 'localhost:%d' % kafka_server[1],
'client_id': 'direct_writer',
- 'prefix': 'swh.journal.objects',
+ 'prefix': kafka_prefix,
}
storage = get_storage('memory', {'journal_writer': {
@@ -92,4 +96,4 @@
else:
assert False, object_type
- assert_written(consumer_from_publisher)
+ assert_written(consumer_from_publisher, kafka_prefix)
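
With the prefix threaded through, assert_written and both direct-writer tests derive every topic name from the per-test kafka_prefix instead of the fixed 'swh.journal.objects'. A small sketch of the resulting naming scheme; the object types listed are placeholders for the keys of OBJECT_TYPE_KEYS:

OBJECT_TYPES = ['content', 'revision', 'release']  # placeholder set

def expected_topics(kafka_prefix):
    # assert_written groups consumed messages per topic; with the
    # prefix threaded through, topics look like
    # 'abcdefghij.swh.journal.objects.revision'.
    return [kafka_prefix + '.' + object_type
            for object_type in OBJECT_TYPES]
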
diff --git a/swh/journal/tests/test_publisher_kafka.py b/swh/journal/tests/test_publisher_kafka.py
--- a/swh/journal/tests/test_publisher_kafka.py
+++ b/swh/journal/tests/test_publisher_kafka.py
@@ -11,12 +11,13 @@
from swh.journal.serializers import value_to_kafka, kafka_to_value
from swh.journal.publisher import JournalPublisher
-from .conftest import TEST_CONFIG, OBJECT_TYPE_KEYS
+from .conftest import OBJECT_TYPE_KEYS
def assert_publish_ok(publisher: JournalPublisher,
consumer_from_publisher: KafkaConsumer,
producer_to_publisher: KafkaProducer,
+ test_config: dict,
object_type: str):
"""Assert that publishing object in the publisher is reified and
published in output topics.
@@ -43,7 +44,7 @@
# send message to the publisher
for obj in objects:
producer_to_publisher.send(
- '%s.%s' % (TEST_CONFIG['temporary_prefix'], object_type),
+ '%s.%s' % (test_config['temporary_prefix'], object_type),
obj
)
@@ -53,7 +54,7 @@
publisher.poll(max_messages=1)
# then (client reads from the messages from output topic)
- expected_topic = '%s.%s' % (TEST_CONFIG['final_prefix'], object_type)
+ expected_topic = '%s.%s' % (test_config['final_prefix'], object_type)
expected_msgs = [
(
object_[object_key_id],
@@ -69,6 +70,7 @@
def test_publish(
publisher: JournalPublisher,
kafka_server: Tuple[Popen, int],
+ test_config: dict,
consumer_from_publisher: KafkaConsumer,
producer_to_publisher: KafkaProducer):
"""
@@ -88,4 +90,4 @@
for object_type in object_types:
assert_publish_ok(
publisher, consumer_from_publisher, producer_to_publisher,
- object_type)
+ test_config, object_type)
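
assert_publish_ok now takes the test_config fixture instead of importing the module-level TEST_CONFIG, so the topics it produces to and reads from carry the per-test prefix. The round trip it asserts, restated with hypothetical helper functions:

def temporary_topic(config, object_type):
    # where producer_to_publisher sends the raw test objects
    return '%s.%s' % (config['temporary_prefix'], object_type)

def final_topic(config, object_type):
    # where consumer_from_publisher expects them back after poll()
    return '%s.%s' % (config['final_prefix'], object_type)
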
diff --git a/swh/journal/tests/test_publisher_no_kafka.py b/swh/journal/tests/test_publisher_no_kafka.py
--- a/swh/journal/tests/test_publisher_no_kafka.py
+++ b/swh/journal/tests/test_publisher_no_kafka.py
@@ -128,20 +128,20 @@
pass
-def test_check_config_ok():
+def test_check_config_ok(test_config):
"""Instantiate a publisher with the right config is fine
"""
- publisher = JournalPublisherCheckTest(TEST_CONFIG)
+ publisher = JournalPublisherCheckTest(test_config)
assert publisher is not None
-def test_check_config_ko():
+def test_check_config_ko(test_config):
"""Instantiate a publisher with the wrong config should raise
"""
for k in MANDATORY_KEYS:
- conf = TEST_CONFIG.copy()
+ conf = test_config.copy()
conf.pop(k)
with pytest.raises(ValueError) as e:
JournalPublisherCheckTest(conf)
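
test_check_config_ko pops each mandatory key in turn and expects a ValueError. A sketch of the kind of validation it exercises; the MANDATORY_KEYS values and the error message below are assumptions, not copied from swh.journal.publisher:

MANDATORY_KEYS = ['brokers', 'temporary_prefix', 'final_prefix']  # assumed

def check_config(conf):
    # Raise on any missing mandatory key, as the publisher is expected
    # to do when test_check_config_ko pops keys one by one.
    missing = [k for k in MANDATORY_KEYS if k not in conf]
    if missing:
        raise ValueError('missing mandatory keys: %s' % ', '.join(missing))
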
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -19,8 +19,10 @@
def test_storage_play(
+ kafka_prefix: str,
kafka_server: Tuple[Popen, int]):
(_, port) = kafka_server
+ kafka_prefix += '.swh.journal.objects'
storage = get_storage('memory', {})
@@ -34,7 +36,7 @@
# Fill Kafka
nb_sent = 0
for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
- topic = 'swh.journal.objects.' + object_type
+ topic = kafka_prefix + '.' + object_type
for object_ in objects:
key = bytes(random.randint(0, 255) for _ in range(40))
producer.send(topic, key=key, value=object_)
@@ -44,7 +46,7 @@
config = {
'brokers': 'localhost:%d' % kafka_server[1],
'consumer_id': 'replayer',
- 'prefix': 'swh.journal.objects',
+ 'prefix': kafka_prefix,
}
replayer = StorageReplayer(**config)
nb_inserted = replayer.fill(storage, max_messages=nb_sent)
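
test_storage_play seeds the prefixed topics directly with a KafkaProducer and then replays them into an in-memory storage. The key and topic generation it relies on, restated as a standalone sketch:

import random

def random_key():
    # 40 random bytes, matching the key generation in the test above
    return bytes(random.randint(0, 255) for _ in range(40))

def topic(kafka_prefix, object_type):
    # e.g. 'abcdefghij.swh.journal.objects.origin_visit'
    return kafka_prefix + '.' + object_type
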