diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 380c658..4b0d8a9 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,46 +1,41 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v2.4.0 hooks: - id: trailing-whitespace - id: flake8 - id: check-json - id: check-yaml - repo: https://github.com/codespell-project/codespell rev: v1.16.0 hooks: - id: codespell + args: ["-L te,wth,alledges"] - repo: local hooks: - id: mypy name: mypy entry: mypy args: [swh] pass_filenames: false language: system types: [python] +- repo: https://github.com/python/black + rev: 19.10b0 + hooks: + - id: black + # unfortunately, we are far from being able to enable this... # - repo: https://github.com/PyCQA/pydocstyle.git # rev: 4.0.0 # hooks: # - id: pydocstyle # name: pydocstyle # description: pydocstyle is a static analysis tool for checking compliance with Python docstring conventions. # entry: pydocstyle --convention=google # language: python # types: [python] -# black requires py3.6+ -#- repo: https://github.com/python/black -# rev: 19.3b0 -# hooks: -# - id: black -# language_version: python3 -#- repo: https://github.com/asottile/blacken-docs -# rev: v1.0.0-1 -# hooks: -# - id: blacken-docs -# additional_dependencies: [black==19.3b0] diff --git a/reports/node_mapping/node_mapping.tex b/reports/node_mapping/node_mapping.tex index cd7e81c..fcfd1ea 100644 --- a/reports/node_mapping/node_mapping.tex +++ b/reports/node_mapping/node_mapping.tex @@ -1,203 +1,203 @@ \documentclass[11pt,a4paper]{article} \usepackage[a4paper,left=2cm,right=2cm,top=2.5cm,bottom=2.5cm]{geometry} \usepackage[english]{babel} \usepackage{booktabs} \usepackage{graphicx} \usepackage{hyperref} \usepackage{minted} \usepackage{parskip} \usepackage{siunitx} \usepackage{tikz} \title{Google Summer of Code 2019} \author{Thibault Allançon} \date{June 6, 2019} \newcommand{\mmap}{\mintinline{text}{mmap}} \begin{document} \maketitle Software Heritage refers to its graph nodes using string persistent identifiers\footnote{\url{https://docs.softwareheritage.org/devel/swh-model/persistent-identifiers.html}}. During the compression, WebGraph maps those labels to long identifiers. We need an efficient bidirectional map between those labels and the internal node ids. -\section{Additional informations} +\section{Additional information} Example of a SWH string identifier: \begin{minted}{text} swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2 \end{minted} The first step of the compression process is to map the input strings to long identifiers using the \mintinline{text}{GOVMinimalPerfectHashFunction} from the Sux4J\footnote{\url{http://sux4j.di.unimi.it/}} framework. The \mintinline{text}{.mph} function generated is a \textbf{minimal perfect hash function} (mapping $n$ keys to $n$ consecutive integers, with no collisions). Later in the compression process, the nodes are re-ordered (specifically, in the order of a BFS traversal), so the MPH mapping is shuffled.
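Conceptually, the resulting node id is just the composition of the two functions. A minimal sketch of the idea (Python-style pseudocode; \mintinline{text}{mph} and \mintinline{text}{order} are illustrative stand-ins for the two generated artifacts, not the actual Sux4J/WebGraph API):

\begin{small}
\begin{minted}{python}
def node_id(swh_id: str, mph, order) -> int:
    # mph:   string SWH id -> long (the .mph minimal perfect hash function)
    # order: long -> long (the .order BFS permutation)
    return order[mph(swh_id)]
\end{minted}
\end{small}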
\begin{figure}[H] \centering \begin{tikzpicture}[auto, node distance=6cm] \node[label=below:{(string)}] (input) {\mintinline{text}{.nodes.csv}}; \node[right of=input, label=below:{(long)}] (mph) {\mintinline{text}{.mph}}; \node[right of=mph, label=below:{(long)}] (bfs) {\mintinline{text}{.order}}; \draw[->] (input) edge node{MPH function} (mph); \draw[->] (mph) edge node{BFS ordering} (bfs); \end{tikzpicture} \caption{End-to-end node mapping} \end{figure} We want a bidirectional map that efficiently retrieves the corresponding strings or longs. Using naive hash maps is not possible due to memory usage (we have around 10B nodes), hence we must use a disk-based solution. \newpage \section{Solution 1: Simple file} Idea: dump the two mappings into huge files and load them using the \mmap{} system call to avoid storing them in memory. The long-to-string map only needs to store the SWH id, because the MPH is a perfect mapping: we can store on line $i$ the SWH persistent identifier of node $i$, and simply read the $i$-th line to retrieve it. The string-to-long map needs to store both the SWH id and the node id on the same line. To efficiently get the corresponding node id, we can keep the lines ordered by SWH id and binary search for the mapping. \subsection{Java I/O API} Java offers many ways to write to a file; three were considered and timed: \begin{small} \begin{minted}{java} // Version 1 try (Writer writer = new BufferedWriter(new OutputStreamWriter( new FileOutputStream("output.txt"), "utf-8"))) { writer.write("test"); } // Version 2 try (Writer writer = new BufferedWriter(new FileWriter("output.txt"))) { writer.write("test"); } // Version 3 try (FileWriter writer = new FileWriter("output.txt")) { writer.write("test"); } \end{minted} \end{small} String ids are sorted in alphabetical order. When reading them, the long-to-string map needs to write data at random places in the output file. One way to transform this into a sequential write is to put the data into an array, and dump the array sequentially into the file once the reading process is done. \subsection{\mmap{}} Java imposes a maximum size of 2GB on \mmap{}-ed files. To overcome the limitation and \mmap{} very large files, there is the \mintinline{text}{ByteBufferInputStream} class from the dsiutils\footnote{\url{http://dsiutils.di.unimi.it/}} package. But this only supports reading from files, not writing to them. I had to create the \mintinline{text}{ByteBufferOutputStream} class to allow writing to huge \mmap{}-ed files. \subsection{Timings} Experiments on the simple-file solution were done on a VM with 1TB of RAM and 40 vCPUs using the \mintinline{text}{rev_to_rev} dataset (1B nodes and 1B edges). \begin{center} \begin{tabular}{@{} l *5r @{}} \toprule \multicolumn{1}{c}{} & \textbf{I/O v1} & \textbf{I/O v2} & \textbf{I/O v3} & \textbf{random \mmap{}} & \textbf{sequential \mmap{}} \\ \midrule \texttt{Dump time} & 4990s & \textbf{4890s} & 5350s & 9290s & 5240s \\ \bottomrule \end{tabular} \end{center} \section{Solution 2: Databases} Another approach is to simply use a disk-based database; two were considered in my tests: RocksDB and HaloDB. In both cases, I tried optimizing the database options to fit our needs and get the best timings. On the technical side, the code needed is simpler and more generic than the custom file solution. However, this can only be a feasible solution if the map retrieval/building time overheads are small enough to scale to the entire graph.
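In both cases the bidirectional map is stored as two plain key-value tables, one per lookup direction, with fixed-size byte keys. A minimal sketch of the idea, using the \mintinline{text}{python-rocksdb} binding for brevity (the actual experiments use the Java APIs, in \mintinline{text}{NodeIdMapRocksDB.java} and \mintinline{text}{NodeIdMapHaloDB.java}; the database names and options below are illustrative assumptions):

\begin{small}
\begin{minted}{python}
import struct

import rocksdb  # python-rocksdb, used here only for illustration

# two databases, one per lookup direction
pid2node = rocksdb.DB("pid2node.db", rocksdb.Options(create_if_missing=True))
node2pid = rocksdb.DB("node2pid.db", rocksdb.Options(create_if_missing=True))

def put(pid: bytes, node_id: int) -> None:
    key = struct.pack(">q", node_id)  # 8-byte big-endian node id
    pid2node.put(pid, key)
    node2pid.put(key, pid)

def get_node_id(pid: bytes) -> int:
    return struct.unpack(">q", pid2node.get(pid))[0]
\end{minted}
\end{small}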
\subsection{RocksDB} RocksDB is a popular and mature persistent key-value database developed by Facebook. It is written in C++ but provides a Java API. The code used to dump and load the mappings can be found in the \mintinline{text}{NodeIdMapRocksDB.java} file. RocksDB has \textbf{many} parameters to tune (see their tuning guide\footnote{\url{https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide}}). I tried the most obvious ones, but being completely new to this software, the benchmarks may not be the best possible. \subsection{HaloDB} HaloDB is a relatively new key-value database developed by Yahoo. It caught my attention because it is written in Java and leaves out features of RocksDB we don't need (like range scans) in order to have better performance. The code used to dump and load the mappings can be found in the \mintinline{text}{NodeIdMapHaloDB.java} file. \section{Results} Experiments were done on a VM with 700GB of RAM and 72 vCPUs using the \mintinline{text}{rev_to_rev} dataset (1B nodes and 1B edges). \begin{center} \begin{tabular}{@{} l *3r @{}} \toprule \multicolumn{1}{c}{} & \textbf{file solution} & \textbf{RocksDB} & \textbf{HaloDB} \\ \midrule \texttt{dump} & $<$ 1h & estimated 4h & estimated 8h \\ \texttt{load} & 0.003s & & \\ \texttt{get node id} & $\sim$900µs & & \\ \texttt{get swh id} & $\sim$50µs & & \\ \texttt{RAM usage (dump)} & 130GB & & \\ \texttt{RAM usage (load)} & 5GB & & \\ \texttt{Disk space} & 108GB & & \\ \bottomrule \end{tabular} \end{center} Note: \texttt{RAM usage} includes the in-memory compressed graph. \end{document} diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..8d79b7e --- /dev/null +++ b/setup.cfg @@ -0,0 +1,6 @@ +[flake8] +# E203: whitespace before ':' +# E231: missing whitespace after ',' +# W503: line break before binary operator +ignore = E203,E231,W503 +max-line-length = 88 diff --git a/setup.py b/setup.py index f752222..3cf8261 100755 --- a/setup.py +++ b/setup.py @@ -1,75 +1,75 @@ #!/usr/bin/env python3 # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from setuptools import setup, find_packages from os import path from io import open from glob import glob here = path.abspath(path.dirname(__file__)) # Get the long description from the README file -with open(path.join(here, 'README.rst'), encoding='utf-8') as f: +with open(path.join(here, "README.rst"), encoding="utf-8") as f: long_description = f.read() def parse_requirements(name=None): if name: - reqf = 'requirements-%s.txt' % name + reqf = "requirements-%s.txt" % name else: - reqf = 'requirements.txt' + reqf = "requirements.txt" requirements = [] if not path.exists(reqf): return requirements with open(reqf) as f: for line in f.readlines(): line = line.strip() - if not line or line.startswith('#'): + if not line or line.startswith("#"): continue requirements.append(line) return requirements -JAR_PATHS = list(glob('java/target/swh-graph-*.jar')) +JAR_PATHS = list(glob("java/target/swh-graph-*.jar")) setup( - name='swh.graph', - description='Software Heritage graph service', + name="swh.graph", + description="Software Heritage graph service", long_description=long_description, - long_description_content_type='text/x-rst', - author='Software Heritage developers', - author_email='swh-devel@inria.fr', - url='https://forge.softwareheritage.org/diffusion/DGRPH', + 
long_description_content_type="text/x-rst", + author="Software Heritage developers", + author_email="swh-devel@inria.fr", + url="https://forge.softwareheritage.org/diffusion/DGRPH", packages=find_packages(), - install_requires=parse_requirements() + parse_requirements('swh'), - tests_require=parse_requirements('test'), - setup_requires=['vcversioner'], - extras_require={'testing': parse_requirements('test')}, + install_requires=parse_requirements() + parse_requirements("swh"), + tests_require=parse_requirements("test"), + setup_requires=["vcversioner"], + extras_require={"testing": parse_requirements("test")}, vcversioner={}, include_package_data=True, - data_files=[('share/swh-graph', JAR_PATHS)], - entry_points=''' + data_files=[("share/swh-graph", JAR_PATHS)], + entry_points=""" [console_scripts] swh-graph=swh.graph.cli:main [swh.cli.subcommands] graph=swh.graph.cli:cli - ''', + """, classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 3 - Alpha", ], project_urls={ - 'Bug Reports': 'https://forge.softwareheritage.org/maniphest', - 'Funding': 'https://www.softwareheritage.org/donate', - 'Source': 'https://forge.softwareheritage.org/source/swh-graph', + "Bug Reports": "https://forge.softwareheritage.org/maniphest", + "Funding": "https://www.softwareheritage.org/donate", + "Source": "https://forge.softwareheritage.org/source/swh-graph", }, ) diff --git a/swh/graph/backend.py b/swh/graph/backend.py index 954eb37..370dd33 100644 --- a/swh/graph/backend.py +++ b/swh/graph/backend.py @@ -1,185 +1,183 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import contextlib import io import os import struct import subprocess import sys import tempfile from py4j.java_gateway import JavaGateway from swh.graph.config import check_config from swh.graph.pid import NodeToPidMap, PidToNodeMap from swh.model.identifiers import PID_TYPES -BUF_SIZE = 64*1024 -BIN_FMT = '>q' # 64 bit integer, big endian +BUF_SIZE = 64 * 1024 +BIN_FMT = ">q" # 64 bit integer, big endian PATH_SEPARATOR_ID = -1 -NODE2PID_EXT = 'node2pid.bin' -PID2NODE_EXT = 'pid2node.bin' +NODE2PID_EXT = "node2pid.bin" +PID2NODE_EXT = "pid2node.bin" def _get_pipe_stderr(): # Get stderr if possible, or pipe to stdout if running with Jupyter. try: sys.stderr.fileno() except io.UnsupportedOperation: return subprocess.STDOUT else: return sys.stderr class Backend: def __init__(self, graph_path, config=None): self.gateway = None self.entry = None self.graph_path = graph_path self.config = check_config(config or {}) def __enter__(self): self.gateway = JavaGateway.launch_gateway( java_path=None, - javaopts=self.config['java_tool_options'].split(), - classpath=self.config['classpath'], + javaopts=self.config["java_tool_options"].split(), + classpath=self.config["classpath"], die_on_exit=True, redirect_stdout=sys.stdout, redirect_stderr=_get_pipe_stderr(), ) self.entry = self.gateway.jvm.org.softwareheritage.graph.Entry() self.entry.load_graph(self.graph_path) - self.node2pid = NodeToPidMap(self.graph_path + '.' + NODE2PID_EXT) - self.pid2node = PidToNodeMap(self.graph_path + '.' + PID2NODE_EXT) + self.node2pid = NodeToPidMap(self.graph_path + "." 
+ NODE2PID_EXT) + self.pid2node = PidToNodeMap(self.graph_path + "." + PID2NODE_EXT) self.stream_proxy = JavaStreamProxy(self.entry) return self def __exit__(self, exc_type, exc_value, tb): self.gateway.shutdown() def stats(self): return self.entry.stats() def count(self, ttype, direction, edges_fmt, src): - method = getattr(self.entry, 'count_' + ttype) + method = getattr(self.entry, "count_" + ttype) return method(direction, edges_fmt, src) async def simple_traversal(self, ttype, direction, edges_fmt, src): - assert ttype in ('leaves', 'neighbors', 'visit_nodes') + assert ttype in ("leaves", "neighbors", "visit_nodes") method = getattr(self.stream_proxy, ttype) async for node_id in method(direction, edges_fmt, src): yield node_id async def walk(self, direction, edges_fmt, algo, src, dst): if dst in PID_TYPES: - it = self.stream_proxy.walk_type(direction, edges_fmt, algo, - src, dst) + it = self.stream_proxy.walk_type(direction, edges_fmt, algo, src, dst) else: - it = self.stream_proxy.walk(direction, edges_fmt, algo, - src, dst) + it = self.stream_proxy.walk(direction, edges_fmt, algo, src, dst) async for node_id in it: yield node_id async def random_walk(self, direction, edges_fmt, retries, src, dst): if dst in PID_TYPES: - it = self.stream_proxy.random_walk_type(direction, edges_fmt, - retries, src, dst) + it = self.stream_proxy.random_walk_type( + direction, edges_fmt, retries, src, dst + ) else: - it = self.stream_proxy.random_walk(direction, edges_fmt, retries, - src, dst) + it = self.stream_proxy.random_walk(direction, edges_fmt, retries, src, dst) async for node_id in it: # TODO return 404 if path is empty yield node_id async def visit_paths(self, direction, edges_fmt, src): path = [] - async for node in self.stream_proxy.visit_paths( - direction, edges_fmt, src): + async for node in self.stream_proxy.visit_paths(direction, edges_fmt, src): if node == PATH_SEPARATOR_ID: yield path path = [] else: path.append(node) class JavaStreamProxy: """A proxy class for the org.softwareheritage.graph.Entry Java class that takes care of the setup and teardown of the named-pipe FIFO communication between Python and Java. Initialize JavaStreamProxy using: proxy = JavaStreamProxy(swh_entry_class_instance) Then you can call an Entry method and iterate on the FIFO results like this: async for value in proxy.java_method(arg1, arg2): print(value) """ def __init__(self, entry): self.entry = entry async def read_node_ids(self, fname): loop = asyncio.get_event_loop() - open_thread = loop.run_in_executor(None, open, fname, 'rb') + open_thread = loop.run_in_executor(None, open, fname, "rb") # Since the open() call on the FIFO is blocking until it is also opened # on the Java side, we await it with a timeout in case there is an # exception that prevents the write-side open(). 
with (await asyncio.wait_for(open_thread, timeout=2)) as f: while True: data = await loop.run_in_executor(None, f.read, BUF_SIZE) if not data: break for data in struct.iter_unpack(BIN_FMT, data): yield data[0] class _HandlerWrapper: def __init__(self, handler): self._handler = handler def __getattr__(self, name): func = getattr(self._handler, name) async def java_call(*args, **kwargs): loop = asyncio.get_event_loop() await loop.run_in_executor(None, lambda: func(*args, **kwargs)) def java_task(*args, **kwargs): return asyncio.create_task(java_call(*args, **kwargs)) return java_task @contextlib.contextmanager def get_handler(self): - with tempfile.TemporaryDirectory(prefix='swh-graph-') as tmpdirname: - cli_fifo = os.path.join(tmpdirname, 'swh-graph.fifo') + with tempfile.TemporaryDirectory(prefix="swh-graph-") as tmpdirname: + cli_fifo = os.path.join(tmpdirname, "swh-graph.fifo") os.mkfifo(cli_fifo) reader = self.read_node_ids(cli_fifo) query_handler = self.entry.get_handler(cli_fifo) handler = self._HandlerWrapper(query_handler) yield (handler, reader) def __getattr__(self, name): async def java_call_iterator(*args, **kwargs): with self.get_handler() as (handler, reader): java_task = getattr(handler, name)(*args, **kwargs) try: async for value in reader: yield value except asyncio.TimeoutError: # If the read-side open() times out, an exception on the # Java side probably happened that prevented the # write-side open(). We propagate this exception here if # that is the case. task_exc = java_task.exception() if task_exc: raise task_exc raise await java_task + return java_call_iterator diff --git a/swh/graph/cli.py b/swh/graph/cli.py index 6375106..b568dab 100644 --- a/swh/graph/cli.py +++ b/swh/graph/cli.py @@ -1,291 +1,342 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import aiohttp import click import logging import sys from pathlib import Path from typing import Any, Dict, Tuple import swh.model.exceptions from swh.core import config from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup from swh.graph import client, webgraph from swh.graph.backend import NODE2PID_EXT, PID2NODE_EXT from swh.graph.pid import PidToNodeMap, NodeToPidMap from swh.graph.server.app import make_app from swh.graph.backend import Backend from swh.model.identifiers import parse_persistent_identifier class PathlibPath(click.Path): """A Click path argument that returns a pathlib Path, not a string""" + def convert(self, value, param, ctx): return Path(super().convert(value, param, ctx)) -DEFAULT_CONFIG = { - 'graph': ('dict', {}) -} # type: Dict[str, Tuple[str, Any]] +DEFAULT_CONFIG = {"graph": ("dict", {})} # type: Dict[str, Tuple[str, Any]] -@click.group(name='graph', context_settings=CONTEXT_SETTINGS, - cls=AliasedGroup) -@click.option('--config-file', '-C', default=None, - type=click.Path(exists=True, dir_okay=False,), - help='YAML configuration file') +@click.group(name="graph", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup) +@click.option( + "--config-file", + "-C", + default=None, + type=click.Path(exists=True, dir_okay=False,), + help="YAML configuration file", +) @click.pass_context def cli(ctx, config_file): """Software Heritage graph tools.""" ctx.ensure_object(dict) conf = config.read(config_file, DEFAULT_CONFIG) - if 'graph' not in conf: - raise ValueError('no "graph" stanza found in configuration 
file %s' - % config_file) - ctx.obj['config'] = conf + if "graph" not in conf: + raise ValueError( + 'no "graph" stanza found in configuration file %s' % config_file + ) + ctx.obj["config"] = conf -@cli.command('api-client') -@click.option('--host', default='localhost', help='Graph server host') -@click.option('--port', default='5009', help='Graph server port') +@cli.command("api-client") +@click.option("--host", default="localhost", help="Graph server host") +@click.option("--port", default="5009", help="Graph server port") @click.pass_context def api_client(ctx, host, port): """client for the graph REST service""" - url = 'http://{}:{}'.format(host, port) + url = "http://{}:{}".format(host, port) app = client.RemoteGraphClient(url) # TODO: run web app print(app.stats()) -@cli.group('map') +@cli.group("map") @click.pass_context def map(ctx): """Manage swh-graph on-disk maps""" pass def dump_pid2node(filename): for (pid, int) in PidToNodeMap(filename): - print('{}\t{}'.format(pid, int)) + print("{}\t{}".format(pid, int)) def dump_node2pid(filename): for (int, pid) in NodeToPidMap(filename): - print('{}\t{}'.format(int, pid)) + print("{}\t{}".format(int, pid)) def restore_pid2node(filename): """read a textual PID->int map from stdin and write its binary version to filename """ - with open(filename, 'wb') as dst: + with open(filename, "wb") as dst: for line in sys.stdin: (str_pid, str_int) = line.split() PidToNodeMap.write_record(dst, str_pid, int(str_int)) def restore_node2pid(filename, length): """read a textual int->PID map from stdin and write its binary version to filename """ - node2pid = NodeToPidMap(filename, mode='wb', length=length) + node2pid = NodeToPidMap(filename, mode="wb", length=length) for line in sys.stdin: (str_int, str_pid) = line.split() node2pid[int(str_int)] = str_pid node2pid.close() -@map.command('dump') -@click.option('--type', '-t', 'map_type', required=True, - type=click.Choice(['pid2node', 'node2pid']), - help='type of map to dump') -@click.argument('filename', required=True, type=click.Path(exists=True)) +@map.command("dump") +@click.option( + "--type", + "-t", + "map_type", + required=True, + type=click.Choice(["pid2node", "node2pid"]), + help="type of map to dump", +) +@click.argument("filename", required=True, type=click.Path(exists=True)) @click.pass_context def dump_map(ctx, map_type, filename): """Dump a binary PID<->node map to textual format.""" - if map_type == 'pid2node': + if map_type == "pid2node": dump_pid2node(filename) - elif map_type == 'node2pid': + elif map_type == "node2pid": dump_node2pid(filename) else: - raise ValueError('invalid map type: ' + map_type) + raise ValueError("invalid map type: " + map_type) pass -@map.command('restore') -@click.option('--type', '-t', 'map_type', required=True, - type=click.Choice(['pid2node', 'node2pid']), - help='type of map to dump') -@click.option('--length', '-l', type=int, - help='''map size in number of logical records - (required for node2pid maps)''') -@click.argument('filename', required=True, type=click.Path()) +@map.command("restore") +@click.option( + "--type", + "-t", + "map_type", + required=True, + type=click.Choice(["pid2node", "node2pid"]), + help="type of map to restore", +) +@click.option( + "--length", + "-l", + type=int, + help="""map size in number of logical records + (required for node2pid maps)""", +) +@click.argument("filename", required=True, type=click.Path()) @click.pass_context def restore_map(ctx, map_type, length, filename): """Restore a binary PID<->node map from textual 
format.""" - if map_type == 'pid2node': + if map_type == "pid2node": restore_pid2node(filename) - elif map_type == 'node2pid': + elif map_type == "node2pid": if length is None: raise click.UsageError( - 'map length is required when restoring {} maps'.format( - map_type), ctx) + "map length is required when restoring {} maps".format(map_type), ctx + ) restore_node2pid(filename, length) else: - raise ValueError('invalid map type: ' + map_type) - - -@map.command('write') -@click.option('--type', '-t', 'map_type', required=True, - type=click.Choice(['pid2node', 'node2pid']), - help='type of map to write') -@click.argument('filename', required=True, type=click.Path()) + raise ValueError("invalid map type: " + map_type) + + +@map.command("write") +@click.option( + "--type", + "-t", + "map_type", + required=True, + type=click.Choice(["pid2node", "node2pid"]), + help="type of map to write", +) +@click.argument("filename", required=True, type=click.Path()) @click.pass_context def write(ctx, map_type, filename): """Write a map to disk sequentially. read from stdin a textual PID->node mapping (for pid2node, or a simple sequence of PIDs for node2pid) and write it to disk in the requested binary map format note that no sorting is applied, so the input should already be sorted as required by the chosen map type (by PID for pid2node, by int for node2pid) """ - with open(filename, 'wb') as f: - if map_type == 'pid2node': + with open(filename, "wb") as f: + if map_type == "pid2node": for line in sys.stdin: (pid, int_str) = line.rstrip().split(maxsplit=1) PidToNodeMap.write_record(f, pid, int(int_str)) - elif map_type == 'node2pid': + elif map_type == "node2pid": for line in sys.stdin: pid = line.rstrip() NodeToPidMap.write_record(f, pid) else: - raise ValueError('invalid map type: ' + map_type) + raise ValueError("invalid map type: " + map_type) -@map.command('lookup') -@click.option('--graph', '-g', required=True, metavar='GRAPH', - help='compressed graph basename') -@click.argument('identifiers', nargs=-1) +@map.command("lookup") +@click.option( + "--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename" +) +@click.argument("identifiers", nargs=-1) def map_lookup(graph, identifiers): """Lookup identifiers using on-disk maps. Depending on the identifier type lookup either a PID into a PID->node (and return the node integer identifier) or, vice-versa, lookup a node integer identifier into a node->PID (and return the PID). The desired behavior is chosen depending on the syntax of each given identifier. Identifiers can be passed either directly on the command line or on standard input, separate by blanks. Logical lines (as returned by readline()) in stdin will be preserved in stdout. 
""" success = True # no identifiers failed to be looked up - pid2node = PidToNodeMap(f'{graph}.{PID2NODE_EXT}') - node2pid = NodeToPidMap(f'{graph}.{NODE2PID_EXT}') + pid2node = PidToNodeMap(f"{graph}.{PID2NODE_EXT}") + node2pid = NodeToPidMap(f"{graph}.{NODE2PID_EXT}") def lookup(identifier): nonlocal success, pid2node, node2pid is_pid = None try: int(identifier) is_pid = False except ValueError: try: parse_persistent_identifier(identifier) is_pid = True except swh.model.exceptions.ValidationError: success = False logging.error(f'invalid identifier: "{identifier}", skipping') try: if is_pid: return str(pid2node[identifier]) else: return node2pid[int(identifier)] except KeyError: success = False logging.error(f'identifier not found: "{identifier}", skipping') if identifiers: # lookup identifiers passed via CLI for identifier in identifiers: print(lookup(identifier)) else: # lookup identifiers passed via stdin, preserving logical lines for line in sys.stdin: results = [lookup(id) for id in line.rstrip().split()] if results: # might be empty if all IDs on the same line failed - print(' '.join(results)) + print(" ".join(results)) sys.exit(0 if success else 1) -@cli.command(name='rpc-serve') -@click.option('--host', '-h', default='0.0.0.0', - metavar='IP', show_default=True, - help='host IP address to bind the server on') -@click.option('--port', '-p', default=5009, type=click.INT, - metavar='PORT', show_default=True, - help='port to bind the server on') -@click.option('--graph', '-g', required=True, metavar='GRAPH', - help='compressed graph basename') +@cli.command(name="rpc-serve") +@click.option( + "--host", + "-h", + default="0.0.0.0", + metavar="IP", + show_default=True, + help="host IP address to bind the server on", +) +@click.option( + "--port", + "-p", + default=5009, + type=click.INT, + metavar="PORT", + show_default=True, + help="port to bind the server on", +) +@click.option( + "--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename" +) @click.pass_context def serve(ctx, host, port, graph): """run the graph REST service""" - backend = Backend(graph_path=graph, config=ctx.obj['config']) + backend = Backend(graph_path=graph, config=ctx.obj["config"]) app = make_app(backend=backend) with backend: aiohttp.web.run_app(app, host=host, port=port) @cli.command() -@click.option('--graph', '-g', required=True, metavar='GRAPH', - type=PathlibPath(), - help='input graph basename') -@click.option('--outdir', '-o', 'out_dir', required=True, metavar='DIR', - type=PathlibPath(), - help='directory where to store compressed graph') -@click.option('--steps', '-s', metavar='STEPS', type=webgraph.StepOption(), - help='run only these compression steps (default: all steps)') +@click.option( + "--graph", + "-g", + required=True, + metavar="GRAPH", + type=PathlibPath(), + help="input graph basename", +) +@click.option( + "--outdir", + "-o", + "out_dir", + required=True, + metavar="DIR", + type=PathlibPath(), + help="directory where to store compressed graph", +) +@click.option( + "--steps", + "-s", + metavar="STEPS", + type=webgraph.StepOption(), + help="run only these compression steps (default: all steps)", +) @click.pass_context def compress(ctx, graph, out_dir, steps): """Compress a graph using WebGraph Input: a pair of files g.nodes.csv.gz, g.edges.csv.gz Output: a directory containing a WebGraph compressed graph Compression steps are: (1) mph, (2) bv, (3) bv_obl, (4) bfs, (5) permute, (6) permute_obl, (7) stats, (8) transpose, (9) transpose_obl, (10) maps, (11) clean_tmp. 
Compression steps can be selected by name or number using --steps, separating them with commas; step ranges (e.g., 3-9, 6-, etc.) are also supported. """ graph_name = graph.name in_dir = graph.parent try: - conf = ctx.obj['config']['graph']['compress'] + conf = ctx.obj["config"]["graph"]["compress"] except KeyError: conf = {} # use defaults webgraph.compress(graph_name, in_dir, out_dir, steps, conf) def main(): - return cli(auto_envvar_prefix='SWH_GRAPH') + return cli(auto_envvar_prefix="SWH_GRAPH") -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/swh/graph/client.py b/swh/graph/client.py index 95379b1..5821dfe 100644 --- a/swh/graph/client.py +++ b/swh/graph/client.py @@ -1,121 +1,103 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from swh.core.api import RPCClient class GraphAPIError(Exception): """Graph API Error""" + def __str__(self): - return ('An unexpected error occurred in the Graph backend: {}' - .format(self.args)) + return "An unexpected error occurred in the Graph backend: {}".format(self.args) class RemoteGraphClient(RPCClient): """Client to the Software Heritage Graph.""" def __init__(self, url, timeout=None): - super().__init__( - api_exception=GraphAPIError, url=url, timeout=timeout) + super().__init__(api_exception=GraphAPIError, url=url, timeout=timeout) def raw_verb_lines(self, verb, endpoint, **kwargs): response = self.raw_verb(verb, endpoint, stream=True, **kwargs) self.raise_for_status(response) for line in response.iter_lines(): - yield line.decode().lstrip('\n') + yield line.decode().lstrip("\n") def get_lines(self, endpoint, **kwargs): - yield from self.raw_verb_lines('get', endpoint, **kwargs) + yield from self.raw_verb_lines("get", endpoint, **kwargs) # Web API endpoints def stats(self): - return self.get('stats') + return self.get("stats") def leaves(self, src, edges="*", direction="forward"): return self.get_lines( - 'leaves/{}'.format(src), - params={ - 'edges': edges, - 'direction': direction - }) + "leaves/{}".format(src), params={"edges": edges, "direction": direction} + ) def neighbors(self, src, edges="*", direction="forward"): return self.get_lines( - 'neighbors/{}'.format(src), - params={ - 'edges': edges, - 'direction': direction - }) + "neighbors/{}".format(src), params={"edges": edges, "direction": direction} + ) def visit_nodes(self, src, edges="*", direction="forward"): return self.get_lines( - 'visit/nodes/{}'.format(src), - params={ - 'edges': edges, - 'direction': direction - }) + "visit/nodes/{}".format(src), + params={"edges": edges, "direction": direction}, + ) def visit_paths(self, src, edges="*", direction="forward"): def decode_path_wrapper(it): for e in it: yield json.loads(e) return decode_path_wrapper( self.get_lines( - 'visit/paths/{}'.format(src), - params={ - 'edges': edges, - 'direction': direction - })) - - def walk(self, src, dst, edges="*", traversal="dfs", - direction="forward", limit=None): - endpoint = 'walk/{}/{}' + "visit/paths/{}".format(src), + params={"edges": edges, "direction": direction}, + ) + ) + + def walk( + self, src, dst, edges="*", traversal="dfs", direction="forward", limit=None + ): + endpoint = "walk/{}/{}" return self.get_lines( endpoint.format(src, dst), params={ - 'edges': edges, - 'traversal': traversal, - 'direction': direction, - 'limit': limit - }) - - def 
random_walk(self, src, dst, - edges="*", direction="forward", limit=None): - endpoint = 'randomwalk/{}/{}' + "edges": edges, + "traversal": traversal, + "direction": direction, + "limit": limit, + }, + ) + + def random_walk(self, src, dst, edges="*", direction="forward", limit=None): + endpoint = "randomwalk/{}/{}" return self.get_lines( endpoint.format(src, dst), - params={ - 'edges': edges, - 'direction': direction, - 'limit': limit - }) + params={"edges": edges, "direction": direction, "limit": limit}, + ) def count_leaves(self, src, edges="*", direction="forward"): return self.get( - 'leaves/count/{}'.format(src), - params={ - 'edges': edges, - 'direction': direction - }) + "leaves/count/{}".format(src), + params={"edges": edges, "direction": direction}, + ) def count_neighbors(self, src, edges="*", direction="forward"): return self.get( - 'neighbors/count/{}'.format(src), - params={ - 'edges': edges, - 'direction': direction - }) + "neighbors/count/{}".format(src), + params={"edges": edges, "direction": direction}, + ) def count_visit_nodes(self, src, edges="*", direction="forward"): return self.get( - 'visit/nodes/count/{}'.format(src), - params={ - 'edges': edges, - 'direction': direction - }) + "visit/nodes/count/{}".format(src), + params={"edges": edges, "direction": direction}, + ) diff --git a/swh/graph/config.py b/swh/graph/config.py index acb83e5..a9146d1 100644 --- a/swh/graph/config.py +++ b/swh/graph/config.py @@ -1,104 +1,111 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import psutil import sys from pathlib import Path def find_graph_jar(): """find swh-graph.jar, containing the Java part of swh-graph look both in development directories and installed data (for in-production deployments that fetched the JAR from pypi) """ swh_graph_root = Path(__file__).parents[2] try_paths = [ - swh_graph_root / 'java/target/', - Path(sys.prefix) / 'share/swh-graph/', - Path(sys.prefix) / 'local/share/swh-graph/', + swh_graph_root / "java/target/", + Path(sys.prefix) / "share/swh-graph/", + Path(sys.prefix) / "local/share/swh-graph/", ] for path in try_paths: - glob = list(path.glob('swh-graph-*.jar')) + glob = list(path.glob("swh-graph-*.jar")) if glob: if len(glob) > 1: - logging.warn('found multiple swh-graph JARs, ' - 'arbitrarily picking one') - logging.info('using swh-graph JAR: {0}'.format(glob[0])) + logging.warn( + "found multiple swh-graph JARs, " "arbitrarily picking one" + ) + logging.info("using swh-graph JAR: {0}".format(glob[0])) return str(glob[0]) - raise RuntimeError('swh-graph JAR not found. Have you run `make java`?') + raise RuntimeError("swh-graph JAR not found. 
Have you run `make java`?") def check_config(conf): """check configuration and propagate defaults """ conf = conf.copy() - if 'batch_size' not in conf: - conf['batch_size'] = '1000000000' # 1 billion - if 'max_ram' not in conf: - conf['max_ram'] = str(psutil.virtual_memory().total) - if 'java_tool_options' not in conf: - conf['java_tool_options'] = ' '.join([ - '-Xmx{max_ram}', - '-XX:PretenureSizeThreshold=512M', - '-XX:MaxNewSize=4G', - '-XX:+UseLargePages', - '-XX:+UseTransparentHugePages', - '-XX:+UseNUMA', - '-XX:+UseTLAB', - '-XX:+ResizeTLAB', - ]) - conf['java_tool_options'] = conf['java_tool_options'].format( - max_ram=conf['max_ram']) - if 'java' not in conf: - conf['java'] = 'java' - if 'classpath' not in conf: - conf['classpath'] = find_graph_jar() + if "batch_size" not in conf: + conf["batch_size"] = "1000000000" # 1 billion + if "max_ram" not in conf: + conf["max_ram"] = str(psutil.virtual_memory().total) + if "java_tool_options" not in conf: + conf["java_tool_options"] = " ".join( + [ + "-Xmx{max_ram}", + "-XX:PretenureSizeThreshold=512M", + "-XX:MaxNewSize=4G", + "-XX:+UseLargePages", + "-XX:+UseTransparentHugePages", + "-XX:+UseNUMA", + "-XX:+UseTLAB", + "-XX:+ResizeTLAB", + ] + ) + conf["java_tool_options"] = conf["java_tool_options"].format( + max_ram=conf["max_ram"] + ) + if "java" not in conf: + conf["java"] = "java" + if "classpath" not in conf: + conf["classpath"] = find_graph_jar() return conf def check_config_compress(config, graph_name, in_dir, out_dir): """check compression-specific configuration and initialize its execution environment. """ conf = check_config(config) - conf['graph_name'] = graph_name - conf['in_dir'] = str(in_dir) - conf['out_dir'] = str(out_dir) + conf["graph_name"] = graph_name + conf["in_dir"] = str(in_dir) + conf["out_dir"] = str(out_dir) out_dir.mkdir(parents=True, exist_ok=True) - if 'tmp_dir' not in conf: - tmp_dir = out_dir / 'tmp' - conf['tmp_dir'] = str(tmp_dir) + if "tmp_dir" not in conf: + tmp_dir = out_dir / "tmp" + conf["tmp_dir"] = str(tmp_dir) else: - tmp_dir = Path(conf['tmp_dir']) + tmp_dir = Path(conf["tmp_dir"]) tmp_dir.mkdir(parents=True, exist_ok=True) - if 'logback' not in conf: - logback_confpath = tmp_dir / 'logback.xml' - with open(logback_confpath, 'w') as conffile: - conffile.write(""" + if "logback" not in conf: + logback_confpath = tmp_dir / "logback.xml" + with open(logback_confpath, "w") as conffile: + conffile.write( + """ <configuration> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%d %r %p [%t] %logger{1} - %m%n</pattern> </encoder> </appender> <root level="debug"> <appender-ref ref="STDOUT" /> </root> </configuration> -""") - conf['logback'] = str(logback_confpath) +""" + ) + conf["logback"] = str(logback_confpath) - conf['java_tool_options'] += ' -Dlogback.configurationFile={logback}' - conf['java_tool_options'] = conf['java_tool_options'].format( - logback=conf['logback']) + conf["java_tool_options"] += " -Dlogback.configurationFile={logback}" + conf["java_tool_options"] = conf["java_tool_options"].format( + logback=conf["logback"] + ) print(conf) return conf diff --git a/swh/graph/dot.py b/swh/graph/dot.py index b2d455a..505ec65 100644 --- a/swh/graph/dot.py +++ b/swh/graph/dot.py @@ -1,65 +1,69 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from functools import lru_cache import subprocess import collections KIND_TO_SHAPE = { - 'ori': 'egg', - 'snp': 
"doubleoctagon", + "rel": "octagon", + "rev": "diamond", + "dir": "folder", + "cnt": "oval", } @lru_cache() def dot_to_svg(dot): try: p = subprocess.run( - ['dot', '-Tsvg'], input=dot, - universal_newlines=True, capture_output=True, - check=True + ["dot", "-Tsvg"], + input=dot, + universal_newlines=True, + capture_output=True, + check=True, ) except subprocess.CalledProcessError as e: raise RuntimeError(e.stderr) from e return p.stdout def graph_dot(nodes): ids = {n.id for n in nodes} by_kind = collections.defaultdict(list) for n in nodes: by_kind[n.kind].append(n) forward_edges = [ (node.id, child.id) for node in nodes for child in node.children() if child.id in ids ] backward_edges = [ (parent.id, node.id) for node in nodes for parent in node.parents() if parent.id in ids ] edges = set(forward_edges + backward_edges) - edges_fmt = '\n'.join('{} -> {};'.format(a, b) for a, b in edges) - nodes_fmt = '\n'.join(node.dot_fragment() for node in nodes) + edges_fmt = "\n".join("{} -> {};".format(a, b) for a, b in edges) + nodes_fmt = "\n".join(node.dot_fragment() for node in nodes) s = """digraph G {{ ranksep=1; nodesep=0.5; {nodes} {edges} - }}""".format(nodes=nodes_fmt, edges=edges_fmt) + }}""".format( + nodes=nodes_fmt, edges=edges_fmt + ) return s diff --git a/swh/graph/graph.py b/swh/graph/graph.py index 37214e4..f26bb31 100644 --- a/swh/graph/graph.py +++ b/swh/graph/graph.py @@ -1,178 +1,179 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import contextlib import functools from swh.graph.backend import Backend from swh.graph.dot import dot_to_svg, graph_dot, KIND_TO_SHAPE -BASE_URL = 'https://archive.softwareheritage.org/browse' +BASE_URL = "https://archive.softwareheritage.org/browse" KIND_TO_URL_FRAGMENT = { - 'ori': '/origin/{}', - 'snp': '/snapshot/{}', - 'rel': '/release/{}', - 'rev': '/revision/{}', - 'dir': '/directory/{}', - 'cnt': '/content/sha1_git:{}/', + "ori": "/origin/{}", + "snp": "/snapshot/{}", + "rel": "/release/{}", + "rev": "/revision/{}", + "dir": "/directory/{}", + "cnt": "/content/sha1_git:{}/", } def call_async_gen(generator, *args, **kwargs): loop = asyncio.get_event_loop() it = generator(*args, **kwargs).__aiter__() while True: try: res = loop.run_until_complete(it.__anext__()) yield res except StopAsyncIteration: break class Neighbors: """Neighbor iterator with custom O(1) length method""" + def __init__(self, graph, iterator, length_func): self.graph = graph self.iterator = iterator self.length_func = length_func def __iter__(self): return self def __next__(self): succ = self.iterator.nextLong() if succ == -1: raise StopIteration return GraphNode(self.graph, succ) def __len__(self): return self.length_func() class GraphNode: """Node in the SWH graph""" def __init__(self, graph, node_id): self.graph = graph self.id = node_id def children(self): return Neighbors( self.graph, self.graph.java_graph.successors(self.id), - lambda: self.graph.java_graph.outdegree(self.id)) + lambda: self.graph.java_graph.outdegree(self.id), + ) def parents(self): return Neighbors( self.graph, self.graph.java_graph.predecessors(self.id), - lambda: self.graph.java_graph.indegree(self.id)) + lambda: self.graph.java_graph.indegree(self.id), + ) - def simple_traversal(self, ttype, direction='forward', edges='*'): + def simple_traversal(self, ttype, direction="forward", 
edges="*"): for node in call_async_gen( - self.graph.backend.simple_traversal, - ttype, direction, edges, self.id + self.graph.backend.simple_traversal, ttype, direction, edges, self.id ): yield self.graph[node] def leaves(self, *args, **kwargs): - yield from self.simple_traversal('leaves', *args, **kwargs) + yield from self.simple_traversal("leaves", *args, **kwargs) def visit_nodes(self, *args, **kwargs): - yield from self.simple_traversal('visit_nodes', *args, **kwargs) + yield from self.simple_traversal("visit_nodes", *args, **kwargs) - def visit_paths(self, direction='forward', edges='*'): + def visit_paths(self, direction="forward", edges="*"): for path in call_async_gen( - self.graph.backend.visit_paths, - direction, edges, self.id + self.graph.backend.visit_paths, direction, edges, self.id ): yield [self.graph[node] for node in path] - def walk(self, dst, direction='forward', edges='*', traversal='dfs'): + def walk(self, dst, direction="forward", edges="*", traversal="dfs"): for node in call_async_gen( - self.graph.backend.walk, - direction, edges, traversal, self.id, dst + self.graph.backend.walk, direction, edges, traversal, self.id, dst ): yield self.graph[node] - def _count(self, ttype, direction='forward', edges='*'): + def _count(self, ttype, direction="forward", edges="*"): return self.graph.backend.count(ttype, direction, edges, self.id) - count_leaves = functools.partialmethod(_count, ttype='leaves') - count_neighbors = functools.partialmethod(_count, ttype='neighbors') - count_visit_nodes = functools.partialmethod(_count, ttype='visit_nodes') + count_leaves = functools.partialmethod(_count, ttype="leaves") + count_neighbors = functools.partialmethod(_count, ttype="neighbors") + count_visit_nodes = functools.partialmethod(_count, ttype="visit_nodes") @property def pid(self): return self.graph.node2pid[self.id] @property def kind(self): - return self.pid.split(':')[2] + return self.pid.split(":")[2] def __str__(self): return self.pid def __repr__(self): - return '<{}>'.format(self.pid) + return "<{}>".format(self.pid) def dot_fragment(self): - swh, version, kind, hash = self.pid.split(':') - label = '{}:{}..{}'.format(kind, hash[0:2], hash[-2:]) + swh, version, kind, hash = self.pid.split(":") + label = "{}:{}..{}".format(kind, hash[0:2], hash[-2:]) url = BASE_URL + KIND_TO_URL_FRAGMENT[kind].format(hash) shape = KIND_TO_SHAPE[kind] - return ('{} [label="{}", href="{}", target="_blank", shape="{}"];' - .format(self.id, label, url, shape)) + return '{} [label="{}", href="{}", target="_blank", shape="{}"];'.format( + self.id, label, url, shape + ) def _repr_svg_(self): nodes = [self, *list(self.children()), *list(self.parents())] dot = graph_dot(nodes) svg = dot_to_svg(dot) return svg class Graph: def __init__(self, backend, node2pid, pid2node): self.backend = backend self.java_graph = backend.entry.get_graph() self.node2pid = node2pid self.pid2node = pid2node def stats(self): return self.backend.stats() @property def path(self): return self.java_graph.getPath() def __len__(self): return self.java_graph.getNbNodes() def __getitem__(self, node_id): if isinstance(node_id, int): self.node2pid[node_id] # check existence return GraphNode(self, node_id) elif isinstance(node_id, str): node_id = self.pid2node[node_id] return GraphNode(self, node_id) def __iter__(self): for pid, pos in self.backend.pid2node: yield self[pid] def iter_prefix(self, prefix): for pid, pos in self.backend.pid2node.iter_prefix(prefix): yield self[pid] def iter_type(self, pid_type): for pid, pos in 
self.backend.pid2node.iter_type(pid_type): yield self[pid] @contextlib.contextmanager def load(graph_path): with Backend(graph_path) as backend: yield Graph(backend, backend.node2pid, backend.pid2node) diff --git a/swh/graph/pid.py b/swh/graph/pid.py index 8a780fa..330ba25 100644 --- a/swh/graph/pid.py +++ b/swh/graph/pid.py @@ -1,402 +1,403 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import mmap import os import struct from collections.abc import MutableMapping from enum import Enum from mmap import MAP_SHARED, MAP_PRIVATE from typing import BinaryIO, Iterator, Tuple from swh.model.identifiers import PersistentId, parse_persistent_identifier -PID_BIN_FMT = 'BB20s' # 2 unsigned chars + 20 bytes -INT_BIN_FMT = '>q' # big endian, 8-byte integer -PID_BIN_SIZE = 22 # in bytes -INT_BIN_SIZE = 8 # in bytes +PID_BIN_FMT = "BB20s" # 2 unsigned chars + 20 bytes +INT_BIN_FMT = ">q" # big endian, 8-byte integer +PID_BIN_SIZE = 22 # in bytes +INT_BIN_SIZE = 8 # in bytes class PidType(Enum): """types of existing PIDs, used to serialize PID type as a (char) integer Note that the order also matters: it drives the binary search in PID-indexed maps. Integer values also matter, for compatibility with the Java layer. """ + content = 0 directory = 1 origin = 2 release = 3 revision = 4 snapshot = 5 def str_to_bytes(pid_str: str) -> bytes: """Convert a PID to a byte sequence The binary format represents PIDs as 22-byte-long byte sequences, as follows: - 1 byte for the namespace version represented as a C `unsigned char` - 1 byte for the object type, as the int value of :class:`PidType` enums, represented as a C `unsigned char` - 20 bytes for the SHA1 digest as a byte sequence Args: pid_str: persistent identifier Returns: bytes: byte sequence representation of pid """ pid = parse_persistent_identifier(pid_str) - return struct.pack(PID_BIN_FMT, pid.scheme_version, - PidType[pid.object_type].value, - bytes.fromhex(pid.object_id)) + return struct.pack( + PID_BIN_FMT, + pid.scheme_version, + PidType[pid.object_type].value, + bytes.fromhex(pid.object_id), + ) def bytes_to_str(bytes: bytes) -> str: """Inverse function of :func:`str_to_bytes` See :func:`str_to_bytes` for a description of the binary PID format. Args: bytes: byte sequence representation of pid Returns: pid: persistent identifier """ (version, type, bin_digest) = struct.unpack(PID_BIN_FMT, bytes) pid = PersistentId(object_type=PidType(type).name, object_id=bin_digest) return str(pid) -class _OnDiskMap(): +class _OnDiskMap: """mmap-ed on-disk sequence of fixed size records """ - def __init__(self, record_size: int, fname: str, mode: str = 'rb', - length: int = None): + def __init__( + self, record_size: int, fname: str, mode: str = "rb", length: int = None + ): """open an existing on-disk map Args: record_size: size of each record in bytes fname: path to the on-disk map mode: file open mode, usually either 'rb' for read-only maps, 'wb' for creating new maps, or 'rb+' for updating existing ones (default: 'rb') length: map size in number of logical records; used to initialize writable maps at creation time. 
Must be given when mode is 'wb' and the map doesn't exist on disk; ignored otherwise """ - os_modes = { - 'rb': os.O_RDONLY, - 'wb': os.O_RDWR | os.O_CREAT, - 'rb+': os.O_RDWR - } + os_modes = {"rb": os.O_RDONLY, "wb": os.O_RDWR | os.O_CREAT, "rb+": os.O_RDWR} if mode not in os_modes: - raise ValueError('invalid file open mode: ' + mode) - new_map = (mode == 'wb') - writable_map = mode in ['wb', 'rb+'] + raise ValueError("invalid file open mode: " + mode) + new_map = mode == "wb" + writable_map = mode in ["wb", "rb+"] self.record_size = record_size self.fd = os.open(fname, os_modes[mode]) if new_map: if length is None: - raise ValueError('missing length when creating new map') + raise ValueError("missing length when creating new map") os.truncate(self.fd, length * self.record_size) self.size = os.path.getsize(fname) (self.length, remainder) = divmod(self.size, record_size) if remainder: raise ValueError( - 'map size {} is not a multiple of the record size {}'.format( - self.size, record_size)) + "map size {} is not a multiple of the record size {}".format( + self.size, record_size + ) + ) self.mm = mmap.mmap( - self.fd, self.size, - flags=MAP_SHARED if writable_map else MAP_PRIVATE) + self.fd, self.size, flags=MAP_SHARED if writable_map else MAP_PRIVATE + ) def close(self) -> None: """close the map shuts down both the mmap and the underlying file descriptor """ if not self.mm.closed: self.mm.close() os.close(self.fd) def __len__(self) -> int: return self.length def __delitem__(self, pos: int) -> None: - raise NotImplementedError('cannot delete records from fixed-size map') + raise NotImplementedError("cannot delete records from fixed-size map") class PidToNodeMap(_OnDiskMap, MutableMapping): """memory mapped map from PID (:ref:`persistent-identifiers`) to a continuous range 0..N of (8-byte long) integers This is the converse mapping of :class:`NodeToPidMap`. The on-disk serialization format is a sequence of fixed length (30 bytes) records with the following fields: - PID (22 bytes): binary PID representation as per :func:`str_to_bytes` - long (8 bytes): big endian long integer The records are sorted lexicographically by PID type and checksum, where type is the integer value of :class:`PidType`. PID lookup in the map is performed via binary search. Hence a huge map with, say, 11 B entries, will require ~30 disk seeks. Note that, due to fixed size + ordering, it is not possible to create these maps by random writing. Hence, __setitem__ can be used only to *update* the value associated with an existing key, rather than to add a missing item. To create an entire map from scratch, you should do so *sequentially*, using static method :meth:`write_record` (or, at your own risk, by hand via the mmap :attr:`mm`). """ # record binary format: PID + a big endian 8-byte integer - RECORD_BIN_FMT = '>' + PID_BIN_FMT + 'q' + RECORD_BIN_FMT = ">" + PID_BIN_FMT + "q" RECORD_SIZE = PID_BIN_SIZE + INT_BIN_SIZE - def __init__(self, fname: str, mode: str = 'rb', length: int = None): + def __init__(self, fname: str, mode: str = "rb", length: int = None): """open an existing on-disk map Args: fname: path to the on-disk map mode: file open mode, usually either 'rb' for read-only maps, 'wb' for creating new maps, or 'rb+' for updating existing ones (default: 'rb') length: map size in number of logical records; used to initialize read-write maps at creation time. 
Must be given when mode is 'wb'; ignored otherwise """ super().__init__(self.RECORD_SIZE, fname, mode=mode, length=length) def _get_bin_record(self, pos: int) -> Tuple[bytes, bytes]: """seek and return the (binary) record at a given (logical) position see :func:`_get_record` for an equivalent function with additional deserialization Args: pos: 0-based record number Returns: a pair `(pid, int)`, where pid and int are bytes """ rec_pos = pos * self.RECORD_SIZE int_pos = rec_pos + PID_BIN_SIZE - return (self.mm[rec_pos:int_pos], - self.mm[int_pos:int_pos+INT_BIN_SIZE]) + return (self.mm[rec_pos:int_pos], self.mm[int_pos : int_pos + INT_BIN_SIZE]) def _get_record(self, pos: int) -> Tuple[str, int]: """seek and return the record at a given (logical) position moral equivalent of :func:`_get_bin_record`, with additional deserialization to non-bytes types Args: pos: 0-based record number Returns: a pair `(pid, int)`, where pid is a string-based PID and int the corresponding integer identifier """ (pid_bytes, int_bytes) = self._get_bin_record(pos) - return (bytes_to_str(pid_bytes), - struct.unpack(INT_BIN_FMT, int_bytes)[0]) + return (bytes_to_str(pid_bytes), struct.unpack(INT_BIN_FMT, int_bytes)[0]) @classmethod def write_record(cls, f: BinaryIO, pid: str, int: int) -> None: """write a logical record to a file-like object Args: f: file-like object to write the record to pid: textual PID int: PID integer identifier """ f.write(str_to_bytes(pid)) f.write(struct.pack(INT_BIN_FMT, int)) def _bisect_pos(self, pid_str: str) -> int: """bisect the position of the given identifier. If the identifier is not found, the position of the pid immediately after is returned. Args: pid_str: the pid as a string Returns: the logical record of the bisected position in the map """ if not isinstance(pid_str, str): - raise TypeError('PID must be a str, not {}'.format(type(pid_str))) + raise TypeError("PID must be a str, not {}".format(type(pid_str))) try: target = str_to_bytes(pid_str) # desired PID as bytes except ValueError: raise ValueError('invalid PID: "{}"'.format(pid_str)) lo = 0 hi = self.length - 1 while lo < hi: mid = (lo + hi) // 2 (pid, _value) = self._get_bin_record(mid) if pid < target: lo = mid + 1 else: hi = mid return lo def _find(self, pid_str: str) -> Tuple[int, int]: """lookup the integer identifier of a pid and its position Args: pid_str: the pid as a string Returns: a pair `(pid, pos)` with pid integer identifier and its logical record position in the map """ pos = self._bisect_pos(pid_str) pid_found, value = self._get_record(pos) if pid_found == pid_str: return (value, pos) raise KeyError(pid_str) def __getitem__(self, pid_str: str) -> int: """lookup the integer identifier of a PID Args: pid_str: the PID as a string Returns: the integer identifier of pid """ return self._find(pid_str)[0] # return element, ignore position def __setitem__(self, pid_str: str, int: int) -> None: (_pid, pos) = self._find(pid_str) # might raise KeyError and that's OK rec_pos = pos * self.RECORD_SIZE int_pos = rec_pos + PID_BIN_SIZE self.mm[rec_pos:int_pos] = str_to_bytes(pid_str) - self.mm[int_pos:int_pos+INT_BIN_SIZE] = struct.pack(INT_BIN_FMT, int) + self.mm[int_pos : int_pos + INT_BIN_SIZE] = struct.pack(INT_BIN_FMT, int) def __iter__(self) -> Iterator[Tuple[str, int]]: for pos in range(self.length): yield self._get_record(pos) def iter_prefix(self, prefix: str): - swh, n, t, sha = prefix.split(':') - sha = sha.ljust(40, '0') - start_pid = ':'.join([swh, n, t, sha]) + swh, n, t, sha = prefix.split(":") + sha = 
sha.ljust(40, "0") + start_pid = ":".join([swh, n, t, sha]) start = self._bisect_pos(start_pid) for pos in range(start, self.length): pid, value = self._get_record(pos) if not pid.startswith(prefix): break yield pid, value def iter_type(self, pid_type: str) -> Iterator[Tuple[str, int]]: - prefix = 'swh:1:{}:'.format(pid_type) + prefix = "swh:1:{}:".format(pid_type) yield from self.iter_prefix(prefix) class NodeToPidMap(_OnDiskMap, MutableMapping): """memory-mapped map from a continuous range of 0..N (8-byte long) integers to PIDs (:ref:`persistent-identifiers`) This is the converse mapping of :class:`PidToNodeMap`. The on-disk serialization format is a sequence of fixed length records (22 bytes), each being the binary representation of a PID as per :func:`str_to_bytes`. The records are sorted by long integer, so that integer lookup is possible via fixed-offset seek. """ RECORD_BIN_FMT = PID_BIN_FMT RECORD_SIZE = PID_BIN_SIZE - def __init__(self, fname: str, mode: str = 'rb', length: int = None): + def __init__(self, fname: str, mode: str = "rb", length: int = None): """open an existing on-disk map Args: fname: path to the on-disk map mode: file open mode, usually either 'rb' for read-only maps, 'wb' for creating new maps, or 'rb+' for updating existing ones (default: 'rb') length: map size in number of logical records, passed through to :class:`_OnDiskMap`; used to initialize read-write maps at creation time. Must be given when mode is 'wb'; ignored otherwise """ super().__init__(self.RECORD_SIZE, fname, mode=mode, length=length) def _get_bin_record(self, pos: int) -> bytes: """seek and return the (binary) PID at a given (logical) position Args: pos: 0-based record number Returns: PID as a byte sequence """ rec_pos = pos * self.RECORD_SIZE - return self.mm[rec_pos:rec_pos+self.RECORD_SIZE] + return self.mm[rec_pos : rec_pos + self.RECORD_SIZE] @classmethod def write_record(cls, f: BinaryIO, pid: str) -> None: """write a PID to a file-like object Args: f: file-like object to write the record to pid: textual PID """ f.write(str_to_bytes(pid)) def __getitem__(self, pos: int) -> str: orig_pos = pos if pos < 0: pos = len(self) + pos if not (0 <= pos < len(self)): raise IndexError(orig_pos) return bytes_to_str(self._get_bin_record(pos)) def __setitem__(self, pos: int, pid: str) -> None: rec_pos = pos * self.RECORD_SIZE - self.mm[rec_pos:rec_pos+self.RECORD_SIZE] = str_to_bytes(pid) + self.mm[rec_pos : rec_pos + self.RECORD_SIZE] = str_to_bytes(pid) def __iter__(self) -> Iterator[Tuple[int, str]]: for pos in range(self.length): yield (pos, self[pos]) diff --git a/swh/graph/server/app.py b/swh/graph/server/app.py index f659043..5e99547 100644 --- a/swh/graph/server/app.py +++ b/swh/graph/server/app.py @@ -1,247 +1,253 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using FIFO as a transport to stream integers between the two languages.
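A minimal usage sketch (the graph path and port are illustrative, not defaults defined by this module; the `Backend` and `make_app` calls follow the pattern used in swh/graph/tests/conftest.py)::

    import aiohttp.web

    from swh.graph.backend import Backend
    from swh.graph.server.app import make_app

    backend = Backend(graph_path="graph/example")  # base name of a compressed graph
    with backend:  # the backend is used as a context manager, as in conftest.py
        aiohttp.web.run_app(make_app(backend=backend), port=5009)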
""" import asyncio import json import aiohttp.web from collections import deque from swh.core.api.asynchronous import RPCServerApp from swh.model.identifiers import PID_TYPES from swh.model.exceptions import ValidationError try: from contextlib import asynccontextmanager except ImportError: # Compatibility with 3.6 backport from async_generator import asynccontextmanager # type: ignore # maximum number of retries for random walks RANDOM_RETRIES = 5 # TODO make this configurable via rpc-serve configuration @asynccontextmanager -async def stream_response(request, content_type='text/plain', - *args, **kwargs): +async def stream_response(request, content_type="text/plain", *args, **kwargs): response = aiohttp.web.StreamResponse(*args, **kwargs) response.content_type = content_type await response.prepare(request) yield response await response.write_eof() async def index(request): return aiohttp.web.Response( - content_type='text/html', + content_type="text/html", body=""" Software Heritage storage server

You have reached the Software Heritage graph API server.

See its API documentation for more information.

-""") +""", + ) async def stats(request): - stats = request.app['backend'].stats() - return aiohttp.web.Response(body=stats, content_type='application/json') + stats = request.app["backend"].stats() + return aiohttp.web.Response(body=stats, content_type="application/json") def get_direction(request): """validate HTTP query parameter `direction`""" - s = request.query.get('direction', 'forward') - if s not in ('forward', 'backward'): - raise aiohttp.web.HTTPBadRequest(body=f'invalid direction: {s}') + s = request.query.get("direction", "forward") + if s not in ("forward", "backward"): + raise aiohttp.web.HTTPBadRequest(body=f"invalid direction: {s}") return s def get_edges(request): """validate HTTP query parameter `edges`, i.e., edge restrictions""" - s = request.query.get('edges', '*') - if any([node_type != '*' and node_type not in PID_TYPES - for edge in s.split(':') - for node_type in edge.split(',', maxsplit=1)]): - raise aiohttp.web.HTTPBadRequest(body=f'invalid edge restriction: {s}') + s = request.query.get("edges", "*") + if any( + [ + node_type != "*" and node_type not in PID_TYPES + for edge in s.split(":") + for node_type in edge.split(",", maxsplit=1) + ] + ): + raise aiohttp.web.HTTPBadRequest(body=f"invalid edge restriction: {s}") return s def get_traversal(request): """validate HTTP query parameter `traversal`, i.e., visit order""" - s = request.query.get('traversal', 'dfs') - if s not in ('bfs', 'dfs'): - raise aiohttp.web.HTTPBadRequest(body=f'invalid traversal order: {s}') + s = request.query.get("traversal", "dfs") + if s not in ("bfs", "dfs"): + raise aiohttp.web.HTTPBadRequest(body=f"invalid traversal order: {s}") return s def get_limit(request): """validate HTTP query parameter `limit`, i.e., number of results""" - s = request.query.get('limit', '0') + s = request.query.get("limit", "0") try: return int(s) except ValueError: - raise aiohttp.web.HTTPBadRequest(body=f'invalid limit value: {s}') + raise aiohttp.web.HTTPBadRequest(body=f"invalid limit value: {s}") def node_of_pid(pid, backend): """lookup a PID in a pid2node map, failing in an HTTP-nice way if needed""" try: return backend.pid2node[pid] except KeyError: - raise aiohttp.web.HTTPNotFound(body=f'PID not found: {pid}') + raise aiohttp.web.HTTPNotFound(body=f"PID not found: {pid}") except ValidationError: - raise aiohttp.web.HTTPBadRequest(body=f'malformed PID: {pid}') + raise aiohttp.web.HTTPBadRequest(body=f"malformed PID: {pid}") def pid_of_node(node, backend): """lookup a node in a node2pid map, failing in an HTTP-nice way if needed """ try: return backend.node2pid[node] except KeyError: raise aiohttp.web.HTTPInternalServerError( - body=f'reverse lookup failed for node id: {node}') + body=f"reverse lookup failed for node id: {node}" + ) def get_simple_traversal_handler(ttype): async def simple_traversal(request): - backend = request.app['backend'] + backend = request.app["backend"] - src = request.match_info['src'] + src = request.match_info["src"] edges = get_edges(request) direction = get_direction(request) src_node = node_of_pid(src, backend) async with stream_response(request) as response: async for res_node in backend.simple_traversal( ttype, direction, edges, src_node ): res_pid = pid_of_node(res_node, backend) - await response.write('{}\n'.format(res_pid).encode()) + await response.write("{}\n".format(res_pid).encode()) return response return simple_traversal def get_walk_handler(random=False): async def walk(request): - backend = request.app['backend'] + backend = request.app["backend"] - src = 
request.match_info['src'] - dst = request.match_info['dst'] + src = request.match_info["src"] + dst = request.match_info["dst"] edges = get_edges(request) direction = get_direction(request) algo = get_traversal(request) limit = get_limit(request) src_node = node_of_pid(src, backend) if dst not in PID_TYPES: dst = node_of_pid(dst, backend) async with stream_response(request) as response: if random: - it = backend.random_walk(direction, edges, RANDOM_RETRIES, - src_node, dst) + it = backend.random_walk( + direction, edges, RANDOM_RETRIES, src_node, dst + ) else: it = backend.walk(direction, edges, algo, src_node, dst) if limit < 0: queue = deque(maxlen=-limit) async for res_node in it: res_pid = pid_of_node(res_node, backend) - queue.append('{}\n'.format(res_pid).encode()) + queue.append("{}\n".format(res_pid).encode()) while queue: await response.write(queue.popleft()) else: count = 0 async for res_node in it: if limit == 0 or count < limit: res_pid = pid_of_node(res_node, backend) - await response.write('{}\n'.format(res_pid).encode()) + await response.write("{}\n".format(res_pid).encode()) count += 1 else: break return response return walk async def visit_paths(request): - backend = request.app['backend'] + backend = request.app["backend"] - src = request.match_info['src'] + src = request.match_info["src"] edges = get_edges(request) direction = get_direction(request) src_node = node_of_pid(src, backend) it = backend.visit_paths(direction, edges, src_node) - async with stream_response(request, content_type='application/x-ndjson') \ - as response: + async with stream_response( + request, content_type="application/x-ndjson" + ) as response: async for res_path in it: res_path_pid = [pid_of_node(n, backend) for n in res_path] line = json.dumps(res_path_pid) - await response.write('{}\n'.format(line).encode()) + await response.write("{}\n".format(line).encode()) return response def get_count_handler(ttype): async def count(request): loop = asyncio.get_event_loop() - backend = request.app['backend'] + backend = request.app["backend"] - src = request.match_info['src'] + src = request.match_info["src"] edges = get_edges(request) direction = get_direction(request) src_node = node_of_pid(src, backend) cnt = await loop.run_in_executor( - None, backend.count, ttype, direction, edges, src_node) - return aiohttp.web.Response(body=str(cnt), - content_type='application/json') + None, backend.count, ttype, direction, edges, src_node + ) + return aiohttp.web.Response(body=str(cnt), content_type="application/json") return count def make_app(backend, **kwargs): app = RPCServerApp(**kwargs) - app.router.add_get('/', index) - app.router.add_get('/graph', index) - app.router.add_get('/graph/stats', stats) - - app.router.add_get('/graph/leaves/{src}', - get_simple_traversal_handler('leaves')) - app.router.add_get('/graph/neighbors/{src}', - get_simple_traversal_handler('neighbors')) - app.router.add_get('/graph/visit/nodes/{src}', - get_simple_traversal_handler('visit_nodes')) - app.router.add_get('/graph/visit/paths/{src}', visit_paths) + app.router.add_get("/", index) + app.router.add_get("/graph", index) + app.router.add_get("/graph/stats", stats) + + app.router.add_get("/graph/leaves/{src}", get_simple_traversal_handler("leaves")) + app.router.add_get( + "/graph/neighbors/{src}", get_simple_traversal_handler("neighbors") + ) + app.router.add_get( + "/graph/visit/nodes/{src}", get_simple_traversal_handler("visit_nodes") + ) + app.router.add_get("/graph/visit/paths/{src}", visit_paths) # temporarily disabled in 
wait of a proper fix for T1969 # app.router.add_get('/graph/walk/{src}/{dst}', # get_walk_handler(random=False)) # app.router.add_get('/graph/walk/last/{src}/{dst}', # get_walk_handler(random=False, last=True)) - app.router.add_get('/graph/randomwalk/{src}/{dst}', - get_walk_handler(random=True)) + app.router.add_get("/graph/randomwalk/{src}/{dst}", get_walk_handler(random=True)) - app.router.add_get('/graph/neighbors/count/{src}', - get_count_handler('neighbors')) - app.router.add_get('/graph/leaves/count/{src}', - get_count_handler('leaves')) - app.router.add_get('/graph/visit/nodes/count/{src}', - get_count_handler('visit_nodes')) + app.router.add_get("/graph/neighbors/count/{src}", get_count_handler("neighbors")) + app.router.add_get("/graph/leaves/count/{src}", get_count_handler("leaves")) + app.router.add_get( + "/graph/visit/nodes/count/{src}", get_count_handler("visit_nodes") + ) - app['backend'] = backend + app["backend"] = backend return app diff --git a/swh/graph/tests/conftest.py b/swh/graph/tests/conftest.py index a3af742..442e32c 100644 --- a/swh/graph/tests/conftest.py +++ b/swh/graph/tests/conftest.py @@ -1,51 +1,51 @@ import multiprocessing import pytest from aiohttp.test_utils import TestServer, TestClient, loop_context from pathlib import Path from swh.graph.graph import load as graph_load from swh.graph.client import RemoteGraphClient from swh.graph.backend import Backend from swh.graph.server.app import make_app SWH_GRAPH_TESTS_ROOT = Path(__file__).parents[0] -TEST_GRAPH_PATH = SWH_GRAPH_TESTS_ROOT / 'dataset/output/example' +TEST_GRAPH_PATH = SWH_GRAPH_TESTS_ROOT / "dataset/output/example" class GraphServerProcess(multiprocessing.Process): def __init__(self, q, *args, **kwargs): self.q = q super().__init__(*args, **kwargs) def run(self): try: backend = Backend(graph_path=str(TEST_GRAPH_PATH)) with backend: with loop_context() as loop: app = make_app(backend=backend, debug=True) client = TestClient(TestServer(app), loop=loop) loop.run_until_complete(client.start_server()) - url = client.make_url('/graph/') + url = client.make_url("/graph/") self.q.put(url) loop.run_forever() except Exception as e: self.q.put(e) @pytest.fixture(scope="module") def graph_client(): queue = multiprocessing.Queue() server = GraphServerProcess(queue) server.start() res = queue.get() if isinstance(res, Exception): raise res yield RemoteGraphClient(str(res)) server.terminate() @pytest.fixture(scope="module") def graph(): with graph_load(str(TEST_GRAPH_PATH)) as g: yield g diff --git a/swh/graph/tests/test_api_client.py b/swh/graph/tests/test_api_client.py index 610aaeb..7651ba1 100644 --- a/swh/graph/tests/test_api_client.py +++ b/swh/graph/tests/test_api_client.py @@ -1,219 +1,230 @@ import pytest from pytest import raises from swh.core.api import RemoteException def test_stats(graph_client): stats = graph_client.stats() - assert set(stats.keys()) == {'counts', 'ratios', 'indegree', - 'outdegree'} - - assert set(stats['counts'].keys()) == {'nodes', 'edges'} - assert set(stats['ratios'].keys()) == {'compression', 'bits_per_node', - 'bits_per_edge', 'avg_locality'} - assert set(stats['indegree'].keys()) == {'min', 'max', 'avg'} - assert set(stats['outdegree'].keys()) == {'min', 'max', 'avg'} - - assert stats['counts']['nodes'] == 21 - assert stats['counts']['edges'] == 23 - assert isinstance(stats['ratios']['compression'], float) - assert isinstance(stats['ratios']['bits_per_node'], float) - assert isinstance(stats['ratios']['bits_per_edge'], float) - assert 
isinstance(stats['ratios']['avg_locality'], float) - assert stats['indegree']['min'] == 0 - assert stats['indegree']['max'] == 3 - assert isinstance(stats['indegree']['avg'], float) - assert stats['outdegree']['min'] == 0 - assert stats['outdegree']['max'] == 3 - assert isinstance(stats['outdegree']['avg'], float) + assert set(stats.keys()) == {"counts", "ratios", "indegree", "outdegree"} + + assert set(stats["counts"].keys()) == {"nodes", "edges"} + assert set(stats["ratios"].keys()) == { + "compression", + "bits_per_node", + "bits_per_edge", + "avg_locality", + } + assert set(stats["indegree"].keys()) == {"min", "max", "avg"} + assert set(stats["outdegree"].keys()) == {"min", "max", "avg"} + + assert stats["counts"]["nodes"] == 21 + assert stats["counts"]["edges"] == 23 + assert isinstance(stats["ratios"]["compression"], float) + assert isinstance(stats["ratios"]["bits_per_node"], float) + assert isinstance(stats["ratios"]["bits_per_edge"], float) + assert isinstance(stats["ratios"]["avg_locality"], float) + assert stats["indegree"]["min"] == 0 + assert stats["indegree"]["max"] == 3 + assert isinstance(stats["indegree"]["avg"], float) + assert stats["outdegree"]["min"] == 0 + assert stats["outdegree"]["max"] == 3 + assert isinstance(stats["outdegree"]["avg"], float) def test_leaves(graph_client): - actual = list(graph_client.leaves( - 'swh:1:ori:0000000000000000000000000000000000000021' - )) + actual = list( + graph_client.leaves("swh:1:ori:0000000000000000000000000000000000000021") + ) expected = [ - 'swh:1:cnt:0000000000000000000000000000000000000001', - 'swh:1:cnt:0000000000000000000000000000000000000004', - 'swh:1:cnt:0000000000000000000000000000000000000005', - 'swh:1:cnt:0000000000000000000000000000000000000007' + "swh:1:cnt:0000000000000000000000000000000000000001", + "swh:1:cnt:0000000000000000000000000000000000000004", + "swh:1:cnt:0000000000000000000000000000000000000005", + "swh:1:cnt:0000000000000000000000000000000000000007", ] assert set(actual) == set(expected) def test_neighbors(graph_client): - actual = list(graph_client.neighbors( - 'swh:1:rev:0000000000000000000000000000000000000009', - direction='backward' - )) + actual = list( + graph_client.neighbors( + "swh:1:rev:0000000000000000000000000000000000000009", direction="backward" + ) + ) expected = [ - 'swh:1:snp:0000000000000000000000000000000000000020', - 'swh:1:rel:0000000000000000000000000000000000000010', - 'swh:1:rev:0000000000000000000000000000000000000013' + "swh:1:snp:0000000000000000000000000000000000000020", + "swh:1:rel:0000000000000000000000000000000000000010", + "swh:1:rev:0000000000000000000000000000000000000013", ] assert set(actual) == set(expected) def test_visit_nodes(graph_client): - actual = list(graph_client.visit_nodes( - 'swh:1:rel:0000000000000000000000000000000000000010', - edges='rel:rev,rev:rev' - )) + actual = list( + graph_client.visit_nodes( + "swh:1:rel:0000000000000000000000000000000000000010", + edges="rel:rev,rev:rev", + ) + ) expected = [ - 'swh:1:rel:0000000000000000000000000000000000000010', - 'swh:1:rev:0000000000000000000000000000000000000009', - 'swh:1:rev:0000000000000000000000000000000000000003' + "swh:1:rel:0000000000000000000000000000000000000010", + "swh:1:rev:0000000000000000000000000000000000000009", + "swh:1:rev:0000000000000000000000000000000000000003", ] assert set(actual) == set(expected) def test_visit_paths(graph_client): - actual = list(graph_client.visit_paths( - 'swh:1:snp:0000000000000000000000000000000000000020', - edges='snp:*,rev:*')) + actual = list( + 
graph_client.visit_paths( + "swh:1:snp:0000000000000000000000000000000000000020", edges="snp:*,rev:*" + ) + ) actual = [tuple(path) for path in actual] expected = [ ( - 'swh:1:snp:0000000000000000000000000000000000000020', - 'swh:1:rev:0000000000000000000000000000000000000009', - 'swh:1:rev:0000000000000000000000000000000000000003', - 'swh:1:dir:0000000000000000000000000000000000000002' + "swh:1:snp:0000000000000000000000000000000000000020", + "swh:1:rev:0000000000000000000000000000000000000009", + "swh:1:rev:0000000000000000000000000000000000000003", + "swh:1:dir:0000000000000000000000000000000000000002", ), ( - 'swh:1:snp:0000000000000000000000000000000000000020', - 'swh:1:rev:0000000000000000000000000000000000000009', - 'swh:1:dir:0000000000000000000000000000000000000008' + "swh:1:snp:0000000000000000000000000000000000000020", + "swh:1:rev:0000000000000000000000000000000000000009", + "swh:1:dir:0000000000000000000000000000000000000008", ), ( - 'swh:1:snp:0000000000000000000000000000000000000020', - 'swh:1:rel:0000000000000000000000000000000000000010' - ) + "swh:1:snp:0000000000000000000000000000000000000020", + "swh:1:rel:0000000000000000000000000000000000000010", + ), ] assert set(actual) == set(expected) -@pytest.mark.skip(reason='currently disabled due to T1969') +@pytest.mark.skip(reason="currently disabled due to T1969") def test_walk(graph_client): - args = ('swh:1:dir:0000000000000000000000000000000000000016', 'rel') + args = ("swh:1:dir:0000000000000000000000000000000000000016", "rel") kwargs = { - 'edges': 'dir:dir,dir:rev,rev:*', - 'direction': 'backward', - 'traversal': 'bfs', + "edges": "dir:dir,dir:rev,rev:*", + "direction": "backward", + "traversal": "bfs", } actual = list(graph_client.walk(*args, **kwargs)) expected = [ - 'swh:1:dir:0000000000000000000000000000000000000016', - 'swh:1:dir:0000000000000000000000000000000000000017', - 'swh:1:rev:0000000000000000000000000000000000000018', - 'swh:1:rel:0000000000000000000000000000000000000019' + "swh:1:dir:0000000000000000000000000000000000000016", + "swh:1:dir:0000000000000000000000000000000000000017", + "swh:1:rev:0000000000000000000000000000000000000018", + "swh:1:rel:0000000000000000000000000000000000000019", ] assert set(actual) == set(expected) kwargs2 = kwargs.copy() - kwargs2['limit'] = -1 + kwargs2["limit"] = -1 actual = list(graph_client.walk(*args, **kwargs2)) - expected = [ - 'swh:1:rel:0000000000000000000000000000000000000019' - ] + expected = ["swh:1:rel:0000000000000000000000000000000000000019"] assert set(actual) == set(expected) kwargs2 = kwargs.copy() - kwargs2['limit'] = 2 + kwargs2["limit"] = 2 actual = list(graph_client.walk(*args, **kwargs2)) expected = [ - 'swh:1:dir:0000000000000000000000000000000000000016', - 'swh:1:dir:0000000000000000000000000000000000000017' + "swh:1:dir:0000000000000000000000000000000000000016", + "swh:1:dir:0000000000000000000000000000000000000017", ] assert set(actual) == set(expected) def test_random_walk(graph_client): """as the walk is random, we test a visit from a cnt node to the only origin in the dataset, and only check the final node of the path (i.e., the origin) """ - args = ('swh:1:cnt:0000000000000000000000000000000000000001', 'ori') - kwargs = {'direction': 'backward'} - expected_root = 'swh:1:ori:0000000000000000000000000000000000000021' + args = ("swh:1:cnt:0000000000000000000000000000000000000001", "ori") + kwargs = {"direction": "backward"} + expected_root = "swh:1:ori:0000000000000000000000000000000000000021" actual = list(graph_client.random_walk(*args, 
**kwargs)) assert len(actual) > 1 # no origin directly links to a content assert actual[0] == args[0] assert actual[-1] == expected_root kwargs2 = kwargs.copy() - kwargs2['limit'] = -1 + kwargs2["limit"] = -1 actual = list(graph_client.random_walk(*args, **kwargs2)) assert actual == [expected_root] - kwargs2['limit'] = -2 + kwargs2["limit"] = -2 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 2 assert actual[-1] == expected_root - kwargs2['limit'] = 3 + kwargs2["limit"] = 3 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 3 def test_count(graph_client): actual = graph_client.count_leaves( - 'swh:1:ori:0000000000000000000000000000000000000021' + "swh:1:ori:0000000000000000000000000000000000000021" ) assert actual == 4 actual = graph_client.count_visit_nodes( - 'swh:1:rel:0000000000000000000000000000000000000010', - edges='rel:rev,rev:rev' + "swh:1:rel:0000000000000000000000000000000000000010", edges="rel:rev,rev:rev" ) assert actual == 3 actual = graph_client.count_neighbors( - 'swh:1:rev:0000000000000000000000000000000000000009', - direction='backward' + "swh:1:rev:0000000000000000000000000000000000000009", direction="backward" ) assert actual == 3 def test_param_validation(graph_client): with raises(RemoteException) as exc_info: # PID not found - list(graph_client.leaves( - 'swh:1:ori:fff0000000000000000000000000000000000021')) + list(graph_client.leaves("swh:1:ori:fff0000000000000000000000000000000000021")) assert exc_info.value.response.status_code == 404 with raises(RemoteException) as exc_info: # malformed PID - list(graph_client.neighbors( - 'swh:1:ori:fff000000zzzzzz0000000000000000000000021')) + list( + graph_client.neighbors("swh:1:ori:fff000000zzzzzz0000000000000000000000021") + ) assert exc_info.value.response.status_code == 400 with raises(RemoteException) as exc_info: # malformed edge specification - list(graph_client.visit_nodes( - 'swh:1:dir:0000000000000000000000000000000000000016', - edges='dir:notanodetype,dir:rev,rev:*', - direction='backward', - )) + list( + graph_client.visit_nodes( + "swh:1:dir:0000000000000000000000000000000000000016", + edges="dir:notanodetype,dir:rev,rev:*", + direction="backward", + ) + ) assert exc_info.value.response.status_code == 400 with raises(RemoteException) as exc_info: # malformed direction - list(graph_client.visit_nodes( - 'swh:1:dir:0000000000000000000000000000000000000016', - edges='dir:dir,dir:rev,rev:*', - direction='notadirection', - )) + list( + graph_client.visit_nodes( + "swh:1:dir:0000000000000000000000000000000000000016", + edges="dir:dir,dir:rev,rev:*", + direction="notadirection", + ) + ) assert exc_info.value.response.status_code == 400 -@pytest.mark.skip(reason='currently disabled due to T1969') +@pytest.mark.skip(reason="currently disabled due to T1969") def test_param_validation_walk(graph_client): """test validation of walk-specific parameters only""" with raises(RemoteException) as exc_info: # malformed traversal order - list(graph_client.walk( - 'swh:1:dir:0000000000000000000000000000000000000016', 'rel', - edges='dir:dir,dir:rev,rev:*', - direction='backward', - traversal='notatraversalorder', - )) + list( + graph_client.walk( + "swh:1:dir:0000000000000000000000000000000000000016", + "rel", + edges="dir:dir,dir:rev,rev:*", + direction="backward", + traversal="notatraversalorder", + ) + ) assert exc_info.value.response.status_code == 400 diff --git a/swh/graph/tests/test_cli.py b/swh/graph/tests/test_cli.py index 19ba7ec..be2acd7 100644 --- 
a/swh/graph/tests/test_cli.py +++ b/swh/graph/tests/test_cli.py @@ -1,66 +1,70 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from pathlib import Path from tempfile import TemporaryDirectory, NamedTemporaryFile from typing import Dict from click.testing import CliRunner from swh.core import config from swh.graph import cli def read_properties(properties_fname) -> Dict[str, str]: """read a Java .properties file""" properties = {} with open(properties_fname) as f: for line in f: - if line.startswith('#'): + if line.startswith("#"): continue - (key, value) = line.rstrip().split('=', maxsplit=1) + (key, value) = line.rstrip().split("=", maxsplit=1) properties[key] = value return properties class TestCompress(unittest.TestCase): - DATA_DIR = Path(__file__).parents[0] / 'dataset' + DATA_DIR = Path(__file__).parents[0] / "dataset" def setUp(self): self.runner = CliRunner() - tmpconf = NamedTemporaryFile(mode='w', delete=False, - prefix='swh-graph-test', suffix='.yml') + tmpconf = NamedTemporaryFile( + mode="w", delete=False, prefix="swh-graph-test", suffix=".yml" + ) # bare bone configuration, to allow testing the compression pipeline # with minimum RAM requirements on trivial graphs - tmpconf.write(""" + tmpconf.write( + """ graph: compress: batch_size: 1000 -""") +""" + ) tmpconf.close() self.conffile = Path(tmpconf.name) self.config = config.read(self.conffile, cli.DEFAULT_CONFIG) def tearDown(self): if self.conffile.exists(): self.conffile.unlink() def test_pipeline(self): """run full compression pipeline""" - with TemporaryDirectory(suffix='.swh-graph-test') as tmpdir: + with TemporaryDirectory(suffix=".swh-graph-test") as tmpdir: result = self.runner.invoke( cli.compress, - ['--graph', self.DATA_DIR / 'example', '--outdir', tmpdir], - obj={'config': self.config}) + ["--graph", self.DATA_DIR / "example", "--outdir", tmpdir], + obj={"config": self.config}, + ) self.assertEqual(result.exit_code, 0) - properties = read_properties(Path(tmpdir) / 'example.properties') - self.assertEqual(int(properties['nodes']), 21) - self.assertEqual(int(properties['arcs']), 23) + properties = read_properties(Path(tmpdir) / "example.properties") + self.assertEqual(int(properties["nodes"]), 21) + self.assertEqual(int(properties["arcs"]), 23) diff --git a/swh/graph/tests/test_graph.py b/swh/graph/tests/test_graph.py index 4de3734..264a0ea 100644 --- a/swh/graph/tests/test_graph.py +++ b/swh/graph/tests/test_graph.py @@ -1,122 +1,138 @@ import pytest def test_graph(graph): assert len(graph) == 21 - obj = 'swh:1:dir:0000000000000000000000000000000000000008' + obj = "swh:1:dir:0000000000000000000000000000000000000008" node = graph[obj] assert str(node) == obj assert len(node.children()) == 3 assert len(node.parents()) == 2 actual = {p.pid for p in node.children()} expected = { - 'swh:1:cnt:0000000000000000000000000000000000000001', - 'swh:1:dir:0000000000000000000000000000000000000006', - 'swh:1:cnt:0000000000000000000000000000000000000007' + "swh:1:cnt:0000000000000000000000000000000000000001", + "swh:1:dir:0000000000000000000000000000000000000006", + "swh:1:cnt:0000000000000000000000000000000000000007", } assert expected == actual actual = {p.pid for p in node.parents()} expected = { - 'swh:1:rev:0000000000000000000000000000000000000009', - 'swh:1:dir:0000000000000000000000000000000000000012', + 
"swh:1:rev:0000000000000000000000000000000000000009", + "swh:1:dir:0000000000000000000000000000000000000012", } assert expected == actual def test_invalid_pid(graph): with pytest.raises(IndexError): graph[1337] with pytest.raises(IndexError): graph[len(graph) + 1] with pytest.raises(KeyError): - graph['swh:1:dir:0000000000000000000000000000000420000012'] + graph["swh:1:dir:0000000000000000000000000000000420000012"] def test_leaves(graph): - actual = list(graph['swh:1:ori:0000000000000000000000000000000000000021'] - .leaves()) + actual = list(graph["swh:1:ori:0000000000000000000000000000000000000021"].leaves()) actual = [p.pid for p in actual] expected = [ - 'swh:1:cnt:0000000000000000000000000000000000000001', - 'swh:1:cnt:0000000000000000000000000000000000000004', - 'swh:1:cnt:0000000000000000000000000000000000000005', - 'swh:1:cnt:0000000000000000000000000000000000000007' + "swh:1:cnt:0000000000000000000000000000000000000001", + "swh:1:cnt:0000000000000000000000000000000000000004", + "swh:1:cnt:0000000000000000000000000000000000000005", + "swh:1:cnt:0000000000000000000000000000000000000007", ] assert set(actual) == set(expected) def test_visit_nodes(graph): - actual = list(graph['swh:1:rel:0000000000000000000000000000000000000010'] - .visit_nodes(edges='rel:rev,rev:rev')) + actual = list( + graph["swh:1:rel:0000000000000000000000000000000000000010"].visit_nodes( + edges="rel:rev,rev:rev" + ) + ) actual = [p.pid for p in actual] expected = [ - 'swh:1:rel:0000000000000000000000000000000000000010', - 'swh:1:rev:0000000000000000000000000000000000000009', - 'swh:1:rev:0000000000000000000000000000000000000003' + "swh:1:rel:0000000000000000000000000000000000000010", + "swh:1:rev:0000000000000000000000000000000000000009", + "swh:1:rev:0000000000000000000000000000000000000003", ] assert set(actual) == set(expected) def test_visit_paths(graph): - actual = list(graph['swh:1:snp:0000000000000000000000000000000000000020'] - .visit_paths(edges='snp:*,rev:*')) + actual = list( + graph["swh:1:snp:0000000000000000000000000000000000000020"].visit_paths( + edges="snp:*,rev:*" + ) + ) actual = [tuple(n.pid for n in path) for path in actual] expected = [ ( - 'swh:1:snp:0000000000000000000000000000000000000020', - 'swh:1:rev:0000000000000000000000000000000000000009', - 'swh:1:rev:0000000000000000000000000000000000000003', - 'swh:1:dir:0000000000000000000000000000000000000002' + "swh:1:snp:0000000000000000000000000000000000000020", + "swh:1:rev:0000000000000000000000000000000000000009", + "swh:1:rev:0000000000000000000000000000000000000003", + "swh:1:dir:0000000000000000000000000000000000000002", ), ( - 'swh:1:snp:0000000000000000000000000000000000000020', - 'swh:1:rev:0000000000000000000000000000000000000009', - 'swh:1:dir:0000000000000000000000000000000000000008' + "swh:1:snp:0000000000000000000000000000000000000020", + "swh:1:rev:0000000000000000000000000000000000000009", + "swh:1:dir:0000000000000000000000000000000000000008", ), ( - 'swh:1:snp:0000000000000000000000000000000000000020', - 'swh:1:rel:0000000000000000000000000000000000000010' - ) + "swh:1:snp:0000000000000000000000000000000000000020", + "swh:1:rel:0000000000000000000000000000000000000010", + ), ] assert set(actual) == set(expected) def test_walk(graph): - actual = list(graph['swh:1:dir:0000000000000000000000000000000000000016'] - .walk('rel', - edges='dir:dir,dir:rev,rev:*', - direction='backward', - traversal='bfs')) + actual = list( + graph["swh:1:dir:0000000000000000000000000000000000000016"].walk( + "rel", edges="dir:dir,dir:rev,rev:*", 
direction="backward", traversal="bfs" + ) + ) actual = [p.pid for p in actual] expected = [ - 'swh:1:dir:0000000000000000000000000000000000000016', - 'swh:1:dir:0000000000000000000000000000000000000017', - 'swh:1:rev:0000000000000000000000000000000000000018', - 'swh:1:rel:0000000000000000000000000000000000000019' + "swh:1:dir:0000000000000000000000000000000000000016", + "swh:1:dir:0000000000000000000000000000000000000017", + "swh:1:rev:0000000000000000000000000000000000000018", + "swh:1:rel:0000000000000000000000000000000000000019", ] assert set(actual) == set(expected) def test_count(graph): - assert (graph['swh:1:ori:0000000000000000000000000000000000000021'] - .count_leaves() == 4) - assert (graph['swh:1:rel:0000000000000000000000000000000000000010'] - .count_visit_nodes(edges='rel:rev,rev:rev') == 3) - assert (graph['swh:1:rev:0000000000000000000000000000000000000009'] - .count_neighbors(direction='backward') == 3) + assert ( + graph["swh:1:ori:0000000000000000000000000000000000000021"].count_leaves() == 4 + ) + assert ( + graph["swh:1:rel:0000000000000000000000000000000000000010"].count_visit_nodes( + edges="rel:rev,rev:rev" + ) + == 3 + ) + assert ( + graph["swh:1:rev:0000000000000000000000000000000000000009"].count_neighbors( + direction="backward" + ) + == 3 + ) def test_iter_type(graph): - rev_list = list(graph.iter_type('rev')) + rev_list = list(graph.iter_type("rev")) actual = [n.pid for n in rev_list] - expected = ['swh:1:rev:0000000000000000000000000000000000000003', - 'swh:1:rev:0000000000000000000000000000000000000009', - 'swh:1:rev:0000000000000000000000000000000000000013', - 'swh:1:rev:0000000000000000000000000000000000000018'] + expected = [ + "swh:1:rev:0000000000000000000000000000000000000003", + "swh:1:rev:0000000000000000000000000000000000000009", + "swh:1:rev:0000000000000000000000000000000000000013", + "swh:1:rev:0000000000000000000000000000000000000018", + ] assert expected == actual diff --git a/swh/graph/tests/test_pid.py b/swh/graph/tests/test_pid.py index 69cb927..110c61e 100644 --- a/swh/graph/tests/test_pid.py +++ b/swh/graph/tests/test_pid.py @@ -1,201 +1,202 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import shutil import tempfile import unittest from itertools import islice from swh.graph.pid import str_to_bytes, bytes_to_str from swh.graph.pid import PidToNodeMap, NodeToPidMap from swh.model.identifiers import PID_TYPES class TestPidSerialization(unittest.TestCase): pairs = [ - ('swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2', - bytes.fromhex('01' + '00' + - '94a9ed024d3859793618152ea559a168bbcbb5e2')), - ('swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505', - bytes.fromhex('01' + '01' + - 'd198bc9d7a6bcf6db04f476d29314f157507d505')), - ('swh:1:ori:b63a575fe3faab7692c9f38fb09d4bb45651bb0f', - bytes.fromhex('01' + '02' + - 'b63a575fe3faab7692c9f38fb09d4bb45651bb0f')), - ('swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f', - bytes.fromhex('01' + '03' + - '22ece559cc7cc2364edc5e5593d63ae8bd229f9f')), - ('swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d', - bytes.fromhex('01' + '04' + - '309cf2674ee7a0749978cf8265ab91a60aea0f7d')), - ('swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453', - bytes.fromhex('01' + '05' + - 'c7c108084bc0bf3d81436bf980b46e98bd338453')), + ( + "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2", + 
bytes.fromhex("01" + "00" + "94a9ed024d3859793618152ea559a168bbcbb5e2"), + ), + ( + "swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505", + bytes.fromhex("01" + "01" + "d198bc9d7a6bcf6db04f476d29314f157507d505"), + ), + ( + "swh:1:ori:b63a575fe3faab7692c9f38fb09d4bb45651bb0f", + bytes.fromhex("01" + "02" + "b63a575fe3faab7692c9f38fb09d4bb45651bb0f"), + ), + ( + "swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f", + bytes.fromhex("01" + "03" + "22ece559cc7cc2364edc5e5593d63ae8bd229f9f"), + ), + ( + "swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d", + bytes.fromhex("01" + "04" + "309cf2674ee7a0749978cf8265ab91a60aea0f7d"), + ), + ( + "swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453", + bytes.fromhex("01" + "05" + "c7c108084bc0bf3d81436bf980b46e98bd338453"), + ), ] def test_str_to_bytes(self): for (pid_str, pid_bytes) in self.pairs: self.assertEqual(str_to_bytes(pid_str), pid_bytes) def test_bytes_to_str(self): for (pid_str, pid_bytes) in self.pairs: self.assertEqual(bytes_to_str(pid_bytes), pid_str) def test_round_trip(self): for (pid_str, pid_bytes) in self.pairs: self.assertEqual(pid_str, bytes_to_str(str_to_bytes(pid_str))) self.assertEqual(pid_bytes, str_to_bytes(bytes_to_str(pid_bytes))) -def gen_records(types=['cnt', 'dir', 'ori', 'rel', 'rev', 'snp'], - length=10000): +def gen_records(types=["cnt", "dir", "ori", "rel", "rev", "snp"], length=10000): """generate sequential PID/int records, suitable for filling int<->pid maps for testing swh-graph on-disk binary databases Args: types (list): list of PID types to be generated, specified as the corresponding 3-letter component in PIDs length (int): number of PIDs to generate *per type* Yields: pairs (pid, int) where pid is a textual PID and int its sequential integer identifier """ pos = 0 for t in sorted(types): for i in range(0, length): - seq = format(pos, 'x') # current position as hex string - pid = 'swh:1:{}:{}{}'.format(t, '0' * (40 - len(seq)), seq) + seq = format(pos, "x") # current position as hex string + pid = "swh:1:{}:{}{}".format(t, "0" * (40 - len(seq)), seq) yield (pid, pos) pos += 1 # pairs PID/position in the sequence generated by :func:`gen_records` above MAP_PAIRS = [ - ('swh:1:cnt:0000000000000000000000000000000000000000', 0), - ('swh:1:cnt:000000000000000000000000000000000000002a', 42), - ('swh:1:dir:0000000000000000000000000000000000002afc', 11004), - ('swh:1:ori:00000000000000000000000000000000000056ce', 22222), - ('swh:1:rel:0000000000000000000000000000000000008235', 33333), - ('swh:1:rev:000000000000000000000000000000000000ad9c', 44444), - ('swh:1:snp:000000000000000000000000000000000000ea5f', 59999), + ("swh:1:cnt:0000000000000000000000000000000000000000", 0), + ("swh:1:cnt:000000000000000000000000000000000000002a", 42), + ("swh:1:dir:0000000000000000000000000000000000002afc", 11004), + ("swh:1:ori:00000000000000000000000000000000000056ce", 22222), + ("swh:1:rel:0000000000000000000000000000000000008235", 33333), + ("swh:1:rev:000000000000000000000000000000000000ad9c", 44444), + ("swh:1:snp:000000000000000000000000000000000000ea5f", 59999), ] class TestPidToNodeMap(unittest.TestCase): - @classmethod def setUpClass(cls): """create reasonably sized (~2 MB) PID->int map to test on-disk DB """ - cls.tmpdir = tempfile.mkdtemp(prefix='swh.graph.test.') - cls.fname = os.path.join(cls.tmpdir, 'pid2int.bin') - with open(cls.fname, 'wb') as f: + cls.tmpdir = tempfile.mkdtemp(prefix="swh.graph.test.") + cls.fname = os.path.join(cls.tmpdir, "pid2int.bin") + with open(cls.fname, "wb") as f: for (pid, i) in 
gen_records(length=10000): PidToNodeMap.write_record(f, pid, i) @classmethod def tearDownClass(cls): shutil.rmtree(cls.tmpdir) def setUp(self): self.map = PidToNodeMap(self.fname) def tearDown(self): self.map.close() def test_lookup(self): for (pid, pos) in MAP_PAIRS: self.assertEqual(self.map[pid], pos) def test_missing(self): with self.assertRaises(KeyError): - self.map['swh:1:ori:0101010100000000000000000000000000000000'], + self.map["swh:1:ori:0101010100000000000000000000000000000000"], with self.assertRaises(KeyError): - self.map['swh:1:cnt:0101010100000000000000000000000000000000'], + self.map["swh:1:cnt:0101010100000000000000000000000000000000"], def test_type_error(self): with self.assertRaises(TypeError): self.map[42] with self.assertRaises(TypeError): self.map[1.2] def test_update(self): - fname2 = self.fname + '.update' + fname2 = self.fname + ".update" shutil.copy(self.fname, fname2) # fresh map copy - map2 = PidToNodeMap(fname2, mode='rb+') + map2 = PidToNodeMap(fname2, mode="rb+") for (pid, int) in islice(map2, 11): # update the first N items new_int = int + 42 map2[pid] = new_int self.assertEqual(map2[pid], new_int) # check updated value os.unlink(fname2) # tmpdir will be cleaned even if we don't reach this def test_iter_type(self): for t in PID_TYPES: first_20 = list(islice(self.map.iter_type(t), 20)) k = first_20[0][1] - expected = [('swh:1:{}:{:040x}'.format(t, i), i) - for i in range(k, k + 20)] + expected = [("swh:1:{}:{:040x}".format(t, i), i) for i in range(k, k + 20)] assert first_20 == expected def test_iter_prefix(self): for t in PID_TYPES: - prefix = self.map.iter_prefix('swh:1:{}:00'.format(t)) + prefix = self.map.iter_prefix("swh:1:{}:00".format(t)) first_20 = list(islice(prefix, 20)) k = first_20[0][1] - expected = [('swh:1:{}:{:040x}'.format(t, i), i) - for i in range(k, k + 20)] + expected = [("swh:1:{}:{:040x}".format(t, i), i) for i in range(k, k + 20)] assert first_20 == expected class TestNodeToPidMap(unittest.TestCase): - @classmethod def setUpClass(cls): """create reasonably sized (~1 MB) int->PID map to test on-disk DB """ - cls.tmpdir = tempfile.mkdtemp(prefix='swh.graph.test.') - cls.fname = os.path.join(cls.tmpdir, 'int2pid.bin') - with open(cls.fname, 'wb') as f: + cls.tmpdir = tempfile.mkdtemp(prefix="swh.graph.test.") + cls.fname = os.path.join(cls.tmpdir, "int2pid.bin") + with open(cls.fname, "wb") as f: for (pid, _i) in gen_records(length=10000): NodeToPidMap.write_record(f, pid) @classmethod def tearDownClass(cls): shutil.rmtree(cls.tmpdir) def setUp(self): self.map = NodeToPidMap(self.fname) def tearDown(self): self.map.close() def test_lookup(self): for (pid, pos) in MAP_PAIRS: self.assertEqual(self.map[pos], pid) def test_out_of_bounds(self): with self.assertRaises(IndexError): self.map[1000000] with self.assertRaises(IndexError): self.map[-1000000] def test_update(self): - fname2 = self.fname + '.update' + fname2 = self.fname + ".update" shutil.copy(self.fname, fname2) # fresh map copy - map2 = NodeToPidMap(fname2, mode='rb+') + map2 = NodeToPidMap(fname2, mode="rb+") for (int, pid) in islice(map2, 11): # update the first N items - new_pid = pid.replace(':0', ':f') # mangle first hex digit + new_pid = pid.replace(":0", ":f") # mangle first hex digit map2[int] = new_pid self.assertEqual(map2[int], new_pid) # check updated value os.unlink(fname2) # tmpdir will be cleaned even if we don't reach this diff --git a/swh/graph/webgraph.py b/swh/graph/webgraph.py index b50dea8..291c04d 100644 --- a/swh/graph/webgraph.py +++ b/swh/graph/webgraph.py 
@@ -1,215 +1,271 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """WebGraph driver """ import logging import os import subprocess from enum import Enum from datetime import datetime from pathlib import Path from typing import Dict, List, Set from click import ParamType from swh.graph.config import check_config_compress class CompressionStep(Enum): MPH = 1 BV = 2 BV_OBL = 3 BFS = 4 PERMUTE = 5 PERMUTE_OBL = 6 STATS = 7 TRANSPOSE = 8 TRANSPOSE_OBL = 9 MAPS = 10 CLEAN_TMP = 11 def __str__(self): return self.name # full compression pipeline COMP_SEQ = list(CompressionStep) # Mapping from compression steps to shell commands implementing them. Commands # will be executed by the shell, so be careful with meta characters. They are # specified here as lists of tokens that will be joined together only for ease # of line splitting. In commands, {tokens} will be interpolated with # configuration values, see :func:`compress`. STEP_ARGV = { - CompressionStep.MPH: - ['{java}', 'it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction', - '--temp-dir', '{tmp_dir}', '{out_dir}/{graph_name}.mph', - '<( zstdcat {in_dir}/{graph_name}.nodes.csv.zst )'], + CompressionStep.MPH: [ + "{java}", + "it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction", + "--temp-dir", + "{tmp_dir}", + "{out_dir}/{graph_name}.mph", + "<( zstdcat {in_dir}/{graph_name}.nodes.csv.zst )", + ], # use process substitution (and hence a FIFO) above, as the MPH class loads # the entire file in memory when reading from stdin - CompressionStep.BV: - ['zstdcat', '{in_dir}/{graph_name}.edges.csv.zst', '|', - '{java}', 'it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph', - '--temp-dir', '{tmp_dir}', - '--function', '{out_dir}/{graph_name}.mph', '{out_dir}/{graph_name}-bv'], - CompressionStep.BV_OBL: - ['{java}', 'it.unimi.dsi.big.webgraph.BVGraph', - '--list', '{out_dir}/{graph_name}-bv'], - CompressionStep.BFS: - ['{java}', 'it.unimi.dsi.law.big.graph.BFS', - '{out_dir}/{graph_name}-bv', '{out_dir}/{graph_name}.order'], - CompressionStep.PERMUTE: - ['{java}', 'it.unimi.dsi.big.webgraph.Transform', - 'mapOffline', '{out_dir}/{graph_name}-bv', '{out_dir}/{graph_name}', - '{out_dir}/{graph_name}.order', '{batch_size}', '{tmp_dir}'], - CompressionStep.PERMUTE_OBL: - ['{java}', 'it.unimi.dsi.big.webgraph.BVGraph', - '--list', '{out_dir}/{graph_name}'], - CompressionStep.STATS: - ['{java}', 'it.unimi.dsi.big.webgraph.Stats', - '{out_dir}/{graph_name}'], - CompressionStep.TRANSPOSE: - ['{java}', 'it.unimi.dsi.big.webgraph.Transform', - 'transposeOffline', '{out_dir}/{graph_name}', - '{out_dir}/{graph_name}-transposed', '{batch_size}', '{tmp_dir}'], - CompressionStep.TRANSPOSE_OBL: - ['{java}', 'it.unimi.dsi.big.webgraph.BVGraph', - '--list', '{out_dir}/{graph_name}-transposed'], - CompressionStep.MAPS: - ['zstdcat', '{in_dir}/{graph_name}.nodes.csv.zst', '|', - '{java}', 'org.softwareheritage.graph.backend.MapBuilder', - '{out_dir}/{graph_name}', '{tmp_dir}'], - CompressionStep.CLEAN_TMP: - ['rm', '-rf', - '{out_dir}/{graph_name}-bv.graph', - '{out_dir}/{graph_name}-bv.obl', - '{out_dir}/{graph_name}-bv.offsets', - '{tmp_dir}'], + CompressionStep.BV: [ + "zstdcat", + "{in_dir}/{graph_name}.edges.csv.zst", + "|", + "{java}", + "it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph", + "--temp-dir", + "{tmp_dir}", + "--function", + "{out_dir}/{graph_name}.mph", + 
"{out_dir}/{graph_name}-bv", + ], + CompressionStep.BV_OBL: [ + "{java}", + "it.unimi.dsi.big.webgraph.BVGraph", + "--list", + "{out_dir}/{graph_name}-bv", + ], + CompressionStep.BFS: [ + "{java}", + "it.unimi.dsi.law.big.graph.BFS", + "{out_dir}/{graph_name}-bv", + "{out_dir}/{graph_name}.order", + ], + CompressionStep.PERMUTE: [ + "{java}", + "it.unimi.dsi.big.webgraph.Transform", + "mapOffline", + "{out_dir}/{graph_name}-bv", + "{out_dir}/{graph_name}", + "{out_dir}/{graph_name}.order", + "{batch_size}", + "{tmp_dir}", + ], + CompressionStep.PERMUTE_OBL: [ + "{java}", + "it.unimi.dsi.big.webgraph.BVGraph", + "--list", + "{out_dir}/{graph_name}", + ], + CompressionStep.STATS: [ + "{java}", + "it.unimi.dsi.big.webgraph.Stats", + "{out_dir}/{graph_name}", + ], + CompressionStep.TRANSPOSE: [ + "{java}", + "it.unimi.dsi.big.webgraph.Transform", + "transposeOffline", + "{out_dir}/{graph_name}", + "{out_dir}/{graph_name}-transposed", + "{batch_size}", + "{tmp_dir}", + ], + CompressionStep.TRANSPOSE_OBL: [ + "{java}", + "it.unimi.dsi.big.webgraph.BVGraph", + "--list", + "{out_dir}/{graph_name}-transposed", + ], + CompressionStep.MAPS: [ + "zstdcat", + "{in_dir}/{graph_name}.nodes.csv.zst", + "|", + "{java}", + "org.softwareheritage.graph.backend.MapBuilder", + "{out_dir}/{graph_name}", + "{tmp_dir}", + ], + CompressionStep.CLEAN_TMP: [ + "rm", + "-rf", + "{out_dir}/{graph_name}-bv.graph", + "{out_dir}/{graph_name}-bv.obl", + "{out_dir}/{graph_name}-bv.offsets", + "{tmp_dir}", + ], } # type: Dict[CompressionStep, List[str]] class StepOption(ParamType): """click type for specifying a compression step on the CLI parse either individual steps, specified as step names or integers, or step ranges """ + name = "compression step" def convert(self, value, param, ctx) -> Set[CompressionStep]: steps = set() # type: Set[CompressionStep] - specs = value.split(',') + specs = value.split(",") for spec in specs: - if '-' in spec: # step range - (raw_l, raw_r) = spec.split('-', maxsplit=1) - if raw_l == '': # no left endpoint + if "-" in spec: # step range + (raw_l, raw_r) = spec.split("-", maxsplit=1) + if raw_l == "": # no left endpoint raw_l = COMP_SEQ[0].name - if raw_r == '': # no right endpoint + if raw_r == "": # no right endpoint raw_r = COMP_SEQ[-1].name l_step = self.convert(raw_l, param, ctx) r_step = self.convert(raw_r, param, ctx) if len(l_step) != 1 or len(r_step) != 1: - self.fail(f'invalid step specification: {value}, ' - f'see --help') + self.fail(f"invalid step specification: {value}, " f"see --help") l_idx = l_step.pop() r_idx = r_step.pop() - steps = steps.union(set(map(CompressionStep, - range(l_idx.value, - r_idx.value + 1)))) + steps = steps.union( + set(map(CompressionStep, range(l_idx.value, r_idx.value + 1))) + ) else: # singleton step try: steps.add(CompressionStep(int(spec))) # integer step except ValueError: try: steps.add(CompressionStep[spec.upper()]) # step name except KeyError: - self.fail(f'invalid step specification: {value}, ' - f'see --help') + self.fail( + f"invalid step specification: {value}, " f"see --help" + ) return steps def do_step(step, conf): - cmd = ' '.join(STEP_ARGV[step]).format(**conf) + cmd = " ".join(STEP_ARGV[step]).format(**conf) cmd_env = os.environ.copy() - cmd_env['JAVA_TOOL_OPTIONS'] = conf['java_tool_options'] - cmd_env['CLASSPATH'] = conf['classpath'] - - logging.info(f'running: {cmd}') - process = subprocess.Popen(['/bin/bash', '-c', cmd], - env=cmd_env, encoding='utf8', - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) + 
cmd_env["JAVA_TOOL_OPTIONS"] = conf["java_tool_options"] + cmd_env["CLASSPATH"] = conf["classpath"] + + logging.info(f"running: {cmd}") + process = subprocess.Popen( + ["/bin/bash", "-c", cmd], + env=cmd_env, + encoding="utf8", + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) with process.stdout as stdout: for line in stdout: logging.info(line.rstrip()) rc = process.wait() if rc != 0: - raise RuntimeError(f'compression step {step} returned non-zero ' - f'exit code {rc}') + raise RuntimeError( + f"compression step {step} returned non-zero " f"exit code {rc}" + ) else: return rc -def compress(graph_name: str, in_dir: Path, out_dir: Path, - steps: Set[CompressionStep] = set(COMP_SEQ), - conf: Dict[str, str] = {}): +def compress( + graph_name: str, + in_dir: Path, + out_dir: Path, + steps: Set[CompressionStep] = set(COMP_SEQ), + conf: Dict[str, str] = {}, +): """graph compression pipeline driver from nodes/edges files to compressed on-disk representation Args: graph_name: graph base name, relative to in_dir in_dir: input directory, where the uncompressed graph can be found out_dir: output directory, where the compressed graph will be stored steps: compression steps to run (default: all steps) conf: compression configuration, supporting the following keys (all are optional, so an empty configuration is fine and is the default) - batch_size: batch size for `WebGraph transformations `_; defaults to 1 billion - classpath: java classpath, defaults to swh-graph JAR only - java: command to run java VM, defaults to "java" - java_tool_options: value for JAVA_TOOL_OPTIONS environment variable; defaults to various settings for high memory machines - logback: path to a logback.xml configuration file; if not provided a temporary one will be created and used - max_ram: maximum RAM to use for compression; defaults to available virtual memory - tmp_dir: temporary directory, defaults to the "tmp" subdir of out_dir """ if not steps: steps = set(COMP_SEQ) conf = check_config_compress(conf, graph_name, in_dir, out_dir) compression_start_time = datetime.now() - logging.info(f'starting compression at {compression_start_time}') + logging.info(f"starting compression at {compression_start_time}") seq_no = 0 for step in COMP_SEQ: if step not in steps: - logging.debug(f'skipping compression step {step}') + logging.debug(f"skipping compression step {step}") continue seq_no += 1 step_start_time = datetime.now() - logging.info(f'starting compression step {step} ' - f'({seq_no}/{len(steps)}) at {step_start_time}') + logging.info( + f"starting compression step {step} " + f"({seq_no}/{len(steps)}) at {step_start_time}" + ) do_step(step, conf) step_end_time = datetime.now() step_duration = step_end_time - step_start_time - logging.info(f'completed compression step {step} ' - f'({seq_no}/{len(steps)}) ' - f'at {step_end_time} in {step_duration}') + logging.info( + f"completed compression step {step} " + f"({seq_no}/{len(steps)}) " + f"at {step_end_time} in {step_duration}" + ) compression_end_time = datetime.now() compression_duration = compression_end_time - compression_start_time - logging.info(f'completed compression in {compression_duration}') + logging.info(f"completed compression in {compression_duration}") diff --git a/tools/git2graph/git2graph.c b/tools/git2graph/git2graph.c index 9346c5f..c16fbc0 100644 --- a/tools/git2graph/git2graph.c +++ b/tools/git2graph/git2graph.c @@ -1,735 +1,735 @@ /* * Copyright (C) 2019 The Software Heritage developers * See the AUTHORS file at the top-level directory of this 
distribution * License: GNU General Public License version 3, or any later version * See top-level LICENSE file for more information */ /* Crawls a Git repository and outputs it as a graph, i.e., as a pair of * textual files (nodes, edges). The nodes file will contain a list of graph * nodes as Software Heritage (SWH) Persistent Identifiers (PIDs); the edges * file a list of graph edges as PID pairs. */ #include <limits.h> #include <stdbool.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <git2.h> #include <glib.h> #define SWH_PREFIX "swh:1" #define SWH_DIR_PRE "swh:1:dir" #define SWH_ORI_PRE "swh:1:ori" #define SWH_REV_PRE "swh:1:rev" #define SWH_SNP_PRE "swh:1:snp" #define SWH_PIDSZ (GIT_OID_HEXSZ + 10) // size of a SWH PID // line-lengths in nodes and edges files #define NODES_LINELEN (SWH_PIDSZ + 1) #define EDGES_LINELEN (SWH_PIDSZ * 2 + 2) // Output buffer sizes for nodes and edges files. To guarantee atomic and // non-interleaved writes (which matter when concurrently writing to a // shared FIFO), these sizes must be <= PIPE_BUF and multiples of // {NODES,EDGES}_LINELEN. #define NODES_OUTSZ ((PIPE_BUF / NODES_LINELEN) * NODES_LINELEN) #define EDGES_OUTSZ ((PIPE_BUF / EDGES_LINELEN) * EDGES_LINELEN) // GIT_OBJ_* constants extension for non-git objects #define SWH_OBJ_SNP 5 // snapshots (swh:1:snp:...) #define SWH_OBJ_ORI 6 // origins (swh:1:ori:...) #define SWH_OBJ_LOC 7 // lines of code (swh:1:loc:...) #define OBJ_TYPES 8 #define ELT_SEP "," // element separator in lists #define PAIR_SEP ":" // key/value separator in pairs /* map from libgit2's git_otype (+ SWH-specific types above) to SWH PID type * qualifiers */ static char *_git_otype2swh[OBJ_TYPES] = { "*", // 0 == GIT_OBJ__EXT1 (unused in libgit2, used as wildcard here) "rev", // 1 == GIT_OBJ_COMMIT "dir", // 2 == GIT_OBJ_TREE "cnt", // 3 == GIT_OBJ_BLOB "rel", // 4 == GIT_OBJ_TAG "snp", // 5 == SWH_OBJ_SNP "ori", // 6 == SWH_OBJ_ORI "loc", // 7 == SWH_OBJ_LOC }; /* map from libgit2's git_otype (+ SWH-specific types above) to long names of * SWH PID types. Used most notably to assemble snapshot manifests; see * https://docs.softwareheritage.org/devel/apidoc/swh.model.html#swh.model.identifiers.snapshot_identifier */ static char *_git_otype2swhlong[OBJ_TYPES] = { "ERROR", // 0 == GIT_OBJ__EXT1 "revision", // 1 == GIT_OBJ_COMMIT "directory", // 2 == GIT_OBJ_TREE "content", // 3 == GIT_OBJ_BLOB "release", // 4 == GIT_OBJ_TAG "snapshot", // 5 == SWH_OBJ_SNP "origin", // 6 == SWH_OBJ_ORI "line", // 7 == SWH_OBJ_LOC }; #define GIT_OBJ_ANY GIT_OBJ__EXT1 /* Convert a git object type (+ SWH-specific types above) to the corresponding * SWH PID type. */ #define git_otype2swh(type) _git_otype2swh[(type)] #define git_otype2swhlong(type) _git_otype2swhlong[(type)] /* Parse object type (libgit2's + SWH-specific types) from 3-letter type * qualifiers. Return either object type, or 0 in case of "*" wildcard, or -1 * in case of parse error. */ int parse_otype(char *str) { for (int i = 0; i < OBJ_TYPES; i++) { if (strcmp(str, _git_otype2swh[i]) == 0) return i; } return -1; } /* Allowed edge types matrix. Each cell denotes whether edges from a given * SRC_TYPE to a given DST_TYPE should be produced or not. 
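 * The matrix is indexed like _git_otype2swh above: index 0 is the "*"
 * wildcard, followed by rev, dir, cnt, rel, snp, ori, loc. For instance,
 * _allowed_edges[GIT_OBJ_TREE][GIT_OBJ_BLOB] governs whether dir -> cnt
 * edges are emitted. The static initialization below allows everything;
 * presumably it is narrowed down at runtime when node/edge restrictions
 * are requested (that logic is not shown in this excerpt).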
*/ static int _allowed_edges[OBJ_TYPES][OBJ_TYPES] = { // TO rev dir cnt rel snp ori loc | // ---------------------------------------------------------------- {true, true, true, true, true, true, true, true}, // | FROM {true, true, true, true, true, true, true, true}, // | rev {true, true, true, true, true, true, true, true}, // | dir {true, true, true, true, true, true, true, true}, // | cnt {true, true, true, true, true, true, true, true}, // | rel {true, true, true, true, true, true, true, true}, // | snp {true, true, true, true, true, true, true, true}, // | ori {true, true, true, true, true, true, true, true}, // | loc }; /* Allowed node types vector. */ static int _allowed_nodes[OBJ_TYPES] = { true, // true, // rev true, // dir true, // cnt true, // rel true, // snp true, // ori true, // loc }; #define is_edge_allowed(src_type, dst_type) _allowed_edges[(src_type)][(dst_type)] #define is_node_allowed(type) _allowed_nodes[(type)] /* runtime configuration, for node/edges emitter functions */ typedef struct { git_odb *odb; // Git object DB git_repository *repo; // Git repository FILE *nodes_out; // stream to write nodes to, or NULL FILE *edges_out; // stream to write edges to, or NULL } config_t; /* context for iterating over refs */ typedef struct { GByteArray *manifest; // snapshot manifest, incrementally built GSList *pids; // target PIDs, incrementally collected char *snapshot_pid; // snapshot PID, initially missing config_t *conf; // runtime configuration } ref_ctxt_t; /* Invoke a libgit2 method and exit with an error message in case of * failure. - * + * Reused from libgit2 examples, specifically common.c, available under CC0. */ void check_lg2(int error, const char *message, const char *extra) { const git_error *lg2err; const char *lg2msg = "", *lg2spacer = ""; if (!error) return; if ((lg2err = giterr_last()) != NULL && lg2err->message != NULL) { lg2msg = lg2err->message; lg2spacer = " - "; } if (extra) fprintf(stderr, "%s '%s' [%d]%s%s\n", message, extra, error, lg2spacer, lg2msg); else fprintf(stderr, "%s [%d]%s%s\n", message, error, lg2spacer, lg2msg); exit(1); } /* Emit commit edges. */ void emit_commit_edges(const git_commit *commit, const char *swhpid, FILE *out) { unsigned int i, max_i; char oidstr[GIT_OID_HEXSZ + 1]; // to PID // rev -> dir if (is_edge_allowed(GIT_OBJ_COMMIT, GIT_OBJ_TREE)) { git_oid_tostr(oidstr, sizeof(oidstr), git_commit_tree_id(commit)); fprintf(out, "%s %s:%s\n", swhpid, SWH_DIR_PRE, oidstr); } // rev -> rev if (is_edge_allowed(GIT_OBJ_COMMIT, GIT_OBJ_COMMIT)) { max_i = (unsigned int)git_commit_parentcount(commit); for (i = 0; i < max_i; ++i) { git_oid_tostr(oidstr, sizeof(oidstr), git_commit_parent_id(commit, i)); fprintf(out, "%s %s:%s\n", swhpid, SWH_REV_PRE, oidstr); } } } /* Emit tag edges. */ void emit_tag_edges(const git_tag *tag, const char *swhpid, FILE *out) { char oidstr[GIT_OID_HEXSZ + 1]; int target_type; // rel -> * target_type = git_tag_target_type(tag); if (is_edge_allowed(GIT_OBJ_TAG, target_type)) { git_oid_tostr(oidstr, sizeof(oidstr), git_tag_target_id(tag)); fprintf(out, "%s %s:%s:%s\n", swhpid, SWH_PREFIX, git_otype2swh(target_type), oidstr); } } /* Emit tree edges.
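 *
 * A tree entry can reference a blob (dir->cnt edge), a subtree (dir->dir),
 * or, for submodules, a commit (dir->rev). Each entry that passes the edge
 * filter yields one "src_pid dst_pid" line, e.g. (illustrative hashes):
 *
 *   swh:1:dir:4b825dc6... swh:1:cnt:e69de29b...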
*/ void emit_tree_edges(const git_tree *tree, const char *swhpid, FILE *out) { size_t i, max_i = git_tree_entrycount(tree); char oidstr[GIT_OID_HEXSZ + 1]; const git_tree_entry *te; int entry_type; // dir -> * for (i = 0; i < max_i; ++i) { te = git_tree_entry_byindex(tree, i); entry_type = git_tree_entry_type(te); if (is_edge_allowed(GIT_OBJ_TREE, entry_type)) { git_oid_tostr(oidstr, sizeof(oidstr), git_tree_entry_id(te)); fprintf(out, "%s %s:%s:%s\n", swhpid, SWH_PREFIX, git_otype2swh(entry_type), oidstr); } } } /* Emit node and edges for current object. */ int emit_obj(const git_oid *oid, config_t *conf) { char oidstr[GIT_OID_HEXSZ + 1]; char swhpid[SWH_PIDSZ + 1]; size_t len; int obj_type; git_commit *commit; git_tag *tag; git_tree *tree; git_odb *odb = conf->odb; git_repository *repo = conf->repo; FILE *nodes_out = conf->nodes_out; FILE *edges_out = conf->edges_out; check_lg2(git_odb_read_header(&len, &obj_type, odb, oid), "cannot read object header", NULL); // emit node sprintf(swhpid, "swh:1:%s:", git_otype2swh(obj_type)); git_oid_tostr(swhpid + 10, sizeof(oidstr), oid); if (nodes_out != NULL && is_node_allowed(obj_type)) fprintf(nodes_out, "%s\n", swhpid); if (edges_out != NULL) { // emit edges switch (obj_type) { case GIT_OBJ_BLOB: // graph leaf: no edges to emit break; case GIT_OBJ_COMMIT: check_lg2(git_commit_lookup(&commit, repo, oid), "cannot find commit", NULL); emit_commit_edges(commit, swhpid, edges_out); git_commit_free(commit); break; case GIT_OBJ_TAG: check_lg2(git_tag_lookup(&tag, repo, oid), "cannot find tag", NULL); emit_tag_edges(tag, swhpid, edges_out); git_tag_free(tag); break; case GIT_OBJ_TREE: check_lg2(git_tree_lookup(&tree, repo, oid), "cannot find tree", NULL); emit_tree_edges(tree, swhpid, edges_out); git_tree_free(tree); break; default: git_oid_tostr(oidstr, sizeof(oidstr), oid); fprintf(stderr, "E: ignoring unknown object: %s\n", oidstr); break; } } return 0; } /* Callback for emit_snapshot. Add a git reference to a snapshot manifest * (payload->manifest), according to the snapshot PID spec, see: * https://docs.softwareheritage.org/devel/apidoc/swh.model.html#swh.model.identifiers.snapshot_identifier . * As a side effect collect PIDs of referenced objects in payload->pids for * later reuse.
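 *
 * Each manifest entry is serialized as:
 *
 *   target_type SP ref_name NUL byte_length ":" target_bytes
 *
 * where target_bytes holds the raw 20-byte OID for direct refs, the name of
 * the pointed-to reference for symbolic refs ("alias"), and nothing (length
 * 0) for dangling refs.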
* * Sample manifest entries for the tests/data/sample-repo.tgz repository: * * revision HEAD 9bf3ce249cf3d74ef57d5a1fb4227e26818553f0 * revision refs/heads/feature/baz 261586c455130b4bf10a5be7ffb0bf4077581b56 * revision refs/heads/feature/qux 20cca959bae94594f60450f339b408581f1b401f * revision refs/heads/master 9bf3ce249cf3d74ef57d5a1fb4227e26818553f0 * release refs/tags/1.0 1720af781051a8cafdf3cf134c263ec5c5e72412 * release refs/tags/1.1 d48ad9915be780fcfa296985f69df35e144864a5 */ int _snapshot_add_ref(char *ref_name, ref_ctxt_t *ctxt) { const git_oid *oid; size_t len; int dangling, obj_type, ref_type; const char *target_type, *target_name; char ascii_len[GIT_OID_HEXSZ]; char oidstr[GIT_OID_HEXSZ + 1]; char *swhpid = malloc(SWH_PIDSZ + 1); git_reference *ref, *ref2; check_lg2(git_reference_lookup(&ref, ctxt->conf->repo, ref_name), "cannot find reference", NULL); ref_type = git_reference_type(ref); assert(ref_type == GIT_REF_OID || ref_type == GIT_REF_SYMBOLIC); if (ref_type == GIT_REF_OID) { oid = git_reference_target(ref); dangling = (oid == NULL); } else { // ref_type == GIT_REF_SYMBOLIC target_name = git_reference_symbolic_target(ref); dangling = (target_name == NULL); } if (dangling) { // target type target_type = "dangling"; } else if (ref_type == GIT_REF_OID) { // non-dangling, direct ref check_lg2(git_odb_read_header(&len, &obj_type, ctxt->conf->odb, oid), "cannot read object header", NULL); target_type = git_otype2swhlong(obj_type); } else { // non-dangling, symbolic ref target_type = "alias"; // recurse to lookup OID and type for PID generation check_lg2(git_reference_resolve(&ref2, ref), "cannot resolve symbolic reference", NULL); assert(git_reference_type(ref2) == GIT_REF_OID); oid = git_reference_target(ref2); check_lg2(git_odb_read_header(&len, &obj_type, ctxt->conf->odb, oid), "cannot read object header", NULL); } g_byte_array_append(ctxt->manifest, (unsigned char *) target_type, strlen(target_type)); g_byte_array_append(ctxt->manifest, (unsigned char *) " ", 1); // reference name g_byte_array_append(ctxt->manifest, (unsigned char *) ref_name, strlen(ref_name)); g_byte_array_append(ctxt->manifest, (unsigned char *) "\0", 1); // (length-encoded) target ID if (dangling) { g_byte_array_append(ctxt->manifest, (unsigned char *) "0:", 2); } else { if (ref_type == GIT_REF_OID) { // direct ref g_byte_array_append(ctxt->manifest, (unsigned char *) "20:", 3); g_byte_array_append(ctxt->manifest, oid->id, 20); } else { // symbolic ref len = snprintf(ascii_len, sizeof(ascii_len), "%zu:", strlen(target_name)); assert(len < sizeof(ascii_len)); g_byte_array_append(ctxt->manifest, (unsigned char *) ascii_len, len); g_byte_array_append(ctxt->manifest, (unsigned char *) target_name, strlen(target_name)); } } sprintf(swhpid, "swh:1:%s:", git_otype2swh(obj_type)); git_oid_tostr(swhpid + 10, sizeof(oidstr), oid); if (g_slist_find_custom(ctxt->pids, swhpid, (GCompareFunc) strcmp) == NULL) // avoid duplicate outbound snp->* edges ctxt->pids = g_slist_prepend(ctxt->pids, swhpid); git_reference_free(ref); // git_reference_free(ref2); // XXX triggers double-free, WTH return 0; } /* emit an edge snp->* */ void emit_snapshot_edge(char *target_pid, ref_ctxt_t *ctxt) { FILE *edges_out = ctxt->conf->edges_out; char **pid_parts, **ptr; pid_parts = g_strsplit(target_pid, ":", 4); ptr = pid_parts; ptr++; ptr++; // move ptr to PID type component int target_type = parse_otype(*ptr); g_strfreev(pid_parts); if (edges_out != NULL && is_edge_allowed(SWH_OBJ_SNP, target_type)) fprintf(edges_out, "%s %s\n",
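/* edge line format: snapshot PID first, then the target PID parsed above */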
ctxt->snapshot_pid, target_pid); } int _collect_ref_name(const char *name, GSList **ref_names) { *ref_names = g_slist_insert_sorted(*ref_names, (gpointer) strdup(name), (GCompareFunc) strcmp); return 0; } /* Emit the snapshot node and its outbound edges. Return the snapshot PID as a * freshly allocated string that should be freed by the caller. */ char *emit_snapshot(config_t *conf) { gchar *hex_sha1; char *snapshot_pid; GBytes *manifest; char manifest_header[GIT_OID_HEXSZ]; GSList *ref_names = NULL; FILE *nodes_out = conf->nodes_out; int len; snapshot_pid = malloc(SWH_PIDSZ + 1); ref_ctxt_t *ctxt = malloc(sizeof(ref_ctxt_t)); ctxt->manifest = g_byte_array_new(); ctxt->pids = NULL; ctxt->snapshot_pid = NULL; ctxt->conf = conf; // XXX TODO this does not return symbolic refs, making snapshot PIDs // potentially incompatible with `swh identify` :-( As a partial // workaround we explicitly add HEAD here. git_reference_foreach_name(conf->repo, (git_reference_foreach_name_cb) _collect_ref_name, &ref_names); // collect refs, sorted by name ref_names = g_slist_insert_sorted(ref_names, (gpointer) "HEAD", (GCompareFunc) strcmp); /* iterate over refs to assemble manifest; side-effect: fill ctxt->pids */ g_slist_foreach(ref_names, (GFunc) _snapshot_add_ref, (gpointer) ctxt); ctxt->pids = g_slist_reverse(ctxt->pids); /* prepend header for salted git hashes */ len = snprintf(manifest_header, sizeof(manifest_header), "snapshot %u", ctxt->manifest->len); assert(len < sizeof(manifest_header)); g_byte_array_prepend(ctxt->manifest, (unsigned char *) "\0", 1); g_byte_array_prepend(ctxt->manifest, (unsigned char *) manifest_header, len); /* compute snapshot PID and emit snapshot node */ manifest = g_byte_array_free_to_bytes(ctxt->manifest); ctxt->manifest = NULL; // memory has been freed by *_free_to_bytes hex_sha1 = g_compute_checksum_for_bytes(G_CHECKSUM_SHA1, manifest); sprintf(snapshot_pid, "%s:%s", SWH_SNP_PRE, hex_sha1); ctxt->snapshot_pid = snapshot_pid; if (nodes_out != NULL && is_node_allowed(SWH_OBJ_SNP)) fprintf(nodes_out, "%s\n", snapshot_pid); /* emit snp->* edges */ g_slist_foreach(ctxt->pids, (GFunc) emit_snapshot_edge, (void *) ctxt); g_slist_free_full(ctxt->pids, (GDestroyNotify) free); free(ctxt); g_free(hex_sha1); return snapshot_pid; } /* emit origin node and its outbound edges (to snapshots) */ void emit_origin(char *origin_url, config_t *conf, char *snapshot_pid) { gchar *hex_sha1; char origin_pid[SWH_PIDSZ + 1]; FILE *nodes_out = conf->nodes_out; FILE *edges_out = conf->edges_out; // compute the origin PID up front: it is also needed for the edge line // when origin nodes themselves are filtered out hex_sha1 = g_compute_checksum_for_string( G_CHECKSUM_SHA1, origin_url, strlen(origin_url)); sprintf(origin_pid, "%s:%s", SWH_ORI_PRE, hex_sha1); g_free(hex_sha1); if (nodes_out != NULL && is_node_allowed(SWH_OBJ_ORI)) fprintf(nodes_out, "%s\n", origin_pid); if (edges_out != NULL && is_edge_allowed(SWH_OBJ_ORI, SWH_OBJ_SNP)) fprintf(edges_out, "%s %s\n", origin_pid, snapshot_pid); } void exit_usage(char *msg) { if (msg != NULL) fprintf(stderr, "Error: %s\n\n", msg); fprintf(stderr, "Usage: git2graph [OPTION..]
GIT_REPO_DIR\n"); fprintf(stderr, "\n"); fprintf(stderr, "Options:\n"); fprintf(stderr, " -e, --edges-output=PATH edges output file (default: stdout)\n"); fprintf(stderr, " -n, --nodes-output=PATH nodes output file (default: stdout)\n"); fprintf(stderr, " -E, --edges-filter=EDGES_EXPR only emit selected edges\n"); fprintf(stderr, " -N, --nodes-filter=NODES_EXPR only emit selected nodes\n"); fprintf(stderr, " -o, --origin=URL repository origin URL\n"); fprintf(stderr, "\n"); fprintf(stderr, "EDGES_EXPR is a comma separate list of src_TYPE:dst_TYPE pairs\n"); fprintf(stderr, "NODES_EXPR is a comme separate list of node TYPEs\n"); fprintf(stderr, "{NODES,EDGES}_EXPR can be empty strings to filter *out* all elements.\n"); fprintf(stderr, "TYPE is one of: cnt, dir, loc, ori, rel, rev, snp, *\n"); fprintf(stderr, "\nNote: you can use \"-\" for stdout in file names.\n"); exit(EXIT_FAILURE); } /* command line arguments */ typedef struct { char *nodes_out; // path of nodes outputs file char *edges_out; // path of edges outputs file char *nodes_filter; // nodes filter expression char *edges_filter; // edges filter expression char *origin_url; // origin URL char *repo_dir; // repository directory } cli_args_t; cli_args_t *parse_cli(int argc, char **argv) { int opt; cli_args_t *args = malloc(sizeof(cli_args_t)); if (args == NULL) { perror("Cannot allocate memory."); exit(EXIT_FAILURE); } else { args->nodes_out = NULL; args->edges_out = NULL; args->nodes_filter = NULL; args->edges_filter = NULL; args->origin_url = NULL; args->repo_dir = NULL; } static struct option long_opts[] = { {"edges-output", required_argument, 0, 'e' }, {"nodes-output", required_argument, 0, 'n' }, {"edges-filter", required_argument, 0, 'E' }, {"nodes-filter", required_argument, 0, 'N' }, {"origin", required_argument, 0, 'o' }, {"help", no_argument, 0, 'h' }, {0, 0, 0, 0 } }; while ((opt = getopt_long(argc, argv, "e:n:E:N:o:h", long_opts, NULL)) != -1) { switch (opt) { case 'e': args->edges_out = optarg; break; case 'n': args->nodes_out = optarg; break; case 'E': args->edges_filter = optarg; break; case 'N': args->nodes_filter = optarg; break; case 'o': args->origin_url = optarg; break; case 'h': default: exit_usage(NULL); } } if (argv[optind] == NULL) exit_usage(NULL); args->repo_dir = argv[optind]; if (args->edges_out == NULL) args->edges_out = "-"; if (args->nodes_out == NULL) args->nodes_out = "-"; return args; } /* open output stream specified on the command line (if at all) */ FILE *open_out_stream(char *cli_path, char *buf, int bufsiz) { FILE *stream; if (cli_path == NULL) stream = NULL; else if (strcmp(cli_path, "-") == 0) stream = stdout; else if((stream = fopen(cli_path, "w")) == NULL) { fprintf(stderr, "can't open file: %s\n", cli_path); exit(EXIT_FAILURE); } // ensure atomic and non-interleaved writes if (stream != NULL) setvbuf(stream, buf, _IOFBF, bufsiz); return stream; } void fill_matrix(int matrix[OBJ_TYPES][OBJ_TYPES], int val) { for (int i = 0; i < OBJ_TYPES; i++) for (int j = 0; j < OBJ_TYPES; j++) matrix[i][j] = val; } void fill_row(int matrix[OBJ_TYPES][OBJ_TYPES], int row, int val) { for (int j = 0; j < OBJ_TYPES; j++) matrix[row][j] = val; } void fill_column(int matrix[OBJ_TYPES][OBJ_TYPES], int col, int val) { for (int i = 0; i < OBJ_TYPES; i++) matrix[i][col] = val; } void fill_vector(int vector[OBJ_TYPES], int val) { for (int i = 0; i < OBJ_TYPES; i++) vector[i] = val; } /* Dump node/edge filters to a given stream. For debugging purposes. 
*/ void _dump_filters(FILE *out, int matrix[OBJ_TYPES][OBJ_TYPES], int vector[OBJ_TYPES]) { fprintf(out, "TO rev dir cnt rel snp ori loc FROM\n"); for (int i = 0; i < OBJ_TYPES; i++) { for (int j = 0; j < OBJ_TYPES; j++) fprintf(out, "%d ", matrix[i][j]); fprintf(out, "%s\n", _git_otype2swh[i]); } fprintf(out, " rev dir cnt rel snp ori loc\n"); for (int i = 0; i < OBJ_TYPES; i++) fprintf(out, "%d ", vector[i]); } /* set up nodes and edges restrictions, interpreting command line filters */ void init_graph_filters(char *nodes_filter, char *edges_filter) { char **filters; char **types; char **ptr; int src_type, dst_type; // Note: when either filter is an empty string, the parsing loops below // will be skipped (due to g_strsplit's semantics on empty strings), // which is what we want: all elements will be forbidden. if (edges_filter != NULL) { fill_matrix(_allowed_edges, false); // nothing allowed by default filters = g_strsplit(edges_filter, ELT_SEP, -1); // "typ:typ" pairs for (ptr = filters; *ptr; ptr++) { types = g_strsplit(*ptr, PAIR_SEP, 2); // 2 "typ" fragments src_type = parse_otype(types[0]); dst_type = parse_otype(types[1]); if (src_type == GIT_OBJ_ANY && dst_type == GIT_OBJ_ANY) { // "*:*" wildcard fill_matrix(_allowed_edges, true); break; // all edges allowed already } else if (src_type == GIT_OBJ_ANY) { // "*:typ" wildcard fill_column(_allowed_edges, dst_type, true); } else if (dst_type == GIT_OBJ_ANY) { // "typ:*" wildcard fill_row(_allowed_edges, src_type, true); } else // "src_type:dst_type" _allowed_edges[src_type][dst_type] = true; g_strfreev(types); } g_strfreev(filters); } if (nodes_filter != NULL) { fill_vector(_allowed_nodes, false); // nothing allowed by default filters = g_strsplit(nodes_filter, ELT_SEP, -1); // "typ" fragments for (ptr = filters; *ptr; ptr++) { src_type = parse_otype(*ptr); if (src_type == GIT_OBJ_ANY) { // "*" wildcard fill_vector(_allowed_nodes, true); break; // all nodes allowed already } else _allowed_nodes[src_type] = true; } g_strfreev(filters); } } int main(int argc, char **argv) { git_repository *repo; git_odb *odb; int rc; cli_args_t *args; config_t *conf; FILE *nodes_out, *edges_out; char nodes_buf[NODES_OUTSZ], edges_buf[EDGES_OUTSZ]; char *snapshot_pid; - + args = parse_cli(argc, argv); init_graph_filters(args->nodes_filter, args->edges_filter); // _dump_filters(stdout, _allowed_edges, _allowed_nodes); git_libgit2_init(); check_lg2(git_repository_open(&repo, args->repo_dir), "cannot open repository", NULL); check_lg2(git_repository_odb(&odb, repo), "cannot get object DB", NULL); nodes_out = open_out_stream(args->nodes_out, nodes_buf, NODES_OUTSZ); edges_out = open_out_stream(args->edges_out, edges_buf, EDGES_OUTSZ); assert(NODES_OUTSZ <= PIPE_BUF && (NODES_OUTSZ % NODES_LINELEN == 0)); assert(EDGES_OUTSZ <= PIPE_BUF && (EDGES_OUTSZ % EDGES_LINELEN == 0)); conf = malloc(sizeof(config_t)); conf->odb = odb; conf->repo = repo; conf->nodes_out = nodes_out; conf->edges_out = edges_out; snapshot_pid = emit_snapshot(conf); if (args->origin_url != NULL) emit_origin(args->origin_url, conf, snapshot_pid); rc = git_odb_foreach(odb, (git_odb_foreach_cb) emit_obj, (void *) conf); check_lg2(rc, "failure during object iteration", NULL); git_odb_free(odb); git_repository_free(repo); free(conf); exit(rc); } diff --git a/tox.ini b/tox.ini index 76b70ee..a35eedd 100644 --- a/tox.ini +++ b/tox.ini @@ -1,31 +1,38 @@ [tox] -envlist=flake8,mypy,py3 +envlist=black,flake8,mypy,py3 [testenv] extras = testing deps = pytest-cov whitelist_externals = mvn sh commands = sh -c 'if !
[ -d {envdir}/share/swh-graph ]; then mvn -f java/pom.xml compile assembly:single; mkdir {envdir}/share/swh-graph; cp java/target/*.jar {envdir}/share/swh-graph; fi' pytest --cov={envsitepackagesdir}/swh/graph \ {envsitepackagesdir}/swh/graph \ --cov-branch {posargs} +[testenv:black] +skip_install = true +deps = + black +commands = + {envpython} -m black --check swh + [testenv:flake8] skip_install = true deps = flake8 commands = {envpython} -m flake8 [testenv:mypy] extras = testing deps = mypy commands = mypy swh
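# Note: an individual environment can also be run on its own, e.g.: tox -e black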