diff --git a/java/src/main/java/org/softwareheritage/graph/utils/ListOriginContributors.java b/java/src/main/java/org/softwareheritage/graph/utils/ListOriginContributors.java index aafafa7..8c47b67 100644 --- a/java/src/main/java/org/softwareheritage/graph/utils/ListOriginContributors.java +++ b/java/src/main/java/org/softwareheritage/graph/utils/ListOriginContributors.java @@ -1,141 +1,151 @@ /* * Copyright (c) 2022 The Software Heritage developers * See the AUTHORS file at the top-level directory of this distribution * License: GNU General Public License version 3, or any later version * See top-level LICENSE file for more information */ /* For each origin and each person, outputs a line "origin_swhid,person_id", * if that person contributed to the origin. * * This takes the output of TopoSort on stdin. * */ package org.softwareheritage.graph.utils; import it.unimi.dsi.big.webgraph.LazyLongIterator; import org.softwareheritage.graph.*; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Scanner; public class ListOriginContributors { /* * For nodes with a single ancestor, reuses the ancestor's set of contributors instead of copying, * when that ancestor has no more pending successors. */ private static boolean optimizeReuse = true; public static void main(String[] args) throws IOException, ClassNotFoundException { if (args.length != 1) { System.err.println("Syntax: java org.softwareheritage.graph.utils.FindEarliestRevision "); System.exit(1); } String graphBasename = args[0]; System.err.println("Loading graph " + graphBasename + " ..."); SwhBidirectionalGraph underlyingGraph = SwhBidirectionalGraph.loadMapped(graphBasename); System.err.println("Loading person ids"); underlyingGraph.loadPersonIds(); System.err.println("Selecting subgraph."); Subgraph graph = new Subgraph(underlyingGraph, new AllowedNodes("rev,rel,snp,ori")); System.err.println("Graph loaded."); Scanner stdin = new Scanner(System.in); String firstLine = stdin.nextLine().strip(); if (!firstLine.equals("SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2")) { System.err.format("Unexpected header: %s\n", firstLine); System.exit(2); } /* Map each node id to its set of contributor person ids */ HashMap> contributors = new HashMap<>(); /* * For each node it, counts its number of direct successors that still need to be handled */ HashMap pendingSuccessors = new HashMap<>(); System.out.println("origin_SWHID,person_id"); while (stdin.hasNextLine()) { String cells[] = stdin.nextLine().strip().split(",", -1); SWHID nodeSWHID = new SWHID(cells[0]); long nodeId = graph.getNodeId(nodeSWHID); long ancestorCount = Long.parseLong(cells[1]); long successorCount = Long.parseLong(cells[2]); String sampleAncestor1SWHID = cells[3]; String sampleAncestor2SWHID = cells[4]; HashSet nodeContributors; boolean reuseAncestorSet = optimizeReuse && (ancestorCount == 1); if (reuseAncestorSet) { long ancestorNodeId = underlyingGraph.getNodeId(new SWHID(sampleAncestor1SWHID)); if (pendingSuccessors.get(ancestorNodeId) == 1) { nodeContributors = contributors.remove(ancestorNodeId); pendingSuccessors.remove(ancestorNodeId); } else { /* Ancestor is not yet ready to be popped */ pendingSuccessors.put(ancestorNodeId, pendingSuccessors.get(ancestorNodeId) - 1); nodeContributors = new HashSet<>(); } } else { nodeContributors = new HashSet<>(); } + Long personId; if (nodeSWHID.getType() == SwhType.REV) { - nodeContributors.add(underlyingGraph.getAuthorId(nodeId)); - nodeContributors.add(underlyingGraph.getCommitterId(nodeId)); + personId = underlyingGraph.getAuthorId(nodeId); + if (personId != null) { + nodeContributors.add(personId); + } + personId = underlyingGraph.getCommitterId(nodeId); + if (personId != null) { + nodeContributors.add(personId); + } } else if (nodeSWHID.getType() == SwhType.REL) { - nodeContributors.add(underlyingGraph.getAuthorId(nodeId)); + personId = underlyingGraph.getAuthorId(nodeId); + if (personId != null) { + nodeContributors.add(personId); + } } if (!reuseAncestorSet) { long computedAncestorCount = 0; LazyLongIterator it = graph.successors(nodeId); for (long ancestorNodeId; (ancestorNodeId = it.nextLong()) != -1;) { computedAncestorCount++; if (pendingSuccessors.get(ancestorNodeId) == 1) { /* * If this node is the last unhandled successor of the ancestor; pop the ancestor information, * as we won't need it anymore */ pendingSuccessors.remove(ancestorNodeId); nodeContributors.addAll(contributors.remove(ancestorNodeId)); } else { /* * The ancestor has remaining successors to handle; decrement the counter and copy its set of * contributors to the current set */ pendingSuccessors.put(ancestorNodeId, pendingSuccessors.get(ancestorNodeId) - 1); nodeContributors.addAll(contributors.get(ancestorNodeId)); } } if (ancestorCount != computedAncestorCount) { System.err.format("Mismatched ancestor count: expected %d, found %d", ancestorCount, computedAncestorCount); System.exit(2); } } if (nodeSWHID.getType() == SwhType.ORI) { nodeContributors.forEach((contributorId) -> { System.out.format("%s,%d\n", nodeSWHID.toString(), contributorId); }); } if (successorCount > 0) { /* * If the node has any successor, store its set of contributors for later */ contributors.put(nodeId, nodeContributors); pendingSuccessors.put(nodeId, successorCount); } } } } diff --git a/swh/graph/tests/test_origin_contributors.py b/swh/graph/tests/test_origin_contributors.py index dccb45c..883872c 100644 --- a/swh/graph/tests/test_origin_contributors.py +++ b/swh/graph/tests/test_origin_contributors.py @@ -1,187 +1,186 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from pathlib import Path import subprocess from swh.graph.luigi import ( DeanonymizeOriginContributors, ExportDeanonymizationTable, ListOriginContributors, ) from swh.model.model import ( ObjectType, Person, Release, Revision, RevisionType, TimestampWithTimezone, ) from .test_toposort import EXPECTED as TOPOLOGICAL_ORDER DATA_DIR = Path(__file__).parents[0] / "dataset" # FIXME: do not hardcode ids here; they should be dynamically loaded # from the test graph ORIGIN_CONTRIBUTORS = """\ origin_SWHID,person_id swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,0 swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,2 swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,0 -swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,null swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,1 swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,2 """ DEANONYMIZATION_TABLE = """\ sha256_base64,base64,escaped 8qhF7WQ2bmeoRbZipAaqtNw6QdOCDcpggLWCQLzITsI=,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe aZA9TeLhVzqVDQHQOd53UABAZYyek0tY3vTo6VSlA4U=,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe UaCrgAZBvn1LBd2sAinmdNvAX/G4sjo1aJA9GDd9UUs=,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5uZXQ+,Jane Doe """ # noqa PERSONS = """\ aZA9TeLhVzqVDQHQOd53UABAZYyek0tY3vTo6VSlA4U= UaCrgAZBvn1LBd2sAinmdNvAX/G4sjo1aJA9GDd9UUs= 8qhF7WQ2bmeoRbZipAaqtNw6QdOCDcpggLWCQLzITsI= """ DEANONYMIZED_ORIGIN_CONTRIBUTORS = """\ origin_SWHID,person_base64,person_escaped swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5uZXQ+,Jane Doe swh:1:ori:8f50d3f60eae370ddbf85c86219c55108a350165,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe """ # noqa def test_list_origin_contributors(tmpdir): tmpdir = Path(tmpdir) topological_order_path = tmpdir / "topo_order.csv.zst" origin_contributors_path = tmpdir / "origin_contributors.csv.zst" subprocess.run( ["zstdmt", "-o", topological_order_path], input=TOPOLOGICAL_ORDER.encode(), check=True, ) task = ListOriginContributors( local_graph_path=DATA_DIR / "compressed", topological_order_path=topological_order_path, origin_contributors_path=origin_contributors_path, graph_name="example", ) task.run() csv_text = subprocess.check_output(["zstdcat", origin_contributors_path]).decode() assert csv_text == ORIGIN_CONTRIBUTORS def test_export_deanonymization_table(tmpdir, swh_storage_postgresql, swh_storage): tmpdir = Path(tmpdir) tstz = TimestampWithTimezone.from_datetime( datetime.datetime.now(tz=datetime.timezone.utc) ) swh_storage.release_add( [ Release( name=b"v1.0", message=b"first release", author=Person.from_fullname(b"John Doe "), target=b"\x00" * 20, target_type=ObjectType.REVISION, synthetic=True, ) ] ) swh_storage.revision_add( [ Revision( message=b"first commit", author=Person.from_fullname(b"Jane Doe "), committer=Person.from_fullname(b"Jane Doe "), date=tstz, committer_date=tstz, directory=b"\x00" * 20, type=RevisionType.GIT, synthetic=True, ) ] ) deanonymization_table_path = tmpdir / "person_sha256_to_names.csv.zst" task = ExportDeanonymizationTable( storage_dsn=swh_storage_postgresql.dsn, deanonymization_table_path=deanonymization_table_path, ) task.run() csv_text = subprocess.check_output(["zstdcat", deanonymization_table_path]).decode() (header, *rows) = csv_text.split("\n") (expected_header, *expected_rows) = DEANONYMIZATION_TABLE.split("\n") assert header == expected_header assert rows.pop() == "", "Missing trailing newline" expected_rows.pop() assert set(rows) == set(expected_rows) def test_deanonymize_origin_contributors(tmpdir): tmpdir = Path(tmpdir) persons_path = tmpdir / "example.persons.csv.zst" origin_contributors_path = tmpdir / "origin_contributors.csv.zst" deanonymization_table_path = tmpdir / "person_sha256_to_names.csv.zst" deanonymized_origin_contributors_path = ( tmpdir / "origin_contributors.deanonymized.csv.zst" ) subprocess.run( ["zstdmt", "-o", origin_contributors_path], input=ORIGIN_CONTRIBUTORS.encode(), check=True, ) subprocess.run( ["zstdmt", "-o", persons_path], input=PERSONS.encode(), check=True, ) subprocess.run( ["zstdmt", "-o", deanonymization_table_path], input=DEANONYMIZATION_TABLE.encode(), check=True, ) task = DeanonymizeOriginContributors( local_graph_path=tmpdir, origin_contributors_path=origin_contributors_path, deanonymization_table_path=deanonymization_table_path, deanonymized_origin_contributors_path=deanonymized_origin_contributors_path, graph_name="example", ) task.run() csv_text = subprocess.check_output( ["zstdcat", deanonymized_origin_contributors_path] ).decode() assert csv_text == DEANONYMIZED_ORIGIN_CONTRIBUTORS