Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9336893
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
4 KB
Subscribers
None
View Options
diff --git a/swh/storage/listener.py b/swh/storage/listener.py
index 9c2ad82..52b1ef4 100644
--- a/swh/storage/listener.py
+++ b/swh/storage/listener.py
@@ -1,134 +1,137 @@
# Copyright (C) 2016-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import logging
import kafka
import msgpack
import swh.storage.db
from swh.core.config import load_named_config
from swh.model import hashutil
# Passed as the first argument to load_named_config() by the script entry
# point below.
CONFIG_BASENAME = 'storage/listener'
# Defaults handed to load_named_config() alongside CONFIG_BASENAME.
# NOTE(review): each entry looks like a (type name, default value) pair, which
# is presumably the format swh.core.config expects -- confirm there.
DEFAULT_CONFIG = {
    # Passed to swh.storage.db.Db.connect() by run_from_config().
    'database': ('str', 'service=softwareheritage'),
    # Passed as `bootstrap_servers` to kafka.KafkaProducer.
    'brokers': ('list[str]', ['getty.internal.softwareheritage.org']),
    # Joined with the object type as '<prefix>.<object_type>' to build the
    # topic name in dispatch_notify().
    'topic_prefix': ('str', 'swh.tmp_journal.new'),
    # Passed to db.listen_notifies().
    # NOTE(review): unit is presumably seconds -- confirm against swh.storage.db.
    'poll_timeout': ('int', 10),
}
def decode(object_type, obj):
    """Turn a JSON notification payload into a dict for the journal.

    Depending on the nature of the object, the payload values can be hex
    hashes (cf. `/swh/storage/sql/70-swh-triggers.sql`).

    Args:
        object_type (str): Nature of the object
        obj (str): json dict representation whose values might be hex
            identifiers.

    Returns:
        The parsed payload as-is for 'origin' and 'origin_visit'; for every
        other object type, a new dict with the same keys whose values went
        through `hashutil.hash_to_bytes`.

    """
    payload = json.loads(obj)

    # Origins and visits are forwarded exactly as parsed.
    if object_type in ('origin', 'origin_visit'):
        return payload

    return {
        field: hashutil.hash_to_bytes(hex_id)
        for field, hex_id in payload.items()
    }
# Object types handled by the listener. register_all_notifies() listens on
# one 'new_<object type>' channel per entry, and dispatch_notify() drops any
# notification whose channel suffix is not in this set.
OBJECT_TYPES = {
    'content',
    'skipped_content',
    'directory',
    'revision',
    'release',
    'snapshot',
    'origin_visit',
    'origin',
}
def register_all_notifies(db):
    """Listen on the 'new_<type>' channel of every type in OBJECT_TYPES.

    Args:
        db: database handle providing `transaction()` (used as a context
            manager yielding a cursor) and `register_listener(channel, cur)`.

    All registrations are issued with the cursor of a single transaction.
    """
    with db.transaction() as cursor:
        for kind in OBJECT_TYPES:
            channel = 'new_%s' % kind
            db.register_listener(channel, cursor)
            logging.debug('Registered to notify events %s', kind)
def dispatch_notify(topic_prefix, producer, notify):
    """Dispatch a notification to the proper topic.

    Args:
        topic_prefix (str): prefix joined to the object type with a dot to
            build the destination topic name.
        producer: object exposing `send(topic, value=...)`;
            run_from_config() passes a kafka.KafkaProducer.
        notify: notification exposing `channel` and `payload` attributes.

    Returns:
        None. A notification whose channel is not 'new_<object type>' for a
        type listed in OBJECT_TYPES is logged as a warning and dropped.

    """
    # Lazy %-args: the message is only formatted when DEBUG is enabled.
    logging.debug('topic_prefix: %s, producer: %s, notify: %s',
                  topic_prefix, producer, notify)

    channel = notify.channel
    if not channel.startswith('new_') or channel[4:] not in OBJECT_TYPES:
        # logging.warn is a deprecated alias; logging.warning is the
        # supported spelling.
        logging.warning("Got unexpected notify %s", notify)
        return

    # Strip the 'new_' prefix to recover the object type.
    object_type = channel[4:]
    topic = '%s.%s' % (topic_prefix, object_type)
    producer.send(topic, value=decode(object_type, notify.payload))
def run_from_config(config):
    """Run the Software Heritage listener from configuration.

    Connects to the database, creates a Kafka producer, registers the
    notification listeners, then loops forever forwarding each notification
    through dispatch_notify() and flushing the producer after every batch
    returned by `db.listen_notifies`.

    Args:
        config (dict): must provide the 'database', 'brokers',
            'topic_prefix' and 'poll_timeout' keys (see DEFAULT_CONFIG).

    Any `Exception` raised by the loop is logged and ends the function
    normally. The producer is flushed on every way out of the loop, including
    KeyboardInterrupt/SystemExit, which still propagate to the caller.
    """
    db = swh.storage.db.Db.connect(config['database'])

    def key_to_kafka(key):
        """Serialize a key, possibly a dict, in a predictable way.

        Duplicated from swh.journal to avoid a cyclic dependency."""
        packer = msgpack.Packer(use_bin_type=True)
        if isinstance(key, dict):
            # Sorted items keep the packed bytes independent of the dict's
            # own ordering.
            return packer.pack_map_pairs(sorted(key.items()))
        return packer.pack(key)

    producer = kafka.KafkaProducer(
        bootstrap_servers=config['brokers'],
        value_serializer=key_to_kafka,
    )

    register_all_notifies(db)

    topic_prefix = config['topic_prefix']
    poll_timeout = config['poll_timeout']
    try:
        while True:
            for notify in db.listen_notifies(poll_timeout):
                logging.debug('Notified by event %s', notify)
                dispatch_notify(topic_prefix, producer, notify)
            producer.flush()
    except Exception:
        logging.exception("Caught exception")
    finally:
        # Also runs on KeyboardInterrupt/SystemExit so that messages already
        # handed to the producer are not left unsent on shutdown.
        producer.flush()
if __name__ == '__main__':
    # click is only needed when the module is run as a script.
    import click

    @click.command()
    @click.option('--verbose', is_flag=True, default=False,
                  help='Be verbose if asked.')
    def main(verbose):
        """Configure logging, load the configuration and run the listener."""
        logging.basicConfig(
            level=logging.DEBUG if verbose else logging.INFO,
            format='%(asctime)s %(process)d %(levelname)s %(message)s'
        )
        # Keep the 'kafka' logger at INFO even when --verbose switches the
        # root logger to DEBUG.
        _log = logging.getLogger('kafka')
        _log.setLevel(logging.INFO)

        config = load_named_config(CONFIG_BASENAME, DEFAULT_CONFIG)
        run_from_config(config)

    main()
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Jul 4 2025, 7:47 AM (10 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3261560
Attached To
rDSTOC swh-storage-cassandra
Event Timeline
Log In to Comment