Differential D7733 Diff 27963 java/src/main/java/org/softwareheritage/graph/compress/ExtractNodes.java
Changeset View
Changeset View
Standalone View
Standalone View
java/src/main/java/org/softwareheritage/graph/compress/ExtractNodes.java
package org.softwareheritage.graph.compress; | package org.softwareheritage.graph.compress; | ||||
import com.github.luben.zstd.ZstdOutputStream; | import com.github.luben.zstd.ZstdOutputStream; | ||||
import com.martiansoftware.jsap.*; | import com.martiansoftware.jsap.*; | ||||
import it.unimi.dsi.logging.ProgressLogger; | |||||
import org.slf4j.Logger; | |||||
import org.slf4j.LoggerFactory; | |||||
import org.softwareheritage.graph.Node; | import org.softwareheritage.graph.Node; | ||||
import org.softwareheritage.graph.utils.Sort; | import org.softwareheritage.graph.utils.Sort; | ||||
import java.io.*; | import java.io.*; | ||||
import java.nio.charset.StandardCharsets; | import java.nio.charset.StandardCharsets; | ||||
import java.util.*; | import java.util.*; | ||||
import java.util.concurrent.ExecutionException; | |||||
import java.util.concurrent.ForkJoinPool; | |||||
import java.util.concurrent.TimeUnit; | |||||
import java.util.concurrent.atomic.AtomicInteger; | |||||
import java.util.concurrent.atomic.AtomicLong; | |||||
import java.util.concurrent.atomic.AtomicLongArray; | |||||
/** | /** | ||||
* Read a graph dataset and extract all the unique node SWHIDs it contains, including the ones that | * Read a graph dataset and extract all the unique node SWHIDs it contains, including the ones that | ||||
* are not stored as actual objects in the graph, but only <em>referred to</em> by the edges. | * are not stored as actual objects in the graph, but only <em>referred to</em> by the edges. | ||||
* Additionally, extract the set of all unique edge labels in the graph. | * Additionally, extract the set of all unique edge labels in the graph. | ||||
* | * | ||||
* <ul> | * <ul> | ||||
* <li>The set of nodes is written in <code>${outputBasename}.nodes.csv.zst</code>, as a | * <li>The set of nodes is written in <code>${outputBasename}.nodes.csv.zst</code>, as a | ||||
Show All 22 Lines | |||||
* | * | ||||
* <p> | * <p> | ||||
* This class reads the entire graph dataset, and uses <code>sort -u</code> to extract the set of | * This class reads the entire graph dataset, and uses <code>sort -u</code> to extract the set of | ||||
* all the unique nodes and unique labels that will be needed as an input for the compression | * all the unique nodes and unique labels that will be needed as an input for the compression | ||||
* process. | * process. | ||||
* </p> | * </p> | ||||
*/ | */ | ||||
public class ExtractNodes { | public class ExtractNodes { | ||||
private final static Logger logger = LoggerFactory.getLogger(ExtractNodes.class); | |||||
private static JSAPResult parseArgs(String[] args) { | private static JSAPResult parseArgs(String[] args) { | ||||
JSAPResult config = null; | JSAPResult config = null; | ||||
try { | try { | ||||
SimpleJSAP jsap = new SimpleJSAP(ComposePermutations.class.getName(), "", new Parameter[]{ | SimpleJSAP jsap = new SimpleJSAP(ComposePermutations.class.getName(), "", new Parameter[]{ | ||||
new UnflaggedOption("dataset", JSAP.STRING_PARSER, JSAP.REQUIRED, "Path to the edges dataset"), | new UnflaggedOption("dataset", JSAP.STRING_PARSER, JSAP.REQUIRED, "Path to the edges dataset"), | ||||
new UnflaggedOption("outputBasename", JSAP.STRING_PARSER, JSAP.REQUIRED, | new UnflaggedOption("outputBasename", JSAP.STRING_PARSER, JSAP.REQUIRED, | ||||
"Basename of the output files"), | "Basename of the output files"), | ||||
Show All 36 Lines | public static void main(String[] args) throws IOException, InterruptedException { | ||||
throw new IllegalArgumentException("Unknown dataset format: " + datasetFormat); | throw new IllegalArgumentException("Unknown dataset format: " + datasetFormat); | ||||
} | } | ||||
extractNodes(dataset, outputBasename, sortBufferSize, sortTmpDir); | extractNodes(dataset, outputBasename, sortBufferSize, sortTmpDir); | ||||
} | } | ||||
public static void extractNodes(GraphDataset dataset, String outputBasename, String sortBufferSize, | public static void extractNodes(GraphDataset dataset, String outputBasename, String sortBufferSize, | ||||
String sortTmpDir) throws IOException, InterruptedException { | String sortTmpDir) throws IOException, InterruptedException { | ||||
// Spawn node sorting process | // Read the dataset and write the nodes and labels to the sorting processes | ||||
Process nodeSort = Sort.spawnSort(sortBufferSize, sortTmpDir); | AtomicLong edgeCount = new AtomicLong(0); | ||||
BufferedOutputStream nodeSortStdin = new BufferedOutputStream(nodeSort.getOutputStream()); | AtomicLongArray edgeCountByType = new AtomicLongArray(Node.Type.values().length * Node.Type.values().length); | ||||
BufferedInputStream nodeSortStdout = new BufferedInputStream(nodeSort.getInputStream()); | |||||
OutputStream nodesFileOutputStream = new ZstdOutputStream( | |||||
new BufferedOutputStream(new FileOutputStream(outputBasename + ".nodes.csv.zst"))); | |||||
NodesOutputThread nodesOutputThread = new NodesOutputThread(nodeSortStdout, nodesFileOutputStream); | |||||
nodesOutputThread.start(); | |||||
// Spawn label sorting process | int numThreads = Runtime.getRuntime().availableProcessors(); | ||||
Process labelSort = Sort.spawnSort(sortBufferSize, sortTmpDir); | ForkJoinPool forkJoinPool = new ForkJoinPool(numThreads); | ||||
BufferedOutputStream labelSortStdin = new BufferedOutputStream(labelSort.getOutputStream()); | |||||
BufferedInputStream labelSortStdout = new BufferedInputStream(labelSort.getInputStream()); | |||||
OutputStream labelsFileOutputStream = new ZstdOutputStream( | |||||
new BufferedOutputStream(new FileOutputStream(outputBasename + ".labels.csv.zst"))); | |||||
LabelsOutputThread labelsOutputThread = new LabelsOutputThread(labelSortStdout, labelsFileOutputStream); | |||||
labelsOutputThread.start(); | |||||
// Read the dataset and write the nodes and labels to the sorting processes | Process[] nodeSorters = new Process[numThreads]; | ||||
long[] edgeCount = {0}; | String[] nodeBatchPaths = new String[numThreads]; | ||||
long[][] edgeCountByType = new long[Node.Type.values().length][Node.Type.values().length]; | Process[] labelSorters = new Process[numThreads]; | ||||
String[] labelBatchPaths = new String[numThreads]; | |||||
long[] progressCounts = new long[numThreads]; | |||||
AtomicInteger nextThreadId = new AtomicInteger(0); | |||||
ThreadLocal<Integer> threadLocalId = ThreadLocal.withInitial(nextThreadId::getAndIncrement); | |||||
ProgressLogger pl = new ProgressLogger(logger, 10, TimeUnit.SECONDS); | |||||
pl.itemsName = "edges"; | |||||
pl.start("Reading node/edge files and writing sorted batches."); | |||||
GraphDataset.NodeCallback nodeCallback = (node) -> { | |||||
int threadId = threadLocalId.get(); | |||||
if (nodeSorters[threadId] == null) { | |||||
nodeBatchPaths[threadId] = sortTmpDir + "/nodes." + threadId + ".txt"; | |||||
nodeSorters[threadId] = Sort.spawnSort(sortBufferSize, sortTmpDir, | |||||
List.of("-o", nodeBatchPaths[threadId])); | |||||
} | |||||
OutputStream nodeOutputStream = nodeSorters[threadId].getOutputStream(); | |||||
nodeOutputStream.write(node); | |||||
nodeOutputStream.write('\n'); | |||||
}; | |||||
GraphDataset.NodeCallback labelCallback = (label) -> { | |||||
int threadId = threadLocalId.get(); | |||||
if (labelSorters[threadId] == null) { | |||||
labelBatchPaths[threadId] = sortTmpDir + "/labels." + threadId + ".txt"; | |||||
labelSorters[threadId] = Sort.spawnSort(sortBufferSize, sortTmpDir, | |||||
List.of("-o", labelBatchPaths[threadId])); | |||||
} | |||||
OutputStream labelOutputStream = labelSorters[threadId].getOutputStream(); | |||||
labelOutputStream.write(label); | |||||
labelOutputStream.write('\n'); | |||||
}; | |||||
try { | |||||
forkJoinPool.submit(() -> { | |||||
try { | |||||
dataset.readEdges((node) -> { | dataset.readEdges((node) -> { | ||||
nodeSortStdin.write(node); | nodeCallback.onNode(node); | ||||
nodeSortStdin.write('\n'); | |||||
}, (src, dst, label, perm) -> { | }, (src, dst, label, perm) -> { | ||||
nodeSortStdin.write(src); | nodeCallback.onNode(src); | ||||
nodeSortStdin.write('\n'); | nodeCallback.onNode(dst); | ||||
nodeSortStdin.write(dst); | |||||
nodeSortStdin.write('\n'); | |||||
if (label != null) { | if (label != null) { | ||||
labelSortStdin.write(label); | labelCallback.onNode(label); | ||||
labelSortStdin.write('\n'); | |||||
} | } | ||||
edgeCount[0]++; | edgeCount.incrementAndGet(); | ||||
// Extract type of src and dst from their SWHID: swh:1:XXX | // Extract type of src and dst from their SWHID: swh:1:XXX | ||||
byte[] srcTypeBytes = Arrays.copyOfRange(src, 6, 6 + 3); | byte[] srcTypeBytes = Arrays.copyOfRange(src, 6, 6 + 3); | ||||
byte[] dstTypeBytes = Arrays.copyOfRange(dst, 6, 6 + 3); | byte[] dstTypeBytes = Arrays.copyOfRange(dst, 6, 6 + 3); | ||||
int srcType = Node.Type.byteNameToInt(srcTypeBytes); | int srcType = Node.Type.byteNameToInt(srcTypeBytes); | ||||
int dstType = Node.Type.byteNameToInt(dstTypeBytes); | int dstType = Node.Type.byteNameToInt(dstTypeBytes); | ||||
if (srcType != -1 && dstType != -1) { | if (srcType != -1 && dstType != -1) { | ||||
edgeCountByType[srcType][dstType]++; | edgeCountByType.incrementAndGet(srcType * Node.Type.values().length + dstType); | ||||
} else { | } else { | ||||
System.err | System.err.println("Invalid edge type: " + new String(srcTypeBytes) + " -> " | ||||
.println("Invalid edge type: " + new String(srcTypeBytes) + " -> " + new String(dstTypeBytes)); | + new String(dstTypeBytes)); | ||||
System.exit(1); | System.exit(1); | ||||
} | } | ||||
int threadId = threadLocalId.get(); | |||||
if (++progressCounts[threadId] > 1000) { | |||||
synchronized (pl) { | |||||
pl.update(progressCounts[threadId]); | |||||
} | |||||
progressCounts[threadId] = 0; | |||||
} | |||||
}); | }); | ||||
} catch (IOException e) { | |||||
throw new RuntimeException(e); | |||||
} | |||||
}).get(); | |||||
} catch (ExecutionException e) { | |||||
throw new RuntimeException(e); | |||||
} | |||||
// Close all the sorters stdin | |||||
for (int i = 0; i < numThreads; i++) { | |||||
if (nodeSorters[i] != null) { | |||||
nodeSorters[i].getOutputStream().close(); | |||||
} | |||||
if (labelSorters[i] != null) { | |||||
labelSorters[i].getOutputStream().close(); | |||||
} | |||||
} | |||||
// Wait for sorting processes to finish | // Wait for sorting processes to finish | ||||
nodeSortStdin.close(); | for (int i = 0; i < numThreads; i++) { | ||||
nodeSort.waitFor(); | if (nodeSorters[i] != null) { | ||||
labelSortStdin.close(); | nodeSorters[i].waitFor(); | ||||
labelSort.waitFor(); | } | ||||
if (labelSorters[i] != null) { | |||||
labelSorters[i].waitFor(); | |||||
} | |||||
} | |||||
pl.done(); | |||||
ArrayList<String> nodeSortMergerOptions = new ArrayList<>(List.of("-m")); | |||||
ArrayList<String> labelSortMergerOptions = new ArrayList<>(List.of("-m")); | |||||
for (int i = 0; i < numThreads; i++) { | |||||
if (nodeBatchPaths[i] != null) { | |||||
nodeSortMergerOptions.add(nodeBatchPaths[i]); | |||||
} | |||||
if (labelBatchPaths[i] != null) { | |||||
labelSortMergerOptions.add(labelBatchPaths[i]); | |||||
} | |||||
} | |||||
// Spawn node merge-sorting process | |||||
Process nodeSortMerger = Sort.spawnSort(sortBufferSize, sortTmpDir, nodeSortMergerOptions); | |||||
nodeSortMerger.getOutputStream().close(); | |||||
OutputStream nodesFileOutputStream = new ZstdOutputStream( | |||||
new BufferedOutputStream(new FileOutputStream(outputBasename + ".nodes.csv.zst"))); | |||||
NodesOutputThread nodesOutputThread = new NodesOutputThread( | |||||
new BufferedInputStream(nodeSortMerger.getInputStream()), nodesFileOutputStream); | |||||
nodesOutputThread.start(); | |||||
// Spawn label merge-sorting process | |||||
Process labelSortMerger = Sort.spawnSort(sortBufferSize, sortTmpDir, labelSortMergerOptions); | |||||
labelSortMerger.getOutputStream().close(); | |||||
OutputStream labelsFileOutputStream = new ZstdOutputStream( | |||||
new BufferedOutputStream(new FileOutputStream(outputBasename + ".labels.csv.zst"))); | |||||
LabelsOutputThread labelsOutputThread = new LabelsOutputThread( | |||||
new BufferedInputStream(labelSortMerger.getInputStream()), labelsFileOutputStream); | |||||
labelsOutputThread.start(); | |||||
pl.logger().info("Waiting for merge-sort and writing output files..."); | |||||
nodeSortMerger.waitFor(); | |||||
labelSortMerger.waitFor(); | |||||
nodesOutputThread.join(); | nodesOutputThread.join(); | ||||
labelsOutputThread.join(); | labelsOutputThread.join(); | ||||
long[][] edgeCountByTypeArray = new long[Node.Type.values().length][Node.Type.values().length]; | |||||
for (int i = 0; i < edgeCountByTypeArray.length; i++) { | |||||
for (int j = 0; j < edgeCountByTypeArray[i].length; j++) { | |||||
edgeCountByTypeArray[i][j] = edgeCountByType.get(i * Node.Type.values().length + j); | |||||
} | |||||
} | |||||
// Write node, edge and label counts/statistics | // Write node, edge and label counts/statistics | ||||
printEdgeCounts(outputBasename, edgeCount[0], edgeCountByType); | printEdgeCounts(outputBasename, edgeCount.get(), edgeCountByTypeArray); | ||||
printNodeCounts(outputBasename, nodesOutputThread.getNodeCount(), nodesOutputThread.getNodeTypeCounts()); | printNodeCounts(outputBasename, nodesOutputThread.getNodeCount(), nodesOutputThread.getNodeTypeCounts()); | ||||
printLabelCounts(outputBasename, labelsOutputThread.getLabelCount()); | printLabelCounts(outputBasename, labelsOutputThread.getLabelCount()); | ||||
} | } | ||||
private static void printEdgeCounts(String basename, long edgeCount, long[][] edgeTypeCounts) throws IOException { | private static void printEdgeCounts(String basename, long edgeCount, long[][] edgeTypeCounts) throws IOException { | ||||
PrintWriter nodeCountWriter = new PrintWriter(basename + ".edges.count.txt"); | PrintWriter nodeCountWriter = new PrintWriter(basename + ".edges.count.txt"); | ||||
nodeCountWriter.println(edgeCount); | nodeCountWriter.println(edgeCount); | ||||
nodeCountWriter.close(); | nodeCountWriter.close(); | ||||
▲ Show 20 Lines • Show All 116 Lines • Show Last 20 Lines |