`_.
diff --git a/swh.graph.egg-info/SOURCES.txt b/swh.graph.egg-info/SOURCES.txt
index 5a4af14..9511cb7 100644
--- a/swh.graph.egg-info/SOURCES.txt
+++ b/swh.graph.egg-info/SOURCES.txt
@@ -1,256 +1,256 @@
.git-blame-ignore-revs
.gitignore
.pre-commit-config.yaml
AUTHORS
CODE_OF_CONDUCT.md
CONTRIBUTORS
LICENSE
MANIFEST.in
Makefile
Makefile.local
README.rst
mypy.ini
pyproject.toml
pytest.ini
requirements-swh.txt
requirements-test.txt
requirements.txt
setup.cfg
setup.py
tox.ini
docker/Dockerfile
docker/build.sh
docker/run.sh
docs/.gitignore
docs/Makefile
docs/Makefile.local
docs/README.rst
docs/api.rst
docs/cli.rst
docs/compression.rst
docs/conf.py
docs/docker.rst
docs/git2graph.md
docs/grpc-api.rst
docs/index.rst
docs/java-api.rst
docs/memory.rst
docs/quickstart.rst
docs/_static/.placeholder
docs/_templates/.placeholder
docs/images/.gitignore
docs/images/Makefile
docs/images/compression_steps.dot
java/.coding-style.xml
java/.gitignore
java/AUTHORS
java/LICENSE
java/README.md
java/pom.xml
java/.mvn/jvm.config
java/src/main/proto
java/src/main/java/org/softwareheritage/graph/AllowedEdges.java
java/src/main/java/org/softwareheritage/graph/AllowedNodes.java
java/src/main/java/org/softwareheritage/graph/SWHID.java
java/src/main/java/org/softwareheritage/graph/Subgraph.java
java/src/main/java/org/softwareheritage/graph/SwhBidirectionalGraph.java
java/src/main/java/org/softwareheritage/graph/SwhGraph.java
java/src/main/java/org/softwareheritage/graph/SwhGraphProperties.java
java/src/main/java/org/softwareheritage/graph/SwhType.java
java/src/main/java/org/softwareheritage/graph/SwhUnidirectionalGraph.java
java/src/main/java/org/softwareheritage/graph/compress/CSVEdgeDataset.java
java/src/main/java/org/softwareheritage/graph/compress/ComposePermutations.java
java/src/main/java/org/softwareheritage/graph/compress/ExtractNodes.java
java/src/main/java/org/softwareheritage/graph/compress/ExtractPersons.java
java/src/main/java/org/softwareheritage/graph/compress/GraphDataset.java
java/src/main/java/org/softwareheritage/graph/compress/LabelMapBuilder.java
java/src/main/java/org/softwareheritage/graph/compress/NodeMapBuilder.java
java/src/main/java/org/softwareheritage/graph/compress/ORCGraphDataset.java
java/src/main/java/org/softwareheritage/graph/compress/ScatteredArcsORCGraph.java
java/src/main/java/org/softwareheritage/graph/compress/WriteNodeProperties.java
java/src/main/java/org/softwareheritage/graph/experiments/forks/ForkCC.java
java/src/main/java/org/softwareheritage/graph/experiments/forks/ForkCliques.java
java/src/main/java/org/softwareheritage/graph/experiments/forks/ListEmptyOrigins.java
java/src/main/java/org/softwareheritage/graph/experiments/topology/AveragePaths.java
java/src/main/java/org/softwareheritage/graph/experiments/topology/ClusteringCoefficient.java
java/src/main/java/org/softwareheritage/graph/experiments/topology/ConnectedComponents.java
java/src/main/java/org/softwareheritage/graph/experiments/topology/InOutDegree.java
java/src/main/java/org/softwareheritage/graph/experiments/topology/SubdatasetSizeFunction.java
java/src/main/java/org/softwareheritage/graph/labels/DirEntry.java
java/src/main/java/org/softwareheritage/graph/labels/SwhLabel.java
java/src/main/java/org/softwareheritage/graph/maps/NodeIdMap.java
java/src/main/java/org/softwareheritage/graph/maps/NodeTypesMap.java
java/src/main/java/org/softwareheritage/graph/rpc/GraphServer.java
java/src/main/java/org/softwareheritage/graph/rpc/NodePropertyBuilder.java
java/src/main/java/org/softwareheritage/graph/rpc/Traversal.java
java/src/main/java/org/softwareheritage/graph/utils/DumpProperties.java
java/src/main/java/org/softwareheritage/graph/utils/ExportSubdataset.java
java/src/main/java/org/softwareheritage/graph/utils/FindEarliestRevision.java
java/src/main/java/org/softwareheritage/graph/utils/ForkJoinBigQuickSort2.java
java/src/main/java/org/softwareheritage/graph/utils/ForkJoinQuickSort3.java
java/src/main/java/org/softwareheritage/graph/utils/MPHTranslate.java
java/src/main/java/org/softwareheritage/graph/utils/ReadGraph.java
java/src/main/java/org/softwareheritage/graph/utils/ReadLabelledGraph.java
java/src/main/java/org/softwareheritage/graph/utils/Sort.java
java/src/test/java/org/softwareheritage/graph/AllowedEdgesTest.java
java/src/test/java/org/softwareheritage/graph/AllowedNodesTest.java
java/src/test/java/org/softwareheritage/graph/GraphTest.java
java/src/test/java/org/softwareheritage/graph/SubgraphTest.java
java/src/test/java/org/softwareheritage/graph/compress/ExtractNodesTest.java
java/src/test/java/org/softwareheritage/graph/compress/ExtractPersonsTest.java
java/src/test/java/org/softwareheritage/graph/rpc/CountEdgesTest.java
java/src/test/java/org/softwareheritage/graph/rpc/CountNodesTest.java
java/src/test/java/org/softwareheritage/graph/rpc/FindPathBetweenTest.java
java/src/test/java/org/softwareheritage/graph/rpc/FindPathToTest.java
java/src/test/java/org/softwareheritage/graph/rpc/GetNodeTest.java
java/src/test/java/org/softwareheritage/graph/rpc/StatsTest.java
java/src/test/java/org/softwareheritage/graph/rpc/TraversalServiceTest.java
java/src/test/java/org/softwareheritage/graph/rpc/TraverseLeavesTest.java
java/src/test/java/org/softwareheritage/graph/rpc/TraverseNeighborsTest.java
java/src/test/java/org/softwareheritage/graph/rpc/TraverseNodesPropertiesTest.java
java/src/test/java/org/softwareheritage/graph/rpc/TraverseNodesTest.java
java/src/test/java/org/softwareheritage/graph/utils/ForkJoinBigQuickSort2Test.java
java/src/test/java/org/softwareheritage/graph/utils/ForkJoinQuickSort3Test.java
-java/target/swh-graph-1.0.0.jar
+java/target/swh-graph-1.0.1.jar
proto/swhgraph.proto
reports/.gitignore
reports/benchmarks/Makefile
reports/benchmarks/benchmarks.tex
reports/experiments/Makefile
reports/experiments/experiments.tex
reports/linux_log/LinuxLog.java
reports/linux_log/Makefile
reports/linux_log/linux_log.tex
reports/node_mapping/Makefile
reports/node_mapping/NodeIdMapHaloDB.java
reports/node_mapping/NodeIdMapRocksDB.java
reports/node_mapping/node_mapping.tex
swh/__init__.py
swh.graph.egg-info/PKG-INFO
swh.graph.egg-info/SOURCES.txt
swh.graph.egg-info/dependency_links.txt
swh.graph.egg-info/entry_points.txt
swh.graph.egg-info/requires.txt
swh.graph.egg-info/top_level.txt
swh/graph/__init__.py
swh/graph/cli.py
swh/graph/client.py
swh/graph/config.py
swh/graph/http_client.py
swh/graph/http_naive_client.py
swh/graph/http_server.py
swh/graph/naive_client.py
swh/graph/py.typed
swh/graph/rpc_server.py
swh/graph/webgraph.py
swh/graph/rpc/swhgraph.proto
swh/graph/rpc/swhgraph_pb2.py
swh/graph/rpc/swhgraph_pb2.pyi
swh/graph/rpc/swhgraph_pb2_grpc.py
swh/graph/tests/__init__.py
swh/graph/tests/conftest.py
swh/graph/tests/test_cli.py
swh/graph/tests/test_http_client.py
swh/graph/tests/dataset/generate_dataset.py
swh/graph/tests/dataset/compressed/example-labelled.labeloffsets
swh/graph/tests/dataset/compressed/example-labelled.labels
swh/graph/tests/dataset/compressed/example-labelled.properties
swh/graph/tests/dataset/compressed/example-transposed-labelled.labeloffsets
swh/graph/tests/dataset/compressed/example-transposed-labelled.labels
swh/graph/tests/dataset/compressed/example-transposed-labelled.properties
swh/graph/tests/dataset/compressed/example-transposed.graph
swh/graph/tests/dataset/compressed/example-transposed.obl
swh/graph/tests/dataset/compressed/example-transposed.offsets
swh/graph/tests/dataset/compressed/example-transposed.properties
swh/graph/tests/dataset/compressed/example.edges.count.txt
swh/graph/tests/dataset/compressed/example.edges.stats.txt
swh/graph/tests/dataset/compressed/example.graph
swh/graph/tests/dataset/compressed/example.indegree
swh/graph/tests/dataset/compressed/example.labels.count.txt
swh/graph/tests/dataset/compressed/example.labels.csv.zst
swh/graph/tests/dataset/compressed/example.labels.fcl.bytearray
swh/graph/tests/dataset/compressed/example.labels.fcl.pointers
swh/graph/tests/dataset/compressed/example.labels.fcl.properties
swh/graph/tests/dataset/compressed/example.labels.mph
swh/graph/tests/dataset/compressed/example.mph
swh/graph/tests/dataset/compressed/example.node2swhid.bin
swh/graph/tests/dataset/compressed/example.node2type.map
swh/graph/tests/dataset/compressed/example.nodes.count.txt
swh/graph/tests/dataset/compressed/example.nodes.csv.zst
swh/graph/tests/dataset/compressed/example.nodes.stats.txt
swh/graph/tests/dataset/compressed/example.obl
swh/graph/tests/dataset/compressed/example.offsets
swh/graph/tests/dataset/compressed/example.order
swh/graph/tests/dataset/compressed/example.outdegree
swh/graph/tests/dataset/compressed/example.persons.count.txt
swh/graph/tests/dataset/compressed/example.persons.csv.zst
swh/graph/tests/dataset/compressed/example.persons.mph
swh/graph/tests/dataset/compressed/example.properties
swh/graph/tests/dataset/compressed/example.property.author_id.bin
swh/graph/tests/dataset/compressed/example.property.author_timestamp.bin
swh/graph/tests/dataset/compressed/example.property.author_timestamp_offset.bin
swh/graph/tests/dataset/compressed/example.property.committer_id.bin
swh/graph/tests/dataset/compressed/example.property.committer_timestamp.bin
swh/graph/tests/dataset/compressed/example.property.committer_timestamp_offset.bin
swh/graph/tests/dataset/compressed/example.property.content.is_skipped.bin
swh/graph/tests/dataset/compressed/example.property.content.length.bin
swh/graph/tests/dataset/compressed/example.property.message.bin
swh/graph/tests/dataset/compressed/example.property.message.offset.bin
swh/graph/tests/dataset/compressed/example.property.tag_name.bin
swh/graph/tests/dataset/compressed/example.property.tag_name.offset.bin
swh/graph/tests/dataset/compressed/example.stats
swh/graph/tests/dataset/edges/content/graph-all.edges.csv.zst
swh/graph/tests/dataset/edges/content/graph-all.nodes.csv.zst
swh/graph/tests/dataset/edges/directory/graph-all.edges.csv.zst
swh/graph/tests/dataset/edges/directory/graph-all.nodes.csv.zst
swh/graph/tests/dataset/edges/origin/graph-all.edges.csv.zst
swh/graph/tests/dataset/edges/origin/graph-all.nodes.csv.zst
swh/graph/tests/dataset/edges/release/graph-all.edges.csv.zst
swh/graph/tests/dataset/edges/release/graph-all.nodes.csv.zst
swh/graph/tests/dataset/edges/revision/graph-all.edges.csv.zst
swh/graph/tests/dataset/edges/revision/graph-all.nodes.csv.zst
swh/graph/tests/dataset/edges/snapshot/graph-all.edges.csv.zst
swh/graph/tests/dataset/edges/snapshot/graph-all.nodes.csv.zst
swh/graph/tests/dataset/img/.gitignore
swh/graph/tests/dataset/img/Makefile
swh/graph/tests/dataset/img/example.dot
swh/graph/tests/dataset/orc/content/content-all.orc
swh/graph/tests/dataset/orc/directory/directory-all.orc
swh/graph/tests/dataset/orc/directory_entry/directory_entry-all.orc
swh/graph/tests/dataset/orc/origin/origin-all.orc
swh/graph/tests/dataset/orc/origin_visit/origin_visit-all.orc
swh/graph/tests/dataset/orc/origin_visit_status/origin_visit_status-all.orc
swh/graph/tests/dataset/orc/release/release-all.orc
swh/graph/tests/dataset/orc/revision/revision-all.orc
swh/graph/tests/dataset/orc/revision_extra_headers/revision_extra_headers-all.orc
swh/graph/tests/dataset/orc/revision_history/revision_history-all.orc
swh/graph/tests/dataset/orc/skipped_content/skipped_content-all.orc
swh/graph/tests/dataset/orc/snapshot/snapshot-all.orc
swh/graph/tests/dataset/orc/snapshot_branch/snapshot_branch-all.orc
tools/dir2graph
tools/swhid2int2int2swhid.sh
tools/git2graph/.gitignore
tools/git2graph/Makefile
tools/git2graph/README.md
tools/git2graph/git2graph.c
tools/git2graph/tests/edge-filters.bats
tools/git2graph/tests/full-graph.bats
tools/git2graph/tests/node-filters.bats
tools/git2graph/tests/repo_helper.bash
tools/git2graph/tests/data/sample-repo.tgz
tools/git2graph/tests/data/graphs/dir-nodes/edges.csv
tools/git2graph/tests/data/graphs/dir-nodes/nodes.csv
tools/git2graph/tests/data/graphs/from-dir-edges/edges.csv
tools/git2graph/tests/data/graphs/from-dir-edges/nodes.csv
tools/git2graph/tests/data/graphs/from-rel-edges/edges.csv
tools/git2graph/tests/data/graphs/from-rel-edges/nodes.csv
tools/git2graph/tests/data/graphs/fs-nodes/edges.csv
tools/git2graph/tests/data/graphs/fs-nodes/nodes.csv
tools/git2graph/tests/data/graphs/full/edges.csv
tools/git2graph/tests/data/graphs/full/nodes.csv
tools/git2graph/tests/data/graphs/rev-edges/edges.csv
tools/git2graph/tests/data/graphs/rev-edges/nodes.csv
tools/git2graph/tests/data/graphs/rev-nodes/edges.csv
tools/git2graph/tests/data/graphs/rev-nodes/nodes.csv
tools/git2graph/tests/data/graphs/to-rev-edges/edges.csv
tools/git2graph/tests/data/graphs/to-rev-edges/nodes.csv
\ No newline at end of file
diff --git a/swh/graph/http_server.py b/swh/graph/http_server.py
index d06293b..84192ac 100644
--- a/swh/graph/http_server.py
+++ b/swh/graph/http_server.py
@@ -1,349 +1,349 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
A proxy HTTP server for swh-graph, talking to the Java code via gRPC and
streaming traversal results back to HTTP clients line by line.
"""
import json
import os
from typing import Optional
import aiohttp.test_utils
import aiohttp.web
from google.protobuf import json_format
from google.protobuf.field_mask_pb2 import FieldMask
import grpc
from swh.core.api.asynchronous import RPCServerApp
from swh.core.config import read as config_read
from swh.graph.rpc.swhgraph_pb2 import (
GetNodeRequest,
NodeFilter,
StatsRequest,
TraversalRequest,
)
from swh.graph.rpc.swhgraph_pb2_grpc import TraversalServiceStub
-from swh.graph.rpc_server import spawn_java_rpc_server
+from swh.graph.rpc_server import spawn_java_rpc_server, stop_java_rpc_server
from swh.model.swhids import EXTENDED_SWHID_TYPES
try:
from contextlib import asynccontextmanager
except ImportError:
# Compatibility with 3.6 backport
from async_generator import asynccontextmanager # type: ignore
# maximum number of retries for random walks
RANDOM_RETRIES = 10 # TODO make this configurable via rpc-serve configuration
# aiohttp application that owns the gRPC connection to the graph RPC server.
# The channel and the client stub are stored on the app itself (keys
# "channel" and "rpc_client") by the startup hook and released by the
# shutdown hook.
class GraphServerApp(RPCServerApp):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Register the channel lifecycle hooks with the application.
self.on_startup.append(self._start)
self.on_shutdown.append(self._stop)
@staticmethod
async def _start(app):
# Open an insecure channel to app["rpc_url"] and build the stub on top of it.
app["channel"] = grpc.aio.insecure_channel(app["rpc_url"])
await app["channel"].__aenter__()
app["rpc_client"] = TraversalServiceStub(app["channel"])
# First RPC is issued with wait_for_ready=True; presumably this makes startup
# wait until the RPC server is reachable -- confirm against the gRPC docs.
await app["rpc_client"].Stats(StatsRequest(), wait_for_ready=True)
@staticmethod
async def _stop(app):
# Close the channel first, then stop the Java server if this app spawned one
# (the "local_server" key is only set by make_app when no rpc_url was given).
await app["channel"].__aexit__(None, None, None)
if app.get("local_server"):
- app["local_server"].terminate()
+ stop_java_rpc_server(app["local_server"])
async def index(request):
    """Serve the static landing page of the graph API server as text/html."""
    # The page text is kept flush-left so the served body stays unchanged.
    landing_page = """
Software Heritage graph server
You have reached the
Software Heritage graph API server.
See its
API
documentation for more information.
"""
    return aiohttp.web.Response(body=landing_page, content_type="text/html")
class GraphView(aiohttp.web.View):
    """Base class for views working on the graph, with utility functions.

    The ``get_*`` helpers each read one HTTP query parameter, validate it and
    raise ``aiohttp.web.HTTPBadRequest`` when the value is not acceptable.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # gRPC stub stored on the application under the "rpc_client" key.
        self.rpc_client: TraversalServiceStub = self.request.app["rpc_client"]

    def _get_int_param(self, name):
        """Return query parameter `name` parsed as an int (default: 0).

        Raises HTTPBadRequest when the value cannot be parsed by ``int()``.
        """
        s = self.request.query.get(name, "0")
        try:
            return int(s)
        except ValueError as exc:
            raise aiohttp.web.HTTPBadRequest(
                text=f"invalid {name} value: {s}"
            ) from exc

    def get_direction(self):
        """Validate HTTP query parameter `direction`.

        Accepts "forward" (the default) or "backward" and returns the value
        upper-cased.
        """
        s = self.request.query.get("direction", "forward")
        if s not in ("forward", "backward"):
            raise aiohttp.web.HTTPBadRequest(text=f"invalid direction: {s}")
        return s.upper()

    def get_edges(self):
        """Validate HTTP query parameter `edges`, i.e., edge restrictions.

        The value (default "*") is split on ":" and each piece once on ",";
        every resulting part must be "*" or a member of EXTENDED_SWHID_TYPES.
        The original string is returned unchanged.
        """
        s = self.request.query.get("edges", "*")
        # Generator instead of a list: any() can stop at the first bad type.
        if any(
            node_type != "*" and node_type not in EXTENDED_SWHID_TYPES
            for edge in s.split(":")
            for node_type in edge.split(",", maxsplit=1)
        ):
            raise aiohttp.web.HTTPBadRequest(text=f"invalid edge restriction: {s}")
        return s

    def get_return_types(self):
        """Validate HTTP query parameter `return_types`, i.e.,
        a set of types which we will filter the query results with.

        Returns "*" as soon as the value contains a star, otherwise the
        comma-separated value as given (default "*").
        """
        s = self.request.query.get("return_types", "*")
        if any(
            node_type != "*" and node_type not in EXTENDED_SWHID_TYPES
            for node_type in s.split(",")
        ):
            raise aiohttp.web.HTTPBadRequest(
                text=f"invalid type for filtering res: {s}"
            )
        # if the user puts a star,
        # then we filter nothing, we don't need the other information
        if "*" in s:
            return "*"
        return s

    def get_traversal(self):
        """Validate HTTP query parameter `traversal`, i.e., visit order.

        Accepts "bfs" or "dfs" (the default) and returns it unchanged.
        """
        s = self.request.query.get("traversal", "dfs")
        if s not in ("bfs", "dfs"):
            raise aiohttp.web.HTTPBadRequest(text=f"invalid traversal order: {s}")
        return s

    def get_limit(self):
        """Validate HTTP query parameter `limit`, i.e., number of results"""
        return self._get_int_param("limit")

    def get_max_edges(self):
        """Validate HTTP query parameter 'max_edges', i.e.,
        the limit of the number of edges that can be visited"""
        return self._get_int_param("max_edges")

    async def check_swhid(self, swhid):
        """Validate that the given SWHID exists in the graph.

        Issues a GetNode RPC restricted to the "swhid" field and raises
        HTTPBadRequest when the server answers INVALID_ARGUMENT.
        """
        try:
            await self.rpc_client.GetNode(
                GetNodeRequest(swhid=swhid, mask=FieldMask(paths=["swhid"]))
            )
        except grpc.aio.AioRpcError as exc:
            # NOTE(review): any other gRPC status code is deliberately left
            # unreported here, exactly as before -- confirm this is intended.
            if exc.code() == grpc.StatusCode.INVALID_ARGUMENT:
                raise aiohttp.web.HTTPBadRequest(text=str(exc.details())) from exc
class StreamingGraphView(GraphView):
    """Base class for views streaming their response line by line."""

    content_type = "text/plain"

    @asynccontextmanager
    async def response_streamer(self, *args, **kwargs):
        """Context manager to prepare then close a StreamResponse"""
        stream = aiohttp.web.StreamResponse(*args, **kwargs)
        stream.content_type = self.content_type
        await stream.prepare(self.request)
        yield stream
        await stream.write_eof()

    async def get(self):
        """Run the setup hook, then stream the response body.

        Lines are buffered in ``self._buf``; whatever is still buffered when
        streaming ends (normally or with an exception) is flushed.
        """
        await self.prepare_response()
        async with self.response_streamer() as self.response_stream:
            self._buf = []
            try:
                await self.stream_response()
            finally:
                await self._flush_buffer()
            return self.response_stream

    async def prepare_response(self):
        """This can be overridden with some setup to be run before the response
        actually starts streaming.
        """

    async def stream_response(self):
        """Override this to perform the response streaming. Implementations of
        this should await self.stream_line(line) to write each line.
        """
        raise NotImplementedError

    async def stream_line(self, line):
        """Write a line in the response stream."""
        self._buf.append(line)
        # Keep buffering until more than 100 lines are pending.
        if len(self._buf) <= 100:
            return
        await self._flush_buffer()

    async def _flush_buffer(self):
        """Write the buffered lines, newline-terminated, and reset the buffer."""
        payload = "\n".join(self._buf).encode() + b"\n"
        await self.response_stream.write(payload)
        self._buf = []
class StatsView(GraphView):
    """View showing some statistics on the graph"""

    async def get(self):
        """Return the result of the Stats RPC as an indented JSON document."""
        reply = await self.rpc_client.Stats(StatsRequest())
        stats = json_format.MessageToDict(
            reply, including_default_value_fields=True, preserving_proto_field_name=True
        )
        # Int64 fields are serialized as strings by default.
        for field in reply.DESCRIPTOR.fields:
            if field.type != field.TYPE_INT64:
                continue
            if field.name in stats:
                stats[field.name] = int(stats[field.name])
        return aiohttp.web.Response(
            body=json.dumps(stats, indent=4, sort_keys=True),
            content_type="application/json",
        )
class SimpleTraversalView(StreamingGraphView):
    """Base class for views of simple traversals"""

    async def prepare_response(self):
        """Build ``self.traversal_request`` from the URL and query parameters.

        The source SWHID comes from the ``src`` path segment. ``max_edges`` is
        only set on the request when non-zero. The SWHID is checked before
        subclasses get to adjust the request in ``configure_request``.
        """
        src = self.request.match_info["src"]
        # Validation order matters: it decides which error is reported first.
        edges = self.get_edges()
        direction = self.get_direction()
        node_filter = NodeFilter(types=self.get_return_types())
        self.traversal_request = TraversalRequest(
            src=[src],
            edges=edges,
            direction=direction,
            return_nodes=node_filter,
            mask=FieldMask(paths=["swhid"]),
        )
        max_edges = self.get_max_edges()
        if max_edges:
            self.traversal_request.max_edges = max_edges
        await self.check_swhid(src)
        self.configure_request()

    def configure_request(self):
        """Hook for subclasses to adjust ``self.traversal_request``."""

    async def stream_response(self):
        """Stream the SWHID of every node returned by the Traverse RPC."""
        async for visited in self.rpc_client.Traverse(self.traversal_request):
            await self.stream_line(visited.swhid)
class LeavesView(SimpleTraversalView):
    """Traversal view routed at ``/graph/leaves/{src}``."""

    def configure_request(self):
        """Set ``max_traversal_successors`` to 0 on the returned-nodes filter."""
        node_filter = self.traversal_request.return_nodes
        node_filter.max_traversal_successors = 0
class NeighborsView(SimpleTraversalView):
    """Traversal view routed at ``/graph/neighbors/{src}``."""

    def configure_request(self):
        """Pin both the minimum and the maximum traversal depth to 1."""
        req = self.traversal_request
        req.min_depth = req.max_depth = 1
class VisitNodesView(SimpleTraversalView):
    """Traversal view routed at ``/graph/visit/nodes/{src}``.

    Uses the base traversal request without any adjustment.
    """
class VisitEdgesView(SimpleTraversalView):
    """Traversal view routed at ``/graph/visit/edges/{src}``.

    Streams one ``"<node swhid> <successor swhid>"`` line per successor.
    """

    def configure_request(self):
        """Also request the successors (and their SWHIDs) of each node."""
        field_mask = self.traversal_request.mask
        field_mask.paths.extend(["successor", "successor.swhid"])

    async def stream_response(self):
        """Write a "source destination" line for each successor of each node."""
        async for visited in self.rpc_client.Traverse(self.traversal_request):
            for successor in visited.successor:
                await self.stream_line(visited.swhid + " " + successor.swhid)
class CountView(GraphView):
    """Base class for counting views."""

    count_type: Optional[str] = None

    async def get(self):
        """Build a traversal request and answer with the CountNodes result.

        The request is built from the ``src`` path segment and the validated
        query parameters; ``max_edges`` is only set when non-zero. The count
        is returned as the body of an ``application/json`` response.
        """
        src = self.request.match_info["src"]
        # Validation order matters: it decides which error is reported first.
        edges = self.get_edges()
        direction = self.get_direction()
        node_filter = NodeFilter(types=self.get_return_types())
        self.traversal_request = TraversalRequest(
            src=[src],
            edges=edges,
            direction=direction,
            return_nodes=node_filter,
            mask=FieldMask(paths=["swhid"]),
        )
        max_edges = self.get_max_edges()
        if max_edges:
            self.traversal_request.max_edges = max_edges
        self.configure_request()
        reply = await self.rpc_client.CountNodes(self.traversal_request)
        return aiohttp.web.Response(
            body=str(reply.count), content_type="application/json"
        )

    def configure_request(self):
        """Hook for subclasses to adjust ``self.traversal_request``."""
class CountNeighborsView(CountView):
    """Counting view routed at ``/graph/neighbors/count/{src}``."""

    def configure_request(self):
        """Pin both the minimum and the maximum traversal depth to 1."""
        req = self.traversal_request
        req.min_depth = req.max_depth = 1
class CountLeavesView(CountView):
    """Counting view routed at ``/graph/leaves/count/{src}``."""

    def configure_request(self):
        """Set ``max_traversal_successors`` to 0 on the returned-nodes filter."""
        node_filter = self.traversal_request.return_nodes
        node_filter.max_traversal_successors = 0
class CountVisitNodesView(CountView):
    """Counting view routed at ``/graph/visit/nodes/count/{src}``.

    Uses the base counting request without any adjustment.
    """
# Build the aiohttp application exposing the graph HTTP API.
#   config: handed to spawn_java_rpc_server when no rpc_url is given.
#   rpc_url: address of an already running gRPC server; when None, a local
#     Java server is spawned and its process handle kept in app["local_server"].
#   spawn_rpc_port (introduced by this patch, default 50091): port passed to
#     spawn_java_rpc_server; that function picks an unused port when given None.
#   **kwargs: forwarded to GraphServerApp.
# Returns the configured application, with the gRPC address in app["rpc_url"].
-def make_app(config=None, rpc_url=None, **kwargs):
+def make_app(config=None, rpc_url=None, spawn_rpc_port=50091, **kwargs):
app = GraphServerApp(**kwargs)
if rpc_url is None:
# No external server given: start our own and talk to it on localhost.
- app["local_server"], port = spawn_java_rpc_server(config)
+ app["local_server"], port = spawn_java_rpc_server(config, port=spawn_rpc_port)
rpc_url = f"localhost:{port}"
# "/" and "/graph" both serve the landing page; the other routes map to the views.
app.add_routes(
[
aiohttp.web.get("/", index),
aiohttp.web.get("/graph", index),
aiohttp.web.view("/graph/stats", StatsView),
aiohttp.web.view("/graph/leaves/{src}", LeavesView),
aiohttp.web.view("/graph/neighbors/{src}", NeighborsView),
aiohttp.web.view("/graph/visit/nodes/{src}", VisitNodesView),
aiohttp.web.view("/graph/visit/edges/{src}", VisitEdgesView),
aiohttp.web.view("/graph/neighbors/count/{src}", CountNeighborsView),
aiohttp.web.view("/graph/leaves/count/{src}", CountLeavesView),
aiohttp.web.view("/graph/visit/nodes/count/{src}", CountVisitNodesView),
]
)
# Read by GraphServerApp._start to open the gRPC channel.
app["rpc_url"] = rpc_url
return app
def make_app_from_configfile():
    """Load configuration and then build application to run.

    The configuration file path is taken from the ``SWH_CONFIG_FILENAME``
    environment variable.
    """
    config_path = os.environ.get("SWH_CONFIG_FILENAME")
    return make_app(config=config_read(config_path))
diff --git a/swh/graph/rpc_server.py b/swh/graph/rpc_server.py
index e4b4f1e..540fc5d 100644
--- a/swh/graph/rpc_server.py
+++ b/swh/graph/rpc_server.py
@@ -1,33 +1,47 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
A simple tool to start the swh-graph GRPC server in Java.
"""
+import logging
import subprocess
import aiohttp.test_utils
import aiohttp.web
from swh.graph.config import check_config
# Start the Java gRPC graph server as a child process.
#   config: configuration mapping (or None); normalised through check_config.
#   port: port to listen on; an unused port is picked when None.
# Returns a (subprocess.Popen, port) tuple; the caller owns the process.
def spawn_java_rpc_server(config, port=None):
if port is None:
port = aiohttp.test_utils.unused_port()
config = check_config(config or {})
# Command line: java -cp <classpath> <java tool options> GraphServer --port <port> <graph path>
cmd = [
"java",
"-cp",
config["classpath"],
*config["java_tool_options"].split(),
"org.softwareheritage.graph.rpc.GraphServer",
"--port",
str(port),
# The tests pass a pathlib.Path as graph path, hence the str() added by this patch.
- config["graph"]["path"],
+ str(config["graph"]["path"]),
]
# NOTE(review): print(cmd) duplicates the logging.info call below and looks
# like leftover debug output -- consider dropping it.
+ print(cmd)
+ # XXX: shlex.join() is in 3.8
+ # logging.info("Starting RPC server: %s", shlex.join(cmd))
+ logging.info("Starting RPC server: %s", str(cmd))
server = subprocess.Popen(cmd)
return server, port
+
+
# Stop a server process started by spawn_java_rpc_server.
#   server: the Popen handle to stop.
#   timeout: seconds to wait after terminate() before falling back to kill().
# NOTE(review): kill() is not followed by a wait(), so the killed child is not
# reaped by this function -- consider adding server.wait() after the kill.
+def stop_java_rpc_server(server: subprocess.Popen, timeout: int = 15):
+ server.terminate()
+ try:
+ server.wait(timeout=timeout)
+ except subprocess.TimeoutExpired:
+ logging.warning("Server did not terminate, sending kill signal...")
+ server.kill()
diff --git a/swh/graph/tests/conftest.py b/swh/graph/tests/conftest.py
index f66d2b3..3d86602 100644
--- a/swh/graph/tests/conftest.py
+++ b/swh/graph/tests/conftest.py
@@ -1,70 +1,70 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import multiprocessing
from pathlib import Path
import subprocess
from aiohttp.test_utils import TestClient, TestServer, loop_context
import pytest
from swh.graph.http_client import RemoteGraphClient
from swh.graph.http_naive_client import NaiveClient
SWH_GRAPH_TESTS_ROOT = Path(__file__).parents[0]
TEST_GRAPH_PATH = SWH_GRAPH_TESTS_ROOT / "dataset/compressed/example"
# Child process running the HTTP graph server for the test suite.
# It reports back through the queue `q`: either the URL of the running
# server, or the exception that prevented it from starting.
class GraphServerProcess(multiprocessing.Process):
def __init__(self, q, *args, **kwargs):
# Queue used to hand the server URL (or a startup error) to the parent.
self.q = q
super().__init__(*args, **kwargs)
def run(self):
# Lazy import to allow debian packaging
from swh.graph.http_server import make_app
try:
config = {"graph": {"path": TEST_GRAPH_PATH}}
with loop_context() as loop:
# spawn_rpc_port=None (added by this patch) is forwarded to
# spawn_java_rpc_server, which then picks an unused port.
- app = make_app(config=config, debug=True)
+ app = make_app(config=config, debug=True, spawn_rpc_port=None)
client = TestClient(TestServer(app), loop=loop)
loop.run_until_complete(client.start_server())
url = client.make_url("/graph/")
self.q.put(url)
# Keep serving until the parent terminates this process.
loop.run_forever()
except Exception as e:
# Forward the failure so the fixture in the parent can re-raise it.
self.q.put(e)
@pytest.fixture(scope="module", params=["remote", "naive"])
def graph_client(request):
    """Yield a graph client, once per backend ("remote" and "naive").

    * ``remote``: starts a ``GraphServerProcess`` and yields a
      ``RemoteGraphClient`` pointed at the URL it reports; the process is
      terminated and joined on teardown. An exception reported by the server
      process is re-raised here.
    * ``naive``: decompresses the CSV edge dataset with ``zstdcat`` and yields
      a ``NaiveClient`` built from its nodes and edges.
    """
    if request.param == "remote":
        queue = multiprocessing.Queue()
        server = GraphServerProcess(queue)
        server.start()
        res = queue.get()
        if isinstance(res, Exception):
            raise res
        yield RemoteGraphClient(str(res))
        server.terminate()
        # Reap the terminated child; bounded so teardown cannot hang forever.
        server.join(timeout=10)
    else:

        def zstdcat(*files):
            # check=True: fail loudly instead of silently testing an empty graph.
            p = subprocess.run(
                ["zstdcat", *files], stdout=subprocess.PIPE, check=True
            )
            return p.stdout.decode()

        edges_dataset = SWH_GRAPH_TESTS_ROOT / "dataset/edges"
        edge_files = edges_dataset.glob("*/*.edges.csv.zst")
        node_files = edges_dataset.glob("*/*.nodes.csv.zst")
        nodes = set(zstdcat(*node_files).strip().split("\n"))
        edge_lines = [line.split() for line in zstdcat(*edge_files).strip().split("\n")]
        # Only the first two columns (source, destination) are used.
        edges = [(src, dst) for src, dst, *_ in edge_lines]
        # Make sure every edge endpoint is also part of the node set.
        for src, dst in edges:
            nodes.add(src)
            nodes.add(dst)
        yield NaiveClient(nodes=list(nodes), edges=edges)