diff --git a/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java b/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java
index 4d47a12..718cc6f 100644
--- a/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java
+++ b/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java
@@ -1,124 +1,197 @@
 package org.softwareheritage.graph.backend;
 
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.util.Scanner;
+import java.util.concurrent.*;
 import java.util.zip.GZIPInputStream;
 
 import it.unimi.dsi.bits.LongArrayBitVector;
 import it.unimi.dsi.fastutil.Size64;
 import it.unimi.dsi.fastutil.io.BinIO;
 import it.unimi.dsi.fastutil.longs.LongBigArrays;
 import it.unimi.dsi.fastutil.longs.LongBigList;
 import it.unimi.dsi.fastutil.objects.Object2LongFunction;
 import it.unimi.dsi.fastutil.objects.ObjectBigArrays;
 import it.unimi.dsi.io.FastBufferedReader;
 import it.unimi.dsi.io.LineIterator;
 
 import org.softwareheritage.graph.Graph;
 import org.softwareheritage.graph.Node;
 import org.softwareheritage.graph.SwhPID;
 import org.softwareheritage.graph.backend.NodeTypesMap;
 
 /**
- * Pre-processing steps (such as dumping mapping files on disk) before running the graph service.
+ * Create maps needed at runtime by the graph service, in particular:
+ *
+ * - SWH PID → WebGraph long node id
+ * - WebGraph long node id → SWH PID (converse of the former)
+ * - WebGraph long node id → SWH node type (enum)
  *
  * @author The Software Heritage developers
  */
-
 public class Setup {
+
+    final static long PROGRESS_TICK = 1_000_000;
+    final static long SORT_BUFFER_SIZE = Runtime.getRuntime().maxMemory() * 66 / 100;  // 2/3 of max_ram
+
     /**
      * Main entrypoint.
      *
      * @param args command line arguments
      */
     public static void main(String[] args) throws IOException {
-        if (args.length != 2) {
-            System.err.println("Usage: NODES_CSV_GZ COMPRESSED_GRAPH_BASE_NAME");
+        if (args.length != 3) {
+            System.err.println("Usage: NODES_CSV_GZ COMPRESSED_GRAPH_BASE_NAME TEMP_DIR");
             System.exit(1);
         }
-
         String nodesPath = args[0];
         String graphPath = args[1];
+        String tmpDir = args[2];
 
         System.out.println("Pre-computing node id maps...");
         long startTime = System.nanoTime();
-        precomputeNodeIdMap(nodesPath, graphPath);
+        precomputeNodeIdMap(nodesPath, graphPath, tmpDir);
         long endTime = System.nanoTime();
         double duration = (endTime - startTime) / 1_000_000_000;
         System.out.println("Done in: " + duration + " seconds");
     }
 
     /**
      * Computes and dumps on disk mapping files.
      *
      * @param nodesPath path of the compressed csv nodes file
      * @param graphPath path of the compressed graph
      */
     // Suppress warning for Object2LongFunction cast
     @SuppressWarnings("unchecked")
-    static void precomputeNodeIdMap(String nodesPath, String graphPath) throws IOException {
-        // First internal mapping: SWH PID (string) -> WebGraph MPH (long)
+    static void precomputeNodeIdMap(String nodesPath, String graphPath, String tmpDir)
+            throws IOException {
+        // first half of PID->node mapping: PID -> WebGraph MPH (long)
         Object2LongFunction<String> mphMap = null;
         try {
             mphMap = (Object2LongFunction<String>) BinIO.loadObject(graphPath + ".mph");
         } catch (ClassNotFoundException e) {
-            throw new IllegalArgumentException("The .mph file contains unknown class object: " + e);
+            System.err.println("unknown class object in .mph file: " + e);
+            System.exit(2);
         }
         long nbIds = (mphMap instanceof Size64) ? ((Size64) mphMap).size64() : mphMap.size();
 
-        // Second internal mapping: WebGraph MPH (long) -> BFS ordering (long)
+        // second half of PID->node mapping: WebGraph MPH (long) -> BFS order (long)
        long[][] bfsMap = LongBigArrays.newBigArray(nbIds);
         long loaded = BinIO.loadLongs(graphPath + ".order", bfsMap);
         if (loaded != nbIds) {
-            throw new IllegalArgumentException("Graph contains " + nbIds + " nodes, but read " + loaded);
+            System.err.println("graph contains " + nbIds + " nodes, but read " + loaded);
+            System.exit(2);
         }
 
-        // Dump complete mapping for all nodes: SWH PID (string) <=> WebGraph node id (long)
-
+        // Create mapping SWH PID -> WebGraph node id, by sequentially reading
+        // nodes, hashing them with MPH, and permuting according to BFS order
         InputStream nodesStream = new GZIPInputStream(new FileInputStream(nodesPath));
-        FastBufferedReader buffer = new FastBufferedReader(new InputStreamReader(nodesStream, "UTF-8"));
+        FastBufferedReader buffer = new FastBufferedReader(new InputStreamReader(nodesStream,
+                StandardCharsets.US_ASCII));
         LineIterator swhPIDIterator = new LineIterator(buffer);
 
-        // for the binary format of pidToNodeMap, see Python module swh.graph.pid:PidToNodeMap
-        // for the binary format of nodeToPidMap, see Python module swh.graph.pid:NodeToPidMap
+        // The WebGraph node id -> SWH PID mapping can be obtained from the
+        // PID->node one by numerically sorting on node id and sequentially
+        // writing obtained PIDs to a binary map. Delegates the sorting job to
+        // /usr/bin/sort via pipes
+        ProcessBuilder processBuilder = new ProcessBuilder();
+        processBuilder.command("sort", "--numeric-sort", "--key", "2",
+                               "--buffer-size", Long.toString(SORT_BUFFER_SIZE),
+                               "--temporary-directory", tmpDir);
+        Process sort = processBuilder.start();
+        BufferedOutputStream sort_stdin = new BufferedOutputStream(sort.getOutputStream());
+        BufferedInputStream sort_stdout = new BufferedInputStream(sort.getInputStream());
+
+        // for the binary format of pidToNodeMap, see Python module swh.graph.pid:PidToIntMap
+        // for the binary format of nodeToPidMap, see Python module swh.graph.pid:IntToPidMap
         try (DataOutputStream pidToNodeMap = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(graphPath + Graph.PID_TO_NODE)));
-             BufferedOutputStream nodeToPidMap = new BufferedOutputStream(new FileOutputStream(graphPath + Graph.NODE_TO_PID))) {
-            // nodeToPidMap needs to write SWH PID in order of node id, so use a temporary array
-            Object[][] nodeToSwhPID = ObjectBigArrays.newBigArray(nbIds);
-
-            // To effectively run edge restriction during graph traversals, we store node id (long) -> SWH
-            // type map. This is represented as a bitmap using minimum number of bits per Node.Type.
-            final int log2NbTypes = (int) Math.ceil(Math.log(Node.Type.values().length) / Math.log(2));
+             BufferedOutputStream nodeToPidMap = new BufferedOutputStream(new FileOutputStream(graphPath + Graph.NODE_TO_PID))) {
+
+            // background handler for sort output: it will be fed PID/node
+            // pairs while pidToNodeMap is being filled, and will itself fill
+            // nodeToPidMap as soon as data from sort is ready
+            SortOutputHandler outputHandler = new SortOutputHandler(sort_stdout, nodeToPidMap);
+            outputHandler.start();
+
+            // Type map from WebGraph node ID to SWH type. Used at runtime by
+            // pure Java graph traversals to efficiently check edge
+            // restrictions.
+            final int log2NbTypes = (int) Math.ceil(Math.log(Node.Type.values().length)
+                                                    / Math.log(2));
             final int nbBitsPerNodeType = log2NbTypes;
             LongArrayBitVector nodeTypesBitVector = LongArrayBitVector.ofLength(nbBitsPerNodeType * nbIds);
             LongBigList nodeTypesMap = nodeTypesBitVector.asLongBigList(nbBitsPerNodeType);
 
             for (long iNode = 0; iNode < nbIds && swhPIDIterator.hasNext(); iNode++) {
+                if (iNode > 0 && iNode % PROGRESS_TICK == 0) {
+                    System.out.println("pid2node: processed " + iNode / PROGRESS_TICK
+                                       + "M nodes...");
+                }
                 String strSwhPID = swhPIDIterator.next().toString();
                 SwhPID swhPID = new SwhPID(strSwhPID);
                 byte[] swhPIDBin = swhPID.toBytes();
 
                 long mphId = mphMap.getLong(strSwhPID);
                 long nodeId = LongBigArrays.get(bfsMap, mphId);
 
                 pidToNodeMap.write(swhPIDBin, 0, swhPIDBin.length);
                 pidToNodeMap.writeLong(nodeId);
+                sort_stdin.write((strSwhPID + "\t" + nodeId + "\n")
+                                 .getBytes(StandardCharsets.US_ASCII));
 
-                ObjectBigArrays.set(nodeToSwhPID, nodeId, swhPIDBin);
                 nodeTypesMap.set(nodeId, swhPID.getType().ordinal());
             }
+            sort_stdin.close();
+
+            // write type map
+            BinIO.storeObject(nodeTypesMap, graphPath + Graph.NODE_TO_TYPE);
+
+            // wait for nodeToPidMap filling
+            try {
+                int sortExitCode = sort.waitFor();
+                if (sortExitCode != 0) {
+                    System.err.println("sort returned non-zero exit code: " + sortExitCode);
+                    System.exit(2);
+                }
+                outputHandler.join();
+            } catch (InterruptedException e) {
+                System.err.println("processing of sort output failed with: " + e);
+                System.exit(2);
+            }
+        }
+
+    }
 
-            BinIO.storeObject(nodeTypesMap, graphPath + Graph.NODE_TO_TYPE);
+    private static class SortOutputHandler extends Thread {
+        private Scanner input;
+        private OutputStream output;
 
-            for (long iNode = 0; iNode < nbIds; iNode++) {
-                nodeToPidMap.write((byte[]) ObjectBigArrays.get(nodeToSwhPID, iNode));
-            }
+        SortOutputHandler(InputStream input, OutputStream output) {
+            this.input = new Scanner(input, StandardCharsets.US_ASCII);
+            this.output = output;
+        }
+
+        public void run() {
+            System.out.println("node2pid: waiting for sort output...");
+            long i = -1;
+            while (input.hasNextLine()) {
+                i++;
+                if (i > 0 && i % PROGRESS_TICK == 0) {
+                    System.out.println("node2pid: processed " + i / PROGRESS_TICK
+                                       + "M nodes...");
+                }
+                String line = input.nextLine();  // format: SWH_PID NODE_ID
+                SwhPID swhPID = new SwhPID(line.split("\\t")[0]);  // get PID
+                try {
+                    output.write((byte[]) swhPID.toBytes());
+                } catch (IOException e) {
+                    System.err.println("writing to node->PID map failed with: " + e);
+                }
+            }
         }
     }
+
 }
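
The key change above swaps an in-memory Object big array for an external merge sort: the main loop streams "PID<TAB>node id" lines to sort(1), and a background SortOutputHandler thread consumes the sorted output to fill the node->PID map, so peak memory stays flat regardless of graph size. The following is a minimal Python sketch of the same pipe pattern, useful for experimenting outside the JVM; sort_by_node_id is a name made up for this sketch and is not part of swh.graph.

    import subprocess
    import threading

    def sort_by_node_id(pairs, tmp_dir='/tmp'):
        # spawn sort(1) like Setup.java does: numeric sort on the second
        # tab-separated field, spilling to tmp_dir when input is large
        sort = subprocess.Popen(
            ['sort', '--numeric-sort', '--key', '2',
             '--temporary-directory', tmp_dir],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE, encoding='ascii')
        sorted_pairs = []

        def drain():
            # consume sort output concurrently with feeding its input:
            # doing both from one thread can deadlock once both pipe
            # buffers fill up, hence SortOutputHandler in the Java code
            for line in sort.stdout:
                pid, node_id = line.rstrip('\n').split('\t')
                sorted_pairs.append((pid, int(node_id)))

        drainer = threading.Thread(target=drain)
        drainer.start()
        for pid, node_id in pairs:
            sort.stdin.write('%s\t%d\n' % (pid, node_id))
        sort.stdin.close()
        drainer.join()
        if sort.wait() != 0:
            raise RuntimeError('sort returned non-zero exit code')
        return sorted_pairs

    print(sort_by_node_id([('swh:1:rev:0002', 2), ('swh:1:cnt:0001', 0)]))
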
diff --git a/swh/graph/webgraph.py b/swh/graph/webgraph.py
index 7a1f486..e393139 100644
--- a/swh/graph/webgraph.py
+++ b/swh/graph/webgraph.py
@@ -1,264 +1,265 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """WebGraph driver
 
 """
 
 import logging
 import os
 import subprocess
 
 from enum import Enum
 from datetime import datetime
 from pathlib import Path
 from typing import Dict, List, Set
 
 import psutil
 
 from click import ParamType
 
 from swh.graph.backend import find_graph_jar
 
 
 class CompressionStep(Enum):
     MPH = 1
     BV = 2
     BV_OBL = 3
     BFS = 4
     PERMUTE = 5
     PERMUTE_OBL = 6
     STATS = 7
     TRANSPOSE = 8
     TRANSPOSE_OBL = 9
     MAPS = 10
     CLEAN_TMP = 11
 
     def __str__(self):
         return self.name
 
 
 # full compression pipeline
 COMP_SEQ = list(CompressionStep)
 
 # Mapping from compression steps to shell commands implementing them. Commands
 # will be executed by the shell, so be careful with meta characters. They are
 # specified here as lists of tokens that will be joined together only for ease
 # of line splitting. In commands, {tokens} will be interpolated with
 # configuration values, see :func:`compress`.
 STEP_ARGV = {
     CompressionStep.MPH:
     ['zcat', '{in_dir}/{graph_name}.nodes.csv.gz', '|',
      '{java}', 'it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction',
      '--temp-dir', '{tmp_dir}',
      '{out_dir}/{graph_name}.mph'],
 
     CompressionStep.BV:
     ['zcat', '{in_dir}/{graph_name}.edges.csv.gz', '|',
      '{java}', 'it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph',
      '--temp-dir', '{tmp_dir}',
      '--function', '{out_dir}/{graph_name}.mph',
      '{out_dir}/{graph_name}-bv'],
 
     CompressionStep.BV_OBL:
     ['{java}', 'it.unimi.dsi.big.webgraph.BVGraph',
      '--list', '{out_dir}/{graph_name}-bv'],
 
     CompressionStep.BFS:
     ['{java}', 'it.unimi.dsi.law.big.graph.BFS',
      '{out_dir}/{graph_name}-bv', '{out_dir}/{graph_name}.order'],
 
     CompressionStep.PERMUTE:
     ['{java}', 'it.unimi.dsi.big.webgraph.Transform', 'mapOffline',
      '{out_dir}/{graph_name}-bv', '{out_dir}/{graph_name}',
      '{out_dir}/{graph_name}.order', '{batch_size}', '{tmp_dir}'],
 
     CompressionStep.PERMUTE_OBL:
     ['{java}', 'it.unimi.dsi.big.webgraph.BVGraph',
      '--list', '{out_dir}/{graph_name}'],
 
     CompressionStep.STATS:
     ['{java}', 'it.unimi.dsi.big.webgraph.Stats',
      '{out_dir}/{graph_name}'],
 
     CompressionStep.TRANSPOSE:
     ['{java}', 'it.unimi.dsi.big.webgraph.Transform', 'transposeOffline',
      '{out_dir}/{graph_name}', '{out_dir}/{graph_name}-transposed',
      '{batch_size}', '{tmp_dir}'],
 
     CompressionStep.TRANSPOSE_OBL:
     ['{java}', 'it.unimi.dsi.big.webgraph.BVGraph',
      '--list', '{out_dir}/{graph_name}-transposed'],
 
     CompressionStep.MAPS:
     ['{java}', 'org.softwareheritage.graph.backend.Setup',
-     '{in_dir}/{graph_name}.nodes.csv.gz', '{out_dir}/{graph_name}'],
+     '{in_dir}/{graph_name}.nodes.csv.gz', '{out_dir}/{graph_name}',
+     '{tmp_dir}'],
 
     CompressionStep.CLEAN_TMP:
     ['rm', '-rf',
      '{out_dir}/{graph_name}-bv.graph',
      '{out_dir}/{graph_name}-bv.obl',
      '{out_dir}/{graph_name}-bv.offsets',
      '{tmp_dir}'],
 }  # type: Dict[CompressionStep, List[str]]
 
 
 class StepOption(ParamType):
     """click type for specifying a compression step on the CLI
 
     parse either individual steps, specified as step names or integers, or
     step ranges
 
     """
     name = "compression step"
 
     def convert(self, value, param, ctx) -> Set[CompressionStep]:
         steps = set()  # type: Set[CompressionStep]
 
         specs = value.split(',')
         for spec in specs:
             if '-' in spec:  # step range
                 (raw_l, raw_r) = spec.split('-', maxsplit=1)
                 if raw_l == '':  # no left endpoint
                     raw_l = COMP_SEQ[0].name
                 if raw_r == '':  # no right endpoint
                     raw_r = COMP_SEQ[-1].name
                 l_step = self.convert(raw_l, param, ctx)
                 r_step = self.convert(raw_r, param, ctx)
                 if len(l_step) != 1 or len(r_step) != 1:
                     self.fail('invalid step specification: %s, see --help'
                               % value)
                 l_idx = l_step.pop()
                 r_idx = r_step.pop()
                 steps = steps.union(set(map(CompressionStep,
                                             range(l_idx.value,
                                                   r_idx.value + 1))))
             else:  # singleton step
                 try:
                     steps.add(CompressionStep(int(spec)))  # integer step
                 except ValueError:
                     try:
                         steps.add(CompressionStep[spec.upper()])  # step name
                     except KeyError:
                         self.fail('invalid step specification: %s, see --help'
                                   % value)
 
         return steps
 
 
 def do_step(step, conf):
     cmd = ' '.join(STEP_ARGV[step]).format(**conf)
 
     cmd_env = os.environ.copy()
     cmd_env['JAVA_TOOL_OPTIONS'] = conf['java_tool_options']
     cmd_env['CLASSPATH'] = conf['classpath']
 
     logging.info('running: %s' % cmd)
     process = subprocess.Popen(cmd, shell=True, env=cmd_env, encoding='utf8',
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
     with process.stdout as stdout:
         for line in stdout:
             logging.info(line.rstrip())
     rc = process.wait()
     if rc != 0:
         raise RuntimeError('compression step %s returned non-zero '
                            'exit code %d' % (step, rc))
     else:
         return rc
 
 
 def check_config(conf, graph_name, in_dir, out_dir):
     """check compression configuration, propagate defaults, and initialize
     execution environment
 
     """
     conf = conf.copy()
 
     conf['graph_name'] = graph_name
     conf['in_dir'] = str(in_dir)
     conf['out_dir'] = str(out_dir)
     out_dir.mkdir(parents=True, exist_ok=True)
     if 'tmp_dir' not in conf:
         tmp_dir = out_dir / 'tmp'
         conf['tmp_dir'] = str(tmp_dir)
     else:
         tmp_dir = Path(conf['tmp_dir'])
     tmp_dir.mkdir(parents=True, exist_ok=True)
     if 'batch_size' not in conf:
         conf['batch_size'] = '1000000000'  # 1 billion
     if 'logback' not in conf:
         logback_confpath = tmp_dir / 'logback.xml'
         with open(logback_confpath, 'w') as conffile:
             conffile.write("""
<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d %r %p [%t] %logger{1} - %m%n</pattern>
    </encoder>
  </appender>
  <root level="INFO">
    <appender-ref ref="STDOUT" />
  </root>
</configuration>
""")
         conf['logback'] = str(logback_confpath)
     if 'max_ram' not in conf:
         conf['max_ram'] = str(psutil.virtual_memory().total)
     if 'java_tool_options' not in conf:
         assert 'logback' in conf
         conf['java_tool_options'] = ' '.join([
             '-Xmx{max_ram}', '-XX:PretenureSizeThreshold=512M',
             '-XX:MaxNewSize=4G', '-XX:+UseLargePages',
             '-XX:+UseTransparentHugePages', '-XX:+UseNUMA', '-XX:+UseTLAB',
             '-XX:+ResizeTLAB', '-Dlogback.configurationFile={logback}'
         ]).format(max_ram=conf['max_ram'], logback=conf['logback'])
     if 'java' not in conf:
         conf['java'] = 'java'
     if 'classpath' not in conf:
         conf['classpath'] = find_graph_jar()
 
     return conf
 
 
 def compress(graph_name: str, in_dir: Path, out_dir: Path,
              steps: Set[CompressionStep] = set(COMP_SEQ),
              conf: Dict[str, str] = {}):
     """graph compression pipeline driver from nodes/edges files to compressed
     on-disk representation
 
     Args:
         graph_name: graph base name, relative to in_dir
         in_dir: input directory, where the uncompressed graph can be found
         out_dir: output directory, where the compressed graph will be stored
         steps: compression steps to run (default: all steps)
         conf: compression configuration, supporting the following keys (all
             are optional, so an empty configuration is fine and is the
             default)
 
             - batch_size: batch size for `WebGraph transformations`_;
               defaults to 1 billion
             - classpath: java classpath, defaults to swh-graph JAR only
             - java: command to run java VM, defaults to "java"
             - java_tool_options: value for JAVA_TOOL_OPTIONS environment
               variable; defaults to various settings for high memory machines
             - logback: path to a logback.xml configuration file; if not
               provided a temporary one will be created and used
             - max_ram: maximum RAM to use for compression; defaults to
               available virtual memory
             - tmp_dir: temporary directory, defaults to the "tmp" subdir of
               out_dir
 
     """
     if not steps:
         steps = set(COMP_SEQ)
 
     conf = check_config(conf, graph_name, in_dir, out_dir)
 
     logging.info('starting compression')
     compression_start_time = datetime.now()
     seq_no = 0
     for step in COMP_SEQ:
         if step not in steps:
             logging.debug('skipping compression step %s' % step)
             continue
         seq_no += 1
         logging.info('starting compression step %s (%d/%d)'
                      % (step, seq_no, len(steps)))
         step_start_time = datetime.now()
         do_step(step, conf)
         step_duration = datetime.now() - step_start_time
         logging.info('completed compression step %s (%d/%d) in %s'
                      % (step, seq_no, len(steps), step_duration))
     compression_duration = datetime.now() - compression_start_time
     logging.info('completed compression in %s' % compression_duration)
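
StepOption.convert above accepts comma-separated step names, integers, and ranges with optional open endpoints, recursively expanding them into a set of CompressionStep values. A small illustration, assuming swh.graph.webgraph is importable; the names() helper exists only to make the output readable:

    from swh.graph.webgraph import StepOption

    def names(steps):
        # render a step set as sorted step names
        return sorted(str(s) for s in steps)

    opt = StepOption()
    print(names(opt.convert('maps', None, None)))  # ['MAPS']
    print(names(opt.convert('3-5', None, None)))   # ['BFS', 'BV_OBL', 'PERMUTE']
    print(names(opt.convert('-bfs', None, None)))  # ['BFS', 'BV', 'BV_OBL', 'MPH']
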
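To see how the new TEMP_DIR argument flows end to end: conf['tmp_dir'] is interpolated into the {tmp_dir} placeholder of STEP_ARGV[CompressionStep.MAPS], which Setup receives as its third argument and hands to sort --temporary-directory. A hypothetical driver snippet (all paths invented for the example):

    import logging
    from pathlib import Path

    from swh.graph.webgraph import CompressionStep, compress

    logging.basicConfig(level=logging.INFO)

    # re-run only the MAPS step on an already BV-compressed graph,
    # pointing tmp_dir at a disk with room for sort(1) spill files
    compress(graph_name='graph',
             in_dir=Path('/srv/dataset'),      # graph.nodes.csv.gz lives here
             out_dir=Path('/srv/compressed'),  # graph.mph, graph.order, ...
             steps={CompressionStep.MAPS},
             conf={'tmp_dir': '/srv/tmp'})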