Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/conftest.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import os | import os | ||||
import pytest | import pytest | ||||
import logging | import logging | ||||
import random | import random | ||||
import string | import string | ||||
from kafka import KafkaProducer, KafkaConsumer | from kafka import KafkaConsumer | ||||
from subprocess import Popen | from subprocess import Popen | ||||
from typing import Tuple, Dict | from typing import Tuple, Dict | ||||
from pathlib import Path | from pathlib import Path | ||||
from pytest_kafka import ( | from pytest_kafka import ( | ||||
make_zookeeper_process, make_kafka_server, constants | make_zookeeper_process, make_kafka_server, constants | ||||
) | ) | ||||
from swh.journal.publisher import JournalPublisher | |||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.journal.serializers import kafka_to_key, key_to_kafka, kafka_to_value | from swh.journal.serializers import kafka_to_key, kafka_to_value | ||||
CONTENTS = [ | CONTENTS = [ | ||||
{ | { | ||||
'length': 3, | 'length': 3, | ||||
'sha1': hash_to_bytes( | 'sha1': hash_to_bytes( | ||||
'34973274ccef6ab4dfaaf86599792fa9c3fe4689'), | '34973274ccef6ab4dfaaf86599792fa9c3fe4689'), | ||||
'sha1_git': b'foo', | 'sha1_git': b'foo', | ||||
▲ Show 20 Lines • Show All 96 Lines • ▼ Show 20 Lines | OBJECT_TYPE_KEYS = { | ||||
'content': ('sha1', CONTENTS), | 'content': ('sha1', CONTENTS), | ||||
'revision': ('id', REVISIONS), | 'revision': ('id', REVISIONS), | ||||
'release': ('id', RELEASES), | 'release': ('id', RELEASES), | ||||
'origin': (None, ORIGINS), | 'origin': (None, ORIGINS), | ||||
'origin_visit': (None, ORIGIN_VISITS), | 'origin_visit': (None, ORIGIN_VISITS), | ||||
} | } | ||||
class JournalPublisherTest(JournalPublisher):
    """Journal publisher whose storage is pre-loaded with the sample
    objects defined in this module.

    """
    def _prepare_storage(self, config):
        """Prepare the storage like the parent class does, then fill it
        with the module-level samples.

        Contents, revisions and releases are added as-is (each content
        additionally gets ``b'42'`` as its 'data', unless the sample
        already carries a 'data' key). Origins are added next, and one
        visit is recorded per entry of ORIGIN_VISITS, attached to the
        origin found at the same index in the result of ``origin_add``.

        The created origins and visits are kept on the instance as
        ``self.origins`` and ``self.origin_visits``.

        Args:
            config: configuration forwarded untouched to the parent
                implementation.

        """
        super()._prepare_storage(config)
        storage = self.storage

        contents_with_data = ({'data': b'42', **c} for c in CONTENTS)
        storage.content_add(contents_with_data)
        storage.revision_add(REVISIONS)
        storage.release_add(RELEASES)

        added_origins = storage.origin_add(ORIGINS)
        # ORIGIN_VISITS[i] is registered against the i-th added origin
        added_visits = [
            storage.origin_visit_add(added_origins[idx]['id'], visit['date'])
            for idx, visit in enumerate(ORIGIN_VISITS)
        ]

        self.origins = added_origins
        self.origin_visits = added_visits
# Root of the kafka installation used by the tests: a non-empty
# SWH_KAFKA_ROOT environment variable wins, otherwise fall back to the
# 'kafka' directory sitting next to this file.
KAFKA_ROOT = (
    os.environ.get('SWH_KAFKA_ROOT')
    or os.path.dirname(__file__) + '/kafka'
)

# Fail at import time when no installation can be found at that path.
if not os.path.exists(KAFKA_ROOT):
    raise ValueError(
        'Development error: %s must exist and target an '
        'existing kafka installation' % KAFKA_ROOT)

KAFKA_SCRIPTS = Path(KAFKA_ROOT) / 'bin'
Show All 40 Lines | return { | ||||
**TEST_CONFIG, | **TEST_CONFIG, | ||||
'brokers': ['localhost:{}'.format(port)], | 'brokers': ['localhost:{}'.format(port)], | ||||
'temporary_prefix': kafka_prefix + '.swh.tmp_journal.new', | 'temporary_prefix': kafka_prefix + '.swh.tmp_journal.new', | ||||
'final_prefix': kafka_prefix + '.swh.journal.objects', | 'final_prefix': kafka_prefix + '.swh.journal.objects', | ||||
} | } | ||||
@pytest.fixture | @pytest.fixture | ||||
def producer_to_publisher( | def consumer( | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], test_config: Dict) -> KafkaConsumer: | ||||
test_config: Dict, | |||||
) -> KafkaProducer: # noqa | |||||
"""Producer to send message to the publisher's consumer. | |||||
""" | |||||
_, port = kafka_server | |||||
producer = KafkaProducer( | |||||
bootstrap_servers='localhost:{}'.format(port), | |||||
key_serializer=key_to_kafka, | |||||
value_serializer=key_to_kafka, | |||||
client_id=test_config['consumer_id'], | |||||
) | |||||
return producer | |||||
@pytest.fixture | |||||
def consumer_from_publisher(kafka_server: Tuple[Popen, int], | |||||
test_config: Dict) -> KafkaConsumer: | |||||
"""Get a connected Kafka consumer. | """Get a connected Kafka consumer. | ||||
""" | """ | ||||
kafka_topics = [ | kafka_topics = [ | ||||
'%s.%s' % (test_config['final_prefix'], object_type) | '%s.%s' % (test_config['final_prefix'], object_type) | ||||
for object_type in test_config['object_types']] | for object_type in test_config['object_types']] | ||||
_, kafka_port = kafka_server | _, kafka_port = kafka_server | ||||
consumer = KafkaConsumer( | consumer = KafkaConsumer( | ||||
Show All 9 Lines | def consumer( | ||||
# Enforce auto_offset_reset=earliest even if the consumer was created | # Enforce auto_offset_reset=earliest even if the consumer was created | ||||
# too soon wrt the server. | # too soon wrt the server. | ||||
while len(consumer.assignment()) == 0: | while len(consumer.assignment()) == 0: | ||||
consumer.poll(timeout_ms=20) | consumer.poll(timeout_ms=20) | ||||
consumer.seek_to_beginning() | consumer.seek_to_beginning() | ||||
return consumer | return consumer | ||||
@pytest.fixture
def publisher(kafka_server: Tuple[Popen, int],
              test_config: Dict) -> JournalPublisher:
    """Fixture building the test publisher (a JournalPublisherTest).

    The publisher is instantiated from ``test_config`` so that its
    consumer and producer talk to the right kafka instance.

    ``kafka_server`` is not read in the body; it is presumably requested
    so that the server fixture is set up before the publisher is
    created (to be confirmed).

    """
    return JournalPublisherTest(test_config)