diff --git a/PKG-INFO b/PKG-INFO index 18ff922..5268263 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,56 +1,56 @@ Metadata-Version: 2.1 Name: swh.graph -Version: 0.5.0 +Version: 0.5.1 Summary: Software Heritage graph service Home-page: https://forge.softwareheritage.org/diffusion/DGRPH Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-graph Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-graph/ Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 3 - Alpha Requires-Python: >=3.7 Description-Content-Type: text/x-rst Provides-Extra: testing License-File: LICENSE License-File: AUTHORS Software Heritage - graph service ================================= Tooling and services, collectively known as ``swh-graph``, providing fast access to the graph representation of the `Software Heritage `_ `archive `_. The service is in-memory, based on a compressed representation of the Software Heritage Merkle DAG. Bibliography ------------ In addition to accompanying technical documentation, ``swh-graph`` is also described in the following scientific paper. If you publish results based on ``swh-graph``, please acknowledge it by citing the paper as follows: .. note:: Paolo Boldi, Antoine Pietri, Sebastiano Vigna, Stefano Zacchiroli. `Ultra-Large-Scale Repository Analysis via Graph Compression `_. In proceedings of `SANER 2020 `_: The 27th IEEE International Conference on Software Analysis, Evolution and Reengineering, pages 184-194. IEEE 2020. Links: `preprint `_, `bibtex `_. diff --git a/docs/api.rst b/docs/api.rst index b6d8e63..0b7f1a2 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -1,385 +1,407 @@ .. _swh-graph-api: Graph RPC API ============= Terminology ----------- This API uses the following notions: - **Node**: a node in the :ref:`Software Heritage graph `, represented by a :ref:`SWHID `. - **Node type**: the 3-letter specifier from the node SWHID (``cnt``, ``dir``, ``rel``, ``rev``, ``snp``, ``ori``), or ``*`` for all node types. - **Edge type**: a pair ``src:dst`` where ``src`` and ``dst`` are either node types, or ``*`` to denote all node types. - **Edge restrictions**: a textual specification of which edges can be followed during graph traversal. Either ``*`` to denote that all edges can be followed or a comma separated list of edge types to allow following only those edges. Note that when traversing the *backward* (i.e., transposed) graph, edge types are reversed too. So, for instance, ``ori:snp`` makes sense when traversing the forward graph, but useless (due to lack of matching edges in the graph) when traversing the backward graph; conversely ``snp:ori`` is useful when traversing the backward graph, but not in the forward one. For the same reason ``dir:dir`` allows following edges from parent directories to sub-directories when traversing the forward graph, but the same restriction allows following edges from sub-directories to parent directories. +- **Node restrictions**: a textual specification of which type of nodes can be + returned after a request. Either ``*`` to denote that all types of nodes can + be returned or a comma separated list of node types to allow returning only + those node types. + Examples ~~~~~~~~ - ``swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2`` the SWHID of a node of type content containing the full text of the GPL3 license. - ``swh:1:rev:f39d7d78b70e0f39facb1e4fab77ad3df5c52a35`` the SWHID of a node of type revision corresponding to the commit in Linux that merged the 'x86/urgent' branch on 31 December 2017. - ``"dir:dir,dir:cnt"`` node types allowing edges from directories to directories nodes, or directories to contents nodes. - ``"rev:rev,dir:*"`` node types allowing edges from revisions to revisions nodes, or from directories nodes. - ``"*:rel"`` node types allowing all edges to releases. +- ``"cnt,snp"`` accepted node types returned in the query results. Leaves ------ .. http:get:: /graph/leaves/:src Performs a graph traversal and returns the leaves of the subgraph rooted at the specified source node. :param string src: source node specified as a SWHID :query string edges: edges types the traversal can follow; default to ``"*"`` :query string direction: direction in which graph edges will be followed; can be either ``forward`` or ``backward``, default to ``forward`` + :query integer max_edges: how many edges can be traversed during the visit; + default to 0 (not restricted) + :query string return_types: only return the nodes matching this type; + default to ``"*"`` :statuscode 200: success :statuscode 400: invalid query string provided :statuscode 404: starting node cannot be found **Example:** .. sourcecode:: http GET /graph/leaves/swh:1:dir:432d1b21c1256f7408a07c577b6974bbdbcc1323 HTTP/1.1 Content-Type: text/plain Transfer-Encoding: chunked .. sourcecode:: http HTTP/1.1 200 OK swh:1:cnt:540faad6b1e02e2db4f349a4845192db521ff2bd swh:1:cnt:630585fc6d34e5e121139e2aee0a64e83dc9aae6 swh:1:cnt:f8634ced669f0a9155c8cab1b2621d57d778215e swh:1:cnt:ba6daa801ad3ea587904b1abe9161dceedb2e0bd ... Neighbors --------- .. http:get:: /graph/neighbors/:src Returns node direct neighbors (linked with exactly one edge) in the graph. :param string src: source node specified as a SWHID :query string edges: edges types allowed to be listed as neighbors; default to ``"*"`` :query string direction: direction in which graph edges will be followed; can be either ``forward`` or ``backward``, default to ``forward`` + :query integer max_edges: how many edges can be traversed during the visit; + default to 0 (not restricted) + :query string return_types: only return the nodes matching this type; + default to ``"*"`` :statuscode 200: success :statuscode 400: invalid query string provided :statuscode 404: starting node cannot be found **Example:** .. sourcecode:: http GET /graph/neighbors/swh:1:rev:f39d7d78b70e0f39facb1e4fab77ad3df5c52a35 HTTP/1.1 Content-Type: text/plain Transfer-Encoding: chunked .. sourcecode:: http HTTP/1.1 200 OK swh:1:rev:a31e58e129f73ab5b04016330b13ed51fde7a961 swh:1:dir:b5d2aa0746b70300ebbca82a8132af386cc5986d swh:1:rev:52c90f2d32bfa7d6eccd66a56c44ace1f78fbadd ... Walk ---- .. .. http:get:: /graph/walk/:src/:dst Performs a graph traversal and returns the first found path from source to destination (final destination node included). :param string src: starting node specified as a SWHID :param string dst: destination node, either as a node SWHID or a node type. The traversal will stop at the first node encountered matching the desired destination. :query string edges: edges types the traversal can follow; default to ``"*"`` :query string traversal: traversal algorithm; can be either ``dfs`` or ``bfs``, default to ``dfs`` :query string direction: direction in which graph edges will be followed; can be either ``forward`` or ``backward``, default to ``forward`` + :query string return_types: types of nodes we want to be displayed; default to ``"*"`` + :statuscode 200: success :statuscode 400: invalid query string provided :statuscode 404: starting node cannot be found **Example:** .. sourcecode:: http HTTP/1.1 200 OK swh:1:rev:f39d7d78b70e0f39facb1e4fab77ad3df5c52a35 swh:1:rev:52c90f2d32bfa7d6eccd66a56c44ace1f78fbadd swh:1:rev:cea92e843e40452c08ba313abc39f59efbb4c29c swh:1:rev:8d517bdfb57154b8a11d7f1682ecc0f79abf8e02 ... .. http:get:: /graph/randomwalk/:src/:dst Performs a graph *random* traversal, i.e., picking one random successor node at each hop, from source to destination (final destination node included). :param string src: starting node specified as a SWHID :param string dst: destination node, either as a node SWHID or a node type. The traversal will stop at the first node encountered matching the desired destination. :query string edges: edges types the traversal can follow; default to ``"*"`` :query string direction: direction in which graph edges will be followed; can be either ``forward`` or ``backward``, default to ``forward`` :query int limit: limit the number of nodes returned. You can use positive numbers to get the first N results, or negative numbers to get the last N results starting from the tail; default to ``0``, meaning no limit. + :query integer max_edges: how many edges can be traversed during the visit; + default to 0 (not restricted) + :query string return_types: only return the nodes matching this type; + default to ``"*"`` :statuscode 200: success :statuscode 400: invalid query string provided :statuscode 404: starting node cannot be found **Example:** .. sourcecode:: http GET /graph/randomwalk/swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2/ori?direction=backward HTTP/1.1 Content-Type: text/plain Transfer-Encoding: chunked .. sourcecode:: http HTTP/1.1 200 OK swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2 swh:1:dir:8de8a8823a0780524529c94464ee6ef60b98e2ed swh:1:dir:7146ea6cbd5ffbfec58cc8df5e0552da45e69cb7 swh:1:rev:b12563e00026b48b817fd3532fc3df2db2a0f460 swh:1:rev:13e8ebe80fb878bade776131e738d5772aa0ad1b swh:1:rev:cb39b849f167c70c1f86d4356f02d1285d49ee13 ... swh:1:rev:ff70949f336593d6c59b18e4989edf24d7f0f254 swh:1:snp:a511810642b7795e725033febdd82075064ed863 swh:1:ori:98aa0e71f5c789b12673717a97f6e9fa20aa1161 **Limit example:** .. sourcecode:: http GET /graph/randomwalk/swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2/ori?direction=backward&limit=-2 HTTP/1.1 Content-Type: text/plain Transfer-Encoding: chunked .. sourcecode:: http HTTP/1.1 200 OK swh:1:ori:98aa0e71f5c789b12673717a97f6e9fa20aa1161 swh:1:snp:a511810642b7795e725033febdd82075064ed863 Visit ----- .. http:get:: /graph/visit/nodes/:src .. http:get:: /graph/visit/edges/:src .. http:get:: /graph/visit/paths/:src Performs a graph traversal and returns explored nodes, edges or paths (in the order of the traversal). :param string src: starting node specified as a SWHID :query string edges: edges types the traversal can follow; default to ``"*"`` - :query string direction: direction in which graph edges will be followed; - can be either ``forward`` or ``backward``, default to ``forward`` + :query integer max_edges: how many edges can be traversed during the visit; + default to 0 (not restricted) + :query string return_types: only return the nodes matching this type; + default to ``"*"`` :statuscode 200: success :statuscode 400: invalid query string provided :statuscode 404: starting node cannot be found **Example:** .. sourcecode:: http GET /graph/visit/nodes/swh:1:snp:40f9f177b8ab0b7b3d70ee14bbc8b214e2b2dcfc HTTP/1.1 Content-Type: text/plain Transfer-Encoding: chunked .. sourcecode:: http HTTP/1.1 200 OK swh:1:snp:40f9f177b8ab0b7b3d70ee14bbc8b214e2b2dcfc swh:1:rev:cfab784723a6c2d33468c9ed8a566fd5e2abd8c9 swh:1:rev:53e5df0e7a6b7bd4919074c081a173655c0da164 swh:1:rev:f85647f14b8243532283eff3e08f4ee96c35945f swh:1:rev:fe5f9ef854715fc59b9ec22f9878f11498cfcdbf swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb swh:1:cnt:c8cece50beae7a954f4ea27e3ae7bf941dc6d0c0 swh:1:dir:a358d0cf89821227d4c00b0ced5e0a8b3756b5db swh:1:cnt:cc407b7e24dd300d2e1a77d8f04af89b3f962a51 swh:1:cnt:701bd0a63e11b3390a547ce8515d28c6bab8a201 ... **Example:** .. sourcecode:: http GET /graph/visit/edges/swh:1:snp:40f9f177b8ab0b7b3d70ee14bbc8b214e2b2dcfc HTTP/1.1 Content-Type: text/plain Transfer-Encoding: chunked .. sourcecode:: http HTTP/1.1 200 OK swh:1:snp:40f9f177b8ab0b7b3d70ee14bbc8b214e2b2dcfc swh:1:rev:61f92a7db95f5a6d1fcb94d2b897ed3797584d7b swh:1:snp:40f9f177b8ab0b7b3d70ee14bbc8b214e2b2dcfc swh:1:rev:00e81c89c29ff3e58745fdaf7abb68daa1389e85 swh:1:snp:40f9f177b8ab0b7b3d70ee14bbc8b214e2b2dcfc swh:1:rev:7596fdc31c9aa00aed281ccb026a74cabf2383bb swh:1:snp:40f9f177b8ab0b7b3d70ee14bbc8b214e2b2dcfc swh:1:rev:ec7a2341ac3d9d8b571bbdfb90a089d4e54dea56 swh:1:snp:40f9f177b8ab0b7b3d70ee14bbc8b214e2b2dcfc swh:1:rev:1c5b5eac61eda2454034a43eb124ab490885ef3a swh:1:snp:40f9f177b8ab0b7b3d70ee14bbc8b214e2b2dcfc swh:1:rev:4dfa88ca55e04e8afe05e8543ddddee32dde7236 swh:1:snp:40f9f177b8ab0b7b3d70ee14bbc8b214e2b2dcfc swh:1:rev:d56ae79e43ff1b37534370911c8a78ec7f38d437 swh:1:snp:40f9f177b8ab0b7b3d70ee14bbc8b214e2b2dcfc swh:1:rev:19ba5d6203a040a39ecc4a77b165d3f097c1e662 swh:1:snp:40f9f177b8ab0b7b3d70ee14bbc8b214e2b2dcfc swh:1:rev:9c56102eefea23c95405533e1de23da4b873ecc4 swh:1:snp:40f9f177b8ab0b7b3d70ee14bbc8b214e2b2dcfc swh:1:rev:3f54e816b46c2e179cd164e17fea93b3013a9db4 ... **Example:** .. sourcecode:: http GET /graph/visit/paths/swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb HTTP/1.1 Content-Type: application/x-ndjson Transfer-Encoding: chunked .. sourcecode:: http HTTP/1.1 200 OK ["swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb", "swh:1:cnt:acfb7cabd63b368a03a9df87670ece1488c8bce0"] ["swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb", "swh:1:cnt:2a0837708151d76edf28fdbb90dc3eabc676cff3"] ["swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb", "swh:1:cnt:eaf025ad54b94b2fdda26af75594cfae3491ec75"] ... ["swh:1:dir:644dd466d8ad527ea3a609bfd588a3244e6dafcb", "swh:1:dir:2ebd4b96fa5665ff74f2b27ae41aecdc43af4463", "swh:1:cnt:1d3b6575fb7bf2a147d228e78ffd77ea193c3639"] ... Counting results ---------------- The following method variants, with trailing `/count` added, behave like their already discussed counterparts but, instead of returning results, return the *amount* of results that would have been returned: .. http:get:: /graph/leaves/count/:src Return the amount of :http:get:`/graph/leaves/:src` results .. http:get:: /graph/neighbors/count/:src Return the amount of :http:get:`/graph/neighbors/:src` results .. http:get:: /graph/visit/nodes/count/:src Return the amount of :http:get:`/graph/visit/nodes/:src` results Stats ----- .. http:get:: /graph/stats Returns statistics on the compressed graph. :statuscode 200: success **Example** .. sourcecode:: http GET /graph/stats HTTP/1.1 Content-Type: application/json .. sourcecode:: http HTTP/1.1 200 OK { "counts": { "nodes": 16222788, "edges": 9907464 }, "ratios": { "compression": 0.367, "bits_per_node": 5.846, "bits_per_edge": 9.573, "avg_locality": 270.369 }, "indegree": { "min": 0, "max": 12382, "avg": 0.6107127825377487 }, "outdegree": { "min": 0, "max": 1, "avg": 0.6107127825377487 } } diff --git a/java/.mvn/jvm.config b/java/.mvn/jvm.config new file mode 100644 index 0000000..61a567f --- /dev/null +++ b/java/.mvn/jvm.config @@ -0,0 +1,5 @@ +--add-exports jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED +--add-exports jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED +--add-exports jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED +--add-exports jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED +--add-exports jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED diff --git a/java/pom.xml b/java/pom.xml index 3256e70..26f0ade 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -1,274 +1,274 @@ 4.0.0 org.softwareheritage.graph swh-graph ${git.closest.tag.name} swh-graph https://forge.softwareheritage.org/source/swh-graph/ UTF-8 11 ch.qos.logback logback-classic 1.2.3 org.junit.jupiter junit-jupiter-api 5.7.0 test org.junit.jupiter junit-jupiter-engine 5.7.0 test org.hamcrest hamcrest - 2.1 + 2.2 test io.javalin javalin 3.0.0 org.slf4j slf4j-simple 1.7.26 com.fasterxml.jackson.core jackson-databind - 2.9.8 + 2.13.0 it.unimi.dsi webgraph-big 3.6.6 it.unimi.dsi fastutil - 8.5.4 + 8.5.6 it.unimi.dsi dsiutils 2.6.17 it.unimi.dsi sux4j 5.2.3 it.unimi.dsi law - 2.7.1 + 2.7.2 org.apache.hadoop hadoop-common org.umlgraph umlgraph org.eclipse.jetty.aggregate jetty-all it.unimi.di mg4j it.unimi.di mg4j-big com.martiansoftware jsap 2.1 net.sf.py4j py4j - 0.10.8.1 + 0.10.9.3 commons-codec commons-codec - 1.11 + 1.15 maven-clean-plugin 3.1.0 maven-resources-plugin 3.0.2 maven-compiler-plugin 3.8.0 11 11 -verbose -Xlint:all maven-surefire-plugin 2.22.2 maven-failsafe-plugin 2.22.2 maven-jar-plugin 3.0.2 maven-install-plugin 2.5.2 maven-deploy-plugin 2.8.2 maven-site-plugin 3.7.1 maven-project-info-reports-plugin 3.0.0 maven-assembly-plugin 3.3.0 org.softwareheritage.graph.server.App jar-with-dependencies false make-assembly package single com.diffplug.spotless spotless-maven-plugin 2.4.1 *.md .gitignore true 4 4.16.0 .coding-style.xml pl.project13.maven git-commit-id-plugin 3.0.1 get-the-git-infos revision initialize true true true true v* git.closest.tag.name ^v true org.apache.maven.plugins maven-javadoc-plugin 3.1.1 diff --git a/java/src/main/java/org/softwareheritage/graph/BidirectionalImmutableGraph.java b/java/src/main/java/org/softwareheritage/graph/BidirectionalImmutableGraph.java new file mode 100644 index 0000000..be19956 --- /dev/null +++ b/java/src/main/java/org/softwareheritage/graph/BidirectionalImmutableGraph.java @@ -0,0 +1,128 @@ +package org.softwareheritage.graph; + +import it.unimi.dsi.big.webgraph.ImmutableGraph; +import it.unimi.dsi.big.webgraph.LazyLongIterator; +import it.unimi.dsi.big.webgraph.Transform; +import it.unimi.dsi.fastutil.longs.LongIterator; + +/** + * A directed immutable graph which can be iterated in both directions (forward and backward). It + * exposes the backward equivalents of the ImmutableGraph primitives (indegree() and + * predecessors()). This is implemented by passing two graphs, one in the forward and one in the + * backward direction. + */ +public class BidirectionalImmutableGraph extends ImmutableGraph { + private final ImmutableGraph forwardGraph; + private final ImmutableGraph backwardGraph; + + /** + * Creates a bidirectional immutable graph + * + * @param forwardGraph The graph in the forward direction + * @param backwardGraph The graph in the backward direction + */ + protected BidirectionalImmutableGraph(ImmutableGraph forwardGraph, ImmutableGraph backwardGraph) { + this.forwardGraph = forwardGraph; + this.backwardGraph = backwardGraph; + } + + @Override + public long numNodes() { + assert forwardGraph.numNodes() == backwardGraph.numNodes(); + return this.forwardGraph.numNodes(); + } + + @Override + public long numArcs() { + assert forwardGraph.numArcs() == backwardGraph.numArcs(); + return this.forwardGraph.numArcs(); + } + + @Override + public boolean randomAccess() { + return this.forwardGraph.randomAccess() && this.backwardGraph.randomAccess(); + } + + @Override + public boolean hasCopiableIterators() { + return forwardGraph.hasCopiableIterators() && backwardGraph.hasCopiableIterators(); + } + + @Override + public BidirectionalImmutableGraph copy() { + return new BidirectionalImmutableGraph(this.forwardGraph.copy(), this.backwardGraph.copy()); + } + + /** + * Returns the transposed version of the bidirectional graph. Successors become predecessors, and + * vice-versa. + */ + public BidirectionalImmutableGraph transpose() { + return new BidirectionalImmutableGraph(backwardGraph, forwardGraph); + } + + /** + * Returns the symmetric version of the bidirectional graph. It returns the (lazy) union of the + * forward graph and the backward graph. This is equivalent to removing the directionality of the + * edges: the successors of a node are also its predecessors. + * + * @return a symmetric, undirected BidirectionalImmutableGraph. + */ + public BidirectionalImmutableGraph symmetrize() { + ImmutableGraph symmetric = Transform.union(forwardGraph, backwardGraph); + return new BidirectionalImmutableGraph(symmetric, symmetric); + } + + /** + * Returns the simplified version of the bidirectional graph. Works like symmetrize(), but also + * removes the loop edges. + * + * @return a simplified (loopless and symmetric) BidirectionalImmutableGraph + */ + public BidirectionalImmutableGraph simplify() { + ImmutableGraph simplified = Transform.simplify(forwardGraph, backwardGraph); + return new BidirectionalImmutableGraph(simplified, simplified); + } + + /** Returns the outdegree of a node */ + @Override + public long outdegree(long l) { + return forwardGraph.outdegree(l); + } + + /** Returns the indegree of a node */ + public long indegree(long l) { + return backwardGraph.outdegree(l); + } + + /** Returns a lazy iterator over the successors of a given node. */ + @Override + public LazyLongIterator successors(long nodeId) { + return forwardGraph.successors(nodeId); + } + + /** Returns a lazy iterator over the predecessors of a given node. */ + public LazyLongIterator predecessors(long nodeId) { + return backwardGraph.successors(nodeId); + } + + /** Returns a reference to an array containing the predecessors of a given node. */ + public long[][] predecessorBigArray(long x) { + return backwardGraph.successorBigArray(x); + } + + /** Returns an iterator enumerating the indegrees of the nodes of this graph. */ + public LongIterator indegrees() { + return backwardGraph.outdegrees(); + } + + /** Returns the underlying ImmutableGraph in the forward direction. */ + public ImmutableGraph getForwardGraph() { + return forwardGraph; + } + + /** Returns the underlying ImmutableGraph in the backward direction. */ + public ImmutableGraph getBackwardGraph() { + return backwardGraph; + } +} diff --git a/java/src/main/java/org/softwareheritage/graph/Entry.java b/java/src/main/java/org/softwareheritage/graph/Entry.java index 0db1bb6..a2d3f5a 100644 --- a/java/src/main/java/org/softwareheritage/graph/Entry.java +++ b/java/src/main/java/org/softwareheritage/graph/Entry.java @@ -1,196 +1,193 @@ package org.softwareheritage.graph; -import java.io.DataOutputStream; -import java.io.FileOutputStream; -import java.io.IOException; +import java.io.*; import java.util.ArrayList; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.PropertyNamingStrategy; public class Entry { - private final long PATH_SEPARATOR_ID = -1; private Graph graph; public void load_graph(String graphBasename) throws IOException { System.err.println("Loading graph " + graphBasename + " ..."); this.graph = Graph.loadMapped(graphBasename); System.err.println("Graph loaded."); } public Graph get_graph() { return graph.copy(); } public String stats() { try { Stats stats = new Stats(graph.getPath()); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); return objectMapper.writeValueAsString(stats); } catch (IOException e) { throw new RuntimeException("Cannot read stats: " + e); } } + public void check_swhid(String src) { + graph.getNodeId(new SWHID(src)); + } + private int count_visitor(NodeCountVisitor f, long srcNodeId) { int[] count = {0}; f.accept(srcNodeId, (node) -> { count[0]++; }); return count[0]; } - public int count_leaves(String direction, String edgesFmt, long srcNodeId) { - Traversal t = new Traversal(this.graph.copy(), direction, edgesFmt); + public int count_leaves(String direction, String edgesFmt, String src, long maxEdges) { + long srcNodeId = graph.getNodeId(new SWHID(src)); + Traversal t = new Traversal(graph.copy(), direction, edgesFmt, maxEdges); return count_visitor(t::leavesVisitor, srcNodeId); } - public int count_neighbors(String direction, String edgesFmt, long srcNodeId) { - Traversal t = new Traversal(this.graph.copy(), direction, edgesFmt); + public int count_neighbors(String direction, String edgesFmt, String src, long maxEdges) { + long srcNodeId = graph.getNodeId(new SWHID(src)); + Traversal t = new Traversal(graph.copy(), direction, edgesFmt, maxEdges); return count_visitor(t::neighborsVisitor, srcNodeId); } - public int count_visit_nodes(String direction, String edgesFmt, long srcNodeId) { - Traversal t = new Traversal(this.graph.copy(), direction, edgesFmt); + public int count_visit_nodes(String direction, String edgesFmt, String src, long maxEdges) { + long srcNodeId = graph.getNodeId(new SWHID(src)); + Traversal t = new Traversal(graph.copy(), direction, edgesFmt, maxEdges); return count_visitor(t::visitNodesVisitor, srcNodeId); } public QueryHandler get_handler(String clientFIFO) { - return new QueryHandler(this.graph.copy(), clientFIFO); + return new QueryHandler(graph.copy(), clientFIFO); } private interface NodeCountVisitor { void accept(long nodeId, Traversal.NodeIdConsumer consumer); } public class QueryHandler { Graph graph; - DataOutputStream out; + BufferedWriter out; String clientFIFO; public QueryHandler(Graph graph, String clientFIFO) { this.graph = graph; this.clientFIFO = clientFIFO; this.out = null; } - public void writeNode(long nodeId) { + public void writeNode(SWHID swhid) { try { - out.writeLong(nodeId); + out.write(swhid.toString() + "\n"); } catch (IOException e) { throw new RuntimeException("Cannot write response to client: " + e); } } - public void writeEdge(long srcId, long dstId) { - writeNode(srcId); - writeNode(dstId); - } - - public void writePath(ArrayList path) { - for (Long nodeId : path) { - writeNode(nodeId); + public void writeEdge(SWHID src, SWHID dst) { + try { + out.write(src.toString() + " " + dst.toString() + "\n"); + } catch (IOException e) { + throw new RuntimeException("Cannot write response to client: " + e); } - writeNode(PATH_SEPARATOR_ID); } public void open() { try { FileOutputStream file = new FileOutputStream(this.clientFIFO); - this.out = new DataOutputStream(file); + this.out = new BufferedWriter(new OutputStreamWriter(file)); } catch (IOException e) { throw new RuntimeException("Cannot open client FIFO: " + e); } } public void close() { try { out.close(); } catch (IOException e) { throw new RuntimeException("Cannot write response to client: " + e); } } - public void leaves(String direction, String edgesFmt, long srcNodeId, long maxEdges, String returnTypes) { + public void leaves(String direction, String edgesFmt, String src, long maxEdges, String returnTypes) { + long srcNodeId = graph.getNodeId(new SWHID(src)); open(); - Traversal t = new Traversal(this.graph, direction, edgesFmt, maxEdges, returnTypes); + Traversal t = new Traversal(graph, direction, edgesFmt, maxEdges, returnTypes); for (Long nodeId : t.leaves(srcNodeId)) { - writeNode(nodeId); + writeNode(graph.getSWHID(nodeId)); } close(); } - public void neighbors(String direction, String edgesFmt, long srcNodeId, long maxEdges, String returnTypes) { + public void neighbors(String direction, String edgesFmt, String src, long maxEdges, String returnTypes) { + long srcNodeId = graph.getNodeId(new SWHID(src)); open(); - Traversal t = new Traversal(this.graph, direction, edgesFmt, maxEdges, returnTypes); + Traversal t = new Traversal(graph, direction, edgesFmt, maxEdges, returnTypes); for (Long nodeId : t.neighbors(srcNodeId)) { - writeNode(nodeId); + writeNode(graph.getSWHID(nodeId)); } close(); } - public void visit_nodes(String direction, String edgesFmt, long srcNodeId, long maxEdges, String returnTypes) { + public void visit_nodes(String direction, String edgesFmt, String src, long maxEdges, String returnTypes) { + long srcNodeId = graph.getNodeId(new SWHID(src)); open(); - Traversal t = new Traversal(this.graph, direction, edgesFmt, maxEdges, returnTypes); + Traversal t = new Traversal(graph, direction, edgesFmt, maxEdges, returnTypes); for (Long nodeId : t.visitNodes(srcNodeId)) { - writeNode(nodeId); + writeNode(graph.getSWHID(nodeId)); } close(); } - public void visit_edges(String direction, String edgesFmt, long srcNodeId, long maxEdges) { - open(); - Traversal t = new Traversal(this.graph, direction, edgesFmt, maxEdges); - t.visitNodesVisitor(srcNodeId, null, this::writeEdge); - close(); - } - - public void visit_paths(String direction, String edgesFmt, long srcNodeId, long maxEdges) { + public void visit_edges(String direction, String edgesFmt, String src, long maxEdges, String returnTypes) { + long srcNodeId = graph.getNodeId(new SWHID(src)); open(); - Traversal t = new Traversal(this.graph, direction, edgesFmt, maxEdges); - t.visitPathsVisitor(srcNodeId, this::writePath); + Traversal t = new Traversal(graph, direction, edgesFmt, maxEdges); + t.visitNodesVisitor(srcNodeId, null, (srcId, dstId) -> { + writeEdge(graph.getSWHID(srcId), graph.getSWHID(dstId)); + }); close(); } - public void walk(String direction, String edgesFmt, String algorithm, long srcNodeId, long dstNodeId) { + public void walk(String direction, String edgesFmt, String algorithm, String src, String dst, long maxEdges, + String returnTypes) { + long srcNodeId = graph.getNodeId(new SWHID(src)); open(); - Traversal t = new Traversal(this.graph, direction, edgesFmt); - for (Long nodeId : t.walk(srcNodeId, dstNodeId, algorithm)) { - writeNode(nodeId); + ArrayList res; + Traversal t = new Traversal(graph, direction, edgesFmt, maxEdges, returnTypes); + if (dst.matches("ori|snp|rel|rev|dir|cnt")) { + Node.Type dstType = Node.Type.fromStr(dst); + res = t.walk(srcNodeId, dstType, algorithm); + } else { + long dstNodeId = graph.getNodeId(new SWHID(dst)); + res = t.walk(srcNodeId, dstNodeId, algorithm); } - close(); - } - - public void walk_type(String direction, String edgesFmt, String algorithm, long srcNodeId, String dst) { - open(); - Node.Type dstType = Node.Type.fromStr(dst); - Traversal t = new Traversal(this.graph, direction, edgesFmt); - for (Long nodeId : t.walk(srcNodeId, dstType, algorithm)) { - writeNode(nodeId); + for (Long nodeId : res) { + writeNode(graph.getSWHID(nodeId)); } close(); } - public void random_walk(String direction, String edgesFmt, int retries, long srcNodeId, long dstNodeId, + public void random_walk(String direction, String edgesFmt, int retries, String src, String dst, long maxEdges, String returnTypes) { + long srcNodeId = graph.getNodeId(new SWHID(src)); open(); - Traversal t = new Traversal(this.graph, direction, edgesFmt, 0, returnTypes); - for (Long nodeId : t.randomWalk(srcNodeId, dstNodeId, retries)) { - writeNode(nodeId); + ArrayList res; + Traversal t = new Traversal(graph, direction, edgesFmt, maxEdges, returnTypes); + if (dst.matches("ori|snp|rel|rev|dir|cnt")) { + Node.Type dstType = Node.Type.fromStr(dst); + res = t.randomWalk(srcNodeId, dstType, retries); + } else { + long dstNodeId = graph.getNodeId(new SWHID(dst)); + res = t.randomWalk(srcNodeId, dstNodeId, retries); } - close(); - } - - public void random_walk_type(String direction, String edgesFmt, int retries, long srcNodeId, String dst, - String returnTypes) { - open(); - Node.Type dstType = Node.Type.fromStr(dst); - Traversal t = new Traversal(this.graph, direction, edgesFmt, 0, returnTypes); - for (Long nodeId : t.randomWalk(srcNodeId, dstType, retries)) { - writeNode(nodeId); + for (Long nodeId : res) { + writeNode(graph.getSWHID(nodeId)); } close(); } } } diff --git a/java/src/main/java/org/softwareheritage/graph/Graph.java b/java/src/main/java/org/softwareheritage/graph/Graph.java index c5f5513..8d9acf1 100644 --- a/java/src/main/java/org/softwareheritage/graph/Graph.java +++ b/java/src/main/java/org/softwareheritage/graph/Graph.java @@ -1,310 +1,304 @@ package org.softwareheritage.graph; import it.unimi.dsi.big.webgraph.ImmutableGraph; import it.unimi.dsi.big.webgraph.LazyLongIterator; -import it.unimi.dsi.big.webgraph.Transform; import it.unimi.dsi.logging.ProgressLogger; import org.softwareheritage.graph.maps.NodeIdMap; import org.softwareheritage.graph.maps.NodeTypesMap; import java.io.IOException; /** * Main class storing the compressed graph and node id mappings. *

* The compressed graph is stored using the WebGraph * ecosystem. Additional mappings are necessary because Software Heritage uses string based persistent * identifiers (SWHID) while WebGraph uses integers internally. These two mappings (long id * ↔ SWHID) are used for the input (users refer to the graph using SWHID) and the output * (convert back to SWHID for users results). However, since graph traversal can be restricted * depending on the node type (see {@link AllowedEdges}), a long id → node type map is stored * as well to avoid a full SWHID lookup. * * @author The Software Heritage developers * @see org.softwareheritage.graph.AllowedEdges * @see org.softwareheritage.graph.maps.NodeIdMap * @see org.softwareheritage.graph.maps.NodeTypesMap */ public class Graph extends ImmutableGraph { - /** File extension for the SWHID to long node id map */ - public static final String SWHID_TO_NODE = ".swhid2node.bin"; - /** File extension for the long node id to SWHID map */ - public static final String NODE_TO_SWHID = ".node2swhid.bin"; - /** File extension for the long node id to node type map */ - public static final String NODE_TO_TYPE = ".node2type.map"; - - /** Compressed graph stored as a {@link it.unimi.dsi.big.webgraph.BVGraph} */ - ImmutableGraph graph; - /** Transposed compressed graph (used for backward traversals) */ - ImmutableGraph graphTransposed; + /** + * Bidirectional graph containing two compressed {@link it.unimi.dsi.big.webgraph.BVGraph} one for + * each direction + */ + BidirectionalImmutableGraph graph; + /** Path and basename of the compressed graph */ String path; /** Mapping long id ↔ SWHIDs */ NodeIdMap nodeIdMap; /** Mapping long id → node types */ NodeTypesMap nodeTypesMap; /** * Constructor. * * @param path path and basename of the compressed graph to load */ private Graph(String path) throws IOException { loadInternal(path, null, LoadMethod.MAPPED); } /** * Loading mechanisms */ enum LoadMethod { MEMORY, MAPPED, OFFLINE, } protected Graph loadInternal(String path, ProgressLogger pl, LoadMethod method) throws IOException { this.path = path; + ImmutableGraph direct = null; + ImmutableGraph transposed = null; if (method == LoadMethod.MEMORY) { - this.graph = ImmutableGraph.load(path, pl); - this.graphTransposed = ImmutableGraph.load(path + "-transposed", pl); + direct = ImmutableGraph.load(path, pl); + transposed = ImmutableGraph.load(path + "-transposed", pl); } else if (method == LoadMethod.MAPPED) { - this.graph = ImmutableGraph.loadMapped(path, pl); - this.graphTransposed = ImmutableGraph.loadMapped(path + "-transposed", pl); + direct = ImmutableGraph.load(path, pl); + transposed = ImmutableGraph.loadMapped(path + "-transposed", pl); } else if (method == LoadMethod.OFFLINE) { - this.graph = ImmutableGraph.loadOffline(path, pl); - this.graphTransposed = ImmutableGraph.loadOffline(path + "-transposed", pl); + direct = ImmutableGraph.loadOffline(path, pl); + transposed = ImmutableGraph.loadOffline(path + "-transposed", pl); } + this.graph = new BidirectionalImmutableGraph(direct, transposed); this.nodeTypesMap = new NodeTypesMap(path); this.nodeIdMap = new NodeIdMap(path, numNodes()); return this; } protected Graph() { } public static Graph load(String path, ProgressLogger pl) throws IOException { return new Graph().loadInternal(path, pl, LoadMethod.MEMORY); } public static Graph loadMapped(String path, ProgressLogger pl) throws IOException { return new Graph().loadInternal(path, pl, LoadMethod.MAPPED); } public static Graph loadOffline(String path, ProgressLogger pl) throws IOException { return new Graph().loadInternal(path, null, LoadMethod.OFFLINE); } public static Graph load(String path) throws IOException { return new Graph().loadInternal(path, null, LoadMethod.MEMORY); } public static Graph loadMapped(String path) throws IOException { return new Graph().loadInternal(path, null, LoadMethod.MAPPED); } public static Graph loadOffline(String path) throws IOException { return new Graph().loadInternal(path, null, LoadMethod.OFFLINE); } /** * Constructor used for copy() */ - protected Graph(ImmutableGraph graph, ImmutableGraph graphTransposed, String path, NodeIdMap nodeIdMap, - NodeTypesMap nodeTypesMap) { + protected Graph(BidirectionalImmutableGraph graph, String path, NodeIdMap nodeIdMap, NodeTypesMap nodeTypesMap) { this.graph = graph; - this.graphTransposed = graphTransposed; this.path = path; this.nodeIdMap = nodeIdMap; this.nodeTypesMap = nodeTypesMap; } /** * Return a flyweight copy of the graph. */ @Override public Graph copy() { - return new Graph(this.graph.copy(), this.graphTransposed.copy(), this.path, this.nodeIdMap, this.nodeTypesMap); + return new Graph(this.graph.copy(), this.path, this.nodeIdMap, this.nodeTypesMap); } @Override public boolean randomAccess() { - return graph.randomAccess() && graphTransposed.randomAccess(); + return graph.randomAccess(); } /** * Return a transposed version of the graph. */ public Graph transpose() { - return new Graph(this.graphTransposed, this.graph, this.path, this.nodeIdMap, this.nodeTypesMap); + return new Graph(this.graph.transpose(), this.path, this.nodeIdMap, this.nodeTypesMap); } /** * Return a symmetric version of the graph. */ public Graph symmetrize() { - ImmutableGraph symmetric = Transform.union(graph, graphTransposed); - return new Graph(symmetric, symmetric, this.path, this.nodeIdMap, this.nodeTypesMap); + return new Graph(this.graph.symmetrize(), this.path, this.nodeIdMap, this.nodeTypesMap); } /** * Cleans up graph resources after use. */ public void cleanUp() throws IOException { nodeIdMap.close(); } /** * Returns number of nodes in the graph. * * @return number of nodes in the graph */ @Override public long numNodes() { return graph.numNodes(); } /** * Returns number of edges in the graph. * * @return number of edges in the graph */ @Override public long numArcs() { return graph.numArcs(); } /** * Returns lazy iterator of successors of a node. * * @param nodeId node specified as a long id * @return lazy iterator of successors of the node, specified as a * WebGraph LazyLongIterator */ @Override public LazyLongIterator successors(long nodeId) { return graph.successors(nodeId); } /** * Returns lazy iterator of successors of a node while following a specific set of edge types. * * @param nodeId node specified as a long id * @param allowedEdges the specification of which edges can be traversed * @return lazy iterator of successors of the node, specified as a * WebGraph LazyLongIterator */ public LazyLongIterator successors(long nodeId, AllowedEdges allowedEdges) { if (allowedEdges.restrictedTo == null) { // All edges are allowed, bypass edge check return this.successors(nodeId); } else { LazyLongIterator allSuccessors = this.successors(nodeId); Graph thisGraph = this; return new LazyLongIterator() { @Override public long nextLong() { long neighbor; while ((neighbor = allSuccessors.nextLong()) != -1) { if (allowedEdges.isAllowed(thisGraph.getNodeType(nodeId), thisGraph.getNodeType(neighbor))) { return neighbor; } } return -1; } @Override public long skip(final long n) { long i; for (i = 0; i < n && nextLong() != -1; i++) ; return i; } }; } } /** * Returns the outdegree of a node. * * @param nodeId node specified as a long id * @return outdegree of a node */ @Override public long outdegree(long nodeId) { return graph.outdegree(nodeId); } /** * Returns lazy iterator of predecessors of a node. * * @param nodeId node specified as a long id * @return lazy iterator of predecessors of the node, specified as a * WebGraph LazyLongIterator */ public LazyLongIterator predecessors(long nodeId) { - return this.transpose().successors(nodeId); + return graph.predecessors(nodeId); } /** * Returns the indegree of a node. * * @param nodeId node specified as a long id * @return indegree of a node */ public long indegree(long nodeId) { - return this.transpose().outdegree(nodeId); + return graph.indegree(nodeId); } /** - * Returns the underlying BVGraph. + * Returns the underlying BidirectionalImmutableGraph. * - * @return WebGraph BVGraph + * @return WebGraph ImmutableGraph */ public ImmutableGraph getGraph() { return this.graph; } /** * Returns the graph full path. * * @return graph full path */ public String getPath() { return path; } /** * Converts {@link SWHID} node to long. * * @param swhid node specified as a {@link SWHID} * @return internal long node id * @see SWHID */ public long getNodeId(SWHID swhid) { return nodeIdMap.getNodeId(swhid); } /** * Converts long id node to {@link SWHID}. * * @param nodeId node specified as a long id * @return external SWHID * @see SWHID */ public SWHID getSWHID(long nodeId) { return nodeIdMap.getSWHID(nodeId); } /** * Returns node type. * * @param nodeId node specified as a long id * @return corresponding node type * @see org.softwareheritage.graph.Node.Type */ public Node.Type getNodeType(long nodeId) { return nodeTypesMap.getType(nodeId); } } diff --git a/java/src/main/java/org/softwareheritage/graph/experiments/topology/SubdatasetSizeFunction.java b/java/src/main/java/org/softwareheritage/graph/experiments/topology/SubdatasetSizeFunction.java index bb6779e..f897e00 100644 --- a/java/src/main/java/org/softwareheritage/graph/experiments/topology/SubdatasetSizeFunction.java +++ b/java/src/main/java/org/softwareheritage/graph/experiments/topology/SubdatasetSizeFunction.java @@ -1,90 +1,98 @@ package org.softwareheritage.graph.experiments.topology; import com.google.common.primitives.Longs; import com.martiansoftware.jsap.*; import it.unimi.dsi.Util; import it.unimi.dsi.big.webgraph.LazyLongIterator; import it.unimi.dsi.bits.LongArrayBitVector; import it.unimi.dsi.fastutil.Arrays; import it.unimi.dsi.fastutil.BigArrays; import it.unimi.dsi.fastutil.longs.LongBigArrays; import it.unimi.dsi.io.ByteDiskQueue; import it.unimi.dsi.logging.ProgressLogger; import it.unimi.dsi.util.XoRoShiRo128PlusRandom; import org.softwareheritage.graph.Graph; import org.softwareheritage.graph.Node; import org.softwareheritage.graph.experiments.forks.ForkCC; import java.io.*; public class SubdatasetSizeFunction { private SubdatasetSizeFunction() { } public static void run(final Graph graph) throws IOException { final ProgressLogger pl = new ProgressLogger(); pl.itemsName = "nodes"; pl.expectedUpdates = graph.numNodes(); long n = graph.numNodes(); LongArrayBitVector visited = LongArrayBitVector.ofLength(n); int bufferSize = (int) Math.min(Arrays.MAX_ARRAY_SIZE & ~0x7, 8L * n); final File queueFile = File.createTempFile(ForkCC.class.getSimpleName(), "queue"); final ByteDiskQueue queue = ByteDiskQueue.createNew(queueFile, bufferSize, true); final byte[] byteBuf = new byte[Long.BYTES]; long[][] randomPerm = Util.identity(graph.numNodes()); LongBigArrays.shuffle(randomPerm, new XoRoShiRo128PlusRandom()); - long visitedSize = 0; + long visitedNodes = 0; + long visitedEdges = 0; + long visitedOrigins = 0; + long visitedContents = 0; pl.start("Running traversal starting from origins..."); for (long j = 0; j < n; ++j) { long i = BigArrays.get(randomPerm, j); if (visited.getBoolean(i) || graph.getNodeType(i) != Node.Type.ORI) { continue; } + visitedOrigins++; queue.enqueue(Longs.toByteArray(i)); visited.set(i); while (!queue.isEmpty()) { queue.dequeue(byteBuf); final long currentNode = Longs.fromByteArray(byteBuf); - visitedSize++; + visitedNodes++; + if (graph.getNodeType(currentNode) == Node.Type.CNT) + visitedContents++; final LazyLongIterator iterator = graph.successors(currentNode); long succ; while ((succ = iterator.nextLong()) != -1) { + visitedEdges++; if (visited.getBoolean(succ)) continue; visited.set(succ); queue.enqueue(Longs.toByteArray(succ)); } pl.update(); } - System.out.println(visitedSize); + if (visitedOrigins % 10000 == 0) + System.out.println(visitedNodes + " " + visitedEdges + " " + visitedContents); } pl.done(); } static public void main(final String[] arg) throws IllegalArgumentException, SecurityException, JSAPException, IOException { final SimpleJSAP jsap = new SimpleJSAP(SubdatasetSizeFunction.class.getName(), - "Computes in and out degrees of the given SWHGraph", + "Computes subdataset size functions using a random uniform order", new Parameter[]{new UnflaggedOption("basename", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, JSAP.REQUIRED, JSAP.NOT_GREEDY, "The basename of the graph."),}); final JSAPResult jsapResult = jsap.parse(arg); if (jsap.messagePrinted()) System.exit(1); final String basename = jsapResult.getString("basename"); Graph graph = Graph.loadMapped(basename); run(graph); } } diff --git a/java/src/main/java/org/softwareheritage/graph/maps/NodeIdMap.java b/java/src/main/java/org/softwareheritage/graph/maps/NodeIdMap.java index 46a566b..2a2c50f 100644 --- a/java/src/main/java/org/softwareheritage/graph/maps/NodeIdMap.java +++ b/java/src/main/java/org/softwareheritage/graph/maps/NodeIdMap.java @@ -1,188 +1,186 @@ package org.softwareheritage.graph.maps; import it.unimi.dsi.fastutil.Size64; import it.unimi.dsi.fastutil.io.BinIO; import it.unimi.dsi.fastutil.longs.LongBigList; import it.unimi.dsi.fastutil.objects.Object2LongFunction; import it.unimi.dsi.util.ByteBufferLongBigList; -import org.softwareheritage.graph.Graph; import org.softwareheritage.graph.SWHID; import java.io.FileInputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; /** * Mapping between internal long node id and external SWHID. *

* The SWHID -> node mapping is obtained from hashing the SWHID with a MPH, then permuting it using * an mmap()-ed .order file containing the graph permutation. * * The node -> SWHID reverse mapping is pre-computed and dumped on disk in the * {@link NodeMapBuilder} class, then it is loaded here using mmap(). * * @author The Software Heritage developers * @see NodeMapBuilder */ public class NodeIdMap { /** Fixed length of binary SWHID buffer */ public static final int SWHID_BIN_SIZE = 22; + /** File extension for the long node id to SWHID map */ + public static final String NODE_TO_SWHID = ".node2swhid.bin"; + /** Graph path and basename */ String graphPath; /** Number of ids to map */ long nbIds; /** mmap()-ed NODE_TO_SWHID file */ MapFile nodeToSwhMap; /** Minimal perfect hash (MPH) function SWHID -> initial order */ Object2LongFunction mph; /** mmap()-ed long list with the permutation initial order -> graph order */ LongBigList orderMap; /** FileInputStream containing the permutation */ FileInputStream orderInputStream; /** * Constructor. * * @param graphPath full graph path * @param nbNodes number of nodes in the graph */ public NodeIdMap(String graphPath, long nbNodes) throws IOException { this.graphPath = graphPath; this.nbIds = nbNodes; // node -> SWHID - this.nodeToSwhMap = new MapFile(graphPath + Graph.NODE_TO_SWHID, SWHID_BIN_SIZE); + this.nodeToSwhMap = new MapFile(graphPath + NODE_TO_SWHID, SWHID_BIN_SIZE); // SWHID -> node this.mph = loadMph(graphPath + ".mph"); this.orderInputStream = new FileInputStream(graphPath + ".order"); this.orderMap = ByteBufferLongBigList.map(orderInputStream.getChannel()); } @SuppressWarnings("unchecked") public static Object2LongFunction loadMph(String path) throws IOException { Object obj; try { obj = BinIO.loadObject(path); } catch (ClassNotFoundException e) { throw new IOException(e.getMessage()); } Object2LongFunction res = (Object2LongFunction) obj; // Backward-compatibility for old maps parametrized with . // New maps should be parametrized with , which is faster. try { // Try to call it with bytes, will fail if it's a O2LF. res.getLong("42".getBytes(StandardCharsets.UTF_8)); } catch (ClassCastException e) { class StringCompatibleByteFunction implements Object2LongFunction, Size64 { private final Object2LongFunction legacyFunction; public StringCompatibleByteFunction(Object2LongFunction legacyFunction) { this.legacyFunction = legacyFunction; } @Override public long getLong(Object o) { byte[] bi = (byte[]) o; return legacyFunction.getLong(new String(bi, StandardCharsets.UTF_8)); } @Override public int size() { return legacyFunction.size(); } @Override public long size64() { return (legacyFunction instanceof Size64) ? ((Size64) legacyFunction).size64() : legacyFunction.size(); } } Object2LongFunction mphLegacy = (Object2LongFunction) obj; return new StringCompatibleByteFunction(mphLegacy); - /* - * res = (o -> { byte[] bi = (byte[]) o; return mphLegacy.getLong(new String(bi, - * StandardCharsets.UTF_8)); }); - */ } // End of backward-compatibility block return res; } /** * Converts byte-form SWHID to corresponding long node id. Low-level function, does not check if the * SWHID is valid. * * @param swhid node represented as bytes * @return corresponding node as a long id */ public long getNodeId(byte[] swhid) { // 1. Hash the SWHID with the MPH to get its original ID long origNodeId = mph.getLong(swhid); // 2. Use the order permutation to get the position in the permuted graph return this.orderMap.getLong(origNodeId); } /** * Converts SWHID to corresponding long node id. * * @param swhid node represented as a {@link SWHID} * @param checkExists if true, error if the SWHID is not present in the graph, if false the check * will be skipped and invalid data will be returned for non-existing SWHIDs. * @return corresponding node as a long id * @see SWHID */ public long getNodeId(SWHID swhid, boolean checkExists) { // Convert the SWHID to bytes and call getNodeId() long nodeId = getNodeId(swhid.toString().getBytes(StandardCharsets.US_ASCII)); // Check that the position effectively corresponds to a real node using the reverse map. // This is necessary because the MPH makes no guarantees on whether the input SWHID is valid. if (!checkExists || getSWHID(nodeId).equals(swhid)) { return nodeId; } else { throw new IllegalArgumentException("Unknown SWHID: " + swhid); } } public long getNodeId(SWHID swhid) { return getNodeId(swhid, true); } /** * Converts a node long id to corresponding SWHID. * * @param nodeId node as a long id * @return corresponding node as a {@link SWHID} * @see SWHID */ public SWHID getSWHID(long nodeId) { /* * Each line in NODE_TO_SWHID is formatted as: swhid The file is ordered by nodeId, meaning node0's * swhid is at line 0, hence we can read the nodeId-th line to get corresponding swhid */ if (nodeId < 0 || nodeId >= nbIds) { throw new IllegalArgumentException("Node id " + nodeId + " should be between 0 and " + nbIds); } return SWHID.fromBytes(nodeToSwhMap.readAtLine(nodeId)); } /** * Closes the mapping files. */ public void close() throws IOException { orderInputStream.close(); nodeToSwhMap.close(); } } diff --git a/java/src/main/java/org/softwareheritage/graph/maps/NodeMapBuilder.java b/java/src/main/java/org/softwareheritage/graph/maps/NodeMapBuilder.java index 3c9f6ef..626c747 100644 --- a/java/src/main/java/org/softwareheritage/graph/maps/NodeMapBuilder.java +++ b/java/src/main/java/org/softwareheritage/graph/maps/NodeMapBuilder.java @@ -1,217 +1,191 @@ package org.softwareheritage.graph.maps; import it.unimi.dsi.bits.LongArrayBitVector; import it.unimi.dsi.fastutil.BigArrays; import it.unimi.dsi.fastutil.Size64; import it.unimi.dsi.fastutil.io.BinIO; import it.unimi.dsi.fastutil.longs.LongBigArrays; import it.unimi.dsi.fastutil.longs.LongBigList; import it.unimi.dsi.fastutil.objects.Object2LongFunction; import it.unimi.dsi.io.FastBufferedReader; import it.unimi.dsi.io.LineIterator; import it.unimi.dsi.logging.ProgressLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.softwareheritage.graph.Graph; import org.softwareheritage.graph.Node; import org.softwareheritage.graph.SWHID; import java.io.*; import java.nio.charset.StandardCharsets; import java.util.Scanner; import java.util.concurrent.TimeUnit; /** * Create maps needed at runtime by the graph service, in particular: *

*

    *
  • SWHID → WebGraph long node id
  • *
  • WebGraph long node id → SWHID (converse of the former)
  • *
  • WebGraph long node id → SWH node type (enum)
  • *
* * @author The Software Heritage developers */ public class NodeMapBuilder { final static String SORT_BUFFER_SIZE = "40%"; final static Logger logger = LoggerFactory.getLogger(NodeMapBuilder.class); /** * Main entrypoint. * * @param args command line arguments */ public static void main(String[] args) throws IOException { if (args.length != 2) { logger.error("Usage: COMPRESSED_GRAPH_BASE_NAME TEMP_DIR < NODES_CSV"); System.exit(1); } String graphPath = args[0]; String tmpDir = args[1]; logger.info("starting maps generation..."); precomputeNodeIdMap(graphPath, tmpDir); logger.info("maps generation completed"); } /** * Computes and dumps on disk mapping files. * * @param graphPath path of the compressed graph */ - // Suppress warning for Object2LongFunction cast - @SuppressWarnings("unchecked") static void precomputeNodeIdMap(String graphPath, String tmpDir) throws IOException { ProgressLogger plSWHID2Node = new ProgressLogger(logger, 10, TimeUnit.SECONDS); ProgressLogger plNode2SWHID = new ProgressLogger(logger, 10, TimeUnit.SECONDS); - plSWHID2Node.itemsName = "swhid→node"; - plNode2SWHID.itemsName = "node→swhid"; - - /* - * avg speed for swhid→node is sometime skewed due to write to the sort pipe hanging when sort is - * sorting; hence also desplay local speed - */ - plSWHID2Node.displayLocalSpeed = true; + plSWHID2Node.itemsName = "Hashing swhid→node"; + plNode2SWHID.itemsName = "Building map node→swhid"; // first half of SWHID->node mapping: SWHID -> WebGraph MPH (long) - Object2LongFunction mphMap = null; - try { - logger.info("loading MPH function..."); - mphMap = (Object2LongFunction) BinIO.loadObject(graphPath + ".mph"); - logger.info("MPH function loaded"); - } catch (ClassNotFoundException e) { - logger.error("unknown class object in .mph file: " + e); - System.exit(2); - } + Object2LongFunction mphMap = NodeIdMap.loadMph(graphPath + ".mph"); long nbIds = (mphMap instanceof Size64) ? ((Size64) mphMap).size64() : mphMap.size(); plSWHID2Node.expectedUpdates = nbIds; plNode2SWHID.expectedUpdates = nbIds; // second half of SWHID->node mapping: WebGraph MPH (long) -> BFS order (long) long[][] bfsMap = LongBigArrays.newBigArray(nbIds); logger.info("loading BFS order file..."); long loaded = BinIO.loadLongs(graphPath + ".order", bfsMap); logger.info("BFS order file loaded"); if (loaded != nbIds) { logger.error("graph contains " + nbIds + " nodes, but read " + loaded); System.exit(2); } /* - * Create mapping SWHID -> WebGraph node id, by sequentially reading nodes, hashing them with MPH, - * and permuting according to BFS order + * Read on stdin a list of SWHIDs, hash them with MPH, then permute them according to the .order + * file */ FastBufferedReader buffer = new FastBufferedReader(new InputStreamReader(System.in, StandardCharsets.US_ASCII)); LineIterator swhidIterator = new LineIterator(buffer); /* * The WebGraph node id -> SWHID mapping can be obtained from the SWHID->node one by numerically * sorting on node id and sequentially writing obtained SWHIDs to a binary map. Delegates the * sorting job to /usr/bin/sort via pipes */ ProcessBuilder processBuilder = new ProcessBuilder(); processBuilder.command("sort", "--numeric-sort", "--key", "2", "--buffer-size", SORT_BUFFER_SIZE, "--temporary-directory", tmpDir); Process sort = processBuilder.start(); BufferedOutputStream sort_stdin = new BufferedOutputStream(sort.getOutputStream()); BufferedInputStream sort_stdout = new BufferedInputStream(sort.getInputStream()); - // for the binary format of swhidToNodeMap, see Python module swh.graph.swhid:SwhidToIntMap // for the binary format of nodeToSwhidMap, see Python module swh.graph.swhid:IntToSwhidMap - try (DataOutputStream swhidToNodeMap = new DataOutputStream( - new BufferedOutputStream(new FileOutputStream(graphPath + Graph.SWHID_TO_NODE))); - BufferedOutputStream nodeToSwhidMap = new BufferedOutputStream( - new FileOutputStream(graphPath + Graph.NODE_TO_SWHID))) { + try (BufferedOutputStream nodeToSwhidMap = new BufferedOutputStream( + new FileOutputStream(graphPath + NodeIdMap.NODE_TO_SWHID))) { /* - * background handler for sort output, it will be fed SWHID/node pairs while swhidToNodeMap is being - * filled, and will itself fill nodeToSwhidMap as soon as data from sort is ready + * background handler for sort output, it will be fed SWHID/node pairs, and will itself fill + * nodeToSwhidMap as soon as data from sort is ready. */ SortOutputHandler outputHandler = new SortOutputHandler(sort_stdout, nodeToSwhidMap, plNode2SWHID); outputHandler.start(); /* * Type map from WebGraph node ID to SWH type. Used at runtime by pure Java graph traversals to * efficiently check edge restrictions. */ - final int log2NbTypes = (int) Math.ceil(Math.log(Node.Type.values().length) / Math.log(2)); - final int nbBitsPerNodeType = log2NbTypes; + final int nbBitsPerNodeType = (int) Math.ceil(Math.log(Node.Type.values().length) / Math.log(2)); LongArrayBitVector nodeTypesBitVector = LongArrayBitVector.ofLength(nbBitsPerNodeType * nbIds); LongBigList nodeTypesMap = nodeTypesBitVector.asLongBigList(nbBitsPerNodeType); - plSWHID2Node.start("filling swhid2node map"); + plSWHID2Node.start("Hashing SWHIDs to fill sort input"); for (long iNode = 0; iNode < nbIds && swhidIterator.hasNext(); iNode++) { String swhidStr = swhidIterator.next().toString(); SWHID swhid = new SWHID(swhidStr); - byte[] swhidBin = swhid.toBytes(); long mphId = mphMap.getLong(swhidStr.getBytes(StandardCharsets.US_ASCII)); long nodeId = BigArrays.get(bfsMap, mphId); - - swhidToNodeMap.write(swhidBin, 0, swhidBin.length); - swhidToNodeMap.writeLong(nodeId); sort_stdin.write((swhidStr + "\t" + nodeId + "\n").getBytes(StandardCharsets.US_ASCII)); nodeTypesMap.set(nodeId, swhid.getType().ordinal()); plSWHID2Node.lightUpdate(); } plSWHID2Node.done(); sort_stdin.close(); // write type map logger.info("storing type map"); - BinIO.storeObject(nodeTypesMap, graphPath + Graph.NODE_TO_TYPE); + BinIO.storeObject(nodeTypesMap, graphPath + NodeTypesMap.NODE_TO_TYPE); logger.info("type map stored"); // wait for nodeToSwhidMap filling try { logger.info("waiting for node2swhid map..."); int sortExitCode = sort.waitFor(); if (sortExitCode != 0) { logger.error("sort returned non-zero exit code: " + sortExitCode); System.exit(2); } outputHandler.join(); } catch (InterruptedException e) { logger.error("processing of sort output failed with: " + e); System.exit(2); } } - } private static class SortOutputHandler extends Thread { - private Scanner input; - private OutputStream output; - private ProgressLogger pl; + private final Scanner input; + private final OutputStream output; + private final ProgressLogger pl; SortOutputHandler(InputStream input, OutputStream output, ProgressLogger pl) { this.input = new Scanner(input, StandardCharsets.US_ASCII); this.output = output; this.pl = pl; } public void run() { boolean sortDone = false; logger.info("node2swhid: waiting for sort output..."); while (input.hasNextLine()) { if (!sortDone) { sortDone = true; this.pl.start("filling node2swhid map"); } String line = input.nextLine(); // format: SWHID NODE_ID SWHID swhid = new SWHID(line.split("\\t")[0]); // get SWHID try { - output.write((byte[]) swhid.toBytes()); + output.write(swhid.toBytes()); } catch (IOException e) { logger.error("writing to node->SWHID map failed with: " + e); } this.pl.lightUpdate(); } this.pl.done(); } } } diff --git a/java/src/main/java/org/softwareheritage/graph/maps/NodeTypesMap.java b/java/src/main/java/org/softwareheritage/graph/maps/NodeTypesMap.java index c835e02..befe094 100644 --- a/java/src/main/java/org/softwareheritage/graph/maps/NodeTypesMap.java +++ b/java/src/main/java/org/softwareheritage/graph/maps/NodeTypesMap.java @@ -1,52 +1,54 @@ package org.softwareheritage.graph.maps; import it.unimi.dsi.fastutil.io.BinIO; import it.unimi.dsi.fastutil.longs.LongBigList; -import org.softwareheritage.graph.Graph; import org.softwareheritage.graph.Node; import java.io.IOException; /** * Mapping between long node id and SWH node type as described in the * data model. *

* The type mapping is pre-computed and dumped on disk in the {@link NodeMapBuilder} class, then it * is loaded in-memory here using fastutil LongBigList. * To be space-efficient, the mapping is stored as a bitmap using minimum number of bits per * {@link Node.Type}. * * @author The Software Heritage developers */ public class NodeTypesMap { + /** File extension for the long node id to node type map */ + public static final String NODE_TO_TYPE = ".node2type.map"; + /** * Array storing for each node its type */ public LongBigList nodeTypesMap; /** * Constructor. * * @param graphPath path and basename of the compressed graph */ public NodeTypesMap(String graphPath) throws IOException { try { - nodeTypesMap = (LongBigList) BinIO.loadObject(graphPath + Graph.NODE_TO_TYPE); + nodeTypesMap = (LongBigList) BinIO.loadObject(graphPath + NODE_TO_TYPE); } catch (ClassNotFoundException e) { throw new IllegalArgumentException("Unknown class object: " + e); } } /** * Returns node type from a node long id. * * @param nodeId node as a long id * @return corresponding {@link Node.Type} value * @see org.softwareheritage.graph.Node.Type */ public Node.Type getType(long nodeId) { long type = nodeTypesMap.getLong(nodeId); return Node.Type.fromInt((int) type); } } diff --git a/java/src/main/java/org/softwareheritage/graph/utils/ComposePermutations.java b/java/src/main/java/org/softwareheritage/graph/utils/ComposePermutations.java new file mode 100644 index 0000000..3e094e8 --- /dev/null +++ b/java/src/main/java/org/softwareheritage/graph/utils/ComposePermutations.java @@ -0,0 +1,51 @@ +package org.softwareheritage.graph.utils; + +import com.martiansoftware.jsap.*; +import it.unimi.dsi.Util; +import it.unimi.dsi.fastutil.io.BinIO; + +import java.io.File; +import java.io.IOException; + +/** + * CLI program used to compose two on-disk permutations. + * + * It takes two on-disk permutations as parameters, p1 and p2, and writes on disk (p1 o p2) at the + * given location. This is useful for multi-step compression (e.g. Unordered -> BFS -> LLP), as it + * can be used to merge all the intermediate permutations. + */ +public class ComposePermutations { + private static JSAPResult parse_args(String[] args) { + JSAPResult config = null; + try { + SimpleJSAP jsap = new SimpleJSAP(ComposePermutations.class.getName(), "", new Parameter[]{ + new UnflaggedOption("firstPermutation", JSAP.STRING_PARSER, JSAP.REQUIRED, "The first permutation"), + new UnflaggedOption("secondPermutation", JSAP.STRING_PARSER, JSAP.REQUIRED, + "The second permutation"), + new UnflaggedOption("outputPermutation", JSAP.STRING_PARSER, JSAP.REQUIRED, + "The output permutation"),}); + + config = jsap.parse(args); + if (jsap.messagePrinted()) { + System.exit(1); + } + } catch (JSAPException e) { + e.printStackTrace(); + } + return config; + } + + public static void main(String[] args) throws IOException, ClassNotFoundException { + JSAPResult config = parse_args(args); + String firstPermFilename = config.getString("firstPermutation"); + String secondPermFilename = config.getString("secondPermutation"); + String outputPermFilename = config.getString("outputPermutation"); + + long[][] firstPerm = BinIO.loadLongsBig(new File(firstPermFilename)); + long[][] secondPerm = BinIO.loadLongsBig(new File(secondPermFilename)); + + long[][] outputPerm = Util.composePermutationsInPlace(firstPerm, secondPerm); + + BinIO.storeLongs(outputPerm, outputPermFilename); + } +} diff --git a/java/src/main/java/org/softwareheritage/graph/utils/FindEarliestRevision.java b/java/src/main/java/org/softwareheritage/graph/utils/FindEarliestRevision.java index fa02cc1..71379f2 100644 --- a/java/src/main/java/org/softwareheritage/graph/utils/FindEarliestRevision.java +++ b/java/src/main/java/org/softwareheritage/graph/utils/FindEarliestRevision.java @@ -1,114 +1,116 @@ package org.softwareheritage.graph.utils; import it.unimi.dsi.big.webgraph.LazyLongIterator; import it.unimi.dsi.fastutil.BigArrays; import it.unimi.dsi.fastutil.io.BinIO; import org.softwareheritage.graph.AllowedEdges; import org.softwareheritage.graph.Graph; import org.softwareheritage.graph.Node; import org.softwareheritage.graph.SWHID; import java.io.IOException; import java.time.Duration; import java.util.HashSet; import java.util.Scanner; import java.util.Stack; /* sample invocation on granet.internal.softwareheritage.org for benchmarking * purposes, with the main swh-graph service already running: * * $ java -cp ~/swh-environment/swh-graph/java/target/swh-graph-0.3.0.jar -Xmx300G -XX:PretenureSizeThreshold=512M -XX:MaxNewSize=4G -XX:+UseLargePages -XX:+UseTransparentHugePages -XX:+UseNUMA -XX:+UseTLAB -XX:+ResizeTLAB org.softwareheritage.graph.utils.FindEarliestRevision --timing /dev/shm/swh-graph/default/graph * */ public class FindEarliestRevision { public static void main(String[] args) throws IOException, ClassNotFoundException { String graphPath = args[0]; boolean timing = false; long ts, elapsedNanos; Duration elapsed; if (args.length >= 2 && (args[0].equals("-t") || args[0].equals("--timing"))) { timing = true; graphPath = args[1]; System.err.println("started with timing option, will keep track of elapsed time"); } System.err.println("loading transposed graph..."); ts = System.nanoTime(); Graph graph = Graph.loadMapped(graphPath).transpose(); elapsed = Duration.ofNanos(System.nanoTime() - ts); System.err.println(String.format("transposed graph loaded (duration: %s).", elapsed)); System.err.println("loading revision timestamps..."); ts = System.nanoTime(); long[][] committerTimestamps = BinIO.loadLongsBig(graphPath + "-rev_committer_timestamps.bin"); elapsed = Duration.ofNanos(System.nanoTime() - ts); System.err.println(String.format("revision timestamps loaded (duration: %s).", elapsed)); Scanner stdin = new Scanner(System.in); AllowedEdges edges = new AllowedEdges("cnt:dir,dir:dir,dir:rev"); String rawSWHID = null; SWHID srcSWHID = null; long lineCount = 0; + long srcNodeId = -1; if (timing) { System.err.println("starting SWHID processing..."); elapsed = Duration.ZERO; } while (stdin.hasNextLine()) { if (timing) ts = System.nanoTime(); rawSWHID = stdin.nextLine().strip(); lineCount++; try { srcSWHID = new SWHID(rawSWHID); + srcNodeId = graph.getNodeId(srcSWHID); } catch (IllegalArgumentException e) { - System.err.println(String.format("skipping invalid SWHID %s on line %d", rawSWHID, lineCount)); + System.err + .println(String.format("skipping invalid or unknown SWHID %s on line %d", rawSWHID, lineCount)); continue; } - long srcNodeId = graph.getNodeId(srcSWHID); if (timing) System.err.println("starting traversal for: " + srcSWHID.toString()); Stack stack = new Stack<>(); HashSet visited = new HashSet<>(); stack.push(srcNodeId); visited.add(srcNodeId); long minRevId = -1; long minTimestamp = Long.MAX_VALUE; while (!stack.isEmpty()) { long currentNodeId = stack.pop(); if (graph.getNodeType(currentNodeId) == Node.Type.REV) { long committerTs = BigArrays.get(committerTimestamps, currentNodeId); if (committerTs < minTimestamp) { minRevId = currentNodeId; minTimestamp = committerTs; } } LazyLongIterator it = graph.successors(currentNodeId, edges); for (long neighborNodeId; (neighborNodeId = it.nextLong()) != -1;) { if (!visited.contains(neighborNodeId)) { stack.push(neighborNodeId); visited.add(neighborNodeId); } } } if (minRevId == -1) { System.err.println("no revision found containing: " + srcSWHID.toString()); } else { System.out.println(srcSWHID.toString() + "\t" + graph.getSWHID(minRevId).toString()); } if (timing) { elapsedNanos = System.nanoTime() - ts; // processing time for current SWHID elapsed = elapsed.plus(Duration.ofNanos(elapsedNanos)); // cumulative processing time for all SWHIDs System.err.println(String.format("visit time (s):\t%.6f", (double) elapsedNanos / 1_000_000_000)); } } if (timing) System.err.println(String.format("processed %d SWHIDs in %s (%s avg)", lineCount, elapsed, elapsed.dividedBy(lineCount))); } } diff --git a/swh.graph.egg-info/PKG-INFO b/swh.graph.egg-info/PKG-INFO index 18ff922..5268263 100644 --- a/swh.graph.egg-info/PKG-INFO +++ b/swh.graph.egg-info/PKG-INFO @@ -1,56 +1,56 @@ Metadata-Version: 2.1 Name: swh.graph -Version: 0.5.0 +Version: 0.5.1 Summary: Software Heritage graph service Home-page: https://forge.softwareheritage.org/diffusion/DGRPH Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-graph Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-graph/ Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 3 - Alpha Requires-Python: >=3.7 Description-Content-Type: text/x-rst Provides-Extra: testing License-File: LICENSE License-File: AUTHORS Software Heritage - graph service ================================= Tooling and services, collectively known as ``swh-graph``, providing fast access to the graph representation of the `Software Heritage `_ `archive `_. The service is in-memory, based on a compressed representation of the Software Heritage Merkle DAG. Bibliography ------------ In addition to accompanying technical documentation, ``swh-graph`` is also described in the following scientific paper. If you publish results based on ``swh-graph``, please acknowledge it by citing the paper as follows: .. note:: Paolo Boldi, Antoine Pietri, Sebastiano Vigna, Stefano Zacchiroli. `Ultra-Large-Scale Repository Analysis via Graph Compression `_. In proceedings of `SANER 2020 `_: The 27th IEEE International Conference on Software Analysis, Evolution and Reengineering, pages 184-194. IEEE 2020. Links: `preprint `_, `bibtex `_. diff --git a/swh.graph.egg-info/SOURCES.txt b/swh.graph.egg-info/SOURCES.txt index e2e1364..dac7f07 100644 --- a/swh.graph.egg-info/SOURCES.txt +++ b/swh.graph.egg-info/SOURCES.txt @@ -1,193 +1,193 @@ .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md CONTRIBUTORS LICENSE MANIFEST.in Makefile Makefile.local README.rst mypy.ini pyproject.toml pytest.ini requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini docker/Dockerfile docker/build.sh docker/run.sh docs/.gitignore docs/Makefile docs/Makefile.local docs/README.rst docs/api.rst docs/cli.rst docs/compression.rst docs/conf.py docs/docker.rst docs/git2graph.md docs/index.rst docs/quickstart.rst docs/use-cases.rst docs/_static/.placeholder docs/_templates/.placeholder docs/images/.gitignore docs/images/Makefile docs/images/compression_steps.dot java/.coding-style.xml java/.gitignore java/AUTHORS java/LICENSE java/README.md java/pom.xml +java/.mvn/jvm.config java/src/main/java/org/softwareheritage/graph/AllowedEdges.java java/src/main/java/org/softwareheritage/graph/AllowedNodes.java +java/src/main/java/org/softwareheritage/graph/BidirectionalImmutableGraph.java java/src/main/java/org/softwareheritage/graph/Entry.java java/src/main/java/org/softwareheritage/graph/Graph.java java/src/main/java/org/softwareheritage/graph/Node.java java/src/main/java/org/softwareheritage/graph/NodesFiltering.java java/src/main/java/org/softwareheritage/graph/SWHID.java java/src/main/java/org/softwareheritage/graph/Stats.java java/src/main/java/org/softwareheritage/graph/Subgraph.java java/src/main/java/org/softwareheritage/graph/SwhPath.java java/src/main/java/org/softwareheritage/graph/Traversal.java java/src/main/java/org/softwareheritage/graph/algo/TopologicalTraversal.java java/src/main/java/org/softwareheritage/graph/benchmark/AccessEdge.java java/src/main/java/org/softwareheritage/graph/benchmark/BFS.java java/src/main/java/org/softwareheritage/graph/benchmark/Benchmark.java java/src/main/java/org/softwareheritage/graph/benchmark/Browsing.java java/src/main/java/org/softwareheritage/graph/benchmark/Provenance.java java/src/main/java/org/softwareheritage/graph/benchmark/Vault.java java/src/main/java/org/softwareheritage/graph/benchmark/utils/Random.java java/src/main/java/org/softwareheritage/graph/benchmark/utils/Statistics.java java/src/main/java/org/softwareheritage/graph/benchmark/utils/Timing.java java/src/main/java/org/softwareheritage/graph/experiments/forks/FindCommonAncestor.java java/src/main/java/org/softwareheritage/graph/experiments/forks/FindPath.java java/src/main/java/org/softwareheritage/graph/experiments/forks/ForkCC.java java/src/main/java/org/softwareheritage/graph/experiments/forks/ForkCliques.java java/src/main/java/org/softwareheritage/graph/experiments/forks/ListEmptyOrigins.java java/src/main/java/org/softwareheritage/graph/experiments/multiplicationfactor/GenDistribution.java java/src/main/java/org/softwareheritage/graph/experiments/topology/AveragePaths.java java/src/main/java/org/softwareheritage/graph/experiments/topology/ClusteringCoefficient.java java/src/main/java/org/softwareheritage/graph/experiments/topology/ConnectedComponents.java java/src/main/java/org/softwareheritage/graph/experiments/topology/InOutDegree.java java/src/main/java/org/softwareheritage/graph/experiments/topology/SubdatasetSizeFunction.java java/src/main/java/org/softwareheritage/graph/labels/AbstractLongListLabel.java java/src/main/java/org/softwareheritage/graph/labels/DirEntry.java java/src/main/java/org/softwareheritage/graph/labels/FixedWidthLongListLabel.java java/src/main/java/org/softwareheritage/graph/labels/SwhLabel.java java/src/main/java/org/softwareheritage/graph/maps/LabelMapBuilder.java java/src/main/java/org/softwareheritage/graph/maps/MapFile.java java/src/main/java/org/softwareheritage/graph/maps/NodeIdMap.java java/src/main/java/org/softwareheritage/graph/maps/NodeMapBuilder.java java/src/main/java/org/softwareheritage/graph/maps/NodeTypesMap.java java/src/main/java/org/softwareheritage/graph/server/App.java java/src/main/java/org/softwareheritage/graph/server/Endpoint.java +java/src/main/java/org/softwareheritage/graph/utils/ComposePermutations.java java/src/main/java/org/softwareheritage/graph/utils/ExportSubdataset.java java/src/main/java/org/softwareheritage/graph/utils/FindEarliestRevision.java java/src/main/java/org/softwareheritage/graph/utils/MPHTranslate.java java/src/main/java/org/softwareheritage/graph/utils/ReadGraph.java java/src/main/java/org/softwareheritage/graph/utils/ReadLabelledGraph.java java/src/main/java/org/softwareheritage/graph/utils/WriteRevisionTimestamps.java java/src/test/java/org/softwareheritage/graph/AllowedEdgesTest.java java/src/test/java/org/softwareheritage/graph/GraphTest.java java/src/test/java/org/softwareheritage/graph/LeavesTest.java java/src/test/java/org/softwareheritage/graph/NeighborsTest.java java/src/test/java/org/softwareheritage/graph/SubgraphTest.java java/src/test/java/org/softwareheritage/graph/VisitTest.java java/src/test/java/org/softwareheritage/graph/WalkTest.java -java/target/swh-graph-0.5.0.jar +java/target/swh-graph-0.5.1.jar reports/.gitignore reports/benchmarks/Makefile reports/benchmarks/benchmarks.tex reports/experiments/Makefile reports/experiments/experiments.tex reports/linux_log/LinuxLog.java reports/linux_log/Makefile reports/linux_log/linux_log.tex reports/node_mapping/Makefile reports/node_mapping/NodeIdMapHaloDB.java reports/node_mapping/NodeIdMapRocksDB.java reports/node_mapping/node_mapping.tex swh/__init__.py swh.graph.egg-info/PKG-INFO swh.graph.egg-info/SOURCES.txt swh.graph.egg-info/dependency_links.txt swh.graph.egg-info/entry_points.txt swh.graph.egg-info/requires.txt swh.graph.egg-info/top_level.txt swh/graph/__init__.py swh/graph/backend.py swh/graph/cli.py swh/graph/client.py swh/graph/config.py swh/graph/dot.py -swh/graph/graph.py swh/graph/naive_client.py swh/graph/py.typed swh/graph/swhid.py swh/graph/webgraph.py swh/graph/server/__init__.py swh/graph/server/app.py swh/graph/tests/__init__.py swh/graph/tests/conftest.py swh/graph/tests/test_api_client.py swh/graph/tests/test_cli.py -swh/graph/tests/test_graph.py swh/graph/tests/test_swhid.py swh/graph/tests/dataset/.gitignore swh/graph/tests/dataset/example.edges.csv swh/graph/tests/dataset/example.edges.csv.zst swh/graph/tests/dataset/example.nodes.csv swh/graph/tests/dataset/example.nodes.csv.zst swh/graph/tests/dataset/generate_graph.sh swh/graph/tests/dataset/img/.gitignore swh/graph/tests/dataset/img/Makefile swh/graph/tests/dataset/img/example.dot swh/graph/tests/dataset/output/example-transposed.graph swh/graph/tests/dataset/output/example-transposed.obl swh/graph/tests/dataset/output/example-transposed.offsets swh/graph/tests/dataset/output/example-transposed.properties swh/graph/tests/dataset/output/example.graph swh/graph/tests/dataset/output/example.indegree swh/graph/tests/dataset/output/example.mph swh/graph/tests/dataset/output/example.node2swhid.bin swh/graph/tests/dataset/output/example.node2type.map swh/graph/tests/dataset/output/example.obl swh/graph/tests/dataset/output/example.offsets swh/graph/tests/dataset/output/example.order swh/graph/tests/dataset/output/example.outdegree swh/graph/tests/dataset/output/example.properties swh/graph/tests/dataset/output/example.stats -swh/graph/tests/dataset/output/example.swhid2node.bin tools/dir2graph tools/swhid2int2int2swhid.sh tools/git2graph/.gitignore tools/git2graph/Makefile tools/git2graph/README.md tools/git2graph/git2graph.c tools/git2graph/tests/edge-filters.bats tools/git2graph/tests/full-graph.bats tools/git2graph/tests/node-filters.bats tools/git2graph/tests/repo_helper.bash tools/git2graph/tests/data/sample-repo.tgz tools/git2graph/tests/data/graphs/dir-nodes/edges.csv tools/git2graph/tests/data/graphs/dir-nodes/nodes.csv tools/git2graph/tests/data/graphs/from-dir-edges/edges.csv tools/git2graph/tests/data/graphs/from-dir-edges/nodes.csv tools/git2graph/tests/data/graphs/from-rel-edges/edges.csv tools/git2graph/tests/data/graphs/from-rel-edges/nodes.csv tools/git2graph/tests/data/graphs/fs-nodes/edges.csv tools/git2graph/tests/data/graphs/fs-nodes/nodes.csv tools/git2graph/tests/data/graphs/full/edges.csv tools/git2graph/tests/data/graphs/full/nodes.csv tools/git2graph/tests/data/graphs/rev-edges/edges.csv tools/git2graph/tests/data/graphs/rev-edges/nodes.csv tools/git2graph/tests/data/graphs/rev-nodes/edges.csv tools/git2graph/tests/data/graphs/rev-nodes/nodes.csv tools/git2graph/tests/data/graphs/to-rev-edges/edges.csv tools/git2graph/tests/data/graphs/to-rev-edges/nodes.csv \ No newline at end of file diff --git a/swh/__init__.py b/swh/__init__.py index 8d9f151..b36383a 100644 --- a/swh/__init__.py +++ b/swh/__init__.py @@ -1,4 +1,3 @@ from pkgutil import extend_path -from typing import List -__path__: List[str] = extend_path(__path__, __name__) +__path__ = extend_path(__path__, __name__) diff --git a/swh/graph/backend.py b/swh/graph/backend.py index 22d4036..5fb82f5 100644 --- a/swh/graph/backend.py +++ b/swh/graph/backend.py @@ -1,206 +1,176 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import contextlib import io import os -import struct +import re import subprocess import sys import tempfile from py4j.java_gateway import JavaGateway +from py4j.protocol import Py4JJavaError from swh.graph.config import check_config -from swh.graph.swhid import NodeToSwhidMap, SwhidToNodeMap -from swh.model.identifiers import EXTENDED_SWHID_TYPES -BUF_SIZE = 64 * 1024 -BIN_FMT = ">q" # 64 bit integer, big endian -PATH_SEPARATOR_ID = -1 -NODE2SWHID_EXT = "node2swhid.bin" -SWHID2NODE_EXT = "swhid2node.bin" +BUF_LINES = 1024 def _get_pipe_stderr(): # Get stderr if possible, or pipe to stdout if running with Jupyter. try: sys.stderr.fileno() except io.UnsupportedOperation: return subprocess.STDOUT else: return sys.stderr class Backend: def __init__(self, graph_path, config=None): self.gateway = None self.entry = None self.graph_path = graph_path self.config = check_config(config or {}) def start_gateway(self): self.gateway = JavaGateway.launch_gateway( java_path=None, javaopts=self.config["java_tool_options"].split(), classpath=self.config["classpath"], die_on_exit=True, redirect_stdout=sys.stdout, redirect_stderr=_get_pipe_stderr(), ) self.entry = self.gateway.jvm.org.softwareheritage.graph.Entry() self.entry.load_graph(self.graph_path) - self.node2swhid = NodeToSwhidMap(self.graph_path + "." + NODE2SWHID_EXT) - self.swhid2node = SwhidToNodeMap(self.graph_path + "." + SWHID2NODE_EXT) self.stream_proxy = JavaStreamProxy(self.entry) def stop_gateway(self): self.gateway.shutdown() def __enter__(self): self.start_gateway() return self def __exit__(self, exc_type, exc_value, tb): self.stop_gateway() def stats(self): return self.entry.stats() - def count(self, ttype, direction, edges_fmt, src): + def check_swhid(self, swhid): + try: + self.entry.check_swhid(swhid) + except Py4JJavaError as e: + m = re.search(r"malformed SWHID: (\w+)", str(e)) + if m: + raise ValueError(f"malformed SWHID: {m[1]}") + m = re.search(r"Unknown SWHID: (\w+)", str(e)) + if m: + raise NameError(f"Unknown SWHID: {m[1]}") + raise + + def count(self, ttype, *args): method = getattr(self.entry, "count_" + ttype) - return method(direction, edges_fmt, src) + return method(*args) - async def simple_traversal( - self, ttype, direction, edges_fmt, src, max_edges, return_types - ): - assert ttype in ("leaves", "neighbors", "visit_nodes") + async def traversal(self, ttype, *args): method = getattr(self.stream_proxy, ttype) - async for node_id in method(direction, edges_fmt, src, max_edges, return_types): - yield node_id - - async def walk(self, direction, edges_fmt, algo, src, dst): - if dst in EXTENDED_SWHID_TYPES: - it = self.stream_proxy.walk_type(direction, edges_fmt, algo, src, dst) - else: - it = self.stream_proxy.walk(direction, edges_fmt, algo, src, dst) - async for node_id in it: - yield node_id - - async def random_walk(self, direction, edges_fmt, retries, src, dst, return_types): - if dst in EXTENDED_SWHID_TYPES: - it = self.stream_proxy.random_walk_type( - direction, edges_fmt, retries, src, dst, return_types - ) - else: - it = self.stream_proxy.random_walk( - direction, edges_fmt, retries, src, dst, return_types - ) - async for node_id in it: # TODO return 404 if path is empty - yield node_id - - async def visit_edges(self, direction, edges_fmt, src, max_edges): - it = self.stream_proxy.visit_edges(direction, edges_fmt, src, max_edges) - # convert stream a, b, c, d -> (a, b), (c, d) - prevNode = None - async for node in it: - if prevNode is not None: - yield (prevNode, node) - prevNode = None - else: - prevNode = node - - async def visit_paths(self, direction, edges_fmt, src, max_edges): - path = [] - async for node in self.stream_proxy.visit_paths( - direction, edges_fmt, src, max_edges - ): - if node == PATH_SEPARATOR_ID: - yield path - path = [] - else: - path.append(node) + async for line in method(*args): + yield line.decode().rstrip("\n") class JavaStreamProxy: """A proxy class for the org.softwareheritage.graph.Entry Java class that takes care of the setup and teardown of the named-pipe FIFO communication between Python and Java. Initialize JavaStreamProxy using: proxy = JavaStreamProxy(swh_entry_class_instance) Then you can call an Entry method and iterate on the FIFO results like this: async for value in proxy.java_method(arg1, arg2): print(value) """ def __init__(self, entry): self.entry = entry async def read_node_ids(self, fname): loop = asyncio.get_event_loop() open_thread = loop.run_in_executor(None, open, fname, "rb") # Since the open() call on the FIFO is blocking until it is also opened # on the Java side, we await it with a timeout in case there is an # exception that prevents the write-side open(). with (await asyncio.wait_for(open_thread, timeout=2)) as f: + + def read_n_lines(f, n): + buf = [] + for _ in range(n): + try: + buf.append(next(f)) + except StopIteration: + break + return buf + while True: - data = await loop.run_in_executor(None, f.read, BUF_SIZE) - if not data: + lines = await loop.run_in_executor(None, read_n_lines, f, BUF_LINES) + if not lines: break - for data in struct.iter_unpack(BIN_FMT, data): - yield data[0] + for line in lines: + yield line class _HandlerWrapper: def __init__(self, handler): self._handler = handler def __getattr__(self, name): func = getattr(self._handler, name) async def java_call(*args, **kwargs): loop = asyncio.get_event_loop() await loop.run_in_executor(None, lambda: func(*args, **kwargs)) def java_task(*args, **kwargs): return asyncio.create_task(java_call(*args, **kwargs)) return java_task @contextlib.contextmanager def get_handler(self): with tempfile.TemporaryDirectory(prefix="swh-graph-") as tmpdirname: cli_fifo = os.path.join(tmpdirname, "swh-graph.fifo") os.mkfifo(cli_fifo) reader = self.read_node_ids(cli_fifo) query_handler = self.entry.get_handler(cli_fifo) handler = self._HandlerWrapper(query_handler) yield (handler, reader) def __getattr__(self, name): async def java_call_iterator(*args, **kwargs): with self.get_handler() as (handler, reader): java_task = getattr(handler, name)(*args, **kwargs) try: async for value in reader: yield value except asyncio.TimeoutError: # If the read-side open() timeouts, an exception on the # Java side probably happened that prevented the # write-side open(). We propagate this exception here if # that is the case. task_exc = java_task.exception() if task_exc: raise task_exc raise await java_task return java_call_iterator diff --git a/swh/graph/cli.py b/swh/graph/cli.py index 3b224b3..7d399ac 100644 --- a/swh/graph/cli.py +++ b/swh/graph/cli.py @@ -1,446 +1,447 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from pathlib import Path import sys from typing import TYPE_CHECKING, Any, Dict, Set, Tuple # WARNING: do not import unnecessary things here to keep cli startup time under # control import click from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup from swh.core.cli import swh as swh_cli_group if TYPE_CHECKING: from swh.graph.webgraph import CompressionStep # noqa class StepOption(click.ParamType): """click type for specifying a compression step on the CLI parse either individual steps, specified as step names or integers, or step ranges """ name = "compression step" def convert(self, value, param, ctx): # type: (...) -> Set[CompressionStep] from swh.graph.webgraph import COMP_SEQ, CompressionStep # noqa steps: Set[CompressionStep] = set() specs = value.split(",") for spec in specs: if "-" in spec: # step range (raw_l, raw_r) = spec.split("-", maxsplit=1) if raw_l == "": # no left endpoint raw_l = COMP_SEQ[0].name if raw_r == "": # no right endpoint raw_r = COMP_SEQ[-1].name l_step = self.convert(raw_l, param, ctx) r_step = self.convert(raw_r, param, ctx) if len(l_step) != 1 or len(r_step) != 1: self.fail(f"invalid step specification: {value}, " f"see --help") l_idx = l_step.pop() r_idx = r_step.pop() steps = steps.union( set(CompressionStep(i) for i in range(l_idx.value, r_idx.value + 1)) ) else: # singleton step try: steps.add(CompressionStep(int(spec))) # integer step except ValueError: try: steps.add(CompressionStep[spec.upper()]) # step name except KeyError: self.fail( f"invalid step specification: {value}, " f"see --help" ) return steps class PathlibPath(click.Path): """A Click path argument that returns a pathlib Path, not a string""" def convert(self, value, param, ctx): return Path(super().convert(value, param, ctx)) DEFAULT_CONFIG: Dict[str, Tuple[str, Any]] = {"graph": ("dict", {})} @swh_cli_group.group(name="graph", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="YAML configuration file", ) @click.pass_context def graph_cli_group(ctx, config_file): """Software Heritage graph tools.""" from swh.core import config ctx.ensure_object(dict) conf = config.read(config_file, DEFAULT_CONFIG) if "graph" not in conf: raise ValueError( 'no "graph" stanza found in configuration file %s' % config_file ) ctx.obj["config"] = conf @graph_cli_group.command("api-client") @click.option("--host", default="localhost", help="Graph server host") @click.option("--port", default="5009", help="Graph server port") @click.pass_context def api_client(ctx, host, port): """client for the graph RPC service""" from swh.graph import client url = "http://{}:{}".format(host, port) app = client.RemoteGraphClient(url) # TODO: run web app print(app.stats()) @graph_cli_group.group("map") @click.pass_context def map(ctx): """Manage swh-graph on-disk maps""" pass def dump_swhid2node(filename): from swh.graph.swhid import SwhidToNodeMap for (swhid, int) in SwhidToNodeMap(filename): print("{}\t{}".format(swhid, int)) def dump_node2swhid(filename): from swh.graph.swhid import NodeToSwhidMap for (int, swhid) in NodeToSwhidMap(filename): print("{}\t{}".format(int, swhid)) def restore_swhid2node(filename): """read a textual SWHID->int map from stdin and write its binary version to filename """ from swh.graph.swhid import SwhidToNodeMap with open(filename, "wb") as dst: for line in sys.stdin: (str_swhid, str_int) = line.split() SwhidToNodeMap.write_record(dst, str_swhid, int(str_int)) def restore_node2swhid(filename, length): """read a textual int->SWHID map from stdin and write its binary version to filename """ from swh.graph.swhid import NodeToSwhidMap node2swhid = NodeToSwhidMap(filename, mode="wb", length=length) for line in sys.stdin: (str_int, str_swhid) = line.split() node2swhid[int(str_int)] = str_swhid node2swhid.close() @map.command("dump") @click.option( "--type", "-t", "map_type", required=True, type=click.Choice(["swhid2node", "node2swhid"]), help="type of map to dump", ) @click.argument("filename", required=True, type=click.Path(exists=True)) @click.pass_context def dump_map(ctx, map_type, filename): """Dump a binary SWHID<->node map to textual format.""" if map_type == "swhid2node": dump_swhid2node(filename) elif map_type == "node2swhid": dump_node2swhid(filename) else: raise ValueError("invalid map type: " + map_type) pass @map.command("restore") @click.option( "--type", "-t", "map_type", required=True, type=click.Choice(["swhid2node", "node2swhid"]), help="type of map to dump", ) @click.option( "--length", "-l", type=int, help="""map size in number of logical records (required for node2swhid maps)""", ) @click.argument("filename", required=True, type=click.Path()) @click.pass_context def restore_map(ctx, map_type, length, filename): """Restore a binary SWHID<->node map from textual format.""" if map_type == "swhid2node": restore_swhid2node(filename) elif map_type == "node2swhid": if length is None: raise click.UsageError( "map length is required when restoring {} maps".format(map_type), ctx ) restore_node2swhid(filename, length) else: raise ValueError("invalid map type: " + map_type) @map.command("write") @click.option( "--type", "-t", "map_type", required=True, type=click.Choice(["swhid2node", "node2swhid"]), help="type of map to write", ) @click.argument("filename", required=True, type=click.Path()) @click.pass_context def write(ctx, map_type, filename): """Write a map to disk sequentially. read from stdin a textual SWHID->node mapping (for swhid2node, or a simple sequence of SWHIDs for node2swhid) and write it to disk in the requested binary map format note that no sorting is applied, so the input should already be sorted as required by the chosen map type (by SWHID for swhid2node, by int for node2swhid) """ from swh.graph.swhid import NodeToSwhidMap, SwhidToNodeMap with open(filename, "wb") as f: if map_type == "swhid2node": for line in sys.stdin: (swhid, int_str) = line.rstrip().split(maxsplit=1) SwhidToNodeMap.write_record(f, swhid, int(int_str)) elif map_type == "node2swhid": for line in sys.stdin: swhid = line.rstrip() NodeToSwhidMap.write_record(f, swhid) else: raise ValueError("invalid map type: " + map_type) @map.command("lookup") @click.option( "--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename" ) @click.argument("identifiers", nargs=-1) def map_lookup(graph, identifiers): """Lookup identifiers using on-disk maps. Depending on the identifier type lookup either a SWHID into a SWHID->node (and return the node integer identifier) or, vice-versa, lookup a node integer identifier into a node->SWHID (and return the SWHID). The desired behavior is chosen depending on the syntax of each given identifier. Identifiers can be passed either directly on the command line or on standard input, separate by blanks. Logical lines (as returned by readline()) in stdin will be preserved in stdout. """ from swh.graph.backend import NODE2SWHID_EXT, SWHID2NODE_EXT from swh.graph.swhid import NodeToSwhidMap, SwhidToNodeMap import swh.model.exceptions - from swh.model.identifiers import ExtendedSWHID + from swh.model.swhids import ExtendedSWHID success = True # no identifiers failed to be looked up swhid2node = SwhidToNodeMap(f"{graph}.{SWHID2NODE_EXT}") node2swhid = NodeToSwhidMap(f"{graph}.{NODE2SWHID_EXT}") def lookup(identifier): nonlocal success, swhid2node, node2swhid is_swhid = None try: int(identifier) is_swhid = False except ValueError: try: ExtendedSWHID.from_string(identifier) is_swhid = True except swh.model.exceptions.ValidationError: success = False logging.error(f'invalid identifier: "{identifier}", skipping') try: if is_swhid: return str(swhid2node[identifier]) else: return node2swhid[int(identifier)] except KeyError: success = False logging.error(f'identifier not found: "{identifier}", skipping') if identifiers: # lookup identifiers passed via CLI for identifier in identifiers: print(lookup(identifier)) else: # lookup identifiers passed via stdin, preserving logical lines for line in sys.stdin: results = [lookup(id) for id in line.rstrip().split()] if results: # might be empty if all IDs on the same line failed print(" ".join(results)) sys.exit(0 if success else 1) @graph_cli_group.command(name="rpc-serve") @click.option( "--host", "-h", default="0.0.0.0", metavar="IP", show_default=True, help="host IP address to bind the server on", ) @click.option( "--port", "-p", default=5009, type=click.INT, metavar="PORT", show_default=True, help="port to bind the server on", ) @click.option( "--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename" ) @click.pass_context def serve(ctx, host, port, graph): """run the graph RPC service""" import aiohttp from swh.graph.server.app import make_app config = ctx.obj["config"] config.setdefault("graph", {}) config["graph"]["path"] = graph app = make_app(config=config) aiohttp.web.run_app(app, host=host, port=port) @graph_cli_group.command() @click.option( "--graph", "-g", required=True, metavar="GRAPH", type=PathlibPath(), help="input graph basename", ) @click.option( "--outdir", "-o", "out_dir", required=True, metavar="DIR", type=PathlibPath(), help="directory where to store compressed graph", ) @click.option( "--steps", "-s", metavar="STEPS", type=StepOption(), help="run only these compression steps (default: all steps)", ) @click.pass_context def compress(ctx, graph, out_dir, steps): """Compress a graph using WebGraph Input: a pair of files g.nodes.csv.gz, g.edges.csv.gz Output: a directory containing a WebGraph compressed graph - Compression steps are: (1) mph, (2) bv, (3) bv_obl, (4) bfs, (5) permute, - (6) permute_obl, (7) stats, (8) transpose, (9) transpose_obl, (10) maps, - (11) clean_tmp. Compression steps can be selected by name or number using + Compression steps are: (1) mph, (2) bv, (3) bfs, (4) permute_bfs, + (5) transpose_bfs, (6) simplify, (7) llp, (8) permute_llp, (9) obl, (10) + compose_orders, (11) stats, (12) transpose, (13) transpose_obl, (14) maps, + (15) clean_tmp. Compression steps can be selected by name or number using --steps, separating them with commas; step ranges (e.g., 3-9, 6-, etc.) are also supported. """ from swh.graph import webgraph graph_name = graph.name in_dir = graph.parent try: conf = ctx.obj["config"]["graph"]["compress"] except KeyError: conf = {} # use defaults webgraph.compress(graph_name, in_dir, out_dir, steps, conf) @graph_cli_group.command(name="cachemount") @click.option( "--graph", "-g", required=True, metavar="GRAPH", help="compressed graph basename" ) @click.option( "--cache", "-c", default="/dev/shm/swh-graph/default", metavar="CACHE", type=PathlibPath(), help="Memory cache path (defaults to /dev/shm/swh-graph/default)", ) @click.pass_context def cachemount(ctx, graph, cache): """ Cache the mmapped files of the compressed graph in a tmpfs. This command creates a new directory at the path given by CACHE that has the same structure as the compressed graph basename, except it copies the files that require mmap access (:file:`{*}.graph`) but uses symlinks from the source for all the other files (:file:`{*}.map`, :file:`{*}.bin`, ...). The command outputs the path to the memory cache directory (particularly useful when relying on the default value). """ import shutil cache.mkdir(parents=True) for src in Path(graph).parent.glob("*"): dst = cache / src.name if src.suffix == ".graph": shutil.copy2(src, dst) else: dst.symlink_to(src.resolve()) print(cache) def main(): return graph_cli_group(auto_envvar_prefix="SWH_GRAPH") if __name__ == "__main__": main() diff --git a/swh/graph/config.py b/swh/graph/config.py index 0d52b3f..f144f26 100644 --- a/swh/graph/config.py +++ b/swh/graph/config.py @@ -1,113 +1,115 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from pathlib import Path import sys import psutil def find_graph_jar(): """find swh-graph.jar, containing the Java part of swh-graph look both in development directories and installed data (for in-production deployments who fecthed the JAR from pypi) """ swh_graph_root = Path(__file__).parents[2] try_paths = [ swh_graph_root / "java/target/", Path(sys.prefix) / "share/swh-graph/", Path(sys.prefix) / "local/share/swh-graph/", ] for path in try_paths: glob = list(path.glob("swh-graph-*.jar")) if glob: if len(glob) > 1: logging.warn( "found multiple swh-graph JARs, " "arbitrarily picking one" ) logging.info("using swh-graph JAR: {0}".format(glob[0])) return str(glob[0]) raise RuntimeError("swh-graph JAR not found. Have you run `make java`?") def check_config(conf): """check configuration and propagate defaults""" conf = conf.copy() if "batch_size" not in conf: # Use 0.1% of the RAM as a batch size: # ~1 billion for big servers, ~10 million for small desktop machines conf["batch_size"] = int(psutil.virtual_memory().total / 1000) + if "llp_gammas" not in conf: + conf["llp_gammas"] = "-0,-1,-2,-3,-4" if "max_ram" not in conf: conf["max_ram"] = str(psutil.virtual_memory().total) if "java_tool_options" not in conf: conf["java_tool_options"] = " ".join( [ "-Xmx{max_ram}", "-XX:PretenureSizeThreshold=512M", "-XX:MaxNewSize=4G", "-XX:+UseLargePages", "-XX:+UseTransparentHugePages", "-XX:+UseNUMA", "-XX:+UseTLAB", "-XX:+ResizeTLAB", ] ) conf["java_tool_options"] = conf["java_tool_options"].format( max_ram=conf["max_ram"] ) if "java" not in conf: conf["java"] = "java" if "classpath" not in conf: conf["classpath"] = find_graph_jar() return conf def check_config_compress(config, graph_name, in_dir, out_dir): """check compression-specific configuration and initialize its execution environment. """ conf = check_config(config) conf["graph_name"] = graph_name conf["in_dir"] = str(in_dir) conf["out_dir"] = str(out_dir) out_dir.mkdir(parents=True, exist_ok=True) if "tmp_dir" not in conf: tmp_dir = out_dir / "tmp" conf["tmp_dir"] = str(tmp_dir) else: tmp_dir = Path(conf["tmp_dir"]) tmp_dir.mkdir(parents=True, exist_ok=True) if "logback" not in conf: logback_confpath = tmp_dir / "logback.xml" with open(logback_confpath, "w") as conffile: conffile.write( """ %d %r %p [%t] %logger{1} - %m%n """ ) conf["logback"] = str(logback_confpath) conf["java_tool_options"] += " -Dlogback.configurationFile={logback}" conf["java_tool_options"] += " -Djava.io.tmpdir={tmp_dir}" conf["java_tool_options"] = conf["java_tool_options"].format( logback=conf["logback"], tmp_dir=conf["tmp_dir"], ) return conf diff --git a/swh/graph/graph.py b/swh/graph/graph.py deleted file mode 100644 index 3fd853b..0000000 --- a/swh/graph/graph.py +++ /dev/null @@ -1,193 +0,0 @@ -# Copyright (C) 2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import asyncio -import contextlib -import functools - -from swh.graph.backend import Backend -from swh.graph.dot import KIND_TO_SHAPE, dot_to_svg, graph_dot - -BASE_URL = "https://archive.softwareheritage.org/browse" -KIND_TO_URL_FRAGMENT = { - "ori": "/origin/{}", - "snp": "/snapshot/{}", - "rel": "/release/{}", - "rev": "/revision/{}", - "dir": "/directory/{}", - "cnt": "/content/sha1_git:{}/", -} - - -def call_async_gen(generator, *args, **kwargs): - loop = asyncio.get_event_loop() - it = generator(*args, **kwargs).__aiter__() - while True: - try: - res = loop.run_until_complete(it.__anext__()) - yield res - except StopAsyncIteration: - break - - -class Neighbors: - """Neighbor iterator with custom O(1) length method""" - - def __init__(self, graph, iterator, length_func): - self.graph = graph - self.iterator = iterator - self.length_func = length_func - - def __iter__(self): - return self - - def __next__(self): - succ = self.iterator.nextLong() - if succ == -1: - raise StopIteration - return GraphNode(self.graph, succ) - - def __len__(self): - return self.length_func() - - -class GraphNode: - """Node in the SWH graph""" - - def __init__(self, graph, node_id): - self.graph = graph - self.id = node_id - - def children(self): - return Neighbors( - self.graph, - self.graph.java_graph.successors(self.id), - lambda: self.graph.java_graph.outdegree(self.id), - ) - - def parents(self): - return Neighbors( - self.graph, - self.graph.java_graph.predecessors(self.id), - lambda: self.graph.java_graph.indegree(self.id), - ) - - def simple_traversal( - self, ttype, direction="forward", edges="*", max_edges=0, return_types="*" - ): - for node in call_async_gen( - self.graph.backend.simple_traversal, - ttype, - direction, - edges, - self.id, - max_edges, - return_types, - ): - yield self.graph[node] - - def leaves(self, *args, **kwargs): - yield from self.simple_traversal("leaves", *args, **kwargs) - - def visit_nodes(self, *args, **kwargs): - yield from self.simple_traversal("visit_nodes", *args, **kwargs) - - def visit_edges(self, direction="forward", edges="*", max_edges=0): - for src, dst in call_async_gen( - self.graph.backend.visit_edges, direction, edges, self.id, max_edges - ): - yield (self.graph[src], self.graph[dst]) - - def visit_paths(self, direction="forward", edges="*", max_edges=0): - for path in call_async_gen( - self.graph.backend.visit_paths, direction, edges, self.id, max_edges - ): - yield [self.graph[node] for node in path] - - def walk(self, dst, direction="forward", edges="*", traversal="dfs"): - for node in call_async_gen( - self.graph.backend.walk, direction, edges, traversal, self.id, dst - ): - yield self.graph[node] - - def _count(self, ttype, direction="forward", edges="*"): - return self.graph.backend.count(ttype, direction, edges, self.id) - - count_leaves = functools.partialmethod(_count, ttype="leaves") - count_neighbors = functools.partialmethod(_count, ttype="neighbors") - count_visit_nodes = functools.partialmethod(_count, ttype="visit_nodes") - - @property - def swhid(self): - return self.graph.node2swhid[self.id] - - @property - def kind(self): - return self.swhid.split(":")[2] - - def __str__(self): - return self.swhid - - def __repr__(self): - return "<{}>".format(self.swhid) - - def dot_fragment(self): - swh, version, kind, hash = self.swhid.split(":") - label = "{}:{}..{}".format(kind, hash[0:2], hash[-2:]) - url = BASE_URL + KIND_TO_URL_FRAGMENT[kind].format(hash) - shape = KIND_TO_SHAPE[kind] - return '{} [label="{}", href="{}", target="_blank", shape="{}"];'.format( - self.id, label, url, shape - ) - - def _repr_svg_(self): - nodes = [self, *list(self.children()), *list(self.parents())] - dot = graph_dot(nodes) - svg = dot_to_svg(dot) - return svg - - -class Graph: - def __init__(self, backend, node2swhid, swhid2node): - self.backend = backend - self.java_graph = backend.entry.get_graph() - self.node2swhid = node2swhid - self.swhid2node = swhid2node - - def stats(self): - return self.backend.stats() - - @property - def path(self): - return self.java_graph.getPath() - - def __len__(self): - return self.java_graph.numNodes() - - def __getitem__(self, node_id): - if isinstance(node_id, int): - self.node2swhid[node_id] # check existence - return GraphNode(self, node_id) - elif isinstance(node_id, str): - node_id = self.swhid2node[node_id] - return GraphNode(self, node_id) - - def __iter__(self): - for swhid, pos in self.backend.swhid2node: - yield self[swhid] - - def iter_prefix(self, prefix): - for swhid, pos in self.backend.swhid2node.iter_prefix(prefix): - yield self[swhid] - - def iter_type(self, swhid_type): - for swhid, pos in self.backend.swhid2node.iter_type(swhid_type): - yield self[swhid] - - -@contextlib.contextmanager -def load(graph_path): - with Backend(graph_path) as backend: - yield Graph(backend, backend.node2swhid, backend.swhid2node) diff --git a/swh/graph/naive_client.py b/swh/graph/naive_client.py index 8191311..9e08d65 100644 --- a/swh/graph/naive_client.py +++ b/swh/graph/naive_client.py @@ -1,369 +1,369 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import inspect import re import statistics from typing import ( Callable, Dict, Iterable, Iterator, List, Optional, Set, Tuple, TypeVar, ) -from swh.model.identifiers import ExtendedSWHID, ValidationError +from swh.model.swhids import ExtendedSWHID, ValidationError from .client import GraphArgumentException _NODE_TYPES = "ori|snp|rel|rev|dir|cnt" NODES_RE = re.compile(fr"(\*|{_NODE_TYPES})") EDGES_RE = re.compile(fr"(\*|{_NODE_TYPES}):(\*|{_NODE_TYPES})") T = TypeVar("T", bound=Callable) def check_arguments(f: T) -> T: """Decorator for generic argument checking for methods of NaiveClient. Checks ``src`` is a valid and known SWHID, and ``edges`` has the right format.""" signature = inspect.signature(f) @functools.wraps(f) def newf(*args, **kwargs): __tracebackhide__ = True # for pytest try: bound_args = signature.bind(*args, **kwargs) except TypeError as e: # rethrow the exception from here so pytest doesn't flood the terminal # with signature.bind's call stack. raise TypeError(*e.args) from None self = bound_args.arguments["self"] src = bound_args.arguments.get("src") if src: self._check_swhid(src) edges = bound_args.arguments.get("edges") if edges: if edges != "*" and not EDGES_RE.match(edges): raise GraphArgumentException(f"invalid edge restriction: {edges}") return_types = bound_args.arguments.get("return_types") if return_types: if not NODES_RE.match(return_types): raise GraphArgumentException( f"invalid return_types restriction: {return_types}" ) return f(*args, **kwargs) return newf # type: ignore def filter_node_types(node_types: str, nodes: Iterable[str]) -> Iterator[str]: if node_types == "*": yield from nodes else: prefixes = tuple(f"swh:1:{type_}:" for type_ in node_types.split(",")) for node in nodes: if node.startswith(prefixes): yield node class NaiveClient: """An alternative implementation of :class:`swh.graph.backend.Backend`, written in pure-python and meant for simulating it in other components' test cases. It is NOT meant to be efficient in any way; only to be a very simple implementation that provides the same behavior.""" def __init__(self, *, nodes: List[str], edges: List[Tuple[str, str]]): self.graph = Graph(nodes, edges) def _check_swhid(self, swhid): try: ExtendedSWHID.from_string(swhid) except ValidationError as e: raise GraphArgumentException(*e.args) from None if swhid not in self.graph.nodes: raise GraphArgumentException(f"SWHID not found: {swhid}") def stats(self) -> Dict: return { "counts": { "nodes": len(self.graph.nodes), "edges": sum(map(len, self.graph.forward_edges.values())), }, "ratios": { "compression": 1.0, "bits_per_edge": 100.0, "bits_per_node": 100.0, "avg_locality": 0.0, }, "indegree": { "min": min(map(len, self.graph.backward_edges.values())), "max": max(map(len, self.graph.backward_edges.values())), "avg": statistics.mean(map(len, self.graph.backward_edges.values())), }, "outdegree": { "min": min(map(len, self.graph.forward_edges.values())), "max": max(map(len, self.graph.forward_edges.values())), "avg": statistics.mean(map(len, self.graph.forward_edges.values())), }, } @check_arguments def leaves( self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0, return_types: str = "*", ) -> Iterator[str]: # TODO: max_edges yield from filter_node_types( return_types, [ node for node in self.graph.get_subgraph(src, edges, direction) if not self.graph.get_filtered_neighbors(node, edges, direction) ], ) @check_arguments def neighbors( self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0, return_types: str = "*", ) -> Iterator[str]: # TODO: max_edges yield from filter_node_types( return_types, self.graph.get_filtered_neighbors(src, edges, direction) ) @check_arguments def visit_nodes( self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0, return_types: str = "*", ) -> Iterator[str]: # TODO: max_edges yield from filter_node_types( return_types, self.graph.get_subgraph(src, edges, direction) ) @check_arguments def visit_edges( self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0 ) -> Iterator[Tuple[str, str]]: if max_edges == 0: max_edges = None # type: ignore else: max_edges -= 1 yield from list(self.graph.iter_edges_dfs(direction, edges, src))[:max_edges] @check_arguments def visit_paths( self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0 ) -> Iterator[List[str]]: # TODO: max_edges for path in self.graph.iter_paths_dfs(direction, edges, src): if path[-1] in self.leaves(src, edges, direction): yield list(path) @check_arguments def walk( self, src: str, dst: str, edges: str = "*", traversal: str = "dfs", direction: str = "forward", limit: Optional[int] = None, ) -> Iterator[str]: # TODO: implement algo="bfs" # TODO: limit match_path: Callable[[str], bool] if ":" in dst: match_path = dst.__eq__ self._check_swhid(dst) else: match_path = lambda node: node.startswith(f"swh:1:{dst}:") # noqa for path in self.graph.iter_paths_dfs(direction, edges, src): if match_path(path[-1]): if not limit: # 0 or None yield from path elif limit > 0: yield from path[0:limit] else: yield from path[limit:] @check_arguments def random_walk( self, src: str, dst: str, edges: str = "*", direction: str = "forward", limit: Optional[int] = None, ): # TODO: limit yield from self.walk(src, dst, edges, "dfs", direction, limit) @check_arguments def count_leaves( self, src: str, edges: str = "*", direction: str = "forward" ) -> int: return len(list(self.leaves(src, edges, direction))) @check_arguments def count_neighbors( self, src: str, edges: str = "*", direction: str = "forward" ) -> int: return len(self.graph.get_filtered_neighbors(src, edges, direction)) @check_arguments def count_visit_nodes( self, src: str, edges: str = "*", direction: str = "forward" ) -> int: return len(self.graph.get_subgraph(src, edges, direction)) class Graph: def __init__(self, nodes: List[str], edges: List[Tuple[str, str]]): self.nodes = nodes self.forward_edges: Dict[str, List[str]] = {} self.backward_edges: Dict[str, List[str]] = {} for node in nodes: self.forward_edges[node] = [] self.backward_edges[node] = [] for (src, dst) in edges: self.forward_edges[src].append(dst) self.backward_edges[dst].append(src) def get_filtered_neighbors( self, src: str, edges_fmt: str, direction: str, ) -> Set[str]: if direction == "forward": edges = self.forward_edges elif direction == "backward": edges = self.backward_edges else: raise GraphArgumentException(f"invalid direction: {direction}") neighbors = edges.get(src, []) if edges_fmt == "*": return set(neighbors) else: filtered_neighbors: Set[str] = set() for edges_fmt_item in edges_fmt.split(","): (src_fmt, dst_fmt) = edges_fmt_item.split(":") if src_fmt != "*" and not src.startswith(f"swh:1:{src_fmt}:"): continue if dst_fmt == "*": filtered_neighbors.update(neighbors) else: prefix = f"swh:1:{dst_fmt}:" filtered_neighbors.update( n for n in neighbors if n.startswith(prefix) ) return filtered_neighbors def get_subgraph(self, src: str, edges_fmt: str, direction: str) -> Set[str]: seen = set() to_visit = {src} while to_visit: node = to_visit.pop() seen.add(node) neighbors = set(self.get_filtered_neighbors(node, edges_fmt, direction)) new_nodes = neighbors - seen to_visit.update(new_nodes) return seen def iter_paths_dfs( self, direction: str, edges_fmt: str, src: str ) -> Iterator[Tuple[str, ...]]: for (path, node) in DfsSubgraphIterator(self, direction, edges_fmt, src): yield path + (node,) def iter_edges_dfs( self, direction: str, edges_fmt: str, src: str ) -> Iterator[Tuple[str, str]]: for (path, node) in DfsSubgraphIterator(self, direction, edges_fmt, src): if len(path) > 0: yield (path[-1], node) class SubgraphIterator(Iterator[Tuple[Tuple[str, ...], str]]): def __init__(self, graph: Graph, direction: str, edges_fmt: str, src: str): self.graph = graph self.direction = direction self.edges_fmt = edges_fmt self.seen: Set[str] = set() self.src = src def more_work(self) -> bool: raise NotImplementedError() def pop(self) -> Tuple[Tuple[str, ...], str]: raise NotImplementedError() def push(self, new_path: Tuple[str, ...], neighbor: str) -> None: raise NotImplementedError() def __next__(self) -> Tuple[Tuple[str, ...], str]: # Stores (path, next_node) if not self.more_work(): raise StopIteration() (path, node) = self.pop() new_path = path + (node,) if node not in self.seen: neighbors = self.graph.get_filtered_neighbors( node, self.edges_fmt, self.direction ) # We want to visit the first neighbor first, and to_visit is a stack; # so we need to reversed() the list of neighbors to get it on top # of the stack. for neighbor in reversed(list(neighbors)): self.push(new_path, neighbor) self.seen.add(node) return (path, node) class DfsSubgraphIterator(SubgraphIterator): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.to_visit: List[Tuple[Tuple[str, ...], str]] = [((), self.src)] def more_work(self) -> bool: return bool(self.to_visit) def pop(self) -> Tuple[Tuple[str, ...], str]: return self.to_visit.pop() def push(self, new_path: Tuple[str, ...], neighbor: str) -> None: self.to_visit.append((new_path, neighbor)) diff --git a/swh/graph/server/app.py b/swh/graph/server/app.py index 0816a4b..0128f2a 100644 --- a/swh/graph/server/app.py +++ b/swh/graph/server/app.py @@ -1,402 +1,373 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ A proxy HTTP server for swh-graph, talking to the Java code via py4j, and using FIFO as a transport to stream integers between the two languages. """ import asyncio from collections import deque -import json import os from typing import Optional import aiohttp.web from swh.core.api.asynchronous import RPCServerApp from swh.core.config import read as config_read from swh.graph.backend import Backend -from swh.model.exceptions import ValidationError -from swh.model.identifiers import EXTENDED_SWHID_TYPES +from swh.model.swhids import EXTENDED_SWHID_TYPES try: from contextlib import asynccontextmanager except ImportError: # Compatibility with 3.6 backport from async_generator import asynccontextmanager # type: ignore # maximum number of retries for random walks RANDOM_RETRIES = 5 # TODO make this configurable via rpc-serve configuration class GraphServerApp(RPCServerApp): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.on_startup.append(self._start_gateway) self.on_shutdown.append(self._stop_gateway) @staticmethod async def _start_gateway(app): # Equivalent to entering `with app["backend"]:` app["backend"].start_gateway() @staticmethod async def _stop_gateway(app): # Equivalent to exiting `with app["backend"]:` with no error app["backend"].stop_gateway() async def index(request): return aiohttp.web.Response( content_type="text/html", body=""" Software Heritage graph server

You have reached the Software Heritage graph API server.

See its API documentation for more information.

""", ) class GraphView(aiohttp.web.View): """Base class for views working on the graph, with utility functions""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.backend = self.request.app["backend"] - def node_of_swhid(self, swhid): - """Lookup a SWHID in a swhid2node map, failing in an HTTP-nice way if - needed.""" - try: - return self.backend.swhid2node[swhid] - except KeyError: - raise aiohttp.web.HTTPNotFound(text=f"SWHID not found: {swhid}") - except ValidationError: - raise aiohttp.web.HTTPBadRequest(text=f"malformed SWHID: {swhid}") - - def swhid_of_node(self, node): - """Lookup a node in a node2swhid map, failing in an HTTP-nice way if - needed.""" - try: - return self.backend.node2swhid[node] - except KeyError: - raise aiohttp.web.HTTPInternalServerError( - text=f"reverse lookup failed for node id: {node}" - ) - def get_direction(self): """Validate HTTP query parameter `direction`""" s = self.request.query.get("direction", "forward") if s not in ("forward", "backward"): raise aiohttp.web.HTTPBadRequest(text=f"invalid direction: {s}") return s def get_edges(self): """Validate HTTP query parameter `edges`, i.e., edge restrictions""" s = self.request.query.get("edges", "*") if any( [ node_type != "*" and node_type not in EXTENDED_SWHID_TYPES for edge in s.split(":") for node_type in edge.split(",", maxsplit=1) ] ): raise aiohttp.web.HTTPBadRequest(text=f"invalid edge restriction: {s}") return s def get_return_types(self): """Validate HTTP query parameter 'return types', i.e, a set of types which we will filter the query results with""" s = self.request.query.get("return_types", "*") if any( node_type != "*" and node_type not in EXTENDED_SWHID_TYPES for node_type in s.split(",") ): raise aiohttp.web.HTTPBadRequest( text=f"invalid type for filtering res: {s}" ) # if the user puts a star, # then we filter nothing, we don't need the other information if "*" in s: return "*" else: return s def get_traversal(self): """Validate HTTP query parameter `traversal`, i.e., visit order""" s = self.request.query.get("traversal", "dfs") if s not in ("bfs", "dfs"): raise aiohttp.web.HTTPBadRequest(text=f"invalid traversal order: {s}") return s def get_limit(self): """Validate HTTP query parameter `limit`, i.e., number of results""" s = self.request.query.get("limit", "0") try: return int(s) except ValueError: raise aiohttp.web.HTTPBadRequest(text=f"invalid limit value: {s}") def get_max_edges(self): """Validate HTTP query parameter 'max_edges', i.e., the limit of the number of edges that can be visited""" s = self.request.query.get("max_edges", "0") try: return int(s) except ValueError: raise aiohttp.web.HTTPBadRequest(text=f"invalid max_edges value: {s}") + def check_swhid(self, swhid): + """Validate that the given SWHID exists in the graph""" + try: + self.backend.check_swhid(swhid) + except (NameError, ValueError) as e: + raise aiohttp.web.HTTPBadRequest(text=str(e)) + class StreamingGraphView(GraphView): """Base class for views streaming their response line by line.""" content_type = "text/plain" @asynccontextmanager async def response_streamer(self, *args, **kwargs): """Context manager to prepare then close a StreamResponse""" response = aiohttp.web.StreamResponse(*args, **kwargs) response.content_type = self.content_type await response.prepare(self.request) yield response await response.write_eof() async def get(self): await self.prepare_response() async with self.response_streamer() as self.response_stream: self._buf = [] try: await self.stream_response() finally: await self._flush_buffer() return self.response_stream async def prepare_response(self): """This can be overridden with some setup to be run before the response actually starts streaming. """ pass async def stream_response(self): """Override this to perform the response streaming. Implementations of this should await self.stream_line(line) to write each line. """ raise NotImplementedError async def stream_line(self, line): """Write a line in the response stream.""" self._buf.append(line) if len(self._buf) > 100: await self._flush_buffer() async def _flush_buffer(self): await self.response_stream.write("\n".join(self._buf).encode() + b"\n") self._buf = [] class StatsView(GraphView): """View showing some statistics on the graph""" async def get(self): stats = self.backend.stats() return aiohttp.web.Response(body=stats, content_type="application/json") class SimpleTraversalView(StreamingGraphView): """Base class for views of simple traversals""" simple_traversal_type: Optional[str] = None async def prepare_response(self): - src = self.request.match_info["src"] - self.src_node = self.node_of_swhid(src) - + self.src = self.request.match_info["src"] self.edges = self.get_edges() self.direction = self.get_direction() self.max_edges = self.get_max_edges() self.return_types = self.get_return_types() + self.check_swhid(self.src) async def stream_response(self): - async for res_node in self.backend.simple_traversal( + async for res_line in self.backend.traversal( self.simple_traversal_type, self.direction, self.edges, - self.src_node, + self.src, self.max_edges, self.return_types, ): - res_swhid = self.swhid_of_node(res_node) - await self.stream_line(res_swhid) + await self.stream_line(res_line) class LeavesView(SimpleTraversalView): simple_traversal_type = "leaves" class NeighborsView(SimpleTraversalView): simple_traversal_type = "neighbors" class VisitNodesView(SimpleTraversalView): simple_traversal_type = "visit_nodes" +class VisitEdgesView(SimpleTraversalView): + simple_traversal_type = "visit_edges" + + class WalkView(StreamingGraphView): async def prepare_response(self): - src = self.request.match_info["src"] - dst = self.request.match_info["dst"] - self.src_node = self.node_of_swhid(src) - if dst not in EXTENDED_SWHID_TYPES: - self.dst_thing = self.node_of_swhid(dst) - else: - self.dst_thing = dst + self.src = self.request.match_info["src"] + self.dst = self.request.match_info["dst"] self.edges = self.get_edges() self.direction = self.get_direction() self.algo = self.get_traversal() self.limit = self.get_limit() + self.max_edges = self.get_max_edges() self.return_types = self.get_return_types() + self.check_swhid(self.src) + if self.dst not in EXTENDED_SWHID_TYPES: + self.check_swhid(self.dst) + async def get_walk_iterator(self): - return self.backend.walk( - self.direction, self.edges, self.algo, self.src_node, self.dst_thing + return self.backend.traversal( + "walk", + self.direction, + self.edges, + self.algo, + self.src, + self.dst, + self.max_edges, + self.return_types, ) async def stream_response(self): it = self.get_walk_iterator() if self.limit < 0: queue = deque(maxlen=-self.limit) - async for res_node in it: - res_swhid = self.swhid_of_node(res_node) + async for res_swhid in it: queue.append(res_swhid) while queue: await self.stream_line(queue.popleft()) else: count = 0 - async for res_node in it: + async for res_swhid in it: if self.limit == 0 or count < self.limit: - res_swhid = self.swhid_of_node(res_node) await self.stream_line(res_swhid) count += 1 else: break class RandomWalkView(WalkView): def get_walk_iterator(self): - return self.backend.random_walk( + return self.backend.traversal( + "random_walk", self.direction, self.edges, RANDOM_RETRIES, - self.src_node, - self.dst_thing, + self.src, + self.dst, + self.max_edges, self.return_types, ) -class VisitEdgesView(SimpleTraversalView): - async def stream_response(self): - it = self.backend.visit_edges( - self.direction, self.edges, self.src_node, self.max_edges - ) - async for (res_src, res_dst) in it: - res_src_swhid = self.swhid_of_node(res_src) - res_dst_swhid = self.swhid_of_node(res_dst) - await self.stream_line("{} {}".format(res_src_swhid, res_dst_swhid)) - - -class VisitPathsView(SimpleTraversalView): - content_type = "application/x-ndjson" - - async def stream_response(self): - it = self.backend.visit_paths( - self.direction, self.edges, self.src_node, self.max_edges - ) - async for res_path in it: - res_path_swhid = [self.swhid_of_node(n) for n in res_path] - line = json.dumps(res_path_swhid) - await self.stream_line(line) - - class CountView(GraphView): """Base class for counting views.""" count_type: Optional[str] = None async def get(self): - src = self.request.match_info["src"] - self.src_node = self.node_of_swhid(src) + self.src = self.request.match_info["src"] + self.check_swhid(self.src) self.edges = self.get_edges() self.direction = self.get_direction() + self.max_edges = self.get_max_edges() loop = asyncio.get_event_loop() cnt = await loop.run_in_executor( None, self.backend.count, self.count_type, self.direction, self.edges, - self.src_node, + self.src, + self.max_edges, ) return aiohttp.web.Response(body=str(cnt), content_type="application/json") class CountNeighborsView(CountView): count_type = "neighbors" class CountLeavesView(CountView): count_type = "leaves" class CountVisitNodesView(CountView): count_type = "visit_nodes" def make_app(config=None, backend=None, **kwargs): if (config is None) == (backend is None): raise ValueError("make_app() expects exactly one of 'config' or 'backend'") if backend is None: backend = Backend(graph_path=config["graph"]["path"], config=config["graph"]) app = GraphServerApp(**kwargs) app.add_routes( [ aiohttp.web.get("/", index), aiohttp.web.get("/graph", index), aiohttp.web.view("/graph/stats", StatsView), aiohttp.web.view("/graph/leaves/{src}", LeavesView), aiohttp.web.view("/graph/neighbors/{src}", NeighborsView), aiohttp.web.view("/graph/visit/nodes/{src}", VisitNodesView), aiohttp.web.view("/graph/visit/edges/{src}", VisitEdgesView), - aiohttp.web.view("/graph/visit/paths/{src}", VisitPathsView), # temporarily disabled in wait of a proper fix for T1969 # aiohttp.web.view("/graph/walk/{src}/{dst}", WalkView) aiohttp.web.view("/graph/randomwalk/{src}/{dst}", RandomWalkView), aiohttp.web.view("/graph/neighbors/count/{src}", CountNeighborsView), aiohttp.web.view("/graph/leaves/count/{src}", CountLeavesView), aiohttp.web.view("/graph/visit/nodes/count/{src}", CountVisitNodesView), ] ) app["backend"] = backend return app def make_app_from_configfile(): """Load configuration and then build application to run """ config_file = os.environ.get("SWH_CONFIG_FILENAME") config = config_read(config_file) return make_app(config=config) diff --git a/swh/graph/swhid.py b/swh/graph/swhid.py index aadd0d0..90db73f 100644 --- a/swh/graph/swhid.py +++ b/swh/graph/swhid.py @@ -1,419 +1,419 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from collections.abc import MutableMapping from enum import Enum import mmap from mmap import MAP_SHARED, PROT_READ, PROT_WRITE import os import struct from typing import BinaryIO, Iterator, Tuple from swh.model.hashutil import hash_to_hex -from swh.model.identifiers import ExtendedObjectType, ExtendedSWHID +from swh.model.swhids import ExtendedObjectType, ExtendedSWHID SWHID_BIN_FMT = "BB20s" # 2 unsigned chars + 20 bytes INT_BIN_FMT = ">q" # big endian, 8-byte integer SWHID_BIN_SIZE = 22 # in bytes INT_BIN_SIZE = 8 # in bytes class SwhidType(Enum): """types of existing SWHIDs, used to serialize ExtendedSWHID type as a (char) integer Note that the order does matter also for driving the binary search in SWHID-indexed maps. Integer values also matter, for compatibility with the Java layer. """ content = 0 directory = 1 origin = 2 release = 3 revision = 4 snapshot = 5 @classmethod def from_extended_object_type(cls, object_type: ExtendedObjectType) -> SwhidType: return cls[object_type.name.lower()] def to_extended_object_type(self) -> ExtendedObjectType: return ExtendedObjectType[SwhidType(self).name.upper()] def str_to_bytes(swhid_str: str) -> bytes: """Convert a SWHID to a byte sequence The binary format used to represent SWHIDs as 22-byte long byte sequences as follows: - 1 byte for the namespace version represented as a C `unsigned char` - 1 byte for the object type, as the int value of :class:`SwhidType` enums, represented as a C `unsigned char` - 20 bytes for the SHA1 digest as a byte sequence Args: swhid: persistent identifier Returns: bytes: byte sequence representation of swhid """ swhid = ExtendedSWHID.from_string(swhid_str) return struct.pack( SWHID_BIN_FMT, swhid.scheme_version, SwhidType.from_extended_object_type(swhid.object_type).value, swhid.object_id, ) def bytes_to_str(bytes: bytes) -> str: """Inverse function of :func:`str_to_bytes` See :func:`str_to_bytes` for a description of the binary SWHID format. Args: bytes: byte sequence representation of swhid Returns: swhid: persistent identifier """ (version, type, bin_digest) = struct.unpack(SWHID_BIN_FMT, bytes) # The following is equivalent to: # return str(ExtendedSWHID( # object_type=SwhidType(type).to_extended_object_type(), object_id=bin_digest # ) # but more efficient, because ExtendedSWHID.__init__ is extremely slow. object_type = ExtendedObjectType[SwhidType(type).name.upper()] return f"swh:1:{object_type.value}:{hash_to_hex(bin_digest)}" class _OnDiskMap: """mmap-ed on-disk sequence of fixed size records""" def __init__( self, record_size: int, fname: str, mode: str = "rb", length: int = None ): """open an existing on-disk map Args: record_size: size of each record in bytes fname: path to the on-disk map mode: file open mode, usually either 'rb' for read-only maps, 'wb' for creating new maps, or 'rb+' for updating existing ones (default: 'rb') length: map size in number of logical records; used to initialize writable maps at creation time. Must be given when mode is 'wb' and the map doesn't exist on disk; ignored otherwise """ os_modes = {"rb": os.O_RDONLY, "wb": os.O_RDWR | os.O_CREAT, "rb+": os.O_RDWR} if mode not in os_modes: raise ValueError("invalid file open mode: " + mode) new_map = mode == "wb" writable_map = mode in ["wb", "rb+"] self.record_size = record_size self.fd = os.open(fname, os_modes[mode]) if new_map: if length is None: raise ValueError("missing length when creating new map") os.truncate(self.fd, length * self.record_size) self.size = os.path.getsize(fname) (self.length, remainder) = divmod(self.size, record_size) if remainder: raise ValueError( "map size {} is not a multiple of the record size {}".format( self.size, record_size ) ) self.mm = mmap.mmap( self.fd, self.size, prot=(PROT_READ | PROT_WRITE if writable_map else PROT_READ), flags=MAP_SHARED, ) def close(self) -> None: """close the map shuts down both the mmap and the underlying file descriptor """ if not self.mm.closed: self.mm.close() os.close(self.fd) def __len__(self) -> int: return self.length def __delitem__(self, pos: int) -> None: raise NotImplementedError("cannot delete records from fixed-size map") class SwhidToNodeMap(_OnDiskMap, MutableMapping): """memory mapped map from :ref:`SWHIDs ` to a continuous range 0..N of (8-byte long) integers This is the converse mapping of :class:`NodeToSwhidMap`. The on-disk serialization format is a sequence of fixed length (30 bytes) records with the following fields: - SWHID (22 bytes): binary SWHID representation as per :func:`str_to_bytes` - long (8 bytes): big endian long integer The records are sorted lexicographically by SWHID type and checksum, where type is the integer value of :class:`SwhidType`. SWHID lookup in the map is performed via binary search. Hence a huge map with, say, 11 B entries, will require ~30 disk seeks. Note that, due to fixed size + ordering, it is not possible to create these maps by random writing. Hence, __setitem__ can be used only to *update* the value associated to an existing key, rather than to add a missing item. To create an entire map from scratch, you should do so *sequentially*, using static method :meth:`write_record` (or, at your own risk, by hand via the mmap :attr:`mm`). """ # record binary format: SWHID + a big endian 8-byte big endian integer RECORD_BIN_FMT = ">" + SWHID_BIN_FMT + "q" RECORD_SIZE = SWHID_BIN_SIZE + INT_BIN_SIZE def __init__(self, fname: str, mode: str = "rb", length: int = None): """open an existing on-disk map Args: fname: path to the on-disk map mode: file open mode, usually either 'rb' for read-only maps, 'wb' for creating new maps, or 'rb+' for updating existing ones (default: 'rb') length: map size in number of logical records; used to initialize read-write maps at creation time. Must be given when mode is 'wb'; ignored otherwise """ super().__init__(self.RECORD_SIZE, fname, mode=mode, length=length) def _get_bin_record(self, pos: int) -> Tuple[bytes, bytes]: """seek and return the (binary) record at a given (logical) position see :func:`_get_record` for an equivalent function with additional deserialization Args: pos: 0-based record number Returns: a pair `(swhid, int)`, where swhid and int are bytes """ rec_pos = pos * self.RECORD_SIZE int_pos = rec_pos + SWHID_BIN_SIZE return (self.mm[rec_pos:int_pos], self.mm[int_pos : int_pos + INT_BIN_SIZE]) def _get_record(self, pos: int) -> Tuple[str, int]: """seek and return the record at a given (logical) position moral equivalent of :func:`_get_bin_record`, with additional deserialization to non-bytes types Args: pos: 0-based record number Returns: a pair `(swhid, int)`, where swhid is a string-based SWHID and int the corresponding integer identifier """ (swhid_bytes, int_bytes) = self._get_bin_record(pos) return (bytes_to_str(swhid_bytes), struct.unpack(INT_BIN_FMT, int_bytes)[0]) @classmethod def write_record(cls, f: BinaryIO, swhid: str, int: int) -> None: """write a logical record to a file-like object Args: f: file-like object to write the record to swhid: textual SWHID int: SWHID integer identifier """ f.write(str_to_bytes(swhid)) f.write(struct.pack(INT_BIN_FMT, int)) def _bisect_pos(self, swhid_str: str) -> int: """bisect the position of the given identifier. If the identifier is not found, the position of the swhid immediately after is returned. Args: swhid_str: the swhid as a string Returns: the logical record of the bisected position in the map """ if not isinstance(swhid_str, str): raise TypeError("SWHID must be a str, not {}".format(type(swhid_str))) try: target = str_to_bytes(swhid_str) # desired SWHID as bytes except ValueError: raise ValueError('invalid SWHID: "{}"'.format(swhid_str)) lo = 0 hi = self.length - 1 while lo < hi: mid = (lo + hi) // 2 (swhid, _value) = self._get_bin_record(mid) if swhid < target: lo = mid + 1 else: hi = mid return lo def _find(self, swhid_str: str) -> Tuple[int, int]: """lookup the integer identifier of a swhid and its position Args: swhid_str: the swhid as a string Returns: a pair `(swhid, pos)` with swhid integer identifier and its logical record position in the map """ pos = self._bisect_pos(swhid_str) swhid_found, value = self._get_record(pos) if swhid_found == swhid_str: return (value, pos) raise KeyError(swhid_str) def __getitem__(self, swhid_str: str) -> int: """lookup the integer identifier of a SWHID Args: swhid: the SWHID as a string Returns: the integer identifier of swhid """ return self._find(swhid_str)[0] # return element, ignore position def __setitem__(self, swhid_str: str, int: str) -> None: (_swhid, pos) = self._find(swhid_str) # might raise KeyError and that's OK rec_pos = pos * self.RECORD_SIZE int_pos = rec_pos + SWHID_BIN_SIZE self.mm[rec_pos:int_pos] = str_to_bytes(swhid_str) self.mm[int_pos : int_pos + INT_BIN_SIZE] = struct.pack(INT_BIN_FMT, int) def __iter__(self) -> Iterator[Tuple[str, int]]: for pos in range(self.length): yield self._get_record(pos) def iter_prefix(self, prefix: str): swh, n, t, sha = prefix.split(":") sha = sha.ljust(40, "0") start_swhid = ":".join([swh, n, t, sha]) start = self._bisect_pos(start_swhid) for pos in range(start, self.length): swhid, value = self._get_record(pos) if not swhid.startswith(prefix): break yield swhid, value def iter_type(self, swhid_type: str) -> Iterator[Tuple[str, int]]: prefix = "swh:1:{}:".format(swhid_type) yield from self.iter_prefix(prefix) class NodeToSwhidMap(_OnDiskMap, MutableMapping): """memory mapped map from a continuous range of 0..N (8-byte long) integers to :ref:`SWHIDs ` This is the converse mapping of :class:`SwhidToNodeMap`. The on-disk serialization format is a sequence of fixed length records (22 bytes), each being the binary representation of a SWHID as per :func:`str_to_bytes`. The records are sorted by long integer, so that integer lookup is possible via fixed-offset seek. """ RECORD_BIN_FMT = SWHID_BIN_FMT RECORD_SIZE = SWHID_BIN_SIZE def __init__(self, fname: str, mode: str = "rb", length: int = None): """open an existing on-disk map Args: fname: path to the on-disk map mode: file open mode, usually either 'rb' for read-only maps, 'wb' for creating new maps, or 'rb+' for updating existing ones (default: 'rb') size: map size in number of logical records; used to initialize read-write maps at creation time. Must be given when mode is 'wb'; ignored otherwise length: passed to :class:`_OnDiskMap` """ super().__init__(self.RECORD_SIZE, fname, mode=mode, length=length) def _get_bin_record(self, pos: int) -> bytes: """seek and return the (binary) SWHID at a given (logical) position Args: pos: 0-based record number Returns: SWHID as a byte sequence """ rec_pos = pos * self.RECORD_SIZE return self.mm[rec_pos : rec_pos + self.RECORD_SIZE] @classmethod def write_record(cls, f: BinaryIO, swhid: str) -> None: """write a SWHID to a file-like object Args: f: file-like object to write the record to swhid: textual SWHID """ f.write(str_to_bytes(swhid)) def __getitem__(self, pos: int) -> str: orig_pos = pos if pos < 0: pos = len(self) + pos if not (0 <= pos < len(self)): raise IndexError(orig_pos) return bytes_to_str(self._get_bin_record(pos)) def __setitem__(self, pos: int, swhid: str) -> None: rec_pos = pos * self.RECORD_SIZE self.mm[rec_pos : rec_pos + self.RECORD_SIZE] = str_to_bytes(swhid) def __iter__(self) -> Iterator[Tuple[int, str]]: for pos in range(self.length): yield (pos, self[pos]) diff --git a/swh/graph/tests/conftest.py b/swh/graph/tests/conftest.py index 5a7bb92..fed877b 100644 --- a/swh/graph/tests/conftest.py +++ b/swh/graph/tests/conftest.py @@ -1,68 +1,59 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import csv import multiprocessing from pathlib import Path from aiohttp.test_utils import TestClient, TestServer, loop_context import pytest from swh.graph.client import RemoteGraphClient from swh.graph.naive_client import NaiveClient SWH_GRAPH_TESTS_ROOT = Path(__file__).parents[0] TEST_GRAPH_PATH = SWH_GRAPH_TESTS_ROOT / "dataset/output/example" class GraphServerProcess(multiprocessing.Process): def __init__(self, q, *args, **kwargs): self.q = q super().__init__(*args, **kwargs) def run(self): # Lazy import to allow debian packaging from swh.graph.backend import Backend from swh.graph.server.app import make_app try: backend = Backend(graph_path=str(TEST_GRAPH_PATH)) with loop_context() as loop: app = make_app(backend=backend, debug=True) client = TestClient(TestServer(app), loop=loop) loop.run_until_complete(client.start_server()) url = client.make_url("/graph/") self.q.put(url) loop.run_forever() except Exception as e: self.q.put(e) @pytest.fixture(scope="module", params=["remote", "naive"]) def graph_client(request): if request.param == "remote": queue = multiprocessing.Queue() server = GraphServerProcess(queue) server.start() res = queue.get() if isinstance(res, Exception): raise res yield RemoteGraphClient(str(res)) server.terminate() else: with open(SWH_GRAPH_TESTS_ROOT / "dataset/example.nodes.csv") as fd: nodes = [node for (node,) in csv.reader(fd, delimiter=" ")] with open(SWH_GRAPH_TESTS_ROOT / "dataset/example.edges.csv") as fd: edges = list(csv.reader(fd, delimiter=" ")) yield NaiveClient(nodes=nodes, edges=edges) - - -@pytest.fixture(scope="module") -def graph(): - # Lazy import to allow debian packaging - from swh.graph.graph import load as graph_load - - with graph_load(str(TEST_GRAPH_PATH)) as g: - yield g diff --git a/swh/graph/tests/dataset/output/example-transposed.graph b/swh/graph/tests/dataset/output/example-transposed.graph index 5460ea4..ad5756e 100644 --- a/swh/graph/tests/dataset/output/example-transposed.graph +++ b/swh/graph/tests/dataset/output/example-transposed.graph @@ -1 +1 @@ -[):+u6Mjk5ֺ \ No newline at end of file +z.hѮIt{ \ No newline at end of file diff --git a/swh/graph/tests/dataset/output/example-transposed.obl b/swh/graph/tests/dataset/output/example-transposed.obl index 1291af6..54f0ac8 100644 Binary files a/swh/graph/tests/dataset/output/example-transposed.obl and b/swh/graph/tests/dataset/output/example-transposed.obl differ diff --git a/swh/graph/tests/dataset/output/example-transposed.offsets b/swh/graph/tests/dataset/output/example-transposed.offsets index 0b2e742..92c2947 100644 --- a/swh/graph/tests/dataset/output/example-transposed.offsets +++ b/swh/graph/tests/dataset/output/example-transposed.offsets @@ -1 +1,2 @@ - (pBERB \ No newline at end of file + + RqG4PTP( \ No newline at end of file diff --git a/swh/graph/tests/dataset/output/example-transposed.properties b/swh/graph/tests/dataset/output/example-transposed.properties index 1f6c525..512ce9d 100644 --- a/swh/graph/tests/dataset/output/example-transposed.properties +++ b/swh/graph/tests/dataset/output/example-transposed.properties @@ -1,35 +1,35 @@ #BVGraph properties -#Fri Apr 02 13:56:41 UTC 2021 -bitsforreferences=27 +#Sat Dec 04 01:37:28 CET 2021 +bitsforreferences=31 avgbitsforintervals=0.714 graphclass=it.unimi.dsi.big.webgraph.BVGraph -avgdist=0.381 -successoravggap=6.739 -residualexpstats=6,5,2,1,2,1 +avgdist=0.571 +successoravggap=6.478 +residualexpstats=6,6,2,2,2 arcs=23 minintervallength=4 bitsforoutdegrees=61 -residualavgloggap=2.2888731039272048 +residualavgloggap=2.1534522798004265 avgbitsforoutdegrees=2.905 -bitsforresiduals=83 -successoravgloggap=2.2822834512468524 +bitsforresiduals=85 +successoravgloggap=2.3226776741991215 maxrefcount=3 -successorexpstats=7,6,5,3,1,1 -residualarcs=17 -avgbitsforresiduals=3.952 -avgbitsforblocks=0.286 +successorexpstats=7,6,4,3,3 +residualarcs=18 +avgbitsforresiduals=4.048 +avgbitsforblocks=0.238 windowsize=7 -residualavggap=7.971 -copiedarcs=6 -avgbitsforreferences=1.286 +residualavggap=5.667 +copiedarcs=5 +avgbitsforreferences=1.476 version=0 -compratio=1.515 -bitsperlink=8.348 +compratio=1.554 +bitsperlink=8.565 compressionflags= nodes=21 -avgref=0.333 +avgref=0.238 zetak=3 bitsforintervals=15 intervalisedarcs=0 -bitspernode=9.143 -bitsforblocks=6 +bitspernode=9.381 +bitsforblocks=5 diff --git a/swh/graph/tests/dataset/output/example.graph b/swh/graph/tests/dataset/output/example.graph index e13c173..621b9b7 100644 --- a/swh/graph/tests/dataset/output/example.graph +++ b/swh/graph/tests/dataset/output/example.graph @@ -1 +1 @@ -}]/I#zWu.ޥ` \ No newline at end of file +'t}UOGϹ]ް].dP}R \ No newline at end of file diff --git a/swh/graph/tests/dataset/output/example.mph b/swh/graph/tests/dataset/output/example.mph index 7838165..c6f9e19 100644 Binary files a/swh/graph/tests/dataset/output/example.mph and b/swh/graph/tests/dataset/output/example.mph differ diff --git a/swh/graph/tests/dataset/output/example.node2swhid.bin b/swh/graph/tests/dataset/output/example.node2swhid.bin index 63cecba..9cc50b2 100644 Binary files a/swh/graph/tests/dataset/output/example.node2swhid.bin and b/swh/graph/tests/dataset/output/example.node2swhid.bin differ diff --git a/swh/graph/tests/dataset/output/example.node2type.map b/swh/graph/tests/dataset/output/example.node2type.map index 0a0a609..6b91c37 100644 Binary files a/swh/graph/tests/dataset/output/example.node2type.map and b/swh/graph/tests/dataset/output/example.node2type.map differ diff --git a/swh/graph/tests/dataset/output/example.obl b/swh/graph/tests/dataset/output/example.obl index 456c6ef..1b4fd2e 100644 Binary files a/swh/graph/tests/dataset/output/example.obl and b/swh/graph/tests/dataset/output/example.obl differ diff --git a/swh/graph/tests/dataset/output/example.offsets b/swh/graph/tests/dataset/output/example.offsets index f7d2333..407e1a6 100644 --- a/swh/graph/tests/dataset/output/example.offsets +++ b/swh/graph/tests/dataset/output/example.offsets @@ -1,2 +1 @@ -(PHPԒ -P) \ No newline at end of file +BU!B diff --git a/swh/graph/tests/dataset/output/example.order b/swh/graph/tests/dataset/output/example.order index 5e99fea..2cb5540 100644 Binary files a/swh/graph/tests/dataset/output/example.order and b/swh/graph/tests/dataset/output/example.order differ diff --git a/swh/graph/tests/dataset/output/example.properties b/swh/graph/tests/dataset/output/example.properties index 5b48508..cb6975a 100644 --- a/swh/graph/tests/dataset/output/example.properties +++ b/swh/graph/tests/dataset/output/example.properties @@ -1,35 +1,35 @@ #BVGraph properties -#Fri Apr 02 13:56:08 UTC 2021 +#Sat Dec 04 01:37:26 CET 2021 bitsforreferences=14 avgbitsforintervals=0.667 graphclass=it.unimi.dsi.big.webgraph.BVGraph avgdist=0 -successoravggap=7.652 -residualexpstats=7,5,5,3,2,1 +successoravggap=7.391 +residualexpstats=7,7,3,3,2,1 arcs=23 minintervallength=4 bitsforoutdegrees=51 -residualavgloggap=2.40434236090153 +residualavgloggap=2.32668281341601 avgbitsforoutdegrees=2.429 -bitsforresiduals=108 -successoravgloggap=2.40434236090153 +bitsforresiduals=111 +successoravgloggap=2.32668281341601 maxrefcount=3 -successorexpstats=7,5,5,3,2,1 +successorexpstats=7,7,3,3,2,1 residualarcs=23 -avgbitsforresiduals=5.143 +avgbitsforresiduals=5.286 avgbitsforblocks=0 windowsize=7 -residualavggap=7.652 +residualavggap=7.391 copiedarcs=0 avgbitsforreferences=0.667 version=0 -compratio=1.475 -bitsperlink=8.13 +compratio=1.499 +bitsperlink=8.261 compressionflags= nodes=21 avgref=0 zetak=3 bitsforintervals=14 intervalisedarcs=0 -bitspernode=8.905 +bitspernode=9.048 bitsforblocks=0 diff --git a/swh/graph/tests/dataset/output/example.stats b/swh/graph/tests/dataset/output/example.stats index 8b1eb1c..a58d3e2 100644 --- a/swh/graph/tests/dataset/output/example.stats +++ b/swh/graph/tests/dataset/output/example.stats @@ -1,20 +1,20 @@ nodes=21 arcs=23 loops=0 -successoravggap=5.765 -avglocality=3.826 +successoravggap=7.765 +avglocality=3.783 minoutdegree=0 maxoutdegree=3 -minoutdegreenode=7 -maxoutdegreenode=4 +minoutdegreenode=1 +maxoutdegreenode=0 dangling=7 terminal=7 percdangling=33.333333333333336 avgoutdegree=1.0952380952380953 -successorlogdeltastats=7,9,3,3,1 -successoravglogdelta=1.020 +successorlogdeltastats=11,7,1,3,1 +successoravglogdelta=0.911 minindegree=0 maxindegree=3 -minindegreenode=19 -maxindegreenode=1 +minindegreenode=17 +maxindegreenode=18 avgindegree=1.0952380952380953 diff --git a/swh/graph/tests/dataset/output/example.swhid2node.bin b/swh/graph/tests/dataset/output/example.swhid2node.bin deleted file mode 100644 index b8df2fa..0000000 Binary files a/swh/graph/tests/dataset/output/example.swhid2node.bin and /dev/null differ diff --git a/swh/graph/tests/test_api_client.py b/swh/graph/tests/test_api_client.py index 90f9a0a..46e0227 100644 --- a/swh/graph/tests/test_api_client.py +++ b/swh/graph/tests/test_api_client.py @@ -1,375 +1,379 @@ import pytest from pytest import raises from swh.core.api import RemoteException from swh.graph.client import GraphArgumentException def test_stats(graph_client): stats = graph_client.stats() assert set(stats.keys()) == {"counts", "ratios", "indegree", "outdegree"} assert set(stats["counts"].keys()) == {"nodes", "edges"} assert set(stats["ratios"].keys()) == { "compression", "bits_per_node", "bits_per_edge", "avg_locality", } assert set(stats["indegree"].keys()) == {"min", "max", "avg"} assert set(stats["outdegree"].keys()) == {"min", "max", "avg"} assert stats["counts"]["nodes"] == 21 assert stats["counts"]["edges"] == 23 assert isinstance(stats["ratios"]["compression"], float) assert isinstance(stats["ratios"]["bits_per_node"], float) assert isinstance(stats["ratios"]["bits_per_edge"], float) assert isinstance(stats["ratios"]["avg_locality"], float) assert stats["indegree"]["min"] == 0 assert stats["indegree"]["max"] == 3 assert isinstance(stats["indegree"]["avg"], float) assert stats["outdegree"]["min"] == 0 assert stats["outdegree"]["max"] == 3 assert isinstance(stats["outdegree"]["avg"], float) def test_leaves(graph_client): actual = list( graph_client.leaves("swh:1:ori:0000000000000000000000000000000000000021") ) expected = [ "swh:1:cnt:0000000000000000000000000000000000000001", "swh:1:cnt:0000000000000000000000000000000000000004", "swh:1:cnt:0000000000000000000000000000000000000005", "swh:1:cnt:0000000000000000000000000000000000000007", ] assert set(actual) == set(expected) def test_neighbors(graph_client): actual = list( graph_client.neighbors( "swh:1:rev:0000000000000000000000000000000000000009", direction="backward" ) ) expected = [ "swh:1:snp:0000000000000000000000000000000000000020", "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000013", ] assert set(actual) == set(expected) def test_visit_nodes(graph_client): actual = list( graph_client.visit_nodes( "swh:1:rel:0000000000000000000000000000000000000010", edges="rel:rev,rev:rev", ) ) expected = [ "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ] assert set(actual) == set(expected) def test_visit_nodes_filtered(graph_client): actual = list( graph_client.visit_nodes( "swh:1:rel:0000000000000000000000000000000000000010", return_types="dir", ) ) expected = [ "swh:1:dir:0000000000000000000000000000000000000002", "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:dir:0000000000000000000000000000000000000006", ] assert set(actual) == set(expected) def test_visit_nodes_filtered_star(graph_client): actual = list( graph_client.visit_nodes( "swh:1:rel:0000000000000000000000000000000000000010", return_types="*", ) ) expected = [ "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", "swh:1:cnt:0000000000000000000000000000000000000001", "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:cnt:0000000000000000000000000000000000000007", "swh:1:dir:0000000000000000000000000000000000000006", "swh:1:cnt:0000000000000000000000000000000000000004", "swh:1:cnt:0000000000000000000000000000000000000005", ] assert set(actual) == set(expected) def test_visit_edges(graph_client): actual = list( graph_client.visit_edges( "swh:1:rel:0000000000000000000000000000000000000010", edges="rel:rev,rev:rev,rev:dir", ) ) expected = [ ( "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000008", ), ( "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", ), ] assert set(actual) == set(expected) def test_visit_edges_limited(graph_client): actual = list( graph_client.visit_edges( "swh:1:rel:0000000000000000000000000000000000000010", max_edges=4, edges="rel:rev,rev:rev,rev:dir", ) ) expected = [ ( "swh:1:rel:0000000000000000000000000000000000000010", "swh:1:rev:0000000000000000000000000000000000000009", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000008", ), ( "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", ), ] # As there are four valid answers (up to reordering), we cannot check for # equality. Instead, we check the client returned all edges but one. assert set(actual).issubset(set(expected)) assert len(actual) == 3 def test_visit_edges_diamond_pattern(graph_client): actual = list( graph_client.visit_edges( "swh:1:rev:0000000000000000000000000000000000000009", edges="*", ) ) expected = [ ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:rev:0000000000000000000000000000000000000003", ), ( "swh:1:rev:0000000000000000000000000000000000000009", "swh:1:dir:0000000000000000000000000000000000000008", ), ( "swh:1:rev:0000000000000000000000000000000000000003", "swh:1:dir:0000000000000000000000000000000000000002", ), ( "swh:1:dir:0000000000000000000000000000000000000002", "swh:1:cnt:0000000000000000000000000000000000000001", ), ( "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:cnt:0000000000000000000000000000000000000001", ), ( "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:cnt:0000000000000000000000000000000000000007", ), ( "swh:1:dir:0000000000000000000000000000000000000008", "swh:1:dir:0000000000000000000000000000000000000006", ), ( "swh:1:dir:0000000000000000000000000000000000000006", "swh:1:cnt:0000000000000000000000000000000000000004", ), ( "swh:1:dir:0000000000000000000000000000000000000006", "swh:1:cnt:0000000000000000000000000000000000000005", ), ] assert set(actual) == set(expected) -def test_visit_paths(graph_client): - actual = list( - graph_client.visit_paths( - "swh:1:snp:0000000000000000000000000000000000000020", edges="snp:*,rev:*" - ) - ) - actual = [tuple(path) for path in actual] - expected = [ - ( - "swh:1:snp:0000000000000000000000000000000000000020", - "swh:1:rev:0000000000000000000000000000000000000009", - "swh:1:rev:0000000000000000000000000000000000000003", - "swh:1:dir:0000000000000000000000000000000000000002", - ), - ( - "swh:1:snp:0000000000000000000000000000000000000020", - "swh:1:rev:0000000000000000000000000000000000000009", - "swh:1:dir:0000000000000000000000000000000000000008", - ), - ( - "swh:1:snp:0000000000000000000000000000000000000020", - "swh:1:rel:0000000000000000000000000000000000000010", - ), - ] - assert set(actual) == set(expected) - - @pytest.mark.skip(reason="currently disabled due to T1969") def test_walk(graph_client): args = ("swh:1:dir:0000000000000000000000000000000000000016", "rel") kwargs = { "edges": "dir:dir,dir:rev,rev:*", "direction": "backward", "traversal": "bfs", } actual = list(graph_client.walk(*args, **kwargs)) expected = [ "swh:1:dir:0000000000000000000000000000000000000016", "swh:1:dir:0000000000000000000000000000000000000017", "swh:1:rev:0000000000000000000000000000000000000018", "swh:1:rel:0000000000000000000000000000000000000019", ] assert set(actual) == set(expected) kwargs2 = kwargs.copy() kwargs2["limit"] = -1 actual = list(graph_client.walk(*args, **kwargs2)) expected = ["swh:1:rel:0000000000000000000000000000000000000019"] assert set(actual) == set(expected) kwargs2 = kwargs.copy() kwargs2["limit"] = 2 actual = list(graph_client.walk(*args, **kwargs2)) expected = [ "swh:1:dir:0000000000000000000000000000000000000016", "swh:1:dir:0000000000000000000000000000000000000017", ] assert set(actual) == set(expected) -def test_random_walk(graph_client): +def test_random_walk_dst_is_type(graph_client): """as the walk is random, we test a visit from a cnt node to the only origin in the dataset, and only check the final node of the path (i.e., the origin) """ args = ("swh:1:cnt:0000000000000000000000000000000000000001", "ori") kwargs = {"direction": "backward"} expected_root = "swh:1:ori:0000000000000000000000000000000000000021" actual = list(graph_client.random_walk(*args, **kwargs)) assert len(actual) > 1 # no origin directly links to a content assert actual[0] == args[0] assert actual[-1] == expected_root kwargs2 = kwargs.copy() kwargs2["limit"] = -1 actual = list(graph_client.random_walk(*args, **kwargs2)) assert actual == [expected_root] kwargs2["limit"] = -2 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 2 assert actual[-1] == expected_root kwargs2["limit"] = 3 actual = list(graph_client.random_walk(*args, **kwargs2)) assert len(actual) == 3 +def test_random_walk_dst_is_node(graph_client): + """Same as test_random_walk_dst_is_type, but we target the specific origin + node instead of a type + """ + args = ( + "swh:1:cnt:0000000000000000000000000000000000000001", + "swh:1:ori:0000000000000000000000000000000000000021", + ) + kwargs = {"direction": "backward"} + expected_root = "swh:1:ori:0000000000000000000000000000000000000021" + + actual = list(graph_client.random_walk(*args, **kwargs)) + assert len(actual) > 1 # no origin directly links to a content + assert actual[0] == args[0] + assert actual[-1] == expected_root + + kwargs2 = kwargs.copy() + kwargs2["limit"] = -1 + actual = list(graph_client.random_walk(*args, **kwargs2)) + assert actual == [expected_root] + + kwargs2["limit"] = -2 + actual = list(graph_client.random_walk(*args, **kwargs2)) + assert len(actual) == 2 + assert actual[-1] == expected_root + + kwargs2["limit"] = 3 + actual = list(graph_client.random_walk(*args, **kwargs2)) + assert len(actual) == 3 + + def test_count(graph_client): actual = graph_client.count_leaves( "swh:1:ori:0000000000000000000000000000000000000021" ) assert actual == 4 actual = graph_client.count_visit_nodes( "swh:1:rel:0000000000000000000000000000000000000010", edges="rel:rev,rev:rev" ) assert actual == 3 actual = graph_client.count_neighbors( "swh:1:rev:0000000000000000000000000000000000000009", direction="backward" ) assert actual == 3 def test_param_validation(graph_client): with raises(GraphArgumentException) as exc_info: # SWHID not found list(graph_client.leaves("swh:1:ori:fff0000000000000000000000000000000000021")) if exc_info.value.response: assert exc_info.value.response.status_code == 404 with raises(GraphArgumentException) as exc_info: # malformed SWHID list( graph_client.neighbors("swh:1:ori:fff000000zzzzzz0000000000000000000000021") ) if exc_info.value.response: assert exc_info.value.response.status_code == 400 with raises(GraphArgumentException) as exc_info: # malformed edge specificaiton list( graph_client.visit_nodes( "swh:1:dir:0000000000000000000000000000000000000016", edges="dir:notanodetype,dir:rev,rev:*", direction="backward", ) ) if exc_info.value.response: assert exc_info.value.response.status_code == 400 with raises(GraphArgumentException) as exc_info: # malformed direction list( graph_client.visit_nodes( "swh:1:dir:0000000000000000000000000000000000000016", edges="dir:dir,dir:rev,rev:*", direction="notadirection", ) ) if exc_info.value.response: assert exc_info.value.response.status_code == 400 @pytest.mark.skip(reason="currently disabled due to T1969") def test_param_validation_walk(graph_client): """test validation of walk-specific parameters only""" with raises(RemoteException) as exc_info: # malformed traversal order list( graph_client.walk( "swh:1:dir:0000000000000000000000000000000000000016", "rel", edges="dir:dir,dir:rev,rev:*", direction="backward", traversal="notatraversalorder", ) ) assert exc_info.value.response.status_code == 400 diff --git a/swh/graph/tests/test_graph.py b/swh/graph/tests/test_graph.py deleted file mode 100644 index c752580..0000000 --- a/swh/graph/tests/test_graph.py +++ /dev/null @@ -1,166 +0,0 @@ -import pytest - - -def test_graph(graph): - assert len(graph) == 21 - - obj = "swh:1:dir:0000000000000000000000000000000000000008" - node = graph[obj] - - assert str(node) == obj - assert len(node.children()) == 3 - assert len(node.parents()) == 2 - - actual = {p.swhid for p in node.children()} - expected = { - "swh:1:cnt:0000000000000000000000000000000000000001", - "swh:1:dir:0000000000000000000000000000000000000006", - "swh:1:cnt:0000000000000000000000000000000000000007", - } - assert expected == actual - - actual = {p.swhid for p in node.parents()} - expected = { - "swh:1:rev:0000000000000000000000000000000000000009", - "swh:1:dir:0000000000000000000000000000000000000012", - } - assert expected == actual - - -def test_invalid_swhid(graph): - with pytest.raises(IndexError): - graph[1337] - - with pytest.raises(IndexError): - graph[len(graph) + 1] - - with pytest.raises(KeyError): - graph["swh:1:dir:0000000000000000000000000000000420000012"] - - -def test_leaves(graph): - actual = list(graph["swh:1:ori:0000000000000000000000000000000000000021"].leaves()) - actual = [p.swhid for p in actual] - expected = [ - "swh:1:cnt:0000000000000000000000000000000000000001", - "swh:1:cnt:0000000000000000000000000000000000000004", - "swh:1:cnt:0000000000000000000000000000000000000005", - "swh:1:cnt:0000000000000000000000000000000000000007", - ] - assert set(actual) == set(expected) - - -def test_visit_nodes(graph): - actual = list( - graph["swh:1:rel:0000000000000000000000000000000000000010"].visit_nodes( - edges="rel:rev,rev:rev" - ) - ) - actual = [p.swhid for p in actual] - expected = [ - "swh:1:rel:0000000000000000000000000000000000000010", - "swh:1:rev:0000000000000000000000000000000000000009", - "swh:1:rev:0000000000000000000000000000000000000003", - ] - assert set(actual) == set(expected) - - -def test_visit_edges(graph): - actual = list( - graph["swh:1:rel:0000000000000000000000000000000000000010"].visit_edges( - edges="rel:rev,rev:rev,rev:dir" - ) - ) - actual = [(src.swhid, dst.swhid) for src, dst in actual] - expected = [ - ( - "swh:1:rel:0000000000000000000000000000000000000010", - "swh:1:rev:0000000000000000000000000000000000000009", - ), - ( - "swh:1:rev:0000000000000000000000000000000000000009", - "swh:1:rev:0000000000000000000000000000000000000003", - ), - ( - "swh:1:rev:0000000000000000000000000000000000000009", - "swh:1:dir:0000000000000000000000000000000000000008", - ), - ( - "swh:1:rev:0000000000000000000000000000000000000003", - "swh:1:dir:0000000000000000000000000000000000000002", - ), - ] - assert set(actual) == set(expected) - - -def test_visit_paths(graph): - actual = list( - graph["swh:1:snp:0000000000000000000000000000000000000020"].visit_paths( - edges="snp:*,rev:*" - ) - ) - actual = [tuple(n.swhid for n in path) for path in actual] - expected = [ - ( - "swh:1:snp:0000000000000000000000000000000000000020", - "swh:1:rev:0000000000000000000000000000000000000009", - "swh:1:rev:0000000000000000000000000000000000000003", - "swh:1:dir:0000000000000000000000000000000000000002", - ), - ( - "swh:1:snp:0000000000000000000000000000000000000020", - "swh:1:rev:0000000000000000000000000000000000000009", - "swh:1:dir:0000000000000000000000000000000000000008", - ), - ( - "swh:1:snp:0000000000000000000000000000000000000020", - "swh:1:rel:0000000000000000000000000000000000000010", - ), - ] - assert set(actual) == set(expected) - - -def test_walk(graph): - actual = list( - graph["swh:1:dir:0000000000000000000000000000000000000016"].walk( - "rel", edges="dir:dir,dir:rev,rev:*", direction="backward", traversal="bfs" - ) - ) - actual = [p.swhid for p in actual] - expected = [ - "swh:1:dir:0000000000000000000000000000000000000016", - "swh:1:dir:0000000000000000000000000000000000000017", - "swh:1:rev:0000000000000000000000000000000000000018", - "swh:1:rel:0000000000000000000000000000000000000019", - ] - assert set(actual) == set(expected) - - -def test_count(graph): - assert ( - graph["swh:1:ori:0000000000000000000000000000000000000021"].count_leaves() == 4 - ) - assert ( - graph["swh:1:rel:0000000000000000000000000000000000000010"].count_visit_nodes( - edges="rel:rev,rev:rev" - ) - == 3 - ) - assert ( - graph["swh:1:rev:0000000000000000000000000000000000000009"].count_neighbors( - direction="backward" - ) - == 3 - ) - - -def test_iter_type(graph): - rev_list = list(graph.iter_type("rev")) - actual = [n.swhid for n in rev_list] - expected = [ - "swh:1:rev:0000000000000000000000000000000000000003", - "swh:1:rev:0000000000000000000000000000000000000009", - "swh:1:rev:0000000000000000000000000000000000000013", - "swh:1:rev:0000000000000000000000000000000000000018", - ] - assert expected == actual diff --git a/swh/graph/tests/test_swhid.py b/swh/graph/tests/test_swhid.py index 722e6b1..6053215 100644 --- a/swh/graph/tests/test_swhid.py +++ b/swh/graph/tests/test_swhid.py @@ -1,196 +1,196 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from itertools import islice import os import shutil import tempfile import unittest from swh.graph.swhid import NodeToSwhidMap, SwhidToNodeMap, bytes_to_str, str_to_bytes -from swh.model.identifiers import SWHID_TYPES +from swh.model.swhids import SWHID_TYPES class TestSwhidSerialization(unittest.TestCase): pairs = [ ( "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2", bytes.fromhex("01" + "00" + "94a9ed024d3859793618152ea559a168bbcbb5e2"), ), ( "swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505", bytes.fromhex("01" + "01" + "d198bc9d7a6bcf6db04f476d29314f157507d505"), ), ( "swh:1:ori:b63a575fe3faab7692c9f38fb09d4bb45651bb0f", bytes.fromhex("01" + "02" + "b63a575fe3faab7692c9f38fb09d4bb45651bb0f"), ), ( "swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f", bytes.fromhex("01" + "03" + "22ece559cc7cc2364edc5e5593d63ae8bd229f9f"), ), ( "swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d", bytes.fromhex("01" + "04" + "309cf2674ee7a0749978cf8265ab91a60aea0f7d"), ), ( "swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453", bytes.fromhex("01" + "05" + "c7c108084bc0bf3d81436bf980b46e98bd338453"), ), ] def test_str_to_bytes(self): for (swhid_str, swhid_bytes) in self.pairs: self.assertEqual(str_to_bytes(swhid_str), swhid_bytes) def test_bytes_to_str(self): for (swhid_str, swhid_bytes) in self.pairs: self.assertEqual(bytes_to_str(swhid_bytes), swhid_str) def test_round_trip(self): for (swhid_str, swhid_bytes) in self.pairs: self.assertEqual(swhid_str, bytes_to_str(str_to_bytes(swhid_str))) self.assertEqual(swhid_bytes, str_to_bytes(bytes_to_str(swhid_bytes))) def gen_records(types=["cnt", "dir", "ori", "rel", "rev", "snp"], length=10000): """generate sequential SWHID/int records, suitable for filling int<->swhid maps for testing swh-graph on-disk binary databases Args: types (list): list of SWHID types to be generated, specified as the corresponding 3-letter component in SWHIDs length (int): number of SWHIDs to generate *per type* Yields: pairs (swhid, int) where swhid is a textual SWHID and int its sequential integer identifier """ pos = 0 for t in sorted(types): for i in range(0, length): seq = format(pos, "x") # current position as hex string swhid = "swh:1:{}:{}{}".format(t, "0" * (40 - len(seq)), seq) yield (swhid, pos) pos += 1 # pairs SWHID/position in the sequence generated by :func:`gen_records` above MAP_PAIRS = [ ("swh:1:cnt:0000000000000000000000000000000000000000", 0), ("swh:1:cnt:000000000000000000000000000000000000002a", 42), ("swh:1:dir:0000000000000000000000000000000000002afc", 11004), ("swh:1:ori:00000000000000000000000000000000000056ce", 22222), ("swh:1:rel:0000000000000000000000000000000000008235", 33333), ("swh:1:rev:000000000000000000000000000000000000ad9c", 44444), ("swh:1:snp:000000000000000000000000000000000000ea5f", 59999), ] class TestSwhidToNodeMap(unittest.TestCase): @classmethod def setUpClass(cls): """create reasonably sized (~2 MB) SWHID->int map to test on-disk DB""" cls.tmpdir = tempfile.mkdtemp(prefix="swh.graph.test.") cls.fname = os.path.join(cls.tmpdir, "swhid2int.bin") with open(cls.fname, "wb") as f: for (swhid, i) in gen_records(length=10000): SwhidToNodeMap.write_record(f, swhid, i) @classmethod def tearDownClass(cls): shutil.rmtree(cls.tmpdir) def setUp(self): self.map = SwhidToNodeMap(self.fname) def tearDown(self): self.map.close() def test_lookup(self): for (swhid, pos) in MAP_PAIRS: self.assertEqual(self.map[swhid], pos) def test_missing(self): with self.assertRaises(KeyError): self.map["swh:1:ori:0101010100000000000000000000000000000000"], with self.assertRaises(KeyError): self.map["swh:1:cnt:0101010100000000000000000000000000000000"], def test_type_error(self): with self.assertRaises(TypeError): self.map[42] with self.assertRaises(TypeError): self.map[1.2] def test_update(self): fname2 = self.fname + ".update" shutil.copy(self.fname, fname2) # fresh map copy map2 = SwhidToNodeMap(fname2, mode="rb+") for (swhid, int) in islice(map2, 11): # update the first N items new_int = int + 42 map2[swhid] = new_int self.assertEqual(map2[swhid], new_int) # check updated value os.unlink(fname2) # tmpdir will be cleaned even if we don't reach this def test_iter_type(self): for t in SWHID_TYPES + ["ori"]: first_20 = list(islice(self.map.iter_type(t), 20)) k = first_20[0][1] expected = [("swh:1:{}:{:040x}".format(t, i), i) for i in range(k, k + 20)] assert first_20 == expected def test_iter_prefix(self): for t in SWHID_TYPES + ["ori"]: prefix = self.map.iter_prefix("swh:1:{}:00".format(t)) first_20 = list(islice(prefix, 20)) k = first_20[0][1] expected = [("swh:1:{}:{:040x}".format(t, i), i) for i in range(k, k + 20)] assert first_20 == expected class TestNodeToSwhidMap(unittest.TestCase): @classmethod def setUpClass(cls): """create reasonably sized (~1 MB) int->SWHID map to test on-disk DB""" cls.tmpdir = tempfile.mkdtemp(prefix="swh.graph.test.") cls.fname = os.path.join(cls.tmpdir, "int2swhid.bin") with open(cls.fname, "wb") as f: for (swhid, _i) in gen_records(length=10000): NodeToSwhidMap.write_record(f, swhid) @classmethod def tearDownClass(cls): shutil.rmtree(cls.tmpdir) def setUp(self): self.map = NodeToSwhidMap(self.fname) def tearDown(self): self.map.close() def test_lookup(self): for (swhid, pos) in MAP_PAIRS: self.assertEqual(self.map[pos], swhid) def test_out_of_bounds(self): with self.assertRaises(IndexError): self.map[1000000] with self.assertRaises(IndexError): self.map[-1000000] def test_update(self): fname2 = self.fname + ".update" shutil.copy(self.fname, fname2) # fresh map copy map2 = NodeToSwhidMap(fname2, mode="rb+") for (int, swhid) in islice(map2, 11): # update the first N items new_swhid = swhid.replace(":0", ":f") # mangle first hex digit map2[int] = new_swhid self.assertEqual(map2[int], new_swhid) # check updated value os.unlink(fname2) # tmpdir will be cleaned even if we don't reach this diff --git a/swh/graph/webgraph.py b/swh/graph/webgraph.py index 87c5341..24bb4b5 100644 --- a/swh/graph/webgraph.py +++ b/swh/graph/webgraph.py @@ -1,229 +1,280 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """WebGraph driver """ from datetime import datetime from enum import Enum import logging import os from pathlib import Path import subprocess from typing import Dict, List, Set from swh.graph.config import check_config_compress class CompressionStep(Enum): MPH = 1 BV = 2 - BV_OBL = 3 - BFS = 4 - PERMUTE = 5 - PERMUTE_OBL = 6 - STATS = 7 - TRANSPOSE = 8 - TRANSPOSE_OBL = 9 - MAPS = 10 - CLEAN_TMP = 11 + BFS = 3 + PERMUTE_BFS = 4 + TRANSPOSE_BFS = 5 + SIMPLIFY = 6 + LLP = 7 + PERMUTE_LLP = 8 + OBL = 9 + COMPOSE_ORDERS = 10 + STATS = 11 + TRANSPOSE = 12 + TRANSPOSE_OBL = 13 + MAPS = 14 + CLEAN_TMP = 15 def __str__(self): return self.name # full compression pipeline COMP_SEQ = list(CompressionStep) # Mapping from compression steps to shell commands implementing them. Commands # will be executed by the shell, so be careful with meta characters. They are # specified here as lists of tokens that will be joined together only for ease # of line splitting. In commands, {tokens} will be interpolated with # configuration values, see :func:`compress`. STEP_ARGV: Dict[CompressionStep, List[str]] = { CompressionStep.MPH: [ "{java}", "it.unimi.dsi.sux4j.mph.GOVMinimalPerfectHashFunction", "--byte-array", "--temp-dir", "{tmp_dir}", "{out_dir}/{graph_name}.mph", "<( zstdcat {in_dir}/{graph_name}.nodes.csv.zst )", ], # use process substitution (and hence FIFO) above as MPH class load the # entire file in memory when reading from stdin CompressionStep.BV: [ "zstdcat", "{in_dir}/{graph_name}.edges.csv.zst", "|", "cut -d' ' -f1,2", "|", "{java}", "it.unimi.dsi.big.webgraph.ScatteredArcsASCIIGraph", "--byte-array", "--temp-dir", "{tmp_dir}", "--function", "{out_dir}/{graph_name}.mph", - "{out_dir}/{graph_name}-bv", - ], - CompressionStep.BV_OBL: [ - "{java}", - "it.unimi.dsi.big.webgraph.BVGraph", - "--list", - "{out_dir}/{graph_name}-bv", + "{out_dir}/{graph_name}-base", ], CompressionStep.BFS: [ "{java}", "it.unimi.dsi.law.big.graph.BFS", - "{out_dir}/{graph_name}-bv", - "{out_dir}/{graph_name}.order", + "{out_dir}/{graph_name}-base", + "{out_dir}/{graph_name}-bfs.order", + ], + CompressionStep.PERMUTE_BFS: [ + "{java}", + "it.unimi.dsi.big.webgraph.Transform", + "mapOffline", + "{out_dir}/{graph_name}-base", + "{out_dir}/{graph_name}-bfs", + "{out_dir}/{graph_name}-bfs.order", + "{batch_size}", + "{tmp_dir}", + ], + CompressionStep.TRANSPOSE_BFS: [ + "{java}", + "it.unimi.dsi.big.webgraph.Transform", + "transposeOffline", + "{out_dir}/{graph_name}-bfs", + "{out_dir}/{graph_name}-bfs-transposed", + "{batch_size}", + "{tmp_dir}", ], - CompressionStep.PERMUTE: [ + CompressionStep.SIMPLIFY: [ + "{java}", + "it.unimi.dsi.big.webgraph.Transform", + "simplify", + "{out_dir}/{graph_name}-bfs", + "{out_dir}/{graph_name}-bfs-transposed", + "{out_dir}/{graph_name}-bfs-simplified", + ], + CompressionStep.LLP: [ + "{java}", + "it.unimi.dsi.law.big.graph.LayeredLabelPropagation", + "-g", + "{llp_gammas}", + "{out_dir}/{graph_name}-bfs-simplified", + "{out_dir}/{graph_name}-llp.order", + ], + CompressionStep.PERMUTE_LLP: [ "{java}", "it.unimi.dsi.big.webgraph.Transform", "mapOffline", - "{out_dir}/{graph_name}-bv", + "{out_dir}/{graph_name}-bfs", "{out_dir}/{graph_name}", - "{out_dir}/{graph_name}.order", + "{out_dir}/{graph_name}-llp.order", "{batch_size}", "{tmp_dir}", ], - CompressionStep.PERMUTE_OBL: [ + CompressionStep.OBL: [ "{java}", "it.unimi.dsi.big.webgraph.BVGraph", "--list", "{out_dir}/{graph_name}", ], + CompressionStep.COMPOSE_ORDERS: [ + "{java}", + "org.softwareheritage.graph.utils.ComposePermutations", + "{out_dir}/{graph_name}-bfs.order", + "{out_dir}/{graph_name}-llp.order", + "{out_dir}/{graph_name}.order", + ], CompressionStep.STATS: [ "{java}", "it.unimi.dsi.big.webgraph.Stats", "{out_dir}/{graph_name}", ], CompressionStep.TRANSPOSE: [ "{java}", "it.unimi.dsi.big.webgraph.Transform", "transposeOffline", "{out_dir}/{graph_name}", "{out_dir}/{graph_name}-transposed", "{batch_size}", "{tmp_dir}", ], CompressionStep.TRANSPOSE_OBL: [ "{java}", "it.unimi.dsi.big.webgraph.BVGraph", "--list", "{out_dir}/{graph_name}-transposed", ], CompressionStep.MAPS: [ "zstdcat", "{in_dir}/{graph_name}.nodes.csv.zst", "|", "{java}", "org.softwareheritage.graph.maps.NodeMapBuilder", "{out_dir}/{graph_name}", "{tmp_dir}", ], CompressionStep.CLEAN_TMP: [ "rm", "-rf", - "{out_dir}/{graph_name}-bv.graph", - "{out_dir}/{graph_name}-bv.obl", - "{out_dir}/{graph_name}-bv.offsets", + "{out_dir}/{graph_name}-base.graph", + "{out_dir}/{graph_name}-base.offsets", + "{out_dir}/{graph_name}-base.properties", + "{out_dir}/{graph_name}-bfs-simplified.graph", + "{out_dir}/{graph_name}-bfs-simplified.offsets", + "{out_dir}/{graph_name}-bfs-simplified.properties", + "{out_dir}/{graph_name}-bfs-transposed.graph", + "{out_dir}/{graph_name}-bfs-transposed.offsets", + "{out_dir}/{graph_name}-bfs-transposed.properties", + "{out_dir}/{graph_name}-bfs.graph", + "{out_dir}/{graph_name}-bfs.offsets", + "{out_dir}/{graph_name}-bfs.order", + "{out_dir}/{graph_name}-bfs.properties", + "{out_dir}/{graph_name}-llp.order", "{tmp_dir}", ], } def do_step(step, conf): cmd = " ".join(STEP_ARGV[step]).format(**conf) cmd_env = os.environ.copy() cmd_env["JAVA_TOOL_OPTIONS"] = conf["java_tool_options"] cmd_env["CLASSPATH"] = conf["classpath"] logging.info(f"running: {cmd}") process = subprocess.Popen( ["/bin/bash", "-c", cmd], env=cmd_env, encoding="utf8", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) with process.stdout as stdout: for line in stdout: logging.info(line.rstrip()) rc = process.wait() if rc != 0: raise RuntimeError( f"compression step {step} returned non-zero " f"exit code {rc}" ) else: return rc def compress( graph_name: str, in_dir: Path, out_dir: Path, steps: Set[CompressionStep] = set(COMP_SEQ), conf: Dict[str, str] = {}, ): """graph compression pipeline driver from nodes/edges files to compressed on-disk representation Args: graph_name: graph base name, relative to in_dir in_dir: input directory, where the uncompressed graph can be found out_dir: output directory, where the compressed graph will be stored steps: compression steps to run (default: all steps) conf: compression configuration, supporting the following keys (all are optional, so an empty configuration is fine and is the default) - batch_size: batch size for `WebGraph transformations `_; defaults to 1 billion - classpath: java classpath, defaults to swh-graph JAR only - java: command to run java VM, defaults to "java" - java_tool_options: value for JAVA_TOOL_OPTIONS environment variable; defaults to various settings for high memory machines - logback: path to a logback.xml configuration file; if not provided a temporary one will be created and used - max_ram: maximum RAM to use for compression; defaults to available virtual memory - tmp_dir: temporary directory, defaults to the "tmp" subdir of out_dir """ if not steps: steps = set(COMP_SEQ) conf = check_config_compress(conf, graph_name, in_dir, out_dir) compression_start_time = datetime.now() logging.info(f"starting compression at {compression_start_time}") seq_no = 0 for step in COMP_SEQ: if step not in steps: logging.debug(f"skipping compression step {step}") continue seq_no += 1 step_start_time = datetime.now() logging.info( f"starting compression step {step} " f"({seq_no}/{len(steps)}) at {step_start_time}" ) do_step(step, conf) step_end_time = datetime.now() step_duration = step_end_time - step_start_time logging.info( f"completed compression step {step} " f"({seq_no}/{len(steps)}) " f"at {step_end_time} in {step_duration}" ) compression_end_time = datetime.now() compression_duration = compression_end_time - compression_start_time logging.info(f"completed compression in {compression_duration}") diff --git a/tox.ini b/tox.ini index b5f2819..959bbb8 100644 --- a/tox.ini +++ b/tox.ini @@ -1,76 +1,76 @@ [tox] envlist=black,flake8,mypy,py3 [testenv] extras = testing deps = pytest-cov whitelist_externals = mvn sh commands = sh -c 'if ! [ -d {envdir}/share/swh-graph ]; then mvn -f java/pom.xml compile assembly:single; mkdir {envdir}/share/swh-graph; cp java/target/*.jar {envdir}/share/swh-graph; fi' pytest --cov={envsitepackagesdir}/swh/graph \ {envsitepackagesdir}/swh/graph \ --cov-branch {posargs} [testenv:black] skip_install = true deps = black==19.10b0 commands = {envpython} -m black --check swh [testenv:flake8] skip_install = true deps = flake8 commands = {envpython} -m flake8 [testenv:mypy] extras = testing deps = - mypy + mypy==0.920 commands = mypy swh # build documentation outside swh-environment using the current # git HEAD of swh-docs, is executed on CI for each diff to prevent # breaking doc build [testenv:sphinx] whitelist_externals = make usedevelop = true extras = testing deps = # fetch and install swh-docs in develop mode -e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs setenv = SWH_PACKAGE_DOC_TOX_BUILD = 1 # turn warnings into errors SPHINXOPTS = -W commands = make -I ../.tox/sphinx/src/swh-docs/swh/ -C docs # build documentation only inside swh-environment using local state # of swh-docs package [testenv:sphinx-dev] whitelist_externals = make usedevelop = true extras = testing deps = # install swh-docs in develop mode -e ../swh-docs setenv = SWH_PACKAGE_DOC_TOX_BUILD = 1 # turn warnings into errors SPHINXOPTS = -W commands = make -I ../.tox/sphinx-dev/src/swh-docs/swh/ -C docs