diff --git a/conftest.py b/conftest.py
--- a/conftest.py
+++ b/conftest.py
@@ -5,4 +5,5 @@
 pytest_plugins = [
     "swh.graph.pytest_plugin",
+    "swh.storage.pytest_plugin",
 ]
diff --git a/java/src/main/java/org/softwareheritage/graph/utils/ListOriginContributors.java b/java/src/main/java/org/softwareheritage/graph/utils/ListOriginContributors.java
new file mode 100644
--- /dev/null
+++ b/java/src/main/java/org/softwareheritage/graph/utils/ListOriginContributors.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright (c) 2021-2022 The Software Heritage developers
+ * Copyright (c) 2021 Antoine Pietri
+ * Copyright (c) 2021 Stefano Zacchiroli
+ * See the AUTHORS file at the top-level directory of this distribution
+ * License: GNU General Public License version 3, or any later version
+ * See top-level LICENSE file for more information
+ */
+
+/* For each origin and each person, outputs a line "origin_swhid,person_id",
+ * if that person contributed to the origin.
+ *
+ * This takes the output of TopoSort on stdin.
+ */
+
+package org.softwareheritage.graph.utils;
+
+import it.unimi.dsi.big.webgraph.LazyLongIterator;
+import org.softwareheritage.graph.*;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Scanner;
+
+public class ListOriginContributors {
+    /*
+     * For nodes with a single ancestor, reuses the ancestor's set of contributors instead of
+     * copying it, when that ancestor has no more pending successors.
+     */
+    private static boolean optimizeReuse = true;
+
+    public static void main(String[] args) throws IOException, ClassNotFoundException {
+        if (args.length != 1) {
+            System.err.println("Syntax: java org.softwareheritage.graph.utils.ListOriginContributors <path/to/graph/graph>");
+            System.exit(1);
+        }
+        String graphBasename = args[0];
+
+        System.err.println("Loading graph " + graphBasename + " ...");
+        SwhBidirectionalGraph underlyingGraph = SwhBidirectionalGraph.loadMapped(graphBasename);
+        System.err.println("Loading person ids");
+        underlyingGraph.loadPersonIds();
+        System.err.println("Selecting subgraph.");
+        Subgraph graph = new Subgraph(underlyingGraph, new AllowedNodes("rev,rel,snp,ori"));
+        System.err.println("Graph loaded.");
+
+        Scanner stdin = new Scanner(System.in);
+
+        String firstLine = stdin.nextLine().strip();
+        if (!firstLine.equals("SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2")) {
+            System.err.format("Unexpected header: %s\n", firstLine);
+            System.exit(2);
+        }
+
+        /* Map each node id to its set of contributor person ids */
+        HashMap<Long, HashSet<Long>> contributors = new HashMap<>();
+
+        /*
+         * For each node id, counts how many of its direct successors still need to be handled
+         */
+        HashMap<Long, Long> pendingSuccessors = new HashMap<>();
+
+        System.out.println("origin_SWHID,person_id");
+        while (stdin.hasNextLine()) {
+            String[] cells = stdin.nextLine().strip().split(",", -1);
+            SWHID nodeSWHID = new SWHID(cells[0]);
+            long nodeId = graph.getNodeId(nodeSWHID);
+            long ancestorCount = Long.parseLong(cells[1]);
+            long successorCount = Long.parseLong(cells[2]);
+            String sampleAncestor1SWHID = cells[3];
+            String sampleAncestor2SWHID = cells[4];
+
+            HashSet<Long> nodeContributors;
+            boolean reuseAncestorSet = optimizeReuse && (ancestorCount == 1);
+
+            if (reuseAncestorSet) {
+                long ancestorNodeId = underlyingGraph.getNodeId(new SWHID(sampleAncestor1SWHID));
+                if (pendingSuccessors.get(ancestorNodeId) == 1) {
+                    nodeContributors = contributors.remove(ancestorNodeId);
+                    pendingSuccessors.remove(ancestorNodeId);
+                } else {
+                    /* Ancestor is not yet ready to be popped */
+                    pendingSuccessors.put(ancestorNodeId, pendingSuccessors.get(ancestorNodeId) - 1);
+                    nodeContributors = new HashSet<>();
+                }
+            } else {
+                nodeContributors = new HashSet<>();
+            }
+
+            if (nodeSWHID.getType() == SwhType.REV) {
+                nodeContributors.add(underlyingGraph.getAuthorId(nodeId));
+                nodeContributors.add(underlyingGraph.getCommitterId(nodeId));
+            } else if (nodeSWHID.getType() == SwhType.REL) {
+                nodeContributors.add(underlyingGraph.getAuthorId(nodeId));
+            }
+
+            if (!reuseAncestorSet) {
+                long computedAncestorCount = 0;
+                LazyLongIterator it = graph.successors(nodeId);
+                for (long ancestorNodeId; (ancestorNodeId = it.nextLong()) != -1;) {
+                    computedAncestorCount++;
+                    if (pendingSuccessors.get(ancestorNodeId) == 1) {
+                        /*
+                         * This node is the last unhandled successor of the ancestor; pop the ancestor
+                         * information, as we won't need it anymore
+                         */
+                        pendingSuccessors.remove(ancestorNodeId);
+                        nodeContributors.addAll(contributors.remove(ancestorNodeId));
+                    } else {
+                        /*
+                         * The ancestor has remaining successors to handle; decrement the counter and copy its set of
+                         * contributors to the current set
+                         */
+                        pendingSuccessors.put(ancestorNodeId, pendingSuccessors.get(ancestorNodeId) - 1);
+                        nodeContributors.addAll(contributors.get(ancestorNodeId));
+                    }
+                }
+
+                if (ancestorCount != computedAncestorCount) {
+                    System.err.format("Mismatched ancestor count: expected %d, found %d\n", ancestorCount,
+                            computedAncestorCount);
+                    System.exit(2);
+                }
+            }
+
+            if (nodeSWHID.getType() == SwhType.ORI) {
+                nodeContributors.forEach((contributorId) -> {
+                    System.out.format("%s,%d\n", nodeSWHID.toString(), contributorId);
+                });
+            }
+
+            if (successorCount > 0) {
+                /*
+                 * If the node has any successor, store its set of contributors for later
+                 */
+                contributors.put(nodeId, nodeContributors);
+                pendingSuccessors.put(nodeId, successorCount);
+            }
+        }
+    }
+}
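For orientation, here is a minimal Python sketch of the bookkeeping the Java tool above performs: nodes arrive in topological order (ancestors first), each node merges its ancestors' contributor sets (freeing a set once its last successor has consumed it), and origin nodes print the merged set. All node ids, edges, and author ids below are made up for illustration; this is not part of the patch.

```python
# Toy model of ListOriginContributors.java's propagation over a 3-node DAG.
toposorted = ["rev1", "rev2", "ori1"]  # ancestors first, origins last
successors = {"rev1": ["rev2"], "rev2": ["ori1"], "ori1": []}
ancestors = {"rev1": [], "rev2": ["rev1"], "ori1": ["rev2"]}
authors = {"rev1": 0, "rev2": 2}  # person id of each revision's author

contributors = {}  # node -> set of person ids, kept while successors are pending
pending = {node: len(succs) for node, succs in successors.items()}

for node in toposorted:
    contribs = {authors[node]} if node in authors else set()
    for anc in ancestors[node]:
        pending[anc] -= 1
        if pending[anc] == 0:
            contribs |= contributors.pop(anc)  # last successor: free the set
        else:
            contribs |= contributors[anc]
    if node.startswith("ori"):
        for person_id in sorted(contribs):
            print(f"{node},{person_id}")  # prints "ori1,0" then "ori1,2"
    if successors[node]:
        contributors[node] = contribs  # keep for the successors still to come
```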
diff --git a/requirements-luigi.txt b/requirements-luigi.txt
--- a/requirements-luigi.txt
+++ b/requirements-luigi.txt
@@ -1,2 +1,3 @@
 luigi
+pyzstd
 tqdm
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,4 @@
 swh.core[http] >= 0.3
 swh.model >= 0.13.0
 swh.dataset
+swh.storage  # only for tests
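`pyzstd` is pulled in for the new luigi task below, which reads and writes zstandard-compressed CSV files in pure Python. A minimal sketch of the two calls the patch relies on (the path is hypothetical):

```python
# pyzstd.open() wraps a file with transparent zstd (de)compression;
# "wt"/"rt" give text-mode handles suitable for the csv module.
import pyzstd

with pyzstd.open("/tmp/example.csv.zst", "wt") as f:
    f.write("col1,col2\na,b\n")

with pyzstd.open("/tmp/example.csv.zst", "rt") as f:
    assert f.read().startswith("col1,col2")
```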
"org.softwareheritage.graph.utils.TopoSort" + script = f""" + java {class_name} '{self.local_graph_path}/{self.graph_name}' '{object_types}' \ + | pv --line-mode --wait \ + | zstdmt -19 + """ + _run_script(script, self.topological_order_path) - from .config import check_config - conf: Dict = {} # TODO: configurable +class ListOriginContributors(luigi.Task): + """Creates a file that contains all SWHIDs in topological order from a compressed + graph.""" + + local_graph_path = luigi.PathParameter() + topological_order_path = luigi.PathParameter() + origin_contributors_path = luigi.PathParameter() + graph_name = luigi.Parameter(default="graph") - conf = check_config(conf) - env = { - **os.environ.copy(), - "JAVA_TOOL_OPTIONS": conf["java_tool_options"], - "CLASSPATH": conf["classpath"], - } + def requires(self) -> List[luigi.Task]: + """Returns an instance of :class:`LocalGraph` and :class:`TopoSort`.""" + return [ + LocalGraph(local_graph_path=self.local_graph_path), + TopoSort( + local_graph_path=self.local_graph_path, + topological_order_path=self.topological_order_path, + graph_name=self.graph_name, + ), + ] - tmp_path = Path(f"{self.topological_order_path}.tmp") + def output(self) -> luigi.Target: + """.csv.zst file that contains the topological order.""" + return luigi.LocalTarget(self.origin_contributors_path) - object_types = "rev,rel,snp,ori" - class_name = "org.softwareheritage.graph.utils.TopoSort" + def run(self) -> None: + """Runs org.softwareheritage.graph.utils.TopoSort and compresses""" + class_name = "org.softwareheritage.graph.utils.ListOriginContributors" script = f""" - java {class_name} '{self.local_graph_path}/{self.graph_name}' '{object_types}' \ - | pv --line-mode --wait \ - | zstdmt -19 \ - > '{tmp_path}' + zstdcat {self.topological_order_path} \ + | java {class_name} '{self.local_graph_path}/{self.graph_name}' \ + | pv --line-mode --wait \ + | zstdmt -19 """ - subprocess.run(["bash", "-c", script], env=env, check=True) + _run_script(script, self.origin_contributors_path) + + +class ExportDeanonymizationTable(luigi.Task): + """Exports (from swh-storage) a .csv.zst file that contains the columns: + ``base64(sha256(full_name))`, ``base64(full_name)``, and ``escape(full_name)``. + + The first column is the anonymized full name found in :file:`graph.persons.csv.zst` + in the compressed graph, and the latter two are the original name.""" + + storage_dsn = luigi.Parameter( + default="service=swh", + description="postgresql DSN of the swh-storage database to read from.", + ) + deanonymization_table_path = luigi.PathParameter() + + def output(self) -> luigi.Target: + """.csv.zst file that contains the table.""" + return luigi.LocalTarget(self.deanonymization_table_path) + + def run(self) -> None: + """Runs a postgresql query to compute the table.""" + + _run_script( + f""" + psql '{self.storage_dsn}' -c "COPY (select encode(digest(fullname, 'sha256'), 'base64') as sha256_base64, encode(fullname, 'base64') as base64, encode(fullname, 'escape') as escaped from person) TO STDOUT CSV HEADER" | zstdmt -19 + """, # noqa + self.deanonymization_table_path, + ) + + +class DeanonymizeOriginContributors(luigi.Task): + """Generates a .csv.zst file similar to :class:`ListOriginContributors`'s, + but with ``person_base64`` and ``person_escaped`` columns in addition to + ``person_id``. + + This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of names + instead of names); which may not be true depending on how the swh-dataset export + cas configured. 
+ """ + + local_graph_path = luigi.PathParameter() + graph_name = luigi.Parameter(default="graph") + origin_contributors_path = luigi.PathParameter() + deanonymization_table_path = luigi.PathParameter() + deanonymized_origin_contributors_path = luigi.PathParameter() + + def requires(self) -> List[luigi.Task]: + """Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`, + and :class:`ExportDeanonymizationTable`.""" + return [ + LocalGraph(local_graph_path=self.local_graph_path), + ListOriginContributors( + local_graph_path=self.local_graph_path, + origin_contributors_path=self.origin_contributors_path, + ), + ExportDeanonymizationTable( + deanonymization_table_path=self.deanonymization_table_path, + ), + ] + + def output(self) -> luigi.Target: + """.csv.zst file similar to :meth:`ListOriginContributors.output`'s, + but with ``person_base64`` and ``person_escaped`` columns in addition to + ``person_id``""" + return luigi.LocalTarget(self.deanonymized_origin_contributors_path) + + def run(self) -> None: + """Loads the list of persons (``graph.persons.csv.zst`` in the graph dataset + and the deanonymization table in memory, then uses them to map each row + in the original (anonymized) contributors list to the deanonymized one.""" + # TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset export + # was configured to do so); this should add support for it. + + import base64 + import csv + + import pyzstd + + # Load the deanonymization table, to map sha256(name) to base64(name) + # and escape(name) + sha256_to_names: Dict[bytes, Tuple[bytes, str]] = {} + with pyzstd.open(self.deanonymization_table_path, "rt") as fd: + csv_reader = csv.reader(fd) + header = next(csv_reader) + assert header == ["sha256_base64", "base64", "escaped"], header + for line in csv_reader: + (base64_sha256_name, base64_name, escaped_name) = line + sha256_name = base64.b64decode(base64_sha256_name) + name = base64.b64decode(base64_name) + sha256_to_names[sha256_name] = (name, escaped_name) + + # Combine with the list of sha256(name), to get the list of base64(name) + # and escape(name) + persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst" + with pyzstd.open(persons_path, "rb") as fd: + person_id_to_names: List[Tuple[bytes, str]] = [ + sha256_to_names.pop(base64.b64decode(line.strip()), (b"", "")) + for line in fd + ] + + tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp") + + # Finally, write a new table of origin_contributors, by reading the anonymized + # table line-by-line and deanonymizing each id + + # Open temporary output for writes as CSV + with pyzstd.open(tmp_output_path, "wt") as output_fd: + csv_writer = csv.writer(output_fd, lineterminator="\n") + # write header + csv_writer.writerow(("origin_SWHID", "person_base64", "person_escaped")) + + # Open input for reads as CSV + with pyzstd.open(self.origin_contributors_path, "rt") as input_fd: + csv_reader = csv.reader(input_fd) + header = next(csv_reader) + assert header == ["origin_SWHID", "person_id"], header + for (origin_swhid, person_id) in csv_reader: + (name, escaped_name) = person_id_to_names[int(person_id)] + base64_name = base64.b64encode(name).decode("ascii") + csv_writer.writerow((origin_swhid, base64_name, escaped_name)) - # Atomically write the output file - tmp_path.replace(self.topological_order_path) + tmp_output_path.replace(self.deanonymized_origin_contributors_path) diff --git a/swh/graph/tests/test_origin_contributors.py 
diff --git a/swh/graph/tests/test_origin_contributors.py b/swh/graph/tests/test_origin_contributors.py
new file mode 100644
--- /dev/null
+++ b/swh/graph/tests/test_origin_contributors.py
@@ -0,0 +1,180 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+from pathlib import Path
+import subprocess
+
+from swh.graph.luigi import (
+    DeanonymizeOriginContributors,
+    ExportDeanonymizationTable,
+    ListOriginContributors,
+)
+from swh.model.model import (
+    ObjectType,
+    Person,
+    Release,
+    Revision,
+    RevisionType,
+    TimestampWithTimezone,
+)
+
+from .test_toposort import EXPECTED as TOPOLOGICAL_ORDER
+
+DATA_DIR = Path(__file__).parents[0] / "dataset"
+
+
+# FIXME: do not hardcode ids here; they should be dynamically loaded
+# from the test graph
+ORIGIN_CONTRIBUTORS = """\
+origin_SWHID,person_id
+swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,0
+swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,2
+"""
+
+DEANONYMIZATION_TABLE = """\
+sha256_base64,base64,escaped
+8qhF7WQ2bmeoRbZipAaqtNw6QdOCDcpggLWCQLzITsI=,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe <jdoe@example.org>
+aZA9TeLhVzqVDQHQOd53UABAZYyek0tY3vTo6VSlA4U=,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe <jdoe@example.com>
+UaCrgAZBvn1LBd2sAinmdNvAX/G4sjo1aJA9GDd9UUs=,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5uZXQ+,Jane Doe <jdoe@example.net>
+"""  # noqa
+
+PERSONS = """\
+aZA9TeLhVzqVDQHQOd53UABAZYyek0tY3vTo6VSlA4U=
+UaCrgAZBvn1LBd2sAinmdNvAX/G4sjo1aJA9GDd9UUs=
+8qhF7WQ2bmeoRbZipAaqtNw6QdOCDcpggLWCQLzITsI=
+"""
+
+DEANONYMIZED_ORIGIN_CONTRIBUTORS = """\
+origin_SWHID,person_base64,person_escaped
+swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe <jdoe@example.com>
+swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe <jdoe@example.org>
+"""  # noqa
+
+
+def test_list_origin_contributors(tmpdir):
+    tmpdir = Path(tmpdir)
+
+    topological_order_path = tmpdir / "topo_order.csv.zst"
+    origin_contributors_path = tmpdir / "origin_contributors.csv.zst"
+
+    subprocess.run(
+        ["zstdmt", "-o", topological_order_path],
+        input=TOPOLOGICAL_ORDER.encode(),
+        check=True,
+    )
+
+    task = ListOriginContributors(
+        local_graph_path=DATA_DIR / "compressed",
+        topological_order_path=topological_order_path,
+        origin_contributors_path=origin_contributors_path,
+        graph_name="example",
+    )
+
+    task.run()
+
+    csv_text = subprocess.check_output(["zstdcat", origin_contributors_path]).decode()
+
+    assert csv_text == ORIGIN_CONTRIBUTORS
+
+
+def test_export_deanonymization_table(tmpdir, swh_storage_postgresql, swh_storage):
+    tmpdir = Path(tmpdir)
+
+    tstz = TimestampWithTimezone.from_datetime(
+        datetime.datetime.now(tz=datetime.timezone.utc)
+    )
+    swh_storage.release_add(
+        [
+            Release(
+                name=b"v1.0",
+                message=b"first release",
+                author=Person.from_fullname(b"John Doe <jdoe@example.org>"),
+                target=b"\x00" * 20,
+                target_type=ObjectType.REVISION,
+                synthetic=True,
+            )
+        ]
+    )
+    swh_storage.revision_add(
+        [
+            Revision(
+                message=b"first commit",
+                author=Person.from_fullname(b"Jane Doe <jdoe@example.com>"),
+                committer=Person.from_fullname(b"Jane Doe <jdoe@example.net>"),
+                date=tstz,
+                committer_date=tstz,
+                directory=b"\x00" * 20,
+                type=RevisionType.GIT,
+                synthetic=True,
+            )
+        ]
+    )
+
+    deanonymization_table_path = tmpdir / "person_sha256_to_names.csv.zst"
+
+    task = ExportDeanonymizationTable(
+        storage_dsn=swh_storage_postgresql.dsn,
+        deanonymization_table_path=deanonymization_table_path,
+    )
+
+    task.run()
+
+    csv_text = subprocess.check_output(["zstdcat", deanonymization_table_path]).decode()
+
+    (header, *rows) = csv_text.split("\n")
+    (expected_header, *expected_rows) = DEANONYMIZATION_TABLE.split("\n")
+
+    assert header == expected_header
+
+    assert rows.pop() == "", "Missing trailing newline"
+    expected_rows.pop()
+
+    assert set(rows) == set(expected_rows)
+
+
+def test_deanonymize_origin_contributors(tmpdir):
+    tmpdir = Path(tmpdir)
+
+    persons_path = tmpdir / "example.persons.csv.zst"
+    origin_contributors_path = tmpdir / "origin_contributors.csv.zst"
+    deanonymization_table_path = tmpdir / "person_sha256_to_names.csv.zst"
+    deanonymized_origin_contributors_path = (
+        tmpdir / "origin_contributors.deanonymized.csv.zst"
+    )
+
+    subprocess.run(
+        ["zstdmt", "-o", origin_contributors_path],
+        input=ORIGIN_CONTRIBUTORS.encode(),
+        check=True,
+    )
+
+    subprocess.run(
+        ["zstdmt", "-o", persons_path],
+        input=PERSONS.encode(),
+        check=True,
+    )
+
+    subprocess.run(
+        ["zstdmt", "-o", deanonymization_table_path],
+        input=DEANONYMIZATION_TABLE.encode(),
+        check=True,
+    )
+
+    task = DeanonymizeOriginContributors(
+        local_graph_path=tmpdir,
+        origin_contributors_path=origin_contributors_path,
+        deanonymization_table_path=deanonymization_table_path,
+        deanonymized_origin_contributors_path=deanonymized_origin_contributors_path,
+        graph_name="example",
+    )
+
+    task.run()
+
+    csv_text = subprocess.check_output(
+        ["zstdcat", deanonymized_origin_contributors_path]
+    ).decode()
+
+    assert csv_text == DEANONYMIZED_ORIGIN_CONTRIBUTORS
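The `person_id` column in these fixtures is a 0-based line index into :file:`graph.persons.csv.zst`: ids 0 and 2 in `ORIGIN_CONTRIBUTORS` select the first and third entries of `PERSONS`. A quick standalone illustration of that convention (not part of the patch):

```python
import base64

# The PERSONS fixture above, uncompressed: one base64(sha256(name)) per line.
persons = [
    "aZA9TeLhVzqVDQHQOd53UABAZYyek0tY3vTo6VSlA4U=",  # person_id 0
    "UaCrgAZBvn1LBd2sAinmdNvAX/G4sjo1aJA9GDd9UUs=",  # person_id 1
    "8qhF7WQ2bmeoRbZipAaqtNw6QdOCDcpggLWCQLzITsI=",  # person_id 2
]

# DeanonymizeOriginContributors indexes this list with int(person_id):
sha256_of_person_2 = base64.b64decode(persons[2])
assert len(sha256_of_person_2) == 32  # a raw SHA256 digest
```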
diff --git a/swh/graph/tests/test_toposort.py b/swh/graph/tests/test_toposort.py
--- a/swh/graph/tests/test_toposort.py
+++ b/swh/graph/tests/test_toposort.py
@@ -11,7 +11,8 @@
 
 DATA_DIR = Path(__file__).parents[0] / "dataset"
 
-EXPECTED_ROWS = """
+EXPECTED = """\
+SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2
 swh:1:rev:0000000000000000000000000000000000000003,0,1,,
 swh:1:rev:0000000000000000000000000000000000000009,1,3,swh:1:rev:0000000000000000000000000000000000000003,
 swh:1:rel:0000000000000000000000000000000000000010,1,1,swh:1:rev:0000000000000000000000000000000000000009,
@@ -38,14 +39,14 @@
 
     csv_text = subprocess.check_output(["zstdcat", topological_order_path]).decode()
 
-    lines = csv_text.split("\n")
-    (header, *rows) = lines
-    assert header == "SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2"
+    (header, *rows) = csv_text.split("\n")
+    (expected_header, *expected_lines) = EXPECTED.split("\n")
+    assert header == expected_header
 
     # The only possible first line
     assert rows[0] == "swh:1:rev:0000000000000000000000000000000000000003,0,1,,"
 
-    assert set(rows) == set(EXPECTED_ROWS.split("\n"))
+    assert set(rows) == set(expected_lines)
 
     assert rows.pop() == "", "Missing trailing newline"