swh/storage/migrate_extrinsic_metadata.py
(… first 21 lines of the file not shown …)

 """

 import datetime
 import hashlib
 import json
 import os
 import re
 import sys
+import time

 from typing import Any, Dict, Optional
 from urllib.parse import unquote, urlparse

 import iso8601
+import psycopg2

 from swh.core.db import BaseDb
 from swh.model.hashutil import hash_to_hex
 from swh.model.identifiers import SWHID, parse_swhid
 from swh.model.model import (
     MetadataAuthority,
     MetadataAuthorityType,
     MetadataFetcher,
     MetadataTargetType,
     RawExtrinsicMetadata,
+    Sha1Git,
 )
 from swh.storage import get_storage

 # XML namespaces and fields for metadata coming from the deposit:
 CODEMETA_NS = "https://doi.org/10.5063/SCHEMA/CODEMETA-2.0"
 ATOM_NS = "http://www.w3.org/2005/Atom"
 ATOM_KEYS = ["id", "author", "external_identifier", "title"]
(… 775 lines not shown …)

     with db.cursor() as cur:
         cur.execute(
             """
             INSERT INTO metadata_fetcher (name, version, metadata)
             VALUES (%s, %s, %s)
             ON CONFLICT DO NOTHING
             """,
             (FETCHER.name, FETCHER.version, FETCHER.metadata),
         )
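
The ON CONFLICT DO NOTHING clause makes the fetcher insertion idempotent: re-running the script neither fails nor inserts a duplicate when an identical fetcher row already exists. The FETCHER constant itself is defined in the folded part of the file; the snippet below is only a hedged sketch of the shape that INSERT expects, built from the MetadataFetcher model imported above (the name and version strings are invented for illustration):

    # Hypothetical sketch -- the real FETCHER is defined in the folded part of
    # this file; the name and version used here are placeholders.
    from swh.model.model import MetadataFetcher

    FETCHER = MetadataFetcher(
        name="migrate_extrinsic_metadata",  # placeholder name
        version="0.0.1",                    # placeholder version
        metadata={},
    )

    # The three values bound to the INSERT's %s placeholders:
    params = (FETCHER.name, FETCHER.version, FETCHER.metadata)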

+def iter_revision_rows(storage_dbconn: str, first_id: Sha1Git):
+    after_id = first_id
+    failures = 0
+    while True:
+        try:
+            storage_db = BaseDb.connect(storage_dbconn)
+            with storage_db.cursor() as cur:
+                while True:
+                    cur.execute(
+                        f"SELECT {', '.join(REVISION_COLS)} FROM revision "
+                        f"WHERE id > %s AND metadata IS NOT NULL "
+                        f"ORDER BY id LIMIT 1000",
+                        (after_id,),
+                    )
+                    new_rows = 0
+                    for row in cur:
+                        new_rows += 1
+                        row_d = dict(zip(REVISION_COLS, row))
+                        yield row_d
+                        after_id = row_d["id"]
+                    if new_rows == 0:
+                        return
+        except psycopg2.OperationalError as e:
+            print(e)
+            # most likely a temporary error, try again
+            if failures >= 60:
+                raise
+            else:
+                time.sleep(60)
+                failures += 1
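
iter_revision_rows does keyset pagination over the revision table: each query resumes strictly after the last id seen (WHERE id > %s ... ORDER BY id LIMIT 1000), so no OFFSET scans are needed. When psycopg2 raises an OperationalError (typically a dropped connection), the generator reconnects and resumes from after_id, so rows that were already yielded are not replayed; it sleeps a minute between attempts and raises once 60 failures have accumulated. A hedged usage sketch, with a placeholder connection string and a starting id of all zero bytes (the very beginning of the id space):

    # Usage sketch only: the connection string is a placeholder; REVISION_COLS
    # and hash_to_hex come from the surrounding module.
    for row_d in iter_revision_rows("service=swh-storage", first_id=b"\x00" * 20):
        # row_d maps each name in REVISION_COLS to the matching column value;
        # row_d["id"] is the revision's sha1_git as bytes.
        print(hash_to_hex(row_d["id"]))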

 def main(storage_dbconn, storage_url, deposit_dbconn, first_id, dry_run):
     storage_db = BaseDb.connect(storage_dbconn)
     deposit_db = BaseDb.connect(deposit_dbconn)
     storage = get_storage("remote", url=storage_url)

     if not dry_run:
         create_fetchers(storage_db)
         # Not creating authorities, as the loaders are presumably already running
         # and created them already.
         # This also helps make sure this script doesn't accidentally create
         # authorities that differ from what the loaders use.

     total_rows = 0
-    with storage_db.cursor() as read_cur:
-        with deposit_db.cursor() as deposit_cur:
-            after_id = first_id
-            while True:
-                read_cur.execute(
-                    f"SELECT {', '.join(REVISION_COLS)} FROM revision "
-                    f"WHERE id > %s AND metadata IS NOT NULL ORDER BY id LIMIT 1000",
-                    (after_id,),
-                )
-                new_rows = 0
-                for row in read_cur:
-                    row_d = dict(zip(REVISION_COLS, row))
-                    handle_row(row_d, storage, deposit_cur, dry_run)
-                    new_rows += 1
-                if new_rows == 0:
-                    break
-                after_id = row_d["id"]
-                total_rows += new_rows
-                percents = (
-                    int.from_bytes(after_id[0:4], byteorder="big") * 100 / (1 << 32)
-                )
-                print(
-                    f"Migrated {total_rows/1000000.:.2f}M rows "
-                    f"(~{percents:.1f}%, last revision: {after_id.hex()})"
-                )
+    with deposit_db.cursor() as deposit_cur:
+        for row in iter_revision_rows(storage_dbconn, first_id):
+            handle_row(row, storage, deposit_cur, dry_run)
+            total_rows += 1
+            if total_rows % 1000 == 0:
+                percents = (
+                    int.from_bytes(row["id"][0:4], byteorder="big") * 100 / (1 << 32)
+                )
+                print(
+                    f"Processed {total_rows/1000000.:.2f}M rows "
+                    f"(~{percents:.1f}%, last revision: {row['id'].hex()})"
+                )
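
The progress estimate works because revision ids are sha1_git hashes and therefore close to uniformly distributed over the 160-bit id space: the first four bytes of the last processed id, read as a big-endian integer and divided by 2**32, approximate the fraction of the table already scanned. A small worked example (the id value is made up):

    # A sha1_git whose first byte is 0x80 sits halfway through the id space.
    last_id = bytes.fromhex("80" + "00" * 19)
    percents = int.from_bytes(last_id[0:4], byteorder="big") * 100 / (1 << 32)
    assert percents == 50.0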

 if __name__ == "__main__":
     if len(sys.argv) == 4:
         (_, storage_dbconn, storage_url, deposit_dbconn) = sys.argv
         first_id = "00" * 20
     elif len(sys.argv) == 5:

(… last 21 lines not shown …)
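
A hedged example invocation of the three-argument form (the connection strings and the storage RPC URL below are placeholders, not values from this changeset):

    python3 swh/storage/migrate_extrinsic_metadata.py \
        "service=swh-storage" \
        http://localhost:5002/ \
        "service=swh-deposit"

With only these three arguments, first_id defaults to "00" * 20, so the scan starts at the very beginning of the revision id space.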