diff --git a/swh/graph/webgraph.py b/swh/graph/webgraph.py
index cd88215..7a1f486 100644
--- a/swh/graph/webgraph.py
+++ b/swh/graph/webgraph.py
@@ -1,272 +1,264 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""WebGraph driver
"""
import logging
import os
import subprocess
from enum import Enum
-from contextlib import ExitStack
from datetime import datetime
from pathlib import Path
-from typing import Dict, List, Set, Tuple
+from typing import Dict, List, Set
import psutil
from click import ParamType
from swh.graph.backend import find_graph_jar
class CompressionStep(Enum):
MPH = 1
BV = 2
BV_OBL = 3
BFS = 4
PERMUTE = 5
PERMUTE_OBL = 6
STATS = 7
TRANSPOSE = 8
TRANSPOSE_OBL = 9
MAPS = 10
CLEAN_TMP = 11
def __str__(self):
return self.name
# full compression pipeline
COMP_SEQ = list(CompressionStep)
+# Mapping from compression steps to shell commands implementing them. The
+# commands are executed by the shell, so be careful with meta characters. Each
+# command is specified as a list of tokens joined with spaces before execution;
+# the list form exists purely for readable line wrapping. In commands, {token}
+# placeholders are interpolated with configuration values, see :func:`compress`.
STEP_ARGV = {
CompressionStep.MPH:
- (['{java}', 'it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction',
- '--zipped', '{out_dir}/{graph_name}.mph',
- '--temp-dir', '{tmp_dir}',
- '{in_dir}/{graph_name}.nodes.csv.gz'], {}),
+ ['zcat', '{in_dir}/{graph_name}.nodes.csv.gz', '|',
+ '{java}', 'it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction',
+ '--temp-dir', '{tmp_dir}', '{out_dir}/{graph_name}.mph'],
CompressionStep.BV:
- (['{java}', 'it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph',
- '--function', '{out_dir}/{graph_name}.mph', '--temp-dir', '{tmp_dir}',
- '--zipped', '{out_dir}/{graph_name}-bv'],
- {'stdin': '{in_dir}/{graph_name}.edges.csv.gz'}),
+ ['zcat', '{in_dir}/{graph_name}.edges.csv.gz', '|',
+ '{java}', 'it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph',
+ '--temp-dir', '{tmp_dir}',
+ '--function', '{out_dir}/{graph_name}.mph', '{out_dir}/{graph_name}-bv'],
CompressionStep.BV_OBL:
- (['{java}', 'it.unimi.dsi.big.webgraph.BVGraph',
- '--list', '{out_dir}/{graph_name}-bv'], {}),
+ ['{java}', 'it.unimi.dsi.big.webgraph.BVGraph',
+ '--list', '{out_dir}/{graph_name}-bv'],
CompressionStep.BFS:
- (['{java}', 'it.unimi.dsi.law.big.graph.BFS',
- '{out_dir}/{graph_name}-bv', '{out_dir}/{graph_name}.order'], {}),
+ ['{java}', 'it.unimi.dsi.law.big.graph.BFS',
+ '{out_dir}/{graph_name}-bv', '{out_dir}/{graph_name}.order'],
CompressionStep.PERMUTE:
- (['{java}', 'it.unimi.dsi.big.webgraph.Transform',
- 'mapOffline', '{out_dir}/{graph_name}-bv', '{out_dir}/{graph_name}',
- '{out_dir}/{graph_name}.order', '{batch_size}', '{tmp_dir}'], {}),
+ ['{java}', 'it.unimi.dsi.big.webgraph.Transform',
+ 'mapOffline', '{out_dir}/{graph_name}-bv', '{out_dir}/{graph_name}',
+ '{out_dir}/{graph_name}.order', '{batch_size}', '{tmp_dir}'],
CompressionStep.PERMUTE_OBL:
- (['{java}', 'it.unimi.dsi.big.webgraph.BVGraph',
- '--list', '{out_dir}/{graph_name}'], {}),
+ ['{java}', 'it.unimi.dsi.big.webgraph.BVGraph',
+ '--list', '{out_dir}/{graph_name}'],
CompressionStep.STATS:
- (['{java}', 'it.unimi.dsi.big.webgraph.Stats',
- '{out_dir}/{graph_name}'], {}),
+ ['{java}', 'it.unimi.dsi.big.webgraph.Stats',
+ '{out_dir}/{graph_name}'],
CompressionStep.TRANSPOSE:
- (['{java}', 'it.unimi.dsi.big.webgraph.Transform',
- 'transposeOffline', '{out_dir}/{graph_name}',
- '{out_dir}/{graph_name}-transposed', '{batch_size}', '{tmp_dir}'], {}),
+ ['{java}', 'it.unimi.dsi.big.webgraph.Transform',
+ 'transposeOffline', '{out_dir}/{graph_name}',
+ '{out_dir}/{graph_name}-transposed', '{batch_size}', '{tmp_dir}'],
CompressionStep.TRANSPOSE_OBL:
- (['{java}', 'it.unimi.dsi.big.webgraph.BVGraph',
- '--list', '{out_dir}/{graph_name}-transposed'],
- {}),
+ ['{java}', 'it.unimi.dsi.big.webgraph.BVGraph',
+ '--list', '{out_dir}/{graph_name}-transposed'],
CompressionStep.MAPS:
- (['{java}', 'org.softwareheritage.graph.backend.Setup',
- '{in_dir}/{graph_name}.nodes.csv.gz', '{out_dir}/{graph_name}'],
- {}),
+ ['{java}', 'org.softwareheritage.graph.backend.Setup',
+ '{in_dir}/{graph_name}.nodes.csv.gz', '{out_dir}/{graph_name}'],
CompressionStep.CLEAN_TMP:
- (['rm', '-rf',
- '{out_dir}/{graph_name}-bv.graph',
- '{out_dir}/{graph_name}-bv.obl',
- '{out_dir}/{graph_name}-bv.offsets',
- '{tmp_dir}'],
- {}),
-} # type: Dict[CompressionStep, Tuple[List[str], Dict[str, str]]]
+ ['rm', '-rf',
+ '{out_dir}/{graph_name}-bv.graph',
+ '{out_dir}/{graph_name}-bv.obl',
+ '{out_dir}/{graph_name}-bv.offsets',
+ '{tmp_dir}'],
+} # type: Dict[CompressionStep, List[str]]
class StepOption(ParamType):
"""click type for specifying a compression step on the CLI
parse either individual steps, specified as step names or integers, or step
ranges
"""
name = "compression step"
def convert(self, value, param, ctx) -> Set[CompressionStep]:
steps = set() # type: Set[CompressionStep]
specs = value.split(',')
for spec in specs:
if '-' in spec: # step range
(raw_l, raw_r) = spec.split('-', maxsplit=1)
if raw_l == '': # no left endpoint
raw_l = COMP_SEQ[0].name
if raw_r == '': # no right endpoint
raw_r = COMP_SEQ[-1].name
l_step = self.convert(raw_l, param, ctx)
r_step = self.convert(raw_r, param, ctx)
if len(l_step) != 1 or len(r_step) != 1:
self.fail('invalid step specification: %s, see --help'
% value)
l_idx = l_step.pop()
r_idx = r_step.pop()
steps = steps.union(set(map(CompressionStep,
range(l_idx.value,
r_idx.value + 1))))
else: # singleton step
try:
steps.add(CompressionStep(int(spec))) # integer step
except ValueError:
try:
steps.add(CompressionStep[spec.upper()]) # step name
except KeyError:
self.fail('invalid step specification: %s, see --help'
% value)
return steps
def do_step(step, conf):
- (raw_cmd, raw_kwargs) = STEP_ARGV[step]
- cmd = list(map(lambda s: s.format(**conf), raw_cmd))
- kwargs = {k: v.format(**conf) for (k, v) in raw_kwargs.items()}
+ cmd = ' '.join(STEP_ARGV[step]).format(**conf)
cmd_env = os.environ.copy()
cmd_env['JAVA_TOOL_OPTIONS'] = conf['java_tool_options']
cmd_env['CLASSPATH'] = conf['classpath']
- with ExitStack() as ctxt:
- run_kwargs = {}
- if 'stdin' in kwargs: # redirect standard input
- run_kwargs['stdin'] = ctxt.enter_context(open(kwargs['stdin']))
-
- logging.info('running: ' + ' '.join(cmd))
- # return subprocess.run(cmd, check=True, env=cmd_env,
- # stderr=subprocess.STDOUT, **run_kwargs)
- process = subprocess.Popen(cmd, env=cmd_env, encoding='utf8',
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT, **run_kwargs)
- with process.stdout as stdout:
- for line in stdout:
- logging.info(line.rstrip())
- rc = process.wait()
- if rc != 0:
- raise RuntimeError('compression step %s returned non-zero '
- 'exit code %d' % (step, rc))
- else:
- return rc
+ logging.info('running: %s' % cmd)
+ process = subprocess.Popen(cmd, shell=True,
+ env=cmd_env, encoding='utf8',
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ with process.stdout as stdout:
+ for line in stdout:
+ logging.info(line.rstrip())
+ rc = process.wait()
+ if rc != 0:
+ raise RuntimeError('compression step %s returned non-zero '
+ 'exit code %d' % (step, rc))
+ else:
+ return rc
def check_config(conf, graph_name, in_dir, out_dir):
"""check compression configuration, propagate defaults, and initialize
execution environment
"""
conf = conf.copy()
conf['graph_name'] = graph_name
conf['in_dir'] = str(in_dir)
conf['out_dir'] = str(out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
if 'tmp_dir' not in conf:
tmp_dir = out_dir / 'tmp'
conf['tmp_dir'] = str(tmp_dir)
else:
tmp_dir = Path(conf['tmp_dir'])
tmp_dir.mkdir(parents=True, exist_ok=True)
if 'batch_size' not in conf:
conf['batch_size'] = '1000000000' # 1 billion
if 'logback' not in conf:
logback_confpath = tmp_dir / 'logback.xml'
with open(logback_confpath, 'w') as conffile:
conffile.write("""
%d %r %p [%t] %logger{1} - %m%n
""")
conf['logback'] = str(logback_confpath)
if 'max_ram' not in conf:
conf['max_ram'] = str(psutil.virtual_memory().total)
if 'java_tool_options' not in conf:
assert 'logback' in conf
conf['java_tool_options'] = ' '.join([
'-Xmx{max_ram}', '-XX:PretenureSizeThreshold=512M',
'-XX:MaxNewSize=4G', '-XX:+UseLargePages',
'-XX:+UseTransparentHugePages', '-XX:+UseNUMA', '-XX:+UseTLAB',
'-XX:+ResizeTLAB', '-Dlogback.configurationFile={logback}'
]).format(max_ram=conf['max_ram'], logback=conf['logback'])
if 'java' not in conf:
conf['java'] = 'java'
if 'classpath' not in conf:
conf['classpath'] = find_graph_jar()
return conf
def compress(graph_name: str, in_dir: Path, out_dir: Path,
steps: Set[CompressionStep] = set(COMP_SEQ),
conf: Dict[str, str] = {}):
"""graph compression pipeline driver from nodes/edges files to compressed
on-disk representation
Args:
graph_name: graph base name, relative to in_dir
in_dir: input directory, where the uncompressed graph can be found
out_dir: output directory, where the compressed graph will be stored
steps: compression steps to run (default: all steps)
conf: compression configuration, supporting the following keys (all are
optional, so an empty configuration is fine and is the default)
- batch_size: batch size for `WebGraph transformations
`_;
defaults to 1 billion
- classpath: java classpath, defaults to swh-graph JAR only
- java: command to run java VM, defaults to "java"
- java_tool_options: value for JAVA_TOOL_OPTIONS environment
variable; defaults to various settings for high memory machines
- logback: path to a logback.xml configuration file; if not provided
a temporary one will be created and used
- max_ram: maximum RAM to use for compression; defaults to available
virtual memory
- tmp_dir: temporary directory, defaults to the "tmp" subdir of
out_dir
"""
if not steps:
steps = set(COMP_SEQ)
conf = check_config(conf, graph_name, in_dir, out_dir)
logging.info('starting compression')
compression_start_time = datetime.now()
seq_no = 0
for step in COMP_SEQ:
if step not in steps:
logging.debug('skipping compression step %s' % step)
continue
seq_no += 1
logging.info('starting compression step %s (%d/%d)'
% (step, seq_no, len(steps)))
step_start_time = datetime.now()
do_step(step, conf)
step_duration = datetime.now() - step_start_time
logging.info('completed compression step %s (%d/%d) in %s'
% (step, seq_no, len(steps), step_duration))
compression_duration = datetime.now() - compression_start_time
logging.info('completed compression in %s' % compression_duration)