diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py --- a/swh/scheduler/cli.py +++ b/swh/scheduler/cli.py @@ -633,5 +633,34 @@ click.echo('OK') +@cli.command('updater') +@click.option('--verbose/--no-verbose', '-v', default=False, + help='Verbose mode') +@click.pass_context +def updater(ctx, verbose): + """Insert tasks in the scheduler from the scheduler-updater's events + + """ + from swh.scheduler.updater.writer import UpdaterWriter + UpdaterWriter(**ctx.obj['config']).run() + + +@cli.command('ghtorrent') +@click.option('--verbose/--no-verbose', '-v', default=False, + help='Verbose mode') +@click.pass_context +def ghtorrent(ctx, verbose): + """Consume events from ghtorrent and write them to cache. + + """ + from swh.scheduler.updater.ghtorrent import GHTorrentConsumer + + # GHTorrentConsumer.__init__ only accepts keyword arguments: it reads + # the 'ghtorrent' section and builds its own SchedulerUpdaterBackend + # from the 'scheduler_updater' section, so hand it the whole + # configuration rather than a pre-built backend. + GHTorrentConsumer(**ctx.obj['config']).run() + + if __name__ == '__main__': cli() diff --git a/swh/scheduler/tests/updater/test_backend.py b/swh/scheduler/tests/updater/test_backend.py --- a/swh/scheduler/tests/updater/test_backend.py +++ b/swh/scheduler/tests/updater/test_backend.py @@ -27,7 +27,7 @@ def setUp(self): super().setUp() config = { - 'scheduling_updater_db': 'dbname=' + self.TEST_DB_NAME, + 'db': 'dbname=' + self.TEST_DB_NAME, 'cache_read_limit': 1000, } self.backend = SchedulerUpdaterBackend(**config) @@ -42,7 +42,6 @@ self.conn.commit() def tearDown(self): - self.backend.close_connection() self._empty_tables() super().tearDown() @@ -58,8 +57,6 @@ 'type': 'create', 'origin_type': 'git', }) - self.backend.cache_put(gen_events(urls)) r = self.backend.cache_read(timestamp=utcnow()) - self.assertNotEqual(r, []) diff --git a/swh/scheduler/tests/updater/test_consumer.py b/swh/scheduler/tests/updater/test_consumer.py --- 
a/swh/scheduler/tests/updater/test_consumer.py +++ b/swh/scheduler/tests/updater/test_consumer.py @@ -24,8 +24,8 @@ class FakeUpdaterConsumerBase(UpdaterConsumer): - def __init__(self, backend_class=FakeSchedulerUpdaterBackend): - super().__init__(backend_class=backend_class) + def __init__(self, backend): + super().__init__(backend) self.connection_opened = False self.connection_closed = False self.consume_called = False @@ -53,7 +53,8 @@ class UpdaterConsumerRaisingTest(unittest.TestCase): def setUp(self): - self.updater = FakeUpdaterConsumerRaise() + self.updater = FakeUpdaterConsumerRaise( + FakeSchedulerUpdaterBackend()) def test_running_raise(self): """Raising during run should finish fine. @@ -89,7 +90,8 @@ class UpdaterConsumerNoEventTest(unittest.TestCase): def setUp(self): - self.updater = FakeUpdaterConsumerNoEvent() + self.updater = FakeUpdaterConsumerNoEvent( + FakeSchedulerUpdaterBackend()) def test_running_does_not_consume(self): """Run with no events should do just fine""" @@ -115,8 +117,8 @@ class FakeUpdaterConsumer(FakeUpdaterConsumerBase): - def __init__(self, messages): - super().__init__() + def __init__(self, backend, messages): + super().__init__(backend) self.messages = messages self.debug = False @@ -170,9 +172,11 @@ ready_incomplete_events = self._make_incomplete_events( incomplete_events) - updater = FakeUpdaterConsumer(list(chain( - ready_events, ready_incomplete_events, - ready_uninteresting_events))) + updater = FakeUpdaterConsumer( + FakeSchedulerUpdaterBackend(), + list(chain( + ready_events, ready_incomplete_events, + ready_uninteresting_events))) self.assertEqual(updater.count, 0) self.assertEqual(updater.seen_events, set()) diff --git a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py --- a/swh/scheduler/tests/updater/test_ghtorrent.py +++ b/swh/scheduler/tests/updater/test_ghtorrent.py @@ -12,6 +12,7 @@ from swh.scheduler.updater.events import SWHEvent from 
swh.scheduler.updater.ghtorrent import (INTERESTING_EVENT_KEYS, GHTorrentConsumer, events) +from swh.scheduler.updater.backend import SchedulerUpdaterBackend from . import UpdaterTestUtil, from_regex @@ -60,28 +61,35 @@ class GHTorrentConsumerTest(UpdaterTestUtil, unittest.TestCase): def setUp(self): - self.fake_config = { - 'conn': { - 'url': 'amqp://u:p@https://somewhere:9807', + config = { + 'ghtorrent': { + 'rabbitmq': { + 'conn': { + 'url': 'amqp://u:p@https://somewhere:9807', + }, + 'prefetch_read': 17, + }, + 'batch_cache_write': 42, + }, + 'scheduler_updater': { + 'cls': 'local', + 'args': { + 'db': 'dbname=softwareheritage-scheduler-updater-dev', + }, }, - 'debug': True, - 'batch_cache_write': 10, - 'rabbitmq_prefetch_read': 100, } - self.consumer = GHTorrentConsumer(self.fake_config, - _connection_class=FakeConnection) + GHTorrentConsumer.connection_class = FakeConnection + with patch.object( + SchedulerUpdaterBackend, '__init__', return_value=None): + self.consumer = GHTorrentConsumer(**config) - def test_init(self): + @patch('swh.scheduler.updater.backend.SchedulerUpdaterBackend') + def test_init(self, mock_backend): # given # check init is ok - self.assertEqual(self.consumer.debug, - self.fake_config['debug']) - self.assertEqual(self.consumer.batch, - self.fake_config['batch_cache_write']) - self.assertEqual(self.consumer.prefetch_read, - self.fake_config['rabbitmq_prefetch_read']) - self.assertEqual(self.consumer.config, self.fake_config) + self.assertEqual(self.consumer.batch, 42) + self.assertEqual(self.consumer.prefetch_read, 17) def test_has_events(self): self.assertTrue(self.consumer.has_events()) @@ -92,7 +100,7 @@ # then self.assertEqual(self.consumer.conn._conn_string, - self.fake_config['conn']['url']) + 'amqp://u:p@https://somewhere:9807') self.assertTrue(self.consumer.conn._connect) self.assertFalse(self.consumer.conn._release) @@ -130,8 +138,10 @@ input_event = self._make_incomplete_event( event_type, name, 'git', missing_data_key) + 
logger = self.consumer.log + del self.consumer.log # prevent gazillions of warnings actual_converted_event = self.consumer.convert_event(input_event) - + self.consumer.log = logger self.assertIsNone(actual_converted_event) @patch('swh.scheduler.updater.ghtorrent.collect_replies') @@ -160,5 +170,5 @@ self.consumer.channel, 'fake-queue', no_ack=False, - limit=self.fake_config['rabbitmq_prefetch_read'] + limit=17 ) diff --git a/swh/scheduler/tests/updater/test_writer.py b/swh/scheduler/tests/updater/test_writer.py --- a/swh/scheduler/tests/updater/test_writer.py +++ b/swh/scheduler/tests/updater/test_writer.py @@ -52,26 +52,26 @@ 'scheduler': { 'cls': 'local', 'args': { - 'scheduling_db': 'dbname=softwareheritage-scheduler-test', + 'db': 'dbname=softwareheritage-scheduler-test', }, }, 'scheduler_updater': { - 'scheduling_updater_db': - 'dbname=softwareheritage-scheduler-updater-test', - 'cache_read_limit': 5, + 'cls': 'local', + 'args': { + 'db': + 'dbname=softwareheritage-scheduler-updater-test', + 'cache_read_limit': 5, + }, + }, + 'updater_writer': { + 'pause': 0.1, + 'verbose': False, }, - 'pause': 0.1, - 'verbose': False, } self.writer = UpdaterWriter(**config) self.scheduler_backend = self.writer.scheduler_backend self.scheduler_updater_backend = self.writer.scheduler_updater_backend - def tearDown(self): - self.scheduler_backend.close_connection() - self.scheduler_updater_backend.close_connection() - super().tearDown() - def test_run_ko(self): """Only git tasks are supported for now, other types are dismissed. 
diff --git a/swh/scheduler/updater/backend.py b/swh/scheduler/updater/backend.py --- a/swh/scheduler/updater/backend.py +++ b/swh/scheduler/updater/backend.py @@ -5,33 +5,43 @@ from arrow import utcnow -from swh.core.config import SWHConfig -from swh.scheduler.backend import DbBackend, autocommit +import psycopg2.pool +import psycopg2.extras +from swh.scheduler.backend import DbBackend +from swh.core.db.common import db_transaction, db_transaction_generator -class SchedulerUpdaterBackend(SWHConfig, DbBackend): +class SchedulerUpdaterBackend: CONFIG_BASE_FILENAME = 'backend/scheduler-updater' - DEFAULT_CONFIG = { - 'scheduling_updater_db': ( - 'str', 'dbname=softwareheritage-scheduler-updater-dev'), - 'cache_read_limit': ('int', 1000), - } - - def __init__(self, **override_config): - super().__init__() - if override_config: - self.config = override_config +# 'cache_read_limit': ('int', 1000), + + def __init__(self, db, cache_read_limit=1000, + min_pool_conns=1, max_pool_conns=10): + """ + Args: + db: either a libpq connection string, or a psycopg2 connection + + """ + if isinstance(db, psycopg2.extensions.connection): + self._pool = None + self._db = DbBackend(db) else: - self.config = self.parse_config_file(global_config=False) - self.db = None - self.db_conn_dsn = self.config['scheduling_updater_db'] - self.limit = self.config['cache_read_limit'] - self.reconnect() + self._pool = psycopg2.pool.ThreadedConnectionPool( + min_pool_conns, max_pool_conns, db, + cursor_factory=psycopg2.extras.RealDictCursor, + ) + self._db = None + self.limit = cache_read_limit + + def get_db(self): + if self._db: + return self._db + return DbBackend.from_pool(self._pool) cache_put_keys = ['url', 'cnt', 'last_seen', 'origin_type'] - @autocommit - def cache_put(self, events, timestamp=None, cursor=None): + @db_transaction() + def cache_put(self, events, timestamp=None, db=None, cur=None): """Write new events in the backend. 
""" @@ -45,17 +55,16 @@ if seen is None: event['last_seen'] = timestamp yield event - - cursor.execute('select swh_mktemp_cache()') - self.copy_to(prepare_events(events), - 'tmp_cache', self.cache_put_keys, cursor=cursor) - cursor.execute('select swh_cache_put()') + cur.execute('select swh_mktemp_cache()') + db.copy_to(prepare_events(events), + 'tmp_cache', self.cache_put_keys, cursor=cur) + cur.execute('select swh_cache_put()') cache_read_keys = ['id', 'url', 'origin_type', 'cnt', 'first_seen', 'last_seen'] - @autocommit - def cache_read(self, timestamp=None, limit=None, cursor=None): + @db_transaction_generator() + def cache_read(self, timestamp=None, limit=None, db=None, cur=None): """Read events from the cache prior to timestamp. """ @@ -65,18 +74,21 @@ if not limit: limit = self.limit - q = self._format_query('select {keys} from swh_cache_read(%s, %s)', - self.cache_read_keys) - cursor.execute(q, (timestamp, limit)) - for r in cursor.fetchall(): + q = db._format_query('select {keys} from swh_cache_read(%s, %s)', + self.cache_read_keys) + cur.execute(q, (timestamp, limit)) + for r in cur.fetchall(): r['id'] = r['id'].tobytes() yield r - @autocommit - def cache_remove(self, entries, cursor=None): + @db_transaction() + def cache_remove(self, entries, db=None, cur=None): """Clean events from the cache """ - q = 'delete from cache where url in (%s)' % ( - ', '.join(("'%s'" % e for e in entries)), ) - cursor.execute(q) + urls = tuple(entries) + if not urls: + return + # Bind the URLs as a query parameter (psycopg2 adapts a tuple to an + # SQL IN list) instead of interpolating them into the statement. + cur.execute('delete from cache where url in %s', (urls,)) diff --git a/swh/scheduler/updater/consumer.py b/swh/scheduler/updater/consumer.py --- a/swh/scheduler/updater/consumer.py +++ b/swh/scheduler/updater/consumer.py @@ -7,21 +7,19 @@ from abc import ABCMeta, abstractmethod -from swh.scheduler.updater.backend import SchedulerUpdaterBackend - class UpdaterConsumer(metaclass=ABCMeta): """Event consumer """ - def __init__(self, batch=1000, backend_class=SchedulerUpdaterBackend, - log_class='swh.scheduler.updater.consumer.UpdaterConsumer'): + def __init__(self, backend, batch_cache_write=1000): 
super().__init__() self._reset_cache() - self.backend = backend_class() - self.batch = batch + self.backend = backend + self.batch = int(batch_cache_write) logging.basicConfig(level=logging.DEBUG) - self.log = logging.getLogger(log_class) + self.log = logging.getLogger('%s.%s' % ( + self.__class__.__module__, self.__class__.__name__)) def _reset_cache(self): """Reset internal cache. diff --git a/swh/scheduler/updater/ghtorrent/__init__.py b/swh/scheduler/updater/ghtorrent/__init__.py --- a/swh/scheduler/updater/ghtorrent/__init__.py +++ b/swh/scheduler/updater/ghtorrent/__init__.py @@ -8,9 +8,11 @@ from kombu import Connection, Exchange, Queue from kombu.common import collect_replies -from swh.core.config import SWHConfig +from swh.core.config import merge_configs + from swh.scheduler.updater.events import SWHEvent from swh.scheduler.updater.consumer import UpdaterConsumer +from swh.scheduler.updater.backend import SchedulerUpdaterBackend events = { @@ -32,65 +34,60 @@ ] } +INTERESTING_EVENT_KEYS = ['type', 'repo', 'created_at'] -class RabbitMQConn(SWHConfig): - """RabbitMQ Connection class +DEFAULT_CONFIG = { + 'ghtorrent': { + 'batch_cache_write': 1000, + 'rabbitmq': { + 'prefetch_read': 100, + 'conn': { + 'url': 'amqp://guest:guest@localhost:5672', + 'exchange_name': 'ght-streams', + 'routing_key': 'something', + 'queue_name': 'fake-events', + }, + }, + }, + 'scheduler_updater': { + 'cls': 'local', + 'args': { + 'db': 'dbname=softwareheritage-scheduler-updater-dev', + 'cache_read_limit': 1000, + }, + }, +} - """ - CONFIG_BASE_FILENAME = 'backend/ghtorrent' - - DEFAULT_CONFIG = { - 'conn': ('dict', { - 'url': 'amqp://guest:guest@localhost:5672', - 'exchange_name': 'ght-streams', - 'routing_key': 'something', - 'queue_name': 'fake-events' - }) - } - ADDITIONAL_CONFIG = {} +class GHTorrentConsumer(UpdaterConsumer): + """GHTorrent events consumer + + """ + connection_class = Connection def __init__(self, **config): - super().__init__() - if config and 
set(config.keys()) - {'log_class'} != set(): - self.config = config - else: - self.config = self.parse_config_file( - additional_configs=[self.ADDITIONAL_CONFIG]) - - self.conn_string = self.config['conn']['url'] - self.exchange = Exchange(self.config['conn']['exchange_name'], - 'topic', durable=True) - self.routing_key = self.config['conn']['routing_key'] - self.queue = Queue(self.config['conn']['queue_name'], - exchange=self.exchange, - routing_key=self.routing_key, + self.config = merge_configs(DEFAULT_CONFIG, config) + + ght_config = self.config['ghtorrent'] + rmq_config = ght_config['rabbitmq'] + self.prefetch_read = int(rmq_config.get('prefetch_read', 100)) + + exchange = Exchange( + rmq_config['conn']['exchange_name'], + 'topic', durable=True) + routing_key = rmq_config['conn']['routing_key'] + self.queue = Queue(rmq_config['conn']['queue_name'], + exchange=exchange, + routing_key=routing_key, auto_delete=True) + if self.config['scheduler_updater']['cls'] != 'local': + raise ValueError( + 'The scheduler_updater can only be a cls=local for now') + backend = SchedulerUpdaterBackend( + **self.config['scheduler_updater']['args']) -INTERESTING_EVENT_KEYS = ['type', 'repo', 'created_at'] - - -class GHTorrentConsumer(RabbitMQConn, UpdaterConsumer): - """GHTorrent events consumer - - """ - ADDITIONAL_CONFIG = { - 'debug': ('bool', False), - 'batch_cache_write': ('int', 1000), - 'rabbitmq_prefetch_read': ('int', 100), - } - - def __init__(self, config=None, _connection_class=Connection): - if config is None: - super().__init__( - log_class='swh.scheduler.updater.ghtorrent.GHTorrentConsumer') - else: - self.config = config - self._connection_class = _connection_class - self.debug = self.config['debug'] - self.batch = self.config['batch_cache_write'] - self.prefetch_read = self.config['rabbitmq_prefetch_read'] + super().__init__(backend, ght_config.get('batch_cache_write', 1000)) def has_events(self): """Always has events @@ -104,11 +101,10 @@ """ if isinstance(event, 
str): event = json.loads(event) - for k in INTERESTING_EVENT_KEYS: if k not in event: if hasattr(self, 'log'): - self.log.warn( + self.log.warning( 'Event should have the \'%s\' entry defined' % k) return None @@ -126,7 +122,8 @@ """Open rabbitmq connection """ - self.conn = self._connection_class(self.config['conn']['url']) + self.conn = self.connection_class( + self.config['ghtorrent']['rabbitmq']['conn']['url']) self.conn.connect() self.channel = self.conn.channel() diff --git a/swh/scheduler/updater/ghtorrent/cli.py b/swh/scheduler/updater/ghtorrent/cli.py --- a/swh/scheduler/updater/ghtorrent/cli.py +++ b/swh/scheduler/updater/ghtorrent/cli.py @@ -4,24 +4,19 @@ # See top-level LICENSE file for more information import click -import logging - -from swh.scheduler.updater.ghtorrent import GHTorrentConsumer @click.command() @click.option('--verbose/--no-verbose', '-v', default=False, help='Verbose mode') -def main(verbose): +@click.pass_context +def main(ctx, verbose): """Consume events from ghtorrent and write them to cache. """ - log = logging.getLogger('swh.scheduler.updater.ghtorrent.cli') - log.addHandler(logging.StreamHandler()) - _loglevel = logging.DEBUG if verbose else logging.INFO - log.setLevel(_loglevel) - - GHTorrentConsumer().run() + click.echo("Deprecated! 
Use 'swh-scheduler ghtorrent' instead.", + err=True) + ctx.exit(1) if __name__ == '__main__': diff --git a/swh/scheduler/updater/writer.py b/swh/scheduler/updater/writer.py --- a/swh/scheduler/updater/writer.py +++ b/swh/scheduler/updater/writer.py @@ -9,14 +9,13 @@ from arrow import utcnow -from swh.core.config import SWHConfig from swh.core import utils from swh.scheduler import get_scheduler from swh.scheduler.utils import create_oneshot_task_dict from swh.scheduler.updater.backend import SchedulerUpdaterBackend -class UpdaterWriter(SWHConfig): +class UpdaterWriter: """Updater writer in charge of updating the scheduler db with latest prioritized oneshot tasks @@ -26,41 +25,18 @@ - dumps them into the scheduler db """ - CONFIG_BASE_FILENAME = 'backend/scheduler-updater-writer' - DEFAULT_CONFIG = { - # access to the scheduler backend - 'scheduler': ('dict', { - 'cls': 'local', - 'args': { - 'scheduling_db': 'dbname=softwareheritage-scheduler-dev', - }, - }), - # access to the scheduler updater cache - 'scheduler_updater': ('dict', { - 'scheduling_updater_db': - 'dbname=softwareheritage-scheduler-updater-dev', - 'cache_read_limit': 1000, - }), - # waiting time between db reads - 'pause': ('int', 10), - # verbose or not - 'verbose': ('bool', False), - } def __init__(self, **config): - if config: - self.config = config - else: - self.config = self.parse_config_file() - + self.config = config + if self.config['scheduler_updater']['cls'] != 'local': + raise ValueError( + 'The scheduler_updater can only be a cls=local for now') self.scheduler_updater_backend = SchedulerUpdaterBackend( - **self.config['scheduler_updater']) + **self.config['scheduler_updater']['args']) self.scheduler_backend = get_scheduler(**self.config['scheduler']) - self.pause = self.config['pause'] + self.pause = self.config.get('updater_writer', {}).get('pause', 10) self.log = logging.getLogger( 'swh.scheduler.updater.writer.UpdaterWriter') - self.log.setLevel( - logging.DEBUG if 
self.config['verbose'] else logging.INFO) def convert_to_oneshot_task(self, event): """Given an event, convert it into oneshot task with priority @@ -109,13 +85,11 @@ @click.command() @click.option('--verbose/--no-verbose', '-v', default=False, help='Verbose mode') -def main(verbose): - log = logging.getLogger('swh.scheduler.updater.writer') - log.addHandler(logging.StreamHandler()) - _loglevel = logging.DEBUG if verbose else logging.INFO - log.setLevel(_loglevel) - - UpdaterWriter().run() +@click.pass_context +def main(ctx, verbose): + click.echo("Deprecated! Use 'swh-scheduler updater' instead.", + err=True) + ctx.exit(1) if __name__ == '__main__':