Differential D3990 Diff 14072 java/src/main/java/org/softwareheritage/graph/maps/NodeMapBuilder.java
Changeset View
Changeset View
Standalone View
Standalone View
java/src/main/java/org/softwareheritage/graph/maps/NodeMapBuilder.java
package org.softwareheritage.graph.maps; | package org.softwareheritage.graph.maps; | ||||
import it.unimi.dsi.bits.LongArrayBitVector; | import it.unimi.dsi.bits.LongArrayBitVector; | ||||
import it.unimi.dsi.fastutil.BigArrays; | import it.unimi.dsi.fastutil.BigArrays; | ||||
import it.unimi.dsi.fastutil.Size64; | import it.unimi.dsi.fastutil.Size64; | ||||
import it.unimi.dsi.fastutil.io.BinIO; | import it.unimi.dsi.fastutil.io.BinIO; | ||||
import it.unimi.dsi.fastutil.longs.LongBigArrays; | import it.unimi.dsi.fastutil.longs.LongBigArrays; | ||||
import it.unimi.dsi.fastutil.longs.LongBigList; | import it.unimi.dsi.fastutil.longs.LongBigList; | ||||
import it.unimi.dsi.fastutil.objects.Object2LongFunction; | import it.unimi.dsi.fastutil.objects.Object2LongFunction; | ||||
import it.unimi.dsi.io.FastBufferedReader; | import it.unimi.dsi.io.FastBufferedReader; | ||||
import it.unimi.dsi.io.LineIterator; | import it.unimi.dsi.io.LineIterator; | ||||
import it.unimi.dsi.logging.ProgressLogger; | import it.unimi.dsi.logging.ProgressLogger; | ||||
import org.slf4j.Logger; | import org.slf4j.Logger; | ||||
import org.slf4j.LoggerFactory; | import org.slf4j.LoggerFactory; | ||||
import org.softwareheritage.graph.Graph; | import org.softwareheritage.graph.Graph; | ||||
import org.softwareheritage.graph.Node; | import org.softwareheritage.graph.Node; | ||||
import org.softwareheritage.graph.SwhPID; | import org.softwareheritage.graph.SWHID; | ||||
import java.io.*; | import java.io.*; | ||||
import java.nio.charset.StandardCharsets; | import java.nio.charset.StandardCharsets; | ||||
import java.util.Scanner; | import java.util.Scanner; | ||||
import java.util.concurrent.TimeUnit; | import java.util.concurrent.TimeUnit; | ||||
/** | /** | ||||
* Create maps needed at runtime by the graph service, in particular: | * Create maps needed at runtime by the graph service, in particular: | ||||
* <p> | * <p> | ||||
* - SWH PID → WebGraph long node id | * - SWHID → WebGraph long node id | ||||
* - WebGraph long node id → SWH PID (converse of the former) | * - WebGraph long node id → SWHID (converse of the former) | ||||
* - WebGraph long node id → SWH node type (enum) | * - WebGraph long node id → SWH node type (enum) | ||||
* | * | ||||
* @author The Software Heritage developers | * @author The Software Heritage developers | ||||
*/ | */ | ||||
public class NodeMapBuilder { | public class NodeMapBuilder { | ||||
final static String SORT_BUFFER_SIZE = "40%"; | final static String SORT_BUFFER_SIZE = "40%"; | ||||
Show All 21 Lines | public class NodeMapBuilder { | ||||
* Computes and dumps on disk mapping files. | * Computes and dumps on disk mapping files. | ||||
* | * | ||||
* @param graphPath path of the compressed graph | * @param graphPath path of the compressed graph | ||||
*/ | */ | ||||
// Suppress warning for Object2LongFunction cast | // Suppress warning for Object2LongFunction cast | ||||
@SuppressWarnings("unchecked") | @SuppressWarnings("unchecked") | ||||
static void precomputeNodeIdMap(String graphPath, String tmpDir) | static void precomputeNodeIdMap(String graphPath, String tmpDir) | ||||
throws IOException { | throws IOException { | ||||
ProgressLogger plPid2Node = new ProgressLogger(logger, 10, TimeUnit.SECONDS); | ProgressLogger plSWHID2Node = new ProgressLogger(logger, 10, TimeUnit.SECONDS); | ||||
ProgressLogger plNode2Pid = new ProgressLogger(logger, 10, TimeUnit.SECONDS); | ProgressLogger plNode2SWHID = new ProgressLogger(logger, 10, TimeUnit.SECONDS); | ||||
plPid2Node.itemsName = "pid→node"; | plSWHID2Node.itemsName = "swhid→node"; | ||||
plNode2Pid.itemsName = "node→pid"; | plNode2SWHID.itemsName = "node→swhid"; | ||||
// avg speed for pid→node is sometime skewed due to write to the sort | // avg speed for swhid→node is sometime skewed due to write to the sort | ||||
// pipe hanging when sort is sorting; hence also desplay local speed | // pipe hanging when sort is sorting; hence also desplay local speed | ||||
plPid2Node.displayLocalSpeed = true; | plSWHID2Node.displayLocalSpeed = true; | ||||
// first half of PID->node mapping: PID -> WebGraph MPH (long) | // first half of SWHID->node mapping: SWHID -> WebGraph MPH (long) | ||||
Object2LongFunction<String> mphMap = null; | Object2LongFunction<String> mphMap = null; | ||||
try { | try { | ||||
logger.info("loading MPH function..."); | logger.info("loading MPH function..."); | ||||
mphMap = (Object2LongFunction<String>) BinIO.loadObject(graphPath + ".mph"); | mphMap = (Object2LongFunction<String>) BinIO.loadObject(graphPath + ".mph"); | ||||
logger.info("MPH function loaded"); | logger.info("MPH function loaded"); | ||||
} catch (ClassNotFoundException e) { | } catch (ClassNotFoundException e) { | ||||
logger.error("unknown class object in .mph file: " + e); | logger.error("unknown class object in .mph file: " + e); | ||||
System.exit(2); | System.exit(2); | ||||
} | } | ||||
long nbIds = (mphMap instanceof Size64) ? ((Size64) mphMap).size64() : mphMap.size(); | long nbIds = (mphMap instanceof Size64) ? ((Size64) mphMap).size64() : mphMap.size(); | ||||
plPid2Node.expectedUpdates = nbIds; | plSWHID2Node.expectedUpdates = nbIds; | ||||
plNode2Pid.expectedUpdates = nbIds; | plNode2SWHID.expectedUpdates = nbIds; | ||||
// second half of PID->node mapping: WebGraph MPH (long) -> BFS order (long) | // second half of SWHID->node mapping: WebGraph MPH (long) -> BFS order (long) | ||||
long[][] bfsMap = LongBigArrays.newBigArray(nbIds); | long[][] bfsMap = LongBigArrays.newBigArray(nbIds); | ||||
logger.info("loading BFS order file..."); | logger.info("loading BFS order file..."); | ||||
long loaded = BinIO.loadLongs(graphPath + ".order", bfsMap); | long loaded = BinIO.loadLongs(graphPath + ".order", bfsMap); | ||||
logger.info("BFS order file loaded"); | logger.info("BFS order file loaded"); | ||||
if (loaded != nbIds) { | if (loaded != nbIds) { | ||||
logger.error("graph contains " + nbIds + " nodes, but read " + loaded); | logger.error("graph contains " + nbIds + " nodes, but read " + loaded); | ||||
System.exit(2); | System.exit(2); | ||||
} | } | ||||
// Create mapping SWH PID -> WebGraph node id, by sequentially reading | // Create mapping SWHID -> WebGraph node id, by sequentially reading | ||||
// nodes, hashing them with MPH, and permuting according to BFS order | // nodes, hashing them with MPH, and permuting according to BFS order | ||||
FastBufferedReader buffer = new FastBufferedReader(new InputStreamReader(System.in, | FastBufferedReader buffer = new FastBufferedReader(new InputStreamReader(System.in, | ||||
StandardCharsets.US_ASCII)); | StandardCharsets.US_ASCII)); | ||||
LineIterator swhPIDIterator = new LineIterator(buffer); | LineIterator swhidIterator = new LineIterator(buffer); | ||||
// The WebGraph node id -> SWH PID mapping can be obtained from the | // The WebGraph node id -> SWHID mapping can be obtained from the | ||||
// PID->node one by numerically sorting on node id and sequentially | // SWHID->node one by numerically sorting on node id and sequentially | ||||
// writing obtained PIDs to a binary map. Delegates the sorting job to | // writing obtained SWHIDs to a binary map. Delegates the sorting job to | ||||
// /usr/bin/sort via pipes | // /usr/bin/sort via pipes | ||||
ProcessBuilder processBuilder = new ProcessBuilder(); | ProcessBuilder processBuilder = new ProcessBuilder(); | ||||
processBuilder.command("sort", "--numeric-sort", "--key", "2", | processBuilder.command("sort", "--numeric-sort", "--key", "2", | ||||
"--buffer-size", SORT_BUFFER_SIZE, | "--buffer-size", SORT_BUFFER_SIZE, | ||||
"--temporary-directory", tmpDir); | "--temporary-directory", tmpDir); | ||||
Process sort = processBuilder.start(); | Process sort = processBuilder.start(); | ||||
BufferedOutputStream sort_stdin = new BufferedOutputStream(sort.getOutputStream()); | BufferedOutputStream sort_stdin = new BufferedOutputStream(sort.getOutputStream()); | ||||
BufferedInputStream sort_stdout = new BufferedInputStream(sort.getInputStream()); | BufferedInputStream sort_stdout = new BufferedInputStream(sort.getInputStream()); | ||||
// for the binary format of pidToNodeMap, see Python module swh.graph.pid:PidToIntMap | // for the binary format of swhidToNodeMap, see Python module swh.graph.swhid:SwhidToIntMap | ||||
// for the binary format of nodeToPidMap, see Python module swh.graph.pid:IntToPidMap | // for the binary format of nodeToSwhidMap, see Python module swh.graph.swhid:IntToSwhidMap | ||||
try (DataOutputStream pidToNodeMap = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(graphPath + Graph.PID_TO_NODE))); | try (DataOutputStream swhidToNodeMap = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(graphPath + Graph.SWHID_TO_NODE))); | ||||
BufferedOutputStream nodeToPidMap = new BufferedOutputStream(new FileOutputStream(graphPath + Graph.NODE_TO_PID))) { | BufferedOutputStream nodeToSwhidMap = new BufferedOutputStream(new FileOutputStream(graphPath + Graph.NODE_TO_SWHID))) { | ||||
// background handler for sort output, it will be fed PID/node | // background handler for sort output, it will be fed SWHID/node | ||||
// pairs while pidToNodeMap is being filled, and will itself fill | // pairs while swhidToNodeMap is being filled, and will itself fill | ||||
// nodeToPidMap as soon as data from sort is ready | // nodeToSwhidMap as soon as data from sort is ready | ||||
SortOutputHandler outputHandler = new SortOutputHandler(sort_stdout, nodeToPidMap, plNode2Pid); | SortOutputHandler outputHandler = new SortOutputHandler(sort_stdout, nodeToSwhidMap, plNode2SWHID); | ||||
outputHandler.start(); | outputHandler.start(); | ||||
// Type map from WebGraph node ID to SWH type. Used at runtime by | // Type map from WebGraph node ID to SWH type. Used at runtime by | ||||
// pure Java graph traversals to efficiently check edge | // pure Java graph traversals to efficiently check edge | ||||
// restrictions. | // restrictions. | ||||
final int log2NbTypes = (int) Math.ceil(Math.log(Node.Type.values().length) | final int log2NbTypes = (int) Math.ceil(Math.log(Node.Type.values().length) | ||||
/ Math.log(2)); | / Math.log(2)); | ||||
final int nbBitsPerNodeType = log2NbTypes; | final int nbBitsPerNodeType = log2NbTypes; | ||||
LongArrayBitVector nodeTypesBitVector = | LongArrayBitVector nodeTypesBitVector = | ||||
LongArrayBitVector.ofLength(nbBitsPerNodeType * nbIds); | LongArrayBitVector.ofLength(nbBitsPerNodeType * nbIds); | ||||
LongBigList nodeTypesMap = nodeTypesBitVector.asLongBigList(nbBitsPerNodeType); | LongBigList nodeTypesMap = nodeTypesBitVector.asLongBigList(nbBitsPerNodeType); | ||||
plPid2Node.start("filling pid2node map"); | plSWHID2Node.start("filling swhid2node map"); | ||||
for (long iNode = 0; iNode < nbIds && swhPIDIterator.hasNext(); iNode++) { | for (long iNode = 0; iNode < nbIds && swhidIterator.hasNext(); iNode++) { | ||||
String strSwhPID = swhPIDIterator.next().toString(); | String swhidStr = swhidIterator.next().toString(); | ||||
SwhPID swhPID = new SwhPID(strSwhPID); | SWHID swhid = new SWHID(swhidStr); | ||||
byte[] swhPIDBin = swhPID.toBytes(); | byte[] swhidBin = swhid.toBytes(); | ||||
long mphId = mphMap.getLong(strSwhPID); | long mphId = mphMap.getLong(swhidStr); | ||||
long nodeId = BigArrays.get(bfsMap, mphId); | long nodeId = BigArrays.get(bfsMap, mphId); | ||||
pidToNodeMap.write(swhPIDBin, 0, swhPIDBin.length); | swhidToNodeMap.write(swhidBin, 0, swhidBin.length); | ||||
pidToNodeMap.writeLong(nodeId); | swhidToNodeMap.writeLong(nodeId); | ||||
sort_stdin.write((strSwhPID + "\t" + nodeId + "\n") | sort_stdin.write((swhidStr + "\t" + nodeId + "\n") | ||||
.getBytes(StandardCharsets.US_ASCII)); | .getBytes(StandardCharsets.US_ASCII)); | ||||
nodeTypesMap.set(nodeId, swhPID.getType().ordinal()); | nodeTypesMap.set(nodeId, swhid.getType().ordinal()); | ||||
plPid2Node.lightUpdate(); | plSWHID2Node.lightUpdate(); | ||||
} | } | ||||
plPid2Node.done(); | plSWHID2Node.done(); | ||||
sort_stdin.close(); | sort_stdin.close(); | ||||
// write type map | // write type map | ||||
logger.info("storing type map"); | logger.info("storing type map"); | ||||
BinIO.storeObject(nodeTypesMap, graphPath + Graph.NODE_TO_TYPE); | BinIO.storeObject(nodeTypesMap, graphPath + Graph.NODE_TO_TYPE); | ||||
logger.info("type map stored"); | logger.info("type map stored"); | ||||
// wait for nodeToPidMap filling | // wait for nodeToSwhidMap filling | ||||
try { | try { | ||||
logger.info("waiting for node2pid map..."); | logger.info("waiting for node2swhid map..."); | ||||
int sortExitCode = sort.waitFor(); | int sortExitCode = sort.waitFor(); | ||||
if (sortExitCode != 0) { | if (sortExitCode != 0) { | ||||
logger.error("sort returned non-zero exit code: " + sortExitCode); | logger.error("sort returned non-zero exit code: " + sortExitCode); | ||||
System.exit(2); | System.exit(2); | ||||
} | } | ||||
outputHandler.join(); | outputHandler.join(); | ||||
} catch (InterruptedException e) { | } catch (InterruptedException e) { | ||||
logger.error("processing of sort output failed with: " + e); | logger.error("processing of sort output failed with: " + e); | ||||
Show All 11 Lines | private static class SortOutputHandler extends Thread { | ||||
SortOutputHandler(InputStream input, OutputStream output, ProgressLogger pl) { | SortOutputHandler(InputStream input, OutputStream output, ProgressLogger pl) { | ||||
this.input = new Scanner(input, StandardCharsets.US_ASCII); | this.input = new Scanner(input, StandardCharsets.US_ASCII); | ||||
this.output = output; | this.output = output; | ||||
this.pl = pl; | this.pl = pl; | ||||
} | } | ||||
public void run() { | public void run() { | ||||
boolean sortDone = false; | boolean sortDone = false; | ||||
logger.info("node2pid: waiting for sort output..."); | logger.info("node2swhid: waiting for sort output..."); | ||||
while (input.hasNextLine()) { | while (input.hasNextLine()) { | ||||
if (!sortDone) { | if (!sortDone) { | ||||
sortDone = true; | sortDone = true; | ||||
this.pl.start("filling node2pid map"); | this.pl.start("filling node2swhid map"); | ||||
} | } | ||||
String line = input.nextLine(); // format: SWH_PID <TAB> NODE_ID | String line = input.nextLine(); // format: SWHID <TAB> NODE_ID | ||||
SwhPID swhPID = new SwhPID(line.split("\\t")[0]); // get PID | SWHID swhid = new SWHID(line.split("\\t")[0]); // get SWHID | ||||
try { | try { | ||||
output.write((byte[]) swhPID.toBytes()); | output.write((byte[]) swhid.toBytes()); | ||||
} catch (IOException e) { | } catch (IOException e) { | ||||
logger.error("writing to node->PID map failed with: " + e); | logger.error("writing to node->SWHID map failed with: " + e); | ||||
} | } | ||||
this.pl.lightUpdate(); | this.pl.lightUpdate(); | ||||
} | } | ||||
this.pl.done(); | this.pl.done(); | ||||
} | } | ||||
} | } | ||||
} | } |