diff --git a/java/src/main/java/org/softwareheritage/graph/experiments/topology/AveragePaths.java b/java/src/main/java/org/softwareheritage/graph/experiments/topology/AveragePaths.java new file mode 100644 index 0000000..988cb35 --- /dev/null +++ b/java/src/main/java/org/softwareheritage/graph/experiments/topology/AveragePaths.java @@ -0,0 +1,189 @@ +package org.softwareheritage.graph.experiments.topology; + +import com.martiansoftware.jsap.*; +import it.unimi.dsi.Util; +import it.unimi.dsi.big.webgraph.LazyLongIterator; +import it.unimi.dsi.fastutil.BigArrays; +import it.unimi.dsi.fastutil.longs.LongBigArrays; +import it.unimi.dsi.logging.ProgressLogger; +import it.unimi.dsi.util.XoRoShiRo128PlusRandom; +import org.softwareheritage.graph.*; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.*; +import java.util.concurrent.*; + +public class AveragePaths { + private final Graph graph; + private final Subgraph subgraph; + private final ConcurrentHashMap result; + + public AveragePaths(String graphBasename, String allowedNodes) throws IOException { + System.err.println("Loading graph " + graphBasename + " ..."); + this.graph = new Graph(graphBasename); + this.subgraph = new Subgraph(this.graph, new AllowedNodes(allowedNodes)); + System.err.println("Graph loaded."); + + result = new ConcurrentHashMap<>(); + } + + private static JSAPResult parse_args(String[] args) { + JSAPResult config = null; + try { + SimpleJSAP jsap = new SimpleJSAP( + AveragePaths.class.getName(), + "", + new Parameter[]{ + new FlaggedOption("graphPath", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, JSAP.REQUIRED, + 'g', "graph", "Basename of the compressed graph"), + new FlaggedOption("nodeTypes", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, JSAP.REQUIRED, + 's', "nodetypes", "Node type constraints"), + new FlaggedOption("outdir", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, JSAP.REQUIRED, + 'o', "outdir", "Directory where to put the results"), + new FlaggedOption("numThreads", JSAP.INTEGER_PARSER, "32", JSAP.NOT_REQUIRED, + 't', "numthreads", "Number of threads"), + } + ); + + config = jsap.parse(args); + if (jsap.messagePrinted()) { + System.exit(1); + } + } catch (JSAPException e) { + e.printStackTrace(); + } + return config; + } + + + private void run(int numThreads) throws InterruptedException { + final long END_OF_QUEUE = -1L; + + ArrayBlockingQueue queue = new ArrayBlockingQueue<>(numThreads); + ExecutorService service = Executors.newFixedThreadPool(numThreads + 1); + + service.submit(() -> { + try { + Graph thread_graph = graph.copy(); + Subgraph thread_subgraph = subgraph.copy(); + + long[][] randomPerm = Util.identity(thread_graph.numNodes()); + LongBigArrays.shuffle(randomPerm, new XoRoShiRo128PlusRandom()); + long n = thread_graph.numNodes(); + + ProgressLogger pl = new ProgressLogger(); + pl.expectedUpdates = n; + pl.itemsName = "nodes"; + pl.start("Filling processor queue..."); + + for (long j = 0; j < n; ++j) { + long node = BigArrays.get(randomPerm, j); + if (thread_subgraph.nodeExists(node) && thread_subgraph.outdegree(node) == 0) { + queue.put(node); + } + pl.update(); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + for (int i = 0; i < numThreads; ++i) { + try { + queue.put(END_OF_QUEUE); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + }); + + for (int i = 0; i < numThreads; ++i) { + service.submit(() -> { + try { + Subgraph thread_subgraph = subgraph.copy(); + while (true) { + Long node = null; + try { + node = queue.take(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if (node == null || node == END_OF_QUEUE) { + return; + } + + bfsAt(thread_subgraph, node); + } + } catch (Exception e){ + e.printStackTrace(); + } + }); + } + + service.shutdown(); + service.awaitTermination(365, TimeUnit.DAYS); + } + + private void bfsAt(Subgraph graph, long srcNodeId) + { + ArrayDeque queue = new ArrayDeque<>(); + HashSet visited = new HashSet<>(); + + long FRONTIER_MARKER = -1; + + queue.addLast(srcNodeId); + visited.add(srcNodeId); + + long distance = 0; + queue.addLast(FRONTIER_MARKER); + + while (!queue.isEmpty()) { + long currentNodeId = queue.removeFirst(); + // System.err.println("curr: " + currentNodeId); + if (currentNodeId == FRONTIER_MARKER) { + if (queue.isEmpty()) // avoid infinite loops + break; + ++distance; + queue.addLast(FRONTIER_MARKER); + continue; + } + if (graph.indegree(currentNodeId) == 0) { + result.merge(distance, 1L, Long::sum); + } + + LazyLongIterator it = graph.predecessors(currentNodeId); + for (long neighborNodeId; (neighborNodeId = it.nextLong()) != -1; ) { + if (!visited.contains(neighborNodeId)) { + queue.addLast(neighborNodeId); + visited.add(neighborNodeId); + } + } + } + } + + public void printResult(String outdirPath) throws IOException { + new File(outdirPath).mkdirs(); + + PrintWriter f = new PrintWriter(new FileWriter(outdirPath + "/distribution.txt")); + TreeMap sortedDistribution = new TreeMap<>(result); + for (Map.Entry entry : sortedDistribution.entrySet()) { + f.println(entry.getKey() + " " + entry.getValue()); + } + f.close(); + } + + public static void main(String[] args) throws IOException, InterruptedException { + JSAPResult config = parse_args(args); + + String graphPath = config.getString("graphPath"); + String outdir = config.getString("outdir"); + String allowedNodes = config.getString("nodeTypes"); + int numThreads = config.getInt("numThreads"); + + AveragePaths tp = new AveragePaths(graphPath, allowedNodes); + tp.run(numThreads); + tp.printResult(outdir); + } +}