diff --git a/swh-team/swhphab.py b/swh-team/swhphab.py
index bc91b35..98360a7 100644
--- a/swh-team/swhphab.py
+++ b/swh-team/swhphab.py
@@ -1,94 +1,94 @@
 from functools import lru_cache


 def paginate(query, args, stop):
     """perform a query paginating through results until stop condition is met

     """
     after = None
     keep_going = True
     while keep_going:
         r = query(**args, after=after)
         if not(r['data']):
             break
         for item in r['data']:
             if stop(item):
                 keep_going = False
                 break
             yield item
         after = r['cursor']['after']


 @lru_cache()
 def lookup_repo(phab, repo_phid):
     """lookup Phabricator repository by PHID

     """
     if repo_phid:
         return phab.phid.query(phids=[repo_phid])[repo_phid]
     else:
         return None  # stacked diffs do not have an associated repo


 def pp_repo(repo):
     """pretty print a short name for a given repository

     """
     if repo:
         return repo['uri'].split('/')[-2]
     else:
         return 'None'


 @lru_cache()
 def whoami(phab):
     """return current user's PHID

     """
     return phab.user.whoami()['phid']


 def print_tasks(phab, tasks):
     """print a brief list of Phabricator tasks, with some context

     Args:
         phab: Phabricator instance
         tasks(iterable): tasks to be printed

     """
     for t in tasks:
         print('- T{id} | {status} | {name}'.format(
             id=t['id'],
             status=t['fields']['status']['value'],
             name=t['fields']['name']))


 def print_commits(phab, commits):
     """print a list of Phabricator commits, with some context

     Args:
         phab: Phabricator instance
         commits(iterable): commits to be printed

     """
     for c in commits:
         repo = lookup_repo(phab, c['fields']['repositoryPHID'])
         print('- {id} | {repo} | {msg}'.format(
             id=c['fields']['identifier'][:12],
             repo=pp_repo(repo),
             msg=c['fields']['message'].split('\n')[0]))


-def print_reviews(phab, reviews):
+def print_diffs(phab, reviews):
     """print a list of Phabricator diffs, with some context

     Args:
         phab: Phabricator instance
         reviews(iterable): diffs to be printed

     """
     for r in reviews:
         repo = lookup_repo(phab, r['fields']['repositoryPHID'])
-        print('- D{id} | {repo} | {title}'.format(
+        print('- https://forge.softwareheritage.org/D{id} | {repo} | {title}'.format(
             id=r['id'],
             repo=pp_repo(repo),
             title=r['fields']['title']))
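
The helpers above can be composed with the paginate() generator. A minimal usage sketch, assuming the phabricator Python package's Phabricator() client (which exposes Conduit endpoints such as maniphest.search through attribute access, like the phab.phid.query and phab.user.whoami calls in the file); the constraints and the stop condition are illustrative only, not part of the patch:

    from phabricator import Phabricator

    phab = Phabricator()  # credentials are read from ~/.arcrc

    def stop_on_resolved(task):
        # Stop paginating at the first resolved task; status values depend on
        # the Phabricator install, so "resolved" is only an assumption here.
        return task['fields']['status']['value'] == 'resolved'

    my_tasks = paginate(phab.maniphest.search,
                        {'constraints': {'authorPHIDs': [whoami(phab)]}},
                        stop_on_resolved)
    print_tasks(phab, my_tasks)
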
diff --git a/vlorentz/migrate_extid.py b/vlorentz/migrate_extid.py
new file mode 100644
index 0000000..07da80d
--- /dev/null
+++ b/vlorentz/migrate_extid.py
@@ -0,0 +1,241 @@
+#!/usr/bin/env python3
+
+# Copyright (C) 2020-2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""This is an executable script to migrate external identifiers from metadata in
+the revision table to the new ExtID storage, with a linear scan of the whole
+table
+"""
+
+import sys
+import time
+from typing import Any, Dict, Optional
+
+import psycopg2
+
+from swh.core.db import BaseDb
+from swh.loader.package.debian.loader import DebianLoader
+from swh.loader.package.archive.loader import ArchiveLoader
+from swh.loader.package.cran.loader import CRANLoader
+from swh.loader.package.npm.loader import NpmLoader, EXTID_TYPE as NPM_EXTID_TYPE
+from swh.loader.package.nixguix.loader import NixGuixLoader
+from swh.loader.package.pypi.loader import PyPILoader
+from swh.model.hashutil import hash_to_bytes, hash_to_hex
+from swh.model.identifiers import CoreSWHID, ObjectType
+from swh.model.model import ExtID, Sha1Git
+from swh.storage import get_storage
+from swh.storage.migrate_extrinsic_metadata import REVISION_COLS
+
+HG_EXTID_TYPE = "hg-nodeid"
+
+DEBIAN_LOADER = DebianLoader(None, "http://does-not-matter.example/", None, None, None)
+"""Dummy debian loader, we only need it to compute the extid"""
+
+ARCHIVE_LOADER = ArchiveLoader(None, "http://does-not-matter.example/", [])
+"""Dummy archive loader, we only need it to compute the extid"""
+
+
+def handle_hg_row(revision_swhid: CoreSWHID, metadata: Dict) -> ExtID:
+    nodeid = hash_to_bytes(metadata["node"])
+    return ExtID(extid_type=HG_EXTID_TYPE, extid=nodeid, target=revision_swhid)
+
+
+def handle_dsc_row(revision_swhid: CoreSWHID, metadata: Dict) -> Optional[ExtID]:
+    try:
+        res = DEBIAN_LOADER.known_artifact_to_extid(metadata)
+    except TypeError as e:
+        if e.args == ("__init__() missing 1 required positional argument: 'md5sum'",):
+            # ???
+            return None
+        else:
+            raise
+    if res:
+        (extid_type, extid) = res
+        return ExtID(extid_type=extid_type, extid=extid, target=revision_swhid)
+    return None
+
+
+def handle_tar_row(revision_swhid: CoreSWHID, metadata: Dict) -> Optional[ExtID]:
+    import pprint
+
+    # pprint.pprint(metadata)
+
+    provider = metadata.get("extrinsic", {}).get("provider", "")
+    package_source = metadata.get("package_source", {})
+    package_source_url = package_source.get("url", "")
+
+    original_artifacts = metadata.get("original_artifact")
+
+    # Old loaders wrote a dict instead of a list
+    if isinstance(original_artifacts, dict):
+        original_artifacts = [original_artifacts]
+
+    if original_artifacts:
+        original_artifact = original_artifacts[0]
+    else:
+        original_artifact = {}
+
+    url = original_artifact.get("url", "")
+
+    if url.startswith("https://files.pythonhosted.org/") or provider.startswith(
+        "https://pypi.org/pypi/"
+    ):
+        res = PyPILoader.known_artifact_to_extid(metadata)
+        if res:
+            (extid_type, extid) = res
+            return ExtID(extid_type=extid_type, extid=extid, target=revision_swhid)
+
+    if provider.startswith("https://replicate.npmjs.com/"):
+        res = NpmLoader.known_artifact_to_extid(metadata)
+        if res:
+            (extid_type, extid) = res
+            return ExtID(extid_type=extid_type, extid=extid, target=revision_swhid)
+
+    if provider.startswith("https://cran.r-project.org/"):
+        res = CRANLoader.known_artifact_to_extid(metadata)
+        if res:
+            (extid_type, extid) = res
+            return ExtID(extid_type=extid_type, extid=extid, target=revision_swhid)
+
+    if provider in (
+        "https://nix-community.github.io/nixpkgs-swh/sources-unstable.json",
+        "https://guix.gnu.org/sources.json",
+    ):
+        res = NixGuixLoader.known_artifact_to_extid(metadata)
+        if res:
+            (extid_type, extid) = res
+            return ExtID(extid_type=extid_type, extid=extid, target=revision_swhid)
+
+    if provider.startswith(
+        ("https://ftp.gnu.org/gnu/", "https://ftp.gnu.org/old-gnu/")
+    ):
+        res = ARCHIVE_LOADER.known_artifact_to_extid(metadata)
+        if res:
+            (extid_type, extid) = res
+            return ExtID(extid_type=extid_type, extid=extid, target=revision_swhid)
+
+    if package_source_url.startswith("https://registry.npmjs.org/"):
+        return ExtID(
+            extid_type=NPM_EXTID_TYPE,
+            extid=hash_to_bytes(package_source["sha1"]),
+            target=revision_swhid,
+        )
+
+    if (
+        url.startswith("https://deposit.softwareheritage.org/")
+        or provider.startswith("https://deposit.softwareheritage.org/")
+        or "{http://www.w3.org/2005/Atom}client" in metadata
+        or "@xmlns:codemeta" in metadata
+    ):
+        # Deposits don't have ExtIDs
+        return None
+
+    if (
+        "sha1_git" in original_artifact
+        and "url" not in original_artifact
+        and not provider
+        and not package_source_url
+    ):
+        # Very old loader, doesn't tell where the package is coming from
+        return None
+
+    assert False, "\n" + pprint.pformat(metadata)
+
+    return None
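
# Illustrative summary (not part of the patch above): handle_tar_row
# dispatches on three pieces of the revision metadata:
#   - metadata["extrinsic"]["provider"]: the metadata provider URL; e.g. a
#     provider starting with "https://pypi.org/pypi/" is handed to
#     PyPILoader.known_artifact_to_extid,
#   - metadata["original_artifact"][0]["url"]: where the original tarball was
#     fetched from,
#   - metadata["package_source"]["url"]: npm revisions matched here use the
#     artifact's sha1 as their extid.
# Deposited revisions and revisions from very old loaders yield no ExtID, and
# anything unrecognized trips the final assert so it can be inspected.
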
+
+
+def handle_row(row: Dict[str, Any], storage, dry_run: bool):
+    type_ = row["type"]
+
+    if type_ in ("git", "svn"):
+        return
+
+    revision_swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=row["id"])
+    metadata = row["metadata"]
+
+    if type_ == "hg":
+        extid = handle_hg_row(revision_swhid, metadata)
+    elif type_ == "tar":
+        extid = handle_tar_row(revision_swhid, metadata)
+    elif type_ == "dsc":
+        extid = handle_dsc_row(revision_swhid, metadata)
+    else:
+        assert False, f"revision {hash_to_hex(row['id'])} has unknown type: {type_}"
+
+    if extid is not None:
+        if dry_run:
+            print(f"storing: {extid.extid_type}:{extid.extid.hex()} -> {extid.target}")
+        else:
+            storage.extid_add([extid])
+
+
+def iter_revision_rows(storage_dbconn: str, first_id: Sha1Git):
+    after_id = first_id
+    failures = 0
+    while True:
+        try:
+            storage_db = BaseDb.connect(storage_dbconn)
+            with storage_db.cursor() as cur:
+                while True:
+                    cur.execute(
+                        f"SELECT {', '.join(REVISION_COLS)} FROM revision "
+                        f"WHERE id > %s AND metadata IS NOT NULL "
+                        f"AND type IN ('hg', 'tar', 'dsc') "
+                        f"ORDER BY id LIMIT 1000",
+                        (after_id,),
+                    )
+                    new_rows = 0
+                    for row in cur:
+                        new_rows += 1
+                        row_d = dict(zip(REVISION_COLS, row))
+                        yield row_d
+                        after_id = row_d["id"]
+                    if new_rows == 0:
+                        return
+        except psycopg2.OperationalError as e:
+            print(e)
+            # most likely a temporary error, try again
+            if failures >= 60:
+                raise
+            else:
+                time.sleep(60)
+                failures += 1
+
+
+def main(storage_dbconn, storage_url, first_id, dry_run):
+    storage = get_storage(
+        "pipeline",
+        steps=[
+            {"cls": "buffer"},
+            {"cls": "retry"},
+            {
+                "cls": "local",
+                "db": storage_dbconn,
+                "objstorage": {"cls": "memory", "args": {}},
+            },
+        ],
+    )
+
+    total_rows = 0
+    for row in iter_revision_rows(storage_dbconn, first_id):
+        handle_row(row, storage, dry_run)
+
+        total_rows += 1
+
+        if total_rows % 1000 == 0:
+            percents = int.from_bytes(row["id"][0:4], byteorder="big") * 100 / (1 << 32)
+            print(
+                f"Processed {total_rows/1000000.:.2f}M rows "
+                f"(~{percents:.1f}%, last revision: {row['id'].hex()})"
+            )
+
+    storage.flush()
+
+
+if __name__ == "__main__":
+    if len(sys.argv) == 3:
+        (_, storage_dbconn, storage_url) = sys.argv
+        first_id = "00" * 20
+    elif len(sys.argv) == 4:
+        (_, storage_dbconn, storage_url, first_id) = sys.argv
+    else:
+        print(f"Syntax: {sys.argv[0]} <storage_dbconn> <storage_url> [<first_id>]")
+        exit(1)
+
+    main(storage_dbconn, storage_url, bytes.fromhex(first_id), True)
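
The script is invoked as ./migrate_extid.py <storage_dbconn> <storage_url> [<first_id>], where the optional hex first_id resumes the scan just after that revision id; note that the __main__ block currently calls main() with dry_run hard-coded to True, so extids are printed rather than written. A minimal sketch of the per-row dispatch on that dry-run path (the revision id and Mercurial node id below are placeholder values; no storage connection is needed because dry_run short-circuits the write):

    from swh.model.hashutil import hash_to_bytes

    example_row = {
        "type": "hg",
        "id": hash_to_bytes("95a03e172a41ce178a98e75ee36065b62fa07f3b"),
        "metadata": {"node": "0123456789abcdef0123456789abcdef01234567"},
    }
    # Prints the "hg-nodeid" extid -> revision SWHID mapping instead of
    # storing it, because dry_run=True never touches the storage argument.
    handle_row(example_row, storage=None, dry_run=True)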