diff --git a/setup.py b/setup.py
index bf702a2a..7c6f2eb8 100755
--- a/setup.py
+++ b/setup.py
@@ -1,75 +1,74 @@
 #!/usr/bin/env python3
 # Copyright (C) 2015-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from setuptools import setup, find_packages
 
 from os import path
 from io import open
 
 here = path.abspath(path.dirname(__file__))
 
 # Get the long description from the README file
 with open(path.join(here, 'README.md'), encoding='utf-8') as f:
     long_description = f.read()
 
 
 def parse_requirements(name=None):
     if name:
         reqf = 'requirements-%s.txt' % name
     else:
         reqf = 'requirements.txt'
 
     requirements = []
     if not path.exists(reqf):
         return requirements
 
     with open(reqf) as f:
         for line in f.readlines():
             line = line.strip()
             if not line or line.startswith('#'):
                 continue
             requirements.append(line)
     return requirements
 
 
 setup(
     name='swh.storage',
     description='Software Heritage storage manager',
     long_description=long_description,
     long_description_content_type='text/markdown',
     author='Software Heritage developers',
     author_email='swh-devel@inria.fr',
     url='https://forge.softwareheritage.org/diffusion/DSTO/',
     packages=find_packages(),
     scripts=[
         'bin/swh-storage-add-dir',
     ],
     entry_points='''
         [console_scripts]
         swh-storage=swh.storage.cli:main
     ''',
     setup_requires=['vcversioner'],
     install_requires=parse_requirements() + parse_requirements('swh'),
     extras_require={
         'testing': parse_requirements('test'),
         'schemata': ['SQLAlchemy'],
-        'listener': ['kafka_python'],
     },
     vcversioner={},
     include_package_data=True,
     classifiers=[
         "Programming Language :: Python :: 3",
         "Intended Audience :: Developers",
         "License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
         "Operating System :: OS Independent",
         "Development Status :: 5 - Production/Stable",
     ],
     project_urls={
         'Bug Reports': 'https://forge.softwareheritage.org/maniphest',
         'Funding': 'https://www.softwareheritage.org/donate',
         'Source': 'https://forge.softwareheritage.org/source/swh-storage',
     },
 )
diff --git a/swh/storage/listener.py b/swh/storage/listener.py
deleted file mode 100644
index 4d0f844e..00000000
--- a/swh/storage/listener.py
+++ /dev/null
@@ -1,142 +0,0 @@
-# Copyright (C) 2016-2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import json
-import logging
-
-import kafka
-import msgpack
-
-import swh.storage.db
-
-from swh.core.config import load_named_config
-from swh.model import hashutil
-
-
-CONFIG_BASENAME = 'storage/listener'
-DEFAULT_CONFIG = {
-    'database': ('str', 'service=softwareheritage'),
-    'brokers': ('list[str]', ['getty.internal.softwareheritage.org']),
-    'topic_prefix': ('str', 'swh.tmp_journal.new'),
-    'poll_timeout': ('int', 10),
-}
-
-
-def decode(object_type, obj):
-    """Decode a JSON obj of nature object_type. Depending on the nature of
-    the object, this can contain hex hashes
-    (cf. `/swh/storage/sql/70-swh-triggers.sql`).
-
-    Args:
-        object_type (str): Nature of the object
-        obj (str): json dict representation whose values might be hex
-          identifier.
-
-    Returns:
-        dict representation ready for journal serialization
-
-    """
-    value = json.loads(obj)
-
-    if object_type in ('origin', 'origin_visit'):
-        result = value
-    else:
-        result = {}
-        for k, v in value.items():
-            result[k] = hashutil.hash_to_bytes(v)
-    return result
-
-
-OBJECT_TYPES = {
-    'content',
-    'skipped_content',
-    'directory',
-    'revision',
-    'release',
-    'snapshot',
-    'origin_visit',
-    'origin',
-}
-
-
-def register_all_notifies(db):
-    """Register to notifications for all object types listed in OBJECT_TYPES"""
-    with db.transaction() as cur:
-        for object_type in OBJECT_TYPES:
-            db.register_listener('new_%s' % object_type, cur)
-            logging.debug(
-                'Registered to events for object type %s', object_type)
-
-
-def dispatch_notify(topic_prefix, producer, notify):
-    """Dispatch a notification to the proper topic"""
-    logging.debug('topic_prefix: %s, producer: %s, notify: %s' % (
-        topic_prefix, producer, notify))
-    channel = notify.channel
-    if not channel.startswith('new_') or channel[4:] not in OBJECT_TYPES:
-        logging.warn("Got unexpected notify %s" % notify)
-        return
-
-    object_type = channel[4:]
-    topic = '%s.%s' % (topic_prefix, object_type)
-    producer.send(topic, value=decode(object_type, notify.payload))
-
-
-def run_once(db, producer, topic_prefix, poll_timeout):
-    for notify in db.listen_notifies(poll_timeout):
-        logging.debug('Notified by event %s' % notify)
-        dispatch_notify(topic_prefix, producer, notify)
-    producer.flush()
-
-
-def run_from_config(config):
-    """Run the Software Heritage listener from configuration"""
-    db = swh.storage.db.Db.connect(config['database'])
-
-    def key_to_kafka(key):
-        """Serialize a key, possibly a dict, in a predictable way.
-
-        Duplicated from swh.journal to avoid a cyclic dependency."""
-        p = msgpack.Packer(use_bin_type=True)
-        if isinstance(key, dict):
-            return p.pack_map_pairs(sorted(key.items()))
-        else:
-            return p.pack(key)
-
-    producer = kafka.KafkaProducer(
-        bootstrap_servers=config['brokers'],
-        value_serializer=key_to_kafka,
-    )
-
-    register_all_notifies(db)
-
-    topic_prefix = config['topic_prefix']
-    poll_timeout = config['poll_timeout']
-    try:
-        while True:
-            run_once(db, producer, topic_prefix, poll_timeout)
-    except Exception:
-        logging.exception("Caught exception")
-        producer.flush()
-
-
-if __name__ == '__main__':
-    import click
-
-    @click.command()
-    @click.option('--verbose', is_flag=True, default=False,
-                  help='Be verbose if asked.')
-    def main(verbose):
-        logging.basicConfig(
-            level=logging.DEBUG if verbose else logging.INFO,
-            format='%(asctime)s %(process)d %(levelname)s %(message)s'
-        )
-        _log = logging.getLogger('kafka')
-        _log.setLevel(logging.INFO)
-
-        config = load_named_config(CONFIG_BASENAME, DEFAULT_CONFIG)
-        run_from_config(config)
-
-    main()
diff --git a/swh/storage/tests/test_listener.py b/swh/storage/tests/test_listener.py
deleted file mode 100644
index 8496a015..00000000
--- a/swh/storage/tests/test_listener.py
+++ /dev/null
@@ -1,104 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import json
-import os
-import unittest
-import unittest.mock
-
-import pytest
-
-from swh.core.tests.db_testing import SingleDbTestFixture
-from swh.storage.tests.storage_testing import StorageTestFixture
-from swh.storage.tests.test_storage import TestStorageData
-import swh.storage.listener as listener
-from swh.storage.db import Db
-from . import SQL_DIR
-
-
-@pytest.mark.db
-class ListenerTest(StorageTestFixture, SingleDbTestFixture,
-                   TestStorageData, unittest.TestCase):
-    TEST_DB_NAME = 'softwareheritage-test-storage'
-    TEST_DB_DUMP = os.path.join(SQL_DIR, '*.sql')
-
-    def setUp(self):
-        super().setUp()
-        self.db = Db(self.conn)
-
-    def tearDown(self):
-        self.db.conn.close()
-        super().tearDown()
-
-    def test_notify(self):
-        class MockProducer:
-            def send(self, topic, value):
-                sent.append((topic, value))
-
-            def flush(self):
-                pass
-
-        listener.register_all_notifies(self.db)
-
-        # Add an origin and an origin visit
-        origin_id = self.storage.origin_add_one(self.origin)
-        visit = self.storage.origin_visit_add(origin_id, date=self.date_visit1)
-        visit_id = visit['visit']
-
-        sent = []
-        listener.run_once(self.db, MockProducer(), 'swh.tmp_journal.new', 10)
-        self.assertEqual(sent, [
-            ('swh.tmp_journal.new.origin',
-             {'type': 'git', 'url': 'file:///dev/null'}),
-            ('swh.tmp_journal.new.origin_visit',
-             {'origin': 1, 'visit': 1}),
-        ])
-
-        # Update the status of the origin visit
-        self.storage.origin_visit_update(origin_id, visit_id, status='full')
-
-        sent = []
-        listener.run_once(self.db, MockProducer(), 'swh.tmp_journal.new', 10)
-        self.assertEqual(sent, [
-            ('swh.tmp_journal.new.origin_visit',
-             {'origin': 1, 'visit': 1}),
-        ])
-
-
-class ListenerUtils(unittest.TestCase):
-    def test_decode(self):
-        inputs = [
-            ('content', json.dumps({
-                'sha1': '34973274ccef6ab4dfaaf86599792fa9c3fe4689',
-            })),
-            ('origin', json.dumps({
-                'url': 'https://some/origin',
-                'type': 'svn',
-            })),
-            ('origin_visit', json.dumps({
-                'visit': 2,
-                'origin': {
-                    'url': 'https://some/origin',
-                    'type': 'hg',
-                }
-            }))
-        ]
-
-        expected_inputs = [{
-            'sha1': bytes.fromhex('34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
-        }, {
-            'url': 'https://some/origin',
-            'type': 'svn',
-        }, {
-            'visit': 2,
-            'origin': {
-                'url': 'https://some/origin',
-                'type': 'hg'
-            },
-        }]
-
-        for i, (object_type, obj) in enumerate(inputs):
-            actual_value = listener.decode(object_type, obj)
-            self.assertEqual(actual_value, expected_inputs[i])