diff --git a/java/src/main/java/org/softwareheritage/graph/utils/ListOriginContributors.java b/java/src/main/java/org/softwareheritage/graph/utils/ListOriginContributors.java
index f7132af..16cb8eb 100644
--- a/java/src/main/java/org/softwareheritage/graph/utils/ListOriginContributors.java
+++ b/java/src/main/java/org/softwareheritage/graph/utils/ListOriginContributors.java
@@ -1,166 +1,166 @@
 /*
  * Copyright (c) 2022 The Software Heritage developers
  * See the AUTHORS file at the top-level directory of this distribution
  * License: GNU General Public License version 3, or any later version
  * See top-level LICENSE file for more information
  */

-/* For each origin and each person, outputs a line "origin_id,person_id",
- * if that person contributed to the origin.
+/* For each origin and each contributor, outputs a line "origin_id,contributor_id",
+ * if that contributor contributed to the origin.
  *
  * A .csv table containing "origin_id,origin_url_base64" is also written
  * to the given path.
  *
  * This takes the output of TopoSort on stdin.
  *
  */

 package org.softwareheritage.graph.utils;

 import it.unimi.dsi.big.webgraph.LazyLongIterator;
 import org.softwareheritage.graph.*;

 import java.io.PrintWriter;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Scanner;

 public class ListOriginContributors {
     /*
      * For nodes with a single ancestor, reuses the ancestor's set of contributors instead of copying,
      * when that ancestor has no more pending successors.
      */
     private static boolean optimizeReuse = true;

     public static void main(String[] args) throws IOException, ClassNotFoundException {
         if (args.length != 2) {
             System.err.println(
                     "Syntax: java org.softwareheritage.graph.utils.ListOriginContributors <path/to/graph> <path/to/origin_urls.csv>");
             System.exit(1);
         }
         String graphBasename = args[0];
         PrintWriter originUrlsFileWriter = new PrintWriter(args[1]);

         System.err.println("Loading graph " + graphBasename + " ...");
         SwhBidirectionalGraph underlyingGraph = SwhBidirectionalGraph.loadMapped(graphBasename);
         System.err.println("Loading person ids");
         underlyingGraph.loadPersonIds();
         System.err.println("Loading messages");
         underlyingGraph.loadMessages();
         System.err.println("Selecting subgraph.");
         Subgraph graph = new Subgraph(underlyingGraph, new AllowedNodes("rev,rel,snp,ori"));
         System.err.println("Graph loaded.");

         Scanner stdin = new Scanner(System.in);

         String firstLine = stdin.nextLine().strip();
         if (!firstLine.equals("SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2")) {
             System.err.format("Unexpected header: %s\n", firstLine);
             System.exit(2);
         }

         /* Map each node id to its set of contributor person ids */
         HashMap<Long, HashSet<Long>> contributors = new HashMap<>();

         /*
          * For each node id, counts its number of direct successors that still need to be handled
          */
         HashMap<Long, Long> pendingSuccessors = new HashMap<>();

-        System.out.println("origin_id,person_id");
+        System.out.println("origin_id,contributor_id");
         originUrlsFileWriter.println("origin_id,origin_url_base64");
         while (stdin.hasNextLine()) {
             String cells[] = stdin.nextLine().strip().split(",", -1);

             SWHID nodeSWHID = new SWHID(cells[0]);
             long nodeId = graph.getNodeId(nodeSWHID);
             long ancestorCount = Long.parseLong(cells[1]);
             long successorCount = Long.parseLong(cells[2]);
             String sampleAncestor1SWHID = cells[3];
             String sampleAncestor2SWHID = cells[4];

             HashSet<Long> nodeContributors;
             boolean reuseAncestorSet = optimizeReuse && (ancestorCount == 1);

             if (reuseAncestorSet) {
                 long ancestorNodeId = underlyingGraph.getNodeId(new SWHID(sampleAncestor1SWHID));
                 if (pendingSuccessors.get(ancestorNodeId) == 1) {
                     nodeContributors = contributors.remove(ancestorNodeId);
                     pendingSuccessors.remove(ancestorNodeId);
                 } else {
                     /* Ancestor is not yet ready to be popped */
                     pendingSuccessors.put(ancestorNodeId, pendingSuccessors.get(ancestorNodeId) - 1);
                     nodeContributors = new HashSet<>();
                 }
             } else {
                 nodeContributors = new HashSet<>();
             }

             Long personId;
             if (nodeSWHID.getType() == SwhType.REV) {
                 personId = underlyingGraph.getAuthorId(nodeId);
                 if (personId != null) {
                     nodeContributors.add(personId);
                 }
                 personId = underlyingGraph.getCommitterId(nodeId);
                 if (personId != null) {
                     nodeContributors.add(personId);
                 }
             } else if (nodeSWHID.getType() == SwhType.REL) {
                 personId = underlyingGraph.getAuthorId(nodeId);
                 if (personId != null) {
                     nodeContributors.add(personId);
                 }
             }

             if (!reuseAncestorSet) {
                 long computedAncestorCount = 0;
                 LazyLongIterator it = graph.successors(nodeId);
                 for (long ancestorNodeId; (ancestorNodeId = it.nextLong()) != -1;) {
                     computedAncestorCount++;
                     if (pendingSuccessors.get(ancestorNodeId) == 1) {
                         /*
                          * If this node is the last unhandled successor of the ancestor, pop the ancestor information,
                          * as we won't need it anymore
                          */
                         pendingSuccessors.remove(ancestorNodeId);
                         nodeContributors.addAll(contributors.remove(ancestorNodeId));
                     } else {
                         /*
                          * The ancestor has remaining successors to handle; decrement the counter and copy its set of
                          * contributors to the current set
                          */
                         pendingSuccessors.put(ancestorNodeId, pendingSuccessors.get(ancestorNodeId) - 1);
                         nodeContributors.addAll(contributors.get(ancestorNodeId));
                     }
                 }
                 if (ancestorCount != computedAncestorCount) {
                     System.err.format("Mismatched ancestor count: expected %d, found %d\n", ancestorCount,
                             computedAncestorCount);
                     System.exit(2);
                 }
             }

             if (nodeSWHID.getType() == SwhType.ORI) {
                 nodeContributors.forEach((contributorId) -> {
                     System.out.format("%d,%d\n", nodeId, contributorId);
                 });
                 byte[] url = underlyingGraph.getMessageBase64(nodeId);
                 if (url != null) {
                     originUrlsFileWriter.format("%d,%s\n", nodeId, new String(url));
                 }
             }

             if (successorCount > 0) {
                 /*
                  * If the node has any successor, store its set of contributors for later
                  */
                 contributors.put(nodeId, nodeContributors);
                 pendingSuccessors.put(nodeId, successorCount);
             }
         }
         originUrlsFileWriter.flush();
     }
 }
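For intuition, the contributor-set propagation implemented above can be modelled outside the graph library. The following Python sketch (toy node ids, edges, and contributor ids; none of it is part of this patch) shows the same bookkeeping: each node merges the sets of its already-processed neighbours ("ancestors"), and an ancestor's set is popped as soon as its last pending successor has consumed it, which is what keeps memory bounded:

    # Toy model of the propagation above; node names and edges are made up.
    from typing import Dict, List, Set

    # contributors directly attached to each node (authors/committers)
    own_contributors: Dict[str, Set[int]] = {
        "rev1": {0}, "rev2": {1}, "rel1": {2}, "ori1": set(),
    }
    # already-processed neighbours of each node (the Java code's "ancestors")
    ancestors: Dict[str, List[str]] = {
        "rev1": [], "rev2": ["rev1"], "rel1": ["rev2"], "ori1": ["rel1"],
    }
    # number of successors that will still read each node's set
    pending_successors: Dict[str, int] = {"rev1": 1, "rev2": 1, "rel1": 1, "ori1": 0}

    contributors: Dict[str, Set[int]] = {}
    for node in ["rev1", "rev2", "rel1", "ori1"]:  # topological order
        node_set = set(own_contributors[node])
        for anc in ancestors[node]:
            if pending_successors[anc] == 1:
                # last reader: pop the ancestor's set, it is not needed anymore
                node_set |= contributors.pop(anc)
                del pending_successors[anc]
            else:
                pending_successors[anc] -= 1
                node_set |= contributors[anc]
        if node.startswith("ori"):
            print(node, sorted(node_set))  # prints: ori1 [0, 1, 2]
        if pending_successors.get(node, 0) > 0:
            contributors[node] = node_set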
""" # WARNING: do not import unnecessary things here to keep cli startup time under # control from pathlib import Path from typing import Dict, Iterable, List, Tuple, cast import luigi from .compressed_graph import LocalGraph from .misc_datasets import TopoSort from .utils import run_script class ListOriginContributors(luigi.Task): """Creates a file that contains all SWHIDs in topological order from a compressed graph.""" local_graph_path = luigi.PathParameter() topological_order_path = luigi.PathParameter() origin_contributors_path = luigi.PathParameter() origin_urls_path = luigi.PathParameter() graph_name = luigi.Parameter(default="graph") def requires(self) -> List[luigi.Task]: """Returns an instance of :class:`swh.graph.luigi.compressed_graph.LocalGraph` and :class:`swh.graph.luigi.misc_datasets.TopoSort`.""" return [ LocalGraph(local_graph_path=self.local_graph_path), TopoSort( local_graph_path=self.local_graph_path, topological_order_path=self.topological_order_path, graph_name=self.graph_name, ), ] def output(self) -> luigi.Target: """.csv.zst file that contains the topological order.""" return luigi.LocalTarget(self.origin_contributors_path) def run(self) -> None: """Runs org.softwareheritage.graph.utils.TopoSort and compresses""" import tempfile class_name = "org.softwareheritage.graph.utils.ListOriginContributors" with tempfile.NamedTemporaryFile( prefix="origin_urls_", suffix=".csv" ) as origin_urls_fd: script = f""" zstdcat {self.topological_order_path} \ | java {class_name} '{self.local_graph_path}/{self.graph_name}' '{origin_urls_fd.name}' \ | pv --line-mode --wait \ | zstdmt -19 """ # noqa run_script(script, self.origin_contributors_path) run_script( f"pv '{origin_urls_fd.name}' | zstdmt -19", self.origin_urls_path, ) class ExportDeanonymizationTable(luigi.Task): """Exports (from swh-storage) a .csv.zst file that contains the columns: ``base64(sha256(full_name))`, ``base64(full_name)``, and ``escape(full_name)``. The first column is the anonymized full name found in :file:`graph.persons.csv.zst` in the compressed graph, and the latter two are the original name.""" storage_dsn = luigi.Parameter( default="service=swh", description="postgresql DSN of the swh-storage database to read from.", ) deanonymization_table_path = luigi.PathParameter() def output(self) -> luigi.Target: """.csv.zst file that contains the table.""" return luigi.LocalTarget(self.deanonymization_table_path) def run(self) -> None: """Runs a postgresql query to compute the table.""" run_script( f""" psql '{self.storage_dsn}' -c "\ COPY ( SELECT encode(digest(fullname, 'sha256'), 'base64') as sha256_base64, \ encode(fullname, 'base64') as base64, \ encode(fullname, 'escape') as escaped \ FROM person \ ) TO STDOUT CSV HEADER \ " | zstdmt -19 """, # noqa self.deanonymization_table_path, ) class DeanonymizeOriginContributors(luigi.Task): """Generates a .csv.zst file similar to :class:`ListOriginContributors`'s, - but with ``person_base64`` and ``person_escaped`` columns in addition to - ``person_id``. + but with ``contributor_base64`` and ``contributor_escaped`` columns in addition to + ``contributor_id``. This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of names instead of names); which may not be true depending on how the swh-dataset export was configured. 
""" local_graph_path = luigi.PathParameter() graph_name = luigi.Parameter(default="graph") origin_contributors_path = luigi.PathParameter() deanonymization_table_path = luigi.PathParameter() deanonymized_origin_contributors_path = luigi.PathParameter() def requires(self) -> List[luigi.Task]: """Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`, and :class:`ExportDeanonymizationTable`.""" return [ LocalGraph(local_graph_path=self.local_graph_path), ListOriginContributors( local_graph_path=self.local_graph_path, origin_contributors_path=self.origin_contributors_path, ), ExportDeanonymizationTable( deanonymization_table_path=self.deanonymization_table_path, ), ] def output(self) -> luigi.Target: """.csv.zst file similar to :meth:`ListOriginContributors.output`'s, - but with ``person_base64`` and ``person_escaped`` columns in addition to - ``person_id``""" + but with ``contributor_base64`` and ``contributor_escaped`` columns in addition + to ``contributor_id``""" return luigi.LocalTarget(self.deanonymized_origin_contributors_path) def run(self) -> None: """Loads the list of persons (``graph.persons.csv.zst`` in the graph dataset and the deanonymization table in memory, then uses them to map each row in the original (anonymized) contributors list to the deanonymized one.""" # TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset export # was configured to do so); this should add support for it. import base64 import csv import pyzstd # Load the deanonymization table, to map sha256(name) to base64(name) # and escape(name) sha256_to_names: Dict[bytes, Tuple[bytes, str]] = {} with pyzstd.open(self.deanonymization_table_path, "rt") as fd: # TODO: remove that cast once we dropped Python 3.7 support csv_reader = csv.reader(cast(Iterable[str], fd)) header = next(csv_reader) assert header == ["sha256_base64", "base64", "escaped"], header for line in csv_reader: (base64_sha256_name, base64_name, escaped_name) = line sha256_name = base64.b64decode(base64_sha256_name) name = base64.b64decode(base64_name) sha256_to_names[sha256_name] = (name, escaped_name) # Combine with the list of sha256(name), to get the list of base64(name) # and escape(name) persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst" with pyzstd.open(persons_path, "rb") as fd: person_id_to_names: List[Tuple[bytes, str]] = [ sha256_to_names.pop(base64.b64decode(line.strip()), (b"", "")) for line in fd ] tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp") tmp_output_path.parent.mkdir(parents=True, exist_ok=True) # Finally, write a new table of origin_contributors, by reading the anonymized # table line-by-line and deanonymizing each id # Open temporary output for writes as CSV with pyzstd.open(tmp_output_path, "wt") as output_fd: csv_writer = csv.writer(output_fd, lineterminator="\n") # write header - csv_writer.writerow(("origin_id", "person_base64", "person_escaped")) + csv_writer.writerow( + ("origin_id", "contributor_base64", "contributor_escaped") + ) # Open input for reads as CSV with pyzstd.open(self.origin_contributors_path, "rt") as input_fd: # TODO: remove that cast once we dropped Python 3.7 support csv_reader = csv.reader(cast(Iterable[str], input_fd)) header = next(csv_reader) - assert header == ["origin_id", "person_id"], header + assert header == ["origin_id", "contributor_id"], header for (origin_id, person_id) in csv_reader: if person_id == "null": # FIXME: workaround for a bug in contribution graphs generated # before 2022-12-01. 
diff --git a/swh/graph/tests/test_origin_contributors.py b/swh/graph/tests/test_origin_contributors.py
index 4cbe74e..46189dc 100644
--- a/swh/graph/tests/test_origin_contributors.py
+++ b/swh/graph/tests/test_origin_contributors.py
@@ -1,206 +1,206 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import base64
 import datetime
 from pathlib import Path
 import subprocess

 from swh.graph.luigi.origin_contributors import (
     DeanonymizeOriginContributors,
     ExportDeanonymizationTable,
     ListOriginContributors,
 )
 from swh.model.model import (
     ObjectType,
     Person,
     Release,
     Revision,
     RevisionType,
     TimestampWithTimezone,
 )

 from .test_toposort import EXPECTED as TOPOLOGICAL_ORDER

 DATA_DIR = Path(__file__).parents[0] / "dataset"


 # FIXME: do not hardcode ids here; they should be dynamically loaded
 # from the test graph
 ORIGIN_CONTRIBUTORS = """\
-origin_id,person_id
+origin_id,contributor_id
 2,0
 2,2
 0,0
 0,1
 0,2
 """

 assert (
     base64.b64decode("aHR0cHM6Ly9leGFtcGxlLmNvbS9zd2gvZ3JhcGg=")
     == b"https://example.com/swh/graph"
 )
 assert (
     base64.b64decode("aHR0cHM6Ly9leGFtcGxlLmNvbS9zd2gvZ3JhcGgy")
     == b"https://example.com/swh/graph2"
 )

 ORIGIN_URLS = """\
 origin_id,origin_url_base64
 2,aHR0cHM6Ly9leGFtcGxlLmNvbS9zd2gvZ3JhcGg=
 0,aHR0cHM6Ly9leGFtcGxlLmNvbS9zd2gvZ3JhcGgy
 """

 DEANONYMIZATION_TABLE = """\
 sha256_base64,base64,escaped
 8qhF7WQ2bmeoRbZipAaqtNw6QdOCDcpggLWCQLzITsI=,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe <jdoe@example.org>
 aZA9TeLhVzqVDQHQOd53UABAZYyek0tY3vTo6VSlA4U=,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe <jdoe@example.com>
 UaCrgAZBvn1LBd2sAinmdNvAX/G4sjo1aJA9GDd9UUs=,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5uZXQ+,Jane Doe <jdoe@example.net>
 """  # noqa

 PERSONS = """\
 aZA9TeLhVzqVDQHQOd53UABAZYyek0tY3vTo6VSlA4U=
 UaCrgAZBvn1LBd2sAinmdNvAX/G4sjo1aJA9GDd9UUs=
 8qhF7WQ2bmeoRbZipAaqtNw6QdOCDcpggLWCQLzITsI=
 """

 DEANONYMIZED_ORIGIN_CONTRIBUTORS = """\
-origin_id,person_base64,person_escaped
+origin_id,contributor_base64,contributor_escaped
 2,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe <jdoe@example.com>
 2,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe <jdoe@example.org>
 0,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe <jdoe@example.com>
 0,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5uZXQ+,Jane Doe <jdoe@example.net>
 0,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe <jdoe@example.org>
 """  # noqa


 def test_list_origin_contributors(tmpdir):
     tmpdir = Path(tmpdir)

     topological_order_path = tmpdir / "topo_order.csv.zst"
     origin_contributors_path = tmpdir / "origin_contributors.csv.zst"
     origin_urls_path = tmpdir / "origin_urls.csv.zst"

     subprocess.run(
         ["zstdmt", "-o", topological_order_path],
         input=TOPOLOGICAL_ORDER.encode(),
         check=True,
     )

     task = ListOriginContributors(
         local_graph_path=DATA_DIR / "compressed",
         topological_order_path=topological_order_path,
         origin_contributors_path=origin_contributors_path,
         origin_urls_path=origin_urls_path,
         graph_name="example",
     )

     task.run()

     csv_text = subprocess.check_output(["zstdcat", origin_contributors_path]).decode()
     assert csv_text == ORIGIN_CONTRIBUTORS

     urls_text = subprocess.check_output(["zstdcat", origin_urls_path]).decode()
     assert urls_text == ORIGIN_URLS

 def test_export_deanonymization_table(tmpdir, swh_storage_postgresql, swh_storage):
     tmpdir = Path(tmpdir)

     tstz = TimestampWithTimezone.from_datetime(
         datetime.datetime.now(tz=datetime.timezone.utc)
     )
     swh_storage.release_add(
         [
             Release(
                 name=b"v1.0",
                 message=b"first release",
                 author=Person.from_fullname(b"John Doe <jdoe@example.org>"),
                 target=b"\x00" * 20,
                 target_type=ObjectType.REVISION,
                 synthetic=True,
             )
         ]
     )
     swh_storage.revision_add(
         [
             Revision(
                 message=b"first commit",
                 author=Person.from_fullname(b"Jane Doe <jdoe@example.com>"),
                 committer=Person.from_fullname(b"Jane Doe <jdoe@example.net>"),
                 date=tstz,
                 committer_date=tstz,
                 directory=b"\x00" * 20,
                 type=RevisionType.GIT,
                 synthetic=True,
             )
         ]
     )

-    deanonymization_table_path = tmpdir / "person_sha256_to_names.csv.zst"
+    deanonymization_table_path = tmpdir / "contributor_sha256_to_names.csv.zst"

     task = ExportDeanonymizationTable(
         storage_dsn=swh_storage_postgresql.dsn,
         deanonymization_table_path=deanonymization_table_path,
     )

     task.run()

     csv_text = subprocess.check_output(
         ["zstdcat", deanonymization_table_path]
     ).decode()

     (header, *rows) = csv_text.split("\n")
     (expected_header, *expected_rows) = DEANONYMIZATION_TABLE.split("\n")

     assert header == expected_header

     assert rows.pop() == "", "Missing trailing newline"
     expected_rows.pop()

     assert set(rows) == set(expected_rows)


 def test_deanonymize_origin_contributors(tmpdir):
     tmpdir = Path(tmpdir)

     persons_path = tmpdir / "example.persons.csv.zst"
     origin_contributors_path = tmpdir / "origin_contributors.csv.zst"
-    deanonymization_table_path = tmpdir / "person_sha256_to_names.csv.zst"
+    deanonymization_table_path = tmpdir / "contributor_sha256_to_names.csv.zst"
     deanonymized_origin_contributors_path = (
         tmpdir / "sensitive" / "origin_contributors.deanonymized.csv.zst"
     )

     subprocess.run(
         ["zstdmt", "-o", origin_contributors_path],
         input=ORIGIN_CONTRIBUTORS.encode(),
         check=True,
     )

     subprocess.run(
         ["zstdmt", "-o", persons_path],
         input=PERSONS.encode(),
         check=True,
     )

     subprocess.run(
         ["zstdmt", "-o", deanonymization_table_path],
         input=DEANONYMIZATION_TABLE.encode(),
         check=True,
     )

     task = DeanonymizeOriginContributors(
         local_graph_path=tmpdir,
         origin_contributors_path=origin_contributors_path,
         deanonymization_table_path=deanonymization_table_path,
         deanonymized_origin_contributors_path=deanonymized_origin_contributors_path,
         graph_name="example",
     )

     task.run()

     csv_text = subprocess.check_output(
         ["zstdcat", deanonymized_origin_contributors_path]
     ).decode()

     assert csv_text == DEANONYMIZED_ORIGIN_CONTRIBUTORS
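For completeness, a usage sketch (all paths and the graph name are hypothetical) of how the listing task added by this patch could be scheduled through Luigi's Python API instead of the ``luigi`` command line; this assumes a local compressed graph and its outputs directory already exist:

    from pathlib import Path

    import luigi

    from swh.graph.luigi.origin_contributors import ListOriginContributors

    luigi.build(
        [
            ListOriginContributors(
                local_graph_path=Path("/srv/graph/compressed"),  # hypothetical
                topological_order_path=Path("/srv/out/topo_order.csv.zst"),
                origin_contributors_path=Path("/srv/out/origin_contributors.csv.zst"),
                origin_urls_path=Path("/srv/out/origin_urls.csv.zst"),
                graph_name="graph",
            )
        ],
        local_scheduler=True,
    )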