Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/updater/ghtorrent/__init__.py
# Copyright (C) 2018 The Software Heritage developers | # Copyright (C) 2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import json | import json | ||||
from kombu import Connection, Exchange, Queue | from kombu import Connection, Exchange, Queue | ||||
from kombu.common import collect_replies | from kombu.common import collect_replies | ||||
from swh.core.config import SWHConfig | from swh.core.config import merge_configs | ||||
from swh.scheduler.updater.events import SWHEvent | from swh.scheduler.updater.events import SWHEvent | ||||
from swh.scheduler.updater.consumer import UpdaterConsumer | from swh.scheduler.updater.consumer import UpdaterConsumer | ||||
from swh.scheduler.updater.backend import SchedulerUpdaterBackend | |||||
events = { | events = { | ||||
# ghtorrent events related to github events (interesting) | # ghtorrent events related to github events (interesting) | ||||
'evt': [ | 'evt': [ | ||||
'commitcomment', 'create', 'delete', 'deployment', | 'commitcomment', 'create', 'delete', 'deployment', | ||||
'deploymentstatus', 'download', 'follow', 'fork', 'forkapply', | 'deploymentstatus', 'download', 'follow', 'fork', 'forkapply', | ||||
'gist', 'gollum', 'issuecomment', 'issues', 'member', | 'gist', 'gollum', 'issuecomment', 'issues', 'member', | ||||
'membership', 'pagebuild', 'public', 'pullrequest', | 'membership', 'pagebuild', 'public', 'pullrequest', | ||||
'pullrequestreviewcomment', 'push', 'release', 'repository', | 'pullrequestreviewcomment', 'push', 'release', 'repository', | ||||
'status', 'teamadd', 'watch' | 'status', 'teamadd', 'watch' | ||||
], | ], | ||||
# ghtorrent events related to mongodb insert (not interesting) | # ghtorrent events related to mongodb insert (not interesting) | ||||
'ent': [ | 'ent': [ | ||||
'commit_comments', 'commits', 'followers', 'forks', | 'commit_comments', 'commits', 'followers', 'forks', | ||||
'geo_cache', 'issue_comments', 'issue_events', 'issues', | 'geo_cache', 'issue_comments', 'issue_events', 'issues', | ||||
'org_members', 'pull_request_comments', 'pull_requests', | 'org_members', 'pull_request_comments', 'pull_requests', | ||||
'repo_collaborators', 'repo_labels', 'repos', 'users', 'watchers' | 'repo_collaborators', 'repo_labels', 'repos', 'users', 'watchers' | ||||
] | ] | ||||
} | } | ||||
INTERESTING_EVENT_KEYS = ['type', 'repo', 'created_at'] | |||||
class RabbitMQConn(SWHConfig): | |||||
"""RabbitMQ Connection class | |||||
""" | |||||
CONFIG_BASE_FILENAME = 'backend/ghtorrent' | |||||
DEFAULT_CONFIG = { | DEFAULT_CONFIG = { | ||||
'conn': ('dict', { | 'ghtorrent': { | ||||
'batch_cache_write': 1000, | |||||
'rabbitmq': { | |||||
'prefetch_read': 100, | |||||
'conn': { | |||||
'url': 'amqp://guest:guest@localhost:5672', | 'url': 'amqp://guest:guest@localhost:5672', | ||||
'exchange_name': 'ght-streams', | 'exchange_name': 'ght-streams', | ||||
'routing_key': 'something', | 'routing_key': 'something', | ||||
'queue_name': 'fake-events' | 'queue_name': 'fake-events', | ||||
}) | }, | ||||
}, | |||||
}, | |||||
'scheduler_updater': { | |||||
'cls': 'local', | |||||
'args': { | |||||
'db': 'dbname=softwareheritage-scheduler-updater-dev', | |||||
'cache_read_limit': 1000, | |||||
}, | |||||
}, | |||||
} | } | ||||
ADDITIONAL_CONFIG = {} | |||||
def __init__(self, **config): | |||||
super().__init__() | |||||
if config and set(config.keys()) - {'log_class'} != set(): | |||||
self.config = config | |||||
else: | |||||
self.config = self.parse_config_file( | |||||
additional_configs=[self.ADDITIONAL_CONFIG]) | |||||
self.conn_string = self.config['conn']['url'] | class GHTorrentConsumer(UpdaterConsumer): | ||||
self.exchange = Exchange(self.config['conn']['exchange_name'], | """GHTorrent events consumer | ||||
'topic', durable=True) | |||||
self.routing_key = self.config['conn']['routing_key'] | |||||
self.queue = Queue(self.config['conn']['queue_name'], | |||||
exchange=self.exchange, | |||||
routing_key=self.routing_key, | |||||
auto_delete=True) | |||||
""" | |||||
connection_class = Connection | |||||
INTERESTING_EVENT_KEYS = ['type', 'repo', 'created_at'] | def __init__(self, **config): | ||||
self.config = merge_configs(DEFAULT_CONFIG, config) | |||||
ght_config = self.config['ghtorrent'] | |||||
rmq_config = ght_config['rabbitmq'] | |||||
self.prefetch_read = int(rmq_config.get('prefetch_read', 100)) | |||||
class GHTorrentConsumer(RabbitMQConn, UpdaterConsumer): | exchange = Exchange( | ||||
"""GHTorrent events consumer | rmq_config['conn']['exchange_name'], | ||||
'topic', durable=True) | |||||
routing_key = rmq_config['conn']['routing_key'] | |||||
self.queue = Queue(rmq_config['conn']['queue_name'], | |||||
exchange=exchange, | |||||
routing_key=routing_key, | |||||
auto_delete=True) | |||||
""" | if self.config['scheduler_updater']['cls'] != 'local': | ||||
ADDITIONAL_CONFIG = { | raise ValueError( | ||||
'debug': ('bool', False), | 'The scheduler_updater can only be a cls=local for now') | ||||
'batch_cache_write': ('int', 1000), | backend = SchedulerUpdaterBackend( | ||||
'rabbitmq_prefetch_read': ('int', 100), | **self.config['scheduler_updater']['args']) | ||||
} | |||||
def __init__(self, config=None, _connection_class=Connection): | super().__init__(backend, ght_config.get('batch_cache_write', 1000)) | ||||
if config is None: | |||||
super().__init__( | |||||
log_class='swh.scheduler.updater.ghtorrent.GHTorrentConsumer') | |||||
else: | |||||
self.config = config | |||||
self._connection_class = _connection_class | |||||
self.debug = self.config['debug'] | |||||
self.batch = self.config['batch_cache_write'] | |||||
self.prefetch_read = self.config['rabbitmq_prefetch_read'] | |||||
def has_events(self): | def has_events(self): | ||||
"""Always has events | """Always has events | ||||
""" | """ | ||||
return True | return True | ||||
def convert_event(self, event): | def convert_event(self, event): | ||||
"""Given ghtorrent event, convert it to a SWHEvent instance. | """Given ghtorrent event, convert it to a SWHEvent instance. | ||||
""" | """ | ||||
if isinstance(event, str): | if isinstance(event, str): | ||||
event = json.loads(event) | event = json.loads(event) | ||||
for k in INTERESTING_EVENT_KEYS: | for k in INTERESTING_EVENT_KEYS: | ||||
if k not in event: | if k not in event: | ||||
if hasattr(self, 'log'): | if hasattr(self, 'log'): | ||||
self.log.warn( | self.log.warning( | ||||
'Event should have the \'%s\' entry defined' % k) | 'Event should have the \'%s\' entry defined' % k) | ||||
return None | return None | ||||
_type = event['type'].lower().rstrip('Event') | _type = event['type'].lower().rstrip('Event') | ||||
_repo_name = 'https://github.com/%s' % event['repo']['name'] | _repo_name = 'https://github.com/%s' % event['repo']['name'] | ||||
return SWHEvent({ | return SWHEvent({ | ||||
'type': _type, | 'type': _type, | ||||
'url': _repo_name, | 'url': _repo_name, | ||||
'last_seen': event['created_at'], | 'last_seen': event['created_at'], | ||||
'origin_type': 'git', | 'origin_type': 'git', | ||||
}) | }) | ||||
def open_connection(self): | def open_connection(self): | ||||
"""Open rabbitmq connection | """Open rabbitmq connection | ||||
""" | """ | ||||
self.conn = self._connection_class(self.config['conn']['url']) | self.conn = self.connection_class( | ||||
self.config['ghtorrent']['rabbitmq']['conn']['url']) | |||||
self.conn.connect() | self.conn.connect() | ||||
self.channel = self.conn.channel() | self.channel = self.conn.channel() | ||||
def close_connection(self): | def close_connection(self): | ||||
"""Close rabbitmq connection | """Close rabbitmq connection | ||||
""" | """ | ||||
self.channel.close() | self.channel.close() | ||||
Show All 9 Lines |