diff --git a/java/server/src/main/java/org/softwareheritage/graph/Node.java b/java/server/src/main/java/org/softwareheritage/graph/Node.java index 5c04ac1..0f18ee0 100644 --- a/java/server/src/main/java/org/softwareheritage/graph/Node.java +++ b/java/server/src/main/java/org/softwareheritage/graph/Node.java @@ -1,105 +1,105 @@ package org.softwareheritage.graph; import java.util.ArrayList; import java.util.Arrays; import java.util.List; /** * A node in the Software Heritage graph. * * @author The Software Heritage developers */ public class Node { /** * Software Heritage graph node types, as described in the * data model. */ public enum Type { /** Content node */ CNT, /** Directory node */ DIR, /** Origin node */ ORI, /** Release node */ REL, /** Revision node */ REV, /** Snapshot node */ SNP; /** * Converts integer to corresponding SWH node type. * * @param intType node type represented as an integer * @return the corresponding {@link Node.Type} value * @see org.softwareheritage.graph.Node.Type */ public static Node.Type fromInt(int intType) { switch (intType) { case 0: return CNT; case 1: return DIR; case 2: return ORI; case 3: return REL; case 4: return REV; case 5: return SNP; } return null; } /** * Converts node types to the corresponding int value * * @param type node type as an enum * @return the corresponding int value */ public static int toInt(Node.Type type) { switch (type) { case CNT: return 0; case DIR: return 1; case ORI: return 2; case REL: return 3; case REV: return 4; case SNP: return 5; } - throw new IllegalArgumentException("Unknown node type: " + type); + throw new IllegalArgumentException("Unknown node type: " + type); } /** * Converts string to corresponding SWH node type. * * @param strType node type represented as a string * @return the corresponding {@link Node.Type} value * @see org.softwareheritage.graph.Node.Type */ public static Node.Type fromStr(String strType) { if (!strType.matches("cnt|dir|ori|rel|rev|snp")) { throw new IllegalArgumentException("Unknown node type: " + strType); } return Node.Type.valueOf(strType.toUpperCase()); } /** * Parses SWH node type possible values from formatted string (see the API * syntax). * * @param strFmtType node types represented as a formatted string * @return a list containing the {@link Node.Type} values * @see org.softwareheritage.graph.Node.Type */ public static ArrayList parse(String strFmtType) { ArrayList types = new ArrayList<>(); if (strFmtType.equals("*")) { List nodeTypes = Arrays.asList(Node.Type.values()); types.addAll(nodeTypes); } else { types.add(Node.Type.fromStr(strFmtType)); } return types; } } } diff --git a/java/server/src/main/java/org/softwareheritage/graph/SwhPID.java b/java/server/src/main/java/org/softwareheritage/graph/SwhPID.java index 75c7aa8..a36d399 100644 --- a/java/server/src/main/java/org/softwareheritage/graph/SwhPID.java +++ b/java/server/src/main/java/org/softwareheritage/graph/SwhPID.java @@ -1,108 +1,108 @@ package org.softwareheritage.graph; import java.lang.System; import com.fasterxml.jackson.annotation.JsonValue; import org.apache.commons.codec.binary.Hex; import org.apache.commons.codec.DecoderException; import org.softwareheritage.graph.Node; /** * A Software Heritage PID, see persistent * identifier documentation. * * @author The Software Heritage developers */ public class SwhPID { /** Fixed hash length of the PID */ public static final int HASH_LENGTH = 40; /** Full PID as a string */ String swhPID; /** PID node type */ Node.Type type; /** * Constructor. * * @param swhPID full PID as a string */ public SwhPID(String swhPID) { this.swhPID = swhPID; // PID format: 'swh:1:type:hash' String[] parts = swhPID.split(":"); if (parts.length != 4 || !parts[0].equals("swh") || !parts[1].equals("1")) { throw new IllegalArgumentException("malformed SWH PID: " + swhPID); } this.type = Node.Type.fromStr(parts[2]); if (!parts[3].matches("[0-9a-f]{" + HASH_LENGTH + "}")) { throw new IllegalArgumentException("malformed SWH PID: " + swhPID); } } @Override public boolean equals(Object otherObj) { if (otherObj == this) return true; if (!(otherObj instanceof SwhPID)) return false; SwhPID other = (SwhPID) otherObj; return swhPID.equals(other.getSwhPID()); } @Override public int hashCode() { return swhPID.hashCode(); } @Override public String toString() { return swhPID; } /** Converts PID to a compact binary representation. * * The binary format is specified in the Python module * swh.graph.pid:str_to_bytes . */ public byte[] toBytes() { - byte[] bytes = new byte[22]; - byte[] digest; - - bytes[0] = (byte) 1; // namespace version - bytes[1] = (byte) Node.Type.toInt(this.type); // PID type - try { - digest = Hex.decodeHex(this.swhPID.substring(10)); // SHA1 hash - System.arraycopy(digest, 0, bytes, 2, digest.length); - } catch (DecoderException e) { + byte[] bytes = new byte[22]; + byte[] digest; + + bytes[0] = (byte) 1; // namespace version + bytes[1] = (byte) Node.Type.toInt(this.type); // PID type + try { + digest = Hex.decodeHex(this.swhPID.substring(10)); // SHA1 hash + System.arraycopy(digest, 0, bytes, 2, digest.length); + } catch (DecoderException e) { throw new IllegalArgumentException("invalid hex sequence in PID: " + this.swhPID); - } + } - return bytes; + return bytes; } /** * Returns full PID as a string. * * @return full PID string */ @JsonValue public String getSwhPID() { return swhPID; } /** * Returns PID node type. * * @return PID corresponding {@link Node.Type} * @see org.softwareheritage.graph.Node.Type */ public Node.Type getType() { return type; } } diff --git a/java/server/src/main/java/org/softwareheritage/graph/algo/Traversal.java b/java/server/src/main/java/org/softwareheritage/graph/algo/Traversal.java index fbae355..8636549 100644 --- a/java/server/src/main/java/org/softwareheritage/graph/algo/Traversal.java +++ b/java/server/src/main/java/org/softwareheritage/graph/algo/Traversal.java @@ -1,361 +1,361 @@ package org.softwareheritage.graph.algo; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.Map; import java.util.Queue; import java.util.Stack; import it.unimi.dsi.bits.LongArrayBitVector; import org.softwareheritage.graph.AllowedEdges; import org.softwareheritage.graph.Endpoint; import org.softwareheritage.graph.Graph; import org.softwareheritage.graph.Neighbors; import org.softwareheritage.graph.Node; /** * Traversal algorithms on the compressed graph. *

* Internal implementation of the traversal API endpoints. These methods only input/output internal * long ids, which are converted in the {@link Endpoint} higher-level class to Software Heritage * PID. * * @author The Software Heritage developers * @see org.softwareheritage.graph.Endpoint */ public class Traversal { /** Graph used in the traversal */ Graph graph; /** Boolean to specify the use of the transposed graph */ boolean useTransposed; /** Graph edge restriction */ AllowedEdges edges; /** Hash set storing if we have visited a node */ HashSet visited; /** Hash map storing parent node id for each nodes during a traversal */ Map parentNode; /** Number of edges accessed during traversal */ long nbEdgesAccessed; /** * Constructor. * * @param graph graph used in the traversal * @param direction a string (either "forward" or "backward") specifying edge orientation * @param edgesFmt a formatted string describing allowed edges */ public Traversal(Graph graph, String direction, String edgesFmt) { if (!direction.matches("forward|backward")) { throw new IllegalArgumentException("Unknown traversal direction: " + direction); } this.graph = graph; this.useTransposed = (direction.equals("backward")); this.edges = new AllowedEdges(graph, edgesFmt); long nbNodes = graph.getNbNodes(); this.visited = new HashSet<>(); this.parentNode = new HashMap<>(); this.nbEdgesAccessed = 0; } /** * Returns number of accessed edges during traversal. * * @return number of edges accessed in last traversal */ public long getNbEdgesAccessed() { return nbEdgesAccessed; } /** * Returns number of accessed nodes during traversal. * * @return number of nodes accessed in last traversal */ public long getNbNodesAccessed() { return this.visited.size(); } /** * Push version of {@link leaves}: will fire passed callback for each leaf. */ public void leavesVisitor(long srcNodeId, NodeIdConsumer cb) { Stack stack = new Stack(); this.nbEdgesAccessed = 0; stack.push(srcNodeId); visited.add(srcNodeId); while (!stack.isEmpty()) { long currentNodeId = stack.pop(); long neighborsCnt = 0; nbEdgesAccessed += graph.degree(currentNodeId, useTransposed); for (long neighborNodeId : new Neighbors(graph, useTransposed, edges, currentNodeId)) { neighborsCnt++; if (!visited.contains(neighborNodeId)) { stack.push(neighborNodeId); visited.add(neighborNodeId); } } if (neighborsCnt == 0) { cb.accept(currentNodeId); } } } /** * Returns the leaves of a subgraph rooted at the specified source node. * * @param srcNodeId source node * @return list of node ids corresponding to the leaves */ public ArrayList leaves(long srcNodeId) { ArrayList nodeIds = new ArrayList(); - leavesVisitor(srcNodeId, (nodeId) -> nodeIds.add(nodeId)); + leavesVisitor(srcNodeId, (nodeId) -> nodeIds.add(nodeId)); return nodeIds; } /** * Push version of {@link neighbors}: will fire passed callback on each * neighbor. */ public void neighborsVisitor(long srcNodeId, NodeIdConsumer cb) { this.nbEdgesAccessed = graph.degree(srcNodeId, useTransposed); for (long neighborNodeId : new Neighbors(graph, useTransposed, edges, srcNodeId)) { - cb.accept(neighborNodeId); + cb.accept(neighborNodeId); } } /** * Returns node direct neighbors (linked with exactly one edge). * * @param srcNodeId source node * @return list of node ids corresponding to the neighbors */ public ArrayList neighbors(long srcNodeId) { ArrayList nodeIds = new ArrayList(); - neighborsVisitor(srcNodeId, (nodeId) -> nodeIds.add(nodeId)); + neighborsVisitor(srcNodeId, (nodeId) -> nodeIds.add(nodeId)); return nodeIds; } /** * Push version of {@link visitNodes}: will fire passed callback on each * visited node. */ public void visitNodesVisitor(long srcNodeId, NodeIdConsumer cb) { Stack stack = new Stack(); this.nbEdgesAccessed = 0; stack.push(srcNodeId); visited.add(srcNodeId); while (!stack.isEmpty()) { long currentNodeId = stack.pop(); - cb.accept(currentNodeId); + cb.accept(currentNodeId); nbEdgesAccessed += graph.degree(currentNodeId, useTransposed); for (long neighborNodeId : new Neighbors(graph, useTransposed, edges, currentNodeId)) { if (!visited.contains(neighborNodeId)) { stack.push(neighborNodeId); visited.add(neighborNodeId); } } } } /** * Performs a graph traversal and returns explored nodes. * * @param srcNodeId source node * @return list of explored node ids */ public ArrayList visitNodes(long srcNodeId) { ArrayList nodeIds = new ArrayList(); - visitNodesVisitor(srcNodeId, (nodeId) -> nodeIds.add(nodeId)); - return nodeIds; + visitNodesVisitor(srcNodeId, (nodeId) -> nodeIds.add(nodeId)); + return nodeIds; } /** * Push version of {@link visitPaths}: will fire passed callback on each * discovered (complete) path. */ public void visitPathsVisitor(long srcNodeId, PathConsumer cb) { Stack currentPath = new Stack(); this.nbEdgesAccessed = 0; visitPathsInternalVisitor(srcNodeId, currentPath, cb); } /** * Performs a graph traversal and returns explored paths. * * @param srcNodeId source node * @return list of explored paths (represented as a list of node ids) */ public ArrayList> visitPaths(long srcNodeId) { ArrayList> paths = new ArrayList<>(); - visitPathsVisitor(srcNodeId, (path) -> paths.add(path)); + visitPathsVisitor(srcNodeId, (path) -> paths.add(path)); return paths; } private void visitPathsInternalVisitor(long currentNodeId, - Stack currentPath, - PathConsumer cb) { + Stack currentPath, + PathConsumer cb) { currentPath.push(currentNodeId); long visitedNeighbors = 0; nbEdgesAccessed += graph.degree(currentNodeId, useTransposed); for (long neighborNodeId : new Neighbors(graph, useTransposed, edges, currentNodeId)) { visitPathsInternalVisitor(neighborNodeId, currentPath, cb); visitedNeighbors++; } if (visitedNeighbors == 0) { ArrayList path = new ArrayList(); for (long nodeId : currentPath) { path.add(nodeId); } - cb.accept(path); + cb.accept(path); } currentPath.pop(); } /** * Performs a graph traversal and returns the first found path from source to destination. * * @param srcNodeId source node * @param dst destination (either a node or a node type) * @return found path as a list of node ids */ public ArrayList walk(long srcNodeId, T dst, String algorithm) { long dstNodeId = -1; if (algorithm.equals("dfs")) { dstNodeId = walkInternalDfs(srcNodeId, dst); } else if (algorithm.equals("bfs")) { dstNodeId = walkInternalBfs(srcNodeId, dst); } else { throw new IllegalArgumentException("Unknown traversal algorithm: " + algorithm); } if (dstNodeId == -1) { throw new IllegalArgumentException("Unable to find destination point: " + dst); } ArrayList nodeIds = backtracking(srcNodeId, dstNodeId); return nodeIds; } /** * Internal DFS function of {@link #walk}. * * @param srcNodeId source node * @param dst destination (either a node or a node type) * @return final destination node or -1 if no path found */ private long walkInternalDfs(long srcNodeId, T dst) { Stack stack = new Stack(); this.nbEdgesAccessed = 0; stack.push(srcNodeId); visited.add(srcNodeId); while (!stack.isEmpty()) { long currentNodeId = stack.pop(); if (isDstNode(currentNodeId, dst)) { return currentNodeId; } nbEdgesAccessed += graph.degree(currentNodeId, useTransposed); for (long neighborNodeId : new Neighbors(graph, useTransposed, edges, currentNodeId)) { if (!visited.contains(neighborNodeId)) { stack.push(neighborNodeId); visited.add(neighborNodeId); parentNode.put(neighborNodeId, currentNodeId); } } } return -1; } /** * Internal BFS function of {@link #walk}. * * @param srcNodeId source node * @param dst destination (either a node or a node type) * @return final destination node or -1 if no path found */ private long walkInternalBfs(long srcNodeId, T dst) { Queue queue = new LinkedList(); this.nbEdgesAccessed = 0; queue.add(srcNodeId); visited.add(srcNodeId); while (!queue.isEmpty()) { long currentNodeId = queue.poll(); if (isDstNode(currentNodeId, dst)) { return currentNodeId; } nbEdgesAccessed += graph.degree(currentNodeId, useTransposed); for (long neighborNodeId : new Neighbors(graph, useTransposed, edges, currentNodeId)) { if (!visited.contains(neighborNodeId)) { queue.add(neighborNodeId); visited.add(neighborNodeId); parentNode.put(neighborNodeId, currentNodeId); } } } return -1; } /** * Internal function of {@link #walk} to check if a node corresponds to the destination. * * @param nodeId current node * @param dst destination (either a node or a node type) * @return true if the node is a destination, or false otherwise */ private boolean isDstNode(long nodeId, T dst) { if (dst instanceof Long) { long dstNodeId = (Long) dst; return nodeId == dstNodeId; } else if (dst instanceof Node.Type) { Node.Type dstType = (Node.Type) dst; return graph.getNodeType(nodeId) == dstType; } else { return false; } } /** * Internal backtracking function of {@link #walk}. * * @param srcNodeId source node * @param dstNodeId destination node * @return the found path, as a list of node ids */ private ArrayList backtracking(long srcNodeId, long dstNodeId) { ArrayList path = new ArrayList(); long currentNodeId = dstNodeId; while (currentNodeId != srcNodeId) { path.add(currentNodeId); currentNodeId = parentNode.get(currentNodeId); } path.add(srcNodeId); Collections.reverse(path); return path; } } diff --git a/java/server/src/main/java/org/softwareheritage/graph/backend/Pp.java b/java/server/src/main/java/org/softwareheritage/graph/backend/Pp.java index aa7ab3c..3cd0eb7 100644 --- a/java/server/src/main/java/org/softwareheritage/graph/backend/Pp.java +++ b/java/server/src/main/java/org/softwareheritage/graph/backend/Pp.java @@ -1,42 +1,42 @@ package org.softwareheritage.graph.backend; import java.io.BufferedWriter; import java.io.FileInputStream; import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.Writer; import java.util.zip.GZIPInputStream; import it.unimi.dsi.bits.LongArrayBitVector; import it.unimi.dsi.fastutil.Size64; import it.unimi.dsi.fastutil.io.BinIO; import it.unimi.dsi.fastutil.longs.LongBigArrays; import it.unimi.dsi.fastutil.longs.LongBigList; import it.unimi.dsi.fastutil.objects.Object2LongFunction; import it.unimi.dsi.fastutil.objects.ObjectBigArrays; import it.unimi.dsi.io.FastBufferedReader; import it.unimi.dsi.io.LineIterator; import org.softwareheritage.graph.Graph; import org.softwareheritage.graph.Node; import org.softwareheritage.graph.SwhPID; import org.softwareheritage.graph.backend.NodeTypesMap; public class Pp { public static void main(String[] args) throws IOException { Object2LongFunction mphMap = null; try { mphMap = (Object2LongFunction) BinIO.loadObject("all.mph"); } catch (ClassNotFoundException e) { throw new IllegalArgumentException("The .mph file contains unknown class object: " + e); } long nbIds = (mphMap instanceof Size64) ? ((Size64) mphMap).size64() : mphMap.size(); - System.out.println("mph size: " + nbIds); + System.out.println("mph size: " + nbIds); } } diff --git a/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java b/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java index c9492a2..19bef05 100644 --- a/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java +++ b/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java @@ -1,207 +1,207 @@ package org.softwareheritage.graph.backend; import java.io.*; import java.nio.charset.StandardCharsets; import java.util.Scanner; import java.util.concurrent.*; import java.util.zip.GZIPInputStream; import it.unimi.dsi.bits.LongArrayBitVector; import it.unimi.dsi.fastutil.Size64; import it.unimi.dsi.fastutil.io.BinIO; import it.unimi.dsi.fastutil.longs.LongBigArrays; import it.unimi.dsi.fastutil.longs.LongBigList; import it.unimi.dsi.fastutil.objects.Object2LongFunction; import it.unimi.dsi.fastutil.objects.ObjectBigArrays; import it.unimi.dsi.io.FastBufferedReader; import it.unimi.dsi.io.LineIterator; import it.unimi.dsi.logging.ProgressLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.softwareheritage.graph.Graph; import org.softwareheritage.graph.Node; import org.softwareheritage.graph.SwhPID; import org.softwareheritage.graph.backend.NodeTypesMap; /** * Create maps needed at runtime by the graph service, in particular: * * - SWH PID → WebGraph long node id * - WebGraph long node id → SWH PID (converse of the former) * - WebGraph long node id → SWH node type (enum) * * @author The Software Heritage developers */ public class Setup { final static long SORT_BUFFER_SIZE = Runtime.getRuntime().maxMemory() * 40 / 100; // 40% max_ram final static Logger logger = LoggerFactory.getLogger(Setup.class); /** * Main entrypoint. * * @param args command line arguments */ public static void main(String[] args) throws IOException { if (args.length != 3) { logger.error("Usage: NODES_CSV_GZ COMPRESSED_GRAPH_BASE_NAME TEMP_DIR"); System.exit(1); } String nodesPath = args[0]; String graphPath = args[1]; String tmpDir = args[2]; - logger.info("starting maps generation..."); + logger.info("starting maps generation..."); precomputeNodeIdMap(nodesPath, graphPath, tmpDir); - logger.info("maps generation completed"); + logger.info("maps generation completed"); } /** * Computes and dumps on disk mapping files. * * @param nodesPath path of the compressed csv nodes file * @param graphPath path of the compressed graph */ // Suppress warning for Object2LongFunction cast @SuppressWarnings("unchecked") static void precomputeNodeIdMap(String nodesPath, String graphPath, String tmpDir) - throws IOException + throws IOException { - ProgressLogger plPid2Node = new ProgressLogger(logger, 10, TimeUnit.SECONDS); - ProgressLogger plNode2Pid = new ProgressLogger(logger, 10, TimeUnit.SECONDS); + ProgressLogger plPid2Node = new ProgressLogger(logger, 10, TimeUnit.SECONDS); + ProgressLogger plNode2Pid = new ProgressLogger(logger, 10, TimeUnit.SECONDS); // first half of PID->node mapping: PID -> WebGraph MPH (long) Object2LongFunction mphMap = null; try { mphMap = (Object2LongFunction) BinIO.loadObject(graphPath + ".mph"); } catch (ClassNotFoundException e) { - logger.error("unknown class object in .mph file: " + e); - System.exit(2); + logger.error("unknown class object in .mph file: " + e); + System.exit(2); } long nbIds = (mphMap instanceof Size64) ? ((Size64) mphMap).size64() : mphMap.size(); // second half of PID->node mapping: WebGraph MPH (long) -> BFS order (long) long[][] bfsMap = LongBigArrays.newBigArray(nbIds); long loaded = BinIO.loadLongs(graphPath + ".order", bfsMap); if (loaded != nbIds) { - logger.error("graph contains " + nbIds + " nodes, but read " + loaded); - System.exit(2); + logger.error("graph contains " + nbIds + " nodes, but read " + loaded); + System.exit(2); } // Create mapping SWH PID -> WebGraph node id, by sequentially reading // nodes, hasing them with MPH, and permuting according to BFS order InputStream nodesStream = new GZIPInputStream(new FileInputStream(nodesPath)); FastBufferedReader buffer = new FastBufferedReader(new InputStreamReader(nodesStream, - StandardCharsets.US_ASCII)); + StandardCharsets.US_ASCII)); LineIterator swhPIDIterator = new LineIterator(buffer); - // The WebGraph node id -> SWH PID mapping can be obtained from the - // PID->node one by numerically sorting on node id and sequentially - // writing obtained PIDs to a binary map. Delegates the sorting job to - // /usr/bin/sort via pipes - ProcessBuilder processBuilder = new ProcessBuilder(); - processBuilder.command("sort", "--numeric-sort", "--key", "2", - "--buffer-size", Long.toString(SORT_BUFFER_SIZE), - "--temporary-directory", tmpDir); - Process sort = processBuilder.start(); - BufferedOutputStream sort_stdin = new BufferedOutputStream(sort.getOutputStream()); - BufferedInputStream sort_stdout = new BufferedInputStream(sort.getInputStream()); - - // for the binary format of pidToNodeMap, see Python module swh.graph.pid:PidToIntMap - // for the binary format of nodeToPidMap, see Python module swh.graph.pid:IntToPidMap + // The WebGraph node id -> SWH PID mapping can be obtained from the + // PID->node one by numerically sorting on node id and sequentially + // writing obtained PIDs to a binary map. Delegates the sorting job to + // /usr/bin/sort via pipes + ProcessBuilder processBuilder = new ProcessBuilder(); + processBuilder.command("sort", "--numeric-sort", "--key", "2", + "--buffer-size", Long.toString(SORT_BUFFER_SIZE), + "--temporary-directory", tmpDir); + Process sort = processBuilder.start(); + BufferedOutputStream sort_stdin = new BufferedOutputStream(sort.getOutputStream()); + BufferedInputStream sort_stdout = new BufferedInputStream(sort.getInputStream()); + + // for the binary format of pidToNodeMap, see Python module swh.graph.pid:PidToIntMap + // for the binary format of nodeToPidMap, see Python module swh.graph.pid:IntToPidMap try (DataOutputStream pidToNodeMap = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(graphPath + Graph.PID_TO_NODE))); - BufferedOutputStream nodeToPidMap = new BufferedOutputStream(new FileOutputStream(graphPath + Graph.NODE_TO_PID))) { + BufferedOutputStream nodeToPidMap = new BufferedOutputStream(new FileOutputStream(graphPath + Graph.NODE_TO_PID))) { - // background handler for sort output, it will be fed PID/node - // pairs while pidToNodeMap is being filled, and will itself fill - // nodeToPidMap as soon as data from sort is ready - SortOutputHandler outputHandler = new SortOutputHandler(sort_stdout, nodeToPidMap, plNode2Pid); - outputHandler.start(); + // background handler for sort output, it will be fed PID/node + // pairs while pidToNodeMap is being filled, and will itself fill + // nodeToPidMap as soon as data from sort is ready + SortOutputHandler outputHandler = new SortOutputHandler(sort_stdout, nodeToPidMap, plNode2Pid); + outputHandler.start(); // Type map from WebGraph node ID to SWH type. Used at runtime by // pure Java graph traversals to efficiently check edge // restrictions. final int log2NbTypes = (int) Math.ceil(Math.log(Node.Type.values().length) - / Math.log(2)); + / Math.log(2)); final int nbBitsPerNodeType = log2NbTypes; LongArrayBitVector nodeTypesBitVector = LongArrayBitVector.ofLength(nbBitsPerNodeType * nbIds); LongBigList nodeTypesMap = nodeTypesBitVector.asLongBigList(nbBitsPerNodeType); plPid2Node.start("filling pid2node map"); for (long iNode = 0; iNode < nbIds && swhPIDIterator.hasNext(); iNode++) { String strSwhPID = swhPIDIterator.next().toString(); SwhPID swhPID = new SwhPID(strSwhPID); - byte[] swhPIDBin = swhPID.toBytes(); + byte[] swhPIDBin = swhPID.toBytes(); long mphId = mphMap.getLong(strSwhPID); long nodeId = LongBigArrays.get(bfsMap, mphId); - pidToNodeMap.write(swhPIDBin, 0, swhPIDBin.length); - pidToNodeMap.writeLong(nodeId); - sort_stdin.write((strSwhPID + "\t" + nodeId + "\n") - .getBytes(StandardCharsets.US_ASCII)); + pidToNodeMap.write(swhPIDBin, 0, swhPIDBin.length); + pidToNodeMap.writeLong(nodeId); + sort_stdin.write((strSwhPID + "\t" + nodeId + "\n") + .getBytes(StandardCharsets.US_ASCII)); nodeTypesMap.set(nodeId, swhPID.getType().ordinal()); plPid2Node.lightUpdate(); } plPid2Node.done(); - sort_stdin.close(); + sort_stdin.close(); - // write type map + // write type map logger.info("storing type map"); - BinIO.storeObject(nodeTypesMap, graphPath + Graph.NODE_TO_TYPE); + BinIO.storeObject(nodeTypesMap, graphPath + Graph.NODE_TO_TYPE); logger.info("type map stored"); - // wait for nodeToPidMap filling - try { - int sortExitCode = sort.waitFor(); - if (sortExitCode != 0) { - logger.error("sort returned non-zero exit code: " + sortExitCode); - System.exit(2); - } - outputHandler.join(); - } catch (InterruptedException e) { - logger.error("processing of sort output failed with: " + e); - System.exit(2); - } + // wait for nodeToPidMap filling + try { + int sortExitCode = sort.waitFor(); + if (sortExitCode != 0) { + logger.error("sort returned non-zero exit code: " + sortExitCode); + System.exit(2); + } + outputHandler.join(); + } catch (InterruptedException e) { + logger.error("processing of sort output failed with: " + e); + System.exit(2); + } } } private static class SortOutputHandler extends Thread { private Scanner input; private OutputStream output; private ProgressLogger pl; SortOutputHandler(InputStream input, OutputStream output, ProgressLogger pl) { this.input = new Scanner(input, StandardCharsets.US_ASCII); this.output = output; this.pl = pl; } public void run() { boolean sortDone = false; - logger.info("node2pid: waiting for sort output..."); - while (input.hasNextLine()) { + logger.info("node2pid: waiting for sort output..."); + while (input.hasNextLine()) { if (! sortDone) { sortDone = true; this.pl.start("filling node2pid map"); } - String line = input.nextLine(); // format: SWH_PID NODE_ID - SwhPID swhPID = new SwhPID(line.split("\\t")[0]); // get PID - try { - output.write((byte[]) swhPID.toBytes()); - } catch (IOException e) { - logger.error("writing to node->PID map failed with: " + e); - } + String line = input.nextLine(); // format: SWH_PID NODE_ID + SwhPID swhPID = new SwhPID(line.split("\\t")[0]); // get PID + try { + output.write((byte[]) swhPID.toBytes()); + } catch (IOException e) { + logger.error("writing to node->PID map failed with: " + e); + } this.pl.lightUpdate(); - } + } this.pl.done(); } } }