Page MenuHomeSoftware Heritage

No OneTemporary


diff --git a/ b/
--- a/
+++ b/
@@ -5,4 +5,5 @@
pytest_plugins = [
+ "",
diff --git a/java/src/main/java/org/softwareheritage/graph/utils/ b/java/src/main/java/org/softwareheritage/graph/utils/
new file mode 100644
--- /dev/null
+++ b/java/src/main/java/org/softwareheritage/graph/utils/
@@ -0,0 +1,141 @@
+ * Copyright (c) 2022 The Software Heritage developers
+ * See the AUTHORS file at the top-level directory of this distribution
+ * License: GNU General Public License version 3, or any later version
+ * See top-level LICENSE file for more information
+ */
+/* For each origin and each person, outputs a line "origin_swhid,person_id",
+ * if that person contributed to the origin.
+ *
+ * This takes the output of TopoSort on stdin.
+ *
+ */
+package org.softwareheritage.graph.utils;
+import it.unimi.dsi.big.webgraph.LazyLongIterator;
+import org.softwareheritage.graph.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Scanner;
+public class ListOriginContributors {
+ /*
+ * For nodes with a single ancestor, reuses the ancestor's set of contributors instead of copying,
+ * when that ancestor has no more pending successors.
+ */
+ private static boolean optimizeReuse = true;
+ public static void main(String[] args) throws IOException, ClassNotFoundException {
+ if (args.length != 1) {
+ System.err.println("Syntax: java org.softwareheritage.graph.utils.FindEarliestRevision <path/to/graph>");
+ System.exit(1);
+ }
+ String graphBasename = args[0];
+ System.err.println("Loading graph " + graphBasename + " ...");
+ SwhBidirectionalGraph underlyingGraph = SwhBidirectionalGraph.loadMapped(graphBasename);
+ System.err.println("Loading person ids");
+ underlyingGraph.loadPersonIds();
+ System.err.println("Selecting subgraph.");
+ Subgraph graph = new Subgraph(underlyingGraph, new AllowedNodes("rev,rel,snp,ori"));
+ System.err.println("Graph loaded.");
+ Scanner stdin = new Scanner(;
+ String firstLine = stdin.nextLine().strip();
+ if (!firstLine.equals("SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2")) {
+ System.err.format("Unexpected header: %s\n", firstLine);
+ System.exit(2);
+ }
+ /* Map each node id to its set of contributor person ids */
+ HashMap<Long, HashSet<Long>> contributors = new HashMap<>();
+ /*
+ * For each node id, counts its number of direct successors that still need to be handled
+ */
+ HashMap<Long, Long> pendingSuccessors = new HashMap<>();
+ System.out.println("origin_SWHID,person_id");
+ while (stdin.hasNextLine()) {
+ String cells[] = stdin.nextLine().strip().split(",", -1);
+ SWHID nodeSWHID = new SWHID(cells[0]);
+ long nodeId = graph.getNodeId(nodeSWHID);
+ long ancestorCount = Long.parseLong(cells[1]);
+ long successorCount = Long.parseLong(cells[2]);
+ String sampleAncestor1SWHID = cells[3];
+ String sampleAncestor2SWHID = cells[4];
+ HashSet<Long> nodeContributors;
+ boolean reuseAncestorSet = optimizeReuse && (ancestorCount == 1);
+ if (reuseAncestorSet) {
+ long ancestorNodeId = underlyingGraph.getNodeId(new SWHID(sampleAncestor1SWHID));
+ if (pendingSuccessors.get(ancestorNodeId) == 1) {
+ nodeContributors = contributors.remove(ancestorNodeId);
+ pendingSuccessors.remove(ancestorNodeId);
+ } else {
+ /* Ancestor is not yet ready to be popped */
+ pendingSuccessors.put(ancestorNodeId, pendingSuccessors.get(ancestorNodeId) - 1);
+ nodeContributors = new HashSet<>();
+ }
+ } else {
+ nodeContributors = new HashSet<>();
+ }
+ if (nodeSWHID.getType() == SwhType.REV) {
+ nodeContributors.add(underlyingGraph.getAuthorId(nodeId));
+ nodeContributors.add(underlyingGraph.getCommitterId(nodeId));
+ } else if (nodeSWHID.getType() == SwhType.REL) {
+ nodeContributors.add(underlyingGraph.getAuthorId(nodeId));
+ }
+ if (!reuseAncestorSet) {
+ long computedAncestorCount = 0;
+ LazyLongIterator it = graph.successors(nodeId);
+ for (long ancestorNodeId; (ancestorNodeId = it.nextLong()) != -1;) {
+ computedAncestorCount++;
+ if (pendingSuccessors.get(ancestorNodeId) == 1) {
+ /*
+ * If this node is the last unhandled successor of the ancestor, pop the ancestor information,
+ * as we won't need it anymore
+ */
+ pendingSuccessors.remove(ancestorNodeId);
+ nodeContributors.addAll(contributors.remove(ancestorNodeId));
+ } else {
+ /*
+ * The ancestor has remaining successors to handle; decrement the counter and copy its set of
+ * contributors to the current set
+ */
+ pendingSuccessors.put(ancestorNodeId, pendingSuccessors.get(ancestorNodeId) - 1);
+ nodeContributors.addAll(contributors.get(ancestorNodeId));
+ }
+ }
+ if (ancestorCount != computedAncestorCount) {
+ System.err.format("Mismatched ancestor count: expected %d, found %d", ancestorCount,
+ computedAncestorCount);
+ System.exit(2);
+ }
+ }
+ if (nodeSWHID.getType() == SwhType.ORI) {
+ nodeContributors.forEach((contributorId) -> {
+ System.out.format("%s,%d\n", nodeSWHID.toString(), contributorId);
+ });
+ }
+ if (successorCount > 0) {
+ /*
+ * If the node has any successor, store its set of contributors for later
+ */
+ contributors.put(nodeId, nodeContributors);
+ pendingSuccessors.put(nodeId, successorCount);
+ }
+ }
+ }
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -23,5 +23,8 @@
ignore_missing_imports = True
+ignore_missing_imports = True
ignore_missing_imports = True
diff --git a/requirements-luigi.txt b/requirements-luigi.txt
--- a/requirements-luigi.txt
+++ b/requirements-luigi.txt
@@ -1,2 +1,3 @@
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,4 @@
swh.core[http] >= 0.3
swh.model >= 0.13.0
swh.dataset # only for tests
diff --git a/swh/graph/ b/swh/graph/
--- a/swh/graph/
+++ b/swh/graph/
@@ -74,7 +74,7 @@
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
from pathlib import Path
-from typing import Dict, List
+from typing import Dict, List, Tuple
import luigi
@@ -437,6 +437,31 @@
return luigi.LocalTarget(self.local_graph_path / "meta" / "compression.json")
+def _run_script(script: str, output_path: Path) -> None:
+ import os
+ import subprocess
+ from .config import check_config
+ conf: Dict = {} # TODO: configurable
+ conf = check_config(conf)
+ env = {
+ **os.environ.copy(),
+ "JAVA_TOOL_OPTIONS": conf["java_tool_options"],
+ "CLASSPATH": conf["classpath"],
+ }
+ tmp_output_path = Path(f"{output_path}.tmp")
+ ["bash", "-c", f"{script.strip()} > {tmp_output_path}"], env=env, check=True
+ )
+ # Atomically write the output file
+ tmp_output_path.replace(output_path)
class TopoSort(luigi.Task):
"""Creates a file that contains all SWHIDs in topological order from a compressed
@@ -455,31 +480,169 @@
def run(self) -> None:
"""Runs org.softwareheritage.graph.utils.TopoSort and compresses"""
- import os
- import subprocess
+ object_types = "rev,rel,snp,ori"
+ class_name = "org.softwareheritage.graph.utils.TopoSort"
+ script = f"""
+ java {class_name} '{self.local_graph_path}/{self.graph_name}' '{object_types}' \
+ | pv --line-mode --wait \
+ | zstdmt -19
+ """
+ _run_script(script, self.topological_order_path)
- from .config import check_config
- conf: Dict = {} # TODO: configurable
+class ListOriginContributors(luigi.Task):
+ """Creates a file that lists, for each origin of a compressed graph, the ids
+ of the persons who contributed to it (one ``origin_SWHID,person_id`` row each)."""
+ local_graph_path = luigi.PathParameter()
+ topological_order_path = luigi.PathParameter()
+ origin_contributors_path = luigi.PathParameter()
+ graph_name = luigi.Parameter(default="graph")
- conf = check_config(conf)
- env = {
- **os.environ.copy(),
- "JAVA_TOOL_OPTIONS": conf["java_tool_options"],
- "CLASSPATH": conf["classpath"],
- }
+ def requires(self) -> List[luigi.Task]:
+ """Returns an instance of :class:`LocalGraph` and :class:`TopoSort`."""
+ return [
+ LocalGraph(local_graph_path=self.local_graph_path),
+ TopoSort(
+ local_graph_path=self.local_graph_path,
+ topological_order_path=self.topological_order_path,
+ graph_name=self.graph_name,
+ ),
+ ]
- tmp_path = Path(f"{self.topological_order_path}.tmp")
+ def output(self) -> luigi.Target:
+ """.csv.zst file that contains the ``origin_SWHID,person_id`` rows."""
+ return luigi.LocalTarget(self.origin_contributors_path)
- object_types = "rev,rel,snp,ori"
- class_name = "org.softwareheritage.graph.utils.TopoSort"
+ def run(self) -> None:
+ """Runs org.softwareheritage.graph.utils.ListOriginContributors and compresses"""
+ class_name = "org.softwareheritage.graph.utils.ListOriginContributors"
script = f"""
- java {class_name} '{self.local_graph_path}/{self.graph_name}' '{object_types}' \
- | pv --line-mode --wait \
- | zstdmt -19 \
- > '{tmp_path}'
+ zstdcat {self.topological_order_path} \
+ | java {class_name} '{self.local_graph_path}/{self.graph_name}' \
+ | pv --line-mode --wait \
+ | zstdmt -19
-["bash", "-c", script], env=env, check=True)
+ _run_script(script, self.origin_contributors_path)
+class ExportDeanonymizationTable(luigi.Task):
+ """Exports (from swh-storage) a .csv.zst file that contains the columns:
+ ``base64(sha256(full_name))``, ``base64(full_name)``, and ``escape(full_name)``.
+ The first column is the anonymized full name found in :file:`graph.persons.csv.zst`
+ in the compressed graph, and the latter two are the original name."""
+ storage_dsn = luigi.Parameter(
+ default="service=swh",
+ description="postgresql DSN of the swh-storage database to read from.",
+ )
+ deanonymization_table_path = luigi.PathParameter()
+ def output(self) -> luigi.Target:
+ """.csv.zst file that contains the table."""
+ return luigi.LocalTarget(self.deanonymization_table_path)
+ def run(self) -> None:
+ """Runs a postgresql query to compute the table."""
+ _run_script(
+ f"""
+ psql '{self.storage_dsn}' -c "COPY (select encode(digest(fullname, 'sha256'), 'base64') as sha256_base64, encode(fullname, 'base64') as base64, encode(fullname, 'escape') as escaped from person) TO STDOUT CSV HEADER" | zstdmt -19
+ """, # noqa
+ self.deanonymization_table_path,
+ )
+class DeanonymizeOriginContributors(luigi.Task):
+ """Generates a .csv.zst file similar to :class:`ListOriginContributors`'s,
+ but with ``person_base64`` and ``person_escaped`` columns instead of
+ ``person_id``.
+ This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of names
+ instead of names), which may not be true depending on how the swh-dataset export
+ was configured.
+ """
+ local_graph_path = luigi.PathParameter()
+ graph_name = luigi.Parameter(default="graph")
+ origin_contributors_path = luigi.PathParameter()
+ deanonymization_table_path = luigi.PathParameter()
+ deanonymized_origin_contributors_path = luigi.PathParameter()
+ def requires(self) -> List[luigi.Task]:
+ """Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`,
+ and :class:`ExportDeanonymizationTable`."""
+ return [
+ LocalGraph(local_graph_path=self.local_graph_path),
+ ListOriginContributors(
+ local_graph_path=self.local_graph_path,
+ origin_contributors_path=self.origin_contributors_path,
+ ),
+ ExportDeanonymizationTable(
+ deanonymization_table_path=self.deanonymization_table_path,
+ ),
+ ]
+ def output(self) -> luigi.Target:
+ """.csv.zst file similar to :meth:`ListOriginContributors.output`'s,
+ but with ``person_base64`` and ``person_escaped`` columns instead of
+ ``person_id``"""
+ return luigi.LocalTarget(self.deanonymized_origin_contributors_path)
+ def run(self) -> None:
+ """Loads the list of persons (``graph.persons.csv.zst`` in the graph dataset)
+ and the deanonymization table in memory, then uses them to map each row
+ in the original (anonymized) contributors list to the deanonymized one."""
+ # TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset export
+ # was configured to do so); this should add support for it.
+ import base64
+ import csv
+ import pyzstd
+ # Load the deanonymization table, to map sha256(name) to base64(name)
+ # and escape(name)
+ sha256_to_names: Dict[bytes, Tuple[bytes, str]] = {}
+ with, "rt") as fd:
+ csv_reader = csv.reader(fd)
+ header = next(csv_reader)
+ assert header == ["sha256_base64", "base64", "escaped"], header
+ for line in csv_reader:
+ (base64_sha256_name, base64_name, escaped_name) = line
+ sha256_name = base64.b64decode(base64_sha256_name)
+ name = base64.b64decode(base64_name)
+ sha256_to_names[sha256_name] = (name, escaped_name)
+ # Combine with the list of sha256(name), to get the list of base64(name)
+ # and escape(name)
+ persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst"
+ with, "rb") as fd:
+ person_id_to_names: List[Tuple[bytes, str]] = [
+ sha256_to_names.pop(base64.b64decode(line.strip()), (b"", ""))
+ for line in fd
+ ]
+ tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp")
+ # Finally, write a new table of origin_contributors, by reading the anonymized
+ # table line-by-line and deanonymizing each id
+ # Open temporary output for writes as CSV
+ with, "wt") as output_fd:
+ csv_writer = csv.writer(output_fd, lineterminator="\n")
+ # write header
+ csv_writer.writerow(("origin_SWHID", "person_base64", "person_escaped"))
+ # Open input for reads as CSV
+ with, "rt") as input_fd:
+ csv_reader = csv.reader(input_fd)
+ header = next(csv_reader)
+ assert header == ["origin_SWHID", "person_id"], header
+ for (origin_swhid, person_id) in csv_reader:
+ (name, escaped_name) = person_id_to_names[int(person_id)]
+ base64_name = base64.b64encode(name).decode("ascii")
+ csv_writer.writerow((origin_swhid, base64_name, escaped_name))
- # Atomically write the output file
- tmp_path.replace(self.topological_order_path)
+ tmp_output_path.replace(self.deanonymized_origin_contributors_path)
diff --git a/swh/graph/tests/ b/swh/graph/tests/
new file mode 100644
--- /dev/null
+++ b/swh/graph/tests/
@@ -0,0 +1,180 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+import datetime
+from pathlib import Path
+import subprocess
+from swh.graph.luigi import (
+ DeanonymizeOriginContributors,
+ ExportDeanonymizationTable,
+ ListOriginContributors,
+from swh.model.model import (
+ ObjectType,
+ Person,
+ Release,
+ Revision,
+ RevisionType,
+ TimestampWithTimezone,
+from .test_toposort import EXPECTED as TOPOLOGICAL_ORDER
+DATA_DIR = Path(__file__).parents[0] / "dataset"
+# FIXME: do not hardcode ids here; they should be dynamically loaded
+# from the test graph
+8qhF7WQ2bmeoRbZipAaqtNw6QdOCDcpggLWCQLzITsI=,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe <>
+aZA9TeLhVzqVDQHQOd53UABAZYyek0tY3vTo6VSlA4U=,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe <>
+UaCrgAZBvn1LBd2sAinmdNvAX/G4sjo1aJA9GDd9UUs=,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5uZXQ+,Jane Doe <>
+""" # noqa
+PERSONS = """\
+swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe <>
+swh:1:ori:83404f995118bd25774f4ac14422a8f175e7a054,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe <>
+""" # noqa
+def test_list_origin_contributors(tmpdir):
+ tmpdir = Path(tmpdir)
+ topological_order_path = tmpdir / "topo_order.csv.zst"
+ origin_contributors_path = tmpdir / "origin_contributors.csv.zst"
+ ["zstdmt", "-o", topological_order_path],
+ input=TOPOLOGICAL_ORDER.encode(),
+ check=True,
+ )
+ task = ListOriginContributors(
+ local_graph_path=DATA_DIR / "compressed",
+ topological_order_path=topological_order_path,
+ origin_contributors_path=origin_contributors_path,
+ graph_name="example",
+ )
+ csv_text = subprocess.check_output(["zstdcat", origin_contributors_path]).decode()
+ assert csv_text == ORIGIN_CONTRIBUTORS
+def test_export_deanonymization_table(tmpdir, swh_storage_postgresql, swh_storage):
+ tmpdir = Path(tmpdir)
+ tstz = TimestampWithTimezone.from_datetime(
+ )
+ swh_storage.release_add(
+ [
+ Release(
+ name=b"v1.0",
+ message=b"first release",
+ author=Person.from_fullname(b"John Doe <>"),
+ target=b"\x00" * 20,
+ target_type=ObjectType.REVISION,
+ synthetic=True,
+ )
+ ]
+ )
+ swh_storage.revision_add(
+ [
+ Revision(
+ message=b"first commit",
+ author=Person.from_fullname(b"Jane Doe <>"),
+ committer=Person.from_fullname(b"Jane Doe <>"),
+ date=tstz,
+ committer_date=tstz,
+ directory=b"\x00" * 20,
+ type=RevisionType.GIT,
+ synthetic=True,
+ )
+ ]
+ )
+ deanonymization_table_path = tmpdir / "person_sha256_to_names.csv.zst"
+ task = ExportDeanonymizationTable(
+ storage_dsn=swh_storage_postgresql.dsn,
+ deanonymization_table_path=deanonymization_table_path,
+ )
+ csv_text = subprocess.check_output(["zstdcat", deanonymization_table_path]).decode()
+ (header, *rows) = csv_text.split("\n")
+ (expected_header, *expected_rows) = DEANONYMIZATION_TABLE.split("\n")
+ assert header == expected_header
+ assert rows.pop() == "", "Missing trailing newline"
+ expected_rows.pop()
+ assert set(rows) == set(expected_rows)
+def test_deanonymize_origin_contributors(tmpdir):
+ tmpdir = Path(tmpdir)
+ persons_path = tmpdir / "example.persons.csv.zst"
+ origin_contributors_path = tmpdir / "origin_contributors.csv.zst"
+ deanonymization_table_path = tmpdir / "person_sha256_to_names.csv.zst"
+ deanonymized_origin_contributors_path = (
+ tmpdir / "origin_contributors.deanonymized.csv.zst"
+ )
+ ["zstdmt", "-o", origin_contributors_path],
+ input=ORIGIN_CONTRIBUTORS.encode(),
+ check=True,
+ )
+ ["zstdmt", "-o", persons_path],
+ input=PERSONS.encode(),
+ check=True,
+ )
+ ["zstdmt", "-o", deanonymization_table_path],
+ check=True,
+ )
+ task = DeanonymizeOriginContributors(
+ local_graph_path=tmpdir,
+ origin_contributors_path=origin_contributors_path,
+ deanonymization_table_path=deanonymization_table_path,
+ deanonymized_origin_contributors_path=deanonymized_origin_contributors_path,
+ graph_name="example",
+ )
+ csv_text = subprocess.check_output(
+ ["zstdcat", deanonymized_origin_contributors_path]
+ ).decode()
diff --git a/swh/graph/tests/ b/swh/graph/tests/
--- a/swh/graph/tests/
+++ b/swh/graph/tests/
@@ -11,7 +11,8 @@
DATA_DIR = Path(__file__).parents[0] / "dataset"
+EXPECTED = """\
@@ -38,14 +39,14 @@
csv_text = subprocess.check_output(["zstdcat", topological_order_path]).decode()
- lines = csv_text.split("\n")
- (header, *rows) = lines
- assert header == "SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2"
+ (header, *rows) = csv_text.split("\n")
+ (expected_header, *expected_lines) = EXPECTED.split("\n")
+ assert header == expected_header
# The only possible first line
assert rows[0] == "swh:1:rev:0000000000000000000000000000000000000003,0,1,,"
- assert set(rows) == set(EXPECTED_ROWS.split("\n"))
+ assert set(rows) == set(expected_lines)
assert rows.pop() == "", "Missing trailing newline"

File Metadata

Mime Type
Dec 17 2024, 6:50 PM (11 w, 4 d ago)
Storage Engine
Storage Format
Raw Data
Storage Handle

Event Timeline