res;
+ Traversal t = new Traversal(graph, direction, edgesFmt, maxEdges, returnTypes);
+ if (dst.matches("ori|snp|rel|rev|dir|cnt")) {
+ Node.Type dstType = Node.Type.fromStr(dst);
+ res = t.randomWalk(srcNodeId, dstType, retries);
+ } else {
+ long dstNodeId = graph.getNodeId(new SWHID(dst));
+ res = t.randomWalk(srcNodeId, dstNodeId, retries);
}
- close();
- }
-
- public void random_walk_type(String direction, String edgesFmt, int retries, long srcNodeId, String dst,
- String returnTypes) {
- open();
- Node.Type dstType = Node.Type.fromStr(dst);
- Traversal t = new Traversal(this.graph, direction, edgesFmt, 0, returnTypes);
- for (Long nodeId : t.randomWalk(srcNodeId, dstType, retries)) {
- writeNode(nodeId);
+ for (Long nodeId : res) {
+ writeNode(graph.getSWHID(nodeId));
}
close();
}
}
}
diff --git a/java/src/main/java/org/softwareheritage/graph/Graph.java b/java/src/main/java/org/softwareheritage/graph/Graph.java
index c5f5513..8d9acf1 100644
--- a/java/src/main/java/org/softwareheritage/graph/Graph.java
+++ b/java/src/main/java/org/softwareheritage/graph/Graph.java
@@ -1,310 +1,304 @@
package org.softwareheritage.graph;
import it.unimi.dsi.big.webgraph.ImmutableGraph;
import it.unimi.dsi.big.webgraph.LazyLongIterator;
-import it.unimi.dsi.big.webgraph.Transform;
import it.unimi.dsi.logging.ProgressLogger;
import org.softwareheritage.graph.maps.NodeIdMap;
import org.softwareheritage.graph.maps.NodeTypesMap;
import java.io.IOException;
/**
* Main class storing the compressed graph and node id mappings.
*
* The compressed graph is stored using the WebGraph
* ecosystem. Additional mappings are necessary because Software Heritage uses string based persistent
* identifiers (SWHID) while WebGraph uses integers internally. These two mappings (long id
* ↔ SWHID) are used for the input (users refer to the graph using SWHID) and the output
* (convert back to SWHID for users results). However, since graph traversal can be restricted
* depending on the node type (see {@link AllowedEdges}), a long id → node type map is stored
* as well to avoid a full SWHID lookup.
*
* @author The Software Heritage developers
* @see org.softwareheritage.graph.AllowedEdges
* @see org.softwareheritage.graph.maps.NodeIdMap
* @see org.softwareheritage.graph.maps.NodeTypesMap
*/
public class Graph extends ImmutableGraph {
- /** File extension for the SWHID to long node id map */
- public static final String SWHID_TO_NODE = ".swhid2node.bin";
- /** File extension for the long node id to SWHID map */
- public static final String NODE_TO_SWHID = ".node2swhid.bin";
- /** File extension for the long node id to node type map */
- public static final String NODE_TO_TYPE = ".node2type.map";
-
- /** Compressed graph stored as a {@link it.unimi.dsi.big.webgraph.BVGraph} */
- ImmutableGraph graph;
- /** Transposed compressed graph (used for backward traversals) */
- ImmutableGraph graphTransposed;
+ /**
+ * Bidirectional graph containing two compressed {@link it.unimi.dsi.big.webgraph.BVGraph} one for
+ * each direction
+ */
+ BidirectionalImmutableGraph graph;
+
/** Path and basename of the compressed graph */
String path;
/** Mapping long id ↔ SWHIDs */
NodeIdMap nodeIdMap;
/** Mapping long id → node types */
NodeTypesMap nodeTypesMap;
/**
* Constructor.
*
* @param path path and basename of the compressed graph to load
*/
private Graph(String path) throws IOException {
loadInternal(path, null, LoadMethod.MAPPED);
}
/**
* Loading mechanisms
*/
enum LoadMethod {
MEMORY, MAPPED, OFFLINE,
}
protected Graph loadInternal(String path, ProgressLogger pl, LoadMethod method) throws IOException {
this.path = path;
+ ImmutableGraph direct = null;
+ ImmutableGraph transposed = null;
if (method == LoadMethod.MEMORY) {
- this.graph = ImmutableGraph.load(path, pl);
- this.graphTransposed = ImmutableGraph.load(path + "-transposed", pl);
+ direct = ImmutableGraph.load(path, pl);
+ transposed = ImmutableGraph.load(path + "-transposed", pl);
} else if (method == LoadMethod.MAPPED) {
- this.graph = ImmutableGraph.loadMapped(path, pl);
- this.graphTransposed = ImmutableGraph.loadMapped(path + "-transposed", pl);
+ direct = ImmutableGraph.load(path, pl);
+ transposed = ImmutableGraph.loadMapped(path + "-transposed", pl);
} else if (method == LoadMethod.OFFLINE) {
- this.graph = ImmutableGraph.loadOffline(path, pl);
- this.graphTransposed = ImmutableGraph.loadOffline(path + "-transposed", pl);
+ direct = ImmutableGraph.loadOffline(path, pl);
+ transposed = ImmutableGraph.loadOffline(path + "-transposed", pl);
}
+ this.graph = new BidirectionalImmutableGraph(direct, transposed);
this.nodeTypesMap = new NodeTypesMap(path);
this.nodeIdMap = new NodeIdMap(path, numNodes());
return this;
}
protected Graph() {
}
public static Graph load(String path, ProgressLogger pl) throws IOException {
return new Graph().loadInternal(path, pl, LoadMethod.MEMORY);
}
public static Graph loadMapped(String path, ProgressLogger pl) throws IOException {
return new Graph().loadInternal(path, pl, LoadMethod.MAPPED);
}
public static Graph loadOffline(String path, ProgressLogger pl) throws IOException {
return new Graph().loadInternal(path, null, LoadMethod.OFFLINE);
}
public static Graph load(String path) throws IOException {
return new Graph().loadInternal(path, null, LoadMethod.MEMORY);
}
public static Graph loadMapped(String path) throws IOException {
return new Graph().loadInternal(path, null, LoadMethod.MAPPED);
}
public static Graph loadOffline(String path) throws IOException {
return new Graph().loadInternal(path, null, LoadMethod.OFFLINE);
}
/**
* Constructor used for copy()
*/
- protected Graph(ImmutableGraph graph, ImmutableGraph graphTransposed, String path, NodeIdMap nodeIdMap,
- NodeTypesMap nodeTypesMap) {
+ protected Graph(BidirectionalImmutableGraph graph, String path, NodeIdMap nodeIdMap, NodeTypesMap nodeTypesMap) {
this.graph = graph;
- this.graphTransposed = graphTransposed;
this.path = path;
this.nodeIdMap = nodeIdMap;
this.nodeTypesMap = nodeTypesMap;
}
/**
* Return a flyweight copy of the graph.
*/
@Override
public Graph copy() {
- return new Graph(this.graph.copy(), this.graphTransposed.copy(), this.path, this.nodeIdMap, this.nodeTypesMap);
+ return new Graph(this.graph.copy(), this.path, this.nodeIdMap, this.nodeTypesMap);
}
@Override
public boolean randomAccess() {
- return graph.randomAccess() && graphTransposed.randomAccess();
+ return graph.randomAccess();
}
/**
* Return a transposed version of the graph.
*/
public Graph transpose() {
- return new Graph(this.graphTransposed, this.graph, this.path, this.nodeIdMap, this.nodeTypesMap);
+ return new Graph(this.graph.transpose(), this.path, this.nodeIdMap, this.nodeTypesMap);
}
/**
* Return a symmetric version of the graph.
*/
public Graph symmetrize() {
- ImmutableGraph symmetric = Transform.union(graph, graphTransposed);
- return new Graph(symmetric, symmetric, this.path, this.nodeIdMap, this.nodeTypesMap);
+ return new Graph(this.graph.symmetrize(), this.path, this.nodeIdMap, this.nodeTypesMap);
}
/**
* Cleans up graph resources after use.
*/
public void cleanUp() throws IOException {
nodeIdMap.close();
}
/**
* Returns number of nodes in the graph.
*
* @return number of nodes in the graph
*/
@Override
public long numNodes() {
return graph.numNodes();
}
/**
* Returns number of edges in the graph.
*
* @return number of edges in the graph
*/
@Override
public long numArcs() {
return graph.numArcs();
}
/**
* Returns lazy iterator of successors of a node.
*
* @param nodeId node specified as a long id
* @return lazy iterator of successors of the node, specified as a
* WebGraph LazyLongIterator
*/
@Override
public LazyLongIterator successors(long nodeId) {
return graph.successors(nodeId);
}
/**
* Returns lazy iterator of successors of a node while following a specific set of edge types.
*
* @param nodeId node specified as a long id
* @param allowedEdges the specification of which edges can be traversed
* @return lazy iterator of successors of the node, specified as a
* WebGraph LazyLongIterator
*/
public LazyLongIterator successors(long nodeId, AllowedEdges allowedEdges) {
if (allowedEdges.restrictedTo == null) {
// All edges are allowed, bypass edge check
return this.successors(nodeId);
} else {
LazyLongIterator allSuccessors = this.successors(nodeId);
Graph thisGraph = this;
return new LazyLongIterator() {
@Override
public long nextLong() {
long neighbor;
while ((neighbor = allSuccessors.nextLong()) != -1) {
if (allowedEdges.isAllowed(thisGraph.getNodeType(nodeId), thisGraph.getNodeType(neighbor))) {
return neighbor;
}
}
return -1;
}
@Override
public long skip(final long n) {
long i;
for (i = 0; i < n && nextLong() != -1; i++)
;
return i;
}
};
}
}
/**
* Returns the outdegree of a node.
*
* @param nodeId node specified as a long id
* @return outdegree of a node
*/
@Override
public long outdegree(long nodeId) {
return graph.outdegree(nodeId);
}
/**
* Returns lazy iterator of predecessors of a node.
*
* @param nodeId node specified as a long id
* @return lazy iterator of predecessors of the node, specified as a
* WebGraph LazyLongIterator
*/
public LazyLongIterator predecessors(long nodeId) {
- return this.transpose().successors(nodeId);
+ return graph.predecessors(nodeId);
}
/**
* Returns the indegree of a node.
*
* @param nodeId node specified as a long id
* @return indegree of a node
*/
public long indegree(long nodeId) {
- return this.transpose().outdegree(nodeId);
+ return graph.indegree(nodeId);
}
/**
- * Returns the underlying BVGraph.
+ * Returns the underlying BidirectionalImmutableGraph.
*
- * @return WebGraph BVGraph
+ * @return WebGraph ImmutableGraph
*/
public ImmutableGraph getGraph() {
return this.graph;
}
/**
* Returns the graph full path.
*
* @return graph full path
*/
public String getPath() {
return path;
}
/**
* Converts {@link SWHID} node to long.
*
* @param swhid node specified as a {@link SWHID}
* @return internal long node id
* @see SWHID
*/
public long getNodeId(SWHID swhid) {
return nodeIdMap.getNodeId(swhid);
}
/**
* Converts long id node to {@link SWHID}.
*
* @param nodeId node specified as a long id
* @return external SWHID
* @see SWHID
*/
public SWHID getSWHID(long nodeId) {
return nodeIdMap.getSWHID(nodeId);
}
/**
* Returns node type.
*
* @param nodeId node specified as a long id
* @return corresponding node type
* @see org.softwareheritage.graph.Node.Type
*/
public Node.Type getNodeType(long nodeId) {
return nodeTypesMap.getType(nodeId);
}
}
diff --git a/java/src/main/java/org/softwareheritage/graph/experiments/topology/SubdatasetSizeFunction.java b/java/src/main/java/org/softwareheritage/graph/experiments/topology/SubdatasetSizeFunction.java
index bb6779e..f897e00 100644
--- a/java/src/main/java/org/softwareheritage/graph/experiments/topology/SubdatasetSizeFunction.java
+++ b/java/src/main/java/org/softwareheritage/graph/experiments/topology/SubdatasetSizeFunction.java
@@ -1,90 +1,98 @@
package org.softwareheritage.graph.experiments.topology;
import com.google.common.primitives.Longs;
import com.martiansoftware.jsap.*;
import it.unimi.dsi.Util;
import it.unimi.dsi.big.webgraph.LazyLongIterator;
import it.unimi.dsi.bits.LongArrayBitVector;
import it.unimi.dsi.fastutil.Arrays;
import it.unimi.dsi.fastutil.BigArrays;
import it.unimi.dsi.fastutil.longs.LongBigArrays;
import it.unimi.dsi.io.ByteDiskQueue;
import it.unimi.dsi.logging.ProgressLogger;
import it.unimi.dsi.util.XoRoShiRo128PlusRandom;
import org.softwareheritage.graph.Graph;
import org.softwareheritage.graph.Node;
import org.softwareheritage.graph.experiments.forks.ForkCC;
import java.io.*;
public class SubdatasetSizeFunction {
private SubdatasetSizeFunction() {
}
public static void run(final Graph graph) throws IOException {
final ProgressLogger pl = new ProgressLogger();
pl.itemsName = "nodes";
pl.expectedUpdates = graph.numNodes();
long n = graph.numNodes();
LongArrayBitVector visited = LongArrayBitVector.ofLength(n);
int bufferSize = (int) Math.min(Arrays.MAX_ARRAY_SIZE & ~0x7, 8L * n);
final File queueFile = File.createTempFile(ForkCC.class.getSimpleName(), "queue");
final ByteDiskQueue queue = ByteDiskQueue.createNew(queueFile, bufferSize, true);
final byte[] byteBuf = new byte[Long.BYTES];
long[][] randomPerm = Util.identity(graph.numNodes());
LongBigArrays.shuffle(randomPerm, new XoRoShiRo128PlusRandom());
- long visitedSize = 0;
+ long visitedNodes = 0;
+ long visitedEdges = 0;
+ long visitedOrigins = 0;
+ long visitedContents = 0;
pl.start("Running traversal starting from origins...");
for (long j = 0; j < n; ++j) {
long i = BigArrays.get(randomPerm, j);
if (visited.getBoolean(i) || graph.getNodeType(i) != Node.Type.ORI) {
continue;
}
+ visitedOrigins++;
queue.enqueue(Longs.toByteArray(i));
visited.set(i);
while (!queue.isEmpty()) {
queue.dequeue(byteBuf);
final long currentNode = Longs.fromByteArray(byteBuf);
- visitedSize++;
+ visitedNodes++;
+ if (graph.getNodeType(currentNode) == Node.Type.CNT)
+ visitedContents++;
final LazyLongIterator iterator = graph.successors(currentNode);
long succ;
while ((succ = iterator.nextLong()) != -1) {
+ visitedEdges++;
if (visited.getBoolean(succ))
continue;
visited.set(succ);
queue.enqueue(Longs.toByteArray(succ));
}
pl.update();
}
- System.out.println(visitedSize);
+ if (visitedOrigins % 10000 == 0)
+ System.out.println(visitedNodes + " " + visitedEdges + " " + visitedContents);
}
pl.done();
}
static public void main(final String[] arg)
throws IllegalArgumentException, SecurityException, JSAPException, IOException {
final SimpleJSAP jsap = new SimpleJSAP(SubdatasetSizeFunction.class.getName(),
- "Computes in and out degrees of the given SWHGraph",
+ "Computes subdataset size functions using a random uniform order",
new Parameter[]{new UnflaggedOption("basename", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, JSAP.REQUIRED,
JSAP.NOT_GREEDY, "The basename of the graph."),});
final JSAPResult jsapResult = jsap.parse(arg);
if (jsap.messagePrinted())
System.exit(1);
final String basename = jsapResult.getString("basename");
Graph graph = Graph.loadMapped(basename);
run(graph);
}
}
diff --git a/java/src/main/java/org/softwareheritage/graph/maps/NodeIdMap.java b/java/src/main/java/org/softwareheritage/graph/maps/NodeIdMap.java
index 46a566b..2a2c50f 100644
--- a/java/src/main/java/org/softwareheritage/graph/maps/NodeIdMap.java
+++ b/java/src/main/java/org/softwareheritage/graph/maps/NodeIdMap.java
@@ -1,188 +1,186 @@
package org.softwareheritage.graph.maps;
import it.unimi.dsi.fastutil.Size64;
import it.unimi.dsi.fastutil.io.BinIO;
import it.unimi.dsi.fastutil.longs.LongBigList;
import it.unimi.dsi.fastutil.objects.Object2LongFunction;
import it.unimi.dsi.util.ByteBufferLongBigList;
-import org.softwareheritage.graph.Graph;
import org.softwareheritage.graph.SWHID;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* Mapping between internal long node id and external SWHID.
*
* The SWHID -> node mapping is obtained from hashing the SWHID with a MPH, then permuting it using
* an mmap()-ed .order file containing the graph permutation.
*
* The node -> SWHID reverse mapping is pre-computed and dumped on disk in the
* {@link NodeMapBuilder} class, then it is loaded here using mmap().
*
* @author The Software Heritage developers
* @see NodeMapBuilder
*/
public class NodeIdMap {
/** Fixed length of binary SWHID buffer */
public static final int SWHID_BIN_SIZE = 22;
+ /** File extension for the long node id to SWHID map */
+ public static final String NODE_TO_SWHID = ".node2swhid.bin";
+
/** Graph path and basename */
String graphPath;
/** Number of ids to map */
long nbIds;
/** mmap()-ed NODE_TO_SWHID file */
MapFile nodeToSwhMap;
/** Minimal perfect hash (MPH) function SWHID -> initial order */
Object2LongFunction mph;
/** mmap()-ed long list with the permutation initial order -> graph order */
LongBigList orderMap;
/** FileInputStream containing the permutation */
FileInputStream orderInputStream;
/**
* Constructor.
*
* @param graphPath full graph path
* @param nbNodes number of nodes in the graph
*/
public NodeIdMap(String graphPath, long nbNodes) throws IOException {
this.graphPath = graphPath;
this.nbIds = nbNodes;
// node -> SWHID
- this.nodeToSwhMap = new MapFile(graphPath + Graph.NODE_TO_SWHID, SWHID_BIN_SIZE);
+ this.nodeToSwhMap = new MapFile(graphPath + NODE_TO_SWHID, SWHID_BIN_SIZE);
// SWHID -> node
this.mph = loadMph(graphPath + ".mph");
this.orderInputStream = new FileInputStream(graphPath + ".order");
this.orderMap = ByteBufferLongBigList.map(orderInputStream.getChannel());
}
@SuppressWarnings("unchecked")
public static Object2LongFunction loadMph(String path) throws IOException {
Object obj;
try {
obj = BinIO.loadObject(path);
} catch (ClassNotFoundException e) {
throw new IOException(e.getMessage());
}
Object2LongFunction res = (Object2LongFunction) obj;
// Backward-compatibility for old maps parametrized with .
// New maps should be parametrized with , which is faster.
try {
// Try to call it with bytes, will fail if it's a O2LF.
res.getLong("42".getBytes(StandardCharsets.UTF_8));
} catch (ClassCastException e) {
class StringCompatibleByteFunction implements Object2LongFunction, Size64 {
private final Object2LongFunction legacyFunction;
public StringCompatibleByteFunction(Object2LongFunction legacyFunction) {
this.legacyFunction = legacyFunction;
}
@Override
public long getLong(Object o) {
byte[] bi = (byte[]) o;
return legacyFunction.getLong(new String(bi, StandardCharsets.UTF_8));
}
@Override
public int size() {
return legacyFunction.size();
}
@Override
public long size64() {
return (legacyFunction instanceof Size64)
? ((Size64) legacyFunction).size64()
: legacyFunction.size();
}
}
Object2LongFunction mphLegacy = (Object2LongFunction) obj;
return new StringCompatibleByteFunction(mphLegacy);
- /*
- * res = (o -> { byte[] bi = (byte[]) o; return mphLegacy.getLong(new String(bi,
- * StandardCharsets.UTF_8)); });
- */
}
// End of backward-compatibility block
return res;
}
/**
* Converts byte-form SWHID to corresponding long node id. Low-level function, does not check if the
* SWHID is valid.
*
* @param swhid node represented as bytes
* @return corresponding node as a long id
*/
public long getNodeId(byte[] swhid) {
// 1. Hash the SWHID with the MPH to get its original ID
long origNodeId = mph.getLong(swhid);
// 2. Use the order permutation to get the position in the permuted graph
return this.orderMap.getLong(origNodeId);
}
/**
* Converts SWHID to corresponding long node id.
*
* @param swhid node represented as a {@link SWHID}
* @param checkExists if true, error if the SWHID is not present in the graph, if false the check
* will be skipped and invalid data will be returned for non-existing SWHIDs.
* @return corresponding node as a long id
* @see SWHID
*/
public long getNodeId(SWHID swhid, boolean checkExists) {
// Convert the SWHID to bytes and call getNodeId()
long nodeId = getNodeId(swhid.toString().getBytes(StandardCharsets.US_ASCII));
// Check that the position effectively corresponds to a real node using the reverse map.
// This is necessary because the MPH makes no guarantees on whether the input SWHID is valid.
if (!checkExists || getSWHID(nodeId).equals(swhid)) {
return nodeId;
} else {
throw new IllegalArgumentException("Unknown SWHID: " + swhid);
}
}
public long getNodeId(SWHID swhid) {
return getNodeId(swhid, true);
}
/**
* Converts a node long id to corresponding SWHID.
*
* @param nodeId node as a long id
* @return corresponding node as a {@link SWHID}
* @see SWHID
*/
public SWHID getSWHID(long nodeId) {
/*
* Each line in NODE_TO_SWHID is formatted as: swhid The file is ordered by nodeId, meaning node0's
* swhid is at line 0, hence we can read the nodeId-th line to get corresponding swhid
*/
if (nodeId < 0 || nodeId >= nbIds) {
throw new IllegalArgumentException("Node id " + nodeId + " should be between 0 and " + nbIds);
}
return SWHID.fromBytes(nodeToSwhMap.readAtLine(nodeId));
}
/**
* Closes the mapping files.
*/
public void close() throws IOException {
orderInputStream.close();
nodeToSwhMap.close();
}
}
diff --git a/java/src/main/java/org/softwareheritage/graph/maps/NodeMapBuilder.java b/java/src/main/java/org/softwareheritage/graph/maps/NodeMapBuilder.java
index 3c9f6ef..626c747 100644
--- a/java/src/main/java/org/softwareheritage/graph/maps/NodeMapBuilder.java
+++ b/java/src/main/java/org/softwareheritage/graph/maps/NodeMapBuilder.java
@@ -1,217 +1,191 @@
package org.softwareheritage.graph.maps;
import it.unimi.dsi.bits.LongArrayBitVector;
import it.unimi.dsi.fastutil.BigArrays;
import it.unimi.dsi.fastutil.Size64;
import it.unimi.dsi.fastutil.io.BinIO;
import it.unimi.dsi.fastutil.longs.LongBigArrays;
import it.unimi.dsi.fastutil.longs.LongBigList;
import it.unimi.dsi.fastutil.objects.Object2LongFunction;
import it.unimi.dsi.io.FastBufferedReader;
import it.unimi.dsi.io.LineIterator;
import it.unimi.dsi.logging.ProgressLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.softwareheritage.graph.Graph;
import org.softwareheritage.graph.Node;
import org.softwareheritage.graph.SWHID;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
/**
* Create maps needed at runtime by the graph service, in particular:
*
*
* - SWHID → WebGraph long node id
* - WebGraph long node id → SWHID (converse of the former)
* - WebGraph long node id → SWH node type (enum)
*
*
* @author The Software Heritage developers
*/
public class NodeMapBuilder {
final static String SORT_BUFFER_SIZE = "40%";
final static Logger logger = LoggerFactory.getLogger(NodeMapBuilder.class);
/**
* Main entrypoint.
*
* @param args command line arguments
*/
public static void main(String[] args) throws IOException {
if (args.length != 2) {
logger.error("Usage: COMPRESSED_GRAPH_BASE_NAME TEMP_DIR < NODES_CSV");
System.exit(1);
}
String graphPath = args[0];
String tmpDir = args[1];
logger.info("starting maps generation...");
precomputeNodeIdMap(graphPath, tmpDir);
logger.info("maps generation completed");
}
/**
* Computes and dumps on disk mapping files.
*
* @param graphPath path of the compressed graph
*/
- // Suppress warning for Object2LongFunction cast
- @SuppressWarnings("unchecked")
static void precomputeNodeIdMap(String graphPath, String tmpDir) throws IOException {
ProgressLogger plSWHID2Node = new ProgressLogger(logger, 10, TimeUnit.SECONDS);
ProgressLogger plNode2SWHID = new ProgressLogger(logger, 10, TimeUnit.SECONDS);
- plSWHID2Node.itemsName = "swhid→node";
- plNode2SWHID.itemsName = "node→swhid";
-
- /*
- * avg speed for swhid→node is sometime skewed due to write to the sort pipe hanging when sort is
- * sorting; hence also desplay local speed
- */
- plSWHID2Node.displayLocalSpeed = true;
+ plSWHID2Node.itemsName = "Hashing swhid→node";
+ plNode2SWHID.itemsName = "Building map node→swhid";
// first half of SWHID->node mapping: SWHID -> WebGraph MPH (long)
- Object2LongFunction mphMap = null;
- try {
- logger.info("loading MPH function...");
- mphMap = (Object2LongFunction) BinIO.loadObject(graphPath + ".mph");
- logger.info("MPH function loaded");
- } catch (ClassNotFoundException e) {
- logger.error("unknown class object in .mph file: " + e);
- System.exit(2);
- }
+ Object2LongFunction mphMap = NodeIdMap.loadMph(graphPath + ".mph");
long nbIds = (mphMap instanceof Size64) ? ((Size64) mphMap).size64() : mphMap.size();
plSWHID2Node.expectedUpdates = nbIds;
plNode2SWHID.expectedUpdates = nbIds;
// second half of SWHID->node mapping: WebGraph MPH (long) -> BFS order (long)
long[][] bfsMap = LongBigArrays.newBigArray(nbIds);
logger.info("loading BFS order file...");
long loaded = BinIO.loadLongs(graphPath + ".order", bfsMap);
logger.info("BFS order file loaded");
if (loaded != nbIds) {
logger.error("graph contains " + nbIds + " nodes, but read " + loaded);
System.exit(2);
}
/*
- * Create mapping SWHID -> WebGraph node id, by sequentially reading nodes, hashing them with MPH,
- * and permuting according to BFS order
+ * Read on stdin a list of SWHIDs, hash them with MPH, then permute them according to the .order
+ * file
*/
FastBufferedReader buffer = new FastBufferedReader(new InputStreamReader(System.in, StandardCharsets.US_ASCII));
LineIterator swhidIterator = new LineIterator(buffer);
/*
* The WebGraph node id -> SWHID mapping can be obtained from the SWHID->node one by numerically
* sorting on node id and sequentially writing obtained SWHIDs to a binary map. Delegates the
* sorting job to /usr/bin/sort via pipes
*/
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command("sort", "--numeric-sort", "--key", "2", "--buffer-size", SORT_BUFFER_SIZE,
"--temporary-directory", tmpDir);
Process sort = processBuilder.start();
BufferedOutputStream sort_stdin = new BufferedOutputStream(sort.getOutputStream());
BufferedInputStream sort_stdout = new BufferedInputStream(sort.getInputStream());
- // for the binary format of swhidToNodeMap, see Python module swh.graph.swhid:SwhidToIntMap
// for the binary format of nodeToSwhidMap, see Python module swh.graph.swhid:IntToSwhidMap
- try (DataOutputStream swhidToNodeMap = new DataOutputStream(
- new BufferedOutputStream(new FileOutputStream(graphPath + Graph.SWHID_TO_NODE)));
- BufferedOutputStream nodeToSwhidMap = new BufferedOutputStream(
- new FileOutputStream(graphPath + Graph.NODE_TO_SWHID))) {
+ try (BufferedOutputStream nodeToSwhidMap = new BufferedOutputStream(
+ new FileOutputStream(graphPath + NodeIdMap.NODE_TO_SWHID))) {
/*
- * background handler for sort output, it will be fed SWHID/node pairs while swhidToNodeMap is being
- * filled, and will itself fill nodeToSwhidMap as soon as data from sort is ready
+ * background handler for sort output, it will be fed SWHID/node pairs, and will itself fill
+ * nodeToSwhidMap as soon as data from sort is ready.
*/
SortOutputHandler outputHandler = new SortOutputHandler(sort_stdout, nodeToSwhidMap, plNode2SWHID);
outputHandler.start();
/*
* Type map from WebGraph node ID to SWH type. Used at runtime by pure Java graph traversals to
* efficiently check edge restrictions.
*/
- final int log2NbTypes = (int) Math.ceil(Math.log(Node.Type.values().length) / Math.log(2));
- final int nbBitsPerNodeType = log2NbTypes;
+ final int nbBitsPerNodeType = (int) Math.ceil(Math.log(Node.Type.values().length) / Math.log(2));
LongArrayBitVector nodeTypesBitVector = LongArrayBitVector.ofLength(nbBitsPerNodeType * nbIds);
LongBigList nodeTypesMap = nodeTypesBitVector.asLongBigList(nbBitsPerNodeType);
- plSWHID2Node.start("filling swhid2node map");
+ plSWHID2Node.start("Hashing SWHIDs to fill sort input");
for (long iNode = 0; iNode < nbIds && swhidIterator.hasNext(); iNode++) {
String swhidStr = swhidIterator.next().toString();
SWHID swhid = new SWHID(swhidStr);
- byte[] swhidBin = swhid.toBytes();
long mphId = mphMap.getLong(swhidStr.getBytes(StandardCharsets.US_ASCII));
long nodeId = BigArrays.get(bfsMap, mphId);
-
- swhidToNodeMap.write(swhidBin, 0, swhidBin.length);
- swhidToNodeMap.writeLong(nodeId);
sort_stdin.write((swhidStr + "\t" + nodeId + "\n").getBytes(StandardCharsets.US_ASCII));
nodeTypesMap.set(nodeId, swhid.getType().ordinal());
plSWHID2Node.lightUpdate();
}
plSWHID2Node.done();
sort_stdin.close();
// write type map
logger.info("storing type map");
- BinIO.storeObject(nodeTypesMap, graphPath + Graph.NODE_TO_TYPE);
+ BinIO.storeObject(nodeTypesMap, graphPath + NodeTypesMap.NODE_TO_TYPE);
logger.info("type map stored");
// wait for nodeToSwhidMap filling
try {
logger.info("waiting for node2swhid map...");
int sortExitCode = sort.waitFor();
if (sortExitCode != 0) {
logger.error("sort returned non-zero exit code: " + sortExitCode);
System.exit(2);
}
outputHandler.join();
} catch (InterruptedException e) {
logger.error("processing of sort output failed with: " + e);
System.exit(2);
}
}
-
}
private static class SortOutputHandler extends Thread {
- private Scanner input;
- private OutputStream output;
- private ProgressLogger pl;
+ private final Scanner input;
+ private final OutputStream output;
+ private final ProgressLogger pl;
SortOutputHandler(InputStream input, OutputStream output, ProgressLogger pl) {
this.input = new Scanner(input, StandardCharsets.US_ASCII);
this.output = output;
this.pl = pl;
}
public void run() {
boolean sortDone = false;
logger.info("node2swhid: waiting for sort output...");
while (input.hasNextLine()) {
if (!sortDone) {
sortDone = true;
this.pl.start("filling node2swhid map");
}
String line = input.nextLine(); // format: SWHID NODE_ID
SWHID swhid = new SWHID(line.split("\\t")[0]); // get SWHID
try {
- output.write((byte[]) swhid.toBytes());
+ output.write(swhid.toBytes());
} catch (IOException e) {
logger.error("writing to node->SWHID map failed with: " + e);
}
this.pl.lightUpdate();
}
this.pl.done();
}
}
}
diff --git a/java/src/main/java/org/softwareheritage/graph/maps/NodeTypesMap.java b/java/src/main/java/org/softwareheritage/graph/maps/NodeTypesMap.java
index c835e02..befe094 100644
--- a/java/src/main/java/org/softwareheritage/graph/maps/NodeTypesMap.java
+++ b/java/src/main/java/org/softwareheritage/graph/maps/NodeTypesMap.java
@@ -1,52 +1,54 @@
package org.softwareheritage.graph.maps;
import it.unimi.dsi.fastutil.io.BinIO;
import it.unimi.dsi.fastutil.longs.LongBigList;
-import org.softwareheritage.graph.Graph;
import org.softwareheritage.graph.Node;
import java.io.IOException;
/**
* Mapping between long node id and SWH node type as described in the
* data model.
*
* The type mapping is pre-computed and dumped on disk in the {@link NodeMapBuilder} class, then it
* is loaded in-memory here using fastutil LongBigList.
* To be space-efficient, the mapping is stored as a bitmap using minimum number of bits per
* {@link Node.Type}.
*
* @author The Software Heritage developers
*/
public class NodeTypesMap {
+ /** File extension for the long node id to node type map */
+ public static final String NODE_TO_TYPE = ".node2type.map";
+
/**
* Array storing for each node its type
*/
public LongBigList nodeTypesMap;
/**
* Constructor.
*
* @param graphPath path and basename of the compressed graph
*/
public NodeTypesMap(String graphPath) throws IOException {
try {
- nodeTypesMap = (LongBigList) BinIO.loadObject(graphPath + Graph.NODE_TO_TYPE);
+ nodeTypesMap = (LongBigList) BinIO.loadObject(graphPath + NODE_TO_TYPE);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Unknown class object: " + e);
}
}
/**
* Returns node type from a node long id.
*
* @param nodeId node as a long id
* @return corresponding {@link Node.Type} value
* @see org.softwareheritage.graph.Node.Type
*/
public Node.Type getType(long nodeId) {
long type = nodeTypesMap.getLong(nodeId);
return Node.Type.fromInt((int) type);
}
}
diff --git a/java/src/main/java/org/softwareheritage/graph/utils/ComposePermutations.java b/java/src/main/java/org/softwareheritage/graph/utils/ComposePermutations.java
new file mode 100644
index 0000000..3e094e8
--- /dev/null
+++ b/java/src/main/java/org/softwareheritage/graph/utils/ComposePermutations.java
@@ -0,0 +1,51 @@
+package org.softwareheritage.graph.utils;
+
+import com.martiansoftware.jsap.*;
+import it.unimi.dsi.Util;
+import it.unimi.dsi.fastutil.io.BinIO;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * CLI program used to compose two on-disk permutations.
+ *
+ * It takes two on-disk permutations as parameters, p1 and p2, and writes on disk (p1 o p2) at the
+ * given location. This is useful for multi-step compression (e.g. Unordered -> BFS -> LLP), as it
+ * can be used to merge all the intermediate permutations.
+ */
+public class ComposePermutations {
+ private static JSAPResult parse_args(String[] args) {
+ JSAPResult config = null;
+ try {
+ SimpleJSAP jsap = new SimpleJSAP(ComposePermutations.class.getName(), "", new Parameter[]{
+ new UnflaggedOption("firstPermutation", JSAP.STRING_PARSER, JSAP.REQUIRED, "The first permutation"),
+ new UnflaggedOption("secondPermutation", JSAP.STRING_PARSER, JSAP.REQUIRED,
+ "The second permutation"),
+ new UnflaggedOption("outputPermutation", JSAP.STRING_PARSER, JSAP.REQUIRED,
+ "The output permutation"),});
+
+ config = jsap.parse(args);
+ if (jsap.messagePrinted()) {
+ System.exit(1);
+ }
+ } catch (JSAPException e) {
+ e.printStackTrace();
+ }
+ return config;
+ }
+
+ public static void main(String[] args) throws IOException, ClassNotFoundException {
+ JSAPResult config = parse_args(args);
+ String firstPermFilename = config.getString("firstPermutation");
+ String secondPermFilename = config.getString("secondPermutation");
+ String outputPermFilename = config.getString("outputPermutation");
+
+ long[][] firstPerm = BinIO.loadLongsBig(new File(firstPermFilename));
+ long[][] secondPerm = BinIO.loadLongsBig(new File(secondPermFilename));
+
+ long[][] outputPerm = Util.composePermutationsInPlace(firstPerm, secondPerm);
+
+ BinIO.storeLongs(outputPerm, outputPermFilename);
+ }
+}
diff --git a/java/src/main/java/org/softwareheritage/graph/utils/FindEarliestRevision.java b/java/src/main/java/org/softwareheritage/graph/utils/FindEarliestRevision.java
index fa02cc1..71379f2 100644
--- a/java/src/main/java/org/softwareheritage/graph/utils/FindEarliestRevision.java
+++ b/java/src/main/java/org/softwareheritage/graph/utils/FindEarliestRevision.java
@@ -1,114 +1,116 @@
package org.softwareheritage.graph.utils;
import it.unimi.dsi.big.webgraph.LazyLongIterator;
import it.unimi.dsi.fastutil.BigArrays;
import it.unimi.dsi.fastutil.io.BinIO;
import org.softwareheritage.graph.AllowedEdges;
import org.softwareheritage.graph.Graph;
import org.softwareheritage.graph.Node;
import org.softwareheritage.graph.SWHID;
import java.io.IOException;
import java.time.Duration;
import java.util.HashSet;
import java.util.Scanner;
import java.util.Stack;
/* sample invocation on granet.internal.softwareheritage.org for benchmarking
* purposes, with the main swh-graph service already running:
*
* $ java -cp ~/swh-environment/swh-graph/java/target/swh-graph-0.3.0.jar -Xmx300G -XX:PretenureSizeThreshold=512M -XX:MaxNewSize=4G -XX:+UseLargePages -XX:+UseTransparentHugePages -XX:+UseNUMA -XX:+UseTLAB -XX:+ResizeTLAB org.softwareheritage.graph.utils.FindEarliestRevision --timing /dev/shm/swh-graph/default/graph
*
*/
public class FindEarliestRevision {
public static void main(String[] args) throws IOException, ClassNotFoundException {
String graphPath = args[0];
boolean timing = false;
long ts, elapsedNanos;
Duration elapsed;
if (args.length >= 2 && (args[0].equals("-t") || args[0].equals("--timing"))) {
timing = true;
graphPath = args[1];
System.err.println("started with timing option, will keep track of elapsed time");
}
System.err.println("loading transposed graph...");
ts = System.nanoTime();
Graph graph = Graph.loadMapped(graphPath).transpose();
elapsed = Duration.ofNanos(System.nanoTime() - ts);
System.err.println(String.format("transposed graph loaded (duration: %s).", elapsed));
System.err.println("loading revision timestamps...");
ts = System.nanoTime();
long[][] committerTimestamps = BinIO.loadLongsBig(graphPath + "-rev_committer_timestamps.bin");
elapsed = Duration.ofNanos(System.nanoTime() - ts);
System.err.println(String.format("revision timestamps loaded (duration: %s).", elapsed));
Scanner stdin = new Scanner(System.in);
AllowedEdges edges = new AllowedEdges("cnt:dir,dir:dir,dir:rev");
String rawSWHID = null;
SWHID srcSWHID = null;
long lineCount = 0;
+ long srcNodeId = -1;
if (timing) {
System.err.println("starting SWHID processing...");
elapsed = Duration.ZERO;
}
while (stdin.hasNextLine()) {
if (timing)
ts = System.nanoTime();
rawSWHID = stdin.nextLine().strip();
lineCount++;
try {
srcSWHID = new SWHID(rawSWHID);
+ srcNodeId = graph.getNodeId(srcSWHID);
} catch (IllegalArgumentException e) {
- System.err.println(String.format("skipping invalid SWHID %s on line %d", rawSWHID, lineCount));
+ System.err
+ .println(String.format("skipping invalid or unknown SWHID %s on line %d", rawSWHID, lineCount));
continue;
}
- long srcNodeId = graph.getNodeId(srcSWHID);
if (timing)
System.err.println("starting traversal for: " + srcSWHID.toString());
Stack stack = new Stack<>();
HashSet visited = new HashSet<>();
stack.push(srcNodeId);
visited.add(srcNodeId);
long minRevId = -1;
long minTimestamp = Long.MAX_VALUE;
while (!stack.isEmpty()) {
long currentNodeId = stack.pop();
if (graph.getNodeType(currentNodeId) == Node.Type.REV) {
long committerTs = BigArrays.get(committerTimestamps, currentNodeId);
if (committerTs < minTimestamp) {
minRevId = currentNodeId;
minTimestamp = committerTs;
}
}
LazyLongIterator it = graph.successors(currentNodeId, edges);
for (long neighborNodeId; (neighborNodeId = it.nextLong()) != -1;) {
if (!visited.contains(neighborNodeId)) {
stack.push(neighborNodeId);
visited.add(neighborNodeId);
}
}
}
if (minRevId == -1) {
System.err.println("no revision found containing: " + srcSWHID.toString());
} else {
System.out.println(srcSWHID.toString() + "\t" + graph.getSWHID(minRevId).toString());
}
if (timing) {
elapsedNanos = System.nanoTime() - ts; // processing time for current SWHID
elapsed = elapsed.plus(Duration.ofNanos(elapsedNanos)); // cumulative processing time for all SWHIDs
System.err.println(String.format("visit time (s):\t%.6f", (double) elapsedNanos / 1_000_000_000));
}
}
if (timing)
System.err.println(String.format("processed %d SWHIDs in %s (%s avg)", lineCount, elapsed,
elapsed.dividedBy(lineCount)));
}
}
diff --git a/swh.graph.egg-info/PKG-INFO b/swh.graph.egg-info/PKG-INFO
index 18ff922..5268263 100644
--- a/swh.graph.egg-info/PKG-INFO
+++ b/swh.graph.egg-info/PKG-INFO
@@ -1,56 +1,56 @@
Metadata-Version: 2.1
Name: swh.graph
-Version: 0.5.0
+Version: 0.5.1
Summary: Software Heritage graph service
Home-page: https://forge.softwareheritage.org/diffusion/DGRPH
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-graph
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-graph/
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 3 - Alpha
Requires-Python: >=3.7
Description-Content-Type: text/x-rst
Provides-Extra: testing
License-File: LICENSE
License-File: AUTHORS
Software Heritage - graph service
=================================
Tooling and services, collectively known as ``swh-graph``, providing fast
access to the graph representation of the `Software Heritage
`_
`archive `_. The service is in-memory,
based on a compressed representation of the Software Heritage Merkle DAG.
Bibliography
------------
In addition to accompanying technical documentation, ``swh-graph`` is also
described in the following scientific paper. If you publish results based on
``swh-graph``, please acknowledge it by citing the paper as follows:
.. note::
Paolo Boldi, Antoine Pietri, Sebastiano Vigna, Stefano Zacchiroli.
`Ultra-Large-Scale Repository Analysis via Graph Compression
`_. In proceedings of `SANER
2020 `_: The 27th IEEE International
Conference on Software Analysis, Evolution and Reengineering, pages
184-194. IEEE 2020.
Links: `preprint
`_,
`bibtex
`_.
diff --git a/swh.graph.egg-info/SOURCES.txt b/swh.graph.egg-info/SOURCES.txt
index e2e1364..dac7f07 100644
--- a/swh.graph.egg-info/SOURCES.txt
+++ b/swh.graph.egg-info/SOURCES.txt
@@ -1,193 +1,193 @@
.gitignore
.pre-commit-config.yaml
AUTHORS
CODE_OF_CONDUCT.md
CONTRIBUTORS
LICENSE
MANIFEST.in
Makefile
Makefile.local
README.rst
mypy.ini
pyproject.toml
pytest.ini
requirements-swh.txt
requirements-test.txt
requirements.txt
setup.cfg
setup.py
tox.ini
docker/Dockerfile
docker/build.sh
docker/run.sh
docs/.gitignore
docs/Makefile
docs/Makefile.local
docs/README.rst
docs/api.rst
docs/cli.rst
docs/compression.rst
docs/conf.py
docs/docker.rst
docs/git2graph.md
docs/index.rst
docs/quickstart.rst
docs/use-cases.rst
docs/_static/.placeholder
docs/_templates/.placeholder
docs/images/.gitignore
docs/images/Makefile
docs/images/compression_steps.dot
java/.coding-style.xml
java/.gitignore
java/AUTHORS
java/LICENSE
java/README.md
java/pom.xml
+java/.mvn/jvm.config
java/src/main/java/org/softwareheritage/graph/AllowedEdges.java
java/src/main/java/org/softwareheritage/graph/AllowedNodes.java
+java/src/main/java/org/softwareheritage/graph/BidirectionalImmutableGraph.java
java/src/main/java/org/softwareheritage/graph/Entry.java
java/src/main/java/org/softwareheritage/graph/Graph.java
java/src/main/java/org/softwareheritage/graph/Node.java
java/src/main/java/org/softwareheritage/graph/NodesFiltering.java
java/src/main/java/org/softwareheritage/graph/SWHID.java
java/src/main/java/org/softwareheritage/graph/Stats.java
java/src/main/java/org/softwareheritage/graph/Subgraph.java
java/src/main/java/org/softwareheritage/graph/SwhPath.java
java/src/main/java/org/softwareheritage/graph/Traversal.java
java/src/main/java/org/softwareheritage/graph/algo/TopologicalTraversal.java
java/src/main/java/org/softwareheritage/graph/benchmark/AccessEdge.java
java/src/main/java/org/softwareheritage/graph/benchmark/BFS.java
java/src/main/java/org/softwareheritage/graph/benchmark/Benchmark.java
java/src/main/java/org/softwareheritage/graph/benchmark/Browsing.java
java/src/main/java/org/softwareheritage/graph/benchmark/Provenance.java
java/src/main/java/org/softwareheritage/graph/benchmark/Vault.java
java/src/main/java/org/softwareheritage/graph/benchmark/utils/Random.java
java/src/main/java/org/softwareheritage/graph/benchmark/utils/Statistics.java
java/src/main/java/org/softwareheritage/graph/benchmark/utils/Timing.java
java/src/main/java/org/softwareheritage/graph/experiments/forks/FindCommonAncestor.java
java/src/main/java/org/softwareheritage/graph/experiments/forks/FindPath.java
java/src/main/java/org/softwareheritage/graph/experiments/forks/ForkCC.java
java/src/main/java/org/softwareheritage/graph/experiments/forks/ForkCliques.java
java/src/main/java/org/softwareheritage/graph/experiments/forks/ListEmptyOrigins.java
java/src/main/java/org/softwareheritage/graph/experiments/multiplicationfactor/GenDistribution.java
java/src/main/java/org/softwareheritage/graph/experiments/topology/AveragePaths.java
java/src/main/java/org/softwareheritage/graph/experiments/topology/ClusteringCoefficient.java
java/src/main/java/org/softwareheritage/graph/experiments/topology/ConnectedComponents.java
java/src/main/java/org/softwareheritage/graph/experiments/topology/InOutDegree.java
java/src/main/java/org/softwareheritage/graph/experiments/topology/SubdatasetSizeFunction.java
java/src/main/java/org/softwareheritage/graph/labels/AbstractLongListLabel.java
java/src/main/java/org/softwareheritage/graph/labels/DirEntry.java
java/src/main/java/org/softwareheritage/graph/labels/FixedWidthLongListLabel.java
java/src/main/java/org/softwareheritage/graph/labels/SwhLabel.java
java/src/main/java/org/softwareheritage/graph/maps/LabelMapBuilder.java
java/src/main/java/org/softwareheritage/graph/maps/MapFile.java
java/src/main/java/org/softwareheritage/graph/maps/NodeIdMap.java
java/src/main/java/org/softwareheritage/graph/maps/NodeMapBuilder.java
java/src/main/java/org/softwareheritage/graph/maps/NodeTypesMap.java
java/src/main/java/org/softwareheritage/graph/server/App.java
java/src/main/java/org/softwareheritage/graph/server/Endpoint.java
+java/src/main/java/org/softwareheritage/graph/utils/ComposePermutations.java
java/src/main/java/org/softwareheritage/graph/utils/ExportSubdataset.java
java/src/main/java/org/softwareheritage/graph/utils/FindEarliestRevision.java
java/src/main/java/org/softwareheritage/graph/utils/MPHTranslate.java
java/src/main/java/org/softwareheritage/graph/utils/ReadGraph.java
java/src/main/java/org/softwareheritage/graph/utils/ReadLabelledGraph.java
java/src/main/java/org/softwareheritage/graph/utils/WriteRevisionTimestamps.java
java/src/test/java/org/softwareheritage/graph/AllowedEdgesTest.java
java/src/test/java/org/softwareheritage/graph/GraphTest.java
java/src/test/java/org/softwareheritage/graph/LeavesTest.java
java/src/test/java/org/softwareheritage/graph/NeighborsTest.java
java/src/test/java/org/softwareheritage/graph/SubgraphTest.java
java/src/test/java/org/softwareheritage/graph/VisitTest.java
java/src/test/java/org/softwareheritage/graph/WalkTest.java
-java/target/swh-graph-0.5.0.jar
+java/target/swh-graph-0.5.1.jar
reports/.gitignore
reports/benchmarks/Makefile
reports/benchmarks/benchmarks.tex
reports/experiments/Makefile
reports/experiments/experiments.tex
reports/linux_log/LinuxLog.java
reports/linux_log/Makefile
reports/linux_log/linux_log.tex
reports/node_mapping/Makefile
reports/node_mapping/NodeIdMapHaloDB.java
reports/node_mapping/NodeIdMapRocksDB.java
reports/node_mapping/node_mapping.tex
swh/__init__.py
swh.graph.egg-info/PKG-INFO
swh.graph.egg-info/SOURCES.txt
swh.graph.egg-info/dependency_links.txt
swh.graph.egg-info/entry_points.txt
swh.graph.egg-info/requires.txt
swh.graph.egg-info/top_level.txt
swh/graph/__init__.py
swh/graph/backend.py
swh/graph/cli.py
swh/graph/client.py
swh/graph/config.py
swh/graph/dot.py
-swh/graph/graph.py
swh/graph/naive_client.py
swh/graph/py.typed
swh/graph/swhid.py
swh/graph/webgraph.py
swh/graph/server/__init__.py
swh/graph/server/app.py
swh/graph/tests/__init__.py
swh/graph/tests/conftest.py
swh/graph/tests/test_api_client.py
swh/graph/tests/test_cli.py
-swh/graph/tests/test_graph.py
swh/graph/tests/test_swhid.py
swh/graph/tests/dataset/.gitignore
swh/graph/tests/dataset/example.edges.csv
swh/graph/tests/dataset/example.edges.csv.zst
swh/graph/tests/dataset/example.nodes.csv
swh/graph/tests/dataset/example.nodes.csv.zst
swh/graph/tests/dataset/generate_graph.sh
swh/graph/tests/dataset/img/.gitignore
swh/graph/tests/dataset/img/Makefile
swh/graph/tests/dataset/img/example.dot
swh/graph/tests/dataset/output/example-transposed.graph
swh/graph/tests/dataset/output/example-transposed.obl
swh/graph/tests/dataset/output/example-transposed.offsets
swh/graph/tests/dataset/output/example-transposed.properties
swh/graph/tests/dataset/output/example.graph
swh/graph/tests/dataset/output/example.indegree
swh/graph/tests/dataset/output/example.mph
swh/graph/tests/dataset/output/example.node2swhid.bin
swh/graph/tests/dataset/output/example.node2type.map
swh/graph/tests/dataset/output/example.obl
swh/graph/tests/dataset/output/example.offsets
swh/graph/tests/dataset/output/example.order
swh/graph/tests/dataset/output/example.outdegree
swh/graph/tests/dataset/output/example.properties
swh/graph/tests/dataset/output/example.stats
-swh/graph/tests/dataset/output/example.swhid2node.bin
tools/dir2graph
tools/swhid2int2int2swhid.sh
tools/git2graph/.gitignore
tools/git2graph/Makefile
tools/git2graph/README.md
tools/git2graph/git2graph.c
tools/git2graph/tests/edge-filters.bats
tools/git2graph/tests/full-graph.bats
tools/git2graph/tests/node-filters.bats
tools/git2graph/tests/repo_helper.bash
tools/git2graph/tests/data/sample-repo.tgz
tools/git2graph/tests/data/graphs/dir-nodes/edges.csv
tools/git2graph/tests/data/graphs/dir-nodes/nodes.csv
tools/git2graph/tests/data/graphs/from-dir-edges/edges.csv
tools/git2graph/tests/data/graphs/from-dir-edges/nodes.csv
tools/git2graph/tests/data/graphs/from-rel-edges/edges.csv
tools/git2graph/tests/data/graphs/from-rel-edges/nodes.csv
tools/git2graph/tests/data/graphs/fs-nodes/edges.csv
tools/git2graph/tests/data/graphs/fs-nodes/nodes.csv
tools/git2graph/tests/data/graphs/full/edges.csv
tools/git2graph/tests/data/graphs/full/nodes.csv
tools/git2graph/tests/data/graphs/rev-edges/edges.csv
tools/git2graph/tests/data/graphs/rev-edges/nodes.csv
tools/git2graph/tests/data/graphs/rev-nodes/edges.csv
tools/git2graph/tests/data/graphs/rev-nodes/nodes.csv
tools/git2graph/tests/data/graphs/to-rev-edges/edges.csv
tools/git2graph/tests/data/graphs/to-rev-edges/nodes.csv
\ No newline at end of file
diff --git a/swh/__init__.py b/swh/__init__.py
index 8d9f151..b36383a 100644
--- a/swh/__init__.py
+++ b/swh/__init__.py
@@ -1,4 +1,3 @@
from pkgutil import extend_path
-from typing import List
-__path__: List[str] = extend_path(__path__, __name__)
+__path__ = extend_path(__path__, __name__)
diff --git a/swh/graph/backend.py b/swh/graph/backend.py
index 22d4036..5fb82f5 100644
--- a/swh/graph/backend.py
+++ b/swh/graph/backend.py
@@ -1,206 +1,176 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
import contextlib
import io
import os
-import struct
+import re
import subprocess
import sys
import tempfile
from py4j.java_gateway import JavaGateway
+from py4j.protocol import Py4JJavaError
from swh.graph.config import check_config
-from swh.graph.swhid import NodeToSwhidMap, SwhidToNodeMap
-from swh.model.identifiers import EXTENDED_SWHID_TYPES
-BUF_SIZE = 64 * 1024
-BIN_FMT = ">q" # 64 bit integer, big endian
-PATH_SEPARATOR_ID = -1
-NODE2SWHID_EXT = "node2swhid.bin"
-SWHID2NODE_EXT = "swhid2node.bin"
+BUF_LINES = 1024
def _get_pipe_stderr():
# Get stderr if possible, or pipe to stdout if running with Jupyter.
try:
sys.stderr.fileno()
except io.UnsupportedOperation:
return subprocess.STDOUT
else:
return sys.stderr
class Backend:
def __init__(self, graph_path, config=None):
self.gateway = None
self.entry = None
self.graph_path = graph_path
self.config = check_config(config or {})
def start_gateway(self):
self.gateway = JavaGateway.launch_gateway(
java_path=None,
javaopts=self.config["java_tool_options"].split(),
classpath=self.config["classpath"],
die_on_exit=True,
redirect_stdout=sys.stdout,
redirect_stderr=_get_pipe_stderr(),
)
self.entry = self.gateway.jvm.org.softwareheritage.graph.Entry()
self.entry.load_graph(self.graph_path)
- self.node2swhid = NodeToSwhidMap(self.graph_path + "." + NODE2SWHID_EXT)
- self.swhid2node = SwhidToNodeMap(self.graph_path + "." + SWHID2NODE_EXT)
self.stream_proxy = JavaStreamProxy(self.entry)
def stop_gateway(self):
self.gateway.shutdown()
def __enter__(self):
self.start_gateway()
return self
def __exit__(self, exc_type, exc_value, tb):
self.stop_gateway()
def stats(self):
return self.entry.stats()
- def count(self, ttype, direction, edges_fmt, src):
+ def check_swhid(self, swhid):
+ try:
+ self.entry.check_swhid(swhid)
+ except Py4JJavaError as e:
+ m = re.search(r"malformed SWHID: (\w+)", str(e))
+ if m:
+ raise ValueError(f"malformed SWHID: {m[1]}")
+ m = re.search(r"Unknown SWHID: (\w+)", str(e))
+ if m:
+ raise NameError(f"Unknown SWHID: {m[1]}")
+ raise
+
+ def count(self, ttype, *args):
method = getattr(self.entry, "count_" + ttype)
- return method(direction, edges_fmt, src)
+ return method(*args)
- async def simple_traversal(
- self, ttype, direction, edges_fmt, src, max_edges, return_types
- ):
- assert ttype in ("leaves", "neighbors", "visit_nodes")
+ async def traversal(self, ttype, *args):
method = getattr(self.stream_proxy, ttype)
- async for node_id in method(direction, edges_fmt, src, max_edges, return_types):
- yield node_id
-
- async def walk(self, direction, edges_fmt, algo, src, dst):
- if dst in EXTENDED_SWHID_TYPES:
- it = self.stream_proxy.walk_type(direction, edges_fmt, algo, src, dst)
- else:
- it = self.stream_proxy.walk(direction, edges_fmt, algo, src, dst)
- async for node_id in it:
- yield node_id
-
- async def random_walk(self, direction, edges_fmt, retries, src, dst, return_types):
- if dst in EXTENDED_SWHID_TYPES:
- it = self.stream_proxy.random_walk_type(
- direction, edges_fmt, retries, src, dst, return_types
- )
- else:
- it = self.stream_proxy.random_walk(
- direction, edges_fmt, retries, src, dst, return_types
- )
- async for node_id in it: # TODO return 404 if path is empty
- yield node_id
-
- async def visit_edges(self, direction, edges_fmt, src, max_edges):
- it = self.stream_proxy.visit_edges(direction, edges_fmt, src, max_edges)
- # convert stream a, b, c, d -> (a, b), (c, d)
- prevNode = None
- async for node in it:
- if prevNode is not None:
- yield (prevNode, node)
- prevNode = None
- else:
- prevNode = node
-
- async def visit_paths(self, direction, edges_fmt, src, max_edges):
- path = []
- async for node in self.stream_proxy.visit_paths(
- direction, edges_fmt, src, max_edges
- ):
- if node == PATH_SEPARATOR_ID:
- yield path
- path = []
- else:
- path.append(node)
+ async for line in method(*args):
+ yield line.decode().rstrip("\n")
class JavaStreamProxy:
"""A proxy class for the org.softwareheritage.graph.Entry Java class that
takes care of the setup and teardown of the named-pipe FIFO communication
between Python and Java.
Initialize JavaStreamProxy using:
proxy = JavaStreamProxy(swh_entry_class_instance)
Then you can call an Entry method and iterate on the FIFO results like
this:
async for value in proxy.java_method(arg1, arg2):
print(value)
"""
def __init__(self, entry):
self.entry = entry
async def read_node_ids(self, fname):
loop = asyncio.get_event_loop()
open_thread = loop.run_in_executor(None, open, fname, "rb")
# Since the open() call on the FIFO is blocking until it is also opened
# on the Java side, we await it with a timeout in case there is an
# exception that prevents the write-side open().
with (await asyncio.wait_for(open_thread, timeout=2)) as f:
+
+ def read_n_lines(f, n):
+ buf = []
+ for _ in range(n):
+ try:
+ buf.append(next(f))
+ except StopIteration:
+ break
+ return buf
+
while True:
- data = await loop.run_in_executor(None, f.read, BUF_SIZE)
- if not data:
+ lines = await loop.run_in_executor(None, read_n_lines, f, BUF_LINES)
+ if not lines:
break
- for data in struct.iter_unpack(BIN_FMT, data):
- yield data[0]
+ for line in lines:
+ yield line
class _HandlerWrapper:
def __init__(self, handler):
self._handler = handler
def __getattr__(self, name):
func = getattr(self._handler, name)
async def java_call(*args, **kwargs):
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, lambda: func(*args, **kwargs))
def java_task(*args, **kwargs):
return asyncio.create_task(java_call(*args, **kwargs))
return java_task
@contextlib.contextmanager
def get_handler(self):
with tempfile.TemporaryDirectory(prefix="swh-graph-") as tmpdirname:
cli_fifo = os.path.join(tmpdirname, "swh-graph.fifo")
os.mkfifo(cli_fifo)
reader = self.read_node_ids(cli_fifo)
query_handler = self.entry.get_handler(cli_fifo)
handler = self._HandlerWrapper(query_handler)
yield (handler, reader)
def __getattr__(self, name):
async def java_call_iterator(*args, **kwargs):
with self.get_handler() as (handler, reader):
java_task = getattr(handler, name)(*args, **kwargs)
try:
async for value in reader:
yield value
except asyncio.TimeoutError:
# If the read-side open() timeouts, an exception on the
# Java side probably happened that prevented the
# write-side open(). We propagate this exception here if
# that is the case.
task_exc = java_task.exception()
if task_exc:
raise task_exc
raise
await java_task
return java_call_iterator
diff --git a/swh/graph/cli.py b/swh/graph/cli.py
index 3b224b3..7d399ac 100644
--- a/swh/graph/cli.py
+++ b/swh/graph/cli.py
@@ -1,446 +1,447 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from pathlib import Path
import sys
from typing import TYPE_CHECKING, Any, Dict, Set, Tuple
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import click
from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup
from swh.core.cli import swh as swh_cli_group
if TYPE_CHECKING:
from swh.graph.webgraph import CompressionStep # noqa
class StepOption(click.ParamType):
"""click type for specifying a compression step on the CLI
parse either individual steps, specified as step names or integers, or step
ranges
"""
name = "compression step"
def convert(self, value, param, ctx): # type: (...) -> Set[CompressionStep]
from swh.graph.webgraph import COMP_SEQ, CompressionStep # noqa
steps: Set[CompressionStep] = set()
specs = value.split(",")
for spec in specs:
if "-" in spec: # step range
(raw_l, raw_r) = spec.split("-", maxsplit=1)
if raw_l == "": # no left endpoint
raw_l = COMP_SEQ[0].name
if raw_r == "": # no right endpoint
raw_r = COMP_SEQ[-1].name
l_step = self.convert(raw_l, param, ctx)
r_step = self.convert(raw_r, param, ctx)
if len(l_step) != 1 or len(r_step) != 1:
self.fail(f"invalid step specification: {value}, " f"see --help")
l_idx = l_step.pop()
r_idx = r_step.pop()
steps = steps.union(
set(CompressionStep(i) for i in range(l_idx.value, r_idx.value + 1))
)
else: # singleton step
try:
steps.add(CompressionStep(int(spec))) # integer step
except ValueError:
try:
steps.add(CompressionStep[spec.upper()]) # step name
except KeyError:
self.fail(
f"invalid step specification: {value}, " f"see --help"
)
return steps
class PathlibPath(click.Path):
"""A Click path argument that returns a pathlib Path, not a string"""
def convert(self, value, param, ctx):
return Path(super().convert(value, param, ctx))
DEFAULT_CONFIG: Dict[str, Tuple[str, Any]] = {"graph": ("dict", {})}
@swh_cli_group.group(name="graph", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup)
@click.option(
"--config-file",
"-C",
default=None,
type=click.Path(exists=True, dir_okay=False,),
help="YAML configuration file",
)
@click.pass_context
def graph_cli_group(ctx, config_file):
"""Software Heritage graph tools."""
from swh.core import config
ctx.ensure_object(dict)
conf = config.read(config_file, DEFAULT_CONFIG)
if "graph" not in conf:
raise ValueError(
'no "graph" stanza found in configuration file %s' % config_file
)
ctx.obj["config"] = conf
@graph_cli_group.command("api-client")
@click.option("--host", default="localhost", help="Graph server host")
@click.option("--port", default="5009", help="Graph server port")
@click.pass_context
def api_client(ctx, host, port):
"""client for the graph RPC service"""
from swh.graph import client
url = "http://{}:{}".format(host, port)
app = client.RemoteGraphClient(url)
# TODO: run web app
print(app.stats())
@graph_cli_group.group("map")
@click.pass_context
def map(ctx):
"""Manage swh-graph on-disk maps"""
pass
def dump_swhid2node(filename):
from swh.graph.swhid import SwhidToNodeMap
for (swhid, int) in SwhidToNodeMap(filename):
print("{}\t{}".format(swhid, int))
def dump_node2swhid(filename):
from swh.graph.swhid import NodeToSwhidMap
for (int, swhid) in NodeToSwhidMap(filename):
print("{}\t{}".format(int, swhid))
def restore_swhid2node(filename):
"""read a textual SWHID->int map from stdin and write its binary version to
filename
"""
from swh.graph.swhid import SwhidToNodeMap
with open(filename, "wb") as dst:
for line in sys.stdin:
(str_swhid, str_int) = line.split()
SwhidToNodeMap.write_record(dst, str_swhid, int(str_int))
def restore_node2swhid(filename, length):
"""read a textual int->SWHID map from stdin and write its binary version to
filename
"""
from swh.graph.swhid import NodeToSwhidMap
node2swhid = NodeToSwhidMap(filename, mode="wb", length=length)
for line in sys.stdin:
(str_int, str_swhid) = line.split()
node2swhid[int(str_int)] = str_swhid
node2swhid.close()
@map.command("dump")
@click.option(
"--type",
"-t",
"map_type",
required=True,
type=click.Choice(["swhid2node", "node2swhid"]),
help="type of map to dump",
)
@click.argument("filename", required=True, type=click.Path(exists=True))
@click.pass_context
def dump_map(ctx, map_type, filename):
"""Dump a binary SWHID<->node map to textual format."""
if map_type == "swhid2node":
dump_swhid2node(filename)
elif map_type == "node2swhid":
dump_node2swhid(filename)
else:
raise ValueError("invalid map type: " + map_type)
pass
@map.command("restore")
@click.option(
"--type",
"-t",
"map_type",
required=True,
type=click.Choice(["swhid2node", "node2swhid"]),
help="type of map to dump",
)
@click.option(
"--length",
"-l",
type=int,
help="""map size in number of logical records
(required for node2swhid maps)""",
)
@click.argument("filename", required=True, type=click.Path())
@click.pass_context
def restore_map(ctx, map_type, length, filename):
"""Restore a binary SWHID<->node map from textual format."""
if map_type == "swhid2node":
restore_swhid2node(filename)
elif map_type == "node2swhid":
if length is None:
raise click.UsageError(
"map length is required when restoring {} maps".format(map_type), ctx
)
restore_node2swhid(filename, length)
else:
raise ValueError("invalid map type: " + map_type)
@map.command("write")
@click.option(
"--type",
"-t",
"map_type",
required=True,
type=click.Choice(["swhid2node", "node2swhid"]),
help="type of map to write",
)
@click.argument("filename", required=True, type=click.Path())
@click.pass_context
def write(ctx, map_type, filename):
"""Write a map to disk sequentially.
read from stdin a textual SWHID->node mapping (for swhid2node, or a simple
sequence of SWHIDs for node2swhid) and write it to disk in the requested binary
map format
note that no sorting is applied, so the input should already be sorted as
required by the chosen map type (by SWHID for swhid2node, by int for node2swhid)
"""
from swh.graph.swhid import NodeToSwhidMap, SwhidToNodeMap
with open(filename, "wb") as f:
if map_type == "swhid2node":
for line in sys.stdin:
(swhid, int_str) = line.rstrip().split(maxsplit=1)
SwhidToNodeMap.write_record(f, swhid, int(int_str))
elif map_type == "node2swhid":
for line in sys.stdin:
swhid = line.rstrip()
NodeToSwhidMap.write_record(f, swhid)
else:
raise ValueError("invalid map type: " + map_type)
@map.command("lookup")
@click.option(
"--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename"
)
@click.argument("identifiers", nargs=-1)
def map_lookup(graph, identifiers):
"""Lookup identifiers using on-disk maps.
Depending on the identifier type lookup either a SWHID into a SWHID->node (and
return the node integer identifier) or, vice-versa, lookup a node integer
identifier into a node->SWHID (and return the SWHID). The desired behavior is
chosen depending on the syntax of each given identifier.
Identifiers can be passed either directly on the command line or on
standard input, separate by blanks. Logical lines (as returned by
readline()) in stdin will be preserved in stdout.
"""
from swh.graph.backend import NODE2SWHID_EXT, SWHID2NODE_EXT
from swh.graph.swhid import NodeToSwhidMap, SwhidToNodeMap
import swh.model.exceptions
- from swh.model.identifiers import ExtendedSWHID
+ from swh.model.swhids import ExtendedSWHID
success = True # no identifiers failed to be looked up
swhid2node = SwhidToNodeMap(f"{graph}.{SWHID2NODE_EXT}")
node2swhid = NodeToSwhidMap(f"{graph}.{NODE2SWHID_EXT}")
def lookup(identifier):
nonlocal success, swhid2node, node2swhid
is_swhid = None
try:
int(identifier)
is_swhid = False
except ValueError:
try:
ExtendedSWHID.from_string(identifier)
is_swhid = True
except swh.model.exceptions.ValidationError:
success = False
logging.error(f'invalid identifier: "{identifier}", skipping')
try:
if is_swhid:
return str(swhid2node[identifier])
else:
return node2swhid[int(identifier)]
except KeyError:
success = False
logging.error(f'identifier not found: "{identifier}", skipping')
if identifiers: # lookup identifiers passed via CLI
for identifier in identifiers:
print(lookup(identifier))
else: # lookup identifiers passed via stdin, preserving logical lines
for line in sys.stdin:
results = [lookup(id) for id in line.rstrip().split()]
if results: # might be empty if all IDs on the same line failed
print(" ".join(results))
sys.exit(0 if success else 1)
@graph_cli_group.command(name="rpc-serve")
@click.option(
"--host",
"-h",
default="0.0.0.0",
metavar="IP",
show_default=True,
help="host IP address to bind the server on",
)
@click.option(
"--port",
"-p",
default=5009,
type=click.INT,
metavar="PORT",
show_default=True,
help="port to bind the server on",
)
@click.option(
"--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename"
)
@click.pass_context
def serve(ctx, host, port, graph):
"""run the graph RPC service"""
import aiohttp
from swh.graph.server.app import make_app
config = ctx.obj["config"]
config.setdefault("graph", {})
config["graph"]["path"] = graph
app = make_app(config=config)
aiohttp.web.run_app(app, host=host, port=port)
@graph_cli_group.command()
@click.option(
"--graph",
"-g",
required=True,
metavar="GRAPH",
type=PathlibPath(),
help="input graph basename",
)
@click.option(
"--outdir",
"-o",
"out_dir",
required=True,
metavar="DIR",
type=PathlibPath(),
help="directory where to store compressed graph",
)
@click.option(
"--steps",
"-s",
metavar="STEPS",
type=StepOption(),
help="run only these compression steps (default: all steps)",
)
@click.pass_context
def compress(ctx, graph, out_dir, steps):
"""Compress a graph using WebGraph
Input: a pair of files g.nodes.csv.gz, g.edges.csv.gz
Output: a directory containing a WebGraph compressed graph
- Compression steps are: (1) mph, (2) bv, (3) bv_obl, (4) bfs, (5) permute,
- (6) permute_obl, (7) stats, (8) transpose, (9) transpose_obl, (10) maps,
- (11) clean_tmp. Compression steps can be selected by name or number using
+ Compression steps are: (1) mph, (2) bv, (3) bfs, (4) permute_bfs,
+ (5) transpose_bfs, (6) simplify, (7) llp, (8) permute_llp, (9) obl, (10)
+ compose_orders, (11) stats, (12) transpose, (13) transpose_obl, (14) maps,
+ (15) clean_tmp. Compression steps can be selected by name or number using
--steps, separating them with commas; step ranges (e.g., 3-9, 6-, etc.) are
also supported.
"""
from swh.graph import webgraph
graph_name = graph.name
in_dir = graph.parent
try:
conf = ctx.obj["config"]["graph"]["compress"]
except KeyError:
conf = {} # use defaults
webgraph.compress(graph_name, in_dir, out_dir, steps, conf)
@graph_cli_group.command(name="cachemount")
@click.option(
"--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename"
)
@click.option(
"--cache",
"-c",
default="/dev/shm/swh-graph/default",
metavar="CACHE",
type=PathlibPath(),
help="Memory cache path (defaults to /dev/shm/swh-graph/default)",
)
@click.pass_context
def cachemount(ctx, graph, cache):
"""
Cache the mmapped files of the compressed graph in a tmpfs.
This command creates a new directory at the path given by CACHE that has
the same structure as the compressed graph basename, except it copies the
files that require mmap access (:file:`{*}.graph`) but uses symlinks from the source
for all the other files (:file:`{*}.map`, :file:`{*}.bin`, ...).
The command outputs the path to the memory cache directory (particularly
useful when relying on the default value).
"""
import shutil
cache.mkdir(parents=True)
for src in Path(graph).parent.glob("*"):
dst = cache / src.name
if src.suffix == ".graph":
shutil.copy2(src, dst)
else:
dst.symlink_to(src.resolve())
print(cache)
def main():
return graph_cli_group(auto_envvar_prefix="SWH_GRAPH")
if __name__ == "__main__":
main()
diff --git a/swh/graph/config.py b/swh/graph/config.py
index 0d52b3f..f144f26 100644
--- a/swh/graph/config.py
+++ b/swh/graph/config.py
@@ -1,113 +1,115 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from pathlib import Path
import sys
import psutil
def find_graph_jar():
"""find swh-graph.jar, containing the Java part of swh-graph
look both in development directories and installed data (for in-production
deployments who fecthed the JAR from pypi)
"""
swh_graph_root = Path(__file__).parents[2]
try_paths = [
swh_graph_root / "java/target/",
Path(sys.prefix) / "share/swh-graph/",
Path(sys.prefix) / "local/share/swh-graph/",
]
for path in try_paths:
glob = list(path.glob("swh-graph-*.jar"))
if glob:
if len(glob) > 1:
logging.warn(
"found multiple swh-graph JARs, " "arbitrarily picking one"
)
logging.info("using swh-graph JAR: {0}".format(glob[0]))
return str(glob[0])
raise RuntimeError("swh-graph JAR not found. Have you run `make java`?")
def check_config(conf):
"""check configuration and propagate defaults"""
conf = conf.copy()
if "batch_size" not in conf:
# Use 0.1% of the RAM as a batch size:
# ~1 billion for big servers, ~10 million for small desktop machines
conf["batch_size"] = int(psutil.virtual_memory().total / 1000)
+ if "llp_gammas" not in conf:
+ conf["llp_gammas"] = "-0,-1,-2,-3,-4"
if "max_ram" not in conf:
conf["max_ram"] = str(psutil.virtual_memory().total)
if "java_tool_options" not in conf:
conf["java_tool_options"] = " ".join(
[
"-Xmx{max_ram}",
"-XX:PretenureSizeThreshold=512M",
"-XX:MaxNewSize=4G",
"-XX:+UseLargePages",
"-XX:+UseTransparentHugePages",
"-XX:+UseNUMA",
"-XX:+UseTLAB",
"-XX:+ResizeTLAB",
]
)
conf["java_tool_options"] = conf["java_tool_options"].format(
max_ram=conf["max_ram"]
)
if "java" not in conf:
conf["java"] = "java"
if "classpath" not in conf:
conf["classpath"] = find_graph_jar()
return conf
def check_config_compress(config, graph_name, in_dir, out_dir):
"""check compression-specific configuration and initialize its execution
environment.
"""
conf = check_config(config)
conf["graph_name"] = graph_name
conf["in_dir"] = str(in_dir)
conf["out_dir"] = str(out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
if "tmp_dir" not in conf:
tmp_dir = out_dir / "tmp"
conf["tmp_dir"] = str(tmp_dir)
else:
tmp_dir = Path(conf["tmp_dir"])
tmp_dir.mkdir(parents=True, exist_ok=True)
if "logback" not in conf:
logback_confpath = tmp_dir / "logback.xml"
with open(logback_confpath, "w") as conffile:
conffile.write(
"""
%d %r %p [%t] %logger{1} - %m%n
"""
)
conf["logback"] = str(logback_confpath)
conf["java_tool_options"] += " -Dlogback.configurationFile={logback}"
conf["java_tool_options"] += " -Djava.io.tmpdir={tmp_dir}"
conf["java_tool_options"] = conf["java_tool_options"].format(
logback=conf["logback"], tmp_dir=conf["tmp_dir"],
)
return conf
diff --git a/swh/graph/graph.py b/swh/graph/graph.py
deleted file mode 100644
index 3fd853b..0000000
--- a/swh/graph/graph.py
+++ /dev/null
@@ -1,193 +0,0 @@
-# Copyright (C) 2019 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import asyncio
-import contextlib
-import functools
-
-from swh.graph.backend import Backend
-from swh.graph.dot import KIND_TO_SHAPE, dot_to_svg, graph_dot
-
-BASE_URL = "https://archive.softwareheritage.org/browse"
-KIND_TO_URL_FRAGMENT = {
- "ori": "/origin/{}",
- "snp": "/snapshot/{}",
- "rel": "/release/{}",
- "rev": "/revision/{}",
- "dir": "/directory/{}",
- "cnt": "/content/sha1_git:{}/",
-}
-
-
-def call_async_gen(generator, *args, **kwargs):
- loop = asyncio.get_event_loop()
- it = generator(*args, **kwargs).__aiter__()
- while True:
- try:
- res = loop.run_until_complete(it.__anext__())
- yield res
- except StopAsyncIteration:
- break
-
-
-class Neighbors:
- """Neighbor iterator with custom O(1) length method"""
-
- def __init__(self, graph, iterator, length_func):
- self.graph = graph
- self.iterator = iterator
- self.length_func = length_func
-
- def __iter__(self):
- return self
-
- def __next__(self):
- succ = self.iterator.nextLong()
- if succ == -1:
- raise StopIteration
- return GraphNode(self.graph, succ)
-
- def __len__(self):
- return self.length_func()
-
-
-class GraphNode:
- """Node in the SWH graph"""
-
- def __init__(self, graph, node_id):
- self.graph = graph
- self.id = node_id
-
- def children(self):
- return Neighbors(
- self.graph,
- self.graph.java_graph.successors(self.id),
- lambda: self.graph.java_graph.outdegree(self.id),
- )
-
- def parents(self):
- return Neighbors(
- self.graph,
- self.graph.java_graph.predecessors(self.id),
- lambda: self.graph.java_graph.indegree(self.id),
- )
-
- def simple_traversal(
- self, ttype, direction="forward", edges="*", max_edges=0, return_types="*"
- ):
- for node in call_async_gen(
- self.graph.backend.simple_traversal,
- ttype,
- direction,
- edges,
- self.id,
- max_edges,
- return_types,
- ):
- yield self.graph[node]
-
- def leaves(self, *args, **kwargs):
- yield from self.simple_traversal("leaves", *args, **kwargs)
-
- def visit_nodes(self, *args, **kwargs):
- yield from self.simple_traversal("visit_nodes", *args, **kwargs)
-
- def visit_edges(self, direction="forward", edges="*", max_edges=0):
- for src, dst in call_async_gen(
- self.graph.backend.visit_edges, direction, edges, self.id, max_edges
- ):
- yield (self.graph[src], self.graph[dst])
-
- def visit_paths(self, direction="forward", edges="*", max_edges=0):
- for path in call_async_gen(
- self.graph.backend.visit_paths, direction, edges, self.id, max_edges
- ):
- yield [self.graph[node] for node in path]
-
- def walk(self, dst, direction="forward", edges="*", traversal="dfs"):
- for node in call_async_gen(
- self.graph.backend.walk, direction, edges, traversal, self.id, dst
- ):
- yield self.graph[node]
-
- def _count(self, ttype, direction="forward", edges="*"):
- return self.graph.backend.count(ttype, direction, edges, self.id)
-
- count_leaves = functools.partialmethod(_count, ttype="leaves")
- count_neighbors = functools.partialmethod(_count, ttype="neighbors")
- count_visit_nodes = functools.partialmethod(_count, ttype="visit_nodes")
-
- @property
- def swhid(self):
- return self.graph.node2swhid[self.id]
-
- @property
- def kind(self):
- return self.swhid.split(":")[2]
-
- def __str__(self):
- return self.swhid
-
- def __repr__(self):
- return "<{}>".format(self.swhid)
-
- def dot_fragment(self):
- swh, version, kind, hash = self.swhid.split(":")
- label = "{}:{}..{}".format(kind, hash[0:2], hash[-2:])
- url = BASE_URL + KIND_TO_URL_FRAGMENT[kind].format(hash)
- shape = KIND_TO_SHAPE[kind]
- return '{} [label="{}", href="{}", target="_blank", shape="{}"];'.format(
- self.id, label, url, shape
- )
-
- def _repr_svg_(self):
- nodes = [self, *list(self.children()), *list(self.parents())]
- dot = graph_dot(nodes)
- svg = dot_to_svg(dot)
- return svg
-
-
-class Graph:
- def __init__(self, backend, node2swhid, swhid2node):
- self.backend = backend
- self.java_graph = backend.entry.get_graph()
- self.node2swhid = node2swhid
- self.swhid2node = swhid2node
-
- def stats(self):
- return self.backend.stats()
-
- @property
- def path(self):
- return self.java_graph.getPath()
-
- def __len__(self):
- return self.java_graph.numNodes()
-
- def __getitem__(self, node_id):
- if isinstance(node_id, int):
- self.node2swhid[node_id] # check existence
- return GraphNode(self, node_id)
- elif isinstance(node_id, str):
- node_id = self.swhid2node[node_id]
- return GraphNode(self, node_id)
-
- def __iter__(self):
- for swhid, pos in self.backend.swhid2node:
- yield self[swhid]
-
- def iter_prefix(self, prefix):
- for swhid, pos in self.backend.swhid2node.iter_prefix(prefix):
- yield self[swhid]
-
- def iter_type(self, swhid_type):
- for swhid, pos in self.backend.swhid2node.iter_type(swhid_type):
- yield self[swhid]
-
-
-@contextlib.contextmanager
-def load(graph_path):
- with Backend(graph_path) as backend:
- yield Graph(backend, backend.node2swhid, backend.swhid2node)
diff --git a/swh/graph/naive_client.py b/swh/graph/naive_client.py
index 8191311..9e08d65 100644
--- a/swh/graph/naive_client.py
+++ b/swh/graph/naive_client.py
@@ -1,369 +1,369 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import functools
import inspect
import re
import statistics
from typing import (
Callable,
Dict,
Iterable,
Iterator,
List,
Optional,
Set,
Tuple,
TypeVar,
)
-from swh.model.identifiers import ExtendedSWHID, ValidationError
+from swh.model.swhids import ExtendedSWHID, ValidationError
from .client import GraphArgumentException
_NODE_TYPES = "ori|snp|rel|rev|dir|cnt"
NODES_RE = re.compile(fr"(\*|{_NODE_TYPES})")
EDGES_RE = re.compile(fr"(\*|{_NODE_TYPES}):(\*|{_NODE_TYPES})")
T = TypeVar("T", bound=Callable)
def check_arguments(f: T) -> T:
"""Decorator for generic argument checking for methods of NaiveClient.
Checks ``src`` is a valid and known SWHID, and ``edges`` has the right format."""
signature = inspect.signature(f)
@functools.wraps(f)
def newf(*args, **kwargs):
__tracebackhide__ = True # for pytest
try:
bound_args = signature.bind(*args, **kwargs)
except TypeError as e:
# rethrow the exception from here so pytest doesn't flood the terminal
# with signature.bind's call stack.
raise TypeError(*e.args) from None
self = bound_args.arguments["self"]
src = bound_args.arguments.get("src")
if src:
self._check_swhid(src)
edges = bound_args.arguments.get("edges")
if edges:
if edges != "*" and not EDGES_RE.match(edges):
raise GraphArgumentException(f"invalid edge restriction: {edges}")
return_types = bound_args.arguments.get("return_types")
if return_types:
if not NODES_RE.match(return_types):
raise GraphArgumentException(
f"invalid return_types restriction: {return_types}"
)
return f(*args, **kwargs)
return newf # type: ignore
def filter_node_types(node_types: str, nodes: Iterable[str]) -> Iterator[str]:
if node_types == "*":
yield from nodes
else:
prefixes = tuple(f"swh:1:{type_}:" for type_ in node_types.split(","))
for node in nodes:
if node.startswith(prefixes):
yield node
class NaiveClient:
"""An alternative implementation of :class:`swh.graph.backend.Backend`,
written in pure-python and meant for simulating it in other components' test
cases.
It is NOT meant to be efficient in any way; only to be a very simple
implementation that provides the same behavior."""
def __init__(self, *, nodes: List[str], edges: List[Tuple[str, str]]):
self.graph = Graph(nodes, edges)
def _check_swhid(self, swhid):
try:
ExtendedSWHID.from_string(swhid)
except ValidationError as e:
raise GraphArgumentException(*e.args) from None
if swhid not in self.graph.nodes:
raise GraphArgumentException(f"SWHID not found: {swhid}")
def stats(self) -> Dict:
return {
"counts": {
"nodes": len(self.graph.nodes),
"edges": sum(map(len, self.graph.forward_edges.values())),
},
"ratios": {
"compression": 1.0,
"bits_per_edge": 100.0,
"bits_per_node": 100.0,
"avg_locality": 0.0,
},
"indegree": {
"min": min(map(len, self.graph.backward_edges.values())),
"max": max(map(len, self.graph.backward_edges.values())),
"avg": statistics.mean(map(len, self.graph.backward_edges.values())),
},
"outdegree": {
"min": min(map(len, self.graph.forward_edges.values())),
"max": max(map(len, self.graph.forward_edges.values())),
"avg": statistics.mean(map(len, self.graph.forward_edges.values())),
},
}
@check_arguments
def leaves(
self,
src: str,
edges: str = "*",
direction: str = "forward",
max_edges: int = 0,
return_types: str = "*",
) -> Iterator[str]:
# TODO: max_edges
yield from filter_node_types(
return_types,
[
node
for node in self.graph.get_subgraph(src, edges, direction)
if not self.graph.get_filtered_neighbors(node, edges, direction)
],
)
@check_arguments
def neighbors(
self,
src: str,
edges: str = "*",
direction: str = "forward",
max_edges: int = 0,
return_types: str = "*",
) -> Iterator[str]:
# TODO: max_edges
yield from filter_node_types(
return_types, self.graph.get_filtered_neighbors(src, edges, direction)
)
@check_arguments
def visit_nodes(
self,
src: str,
edges: str = "*",
direction: str = "forward",
max_edges: int = 0,
return_types: str = "*",
) -> Iterator[str]:
# TODO: max_edges
yield from filter_node_types(
return_types, self.graph.get_subgraph(src, edges, direction)
)
@check_arguments
def visit_edges(
self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0
) -> Iterator[Tuple[str, str]]:
if max_edges == 0:
max_edges = None # type: ignore
else:
max_edges -= 1
yield from list(self.graph.iter_edges_dfs(direction, edges, src))[:max_edges]
@check_arguments
def visit_paths(
self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0
) -> Iterator[List[str]]:
# TODO: max_edges
for path in self.graph.iter_paths_dfs(direction, edges, src):
if path[-1] in self.leaves(src, edges, direction):
yield list(path)
@check_arguments
def walk(
self,
src: str,
dst: str,
edges: str = "*",
traversal: str = "dfs",
direction: str = "forward",
limit: Optional[int] = None,
) -> Iterator[str]:
# TODO: implement algo="bfs"
# TODO: limit
match_path: Callable[[str], bool]
if ":" in dst:
match_path = dst.__eq__
self._check_swhid(dst)
else:
match_path = lambda node: node.startswith(f"swh:1:{dst}:") # noqa
for path in self.graph.iter_paths_dfs(direction, edges, src):
if match_path(path[-1]):
if not limit:
# 0 or None
yield from path
elif limit > 0:
yield from path[0:limit]
else:
yield from path[limit:]
@check_arguments
def random_walk(
self,
src: str,
dst: str,
edges: str = "*",
direction: str = "forward",
limit: Optional[int] = None,
):
# TODO: limit
yield from self.walk(src, dst, edges, "dfs", direction, limit)
@check_arguments
def count_leaves(
self, src: str, edges: str = "*", direction: str = "forward"
) -> int:
return len(list(self.leaves(src, edges, direction)))
@check_arguments
def count_neighbors(
self, src: str, edges: str = "*", direction: str = "forward"
) -> int:
return len(self.graph.get_filtered_neighbors(src, edges, direction))
@check_arguments
def count_visit_nodes(
self, src: str, edges: str = "*", direction: str = "forward"
) -> int:
return len(self.graph.get_subgraph(src, edges, direction))
class Graph:
def __init__(self, nodes: List[str], edges: List[Tuple[str, str]]):
self.nodes = nodes
self.forward_edges: Dict[str, List[str]] = {}
self.backward_edges: Dict[str, List[str]] = {}
for node in nodes:
self.forward_edges[node] = []
self.backward_edges[node] = []
for (src, dst) in edges:
self.forward_edges[src].append(dst)
self.backward_edges[dst].append(src)
def get_filtered_neighbors(
self, src: str, edges_fmt: str, direction: str,
) -> Set[str]:
if direction == "forward":
edges = self.forward_edges
elif direction == "backward":
edges = self.backward_edges
else:
raise GraphArgumentException(f"invalid direction: {direction}")
neighbors = edges.get(src, [])
if edges_fmt == "*":
return set(neighbors)
else:
filtered_neighbors: Set[str] = set()
for edges_fmt_item in edges_fmt.split(","):
(src_fmt, dst_fmt) = edges_fmt_item.split(":")
if src_fmt != "*" and not src.startswith(f"swh:1:{src_fmt}:"):
continue
if dst_fmt == "*":
filtered_neighbors.update(neighbors)
else:
prefix = f"swh:1:{dst_fmt}:"
filtered_neighbors.update(
n for n in neighbors if n.startswith(prefix)
)
return filtered_neighbors
def get_subgraph(self, src: str, edges_fmt: str, direction: str) -> Set[str]:
seen = set()
to_visit = {src}
while to_visit:
node = to_visit.pop()
seen.add(node)
neighbors = set(self.get_filtered_neighbors(node, edges_fmt, direction))
new_nodes = neighbors - seen
to_visit.update(new_nodes)
return seen
def iter_paths_dfs(
self, direction: str, edges_fmt: str, src: str
) -> Iterator[Tuple[str, ...]]:
for (path, node) in DfsSubgraphIterator(self, direction, edges_fmt, src):
yield path + (node,)
def iter_edges_dfs(
self, direction: str, edges_fmt: str, src: str
) -> Iterator[Tuple[str, str]]:
for (path, node) in DfsSubgraphIterator(self, direction, edges_fmt, src):
if len(path) > 0:
yield (path[-1], node)
class SubgraphIterator(Iterator[Tuple[Tuple[str, ...], str]]):
def __init__(self, graph: Graph, direction: str, edges_fmt: str, src: str):
self.graph = graph
self.direction = direction
self.edges_fmt = edges_fmt
self.seen: Set[str] = set()
self.src = src
def more_work(self) -> bool:
raise NotImplementedError()
def pop(self) -> Tuple[Tuple[str, ...], str]:
raise NotImplementedError()
def push(self, new_path: Tuple[str, ...], neighbor: str) -> None:
raise NotImplementedError()
def __next__(self) -> Tuple[Tuple[str, ...], str]:
# Stores (path, next_node)
if not self.more_work():
raise StopIteration()
(path, node) = self.pop()
new_path = path + (node,)
if node not in self.seen:
neighbors = self.graph.get_filtered_neighbors(
node, self.edges_fmt, self.direction
)
# We want to visit the first neighbor first, and to_visit is a stack;
# so we need to reversed() the list of neighbors to get it on top
# of the stack.
for neighbor in reversed(list(neighbors)):
self.push(new_path, neighbor)
self.seen.add(node)
return (path, node)
class DfsSubgraphIterator(SubgraphIterator):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.to_visit: List[Tuple[Tuple[str, ...], str]] = [((), self.src)]
def more_work(self) -> bool:
return bool(self.to_visit)
def pop(self) -> Tuple[Tuple[str, ...], str]:
return self.to_visit.pop()
def push(self, new_path: Tuple[str, ...], neighbor: str) -> None:
self.to_visit.append((new_path, neighbor))
diff --git a/swh/graph/server/app.py b/swh/graph/server/app.py
index 0816a4b..0128f2a 100644
--- a/swh/graph/server/app.py
+++ b/swh/graph/server/app.py
@@ -1,402 +1,373 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using
FIFO as a transport to stream integers between the two languages.
"""
import asyncio
from collections import deque
-import json
import os
from typing import Optional
import aiohttp.web
from swh.core.api.asynchronous import RPCServerApp
from swh.core.config import read as config_read
from swh.graph.backend import Backend
-from swh.model.exceptions import ValidationError
-from swh.model.identifiers import EXTENDED_SWHID_TYPES
+from swh.model.swhids import EXTENDED_SWHID_TYPES
try:
from contextlib import asynccontextmanager
except ImportError:
# Compatibility with 3.6 backport
from async_generator import asynccontextmanager # type: ignore
# maximum number of retries for random walks
RANDOM_RETRIES = 5 # TODO make this configurable via rpc-serve configuration
class GraphServerApp(RPCServerApp):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.on_startup.append(self._start_gateway)
self.on_shutdown.append(self._stop_gateway)
@staticmethod
async def _start_gateway(app):
# Equivalent to entering `with app["backend"]:`
app["backend"].start_gateway()
@staticmethod
async def _stop_gateway(app):
# Equivalent to exiting `with app["backend"]:` with no error
app["backend"].stop_gateway()
async def index(request):
return aiohttp.web.Response(
content_type="text/html",
body="""
Software Heritage graph server
You have reached the
Software Heritage graph API server.
See its
API
documentation for more information.
""",
)
class GraphView(aiohttp.web.View):
"""Base class for views working on the graph, with utility functions"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.backend = self.request.app["backend"]
- def node_of_swhid(self, swhid):
- """Lookup a SWHID in a swhid2node map, failing in an HTTP-nice way if
- needed."""
- try:
- return self.backend.swhid2node[swhid]
- except KeyError:
- raise aiohttp.web.HTTPNotFound(text=f"SWHID not found: {swhid}")
- except ValidationError:
- raise aiohttp.web.HTTPBadRequest(text=f"malformed SWHID: {swhid}")
-
- def swhid_of_node(self, node):
- """Lookup a node in a node2swhid map, failing in an HTTP-nice way if
- needed."""
- try:
- return self.backend.node2swhid[node]
- except KeyError:
- raise aiohttp.web.HTTPInternalServerError(
- text=f"reverse lookup failed for node id: {node}"
- )
-
def get_direction(self):
"""Validate HTTP query parameter `direction`"""
s = self.request.query.get("direction", "forward")
if s not in ("forward", "backward"):
raise aiohttp.web.HTTPBadRequest(text=f"invalid direction: {s}")
return s
def get_edges(self):
"""Validate HTTP query parameter `edges`, i.e., edge restrictions"""
s = self.request.query.get("edges", "*")
if any(
[
node_type != "*" and node_type not in EXTENDED_SWHID_TYPES
for edge in s.split(":")
for node_type in edge.split(",", maxsplit=1)
]
):
raise aiohttp.web.HTTPBadRequest(text=f"invalid edge restriction: {s}")
return s
def get_return_types(self):
"""Validate HTTP query parameter 'return types', i.e,
a set of types which we will filter the query results with"""
s = self.request.query.get("return_types", "*")
if any(
node_type != "*" and node_type not in EXTENDED_SWHID_TYPES
for node_type in s.split(",")
):
raise aiohttp.web.HTTPBadRequest(
text=f"invalid type for filtering res: {s}"
)
# if the user puts a star,
# then we filter nothing, we don't need the other information
if "*" in s:
return "*"
else:
return s
def get_traversal(self):
"""Validate HTTP query parameter `traversal`, i.e., visit order"""
s = self.request.query.get("traversal", "dfs")
if s not in ("bfs", "dfs"):
raise aiohttp.web.HTTPBadRequest(text=f"invalid traversal order: {s}")
return s
def get_limit(self):
"""Validate HTTP query parameter `limit`, i.e., number of results"""
s = self.request.query.get("limit", "0")
try:
return int(s)
except ValueError:
raise aiohttp.web.HTTPBadRequest(text=f"invalid limit value: {s}")
def get_max_edges(self):
"""Validate HTTP query parameter 'max_edges', i.e.,
the limit of the number of edges that can be visited"""
s = self.request.query.get("max_edges", "0")
try:
return int(s)
except ValueError:
raise aiohttp.web.HTTPBadRequest(text=f"invalid max_edges value: {s}")
+ def check_swhid(self, swhid):
+ """Validate that the given SWHID exists in the graph"""
+ try:
+ self.backend.check_swhid(swhid)
+ except (NameError, ValueError) as e:
+ raise aiohttp.web.HTTPBadRequest(text=str(e))
+
class StreamingGraphView(GraphView):
"""Base class for views streaming their response line by line."""
content_type = "text/plain"
@asynccontextmanager
async def response_streamer(self, *args, **kwargs):
"""Context manager to prepare then close a StreamResponse"""
response = aiohttp.web.StreamResponse(*args, **kwargs)
response.content_type = self.content_type
await response.prepare(self.request)
yield response
await response.write_eof()
async def get(self):
await self.prepare_response()
async with self.response_streamer() as self.response_stream:
self._buf = []
try:
await self.stream_response()
finally:
await self._flush_buffer()
return self.response_stream
async def prepare_response(self):
"""This can be overridden with some setup to be run before the response
actually starts streaming.
"""
pass
async def stream_response(self):
"""Override this to perform the response streaming. Implementations of
this should await self.stream_line(line) to write each line.
"""
raise NotImplementedError
async def stream_line(self, line):
"""Write a line in the response stream."""
self._buf.append(line)
if len(self._buf) > 100:
await self._flush_buffer()
async def _flush_buffer(self):
await self.response_stream.write("\n".join(self._buf).encode() + b"\n")
self._buf = []
class StatsView(GraphView):
"""View showing some statistics on the graph"""
async def get(self):
stats = self.backend.stats()
return aiohttp.web.Response(body=stats, content_type="application/json")
class SimpleTraversalView(StreamingGraphView):
"""Base class for views of simple traversals"""
simple_traversal_type: Optional[str] = None
async def prepare_response(self):
- src = self.request.match_info["src"]
- self.src_node = self.node_of_swhid(src)
-
+ self.src = self.request.match_info["src"]
self.edges = self.get_edges()
self.direction = self.get_direction()
self.max_edges = self.get_max_edges()
self.return_types = self.get_return_types()
+ self.check_swhid(self.src)
async def stream_response(self):
- async for res_node in self.backend.simple_traversal(
+ async for res_line in self.backend.traversal(
self.simple_traversal_type,
self.direction,
self.edges,
- self.src_node,
+ self.src,
self.max_edges,
self.return_types,
):
- res_swhid = self.swhid_of_node(res_node)
- await self.stream_line(res_swhid)
+ await self.stream_line(res_line)
class LeavesView(SimpleTraversalView):
simple_traversal_type = "leaves"
class NeighborsView(SimpleTraversalView):
simple_traversal_type = "neighbors"
class VisitNodesView(SimpleTraversalView):
simple_traversal_type = "visit_nodes"
+class VisitEdgesView(SimpleTraversalView):
+ simple_traversal_type = "visit_edges"
+
+
class WalkView(StreamingGraphView):
async def prepare_response(self):
- src = self.request.match_info["src"]
- dst = self.request.match_info["dst"]
- self.src_node = self.node_of_swhid(src)
- if dst not in EXTENDED_SWHID_TYPES:
- self.dst_thing = self.node_of_swhid(dst)
- else:
- self.dst_thing = dst
+ self.src = self.request.match_info["src"]
+ self.dst = self.request.match_info["dst"]
self.edges = self.get_edges()
self.direction = self.get_direction()
self.algo = self.get_traversal()
self.limit = self.get_limit()
+ self.max_edges = self.get_max_edges()
self.return_types = self.get_return_types()
+ self.check_swhid(self.src)
+ if self.dst not in EXTENDED_SWHID_TYPES:
+ self.check_swhid(self.dst)
+
async def get_walk_iterator(self):
- return self.backend.walk(
- self.direction, self.edges, self.algo, self.src_node, self.dst_thing
+ return self.backend.traversal(
+ "walk",
+ self.direction,
+ self.edges,
+ self.algo,
+ self.src,
+ self.dst,
+ self.max_edges,
+ self.return_types,
)
async def stream_response(self):
it = self.get_walk_iterator()
if self.limit < 0:
queue = deque(maxlen=-self.limit)
- async for res_node in it:
- res_swhid = self.swhid_of_node(res_node)
+ async for res_swhid in it:
queue.append(res_swhid)
while queue:
await self.stream_line(queue.popleft())
else:
count = 0
- async for res_node in it:
+ async for res_swhid in it:
if self.limit == 0 or count < self.limit:
- res_swhid = self.swhid_of_node(res_node)
await self.stream_line(res_swhid)
count += 1
else:
break
class RandomWalkView(WalkView):
def get_walk_iterator(self):
- return self.backend.random_walk(
+ return self.backend.traversal(
+ "random_walk",
self.direction,
self.edges,
RANDOM_RETRIES,
- self.src_node,
- self.dst_thing,
+ self.src,
+ self.dst,
+ self.max_edges,
self.return_types,
)
-class VisitEdgesView(SimpleTraversalView):
- async def stream_response(self):
- it = self.backend.visit_edges(
- self.direction, self.edges, self.src_node, self.max_edges
- )
- async for (res_src, res_dst) in it:
- res_src_swhid = self.swhid_of_node(res_src)
- res_dst_swhid = self.swhid_of_node(res_dst)
- await self.stream_line("{} {}".format(res_src_swhid, res_dst_swhid))
-
-
-class VisitPathsView(SimpleTraversalView):
- content_type = "application/x-ndjson"
-
- async def stream_response(self):
- it = self.backend.visit_paths(
- self.direction, self.edges, self.src_node, self.max_edges
- )
- async for res_path in it:
- res_path_swhid = [self.swhid_of_node(n) for n in res_path]
- line = json.dumps(res_path_swhid)
- await self.stream_line(line)
-
-
class CountView(GraphView):
"""Base class for counting views."""
count_type: Optional[str] = None
async def get(self):
- src = self.request.match_info["src"]
- self.src_node = self.node_of_swhid(src)
+ self.src = self.request.match_info["src"]
+ self.check_swhid(self.src)
self.edges = self.get_edges()
self.direction = self.get_direction()
+ self.max_edges = self.get_max_edges()
loop = asyncio.get_event_loop()
cnt = await loop.run_in_executor(
None,
self.backend.count,
self.count_type,
self.direction,
self.edges,
- self.src_node,
+ self.src,
+ self.max_edges,
)
return aiohttp.web.Response(body=str(cnt), content_type="application/json")
class CountNeighborsView(CountView):
count_type = "neighbors"
class CountLeavesView(CountView):
count_type = "leaves"
class CountVisitNodesView(CountView):
count_type = "visit_nodes"
def make_app(config=None, backend=None, **kwargs):
if (config is None) == (backend is None):
raise ValueError("make_app() expects exactly one of 'config' or 'backend'")
if backend is None:
backend = Backend(graph_path=config["graph"]["path"], config=config["graph"])
app = GraphServerApp(**kwargs)
app.add_routes(
[
aiohttp.web.get("/", index),
aiohttp.web.get("/graph", index),
aiohttp.web.view("/graph/stats", StatsView),
aiohttp.web.view("/graph/leaves/{src}", LeavesView),
aiohttp.web.view("/graph/neighbors/{src}", NeighborsView),
aiohttp.web.view("/graph/visit/nodes/{src}", VisitNodesView),
aiohttp.web.view("/graph/visit/edges/{src}", VisitEdgesView),
- aiohttp.web.view("/graph/visit/paths/{src}", VisitPathsView),
# temporarily disabled in wait of a proper fix for T1969
# aiohttp.web.view("/graph/walk/{src}/{dst}", WalkView)
aiohttp.web.view("/graph/randomwalk/{src}/{dst}", RandomWalkView),
aiohttp.web.view("/graph/neighbors/count/{src}", CountNeighborsView),
aiohttp.web.view("/graph/leaves/count/{src}", CountLeavesView),
aiohttp.web.view("/graph/visit/nodes/count/{src}", CountVisitNodesView),
]
)
app["backend"] = backend
return app
def make_app_from_configfile():
"""Load configuration and then build application to run
"""
config_file = os.environ.get("SWH_CONFIG_FILENAME")
config = config_read(config_file)
return make_app(config=config)
diff --git a/swh/graph/swhid.py b/swh/graph/swhid.py
index aadd0d0..90db73f 100644
--- a/swh/graph/swhid.py
+++ b/swh/graph/swhid.py
@@ -1,419 +1,419 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
from collections.abc import MutableMapping
from enum import Enum
import mmap
from mmap import MAP_SHARED, PROT_READ, PROT_WRITE
import os
import struct
from typing import BinaryIO, Iterator, Tuple
from swh.model.hashutil import hash_to_hex
-from swh.model.identifiers import ExtendedObjectType, ExtendedSWHID
+from swh.model.swhids import ExtendedObjectType, ExtendedSWHID
SWHID_BIN_FMT = "BB20s" # 2 unsigned chars + 20 bytes
INT_BIN_FMT = ">q" # big endian, 8-byte integer
SWHID_BIN_SIZE = 22 # in bytes
INT_BIN_SIZE = 8 # in bytes
class SwhidType(Enum):
"""types of existing SWHIDs, used to serialize ExtendedSWHID type as a (char)
integer
Note that the order does matter also for driving the binary search in
SWHID-indexed maps. Integer values also matter, for compatibility with the
Java layer.
"""
content = 0
directory = 1
origin = 2
release = 3
revision = 4
snapshot = 5
@classmethod
def from_extended_object_type(cls, object_type: ExtendedObjectType) -> SwhidType:
return cls[object_type.name.lower()]
def to_extended_object_type(self) -> ExtendedObjectType:
return ExtendedObjectType[SwhidType(self).name.upper()]
def str_to_bytes(swhid_str: str) -> bytes:
"""Convert a SWHID to a byte sequence
The binary format used to represent SWHIDs as 22-byte long byte sequences as
follows:
- 1 byte for the namespace version represented as a C `unsigned char`
- 1 byte for the object type, as the int value of :class:`SwhidType` enums,
represented as a C `unsigned char`
- 20 bytes for the SHA1 digest as a byte sequence
Args:
swhid: persistent identifier
Returns:
bytes: byte sequence representation of swhid
"""
swhid = ExtendedSWHID.from_string(swhid_str)
return struct.pack(
SWHID_BIN_FMT,
swhid.scheme_version,
SwhidType.from_extended_object_type(swhid.object_type).value,
swhid.object_id,
)
def bytes_to_str(bytes: bytes) -> str:
"""Inverse function of :func:`str_to_bytes`
See :func:`str_to_bytes` for a description of the binary SWHID format.
Args:
bytes: byte sequence representation of swhid
Returns:
swhid: persistent identifier
"""
(version, type, bin_digest) = struct.unpack(SWHID_BIN_FMT, bytes)
# The following is equivalent to:
# return str(ExtendedSWHID(
# object_type=SwhidType(type).to_extended_object_type(), object_id=bin_digest
# )
# but more efficient, because ExtendedSWHID.__init__ is extremely slow.
object_type = ExtendedObjectType[SwhidType(type).name.upper()]
return f"swh:1:{object_type.value}:{hash_to_hex(bin_digest)}"
class _OnDiskMap:
"""mmap-ed on-disk sequence of fixed size records"""
def __init__(
self, record_size: int, fname: str, mode: str = "rb", length: int = None
):
"""open an existing on-disk map
Args:
record_size: size of each record in bytes
fname: path to the on-disk map
mode: file open mode, usually either 'rb' for read-only maps, 'wb'
for creating new maps, or 'rb+' for updating existing ones
(default: 'rb')
length: map size in number of logical records; used to initialize
writable maps at creation time. Must be given when mode is 'wb'
and the map doesn't exist on disk; ignored otherwise
"""
os_modes = {"rb": os.O_RDONLY, "wb": os.O_RDWR | os.O_CREAT, "rb+": os.O_RDWR}
if mode not in os_modes:
raise ValueError("invalid file open mode: " + mode)
new_map = mode == "wb"
writable_map = mode in ["wb", "rb+"]
self.record_size = record_size
self.fd = os.open(fname, os_modes[mode])
if new_map:
if length is None:
raise ValueError("missing length when creating new map")
os.truncate(self.fd, length * self.record_size)
self.size = os.path.getsize(fname)
(self.length, remainder) = divmod(self.size, record_size)
if remainder:
raise ValueError(
"map size {} is not a multiple of the record size {}".format(
self.size, record_size
)
)
self.mm = mmap.mmap(
self.fd,
self.size,
prot=(PROT_READ | PROT_WRITE if writable_map else PROT_READ),
flags=MAP_SHARED,
)
def close(self) -> None:
"""close the map
shuts down both the mmap and the underlying file descriptor
"""
if not self.mm.closed:
self.mm.close()
os.close(self.fd)
def __len__(self) -> int:
return self.length
def __delitem__(self, pos: int) -> None:
raise NotImplementedError("cannot delete records from fixed-size map")
class SwhidToNodeMap(_OnDiskMap, MutableMapping):
"""memory mapped map from :ref:`SWHIDs ` to a
continuous range 0..N of (8-byte long) integers
This is the converse mapping of :class:`NodeToSwhidMap`.
The on-disk serialization format is a sequence of fixed length (30 bytes)
records with the following fields:
- SWHID (22 bytes): binary SWHID representation as per :func:`str_to_bytes`
- long (8 bytes): big endian long integer
The records are sorted lexicographically by SWHID type and checksum, where
type is the integer value of :class:`SwhidType`. SWHID lookup in the map is
performed via binary search. Hence a huge map with, say, 11 B entries,
will require ~30 disk seeks.
Note that, due to fixed size + ordering, it is not possible to create these
maps by random writing. Hence, __setitem__ can be used only to *update* the
value associated to an existing key, rather than to add a missing item. To
create an entire map from scratch, you should do so *sequentially*, using
static method :meth:`write_record` (or, at your own risk, by hand via the
mmap :attr:`mm`).
"""
# record binary format: SWHID + a big endian 8-byte big endian integer
RECORD_BIN_FMT = ">" + SWHID_BIN_FMT + "q"
RECORD_SIZE = SWHID_BIN_SIZE + INT_BIN_SIZE
def __init__(self, fname: str, mode: str = "rb", length: int = None):
"""open an existing on-disk map
Args:
fname: path to the on-disk map
mode: file open mode, usually either 'rb' for read-only maps, 'wb'
for creating new maps, or 'rb+' for updating existing ones
(default: 'rb')
length: map size in number of logical records; used to initialize
read-write maps at creation time. Must be given when mode is
'wb'; ignored otherwise
"""
super().__init__(self.RECORD_SIZE, fname, mode=mode, length=length)
def _get_bin_record(self, pos: int) -> Tuple[bytes, bytes]:
"""seek and return the (binary) record at a given (logical) position
see :func:`_get_record` for an equivalent function with additional
deserialization
Args:
pos: 0-based record number
Returns:
a pair `(swhid, int)`, where swhid and int are bytes
"""
rec_pos = pos * self.RECORD_SIZE
int_pos = rec_pos + SWHID_BIN_SIZE
return (self.mm[rec_pos:int_pos], self.mm[int_pos : int_pos + INT_BIN_SIZE])
def _get_record(self, pos: int) -> Tuple[str, int]:
"""seek and return the record at a given (logical) position
moral equivalent of :func:`_get_bin_record`, with additional
deserialization to non-bytes types
Args:
pos: 0-based record number
Returns:
a pair `(swhid, int)`, where swhid is a string-based SWHID and int the
corresponding integer identifier
"""
(swhid_bytes, int_bytes) = self._get_bin_record(pos)
return (bytes_to_str(swhid_bytes), struct.unpack(INT_BIN_FMT, int_bytes)[0])
@classmethod
def write_record(cls, f: BinaryIO, swhid: str, int: int) -> None:
"""write a logical record to a file-like object
Args:
f: file-like object to write the record to
swhid: textual SWHID
int: SWHID integer identifier
"""
f.write(str_to_bytes(swhid))
f.write(struct.pack(INT_BIN_FMT, int))
def _bisect_pos(self, swhid_str: str) -> int:
"""bisect the position of the given identifier. If the identifier is
not found, the position of the swhid immediately after is returned.
Args:
swhid_str: the swhid as a string
Returns:
the logical record of the bisected position in the map
"""
if not isinstance(swhid_str, str):
raise TypeError("SWHID must be a str, not {}".format(type(swhid_str)))
try:
target = str_to_bytes(swhid_str) # desired SWHID as bytes
except ValueError:
raise ValueError('invalid SWHID: "{}"'.format(swhid_str))
lo = 0
hi = self.length - 1
while lo < hi:
mid = (lo + hi) // 2
(swhid, _value) = self._get_bin_record(mid)
if swhid < target:
lo = mid + 1
else:
hi = mid
return lo
def _find(self, swhid_str: str) -> Tuple[int, int]:
"""lookup the integer identifier of a swhid and its position
Args:
swhid_str: the swhid as a string
Returns:
a pair `(swhid, pos)` with swhid integer identifier and its logical
record position in the map
"""
pos = self._bisect_pos(swhid_str)
swhid_found, value = self._get_record(pos)
if swhid_found == swhid_str:
return (value, pos)
raise KeyError(swhid_str)
def __getitem__(self, swhid_str: str) -> int:
"""lookup the integer identifier of a SWHID
Args:
swhid: the SWHID as a string
Returns:
the integer identifier of swhid
"""
return self._find(swhid_str)[0] # return element, ignore position
def __setitem__(self, swhid_str: str, int: str) -> None:
(_swhid, pos) = self._find(swhid_str) # might raise KeyError and that's OK
rec_pos = pos * self.RECORD_SIZE
int_pos = rec_pos + SWHID_BIN_SIZE
self.mm[rec_pos:int_pos] = str_to_bytes(swhid_str)
self.mm[int_pos : int_pos + INT_BIN_SIZE] = struct.pack(INT_BIN_FMT, int)
def __iter__(self) -> Iterator[Tuple[str, int]]:
for pos in range(self.length):
yield self._get_record(pos)
def iter_prefix(self, prefix: str):
swh, n, t, sha = prefix.split(":")
sha = sha.ljust(40, "0")
start_swhid = ":".join([swh, n, t, sha])
start = self._bisect_pos(start_swhid)
for pos in range(start, self.length):
swhid, value = self._get_record(pos)
if not swhid.startswith(prefix):
break
yield swhid, value
def iter_type(self, swhid_type: str) -> Iterator[Tuple[str, int]]:
prefix = "swh:1:{}:".format(swhid_type)
yield from self.iter_prefix(prefix)
class NodeToSwhidMap(_OnDiskMap, MutableMapping):
"""memory mapped map from a continuous range of 0..N (8-byte long) integers to
:ref:`SWHIDs `
This is the converse mapping of :class:`SwhidToNodeMap`.
The on-disk serialization format is a sequence of fixed length records (22
bytes), each being the binary representation of a SWHID as per
:func:`str_to_bytes`.
The records are sorted by long integer, so that integer lookup is possible
via fixed-offset seek.
"""
RECORD_BIN_FMT = SWHID_BIN_FMT
RECORD_SIZE = SWHID_BIN_SIZE
def __init__(self, fname: str, mode: str = "rb", length: int = None):
"""open an existing on-disk map
Args:
fname: path to the on-disk map
mode: file open mode, usually either 'rb' for read-only maps, 'wb'
for creating new maps, or 'rb+' for updating existing ones
(default: 'rb')
size: map size in number of logical records; used to initialize
read-write maps at creation time. Must be given when mode is
'wb'; ignored otherwise
length: passed to :class:`_OnDiskMap`
"""
super().__init__(self.RECORD_SIZE, fname, mode=mode, length=length)
def _get_bin_record(self, pos: int) -> bytes:
"""seek and return the (binary) SWHID at a given (logical) position
Args:
pos: 0-based record number
Returns:
SWHID as a byte sequence
"""
rec_pos = pos * self.RECORD_SIZE
return self.mm[rec_pos : rec_pos + self.RECORD_SIZE]
@classmethod
def write_record(cls, f: BinaryIO, swhid: str) -> None:
"""write a SWHID to a file-like object
Args:
f: file-like object to write the record to
swhid: textual SWHID
"""
f.write(str_to_bytes(swhid))
def __getitem__(self, pos: int) -> str:
orig_pos = pos
if pos < 0:
pos = len(self) + pos
if not (0 <= pos < len(self)):
raise IndexError(orig_pos)
return bytes_to_str(self._get_bin_record(pos))
def __setitem__(self, pos: int, swhid: str) -> None:
rec_pos = pos * self.RECORD_SIZE
self.mm[rec_pos : rec_pos + self.RECORD_SIZE] = str_to_bytes(swhid)
def __iter__(self) -> Iterator[Tuple[int, str]]:
for pos in range(self.length):
yield (pos, self[pos])
diff --git a/swh/graph/tests/conftest.py b/swh/graph/tests/conftest.py
index 5a7bb92..fed877b 100644
--- a/swh/graph/tests/conftest.py
+++ b/swh/graph/tests/conftest.py
@@ -1,68 +1,59 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import csv
import multiprocessing
from pathlib import Path
from aiohttp.test_utils import TestClient, TestServer, loop_context
import pytest
from swh.graph.client import RemoteGraphClient
from swh.graph.naive_client import NaiveClient
SWH_GRAPH_TESTS_ROOT = Path(__file__).parents[0]
TEST_GRAPH_PATH = SWH_GRAPH_TESTS_ROOT / "dataset/output/example"
class GraphServerProcess(multiprocessing.Process):
def __init__(self, q, *args, **kwargs):
self.q = q
super().__init__(*args, **kwargs)
def run(self):
# Lazy import to allow debian packaging
from swh.graph.backend import Backend
from swh.graph.server.app import make_app
try:
backend = Backend(graph_path=str(TEST_GRAPH_PATH))
with loop_context() as loop:
app = make_app(backend=backend, debug=True)
client = TestClient(TestServer(app), loop=loop)
loop.run_until_complete(client.start_server())
url = client.make_url("/graph/")
self.q.put(url)
loop.run_forever()
except Exception as e:
self.q.put(e)
@pytest.fixture(scope="module", params=["remote", "naive"])
def graph_client(request):
if request.param == "remote":
queue = multiprocessing.Queue()
server = GraphServerProcess(queue)
server.start()
res = queue.get()
if isinstance(res, Exception):
raise res
yield RemoteGraphClient(str(res))
server.terminate()
else:
with open(SWH_GRAPH_TESTS_ROOT / "dataset/example.nodes.csv") as fd:
nodes = [node for (node,) in csv.reader(fd, delimiter=" ")]
with open(SWH_GRAPH_TESTS_ROOT / "dataset/example.edges.csv") as fd:
edges = list(csv.reader(fd, delimiter=" "))
yield NaiveClient(nodes=nodes, edges=edges)
-
-
-@pytest.fixture(scope="module")
-def graph():
- # Lazy import to allow debian packaging
- from swh.graph.graph import load as graph_load
-
- with graph_load(str(TEST_GRAPH_PATH)) as g:
- yield g
diff --git a/swh/graph/tests/dataset/output/example-transposed.graph b/swh/graph/tests/dataset/output/example-transposed.graph
index 5460ea4..ad5756e 100644
--- a/swh/graph/tests/dataset/output/example-transposed.graph
+++ b/swh/graph/tests/dataset/output/example-transposed.graph
@@ -1 +1 @@
-[):+u6Mjk5ֺ
\ No newline at end of file
+z.hѮIt{
\ No newline at end of file
diff --git a/swh/graph/tests/dataset/output/example-transposed.obl b/swh/graph/tests/dataset/output/example-transposed.obl
index 1291af6..54f0ac8 100644
Binary files a/swh/graph/tests/dataset/output/example-transposed.obl and b/swh/graph/tests/dataset/output/example-transposed.obl differ
diff --git a/swh/graph/tests/dataset/output/example-transposed.offsets b/swh/graph/tests/dataset/output/example-transposed.offsets
index 0b2e742..92c2947 100644
--- a/swh/graph/tests/dataset/output/example-transposed.offsets
+++ b/swh/graph/tests/dataset/output/example-transposed.offsets
@@ -1 +1,2 @@
-(pBERB
\ No newline at end of file
+
+ RqG4PTP(
\ No newline at end of file
diff --git a/swh/graph/tests/dataset/output/example-transposed.properties b/swh/graph/tests/dataset/output/example-transposed.properties
index 1f6c525..512ce9d 100644
--- a/swh/graph/tests/dataset/output/example-transposed.properties
+++ b/swh/graph/tests/dataset/output/example-transposed.properties
@@ -1,35 +1,35 @@
#BVGraph properties
-#Fri Apr 02 13:56:41 UTC 2021
-bitsforreferences=27
+#Sat Dec 04 01:37:28 CET 2021
+bitsforreferences=31
avgbitsforintervals=0.714
graphclass=it.unimi.dsi.big.webgraph.BVGraph
-avgdist=0.381
-successoravggap=6.739
-residualexpstats=6,5,2,1,2,1
+avgdist=0.571
+successoravggap=6.478
+residualexpstats=6,6,2,2,2
arcs=23
minintervallength=4
bitsforoutdegrees=61
-residualavgloggap=2.2888731039272048
+residualavgloggap=2.1534522798004265
avgbitsforoutdegrees=2.905
-bitsforresiduals=83
-successoravgloggap=2.2822834512468524
+bitsforresiduals=85
+successoravgloggap=2.3226776741991215
maxrefcount=3
-successorexpstats=7,6,5,3,1,1
-residualarcs=17
-avgbitsforresiduals=3.952
-avgbitsforblocks=0.286
+successorexpstats=7,6,4,3,3
+residualarcs=18
+avgbitsforresiduals=4.048
+avgbitsforblocks=0.238
windowsize=7
-residualavggap=7.971
-copiedarcs=6
-avgbitsforreferences=1.286
+residualavggap=5.667
+copiedarcs=5
+avgbitsforreferences=1.476
version=0
-compratio=1.515
-bitsperlink=8.348
+compratio=1.554
+bitsperlink=8.565
compressionflags=
nodes=21
-avgref=0.333
+avgref=0.238
zetak=3
bitsforintervals=15
intervalisedarcs=0
-bitspernode=9.143
-bitsforblocks=6
+bitspernode=9.381
+bitsforblocks=5
diff --git a/swh/graph/tests/dataset/output/example.graph b/swh/graph/tests/dataset/output/example.graph
index e13c173..621b9b7 100644
--- a/swh/graph/tests/dataset/output/example.graph
+++ b/swh/graph/tests/dataset/output/example.graph
@@ -1 +1 @@
-}]/I#zWu.ޥ`
\ No newline at end of file
+'t}UOGϹ]ް].dP}R
\ No newline at end of file
diff --git a/swh/graph/tests/dataset/output/example.mph b/swh/graph/tests/dataset/output/example.mph
index 7838165..c6f9e19 100644
Binary files a/swh/graph/tests/dataset/output/example.mph and b/swh/graph/tests/dataset/output/example.mph differ
diff --git a/swh/graph/tests/dataset/output/example.node2swhid.bin b/swh/graph/tests/dataset/output/example.node2swhid.bin
index 63cecba..9cc50b2 100644
Binary files a/swh/graph/tests/dataset/output/example.node2swhid.bin and b/swh/graph/tests/dataset/output/example.node2swhid.bin differ
diff --git a/swh/graph/tests/dataset/output/example.node2type.map b/swh/graph/tests/dataset/output/example.node2type.map
index 0a0a609..6b91c37 100644
Binary files a/swh/graph/tests/dataset/output/example.node2type.map and b/swh/graph/tests/dataset/output/example.node2type.map differ
diff --git a/swh/graph/tests/dataset/output/example.obl b/swh/graph/tests/dataset/output/example.obl
index 456c6ef..1b4fd2e 100644
Binary files a/swh/graph/tests/dataset/output/example.obl and b/swh/graph/tests/dataset/output/example.obl differ
diff --git a/swh/graph/tests/dataset/output/example.offsets b/swh/graph/tests/dataset/output/example.offsets
index f7d2333..407e1a6 100644
--- a/swh/graph/tests/dataset/output/example.offsets
+++ b/swh/graph/tests/dataset/output/example.offsets
@@ -1,2 +1 @@
-(PHPԒ
-P)
\ No newline at end of file
+BU!B
diff --git a/swh/graph/tests/dataset/output/example.order b/swh/graph/tests/dataset/output/example.order
index 5e99fea..2cb5540 100644
Binary files a/swh/graph/tests/dataset/output/example.order and b/swh/graph/tests/dataset/output/example.order differ
diff --git a/swh/graph/tests/dataset/output/example.properties b/swh/graph/tests/dataset/output/example.properties
index 5b48508..cb6975a 100644
--- a/swh/graph/tests/dataset/output/example.properties
+++ b/swh/graph/tests/dataset/output/example.properties
@@ -1,35 +1,35 @@
#BVGraph properties
-#Fri Apr 02 13:56:08 UTC 2021
+#Sat Dec 04 01:37:26 CET 2021
bitsforreferences=14
avgbitsforintervals=0.667
graphclass=it.unimi.dsi.big.webgraph.BVGraph
avgdist=0
-successoravggap=7.652
-residualexpstats=7,5,5,3,2,1
+successoravggap=7.391
+residualexpstats=7,7,3,3,2,1
arcs=23
minintervallength=4
bitsforoutdegrees=51
-residualavgloggap=2.40434236090153
+residualavgloggap=2.32668281341601
avgbitsforoutdegrees=2.429
-bitsforresiduals=108
-successoravgloggap=2.40434236090153
+bitsforresiduals=111
+successoravgloggap=2.32668281341601
maxrefcount=3
-successorexpstats=7,5,5,3,2,1
+successorexpstats=7,7,3,3,2,1
residualarcs=23
-avgbitsforresiduals=5.143
+avgbitsforresiduals=5.286
avgbitsforblocks=0
windowsize=7
-residualavggap=7.652
+residualavggap=7.391
copiedarcs=0
avgbitsforreferences=0.667
version=0
-compratio=1.475
-bitsperlink=8.13
+compratio=1.499
+bitsperlink=8.261
compressionflags=
nodes=21
avgref=0
zetak=3
bitsforintervals=14
intervalisedarcs=0
-bitspernode=8.905
+bitspernode=9.048
bitsforblocks=0
diff --git a/swh/graph/tests/dataset/output/example.stats b/swh/graph/tests/dataset/output/example.stats
index 8b1eb1c..a58d3e2 100644
--- a/swh/graph/tests/dataset/output/example.stats
+++ b/swh/graph/tests/dataset/output/example.stats
@@ -1,20 +1,20 @@
nodes=21
arcs=23
loops=0
-successoravggap=5.765
-avglocality=3.826
+successoravggap=7.765
+avglocality=3.783
minoutdegree=0
maxoutdegree=3
-minoutdegreenode=7
-maxoutdegreenode=4
+minoutdegreenode=1
+maxoutdegreenode=0
dangling=7
terminal=7
percdangling=33.333333333333336
avgoutdegree=1.0952380952380953
-successorlogdeltastats=7,9,3,3,1
-successoravglogdelta=1.020
+successorlogdeltastats=11,7,1,3,1
+successoravglogdelta=0.911
minindegree=0
maxindegree=3
-minindegreenode=19
-maxindegreenode=1
+minindegreenode=17
+maxindegreenode=18
avgindegree=1.0952380952380953
diff --git a/swh/graph/tests/dataset/output/example.swhid2node.bin b/swh/graph/tests/dataset/output/example.swhid2node.bin
deleted file mode 100644
index b8df2fa..0000000
Binary files a/swh/graph/tests/dataset/output/example.swhid2node.bin and /dev/null differ
diff --git a/swh/graph/tests/test_api_client.py b/swh/graph/tests/test_api_client.py
index 90f9a0a..46e0227 100644
--- a/swh/graph/tests/test_api_client.py
+++ b/swh/graph/tests/test_api_client.py
@@ -1,375 +1,379 @@
import pytest
from pytest import raises
from swh.core.api import RemoteException
from swh.graph.client import GraphArgumentException
def test_stats(graph_client):
stats = graph_client.stats()
assert set(stats.keys()) == {"counts", "ratios", "indegree", "outdegree"}
assert set(stats["counts"].keys()) == {"nodes", "edges"}
assert set(stats["ratios"].keys()) == {
"compression",
"bits_per_node",
"bits_per_edge",
"avg_locality",
}
assert set(stats["indegree"].keys()) == {"min", "max", "avg"}
assert set(stats["outdegree"].keys()) == {"min", "max", "avg"}
assert stats["counts"]["nodes"] == 21
assert stats["counts"]["edges"] == 23
assert isinstance(stats["ratios"]["compression"], float)
assert isinstance(stats["ratios"]["bits_per_node"], float)
assert isinstance(stats["ratios"]["bits_per_edge"], float)
assert isinstance(stats["ratios"]["avg_locality"], float)
assert stats["indegree"]["min"] == 0
assert stats["indegree"]["max"] == 3
assert isinstance(stats["indegree"]["avg"], float)
assert stats["outdegree"]["min"] == 0
assert stats["outdegree"]["max"] == 3
assert isinstance(stats["outdegree"]["avg"], float)
def test_leaves(graph_client):
actual = list(
graph_client.leaves("swh:1:ori:0000000000000000000000000000000000000021")
)
expected = [
"swh:1:cnt:0000000000000000000000000000000000000001",
"swh:1:cnt:0000000000000000000000000000000000000004",
"swh:1:cnt:0000000000000000000000000000000000000005",
"swh:1:cnt:0000000000000000000000000000000000000007",
]
assert set(actual) == set(expected)
def test_neighbors(graph_client):
actual = list(
graph_client.neighbors(
"swh:1:rev:0000000000000000000000000000000000000009", direction="backward"
)
)
expected = [
"swh:1:snp:0000000000000000000000000000000000000020",
"swh:1:rel:0000000000000000000000000000000000000010",
"swh:1:rev:0000000000000000000000000000000000000013",
]
assert set(actual) == set(expected)
def test_visit_nodes(graph_client):
actual = list(
graph_client.visit_nodes(
"swh:1:rel:0000000000000000000000000000000000000010",
edges="rel:rev,rev:rev",
)
)
expected = [
"swh:1:rel:0000000000000000000000000000000000000010",
"swh:1:rev:0000000000000000000000000000000000000009",
"swh:1:rev:0000000000000000000000000000000000000003",
]
assert set(actual) == set(expected)
def test_visit_nodes_filtered(graph_client):
actual = list(
graph_client.visit_nodes(
"swh:1:rel:0000000000000000000000000000000000000010", return_types="dir",
)
)
expected = [
"swh:1:dir:0000000000000000000000000000000000000002",
"swh:1:dir:0000000000000000000000000000000000000008",
"swh:1:dir:0000000000000000000000000000000000000006",
]
assert set(actual) == set(expected)
def test_visit_nodes_filtered_star(graph_client):
actual = list(
graph_client.visit_nodes(
"swh:1:rel:0000000000000000000000000000000000000010", return_types="*",
)
)
expected = [
"swh:1:rel:0000000000000000000000000000000000000010",
"swh:1:rev:0000000000000000000000000000000000000009",
"swh:1:rev:0000000000000000000000000000000000000003",
"swh:1:dir:0000000000000000000000000000000000000002",
"swh:1:cnt:0000000000000000000000000000000000000001",
"swh:1:dir:0000000000000000000000000000000000000008",
"swh:1:cnt:0000000000000000000000000000000000000007",
"swh:1:dir:0000000000000000000000000000000000000006",
"swh:1:cnt:0000000000000000000000000000000000000004",
"swh:1:cnt:0000000000000000000000000000000000000005",
]
assert set(actual) == set(expected)
def test_visit_edges(graph_client):
actual = list(
graph_client.visit_edges(
"swh:1:rel:0000000000000000000000000000000000000010",
edges="rel:rev,rev:rev,rev:dir",
)
)
expected = [
(
"swh:1:rel:0000000000000000000000000000000000000010",
"swh:1:rev:0000000000000000000000000000000000000009",
),
(
"swh:1:rev:0000000000000000000000000000000000000009",
"swh:1:rev:0000000000000000000000000000000000000003",
),
(
"swh:1:rev:0000000000000000000000000000000000000009",
"swh:1:dir:0000000000000000000000000000000000000008",
),
(
"swh:1:rev:0000000000000000000000000000000000000003",
"swh:1:dir:0000000000000000000000000000000000000002",
),
]
assert set(actual) == set(expected)
def test_visit_edges_limited(graph_client):
actual = list(
graph_client.visit_edges(
"swh:1:rel:0000000000000000000000000000000000000010",
max_edges=4,
edges="rel:rev,rev:rev,rev:dir",
)
)
expected = [
(
"swh:1:rel:0000000000000000000000000000000000000010",
"swh:1:rev:0000000000000000000000000000000000000009",
),
(
"swh:1:rev:0000000000000000000000000000000000000009",
"swh:1:rev:0000000000000000000000000000000000000003",
),
(
"swh:1:rev:0000000000000000000000000000000000000009",
"swh:1:dir:0000000000000000000000000000000000000008",
),
(
"swh:1:rev:0000000000000000000000000000000000000003",
"swh:1:dir:0000000000000000000000000000000000000002",
),
]
# As there are four valid answers (up to reordering), we cannot check for
# equality. Instead, we check the client returned all edges but one.
assert set(actual).issubset(set(expected))
assert len(actual) == 3
def test_visit_edges_diamond_pattern(graph_client):
actual = list(
graph_client.visit_edges(
"swh:1:rev:0000000000000000000000000000000000000009", edges="*",
)
)
expected = [
(
"swh:1:rev:0000000000000000000000000000000000000009",
"swh:1:rev:0000000000000000000000000000000000000003",
),
(
"swh:1:rev:0000000000000000000000000000000000000009",
"swh:1:dir:0000000000000000000000000000000000000008",
),
(
"swh:1:rev:0000000000000000000000000000000000000003",
"swh:1:dir:0000000000000000000000000000000000000002",
),
(
"swh:1:dir:0000000000000000000000000000000000000002",
"swh:1:cnt:0000000000000000000000000000000000000001",
),
(
"swh:1:dir:0000000000000000000000000000000000000008",
"swh:1:cnt:0000000000000000000000000000000000000001",
),
(
"swh:1:dir:0000000000000000000000000000000000000008",
"swh:1:cnt:0000000000000000000000000000000000000007",
),
(
"swh:1:dir:0000000000000000000000000000000000000008",
"swh:1:dir:0000000000000000000000000000000000000006",
),
(
"swh:1:dir:0000000000000000000000000000000000000006",
"swh:1:cnt:0000000000000000000000000000000000000004",
),
(
"swh:1:dir:0000000000000000000000000000000000000006",
"swh:1:cnt:0000000000000000000000000000000000000005",
),
]
assert set(actual) == set(expected)
-def test_visit_paths(graph_client):
- actual = list(
- graph_client.visit_paths(
- "swh:1:snp:0000000000000000000000000000000000000020", edges="snp:*,rev:*"
- )
- )
- actual = [tuple(path) for path in actual]
- expected = [
- (
- "swh:1:snp:0000000000000000000000000000000000000020",
- "swh:1:rev:0000000000000000000000000000000000000009",
- "swh:1:rev:0000000000000000000000000000000000000003",
- "swh:1:dir:0000000000000000000000000000000000000002",
- ),
- (
- "swh:1:snp:0000000000000000000000000000000000000020",
- "swh:1:rev:0000000000000000000000000000000000000009",
- "swh:1:dir:0000000000000000000000000000000000000008",
- ),
- (
- "swh:1:snp:0000000000000000000000000000000000000020",
- "swh:1:rel:0000000000000000000000000000000000000010",
- ),
- ]
- assert set(actual) == set(expected)
-
-
@pytest.mark.skip(reason="currently disabled due to T1969")
def test_walk(graph_client):
args = ("swh:1:dir:0000000000000000000000000000000000000016", "rel")
kwargs = {
"edges": "dir:dir,dir:rev,rev:*",
"direction": "backward",
"traversal": "bfs",
}
actual = list(graph_client.walk(*args, **kwargs))
expected = [
"swh:1:dir:0000000000000000000000000000000000000016",
"swh:1:dir:0000000000000000000000000000000000000017",
"swh:1:rev:0000000000000000000000000000000000000018",
"swh:1:rel:0000000000000000000000000000000000000019",
]
assert set(actual) == set(expected)
kwargs2 = kwargs.copy()
kwargs2["limit"] = -1
actual = list(graph_client.walk(*args, **kwargs2))
expected = ["swh:1:rel:0000000000000000000000000000000000000019"]
assert set(actual) == set(expected)
kwargs2 = kwargs.copy()
kwargs2["limit"] = 2
actual = list(graph_client.walk(*args, **kwargs2))
expected = [
"swh:1:dir:0000000000000000000000000000000000000016",
"swh:1:dir:0000000000000000000000000000000000000017",
]
assert set(actual) == set(expected)
-def test_random_walk(graph_client):
+def test_random_walk_dst_is_type(graph_client):
"""as the walk is random, we test a visit from a cnt node to the only
origin in the dataset, and only check the final node of the path
(i.e., the origin)
"""
args = ("swh:1:cnt:0000000000000000000000000000000000000001", "ori")
kwargs = {"direction": "backward"}
expected_root = "swh:1:ori:0000000000000000000000000000000000000021"
actual = list(graph_client.random_walk(*args, **kwargs))
assert len(actual) > 1 # no origin directly links to a content
assert actual[0] == args[0]
assert actual[-1] == expected_root
kwargs2 = kwargs.copy()
kwargs2["limit"] = -1
actual = list(graph_client.random_walk(*args, **kwargs2))
assert actual == [expected_root]
kwargs2["limit"] = -2
actual = list(graph_client.random_walk(*args, **kwargs2))
assert len(actual) == 2
assert actual[-1] == expected_root
kwargs2["limit"] = 3
actual = list(graph_client.random_walk(*args, **kwargs2))
assert len(actual) == 3
+def test_random_walk_dst_is_node(graph_client):
+ """Same as test_random_walk_dst_is_type, but we target the specific origin
+ node instead of a type
+ """
+ args = (
+ "swh:1:cnt:0000000000000000000000000000000000000001",
+ "swh:1:ori:0000000000000000000000000000000000000021",
+ )
+ kwargs = {"direction": "backward"}
+ expected_root = "swh:1:ori:0000000000000000000000000000000000000021"
+
+ actual = list(graph_client.random_walk(*args, **kwargs))
+ assert len(actual) > 1 # no origin directly links to a content
+ assert actual[0] == args[0]
+ assert actual[-1] == expected_root
+
+ kwargs2 = kwargs.copy()
+ kwargs2["limit"] = -1
+ actual = list(graph_client.random_walk(*args, **kwargs2))
+ assert actual == [expected_root]
+
+ kwargs2["limit"] = -2
+ actual = list(graph_client.random_walk(*args, **kwargs2))
+ assert len(actual) == 2
+ assert actual[-1] == expected_root
+
+ kwargs2["limit"] = 3
+ actual = list(graph_client.random_walk(*args, **kwargs2))
+ assert len(actual) == 3
+
+
def test_count(graph_client):
actual = graph_client.count_leaves(
"swh:1:ori:0000000000000000000000000000000000000021"
)
assert actual == 4
actual = graph_client.count_visit_nodes(
"swh:1:rel:0000000000000000000000000000000000000010", edges="rel:rev,rev:rev"
)
assert actual == 3
actual = graph_client.count_neighbors(
"swh:1:rev:0000000000000000000000000000000000000009", direction="backward"
)
assert actual == 3
def test_param_validation(graph_client):
with raises(GraphArgumentException) as exc_info: # SWHID not found
list(graph_client.leaves("swh:1:ori:fff0000000000000000000000000000000000021"))
if exc_info.value.response:
assert exc_info.value.response.status_code == 404
with raises(GraphArgumentException) as exc_info: # malformed SWHID
list(
graph_client.neighbors("swh:1:ori:fff000000zzzzzz0000000000000000000000021")
)
if exc_info.value.response:
assert exc_info.value.response.status_code == 400
with raises(GraphArgumentException) as exc_info: # malformed edge specificaiton
list(
graph_client.visit_nodes(
"swh:1:dir:0000000000000000000000000000000000000016",
edges="dir:notanodetype,dir:rev,rev:*",
direction="backward",
)
)
if exc_info.value.response:
assert exc_info.value.response.status_code == 400
with raises(GraphArgumentException) as exc_info: # malformed direction
list(
graph_client.visit_nodes(
"swh:1:dir:0000000000000000000000000000000000000016",
edges="dir:dir,dir:rev,rev:*",
direction="notadirection",
)
)
if exc_info.value.response:
assert exc_info.value.response.status_code == 400
@pytest.mark.skip(reason="currently disabled due to T1969")
def test_param_validation_walk(graph_client):
"""test validation of walk-specific parameters only"""
with raises(RemoteException) as exc_info: # malformed traversal order
list(
graph_client.walk(
"swh:1:dir:0000000000000000000000000000000000000016",
"rel",
edges="dir:dir,dir:rev,rev:*",
direction="backward",
traversal="notatraversalorder",
)
)
assert exc_info.value.response.status_code == 400
diff --git a/swh/graph/tests/test_graph.py b/swh/graph/tests/test_graph.py
deleted file mode 100644
index c752580..0000000
--- a/swh/graph/tests/test_graph.py
+++ /dev/null
@@ -1,166 +0,0 @@
-import pytest
-
-
-def test_graph(graph):
- assert len(graph) == 21
-
- obj = "swh:1:dir:0000000000000000000000000000000000000008"
- node = graph[obj]
-
- assert str(node) == obj
- assert len(node.children()) == 3
- assert len(node.parents()) == 2
-
- actual = {p.swhid for p in node.children()}
- expected = {
- "swh:1:cnt:0000000000000000000000000000000000000001",
- "swh:1:dir:0000000000000000000000000000000000000006",
- "swh:1:cnt:0000000000000000000000000000000000000007",
- }
- assert expected == actual
-
- actual = {p.swhid for p in node.parents()}
- expected = {
- "swh:1:rev:0000000000000000000000000000000000000009",
- "swh:1:dir:0000000000000000000000000000000000000012",
- }
- assert expected == actual
-
-
-def test_invalid_swhid(graph):
- with pytest.raises(IndexError):
- graph[1337]
-
- with pytest.raises(IndexError):
- graph[len(graph) + 1]
-
- with pytest.raises(KeyError):
- graph["swh:1:dir:0000000000000000000000000000000420000012"]
-
-
-def test_leaves(graph):
- actual = list(graph["swh:1:ori:0000000000000000000000000000000000000021"].leaves())
- actual = [p.swhid for p in actual]
- expected = [
- "swh:1:cnt:0000000000000000000000000000000000000001",
- "swh:1:cnt:0000000000000000000000000000000000000004",
- "swh:1:cnt:0000000000000000000000000000000000000005",
- "swh:1:cnt:0000000000000000000000000000000000000007",
- ]
- assert set(actual) == set(expected)
-
-
-def test_visit_nodes(graph):
- actual = list(
- graph["swh:1:rel:0000000000000000000000000000000000000010"].visit_nodes(
- edges="rel:rev,rev:rev"
- )
- )
- actual = [p.swhid for p in actual]
- expected = [
- "swh:1:rel:0000000000000000000000000000000000000010",
- "swh:1:rev:0000000000000000000000000000000000000009",
- "swh:1:rev:0000000000000000000000000000000000000003",
- ]
- assert set(actual) == set(expected)
-
-
-def test_visit_edges(graph):
- actual = list(
- graph["swh:1:rel:0000000000000000000000000000000000000010"].visit_edges(
- edges="rel:rev,rev:rev,rev:dir"
- )
- )
- actual = [(src.swhid, dst.swhid) for src, dst in actual]
- expected = [
- (
- "swh:1:rel:0000000000000000000000000000000000000010",
- "swh:1:rev:0000000000000000000000000000000000000009",
- ),
- (
- "swh:1:rev:0000000000000000000000000000000000000009",
- "swh:1:rev:0000000000000000000000000000000000000003",
- ),
- (
- "swh:1:rev:0000000000000000000000000000000000000009",
- "swh:1:dir:0000000000000000000000000000000000000008",
- ),
- (
- "swh:1:rev:0000000000000000000000000000000000000003",
- "swh:1:dir:0000000000000000000000000000000000000002",
- ),
- ]
- assert set(actual) == set(expected)
-
-
-def test_visit_paths(graph):
- actual = list(
- graph["swh:1:snp:0000000000000000000000000000000000000020"].visit_paths(
- edges="snp:*,rev:*"
- )
- )
- actual = [tuple(n.swhid for n in path) for path in actual]
- expected = [
- (
- "swh:1:snp:0000000000000000000000000000000000000020",
- "swh:1:rev:0000000000000000000000000000000000000009",
- "swh:1:rev:0000000000000000000000000000000000000003",
- "swh:1:dir:0000000000000000000000000000000000000002",
- ),
- (
- "swh:1:snp:0000000000000000000000000000000000000020",
- "swh:1:rev:0000000000000000000000000000000000000009",
- "swh:1:dir:0000000000000000000000000000000000000008",
- ),
- (
- "swh:1:snp:0000000000000000000000000000000000000020",
- "swh:1:rel:0000000000000000000000000000000000000010",
- ),
- ]
- assert set(actual) == set(expected)
-
-
-def test_walk(graph):
- actual = list(
- graph["swh:1:dir:0000000000000000000000000000000000000016"].walk(
- "rel", edges="dir:dir,dir:rev,rev:*", direction="backward", traversal="bfs"
- )
- )
- actual = [p.swhid for p in actual]
- expected = [
- "swh:1:dir:0000000000000000000000000000000000000016",
- "swh:1:dir:0000000000000000000000000000000000000017",
- "swh:1:rev:0000000000000000000000000000000000000018",
- "swh:1:rel:0000000000000000000000000000000000000019",
- ]
- assert set(actual) == set(expected)
-
-
-def test_count(graph):
- assert (
- graph["swh:1:ori:0000000000000000000000000000000000000021"].count_leaves() == 4
- )
- assert (
- graph["swh:1:rel:0000000000000000000000000000000000000010"].count_visit_nodes(
- edges="rel:rev,rev:rev"
- )
- == 3
- )
- assert (
- graph["swh:1:rev:0000000000000000000000000000000000000009"].count_neighbors(
- direction="backward"
- )
- == 3
- )
-
-
-def test_iter_type(graph):
- rev_list = list(graph.iter_type("rev"))
- actual = [n.swhid for n in rev_list]
- expected = [
- "swh:1:rev:0000000000000000000000000000000000000003",
- "swh:1:rev:0000000000000000000000000000000000000009",
- "swh:1:rev:0000000000000000000000000000000000000013",
- "swh:1:rev:0000000000000000000000000000000000000018",
- ]
- assert expected == actual
diff --git a/swh/graph/tests/test_swhid.py b/swh/graph/tests/test_swhid.py
index 722e6b1..6053215 100644
--- a/swh/graph/tests/test_swhid.py
+++ b/swh/graph/tests/test_swhid.py
@@ -1,196 +1,196 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from itertools import islice
import os
import shutil
import tempfile
import unittest
from swh.graph.swhid import NodeToSwhidMap, SwhidToNodeMap, bytes_to_str, str_to_bytes
-from swh.model.identifiers import SWHID_TYPES
+from swh.model.swhids import SWHID_TYPES
class TestSwhidSerialization(unittest.TestCase):
pairs = [
(
"swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",
bytes.fromhex("01" + "00" + "94a9ed024d3859793618152ea559a168bbcbb5e2"),
),
(
"swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505",
bytes.fromhex("01" + "01" + "d198bc9d7a6bcf6db04f476d29314f157507d505"),
),
(
"swh:1:ori:b63a575fe3faab7692c9f38fb09d4bb45651bb0f",
bytes.fromhex("01" + "02" + "b63a575fe3faab7692c9f38fb09d4bb45651bb0f"),
),
(
"swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f",
bytes.fromhex("01" + "03" + "22ece559cc7cc2364edc5e5593d63ae8bd229f9f"),
),
(
"swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d",
bytes.fromhex("01" + "04" + "309cf2674ee7a0749978cf8265ab91a60aea0f7d"),
),
(
"swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453",
bytes.fromhex("01" + "05" + "c7c108084bc0bf3d81436bf980b46e98bd338453"),
),
]
def test_str_to_bytes(self):
for (swhid_str, swhid_bytes) in self.pairs:
self.assertEqual(str_to_bytes(swhid_str), swhid_bytes)
def test_bytes_to_str(self):
for (swhid_str, swhid_bytes) in self.pairs:
self.assertEqual(bytes_to_str(swhid_bytes), swhid_str)
def test_round_trip(self):
for (swhid_str, swhid_bytes) in self.pairs:
self.assertEqual(swhid_str, bytes_to_str(str_to_bytes(swhid_str)))
self.assertEqual(swhid_bytes, str_to_bytes(bytes_to_str(swhid_bytes)))
def gen_records(types=["cnt", "dir", "ori", "rel", "rev", "snp"], length=10000):
"""generate sequential SWHID/int records, suitable for filling int<->swhid maps for
testing swh-graph on-disk binary databases
Args:
types (list): list of SWHID types to be generated, specified as the
corresponding 3-letter component in SWHIDs
length (int): number of SWHIDs to generate *per type*
Yields:
pairs (swhid, int) where swhid is a textual SWHID and int its sequential
integer identifier
"""
pos = 0
for t in sorted(types):
for i in range(0, length):
seq = format(pos, "x") # current position as hex string
swhid = "swh:1:{}:{}{}".format(t, "0" * (40 - len(seq)), seq)
yield (swhid, pos)
pos += 1
# pairs SWHID/position in the sequence generated by :func:`gen_records` above
MAP_PAIRS = [
("swh:1:cnt:0000000000000000000000000000000000000000", 0),
("swh:1:cnt:000000000000000000000000000000000000002a", 42),
("swh:1:dir:0000000000000000000000000000000000002afc", 11004),
("swh:1:ori:00000000000000000000000000000000000056ce", 22222),
("swh:1:rel:0000000000000000000000000000000000008235", 33333),
("swh:1:rev:000000000000000000000000000000000000ad9c", 44444),
("swh:1:snp:000000000000000000000000000000000000ea5f", 59999),
]
class TestSwhidToNodeMap(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""create reasonably sized (~2 MB) SWHID->int map to test on-disk DB"""
cls.tmpdir = tempfile.mkdtemp(prefix="swh.graph.test.")
cls.fname = os.path.join(cls.tmpdir, "swhid2int.bin")
with open(cls.fname, "wb") as f:
for (swhid, i) in gen_records(length=10000):
SwhidToNodeMap.write_record(f, swhid, i)
@classmethod
def tearDownClass(cls):
shutil.rmtree(cls.tmpdir)
def setUp(self):
self.map = SwhidToNodeMap(self.fname)
def tearDown(self):
self.map.close()
def test_lookup(self):
for (swhid, pos) in MAP_PAIRS:
self.assertEqual(self.map[swhid], pos)
def test_missing(self):
with self.assertRaises(KeyError):
self.map["swh:1:ori:0101010100000000000000000000000000000000"],
with self.assertRaises(KeyError):
self.map["swh:1:cnt:0101010100000000000000000000000000000000"],
def test_type_error(self):
with self.assertRaises(TypeError):
self.map[42]
with self.assertRaises(TypeError):
self.map[1.2]
def test_update(self):
fname2 = self.fname + ".update"
shutil.copy(self.fname, fname2) # fresh map copy
map2 = SwhidToNodeMap(fname2, mode="rb+")
for (swhid, int) in islice(map2, 11): # update the first N items
new_int = int + 42
map2[swhid] = new_int
self.assertEqual(map2[swhid], new_int) # check updated value
os.unlink(fname2) # tmpdir will be cleaned even if we don't reach this
def test_iter_type(self):
for t in SWHID_TYPES + ["ori"]:
first_20 = list(islice(self.map.iter_type(t), 20))
k = first_20[0][1]
expected = [("swh:1:{}:{:040x}".format(t, i), i) for i in range(k, k + 20)]
assert first_20 == expected
def test_iter_prefix(self):
for t in SWHID_TYPES + ["ori"]:
prefix = self.map.iter_prefix("swh:1:{}:00".format(t))
first_20 = list(islice(prefix, 20))
k = first_20[0][1]
expected = [("swh:1:{}:{:040x}".format(t, i), i) for i in range(k, k + 20)]
assert first_20 == expected
class TestNodeToSwhidMap(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""create reasonably sized (~1 MB) int->SWHID map to test on-disk DB"""
cls.tmpdir = tempfile.mkdtemp(prefix="swh.graph.test.")
cls.fname = os.path.join(cls.tmpdir, "int2swhid.bin")
with open(cls.fname, "wb") as f:
for (swhid, _i) in gen_records(length=10000):
NodeToSwhidMap.write_record(f, swhid)
@classmethod
def tearDownClass(cls):
shutil.rmtree(cls.tmpdir)
def setUp(self):
self.map = NodeToSwhidMap(self.fname)
def tearDown(self):
self.map.close()
def test_lookup(self):
for (swhid, pos) in MAP_PAIRS:
self.assertEqual(self.map[pos], swhid)
def test_out_of_bounds(self):
with self.assertRaises(IndexError):
self.map[1000000]
with self.assertRaises(IndexError):
self.map[-1000000]
def test_update(self):
fname2 = self.fname + ".update"
shutil.copy(self.fname, fname2) # fresh map copy
map2 = NodeToSwhidMap(fname2, mode="rb+")
for (int, swhid) in islice(map2, 11): # update the first N items
new_swhid = swhid.replace(":0", ":f") # mangle first hex digit
map2[int] = new_swhid
self.assertEqual(map2[int], new_swhid) # check updated value
os.unlink(fname2) # tmpdir will be cleaned even if we don't reach this
diff --git a/swh/graph/webgraph.py b/swh/graph/webgraph.py
index 87c5341..24bb4b5 100644
--- a/swh/graph/webgraph.py
+++ b/swh/graph/webgraph.py
@@ -1,229 +1,280 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""WebGraph driver
"""
from datetime import datetime
from enum import Enum
import logging
import os
from pathlib import Path
import subprocess
from typing import Dict, List, Set
from swh.graph.config import check_config_compress
class CompressionStep(Enum):
MPH = 1
BV = 2
- BV_OBL = 3
- BFS = 4
- PERMUTE = 5
- PERMUTE_OBL = 6
- STATS = 7
- TRANSPOSE = 8
- TRANSPOSE_OBL = 9
- MAPS = 10
- CLEAN_TMP = 11
+ BFS = 3
+ PERMUTE_BFS = 4
+ TRANSPOSE_BFS = 5
+ SIMPLIFY = 6
+ LLP = 7
+ PERMUTE_LLP = 8
+ OBL = 9
+ COMPOSE_ORDERS = 10
+ STATS = 11
+ TRANSPOSE = 12
+ TRANSPOSE_OBL = 13
+ MAPS = 14
+ CLEAN_TMP = 15
def __str__(self):
return self.name
# full compression pipeline
COMP_SEQ = list(CompressionStep)
# Mapping from compression steps to shell commands implementing them. Commands
# will be executed by the shell, so be careful with meta characters. They are
# specified here as lists of tokens that will be joined together only for ease
# of line splitting. In commands, {tokens} will be interpolated with
# configuration values, see :func:`compress`.
STEP_ARGV: Dict[CompressionStep, List[str]] = {
CompressionStep.MPH: [
"{java}",
"it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction",
"--byte-array",
"--temp-dir",
"{tmp_dir}",
"{out_dir}/{graph_name}.mph",
"<( zstdcat {in_dir}/{graph_name}.nodes.csv.zst )",
],
# use process substitution (and hence FIFO) above as MPH class load the
# entire file in memory when reading from stdin
CompressionStep.BV: [
"zstdcat",
"{in_dir}/{graph_name}.edges.csv.zst",
"|",
"cut -d' ' -f1,2",
"|",
"{java}",
"it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph",
"--byte-array",
"--temp-dir",
"{tmp_dir}",
"--function",
"{out_dir}/{graph_name}.mph",
- "{out_dir}/{graph_name}-bv",
- ],
- CompressionStep.BV_OBL: [
- "{java}",
- "it.unimi.dsi.big.webgraph.BVGraph",
- "--list",
- "{out_dir}/{graph_name}-bv",
+ "{out_dir}/{graph_name}-base",
],
CompressionStep.BFS: [
"{java}",
"it.unimi.dsi.law.big.graph.BFS",
- "{out_dir}/{graph_name}-bv",
- "{out_dir}/{graph_name}.order",
+ "{out_dir}/{graph_name}-base",
+ "{out_dir}/{graph_name}-bfs.order",
+ ],
+ CompressionStep.PERMUTE_BFS: [
+ "{java}",
+ "it.unimi.dsi.big.webgraph.Transform",
+ "mapOffline",
+ "{out_dir}/{graph_name}-base",
+ "{out_dir}/{graph_name}-bfs",
+ "{out_dir}/{graph_name}-bfs.order",
+ "{batch_size}",
+ "{tmp_dir}",
+ ],
+ CompressionStep.TRANSPOSE_BFS: [
+ "{java}",
+ "it.unimi.dsi.big.webgraph.Transform",
+ "transposeOffline",
+ "{out_dir}/{graph_name}-bfs",
+ "{out_dir}/{graph_name}-bfs-transposed",
+ "{batch_size}",
+ "{tmp_dir}",
],
- CompressionStep.PERMUTE: [
+ CompressionStep.SIMPLIFY: [
+ "{java}",
+ "it.unimi.dsi.big.webgraph.Transform",
+ "simplify",
+ "{out_dir}/{graph_name}-bfs",
+ "{out_dir}/{graph_name}-bfs-transposed",
+ "{out_dir}/{graph_name}-bfs-simplified",
+ ],
+ CompressionStep.LLP: [
+ "{java}",
+ "it.unimi.dsi.law.big.graph.LayeredLabelPropagation",
+ "-g",
+ "{llp_gammas}",
+ "{out_dir}/{graph_name}-bfs-simplified",
+ "{out_dir}/{graph_name}-llp.order",
+ ],
+ CompressionStep.PERMUTE_LLP: [
"{java}",
"it.unimi.dsi.big.webgraph.Transform",
"mapOffline",
- "{out_dir}/{graph_name}-bv",
+ "{out_dir}/{graph_name}-bfs",
"{out_dir}/{graph_name}",
- "{out_dir}/{graph_name}.order",
+ "{out_dir}/{graph_name}-llp.order",
"{batch_size}",
"{tmp_dir}",
],
- CompressionStep.PERMUTE_OBL: [
+ CompressionStep.OBL: [
"{java}",
"it.unimi.dsi.big.webgraph.BVGraph",
"--list",
"{out_dir}/{graph_name}",
],
+ CompressionStep.COMPOSE_ORDERS: [
+ "{java}",
+ "org.softwareheritage.graph.utils.ComposePermutations",
+ "{out_dir}/{graph_name}-bfs.order",
+ "{out_dir}/{graph_name}-llp.order",
+ "{out_dir}/{graph_name}.order",
+ ],
CompressionStep.STATS: [
"{java}",
"it.unimi.dsi.big.webgraph.Stats",
"{out_dir}/{graph_name}",
],
CompressionStep.TRANSPOSE: [
"{java}",
"it.unimi.dsi.big.webgraph.Transform",
"transposeOffline",
"{out_dir}/{graph_name}",
"{out_dir}/{graph_name}-transposed",
"{batch_size}",
"{tmp_dir}",
],
CompressionStep.TRANSPOSE_OBL: [
"{java}",
"it.unimi.dsi.big.webgraph.BVGraph",
"--list",
"{out_dir}/{graph_name}-transposed",
],
CompressionStep.MAPS: [
"zstdcat",
"{in_dir}/{graph_name}.nodes.csv.zst",
"|",
"{java}",
"org.softwareheritage.graph.maps.NodeMapBuilder",
"{out_dir}/{graph_name}",
"{tmp_dir}",
],
CompressionStep.CLEAN_TMP: [
"rm",
"-rf",
- "{out_dir}/{graph_name}-bv.graph",
- "{out_dir}/{graph_name}-bv.obl",
- "{out_dir}/{graph_name}-bv.offsets",
+ "{out_dir}/{graph_name}-base.graph",
+ "{out_dir}/{graph_name}-base.offsets",
+ "{out_dir}/{graph_name}-base.properties",
+ "{out_dir}/{graph_name}-bfs-simplified.graph",
+ "{out_dir}/{graph_name}-bfs-simplified.offsets",
+ "{out_dir}/{graph_name}-bfs-simplified.properties",
+ "{out_dir}/{graph_name}-bfs-transposed.graph",
+ "{out_dir}/{graph_name}-bfs-transposed.offsets",
+ "{out_dir}/{graph_name}-bfs-transposed.properties",
+ "{out_dir}/{graph_name}-bfs.graph",
+ "{out_dir}/{graph_name}-bfs.offsets",
+ "{out_dir}/{graph_name}-bfs.order",
+ "{out_dir}/{graph_name}-bfs.properties",
+ "{out_dir}/{graph_name}-llp.order",
"{tmp_dir}",
],
}
def do_step(step, conf):
cmd = " ".join(STEP_ARGV[step]).format(**conf)
cmd_env = os.environ.copy()
cmd_env["JAVA_TOOL_OPTIONS"] = conf["java_tool_options"]
cmd_env["CLASSPATH"] = conf["classpath"]
logging.info(f"running: {cmd}")
process = subprocess.Popen(
["/bin/bash", "-c", cmd],
env=cmd_env,
encoding="utf8",
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
with process.stdout as stdout:
for line in stdout:
logging.info(line.rstrip())
rc = process.wait()
if rc != 0:
raise RuntimeError(
f"compression step {step} returned non-zero " f"exit code {rc}"
)
else:
return rc
def compress(
graph_name: str,
in_dir: Path,
out_dir: Path,
steps: Set[CompressionStep] = set(COMP_SEQ),
conf: Dict[str, str] = {},
):
"""graph compression pipeline driver from nodes/edges files to compressed
on-disk representation
Args:
graph_name: graph base name, relative to in_dir
in_dir: input directory, where the uncompressed graph can be found
out_dir: output directory, where the compressed graph will be stored
steps: compression steps to run (default: all steps)
conf: compression configuration, supporting the following keys (all are
optional, so an empty configuration is fine and is the default)
- batch_size: batch size for `WebGraph transformations
`_;
defaults to 1 billion
- classpath: java classpath, defaults to swh-graph JAR only
- java: command to run java VM, defaults to "java"
- java_tool_options: value for JAVA_TOOL_OPTIONS environment
variable; defaults to various settings for high memory machines
- logback: path to a logback.xml configuration file; if not provided
a temporary one will be created and used
- max_ram: maximum RAM to use for compression; defaults to available
virtual memory
- tmp_dir: temporary directory, defaults to the "tmp" subdir of
out_dir
"""
if not steps:
steps = set(COMP_SEQ)
conf = check_config_compress(conf, graph_name, in_dir, out_dir)
compression_start_time = datetime.now()
logging.info(f"starting compression at {compression_start_time}")
seq_no = 0
for step in COMP_SEQ:
if step not in steps:
logging.debug(f"skipping compression step {step}")
continue
seq_no += 1
step_start_time = datetime.now()
logging.info(
f"starting compression step {step} "
f"({seq_no}/{len(steps)}) at {step_start_time}"
)
do_step(step, conf)
step_end_time = datetime.now()
step_duration = step_end_time - step_start_time
logging.info(
f"completed compression step {step} "
f"({seq_no}/{len(steps)}) "
f"at {step_end_time} in {step_duration}"
)
compression_end_time = datetime.now()
compression_duration = compression_end_time - compression_start_time
logging.info(f"completed compression in {compression_duration}")
diff --git a/tox.ini b/tox.ini
index b5f2819..959bbb8 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,76 +1,76 @@
[tox]
envlist=black,flake8,mypy,py3
[testenv]
extras =
testing
deps =
pytest-cov
whitelist_externals =
mvn
sh
commands =
sh -c 'if ! [ -d {envdir}/share/swh-graph ]; then mvn -f java/pom.xml compile assembly:single; mkdir {envdir}/share/swh-graph; cp java/target/*.jar {envdir}/share/swh-graph; fi'
pytest --cov={envsitepackagesdir}/swh/graph \
{envsitepackagesdir}/swh/graph \
--cov-branch {posargs}
[testenv:black]
skip_install = true
deps =
black==19.10b0
commands =
{envpython} -m black --check swh
[testenv:flake8]
skip_install = true
deps =
flake8
commands =
{envpython} -m flake8
[testenv:mypy]
extras =
testing
deps =
- mypy
+ mypy==0.920
commands =
mypy swh
# build documentation outside swh-environment using the current
# git HEAD of swh-docs, is executed on CI for each diff to prevent
# breaking doc build
[testenv:sphinx]
whitelist_externals = make
usedevelop = true
extras =
testing
deps =
# fetch and install swh-docs in develop mode
-e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs
setenv =
SWH_PACKAGE_DOC_TOX_BUILD = 1
# turn warnings into errors
SPHINXOPTS = -W
commands =
make -I ../.tox/sphinx/src/swh-docs/swh/ -C docs
# build documentation only inside swh-environment using local state
# of swh-docs package
[testenv:sphinx-dev]
whitelist_externals = make
usedevelop = true
extras =
testing
deps =
# install swh-docs in develop mode
-e ../swh-docs
setenv =
SWH_PACKAGE_DOC_TOX_BUILD = 1
# turn warnings into errors
SPHINXOPTS = -W
commands =
make -I ../.tox/sphinx-dev/src/swh-docs/swh/ -C docs