diff --git a/swh/graph/cli.py b/swh/graph/cli.py index ba52815..770899c 100644 --- a/swh/graph/cli.py +++ b/swh/graph/cli.py @@ -1,378 +1,444 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import aiohttp +# WARNING: do not import unnecessary things here to keep cli startup time under +# control import click import logging -import shutil import sys from pathlib import Path -from typing import Any, Dict, Tuple +from typing import Any, Dict, Tuple, Set, TYPE_CHECKING -import swh.model.exceptions - -from swh.core import config from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup -from swh.graph import client, webgraph -from swh.graph.backend import NODE2PID_EXT, PID2NODE_EXT -from swh.graph.pid import PidToNodeMap, NodeToPidMap -from swh.graph.server.app import make_app -from swh.graph.backend import Backend -from swh.model.identifiers import parse_persistent_identifier + +if TYPE_CHECKING: + from swh.graph.webgraph import CompressionStep # noqa + + +class StepOption(click.ParamType): + """click type for specifying a compression step on the CLI + + parse either individual steps, specified as step names or integers, or step + ranges + + """ + + name = "compression step" + + def convert(self, value, param, ctx): # type: (...) -> Set[CompressionStep] + from swh.graph.webgraph import CompressionStep, COMP_SEQ # noqa + + steps: Set[CompressionStep] = set() + + specs = value.split(",") + for spec in specs: + if "-" in spec: # step range + (raw_l, raw_r) = spec.split("-", maxsplit=1) + if raw_l == "": # no left endpoint + raw_l = COMP_SEQ[0].name + if raw_r == "": # no right endpoint + raw_r = COMP_SEQ[-1].name + l_step = self.convert(raw_l, param, ctx) + r_step = self.convert(raw_r, param, ctx) + if len(l_step) != 1 or len(r_step) != 1: + self.fail(f"invalid step specification: {value}, " f"see --help") + l_idx = l_step.pop() + r_idx = r_step.pop() + steps = steps.union( + set(map(CompressionStep, range(l_idx.value, r_idx.value + 1))) + ) + else: # singleton step + try: + steps.add(CompressionStep(int(spec))) # integer step + except ValueError: + try: + steps.add(CompressionStep[spec.upper()]) # step name + except KeyError: + self.fail( + f"invalid step specification: {value}, " f"see --help" + ) + + return steps class PathlibPath(click.Path): """A Click path argument that returns a pathlib Path, not a string""" def convert(self, value, param, ctx): return Path(super().convert(value, param, ctx)) DEFAULT_CONFIG: Dict[str, Tuple[str, Any]] = {"graph": ("dict", {})} @click.group(name="graph", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="YAML configuration file", ) @click.pass_context def cli(ctx, config_file): """Software Heritage graph tools.""" - ctx.ensure_object(dict) + from swh.core import config + ctx.ensure_object(dict) conf = config.read(config_file, DEFAULT_CONFIG) if "graph" not in conf: raise ValueError( 'no "graph" stanza found in configuration file %s' % config_file ) ctx.obj["config"] = conf @cli.command("api-client") @click.option("--host", default="localhost", help="Graph server host") @click.option("--port", default="5009", help="Graph server port") @click.pass_context def api_client(ctx, host, port): """client for the graph REST service""" + from swh.graph import client + url = "http://{}:{}".format(host, port) app = client.RemoteGraphClient(url) # TODO: run web app print(app.stats()) @cli.group("map") @click.pass_context def map(ctx): """Manage swh-graph on-disk maps""" pass def dump_pid2node(filename): + from swh.graph.pid import PidToNodeMap + for (pid, int) in PidToNodeMap(filename): print("{}\t{}".format(pid, int)) def dump_node2pid(filename): + from swh.graph.pid import NodeToPidMap + for (int, pid) in NodeToPidMap(filename): print("{}\t{}".format(int, pid)) def restore_pid2node(filename): """read a textual PID->int map from stdin and write its binary version to filename """ + from swh.graph.pid import PidToNodeMap + with open(filename, "wb") as dst: for line in sys.stdin: (str_pid, str_int) = line.split() PidToNodeMap.write_record(dst, str_pid, int(str_int)) def restore_node2pid(filename, length): """read a textual int->PID map from stdin and write its binary version to filename """ + from swh.graph.pid import NodeToPidMap + node2pid = NodeToPidMap(filename, mode="wb", length=length) for line in sys.stdin: (str_int, str_pid) = line.split() node2pid[int(str_int)] = str_pid node2pid.close() @map.command("dump") @click.option( "--type", "-t", "map_type", required=True, type=click.Choice(["pid2node", "node2pid"]), help="type of map to dump", ) @click.argument("filename", required=True, type=click.Path(exists=True)) @click.pass_context def dump_map(ctx, map_type, filename): """Dump a binary PID<->node map to textual format.""" if map_type == "pid2node": dump_pid2node(filename) elif map_type == "node2pid": dump_node2pid(filename) else: raise ValueError("invalid map type: " + map_type) pass @map.command("restore") @click.option( "--type", "-t", "map_type", required=True, type=click.Choice(["pid2node", "node2pid"]), help="type of map to dump", ) @click.option( "--length", "-l", type=int, help="""map size in number of logical records (required for node2pid maps)""", ) @click.argument("filename", required=True, type=click.Path()) @click.pass_context def restore_map(ctx, map_type, length, filename): """Restore a binary PID<->node map from textual format.""" if map_type == "pid2node": restore_pid2node(filename) elif map_type == "node2pid": if length is None: raise click.UsageError( "map length is required when restoring {} maps".format(map_type), ctx ) restore_node2pid(filename, length) else: raise ValueError("invalid map type: " + map_type) @map.command("write") @click.option( "--type", "-t", "map_type", required=True, type=click.Choice(["pid2node", "node2pid"]), help="type of map to write", ) @click.argument("filename", required=True, type=click.Path()) @click.pass_context def write(ctx, map_type, filename): """Write a map to disk sequentially. read from stdin a textual PID->node mapping (for pid2node, or a simple sequence of PIDs for node2pid) and write it to disk in the requested binary map format note that no sorting is applied, so the input should already be sorted as required by the chosen map type (by PID for pid2node, by int for node2pid) """ + from swh.graph.pid import PidToNodeMap, NodeToPidMap + with open(filename, "wb") as f: if map_type == "pid2node": for line in sys.stdin: (pid, int_str) = line.rstrip().split(maxsplit=1) PidToNodeMap.write_record(f, pid, int(int_str)) elif map_type == "node2pid": for line in sys.stdin: pid = line.rstrip() NodeToPidMap.write_record(f, pid) else: raise ValueError("invalid map type: " + map_type) @map.command("lookup") @click.option( "--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename" ) @click.argument("identifiers", nargs=-1) def map_lookup(graph, identifiers): """Lookup identifiers using on-disk maps. Depending on the identifier type lookup either a PID into a PID->node (and return the node integer identifier) or, vice-versa, lookup a node integer identifier into a node->PID (and return the PID). The desired behavior is chosen depending on the syntax of each given identifier. Identifiers can be passed either directly on the command line or on standard input, separate by blanks. Logical lines (as returned by readline()) in stdin will be preserved in stdout. """ + import swh.model.exceptions + from swh.graph.backend import NODE2PID_EXT, PID2NODE_EXT + from swh.graph.pid import PidToNodeMap, NodeToPidMap + from swh.model.identifiers import parse_persistent_identifier + success = True # no identifiers failed to be looked up pid2node = PidToNodeMap(f"{graph}.{PID2NODE_EXT}") node2pid = NodeToPidMap(f"{graph}.{NODE2PID_EXT}") def lookup(identifier): nonlocal success, pid2node, node2pid is_pid = None try: int(identifier) is_pid = False except ValueError: try: parse_persistent_identifier(identifier) is_pid = True except swh.model.exceptions.ValidationError: success = False logging.error(f'invalid identifier: "{identifier}", skipping') try: if is_pid: return str(pid2node[identifier]) else: return node2pid[int(identifier)] except KeyError: success = False logging.error(f'identifier not found: "{identifier}", skipping') if identifiers: # lookup identifiers passed via CLI for identifier in identifiers: print(lookup(identifier)) else: # lookup identifiers passed via stdin, preserving logical lines for line in sys.stdin: results = [lookup(id) for id in line.rstrip().split()] if results: # might be empty if all IDs on the same line failed print(" ".join(results)) sys.exit(0 if success else 1) @cli.command(name="rpc-serve") @click.option( "--host", "-h", default="0.0.0.0", metavar="IP", show_default=True, help="host IP address to bind the server on", ) @click.option( "--port", "-p", default=5009, type=click.INT, metavar="PORT", show_default=True, help="port to bind the server on", ) @click.option( "--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename" ) @click.pass_context def serve(ctx, host, port, graph): """run the graph REST service""" + import aiohttp + from swh.graph.server.app import make_app + from swh.graph.backend import Backend + backend = Backend(graph_path=graph, config=ctx.obj["config"]) app = make_app(backend=backend) with backend: aiohttp.web.run_app(app, host=host, port=port) @cli.command() @click.option( "--graph", "-g", required=True, metavar="GRAPH", type=PathlibPath(), help="input graph basename", ) @click.option( "--outdir", "-o", "out_dir", required=True, metavar="DIR", type=PathlibPath(), help="directory where to store compressed graph", ) @click.option( "--steps", "-s", metavar="STEPS", - type=webgraph.StepOption(), + type=StepOption(), help="run only these compression steps (default: all steps)", ) @click.pass_context def compress(ctx, graph, out_dir, steps): """Compress a graph using WebGraph Input: a pair of files g.nodes.csv.gz, g.edges.csv.gz Output: a directory containing a WebGraph compressed graph Compression steps are: (1) mph, (2) bv, (3) bv_obl, (4) bfs, (5) permute, (6) permute_obl, (7) stats, (8) transpose, (9) transpose_obl, (10) maps, (11) clean_tmp. Compression steps can be selected by name or number using --steps, separating them with commas; step ranges (e.g., 3-9, 6-, etc.) are also supported. """ + from swh.graph import webgraph + graph_name = graph.name in_dir = graph.parent try: conf = ctx.obj["config"]["graph"]["compress"] except KeyError: conf = {} # use defaults webgraph.compress(graph_name, in_dir, out_dir, steps, conf) @cli.command(name="cachemount") @click.option( "--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename" ) @click.option( "--cache", "-c", default="/dev/shm/swh-graph/default", metavar="CACHE", type=PathlibPath(), help="Memory cache path (defaults to /dev/shm/swh-graph/default)", ) @click.pass_context def cachemount(ctx, graph, cache): """ Cache the mmapped files of the compressed graph in a tmpfs. This command creates a new directory at the path given by CACHE that has the same structure as the compressed graph basename, except it copies the files that require mmap access (*.graph) but uses symlinks from the source for all the other files (.map, .bin, ...). The command outputs the path to the memory cache directory (particularly useful when relying on the default value). """ + import shutil + cache.mkdir(parents=True) for src in Path(graph).parent.glob("*"): dst = cache / src.name if src.suffix == ".graph": shutil.copy2(src, dst) else: dst.symlink_to(src.resolve()) print(cache) def main(): return cli(auto_envvar_prefix="SWH_GRAPH") if __name__ == "__main__": main() diff --git a/swh/graph/webgraph.py b/swh/graph/webgraph.py index acf9a1c..c27aaac 100644 --- a/swh/graph/webgraph.py +++ b/swh/graph/webgraph.py @@ -1,271 +1,226 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """WebGraph driver """ import logging import os import subprocess from enum import Enum from datetime import datetime from pathlib import Path from typing import Dict, List, Set -from click import ParamType from swh.graph.config import check_config_compress class CompressionStep(Enum): MPH = 1 BV = 2 BV_OBL = 3 BFS = 4 PERMUTE = 5 PERMUTE_OBL = 6 STATS = 7 TRANSPOSE = 8 TRANSPOSE_OBL = 9 MAPS = 10 CLEAN_TMP = 11 def __str__(self): return self.name # full compression pipeline COMP_SEQ = list(CompressionStep) # Mapping from compression steps to shell commands implementing them. Commands # will be executed by the shell, so be careful with meta characters. They are # specified here as lists of tokens that will be joined together only for ease # of line splitting. In commands, {tokens} will be interpolated with # configuration values, see :func:`compress`. STEP_ARGV: Dict[CompressionStep, List[str]] = { CompressionStep.MPH: [ "{java}", "it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction", "--temp-dir", "{tmp_dir}", "{out_dir}/{graph_name}.mph", "<( zstdcat {in_dir}/{graph_name}.nodes.csv.zst )", ], # use process substitution (and hence FIFO) above as MPH class load the # entire file in memory when reading from stdin CompressionStep.BV: [ "zstdcat", "{in_dir}/{graph_name}.edges.csv.zst", "|", "{java}", "it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph", "--temp-dir", "{tmp_dir}", "--function", "{out_dir}/{graph_name}.mph", "{out_dir}/{graph_name}-bv", ], CompressionStep.BV_OBL: [ "{java}", "it.unimi.dsi.big.webgraph.BVGraph", "--list", "{out_dir}/{graph_name}-bv", ], CompressionStep.BFS: [ "{java}", "it.unimi.dsi.law.big.graph.BFS", "{out_dir}/{graph_name}-bv", "{out_dir}/{graph_name}.order", ], CompressionStep.PERMUTE: [ "{java}", "it.unimi.dsi.big.webgraph.Transform", "mapOffline", "{out_dir}/{graph_name}-bv", "{out_dir}/{graph_name}", "{out_dir}/{graph_name}.order", "{batch_size}", "{tmp_dir}", ], CompressionStep.PERMUTE_OBL: [ "{java}", "it.unimi.dsi.big.webgraph.BVGraph", "--list", "{out_dir}/{graph_name}", ], CompressionStep.STATS: [ "{java}", "it.unimi.dsi.big.webgraph.Stats", "{out_dir}/{graph_name}", ], CompressionStep.TRANSPOSE: [ "{java}", "it.unimi.dsi.big.webgraph.Transform", "transposeOffline", "{out_dir}/{graph_name}", "{out_dir}/{graph_name}-transposed", "{batch_size}", "{tmp_dir}", ], CompressionStep.TRANSPOSE_OBL: [ "{java}", "it.unimi.dsi.big.webgraph.BVGraph", "--list", "{out_dir}/{graph_name}-transposed", ], CompressionStep.MAPS: [ "zstdcat", "{in_dir}/{graph_name}.nodes.csv.zst", "|", "{java}", "org.softwareheritage.graph.maps.NodeMapBuilder", "{out_dir}/{graph_name}", "{tmp_dir}", ], CompressionStep.CLEAN_TMP: [ "rm", "-rf", "{out_dir}/{graph_name}-bv.graph", "{out_dir}/{graph_name}-bv.obl", "{out_dir}/{graph_name}-bv.offsets", "{tmp_dir}", ], } -class StepOption(ParamType): - """click type for specifying a compression step on the CLI - - parse either individual steps, specified as step names or integers, or step - ranges - - """ - - name = "compression step" - - def convert(self, value, param, ctx) -> Set[CompressionStep]: - steps: Set[CompressionStep] = set() - - specs = value.split(",") - for spec in specs: - if "-" in spec: # step range - (raw_l, raw_r) = spec.split("-", maxsplit=1) - if raw_l == "": # no left endpoint - raw_l = COMP_SEQ[0].name - if raw_r == "": # no right endpoint - raw_r = COMP_SEQ[-1].name - l_step = self.convert(raw_l, param, ctx) - r_step = self.convert(raw_r, param, ctx) - if len(l_step) != 1 or len(r_step) != 1: - self.fail(f"invalid step specification: {value}, " f"see --help") - l_idx = l_step.pop() - r_idx = r_step.pop() - steps = steps.union( - set(map(CompressionStep, range(l_idx.value, r_idx.value + 1))) - ) - else: # singleton step - try: - steps.add(CompressionStep(int(spec))) # integer step - except ValueError: - try: - steps.add(CompressionStep[spec.upper()]) # step name - except KeyError: - self.fail( - f"invalid step specification: {value}, " f"see --help" - ) - - return steps - - def do_step(step, conf): cmd = " ".join(STEP_ARGV[step]).format(**conf) cmd_env = os.environ.copy() cmd_env["JAVA_TOOL_OPTIONS"] = conf["java_tool_options"] cmd_env["CLASSPATH"] = conf["classpath"] logging.info(f"running: {cmd}") process = subprocess.Popen( ["/bin/bash", "-c", cmd], env=cmd_env, encoding="utf8", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) with process.stdout as stdout: for line in stdout: logging.info(line.rstrip()) rc = process.wait() if rc != 0: raise RuntimeError( f"compression step {step} returned non-zero " f"exit code {rc}" ) else: return rc def compress( graph_name: str, in_dir: Path, out_dir: Path, steps: Set[CompressionStep] = set(COMP_SEQ), conf: Dict[str, str] = {}, ): """graph compression pipeline driver from nodes/edges files to compressed on-disk representation Args: graph_name: graph base name, relative to in_dir in_dir: input directory, where the uncompressed graph can be found out_dir: output directory, where the compressed graph will be stored steps: compression steps to run (default: all steps) conf: compression configuration, supporting the following keys (all are optional, so an empty configuration is fine and is the default) - batch_size: batch size for `WebGraph transformations `_; defaults to 1 billion - classpath: java classpath, defaults to swh-graph JAR only - java: command to run java VM, defaults to "java" - java_tool_options: value for JAVA_TOOL_OPTIONS environment variable; defaults to various settings for high memory machines - logback: path to a logback.xml configuration file; if not provided a temporary one will be created and used - max_ram: maximum RAM to use for compression; defaults to available virtual memory - tmp_dir: temporary directory, defaults to the "tmp" subdir of out_dir """ if not steps: steps = set(COMP_SEQ) conf = check_config_compress(conf, graph_name, in_dir, out_dir) compression_start_time = datetime.now() logging.info(f"starting compression at {compression_start_time}") seq_no = 0 for step in COMP_SEQ: if step not in steps: logging.debug(f"skipping compression step {step}") continue seq_no += 1 step_start_time = datetime.now() logging.info( f"starting compression step {step} " f"({seq_no}/{len(steps)}) at {step_start_time}" ) do_step(step, conf) step_end_time = datetime.now() step_duration = step_end_time - step_start_time logging.info( f"completed compression step {step} " f"({seq_no}/{len(steps)}) " f"at {step_end_time} in {step_duration}" ) compression_end_time = datetime.now() compression_duration = compression_end_time - compression_start_time logging.info(f"completed compression in {compression_duration}")