diff --git a/PKG-INFO b/PKG-INFO
index bcf02b0..d6463d2 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.scheduler
-Version: 0.0.23
+Version: 0.0.24
 Summary: Software Heritage Scheduler
 Home-page: https://forge.softwareheritage.org/diffusion/DSCH/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/data/README.md b/data/README.md
new file mode 100644
index 0000000..762c332
--- /dev/null
+++ b/data/README.md
@@ -0,0 +1,23 @@
+# Install/Update template
+
+Install the `task` template in Elasticsearch:
+
+``` shell
+INSTANCE=http://something:9200
+TEMPLATE_NAME=template_swh_tasks
+curl -i -H'Content-Type: application/json' -d@./elastic-template.json -XPUT ${INSTANCE}/_template/${TEMPLATE_NAME}
+```
+
+# Update index settings
+
+The index setup is fixed and defined by the template settings.
+
+When that setup needs to change, we need to update both the template
+and the existing indices.
+
+To update index settings:
+
+``` shell
+INDEX_NAME=swh-tasks-2017-11
+curl -i -H'Content-Type: application/json' -d@./update-index-settings.json -XPUT ${INSTANCE}/${INDEX_NAME}/_settings
+```
diff --git a/data/elastic-template.json b/data/elastic-template.json
index ed4b172..da3799d 100644
--- a/data/elastic-template.json
+++ b/data/elastic-template.json
@@ -1,51 +1,53 @@
 {
   "order": 0,
   "index_patterns": ["swh-tasks-*"],
   "settings": {
     "index": {
       "codec": "best_compression",
       "refresh_interval": "30s"
     }
   },
   "mappings" : {
     "task" : {
       "_source" : { "enabled": true},
       "properties": {
         "task_id": {"type": "double"},
         "task_policy": {"type": "text"},
         "task_status": {"type": "text"},
         "task_run_id": {"type": "double"},
         "arguments": {
           "type": "object",
           "properties" : {
             "args": {
-              "type": "nested"
+              "type": "nested",
+              "dynamic": false
             },
             "kwargs": {
-              "type": "object"
+              "type": "object",
+              "dynamic": false
             }
           }
         },
         "type": {"type": "text"},
         "backend_id": {"type": "text"},
         "metadata": {
           "type": "object",
           "enabled" : false
         },
         "scheduled": {
           "type": "date",
           "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"
         },
         "started": {
           "type": "date",
           "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"
         },
         "ended": {
           "type": "date",
           "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"
         }
       }
     }
   },
   "aliases": {}
 }
diff --git a/data/update-index-settings.json b/data/update-index-settings.json
new file mode 100644
index 0000000..ce3deab
--- /dev/null
+++ b/data/update-index-settings.json
@@ -0,0 +1,5 @@
+{
+  "index": {
+    "refresh_interval": "1s"
+  }
+}
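The README above covers pushing the template and new index settings; to double-check what Elasticsearch actually holds afterwards, the same endpoints can be queried with plain GET requests. A minimal sketch, reusing the `INSTANCE`, `TEMPLATE_NAME` and `INDEX_NAME` placeholders defined in the README:

``` shell
# Show the registered template (should reflect elastic-template.json).
curl -s ${INSTANCE}/_template/${TEMPLATE_NAME}

# Show the live settings of one index, e.g. after pushing
# update-index-settings.json.
curl -s ${INSTANCE}/${INDEX_NAME}/_settings
```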
diff --git a/debian/changelog b/debian/changelog
index 56b3554..84ef591 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,182 +1,185 @@
-swh-scheduler (0.0.23-1~swh1~bpo9+1) stretch-swh; urgency=medium
+swh-scheduler (0.0.24-1~swh1) unstable-swh; urgency=medium

-  * Rebuild for stretch-backports.
+  * v0.0.24
+  * data/template: Do not index the arguments field (it's in _source)
+  * data/README: Add a small readme to explain es install step
+  * swh.scheduler.cli: Add a bulk index flag to separate read from index

- -- Antoine R. Dumont (@ardumont)  Tue, 10 Apr 2018 17:43:07 +0200
+ -- Antoine R. Dumont (@ardumont)  Fri, 13 Apr 2018 14:55:32 +0200

 swh-scheduler (0.0.23-1~swh1) unstable-swh; urgency=medium

   * swh.scheduler.cli.archive: Delete only completely indexed tasks
   * Prior to this commit, it could happen that we removed tasks even
   * though we did not yet index associated task_run.
   * Related T986

  -- Antoine R. Dumont (@ardumont)  Tue, 10 Apr 2018 17:43:07 +0200

 swh-scheduler (0.0.22-1~swh1) unstable-swh; urgency=medium

   * v0.0.22
   * Update to a more recent python3-elasticsearch client

  -- Antoine R. Dumont (@ardumont)  Mon, 09 Apr 2018 16:09:16 +0200

 swh-scheduler (0.0.21-1~swh1) unstable-swh; urgency=medium

   * v0.0.21
   * Adapt default configuration
   * Fix typo in configuration variable name

  -- Antoine R. Dumont (@ardumont)  Fri, 30 Mar 2018 15:02:55 +0200

 swh-scheduler (0.0.20-1~swh1) unstable-swh; urgency=medium

   * v0.0.20
   * swh.scheduler.cli.archive: Open completed oneshot or disabled
   * recurring tasks archival endpoint
   * swh.core.serializer: Move to msgpack serialization format
   * swh.scheduler.cli: Unify pretty print output
   * sql/data: Add new task type for loading mercurial dump
   * swh.scheduler.cli: Add sample use case for the scheduling cli
   * swh.scheduler.cli: Open policy column to the scheduling cli
   * swh.scheduler.cli: Open the delimiter option as cli argument
   * Fix issue when updating task-type without any retry delay defined
   * swh-scheduler/data: Add new oneshot scheduling load-mercurial task
   * backend: fix default scheduling_db value for consistency
   * backend: doc: fix return value of create_tasks

  -- Antoine R. Dumont (@ardumont)  Fri, 30 Mar 2018 11:44:18 +0200

 swh-scheduler (0.0.19-1~swh1) unstable-swh; urgency=medium

   * v0.0.19
   * swh.scheduler.utils: Open utility function to create oneshot task

  -- Antoine R. Dumont (@ardumont)  Wed, 29 Nov 2017 12:51:15 +0100

 swh-scheduler (0.0.18-1~swh1) unstable-swh; urgency=medium

   * Release swh.scheduler v0.0.18
   * Celery 4 compatibility

  -- Nicolas Dandrimont  Wed, 08 Nov 2017 17:06:22 +0100

 swh-scheduler (0.0.17-1~swh1) unstable-swh; urgency=medium

   * Release swh.scheduler version 0.0.17
   * Update packaging runes

  -- Nicolas Dandrimont  Thu, 12 Oct 2017 18:49:02 +0200

 swh-scheduler (0.0.16-1~swh1) unstable-swh; urgency=medium

   * Release swh-scheduler v0.0.16
   * add some tests
   * implement one-shot tasks
   * implement retry on temporary failure

  -- Nicolas Dandrimont  Mon, 07 Aug 2017 18:44:03 +0200

 swh-scheduler (0.0.15-1~swh1) unstable-swh; urgency=medium

   * Release swh-scheduler v0.0.15
   * Add some methods to get the length of task queues
   * worker: Show logs on stdout if loglevel = debug

  -- Nicolas Dandrimont  Mon, 19 Jun 2017 19:44:56 +0200

 swh-scheduler (0.0.14-1~swh1) unstable-swh; urgency=medium

   * Release swh.scheduler 0.0.14
   * Make the return value of tasks available in the listener

  -- Nicolas Dandrimont  Mon, 12 Jun 2017 17:50:32 +0200

 swh-scheduler (0.0.13-1~swh1) unstable-swh; urgency=medium

   * Release swh.scheduler v0.0.13
   * Use systemd for logging rather than PostgreSQL

  -- Nicolas Dandrimont  Fri, 07 Apr 2017 11:57:50 +0200

 swh-scheduler (0.0.12-1~swh1) unstable-swh; urgency=medium

   * Release swh.scheduler v0.0.12
   * Only log to database if the configuration is present

  -- Nicolas Dandrimont  Thu, 09 Mar 2017 11:12:45 +0100

 swh-scheduler (0.0.11-1~swh1) unstable-swh; urgency=medium

   * Release swh.scheduler v0.0.11
   * add utils.get_task

  -- Nicolas Dandrimont  Tue, 14 Feb 2017 19:49:34 +0100

 swh-scheduler (0.0.10-1~swh1) unstable-swh; urgency=medium

   * Release swh.scheduler v0.0.10
   * Allow disabling tasks

  -- Nicolas Dandrimont  Thu, 20 Oct 2016 17:20:17 +0200
 swh-scheduler (0.0.9-1~swh1) unstable-swh; urgency=medium

   * Release swh.scheduler v0.0.9
   * Revert management of one shot tasks
   * Add possibility of launching several worker instances

  -- Nicolas Dandrimont  Fri, 02 Sep 2016 17:09:18 +0200

 swh-scheduler (0.0.7-1~swh1) unstable-swh; urgency=medium

   * v0.0.7
   * Add oneshot task

  -- Antoine R. Dumont (@ardumont)  Fri, 01 Jul 2016 16:42:45 +0200

 swh-scheduler (0.0.6-1~swh1) unstable-swh; urgency=medium

   * Release swh-scheduler v0.0.6
   * More reliability and efficiency when scheduling a lot of tasks

  -- Nicolas Dandrimont  Wed, 24 Feb 2016 18:46:57 +0100

 swh-scheduler (0.0.5-1~swh1) unstable-swh; urgency=medium

   * Release swh.scheduler v0.0.5
   * Use copy for task mass-scheduling

  -- Nicolas Dandrimont  Wed, 24 Feb 2016 12:13:38 +0100

 swh-scheduler (0.0.4-1~swh1) unstable-swh; urgency=medium

   * Release swh-scheduler v0.0.4
   * general cleanup of the backend
   * use arrow instead of dateutil
   * add new cli program

  -- Nicolas Dandrimont  Tue, 23 Feb 2016 17:46:04 +0100

 swh-scheduler (0.0.3-1~swh1) unstable-swh; urgency=medium

   * Release swh.scheduler version 0.0.3
   * Implement the timestamp arguments to the task_run functions
   * Make the celery event listener use a reliable queue

  -- Nicolas Dandrimont  Mon, 22 Feb 2016 15:14:28 +0100

 swh-scheduler (0.0.2-1~swh1) unstable-swh; urgency=medium

   * Release swh.scheduler v0.0.2
   * Multiple schema changes
   * Initial releases for the celery job runner and the event listener

  -- Nicolas Dandrimont  Fri, 19 Feb 2016 18:50:47 +0100

 swh-scheduler (0.0.1-1~swh1) unstable-swh; urgency=medium

   * Initial release
   * Release swh.scheduler v0.0.1
   * Move swh.core.scheduling and swh.core.worker to swh.scheduler

  -- Nicolas Dandrimont  Mon, 15 Feb 2016 11:07:30 +0100
diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO
index bcf02b0..d6463d2 100644
--- a/swh.scheduler.egg-info/PKG-INFO
+++ b/swh.scheduler.egg-info/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.scheduler
-Version: 0.0.23
+Version: 0.0.24
 Summary: Software Heritage Scheduler
 Home-page: https://forge.softwareheritage.org/diffusion/DSCH/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/swh.scheduler.egg-info/SOURCES.txt b/swh.scheduler.egg-info/SOURCES.txt
index a248fb8..4945f7f 100644
--- a/swh.scheduler.egg-info/SOURCES.txt
+++ b/swh.scheduler.egg-info/SOURCES.txt
@@ -1,56 +1,58 @@
 .gitignore
 AUTHORS
 LICENSE
 LICENSE.Celery
 MANIFEST.in
 Makefile
 requirements-swh.txt
 requirements.txt
 setup.py
 version.txt
 bin/swh-worker-control
+data/README.md
 data/elastic-template.json
+data/update-index-settings.json
 debian/changelog
 debian/compat
 debian/control
 debian/copyright
 debian/rules
 debian/source/format
 docs/.gitignore
 docs/Makefile
 docs/conf.py
 docs/index.rst
 docs/_static/.placeholder
 docs/_templates/.placeholder
 sql/.gitignore
 sql/Makefile
 sql/swh-scheduler-data.sql
 sql/swh-scheduler-schema.sql
 sql/swh-scheduler-testdata.sql
 sql/updates/02.sql
 sql/updates/03.sql
 sql/updates/04.sql
 sql/updates/05.sql
 sql/updates/06.sql
 sql/updates/07.sql
 sql/updates/08.sql
 swh/__init__.py
 swh.scheduler.egg-info/PKG-INFO
 swh.scheduler.egg-info/SOURCES.txt
 swh.scheduler.egg-info/dependency_links.txt
 swh.scheduler.egg-info/entry_points.txt
 swh.scheduler.egg-info/requires.txt
 swh.scheduler.egg-info/top_level.txt
 swh/scheduler/__init__.py
 swh/scheduler/backend.py
 swh/scheduler/backend_es.py
 swh/scheduler/cli.py
 swh/scheduler/task.py
 swh/scheduler/utils.py
 swh/scheduler/celery_backend/__init__.py
 swh/scheduler/celery_backend/config.py
 swh/scheduler/celery_backend/listener.py
 swh/scheduler/celery_backend/runner.py
 swh/scheduler/tests/__init__.py
 swh/scheduler/tests/test_scheduler.py
 swh/scheduler/tests/test_task.py
\ No newline at end of file
diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py
index aa7443c..ed4a901 100644
--- a/swh/scheduler/cli.py
+++ b/swh/scheduler/cli.py
@@ -1,274 +1,276 @@
 # Copyright (C) 2016-2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import arrow
 import click
 import csv
 import itertools
 import json
 import locale
 import logging

 from swh.core import utils
 from .backend import SchedulerBackend
 from .backend_es import SWHElasticSearchClient

 locale.setlocale(locale.LC_ALL, '')
 ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0]


 class DateTimeType(click.ParamType):
     name = 'time and date'

     def convert(self, value, param, ctx):
         if not isinstance(value, arrow.Arrow):
             value = arrow.get(value)
         return value


 DATETIME = DateTimeType()

 CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])


 def pretty_print_list(list, indent):
     """Pretty-print a list"""
     return ''.join('%s%s\n' % (' ' * indent, item) for item in list)


 def pretty_print_dict(dict, indent):
     """Pretty-print a dict"""
     return ''.join('%s%s: %s\n' % (' ' * indent,
                                    click.style(key, bold=True), value)
                    for key, value in dict.items())


 def pretty_print_task(task):
     """Pretty-print a task"""
     next_run = arrow.get(task['next_run'])
     lines = [
         '%s %s\n' % (click.style('Task', bold=True), task['id']),
         click.style('  Next run: ', bold=True),
         "%s (%s)" % (next_run.humanize(locale=ARROW_LOCALE),
                      next_run.format()),
         '\n',
         click.style('  Interval: ', bold=True),
         str(task['current_interval']), '\n',
         click.style('  Type: ', bold=True), task['type'], '\n',
         click.style('  Args:\n', bold=True),
         pretty_print_list(task['arguments']['args'], indent=4),
         click.style('  Keyword args:\n', bold=True),
         pretty_print_dict(task['arguments']['kwargs'], indent=4),
     ]
     return ''.join(lines)


 @click.group(context_settings=CONTEXT_SETTINGS)
 @click.option(
     '--database', '-d', help='Scheduling database DSN',
     default='host=db.internal.softwareheritage.org '
             'dbname=softwareheritage-scheduler user=guest')
 @click.pass_context
 def cli(ctx, database):
     """Software Heritage Scheduler CLI interface"""
     override_config = {}
     if database:
         override_config['scheduling_db'] = database
     ctx.obj = SchedulerBackend(**override_config)


 @cli.group('task')
 @click.pass_context
 def task(ctx):
     """Manipulate tasks."""
     pass


 @task.command('schedule')
 @click.option('--columns', '-c', multiple=True,
               default=['type', 'args', 'kwargs', 'next_run'],
               type=click.Choice([
                   'type', 'args', 'kwargs', 'policy', 'next_run']),
               help='columns present in the CSV file')
 @click.option('--delimiter', '-d', default=',')
 @click.argument('file', type=click.File(encoding='utf-8'))
 @click.pass_context
 def schedule_tasks(ctx, columns, delimiter, file):
     """Schedule tasks from a CSV input file.

     The following columns are expected, and can be set through the -c
     option:

      - type: the type of the task to be scheduled (mandatory)
      - args: the arguments passed to the task (JSON list, defaults to an
        empty list)
      - kwargs: the keyword arguments passed to the task (JSON object,
        defaults to an empty dict)
      - next_run: the date at which the task should run (datetime, defaults
        to now)

     The CSV can be read either from a named file, or from stdin (use - as
     filename).

     Usage sample:

     cat scheduling-task.txt | \
         python3 -m swh.scheduler.cli \
             --database 'service=swh-scheduler-dev' \
             task schedule \
                 --columns type --columns kwargs --columns policy \
                 --delimiter ';' -

     """
     tasks = []
     now = arrow.utcnow()

     reader = csv.reader(file, delimiter=delimiter)
     for line in reader:
         task = dict(zip(columns, line))
         args = json.loads(task.pop('args', '[]'))
         kwargs = json.loads(task.pop('kwargs', '{}'))
         task['arguments'] = {
             'args': args,
             'kwargs': kwargs,
         }
         task['next_run'] = DATETIME.convert(task.get('next_run', now),
                                             None, None)
         tasks.append(task)

     created = ctx.obj.create_tasks(tasks)

     output = [
         'Created %d tasks\n' % len(created),
     ]
     for task in created:
         output.append(pretty_print_task(task))

     click.echo_via_pager('\n'.join(output))
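For reference, with the sample invocation shown in the docstring above (`--columns type --columns kwargs --columns policy --delimiter ';'`), each CSV line carries three ';'-separated fields. A hypothetical input file might look as follows; the task type and the kwargs key are illustrative placeholders, not values mandated by this patch:

``` shell
# scheduling-task.txt: one task per line, fields separated by ';'
# (type ; kwargs as a JSON object ; policy). Values below are placeholders.
cat > scheduling-task.txt <<'EOF'
load-mercurial;{"url": "https://example.org/repo.hg"};oneshot
load-mercurial;{"url": "https://example.org/other.hg"};oneshot
EOF
```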
 @task.command('list-pending')
 @click.option('--task-type', '-t', required=True,
               help='The tasks\' type concerned by the listing')
 @click.option('--limit', '-l', required=False, type=click.INT,
               help='The maximum number of tasks to fetch')
 @click.option('--before', '-b', required=False, type=DATETIME,
               help='List all jobs supposed to run before the given date')
 @click.pass_context
 def list_pending_tasks(ctx, task_type, limit, before):
     """List the tasks that are going to be run.

     You can override the number of tasks to fetch
     """
     pending = ctx.obj.peek_ready_tasks(
         task_type, timestamp=before, num_tasks=limit)
     output = [
         'Found %d tasks\n' % len(pending)
     ]
     for task in pending:
         output.append(pretty_print_task(task))

     click.echo_via_pager('\n'.join(output))


 @task.command('archive')
 @click.option('--before', '-b', default=None,
               help='''Task whose ended date is anterior will be archived.
                       Default to current month's first day.''')
 @click.option('--after', '-a', default=None,
               help='''Task whose ended date is after the specified date will
                       be archived. Default to prior month's first day.''')
 @click.option('--batch-index', default=1000, type=click.INT,
-              help='Batch size of tasks to archive')
+              help='Batch size of tasks to read from db to archive')
+@click.option('--bulk-index', default=200, type=click.INT,
+              help='Batch size of tasks to bulk index')
 @click.option('--batch-clean', default=1000, type=click.INT,
               help='Batch size of task to clean after archival')
 @click.option('--dry-run/--no-dry-run', is_flag=True, default=False,
               help='Default to list only what would be archived.')
 @click.option('--verbose', is_flag=True, default=False,
               help='Verbose mode')
 @click.option('--cleanup/--no-cleanup', is_flag=True, default=True,
               help='Clean up archived tasks (default)')
 @click.option('--start-from', type=click.INT, default=-1,
               help='(Optional) default task id to start from. Default is -1.')
 @click.pass_context
-def archive_tasks(ctx, before, after, batch_index, batch_clean,
+def archive_tasks(ctx, before, after, batch_index, bulk_index, batch_clean,
                   dry_run, verbose, cleanup, start_from):
     """Archive task/task_run whose (task_type is 'oneshot' and task_status
        is 'completed') or (task_type is 'recurring' and task_status is
        'disabled').

     With --dry-run flag set (default), only list those.

     """
     es_client = SWHElasticSearchClient()
     logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)
     log = logging.getLogger('swh.scheduler.cli.archive')
     logging.getLogger('urllib3').setLevel(logging.WARN)
     logging.getLogger('elasticsearch').setLevel(logging.WARN)
     if dry_run:
         log.info('**DRY-RUN** (only reading db)')
     if not cleanup:
         log.info('**NO CLEANUP**')

     now = arrow.utcnow()

     # Default to archive all tasks prior to now's last month
     if not before:
         before = now.format('YYYY-MM-01')
     if not after:
         after = now.shift(months=-1).format('YYYY-MM-01')

     log.debug('index: %s; cleanup: %s; period: [%s ; %s]' % (
         not dry_run, not dry_run and cleanup, after, before))

     def group_by_index_name(data, es_client=es_client):
         ended = data['ended']
         return es_client.compute_index_name(ended.year, ended.month)

     def index_data(before, last_id, batch_index, backend=ctx.obj):
         tasks_in = backend.filter_task_to_archive(
             after, before, last_id=last_id, limit=batch_index)
         for index_name, tasks_group in itertools.groupby(
                 tasks_in, key=group_by_index_name):
             log.debug('Send for indexation to index %s' % index_name)
             if dry_run:
                 for task in tasks_group:
                     yield task
                 continue

             yield from es_client.streaming_bulk(
                 index_name, tasks_group, source=['task_id', 'task_run_id'],
-                chunk_size=batch_index, log=log)
+                chunk_size=bulk_index, log=log)

     gen = index_data(before, last_id=start_from, batch_index=batch_index)
     if cleanup:
         for task_ids in utils.grouper(gen, n=batch_clean):
             task_ids = list(task_ids)
             log.info('Clean up %s tasks: [%s, ...]' % (
                 len(task_ids), task_ids[0]))
             if dry_run:  # no clean up
                 continue
             ctx.obj.delete_archived_tasks(task_ids)
     else:
         for task_ids in utils.grouper(gen, n=batch_index):
             task_ids = list(task_ids)
             log.info('Indexed %s tasks: [%s, ...]' % (
                 len(task_ids), task_ids[0]))


 @cli.group('task-run')
 @click.pass_context
 def task_run(ctx):
     """Manipulate task runs."""
     pass


 if __name__ == '__main__':
     cli()
diff --git a/version.txt b/version.txt
index d835fc1..19ebef5 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.23-0-ge972b6a
\ No newline at end of file
+v0.0.24-0-gf4587a3
\ No newline at end of file