Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9124979
admin.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Subscribers
None
admin.py
View Options
# Copyright (C) 2016-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from
__future__
import
annotations
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import
logging
import
time
from
typing
import
List
,
Tuple
import
click
from
.
import
cli
@cli.command
(
"start-runner"
)
@click.option
(
"--period"
,
"-p"
,
default
=
0
,
help
=
(
"Period (in s) at witch pending tasks are checked and "
"executed. Set to 0 (default) for a one shot."
),
)
@click.option
(
"--task-type"
,
"task_type_names"
,
multiple
=
True
,
default
=
[],
help
=
(
"Task types to schedule. If not provided, this iterates over every "
"task types referenced in the scheduler backend."
),
)
@click.option
(
"--with-priority/--without-priority"
,
is_flag
=
True
,
default
=
False
,
help
=
(
"Determine if those tasks should be the ones with priority or not."
"By default, this deals with tasks without any priority."
),
)
@click.pass_context
def
runner
(
ctx
,
period
,
task_type_names
,
with_priority
):
"""Starts a swh-scheduler runner service.
This process is responsible for checking for ready-to-run tasks and
schedule them."""
from
swh.scheduler.celery_backend.config
import
build_app
from
swh.scheduler.celery_backend.runner
import
run_ready_tasks
config
=
ctx
.
obj
[
"config"
]
app
=
build_app
(
config
.
get
(
"celery"
))
app
.
set_current
()
logger
=
logging
.
getLogger
(
__name__
+
".runner"
)
scheduler
=
ctx
.
obj
[
"scheduler"
]
logger
.
debug
(
"Scheduler
%s
"
,
scheduler
)
task_types
=
[]
for
task_type_name
in
task_type_names
:
task_type
=
scheduler
.
get_task_type
(
task_type_name
)
if
not
task_type
:
raise
ValueError
(
f
"Unknown {task_type_name}"
)
task_types
.
append
(
task_type
)
try
:
while
True
:
logger
.
debug
(
"Run ready tasks"
)
try
:
ntasks
=
len
(
run_ready_tasks
(
scheduler
,
app
,
task_types
,
with_priority
))
if
ntasks
:
logger
.
info
(
"Scheduled
%s
tasks"
,
ntasks
)
except
Exception
:
logger
.
exception
(
"Unexpected error in run_ready_tasks()"
)
if
not
period
:
break
time
.
sleep
(
period
)
except
KeyboardInterrupt
:
ctx
.
exit
(
0
)
@cli.command
(
"start-listener"
)
@click.pass_context
def
listener
(
ctx
):
"""Starts a swh-scheduler listener service.
This service is responsible for listening at task lifecycle events and
handle their workflow status in the database."""
scheduler_backend
=
ctx
.
obj
[
"scheduler"
]
if
not
scheduler_backend
:
raise
ValueError
(
"Scheduler class (local/remote) must be instantiated"
)
broker
=
(
ctx
.
obj
[
"config"
]
.
get
(
"celery"
,
{})
.
get
(
"task_broker"
,
"amqp://guest@localhost/
%2f
"
)
)
from
swh.scheduler.celery_backend.pika_listener
import
get_listener
listener
=
get_listener
(
broker
,
"celeryev.listener"
,
scheduler_backend
)
try
:
listener
.
start_consuming
()
finally
:
listener
.
stop_consuming
()
@cli.command
(
"schedule-recurrent"
)
@click.option
(
"--visit-type"
,
"visit_types"
,
multiple
=
True
,
default
=
[],
help
=
(
"Visit types to schedule. If not provided, this iterates over every "
"corresponding load task types referenced in the scheduler backend."
),
)
@click.pass_context
def
schedule_recurrent
(
ctx
,
visit_types
:
List
[
str
]):
"""Starts the scheduler for recurrent visits.
This runs one thread for each visit type, which regularly sends new visits
to celery.
"""
from
queue
import
Queue
from
swh.scheduler.celery_backend.recurrent_visits
import
(
VisitSchedulerThreads
,
logger
,
spawn_visit_scheduler_thread
,
terminate_visit_scheduler_threads
,
)
config
=
ctx
.
obj
[
"config"
]
scheduler
=
ctx
.
obj
[
"scheduler"
]
if
not
visit_types
:
visit_types
=
[]
# Figure out which visit types exist in the scheduler
all_task_types
=
scheduler
.
get_task_types
()
for
task_type
in
all_task_types
:
if
not
task_type
[
"type"
]
.
startswith
(
"load-"
):
# only consider loading tasks as recurring ones, the rest is dismissed
continue
# get visit type name from task type
visit_types
.
append
(
task_type
[
"type"
][
5
:])
else
:
# Check that the passed visit types exist in the scheduler
for
visit_type
in
visit_types
:
task_type_name
=
f
"load-{visit_type}"
task_type
=
scheduler
.
get_task_type
(
task_type_name
)
if
not
task_type
:
raise
ValueError
(
f
"Unknown task type: {task_type_name}"
)
exc_queue
:
Queue
[
Tuple
[
str
,
BaseException
]]
=
Queue
()
threads
:
VisitSchedulerThreads
=
{}
try
:
# Spawn initial threads
for
visit_type
in
visit_types
:
spawn_visit_scheduler_thread
(
threads
,
exc_queue
,
config
,
visit_type
)
# Handle exceptions from child threads
while
True
:
visit_type
,
exc_info
=
exc_queue
.
get
(
block
=
True
)
logger
.
exception
(
"Thread
%s
died with exception; respawning"
,
visit_type
,
exc_info
=
exc_info
,
)
dead_thread
=
threads
[
visit_type
][
0
]
dead_thread
.
join
(
timeout
=
1
)
if
dead_thread
.
is_alive
():
logger
.
warn
(
"The thread for
%s
is still alive after sending an exception?! "
"Respawning anyway."
,
visit_type
,
)
spawn_visit_scheduler_thread
(
threads
,
exc_queue
,
config
,
visit_type
)
except
SystemExit
:
remaining_threads
=
terminate_visit_scheduler_threads
(
threads
)
if
remaining_threads
:
ctx
.
exit
(
1
)
ctx
.
exit
(
0
)
@cli.command
(
"rpc-serve"
)
@click.option
(
"--host"
,
default
=
"0.0.0.0"
,
help
=
"Host to run the scheduler server api"
)
@click.option
(
"--port"
,
default
=
5008
,
type
=
click
.
INT
,
help
=
"Binding port of the server"
)
@click.option
(
"--debug/--nodebug"
,
default
=
None
,
help
=
(
"Indicates if the server should run in debug mode. "
"Defaults to True if log-level is DEBUG, False otherwise."
),
)
@click.pass_context
def
rpc_server
(
ctx
,
host
,
port
,
debug
):
"""Starts a swh-scheduler API HTTP server.
"""
if
ctx
.
obj
[
"config"
][
"scheduler"
][
"cls"
]
==
"remote"
:
click
.
echo
(
"The API server can only be started with a 'local' "
"configuration"
,
err
=
True
,
)
ctx
.
exit
(
1
)
from
swh.scheduler.api
import
server
server
.
app
.
config
.
update
(
ctx
.
obj
[
"config"
])
if
debug
is
None
:
debug
=
ctx
.
obj
[
"log_level"
]
<=
logging
.
DEBUG
server
.
app
.
run
(
host
,
port
=
port
,
debug
=
bool
(
debug
))
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Sat, Jun 21, 7:51 PM (3 w, 27 m ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3294201
Attached To
rDSCH Scheduling utilities
Event Timeline
Log In to Comment