Differential D7733 Diff 28055 java/src/main/java/org/softwareheritage/graph/compress/ExtractNodes.java
Changeset View
Changeset View
Standalone View
Standalone View
java/src/main/java/org/softwareheritage/graph/compress/ExtractNodes.java
package org.softwareheritage.graph.compress; | package org.softwareheritage.graph.compress; | ||||
import com.github.luben.zstd.ZstdOutputStream; | import com.github.luben.zstd.ZstdOutputStream; | ||||
import com.martiansoftware.jsap.*; | import com.martiansoftware.jsap.*; | ||||
import org.softwareheritage.graph.Node; | import org.softwareheritage.graph.Node; | ||||
import org.softwareheritage.graph.utils.Sort; | import org.softwareheritage.graph.utils.Sort; | ||||
import java.io.*; | import java.io.*; | ||||
import java.nio.charset.StandardCharsets; | import java.nio.charset.StandardCharsets; | ||||
import java.util.*; | import java.util.*; | ||||
import java.util.concurrent.ExecutionException; | |||||
import java.util.concurrent.ForkJoinPool; | |||||
import java.util.concurrent.atomic.AtomicLong; | |||||
import java.util.concurrent.atomic.AtomicLongArray; | |||||
/** | /** | ||||
* Read a graph dataset and extract all the unique node SWHIDs it contains, including the ones that | * Read a graph dataset and extract all the unique node SWHIDs it contains, including the ones that | ||||
* are not stored as actual objects in the graph, but only <em>referred to</em> by the edges. | * are not stored as actual objects in the graph, but only <em>referred to</em> by the edges. | ||||
* Additionally, extract the set of all unique edge labels in the graph. | * Additionally, extract the set of all unique edge labels in the graph. | ||||
* | * | ||||
* <ul> | * <ul> | ||||
* <li>The set of nodes is written in <code>${outputBasename}.nodes.csv.zst</code>, as a | * <li>The set of nodes is written in <code>${outputBasename}.nodes.csv.zst</code>, as a | ||||
▲ Show 20 Lines • Show All 93 Lines • ▼ Show 20 Lines | public static void extractNodes(GraphDataset dataset, String outputBasename, String sortBufferSize, | ||||
BufferedOutputStream labelSortStdin = new BufferedOutputStream(labelSort.getOutputStream()); | BufferedOutputStream labelSortStdin = new BufferedOutputStream(labelSort.getOutputStream()); | ||||
BufferedInputStream labelSortStdout = new BufferedInputStream(labelSort.getInputStream()); | BufferedInputStream labelSortStdout = new BufferedInputStream(labelSort.getInputStream()); | ||||
OutputStream labelsFileOutputStream = new ZstdOutputStream( | OutputStream labelsFileOutputStream = new ZstdOutputStream( | ||||
new BufferedOutputStream(new FileOutputStream(outputBasename + ".labels.csv.zst"))); | new BufferedOutputStream(new FileOutputStream(outputBasename + ".labels.csv.zst"))); | ||||
LabelsOutputThread labelsOutputThread = new LabelsOutputThread(labelSortStdout, labelsFileOutputStream); | LabelsOutputThread labelsOutputThread = new LabelsOutputThread(labelSortStdout, labelsFileOutputStream); | ||||
labelsOutputThread.start(); | labelsOutputThread.start(); | ||||
// Read the dataset and write the nodes and labels to the sorting processes | // Read the dataset and write the nodes and labels to the sorting processes | ||||
long[] edgeCount = {0}; | AtomicLong edgeCount = new AtomicLong(0); | ||||
long[][] edgeCountByType = new long[Node.Type.values().length][Node.Type.values().length]; | AtomicLongArray edgeCountByType = new AtomicLongArray(Node.Type.values().length * Node.Type.values().length); | ||||
// long[][] edgeCountByType = new long[Node.Type.values().length][Node.Type.values().length]; | |||||
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); | |||||
try { | |||||
forkJoinPool.submit(() -> { | |||||
try { | |||||
dataset.readEdges((node) -> { | dataset.readEdges((node) -> { | ||||
synchronized (nodeSortStdin) { | |||||
nodeSortStdin.write(node); | nodeSortStdin.write(node); | ||||
nodeSortStdin.write('\n'); | nodeSortStdin.write('\n'); | ||||
} | |||||
}, (src, dst, label, perm) -> { | }, (src, dst, label, perm) -> { | ||||
synchronized (nodeSortStdin) { | |||||
nodeSortStdin.write(src); | nodeSortStdin.write(src); | ||||
nodeSortStdin.write('\n'); | nodeSortStdin.write('\n'); | ||||
nodeSortStdin.write(dst); | nodeSortStdin.write(dst); | ||||
nodeSortStdin.write('\n'); | nodeSortStdin.write('\n'); | ||||
} | |||||
if (label != null) { | if (label != null) { | ||||
synchronized (labelSortStdin) { | |||||
labelSortStdin.write(label); | labelSortStdin.write(label); | ||||
labelSortStdin.write('\n'); | labelSortStdin.write('\n'); | ||||
} | } | ||||
edgeCount[0]++; | } | ||||
edgeCount.incrementAndGet(); | |||||
// edgeCount[0]++; | |||||
// Extract type of src and dst from their SWHID: swh:1:XXX | // Extract type of src and dst from their SWHID: swh:1:XXX | ||||
byte[] srcTypeBytes = Arrays.copyOfRange(src, 6, 6 + 3); | byte[] srcTypeBytes = Arrays.copyOfRange(src, 6, 6 + 3); | ||||
byte[] dstTypeBytes = Arrays.copyOfRange(dst, 6, 6 + 3); | byte[] dstTypeBytes = Arrays.copyOfRange(dst, 6, 6 + 3); | ||||
int srcType = Node.Type.byteNameToInt(srcTypeBytes); | int srcType = Node.Type.byteNameToInt(srcTypeBytes); | ||||
int dstType = Node.Type.byteNameToInt(dstTypeBytes); | int dstType = Node.Type.byteNameToInt(dstTypeBytes); | ||||
if (srcType != -1 && dstType != -1) { | if (srcType != -1 && dstType != -1) { | ||||
edgeCountByType[srcType][dstType]++; | edgeCountByType.incrementAndGet(srcType * Node.Type.values().length + dstType); | ||||
// edgeCountByType[srcType][dstType].incrementAndGet(); | |||||
// edgeCountByType[srcType][dstType]++; | |||||
} else { | } else { | ||||
System.err | System.err.println("Invalid edge type: " + new String(srcTypeBytes) + " -> " | ||||
.println("Invalid edge type: " + new String(srcTypeBytes) + " -> " + new String(dstTypeBytes)); | + new String(dstTypeBytes)); | ||||
System.exit(1); | System.exit(1); | ||||
} | } | ||||
}); | }); | ||||
} catch (IOException e) { | |||||
throw new RuntimeException(e); | |||||
} | |||||
}).get(); | |||||
} catch (ExecutionException e) { | |||||
throw new RuntimeException(e); | |||||
} | |||||
long[][] edgeCountByTypeArray = new long[Node.Type.values().length][Node.Type.values().length]; | |||||
for (int i = 0; i < edgeCountByTypeArray.length; i++) { | |||||
for (int j = 0; j < edgeCountByTypeArray[i].length; j++) { | |||||
edgeCountByTypeArray[i][j] = edgeCountByType.get(i * Node.Type.values().length + j); | |||||
} | |||||
} | |||||
// Wait for sorting processes to finish | // Wait for sorting processes to finish | ||||
nodeSortStdin.close(); | nodeSortStdin.close(); | ||||
nodeSort.waitFor(); | nodeSort.waitFor(); | ||||
labelSortStdin.close(); | labelSortStdin.close(); | ||||
labelSort.waitFor(); | labelSort.waitFor(); | ||||
nodesOutputThread.join(); | nodesOutputThread.join(); | ||||
labelsOutputThread.join(); | labelsOutputThread.join(); | ||||
// Write node, edge and label counts/statistics | // Write node, edge and label counts/statistics | ||||
printEdgeCounts(outputBasename, edgeCount[0], edgeCountByType); | printEdgeCounts(outputBasename, edgeCount.get(), edgeCountByTypeArray); | ||||
printNodeCounts(outputBasename, nodesOutputThread.getNodeCount(), nodesOutputThread.getNodeTypeCounts()); | printNodeCounts(outputBasename, nodesOutputThread.getNodeCount(), nodesOutputThread.getNodeTypeCounts()); | ||||
printLabelCounts(outputBasename, labelsOutputThread.getLabelCount()); | printLabelCounts(outputBasename, labelsOutputThread.getLabelCount()); | ||||
} | } | ||||
private static void printEdgeCounts(String basename, long edgeCount, long[][] edgeTypeCounts) throws IOException { | private static void printEdgeCounts(String basename, long edgeCount, long[][] edgeTypeCounts) throws IOException { | ||||
PrintWriter nodeCountWriter = new PrintWriter(basename + ".edges.count.txt"); | PrintWriter nodeCountWriter = new PrintWriter(basename + ".edges.count.txt"); | ||||
nodeCountWriter.println(edgeCount); | nodeCountWriter.println(edgeCount); | ||||
nodeCountWriter.close(); | nodeCountWriter.close(); | ||||
▲ Show 20 Lines • Show All 116 Lines • Show Last 20 Lines |