Changeset View
Changeset View
Standalone View
Standalone View
swh/graph/cli.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import aiohttp | # WARNING: do not import unnecessary things here to keep cli startup time under | ||||
# control | |||||
import click | import click | ||||
import logging | import logging | ||||
import shutil | |||||
import sys | import sys | ||||
from pathlib import Path | from pathlib import Path | ||||
from typing import Any, Dict, Tuple | from typing import Any, Dict, Tuple, Set, TYPE_CHECKING | ||||
import swh.model.exceptions | |||||
from swh.core import config | |||||
from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup | from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup | ||||
from swh.graph import client, webgraph | |||||
from swh.graph.backend import NODE2PID_EXT, PID2NODE_EXT | if TYPE_CHECKING: | ||||
from swh.graph.pid import PidToNodeMap, NodeToPidMap | from swh.graph.webgraph import CompressionStep # noqa | ||||
from swh.graph.server.app import make_app | |||||
from swh.graph.backend import Backend | |||||
class StepOption(click.ParamType):
    """click type for specifying compression steps on the CLI

    The value is a comma-separated list of specifications. Each specification
    is either an individual step, given as a step name (case-insensitive) or
    as an integer, or an inclusive step range LEFT-RIGHT. A missing range
    endpoint defaults to the first (resp. last) element of COMP_SEQ.

    """

    name = "compression step"

    def convert(self, value, param, ctx):  # type: (...) -> Set[CompressionStep]
        """Parse ``value`` and return the set of compression steps it selects.

        Whitespace around specifications and range endpoints is ignored.
        Specifications that cannot be parsed are reported via ``self.fail``.
        """
        # imported here rather than at module level to keep CLI startup fast
        from swh.graph.webgraph import CompressionStep, COMP_SEQ  # noqa

        steps: Set[CompressionStep] = set()

        for raw_spec in value.split(","):
            spec = raw_spec.strip()
            if "-" in spec:  # step range
                (raw_l, raw_r) = spec.split("-", maxsplit=1)
                raw_l = raw_l.strip()
                raw_r = raw_r.strip()
                if raw_l == "":  # no left endpoint
                    raw_l = COMP_SEQ[0].name
                if raw_r == "":  # no right endpoint
                    raw_r = COMP_SEQ[-1].name
                # each endpoint must itself parse to exactly one step
                l_step = self.convert(raw_l, param, ctx)
                r_step = self.convert(raw_r, param, ctx)
                if len(l_step) != 1 or len(r_step) != 1:
                    self.fail(f"invalid step specification: {value}, see --help")
                l_idx = l_step.pop()
                r_idx = r_step.pop()
                # NOTE: do not use the builtin map() here: in this module the
                # global name ``map`` is rebound to the "map" click group
                steps.update(
                    CompressionStep(i) for i in range(l_idx.value, r_idx.value + 1)
                )
            else:  # singleton step
                try:
                    steps.add(CompressionStep(int(spec)))  # integer step
                except ValueError:
                    try:
                        steps.add(CompressionStep[spec.upper()])  # step name
                    except KeyError:
                        self.fail(f"invalid step specification: {value}, see --help")

        return steps
class PathlibPath(click.Path):
    """click.Path variant whose converted value is a pathlib.Path, not a str"""

    def convert(self, value, param, ctx):
        # let click.Path do its usual conversion first, then wrap the result
        converted = super().convert(value, param, ctx)
        return Path(converted)
# Default configuration handed to config.read() by the root "graph" command.
# NOTE(review): values look like (type name, default value) pairs as expected
# by swh.core.config -- confirm against that module.
DEFAULT_CONFIG: Dict[str, Tuple[str, Any]] = {"graph": ("dict", {})}
@click.group(name="graph", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup)
@click.option(
    "--config-file",
    "-C",
    default=None,
    type=click.Path(exists=True, dir_okay=False,),
    help="YAML configuration file",
)
@click.pass_context
def cli(ctx, config_file):
    """Software Heritage graph tools."""
    # Root command group: reads the configuration (falling back on
    # DEFAULT_CONFIG) and stores it under ctx.obj["config"] for subcommands.
    # The import is local to keep CLI startup time under control.
    from swh.core import config

    ctx.ensure_object(dict)

    loaded = config.read(config_file, DEFAULT_CONFIG)
    has_graph_stanza = "graph" in loaded
    if not has_graph_stanza:
        raise ValueError(
            'no "graph" stanza found in configuration file %s' % config_file
        )

    ctx.obj["config"] = loaded
@cli.command("api-client")
@click.option("--host", default="localhost", help="Graph server host")
@click.option("--port", default="5009", help="Graph server port")
@click.pass_context
def api_client(ctx, host, port):
    """client for the graph REST service"""
    # local import: only this subcommand needs the client
    from swh.graph.client import RemoteGraphClient

    remote = RemoteGraphClient("http://{}:{}".format(host, port))

    # TODO: run web app
    # for now, just print the stats reported by the remote service
    print(remote.stats())
@cli.group("map")
@click.pass_context
def map(ctx):
    """Manage swh-graph on-disk maps"""
    # Pure command group: subcommands are attached with @map.command(...), so
    # there is nothing to do here. Note that this definition rebinds the
    # module-level name ``map``, hiding the builtin for the rest of the module.
def dump_pid2node(filename):
    """print to stdout, one tab-separated "PID<TAB>node" line per record, the
    (PID, node) pairs obtained by iterating over PidToNodeMap(filename)

    """
    from swh.graph.pid import PidToNodeMap

    # the node id is called ``node`` (not ``int``) to avoid shadowing the builtin
    for (pid, node) in PidToNodeMap(filename):
        print("{}\t{}".format(pid, node))
def dump_node2pid(filename):
    """print to stdout, one tab-separated "node<TAB>PID" line per record, the
    (node, PID) pairs obtained by iterating over NodeToPidMap(filename)

    """
    from swh.graph.pid import NodeToPidMap

    # the node id is called ``node`` (not ``int``) to avoid shadowing the builtin
    for (node, pid) in NodeToPidMap(filename):
        print("{}\t{}".format(node, pid))
def restore_pid2node(filename):
    """convert the textual PID->int map read from stdin into its binary form,
    written to filename

    Each input line must hold exactly two whitespace-separated fields: the PID
    followed by the integer it maps to.

    """
    from swh.graph.pid import PidToNodeMap

    with open(filename, "wb") as out:
        for record in sys.stdin:
            raw_pid, raw_node = record.split()
            node = int(raw_node)
            PidToNodeMap.write_record(out, raw_pid, node)
def restore_node2pid(filename, length):
    """read a textual int->PID map from stdin and write its binary version to
    filename

    Each input line must hold exactly two whitespace-separated fields: the
    integer followed by the PID it maps to. ``length`` is passed through to
    NodeToPidMap when opening the map for writing.

    """
    from swh.graph.pid import NodeToPidMap

    node2pid = NodeToPidMap(filename, mode="wb", length=length)
    try:
        for line in sys.stdin:
            (str_int, str_pid) = line.split()
            node2pid[int(str_int)] = str_pid
    finally:
        # close the map even when a malformed input line aborts the loop
        node2pid.close()
@map.command("dump") | @map.command("dump") | ||||
▲ Show 20 Lines • Show All 67 Lines • ▼ Show 20 Lines | def write(ctx, map_type, filename): | ||||
read from stdin a textual PID->node mapping (for pid2node, or a simple | read from stdin a textual PID->node mapping (for pid2node, or a simple | ||||
sequence of PIDs for node2pid) and write it to disk in the requested binary | sequence of PIDs for node2pid) and write it to disk in the requested binary | ||||
map format | map format | ||||
note that no sorting is applied, so the input should already be sorted as | note that no sorting is applied, so the input should already be sorted as | ||||
required by the chosen map type (by PID for pid2node, by int for node2pid) | required by the chosen map type (by PID for pid2node, by int for node2pid) | ||||
""" | """ | ||||
from swh.graph.pid import PidToNodeMap, NodeToPidMap | |||||
with open(filename, "wb") as f: | with open(filename, "wb") as f: | ||||
if map_type == "pid2node": | if map_type == "pid2node": | ||||
for line in sys.stdin: | for line in sys.stdin: | ||||
(pid, int_str) = line.rstrip().split(maxsplit=1) | (pid, int_str) = line.rstrip().split(maxsplit=1) | ||||
PidToNodeMap.write_record(f, pid, int(int_str)) | PidToNodeMap.write_record(f, pid, int(int_str)) | ||||
elif map_type == "node2pid": | elif map_type == "node2pid": | ||||
for line in sys.stdin: | for line in sys.stdin: | ||||
pid = line.rstrip() | pid = line.rstrip() | ||||
Show All 15 Lines | def map_lookup(graph, identifiers): | ||||
identifier into a node->PID (and return the PID). The desired behavior is | identifier into a node->PID (and return the PID). The desired behavior is | ||||
chosen depending on the syntax of each given identifier. | chosen depending on the syntax of each given identifier. | ||||
Identifiers can be passed either directly on the command line or on | Identifiers can be passed either directly on the command line or on | ||||
standard input, separate by blanks. Logical lines (as returned by | standard input, separate by blanks. Logical lines (as returned by | ||||
readline()) in stdin will be preserved in stdout. | readline()) in stdin will be preserved in stdout. | ||||
""" | """ | ||||
import swh.model.exceptions | |||||
from swh.graph.backend import NODE2PID_EXT, PID2NODE_EXT | |||||
from swh.graph.pid import PidToNodeMap, NodeToPidMap | |||||
from swh.model.identifiers import parse_persistent_identifier | |||||
success = True # no identifiers failed to be looked up | success = True # no identifiers failed to be looked up | ||||
pid2node = PidToNodeMap(f"{graph}.{PID2NODE_EXT}") | pid2node = PidToNodeMap(f"{graph}.{PID2NODE_EXT}") | ||||
node2pid = NodeToPidMap(f"{graph}.{NODE2PID_EXT}") | node2pid = NodeToPidMap(f"{graph}.{NODE2PID_EXT}") | ||||
def lookup(identifier): | def lookup(identifier): | ||||
nonlocal success, pid2node, node2pid | nonlocal success, pid2node, node2pid | ||||
is_pid = None | is_pid = None | ||||
try: | try: | ||||
▲ Show 20 Lines • Show All 47 Lines • ▼ Show 20 Lines | @click.option( | ||||
help="port to bind the server on", | help="port to bind the server on", | ||||
) | ) | ||||
@click.option( | @click.option( | ||||
"--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename" | "--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename" | ||||
) | ) | ||||
@click.pass_context | @click.pass_context | ||||
def serve(ctx, host, port, graph):
    """run the graph REST service"""
    # Import the ``aiohttp.web`` submodule explicitly: a bare ``import aiohttp``
    # does not load it, so ``aiohttp.web`` would only be reachable as a side
    # effect of another module having imported it first.
    import aiohttp.web

    from swh.graph.server.app import make_app
    from swh.graph.backend import Backend

    backend = Backend(graph_path=graph, config=ctx.obj["config"])
    app = make_app(backend=backend)

    # the backend is used as a context manager around the whole server run
    with backend:
        aiohttp.web.run_app(app, host=host, port=port)
@cli.command() | @cli.command() | ||||
Show All 13 Lines | @click.option( | ||||
metavar="DIR", | metavar="DIR", | ||||
type=PathlibPath(), | type=PathlibPath(), | ||||
help="directory where to store compressed graph", | help="directory where to store compressed graph", | ||||
) | ) | ||||
@click.option( | @click.option( | ||||
"--steps", | "--steps", | ||||
"-s", | "-s", | ||||
metavar="STEPS", | metavar="STEPS", | ||||
type=webgraph.StepOption(), | type=StepOption(), | ||||
help="run only these compression steps (default: all steps)", | help="run only these compression steps (default: all steps)", | ||||
) | ) | ||||
@click.pass_context | @click.pass_context | ||||
def compress(ctx, graph, out_dir, steps): | def compress(ctx, graph, out_dir, steps): | ||||
"""Compress a graph using WebGraph | """Compress a graph using WebGraph | ||||
Input: a pair of files g.nodes.csv.gz, g.edges.csv.gz | Input: a pair of files g.nodes.csv.gz, g.edges.csv.gz | ||||
Output: a directory containing a WebGraph compressed graph | Output: a directory containing a WebGraph compressed graph | ||||
Compression steps are: (1) mph, (2) bv, (3) bv_obl, (4) bfs, (5) permute, | Compression steps are: (1) mph, (2) bv, (3) bv_obl, (4) bfs, (5) permute, | ||||
(6) permute_obl, (7) stats, (8) transpose, (9) transpose_obl, (10) maps, | (6) permute_obl, (7) stats, (8) transpose, (9) transpose_obl, (10) maps, | ||||
(11) clean_tmp. Compression steps can be selected by name or number using | (11) clean_tmp. Compression steps can be selected by name or number using | ||||
--steps, separating them with commas; step ranges (e.g., 3-9, 6-, etc.) are | --steps, separating them with commas; step ranges (e.g., 3-9, 6-, etc.) are | ||||
also supported. | also supported. | ||||
""" | """ | ||||
from swh.graph import webgraph | |||||
graph_name = graph.name | graph_name = graph.name | ||||
in_dir = graph.parent | in_dir = graph.parent | ||||
try: | try: | ||||
conf = ctx.obj["config"]["graph"]["compress"] | conf = ctx.obj["config"]["graph"]["compress"] | ||||
except KeyError: | except KeyError: | ||||
conf = {} # use defaults | conf = {} # use defaults | ||||
webgraph.compress(graph_name, in_dir, out_dir, steps, conf) | webgraph.compress(graph_name, in_dir, out_dir, steps, conf) | ||||
Show All 19 Lines | def cachemount(ctx, graph, cache): | ||||
This command creates a new directory at the path given by CACHE that has | This command creates a new directory at the path given by CACHE that has | ||||
the same structure as the compressed graph basename, except it copies the | the same structure as the compressed graph basename, except it copies the | ||||
files that require mmap access (*.graph) but uses symlinks from the source | files that require mmap access (*.graph) but uses symlinks from the source | ||||
for all the other files (.map, .bin, ...). | for all the other files (.map, .bin, ...). | ||||
The command outputs the path to the memory cache directory (particularly | The command outputs the path to the memory cache directory (particularly | ||||
useful when relying on the default value). | useful when relying on the default value). | ||||
""" | """ | ||||
import shutil | |||||
cache.mkdir(parents=True) | cache.mkdir(parents=True) | ||||
for src in Path(graph).parent.glob("*"): | for src in Path(graph).parent.glob("*"): | ||||
dst = cache / src.name | dst = cache / src.name | ||||
if src.suffix == ".graph": | if src.suffix == ".graph": | ||||
shutil.copy2(src, dst) | shutil.copy2(src, dst) | ||||
else: | else: | ||||
dst.symlink_to(src.resolve()) | dst.symlink_to(src.resolve()) | ||||
print(cache) | print(cache) | ||||
def main():
    """entry point: invoke the root click group, passing SWH_GRAPH as the
    automatic environment variable prefix

    """
    env_prefix = "SWH_GRAPH"
    return cli(auto_envvar_prefix=env_prefix)
# allow running this module directly as a script
if __name__ == "__main__":
    main()