Changeset View
Standalone View
swh/storage/listener.py
# Copyright (C) 2016-2017 The Software Heritage developers | # Copyright (C) 2016-2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import json | import json | ||||
import logging | import logging | ||||
import kafka | import kafka | ||||
import msgpack | import msgpack | ||||
import swh.storage.db | import swh.storage.db | ||||
from swh.core.config import load_named_config | from swh.core.config import load_named_config | ||||
CONFIG_BASENAME = 'storage/listener' | CONFIG_BASENAME = 'storage/listener' | ||||
DEFAULT_CONFIG = { | DEFAULT_CONFIG = { | ||||
'database': ('str', 'service=softwareheritage'), | 'database': ('str', 'service=softwareheritage'), | ||||
'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), | 'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), | ||||
'topic_prefix': ('str', 'swh.tmp_journal.new'), | 'topic_prefix': ('str', 'swh.tmp_journal.new'), | ||||
'poll_timeout': ('int', 10), | 'poll_timeout': ('int', 10), | ||||
} | } | ||||
def decode_sha(value): | def decode(object_type, obj): | ||||
"""Decode the textual representation of a SHA hash""" | """Decode a JSON obj of nature object_type. Depending on the nature of | ||||
if isinstance(value, str): | the object, this can contain hex hashes | ||||
return bytes.fromhex(value) | |||||
return value | Args: | ||||
object_type (str): Nature of the object | |||||
obj (str): json dict representation whose values might be hex | |||||
identifier. | |||||
Returns: | |||||
dict representation ready for journal serialization | |||||
def decode_json(value): | """ | ||||
"""Decode a JSON value containing hashes and other types""" | value = json.loads(obj) | ||||
ardumont: but that's not possible...
otherwise, that would not be json serializable?!
Something seems… | |||||
Done Inline Actionsok, i think i messed up. So yes, the complexity is needed in the end! ardumont: ok, i think i messed up.
It's the other way around: it takes the ids as hex and turns them into… | |||||
Not Done Inline ActionsI don't understand this. First, you probably meant if isinstance(v, bytes): , because value is always a dict. But v is produced by json.loads, so it's impossible for it to be bytes. vlorentz: I don't understand this. First, you probably meant `if isinstance(v, bytes): `, because `value`… | |||||
value = json.loads(value) | |||||
return {k: decode_sha(v) for k, v in value.items()} | if object_type in ('origin', 'origin_visit'): | ||||
result = value | |||||
else: | |||||
result = {} | |||||
for k, v in value.items(): | |||||
Not Done Inline ActionsUse hashutil.hash_from_hex instead of bytes.fromhex. vlorentz: Use `hashutil.hash_from_hex` instead of `bytes.fromhex`. | |||||
Done Inline ActionsYeah, i don't know why we used that instead of our common hashutil. ardumont: Yeah, i don't know why we used that instead of our common hashutil. | |||||
result[k] = bytes.fromhex(v) | |||||
return result | |||||
Not Done Inline ActionsWhy keep this function? It adds IMHO useless complexity here. douardda: Why keep this function? It adds IMHO useless complexity here. | |||||
Done Inline ActionsYes, it was initially "more" than just json.loads. Oh, you mean systematically call decode_with_identifier on everything... ardumont: Yes, it was initially "more" than just `json.loads`.
Although, with this i don't repeat that… | |||||
OBJECT_TYPES = { | OBJECT_TYPES = { | ||||
'content', | 'content', | ||||
'skipped_content', | 'skipped_content', | ||||
'directory', | 'directory', | ||||
'revision', | 'revision', | ||||
'release', | 'release', | ||||
'snapshot', | 'snapshot', | ||||
'origin_visit', | 'origin_visit', | ||||
'origin', | 'origin', | ||||
} | } | ||||
def register_all_notifies(db): | def register_all_notifies(db): | ||||
"""Register to notifications for all object types listed in OBJECT_TYPES""" | """Register to notifications for all object types listed in OBJECT_TYPES""" | ||||
with db.transaction() as cur: | with db.transaction() as cur: | ||||
for object_type in OBJECT_TYPES: | for object_type in OBJECT_TYPES: | ||||
Not Done Inline Actions"Registering to" or "Registered to" vlorentz: "Registering to" or "Registered to" | |||||
db.register_listener('new_%s' % object_type, cur) | db.register_listener('new_%s' % object_type, cur) | ||||
def dispatch_notify(topic_prefix, producer, notify): | def dispatch_notify(topic_prefix, producer, notify): | ||||
"""Dispatch a notification to the proper topic""" | """Dispatch a notification to the proper topic""" | ||||
channel = notify.channel | channel = notify.channel | ||||
if not channel.startswith('new_') or channel[4:] not in OBJECT_TYPES: | if not channel.startswith('new_') or channel[4:] not in OBJECT_TYPES: | ||||
Not Done Inline ActionsNot very readable. Should be a single log line, and explain what this is about. vlorentz: Not very readable. Should be a single log line, and explain what this is about. | |||||
Done Inline ActionsThis is about understanding what happens and what those things look like. ardumont: This is about understanding what happens and what those things look like.
I did not remember ;) | |||||
logging.warn("Got unexpected notify %s" % notify) | logging.warn("Got unexpected notify %s" % notify) | ||||
return | return | ||||
object_type = channel[4:] | object_type = channel[4:] | ||||
topic = '%s.%s' % (topic_prefix, object_type) | topic = '%s.%s' % (topic_prefix, object_type) | ||||
data = decode_json(notify.payload) | producer.send(topic, value=decode(object_type, notify.payload)) | ||||
producer.send(topic, value=data) | |||||
def run_from_config(config): | def run_from_config(config): | ||||
"""Run the Software Heritage listener from configuration""" | """Run the Software Heritage listener from configuration""" | ||||
db = swh.storage.db.Db.connect(config['database']) | db = swh.storage.db.Db.connect(config['database']) | ||||
def key_to_kafka(key): | def key_to_kafka(key): | ||||
"""Serialize a key, possibly a dict, in a predictable way. | """Serialize a key, possibly a dict, in a predictable way. | ||||
Show All 25 Lines | |||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
logging.basicConfig( | logging.basicConfig( | ||||
level=logging.INFO, | level=logging.INFO, | ||||
format='%(asctime)s %(process)d %(levelname)s %(message)s' | format='%(asctime)s %(process)d %(levelname)s %(message)s' | ||||
) | ) | ||||
config = load_named_config(CONFIG_BASENAME, DEFAULT_CONFIG) | config = load_named_config(CONFIG_BASENAME, DEFAULT_CONFIG) | ||||
run_from_config(config) | run_from_config(config) | ||||
Not Done Inline Actionsnitpicking: this is unrelated to the "Adapt decoding behavior depending on the object type" and should then be in a dedicated git revision. douardda: nitpicking: this is unrelated to the "Adapt decoding behavior depending on the object type"… | |||||
Done Inline ActionsYes, i thought i did multiple commits but somehow i did not. ardumont: Yes, i thought i did multiple commits but somehow i did not.
Will do that indeed. |
But that's not possible...
Otherwise, that would not be JSON-serializable?!
Something seems off here.
Triggers provide the id as hex already... [1]
[1] https://forge.softwareheritage.org/source/swh-storage/browse/master/swh/storage/sql/70-swh-triggers.sql