diff --git a/swh/graph/backend.py b/swh/graph/backend.py
index d838023..ecf3cb9 100644
--- a/swh/graph/backend.py
+++ b/swh/graph/backend.py
@@ -1,194 +1,194 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
import contextlib
import io
import os
import struct
import subprocess
import sys
import tempfile
from py4j.java_gateway import JavaGateway
from swh.graph.config import check_config
-from swh.graph.pid import NodeToPidMap, PidToNodeMap
-from swh.model.identifiers import PID_TYPES
+from swh.graph.swhid import NodeToSwhidMap, SwhidToNodeMap
+from swh.model.identifiers import SWHID_TYPES
BUF_SIZE = 64 * 1024
BIN_FMT = ">q" # 64 bit integer, big endian
PATH_SEPARATOR_ID = -1
-NODE2PID_EXT = "node2pid.bin"
-PID2NODE_EXT = "pid2node.bin"
+NODE2SWHID_EXT = "node2swhid.bin"
+SWHID2NODE_EXT = "swhid2node.bin"
def _get_pipe_stderr():
# Get stderr if possible, or pipe to stdout if running with Jupyter.
try:
sys.stderr.fileno()
except io.UnsupportedOperation:
return subprocess.STDOUT
else:
return sys.stderr
class Backend:
def __init__(self, graph_path, config=None):
self.gateway = None
self.entry = None
self.graph_path = graph_path
self.config = check_config(config or {})
def __enter__(self):
self.gateway = JavaGateway.launch_gateway(
java_path=None,
javaopts=self.config["java_tool_options"].split(),
classpath=self.config["classpath"],
die_on_exit=True,
redirect_stdout=sys.stdout,
redirect_stderr=_get_pipe_stderr(),
)
self.entry = self.gateway.jvm.org.softwareheritage.graph.Entry()
self.entry.load_graph(self.graph_path)
- self.node2pid = NodeToPidMap(self.graph_path + "." + NODE2PID_EXT)
- self.pid2node = PidToNodeMap(self.graph_path + "." + PID2NODE_EXT)
+ self.node2swhid = NodeToSwhidMap(self.graph_path + "." + NODE2SWHID_EXT)
+ self.swhid2node = SwhidToNodeMap(self.graph_path + "." + SWHID2NODE_EXT)
self.stream_proxy = JavaStreamProxy(self.entry)
return self
def __exit__(self, exc_type, exc_value, tb):
self.gateway.shutdown()
def stats(self):
return self.entry.stats()
def count(self, ttype, direction, edges_fmt, src):
method = getattr(self.entry, "count_" + ttype)
return method(direction, edges_fmt, src)
async def simple_traversal(self, ttype, direction, edges_fmt, src):
assert ttype in ("leaves", "neighbors", "visit_nodes")
method = getattr(self.stream_proxy, ttype)
async for node_id in method(direction, edges_fmt, src):
yield node_id
async def walk(self, direction, edges_fmt, algo, src, dst):
- if dst in PID_TYPES:
+ if dst in SWHID_TYPES:
it = self.stream_proxy.walk_type(direction, edges_fmt, algo, src, dst)
else:
it = self.stream_proxy.walk(direction, edges_fmt, algo, src, dst)
async for node_id in it:
yield node_id
async def random_walk(self, direction, edges_fmt, retries, src, dst):
- if dst in PID_TYPES:
+ if dst in SWHID_TYPES:
it = self.stream_proxy.random_walk_type(
direction, edges_fmt, retries, src, dst
)
else:
it = self.stream_proxy.random_walk(direction, edges_fmt, retries, src, dst)
async for node_id in it: # TODO return 404 if path is empty
yield node_id
async def visit_edges(self, direction, edges_fmt, src):
it = self.stream_proxy.visit_edges(direction, edges_fmt, src)
# convert stream a, b, c, d -> (a, b), (c, d)
prevNode = None
async for node in it:
if prevNode is not None:
yield (prevNode, node)
prevNode = None
else:
prevNode = node
async def visit_paths(self, direction, edges_fmt, src):
path = []
async for node in self.stream_proxy.visit_paths(direction, edges_fmt, src):
if node == PATH_SEPARATOR_ID:
yield path
path = []
else:
path.append(node)
class JavaStreamProxy:
"""A proxy class for the org.softwareheritage.graph.Entry Java class that
takes care of the setup and teardown of the named-pipe FIFO communication
between Python and Java.
Initialize JavaStreamProxy using:
proxy = JavaStreamProxy(swh_entry_class_instance)
Then you can call an Entry method and iterate on the FIFO results like
this:
async for value in proxy.java_method(arg1, arg2):
print(value)
"""
def __init__(self, entry):
self.entry = entry
async def read_node_ids(self, fname):
loop = asyncio.get_event_loop()
open_thread = loop.run_in_executor(None, open, fname, "rb")
# Since the open() call on the FIFO is blocking until it is also opened
# on the Java side, we await it with a timeout in case there is an
# exception that prevents the write-side open().
with (await asyncio.wait_for(open_thread, timeout=2)) as f:
while True:
data = await loop.run_in_executor(None, f.read, BUF_SIZE)
if not data:
break
for data in struct.iter_unpack(BIN_FMT, data):
yield data[0]
class _HandlerWrapper:
def __init__(self, handler):
self._handler = handler
def __getattr__(self, name):
func = getattr(self._handler, name)
async def java_call(*args, **kwargs):
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, lambda: func(*args, **kwargs))
def java_task(*args, **kwargs):
return asyncio.create_task(java_call(*args, **kwargs))
return java_task
@contextlib.contextmanager
def get_handler(self):
with tempfile.TemporaryDirectory(prefix="swh-graph-") as tmpdirname:
cli_fifo = os.path.join(tmpdirname, "swh-graph.fifo")
os.mkfifo(cli_fifo)
reader = self.read_node_ids(cli_fifo)
query_handler = self.entry.get_handler(cli_fifo)
handler = self._HandlerWrapper(query_handler)
yield (handler, reader)
def __getattr__(self, name):
async def java_call_iterator(*args, **kwargs):
with self.get_handler() as (handler, reader):
java_task = getattr(handler, name)(*args, **kwargs)
try:
async for value in reader:
yield value
except asyncio.TimeoutError:
# If the read-side open() times out, an exception on the
# Java side probably happened that prevented the
# write-side open(). We propagate this exception here if
# that is the case.
task_exc = java_task.exception()
if task_exc:
raise task_exc
raise
await java_task
return java_call_iterator
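The renaming above is purely mechanical; the backend API keeps the same shape. As a rough usage sketch (hypothetical graph path and SWHID, not part of this patch), the renamed maps would be used like this:

    from swh.graph.backend import Backend

    async def print_leaves(graph_path, swhid):
        # Backend is a context manager: __enter__ launches the py4j gateway
        # and mmaps the node2swhid/swhid2node maps.
        with Backend(graph_path) as backend:
            node = backend.swhid2node[swhid]      # SWHID -> node id
            async for leaf in backend.simple_traversal(
                "leaves", "forward", "*", node
            ):
                print(backend.node2swhid[leaf])   # node id -> SWHID

    # run with e.g. asyncio.run(print_leaves("/srv/graph/example", "swh:1:dir:..."))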
diff --git a/swh/graph/cli.py b/swh/graph/cli.py
index 5265ac2..1b8d34c 100644
--- a/swh/graph/cli.py
+++ b/swh/graph/cli.py
@@ -1,445 +1,445 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from pathlib import Path
import sys
from typing import TYPE_CHECKING, Any, Dict, Set, Tuple
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import click
from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup
if TYPE_CHECKING:
from swh.graph.webgraph import CompressionStep # noqa
class StepOption(click.ParamType):
"""click type for specifying a compression step on the CLI
parse either individual steps, specified as step names or integers, or step
ranges
"""
name = "compression step"
def convert(self, value, param, ctx): # type: (...) -> Set[CompressionStep]
from swh.graph.webgraph import COMP_SEQ, CompressionStep # noqa
steps: Set[CompressionStep] = set()
specs = value.split(",")
for spec in specs:
if "-" in spec: # step range
(raw_l, raw_r) = spec.split("-", maxsplit=1)
if raw_l == "": # no left endpoint
raw_l = COMP_SEQ[0].name
if raw_r == "": # no right endpoint
raw_r = COMP_SEQ[-1].name
l_step = self.convert(raw_l, param, ctx)
r_step = self.convert(raw_r, param, ctx)
if len(l_step) != 1 or len(r_step) != 1:
self.fail(f"invalid step specification: {value}, " f"see --help")
l_idx = l_step.pop()
r_idx = r_step.pop()
steps = steps.union(
set(map(CompressionStep, range(l_idx.value, r_idx.value + 1)))
)
else: # singleton step
try:
steps.add(CompressionStep(int(spec))) # integer step
except ValueError:
try:
steps.add(CompressionStep[spec.upper()]) # step name
except KeyError:
self.fail(
f"invalid step specification: {value}, " f"see --help"
)
return steps
class PathlibPath(click.Path):
"""A Click path argument that returns a pathlib Path, not a string"""
def convert(self, value, param, ctx):
return Path(super().convert(value, param, ctx))
DEFAULT_CONFIG: Dict[str, Tuple[str, Any]] = {"graph": ("dict", {})}
@click.group(name="graph", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup)
@click.option(
"--config-file",
"-C",
default=None,
type=click.Path(exists=True, dir_okay=False,),
help="YAML configuration file",
)
@click.pass_context
def cli(ctx, config_file):
"""Software Heritage graph tools."""
from swh.core import config
ctx.ensure_object(dict)
conf = config.read(config_file, DEFAULT_CONFIG)
if "graph" not in conf:
raise ValueError(
'no "graph" stanza found in configuration file %s' % config_file
)
ctx.obj["config"] = conf
@cli.command("api-client")
@click.option("--host", default="localhost", help="Graph server host")
@click.option("--port", default="5009", help="Graph server port")
@click.pass_context
def api_client(ctx, host, port):
"""client for the graph REST service"""
from swh.graph import client
url = "http://{}:{}".format(host, port)
app = client.RemoteGraphClient(url)
# TODO: run web app
print(app.stats())
@cli.group("map")
@click.pass_context
def map(ctx):
"""Manage swh-graph on-disk maps"""
pass
-def dump_pid2node(filename):
- from swh.graph.pid import PidToNodeMap
+def dump_swhid2node(filename):
+ from swh.graph.swhid import SwhidToNodeMap
- for (pid, int) in PidToNodeMap(filename):
- print("{}\t{}".format(pid, int))
+ for (swhid, int) in SwhidToNodeMap(filename):
+ print("{}\t{}".format(swhid, int))
-def dump_node2pid(filename):
- from swh.graph.pid import NodeToPidMap
+def dump_node2swhid(filename):
+ from swh.graph.swhid import NodeToSwhidMap
- for (int, pid) in NodeToPidMap(filename):
- print("{}\t{}".format(int, pid))
+ for (int, swhid) in NodeToSwhidMap(filename):
+ print("{}\t{}".format(int, swhid))
-def restore_pid2node(filename):
- """read a textual PID->int map from stdin and write its binary version to
+def restore_swhid2node(filename):
+ """read a textual SWHID->int map from stdin and write its binary version to
filename
"""
- from swh.graph.pid import PidToNodeMap
+ from swh.graph.swhid import SwhidToNodeMap
with open(filename, "wb") as dst:
for line in sys.stdin:
- (str_pid, str_int) = line.split()
- PidToNodeMap.write_record(dst, str_pid, int(str_int))
+ (str_swhid, str_int) = line.split()
+ SwhidToNodeMap.write_record(dst, str_swhid, int(str_int))
-def restore_node2pid(filename, length):
- """read a textual int->PID map from stdin and write its binary version to
+def restore_node2swhid(filename, length):
+ """read a textual int->SWHID map from stdin and write its binary version to
filename
"""
- from swh.graph.pid import NodeToPidMap
+ from swh.graph.swhid import NodeToSwhidMap
- node2pid = NodeToPidMap(filename, mode="wb", length=length)
+ node2swhid = NodeToSwhidMap(filename, mode="wb", length=length)
for line in sys.stdin:
- (str_int, str_pid) = line.split()
- node2pid[int(str_int)] = str_pid
- node2pid.close()
+ (str_int, str_swhid) = line.split()
+ node2swhid[int(str_int)] = str_swhid
+ node2swhid.close()
@map.command("dump")
@click.option(
"--type",
"-t",
"map_type",
required=True,
- type=click.Choice(["pid2node", "node2pid"]),
+ type=click.Choice(["swhid2node", "node2swhid"]),
help="type of map to dump",
)
@click.argument("filename", required=True, type=click.Path(exists=True))
@click.pass_context
def dump_map(ctx, map_type, filename):
- """Dump a binary PID<->node map to textual format."""
- if map_type == "pid2node":
- dump_pid2node(filename)
- elif map_type == "node2pid":
- dump_node2pid(filename)
+ """Dump a binary SWHID<->node map to textual format."""
+ if map_type == "swhid2node":
+ dump_swhid2node(filename)
+ elif map_type == "node2swhid":
+ dump_node2swhid(filename)
else:
raise ValueError("invalid map type: " + map_type)
pass
@map.command("restore")
@click.option(
"--type",
"-t",
"map_type",
required=True,
- type=click.Choice(["pid2node", "node2pid"]),
+ type=click.Choice(["swhid2node", "node2swhid"]),
help="type of map to dump",
)
@click.option(
"--length",
"-l",
type=int,
help="""map size in number of logical records
- (required for node2pid maps)""",
+ (required for node2swhid maps)""",
)
@click.argument("filename", required=True, type=click.Path())
@click.pass_context
def restore_map(ctx, map_type, length, filename):
- """Restore a binary PID<->node map from textual format."""
- if map_type == "pid2node":
- restore_pid2node(filename)
- elif map_type == "node2pid":
+ """Restore a binary SWHID<->node map from textual format."""
+ if map_type == "swhid2node":
+ restore_swhid2node(filename)
+ elif map_type == "node2swhid":
if length is None:
raise click.UsageError(
"map length is required when restoring {} maps".format(map_type), ctx
)
- restore_node2pid(filename, length)
+ restore_node2swhid(filename, length)
else:
raise ValueError("invalid map type: " + map_type)
@map.command("write")
@click.option(
"--type",
"-t",
"map_type",
required=True,
- type=click.Choice(["pid2node", "node2pid"]),
+ type=click.Choice(["swhid2node", "node2swhid"]),
help="type of map to write",
)
@click.argument("filename", required=True, type=click.Path())
@click.pass_context
def write(ctx, map_type, filename):
"""Write a map to disk sequentially.
- read from stdin a textual PID->node mapping (for pid2node, or a simple
- sequence of PIDs for node2pid) and write it to disk in the requested binary
+ read from stdin a textual SWHID->node mapping (for swhid2node, or a simple
+ sequence of SWHIDs for node2swhid) and write it to disk in the requested binary
map format
note that no sorting is applied, so the input should already be sorted as
- required by the chosen map type (by PID for pid2node, by int for node2pid)
+ required by the chosen map type (by SWHID for swhid2node, by int for node2swhid)
"""
- from swh.graph.pid import NodeToPidMap, PidToNodeMap
+ from swh.graph.swhid import NodeToSwhidMap, SwhidToNodeMap
with open(filename, "wb") as f:
- if map_type == "pid2node":
+ if map_type == "swhid2node":
for line in sys.stdin:
- (pid, int_str) = line.rstrip().split(maxsplit=1)
- PidToNodeMap.write_record(f, pid, int(int_str))
- elif map_type == "node2pid":
+ (swhid, int_str) = line.rstrip().split(maxsplit=1)
+ SwhidToNodeMap.write_record(f, swhid, int(int_str))
+ elif map_type == "node2swhid":
for line in sys.stdin:
- pid = line.rstrip()
- NodeToPidMap.write_record(f, pid)
+ swhid = line.rstrip()
+ NodeToSwhidMap.write_record(f, swhid)
else:
raise ValueError("invalid map type: " + map_type)
@map.command("lookup")
@click.option(
"--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename"
)
@click.argument("identifiers", nargs=-1)
def map_lookup(graph, identifiers):
"""Lookup identifiers using on-disk maps.
- Depending on the identifier type lookup either a PID into a PID->node (and
+ Depending on the identifier type lookup either a SWHID into a SWHID->node (and
return the node integer identifier) or, vice-versa, lookup a node integer
- identifier into a node->PID (and return the PID). The desired behavior is
+ identifier into a node->SWHID (and return the SWHID). The desired behavior is
chosen depending on the syntax of each given identifier.
Identifiers can be passed either directly on the command line or on
standard input, separated by blanks. Logical lines (as returned by
readline()) in stdin will be preserved in stdout.
"""
- from swh.graph.backend import NODE2PID_EXT, PID2NODE_EXT
- from swh.graph.pid import NodeToPidMap, PidToNodeMap
+ from swh.graph.backend import NODE2SWHID_EXT, SWHID2NODE_EXT
+ from swh.graph.swhid import NodeToSwhidMap, SwhidToNodeMap
import swh.model.exceptions
- from swh.model.identifiers import parse_persistent_identifier
+ from swh.model.identifiers import parse_swhid
success = True # no identifiers failed to be looked up
- pid2node = PidToNodeMap(f"{graph}.{PID2NODE_EXT}")
- node2pid = NodeToPidMap(f"{graph}.{NODE2PID_EXT}")
+ swhid2node = SwhidToNodeMap(f"{graph}.{SWHID2NODE_EXT}")
+ node2swhid = NodeToSwhidMap(f"{graph}.{NODE2SWHID_EXT}")
def lookup(identifier):
- nonlocal success, pid2node, node2pid
- is_pid = None
+ nonlocal success, swhid2node, node2swhid
+ is_swhid = None
try:
int(identifier)
- is_pid = False
+ is_swhid = False
except ValueError:
try:
- parse_persistent_identifier(identifier)
- is_pid = True
+ parse_swhid(identifier)
+ is_swhid = True
except swh.model.exceptions.ValidationError:
success = False
logging.error(f'invalid identifier: "{identifier}", skipping')
try:
- if is_pid:
- return str(pid2node[identifier])
+ if is_swhid:
+ return str(swhid2node[identifier])
else:
- return node2pid[int(identifier)]
+ return node2swhid[int(identifier)]
except KeyError:
success = False
logging.error(f'identifier not found: "{identifier}", skipping')
if identifiers: # lookup identifiers passed via CLI
for identifier in identifiers:
print(lookup(identifier))
else: # lookup identifiers passed via stdin, preserving logical lines
for line in sys.stdin:
results = [lookup(id) for id in line.rstrip().split()]
if results: # might be empty if all IDs on the same line failed
print(" ".join(results))
sys.exit(0 if success else 1)
@cli.command(name="rpc-serve")
@click.option(
"--host",
"-h",
default="0.0.0.0",
metavar="IP",
show_default=True,
help="host IP address to bind the server on",
)
@click.option(
"--port",
"-p",
default=5009,
type=click.INT,
metavar="PORT",
show_default=True,
help="port to bind the server on",
)
@click.option(
"--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename"
)
@click.pass_context
def serve(ctx, host, port, graph):
"""run the graph REST service"""
import aiohttp
from swh.graph.backend import Backend
from swh.graph.server.app import make_app
backend = Backend(graph_path=graph, config=ctx.obj["config"])
app = make_app(backend=backend)
with backend:
aiohttp.web.run_app(app, host=host, port=port)
@cli.command()
@click.option(
"--graph",
"-g",
required=True,
metavar="GRAPH",
type=PathlibPath(),
help="input graph basename",
)
@click.option(
"--outdir",
"-o",
"out_dir",
required=True,
metavar="DIR",
type=PathlibPath(),
help="directory where to store compressed graph",
)
@click.option(
"--steps",
"-s",
metavar="STEPS",
type=StepOption(),
help="run only these compression steps (default: all steps)",
)
@click.pass_context
def compress(ctx, graph, out_dir, steps):
"""Compress a graph using WebGraph
Input: a pair of files g.nodes.csv.gz, g.edges.csv.gz
Output: a directory containing a WebGraph compressed graph
Compression steps are: (1) mph, (2) bv, (3) bv_obl, (4) bfs, (5) permute,
(6) permute_obl, (7) stats, (8) transpose, (9) transpose_obl, (10) maps,
(11) clean_tmp. Compression steps can be selected by name or number using
--steps, separating them with commas; step ranges (e.g., 3-9, 6-, etc.) are
also supported.
"""
from swh.graph import webgraph
graph_name = graph.name
in_dir = graph.parent
try:
conf = ctx.obj["config"]["graph"]["compress"]
except KeyError:
conf = {} # use defaults
webgraph.compress(graph_name, in_dir, out_dir, steps, conf)
@cli.command(name="cachemount")
@click.option(
"--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename"
)
@click.option(
"--cache",
"-c",
default="/dev/shm/swh-graph/default",
metavar="CACHE",
type=PathlibPath(),
help="Memory cache path (defaults to /dev/shm/swh-graph/default)",
)
@click.pass_context
def cachemount(ctx, graph, cache):
"""
Cache the mmapped files of the compressed graph in a tmpfs.
This command creates a new directory at the path given by CACHE that has
the same structure as the compressed graph basename, except it copies the
files that require mmap access (*.graph) but uses symlinks from the source
for all the other files (.map, .bin, ...).
The command outputs the path to the memory cache directory (particularly
useful when relying on the default value).
"""
import shutil
cache.mkdir(parents=True)
for src in Path(graph).parent.glob("*"):
dst = cache / src.name
if src.suffix == ".graph":
shutil.copy2(src, dst)
else:
dst.symlink_to(src.resolve())
print(cache)
def main():
return cli(auto_envvar_prefix="SWH_GRAPH")
if __name__ == "__main__":
main()
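As the docstrings above describe, `map write` expects one record per input line: "SWHID<TAB>node id" for swhid2node maps, and a bare SWHID per line (in node-id order) for node2swhid maps. A hedged sketch of the same round-trip done programmatically, with made-up file names and a placeholder SWHID, based only on the functions shown in this diff:

    from swh.graph.swhid import NodeToSwhidMap, SwhidToNodeMap

    swhids = [
        "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",  # placeholder SWHID
    ]
    # node2swhid: node ids are implicit (sequential), records are just SWHIDs
    with open("example.node2swhid.bin", "wb") as f:
        for swhid in swhids:  # input must already be sorted by node id
            NodeToSwhidMap.write_record(f, swhid)
    # swhid2node: fixed-size (SWHID, node id) records, sorted by SWHID
    with open("example.swhid2node.bin", "wb") as f:
        for node, swhid in enumerate(sorted(swhids)):
            SwhidToNodeMap.write_record(f, swhid, node)
    # dump back to text, as `swh graph map dump -t swhid2node` does
    for swhid, node in SwhidToNodeMap("example.swhid2node.bin"):
        print("{}\t{}".format(swhid, node))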
diff --git a/swh/graph/client.py b/swh/graph/client.py
index 7f32546..1541900 100644
--- a/swh/graph/client.py
+++ b/swh/graph/client.py
@@ -1,110 +1,110 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
from swh.core.api import RPCClient
class GraphAPIError(Exception):
"""Graph API Error"""
def __str__(self):
return "An unexpected error occurred in the Graph backend: {}".format(self.args)
class RemoteGraphClient(RPCClient):
"""Client to the Software Heritage Graph."""
def __init__(self, url, timeout=None):
super().__init__(api_exception=GraphAPIError, url=url, timeout=timeout)
def raw_verb_lines(self, verb, endpoint, **kwargs):
response = self.raw_verb(verb, endpoint, stream=True, **kwargs)
self.raise_for_status(response)
for line in response.iter_lines():
yield line.decode().lstrip("\n")
def get_lines(self, endpoint, **kwargs):
yield from self.raw_verb_lines("get", endpoint, **kwargs)
# Web API endpoints
def stats(self):
return self.get("stats")
def leaves(self, src, edges="*", direction="forward"):
return self.get_lines(
"leaves/{}".format(src), params={"edges": edges, "direction": direction}
)
def neighbors(self, src, edges="*", direction="forward"):
return self.get_lines(
"neighbors/{}".format(src), params={"edges": edges, "direction": direction}
)
def visit_nodes(self, src, edges="*", direction="forward"):
return self.get_lines(
"visit/nodes/{}".format(src),
params={"edges": edges, "direction": direction},
)
def visit_edges(self, src, edges="*", direction="forward"):
for edge in self.get_lines(
"visit/edges/{}".format(src),
params={"edges": edges, "direction": direction},
):
yield tuple(edge.split())
def visit_paths(self, src, edges="*", direction="forward"):
def decode_path_wrapper(it):
for e in it:
yield json.loads(e)
return decode_path_wrapper(
self.get_lines(
"visit/paths/{}".format(src),
params={"edges": edges, "direction": direction},
)
)
def walk(
self, src, dst, edges="*", traversal="dfs", direction="forward", limit=None
):
endpoint = "walk/{}/{}"
return self.get_lines(
endpoint.format(src, dst),
params={
"edges": edges,
"traversal": traversal,
"direction": direction,
"limit": limit,
},
)
def random_walk(self, src, dst, edges="*", direction="forward", limit=None):
endpoint = "randomwalk/{}/{}"
return self.get_lines(
endpoint.format(src, dst),
params={"edges": edges, "direction": direction, "limit": limit},
)
def count_leaves(self, src, edges="*", direction="forward"):
return self.get(
"leaves/count/{}".format(src),
params={"edges": edges, "direction": direction},
)
def count_neighbors(self, src, edges="*", direction="forward"):
return self.get(
"neighbors/count/{}".format(src),
params={"edges": edges, "direction": direction},
)
def count_visit_nodes(self, src, edges="*", direction="forward"):
return self.get(
"visit/nodes/count/{}".format(src),
params={"edges": edges, "direction": direction},
)
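The client API itself is untouched by this change (only the copyright year moves); callers already pass SWHIDs as opaque strings. A hedged usage sketch, with placeholder server URL and SWHID, using only the methods shown above:

    from swh.graph.client import RemoteGraphClient

    # same URL shape as the api-client command builds: http://<host>:<port>
    client = RemoteGraphClient("http://localhost:5009")
    print(client.stats())
    src = "swh:1:dir:0000000000000000000000000000000000000000"  # placeholder
    # leaves() streams result SWHIDs one per line
    for leaf in client.leaves(src, edges="*", direction="forward"):
        print(leaf)
    # the count endpoints return a single number instead of a stream
    print(client.count_leaves(src))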
diff --git a/swh/graph/config.py b/swh/graph/config.py
index 4487753..61b24c2 100644
--- a/swh/graph/config.py
+++ b/swh/graph/config.py
@@ -1,111 +1,110 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from pathlib import Path
import sys
import psutil
def find_graph_jar():
"""find swh-graph.jar, containing the Java part of swh-graph
look both in development directories and installed data (for in-production
deployments that fetched the JAR from PyPI)
"""
swh_graph_root = Path(__file__).parents[2]
try_paths = [
swh_graph_root / "java/target/",
Path(sys.prefix) / "share/swh-graph/",
Path(sys.prefix) / "local/share/swh-graph/",
]
for path in try_paths:
glob = list(path.glob("swh-graph-*.jar"))
if glob:
if len(glob) > 1:
logging.warn(
"found multiple swh-graph JARs, " "arbitrarily picking one"
)
logging.info("using swh-graph JAR: {0}".format(glob[0]))
return str(glob[0])
raise RuntimeError("swh-graph JAR not found. Have you run `make java`?")
def check_config(conf):
- """check configuration and propagate defaults
- """
+ """check configuration and propagate defaults"""
conf = conf.copy()
if "batch_size" not in conf:
conf["batch_size"] = "1000000000" # 1 billion
if "max_ram" not in conf:
conf["max_ram"] = str(psutil.virtual_memory().total)
if "java_tool_options" not in conf:
conf["java_tool_options"] = " ".join(
[
"-Xmx{max_ram}",
"-XX:PretenureSizeThreshold=512M",
"-XX:MaxNewSize=4G",
"-XX:+UseLargePages",
"-XX:+UseTransparentHugePages",
"-XX:+UseNUMA",
"-XX:+UseTLAB",
"-XX:+ResizeTLAB",
]
)
conf["java_tool_options"] = conf["java_tool_options"].format(
max_ram=conf["max_ram"]
)
if "java" not in conf:
conf["java"] = "java"
if "classpath" not in conf:
conf["classpath"] = find_graph_jar()
return conf
def check_config_compress(config, graph_name, in_dir, out_dir):
"""check compression-specific configuration and initialize its execution
environment.
"""
conf = check_config(config)
conf["graph_name"] = graph_name
conf["in_dir"] = str(in_dir)
conf["out_dir"] = str(out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
if "tmp_dir" not in conf:
tmp_dir = out_dir / "tmp"
conf["tmp_dir"] = str(tmp_dir)
else:
tmp_dir = Path(conf["tmp_dir"])
tmp_dir.mkdir(parents=True, exist_ok=True)
if "logback" not in conf:
logback_confpath = tmp_dir / "logback.xml"
with open(logback_confpath, "w") as conffile:
conffile.write(
"""
You have reached the Software Heritage graph API server.
See its API documentation for more information.
""", ) class GraphView(aiohttp.web.View): """Base class for views working on the graph, with utility functions""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.backend = self.request.app["backend"] - def node_of_pid(self, pid): - """Lookup a PID in a pid2node map, failing in an HTTP-nice way if needed.""" + def node_of_swhid(self, swhid): + """Lookup a SWHID in a swhid2node map, failing in an HTTP-nice way if + needed.""" try: - return self.backend.pid2node[pid] + return self.backend.swhid2node[swhid] except KeyError: - raise aiohttp.web.HTTPNotFound(body=f"PID not found: {pid}") + raise aiohttp.web.HTTPNotFound(body=f"SWHID not found: {swhid}") except ValidationError: - raise aiohttp.web.HTTPBadRequest(body=f"malformed PID: {pid}") + raise aiohttp.web.HTTPBadRequest(body=f"malformed SWHID: {swhid}") - def pid_of_node(self, node): - """Lookup a node in a node2pid map, failing in an HTTP-nice way if needed.""" + def swhid_of_node(self, node): + """Lookup a node in a node2swhid map, failing in an HTTP-nice way if + needed.""" try: - return self.backend.node2pid[node] + return self.backend.node2swhid[node] except KeyError: raise aiohttp.web.HTTPInternalServerError( body=f"reverse lookup failed for node id: {node}" ) def get_direction(self): """Validate HTTP query parameter `direction`""" s = self.request.query.get("direction", "forward") if s not in ("forward", "backward"): raise aiohttp.web.HTTPBadRequest(body=f"invalid direction: {s}") return s def get_edges(self): """Validate HTTP query parameter `edges`, i.e., edge restrictions""" s = self.request.query.get("edges", "*") if any( [ - node_type != "*" and node_type not in PID_TYPES + node_type != "*" and node_type not in SWHID_TYPES for edge in s.split(":") for node_type in edge.split(",", maxsplit=1) ] ): raise aiohttp.web.HTTPBadRequest(body=f"invalid edge restriction: {s}") return s def get_traversal(self): """Validate HTTP query parameter `traversal`, i.e., visit order""" s = self.request.query.get("traversal", "dfs") if s not in ("bfs", "dfs"): raise aiohttp.web.HTTPBadRequest(body=f"invalid traversal order: {s}") return s def get_limit(self): """Validate HTTP query parameter `limit`, i.e., number of results""" s = self.request.query.get("limit", "0") try: return int(s) except ValueError: raise aiohttp.web.HTTPBadRequest(body=f"invalid limit value: {s}") class StreamingGraphView(GraphView): """Base class for views streaming their response line by line.""" content_type = "text/plain" @asynccontextmanager async def response_streamer(self, *args, **kwargs): """Context manager to prepare then close a StreamResponse""" response = aiohttp.web.StreamResponse(*args, **kwargs) response.content_type = self.content_type await response.prepare(self.request) yield response await response.write_eof() async def get(self): await self.prepare_response() async with self.response_streamer() as self.response_stream: await self.stream_response() return self.response_stream async def prepare_response(self): """This can be overridden with some setup to be run before the response actually starts streaming. """ pass async def stream_response(self): """Override this to perform the response streaming. Implementations of this should await self.stream_line(line) to write each line. 
""" raise NotImplementedError async def stream_line(self, line): """Write a line in the response stream.""" await self.response_stream.write((line + "\n").encode()) class StatsView(GraphView): """View showing some statistics on the graph""" async def get(self): stats = self.backend.stats() return aiohttp.web.Response(body=stats, content_type="application/json") class SimpleTraversalView(StreamingGraphView): """Base class for views of simple traversals""" simple_traversal_type: Optional[str] = None async def prepare_response(self): src = self.request.match_info["src"] - self.src_node = self.node_of_pid(src) + self.src_node = self.node_of_swhid(src) self.edges = self.get_edges() self.direction = self.get_direction() async def stream_response(self): async for res_node in self.backend.simple_traversal( self.simple_traversal_type, self.direction, self.edges, self.src_node ): - res_pid = self.pid_of_node(res_node) - await self.stream_line(res_pid) + res_swhid = self.swhid_of_node(res_node) + await self.stream_line(res_swhid) class LeavesView(SimpleTraversalView): simple_traversal_type = "leaves" class NeighborsView(SimpleTraversalView): simple_traversal_type = "neighbors" class VisitNodesView(SimpleTraversalView): simple_traversal_type = "visit_nodes" class WalkView(StreamingGraphView): async def prepare_response(self): src = self.request.match_info["src"] dst = self.request.match_info["dst"] - self.src_node = self.node_of_pid(src) - if dst not in PID_TYPES: - self.dst_thing = self.node_of_pid(dst) + self.src_node = self.node_of_swhid(src) + if dst not in SWHID_TYPES: + self.dst_thing = self.node_of_swhid(dst) else: self.dst_thing = dst self.edges = self.get_edges() self.direction = self.get_direction() self.algo = self.get_traversal() self.limit = self.get_limit() async def get_walk_iterator(self): return self.backend.walk( self.direction, self.edges, self.algo, self.src_node, self.dst_thing ) async def stream_response(self): it = self.get_walk_iterator() if self.limit < 0: queue = deque(maxlen=-self.limit) async for res_node in it: - res_pid = self.pid_of_node(res_node) - queue.append(res_pid) + res_swhid = self.swhid_of_node(res_node) + queue.append(res_swhid) while queue: await self.stream_line(queue.popleft()) else: count = 0 async for res_node in it: if self.limit == 0 or count < self.limit: - res_pid = self.pid_of_node(res_node) - await self.stream_line(res_pid) + res_swhid = self.swhid_of_node(res_node) + await self.stream_line(res_swhid) count += 1 else: break class RandomWalkView(WalkView): def get_walk_iterator(self): return self.backend.random_walk( self.direction, self.edges, RANDOM_RETRIES, self.src_node, self.dst_thing ) class VisitEdgesView(SimpleTraversalView): async def stream_response(self): it = self.backend.visit_edges(self.direction, self.edges, self.src_node) async for (res_src, res_dst) in it: - res_src_pid = self.pid_of_node(res_src) - res_dst_pid = self.pid_of_node(res_dst) - await self.stream_line("{} {}".format(res_src_pid, res_dst_pid)) + res_src_swhid = self.swhid_of_node(res_src) + res_dst_swhid = self.swhid_of_node(res_dst) + await self.stream_line("{} {}".format(res_src_swhid, res_dst_swhid)) class VisitPathsView(SimpleTraversalView): content_type = "application/x-ndjson" async def stream_response(self): it = self.backend.visit_paths(self.direction, self.edges, self.src_node) async for res_path in it: - res_path_pid = [self.pid_of_node(n) for n in res_path] - line = json.dumps(res_path_pid) + res_path_swhid = [self.swhid_of_node(n) for n in res_path] + line = 
json.dumps(res_path_swhid) await self.stream_line(line) class CountView(GraphView): """Base class for counting views.""" count_type: Optional[str] = None async def get(self): src = self.request.match_info["src"] - self.src_node = self.node_of_pid(src) + self.src_node = self.node_of_swhid(src) self.edges = self.get_edges() self.direction = self.get_direction() loop = asyncio.get_event_loop() cnt = await loop.run_in_executor( None, self.backend.count, self.count_type, self.direction, self.edges, self.src_node, ) return aiohttp.web.Response(body=str(cnt), content_type="application/json") class CountNeighborsView(CountView): count_type = "neighbors" class CountLeavesView(CountView): count_type = "leaves" class CountVisitNodesView(CountView): count_type = "visit_nodes" def make_app(backend, **kwargs): app = RPCServerApp(**kwargs) app.add_routes( [ aiohttp.web.get("/", index), aiohttp.web.get("/graph", index), aiohttp.web.view("/graph/stats", StatsView), aiohttp.web.view("/graph/leaves/{src}", LeavesView), aiohttp.web.view("/graph/neighbors/{src}", NeighborsView), aiohttp.web.view("/graph/visit/nodes/{src}", VisitNodesView), aiohttp.web.view("/graph/visit/edges/{src}", VisitEdgesView), aiohttp.web.view("/graph/visit/paths/{src}", VisitPathsView), # temporarily disabled in wait of a proper fix for T1969 # aiohttp.web.view("/graph/walk/{src}/{dst}", WalkView) aiohttp.web.view("/graph/randomwalk/{src}/{dst}", RandomWalkView), aiohttp.web.view("/graph/neighbors/count/{src}", CountNeighborsView), aiohttp.web.view("/graph/leaves/count/{src}", CountLeavesView), aiohttp.web.view("/graph/visit/nodes/count/{src}", CountVisitNodesView), ] ) app["backend"] = backend return app diff --git a/swh/graph/pid.py b/swh/graph/swhid.py similarity index 65% rename from swh/graph/pid.py rename to swh/graph/swhid.py index f0e4a67..8d99307 100644 --- a/swh/graph/pid.py +++ b/swh/graph/swhid.py @@ -1,404 +1,402 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections.abc import MutableMapping from enum import Enum import mmap from mmap import MAP_SHARED, PROT_READ, PROT_WRITE import os import struct from typing import BinaryIO, Iterator, Tuple from swh.model.identifiers import SWHID, parse_swhid -PID_BIN_FMT = "BB20s" # 2 unsigned chars + 20 bytes +SWHID_BIN_FMT = "BB20s" # 2 unsigned chars + 20 bytes INT_BIN_FMT = ">q" # big endian, 8-byte integer -PID_BIN_SIZE = 22 # in bytes +SWHID_BIN_SIZE = 22 # in bytes INT_BIN_SIZE = 8 # in bytes -class PidType(Enum): - """types of existing PIDs, used to serialize PID type as a (char) integer +class SwhidType(Enum): + """types of existing SWHIDs, used to serialize SWHID type as a (char) integer Note that the order does matter also for driving the binary search in - PID-indexed maps. Integer values also matter, for compatibility with the + SWHID-indexed maps. Integer values also matter, for compatibility with the Java layer. 
""" content = 0 directory = 1 origin = 2 release = 3 revision = 4 snapshot = 5 -def str_to_bytes(pid_str: str) -> bytes: - """Convert a PID to a byte sequence +def str_to_bytes(swhid_str: str) -> bytes: + """Convert a SWHID to a byte sequence - The binary format used to represent PIDs as 22-byte long byte sequences as + The binary format used to represent SWHIDs as 22-byte long byte sequences as follows: - 1 byte for the namespace version represented as a C `unsigned char` - - 1 byte for the object type, as the int value of :class:`PidType` enums, + - 1 byte for the object type, as the int value of :class:`SwhidType` enums, represented as a C `unsigned char` - 20 bytes for the SHA1 digest as a byte sequence Args: - pid: persistent identifier + swhid: persistent identifier Returns: - bytes: byte sequence representation of pid + bytes: byte sequence representation of swhid """ - pid = parse_swhid(pid_str) + swhid = parse_swhid(swhid_str) return struct.pack( - PID_BIN_FMT, - pid.scheme_version, - PidType[pid.object_type].value, - bytes.fromhex(pid.object_id), + SWHID_BIN_FMT, + swhid.scheme_version, + SwhidType[swhid.object_type].value, + bytes.fromhex(swhid.object_id), ) def bytes_to_str(bytes: bytes) -> str: """Inverse function of :func:`str_to_bytes` - See :func:`str_to_bytes` for a description of the binary PID format. + See :func:`str_to_bytes` for a description of the binary SWHID format. Args: - bytes: byte sequence representation of pid + bytes: byte sequence representation of swhid Returns: - pid: persistent identifier + swhid: persistent identifier """ - (version, type, bin_digest) = struct.unpack(PID_BIN_FMT, bytes) - pid = SWHID(object_type=PidType(type).name, object_id=bin_digest) - return str(pid) + (version, type, bin_digest) = struct.unpack(SWHID_BIN_FMT, bytes) + swhid = SWHID(object_type=SwhidType(type).name, object_id=bin_digest) + return str(swhid) class _OnDiskMap: - """mmap-ed on-disk sequence of fixed size records - - """ + """mmap-ed on-disk sequence of fixed size records""" def __init__( self, record_size: int, fname: str, mode: str = "rb", length: int = None ): """open an existing on-disk map Args: record_size: size of each record in bytes fname: path to the on-disk map mode: file open mode, usually either 'rb' for read-only maps, 'wb' for creating new maps, or 'rb+' for updating existing ones (default: 'rb') length: map size in number of logical records; used to initialize writable maps at creation time. 
Must be given when mode is 'wb' and the map doesn't exist on disk; ignored otherwise """ os_modes = {"rb": os.O_RDONLY, "wb": os.O_RDWR | os.O_CREAT, "rb+": os.O_RDWR} if mode not in os_modes: raise ValueError("invalid file open mode: " + mode) new_map = mode == "wb" writable_map = mode in ["wb", "rb+"] self.record_size = record_size self.fd = os.open(fname, os_modes[mode]) if new_map: if length is None: raise ValueError("missing length when creating new map") os.truncate(self.fd, length * self.record_size) self.size = os.path.getsize(fname) (self.length, remainder) = divmod(self.size, record_size) if remainder: raise ValueError( "map size {} is not a multiple of the record size {}".format( self.size, record_size ) ) self.mm = mmap.mmap( self.fd, self.size, prot=(PROT_READ | PROT_WRITE if writable_map else PROT_READ), flags=MAP_SHARED, ) def close(self) -> None: """close the map shuts down both the mmap and the underlying file descriptor """ if not self.mm.closed: self.mm.close() os.close(self.fd) def __len__(self) -> int: return self.length def __delitem__(self, pos: int) -> None: raise NotImplementedError("cannot delete records from fixed-size map") -class PidToNodeMap(_OnDiskMap, MutableMapping): +class SwhidToNodeMap(_OnDiskMap, MutableMapping): """memory mapped map from :ref:`SWHIDs