#!/usr/bin/env python3

# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Backfill ``revision.extra_headers`` in PostgreSQL from the revision journal.

Consumes ``revision`` objects from the Kafka journal and, for every revision
that carries a non-empty ``extra_headers`` field, writes those headers into the
matching ``revision`` table row when its ``extra_headers`` cell is still an
empty array or NULL.
"""

import functools

import psycopg2
import psycopg2.extras

from swh.journal.client import get_journal_client

# Running count of revision messages seen across all batches; printed on exit.
total_revisions_processed = 0


def process_journal_objects(messages, *, conn):
    """Migrate the ``extra_headers`` of one batch of journal revisions.

    Args:
        messages: batch handed over by the journal client, mapping object type
            to a list of revision dicts; only the ``"revision"`` key is
            expected.
        conn: open psycopg2 connection to the database holding the
            ``revision`` table.

    Raises:
        ValueError: if the batch contains object types other than "revision".
    """
    global total_revisions_processed

    # Explicit check rather than ``assert`` so it still runs under ``python -O``.
    if set(messages) != {"revision"}:
        raise ValueError(set(messages))
    revisions = messages["revision"]

    rows = []
    for revision in revisions:
        if revision.get("extra_headers"):
            rows.append((revision["id"], revision["extra_headers"]))
            continue
        # Revisions without top-level extra_headers are not updated; they are
        # only printed when they still carry headers nested in 'metadata', or
        # carry other metadata and are not tarball revisions.
        nested_headers = (revision.get("metadata") or {}).get("extra_headers")
        if nested_headers:
            print(nested_headers)
        elif revision["type"] != "tar" and revision.get("metadata"):
            print(revision["metadata"])

    if rows:
        # psycopg2 connections do not autocommit by default. Leaving
        # ``with conn`` commits the batch on success and rolls it back if an
        # exception escapes, so each batch is persisted independently instead
        # of piling up in one never-committed transaction.
        with conn, conn.cursor() as cur:
            _report_pending_updates(cur, rows)
            _update_extra_headers(cur, rows)

    total_revisions_processed += len(revisions)
    print(f"processed {len(revisions)} revisions,\t{len(rows)} updates")


def _report_pending_updates(cur, rows):
    """Print each revision in ``rows`` whose DB ``extra_headers`` is empty/NULL."""
    headers_by_id = {}
    for rev_id, extra_headers in rows:
        headers_by_id.setdefault(rev_id, []).append(extra_headers)

    # fetch=True collects the results of every page execute_values sends;
    # without it only the last page (page_size=100 rows by default) could be
    # read back from the cursor.
    results = psycopg2.extras.execute_values(
        cur,
        """
        select ids.id
        from (VALUES %s) AS ids (id, eh)
        inner join revision on (ids.id=revision.id)
        where (revision.extra_headers = ARRAY[]::bytea[]
               OR revision.extra_headers IS NULL)""",
        rows,
        fetch=True,
    )
    for (raw_id,) in results:
        id_ = bytes(raw_id)  # bytea columns come back as memoryview
        print("update needed:", id_.hex())
        print(headers_by_id.get(id_, []))


def _update_extra_headers(cur, rows):
    """Write ``extra_headers`` for ``rows`` where the DB cell is empty/NULL."""
    psycopg2.extras.execute_values(
        cur,
        """
        UPDATE revision
        SET extra_headers = data.extra_headers
        FROM (VALUES %s) AS data (id, extra_headers)
        WHERE revision.id=data.id AND (
            -- Don't unnecessarily update rows that already have their
            -- 'extra_headers' cell populated
            revision.extra_headers = ARRAY[]::bytea[]
            OR revision.extra_headers IS NULL
        )
        """,
        rows,
    )


def main():
    """Consume the revision journal and migrate extra headers until interrupted."""
    client = get_journal_client(
        cls="kafka",
        prefix="swh.journal.objects",
        object_types=["revision"],
        brokers=[
            f"kafka{i+1}.internal.softwareheritage.org:9092" for i in range(4)
        ],
        group_id="vlorentz-T2564-migrate-extra-headers",
        batch_size=10,
        # batch_size=10000 for the full run: sends mostly batches of 1000 to
        # 3000 rows to postgresql.
    )

    # No arguments: connection parameters come from the libpq environment
    # (PG* variables, service files, defaults).
    conn = psycopg2.connect()
    worker_fn = functools.partial(process_journal_objects, conn=conn)
    try:
        client.process(worker_fn)
    except KeyboardInterrupt:
        pass
    finally:
        print(f"Total revisions processed: {total_revisions_processed}")
        conn.close()


if __name__ == "__main__":
    main()