diff --git a/swh/graph/cli.py b/swh/graph/cli.py
index 82cf1fd..68632b9 100644
--- a/swh/graph/cli.py
+++ b/swh/graph/cli.py
@@ -1,449 +1,452 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from pathlib import Path
import sys
from typing import TYPE_CHECKING, Any, Dict, Set, Tuple
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import click
from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup
from swh.core.cli import swh as swh_cli_group
if TYPE_CHECKING:
from swh.graph.webgraph import CompressionStep # noqa
class StepOption(click.ParamType):
"""click type for specifying a compression step on the CLI
parse either individual steps, specified as step names or integers, or step
ranges
"""
name = "compression step"
def convert(self, value, param, ctx): # type: (...) -> Set[CompressionStep]
from swh.graph.webgraph import COMP_SEQ, CompressionStep # noqa
steps: Set[CompressionStep] = set()
specs = value.split(",")
for spec in specs:
if "-" in spec: # step range
(raw_l, raw_r) = spec.split("-", maxsplit=1)
if raw_l == "": # no left endpoint
raw_l = COMP_SEQ[0].name
if raw_r == "": # no right endpoint
raw_r = COMP_SEQ[-1].name
l_step = self.convert(raw_l, param, ctx)
r_step = self.convert(raw_r, param, ctx)
if len(l_step) != 1 or len(r_step) != 1:
self.fail(f"invalid step specification: {value}, " f"see --help")
l_idx = l_step.pop()
r_idx = r_step.pop()
steps = steps.union(
set(CompressionStep(i) for i in range(l_idx.value, r_idx.value + 1))
)
else: # singleton step
try:
steps.add(CompressionStep(int(spec))) # integer step
except ValueError:
try:
steps.add(CompressionStep[spec.upper()]) # step name
except KeyError:
self.fail(
f"invalid step specification: {value}, " f"see --help"
)
return steps
class PathlibPath(click.Path):
"""A Click path argument that returns a pathlib Path, not a string"""
def convert(self, value, param, ctx):
return Path(super().convert(value, param, ctx))
DEFAULT_CONFIG: Dict[str, Tuple[str, Any]] = {"graph": ("dict", {})}
@swh_cli_group.group(name="graph", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup)
@click.option(
"--config-file",
"-C",
default=None,
- type=click.Path(exists=True, dir_okay=False,),
+ type=click.Path(
+ exists=True,
+ dir_okay=False,
+ ),
help="YAML configuration file",
)
@click.pass_context
def graph_cli_group(ctx, config_file):
"""Software Heritage graph tools."""
from swh.core import config
ctx.ensure_object(dict)
conf = config.read(config_file, DEFAULT_CONFIG)
if "graph" not in conf:
raise ValueError(
'no "graph" stanza found in configuration file %s' % config_file
)
ctx.obj["config"] = conf
@graph_cli_group.command("api-client")
@click.option("--host", default="localhost", help="Graph server host")
@click.option("--port", default="5009", help="Graph server port")
@click.pass_context
def api_client(ctx, host, port):
"""client for the graph RPC service"""
from swh.graph import client
url = "http://{}:{}".format(host, port)
app = client.RemoteGraphClient(url)
# TODO: run web app
print(app.stats())
@graph_cli_group.group("map")
@click.pass_context
def map(ctx):
"""Manage swh-graph on-disk maps"""
pass
def dump_swhid2node(filename):
from swh.graph.swhid import SwhidToNodeMap
for (swhid, int) in SwhidToNodeMap(filename):
print("{}\t{}".format(swhid, int))
def dump_node2swhid(filename):
from swh.graph.swhid import NodeToSwhidMap
for (int, swhid) in NodeToSwhidMap(filename):
print("{}\t{}".format(int, swhid))
def restore_swhid2node(filename):
"""read a textual SWHID->int map from stdin and write its binary version to
filename
"""
from swh.graph.swhid import SwhidToNodeMap
with open(filename, "wb") as dst:
for line in sys.stdin:
(str_swhid, str_int) = line.split()
SwhidToNodeMap.write_record(dst, str_swhid, int(str_int))
def restore_node2swhid(filename, length):
"""read a textual int->SWHID map from stdin and write its binary version to
filename
"""
from swh.graph.swhid import NodeToSwhidMap
node2swhid = NodeToSwhidMap(filename, mode="wb", length=length)
for line in sys.stdin:
(str_int, str_swhid) = line.split()
node2swhid[int(str_int)] = str_swhid
node2swhid.close()
@map.command("dump")
@click.option(
"--type",
"-t",
"map_type",
required=True,
type=click.Choice(["swhid2node", "node2swhid"]),
help="type of map to dump",
)
@click.argument("filename", required=True, type=click.Path(exists=True))
@click.pass_context
def dump_map(ctx, map_type, filename):
"""Dump a binary SWHID<->node map to textual format."""
if map_type == "swhid2node":
dump_swhid2node(filename)
elif map_type == "node2swhid":
dump_node2swhid(filename)
else:
raise ValueError("invalid map type: " + map_type)
pass
@map.command("restore")
@click.option(
"--type",
"-t",
"map_type",
required=True,
type=click.Choice(["swhid2node", "node2swhid"]),
help="type of map to dump",
)
@click.option(
"--length",
"-l",
type=int,
help="""map size in number of logical records
(required for node2swhid maps)""",
)
@click.argument("filename", required=True, type=click.Path())
@click.pass_context
def restore_map(ctx, map_type, length, filename):
"""Restore a binary SWHID<->node map from textual format."""
if map_type == "swhid2node":
restore_swhid2node(filename)
elif map_type == "node2swhid":
if length is None:
raise click.UsageError(
"map length is required when restoring {} maps".format(map_type), ctx
)
restore_node2swhid(filename, length)
else:
raise ValueError("invalid map type: " + map_type)
@map.command("write")
@click.option(
"--type",
"-t",
"map_type",
required=True,
type=click.Choice(["swhid2node", "node2swhid"]),
help="type of map to write",
)
@click.argument("filename", required=True, type=click.Path())
@click.pass_context
def write(ctx, map_type, filename):
"""Write a map to disk sequentially.
read from stdin a textual SWHID->node mapping (for swhid2node, or a simple
sequence of SWHIDs for node2swhid) and write it to disk in the requested binary
map format
note that no sorting is applied, so the input should already be sorted as
required by the chosen map type (by SWHID for swhid2node, by int for node2swhid)
"""
from swh.graph.swhid import NodeToSwhidMap, SwhidToNodeMap
with open(filename, "wb") as f:
if map_type == "swhid2node":
for line in sys.stdin:
(swhid, int_str) = line.rstrip().split(maxsplit=1)
SwhidToNodeMap.write_record(f, swhid, int(int_str))
elif map_type == "node2swhid":
for line in sys.stdin:
swhid = line.rstrip()
NodeToSwhidMap.write_record(f, swhid)
else:
raise ValueError("invalid map type: " + map_type)
@map.command("lookup")
@click.option(
"--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename"
)
@click.argument("identifiers", nargs=-1)
def map_lookup(graph, identifiers):
"""Lookup identifiers using on-disk maps.
Depending on the identifier type lookup either a SWHID into a SWHID->node (and
return the node integer identifier) or, vice-versa, lookup a node integer
identifier into a node->SWHID (and return the SWHID). The desired behavior is
chosen depending on the syntax of each given identifier.
Identifiers can be passed either directly on the command line or on
standard input, separate by blanks. Logical lines (as returned by
readline()) in stdin will be preserved in stdout.
"""
from swh.graph.backend import NODE2SWHID_EXT, SWHID2NODE_EXT
from swh.graph.swhid import NodeToSwhidMap, SwhidToNodeMap
import swh.model.exceptions
from swh.model.swhids import ExtendedSWHID
success = True # no identifiers failed to be looked up
swhid2node = SwhidToNodeMap(f"{graph}.{SWHID2NODE_EXT}")
node2swhid = NodeToSwhidMap(f"{graph}.{NODE2SWHID_EXT}")
def lookup(identifier):
nonlocal success, swhid2node, node2swhid
is_swhid = None
try:
int(identifier)
is_swhid = False
except ValueError:
try:
ExtendedSWHID.from_string(identifier)
is_swhid = True
except swh.model.exceptions.ValidationError:
success = False
logging.error(f'invalid identifier: "{identifier}", skipping')
try:
if is_swhid:
return str(swhid2node[identifier])
else:
return node2swhid[int(identifier)]
except KeyError:
success = False
logging.error(f'identifier not found: "{identifier}", skipping')
if identifiers: # lookup identifiers passed via CLI
for identifier in identifiers:
print(lookup(identifier))
else: # lookup identifiers passed via stdin, preserving logical lines
for line in sys.stdin:
results = [lookup(id) for id in line.rstrip().split()]
if results: # might be empty if all IDs on the same line failed
print(" ".join(results))
sys.exit(0 if success else 1)
@graph_cli_group.command(name="rpc-serve")
@click.option(
"--host",
"-h",
default="0.0.0.0",
metavar="IP",
show_default=True,
help="host IP address to bind the server on",
)
@click.option(
"--port",
"-p",
default=5009,
type=click.INT,
metavar="PORT",
show_default=True,
help="port to bind the server on",
)
@click.option(
"--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename"
)
@click.pass_context
def serve(ctx, host, port, graph):
"""run the graph RPC service"""
import aiohttp
from swh.graph.server.app import make_app
config = ctx.obj["config"]
config.setdefault("graph", {})
config["graph"]["path"] = graph
app = make_app(config=config)
aiohttp.web.run_app(app, host=host, port=port)
@graph_cli_group.command()
@click.option(
"--input-dataset",
"-i",
required=True,
type=PathlibPath(),
help="graph dataset directory, in ORC format",
)
@click.option(
"--output-directory",
"-o",
required=True,
type=PathlibPath(),
help="directory where to store compressed graph",
)
@click.option(
"--graph-name",
"-g",
default="graph",
type=PathlibPath(),
help="directory where to store compressed graph",
)
@click.option(
"--steps",
"-s",
metavar="STEPS",
type=StepOption(),
help="run only these compression steps (default: all steps)",
)
@click.pass_context
def compress(ctx, input_dataset, output_directory, graph_name, steps):
"""Compress a graph using WebGraph
Input: a pair of files g.nodes.csv.gz, g.edges.csv.gz
Output: a directory containing a WebGraph compressed graph
Compression steps are: (1) mph, (2) bv, (3) bfs, (4) permute_bfs,
(5) transpose_bfs, (6) simplify, (7) llp, (8) permute_llp, (9) obl, (10)
compose_orders, (11) stats, (12) transpose, (13) transpose_obl, (14) maps,
(15) clean_tmp. Compression steps can be selected by name or number using
--steps, separating them with commas; step ranges (e.g., 3-9, 6-, etc.) are
also supported.
"""
from swh.graph import webgraph
try:
conf = ctx.obj["config"]["graph"]["compress"]
except KeyError:
conf = {} # use defaults
webgraph.compress(graph_name, input_dataset, output_directory, steps, conf)
@graph_cli_group.command(name="cachemount")
@click.option(
"--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename"
)
@click.option(
"--cache",
"-c",
default="/dev/shm/swh-graph/default",
metavar="CACHE",
type=PathlibPath(),
help="Memory cache path (defaults to /dev/shm/swh-graph/default)",
)
@click.pass_context
def cachemount(ctx, graph, cache):
"""
Cache the mmapped files of the compressed graph in a tmpfs.
This command creates a new directory at the path given by CACHE that has
the same structure as the compressed graph basename, except it copies the
files that require mmap access (:file:`{*}.graph`) but uses symlinks from the source
for all the other files (:file:`{*}.map`, :file:`{*}.bin`, ...).
The command outputs the path to the memory cache directory (particularly
useful when relying on the default value).
"""
import shutil
cache.mkdir(parents=True)
for src in Path(graph).parent.glob("*"):
dst = cache / src.name
if src.suffix == ".graph":
shutil.copy2(src, dst)
else:
dst.symlink_to(src.resolve())
print(cache)
def main():
return graph_cli_group(auto_envvar_prefix="SWH_GRAPH")
if __name__ == "__main__":
main()
diff --git a/swh/graph/config.py b/swh/graph/config.py
index 76811af..d6b9c87 100644
--- a/swh/graph/config.py
+++ b/swh/graph/config.py
@@ -1,116 +1,117 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from pathlib import Path
import sys
import psutil
def find_graph_jar():
"""find swh-graph.jar, containing the Java part of swh-graph
look both in development directories and installed data (for in-production
deployments who fecthed the JAR from pypi)
"""
swh_graph_root = Path(__file__).parents[2]
try_paths = [
swh_graph_root / "java/target/",
Path(sys.prefix) / "share/swh-graph/",
Path(sys.prefix) / "local/share/swh-graph/",
]
for path in try_paths:
glob = list(path.glob("swh-graph-*.jar"))
if glob:
if len(glob) > 1:
logging.warn(
"found multiple swh-graph JARs, " "arbitrarily picking one"
)
logging.info("using swh-graph JAR: {0}".format(glob[0]))
return str(glob[0])
raise RuntimeError("swh-graph JAR not found. Have you run `make java`?")
def check_config(conf):
"""check configuration and propagate defaults"""
conf = conf.copy()
if "batch_size" not in conf:
# Use 0.1% of the RAM as a batch size:
# ~1 billion for big servers, ~10 million for small desktop machines
- conf["batch_size"] = min(int(psutil.virtual_memory().total / 1000), 2 ** 30 - 1)
+ conf["batch_size"] = min(int(psutil.virtual_memory().total / 1000), 2**30 - 1)
if "llp_gammas" not in conf:
conf["llp_gammas"] = "-0,-1,-2,-3,-4"
if "max_ram" not in conf:
conf["max_ram"] = str(psutil.virtual_memory().total)
if "java_tool_options" not in conf:
conf["java_tool_options"] = " ".join(
[
"-Xmx{max_ram}",
"-XX:PretenureSizeThreshold=512M",
"-XX:MaxNewSize=4G",
"-XX:+UseLargePages",
"-XX:+UseTransparentHugePages",
"-XX:+UseNUMA",
"-XX:+UseTLAB",
"-XX:+ResizeTLAB",
]
)
conf["java_tool_options"] = conf["java_tool_options"].format(
max_ram=conf["max_ram"]
)
if "java" not in conf:
conf["java"] = "java"
if "classpath" not in conf:
conf["classpath"] = find_graph_jar()
return conf
def check_config_compress(config, graph_name, in_dir, out_dir):
"""check compression-specific configuration and initialize its execution
environment.
"""
conf = check_config(config)
conf["graph_name"] = graph_name
conf["in_dir"] = str(in_dir)
conf["out_dir"] = str(out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
if "tmp_dir" not in conf:
tmp_dir = out_dir / "tmp"
conf["tmp_dir"] = str(tmp_dir)
else:
tmp_dir = Path(conf["tmp_dir"])
tmp_dir.mkdir(parents=True, exist_ok=True)
if "logback" not in conf:
logback_confpath = tmp_dir / "logback.xml"
with open(logback_confpath, "w") as conffile:
conffile.write(
"""
You have reached the Software Heritage graph API server.
See its API documentation for more information.
""", ) class GraphView(aiohttp.web.View): """Base class for views working on the graph, with utility functions""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.backend = self.request.app["backend"] def get_direction(self): """Validate HTTP query parameter `direction`""" s = self.request.query.get("direction", "forward") if s not in ("forward", "backward"): raise aiohttp.web.HTTPBadRequest(text=f"invalid direction: {s}") return s def get_edges(self): """Validate HTTP query parameter `edges`, i.e., edge restrictions""" s = self.request.query.get("edges", "*") if any( [ node_type != "*" and node_type not in EXTENDED_SWHID_TYPES for edge in s.split(":") for node_type in edge.split(",", maxsplit=1) ] ): raise aiohttp.web.HTTPBadRequest(text=f"invalid edge restriction: {s}") return s def get_return_types(self): """Validate HTTP query parameter 'return types', i.e, a set of types which we will filter the query results with""" s = self.request.query.get("return_types", "*") if any( node_type != "*" and node_type not in EXTENDED_SWHID_TYPES for node_type in s.split(",") ): raise aiohttp.web.HTTPBadRequest( text=f"invalid type for filtering res: {s}" ) # if the user puts a star, # then we filter nothing, we don't need the other information if "*" in s: return "*" else: return s def get_traversal(self): """Validate HTTP query parameter `traversal`, i.e., visit order""" s = self.request.query.get("traversal", "dfs") if s not in ("bfs", "dfs"): raise aiohttp.web.HTTPBadRequest(text=f"invalid traversal order: {s}") return s def get_limit(self): """Validate HTTP query parameter `limit`, i.e., number of results""" s = self.request.query.get("limit", "0") try: return int(s) except ValueError: raise aiohttp.web.HTTPBadRequest(text=f"invalid limit value: {s}") def get_max_edges(self): """Validate HTTP query parameter 'max_edges', i.e., the limit of the number of edges that can be visited""" s = self.request.query.get("max_edges", "0") try: return int(s) except ValueError: raise aiohttp.web.HTTPBadRequest(text=f"invalid max_edges value: {s}") def check_swhid(self, swhid): """Validate that the given SWHID exists in the graph""" try: self.backend.check_swhid(swhid) except (NameError, ValueError) as e: raise aiohttp.web.HTTPBadRequest(text=str(e)) class StreamingGraphView(GraphView): """Base class for views streaming their response line by line.""" content_type = "text/plain" @asynccontextmanager async def response_streamer(self, *args, **kwargs): """Context manager to prepare then close a StreamResponse""" response = aiohttp.web.StreamResponse(*args, **kwargs) response.content_type = self.content_type await response.prepare(self.request) yield response await response.write_eof() async def get(self): await self.prepare_response() async with self.response_streamer() as self.response_stream: self._buf = [] try: await self.stream_response() finally: await self._flush_buffer() return self.response_stream async def prepare_response(self): """This can be overridden with some setup to be run before the response actually starts streaming. """ pass async def stream_response(self): """Override this to perform the response streaming. Implementations of this should await self.stream_line(line) to write each line. """ raise NotImplementedError async def stream_line(self, line): """Write a line in the response stream.""" self._buf.append(line) if len(self._buf) > 100: await self._flush_buffer() async def _flush_buffer(self): await self.response_stream.write("\n".join(self._buf).encode() + b"\n") self._buf = [] class StatsView(GraphView): """View showing some statistics on the graph""" async def get(self): stats = self.backend.stats() return aiohttp.web.Response(body=stats, content_type="application/json") class SimpleTraversalView(StreamingGraphView): """Base class for views of simple traversals""" simple_traversal_type: Optional[str] = None async def prepare_response(self): self.src = self.request.match_info["src"] self.edges = self.get_edges() self.direction = self.get_direction() self.max_edges = self.get_max_edges() self.return_types = self.get_return_types() self.check_swhid(self.src) async def stream_response(self): async for res_line in self.backend.traversal( self.simple_traversal_type, self.direction, self.edges, self.src, self.max_edges, self.return_types, ): await self.stream_line(res_line) class LeavesView(SimpleTraversalView): simple_traversal_type = "leaves" class NeighborsView(SimpleTraversalView): simple_traversal_type = "neighbors" class VisitNodesView(SimpleTraversalView): simple_traversal_type = "visit_nodes" class VisitEdgesView(SimpleTraversalView): simple_traversal_type = "visit_edges" class WalkView(StreamingGraphView): async def prepare_response(self): self.src = self.request.match_info["src"] self.dst = self.request.match_info["dst"] self.edges = self.get_edges() self.direction = self.get_direction() self.algo = self.get_traversal() self.limit = self.get_limit() self.max_edges = self.get_max_edges() self.return_types = self.get_return_types() self.check_swhid(self.src) if self.dst not in EXTENDED_SWHID_TYPES: self.check_swhid(self.dst) async def get_walk_iterator(self): return self.backend.traversal( "walk", self.direction, self.edges, self.algo, self.src, self.dst, self.max_edges, self.return_types, ) async def stream_response(self): it = self.get_walk_iterator() if self.limit < 0: queue = deque(maxlen=-self.limit) async for res_swhid in it: queue.append(res_swhid) while queue: await self.stream_line(queue.popleft()) else: count = 0 async for res_swhid in it: if self.limit == 0 or count < self.limit: await self.stream_line(res_swhid) count += 1 else: break class RandomWalkView(WalkView): def get_walk_iterator(self): return self.backend.traversal( "random_walk", self.direction, self.edges, RANDOM_RETRIES, self.src, self.dst, self.max_edges, self.return_types, ) class CountView(GraphView): """Base class for counting views.""" count_type: Optional[str] = None async def get(self): self.src = self.request.match_info["src"] self.check_swhid(self.src) self.edges = self.get_edges() self.direction = self.get_direction() self.max_edges = self.get_max_edges() loop = asyncio.get_event_loop() cnt = await loop.run_in_executor( None, self.backend.count, self.count_type, self.direction, self.edges, self.src, self.max_edges, ) return aiohttp.web.Response(body=str(cnt), content_type="application/json") class CountNeighborsView(CountView): count_type = "neighbors" class CountLeavesView(CountView): count_type = "leaves" class CountVisitNodesView(CountView): count_type = "visit_nodes" def make_app(config=None, backend=None, **kwargs): if (config is None) == (backend is None): raise ValueError("make_app() expects exactly one of 'config' or 'backend'") if backend is None: backend = Backend(graph_path=config["graph"]["path"], config=config["graph"]) app = GraphServerApp(**kwargs) app.add_routes( [ aiohttp.web.get("/", index), aiohttp.web.get("/graph", index), aiohttp.web.view("/graph/stats", StatsView), aiohttp.web.view("/graph/leaves/{src}", LeavesView), aiohttp.web.view("/graph/neighbors/{src}", NeighborsView), aiohttp.web.view("/graph/visit/nodes/{src}", VisitNodesView), aiohttp.web.view("/graph/visit/edges/{src}", VisitEdgesView), # temporarily disabled in wait of a proper fix for T1969 # aiohttp.web.view("/graph/walk/{src}/{dst}", WalkView) aiohttp.web.view("/graph/randomwalk/{src}/{dst}", RandomWalkView), aiohttp.web.view("/graph/neighbors/count/{src}", CountNeighborsView), aiohttp.web.view("/graph/leaves/count/{src}", CountLeavesView), aiohttp.web.view("/graph/visit/nodes/count/{src}", CountVisitNodesView), ] ) app["backend"] = backend return app def make_app_from_configfile(): - """Load configuration and then build application to run - - """ + """Load configuration and then build application to run""" config_file = os.environ.get("SWH_CONFIG_FILENAME") config = config_read(config_file) return make_app(config=config) diff --git a/swh/graph/tests/dataset/generate_dataset.py b/swh/graph/tests/dataset/generate_dataset.py index f7c5bfb..f91d6c7 100755 --- a/swh/graph/tests/dataset/generate_dataset.py +++ b/swh/graph/tests/dataset/generate_dataset.py @@ -1,285 +1,353 @@ #!/usr/bin/env python3 # type: ignore # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import argparse import datetime import logging from pathlib import Path from swh.dataset.exporters.edges import GraphEdgesExporter from swh.dataset.exporters.orc import ORCExporter from swh.graph.webgraph import compress from swh.model.model import ( Content, Directory, DirectoryEntry, ObjectType, Origin, OriginVisit, OriginVisitStatus, Person, Release, Revision, RevisionType, SkippedContent, Snapshot, SnapshotBranch, TargetType, Timestamp, TimestampWithTimezone, ) def h(id: int, width=40) -> bytes: return bytes.fromhex(f"{id:0{width}}") PERSONS = [ Person(fullname=b"foo", name=b"foo", email=b""), Person(fullname=b"bar", name=b"bar", email=b""), Person(fullname=b"baz", name=b"baz", email=b""), ] TEST_DATASET = [ Content(sha1_git=h(1), sha1=h(1), sha256=h(1, 64), blake2s256=h(1, 64), length=42), Content(sha1_git=h(4), sha1=h(4), sha256=h(4, 64), blake2s256=h(4, 64), length=404), Content( sha1_git=h(5), sha1=h(5), sha256=h(5, 64), blake2s256=h(5, 64), length=1337 ), Content(sha1_git=h(7), sha1=h(7), sha256=h(7, 64), blake2s256=h(7, 64), length=666), Content( sha1_git=h(11), sha1=h(11), sha256=h(11, 64), blake2s256=h(11, 64), length=313 ), Content( sha1_git=h(14), sha1=h(14), sha256=h(14, 64), blake2s256=h(14, 64), length=14 ), SkippedContent( sha1_git=h(15), sha1=h(15), sha256=h(15, 64), blake2s256=h(15, 64), length=404, status="absent", reason="Not found", ), Directory( id=h(2), entries=( DirectoryEntry( - name=b"README.md", perms=0o100644, type="file", target=h(1), + name=b"README.md", + perms=0o100644, + type="file", + target=h(1), ), ), ), Directory( id=h(6), entries=( DirectoryEntry( - name=b"README.md", perms=0o100644, type="file", target=h(4), + name=b"README.md", + perms=0o100644, + type="file", + target=h(4), + ), + DirectoryEntry( + name=b"parser.c", + perms=0o100644, + type="file", + target=h(5), ), - DirectoryEntry(name=b"parser.c", perms=0o100644, type="file", target=h(5),), ), ), Directory( id=h(8), entries=( DirectoryEntry( - name=b"README.md", perms=0o100644, type="file", target=h(1), + name=b"README.md", + perms=0o100644, + type="file", + target=h(1), + ), + DirectoryEntry( + name=b"parser.c", + perms=0o100644, + type="file", + target=h(7), + ), + DirectoryEntry( + name=b"tests", + perms=0o100755, + type="dir", + target=h(6), ), - DirectoryEntry(name=b"parser.c", perms=0o100644, type="file", target=h(7),), - DirectoryEntry(name=b"tests", perms=0o100755, type="dir", target=h(6),), ), ), Directory( id=h(12), entries=( DirectoryEntry( - name=b"README.md", perms=0o100644, type="file", target=h(11), + name=b"README.md", + perms=0o100644, + type="file", + target=h(11), ), DirectoryEntry( - name=b"oldproject", perms=0o100755, type="dir", target=h(8), + name=b"oldproject", + perms=0o100755, + type="dir", + target=h(8), ), ), ), Directory( id=h(16), entries=( DirectoryEntry( - name=b"TODO.txt", perms=0o100644, type="file", target=h(15), + name=b"TODO.txt", + perms=0o100644, + type="file", + target=h(15), ), ), ), Directory( id=h(17), entries=( DirectoryEntry( - name=b"TODO.txt", perms=0o100644, type="file", target=h(14), + name=b"TODO.txt", + perms=0o100644, + type="file", + target=h(14), + ), + DirectoryEntry( + name=b"old", + perms=0o100755, + type="dir", + target=h(16), ), - DirectoryEntry(name=b"old", perms=0o100755, type="dir", target=h(16),), ), ), Revision( id=h(3), message=b"Initial commit", date=TimestampWithTimezone( - timestamp=Timestamp(seconds=1111122220, microseconds=0,), + timestamp=Timestamp( + seconds=1111122220, + microseconds=0, + ), offset_bytes=b"+0200", ), committer=PERSONS[0], author=PERSONS[0], committer_date=TimestampWithTimezone( - timestamp=Timestamp(seconds=1111122220, microseconds=0,), + timestamp=Timestamp( + seconds=1111122220, + microseconds=0, + ), offset_bytes=b"+0200", ), type=RevisionType.GIT, directory=h(2), synthetic=False, metadata=None, parents=(), ), Revision( id=h(9), message=b"Add parser", date=TimestampWithTimezone( - timestamp=Timestamp(seconds=1111144440, microseconds=0,), + timestamp=Timestamp( + seconds=1111144440, + microseconds=0, + ), offset_bytes=b"+0200", ), committer=PERSONS[1], author=PERSONS[1], committer_date=TimestampWithTimezone( - timestamp=Timestamp(seconds=1111155550, microseconds=0,), + timestamp=Timestamp( + seconds=1111155550, + microseconds=0, + ), offset_bytes=b"+0200", ), type=RevisionType.GIT, directory=h(8), synthetic=False, metadata=None, parents=(h(3),), ), Revision( id=h(13), message=b"Add tests", date=TimestampWithTimezone( - timestamp=Timestamp(seconds=1111166660, microseconds=0,), + timestamp=Timestamp( + seconds=1111166660, + microseconds=0, + ), offset_bytes=b"+0200", ), committer=PERSONS[1], author=PERSONS[0], committer_date=TimestampWithTimezone( - timestamp=Timestamp(seconds=1111166660, microseconds=0,), + timestamp=Timestamp( + seconds=1111166660, + microseconds=0, + ), offset_bytes=b"+0200", ), type=RevisionType.GIT, directory=h(12), synthetic=False, metadata=None, parents=(h(9),), ), Revision( id=h(18), message=b"Refactor codebase", date=TimestampWithTimezone( - timestamp=Timestamp(seconds=1111177770, microseconds=0,), + timestamp=Timestamp( + seconds=1111177770, + microseconds=0, + ), offset_bytes=b"+0000", ), committer=PERSONS[0], author=PERSONS[2], committer_date=TimestampWithTimezone( - timestamp=Timestamp(seconds=1111177770, microseconds=0,), + timestamp=Timestamp( + seconds=1111177770, + microseconds=0, + ), offset_bytes=b"+0000", ), type=RevisionType.GIT, directory=h(17), synthetic=False, metadata=None, parents=(h(13),), ), Release( id=h(10), name=b"v1.0", date=TimestampWithTimezone( - timestamp=Timestamp(seconds=1234567890, microseconds=0,), + timestamp=Timestamp( + seconds=1234567890, + microseconds=0, + ), offset_bytes=b"+0200", ), author=PERSONS[0], target_type=ObjectType.REVISION, target=h(9), message=b"Version 1.0", synthetic=False, ), Release( id=h(19), name=b"v2.0", date=None, author=PERSONS[1], target_type=ObjectType.REVISION, target=h(18), message=b"Version 2.0", synthetic=False, ), Snapshot( id=h(20), branches={ b"refs/heads/master": SnapshotBranch( target=h(9), target_type=TargetType.REVISION ), b"refs/tags/v1.0": SnapshotBranch( target=h(10), target_type=TargetType.RELEASE ), }, ), OriginVisit( origin="https://example.com/swh/graph", date=datetime.datetime( 2013, 5, 7, 4, 20, 39, 369271, tzinfo=datetime.timezone.utc ), visit=1, type="git", ), OriginVisitStatus( origin="https://example.com/swh/graph", date=datetime.datetime( 2013, 5, 7, 4, 20, 41, 369271, tzinfo=datetime.timezone.utc ), visit=1, type="git", status="full", snapshot=h(20), metadata=None, ), Origin(url="https://example.com/swh/graph"), ] def main(): logging.basicConfig(level=logging.INFO) parser = argparse.ArgumentParser(description="Generate a test dataset") parser.add_argument( "--compress", action="store_true", default=False, help="Also compress the dataset", ) parser.add_argument("output", help="output directory", nargs="?", default=".") args = parser.parse_args() exporters = {"edges": GraphEdgesExporter, "orc": ORCExporter} config = {"test_unique_file_id": "all"} output_path = Path(args.output) for name, exporter in exporters.items(): with exporter(config, output_path / name) as e: for obj in TEST_DATASET: e.process_object(obj.object_type, obj.to_dict()) if args.compress: compress("example", output_path / "orc", output_path / "compressed") if __name__ == "__main__": main() diff --git a/swh/graph/tests/test_api_client.py b/swh/graph/tests/test_api_client.py index 4dc4036..454c0c5 100644 --- a/swh/graph/tests/test_api_client.py +++ b/swh/graph/tests/test_api_client.py @@ -1,381 +1,384 @@ import hashlib import pytest from pytest import raises from swh.core.api import RemoteException from swh.graph.client import GraphArgumentException TEST_ORIGIN_ID = "swh:1:ori:{}".format( hashlib.sha1(b"https://example.com/swh/graph").hexdigest() ) def test_stats(graph_client): stats = graph_client.stats() assert set(stats.keys()) == {"counts", "ratios", "indegree", "outdegree"} assert set(stats["counts"].keys()) == {"nodes", "edges"} assert set(stats["ratios"].keys()) == { "compression", "bits_per_node", "bits_per_edge", "avg_locality", } assert set(stats["indegree"].keys()) == {"min", "max", "avg"} assert set(stats["outdegree"].keys()) == {"min", "max", "avg"} assert stats["counts"]["nodes"] == 21 assert stats["counts"]["edges"] == 23 assert isinstance(stats["ratios"]["compression"], float) assert isinstance(stats["ratios"]["bits_per_node"], float) assert isinstance(stats["ratios"]["bits_per_edge"], float) assert isinstance(stats["ratios"]["avg_locality"], float) assert stats["indegree"]["min"] == 0 assert stats["indegree"]["max"] == 3 assert isinstance(stats["indegree"]["avg"], float) assert stats["outdegree"]["min"] == 0 assert stats["outdegree"]["max"] == 3 assert isinstance(stats["outdegree"]["avg"], float) def test_leaves(graph_client): actual = list(graph_client.leaves(TEST_ORIGIN_ID)) expected = [ "swh:1:cnt:0000000000000000000000000000000000000001", "swh:1:cnt:0000000000000000000000000000000000000004", "swh:1:cnt:0000000000000000000000000000000000000005", "swh:1:cnt:0000000000000000000000000000000000000007", ] assert set(actual) == set(expected) def test_neighbors(graph_client): actual = list( graph_client.neighbors( "swh:1:rev:0000000000000000000000000000000000000009", direction="backward" ) ) expected = [ "swh:1:snp:0000000000000000000000000000000000000020", "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000013", ] assert set(actual) == set(expected) def test_visit_nodes(graph_client): actual = list( graph_client.visit_nodes( "swh:1:rel:0000000000000000000000000000000000000010", edges="rel:rev,rev:rev", ) ) expected = [ "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ] assert set(actual) == set(expected) def test_visit_nodes_filtered(graph_client): actual = list( graph_client.visit_nodes( - "swh:1:rel:0000000000000000000000000000000000000010", return_types="dir", + "swh:1:rel:0000000000000000000000000000000000000010", + return_types="dir", ) ) expected = [ "swh:1:dir:0000000000000000000000000000000000000002", "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:dir:0000000000000000000000000000000000000006", ] assert set(actual) == set(expected) def test_visit_nodes_filtered_star(graph_client): actual = list( graph_client.visit_nodes( - "swh:1:rel:0000000000000000000000000000000000000010", return_types="*", + "swh:1:rel:0000000000000000000000000000000000000010", + return_types="*", ) ) expected = [ "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", "swh:1:cnt:0000000000000000000000000000000000000001", "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:cnt:0000000000000000000000000000000000000007", "swh:1:dir:0000000000000000000000000000000000000006", "swh:1:cnt:0000000000000000000000000000000000000004", "swh:1:cnt:0000000000000000000000000000000000000005", ] assert set(actual) == set(expected) def test_visit_edges(graph_client): actual = list( graph_client.visit_edges( "swh:1:rel:0000000000000000000000000000000000000010", edges="rel:rev,rev:rev,rev:dir", ) ) expected = [ ( "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000008", ), ( "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", ), ] assert set(actual) == set(expected) def test_visit_edges_limited(graph_client): actual = list( graph_client.visit_edges( "swh:1:rel:0000000000000000000000000000000000000010", max_edges=4, edges="rel:rev,rev:rev,rev:dir", ) ) expected = [ ( "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000008", ), ( "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", ), ] # As there are four valid answers (up to reordering), we cannot check for # equality. Instead, we check the client returned all edges but one. assert set(actual).issubset(set(expected)) assert len(actual) == 3 def test_visit_edges_diamond_pattern(graph_client): actual = list( graph_client.visit_edges( - "swh:1:rev:0000000000000000000000000000000000000009", edges="*", + "swh:1:rev:0000000000000000000000000000000000000009", + edges="*", ) ) expected = [ ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000008", ), ( "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", ), ( "swh:1:dir:0000000000000000000000000000000000000002", "swh:1:cnt:0000000000000000000000000000000000000001", ), ( "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:cnt:0000000000000000000000000000000000000001", ), ( "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:cnt:0000000000000000000000000000000000000007", ), ( "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:dir:0000000000000000000000000000000000000006", ), ( "swh:1:dir:0000000000000000000000000000000000000006", "swh:1:cnt:0000000000000000000000000000000000000004", ), ( "swh:1:dir:0000000000000000000000000000000000000006", "swh:1:cnt:0000000000000000000000000000000000000005", ), ] assert set(actual) == set(expected) @pytest.mark.skip(reason="currently disabled due to T1969") def test_walk(graph_client): args = ("swh:1:dir:0000000000000000000000000000000000000016", "rel") kwargs = { "edges": "dir:dir,dir:rev,rev:*", "direction": "backward", "traversal": "bfs", } actual = list(graph_client.walk(*args, **kwargs)) expected = [ "swh:1:dir:0000000000000000000000000000000000000016", "swh:1:dir:0000000000000000000000000000000000000017", "swh:1:rev:0000000000000000000000000000000000000018", "swh:1:rel:0000000000000000000000000000000000000019", ] assert set(actual) == set(expected) kwargs2 = kwargs.copy() kwargs2["limit"] = -1 actual = list(graph_client.walk(*args, **kwargs2)) expected = ["swh:1:rel:0000000000000000000000000000000000000019"] assert set(actual) == set(expected) kwargs2 = kwargs.copy() kwargs2["limit"] = 2 actual = list(graph_client.walk(*args, **kwargs2)) expected = [ "swh:1:dir:0000000000000000000000000000000000000016", "swh:1:dir:0000000000000000000000000000000000000017", ] assert set(actual) == set(expected) def test_random_walk_dst_is_type(graph_client): """as the walk is random, we test a visit from a cnt node to a release reachable from every single path in the backward graph, and only check the final node of the path (i.e., the release) """ args = ("swh:1:cnt:0000000000000000000000000000000000000015", "rel") kwargs = {"direction": "backward"} expected_root = "swh:1:rel:0000000000000000000000000000000000000019" actual = list(graph_client.random_walk(*args, **kwargs)) assert len(actual) > 1 # no release directly links to a content assert actual[0] == args[0] assert actual[-1] == expected_root kwargs2 = kwargs.copy() kwargs2["limit"] = -1 actual = list(graph_client.random_walk(*args, **kwargs2)) assert actual == [expected_root] kwargs2["limit"] = -2 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 2 assert actual[-1] == expected_root kwargs2["limit"] = 3 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 3 def test_random_walk_dst_is_node(graph_client): """Same as test_random_walk_dst_is_type, but we target the specific release node instead of a type """ args = ( "swh:1:cnt:0000000000000000000000000000000000000015", "swh:1:rel:0000000000000000000000000000000000000019", ) kwargs = {"direction": "backward"} expected_root = "swh:1:rel:0000000000000000000000000000000000000019" actual = list(graph_client.random_walk(*args, **kwargs)) assert len(actual) > 1 # no origin directly links to a content assert actual[0] == args[0] assert actual[-1] == expected_root kwargs2 = kwargs.copy() kwargs2["limit"] = -1 actual = list(graph_client.random_walk(*args, **kwargs2)) assert actual == [expected_root] kwargs2["limit"] = -2 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 2 assert actual[-1] == expected_root kwargs2["limit"] = 3 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 3 def test_count(graph_client): actual = graph_client.count_leaves(TEST_ORIGIN_ID) assert actual == 4 actual = graph_client.count_visit_nodes( "swh:1:rel:0000000000000000000000000000000000000010", edges="rel:rev,rev:rev" ) assert actual == 3 actual = graph_client.count_neighbors( "swh:1:rev:0000000000000000000000000000000000000009", direction="backward" ) assert actual == 3 def test_param_validation(graph_client): with raises(GraphArgumentException) as exc_info: # SWHID not found list(graph_client.leaves("swh:1:rel:00ffffffff000000000000000000000000000010")) if exc_info.value.response: assert exc_info.value.response.status_code == 404 with raises(GraphArgumentException) as exc_info: # malformed SWHID list( graph_client.neighbors("swh:1:rel:00ffffffff00000000zzzzzzz000000000000010") ) if exc_info.value.response: assert exc_info.value.response.status_code == 400 with raises(GraphArgumentException) as exc_info: # malformed edge specificaiton list( graph_client.visit_nodes( "swh:1:dir:0000000000000000000000000000000000000016", edges="dir:notanodetype,dir:rev,rev:*", direction="backward", ) ) if exc_info.value.response: assert exc_info.value.response.status_code == 400 with raises(GraphArgumentException) as exc_info: # malformed direction list( graph_client.visit_nodes( "swh:1:dir:0000000000000000000000000000000000000016", edges="dir:dir,dir:rev,rev:*", direction="notadirection", ) ) if exc_info.value.response: assert exc_info.value.response.status_code == 400 @pytest.mark.skip(reason="currently disabled due to T1969") def test_param_validation_walk(graph_client): """test validation of walk-specific parameters only""" with raises(RemoteException) as exc_info: # malformed traversal order list( graph_client.walk( "swh:1:dir:0000000000000000000000000000000000000016", "rel", edges="dir:dir,dir:rev,rev:*", direction="backward", traversal="notatraversalorder", ) ) assert exc_info.value.response.status_code == 400 diff --git a/tools/dir2graph b/tools/dir2graph index 054ac61..b843af0 100755 --- a/tools/dir2graph +++ b/tools/dir2graph @@ -1,103 +1,106 @@ #!/usr/bin/env python3 # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import sys from typing import Iterator, Tuple, Union import click from swh.model.from_disk import Content, Directory from swh.model.identifiers import CoreSWHID, ObjectType def swhid_of_node(obj: Union[Content, Directory]): return CoreSWHID( - object_type=ObjectType[obj.object_type.upper()], object_id=obj.hash, + object_type=ObjectType[obj.object_type.upper()], + object_id=obj.hash, ) -def walk_model(root: Directory,) -> Iterator[Tuple[CoreSWHID, Iterator[CoreSWHID]]]: +def walk_model( + root: Directory, +) -> Iterator[Tuple[CoreSWHID, Iterator[CoreSWHID]]]: """recursively visit a model.from_disk object Yield pairs (SWHID, neighbors) where SWHID is the identifier of a node and neighbors an iterator over SWHID of nodes directly reachable from it. So you can obtain all graph nodes by only looking at the first element of the pair, and edges by joining the first element with each of the neighbors. Note that no deduplication is applied, so both nodes and edges can be yielded multiple times if they do in fact appear multiple times in the graph. """ def walk_neighbors(node): for child in node.values(): yield swhid_of_node(child) to_visit = [root] while to_visit: node = to_visit.pop() swhid = swhid_of_node(node) yield (swhid, walk_neighbors(node)) for child in node.values(): to_visit.insert(0, child) @click.command() @click.argument( "directory", required=True, type=click.Path(exists=True, file_okay=False, dir_okay=True), ) @click.option( "-n", "--nodes-output", type=click.Path(file_okay=True, dir_okay=False, writable=True), help="output file where to store nodes as SWHIDs" " (if not given, node SWHIDs will not be output)." ' Use "-" for stdout.' " Default: output node SWHIDs to stdout.", ) @click.option( "-e", "--edges-output", type=click.Path(file_okay=True, dir_okay=False, writable=True), help="output file where to store edges as SWHID pairs" " (if not given, edge SWHIDs will not be output)" ' Use "-" for stdout.' " Default: do not output edge SWHIDs.", ) def main(directory, nodes_output, edges_output): """Recursively identifies the content of a directory. Outputs SWHID identifiers as both nodes (one SWHID per object) and edges (pairs of SWHIDs (parent, child) corresponding to the filesystem hierarchy). """ nodes_file = sys.stdout edges_file = None if nodes_output: if nodes_output == "-": nodes_file = sys.stdout else: nodes_file = open(nodes_output, "w") if edges_output: if edges_output == "-": edges_file = sys.stdout else: edges_file = open(edges_output, "w") root = Directory.from_disk(path=directory.encode()) for swhid, neighbors in walk_model(root): if nodes_file: nodes_file.write(f"{swhid}\n") if edges_file: for child_swhid in neighbors: edges_file.write(f"{swhid} {child_swhid}\n") if __name__ == "__main__": main()