Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9340677
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
16 KB
Subscribers
None
View Options
diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py
index da921ee..e497afa 100644
--- a/swh/scheduler/celery_backend/listener.py
+++ b/swh/scheduler/celery_backend/listener.py
@@ -1,156 +1,178 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import click
import datetime
import socket
from arrow import utcnow
from kombu import Queue
from celery.events import EventReceiver
from swh.scheduler import get_scheduler
from .config import app as main_app
class ReliableEventReceiver(EventReceiver):
    """Celery event receiver backed by a durable, manually-acked queue.

    Unlike the stock ``EventReceiver``, the event queue is declared
    durable and non-auto-delete, and messages are consumed with
    ``no_ack=False``, so an event is only removed from the broker once a
    handler has explicitly acked it.  This avoids losing events if the
    listener dies between receiving and processing them.
    """

    def __init__(self, channel, handlers=None, routing_key='#',
                 node_id=None, app=None, queue_prefix='celeryev',
                 accept=None):
        super(ReliableEventReceiver, self).__init__(
            channel, handlers, routing_key, node_id, app, queue_prefix, accept)
        # Re-declare the queue set up by the parent constructor, but
        # durable and not auto-deleted, so pending events survive broker
        # restarts and consumer disconnections.
        self.queue = Queue('.'.join([self.queue_prefix, self.node_id]),
                           exchange=self.exchange,
                           routing_key=self.routing_key,
                           auto_delete=False,
                           durable=True,
                           queue_arguments=self._get_queue_arguments())

    def get_consumers(self, consumer, channel):
        # no_ack=False: the handlers are responsible for acking each
        # message once the corresponding action has been persisted.
        return [consumer(queues=[self.queue],
                         callbacks=[self._receive], no_ack=False,
                         accept=self.accept)]

    def _receive(self, body, message):
        # Decode the raw payload into (event type, event dict), then
        # dispatch; the kombu message is forwarded so handlers can ack it.
        type, body = self.event_from_message(body)
        self.process(type, body, message)

    def process(self, type, event, message):
        """Process the received event by dispatching it to the appropriate
        handler.

        Falls back to the catch-all ``'*'`` handler when no handler is
        registered for this event type; silently does nothing if neither
        exists.
        """
        handler = self.handlers.get(type) or self.handlers.get('*')
        handler and handler(event, message)
ACTION_SEND_DELAY = datetime.timedelta(seconds=1.0)
ACTION_QUEUE_MAX_LENGTH = 1000
def event_monitor(app, backend):
    """Consume celery task events and record task runs in the scheduler backend.

    Events are not written to the database one by one: they are buffered
    in ``actions['queue']`` and flushed in a single transaction, either
    when the last flush is older than ``ACTION_SEND_DELAY`` or when the
    buffer grows past ``ACTION_QUEUE_MAX_LENGTH``.  Broker messages are
    only acked after the transaction commits, so a crash before commit
    causes redelivery rather than data loss.

    Args:
        app: celery app providing the broker connection
        backend: scheduler backend exposing ``start_task_run``,
            ``end_task_run``, ``cursor`` and ``commit``

    This function blocks forever, capturing events.
    """
    # Mutable state shared by the closures below (last flush time + the
    # buffer of pending database actions).
    actions = {
        'last_send': utcnow() - 2*ACTION_SEND_DELAY,
        'queue': [],
    }

    def try_perform_actions(actions=actions):
        # Flush the buffer when it is non-empty and either stale or full.
        if not actions['queue']:
            return
        if utcnow() - actions['last_send'] > ACTION_SEND_DELAY or \
           len(actions['queue']) > ACTION_QUEUE_MAX_LENGTH:
            perform_actions(actions)

    def perform_actions(actions, backend=backend):
        # Apply every buffered action on a single cursor, commit once,
        # then ack the corresponding broker messages.
        action_map = {
            'start_task_run': backend.start_task_run,
            'end_task_run': backend.end_task_run,
        }

        messages = []
        cursor = backend.cursor()
        for action in actions['queue']:
            messages.append(action['message'])
            function = action_map[action['action']]
            args = action.get('args', ())
            kwargs = action.get('kwargs', {})
            kwargs['cursor'] = cursor
            function(*args, **kwargs)

        backend.commit()
        # Ack only after the commit: if we die before this point the
        # broker redelivers the events and they are replayed.
        for message in messages:
            message.ack()
        actions['queue'] = []
        actions['last_send'] = utcnow()

    def queue_action(action, actions=actions):
        actions['queue'].append(action)
        try_perform_actions()

    def catchall_event(event, message):
        # Event types we do not track are acked right away; also take the
        # opportunity to flush any stale buffered actions.
        message.ack()
        try_perform_actions()

    def task_started(event, message):
        queue_action({
            'action': 'start_task_run',
            'args': [event['uuid']],
            'kwargs': {
                'timestamp': utcnow(),
                'metadata': {
                    'worker': event['hostname'],
                },
            },
        'message': message,
        })

    def task_succeeded(event, message):
        result = event['result']

        # presumably result is a dict with 'status'/'eventful' keys for
        # swh tasks; fall back to truthiness for arbitrary results —
        # TODO confirm against task return conventions.
        try:
            status = result.get('status')
            if status == 'success':
                status = 'eventful' if result.get('eventful') else 'uneventful'
        except Exception:
            status = 'eventful' if result else 'uneventful'

        queue_action({
            'action': 'end_task_run',
            'args': [event['uuid']],
            'kwargs': {
                'timestamp': utcnow(),
                'status': status,
                'result': result,
            },
            'message': message,
        })

    def task_failed(event, message):
        queue_action({
            'action': 'end_task_run',
            'args': [event['uuid']],
            'kwargs': {
                'timestamp': utcnow(),
                'status': 'failed',
            },
            'message': message,
        })

    # NOTE(review): 'task-result' (not celery's standard 'task-succeeded')
    # is mapped to task_succeeded — looks like a custom event type; verify
    # against the worker-side event emitter.
    recv = ReliableEventReceiver(
        main_app.connection(),
        app=main_app,
        handlers={
            'task-started': task_started,
            'task-result': task_succeeded,
            'task-failed': task_failed,
            '*': catchall_event,
        },
        node_id='listener-%s' % socket.gethostname(),
    )

    # Blocks forever, dispatching events to the handlers above.
    recv.capture(limit=None, timeout=None, wakeup=True)
+@click.command()
+@click.option('--cls', '-c', default='local',
+ help="Scheduler's class, default to 'local'")
+@click.option(
+ '--database', '-d', help='Scheduling database DSN',
+ default='host=db.internal.softwareheritage.org '
+ 'dbname=softwareheritage-scheduler user=guest')
+@click.option('--url', '-u', default='http://localhost:5008',
+ help="(Optional) Scheduler's url access")
+def main(cls, database, url):
+ scheduler = None
+ if cls == 'local':
+ scheduler = get_scheduler(cls, args={'scheduling_db': database})
+ elif cls == 'remote':
+ scheduler = get_scheduler(cls, args={'url': url})
+
+ if not scheduler:
+ raise ValueError('Scheduler class (local/remote) must be instantiated')
+
+ event_monitor(main_app, backend=scheduler)
+
+
if __name__ == '__main__':
- main_backend = get_scheduler('local')
- event_monitor(main_app, main_backend)
+ main()
diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py
index 029a3ff..ba3cc37 100644
--- a/swh/scheduler/cli.py
+++ b/swh/scheduler/cli.py
@@ -1,278 +1,287 @@
# Copyright (C) 2016-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import arrow
import click
import csv
import itertools
import json
import locale
import logging
from swh.core import utils
from .backend_es import SWHElasticSearchClient
locale.setlocale(locale.LC_ALL, '')
ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0]
class DateTimeType(click.ParamType):
    """Click parameter type coercing its value into an ``arrow.Arrow``."""

    name = 'time and date'

    def convert(self, value, param, ctx):
        # Already an Arrow instance: pass through untouched; otherwise
        # let arrow parse the string/datetime representation.
        if isinstance(value, arrow.Arrow):
            return value
        return arrow.get(value)
DATETIME = DateTimeType()
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
def pretty_print_list(list, indent):
    """Render each item of *list* on its own line, left-padded by
    *indent* spaces, and return the result as a single string."""
    pad = ' ' * indent
    rendered = ['%s%s\n' % (pad, item) for item in list]
    return ''.join(rendered)
def pretty_print_dict(dict, indent):
    """Pretty-print a dict.

    Each key/value pair is rendered on its own line, left-padded by
    *indent* spaces, with the key shown in bold via click styling.
    """
    # NOTE: the original docstring said "Pretty-print a list" — copy-paste
    # from pretty_print_list; fixed here.  The parameter name shadows the
    # ``dict`` builtin but is kept for interface compatibility.
    return ''.join('%s%s: %s\n' %
                   (' ' * indent, click.style(key, bold=True), value)
                   for key, value in dict.items())
def pretty_print_task(task):
    """Pretty-print a task.

    Renders the task id, its next run date (humanized and exact), the
    current scheduling interval, the task type, and its positional and
    keyword arguments, with bold section labels.

    Args:
        task: dict with keys 'id', 'next_run', 'current_interval',
            'type' and 'arguments' (itself holding 'args' and 'kwargs')

    Returns:
        the formatted multi-line string
    """
    next_run = arrow.get(task['next_run'])
    lines = [
        '%s %s\n' % (click.style('Task', bold=True), task['id']),
        click.style('  Next run: ', bold=True),
        # e.g. "in 2 hours (2018-06-01T12:00:00+00:00)"
        "%s (%s)" % (next_run.humanize(locale=ARROW_LOCALE),
                     next_run.format()),
        '\n',
        click.style('  Interval: ', bold=True),
        str(task['current_interval']), '\n',
        click.style('  Type: ', bold=True), task['type'], '\n',
        click.style('  Args:\n', bold=True),
        pretty_print_list(task['arguments']['args'], indent=4),
        click.style('  Keyword args:\n', bold=True),
        pretty_print_dict(task['arguments']['kwargs'], indent=4),
    ]

    return ''.join(lines)
@click.group(context_settings=CONTEXT_SETTINGS)
+@click.option('--cls', '-c', default='local',
+ help="Scheduler's class, default to 'local'")
@click.option(
'--database', '-d', help='Scheduling database DSN',
default='host=db.internal.softwareheritage.org '
'dbname=softwareheritage-scheduler user=guest')
-@click.option('--scheduler-class', '-c', default='local',
- help="Scheduler's class, default to 'local'")
+@click.option('--url', '-u', default='http://localhost:5008',
+ help="(Optional) Scheduler's url access")
@click.pass_context
-def cli(ctx, database, scheduler_class):
- """Software Heritage Scheduler CLI interface"""
- override_config = {}
- if database:
- override_config['scheduling_db'] = database
+def cli(ctx, cls, database, url):
+ """Software Heritage Scheduler CLI interface
+ """
+ scheduler = None
from . import get_scheduler
- ctx.obj = get_scheduler(scheduler_class, override_config)
+ if cls == 'local':
+ scheduler = get_scheduler(cls, args={'scheduling_db': database})
+ elif cls == 'remote':
+ scheduler = get_scheduler(cls, args={'url': url})
+
+ if not scheduler:
+ raise ValueError('Scheduler class (local/remote) must be instantiated')
+
+ ctx.obj = scheduler
@cli.group('task')
@click.pass_context
def task(ctx):
    """Manipulate tasks."""
    # Pure command group: subcommands (schedule, list-pending, archive)
    # read the scheduler backend from ctx.obj set up by the cli group.
    pass
@task.command('schedule')
@click.option('--columns', '-c', multiple=True,
              default=['type', 'args', 'kwargs', 'next_run'],
              type=click.Choice([
                  'type', 'args', 'kwargs', 'policy', 'next_run']),
              help='columns present in the CSV file')
@click.option('--delimiter', '-d', default=',')
@click.argument('file', type=click.File(encoding='utf-8'))
@click.pass_context
def schedule_tasks(ctx, columns, delimiter, file):
    """Schedule tasks from a CSV input file.

    The following columns are expected, and can be set through the -c option:

     - type: the type of the task to be scheduled (mandatory)

     - args: the arguments passed to the task (JSON list, defaults to an empty
       list)

     - kwargs: the keyword arguments passed to the task (JSON object, defaults
       to an empty dict)

     - next_run: the date at which the task should run (datetime, defaults to
       now)

    The CSV can be read either from a named file, or from stdin (use - as
    filename).

    Use sample:

    cat scheduling-task.txt | \
        python3 -m swh.scheduler.cli \
            --database 'service=swh-scheduler-dev' \
            task schedule \
                --columns type --columns kwargs --columns policy \
                --delimiter ';' -
    """
    tasks = []
    now = arrow.utcnow()

    reader = csv.reader(file, delimiter=delimiter)
    for line in reader:
        # CSV fields map positionally onto the selected columns.
        task = dict(zip(columns, line))
        # args/kwargs are JSON-encoded within their CSV cell; both are
        # optional and default to empty.
        args = json.loads(task.pop('args', '[]'))
        kwargs = json.loads(task.pop('kwargs', '{}'))
        task['arguments'] = {
            'args': args,
            'kwargs': kwargs,
        }
        # A missing next_run column means "schedule immediately".
        task['next_run'] = DATETIME.convert(task.get('next_run', now),
                                            None, None)
        tasks.append(task)

    created = ctx.obj.create_tasks(tasks)

    output = [
        'Created %d tasks\n' % len(created),
    ]
    for task in created:
        output.append(pretty_print_task(task))

    click.echo_via_pager('\n'.join(output))
@task.command('list-pending')
@click.option('--task-type', '-t', required=True,
              help='The tasks\' type concerned by the listing')
@click.option('--limit', '-l', required=False, type=click.INT,
              help='The maximum number of tasks to fetch')
@click.option('--before', '-b', required=False, type=DATETIME,
              help='List all jobs supposed to run before the given date')
@click.pass_context
def list_pending_tasks(ctx, task_type, limit, before):
    """List the tasks that are going to be run.

    You can override the number of tasks to fetch
    """
    pending = ctx.obj.peek_ready_tasks(task_type,
                                       timestamp=before, num_tasks=limit)

    # One header line, then one pretty-printed entry per pending task.
    header = 'Found %d tasks\n' % len(pending)
    entries = [pretty_print_task(entry) for entry in pending]
    click.echo_via_pager('\n'.join([header] + entries))
@task.command('archive')
@click.option('--before', '-b', default=None,
              help='''Task whose ended date is anterior will be archived.
                      Default to current month's first day.''')
@click.option('--after', '-a', default=None,
              help='''Task whose ended date is after the specified date will
                      be archived. Default to prior month's first day.''')
@click.option('--batch-index', default=1000, type=click.INT,
              help='Batch size of tasks to read from db to archive')
@click.option('--bulk-index', default=200, type=click.INT,
              help='Batch size of tasks to bulk index')
@click.option('--batch-clean', default=1000, type=click.INT,
              help='Batch size of task to clean after archival')
@click.option('--dry-run/--no-dry-run', is_flag=True, default=False,
              help='Default to list only what would be archived.')
@click.option('--verbose', is_flag=True, default=False,
              help='Enable verbose (debug-level) logging.')
@click.option('--cleanup/--no-cleanup', is_flag=True, default=True,
              help='Clean up archived tasks (default)')
@click.option('--start-from', type=click.INT, default=-1,
              help='(Optional) default task id to start from. Default is -1.')
@click.pass_context
def archive_tasks(ctx, before, after, batch_index, bulk_index, batch_clean,
                  dry_run, verbose, cleanup, start_from):
    """Archive task/task_run whose (task_type is 'oneshot' and task_status
    is 'completed') or (task_type is 'recurring' and task_status is
    'disabled').

    With the --dry-run flag set, only list what would be archived without
    writing to elasticsearch or cleaning up the database.
    """
    # NOTE: the --verbose help text was a copy-paste of --dry-run's, and
    # the docstring wrongly claimed --dry-run was the default
    # (default=False above); both fixed here.
    es_client = SWHElasticSearchClient()
    logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)
    log = logging.getLogger('swh.scheduler.cli.archive')
    # Silence chatty transport libraries regardless of --verbose.
    logging.getLogger('urllib3').setLevel(logging.WARN)
    logging.getLogger('elasticsearch').setLevel(logging.WARN)
    if dry_run:
        log.info('**DRY-RUN** (only reading db)')
    if not cleanup:
        log.info('**NO CLEANUP**')

    now = arrow.utcnow()

    # Default to archive all tasks prior to now's last month
    if not before:
        before = now.format('YYYY-MM-01')
    if not after:
        after = now.shift(months=-1).format('YYYY-MM-01')

    log.debug('index: %s; cleanup: %s; period: [%s ; %s]' % (
        not dry_run, not dry_run and cleanup, after, before))

    def group_by_index_name(data, es_client=es_client):
        # Route each task to the monthly ES index matching its end date.
        ended = data['ended']
        return es_client.compute_index_name(ended.year, ended.month)

    def index_data(before, last_id, batch_index, backend=ctx.obj):
        # Stream archivable tasks from the db, grouped by target index;
        # in dry-run mode yield them without touching elasticsearch.
        tasks_in = backend.filter_task_to_archive(
            after, before, last_id=last_id, limit=batch_index)
        for index_name, tasks_group in itertools.groupby(
                tasks_in, key=group_by_index_name):
            log.debug('Send for indexation to index %s' % index_name)
            if dry_run:
                for task in tasks_group:
                    yield task
                continue

            yield from es_client.streaming_bulk(
                index_name, tasks_group, source=['task_id', 'task_run_id'],
                chunk_size=bulk_index, log=log)

    gen = index_data(before, last_id=start_from, batch_index=batch_index)
    if cleanup:
        for task_ids in utils.grouper(gen, n=batch_clean):
            task_ids = list(task_ids)
            log.info('Clean up %s tasks: [%s, ...]' % (
                len(task_ids), task_ids[0]))
            if dry_run:  # no clean up
                continue
            # Delete task/task_run rows whose data has been indexed.
            ctx.obj.delete_archived_tasks(task_ids)
    else:
        for task_ids in utils.grouper(gen, n=batch_index):
            task_ids = list(task_ids)
            log.info('Indexed %s tasks: [%s, ...]' % (
                len(task_ids), task_ids[0]))
@cli.group('task-run')
@click.pass_context
def task_run(ctx):
    """Manipulate task runs."""
    # Placeholder command group: no subcommands defined in this file yet;
    # ctx.obj carries the scheduler backend from the cli group.
    pass
if __name__ == '__main__':
cli()
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Fri, Jul 4, 10:59 AM (3 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3252142
Attached To
rDSCH Scheduling utilities
Event Timeline
Log In to Comment