diff --git a/mypy.ini b/mypy.ini
index 354a135..fbebb85 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -1,15 +1,27 @@
[mypy]
namespace_packages = True
warn_unused_ignores = True
# 3rd party libraries without stubs (yet)
[mypy-pkg_resources.*]
ignore_missing_imports = True
[mypy-pytest.*]
ignore_missing_imports = True
[mypy-ndjson.*]
ignore_missing_imports = True
+
+[mypy-dash.*]
+ignore_missing_imports = True
+
+[mypy-dash_core_components.*]
+ignore_missing_imports = True
+
+[mypy-dash_html_components.*]
+ignore_missing_imports = True
+
+[mypy-plotly.*]
+ignore_missing_imports = True
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 822dad4..6cd7568 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,11 +1,12 @@
# Add here external Python modules dependencies, one per line. Module names
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
vcversioner
requests
aiohttp
ndjson
plotly
pandas
numpy
+dash
dulwich
diff --git a/swh/scanner/cli.py b/swh/scanner/cli.py
index e756664..dd1e76b 100644
--- a/swh/scanner/cli.py
+++ b/swh/scanner/cli.py
@@ -1,102 +1,113 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import click
import asyncio
import glob
import re
import fnmatch
from pathlib import PosixPath
from typing import Tuple
from .scanner import run
from .model import Tree
+from .plot import generate_sunburst
+from .dashboard import run_app
from .exceptions import InvalidDirectoryPath
from swh.core.cli import CONTEXT_SETTINGS
@click.group(name="scanner", context_settings=CONTEXT_SETTINGS)
@click.pass_context
def scanner(ctx):
"""Software Heritage Scanner tools."""
pass
def parse_url(url):
if not url.startswith("https://"):
url = "https://" + url
if not url.endswith("/"):
url += "/"
return url
def extract_regex_objs(root_path: PosixPath, patterns: Tuple[str]) -> object:
"""Generates a regex object for each pattern given in input and checks if
the path is a subdirectory or relative to the root path.
Yields:
an SRE_Pattern object
"""
for pattern in patterns:
for path in glob.glob(pattern):
dirpath = PosixPath(path)
if root_path not in dirpath.parents:
error_msg = (
f'The path "{dirpath}" is not a subdirectory or relative '
f'to the root directory path: "{root_path}"'
)
raise InvalidDirectoryPath(error_msg)
if glob.glob(pattern):
regex = fnmatch.translate(str(PosixPath(pattern)))
yield re.compile(regex)
@scanner.command(name="scan")
@click.argument("root_path", required=True, type=click.Path(exists=True))
@click.option(
"-u",
"--api-url",
default="https://archive.softwareheritage.org/api/1",
metavar="API_URL",
show_default=True,
help="url for the api request",
)
@click.option(
"--exclude",
"-x",
"patterns",
metavar="PATTERN",
multiple=True,
help="recursively exclude a specific pattern",
)
@click.option(
"-f",
"--format",
type=click.Choice(["text", "json", "ndjson", "sunburst"], case_sensitive=False),
default="text",
help="select the output format",
)
+@click.option(
+ "-i", "--interactive", is_flag=True, help="show the result in a dashboard"
+)
@click.pass_context
-def scan(ctx, root_path, api_url, patterns, format):
+def scan(ctx, root_path, api_url, patterns, format, interactive):
"""Scan a source code project to discover files and directories already
present in the archive"""
sre_patterns = set()
if patterns:
sre_patterns = {
reg_obj for reg_obj in extract_regex_objs(PosixPath(root_path), patterns)
}
api_url = parse_url(api_url)
source_tree = Tree(PosixPath(root_path))
loop = asyncio.get_event_loop()
loop.run_until_complete(run(root_path, api_url, source_tree, sre_patterns))
- source_tree.show(format)
+ if interactive:
+ root = PosixPath(root_path)
+ directories = source_tree.getDirectoriesInfo(root)
+ figure = generate_sunburst(directories, root)
+ run_app(figure, source_tree)
+ else:
+ source_tree.show(format)
if __name__ == "__main__":
scan()
diff --git a/swh/scanner/dashboard.py b/swh/scanner/dashboard.py
new file mode 100644
index 0000000..ade5645
--- /dev/null
+++ b/swh/scanner/dashboard.py
@@ -0,0 +1,24 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from .model import Tree
+
+import plotly.graph_objects as go
+import dash
+import dash_core_components as dcc
+import dash_html_components as html
+
+
+def run_app(graph_obj: go, source: Tree):
+ app = dash.Dash(__name__)
+ fig = go.Figure().add_trace(graph_obj)
+
+ fig.update_layout(height=800,)
+
+ app.layout = html.Div(
+ [html.Div([html.Div([dcc.Graph(id="sunburst_chart", figure=fig),]),]),]
+ )
+
+ app.run_server(debug=True, use_reloader=False)
diff --git a/swh/scanner/model.py b/swh/scanner/model.py
index 3267193..5502fd3 100644
--- a/swh/scanner/model.py
+++ b/swh/scanner/model.py
@@ -1,228 +1,229 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
import sys
import json
from pathlib import PosixPath
from typing import Any, Dict, Tuple, Iterable
from enum import Enum
import ndjson
-from .plot import sunburst
+from .plot import generate_sunburst, offline_plot
from .exceptions import InvalidObjectType
from swh.model.identifiers import DIRECTORY, CONTENT
class Color(Enum):
blue = "\033[94m"
green = "\033[92m"
red = "\033[91m"
end = "\033[0m"
def colorize(text: str, color: Color):
return color.value + text + Color.end.value
class Tree:
"""Representation of a file system structure
"""
def __init__(self, path: PosixPath, father: Tree = None):
self.father = father
self.path = path
self.otype = DIRECTORY if path.is_dir() else CONTENT
self.swhid = ""
self.known = False
self.children: Dict[PosixPath, Tree] = {}
def addNode(self, path: PosixPath, swhid: str, known: bool) -> None:
"""Recursively add a new path.
"""
relative_path = path.relative_to(self.path)
if relative_path == PosixPath("."):
self.swhid = swhid
self.known = known
return
new_path = self.path.joinpath(relative_path.parts[0])
if new_path not in self.children:
self.children[new_path] = Tree(new_path, self)
self.children[new_path].addNode(path, swhid, known)
def show(self, format) -> None:
"""Show tree in different formats"""
if format == "json":
print(json.dumps(self.toDict(), indent=4, sort_keys=True))
if format == "ndjson":
print(ndjson.dumps(dict_path for dict_path in self.iterate()))
elif format == "text":
isatty = sys.stdout.isatty()
print(colorize(str(self.path), Color.blue) if isatty else str(self.path))
self.printChildren(isatty)
elif format == "sunburst":
root = self.path
directories = self.getDirectoriesInfo(root)
- sunburst(directories, root)
+ sunburst = generate_sunburst(directories, root)
+ offline_plot(sunburst)
def printChildren(self, isatty: bool, inc: int = 1) -> None:
for path, node in self.children.items():
self.printNode(node, isatty, inc)
if node.children:
node.printChildren(isatty, inc + 1)
def printNode(self, node: Any, isatty: bool, inc: int) -> None:
rel_path = str(node.path.relative_to(self.path))
begin = "│ " * inc
end = "/" if node.otype == DIRECTORY else ""
if isatty:
if not node.known:
rel_path = colorize(rel_path, Color.red)
elif node.otype == DIRECTORY:
rel_path = colorize(rel_path, Color.blue)
elif node.otype == CONTENT:
rel_path = colorize(rel_path, Color.green)
print(f"{begin}{rel_path}{end}")
@property
def attributes(self):
"""
Get the attributes of the current node grouped by the relative path.
Returns:
a dictionary containing a path as key and its known/unknown status and the
Software Heritage persistent identifier as values.
"""
return {str(self.path): {"swhid": self.swhid, "known": self.known,}}
def toDict(self, dict_nodes={}) -> Dict[str, Dict[str, Dict]]:
"""
Recursively groups the current child nodes inside a dictionary.
For example, if you have the following structure:
.. code-block:: none
root {
subdir: {
file.txt
}
}
The generated dictionary will be:
.. code-block:: none
{
"root": {
"swhid": "...",
"known": True/False
}
"root/subdir": {
"swhid": "...",
"known": True/False
}
"root/subdir/file.txt": {
"swhid": "...",
"known": True/False
}
}
"""
for node_dict in self.iterate():
dict_nodes.update(node_dict)
return dict_nodes
def iterate(self) -> Iterable[Dict[str, Dict]]:
"""
Recursively iterate through the children of the current node
Yields:
a dictionary containing a path with its known/unknown status and the
Software Heritage persistent identifier
"""
for _, child_node in self.children.items():
yield child_node.attributes
if child_node.otype == DIRECTORY:
yield from child_node.iterate()
def __getSubDirsInfo(self, root, directories):
"""Fills the directories given in input with the contents information
stored inside the directory child, only if they have contents.
"""
for path, child_node in self.children.items():
if child_node.otype == DIRECTORY:
rel_path = path.relative_to(root)
contents_info = child_node.count_contents()
# checks the first element of the tuple
# (the number of contents in a directory)
# if it is equal to zero it means that there are no contents
# in that directory.
if not contents_info[0] == 0:
directories[rel_path] = contents_info
if child_node.has_dirs():
child_node.__getSubDirsInfo(root, directories)
def getDirectoriesInfo(self, root: PosixPath) -> Dict[PosixPath, Tuple[int, int]]:
"""Get information about all directories under the given root.
Returns:
A dictionary with a directory path as key and the relative
contents information (the result of count_contents) as values.
"""
directories = {root: self.count_contents()}
self.__getSubDirsInfo(root, directories)
return directories
def count_contents(self) -> Tuple[int, int]:
"""Count how many contents are present inside a directory.
If a directory has a pid returns as it has all the contents.
Returns:
A tuple with the total number of the contents and the number
of contents known (the ones that have a persistent identifier).
"""
contents = 0
discovered = 0
if not self.otype == DIRECTORY:
raise InvalidObjectType(
"Can't calculate contents of the " "object type: %s" % self.otype
)
if self.known:
# to identify a directory with all files/directories present
return (1, 1)
else:
for _, child_node in self.children.items():
if child_node.otype == CONTENT:
contents += 1
if child_node.known:
discovered += 1
return (contents, discovered)
def has_dirs(self) -> bool:
"""Checks if node has directories
"""
for _, child_node in self.children.items():
if child_node.otype == DIRECTORY:
return True
return False
diff --git a/swh/scanner/plot.py b/swh/scanner/plot.py
index 18be96b..a1ccf6b 100644
--- a/swh/scanner/plot.py
+++ b/swh/scanner/plot.py
@@ -1,274 +1,281 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
The purpose of this module is to display and to interact with the result of the
scanner contained in the model.
The `sunburst` function generates a navigable sunburst chart from the
directories information retrieved from the model. The chart displays for
each directory the total number of files and the percentage of file known.
The size of the directory is defined by the total number of contents whereas
the color gradient is generated relying on the percentage of contents known.
"""
from typing import List, Dict, Tuple
from pathlib import PosixPath
-from plotly.offline import offline # type: ignore
-import plotly.graph_objects as go # type: ignore
+from plotly.offline import offline
+import plotly.graph_objects as go
import pandas as pd # type: ignore
import numpy as np # type: ignore
def build_hierarchical_df(
dirs_dataframe: pd.DataFrame,
levels: List[str],
metrics_columns: List[str],
root_name: str,
) -> pd.DataFrame:
"""
Build a hierarchy of levels for Sunburst or Treemap charts.
For each directory the new dataframe will have the following
information:
id: the directory name
parent: the parent directory of id
contents: the total number of contents of the directory id and
the relative subdirectories
known: the percentage of contents known relative to computed
'contents'
Example:
Given the following dataframe:
.. code-block:: none
lev0 lev1 contents known
'' '' 20 2 //root
kernel kernel/subdirker 5 0
telnet telnet/subdirtel 10 4
The output hierarchical dataframe will be like the following:
.. code-block:: none
id parent contents known
20 10.00
kernel/subdirker kernel 5 0.00
telnet/subdirtel telnet 10 40.00
total 20 10.00
kernel total 5 0.00
telnet total 10 40.00
total 35 17.14
To create the hierarchical dataframe we need to iterate through
the dataframe given in input relying on the number of levels.
Based on the previous example we have to do two iterations:
iteration 1
The generated dataframe 'df_tree' will be:
.. code-block:: none
id parent contents known
20 10.0
kernel/subdirker kernel 5 0.0
telnet/subdirtel telnet 10 40.0
iteration 2
The generated dataframe 'df_tree' will be:
.. code-block:: none
id parent contents known
total 20 10.0
kernel total 5 0.0
telnet total 10 40.0
Note that since we have reached the last level, the parent given
to the directory id is the directory root.
The 'total' row il computed by adding the number of contents of the
dataframe given in input and the average of the contents known on
the total number of contents.
"""
def compute_known_percentage(contents: pd.Series, known: pd.Series) -> pd.Series:
"""This function compute the percentage of known contents and generate
the new known column with the percentage values.
It also assures that if there is no contents inside a directory
the percentage is zero
"""
known_values = []
for idx, content_val in enumerate(contents):
if content_val == 0:
known_values.append(0)
else:
percentage = known[idx] / contents[idx] * 100
known_values.append(percentage)
return pd.Series(np.array(known_values))
complete_df = pd.DataFrame(columns=["id", "parent", "contents", "known"])
# revert the level order to start from the deepest
levels = [level for level in reversed(levels)]
contents_col = metrics_columns[0]
known_col = metrics_columns[1]
df_tree_list = []
for i, level in enumerate(levels):
df_tree = pd.DataFrame(columns=["id", "parent", "contents", "known"])
dfg = dirs_dataframe.groupby(levels[i:]).sum()
dfg = dfg.reset_index()
df_tree["id"] = dfg[level].copy()
if i < len(levels) - 1:
# copy the parent directories (one level above)
df_tree["parent"] = dfg[levels[i + 1]].copy()
else:
# last level reached
df_tree["parent"] = root_name
# copy the contents column
df_tree["contents"] = dfg[contents_col]
# compute the percentage relative to the contents
df_tree["known"] = compute_known_percentage(dfg[contents_col], dfg[known_col])
df_tree_list.append(df_tree)
complete_df = complete_df.append(df_tree_list, ignore_index=True)
# create the main parent
total_contents = dirs_dataframe[contents_col].sum()
total_known = dirs_dataframe[known_col].sum()
total_avg = total_known / total_contents * 100
total = pd.Series(
dict(id=root_name, parent="", contents=total_contents, known=total_avg)
)
complete_df = complete_df.append(total, ignore_index=True)
return complete_df
def compute_max_depth(dirs_path: List[PosixPath], root: PosixPath) -> int:
"""Compute the maximum depth level of the given directory paths.
Example: for `var/log/kernel/` the depth level is 3
"""
max_depth = 0
for dir_path in dirs_path:
if dir_path == root:
continue
dir_depth = len(dir_path.parts)
if dir_depth > max_depth:
max_depth = dir_depth
return max_depth
def generate_df_from_dirs(
dirs: Dict[PosixPath, Tuple[int, int]],
columns: List[str],
root: PosixPath,
max_depth: int,
) -> pd.DataFrame:
"""Generate a dataframe from the directories given in input.
Example:
given the following directories as input
.. code-block:: python
dirs = {
'/var/log/': (23, 2),
'/var/log/kernel': (5, 0),
'/var/log/telnet': (10, 3)
}
The generated dataframe will be:
.. code-block:: none
lev0 lev1 lev2 contents known
'var' 'var/log' '' 23 2
'var' 'var/log' 'var/log/kernel' 5 0
'var' 'var/log' 'var/log/telnet' 10 3
"""
def get_parents(path: PosixPath):
parts = path.parts[1:] if path.parts[0] == "/" else path.parts
for i in range(1, len(parts) + 1):
yield "/".join(parts[0:i])
def get_dirs_array():
for dir_path, contents_info in dirs.items():
empty_lvl = max_depth - len(dir_path.parts)
if dir_path == root:
# ignore the root but store contents information
yield [""] * (max_depth) + list(contents_info)
else:
yield list(get_parents(dir_path)) + [""] * empty_lvl + list(
contents_info
)
df = pd.DataFrame(
np.array([dir_array for dir_array in get_dirs_array()]), columns=columns
)
df["contents"] = pd.to_numeric(df["contents"])
df["known"] = pd.to_numeric(df["known"])
return df
-def sunburst(directories: Dict[PosixPath, Tuple[int, int]], root: PosixPath) -> None:
- """Show the sunburst chart from the directories given in input.
+def generate_sunburst(
+ directories: Dict[PosixPath, Tuple[int, int]], root: PosixPath
+) -> go.Sunburst:
+ """Generate a sunburst chart from the directories given in input.
"""
max_depth = compute_max_depth(list(directories.keys()), root)
metrics_columns = ["contents", "known"]
levels_columns = ["lev" + str(i) for i in range(max_depth)]
df_columns = levels_columns + metrics_columns
dirs_df = generate_df_from_dirs(directories, df_columns, root, max_depth)
hierarchical_df = build_hierarchical_df(
dirs_df, levels_columns, metrics_columns, str(root)
)
- fig = go.Figure()
- fig.add_trace(
- go.Sunburst(
- labels=hierarchical_df["id"],
- parents=hierarchical_df["parent"],
- values=hierarchical_df["contents"],
- branchvalues="total",
- marker=dict(
- colors=hierarchical_df["known"],
- colorscale="matter",
- cmid=50,
- showscale=True,
- ),
- hovertemplate="""%{label}
-
Files: %{value}
-
Known: %{color:.2f}%""",
- name="",
- )
+ sunburst = go.Sunburst(
+ labels=hierarchical_df["id"],
+ parents=hierarchical_df["parent"],
+ values=hierarchical_df["contents"],
+ branchvalues="total",
+ marker=dict(
+ colors=hierarchical_df["known"],
+ colorscale="matter",
+ cmid=50,
+ showscale=True,
+ ),
+ hovertemplate="""%{label}
+
Files: %{value}
+
Known: %{color:.2f}%""",
+ name="",
)
- offline.plot(fig, filename="sunburst.html")
+ return sunburst
+
+
+def offline_plot(graph_object: go):
+ """Plot a graph object to an html file
+ """
+ fig = go.Figure()
+ fig.add_trace(graph_object)
+ offline.plot(fig, filename="chart.html")