Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9345117
journal_client.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
1 KB
Subscribers
None
journal_client.py
View Options
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import
logging
from
swh.core.utils
import
grouper
from
swh.scheduler.utils
import
create_task_dict
# Maximum number of origin URLs packed into a single 'origin_metadata' task:
# process_origin_visits() groups the selected visits into batches of (at most)
# this size and creates one task per batch.
MAX_ORIGINS_PER_TASK = 100
def process_journal_objects(messages, *, scheduler, task_names):
    """Route a batch of journal messages to the origin-visit handler.

    Meant to be used as the worker callback of
    ``JournalClient.process(worker_fn)`` once ``scheduler`` and
    ``task_names`` have been bound (curried) in advance.

    Args:
        messages: mapping from object type to the objects received for that
            type. The only accepted (and required) key is ``'origin_visit'``.
        scheduler: forwarded unchanged to ``process_origin_visits``.
        task_names: forwarded unchanged to ``process_origin_visits``.

    Raises:
        AssertionError: if the keys of ``messages`` are not exactly
            ``{'origin_visit'}``; the offending key set is the assertion
            message. This is an ``assert``, so it is skipped under
            ``python -O``.
    """
    received_types = set(messages)
    # This worker only knows how to handle origin visits.
    assert received_types == {'origin_visit'}, received_types
    origin_visits = messages['origin_visit']
    process_origin_visits(origin_visits, scheduler, task_names)
def process_origin_visits(visits, scheduler, task_names, *,
                          max_origins_per_task=None):
    """Schedule origin-metadata tasks for the completed visits in ``visits``.

    Nothing is scheduled unless ``task_names`` maps ``'origin_metadata'`` to
    a truthy task name. When it does, only visits whose ``'status'`` is
    ``'full'`` are kept. Their origin URLs are grouped into batches of at
    most ``max_origins_per_task``, and one ``'oneshot'`` task dict (built by
    ``create_task_dict`` with ``policy_update='update-dups'``) is made per
    batch. All task dicts are submitted in a single
    ``scheduler.create_tasks`` call, which is skipped when there is nothing
    to submit.

    Args:
        visits: iterable of origin-visit dicts. Each needs a ``'status'``
            key; kept visits also need ``visit['origin']['url']``.
        scheduler: object exposing ``create_tasks(task_dicts)``.
        task_names: mapping from task kind to scheduler task-type name.
        max_origins_per_task: maximum number of origin URLs per created
            task. Defaults to ``MAX_ORIGINS_PER_TASK``, read at call time.

    Raises:
        ValueError: if ``max_origins_per_task`` is smaller than 1.
    """
    if max_origins_per_task is None:
        # Looked up at call time (not bound as a def-time default) so the
        # module-level constant remains the single source of the default.
        max_origins_per_task = MAX_ORIGINS_PER_TASK
    if max_origins_per_task < 1:
        # A non-positive batch size cannot form meaningful batches; fail
        # loudly instead of handing it to grouper().
        raise ValueError(
            'max_origins_per_task must be >= 1, got {!r}'.format(
                max_origins_per_task))

    logging.debug('processing origin visits %r', visits)

    task_name = task_names.get('origin_metadata')
    if not task_name:
        # Origin-metadata indexing is not configured: nothing to schedule.
        return

    full_visits = [visit for visit in visits if visit['status'] == 'full']
    task_dicts = [
        create_task_dict(
            task_name,
            'oneshot',
            [visit['origin']['url'] for visit in visit_batch],
            policy_update='update-dups',
        )
        for visit_batch in grouper(full_visits, max_origins_per_task)
    ]

    if task_dicts:
        scheduler.create_tasks(task_dicts)
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Fri, Jul 4, 3:06 PM (4 d, 19 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3241354
Attached To
rDCIDX Metadata indexer
Event Timeline
Log In to Comment