Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Paste
P1021
migrate_extra_headers.py
Active
Public
Actions
Authored by
vlorentz
on Apr 27 2021, 3:25 PM.
Edit Paste
Archive Paste
View Raw File
Subscribe
Mute Notifications
Award Token
Flag For Later
Tags
None
Subscribers
None
#!/usr/bin/env python3
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import
functools
import
psycopg2
import
psycopg2.extras
from
swh.journal.client
import
get_journal_client
# Running count of revisions received from the journal, across all batches.
# Incremented by process_journal_objects() and reported by main() on exit.
total_revisions_processed = 0


def _update_extra_headers(conn, rows):
    """Write the ``(id, extra_headers)`` pairs in *rows* to the revision table.

    Only rows whose ``extra_headers`` cell is currently empty or NULL are
    touched.  Before updating, the ids that are about to change are printed
    together with the headers they will receive.

    The SELECT and the UPDATE run in a single transaction, which is committed
    when both succeed and rolled back if either raises.
    """
    # Index headers by revision id once, instead of rescanning ``rows`` for
    # every id returned by the SELECT.
    headers_by_id = {}
    for revision_id, extra_headers in rows:
        headers_by_id.setdefault(revision_id, []).append(extra_headers)

    # ``with conn`` commits on success and rolls back on error; without it
    # the UPDATE below would never be committed (psycopg2 does not autocommit
    # by default).  ``with conn.cursor()`` closes the cursor afterwards.
    with conn:
        with conn.cursor() as cur:
            # fetch=True gathers the results of *every* page: execute_values
            # splits ``rows`` into pages (100 rows by default) and the cursor
            # itself only retains the result set of the last one.
            pending = psycopg2.extras.execute_values(
                cur,
                """
                select ids.id from (VALUES %s) AS ids (id, eh)
                inner join revision on (ids.id=revision.id)
                where
                (revision.extra_headers = ARRAY[]::bytea[]
                OR revision.extra_headers IS NULL)""",
                rows,
                fetch=True,
            )
            for res in pending:
                id_ = bytes(res[0])
                print("update needed:", id_.hex())
                print(headers_by_id.get(id_, []))

            psycopg2.extras.execute_values(
                cur,
                """
                UPDATE revision
                SET extra_headers = data.extra_headers
                FROM (VALUES %s) AS data (id, extra_headers)
                WHERE
                    revision.id=data.id
                    AND (
                        -- Don't unnecessarily update rows that already have their
                        -- 'extra_headers' cell populated
                        revision.extra_headers = ARRAY[]::bytea[]
                        OR revision.extra_headers IS NULL
                    )
                """,
                rows,
            )


def process_journal_objects(messages, *, conn):
    """Backfill ``revision.extra_headers`` in the database from one batch.

    Args:
        messages: mapping of object type to a list of object dicts; it must
            contain exactly the ``"revision"`` key.
        conn: psycopg2 connection to the database holding the ``revision``
            table.  It is not used when the batch has nothing to write.

    Revisions carrying a non-empty ``extra_headers`` value are written to the
    database.  For the others nothing is written; their
    ``metadata["extra_headers"]`` (if any) or, for non-"tar" revisions, their
    whole ``metadata`` is printed so it can be inspected by hand.
    """
    global total_revisions_processed
    assert set(messages) == {"revision"}, set(messages)
    revisions = messages["revision"]

    rows = []
    for revision in revisions:
        if revision.get("extra_headers"):
            rows.append((revision["id"], revision["extra_headers"]))
            continue

        # Nothing to migrate from the top-level field: only report what the
        # legacy ``metadata`` field holds.
        eh = (revision.get("metadata") or {}).get("extra_headers")
        if eh:
            print(eh)
        elif revision["type"] != "tar" and revision.get("metadata"):
            print(revision["metadata"])

    # With no rows execute_values() would not run any statement at all, so
    # skip the database round-trip entirely.
    if rows:
        _update_extra_headers(conn, rows)

    total_revisions_processed += len(revisions)
    print(f"processed {len(revisions)} revisions,\t{len(rows)} updates")
def
main
():
client
=
get_journal_client
(
cls
=
"kafka"
,
prefix
=
"swh.journal.objects"
,
object_types
=
[
"revision"
],
brokers
=
[
f
"kafka{i+1}.internal.softwareheritage.org:9092"
for
i
in
range
(
4
)],
group_id
=
"vlorentz-T2564-migrate-extra-headers"
,
batch_size
=
10
,
#batch_size=10000, # Sends mostly batches of 1000 to 3000 rows to postgresql
)
conn
=
psycopg2
.
connect
(
<
redacted
>
)
worker_fn
=
functools
.
partial
(
process_journal_objects
,
conn
=
conn
)
try
:
client
.
process
(
worker_fn
)
except
KeyboardInterrupt
:
pass
finally
:
print
(
f
"Total revisions processed: {total_revisions_processed}"
)
# Run the migration only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
Event Timeline
vlorentz
created this paste.
Apr 27 2021, 3:25 PM
2021-04-27 15:25:02 (UTC+2)
vlorentz
edited the content of this paste.
(Show Details)
Apr 27 2021, 3:31 PM
2021-04-27 15:31:43 (UTC+2)
Log In to Comment