diff --git a/requirements.txt b/requirements.txt
index 3f9470a..f496aaf 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,5 @@
 aiohttp
 click
 vcversioner
 py4j
+psutil
diff --git a/swh/graph/backend.py b/swh/graph/backend.py
index 0b2ba6c..ca2b59d 100644
--- a/swh/graph/backend.py
+++ b/swh/graph/backend.py
@@ -1,199 +1,205 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import asyncio
 import contextlib
 import io
 import os
 import pathlib
 import struct
 import subprocess
 import sys
 import tempfile

 from py4j.java_gateway import JavaGateway

 from swh.graph.pid import IntToPidMap, PidToIntMap
 from swh.model.identifiers import PID_TYPES

 BUF_SIZE = 64*1024
 BIN_FMT = '>q'  # 64 bit integer, big endian
 PATH_SEPARATOR_ID = -1
 NODE2PID_EXT = 'node2pid.bin'
 PID2NODE_EXT = 'pid2node.bin'


 def find_graph_jar():
+    """find swh-graph.jar, containing the Java part of swh-graph
+
+    look both in development directories and installed data (for
+    in-production deployments that fetched the JAR from PyPI)
+
+    """
     swh_graph_root = pathlib.Path(__file__).parents[2]
     try_paths = [
         swh_graph_root / 'java/server/target/',
         pathlib.Path(sys.prefix) / 'share/swh-graph/',
     ]
     for path in try_paths:
         glob = list(path.glob('swh-graph-*.jar'))
         if glob:
             return str(glob[0])
     raise RuntimeError("swh-graph-*.jar not found. Have you run `make java`?")


 def _get_pipe_stderr():
     # Get stderr if possible, or pipe to stdout if running with Jupyter.
     try:
         sys.stderr.fileno()
     except io.UnsupportedOperation:
         return subprocess.STDOUT
     else:
         return sys.stderr


 class Backend:
     def __init__(self, graph_path):
         self.gateway = None
         self.entry = None
         self.graph_path = graph_path

     def __enter__(self):
         # TODO: make all of that configurable with sane defaults
         java_opts = [
             '-Xmx200G',
             '-server',
             '-XX:PretenureSizeThreshold=512M',
             '-XX:MaxNewSize=4G',
             '-XX:+UseLargePages',
             '-XX:+UseTransparentHugePages',
             '-XX:+UseNUMA',
             '-XX:+UseTLAB',
             '-XX:+ResizeTLAB',
         ]
         self.gateway = JavaGateway.launch_gateway(
             java_path=None,
             javaopts=java_opts,
             classpath=find_graph_jar(),
             die_on_exit=True,
             redirect_stdout=sys.stdout,
             redirect_stderr=_get_pipe_stderr(),
         )
         self.entry = self.gateway.jvm.org.softwareheritage.graph.Entry()
         self.entry.load_graph(self.graph_path)
         self.node2pid = IntToPidMap(self.graph_path + '.' + NODE2PID_EXT)
         self.pid2node = PidToIntMap(self.graph_path + '.' + PID2NODE_EXT)
         self.stream_proxy = JavaStreamProxy(self.entry)
         return self

     def __exit__(self, exc_type, exc_value, tb):
         self.gateway.shutdown()

     def stats(self):
         return self.entry.stats()

     def count(self, ttype, direction, edges_fmt, src):
         method = getattr(self.entry, 'count_' + ttype)
         return method(direction, edges_fmt, src)

     async def simple_traversal(self, ttype, direction, edges_fmt, src):
         assert ttype in ('leaves', 'neighbors', 'visit_nodes')
         method = getattr(self.stream_proxy, ttype)
         async for node_id in method(direction, edges_fmt, src):
             yield node_id

     async def walk(self, direction, edges_fmt, algo, src, dst):
         if dst in PID_TYPES:
             it = self.stream_proxy.walk_type(direction, edges_fmt, algo,
                                              src, dst)
         else:
             it = self.stream_proxy.walk(direction, edges_fmt, algo, src, dst)
         async for node_id in it:
             yield node_id

     async def visit_paths(self, direction, edges_fmt, src):
         path = []
         async for node in self.stream_proxy.visit_paths(
                 direction, edges_fmt, src):
             if node == PATH_SEPARATOR_ID:
                 yield path
                 path = []
             else:
                 path.append(node)
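
As an aside, here is a minimal sketch (not part of the patch) of the separator framing that visit_paths() decodes: the Java side streams node IDs and emits PATH_SEPARATOR_ID after each complete path. The sample stream below is made up.

    # Illustration only: decode a flat node-ID stream into paths.
    PATH_SEPARATOR_ID = -1

    def decode_paths(stream):
        path = []
        for node in stream:
            if node == PATH_SEPARATOR_ID:  # end of the current path
                yield path
                path = []
            else:
                path.append(node)

    # two paths: [1, 2, 3] and [1, 4]
    assert list(decode_paths([1, 2, 3, -1, 1, 4, -1])) == [[1, 2, 3], [1, 4]]
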


 class JavaStreamProxy:
     """A proxy class for the org.softwareheritage.graph.Entry Java class
     that takes care of the setup and teardown of the named-pipe FIFO
     communication between Python and Java.

     Initialize JavaStreamProxy using:

         proxy = JavaStreamProxy(swh_entry_class_instance)

     Then you can call an Entry method and iterate on the FIFO results like
     this:

         async for value in proxy.java_method(arg1, arg2):
             print(value)

     """

     def __init__(self, entry):
         self.entry = entry

     async def read_node_ids(self, fname):
         loop = asyncio.get_event_loop()
         open_thread = loop.run_in_executor(None, open, fname, 'rb')

         # Since the open() call on the FIFO blocks until it is also opened
         # on the Java side, we await it with a timeout in case there is an
         # exception that prevents the write-side open().
         with (await asyncio.wait_for(open_thread, timeout=2)) as f:
             while True:
                 data = await loop.run_in_executor(None, f.read, BUF_SIZE)
                 if not data:
                     break
                 for (node_id,) in struct.iter_unpack(BIN_FMT, data):
                     yield node_id

     class _HandlerWrapper:
         def __init__(self, handler):
             self._handler = handler

         def __getattr__(self, name):
             func = getattr(self._handler, name)

             async def java_call(*args, **kwargs):
                 loop = asyncio.get_event_loop()
                 await loop.run_in_executor(
                     None, lambda: func(*args, **kwargs))

             def java_task(*args, **kwargs):
                 return asyncio.create_task(java_call(*args, **kwargs))

             return java_task

     @contextlib.contextmanager
     def get_handler(self):
         with tempfile.TemporaryDirectory(prefix='swh-graph-') as tmpdirname:
             cli_fifo = os.path.join(tmpdirname, 'swh-graph.fifo')
             os.mkfifo(cli_fifo)
             reader = self.read_node_ids(cli_fifo)
             query_handler = self.entry.get_handler(cli_fifo)
             handler = self._HandlerWrapper(query_handler)
             yield (handler, reader)

     def __getattr__(self, name):
         async def java_call_iterator(*args, **kwargs):
             with self.get_handler() as (handler, reader):
                 java_task = getattr(handler, name)(*args, **kwargs)
                 try:
                     async for value in reader:
                         yield value
                 except asyncio.TimeoutError:
                     # If the read-side open() times out, an exception on
                     # the Java side probably happened that prevented the
                     # write-side open(). We propagate this exception here
                     # if that is the case.
                     task_exc = java_task.exception()
                     if task_exc:
                         raise task_exc
                     raise
                 await java_task

         return java_call_iterator
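
For reference, a sketch (not part of the patch) of the FIFO wire format that read_node_ids() decodes: the Java side writes each node ID as a big-endian signed 64-bit integer, matching BIN_FMT. Sample values are made up.

    # Illustration only: pack and unpack the FIFO framing with struct.
    import struct

    BIN_FMT = '>q'  # 64 bit integer, big endian

    buf = b''.join(struct.pack(BIN_FMT, n) for n in (42, 7, -1))
    assert [v for (v,) in struct.iter_unpack(BIN_FMT, buf)] == [42, 7, -1]
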
diff --git a/swh/graph/cli.py b/swh/graph/cli.py
index 23b97c4..b11699d 100644
--- a/swh/graph/cli.py
+++ b/swh/graph/cli.py
@@ -1,146 +1,180 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import aiohttp
 import click
 import sys

+from pathlib import Path
+
 from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup
-from swh.graph import client
+from swh.graph import client, webgraph
 from swh.graph.pid import PidToIntMap, IntToPidMap
 from swh.graph.server.app import make_app
 from swh.graph.backend import Backend


+class PathlibPath(click.Path):
+    """A Click path argument that returns a pathlib Path, not a string"""
+    def convert(self, value, param, ctx):
+        return Path(super().convert(value, param, ctx))
+
+
 @click.group(name='graph', context_settings=CONTEXT_SETTINGS,
              cls=AliasedGroup)
 @click.pass_context
 def cli(ctx):
     """Software Heritage graph tools."""
     ctx.ensure_object(dict)


 @cli.command('api-client')
 @click.option('--host', default='localhost', help='Graph server host')
 @click.option('--port', default='5009', help='Graph server port')
 @click.pass_context
 def api_client(ctx, host, port):
-    """Client for the Software Heritage Graph REST service
-
-    """
+    """client for the graph REST service"""
     url = 'http://{}:{}'.format(host, port)
     app = client.RemoteGraphClient(url)

     # TODO: run web app
     print(app.stats())


 @cli.group('map')
 @click.pass_context
 def map(ctx):
     """Manage swh-graph on-disk maps"""
     pass


 def dump_pid2int(filename):
     for (pid, int) in PidToIntMap(filename):
         print('{}\t{}'.format(pid, int))


 def dump_int2pid(filename):
     for (int, pid) in IntToPidMap(filename):
         print('{}\t{}'.format(int, pid))


 def restore_pid2int(filename):
     """read a textual PID->int map from stdin and write its binary version
     to filename

     """
     with open(filename, 'wb') as dst:
         for line in sys.stdin:
             (str_pid, str_int) = line.split()
             PidToIntMap.write_record(dst, str_pid, int(str_int))


 def restore_int2pid(filename, length):
     """read a textual int->PID map from stdin and write its binary version
     to filename

     """
     int2pid = IntToPidMap(filename, mode='wb', length=length)
     for line in sys.stdin:
         (str_int, str_pid) = line.split()
         int2pid[int(str_int)] = str_pid
     int2pid.close()


 @map.command('dump')
 @click.option('--type', '-t', 'map_type', required=True,
               type=click.Choice(['pid2int', 'int2pid']),
               help='type of map to dump')
 @click.argument('filename', required=True, type=click.Path(exists=True))
 @click.pass_context
 def dump_map(ctx, map_type, filename):
     """dump a binary PID<->int map to textual format"""
     if map_type == 'pid2int':
         dump_pid2int(filename)
     elif map_type == 'int2pid':
         dump_int2pid(filename)
     else:
         raise ValueError('invalid map type: ' + map_type)


 @map.command('restore')
 @click.option('--type', '-t', 'map_type', required=True,
               type=click.Choice(['pid2int', 'int2pid']),
               help='type of map to restore')
 @click.option('--length', '-l', type=int,
               help='''map size in number of logical records
               (required for int2pid maps)''')
 @click.argument('filename', required=True, type=click.Path())
 @click.pass_context
 def restore_map(ctx, map_type, length, filename):
     """restore a binary PID<->int map from textual format"""
     if map_type == 'pid2int':
         restore_pid2int(filename)
     elif map_type == 'int2pid':
         if length is None:
             raise click.UsageError(
                 'map length is required when restoring {} maps'.format(
                     map_type), ctx)
         restore_int2pid(filename, length)
     else:
         raise ValueError('invalid map type: ' + map_type)
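
A note on the textual format these map commands exchange (sketch only, not part of the patch; the PID is a made-up example): one tab-separated record per line, which the restore functions split back apart.

    # Illustration only: the dump/restore textual record format.
    line = 'swh:1:rev:{}\t{}\n'.format('0' * 40, 42)
    (str_pid, str_int) = line.split()  # what restore_pid2int() does per line
    assert (str_pid, int(str_int)) == ('swh:1:rev:' + '0' * 40, 42)
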


 @cli.command(name='rpc-serve')
-@click.option('--host', default='0.0.0.0',
-              help="Host ip address to bind the server on")
-@click.option('--port', default=5009, type=click.INT,
-              help="Binding port of the server")
-@click.option('--graph', required=True, metavar='GRAPH',
-              help="Path prefix of the graph to load")
+@click.option('--host', '-h', default='0.0.0.0',
+              metavar='IP', show_default=True,
+              help='host IP address to bind the server on')
+@click.option('--port', '-p', default=5009, type=click.INT,
+              metavar='PORT', show_default=True,
+              help='port to bind the server on')
+@click.option('--graph', '-g', required=True, metavar='GRAPH',
+              help='compressed graph basename')
 @click.pass_context
 def serve(ctx, host, port, graph):
-    """Run the Software Heritage Graph REST service
-
-    """
+    """run the graph REST service"""
     backend = Backend(graph_path=graph)
     app = make_app(backend=backend)

     with backend:
         aiohttp.web.run_app(app, host=host, port=port)


+@cli.command()
+@click.option('--graph', '-g', required=True, metavar='GRAPH',
+              type=PathlibPath(),
+              help='input graph basename')
+@click.option('--outdir', '-o', 'out_dir', required=True, metavar='DIR',
+              type=PathlibPath(),
+              help='directory where to store compressed graph')
+@click.option('--steps', '-s', metavar='STEPS', type=webgraph.StepOption(),
+              help='run only these compression steps (default: all steps)')
+@click.pass_context
+def compress(ctx, graph, out_dir, steps):
+    """Compress a graph using WebGraph
+
+    Input: a pair of files g.nodes.csv.gz, g.edges.csv.gz
+
+    Output: a directory containing a WebGraph compressed graph
+
+    Compression steps are: (1) mph, (2) bv, (3) bv_obl, (4) bfs, (5) permute,
+    (6) permute_obl, (7) stats, (8) transpose, (9) transpose_obl. Compression
+    steps can be selected by name or number using --steps, separating them
+    with commas; step ranges (e.g., 3-9, 6-, etc.) are also supported.
+
+    """
+    graph_name = graph.name
+    in_dir = graph.parent
+    conf = {}  # use defaults
+
+    webgraph.compress(conf, graph_name, in_dir, out_dir, steps)
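
For illustration (not part of the patch), this is roughly the Python equivalent of running the new compress command on a hypothetical graph basename; both paths are made up.

    # Illustration only: drive the compression pipeline from Python.
    from pathlib import Path
    from swh.graph import webgraph

    graph = Path('/srv/data/g')  # expects g.nodes.csv.gz and g.edges.csv.gz
    webgraph.compress(conf={}, graph_name=graph.name, in_dir=graph.parent,
                      out_dir=Path('/srv/data/compressed'),
                      steps=set())  # empty set means: run all steps
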
+ + """ + graph_name = graph.name + in_dir = graph.parent + conf = {} # use defaults + + webgraph.compress(conf, graph_name, in_dir, out_dir, steps) + + def main(): return cli(auto_envvar_prefix='SWH_GRAPH') if __name__ == '__main__': main() diff --git a/swh/graph/webgraph.py b/swh/graph/webgraph.py new file mode 100644 index 0000000..442c0ce --- /dev/null +++ b/swh/graph/webgraph.py @@ -0,0 +1,244 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""WebGraph driver + +""" + +import logging +import os +import subprocess + +from enum import Enum +from contextlib import ExitStack +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Set, Tuple + +import psutil + +from click import ParamType + +from swh.graph.backend import find_graph_jar + + +class CompressionStep(Enum): + MPH = 1 + BV = 2 + BV_OBL = 3 + BFS = 4 + PERMUTE = 5 + PERMUTE_OBL = 6 + STATS = 7 + TRANSPOSE = 8 + TRANSPOSE_OBL = 9 + CLEAN_TMP = 10 + + def __str__(self): + return self.name + + +# full compression pipeline +COMP_SEQ = list(CompressionStep) + +STEP_ARGV = { + CompressionStep.MPH: + (['{java}', 'it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction', + '--zipped', '{out_dir}/{graph_name}.mph', + '--temp-dir', '{tmp_dir}', + '{in_dir}/{graph_name}.nodes.csv.gz'], {}), + CompressionStep.BV: + (['{java}', 'it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph', + '--function', '{out_dir}/{graph_name}.mph', '--temp-dir', '{tmp_dir}', + '--zipped', '{out_dir}/{graph_name}-bv'], + {'stdin': '{in_dir}/{graph_name}.edges.csv.gz'}), + CompressionStep.BV_OBL: + (['{java}', 'it.unimi.dsi.big.webgraph.BVGraph', + '--list', '{out_dir}/{graph_name}-bv'], {}), + CompressionStep.BFS: + (['{java}', 'it.unimi.dsi.law.big.graph.BFS', + '{out_dir}/{graph_name}-bv', '{out_dir}/{graph_name}.order'], {}), + CompressionStep.PERMUTE: + (['{java}', 'it.unimi.dsi.big.webgraph.Transform', + 'mapOffline', '{out_dir}/{graph_name}-bv', '{out_dir}/{graph_name}', + '{out_dir}/{graph_name}.order', '{batch_size}', '{tmp_dir}'], {}), + CompressionStep.PERMUTE_OBL: + (['{java}', 'it.unimi.dsi.big.webgraph.BVGraph', + '--list', '{out_dir}/{graph_name}'], {}), + CompressionStep.STATS: + (['{java}', 'it.unimi.dsi.big.webgraph.Stats', + '{out_dir}/{graph_name}'], {}), + CompressionStep.TRANSPOSE: + (['{java}', 'it.unimi.dsi.big.webgraph.Transform', + 'transposeOffline', '{out_dir}/{graph_name}', + '{out_dir}/{graph_name}-transposed', '{batch_size}', '{tmp_dir}'], {}), + CompressionStep.TRANSPOSE_OBL: + (['{java}', 'it.unimi.dsi.big.webgraph.BVGraph', + '--list', '{out_dir}/{graph_name}-transposed'], + {}), + CompressionStep.CLEAN_TMP: + (['rm', '-rf', + '{out_dir}/{graph_name}-bv.graph', + '{out_dir}/{graph_name}-bv.obl', + '{out_dir}/{graph_name}-bv.offsets', + '{tmp_dir}'], + {}), +} # type: Dict[CompressionStep, Tuple[List[str], Dict[str, str]]] + + +class StepOption(ParamType): + """click type for specifying a compression step on the CLI + + parse either individual steps, specified as step names or integers, or step + ranges + + """ + name = "compression step" + + def convert(self, value, param, ctx) -> Set[CompressionStep]: + steps = set() # type: Set[CompressionStep] + + specs = value.split(',') + for spec in specs: + if '-' in spec: # step range + (raw_l, raw_r) = spec.split('-', maxsplit=1) + if raw_l == '': # no left 
+
+
+class StepOption(ParamType):
+    """click type for specifying a compression step on the CLI
+
+    parse either individual steps, specified as step names or integers, or
+    step ranges
+
+    """
+    name = "compression step"
+
+    def convert(self, value, param, ctx) -> Set[CompressionStep]:
+        steps = set()  # type: Set[CompressionStep]
+
+        specs = value.split(',')
+        for spec in specs:
+            if '-' in spec:  # step range
+                (raw_l, raw_r) = spec.split('-', maxsplit=1)
+                if raw_l == '':  # no left endpoint
+                    raw_l = COMP_SEQ[0].name
+                if raw_r == '':  # no right endpoint
+                    raw_r = COMP_SEQ[-1].name
+                l_step = self.convert(raw_l, param, ctx)
+                r_step = self.convert(raw_r, param, ctx)
+                if len(l_step) != 1 or len(r_step) != 1:
+                    self.fail('invalid step specification: %s, see --help'
+                              % value)
+                l_idx = l_step.pop()
+                r_idx = r_step.pop()
+                steps = steps.union(set(map(CompressionStep,
+                                            range(l_idx.value,
+                                                  r_idx.value + 1))))
+            else:  # singleton step
+                try:
+                    steps.add(CompressionStep(int(spec)))  # integer step
+                except ValueError:
+                    try:
+                        steps.add(CompressionStep[spec.upper()])  # step name
+                    except KeyError:
+                        self.fail('invalid step specification: %s, '
+                                  'see --help' % value)
+
+        return steps
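
A quick sketch of what StepOption.convert() accepts (illustration only, not part of the patch): step names, step numbers, comma-separated lists, and open or closed ranges; param and ctx are unused on the success path.

    # Illustration only: parse typical --steps values.
    from swh.graph.webgraph import CompressionStep, StepOption

    opt = StepOption()
    assert opt.convert('mph', None, None) == {CompressionStep.MPH}
    assert opt.convert('3-5', None, None) == {CompressionStep.BV_OBL,
                                              CompressionStep.BFS,
                                              CompressionStep.PERMUTE}
    # an open-ended range defaults to the last step
    assert CompressionStep.CLEAN_TMP in opt.convert('8-', None, None)
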
+
+
+def do_step(step, conf):
+    (raw_cmd, raw_kwargs) = STEP_ARGV[step]
+    cmd = list(map(lambda s: s.format(**conf), raw_cmd))
+    kwargs = {k: v.format(**conf) for (k, v) in raw_kwargs.items()}
+
+    cmd_env = os.environ.copy()
+    cmd_env['JAVA_TOOL_OPTIONS'] = conf['java_tool_options']
+    cmd_env['CLASSPATH'] = conf['classpath']
+
+    with ExitStack() as ctxt:
+        run_kwargs = {}
+        if 'stdin' in kwargs:  # redirect standard input
+            run_kwargs['stdin'] = ctxt.enter_context(open(kwargs['stdin']))
+
+        logging.info('running: ' + ' '.join(cmd))
+        process = subprocess.Popen(cmd, env=cmd_env, encoding='utf8',
+                                   stdout=subprocess.PIPE,
+                                   stderr=subprocess.STDOUT, **run_kwargs)
+        with process.stdout as stdout:
+            for line in stdout:
+                logging.info(line.rstrip())
+        rc = process.wait()
+        if rc != 0:
+            raise RuntimeError('compression step %s returned non-zero '
+                               'exit code %d' % (step, rc))
+        else:
+            return rc
+
+
+def check_config(conf, graph_name, in_dir, out_dir):
+    """check compression configuration, propagate defaults, and initialize
+    execution environment
+
+    """
+    conf = conf.copy()
+    conf['graph_name'] = graph_name
+    conf['in_dir'] = str(in_dir)
+    conf['out_dir'] = str(out_dir)
+    out_dir.mkdir(parents=True, exist_ok=True)
+    if 'tmp_dir' not in conf:
+        tmp_dir = out_dir / 'tmp'
+        conf['tmp_dir'] = str(tmp_dir)
+    else:
+        tmp_dir = Path(conf['tmp_dir'])
+    tmp_dir.mkdir(parents=True, exist_ok=True)
+    if 'batch_size' not in conf:
+        conf['batch_size'] = '1000000000'  # 1 billion
+    if 'logback' not in conf:
+        logback_confpath = tmp_dir / 'logback.xml'
+        with open(logback_confpath, 'w') as conffile:
+            conffile.write("""
+<configuration>
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%d %r %p [%t] %logger{1} - %m%n</pattern>
+    </encoder>
+  </appender>
+  <root level="INFO">
+    <appender-ref ref="STDOUT"/>
+  </root>
+</configuration>
+""")
+        conf['logback'] = str(logback_confpath)
+    if 'max_ram' not in conf:
+        conf['max_ram'] = str(psutil.virtual_memory().total)
+    if 'java_tool_options' not in conf:
+        assert 'logback' in conf
+        conf['java_tool_options'] = ' '.join([
+            '-Xmx{max_ram}', '-XX:PretenureSizeThreshold=512M',
+            '-XX:MaxNewSize=4G', '-XX:+UseLargePages',
+            '-XX:+UseTransparentHugePages', '-XX:+UseNUMA', '-XX:+UseTLAB',
+            '-XX:+ResizeTLAB', '-Dlogback.configurationFile={logback}'
+        ]).format(max_ram=conf['max_ram'], logback=conf['logback'])
+    if 'java' not in conf:
+        conf['java'] = 'java'
+    if 'classpath' not in conf:
+        conf['classpath'] = find_graph_jar()
+
+    return conf
+
+
+def compress(conf: Dict[str, str], graph_name: str,
+             in_dir: Path, out_dir: Path, steps: Set[CompressionStep]):
+    """graph compression pipeline driver from nodes/edges files to compressed
+    on-disk representation
+
+    """
+    if not steps:
+        steps = set(COMP_SEQ)
+
+    conf = check_config(conf, graph_name, in_dir, out_dir)
+
+    logging.info('starting compression')
+    compression_start_time = datetime.now()
+    seq_no = 0
+    for step in COMP_SEQ:
+        if step not in steps:
+            logging.debug('skipping compression step %s' % step)
+            continue
+        seq_no += 1
+        logging.info('starting compression step %s (%d/%d)'
+                     % (step, seq_no, len(steps)))
+        step_start_time = datetime.now()
+        do_step(step, conf)
+        step_duration = datetime.now() - step_start_time
+        logging.info('completed compression step %s (%d/%d) in %s'
+                     % (step, seq_no, len(steps), step_duration))
+    compression_duration = datetime.now() - compression_start_time
+    logging.info('completed compression in %s' % compression_duration)
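
Finally, a sketch (not part of the patch, assuming a POSIX system and that swh-graph.jar is locally available so find_graph_jar() succeeds) of the defaults that check_config() fills in; the paths are hypothetical and the directories are created as a side effect.

    # Illustration only: a minimal driver call needs just the paths.
    from pathlib import Path
    from swh.graph.webgraph import check_config

    conf = check_config({}, 'g', Path('/srv/in'), Path('/srv/out'))
    assert conf['batch_size'] == '1000000000'
    assert conf['tmp_dir'] == '/srv/out/tmp'  # created on the fly
    assert '-Dlogback.configurationFile=' in conf['java_tool_options']
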