diff --git a/.gitignore b/.gitignore
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,6 @@
 dist/
 version.txt
 .tox/
+kafka/
+kafka*.tgz
+kafka*.tar.gz
\ No newline at end of file
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,2 +1,3 @@
 pytest
 swh.model
+pytest-kafka
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -4,7 +4,41 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+# The Kafka installation code below (used for development and tests)
+# comes from pytest-kafka:
+# Copyright 2018 Infectious Media Ltd and pytest-kafka contributors
+#
+# Permission is hereby granted, free of charge, to any person
+# obtaining a copy of this software and associated documentation files
+# (the "Software"), to deal in the Software without restriction,
+# including without limitation the rights to use, copy, modify, merge,
+# publish, distribute, sublicense, and/or sell copies of the Software,
+# and to permit persons to whom the Software is furnished to do so,
+# subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+import contextlib
+import os
+import tarfile
+import shutil
+import subprocess
+import sys
+import urllib.request
+
+from pathlib import Path
 from setuptools import setup, find_packages
+from setuptools.command.develop import develop  # type: ignore
+
 from os import path
 from io import open
@@ -35,6 +69,82 @@
     return requirements
 
 
+# Dependencies and helpers for the tests, copied from pytest-kafka's
+# setup.py [1]
+# [1] https://gitlab.com/karolinepauls/pytest-kafka/blob/master/setup.py
+KAFKA_URL = (
+    'https://www.mirrorservice.org/sites/ftp.apache.org/kafka/1.1.1/'
+    'kafka_2.11-1.1.1.tgz')
+KAFKA_EXPECTED_HASH_SHA256 = '93b6f926b10b3ba826266272e3bd9d0fe8b33046da9a2688c58d403eb0a43430'  # noqa
+KAFKA_TAR = 'kafka.tgz'
+KAFKA_TAR_ROOTDIR = 'kafka_2.11-1.1.1'
+KAFKA_DIR = 'swh/journal/tests/kafka'
+
+
+def set_up_kafka():
+    """Download Kafka from an official mirror and untar it, after
+    checking the tarball against the expected sha256 checksum.
+
+    The downloaded data is not cleaned up, to allow caching. Run
+    `setup.py clean` to remove the test artefacts.
+
+    """
+    if not os.path.exists(KAFKA_TAR):
+        print('*swh-journal-setup* Downloading Kafka', file=sys.stderr)
+        urllib.request.urlretrieve(KAFKA_URL, KAFKA_TAR)
+
+    # Verify the tarball's integrity, for cached copies as well as
+    # fresh downloads
+    import hashlib
+    h = hashlib.sha256()
+    with open(KAFKA_TAR, 'rb') as f:
+        for chunk in iter(lambda: f.read(65536), b''):
+            h.update(chunk)
+    hash_result = h.hexdigest()
+    if hash_result != KAFKA_EXPECTED_HASH_SHA256:
+        raise ValueError(
+            'Checksum mismatch for tarball %s: expected %s, got %s' % (
+                KAFKA_TAR, KAFKA_EXPECTED_HASH_SHA256, hash_result))
+
+    if not os.path.exists(KAFKA_DIR):
+        print('*swh-journal-setup* Unpacking Kafka', file=sys.stderr)
+        with tarfile.open(KAFKA_TAR, 'r') as f:
+            f.extractall()
+
+        print('*swh-journal-setup* Renaming:', KAFKA_TAR_ROOTDIR, '→',
+              KAFKA_DIR, file=sys.stderr)
+        Path(KAFKA_TAR_ROOTDIR).rename(KAFKA_DIR)
+
+
+class InstallExtraDevDependencies(develop):
+    """Install development dependencies and download/set up Kafka.
+
+    """
+    def run(self):
+        """Set up the local development environment fully.
+
+        """
+        print('*swh-journal-setup* Installing dev dependencies',
+              file=sys.stderr)
+        super().run()
+        subprocess.check_call(['pip', 'install', '-U', 'pip'])
+        subprocess.check_call(['pip', 'install', '.[testing]'])
+
+        print('*swh-journal-setup* Setting up Kafka', file=sys.stderr)
+        set_up_kafka()
+        print('*swh-journal-setup* Setting up Kafka done', file=sys.stderr)
+
+
+class CleanupExtraDevDependencies(develop):
+    def run(self):
+        """Clean up whatever `set_up_kafka` may have created.
+
+        """
+        print('*swh-journal-setup* Cleaning up: %s, %s, %s' % (
+            KAFKA_DIR, KAFKA_TAR_ROOTDIR, KAFKA_TAR), file=sys.stderr)
+        shutil.rmtree(KAFKA_DIR, ignore_errors=True)
+        shutil.rmtree(KAFKA_TAR_ROOTDIR, ignore_errors=True)
+        with contextlib.suppress(FileNotFoundError):
+            Path(KAFKA_TAR).unlink()
+        print('*swh-journal-setup* Cleaning up done', file=sys.stderr)
+
+
 setup(
     name='swh.journal',
     description='Software Heritage Journal utilities',
@@ -47,9 +157,15 @@
     scripts=[],
     install_requires=parse_requirements() + parse_requirements('swh'),
     setup_requires=['vcversioner'],
-    extras_require={'testing': parse_requirements('test')},
+    extras_require={
+        'testing': parse_requirements('test')
+    },
     vcversioner={},
     include_package_data=True,
+    cmdclass={
+        'develop': InstallExtraDevDependencies,
+        'clean': CleanupExtraDevDependencies,
+    },
     classifiers=[
         "Programming Language :: Python :: 3",
         "Intended Audience :: Developers",
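The checksum step in `set_up_kafka()` is what protects a cached `kafka.tgz` as well as a fresh download. In isolation it boils down to the following helper (a minimal sketch: the `sha256sum` name and the chunk size are illustrative, not part of the patch):

    import hashlib

    def sha256sum(path, chunk_size=65536):
        """Hex sha256 of a file, hashed in fixed-size chunks so the
        whole tarball is never loaded into memory at once."""
        h = hashlib.sha256()
        with open(path, 'rb') as f:
            for chunk in iter(lambda: f.read(chunk_size), b''):
                h.update(chunk)
        return h.hexdigest()

    # e.g. sha256sum('kafka.tgz') should equal KAFKA_EXPECTED_HASH_SHA256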
diff --git a/swh/journal/tests/test_publisher.py b/swh/journal/tests/test_publisher.py
--- a/swh/journal/tests/test_publisher.py
+++ b/swh/journal/tests/test_publisher.py
@@ -1,13 +1,82 @@
-# Copyright (C) 2018 The Software Heritage developers
+# Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import unittest
 
+from pathlib import Path
+from typing import Tuple
+
 from swh.model.hashutil import hash_to_bytes
 from swh.journal.publisher import JournalPublisher
 from swh.storage.in_memory import Storage
 
+from kafka import KafkaConsumer, KafkaProducer
+
+from subprocess import Popen
+from pytest_kafka import (
+    make_zookeeper_process, make_kafka_server, make_kafka_consumer
+)
+# from swh.journal.serializers import kafka_to_key, key_to_kafka
+
+
+class MockStorage:
+    # Maps each object type to the key holding its identifier
+    type_to_key = {
+        'content': 'sha1',
+        'revision': 'id',
+        'release': 'id',
+    }
+
+    def __init__(self, state):
+        """Initialize the mock storage's state.
+
+        Args:
+            state (dict): keys are the object types (content, revision,
+                release) and values are lists of dicts representing the
+                associated typed objects
+
+        """
+        self.state = {}
+        for type, key in self.type_to_key.items():
+            self.state[type] = {
+                obj[key]: obj for obj in state[type]
+            }
+
+    def get(self, type, objects):
+        """Given an object type and a list of object ids, return the
+        matching objects from the state.
+
+        Args:
+            type (str): expected object type (release, revision, content)
+            objects ([bytes]): list of object ids
+
+        Returns:
+            list of dicts corresponding to the ids provided
+
+        """
+        data = []
+        if type not in self.type_to_key:
+            raise ValueError('Programming error: expected %s, not %s' % (
+                ', '.join(self.type_to_key), type
+            ))
+        object_ids = self.state[type]
+        for _id in objects:
+            c = object_ids.get(_id)
+            if c:
+                data.append(c)
+
+        return data
+
+    def content_get_metadata(self, contents):
+        return self.get('content', contents)
+
+    def revision_get(self, revisions):
+        return self.get('revision', revisions)
+
+    def release_get(self, releases):
+        return self.get('release', releases)
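+
+# For illustration, the mock above would be exercised along these
+# lines (a sketch only; it assumes the CONTENTS, REVISIONS and
+# RELEASES fixtures defined below):
+#
+#   storage = MockStorage({'content': CONTENTS, 'revision': REVISIONS,
+#                          'release': RELEASES})
+#   assert storage.content_get_metadata(
+#       [c['sha1'] for c in CONTENTS]) == CONTENTS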
 
 CONTENTS = [
     {
@@ -102,17 +171,24 @@
 ]
 
 
+TEST_CONFIG = {
+    'brokers': ['localhost'],
+    'temporary_prefix': 'swh.tmp_journal.new',
+    'final_prefix': 'swh.journal.objects',
+    'consumer_id': 'swh.journal.publisher',
+    'publisher_id': 'swh.journal.publisher',
+    'object_types': ['content'],
+    'max_messages': 1,  # will read 1 message, then stop
+}
+
+
 class JournalPublisherTest(JournalPublisher):
+    """A journal publisher that overrides the default configuration
+    parsing setup.
+
+    """
     def parse_config_file(self):
-        return {
-            'brokers': ['localhost'],
-            'temporary_prefix': 'swh.tmp_journal.new',
-            'final_prefix': 'swh.journal.objects',
-            'consumer_id': 'swh.journal.test.publisher',
-            'publisher_id': 'swh.journal.test.publisher',
-            'object_types': ['content'],
-            'max_messages': 3,
-        }
+        return TEST_CONFIG
 
     def _prepare_storage(self, config):
         self.storage = Storage()
@@ -130,6 +206,13 @@
 
         print("publisher.origin-visits", self.origin_visits)
 
+
+class JournalPublisherNoKafkaMockStorageTest(JournalPublisherTest):
+    """A journal publisher with:
+    - no kafka dependency
+    - a mock storage as its storage backend
+
+    """
     def _prepare_journal(self, config):
         """No journal for now
@@ -137,9 +220,12 @@
         pass
 
 
-class TestPublisher(unittest.TestCase):
+class TestPublisherNoKafka(unittest.TestCase):
+    """Tests only the parts that do not require a kafka instance.
+
+    """
     def setUp(self):
-        self.publisher = JournalPublisherTest()
+        self.publisher = JournalPublisherNoKafkaMockStorageTest()
         self.contents = [{b'sha1': c['sha1']} for c in CONTENTS]
         self.revisions = [{b'id': c['id']} for c in REVISIONS]
         self.releases = [{b'id': c['id']} for c in RELEASES]
@@ -215,3 +301,90 @@
         }
 
         self.assertEqual(actual_objects, expected_objects)
+
+
+# Prepare kafka
+
+ROOT = Path(__file__).parent
+KAFKA_SCRIPTS = ROOT / 'kafka/bin/'
+
+KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh')
+ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh')
+
+
+zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN)
+kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc')
+
+TOPIC = 'abc'
+kafka_consumer = make_kafka_consumer(
+    'kafka_server', seek_to_beginning=True, kafka_topics=[TOPIC])
+
+
+def write_to_kafka(kafka_server: Tuple[Popen, int], message: bytes) -> None:
+    """Write a message to kafka_server."""
+    _, kafka_port = kafka_server
+    producer = KafkaProducer(
+        bootstrap_servers='localhost:{}'.format(kafka_port))
+    producer.send(TOPIC, message)
+    producer.flush()
+
+
+def write_and_read(kafka_server: Tuple[Popen, int],
+                   kafka_consumer: KafkaConsumer) -> None:
+    """Write to kafka_server, then consume with kafka_consumer."""
+    message = b'msg'
+    write_to_kafka(kafka_server, message)
+    consumed = list(kafka_consumer)
+    assert len(consumed) == 1
+    assert consumed[0].topic == TOPIC
+    assert consumed[0].value == message
+
+
+def test_message(kafka_server: Tuple[Popen, int],
+                 kafka_consumer: KafkaConsumer):
+    write_and_read(kafka_server, kafka_consumer)
+
+
+# Draft of the eventual publisher round-trip test, kept for reference:
+
+# def setUp(self):
+#     self.publisher = JournalPublisherTest()
+#     self.contents = [{b'sha1': c['sha1']} for c in CONTENTS]
+#     # self.revisions = [{b'id': c['id']} for c in REVISIONS]
+#     # self.releases = [{b'id': c['id']} for c in RELEASES]
+#     # producer and consumer to send and read data from the publisher
+#     self.producer_to_publisher = KafkaProducer(
+#         bootstrap_servers=TEST_CONFIG['brokers'],
+#         key_serializer=key_to_kafka,
+#         value_serializer=key_to_kafka,
+#         acks='all')
+#     self.consumer_from_publisher = KafkaConsumer(
+#         bootstrap_servers=TEST_CONFIG['brokers'],
+#         value_deserializer=kafka_to_key)
+#     self.consumer_from_publisher.subscribe(
+#         topics=['%s.%s' % (TEST_CONFIG['temporary_prefix'], object_type)
+#                 for object_type in TEST_CONFIG['object_types']])
+
+
+# def test_poll(kafka_consumer):
+#     # given (send a message to the publisher)
+#     self.producer_to_publisher.send(
+#         '%s.content' % TEST_CONFIG['temporary_prefix'],
+#         self.contents[0]
+#     )
+#
+#     nb_messages = 1
+#
+#     # when (the publisher polls 1 message and sends 1 reified object)
+#     self.publisher.poll(max_messages=nb_messages)
+#
+#     # then (the client reads the messages from the output topic)
+#     msgs = []
+#     for num, msg in enumerate(self.consumer_from_publisher):
+#         print('#### consumed msg %s: %s ' % (num, msg))
+#         msgs.append(msg)
+#
+#     self.assertEqual(len(msgs), nb_messages)
+#     print('##### msgs: %s' % msgs)
+#     # check the results
+#     expected_topic = 'swh.journal.objects.content'
+#     expected_object = (self.contents[0][b'sha1'], CONTENTS[0])
+#
+#     self.assertEqual(msgs, (expected_topic, expected_object))
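The commented-out draft above sketches that round-trip. Rewritten against the pytest fixtures already defined in this file, it could look roughly as follows (an untested sketch: it assumes the publisher can be pointed at the fixture broker's port via TEST_CONFIG, that `JournalPublisher.poll` behaves as the draft expects, and that `kafka_to_key`/`key_to_kafka` come from the swh.journal.serializers import that is still commented out):

    def test_poll_roundtrip(kafka_server: Tuple[Popen, int]):
        _, port = kafka_server
        brokers = ['localhost:{}'.format(port)]

        # given: one content id sent on the temporary topic
        producer = KafkaProducer(
            bootstrap_servers=brokers,
            key_serializer=key_to_kafka,
            value_serializer=key_to_kafka,
            acks='all')
        producer.send('%s.content' % TEST_CONFIG['temporary_prefix'],
                      {b'sha1': CONTENTS[0]['sha1']})
        producer.flush()

        # when: the publisher reads it and republishes the reified object
        publisher = JournalPublisherTest()
        publisher.poll(max_messages=1)

        # then: the reified content shows up on the final topic
        consumer = KafkaConsumer(
            bootstrap_servers=brokers,
            value_deserializer=kafka_to_key,
            auto_offset_reset='earliest')
        consumer.subscribe(
            topics=['%s.content' % TEST_CONFIG['final_prefix']])
        msg = next(iter(consumer))
        assert msg.topic == 'swh.journal.objects.content'
        assert msg.value == CONTENTS[0]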
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -4,8 +4,10 @@
 [testenv:py3]
 deps =
   .[testing]
+  .[develop]
   pytest-cov
 commands =
+  ./setup.py develop
   pytest --cov=swh --cov-branch {posargs}
 
 [testenv:flake8]
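With these pieces in place, `tox -e py3` should first run `./setup.py develop`, which, through the `InstallExtraDevDependencies` command, downloads, verifies and unpacks Kafka under swh/journal/tests/kafka, and then run pytest, where the pytest-kafka fixtures boot Zookeeper and a Kafka broker on demand. A later `./setup.py clean` removes the downloaded artefacts via `CleanupExtraDevDependencies`.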