
diff --git a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py
index c63e356..471175e 100644
--- a/swh/scheduler/tests/updater/test_ghtorrent.py
+++ b/swh/scheduler/tests/updater/test_ghtorrent.py
@@ -1,76 +1,175 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from arrow import utcnow
-from nose.plugins.attrib import attr
-from nose.tools import istest
from hypothesis import given
from hypothesis.strategies import sampled_from, from_regex
+from nose.tools import istest
+from unittest.mock import patch
from swh.scheduler.updater.events import SWHEvent
-from swh.scheduler.updater.ghtorrent import events, convert_event
+from swh.scheduler.updater.ghtorrent import (
+ events, GHTorrentConsumer)
def event_values():
return set(events['evt']).union(set(events['ent']))
def ghtorrentize_event_name(event_name):
return '%sEvent' % event_name.capitalize()
EVENT_TYPES = sorted([ghtorrentize_event_name(e) for e in event_values()])
-@attr('db')
-class GHTorrentTest(unittest.TestCase):
+class FakeConnection:
+ """Fake Rabbitmq connection for test purposes
+
+ """
+ def __init__(self, conn_string):
+ self._conn_string = conn_string
+ self._connect = False
+ self._release = False
+
+ def connect(self):
+ self._connect = True
+ return True
+
+ def release(self):
+ self._connect = False
+ self._release = True
+
+ def channel(self):
+ # The real kombu Connection returns a channel object; the fake only
+ # records that one was requested (avoid shadowing the method itself).
+ self._channel = True
+ return None
+
+
+class GHTorrentConsumerTest(unittest.TestCase):
+ def setUp(self):
+ self.fake_config = {
+ 'conn': {
+ 'url': 'amqp://u:p@https://somewhere:9807',
+ },
+ 'debug': True,
+ 'batch_cache_write': 10,
+ 'rabbitmq_prefetch_read': 100,
+ }
+
+ self.consumer = GHTorrentConsumer(self.fake_config,
+ _connection_class=FakeConnection)
+
+ @istest
+ def test_init(self):
+ # given
+ # check init is ok
+ self.assertEqual(self.consumer.debug,
+ self.fake_config['debug'])
+ self.assertEqual(self.consumer.batch,
+ self.fake_config['batch_cache_write'])
+ self.assertEqual(self.consumer.prefetch_read,
+ self.fake_config['rabbitmq_prefetch_read'])
+ self.assertEqual(self.consumer.config, self.fake_config)
+
+ @istest
+ def test_has_events(self):
+ self.assertTrue(self.consumer.has_events())
+
+ @istest
+ def test_connection(self):
+ # when
+ self.consumer.open_connection()
+
+ # then
+ self.assertEqual(self.consumer.conn._conn_string,
+ self.fake_config['conn']['url'])
+ self.assertTrue(self.consumer.conn._connect)
+ self.assertFalse(self.consumer.conn._release)
+
+ # when
+ self.consumer.close_connection()
+
+ # then
+ self.assertFalse(self.consumer.conn._connect)
+ self.assertTrue(self.consumer.conn._release)
+
def _make_event(self, event_type, name):
return {
'type': event_type,
'repo': {
'name': name,
},
'created_at': utcnow(),
}
@istest
@given(sampled_from(EVENT_TYPES),
from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'))
def convert_event_ok(self, event_type, name):
input_event = self._make_event(event_type, name)
- actual_event = convert_event(input_event)
+ actual_event = self.consumer.convert_event(input_event)
self.assertTrue(isinstance(actual_event, SWHEvent))
event = actual_event.get()
expected_event = {
'type': event_type.lower().rstrip('Event'),
'url': 'https://github.com/%s' % name,
'last_seen': input_event['created_at'],
'rate': 1,
}
self.assertEqual(event, expected_event)
def _make_incomplete_event(self, event_type, name, missing_data_key):
event = self._make_event(event_type, name)
del event[missing_data_key]
return event
@istest
@given(sampled_from(EVENT_TYPES),
from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'),
sampled_from(['type', 'repo', 'created_at']))
def convert_event_ko(self, event_type, name, missing_data_key):
input_event = self._make_incomplete_event(
event_type, name, missing_data_key)
with self.assertRaisesRegex(
ValueError,
'Event should have the \'%s\' entry defined' % (
missing_data_key, )):
- convert_event(input_event)
+ self.consumer.convert_event(input_event)
+
+ @patch('swh.scheduler.updater.ghtorrent.collect_replies')
+ @istest
+ def consume(self, mock_collect_replies):
+ # given
+ self.consumer.queue = 'fake-queue' # hack
+ self.consumer.open_connection()
+
+ fake_events = [
+ self._make_event('PushEvent', 'user/some-repo'),
+ self._make_event('PushEvent', 'user2/some-other-repo'),
+ ]
+
+ mock_collect_replies.return_value = fake_events
+
+ # when
+ actual_events = []
+ for e in self.consumer.consume():
+ actual_events.append(e)
+
+ # then
+ self.assertEqual(fake_events, actual_events)
+
+ mock_collect_replies.assert_called_once_with(
+ self.consumer.conn,
+ None,
+ 'fake-queue',
+ no_ack=False,
+ limit=self.fake_config['rabbitmq_prefetch_read']
+ )
diff --git a/swh/scheduler/updater/ghtorrent/__init__.py b/swh/scheduler/updater/ghtorrent/__init__.py
index 6654571..2f81bfe 100644
--- a/swh/scheduler/updater/ghtorrent/__init__.py
+++ b/swh/scheduler/updater/ghtorrent/__init__.py
@@ -1,141 +1,138 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
from kombu import Connection, Exchange, Queue
from kombu.common import collect_replies
from swh.core.config import SWHConfig
from swh.scheduler.updater.events import SWHEvent
from swh.scheduler.updater.consumer import UpdaterConsumer
events = {
# ghtorrent events related to github events (interesting)
'evt': [
'commitcomment', 'create', 'delete', 'deployment',
'deploymentstatus', 'download', 'follow', 'fork', 'forkapply',
'gist', 'gollum', 'issuecomment', 'issues', 'member',
'membership', 'pagebuild', 'public', 'pullrequest',
'pullrequestreviewcomment', 'push', 'release', 'repository',
'status', 'teamadd', 'watch'
],
# ghtorrent events related to mongodb insert (not interesting)
'ent': [
'commit_comments', 'commits', 'followers', 'forks',
'geo_cache', 'issue_comments', 'issue_events', 'issues',
'org_members', 'pull_request_comments', 'pull_requests',
'repo_collaborators', 'repo_labels', 'repos', 'users', 'watchers'
]
}
class RabbitMQConn(SWHConfig):
"""RabbitMQ Connection class
"""
CONFIG_BASE_FILENAME = 'backend/ghtorrent'
DEFAULT_CONFIG = {
'conn': ('dict', {
'url': 'amqp://guest:guest@localhost:5672',
'exchange_name': 'ght-streams',
'routing_key': 'something',
'queue_name': 'fake-events'
})
}
ADDITIONAL_CONFIG = {}
def __init__(self, **config):
super().__init__()
if config:
self.config = config
else:
self.config = self.parse_config_file(
additional_configs=[self.ADDITIONAL_CONFIG])
self.conn_string = self.config['conn']['url']
self.exchange = Exchange(self.config['conn']['exchange_name'],
'topic', durable=True)
self.routing_key = self.config['conn']['routing_key']
self.queue = Queue(self.config['conn']['queue_name'],
exchange=self.exchange,
routing_key=self.routing_key,
auto_delete=True)
-def convert_event(event):
- """Given ghtorrent event, convert it to an swhevent instance.
-
- """
- if isinstance(event, str):
- event = json.loads(event)
-
- keys = ['type', 'repo', 'created_at']
- for k in keys:
- if k not in event:
- raise ValueError('Event should have the \'%s\' entry defined' % k)
-
- _type = event['type'].lower().rstrip('Event')
- _repo_name = 'https://github.com/%s' % event['repo']['name']
-
- return SWHEvent({
- 'type': _type,
- 'url': _repo_name,
- 'last_seen': event['created_at']
- })
-
-
class GHTorrentConsumer(RabbitMQConn, UpdaterConsumer):
"""GHTorrent events consumer
"""
ADDITIONAL_CONFIG = {
'debug': ('bool', False),
'batch_cache_write': ('int', 1000),
'rabbitmq_prefetch_read': ('int', 100),
}
- def __init__(self, **config):
- super().__init__(**config)
+ def __init__(self, config=None, _connection_class=Connection):
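+ # An explicit config dict (as used in the tests) bypasses the parent's
+ # config-file parsing; _connection_class lets tests inject a fake
+ # connection in place of kombu's Connection.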
+ if config is None:
+ super().__init__()
+ else:
+ self.config = config
+ self._connection_class = _connection_class
self.debug = self.config['debug']
self.batch = self.config['batch_cache_write']
self.prefetch_read = self.config['rabbitmq_prefetch_read']
def has_events(self):
"""Always has events
"""
return True
def convert_event(self, event):
"""Given ghtorrent event, convert it to a SWHEvent instance.
"""
- return convert_event(event)
+ if isinstance(event, str):
+ event = json.loads(event)
+
+ keys = ['type', 'repo', 'created_at']
+ for k in keys:
+ if k not in event:
+ raise ValueError(
+ 'Event should have the \'%s\' entry defined' % k)
+
+ _type = event['type'].lower().rstrip('Event')
+ _repo_name = 'https://github.com/%s' % event['repo']['name']
+
+ return SWHEvent({
+ 'type': _type,
+ 'url': _repo_name,
+ 'last_seen': event['created_at']
+ })
def open_connection(self):
"""Open rabbitmq connection
"""
- self.conn = Connection(self.conn_string)
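+ # Build the connection through the injected class (kombu's Connection
+ # by default, FakeConnection in the tests).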
+ self.conn = self._connection_class(self.config['conn']['url'])
self.conn.connect()
def close_connection(self):
"""Close rabbitmq connection
"""
self.conn.release()
def consume(self):
"""Consume and yield queue messages
"""
yield from collect_replies(
self.conn, self.conn.channel(), self.queue,
no_ack=False, limit=self.prefetch_read)
