Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/listener.py
Show All 9 Lines | |||||
import sys | import sys | ||||
import click | import click | ||||
from arrow import utcnow | from arrow import utcnow | ||||
from kombu import Queue | from kombu import Queue | ||||
from celery.events import EventReceiver | from celery.events import EventReceiver | ||||
from swh.scheduler import get_scheduler | from .config import app as main_app | ||||
from .config import setup_log_handler, app as main_app | |||||
class ReliableEventReceiver(EventReceiver): | class ReliableEventReceiver(EventReceiver): | ||||
def __init__(self, channel, handlers=None, routing_key='#', | def __init__(self, channel, handlers=None, routing_key='#', | ||||
node_id=None, app=None, queue_prefix='celeryev', | node_id=None, app=None, queue_prefix='celeryev', | ||||
accept=None): | accept=None): | ||||
super(ReliableEventReceiver, self).__init__( | super(ReliableEventReceiver, self).__init__( | ||||
channel, handlers, routing_key, node_id, app, queue_prefix, accept) | channel, handlers, routing_key, node_id, app, queue_prefix, accept) | ||||
▲ Show 20 Lines • Show All 164 Lines • ▼ Show 20 Lines | while True: | ||||
time.sleep(errors) | time.sleep(errors) | ||||
errors += 1 | errors += 1 | ||||
else: | else: | ||||
logger.error('Too many consecutive errors, exiting') | logger.error('Too many consecutive errors, exiting') | ||||
sys.exit(1) | sys.exit(1) | ||||
@click.command() | @click.command() | ||||
@click.option('--cls', '-c', default='local', | @click.pass_context | ||||
help="Scheduler's class, default to 'local'") | def main(ctx): | ||||
@click.option( | click.echo("Deprecated! Use 'swh-scheduler listener' instead.", | ||||
'--database', '-d', help='Scheduling database DSN') | err=True) | ||||
@click.option('--url', '-u', | ctx.exit(1) | ||||
help="(Optional) Scheduler's url access") | |||||
@click.option('--log-level', '-l', default='INFO', | |||||
type=click.Choice(logging._nameToLevel.keys()), | |||||
help='Log level (default to INFO)') | |||||
def main(cls, database, url, log_level): | |||||
setup_log_handler(loglevel=log_level, colorize=False, | |||||
format='[%(levelname)s] %(name)s -- %(message)s') | |||||
# logging.basicConfig(level=level) | |||||
scheduler = None | |||||
override_config = {} | |||||
if cls == 'local': | |||||
if database: | |||||
override_config = {'scheduling_db': database} | |||||
scheduler = get_scheduler(cls, args=override_config) | |||||
elif cls == 'remote': | |||||
if url: | |||||
override_config = {'url': url} | |||||
scheduler = get_scheduler(cls, args=override_config) | |||||
if not scheduler: | |||||
raise ValueError('Scheduler class (local/remote) must be instantiated') | |||||
event_monitor(main_app, backend=scheduler) | |||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
main() | main() |