Page Menu · Home · Software Heritage
Paste P1021

migrate_extra_headers.py
Active · Public

Authored by vlorentz on Apr 27 2021, 3:25 PM.
#!/usr/bin/env python3
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import functools
import psycopg2
import psycopg2.extras
from swh.journal.client import get_journal_client
# Running count of revisions handled by process_journal_objects across all
# batches (whether or not they needed an update); printed by main() on exit.
total_revisions_processed: int = 0
def _collect_rows(revisions):
    """Build the ``(id, extra_headers)`` rows to write for a batch of revisions.

    Only revisions with a non-empty top-level ``extra_headers`` value produce
    a row. For the others, diagnostics are printed: the
    ``metadata["extra_headers"]`` value when present, otherwise the whole
    ``metadata`` of non-``tar`` revisions that have any metadata.
    """
    rows = []
    for revision in revisions:
        if revision.get("extra_headers"):
            rows.append((revision["id"], revision["extra_headers"]))
            continue
        metadata_headers = (revision.get("metadata") or {}).get("extra_headers")
        if metadata_headers:
            print(metadata_headers)
        elif revision["type"] != "tar" and revision.get("metadata"):
            print(revision["metadata"])
    return rows


def _report_pending_updates(cur, rows):
    """Print every row whose ``revision.extra_headers`` cell is still empty or NULL."""
    headers_by_id = {}
    for id_, extra_headers in rows:
        headers_by_id.setdefault(id_, []).append(extra_headers)
    # execute_values sends the statement in pages of `page_size` rows;
    # fetch=True collects the results of *every* page, whereas iterating the
    # cursor afterwards would only see the last page's results.
    results = psycopg2.extras.execute_values(
        cur,
        """
        select ids.id from (VALUES %s) AS ids (id, eh)
        inner join revision on (ids.id=revision.id)
        where
        (revision.extra_headers = ARRAY[]::bytea[]
        OR revision.extra_headers IS NULL)""",
        rows,
        fetch=True,
    )
    for (raw_id,) in results:
        # psycopg2 returns bytea values as memoryview; convert back to bytes
        # so it compares equal to the ids taken from the journal.
        id_ = bytes(raw_id)
        print("update needed:", id_.hex())
        print(headers_by_id.get(id_, []))


def _apply_updates(cur, rows):
    """Write ``extra_headers`` into revisions whose cell is still empty or NULL."""
    psycopg2.extras.execute_values(
        cur,
        """
        UPDATE revision
        SET extra_headers = data.extra_headers
        FROM (VALUES %s) AS data (id, extra_headers)
        WHERE
          revision.id=data.id
          AND (
            -- Don't unnecessarily update rows that already have their
            -- 'extra_headers' cell populated
            revision.extra_headers = ARRAY[]::bytea[]
            OR revision.extra_headers IS NULL
          )
        """,
        rows,
    )


def process_journal_objects(messages, *, conn):
    """Back-fill ``revision.extra_headers`` in PostgreSQL from one journal batch.

    Args:
        messages: mapping of object type to the list of decoded objects in
            the batch; only the ``"revision"`` key is accepted.
        conn: psycopg2 connection. All queries for the batch run in a single
            transaction, which is committed before returning (or rolled back
            if a query raises).

    Side effects:
        Increments the module-level ``total_revisions_processed`` counter and
        prints progress and diagnostics to stdout.
    """
    global total_revisions_processed
    assert set(messages) == {"revision"}, set(messages)
    revisions = messages["revision"]
    rows = _collect_rows(revisions)
    # With no rows, execute_values would send nothing, so skip the DB entirely.
    if rows:
        # psycopg2 opens a transaction implicitly and never commits on its
        # own. Without an explicit commit, these updates would be rolled back
        # when the connection closes, while the journal client has
        # (presumably) already moved its offsets past this batch.
        # `with conn` commits on success and rolls back on error;
        # `with conn.cursor()` closes the cursor.
        with conn, conn.cursor() as cur:
            _report_pending_updates(cur, rows)
            _apply_updates(cur, rows)
    total_revisions_processed += len(revisions)
    print(f"processed {len(revisions)} revisions,\t{len(rows)} updates")
def main():
client = get_journal_client(
cls="kafka",
prefix="swh.journal.objects",
object_types=["revision"],
brokers=[f"kafka{i+1}.internal.softwareheritage.org:9092" for i in range(4)],
group_id="vlorentz-T2564-migrate-extra-headers",
batch_size=10,
#batch_size=10000, # Sends mostly batches of 1000 to 3000 rows to postgresql
)
conn = psycopg2.connect(<redacted>)
worker_fn = functools.partial(process_journal_objects, conn=conn)
try:
client.process(worker_fn)
except KeyboardInterrupt:
pass
finally:
print(f"Total revisions processed: {total_revisions_processed}")
# Only start consuming the journal when executed as a script, not on import.
if __name__ == "__main__":
    main()