diff --git a/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java b/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java index 35ca81f..4d47a12 100644 --- a/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java +++ b/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java @@ -1,124 +1,124 @@ package org.softwareheritage.graph.backend; import java.io.BufferedOutputStream; import java.io.DataOutputStream; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.zip.GZIPInputStream; import it.unimi.dsi.bits.LongArrayBitVector; import it.unimi.dsi.fastutil.Size64; import it.unimi.dsi.fastutil.io.BinIO; import it.unimi.dsi.fastutil.longs.LongBigArrays; import it.unimi.dsi.fastutil.longs.LongBigList; import it.unimi.dsi.fastutil.objects.Object2LongFunction; import it.unimi.dsi.fastutil.objects.ObjectBigArrays; import it.unimi.dsi.io.FastBufferedReader; import it.unimi.dsi.io.LineIterator; import org.softwareheritage.graph.Graph; import org.softwareheritage.graph.Node; import org.softwareheritage.graph.SwhPID; import org.softwareheritage.graph.backend.NodeTypesMap; /** * Pre-processing steps (such as dumping mapping files on disk) before running the graph service. * * @author The Software Heritage developers */ public class Setup { /** * Main entrypoint. * * @param args command line arguments */ public static void main(String[] args) throws IOException { if (args.length != 2) { System.err.println("Usage: NODES_CSV_GZ COMPRESSED_GRAPH_BASE_NAME"); System.exit(1); } String nodesPath = args[0]; String graphPath = args[1]; System.out.println("Pre-computing node id maps..."); long startTime = System.nanoTime(); precomputeNodeIdMap(nodesPath, graphPath); long endTime = System.nanoTime(); double duration = (endTime - startTime) / 1_000_000_000; System.out.println("Done in: " + duration + " seconds"); } /** * Computes and dumps on disk mapping files. * * @param nodesPath path of the compressed csv nodes file * @param graphPath path of the compressed graph */ // Suppress warning for Object2LongFunction cast @SuppressWarnings("unchecked") static void precomputeNodeIdMap(String nodesPath, String graphPath) throws IOException { // First internal mapping: SWH PID (string) -> WebGraph MPH (long) Object2LongFunction mphMap = null; try { mphMap = (Object2LongFunction) BinIO.loadObject(graphPath + ".mph"); } catch (ClassNotFoundException e) { throw new IllegalArgumentException("The .mph file contains unknown class object: " + e); } long nbIds = (mphMap instanceof Size64) ? ((Size64) mphMap).size64() : mphMap.size(); // Second internal mapping: WebGraph MPH (long) -> BFS ordering (long) long[][] bfsMap = LongBigArrays.newBigArray(nbIds); long loaded = BinIO.loadLongs(graphPath + ".order", bfsMap); if (loaded != nbIds) { throw new IllegalArgumentException("Graph contains " + nbIds + " nodes, but read " + loaded); } // Dump complete mapping for all nodes: SWH PID (string) <=> WebGraph node id (long) InputStream nodesStream = new GZIPInputStream(new FileInputStream(nodesPath)); FastBufferedReader buffer = new FastBufferedReader(new InputStreamReader(nodesStream, "UTF-8")); LineIterator swhPIDIterator = new LineIterator(buffer); - // for the binary format of pidToNodeMap, see Python module swh.graph.pid:PidToIntMap - // for the binary format of nodeToPidMap, see Python module swh.graph.pid:IntToPidMap + // for the binary format of pidToNodeMap, see Python module swh.graph.pid:PidToNodeMap + // for the binary format of nodeToPidMap, see Python module swh.graph.pid:NodeToPidMap try (DataOutputStream pidToNodeMap = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(graphPath + Graph.PID_TO_NODE))); BufferedOutputStream nodeToPidMap = new BufferedOutputStream(new FileOutputStream(graphPath + Graph.NODE_TO_PID))) { // nodeToPidMap needs to write SWH PID in order of node id, so use a temporary array Object[][] nodeToSwhPID = ObjectBigArrays.newBigArray(nbIds); // To effectively run edge restriction during graph traversals, we store node id (long) -> SWH // type map. This is represented as a bitmap using minimum number of bits per Node.Type. final int log2NbTypes = (int) Math.ceil(Math.log(Node.Type.values().length) / Math.log(2)); final int nbBitsPerNodeType = log2NbTypes; LongArrayBitVector nodeTypesBitVector = LongArrayBitVector.ofLength(nbBitsPerNodeType * nbIds); LongBigList nodeTypesMap = nodeTypesBitVector.asLongBigList(nbBitsPerNodeType); for (long iNode = 0; iNode < nbIds && swhPIDIterator.hasNext(); iNode++) { String strSwhPID = swhPIDIterator.next().toString(); SwhPID swhPID = new SwhPID(strSwhPID); byte[] swhPIDBin = swhPID.toBytes(); long mphId = mphMap.getLong(strSwhPID); long nodeId = LongBigArrays.get(bfsMap, mphId); pidToNodeMap.write(swhPIDBin, 0, swhPIDBin.length); pidToNodeMap.writeLong(nodeId); ObjectBigArrays.set(nodeToSwhPID, nodeId, swhPIDBin); nodeTypesMap.set(nodeId, swhPID.getType().ordinal()); } BinIO.storeObject(nodeTypesMap, graphPath + Graph.NODE_TO_TYPE); for (long iNode = 0; iNode < nbIds; iNode++) { nodeToPidMap.write((byte[]) ObjectBigArrays.get(nodeToSwhPID, iNode)); } } } } diff --git a/swh/graph/backend.py b/swh/graph/backend.py index ca2b59d..9dffc5c 100644 --- a/swh/graph/backend.py +++ b/swh/graph/backend.py @@ -1,205 +1,205 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import contextlib import io import os import pathlib import struct import subprocess import sys import tempfile from py4j.java_gateway import JavaGateway -from swh.graph.pid import IntToPidMap, PidToIntMap +from swh.graph.pid import NodeToPidMap, PidToNodeMap from swh.model.identifiers import PID_TYPES BUF_SIZE = 64*1024 BIN_FMT = '>q' # 64 bit integer, big endian PATH_SEPARATOR_ID = -1 NODE2PID_EXT = 'node2pid.bin' PID2NODE_EXT = 'pid2node.bin' def find_graph_jar(): """find swh-graph.jar, containing the Java part of swh-graph look both in development directories and installed data (for in-production deployments who fecthed the JAR from pypi) """ swh_graph_root = pathlib.Path(__file__).parents[2] try_paths = [ swh_graph_root / 'java/server/target/', pathlib.Path(sys.prefix) / 'share/swh-graph/', ] for path in try_paths: glob = list(path.glob('swh-graph-*.jar')) if glob: return str(glob[0]) raise RuntimeError("swh-graph-*.jar not found. Have you run `make java`?") def _get_pipe_stderr(): # Get stderr if possible, or pipe to stdout if running with Jupyter. try: sys.stderr.fileno() except io.UnsupportedOperation: return subprocess.STDOUT else: return sys.stderr class Backend: def __init__(self, graph_path): self.gateway = None self.entry = None self.graph_path = graph_path def __enter__(self): # TODO: make all of that configurable with sane defaults java_opts = [ '-Xmx200G', '-server', '-XX:PretenureSizeThreshold=512M', '-XX:MaxNewSize=4G', '-XX:+UseLargePages', '-XX:+UseTransparentHugePages', '-XX:+UseNUMA', '-XX:+UseTLAB', '-XX:+ResizeTLAB', ] self.gateway = JavaGateway.launch_gateway( java_path=None, javaopts=java_opts, classpath=find_graph_jar(), die_on_exit=True, redirect_stdout=sys.stdout, redirect_stderr=_get_pipe_stderr(), ) self.entry = self.gateway.jvm.org.softwareheritage.graph.Entry() self.entry.load_graph(self.graph_path) - self.node2pid = IntToPidMap(self.graph_path + '.' + NODE2PID_EXT) - self.pid2node = PidToIntMap(self.graph_path + '.' + PID2NODE_EXT) + self.node2pid = NodeToPidMap(self.graph_path + '.' + NODE2PID_EXT) + self.pid2node = PidToNodeMap(self.graph_path + '.' + PID2NODE_EXT) self.stream_proxy = JavaStreamProxy(self.entry) return self def __exit__(self, exc_type, exc_value, tb): self.gateway.shutdown() def stats(self): return self.entry.stats() def count(self, ttype, direction, edges_fmt, src): method = getattr(self.entry, 'count_' + ttype) return method(direction, edges_fmt, src) async def simple_traversal(self, ttype, direction, edges_fmt, src): assert ttype in ('leaves', 'neighbors', 'visit_nodes') method = getattr(self.stream_proxy, ttype) async for node_id in method(direction, edges_fmt, src): yield node_id async def walk(self, direction, edges_fmt, algo, src, dst): if dst in PID_TYPES: it = self.stream_proxy.walk_type(direction, edges_fmt, algo, src, dst) else: it = self.stream_proxy.walk(direction, edges_fmt, algo, src, dst) async for node_id in it: yield node_id async def visit_paths(self, direction, edges_fmt, src): path = [] async for node in self.stream_proxy.visit_paths( direction, edges_fmt, src): if node == PATH_SEPARATOR_ID: yield path path = [] else: path.append(node) class JavaStreamProxy: """A proxy class for the org.softwareheritage.graph.Entry Java class that takes care of the setup and teardown of the named-pipe FIFO communication between Python and Java. Initialize JavaStreamProxy using: proxy = JavaStreamProxy(swh_entry_class_instance) Then you can call an Entry method and iterate on the FIFO results like this: async for value in proxy.java_method(arg1, arg2): print(value) """ def __init__(self, entry): self.entry = entry async def read_node_ids(self, fname): loop = asyncio.get_event_loop() open_thread = loop.run_in_executor(None, open, fname, 'rb') # Since the open() call on the FIFO is blocking until it is also opened # on the Java side, we await it with a timeout in case there is an # exception that prevents the write-side open(). with (await asyncio.wait_for(open_thread, timeout=2)) as f: while True: data = await loop.run_in_executor(None, f.read, BUF_SIZE) if not data: break for data in struct.iter_unpack(BIN_FMT, data): yield data[0] class _HandlerWrapper: def __init__(self, handler): self._handler = handler def __getattr__(self, name): func = getattr(self._handler, name) async def java_call(*args, **kwargs): loop = asyncio.get_event_loop() await loop.run_in_executor(None, lambda: func(*args, **kwargs)) def java_task(*args, **kwargs): return asyncio.create_task(java_call(*args, **kwargs)) return java_task @contextlib.contextmanager def get_handler(self): with tempfile.TemporaryDirectory(prefix='swh-graph-') as tmpdirname: cli_fifo = os.path.join(tmpdirname, 'swh-graph.fifo') os.mkfifo(cli_fifo) reader = self.read_node_ids(cli_fifo) query_handler = self.entry.get_handler(cli_fifo) handler = self._HandlerWrapper(query_handler) yield (handler, reader) def __getattr__(self, name): async def java_call_iterator(*args, **kwargs): with self.get_handler() as (handler, reader): java_task = getattr(handler, name)(*args, **kwargs) try: async for value in reader: yield value except asyncio.TimeoutError: # If the read-side open() timeouts, an exception on the # Java side probably happened that prevented the # write-side open(). We propagate this exception here if # that is the case. task_exc = java_task.exception() if task_exc: raise task_exc raise await java_task return java_call_iterator diff --git a/swh/graph/cli.py b/swh/graph/cli.py index db35a6d..ad99aae 100644 --- a/swh/graph/cli.py +++ b/swh/graph/cli.py @@ -1,230 +1,230 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import aiohttp import click import sys from pathlib import Path from typing import Any, Dict, Tuple from swh.core import config from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup from swh.graph import client, webgraph -from swh.graph.pid import PidToIntMap, IntToPidMap +from swh.graph.pid import PidToNodeMap, NodeToPidMap from swh.graph.server.app import make_app from swh.graph.backend import Backend class PathlibPath(click.Path): """A Click path argument that returns a pathlib Path, not a string""" def convert(self, value, param, ctx): return Path(super().convert(value, param, ctx)) DEFAULT_CONFIG = { 'graph': ('dict', {}) } # type: Dict[str, Tuple[str, Any]] @click.group(name='graph', context_settings=CONTEXT_SETTINGS, cls=AliasedGroup) @click.option('--config-file', '-C', default=None, type=click.Path(exists=True, dir_okay=False,), help='YAML configuration file') @click.pass_context def cli(ctx, config_file): """Software Heritage graph tools.""" ctx.ensure_object(dict) conf = config.read(config_file, DEFAULT_CONFIG) if 'graph' not in conf: raise ValueError('no "graph" stanza found in configuration file %s' % config_file) ctx.obj['config'] = conf @cli.command('api-client') @click.option('--host', default='localhost', help='Graph server host') @click.option('--port', default='5009', help='Graph server port') @click.pass_context def api_client(ctx, host, port): """client for the graph REST service""" url = 'http://{}:{}'.format(host, port) app = client.RemoteGraphClient(url) # TODO: run web app print(app.stats()) @cli.group('map') @click.pass_context def map(ctx): """Manage swh-graph on-disk maps""" pass -def dump_pid2int(filename): - for (pid, int) in PidToIntMap(filename): +def dump_pid2node(filename): + for (pid, int) in PidToNodeMap(filename): print('{}\t{}'.format(pid, int)) -def dump_int2pid(filename): - for (int, pid) in IntToPidMap(filename): +def dump_node2pid(filename): + for (int, pid) in NodeToPidMap(filename): print('{}\t{}'.format(int, pid)) -def restore_pid2int(filename): +def restore_pid2node(filename): """read a textual PID->int map from stdin and write its binary version to filename """ with open(filename, 'wb') as dst: for line in sys.stdin: (str_pid, str_int) = line.split() - PidToIntMap.write_record(dst, str_pid, int(str_int)) + PidToNodeMap.write_record(dst, str_pid, int(str_int)) -def restore_int2pid(filename, length): +def restore_node2pid(filename, length): """read a textual int->PID map from stdin and write its binary version to filename """ - int2pid = IntToPidMap(filename, mode='wb', length=length) + node2pid = NodeToPidMap(filename, mode='wb', length=length) for line in sys.stdin: (str_int, str_pid) = line.split() - int2pid[int(str_int)] = str_pid - int2pid.close() + node2pid[int(str_int)] = str_pid + node2pid.close() @map.command('dump') @click.option('--type', '-t', 'map_type', required=True, - type=click.Choice(['pid2int', 'int2pid']), + type=click.Choice(['pid2node', 'node2pid']), help='type of map to dump') @click.argument('filename', required=True, type=click.Path(exists=True)) @click.pass_context def dump_map(ctx, map_type, filename): - """dump a binary PID<->int map to textual format""" - if map_type == 'pid2int': - dump_pid2int(filename) - elif map_type == 'int2pid': - dump_int2pid(filename) + """dump a binary PID<->node map to textual format""" + if map_type == 'pid2node': + dump_pid2node(filename) + elif map_type == 'node2pid': + dump_node2pid(filename) else: raise ValueError('invalid map type: ' + map_type) pass @map.command('restore') @click.option('--type', '-t', 'map_type', required=True, - type=click.Choice(['pid2int', 'int2pid']), + type=click.Choice(['pid2node', 'node2pid']), help='type of map to dump') @click.option('--length', '-l', type=int, help='''map size in number of logical records - (required for int2pid maps)''') + (required for node2pid maps)''') @click.argument('filename', required=True, type=click.Path()) @click.pass_context def restore_map(ctx, map_type, length, filename): - """restore a binary PID<->int map from textual format""" - if map_type == 'pid2int': - restore_pid2int(filename) - elif map_type == 'int2pid': + """restore a binary PID<->node map from textual format""" + if map_type == 'pid2node': + restore_pid2node(filename) + elif map_type == 'node2pid': if length is None: raise click.UsageError( 'map length is required when restoring {} maps'.format( map_type), ctx) - restore_int2pid(filename, length) + restore_node2pid(filename, length) else: raise ValueError('invalid map type: ' + map_type) @map.command('write') @click.option('--type', '-t', 'map_type', required=True, - type=click.Choice(['pid2int', 'int2pid']), + type=click.Choice(['pid2node', 'node2pid']), help='type of map to write') @click.argument('filename', required=True, type=click.Path()) @click.pass_context def write(ctx, map_type, filename): """write a map to disk sequentially - read from stdin a textual PID->int mapping (for pid2int, or a simple - sequence of PIDs for int2pid) and write it to disk in the requested binary + read from stdin a textual PID->node mapping (for pid2node, or a simple + sequence of PIDs for node2pid) and write it to disk in the requested binary map format note that no sorting is applied, so the input should already be sorted as - required by the chosen map type (by PID for pid2int, by int for int2pid) + required by the chosen map type (by PID for pid2node, by int for node2pid) """ with open(filename, 'wb') as f: - if map_type == 'pid2int': + if map_type == 'pid2node': for line in sys.stdin: (pid, int_str) = line.rstrip().split(maxsplit=1) - PidToIntMap.write_record(f, pid, int(int_str)) - elif map_type == 'int2pid': + PidToNodeMap.write_record(f, pid, int(int_str)) + elif map_type == 'node2pid': for line in sys.stdin: pid = line.rstrip() - IntToPidMap.write_record(f, pid) + NodeToPidMap.write_record(f, pid) else: raise ValueError('invalid map type: ' + map_type) @cli.command(name='rpc-serve') @click.option('--host', '-h', default='0.0.0.0', metavar='IP', show_default=True, help='host IP address to bind the server on') @click.option('--port', '-p', default=5009, type=click.INT, metavar='PORT', show_default=True, help='port to bind the server on') @click.option('--graph', '-g', required=True, metavar='GRAPH', help='compressed graph basename') @click.pass_context def serve(ctx, host, port, graph): """run the graph REST service""" backend = Backend(graph_path=graph) app = make_app(backend=backend) with backend: aiohttp.web.run_app(app, host=host, port=port) @cli.command() @click.option('--graph', '-g', required=True, metavar='GRAPH', type=PathlibPath(), help='input graph basename') @click.option('--outdir', '-o', 'out_dir', required=True, metavar='DIR', type=PathlibPath(), help='directory where to store compressed graph') @click.option('--steps', '-s', metavar='STEPS', type=webgraph.StepOption(), help='run only these compression steps (default: all steps)') @click.pass_context def compress(ctx, graph, out_dir, steps): """Compress a graph using WebGraph Input: a pair of files g.nodes.csv.gz, g.edges.csv.gz Output: a directory containing a WebGraph compressed graph Compression steps are: (1) mph, (2) bv, (3) bv_obl, (4) bfs, (5) permute, (6) permute_obl, (7) stats, (8) transpose, (9) transpose_obl, (10) maps, (11) clean_tmp. Compression steps can be selected by name or number using --steps, separating them with commas; step ranges (e.g., 3-9, 6-, etc.) are also supported. """ graph_name = graph.name in_dir = graph.parent try: conf = ctx.obj['config']['graph']['compress'] except KeyError: conf = {} # use defaults webgraph.compress(graph_name, in_dir, out_dir, steps, conf) def main(): return cli(auto_envvar_prefix='SWH_GRAPH') if __name__ == '__main__': main() diff --git a/swh/graph/pid.py b/swh/graph/pid.py index 280fba3..8a780fa 100644 --- a/swh/graph/pid.py +++ b/swh/graph/pid.py @@ -1,402 +1,402 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import mmap import os import struct from collections.abc import MutableMapping from enum import Enum from mmap import MAP_SHARED, MAP_PRIVATE from typing import BinaryIO, Iterator, Tuple from swh.model.identifiers import PersistentId, parse_persistent_identifier PID_BIN_FMT = 'BB20s' # 2 unsigned chars + 20 bytes INT_BIN_FMT = '>q' # big endian, 8-byte integer PID_BIN_SIZE = 22 # in bytes INT_BIN_SIZE = 8 # in bytes class PidType(Enum): """types of existing PIDs, used to serialize PID type as a (char) integer Note that the order does matter also for driving the binary search in PID-indexed maps. Integer values also matter, for compatibility with the Java layer. """ content = 0 directory = 1 origin = 2 release = 3 revision = 4 snapshot = 5 def str_to_bytes(pid_str: str) -> bytes: """Convert a PID to a byte sequence The binary format used to represent PIDs as 22-byte long byte sequences as follows: - 1 byte for the namespace version represented as a C `unsigned char` - 1 byte for the object type, as the int value of :class:`PidType` enums, represented as a C `unsigned char` - 20 bytes for the SHA1 digest as a byte sequence Args: pid: persistent identifier Returns: bytes: byte sequence representation of pid """ pid = parse_persistent_identifier(pid_str) return struct.pack(PID_BIN_FMT, pid.scheme_version, PidType[pid.object_type].value, bytes.fromhex(pid.object_id)) def bytes_to_str(bytes: bytes) -> str: """Inverse function of :func:`str_to_bytes` See :func:`str_to_bytes` for a description of the binary PID format. Args: bytes: byte sequence representation of pid Returns: pid: persistent identifier """ (version, type, bin_digest) = struct.unpack(PID_BIN_FMT, bytes) pid = PersistentId(object_type=PidType(type).name, object_id=bin_digest) return str(pid) class _OnDiskMap(): """mmap-ed on-disk sequence of fixed size records """ def __init__(self, record_size: int, fname: str, mode: str = 'rb', length: int = None): """open an existing on-disk map Args: record_size: size of each record in bytes fname: path to the on-disk map mode: file open mode, usually either 'rb' for read-only maps, 'wb' for creating new maps, or 'rb+' for updating existing ones (default: 'rb') length: map size in number of logical records; used to initialize writable maps at creation time. Must be given when mode is 'wb' and the map doesn't exist on disk; ignored otherwise """ os_modes = { 'rb': os.O_RDONLY, 'wb': os.O_RDWR | os.O_CREAT, 'rb+': os.O_RDWR } if mode not in os_modes: raise ValueError('invalid file open mode: ' + mode) new_map = (mode == 'wb') writable_map = mode in ['wb', 'rb+'] self.record_size = record_size self.fd = os.open(fname, os_modes[mode]) if new_map: if length is None: raise ValueError('missing length when creating new map') os.truncate(self.fd, length * self.record_size) self.size = os.path.getsize(fname) (self.length, remainder) = divmod(self.size, record_size) if remainder: raise ValueError( 'map size {} is not a multiple of the record size {}'.format( self.size, record_size)) self.mm = mmap.mmap( self.fd, self.size, flags=MAP_SHARED if writable_map else MAP_PRIVATE) def close(self) -> None: """close the map shuts down both the mmap and the underlying file descriptor """ if not self.mm.closed: self.mm.close() os.close(self.fd) def __len__(self) -> int: return self.length def __delitem__(self, pos: int) -> None: raise NotImplementedError('cannot delete records from fixed-size map') -class PidToIntMap(_OnDiskMap, MutableMapping): +class PidToNodeMap(_OnDiskMap, MutableMapping): """memory mapped map from PID (:ref:`persistent-identifiers`) to a continuous range 0..N of (8-byte long) integers - This is the converse mapping of :class:`IntToPidMap`. + This is the converse mapping of :class:`NodeToPidMap`. The on-disk serialization format is a sequence of fixed length (30 bytes) records with the following fields: - PID (22 bytes): binary PID representation as per :func:`str_to_bytes` - long (8 bytes): big endian long integer The records are sorted lexicographically by PID type and checksum, where type is the integer value of :class:`PidType`. PID lookup in the map is performed via binary search. Hence a huge map with, say, 11 B entries, will require ~30 disk seeks. Note that, due to fixed size + ordering, it is not possible to create these maps by random writing. Hence, __setitem__ can be used only to *update* the value associated to an existing key, rather than to add a missing item. To create an entire map from scratch, you should do so *sequentially*, using static method :meth:`write_record` (or, at your own risk, by hand via the mmap :attr:`mm`). """ # record binary format: PID + a big endian 8-byte big endian integer RECORD_BIN_FMT = '>' + PID_BIN_FMT + 'q' RECORD_SIZE = PID_BIN_SIZE + INT_BIN_SIZE def __init__(self, fname: str, mode: str = 'rb', length: int = None): """open an existing on-disk map Args: fname: path to the on-disk map mode: file open mode, usually either 'rb' for read-only maps, 'wb' for creating new maps, or 'rb+' for updating existing ones (default: 'rb') length: map size in number of logical records; used to initialize read-write maps at creation time. Must be given when mode is 'wb'; ignored otherwise """ super().__init__(self.RECORD_SIZE, fname, mode=mode, length=length) def _get_bin_record(self, pos: int) -> Tuple[bytes, bytes]: """seek and return the (binary) record at a given (logical) position see :func:`_get_record` for an equivalent function with additional deserialization Args: pos: 0-based record number Returns: a pair `(pid, int)`, where pid and int are bytes """ rec_pos = pos * self.RECORD_SIZE int_pos = rec_pos + PID_BIN_SIZE return (self.mm[rec_pos:int_pos], self.mm[int_pos:int_pos+INT_BIN_SIZE]) def _get_record(self, pos: int) -> Tuple[str, int]: """seek and return the record at a given (logical) position moral equivalent of :func:`_get_bin_record`, with additional deserialization to non-bytes types Args: pos: 0-based record number Returns: a pair `(pid, int)`, where pid is a string-based PID and int the corresponding integer identifier """ (pid_bytes, int_bytes) = self._get_bin_record(pos) return (bytes_to_str(pid_bytes), struct.unpack(INT_BIN_FMT, int_bytes)[0]) @classmethod def write_record(cls, f: BinaryIO, pid: str, int: int) -> None: """write a logical record to a file-like object Args: f: file-like object to write the record to pid: textual PID int: PID integer identifier """ f.write(str_to_bytes(pid)) f.write(struct.pack(INT_BIN_FMT, int)) def _bisect_pos(self, pid_str: str) -> int: """bisect the position of the given identifier. If the identifier is not found, the position of the pid immediately after is returned. Args: pid_str: the pid as a string Returns: the logical record of the bisected position in the map """ if not isinstance(pid_str, str): raise TypeError('PID must be a str, not {}'.format(type(pid_str))) try: target = str_to_bytes(pid_str) # desired PID as bytes except ValueError: raise ValueError('invalid PID: "{}"'.format(pid_str)) lo = 0 hi = self.length - 1 while lo < hi: mid = (lo + hi) // 2 (pid, _value) = self._get_bin_record(mid) if pid < target: lo = mid + 1 else: hi = mid return lo def _find(self, pid_str: str) -> Tuple[int, int]: """lookup the integer identifier of a pid and its position Args: pid_str: the pid as a string Returns: a pair `(pid, pos)` with pid integer identifier and its logical record position in the map """ pos = self._bisect_pos(pid_str) pid_found, value = self._get_record(pos) if pid_found == pid_str: return (value, pos) raise KeyError(pid_str) def __getitem__(self, pid_str: str) -> int: """lookup the integer identifier of a PID Args: pid: the PID as a string Returns: the integer identifier of pid """ return self._find(pid_str)[0] # return element, ignore position def __setitem__(self, pid_str: str, int: str) -> None: (_pid, pos) = self._find(pid_str) # might raise KeyError and that's OK rec_pos = pos * self.RECORD_SIZE int_pos = rec_pos + PID_BIN_SIZE self.mm[rec_pos:int_pos] = str_to_bytes(pid_str) self.mm[int_pos:int_pos+INT_BIN_SIZE] = struct.pack(INT_BIN_FMT, int) def __iter__(self) -> Iterator[Tuple[str, int]]: for pos in range(self.length): yield self._get_record(pos) def iter_prefix(self, prefix: str): swh, n, t, sha = prefix.split(':') sha = sha.ljust(40, '0') start_pid = ':'.join([swh, n, t, sha]) start = self._bisect_pos(start_pid) for pos in range(start, self.length): pid, value = self._get_record(pos) if not pid.startswith(prefix): break yield pid, value def iter_type(self, pid_type: str) -> Iterator[Tuple[str, int]]: prefix = 'swh:1:{}:'.format(pid_type) yield from self.iter_prefix(prefix) -class IntToPidMap(_OnDiskMap, MutableMapping): +class NodeToPidMap(_OnDiskMap, MutableMapping): """memory mapped map from a continuous range of 0..N (8-byte long) integers to PIDs (:ref:`persistent-identifiers`) - This is the converse mapping of :class:`PidToIntMap`. + This is the converse mapping of :class:`PidToNodeMap`. The on-disk serialization format is a sequence of fixed length records (22 bytes), each being the binary representation of a PID as per :func:`str_to_bytes`. The records are sorted by long integer, so that integer lookup is possible via fixed-offset seek. """ RECORD_BIN_FMT = PID_BIN_FMT RECORD_SIZE = PID_BIN_SIZE def __init__(self, fname: str, mode: str = 'rb', length: int = None): """open an existing on-disk map Args: fname: path to the on-disk map mode: file open mode, usually either 'rb' for read-only maps, 'wb' for creating new maps, or 'rb+' for updating existing ones (default: 'rb') size: map size in number of logical records; used to initialize read-write maps at creation time. Must be given when mode is 'wb'; ignored otherwise length: passed to :class:`_OnDiskMap` """ super().__init__(self.RECORD_SIZE, fname, mode=mode, length=length) def _get_bin_record(self, pos: int) -> bytes: """seek and return the (binary) PID at a given (logical) position Args: pos: 0-based record number Returns: PID as a byte sequence """ rec_pos = pos * self.RECORD_SIZE return self.mm[rec_pos:rec_pos+self.RECORD_SIZE] @classmethod def write_record(cls, f: BinaryIO, pid: str) -> None: """write a PID to a file-like object Args: f: file-like object to write the record to pid: textual PID """ f.write(str_to_bytes(pid)) def __getitem__(self, pos: int) -> str: orig_pos = pos if pos < 0: pos = len(self) + pos if not (0 <= pos < len(self)): raise IndexError(orig_pos) return bytes_to_str(self._get_bin_record(pos)) def __setitem__(self, pos: int, pid: str) -> None: rec_pos = pos * self.RECORD_SIZE self.mm[rec_pos:rec_pos+self.RECORD_SIZE] = str_to_bytes(pid) def __iter__(self) -> Iterator[Tuple[int, str]]: for pos in range(self.length): yield (pos, self[pos]) diff --git a/swh/graph/tests/test_pid.py b/swh/graph/tests/test_pid.py index 1cd1afd..69cb927 100644 --- a/swh/graph/tests/test_pid.py +++ b/swh/graph/tests/test_pid.py @@ -1,201 +1,201 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import shutil import tempfile import unittest from itertools import islice from swh.graph.pid import str_to_bytes, bytes_to_str -from swh.graph.pid import PidToIntMap, IntToPidMap +from swh.graph.pid import PidToNodeMap, NodeToPidMap from swh.model.identifiers import PID_TYPES class TestPidSerialization(unittest.TestCase): pairs = [ ('swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2', bytes.fromhex('01' + '00' + '94a9ed024d3859793618152ea559a168bbcbb5e2')), ('swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505', bytes.fromhex('01' + '01' + 'd198bc9d7a6bcf6db04f476d29314f157507d505')), ('swh:1:ori:b63a575fe3faab7692c9f38fb09d4bb45651bb0f', bytes.fromhex('01' + '02' + 'b63a575fe3faab7692c9f38fb09d4bb45651bb0f')), ('swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f', bytes.fromhex('01' + '03' + '22ece559cc7cc2364edc5e5593d63ae8bd229f9f')), ('swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d', bytes.fromhex('01' + '04' + '309cf2674ee7a0749978cf8265ab91a60aea0f7d')), ('swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453', bytes.fromhex('01' + '05' + 'c7c108084bc0bf3d81436bf980b46e98bd338453')), ] def test_str_to_bytes(self): for (pid_str, pid_bytes) in self.pairs: self.assertEqual(str_to_bytes(pid_str), pid_bytes) def test_bytes_to_str(self): for (pid_str, pid_bytes) in self.pairs: self.assertEqual(bytes_to_str(pid_bytes), pid_str) def test_round_trip(self): for (pid_str, pid_bytes) in self.pairs: self.assertEqual(pid_str, bytes_to_str(str_to_bytes(pid_str))) self.assertEqual(pid_bytes, str_to_bytes(bytes_to_str(pid_bytes))) def gen_records(types=['cnt', 'dir', 'ori', 'rel', 'rev', 'snp'], length=10000): """generate sequential PID/int records, suitable for filling int<->pid maps for testing swh-graph on-disk binary databases Args: types (list): list of PID types to be generated, specified as the corresponding 3-letter component in PIDs length (int): number of PIDs to generate *per type* Yields: pairs (pid, int) where pid is a textual PID and int its sequential integer identifier """ pos = 0 for t in sorted(types): for i in range(0, length): seq = format(pos, 'x') # current position as hex string pid = 'swh:1:{}:{}{}'.format(t, '0' * (40 - len(seq)), seq) yield (pid, pos) pos += 1 # pairs PID/position in the sequence generated by :func:`gen_records` above MAP_PAIRS = [ ('swh:1:cnt:0000000000000000000000000000000000000000', 0), ('swh:1:cnt:000000000000000000000000000000000000002a', 42), ('swh:1:dir:0000000000000000000000000000000000002afc', 11004), ('swh:1:ori:00000000000000000000000000000000000056ce', 22222), ('swh:1:rel:0000000000000000000000000000000000008235', 33333), ('swh:1:rev:000000000000000000000000000000000000ad9c', 44444), ('swh:1:snp:000000000000000000000000000000000000ea5f', 59999), ] -class TestPidToIntMap(unittest.TestCase): +class TestPidToNodeMap(unittest.TestCase): @classmethod def setUpClass(cls): """create reasonably sized (~2 MB) PID->int map to test on-disk DB """ cls.tmpdir = tempfile.mkdtemp(prefix='swh.graph.test.') cls.fname = os.path.join(cls.tmpdir, 'pid2int.bin') with open(cls.fname, 'wb') as f: for (pid, i) in gen_records(length=10000): - PidToIntMap.write_record(f, pid, i) + PidToNodeMap.write_record(f, pid, i) @classmethod def tearDownClass(cls): shutil.rmtree(cls.tmpdir) def setUp(self): - self.map = PidToIntMap(self.fname) + self.map = PidToNodeMap(self.fname) def tearDown(self): self.map.close() def test_lookup(self): for (pid, pos) in MAP_PAIRS: self.assertEqual(self.map[pid], pos) def test_missing(self): with self.assertRaises(KeyError): self.map['swh:1:ori:0101010100000000000000000000000000000000'], with self.assertRaises(KeyError): self.map['swh:1:cnt:0101010100000000000000000000000000000000'], def test_type_error(self): with self.assertRaises(TypeError): self.map[42] with self.assertRaises(TypeError): self.map[1.2] def test_update(self): fname2 = self.fname + '.update' shutil.copy(self.fname, fname2) # fresh map copy - map2 = PidToIntMap(fname2, mode='rb+') + map2 = PidToNodeMap(fname2, mode='rb+') for (pid, int) in islice(map2, 11): # update the first N items new_int = int + 42 map2[pid] = new_int self.assertEqual(map2[pid], new_int) # check updated value os.unlink(fname2) # tmpdir will be cleaned even if we don't reach this def test_iter_type(self): for t in PID_TYPES: first_20 = list(islice(self.map.iter_type(t), 20)) k = first_20[0][1] expected = [('swh:1:{}:{:040x}'.format(t, i), i) for i in range(k, k + 20)] assert first_20 == expected def test_iter_prefix(self): for t in PID_TYPES: prefix = self.map.iter_prefix('swh:1:{}:00'.format(t)) first_20 = list(islice(prefix, 20)) k = first_20[0][1] expected = [('swh:1:{}:{:040x}'.format(t, i), i) for i in range(k, k + 20)] assert first_20 == expected -class TestIntToPidMap(unittest.TestCase): +class TestNodeToPidMap(unittest.TestCase): @classmethod def setUpClass(cls): """create reasonably sized (~1 MB) int->PID map to test on-disk DB """ cls.tmpdir = tempfile.mkdtemp(prefix='swh.graph.test.') cls.fname = os.path.join(cls.tmpdir, 'int2pid.bin') with open(cls.fname, 'wb') as f: for (pid, _i) in gen_records(length=10000): - IntToPidMap.write_record(f, pid) + NodeToPidMap.write_record(f, pid) @classmethod def tearDownClass(cls): shutil.rmtree(cls.tmpdir) def setUp(self): - self.map = IntToPidMap(self.fname) + self.map = NodeToPidMap(self.fname) def tearDown(self): self.map.close() def test_lookup(self): for (pid, pos) in MAP_PAIRS: self.assertEqual(self.map[pos], pid) def test_out_of_bounds(self): with self.assertRaises(IndexError): self.map[1000000] with self.assertRaises(IndexError): self.map[-1000000] def test_update(self): fname2 = self.fname + '.update' shutil.copy(self.fname, fname2) # fresh map copy - map2 = IntToPidMap(fname2, mode='rb+') + map2 = NodeToPidMap(fname2, mode='rb+') for (int, pid) in islice(map2, 11): # update the first N items new_pid = pid.replace(':0', ':f') # mangle first hex digit map2[int] = new_pid self.assertEqual(map2[int], new_pid) # check updated value os.unlink(fname2) # tmpdir will be cleaned even if we don't reach this