Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9312347
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
44 KB
Subscribers
None
View Options
diff --git a/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java b/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java
index 35ca81f..4d47a12 100644
--- a/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java
+++ b/java/server/src/main/java/org/softwareheritage/graph/backend/Setup.java
@@ -1,124 +1,124 @@
package org.softwareheritage.graph.backend;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.zip.GZIPInputStream;
import it.unimi.dsi.bits.LongArrayBitVector;
import it.unimi.dsi.fastutil.Size64;
import it.unimi.dsi.fastutil.io.BinIO;
import it.unimi.dsi.fastutil.longs.LongBigArrays;
import it.unimi.dsi.fastutil.longs.LongBigList;
import it.unimi.dsi.fastutil.objects.Object2LongFunction;
import it.unimi.dsi.fastutil.objects.ObjectBigArrays;
import it.unimi.dsi.io.FastBufferedReader;
import it.unimi.dsi.io.LineIterator;
import org.softwareheritage.graph.Graph;
import org.softwareheritage.graph.Node;
import org.softwareheritage.graph.SwhPID;
import org.softwareheritage.graph.backend.NodeTypesMap;
/**
* Pre-processing steps (such as dumping mapping files on disk) before running the graph service.
*
* @author The Software Heritage developers
*/
public class Setup {
/**
 * Main entrypoint: pre-computes the node id maps and prints the elapsed time.
 *
 * @param args command line arguments: the compressed nodes CSV path followed by the
 *     compressed graph base name
 * @throws IOException propagated from {@code precomputeNodeIdMap}
 */
public static void main(String[] args) throws IOException {
    if (args.length != 2) {
        System.err.println("Usage: NODES_CSV_GZ COMPRESSED_GRAPH_BASE_NAME");
        System.exit(1);
    }

    String nodesPath = args[0];
    String graphPath = args[1];

    System.out.println("Pre-computing node id maps...");
    long startTime = System.nanoTime();
    precomputeNodeIdMap(nodesPath, graphPath);
    long endTime = System.nanoTime();
    // Divide by a double literal: a long/long division would drop the fractional
    // seconds before the result is widened to double.
    double duration = (endTime - startTime) / 1_000_000_000.0;
    System.out.println("Done in: " + duration + " seconds");
}
/**
* Computes and dumps on disk mapping files.
*
* @param nodesPath path of the compressed csv nodes file
* @param graphPath path of the compressed graph
*/
// Suppress warning for Object2LongFunction cast
@SuppressWarnings("unchecked")
static void precomputeNodeIdMap(String nodesPath, String graphPath) throws IOException {
// First internal mapping: SWH PID (string) -> WebGraph MPH (long)
Object2LongFunction<String> mphMap = null;
try {
mphMap = (Object2LongFunction<String>) BinIO.loadObject(graphPath + ".mph");
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("The .mph file contains unknown class object: " + e);
}
long nbIds = (mphMap instanceof Size64) ? ((Size64) mphMap).size64() : mphMap.size();
// Second internal mapping: WebGraph MPH (long) -> BFS ordering (long)
long[][] bfsMap = LongBigArrays.newBigArray(nbIds);
long loaded = BinIO.loadLongs(graphPath + ".order", bfsMap);
if (loaded != nbIds) {
throw new IllegalArgumentException("Graph contains " + nbIds + " nodes, but read " + loaded);
}
// Dump complete mapping for all nodes: SWH PID (string) <=> WebGraph node id (long)
InputStream nodesStream = new GZIPInputStream(new FileInputStream(nodesPath));
FastBufferedReader buffer = new FastBufferedReader(new InputStreamReader(nodesStream, "UTF-8"));
LineIterator swhPIDIterator = new LineIterator(buffer);
- // for the binary format of pidToNodeMap, see Python module swh.graph.pid:PidToIntMap
- // for the binary format of nodeToPidMap, see Python module swh.graph.pid:IntToPidMap
+ // for the binary format of pidToNodeMap, see Python module swh.graph.pid:PidToNodeMap
+ // for the binary format of nodeToPidMap, see Python module swh.graph.pid:NodeToPidMap
try (DataOutputStream pidToNodeMap = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(graphPath + Graph.PID_TO_NODE)));
BufferedOutputStream nodeToPidMap = new BufferedOutputStream(new FileOutputStream(graphPath + Graph.NODE_TO_PID))) {
// nodeToPidMap needs to write SWH PID in order of node id, so use a temporary array
Object[][] nodeToSwhPID = ObjectBigArrays.newBigArray(nbIds);
// To effectively run edge restriction during graph traversals, we store node id (long) -> SWH
// type map. This is represented as a bitmap using minimum number of bits per Node.Type.
final int log2NbTypes = (int) Math.ceil(Math.log(Node.Type.values().length) / Math.log(2));
final int nbBitsPerNodeType = log2NbTypes;
LongArrayBitVector nodeTypesBitVector =
LongArrayBitVector.ofLength(nbBitsPerNodeType * nbIds);
LongBigList nodeTypesMap = nodeTypesBitVector.asLongBigList(nbBitsPerNodeType);
for (long iNode = 0; iNode < nbIds && swhPIDIterator.hasNext(); iNode++) {
String strSwhPID = swhPIDIterator.next().toString();
SwhPID swhPID = new SwhPID(strSwhPID);
byte[] swhPIDBin = swhPID.toBytes();
long mphId = mphMap.getLong(strSwhPID);
long nodeId = LongBigArrays.get(bfsMap, mphId);
pidToNodeMap.write(swhPIDBin, 0, swhPIDBin.length);
pidToNodeMap.writeLong(nodeId);
ObjectBigArrays.set(nodeToSwhPID, nodeId, swhPIDBin);
nodeTypesMap.set(nodeId, swhPID.getType().ordinal());
}
BinIO.storeObject(nodeTypesMap, graphPath + Graph.NODE_TO_TYPE);
for (long iNode = 0; iNode < nbIds; iNode++) {
nodeToPidMap.write((byte[]) ObjectBigArrays.get(nodeToSwhPID, iNode));
}
}
}
}
diff --git a/swh/graph/backend.py b/swh/graph/backend.py
index ca2b59d..9dffc5c 100644
--- a/swh/graph/backend.py
+++ b/swh/graph/backend.py
@@ -1,205 +1,205 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
import contextlib
import io
import os
import pathlib
import struct
import subprocess
import sys
import tempfile
from py4j.java_gateway import JavaGateway
-from swh.graph.pid import IntToPidMap, PidToIntMap
+from swh.graph.pid import NodeToPidMap, PidToNodeMap
from swh.model.identifiers import PID_TYPES
BUF_SIZE = 64*1024
BIN_FMT = '>q' # 64 bit integer, big endian
PATH_SEPARATOR_ID = -1
NODE2PID_EXT = 'node2pid.bin'
PID2NODE_EXT = 'pid2node.bin'
def find_graph_jar():
    """Locate swh-graph.jar, which contains the Java part of swh-graph.

    Both the development tree and the installed data directory are searched
    (the latter for in-production deployments that fetched the JAR from PyPI).

    Returns:
        str: path of the first ``swh-graph-*.jar`` found

    Raises:
        RuntimeError: if none of the candidate directories holds a matching JAR
    """
    repo_root = pathlib.Path(__file__).parents[2]
    candidate_dirs = (
        repo_root / 'java/server/target/',
        pathlib.Path(sys.prefix) / 'share/swh-graph/',
    )
    for directory in candidate_dirs:
        # only the first glob hit is used, so stop scanning as soon as we have it
        first_jar = next(iter(directory.glob('swh-graph-*.jar')), None)
        if first_jar is not None:
            return str(first_jar)
    raise RuntimeError("swh-graph-*.jar not found. Have you run `make java`?")
def _get_pipe_stderr():
    """Choose the value handed over as stderr redirection target.

    Returns ``sys.stderr`` when it exposes a file descriptor; otherwise
    (e.g. when running with Jupyter) returns ``subprocess.STDOUT`` so that
    stderr gets piped to stdout.
    """
    try:
        sys.stderr.fileno()
        return sys.stderr
    except io.UnsupportedOperation:
        return subprocess.STDOUT
class Backend:
def __init__(self, graph_path):
self.gateway = None
self.entry = None
self.graph_path = graph_path
def __enter__(self):
# TODO: make all of that configurable with sane defaults
java_opts = [
'-Xmx200G',
'-server',
'-XX:PretenureSizeThreshold=512M',
'-XX:MaxNewSize=4G',
'-XX:+UseLargePages',
'-XX:+UseTransparentHugePages',
'-XX:+UseNUMA',
'-XX:+UseTLAB',
'-XX:+ResizeTLAB',
]
self.gateway = JavaGateway.launch_gateway(
java_path=None,
javaopts=java_opts,
classpath=find_graph_jar(),
die_on_exit=True,
redirect_stdout=sys.stdout,
redirect_stderr=_get_pipe_stderr(),
)
self.entry = self.gateway.jvm.org.softwareheritage.graph.Entry()
self.entry.load_graph(self.graph_path)
- self.node2pid = IntToPidMap(self.graph_path + '.' + NODE2PID_EXT)
- self.pid2node = PidToIntMap(self.graph_path + '.' + PID2NODE_EXT)
+ self.node2pid = NodeToPidMap(self.graph_path + '.' + NODE2PID_EXT)
+ self.pid2node = PidToNodeMap(self.graph_path + '.' + PID2NODE_EXT)
self.stream_proxy = JavaStreamProxy(self.entry)
return self
def __exit__(self, exc_type, exc_value, tb):
self.gateway.shutdown()
def stats(self):
return self.entry.stats()
def count(self, ttype, direction, edges_fmt, src):
method = getattr(self.entry, 'count_' + ttype)
return method(direction, edges_fmt, src)
async def simple_traversal(self, ttype, direction, edges_fmt, src):
    """Stream the node ids produced by one of the simple traversals.

    Args:
        ttype: traversal name, one of 'leaves', 'neighbors', 'visit_nodes';
            it selects the stream proxy method to call
        direction, edges_fmt, src: forwarded unchanged to that method

    Yields:
        every value produced by the selected stream proxy method
    """
    assert ttype in ('leaves', 'neighbors', 'visit_nodes')
    traverse = getattr(self.stream_proxy, ttype)
    node_stream = traverse(direction, edges_fmt, src)
    async for node_id in node_stream:
        yield node_id
async def walk(self, direction, edges_fmt, algo, src, dst):
    """Stream the node ids of a walk from src to dst.

    When dst is one of PID_TYPES the stream proxy's ``walk_type`` is used,
    otherwise its ``walk``; both receive the same arguments.

    Yields:
        every value produced by the selected stream proxy method
    """
    proxy = self.stream_proxy
    walker = proxy.walk_type if dst in PID_TYPES else proxy.walk
    async for node_id in walker(direction, edges_fmt, algo, src, dst):
        yield node_id
async def visit_paths(self, direction, edges_fmt, src):
    """Stream the visited paths, each one as a list of node ids.

    The underlying stream is a flat sequence of node ids in which
    PATH_SEPARATOR_ID terminates a path. Ids received after the last
    separator are not yielded.

    Yields:
        list: the node ids accumulated since the previous separator
    """
    current = []
    stream = self.stream_proxy.visit_paths(direction, edges_fmt, src)
    async for node in stream:
        if node != PATH_SEPARATOR_ID:
            current.append(node)
            continue
        yield current
        current = []
class JavaStreamProxy:
    """A proxy class for the org.softwareheritage.graph.Entry Java class that
    takes care of the setup and teardown of the named-pipe FIFO communication
    between Python and Java.

    Initialize JavaStreamProxy using:

        proxy = JavaStreamProxy(swh_entry_class_instance)

    Then you can call an Entry method and iterate on the FIFO results like
    this:

        async for value in proxy.java_method(arg1, arg2):
            print(value)
    """

    def __init__(self, entry):
        # entry: Java-side Entry instance; get_handler() below calls
        # entry.get_handler(<fifo path>) on it
        self.entry = entry

    async def read_node_ids(self, fname):
        """Asynchronously yield the integers read from the FIFO at fname.

        The file is read in chunks of up to BUF_SIZE bytes, each chunk being
        decoded with struct.iter_unpack using BIN_FMT. The blocking open()
        and read() calls are delegated to the event loop's default executor.

        Raises:
            asyncio.TimeoutError: if open() does not return within 2 seconds
        """
        loop = asyncio.get_event_loop()
        open_thread = loop.run_in_executor(None, open, fname, 'rb')

        # Since the open() call on the FIFO is blocking until it is also opened
        # on the Java side, we await it with a timeout in case there is an
        # exception that prevents the write-side open().
        with (await asyncio.wait_for(open_thread, timeout=2)) as f:
            while True:
                data = await loop.run_in_executor(None, f.read, BUF_SIZE)
                if not data:
                    # empty read: end of stream
                    break
                # note: the loop variable rebinds `data`; the chunk itself has
                # already been handed to iter_unpack at this point
                for data in struct.iter_unpack(BIN_FMT, data):
                    yield data[0]

    class _HandlerWrapper:
        """Wrap a handler object so that calling any of its methods schedules
        the call as an asyncio task instead of running it inline.
        """

        def __init__(self, handler):
            self._handler = handler

        def __getattr__(self, name):
            """Return a function that, when called, creates and returns a task
            running the wrapped handler's method `name` in the default executor.
            """
            func = getattr(self._handler, name)

            async def java_call(*args, **kwargs):
                # the return value of func is discarded; only completion (or
                # the raised exception) is observable through the task
                loop = asyncio.get_event_loop()
                await loop.run_in_executor(None, lambda: func(*args, **kwargs))

            def java_task(*args, **kwargs):
                return asyncio.create_task(java_call(*args, **kwargs))

            return java_task

    @contextlib.contextmanager
    def get_handler(self):
        """Context manager yielding a pair `(handler, reader)`.

        A FIFO is created inside a temporary directory; `reader` is the
        read_node_ids() async generator on that FIFO and `handler` wraps the
        object returned by entry.get_handler() for the same FIFO path. The
        temporary directory is removed on exit.
        """
        with tempfile.TemporaryDirectory(prefix='swh-graph-') as tmpdirname:
            cli_fifo = os.path.join(tmpdirname, 'swh-graph.fifo')
            os.mkfifo(cli_fifo)
            reader = self.read_node_ids(cli_fifo)
            query_handler = self.entry.get_handler(cli_fifo)
            handler = self._HandlerWrapper(query_handler)
            yield (handler, reader)

    def __getattr__(self, name):
        """Return an async generator function that invokes the handler method
        `name` and yields the values read back from the FIFO.
        """
        async def java_call_iterator(*args, **kwargs):
            with self.get_handler() as (handler, reader):
                # start the Java-side call first; it runs concurrently with the
                # read loop below
                java_task = getattr(handler, name)(*args, **kwargs)
                try:
                    async for value in reader:
                        yield value
                except asyncio.TimeoutError:
                    # If the read-side open() timeouts, an exception on the
                    # Java side probably happened that prevented the
                    # write-side open(). We propagate this exception here if
                    # that is the case.
                    task_exc = java_task.exception()
                    if task_exc:
                        raise task_exc
                    raise
                # surface any exception raised by the Java call after the
                # stream has been fully consumed
                await java_task
        return java_call_iterator
diff --git a/swh/graph/cli.py b/swh/graph/cli.py
index db35a6d..ad99aae 100644
--- a/swh/graph/cli.py
+++ b/swh/graph/cli.py
@@ -1,230 +1,230 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import aiohttp
import click
import sys
from pathlib import Path
from typing import Any, Dict, Tuple
from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup
from swh.graph import client, webgraph
-from swh.graph.pid import PidToIntMap, IntToPidMap
+from swh.graph.pid import PidToNodeMap, NodeToPidMap
from swh.graph.server.app import make_app
from swh.graph.backend import Backend
class PathlibPath(click.Path):
"""A Click path argument that returns a pathlib Path, not a string"""
def convert(self, value, param, ctx):
return Path(super().convert(value, param, ctx))
DEFAULT_CONFIG = {
'graph': ('dict', {})
} # type: Dict[str, Tuple[str, Any]]
@click.group(name='graph', context_settings=CONTEXT_SETTINGS,
cls=AliasedGroup)
@click.option('--config-file', '-C', default=None,
type=click.Path(exists=True, dir_okay=False,),
help='YAML configuration file')
@click.pass_context
def cli(ctx, config_file):
"""Software Heritage graph tools."""
ctx.ensure_object(dict)
conf = config.read(config_file, DEFAULT_CONFIG)
if 'graph' not in conf:
raise ValueError('no "graph" stanza found in configuration file %s'
% config_file)
ctx.obj['config'] = conf
@cli.command('api-client')
@click.option('--host', default='localhost', help='Graph server host')
@click.option('--port', default='5009', help='Graph server port')
@click.pass_context
def api_client(ctx, host, port):
"""client for the graph REST service"""
url = 'http://{}:{}'.format(host, port)
app = client.RemoteGraphClient(url)
# TODO: run web app
print(app.stats())
@cli.group('map')
@click.pass_context
def map(ctx):
"""Manage swh-graph on-disk maps"""
pass
-def dump_pid2int(filename):
- for (pid, int) in PidToIntMap(filename):
+def dump_pid2node(filename):
+ for (pid, int) in PidToNodeMap(filename):
print('{}\t{}'.format(pid, int))
-def dump_int2pid(filename):
- for (int, pid) in IntToPidMap(filename):
+def dump_node2pid(filename):
+ for (int, pid) in NodeToPidMap(filename):
print('{}\t{}'.format(int, pid))
-def restore_pid2int(filename):
+def restore_pid2node(filename):
"""read a textual PID->int map from stdin and write its binary version to
filename
"""
with open(filename, 'wb') as dst:
for line in sys.stdin:
(str_pid, str_int) = line.split()
- PidToIntMap.write_record(dst, str_pid, int(str_int))
+ PidToNodeMap.write_record(dst, str_pid, int(str_int))
-def restore_int2pid(filename, length):
+def restore_node2pid(filename, length):
"""read a textual int->PID map from stdin and write its binary version to
filename
"""
- int2pid = IntToPidMap(filename, mode='wb', length=length)
+ node2pid = NodeToPidMap(filename, mode='wb', length=length)
for line in sys.stdin:
(str_int, str_pid) = line.split()
- int2pid[int(str_int)] = str_pid
- int2pid.close()
+ node2pid[int(str_int)] = str_pid
+ node2pid.close()
@map.command('dump')
@click.option('--type', '-t', 'map_type', required=True,
- type=click.Choice(['pid2int', 'int2pid']),
+ type=click.Choice(['pid2node', 'node2pid']),
help='type of map to dump')
@click.argument('filename', required=True, type=click.Path(exists=True))
@click.pass_context
def dump_map(ctx, map_type, filename):
- """dump a binary PID<->int map to textual format"""
- if map_type == 'pid2int':
- dump_pid2int(filename)
- elif map_type == 'int2pid':
- dump_int2pid(filename)
+ """dump a binary PID<->node map to textual format"""
+ if map_type == 'pid2node':
+ dump_pid2node(filename)
+ elif map_type == 'node2pid':
+ dump_node2pid(filename)
else:
raise ValueError('invalid map type: ' + map_type)
pass
@map.command('restore')
@click.option('--type', '-t', 'map_type', required=True,
- type=click.Choice(['pid2int', 'int2pid']),
+ type=click.Choice(['pid2node', 'node2pid']),
help='type of map to dump')
@click.option('--length', '-l', type=int,
help='''map size in number of logical records
- (required for int2pid maps)''')
+ (required for node2pid maps)''')
@click.argument('filename', required=True, type=click.Path())
@click.pass_context
def restore_map(ctx, map_type, length, filename):
- """restore a binary PID<->int map from textual format"""
- if map_type == 'pid2int':
- restore_pid2int(filename)
- elif map_type == 'int2pid':
+ """restore a binary PID<->node map from textual format"""
+ if map_type == 'pid2node':
+ restore_pid2node(filename)
+ elif map_type == 'node2pid':
if length is None:
raise click.UsageError(
'map length is required when restoring {} maps'.format(
map_type), ctx)
- restore_int2pid(filename, length)
+ restore_node2pid(filename, length)
else:
raise ValueError('invalid map type: ' + map_type)
@map.command('write')
@click.option('--type', '-t', 'map_type', required=True,
- type=click.Choice(['pid2int', 'int2pid']),
+ type=click.Choice(['pid2node', 'node2pid']),
help='type of map to write')
@click.argument('filename', required=True, type=click.Path())
@click.pass_context
def write(ctx, map_type, filename):
"""write a map to disk sequentially
- read from stdin a textual PID->int mapping (for pid2int, or a simple
- sequence of PIDs for int2pid) and write it to disk in the requested binary
+ read from stdin a textual PID->node mapping (for pid2node, or a simple
+ sequence of PIDs for node2pid) and write it to disk in the requested binary
map format
note that no sorting is applied, so the input should already be sorted as
- required by the chosen map type (by PID for pid2int, by int for int2pid)
+ required by the chosen map type (by PID for pid2node, by int for node2pid)
"""
with open(filename, 'wb') as f:
- if map_type == 'pid2int':
+ if map_type == 'pid2node':
for line in sys.stdin:
(pid, int_str) = line.rstrip().split(maxsplit=1)
- PidToIntMap.write_record(f, pid, int(int_str))
- elif map_type == 'int2pid':
+ PidToNodeMap.write_record(f, pid, int(int_str))
+ elif map_type == 'node2pid':
for line in sys.stdin:
pid = line.rstrip()
- IntToPidMap.write_record(f, pid)
+ NodeToPidMap.write_record(f, pid)
else:
raise ValueError('invalid map type: ' + map_type)
@cli.command(name='rpc-serve')
@click.option('--host', '-h', default='0.0.0.0',
metavar='IP', show_default=True,
help='host IP address to bind the server on')
@click.option('--port', '-p', default=5009, type=click.INT,
metavar='PORT', show_default=True,
help='port to bind the server on')
@click.option('--graph', '-g', required=True, metavar='GRAPH',
help='compressed graph basename')
@click.pass_context
def serve(ctx, host, port, graph):
"""run the graph REST service"""
backend = Backend(graph_path=graph)
app = make_app(backend=backend)
with backend:
aiohttp.web.run_app(app, host=host, port=port)
@cli.command()
@click.option('--graph', '-g', required=True, metavar='GRAPH',
type=PathlibPath(),
help='input graph basename')
@click.option('--outdir', '-o', 'out_dir', required=True, metavar='DIR',
type=PathlibPath(),
help='directory where to store compressed graph')
@click.option('--steps', '-s', metavar='STEPS', type=webgraph.StepOption(),
help='run only these compression steps (default: all steps)')
@click.pass_context
def compress(ctx, graph, out_dir, steps):
"""Compress a graph using WebGraph
Input: a pair of files g.nodes.csv.gz, g.edges.csv.gz
Output: a directory containing a WebGraph compressed graph
Compression steps are: (1) mph, (2) bv, (3) bv_obl, (4) bfs, (5) permute,
(6) permute_obl, (7) stats, (8) transpose, (9) transpose_obl, (10) maps,
(11) clean_tmp. Compression steps can be selected by name or number using
--steps, separating them with commas; step ranges (e.g., 3-9, 6-, etc.) are
also supported.
"""
graph_name = graph.name
in_dir = graph.parent
try:
conf = ctx.obj['config']['graph']['compress']
except KeyError:
conf = {} # use defaults
webgraph.compress(graph_name, in_dir, out_dir, steps, conf)
def main():
return cli(auto_envvar_prefix='SWH_GRAPH')
if __name__ == '__main__':
main()
diff --git a/swh/graph/pid.py b/swh/graph/pid.py
index 280fba3..8a780fa 100644
--- a/swh/graph/pid.py
+++ b/swh/graph/pid.py
@@ -1,402 +1,402 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import mmap
import os
import struct
from collections.abc import MutableMapping
from enum import Enum
from mmap import MAP_SHARED, MAP_PRIVATE
from typing import BinaryIO, Iterator, Tuple
from swh.model.identifiers import PersistentId, parse_persistent_identifier
PID_BIN_FMT = 'BB20s' # 2 unsigned chars + 20 bytes
INT_BIN_FMT = '>q' # big endian, 8-byte integer
PID_BIN_SIZE = 22 # in bytes
INT_BIN_SIZE = 8 # in bytes
class PidType(Enum):
"""types of existing PIDs, used to serialize PID type as a (char) integer
Note that the order does matter also for driving the binary search in
PID-indexed maps. Integer values also matter, for compatibility with the
Java layer.
"""
content = 0
directory = 1
origin = 2
release = 3
revision = 4
snapshot = 5
def str_to_bytes(pid_str: str) -> bytes:
    """Serialize a textual PID to its fixed-size binary form

    The result is packed with PID_BIN_FMT and is laid out as follows:

    - 1 byte for the namespace version, as a C `unsigned char`
    - 1 byte for the object type, i.e., the int value of the matching
      :class:`PidType` member, as a C `unsigned char`
    - 20 bytes for the digest, decoded from its hexadecimal form

    Args:
        pid_str: persistent identifier, in textual form

    Returns:
        bytes: byte sequence representation of pid_str

    """
    parsed = parse_persistent_identifier(pid_str)
    type_code = PidType[parsed.object_type].value
    digest = bytes.fromhex(parsed.object_id)
    return struct.pack(PID_BIN_FMT, parsed.scheme_version, type_code, digest)
def bytes_to_str(bytes: bytes) -> str:
    """Inverse function of :func:`str_to_bytes`

    See :func:`str_to_bytes` for a description of the binary PID format.

    The version byte is unpacked but not forwarded to the PersistentId
    constructor; only the object type name and the digest are.

    Args:
        bytes: byte sequence representation of pid

    Returns:
        pid: persistent identifier, in textual form

    """
    (_version, type_code, digest) = struct.unpack(PID_BIN_FMT, bytes)
    type_name = PidType(type_code).name
    return str(PersistentId(object_type=type_name, object_id=digest))
class _OnDiskMap():
    """mmap-ed on-disk sequence of fixed size records

    Attributes set by the constructor: `record_size` (bytes per record), `fd`
    (OS-level file descriptor, reset to None once closed), `size` (file size
    in bytes), `length` (number of records) and `mm` (the mmap object).

    """

    def __init__(self, record_size: int, fname: str, mode: str = 'rb',
                 length: int = None):
        """open an existing on-disk map, or create a new one

        Args:
            record_size: size of each record in bytes
            fname: path to the on-disk map
            mode: file open mode, usually either 'rb' for read-only maps, 'wb'
                for creating new maps, or 'rb+' for updating existing ones
                (default: 'rb')
            length: map size in number of logical records; used to size the
                file when mode is 'wb', in which case it must be given;
                ignored otherwise

        Raises:
            ValueError: if mode is not supported, if length is missing with
                mode 'wb', or if the file size is not a multiple of
                record_size

        """
        os_modes = {
            'rb': os.O_RDONLY,
            'wb': os.O_RDWR | os.O_CREAT,
            'rb+': os.O_RDWR
        }
        if mode not in os_modes:
            raise ValueError('invalid file open mode: ' + mode)
        new_map = (mode == 'wb')
        writable_map = mode in ['wb', 'rb+']
        # validate before touching the filesystem, so that a bad call does not
        # leave a freshly created empty file behind
        if new_map and length is None:
            raise ValueError('missing length when creating new map')

        self.record_size = record_size
        self.fd = os.open(fname, os_modes[mode])
        try:
            if new_map:
                os.truncate(self.fd, length * self.record_size)

            self.size = os.path.getsize(fname)
            (self.length, remainder) = divmod(self.size, record_size)
            if remainder:
                raise ValueError(
                    'map size {} is not a multiple of the record size {}'
                    .format(self.size, record_size))

            self.mm = mmap.mmap(
                self.fd, self.size,
                flags=MAP_SHARED if writable_map else MAP_PRIVATE)
        except BaseException:
            # initialization failed half-way: release the descriptor, since
            # the caller never gets an object to call close() on
            os.close(self.fd)
            self.fd = None
            raise

    def close(self) -> None:
        """close the map

        shuts down both the mmap and the underlying file descriptor; calling
        it again on an already closed map is a no-op

        """
        if not self.mm.closed:
            self.mm.close()
        if self.fd is not None:
            os.close(self.fd)
            self.fd = None

    def __len__(self) -> int:
        """return the number of logical records in the map"""
        return self.length

    def __delitem__(self, pos: int) -> None:
        raise NotImplementedError('cannot delete records from fixed-size map')
-class PidToIntMap(_OnDiskMap, MutableMapping):
+class PidToNodeMap(_OnDiskMap, MutableMapping):
"""memory mapped map from PID (:ref:`persistent-identifiers`) to a continuous
range 0..N of (8-byte long) integers
- This is the converse mapping of :class:`IntToPidMap`.
+ This is the converse mapping of :class:`NodeToPidMap`.
The on-disk serialization format is a sequence of fixed length (30 bytes)
records with the following fields:
- PID (22 bytes): binary PID representation as per :func:`str_to_bytes`
- long (8 bytes): big endian long integer
The records are sorted lexicographically by PID type and checksum, where
type is the integer value of :class:`PidType`. PID lookup in the map is
performed via binary search. Hence a huge map with, say, 11 B entries,
will require ~30 disk seeks.
Note that, due to fixed size + ordering, it is not possible to create these
maps by random writing. Hence, __setitem__ can be used only to *update* the
value associated to an existing key, rather than to add a missing item. To
create an entire map from scratch, you should do so *sequentially*, using
static method :meth:`write_record` (or, at your own risk, by hand via the
mmap :attr:`mm`).
"""
# record binary format: PID + a big endian 8-byte big endian integer
RECORD_BIN_FMT = '>' + PID_BIN_FMT + 'q'
RECORD_SIZE = PID_BIN_SIZE + INT_BIN_SIZE
def __init__(self, fname: str, mode: str = 'rb', length: int = None):
"""open an existing on-disk map
Args:
fname: path to the on-disk map
mode: file open mode, usually either 'rb' for read-only maps, 'wb'
for creating new maps, or 'rb+' for updating existing ones
(default: 'rb')
length: map size in number of logical records; used to initialize
read-write maps at creation time. Must be given when mode is
'wb'; ignored otherwise
"""
super().__init__(self.RECORD_SIZE, fname, mode=mode, length=length)
def _get_bin_record(self, pos: int) -> Tuple[bytes, bytes]:
"""seek and return the (binary) record at a given (logical) position
see :func:`_get_record` for an equivalent function with additional
deserialization
Args:
pos: 0-based record number
Returns:
a pair `(pid, int)`, where pid and int are bytes
"""
rec_pos = pos * self.RECORD_SIZE
int_pos = rec_pos + PID_BIN_SIZE
return (self.mm[rec_pos:int_pos],
self.mm[int_pos:int_pos+INT_BIN_SIZE])
def _get_record(self, pos: int) -> Tuple[str, int]:
"""seek and return the record at a given (logical) position
moral equivalent of :func:`_get_bin_record`, with additional
deserialization to non-bytes types
Args:
pos: 0-based record number
Returns:
a pair `(pid, int)`, where pid is a string-based PID and int the
corresponding integer identifier
"""
(pid_bytes, int_bytes) = self._get_bin_record(pos)
return (bytes_to_str(pid_bytes),
struct.unpack(INT_BIN_FMT, int_bytes)[0])
@classmethod
def write_record(cls, f: BinaryIO, pid: str, int: int) -> None:
"""write a logical record to a file-like object
Args:
f: file-like object to write the record to
pid: textual PID
int: PID integer identifier
"""
f.write(str_to_bytes(pid))
f.write(struct.pack(INT_BIN_FMT, int))
def _bisect_pos(self, pid_str: str) -> int:
    """bisect the position of the given identifier

    The search runs over the binary PIDs of the records 0..length-1. The
    result is the position of the first record whose PID is not smaller
    than the given one or, when every record but possibly the last is
    smaller, the position of the last record.

    Args:
        pid_str: the pid as a string

    Returns:
        the logical record of the bisected position in the map

    Raises:
        TypeError: if pid_str is not a str
        ValueError: if pid_str cannot be converted by :func:`str_to_bytes`

    """
    if not isinstance(pid_str, str):
        raise TypeError('PID must be a str, not {}'.format(type(pid_str)))
    try:
        target = str_to_bytes(pid_str)  # desired PID as bytes
    except ValueError:
        raise ValueError('invalid PID: "{}"'.format(pid_str))

    low, high = 0, self.length - 1
    while low < high:
        middle = (low + high) // 2
        candidate = self._get_bin_record(middle)[0]
        if candidate < target:
            low = middle + 1
        else:
            high = middle
    return low
def _find(self, pid_str: str) -> Tuple[int, int]:
    """lookup the integer identifier of a pid and its position

    Args:
        pid_str: the pid as a string

    Returns:
        a pair `(value, pos)` with the integer identifier stored for the pid
        and the logical record position of the pid in the map

    Raises:
        KeyError: if the record at the bisected position holds another pid

    """
    pos = self._bisect_pos(pid_str)
    (found_pid, value) = self._get_record(pos)
    if found_pid != pid_str:
        raise KeyError(pid_str)
    return (value, pos)
def __getitem__(self, pid_str: str) -> int:
"""lookup the integer identifier of a PID
Args:
pid: the PID as a string
Returns:
the integer identifier of pid
"""
return self._find(pid_str)[0] # return element, ignore position
def __setitem__(self, pid_str: str, int: int) -> None:
    """update the integer identifier associated to an existing PID

    Args:
        pid_str: the PID as a string; it must already be in the map
        int: the new integer identifier, packed using INT_BIN_FMT

    Raises:
        KeyError: if pid_str is not in the map

    """
    (_pid, pos) = self._find(pid_str)  # might raise KeyError and that's OK
    rec_pos = pos * self.RECORD_SIZE
    int_pos = rec_pos + PID_BIN_SIZE
    # the record is rewritten in full: PID bytes first, then the integer
    self.mm[rec_pos:int_pos] = str_to_bytes(pid_str)
    self.mm[int_pos:int_pos+INT_BIN_SIZE] = struct.pack(INT_BIN_FMT, int)
def __iter__(self) -> Iterator[Tuple[str, int]]:
for pos in range(self.length):
yield self._get_record(pos)
def iter_prefix(self, prefix: str):
    """iterate over the records whose textual PID starts with a prefix

    The prefix must be made of four colon-separated fields; the last one
    (possibly empty) is right-padded with '0' to 40 characters to obtain
    the PID the scan starts from.

    Args:
        prefix: textual PID prefix

    Yields:
        pairs `(pid, value)`, in map order, up to the first record whose PID
        does not start with prefix

    """
    swh, n, t, sha = prefix.split(':')
    first_pid = ':'.join([swh, n, t, sha.ljust(40, '0')])
    for pos in range(self._bisect_pos(first_pid), self.length):
        pid, value = self._get_record(pos)
        if not pid.startswith(prefix):
            return
        yield pid, value
def iter_type(self, pid_type: str) -> Iterator[Tuple[str, int]]:
prefix = 'swh:1:{}:'.format(pid_type)
yield from self.iter_prefix(prefix)
-class IntToPidMap(_OnDiskMap, MutableMapping):
+class NodeToPidMap(_OnDiskMap, MutableMapping):
"""memory mapped map from a continuous range of 0..N (8-byte long) integers to
PIDs (:ref:`persistent-identifiers`)
- This is the converse mapping of :class:`PidToIntMap`.
+ This is the converse mapping of :class:`PidToNodeMap`.
The on-disk serialization format is a sequence of fixed length records (22
bytes), each being the binary representation of a PID as per
:func:`str_to_bytes`.
The records are sorted by long integer, so that integer lookup is possible
via fixed-offset seek.
"""
RECORD_BIN_FMT = PID_BIN_FMT
RECORD_SIZE = PID_BIN_SIZE
def __init__(self, fname: str, mode: str = 'rb', length: int = None):
    """open an existing on-disk map

    Args:
        fname: path to the on-disk map
        mode: file open mode, usually either 'rb' for read-only maps, 'wb'
            for creating new maps, or 'rb+' for updating existing ones
            (default: 'rb')
        length: map size in number of logical records; used to initialize
            read-write maps at creation time. Must be given when mode is
            'wb'; ignored otherwise. Passed to :class:`_OnDiskMap`

    """
    super().__init__(self.RECORD_SIZE, fname, mode=mode, length=length)
def _get_bin_record(self, pos: int) -> bytes:
"""seek and return the (binary) PID at a given (logical) position
Args:
pos: 0-based record number
Returns:
PID as a byte sequence
"""
rec_pos = pos * self.RECORD_SIZE
return self.mm[rec_pos:rec_pos+self.RECORD_SIZE]
@classmethod
def write_record(cls, f: BinaryIO, pid: str) -> None:
"""write a PID to a file-like object
Args:
f: file-like object to write the record to
pid: textual PID
"""
f.write(str_to_bytes(pid))
def __getitem__(self, pos: int) -> str:
    """return the textual PID stored at a given position

    Args:
        pos: record number; negative values count from the end of the map

    Raises:
        IndexError: if pos falls outside of the map

    """
    total = len(self)
    index = pos + total if pos < 0 else pos
    if index < 0 or index >= total:
        raise IndexError(pos)
    return bytes_to_str(self._get_bin_record(index))
def __setitem__(self, pos: int, pid: str) -> None:
    """store a PID at a given position

    Positions are interpreted as in :meth:`__getitem__`: negative values
    count from the end of the map.

    Args:
        pos: record number
        pid: textual PID, serialized via :func:`str_to_bytes`

    Raises:
        IndexError: if pos falls outside of the map

    """
    orig_pos = pos
    if pos < 0:
        pos = len(self) + pos
    # check explicitly: an out-of-range slice would otherwise only fail with
    # an opaque "wrong size" error from the mmap slice assignment
    if not (0 <= pos < len(self)):
        raise IndexError(orig_pos)

    rec_pos = pos * self.RECORD_SIZE
    self.mm[rec_pos:rec_pos+self.RECORD_SIZE] = str_to_bytes(pid)
def __iter__(self) -> Iterator[Tuple[int, str]]:
for pos in range(self.length):
yield (pos, self[pos])
diff --git a/swh/graph/tests/test_pid.py b/swh/graph/tests/test_pid.py
index 1cd1afd..69cb927 100644
--- a/swh/graph/tests/test_pid.py
+++ b/swh/graph/tests/test_pid.py
@@ -1,201 +1,201 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import shutil
import tempfile
import unittest
from itertools import islice
from swh.graph.pid import str_to_bytes, bytes_to_str
-from swh.graph.pid import PidToIntMap, IntToPidMap
+from swh.graph.pid import PidToNodeMap, NodeToPidMap
from swh.model.identifiers import PID_TYPES
class TestPidSerialization(unittest.TestCase):
pairs = [
('swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2',
bytes.fromhex('01' + '00' +
'94a9ed024d3859793618152ea559a168bbcbb5e2')),
('swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505',
bytes.fromhex('01' + '01' +
'd198bc9d7a6bcf6db04f476d29314f157507d505')),
('swh:1:ori:b63a575fe3faab7692c9f38fb09d4bb45651bb0f',
bytes.fromhex('01' + '02' +
'b63a575fe3faab7692c9f38fb09d4bb45651bb0f')),
('swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f',
bytes.fromhex('01' + '03' +
'22ece559cc7cc2364edc5e5593d63ae8bd229f9f')),
('swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d',
bytes.fromhex('01' + '04' +
'309cf2674ee7a0749978cf8265ab91a60aea0f7d')),
('swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453',
bytes.fromhex('01' + '05' +
'c7c108084bc0bf3d81436bf980b46e98bd338453')),
]
def test_str_to_bytes(self):
for (pid_str, pid_bytes) in self.pairs:
self.assertEqual(str_to_bytes(pid_str), pid_bytes)
def test_bytes_to_str(self):
for (pid_str, pid_bytes) in self.pairs:
self.assertEqual(bytes_to_str(pid_bytes), pid_str)
def test_round_trip(self):
for (pid_str, pid_bytes) in self.pairs:
self.assertEqual(pid_str, bytes_to_str(str_to_bytes(pid_str)))
self.assertEqual(pid_bytes, str_to_bytes(bytes_to_str(pid_bytes)))
def gen_records(types=['cnt', 'dir', 'ori', 'rel', 'rev', 'snp'],
                length=10000):
    """generate sequential PID/int records, to be used for filling the
    int<->pid maps when testing swh-graph on-disk binary databases

    Types are visited in sorted order and the integer counter keeps growing
    across types; the hash part of each PID is the counter itself, written
    in hexadecimal and left-padded with zeros to 40 digits.

    Args:
        types (list): PID types to generate, each given as the 3-letter
            type component of a PID
        length (int): how many PIDs to generate *for each type*

    Yields:
        pairs (pid, int), where pid is a textual PID and int is its
        sequential integer identifier

    """
    for (rank, pid_type) in enumerate(sorted(types)):
        first = rank * length  # identifier of the first PID of this type
        for ident in range(first, first + length):
            yield ('swh:1:{}:{:040x}'.format(pid_type, ident), ident)
# sample (PID, position) pairs taken from the sequence that
# :func:`gen_records` above generates with its default arguments; the hash
# part of each PID is its position, in hexadecimal, zero-padded to 40 digits
MAP_PAIRS = [
    ('swh:1:{}:{:040x}'.format(pid_type, position), position)
    for (pid_type, position) in [
        ('cnt', 0),
        ('cnt', 42),
        ('dir', 11004),
        ('ori', 22222),
        ('rel', 33333),
        ('rev', 44444),
        ('snp', 59999),
    ]
]
-class TestPidToIntMap(unittest.TestCase):
+class TestPidToNodeMap(unittest.TestCase):
# Tests for the on-disk PID -> int map; the -/+ lines of this diff rename the
# class under test from PidToIntMap to PidToNodeMap (and this test class
# accordingly), leaving the test logic itself untouched.
@classmethod
def setUpClass(cls):
"""create reasonably sized (~2 MB) PID->int map to test on-disk DB
"""
# one fixture file shared by all tests of this class, filled with the
# records produced by gen_records (10000 PIDs per type)
cls.tmpdir = tempfile.mkdtemp(prefix='swh.graph.test.')
cls.fname = os.path.join(cls.tmpdir, 'pid2int.bin')
with open(cls.fname, 'wb') as f:
for (pid, i) in gen_records(length=10000):
- PidToIntMap.write_record(f, pid, i)
+ PidToNodeMap.write_record(f, pid, i)
@classmethod
def tearDownClass(cls):
# remove the temporary directory, including the fixture file in it
shutil.rmtree(cls.tmpdir)
def setUp(self):
# each test opens the shared fixture file anew (no explicit mode given)
- self.map = PidToIntMap(self.fname)
+ self.map = PidToNodeMap(self.fname)
def tearDown(self):
self.map.close()
def test_lookup(self):
# every sample (PID, position) pair must be found in the map
for (pid, pos) in MAP_PAIRS:
self.assertEqual(self.map[pid], pos)
def test_missing(self):
# PIDs that gen_records never generates must raise KeyError
# NOTE(review): the trailing commas below turn each lookup into a 1-tuple
# expression; harmless (the lookup is still evaluated) but likely unintended
with self.assertRaises(KeyError):
self.map['swh:1:ori:0101010100000000000000000000000000000000'],
with self.assertRaises(KeyError):
self.map['swh:1:cnt:0101010100000000000000000000000000000000'],
def test_type_error(self):
# int and float keys must be rejected with TypeError
with self.assertRaises(TypeError):
self.map[42]
with self.assertRaises(TypeError):
self.map[1.2]
def test_update(self):
# work on a private copy of the fixture, opened in 'rb+' mode, so that the
# shared file is not modified
# NOTE(review): map2 is never closed before the file is unlinked, and the
# loop variable `int` shadows the builtin of the same name
fname2 = self.fname + '.update'
shutil.copy(self.fname, fname2) # fresh map copy
- map2 = PidToIntMap(fname2, mode='rb+')
+ map2 = PidToNodeMap(fname2, mode='rb+')
for (pid, int) in islice(map2, 11): # update the first N items
new_int = int + 42
map2[pid] = new_int
self.assertEqual(map2[pid], new_int) # check updated value
os.unlink(fname2) # tmpdir will be cleaned even if we don't reach this
def test_iter_type(self):
# for each PID type, the first 20 items returned by iter_type() must be
# (PID, int) pairs with consecutive ints, starting at the int of the first
# item, and with each PID's hash part being its int as 40 hex digits
# NOTE(review): uses a bare `assert` (skipped when Python runs with -O)
# rather than a unittest assertion method
for t in PID_TYPES:
first_20 = list(islice(self.map.iter_type(t), 20))
k = first_20[0][1]
expected = [('swh:1:{}:{:040x}'.format(t, i), i)
for i in range(k, k + 20)]
assert first_20 == expected
def test_iter_prefix(self):
# same check as test_iter_type, but selecting the items with iter_prefix()
# and the textual prefix 'swh:1:<type>:00'
# NOTE(review): uses a bare `assert` (skipped when Python runs with -O)
# rather than a unittest assertion method
for t in PID_TYPES:
prefix = self.map.iter_prefix('swh:1:{}:00'.format(t))
first_20 = list(islice(prefix, 20))
k = first_20[0][1]
expected = [('swh:1:{}:{:040x}'.format(t, i), i)
for i in range(k, k + 20)]
assert first_20 == expected
-class TestIntToPidMap(unittest.TestCase):
+class TestNodeToPidMap(unittest.TestCase):
# Tests for the on-disk int -> PID map; the -/+ lines of this diff rename the
# class under test from IntToPidMap to NodeToPidMap (and this test class
# accordingly), leaving the test logic itself untouched.
@classmethod
def setUpClass(cls):
"""create reasonably sized (~1 MB) int->PID map to test on-disk DB
"""
# one fixture file shared by all tests of this class; only the PID of each
# generated record is written, the int being discarded (_i)
cls.tmpdir = tempfile.mkdtemp(prefix='swh.graph.test.')
cls.fname = os.path.join(cls.tmpdir, 'int2pid.bin')
with open(cls.fname, 'wb') as f:
for (pid, _i) in gen_records(length=10000):
- IntToPidMap.write_record(f, pid)
+ NodeToPidMap.write_record(f, pid)
@classmethod
def tearDownClass(cls):
# remove the temporary directory, including the fixture file in it
shutil.rmtree(cls.tmpdir)
def setUp(self):
# each test opens the shared fixture file anew (no explicit mode given)
- self.map = IntToPidMap(self.fname)
+ self.map = NodeToPidMap(self.fname)
def tearDown(self):
self.map.close()
def test_lookup(self):
# every sample position must map back to its PID
for (pid, pos) in MAP_PAIRS:
self.assertEqual(self.map[pos], pid)
def test_out_of_bounds(self):
# positions far beyond either end of the map must raise IndexError
with self.assertRaises(IndexError):
self.map[1000000]
with self.assertRaises(IndexError):
self.map[-1000000]
def test_update(self):
# work on a private copy of the fixture, opened in 'rb+' mode, so that the
# shared file is not modified
# NOTE(review): map2 is never closed before the file is unlinked, and the
# loop variable `int` shadows the builtin of the same name
fname2 = self.fname + '.update'
shutil.copy(self.fname, fname2) # fresh map copy
- map2 = IntToPidMap(fname2, mode='rb+')
+ map2 = NodeToPidMap(fname2, mode='rb+')
for (int, pid) in islice(map2, 11): # update the first N items
new_pid = pid.replace(':0', ':f') # mangle first hex digit
map2[int] = new_pid
self.assertEqual(map2[int], new_pid) # check updated value
os.unlink(fname2) # tmpdir will be cleaned even if we don't reach this
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Thu, Jul 3, 10:50 AM (1 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3212285
Attached To
rDGRPH Compressed graph representation
Event Timeline
Log In to Comment