diff --git a/swh/storage/listener.py b/swh/storage/listener.py --- a/swh/storage/listener.py +++ b/swh/storage/listener.py @@ -1,4 +1,4 @@ -# Copyright (C) 2016-2017 The Software Heritage developers +# Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -12,6 +12,7 @@ import swh.storage.db from swh.core.config import load_named_config +from swh.model import hashutil CONFIG_BASENAME = 'storage/listener' @@ -23,18 +24,29 @@ } -def decode_sha(value): - """Decode the textual representation of a SHA hash""" - if isinstance(value, str): - return bytes.fromhex(value) - return value +def decode(object_type, obj): + """Decode the JSON payload of a notification about an object_type. + Values of 'origin' and 'origin_visit' payloads are kept as is; other + types carry hex hashes (cf. `/swh/storage/sql/70-swh-triggers.sql`). + Args: + object_type (str): Type of the notified object (e.g. 'content') + obj (str): JSON-encoded dict whose values might be hex-encoded + identifiers.
-def decode_json(value): - """Decode a JSON value containing hashes and other types""" - value = json.loads(value) + Returns: + dict representation ready for journal serialization - return {k: decode_sha(v) for k, v in value.items()} + """ + value = json.loads(obj) + + if object_type in ('origin', 'origin_visit'): + result = value + else: + result = {} + for k, v in value.items(): + result[k] = hashutil.hash_to_bytes(v) if isinstance(v, str) else v + return result OBJECT_TYPES = { @@ -54,20 +66,21 @@ with db.transaction() as cur: for object_type in OBJECT_TYPES: db.register_listener('new_%s' % object_type, cur) + logging.debug('Registered to notify events %s', object_type) def dispatch_notify(topic_prefix, producer, notify): """Dispatch a notification to the proper topic""" + logging.debug('topic_prefix: %s, producer: %s, notify: %s', + topic_prefix, producer, notify) channel = notify.channel if not channel.startswith('new_') or channel[4:] not in OBJECT_TYPES: logging.warn("Got unexpected notify %s" % notify) return object_type = channel[4:] - topic = '%s.%s' % (topic_prefix, object_type) - data = decode_json(notify.payload) - producer.send(topic, value=data) + producer.send(topic, value=decode(object_type, notify.payload)) def run_from_config(config): @@ -96,6 +109,7 @@ try: while True: for notify in db.listen_notifies(poll_timeout): + logging.debug('Notified by event %s', notify) dispatch_notify(topic_prefix, producer, notify) producer.flush() except Exception: @@ -104,9 +118,20 @@ if __name__ == '__main__': - logging.basicConfig( - level=logging.INFO, - format='%(asctime)s %(process)d %(levelname)s %(message)s' - ) - config = load_named_config(CONFIG_BASENAME, DEFAULT_CONFIG) - run_from_config(config) + import click + + @click.command() + @click.option('--verbose', is_flag=True, default=False, + help='Be verbose if asked.') + def main(verbose): + logging.basicConfig( + level=logging.DEBUG if verbose else logging.INFO, + format='%(asctime)s %(process)d %(levelname)s %(message)s' + )
_log = logging.getLogger('kafka') + _log.setLevel(logging.INFO) + + config = load_named_config(CONFIG_BASENAME, DEFAULT_CONFIG) + run_from_config(config) + + main() diff --git a/swh/storage/tests/test_listener.py b/swh/storage/tests/test_listener.py new file mode 100644 --- /dev/null +++ b/swh/storage/tests/test_listener.py @@ -0,0 +1,46 @@ +# Copyright (C) 2018 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import json +import unittest + +from swh.storage.listener import decode + + +class ListenerUtils(unittest.TestCase): + def test_decode(self): + inputs = [ + ('content', json.dumps({ + 'sha1': '34973274ccef6ab4dfaaf86599792fa9c3fe4689', + })), + ('origin', json.dumps({ + 'url': 'https://some/origin', + 'type': 'svn', + })), + ('origin_visit', json.dumps({ + 'visit': 2, + 'origin': { + 'url': 'https://some/origin', + 'type': 'hg', + } + })) + ] + + expected_inputs = [{ + 'sha1': bytes.fromhex('34973274ccef6ab4dfaaf86599792fa9c3fe4689'), + }, { + 'url': 'https://some/origin', + 'type': 'svn', + }, { + 'visit': 2, + 'origin': { + 'url': 'https://some/origin', + 'type': 'hg', + }, + }] + + for (object_type, obj), expected in zip(inputs, expected_inputs): + with self.subTest(object_type=object_type): + self.assertEqual(decode(object_type, obj), expected) diff --git a/tox.ini b/tox.ini --- a/tox.ini +++ b/tox.ini @@ -4,6 +4,7 @@ [testenv:py3] deps = .[testing] + .[listener] pytest-cov pifpaf commands = @@ -12,6 +13,7 @@ [testenv:py3-slow] deps = .[testing] + .[listener] pytest-cov pifpaf commands =