diff --git a/swh/graph/backend.py b/swh/graph/backend.py index d838023..ecf3cb9 100644 --- a/swh/graph/backend.py +++ b/swh/graph/backend.py @@ -1,194 +1,194 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import contextlib import io import os import struct import subprocess import sys import tempfile from py4j.java_gateway import JavaGateway from swh.graph.config import check_config -from swh.graph.pid import NodeToPidMap, PidToNodeMap -from swh.model.identifiers import PID_TYPES +from swh.graph.swhid import NodeToSwhidMap, SwhidToNodeMap +from swh.model.identifiers import SWHID_TYPES BUF_SIZE = 64 * 1024 BIN_FMT = ">q" # 64 bit integer, big endian PATH_SEPARATOR_ID = -1 -NODE2PID_EXT = "node2pid.bin" -PID2NODE_EXT = "pid2node.bin" +NODE2SWHID_EXT = "node2swhid.bin" +SWHID2NODE_EXT = "swhid2node.bin" def _get_pipe_stderr(): # Get stderr if possible, or pipe to stdout if running with Jupyter. try: sys.stderr.fileno() except io.UnsupportedOperation: return subprocess.STDOUT else: return sys.stderr class Backend: def __init__(self, graph_path, config=None): self.gateway = None self.entry = None self.graph_path = graph_path self.config = check_config(config or {}) def __enter__(self): self.gateway = JavaGateway.launch_gateway( java_path=None, javaopts=self.config["java_tool_options"].split(), classpath=self.config["classpath"], die_on_exit=True, redirect_stdout=sys.stdout, redirect_stderr=_get_pipe_stderr(), ) self.entry = self.gateway.jvm.org.softwareheritage.graph.Entry() self.entry.load_graph(self.graph_path) - self.node2pid = NodeToPidMap(self.graph_path + "." + NODE2PID_EXT) - self.pid2node = PidToNodeMap(self.graph_path + "." + PID2NODE_EXT) + self.node2swhid = NodeToSwhidMap(self.graph_path + "." + NODE2SWHID_EXT) + self.swhid2node = SwhidToNodeMap(self.graph_path + "." + SWHID2NODE_EXT) self.stream_proxy = JavaStreamProxy(self.entry) return self def __exit__(self, exc_type, exc_value, tb): self.gateway.shutdown() def stats(self): return self.entry.stats() def count(self, ttype, direction, edges_fmt, src): method = getattr(self.entry, "count_" + ttype) return method(direction, edges_fmt, src) async def simple_traversal(self, ttype, direction, edges_fmt, src): assert ttype in ("leaves", "neighbors", "visit_nodes") method = getattr(self.stream_proxy, ttype) async for node_id in method(direction, edges_fmt, src): yield node_id async def walk(self, direction, edges_fmt, algo, src, dst): - if dst in PID_TYPES: + if dst in SWHID_TYPES: it = self.stream_proxy.walk_type(direction, edges_fmt, algo, src, dst) else: it = self.stream_proxy.walk(direction, edges_fmt, algo, src, dst) async for node_id in it: yield node_id async def random_walk(self, direction, edges_fmt, retries, src, dst): - if dst in PID_TYPES: + if dst in SWHID_TYPES: it = self.stream_proxy.random_walk_type( direction, edges_fmt, retries, src, dst ) else: it = self.stream_proxy.random_walk(direction, edges_fmt, retries, src, dst) async for node_id in it: # TODO return 404 if path is empty yield node_id async def visit_edges(self, direction, edges_fmt, src): it = self.stream_proxy.visit_edges(direction, edges_fmt, src) # convert stream a, b, c, d -> (a, b), (c, d) prevNode = None async for node in it: if prevNode is not None: yield (prevNode, node) prevNode = None else: prevNode = node async def visit_paths(self, direction, edges_fmt, src): path = [] async for node in self.stream_proxy.visit_paths(direction, edges_fmt, src): if node == PATH_SEPARATOR_ID: yield path path = [] else: path.append(node) class JavaStreamProxy: """A proxy class for the org.softwareheritage.graph.Entry Java class that takes care of the setup and teardown of the named-pipe FIFO communication between Python and Java. Initialize JavaStreamProxy using: proxy = JavaStreamProxy(swh_entry_class_instance) Then you can call an Entry method and iterate on the FIFO results like this: async for value in proxy.java_method(arg1, arg2): print(value) """ def __init__(self, entry): self.entry = entry async def read_node_ids(self, fname): loop = asyncio.get_event_loop() open_thread = loop.run_in_executor(None, open, fname, "rb") # Since the open() call on the FIFO is blocking until it is also opened # on the Java side, we await it with a timeout in case there is an # exception that prevents the write-side open(). with (await asyncio.wait_for(open_thread, timeout=2)) as f: while True: data = await loop.run_in_executor(None, f.read, BUF_SIZE) if not data: break for data in struct.iter_unpack(BIN_FMT, data): yield data[0] class _HandlerWrapper: def __init__(self, handler): self._handler = handler def __getattr__(self, name): func = getattr(self._handler, name) async def java_call(*args, **kwargs): loop = asyncio.get_event_loop() await loop.run_in_executor(None, lambda: func(*args, **kwargs)) def java_task(*args, **kwargs): return asyncio.create_task(java_call(*args, **kwargs)) return java_task @contextlib.contextmanager def get_handler(self): with tempfile.TemporaryDirectory(prefix="swh-graph-") as tmpdirname: cli_fifo = os.path.join(tmpdirname, "swh-graph.fifo") os.mkfifo(cli_fifo) reader = self.read_node_ids(cli_fifo) query_handler = self.entry.get_handler(cli_fifo) handler = self._HandlerWrapper(query_handler) yield (handler, reader) def __getattr__(self, name): async def java_call_iterator(*args, **kwargs): with self.get_handler() as (handler, reader): java_task = getattr(handler, name)(*args, **kwargs) try: async for value in reader: yield value except asyncio.TimeoutError: # If the read-side open() timeouts, an exception on the # Java side probably happened that prevented the # write-side open(). We propagate this exception here if # that is the case. task_exc = java_task.exception() if task_exc: raise task_exc raise await java_task return java_call_iterator diff --git a/swh/graph/cli.py b/swh/graph/cli.py index 5265ac2..1b8d34c 100644 --- a/swh/graph/cli.py +++ b/swh/graph/cli.py @@ -1,445 +1,445 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from pathlib import Path import sys from typing import TYPE_CHECKING, Any, Dict, Set, Tuple # WARNING: do not import unnecessary things here to keep cli startup time under # control import click from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup if TYPE_CHECKING: from swh.graph.webgraph import CompressionStep # noqa class StepOption(click.ParamType): """click type for specifying a compression step on the CLI parse either individual steps, specified as step names or integers, or step ranges """ name = "compression step" def convert(self, value, param, ctx): # type: (...) -> Set[CompressionStep] from swh.graph.webgraph import COMP_SEQ, CompressionStep # noqa steps: Set[CompressionStep] = set() specs = value.split(",") for spec in specs: if "-" in spec: # step range (raw_l, raw_r) = spec.split("-", maxsplit=1) if raw_l == "": # no left endpoint raw_l = COMP_SEQ[0].name if raw_r == "": # no right endpoint raw_r = COMP_SEQ[-1].name l_step = self.convert(raw_l, param, ctx) r_step = self.convert(raw_r, param, ctx) if len(l_step) != 1 or len(r_step) != 1: self.fail(f"invalid step specification: {value}, " f"see --help") l_idx = l_step.pop() r_idx = r_step.pop() steps = steps.union( set(map(CompressionStep, range(l_idx.value, r_idx.value + 1))) ) else: # singleton step try: steps.add(CompressionStep(int(spec))) # integer step except ValueError: try: steps.add(CompressionStep[spec.upper()]) # step name except KeyError: self.fail( f"invalid step specification: {value}, " f"see --help" ) return steps class PathlibPath(click.Path): """A Click path argument that returns a pathlib Path, not a string""" def convert(self, value, param, ctx): return Path(super().convert(value, param, ctx)) DEFAULT_CONFIG: Dict[str, Tuple[str, Any]] = {"graph": ("dict", {})} @click.group(name="graph", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="YAML configuration file", ) @click.pass_context def cli(ctx, config_file): """Software Heritage graph tools.""" from swh.core import config ctx.ensure_object(dict) conf = config.read(config_file, DEFAULT_CONFIG) if "graph" not in conf: raise ValueError( 'no "graph" stanza found in configuration file %s' % config_file ) ctx.obj["config"] = conf @cli.command("api-client") @click.option("--host", default="localhost", help="Graph server host") @click.option("--port", default="5009", help="Graph server port") @click.pass_context def api_client(ctx, host, port): """client for the graph REST service""" from swh.graph import client url = "http://{}:{}".format(host, port) app = client.RemoteGraphClient(url) # TODO: run web app print(app.stats()) @cli.group("map") @click.pass_context def map(ctx): """Manage swh-graph on-disk maps""" pass -def dump_pid2node(filename): - from swh.graph.pid import PidToNodeMap +def dump_swhid2node(filename): + from swh.graph.swhid import SwhidToNodeMap - for (pid, int) in PidToNodeMap(filename): - print("{}\t{}".format(pid, int)) + for (swhid, int) in SwhidToNodeMap(filename): + print("{}\t{}".format(swhid, int)) -def dump_node2pid(filename): - from swh.graph.pid import NodeToPidMap +def dump_node2swhid(filename): + from swh.graph.swhid import NodeToSwhidMap - for (int, pid) in NodeToPidMap(filename): - print("{}\t{}".format(int, pid)) + for (int, swhid) in NodeToSwhidMap(filename): + print("{}\t{}".format(int, swhid)) -def restore_pid2node(filename): - """read a textual PID->int map from stdin and write its binary version to +def restore_swhid2node(filename): + """read a textual SWHID->int map from stdin and write its binary version to filename """ - from swh.graph.pid import PidToNodeMap + from swh.graph.swhid import SwhidToNodeMap with open(filename, "wb") as dst: for line in sys.stdin: - (str_pid, str_int) = line.split() - PidToNodeMap.write_record(dst, str_pid, int(str_int)) + (str_swhid, str_int) = line.split() + SwhidToNodeMap.write_record(dst, str_swhid, int(str_int)) -def restore_node2pid(filename, length): - """read a textual int->PID map from stdin and write its binary version to +def restore_node2swhid(filename, length): + """read a textual int->SWHID map from stdin and write its binary version to filename """ - from swh.graph.pid import NodeToPidMap + from swh.graph.swhid import NodeToSwhidMap - node2pid = NodeToPidMap(filename, mode="wb", length=length) + node2swhid = NodeToSwhidMap(filename, mode="wb", length=length) for line in sys.stdin: - (str_int, str_pid) = line.split() - node2pid[int(str_int)] = str_pid - node2pid.close() + (str_int, str_swhid) = line.split() + node2swhid[int(str_int)] = str_swhid + node2swhid.close() @map.command("dump") @click.option( "--type", "-t", "map_type", required=True, - type=click.Choice(["pid2node", "node2pid"]), + type=click.Choice(["swhid2node", "node2swhid"]), help="type of map to dump", ) @click.argument("filename", required=True, type=click.Path(exists=True)) @click.pass_context def dump_map(ctx, map_type, filename): - """Dump a binary PID<->node map to textual format.""" - if map_type == "pid2node": - dump_pid2node(filename) - elif map_type == "node2pid": - dump_node2pid(filename) + """Dump a binary SWHID<->node map to textual format.""" + if map_type == "swhid2node": + dump_swhid2node(filename) + elif map_type == "node2swhid": + dump_node2swhid(filename) else: raise ValueError("invalid map type: " + map_type) pass @map.command("restore") @click.option( "--type", "-t", "map_type", required=True, - type=click.Choice(["pid2node", "node2pid"]), + type=click.Choice(["swhid2node", "node2swhid"]), help="type of map to dump", ) @click.option( "--length", "-l", type=int, help="""map size in number of logical records - (required for node2pid maps)""", + (required for node2swhid maps)""", ) @click.argument("filename", required=True, type=click.Path()) @click.pass_context def restore_map(ctx, map_type, length, filename): - """Restore a binary PID<->node map from textual format.""" - if map_type == "pid2node": - restore_pid2node(filename) - elif map_type == "node2pid": + """Restore a binary SWHID<->node map from textual format.""" + if map_type == "swhid2node": + restore_swhid2node(filename) + elif map_type == "node2swhid": if length is None: raise click.UsageError( "map length is required when restoring {} maps".format(map_type), ctx ) - restore_node2pid(filename, length) + restore_node2swhid(filename, length) else: raise ValueError("invalid map type: " + map_type) @map.command("write") @click.option( "--type", "-t", "map_type", required=True, - type=click.Choice(["pid2node", "node2pid"]), + type=click.Choice(["swhid2node", "node2swhid"]), help="type of map to write", ) @click.argument("filename", required=True, type=click.Path()) @click.pass_context def write(ctx, map_type, filename): """Write a map to disk sequentially. - read from stdin a textual PID->node mapping (for pid2node, or a simple - sequence of PIDs for node2pid) and write it to disk in the requested binary + read from stdin a textual SWHID->node mapping (for swhid2node, or a simple + sequence of SWHIDs for node2swhid) and write it to disk in the requested binary map format note that no sorting is applied, so the input should already be sorted as - required by the chosen map type (by PID for pid2node, by int for node2pid) + required by the chosen map type (by SWHID for swhid2node, by int for node2swhid) """ - from swh.graph.pid import NodeToPidMap, PidToNodeMap + from swh.graph.swhid import NodeToSwhidMap, SwhidToNodeMap with open(filename, "wb") as f: - if map_type == "pid2node": + if map_type == "swhid2node": for line in sys.stdin: - (pid, int_str) = line.rstrip().split(maxsplit=1) - PidToNodeMap.write_record(f, pid, int(int_str)) - elif map_type == "node2pid": + (swhid, int_str) = line.rstrip().split(maxsplit=1) + SwhidToNodeMap.write_record(f, swhid, int(int_str)) + elif map_type == "node2swhid": for line in sys.stdin: - pid = line.rstrip() - NodeToPidMap.write_record(f, pid) + swhid = line.rstrip() + NodeToSwhidMap.write_record(f, swhid) else: raise ValueError("invalid map type: " + map_type) @map.command("lookup") @click.option( "--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename" ) @click.argument("identifiers", nargs=-1) def map_lookup(graph, identifiers): """Lookup identifiers using on-disk maps. - Depending on the identifier type lookup either a PID into a PID->node (and + Depending on the identifier type lookup either a SWHID into a SWHID->node (and return the node integer identifier) or, vice-versa, lookup a node integer - identifier into a node->PID (and return the PID). The desired behavior is + identifier into a node->SWHID (and return the SWHID). The desired behavior is chosen depending on the syntax of each given identifier. Identifiers can be passed either directly on the command line or on standard input, separate by blanks. Logical lines (as returned by readline()) in stdin will be preserved in stdout. """ - from swh.graph.backend import NODE2PID_EXT, PID2NODE_EXT - from swh.graph.pid import NodeToPidMap, PidToNodeMap + from swh.graph.backend import NODE2SWHID_EXT, SWHID2NODE_EXT + from swh.graph.swhid import NodeToSwhidMap, SwhidToNodeMap import swh.model.exceptions - from swh.model.identifiers import parse_persistent_identifier + from swh.model.identifiers import parse_swhid success = True # no identifiers failed to be looked up - pid2node = PidToNodeMap(f"{graph}.{PID2NODE_EXT}") - node2pid = NodeToPidMap(f"{graph}.{NODE2PID_EXT}") + swhid2node = SwhidToNodeMap(f"{graph}.{SWHID2NODE_EXT}") + node2swhid = NodeToSwhidMap(f"{graph}.{NODE2SWHID_EXT}") def lookup(identifier): - nonlocal success, pid2node, node2pid - is_pid = None + nonlocal success, swhid2node, node2swhid + is_swhid = None try: int(identifier) - is_pid = False + is_swhid = False except ValueError: try: - parse_persistent_identifier(identifier) - is_pid = True + parse_swhid(identifier) + is_swhid = True except swh.model.exceptions.ValidationError: success = False logging.error(f'invalid identifier: "{identifier}", skipping') try: - if is_pid: - return str(pid2node[identifier]) + if is_swhid: + return str(swhid2node[identifier]) else: - return node2pid[int(identifier)] + return node2swhid[int(identifier)] except KeyError: success = False logging.error(f'identifier not found: "{identifier}", skipping') if identifiers: # lookup identifiers passed via CLI for identifier in identifiers: print(lookup(identifier)) else: # lookup identifiers passed via stdin, preserving logical lines for line in sys.stdin: results = [lookup(id) for id in line.rstrip().split()] if results: # might be empty if all IDs on the same line failed print(" ".join(results)) sys.exit(0 if success else 1) @cli.command(name="rpc-serve") @click.option( "--host", "-h", default="0.0.0.0", metavar="IP", show_default=True, help="host IP address to bind the server on", ) @click.option( "--port", "-p", default=5009, type=click.INT, metavar="PORT", show_default=True, help="port to bind the server on", ) @click.option( "--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename" ) @click.pass_context def serve(ctx, host, port, graph): """run the graph REST service""" import aiohttp from swh.graph.backend import Backend from swh.graph.server.app import make_app backend = Backend(graph_path=graph, config=ctx.obj["config"]) app = make_app(backend=backend) with backend: aiohttp.web.run_app(app, host=host, port=port) @cli.command() @click.option( "--graph", "-g", required=True, metavar="GRAPH", type=PathlibPath(), help="input graph basename", ) @click.option( "--outdir", "-o", "out_dir", required=True, metavar="DIR", type=PathlibPath(), help="directory where to store compressed graph", ) @click.option( "--steps", "-s", metavar="STEPS", type=StepOption(), help="run only these compression steps (default: all steps)", ) @click.pass_context def compress(ctx, graph, out_dir, steps): """Compress a graph using WebGraph Input: a pair of files g.nodes.csv.gz, g.edges.csv.gz Output: a directory containing a WebGraph compressed graph Compression steps are: (1) mph, (2) bv, (3) bv_obl, (4) bfs, (5) permute, (6) permute_obl, (7) stats, (8) transpose, (9) transpose_obl, (10) maps, (11) clean_tmp. Compression steps can be selected by name or number using --steps, separating them with commas; step ranges (e.g., 3-9, 6-, etc.) are also supported. """ from swh.graph import webgraph graph_name = graph.name in_dir = graph.parent try: conf = ctx.obj["config"]["graph"]["compress"] except KeyError: conf = {} # use defaults webgraph.compress(graph_name, in_dir, out_dir, steps, conf) @cli.command(name="cachemount") @click.option( "--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename" ) @click.option( "--cache", "-c", default="/dev/shm/swh-graph/default", metavar="CACHE", type=PathlibPath(), help="Memory cache path (defaults to /dev/shm/swh-graph/default)", ) @click.pass_context def cachemount(ctx, graph, cache): """ Cache the mmapped files of the compressed graph in a tmpfs. This command creates a new directory at the path given by CACHE that has the same structure as the compressed graph basename, except it copies the files that require mmap access (*.graph) but uses symlinks from the source for all the other files (.map, .bin, ...). The command outputs the path to the memory cache directory (particularly useful when relying on the default value). """ import shutil cache.mkdir(parents=True) for src in Path(graph).parent.glob("*"): dst = cache / src.name if src.suffix == ".graph": shutil.copy2(src, dst) else: dst.symlink_to(src.resolve()) print(cache) def main(): return cli(auto_envvar_prefix="SWH_GRAPH") if __name__ == "__main__": main() diff --git a/swh/graph/client.py b/swh/graph/client.py index 7f32546..1541900 100644 --- a/swh/graph/client.py +++ b/swh/graph/client.py @@ -1,110 +1,110 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from swh.core.api import RPCClient class GraphAPIError(Exception): """Graph API Error""" def __str__(self): return "An unexpected error occurred in the Graph backend: {}".format(self.args) class RemoteGraphClient(RPCClient): """Client to the Software Heritage Graph.""" def __init__(self, url, timeout=None): super().__init__(api_exception=GraphAPIError, url=url, timeout=timeout) def raw_verb_lines(self, verb, endpoint, **kwargs): response = self.raw_verb(verb, endpoint, stream=True, **kwargs) self.raise_for_status(response) for line in response.iter_lines(): yield line.decode().lstrip("\n") def get_lines(self, endpoint, **kwargs): yield from self.raw_verb_lines("get", endpoint, **kwargs) # Web API endpoints def stats(self): return self.get("stats") def leaves(self, src, edges="*", direction="forward"): return self.get_lines( "leaves/{}".format(src), params={"edges": edges, "direction": direction} ) def neighbors(self, src, edges="*", direction="forward"): return self.get_lines( "neighbors/{}".format(src), params={"edges": edges, "direction": direction} ) def visit_nodes(self, src, edges="*", direction="forward"): return self.get_lines( "visit/nodes/{}".format(src), params={"edges": edges, "direction": direction}, ) def visit_edges(self, src, edges="*", direction="forward"): for edge in self.get_lines( "visit/edges/{}".format(src), params={"edges": edges, "direction": direction}, ): yield tuple(edge.split()) def visit_paths(self, src, edges="*", direction="forward"): def decode_path_wrapper(it): for e in it: yield json.loads(e) return decode_path_wrapper( self.get_lines( "visit/paths/{}".format(src), params={"edges": edges, "direction": direction}, ) ) def walk( self, src, dst, edges="*", traversal="dfs", direction="forward", limit=None ): endpoint = "walk/{}/{}" return self.get_lines( endpoint.format(src, dst), params={ "edges": edges, "traversal": traversal, "direction": direction, "limit": limit, }, ) def random_walk(self, src, dst, edges="*", direction="forward", limit=None): endpoint = "randomwalk/{}/{}" return self.get_lines( endpoint.format(src, dst), params={"edges": edges, "direction": direction, "limit": limit}, ) def count_leaves(self, src, edges="*", direction="forward"): return self.get( "leaves/count/{}".format(src), params={"edges": edges, "direction": direction}, ) def count_neighbors(self, src, edges="*", direction="forward"): return self.get( "neighbors/count/{}".format(src), params={"edges": edges, "direction": direction}, ) def count_visit_nodes(self, src, edges="*", direction="forward"): return self.get( "visit/nodes/count/{}".format(src), params={"edges": edges, "direction": direction}, ) diff --git a/swh/graph/config.py b/swh/graph/config.py index 4487753..61b24c2 100644 --- a/swh/graph/config.py +++ b/swh/graph/config.py @@ -1,111 +1,110 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from pathlib import Path import sys import psutil def find_graph_jar(): """find swh-graph.jar, containing the Java part of swh-graph look both in development directories and installed data (for in-production deployments who fecthed the JAR from pypi) """ swh_graph_root = Path(__file__).parents[2] try_paths = [ swh_graph_root / "java/target/", Path(sys.prefix) / "share/swh-graph/", Path(sys.prefix) / "local/share/swh-graph/", ] for path in try_paths: glob = list(path.glob("swh-graph-*.jar")) if glob: if len(glob) > 1: logging.warn( "found multiple swh-graph JARs, " "arbitrarily picking one" ) logging.info("using swh-graph JAR: {0}".format(glob[0])) return str(glob[0]) raise RuntimeError("swh-graph JAR not found. Have you run `make java`?") def check_config(conf): - """check configuration and propagate defaults - """ + """check configuration and propagate defaults""" conf = conf.copy() if "batch_size" not in conf: conf["batch_size"] = "1000000000" # 1 billion if "max_ram" not in conf: conf["max_ram"] = str(psutil.virtual_memory().total) if "java_tool_options" not in conf: conf["java_tool_options"] = " ".join( [ "-Xmx{max_ram}", "-XX:PretenureSizeThreshold=512M", "-XX:MaxNewSize=4G", "-XX:+UseLargePages", "-XX:+UseTransparentHugePages", "-XX:+UseNUMA", "-XX:+UseTLAB", "-XX:+ResizeTLAB", ] ) conf["java_tool_options"] = conf["java_tool_options"].format( max_ram=conf["max_ram"] ) if "java" not in conf: conf["java"] = "java" if "classpath" not in conf: conf["classpath"] = find_graph_jar() return conf def check_config_compress(config, graph_name, in_dir, out_dir): """check compression-specific configuration and initialize its execution environment. """ conf = check_config(config) conf["graph_name"] = graph_name conf["in_dir"] = str(in_dir) conf["out_dir"] = str(out_dir) out_dir.mkdir(parents=True, exist_ok=True) if "tmp_dir" not in conf: tmp_dir = out_dir / "tmp" conf["tmp_dir"] = str(tmp_dir) else: tmp_dir = Path(conf["tmp_dir"]) tmp_dir.mkdir(parents=True, exist_ok=True) if "logback" not in conf: logback_confpath = tmp_dir / "logback.xml" with open(logback_confpath, "w") as conffile: conffile.write( """ %d %r %p [%t] %logger{1} - %m%n """ ) conf["logback"] = str(logback_confpath) conf["java_tool_options"] += " -Dlogback.configurationFile={logback}" conf["java_tool_options"] = conf["java_tool_options"].format( logback=conf["logback"] ) return conf diff --git a/swh/graph/graph.py b/swh/graph/graph.py index 726ab6d..0a87445 100644 --- a/swh/graph/graph.py +++ b/swh/graph/graph.py @@ -1,185 +1,185 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import contextlib import functools from swh.graph.backend import Backend from swh.graph.dot import KIND_TO_SHAPE, dot_to_svg, graph_dot BASE_URL = "https://archive.softwareheritage.org/browse" KIND_TO_URL_FRAGMENT = { "ori": "/origin/{}", "snp": "/snapshot/{}", "rel": "/release/{}", "rev": "/revision/{}", "dir": "/directory/{}", "cnt": "/content/sha1_git:{}/", } def call_async_gen(generator, *args, **kwargs): loop = asyncio.get_event_loop() it = generator(*args, **kwargs).__aiter__() while True: try: res = loop.run_until_complete(it.__anext__()) yield res except StopAsyncIteration: break class Neighbors: """Neighbor iterator with custom O(1) length method""" def __init__(self, graph, iterator, length_func): self.graph = graph self.iterator = iterator self.length_func = length_func def __iter__(self): return self def __next__(self): succ = self.iterator.nextLong() if succ == -1: raise StopIteration return GraphNode(self.graph, succ) def __len__(self): return self.length_func() class GraphNode: """Node in the SWH graph""" def __init__(self, graph, node_id): self.graph = graph self.id = node_id def children(self): return Neighbors( self.graph, self.graph.java_graph.successors(self.id), lambda: self.graph.java_graph.outdegree(self.id), ) def parents(self): return Neighbors( self.graph, self.graph.java_graph.predecessors(self.id), lambda: self.graph.java_graph.indegree(self.id), ) def simple_traversal(self, ttype, direction="forward", edges="*"): for node in call_async_gen( self.graph.backend.simple_traversal, ttype, direction, edges, self.id ): yield self.graph[node] def leaves(self, *args, **kwargs): yield from self.simple_traversal("leaves", *args, **kwargs) def visit_nodes(self, *args, **kwargs): yield from self.simple_traversal("visit_nodes", *args, **kwargs) def visit_edges(self, direction="forward", edges="*"): for src, dst in call_async_gen( self.graph.backend.visit_edges, direction, edges, self.id ): yield (self.graph[src], self.graph[dst]) def visit_paths(self, direction="forward", edges="*"): for path in call_async_gen( self.graph.backend.visit_paths, direction, edges, self.id ): yield [self.graph[node] for node in path] def walk(self, dst, direction="forward", edges="*", traversal="dfs"): for node in call_async_gen( self.graph.backend.walk, direction, edges, traversal, self.id, dst ): yield self.graph[node] def _count(self, ttype, direction="forward", edges="*"): return self.graph.backend.count(ttype, direction, edges, self.id) count_leaves = functools.partialmethod(_count, ttype="leaves") count_neighbors = functools.partialmethod(_count, ttype="neighbors") count_visit_nodes = functools.partialmethod(_count, ttype="visit_nodes") @property - def pid(self): - return self.graph.node2pid[self.id] + def swhid(self): + return self.graph.node2swhid[self.id] @property def kind(self): - return self.pid.split(":")[2] + return self.swhid.split(":")[2] def __str__(self): - return self.pid + return self.swhid def __repr__(self): - return "<{}>".format(self.pid) + return "<{}>".format(self.swhid) def dot_fragment(self): - swh, version, kind, hash = self.pid.split(":") + swh, version, kind, hash = self.swhid.split(":") label = "{}:{}..{}".format(kind, hash[0:2], hash[-2:]) url = BASE_URL + KIND_TO_URL_FRAGMENT[kind].format(hash) shape = KIND_TO_SHAPE[kind] return '{} [label="{}", href="{}", target="_blank", shape="{}"];'.format( self.id, label, url, shape ) def _repr_svg_(self): nodes = [self, *list(self.children()), *list(self.parents())] dot = graph_dot(nodes) svg = dot_to_svg(dot) return svg class Graph: - def __init__(self, backend, node2pid, pid2node): + def __init__(self, backend, node2swhid, swhid2node): self.backend = backend self.java_graph = backend.entry.get_graph() - self.node2pid = node2pid - self.pid2node = pid2node + self.node2swhid = node2swhid + self.swhid2node = swhid2node def stats(self): return self.backend.stats() @property def path(self): return self.java_graph.getPath() def __len__(self): return self.java_graph.numNodes() def __getitem__(self, node_id): if isinstance(node_id, int): - self.node2pid[node_id] # check existence + self.node2swhid[node_id] # check existence return GraphNode(self, node_id) elif isinstance(node_id, str): - node_id = self.pid2node[node_id] + node_id = self.swhid2node[node_id] return GraphNode(self, node_id) def __iter__(self): - for pid, pos in self.backend.pid2node: - yield self[pid] + for swhid, pos in self.backend.swhid2node: + yield self[swhid] def iter_prefix(self, prefix): - for pid, pos in self.backend.pid2node.iter_prefix(prefix): - yield self[pid] + for swhid, pos in self.backend.swhid2node.iter_prefix(prefix): + yield self[swhid] - def iter_type(self, pid_type): - for pid, pos in self.backend.pid2node.iter_type(pid_type): - yield self[pid] + def iter_type(self, swhid_type): + for swhid, pos in self.backend.swhid2node.iter_type(swhid_type): + yield self[swhid] @contextlib.contextmanager def load(graph_path): with Backend(graph_path) as backend: - yield Graph(backend, backend.node2pid, backend.pid2node) + yield Graph(backend, backend.node2swhid, backend.swhid2node) diff --git a/swh/graph/server/app.py b/swh/graph/server/app.py index a45fad9..1e3e54a 100644 --- a/swh/graph/server/app.py +++ b/swh/graph/server/app.py @@ -1,313 +1,315 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using FIFO as a transport to stream integers between the two languages. """ import asyncio from collections import deque import json from typing import Optional import aiohttp.web from swh.core.api.asynchronous import RPCServerApp from swh.model.exceptions import ValidationError -from swh.model.identifiers import PID_TYPES +from swh.model.identifiers import SWHID_TYPES try: from contextlib import asynccontextmanager except ImportError: # Compatibility with 3.6 backport from async_generator import asynccontextmanager # type: ignore # maximum number of retries for random walks RANDOM_RETRIES = 5 # TODO make this configurable via rpc-serve configuration async def index(request): return aiohttp.web.Response( content_type="text/html", body=""" Software Heritage storage server

You have reached the Software Heritage graph API server.

See its API documentation for more information.

""", ) class GraphView(aiohttp.web.View): """Base class for views working on the graph, with utility functions""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.backend = self.request.app["backend"] - def node_of_pid(self, pid): - """Lookup a PID in a pid2node map, failing in an HTTP-nice way if needed.""" + def node_of_swhid(self, swhid): + """Lookup a SWHID in a swhid2node map, failing in an HTTP-nice way if + needed.""" try: - return self.backend.pid2node[pid] + return self.backend.swhid2node[swhid] except KeyError: - raise aiohttp.web.HTTPNotFound(body=f"PID not found: {pid}") + raise aiohttp.web.HTTPNotFound(body=f"SWHID not found: {swhid}") except ValidationError: - raise aiohttp.web.HTTPBadRequest(body=f"malformed PID: {pid}") + raise aiohttp.web.HTTPBadRequest(body=f"malformed SWHID: {swhid}") - def pid_of_node(self, node): - """Lookup a node in a node2pid map, failing in an HTTP-nice way if needed.""" + def swhid_of_node(self, node): + """Lookup a node in a node2swhid map, failing in an HTTP-nice way if + needed.""" try: - return self.backend.node2pid[node] + return self.backend.node2swhid[node] except KeyError: raise aiohttp.web.HTTPInternalServerError( body=f"reverse lookup failed for node id: {node}" ) def get_direction(self): """Validate HTTP query parameter `direction`""" s = self.request.query.get("direction", "forward") if s not in ("forward", "backward"): raise aiohttp.web.HTTPBadRequest(body=f"invalid direction: {s}") return s def get_edges(self): """Validate HTTP query parameter `edges`, i.e., edge restrictions""" s = self.request.query.get("edges", "*") if any( [ - node_type != "*" and node_type not in PID_TYPES + node_type != "*" and node_type not in SWHID_TYPES for edge in s.split(":") for node_type in edge.split(",", maxsplit=1) ] ): raise aiohttp.web.HTTPBadRequest(body=f"invalid edge restriction: {s}") return s def get_traversal(self): """Validate HTTP query parameter `traversal`, i.e., visit order""" s = self.request.query.get("traversal", "dfs") if s not in ("bfs", "dfs"): raise aiohttp.web.HTTPBadRequest(body=f"invalid traversal order: {s}") return s def get_limit(self): """Validate HTTP query parameter `limit`, i.e., number of results""" s = self.request.query.get("limit", "0") try: return int(s) except ValueError: raise aiohttp.web.HTTPBadRequest(body=f"invalid limit value: {s}") class StreamingGraphView(GraphView): """Base class for views streaming their response line by line.""" content_type = "text/plain" @asynccontextmanager async def response_streamer(self, *args, **kwargs): """Context manager to prepare then close a StreamResponse""" response = aiohttp.web.StreamResponse(*args, **kwargs) response.content_type = self.content_type await response.prepare(self.request) yield response await response.write_eof() async def get(self): await self.prepare_response() async with self.response_streamer() as self.response_stream: await self.stream_response() return self.response_stream async def prepare_response(self): """This can be overridden with some setup to be run before the response actually starts streaming. """ pass async def stream_response(self): """Override this to perform the response streaming. Implementations of this should await self.stream_line(line) to write each line. """ raise NotImplementedError async def stream_line(self, line): """Write a line in the response stream.""" await self.response_stream.write((line + "\n").encode()) class StatsView(GraphView): """View showing some statistics on the graph""" async def get(self): stats = self.backend.stats() return aiohttp.web.Response(body=stats, content_type="application/json") class SimpleTraversalView(StreamingGraphView): """Base class for views of simple traversals""" simple_traversal_type: Optional[str] = None async def prepare_response(self): src = self.request.match_info["src"] - self.src_node = self.node_of_pid(src) + self.src_node = self.node_of_swhid(src) self.edges = self.get_edges() self.direction = self.get_direction() async def stream_response(self): async for res_node in self.backend.simple_traversal( self.simple_traversal_type, self.direction, self.edges, self.src_node ): - res_pid = self.pid_of_node(res_node) - await self.stream_line(res_pid) + res_swhid = self.swhid_of_node(res_node) + await self.stream_line(res_swhid) class LeavesView(SimpleTraversalView): simple_traversal_type = "leaves" class NeighborsView(SimpleTraversalView): simple_traversal_type = "neighbors" class VisitNodesView(SimpleTraversalView): simple_traversal_type = "visit_nodes" class WalkView(StreamingGraphView): async def prepare_response(self): src = self.request.match_info["src"] dst = self.request.match_info["dst"] - self.src_node = self.node_of_pid(src) - if dst not in PID_TYPES: - self.dst_thing = self.node_of_pid(dst) + self.src_node = self.node_of_swhid(src) + if dst not in SWHID_TYPES: + self.dst_thing = self.node_of_swhid(dst) else: self.dst_thing = dst self.edges = self.get_edges() self.direction = self.get_direction() self.algo = self.get_traversal() self.limit = self.get_limit() async def get_walk_iterator(self): return self.backend.walk( self.direction, self.edges, self.algo, self.src_node, self.dst_thing ) async def stream_response(self): it = self.get_walk_iterator() if self.limit < 0: queue = deque(maxlen=-self.limit) async for res_node in it: - res_pid = self.pid_of_node(res_node) - queue.append(res_pid) + res_swhid = self.swhid_of_node(res_node) + queue.append(res_swhid) while queue: await self.stream_line(queue.popleft()) else: count = 0 async for res_node in it: if self.limit == 0 or count < self.limit: - res_pid = self.pid_of_node(res_node) - await self.stream_line(res_pid) + res_swhid = self.swhid_of_node(res_node) + await self.stream_line(res_swhid) count += 1 else: break class RandomWalkView(WalkView): def get_walk_iterator(self): return self.backend.random_walk( self.direction, self.edges, RANDOM_RETRIES, self.src_node, self.dst_thing ) class VisitEdgesView(SimpleTraversalView): async def stream_response(self): it = self.backend.visit_edges(self.direction, self.edges, self.src_node) async for (res_src, res_dst) in it: - res_src_pid = self.pid_of_node(res_src) - res_dst_pid = self.pid_of_node(res_dst) - await self.stream_line("{} {}".format(res_src_pid, res_dst_pid)) + res_src_swhid = self.swhid_of_node(res_src) + res_dst_swhid = self.swhid_of_node(res_dst) + await self.stream_line("{} {}".format(res_src_swhid, res_dst_swhid)) class VisitPathsView(SimpleTraversalView): content_type = "application/x-ndjson" async def stream_response(self): it = self.backend.visit_paths(self.direction, self.edges, self.src_node) async for res_path in it: - res_path_pid = [self.pid_of_node(n) for n in res_path] - line = json.dumps(res_path_pid) + res_path_swhid = [self.swhid_of_node(n) for n in res_path] + line = json.dumps(res_path_swhid) await self.stream_line(line) class CountView(GraphView): """Base class for counting views.""" count_type: Optional[str] = None async def get(self): src = self.request.match_info["src"] - self.src_node = self.node_of_pid(src) + self.src_node = self.node_of_swhid(src) self.edges = self.get_edges() self.direction = self.get_direction() loop = asyncio.get_event_loop() cnt = await loop.run_in_executor( None, self.backend.count, self.count_type, self.direction, self.edges, self.src_node, ) return aiohttp.web.Response(body=str(cnt), content_type="application/json") class CountNeighborsView(CountView): count_type = "neighbors" class CountLeavesView(CountView): count_type = "leaves" class CountVisitNodesView(CountView): count_type = "visit_nodes" def make_app(backend, **kwargs): app = RPCServerApp(**kwargs) app.add_routes( [ aiohttp.web.get("/", index), aiohttp.web.get("/graph", index), aiohttp.web.view("/graph/stats", StatsView), aiohttp.web.view("/graph/leaves/{src}", LeavesView), aiohttp.web.view("/graph/neighbors/{src}", NeighborsView), aiohttp.web.view("/graph/visit/nodes/{src}", VisitNodesView), aiohttp.web.view("/graph/visit/edges/{src}", VisitEdgesView), aiohttp.web.view("/graph/visit/paths/{src}", VisitPathsView), # temporarily disabled in wait of a proper fix for T1969 # aiohttp.web.view("/graph/walk/{src}/{dst}", WalkView) aiohttp.web.view("/graph/randomwalk/{src}/{dst}", RandomWalkView), aiohttp.web.view("/graph/neighbors/count/{src}", CountNeighborsView), aiohttp.web.view("/graph/leaves/count/{src}", CountLeavesView), aiohttp.web.view("/graph/visit/nodes/count/{src}", CountVisitNodesView), ] ) app["backend"] = backend return app diff --git a/swh/graph/pid.py b/swh/graph/swhid.py similarity index 65% rename from swh/graph/pid.py rename to swh/graph/swhid.py index f0e4a67..8d99307 100644 --- a/swh/graph/pid.py +++ b/swh/graph/swhid.py @@ -1,404 +1,402 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections.abc import MutableMapping from enum import Enum import mmap from mmap import MAP_SHARED, PROT_READ, PROT_WRITE import os import struct from typing import BinaryIO, Iterator, Tuple from swh.model.identifiers import SWHID, parse_swhid -PID_BIN_FMT = "BB20s" # 2 unsigned chars + 20 bytes +SWHID_BIN_FMT = "BB20s" # 2 unsigned chars + 20 bytes INT_BIN_FMT = ">q" # big endian, 8-byte integer -PID_BIN_SIZE = 22 # in bytes +SWHID_BIN_SIZE = 22 # in bytes INT_BIN_SIZE = 8 # in bytes -class PidType(Enum): - """types of existing PIDs, used to serialize PID type as a (char) integer +class SwhidType(Enum): + """types of existing SWHIDs, used to serialize SWHID type as a (char) integer Note that the order does matter also for driving the binary search in - PID-indexed maps. Integer values also matter, for compatibility with the + SWHID-indexed maps. Integer values also matter, for compatibility with the Java layer. """ content = 0 directory = 1 origin = 2 release = 3 revision = 4 snapshot = 5 -def str_to_bytes(pid_str: str) -> bytes: - """Convert a PID to a byte sequence +def str_to_bytes(swhid_str: str) -> bytes: + """Convert a SWHID to a byte sequence - The binary format used to represent PIDs as 22-byte long byte sequences as + The binary format used to represent SWHIDs as 22-byte long byte sequences as follows: - 1 byte for the namespace version represented as a C `unsigned char` - - 1 byte for the object type, as the int value of :class:`PidType` enums, + - 1 byte for the object type, as the int value of :class:`SwhidType` enums, represented as a C `unsigned char` - 20 bytes for the SHA1 digest as a byte sequence Args: - pid: persistent identifier + swhid: persistent identifier Returns: - bytes: byte sequence representation of pid + bytes: byte sequence representation of swhid """ - pid = parse_swhid(pid_str) + swhid = parse_swhid(swhid_str) return struct.pack( - PID_BIN_FMT, - pid.scheme_version, - PidType[pid.object_type].value, - bytes.fromhex(pid.object_id), + SWHID_BIN_FMT, + swhid.scheme_version, + SwhidType[swhid.object_type].value, + bytes.fromhex(swhid.object_id), ) def bytes_to_str(bytes: bytes) -> str: """Inverse function of :func:`str_to_bytes` - See :func:`str_to_bytes` for a description of the binary PID format. + See :func:`str_to_bytes` for a description of the binary SWHID format. Args: - bytes: byte sequence representation of pid + bytes: byte sequence representation of swhid Returns: - pid: persistent identifier + swhid: persistent identifier """ - (version, type, bin_digest) = struct.unpack(PID_BIN_FMT, bytes) - pid = SWHID(object_type=PidType(type).name, object_id=bin_digest) - return str(pid) + (version, type, bin_digest) = struct.unpack(SWHID_BIN_FMT, bytes) + swhid = SWHID(object_type=SwhidType(type).name, object_id=bin_digest) + return str(swhid) class _OnDiskMap: - """mmap-ed on-disk sequence of fixed size records - - """ + """mmap-ed on-disk sequence of fixed size records""" def __init__( self, record_size: int, fname: str, mode: str = "rb", length: int = None ): """open an existing on-disk map Args: record_size: size of each record in bytes fname: path to the on-disk map mode: file open mode, usually either 'rb' for read-only maps, 'wb' for creating new maps, or 'rb+' for updating existing ones (default: 'rb') length: map size in number of logical records; used to initialize writable maps at creation time. Must be given when mode is 'wb' and the map doesn't exist on disk; ignored otherwise """ os_modes = {"rb": os.O_RDONLY, "wb": os.O_RDWR | os.O_CREAT, "rb+": os.O_RDWR} if mode not in os_modes: raise ValueError("invalid file open mode: " + mode) new_map = mode == "wb" writable_map = mode in ["wb", "rb+"] self.record_size = record_size self.fd = os.open(fname, os_modes[mode]) if new_map: if length is None: raise ValueError("missing length when creating new map") os.truncate(self.fd, length * self.record_size) self.size = os.path.getsize(fname) (self.length, remainder) = divmod(self.size, record_size) if remainder: raise ValueError( "map size {} is not a multiple of the record size {}".format( self.size, record_size ) ) self.mm = mmap.mmap( self.fd, self.size, prot=(PROT_READ | PROT_WRITE if writable_map else PROT_READ), flags=MAP_SHARED, ) def close(self) -> None: """close the map shuts down both the mmap and the underlying file descriptor """ if not self.mm.closed: self.mm.close() os.close(self.fd) def __len__(self) -> int: return self.length def __delitem__(self, pos: int) -> None: raise NotImplementedError("cannot delete records from fixed-size map") -class PidToNodeMap(_OnDiskMap, MutableMapping): +class SwhidToNodeMap(_OnDiskMap, MutableMapping): """memory mapped map from :ref:`SWHIDs ` to a continuous range 0..N of (8-byte long) integers - This is the converse mapping of :class:`NodeToPidMap`. + This is the converse mapping of :class:`NodeToSwhidMap`. The on-disk serialization format is a sequence of fixed length (30 bytes) records with the following fields: - - PID (22 bytes): binary PID representation as per :func:`str_to_bytes` + - SWHID (22 bytes): binary SWHID representation as per :func:`str_to_bytes` - long (8 bytes): big endian long integer - The records are sorted lexicographically by PID type and checksum, where - type is the integer value of :class:`PidType`. PID lookup in the map is + The records are sorted lexicographically by SWHID type and checksum, where + type is the integer value of :class:`SwhidType`. SWHID lookup in the map is performed via binary search. Hence a huge map with, say, 11 B entries, will require ~30 disk seeks. Note that, due to fixed size + ordering, it is not possible to create these maps by random writing. Hence, __setitem__ can be used only to *update* the value associated to an existing key, rather than to add a missing item. To create an entire map from scratch, you should do so *sequentially*, using static method :meth:`write_record` (or, at your own risk, by hand via the mmap :attr:`mm`). """ - # record binary format: PID + a big endian 8-byte big endian integer - RECORD_BIN_FMT = ">" + PID_BIN_FMT + "q" - RECORD_SIZE = PID_BIN_SIZE + INT_BIN_SIZE + # record binary format: SWHID + a big endian 8-byte big endian integer + RECORD_BIN_FMT = ">" + SWHID_BIN_FMT + "q" + RECORD_SIZE = SWHID_BIN_SIZE + INT_BIN_SIZE def __init__(self, fname: str, mode: str = "rb", length: int = None): """open an existing on-disk map Args: fname: path to the on-disk map mode: file open mode, usually either 'rb' for read-only maps, 'wb' for creating new maps, or 'rb+' for updating existing ones (default: 'rb') length: map size in number of logical records; used to initialize read-write maps at creation time. Must be given when mode is 'wb'; ignored otherwise """ super().__init__(self.RECORD_SIZE, fname, mode=mode, length=length) def _get_bin_record(self, pos: int) -> Tuple[bytes, bytes]: """seek and return the (binary) record at a given (logical) position see :func:`_get_record` for an equivalent function with additional deserialization Args: pos: 0-based record number Returns: - a pair `(pid, int)`, where pid and int are bytes + a pair `(swhid, int)`, where swhid and int are bytes """ rec_pos = pos * self.RECORD_SIZE - int_pos = rec_pos + PID_BIN_SIZE + int_pos = rec_pos + SWHID_BIN_SIZE return (self.mm[rec_pos:int_pos], self.mm[int_pos : int_pos + INT_BIN_SIZE]) def _get_record(self, pos: int) -> Tuple[str, int]: """seek and return the record at a given (logical) position moral equivalent of :func:`_get_bin_record`, with additional deserialization to non-bytes types Args: pos: 0-based record number Returns: - a pair `(pid, int)`, where pid is a string-based PID and int the + a pair `(swhid, int)`, where swhid is a string-based SWHID and int the corresponding integer identifier """ - (pid_bytes, int_bytes) = self._get_bin_record(pos) - return (bytes_to_str(pid_bytes), struct.unpack(INT_BIN_FMT, int_bytes)[0]) + (swhid_bytes, int_bytes) = self._get_bin_record(pos) + return (bytes_to_str(swhid_bytes), struct.unpack(INT_BIN_FMT, int_bytes)[0]) @classmethod - def write_record(cls, f: BinaryIO, pid: str, int: int) -> None: + def write_record(cls, f: BinaryIO, swhid: str, int: int) -> None: """write a logical record to a file-like object Args: f: file-like object to write the record to - pid: textual PID - int: PID integer identifier + swhid: textual SWHID + int: SWHID integer identifier """ - f.write(str_to_bytes(pid)) + f.write(str_to_bytes(swhid)) f.write(struct.pack(INT_BIN_FMT, int)) - def _bisect_pos(self, pid_str: str) -> int: + def _bisect_pos(self, swhid_str: str) -> int: """bisect the position of the given identifier. If the identifier is - not found, the position of the pid immediately after is returned. + not found, the position of the swhid immediately after is returned. Args: - pid_str: the pid as a string + swhid_str: the swhid as a string Returns: the logical record of the bisected position in the map """ - if not isinstance(pid_str, str): - raise TypeError("PID must be a str, not {}".format(type(pid_str))) + if not isinstance(swhid_str, str): + raise TypeError("SWHID must be a str, not {}".format(type(swhid_str))) try: - target = str_to_bytes(pid_str) # desired PID as bytes + target = str_to_bytes(swhid_str) # desired SWHID as bytes except ValueError: - raise ValueError('invalid PID: "{}"'.format(pid_str)) + raise ValueError('invalid SWHID: "{}"'.format(swhid_str)) lo = 0 hi = self.length - 1 while lo < hi: mid = (lo + hi) // 2 - (pid, _value) = self._get_bin_record(mid) - if pid < target: + (swhid, _value) = self._get_bin_record(mid) + if swhid < target: lo = mid + 1 else: hi = mid return lo - def _find(self, pid_str: str) -> Tuple[int, int]: - """lookup the integer identifier of a pid and its position + def _find(self, swhid_str: str) -> Tuple[int, int]: + """lookup the integer identifier of a swhid and its position Args: - pid_str: the pid as a string + swhid_str: the swhid as a string Returns: - a pair `(pid, pos)` with pid integer identifier and its logical + a pair `(swhid, pos)` with swhid integer identifier and its logical record position in the map """ - pos = self._bisect_pos(pid_str) - pid_found, value = self._get_record(pos) - if pid_found == pid_str: + pos = self._bisect_pos(swhid_str) + swhid_found, value = self._get_record(pos) + if swhid_found == swhid_str: return (value, pos) - raise KeyError(pid_str) + raise KeyError(swhid_str) - def __getitem__(self, pid_str: str) -> int: - """lookup the integer identifier of a PID + def __getitem__(self, swhid_str: str) -> int: + """lookup the integer identifier of a SWHID Args: - pid: the PID as a string + swhid: the SWHID as a string Returns: - the integer identifier of pid + the integer identifier of swhid """ - return self._find(pid_str)[0] # return element, ignore position + return self._find(swhid_str)[0] # return element, ignore position - def __setitem__(self, pid_str: str, int: str) -> None: - (_pid, pos) = self._find(pid_str) # might raise KeyError and that's OK + def __setitem__(self, swhid_str: str, int: str) -> None: + (_swhid, pos) = self._find(swhid_str) # might raise KeyError and that's OK rec_pos = pos * self.RECORD_SIZE - int_pos = rec_pos + PID_BIN_SIZE - self.mm[rec_pos:int_pos] = str_to_bytes(pid_str) + int_pos = rec_pos + SWHID_BIN_SIZE + self.mm[rec_pos:int_pos] = str_to_bytes(swhid_str) self.mm[int_pos : int_pos + INT_BIN_SIZE] = struct.pack(INT_BIN_FMT, int) def __iter__(self) -> Iterator[Tuple[str, int]]: for pos in range(self.length): yield self._get_record(pos) def iter_prefix(self, prefix: str): swh, n, t, sha = prefix.split(":") sha = sha.ljust(40, "0") - start_pid = ":".join([swh, n, t, sha]) - start = self._bisect_pos(start_pid) + start_swhid = ":".join([swh, n, t, sha]) + start = self._bisect_pos(start_swhid) for pos in range(start, self.length): - pid, value = self._get_record(pos) - if not pid.startswith(prefix): + swhid, value = self._get_record(pos) + if not swhid.startswith(prefix): break - yield pid, value + yield swhid, value - def iter_type(self, pid_type: str) -> Iterator[Tuple[str, int]]: - prefix = "swh:1:{}:".format(pid_type) + def iter_type(self, swhid_type: str) -> Iterator[Tuple[str, int]]: + prefix = "swh:1:{}:".format(swhid_type) yield from self.iter_prefix(prefix) -class NodeToPidMap(_OnDiskMap, MutableMapping): +class NodeToSwhidMap(_OnDiskMap, MutableMapping): """memory mapped map from a continuous range of 0..N (8-byte long) integers to :ref:`SWHIDs ` - This is the converse mapping of :class:`PidToNodeMap`. + This is the converse mapping of :class:`SwhidToNodeMap`. The on-disk serialization format is a sequence of fixed length records (22 - bytes), each being the binary representation of a PID as per + bytes), each being the binary representation of a SWHID as per :func:`str_to_bytes`. The records are sorted by long integer, so that integer lookup is possible via fixed-offset seek. """ - RECORD_BIN_FMT = PID_BIN_FMT - RECORD_SIZE = PID_BIN_SIZE + RECORD_BIN_FMT = SWHID_BIN_FMT + RECORD_SIZE = SWHID_BIN_SIZE def __init__(self, fname: str, mode: str = "rb", length: int = None): """open an existing on-disk map Args: fname: path to the on-disk map mode: file open mode, usually either 'rb' for read-only maps, 'wb' for creating new maps, or 'rb+' for updating existing ones (default: 'rb') size: map size in number of logical records; used to initialize read-write maps at creation time. Must be given when mode is 'wb'; ignored otherwise length: passed to :class:`_OnDiskMap` """ super().__init__(self.RECORD_SIZE, fname, mode=mode, length=length) def _get_bin_record(self, pos: int) -> bytes: - """seek and return the (binary) PID at a given (logical) position + """seek and return the (binary) SWHID at a given (logical) position Args: pos: 0-based record number Returns: - PID as a byte sequence + SWHID as a byte sequence """ rec_pos = pos * self.RECORD_SIZE return self.mm[rec_pos : rec_pos + self.RECORD_SIZE] @classmethod - def write_record(cls, f: BinaryIO, pid: str) -> None: - """write a PID to a file-like object + def write_record(cls, f: BinaryIO, swhid: str) -> None: + """write a SWHID to a file-like object Args: f: file-like object to write the record to - pid: textual PID + swhid: textual SWHID """ - f.write(str_to_bytes(pid)) + f.write(str_to_bytes(swhid)) def __getitem__(self, pos: int) -> str: orig_pos = pos if pos < 0: pos = len(self) + pos if not (0 <= pos < len(self)): raise IndexError(orig_pos) return bytes_to_str(self._get_bin_record(pos)) - def __setitem__(self, pos: int, pid: str) -> None: + def __setitem__(self, pos: int, swhid: str) -> None: rec_pos = pos * self.RECORD_SIZE - self.mm[rec_pos : rec_pos + self.RECORD_SIZE] = str_to_bytes(pid) + self.mm[rec_pos : rec_pos + self.RECORD_SIZE] = str_to_bytes(swhid) def __iter__(self) -> Iterator[Tuple[int, str]]: for pos in range(self.length): yield (pos, self[pos]) diff --git a/swh/graph/tests/dataset/output/example.node2pid.bin b/swh/graph/tests/dataset/output/example.node2swhid.bin similarity index 100% rename from swh/graph/tests/dataset/output/example.node2pid.bin rename to swh/graph/tests/dataset/output/example.node2swhid.bin diff --git a/swh/graph/tests/dataset/output/example.pid2node.bin b/swh/graph/tests/dataset/output/example.swhid2node.bin similarity index 100% rename from swh/graph/tests/dataset/output/example.pid2node.bin rename to swh/graph/tests/dataset/output/example.swhid2node.bin diff --git a/swh/graph/tests/test_api_client.py b/swh/graph/tests/test_api_client.py index 4bed140..c4b651f 100644 --- a/swh/graph/tests/test_api_client.py +++ b/swh/graph/tests/test_api_client.py @@ -1,303 +1,303 @@ import pytest from pytest import raises from swh.core.api import RemoteException def test_stats(graph_client): stats = graph_client.stats() assert set(stats.keys()) == {"counts", "ratios", "indegree", "outdegree"} assert set(stats["counts"].keys()) == {"nodes", "edges"} assert set(stats["ratios"].keys()) == { "compression", "bits_per_node", "bits_per_edge", "avg_locality", } assert set(stats["indegree"].keys()) == {"min", "max", "avg"} assert set(stats["outdegree"].keys()) == {"min", "max", "avg"} assert stats["counts"]["nodes"] == 21 assert stats["counts"]["edges"] == 23 assert isinstance(stats["ratios"]["compression"], float) assert isinstance(stats["ratios"]["bits_per_node"], float) assert isinstance(stats["ratios"]["bits_per_edge"], float) assert isinstance(stats["ratios"]["avg_locality"], float) assert stats["indegree"]["min"] == 0 assert stats["indegree"]["max"] == 3 assert isinstance(stats["indegree"]["avg"], float) assert stats["outdegree"]["min"] == 0 assert stats["outdegree"]["max"] == 3 assert isinstance(stats["outdegree"]["avg"], float) def test_leaves(graph_client): actual = list( graph_client.leaves("swh:1:ori:0000000000000000000000000000000000000021") ) expected = [ "swh:1:cnt:0000000000000000000000000000000000000001", "swh:1:cnt:0000000000000000000000000000000000000004", "swh:1:cnt:0000000000000000000000000000000000000005", "swh:1:cnt:0000000000000000000000000000000000000007", ] assert set(actual) == set(expected) def test_neighbors(graph_client): actual = list( graph_client.neighbors( "swh:1:rev:0000000000000000000000000000000000000009", direction="backward" ) ) expected = [ "swh:1:snp:0000000000000000000000000000000000000020", "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000013", ] assert set(actual) == set(expected) def test_visit_nodes(graph_client): actual = list( graph_client.visit_nodes( "swh:1:rel:0000000000000000000000000000000000000010", edges="rel:rev,rev:rev", ) ) expected = [ "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ] assert set(actual) == set(expected) def test_visit_edges(graph_client): actual = list( graph_client.visit_edges( "swh:1:rel:0000000000000000000000000000000000000010", edges="rel:rev,rev:rev,rev:dir", ) ) expected = [ ( "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000008", ), ( "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", ), ] assert set(actual) == set(expected) def test_visit_edges_diamond_pattern(graph_client): actual = list( graph_client.visit_edges( "swh:1:rev:0000000000000000000000000000000000000009", edges="*", ) ) expected = [ ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000008", ), ( "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", ), ( "swh:1:dir:0000000000000000000000000000000000000002", "swh:1:cnt:0000000000000000000000000000000000000001", ), ( "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:cnt:0000000000000000000000000000000000000001", ), ( "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:cnt:0000000000000000000000000000000000000007", ), ( "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:dir:0000000000000000000000000000000000000006", ), ( "swh:1:dir:0000000000000000000000000000000000000006", "swh:1:cnt:0000000000000000000000000000000000000004", ), ( "swh:1:dir:0000000000000000000000000000000000000006", "swh:1:cnt:0000000000000000000000000000000000000005", ), ] assert set(actual) == set(expected) def test_visit_paths(graph_client): actual = list( graph_client.visit_paths( "swh:1:snp:0000000000000000000000000000000000000020", edges="snp:*,rev:*" ) ) actual = [tuple(path) for path in actual] expected = [ ( "swh:1:snp:0000000000000000000000000000000000000020", "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", ), ( "swh:1:snp:0000000000000000000000000000000000000020", "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000008", ), ( "swh:1:snp:0000000000000000000000000000000000000020", "swh:1:rel:0000000000000000000000000000000000000010", ), ] assert set(actual) == set(expected) @pytest.mark.skip(reason="currently disabled due to T1969") def test_walk(graph_client): args = ("swh:1:dir:0000000000000000000000000000000000000016", "rel") kwargs = { "edges": "dir:dir,dir:rev,rev:*", "direction": "backward", "traversal": "bfs", } actual = list(graph_client.walk(*args, **kwargs)) expected = [ "swh:1:dir:0000000000000000000000000000000000000016", "swh:1:dir:0000000000000000000000000000000000000017", "swh:1:rev:0000000000000000000000000000000000000018", "swh:1:rel:0000000000000000000000000000000000000019", ] assert set(actual) == set(expected) kwargs2 = kwargs.copy() kwargs2["limit"] = -1 actual = list(graph_client.walk(*args, **kwargs2)) expected = ["swh:1:rel:0000000000000000000000000000000000000019"] assert set(actual) == set(expected) kwargs2 = kwargs.copy() kwargs2["limit"] = 2 actual = list(graph_client.walk(*args, **kwargs2)) expected = [ "swh:1:dir:0000000000000000000000000000000000000016", "swh:1:dir:0000000000000000000000000000000000000017", ] assert set(actual) == set(expected) def test_random_walk(graph_client): """as the walk is random, we test a visit from a cnt node to the only origin in the dataset, and only check the final node of the path (i.e., the origin) """ args = ("swh:1:cnt:0000000000000000000000000000000000000001", "ori") kwargs = {"direction": "backward"} expected_root = "swh:1:ori:0000000000000000000000000000000000000021" actual = list(graph_client.random_walk(*args, **kwargs)) assert len(actual) > 1 # no origin directly links to a content assert actual[0] == args[0] assert actual[-1] == expected_root kwargs2 = kwargs.copy() kwargs2["limit"] = -1 actual = list(graph_client.random_walk(*args, **kwargs2)) assert actual == [expected_root] kwargs2["limit"] = -2 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 2 assert actual[-1] == expected_root kwargs2["limit"] = 3 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 3 def test_count(graph_client): actual = graph_client.count_leaves( "swh:1:ori:0000000000000000000000000000000000000021" ) assert actual == 4 actual = graph_client.count_visit_nodes( "swh:1:rel:0000000000000000000000000000000000000010", edges="rel:rev,rev:rev" ) assert actual == 3 actual = graph_client.count_neighbors( "swh:1:rev:0000000000000000000000000000000000000009", direction="backward" ) assert actual == 3 def test_param_validation(graph_client): - with raises(RemoteException) as exc_info: # PID not found + with raises(RemoteException) as exc_info: # SWHID not found list(graph_client.leaves("swh:1:ori:fff0000000000000000000000000000000000021")) assert exc_info.value.response.status_code == 404 - with raises(RemoteException) as exc_info: # malformed PID + with raises(RemoteException) as exc_info: # malformed SWHID list( graph_client.neighbors("swh:1:ori:fff000000zzzzzz0000000000000000000000021") ) assert exc_info.value.response.status_code == 400 with raises(RemoteException) as exc_info: # malformed edge specificaiton list( graph_client.visit_nodes( "swh:1:dir:0000000000000000000000000000000000000016", edges="dir:notanodetype,dir:rev,rev:*", direction="backward", ) ) assert exc_info.value.response.status_code == 400 with raises(RemoteException) as exc_info: # malformed direction list( graph_client.visit_nodes( "swh:1:dir:0000000000000000000000000000000000000016", edges="dir:dir,dir:rev,rev:*", direction="notadirection", ) ) assert exc_info.value.response.status_code == 400 @pytest.mark.skip(reason="currently disabled due to T1969") def test_param_validation_walk(graph_client): """test validation of walk-specific parameters only""" with raises(RemoteException) as exc_info: # malformed traversal order list( graph_client.walk( "swh:1:dir:0000000000000000000000000000000000000016", "rel", edges="dir:dir,dir:rev,rev:*", direction="backward", traversal="notatraversalorder", ) ) assert exc_info.value.response.status_code == 400 diff --git a/swh/graph/tests/test_graph.py b/swh/graph/tests/test_graph.py index e54c576..c752580 100644 --- a/swh/graph/tests/test_graph.py +++ b/swh/graph/tests/test_graph.py @@ -1,166 +1,166 @@ import pytest def test_graph(graph): assert len(graph) == 21 obj = "swh:1:dir:0000000000000000000000000000000000000008" node = graph[obj] assert str(node) == obj assert len(node.children()) == 3 assert len(node.parents()) == 2 - actual = {p.pid for p in node.children()} + actual = {p.swhid for p in node.children()} expected = { "swh:1:cnt:0000000000000000000000000000000000000001", "swh:1:dir:0000000000000000000000000000000000000006", "swh:1:cnt:0000000000000000000000000000000000000007", } assert expected == actual - actual = {p.pid for p in node.parents()} + actual = {p.swhid for p in node.parents()} expected = { "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000012", } assert expected == actual -def test_invalid_pid(graph): +def test_invalid_swhid(graph): with pytest.raises(IndexError): graph[1337] with pytest.raises(IndexError): graph[len(graph) + 1] with pytest.raises(KeyError): graph["swh:1:dir:0000000000000000000000000000000420000012"] def test_leaves(graph): actual = list(graph["swh:1:ori:0000000000000000000000000000000000000021"].leaves()) - actual = [p.pid for p in actual] + actual = [p.swhid for p in actual] expected = [ "swh:1:cnt:0000000000000000000000000000000000000001", "swh:1:cnt:0000000000000000000000000000000000000004", "swh:1:cnt:0000000000000000000000000000000000000005", "swh:1:cnt:0000000000000000000000000000000000000007", ] assert set(actual) == set(expected) def test_visit_nodes(graph): actual = list( graph["swh:1:rel:0000000000000000000000000000000000000010"].visit_nodes( edges="rel:rev,rev:rev" ) ) - actual = [p.pid for p in actual] + actual = [p.swhid for p in actual] expected = [ "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ] assert set(actual) == set(expected) def test_visit_edges(graph): actual = list( graph["swh:1:rel:0000000000000000000000000000000000000010"].visit_edges( edges="rel:rev,rev:rev,rev:dir" ) ) - actual = [(src.pid, dst.pid) for src, dst in actual] + actual = [(src.swhid, dst.swhid) for src, dst in actual] expected = [ ( "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000008", ), ( "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", ), ] assert set(actual) == set(expected) def test_visit_paths(graph): actual = list( graph["swh:1:snp:0000000000000000000000000000000000000020"].visit_paths( edges="snp:*,rev:*" ) ) - actual = [tuple(n.pid for n in path) for path in actual] + actual = [tuple(n.swhid for n in path) for path in actual] expected = [ ( "swh:1:snp:0000000000000000000000000000000000000020", "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", ), ( "swh:1:snp:0000000000000000000000000000000000000020", "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000008", ), ( "swh:1:snp:0000000000000000000000000000000000000020", "swh:1:rel:0000000000000000000000000000000000000010", ), ] assert set(actual) == set(expected) def test_walk(graph): actual = list( graph["swh:1:dir:0000000000000000000000000000000000000016"].walk( "rel", edges="dir:dir,dir:rev,rev:*", direction="backward", traversal="bfs" ) ) - actual = [p.pid for p in actual] + actual = [p.swhid for p in actual] expected = [ "swh:1:dir:0000000000000000000000000000000000000016", "swh:1:dir:0000000000000000000000000000000000000017", "swh:1:rev:0000000000000000000000000000000000000018", "swh:1:rel:0000000000000000000000000000000000000019", ] assert set(actual) == set(expected) def test_count(graph): assert ( graph["swh:1:ori:0000000000000000000000000000000000000021"].count_leaves() == 4 ) assert ( graph["swh:1:rel:0000000000000000000000000000000000000010"].count_visit_nodes( edges="rel:rev,rev:rev" ) == 3 ) assert ( graph["swh:1:rev:0000000000000000000000000000000000000009"].count_neighbors( direction="backward" ) == 3 ) def test_iter_type(graph): rev_list = list(graph.iter_type("rev")) - actual = [n.pid for n in rev_list] + actual = [n.swhid for n in rev_list] expected = [ "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000013", "swh:1:rev:0000000000000000000000000000000000000018", ] assert expected == actual diff --git a/swh/graph/tests/test_pid.py b/swh/graph/tests/test_swhid.py similarity index 63% rename from swh/graph/tests/test_pid.py rename to swh/graph/tests/test_swhid.py index f937729..e6fe5e1 100644 --- a/swh/graph/tests/test_pid.py +++ b/swh/graph/tests/test_swhid.py @@ -1,200 +1,196 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from itertools import islice import os import shutil import tempfile import unittest -from swh.graph.pid import NodeToPidMap, PidToNodeMap, bytes_to_str, str_to_bytes -from swh.model.identifiers import PID_TYPES +from swh.graph.swhid import NodeToSwhidMap, SwhidToNodeMap, bytes_to_str, str_to_bytes +from swh.model.identifiers import SWHID_TYPES -class TestPidSerialization(unittest.TestCase): +class TestSwhidSerialization(unittest.TestCase): pairs = [ ( "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2", bytes.fromhex("01" + "00" + "94a9ed024d3859793618152ea559a168bbcbb5e2"), ), ( "swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505", bytes.fromhex("01" + "01" + "d198bc9d7a6bcf6db04f476d29314f157507d505"), ), ( "swh:1:ori:b63a575fe3faab7692c9f38fb09d4bb45651bb0f", bytes.fromhex("01" + "02" + "b63a575fe3faab7692c9f38fb09d4bb45651bb0f"), ), ( "swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f", bytes.fromhex("01" + "03" + "22ece559cc7cc2364edc5e5593d63ae8bd229f9f"), ), ( "swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d", bytes.fromhex("01" + "04" + "309cf2674ee7a0749978cf8265ab91a60aea0f7d"), ), ( "swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453", bytes.fromhex("01" + "05" + "c7c108084bc0bf3d81436bf980b46e98bd338453"), ), ] def test_str_to_bytes(self): - for (pid_str, pid_bytes) in self.pairs: - self.assertEqual(str_to_bytes(pid_str), pid_bytes) + for (swhid_str, swhid_bytes) in self.pairs: + self.assertEqual(str_to_bytes(swhid_str), swhid_bytes) def test_bytes_to_str(self): - for (pid_str, pid_bytes) in self.pairs: - self.assertEqual(bytes_to_str(pid_bytes), pid_str) + for (swhid_str, swhid_bytes) in self.pairs: + self.assertEqual(bytes_to_str(swhid_bytes), swhid_str) def test_round_trip(self): - for (pid_str, pid_bytes) in self.pairs: - self.assertEqual(pid_str, bytes_to_str(str_to_bytes(pid_str))) - self.assertEqual(pid_bytes, str_to_bytes(bytes_to_str(pid_bytes))) + for (swhid_str, swhid_bytes) in self.pairs: + self.assertEqual(swhid_str, bytes_to_str(str_to_bytes(swhid_str))) + self.assertEqual(swhid_bytes, str_to_bytes(bytes_to_str(swhid_bytes))) def gen_records(types=["cnt", "dir", "ori", "rel", "rev", "snp"], length=10000): - """generate sequential PID/int records, suitable for filling int<->pid maps for + """generate sequential SWHID/int records, suitable for filling int<->swhid maps for testing swh-graph on-disk binary databases Args: - types (list): list of PID types to be generated, specified as the - corresponding 3-letter component in PIDs - length (int): number of PIDs to generate *per type* + types (list): list of SWHID types to be generated, specified as the + corresponding 3-letter component in SWHIDs + length (int): number of SWHIDs to generate *per type* Yields: - pairs (pid, int) where pid is a textual PID and int its sequential + pairs (swhid, int) where swhid is a textual SWHID and int its sequential integer identifier """ pos = 0 for t in sorted(types): for i in range(0, length): seq = format(pos, "x") # current position as hex string - pid = "swh:1:{}:{}{}".format(t, "0" * (40 - len(seq)), seq) - yield (pid, pos) + swhid = "swh:1:{}:{}{}".format(t, "0" * (40 - len(seq)), seq) + yield (swhid, pos) pos += 1 -# pairs PID/position in the sequence generated by :func:`gen_records` above +# pairs SWHID/position in the sequence generated by :func:`gen_records` above MAP_PAIRS = [ ("swh:1:cnt:0000000000000000000000000000000000000000", 0), ("swh:1:cnt:000000000000000000000000000000000000002a", 42), ("swh:1:dir:0000000000000000000000000000000000002afc", 11004), ("swh:1:ori:00000000000000000000000000000000000056ce", 22222), ("swh:1:rel:0000000000000000000000000000000000008235", 33333), ("swh:1:rev:000000000000000000000000000000000000ad9c", 44444), ("swh:1:snp:000000000000000000000000000000000000ea5f", 59999), ] -class TestPidToNodeMap(unittest.TestCase): +class TestSwhidToNodeMap(unittest.TestCase): @classmethod def setUpClass(cls): - """create reasonably sized (~2 MB) PID->int map to test on-disk DB - - """ + """create reasonably sized (~2 MB) SWHID->int map to test on-disk DB""" cls.tmpdir = tempfile.mkdtemp(prefix="swh.graph.test.") - cls.fname = os.path.join(cls.tmpdir, "pid2int.bin") + cls.fname = os.path.join(cls.tmpdir, "swhid2int.bin") with open(cls.fname, "wb") as f: - for (pid, i) in gen_records(length=10000): - PidToNodeMap.write_record(f, pid, i) + for (swhid, i) in gen_records(length=10000): + SwhidToNodeMap.write_record(f, swhid, i) @classmethod def tearDownClass(cls): shutil.rmtree(cls.tmpdir) def setUp(self): - self.map = PidToNodeMap(self.fname) + self.map = SwhidToNodeMap(self.fname) def tearDown(self): self.map.close() def test_lookup(self): - for (pid, pos) in MAP_PAIRS: - self.assertEqual(self.map[pid], pos) + for (swhid, pos) in MAP_PAIRS: + self.assertEqual(self.map[swhid], pos) def test_missing(self): with self.assertRaises(KeyError): self.map["swh:1:ori:0101010100000000000000000000000000000000"], with self.assertRaises(KeyError): self.map["swh:1:cnt:0101010100000000000000000000000000000000"], def test_type_error(self): with self.assertRaises(TypeError): self.map[42] with self.assertRaises(TypeError): self.map[1.2] def test_update(self): fname2 = self.fname + ".update" shutil.copy(self.fname, fname2) # fresh map copy - map2 = PidToNodeMap(fname2, mode="rb+") - for (pid, int) in islice(map2, 11): # update the first N items + map2 = SwhidToNodeMap(fname2, mode="rb+") + for (swhid, int) in islice(map2, 11): # update the first N items new_int = int + 42 - map2[pid] = new_int - self.assertEqual(map2[pid], new_int) # check updated value + map2[swhid] = new_int + self.assertEqual(map2[swhid], new_int) # check updated value os.unlink(fname2) # tmpdir will be cleaned even if we don't reach this def test_iter_type(self): - for t in PID_TYPES: + for t in SWHID_TYPES: first_20 = list(islice(self.map.iter_type(t), 20)) k = first_20[0][1] expected = [("swh:1:{}:{:040x}".format(t, i), i) for i in range(k, k + 20)] assert first_20 == expected def test_iter_prefix(self): - for t in PID_TYPES: + for t in SWHID_TYPES: prefix = self.map.iter_prefix("swh:1:{}:00".format(t)) first_20 = list(islice(prefix, 20)) k = first_20[0][1] expected = [("swh:1:{}:{:040x}".format(t, i), i) for i in range(k, k + 20)] assert first_20 == expected -class TestNodeToPidMap(unittest.TestCase): +class TestNodeToSwhidMap(unittest.TestCase): @classmethod def setUpClass(cls): - """create reasonably sized (~1 MB) int->PID map to test on-disk DB - - """ + """create reasonably sized (~1 MB) int->SWHID map to test on-disk DB""" cls.tmpdir = tempfile.mkdtemp(prefix="swh.graph.test.") - cls.fname = os.path.join(cls.tmpdir, "int2pid.bin") + cls.fname = os.path.join(cls.tmpdir, "int2swhid.bin") with open(cls.fname, "wb") as f: - for (pid, _i) in gen_records(length=10000): - NodeToPidMap.write_record(f, pid) + for (swhid, _i) in gen_records(length=10000): + NodeToSwhidMap.write_record(f, swhid) @classmethod def tearDownClass(cls): shutil.rmtree(cls.tmpdir) def setUp(self): - self.map = NodeToPidMap(self.fname) + self.map = NodeToSwhidMap(self.fname) def tearDown(self): self.map.close() def test_lookup(self): - for (pid, pos) in MAP_PAIRS: - self.assertEqual(self.map[pos], pid) + for (swhid, pos) in MAP_PAIRS: + self.assertEqual(self.map[pos], swhid) def test_out_of_bounds(self): with self.assertRaises(IndexError): self.map[1000000] with self.assertRaises(IndexError): self.map[-1000000] def test_update(self): fname2 = self.fname + ".update" shutil.copy(self.fname, fname2) # fresh map copy - map2 = NodeToPidMap(fname2, mode="rb+") - for (int, pid) in islice(map2, 11): # update the first N items - new_pid = pid.replace(":0", ":f") # mangle first hex digit - map2[int] = new_pid - self.assertEqual(map2[int], new_pid) # check updated value + map2 = NodeToSwhidMap(fname2, mode="rb+") + for (int, swhid) in islice(map2, 11): # update the first N items + new_swhid = swhid.replace(":0", ":f") # mangle first hex digit + map2[int] = new_swhid + self.assertEqual(map2[int], new_swhid) # check updated value os.unlink(fname2) # tmpdir will be cleaned even if we don't reach this