diff --git a/java/README.md b/java/README.md
index cffc06a..daab1a9 100644
--- a/java/README.md
+++ b/java/README.md
@@ -1,50 +1,50 @@
 Graph service - Java backend
 ============================
 
 Server-side Java REST API.
 
 Build
 -----
 
 ```bash
 $ mvn compile assembly:single
 ```
 
 Start REST API
 --------------
 
 ```bash
 $ java -cp target/swh-graph-*.jar \
     org.softwareheritage.graph.server.App \
 ```
 
 Default port is 5009 (use the `--port` option to change the port number). If
 you need timings metadata sent back to the client in addition to the result,
 use the `--timings` flag.
 
 Tests
 -----
 
 Unit tests rely on test data that are already available in the Git repository
 (under `src/swh/graph/tests/dataset/`). You generally only need to run them
 using Maven:
 
 ```bash
 $ mvn test
 ```
 
 In case you want to regenerate the test data:
 
 ```bash
 # Graph compression
 $ cd src/swh/graph/tests/dataset
 $ ./generate_graph.sh
 $ cd ../../../..
 $ mvn compile assembly:single
 
 # Dump mapping files
 $ java -cp target/swh-graph-*.jar \
-    org.softwareheritage.graph.maps.MapBuilder \
+    org.softwareheritage.graph.maps.NodeMapBuilder \
     src/swh/graph/tests/dataset/example.nodes.csv.gz \
     src/swh/graph/tests/dataset/output/example
 ```
diff --git a/java/src/main/java/org/softwareheritage/graph/maps/NodeIdMap.java b/java/src/main/java/org/softwareheritage/graph/maps/NodeIdMap.java
index 5ddabb3..3fa9f1e 100644
--- a/java/src/main/java/org/softwareheritage/graph/maps/NodeIdMap.java
+++ b/java/src/main/java/org/softwareheritage/graph/maps/NodeIdMap.java
@@ -1,114 +1,114 @@
 package org.softwareheritage.graph.maps;
 
 import org.softwareheritage.graph.Graph;
 import org.softwareheritage.graph.SwhPID;
 
 import java.io.IOException;
 
 /**
  * Mapping between internal long node id and external SWH PID.
  *
  * Mappings in both directions are pre-computed and dumped on disk in the
- * {@link MapBuilder} class, then they are loaded here using mmap().
+ * {@link NodeMapBuilder} class, then they are loaded here using mmap().
  *
  * @author The Software Heritage developers
- * @see org.softwareheritage.graph.maps.MapBuilder
+ * @see NodeMapBuilder
  */
 public class NodeIdMap {
     /** Fixed length of full SWH PID */
     public static final int SWH_ID_LENGTH = 50;
     /** Fixed length of long node id */
     public static final int NODE_ID_LENGTH = 20;
 
     /** Fixed length of binary SWH PID buffer */
     public static final int SWH_ID_BIN_SIZE = 22;
     /** Fixed length of binary node id buffer */
     public static final int NODE_ID_BIN_SIZE = 8;
 
     /** Graph path and basename */
     String graphPath;
     /** Number of ids to map */
     long nbIds;
     /** mmap()-ed PID_TO_NODE file */
     MapFile swhToNodeMap;
     /** mmap()-ed NODE_TO_PID file */
     MapFile nodeToSwhMap;
 
     /**
      * Constructor.
      *
      * @param graphPath full graph path
      * @param nbNodes number of nodes in the graph
      */
     public NodeIdMap(String graphPath, long nbNodes) throws IOException {
         this.graphPath = graphPath;
         this.nbIds = nbNodes;
         this.swhToNodeMap = new MapFile(graphPath + Graph.PID_TO_NODE, SWH_ID_BIN_SIZE + NODE_ID_BIN_SIZE);
         this.nodeToSwhMap = new MapFile(graphPath + Graph.NODE_TO_PID, SWH_ID_BIN_SIZE);
     }
 
     /**
      * Converts SWH PID to corresponding long node id.
      *
      * @param swhPID node represented as a {@link SwhPID}
      * @return corresponding node as a long id
      * @see org.softwareheritage.graph.SwhPID
      */
     public long getNodeId(SwhPID swhPID) {
         // The file is sorted by swhPID, hence we can binary search on swhPID to get corresponding
         // nodeId
         long start = 0;
         long end = nbIds - 1;
         while (start <= end) {
             long lineNumber = (start + end) / 2L;
             byte[] buffer = swhToNodeMap.readAtLine(lineNumber);
             byte[] pidBuffer = new byte[SWH_ID_BIN_SIZE];
             byte[] nodeBuffer = new byte[NODE_ID_BIN_SIZE];
             System.arraycopy(buffer, 0, pidBuffer, 0, SWH_ID_BIN_SIZE);
             System.arraycopy(buffer, SWH_ID_BIN_SIZE, nodeBuffer, 0, NODE_ID_BIN_SIZE);
 
             String currentSwhPID = SwhPID.fromBytes(pidBuffer).getSwhPID();
             long currentNodeId = java.nio.ByteBuffer.wrap(nodeBuffer).getLong();
 
             int cmp = currentSwhPID.compareTo(swhPID.toString());
             if (cmp == 0) {
                 return currentNodeId;
             } else if (cmp < 0) {
                 start = lineNumber + 1;
             } else {
                 end = lineNumber - 1;
             }
         }
 
         throw new IllegalArgumentException("Unknown SWH PID: " + swhPID);
     }
 
     /**
      * Converts a node long id to corresponding SWH PID.
      *
      * @param nodeId node as a long id
      * @return corresponding node as a {@link SwhPID}
      * @see org.softwareheritage.graph.SwhPID
      */
     public SwhPID getSwhPID(long nodeId) {
         // Each line in NODE_TO_PID is formatted as: swhPID
         // The file is ordered by nodeId, meaning node0's swhPID is at line 0, hence we can read the
         // nodeId-th line to get corresponding swhPID
         if (nodeId < 0 || nodeId >= nbIds) {
             throw new IllegalArgumentException("Node id " + nodeId + " should be between 0 and " + nbIds);
         }
 
         return SwhPID.fromBytes(nodeToSwhMap.readAtLine(nodeId));
     }
 
     /**
      * Closes the mapping files.
      */
     public void close() throws IOException {
         swhToNodeMap.close();
         nodeToSwhMap.close();
     }
 }
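The PID→node lookup above works because the PID_TO_NODE file stores fixed-width records (a 22-byte binary PID followed by an 8-byte big-endian node id), sorted by PID, so a lookup is a binary search over record offsets. Here is a minimal, self-contained sketch of that technique over an in-memory byte array; the record widths mirror `SWH_ID_BIN_SIZE`/`NODE_ID_BIN_SIZE`, but the in-memory buffer and the raw byte-wise key comparison are simplifications of the real mmap()-ed, string-compared version (byte-wise comparison is only equivalent if the binary key encoding is order-preserving, which is an assumption here):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class FixedRecordSearch {
    static final int KEY_SIZE = 22;               // binary key width (like SWH_ID_BIN_SIZE)
    static final int VAL_SIZE = 8;                // value width (like NODE_ID_BIN_SIZE)
    static final int REC_SIZE = KEY_SIZE + VAL_SIZE;

    /** Binary search over fixed-width records sorted by key; returns the value, or -1 if absent. */
    static long lookup(byte[] data, byte[] key) {
        long lo = 0, hi = data.length / REC_SIZE - 1;
        while (lo <= hi) {
            long mid = (lo + hi) >>> 1;
            int off = (int) (mid * REC_SIZE);
            int cmp = Arrays.compare(data, off, off + KEY_SIZE, key, 0, KEY_SIZE);
            if (cmp == 0) {
                // key found: the value is the 8 bytes right after it
                return ByteBuffer.wrap(data, off + KEY_SIZE, VAL_SIZE).getLong();
            } else if (cmp < 0) {
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // two hypothetical records with keys 'a…' and 'b…' (zero-padded) and values 42 and 43
        byte[] data = new byte[2 * REC_SIZE];
        data[0] = 'a';
        ByteBuffer.wrap(data, KEY_SIZE, VAL_SIZE).putLong(42L);
        data[REC_SIZE] = 'b';
        ByteBuffer.wrap(data, REC_SIZE + KEY_SIZE, VAL_SIZE).putLong(43L);

        byte[] key = new byte[KEY_SIZE];
        key[0] = 'b';
        System.out.println(lookup(data, key));    // prints 43
    }
}
```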
diff --git a/java/src/main/java/org/softwareheritage/graph/maps/MapBuilder.java b/java/src/main/java/org/softwareheritage/graph/maps/NodeMapBuilder.java
similarity index 98%
rename from java/src/main/java/org/softwareheritage/graph/maps/MapBuilder.java
rename to java/src/main/java/org/softwareheritage/graph/maps/NodeMapBuilder.java
index 90fcd50..ea2e9bc 100644
--- a/java/src/main/java/org/softwareheritage/graph/maps/MapBuilder.java
+++ b/java/src/main/java/org/softwareheritage/graph/maps/NodeMapBuilder.java
@@ -1,212 +1,212 @@
 package org.softwareheritage.graph.maps;
 
 import it.unimi.dsi.bits.LongArrayBitVector;
 import it.unimi.dsi.fastutil.BigArrays;
 import it.unimi.dsi.fastutil.Size64;
 import it.unimi.dsi.fastutil.io.BinIO;
 import it.unimi.dsi.fastutil.longs.LongBigArrays;
 import it.unimi.dsi.fastutil.longs.LongBigList;
 import it.unimi.dsi.fastutil.objects.Object2LongFunction;
 import it.unimi.dsi.io.FastBufferedReader;
 import it.unimi.dsi.io.LineIterator;
 import it.unimi.dsi.logging.ProgressLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.softwareheritage.graph.Graph;
 import org.softwareheritage.graph.Node;
 import org.softwareheritage.graph.SwhPID;
 
 import java.io.*;
 import java.nio.charset.StandardCharsets;
 import java.util.Scanner;
 import java.util.concurrent.TimeUnit;
 
 /**
  * Create maps needed at runtime by the graph service, in particular:
  *
  * - SWH PID → WebGraph long node id
  * - WebGraph long node id → SWH PID (converse of the former)
  * - WebGraph long node id → SWH node type (enum)
  *
  * @author The Software Heritage developers
  */
-public class MapBuilder {
+public class NodeMapBuilder {
 
     final static String SORT_BUFFER_SIZE = "40%";
 
-    final static Logger logger = LoggerFactory.getLogger(MapBuilder.class);
+    final static Logger logger = LoggerFactory.getLogger(NodeMapBuilder.class);
 
     /**
      * Main entrypoint.
      *
      * @param args command line arguments
      */
     public static void main(String[] args) throws IOException {
         if (args.length != 2) {
             logger.error("Usage: COMPRESSED_GRAPH_BASE_NAME TEMP_DIR < NODES_CSV");
             System.exit(1);
         }
         String graphPath = args[0];
         String tmpDir = args[1];
 
         logger.info("starting maps generation...");
         precomputeNodeIdMap(graphPath, tmpDir);
         logger.info("maps generation completed");
     }
 
     /**
      * Computes and dumps on disk mapping files.
      *
      * @param graphPath path of the compressed graph
      */
     // Suppress warning for Object2LongFunction cast
     @SuppressWarnings("unchecked")
     static void precomputeNodeIdMap(String graphPath, String tmpDir) throws IOException {
         ProgressLogger plPid2Node = new ProgressLogger(logger, 10, TimeUnit.SECONDS);
         ProgressLogger plNode2Pid = new ProgressLogger(logger, 10, TimeUnit.SECONDS);
         plPid2Node.itemsName = "pid→node";
         plNode2Pid.itemsName = "node→pid";
         // avg speed for pid→node is sometimes skewed due to write to the sort
         // pipe hanging when sort is sorting; hence also display local speed
         plPid2Node.displayLocalSpeed = true;
 
         // first half of PID->node mapping: PID -> WebGraph MPH (long)
         Object2LongFunction mphMap = null;
         try {
             logger.info("loading MPH function...");
             mphMap = (Object2LongFunction) BinIO.loadObject(graphPath + ".mph");
             logger.info("MPH function loaded");
         } catch (ClassNotFoundException e) {
             logger.error("unknown class object in .mph file: " + e);
             System.exit(2);
         }
         long nbIds = (mphMap instanceof Size64) ? ((Size64) mphMap).size64() : mphMap.size();
         plPid2Node.expectedUpdates = nbIds;
         plNode2Pid.expectedUpdates = nbIds;
 
         // second half of PID->node mapping: WebGraph MPH (long) -> BFS order (long)
         long[][] bfsMap = LongBigArrays.newBigArray(nbIds);
         logger.info("loading BFS order file...");
         long loaded = BinIO.loadLongs(graphPath + ".order", bfsMap);
         logger.info("BFS order file loaded");
         if (loaded != nbIds) {
             logger.error("graph contains " + nbIds + " nodes, but read " + loaded);
             System.exit(2);
         }
 
         // Create mapping SWH PID -> WebGraph node id, by sequentially reading
         // nodes, hashing them with MPH, and permuting according to BFS order
         FastBufferedReader buffer = new FastBufferedReader(new InputStreamReader(System.in, StandardCharsets.US_ASCII));
         LineIterator swhPIDIterator = new LineIterator(buffer);
 
         // The WebGraph node id -> SWH PID mapping can be obtained from the
         // PID->node one by numerically sorting on node id and sequentially
         // writing obtained PIDs to a binary map. Delegates the sorting job to
         // /usr/bin/sort via pipes
         ProcessBuilder processBuilder = new ProcessBuilder();
         processBuilder.command("sort", "--numeric-sort", "--key", "2",
                 "--buffer-size", SORT_BUFFER_SIZE, "--temporary-directory", tmpDir);
         Process sort = processBuilder.start();
         BufferedOutputStream sort_stdin = new BufferedOutputStream(sort.getOutputStream());
         BufferedInputStream sort_stdout = new BufferedInputStream(sort.getInputStream());
 
         // for the binary format of pidToNodeMap, see Python module swh.graph.pid:PidToIntMap
         // for the binary format of nodeToPidMap, see Python module swh.graph.pid:IntToPidMap
         try (DataOutputStream pidToNodeMap = new DataOutputStream(
                 new BufferedOutputStream(new FileOutputStream(graphPath + Graph.PID_TO_NODE)));
                 BufferedOutputStream nodeToPidMap = new BufferedOutputStream(
                         new FileOutputStream(graphPath + Graph.NODE_TO_PID))) {
 
             // background handler for sort output, it will be fed PID/node
             // pairs while pidToNodeMap is being filled, and will itself fill
             // nodeToPidMap as soon as data from sort is ready
             SortOutputHandler outputHandler = new SortOutputHandler(sort_stdout, nodeToPidMap, plNode2Pid);
             outputHandler.start();
 
             // Type map from WebGraph node ID to SWH type. Used at runtime by
             // pure Java graph traversals to efficiently check edge
             // restrictions.
             final int log2NbTypes = (int) Math.ceil(Math.log(Node.Type.values().length) / Math.log(2));
             final int nbBitsPerNodeType = log2NbTypes;
             LongArrayBitVector nodeTypesBitVector = LongArrayBitVector.ofLength(nbBitsPerNodeType * nbIds);
             LongBigList nodeTypesMap = nodeTypesBitVector.asLongBigList(nbBitsPerNodeType);
 
             plPid2Node.start("filling pid2node map");
             for (long iNode = 0; iNode < nbIds && swhPIDIterator.hasNext(); iNode++) {
                 String strSwhPID = swhPIDIterator.next().toString();
                 SwhPID swhPID = new SwhPID(strSwhPID);
                 byte[] swhPIDBin = swhPID.toBytes();
 
                 long mphId = mphMap.getLong(strSwhPID);
                 long nodeId = BigArrays.get(bfsMap, mphId);
 
                 pidToNodeMap.write(swhPIDBin, 0, swhPIDBin.length);
                 pidToNodeMap.writeLong(nodeId);
 
                 sort_stdin.write((strSwhPID + "\t" + nodeId + "\n").getBytes(StandardCharsets.US_ASCII));
 
                 nodeTypesMap.set(nodeId, swhPID.getType().ordinal());
                 plPid2Node.lightUpdate();
             }
             plPid2Node.done();
             sort_stdin.close();
 
             // write type map
             logger.info("storing type map");
             BinIO.storeObject(nodeTypesMap, graphPath + Graph.NODE_TO_TYPE);
             logger.info("type map stored");
 
             // wait for nodeToPidMap filling
             try {
                 logger.info("waiting for node2pid map...");
                 int sortExitCode = sort.waitFor();
                 if (sortExitCode != 0) {
                     logger.error("sort returned non-zero exit code: " + sortExitCode);
                     System.exit(2);
                 }
                 outputHandler.join();
             } catch (InterruptedException e) {
                 logger.error("processing of sort output failed with: " + e);
                 System.exit(2);
             }
         }
     }
 
     private static class SortOutputHandler extends Thread {
         private Scanner input;
         private OutputStream output;
         private ProgressLogger pl;
 
         SortOutputHandler(InputStream input, OutputStream output, ProgressLogger pl) {
             this.input = new Scanner(input, StandardCharsets.US_ASCII);
             this.output = output;
             this.pl = pl;
         }
 
         public void run() {
             boolean sortDone = false;
             logger.info("node2pid: waiting for sort output...");
             while (input.hasNextLine()) {
                 if (!sortDone) {
                     sortDone = true;
                     this.pl.start("filling node2pid map");
                 }
                 String line = input.nextLine();  // format: SWH_PID NODE_ID
                 SwhPID swhPID = new SwhPID(line.split("\\t")[0]);  // get PID
                 try {
                     output.write(swhPID.toBytes());
                 } catch (IOException e) {
                     logger.error("writing to node->PID map failed with: " + e);
                 }
                 this.pl.lightUpdate();
             }
             this.pl.done();
         }
     }
 }
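The node→PID half of the map is built by streaming `PID\tnodeId` lines into the system `sort` while a background thread consumes the sorted output. The background consumer is what keeps the pipeline from deadlocking: if nobody drained sort's stdout, its pipe buffer would eventually fill and block. Below is a stripped-down sketch of that producer/consumer pattern; the PIDs and node ids are made up for illustration, and it assumes a POSIX `sort` is on the PATH:

```java
import java.io.*;
import java.nio.charset.StandardCharsets;

public class SortPipeDemo {
    public static void main(String[] args) throws Exception {
        // sort numerically on the second tab-separated field, as NodeMapBuilder does
        Process sort = new ProcessBuilder("sort", "--numeric-sort", "--key", "2").start();

        // background consumer: drain sorted lines as they become available,
        // so the producer below never blocks on a full stdout pipe
        Thread consumer = new Thread(() -> {
            try (BufferedReader out = new BufferedReader(
                    new InputStreamReader(sort.getInputStream(), StandardCharsets.US_ASCII))) {
                String line;
                while ((line = out.readLine()) != null) {
                    System.out.println("sorted: " + line);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        consumer.start();

        // producer: feed unsorted "key<TAB>value" pairs to sort's stdin
        try (Writer in = new OutputStreamWriter(sort.getOutputStream(), StandardCharsets.US_ASCII)) {
            in.write("swh:1:rev:aaaa\t2\n");   // hypothetical PIDs, for illustration only
            in.write("swh:1:cnt:bbbb\t0\n");
            in.write("swh:1:dir:cccc\t1\n");
        } // closing stdin signals EOF, letting sort emit its output

        int rc = sort.waitFor();
        consumer.join();
        if (rc != 0) {
            throw new RuntimeException("sort exited with code " + rc);
        }
    }
}
```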
diff --git a/java/src/main/java/org/softwareheritage/graph/maps/NodeTypesMap.java b/java/src/main/java/org/softwareheritage/graph/maps/NodeTypesMap.java
index c2747a0..c64b283 100644
--- a/java/src/main/java/org/softwareheritage/graph/maps/NodeTypesMap.java
+++ b/java/src/main/java/org/softwareheritage/graph/maps/NodeTypesMap.java
@@ -1,54 +1,54 @@
 package org.softwareheritage.graph.maps;
 
 import it.unimi.dsi.fastutil.io.BinIO;
 import it.unimi.dsi.fastutil.longs.LongBigList;
 import org.softwareheritage.graph.Graph;
 import org.softwareheritage.graph.Node;
 
 import java.io.IOException;
 
 /**
  * Mapping between long node id and SWH node type as described in the data
  * model.
  *
- * The type mapping is pre-computed and dumped on disk in the {@link MapBuilder}
+ * The type mapping is pre-computed and dumped on disk in the {@link NodeMapBuilder}
  * class, then it is loaded in-memory here using fastutil LongBigList. To be
  * space-efficient, the mapping is stored as a bitmap using minimum number of
  * bits per {@link Node.Type}.
  *
  * @author The Software Heritage developers
  */
 public class NodeTypesMap {
     /**
      * Array storing for each node its type
      */
     public LongBigList nodeTypesMap;
 
     /**
      * Constructor.
      *
      * @param graphPath path and basename of the compressed graph
      */
     public NodeTypesMap(String graphPath) throws IOException {
         try {
             nodeTypesMap = (LongBigList) BinIO.loadObject(graphPath + Graph.NODE_TO_TYPE);
         } catch (ClassNotFoundException e) {
             throw new IllegalArgumentException("Unknown class object: " + e);
         }
     }
 
     /**
      * Returns node type from a node long id.
      *
      * @param nodeId node as a long id
      * @return corresponding {@link Node.Type} value
      * @see org.softwareheritage.graph.Node.Type
      */
     public Node.Type getType(long nodeId) {
         long type = nodeTypesMap.getLong(nodeId);
         return Node.Type.fromInt((int) type);
     }
 }
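The type map stores one small enum ordinal per node using ⌈log2(#types)⌉ bits each, which is what `NodeMapBuilder` builds with `LongArrayBitVector.asLongBigList(nbBitsPerNodeType)`. The following is a self-contained sketch of that packing scheme using only plain `long[]` arithmetic; it is illustrative only (the real code relies on dsiutils/fastutil and big arrays, and this sketch's `int`-sized backing array would not scale to the full graph):

```java
public class PackedTypeMap {
    private final long[] bits;
    private final int width;          // bits per value: ceil(log2(nbTypes))
    private final long mask;

    PackedTypeMap(long nbValues, int nbTypes) {
        // smallest width that can represent ordinals 0 .. nbTypes-1 (at least 1 bit)
        this.width = 64 - Long.numberOfLeadingZeros(Math.max(1, nbTypes - 1));
        this.mask = (1L << width) - 1;
        this.bits = new long[(int) ((nbValues * width + 63) / 64)];
    }

    void set(long index, int value) {
        long bitPos = index * width;
        int word = (int) (bitPos >>> 6), shift = (int) (bitPos & 63);
        bits[word] = (bits[word] & ~(mask << shift)) | ((long) value << shift);
        if (shift + width > 64) {  // value straddles two words
            bits[word + 1] = (bits[word + 1] & ~(mask >>> (64 - shift)))
                    | ((long) value >>> (64 - shift));
        }
    }

    int get(long index) {
        long bitPos = index * width;
        int word = (int) (bitPos >>> 6), shift = (int) (bitPos & 63);
        long v = bits[word] >>> shift;
        if (shift + width > 64) {
            v |= bits[word + 1] << (64 - shift);
        }
        return (int) (v & mask);
    }

    public static void main(String[] args) {
        PackedTypeMap m = new PackedTypeMap(1000, 6);  // 6 node types -> 3 bits per node
        m.set(0, 5);
        m.set(999, 3);
        System.out.println(m.get(0) + " " + m.get(999));  // prints "5 3"
    }
}
```

With only a handful of SWH node types, this takes 3 bits per node instead of a full byte, which matters at the scale of billions of nodes.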
diff --git a/swh/graph/webgraph.py b/swh/graph/webgraph.py
index 946829b..acf9a1c 100644
--- a/swh/graph/webgraph.py
+++ b/swh/graph/webgraph.py
@@ -1,271 +1,271 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """WebGraph driver
 
 """
 
 import logging
 import os
 import subprocess
 
 from enum import Enum
 from datetime import datetime
 from pathlib import Path
 from typing import Dict, List, Set
 
 from click import ParamType
 
 from swh.graph.config import check_config_compress
 
 
 class CompressionStep(Enum):
     MPH = 1
     BV = 2
     BV_OBL = 3
     BFS = 4
     PERMUTE = 5
     PERMUTE_OBL = 6
     STATS = 7
     TRANSPOSE = 8
     TRANSPOSE_OBL = 9
     MAPS = 10
     CLEAN_TMP = 11
 
     def __str__(self):
         return self.name
 
 
 # full compression pipeline
 COMP_SEQ = list(CompressionStep)
 
 # Mapping from compression steps to shell commands implementing them. Commands
 # will be executed by the shell, so be careful with meta characters. They are
 # specified here as lists of tokens that will be joined together only for ease
 # of line splitting. In commands, {tokens} will be interpolated with
 # configuration values, see :func:`compress`.
 STEP_ARGV: Dict[CompressionStep, List[str]] = {
     CompressionStep.MPH: [
         "{java}",
         "it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction",
         "--temp-dir",
         "{tmp_dir}",
         "{out_dir}/{graph_name}.mph",
         "<( zstdcat {in_dir}/{graph_name}.nodes.csv.zst )",
     ],
     # use process substitution (and hence FIFO) above as the MPH class loads
     # the entire file in memory when reading from stdin
     CompressionStep.BV: [
         "zstdcat",
         "{in_dir}/{graph_name}.edges.csv.zst",
         "|",
         "{java}",
         "it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph",
         "--temp-dir",
         "{tmp_dir}",
         "--function",
         "{out_dir}/{graph_name}.mph",
         "{out_dir}/{graph_name}-bv",
     ],
     CompressionStep.BV_OBL: [
         "{java}",
         "it.unimi.dsi.big.webgraph.BVGraph",
         "--list",
         "{out_dir}/{graph_name}-bv",
     ],
     CompressionStep.BFS: [
         "{java}",
         "it.unimi.dsi.law.big.graph.BFS",
         "{out_dir}/{graph_name}-bv",
         "{out_dir}/{graph_name}.order",
     ],
     CompressionStep.PERMUTE: [
         "{java}",
         "it.unimi.dsi.big.webgraph.Transform",
         "mapOffline",
         "{out_dir}/{graph_name}-bv",
         "{out_dir}/{graph_name}",
         "{out_dir}/{graph_name}.order",
         "{batch_size}",
         "{tmp_dir}",
     ],
     CompressionStep.PERMUTE_OBL: [
         "{java}",
         "it.unimi.dsi.big.webgraph.BVGraph",
         "--list",
         "{out_dir}/{graph_name}",
     ],
     CompressionStep.STATS: [
         "{java}",
         "it.unimi.dsi.big.webgraph.Stats",
         "{out_dir}/{graph_name}",
     ],
     CompressionStep.TRANSPOSE: [
         "{java}",
         "it.unimi.dsi.big.webgraph.Transform",
         "transposeOffline",
         "{out_dir}/{graph_name}",
         "{out_dir}/{graph_name}-transposed",
         "{batch_size}",
         "{tmp_dir}",
     ],
     CompressionStep.TRANSPOSE_OBL: [
         "{java}",
         "it.unimi.dsi.big.webgraph.BVGraph",
         "--list",
         "{out_dir}/{graph_name}-transposed",
     ],
     CompressionStep.MAPS: [
         "zstdcat",
         "{in_dir}/{graph_name}.nodes.csv.zst",
         "|",
         "{java}",
-        "org.softwareheritage.graph.maps.MapBuilder",
+        "org.softwareheritage.graph.maps.NodeMapBuilder",
         "{out_dir}/{graph_name}",
         "{tmp_dir}",
     ],
     CompressionStep.CLEAN_TMP: [
         "rm",
         "-rf",
         "{out_dir}/{graph_name}-bv.graph",
         "{out_dir}/{graph_name}-bv.obl",
         "{out_dir}/{graph_name}-bv.offsets",
         "{tmp_dir}",
     ],
 }
 
 
 class StepOption(ParamType):
     """click type for specifying a compression step on the CLI
 
     Parses either individual steps, specified as step names or integers, or
     step ranges.
 
     """
 
     name = "compression step"
 
     def convert(self, value, param, ctx) -> Set[CompressionStep]:
         steps: Set[CompressionStep] = set()
 
         specs = value.split(",")
         for spec in specs:
             if "-" in spec:  # step range
                 (raw_l, raw_r) = spec.split("-", maxsplit=1)
                 if raw_l == "":  # no left endpoint
                     raw_l = COMP_SEQ[0].name
                 if raw_r == "":  # no right endpoint
                     raw_r = COMP_SEQ[-1].name
                 l_step = self.convert(raw_l, param, ctx)
                 r_step = self.convert(raw_r, param, ctx)
                 if len(l_step) != 1 or len(r_step) != 1:
                     self.fail(f"invalid step specification: {value}, " f"see --help")
                 l_idx = l_step.pop()
                 r_idx = r_step.pop()
                 steps = steps.union(
                     set(map(CompressionStep, range(l_idx.value, r_idx.value + 1)))
                 )
             else:  # singleton step
                 try:
                     steps.add(CompressionStep(int(spec)))  # integer step
                 except ValueError:
                     try:
                         steps.add(CompressionStep[spec.upper()])  # step name
                     except KeyError:
                         self.fail(
                             f"invalid step specification: {value}, " f"see --help"
                         )
 
         return steps
 
 
 def do_step(step, conf):
     cmd = " ".join(STEP_ARGV[step]).format(**conf)
 
     cmd_env = os.environ.copy()
     cmd_env["JAVA_TOOL_OPTIONS"] = conf["java_tool_options"]
     cmd_env["CLASSPATH"] = conf["classpath"]
 
     logging.info(f"running: {cmd}")
     process = subprocess.Popen(
         ["/bin/bash", "-c", cmd],
         env=cmd_env,
         encoding="utf8",
         stdout=subprocess.PIPE,
         stderr=subprocess.STDOUT,
     )
     with process.stdout as stdout:
         for line in stdout:
             logging.info(line.rstrip())
     rc = process.wait()
     if rc != 0:
         raise RuntimeError(
             f"compression step {step} returned non-zero " f"exit code {rc}"
         )
     else:
         return rc
 
 
 def compress(
     graph_name: str,
     in_dir: Path,
     out_dir: Path,
     steps: Set[CompressionStep] = set(COMP_SEQ),
     conf: Dict[str, str] = {},
 ):
     """graph compression pipeline driver from nodes/edges files to compressed
     on-disk representation
 
     Args:
         graph_name: graph base name, relative to in_dir
         in_dir: input directory, where the uncompressed graph can be found
         out_dir: output directory, where the compressed graph will be stored
         steps: compression steps to run (default: all steps)
         conf: compression configuration, supporting the following keys (all
           are optional, so an empty configuration is fine and is the default)
 
           - batch_size: batch size for `WebGraph transformations `_;
             defaults to 1 billion
           - classpath: java classpath, defaults to swh-graph JAR only
           - java: command to run java VM, defaults to "java"
           - java_tool_options: value for JAVA_TOOL_OPTIONS environment
             variable; defaults to various settings for high memory machines
           - logback: path to a logback.xml configuration file; if not provided
             a temporary one will be created and used
           - max_ram: maximum RAM to use for compression; defaults to available
             virtual memory
           - tmp_dir: temporary directory, defaults to the "tmp" subdir of
             out_dir
 
     """
     if not steps:
         steps = set(COMP_SEQ)
 
     conf = check_config_compress(conf, graph_name, in_dir, out_dir)
 
     compression_start_time = datetime.now()
     logging.info(f"starting compression at {compression_start_time}")
     seq_no = 0
     for step in COMP_SEQ:
         if step not in steps:
             logging.debug(f"skipping compression step {step}")
             continue
         seq_no += 1
         step_start_time = datetime.now()
         logging.info(
             f"starting compression step {step} "
             f"({seq_no}/{len(steps)}) at {step_start_time}"
         )
         do_step(step, conf)
         step_end_time = datetime.now()
         step_duration = step_end_time - step_start_time
         logging.info(
             f"completed compression step {step} "
             f"({seq_no}/{len(steps)}) "
             f"at {step_end_time} in {step_duration}"
         )
     compression_end_time = datetime.now()
     compression_duration = compression_end_time - compression_start_time
     logging.info(f"completed compression in {compression_duration}")
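`StepOption` above accepts comma-separated step names, 1-based integers, and open-ended ranges such as `mph-bfs` or `7-`. As a rough transcription of that parsing logic, here is a sketch in Java using a stand-in enum; the enum names and class structure are illustrative, not part of the project, and error handling is reduced to the exceptions Java throws naturally:

```java
import java.util.EnumSet;

public class StepSpecParser {
    // stand-in for swh.graph.webgraph.CompressionStep
    enum Step { MPH, BV, BFS, PERMUTE, STATS, TRANSPOSE, MAPS, CLEAN_TMP }

    /** Parse specs like "mph,bv", "3-5", or "transpose-" into a step set. */
    static EnumSet<Step> parse(String value) {
        EnumSet<Step> steps = EnumSet.noneOf(Step.class);
        for (String spec : value.split(",")) {
            if (spec.contains("-")) {  // step range, possibly open-ended
                String[] endpoints = spec.split("-", 2);
                Step l = endpoints[0].isEmpty() ? Step.values()[0]  // no left endpoint
                                                : single(endpoints[0]);
                Step r = endpoints[1].isEmpty() ? Step.values()[Step.values().length - 1]  // no right endpoint
                                                : single(endpoints[1]);
                steps.addAll(EnumSet.range(l, r));
            } else {  // singleton step
                steps.add(single(spec));
            }
        }
        return steps;
    }

    /** Resolve a single step given as a 1-based index or a (case-insensitive) name. */
    static Step single(String spec) {
        try {
            return Step.values()[Integer.parseInt(spec) - 1];  // integer step
        } catch (NumberFormatException e) {
            return Step.valueOf(spec.toUpperCase());  // step name
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("mph,bv"));       // [MPH, BV]
        System.out.println(parse("3-5"));          // [BFS, PERMUTE, STATS]
        System.out.println(parse("transpose-"));   // [TRANSPOSE, MAPS, CLEAN_TMP]
    }
}
```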