diff --git a/docs/docker.rst b/docs/docker.rst index c6739b2..c0d6ba5 100644 --- a/docs/docker.rst +++ b/docs/docker.rst @@ -1,79 +1,79 @@ Docker environment ================== Build ----- .. code:: bash $ git clone https://forge.softwareheritage.org/source/swh-graph.git $ cd swh-graph $ docker build --tag swh-graph dockerfiles Run --- Given a graph ``g`` specified by: - ``g.edges.csv.gz``: gzip-compressed csv file with one edge per line, as a "SRC_ID SPACE DST_ID" string, where identifiers are the :ref:`persistent-identifiers` of each node. - ``g.nodes.csv.gz``: sorted list of unique node identifiers appearing in the corresponding ``g.edges.csv.gz`` file. The format is a gzip-compressed csv file with one persistent identifier per line. .. code:: bash $ docker run -ti \ --volume /PATH/TO/GRAPH/:/srv/softwareheritage/graph/data \ --publish 127.0.0.1:5009:5009 \ swh-graph:latest \ bash Where ``/PATH/TO/GRAPH`` is a directory containing the ``g.edges.csv.gz`` and ``g.nodes.csv.gz`` files. By default, when entering the container the current working directory will be ``/srv/softwareheritage/graph``; all relative paths found below are intended to be relative to that dir. Graph compression ~~~~~~~~~~~~~~~~~ To compress the graph: .. code:: bash $ app/scripts/compress_graph.sh --lib lib/ --input data/g Warning: very large graphs may need a bigger batch size parameter for WebGraph internals (you can specify a value when running the compression script using: ``--batch-size 1000000000``). Node identifier mappings ~~~~~~~~~~~~~~~~~~~~~~~~ To dump the mapping files (i.e., various node id <-> other info mapping files, in either ``.csv.gz`` or ad-hoc ``.map`` format): .. code:: bash - $ java -cp lib/swh-graph-jar-with-dependencies.jar \ - org.softwareheritage.graph.backend.Setup \ + $ java -cp lib/swh-graph-*.jar \ + org.softwareheritage.graph.backend.MapBuilder \ data/g.nodes.csv.gz data/compressed/g Graph server ~~~~~~~~~~~~ To start the swh-graph server: .. code:: bash - $ java -cp lib/swh-graph-jar-with-dependencies.jar \ + $ java -cp lib/swh-graph-*.jar \ org.softwareheritage.graph.App data/compressed/g To specify the port on which the server will run, use the `--port` or `-p` flag (default is 5009). diff --git a/java/server/README.md b/java/server/README.md index f9662a7..8618fa9 100644 --- a/java/server/README.md +++ b/java/server/README.md @@ -1,50 +1,50 @@ -Graph service - Server side -=========================== +Graph service - Java backend +============================ Server side Java REST API. Build ----- ```bash $ mvn compile assembly:single ``` Start REST API -------------- ```bash -$ java -cp target/swh-graph-jar-with-dependencies.jar \ +$ java -cp target/swh-graph-*.jar \ org.softwareheritage.graph.App \ ``` Default port is 5009 (use the `--port` option to change port number). If you need timings metadata send back to the client in addition to the result, use the `--timings` flag. Tests ----- Unit tests rely on test data that are already available in the Git repository (under `src/swh/graph/tests/dataset/`). You generally only need to run them using Maven: ```bash $ mvn test ``` In case you want to regenerate the test data: ```bash # Graph compression $ cd src/swh/graph/tests/dataset $ ./generate_graph.sh $ cd ../../../.. $ mvn compile assembly:single # Dump mapping files -$ java -cp target/swh-graph-jar-with-dependencies.jar \ - org.softwareheritage.graph.backend.Setup \ +$ java -cp target/swh-graph-*.jar \ + org.softwareheritage.graph.backend.MapBuilder \ src/swh/graph/tests/dataset/example.nodes.csv.gz \ src/swh/graph/tests/dataset/output/example ``` diff --git a/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java b/java/server/src/main/java/org/softwareheritage/graph/backend/MapBuilder.java similarity index 98% rename from java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java rename to java/server/src/main/java/org/softwareheritage/graph/backend/MapBuilder.java index 51600f3..d09b654 100644 --- a/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java +++ b/java/server/src/main/java/org/softwareheritage/graph/backend/MapBuilder.java @@ -1,209 +1,209 @@ package org.softwareheritage.graph.backend; import java.io.*; import java.nio.charset.StandardCharsets; import java.util.Scanner; import java.util.concurrent.*; import java.util.zip.GZIPInputStream; import it.unimi.dsi.bits.LongArrayBitVector; import it.unimi.dsi.fastutil.Size64; import it.unimi.dsi.fastutil.io.BinIO; import it.unimi.dsi.fastutil.longs.LongBigArrays; import it.unimi.dsi.fastutil.longs.LongBigList; import it.unimi.dsi.fastutil.objects.Object2LongFunction; import it.unimi.dsi.fastutil.objects.ObjectBigArrays; import it.unimi.dsi.io.FastBufferedReader; import it.unimi.dsi.io.LineIterator; import it.unimi.dsi.logging.ProgressLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.softwareheritage.graph.Graph; import org.softwareheritage.graph.Node; import org.softwareheritage.graph.SwhPID; import org.softwareheritage.graph.backend.NodeTypesMap; /** * Create maps needed at runtime by the graph service, in particular: * * - SWH PID → WebGraph long node id * - WebGraph long node id → SWH PID (converse of the former) * - WebGraph long node id → SWH node type (enum) * * @author The Software Heritage developers */ -public class Setup { +public class MapBuilder { final static long SORT_BUFFER_SIZE = Runtime.getRuntime().maxMemory() * 40 / 100; // 40% max_ram - final static Logger logger = LoggerFactory.getLogger(Setup.class); + final static Logger logger = LoggerFactory.getLogger(MapBuilder.class); /** * Main entrypoint. * * @param args command line arguments */ public static void main(String[] args) throws IOException { if (args.length != 3) { logger.error("Usage: NODES_CSV_GZ COMPRESSED_GRAPH_BASE_NAME TEMP_DIR"); System.exit(1); } String nodesPath = args[0]; String graphPath = args[1]; String tmpDir = args[2]; logger.info("starting maps generation..."); precomputeNodeIdMap(nodesPath, graphPath, tmpDir); logger.info("maps generation completed"); } /** * Computes and dumps on disk mapping files. * * @param nodesPath path of the compressed csv nodes file * @param graphPath path of the compressed graph */ // Suppress warning for Object2LongFunction cast @SuppressWarnings("unchecked") static void precomputeNodeIdMap(String nodesPath, String graphPath, String tmpDir) throws IOException { ProgressLogger plPid2Node = new ProgressLogger(logger, 10, TimeUnit.SECONDS); ProgressLogger plNode2Pid = new ProgressLogger(logger, 10, TimeUnit.SECONDS); // first half of PID->node mapping: PID -> WebGraph MPH (long) Object2LongFunction mphMap = null; try { mphMap = (Object2LongFunction) BinIO.loadObject(graphPath + ".mph"); } catch (ClassNotFoundException e) { logger.error("unknown class object in .mph file: " + e); System.exit(2); } long nbIds = (mphMap instanceof Size64) ? ((Size64) mphMap).size64() : mphMap.size(); plPid2Node.expectedUpdates = nbIds; plNode2Pid.expectedUpdates = nbIds; // second half of PID->node mapping: WebGraph MPH (long) -> BFS order (long) long[][] bfsMap = LongBigArrays.newBigArray(nbIds); long loaded = BinIO.loadLongs(graphPath + ".order", bfsMap); if (loaded != nbIds) { logger.error("graph contains " + nbIds + " nodes, but read " + loaded); System.exit(2); } // Create mapping SWH PID -> WebGraph node id, by sequentially reading // nodes, hasing them with MPH, and permuting according to BFS order InputStream nodesStream = new GZIPInputStream(new FileInputStream(nodesPath)); FastBufferedReader buffer = new FastBufferedReader(new InputStreamReader(nodesStream, StandardCharsets.US_ASCII)); LineIterator swhPIDIterator = new LineIterator(buffer); // The WebGraph node id -> SWH PID mapping can be obtained from the // PID->node one by numerically sorting on node id and sequentially // writing obtained PIDs to a binary map. Delegates the sorting job to // /usr/bin/sort via pipes ProcessBuilder processBuilder = new ProcessBuilder(); processBuilder.command("sort", "--numeric-sort", "--key", "2", "--buffer-size", Long.toString(SORT_BUFFER_SIZE), "--temporary-directory", tmpDir); Process sort = processBuilder.start(); BufferedOutputStream sort_stdin = new BufferedOutputStream(sort.getOutputStream()); BufferedInputStream sort_stdout = new BufferedInputStream(sort.getInputStream()); // for the binary format of pidToNodeMap, see Python module swh.graph.pid:PidToIntMap // for the binary format of nodeToPidMap, see Python module swh.graph.pid:IntToPidMap try (DataOutputStream pidToNodeMap = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(graphPath + Graph.PID_TO_NODE))); BufferedOutputStream nodeToPidMap = new BufferedOutputStream(new FileOutputStream(graphPath + Graph.NODE_TO_PID))) { // background handler for sort output, it will be fed PID/node // pairs while pidToNodeMap is being filled, and will itself fill // nodeToPidMap as soon as data from sort is ready SortOutputHandler outputHandler = new SortOutputHandler(sort_stdout, nodeToPidMap, plNode2Pid); outputHandler.start(); // Type map from WebGraph node ID to SWH type. Used at runtime by // pure Java graph traversals to efficiently check edge // restrictions. final int log2NbTypes = (int) Math.ceil(Math.log(Node.Type.values().length) / Math.log(2)); final int nbBitsPerNodeType = log2NbTypes; LongArrayBitVector nodeTypesBitVector = LongArrayBitVector.ofLength(nbBitsPerNodeType * nbIds); LongBigList nodeTypesMap = nodeTypesBitVector.asLongBigList(nbBitsPerNodeType); plPid2Node.start("filling pid2node map"); for (long iNode = 0; iNode < nbIds && swhPIDIterator.hasNext(); iNode++) { String strSwhPID = swhPIDIterator.next().toString(); SwhPID swhPID = new SwhPID(strSwhPID); byte[] swhPIDBin = swhPID.toBytes(); long mphId = mphMap.getLong(strSwhPID); long nodeId = LongBigArrays.get(bfsMap, mphId); pidToNodeMap.write(swhPIDBin, 0, swhPIDBin.length); pidToNodeMap.writeLong(nodeId); sort_stdin.write((strSwhPID + "\t" + nodeId + "\n") .getBytes(StandardCharsets.US_ASCII)); nodeTypesMap.set(nodeId, swhPID.getType().ordinal()); plPid2Node.lightUpdate(); } plPid2Node.done(); sort_stdin.close(); // write type map logger.info("storing type map"); BinIO.storeObject(nodeTypesMap, graphPath + Graph.NODE_TO_TYPE); logger.info("type map stored"); // wait for nodeToPidMap filling try { int sortExitCode = sort.waitFor(); if (sortExitCode != 0) { logger.error("sort returned non-zero exit code: " + sortExitCode); System.exit(2); } outputHandler.join(); } catch (InterruptedException e) { logger.error("processing of sort output failed with: " + e); System.exit(2); } } } private static class SortOutputHandler extends Thread { private Scanner input; private OutputStream output; private ProgressLogger pl; SortOutputHandler(InputStream input, OutputStream output, ProgressLogger pl) { this.input = new Scanner(input, StandardCharsets.US_ASCII); this.output = output; this.pl = pl; } public void run() { boolean sortDone = false; logger.info("node2pid: waiting for sort output..."); while (input.hasNextLine()) { if (! sortDone) { sortDone = true; this.pl.start("filling node2pid map"); } String line = input.nextLine(); // format: SWH_PID NODE_ID SwhPID swhPID = new SwhPID(line.split("\\t")[0]); // get PID try { output.write((byte[]) swhPID.toBytes()); } catch (IOException e) { logger.error("writing to node->PID map failed with: " + e); } this.pl.lightUpdate(); } this.pl.done(); } } } diff --git a/java/server/src/main/java/org/softwareheritage/graph/backend/NodeIdMap.java b/java/server/src/main/java/org/softwareheritage/graph/backend/NodeIdMap.java index 0ac98b0..d9c3c3d 100644 --- a/java/server/src/main/java/org/softwareheritage/graph/backend/NodeIdMap.java +++ b/java/server/src/main/java/org/softwareheritage/graph/backend/NodeIdMap.java @@ -1,115 +1,114 @@ package org.softwareheritage.graph.backend; import java.io.IOException; import org.softwareheritage.graph.Graph; import org.softwareheritage.graph.SwhPID; import org.softwareheritage.graph.backend.MapFile; -import org.softwareheritage.graph.backend.Setup; /** * Mapping between internal long node id and external SWH PID. - *

- * Mappings in both directions are pre-computed and dumped on disk in the {@link Setup} class, then - * they are loaded here using mmap(). + * + * Mappings in both directions are pre-computed and dumped on disk in the + * {@link MapBuilder} class, then they are loaded here using mmap(). * * @author The Software Heritage developers - * @see org.softwareheritage.graph.backend.Setup + * @see org.softwareheritage.graph.backend.MapBuilder */ public class NodeIdMap { /** Fixed length of full SWH PID */ public static final int SWH_ID_LENGTH = 50; /** Fixed length of long node id */ public static final int NODE_ID_LENGTH = 20; /** Graph path and basename */ String graphPath; /** Number of ids to map */ long nbIds; /** mmap()-ed PID_TO_NODE file */ MapFile swhToNodeMap; /** mmap()-ed NODE_TO_PID file */ MapFile nodeToSwhMap; /** * Constructor. * * @param graphPath full graph path * @param nbNodes number of nodes in the graph */ public NodeIdMap(String graphPath, long nbNodes) throws IOException { this.graphPath = graphPath; this.nbIds = nbNodes; // +1 are for spaces and end of lines int swhToNodeLineLength = SWH_ID_LENGTH + 1 + NODE_ID_LENGTH + 1; int nodeToSwhLineLength = SWH_ID_LENGTH + 1; this.swhToNodeMap = new MapFile(graphPath + Graph.PID_TO_NODE, swhToNodeLineLength); this.nodeToSwhMap = new MapFile(graphPath + Graph.NODE_TO_PID, nodeToSwhLineLength); } /** * Converts SWH PID to corresponding long node id. * * @param swhPID node represented as a {@link SwhPID} * @return corresponding node as a long id * @see org.softwareheritage.graph.SwhPID */ public long getNodeId(SwhPID swhPID) { // Each line in PID_TO_NODE is formatted as: swhPID nodeId // The file is sorted by swhPID, hence we can binary search on swhPID to get corresponding // nodeId long start = 0; long end = nbIds - 1; while (start <= end) { long lineNumber = (start + end) / 2L; String[] parts = swhToNodeMap.readAtLine(lineNumber).split(" "); if (parts.length != 2) { break; } String currentSwhPID = parts[0]; long currentNodeId = Long.parseLong(parts[1]); int cmp = currentSwhPID.compareTo(swhPID.toString()); if (cmp == 0) { return currentNodeId; } else if (cmp < 0) { start = lineNumber + 1; } else { end = lineNumber - 1; } } throw new IllegalArgumentException("Unknown SWH PID: " + swhPID); } /** * Converts a node long id to corresponding SWH PID. * * @param nodeId node as a long id * @return corresponding node as a {@link SwhPID} * @see org.softwareheritage.graph.SwhPID */ public SwhPID getSwhPID(long nodeId) { // Each line in NODE_TO_PID is formatted as: swhPID // The file is ordered by nodeId, meaning node0's swhPID is at line 0, hence we can read the // nodeId-th line to get corresponding swhPID if (nodeId < 0 || nodeId >= nbIds) { throw new IllegalArgumentException("Node id " + nodeId + " should be between 0 and " + nbIds); } String swhPID = nodeToSwhMap.readAtLine(nodeId); return new SwhPID(swhPID); } /** * Closes the mapping files. */ public void close() throws IOException { swhToNodeMap.close(); nodeToSwhMap.close(); } } diff --git a/java/server/src/main/java/org/softwareheritage/graph/backend/NodeTypesMap.java b/java/server/src/main/java/org/softwareheritage/graph/backend/NodeTypesMap.java index 9bae82b..e7a9b23 100644 --- a/java/server/src/main/java/org/softwareheritage/graph/backend/NodeTypesMap.java +++ b/java/server/src/main/java/org/softwareheritage/graph/backend/NodeTypesMap.java @@ -1,53 +1,55 @@ package org.softwareheritage.graph.backend; import java.io.IOException; import it.unimi.dsi.fastutil.io.BinIO; import it.unimi.dsi.fastutil.longs.LongBigList; import org.softwareheritage.graph.Graph; import org.softwareheritage.graph.Node; /** * Mapping between long node id and SWH node type as described in the data model. - *

- * The type mapping is pre-computed and dumped on disk in the {@link Setup} class, then it is loaded - * in-memory here using fastutil LongBigList. To be - * space-efficient, the mapping is stored as a bitmap using minimum number of bits per {@link - * Node.Type}. + * href="https://docs.softwareheritage.org/devel/swh-model/data-model.html">data + * model. + * + * The type mapping is pre-computed and dumped on disk in the {@link * + * MapBuilder} class, then it is loaded in-memory here using fastutil LongBigList. To be + * space-efficient, the mapping is stored as a bitmap using minimum number of + * bits per {@link Node.Type}. * * @author The Software Heritage developers */ public class NodeTypesMap { /** * Array storing for each node its type */ public LongBigList nodeTypesMap; /** * Constructor. * * @param graphPath path and basename of the compressed graph */ public NodeTypesMap(String graphPath) throws IOException { try { nodeTypesMap = (LongBigList) BinIO.loadObject(graphPath + Graph.NODE_TO_TYPE); } catch (ClassNotFoundException e) { throw new IllegalArgumentException("Unknown class object: " + e); } } /** * Returns node type from a node long id. * * @param nodeId node as a long id * @return corresponding {@link Node.Type} value * @see org.softwareheritage.graph.Node.Type */ public Node.Type getType(long nodeId) { long type = nodeTypesMap.getLong(nodeId); return Node.Type.fromInt((int) type); } } diff --git a/swh/graph/webgraph.py b/swh/graph/webgraph.py index e393139..731c194 100644 --- a/swh/graph/webgraph.py +++ b/swh/graph/webgraph.py @@ -1,265 +1,265 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """WebGraph driver """ import logging import os import subprocess from enum import Enum from datetime import datetime from pathlib import Path from typing import Dict, List, Set import psutil from click import ParamType from swh.graph.backend import find_graph_jar class CompressionStep(Enum): MPH = 1 BV = 2 BV_OBL = 3 BFS = 4 PERMUTE = 5 PERMUTE_OBL = 6 STATS = 7 TRANSPOSE = 8 TRANSPOSE_OBL = 9 MAPS = 10 CLEAN_TMP = 11 def __str__(self): return self.name # full compression pipeline COMP_SEQ = list(CompressionStep) # Mapping from compression steps to shell commands implementing them. Commands # will be executed by the shell, so be careful with meta characters. They are # specified here as lists of tokens that will be joined together only for ease # of line splitting. In commands, {tokens} will be interpolated with # configuration values, see :func:`compress`. STEP_ARGV = { CompressionStep.MPH: ['zcat', '{in_dir}/{graph_name}.nodes.csv.gz', '|', '{java}', 'it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction', '--temp-dir', '{tmp_dir}', '{out_dir}/{graph_name}.mph'], CompressionStep.BV: ['zcat', '{in_dir}/{graph_name}.edges.csv.gz', '|', '{java}', 'it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph', '--temp-dir', '{tmp_dir}', '--function', '{out_dir}/{graph_name}.mph', '{out_dir}/{graph_name}-bv'], CompressionStep.BV_OBL: ['{java}', 'it.unimi.dsi.big.webgraph.BVGraph', '--list', '{out_dir}/{graph_name}-bv'], CompressionStep.BFS: ['{java}', 'it.unimi.dsi.law.big.graph.BFS', '{out_dir}/{graph_name}-bv', '{out_dir}/{graph_name}.order'], CompressionStep.PERMUTE: ['{java}', 'it.unimi.dsi.big.webgraph.Transform', 'mapOffline', '{out_dir}/{graph_name}-bv', '{out_dir}/{graph_name}', '{out_dir}/{graph_name}.order', '{batch_size}', '{tmp_dir}'], CompressionStep.PERMUTE_OBL: ['{java}', 'it.unimi.dsi.big.webgraph.BVGraph', '--list', '{out_dir}/{graph_name}'], CompressionStep.STATS: ['{java}', 'it.unimi.dsi.big.webgraph.Stats', '{out_dir}/{graph_name}'], CompressionStep.TRANSPOSE: ['{java}', 'it.unimi.dsi.big.webgraph.Transform', 'transposeOffline', '{out_dir}/{graph_name}', '{out_dir}/{graph_name}-transposed', '{batch_size}', '{tmp_dir}'], CompressionStep.TRANSPOSE_OBL: ['{java}', 'it.unimi.dsi.big.webgraph.BVGraph', '--list', '{out_dir}/{graph_name}-transposed'], CompressionStep.MAPS: - ['{java}', 'org.softwareheritage.graph.backend.Setup', + ['{java}', 'org.softwareheritage.graph.backend.MapBuilder', '{in_dir}/{graph_name}.nodes.csv.gz', '{out_dir}/{graph_name}', '{tmp_dir}'], CompressionStep.CLEAN_TMP: ['rm', '-rf', '{out_dir}/{graph_name}-bv.graph', '{out_dir}/{graph_name}-bv.obl', '{out_dir}/{graph_name}-bv.offsets', '{tmp_dir}'], } # type: Dict[CompressionStep, List[str]] class StepOption(ParamType): """click type for specifying a compression step on the CLI parse either individual steps, specified as step names or integers, or step ranges """ name = "compression step" def convert(self, value, param, ctx) -> Set[CompressionStep]: steps = set() # type: Set[CompressionStep] specs = value.split(',') for spec in specs: if '-' in spec: # step range (raw_l, raw_r) = spec.split('-', maxsplit=1) if raw_l == '': # no left endpoint raw_l = COMP_SEQ[0].name if raw_r == '': # no right endpoint raw_r = COMP_SEQ[-1].name l_step = self.convert(raw_l, param, ctx) r_step = self.convert(raw_r, param, ctx) if len(l_step) != 1 or len(r_step) != 1: self.fail('invalid step specification: %s, see --help' % value) l_idx = l_step.pop() r_idx = r_step.pop() steps = steps.union(set(map(CompressionStep, range(l_idx.value, r_idx.value + 1)))) else: # singleton step try: steps.add(CompressionStep(int(spec))) # integer step except ValueError: try: steps.add(CompressionStep[spec.upper()]) # step name except KeyError: self.fail('invalid step specification: %s, see --help' % value) return steps def do_step(step, conf): cmd = ' '.join(STEP_ARGV[step]).format(**conf) cmd_env = os.environ.copy() cmd_env['JAVA_TOOL_OPTIONS'] = conf['java_tool_options'] cmd_env['CLASSPATH'] = conf['classpath'] logging.info('running: %s' % cmd) process = subprocess.Popen(cmd, shell=True, env=cmd_env, encoding='utf8', stdout=subprocess.PIPE, stderr=subprocess.STDOUT) with process.stdout as stdout: for line in stdout: logging.info(line.rstrip()) rc = process.wait() if rc != 0: raise RuntimeError('compression step %s returned non-zero ' 'exit code %d' % (step, rc)) else: return rc def check_config(conf, graph_name, in_dir, out_dir): """check compression configuration, propagate defaults, and initialize execution environment """ conf = conf.copy() conf['graph_name'] = graph_name conf['in_dir'] = str(in_dir) conf['out_dir'] = str(out_dir) out_dir.mkdir(parents=True, exist_ok=True) if 'tmp_dir' not in conf: tmp_dir = out_dir / 'tmp' conf['tmp_dir'] = str(tmp_dir) else: tmp_dir = Path(conf['tmp_dir']) tmp_dir.mkdir(parents=True, exist_ok=True) if 'batch_size' not in conf: conf['batch_size'] = '1000000000' # 1 billion if 'logback' not in conf: logback_confpath = tmp_dir / 'logback.xml' with open(logback_confpath, 'w') as conffile: conffile.write(""" %d %r %p [%t] %logger{1} - %m%n """) conf['logback'] = str(logback_confpath) if 'max_ram' not in conf: conf['max_ram'] = str(psutil.virtual_memory().total) if 'java_tool_options' not in conf: assert 'logback' in conf conf['java_tool_options'] = ' '.join([ '-Xmx{max_ram}', '-XX:PretenureSizeThreshold=512M', '-XX:MaxNewSize=4G', '-XX:+UseLargePages', '-XX:+UseTransparentHugePages', '-XX:+UseNUMA', '-XX:+UseTLAB', '-XX:+ResizeTLAB', '-Dlogback.configurationFile={logback}' ]).format(max_ram=conf['max_ram'], logback=conf['logback']) if 'java' not in conf: conf['java'] = 'java' if 'classpath' not in conf: conf['classpath'] = find_graph_jar() return conf def compress(graph_name: str, in_dir: Path, out_dir: Path, steps: Set[CompressionStep] = set(COMP_SEQ), conf: Dict[str, str] = {}): """graph compression pipeline driver from nodes/edges files to compressed on-disk representation Args: graph_name: graph base name, relative to in_dir in_dir: input directory, where the uncompressed graph can be found out_dir: output directory, where the compressed graph will be stored steps: compression steps to run (default: all steps) conf: compression configuration, supporting the following keys (all are optional, so an empty configuration is fine and is the default) - batch_size: batch size for `WebGraph transformations `_; defaults to 1 billion - classpath: java classpath, defaults to swh-graph JAR only - java: command to run java VM, defaults to "java" - java_tool_options: value for JAVA_TOOL_OPTIONS environment variable; defaults to various settings for high memory machines - logback: path to a logback.xml configuration file; if not provided a temporary one will be created and used - max_ram: maximum RAM to use for compression; defaults to available virtual memory - tmp_dir: temporary directory, defaults to the "tmp" subdir of out_dir """ if not steps: steps = set(COMP_SEQ) conf = check_config(conf, graph_name, in_dir, out_dir) logging.info('starting compression') compression_start_time = datetime.now() seq_no = 0 for step in COMP_SEQ: if step not in steps: logging.debug('skipping compression step %s' % step) continue seq_no += 1 logging.info('starting compression step %s (%d/%d)' % (step, seq_no, len(steps))) step_start_time = datetime.now() do_step(step, conf) step_duration = datetime.now() - step_start_time logging.info('completed compression step %s (%d/%d) in %s' % (step, seq_no, len(steps), step_duration)) compression_duration = datetime.now() - compression_start_time logging.info('completed compression in %s' % compression_duration)