Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9342999
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
11 KB
Subscribers
None
View Options
diff --git a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py
index c63e356..471175e 100644
--- a/swh/scheduler/tests/updater/test_ghtorrent.py
+++ b/swh/scheduler/tests/updater/test_ghtorrent.py
@@ -1,76 +1,175 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from arrow import utcnow
-from nose.plugins.attrib import attr
-from nose.tools import istest
from hypothesis import given
from hypothesis.strategies import sampled_from, from_regex
+from nose.tools import istest
+from unittest.mock import patch
from swh.scheduler.updater.events import SWHEvent
-from swh.scheduler.updater.ghtorrent import events, convert_event
+from swh.scheduler.updater.ghtorrent import (
+ events, GHTorrentConsumer)
def event_values():
    """Return the set of every known GHTorrent event name.

    This is the union of the GitHub-level names (``events['evt']``) and the
    MongoDB-insert names (``events['ent']``).
    """
    github_level = set(events['evt'])
    mongo_level = set(events['ent'])
    return github_level | mongo_level
def ghtorrentize_event_name(event_name):
    """Build the GitHub/GHTorrent type name for ``event_name``.

    The first letter is upper-cased, the rest lower-cased, and ``Event`` is
    appended, e.g. ``'push'`` -> ``'PushEvent'``.
    """
    capitalized = event_name.capitalize()
    return capitalized + 'Event'
# GitHub-style type names ('PushEvent', ...) for every known event name,
# sorted so the sequence handed to sampled_from has a stable order.
EVENT_TYPES = sorted(map(ghtorrentize_event_name, event_values()))
class FakeConnection:
    """Fake RabbitMQ connection for test purposes.

    Records the connection string it was built with and whether
    ``connect`` / ``release`` / ``channel`` were called, so tests can check
    the consumer's connection lifecycle without a broker.

    """
    def __init__(self, conn_string):
        self._conn_string = conn_string
        self._connect = False
        self._release = False
        # Stored under a private name: assigning to ``self.channel`` would
        # shadow the ``channel`` method and break any later call.
        self._channel = False

    def connect(self):
        """Mark the connection as open; always succeeds."""
        self._connect = True
        return True

    def release(self):
        """Mark the connection as closed and released."""
        self._connect = False
        self._release = True

    def channel(self):
        """Record that a channel was requested and return ``None``."""
        self._channel = True
        return None
+
+
class GHTorrentConsumerTest(unittest.TestCase):
    """Tests for GHTorrentConsumer, using FakeConnection instead of a real
    RabbitMQ broker.

    """
    def setUp(self):
        self.fake_config = {
            'conn': {
                'url': 'amqp://u:p@https://somewhere:9807',
            },
            'debug': True,
            'batch_cache_write': 10,
            'rabbitmq_prefetch_read': 100,
        }

        self.consumer = GHTorrentConsumer(self.fake_config,
                                          _connection_class=FakeConnection)

    @istest
    def test_init(self):
        # then: configuration entries are exposed as attributes
        self.assertEqual(self.consumer.debug,
                         self.fake_config['debug'])
        self.assertEqual(self.consumer.batch,
                         self.fake_config['batch_cache_write'])
        self.assertEqual(self.consumer.prefetch_read,
                         self.fake_config['rabbitmq_prefetch_read'])
        self.assertEqual(self.consumer.config, self.fake_config)

    @istest
    def test_has_events(self):
        self.assertTrue(self.consumer.has_events())

    @istest
    def test_connection(self):
        # when
        self.consumer.open_connection()

        # then
        self.assertEqual(self.consumer.conn._conn_string,
                         self.fake_config['conn']['url'])
        self.assertTrue(self.consumer.conn._connect)
        self.assertFalse(self.consumer.conn._release)

        # when
        self.consumer.close_connection()

        # then
        self.assertFalse(self.consumer.conn._connect)
        self.assertTrue(self.consumer.conn._release)

    def _make_event(self, event_type, name):
        """Build a minimal ghtorrent event for ``event_type`` on repo ``name``."""
        return {
            'type': event_type,
            'repo': {
                'name': name,
            },
            'created_at': utcnow(),
        }

    @istest
    @given(sampled_from(EVENT_TYPES),
           from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'))
    def convert_event_ok(self, event_type, name):
        input_event = self._make_event(event_type, name)
        actual_event = self.consumer.convert_event(input_event)

        self.assertIsInstance(actual_event, SWHEvent)

        event = actual_event.get()

        # Every EVENT_TYPES entry ends with the literal 'Event' suffix; drop
        # exactly that suffix. str.rstrip('Event') strips a *set* of
        # characters instead ('createevent'.rstrip('Event') == 'crea').
        expected_type = event_type[:-len('Event')].lower()
        expected_event = {
            'type': expected_type,
            'url': 'https://github.com/%s' % name,
            'last_seen': input_event['created_at'],
            'rate': 1,
        }

        self.assertEqual(event, expected_event)

    def _make_incomplete_event(self, event_type, name, missing_data_key):
        """Build an event like _make_event, without ``missing_data_key``."""
        event = self._make_event(event_type, name)
        del event[missing_data_key]
        return event

    @istest
    @given(sampled_from(EVENT_TYPES),
           from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'),
           sampled_from(['type', 'repo', 'created_at']))
    def convert_event_ko(self, event_type, name, missing_data_key):
        input_event = self._make_incomplete_event(
            event_type, name, missing_data_key)

        with self.assertRaisesRegex(
                ValueError,
                'Event should have the \'%s\' entry defined' % (
                    missing_data_key, )):
            self.consumer.convert_event(input_event)

    @patch('swh.scheduler.updater.ghtorrent.collect_replies')
    @istest
    def consume(self, mock_collect_replies):
        # given
        # GHTorrentConsumer only gets a ``queue`` from RabbitMQConn's
        # initializer, which is skipped when a config dict is passed in,
        # so set one by hand.
        self.consumer.queue = 'fake-queue'
        self.consumer.open_connection()

        fake_events = [
            self._make_event('PushEvent', 'user/some-repo'),
            self._make_event('PushEvent', 'user2/some-other-repo'),
        ]

        mock_collect_replies.return_value = fake_events

        # when
        actual_events = list(self.consumer.consume())

        # then
        self.assertEqual(fake_events, actual_events)

        # FakeConnection.channel() returns None, hence the None channel.
        mock_collect_replies.assert_called_once_with(
            self.consumer.conn,
            None,
            'fake-queue',
            no_ack=False,
            limit=self.fake_config['rabbitmq_prefetch_read']
        )
diff --git a/swh/scheduler/updater/ghtorrent/__init__.py b/swh/scheduler/updater/ghtorrent/__init__.py
index 6654571..2f81bfe 100644
--- a/swh/scheduler/updater/ghtorrent/__init__.py
+++ b/swh/scheduler/updater/ghtorrent/__init__.py
@@ -1,141 +1,138 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
from kombu import Connection, Exchange, Queue
from kombu.common import collect_replies
from swh.core.config import SWHConfig
from swh.scheduler.updater.events import SWHEvent
from swh.scheduler.updater.consumer import UpdaterConsumer
# GHTorrent event names, grouped by where they come from.
events = {
    # ghtorrent events related to github events (interesting)
    'evt': '''
        commitcomment create delete deployment deploymentstatus download
        follow fork forkapply gist gollum issuecomment issues member
        membership pagebuild public pullrequest pullrequestreviewcomment
        push release repository status teamadd watch
    '''.split(),
    # ghtorrent events related to mongodb insert (not interesting)
    'ent': '''
        commit_comments commits followers forks geo_cache issue_comments
        issue_events issues org_members pull_request_comments pull_requests
        repo_collaborators repo_labels repos users watchers
    '''.split(),
}
class RabbitMQConn(SWHConfig):
    """Holds RabbitMQ connection settings: broker url, exchange, routing key
    and queue.

    The configuration is either the keyword arguments given to the
    constructor or, when none are given, the result of ``parse_config_file``
    (base filename ``backend/ghtorrent``) extended with the class'
    ``ADDITIONAL_CONFIG``.

    """
    CONFIG_BASE_FILENAME = 'backend/ghtorrent'

    DEFAULT_CONFIG = {
        'conn': ('dict', {
            'url': 'amqp://guest:guest@localhost:5672',
            'exchange_name': 'ght-streams',
            'routing_key': 'something',
            'queue_name': 'fake-events'
        })
    }

    # Subclasses override this to declare extra configuration entries.
    ADDITIONAL_CONFIG = {}

    def __init__(self, **config):
        super().__init__()
        # An empty kwargs dict falls back to the configuration file.
        self.config = config or self.parse_config_file(
            additional_configs=[self.ADDITIONAL_CONFIG])

        conn_settings = self.config['conn']
        self.conn_string = conn_settings['url']
        self.exchange = Exchange(
            conn_settings['exchange_name'], 'topic', durable=True)
        self.routing_key = conn_settings['routing_key']
        self.queue = Queue(
            conn_settings['queue_name'],
            exchange=self.exchange,
            routing_key=self.routing_key,
            auto_delete=True,
        )
class GHTorrentConsumer(RabbitMQConn, UpdaterConsumer):
    """GHTorrent events consumer.

    Reads GHTorrent events from RabbitMQ and converts them into SWHEvent
    instances.

    """
    # Extra entries picked up by RabbitMQConn.__init__ when the
    # configuration is loaded from file.
    ADDITIONAL_CONFIG = {
        'debug': ('bool', False),
        'batch_cache_write': ('int', 1000),
        'rabbitmq_prefetch_read': ('int', 100),
    }

    def __init__(self, config=None, _connection_class=Connection):
        """Set up the consumer.

        Args:
            config (dict): full configuration. When None, the parent
                initializer loads it (configuration file plus
                ADDITIONAL_CONFIG) and also sets ``exchange``,
                ``routing_key`` and ``queue``.
            _connection_class: factory called with the broker url by
                open_connection (overridable for tests).

        """
        if config is None:
            super().__init__()
        else:
            # NOTE(review): parent initializers are skipped on this path,
            # so ``self.queue`` (read by consume()) is not set here.
            self.config = config
        # Needed on both paths: open_connection() always uses it.
        self._connection_class = _connection_class
        self.debug = self.config['debug']
        self.batch = self.config['batch_cache_write']
        self.prefetch_read = self.config['rabbitmq_prefetch_read']

    def has_events(self):
        """Always has events
        """
        return True

    def convert_event(self, event):
        """Given ghtorrent event, convert it to a SWHEvent instance.

        Args:
            event (dict or str): ghtorrent event, or its JSON serialization.
                Must define 'type', 'repo' (with a 'name') and 'created_at'.

        Returns:
            SWHEvent: built from 'type' (lower-cased, with a trailing
            'event' suffix removed, e.g. 'PushEvent' -> 'push'), 'url'
            (the repository's github.com url) and 'last_seen'.

        Raises:
            ValueError: if 'type', 'repo' or 'created_at' is missing.

        """
        if isinstance(event, str):
            event = json.loads(event)

        for k in ('type', 'repo', 'created_at'):
            if k not in event:
                raise ValueError(
                    'Event should have the \'%s\' entry defined' % k)

        # Remove exactly the 'event' suffix. str.rstrip('Event') strips any
        # trailing run of the characters E/v/e/n/t instead, which turned
        # e.g. 'CreateEvent' into 'crea' and 'DeleteEvent' into 'del'.
        _type = event['type'].lower()
        suffix = 'event'
        if _type.endswith(suffix):
            _type = _type[:-len(suffix)]
        _repo_name = 'https://github.com/%s' % event['repo']['name']

        return SWHEvent({
            'type': _type,
            'url': _repo_name,
            'last_seen': event['created_at']
        })

    def open_connection(self):
        """Open rabbitmq connection to the configured ``conn.url``.
        """
        self.conn = self._connection_class(self.config['conn']['url'])
        self.conn.connect()

    def close_connection(self):
        """Close rabbitmq connection
        """
        self.conn.release()

    def consume(self):
        """Consume and yield queue messages.

        Delegates to kombu's ``collect_replies`` on a fresh channel with
        ``no_ack=False`` and at most ``prefetch_read`` messages.
        """
        yield from collect_replies(
            self.conn, self.conn.channel(), self.queue,
            no_ack=False, limit=self.prefetch_read)
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Fri, Jul 4, 1:10 PM (1 w, 20 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3253345
Attached To
rDSCH Scheduling utilities
Event Timeline
Log In to Comment