diff --git a/swh/scheduler/updater/scratch.py b/swh/scheduler/updater/scratch.py index c5d1428..4a65d7c 100644 --- a/swh/scheduler/updater/scratch.py +++ b/swh/scheduler/updater/scratch.py @@ -1,72 +1,72 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from kombu import Connection, Exchange, Queue +from pprint import pprint -def process_message(body, message): - print('####', body, message) - message.ack() - - -# does not work yet -# connections setup conf = { 'user': 'streamer', 'pass': 'streamer', 'port': 2765, 'gh_torrent_exchange_name': 'ght-streams', - # 'queue_name': 'ardumont_swh_queue', - 'queue_name': 'swh_ghtorrent_queue', + 'queue_name': 'swh_queue', # http://ghtorrent.org/streaming.html : {evt|ent}.{entity|event}.action - 'routing_key': '*.*.*', - # 'routing_key': 'evt.*.*', - # 'routing_key': 'evt.create|delete|public|push.insert', + 'routing_key': 'evt.*.insert', } # works with fake event generator -conf = { - 'user': 'guest', - 'pass': 'guest', - 'port': 5672, - 'gh_torrent_exchange_name': 'ght-streams', - 'queue_name': 'fake-events', - 'routing_key': 'something', +# conf = { +# 'user': 'guest', +# 'pass': 'guest', +# 'port': 5672, +# 'gh_torrent_exchange_name': 'ght-streams', +# 'queue_name': 'fake-events', +# 'routing_key': 'something', -} +# } server_url = 'amqp://%s:%s@localhost:%s//' % ( conf['user'], conf['pass'], conf['port']) exchange = Exchange(conf['gh_torrent_exchange_name'], 'topic', durable=True) test_queue = Queue(conf['queue_name'], exchange=exchange, routing_key=conf['routing_key'], auto_delete=True) + +def process_message(body, message): + print('#### body') + pprint(body) + print('#### message') + pprint(message) + message.ack() + + with Connection(server_url) as conn: # produce # producer = conn.Producer(serializer='json') # producer.publish('hello', # exchange=media_exchange, routing_key='test', # declare=[test_queue]) # the declare above, makes sure the test queue is declared # so that the messages can be delivered. # It's a best practice in Kombu to have both publishers and # consumers declare the queue. You can also declare the # queue manually using: # test_queue(conn).declare() print('Connection established') # consume with conn.Consumer(test_queue, callbacks=[process_message], auto_declare=True) as consumer: # Process messages and handle events on all channels while True: conn.drain_events() diff --git a/swh/scheduler/updater/test.rb b/swh/scheduler/updater/test.rb index 9825493..7630d6f 100755 --- a/swh/scheduler/updater/test.rb +++ b/swh/scheduler/updater/test.rb @@ -1,17 +1,17 @@ require 'bunny' # apt install ruby-bunny conn = Bunny.new(:host => '127.0.0.1', :port => 2765, :username => 'streamer', :password => 'streamer') conn.start ch = conn.create_channel exchange = ch.topic('ght-streams', :durable => true) -queue_name = 'swh_ghtorrent_queue' +queue_name = 'swh_queue' q = ch.queue(queue_name, :auto_delete => true) # q.bind(exchange, :routing_key => 'evt.*.*') # q.bind(exchange, :routing_key => 'evt.push.insert') q.bind(exchange, :routing_key => '*.*.*') # q.bind(exchange, :routing_key => 'evt.create|delete|public|push.insert') q.subscribe do |delivery_info, properties, payload| puts "#{delivery_info.routing_key}: #{payload}" end