diff --git a/sql/updater/sql/swh-func.sql b/sql/updater/sql/swh-func.sql
index cbde2ed..786dee1 100644
--- a/sql/updater/sql/swh-func.sql
+++ b/sql/updater/sql/swh-func.sql
@@ -1,48 +1,48 @@
 -- Postgresql index helper function
 create or replace function hash_sha1(text)
     returns sha1
 as $$
     select public.digest($1, 'sha1') :: sha1
 $$ language sql strict immutable;
 
 comment on function hash_sha1(text) is 'Compute sha1 hash as text';
 
 -- create a temporary table for cache tmp_cache,
 create or replace function swh_mktemp_cache()
     returns void
     language sql
 as $$
     create temporary table tmp_cache (
         like cache including defaults
     ) on commit drop;
     alter table tmp_cache drop column id;
 $$;
 
 create or replace function swh_cache_put()
     returns void
     language plpgsql
 as $$
 begin
-    insert into cache (id, url, origin_type, rate, last_seen)
-    select hash_sha1(url), url, origin_type, rate, last_seen
+    insert into cache (id, url, origin_type, cnt, last_seen)
+    select hash_sha1(url), url, origin_type, cnt, last_seen
     from tmp_cache t
     on conflict(id)
-    do update set rate = (select rate from cache where id=excluded.id) + excluded.rate,
+    do update set cnt = (select cnt from cache where id=excluded.id) + excluded.cnt,
                   last_seen = excluded.last_seen;
     return;
 end
 $$;
 
 comment on function swh_cache_put() is 'Write to cache temporary events';
 
 create or replace function swh_cache_read(ts timestamptz, lim integer)
     returns setof cache
     language sql stable
 as $$
-    select id, url, origin_type, rate, first_seen, last_seen
+    select id, url, origin_type, cnt, first_seen, last_seen
     from cache
     where last_seen <= ts
     limit lim;
 $$;
 
 comment on function swh_cache_read(timestamptz, integer) is 'Read cache entries';
diff --git a/sql/updater/sql/swh-schema.sql b/sql/updater/sql/swh-schema.sql
index 2f6736f..5833df8 100644
--- a/sql/updater/sql/swh-schema.sql
+++ b/sql/updater/sql/swh-schema.sql
@@ -1,29 +1,29 @@
 create table dbversion
 (
     version     int primary key,
     release     timestamptz not null,
     description text not null
 );
 
 comment on table dbversion is 'Schema update tracking';
 
 -- a SHA1 checksum (not necessarily originating from Git)
 create domain sha1 as bytea check (length(value) = 20);
 
 insert into dbversion (version, release, description)
        values (1, now(), 'Work In Progress');
 
 create type origin_type as enum ('git', 'svn', 'hg', 'deb');
 comment on type origin_type is 'Url''s repository type';
 
 create table cache (
     id           sha1 primary key,
     url          text not null,
     origin_type  origin_type not null,
-    rate         int default 1,
+    cnt          int default 1,
     first_seen   timestamptz not null default now(),
     last_seen    timestamptz not null
 );
 
 create index on cache(url);
 create index on cache(last_seen);
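For reference, a minimal, hypothetical round trip exercising the renamed column through the two functions above. It assumes a scratch database that already holds this schema (plus pgcrypto for `public.digest`), uses the DSN from the updater's default configuration, and made-up sample rows; `psycopg2` is assumed available:

```python
import psycopg2

conn = psycopg2.connect('dbname=softwareheritage-scheduler-updater-dev')

def put(cur, url, cnt):
    # swh_mktemp_cache() creates tmp_cache "on commit drop", so each
    # batch must live in its own transaction.
    cur.execute('select swh_mktemp_cache()')
    cur.execute(
        'insert into tmp_cache (url, origin_type, cnt, last_seen) '
        "values (%s, 'git', %s, now())", (url, cnt))
    cur.execute('select swh_cache_put()')

url = 'https://github.com/user/repo'
with conn.cursor() as cur:
    put(cur, url, 1)
    conn.commit()
    put(cur, url, 2)   # same url again: cnt accumulates via the upsert
    conn.commit()
    cur.execute('select cnt from cache where id = hash_sha1(%s)', (url,))
    print(cur.fetchone())  # expected: (3,) -- 1 + 2 summed on conflict
conn.close()
```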
diff --git a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py
index fc8ad0d..f82d9b0 100644
--- a/swh/scheduler/tests/updater/test_ghtorrent.py
+++ b/swh/scheduler/tests/updater/test_ghtorrent.py
@@ -1,171 +1,171 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import unittest
 
 from hypothesis import given
 from hypothesis.strategies import sampled_from, from_regex
 from nose.tools import istest
 from unittest.mock import patch
 
 from swh.scheduler.tests.updater import UpdaterTestUtil
 from swh.scheduler.updater.events import SWHEvent
 from swh.scheduler.updater.ghtorrent import (
     events, GHTorrentConsumer, INTERESTING_EVENT_KEYS)
 
 
 def event_values():
     return set(events['evt']).union(set(events['ent']))
 
 
 def ghtorrentize_event_name(event_name):
     return '%sEvent' % event_name.capitalize()
 
 
 EVENT_TYPES = sorted([ghtorrentize_event_name(e) for e in event_values()])
 
 
 class FakeChannel:
     """Fake Channel (virtual connection inside a connection)
 
     """
     def close(self):
         self.close = True
 
 
 class FakeConnection:
     """Fake Rabbitmq connection for test purposes
 
     """
     def __init__(self, conn_string):
         self._conn_string = conn_string
         self._connect = False
         self._release = False
         self._channel = False
 
     def connect(self):
         self._connect = True
         return True
 
     def release(self):
         self._connect = False
         self._release = True
 
     def channel(self):
         self._channel = True
         return FakeChannel()
 
 
 class GHTorrentConsumerTest(UpdaterTestUtil, unittest.TestCase):
     def setUp(self):
         self.fake_config = {
             'conn': {
                 'url': 'amqp://u:p@https://somewhere:9807',
             },
             'debug': True,
             'batch_cache_write': 10,
             'rabbitmq_prefetch_read': 100,
         }
 
         self.consumer = GHTorrentConsumer(self.fake_config,
                                           _connection_class=FakeConnection)
 
     @istest
     def test_init(self):
         # given
         # check init is ok
         self.assertEqual(self.consumer.debug, self.fake_config['debug'])
         self.assertEqual(self.consumer.batch,
                          self.fake_config['batch_cache_write'])
         self.assertEqual(self.consumer.prefetch_read,
                          self.fake_config['rabbitmq_prefetch_read'])
         self.assertEqual(self.consumer.config, self.fake_config)
 
     @istest
     def test_has_events(self):
         self.assertTrue(self.consumer.has_events())
 
     @istest
     def test_connection(self):
         # when
         self.consumer.open_connection()
 
         # then
         self.assertEqual(self.consumer.conn._conn_string,
                          self.fake_config['conn']['url'])
         self.assertTrue(self.consumer.conn._connect)
         self.assertFalse(self.consumer.conn._release)
 
         # when
         self.consumer.close_connection()
 
         # then
         self.assertFalse(self.consumer.conn._connect)
         self.assertTrue(self.consumer.conn._release)
         self.assertIsInstance(self.consumer.channel, FakeChannel)
 
     @istest
     @given(sampled_from(EVENT_TYPES),
            from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'))
     def convert_event_ok(self, event_type, name):
         input_event = self._make_event(event_type, name, 'git')
         actual_event = self.consumer.convert_event(input_event)
 
         self.assertTrue(isinstance(actual_event, SWHEvent))
 
         event = actual_event.get()
 
         expected_event = {
             'type': event_type.lower().rstrip('Event'),
             'url': 'https://github.com/%s' % name,
             'last_seen': input_event['created_at'],
-            'rate': 1,
+            'cnt': 1,
             'origin_type': 'git',
         }
         self.assertEqual(event, expected_event)
 
     @istest
     @given(sampled_from(EVENT_TYPES),
            from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'),
            sampled_from(INTERESTING_EVENT_KEYS))
     def convert_event_ko(self, event_type, name, missing_data_key):
         input_event = self._make_incomplete_event(
             event_type, name, 'git', missing_data_key)
 
         actual_converted_event = self.consumer.convert_event(input_event)
 
         self.assertIsNone(actual_converted_event)
 
     @patch('swh.scheduler.updater.ghtorrent.collect_replies')
     @istest
     def consume_events(self, mock_collect_replies):
         # given
         self.consumer.queue = 'fake-queue'  # hack
         self.consumer.open_connection()
 
         fake_events = [
             self._make_event('PushEvent', 'user/some-repo', 'git'),
             self._make_event('PushEvent', 'user2/some-other-repo', 'git'),
         ]
         mock_collect_replies.return_value = fake_events
 
         # when
         actual_events = []
         for e in self.consumer.consume_events():
             actual_events.append(e)
 
         # then
         self.assertEqual(fake_events, actual_events)
 
         mock_collect_replies.assert_called_once_with(
             self.consumer.conn,
             self.consumer.channel,
             'fake-queue',
             no_ack=False,
             limit=self.fake_config['rabbitmq_prefetch_read']
         )
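The two `@given` tests above lean on Hypothesis to generate event types and repository names. A standalone sketch of what those strategies produce; the event-type list here is a stand-in for the real `EVENT_TYPES`:

```python
from hypothesis import given, settings
from hypothesis.strategies import from_regex, sampled_from

@settings(max_examples=5)
@given(sampled_from(['PushEvent', 'DeleteEvent', 'PublicEvent']),
       from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'))
def show_inputs(event_type, name):
    # Each generated name looks like "owner/repo", matching the
    # full-name field of a GitHub event payload.
    print(event_type, 'https://github.com/%s' % name)

show_inputs()
```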
diff --git a/swh/scheduler/updater/backend.py b/swh/scheduler/updater/backend.py
index 1b898e7..34cd1c1 100644
--- a/swh/scheduler/updater/backend.py
+++ b/swh/scheduler/updater/backend.py
@@ -1,78 +1,78 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from arrow import utcnow
 
 from swh.core.config import SWHConfig
 from swh.scheduler.backend import DbBackend, autocommit
 
 
 class SchedulerUpdaterBackend(SWHConfig, DbBackend):
     CONFIG_BASE_FILENAME = 'scheduler-updater'
     DEFAULT_CONFIG = {
         'scheduling_updater_db': (
             'str', 'dbname=softwareheritage-scheduler-updater-dev'),
         'cache_read_limit': ('int', 1000),
     }
 
     def __init__(self, **override_config):
         super().__init__()
         if override_config:
             self.config = override_config
         else:
             self.config = self.parse_config_file(global_config=False)
         self.db = None
         self.db_conn_dsn = self.config['scheduling_updater_db']
         self.limit = self.config['cache_read_limit']
         self.reconnect()
 
-    cache_put_keys = ['url', 'rate', 'last_seen', 'origin_type']
+    cache_put_keys = ['url', 'cnt', 'last_seen', 'origin_type']
 
     @autocommit
     def cache_put(self, events, timestamp=None, cursor=None):
         """Write new events in the backend.
 
         """
         if timestamp is None:
             timestamp = utcnow()
 
         def prepare_events(events):
             for e in events:
                 event = e.get()
                 seen = event['last_seen']
                 if seen is None:
                     event['last_seen'] = timestamp
                 yield event
 
         cursor.execute('select swh_mktemp_cache()')
         self.copy_to(prepare_events(events),
                      'tmp_cache', self.cache_put_keys, cursor)
         cursor.execute('select swh_cache_put()')
 
-    cache_read_keys = ['id', 'url', 'origin_type', 'rate', 'first_seen',
+    cache_read_keys = ['id', 'url', 'origin_type', 'cnt', 'first_seen',
                        'last_seen']
 
     @autocommit
     def cache_read(self, timestamp, limit=None, cursor=None):
         """Read events from the cache prior to timestamp.
 
         """
         if not limit:
             limit = self.limit
 
         q = self._format_query('select {keys} from swh_cache_read(%s, %s)',
                                self.cache_read_keys)
         cursor.execute(q, (timestamp, limit))
         for r in cursor.fetchall():
             r['id'] = r['id'].tobytes()
             yield r
 
     @autocommit
     def cache_remove(self, entries, cursor=None):
         """Clean events from the cache
 
         """
         q = 'delete from cache where url in (%s)' % (
             ', '.join(("'%s'" % e for e in entries)), )
         cursor.execute(q)
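A hypothetical end-to-end use of this backend, assuming the scheduler-updater database from the SQL diff is reachable under the default DSN; the event values are illustrative only:

```python
from arrow import utcnow

from swh.scheduler.updater.backend import SchedulerUpdaterBackend
from swh.scheduler.updater.events import SWHEvent

backend = SchedulerUpdaterBackend(
    scheduling_updater_db='dbname=softwareheritage-scheduler-updater-dev',
    cache_read_limit=1000)

event = SWHEvent({
    'type': 'push',
    'url': 'https://github.com/user/repo',
    'last_seen': None,          # cache_put substitutes its timestamp
    'origin_type': 'git',
}, cnt=1)

backend.cache_put([event])                 # copy into tmp_cache, then upsert
for row in backend.cache_read(utcnow()):   # rows now carry the 'cnt' key
    print(row['url'], row['cnt'])
backend.cache_remove([event.url])          # drop processed entries
```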
diff --git a/swh/scheduler/updater/consumer.py b/swh/scheduler/updater/consumer.py
index 80ca656..e0a0760 100644
--- a/swh/scheduler/updater/consumer.py
+++ b/swh/scheduler/updater/consumer.py
@@ -1,140 +1,140 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 
 from abc import ABCMeta, abstractmethod
 
 from swh.scheduler.updater.backend import SchedulerUpdaterBackend
 
 
 class UpdaterConsumer(metaclass=ABCMeta):
     """Event consumer
 
     """
     def __init__(self, batch=1000,
                  backend_class=SchedulerUpdaterBackend,
                  log_class='swh.scheduler.updater.consumer.UpdaterConsumer'):
         super().__init__()
         self._reset_cache()
         self.backend = backend_class()
         self.batch = batch
         logging.basicConfig(level=logging.DEBUG)
         self.log = logging.getLogger(log_class)
 
     def _reset_cache(self):
         """Reset internal cache.
 
         """
         self.count = 0
         self.seen_events = set()
         self.events = []
 
     def is_interesting(self, event):
         """Determine if an event is interesting or not.
 
         Args:
             event (SWHEvent): SWH event
 
         """
         return event.is_interesting()
 
     @abstractmethod
     def convert_event(self, event):
         """Parse an event into an SWHEvent.
 
         """
         pass
 
     def process_event(self, event):
         """Process converted and interesting event.
 
         Args:
             event (SWHEvent): Event to process if deemed interesting
 
         """
         try:
             if event.url in self.seen_events:
-                event.rate += 1
+                event.cnt += 1
             else:
                 self.events.append(event)
                 self.seen_events.add(event.url)
                 self.count += 1
         finally:
             if self.count >= self.batch:
                 if self.events:
                     self.backend.cache_put(self.events)
                 self._reset_cache()
 
     def _flush(self):
         """Flush remaining internal cache if any.
 
         """
         if self.events:
             self.backend.cache_put(self.events)
             self._reset_cache()
 
     @abstractmethod
     def has_events(self):
         """Determine if there remains events to consume.
 
         Returns:
             boolean value, true for remaining events, False otherwise
 
         """
         pass
 
     @abstractmethod
     def consume_events(self):
         """The main entry point to consume events.
 
         This should either yield or return message for consumption.
 
         """
         pass
 
     @abstractmethod
     def open_connection(self):
         """Open a connection to the remote system we are supposed to consume
            from.
 
         """
         pass
 
     @abstractmethod
     def close_connection(self):
         """Close opened connection to the remote system.
 
         """
         pass
 
     def run(self):
         """The main entry point to consume events.
 
         """
         try:
             self.open_connection()
             while self.has_events():
                 for _event in self.consume_events():
                     event = self.convert_event(_event)
                     if not event:
                         self.log.warn(
                             'Incomplete event dropped %s' % _event)
                         continue
                     if not self.is_interesting(event):
                         continue
                     if self.debug:
                         self.log.debug('Event: %s' % event)
                     try:
                         self.process_event(event)
                     except Exception:
                         self.log.exception(
                             'Problem when processing event %s' % _event)
                         continue
         except Exception as e:
             self.log.error('Error raised during consumption: %s' % e)
             raise e
         finally:
             self.close_connection()
             self._flush()
diff --git a/swh/scheduler/updater/events.py b/swh/scheduler/updater/events.py
index 2858d33..d70efbe 100644
--- a/swh/scheduler/updater/events.py
+++ b/swh/scheduler/updater/events.py
@@ -1,39 +1,39 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 LISTENED_EVENTS = [
     'delete',
     'public',
     'push'
 ]
 
 
 class SWHEvent:
     """SWH's interesting event (resulting in an origin update)
 
     """
-    def __init__(self, evt, rate=1):
+    def __init__(self, evt, cnt=1):
         self.event = evt
         self.type = evt['type'].lower()
         self.url = evt['url']
         self.last_seen = evt.get('last_seen')
-        self.rate = rate
+        self.cnt = cnt
         self.origin_type = evt.get('origin_type')
 
     def is_interesting(self):
         return self.type in LISTENED_EVENTS
 
     def get(self):
         return {
             'type': self.type,
             'url': self.url,
             'last_seen': self.last_seen,
-            'rate': self.rate,
+            'cnt': self.cnt,
             'origin_type': self.origin_type
         }
 
     def __str__(self):
         return str(self.get())
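A quick, database-free look at the renamed attribute on `SWHEvent`, mirroring what `process_event` does when the same url is seen twice within a batch; the payloads are made up:

```python
from swh.scheduler.updater.events import SWHEvent

push = SWHEvent({'type': 'push', 'url': 'https://github.com/user/repo',
                 'origin_type': 'git'})
fork = SWHEvent({'type': 'fork', 'url': 'https://github.com/user/repo',
                 'origin_type': 'git'})

print(push.is_interesting())  # True: 'push' is in LISTENED_EVENTS
print(fork.is_interesting())  # False: filtered out by the consumer

# The consumer dedups by url within a batch: a repeated sighting bumps
# cnt on the already-queued event instead of appending a new one.
push.cnt += 1
print(push.get()['cnt'])      # 2
```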
diff --git a/swh/scheduler/updater/writer.py b/swh/scheduler/updater/writer.py
index ba53c4b..8c371b7 100644
--- a/swh/scheduler/updater/writer.py
+++ b/swh/scheduler/updater/writer.py
@@ -1,138 +1,138 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import click
 import logging
 import time
 
 from arrow import utcnow
 
 from swh.core.config import SWHConfig
 from swh.core import utils
 from swh.scheduler import get_scheduler
 from swh.scheduler.updater.backend import SchedulerUpdaterBackend
 
 
 class UpdaterWriter(SWHConfig):
     """Updater writer in charge of updating the scheduler db with latest
        prioritized oneshot tasks
 
     In effect, this:
     - reads the events from scheduler updater's db
     - converts those events into priority oneshot tasks
     - dumps them into the scheduler db
 
     """
     CONFIG_BASE_FILENAME = 'backend/scheduler-updater-writer'
     DEFAULT_CONFIG = {
         'scheduler': ('dict', {
             'cls': 'local',
             'args': {
                 'scheduling_db': 'dbname=softwareheritage-scheduler-dev',
             },
         }),
         'scheduler_updater': ('dict', {
             'scheduling_updater_db':
                 'dbname=softwareheritage-scheduler-updater-dev',  # noqa
             'cache_read_limit': 1000,
         }),
         'pause': ('int', 10),
     }
 
     def __init__(self, **config):
         if config:
             self.config = config
         else:
             self.config = self.parse_config_file()
         self.scheduler_updater_backend = SchedulerUpdaterBackend(
             **self.config['scheduler_updater'])
         self.scheduler_backend = get_scheduler(**self.config['scheduler'])
         self.pause = self.config['pause']
         self.log = logging.getLogger(
             'swh.scheduler.updater.writer.UpdaterWriter')
         self.log.setLevel(logging.DEBUG)
 
-    def _compute_priority(self, rate):
-        """Given a ratio, compute the task priority.
+    def _compute_priority(self, cnt):
+        """Given an event count, compute the task priority.
 
         """
-        if rate < 5:
+        if cnt < 5:
             return 'low'
-        elif rate < 50:
+        elif cnt < 50:
             return 'normal'
         else:
             return 'high'
 
     def convert_to_oneshot_task(self, event):
         """Given an event, convert it into oneshot task with priority
 
         Args:
             event (dict): The event to convert to task
 
         """
         if event['origin_type'] == 'git':
             return {
                 'type': 'origin-update-git',
                 'arguments': {
                     'args': [event['url']],
                     'kwargs': {},
                 },
                 'next_run': utcnow(),
                 'policy': 'oneshot',
                 'retries_left': 2,
-                'priority': self._compute_priority(event['rate']),
+                'priority': self._compute_priority(event['cnt']),
             }
         else:
             self.log.warn('Type %s is not supported for now, only git' % (
                 event['type'], ))
             return None
 
     def write_event_to_scheduler(self, events):
         """Write events to the scheduler and yield ids when done"""
         for event in events:
             # convert event to oneshot task
             oneshot_task = self.convert_to_oneshot_task(event)
             if not oneshot_task:
                 continue
             # write event to scheduler
             # FIXME: deal with this in batch
             r = self.scheduler_backend.create_tasks([oneshot_task])
             if r:
                 yield event['url']
 
     def run(self):
-        """First retrieve events from cache (including origin_type, rate),
+        """First retrieve events from cache (including origin_type, cnt),
         then convert them into oneshot tasks with priority, then
         write them to the scheduler db, at last remove them from
         cache.
 
         """
         while True:
             timestamp = utcnow()
             events = self.scheduler_updater_backend.cache_read(timestamp)
             if not events:
                 time.sleep(self.pause)
                 continue
 
             for urls in utils.grouper(self.write_event_to_scheduler(events),
                                       n=100):
                 self.scheduler_updater_backend.cache_remove(urls)
 
 
 @click.command()
 @click.option('--verbose/--no-verbose', '-v', default=False,
               help='Verbose mode')
 def main(verbose):
     log = logging.getLogger('swh.scheduler.updater.writer')
     log.addHandler(logging.StreamHandler())
     _loglevel = logging.DEBUG if verbose else logging.INFO
     log.setLevel(_loglevel)
 
     UpdaterWriter().run()
 
 
 if __name__ == '__main__':
     main()
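The priority thresholds above are easiest to read against a few sample counts. A pure-Python restatement of `_compute_priority` for a quick sanity check (a standalone copy, not the class method itself):

```python
def compute_priority(cnt):
    # Same thresholds as UpdaterWriter._compute_priority in the diff.
    if cnt < 5:
        return 'low'
    elif cnt < 50:
        return 'normal'
    return 'high'

for cnt in (1, 4, 5, 49, 50, 500):
    print(cnt, compute_priority(cnt))
# 1..4 -> low, 5..49 -> normal, 50+ -> high
```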