diff --git a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py
new file mode 100644
index 0000000..5179151
--- /dev/null
+++ b/swh/scheduler/tests/updater/test_ghtorrent.py
@@ -0,0 +1,75 @@
+# Copyright (C) 2018 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import unittest
+
+from arrow import utcnow
+from nose.plugins.attrib import attr
+from nose.tools import istest
+from hypothesis import given
+from hypothesis.strategies import sampled_from, from_regex
+
+from swh.scheduler.updater.events import SWHEvent
+from swh.scheduler.updater.ghtorrent import events, convert_event
+
+
+def event_values():
+    return set(events['evt']).union(set(events['ent']))
+
+
+def ghtorrentize_event_name(event_name):
+    return '%sEvent' % event_name.capitalize()
+
+
+EVENT_TYPES = sorted([ghtorrentize_event_name(e) for e in event_values()])
+
+
+@attr('db')
+class GHTorrentTest(unittest.TestCase):
+    def _make_event(self, event_type, name):
+        return {
+            'type': event_type,
+            'repo': {
+                'name': name,
+            },
+            'created_at': utcnow(),
+        }
+
+    @istest
+    @given(sampled_from(EVENT_TYPES),
+           from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'))
+    def convert_event_ok(self, event_type, name):
+        input_event = self._make_event(event_type, name)
+        actual_event = convert_event(input_event)
+
+        self.assertTrue(isinstance(actual_event, SWHEvent))
+
+        event = actual_event.get()
+
+        expected_event = {
+            'type': event_type[:-len('Event')].lower(),
+            'url': 'https://github.com/%s' % name,
+            'last_seen': input_event['created_at'],
+        }
+        self.assertEqual(event, expected_event)
+
+    def _make_incomplete_event(self, event_type, name, missing_data_key):
+        event = self._make_event(event_type, name)
+        del event[missing_data_key]
+        return event
+
+    @istest
+    @given(sampled_from(EVENT_TYPES),
+           from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'),
+           sampled_from(['type', 'repo', 'created_at']))
+    def convert_event_ko(self, event_type, name, missing_data_key):
+        input_event = self._make_incomplete_event(
+            event_type, name, missing_data_key)
+
+        with self.assertRaisesRegex(
+                ValueError,
+                'Event should have the \'%s\' entry defined' % (
+                    missing_data_key, )):
+            convert_event(input_event)
diff --git a/swh/scheduler/updater/ghtorrent.py b/swh/scheduler/updater/ghtorrent.py
index 958b316..ce05c98 100644
--- a/swh/scheduler/updater/ghtorrent.py
+++ b/swh/scheduler/updater/ghtorrent.py
@@ -1,154 +1,231 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import json
 import random
 import string
 
+from arrow import utcnow
 from kombu import Connection, Exchange, Queue
 
+from swh.core.config import SWHConfig
 from swh.scheduler.updater.events import SWHEvent
 from swh.scheduler.updater.backend import SchedulerUpdaterBackend
 
 
 events = {
-    # ghtorrent events related to github events (that's the one we are
-    # interested in)
+    # ghtorrent events related to github events (interesting)
     'evt': [
         'commitcomment', 'create', 'delete', 'deployment',
         'deploymentstatus', 'download', 'follow', 'fork', 'forkapply',
         'gist', 'gollum', 'issuecomment', 'issues', 'member',
         'membership', 'pagebuild', 'public', 'pullrequest',
         'pullrequestreviewcomment', 'push', 'release', 'repository',
         'status', 'teamadd', 'watch'
     ],
-    # ghtorrent events related to mongodb insert
+    # ghtorrent events related to mongodb insert (not interesting)
     'ent': [
         'commit_comments', 'commits', 'followers', 'forks',
         'geo_cache', 'issue_comments', 'issue_events', 'issues',
         'org_members', 'pull_request_comments', 'pull_requests',
         'repo_collaborators', 'repo_labels', 'repos', 'users', 'watchers'
     ]
 }
 
 
 class FakeRandomOriginGenerator:
     def _random_string(self, length):
         """Build a fake string of length length.
 
         """
         return ''.join([
             random.choice(string.ascii_letters + string.digits)
             for n in range(length)])
 
     def generate(self, user_range=range(5, 10), repo_range=range(10, 15)):
         """Build a fake url
 
         """
         length_username = random.choice(user_range)
         user = self._random_string(length_username)
         length_repo = random.choice(repo_range)
         repo = self._random_string(length_repo)
-        return 'https://fake.github.com/%s/%s' % (user, repo)
+        return '%s/%s' % (user, repo)
 
 
-class RabbitMQConn:
+class RabbitMQConn(SWHConfig):
+    """RabbitMQ Connection class
+
+    """
+    CONFIG_BASE_FILENAME = 'backend/ghtorrent'
+
     DEFAULT_CONFIG = {
-        'user': 'guest',
-        'pass': 'guest',
-        'port': 5672,
-        'server': 'localhost',
-        'exchange_name': 'ght-streams',
-        'routing_key': 'something',
-        'queue_name': 'fake-events'
+        'conn': ('dict', {
+            'user': 'guest',
+            'pass': 'guest',
+            'port': 5672,
+            'server': 'localhost',
+            'exchange_name': 'ght-streams',
+            'routing_key': 'something',
+            'queue_name': 'fake-events'
+        })
     }
 
-    def _conn_queue(self, config):
+    ADDITIONAL_CONFIG = {}
+
+    def _connection_string(self):
         """Build the connection queue string.
 
         """
         return 'amqp://%s:%s@%s:%s' % (
-            self.config['user'],
-            self.config['pass'],
-            self.config['server'],
-            self.config['port']
+            self.config['conn']['user'],
+            self.config['conn']['pass'],
+            self.config['conn']['server'],
+            self.config['conn']['port']
         )
 
     def __init__(self, **config):
         if config:
             self.config = config
         else:
-            self.config = self.DEFAULT_CONFIG
+            self.config = self.parse_config_file(
+                additional_configs=[self.ADDITIONAL_CONFIG])
 
-        self.conn_queue = self._conn_queue(self.config)
-        self.exchange = Exchange(self.config['exchange_name'],
+        self.conn_string = self._connection_string()
+        self.exchange = Exchange(self.config['conn']['exchange_name'],
                                  'topic', durable=True)
-        self.queue = Queue(self.config['queue_name'],
+        self.routing_key = self.config['conn']['routing_key']
+        self.queue = Queue(self.config['conn']['queue_name'],
                            exchange=self.exchange,
-                           routing_key=self.config['routing_key'],
+                           routing_key=self.routing_key,
                            auto_delete=True)
 
 
 class FakeGHTorrentPublisher(RabbitMQConn):
     """Fake GHTorrent that randomly publishes fake events.  Those events
     are published in similar manner as described by ghtorrent's
     documentation [2].
 
     context: stuck with raw ghtorrent so far [1]
 
     [1] https://github.com/ghtorrent/ghtorrent.org/issues/397#issuecomment-387052462  # noqa
     [2] http://ghtorrent.org/streaming.html
 
     """
+
+    ADDITIONAL_CONFIG = {
+        'nb_messages': ('int', 100)
+    }
+
     def __init__(self, **config):
         super().__init__(**config)
         self.fake_origin_generator = FakeRandomOriginGenerator()
+        self.nb_messages = self.config['nb_messages']
 
     def _random_event(self):
         """Create a fake and random event
 
         """
         event_type = random.choice(['evt', 'ent'])
         sub_event = random.choice(events[event_type])
         return {
             'type': sub_event,
-            'url': self.fake_origin_generator.generate(),
+            'repo': {
+                'name': self.fake_origin_generator.generate(),
+            },
+            'created_at': utcnow().isoformat()
+
         }
 
-    def publish(self, nb_messages=10):
-        with Connection(self.conn_queue) as conn:
+    def publish(self, nb_messages=None):
+        if not nb_messages:
+            nb_messages = self.nb_messages
+
+        with Connection(self.conn_string) as conn:
             with conn.Producer(serializer='json') as producer:
                 for n in range(nb_messages):
                     event = self._random_event()
                     producer.publish(event,
                                      exchange=self.exchange,
-                                     routing_key=self.config['routing_key'],
+                                     routing_key=self.routing_key,
                                      declare=[self.queue])
 
 
-def process_message(body, message, backend):
-    print('#### body', body)
-    print('#### message', message)
-    e = SWHEvent(body)
-    if e.check():
-        backend.cache_put([e])
-    message.ack()
+def convert_event(event):
+    """Given ghtorrent event, convert it to an swhevent instance.
+
+    Args:
+        event (str or dict): ghtorrent event, json-encoded or already
+            decoded, holding the 'type', 'repo' and 'created_at' entries
+
+    Returns:
+        SWHEvent whose type is the lowercased ghtorrent type without
+        its trailing 'event' suffix
+
+    Raises:
+        ValueError if one of the mandatory entries is missing
+
+    """
+    if isinstance(event, str):
+        event = json.loads(event)
+
+    keys = ['type', 'repo', 'created_at']
+    for k in keys:
+        if k not in event:
+            raise ValueError('Event should have the \'%s\' entry defined' % k)
+
+    # str.rstrip('Event') strips a *set of characters* (e, v, n, t), so it
+    # mangled types like 'CreateEvent' into 'crea'; drop the suffix instead.
+    _type = event['type'].lower()
+    if _type.endswith('event'):
+        _type = _type[:-len('event')]
+    _repo_name = 'https://github.com/%s' % event['repo']['name']
+
+    return SWHEvent({
+        'type': _type,
+        'url': _repo_name,
+        'last_seen': event['created_at']
+    })
+
+
+def process_message(body, message, backend, debug=False):
+    """Callback method to convert and push event to cache
+
+    The message is always acknowledged, even when the conversion or
+    the caching raises.
+
+    """
+    try:
+        event = convert_event(body)
+        if debug:
+            print('#### body', body)
+        if event.check():
+            backend.cache_put([event])
+    finally:
+        message.ack()
 
 
 class GHTorrentConsumer(RabbitMQConn):
     """GHTorrent consumer
 
     """
+    ADDITIONAL_CONFIG = {
+        'debug': ('bool', False),
+    }
+
     def __init__(self):
         super().__init__()
         self.backend = SchedulerUpdaterBackend()
+        self.debug = self.config['debug']
 
     def consume(self):
-        def process_message_fn(b, m, backend=self.backend):
-            return process_message(b, m, backend)
+        def process_message_fn(b, m, backend=self.backend, debug=self.debug):
+            return process_message(b, m, backend=backend, debug=debug)
 
-        with Connection(self.conn_queue) as conn:
+        with Connection(self.conn_string) as conn:
             with conn.Consumer(self.queue, callbacks=[process_message_fn],
-                               auto_declare=True) as consumer:
-                consumer.consume()
+                               auto_declare=True):
+                while True:
+                    conn.drain_events()