diff --git a/swh/scheduler/updater/scratch.py b/swh/scheduler/updater/scratch.py new file mode 100644 index 0000000..c5d1428 --- /dev/null +++ b/swh/scheduler/updater/scratch.py @@ -0,0 +1,72 @@ +# Copyright (C) 2018 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +from kombu import Connection, Exchange, Queue + + +def process_message(body, message): + print('####', body, message) + message.ack() + + +# does not work yet +# connections setup +conf = { + 'user': 'streamer', + 'pass': 'streamer', + 'port': 2765, + 'gh_torrent_exchange_name': 'ght-streams', + # 'queue_name': 'ardumont_swh_queue', + 'queue_name': 'swh_ghtorrent_queue', + # http://ghtorrent.org/streaming.html : {evt|ent}.{entity|event}.action + 'routing_key': '*.*.*', + # 'routing_key': 'evt.*.*', + # 'routing_key': 'evt.create|delete|public|push.insert', +} + +# works with fake event generator +conf = { + 'user': 'guest', + 'pass': 'guest', + 'port': 5672, + 'gh_torrent_exchange_name': 'ght-streams', + 'queue_name': 'fake-events', + 'routing_key': 'something', + +} + +server_url = 'amqp://%s:%s@localhost:%s//' % ( + conf['user'], conf['pass'], conf['port']) + +exchange = Exchange(conf['gh_torrent_exchange_name'], + 'topic', durable=True) +test_queue = Queue(conf['queue_name'], + exchange=exchange, + routing_key=conf['routing_key'], + auto_delete=True) + +with Connection(server_url) as conn: + # produce + # producer = conn.Producer(serializer='json') + # producer.publish('hello', + # exchange=media_exchange, routing_key='test', + # declare=[test_queue]) + + # the declare above, makes sure the test queue is declared + # so that the messages can be delivered. + # It's a best practice in Kombu to have both publishers and + # consumers declare the queue. You can also declare the + # queue manually using: + # test_queue(conn).declare() + + print('Connection established') + + # consume + with conn.Consumer(test_queue, callbacks=[process_message], + auto_declare=True) as consumer: + # Process messages and handle events on all channels + while True: + conn.drain_events() diff --git a/swh/scheduler/updater/test.rb b/swh/scheduler/updater/test.rb new file mode 100755 index 0000000..9825493 --- /dev/null +++ b/swh/scheduler/updater/test.rb @@ -0,0 +1,17 @@ +require 'bunny' + +# apt install ruby-bunny +conn = Bunny.new(:host => '127.0.0.1', :port => 2765, + :username => 'streamer', :password => 'streamer') +conn.start +ch = conn.create_channel +exchange = ch.topic('ght-streams', :durable => true) +queue_name = 'swh_ghtorrent_queue' +q = ch.queue(queue_name, :auto_delete => true) +# q.bind(exchange, :routing_key => 'evt.*.*') +# q.bind(exchange, :routing_key => 'evt.push.insert') +q.bind(exchange, :routing_key => '*.*.*') +# q.bind(exchange, :routing_key => 'evt.create|delete|public|push.insert') +q.subscribe do |delivery_info, properties, payload| + puts "#{delivery_info.routing_key}: #{payload}" +end