diff --git a/swh/graph/cli.py b/swh/graph/cli.py
index 91ae173..74fe13e 100644
--- a/swh/graph/cli.py
+++ b/swh/graph/cli.py
@@ -1,200 +1,200 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import aiohttp
import click
import sys
from pathlib import Path
from typing import Any, Dict, Tuple
from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup
from swh.graph import client, webgraph
from swh.graph.pid import PidToIntMap, IntToPidMap
from swh.graph.server.app import make_app
from swh.graph.backend import Backend
class PathlibPath(click.Path):
"""A Click path argument that returns a pathlib Path, not a string"""
def convert(self, value, param, ctx):
return Path(super().convert(value, param, ctx))
DEFAULT_CONFIG = {
'graph': ('dict', {})
} # type: Dict[str, Tuple[str, Any]]
@click.group(name='graph', context_settings=CONTEXT_SETTINGS,
cls=AliasedGroup)
@click.option('--config-file', '-C', default=None,
type=click.Path(exists=True, dir_okay=False,),
help='YAML configuration file')
@click.pass_context
def cli(ctx, config_file):
"""Software Heritage graph tools."""
ctx.ensure_object(dict)
conf = config.read(config_file, DEFAULT_CONFIG)
if 'graph' not in conf:
raise ValueError('no "graph" stanza found in configuration file %s'
% config_file)
ctx.obj['config'] = conf
@cli.command('api-client')
@click.option('--host', default='localhost', help='Graph server host')
@click.option('--port', default='5009', help='Graph server port')
@click.pass_context
def api_client(ctx, host, port):
"""client for the graph REST service"""
url = 'http://{}:{}'.format(host, port)
app = client.RemoteGraphClient(url)
# TODO: run web app
print(app.stats())
@cli.group('map')
@click.pass_context
def map(ctx):
"""Manage swh-graph on-disk maps"""
pass
def dump_pid2int(filename):
for (pid, int) in PidToIntMap(filename):
print('{}\t{}'.format(pid, int))
def dump_int2pid(filename):
for (int, pid) in IntToPidMap(filename):
print('{}\t{}'.format(int, pid))
def restore_pid2int(filename):
"""read a textual PID->int map from stdin and write its binary version to
filename
"""
with open(filename, 'wb') as dst:
for line in sys.stdin:
(str_pid, str_int) = line.split()
PidToIntMap.write_record(dst, str_pid, int(str_int))
def restore_int2pid(filename, length):
"""read a textual int->PID map from stdin and write its binary version to
filename
"""
int2pid = IntToPidMap(filename, mode='wb', length=length)
for line in sys.stdin:
(str_int, str_pid) = line.split()
int2pid[int(str_int)] = str_pid
int2pid.close()
@map.command('dump')
@click.option('--type', '-t', 'map_type', required=True,
type=click.Choice(['pid2int', 'int2pid']),
help='type of map to dump')
@click.argument('filename', required=True, type=click.Path(exists=True))
@click.pass_context
def dump_map(ctx, map_type, filename):
"""dump a binary PID<->int map to textual format"""
if map_type == 'pid2int':
dump_pid2int(filename)
elif map_type == 'int2pid':
dump_int2pid(filename)
else:
raise ValueError('invalid map type: ' + map_type)
@map.command('restore')
@click.option('--type', '-t', 'map_type', required=True,
type=click.Choice(['pid2int', 'int2pid']),
              help='type of map to restore')
@click.option('--length', '-l', type=int,
help='''map size in number of logical records
(required for int2pid maps)''')
@click.argument('filename', required=True, type=click.Path())
@click.pass_context
def restore_map(ctx, map_type, length, filename):
"""restore a binary PID<->int map from textual format"""
if map_type == 'pid2int':
restore_pid2int(filename)
elif map_type == 'int2pid':
if length is None:
raise click.UsageError(
'map length is required when restoring {} maps'.format(
map_type), ctx)
restore_int2pid(filename, length)
else:
raise ValueError('invalid map type: ' + map_type)
@cli.command(name='rpc-serve')
@click.option('--host', '-h', default='0.0.0.0',
metavar='IP', show_default=True,
help='host IP address to bind the server on')
@click.option('--port', '-p', default=5009, type=click.INT,
metavar='PORT', show_default=True,
help='port to bind the server on')
@click.option('--graph', '-g', required=True, metavar='GRAPH',
help='compressed graph basename')
@click.pass_context
def serve(ctx, host, port, graph):
"""run the graph REST service"""
backend = Backend(graph_path=graph)
app = make_app(backend=backend)
with backend:
aiohttp.web.run_app(app, host=host, port=port)
@cli.command()
@click.option('--graph', '-g', required=True, metavar='GRAPH',
type=PathlibPath(),
help='input graph basename')
@click.option('--outdir', '-o', 'out_dir', required=True, metavar='DIR',
type=PathlibPath(),
help='directory where to store compressed graph')
@click.option('--steps', '-s', metavar='STEPS', type=webgraph.StepOption(),
help='run only these compression steps (default: all steps)')
@click.pass_context
def compress(ctx, graph, out_dir, steps):
"""Compress a graph using WebGraph
Input: a pair of files g.nodes.csv.gz, g.edges.csv.gz
Output: a directory containing a WebGraph compressed graph
Compression steps are: (1) mph, (2) bv, (3) bv_obl, (4) bfs, (5) permute,
(6) permute_obl, (7) stats, (8) transpose, (9) transpose_obl, (10) maps,
(11) clean_tmp. Compression steps can be selected by name or number using
--steps, separating them with commas; step ranges (e.g., 3-9, 6-, etc.) are
also supported.
"""
graph_name = graph.name
in_dir = graph.parent
try:
conf = ctx.obj['config']['graph']['compress']
except KeyError:
conf = {} # use defaults
- webgraph.compress(conf, graph_name, in_dir, out_dir, steps)
+ webgraph.compress(graph_name, in_dir, out_dir, steps, conf)
def main():
return cli(auto_envvar_prefix='SWH_GRAPH')
if __name__ == '__main__':
main()
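
The call-site change above reorders the arguments to webgraph.compress so that the configuration becomes a trailing, optional parameter. A minimal sketch of the equivalent programmatic call, assuming an installed swh.graph package and hypothetical input paths:

    from pathlib import Path

    from swh.graph import webgraph
    from swh.graph.webgraph import CompressionStep

    # Hypothetical layout: the pipeline expects
    # /srv/graph/example.nodes.csv.gz and /srv/graph/example.edges.csv.gz.
    in_dir = Path('/srv/graph')
    out_dir = Path('/srv/graph/compressed')

    # New argument order introduced by this patch: graph name and directories
    # first, then the optional step selection and configuration.
    webgraph.compress('example', in_dir, out_dir,
                      steps={CompressionStep.MPH, CompressionStep.BV},
                      conf={'batch_size': '1000000000'})

Passing an empty conf (the new default) falls back to the values documented in the compress docstring below.
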
diff --git a/swh/graph/webgraph.py b/swh/graph/webgraph.py
index 7db9d7b..cd88215 100644
--- a/swh/graph/webgraph.py
+++ b/swh/graph/webgraph.py
@@ -1,249 +1,272 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""WebGraph driver
"""
import logging
import os
import subprocess
from enum import Enum
from contextlib import ExitStack
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Set, Tuple
import psutil
from click import ParamType
from swh.graph.backend import find_graph_jar
class CompressionStep(Enum):
MPH = 1
BV = 2
BV_OBL = 3
BFS = 4
PERMUTE = 5
PERMUTE_OBL = 6
STATS = 7
TRANSPOSE = 8
TRANSPOSE_OBL = 9
MAPS = 10
CLEAN_TMP = 11
def __str__(self):
return self.name
# full compression pipeline
COMP_SEQ = list(CompressionStep)
STEP_ARGV = {
CompressionStep.MPH:
(['{java}', 'it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction',
'--zipped', '{out_dir}/{graph_name}.mph',
'--temp-dir', '{tmp_dir}',
'{in_dir}/{graph_name}.nodes.csv.gz'], {}),
CompressionStep.BV:
(['{java}', 'it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph',
'--function', '{out_dir}/{graph_name}.mph', '--temp-dir', '{tmp_dir}',
'--zipped', '{out_dir}/{graph_name}-bv'],
{'stdin': '{in_dir}/{graph_name}.edges.csv.gz'}),
CompressionStep.BV_OBL:
(['{java}', 'it.unimi.dsi.big.webgraph.BVGraph',
'--list', '{out_dir}/{graph_name}-bv'], {}),
CompressionStep.BFS:
(['{java}', 'it.unimi.dsi.law.big.graph.BFS',
'{out_dir}/{graph_name}-bv', '{out_dir}/{graph_name}.order'], {}),
CompressionStep.PERMUTE:
(['{java}', 'it.unimi.dsi.big.webgraph.Transform',
'mapOffline', '{out_dir}/{graph_name}-bv', '{out_dir}/{graph_name}',
'{out_dir}/{graph_name}.order', '{batch_size}', '{tmp_dir}'], {}),
CompressionStep.PERMUTE_OBL:
(['{java}', 'it.unimi.dsi.big.webgraph.BVGraph',
'--list', '{out_dir}/{graph_name}'], {}),
CompressionStep.STATS:
(['{java}', 'it.unimi.dsi.big.webgraph.Stats',
'{out_dir}/{graph_name}'], {}),
CompressionStep.TRANSPOSE:
(['{java}', 'it.unimi.dsi.big.webgraph.Transform',
'transposeOffline', '{out_dir}/{graph_name}',
'{out_dir}/{graph_name}-transposed', '{batch_size}', '{tmp_dir}'], {}),
CompressionStep.TRANSPOSE_OBL:
(['{java}', 'it.unimi.dsi.big.webgraph.BVGraph',
'--list', '{out_dir}/{graph_name}-transposed'],
{}),
CompressionStep.MAPS:
(['{java}', 'org.softwareheritage.graph.backend.Setup',
'{in_dir}/{graph_name}.nodes.csv.gz', '{out_dir}/{graph_name}'],
{}),
CompressionStep.CLEAN_TMP:
(['rm', '-rf',
'{out_dir}/{graph_name}-bv.graph',
'{out_dir}/{graph_name}-bv.obl',
'{out_dir}/{graph_name}-bv.offsets',
'{tmp_dir}'],
{}),
} # type: Dict[CompressionStep, Tuple[List[str], Dict[str, str]]]
class StepOption(ParamType):
"""click type for specifying a compression step on the CLI
parse either individual steps, specified as step names or integers, or step
ranges
"""
name = "compression step"
def convert(self, value, param, ctx) -> Set[CompressionStep]:
steps = set() # type: Set[CompressionStep]
specs = value.split(',')
for spec in specs:
if '-' in spec: # step range
(raw_l, raw_r) = spec.split('-', maxsplit=1)
if raw_l == '': # no left endpoint
raw_l = COMP_SEQ[0].name
if raw_r == '': # no right endpoint
raw_r = COMP_SEQ[-1].name
l_step = self.convert(raw_l, param, ctx)
r_step = self.convert(raw_r, param, ctx)
if len(l_step) != 1 or len(r_step) != 1:
self.fail('invalid step specification: %s, see --help'
% value)
l_idx = l_step.pop()
r_idx = r_step.pop()
steps = steps.union(set(map(CompressionStep,
range(l_idx.value,
r_idx.value + 1))))
else: # singleton step
try:
steps.add(CompressionStep(int(spec))) # integer step
except ValueError:
try:
steps.add(CompressionStep[spec.upper()]) # step name
except KeyError:
self.fail('invalid step specification: %s, see --help'
% value)
return steps
def do_step(step, conf):
(raw_cmd, raw_kwargs) = STEP_ARGV[step]
cmd = list(map(lambda s: s.format(**conf), raw_cmd))
kwargs = {k: v.format(**conf) for (k, v) in raw_kwargs.items()}
cmd_env = os.environ.copy()
cmd_env['JAVA_TOOL_OPTIONS'] = conf['java_tool_options']
cmd_env['CLASSPATH'] = conf['classpath']
with ExitStack() as ctxt:
run_kwargs = {}
if 'stdin' in kwargs: # redirect standard input
run_kwargs['stdin'] = ctxt.enter_context(open(kwargs['stdin']))
logging.info('running: ' + ' '.join(cmd))
# return subprocess.run(cmd, check=True, env=cmd_env,
# stderr=subprocess.STDOUT, **run_kwargs)
process = subprocess.Popen(cmd, env=cmd_env, encoding='utf8',
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, **run_kwargs)
with process.stdout as stdout:
for line in stdout:
logging.info(line.rstrip())
rc = process.wait()
if rc != 0:
raise RuntimeError('compression step %s returned non-zero '
'exit code %d' % (step, rc))
else:
return rc
def check_config(conf, graph_name, in_dir, out_dir):
"""check compression configuration, propagate defaults, and initialize
execution environment
"""
conf = conf.copy()
conf['graph_name'] = graph_name
conf['in_dir'] = str(in_dir)
conf['out_dir'] = str(out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
if 'tmp_dir' not in conf:
tmp_dir = out_dir / 'tmp'
conf['tmp_dir'] = str(tmp_dir)
else:
tmp_dir = Path(conf['tmp_dir'])
tmp_dir.mkdir(parents=True, exist_ok=True)
if 'batch_size' not in conf:
conf['batch_size'] = '1000000000' # 1 billion
if 'logback' not in conf:
logback_confpath = tmp_dir / 'logback.xml'
with open(logback_confpath, 'w') as conffile:
conffile.write("""
<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d %r %p [%t] %logger{1} - %m%n</pattern>
    </encoder>
  </appender>
  <root level="info">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>
""")
conf['logback'] = str(logback_confpath)
if 'max_ram' not in conf:
conf['max_ram'] = str(psutil.virtual_memory().total)
if 'java_tool_options' not in conf:
assert 'logback' in conf
conf['java_tool_options'] = ' '.join([
'-Xmx{max_ram}', '-XX:PretenureSizeThreshold=512M',
'-XX:MaxNewSize=4G', '-XX:+UseLargePages',
'-XX:+UseTransparentHugePages', '-XX:+UseNUMA', '-XX:+UseTLAB',
'-XX:+ResizeTLAB', '-Dlogback.configurationFile={logback}'
]).format(max_ram=conf['max_ram'], logback=conf['logback'])
if 'java' not in conf:
conf['java'] = 'java'
if 'classpath' not in conf:
conf['classpath'] = find_graph_jar()
return conf
-def compress(conf: Dict[str, str], graph_name: str,
- in_dir: Path, out_dir: Path, steps: Set[CompressionStep]):
+def compress(graph_name: str, in_dir: Path, out_dir: Path,
+ steps: Set[CompressionStep] = set(COMP_SEQ),
+ conf: Dict[str, str] = {}):
"""graph compression pipeline driver from nodes/edges files to compressed
on-disk representation
+ Args:
+ graph_name: graph base name, relative to in_dir
+ in_dir: input directory, where the uncompressed graph can be found
+ out_dir: output directory, where the compressed graph will be stored
+ steps: compression steps to run (default: all steps)
+ conf: compression configuration, supporting the following keys (all are
+ optional, so an empty configuration is fine and is the default)
+
+        - batch_size: batch size for WebGraph transformations;
+          defaults to 1 billion
+ - classpath: java classpath, defaults to swh-graph JAR only
+ - java: command to run java VM, defaults to "java"
+ - java_tool_options: value for JAVA_TOOL_OPTIONS environment
+ variable; defaults to various settings for high memory machines
+ - logback: path to a logback.xml configuration file; if not provided
+ a temporary one will be created and used
+ - max_ram: maximum RAM to use for compression; defaults to available
+ virtual memory
+ - tmp_dir: temporary directory, defaults to the "tmp" subdir of
+ out_dir
+
"""
if not steps:
steps = set(COMP_SEQ)
conf = check_config(conf, graph_name, in_dir, out_dir)
logging.info('starting compression')
compression_start_time = datetime.now()
seq_no = 0
for step in COMP_SEQ:
if step not in steps:
logging.debug('skipping compression step %s' % step)
continue
seq_no += 1
logging.info('starting compression step %s (%d/%d)'
% (step, seq_no, len(steps)))
step_start_time = datetime.now()
do_step(step, conf)
step_duration = datetime.now() - step_start_time
logging.info('completed compression step %s (%d/%d) in %s'
% (step, seq_no, len(steps), step_duration))
compression_duration = datetime.now() - compression_start_time
logging.info('completed compression in %s' % compression_duration)
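
The --steps option of the compress command is parsed by the StepOption type above. The sketch below, assuming swh.graph is importable, shows how step names, numbers and open-ended ranges expand into a set of CompressionStep values:

    from swh.graph.webgraph import CompressionStep, StepOption

    # param and ctx are only used when reporting an invalid specification, so
    # None is fine for a well-formed one.
    steps = StepOption().convert('mph-bfs,9-,maps', None, None)

    # 'mph-bfs' expands to MPH, BV, BV_OBL, BFS; '9-' to TRANSPOSE_OBL, MAPS,
    # CLEAN_TMP; 'maps' adds MAPS (already present).
    assert CompressionStep.BFS in steps
    assert CompressionStep.CLEAN_TMP in steps
    assert CompressionStep.PERMUTE not in steps
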