Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7437529
listener.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Subscribers
None
listener.py
View Options
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import logging
import socket

import click
from arrow import utcnow
from celery.events import EventReceiver
from kombu import Queue

from swh.scheduler import get_scheduler
from .config import app as main_app
logger = logging.getLogger(__name__)


class ReliableEventReceiver(EventReceiver):
    """Event receiver consuming from a durable, explicitly-acknowledged queue.

    The queue built in ``__init__`` is declared with ``durable=True`` and
    ``auto_delete=False``, and the consumer is created with ``no_ack=False``.
    Handlers therefore receive the raw message alongside the decoded event
    and are responsible for acknowledging it.
    """

    def __init__(self, channel, handlers=None, routing_key='#',
                 node_id=None, app=None, queue_prefix='celeryev',
                 accept=None):
        """Initialise the parent receiver, then replace its queue.

        All arguments are forwarded positionally, unchanged, to the parent
        ``EventReceiver`` constructor.
        """
        super(ReliableEventReceiver, self).__init__(
            channel, handlers, routing_key, node_id, app, queue_prefix,
            accept)

        try:
            queue_arguments = self._get_queue_arguments()
        except AttributeError:
            # HACK: buster's celery version does not support this
            queue_arguments = None

        queue_name = '.'.join([self.queue_prefix, self.node_id])
        self.queue = Queue(
            queue_name,
            exchange=self.exchange,
            routing_key=self.routing_key,
            auto_delete=False,
            durable=True,
            queue_arguments=queue_arguments,
        )

    def get_consumers(self, consumer, channel):
        """Return a single consumer bound to ``self.queue``.

        ``no_ack=False`` means messages stay unacknowledged until a handler
        calls ``message.ack()``.
        """
        return [consumer(queues=[self.queue],
                         callbacks=[self._receive],
                         no_ack=False,
                         accept=self.accept)]

    def _receive(self, body, message):
        """Decode an incoming message body and dispatch it to ``process``."""
        logger.debug('event-receiver: body %s', body)
        logger.debug('event-receiver: message %s', message)
        if isinstance(body, list):
            # HACK: buster's celery version sometimes returns body as list
            # of 1 element
            body = body[0]
        event_type, event = self.event_from_message(body)
        self.process(event_type, event, message)

    def process(self, type, event, message):
        """Process the received event by dispatching it to the appropriate
        handler.

        The handler registered for ``type`` is used, falling back to the
        ``'*'`` handler. The handler is called as ``handler(event, message)``.
        When no handler is found nothing is called, and this method does not
        acknowledge the message itself.

        The parameter keeps the name ``type`` (shadowing the builtin) so the
        method signature stays unchanged for existing callers.
        """
        handler = self.handlers.get(type) or self.handlers.get('*')
        logger.debug('event-receiver: type %s', type)
        logger.debug('event-receiver: event %s', event)
        logger.debug('event-receiver: handler %s', handler)
        if handler:
            handler(event, message)
# Queued actions are flushed once more than this much time has elapsed since
# the last flush (checked whenever an event is received).
ACTION_SEND_DELAY = datetime.timedelta(seconds=1.0)
# Queued actions are also flushed as soon as the queue holds more than this
# many entries, regardless of the time elapsed.
ACTION_QUEUE_MAX_LENGTH = 1000
def event_monitor(app, backend):
    """Capture task events and record task runs in the scheduler backend.

    Events are turned into "actions" (``start_task_run`` / ``end_task_run``
    calls on ``backend``) that are queued and flushed in batches: a flush
    happens when more than ``ACTION_SEND_DELAY`` has passed since the last
    one, or when more than ``ACTION_QUEUE_MAX_LENGTH`` actions are pending.
    Messages are acknowledged only after ``backend.commit()`` has returned.

    Args:
        app: application used to open the broker connection and build the
            receiver. ``main_app`` is used when this is ``None``.
        backend: scheduler backend; must provide ``cursor()``, ``commit()``,
            ``start_task_run()`` and ``end_task_run()``.

    This function blocks in ``recv.capture(limit=None, timeout=None, ...)``.
    """
    logger = logging.getLogger(__name__)

    if app is None:
        app = main_app

    actions = {
        # Start "in the past" so that the very first queued action is flushed
        # immediately.
        'last_send': utcnow() - 2 * ACTION_SEND_DELAY,
        'queue': [],
    }

    def try_perform_actions(actions=actions):
        """Flush the queue if it is non-empty and a flush condition holds."""
        if not actions['queue']:
            return
        delay_elapsed = utcnow() - actions['last_send'] > ACTION_SEND_DELAY
        queue_full = len(actions['queue']) > ACTION_QUEUE_MAX_LENGTH
        if delay_elapsed or queue_full:
            perform_actions(actions)

    def perform_actions(actions, backend=backend):
        """Apply every queued action with one cursor, commit, then ack."""
        action_map = {
            'start_task_run': backend.start_task_run,
            'end_task_run': backend.end_task_run,
        }

        messages = []
        cursor = backend.cursor()
        for action in actions['queue']:
            messages.append(action['message'])
            function = action_map[action['action']]
            args = action.get('args', ())
            # Work on a copy so the queued action is not modified in place.
            kwargs = dict(action.get('kwargs', {}))
            kwargs['cursor'] = cursor
            function(*args, **kwargs)

        backend.commit()
        # Acknowledge only once the backend has committed the whole batch.
        for message in messages:
            message.ack()
        actions['queue'] = []
        actions['last_send'] = utcnow()

    def queue_action(action, actions=actions):
        """Append an action to the queue and flush if a condition holds."""
        actions['queue'].append(action)
        try_perform_actions()

    def catchall_event(event, message):
        """Acknowledge events we do not record; use them as a flush tick."""
        message.ack()
        try_perform_actions()

    def task_started(event, message):
        """Queue a ``start_task_run`` for the task identified by the event."""
        logger.debug('task_started: event %s', event)
        logger.debug('task_started: message %s', message)
        queue_action({
            'action': 'start_task_run',
            'args': [event['uuid']],
            'kwargs': {
                'timestamp': utcnow(),
                'metadata': {
                    'worker': event['hostname'],
                },
            },
            'message': message,
        })

    def task_succeeded(event, message):
        """Queue an ``end_task_run`` with a status derived from the result."""
        logger.debug('task_succeeded: event %s', event)
        logger.debug('task_succeeded: message %s', message)
        result = event['result']
        logger.debug('task_succeeded: result %s', result)
        try:
            status = result.get('status')
            if status == 'success':
                status = 'eventful' if result.get('eventful') else 'uneventful'
        except Exception:
            # Deliberately best-effort: a result without a usable ``get``
            # (i.e. not dict-like) falls back to its truthiness rather than
            # stopping the listener.
            status = 'eventful' if result else 'uneventful'

        queue_action({
            'action': 'end_task_run',
            'args': [event['uuid']],
            'kwargs': {
                'timestamp': utcnow(),
                'status': status,
                'result': result,
            },
            'message': message,
        })

    def task_failed(event, message):
        """Queue an ``end_task_run`` with status ``'failed'``."""
        logger.debug('task_failed: event %s', event)
        logger.debug('task_failed: message %s', message)
        queue_action({
            'action': 'end_task_run',
            'args': [event['uuid']],
            'kwargs': {
                'timestamp': utcnow(),
                'status': 'failed',
            },
            'message': message,
        })

    # Hold the broker connection in a context manager so it is released when
    # capture() returns or raises.
    with app.connection() as connection:
        recv = ReliableEventReceiver(
            connection,
            app=app,
            handlers={
                'task-started': task_started,
                'task-result': task_succeeded,
                'task-failed': task_failed,
                '*': catchall_event,
            },
            node_id='listener-%s' % socket.gethostname(),
        )

        recv.capture(limit=None, timeout=None, wakeup=True)
# Command-line entry point: build the scheduler backend selected by --cls and
# run the event monitor against it. (Kept as comments rather than a docstring
# so the generated --help text is unchanged.)
@click.command()
@click.option('--cls', '-c', default='local',
              help="Scheduler's class, default to 'local'")
@click.option('--database', '-d',
              help='Scheduling database DSN')
@click.option('--url', '-u',
              help="(Optional) Scheduler's url access")
def main(cls, database, url):
    # Each known class takes at most one optional override from the CLI.
    if cls == 'local':
        overrides = {'scheduling_db': database} if database else {}
        backend = get_scheduler(cls, args=overrides)
    elif cls == 'remote':
        overrides = {'url': url} if url else {}
        backend = get_scheduler(cls, args=overrides)
    else:
        # Unknown class: nothing was instantiated.
        backend = None

    if not backend:
        raise ValueError('Scheduler class (local/remote) must be instantiated')

    event_monitor(main_app, backend=backend)
# Run the click command only when this file is executed as a script.
if __name__ == '__main__':
    main()
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Tue, Apr 15, 12:21 AM (1 w, 15 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3286469
Attached To
rDSCH Scheduling utilities
Event Timeline
Log In to Comment