# Patch summary -- three files are touched:
#   * debian/control: the python3-swh.storage.listener package gains a
#     python3-swh.journal dependency, listed ahead of python3-kafka.
#   * requirements.txt: "kafka" is appended to the requirement list.
#   * swh/storage/listener.py: the copyright range becomes 2016-2017; the
#     msgpack import and the local kafka_serializer() helper are removed;
#     the load_named_config import moves below "import swh.storage.db"; and
#     KafkaProducer's value_serializer is now key_to_kafka, imported from
#     swh.journal.serializers.
# NOTE(review): requirements.txt gains "kafka" but no swh.journal entry, while
#   debian/control does add python3-swh.journal -- confirm the Python-level
#   dependency on swh.journal is declared somewhere outside this patch.
# NOTE(review): the imported module is presumably the one distributed on PyPI
#   as "kafka-python"; verify that the bare "kafka" requirement resolves to
#   the intended distribution.
# NOTE(review): a helper named key_to_kafka is used to serialize message
#   *values*; confirm its output is what consumers of these topics expect
#   (the removed helper was msgpack.dumps(data, use_bin_type=True)).
# NOTE(review): msgpack is no longer imported directly by listener.py;
#   presumably it is now pulled in through swh.journal -- TODO confirm.
diff --git a/debian/control b/debian/control index 207b2e37..17e98e70 100644 --- a/debian/control +++ b/debian/control @@ -1,55 +1,56 @@ Source: swh-storage Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python, python3-all, python3-click, python3-dateutil, python3-flask, python3-nose, python3-psycopg2, python3-requests, python3-setuptools, python3-swh.core (>= 0.0.28~), python3-swh.model (>= 0.0.14~), python3-swh.objstorage (>= 0.0.17~), python3-swh.scheduler (>= 0.0.11~), python3-vcversioner Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/diffusion/DSTO/ Package: python3-swh.storage Architecture: all Depends: python3-swh.core (>= 0.0.28~), python3-swh.model (>= 0.0.14~), python3-swh.objstorage (>= 0.0.17~), ${misc:Depends}, ${python3:Depends} Description: Software Heritage storage utilities Package: python3-swh.storage.listener Architecture: all -Depends: python3-kafka (>= 1.3.1~), +Depends: python3-swh.journal, + python3-kafka (>= 1.3.1~), python3-swh.storage (= ${binary:Version}), ${misc:Depends}, ${python3:Depends} Description: Software Heritage storage listener Package: python3-swh.storage.archiver Architecture: all Depends: python3-swh.scheduler (>= 0.0.11~), python3-swh.journal, python3-swh.storage (= ${binary:Version}), ${misc:Depends}, ${python3:Depends} Description: Software Heritage storage Archiver Package: python3-swh.storage.provenance Architecture: all Depends: python3-swh.scheduler (>= 0.0.11~), python3-swh.storage (= ${binary:Version}), ${misc:Depends}, ${python3:Depends} Description: Software Heritage storage Provenance diff --git a/requirements.txt b/requirements.txt index 48f0a0be..a7fbb411 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ click flask psycopg2 python-dateutil python-fastimport vcversioner +kafka diff --git a/swh/storage/listener.py b/swh/storage/listener.py index 7a41e13f..4d0b605f 100644 --- a/swh/storage/listener.py +++ 
b/swh/storage/listener.py @@ -1,102 +1,100 @@ -# Copyright (C) 2016 The Software Heritage developers +# Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import logging - import kafka -import msgpack -from swh.core.config import load_named_config import swh.storage.db +from swh.core.config import load_named_config +from swh.journal.serializers import key_to_kafka + + CONFIG_BASENAME = 'storage/listener' DEFAULT_CONFIG = { 'database': ('str', 'service=softwareheritage'), 'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), 'topic_prefix': ('str', 'swh.tmp_journal.new'), 'poll_timeout': ('int', 10), } def decode_sha(value): """Decode the textual representation of a SHA hash""" if isinstance(value, str): return bytes.fromhex(value) return value def decode_json(value): """Decode a JSON value containing hashes and other types""" value = json.loads(value) return {k: decode_sha(v) for k, v in value.items()} OBJECT_TYPES = { 'content', 'skipped_content', 'directory', 'revision', 'release', 'origin_visit', 'origin', } def register_all_notifies(db): """Register to notifications for all object types listed in OBJECT_TYPES""" with db.transaction() as cur: for object_type in OBJECT_TYPES: db.register_listener('new_%s' % object_type, cur) def dispatch_notify(topic_prefix, producer, notify): """Dispatch a notification to the proper topic""" channel = notify.channel if not channel.startswith('new_') or channel[4:] not in OBJECT_TYPES: logging.warn("Got unexpected notify %s" % notify) return object_type = channel[4:] topic = '%s.%s' % (topic_prefix, object_type) data = decode_json(notify.payload) producer.send(topic, value=data) def run_from_config(config): """Run the Software Heritage listener from configuration""" db = 
swh.storage.db.Db.connect(config['database']) - def kafka_serializer(data): - return msgpack.dumps(data, use_bin_type=True) - producer = kafka.KafkaProducer( bootstrap_servers=config['brokers'], - value_serializer=kafka_serializer, + value_serializer=key_to_kafka, ) register_all_notifies(db) topic_prefix = config['topic_prefix'] poll_timeout = config['poll_timeout'] try: while True: for notify in db.listen_notifies(poll_timeout): dispatch_notify(topic_prefix, producer, notify) producer.flush() except Exception: logging.exception("Caught exception") producer.flush() if __name__ == '__main__': logging.basicConfig( level=logging.INFO, format='%(asctime)s %(process)d %(levelname)s %(message)s' ) config = load_named_config(CONFIG_BASENAME, DEFAULT_CONFIG) run_from_config(config)