#!/usr/bin/env python3

# Copyright (C) 2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Migrate the ``metadata`` column of revisions to RawExtrinsicMetadata objects.

The script pages through the ``revision`` table, recognizes the layout of each
revision's ``metadata`` dict (which depends on the loader that wrote it), and
hands the extrinsic parts to ``storage.raw_extrinsic_metadata_add``.  Every key
of the dict must end up either migrated or explicitly discarded; anything left
over triggers an assertion so that unknown layouts are noticed.
"""

import datetime
import json
import re
import sys
from typing import Any, Dict

from swh.core.db import BaseDb
from swh.model.hashutil import hash_to_hex
from swh.model.identifiers import SWHID
from swh.model.model import (
    MetadataAuthority,
    MetadataAuthorityType,
    MetadataFetcher,
    MetadataTargetType,
    RawExtrinsicMetadata,
)
from swh.storage import get_storage
from swh.storage.algos.snapshot import visits_and_snapshots_get_from_revision

CODEMETA_NS = "https://doi.org/10.5063/SCHEMA/CODEMETA-2.0"

ATOM_NS = "http://www.w3.org/2005/Atom"
ATOM_KEYS = ["id", "author", "external_identifier", "title"]

# Columns read from the storage database's "revision" table, in SELECT order.
REVISION_COLS = ["id", "date", "date_offset", "type", "message", "metadata"]

# Columns read from the deposit database, in SELECT order.
DEPOSIT_COLS = [
    "deposit.id",
    "deposit_request.metadata",
    "deposit_request.date",
    "deposit_client.provider_url",
    "deposit_collection.name",
    "auth_user.username",
]

OLD_DEPOSIT_FORMAT = (
    "sword-v2-atom-codemeta-v2-in-json-with-expanded-namespaces"  # before february 2018
)
NEW_DEPOSIT_FORMAT = "sword-v2-atom-codemeta-v2-in-json"  # after february 2018
GNU_FORMAT = "gnu-tree-json"
NIXGUIX_FORMAT = "nixguix-sources-json"
NPM_FORMAT = "replicate-npm-package-json"
ORIGINAL_ARTIFACT_FORMAT = "original-artifact-json"
PYPI_FORMAT = "pypi-project-json"

FETCHER = MetadataFetcher(
    name="migrate-extrinsic-metadata-from-revisions", version="0.0.1",
)
AUTHORITIES = {
    "npmjs": MetadataAuthority(
        type=MetadataAuthorityType.FORGE, url="https://npmjs.com/", metadata={}
    ),
    "pypi": MetadataAuthority(
        type=MetadataAuthorityType.FORGE, url="https://pypi.org/", metadata={}
    ),
    "gnu": MetadataAuthority(
        type=MetadataAuthorityType.FORGE, url="https://ftp.gnu.org/", metadata={}
    ),
    "swh": MetadataAuthority(
        type=MetadataAuthorityType.REGISTRY,
        url="https://softwareheritage.org/",
        metadata={},
    ),
}

# Matches the (bytes) message of revisions created by the deposit loader.
# The group names are the ones handle_deposit_row() looks up; "client" may be
# empty because it is sometimes missing from the commit message.
deposit_revision_message_re = re.compile(
    b"(?P<client>[a-z]*): "
    b"Deposit (?P<deposit_id>[0-9]+) in collection (?P<collection>[a-z]+).*"
)


def remove_atom_codemeta_metadata_with_xmlns(metadata):
    """Delete, in place, the Atom/CodeMeta keys of a dict using ``@xmlns`` prefixes.

    Removed keys are those starting with ``codemeta:``, the names listed in
    ``ATOM_KEYS``, and the two ``@xmlns`` declarations themselves.
    """
    keys_to_remove = ATOM_KEYS + ["@xmlns", "@xmlns:codemeta"]
    # Iterate over a snapshot of the keys, since the dict is mutated.
    for key in list(metadata):
        if key.startswith("codemeta:") or key in keys_to_remove:
            del metadata[key]


def remove_atom_codemeta_metadata_without_xmlns(metadata):
    """Delete, in place, the keys of a dict that use expanded namespaces.

    Removed keys are those starting with ``{<ATOM_NS>}`` or ``{<CODEMETA_NS>}``.
    """
    prefixes = ("{%s}" % ATOM_NS, "{%s}" % CODEMETA_NS)
    # Iterate over a snapshot of the keys, since the dict is mutated.
    for key in list(metadata):
        if key.startswith(prefixes):
            del metadata[key]


def load_metadata(
    storage,
    revision_id,
    discovery_date: datetime.datetime,
    metadata: Dict[str, Any],
    format: str,
    authority: MetadataAuthority,
    dry_run: bool,
):
    """Build a RawExtrinsicMetadata object for a revision and add it to storage.

    Args:
        storage: object providing ``raw_extrinsic_metadata_add``.
        revision_id: id of the target revision, as accepted by ``hash_to_hex``.
        discovery_date: date recorded on the metadata object.
        metadata: JSON-serializable dict; it is stored as encoded JSON.
        format: name of the metadata format (one of the ``*_FORMAT`` constants).
        authority: authority recorded on the metadata object.
        dry_run: when true, the object is built (so serialization errors still
            surface) but nothing is written to ``storage``.
    """
    revision_swhid = SWHID(object_type="revision", object_id=hash_to_hex(revision_id))
    obj = RawExtrinsicMetadata(
        type=MetadataTargetType.REVISION,
        id=revision_swhid,
        discovery_date=discovery_date,
        authority=authority,
        fetcher=FETCHER,
        format=format,
        metadata=json.dumps(metadata).encode(),
    )
    if not dry_run:
        storage.raw_extrinsic_metadata_add([obj])


def handle_deposit_row(row, storage, deposit_cur, dry_run: bool):
    """Migrate the metadata of a revision created by the deposit loader.

    The deposit id, collection and client are parsed from the revision message,
    then every request of that deposit is fetched from the deposit database
    and its metadata (when not NULL) is loaded with the request's date.

    Args:
        row: revision row as a dict keyed by ``REVISION_COLS``.
        storage: passed through to :func:`load_metadata`.
        deposit_cur: cursor on the deposit database.
        dry_run: passed through to :func:`load_metadata`.

    Raises:
        AssertionError: if the message does not match the expected pattern, or
            if the data found in the deposit database is inconsistent with it.
    """
    parsed_message = deposit_revision_message_re.match(row["message"])
    assert parsed_message is not None, row["message"]

    deposit_id = int(parsed_message.group("deposit_id"))
    collection = parsed_message.group("collection").decode()
    client_name = parsed_message.group("client").decode()

    deposit_cur.execute(
        f"SELECT {', '.join(DEPOSIT_COLS)} FROM deposit "
        f"INNER JOIN deposit_collection "
        f" ON (deposit.collection_id=deposit_collection.id) "
        f"INNER JOIN deposit_client ON (deposit.client_id=deposit_client.user_ptr_id) "
        f"INNER JOIN auth_user ON (deposit.client_id=auth_user.id) "
        f"INNER JOIN deposit_request ON (deposit.id=deposit_request.deposit_id) "
        f"WHERE deposit.id = %s",
        (deposit_id,),
    )

    provider_urls = set()
    metadata_entries = []
    for deposit_request_row in deposit_cur:
        deposit_request = dict(zip(DEPOSIT_COLS, deposit_request_row))

        # Sanity checks to make sure we selected the right deposit
        assert deposit_request["deposit.id"] == deposit_id
        assert deposit_request["deposit_collection.name"] == collection, deposit_request
        if client_name != "":
            # Sometimes it's missing from the commit message
            assert deposit_request["auth_user.username"] == client_name

        provider_urls.add(deposit_request["deposit_client.provider_url"])
        # Read from the dict keyed by DEPOSIT_COLS: the raw cursor row is
        # positional and cannot be indexed by these names.
        date = deposit_request["deposit_request.date"]
        metadata = deposit_request["deposit_request.metadata"]
        if metadata is not None:
            json.dumps(metadata).encode()  # check it's valid
            if "@xmlns" in metadata:
                assert metadata["@xmlns"] == ATOM_NS
                assert metadata["@xmlns:codemeta"] in (CODEMETA_NS, [CODEMETA_NS])
                metadata_format = NEW_DEPOSIT_FORMAT
            else:
                assert "{http://www.w3.org/2005/Atom}id" in metadata
                assert "{https://doi.org/10.5063/SCHEMA/CODEMETA-2.0}author" in metadata
                metadata_format = OLD_DEPOSIT_FORMAT
            metadata_entries.append((date, metadata_format, metadata))

    assert len(metadata_entries) >= 1, deposit_id
    assert len(provider_urls) == 1, f"expected 1 provider url, got {provider_urls}"
    (provider_url,) = provider_urls

    authority = MetadataAuthority(
        type=MetadataAuthorityType.FORGE, url=provider_url, metadata={},
    )

    for (date, metadata_format, metadata) in metadata_entries:
        load_metadata(
            storage,
            row["id"],
            date,
            metadata,
            metadata_format,
            authority=authority,
            dry_run=dry_run,
        )


def handle_row(row: Dict[str, Any], storage, deposit_cur, dry_run: bool):
    """Migrate the metadata of one revision row.

    ``row["metadata"]`` is consumed in place: each recognized part is loaded
    through :func:`load_metadata` or :func:`handle_deposit_row` (or discarded
    when it is intrinsic) and then deleted from the dict.

    Args:
        row: revision row as a dict keyed by ``REVISION_COLS``.
        storage: passed through to the loading helpers.
        deposit_cur: cursor on the deposit database.
        dry_run: passed through to the loading helpers.

    Raises:
        AssertionError: if the metadata layout is not recognized, in particular
            if any key remains once all known parts were handled.
    """
    type_ = row["type"]

    metadata = row["metadata"]

    if metadata is None:
        return

    if type_ == "dsc":
        if "extrinsic" in metadata:
            print("dsc1", row)
            extrinsic_files = metadata["extrinsic"]["raw"]["files"]
            for artifact_entry in metadata["original_artifact"]:
                extrinsic_file = extrinsic_files[artifact_entry["filename"]]
                for key in ("sha256",):
                    assert artifact_entry["checksums"][key] == extrinsic_file[key]
                # Keep the URI on the artifact entry before dropping "extrinsic"
                artifact_entry["url"] = extrinsic_file["uri"]
            del metadata["extrinsic"]
        else:
            print("dsc2", row)

    elif type_ == "tar":
        provider = metadata.get("extrinsic", {}).get("provider")
        if provider is not None:
            # New versions of the loaders write the provider; use it.
            if provider.startswith("https://replicate.npmjs.com/"):
                # npm loader format 1
                load_metadata(
                    storage,
                    row["id"],
                    row["date"],
                    metadata["extrinsic"]["raw"],
                    NPM_FORMAT,
                    authority=AUTHORITIES["npmjs"],
                    dry_run=dry_run,
                )
                del metadata["extrinsic"]

            elif provider.startswith("https://pypi.org/"):
                # pypi loader format 1
                load_metadata(
                    storage,
                    row["id"],
                    row["date"],
                    metadata["extrinsic"]["raw"],
                    PYPI_FORMAT,
                    authority=AUTHORITIES["pypi"],
                    dry_run=dry_run,
                )
                del metadata["extrinsic"]

            elif provider.startswith("https://cran.r-project.org/"):
                # cran loader

                # the metadata is intrinsic, so there is nothing to do.
                del metadata["extrinsic"]

            elif provider.startswith("https://nix-community.github.io/nixpkgs-swh/"):
                # nixguix loader
                authority = MetadataAuthority(
                    type=MetadataAuthorityType.FORGE, url=provider, metadata={},
                )
                assert row["date"] is None  # the nixguix loader does not write dates

                # Let's figure out which visits produced this revision
                dates = set()
                for (visit, status, snapshot) in visits_and_snapshots_get_from_revision(
                    storage, provider, row["id"]
                ):
                    dates.add(visit.date)

                for date in dates:
                    load_metadata(
                        storage,
                        row["id"],
                        date,
                        metadata["extrinsic"]["raw"],
                        NIXGUIX_FORMAT,
                        authority=authority,
                        dry_run=dry_run,
                    )
                del metadata["extrinsic"]

            elif provider.startswith("https://ftp.gnu.org/"):
                # archive loader
                load_metadata(
                    storage,
                    row["id"],
                    row["date"],
                    metadata["extrinsic"]["raw"],
                    GNU_FORMAT,
                    authority=AUTHORITIES["gnu"],
                    dry_run=dry_run,
                )
                del metadata["extrinsic"]

            elif provider.startswith("https://deposit.softwareheritage.org/"):
                if "@xmlns" in metadata:
                    assert metadata["@xmlns"] == ATOM_NS
                    assert metadata["@xmlns:codemeta"] in (CODEMETA_NS, [CODEMETA_NS])
                    assert "intrinsic" not in metadata
                    assert "extra_headers" not in metadata

                    # deposit loader format 1
                    # (pretty rare? In id order, the first revision with this format
                    # is 022310df16fd9e4d4f81fe36a142e82db977c01d)
                    # in the case, the metadata seems to be both directly in metadata
                    # and in metadata["extrinsic"]["raw"]["metadata"]

                    handle_deposit_row(row, storage, deposit_cur, dry_run)

                    remove_atom_codemeta_metadata_with_xmlns(metadata)
                    if "client" in metadata:
                        del metadata["client"]
                    del metadata["extrinsic"]
                else:
                    # deposit loader format 2
                    actual_metadata = metadata["extrinsic"]["raw"]["origin_metadata"][
                        "metadata"
                    ]
                    assert actual_metadata["@xmlns"] == ATOM_NS
                    assert actual_metadata["@xmlns:codemeta"] in (
                        CODEMETA_NS,
                        [CODEMETA_NS],
                    )

                    handle_deposit_row(row, storage, deposit_cur, dry_run)

                    del metadata["extrinsic"]

            else:
                assert False, f"unknown provider {provider}"

        # Older versions don't write the provider; use heuristics instead.
        elif (
            metadata.get("package_source", {})
            .get("url", "")
            .startswith("https://registry.npmjs.org/")
        ):
            # npm loader format 2
            load_metadata(
                storage,
                row["id"],
                row["date"],
                metadata["package"],
                NPM_FORMAT,
                authority=AUTHORITIES["npmjs"],
                dry_run=dry_run,
            )
            del metadata["package"]

            assert "original_artifact" not in metadata

            # rebuild an "original_artifact"-like metadata dict from what we
            # can salvage of "package_source"
            package_source_metadata = metadata["package_source"]
            keep_keys = {"blake2s256", "filename", "sha1", "sha256", "url"}
            discard_keys = {
                "date",  # is equal to the revision date
                "name",  # was loaded above
                "version",  # same
            }
            assert (
                set(package_source_metadata) == keep_keys | discard_keys
            ), package_source_metadata

            # will be loaded below
            metadata["original_artifact"] = {
                k: metadata["package_source"][k] for k in keep_keys
            }
            del metadata["package_source"]

        elif "project" in metadata:
            assert metadata["original_artifact"]["url"].startswith(
                "https://files.pythonhosted.org/"
            )

            # pypi loader format 2
            load_metadata(
                storage,
                row["id"],
                row["date"],
                metadata["project"],
                PYPI_FORMAT,
                authority=AUTHORITIES["pypi"],
                dry_run=dry_run,
            )
            del metadata["project"]

        elif "@xmlns" in metadata:
            assert metadata["@xmlns:codemeta"] in (CODEMETA_NS, [CODEMETA_NS])
            assert "intrinsic" not in metadata
            assert "extra_headers" not in metadata

            # deposit loader format 3 and 4
            handle_deposit_row(row, storage, deposit_cur, dry_run)
            remove_atom_codemeta_metadata_with_xmlns(metadata)
            if "client" in metadata:
                del metadata["client"]  # found in the deposit db
            if "committer" in metadata:
                del metadata["committer"]  # found on the revision object

        elif "{http://www.w3.org/2005/Atom}id" in metadata:
            assert "{https://doi.org/10.5063/SCHEMA/CODEMETA-2.0}author" in metadata
            assert "intrinsic" not in metadata
            assert "extra_headers" not in metadata

            # deposit loader format 5
            handle_deposit_row(row, storage, deposit_cur, dry_run)
            remove_atom_codemeta_metadata_without_xmlns(metadata)

    # Ignore common intrinsic metadata keys
    for key in ("intrinsic", "extra_headers"):
        if key in metadata:
            del metadata[key]

    # Ignore loader-specific intrinsic metadata keys
    if type_ == "hg":
        del metadata["node"]
    elif type_ == "dsc":
        if "package_info" in metadata:
            del metadata["package_info"]

    if "original_artifact" in metadata:
        load_metadata(
            storage,
            row["id"],
            row["date"],
            metadata["original_artifact"],
            ORIGINAL_ARTIFACT_FORMAT,
            authority=AUTHORITIES["swh"],
            dry_run=dry_run,
        )
        del metadata["original_artifact"]

    # Everything must have been either migrated or explicitly discarded.
    assert metadata == {}, (
        f"remaining metadata keys for {row['id'].hex()} (type: {row['type']}): "
        f"{metadata}"
    )


def create_fetchers(db):
    """Insert ``FETCHER`` into the ``metadata_fetcher`` table if it is missing.

    Args:
        db: database object providing ``cursor()``.
    """
    with db.cursor() as cur:
        cur.execute(
            """
            INSERT INTO metadata_fetcher (name, version, metadata)
            VALUES (%s, %s, %s)
            ON CONFLICT DO NOTHING
            """,
            (FETCHER.name, FETCHER.version, FETCHER.metadata),
        )


def main(storage_dbconn, storage_url, deposit_dbconn, first_id, dry_run):
    """Migrate the metadata of all revisions whose id is greater than ``first_id``.

    Revisions are read from the storage database in batches of 1000, ordered
    by id, and a progress line is printed after each batch.

    Args:
        storage_dbconn: connection string of the storage database.
        storage_url: URL of the remote storage the metadata is written to.
        deposit_dbconn: connection string of the deposit database.
        first_id: revision id (bytes) after which to start.
        dry_run: when true, nothing is written.
    """
    storage_db = BaseDb.connect(storage_dbconn)
    deposit_db = BaseDb.connect(deposit_dbconn)
    storage = get_storage("remote", url=storage_url)

    if not dry_run:
        create_fetchers(storage_db)
        # Not creating authorities, as the loaders are presumably already running
        # and created them already.
        # This also helps make sure this script doesn't accidentally create
        # authorities that differ from what the loaders use.

    total_rows = 0
    with storage_db.cursor() as read_cur, deposit_db.cursor() as deposit_cur:
        after_id = first_id
        while True:
            read_cur.execute(
                f"SELECT {', '.join(REVISION_COLS)} FROM revision "
                f"WHERE id > %s AND metadata IS NOT NULL ORDER BY id LIMIT 1000",
                (after_id,),
            )
            new_rows = 0
            for row in read_cur:
                row_d = dict(zip(REVISION_COLS, row))
                handle_row(row_d, storage, deposit_cur, dry_run)
                new_rows += 1

            if new_rows == 0:
                break

            # Rows are ordered by id, so the last one is where to resume.
            after_id = row_d["id"]

            total_rows += new_rows
            # Estimate progress from the first 4 bytes of the last id.
            percents = (
                int.from_bytes(after_id[0:4], byteorder="big") * 100 / (1 << 32)
            )
            print(
                f"Migrated {total_rows/1000000.:.2f}M rows "
                f"(~{percents:.1f}%, last revision: {after_id.hex()})"
            )


if __name__ == "__main__":
    if len(sys.argv) == 4:
        (_, storage_dbconn, storage_url, deposit_dbconn) = sys.argv
        first_id = "00" * 20
    elif len(sys.argv) == 5:
        (_, storage_dbconn, storage_url, deposit_dbconn, first_id) = sys.argv
    else:
        print(
            f"Syntax: {sys.argv[0]} <storage_dbconn> <storage_url> "
            f"<deposit_dbconn> [<first_id>]"
        )
        sys.exit(1)
    # NOTE(review): dry_run is hard-coded to True here, so this entry point
    # never writes anything -- confirm this is intended before a real run.
    main(storage_dbconn, storage_url, deposit_dbconn, bytes.fromhex(first_id), True)