Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java b/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java
index 35ca81f..4d47a12 100644
--- a/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java
+++ b/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java
@@ -1,124 +1,124 @@
package org.softwareheritage.graph.backend;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.zip.GZIPInputStream;
import it.unimi.dsi.bits.LongArrayBitVector;
import it.unimi.dsi.fastutil.Size64;
import it.unimi.dsi.fastutil.io.BinIO;
import it.unimi.dsi.fastutil.longs.LongBigArrays;
import it.unimi.dsi.fastutil.longs.LongBigList;
import it.unimi.dsi.fastutil.objects.Object2LongFunction;
import it.unimi.dsi.fastutil.objects.ObjectBigArrays;
import it.unimi.dsi.io.FastBufferedReader;
import it.unimi.dsi.io.LineIterator;
import org.softwareheritage.graph.Graph;
import org.softwareheritage.graph.Node;
import org.softwareheritage.graph.SwhPID;
import org.softwareheritage.graph.backend.NodeTypesMap;
/**
* Pre-processing steps (such as dumping mapping files on disk) before running the graph service.
*
* @author The Software Heritage developers
*/
public class Setup {
/**
* Main entrypoint.
*
* @param args command line arguments
*/
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err.println("Usage: NODES_CSV_GZ COMPRESSED_GRAPH_BASE_NAME");
System.exit(1);
}
String nodesPath = args[0];
String graphPath = args[1];
System.out.println("Pre-computing node id maps...");
long startTime = System.nanoTime();
precomputeNodeIdMap(nodesPath, graphPath);
long endTime = System.nanoTime();
double duration = (endTime - startTime) / 1_000_000_000;
System.out.println("Done in: " + duration + " seconds");
}
/**
* Computes and dumps on disk mapping files.
*
* @param nodesPath path of the compressed csv nodes file
* @param graphPath path of the compressed graph
*/
// Suppress warning for Object2LongFunction cast
@SuppressWarnings("unchecked")
static void precomputeNodeIdMap(String nodesPath, String graphPath) throws IOException {
// First internal mapping: SWH PID (string) -> WebGraph MPH (long)
Object2LongFunction<String> mphMap = null;
try {
mphMap = (Object2LongFunction<String>) BinIO.loadObject(graphPath + ".mph");
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("The .mph file contains unknown class object: " + e);
}
long nbIds = (mphMap instanceof Size64) ? ((Size64) mphMap).size64() : mphMap.size();
// Second internal mapping: WebGraph MPH (long) -> BFS ordering (long)
long[][] bfsMap = LongBigArrays.newBigArray(nbIds);
long loaded = BinIO.loadLongs(graphPath + ".order", bfsMap);
if (loaded != nbIds) {
throw new IllegalArgumentException("Graph contains " + nbIds + " nodes, but read " + loaded);
}
// Dump complete mapping for all nodes: SWH PID (string) <=> WebGraph node id (long)
InputStream nodesStream = new GZIPInputStream(new FileInputStream(nodesPath));
FastBufferedReader buffer = new FastBufferedReader(new InputStreamReader(nodesStream, "UTF-8"));
LineIterator swhPIDIterator = new LineIterator(buffer);
- // for the binary format of pidToNodeMap, see Python module swh.graph.pid:PidToIntMap
- // for the binary format of nodeToPidMap, see Python module swh.graph.pid:IntToPidMap
+ // for the binary format of pidToNodeMap, see Python module swh.graph.pid:PidToNodeMap
+ // for the binary format of nodeToPidMap, see Python module swh.graph.pid:NodeToPidMap
try (DataOutputStream pidToNodeMap = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(graphPath + Graph.PID_TO_NODE)));
BufferedOutputStream nodeToPidMap = new BufferedOutputStream(new FileOutputStream(graphPath + Graph.NODE_TO_PID))) {
// nodeToPidMap needs to write SWH PID in order of node id, so use a temporary array
Object[][] nodeToSwhPID = ObjectBigArrays.newBigArray(nbIds);
// To effectively run edge restriction during graph traversals, we store node id (long) -> SWH
// type map. This is represented as a bitmap using minimum number of bits per Node.Type.
final int log2NbTypes = (int) Math.ceil(Math.log(Node.Type.values().length) / Math.log(2));
final int nbBitsPerNodeType = log2NbTypes;
LongArrayBitVector nodeTypesBitVector =
LongArrayBitVector.ofLength(nbBitsPerNodeType * nbIds);
LongBigList nodeTypesMap = nodeTypesBitVector.asLongBigList(nbBitsPerNodeType);
for (long iNode = 0; iNode < nbIds && swhPIDIterator.hasNext(); iNode++) {
String strSwhPID = swhPIDIterator.next().toString();
SwhPID swhPID = new SwhPID(strSwhPID);
byte[] swhPIDBin = swhPID.toBytes();
long mphId = mphMap.getLong(strSwhPID);
long nodeId = LongBigArrays.get(bfsMap, mphId);
pidToNodeMap.write(swhPIDBin, 0, swhPIDBin.length);
pidToNodeMap.writeLong(nodeId);
ObjectBigArrays.set(nodeToSwhPID, nodeId, swhPIDBin);
nodeTypesMap.set(nodeId, swhPID.getType().ordinal());
}
BinIO.storeObject(nodeTypesMap, graphPath + Graph.NODE_TO_TYPE);
for (long iNode = 0; iNode < nbIds; iNode++) {
nodeToPidMap.write((byte[]) ObjectBigArrays.get(nodeToSwhPID, iNode));
}
}
}
}
diff --git a/swh/graph/backend.py b/swh/graph/backend.py
index ca2b59d..9dffc5c 100644
--- a/swh/graph/backend.py
+++ b/swh/graph/backend.py
@@ -1,205 +1,205 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
import contextlib
import io
import os
import pathlib
import struct
import subprocess
import sys
import tempfile
from py4j.java_gateway import JavaGateway
-from swh.graph.pid import IntToPidMap, PidToIntMap
+from swh.graph.pid import NodeToPidMap, PidToNodeMap
from swh.model.identifiers import PID_TYPES
# number of bytes requested per read on the FIFO filled by the Java side
BUF_SIZE = 64*1024
BIN_FMT = '>q'  # 64 bit integer, big endian
# node id value that separates two consecutive paths in the visit_paths stream
PATH_SEPARATOR_ID = -1
# extensions (appended to the graph basename after a '.') of the on-disk maps
NODE2PID_EXT = 'node2pid.bin'
PID2NODE_EXT = 'pid2node.bin'
def find_graph_jar():
    """Locate swh-graph.jar, the archive holding the Java part of swh-graph.

    Two places are searched, in order: the development tree
    (``java/server/target/``) and the installed data directory
    (``<sys.prefix>/share/swh-graph/``), used by production deployments that
    fetched the JAR from PyPI.

    Returns:
        str: path of the first ``swh-graph-*.jar`` found

    Raises:
        RuntimeError: if no such JAR exists in any searched directory
    """
    swh_graph_root = pathlib.Path(__file__).parents[2]
    candidate_dirs = (
        swh_graph_root / 'java/server/target/',
        pathlib.Path(sys.prefix) / 'share/swh-graph/',
    )
    for directory in candidate_dirs:
        jar = next(directory.glob('swh-graph-*.jar'), None)
        if jar is not None:
            return str(jar)
    raise RuntimeError("swh-graph-*.jar not found. Have you run `make java`?")
def _get_pipe_stderr():
    """Pick the target for the Java subprocess stderr.

    Returns ``sys.stderr`` when it is backed by a real file descriptor, and
    ``subprocess.STDOUT`` (merge stderr into stdout) when ``fileno()`` is
    unsupported, as happens e.g. when running under Jupyter.
    """
    target = sys.stderr
    try:
        target.fileno()
    except io.UnsupportedOperation:
        target = subprocess.STDOUT
    return target
class Backend:
    """Python handle on the Java graph backend.

    Meant to be used as a context manager: entering launches a JVM through
    py4j, loads the compressed graph found at ``graph_path`` and opens the
    on-disk node<->PID maps; exiting closes the maps and shuts the JVM down.
    """

    def __init__(self, graph_path):
        self.gateway = None
        self.entry = None
        self.graph_path = graph_path
        # on-disk maps, opened by __enter__ and closed by __exit__
        self.node2pid = None
        self.pid2node = None

    def __enter__(self):
        # TODO: make all of that configurable with sane defaults
        java_opts = [
            '-Xmx200G',
            '-server',
            '-XX:PretenureSizeThreshold=512M',
            '-XX:MaxNewSize=4G',
            '-XX:+UseLargePages',
            '-XX:+UseTransparentHugePages',
            '-XX:+UseNUMA',
            '-XX:+UseTLAB',
            '-XX:+ResizeTLAB',
        ]
        self.gateway = JavaGateway.launch_gateway(
            java_path=None,
            javaopts=java_opts,
            classpath=find_graph_jar(),
            die_on_exit=True,
            redirect_stdout=sys.stdout,
            redirect_stderr=_get_pipe_stderr(),
        )
        self.entry = self.gateway.jvm.org.softwareheritage.graph.Entry()
        self.entry.load_graph(self.graph_path)
        self.node2pid = NodeToPidMap(self.graph_path + '.' + NODE2PID_EXT)
        self.pid2node = PidToNodeMap(self.graph_path + '.' + PID2NODE_EXT)
        self.stream_proxy = JavaStreamProxy(self.entry)
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # Release the mmap-ed maps (and their file descriptors); the JVM is
        # shut down even if closing a map fails.
        try:
            for on_disk_map in (self.node2pid, self.pid2node):
                if on_disk_map is not None:
                    on_disk_map.close()
            self.node2pid = None
            self.pid2node = None
        finally:
            if self.gateway is not None:
                self.gateway.shutdown()

    def stats(self):
        return self.entry.stats()

    def count(self, ttype, direction, edges_fmt, src):
        method = getattr(self.entry, 'count_' + ttype)
        return method(direction, edges_fmt, src)

    async def simple_traversal(self, ttype, direction, edges_fmt, src):
        assert ttype in ('leaves', 'neighbors', 'visit_nodes')
        method = getattr(self.stream_proxy, ttype)
        async for node_id in method(direction, edges_fmt, src):
            yield node_id

    async def walk(self, direction, edges_fmt, algo, src, dst):
        # dst is either a PID type (walk to the first node of that type) or a
        # specific destination node
        if dst in PID_TYPES:
            it = self.stream_proxy.walk_type(direction, edges_fmt, algo,
                                             src, dst)
        else:
            it = self.stream_proxy.walk(direction, edges_fmt, algo,
                                        src, dst)
        async for node_id in it:
            yield node_id

    async def visit_paths(self, direction, edges_fmt, src):
        # the Java side streams all paths as one flat sequence of node ids,
        # with PATH_SEPARATOR_ID marking the end of each path
        path = []
        async for node in self.stream_proxy.visit_paths(
                direction, edges_fmt, src):
            if node == PATH_SEPARATOR_ID:
                yield path
                path = []
            else:
                path.append(node)
class JavaStreamProxy:
    """A proxy class for the org.softwareheritage.graph.Entry Java class that
    takes care of the setup and teardown of the named-pipe FIFO communication
    between Python and Java.

    Initialize JavaStreamProxy using:

        proxy = JavaStreamProxy(swh_entry_class_instance)

    Then you can call an Entry method and iterate on the FIFO results like
    this:

        async for value in proxy.java_method(arg1, arg2):
            print(value)
    """

    def __init__(self, entry):
        self.entry = entry

    async def read_node_ids(self, fname):
        """Asynchronously yield the integers (packed as BIN_FMT) read from the
        file or FIFO at ``fname`` until end of file.

        Raises:
            asyncio.TimeoutError: if opening ``fname`` takes more than 2 s
        """
        loop = asyncio.get_running_loop()
        open_thread = loop.run_in_executor(None, open, fname, 'rb')

        # Since the open() call on the FIFO is blocking until it is also opened
        # on the Java side, we await it with a timeout in case there is an
        # exception that prevents the write-side open().
        with (await asyncio.wait_for(open_thread, timeout=2)) as f:
            while True:
                data = await loop.run_in_executor(None, f.read, BUF_SIZE)
                if not data:
                    break
                for (node_id,) in struct.iter_unpack(BIN_FMT, data):
                    yield node_id

    class _HandlerWrapper:
        """Wrap a Java query handler so that each method call runs in the
        default executor and returns an asyncio Task."""

        def __init__(self, handler):
            self._handler = handler

        def __getattr__(self, name):
            func = getattr(self._handler, name)

            async def java_call(*args, **kwargs):
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(None, lambda: func(*args, **kwargs))

            def java_task(*args, **kwargs):
                return asyncio.create_task(java_call(*args, **kwargs))

            return java_task

    @contextlib.contextmanager
    def get_handler(self):
        """Create a FIFO in a temporary directory and yield a pair
        ``(handler, reader)``: a wrapped Java handler writing to that FIFO and
        an async iterator over the node ids read from it."""
        with tempfile.TemporaryDirectory(prefix='swh-graph-') as tmpdirname:
            cli_fifo = os.path.join(tmpdirname, 'swh-graph.fifo')
            os.mkfifo(cli_fifo)
            reader = self.read_node_ids(cli_fifo)
            query_handler = self.entry.get_handler(cli_fifo)
            handler = self._HandlerWrapper(query_handler)
            yield (handler, reader)

    def __getattr__(self, name):
        async def java_call_iterator(*args, **kwargs):
            with self.get_handler() as (handler, reader):
                java_task = getattr(handler, name)(*args, **kwargs)
                try:
                    async for value in reader:
                        yield value
                except asyncio.TimeoutError:
                    # If the read-side open() timeouts, an exception on the
                    # Java side probably happened that prevented the
                    # write-side open(). We propagate this exception here if
                    # that is the case. The task may still be running (e.g. a
                    # slow Java side): Task.exception() would then raise
                    # InvalidStateError and hide the timeout, so only inspect
                    # finished, non-cancelled tasks.
                    if java_task.done() and not java_task.cancelled():
                        task_exc = java_task.exception()
                        if task_exc:
                            raise task_exc
                    raise
                await java_task
        return java_call_iterator
diff --git a/swh/graph/cli.py b/swh/graph/cli.py
index db35a6d..ad99aae 100644
--- a/swh/graph/cli.py
+++ b/swh/graph/cli.py
@@ -1,230 +1,230 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import aiohttp
import click
import sys
from pathlib import Path
from typing import Any, Dict, Tuple
from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup
from swh.graph import client, webgraph
-from swh.graph.pid import PidToIntMap, IntToPidMap
+from swh.graph.pid import PidToNodeMap, NodeToPidMap
from swh.graph.server.app import make_app
from swh.graph.backend import Backend
class PathlibPath(click.Path):
    """Click parameter type that validates like :class:`click.Path` but hands
    the command a :class:`pathlib.Path` object instead of a plain string."""

    def convert(self, value, param, ctx):
        converted = super().convert(value, param, ctx)
        return Path(converted)
# Defaults handed to swh.core.config.read by the `cli` group; entries look like
# (type name, default value) pairs -- presumably the format config.read
# expects, verify against swh.core. Only an empty 'graph' section is provided.
DEFAULT_CONFIG = {
    'graph': ('dict', {})
}  # type: Dict[str, Tuple[str, Any]]
@click.group(name='graph', context_settings=CONTEXT_SETTINGS,
             cls=AliasedGroup)
@click.option('--config-file', '-C', default=None,
              type=click.Path(exists=True, dir_okay=False,),
              help='YAML configuration file')
@click.pass_context
def cli(ctx, config_file):
    """Software Heritage graph tools."""
    ctx.ensure_object(dict)

    # load the configuration (DEFAULT_CONFIG supplies the defaults)
    conf = config.read(config_file, DEFAULT_CONFIG)
    has_graph_section = 'graph' in conf
    if not has_graph_section:
        raise ValueError('no "graph" stanza found in configuration file %s'
                         % config_file)

    # sub-commands look their settings up in ctx.obj['config']
    ctx.obj['config'] = conf
@cli.command('api-client')
@click.option('--host', default='localhost', help='Graph server host')
@click.option('--port', default='5009', help='Graph server port')
@click.pass_context
def api_client(ctx, host, port):
    """client for the graph REST service"""
    base_url = 'http://%s:%s' % (host, port)
    graph_client = client.RemoteGraphClient(base_url)

    # TODO: run web app
    print(graph_client.stats())
@cli.group('map')
@click.pass_context
def map(ctx):
    """Manage swh-graph on-disk maps"""
    # Pure command group: the dump/restore/write sub-commands do the work.
def dump_pid2node(filename):
    """Print each (PID, node id) record of the binary PID->node map stored in
    ``filename`` to stdout, one tab-separated pair per line.

    The map is closed afterwards, even if printing fails.
    """
    pid2node = PidToNodeMap(filename)
    try:
        for (pid, node_id) in pid2node:
            print('{}\t{}'.format(pid, node_id))
    finally:
        pid2node.close()
def dump_node2pid(filename):
    """Print each (node id, PID) record of the binary node->PID map stored in
    ``filename`` to stdout, one tab-separated pair per line.

    The map is closed afterwards, even if printing fails.
    """
    node2pid = NodeToPidMap(filename)
    try:
        for (node_id, pid) in node2pid:
            print('{}\t{}'.format(node_id, pid))
    finally:
        node2pid.close()
def restore_pid2node(filename):
    """Rebuild a binary PID->node map from its textual dump.

    Reads whitespace-separated ``PID node_id`` lines from stdin and appends
    the corresponding binary records, in input order, to ``filename``
    (which is truncated first).
    """
    with open(filename, 'wb') as out:
        for line in sys.stdin:
            pid, node_str = line.split()
            PidToNodeMap.write_record(out, pid, int(node_str))
def restore_node2pid(filename, length):
    """read a textual node->PID map from stdin and write its binary version to
    filename

    Args:
        filename: path of the binary map to create
        length: number of records of the map (node ids must be below it)

    The map is closed even if a malformed input line aborts the restore.
    """
    node2pid = NodeToPidMap(filename, mode='wb', length=length)
    try:
        for line in sys.stdin:
            (str_int, str_pid) = line.split()
            node2pid[int(str_int)] = str_pid
    finally:
        node2pid.close()
@map.command('dump')
@click.option('--type', '-t', 'map_type', required=True,
              type=click.Choice(['pid2node', 'node2pid']),
              help='type of map to dump')
@click.argument('filename', required=True, type=click.Path(exists=True))
@click.pass_context
def dump_map(ctx, map_type, filename):
    """dump a binary PID<->node map to textual format"""
    # dispatch table: map type -> dumping function
    dumpers = {
        'pid2node': dump_pid2node,
        'node2pid': dump_node2pid,
    }
    dump = dumpers.get(map_type)
    if dump is None:
        raise ValueError('invalid map type: ' + map_type)
    dump(filename)
@map.command('restore')
@click.option('--type', '-t', 'map_type', required=True,
              type=click.Choice(['pid2node', 'node2pid']),
              help='type of map to restore')
@click.option('--length', '-l', type=int,
              help='''map size in number of logical records
              (required for node2pid maps)''')
@click.argument('filename', required=True, type=click.Path())
@click.pass_context
def restore_map(ctx, map_type, length, filename):
    """restore a binary PID<->node map from textual format"""
    if map_type == 'pid2node':
        restore_pid2node(filename)
    elif map_type == 'node2pid':
        # node2pid maps are fixed-size and must be allocated up front
        if length is None:
            raise click.UsageError(
                'map length is required when restoring {} maps'.format(
                    map_type), ctx)
        restore_node2pid(filename, length)
    else:
        raise ValueError('invalid map type: ' + map_type)
@map.command('write')
@click.option('--type', '-t', 'map_type', required=True,
              type=click.Choice(['pid2node', 'node2pid']),
              help='type of map to write')
@click.argument('filename', required=True, type=click.Path())
@click.pass_context
def write(ctx, map_type, filename):
    """write a map to disk sequentially

    read from stdin a textual PID->node mapping (for pid2node, or a simple
    sequence of PIDs for node2pid) and write it to disk in the requested binary
    map format

    note that no sorting is applied, so the input should already be sorted as
    required by the chosen map type (by PID for pid2node, by int for node2pid)
    """
    # the output file is created before the map type is validated, as before
    with open(filename, 'wb') as out:
        if map_type == 'pid2node':
            records = (line.rstrip().split(maxsplit=1) for line in sys.stdin)
            for pid, node_str in records:
                PidToNodeMap.write_record(out, pid, int(node_str))
        elif map_type == 'node2pid':
            for pid in (line.rstrip() for line in sys.stdin):
                NodeToPidMap.write_record(out, pid)
        else:
            raise ValueError('invalid map type: ' + map_type)
@cli.command(name='rpc-serve')
@click.option('--host', '-h', default='0.0.0.0',
              metavar='IP', show_default=True,
              help='host IP address to bind the server on')
@click.option('--port', '-p', default=5009, type=click.INT,
              metavar='PORT', show_default=True,
              help='port to bind the server on')
@click.option('--graph', '-g', required=True, metavar='GRAPH',
              help='compressed graph basename')
@click.pass_context
def serve(ctx, host, port, graph):
    """run the graph REST service"""
    graph_backend = Backend(graph_path=graph)
    web_app = make_app(backend=graph_backend)
    # the backend (JVM + on-disk maps) is kept open while the server runs
    with graph_backend:
        aiohttp.web.run_app(web_app, host=host, port=port)
@cli.command()
@click.option('--graph', '-g', required=True, metavar='GRAPH',
              type=PathlibPath(),
              help='input graph basename')
@click.option('--outdir', '-o', 'out_dir', required=True, metavar='DIR',
              type=PathlibPath(),
              help='directory where to store compressed graph')
@click.option('--steps', '-s', metavar='STEPS', type=webgraph.StepOption(),
              help='run only these compression steps (default: all steps)')
@click.pass_context
def compress(ctx, graph, out_dir, steps):
    """Compress a graph using WebGraph

    Input: a pair of files g.nodes.csv.gz, g.edges.csv.gz

    Output: a directory containing a WebGraph compressed graph

    Compression steps are: (1) mph, (2) bv, (3) bv_obl, (4) bfs, (5) permute,
    (6) permute_obl, (7) stats, (8) transpose, (9) transpose_obl, (10) maps,
    (11) clean_tmp. Compression steps can be selected by name or number using
    --steps, separating them with commas; step ranges (e.g., 3-9, 6-, etc.) are
    also supported.
    """
    try:
        compress_conf = ctx.obj['config']['graph']['compress']
    except KeyError:
        compress_conf = {}  # no compression settings configured: use defaults
    webgraph.compress(graph.name, graph.parent, out_dir, steps, compress_conf)
def main():
    """Entry point: run the `cli` group with SWH_GRAPH as the click
    auto_envvar_prefix."""
    envvar_prefix = 'SWH_GRAPH'
    return cli(auto_envvar_prefix=envvar_prefix)
# allow running this module directly as a script
if __name__ == '__main__':
    main()
diff --git a/swh/graph/pid.py b/swh/graph/pid.py
index 280fba3..8a780fa 100644
--- a/swh/graph/pid.py
+++ b/swh/graph/pid.py
@@ -1,402 +1,402 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import mmap
import os
import struct
from collections.abc import MutableMapping
from enum import Enum
from mmap import MAP_SHARED, MAP_PRIVATE
from typing import BinaryIO, Iterator, Tuple
from swh.model.identifiers import PersistentId, parse_persistent_identifier
PID_BIN_FMT = 'BB20s'  # 2 unsigned chars + 20 bytes
INT_BIN_FMT = '>q'  # big endian, 8-byte integer
# Sizes are derived from the struct formats so the two can never drift apart.
PID_BIN_SIZE = struct.calcsize(PID_BIN_FMT)  # 22 bytes
INT_BIN_SIZE = struct.calcsize(INT_BIN_FMT)  # 8 bytes
class PidType(Enum):
    """types of existing PIDs, used to serialize PID type as a (char) integer

    Note that the order does matter also for driving the binary search in
    PID-indexed maps. Integer values also matter, for compatibility with the
    Java layer.

    Member names are looked up with the object_type of parsed PIDs (see
    str_to_bytes), and values are the type byte stored in binary PIDs.
    """
    content = 0
    directory = 1
    origin = 2
    release = 3
    revision = 4
    snapshot = 5
def str_to_bytes(pid_str: str) -> bytes:
    """Serialize a textual PID into its 22-byte binary form.

    Layout of the result:

    - 1 byte: namespace (scheme) version, as a C ``unsigned char``
    - 1 byte: object type, i.e. the value of the matching :class:`PidType`
      member, as a C ``unsigned char``
    - 20 bytes: raw SHA1 digest

    Args:
        pid_str: textual persistent identifier, e.g. ``swh:1:cnt:<sha1>``

    Returns:
        bytes: binary representation of the PID
    """
    parsed = parse_persistent_identifier(pid_str)
    type_code = PidType[parsed.object_type].value
    digest = bytes.fromhex(parsed.object_id)
    return struct.pack(PID_BIN_FMT, parsed.scheme_version, type_code, digest)
def bytes_to_str(bytes: bytes) -> str:
    """Deserialize a binary PID; inverse of :func:`str_to_bytes`.

    See :func:`str_to_bytes` for the binary layout. The version byte is
    decoded but not passed on to :class:`PersistentId`.

    Args:
        bytes: binary PID representation

    Returns:
        str: textual persistent identifier
    """
    _version, type_code, digest = struct.unpack(PID_BIN_FMT, bytes)
    object_type = PidType(type_code).name
    return str(PersistentId(object_type=object_type, object_id=digest))
class _OnDiskMap():
    """mmap-ed on-disk sequence of fixed size records
    """

    def __init__(self, record_size: int, fname: str, mode: str = 'rb',
                 length: int = None):
        """open an existing on-disk map

        Args:
            record_size: size of each record in bytes
            fname: path to the on-disk map
            mode: file open mode, usually either 'rb' for read-only maps, 'wb'
                for creating new maps, or 'rb+' for updating existing ones
                (default: 'rb')
            length: map size in number of logical records; used to initialize
                writable maps at creation time. Must be given when mode is
                'wb' (the file is then resized to exactly that many records);
                ignored otherwise

        Raises:
            ValueError: on an unsupported mode, a missing length in 'wb' mode,
                or a file size that is not a multiple of record_size
        """
        os_modes = {
            'rb': os.O_RDONLY,
            'wb': os.O_RDWR | os.O_CREAT,
            'rb+': os.O_RDWR
        }
        if mode not in os_modes:
            raise ValueError('invalid file open mode: ' + mode)
        new_map = (mode == 'wb')
        writable_map = mode in ['wb', 'rb+']
        # Validate before opening, so that a rejected creation does not leave
        # an empty file behind.
        if new_map and length is None:
            raise ValueError('missing length when creating new map')

        self.record_size = record_size
        self.fd = os.open(fname, os_modes[mode])
        try:
            if new_map:
                os.truncate(self.fd, length * self.record_size)
            # size of the file actually opened (and about to be mapped)
            self.size = os.fstat(self.fd).st_size
            (self.length, remainder) = divmod(self.size, record_size)
            if remainder:
                raise ValueError(
                    'map size {} is not a multiple of the record size {}'.format(
                        self.size, record_size))

            self.mm = mmap.mmap(
                self.fd, self.size,
                flags=MAP_SHARED if writable_map else MAP_PRIVATE)
        except BaseException:
            # do not leak the descriptor when the map cannot be set up
            os.close(self.fd)
            self.fd = None
            raise

    def close(self) -> None:
        """close the map

        shuts down both the mmap and the underlying file descriptor; calling
        it again after the first time does nothing
        """
        if not self.mm.closed:
            self.mm.close()
        if self.fd is not None:
            os.close(self.fd)
            self.fd = None

    def __len__(self) -> int:
        return self.length

    def __delitem__(self, pos: int) -> None:
        raise NotImplementedError('cannot delete records from fixed-size map')
class PidToNodeMap(_OnDiskMap, MutableMapping):
    """memory mapped map from PID (:ref:`persistent-identifiers`) to a continuous
    range 0..N of (8-byte long) integers

    This is the converse mapping of :class:`NodeToPidMap`.

    The on-disk serialization format is a sequence of fixed length (30 bytes)
    records with the following fields:

    - PID (22 bytes): binary PID representation as per :func:`str_to_bytes`
    - long (8 bytes): big endian long integer

    The records are sorted lexicographically by PID type and checksum, where
    type is the integer value of :class:`PidType`. PID lookup in the map is
    performed via binary search. Hence a huge map with, say, 11 B entries,
    will require ~30 disk seeks.

    Note that, due to fixed size + ordering, it is not possible to create these
    maps by random writing. Hence, __setitem__ can be used only to *update* the
    value associated to an existing key, rather than to add a missing item. To
    create an entire map from scratch, you should do so *sequentially*, using
    class method :meth:`write_record` (or, at your own risk, by hand via the
    mmap :attr:`mm`).

    Unlike a regular mapping, iterating over the map yields ``(pid, int)``
    pairs rather than bare keys.
    """

    # record binary format: PID + a big endian 8-byte integer
    RECORD_BIN_FMT = '>' + PID_BIN_FMT + 'q'
    RECORD_SIZE = PID_BIN_SIZE + INT_BIN_SIZE

    def __init__(self, fname: str, mode: str = 'rb', length: int = None):
        """open an existing on-disk map

        Args:
            fname: path to the on-disk map
            mode: file open mode, usually either 'rb' for read-only maps, 'wb'
                for creating new maps, or 'rb+' for updating existing ones
                (default: 'rb')
            length: map size in number of logical records; used to initialize
                read-write maps at creation time. Must be given when mode is
                'wb'; ignored otherwise
        """
        super().__init__(self.RECORD_SIZE, fname, mode=mode, length=length)

    def _get_bin_record(self, pos: int) -> Tuple[bytes, bytes]:
        """seek and return the (binary) record at a given (logical) position

        see :func:`_get_record` for an equivalent function with additional
        deserialization

        Args:
            pos: 0-based record number

        Returns:
            a pair `(pid, int)`, where pid and int are bytes
        """
        rec_pos = pos * self.RECORD_SIZE
        int_pos = rec_pos + PID_BIN_SIZE

        return (self.mm[rec_pos:int_pos],
                self.mm[int_pos:int_pos+INT_BIN_SIZE])

    def _get_record(self, pos: int) -> Tuple[str, int]:
        """seek and return the record at a given (logical) position

        moral equivalent of :func:`_get_bin_record`, with additional
        deserialization to non-bytes types

        Args:
            pos: 0-based record number

        Returns:
            a pair `(pid, int)`, where pid is a string-based PID and int the
            corresponding integer identifier
        """
        (pid_bytes, int_bytes) = self._get_bin_record(pos)
        return (bytes_to_str(pid_bytes),
                struct.unpack(INT_BIN_FMT, int_bytes)[0])

    @classmethod
    def write_record(cls, f: BinaryIO, pid: str, int: int) -> None:
        """write a logical record to a file-like object

        Args:
            f: file-like object to write the record to
            pid: textual PID
            int: PID integer identifier
        """
        f.write(str_to_bytes(pid))
        f.write(struct.pack(INT_BIN_FMT, int))

    def _bisect_pos(self, pid_str: str) -> int:
        """bisect the position of the given identifier. If the identifier is
        not found, the position of the first PID greater than it is returned;
        that is ``len(self)`` when every PID in the map is smaller.

        Args:
            pid_str: the pid as a string

        Returns:
            the logical record of the bisected position in the map, in the
            range ``0..len(self)``

        Raises:
            TypeError: if pid_str is not a string
            ValueError: if pid_str is not a valid PID
        """
        if not isinstance(pid_str, str):
            raise TypeError('PID must be a str, not {}'.format(type(pid_str)))
        try:
            target = str_to_bytes(pid_str)  # desired PID as bytes
        except ValueError:
            raise ValueError('invalid PID: "{}"'.format(pid_str))

        # leftmost insertion point of target among the (sorted) binary PIDs
        lo = 0
        hi = self.length
        while lo < hi:
            mid = (lo + hi) // 2
            (pid, _value) = self._get_bin_record(mid)
            if pid < target:
                lo = mid + 1
            else:
                hi = mid

        return lo

    def _find(self, pid_str: str) -> Tuple[int, int]:
        """lookup the integer identifier of a pid and its position

        Args:
            pid_str: the pid as a string

        Returns:
            a pair `(value, pos)` with the pid integer identifier and its
            logical record position in the map

        Raises:
            KeyError: if pid_str is not in the map
        """
        pos = self._bisect_pos(pid_str)
        if pos < self.length:
            pid_found, value = self._get_record(pos)
            if pid_found == pid_str:
                return (value, pos)
        raise KeyError(pid_str)

    def __getitem__(self, pid_str: str) -> int:
        """lookup the integer identifier of a PID

        Args:
            pid_str: the PID as a string

        Returns:
            the integer identifier of pid

        Raises:
            KeyError: if the PID is not in the map
        """
        return self._find(pid_str)[0]  # return element, ignore position

    def __setitem__(self, pid_str: str, int: int) -> None:
        """update the integer identifier of a PID already in the map

        Raises:
            KeyError: if the PID is not in the map (new keys cannot be added)
        """
        (_value, pos) = self._find(pid_str)  # might raise KeyError and that's OK

        # the PID bytes already match, only the integer field is rewritten
        int_pos = pos * self.RECORD_SIZE + PID_BIN_SIZE
        self.mm[int_pos:int_pos+INT_BIN_SIZE] = struct.pack(INT_BIN_FMT, int)

    def __iter__(self) -> Iterator[Tuple[str, int]]:
        for pos in range(self.length):
            yield self._get_record(pos)

    def iter_prefix(self, prefix: str) -> Iterator[Tuple[str, int]]:
        """iterate over the `(pid, int)` records whose PID starts with prefix

        Args:
            prefix: a PID prefix with all four colon-separated components,
                possibly with a truncated checksum (e.g. ``swh:1:cnt:ab``)
        """
        swh, n, t, sha = prefix.split(':')
        # smallest PID sharing the prefix: pad the checksum with zeros
        sha = sha.ljust(40, '0')
        start_pid = ':'.join([swh, n, t, sha])
        start = self._bisect_pos(start_pid)
        for pos in range(start, self.length):
            pid, value = self._get_record(pos)
            if not pid.startswith(prefix):
                break
            yield pid, value

    def iter_type(self, pid_type: str) -> Iterator[Tuple[str, int]]:
        """iterate over the `(pid, int)` records of a given 3-letter PID type
        (e.g. ``cnt``)"""
        prefix = 'swh:1:{}:'.format(pid_type)
        yield from self.iter_prefix(prefix)
class NodeToPidMap(_OnDiskMap, MutableMapping):
    """memory mapped map from a continuous range of 0..N (8-byte long) integers to
    PIDs (:ref:`persistent-identifiers`)

    This is the converse mapping of :class:`PidToNodeMap`.

    The on-disk serialization format is a sequence of fixed length records (22
    bytes), each being the binary representation of a PID as per
    :func:`str_to_bytes`.

    The records are sorted by long integer, so that integer lookup is possible
    via fixed-offset seek. Negative indices count from the end, as for lists.
    """

    RECORD_BIN_FMT = PID_BIN_FMT
    RECORD_SIZE = PID_BIN_SIZE

    def __init__(self, fname: str, mode: str = 'rb', length: int = None):
        """open an existing on-disk map

        Args:
            fname: path to the on-disk map
            mode: file open mode, usually either 'rb' for read-only maps, 'wb'
                for creating new maps, or 'rb+' for updating existing ones
                (default: 'rb')
            length: map size in number of logical records; used to initialize
                read-write maps at creation time. Must be given when mode is
                'wb'; ignored otherwise (passed to :class:`_OnDiskMap`)
        """
        super().__init__(self.RECORD_SIZE, fname, mode=mode, length=length)

    def _get_bin_record(self, pos: int) -> bytes:
        """seek and return the (binary) PID at a given (logical) position

        Args:
            pos: 0-based record number

        Returns:
            PID as a byte sequence
        """
        rec_pos = pos * self.RECORD_SIZE

        return self.mm[rec_pos:rec_pos+self.RECORD_SIZE]

    def _normalize_pos(self, pos: int) -> int:
        """turn a possibly negative record index into a 0-based one

        Raises:
            IndexError: if pos is out of range
        """
        orig_pos = pos
        if pos < 0:
            pos = len(self) + pos
        if not (0 <= pos < len(self)):
            raise IndexError(orig_pos)
        return pos

    @classmethod
    def write_record(cls, f: BinaryIO, pid: str) -> None:
        """write a PID to a file-like object

        Args:
            f: file-like object to write the record to
            pid: textual PID
        """
        f.write(str_to_bytes(pid))

    def __getitem__(self, pos: int) -> str:
        return bytes_to_str(self._get_bin_record(self._normalize_pos(pos)))

    def __setitem__(self, pos: int, pid: str) -> None:
        # same index semantics as __getitem__, instead of an obscure mmap
        # slice-size error on out-of-range or negative positions
        rec_pos = self._normalize_pos(pos) * self.RECORD_SIZE
        self.mm[rec_pos:rec_pos+self.RECORD_SIZE] = str_to_bytes(pid)

    def __iter__(self) -> Iterator[Tuple[int, str]]:
        for pos in range(self.length):
            yield (pos, self[pos])
diff --git a/swh/graph/tests/test_pid.py b/swh/graph/tests/test_pid.py
index 1cd1afd..69cb927 100644
--- a/swh/graph/tests/test_pid.py
+++ b/swh/graph/tests/test_pid.py
@@ -1,201 +1,201 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import shutil
import tempfile
import unittest
from itertools import islice
from swh.graph.pid import str_to_bytes, bytes_to_str
-from swh.graph.pid import PidToIntMap, IntToPidMap
+from swh.graph.pid import PidToNodeMap, NodeToPidMap
from swh.model.identifiers import PID_TYPES
class TestPidSerialization(unittest.TestCase):
    """Check the textual <-> binary PID conversions on one PID per type."""

    # (textual PID, expected binary form): version byte 1, type byte, digest
    pairs = [
        ('swh:1:{}:{}'.format(pid_type, digest),
         bytes([1, type_code]) + bytes.fromhex(digest))
        for (pid_type, type_code, digest) in [
            ('cnt', 0, '94a9ed024d3859793618152ea559a168bbcbb5e2'),
            ('dir', 1, 'd198bc9d7a6bcf6db04f476d29314f157507d505'),
            ('ori', 2, 'b63a575fe3faab7692c9f38fb09d4bb45651bb0f'),
            ('rel', 3, '22ece559cc7cc2364edc5e5593d63ae8bd229f9f'),
            ('rev', 4, '309cf2674ee7a0749978cf8265ab91a60aea0f7d'),
            ('snp', 5, 'c7c108084bc0bf3d81436bf980b46e98bd338453'),
        ]
    ]

    def test_str_to_bytes(self):
        for pid_str, pid_bytes in self.pairs:
            with self.subTest(pid=pid_str):
                self.assertEqual(str_to_bytes(pid_str), pid_bytes)

    def test_bytes_to_str(self):
        for pid_str, pid_bytes in self.pairs:
            with self.subTest(pid=pid_str):
                self.assertEqual(bytes_to_str(pid_bytes), pid_str)

    def test_round_trip(self):
        for pid_str, pid_bytes in self.pairs:
            with self.subTest(pid=pid_str):
                self.assertEqual(pid_str, bytes_to_str(str_to_bytes(pid_str)))
                self.assertEqual(pid_bytes,
                                 str_to_bytes(bytes_to_str(pid_bytes)))
def gen_records(types=('cnt', 'dir', 'ori', 'rel', 'rev', 'snp'),
                length=10000):
    """Generate sequential (PID, int) records for filling PID<->node maps.

    Intended for testing swh-graph on-disk binary databases. Types are
    emitted in sorted order; a single counter, shared across all types,
    numbers the records, and its value (zero-padded to 40 hex digits) is
    used as the hash part of each PID.

    Args:
        types (iterable): PID types to generate, given as the 3-letter
            component of a PID (e.g. ``'cnt'``); they are sorted before use.
            The default is an immutable tuple so it can never be shared
            mutable state between calls.
        length (int): number of PIDs to generate *per type*

    Yields:
        pairs ``(pid, pos)`` where ``pid`` is a textual PID and ``pos`` its
        sequential integer identifier, starting at 0
    """
    pos = 0
    for t in sorted(types):
        for _ in range(length):
            yield ('swh:1:{}:{:040x}'.format(t, pos), pos)
            pos += 1
# (PID, position) pairs sampled from the sequence produced by
# :func:`gen_records` above with its default arguments: 6 sorted types x
# 10000 PIDs, so positions 0-9999 are cnt, 10000-19999 dir, 20000-29999 ori,
# 30000-39999 rel, 40000-49999 rev and 50000-59999 snp. The hash part of
# each PID is its position in hex. Used as lookup fixtures by the map tests.
MAP_PAIRS = [
('swh:1:cnt:0000000000000000000000000000000000000000', 0),
('swh:1:cnt:000000000000000000000000000000000000002a', 42),
('swh:1:dir:0000000000000000000000000000000000002afc', 11004),
('swh:1:ori:00000000000000000000000000000000000056ce', 22222),
('swh:1:rel:0000000000000000000000000000000000008235', 33333),
('swh:1:rev:000000000000000000000000000000000000ad9c', 44444),
('swh:1:snp:000000000000000000000000000000000000ea5f', 59999),
]
# Tests for the on-disk PID -> node-id map (PidToNodeMap, renamed from
# PidToIntMap in this diff), built once per class from gen_records() output.
-class TestPidToIntMap(unittest.TestCase):
+class TestPidToNodeMap(unittest.TestCase):
# Write the map file into a fresh temporary directory shared by all tests;
# tearDownClass removes the whole directory.
@classmethod
def setUpClass(cls):
"""create reasonably sized (~2 MB) PID->int map to test on-disk DB
"""
cls.tmpdir = tempfile.mkdtemp(prefix='swh.graph.test.')
cls.fname = os.path.join(cls.tmpdir, 'pid2int.bin')
with open(cls.fname, 'wb') as f:
for (pid, i) in gen_records(length=10000):
- PidToIntMap.write_record(f, pid, i)
+ PidToNodeMap.write_record(f, pid, i)
@classmethod
def tearDownClass(cls):
shutil.rmtree(cls.tmpdir)
# Each test opens its own map instance, closed again in tearDown.
def setUp(self):
- self.map = PidToIntMap(self.fname)
+ self.map = PidToNodeMap(self.fname)
def tearDown(self):
self.map.close()
# Every sampled PID must map back to its generation position.
def test_lookup(self):
for (pid, pos) in MAP_PAIRS:
self.assertEqual(self.map[pid], pos)
# PIDs that were never written must raise KeyError.
# NOTE(review): the trailing commas turn each lookup into a one-element
# tuple expression; harmless, but presumably unintended.
def test_missing(self):
with self.assertRaises(KeyError):
self.map['swh:1:ori:0101010100000000000000000000000000000000'],
with self.assertRaises(KeyError):
self.map['swh:1:cnt:0101010100000000000000000000000000000000'],
# Non-string keys (int, float) must be rejected with TypeError.
def test_type_error(self):
with self.assertRaises(TypeError):
self.map[42]
with self.assertRaises(TypeError):
self.map[1.2]
# Updates go to a private copy opened read-write so other tests still see
# the pristine file.
# NOTE(review): the loop variable `int` shadows the builtin, and map2 is
# never closed before os.unlink; consider map2.close() in a try/finally.
def test_update(self):
fname2 = self.fname + '.update'
shutil.copy(self.fname, fname2) # fresh map copy
- map2 = PidToIntMap(fname2, mode='rb+')
+ map2 = PidToNodeMap(fname2, mode='rb+')
for (pid, int) in islice(map2, 11): # update the first N items
new_int = int + 42
map2[pid] = new_int
self.assertEqual(map2[pid], new_int) # check updated value
os.unlink(fname2) # tmpdir will be cleaned even if we don't reach this
# For each PID type, the first 20 entries from iter_type() must be
# consecutive PIDs whose hash part equals their position.
# NOTE(review): bare `assert` is skipped under `python -O`;
# self.assertEqual would be more robust here and in test_iter_prefix.
def test_iter_type(self):
for t in PID_TYPES:
first_20 = list(islice(self.map.iter_type(t), 20))
k = first_20[0][1]
expected = [('swh:1:{}:{:040x}'.format(t, i), i)
for i in range(k, k + 20)]
assert first_20 == expected
# Same check as test_iter_type, but selecting entries by the textual
# prefix 'swh:1:<type>:00' via iter_prefix().
def test_iter_prefix(self):
for t in PID_TYPES:
prefix = self.map.iter_prefix('swh:1:{}:00'.format(t))
first_20 = list(islice(prefix, 20))
k = first_20[0][1]
expected = [('swh:1:{}:{:040x}'.format(t, i), i)
for i in range(k, k + 20)]
assert first_20 == expected
# Tests for the on-disk node-id -> PID map (NodeToPidMap, renamed from
# IntToPidMap in this diff), built once per class from gen_records() output.
-class TestIntToPidMap(unittest.TestCase):
+class TestNodeToPidMap(unittest.TestCase):
# Write the map file into a fresh temporary directory shared by all tests;
# only the PIDs are written, since a record's position is its node id.
@classmethod
def setUpClass(cls):
"""create reasonably sized (~1 MB) int->PID map to test on-disk DB
"""
cls.tmpdir = tempfile.mkdtemp(prefix='swh.graph.test.')
cls.fname = os.path.join(cls.tmpdir, 'int2pid.bin')
with open(cls.fname, 'wb') as f:
for (pid, _i) in gen_records(length=10000):
- IntToPidMap.write_record(f, pid)
+ NodeToPidMap.write_record(f, pid)
@classmethod
def tearDownClass(cls):
shutil.rmtree(cls.tmpdir)
# Each test opens its own map instance, closed again in tearDown.
def setUp(self):
- self.map = IntToPidMap(self.fname)
+ self.map = NodeToPidMap(self.fname)
def tearDown(self):
self.map.close()
# Every sampled position must map back to its generated PID.
def test_lookup(self):
for (pid, pos) in MAP_PAIRS:
self.assertEqual(self.map[pos], pid)
# Positions beyond the map in either direction must raise IndexError.
def test_out_of_bounds(self):
with self.assertRaises(IndexError):
self.map[1000000]
with self.assertRaises(IndexError):
self.map[-1000000]
# Updates go to a private copy opened read-write so other tests still see
# the pristine file.
# NOTE(review): the loop variable `int` shadows the builtin, and map2 is
# never closed before os.unlink; consider map2.close() in a try/finally.
def test_update(self):
fname2 = self.fname + '.update'
shutil.copy(self.fname, fname2) # fresh map copy
- map2 = IntToPidMap(fname2, mode='rb+')
+ map2 = NodeToPidMap(fname2, mode='rb+')
for (int, pid) in islice(map2, 11): # update the first N items
new_pid = pid.replace(':0', ':f') # mangle first hex digit
map2[int] = new_pid
self.assertEqual(map2[int], new_pid) # check updated value
os.unlink(fname2) # tmpdir will be cleaned even if we don't reach this

File Metadata

Mime Type
text/x-diff
Expires
Thu, Jul 3, 10:50 AM (1 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3212285

Event Timeline