Changeset View
Changeset View
Standalone View
Standalone View
swh/graph/standalone_client.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import collections | import functools | ||||
import inspect | |||||
import re | |||||
import statistics | import statistics | ||||
from typing import AsyncIterator, Dict, Iterable, Iterator, List, Set, Tuple | from typing import Callable, Dict, Iterator, List, Optional, Set, Tuple, TypeVar | ||||
from swh.model.identifiers import ExtendedSWHID | from swh.model.identifiers import ExtendedSWHID, ValidationError | ||||
from .client import GraphArgumentException | |||||
class Swhid2NodeDict(collections.UserDict): | _NODE_TYPES = "ori|snp|rel|rev|dir|cnt" | ||||
def iter_type(self, swhid_type: str) -> Iterator[Tuple[str, str]]: | EDGES_RE = re.compile(fr"(\*|{_NODE_TYPES}):(\*|{_NODE_TYPES})") | ||||
prefix = "swh:1:{}:".format(swhid_type) | |||||
for (swhid, node) in self.items(): | |||||
if swhid.startswith(prefix): | |||||
yield (swhid, node) | |||||
def __getitem__(self, swhid): | |||||
ExtendedSWHID.from_string(swhid) # Raises ValidationError, caught by server | |||||
return self.data[swhid] | |||||
T = TypeVar("T", bound=Callable) | |||||
class Node2SwhidDict(collections.UserDict): | |||||
def __getitem__(self, key): | |||||
try: | |||||
return self.data[key] | |||||
except KeyError: | |||||
# Pretend to be a list | |||||
raise IndexError(key) from None | |||||
def check_arguments(f: T) -> T: | |||||
"""Decorator for generic argument checking for methods of StandaloneClient. | |||||
Checks ``src`` is a valid and known SWHID, and ``edges`` has the right format.""" | |||||
signature = inspect.signature(f) | |||||
@functools.wraps(f) | |||||
def newf(*args, **kwargs): | |||||
bound_args = signature.bind(*args, **kwargs) | |||||
self = bound_args.arguments["self"] | |||||
class JavaIterator: | src = bound_args.arguments.get("src") | ||||
def __init__(self, iterator: Iterable): | if src: | ||||
self.iterator = iter(iterator) | self._check_swhid(src) | ||||
def nextLong(self): | edges = bound_args.arguments.get("edges") | ||||
return next(self.iterator) | if edges: | ||||
if edges != "*" and not EDGES_RE.match(edges): | |||||
raise GraphArgumentException(f"invalid edge restriction: {edges}") | |||||
def __getattr__(self, name): | return f(*args, **kwargs) | ||||
return getattr(self.iterator, name) | |||||
return newf # type: ignore | |||||
class StandaloneClient: | class StandaloneClient: | ||||
"""An alternative implementation of :class:`swh.graph.backend.Backend`, | """An alternative implementation of :class:`swh.graph.backend.Backend`, | ||||
written in pure-python and meant for simulating it in other components' test | written in pure-python and meant for simulating it in other components' test | ||||
cases. | cases. | ||||
It is NOT meant to be efficient in any way; only to be a very simple | It is NOT meant to be efficient in any way; only to be a very simple | ||||
implementation that provides the same behavior.""" | implementation that provides the same behavior.""" | ||||
def __init__(self, *, nodes: List[str], edges: List[Tuple[str, str]]): | def __init__(self, *, nodes: List[str], edges: List[Tuple[str, str]]): | ||||
self.graph = Graph(nodes, edges) | self.graph = Graph(nodes, edges) | ||||
def _check_swhid(self, swhid): | |||||
try: | |||||
ExtendedSWHID.from_string(swhid) | |||||
except ValidationError as e: | |||||
raise GraphArgumentException(*e.args) from None | |||||
if swhid not in self.graph.nodes: | |||||
raise GraphArgumentException(f"SWHID not found: {swhid}") | |||||
def stats(self) -> Dict: | def stats(self) -> Dict: | ||||
return { | return { | ||||
"counts": { | "counts": { | ||||
"nodes": len(self.graph.nodes), | "nodes": len(self.graph.nodes), | ||||
"edges": len(self.graph.forward_edges), | "edges": sum(map(len, self.graph.forward_edges.values())), | ||||
}, | }, | ||||
"ratios": { | "ratios": { | ||||
"compression": 1.0, | "compression": 1.0, | ||||
"bits_per_edge": 100, | "bits_per_edge": 100.0, | ||||
"bits_per_node": 100, | "bits_per_node": 100.0, | ||||
"avg_locality": 0, | "avg_locality": 0.0, | ||||
}, | }, | ||||
"indegree": { | "indegree": { | ||||
"min": min(map(len, self.graph.backward_edges.values())), | "min": min(map(len, self.graph.backward_edges.values())), | ||||
"max": max(map(len, self.graph.backward_edges.values())), | "max": max(map(len, self.graph.backward_edges.values())), | ||||
"avg": statistics.mean(map(len, self.graph.backward_edges.values())), | "avg": statistics.mean(map(len, self.graph.backward_edges.values())), | ||||
}, | }, | ||||
"outdegree": { | "outdegree": { | ||||
"min": min(map(len, self.graph.forward_edges.values())), | "min": min(map(len, self.graph.forward_edges.values())), | ||||
"max": max(map(len, self.graph.forward_edges.values())), | "max": max(map(len, self.graph.forward_edges.values())), | ||||
"avg": statistics.mean(map(len, self.graph.forward_edges.values())), | "avg": statistics.mean(map(len, self.graph.forward_edges.values())), | ||||
}, | }, | ||||
} | } | ||||
def count_neighbors(self, ttype, direction, edges_fmt, src) -> int: | @check_arguments | ||||
return len(self.graph.get_filtered_neighbors(direction, edges_fmt, src)) | def leaves( | ||||
self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0 | |||||
def count_visit_nodes(self, ttype, direction, edges_fmt, src) -> int: | ) -> Iterator[str]: | ||||
return len(self.graph.get_subgraph(direction, edges_fmt, src)) | # TODO: max_edges | ||||
def count_leaves(self, ttype, direction, edges_fmt, src) -> int: | |||||
return len(list(self.leaves(direction, edges_fmt, src))) | |||||
async def simple_traversal(self, ttype, direction, edges_fmt, src, max_edges): | |||||
# TODO: max_edges? | |||||
if ttype == "visit_nodes": | |||||
for node in self.graph.get_subgraph(direction, edges_fmt, src): | |||||
yield node | |||||
elif ttype == "leaves": | |||||
for node in self.leaves(direction, edges_fmt, src): | |||||
yield node | |||||
else: | |||||
assert False, f"unknown ttype {ttype!r}" | |||||
def leaves(self, direction, edges_fmt, src) -> Iterator[str]: | |||||
yield from [ | yield from [ | ||||
node | node | ||||
for node in self.graph.get_subgraph(direction, edges_fmt, src) | for node in self.graph.get_subgraph(src, edges, direction) | ||||
if not self.graph.get_filtered_neighbors(direction, edges_fmt, node) | if not self.graph.get_filtered_neighbors(node, edges, direction) | ||||
] | ] | ||||
async def walk(self, direction, edges_fmt, algo, src, dst) -> AsyncIterator[str]: | @check_arguments | ||||
def neighbors( | |||||
self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0 | |||||
) -> Iterator[str]: | |||||
# TODO: max_edges | |||||
yield from self.graph.get_filtered_neighbors(src, edges, direction) | |||||
@check_arguments | |||||
def visit_nodes( | |||||
self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0 | |||||
) -> Iterator[str]: | |||||
# TODO: max_edges | |||||
yield from self.graph.get_subgraph(src, edges, direction) | |||||
@check_arguments | |||||
def visit_edges( | |||||
self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0 | |||||
) -> Iterator[Tuple[str, str]]: | |||||
if max_edges == 0: | |||||
max_edges = None # type: ignore | |||||
else: | |||||
max_edges -= 1 | |||||
yield from list(self.graph.iter_edges_dfs(direction, edges, src))[:max_edges] | |||||
@check_arguments | |||||
def visit_paths( | |||||
self, src: str, edges: str = "*", direction: str = "forward", max_edges: int = 0 | |||||
) -> Iterator[List[str]]: | |||||
# TODO: max_edges | |||||
for path in self.graph.iter_paths_dfs(direction, edges, src): | |||||
if path[-1] in self.leaves(src, edges, direction): | |||||
yield list(path) | |||||
@check_arguments | |||||
def walk( | |||||
self, | |||||
src: str, | |||||
dst: str, | |||||
edges: str = "*", | |||||
traversal: str = "dfs", | |||||
direction: str = "forward", | |||||
limit: Optional[int] = None, | |||||
) -> Iterator[str]: | |||||
# TODO: implement algo="bfs" | # TODO: implement algo="bfs" | ||||
# TODO: limit | |||||
match_path: Callable[[str], bool] | |||||
if ":" in dst: | if ":" in dst: | ||||
match_path = dst.__eq__ | match_path = dst.__eq__ | ||||
self._check_swhid(dst) | |||||
else: | else: | ||||
match_path = lambda node: node.startswith(f"swh:1:{dst}:") # noqa | match_path = lambda node: node.startswith(f"swh:1:{dst}:") # noqa | ||||
for path in self.graph.iter_paths_dfs(direction, edges_fmt, src): | for path in self.graph.iter_paths_dfs(direction, edges, src): | ||||
if match_path(path[-1]): | if match_path(path[-1]): | ||||
for node in path: | if not limit: | ||||
yield node | # 0 or None | ||||
yield from path | |||||
async def random_walk( | elif limit > 0: | ||||
self, direction, edges_fmt, retries, src, dst | yield from path[0:limit] | ||||
) -> AsyncIterator[str]: | |||||
async for node in self.walk(direction, edges_fmt, "dfs", src, dst): | |||||
yield node | |||||
async def visit_paths( | |||||
self, direction, edges_fmt, src, max_edges | |||||
) -> AsyncIterator[List[str]]: | |||||
# TODO: max_edges? | |||||
for path in self.graph.iter_paths_dfs(direction, edges_fmt, src): | |||||
if path[-1] in self.leaves(direction, edges_fmt, src): | |||||
yield list(path) | |||||
async def visit_edges( | |||||
self, direction, edges_fmt, src, max_edges | |||||
) -> AsyncIterator[Tuple[str, str]]: | |||||
if max_edges == 0: | |||||
max_edges = None | |||||
else: | else: | ||||
max_edges -= 1 | yield from path[limit:] | ||||
edges = list(self.graph.iter_edges_dfs(direction, edges_fmt, src)) | |||||
for (from_, to) in edges[:max_edges]: | @check_arguments | ||||
yield (from_, to) | def random_walk( | ||||
self, | |||||
src: str, | |||||
dst: str, | |||||
edges: str = "*", | |||||
direction: str = "forward", | |||||
limit: Optional[int] = None, | |||||
): | |||||
# TODO: limit | |||||
yield from self.walk(src, dst, edges, "dfs", direction, limit) | |||||
@check_arguments | |||||
def count_leaves( | |||||
self, src: str, edges: str = "*", direction: str = "forward" | |||||
) -> int: | |||||
return len(list(self.leaves(src, edges, direction))) | |||||
@check_arguments | |||||
def count_neighbors( | |||||
self, src: str, edges: str = "*", direction: str = "forward" | |||||
) -> int: | |||||
return len(self.graph.get_filtered_neighbors(src, edges, direction)) | |||||
@check_arguments | |||||
def count_visit_nodes( | |||||
self, src: str, edges: str = "*", direction: str = "forward" | |||||
) -> int: | |||||
return len(self.graph.get_subgraph(src, edges, direction)) | |||||
class Graph: | class Graph: | ||||
def __init__(self, nodes: List[str], edges: List[Tuple[str, str]]): | def __init__(self, nodes: List[str], edges: List[Tuple[str, str]]): | ||||
self.nodes = nodes | self.nodes = nodes | ||||
self.forward_edges: Dict[str, List[str]] = {} | self.forward_edges: Dict[str, List[str]] = {} | ||||
self.backward_edges: Dict[str, List[str]] = {} | self.backward_edges: Dict[str, List[str]] = {} | ||||
for node in nodes: | |||||
self.forward_edges[node] = [] | |||||
self.backward_edges[node] = [] | |||||
for (src, dst) in edges: | for (src, dst) in edges: | ||||
self.forward_edges.setdefault(src, []).append(dst) | self.forward_edges[src].append(dst) | ||||
self.backward_edges.setdefault(dst, []).append(src) | self.backward_edges[dst].append(src) | ||||
def numNodes(self) -> int: | |||||
return len(self.nodes) | |||||
def successors(self, node: str) -> Iterator[str]: | |||||
return JavaIterator(self.forward_edges[node]) | |||||
def outdegree(self, node: str) -> int: | |||||
return len(self.forward_edges[node]) | |||||
def predecessors(self, node: str) -> Iterator[str]: | |||||
return JavaIterator(self.backward_edges[node]) | |||||
def indegree(self, node: str) -> int: | |||||
return len(self.backward_edges[node]) | |||||
def get_filtered_neighbors( | def get_filtered_neighbors( | ||||
self, direction: str, edges_fmt: str, src: str | self, src: str, edges_fmt: str, direction: str, | ||||
) -> Set[str]: | ) -> Set[str]: | ||||
if direction == "forward": | if direction == "forward": | ||||
edges = self.forward_edges | edges = self.forward_edges | ||||
elif direction == "backward": | elif direction == "backward": | ||||
edges = self.backward_edges | edges = self.backward_edges | ||||
else: | else: | ||||
assert False, f"unknown direction {direction!r}" | raise GraphArgumentException(f"invalid direction: {direction}") | ||||
neighbors = edges.get(src, []) | neighbors = edges.get(src, []) | ||||
if edges_fmt == "*": | if edges_fmt == "*": | ||||
return set(neighbors) | return set(neighbors) | ||||
else: | else: | ||||
filtered_neighbors: Set[str] = set() | filtered_neighbors: Set[str] = set() | ||||
for edges_fmt_item in edges_fmt.split(","): | for edges_fmt_item in edges_fmt.split(","): | ||||
(src_fmt, dst_fmt) = edges_fmt_item.split(":") | (src_fmt, dst_fmt) = edges_fmt_item.split(":") | ||||
if src_fmt != "*" and not src.startswith(f"swh:1:{src_fmt}:"): | if src_fmt != "*" and not src.startswith(f"swh:1:{src_fmt}:"): | ||||
continue | continue | ||||
if dst_fmt == "*": | if dst_fmt == "*": | ||||
filtered_neighbors.update(neighbors) | filtered_neighbors.update(neighbors) | ||||
else: | else: | ||||
prefix = f"swh:1:{dst_fmt}:" | prefix = f"swh:1:{dst_fmt}:" | ||||
filtered_neighbors.update( | filtered_neighbors.update( | ||||
n for n in neighbors if n.startswith(prefix) | n for n in neighbors if n.startswith(prefix) | ||||
) | ) | ||||
return filtered_neighbors | return filtered_neighbors | ||||
def get_subgraph(self, direction: str, edges_fmt: str, src: str) -> Set[str]: | def get_subgraph(self, src: str, edges_fmt: str, direction: str) -> Set[str]: | ||||
seen = set() | seen = set() | ||||
to_visit = {src} | to_visit = {src} | ||||
while to_visit: | while to_visit: | ||||
node = to_visit.pop() | node = to_visit.pop() | ||||
seen.add(node) | seen.add(node) | ||||
neighbors = set(self.get_filtered_neighbors(direction, edges_fmt, node)) | neighbors = set(self.get_filtered_neighbors(node, edges_fmt, direction)) | ||||
new_nodes = neighbors - seen | new_nodes = neighbors - seen | ||||
to_visit.update(new_nodes) | to_visit.update(new_nodes) | ||||
return seen | return seen | ||||
def iter_paths_dfs( | def iter_paths_dfs( | ||||
self, direction: str, edges_fmt: str, src: str | self, direction: str, edges_fmt: str, src: str | ||||
) -> Iterator[Tuple[str, ...]]: | ) -> Iterator[Tuple[str, ...]]: | ||||
for (path, node) in DfsSubgraphIterator(self, direction, edges_fmt, src): | for (path, node) in DfsSubgraphIterator(self, direction, edges_fmt, src): | ||||
yield path + (node,) | yield path + (node,) | ||||
def iter_edges_dfs( | def iter_edges_dfs( | ||||
self, direction: str, edges_fmt: str, src: str | self, direction: str, edges_fmt: str, src: str | ||||
) -> Iterator[Tuple[str, ...]]: | ) -> Iterator[Tuple[str, str]]: | ||||
for (path, node) in DfsSubgraphIterator(self, direction, edges_fmt, src): | for (path, node) in DfsSubgraphIterator(self, direction, edges_fmt, src): | ||||
if len(path) > 0: | if len(path) > 0: | ||||
yield (path[-1], node) | yield (path[-1], node) | ||||
class SubgraphIterator(Iterator[Tuple[Tuple[str, ...], str]]): | class SubgraphIterator(Iterator[Tuple[Tuple[str, ...], str]]): | ||||
def __init__(self, graph: Graph, direction: str, edges_fmt: str, src: str): | def __init__(self, graph: Graph, direction: str, edges_fmt: str, src: str): | ||||
self.graph = graph | self.graph = graph | ||||
Show All 17 Lines | def __next__(self) -> Tuple[Tuple[str, ...], str]: | ||||
raise StopIteration() | raise StopIteration() | ||||
(path, node) = self.pop() | (path, node) = self.pop() | ||||
new_path = path + (node,) | new_path = path + (node,) | ||||
if node not in self.seen: | if node not in self.seen: | ||||
neighbors = self.graph.get_filtered_neighbors( | neighbors = self.graph.get_filtered_neighbors( | ||||
self.direction, self.edges_fmt, node | node, self.edges_fmt, self.direction | ||||
) | ) | ||||
# We want to visit the first neighbor first, and to_visit is a stack; | # We want to visit the first neighbor first, and to_visit is a stack; | ||||
# so we need to reversed() the list of neighbors to get it on top | # so we need to reversed() the list of neighbors to get it on top | ||||
# of the stack. | # of the stack. | ||||
for neighbor in reversed(list(neighbors)): | for neighbor in reversed(list(neighbors)): | ||||
self.push(new_path, neighbor) | self.push(new_path, neighbor) | ||||
Show All 17 Lines |