path);
+}
diff --git a/java/server/src/main/java/org/softwareheritage/graph/algo/StreamTraversal.java b/java/server/src/main/java/org/softwareheritage/graph/algo/StreamTraversal.java
new file mode 100644
index 0000000..a22c7a3
--- /dev/null
+++ b/java/server/src/main/java/org/softwareheritage/graph/algo/StreamTraversal.java
@@ -0,0 +1,77 @@
+package org.softwareheritage.graph.algo;
+
+import java.util.function.BiConsumer;
+
+import org.softwareheritage.graph.Graph;
+import org.softwareheritage.graph.algo.NodeIdsConsumer;
+import org.softwareheritage.graph.algo.Traversal;
+
+/** Traversal algorithm implementations that stream results to the caller in
+ * batch, to avoid loading in memory (potentially very) large results at once.
+ *
+ * @author The Software Heritage developers
+ * @see org.softwareheritage.graph.algo.Traversal
+ */
+
+public class StreamTraversal extends Traversal {
+
+ static int DEFAULT_BUFFER_SIZE = 16_777_216; // 128 MB / sizeof(long)
+
+ /** number of node identifiers to buffer before returning to client */
+ private int bufferSize;
+
+ /** node identifier buffer */
+ private long[] buffer;
+
+ /** current position in the buffer */
+ private int bufferPos;
+
+ /** callback to be fired when buffer is full (or processing is done) */
+ private NodeIdsConsumer cb;
+
+ public StreamTraversal(Graph graph, String direction, String edgesFmt) {
+ this(graph, direction, edgesFmt, DEFAULT_BUFFER_SIZE);
+ }
+
+ public StreamTraversal(Graph graph, String direction, String edgesFmt, int bufferSize) {
+ super(graph, direction, edgesFmt);
+
+ this.buffer = new long[bufferSize];
+ this.bufferSize = bufferSize;
+ this.bufferPos = 0;
+ }
+
+ private void bufferize(long nodeId) {
+ buffer[bufferPos++] = nodeId;
+ if (bufferPos == bufferSize) {
+ cb.accept(buffer, bufferPos);
+ bufferPos = 0;
+ }
+ }
+
+ private void flush() {
+ if (bufferPos > 0) {
+ cb.accept(buffer, bufferPos);
+ bufferPos = 0;
+ }
+ }
+
+ public void leaves(long srcNodeId, NodeIdsConsumer cb) {
+ this.cb = cb;
+ this.leavesVisitor(srcNodeId, (nodeId) -> this.bufferize(nodeId));
+ this.flush();
+ }
+
+ public void neighbors(long srcNodeId, NodeIdsConsumer cb) {
+ this.cb = cb;
+ this.neighborsVisitor(srcNodeId, (nodeId) -> this.bufferize(nodeId));
+ this.flush();
+ }
+
+ public void visitNodes(long srcNodeId, NodeIdsConsumer cb) {
+ this.cb = cb;
+ this.visitNodesVisitor(srcNodeId, (nodeId) -> this.bufferize(nodeId));
+ this.flush();
+ }
+
+}
diff --git a/java/server/src/main/java/org/softwareheritage/graph/algo/Traversal.java b/java/server/src/main/java/org/softwareheritage/graph/algo/Traversal.java
index fe0f56f..9aecce8 100644
--- a/java/server/src/main/java/org/softwareheritage/graph/algo/Traversal.java
+++ b/java/server/src/main/java/org/softwareheritage/graph/algo/Traversal.java
@@ -1,328 +1,353 @@
package org.softwareheritage.graph.algo;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Stack;
import it.unimi.dsi.bits.LongArrayBitVector;
import org.softwareheritage.graph.AllowedEdges;
import org.softwareheritage.graph.Endpoint;
import org.softwareheritage.graph.Graph;
import org.softwareheritage.graph.Neighbors;
import org.softwareheritage.graph.Node;
+import org.softwareheritage.graph.algo.NodeIdConsumer;
+import org.softwareheritage.graph.algo.PathConsumer;
/**
* Traversal algorithms on the compressed graph.
*
* Internal implementation of the traversal API endpoints. These methods only input/output internal
* long ids, which are converted in the {@link Endpoint} higher-level class to Software Heritage
* PID.
*
* @author The Software Heritage developers
* @see org.softwareheritage.graph.Endpoint
*/
public class Traversal {
/** Graph used in the traversal */
Graph graph;
/** Boolean to specify the use of the transposed graph */
boolean useTransposed;
/** Graph edge restriction */
AllowedEdges edges;
/** Bit array storing if we have visited a node */
LongArrayBitVector visited;
/** Hash map storing parent node id for each nodes during a traversal */
Map parentNode;
/** Number of edges accessed during traversal */
long nbEdgesAccessed;
/**
* Constructor.
*
* @param graph graph used in the traversal
* @param direction a string (either "forward" or "backward") specifying edge orientation
* @param edgesFmt a formatted string describing allowed edges
*/
public Traversal(Graph graph, String direction, String edgesFmt) {
if (!direction.matches("forward|backward")) {
throw new IllegalArgumentException("Unknown traversal direction: " + direction);
}
this.graph = graph;
this.useTransposed = (direction.equals("backward"));
this.edges = new AllowedEdges(graph, edgesFmt);
long nbNodes = graph.getNbNodes();
this.visited = LongArrayBitVector.ofLength(nbNodes);
this.parentNode = new HashMap<>();
this.nbEdgesAccessed = 0;
}
/**
* Returns number of accessed edges during traversal.
*
* @return number of edges accessed in last traversal
*/
public long getNbEdgesAccessed() {
return nbEdgesAccessed;
}
/**
- * Returns the leaves of a subgraph rooted at the specified source node.
- *
- * @param srcNodeId source node
- * @return list of node ids corresponding to the leaves
+ * Push version of {@link leaves}: will fire passed callback for each leaf.
*/
- public ArrayList leaves(long srcNodeId) {
- ArrayList nodeIds = new ArrayList();
+ public void leavesVisitor(long srcNodeId, NodeIdConsumer cb) {
Stack stack = new Stack();
this.nbEdgesAccessed = 0;
stack.push(srcNodeId);
visited.set(srcNodeId);
while (!stack.isEmpty()) {
long currentNodeId = stack.pop();
long neighborsCnt = 0;
nbEdgesAccessed += graph.degree(currentNodeId, useTransposed);
for (long neighborNodeId : new Neighbors(graph, useTransposed, edges, currentNodeId)) {
neighborsCnt++;
if (!visited.getBoolean(neighborNodeId)) {
stack.push(neighborNodeId);
visited.set(neighborNodeId);
}
}
if (neighborsCnt == 0) {
- nodeIds.add(currentNodeId);
+ cb.accept(currentNodeId);
}
}
-
- return nodeIds;
}
/**
- * Returns node direct neighbors (linked with exactly one edge).
+ * Returns the leaves of a subgraph rooted at the specified source node.
*
* @param srcNodeId source node
- * @return list of node ids corresponding to the neighbors
+ * @return list of node ids corresponding to the leaves
*/
- public ArrayList neighbors(long srcNodeId) {
+ public ArrayList leaves(long srcNodeId) {
ArrayList nodeIds = new ArrayList();
+ leavesVisitor(srcNodeId, (nodeId) -> nodeIds.add(nodeId));
+ return nodeIds;
+ }
+
+ /**
+ * Push version of {@link neighbors}: will fire passed callback on each
+ * neighbor.
+ */
+ public void neighborsVisitor(long srcNodeId, NodeIdConsumer cb) {
this.nbEdgesAccessed = graph.degree(srcNodeId, useTransposed);
for (long neighborNodeId : new Neighbors(graph, useTransposed, edges, srcNodeId)) {
- nodeIds.add(neighborNodeId);
+ cb.accept(neighborNodeId);
}
- return nodeIds;
}
/**
- * Performs a graph traversal and returns explored nodes.
+ * Returns node direct neighbors (linked with exactly one edge).
*
* @param srcNodeId source node
- * @return list of explored node ids
+ * @return list of node ids corresponding to the neighbors
*/
- public ArrayList visitNodes(long srcNodeId) {
+ public ArrayList neighbors(long srcNodeId) {
ArrayList nodeIds = new ArrayList();
+ neighborsVisitor(srcNodeId, (nodeId) -> nodeIds.add(nodeId));
+ return nodeIds;
+ }
+
+ /**
+ * Push version of {@link visitNodes}: will fire passed callback on each
+ * visited node.
+ */
+ public void visitNodesVisitor(long srcNodeId, NodeIdConsumer cb) {
Stack stack = new Stack();
this.nbEdgesAccessed = 0;
stack.push(srcNodeId);
visited.set(srcNodeId);
while (!stack.isEmpty()) {
long currentNodeId = stack.pop();
- nodeIds.add(currentNodeId);
+ cb.accept(currentNodeId);
nbEdgesAccessed += graph.degree(currentNodeId, useTransposed);
for (long neighborNodeId : new Neighbors(graph, useTransposed, edges, currentNodeId)) {
if (!visited.getBoolean(neighborNodeId)) {
stack.push(neighborNodeId);
visited.set(neighborNodeId);
}
}
}
+ }
- return nodeIds;
+ /**
+ * Performs a graph traversal and returns explored nodes.
+ *
+ * @param srcNodeId source node
+ * @return list of explored node ids
+ */
+ public ArrayList visitNodes(long srcNodeId) {
+ ArrayList nodeIds = new ArrayList();
+ visitNodesVisitor(srcNodeId, (nodeId) -> nodeIds.add(nodeId));
+ return nodeIds;
+ }
+
+ /**
+ * Push version of {@link visitPaths}: will fire passed callback on each
+ * discovered (complete) path.
+ */
+ public void visitPathsVisitor(long srcNodeId, PathConsumer cb) {
+ Stack currentPath = new Stack();
+ this.nbEdgesAccessed = 0;
+ visitPathsInternalVisitor(srcNodeId, currentPath, cb);
}
/**
* Performs a graph traversal and returns explored paths.
*
* @param srcNodeId source node
* @return list of explored paths (represented as a list of node ids)
*/
public ArrayList> visitPaths(long srcNodeId) {
ArrayList> paths = new ArrayList<>();
- Stack currentPath = new Stack();
- this.nbEdgesAccessed = 0;
- visitPathsInternal(srcNodeId, paths, currentPath);
+ visitPathsVisitor(srcNodeId, (path) -> paths.add(path));
return paths;
}
- /**
- * Internal recursive function of {@link #visitPaths}.
- *
- * @param currentNodeId current node
- * @param paths list of currently stored paths
- * @param currentPath current path as node ids
- */
- private void visitPathsInternal(
- long currentNodeId, ArrayList> paths, Stack currentPath) {
+ private void visitPathsInternalVisitor(long currentNodeId,
+ Stack currentPath,
+ PathConsumer cb) {
currentPath.push(currentNodeId);
long visitedNeighbors = 0;
nbEdgesAccessed += graph.degree(currentNodeId, useTransposed);
for (long neighborNodeId : new Neighbors(graph, useTransposed, edges, currentNodeId)) {
- visitPathsInternal(neighborNodeId, paths, currentPath);
+ visitPathsInternalVisitor(neighborNodeId, currentPath, cb);
visitedNeighbors++;
}
if (visitedNeighbors == 0) {
ArrayList path = new ArrayList();
for (long nodeId : currentPath) {
path.add(nodeId);
}
- paths.add(path);
+ cb.accept(path);
}
currentPath.pop();
}
/**
* Performs a graph traversal and returns the first found path from source to destination.
*
* @param srcNodeId source node
* @param dst destination (either a node or a node type)
* @return found path as a list of node ids
*/
public ArrayList walk(long srcNodeId, T dst, String algorithm) {
long dstNodeId = -1;
if (algorithm.equals("dfs")) {
dstNodeId = walkInternalDfs(srcNodeId, dst);
} else if (algorithm.equals("bfs")) {
dstNodeId = walkInternalBfs(srcNodeId, dst);
} else {
throw new IllegalArgumentException("Unknown traversal algorithm: " + algorithm);
}
if (dstNodeId == -1) {
throw new IllegalArgumentException("Unable to find destination point: " + dst);
}
ArrayList nodeIds = backtracking(srcNodeId, dstNodeId);
return nodeIds;
}
/**
* Internal DFS function of {@link #walk}.
*
* @param srcNodeId source node
* @param dst destination (either a node or a node type)
* @return final destination node or -1 if no path found
*/
private long walkInternalDfs(long srcNodeId, T dst) {
Stack stack = new Stack();
this.nbEdgesAccessed = 0;
stack.push(srcNodeId);
visited.set(srcNodeId);
while (!stack.isEmpty()) {
long currentNodeId = stack.pop();
if (isDstNode(currentNodeId, dst)) {
return currentNodeId;
}
nbEdgesAccessed += graph.degree(currentNodeId, useTransposed);
for (long neighborNodeId : new Neighbors(graph, useTransposed, edges, currentNodeId)) {
if (!visited.getBoolean(neighborNodeId)) {
stack.push(neighborNodeId);
visited.set(neighborNodeId);
parentNode.put(neighborNodeId, currentNodeId);
}
}
}
return -1;
}
/**
* Internal BFS function of {@link #walk}.
*
* @param srcNodeId source node
* @param dst destination (either a node or a node type)
* @return final destination node or -1 if no path found
*/
private long walkInternalBfs(long srcNodeId, T dst) {
Queue queue = new LinkedList();
this.nbEdgesAccessed = 0;
queue.add(srcNodeId);
visited.set(srcNodeId);
while (!queue.isEmpty()) {
long currentNodeId = queue.poll();
if (isDstNode(currentNodeId, dst)) {
return currentNodeId;
}
nbEdgesAccessed += graph.degree(currentNodeId, useTransposed);
for (long neighborNodeId : new Neighbors(graph, useTransposed, edges, currentNodeId)) {
if (!visited.getBoolean(neighborNodeId)) {
queue.add(neighborNodeId);
visited.set(neighborNodeId);
parentNode.put(neighborNodeId, currentNodeId);
}
}
}
return -1;
}
/**
* Internal function of {@link #walk} to check if a node corresponds to the destination.
*
* @param nodeId current node
* @param dst destination (either a node or a node type)
* @return true if the node is a destination, or false otherwise
*/
private boolean isDstNode(long nodeId, T dst) {
if (dst instanceof Long) {
long dstNodeId = (Long) dst;
return nodeId == dstNodeId;
} else if (dst instanceof Node.Type) {
Node.Type dstType = (Node.Type) dst;
return graph.getNodeType(nodeId) == dstType;
} else {
return false;
}
}
/**
* Internal backtracking function of {@link #walk}.
*
* @param srcNodeId source node
* @param dstNodeId destination node
* @return the found path, as a list of node ids
*/
private ArrayList backtracking(long srcNodeId, long dstNodeId) {
ArrayList path = new ArrayList();
long currentNodeId = dstNodeId;
while (currentNodeId != srcNodeId) {
path.add(currentNodeId);
currentNodeId = parentNode.get(currentNodeId);
}
path.add(srcNodeId);
Collections.reverse(path);
return path;
}
}
diff --git a/java/server/src/main/java/org/softwareheritage/graph/backend/Pp.java b/java/server/src/main/java/org/softwareheritage/graph/backend/Pp.java
new file mode 100644
index 0000000..aa7ab3c
--- /dev/null
+++ b/java/server/src/main/java/org/softwareheritage/graph/backend/Pp.java
@@ -0,0 +1,42 @@
+package org.softwareheritage.graph.backend;
+
+import java.io.BufferedWriter;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Writer;
+import java.util.zip.GZIPInputStream;
+
+import it.unimi.dsi.bits.LongArrayBitVector;
+import it.unimi.dsi.fastutil.Size64;
+import it.unimi.dsi.fastutil.io.BinIO;
+import it.unimi.dsi.fastutil.longs.LongBigArrays;
+import it.unimi.dsi.fastutil.longs.LongBigList;
+import it.unimi.dsi.fastutil.objects.Object2LongFunction;
+import it.unimi.dsi.fastutil.objects.ObjectBigArrays;
+import it.unimi.dsi.io.FastBufferedReader;
+import it.unimi.dsi.io.LineIterator;
+
+import org.softwareheritage.graph.Graph;
+import org.softwareheritage.graph.Node;
+import org.softwareheritage.graph.SwhPID;
+import org.softwareheritage.graph.backend.NodeTypesMap;
+
+public class Pp {
+
+ public static void main(String[] args) throws IOException {
+
+ Object2LongFunction mphMap = null;
+ try {
+ mphMap = (Object2LongFunction) BinIO.loadObject("all.mph");
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("The .mph file contains unknown class object: " + e);
+ }
+
+ long nbIds = (mphMap instanceof Size64) ? ((Size64) mphMap).size64() : mphMap.size();
+
+ System.out.println("mph size: " + nbIds);
+ }
+}
diff --git a/java/server/src/main/java/org/softwareheritage/graph/py4jcli.py b/java/server/src/main/java/org/softwareheritage/graph/py4jcli.py
new file mode 100755
index 0000000..3b133ea
--- /dev/null
+++ b/java/server/src/main/java/org/softwareheritage/graph/py4jcli.py
@@ -0,0 +1,47 @@
+#!/usr/bin/python3
+
+import os
+import struct
+import sys
+import tempfile
+
+from concurrent.futures import ThreadPoolExecutor
+
+from py4j.java_gateway import JavaGateway, GatewayParameters
+
+
+GATEWAY_SERVER_PORT = 25333
+
+BUF_SIZE = 64*1024
+BIN_FMT = '>q' # 64 bit integer, big endian
+
+
+def print_node_ids(fname):
+ with open(fname, 'rb') as f:
+ data = f.read(BUF_SIZE)
+ while(data):
+ for data in struct.iter_unpack(BIN_FMT, data):
+ print(data[0])
+ data = f.read(BUF_SIZE)
+
+
+if __name__ == '__main__':
+ try:
+ node_id = int(sys.argv[1])
+ except IndexError:
+ print('Usage: py4jcli NODE_ID')
+ sys.exit(1)
+
+ gw_params = GatewayParameters(port=GATEWAY_SERVER_PORT)
+ gateway = JavaGateway(gateway_parameters=gw_params)
+
+ with tempfile.TemporaryDirectory() as tmpdirname:
+ cli_fifo = os.path.join(tmpdirname, 'swh-graph.fifo')
+ os.mkfifo(cli_fifo)
+
+ with ThreadPoolExecutor(max_workers=1) as executor:
+ future = executor.submit(print_node_ids, cli_fifo)
+ gateway.entry_point.visit(node_id, 'forward', '*', cli_fifo)
+ _result = future.result()
+
+ gateway.shutdown()
diff --git a/java/server/src/main/java/org/softwareheritage/graph/t/WriteLong.java b/java/server/src/main/java/org/softwareheritage/graph/t/WriteLong.java
new file mode 100644
index 0000000..547011a
--- /dev/null
+++ b/java/server/src/main/java/org/softwareheritage/graph/t/WriteLong.java
@@ -0,0 +1,24 @@
+import java.io.DataOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+public class WriteLong {
+ public static void main(String args[]) {
+ String filename = null;
+ try {
+ filename = args[0];
+ FileOutputStream file = new FileOutputStream(filename);
+ DataOutputStream data = new DataOutputStream(file);
+ while (true) {
+ data.writeLong(Long.parseLong(args[1]));
+ }
+ //data.close();
+ } catch (IOException e) {
+ System.out.println("cannot write to file " + filename + "\n" + e);
+ System.exit(2);
+ } catch (ArrayIndexOutOfBoundsException e) {
+ System.out.println("Usage: Writer FILENAME INT");
+ System.exit(1);
+ }
+ }
+}
diff --git a/java/server/src/main/java/org/softwareheritage/graph/t/aiohttp_rest.py b/java/server/src/main/java/org/softwareheritage/graph/t/aiohttp_rest.py
new file mode 100644
index 0000000..87c89a3
--- /dev/null
+++ b/java/server/src/main/java/org/softwareheritage/graph/t/aiohttp_rest.py
@@ -0,0 +1,43 @@
+#!/usr/bin/env python3
+
+# Copyright (C) 2018 Antoine Pietri
+# SPDX-License-Identifier: MIT
+
+import argparse
+import aiohttp
+import aiohttp.web
+import hashutil
+
+
+async def hello(request):
+ return aiohttp.web.json_response(
+ {'hi': 'hello'}, headers={'Access-Control-Allow-Origin': '*'})
+
+
+async def make_app():
+ app = aiohttp.web.Application()
+ app.add_routes([
+ aiohttp.web.get('/hello', hello)])
+ return app
+
+
+async def get_stream(request): # from objstorage
+ hex_id = request.match_info['hex_id']
+ obj_id = hashutil.hash_to_bytes(hex_id)
+ response = aiohttp.web.StreamResponse()
+ await response.prepare(request)
+ for chunk in request.app['objstorage'].get_stream(obj_id, 2 << 20):
+ await response.write(chunk)
+ await response.write_eof()
+ return response
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(
+ description='Test')
+ parser.add_argument('--host', default='127.0.0.1', help='Bind address')
+ parser.add_argument('--port', default=9012, help='Bind port')
+
+ args = parser.parse_args()
+
+ aiohttp.web.run_app(make_app(), host=args.host, port=args.port)
diff --git a/java/server/src/main/java/org/softwareheritage/graph/t/data.bin b/java/server/src/main/java/org/softwareheritage/graph/t/data.bin
new file mode 100644
index 0000000..2580cea
Binary files /dev/null and b/java/server/src/main/java/org/softwareheritage/graph/t/data.bin differ
diff --git a/java/server/src/main/java/org/softwareheritage/graph/t/read_long.py b/java/server/src/main/java/org/softwareheritage/graph/t/read_long.py
new file mode 100755
index 0000000..c353798
--- /dev/null
+++ b/java/server/src/main/java/org/softwareheritage/graph/t/read_long.py
@@ -0,0 +1,24 @@
+#!/usr/bin/python3
+
+import struct
+import sys
+
+BUF_SIZE = 64*1024
+BIN_FMT = '>q' # 64 bit integer, big endian
+
+
+def main(fname):
+ with open(fname, 'rb') as f:
+ data = f.read(BUF_SIZE)
+ while(data):
+ for data in struct.iter_unpack(BIN_FMT, data):
+ print(data[0])
+ data = f.read(BUF_SIZE)
+
+
+if __name__ == '__main__':
+ try:
+ main(sys.argv[1])
+ except IndexError:
+ print('Usage: read_long FILENAME')
+ sys.exit(1)