Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py
index da921ee..e497afa 100644
--- a/swh/scheduler/celery_backend/listener.py
+++ b/swh/scheduler/celery_backend/listener.py
@@ -1,156 +1,178 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import click
import datetime
import socket
from arrow import utcnow
from kombu import Queue
from celery.events import EventReceiver
from swh.scheduler import get_scheduler
from .config import app as main_app
class ReliableEventReceiver(EventReceiver):
    """Event receiver whose queue is declared durable and whose messages
    require explicit acknowledgement.

    Compared to the stock celery EventReceiver, the event queue is created
    with durable=True / auto_delete=False and consumed with no_ack=False,
    so events that have not been acked by a handler are redelivered.
    """

    def __init__(self, channel, handlers=None, routing_key='#',
                 node_id=None, app=None, queue_prefix='celeryev',
                 accept=None):
        super(ReliableEventReceiver, self).__init__(
            channel, handlers, routing_key, node_id, app, queue_prefix, accept)
        # Re-declare the queue set up by the parent class, this time as a
        # durable, non-auto-deleted queue.
        queue_name = '.'.join([self.queue_prefix, self.node_id])
        self.queue = Queue(queue_name,
                           exchange=self.exchange,
                           routing_key=self.routing_key,
                           auto_delete=False,
                           durable=True,
                           queue_arguments=self._get_queue_arguments())

    def get_consumers(self, consumer, channel):
        # no_ack=False: handlers must ack each message explicitly.
        return [consumer(queues=[self.queue],
                         callbacks=[self._receive], no_ack=False,
                         accept=self.accept)]

    def _receive(self, body, message):
        event_type, event = self.event_from_message(body)
        self.process(event_type, event, message)

    def process(self, type, event, message):
        """Dispatch the received event to its handler (or the '*' catch-all,
        if any); events with no matching handler are silently dropped."""
        handler = self.handlers.get(type) or self.handlers.get('*')
        if handler:
            handler(event, message)
# Flush buffered actions to the backend at most this long after the
# previous flush...
ACTION_SEND_DELAY = datetime.timedelta(seconds=1.0)
# ...or as soon as this many actions have accumulated in the buffer.
ACTION_QUEUE_MAX_LENGTH = 1000
def event_monitor(app, backend):
    """Monitor celery task events and record task runs in the scheduler
    backend.

    Events are translated into start_task_run/end_task_run actions, which
    are buffered and flushed to the backend in a single transaction; the
    corresponding celery messages are acked only after the commit, so an
    event cannot be lost between reception and persistence.

    Args:
        app: celery application whose broker connection is listened on
        backend: scheduler backend providing start_task_run, end_task_run,
            cursor() and commit()
    """
    # Mutable state shared by the nested closures below.
    actions = {
        'last_send': utcnow() - 2*ACTION_SEND_DELAY,
        'queue': [],
    }

    def try_perform_actions(actions=actions):
        """Flush the buffer if it is stale (ACTION_SEND_DELAY) or too
        large (ACTION_QUEUE_MAX_LENGTH)."""
        if not actions['queue']:
            return
        if utcnow() - actions['last_send'] > ACTION_SEND_DELAY or \
                len(actions['queue']) > ACTION_QUEUE_MAX_LENGTH:
            perform_actions(actions)

    def perform_actions(actions, backend=backend):
        """Apply all buffered actions on one cursor, commit once, then ack
        the corresponding messages in batch."""
        action_map = {
            'start_task_run': backend.start_task_run,
            'end_task_run': backend.end_task_run,
        }
        messages = []
        cursor = backend.cursor()
        for action in actions['queue']:
            messages.append(action['message'])
            function = action_map[action['action']]
            args = action.get('args', ())
            kwargs = action.get('kwargs', {})
            kwargs['cursor'] = cursor
            function(*args, **kwargs)
        backend.commit()
        # Ack only after the commit: on a crash the events are redelivered
        # instead of lost.
        for message in messages:
            message.ack()
        actions['queue'] = []
        actions['last_send'] = utcnow()

    def queue_action(action, actions=actions):
        actions['queue'].append(action)
        try_perform_actions()

    def catchall_event(event, message):
        # Event we do not track: ack immediately, and take the opportunity
        # to flush any pending actions.
        message.ack()
        try_perform_actions()

    def task_started(event, message):
        queue_action({
            'action': 'start_task_run',
            'args': [event['uuid']],
            'kwargs': {
                'timestamp': utcnow(),
                'metadata': {
                    'worker': event['hostname'],
                },
            },
            'message': message,
        })

    def task_succeeded(event, message):
        result = event['result']
        try:
            status = result.get('status')
            if status == 'success':
                status = 'eventful' if result.get('eventful') else 'uneventful'
        except Exception:
            # result is not a dict-like object: fall back to truthiness.
            status = 'eventful' if result else 'uneventful'
        queue_action({
            'action': 'end_task_run',
            'args': [event['uuid']],
            'kwargs': {
                'timestamp': utcnow(),
                'status': status,
                'result': result,
            },
            'message': message,
        })

    def task_failed(event, message):
        queue_action({
            'action': 'end_task_run',
            'args': [event['uuid']],
            'kwargs': {
                'timestamp': utcnow(),
                'status': 'failed',
            },
            'message': message,
        })

    # Bug fix: use the `app` argument rather than the module-level main_app,
    # which the original referenced while silently ignoring `app`. Existing
    # callers pass main_app, so behavior is unchanged for them.
    recv = ReliableEventReceiver(
        app.connection(),
        app=app,
        handlers={
            'task-started': task_started,
            'task-result': task_succeeded,
            'task-failed': task_failed,
            '*': catchall_event,
        },
        node_id='listener-%s' % socket.gethostname(),
    )
    recv.capture(limit=None, timeout=None, wakeup=True)
@click.command()
@click.option('--cls', '-c', default='local',
              help="Scheduler's class, default to 'local'")
@click.option(
    '--database', '-d', help='Scheduling database DSN',
    default='host=db.internal.softwareheritage.org '
            'dbname=softwareheritage-scheduler user=guest')
@click.option('--url', '-u', default='http://localhost:5008',
              help="(Optional) Scheduler's url access")
def main(cls, database, url):
    """Instantiate the requested scheduler backend ('local' or 'remote')
    and run the event monitor against it."""
    if cls == 'local':
        scheduler = get_scheduler(cls, args={'scheduling_db': database})
    elif cls == 'remote':
        scheduler = get_scheduler(cls, args={'url': url})
    else:
        scheduler = None

    if not scheduler:
        raise ValueError('Scheduler class (local/remote) must be instantiated')

    event_monitor(main_app, backend=scheduler)


if __name__ == '__main__':
    main()
diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py
index 029a3ff..ba3cc37 100644
--- a/swh/scheduler/cli.py
+++ b/swh/scheduler/cli.py
@@ -1,278 +1,287 @@
# Copyright (C) 2016-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import arrow
import click
import csv
import itertools
import json
import locale
import logging
from swh.core import utils
from .backend_es import SWHElasticSearchClient
locale.setlocale(locale.LC_ALL, '')
ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0]
class DateTimeType(click.ParamType):
    """Click parameter type accepting any value arrow.get() can parse;
    already-parsed arrow.Arrow values pass through unchanged."""

    name = 'time and date'

    def convert(self, value, param, ctx):
        if isinstance(value, arrow.Arrow):
            return value
        return arrow.get(value)
# Reusable instance for click option/argument type= declarations.
DATETIME = DateTimeType()
# Accept -h in addition to --help on every command.
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
def pretty_print_list(list, indent):
    """Pretty-print a list: one item per line, prefixed by *indent* spaces."""
    pad = ' ' * indent
    lines = ['%s%s\n' % (pad, elem) for elem in list]
    return ''.join(lines)
def pretty_print_dict(dict, indent):
    """Pretty-print a dict: one 'key: value' per line, prefixed by *indent*
    spaces, with the key rendered in bold."""
    pad = ' ' * indent
    lines = ['%s%s: %s\n' % (pad, click.style(k, bold=True), v)
             for k, v in dict.items()]
    return ''.join(lines)
def pretty_print_task(task):
    """Pretty-print a task: id, next run date, interval, type and
    (keyword) arguments."""
    next_run = arrow.get(task['next_run'])
    out = ['%s %s\n' % (click.style('Task', bold=True), task['id'])]
    out.append(click.style(' Next run: ', bold=True))
    out.append("%s (%s)" % (next_run.humanize(locale=ARROW_LOCALE),
                            next_run.format()))
    out.append('\n')
    out.append(click.style(' Interval: ', bold=True))
    out.append(str(task['current_interval']))
    out.append('\n')
    out.append(click.style(' Type: ', bold=True))
    out.append(task['type'])
    out.append('\n')
    out.append(click.style(' Args:\n', bold=True))
    out.append(pretty_print_list(task['arguments']['args'], indent=4))
    out.append(click.style(' Keyword args:\n', bold=True))
    out.append(pretty_print_dict(task['arguments']['kwargs'], indent=4))
    return ''.join(out)
@click.group(context_settings=CONTEXT_SETTINGS)
@click.option('--cls', '-c', default='local',
              help="Scheduler's class, default to 'local'")
@click.option(
    '--database', '-d', help='Scheduling database DSN',
    default='host=db.internal.softwareheritage.org '
            'dbname=softwareheritage-scheduler user=guest')
@click.option('--url', '-u', default='http://localhost:5008',
              help="(Optional) Scheduler's url access")
@click.pass_context
def cli(ctx, cls, database, url):
    """Software Heritage Scheduler CLI interface
    """
    # Instantiate the backend requested by --cls and stash it on the click
    # context for subcommands.
    from . import get_scheduler
    if cls == 'local':
        scheduler = get_scheduler(cls, args={'scheduling_db': database})
    elif cls == 'remote':
        scheduler = get_scheduler(cls, args={'url': url})
    else:
        scheduler = None

    if not scheduler:
        raise ValueError('Scheduler class (local/remote) must be instantiated')

    ctx.obj = scheduler
@cli.group('task')
@click.pass_context
def task(ctx):
    """Manipulate tasks."""
@task.command('schedule')
@click.option('--columns', '-c', multiple=True,
              default=['type', 'args', 'kwargs', 'next_run'],
              type=click.Choice([
                  'type', 'args', 'kwargs', 'policy', 'next_run']),
              help='columns present in the CSV file')
@click.option('--delimiter', '-d', default=',')
@click.argument('file', type=click.File(encoding='utf-8'))
@click.pass_context
def schedule_tasks(ctx, columns, delimiter, file):
    """Schedule tasks from a CSV input file.

    The following columns are expected, and can be set through the -c option:
    - type: the type of the task to be scheduled (mandatory)
    - args: the arguments passed to the task (JSON list, defaults to an empty
      list)
    - kwargs: the keyword arguments passed to the task (JSON object, defaults
      to an empty dict)
    - next_run: the date at which the task should run (datetime, defaults to
      now)

    The CSV can be read either from a named file, or from stdin (use - as
    filename).

    Use sample:

    cat scheduling-task.txt | \
        python3 -m swh.scheduler.cli \
            --database 'service=swh-scheduler-dev' \
            task schedule \
                --columns type --columns kwargs --columns policy \
                --delimiter ';' -
    """
    now = arrow.utcnow()
    tasks = []
    for row in csv.reader(file, delimiter=delimiter):
        task = dict(zip(columns, row))
        # args/kwargs arrive as JSON text; absent columns default to empty.
        task['arguments'] = {
            'args': json.loads(task.pop('args', '[]')),
            'kwargs': json.loads(task.pop('kwargs', '{}')),
        }
        task['next_run'] = DATETIME.convert(task.get('next_run', now),
                                            None, None)
        tasks.append(task)

    created = ctx.obj.create_tasks(tasks)

    output = ['Created %d tasks\n' % len(created)]
    output.extend(pretty_print_task(t) for t in created)
    click.echo_via_pager('\n'.join(output))
@task.command('list-pending')
@click.option('--task-type', '-t', required=True,
              help='The tasks\' type concerned by the listing')
@click.option('--limit', '-l', required=False, type=click.INT,
              help='The maximum number of tasks to fetch')
@click.option('--before', '-b', required=False, type=DATETIME,
              help='List all jobs supposed to run before the given date')
@click.pass_context
def list_pending_tasks(ctx, task_type, limit, before):
    """List the tasks that are going to be run.

    You can override the number of tasks to fetch
    """
    pending = ctx.obj.peek_ready_tasks(task_type,
                                       timestamp=before, num_tasks=limit)
    output = ['Found %d tasks\n' % len(pending)]
    output.extend(pretty_print_task(t) for t in pending)
    click.echo_via_pager('\n'.join(output))
@task.command('archive')
@click.option('--before', '-b', default=None,
              help='''Task whose ended date is anterior will be archived.
              Default to current month's first day.''')
@click.option('--after', '-a', default=None,
              help='''Task whose ended date is after the specified date will
              be archived. Default to prior month's first day.''')
@click.option('--batch-index', default=1000, type=click.INT,
              help='Batch size of tasks to read from db to archive')
@click.option('--bulk-index', default=200, type=click.INT,
              help='Batch size of tasks to bulk index')
@click.option('--batch-clean', default=1000, type=click.INT,
              help='Batch size of task to clean after archival')
@click.option('--dry-run/--no-dry-run', is_flag=True, default=False,
              help='Default to list only what would be archived.')
@click.option('--verbose', is_flag=True, default=False,
              help='Verbose mode (debug-level logging)')
@click.option('--cleanup/--no-cleanup', is_flag=True, default=True,
              help='Clean up archived tasks (default)')
@click.option('--start-from', type=click.INT, default=-1,
              help='(Optional) default task id to start from. Default is -1.')
@click.pass_context
def archive_tasks(ctx, before, after, batch_index, bulk_index, batch_clean,
                  dry_run, verbose, cleanup, start_from):
    """Archive task/task_run whose (task_type is 'oneshot' and task_status
    is 'completed') or (task_type is 'recurring' and task_status is
    'disabled').

    With --dry-run flag set, only list those.

    """
    # Fixed: --verbose's help text was a copy-paste of --dry-run's, and the
    # docstring wrongly claimed --dry-run was the default (default=False).
    es_client = SWHElasticSearchClient()
    logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)
    log = logging.getLogger('swh.scheduler.cli.archive')
    # Third-party clients are too chatty below WARN.
    logging.getLogger('urllib3').setLevel(logging.WARN)
    logging.getLogger('elasticsearch').setLevel(logging.WARN)
    if dry_run:
        log.info('**DRY-RUN** (only reading db)')
    if not cleanup:
        log.info('**NO CLEANUP**')

    now = arrow.utcnow()

    # Default period: the previous calendar month,
    # [first day of last month ; first day of this month].
    if not before:
        before = now.format('YYYY-MM-01')
    if not after:
        after = now.shift(months=-1).format('YYYY-MM-01')
    log.debug('index: %s; cleanup: %s; period: [%s ; %s]' % (
        not dry_run, not dry_run and cleanup, after, before))

    def group_by_index_name(data, es_client=es_client):
        """Compute the ES index name a task run belongs to, from its ended
        date (year/month)."""
        ended = data['ended']
        return es_client.compute_index_name(ended.year, ended.month)

    def index_data(before, last_id, batch_index, backend=ctx.obj):
        """Read tasks to archive from the db and index them in ES,
        yielding the indexed entries (or the raw tasks when dry-running)."""
        tasks_in = backend.filter_task_to_archive(
            after, before, last_id=last_id, limit=batch_index)
        # NOTE(review): groupby only merges *consecutive* equal keys —
        # assumes filter_task_to_archive yields tasks ordered by ended date.
        for index_name, tasks_group in itertools.groupby(
                tasks_in, key=group_by_index_name):
            log.debug('Send for indexation to index %s' % index_name)
            if dry_run:
                for task in tasks_group:
                    yield task
                continue
            yield from es_client.streaming_bulk(
                index_name, tasks_group, source=['task_id', 'task_run_id'],
                chunk_size=bulk_index, log=log)

    gen = index_data(before, last_id=start_from, batch_index=batch_index)
    if cleanup:
        for task_ids in utils.grouper(gen, n=batch_clean):
            task_ids = list(task_ids)
            log.info('Clean up %s tasks: [%s, ...]' % (
                len(task_ids), task_ids[0]))
            if dry_run:  # no clean up
                continue
            ctx.obj.delete_archived_tasks(task_ids)
    else:
        for task_ids in utils.grouper(gen, n=batch_index):
            task_ids = list(task_ids)
            log.info('Indexed %s tasks: [%s, ...]' % (
                len(task_ids), task_ids[0]))
@cli.group('task-run')
@click.pass_context
def task_run(ctx):
    """Manipulate task runs."""
# Allow running the CLI directly, e.g. `python3 -m swh.scheduler.cli`.
if __name__ == '__main__':
    cli()

File Metadata

Mime Type
text/x-diff
Expires
Fri, Jul 4, 10:59 AM (3 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3252142

Event Timeline