def iter_revision_rows(storage_dbconn: str, first_id: Sha1Git):
    """Yield rows of the ``revision`` table that have metadata, ordered by id.

    Rows are fetched in pages of 1000, each page selecting the rows whose
    ``id`` is strictly greater than the last id yielded so far. Because the
    resume position is tracked per row, iteration continues where it stopped
    when the database connection has to be re-established.

    Args:
        storage_dbconn: connection string passed to ``BaseDb.connect``.
        first_id: exclusive lower bound; only rows with ``id > first_id``
            are yielded.

    Yields:
        One dict per row, mapping each name of ``REVISION_COLS`` to the
        corresponding column value.

    Raises:
        psycopg2.OperationalError: when the database is still failing after
            60 consecutive retries (one minute apart).
    """
    max_failures = 60
    retry_delay = 60  # seconds between two attempts

    after_id = first_id
    failures = 0
    while True:
        try:
            # NOTE(review): a connection opened by a previous attempt is never
            # explicitly closed here -- confirm it is released when the
            # object is garbage-collected.
            storage_db = BaseDb.connect(storage_dbconn)
            with storage_db.cursor() as cur:
                while True:
                    cur.execute(
                        f"SELECT {', '.join(REVISION_COLS)} FROM revision "
                        "WHERE id > %s AND metadata IS NOT NULL "
                        "ORDER BY id LIMIT 1000",
                        (after_id,),
                    )
                    # The query succeeded, so any earlier outage is over:
                    # the retry budget applies to consecutive failures only,
                    # not to every failure since the start of the run.
                    failures = 0

                    new_rows = 0
                    for row in cur:
                        new_rows += 1
                        row_d = dict(zip(REVISION_COLS, row))
                        yield row_d
                        # Remember the position so that a reconnection
                        # resumes right after the last row handed out.
                        after_id = row_d["id"]
                    if new_rows == 0:
                        # An empty page means the table is exhausted.
                        return
        except psycopg2.OperationalError as e:
            print(e)
            # most likely a temporary error, try again
            if failures >= max_failures:
                raise
            time.sleep(retry_delay)
            failures += 1
total_rows = 0 - with storage_db.cursor() as read_cur: - with deposit_db.cursor() as deposit_cur: - after_id = first_id - while True: - read_cur.execute( - f"SELECT {', '.join(REVISION_COLS)} FROM revision " - f"WHERE id > %s AND metadata IS NOT NULL ORDER BY id LIMIT 1000", - (after_id,), - ) - new_rows = 0 - for row in read_cur: - row_d = dict(zip(REVISION_COLS, row)) - handle_row(row_d, storage, deposit_cur, dry_run) - new_rows += 1 - - if new_rows == 0: - break + with deposit_db.cursor() as deposit_cur: + for row in iter_revision_rows(storage_dbconn, first_id): + handle_row(row, storage, deposit_cur, dry_run) - after_id = row_d["id"] + total_rows += 1 - total_rows += new_rows + if total_rows % 1000 == 0: percents = ( - int.from_bytes(after_id[0:4], byteorder="big") * 100 / (1 << 32) + int.from_bytes(row["id"][0:4], byteorder="big") * 100 / (1 << 32) ) print( - f"Migrated {total_rows/1000000.:.2f}M rows " - f"(~{percents:.1f}%, last revision: {after_id.hex()})" + f"Processed {total_rows/1000000.:.2f}M rows " + f"(~{percents:.1f}%, last revision: {row['id'].hex()})" )