Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/migrate_extrinsic_metadata.py
Show All 17 Lines | |||||
writing this script. | writing this script. | ||||
Additionally, this script contains many assertions to prevent false positives | Additionally, this script contains many assertions to prevent false positives | ||||
in its heuristics. | in its heuristics. | ||||
""" | """ | ||||
import datetime | import datetime | ||||
import hashlib | import hashlib | ||||
import itertools | |||||
import json | import json | ||||
import os | import os | ||||
import re | import re | ||||
import sys | import sys | ||||
import time | import time | ||||
from typing import Any, Dict, Optional | from typing import Any, Dict, Optional | ||||
from urllib.error import HTTPError | from urllib.error import HTTPError | ||||
from urllib.parse import unquote, urlparse | from urllib.parse import unquote, urlparse | ||||
▲ Show 20 Lines • Show All 1,102 Lines • ▼ Show 20 Lines | while True: | ||||
# most likely a temporary error, try again | # most likely a temporary error, try again | ||||
if failures >= 60: | if failures >= 60: | ||||
raise | raise | ||||
else: | else: | ||||
time.sleep(60) | time.sleep(60) | ||||
failures += 1 | failures += 1 | ||||
def main(storage_dbconn, storage_url, deposit_dbconn, first_id, dry_run): | def main(storage_dbconn, storage_url, deposit_dbconn, first_id, limit, dry_run): | ||||
storage_db = BaseDb.connect(storage_dbconn) | storage_db = BaseDb.connect(storage_dbconn) | ||||
deposit_db = BaseDb.connect(deposit_dbconn) | deposit_db = BaseDb.connect(deposit_dbconn) | ||||
storage = get_storage( | storage = get_storage( | ||||
"pipeline", | "pipeline", | ||||
steps=[ | steps=[ | ||||
{"cls": "retry"}, | {"cls": "retry"}, | ||||
{ | { | ||||
"cls": "postgresql", | "cls": "postgresql", | ||||
"db": storage_dbconn, | "db": storage_dbconn, | ||||
"objstorage": {"cls": "memory", "args": {}}, | "objstorage": {"cls": "memory", "args": {}}, | ||||
}, | }, | ||||
], | ], | ||||
) | ) | ||||
if not dry_run: | if not dry_run: | ||||
create_fetchers(storage_db) | create_fetchers(storage_db) | ||||
# Not creating authorities, as the loaders are presumably already running | # Not creating authorities, as the loaders are presumably already running | ||||
# and created them already. | # and created them already. | ||||
# This also helps make sure this script doesn't accidentally create | # This also helps make sure this script doesn't accidentally create | ||||
# authorities that differ from what the loaders use. | # authorities that differ from what the loaders use. | ||||
total_rows = 0 | total_rows = 0 | ||||
with deposit_db.cursor() as deposit_cur: | with deposit_db.cursor() as deposit_cur: | ||||
for row in iter_revision_rows(storage_dbconn, first_id): | rows = iter_revision_rows(storage_dbconn, first_id) | ||||
if limit is not None: | |||||
rows = itertools.islice(rows, limit) | |||||
for row in rows: | |||||
handle_row(row, storage, deposit_cur, dry_run) | handle_row(row, storage, deposit_cur, dry_run) | ||||
total_rows += 1 | total_rows += 1 | ||||
if total_rows % 1000 == 0: | if total_rows % 1000 == 0: | ||||
percents = ( | percents = ( | ||||
int.from_bytes(row["id"][0:4], byteorder="big") * 100 / (1 << 32) | int.from_bytes(row["id"][0:4], byteorder="big") * 100 / (1 << 32) | ||||
) | ) | ||||
print( | print( | ||||
f"Processed {total_rows/1000000.:.2f}M rows " | f"Processed {total_rows/1000000.:.2f}M rows " | ||||
f"(~{percents:.1f}%, last revision: {row['id'].hex()})" | f"(~{percents:.1f}%, last revision: {row['id'].hex()})" | ||||
) | ) | ||||
if __name__ == "__main__": | if __name__ == "__main__": | ||||
if len(sys.argv) == 4: | if len(sys.argv) == 4: | ||||
(_, storage_dbconn, storage_url, deposit_dbconn) = sys.argv | (_, storage_dbconn, storage_url, deposit_dbconn) = sys.argv | ||||
first_id = "00" * 20 | first_id = "00" * 20 | ||||
elif len(sys.argv) == 5: | elif len(sys.argv) == 5: | ||||
(_, storage_dbconn, storage_url, deposit_dbconn, first_id) = sys.argv | (_, storage_dbconn, storage_url, deposit_dbconn, first_id) = sys.argv | ||||
limit = None | |||||
elif len(sys.argv) == 6: | |||||
(_, storage_dbconn, storage_url, deposit_dbconn, first_id, limit_str) = sys.argv | |||||
limit = int(limit_str) | |||||
else: | else: | ||||
print( | print( | ||||
f"Syntax: {sys.argv[0]} <storage_dbconn> <storage_url> " | f"Syntax: {sys.argv[0]} <storage_dbconn> <storage_url> " | ||||
f"<deposit_dbconn> [<first id>]" | f"<deposit_dbconn> [<first id> [limit]]" | ||||
) | ) | ||||
exit(1) | exit(1) | ||||
if os.path.isfile("./origins.txt"): | if os.path.isfile("./origins.txt"): | ||||
# You can generate this file with: | # You can generate this file with: | ||||
# psql service=swh-replica \ | # psql service=swh-replica \ | ||||
# -c "\copy (select digest(url, 'sha1') from origin) to stdout" \ | # -c "\copy (select digest(url, 'sha1') from origin) to stdout" \ | ||||
# | pv -l > origins.txt | # | pv -l > origins.txt | ||||
print("Loading origins...") | print("Loading origins...") | ||||
with open("./origins.txt") as fd: | with open("./origins.txt") as fd: | ||||
for line in fd: | for line in fd: | ||||
digest = line.strip()[3:] | digest = line.strip()[3:] | ||||
_origins.add(bytes.fromhex(digest)) | _origins.add(bytes.fromhex(digest)) | ||||
print("Done loading origins.") | print("Done loading origins.") | ||||
main(storage_dbconn, storage_url, deposit_dbconn, bytes.fromhex(first_id), True) | main( | ||||
storage_dbconn, | |||||
storage_url, | |||||
deposit_dbconn, | |||||
bytes.fromhex(first_id), | |||||
limit, | |||||
True, | |||||
) |