Changeset View
Standalone View
swh/storage/listener.py
# Copyright (C) 2016-2017 The Software Heritage developers | # Copyright (C) 2016-2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import json | import json | ||||
import logging | import logging | ||||
import kafka | import kafka | ||||
import msgpack | import msgpack | ||||
import swh.storage.db | import swh.storage.db | ||||
from swh.core.config import load_named_config | from swh.core.config import load_named_config | ||||
CONFIG_BASENAME = 'storage/listener' | CONFIG_BASENAME = 'storage/listener' | ||||
DEFAULT_CONFIG = { | DEFAULT_CONFIG = { | ||||
'database': ('str', 'service=softwareheritage'), | 'database': ('str', 'service=softwareheritage'), | ||||
'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), | 'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), | ||||
'topic_prefix': ('str', 'swh.tmp_journal.new'), | 'topic_prefix': ('str', 'swh.tmp_journal.new'), | ||||
'poll_timeout': ('int', 10), | 'poll_timeout': ('int', 10), | ||||
} | } | ||||
def decode_sha(value): | def decode_simple(value): | ||||
"""Decode the textual representation of a SHA hash""" | """Decode simple values (no hex identifiers in their midst) | ||||
if isinstance(value, str): | |||||
return bytes.fromhex(value) | |||||
return value | |||||
Args: | |||||
value (str): json representation | |||||
def decode_json(value): | Returns: | ||||
"""Decode a JSON value containing hashes and other types""" | dict representation | ||||
value = json.loads(value) | |||||
return {k: decode_sha(v) for k, v in value.items()} | """ | ||||
return json.loads(value) | |||||
douardda: Why keep this function? It adds IMHO useless complexity here. | |||||
ardumontAuthorUnsubmitted Done Inline ActionsYes, it was initially "more" than just json.loads. Oh, you mean systematically call decode_with_identifier on everything... ardumont: Yes, it was initially "more" than just `json.loads`.
Although, with this i don't repeat that… | |||||
Done Inline Actionsbut that's not possible... Something seems off here. ardumont: but that's not possible...
otherwise, that would not be json serializable?!
Something seems… | |||||
Done Inline Actionsok, i think i messed up. So yes, the complexity is needed in the end! ardumont: ok, i think i messed up.
It's the other way around, it takes the ids as hex and turn them into… | |||||
Not Done Inline ActionsI don't understand this. First, you probably meant if isinstance(v, bytes): , because value is always a dict. But v is produced by json.loads, so it's impossible for it to be bytes. vlorentz: I don't understand this. First, you probably meant `if isinstance(v, bytes): `, because `value`… | |||||
def decode_with_identifier(value): | |||||
"""Decode a JSON value containing hashes and other types | |||||
Args: | |||||
Not Done Inline ActionsUse hashutil.hash_from_hex instead of bytes.fromhex. vlorentz: Use `hashutil.hash_from_hex` instead of `bytes.fromhex`. | |||||
Done Inline ActionsYeah, i don't know why we used that instead of our common hashutil. ardumont: Yeah, i don't know why we used that instead of our common hashutil. | |||||
value (str): json dict representation whose value might be hex | |||||
identifier. | |||||
Returns: | |||||
dict representation whose identifier is now an hexadecimal | |||||
string. | |||||
""" | |||||
value = decode_simple(value) | |||||
m = {} | |||||
for k, v in value.items(): | |||||
if isinstance(value, bytes): | |||||
v = bytes.fromhex(v) | |||||
m[k] = v | |||||
return m | |||||
OBJECT_TYPES = { | OBJECT_TYPES = { | ||||
'content', | 'content', | ||||
'skipped_content', | 'skipped_content', | ||||
'directory', | 'directory', | ||||
'revision', | 'revision', | ||||
'release', | 'release', | ||||
'snapshot', | 'snapshot', | ||||
'origin_visit', | 'origin_visit', | ||||
'origin', | 'origin', | ||||
} | } | ||||
def register_all_notifies(db): | def register_all_notifies(db): | ||||
"""Register to notifications for all object types listed in OBJECT_TYPES""" | """Register to notifications for all object types listed in OBJECT_TYPES""" | ||||
with db.transaction() as cur: | with db.transaction() as cur: | ||||
for object_type in OBJECT_TYPES: | for object_type in OBJECT_TYPES: | ||||
Not Done Inline Actions"Registering to" or "Registered to" vlorentz: "Registering to" or "Registered to" | |||||
logging.debug('Register to notify events %s' % object_type) | |||||
db.register_listener('new_%s' % object_type, cur) | db.register_listener('new_%s' % object_type, cur) | ||||
def dispatch_notify(topic_prefix, producer, notify): | def dispatch_notify(topic_prefix, producer, notify): | ||||
"""Dispatch a notification to the proper topic""" | """Dispatch a notification to the proper topic""" | ||||
logging.debug('topic_prefix: %s' % topic_prefix) | |||||
logging.debug('producer: %s' % producer) | |||||
Not Done Inline ActionsNot very readable. Should be a single log line, and explain what this is about. vlorentz: Not very readable. Should be a single log line, and explain what this is about. | |||||
Done Inline ActionsThis is about understanding what happens and what those things look like. ardumont: This is about understanding what happens and what those things look like.
I did not remember ;) | |||||
logging.debug('notify: %s' % notify) | |||||
channel = notify.channel | channel = notify.channel | ||||
if not channel.startswith('new_') or channel[4:] not in OBJECT_TYPES: | if not channel.startswith('new_') or channel[4:] not in OBJECT_TYPES: | ||||
logging.warn("Got unexpected notify %s" % notify) | logging.warn("Got unexpected notify %s" % notify) | ||||
return | return | ||||
object_type = channel[4:] | object_type = channel[4:] | ||||
topic = '%s.%s' % (topic_prefix, object_type) | topic = '%s.%s' % (topic_prefix, object_type) | ||||
data = decode_json(notify.payload) | # mapping function depending on the object type | ||||
mapping_fn = { | |||||
'origin': decode_simple, | |||||
'origin_visit': decode_simple, | |||||
} | |||||
mapping_callable = mapping_fn.get(object_type, decode_with_identifier) | |||||
data = mapping_callable(notify.payload) | |||||
producer.send(topic, value=data) | producer.send(topic, value=data) | ||||
def run_from_config(config): | def run_from_config(config): | ||||
"""Run the Software Heritage listener from configuration""" | """Run the Software Heritage listener from configuration""" | ||||
db = swh.storage.db.Db.connect(config['database']) | db = swh.storage.db.Db.connect(config['database']) | ||||
def key_to_kafka(key): | def key_to_kafka(key): | ||||
Show All 13 Lines | def run_from_config(config): | ||||
register_all_notifies(db) | register_all_notifies(db) | ||||
topic_prefix = config['topic_prefix'] | topic_prefix = config['topic_prefix'] | ||||
poll_timeout = config['poll_timeout'] | poll_timeout = config['poll_timeout'] | ||||
try: | try: | ||||
while True: | while True: | ||||
for notify in db.listen_notifies(poll_timeout): | for notify in db.listen_notifies(poll_timeout): | ||||
logging.debug('Notified by event %s' % notify) | |||||
dispatch_notify(topic_prefix, producer, notify) | dispatch_notify(topic_prefix, producer, notify) | ||||
producer.flush() | producer.flush() | ||||
except Exception: | except Exception: | ||||
logging.exception("Caught exception") | logging.exception("Caught exception") | ||||
producer.flush() | producer.flush() | ||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
import click | |||||
@click.command() | |||||
@click.option('--verbose', is_flag=True, default=False, | |||||
help='Be verbose if asked.') | |||||
def main(verbose): | |||||
logging.basicConfig( | logging.basicConfig( | ||||
level=logging.INFO, | level=logging.DEBUG if verbose else logging.INFO, | ||||
format='%(asctime)s %(process)d %(levelname)s %(message)s' | format='%(asctime)s %(process)d %(levelname)s %(message)s' | ||||
) | ) | ||||
config = load_named_config(CONFIG_BASENAME, DEFAULT_CONFIG) | config = load_named_config(CONFIG_BASENAME, DEFAULT_CONFIG) | ||||
run_from_config(config) | run_from_config(config) | ||||
main() | |||||
douarddaUnsubmitted Not Done Inline Actionsnitpicking: this is unrelated with the "Adapt decoding behavior depending on the object type" and should be then in a dedicated git revision. douardda: nitpicking: this is unrelated with the "Adapt decoding behavior depending on the object type"… | |||||
ardumontAuthorUnsubmitted Done Inline ActionsYes, i thought i did multiple commits but somehow i did not. ardumont: Yes, i thought i did multiple commits but somehow i did not.
Will do that indeed. |
Why keep this function? It adds IMHO useless complexity here.