# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging

from abc import ABCMeta, abstractmethod

from swh.scheduler.updater.backend import SchedulerUpdaterBackend


class UpdaterConsumer(metaclass=ABCMeta):
    """Abstract base class for event consumers.

    Subclasses implement the transport-specific hooks (connection
    management, event retrieval, event parsing); this class drives the
    consume loop and batches interesting events into the scheduler
    updater backend cache.
    """
    def __init__(self, batch=1000, backend_class=SchedulerUpdaterBackend,
                 log_class='swh.scheduler.updater.consumer.UpdaterConsumer'):
        """
        Args:
            batch (int): number of events to accumulate before flushing
                the cache to the backend.
            backend_class: factory used to instantiate the scheduler
                updater backend.
            log_class (str): logger name, so subclasses can use their
                own logging channel.
        """
        super().__init__()
        self._reset_cache()
        self.backend = backend_class()
        self.batch = batch
        self.log = logging.getLogger(log_class)
        self.log.setLevel(logging.INFO)

    def _reset_cache(self):
        """Reset internal cache (counter, dedup set, event list)."""
        self.count = 0
        self.seen_events = set()
        self.events = []

    def is_interesting(self, event):
        """Determine if an event is interesting or not.

        Args:
            event (SWHEvent): SWH event
        """
        return event.is_interesting()

    @abstractmethod
    def convert_event(self, event):
        """Parse an event into an SWHEvent.

        """
        pass

    def process_event(self, body):
        """Process one event: convert it, deduplicate by url, and flush
        the cache to the backend once `batch` events have accumulated.
        """
        try:
            event = self.convert_event(body)
            if self.debug:
                # NOTE(review): self.debug is set by subclasses (from
                # config), not by this class — confirm all consumers do.
                print('#### body', body)
            if self.is_interesting(event):
                if event.url in self.seen_events:
                    event.rate += 1
                else:
                    self.events.append(event)
                    self.seen_events.add(event.url)
                    self.count += 1
        finally:
            # Flush even when convert_event raised, so the in-memory
            # cache stays bounded across failures.
            if self.count >= self.batch:
                if self.events:
                    self.backend.cache_put(self.events)
                self._reset_cache()

    def _flush(self):
        """Flush remaining internal cache if any.

        """
        if self.events:
            self.backend.cache_put(self.events)
            self._reset_cache()

    @abstractmethod
    def has_events(self):
        """Determine if there remains events to consume.

        Returns
            boolean value, true for remaining events, False otherwise

        """
        pass

    @abstractmethod
    def consume_events(self):
        """The main entry point to consume events.

        This should either yield or return message for consumption.

        """
        pass

    @abstractmethod
    def open_connection(self):
        """Open a connection to the remote system we are supposed to
        consume from.

        """
        pass

    @abstractmethod
    def close_connection(self):
        """Close opened connection to the remote system.

        """
        pass

    def run(self):
        """The main entry point to consume events.

        """
        try:
            self.open_connection()
            while self.has_events():
                for event in self.consume_events():
                    self.process_event(event)
        except Exception as e:
            # Lazy %-args (no formatting cost when the level is off);
            # bare `raise` re-raises with the original traceback.
            self.log.error('Error raised during consumption: %s', e)
            raise
        finally:
            self.close_connection()
            self._flush()


# ---------------------------------------------------------------------
# swh/scheduler/updater/ghtorrent/__init__.py (post-image)
# ---------------------------------------------------------------------
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import json

from kombu import Connection, Exchange, Queue
from kombu.common import collect_replies

from swh.core.config import SWHConfig
from swh.scheduler.updater.events import SWHEvent
from swh.scheduler.updater.consumer import UpdaterConsumer


events = {
    # ghtorrent events related to github events (interesting)
    'evt': [
        'commitcomment', 'create', 'delete', 'deployment',
        'deploymentstatus', 'download', 'follow', 'fork', 'forkapply',
        'gist', 'gollum', 'issuecomment', 'issues', 'member',
        'membership', 'pagebuild', 'public', 'pullrequest',
        'pullrequestreviewcomment', 'push', 'release', 'repository',
        'status', 'teamadd', 'watch'
    ],
    # ghtorrent events related to mongodb insert (not interesting)
    'ent': [
        'commit_comments', 'commits', 'followers', 'forks',
        'geo_cache', 'issue_comments', 'issue_events', 'issues',
        'org_members', 'pull_request_comments', 'pull_requests',
        'repo_collaborators', 'repo_labels', 'repos', 'users',
        'watchers'
    ]
}
import logging


class RabbitMQConn(SWHConfig):
    """RabbitMQ connection configuration.

    Reads the connection settings either from the config dict passed as
    keyword arguments or from the SWH configuration file, then prepares
    the kombu Exchange/Queue objects used by consumers.
    """
    CONFIG_BASE_FILENAME = 'backend/ghtorrent'
    DEFAULT_CONFIG = {
        'conn': ('dict', {
            'url': 'amqp://guest:guest@localhost:5672',
            'exchange_name': 'ght-streams',
            'routing_key': 'something',
            'queue_name': 'fake-events'
        })
    }
    ADDITIONAL_CONFIG = {}

    def __init__(self, **config):
        super().__init__()
        if config:
            self.config = config
        else:
            self.config = self.parse_config_file(
                additional_configs=[self.ADDITIONAL_CONFIG])
        self.conn_string = self.config['conn']['url']
        self.exchange = Exchange(self.config['conn']['exchange_name'],
                                 'topic', durable=True)
        self.routing_key = self.config['conn']['routing_key']
        self.queue = Queue(self.config['conn']['queue_name'],
                           exchange=self.exchange,
                           routing_key=self.routing_key,
                           auto_delete=True)


class GHTorrentConsumer(RabbitMQConn, UpdaterConsumer):
    """GHTorrent events consumer.

    """
    ADDITIONAL_CONFIG = {
        'debug': ('bool', False),
        'batch_cache_write': ('int', 1000),
        'rabbitmq_prefetch_read': ('int', 100),
    }

    def __init__(self, config=None, _connection_class=Connection):
        if config is None:
            # NOTE: do not forward log_class through super().__init__():
            # RabbitMQConn.__init__(**config) would take it as the whole
            # configuration dict and crash on self.config['conn'].
            super().__init__()
        else:
            self.config = config
        # Always bind a subclass-specific logger, including on the
        # config-provided path where UpdaterConsumer.__init__ is skipped.
        self.log = logging.getLogger(
            'swh.scheduler.updater.ghtorrent.GHTorrentConsumer')
        self.log.setLevel(logging.INFO)
        self._connection_class = _connection_class
        self.debug = self.config['debug']
        self.batch = self.config['batch_cache_write']
        self.prefetch_read = self.config['rabbitmq_prefetch_read']

    def has_events(self):
        """Always has events

        """
        return True

    def convert_event(self, event):
        """Given ghtorrent event, convert it to a SWHEvent instance.

        Args:
            event (str or dict): JSON-encoded or already-decoded
                ghtorrent event carrying 'type', 'repo', 'created_at'.

        Raises:
            ValueError: if a mandatory entry is missing from the event.
        """
        if isinstance(event, str):
            event = json.loads(event)
        for k in ('type', 'repo', 'created_at'):
            if k not in event:
                # Log for visibility, then fail explicitly instead of
                # letting the code below die on an opaque KeyError.
                self.log.warning(
                    'Event should have the \'%s\' entry defined', k)
                raise ValueError(
                    'Event should have the \'%s\' entry defined' % k)
        # Strip the 'Event' suffix *before* lowercasing. The previous
        # .lower().rstrip('Event') was wrong: rstrip removes a character
        # set, so 'CreateEvent'.lower().rstrip('Event') -> 'crea'.
        _type = event['type']
        if _type.endswith('Event'):
            _type = _type[:-len('Event')]
        _type = _type.lower()
        _repo_name = 'https://github.com/%s' % event['repo']['name']
        return SWHEvent({
            'type': _type,
            'url': _repo_name,
            'last_seen': event['created_at']
        })

    def open_connection(self):
        """Open rabbitmq connection

        """
        self.conn = self._connection_class(self.config['conn']['url'])
        self.conn.connect()

    def close_connection(self):
        """Close rabbitmq connection

        """
        self.conn.release()

    def consume_events(self):
        """Consume and yield queue messages

        """
        yield from collect_replies(
            self.conn, self.conn.channel(), self.queue,
            no_ack=False, limit=self.prefetch_read)