diff --git a/sql/updater/sql/swh-func.sql b/sql/updater/sql/swh-func.sql index 8e33cba..435ce34 100644 --- a/sql/updater/sql/swh-func.sql +++ b/sql/updater/sql/swh-func.sql @@ -1,34 +1,34 @@ -- Postgresql index helper function create or replace function hash_sha1(text) returns sha1 as $$ select public.digest($1, 'sha1') :: sha1 $$ language sql strict immutable; comment on function hash_sha1(text) is 'Compute sha1 hash as text'; -- create a temporary table for cache tmp_cache, create or replace function swh_mktemp_cache() returns void language sql as $$ create temporary table tmp_cache ( like cache including defaults ) on commit drop; alter table tmp_cache drop column id; $$; create or replace function swh_cache_put() returns void language plpgsql as $$ begin - insert into cache (id, url, rate, last_seen) - select hash_sha1(url), url, rate, last_seen + insert into cache (id, url, rate, last_seen, origin_type) + select hash_sha1(url), url, rate, last_seen, origin_type from tmp_cache t on conflict(id) do update set rate = (select rate from cache where id=excluded.id) + excluded.rate, last_seen = excluded.last_seen; return; end $$; diff --git a/sql/updater/sql/swh-schema.sql b/sql/updater/sql/swh-schema.sql index 56a7391..965a4bf 100644 --- a/sql/updater/sql/swh-schema.sql +++ b/sql/updater/sql/swh-schema.sql @@ -1,24 +1,28 @@ create table dbversion ( version int primary key, release timestamptz not null, description text not null ); comment on table dbversion is 'Schema update tracking'; -- a SHA1 checksum (not necessarily originating from Git) create domain sha1 as bytea check (length(value) = 20); insert into dbversion (version, release, description) values (1, now(), 'Work In Progress'); +create type origin_type as enum ('git', 'svn', 'hg', 'deb'); +comment on type origin_type is 'Url''s repository type'; + create table cache ( id sha1 primary key, url text not null, rate int default 1, - last_seen timestamptz not null + last_seen timestamptz not null, + origin_type origin_type not null ); create index on cache(url); create index on cache(last_seen); diff --git a/swh/scheduler/tests/updater/__init__.py b/swh/scheduler/tests/updater/__init__.py index 7806abc..865b0be 100644 --- a/swh/scheduler/tests/updater/__init__.py +++ b/swh/scheduler/tests/updater/__init__.py @@ -1,34 +1,44 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from arrow import utcnow class UpdaterTestUtil: """Mixin intended for event generation purposes """ - def _make_event(self, event_type, name): + def _make_event(self, event_type, name, origin_type): return { 'type': event_type, 'repo': { 'name': name, }, 'created_at': utcnow(), + 'origin_type': origin_type, } def _make_events(self, events): - for event_type, repo_name in events: - yield self._make_event(event_type, repo_name) + for event_type, repo_name, origin_type in events: + yield self._make_event(event_type, repo_name, origin_type) - def _make_incomplete_event(self, event_type, name, missing_data_key): - event = self._make_event(event_type, name) + def _make_incomplete_event(self, event_type, name, origin_type, + missing_data_key): + event = self._make_event(event_type, name, origin_type) del event[missing_data_key] return event def _make_incomplete_events(self, events): - for event_type, repo_name, missing_data_key in events: + for event_type, repo_name, origin_type, missing_data_key in events: yield self._make_incomplete_event(event_type, repo_name, - missing_data_key) + origin_type, missing_data_key) + + def _make_simple_event(self, event_type, name, origin_type): + return { + 'type': event_type, + 'url': 'https://fakeurl/%s' % name, + 'origin_type': origin_type, + 'created_at': utcnow(), + } diff --git a/swh/scheduler/tests/updater/test_backend.py b/swh/scheduler/tests/updater/test_backend.py index 8430687..7dfdbf4 100644 --- a/swh/scheduler/tests/updater/test_backend.py +++ b/swh/scheduler/tests/updater/test_backend.py @@ -1,67 +1,68 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import unittest from arrow import utcnow from nose.plugins.attrib import attr from nose.tools import istest from hypothesis import given from hypothesis.strategies import sets, from_regex from swh.core.tests.db_testing import SingleDbTestFixture from swh.scheduler.updater.backend import SchedulerUpdaterBackend from swh.scheduler.updater.events import SWHEvent TEST_DIR = os.path.dirname(os.path.abspath(__file__)) TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../../swh-storage-testdata') @attr('db') class SchedulerUpdaterBackendTest(SingleDbTestFixture, unittest.TestCase): TEST_DB_NAME = 'softwareheritage-scheduler-updater-test' TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh-scheduler-updater.dump') def setUp(self): super().setUp() config = { 'scheduling_updater_db': 'dbname=' + self.TEST_DB_NAME, 'time_window': '1 minute', } self.backend = SchedulerUpdaterBackend(**config) def _empty_tables(self): self.cursor.execute( """SELECT table_name FROM information_schema.tables WHERE table_schema = %s""", ('public', )) tables = set(table for (table,) in self.cursor.fetchall()) for table in tables: self.cursor.execute('truncate table %s cascade' % table) self.conn.commit() def tearDown(self): self.backend.close_connection() self._empty_tables() super().tearDown() @istest @given(sets( from_regex( r'^https://somewhere[.]org/[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), min_size=10, max_size=15)) def cache_read(self, urls): def gen_events(urls): for url in urls: yield SWHEvent({ 'url': url, - 'type': 'create' + 'type': 'create', + 'origin_type': 'git', }) self.backend.cache_put(gen_events(urls)) r = self.backend.cache_read(timestamp=utcnow()) self.assertNotEqual(r, []) diff --git a/swh/scheduler/tests/updater/test_consumer.py b/swh/scheduler/tests/updater/test_consumer.py index 12e15f8..e614d41 100644 --- a/swh/scheduler/tests/updater/test_consumer.py +++ b/swh/scheduler/tests/updater/test_consumer.py @@ -1,194 +1,198 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from hypothesis import given from hypothesis.strategies import sampled_from, from_regex, lists, tuples, text from itertools import chain from nose.tools import istest from swh.scheduler.tests.updater import UpdaterTestUtil from swh.scheduler.updater.events import SWHEvent, LISTENED_EVENTS from swh.scheduler.updater.consumer import UpdaterConsumer class FakeSchedulerUpdaterBackend: def __init__(self): self.events = [] def cache_put(self, events): self.events.append(events) class FakeUpdaterConsumerBase(UpdaterConsumer): def __init__(self, backend_class=FakeSchedulerUpdaterBackend): super().__init__(backend_class=backend_class) self.connection_opened = False self.connection_closed = False self.consume_called = False self.has_events_called = False def open_connection(self): self.connection_opened = True def close_connection(self): self.connection_closed = True def convert_event(self, event): pass class FakeUpdaterConsumerRaise(FakeUpdaterConsumerBase): def has_events(self): self.has_events_called = True return True def consume_events(self): self.consume_called = True raise ValueError('Broken stuff') class UpdaterConsumerRaisingTest(unittest.TestCase): def setUp(self): self.updater = FakeUpdaterConsumerRaise() @istest def running_raise(self): """Raising during run should finish fine. """ # given self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) # when with self.assertRaisesRegex(ValueError, 'Broken stuff'): self.updater.run() # then self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) self.assertTrue(self.updater.connection_opened) self.assertTrue(self.updater.has_events_called) self.assertTrue(self.updater.connection_closed) self.assertTrue(self.updater.consume_called) class FakeUpdaterConsumerNoEvent(FakeUpdaterConsumerBase): def has_events(self): self.has_events_called = True return False def consume_events(self): self.consume_called = True class UpdaterConsumerNoEventTest(unittest.TestCase): def setUp(self): self.updater = FakeUpdaterConsumerNoEvent() @istest def running_does_not_consume(self): """Run with no events should do just fine""" # given self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) # when self.updater.run() # then self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) self.assertTrue(self.updater.connection_opened) self.assertTrue(self.updater.has_events_called) self.assertTrue(self.updater.connection_closed) self.assertFalse(self.updater.consume_called) -EVENT_KEYS = ['type', 'repo', 'created_at'] +EVENT_KEYS = ['type', 'repo', 'created_at', 'origin_type'] class FakeUpdaterConsumer(FakeUpdaterConsumerBase): def __init__(self, messages): super().__init__() self.messages = messages self.debug = False def has_events(self): self.has_events_called = True return len(self.messages) > 0 def consume_events(self): self.consume_called = True for msg in self.messages: yield msg self.messages.pop() def convert_event(self, event, keys=EVENT_KEYS): for k in keys: v = event.get(k) if v is None: return None e = { 'type': event['type'], 'url': 'https://fake.url/%s' % event['repo']['name'], - 'last_seen': event['created_at'] + 'last_seen': event['created_at'], + 'origin_type': event['origin_type'], } return SWHEvent(e) class UpdaterConsumerWithEventTest(UpdaterTestUtil, unittest.TestCase): @istest - @given(lists(tuples(sampled_from(LISTENED_EVENTS), - from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$')), + @given(lists(tuples(sampled_from(LISTENED_EVENTS), # event type + from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name + text()), # origin type min_size=3, max_size=10), - lists(tuples(text(), - from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$')), + lists(tuples(text(), # event type + from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), # name + text()), # origin type min_size=3, max_size=10), - lists(tuples(sampled_from(LISTENED_EVENTS), - from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), - sampled_from(EVENT_KEYS)), + lists(tuples(sampled_from(LISTENED_EVENTS), # event type + from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name + text(), # origin type + sampled_from(EVENT_KEYS)), # keys to drop min_size=3, max_size=10)) def running(self, events, uninteresting_events, incomplete_events): - """Interesting events are written to cache, dropping uninteresting ones + """Interesting events are written to cache, others are dropped """ # given ready_events = self._make_events(events) ready_uninteresting_events = self._make_events(uninteresting_events) ready_incomplete_events = self._make_incomplete_events( incomplete_events) updater = FakeUpdaterConsumer(list(chain( ready_events, ready_incomplete_events, ready_uninteresting_events))) self.assertEqual(updater.count, 0) self.assertEqual(updater.seen_events, set()) self.assertEqual(updater.events, []) # when updater.run() # then self.assertEqual(updater.count, 0) self.assertEqual(updater.seen_events, set()) self.assertEqual(updater.events, []) self.assertTrue(updater.connection_opened) self.assertTrue(updater.has_events_called) self.assertTrue(updater.connection_closed) self.assertTrue(updater.consume_called) self.assertEqual(updater.messages, []) # uninteresting or incomplete events are dropped self.assertTrue(len(updater.backend.events), len(events)) diff --git a/swh/scheduler/tests/updater/test_events.py b/swh/scheduler/tests/updater/test_events.py index 75cee1b..d36bd54 100644 --- a/swh/scheduler/tests/updater/test_events.py +++ b/swh/scheduler/tests/updater/test_events.py @@ -1,53 +1,48 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest -from arrow import utcnow from hypothesis import given from hypothesis.strategies import text, sampled_from from nose.tools import istest +from swh.scheduler.tests.updater import UpdaterTestUtil + from swh.scheduler.updater.events import SWHEvent, LISTENED_EVENTS from swh.scheduler.updater.ghtorrent import events def event_values_ko(): return set(events['evt']).union( set(events['ent'])).difference( set(LISTENED_EVENTS)) WRONG_EVENTS = sorted(list(event_values_ko())) -class EventTest(unittest.TestCase): - def _make_event(self, event_name): - return { - 'type': event_name, - 'url': 'something', - 'last_seen': utcnow(), - } - +class EventTest(UpdaterTestUtil, unittest.TestCase): @istest - @given(sampled_from(LISTENED_EVENTS)) - def is_interesting_ok(self, event_name): - evt = self._make_event(event_name) + @given(sampled_from(LISTENED_EVENTS), text(), text()) + def is_interesting_ok(self, event_type, name, origin_type): + evt = self._make_simple_event(event_type, name, origin_type) self.assertTrue(SWHEvent(evt).is_interesting()) @istest - @given(text()) - def is_interested_with_noisy_event_should_be_ko(self, event_name): - if event_name in LISTENED_EVENTS: - # just in generation generates a real and correct name, skip it + @given(text(), text(), text()) + def is_interested_with_noisy_event_should_be_ko( + self, event_type, name, origin_type): + if event_type in LISTENED_EVENTS: + # just in case something good is generated, skip it return - evt = self._make_event(event_name) + evt = self._make_simple_event(event_type, name, origin_type) self.assertFalse(SWHEvent(evt).is_interesting()) @istest - @given(sampled_from(WRONG_EVENTS)) - def is_interesting_ko(self, event_name): - evt = self._make_event(event_name) + @given(sampled_from(WRONG_EVENTS), text(), text()) + def is_interesting_ko(self, event_type, name, origin_type): + evt = self._make_simple_event(event_type, name, origin_type) self.assertFalse(SWHEvent(evt).is_interesting()) diff --git a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py index e164180..fc8ad0d 100644 --- a/swh/scheduler/tests/updater/test_ghtorrent.py +++ b/swh/scheduler/tests/updater/test_ghtorrent.py @@ -1,170 +1,171 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from hypothesis import given from hypothesis.strategies import sampled_from, from_regex from nose.tools import istest from unittest.mock import patch from swh.scheduler.tests.updater import UpdaterTestUtil from swh.scheduler.updater.events import SWHEvent from swh.scheduler.updater.ghtorrent import ( - events, GHTorrentConsumer) + events, GHTorrentConsumer, INTERESTING_EVENT_KEYS) def event_values(): return set(events['evt']).union(set(events['ent'])) def ghtorrentize_event_name(event_name): return '%sEvent' % event_name.capitalize() EVENT_TYPES = sorted([ghtorrentize_event_name(e) for e in event_values()]) class FakeChannel: """Fake Channel (virtual connection inside a connection) """ def close(self): self.close = True class FakeConnection: """Fake Rabbitmq connection for test purposes """ def __init__(self, conn_string): self._conn_string = conn_string self._connect = False self._release = False self._channel = False def connect(self): self._connect = True return True def release(self): self._connect = False self._release = True def channel(self): self._channel = True return FakeChannel() class GHTorrentConsumerTest(UpdaterTestUtil, unittest.TestCase): def setUp(self): self.fake_config = { 'conn': { 'url': 'amqp://u:p@https://somewhere:9807', }, 'debug': True, 'batch_cache_write': 10, 'rabbitmq_prefetch_read': 100, } self.consumer = GHTorrentConsumer(self.fake_config, _connection_class=FakeConnection) @istest def test_init(self): # given # check init is ok self.assertEqual(self.consumer.debug, self.fake_config['debug']) self.assertEqual(self.consumer.batch, self.fake_config['batch_cache_write']) self.assertEqual(self.consumer.prefetch_read, self.fake_config['rabbitmq_prefetch_read']) self.assertEqual(self.consumer.config, self.fake_config) @istest def test_has_events(self): self.assertTrue(self.consumer.has_events()) @istest def test_connection(self): # when self.consumer.open_connection() # then self.assertEqual(self.consumer.conn._conn_string, self.fake_config['conn']['url']) self.assertTrue(self.consumer.conn._connect) self.assertFalse(self.consumer.conn._release) # when self.consumer.close_connection() # then self.assertFalse(self.consumer.conn._connect) self.assertTrue(self.consumer.conn._release) self.assertIsInstance(self.consumer.channel, FakeChannel) @istest @given(sampled_from(EVENT_TYPES), from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$')) def convert_event_ok(self, event_type, name): - input_event = self._make_event(event_type, name) + input_event = self._make_event(event_type, name, 'git') actual_event = self.consumer.convert_event(input_event) self.assertTrue(isinstance(actual_event, SWHEvent)) event = actual_event.get() expected_event = { 'type': event_type.lower().rstrip('Event'), 'url': 'https://github.com/%s' % name, 'last_seen': input_event['created_at'], 'rate': 1, + 'origin_type': 'git', } self.assertEqual(event, expected_event) @istest @given(sampled_from(EVENT_TYPES), from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), - sampled_from(['type', 'repo', 'created_at'])) + sampled_from(INTERESTING_EVENT_KEYS)) def convert_event_ko(self, event_type, name, missing_data_key): input_event = self._make_incomplete_event( - event_type, name, missing_data_key) + event_type, name, 'git', missing_data_key) actual_converted_event = self.consumer.convert_event(input_event) self.assertIsNone(actual_converted_event) @patch('swh.scheduler.updater.ghtorrent.collect_replies') @istest def consume_events(self, mock_collect_replies): # given self.consumer.queue = 'fake-queue' # hack self.consumer.open_connection() fake_events = [ - self._make_event('PushEvent', 'user/some-repo'), - self._make_event('PushEvent', 'user2/some-other-repo'), + self._make_event('PushEvent', 'user/some-repo', 'git'), + self._make_event('PushEvent', 'user2/some-other-repo', 'git'), ] mock_collect_replies.return_value = fake_events # when actual_events = [] for e in self.consumer.consume_events(): actual_events.append(e) # then self.assertEqual(fake_events, actual_events) mock_collect_replies.assert_called_once_with( self.consumer.conn, self.consumer.channel, 'fake-queue', no_ack=False, limit=self.fake_config['rabbitmq_prefetch_read'] ) diff --git a/swh/scheduler/updater/backend.py b/swh/scheduler/updater/backend.py index 838307d..9f82734 100644 --- a/swh/scheduler/updater/backend.py +++ b/swh/scheduler/updater/backend.py @@ -1,67 +1,67 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from arrow import utcnow from swh.core.config import SWHConfig from swh.scheduler.backend import DbBackend, autocommit class SchedulerUpdaterBackend(SWHConfig, DbBackend): CONFIG_BASE_FILENAME = 'scheduler-updater' DEFAULT_CONFIG = { 'scheduling_updater_db': ( 'str', 'dbname=softwareheritage-scheduler-updater-dev'), 'time_window': ('str', '1 hour'), } def __init__(self, **override_config): super().__init__() self.config = self.parse_config_file(global_config=False) self.config.update(override_config) self.db = None self.db_conn_dsn = self.config['scheduling_updater_db'] self.time_window = self.config['time_window'] self.reconnect() - cache_put_keys = ['url', 'rate', 'last_seen'] + cache_put_keys = ['url', 'rate', 'last_seen', 'origin_type'] @autocommit def cache_put(self, events, timestamp=None, cursor=None): if timestamp is None: timestamp = utcnow() def prepare_events(events): for e in events: event = e.get() seen = event['last_seen'] if seen is None: event['last_seen'] = timestamp yield event cursor.execute('select swh_mktemp_cache()') self.copy_to(prepare_events(events), 'tmp_cache', self.cache_put_keys, cursor) cursor.execute('select swh_cache_put()') # @autocommit # def cache_get(self, event, cursor=None): # pass # @autocommit # def cache_remove(self, event, cursor=None): # pass cache_read_keys = ['id', 'url'] @autocommit def cache_read(self, timestamp, limit=100, cursor=None): q = self._format_query("""select {keys} from cache where %s - interval %s <= last_seen and last_seen <= %s limit %s """, self.cache_read_keys) cursor.execute(q, (timestamp, self.time_window, timestamp, limit)) return cursor.fetchall() diff --git a/swh/scheduler/updater/events.py b/swh/scheduler/updater/events.py index bde4f4c..2858d33 100644 --- a/swh/scheduler/updater/events.py +++ b/swh/scheduler/updater/events.py @@ -1,37 +1,39 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information LISTENED_EVENTS = [ 'delete', 'public', 'push' ] class SWHEvent: """SWH's interesting event (resulting in an origin update) """ def __init__(self, evt, rate=1): self.event = evt self.type = evt['type'].lower() self.url = evt['url'] self.last_seen = evt.get('last_seen') self.rate = rate + self.origin_type = evt.get('origin_type') def is_interesting(self): return self.type in LISTENED_EVENTS def get(self): return { 'type': self.type, 'url': self.url, 'last_seen': self.last_seen, 'rate': self.rate, + 'origin_type': self.origin_type } def __str__(self): return str(self.get()) diff --git a/swh/scheduler/updater/ghtorrent/__init__.py b/swh/scheduler/updater/ghtorrent/__init__.py index 4425747..2b9e5fc 100644 --- a/swh/scheduler/updater/ghtorrent/__init__.py +++ b/swh/scheduler/updater/ghtorrent/__init__.py @@ -1,146 +1,146 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from kombu import Connection, Exchange, Queue from kombu.common import collect_replies from swh.core.config import SWHConfig from swh.scheduler.updater.events import SWHEvent from swh.scheduler.updater.consumer import UpdaterConsumer events = { # ghtorrent events related to github events (interesting) 'evt': [ 'commitcomment', 'create', 'delete', 'deployment', 'deploymentstatus', 'download', 'follow', 'fork', 'forkapply', 'gist', 'gollum', 'issuecomment', 'issues', 'member', 'membership', 'pagebuild', 'public', 'pullrequest', 'pullrequestreviewcomment', 'push', 'release', 'repository', 'status', 'teamadd', 'watch' ], # ghtorrent events related to mongodb insert (not interesting) 'ent': [ 'commit_comments', 'commits', 'followers', 'forks', 'geo_cache', 'issue_comments', 'issue_events', 'issues', 'org_members', 'pull_request_comments', 'pull_requests', 'repo_collaborators', 'repo_labels', 'repos', 'users', 'watchers' ] } class RabbitMQConn(SWHConfig): """RabbitMQ Connection class """ CONFIG_BASE_FILENAME = 'backend/ghtorrent' DEFAULT_CONFIG = { 'conn': ('dict', { 'url': 'amqp://guest:guest@localhost:5672', 'exchange_name': 'ght-streams', 'routing_key': 'something', 'queue_name': 'fake-events' }) } ADDITIONAL_CONFIG = {} def __init__(self, **config): super().__init__() if config and set(config.keys()) - {'log_class'} != set(): self.config = config else: self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) self.conn_string = self.config['conn']['url'] self.exchange = Exchange(self.config['conn']['exchange_name'], 'topic', durable=True) self.routing_key = self.config['conn']['routing_key'] self.queue = Queue(self.config['conn']['queue_name'], exchange=self.exchange, routing_key=self.routing_key, auto_delete=True) -# Expected interesting event keys -EVENT_KEYS = ['type', 'repo', 'created_at'] +INTERESTING_EVENT_KEYS = ['type', 'repo', 'created_at'] class GHTorrentConsumer(RabbitMQConn, UpdaterConsumer): """GHTorrent events consumer """ ADDITIONAL_CONFIG = { 'debug': ('bool', False), 'batch_cache_write': ('int', 1000), 'rabbitmq_prefetch_read': ('int', 100), } def __init__(self, config=None, _connection_class=Connection): if config is None: super().__init__( log_class='swh.scheduler.updater.ghtorrent.GHTorrentConsumer') else: self.config = config self._connection_class = _connection_class self.debug = self.config['debug'] self.batch = self.config['batch_cache_write'] self.prefetch_read = self.config['rabbitmq_prefetch_read'] def has_events(self): """Always has events """ return True def convert_event(self, event): """Given ghtorrent event, convert it to a SWHEvent instance. """ if isinstance(event, str): event = json.loads(event) - for k in EVENT_KEYS: + for k in INTERESTING_EVENT_KEYS: if k not in event: if hasattr(self, 'log'): self.log.warn( 'Event should have the \'%s\' entry defined' % k) return None _type = event['type'].lower().rstrip('Event') _repo_name = 'https://github.com/%s' % event['repo']['name'] return SWHEvent({ 'type': _type, 'url': _repo_name, - 'last_seen': event['created_at'] + 'last_seen': event['created_at'], + 'origin_type': 'git', }) def open_connection(self): """Open rabbitmq connection """ self.conn = self._connection_class(self.config['conn']['url']) self.conn.connect() self.channel = self.conn.channel() def close_connection(self): """Close rabbitmq connection """ self.channel.close() self.conn.release() def consume_events(self): """Consume and yield queue messages """ yield from collect_replies( self.conn, self.channel, self.queue, no_ack=False, limit=self.prefetch_read)