diff --git a/java/src/main/java/org/softwareheritage/graph/SwhGraph.java b/java/src/main/java/org/softwareheritage/graph/SwhGraph.java
index aee50cd..d95bc73 100644
--- a/java/src/main/java/org/softwareheritage/graph/SwhGraph.java
+++ b/java/src/main/java/org/softwareheritage/graph/SwhGraph.java
@@ -1,151 +1,156 @@
/*
* Copyright (c) 2021-2022 The Software Heritage developers
* See the AUTHORS file at the top-level directory of this distribution
* License: GNU General Public License version 3, or any later version
* See top-level LICENSE file for more information
*/
package org.softwareheritage.graph;
import java.io.IOException;
/**
* Common interface for SWH graph classes.
*
* This interface forwards all property loading/access methods to the SwhGraphProperties object
* returned by the getProperties() method of the implementing class. This allows API users to write
* graph.getNodeType() instead of graph.getProperties().getNodeType().
*/
public interface SwhGraph {
/**
* Cleans up graph resources after use.
*/
void close() throws IOException;
/**
* Returns the SWH graph properties object of this graph.
*
* @return graph properties
*/
SwhGraphProperties getProperties();
/** @see SwhGraphProperties#getPath() */
default String getPath() {
return getProperties().getPath();
}
/** @see SwhGraphProperties#getNodeId(SWHID) */
default long getNodeId(SWHID swhid) {
return getProperties().getNodeId(swhid);
}
/** @see SwhGraphProperties#getSWHID(long) */
default SWHID getSWHID(long nodeId) {
return getProperties().getSWHID(nodeId);
}
/** @see SwhGraphProperties#getNodeType(long) */
default SwhType getNodeType(long nodeId) {
return getProperties().getNodeType(nodeId);
}
/** @see SwhGraphProperties#loadContentLength() */
default void loadContentLength() throws IOException {
getProperties().loadContentLength();
}
/** @see SwhGraphProperties#getContentLength(long) */
default Long getContentLength(long nodeId) {
return getProperties().getContentLength(nodeId);
}
/** @see SwhGraphProperties#loadPersonIds() */
default void loadPersonIds() throws IOException {
getProperties().loadPersonIds();
}
/** @see SwhGraphProperties#getAuthorId(long) */
default Long getAuthorId(long nodeId) {
return getProperties().getAuthorId(nodeId);
}
/** @see SwhGraphProperties#getCommitterId(long) */
default Long getCommitterId(long nodeId) {
return getProperties().getCommitterId(nodeId);
}
/** @see SwhGraphProperties#loadContentIsSkipped() */
default void loadContentIsSkipped() throws IOException {
getProperties().loadContentIsSkipped();
}
/** @see SwhGraphProperties#isContentSkipped(long) */
default boolean isContentSkipped(long nodeId) {
return getProperties().isContentSkipped(nodeId);
}
/** @see SwhGraphProperties#loadAuthorTimestamps() */
default void loadAuthorTimestamps() throws IOException {
getProperties().loadAuthorTimestamps();
}
/** @see SwhGraphProperties#getAuthorTimestamp(long) */
default Long getAuthorTimestamp(long nodeId) {
return getProperties().getAuthorTimestamp(nodeId);
}
/** @see SwhGraphProperties#getAuthorTimestampOffset(long) */
default Short getAuthorTimestampOffset(long nodeId) {
return getProperties().getAuthorTimestampOffset(nodeId);
}
/** @see SwhGraphProperties#loadCommitterTimestamps() */
default void loadCommitterTimestamps() throws IOException {
getProperties().loadCommitterTimestamps();
}
/** @see SwhGraphProperties#getCommitterTimestamp(long) */
default Long getCommitterTimestamp(long nodeId) {
return getProperties().getCommitterTimestamp(nodeId);
}
/** @see SwhGraphProperties#getCommitterTimestampOffset(long) */
default Short getCommitterTimestampOffset(long nodeId) {
return getProperties().getCommitterTimestampOffset(nodeId);
}
/** @see SwhGraphProperties#loadMessages() */
default void loadMessages() throws IOException {
getProperties().loadMessages();
}
/** @see SwhGraphProperties#getMessage(long) */
default byte[] getMessage(long nodeId) {
return getProperties().getMessage(nodeId);
}
+ /** @see SwhGraphProperties#getMessageBase64(long) */
+ default byte[] getMessageBase64(long nodeId) {
+ return getProperties().getMessageBase64(nodeId);
+ }
+
/** @see SwhGraphProperties#getUrl(long) */
default String getUrl(long nodeId) {
return getProperties().getUrl(nodeId);
}
/** @see SwhGraphProperties#loadTagNames() */
default void loadTagNames() throws IOException {
getProperties().loadTagNames();
}
/** @see SwhGraphProperties#getTagName(long) */
default byte[] getTagName(long nodeId) {
return getProperties().getTagName(nodeId);
}
/** @see SwhGraphProperties#loadLabelNames() */
default void loadLabelNames() throws IOException {
getProperties().loadLabelNames();
}
/** @see SwhGraphProperties#getLabelName(long) */
default byte[] getLabelName(long labelId) {
return getProperties().getLabelName(labelId);
}
}
diff --git a/java/src/main/java/org/softwareheritage/graph/SwhGraphProperties.java b/java/src/main/java/org/softwareheritage/graph/SwhGraphProperties.java
index 3372947..064a7df 100644
--- a/java/src/main/java/org/softwareheritage/graph/SwhGraphProperties.java
+++ b/java/src/main/java/org/softwareheritage/graph/SwhGraphProperties.java
@@ -1,330 +1,335 @@
/*
* Copyright (c) 2021-2022 The Software Heritage developers
* See the AUTHORS file at the top-level directory of this distribution
* License: GNU General Public License version 3, or any later version
* See top-level LICENSE file for more information
*/
package org.softwareheritage.graph;
import it.unimi.dsi.big.util.MappedFrontCodedStringBigList;
import it.unimi.dsi.bits.LongArrayBitVector;
import it.unimi.dsi.fastutil.bytes.ByteBigList;
import it.unimi.dsi.fastutil.bytes.ByteMappedBigList;
import it.unimi.dsi.fastutil.ints.IntBigList;
import it.unimi.dsi.fastutil.ints.IntMappedBigList;
import it.unimi.dsi.fastutil.io.BinIO;
import it.unimi.dsi.fastutil.longs.LongBigList;
import it.unimi.dsi.fastutil.longs.LongMappedBigList;
import it.unimi.dsi.fastutil.shorts.ShortBigList;
import it.unimi.dsi.fastutil.shorts.ShortMappedBigList;
import it.unimi.dsi.sux4j.util.EliasFanoLongBigList;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.softwareheritage.graph.maps.NodeIdMap;
import org.softwareheritage.graph.maps.NodeTypesMap;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Base64;
/**
* This objects contains SWH graph properties such as node labels.
*
* Some property mappings are necessary because Software Heritage uses string based persistent
* identifiers (SWHID) while WebGraph uses integers internally.
*
* The two node ID mappings (long id ↔ SWHID) are used for the input (users refer to the graph
* using SWHID) and the output (convert back to SWHID for users results).
*
* Since graph traversal can be restricted depending on the node type (see {@link AllowedEdges}), a
* long id → node type map is stored as well to avoid a full SWHID lookup.
*
* @see NodeIdMap
* @see NodeTypesMap
*/
public class SwhGraphProperties {
private final String path;
private final NodeIdMap nodeIdMap;
private final NodeTypesMap nodeTypesMap;
private LongBigList authorTimestamp;
private ShortBigList authorTimestampOffset;
private LongBigList committerTimestamp;
private ShortBigList committerTimestampOffset;
private LongBigList contentLength;
private LongArrayBitVector contentIsSkipped;
private IntBigList authorId;
private IntBigList committerId;
private ByteBigList messageBuffer;
private LongBigList messageOffsets;
private ByteBigList tagNameBuffer;
private LongBigList tagNameOffsets;
private MappedFrontCodedStringBigList edgeLabelNames;
protected SwhGraphProperties(String path, NodeIdMap nodeIdMap, NodeTypesMap nodeTypesMap) {
this.path = path;
this.nodeIdMap = nodeIdMap;
this.nodeTypesMap = nodeTypesMap;
}
public static SwhGraphProperties load(String path) throws IOException {
return new SwhGraphProperties(path, new NodeIdMap(path), new NodeTypesMap(path));
}
/**
* Cleans up resources after use.
*/
public void close() throws IOException {
edgeLabelNames.close();
}
/** Return the basename of the compressed graph */
public String getPath() {
return path;
}
/**
* Converts {@link SWHID} node to long.
*
* @param swhid node specified as a {@link SWHID}
* @return internal long node id
* @see SWHID
*/
public long getNodeId(SWHID swhid) {
return nodeIdMap.getNodeId(swhid);
}
/**
* Converts long id node to {@link SWHID}.
*
* @param nodeId node specified as a long id
* @return external SWHID
* @see SWHID
*/
public SWHID getSWHID(long nodeId) {
return nodeIdMap.getSWHID(nodeId);
}
/**
* Returns node type.
*
* @param nodeId node specified as a long id
* @return corresponding node type
* @see SwhType
*/
public SwhType getNodeType(long nodeId) {
return nodeTypesMap.getType(nodeId);
}
private static LongBigList loadMappedLongs(String path) throws IOException {
try (RandomAccessFile raf = new RandomAccessFile(path, "r")) {
return LongMappedBigList.map(raf.getChannel());
}
}
private static IntBigList loadMappedInts(String path) throws IOException {
try (RandomAccessFile raf = new RandomAccessFile(path, "r")) {
return IntMappedBigList.map(raf.getChannel());
}
}
private static ShortBigList loadMappedShorts(String path) throws IOException {
try (RandomAccessFile raf = new RandomAccessFile(path, "r")) {
return ShortMappedBigList.map(raf.getChannel());
}
}
private static ByteBigList loadMappedBytes(String path) throws IOException {
try (RandomAccessFile raf = new RandomAccessFile(path, "r")) {
return ByteMappedBigList.map(raf.getChannel());
}
}
private static LongBigList loadEFLongs(String path) throws IOException {
try {
return (EliasFanoLongBigList) BinIO.loadObject(path);
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
}
private static byte[] getLine(ByteBigList byteArray, long start) {
long end = start;
while (end < byteArray.size64() && byteArray.getByte(end) != '\n') {
end++;
}
int length = (int) (end - start);
byte[] buffer = new byte[length];
byteArray.getElements(start, buffer, 0, length);
return buffer;
}
/** Load the sizes of the content nodes */
public void loadContentLength() throws IOException {
contentLength = loadMappedLongs(path + ".property.content.length.bin");
}
/** Get the size (in bytes) of the given content node */
public Long getContentLength(long nodeId) {
if (contentLength == null) {
throw new IllegalStateException("Content lengths not loaded");
}
long res = contentLength.getLong(nodeId);
return (res >= 0) ? res : null;
}
/** Load the IDs of the persons (authors and committers) */
public void loadPersonIds() throws IOException {
authorId = loadMappedInts(path + ".property.author_id.bin");
committerId = loadMappedInts(path + ".property.committer_id.bin");
}
/** Get a unique integer ID representing the author of the given revision or release node */
public Long getAuthorId(long nodeId) {
if (authorId == null) {
throw new IllegalStateException("Author IDs not loaded");
}
long res = authorId.getInt(nodeId);
return (res >= 0) ? res : null;
}
/** Get a unique integer ID representing the committer of the given revision node */
public Long getCommitterId(long nodeId) {
if (committerId == null) {
throw new IllegalStateException("Committer IDs not loaded");
}
long res = committerId.getInt(nodeId);
return (res >= 0) ? res : null;
}
/**
* Loads a boolean array indicating whether the given content node was skipped during archive
* ingestion
*/
public void loadContentIsSkipped() throws IOException {
try {
contentIsSkipped = (LongArrayBitVector) BinIO.loadObject(path + ".property.content.is_skipped.bin");
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
}
/** Returns whether the given content node was skipped during archive ingestion */
public boolean isContentSkipped(long nodeId) {
if (contentIsSkipped == null) {
throw new IllegalStateException("Skipped content array not loaded");
}
return contentIsSkipped.getBoolean(nodeId);
}
/** Load the timestamps at which the releases and revisions were authored */
public void loadAuthorTimestamps() throws IOException {
authorTimestamp = loadMappedLongs(path + ".property.author_timestamp.bin");
authorTimestampOffset = loadMappedShorts(path + ".property.author_timestamp_offset.bin");
}
/** Return the timestamp at which the given revision or release was authored */
public Long getAuthorTimestamp(long nodeId) {
if (authorTimestamp == null) {
throw new IllegalStateException("Author timestamps not loaded");
}
long res = authorTimestamp.getLong(nodeId);
return (res > Long.MIN_VALUE) ? res : null;
}
/** Return the timestamp offset at which the given revision or release was authored */
public Short getAuthorTimestampOffset(long nodeId) {
if (authorTimestampOffset == null) {
throw new IllegalStateException("Author timestamp offsets not loaded");
}
short res = authorTimestampOffset.getShort(nodeId);
return (res > Short.MIN_VALUE) ? res : null;
}
/** Load the timestamps at which the releases and revisions were committed */
public void loadCommitterTimestamps() throws IOException {
committerTimestamp = loadMappedLongs(path + ".property.committer_timestamp.bin");
committerTimestampOffset = loadMappedShorts(path + ".property.committer_timestamp_offset.bin");
}
/** Return the timestamp at which the given revision was committed */
public Long getCommitterTimestamp(long nodeId) {
if (committerTimestamp == null) {
throw new IllegalStateException("Committer timestamps not loaded");
}
long res = committerTimestamp.getLong(nodeId);
return (res > Long.MIN_VALUE) ? res : null;
}
/** Return the timestamp offset at which the given revision was committed */
public Short getCommitterTimestampOffset(long nodeId) {
if (committerTimestampOffset == null) {
throw new IllegalStateException("Committer timestamp offsets not loaded");
}
short res = committerTimestampOffset.getShort(nodeId);
return (res > Short.MIN_VALUE) ? res : null;
}
/** Load the revision messages, the release messages and the origin URLs */
public void loadMessages() throws IOException {
messageBuffer = loadMappedBytes(path + ".property.message.bin");
messageOffsets = loadMappedLongs(path + ".property.message.offset.bin");
}
/** Get the message of the given revision or release node */
public byte[] getMessage(long nodeId) {
+ return Base64.getDecoder().decode(getMessageBase64(nodeId));
+ }
+
+ /** Get the message of the given revision or release node, encoded as a base64 byte array */
+ public byte[] getMessageBase64(long nodeId) {
if (messageBuffer == null || messageOffsets == null) {
throw new IllegalStateException("Messages not loaded");
}
long startOffset = messageOffsets.getLong(nodeId);
if (startOffset == -1) {
return null;
}
- return Base64.getDecoder().decode(getLine(messageBuffer, startOffset));
+ return getLine(messageBuffer, startOffset);
}
/** Get the URL of the given origin node */
public String getUrl(long nodeId) {
byte[] url = getMessage(nodeId);
return (url != null) ? new String(url) : null;
}
/** Load the release names */
public void loadTagNames() throws IOException {
tagNameBuffer = loadMappedBytes(path + ".property.tag_name.bin");
tagNameOffsets = loadMappedLongs(path + ".property.tag_name.offset.bin");
}
/** Get the name of the given release node */
public byte[] getTagName(long nodeId) {
if (tagNameBuffer == null || tagNameOffsets == null) {
throw new IllegalStateException("Tag names not loaded");
}
long startOffset = tagNameOffsets.getLong(nodeId);
if (startOffset == -1) {
return null;
}
return Base64.getDecoder().decode(getLine(tagNameBuffer, startOffset));
}
/** Load the arc label names (directory entry names and snapshot branch names) */
public void loadLabelNames() throws IOException {
try {
edgeLabelNames = MappedFrontCodedStringBigList.load(path + ".labels.fcl");
} catch (ConfigurationException e) {
throw new IOException(e);
}
}
/**
* Get the arc label name (either a directory entry name or snapshot branch name) associated with
* the given label ID
*/
public byte[] getLabelName(long labelId) {
if (edgeLabelNames == null) {
throw new IllegalStateException("Label names not loaded");
}
return Base64.getDecoder().decode(edgeLabelNames.getArray(labelId));
}
}
diff --git a/java/src/main/java/org/softwareheritage/graph/utils/ListOriginContributors.java b/java/src/main/java/org/softwareheritage/graph/utils/ListOriginContributors.java
index ba13af9..f7132af 100644
--- a/java/src/main/java/org/softwareheritage/graph/utils/ListOriginContributors.java
+++ b/java/src/main/java/org/softwareheritage/graph/utils/ListOriginContributors.java
@@ -1,151 +1,166 @@
/*
* Copyright (c) 2022 The Software Heritage developers
* See the AUTHORS file at the top-level directory of this distribution
* License: GNU General Public License version 3, or any later version
* See top-level LICENSE file for more information
*/
/* For each origin and each person, outputs a line "origin_id,person_id",
* if that person contributed to the origin.
*
+ * A .csv table containing "origin_id,origin_url_base64" is also written
+ * to the given path.
+ *
* This takes the output of TopoSort on stdin.
*
*/
package org.softwareheritage.graph.utils;
import it.unimi.dsi.big.webgraph.LazyLongIterator;
import org.softwareheritage.graph.*;
+import java.io.PrintWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Scanner;
public class ListOriginContributors {
/*
* For nodes with a single ancestor, reuses the ancestor's set of contributors instead of copying,
* when that ancestor has no more pending successors.
*/
private static boolean optimizeReuse = true;
public static void main(String[] args) throws IOException, ClassNotFoundException {
- if (args.length != 1) {
- System.err.println("Syntax: java org.softwareheritage.graph.utils.FindEarliestRevision ");
+ if (args.length != 2) {
+ System.err.println(
+ "Syntax: java org.softwareheritage.graph.utils.FindEarliestRevision ");
System.exit(1);
}
String graphBasename = args[0];
+ PrintWriter originUrlsFileWriter = new PrintWriter(args[1]);
System.err.println("Loading graph " + graphBasename + " ...");
SwhBidirectionalGraph underlyingGraph = SwhBidirectionalGraph.loadMapped(graphBasename);
System.err.println("Loading person ids");
underlyingGraph.loadPersonIds();
+ System.err.println("Loading messages");
+ underlyingGraph.loadMessages();
System.err.println("Selecting subgraph.");
Subgraph graph = new Subgraph(underlyingGraph, new AllowedNodes("rev,rel,snp,ori"));
System.err.println("Graph loaded.");
Scanner stdin = new Scanner(System.in);
String firstLine = stdin.nextLine().strip();
if (!firstLine.equals("SWHID,ancestors,successors,sample_ancestor1,sample_ancestor2")) {
System.err.format("Unexpected header: %s\n", firstLine);
System.exit(2);
}
/* Map each node id to its set of contributor person ids */
HashMap> contributors = new HashMap<>();
/*
* For each node it, counts its number of direct successors that still need to be handled
*/
HashMap pendingSuccessors = new HashMap<>();
System.out.println("origin_id,person_id");
+ originUrlsFileWriter.println("origin_id,origin_url_base64");
while (stdin.hasNextLine()) {
String cells[] = stdin.nextLine().strip().split(",", -1);
SWHID nodeSWHID = new SWHID(cells[0]);
long nodeId = graph.getNodeId(nodeSWHID);
long ancestorCount = Long.parseLong(cells[1]);
long successorCount = Long.parseLong(cells[2]);
String sampleAncestor1SWHID = cells[3];
String sampleAncestor2SWHID = cells[4];
HashSet nodeContributors;
boolean reuseAncestorSet = optimizeReuse && (ancestorCount == 1);
if (reuseAncestorSet) {
long ancestorNodeId = underlyingGraph.getNodeId(new SWHID(sampleAncestor1SWHID));
if (pendingSuccessors.get(ancestorNodeId) == 1) {
nodeContributors = contributors.remove(ancestorNodeId);
pendingSuccessors.remove(ancestorNodeId);
} else {
/* Ancestor is not yet ready to be popped */
pendingSuccessors.put(ancestorNodeId, pendingSuccessors.get(ancestorNodeId) - 1);
nodeContributors = new HashSet<>();
}
} else {
nodeContributors = new HashSet<>();
}
Long personId;
if (nodeSWHID.getType() == SwhType.REV) {
personId = underlyingGraph.getAuthorId(nodeId);
if (personId != null) {
nodeContributors.add(personId);
}
personId = underlyingGraph.getCommitterId(nodeId);
if (personId != null) {
nodeContributors.add(personId);
}
} else if (nodeSWHID.getType() == SwhType.REL) {
personId = underlyingGraph.getAuthorId(nodeId);
if (personId != null) {
nodeContributors.add(personId);
}
}
if (!reuseAncestorSet) {
long computedAncestorCount = 0;
LazyLongIterator it = graph.successors(nodeId);
for (long ancestorNodeId; (ancestorNodeId = it.nextLong()) != -1;) {
computedAncestorCount++;
if (pendingSuccessors.get(ancestorNodeId) == 1) {
/*
* If this node is the last unhandled successor of the ancestor; pop the ancestor information,
* as we won't need it anymore
*/
pendingSuccessors.remove(ancestorNodeId);
nodeContributors.addAll(contributors.remove(ancestorNodeId));
} else {
/*
* The ancestor has remaining successors to handle; decrement the counter and copy its set of
* contributors to the current set
*/
pendingSuccessors.put(ancestorNodeId, pendingSuccessors.get(ancestorNodeId) - 1);
nodeContributors.addAll(contributors.get(ancestorNodeId));
}
}
if (ancestorCount != computedAncestorCount) {
System.err.format("Mismatched ancestor count: expected %d, found %d", ancestorCount,
computedAncestorCount);
System.exit(2);
}
}
if (nodeSWHID.getType() == SwhType.ORI) {
nodeContributors.forEach((contributorId) -> {
System.out.format("%d,%d\n", nodeId, contributorId);
});
+ byte[] url = underlyingGraph.getMessageBase64(nodeId);
+ if (url != null) {
+ originUrlsFileWriter.format("%d,%s\n", nodeId, new String(url));
+ }
}
if (successorCount > 0) {
/*
* If the node has any successor, store its set of contributors for later
*/
contributors.put(nodeId, nodeContributors);
pendingSuccessors.put(nodeId, successorCount);
}
}
+
+ originUrlsFileWriter.flush();
}
}
diff --git a/swh/graph/luigi/origin_contributors.py b/swh/graph/luigi/origin_contributors.py
index 96e5374..654a851 100644
--- a/swh/graph/luigi/origin_contributors.py
+++ b/swh/graph/luigi/origin_contributors.py
@@ -1,199 +1,209 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
Luigi tasks for contribution graph
==================================
This module contains `Luigi `_ tasks
driving the creation of the graph of contributions of people (pseudonymized
by default).
"""
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
from pathlib import Path
from typing import Dict, Iterable, List, Tuple, cast
import luigi
from .compressed_graph import LocalGraph
from .misc_datasets import TopoSort
from .utils import run_script
class ListOriginContributors(luigi.Task):
"""Creates a file that contains all SWHIDs in topological order from a compressed
graph."""
local_graph_path = luigi.PathParameter()
topological_order_path = luigi.PathParameter()
origin_contributors_path = luigi.PathParameter()
+ origin_urls_path = luigi.PathParameter()
graph_name = luigi.Parameter(default="graph")
def requires(self) -> List[luigi.Task]:
"""Returns an instance of :class:`swh.graph.luigi.compressed_graph.LocalGraph`
and :class:`swh.graph.luigi.misc_datasets.TopoSort`."""
return [
LocalGraph(local_graph_path=self.local_graph_path),
TopoSort(
local_graph_path=self.local_graph_path,
topological_order_path=self.topological_order_path,
graph_name=self.graph_name,
),
]
def output(self) -> luigi.Target:
""".csv.zst file that contains the topological order."""
return luigi.LocalTarget(self.origin_contributors_path)
def run(self) -> None:
"""Runs org.softwareheritage.graph.utils.TopoSort and compresses"""
+ import tempfile
+
class_name = "org.softwareheritage.graph.utils.ListOriginContributors"
- script = f"""
- zstdcat {self.topological_order_path} \
- | java {class_name} '{self.local_graph_path}/{self.graph_name}' \
- | pv --line-mode --wait \
- | zstdmt -19
- """
- run_script(script, self.origin_contributors_path)
+ with tempfile.NamedTemporaryFile(
+ prefix="origin_urls_", suffix=".csv"
+ ) as origin_urls_fd:
+ script = f"""
+ zstdcat {self.topological_order_path} \
+ | java {class_name} '{self.local_graph_path}/{self.graph_name}' '{origin_urls_fd.name}' \
+ | pv --line-mode --wait \
+ | zstdmt -19
+ """ # noqa
+ run_script(script, self.origin_contributors_path)
+ run_script(
+ f"pv '{origin_urls_fd.name}' | zstdmt -19",
+ self.origin_urls_path,
+ )
class ExportDeanonymizationTable(luigi.Task):
"""Exports (from swh-storage) a .csv.zst file that contains the columns:
``base64(sha256(full_name))`, ``base64(full_name)``, and ``escape(full_name)``.
The first column is the anonymized full name found in :file:`graph.persons.csv.zst`
in the compressed graph, and the latter two are the original name."""
storage_dsn = luigi.Parameter(
default="service=swh",
description="postgresql DSN of the swh-storage database to read from.",
)
deanonymization_table_path = luigi.PathParameter()
def output(self) -> luigi.Target:
""".csv.zst file that contains the table."""
return luigi.LocalTarget(self.deanonymization_table_path)
def run(self) -> None:
"""Runs a postgresql query to compute the table."""
run_script(
f"""
psql '{self.storage_dsn}' -c "\
COPY (
SELECT
encode(digest(fullname, 'sha256'), 'base64') as sha256_base64, \
encode(fullname, 'base64') as base64, \
encode(fullname, 'escape') as escaped \
FROM person \
) TO STDOUT CSV HEADER \
" | zstdmt -19
""", # noqa
self.deanonymization_table_path,
)
class DeanonymizeOriginContributors(luigi.Task):
"""Generates a .csv.zst file similar to :class:`ListOriginContributors`'s,
but with ``person_base64`` and ``person_escaped`` columns in addition to
``person_id``.
This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of names
instead of names); which may not be true depending on how the swh-dataset export
was configured.
"""
local_graph_path = luigi.PathParameter()
graph_name = luigi.Parameter(default="graph")
origin_contributors_path = luigi.PathParameter()
deanonymization_table_path = luigi.PathParameter()
deanonymized_origin_contributors_path = luigi.PathParameter()
def requires(self) -> List[luigi.Task]:
"""Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`,
and :class:`ExportDeanonymizationTable`."""
return [
LocalGraph(local_graph_path=self.local_graph_path),
ListOriginContributors(
local_graph_path=self.local_graph_path,
origin_contributors_path=self.origin_contributors_path,
),
ExportDeanonymizationTable(
deanonymization_table_path=self.deanonymization_table_path,
),
]
def output(self) -> luigi.Target:
""".csv.zst file similar to :meth:`ListOriginContributors.output`'s,
but with ``person_base64`` and ``person_escaped`` columns in addition to
``person_id``"""
return luigi.LocalTarget(self.deanonymized_origin_contributors_path)
def run(self) -> None:
"""Loads the list of persons (``graph.persons.csv.zst`` in the graph dataset
and the deanonymization table in memory, then uses them to map each row
in the original (anonymized) contributors list to the deanonymized one."""
# TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset export
# was configured to do so); this should add support for it.
import base64
import csv
import pyzstd
# Load the deanonymization table, to map sha256(name) to base64(name)
# and escape(name)
sha256_to_names: Dict[bytes, Tuple[bytes, str]] = {}
with pyzstd.open(self.deanonymization_table_path, "rt") as fd:
# TODO: remove that cast once we dropped Python 3.7 support
csv_reader = csv.reader(cast(Iterable[str], fd))
header = next(csv_reader)
assert header == ["sha256_base64", "base64", "escaped"], header
for line in csv_reader:
(base64_sha256_name, base64_name, escaped_name) = line
sha256_name = base64.b64decode(base64_sha256_name)
name = base64.b64decode(base64_name)
sha256_to_names[sha256_name] = (name, escaped_name)
# Combine with the list of sha256(name), to get the list of base64(name)
# and escape(name)
persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst"
with pyzstd.open(persons_path, "rb") as fd:
person_id_to_names: List[Tuple[bytes, str]] = [
sha256_to_names.pop(base64.b64decode(line.strip()), (b"", ""))
for line in fd
]
tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp")
tmp_output_path.parent.mkdir(parents=True, exist_ok=True)
# Finally, write a new table of origin_contributors, by reading the anonymized
# table line-by-line and deanonymizing each id
# Open temporary output for writes as CSV
with pyzstd.open(tmp_output_path, "wt") as output_fd:
csv_writer = csv.writer(output_fd, lineterminator="\n")
# write header
csv_writer.writerow(("origin_id", "person_base64", "person_escaped"))
# Open input for reads as CSV
with pyzstd.open(self.origin_contributors_path, "rt") as input_fd:
# TODO: remove that cast once we dropped Python 3.7 support
csv_reader = csv.reader(cast(Iterable[str], input_fd))
header = next(csv_reader)
assert header == ["origin_id", "person_id"], header
for (origin_id, person_id) in csv_reader:
if person_id == "null":
# FIXME: workaround for a bug in contribution graphs generated
# before 2022-12-01. Those were only used in tests and never
# published, so the conditional can be removed when this is
# productionized
continue
(name, escaped_name) = person_id_to_names[int(person_id)]
base64_name = base64.b64encode(name).decode("ascii")
csv_writer.writerow((origin_id, base64_name, escaped_name))
tmp_output_path.replace(self.deanonymized_origin_contributors_path)
diff --git a/swh/graph/tests/test_origin_contributors.py b/swh/graph/tests/test_origin_contributors.py
index 29aaf40..4cbe74e 100644
--- a/swh/graph/tests/test_origin_contributors.py
+++ b/swh/graph/tests/test_origin_contributors.py
@@ -1,186 +1,206 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import base64
import datetime
from pathlib import Path
import subprocess
from swh.graph.luigi.origin_contributors import (
DeanonymizeOriginContributors,
ExportDeanonymizationTable,
ListOriginContributors,
)
from swh.model.model import (
ObjectType,
Person,
Release,
Revision,
RevisionType,
TimestampWithTimezone,
)
from .test_toposort import EXPECTED as TOPOLOGICAL_ORDER
DATA_DIR = Path(__file__).parents[0] / "dataset"
# FIXME: do not hardcode ids here; they should be dynamically loaded
# from the test graph
ORIGIN_CONTRIBUTORS = """\
origin_id,person_id
2,0
2,2
0,0
0,1
0,2
"""
+assert (
+ base64.b64decode("aHR0cHM6Ly9leGFtcGxlLmNvbS9zd2gvZ3JhcGg=")
+ == b"https://example.com/swh/graph"
+)
+assert (
+ base64.b64decode("aHR0cHM6Ly9leGFtcGxlLmNvbS9zd2gvZ3JhcGgy")
+ == b"https://example.com/swh/graph2"
+)
+
+ORIGIN_URLS = """\
+origin_id,origin_url_base64
+2,aHR0cHM6Ly9leGFtcGxlLmNvbS9zd2gvZ3JhcGg=
+0,aHR0cHM6Ly9leGFtcGxlLmNvbS9zd2gvZ3JhcGgy
+"""
+
DEANONYMIZATION_TABLE = """\
sha256_base64,base64,escaped
8qhF7WQ2bmeoRbZipAaqtNw6QdOCDcpggLWCQLzITsI=,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe
aZA9TeLhVzqVDQHQOd53UABAZYyek0tY3vTo6VSlA4U=,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe
UaCrgAZBvn1LBd2sAinmdNvAX/G4sjo1aJA9GDd9UUs=,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5uZXQ+,Jane Doe
""" # noqa
PERSONS = """\
aZA9TeLhVzqVDQHQOd53UABAZYyek0tY3vTo6VSlA4U=
UaCrgAZBvn1LBd2sAinmdNvAX/G4sjo1aJA9GDd9UUs=
8qhF7WQ2bmeoRbZipAaqtNw6QdOCDcpggLWCQLzITsI=
"""
DEANONYMIZED_ORIGIN_CONTRIBUTORS = """\
origin_id,person_base64,person_escaped
2,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe
2,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe
0,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5jb20+,Jane Doe
0,SmFuZSBEb2UgPGpkb2VAZXhhbXBsZS5uZXQ+,Jane Doe
0,Sm9obiBEb2UgPGpkb2VAZXhhbXBsZS5vcmc+,John Doe
""" # noqa
def test_list_origin_contributors(tmpdir):
tmpdir = Path(tmpdir)
topological_order_path = tmpdir / "topo_order.csv.zst"
origin_contributors_path = tmpdir / "origin_contributors.csv.zst"
+ origin_urls_path = tmpdir / "origin_urls.csv.zst"
subprocess.run(
["zstdmt", "-o", topological_order_path],
input=TOPOLOGICAL_ORDER.encode(),
check=True,
)
task = ListOriginContributors(
local_graph_path=DATA_DIR / "compressed",
topological_order_path=topological_order_path,
origin_contributors_path=origin_contributors_path,
+ origin_urls_path=origin_urls_path,
graph_name="example",
)
task.run()
csv_text = subprocess.check_output(["zstdcat", origin_contributors_path]).decode()
-
assert csv_text == ORIGIN_CONTRIBUTORS
+ urls_text = subprocess.check_output(["zstdcat", origin_urls_path]).decode()
+ assert urls_text == ORIGIN_URLS
+
def test_export_deanonymization_table(tmpdir, swh_storage_postgresql, swh_storage):
tmpdir = Path(tmpdir)
tstz = TimestampWithTimezone.from_datetime(
datetime.datetime.now(tz=datetime.timezone.utc)
)
swh_storage.release_add(
[
Release(
name=b"v1.0",
message=b"first release",
author=Person.from_fullname(b"John Doe "),
target=b"\x00" * 20,
target_type=ObjectType.REVISION,
synthetic=True,
)
]
)
swh_storage.revision_add(
[
Revision(
message=b"first commit",
author=Person.from_fullname(b"Jane Doe "),
committer=Person.from_fullname(b"Jane Doe "),
date=tstz,
committer_date=tstz,
directory=b"\x00" * 20,
type=RevisionType.GIT,
synthetic=True,
)
]
)
deanonymization_table_path = tmpdir / "person_sha256_to_names.csv.zst"
task = ExportDeanonymizationTable(
storage_dsn=swh_storage_postgresql.dsn,
deanonymization_table_path=deanonymization_table_path,
)
task.run()
csv_text = subprocess.check_output(["zstdcat", deanonymization_table_path]).decode()
(header, *rows) = csv_text.split("\n")
(expected_header, *expected_rows) = DEANONYMIZATION_TABLE.split("\n")
assert header == expected_header
assert rows.pop() == "", "Missing trailing newline"
expected_rows.pop()
assert set(rows) == set(expected_rows)
def test_deanonymize_origin_contributors(tmpdir):
tmpdir = Path(tmpdir)
persons_path = tmpdir / "example.persons.csv.zst"
origin_contributors_path = tmpdir / "origin_contributors.csv.zst"
deanonymization_table_path = tmpdir / "person_sha256_to_names.csv.zst"
deanonymized_origin_contributors_path = (
tmpdir / "sensitive" / "origin_contributors.deanonymized.csv.zst"
)
subprocess.run(
["zstdmt", "-o", origin_contributors_path],
input=ORIGIN_CONTRIBUTORS.encode(),
check=True,
)
subprocess.run(
["zstdmt", "-o", persons_path],
input=PERSONS.encode(),
check=True,
)
subprocess.run(
["zstdmt", "-o", deanonymization_table_path],
input=DEANONYMIZATION_TABLE.encode(),
check=True,
)
task = DeanonymizeOriginContributors(
local_graph_path=tmpdir,
origin_contributors_path=origin_contributors_path,
deanonymization_table_path=deanonymization_table_path,
deanonymized_origin_contributors_path=deanonymized_origin_contributors_path,
graph_name="example",
)
task.run()
csv_text = subprocess.check_output(
["zstdcat", deanonymized_origin_contributors_path]
).decode()
assert csv_text == DEANONYMIZED_ORIGIN_CONTRIBUTORS