diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
index 6f80210..62ef89e 100644
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -1,257 +1,270 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import os
 import pytest
 import logging
+import random
+import string
 
 from kafka import KafkaProducer, KafkaConsumer
 from subprocess import Popen
 from typing import Tuple, Dict
 
 from pathlib import Path
 from pytest_kafka import (
     make_zookeeper_process, make_kafka_server, constants
 )
 
 from swh.journal.publisher import JournalPublisher
 from swh.model.hashutil import hash_to_bytes
 
 from swh.journal.serializers import kafka_to_key, key_to_kafka, kafka_to_value
 
 CONTENTS = [
     {
         'length': 3,
         'sha1': hash_to_bytes(
             '34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
         'sha1_git': b'foo',
         'blake2s256': b'bar',
         'sha256': b'baz',
         'status': 'visible',
     },
 ]
 
 COMMITTER = [
     {
         'id': 1,
         'fullname': b'foo',
     },
     {
         'id': 2,
         'fullname': b'bar',
     }
 ]
 
 REVISIONS = [
     {
         'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'),
         'message': b'hello',
         'date': {
             'timestamp': {
                 'seconds': 1234567891,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'committer': COMMITTER[0],
         'author': COMMITTER[0],
         'committer_date': None,
     },
     {
         'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'),
         'message': b'hello again',
         'date': {
             'timestamp': {
                 'seconds': 1234567892,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'committer': COMMITTER[1],
         'author': COMMITTER[1],
         'committer_date': None,
     },
 ]
 
 RELEASES = [
     {
         'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
         'name': b'v0.0.1',
         'date': {
             'timestamp': {
                 'seconds': 1234567890,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'author': COMMITTER[0],
     },
 ]
 
 ORIGINS = [
     {
         'url': 'https://somewhere.org/den/fox',
         'type': 'git',
     },
     {
         'url': 'https://overtherainbow.org/fox/den',
         'type': 'svn',
     }
 ]
 
 ORIGIN_VISITS = [
     {
         'origin': ORIGINS[0],
         'date': '2013-05-07 04:20:39.369271+00:00',
         'snapshot': None,  # TODO
         'status': 'ongoing',  # TODO
         'metadata': {'foo': 'bar'},
     },
     {
         'origin': ORIGINS[0],
         'date': '2018-11-27 17:20:39+00:00',
         'snapshot': None,  # TODO
         'status': 'ongoing',  # TODO
         'metadata': {'baz': 'qux'},
     }
 ]
 
 # From type to tuple (id, )
 OBJECT_TYPE_KEYS = {
     'content': ('sha1', CONTENTS),
     'revision': ('id', REVISIONS),
     'release': ('id', RELEASES),
     'origin': (None, ORIGINS),
     'origin_visit': (None, ORIGIN_VISITS),
 }
 
-TEST_CONFIG = {
-    'temporary_prefix': 'swh.tmp_journal.new',
-    'final_prefix': 'swh.journal.objects',
-    'consumer_id': 'swh.journal.publisher',
-    'publisher_id': 'swh.journal.publisher',
-    'object_types': OBJECT_TYPE_KEYS.keys(),
-    'max_messages': 1,  # will read 1 message and stops
-    'storage': {'cls': 'memory', 'args': {}}
-}
-
-
 class JournalPublisherTest(JournalPublisher):
     """A journal publisher which override the default configuration
        parsing setup.
""" def _prepare_storage(self, config): super()._prepare_storage(config) self.storage.content_add({'data': b'42', **c} for c in CONTENTS) self.storage.revision_add(REVISIONS) self.storage.release_add(RELEASES) origins = self.storage.origin_add(ORIGINS) origin_visits = [] for i, ov in enumerate(ORIGIN_VISITS): origin_id = origins[i]['id'] ov = self.storage.origin_visit_add(origin_id, ov['date']) origin_visits.append(ov) self.origins = origins self.origin_visits = origin_visits KAFKA_ROOT = os.environ.get('SWH_KAFKA_ROOT') KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + '/kafka' if not os.path.exists(KAFKA_ROOT): msg = ('Development error: %s must exist and target an ' 'existing kafka installation' % KAFKA_ROOT) raise ValueError(msg) KAFKA_SCRIPTS = Path(KAFKA_ROOT) / 'bin' KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh') ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh') # Those defines fixtures -zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN) +zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN, scope='session') os.environ['KAFKA_LOG4J_OPTS'] = \ '-Dlog4j.configuration=file:%s/log4j.properties' % \ os.path.dirname(__file__) -kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc') +kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', scope='session') logger = logging.getLogger('kafka') logger.setLevel(logging.WARN) +@pytest.fixture(scope='function') +def kafka_prefix(): + return ''.join(random.choice(string.ascii_lowercase) for _ in range(10)) + + +TEST_CONFIG = { + 'temporary_prefix': 'swh.tmp_journal.new', + 'final_prefix': 'swh.journal.objects', + 'consumer_id': 'swh.journal.publisher', + 'publisher_id': 'swh.journal.publisher', + 'object_types': OBJECT_TYPE_KEYS.keys(), + 'max_messages': 1, # will read 1 message and stops + 'storage': {'cls': 'memory', 'args': {}}, +} + + @pytest.fixture -def test_config(): +def test_config(kafka_server: Tuple[Popen, int], + kafka_prefix: str): """Test configuration needed for publisher/producer/consumer """ - return TEST_CONFIG + _, port = kafka_server + return { + **TEST_CONFIG, + 'brokers': ['localhost:{}'.format(port)], + 'temporary_prefix': kafka_prefix + '.swh.tmp_journal.new', + 'final_prefix': kafka_prefix + '.swh.journal.objects', + } @pytest.fixture def producer_to_publisher( kafka_server: Tuple[Popen, int], test_config: Dict, ) -> KafkaProducer: # noqa """Producer to send message to the publisher's consumer. """ _, port = kafka_server producer = KafkaProducer( bootstrap_servers='localhost:{}'.format(port), key_serializer=key_to_kafka, value_serializer=key_to_kafka, client_id=test_config['consumer_id'], ) return producer @pytest.fixture def consumer_from_publisher(kafka_server: Tuple[Popen, int], test_config: Dict) -> KafkaConsumer: """Get a connected Kafka consumer. """ - kafka_topics = ['%s.%s' % (test_config['final_prefix'], object_type) - for object_type in test_config['object_types']] + kafka_topics = [ + '%s.%s' % (test_config['final_prefix'], object_type) + for object_type in test_config['object_types']] _, kafka_port = kafka_server consumer = KafkaConsumer( *kafka_topics, bootstrap_servers='localhost:{}'.format(kafka_port), consumer_timeout_ms=constants.DEFAULT_CONSUMER_TIMEOUT_MS, key_deserializer=kafka_to_key, value_deserializer=kafka_to_value, auto_offset_reset='earliest', enable_auto_commit=True, group_id="test-consumer" ) # Enforce auto_offset_reset=earliest even if the consumer was created # too soon wrt the server. 
     while len(consumer.assignment()) == 0:
         consumer.poll(timeout_ms=20)
     consumer.seek_to_beginning()
 
     return consumer
 
 
 @pytest.fixture
 def publisher(kafka_server: Tuple[Popen, int],
               test_config: Dict) -> JournalPublisher:
     """Test Publisher factory. We cannot use a fixture here as we need
        to modify the sample.
 
     """
     # consumer and producer of the publisher needs to discuss with the
     # right instance
-    _, port = kafka_server
-    test_config['brokers'] = ['localhost:{}'.format(port)]
     publisher = JournalPublisherTest(test_config)
     return publisher
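Taken together, the conftest changes split lifetimes: Zookeeper and Kafka now
start once per session (scope='session' on both pytest-kafka factories), while
isolation between tests moves into the function-scoped kafka_prefix fixture,
so each test derives its own temporary and final topic prefixes from a fresh
random string and can no longer see another test's leftover messages on the
shared broker. The consumer fixture's closing poll loop is part of the same
story; a minimal sketch of that rewind pattern for any kafka-python consumer
(the function name here is ours, not part of the fixture):

    from kafka import KafkaConsumer

    def rewind_to_beginning(consumer: KafkaConsumer) -> None:
        # Partition assignment only happens during poll(); spin until the
        # group coordinator has handed this consumer its partitions, then
        # rewind, so auto_offset_reset='earliest' takes effect even when the
        # consumer was created before the topics received any message.
        while len(consumer.assignment()) == 0:
            consumer.poll(timeout_ms=20)
        consumer.seek_to_beginning()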
diff --git a/swh/journal/tests/test_direct_writer.py b/swh/journal/tests/test_direct_writer.py
index 284e72e..0e5cd6e 100644
--- a/swh/journal/tests/test_direct_writer.py
+++ b/swh/journal/tests/test_direct_writer.py
@@ -1,95 +1,99 @@
 # Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from collections import defaultdict
 import time
 
 from kafka import KafkaConsumer
 from subprocess import Popen
 from typing import Tuple
 
 from swh.storage import get_storage
 
 from swh.journal.direct_writer import DirectKafkaWriter
 from swh.journal.serializers import value_to_kafka, kafka_to_value
 
 from .conftest import OBJECT_TYPE_KEYS
 
 
-def assert_written(consumer):
+def assert_written(consumer, kafka_prefix):
     time.sleep(0.1)  # Without this, some messages are missing
 
     consumed_objects = defaultdict(list)
     for msg in consumer:
         consumed_objects[msg.topic].append((msg.key, msg.value))
 
     for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items():
-        topic = 'swh.journal.objects.%s' % object_type
+        topic = kafka_prefix + '.' + object_type
         (keys, values) = zip(*consumed_objects[topic])
         if key_name:
             assert list(keys) == [object_[key_name] for object_ in objects]
         else:
             pass  # TODO
 
         if object_type == 'origin_visit':
             for value in values:
                 del value['visit']
 
         for object_ in objects:
             assert kafka_to_value(value_to_kafka(object_)) in values
 
 
 def test_direct_writer(
+        kafka_prefix: str,
         kafka_server: Tuple[Popen, int],
         consumer_from_publisher: KafkaConsumer):
+    kafka_prefix += '.swh.journal.objects'
     config = {
         'brokers': 'localhost:%d' % kafka_server[1],
         'client_id': 'direct_writer',
-        'prefix': 'swh.journal.objects',
+        'prefix': kafka_prefix,
     }
     writer = DirectKafkaWriter(**config)
 
     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
         for (num, object_) in enumerate(objects):
             if object_type == 'origin_visit':
                 object_ = {**object_, 'visit': num}
             writer.write_addition(object_type, object_)
 
-    assert_written(consumer_from_publisher)
+    assert_written(consumer_from_publisher, kafka_prefix)
 
 
 def test_storage_direct_writer(
+        kafka_prefix: str,
         kafka_server: Tuple[Popen, int],
         consumer_from_publisher: KafkaConsumer):
+    kafka_prefix += '.swh.journal.objects'
     config = {
         'brokers': 'localhost:%d' % kafka_server[1],
         'client_id': 'direct_writer',
-        'prefix': 'swh.journal.objects',
+        'prefix': kafka_prefix,
     }
     storage = get_storage('memory', {'journal_writer': {
         'cls': 'kafka', 'args': config}})
 
     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
         method = getattr(storage, object_type + '_add')
         if object_type in ('content', 'directory', 'revision', 'release',
                            'snapshot', 'origin'):
             if object_type == 'content':
                 objects = [{**obj, 'data': b''} for obj in objects]
             method(objects)
         elif object_type in ('origin_visit',):
             for object_ in objects:
                 object_ = object_.copy()
                 origin_id = storage.origin_add_one(object_.pop('origin'))
                 visit = method(origin=origin_id, date=object_.pop('date'))
                 visit_id = visit['visit']
                 storage.origin_visit_update(origin_id, visit_id, **object_)
         else:
             assert False, object_type
 
-    assert_written(consumer_from_publisher)
+    assert_written(consumer_from_publisher, kafka_prefix)
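assert_written now takes the per-test prefix instead of hard-coding
swh.journal.objects, but its drain-and-group core is unchanged, and it is a
reusable pattern whenever the consumer is built with consumer_timeout_ms (as
consumer_from_publisher is): iteration over the consumer simply stops once the
topics stay quiet for that long. A sketch, with the helper name being ours:

    from collections import defaultdict

    def consumed_by_topic(consumer):
        # Drain every message currently readable; the for loop ends on its
        # own once consumer_timeout_ms elapses without a new message.
        consumed = defaultdict(list)
        for msg in consumer:
            consumed[msg.topic].append((msg.key, msg.value))
        return consumed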
diff --git a/swh/journal/tests/test_publisher_kafka.py b/swh/journal/tests/test_publisher_kafka.py
index 448487f..718b069 100644
--- a/swh/journal/tests/test_publisher_kafka.py
+++ b/swh/journal/tests/test_publisher_kafka.py
@@ -1,91 +1,93 @@
 # Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from kafka import KafkaConsumer, KafkaProducer
 from subprocess import Popen
 from typing import Tuple
 
 from swh.journal.serializers import value_to_kafka, kafka_to_value
 from swh.journal.publisher import JournalPublisher
 
-from .conftest import TEST_CONFIG, OBJECT_TYPE_KEYS
+from .conftest import OBJECT_TYPE_KEYS
 
 
 def assert_publish_ok(publisher: JournalPublisher,
                       consumer_from_publisher: KafkaConsumer,
                       producer_to_publisher: KafkaProducer,
+                      test_config: dict,
                       object_type: str):
     """Assert that publishing object in the publisher is reified and
        published in output topics.
 
     Args:
         publisher (JournalPublisher): publisher to read and write data
         consumer_from_publisher (KafkaConsumer): To read data from the
                                                  publisher
         producer_to_publisher (KafkaProducer): To send data to the publisher
         object_type (str): Object type to look for (e.g content, revision,
                            etc...)
 
     """
     # object type's id label key
     object_key_id, expected_objects = OBJECT_TYPE_KEYS[object_type]
     # objects to send to the publisher
     if object_key_id:
         objects = [{object_key_id: c[object_key_id]}
                    for c in expected_objects]
     else:
         # TODO: add support for origin and origin_visit
         return
 
     # send message to the publisher
     for obj in objects:
         producer_to_publisher.send(
-            '%s.%s' % (TEST_CONFIG['temporary_prefix'], object_type),
+            '%s.%s' % (test_config['temporary_prefix'], object_type),
             obj
         )
 
     nb_messages = len(objects)
 
     for _ in range(nb_messages):
         publisher.poll(max_messages=1)
 
     # then (client reads from the messages from output topic)
-    expected_topic = '%s.%s' % (TEST_CONFIG['final_prefix'], object_type)
+    expected_topic = '%s.%s' % (test_config['final_prefix'], object_type)
     expected_msgs = [
         (
             object_[object_key_id],
             kafka_to_value(value_to_kafka(object_))
         )
         for object_ in expected_objects]
 
     msgs = list(consumer_from_publisher)
     assert all(msg.topic == expected_topic for msg in msgs)
     assert [(msg.key, msg.value) for msg in msgs] == expected_msgs
 
 
 def test_publish(
         publisher: JournalPublisher,
         kafka_server: Tuple[Popen, int],
+        test_config: dict,
         consumer_from_publisher: KafkaConsumer,
         producer_to_publisher: KafkaProducer):
     """
     Reading from and writing to the journal publisher should work (contents)
 
     Args:
         journal_publisher (JournalPublisher): publisher to read and write data
         consumer_from_publisher (KafkaConsumer): To read data from publisher
         producer_to_publisher (KafkaProducer): To send data to publisher
 
     """
     # retrieve the object types we want to test
     object_types = OBJECT_TYPE_KEYS.keys()
     # Now for each object type, we'll send data to the publisher and
     # check that data is indeed fetched and reified in the publisher's
     # output topics
     for object_type in object_types:
         assert_publish_ok(
             publisher, consumer_from_publisher, producer_to_publisher,
-            object_type)
+            test_config, object_type)
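One detail of assert_publish_ok worth noting: the expected values are
round-tripped through the journal serializers before the comparison, so both
sides of the final assertion have been normalized by the same msgpack
encoding. A sketch of that normalization step (the helper name is ours; the
serializer functions are the real ones imported above):

    from swh.journal.serializers import kafka_to_value, value_to_kafka

    def normalize(obj: dict) -> dict:
        # Encode then decode, so the expected object matches what actually
        # traveled over the wire (bytes, nested dicts, dates all get the
        # same treatment on both sides of the assertion).
        return kafka_to_value(value_to_kafka(obj))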
diff --git a/swh/journal/tests/test_publisher_no_kafka.py b/swh/journal/tests/test_publisher_no_kafka.py
index f1792cf..10f23fd 100644
--- a/swh/journal/tests/test_publisher_no_kafka.py
+++ b/swh/journal/tests/test_publisher_no_kafka.py
@@ -1,151 +1,151 @@
 # Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import pytest
 import unittest
 
 from .conftest import (
     JournalPublisherTest, TEST_CONFIG, CONTENTS, REVISIONS, RELEASES, ORIGINS
 )
 from swh.journal.publisher import MANDATORY_KEYS
 
 
 class JournalPublisherNoKafkaInMemoryStorage(JournalPublisherTest):
     """A journal publisher with:
     - no kafka dependency
     - in-memory storage
 
     """
     def check_config(self, config):
         """No need to check the configuration here as we do not use kafka
 
         """
         pass
 
     def _prepare_journal(self, config):
         """No journal for now
 
         """
         pass
 
 
 class TestPublisherNoKafka(unittest.TestCase):
     """This tests only the part not using any kafka instance
 
     """
     def setUp(self):
         self.publisher = JournalPublisherNoKafkaInMemoryStorage(TEST_CONFIG)
         self.contents = [{b'sha1': c['sha1']} for c in CONTENTS]
         self.revisions = [{b'id': c['id']} for c in REVISIONS]
         self.releases = [{b'id': c['id']} for c in RELEASES]
         # those needs id generation from the storage
         # so initialization is different than other entities
         self.origins = [{b'url': o['url'],
                          b'type': o['type']}
                         for o in self.publisher.origins]
         self.origin_visits = [{b'origin': ov['origin'],
                                b'visit': ov['visit']}
                               for ov in self.publisher.origin_visits]
         # full objects
         storage = self.publisher.storage
         ovs = []
         for ov in self.origin_visits:
             _ov = storage.origin_visit_get_by(
                 ov[b'origin'], ov[b'visit'])
             _ov['date'] = str(_ov['date'])
             ovs.append(_ov)
         self.expected_origin_visits = ovs
 
     def test_process_contents(self):
         actual_contents = self.publisher.process_contents(self.contents)
         expected_contents = [(c['sha1'], c) for c in CONTENTS]
         self.assertEqual(actual_contents, expected_contents)
 
     def test_process_revisions(self):
         actual_revisions = self.publisher.process_revisions(self.revisions)
         expected_revisions = [(c['id'], c) for c in REVISIONS]
         self.assertEqual(actual_revisions, expected_revisions)
 
     def test_process_releases(self):
         actual_releases = self.publisher.process_releases(self.releases)
         expected_releases = [(c['id'], c) for c in RELEASES]
         self.assertEqual(actual_releases, expected_releases)
 
     def test_process_origins(self):
         actual_origins = self.publisher.process_origins(self.origins)
         expected_origins = [({'url': o[b'url'], 'type': o[b'type']},
                              {'url': o[b'url'], 'type': o[b'type']})
                             for o in self.origins]
         self.assertEqual(actual_origins, expected_origins)
 
     def test_process_origin_visits(self):
         actual_ovs = self.publisher.process_origin_visits(self.origin_visits)
         expected_ovs = [((ov['origin'], ov['visit']), ov)
                         for ov in self.expected_origin_visits]
         self.assertEqual(actual_ovs, expected_ovs)
 
     def test_process_objects(self):
         messages = {
             'content': self.contents,
             'revision': self.revisions,
             'release': self.releases,
             'origin': self.origins,
             'origin_visit': self.origin_visits,
         }
         actual_objects = self.publisher.process_objects(messages)
 
         expected_contents = [(c['sha1'], c) for c in CONTENTS]
         expected_revisions = [(c['id'], c) for c in REVISIONS]
         expected_releases = [(c['id'], c) for c in RELEASES]
         expected_origins = [(o, o) for o in ORIGINS]
         expected_ovs = [((ov['origin'], ov['visit']), ov)
                         for ov in self.expected_origin_visits]
         expected_objects = {
             'content': expected_contents,
             'revision': expected_revisions,
             'release': expected_releases,
             'origin': expected_origins,
             'origin_visit': expected_ovs,
         }
         self.assertEqual(actual_objects, expected_objects)
 
 
 class JournalPublisherCheckTest(JournalPublisherTest):
     """A journal publisher with:
     - no kafka dependency
     - in-memory storage
 
     """
     def _prepare_journal(self, config):
         """No journal for now
 
         """
         pass
 
 
-def test_check_config_ok():
+def test_check_config_ok(test_config):
     """Instantiate a publisher with the right config is fine
 
     """
-    publisher = JournalPublisherCheckTest(TEST_CONFIG)
+    publisher = JournalPublisherCheckTest(test_config)
     assert publisher is not None
 
 
-def test_check_config_ko():
+def test_check_config_ko(test_config):
     """Instantiate a publisher with the wrong config should raise
 
     """
     for k in MANDATORY_KEYS:
-        conf = TEST_CONFIG.copy()
+        conf = test_config.copy()
         conf.pop(k)
         with pytest.raises(ValueError) as e:
             JournalPublisherCheckTest(conf)
 
         error = ('Configuration error: The following keys must be'
                  ' provided: %s' % (','.join([k]), ))
         assert e.value.args[0] == error
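test_check_config_ko pins down the publisher's validation contract: removing
any key listed in MANDATORY_KEYS must raise a ValueError naming the missing
keys. A standalone sketch of that contract (hypothetical; the real check is
JournalPublisher.check_config, and this version only mirrors the error format
the test asserts):

    def check_mandatory_keys(config: dict, mandatory_keys) -> None:
        # Collect all missing keys so the error can name them together,
        # comma-separated, matching the message asserted in the test above.
        missing = [k for k in mandatory_keys if k not in config]
        if missing:
            raise ValueError(
                'Configuration error: The following keys must be'
                ' provided: %s' % (','.join(missing),))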
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
index 34f965e..03e9872 100644
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -1,80 +1,82 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import random
 from subprocess import Popen
 from typing import Tuple
 
 import dateutil
 from kafka import KafkaProducer
 
 from swh.storage import get_storage
 
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 from swh.journal.replay import StorageReplayer
 
 from .conftest import OBJECT_TYPE_KEYS
 
 
 def test_storage_play(
+        kafka_prefix: str,
         kafka_server: Tuple[Popen, int]):
     (_, port) = kafka_server
+    kafka_prefix += '.swh.journal.objects'
     storage = get_storage('memory', {})
 
     producer = KafkaProducer(
         bootstrap_servers='localhost:{}'.format(port),
         key_serializer=key_to_kafka,
         value_serializer=value_to_kafka,
         client_id='test producer',
     )
 
     # Fill Kafka
     nb_sent = 0
     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
-        topic = 'swh.journal.objects.' + object_type
+        topic = kafka_prefix + '.' + object_type
         for object_ in objects:
             key = bytes(random.randint(0, 255) for _ in range(40))
             producer.send(topic, key=key, value=object_)
             nb_sent += 1
 
     # Fill the storage from Kafka
     config = {
         'brokers': 'localhost:%d' % kafka_server[1],
         'consumer_id': 'replayer',
-        'prefix': 'swh.journal.objects',
+        'prefix': kafka_prefix,
     }
     replayer = StorageReplayer(**config)
     nb_inserted = replayer.fill(storage, max_messages=nb_sent)
     assert nb_sent == nb_inserted
 
     # Check the objects were actually inserted in the storage
     assert OBJECT_TYPE_KEYS['revision'][1] == \
         list(storage.revision_get(
             [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]]))
     assert OBJECT_TYPE_KEYS['release'][1] == \
         list(storage.release_get(
             [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]]))
 
     origins = list(storage.origin_get(
         [orig for orig in OBJECT_TYPE_KEYS['origin'][1]]))
     assert OBJECT_TYPE_KEYS['origin'][1] == \
         [{'url': orig['url'], 'type': orig['type']}
          for orig in origins]
     for origin in origins:
         expected_visits = [
             {
                 **visit,
                 'origin': origin['id'],
                 'date': dateutil.parser.parse(visit['date']),
             }
             for visit in OBJECT_TYPE_KEYS['origin_visit'][1]
             if visit['origin']['url'] == origin['url']
             and visit['origin']['type'] == origin['type']
         ]
         actual_visits = list(storage.origin_visit_get(origin['id']))
         for visit in actual_visits:
             del visit['visit']  # opaque identifier
         assert expected_visits == actual_visits
 
     # TODO: check for content
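test_storage_play exercises the whole replay loop: produce raw journal
messages under the per-test prefix, then have StorageReplayer pump exactly
that many messages into a fresh in-memory storage. A condensed sketch of the
driving sequence under the same assumptions (the broker port comes from the
kafka_server fixture; producer.flush() is added here for determinism outside
the test's own timing):

    import random

    from kafka import KafkaProducer

    from swh.journal.replay import StorageReplayer
    from swh.journal.serializers import key_to_kafka, value_to_kafka
    from swh.storage import get_storage

    def replay_objects(port: int, prefix: str, objects_by_type: dict):
        producer = KafkaProducer(
            bootstrap_servers='localhost:{}'.format(port),
            key_serializer=key_to_kafka,
            value_serializer=value_to_kafka,
            client_id='test producer',
        )
        nb_sent = 0
        for object_type, objects in objects_by_type.items():
            for object_ in objects:
                # Keys are opaque to the replayer, so random 40-byte keys do.
                key = bytes(random.randint(0, 255) for _ in range(40))
                producer.send(prefix + '.' + object_type, key=key,
                              value=object_)
                nb_sent += 1
        producer.flush()

        storage = get_storage('memory', {})
        replayer = StorageReplayer(
            brokers='localhost:%d' % port,
            consumer_id='replayer',
            prefix=prefix,
        )
        # Stop after exactly as many messages as were produced.
        assert replayer.fill(storage, max_messages=nb_sent) == nb_sent
        return storage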