diff --git a/swh/graph/backend.py b/swh/graph/backend.py
index 3fdcb5c..954eb37 100644
--- a/swh/graph/backend.py
+++ b/swh/graph/backend.py
@@ -1,221 +1,185 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
import contextlib
import io
-import logging
import os
-import pathlib
import struct
import subprocess
import sys
import tempfile
from py4j.java_gateway import JavaGateway
+from swh.graph.config import check_config
from swh.graph.pid import NodeToPidMap, PidToNodeMap
from swh.model.identifiers import PID_TYPES
BUF_SIZE = 64*1024
BIN_FMT = '>q' # 64 bit integer, big endian
PATH_SEPARATOR_ID = -1
NODE2PID_EXT = 'node2pid.bin'
PID2NODE_EXT = 'pid2node.bin'
-def find_graph_jar():
- """find swh-graph.jar, containing the Java part of swh-graph
-
- look both in development directories and installed data (for in-production
- deployments who fecthed the JAR from pypi)
-
- """
- swh_graph_root = pathlib.Path(__file__).parents[2]
- try_paths = [
- swh_graph_root / 'java/target/',
- pathlib.Path(sys.prefix) / 'share/swh-graph/',
- pathlib.Path(sys.prefix) / 'local/share/swh-graph/',
- ]
- for path in try_paths:
- glob = list(path.glob('swh-graph-*.jar'))
- if glob:
- if len(glob) > 1:
- logging.warn('found multiple swh-graph JARs, '
- 'arbitrarily picking one')
- logging.info('using swh-graph JAR: {0}'.format(glob[0]))
- return str(glob[0])
- raise RuntimeError('swh-graph JAR not found. Have you run `make java`?')
-
-
def _get_pipe_stderr():
# Get stderr if possible, or pipe to stdout if running with Jupyter.
try:
sys.stderr.fileno()
except io.UnsupportedOperation:
return subprocess.STDOUT
else:
return sys.stderr
class Backend:
- def __init__(self, graph_path):
+ def __init__(self, graph_path, config=None):
self.gateway = None
self.entry = None
self.graph_path = graph_path
+ self.config = check_config(config or {})
def __enter__(self):
- # TODO: make all of that configurable with sane defaults
- java_opts = [
- '-Xmx200G',
- '-server',
- '-XX:PretenureSizeThreshold=512M',
- '-XX:MaxNewSize=4G',
- '-XX:+UseLargePages',
- '-XX:+UseTransparentHugePages',
- '-XX:+UseNUMA',
- '-XX:+UseTLAB',
- '-XX:+ResizeTLAB',
- ]
self.gateway = JavaGateway.launch_gateway(
java_path=None,
- javaopts=java_opts,
- classpath=find_graph_jar(),
+ javaopts=self.config['java_tool_options'].split(),
+ classpath=self.config['classpath'],
die_on_exit=True,
redirect_stdout=sys.stdout,
redirect_stderr=_get_pipe_stderr(),
)
self.entry = self.gateway.jvm.org.softwareheritage.graph.Entry()
self.entry.load_graph(self.graph_path)
self.node2pid = NodeToPidMap(self.graph_path + '.' + NODE2PID_EXT)
self.pid2node = PidToNodeMap(self.graph_path + '.' + PID2NODE_EXT)
self.stream_proxy = JavaStreamProxy(self.entry)
return self
def __exit__(self, exc_type, exc_value, tb):
self.gateway.shutdown()
def stats(self):
return self.entry.stats()
def count(self, ttype, direction, edges_fmt, src):
method = getattr(self.entry, 'count_' + ttype)
return method(direction, edges_fmt, src)
async def simple_traversal(self, ttype, direction, edges_fmt, src):
assert ttype in ('leaves', 'neighbors', 'visit_nodes')
method = getattr(self.stream_proxy, ttype)
async for node_id in method(direction, edges_fmt, src):
yield node_id
async def walk(self, direction, edges_fmt, algo, src, dst):
if dst in PID_TYPES:
it = self.stream_proxy.walk_type(direction, edges_fmt, algo,
src, dst)
else:
it = self.stream_proxy.walk(direction, edges_fmt, algo,
src, dst)
async for node_id in it:
yield node_id
async def random_walk(self, direction, edges_fmt, retries, src, dst):
if dst in PID_TYPES:
it = self.stream_proxy.random_walk_type(direction, edges_fmt,
retries, src, dst)
else:
it = self.stream_proxy.random_walk(direction, edges_fmt, retries,
src, dst)
async for node_id in it: # TODO return 404 if path is empty
yield node_id
async def visit_paths(self, direction, edges_fmt, src):
path = []
async for node in self.stream_proxy.visit_paths(
direction, edges_fmt, src):
if node == PATH_SEPARATOR_ID:
yield path
path = []
else:
path.append(node)
class JavaStreamProxy:
"""A proxy class for the org.softwareheritage.graph.Entry Java class that
takes care of the setup and teardown of the named-pipe FIFO communication
between Python and Java.
    Initialize JavaStreamProxy using:
        proxy = JavaStreamProxy(swh_entry_class_instance)
    Then you can call an Entry method and iterate on the FIFO results like
    this:
        async for value in proxy.java_method(arg1, arg2):
            print(value)
"""
def __init__(self, entry):
self.entry = entry
async def read_node_ids(self, fname):
loop = asyncio.get_event_loop()
open_thread = loop.run_in_executor(None, open, fname, 'rb')
# Since the open() call on the FIFO is blocking until it is also opened
# on the Java side, we await it with a timeout in case there is an
# exception that prevents the write-side open().
with (await asyncio.wait_for(open_thread, timeout=2)) as f:
while True:
data = await loop.run_in_executor(None, f.read, BUF_SIZE)
if not data:
break
for data in struct.iter_unpack(BIN_FMT, data):
yield data[0]
class _HandlerWrapper:
def __init__(self, handler):
self._handler = handler
def __getattr__(self, name):
func = getattr(self._handler, name)
async def java_call(*args, **kwargs):
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, lambda: func(*args, **kwargs))
def java_task(*args, **kwargs):
return asyncio.create_task(java_call(*args, **kwargs))
return java_task
@contextlib.contextmanager
def get_handler(self):
with tempfile.TemporaryDirectory(prefix='swh-graph-') as tmpdirname:
cli_fifo = os.path.join(tmpdirname, 'swh-graph.fifo')
os.mkfifo(cli_fifo)
reader = self.read_node_ids(cli_fifo)
query_handler = self.entry.get_handler(cli_fifo)
handler = self._HandlerWrapper(query_handler)
yield (handler, reader)
def __getattr__(self, name):
async def java_call_iterator(*args, **kwargs):
with self.get_handler() as (handler, reader):
java_task = getattr(handler, name)(*args, **kwargs)
try:
async for value in reader:
yield value
except asyncio.TimeoutError:
                    # If the read-side open() times out, an exception on the
# Java side probably happened that prevented the
# write-side open(). We propagate this exception here if
# that is the case.
task_exc = java_task.exception()
if task_exc:
raise task_exc
raise
await java_task
return java_call_iterator
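# A minimal usage sketch (hypothetical graph path) for the Backend and
# JavaStreamProxy machinery above; it assumes a compressed graph plus its
# .node2pid.bin / .pid2node.bin maps exist on disk and that the swh-graph JAR
# can be found (see swh.graph.config.find_graph_jar).
import asyncio
from swh.graph.backend import Backend

async def leaf_nodes(backend, node_id):
    # drain the FIFO-backed async stream into a plain list
    return [n async for n in
            backend.simple_traversal('leaves', 'forward', '*', node_id)]

with Backend('/srv/graph/example') as backend:  # hypothetical basename
    print(backend.stats())
    loop = asyncio.get_event_loop()
    print(loop.run_until_complete(leaf_nodes(backend, 0)))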
diff --git a/swh/graph/cli.py b/swh/graph/cli.py
index 60edeb4..374fb8d 100644
--- a/swh/graph/cli.py
+++ b/swh/graph/cli.py
@@ -1,264 +1,264 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import aiohttp
import click
import sys
from pathlib import Path
from typing import Any, Dict, Tuple
import swh.model.exceptions
from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup
from swh.graph import client, webgraph
from swh.graph.backend import NODE2PID_EXT, PID2NODE_EXT
from swh.graph.pid import PidToNodeMap, NodeToPidMap
from swh.graph.server.app import make_app
from swh.graph.backend import Backend
from swh.model.identifiers import parse_persistent_identifier
class PathlibPath(click.Path):
"""A Click path argument that returns a pathlib Path, not a string"""
def convert(self, value, param, ctx):
return Path(super().convert(value, param, ctx))
DEFAULT_CONFIG = {
'graph': ('dict', {})
} # type: Dict[str, Tuple[str, Any]]
@click.group(name='graph', context_settings=CONTEXT_SETTINGS,
cls=AliasedGroup)
@click.option('--config-file', '-C', default=None,
type=click.Path(exists=True, dir_okay=False,),
help='YAML configuration file')
@click.pass_context
def cli(ctx, config_file):
"""Software Heritage graph tools."""
ctx.ensure_object(dict)
conf = config.read(config_file, DEFAULT_CONFIG)
if 'graph' not in conf:
raise ValueError('no "graph" stanza found in configuration file %s'
% config_file)
ctx.obj['config'] = conf
@cli.command('api-client')
@click.option('--host', default='localhost', help='Graph server host')
@click.option('--port', default='5009', help='Graph server port')
@click.pass_context
def api_client(ctx, host, port):
"""client for the graph REST service"""
url = 'http://{}:{}'.format(host, port)
app = client.RemoteGraphClient(url)
# TODO: run web app
print(app.stats())
@cli.group('map')
@click.pass_context
def map(ctx):
"""Manage swh-graph on-disk maps"""
pass
def dump_pid2node(filename):
for (pid, int) in PidToNodeMap(filename):
print('{}\t{}'.format(pid, int))
def dump_node2pid(filename):
for (int, pid) in NodeToPidMap(filename):
print('{}\t{}'.format(int, pid))
def restore_pid2node(filename):
"""read a textual PID->int map from stdin and write its binary version to
filename
"""
with open(filename, 'wb') as dst:
for line in sys.stdin:
(str_pid, str_int) = line.split()
PidToNodeMap.write_record(dst, str_pid, int(str_int))
def restore_node2pid(filename, length):
"""read a textual int->PID map from stdin and write its binary version to
filename
"""
node2pid = NodeToPidMap(filename, mode='wb', length=length)
for line in sys.stdin:
(str_int, str_pid) = line.split()
node2pid[int(str_int)] = str_pid
node2pid.close()
@map.command('dump')
@click.option('--type', '-t', 'map_type', required=True,
type=click.Choice(['pid2node', 'node2pid']),
help='type of map to dump')
@click.argument('filename', required=True, type=click.Path(exists=True))
@click.pass_context
def dump_map(ctx, map_type, filename):
"""Dump a binary PID<->node map to textual format."""
if map_type == 'pid2node':
dump_pid2node(filename)
elif map_type == 'node2pid':
dump_node2pid(filename)
else:
raise ValueError('invalid map type: ' + map_type)
pass
@map.command('restore')
@click.option('--type', '-t', 'map_type', required=True,
type=click.Choice(['pid2node', 'node2pid']),
              help='type of map to restore')
@click.option('--length', '-l', type=int,
help='''map size in number of logical records
(required for node2pid maps)''')
@click.argument('filename', required=True, type=click.Path())
@click.pass_context
def restore_map(ctx, map_type, length, filename):
"""Restore a binary PID<->node map from textual format."""
if map_type == 'pid2node':
restore_pid2node(filename)
elif map_type == 'node2pid':
if length is None:
raise click.UsageError(
'map length is required when restoring {} maps'.format(
map_type), ctx)
restore_node2pid(filename, length)
else:
raise ValueError('invalid map type: ' + map_type)
@map.command('write')
@click.option('--type', '-t', 'map_type', required=True,
type=click.Choice(['pid2node', 'node2pid']),
help='type of map to write')
@click.argument('filename', required=True, type=click.Path())
@click.pass_context
def write(ctx, map_type, filename):
"""Write a map to disk sequentially.
    Read from stdin a textual PID->node mapping (for pid2node) or a simple
    sequence of PIDs (for node2pid) and write it to disk in the requested
    binary map format.
    Note that no sorting is applied, so the input should already be sorted as
    required by the chosen map type (by PID for pid2node, by int for node2pid).
"""
with open(filename, 'wb') as f:
if map_type == 'pid2node':
for line in sys.stdin:
(pid, int_str) = line.rstrip().split(maxsplit=1)
PidToNodeMap.write_record(f, pid, int(int_str))
elif map_type == 'node2pid':
for line in sys.stdin:
pid = line.rstrip()
NodeToPidMap.write_record(f, pid)
else:
raise ValueError('invalid map type: ' + map_type)
@map.command('lookup')
@click.option('--graph', '-g', required=True, metavar='GRAPH',
help='compressed graph basename')
@click.argument('identifier', required=True)
def map_lookup(graph, identifier):
"""Lookup an identifier using on-disk maps.
    Depending on the identifier type, either look up a PID in the PID->node map
    (and return the node integer identifier) or, vice-versa, look up a node
    integer identifier in the node->PID map (and return the PID). The desired
    behavior is chosen based on the syntax of the given identifier.
"""
is_pid = None
try:
int(identifier)
is_pid = False
except ValueError:
try:
parse_persistent_identifier(identifier)
is_pid = True
except swh.model.exceptions.ValidationError:
raise ValueError(f'invalid identifier: {identifier}')
if is_pid:
print(PidToNodeMap(f'{graph}.{PID2NODE_EXT}')[identifier])
else:
print(NodeToPidMap(f'{graph}.{NODE2PID_EXT}')[int(identifier)])
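# A sketch of the identifier dispatch performed by map_lookup above: plain
# integers are treated as node ids, anything else must parse as a persistent
# identifier (PID). The example PID below is hypothetical (all-zero hash).
from swh.model.identifiers import parse_persistent_identifier

def looks_like_pid(identifier: str) -> bool:
    try:
        int(identifier)
        return False  # plain integer -> node id, resolved via node2pid
    except ValueError:
        parse_persistent_identifier(identifier)  # raises ValidationError if malformed
        return True   # well-formed PID -> resolved via pid2node

assert looks_like_pid('42') is False
assert looks_like_pid('swh:1:rev:' + '0' * 40) is True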
@cli.command(name='rpc-serve')
@click.option('--host', '-h', default='0.0.0.0',
metavar='IP', show_default=True,
help='host IP address to bind the server on')
@click.option('--port', '-p', default=5009, type=click.INT,
metavar='PORT', show_default=True,
help='port to bind the server on')
@click.option('--graph', '-g', required=True, metavar='GRAPH',
help='compressed graph basename')
@click.pass_context
def serve(ctx, host, port, graph):
"""run the graph REST service"""
- backend = Backend(graph_path=graph)
+ backend = Backend(graph_path=graph, config=ctx.obj['config'])
app = make_app(backend=backend)
with backend:
aiohttp.web.run_app(app, host=host, port=port)
@cli.command()
@click.option('--graph', '-g', required=True, metavar='GRAPH',
type=PathlibPath(),
help='input graph basename')
@click.option('--outdir', '-o', 'out_dir', required=True, metavar='DIR',
type=PathlibPath(),
help='directory where to store compressed graph')
@click.option('--steps', '-s', metavar='STEPS', type=webgraph.StepOption(),
help='run only these compression steps (default: all steps)')
@click.pass_context
def compress(ctx, graph, out_dir, steps):
"""Compress a graph using WebGraph
    Input: a pair of files g.nodes.csv.zst, g.edges.csv.zst
Output: a directory containing a WebGraph compressed graph
Compression steps are: (1) mph, (2) bv, (3) bv_obl, (4) bfs, (5) permute,
(6) permute_obl, (7) stats, (8) transpose, (9) transpose_obl, (10) maps,
(11) clean_tmp. Compression steps can be selected by name or number using
--steps, separating them with commas; step ranges (e.g., 3-9, 6-, etc.) are
also supported.
"""
graph_name = graph.name
in_dir = graph.parent
try:
conf = ctx.obj['config']['graph']['compress']
except KeyError:
conf = {} # use defaults
webgraph.compress(graph_name, in_dir, out_dir, steps, conf)
def main():
return cli(auto_envvar_prefix='SWH_GRAPH')
if __name__ == '__main__':
main()
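# For reference, a sketch (illustrative values only) of the configuration dict
# the CLI builds from the YAML file: cli() stores it in ctx.obj['config'],
# serve() passes the whole dict to Backend, and compress() reads
# conf['graph']['compress'].
example_config = {
    'graph': {
        'compress': {
            'batch_size': 1000,  # small batches suffice for tiny test graphs
        },
    },
}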
diff --git a/swh/graph/config.py b/swh/graph/config.py
new file mode 100644
index 0000000..acb83e5
--- /dev/null
+++ b/swh/graph/config.py
@@ -0,0 +1,104 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+import psutil
+import sys
+from pathlib import Path
+
+
+def find_graph_jar():
+ """find swh-graph.jar, containing the Java part of swh-graph
+
+ look both in development directories and installed data (for in-production
+    deployments that fetched the JAR from pypi)
+
+ """
+ swh_graph_root = Path(__file__).parents[2]
+ try_paths = [
+ swh_graph_root / 'java/target/',
+ Path(sys.prefix) / 'share/swh-graph/',
+ Path(sys.prefix) / 'local/share/swh-graph/',
+ ]
+ for path in try_paths:
+ glob = list(path.glob('swh-graph-*.jar'))
+ if glob:
+ if len(glob) > 1:
+                logging.warning('found multiple swh-graph JARs, '
+                                'arbitrarily picking one')
+ logging.info('using swh-graph JAR: {0}'.format(glob[0]))
+ return str(glob[0])
+ raise RuntimeError('swh-graph JAR not found. Have you run `make java`?')
+
+
+def check_config(conf):
+ """check configuration and propagate defaults
+ """
+ conf = conf.copy()
+ if 'batch_size' not in conf:
+ conf['batch_size'] = '1000000000' # 1 billion
+ if 'max_ram' not in conf:
+ conf['max_ram'] = str(psutil.virtual_memory().total)
+ if 'java_tool_options' not in conf:
+ conf['java_tool_options'] = ' '.join([
+ '-Xmx{max_ram}',
+ '-XX:PretenureSizeThreshold=512M',
+ '-XX:MaxNewSize=4G',
+ '-XX:+UseLargePages',
+ '-XX:+UseTransparentHugePages',
+ '-XX:+UseNUMA',
+ '-XX:+UseTLAB',
+ '-XX:+ResizeTLAB',
+ ])
+ conf['java_tool_options'] = conf['java_tool_options'].format(
+ max_ram=conf['max_ram'])
+ if 'java' not in conf:
+ conf['java'] = 'java'
+ if 'classpath' not in conf:
+ conf['classpath'] = find_graph_jar()
+
+ return conf
+
+
+def check_config_compress(config, graph_name, in_dir, out_dir):
+ """check compression-specific configuration and initialize its execution
+ environment.
+ """
+ conf = check_config(config)
+
+ conf['graph_name'] = graph_name
+ conf['in_dir'] = str(in_dir)
+ conf['out_dir'] = str(out_dir)
+ out_dir.mkdir(parents=True, exist_ok=True)
+ if 'tmp_dir' not in conf:
+ tmp_dir = out_dir / 'tmp'
+ conf['tmp_dir'] = str(tmp_dir)
+ else:
+ tmp_dir = Path(conf['tmp_dir'])
+ tmp_dir.mkdir(parents=True, exist_ok=True)
+
+ if 'logback' not in conf:
+ logback_confpath = tmp_dir / 'logback.xml'
+ with open(logback_confpath, 'w') as conffile:
+ conffile.write("""
+<configuration>
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%d %r %p [%t] %logger{1} - %m%n</pattern>
+    </encoder>
+  </appender>
+  <root level="debug">
+    <appender-ref ref="STDOUT"/>
+  </root>
+</configuration>
+""")
+ conf['logback'] = str(logback_confpath)
+
+ conf['java_tool_options'] += ' -Dlogback.configurationFile={logback}'
+ conf['java_tool_options'] = conf['java_tool_options'].format(
+ logback=conf['logback'])
+
+ print(conf)
+ return conf
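# A quick sketch of the default propagation performed by check_config() above.
# max_ram is given explicitly so the resulting -Xmx flag is predictable; the
# classpath default still requires the swh-graph JAR to be findable, otherwise
# find_graph_jar() raises RuntimeError.
from swh.graph.config import check_config

conf = check_config({'max_ram': '4G'})
assert conf['batch_size'] == '1000000000'
assert conf['java'] == 'java'
assert conf['java_tool_options'].startswith('-Xmx4G')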
diff --git a/swh/graph/tests/test_cli.py b/swh/graph/tests/test_cli.py
index 93d52d0..19ba7ec 100644
--- a/swh/graph/tests/test_cli.py
+++ b/swh/graph/tests/test_cli.py
@@ -1,67 +1,66 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory, NamedTemporaryFile
from typing import Dict
from click.testing import CliRunner
from swh.core import config
from swh.graph import cli
def read_properties(properties_fname) -> Dict[str, str]:
"""read a Java .properties file"""
properties = {}
with open(properties_fname) as f:
for line in f:
if line.startswith('#'):
continue
(key, value) = line.rstrip().split('=', maxsplit=1)
properties[key] = value
return properties
class TestCompress(unittest.TestCase):
DATA_DIR = Path(__file__).parents[0] / 'dataset'
def setUp(self):
self.runner = CliRunner()
tmpconf = NamedTemporaryFile(mode='w', delete=False,
prefix='swh-graph-test', suffix='.yml')
        # bare-bones configuration, to allow testing the compression pipeline
# with minimum RAM requirements on trivial graphs
tmpconf.write("""
graph:
compress:
batch_size: 1000
- java_tool_options: -Dlogback.configurationFile={logback}
""")
tmpconf.close()
self.conffile = Path(tmpconf.name)
self.config = config.read(self.conffile, cli.DEFAULT_CONFIG)
def tearDown(self):
if self.conffile.exists():
self.conffile.unlink()
def test_pipeline(self):
"""run full compression pipeline"""
with TemporaryDirectory(suffix='.swh-graph-test') as tmpdir:
result = self.runner.invoke(
cli.compress,
['--graph', self.DATA_DIR / 'example', '--outdir', tmpdir],
obj={'config': self.config})
self.assertEqual(result.exit_code, 0)
properties = read_properties(Path(tmpdir) / 'example.properties')
self.assertEqual(int(properties['nodes']), 21)
self.assertEqual(int(properties['arcs']), 23)
diff --git a/swh/graph/webgraph.py b/swh/graph/webgraph.py
index fb91be6..b50dea8 100644
--- a/swh/graph/webgraph.py
+++ b/swh/graph/webgraph.py
@@ -1,272 +1,215 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""WebGraph driver
"""
import logging
import os
import subprocess
from enum import Enum
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Set
-
-import psutil
-
from click import ParamType
-from swh.graph.backend import find_graph_jar
+from swh.graph.config import check_config_compress
class CompressionStep(Enum):
MPH = 1
BV = 2
BV_OBL = 3
BFS = 4
PERMUTE = 5
PERMUTE_OBL = 6
STATS = 7
TRANSPOSE = 8
TRANSPOSE_OBL = 9
MAPS = 10
CLEAN_TMP = 11
def __str__(self):
return self.name
# full compression pipeline
COMP_SEQ = list(CompressionStep)
# Mapping from compression steps to shell commands implementing them. Commands
# will be executed by the shell, so be careful with meta characters. They are
# specified here as lists of tokens that will be joined together only for ease
# of line splitting. In commands, {tokens} will be interpolated with
# configuration values, see :func:`compress`.
STEP_ARGV = {
CompressionStep.MPH:
['{java}', 'it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction',
'--temp-dir', '{tmp_dir}', '{out_dir}/{graph_name}.mph',
'<( zstdcat {in_dir}/{graph_name}.nodes.csv.zst )'],
# use process substitution (and hence FIFO) above as MPH class load the
# entire file in memory when reading from stdin
CompressionStep.BV:
['zstdcat', '{in_dir}/{graph_name}.edges.csv.zst', '|',
'{java}', 'it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph',
'--temp-dir', '{tmp_dir}',
'--function', '{out_dir}/{graph_name}.mph', '{out_dir}/{graph_name}-bv'],
CompressionStep.BV_OBL:
['{java}', 'it.unimi.dsi.big.webgraph.BVGraph',
'--list', '{out_dir}/{graph_name}-bv'],
CompressionStep.BFS:
['{java}', 'it.unimi.dsi.law.big.graph.BFS',
'{out_dir}/{graph_name}-bv', '{out_dir}/{graph_name}.order'],
CompressionStep.PERMUTE:
['{java}', 'it.unimi.dsi.big.webgraph.Transform',
'mapOffline', '{out_dir}/{graph_name}-bv', '{out_dir}/{graph_name}',
'{out_dir}/{graph_name}.order', '{batch_size}', '{tmp_dir}'],
CompressionStep.PERMUTE_OBL:
['{java}', 'it.unimi.dsi.big.webgraph.BVGraph',
'--list', '{out_dir}/{graph_name}'],
CompressionStep.STATS:
['{java}', 'it.unimi.dsi.big.webgraph.Stats',
'{out_dir}/{graph_name}'],
CompressionStep.TRANSPOSE:
['{java}', 'it.unimi.dsi.big.webgraph.Transform',
'transposeOffline', '{out_dir}/{graph_name}',
'{out_dir}/{graph_name}-transposed', '{batch_size}', '{tmp_dir}'],
CompressionStep.TRANSPOSE_OBL:
['{java}', 'it.unimi.dsi.big.webgraph.BVGraph',
'--list', '{out_dir}/{graph_name}-transposed'],
CompressionStep.MAPS:
['zstdcat', '{in_dir}/{graph_name}.nodes.csv.zst', '|',
'{java}', 'org.softwareheritage.graph.backend.MapBuilder',
'{out_dir}/{graph_name}', '{tmp_dir}'],
CompressionStep.CLEAN_TMP:
['rm', '-rf',
'{out_dir}/{graph_name}-bv.graph',
'{out_dir}/{graph_name}-bv.obl',
'{out_dir}/{graph_name}-bv.offsets',
'{tmp_dir}'],
} # type: Dict[CompressionStep, List[str]]
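# Illustration of the {token} interpolation described above, with purely
# hypothetical paths; the resulting string is the shell command that
# do_step() below hands to /bin/bash.
mph_cmd = ' '.join(STEP_ARGV[CompressionStep.MPH]).format(
    java='java', tmp_dir='/srv/tmp', in_dir='/srv/in',
    out_dir='/srv/out', graph_name='example')
# -> java it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction --temp-dir /srv/tmp
#    /srv/out/example.mph <( zstdcat /srv/in/example.nodes.csv.zst )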
class StepOption(ParamType):
"""click type for specifying a compression step on the CLI
parse either individual steps, specified as step names or integers, or step
ranges
"""
name = "compression step"
def convert(self, value, param, ctx) -> Set[CompressionStep]:
steps = set() # type: Set[CompressionStep]
specs = value.split(',')
for spec in specs:
if '-' in spec: # step range
(raw_l, raw_r) = spec.split('-', maxsplit=1)
if raw_l == '': # no left endpoint
raw_l = COMP_SEQ[0].name
if raw_r == '': # no right endpoint
raw_r = COMP_SEQ[-1].name
l_step = self.convert(raw_l, param, ctx)
r_step = self.convert(raw_r, param, ctx)
if len(l_step) != 1 or len(r_step) != 1:
self.fail(f'invalid step specification: {value}, '
f'see --help')
l_idx = l_step.pop()
r_idx = r_step.pop()
steps = steps.union(set(map(CompressionStep,
range(l_idx.value,
r_idx.value + 1))))
else: # singleton step
try:
steps.add(CompressionStep(int(spec))) # integer step
except ValueError:
try:
steps.add(CompressionStep[spec.upper()]) # step name
except KeyError:
self.fail(f'invalid step specification: {value}, '
f'see --help')
return steps
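# A few examples of the step specifications accepted above (illustrative;
# outside of click, convert() can be called with param=None and ctx=None).
assert StepOption().convert('mph', None, None) == {CompressionStep.MPH}
assert StepOption().convert('3-5', None, None) == {
    CompressionStep.BV_OBL, CompressionStep.BFS, CompressionStep.PERMUTE}
assert StepOption().convert('transpose,10-', None, None) == {
    CompressionStep.TRANSPOSE, CompressionStep.MAPS, CompressionStep.CLEAN_TMP}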
def do_step(step, conf):
cmd = ' '.join(STEP_ARGV[step]).format(**conf)
cmd_env = os.environ.copy()
cmd_env['JAVA_TOOL_OPTIONS'] = conf['java_tool_options']
cmd_env['CLASSPATH'] = conf['classpath']
logging.info(f'running: {cmd}')
process = subprocess.Popen(['/bin/bash', '-c', cmd],
env=cmd_env, encoding='utf8',
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
with process.stdout as stdout:
for line in stdout:
logging.info(line.rstrip())
rc = process.wait()
if rc != 0:
raise RuntimeError(f'compression step {step} returned non-zero '
f'exit code {rc}')
else:
return rc
-def check_config(conf, graph_name, in_dir, out_dir):
- """check compression configuration, propagate defaults, and initialize
- execution environment
-
- """
- conf = conf.copy()
- conf['graph_name'] = graph_name
- conf['in_dir'] = str(in_dir)
- conf['out_dir'] = str(out_dir)
- out_dir.mkdir(parents=True, exist_ok=True)
- if 'tmp_dir' not in conf:
- tmp_dir = out_dir / 'tmp'
- conf['tmp_dir'] = str(tmp_dir)
- else:
- tmp_dir = Path(conf['tmp_dir'])
- tmp_dir.mkdir(parents=True, exist_ok=True)
- if 'batch_size' not in conf:
- conf['batch_size'] = '1000000000' # 1 billion
- if 'logback' not in conf:
- logback_confpath = tmp_dir / 'logback.xml'
- with open(logback_confpath, 'w') as conffile:
- conffile.write("""
-<configuration>
-  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-    <encoder>
-      <pattern>%d %r %p [%t] %logger{1} - %m%n</pattern>
-    </encoder>
-  </appender>
-  <root level="debug">
-    <appender-ref ref="STDOUT"/>
-  </root>
-</configuration>
-""")
- conf['logback'] = str(logback_confpath)
- if 'max_ram' not in conf:
- conf['max_ram'] = str(psutil.virtual_memory().total)
- if 'java_tool_options' not in conf:
- assert 'logback' in conf
- conf['java_tool_options'] = ' '.join([
- '-Xmx{max_ram}', '-XX:PretenureSizeThreshold=512M',
- '-XX:MaxNewSize=4G', '-XX:+UseLargePages',
- '-XX:+UseTransparentHugePages', '-XX:+UseNUMA', '-XX:+UseTLAB',
- '-XX:+ResizeTLAB', '-Dlogback.configurationFile={logback}'
- ])
- conf['java_tool_options'] = conf['java_tool_options'].format(
- max_ram=conf['max_ram'], logback=conf['logback'])
- if 'java' not in conf:
- conf['java'] = 'java'
- if 'classpath' not in conf:
- conf['classpath'] = find_graph_jar()
-
- return conf
-
-
def compress(graph_name: str, in_dir: Path, out_dir: Path,
steps: Set[CompressionStep] = set(COMP_SEQ),
conf: Dict[str, str] = {}):
"""graph compression pipeline driver from nodes/edges files to compressed
on-disk representation
Args:
graph_name: graph base name, relative to in_dir
in_dir: input directory, where the uncompressed graph can be found
out_dir: output directory, where the compressed graph will be stored
steps: compression steps to run (default: all steps)
conf: compression configuration, supporting the following keys (all are
optional, so an empty configuration is fine and is the default)
            - batch_size: batch size for WebGraph transformations; defaults
              to 1 billion
- classpath: java classpath, defaults to swh-graph JAR only
- java: command to run java VM, defaults to "java"
- java_tool_options: value for JAVA_TOOL_OPTIONS environment
variable; defaults to various settings for high memory machines
- logback: path to a logback.xml configuration file; if not provided
a temporary one will be created and used
- max_ram: maximum RAM to use for compression; defaults to available
virtual memory
- tmp_dir: temporary directory, defaults to the "tmp" subdir of
out_dir
"""
if not steps:
steps = set(COMP_SEQ)
- conf = check_config(conf, graph_name, in_dir, out_dir)
+ conf = check_config_compress(conf, graph_name, in_dir, out_dir)
compression_start_time = datetime.now()
logging.info(f'starting compression at {compression_start_time}')
seq_no = 0
for step in COMP_SEQ:
if step not in steps:
logging.debug(f'skipping compression step {step}')
continue
seq_no += 1
step_start_time = datetime.now()
logging.info(f'starting compression step {step} '
f'({seq_no}/{len(steps)}) at {step_start_time}')
do_step(step, conf)
step_end_time = datetime.now()
step_duration = step_end_time - step_start_time
logging.info(f'completed compression step {step} '
f'({seq_no}/{len(steps)}) '
f'at {step_end_time} in {step_duration}')
compression_end_time = datetime.now()
compression_duration = compression_end_time - compression_start_time
logging.info(f'completed compression in {compression_duration}')
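# A minimal sketch of driving the pipeline programmatically; paths are
# hypothetical and the conf dict only overrides batch_size, leaving every
# other key to the defaults documented above.
from pathlib import Path
from swh.graph import webgraph

webgraph.compress('example',
                  Path('/srv/dataset'),          # example.nodes.csv.zst, example.edges.csv.zst
                  Path('/srv/compressed'),
                  steps=set(webgraph.COMP_SEQ),   # run all steps
                  conf={'batch_size': '1000'})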