diff --git a/java/src/main/java/org/softwareheritage/graph/backend/MapBuilder.java b/java/src/main/java/org/softwareheritage/graph/backend/MapBuilder.java index 7643fb2..993483c 100644 --- a/java/src/main/java/org/softwareheritage/graph/backend/MapBuilder.java +++ b/java/src/main/java/org/softwareheritage/graph/backend/MapBuilder.java @@ -1,220 +1,218 @@ package org.softwareheritage.graph.backend; import java.io.*; import java.nio.charset.StandardCharsets; import java.util.Scanner; import java.util.concurrent.*; import java.util.zip.GZIPInputStream; import it.unimi.dsi.bits.LongArrayBitVector; import it.unimi.dsi.fastutil.Size64; import it.unimi.dsi.fastutil.io.BinIO; import it.unimi.dsi.fastutil.longs.LongBigArrays; import it.unimi.dsi.fastutil.longs.LongBigList; import it.unimi.dsi.fastutil.objects.Object2LongFunction; import it.unimi.dsi.fastutil.objects.ObjectBigArrays; import it.unimi.dsi.io.FastBufferedReader; import it.unimi.dsi.io.LineIterator; import it.unimi.dsi.logging.ProgressLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.softwareheritage.graph.Graph; import org.softwareheritage.graph.Node; import org.softwareheritage.graph.SwhPID; import org.softwareheritage.graph.backend.NodeTypesMap; /** * Create maps needed at runtime by the graph service, in particular: * * - SWH PID → WebGraph long node id * - WebGraph long node id → SWH PID (converse of the former) * - WebGraph long node id → SWH node type (enum) * * @author The Software Heritage developers */ public class MapBuilder { final static String SORT_BUFFER_SIZE = "40%"; final static Logger logger = LoggerFactory.getLogger(MapBuilder.class); /** * Main entrypoint. * * @param args command line arguments */ public static void main(String[] args) throws IOException { - if (args.length != 3) { - logger.error("Usage: NODES_CSV_GZ COMPRESSED_GRAPH_BASE_NAME TEMP_DIR"); + if (args.length != 2) { + logger.error("Usage: COMPRESSED_GRAPH_BASE_NAME TEMP_DIR < NODES_CSV"); System.exit(1); } - String nodesPath = args[0]; - String graphPath = args[1]; - String tmpDir = args[2]; + String graphPath = args[0]; + String tmpDir = args[1]; logger.info("starting maps generation..."); - precomputeNodeIdMap(nodesPath, graphPath, tmpDir); + precomputeNodeIdMap(graphPath, tmpDir); logger.info("maps generation completed"); } /** * Computes and dumps on disk mapping files. * - * @param nodesPath path of the compressed csv nodes file * @param graphPath path of the compressed graph */ // Suppress warning for Object2LongFunction cast @SuppressWarnings("unchecked") - static void precomputeNodeIdMap(String nodesPath, String graphPath, String tmpDir) + static void precomputeNodeIdMap(String graphPath, String tmpDir) throws IOException { ProgressLogger plPid2Node = new ProgressLogger(logger, 10, TimeUnit.SECONDS); ProgressLogger plNode2Pid = new ProgressLogger(logger, 10, TimeUnit.SECONDS); plPid2Node.itemsName = "pid→node"; plNode2Pid.itemsName = "node→pid"; // avg speed for pid→node is sometime skewed due to write to the sort // pipe hanging when sort is sorting; hence also desplay local speed plPid2Node.displayLocalSpeed = true; // first half of PID->node mapping: PID -> WebGraph MPH (long) Object2LongFunction mphMap = null; try { logger.info("loading MPH function..."); mphMap = (Object2LongFunction) BinIO.loadObject(graphPath + ".mph"); logger.info("MPH function loaded"); } catch (ClassNotFoundException e) { logger.error("unknown class object in .mph file: " + e); System.exit(2); } long nbIds = (mphMap instanceof Size64) ? 
((Size64) mphMap).size64() : mphMap.size(); plPid2Node.expectedUpdates = nbIds; plNode2Pid.expectedUpdates = nbIds; // second half of PID->node mapping: WebGraph MPH (long) -> BFS order (long) long[][] bfsMap = LongBigArrays.newBigArray(nbIds); logger.info("loading BFS order file..."); long loaded = BinIO.loadLongs(graphPath + ".order", bfsMap); logger.info("BFS order file loaded"); if (loaded != nbIds) { logger.error("graph contains " + nbIds + " nodes, but read " + loaded); System.exit(2); } // Create mapping SWH PID -> WebGraph node id, by sequentially reading - // nodes, hasing them with MPH, and permuting according to BFS order + // nodes, hashing them with MPH, and permuting according to BFS order - InputStream nodesStream = new GZIPInputStream(new FileInputStream(nodesPath)); - FastBufferedReader buffer = new FastBufferedReader(new InputStreamReader(nodesStream, + FastBufferedReader buffer = new FastBufferedReader(new InputStreamReader(System.in, StandardCharsets.US_ASCII)); LineIterator swhPIDIterator = new LineIterator(buffer); // The WebGraph node id -> SWH PID mapping can be obtained from the // PID->node one by numerically sorting on node id and sequentially // writing obtained PIDs to a binary map. Delegates the sorting job to // /usr/bin/sort via pipes ProcessBuilder processBuilder = new ProcessBuilder(); processBuilder.command("sort", "--numeric-sort", "--key", "2", "--buffer-size", SORT_BUFFER_SIZE, "--temporary-directory", tmpDir); Process sort = processBuilder.start(); BufferedOutputStream sort_stdin = new BufferedOutputStream(sort.getOutputStream()); BufferedInputStream sort_stdout = new BufferedInputStream(sort.getInputStream()); // for the binary format of pidToNodeMap, see Python module swh.graph.pid:PidToIntMap // for the binary format of nodeToPidMap, see Python module swh.graph.pid:IntToPidMap try (DataOutputStream pidToNodeMap = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(graphPath + Graph.PID_TO_NODE))); BufferedOutputStream nodeToPidMap = new BufferedOutputStream(new FileOutputStream(graphPath + Graph.NODE_TO_PID))) { // background handler for sort output, it will be fed PID/node // pairs while pidToNodeMap is being filled, and will itself fill // nodeToPidMap as soon as data from sort is ready SortOutputHandler outputHandler = new SortOutputHandler(sort_stdout, nodeToPidMap, plNode2Pid); outputHandler.start(); // Type map from WebGraph node ID to SWH type. Used at runtime by // pure Java graph traversals to efficiently check edge // restrictions.
final int log2NbTypes = (int) Math.ceil(Math.log(Node.Type.values().length) / Math.log(2)); final int nbBitsPerNodeType = log2NbTypes; LongArrayBitVector nodeTypesBitVector = LongArrayBitVector.ofLength(nbBitsPerNodeType * nbIds); LongBigList nodeTypesMap = nodeTypesBitVector.asLongBigList(nbBitsPerNodeType); plPid2Node.start("filling pid2node map"); for (long iNode = 0; iNode < nbIds && swhPIDIterator.hasNext(); iNode++) { String strSwhPID = swhPIDIterator.next().toString(); SwhPID swhPID = new SwhPID(strSwhPID); byte[] swhPIDBin = swhPID.toBytes(); long mphId = mphMap.getLong(strSwhPID); long nodeId = LongBigArrays.get(bfsMap, mphId); pidToNodeMap.write(swhPIDBin, 0, swhPIDBin.length); pidToNodeMap.writeLong(nodeId); sort_stdin.write((strSwhPID + "\t" + nodeId + "\n") .getBytes(StandardCharsets.US_ASCII)); nodeTypesMap.set(nodeId, swhPID.getType().ordinal()); plPid2Node.lightUpdate(); } plPid2Node.done(); sort_stdin.close(); // write type map logger.info("storing type map"); BinIO.storeObject(nodeTypesMap, graphPath + Graph.NODE_TO_TYPE); logger.info("type map stored"); // wait for nodeToPidMap filling try { logger.info("waiting for node2pid map..."); int sortExitCode = sort.waitFor(); if (sortExitCode != 0) { logger.error("sort returned non-zero exit code: " + sortExitCode); System.exit(2); } outputHandler.join(); } catch (InterruptedException e) { logger.error("processing of sort output failed with: " + e); System.exit(2); } } } private static class SortOutputHandler extends Thread { private Scanner input; private OutputStream output; private ProgressLogger pl; SortOutputHandler(InputStream input, OutputStream output, ProgressLogger pl) { this.input = new Scanner(input, StandardCharsets.US_ASCII); this.output = output; this.pl = pl; } public void run() { boolean sortDone = false; logger.info("node2pid: waiting for sort output..."); while (input.hasNextLine()) { if (! 
sortDone) { sortDone = true; this.pl.start("filling node2pid map"); } String line = input.nextLine(); // format: SWH_PID NODE_ID SwhPID swhPID = new SwhPID(line.split("\\t")[0]); // get PID try { output.write((byte[]) swhPID.toBytes()); } catch (IOException e) { logger.error("writing to node->PID map failed with: " + e); } this.pl.lightUpdate(); } this.pl.done(); } } } diff --git a/swh/graph/tests/dataset/example.edges.csv.gz b/swh/graph/tests/dataset/example.edges.csv.gz deleted file mode 100644 index 115984d..0000000 Binary files a/swh/graph/tests/dataset/example.edges.csv.gz and /dev/null differ diff --git a/swh/graph/tests/dataset/example.edges.csv.zst b/swh/graph/tests/dataset/example.edges.csv.zst new file mode 100644 index 0000000..41b40c1 Binary files /dev/null and b/swh/graph/tests/dataset/example.edges.csv.zst differ diff --git a/swh/graph/tests/dataset/example.nodes.csv.gz b/swh/graph/tests/dataset/example.nodes.csv.gz deleted file mode 100644 index a1dac9b..0000000 Binary files a/swh/graph/tests/dataset/example.nodes.csv.gz and /dev/null differ diff --git a/swh/graph/tests/dataset/example.nodes.csv.zst b/swh/graph/tests/dataset/example.nodes.csv.zst new file mode 100644 index 0000000..00cb5f4 Binary files /dev/null and b/swh/graph/tests/dataset/example.nodes.csv.zst differ diff --git a/swh/graph/tests/dataset/generate_graph.sh b/swh/graph/tests/dataset/generate_graph.sh index 7d26bf5..e1a72ee 100755 --- a/swh/graph/tests/dataset/generate_graph.sh +++ b/swh/graph/tests/dataset/generate_graph.sh @@ -1,27 +1,27 @@ #!/bin/bash # Clean previous run rm -rf dockerfiles output mkdir output # Build Docker work environment toplevel_dir=`git rev-parse --show-toplevel` mkdir -p dockerfiles cp -r $toplevel_dir/dockerfiles/ . docker build --tag swh-graph-test dockerfiles # Setup input for compression script tr ' ' '\n' < example.edges.csv | sort -u > example.nodes.csv -gzip --force --keep example.edges.csv -gzip --force --keep example.nodes.csv +zstd < example.nodes.csv > example.nodes.csv.zst +zstd < example.edges.csv > example.edges.csv.zst docker run \ --user $(id -u):$(id -g) \ --name swh-graph-test --rm --tty --interactive \ --volume $(pwd):/input \ --volume $(pwd)/output:/output \ swh-graph-test:latest \ app/scripts/compress_graph.sh \ --lib lib/ \ --input /input/example \ --outdir /output diff --git a/swh/graph/webgraph.py b/swh/graph/webgraph.py index 4d53097..33562f6 100644 --- a/swh/graph/webgraph.py +++ b/swh/graph/webgraph.py @@ -1,267 +1,267 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """WebGraph driver """ import logging import os import subprocess from enum import Enum from datetime import datetime from pathlib import Path from typing import Dict, List, Set import psutil from click import ParamType from swh.graph.backend import find_graph_jar class CompressionStep(Enum): MPH = 1 BV = 2 BV_OBL = 3 BFS = 4 PERMUTE = 5 PERMUTE_OBL = 6 STATS = 7 TRANSPOSE = 8 TRANSPOSE_OBL = 9 MAPS = 10 CLEAN_TMP = 11 def __str__(self): return self.name # full compression pipeline COMP_SEQ = list(CompressionStep) # Mapping from compression steps to shell commands implementing them. Commands # will be executed by the shell, so be careful with meta characters. They are # specified here as lists of tokens that will be joined together only for ease # of line splitting.
In commands, {tokens} will be interpolated with # configuration values, see :func:`compress`. STEP_ARGV = { CompressionStep.MPH: - ['zcat', '{in_dir}/{graph_name}.nodes.csv.gz', '|', + ['zstdcat', '{in_dir}/{graph_name}.nodes.csv.zst', '|', '{java}', 'it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction', '--temp-dir', '{tmp_dir}', '{out_dir}/{graph_name}.mph'], CompressionStep.BV: - ['zcat', '{in_dir}/{graph_name}.edges.csv.gz', '|', + ['zstdcat', '{in_dir}/{graph_name}.edges.csv.zst', '|', '{java}', 'it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph', '--temp-dir', '{tmp_dir}', '--function', '{out_dir}/{graph_name}.mph', '{out_dir}/{graph_name}-bv'], CompressionStep.BV_OBL: ['{java}', 'it.unimi.dsi.big.webgraph.BVGraph', '--list', '{out_dir}/{graph_name}-bv'], CompressionStep.BFS: ['{java}', 'it.unimi.dsi.law.big.graph.BFS', '{out_dir}/{graph_name}-bv', '{out_dir}/{graph_name}.order'], CompressionStep.PERMUTE: ['{java}', 'it.unimi.dsi.big.webgraph.Transform', 'mapOffline', '{out_dir}/{graph_name}-bv', '{out_dir}/{graph_name}', '{out_dir}/{graph_name}.order', '{batch_size}', '{tmp_dir}'], CompressionStep.PERMUTE_OBL: ['{java}', 'it.unimi.dsi.big.webgraph.BVGraph', '--list', '{out_dir}/{graph_name}'], CompressionStep.STATS: ['{java}', 'it.unimi.dsi.big.webgraph.Stats', '{out_dir}/{graph_name}'], CompressionStep.TRANSPOSE: ['{java}', 'it.unimi.dsi.big.webgraph.Transform', 'transposeOffline', '{out_dir}/{graph_name}', '{out_dir}/{graph_name}-transposed', '{batch_size}', '{tmp_dir}'], CompressionStep.TRANSPOSE_OBL: ['{java}', 'it.unimi.dsi.big.webgraph.BVGraph', '--list', '{out_dir}/{graph_name}-transposed'], CompressionStep.MAPS: - ['{java}', 'org.softwareheritage.graph.backend.MapBuilder', - '{in_dir}/{graph_name}.nodes.csv.gz', '{out_dir}/{graph_name}', - '{tmp_dir}'], + ['zstdcat', '{in_dir}/{graph_name}.nodes.csv.zst', '|', + '{java}', 'org.softwareheritage.graph.backend.MapBuilder', + '{out_dir}/{graph_name}', '{tmp_dir}'], CompressionStep.CLEAN_TMP: ['rm', '-rf', '{out_dir}/{graph_name}-bv.graph', '{out_dir}/{graph_name}-bv.obl', '{out_dir}/{graph_name}-bv.offsets', '{tmp_dir}'], } # type: Dict[CompressionStep, List[str]] class StepOption(ParamType): """click type for specifying a compression step on the CLI parse either individual steps, specified as step names or integers, or step ranges """ name = "compression step" def convert(self, value, param, ctx) -> Set[CompressionStep]: steps = set() # type: Set[CompressionStep] specs = value.split(',') for spec in specs: if '-' in spec: # step range (raw_l, raw_r) = spec.split('-', maxsplit=1) if raw_l == '': # no left endpoint raw_l = COMP_SEQ[0].name if raw_r == '': # no right endpoint raw_r = COMP_SEQ[-1].name l_step = self.convert(raw_l, param, ctx) r_step = self.convert(raw_r, param, ctx) if len(l_step) != 1 or len(r_step) != 1: self.fail('invalid step specification: %s, see --help' % value) l_idx = l_step.pop() r_idx = r_step.pop() steps = steps.union(set(map(CompressionStep, range(l_idx.value, r_idx.value + 1)))) else: # singleton step try: steps.add(CompressionStep(int(spec))) # integer step except ValueError: try: steps.add(CompressionStep[spec.upper()]) # step name except KeyError: self.fail('invalid step specification: %s, see --help' % value) return steps def do_step(step, conf): cmd = ' '.join(STEP_ARGV[step]).format(**conf) cmd_env = os.environ.copy() cmd_env['JAVA_TOOL_OPTIONS'] = conf['java_tool_options'] cmd_env['CLASSPATH'] = conf['classpath'] logging.info('running: %s' % cmd) process = subprocess.Popen(cmd, shell=True, 
env=cmd_env, encoding='utf8', stdout=subprocess.PIPE, stderr=subprocess.STDOUT) with process.stdout as stdout: for line in stdout: logging.info(line.rstrip()) rc = process.wait() if rc != 0: raise RuntimeError('compression step %s returned non-zero ' 'exit code %d' % (step, rc)) else: return rc def check_config(conf, graph_name, in_dir, out_dir): """check compression configuration, propagate defaults, and initialize execution environment """ conf = conf.copy() conf['graph_name'] = graph_name conf['in_dir'] = str(in_dir) conf['out_dir'] = str(out_dir) out_dir.mkdir(parents=True, exist_ok=True) if 'tmp_dir' not in conf: tmp_dir = out_dir / 'tmp' conf['tmp_dir'] = str(tmp_dir) else: tmp_dir = Path(conf['tmp_dir']) tmp_dir.mkdir(parents=True, exist_ok=True) if 'batch_size' not in conf: conf['batch_size'] = '1000000000' # 1 billion if 'logback' not in conf: logback_confpath = tmp_dir / 'logback.xml' with open(logback_confpath, 'w') as conffile: conffile.write(""" <configuration> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%d %r %p [%t] %logger{1} - %m%n</pattern> </encoder> </appender> <root level="INFO"> <appender-ref ref="STDOUT"/> </root> </configuration> """) conf['logback'] = str(logback_confpath) if 'max_ram' not in conf: conf['max_ram'] = str(psutil.virtual_memory().total) if 'java_tool_options' not in conf: assert 'logback' in conf conf['java_tool_options'] = ' '.join([ '-Xmx{max_ram}', '-XX:PretenureSizeThreshold=512M', '-XX:MaxNewSize=4G', '-XX:+UseLargePages', '-XX:+UseTransparentHugePages', '-XX:+UseNUMA', '-XX:+UseTLAB', '-XX:+ResizeTLAB', '-Dlogback.configurationFile={logback}' ]) conf['java_tool_options'] = conf['java_tool_options'].format( max_ram=conf['max_ram'], logback=conf['logback']) if 'java' not in conf: conf['java'] = 'java' if 'classpath' not in conf: conf['classpath'] = find_graph_jar() return conf def compress(graph_name: str, in_dir: Path, out_dir: Path, steps: Set[CompressionStep] = set(COMP_SEQ), conf: Dict[str, str] = {}): """graph compression pipeline driver from nodes/edges files to compressed on-disk representation Args: graph_name: graph base name, relative to in_dir in_dir: input directory, where the uncompressed graph can be found out_dir: output directory, where the compressed graph will be stored steps: compression steps to run (default: all steps) conf: compression configuration, supporting the following keys (all are optional, so an empty configuration is fine and is the default) - batch_size: batch size for `WebGraph transformations `_; defaults to 1 billion - classpath: java classpath, defaults to swh-graph JAR only - java: command to run java VM, defaults to "java" - java_tool_options: value for JAVA_TOOL_OPTIONS environment variable; defaults to various settings for high memory machines - logback: path to a logback.xml configuration file; if not provided a temporary one will be created and used - max_ram: maximum RAM to use for compression; defaults to available virtual memory - tmp_dir: temporary directory, defaults to the "tmp" subdir of out_dir """ if not steps: steps = set(COMP_SEQ) conf = check_config(conf, graph_name, in_dir, out_dir) logging.info('starting compression') compression_start_time = datetime.now() seq_no = 0 for step in COMP_SEQ: if step not in steps: logging.debug('skipping compression step %s' % step) continue seq_no += 1 logging.info('starting compression step %s (%d/%d)' % (step, seq_no, len(steps))) step_start_time = datetime.now() do_step(step, conf) step_duration = datetime.now() - step_start_time logging.info('completed compression step %s (%d/%d) in %s' % (step, seq_no, len(steps), step_duration)) compression_duration = datetime.now() - compression_start_time logging.info('completed
compression in %s' % compression_duration)
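
As a reading aid, not part of the patch: the new MAPS step streams the zstd-compressed nodes file into MapBuilder's standard input instead of passing a .csv.gz path as an argument. The minimal Python sketch below mirrors STEP_ARGV[CompressionStep.MAPS] and the command assembly done by do_step(); the conf dictionary and every path in it are hypothetical placeholders.

# Illustration only: mirrors STEP_ARGV[CompressionStep.MAPS] and do_step()
# from swh/graph/webgraph.py after this change. All paths are hypothetical.
import subprocess

conf = {
    'java': 'java',
    'in_dir': '/srv/dataset',           # hypothetical uncompressed graph dir
    'out_dir': '/srv/compressed',       # hypothetical compressed graph dir
    'tmp_dir': '/srv/compressed/tmp',
    'graph_name': 'graph',
}

# Same token list as the new MAPS entry: nodes are streamed on stdin.
argv = ['zstdcat', '{in_dir}/{graph_name}.nodes.csv.zst', '|',
        '{java}', 'org.softwareheritage.graph.backend.MapBuilder',
        '{out_dir}/{graph_name}', '{tmp_dir}']
cmd = ' '.join(argv).format(**conf)

# As in do_step(), the command contains a shell pipe, hence shell=True.
subprocess.run(cmd, shell=True, check=True)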
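
The step-range syntax accepted by StepOption can be hard to read from convert() alone. Assuming swh.graph.webgraph is importable as defined in this patch (and that param/ctx may be passed as None since they are only used on error), a few specifications expand as follows.

# Reading aid for StepOption.convert(); not part of the patch.
from swh.graph.webgraph import CompressionStep, StepOption

opt = StepOption()
# single steps, by name or by number
assert opt.convert('mph', None, None) == {CompressionStep.MPH}
assert opt.convert('4', None, None) == {CompressionStep.BFS}
# inclusive ranges; an open endpoint defaults to the first/last step
assert opt.convert('bv-bfs', None, None) == \
    {CompressionStep.BV, CompressionStep.BV_OBL, CompressionStep.BFS}
assert opt.convert('maps-', None, None) == \
    {CompressionStep.MAPS, CompressionStep.CLEAN_TMP}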
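
Finally, a hypothetical end-to-end invocation of the driver over the test dataset shipped with this patch; the output directory and the batch_size override are placeholders, and every conf key may be omitted as documented in compress().

# Hypothetical driver invocation; relies only on the compress() signature
# and the conf keys documented in its docstring.
from pathlib import Path
from swh.graph.webgraph import CompressionStep, compress

compress(graph_name='example',
         in_dir=Path('swh/graph/tests/dataset'),
         out_dir=Path('/tmp/example-compressed'),   # placeholder
         steps=set(CompressionStep),                # i.e. the full pipeline
         conf={'batch_size': '10000000'})           # placeholder override

# Re-running only the map-building step against an existing compressed graph:
compress('example', Path('swh/graph/tests/dataset'),
         Path('/tmp/example-compressed'), steps={CompressionStep.MAPS})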