def _assert_cache_is_empty(case, updater):
    """Check that `updater` currently holds no cached state.

    Args:
        case (unittest.TestCase): test case used to issue the assertions
        updater (UpdaterConsumer): consumer whose internal cache is checked

    """
    case.assertEqual(updater.count, 0)
    case.assertEqual(updater.seen_events, set())
    case.assertEqual(updater.events, [])


class FakeSchedulerUpdaterBackend:
    """In-memory stand-in for the scheduler updater backend.

    Every batch handed to `cache_put` is recorded, in call order, in
    `self.events`.

    """
    def __init__(self):
        self.events = []

    def cache_put(self, events):
        """Record one batch; the batch object itself is stored."""
        self.events += [events]


class FakeUpdaterConsumerBase(UpdaterConsumer):
    """Consumer skeleton whose connection hooks only flip boolean flags.

    The flags let tests check which hooks `run` went through.

    """
    def __init__(self, backend_class=FakeSchedulerUpdaterBackend):
        super().__init__(backend_class=backend_class)
        self.connection_opened = self.connection_closed = False
        self.consume_called = self.has_events_called = False

    def open_connection(self):
        """Pretend to open the connection."""
        self.connection_opened = True

    def close_connection(self):
        """Pretend to close the connection."""
        self.connection_closed = True

    def convert_event(self, event):
        """No conversion at this level; always yields None."""
        return None


class FakeUpdaterConsumerRaise(FakeUpdaterConsumerBase):
    """Fake consumer that claims to have events, then fails to consume."""

    def has_events(self):
        self.has_events_called = True
        return True

    def consume_events(self):
        self.consume_called = True
        raise ValueError('Broken stuff')


class UpdaterConsumerRaisingTest(unittest.TestCase):
    def setUp(self):
        self.updater = FakeUpdaterConsumerRaise()

    @istest
    def running_raise(self):
        """An error raised while consuming propagates after cleanup

        """
        updater = self.updater

        # given
        _assert_cache_is_empty(self, updater)

        # when
        with self.assertRaisesRegex(ValueError, 'Broken stuff'):
            updater.run()

        # then
        _assert_cache_is_empty(self, updater)
        for flag in ('connection_opened', 'has_events_called',
                     'connection_closed', 'consume_called'):
            self.assertTrue(getattr(updater, flag))


class FakeUpdaterConsumerNoEvent(FakeUpdaterConsumerBase):
    """Fake consumer that never has anything to consume."""

    def has_events(self):
        self.has_events_called = True
        return False

    def consume_events(self):
        self.consume_called = True


class UpdaterConsumerNoEventTest(unittest.TestCase):
    def setUp(self):
        self.updater = FakeUpdaterConsumerNoEvent()

    @istest
    def running_does_not_consume(self):
        """Without events, run opens and closes but never consumes"""
        updater = self.updater

        # given
        _assert_cache_is_empty(self, updater)

        # when
        updater.run()

        # then
        _assert_cache_is_empty(self, updater)
        for flag in ('connection_opened', 'has_events_called',
                     'connection_closed'):
            self.assertTrue(getattr(updater, flag))
        self.assertFalse(updater.consume_called)
EVENT_KEYS = ['type', 'name', 'created_at']


class FakeUpdaterConsumer(FakeUpdaterConsumerBase):
    """Fake consumer serving a fixed, in-memory list of raw messages.

    Args:
        messages (list): raw event dicts to serve; the list is emptied as
            the messages are consumed

    """
    def __init__(self, messages):
        super().__init__()
        self.messages = messages
        self.debug = False

    def has_events(self):
        """True while unconsumed messages remain."""
        self.has_events_called = True
        return len(self.messages) > 0

    def consume_events(self):
        """Yield every pending message exactly once, in order.

        Messages are removed from the front as they are served. Popping
        from the tail while iterating the same list silently discarded
        the trailing half of the messages and replayed the leading ones.

        """
        self.consume_called = True
        while self.messages:
            yield self.messages.pop(0)

    def convert_event(self, event, keys=EVENT_KEYS):
        """Convert a raw message to an SWHEvent.

        Args:
            event (dict): raw message
            keys (list): entries the message must define

        Returns:
            SWHEvent, or None when one of `keys` is missing or None

        """
        if any(event.get(key) is None for key in keys):
            return None

        return SWHEvent({
            'type': event['type'],
            'url': 'https://fake.url/%s' % event['name'],
            'last_seen': event['created_at'],
        })


class UpdaterConsumerWithEventTest(unittest.TestCase):
    def _make_event(self, event_type, name):
        """Build a complete raw message."""
        return {
            'type': event_type,
            'name': name,
            'created_at': utcnow(),
        }

    def _make_events(self, events):
        """Yield one complete raw message per (type, name) pair."""
        for event_type, repo_name in events:
            yield self._make_event(event_type, repo_name)

    def _make_incomplete_event(self, event_type, name, missing_data_key):
        """Build a raw message lacking the `missing_data_key` entry."""
        event = self._make_event(event_type, name)
        del event[missing_data_key]
        return event

    def _make_incomplete_events(self, events):
        """Yield one incomplete message per (type, name, missing key)."""
        for event_type, repo_name, missing_data_key in events:
            yield self._make_incomplete_event(event_type, repo_name,
                                              missing_data_key)

    @istest
    @given(lists(tuples(sampled_from(LISTENED_EVENTS),
                        from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$')),
                 min_size=3, max_size=10),
           # Exclude listened types so a generated "uninteresting" type can
           # never collide with one of LISTENED_EVENTS.
           lists(tuples(text().filter(lambda t: t not in LISTENED_EVENTS),
                        from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$')),
                 min_size=3, max_size=10),
           lists(tuples(sampled_from(LISTENED_EVENTS),
                        from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'),
                        sampled_from(EVENT_KEYS)),
                 min_size=3, max_size=10))
    def running(self, events, uninteresting_events, incomplete_events):
        """Only complete, interesting events are written to cache

        """
        # given
        ready_events = self._make_events(events)
        ready_uninteresting_events = self._make_events(uninteresting_events)
        ready_incomplete_events = self._make_incomplete_events(
            incomplete_events)

        updater = FakeUpdaterConsumer(list(chain(
            ready_events, ready_incomplete_events,
            ready_uninteresting_events)))

        self.assertEqual(updater.count, 0)
        self.assertEqual(updater.seen_events, set())
        self.assertEqual(updater.events, [])

        # when
        updater.run()

        # then
        self.assertEqual(updater.count, 0)
        self.assertEqual(updater.seen_events, set())
        self.assertEqual(updater.events, [])
        self.assertTrue(updater.connection_opened)
        self.assertTrue(updater.has_events_called)
        self.assertTrue(updater.connection_closed)
        self.assertTrue(updater.consume_called)

        self.assertEqual(updater.messages, [])

        # uninteresting or incomplete events are dropped: exactly one cached
        # event per distinct url of the interesting input. The fake backend
        # stores one list per cache_put() call, so flatten the batches first.
        # (assertTrue(a, b) only checked that `a` was truthy; `b` was taken
        # as the failure message.)
        cached_urls = [cached.url
                       for batch in updater.backend.events
                       for cached in batch]
        expected_urls = {'https://fake.url/%s' % name for _, name in events}
        self.assertEqual(len(cached_urls), len(expected_urls))
        self.assertEqual(set(cached_urls), expected_urls)
class FakeConnection:
    """Fake Rabbitmq connection for test purposes

    Only records which operations were requested; nothing is opened.

    Args:
        conn_string (str): connection url, kept as-is in `_conn_string`

    """
    def __init__(self, conn_string):
        self._conn_string = conn_string
        self._connect = False
        self._release = False
        self._channel = False

    def connect(self):
        """Mark the connection as opened.

        Returns:
            True

        """
        self._connect = True
        return True

    def release(self):
        """Mark the connection as released (and no longer connected)."""
        self._connect = False
        self._release = True

    def channel(self):
        """Record that a channel was requested.

        The flag lives in `_channel`: assigning to `self.channel` replaced
        this method with a bool, so a second call raised TypeError.

        Returns:
            None, the fake has no channel object to hand out

        """
        self._channel = True
        return None
class GHTorrentConsumerTest(unittest.TestCase):
    """Tests for GHTorrentConsumer, run against a fake connection."""

    def setUp(self):
        self.fake_config = {
            'conn': {
                'url': 'amqp://u:p@https://somewhere:9807',
            },
            'debug': True,
            'batch_cache_write': 10,
            'rabbitmq_prefetch_read': 100,
        }

        self.consumer = GHTorrentConsumer(self.fake_config,
                                          _connection_class=FakeConnection)

    @istest
    def test_init(self):
        """Constructor copies the configuration onto the consumer"""
        consumer, config = self.consumer, self.fake_config

        self.assertEqual(consumer.debug, config['debug'])
        self.assertEqual(consumer.batch, config['batch_cache_write'])
        self.assertEqual(consumer.prefetch_read,
                         config['rabbitmq_prefetch_read'])
        self.assertEqual(consumer.config, config)

    @istest
    def test_has_events(self):
        self.assertTrue(self.consumer.has_events())

    @istest
    def test_connection(self):
        """Opening then closing drives the connection object"""
        # when
        self.consumer.open_connection()

        # then
        self.assertEqual(self.consumer.conn._conn_string,
                         self.fake_config['conn']['url'])
        self.assertTrue(self.consumer.conn._connect)
        self.assertFalse(self.consumer.conn._release)

        # when
        self.consumer.close_connection()

        # then
        self.assertFalse(self.consumer.conn._connect)
        self.assertTrue(self.consumer.conn._release)

    def _make_event(self, event_type, name):
        """Build a complete ghtorrent-like message."""
        return {
            'type': event_type,
            'repo': {
                'name': name,
            },
            'created_at': utcnow(),
        }

    @istest
    @given(sampled_from(EVENT_TYPES),
           from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'))
    def convert_event_ok(self, event_type, name):
        """A complete message converts to the matching SWHEvent"""
        input_event = self._make_event(event_type, name)
        actual_event = self.consumer.convert_event(input_event)

        self.assertIsInstance(actual_event, SWHEvent)

        event = actual_event.get()

        # The expected type is the name with its 'Event' suffix removed.
        # str.rstrip('Event') is not a suffix removal: it strips any
        # trailing run of the characters E/v/e/n/t ('createevent' -> 'crea').
        expected_type = event_type[:-len('Event')].lower()

        expected_event = {
            'type': expected_type,
            'url': 'https://github.com/%s' % name,
            'last_seen': input_event['created_at'],
            'rate': 1,
        }
        self.assertEqual(event, expected_event)

    def _make_incomplete_event(self, event_type, name, missing_data_key):
        """Build a message lacking the `missing_data_key` entry."""
        event = self._make_event(event_type, name)
        del event[missing_data_key]
        return event

    @istest
    @given(sampled_from(EVENT_TYPES),
           from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'),
           sampled_from(['type', 'repo', 'created_at']))
    def convert_event_ko(self, event_type, name, missing_data_key):
        """An incomplete message converts to None"""
        input_event = self._make_incomplete_event(
            event_type, name, missing_data_key)

        actual_converted_event = self.consumer.convert_event(input_event)

        self.assertIsNone(actual_converted_event)

    @patch('swh.scheduler.updater.ghtorrent.collect_replies')
    @istest
    def consume_events(self, mock_collect_replies):
        """consume_events yields what collect_replies returns"""
        # given
        self.consumer.queue = 'fake-queue'  # hack
        self.consumer.open_connection()

        fake_events = [
            self._make_event('PushEvent', 'user/some-repo'),
            self._make_event('PushEvent', 'user2/some-other-repo'),
        ]

        mock_collect_replies.return_value = fake_events

        # when
        actual_events = list(self.consumer.consume_events())

        # then
        self.assertEqual(fake_events, actual_events)

        mock_collect_replies.assert_called_once_with(
            self.consumer.conn,
            None,
            'fake-queue',
            no_ack=False,
            limit=self.fake_config['rabbitmq_prefetch_read']
        )
class UpdaterConsumer(metaclass=ABCMeta):
    """Event consumer

    Converts raw events, keeps the interesting ones in an internal cache
    (one entry per url) and writes that cache to the backend in batches.

    Args:
        batch (int): number of distinct cached events that triggers a write
        backend_class: backend factory, called with no argument
        log_class (str): name of the logger to use

    """
    def __init__(self, batch=1000, backend_class=SchedulerUpdaterBackend,
                 log_class='swh.scheduler.updater.consumer.UpdaterConsumer'):
        super().__init__()
        self._reset_cache()
        self.backend = backend_class()
        self.batch = batch
        self.log = logging.getLogger(log_class)
        self.log.setLevel(logging.INFO)

    def _reset_cache(self):
        """Reset internal cache.

        """
        self.count = 0
        self.seen_events = set()
        self.events = []
        # url -> cached event, used to bump the rate of the cached entry
        self._events_by_url = {}

    def is_interesting(self, event):
        """Determine if an event is interesting or not.

        Args
            event (SWHEvent): SWH event

        """
        return event.is_interesting()

    @abstractmethod
    def convert_event(self, event):
        """Parse an event into an SWHEvent.

        `run` drops the event when the result is falsy (e.g. None).

        """
        pass

    def process_event(self, event):
        """Process converted and interesting event.

        A url seen for the first time is cached. A url already cached
        gets the rate of its cached entry incremented. Incrementing the
        incoming duplicate had no effect, since that object is discarded.
        The cache is written to the backend once `batch` distinct events
        are held.

        Params:
            event (SWHEvent): Event to process if deemed interesting

        """
        try:
            cached = self._events_by_url.get(event.url)
            if cached is not None:
                cached.rate += 1
            else:
                self.events.append(event)
                self.seen_events.add(event.url)
                self._events_by_url[event.url] = event
                self.count += 1
        finally:
            if self.count >= self.batch:
                self._flush()

    def _flush(self):
        """Flush remaining internal cache if any.

        """
        if self.events:
            self.backend.cache_put(self.events)
            self._reset_cache()

    @abstractmethod
    def has_events(self):
        """Determine if there remains events to consume.

        Returns
            boolean value, true for remaining events, False otherwise

        """
        pass

    @abstractmethod
    def consume_events(self):
        """The main entry point to consume events.

        This should either yield or return message for consumption.

        """
        pass

    @abstractmethod
    def open_connection(self):
        """Open a connection to the remote system we are supposed to consume
           from.

        """
        pass

    @abstractmethod
    def close_connection(self):
        """Close opened connection to the remote system.

        """
        pass

    def run(self):
        """The main entry point to consume events.

        Incomplete and uninteresting events are skipped. A failure while
        processing one event is logged and the next event is tried. Any
        other error is logged and re-raised. The connection is closed and
        the remaining cache flushed in every case.

        """
        try:
            self.open_connection()
            while self.has_events():
                for _event in self.consume_events():
                    event = self.convert_event(_event)
                    if not event:
                        self.log.warning(
                            'Incomplete event dropped %s', _event)
                        continue
                    if not self.is_interesting(event):
                        continue
                    try:
                        self.process_event(event)
                    except Exception:
                        self.log.exception(
                            'Problem when processing event %s', _event)
                        continue
        except Exception as e:
            self.log.error('Error raised during consumption: %s', e)
            raise
        finally:
            # Flush even when closing fails, so cached events are not lost.
            try:
                self.close_connection()
            finally:
                self._flush()
events = {
    # ghtorrent events related to github events (interesting)
    'evt': [
        'commitcomment', 'create', 'delete', 'deployment',
        'deploymentstatus', 'download', 'follow', 'fork', 'forkapply',
        'gist', 'gollum', 'issuecomment', 'issues', 'member',
        'membership', 'pagebuild', 'public', 'pullrequest',
        'pullrequestreviewcomment', 'push', 'release', 'repository',
        'status', 'teamadd', 'watch'
    ],
    # ghtorrent events related to mongodb insert (not interesting)
    'ent': [
        'commit_comments', 'commits', 'followers', 'forks',
        'geo_cache', 'issue_comments', 'issue_events', 'issues',
        'org_members', 'pull_request_comments', 'pull_requests',
        'repo_collaborators', 'repo_labels', 'repos', 'users', 'watchers'
    ]
}


class RabbitMQConn(SWHConfig):
    """RabbitMQ Connection class

    Keyword arguments, when given, are used as the whole configuration.
    Otherwise the configuration is read through `parse_config_file`.

    """
    CONFIG_BASE_FILENAME = 'backend/ghtorrent'

    DEFAULT_CONFIG = {
        'conn': ('dict', {
            'url': 'amqp://guest:guest@localhost:5672',
            'exchange_name': 'ght-streams',
            'routing_key': 'something',
            'queue_name': 'fake-events'
        })
    }

    ADDITIONAL_CONFIG = {}

    def __init__(self, **config):
        super().__init__()
        if config:
            self.config = config
        else:
            self.config = self.parse_config_file(
                additional_configs=[self.ADDITIONAL_CONFIG])

        self.conn_string = self.config['conn']['url']
        self.exchange = Exchange(self.config['conn']['exchange_name'],
                                 'topic', durable=True)
        self.routing_key = self.config['conn']['routing_key']
        self.queue = Queue(self.config['conn']['queue_name'],
                           exchange=self.exchange,
                           routing_key=self.routing_key,
                           auto_delete=True)


class GHTorrentConsumer(RabbitMQConn, UpdaterConsumer):
    """GHTorrent events consumer

    Args:
        config (dict): configuration to use as-is; when None, the parent
            constructors are called instead
        _connection_class: connection factory, called with the
            connection url

    """
    ADDITIONAL_CONFIG = {
        'debug': ('bool', False),
        'batch_cache_write': ('int', 1000),
        'rabbitmq_prefetch_read': ('int', 100),
    }

    def __init__(self, config=None, _connection_class=Connection):
        if config is None:
            # NOTE(review): super() resolves to RabbitMQConn.__init__ first,
            # which collects `log_class` into **config and then reads
            # config['conn']. This path looks like it cannot work as
            # written; TODO confirm and pass the logger name another way.
            super().__init__(
                log_class='swh.scheduler.updater.ghtorrent.GHTorrentConsumer')
        else:
            # NOTE(review): no parent constructor runs here, so `log`,
            # `queue`, `backend` and the event cache are not set up.
            self.config = config
        self._connection_class = _connection_class
        self.debug = self.config['debug']
        self.batch = self.config['batch_cache_write']
        self.prefetch_read = self.config['rabbitmq_prefetch_read']

    def has_events(self):
        """Always has events

        """
        return True

    def convert_event(self, event):
        """Given ghtorrent event, convert it to a SWHEvent instance.

        Args:
            event (str or dict): ghtorrent event; a str is parsed as json

        Returns:
            SWHEvent, or None when the event lacks the 'type', 'repo' or
            'created_at' entry, or its repo has no name

        """
        if isinstance(event, str):
            event = json.loads(event)

        # The logger only exists when the parent constructor ran.
        log = getattr(self, 'log', None)

        for k in ('type', 'repo', 'created_at'):
            if k not in event:
                if log is not None:
                    log.warning(
                        'Event should have the \'%s\' entry defined', k)
                return None

        repo = event['repo']
        repo_name = repo.get('name') if isinstance(repo, dict) else None
        if not repo_name:
            # Incomplete as well: there is no url to build without a name.
            if log is not None:
                log.warning(
                    'Event should have the \'%s\' entry defined', 'repo.name')
            return None

        # Remove the 'Event' suffix only. str.rstrip('Event') strips any
        # trailing run of those characters instead ('createevent' -> 'crea',
        # 'deleteevent' -> 'del').
        _type = event['type'].lower()
        if _type.endswith('event'):
            _type = _type[:-len('event')]

        return SWHEvent({
            'type': _type,
            'url': 'https://github.com/%s' % repo_name,
            'last_seen': event['created_at']
        })

    def open_connection(self):
        """Open rabbitmq connection

        """
        self.conn = self._connection_class(self.config['conn']['url'])
        self.conn.connect()

    def close_connection(self):
        """Close rabbitmq connection

        """
        self.conn.release()

    def consume_events(self):
        """Consume and yield queue messages

        """
        yield from collect_replies(
            self.conn, self.conn.channel(), self.queue,
            no_ack=False, limit=self.prefetch_read)