diff --git a/sql/updater/sql/swh-func.sql b/sql/updater/sql/swh-func.sql
index 81d38ce..8e33cba 100644
--- a/sql/updater/sql/swh-func.sql
+++ b/sql/updater/sql/swh-func.sql
@@ -1,34 +1,34 @@
 -- Postgresql index helper function
 create or replace function hash_sha1(text)
     returns sha1
 as $$
     select public.digest($1, 'sha1') :: sha1
 $$ language sql strict immutable;
 
 comment on function hash_sha1(text) is 'Compute sha1 hash as text';
 
 -- create a temporary table for cache tmp_cache,
 create or replace function swh_mktemp_cache()
     returns void
     language sql
 as $$
   create temporary table tmp_cache (
     like cache including defaults
   ) on commit drop;
   alter table tmp_cache drop column id;
 $$;
 
 create or replace function swh_cache_put()
     returns void
     language plpgsql
 as $$
 begin
-    insert into cache (id, url, last_seen)
-    select hash_sha1(url), url, last_seen
+    insert into cache (id, url, rate, last_seen)
+    select hash_sha1(url), url, rate, last_seen
     from tmp_cache t
     on conflict(id)
-    do update set rate = (select rate from cache where id=excluded.id) + 1,
+    do update set rate = (select rate from cache where id=excluded.id) + excluded.rate,
                   last_seen = excluded.last_seen;
     return;
 end
 $$;
diff --git a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py
index 5179151..c63e356 100644
--- a/swh/scheduler/tests/updater/test_ghtorrent.py
+++ b/swh/scheduler/tests/updater/test_ghtorrent.py
@@ -1,75 +1,76 @@
 # Copyright (C) 2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import unittest
 
 from arrow import utcnow
 from nose.plugins.attrib import attr
 from nose.tools import istest
 from hypothesis import given
 from hypothesis.strategies import sampled_from, from_regex
 
 from swh.scheduler.updater.events import SWHEvent
 from swh.scheduler.updater.ghtorrent import events, convert_event
 
 
 def event_values():
     return set(events['evt']).union(set(events['ent']))
 
 
 def ghtorrentize_event_name(event_name):
     return '%sEvent' % event_name.capitalize()
 
 
 EVENT_TYPES = sorted([ghtorrentize_event_name(e) for e in event_values()])
 
 
 @attr('db')
 class GHTorrentTest(unittest.TestCase):
     def _make_event(self, event_type, name):
         return {
             'type': event_type,
             'repo': {
                 'name': name,
             },
             'created_at': utcnow(),
         }
 
     @istest
     @given(sampled_from(EVENT_TYPES),
            from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'))
     def convert_event_ok(self, event_type, name):
         input_event = self._make_event(event_type, name)
         actual_event = convert_event(input_event)
 
         self.assertTrue(isinstance(actual_event, SWHEvent))
         event = actual_event.get()
 
         expected_event = {
             'type': event_type.lower().rstrip('Event'),
             'url': 'https://github.com/%s' % name,
             'last_seen': input_event['created_at'],
+            'rate': 1,
         }
         self.assertEqual(event, expected_event)
 
     def _make_incomplete_event(self, event_type, name, missing_data_key):
         event = self._make_event(event_type, name)
         del event[missing_data_key]
         return event
 
     @istest
     @given(sampled_from(EVENT_TYPES),
            from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'),
            sampled_from(['type', 'repo', 'created_at']))
     def convert_event_ko(self, event_type, name, missing_data_key):
         input_event = self._make_incomplete_event(
             event_type, name, missing_data_key)
 
         with self.assertRaisesRegex(
                 ValueError,
                 'Event should have the \'%s\' entry defined' % (
                     missing_data_key, )):
             convert_event(input_event)
diff --git a/swh/scheduler/updater/backend.py b/swh/scheduler/updater/backend.py
index 2b695df..838307d 100644
--- a/swh/scheduler/updater/backend.py
+++ b/swh/scheduler/updater/backend.py
@@ -1,67 +1,67 @@
 # Copyright (C) 2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from arrow import utcnow
 
 from swh.core.config import SWHConfig
 from swh.scheduler.backend import DbBackend, autocommit
 
 
 class SchedulerUpdaterBackend(SWHConfig, DbBackend):
     CONFIG_BASE_FILENAME = 'scheduler-updater'
     DEFAULT_CONFIG = {
         'scheduling_updater_db': (
             'str', 'dbname=softwareheritage-scheduler-updater-dev'),
         'time_window': ('str', '1 hour'),
     }
 
     def __init__(self, **override_config):
         super().__init__()
         self.config = self.parse_config_file(global_config=False)
         self.config.update(override_config)
         self.db = None
         self.db_conn_dsn = self.config['scheduling_updater_db']
         self.time_window = self.config['time_window']
         self.reconnect()
 
-    cache_put_keys = ['url', 'last_seen']
+    cache_put_keys = ['url', 'rate', 'last_seen']
 
     @autocommit
     def cache_put(self, events, timestamp=None, cursor=None):
         if timestamp is None:
             timestamp = utcnow()
 
         def prepare_events(events):
             for e in events:
                 event = e.get()
-                seen = event.get('last_seen')
+                seen = event['last_seen']
                 if seen is None:
                     event['last_seen'] = timestamp
                 yield event
 
         cursor.execute('select swh_mktemp_cache()')
         self.copy_to(prepare_events(events),
                      'tmp_cache', self.cache_put_keys, cursor)
         cursor.execute('select swh_cache_put()')
 
     # @autocommit
     # def cache_get(self, event, cursor=None):
     #     pass
 
     # @autocommit
     # def cache_remove(self, event, cursor=None):
     #     pass
 
     cache_read_keys = ['id', 'url']
 
     @autocommit
     def cache_read(self, timestamp, limit=100, cursor=None):
         q = self._format_query("""select {keys}
                                   from cache
                                   where %s - interval %s <= last_seen
                                         and last_seen <= %s
                                   limit %s
                                """, self.cache_read_keys)
         cursor.execute(q, (timestamp, self.time_window, timestamp, limit))
         return cursor.fetchall()
diff --git a/swh/scheduler/updater/events.py b/swh/scheduler/updater/events.py
index 03bfa1a..89b6356 100644
--- a/swh/scheduler/updater/events.py
+++ b/swh/scheduler/updater/events.py
@@ -1,38 +1,38 @@
 # Copyright (C) 2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 
 LISTENED_EVENTS = [
     'create',
     'delete',
     'public',
     'push'
 ]
 
 
 class SWHEvent:
     """SWH's interesting event (resulting in an origin update)
 
     """
-    def __init__(self, evt):
+    def __init__(self, evt, rate=1):
         self.event = evt
+        self.type = evt['type'].lower()
+        self.url = evt['url']
+        self.last_seen = evt.get('last_seen')
+        self.rate = rate
 
     def check(self):
-        return 'type' in self.event and \
-            self.event['type'].lower() in LISTENED_EVENTS
+        return self.type in LISTENED_EVENTS
 
     def get(self):
         return {
-            'type': self.event['type'],
-            'url': self.event['url'],
-            'last_seen': self.event.get('last_seen')
+            'type': self.type,
+            'url': self.url,
+            'last_seen': self.last_seen,
+            'rate': self.rate,
         }
 
     def __str__(self):
-        return {
-            'type': self.event['type'],
-            'url': self.event['url'],
-            'last_seen': self.event.get('last_seen')
-        }
+        return str(self.get())
diff --git a/swh/scheduler/updater/ghtorrent.py b/swh/scheduler/updater/ghtorrent.py
index fbe476c..33973f2 100644
--- a/swh/scheduler/updater/ghtorrent.py
+++ b/swh/scheduler/updater/ghtorrent.py
@@ -1,224 +1,236 @@
 # Copyright (C) 2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import click
 import json
 import random
 import string
 
 from arrow import utcnow
+from collections import defaultdict
 from kombu import Connection, Exchange, Queue
 
 from swh.core.config import SWHConfig
 from swh.scheduler.updater.events import SWHEvent
 from swh.scheduler.updater.backend import SchedulerUpdaterBackend
 
 
 events = {
     # ghtorrent events related to github events (interesting)
     'evt': [
         'commitcomment', 'create', 'delete', 'deployment',
         'deploymentstatus', 'download', 'follow', 'fork', 'forkapply',
         'gist', 'gollum', 'issuecomment', 'issues', 'member',
         'membership', 'pagebuild', 'public', 'pullrequest',
         'pullrequestreviewcomment', 'push', 'release', 'repository',
         'status', 'teamadd', 'watch'
     ],
     # ghtorrent events related to mongodb insert (not interesting)
     'ent': [
         'commit_comments', 'commits', 'followers', 'forks', 'geo_cache',
         'issue_comments', 'issue_events', 'issues', 'org_members',
         'pull_request_comments', 'pull_requests', 'repo_collaborators',
         'repo_labels', 'repos', 'users', 'watchers'
     ]
 }
 
 
 class FakeRandomOriginGenerator:
     def _random_string(self, length):
         """Build a fake string of length length.
 
         """
         return ''.join([
             random.choice(string.ascii_letters + string.digits)
             for n in range(length)])
 
     def generate(self, user_range=range(5, 10), repo_range=range(10, 15)):
         """Build a fake url
 
         """
         length_username = random.choice(user_range)
         user = self._random_string(length_username)
         length_repo = random.choice(repo_range)
         repo = self._random_string(length_repo)
         return '%s/%s' % (user, repo)
 
 
 class RabbitMQConn(SWHConfig):
     """RabbitMQ Connection class
 
     """
     CONFIG_BASE_FILENAME = 'backend/ghtorrent'
 
     DEFAULT_CONFIG = {
         'conn': ('dict', {
             'user': 'guest',
             'pass': 'guest',
             'port': 5672,
             'server': 'localhost',
             'exchange_name': 'ght-streams',
             'routing_key': 'something',
             'queue_name': 'fake-events'
         })
     }
 
     ADDITIONAL_CONFIG = {}
 
     def _connection_string(self):
         """Build the connection queue string.
 
         """
         return 'amqp://%s:%s@%s:%s' % (
             self.config['conn']['user'],
             self.config['conn']['pass'],
             self.config['conn']['server'],
             self.config['conn']['port']
         )
 
     def __init__(self, **config):
         if config:
             self.config = config
         else:
             self.config = self.parse_config_file(
                 additional_configs=[self.ADDITIONAL_CONFIG])
 
         self.conn_string = self._connection_string()
         self.exchange = Exchange(self.config['conn']['exchange_name'],
                                  'topic', durable=True)
         self.routing_key = self.config['conn']['routing_key']
         self.queue = Queue(self.config['conn']['queue_name'],
                            exchange=self.exchange,
                            routing_key=self.routing_key,
                            auto_delete=True)
 
 
 class FakeGHTorrentPublisher(RabbitMQConn):
     """Fake GHTorrent that randomly publishes fake events.  Those events are
        published in similar manner as described by ghtorrent's documentation [2].
 
        context: stuck with raw ghtorrent so far [1]
 
        [1] https://github.com/ghtorrent/ghtorrent.org/issues/397#issuecomment-387052462  # noqa
        [2] http://ghtorrent.org/streaming.html
 
     """
     ADDITIONAL_CONFIG = {
         'nb_messages': ('int', 100)
     }
 
     def __init__(self, **config):
         super().__init__(**config)
         self.fake_origin_generator = FakeRandomOriginGenerator()
         self.nb_messages = self.config['nb_messages']
 
     def _random_event(self):
         """Create a fake and random event
 
         """
         event_type = random.choice(['evt', 'ent'])
         sub_event = random.choice(events[event_type])
         return {
             'type': sub_event,
             'repo': {
                 'name': self.fake_origin_generator.generate(),
             },
             'created_at': utcnow().isoformat()
         }
 
     def publish(self, nb_messages=None):
         if not nb_messages:
             nb_messages = self.nb_messages
 
         with Connection(self.conn_string) as conn:
             with conn.Producer(serializer='json') as producer:
                 for n in range(nb_messages):
                     event = self._random_event()
                     producer.publish(event,
                                      exchange=self.exchange,
                                      routing_key=self.routing_key,
                                      declare=[self.queue])
 
 
 def convert_event(event):
     """Given ghtorrent event, convert it to an swhevent instance.
 
     """
     if isinstance(event, str):
         event = json.loads(event)
 
     keys = ['type', 'repo', 'created_at']
     for k in keys:
         if k not in event:
             raise ValueError('Event should have the \'%s\' entry defined' % k)
 
     _type = event['type'].lower().rstrip('Event')
     _repo_name = 'https://github.com/%s' % event['repo']['name']
 
     return SWHEvent({
         'type': _type,
         'url': _repo_name,
         'last_seen': event['created_at']
     })
 
 
-def process_message(body, message, backend, debug=False):
-    """Callback method to convert and push event to cache"""
-    try:
-        event = convert_event(body)
-        if debug:
-            print('#### body', body)
-        if event.check():
-            backend.cache_put([event])
-    finally:
-        message.ack()
-
-
 class GHTorrentConsumer(RabbitMQConn):
     """GHTorrent consumer
 
     """
     ADDITIONAL_CONFIG = {
-        'debug': (bool, False),
+        'debug': ('bool', False),
+        'batch': ('int', 1000),
     }
 
     def __init__(self):
         super().__init__()
         self.backend = SchedulerUpdaterBackend()
         self.debug = self.config['debug']
+        self._reset_cache()
+        self.batch = self.config['batch']
+
+    def _reset_cache(self):
+        self.count = 0
+        self.seen_events = {}
+        self.events = []
+
+    def process_message(self, body, message):
+        try:
+            event = convert_event(body)
+            if self.debug:
+                print('#### body', body)
+            if event.check():
+                seen = self.seen_events.get(event.url)
+                if seen is not None:
+                    # origin already buffered in this batch: bump its rate
+                    seen.rate += 1
+                else:
+                    self.events.append(event)
+                    self.seen_events[event.url] = event
+                self.count += 1
+                if self.count >= self.batch:
+                    self.backend.cache_put(self.events)
+                    self._reset_cache()
+        finally:
+            message.ack()
 
     def consume(self):
-        def process_message_fn(b, m, backend=self.backend, debug=self.debug):
-            return process_message(b, m, backend=backend, debug=debug)
-
         with Connection(self.conn_string) as conn:
-            with conn.Consumer(self.queue, callbacks=[process_message_fn],
+            with conn.Consumer(self.queue, callbacks=[self.process_message],
                                auto_declare=True):
                 while True:
                     conn.drain_events()
 
 
 @click.command()
 def main():
     """Consume events from ghtorrent
 
     """
     GHTorrentConsumer().consume()
 
 
 if __name__ == '__main__':
     main()
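
Not part of the patch: a minimal sketch of how the new rate column is meant to accumulate through SchedulerUpdaterBackend.cache_put(), assuming a reachable scheduler-updater database configured as above; the origin URL below is hypothetical.

from swh.scheduler.updater.backend import SchedulerUpdaterBackend
from swh.scheduler.updater.events import SWHEvent

backend = SchedulerUpdaterBackend()
origin = 'https://github.com/some/repo'  # hypothetical origin

# First sighting: swh_cache_put() inserts the row with the event's rate (1).
backend.cache_put([SWHEvent({'type': 'push', 'url': origin,
                             'last_seen': None})])

# A later batch in which the consumer saw the same origin three more times:
# the upsert adds excluded.rate to the stored rate, yielding 4.
backend.cache_put([SWHEvent({'type': 'push', 'url': origin,
                             'last_seen': None}, rate=3)])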