diff --git a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py index c63e356..471175e 100644 --- a/swh/scheduler/tests/updater/test_ghtorrent.py +++ b/swh/scheduler/tests/updater/test_ghtorrent.py @@ -1,76 +1,175 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from arrow import utcnow -from nose.plugins.attrib import attr -from nose.tools import istest from hypothesis import given from hypothesis.strategies import sampled_from, from_regex +from nose.tools import istest +from unittest.mock import patch from swh.scheduler.updater.events import SWHEvent -from swh.scheduler.updater.ghtorrent import events, convert_event +from swh.scheduler.updater.ghtorrent import ( + events, GHTorrentConsumer) def event_values(): return set(events['evt']).union(set(events['ent'])) def ghtorrentize_event_name(event_name): return '%sEvent' % event_name.capitalize() EVENT_TYPES = sorted([ghtorrentize_event_name(e) for e in event_values()]) -@attr('db') -class GHTorrentTest(unittest.TestCase): +class FakeConnection: + """Fake Rabbitmq connection for test purposes + + """ + def __init__(self, conn_string): + self._conn_string = conn_string + self._connect = False + self._release = False + + def connect(self): + self._connect = True + return True + + def release(self): + self._connect = False + self._release = True + + def channel(self): + self.channel = True + return None + + +class GHTorrentConsumerTest(unittest.TestCase): + def setUp(self): + self.fake_config = { + 'conn': { + 'url': 'amqp://u:p@https://somewhere:9807', + }, + 'debug': True, + 'batch_cache_write': 10, + 'rabbitmq_prefetch_read': 100, + } + + self.consumer = GHTorrentConsumer(self.fake_config, + _connection_class=FakeConnection) + + @istest + def test_init(self): + # given + # check init is ok + self.assertEqual(self.consumer.debug, + self.fake_config['debug']) + self.assertEqual(self.consumer.batch, + self.fake_config['batch_cache_write']) + self.assertEqual(self.consumer.prefetch_read, + self.fake_config['rabbitmq_prefetch_read']) + self.assertEqual(self.consumer.config, self.fake_config) + + @istest + def test_has_events(self): + self.assertTrue(self.consumer.has_events()) + + @istest + def test_connection(self): + # when + self.consumer.open_connection() + + # then + self.assertEqual(self.consumer.conn._conn_string, + self.fake_config['conn']['url']) + self.assertTrue(self.consumer.conn._connect) + self.assertFalse(self.consumer.conn._release) + + # when + self.consumer.close_connection() + + # then + self.assertFalse(self.consumer.conn._connect) + self.assertTrue(self.consumer.conn._release) + def _make_event(self, event_type, name): return { 'type': event_type, 'repo': { 'name': name, }, 'created_at': utcnow(), } @istest @given(sampled_from(EVENT_TYPES), from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$')) def convert_event_ok(self, event_type, name): input_event = self._make_event(event_type, name) - actual_event = convert_event(input_event) + actual_event = self.consumer.convert_event(input_event) self.assertTrue(isinstance(actual_event, SWHEvent)) event = actual_event.get() expected_event = { 'type': event_type.lower().rstrip('Event'), 'url': 'https://github.com/%s' % name, 'last_seen': input_event['created_at'], 'rate': 1, } self.assertEqual(event, expected_event) def _make_incomplete_event(self, event_type, name, missing_data_key): event = self._make_event(event_type, name) del event[missing_data_key] return event @istest @given(sampled_from(EVENT_TYPES), from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), sampled_from(['type', 'repo', 'created_at'])) def convert_event_ko(self, event_type, name, missing_data_key): input_event = self._make_incomplete_event( event_type, name, missing_data_key) with self.assertRaisesRegex( ValueError, 'Event should have the \'%s\' entry defined' % ( missing_data_key, )): - convert_event(input_event) + self.consumer.convert_event(input_event) + + @patch('swh.scheduler.updater.ghtorrent.collect_replies') + @istest + def consume(self, mock_collect_replies): + # given + self.consumer.queue = 'fake-queue' # hack + self.consumer.open_connection() + + fake_events = [ + self._make_event('PushEvent', 'user/some-repo'), + self._make_event('PushEvent', 'user2/some-other-repo'), + ] + + mock_collect_replies.return_value = fake_events + + # when + actual_events = [] + for e in self.consumer.consume(): + actual_events.append(e) + + # then + self.assertEqual(fake_events, actual_events) + + mock_collect_replies.assert_called_once_with( + self.consumer.conn, + None, + 'fake-queue', + no_ack=False, + limit=self.fake_config['rabbitmq_prefetch_read'] + ) diff --git a/swh/scheduler/updater/ghtorrent/__init__.py b/swh/scheduler/updater/ghtorrent/__init__.py index 6654571..2f81bfe 100644 --- a/swh/scheduler/updater/ghtorrent/__init__.py +++ b/swh/scheduler/updater/ghtorrent/__init__.py @@ -1,141 +1,138 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from kombu import Connection, Exchange, Queue from kombu.common import collect_replies from swh.core.config import SWHConfig from swh.scheduler.updater.events import SWHEvent from swh.scheduler.updater.consumer import UpdaterConsumer events = { # ghtorrent events related to github events (interesting) 'evt': [ 'commitcomment', 'create', 'delete', 'deployment', 'deploymentstatus', 'download', 'follow', 'fork', 'forkapply', 'gist', 'gollum', 'issuecomment', 'issues', 'member', 'membership', 'pagebuild', 'public', 'pullrequest', 'pullrequestreviewcomment', 'push', 'release', 'repository', 'status', 'teamadd', 'watch' ], # ghtorrent events related to mongodb insert (not interesting) 'ent': [ 'commit_comments', 'commits', 'followers', 'forks', 'geo_cache', 'issue_comments', 'issue_events', 'issues', 'org_members', 'pull_request_comments', 'pull_requests', 'repo_collaborators', 'repo_labels', 'repos', 'users', 'watchers' ] } class RabbitMQConn(SWHConfig): """RabbitMQ Connection class """ CONFIG_BASE_FILENAME = 'backend/ghtorrent' DEFAULT_CONFIG = { 'conn': ('dict', { 'url': 'amqp://guest:guest@localhost:5672', 'exchange_name': 'ght-streams', 'routing_key': 'something', 'queue_name': 'fake-events' }) } ADDITIONAL_CONFIG = {} def __init__(self, **config): super().__init__() if config: self.config = config else: self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) self.conn_string = self.config['conn']['url'] self.exchange = Exchange(self.config['conn']['exchange_name'], 'topic', durable=True) self.routing_key = self.config['conn']['routing_key'] self.queue = Queue(self.config['conn']['queue_name'], exchange=self.exchange, routing_key=self.routing_key, auto_delete=True) -def convert_event(event): - """Given ghtorrent event, convert it to an swhevent instance. - - """ - if isinstance(event, str): - event = json.loads(event) - - keys = ['type', 'repo', 'created_at'] - for k in keys: - if k not in event: - raise ValueError('Event should have the \'%s\' entry defined' % k) - - _type = event['type'].lower().rstrip('Event') - _repo_name = 'https://github.com/%s' % event['repo']['name'] - - return SWHEvent({ - 'type': _type, - 'url': _repo_name, - 'last_seen': event['created_at'] - }) - - class GHTorrentConsumer(RabbitMQConn, UpdaterConsumer): """GHTorrent events consumer """ ADDITIONAL_CONFIG = { 'debug': ('bool', False), 'batch_cache_write': ('int', 1000), 'rabbitmq_prefetch_read': ('int', 100), } - def __init__(self, **config): - super().__init__(**config) + def __init__(self, config=None, _connection_class=Connection): + if config is None: + super().__init__() + else: + self.config = config + self._connection_class = _connection_class self.debug = self.config['debug'] self.batch = self.config['batch_cache_write'] self.prefetch_read = self.config['rabbitmq_prefetch_read'] def has_events(self): """Always has events """ return True def convert_event(self, event): """Given ghtorrent event, convert it to a SWHEvent instance. """ - return convert_event(event) + if isinstance(event, str): + event = json.loads(event) + + keys = ['type', 'repo', 'created_at'] + for k in keys: + if k not in event: + raise ValueError( + 'Event should have the \'%s\' entry defined' % k) + + _type = event['type'].lower().rstrip('Event') + _repo_name = 'https://github.com/%s' % event['repo']['name'] + + return SWHEvent({ + 'type': _type, + 'url': _repo_name, + 'last_seen': event['created_at'] + }) def open_connection(self): """Open rabbitmq connection """ - self.conn = Connection(self.conn_string) self.conn = self._connection_class(self.config['conn']['url']) self.conn.connect() def close_connection(self): """Close rabbitmq connection """ self.conn.release() def consume(self): """Consume and yield queue messages """ yield from collect_replies( self.conn, self.conn.channel(), self.queue, no_ack=False, limit=self.prefetch_read)