diff --git a/swh/scheduler/tests/updater/test_consumer.py b/swh/scheduler/tests/updater/test_consumer.py
new file mode 100644
index 0000000..948af64
--- /dev/null
+++ b/swh/scheduler/tests/updater/test_consumer.py
@@ -0,0 +1,186 @@
+# Copyright (C) 2018 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import unittest
+
+from arrow import utcnow
+from hypothesis import given
+from hypothesis.strategies import sampled_from, from_regex, lists, tuples, text
+from nose.tools import istest
+
+from swh.scheduler.updater.events import SWHEvent, LISTENED_EVENTS
+from swh.scheduler.updater.consumer import UpdaterConsumer
+
+
+class FakeSchedulerUpdaterBackend:
+    def __init__(self):
+        self.events = []
+
+    def cache_put(self, events):
+        self.events.append(events)
+
+
+class FakeUpdaterConsumerBase(UpdaterConsumer):
+    def __init__(self, backend_class=FakeSchedulerUpdaterBackend):
+        super().__init__(backend_class=backend_class)
+        self.connection_opened = False
+        self.connection_closed = False
+        self.consume_called = False
+        self.has_events_called = False
+
+    def open_connection(self):
+        self.connection_opened = True
+
+    def close_connection(self):
+        self.connection_closed = True
+
+    def convert_event(self, event):
+        pass
+
+
+class FakeUpdaterConsumerRaise(FakeUpdaterConsumerBase):
+    def has_events(self):
+        self.has_events_called = True
+        return True
+
+    def consume_events(self):
+        self.consume_called = True
+        raise ValueError('Broken stuff')
+
+
+class UpdaterConsumerRaisingTest(unittest.TestCase):
+    def setUp(self):
+        self.updater = FakeUpdaterConsumerRaise()
+
+    @istest
+    def running_raise(self):
+        """Raising during run should finish fine.
+
+        """
+        # given
+        self.assertEqual(self.updater.count, 0)
+        self.assertEqual(self.updater.seen_events, set())
+        self.assertEqual(self.updater.events, [])
+
+        # when
+        with self.assertRaisesRegex(ValueError, 'Broken stuff'):
+            self.updater.run()
+
+        # then
+        self.assertEqual(self.updater.count, 0)
+        self.assertEqual(self.updater.seen_events, set())
+        self.assertEqual(self.updater.events, [])
+        self.assertTrue(self.updater.connection_opened)
+        self.assertTrue(self.updater.has_events_called)
+        self.assertTrue(self.updater.connection_closed)
+        self.assertTrue(self.updater.consume_called)
+
+
+class FakeUpdaterConsumerNoEvent(FakeUpdaterConsumerBase):
+    def has_events(self):
+        self.has_events_called = True
+        return False
+
+    def consume_events(self):
+        self.consume_called = True
+
+
+class UpdaterConsumerNoEventTest(unittest.TestCase):
+    def setUp(self):
+        self.updater = FakeUpdaterConsumerNoEvent()
+
+    @istest
+    def running_does_not_consume(self):
+        """Run with no events should do just fine"""
+        # given
+        self.assertEqual(self.updater.count, 0)
+        self.assertEqual(self.updater.seen_events, set())
+        self.assertEqual(self.updater.events, [])
+
+        # when
+        self.updater.run()
+
+        # then
+        self.assertEqual(self.updater.count, 0)
+        self.assertEqual(self.updater.seen_events, set())
+        self.assertEqual(self.updater.events, [])
+        self.assertTrue(self.updater.connection_opened)
+        self.assertTrue(self.updater.has_events_called)
+        self.assertTrue(self.updater.connection_closed)
+        self.assertFalse(self.updater.consume_called)
+
+
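+# This fake consumer drains an in-memory message list: has_events()
+# reports True while messages remain, and consume_events() removes each
+# message as it is yielded back to run().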
+class FakeUpdaterConsumer(FakeUpdaterConsumerBase):
+    def __init__(self, messages):
+        super().__init__()
+        self.messages = messages
+        self.debug = False
+
+    def has_events(self):
+        self.has_events_called = True
+        return len(self.messages) > 0
+
+    def consume_events(self):
+        self.consume_called = True
+        # consume in order, removing each message once it has been yielded
+        # (popping while iterating over the same list would skip messages)
+        while self.messages:
+            yield self.messages.pop(0)
+
+    def convert_event(self, event):
+        e = {
+            'type': event['type'],
+            'url': 'https://fake.url/%s' % event['name'],
+            'last_seen': event['created_at']
+        }
+        return SWHEvent(e)
+
+
+class UpdaterConsumerWithEventTest(unittest.TestCase):
+    def _make_event(self, event_type, name):
+        return {
+            'type': event_type,
+            'name': name,
+            'created_at': utcnow(),
+        }
+
+    def _make_events(self, events):
+        for event_type, repo_name in events:
+            yield self._make_event(event_type, repo_name)
+
+    @istest
+    @given(lists(tuples(text(),
+                        from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$')),
+                 min_size=3, max_size=10),
+           lists(tuples(sampled_from(LISTENED_EVENTS),
+                        from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$')),
+                 min_size=3, max_size=10))
+    def running_with_uninteresting_events(self, uninteresting_events, events):
+        """Interesting events are written to cache, dropping uninteresting ones
+
+        """
+        # given
+        all_events = events.copy()
+        all_events.extend(uninteresting_events)
+        updater = FakeUpdaterConsumer(list(self._make_events(all_events)))
+
+        self.assertEqual(updater.count, 0)
+        self.assertEqual(updater.seen_events, set())
+        self.assertEqual(updater.events, [])
+
+        # when
+        updater.run()
+
+        # then
+        self.assertEqual(updater.count, 0)
+        self.assertEqual(updater.seen_events, set())
+        self.assertEqual(updater.events, [])
+        self.assertTrue(updater.connection_opened)
+        self.assertTrue(updater.has_events_called)
+        self.assertTrue(updater.connection_closed)
+        self.assertTrue(updater.consume_called)
+
+        self.assertEqual(updater.messages, [])
+        # uninteresting_events were dropped: every cached event must carry
+        # one of the listened event types
+        for batch in updater.backend.events:
+            for cached_event in batch:
+                self.assertIn(cached_event.get()['type'], LISTENED_EVENTS)
diff --git a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py
index 471175e..96cc001 100644
--- a/swh/scheduler/tests/updater/test_ghtorrent.py
+++ b/swh/scheduler/tests/updater/test_ghtorrent.py
@@ -1,175 +1,175 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import unittest
 
 from arrow import utcnow
 from hypothesis import given
 from hypothesis.strategies import sampled_from, from_regex
 from nose.tools import istest
 from unittest.mock import patch
 
 from swh.scheduler.updater.events import SWHEvent
 from swh.scheduler.updater.ghtorrent import (
     events, GHTorrentConsumer)
 
 
 def event_values():
     return set(events['evt']).union(set(events['ent']))
 
 
 def ghtorrentize_event_name(event_name):
     return '%sEvent' % event_name.capitalize()
 
 
 EVENT_TYPES = sorted([ghtorrentize_event_name(e) for e in event_values()])
 
 
 class FakeConnection:
     """Fake Rabbitmq connection for test purposes
 
     """
     def __init__(self, conn_string):
         self._conn_string = conn_string
         self._connect = False
         self._release = False
 
     def connect(self):
         self._connect = True
         return True
 
     def release(self):
         self._connect = False
         self._release = True
 
     def channel(self):
-        self.channel = True
+        # record the call without shadowing the method itself
+        self._channel = True
         return None
 
 
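 # GHTorrentConsumer takes the connection class as a constructor argument
 # (_connection_class), which is what lets these tests substitute
 # FakeConnection for kombu's Connection.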
 class GHTorrentConsumerTest(unittest.TestCase):
     def setUp(self):
         self.fake_config = {
             'conn': {
-                'url': 'amqp://u:p@https://somewhere:9807',
+                'url': 'amqp://u:p@somewhere:9807',
             },
             'debug': True,
             'batch_cache_write': 10,
             'rabbitmq_prefetch_read': 100,
         }
 
         self.consumer = GHTorrentConsumer(self.fake_config,
                                           _connection_class=FakeConnection)
 
     @istest
     def test_init(self):
         # given
         # check init is ok
         self.assertEqual(self.consumer.debug, self.fake_config['debug'])
         self.assertEqual(self.consumer.batch,
                          self.fake_config['batch_cache_write'])
         self.assertEqual(self.consumer.prefetch_read,
                          self.fake_config['rabbitmq_prefetch_read'])
         self.assertEqual(self.consumer.config, self.fake_config)
 
     @istest
     def test_has_events(self):
         self.assertTrue(self.consumer.has_events())
 
     @istest
     def test_connection(self):
         # when
         self.consumer.open_connection()
 
         # then
         self.assertEqual(self.consumer.conn._conn_string,
                          self.fake_config['conn']['url'])
         self.assertTrue(self.consumer.conn._connect)
         self.assertFalse(self.consumer.conn._release)
 
         # when
         self.consumer.close_connection()
 
         # then
         self.assertFalse(self.consumer.conn._connect)
         self.assertTrue(self.consumer.conn._release)
 
     def _make_event(self, event_type, name):
         return {
             'type': event_type,
             'repo': {
                 'name': name,
             },
             'created_at': utcnow(),
         }
 
     @istest
     @given(sampled_from(EVENT_TYPES),
            from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'))
     def convert_event_ok(self, event_type, name):
         input_event = self._make_event(event_type, name)
         actual_event = self.consumer.convert_event(input_event)
 
         self.assertTrue(isinstance(actual_event, SWHEvent))
 
         event = actual_event.get()
 
         expected_event = {
-            'type': event_type.lower().rstrip('Event'),
+            # strip the 'Event' suffix, then lowercase ('PushEvent' ->
+            # 'push'); str.rstrip would treat 'Event' as a character set
+            'type': event_type[:-len('Event')].lower(),
             'url': 'https://github.com/%s' % name,
             'last_seen': input_event['created_at'],
             'rate': 1,
         }
         self.assertEqual(event, expected_event)
 
     def _make_incomplete_event(self, event_type, name, missing_data_key):
         event = self._make_event(event_type, name)
         del event[missing_data_key]
         return event
 
     @istest
     @given(sampled_from(EVENT_TYPES),
            from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'),
            sampled_from(['type', 'repo', 'created_at']))
     def convert_event_ko(self, event_type, name, missing_data_key):
         input_event = self._make_incomplete_event(
             event_type, name, missing_data_key)
 
         with self.assertRaisesRegex(
                 ValueError,
                 'Event should have the \'%s\' entry defined' % (
                     missing_data_key, )):
             self.consumer.convert_event(input_event)
 
     @patch('swh.scheduler.updater.ghtorrent.collect_replies')
     @istest
-    def consume(self, mock_collect_replies):
+    def consume_events(self, mock_collect_replies):
         # given
         # hack: when a config dict is passed, RabbitMQConn.__init__ is
         # skipped, so the queue is never built from the configuration
         self.consumer.queue = 'fake-queue'
         self.consumer.open_connection()
 
         fake_events = [
             self._make_event('PushEvent', 'user/some-repo'),
             self._make_event('PushEvent', 'user2/some-other-repo'),
         ]
         mock_collect_replies.return_value = fake_events
 
         # when
         actual_events = []
-        for e in self.consumer.consume():
+        for e in self.consumer.consume_events():
             actual_events.append(e)
 
         # then
         self.assertEqual(fake_events, actual_events)
 
         mock_collect_replies.assert_called_once_with(
             self.consumer.conn,
             None,
             'fake-queue',
             no_ack=False,
             limit=self.fake_config['rabbitmq_prefetch_read']
         )
diff --git a/swh/scheduler/updater/consumer.py b/swh/scheduler/updater/consumer.py
index 0d638f8..67fd751 100644
--- a/swh/scheduler/updater/consumer.py
+++ b/swh/scheduler/updater/consumer.py
@@ -1,117 +1,123 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from abc import ABCMeta, abstractmethod
 
 from swh.scheduler.updater.backend import SchedulerUpdaterBackend
 
 
 class UpdaterConsumer(metaclass=ABCMeta):
     """Event consumer
 
     """
-    def __init__(self, batch=1000):
+    def __init__(self, batch=1000, backend_class=SchedulerUpdaterBackend):
         super().__init__()
         self._reset_cache()
-        self.backend = SchedulerUpdaterBackend()
+        self.backend = backend_class()
         self.batch = batch
 
     def _reset_cache(self):
         """Reset internal cache.
 
         """
         self.count = 0
         self.seen_events = set()
         self.events = []
 
     def is_interesting(self, event):
         """Determine if an event is interesting or not.
 
         Args:
             event (SWHEvent): SWH event
 
         """
         return event.is_interesting()
 
     @abstractmethod
     def convert_event(self, event):
         """Parse an event into an SWHEvent.
 
         """
         pass
 
     def process_event(self, body):
         """Process one event.
 
         """
         try:
             event = self.convert_event(body)
             if self.debug:
                 print('#### body', body)
             if self.is_interesting(event):
                 if event.url in self.seen_events:
-                    event.rate += 1
+                    # bump the rate on the cached occurrence; the freshly
+                    # converted duplicate itself is discarded
+                    for cached in self.events:
+                        if cached.url == event.url:
+                            cached.rate += 1
+                            break
                 else:
                     self.events.append(event)
                     self.seen_events.add(event.url)
                     self.count += 1
         finally:
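             # events are pushed to the backend in batches: flush as soon as
             # `batch` distinct events have accumulated, even if this
             # particular event failed to convert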
             if self.count >= self.batch:
                 if self.events:
                     self.backend.cache_put(self.events)
                     self._reset_cache()
 
     def _flush(self):
         """Flush the remaining internal cache if any.
 
         """
         if self.events:
             self.backend.cache_put(self.events)
             self._reset_cache()
 
     @abstractmethod
     def has_events(self):
         """Determine if there remain events to consume.
 
         Returns:
             True if events remain to be consumed, False otherwise
 
         """
         pass
 
     @abstractmethod
-    def consume(self):
+    def consume_events(self):
         """The main entry point to consume events.
 
         This should either yield or return messages for consumption.
 
         """
         pass
 
     @abstractmethod
     def open_connection(self):
         """Open a connection to the remote system we are supposed to consume
            from.
 
         """
         pass
 
     @abstractmethod
     def close_connection(self):
         """Close the opened connection to the remote system.
 
         """
         pass
 
     def run(self):
         """The main entry point to consume events.
 
         """
-        self.open_connection()
-        while self.has_events():
-            for event in self.consume():
-                self.process_event(event)
-        self.close_connection()
-        self._flush()
+        try:
+            self.open_connection()
+            while self.has_events():
+                for event in self.consume_events():
+                    self.process_event(event)
+        except Exception as e:
+            # FIXME: use logging instead
+            print('Something went wrong: %s' % e)
+            # re-raise bare to preserve the original traceback
+            raise
+        finally:
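+            # whether the loop stopped cleanly or on an error, tear down
+            # the connection and persist whatever is left in the cache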
""" - self.open_connection() - while self.has_events(): - for event in self.consume(): - self.process_event(event) - self.close_connection() - self._flush() + try: + self.open_connection() + while self.has_events(): + for event in self.consume_events(): + self.process_event(event) + except Exception as e: + # FIXME: use logging instead + print('Something went wrong: %s' % e) + raise e + finally: + self.close_connection() + self._flush() diff --git a/swh/scheduler/updater/ghtorrent/__init__.py b/swh/scheduler/updater/ghtorrent/__init__.py index 2f81bfe..540c0c3 100644 --- a/swh/scheduler/updater/ghtorrent/__init__.py +++ b/swh/scheduler/updater/ghtorrent/__init__.py @@ -1,138 +1,138 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from kombu import Connection, Exchange, Queue from kombu.common import collect_replies from swh.core.config import SWHConfig from swh.scheduler.updater.events import SWHEvent from swh.scheduler.updater.consumer import UpdaterConsumer events = { # ghtorrent events related to github events (interesting) 'evt': [ 'commitcomment', 'create', 'delete', 'deployment', 'deploymentstatus', 'download', 'follow', 'fork', 'forkapply', 'gist', 'gollum', 'issuecomment', 'issues', 'member', 'membership', 'pagebuild', 'public', 'pullrequest', 'pullrequestreviewcomment', 'push', 'release', 'repository', 'status', 'teamadd', 'watch' ], # ghtorrent events related to mongodb insert (not interesting) 'ent': [ 'commit_comments', 'commits', 'followers', 'forks', 'geo_cache', 'issue_comments', 'issue_events', 'issues', 'org_members', 'pull_request_comments', 'pull_requests', 'repo_collaborators', 'repo_labels', 'repos', 'users', 'watchers' ] } class RabbitMQConn(SWHConfig): """RabbitMQ Connection class """ CONFIG_BASE_FILENAME = 'backend/ghtorrent' DEFAULT_CONFIG = { 'conn': ('dict', { 'url': 'amqp://guest:guest@localhost:5672', 'exchange_name': 'ght-streams', 'routing_key': 'something', 'queue_name': 'fake-events' }) } ADDITIONAL_CONFIG = {} def __init__(self, **config): super().__init__() if config: self.config = config else: self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) self.conn_string = self.config['conn']['url'] self.exchange = Exchange(self.config['conn']['exchange_name'], 'topic', durable=True) self.routing_key = self.config['conn']['routing_key'] self.queue = Queue(self.config['conn']['queue_name'], exchange=self.exchange, routing_key=self.routing_key, auto_delete=True) class GHTorrentConsumer(RabbitMQConn, UpdaterConsumer): """GHTorrent events consumer """ ADDITIONAL_CONFIG = { 'debug': ('bool', False), 'batch_cache_write': ('int', 1000), 'rabbitmq_prefetch_read': ('int', 100), } def __init__(self, config=None, _connection_class=Connection): if config is None: super().__init__() else: self.config = config self._connection_class = _connection_class self.debug = self.config['debug'] self.batch = self.config['batch_cache_write'] self.prefetch_read = self.config['rabbitmq_prefetch_read'] def has_events(self): """Always has events """ return True def convert_event(self, event): """Given ghtorrent event, convert it to a SWHEvent instance. 
""" if isinstance(event, str): event = json.loads(event) keys = ['type', 'repo', 'created_at'] for k in keys: if k not in event: raise ValueError( 'Event should have the \'%s\' entry defined' % k) _type = event['type'].lower().rstrip('Event') _repo_name = 'https://github.com/%s' % event['repo']['name'] return SWHEvent({ 'type': _type, 'url': _repo_name, 'last_seen': event['created_at'] }) def open_connection(self): """Open rabbitmq connection """ self.conn = self._connection_class(self.config['conn']['url']) self.conn.connect() def close_connection(self): """Close rabbitmq connection """ self.conn.release() - def consume(self): + def consume_events(self): """Consume and yield queue messages """ yield from collect_replies( self.conn, self.conn.channel(), self.queue, no_ack=False, limit=self.prefetch_read)