Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/sql/updater/sql/swh-func.sql b/sql/updater/sql/swh-func.sql
index 8e33cba..435ce34 100644
--- a/sql/updater/sql/swh-func.sql
+++ b/sql/updater/sql/swh-func.sql
@@ -1,34 +1,34 @@
-- SHA1 helper: hashes the given text with public.digest() and returns the
-- result cast to the sha1 domain. Declared strict and immutable.
create or replace function hash_sha1(text)
    returns sha1
    language sql
    strict
    immutable
as $$
    select cast(public.digest($1, 'sha1') as sha1)
$$;

comment on function hash_sha1(text) is 'Compute sha1 hash as text';
-- Create tmp_cache, a temporary staging table shaped like cache (column
-- defaults included) but without the id column; it is dropped on commit.
create or replace function swh_mktemp_cache()
    returns void
    language sql
as $$
    create temporary table tmp_cache (
        like cache including defaults
    ) on commit drop;

    alter table tmp_cache
        drop column id;
$$;
-- Flush the rows staged in tmp_cache into cache.
--
-- The id of each row is the sha1 of its url. A url that is not cached yet is
-- inserted as is; for a url already cached, the staged rate is added to the
-- stored rate and last_seen is overwritten with the staged value
-- (origin_type is left untouched on conflict).
--
-- NOTE: tmp_cache must hold at most one row per url: PostgreSQL does not let
-- a single INSERT ... ON CONFLICT DO UPDATE affect the same row twice.
create or replace function swh_cache_put()
    returns void
    language plpgsql
as $$
begin
    insert into cache (id, url, rate, last_seen, origin_type)
    select hash_sha1(t.url), t.url, t.rate, t.last_seen, t.origin_type
    from tmp_cache t
    on conflict (id)
    do update set
        -- "cache" is the row already stored and "excluded" the row proposed
        -- for insertion, so the stored rate can be read directly instead of
        -- through a correlated subquery on cache.
        rate = cache.rate + excluded.rate,
        last_seen = excluded.last_seen;
    return;
end
$$;
diff --git a/sql/updater/sql/swh-schema.sql b/sql/updater/sql/swh-schema.sql
index 56a7391..965a4bf 100644
--- a/sql/updater/sql/swh-schema.sql
+++ b/sql/updater/sql/swh-schema.sql
@@ -1,24 +1,28 @@
-- Schema version bookkeeping: version number, release timestamp, description.
create table dbversion
(
version int primary key,
release timestamptz not null,
description text not null
);
comment on table dbversion is 'Schema update tracking';
-- a SHA1 checksum (not necessarily originating from Git)
-- stored as bytea; the check constraint only accepts 20-byte values
create domain sha1 as bytea check (length(value) = 20);
-- register schema version 1, stamped with the time this statement runs
insert into dbversion (version, release, description)
values (1, now(), 'Work In Progress');
+create type origin_type as enum ('git', 'svn', 'hg', 'deb');
+comment on type origin_type is 'Url''s repository type';
+
create table cache (
id sha1 primary key,
url text not null,
rate int default 1,
- last_seen timestamptz not null
+ last_seen timestamptz not null,
+ origin_type origin_type not null
);
create index on cache(url);
create index on cache(last_seen);
diff --git a/swh/scheduler/tests/updater/__init__.py b/swh/scheduler/tests/updater/__init__.py
index 7806abc..865b0be 100644
--- a/swh/scheduler/tests/updater/__init__.py
+++ b/swh/scheduler/tests/updater/__init__.py
@@ -1,34 +1,44 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from arrow import utcnow
class UpdaterTestUtil:
"""Mixin intended for event generation purposes
"""
- def _make_event(self, event_type, name):
+ def _make_event(self, event_type, name, origin_type):
return {
'type': event_type,
'repo': {
'name': name,
},
'created_at': utcnow(),
+ 'origin_type': origin_type,
}
def _make_events(self, events):
- for event_type, repo_name in events:
- yield self._make_event(event_type, repo_name)
+ for event_type, repo_name, origin_type in events:
+ yield self._make_event(event_type, repo_name, origin_type)
- def _make_incomplete_event(self, event_type, name, missing_data_key):
- event = self._make_event(event_type, name)
+ def _make_incomplete_event(self, event_type, name, origin_type,
+ missing_data_key):
+ event = self._make_event(event_type, name, origin_type)
del event[missing_data_key]
return event
def _make_incomplete_events(self, events):
- for event_type, repo_name, missing_data_key in events:
+ for event_type, repo_name, origin_type, missing_data_key in events:
yield self._make_incomplete_event(event_type, repo_name,
- missing_data_key)
+ origin_type, missing_data_key)
+
+ def _make_simple_event(self, event_type, name, origin_type):
+ return {
+ 'type': event_type,
+ 'url': 'https://fakeurl/%s' % name,
+ 'origin_type': origin_type,
+ 'created_at': utcnow(),
+ }
diff --git a/swh/scheduler/tests/updater/test_backend.py b/swh/scheduler/tests/updater/test_backend.py
index 8430687..7dfdbf4 100644
--- a/swh/scheduler/tests/updater/test_backend.py
+++ b/swh/scheduler/tests/updater/test_backend.py
@@ -1,67 +1,68 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import unittest
from arrow import utcnow
from nose.plugins.attrib import attr
from nose.tools import istest
from hypothesis import given
from hypothesis.strategies import sets, from_regex
from swh.core.tests.db_testing import SingleDbTestFixture
from swh.scheduler.updater.backend import SchedulerUpdaterBackend
from swh.scheduler.updater.events import SWHEvent
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../../swh-storage-testdata')
@attr('db')
class SchedulerUpdaterBackendTest(SingleDbTestFixture, unittest.TestCase):
TEST_DB_NAME = 'softwareheritage-scheduler-updater-test'
TEST_DB_DUMP = os.path.join(TEST_DATA_DIR,
'dumps/swh-scheduler-updater.dump')
def setUp(self):
super().setUp()
config = {
'scheduling_updater_db': 'dbname=' + self.TEST_DB_NAME,
'time_window': '1 minute',
}
self.backend = SchedulerUpdaterBackend(**config)
def _empty_tables(self):
self.cursor.execute(
"""SELECT table_name FROM information_schema.tables
WHERE table_schema = %s""", ('public', ))
tables = set(table for (table,) in self.cursor.fetchall())
for table in tables:
self.cursor.execute('truncate table %s cascade' % table)
self.conn.commit()
def tearDown(self):
self.backend.close_connection()
self._empty_tables()
super().tearDown()
@istest
@given(sets(
from_regex(
r'^https://somewhere[.]org/[a-z0-9]{5,7}/[a-z0-9]{3,10}$'),
min_size=10, max_size=15))
def cache_read(self, urls):
def gen_events(urls):
for url in urls:
yield SWHEvent({
'url': url,
- 'type': 'create'
+ 'type': 'create',
+ 'origin_type': 'git',
})
self.backend.cache_put(gen_events(urls))
r = self.backend.cache_read(timestamp=utcnow())
self.assertNotEqual(r, [])
diff --git a/swh/scheduler/tests/updater/test_consumer.py b/swh/scheduler/tests/updater/test_consumer.py
index 12e15f8..e614d41 100644
--- a/swh/scheduler/tests/updater/test_consumer.py
+++ b/swh/scheduler/tests/updater/test_consumer.py
@@ -1,194 +1,198 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from hypothesis import given
from hypothesis.strategies import sampled_from, from_regex, lists, tuples, text
from itertools import chain
from nose.tools import istest
from swh.scheduler.tests.updater import UpdaterTestUtil
from swh.scheduler.updater.events import SWHEvent, LISTENED_EVENTS
from swh.scheduler.updater.consumer import UpdaterConsumer
class FakeSchedulerUpdaterBackend:
    """In-memory backend double: remembers every batch given to cache_put.

    """
    def __init__(self):
        # one entry per cache_put call, in call order
        self.events = list()

    def cache_put(self, events):
        """Record the batch as a single entry of self.events."""
        self.events += [events]
class FakeUpdaterConsumerBase(UpdaterConsumer):
def __init__(self, backend_class=FakeSchedulerUpdaterBackend):
super().__init__(backend_class=backend_class)
self.connection_opened = False
self.connection_closed = False
self.consume_called = False
self.has_events_called = False
def open_connection(self):
self.connection_opened = True
def close_connection(self):
self.connection_closed = True
def convert_event(self, event):
pass
class FakeUpdaterConsumerRaise(FakeUpdaterConsumerBase):
def has_events(self):
self.has_events_called = True
return True
def consume_events(self):
self.consume_called = True
raise ValueError('Broken stuff')
class UpdaterConsumerRaisingTest(unittest.TestCase):
def setUp(self):
self.updater = FakeUpdaterConsumerRaise()
@istest
def running_raise(self):
"""Raising during run should finish fine.
"""
# given
self.assertEqual(self.updater.count, 0)
self.assertEqual(self.updater.seen_events, set())
self.assertEqual(self.updater.events, [])
# when
with self.assertRaisesRegex(ValueError, 'Broken stuff'):
self.updater.run()
# then
self.assertEqual(self.updater.count, 0)
self.assertEqual(self.updater.seen_events, set())
self.assertEqual(self.updater.events, [])
self.assertTrue(self.updater.connection_opened)
self.assertTrue(self.updater.has_events_called)
self.assertTrue(self.updater.connection_closed)
self.assertTrue(self.updater.consume_called)
class FakeUpdaterConsumerNoEvent(FakeUpdaterConsumerBase):
def has_events(self):
self.has_events_called = True
return False
def consume_events(self):
self.consume_called = True
class UpdaterConsumerNoEventTest(unittest.TestCase):
def setUp(self):
self.updater = FakeUpdaterConsumerNoEvent()
@istest
def running_does_not_consume(self):
"""Run with no events should do just fine"""
# given
self.assertEqual(self.updater.count, 0)
self.assertEqual(self.updater.seen_events, set())
self.assertEqual(self.updater.events, [])
# when
self.updater.run()
# then
self.assertEqual(self.updater.count, 0)
self.assertEqual(self.updater.seen_events, set())
self.assertEqual(self.updater.events, [])
self.assertTrue(self.updater.connection_opened)
self.assertTrue(self.updater.has_events_called)
self.assertTrue(self.updater.connection_closed)
self.assertFalse(self.updater.consume_called)
-EVENT_KEYS = ['type', 'repo', 'created_at']
+EVENT_KEYS = ['type', 'repo', 'created_at', 'origin_type']
class FakeUpdaterConsumer(FakeUpdaterConsumerBase):
def __init__(self, messages):
super().__init__()
self.messages = messages
self.debug = False
def has_events(self):
self.has_events_called = True
return len(self.messages) > 0
def consume_events(self):
    """Yield every pending message exactly once, in order.

    Marks the consumer as called, then drains self.messages from the
    head; the list is empty once the generator is exhausted.

    Yields:
        the messages of self.messages, first to last.

    """
    self.consume_called = True
    # Popping from the list while a for-loop iterates over it skips
    # elements (and the head gets yielded again on the next run), so the
    # queue is drained explicitly instead.
    while self.messages:
        yield self.messages.pop(0)
def convert_event(self, event, keys=EVENT_KEYS):
for k in keys:
v = event.get(k)
if v is None:
return None
e = {
'type': event['type'],
'url': 'https://fake.url/%s' % event['repo']['name'],
- 'last_seen': event['created_at']
+ 'last_seen': event['created_at'],
+ 'origin_type': event['origin_type'],
}
return SWHEvent(e)
class UpdaterConsumerWithEventTest(UpdaterTestUtil, unittest.TestCase):
@istest
- @given(lists(tuples(sampled_from(LISTENED_EVENTS),
- from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$')),
+ @given(lists(tuples(sampled_from(LISTENED_EVENTS), # event type
+ from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name
+ text()), # origin type
min_size=3, max_size=10),
- lists(tuples(text(),
- from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$')),
+ lists(tuples(text(), # event type
+ from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), # name
+ text()), # origin type
min_size=3, max_size=10),
- lists(tuples(sampled_from(LISTENED_EVENTS),
- from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'),
- sampled_from(EVENT_KEYS)),
+ lists(tuples(sampled_from(LISTENED_EVENTS), # event type
+ from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name
+ text(), # origin type
+ sampled_from(EVENT_KEYS)), # keys to drop
min_size=3, max_size=10))
def running(self, events, uninteresting_events, incomplete_events):
- """Interesting events are written to cache, dropping uninteresting ones
+ """Interesting events are written to cache, others are dropped
"""
# given
ready_events = self._make_events(events)
ready_uninteresting_events = self._make_events(uninteresting_events)
ready_incomplete_events = self._make_incomplete_events(
incomplete_events)
updater = FakeUpdaterConsumer(list(chain(
ready_events, ready_incomplete_events,
ready_uninteresting_events)))
self.assertEqual(updater.count, 0)
self.assertEqual(updater.seen_events, set())
self.assertEqual(updater.events, [])
# when
updater.run()
# then
self.assertEqual(updater.count, 0)
self.assertEqual(updater.seen_events, set())
self.assertEqual(updater.events, [])
self.assertTrue(updater.connection_opened)
self.assertTrue(updater.has_events_called)
self.assertTrue(updater.connection_closed)
self.assertTrue(updater.consume_called)
self.assertEqual(updater.messages, [])
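# uninteresting or incomplete events are dropped
# NOTE(review): assertTrue(expr, msg) takes its second argument as the
# failure message, so the assertion below only checks that
# len(updater.backend.events) is truthy; it compares nothing to len(events).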
self.assertTrue(len(updater.backend.events), len(events))
diff --git a/swh/scheduler/tests/updater/test_events.py b/swh/scheduler/tests/updater/test_events.py
index 75cee1b..d36bd54 100644
--- a/swh/scheduler/tests/updater/test_events.py
+++ b/swh/scheduler/tests/updater/test_events.py
@@ -1,53 +1,48 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
-from arrow import utcnow
from hypothesis import given
from hypothesis.strategies import text, sampled_from
from nose.tools import istest
+from swh.scheduler.tests.updater import UpdaterTestUtil
+
from swh.scheduler.updater.events import SWHEvent, LISTENED_EVENTS
from swh.scheduler.updater.ghtorrent import events
def event_values_ko():
    """Return the set of ghtorrent event names ('evt' and 'ent' entries)
    that are not part of LISTENED_EVENTS.

    """
    all_names = set(events['evt']) | set(events['ent'])
    return all_names - set(LISTENED_EVENTS)
WRONG_EVENTS = sorted(list(event_values_ko()))
-class EventTest(unittest.TestCase):
- def _make_event(self, event_name):
- return {
- 'type': event_name,
- 'url': 'something',
- 'last_seen': utcnow(),
- }
-
+class EventTest(UpdaterTestUtil, unittest.TestCase):
@istest
- @given(sampled_from(LISTENED_EVENTS))
- def is_interesting_ok(self, event_name):
- evt = self._make_event(event_name)
+ @given(sampled_from(LISTENED_EVENTS), text(), text())
+ def is_interesting_ok(self, event_type, name, origin_type):
+ evt = self._make_simple_event(event_type, name, origin_type)
self.assertTrue(SWHEvent(evt).is_interesting())
@istest
- @given(text())
- def is_interested_with_noisy_event_should_be_ko(self, event_name):
- if event_name in LISTENED_EVENTS:
- # just in generation generates a real and correct name, skip it
+ @given(text(), text(), text())
+ def is_interested_with_noisy_event_should_be_ko(
+ self, event_type, name, origin_type):
+ if event_type in LISTENED_EVENTS:
+ # just in case something good is generated, skip it
return
- evt = self._make_event(event_name)
+ evt = self._make_simple_event(event_type, name, origin_type)
self.assertFalse(SWHEvent(evt).is_interesting())
@istest
- @given(sampled_from(WRONG_EVENTS))
- def is_interesting_ko(self, event_name):
- evt = self._make_event(event_name)
+ @given(sampled_from(WRONG_EVENTS), text(), text())
+ def is_interesting_ko(self, event_type, name, origin_type):
+ evt = self._make_simple_event(event_type, name, origin_type)
self.assertFalse(SWHEvent(evt).is_interesting())
diff --git a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py
index e164180..fc8ad0d 100644
--- a/swh/scheduler/tests/updater/test_ghtorrent.py
+++ b/swh/scheduler/tests/updater/test_ghtorrent.py
@@ -1,170 +1,171 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from hypothesis import given
from hypothesis.strategies import sampled_from, from_regex
from nose.tools import istest
from unittest.mock import patch
from swh.scheduler.tests.updater import UpdaterTestUtil
from swh.scheduler.updater.events import SWHEvent
from swh.scheduler.updater.ghtorrent import (
- events, GHTorrentConsumer)
+ events, GHTorrentConsumer, INTERESTING_EVENT_KEYS)
def event_values():
return set(events['evt']).union(set(events['ent']))
def ghtorrentize_event_name(event_name):
    """Build the ghtorrent-style type name: the capitalized event name
    followed by 'Event' (e.g. 'push' gives 'PushEvent').

    """
    capitalized = event_name.capitalize()
    return '%sEvent' % capitalized
EVENT_TYPES = sorted([ghtorrentize_event_name(e) for e in event_values()])
class FakeChannel:
    """Fake Channel (virtual connection inside a connection)

    Attributes are documented inline; close() may be called repeatedly.

    """
    def __init__(self):
        # True once close() has been called at least once
        self.closed = False

    def close(self):
        """Record that the channel was closed."""
        # The flag lives on its own attribute: assigning to self.close
        # would replace this method with a bool and make any further
        # close() call raise TypeError.
        self.closed = True
class FakeConnection:
    """Fake Rabbitmq connection for test purposes

    Keeps the connection string and boolean flags telling which of
    connect/release/channel have been called.

    """
    def __init__(self, conn_string):
        self._conn_string = conn_string
        # nothing has been called yet
        self._connect = self._release = self._channel = False

    def connect(self):
        """Flag the connection as connected; always returns True."""
        self._connect = True
        return True

    def release(self):
        """Flag the connection as released and no longer connected."""
        self._release = True
        self._connect = False

    def channel(self):
        """Flag that a channel was requested and hand out a FakeChannel."""
        self._channel = True
        return FakeChannel()
class GHTorrentConsumerTest(UpdaterTestUtil, unittest.TestCase):
def setUp(self):
self.fake_config = {
'conn': {
'url': 'amqp://u:p@https://somewhere:9807',
},
'debug': True,
'batch_cache_write': 10,
'rabbitmq_prefetch_read': 100,
}
self.consumer = GHTorrentConsumer(self.fake_config,
_connection_class=FakeConnection)
@istest
def test_init(self):
# given
# check init is ok
self.assertEqual(self.consumer.debug,
self.fake_config['debug'])
self.assertEqual(self.consumer.batch,
self.fake_config['batch_cache_write'])
self.assertEqual(self.consumer.prefetch_read,
self.fake_config['rabbitmq_prefetch_read'])
self.assertEqual(self.consumer.config, self.fake_config)
@istest
def test_has_events(self):
self.assertTrue(self.consumer.has_events())
@istest
def test_connection(self):
# when
self.consumer.open_connection()
# then
self.assertEqual(self.consumer.conn._conn_string,
self.fake_config['conn']['url'])
self.assertTrue(self.consumer.conn._connect)
self.assertFalse(self.consumer.conn._release)
# when
self.consumer.close_connection()
# then
self.assertFalse(self.consumer.conn._connect)
self.assertTrue(self.consumer.conn._release)
self.assertIsInstance(self.consumer.channel, FakeChannel)
@istest
@given(sampled_from(EVENT_TYPES),
from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'))
def convert_event_ok(self, event_type, name):
- input_event = self._make_event(event_type, name)
+ input_event = self._make_event(event_type, name, 'git')
actual_event = self.consumer.convert_event(input_event)
self.assertTrue(isinstance(actual_event, SWHEvent))
event = actual_event.get()
expected_event = {
'type': event_type.lower().rstrip('Event'),
'url': 'https://github.com/%s' % name,
'last_seen': input_event['created_at'],
'rate': 1,
+ 'origin_type': 'git',
}
self.assertEqual(event, expected_event)
@istest
@given(sampled_from(EVENT_TYPES),
from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'),
- sampled_from(['type', 'repo', 'created_at']))
+ sampled_from(INTERESTING_EVENT_KEYS))
def convert_event_ko(self, event_type, name, missing_data_key):
input_event = self._make_incomplete_event(
- event_type, name, missing_data_key)
+ event_type, name, 'git', missing_data_key)
actual_converted_event = self.consumer.convert_event(input_event)
self.assertIsNone(actual_converted_event)
@patch('swh.scheduler.updater.ghtorrent.collect_replies')
@istest
def consume_events(self, mock_collect_replies):
# given
self.consumer.queue = 'fake-queue' # hack
self.consumer.open_connection()
fake_events = [
- self._make_event('PushEvent', 'user/some-repo'),
- self._make_event('PushEvent', 'user2/some-other-repo'),
+ self._make_event('PushEvent', 'user/some-repo', 'git'),
+ self._make_event('PushEvent', 'user2/some-other-repo', 'git'),
]
mock_collect_replies.return_value = fake_events
# when
actual_events = []
for e in self.consumer.consume_events():
actual_events.append(e)
# then
self.assertEqual(fake_events, actual_events)
mock_collect_replies.assert_called_once_with(
self.consumer.conn,
self.consumer.channel,
'fake-queue',
no_ack=False,
limit=self.fake_config['rabbitmq_prefetch_read']
)
diff --git a/swh/scheduler/updater/backend.py b/swh/scheduler/updater/backend.py
index 838307d..9f82734 100644
--- a/swh/scheduler/updater/backend.py
+++ b/swh/scheduler/updater/backend.py
@@ -1,67 +1,67 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from arrow import utcnow
from swh.core.config import SWHConfig
from swh.scheduler.backend import DbBackend, autocommit
class SchedulerUpdaterBackend(SWHConfig, DbBackend):
CONFIG_BASE_FILENAME = 'scheduler-updater'
DEFAULT_CONFIG = {
'scheduling_updater_db': (
'str', 'dbname=softwareheritage-scheduler-updater-dev'),
'time_window': ('str', '1 hour'),
}
def __init__(self, **override_config):
super().__init__()
self.config = self.parse_config_file(global_config=False)
self.config.update(override_config)
self.db = None
self.db_conn_dsn = self.config['scheduling_updater_db']
self.time_window = self.config['time_window']
self.reconnect()
- cache_put_keys = ['url', 'rate', 'last_seen']
+ cache_put_keys = ['url', 'rate', 'last_seen', 'origin_type']
@autocommit
def cache_put(self, events, timestamp=None, cursor=None):
    """Write events to the cache.

    The events are copied into the tmp_cache table created by
    swh_mktemp_cache(), restricted to self.cache_put_keys, then merged
    by swh_cache_put().

    Args:
        events: iterable of objects whose get() returns the event dict
        timestamp: value used as 'last_seen' for events whose own
            'last_seen' is None; defaults to utcnow()
        cursor: database cursor the statements are executed on

    """
    fallback_seen = utcnow() if timestamp is None else timestamp

    def _rows(swh_events):
        # lazily turn each event into its dict, filling in last_seen
        for swh_event in swh_events:
            row = swh_event.get()
            if row['last_seen'] is None:
                row['last_seen'] = fallback_seen
            yield row

    cursor.execute('select swh_mktemp_cache()')
    self.copy_to(_rows(events), 'tmp_cache', self.cache_put_keys, cursor)
    cursor.execute('select swh_cache_put()')
# @autocommit
# def cache_get(self, event, cursor=None):
# pass
# @autocommit
# def cache_remove(self, event, cursor=None):
# pass
cache_read_keys = ['id', 'url']
@autocommit
def cache_read(self, timestamp, limit=100, cursor=None):
q = self._format_query("""select {keys}
from cache
where %s - interval %s <= last_seen and last_seen <= %s
limit %s
""", self.cache_read_keys)
cursor.execute(q, (timestamp, self.time_window, timestamp, limit))
return cursor.fetchall()
diff --git a/swh/scheduler/updater/events.py b/swh/scheduler/updater/events.py
index bde4f4c..2858d33 100644
--- a/swh/scheduler/updater/events.py
+++ b/swh/scheduler/updater/events.py
@@ -1,37 +1,39 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
LISTENED_EVENTS = [
'delete',
'public',
'push'
]
class SWHEvent:
"""SWH's interesting event (resulting in an origin update)
"""
def __init__(self, evt, rate=1):
self.event = evt
self.type = evt['type'].lower()
self.url = evt['url']
self.last_seen = evt.get('last_seen')
self.rate = rate
+ self.origin_type = evt.get('origin_type')
def is_interesting(self):
return self.type in LISTENED_EVENTS
def get(self):
return {
'type': self.type,
'url': self.url,
'last_seen': self.last_seen,
'rate': self.rate,
+ 'origin_type': self.origin_type
}
def __str__(self):
return str(self.get())
diff --git a/swh/scheduler/updater/ghtorrent/__init__.py b/swh/scheduler/updater/ghtorrent/__init__.py
index 4425747..2b9e5fc 100644
--- a/swh/scheduler/updater/ghtorrent/__init__.py
+++ b/swh/scheduler/updater/ghtorrent/__init__.py
@@ -1,146 +1,146 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
from kombu import Connection, Exchange, Queue
from kombu.common import collect_replies
from swh.core.config import SWHConfig
from swh.scheduler.updater.events import SWHEvent
from swh.scheduler.updater.consumer import UpdaterConsumer
events = {
# ghtorrent events related to github events (interesting)
'evt': [
'commitcomment', 'create', 'delete', 'deployment',
'deploymentstatus', 'download', 'follow', 'fork', 'forkapply',
'gist', 'gollum', 'issuecomment', 'issues', 'member',
'membership', 'pagebuild', 'public', 'pullrequest',
'pullrequestreviewcomment', 'push', 'release', 'repository',
'status', 'teamadd', 'watch'
],
# ghtorrent events related to mongodb insert (not interesting)
'ent': [
'commit_comments', 'commits', 'followers', 'forks',
'geo_cache', 'issue_comments', 'issue_events', 'issues',
'org_members', 'pull_request_comments', 'pull_requests',
'repo_collaborators', 'repo_labels', 'repos', 'users', 'watchers'
]
}
class RabbitMQConn(SWHConfig):
"""RabbitMQ Connection class
"""
CONFIG_BASE_FILENAME = 'backend/ghtorrent'
DEFAULT_CONFIG = {
'conn': ('dict', {
'url': 'amqp://guest:guest@localhost:5672',
'exchange_name': 'ght-streams',
'routing_key': 'something',
'queue_name': 'fake-events'
})
}
ADDITIONAL_CONFIG = {}
def __init__(self, **config):
super().__init__()
if config and set(config.keys()) - {'log_class'} != set():
self.config = config
else:
self.config = self.parse_config_file(
additional_configs=[self.ADDITIONAL_CONFIG])
self.conn_string = self.config['conn']['url']
self.exchange = Exchange(self.config['conn']['exchange_name'],
'topic', durable=True)
self.routing_key = self.config['conn']['routing_key']
self.queue = Queue(self.config['conn']['queue_name'],
exchange=self.exchange,
routing_key=self.routing_key,
auto_delete=True)
-# Expected interesting event keys
-EVENT_KEYS = ['type', 'repo', 'created_at']
+INTERESTING_EVENT_KEYS = ['type', 'repo', 'created_at']
class GHTorrentConsumer(RabbitMQConn, UpdaterConsumer):
"""GHTorrent events consumer
"""
ADDITIONAL_CONFIG = {
'debug': ('bool', False),
'batch_cache_write': ('int', 1000),
'rabbitmq_prefetch_read': ('int', 100),
}
def __init__(self, config=None, _connection_class=Connection):
if config is None:
super().__init__(
log_class='swh.scheduler.updater.ghtorrent.GHTorrentConsumer')
else:
self.config = config
self._connection_class = _connection_class
self.debug = self.config['debug']
self.batch = self.config['batch_cache_write']
self.prefetch_read = self.config['rabbitmq_prefetch_read']
def has_events(self):
"""Always has events
"""
return True
def convert_event(self, event):
    """Given ghtorrent event, convert it to a SWHEvent instance.

    Args:
        event: ghtorrent event, either a dict or its JSON string form

    Returns:
        A SWHEvent whose type is the lowercased ghtorrent type without
        its trailing 'event', whose url is the github url built from the
        repository name, whose last_seen is the event's 'created_at' and
        whose origin_type is 'git'. None when one of
        INTERESTING_EVENT_KEYS is missing from the event.

    """
    if isinstance(event, str):
        event = json.loads(event)

    for k in INTERESTING_EVENT_KEYS:
        if k not in event:
            if hasattr(self, 'log'):
                self.log.warn(
                    'Event should have the \'%s\' entry defined' % k)
            return None

    # str.rstrip('Event') strips any trailing run of the characters
    # E/v/e/n/t rather than the suffix ('deleteevent' would become 'del',
    # 'createevent' 'crea'), so the suffix is removed explicitly.
    # NOTE: tests computing their expectation with rstrip('Event') need
    # the same correction.
    suffix = 'event'
    _type = event['type'].lower()
    if _type.endswith(suffix):
        _type = _type[:-len(suffix)]

    _repo_name = 'https://github.com/%s' % event['repo']['name']

    return SWHEvent({
        'type': _type,
        'url': _repo_name,
        'last_seen': event['created_at'],
        'origin_type': 'git',
    })
def open_connection(self):
"""Open rabbitmq connection
"""
self.conn = self._connection_class(self.config['conn']['url'])
self.conn.connect()
self.channel = self.conn.channel()
def close_connection(self):
"""Close rabbitmq connection
"""
self.channel.close()
self.conn.release()
def consume_events(self):
"""Consume and yield queue messages
"""
yield from collect_replies(
self.conn, self.channel, self.queue,
no_ack=False, limit=self.prefetch_read)

File Metadata

Mime Type
text/x-diff
Expires
Fri, Jul 4, 1:10 PM (1 w, 23 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3317532

Event Timeline