Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7437621
journal_client.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
5 KB
Subscribers
None
journal_client.py
View Options
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from
datetime
import
datetime
from
typing
import
Dict
,
List
,
Optional
,
Tuple
import
attr
from
swh.scheduler.interface
import
SchedulerInterface
from
swh.scheduler.model
import
OriginVisitStats
# Journal message type consumed by this client; process_journal_objects only
# accepts batches keyed by this type.
msg_type: str = "origin_visit_status"
def max_date(*dates: Optional[datetime]) -> datetime:
    """Return the max date of given (possibly None) dates.

    ``None`` values are skipped. At least one date must be not None.

    Raises:
        ValueError: if no date is given or every given date is None.
    """
    datesok: Tuple[datetime, ...] = tuple(d for d in dates if d is not None)
    if not datesok:
        raise ValueError("At least one date should be a valid datetime")
    return max(datesok)
def process_journal_objects(
    messages: Dict[str, List[Dict]], *, scheduler: SchedulerInterface
) -> None:
    """Read messages from origin_visit_status journal topics, then insert them in the
    scheduler "origin_visit_stats" table.

    Worker function for `JournalClient.process(worker_fn)`, after
    currification of `scheduler`.

    Args:
        messages: journal messages grouped by message type; only the
            ``origin_visit_status`` type is accepted.
        scheduler: scheduler backend used to fetch the currently stored
            origin visit stats and to upsert the updated ones.

    Raises:
        AssertionError: if ``messages`` holds another message type, or has no
            ``origin_visit_status`` entry (only when assertions are enabled).
    """
    assert set(messages) <= {
        msg_type
    }, f"Got unexpected {', '.join(set(messages) - set([msg_type]))} message types"
    assert msg_type in messages, f"Expected {msg_type} messages"

    # Statuses "created" and "ongoing" are skipped; presumably such visits have
    # no final outcome to record yet.
    interesting_messages = [
        msg
        for msg in messages[msg_type]
        if msg["status"] not in ("created", "ongoing")
    ]

    # Stored stats for every (origin, visit type) pair touched by this batch,
    # as mutable dicts keyed by that pair.
    origin_visit_stats: Dict[Tuple[str, str], Dict] = {
        (visit_stats.url, visit_stats.visit_type): attr.asdict(visit_stats)
        for visit_stats in scheduler.origin_visit_stats_get(
            list({(vs["origin"], vs["type"]) for vs in interesting_messages})
        )
    }

    for msg_dict in interesting_messages:
        origin = msg_dict["origin"]
        visit_type = msg_dict["type"]
        pk = origin, visit_type
        if pk not in origin_visit_stats:
            origin_visit_stats[pk] = _empty_visit_stats(origin, visit_type)
        _update_visit_stats(origin_visit_stats[pk], msg_dict)

    scheduler.origin_visit_stats_upsert(
        OriginVisitStats(**ovs) for ovs in origin_visit_stats.values()
    )


def _empty_visit_stats(origin: str, visit_type: str) -> Dict:
    """Build the stats dict for a pair with no stored origin_visit_stats row."""
    return {
        "url": origin,
        "visit_type": visit_type,
        "last_uneventful": None,
        "last_eventful": None,
        "last_failed": None,
        "last_notfound": None,
        "last_snapshot": None,
    }


def _update_visit_stats(visit_stats_d: Dict, msg_dict: Dict) -> None:
    """Fold one origin_visit_status message into ``visit_stats_d`` in place.

    Out-of-order messages that cannot improve the stats, and duplicated
    messages, leave ``visit_stats_d`` untouched.
    """
    current_status_date = msg_dict["date"]

    if msg_dict["status"] == "not_found":
        visit_stats_d["last_notfound"] = max_date(
            current_status_date, visit_stats_d.get("last_notfound")
        )
        return

    # Short-circuit: "snapshot" is only read when the status is not "failed".
    # A finished visit without a snapshot is accounted as failed.
    if msg_dict["status"] == "failed" or msg_dict["snapshot"] is None:
        visit_stats_d["last_failed"] = max_date(
            current_status_date, visit_stats_d.get("last_failed")
        )
        return

    # visit with snapshot, something happened
    snapshot = msg_dict["snapshot"]
    if visit_stats_d["last_snapshot"] is None:
        # first time visit with snapshot, we keep relevant information
        visit_stats_d["last_eventful"] = current_status_date
        visit_stats_d["last_snapshot"] = snapshot
        return

    # visit with snapshot already stored, last_eventful should already be stored
    last_eventful = visit_stats_d["last_eventful"]
    assert last_eventful is not None
    latest_recorded_visit_date = max_date(
        last_eventful, visit_stats_d["last_uneventful"]
    )

    if snapshot != visit_stats_d["last_snapshot"]:
        if current_status_date < latest_recorded_visit_date:
            # out of order message so ignored
            return
        # new eventful visit (new snapshot)
        visit_stats_d["last_eventful"] = current_status_date
        visit_stats_d["last_snapshot"] = snapshot
    elif current_status_date < last_eventful:
        # Same snapshot seen before the recorded eventful visit: that visit was
        # actually uneventful and the current one is the true eventful event.
        # Keep the most recent recorded date as last_uneventful so it never
        # moves backwards (last_uneventful may be later than last_eventful).
        visit_stats_d["last_uneventful"] = latest_recorded_visit_date
        visit_stats_d["last_eventful"] = current_status_date
    elif current_status_date <= latest_recorded_visit_date:
        # Duplicated message, or a stale uneventful visit older than the
        # latest recorded one: ignored so recorded dates do not regress.
        return
    else:
        # uneventful event
        visit_stats_d["last_uneventful"] = current_status_date
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Tue, Apr 15, 2:21 AM (1 d, 14 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3286484
Attached To
rDSCH Scheduling utilities
Event Timeline
Log In to Comment