diff --git a/.gitignore b/.gitignore --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,6 @@ dist/ version.txt .tox/ +kafka/ +kafka*.tgz +kafka*.tar.gz \ No newline at end of file diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,2 +1,3 @@ pytest swh.model +pytest-kafka diff --git a/setup.py b/setup.py --- a/setup.py +++ b/setup.py @@ -4,7 +4,41 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +# Regarding the kafka installation development code (test) +# Copyright 2018 Infectious Media Ltd and pytest-kafka contributors +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation files +# (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, +# publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+ +import contextlib +import os +import tarfile +import shutil +import subprocess +import sys +import urllib.request + +from pathlib import Path from setuptools import setup, find_packages +from setuptools.command.develop import develop # type: ignore + from os import path from io import open @@ -35,6 +69,82 @@ return requirements +# Dependencies for the test, copied from pytest-kafka's setup.py [1] +# [1] https://gitlab.com/karolinepauls/pytest-kafka/blob/master/setup.py +KAFKA_URL = ( + 'https://www.mirrorservice.org/sites/ftp.apache.org/kafka/1.1.1/' + 'kafka_2.11-1.1.1.tgz') +KAFKA_EXPECTED_HASH_SHA256 = '93b6f926b10b3ba826266272e3bd9d0fe8b33046da9a2688c58d403eb0a43430' # noqa +KAFKA_TAR = 'kafka.tgz' +KAFKA_TAR_ROOTDIR = 'kafka_2.11-1.1.1' +KAFKA_DIR = 'swh/journal/tests/kafka' + + +def set_up_kafka(): + """Download Kafka from an official mirror and untar it. The tarball + is checked for the right checksum. + + The data are not cleaned up to allow caching. Call specific + `setup.py clean` to clean up the test artefacts. + + """ + if not os.path.exists(KAFKA_TAR): + print('*swh-journal-setup* Downloading Kafka', file=sys.stderr) + urllib.request.urlretrieve(KAFKA_URL, KAFKA_TAR) + import hashlib + h = hashlib.sha256() + with open(KAFKA_TAR, 'rb') as f: + for chunk in f: + h.update(chunk) + hash_result = h.hexdigest() + if hash_result != KAFKA_EXPECTED_HASH_SHA256: + raise ValueError( + "Mismatch tarball %s hash checksum: expected %s, got %s" % ( + KAFKA_TAR, KAFKA_EXPECTED_HASH_SHA256, hash_result, )) + + if not os.path.exists(KAFKA_DIR): + print('*swh-journal-setup* Unpacking Kafka', file=sys.stderr) + with tarfile.open(KAFKA_TAR, 'r') as f: + f.extractall() + + print('*swh-journal-setup* Renaming:', KAFKA_TAR_ROOTDIR, '→', + KAFKA_DIR, file=sys.stderr) + Path(KAFKA_TAR_ROOTDIR).rename(KAFKA_DIR) + + +class InstallExtraDevDependencies(develop): + """Install development dependencies and download/setup Kafka. 
+ + """ + def run(self): + """Set up the local dev environment fully. + + """ + print('*swh-journal-setup* Installing dev dependencies', + file=sys.stderr) + super().run() + subprocess.check_call(['pip', 'install', '-U', 'pip']) + subprocess.check_call(['pip', 'install', '.[testing]']) + + print('*swh-journal-setup* Setting up Kafka', file=sys.stderr) + set_up_kafka() + print('*swh-journal-setup* Setting up Kafka done') + + +class CleanupExtraDevDependencies(develop): + def run(self): + """Clean whatever `set_up_kafka` may create. + + """ + print('*swh-journal-setup* Cleaning up: %s, %s, %s' % ( + KAFKA_DIR, KAFKA_TAR_ROOTDIR, KAFKA_TAR)) + shutil.rmtree(KAFKA_DIR, ignore_errors=True) + shutil.rmtree(KAFKA_TAR_ROOTDIR, ignore_errors=True) + with contextlib.suppress(FileNotFoundError): + Path(KAFKA_TAR).unlink() + print('*swh-journal-setup* Cleaning up done') + + setup( name='swh.journal', description='Software Heritage Journal utilities', @@ -51,9 +161,15 @@ ''', install_requires=parse_requirements() + parse_requirements('swh'), setup_requires=['vcversioner'], - extras_require={'testing': parse_requirements('test')}, + extras_require={ + 'testing': parse_requirements('test') + }, vcversioner={}, include_package_data=True, + cmdclass={ + 'develop': InstallExtraDevDependencies, + 'clean': CleanupExtraDevDependencies, + }, classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py new file mode 100644 --- /dev/null +++ b/swh/journal/tests/conftest.py @@ -0,0 +1,214 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import os +import pytest +import logging + +from kafka import KafkaConsumer, KafkaProducer +from subprocess import Popen +from typing import Tuple + 
import os
import pytest
import logging

from kafka import KafkaConsumer, KafkaProducer
from subprocess import Popen
from typing import Tuple

from pathlib import Path
from pytest_kafka import (
    make_zookeeper_process, make_kafka_server, make_kafka_consumer
)

from swh.journal.publisher import JournalPublisher
from swh.model.hashutil import hash_to_bytes

from swh.journal.serializers import kafka_to_key, key_to_kafka, kafka_to_value


# Shared test configuration; the `publisher` fixture derives a per-test
# copy of it with the actual broker port filled in.
TEST_CONFIG = {
    'brokers': ['localhost'],
    'temporary_prefix': 'swh.tmp_journal.new',
    'final_prefix': 'swh.journal.objects',
    'consumer_id': 'swh.journal.publisher',
    'publisher_id': 'swh.journal.publisher',
    'object_types': ['content'],
    'max_messages': 1,  # will read 1 message and stop
    'storage': {'cls': 'memory', 'args': {}}
}

CONTENTS = [
    {
        'length': 3,
        'sha1': hash_to_bytes(
            '34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
        'sha1_git': b'foo',
        'blake2s256': b'bar',
        'sha256': b'baz',
        'status': 'visible',
    },
]

COMMITTER = [
    {
        'id': 1,
        'fullname': 'foo',
    },
    {
        'id': 2,
        'fullname': 'bar',
    }
]

REVISIONS = [
    {
        'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'),
        'message': b'hello',
        'date': {
            'timestamp': {
                'seconds': 1234567891,
                'microseconds': 0,
            },
            'offset': 120,
            'negative_utc': None,
        },
        'committer': COMMITTER[0],
        'author': COMMITTER[0],
        'committer_date': None,
    },
    {
        'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'),
        'message': b'hello again',
        'date': {
            'timestamp': {
                'seconds': 1234567892,
                'microseconds': 0,
            },
            'offset': 120,
            'negative_utc': None,
        },
        'committer': COMMITTER[1],
        'author': COMMITTER[1],
        'committer_date': None,
    },
]

RELEASES = [
    {
        'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
        'name': b'v0.0.1',
        'date': {
            'timestamp': {
                'seconds': 1234567890,
                'microseconds': 0,
            },
            'offset': 120,
            'negative_utc': None,
        },
        'author': COMMITTER[0],
    },
]

ORIGINS = [
    {
        'url': 'https://somewhere.org/den/fox',
        'type': 'git',
    },
    {
        'url': 'https://overtherainbow.org/fox/den',
        'type': 'svn',
    }
]

ORIGIN_VISITS = [
    {
        'date': '2013-05-07T04:20:39.369271+00:00',
    },
    {
        'date': '2018-11-27T17:20:39.000000+00:00',
    }
]


class JournalPublisherTest(JournalPublisher):
    """A journal publisher which overrides the default configuration
    parsing setup: the backing (in-memory) storage is pre-populated
    with the fixture objects above.

    """
    def _prepare_storage(self, config):
        super()._prepare_storage(config)
        self.storage.content_add({'data': b'42', **c} for c in CONTENTS)
        print('#### all contents: %s' % self.storage._contents)
        self.storage.revision_add(REVISIONS)
        self.storage.release_add(RELEASES)
        origins = self.storage.origin_add(ORIGINS)
        origin_visits = []
        # Pair each origin with its corresponding visit (same index).
        for i, ov in enumerate(ORIGIN_VISITS):
            origin_id = origins[i]['id']
            ov = self.storage.origin_visit_add(origin_id, ov['date'])
            origin_visits.append(ov)
        self.origins = origins
        self.origin_visits = origin_visits

        print("publisher.origin-visits", self.origin_visits)


# Always a Path: os.environ.get returns a *str* when SWH_KAFKA_ROOT is
# set, which would make the `/` joins below raise TypeError.
KAFKA_ROOT = Path(os.environ.get('SWH_KAFKA_ROOT', Path(__file__).parent))
KAFKA_SCRIPTS = KAFKA_ROOT / 'kafka/bin/'

KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh')
ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh')


# These define fixtures
zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN)
kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc')

# logger = logging.getLogger('kafka')
# logger.setLevel(logging.WARN)


@pytest.fixture
def producer_to_publisher(
        request: 'SubRequest',
        kafka_server: Tuple[Popen, int]) -> KafkaProducer:  # noqa
    """Producer to send messages to the publisher's consumer.

    """
    _, port = kafka_server
    # NOTE(review): both key and value are serialized with key_to_kafka
    # while values are read back with kafka_to_value — confirm that
    # key_to_kafka is the intended value serializer.
    producer = KafkaProducer(
        bootstrap_servers='localhost:{}'.format(port),
        key_serializer=key_to_kafka,
        value_serializer=key_to_kafka,
        client_id=TEST_CONFIG['consumer_id'],
    )
    return producer


@pytest.fixture
def consumer_from_publisher(request: 'SubRequest') -> KafkaConsumer:  # noqa
    """Consumer to read messages from the publisher's producer.

    """
    subscribed_topics = [
        '%s.%s' % (TEST_CONFIG['final_prefix'], object_type)
        for object_type in TEST_CONFIG['object_types']
    ]
    print('#### subscribed_topics: %s' % subscribed_topics)
    kafka_consumer = make_kafka_consumer(
        'kafka_server',
        seek_to_beginning=True,
        key_deserializer=kafka_to_key,
        value_deserializer=kafka_to_value,
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        client_id=TEST_CONFIG['publisher_id'],
        kafka_topics=subscribed_topics)  # Callback [..., KafkaConsumer]
    return kafka_consumer(request)


@pytest.fixture
def publisher(
        request: 'SubRequest',
        kafka_server: Tuple[Popen, int]) -> JournalPublisher:
    """A publisher whose consumer and producer talk to the test broker.

    """
    _, port = kafka_server
    # Work on a per-test copy: mutating the module-level TEST_CONFIG
    # would leak one test's broker port into every subsequent test.
    config = dict(TEST_CONFIG, brokers=['localhost:{}'.format(port)])
    return JournalPublisherTest(config)
'34973274ccef6ab4dfaaf86599792fa9c3fe4689'), - 'sha1_git': b'foo', - 'blake2s256': b'bar', - 'sha256': b'baz', - 'status': 'visible', - }, -] - -COMMITTER = [ - { - 'id': 1, - 'fullname': 'foo', - }, - { - 'id': 2, - 'fullname': 'bar', - } -] - -REVISIONS = [ - { - 'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'), - 'message': b'hello', - 'date': { - 'timestamp': { - 'seconds': 1234567891, - 'microseconds': 0, - }, - 'offset': 120, - 'negative_utc': None, - }, - 'committer': COMMITTER[0], - 'author': COMMITTER[0], - 'committer_date': None, - }, - { - 'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'), - 'message': b'hello again', - 'date': { - 'timestamp': { - 'seconds': 1234567892, - 'microseconds': 0, - }, - 'offset': 120, - 'negative_utc': None, - }, - 'committer': COMMITTER[1], - 'author': COMMITTER[1], - 'committer_date': None, - }, -] - -RELEASES = [ - { - 'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), - 'name': b'v0.0.1', - 'date': { - 'timestamp': { - 'seconds': 1234567890, - 'microseconds': 0, - }, - 'offset': 120, - 'negative_utc': None, - }, - 'author': COMMITTER[0], - }, -] - -ORIGINS = [ - { - 'url': 'https://somewhere.org/den/fox', - 'type': 'git', - }, - { - 'url': 'https://overtherainbow.org/fox/den', - 'type': 'svn', - } -] - -ORIGIN_VISITS = [ - { - 'date': '2013-05-07T04:20:39.369271+00:00', - }, - { - 'date': '2018-11-27T17:20:39.000000+00:00', - } -] - -TEST_CONFIG = { - 'brokers': ['localhost'], - 'temporary_prefix': 'swh.tmp_journal.new', - 'final_prefix': 'swh.journal.objects', - 'consumer_id': 'swh.journal.test.publisher', - 'publisher_id': 'swh.journal.test.publisher', - 'object_types': ['content'], - 'max_messages': 3, -} - - -class JournalPublisherTest(JournalPublisher): - def _prepare_storage(self, config): - self.storage = Storage() - self.storage.content_add({'data': b'42', **c} for c in CONTENTS) - self.storage.revision_add(REVISIONS) - self.storage.release_add(RELEASES) - origins = 
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from kafka import KafkaConsumer, KafkaProducer
from swh.journal.publisher import JournalPublisher

from .conftest import (
    TEST_CONFIG, CONTENTS, REVISIONS, RELEASES, ORIGINS
)


def test_publisher(
        publisher: JournalPublisher,
        consumer_from_publisher: KafkaConsumer,
        producer_to_publisher: KafkaProducer):
    """
    Reading from and writing to the journal publisher should work

    Args:
        publisher (JournalPublisher): publisher to read and write data
        consumer_from_publisher (KafkaConsumer): To read data from the
            publisher
        producer_to_publisher (KafkaProducer): To send data to the
            publisher

    """
    contents = [{b'sha1': c['sha1']} for c in CONTENTS]
    # revisions = [{b'id': c['id']} for c in REVISIONS]
    # releases = [{b'id': c['id']} for c in RELEASES]

    # read the output of the publisher
    consumer_from_publisher.subscribe(
        topics=['%s.%s' % (TEST_CONFIG['final_prefix'], object_type)
                for object_type in TEST_CONFIG['object_types']])

    print('#### producer_to_publisher: Sending: %s' % contents[0])
    # send message to the publisher
    producer_to_publisher.send(
        '%s.content' % TEST_CONFIG['temporary_prefix'],
        contents[0]
    )

    nb_messages = len(contents)

    # publisher should poll 1 message and send 1 reified object
    publisher.poll(max_messages=nb_messages)

    # then (client reads from the messages from output topic)
    msgs = []
    for num, msg in enumerate(consumer_from_publisher):
        print('#### consumer_from_publisher: msg %s: %s ' % (num, msg))
        print('#### consumer_from_publisher: msg.value %s: %s ' % (
            num, msg.value))
        msgs.append((msg.topic, msg.key, msg.value))

        expected_topic = '%s.content' % TEST_CONFIG['final_prefix']
        assert expected_topic == msg.topic

        expected_key = contents[num][b'sha1']
        assert expected_key == msg.key

        expected_value = CONTENTS[num]
        # Transformation is needed due to msgpack which encodes keys
        # and values
        value = {}
        for k, v in msg.value.items():
            k = k.decode()
            if k == 'status':
                v = v.decode()
            value[k] = v

        assert expected_value == value

    # Guard against a vacuous pass: if the consumer times out without
    # delivering anything, the loop body (and its assertions) never
    # runs at all — make sure every sent message actually came back.
    assert len(msgs) == nb_messages