diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py
index c4f61d2..ae131fa 100644
--- a/swh/scheduler/cli.py
+++ b/swh/scheduler/cli.py
@@ -1,637 +1,666 @@
# Copyright (C) 2016-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import arrow
import click
import csv
import itertools
import json
import locale
import logging
import time

from swh.core import utils, config
from . import compute_nb_tasks_from
from .backend_es import SWHElasticSearchClient
from . import get_scheduler, DEFAULT_CONFIG

locale.setlocale(locale.LC_ALL, '')
ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0]


class DateTimeType(click.ParamType):
    name = 'time and date'

    def convert(self, value, param, ctx):
        if not isinstance(value, arrow.Arrow):
            value = arrow.get(value)
        return value


DATETIME = DateTimeType()

CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])


def pretty_print_list(list, indent):
    """Pretty-print a list"""
    return ''.join('%s%s\n' % (' ' * indent, item) for item in list)


def pretty_print_dict(dict, indent):
    """Pretty-print a dict"""
    return ''.join('%s%s: %s\n' % (' ' * indent,
                                   click.style(key, bold=True), value)
                   for key, value in dict.items())


def pretty_print_task(task, full=False):
    """Pretty-print a task

    If 'full' is True, also print the status and priority fields.

    """
    next_run = arrow.get(task['next_run'])
    lines = [
        '%s %s\n' % (click.style('Task', bold=True), task['id']),
        click.style('  Next run: ', bold=True),
        "%s (%s)" % (next_run.humanize(locale=ARROW_LOCALE),
                     next_run.format()),
        '\n',
        click.style('  Interval: ', bold=True),
        str(task['current_interval']), '\n',
        click.style('  Type: ', bold=True), task['type'] or '', '\n',
        click.style('  Policy: ', bold=True), task['policy'] or '', '\n',
    ]
    if full:
        lines += [
            click.style('  Status: ', bold=True),
            task['status'] or '', '\n',
            click.style('  Priority: ', bold=True),
            task['priority'] or '', '\n',
        ]
    lines += [
        click.style('  Args:\n', bold=True),
        pretty_print_list(task['arguments']['args'], indent=4),
        click.style('  Keyword args:\n', bold=True),
        pretty_print_dict(task['arguments']['kwargs'], indent=4),
    ]

    return ''.join(lines)
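For illustration, a minimal sketch of what the pretty-printing helpers above consume: a task dict carrying only the fields pretty_print_task actually reads (all values here are made up):

    import arrow
    from swh.scheduler.cli import pretty_print_task

    task = {
        'id': 1234,
        'next_run': arrow.utcnow().shift(hours=2),
        'current_interval': '64 days, 0:00:00',
        'type': 'origin-update-git',
        'policy': 'recurring',
        'arguments': {'args': ['https://github.com/user/repo'], 'kwargs': {}},
    }
    print(pretty_print_task(task))  # renders one 'Task 1234' block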
""" from swh.scheduler.celery_backend.config import setup_log_handler log_level = setup_log_handler( loglevel=log_level, colorize=False, format='[%(levelname)s] %(name)s -- %(message)s') ctx.ensure_object(dict) logger = logging.getLogger(__name__) scheduler = None conf = config.read(config_file, DEFAULT_CONFIG) if 'scheduler' not in conf: raise ValueError("missing 'scheduler' configuration") if database: conf['scheduler']['cls'] = 'local' conf['scheduler']['args']['db'] = database elif url: conf['scheduler']['cls'] = 'local' conf['scheduler']['args']['url'] = url sched_conf = conf['scheduler'] try: logger.debug('Instanciating scheduler with %s' % ( sched_conf)) scheduler = get_scheduler(**sched_conf) except Exception: # it's the subcommand to decide whether not having a proper # scheduler instance is a problem. pass ctx.obj['scheduler'] = scheduler ctx.obj['config'] = conf ctx.obj['loglevel'] = log_level @cli.group('task') @click.pass_context def task(ctx): """Manipulate tasks.""" pass @task.command('schedule') @click.option('--columns', '-c', multiple=True, default=['type', 'args', 'kwargs', 'next_run'], type=click.Choice([ 'type', 'args', 'kwargs', 'policy', 'next_run']), help='columns present in the CSV file') @click.option('--delimiter', '-d', default=',') @click.argument('file', type=click.File(encoding='utf-8')) @click.pass_context def schedule_tasks(ctx, columns, delimiter, file): """Schedule tasks from a CSV input file. The following columns are expected, and can be set through the -c option: - type: the type of the task to be scheduled (mandatory) - args: the arguments passed to the task (JSON list, defaults to an empty list) - kwargs: the keyword arguments passed to the task (JSON object, defaults to an empty dict) - next_run: the date at which the task should run (datetime, defaults to now) The CSV can be read either from a named file, or from stdin (use - as filename). Use sample: cat scheduling-task.txt | \ python3 -m swh.scheduler.cli \ --database 'service=swh-scheduler-dev' \ task schedule \ --columns type --columns kwargs --columns policy \ --delimiter ';' - """ tasks = [] now = arrow.utcnow() scheduler = ctx.obj['scheduler'] if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') reader = csv.reader(file, delimiter=delimiter) for line in reader: task = dict(zip(columns, line)) args = json.loads(task.pop('args', '[]')) kwargs = json.loads(task.pop('kwargs', '{}')) task['arguments'] = { 'args': args, 'kwargs': kwargs, } task['next_run'] = DATETIME.convert(task.get('next_run', now), None, None) tasks.append(task) created = scheduler.create_tasks(tasks) output = [ 'Created %d tasks\n' % len(created), ] for task in created: output.append(pretty_print_task(task)) click.echo_via_pager('\n'.join(output)) @task.command('add') @click.argument('type', nargs=1, required=True) @click.argument('options', nargs=-1) @click.option('--policy', '-p', default='recurring', type=click.Choice(['recurring', 'oneshot'])) @click.option('--priority', '-P', default=None, type=click.Choice(['low', 'normal', 'high'])) @click.option('--next-run', '-n', default=None) @click.pass_context def schedule_task(ctx, type, options, policy, priority, next_run): """Schedule one task from arguments. 
    Usage sample:

    swh-scheduler --database 'service=swh-scheduler' \
        task add swh-lister-pypi

    swh-scheduler --database 'service=swh-scheduler' \
        task add swh-lister-debian --policy=oneshot distribution=stretch

    Note: if the priority is not given, the task won't have a priority set,
    which is treated as the lowest priority level.

    """
    scheduler = ctx.obj['scheduler']
    if not scheduler:
        raise ValueError('Scheduler class (local/remote) must be instantiated')

    now = arrow.utcnow()

    args = [x for x in options if '=' not in x]
    kw = dict(x.split('=', 1) for x in options if '=' in x)
    task = {'type': type,
            'policy': policy,
            'priority': priority,
            'arguments': {
                'args': args,
                'kwargs': kw,
            },
            'next_run': DATETIME.convert(next_run or now, None, None),
            }
    created = scheduler.create_tasks([task])

    output = [
        'Created %d tasks\n' % len(created),
    ]
    for task in created:
        output.append(pretty_print_task(task))

    click.echo('\n'.join(output))


@task.command('list-pending')
@click.argument('task-types', required=True, nargs=-1)
@click.option('--limit', '-l', required=False, type=click.INT,
              help='The maximum number of tasks to fetch')
@click.option('--before', '-b', required=False, type=DATETIME,
              help='List all jobs supposed to run before the given date')
@click.pass_context
def list_pending_tasks(ctx, task_types, limit, before):
    """List the tasks that are going to be run.

    You can override the number of tasks to fetch with the --limit option.

    """
    scheduler = ctx.obj['scheduler']
    if not scheduler:
        raise ValueError('Scheduler class (local/remote) must be instantiated')
    num_tasks, num_tasks_priority = compute_nb_tasks_from(limit)

    output = []
    for task_type in task_types:
        pending = scheduler.peek_ready_tasks(
            task_type, timestamp=before,
            num_tasks=num_tasks, num_tasks_priority=num_tasks_priority)
        output.append('Found %d %s tasks\n' % (
            len(pending), task_type))

        for task in pending:
            output.append(pretty_print_task(task))

    click.echo('\n'.join(output))
""" scheduler = ctx.obj['scheduler'] if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') if not task_type: task_type = [x['type'] for x in scheduler.get_task_types()] # if task_id is not given, default value for status is # 'next_run_not_scheduled' # if task_id is given, default status is 'all' if task_id is None and status is None: status = ['next_run_not_scheduled'] if status and 'all' in status: status = None if priority and 'all' in priority: priority = None output = [] tasks = scheduler.search_tasks( task_id=task_id, task_type=task_type, status=status, priority=priority, policy=policy, before=before, after=after, limit=limit) output.append('Found %d tasks\n' % ( len(tasks))) for task in tasks: output.append(pretty_print_task(task, full=True)) click.echo('\n'.join(output)) @task.command('respawn') @click.argument('task-ids', required=True, nargs=-1) @click.option('--next-run', '-n', required=False, type=DATETIME, metavar='DATETIME', default=None, help='Re spawn the selected tasks at this date') @click.pass_context def respawn_tasks(ctx, task_ids, next_run): """Respawn tasks. Respawn tasks given by their ids (see the 'task list' command to find task ids) at the given date (immediately by default). Eg. swh-scheduler task respawn 1 3 12 """ scheduler = ctx.obj['scheduler'] if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') if next_run is None: next_run = arrow.utcnow() output = [] scheduler.set_status_tasks( task_ids, status='next_run_not_scheduled', next_run=next_run) output.append('Respawn tasks %s\n' % (task_ids,)) click.echo('\n'.join(output)) @task.command('archive') @click.option('--before', '-b', default=None, help='''Task whose ended date is anterior will be archived. Default to current month's first day.''') @click.option('--after', '-a', default=None, help='''Task whose ended date is after the specified date will be archived. Default to prior month's first day.''') @click.option('--batch-index', default=1000, type=click.INT, help='Batch size of tasks to read from db to archive') @click.option('--bulk-index', default=200, type=click.INT, help='Batch size of tasks to bulk index') @click.option('--batch-clean', default=1000, type=click.INT, help='Batch size of task to clean after archival') @click.option('--dry-run/--no-dry-run', is_flag=True, default=False, help='Default to list only what would be archived.') @click.option('--verbose', is_flag=True, default=False, help='Default to list only what would be archived.') @click.option('--cleanup/--no-cleanup', is_flag=True, default=True, help='Clean up archived tasks (default)') @click.option('--start-from', type=click.INT, default=-1, help='(Optional) default task id to start from. Default is -1.') @click.pass_context def archive_tasks(ctx, before, after, batch_index, bulk_index, batch_clean, dry_run, verbose, cleanup, start_from): """Archive task/task_run whose (task_type is 'oneshot' and task_status is 'completed') or (task_type is 'recurring' and task_status is 'disabled'). With --dry-run flag set (default), only list those. 
""" scheduler = ctx.obj['scheduler'] if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') es_client = SWHElasticSearchClient() logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO) log = logging.getLogger('swh.scheduler.cli.archive') logging.getLogger('urllib3').setLevel(logging.WARN) logging.getLogger('elasticsearch').setLevel(logging.WARN) if dry_run: log.info('**DRY-RUN** (only reading db)') if not cleanup: log.info('**NO CLEANUP**') now = arrow.utcnow() # Default to archive tasks from a rolling month starting the week # prior to the current one if not before: before = now.shift(weeks=-1).format('YYYY-MM-DD') if not after: after = now.shift(weeks=-1).shift(months=-1).format('YYYY-MM-DD') log.debug('index: %s; cleanup: %s; period: [%s ; %s]' % ( not dry_run, not dry_run and cleanup, after, before)) def group_by_index_name(data, es_client=es_client): """Given a data record, determine the index's name through its ending date. This varies greatly depending on the task_run's status. """ date = data.get('started') if not date: date = data['scheduled'] return es_client.compute_index_name(date.year, date.month) def index_data(before, last_id, batch_index): tasks_in = scheduler.filter_task_to_archive( after, before, last_id=last_id, limit=batch_index) for index_name, tasks_group in itertools.groupby( tasks_in, key=group_by_index_name): log.debug('Index tasks to %s' % index_name) if dry_run: for task in tasks_group: yield task continue yield from es_client.streaming_bulk( index_name, tasks_group, source=['task_id', 'task_run_id'], chunk_size=bulk_index, log=log) gen = index_data(before, last_id=start_from, batch_index=batch_index) if cleanup: for task_ids in utils.grouper(gen, n=batch_clean): task_ids = list(task_ids) log.info('Clean up %s tasks: [%s, ...]' % ( len(task_ids), task_ids[0])) if dry_run: # no clean up continue ctx.obj['scheduler'].delete_archived_tasks(task_ids) else: for task_ids in utils.grouper(gen, n=batch_index): task_ids = list(task_ids) log.info('Indexed %s tasks: [%s, ...]' % ( len(task_ids), task_ids[0])) @cli.command('runner') @click.option('--period', '-p', default=0, help=('Period (in s) at witch pending tasks are checked and ' 'executed. Set to 0 (default) for a one shot.')) @click.pass_context def runner(ctx, period): """Starts a swh-scheduler runner service. This process is responsible for checking for ready-to-run tasks and schedule them.""" from swh.scheduler.celery_backend.runner import run_ready_tasks from swh.scheduler.celery_backend.config import app logger = logging.getLogger(__name__ + '.runner') scheduler = ctx.obj['scheduler'] logger.debug('Scheduler %s' % scheduler) try: while True: logger.debug('Run ready tasks') try: ntasks = len(run_ready_tasks(scheduler, app)) if ntasks: logger.info('Scheduled %s tasks', ntasks) except Exception: scheduler.rollback() logger.exception('Unexpected error in run_ready_tasks()') if not period: break time.sleep(period) except KeyboardInterrupt: ctx.exit(0) @cli.command('listener') @click.pass_context def listener(ctx): """Starts a swh-scheduler listener service. 
@cli.command('listener')
@click.pass_context
def listener(ctx):
    """Starts a swh-scheduler listener service.

    This service is responsible for listening to task lifecycle events and
    handling their workflow status in the database."""
    scheduler = ctx.obj['scheduler']
    if not scheduler:
        raise ValueError('Scheduler class (local/remote) must be instantiated')

    from swh.scheduler.celery_backend.listener import (
        event_monitor, main_app)
    event_monitor(main_app, backend=scheduler)


@cli.command('api-server')
@click.option('--host', default='0.0.0.0',
              help="Host to run the scheduler server api")
@click.option('--port', default=5008, type=click.INT,
              help="Binding port of the server")
@click.option('--debug/--nodebug', default=None,
              help=("Indicates if the server should run in debug mode. "
                    "Defaults to True if log-level is DEBUG, False otherwise.")
              )
@click.pass_context
def api_server(ctx, host, port, debug):
    """Starts a swh-scheduler API HTTP server.
    """
    if ctx.obj['config']['scheduler']['cls'] == 'remote':
        click.echo("The API server can only be started with a 'local' "
                   "configuration", err=True)
        ctx.exit(1)

    from swh.scheduler.api import server
    server.app.scheduler = ctx.obj['scheduler']
    server.app.config.update(ctx.obj['config'])
    if debug is None:
        debug = ctx.obj['loglevel'] <= logging.DEBUG
    server.app.run(host, port=port, debug=bool(debug))


@cli.group('task-type')
@click.pass_context
def task_type(ctx):
    """Manipulate task types."""
    pass


@task_type.command('list')
@click.option('--verbose', '-v', is_flag=True, default=False)
@click.option('--task_type', '-t', multiple=True, default=None,
              help='List task types of given type')
@click.option('--task_name', '-n', multiple=True, default=None,
              help='List task types of given backend task name')
@click.pass_context
def list_task_types(ctx, verbose, task_type, task_name):
    click.echo("Known task types:")
    if verbose:
        tmpl = click.style('{type}: ', bold=True) + '''{backend_name}
  {description}
  interval: {default_interval} [{min_interval}, {max_interval}]
  backoff_factor: {backoff_factor}
  max_queue_length: {max_queue_length}
  num_retries: {num_retries}
  retry_delay: {retry_delay}
'''
    else:
        tmpl = '{type}:\n  {description}'
    for tasktype in ctx.obj['scheduler'].get_task_types():
        if task_type and tasktype['type'] not in task_type:
            continue
        if task_name and tasktype['backend_name'] not in task_name:
            continue
        click.echo(tmpl.format(**tasktype))


@task_type.command('add')
@click.argument('type', required=True)
@click.argument('task-name', required=True)
@click.argument('description', required=True)
@click.option('--default-interval', '-i', default='90 days',
              help='Default interval ("90 days" by default)')
@click.option('--min-interval', default=None,
              help='Minimum interval (default interval if not set)')
@click.option('--max-interval', default=None,
              help='Maximal interval (default interval if not set)')
@click.option('--backoff-factor', '-f', type=float, default=1,
              help='Backoff factor')
@click.pass_context
def add_task_type(ctx, type, task_name, description,
                  default_interval, min_interval, max_interval,
                  backoff_factor):
    """Create a new task type
    """
    scheduler = ctx.obj['scheduler']
    if not scheduler:
        raise ValueError('Scheduler class (local/remote) must be instantiated')
    task_type = dict(
        type=type,
        backend_name=task_name,
        description=description,
        default_interval=default_interval,
        min_interval=min_interval,
        max_interval=max_interval,
        backoff_factor=backoff_factor,
        max_queue_length=None,
        num_retries=None,
        retry_delay=None,
        )
    scheduler.create_task_type(task_type)
    click.echo('OK')


+@cli.command('updater')
+@click.option('--verbose/--no-verbose', '-v', default=False,
+              help='Verbose mode')
+@click.pass_context +def updater(ctx, verbose): + """Insert tasks in the scheduler from the scheduler-updater's events + + """ + from swh.scheduler.updater.writer import UpdaterWriter + UpdaterWriter(**ctx.obj['config']).run() + + +@cli.command('ghtorrent') +@click.option('--verbose/--no-verbose', '-v', default=False, + help='Verbose mode') +@click.pass_context +def ghtorrent(ctx, verbose): + """Consume events from ghtorrent and write them to cache. + + """ + from swh.scheduler.updater.ghtorrent import GHTorrentConsumer + from swh.scheduler.updater.backend import SchedulerUpdaterBackend + + ght_config = ctx.obj['config'].get('ghtorrent', {}) + back_config = ctx.obj['config'].get('scheduler_updater', {}) + backend = SchedulerUpdaterBackend(**back_config) + GHTorrentConsumer(backend, **ght_config).run() + + if __name__ == '__main__': cli() diff --git a/swh/scheduler/tests/updater/test_backend.py b/swh/scheduler/tests/updater/test_backend.py index baa8205..4e8bfee 100644 --- a/swh/scheduler/tests/updater/test_backend.py +++ b/swh/scheduler/tests/updater/test_backend.py @@ -1,65 +1,62 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import unittest from arrow import utcnow from hypothesis import given from hypothesis.strategies import sets import pytest from swh.core.tests.db_testing import SingleDbTestFixture from swh.scheduler.tests import SQL_DIR from swh.scheduler.updater.backend import SchedulerUpdaterBackend from swh.scheduler.updater.events import SWHEvent from . import from_regex @pytest.mark.db class SchedulerUpdaterBackendTest(SingleDbTestFixture, unittest.TestCase): TEST_DB_NAME = 'softwareheritage-scheduler-updater-test' TEST_DB_DUMP = os.path.join(SQL_DIR, 'updater', '*.sql') def setUp(self): super().setUp() config = { - 'scheduling_updater_db': 'dbname=' + self.TEST_DB_NAME, + 'db': 'dbname=' + self.TEST_DB_NAME, 'cache_read_limit': 1000, } self.backend = SchedulerUpdaterBackend(**config) def _empty_tables(self): self.cursor.execute( """SELECT table_name FROM information_schema.tables WHERE table_schema = %s""", ('public', )) tables = set(table for (table,) in self.cursor.fetchall()) for table in tables: self.cursor.execute('truncate table %s cascade' % table) self.conn.commit() def tearDown(self): - self.backend.close_connection() self._empty_tables() super().tearDown() @given(sets( from_regex( r'^https://somewhere[.]org/[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), min_size=10, max_size=15)) def test_cache_read(self, urls): def gen_events(urls): for url in urls: yield SWHEvent({ 'url': url, 'type': 'create', 'origin_type': 'git', }) - self.backend.cache_put(gen_events(urls)) r = self.backend.cache_read(timestamp=utcnow()) - self.assertNotEqual(r, []) diff --git a/swh/scheduler/tests/updater/test_consumer.py b/swh/scheduler/tests/updater/test_consumer.py index 1a2f2ac..93656a0 100644 --- a/swh/scheduler/tests/updater/test_consumer.py +++ b/swh/scheduler/tests/updater/test_consumer.py @@ -1,195 +1,199 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from itertools import chain from hypothesis import given, settings, HealthCheck from hypothesis.strategies import lists, 
sampled_from, text, tuples from swh.scheduler.updater.consumer import UpdaterConsumer from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent from . import UpdaterTestUtil, from_regex class FakeSchedulerUpdaterBackend: def __init__(self): self.events = [] def cache_put(self, events): self.events.append(events) class FakeUpdaterConsumerBase(UpdaterConsumer): - def __init__(self, backend_class=FakeSchedulerUpdaterBackend): - super().__init__(backend_class=backend_class) + def __init__(self, backend): + super().__init__(backend) self.connection_opened = False self.connection_closed = False self.consume_called = False self.has_events_called = False def open_connection(self): self.connection_opened = True def close_connection(self): self.connection_closed = True def convert_event(self, event): pass class FakeUpdaterConsumerRaise(FakeUpdaterConsumerBase): def has_events(self): self.has_events_called = True return True def consume_events(self): self.consume_called = True raise ValueError('Broken stuff') class UpdaterConsumerRaisingTest(unittest.TestCase): def setUp(self): - self.updater = FakeUpdaterConsumerRaise() + self.updater = FakeUpdaterConsumerRaise( + FakeSchedulerUpdaterBackend()) def test_running_raise(self): """Raising during run should finish fine. """ # given self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) # when with self.assertRaisesRegex(ValueError, 'Broken stuff'): self.updater.run() # then self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) self.assertTrue(self.updater.connection_opened) self.assertTrue(self.updater.has_events_called) self.assertTrue(self.updater.connection_closed) self.assertTrue(self.updater.consume_called) class FakeUpdaterConsumerNoEvent(FakeUpdaterConsumerBase): def has_events(self): self.has_events_called = True return False def consume_events(self): self.consume_called = True class UpdaterConsumerNoEventTest(unittest.TestCase): def setUp(self): - self.updater = FakeUpdaterConsumerNoEvent() + self.updater = FakeUpdaterConsumerNoEvent( + FakeSchedulerUpdaterBackend()) def test_running_does_not_consume(self): """Run with no events should do just fine""" # given self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) # when self.updater.run() # then self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) self.assertTrue(self.updater.connection_opened) self.assertTrue(self.updater.has_events_called) self.assertTrue(self.updater.connection_closed) self.assertFalse(self.updater.consume_called) EVENT_KEYS = ['type', 'repo', 'created_at', 'origin_type'] class FakeUpdaterConsumer(FakeUpdaterConsumerBase): - def __init__(self, messages): - super().__init__() + def __init__(self, backend, messages): + super().__init__(backend) self.messages = messages self.debug = False def has_events(self): self.has_events_called = True return len(self.messages) > 0 def consume_events(self): self.consume_called = True for msg in self.messages: yield msg self.messages.pop() def convert_event(self, event, keys=EVENT_KEYS): for k in keys: v = event.get(k) if v is None: return None e = { 'type': event['type'], 'url': 'https://fake.url/%s' % event['repo']['name'], 'last_seen': event['created_at'], 'origin_type': event['origin_type'], } return SWHEvent(e) class 
UpdaterConsumerWithEventTest(UpdaterTestUtil, unittest.TestCase): @settings(suppress_health_check=[HealthCheck.too_slow]) @given(lists(tuples(sampled_from(LISTENED_EVENTS), # event type from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name text()), # origin type min_size=3, max_size=10), lists(tuples(text(), # event type from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), # name text()), # origin type min_size=3, max_size=10), lists(tuples(sampled_from(LISTENED_EVENTS), # event type from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name text(), # origin type sampled_from(EVENT_KEYS)), # keys to drop min_size=3, max_size=10)) def test_running(self, events, uninteresting_events, incomplete_events): """Interesting events are written to cache, others are dropped """ # given ready_events = self._make_events(events) ready_uninteresting_events = self._make_events(uninteresting_events) ready_incomplete_events = self._make_incomplete_events( incomplete_events) - updater = FakeUpdaterConsumer(list(chain( - ready_events, ready_incomplete_events, - ready_uninteresting_events))) + updater = FakeUpdaterConsumer( + FakeSchedulerUpdaterBackend(), + list(chain( + ready_events, ready_incomplete_events, + ready_uninteresting_events))) self.assertEqual(updater.count, 0) self.assertEqual(updater.seen_events, set()) self.assertEqual(updater.events, []) # when updater.run() # then self.assertEqual(updater.count, 0) self.assertEqual(updater.seen_events, set()) self.assertEqual(updater.events, []) self.assertTrue(updater.connection_opened) self.assertTrue(updater.has_events_called) self.assertTrue(updater.connection_closed) self.assertTrue(updater.consume_called) self.assertEqual(updater.messages, []) # uninteresting or incomplete events are dropped self.assertTrue(len(updater.backend.events), len(events)) diff --git a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py index b87e345..92cc89d 100644 --- a/swh/scheduler/tests/updater/test_ghtorrent.py +++ b/swh/scheduler/tests/updater/test_ghtorrent.py @@ -1,164 +1,174 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from unittest.mock import patch from hypothesis import given from hypothesis.strategies import sampled_from from swh.scheduler.updater.events import SWHEvent from swh.scheduler.updater.ghtorrent import (INTERESTING_EVENT_KEYS, GHTorrentConsumer, events) +from swh.scheduler.updater.backend import SchedulerUpdaterBackend from . 
import UpdaterTestUtil, from_regex def event_values(): return set(events['evt']).union(set(events['ent'])) def ghtorrentize_event_name(event_name): return '%sEvent' % event_name.capitalize() EVENT_TYPES = sorted([ghtorrentize_event_name(e) for e in event_values()]) class FakeChannel: """Fake Channel (virtual connection inside a connection) """ def close(self): self.close = True class FakeConnection: """Fake Rabbitmq connection for test purposes """ def __init__(self, conn_string): self._conn_string = conn_string self._connect = False self._release = False self._channel = False def connect(self): self._connect = True return True def release(self): self._connect = False self._release = True def channel(self): self._channel = True return FakeChannel() class GHTorrentConsumerTest(UpdaterTestUtil, unittest.TestCase): def setUp(self): - self.fake_config = { - 'conn': { - 'url': 'amqp://u:p@https://somewhere:9807', + config = { + 'ghtorrent': { + 'rabbitmq': { + 'conn': { + 'url': 'amqp://u:p@https://somewhere:9807', + }, + 'prefetch_read': 17, + }, + 'batch_cache_write': 42, + }, + 'scheduler_updater': { + 'cls': 'local', + 'args': { + 'db': 'dbname=softwareheritage-scheduler-updater-dev', + }, }, - 'debug': True, - 'batch_cache_write': 10, - 'rabbitmq_prefetch_read': 100, } - self.consumer = GHTorrentConsumer(self.fake_config, - _connection_class=FakeConnection) + GHTorrentConsumer.connection_class = FakeConnection + with patch.object( + SchedulerUpdaterBackend, '__init__', return_value=None): + self.consumer = GHTorrentConsumer(**config) - def test_init(self): + @patch('swh.scheduler.updater.backend.SchedulerUpdaterBackend') + def test_init(self, mock_backend): # given # check init is ok - self.assertEqual(self.consumer.debug, - self.fake_config['debug']) - self.assertEqual(self.consumer.batch, - self.fake_config['batch_cache_write']) - self.assertEqual(self.consumer.prefetch_read, - self.fake_config['rabbitmq_prefetch_read']) - self.assertEqual(self.consumer.config, self.fake_config) + self.assertEqual(self.consumer.batch, 42) + self.assertEqual(self.consumer.prefetch_read, 17) def test_has_events(self): self.assertTrue(self.consumer.has_events()) def test_connection(self): # when self.consumer.open_connection() # then self.assertEqual(self.consumer.conn._conn_string, - self.fake_config['conn']['url']) + 'amqp://u:p@https://somewhere:9807') self.assertTrue(self.consumer.conn._connect) self.assertFalse(self.consumer.conn._release) # when self.consumer.close_connection() # then self.assertFalse(self.consumer.conn._connect) self.assertTrue(self.consumer.conn._release) self.assertIsInstance(self.consumer.channel, FakeChannel) @given(sampled_from(EVENT_TYPES), from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$')) def test_convert_event_ok(self, event_type, name): input_event = self._make_event(event_type, name, 'git') actual_event = self.consumer.convert_event(input_event) self.assertTrue(isinstance(actual_event, SWHEvent)) event = actual_event.get() expected_event = { 'type': event_type.lower().rstrip('Event'), 'url': 'https://github.com/%s' % name, 'last_seen': input_event['created_at'], 'cnt': 1, 'origin_type': 'git', } self.assertEqual(event, expected_event) @given(sampled_from(EVENT_TYPES), from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), sampled_from(INTERESTING_EVENT_KEYS)) def test_convert_event_ko(self, event_type, name, missing_data_key): input_event = self._make_incomplete_event( event_type, name, 'git', missing_data_key) + logger = self.consumer.log + del self.consumer.log # prevent gazillions 
of warnings actual_converted_event = self.consumer.convert_event(input_event) - + self.consumer.log = logger self.assertIsNone(actual_converted_event) @patch('swh.scheduler.updater.ghtorrent.collect_replies') def test_consume_events(self, mock_collect_replies): # given self.consumer.queue = 'fake-queue' # hack self.consumer.open_connection() fake_events = [ self._make_event('PushEvent', 'user/some-repo', 'git'), self._make_event('PushEvent', 'user2/some-other-repo', 'git'), ] mock_collect_replies.return_value = fake_events # when actual_events = [] for e in self.consumer.consume_events(): actual_events.append(e) # then self.assertEqual(fake_events, actual_events) mock_collect_replies.assert_called_once_with( self.consumer.conn, self.consumer.channel, 'fake-queue', no_ack=False, - limit=self.fake_config['rabbitmq_prefetch_read'] + limit=17 ) diff --git a/swh/scheduler/tests/updater/test_writer.py b/swh/scheduler/tests/updater/test_writer.py index 1d932a5..77dee54 100644 --- a/swh/scheduler/tests/updater/test_writer.py +++ b/swh/scheduler/tests/updater/test_writer.py @@ -1,158 +1,158 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import unittest from glob import glob import pytest from swh.core.utils import numfile_sortkey as sortkey from swh.core.tests.db_testing import DbTestFixture from swh.scheduler.tests import SQL_DIR from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent from swh.scheduler.updater.writer import UpdaterWriter from . import UpdaterTestUtil @pytest.mark.db class CommonSchedulerTest(DbTestFixture): TEST_SCHED_DB = 'softwareheritage-scheduler-test' TEST_SCHED_DUMP = os.path.join(SQL_DIR, '*.sql') TEST_SCHED_UPDATER_DB = 'softwareheritage-scheduler-updater-test' TEST_SCHED_UPDATER_DUMP = os.path.join(SQL_DIR, 'updater', '*.sql') @classmethod def setUpClass(cls): cls.add_db(cls.TEST_SCHED_DB, [(sqlfn, 'psql') for sqlfn in sorted(glob(cls.TEST_SCHED_DUMP), key=sortkey)]) cls.add_db(cls.TEST_SCHED_UPDATER_DB, [(sqlfn, 'psql') for sqlfn in sorted(glob(cls.TEST_SCHED_UPDATER_DUMP), key=sortkey)]) super().setUpClass() def tearDown(self): self.reset_db_tables(self.TEST_SCHED_UPDATER_DB) self.reset_db_tables(self.TEST_SCHED_DB, excluded=['task_type', 'priority_ratio']) super().tearDown() class UpdaterWriterTest(UpdaterTestUtil, CommonSchedulerTest, unittest.TestCase): def setUp(self): super().setUp() config = { 'scheduler': { 'cls': 'local', 'args': { - 'scheduling_db': 'dbname=softwareheritage-scheduler-test', + 'db': 'dbname=softwareheritage-scheduler-test', }, }, 'scheduler_updater': { - 'scheduling_updater_db': - 'dbname=softwareheritage-scheduler-updater-test', - 'cache_read_limit': 5, + 'cls': 'local', + 'args': { + 'db': + 'dbname=softwareheritage-scheduler-updater-test', + 'cache_read_limit': 5, + }, + }, + 'updater_writer': { + 'pause': 0.1, + 'verbose': False, }, - 'pause': 0.1, - 'verbose': False, } self.writer = UpdaterWriter(**config) self.scheduler_backend = self.writer.scheduler_backend self.scheduler_updater_backend = self.writer.scheduler_updater_backend - def tearDown(self): - self.scheduler_backend.close_connection() - self.scheduler_updater_backend.close_connection() - super().tearDown() - def test_run_ko(self): """Only git tasks are supported for now, other types are dismissed. 
""" ready_events = [ SWHEvent( self._make_simple_event(event_type, 'origin-%s' % i, 'svn')) for i, event_type in enumerate(LISTENED_EVENTS) ] expected_length = len(ready_events) self.scheduler_updater_backend.cache_put(ready_events) data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), expected_length) r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # first read on an empty scheduling db results with nothing in it self.assertEqual(len(r), 0) # Read from cache to scheduler db self.writer.run() r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # other reads after writes are still empty since it's not supported self.assertEqual(len(r), 0) def test_run_ok(self): """Only git origin are supported for now """ ready_events = [ SWHEvent( self._make_simple_event(event_type, 'origin-%s' % i, 'git')) for i, event_type in enumerate(LISTENED_EVENTS) ] expected_length = len(ready_events) self.scheduler_updater_backend.cache_put(ready_events) data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), expected_length) r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # first read on an empty scheduling db results with nothing in it self.assertEqual(len(r), 0) # Read from cache to scheduler db self.writer.run() # now, we should have scheduling task ready r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') self.assertEqual(len(r), expected_length) # Check the task has been scheduled for t in r: self.assertEqual(t['type'], 'origin-update-git') self.assertEqual(t['priority'], 'normal') self.assertEqual(t['policy'], 'oneshot') self.assertEqual(t['status'], 'next_run_not_scheduled') # writer has nothing to do now self.writer.run() # so no more data in cache data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), 0) # provided, no runner is ran, still the same amount of scheduling tasks r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') self.assertEqual(len(r), expected_length) diff --git a/swh/scheduler/updater/backend.py b/swh/scheduler/updater/backend.py index 8193996..aabc5a5 100644 --- a/swh/scheduler/updater/backend.py +++ b/swh/scheduler/updater/backend.py @@ -1,82 +1,91 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from arrow import utcnow -from swh.core.config import SWHConfig -from swh.scheduler.backend import DbBackend, autocommit +import psycopg2.pool +import psycopg2.extras +from swh.scheduler.backend import DbBackend +from swh.core.db.common import db_transaction, db_transaction_generator -class SchedulerUpdaterBackend(SWHConfig, DbBackend): +class SchedulerUpdaterBackend: CONFIG_BASE_FILENAME = 'backend/scheduler-updater' - DEFAULT_CONFIG = { - 'scheduling_updater_db': ( - 'str', 'dbname=softwareheritage-scheduler-updater-dev'), - 'cache_read_limit': ('int', 1000), - } - - def __init__(self, **override_config): - super().__init__() - if override_config: - self.config = override_config +# 'cache_read_limit': ('int', 1000), + + def __init__(self, db, cache_read_limit=1000, + min_pool_conns=1, max_pool_conns=10): + """ + Args: + db_conn: either a libpq connection string, or a psycopg2 connection + + """ + if isinstance(db, psycopg2.extensions.connection): + self._pool = None + self._db = DbBackend(db) else: - self.config = 
diff --git a/swh/scheduler/updater/backend.py b/swh/scheduler/updater/backend.py
index 8193996..aabc5a5 100644
--- a/swh/scheduler/updater/backend.py
+++ b/swh/scheduler/updater/backend.py
@@ -1,82 +1,91 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from arrow import utcnow

-from swh.core.config import SWHConfig
-from swh.scheduler.backend import DbBackend, autocommit
+import psycopg2.pool
+import psycopg2.extras

+from swh.scheduler.backend import DbBackend
+from swh.core.db.common import db_transaction, db_transaction_generator


-class SchedulerUpdaterBackend(SWHConfig, DbBackend):
+class SchedulerUpdaterBackend:
    CONFIG_BASE_FILENAME = 'backend/scheduler-updater'
-    DEFAULT_CONFIG = {
-        'scheduling_updater_db': (
-            'str', 'dbname=softwareheritage-scheduler-updater-dev'),
-        'cache_read_limit': ('int', 1000),
-    }
-
-    def __init__(self, **override_config):
-        super().__init__()
-        if override_config:
-            self.config = override_config
+#        'cache_read_limit': ('int', 1000),
+
+    def __init__(self, db, cache_read_limit=1000,
+                 min_pool_conns=1, max_pool_conns=10):
+        """
+        Args:
+            db: either a libpq connection string, or a psycopg2 connection
+
+        """
+        if isinstance(db, psycopg2.extensions.connection):
+            self._pool = None
+            self._db = DbBackend(db)
        else:
-            self.config = self.parse_config_file(global_config=False)
-        self.db = None
-        self.db_conn_dsn = self.config['scheduling_updater_db']
-        self.limit = self.config['cache_read_limit']
-        self.reconnect()
+            self._pool = psycopg2.pool.ThreadedConnectionPool(
+                min_pool_conns, max_pool_conns, db,
+                cursor_factory=psycopg2.extras.RealDictCursor,
+            )
+            self._db = None
+        self.limit = cache_read_limit
+
+    def get_db(self):
+        if self._db:
+            return self._db
+        return DbBackend.from_pool(self._pool)

    cache_put_keys = ['url', 'cnt', 'last_seen', 'origin_type']

-    @autocommit
-    def cache_put(self, events, timestamp=None, cursor=None):
+    @db_transaction()
+    def cache_put(self, events, timestamp=None, db=None, cur=None):
        """Write new events in the backend.

        """
        if timestamp is None:
            timestamp = utcnow()

        def prepare_events(events):
            for e in events:
                event = e.get()
                seen = event['last_seen']
                if seen is None:
                    event['last_seen'] = timestamp
                yield event
-
-        cursor.execute('select swh_mktemp_cache()')
-        self.copy_to(prepare_events(events),
-                     'tmp_cache', self.cache_put_keys, cursor=cursor)
-        cursor.execute('select swh_cache_put()')
+        cur.execute('select swh_mktemp_cache()')
+        db.copy_to(prepare_events(events),
+                   'tmp_cache', self.cache_put_keys, cursor=cur)
+        cur.execute('select swh_cache_put()')

    cache_read_keys = ['id', 'url', 'origin_type', 'cnt', 'first_seen',
                       'last_seen']

-    @autocommit
-    def cache_read(self, timestamp=None, limit=None, cursor=None):
+    @db_transaction_generator()
+    def cache_read(self, timestamp=None, limit=None, db=None, cur=None):
        """Read events from the cache prior to timestamp.

        """
        if not timestamp:
            timestamp = utcnow()

        if not limit:
            limit = self.limit

-        q = self._format_query('select {keys} from swh_cache_read(%s, %s)',
-                               self.cache_read_keys)
-        cursor.execute(q, (timestamp, limit))
-        for r in cursor.fetchall():
+        q = db._format_query('select {keys} from swh_cache_read(%s, %s)',
+                             self.cache_read_keys)
+        cur.execute(q, (timestamp, limit))
+        for r in cur.fetchall():
            r['id'] = r['id'].tobytes()
            yield r

-    @autocommit
-    def cache_remove(self, entries, cursor=None):
+    @db_transaction()
+    def cache_remove(self, entries, db=None, cur=None):
        """Clean events from the cache

        """
        q = 'delete from cache where url in (%s)' % (
            ', '.join(("'%s'" % e for e in entries)),
        )
-        cursor.execute(q)
+        cur.execute(q)
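A minimal usage sketch for the reworked backend above: it accepts either a libpq DSN (a connection pool is then created) or an existing psycopg2 connection (the DSN below is illustrative):

    from swh.scheduler.updater.backend import SchedulerUpdaterBackend

    backend = SchedulerUpdaterBackend(
        db='dbname=softwareheritage-scheduler-updater-dev',
        cache_read_limit=1000)
    for event in backend.cache_read():  # events cached before "now"
        print(event['url'], event['cnt'])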
""" self.count = 0 self.seen_events = set() self.events = [] def is_interesting(self, event): """Determine if an event is interesting or not. Args: event (SWHEvent): SWH event """ return event.is_interesting() @abstractmethod def convert_event(self, event): """Parse an event into an SWHEvent. """ pass def process_event(self, event): """Process converted and interesting event. Args: event (SWHEvent): Event to process if deemed interesting """ try: if event.url in self.seen_events: event.cnt += 1 else: self.events.append(event) self.seen_events.add(event.url) self.count += 1 finally: if self.count >= self.batch: if self.events: self.backend.cache_put(self.events) self._reset_cache() def _flush(self): """Flush remaining internal cache if any. """ if self.events: self.backend.cache_put(self.events) self._reset_cache() @abstractmethod def has_events(self): """Determine if there remains events to consume. Returns boolean value, true for remaining events, False otherwise """ pass @abstractmethod def consume_events(self): """The main entry point to consume events. This should either yield or return message for consumption. """ pass @abstractmethod def open_connection(self): """Open a connection to the remote system we are supposed to consume from. """ pass @abstractmethod def close_connection(self): """Close opened connection to the remote system. """ pass def run(self): """The main entry point to consume events. """ try: self.open_connection() while self.has_events(): for _event in self.consume_events(): event = self.convert_event(_event) if not event: self.log.warning( 'Incomplete event dropped %s' % _event) continue if not self.is_interesting(event): continue if self.debug: self.log.debug('Event: %s' % event) try: self.process_event(event) except Exception: self.log.exception( 'Problem when processing event %s' % _event) continue except Exception as e: self.log.error('Error raised during consumption: %s' % e) raise e finally: self.close_connection() self._flush() diff --git a/swh/scheduler/updater/ghtorrent/__init__.py b/swh/scheduler/updater/ghtorrent/__init__.py index 2b9e5fc..8ece752 100644 --- a/swh/scheduler/updater/ghtorrent/__init__.py +++ b/swh/scheduler/updater/ghtorrent/__init__.py @@ -1,146 +1,143 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from kombu import Connection, Exchange, Queue from kombu.common import collect_replies -from swh.core.config import SWHConfig +from swh.core.config import merge_configs + from swh.scheduler.updater.events import SWHEvent from swh.scheduler.updater.consumer import UpdaterConsumer +from swh.scheduler.updater.backend import SchedulerUpdaterBackend events = { # ghtorrent events related to github events (interesting) 'evt': [ 'commitcomment', 'create', 'delete', 'deployment', 'deploymentstatus', 'download', 'follow', 'fork', 'forkapply', 'gist', 'gollum', 'issuecomment', 'issues', 'member', 'membership', 'pagebuild', 'public', 'pullrequest', 'pullrequestreviewcomment', 'push', 'release', 'repository', 'status', 'teamadd', 'watch' ], # ghtorrent events related to mongodb insert (not interesting) 'ent': [ 'commit_comments', 'commits', 'followers', 'forks', 'geo_cache', 'issue_comments', 'issue_events', 'issues', 'org_members', 'pull_request_comments', 'pull_requests', 'repo_collaborators', 'repo_labels', 'repos', 'users', 'watchers' ] } 
+INTERESTING_EVENT_KEYS = ['type', 'repo', 'created_at'] -class RabbitMQConn(SWHConfig): - """RabbitMQ Connection class +DEFAULT_CONFIG = { + 'ghtorrent': { + 'batch_cache_write': 1000, + 'rabbitmq': { + 'prefetch_read': 100, + 'conn': { + 'url': 'amqp://guest:guest@localhost:5672', + 'exchange_name': 'ght-streams', + 'routing_key': 'something', + 'queue_name': 'fake-events', + }, + }, + }, + 'scheduler_updater': { + 'cls': 'local', + 'args': { + 'db': 'dbname=softwareheritage-scheduler-updater-dev', + 'cache_read_limit': 1000, + }, + }, +} - """ - CONFIG_BASE_FILENAME = 'backend/ghtorrent' - - DEFAULT_CONFIG = { - 'conn': ('dict', { - 'url': 'amqp://guest:guest@localhost:5672', - 'exchange_name': 'ght-streams', - 'routing_key': 'something', - 'queue_name': 'fake-events' - }) - } - ADDITIONAL_CONFIG = {} +class GHTorrentConsumer(UpdaterConsumer): + """GHTorrent events consumer + + """ + connection_class = Connection def __init__(self, **config): - super().__init__() - if config and set(config.keys()) - {'log_class'} != set(): - self.config = config - else: - self.config = self.parse_config_file( - additional_configs=[self.ADDITIONAL_CONFIG]) - - self.conn_string = self.config['conn']['url'] - self.exchange = Exchange(self.config['conn']['exchange_name'], - 'topic', durable=True) - self.routing_key = self.config['conn']['routing_key'] - self.queue = Queue(self.config['conn']['queue_name'], - exchange=self.exchange, - routing_key=self.routing_key, + self.config = merge_configs(DEFAULT_CONFIG, config) + + ght_config = self.config['ghtorrent'] + rmq_config = ght_config['rabbitmq'] + self.prefetch_read = int(rmq_config.get('prefetch_read', 100)) + + exchange = Exchange( + rmq_config['conn']['exchange_name'], + 'topic', durable=True) + routing_key = rmq_config['conn']['routing_key'] + self.queue = Queue(rmq_config['conn']['queue_name'], + exchange=exchange, + routing_key=routing_key, auto_delete=True) + if self.config['scheduler_updater']['cls'] != 'local': + raise ValueError( + 'The scheduler_updater can only be a cls=local for now') + backend = SchedulerUpdaterBackend( + **self.config['scheduler_updater']['args']) -INTERESTING_EVENT_KEYS = ['type', 'repo', 'created_at'] - - -class GHTorrentConsumer(RabbitMQConn, UpdaterConsumer): - """GHTorrent events consumer - - """ - ADDITIONAL_CONFIG = { - 'debug': ('bool', False), - 'batch_cache_write': ('int', 1000), - 'rabbitmq_prefetch_read': ('int', 100), - } - - def __init__(self, config=None, _connection_class=Connection): - if config is None: - super().__init__( - log_class='swh.scheduler.updater.ghtorrent.GHTorrentConsumer') - else: - self.config = config - self._connection_class = _connection_class - self.debug = self.config['debug'] - self.batch = self.config['batch_cache_write'] - self.prefetch_read = self.config['rabbitmq_prefetch_read'] + super().__init__(backend, ght_config.get('batch_cache_write', 1000)) def has_events(self): """Always has events """ return True def convert_event(self, event): """Given ghtorrent event, convert it to a SWHEvent instance. 
""" if isinstance(event, str): event = json.loads(event) - for k in INTERESTING_EVENT_KEYS: if k not in event: if hasattr(self, 'log'): - self.log.warn( + self.log.warning( 'Event should have the \'%s\' entry defined' % k) return None _type = event['type'].lower().rstrip('Event') _repo_name = 'https://github.com/%s' % event['repo']['name'] return SWHEvent({ 'type': _type, 'url': _repo_name, 'last_seen': event['created_at'], 'origin_type': 'git', }) def open_connection(self): """Open rabbitmq connection """ - self.conn = self._connection_class(self.config['conn']['url']) + self.conn = self.connection_class( + self.config['ghtorrent']['rabbitmq']['conn']['url']) self.conn.connect() self.channel = self.conn.channel() def close_connection(self): """Close rabbitmq connection """ self.channel.close() self.conn.release() def consume_events(self): """Consume and yield queue messages """ yield from collect_replies( self.conn, self.channel, self.queue, no_ack=False, limit=self.prefetch_read) diff --git a/swh/scheduler/updater/ghtorrent/cli.py b/swh/scheduler/updater/ghtorrent/cli.py index db05060..f94290b 100644 --- a/swh/scheduler/updater/ghtorrent/cli.py +++ b/swh/scheduler/updater/ghtorrent/cli.py @@ -1,28 +1,23 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click -import logging - -from swh.scheduler.updater.ghtorrent import GHTorrentConsumer @click.command() @click.option('--verbose/--no-verbose', '-v', default=False, help='Verbose mode') -def main(verbose): +@click.pass_context +def main(ctx, verbose): """Consume events from ghtorrent and write them to cache. """ - log = logging.getLogger('swh.scheduler.updater.ghtorrent.cli') - log.addHandler(logging.StreamHandler()) - _loglevel = logging.DEBUG if verbose else logging.INFO - log.setLevel(_loglevel) - - GHTorrentConsumer().run() + click.echo("Deprecated! 
Use 'swh-scheduler ghtorrent' instead.",
+              err=True)
+    ctx.exit(1)


if __name__ == '__main__':
    main()
diff --git a/swh/scheduler/updater/writer.py b/swh/scheduler/updater/writer.py
index 829f31c..99bab3a 100644
--- a/swh/scheduler/updater/writer.py
+++ b/swh/scheduler/updater/writer.py
@@ -1,122 +1,96 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import click
import logging
import time

from arrow import utcnow

-from swh.core.config import SWHConfig
from swh.core import utils
from swh.scheduler import get_scheduler
from swh.scheduler.utils import create_oneshot_task_dict
from swh.scheduler.updater.backend import SchedulerUpdaterBackend


-class UpdaterWriter(SWHConfig):
+class UpdaterWriter:
    """Updater writer in charge of updating the scheduler db with the
       latest prioritized oneshot tasks

    In effect, this:

    - reads the events from the scheduler updater's db
    - converts those events into prioritized oneshot tasks
    - dumps them into the scheduler db

    """
-    CONFIG_BASE_FILENAME = 'backend/scheduler-updater-writer'
-    DEFAULT_CONFIG = {
-        # access to the scheduler backend
-        'scheduler': ('dict', {
-            'cls': 'local',
-            'args': {
-                'scheduling_db': 'dbname=softwareheritage-scheduler-dev',
-            },
-        }),
-        # access to the scheduler updater cache
-        'scheduler_updater': ('dict', {
-            'scheduling_updater_db':
-                'dbname=softwareheritage-scheduler-updater-dev',
-            'cache_read_limit': 1000,
-        }),
-        # waiting time between db reads
-        'pause': ('int', 10),
-        # verbose or not
-        'verbose': ('bool', False),
-    }

    def __init__(self, **config):
-        if config:
-            self.config = config
-        else:
-            self.config = self.parse_config_file()
-
+        self.config = config
+        if self.config['scheduler_updater']['cls'] != 'local':
+            raise ValueError(
+                'The scheduler_updater can only be a cls=local for now')
        self.scheduler_updater_backend = SchedulerUpdaterBackend(
-            **self.config['scheduler_updater'])
+            **self.config['scheduler_updater']['args'])
        self.scheduler_backend = get_scheduler(**self.config['scheduler'])
-        self.pause = self.config['pause']
+        self.pause = self.config.get('updater_writer', {}).get('pause', 10)
        self.log = logging.getLogger(
            'swh.scheduler.updater.writer.UpdaterWriter')
-        self.log.setLevel(
-            logging.DEBUG if self.config['verbose'] else logging.INFO)

    def convert_to_oneshot_task(self, event):
        """Given an event, convert it into a oneshot task with priority.

        Args:
            event (dict): The event to convert to a task

        """
        if event['origin_type'] == 'git':
            return create_oneshot_task_dict(
                'origin-update-git', event['url'], priority='normal')
        self.log.warning('Type %s is not supported for now, only git' % (
            event['origin_type'], ))
        return None

    def write_event_to_scheduler(self, events):
        """Write events to the scheduler and yield ids when done"""
        # convert events to oneshot tasks
        oneshot_tasks = filter(lambda e: e is not None,
                               map(self.convert_to_oneshot_task, events))
        # write events to the scheduler
        self.scheduler_backend.create_tasks(list(oneshot_tasks))
        for e in events:
            yield e['url']

    def run(self):
        """First retrieve events from the cache (including origin_type and
           cnt), then convert them into prioritized oneshot tasks, then
           write them to the scheduler db, and finally remove them from the
           cache.
""" while True: timestamp = utcnow() events = list(self.scheduler_updater_backend.cache_read(timestamp)) if not events: break for urls in utils.grouper(self.write_event_to_scheduler(events), n=100): self.scheduler_updater_backend.cache_remove(urls) time.sleep(self.pause) @click.command() @click.option('--verbose/--no-verbose', '-v', default=False, help='Verbose mode') -def main(verbose): - log = logging.getLogger('swh.scheduler.updater.writer') - log.addHandler(logging.StreamHandler()) - _loglevel = logging.DEBUG if verbose else logging.INFO - log.setLevel(_loglevel) - - UpdaterWriter().run() +@click.pass_context +def main(ctx, verbose): + click.echo("Deprecated! Use 'swh-scheduler updater' instead.", + err=True) + ctx.exit(1) if __name__ == '__main__': main()