diff --git a/setup.py b/setup.py
index 84e7e72..f623637 100755
--- a/setup.py
+++ b/setup.py
@@ -1,74 +1,74 @@
#!/usr/bin/env python3
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from setuptools import setup, find_packages
-
-from os import path
from io import open
+from os import path
+
+from setuptools import find_packages, setup
here = path.abspath(path.dirname(__file__))
# Get the long description from the README file
with open(path.join(here, "README.md"), encoding="utf-8") as f:
long_description = f.read()
def parse_requirements(name=None):
if name:
reqf = "requirements-%s.txt" % name
else:
reqf = "requirements.txt"
requirements = []
if not path.exists(reqf):
return requirements
with open(reqf) as f:
for line in f.readlines():
line = line.strip()
if not line or line.startswith("#"):
continue
requirements.append(line)
return requirements
# Edit this part to match your module.
# Full sample:
# https://forge.softwareheritage.org/diffusion/DCORE/browse/master/setup.py
setup(
name="swh.scanner",
description="Software Heritage code scanner",
long_description=long_description,
long_description_content_type="text/markdown",
python_requires=">=3.7",
author="Software Heritage developers",
author_email="swh-devel@inria.fr",
url="https://forge.softwareheritage.org/diffusion/DTSCN/",
packages=find_packages(), # packages's modules
install_requires=parse_requirements() + parse_requirements("swh"),
tests_require=parse_requirements("test"),
setup_requires=["setuptools-scm"],
use_scm_version=True,
extras_require={"testing": parse_requirements("test")},
include_package_data=True,
entry_points="""
[swh.cli.subcommands]
scanner=swh.scanner.cli:scanner
""",
classifiers=[
"Programming Language :: Python :: 3",
"Intended Audience :: Developers",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: OS Independent",
"Development Status :: 3 - Alpha",
],
project_urls={
"Bug Reports": "https://forge.softwareheritage.org/maniphest",
"Funding": "https://www.softwareheritage.org/donate",
"Source": "https://forge.softwareheritage.org/source/swh-scanner",
"Documentation": "https://docs.softwareheritage.org/devel/swh-scanner/",
},
)
diff --git a/swh/scanner/cli.py b/swh/scanner/cli.py
index 032a829..0f53cd7 100644
--- a/swh/scanner/cli.py
+++ b/swh/scanner/cli.py
@@ -1,109 +1,108 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import os
from typing import Any, Dict
import click
from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS
-
# All generic config code should reside in swh.core.config
DEFAULT_CONFIG_PATH = os.environ.get(
"SWH_CONFIG_FILE", os.path.join(click.get_app_dir("swh"), "global.yml")
)
DEFAULT_CONFIG: Dict[str, Any] = {
"web-api": {
"url": "https://archive.softwareheritage.org/api/1/",
"auth-token": None,
}
}
def parse_url(url):
"""CLI-specific helper to 'autocomplete' the provided url."""
if not url.startswith("https://"):
url = "https://" + url
if not url.endswith("/"):
url += "/"
return url
@click.group(name="scanner", context_settings=CONTEXT_SETTINGS)
@click.option(
"-C",
"--config-file",
default=DEFAULT_CONFIG_PATH,
type=click.Path(exists=True, dir_okay=False, path_type=str),
help="YAML configuration file",
)
@click.pass_context
def scanner(ctx, config_file: str):
"""Software Heritage Scanner tools."""
# recursive merge not done by config.read
conf = config.read_raw_config(config.config_basepath(config_file))
conf = config.merge_configs(DEFAULT_CONFIG, conf)
ctx.ensure_object(dict)
ctx.obj["config"] = conf
@scanner.command(name="scan")
@click.argument("root_path", required=True, type=click.Path(exists=True))
@click.option(
"-u",
"--api-url",
default=None,
metavar="API_URL",
show_default=True,
help="URL for the api request",
)
@click.option(
"--exclude",
"-x",
"patterns",
metavar="PATTERN",
multiple=True,
help="Exclude directories using glob patterns \
(e.g., '*.git' to exclude all .git directories)",
)
@click.option(
"-f",
"--output-format",
"out_fmt",
default="text",
show_default=True,
type=click.Choice(["text", "json", "ndjson", "sunburst"], case_sensitive=False),
help="The output format",
)
@click.option(
"-i", "--interactive", is_flag=True, help="Show the result in a dashboard"
)
@click.pass_context
def scan(ctx, root_path, api_url, patterns, out_fmt, interactive):
"""Scan a source code project to discover files and directories already
present in the archive"""
from .scanner import scan
config = ctx.obj["config"]
if api_url:
config["web-api"]["url"] = parse_url(api_url)
scan(config, root_path, patterns, out_fmt, interactive)
def main():
return scanner(auto_envvar_prefix="SWH_SCANNER")
if __name__ == "__main__":
main()
diff --git a/swh/scanner/dashboard/dashboard.py b/swh/scanner/dashboard/dashboard.py
index 71d4c24..4bc6c29 100644
--- a/swh/scanner/dashboard/dashboard.py
+++ b/swh/scanner/dashboard/dashboard.py
@@ -1,101 +1,101 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from pathlib import Path
-from ..model import Tree
-
-import plotly.graph_objects as go
import dash
+from dash.dependencies import Input, Output
+import dash_bootstrap_components as dbc
import dash_core_components as dcc
import dash_html_components as html
-import dash_bootstrap_components as dbc
-from dash.dependencies import Input, Output
+import plotly.graph_objects as go
+
+from ..model import Tree
def generate_table_body(dir_path: Path, source: Tree):
"""
Generate the data_table from the path taken from the chart.
For each file builds the html table rows showing the known status, a local link to
the file and the relative SoftWare Heritage persistent IDentifier (SWHID).
"""
data = []
for file_info in source.getFilesFromDir(dir_path):
for file_path, attr in file_info.items():
file_path = Path(file_path)
file_name = file_path.parts[len(file_path.parts) - 1]
data.append(
html.Tr(
[
html.Td("✔" if attr["known"] else ""),
html.Td(
html.A(file_name, href="file://" + str(file_path.resolve()))
),
html.Td(attr["swhid"]),
]
)
)
return [html.Tbody(data)]
def run_app(graph_obj: go, source: Tree):
app = dash.Dash(__name__)
fig = go.Figure().add_trace(graph_obj)
fig.update_layout(height=800,)
table_header = [
html.Thead(html.Tr([html.Th("KNOWN"), html.Th("FILE NAME"), html.Th("SWHID")]))
]
app.layout = html.Div(
[
html.Div(
[
html.Div(
[dcc.Graph(id="sunburst_chart", figure=fig),], className="col",
),
html.Div(
[
html.H3(id="directory_title"),
dbc.Table(
id="files_table",
hover=True,
responsive=True,
striped=True,
),
],
className="col",
),
],
className="row",
),
]
)
@app.callback(
[Output("files_table", "children"), Output("directory_title", "children")],
[Input("sunburst_chart", "clickData")],
)
def update_files_table(click_data):
"""
Callback that takes the input (directory path) from the chart and
update the `files_table` children with the relative files.
"""
if click_data is not None:
raw_path = click_data["points"][0]["label"]
full_path = (
source.path.joinpath(raw_path)
if raw_path != str(source.path)
else Path(raw_path)
)
return table_header + generate_table_body(full_path, source), str(full_path)
else:
return "", ""
app.run_server(debug=True, use_reloader=True)
diff --git a/swh/scanner/model.py b/swh/scanner/model.py
index 997e65b..8b27a5e 100644
--- a/swh/scanner/model.py
+++ b/swh/scanner/model.py
@@ -1,265 +1,267 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
-import sys
+
+from enum import Enum
import json
from pathlib import Path
-from typing import Any, Dict, Tuple, Iterable, List
-from enum import Enum
+import sys
+from typing import Any, Dict, Iterable, List, Tuple
import ndjson
+from swh.model.identifiers import CONTENT, DIRECTORY
+
+from .exceptions import InvalidDirectoryPath, InvalidObjectType
from .plot import generate_sunburst, offline_plot
-from .exceptions import InvalidObjectType, InvalidDirectoryPath
-from swh.model.identifiers import DIRECTORY, CONTENT
class Color(Enum):
blue = "\033[94m"
green = "\033[92m"
red = "\033[91m"
end = "\033[0m"
def colorize(text: str, color: Color):
return color.value + text + Color.end.value
class Tree:
"""Representation of a file system structure
"""
def __init__(self, path: Path, father: Tree = None):
self.father = father
self.path = path
self.otype = DIRECTORY if path.is_dir() else CONTENT
self.swhid = ""
self.known = False
self.children: Dict[Path, Tree] = {}
def addNode(self, path: Path, swhid: str, known: bool) -> None:
"""Recursively add a new path.
"""
relative_path = path.relative_to(self.path)
if relative_path == Path("."):
self.swhid = swhid
self.known = known
return
new_path = self.path.joinpath(relative_path.parts[0])
if new_path not in self.children:
self.children[new_path] = Tree(new_path, self)
self.children[new_path].addNode(path, swhid, known)
def show(self, format) -> None:
"""Show tree in different formats"""
if format == "json":
print(json.dumps(self.toDict(), indent=4, sort_keys=True))
if format == "ndjson":
print(ndjson.dumps(dict_path for dict_path in self.__iterNodesAttr()))
elif format == "text":
isatty = sys.stdout.isatty()
print(colorize(str(self.path), Color.blue) if isatty else str(self.path))
self.printChildren(isatty)
elif format == "sunburst":
root = self.path
directories = self.getDirectoriesInfo(root)
sunburst = generate_sunburst(directories, root)
offline_plot(sunburst)
def printChildren(self, isatty: bool, inc: int = 1) -> None:
for path, node in self.children.items():
self.printNode(node, isatty, inc)
if node.children:
node.printChildren(isatty, inc + 1)
def printNode(self, node: Any, isatty: bool, inc: int) -> None:
rel_path = str(node.path.relative_to(self.path))
begin = "│ " * inc
end = "/" if node.otype == DIRECTORY else ""
if isatty:
if not node.known:
rel_path = colorize(rel_path, Color.red)
elif node.otype == DIRECTORY:
rel_path = colorize(rel_path, Color.blue)
elif node.otype == CONTENT:
rel_path = colorize(rel_path, Color.green)
print(f"{begin}{rel_path}{end}")
@property
def attributes(self):
"""
Get the attributes of the current node grouped by the relative path.
Returns:
a dictionary containing a path as key and its known/unknown status and the
SWHID as values.
"""
return {str(self.path): {"swhid": self.swhid, "known": self.known,}}
def toDict(self, dict_nodes={}) -> Dict[str, Dict[str, Dict]]:
"""
Recursively groups the current child nodes inside a dictionary.
For example, if you have the following structure:
.. code-block:: none
root {
subdir: {
file.txt
}
}
The generated dictionary will be:
.. code-block:: none
{
"root": {
"swhid": "...",
"known": True/False
}
"root/subdir": {
"swhid": "...",
"known": True/False
}
"root/subdir/file.txt": {
"swhid": "...",
"known": True/False
}
}
"""
for node_dict in self.__iterNodesAttr():
dict_nodes.update(node_dict)
return dict_nodes
def iterate(self) -> Iterable[Tree]:
"""
Recursively iterate through the children of the current node
"""
for _, child_node in self.children.items():
yield child_node
if child_node.otype == DIRECTORY:
yield from child_node.iterate()
def __iterNodesAttr(self) -> Iterable[Dict[str, Dict]]:
"""
Recursively iterate through the children of the current node returning
an iterable of the children nodes attributes
Yields:
a dictionary containing a path with its known/unknown status and the
SWHID
"""
for child_node in self.iterate():
yield child_node.attributes
if child_node.otype == DIRECTORY:
yield from child_node.__iterNodesAttr()
def getFilesFromDir(self, dir_path: Path) -> List:
"""
Retrieve files information about a specific directory path
Returns:
A list containing the files attributes present inside the directory given
in input
"""
def getFiles(node):
files = []
for _, node in node.children.items():
if node.otype == CONTENT:
files.append(node.attributes)
return files
if dir_path == self.path:
return getFiles(self)
else:
for node in self.iterate():
if node.path == dir_path:
return getFiles(node)
raise InvalidDirectoryPath(
"The directory provided doesn't match any stored directory"
)
def __getSubDirsInfo(self, root, directories):
"""Fills the directories given in input with the contents information
stored inside the directory child, only if they have contents.
"""
for path, child_node in self.children.items():
if child_node.otype == DIRECTORY:
rel_path = path.relative_to(root)
contents_info = child_node.count_contents()
# checks the first element of the tuple
# (the number of contents in a directory)
# if it is equal to zero it means that there are no contents
# in that directory.
if not contents_info[0] == 0:
directories[rel_path] = contents_info
if child_node.has_dirs():
child_node.__getSubDirsInfo(root, directories)
def getDirectoriesInfo(self, root: Path) -> Dict[Path, Tuple[int, int]]:
"""Get information about all directories under the given root.
Returns:
A dictionary with a directory path as key and the relative
contents information (the result of count_contents) as values.
"""
directories = {root: self.count_contents()}
self.__getSubDirsInfo(root, directories)
return directories
def count_contents(self) -> Tuple[int, int]:
"""Count how many contents are present inside a directory.
If a directory has a SWHID returns as it has all the contents.
Returns:
A tuple with the total number of the contents and the number
of contents known (the ones that have a persistent identifier).
"""
contents = 0
discovered = 0
if not self.otype == DIRECTORY:
raise InvalidObjectType(
"Can't calculate contents of the " "object type: %s" % self.otype
)
if self.known:
# to identify a directory with all files/directories present
return (1, 1)
else:
for _, child_node in self.children.items():
if child_node.otype == CONTENT:
contents += 1
if child_node.known:
discovered += 1
return (contents, discovered)
def has_dirs(self) -> bool:
"""Checks if node has directories
"""
for _, child_node in self.children.items():
if child_node.otype == DIRECTORY:
return True
return False
diff --git a/swh/scanner/plot.py b/swh/scanner/plot.py
index 961d677..ff14125 100644
--- a/swh/scanner/plot.py
+++ b/swh/scanner/plot.py
@@ -1,278 +1,278 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
The purpose of this module is to display and to interact with the result of the
scanner contained in the model.
The `sunburst` function generates a navigable sunburst chart from the
directories information retrieved from the model. The chart displays for
each directory the total number of files and the percentage of file known.
The size of the directory is defined by the total number of contents whereas
the color gradient is generated relying on the percentage of contents known.
"""
-from typing import List, Dict, Tuple
from pathlib import Path
+from typing import Dict, List, Tuple
-from plotly.offline import offline
-import plotly.graph_objects as go
-import pandas as pd # type: ignore
import numpy as np # type: ignore
+import pandas as pd # type: ignore
+import plotly.graph_objects as go
+from plotly.offline import offline
def build_hierarchical_df(
dirs_dataframe: pd.DataFrame,
levels: List[str],
metrics_columns: List[str],
root_name: str,
) -> pd.DataFrame:
"""
Build a hierarchy of levels for Sunburst or Treemap charts.
For each directory the new dataframe will have the following
information:
id: the directory name
parent: the parent directory of id
contents: the total number of contents of the directory id and
the relative subdirectories
known: the percentage of contents known relative to computed
'contents'
Example:
Given the following dataframe:
.. code-block:: none
lev0 lev1 contents known
'' '' 20 2 //root
kernel kernel/subdirker 5 0
telnet telnet/subdirtel 10 4
The output hierarchical dataframe will be like the following:
.. code-block:: none
id parent contents known
20 10.00
kernel/subdirker kernel 5 0.00
telnet/subdirtel telnet 10 40.00
total 20 10.00
kernel total 5 0.00
telnet total 10 40.00
total 35 17.14
To create the hierarchical dataframe we need to iterate through
the dataframe given in input relying on the number of levels.
Based on the previous example we have to do two iterations:
iteration 1
The generated dataframe 'df_tree' will be:
.. code-block:: none
id parent contents known
20 10.0
kernel/subdirker kernel 5 0.0
telnet/subdirtel telnet 10 40.0
iteration 2
The generated dataframe 'df_tree' will be:
.. code-block:: none
id parent contents known
total 20 10.0
kernel total 5 0.0
telnet total 10 40.0
Note that since we have reached the last level, the parent given
to the directory id is the directory root.
The 'total' row il computed by adding the number of contents of the
dataframe given in input and the average of the contents known on
the total number of contents.
"""
def compute_known_percentage(contents: pd.Series, known: pd.Series) -> pd.Series:
"""This function compute the percentage of known contents and generate
the new known column with the percentage values.
It also assures that if there is no contents inside a directory
the percentage is zero
"""
known_values = []
for idx, content_val in enumerate(contents):
if content_val == 0:
known_values.append(0)
else:
percentage = known[idx] / contents[idx] * 100
known_values.append(percentage)
return pd.Series(np.array(known_values))
complete_df = pd.DataFrame(columns=["id", "parent", "contents", "known"])
# revert the level order to start from the deepest
levels = [level for level in reversed(levels)]
contents_col = metrics_columns[0]
known_col = metrics_columns[1]
df_tree_list = []
for i, level in enumerate(levels):
df_tree = pd.DataFrame(columns=["id", "parent", "contents", "known"])
dfg = dirs_dataframe.groupby(levels[i:]).sum()
dfg = dfg.reset_index()
df_tree["id"] = dfg[level].copy()
if i < len(levels) - 1:
# copy the parent directories (one level above)
df_tree["parent"] = dfg[levels[i + 1]].copy()
else:
# last level reached
df_tree["parent"] = root_name
# copy the contents column
df_tree["contents"] = dfg[contents_col]
# compute the percentage relative to the contents
df_tree["known"] = compute_known_percentage(dfg[contents_col], dfg[known_col])
df_tree_list.append(df_tree)
complete_df = complete_df.append(df_tree_list, ignore_index=True)
# create the main parent
total_contents = dirs_dataframe[contents_col].sum()
total_known = dirs_dataframe[known_col].sum()
total_avg = total_known / total_contents * 100
total = pd.Series(
dict(id=root_name, parent="", contents=total_contents, known=total_avg)
)
complete_df = complete_df.append(total, ignore_index=True)
return complete_df
def compute_max_depth(dirs_path: List[Path], root: Path) -> int:
"""Compute the maximum depth level of the given directory paths.
Example: for `var/log/kernel/` the depth level is 3
"""
max_depth = 0
for dir_path in dirs_path:
if dir_path == root:
continue
dir_depth = len(dir_path.parts)
if dir_depth > max_depth:
max_depth = dir_depth
return max_depth
def generate_df_from_dirs(
dirs: Dict[Path, Tuple[int, int]], columns: List[str], root: Path, max_depth: int,
) -> pd.DataFrame:
"""Generate a dataframe from the directories given in input.
Example:
given the following directories as input
.. code-block:: python
dirs = {
'/var/log/': (23, 2),
'/var/log/kernel': (5, 0),
'/var/log/telnet': (10, 3)
}
The generated dataframe will be:
.. code-block:: none
lev0 lev1 lev2 contents known
'var' 'var/log' '' 23 2
'var' 'var/log' 'var/log/kernel' 5 0
'var' 'var/log' 'var/log/telnet' 10 3
"""
def get_parents(path: Path):
parts = path.parts[1:] if path.parts[0] == "/" else path.parts
for i in range(1, len(parts) + 1):
yield "/".join(parts[0:i])
def get_dirs_array():
for dir_path, contents_info in dirs.items():
empty_lvl = max_depth - len(dir_path.parts)
if dir_path == root:
# ignore the root but store contents information
yield [""] * (max_depth) + list(contents_info)
else:
yield list(get_parents(dir_path)) + [""] * empty_lvl + list(
contents_info
)
df = pd.DataFrame(
np.array([dir_array for dir_array in get_dirs_array()]), columns=columns
)
df["contents"] = pd.to_numeric(df["contents"])
df["known"] = pd.to_numeric(df["known"])
return df
def generate_sunburst(
directories: Dict[Path, Tuple[int, int]], root: Path
) -> go.Sunburst:
"""Generate a sunburst chart from the directories given in input.
"""
max_depth = compute_max_depth(list(directories.keys()), root)
metrics_columns = ["contents", "known"]
levels_columns = ["lev" + str(i) for i in range(max_depth)]
df_columns = levels_columns + metrics_columns
dirs_df = generate_df_from_dirs(directories, df_columns, root, max_depth)
hierarchical_df = build_hierarchical_df(
dirs_df, levels_columns, metrics_columns, str(root)
)
sunburst = go.Sunburst(
labels=hierarchical_df["id"],
parents=hierarchical_df["parent"],
values=hierarchical_df["contents"],
branchvalues="total",
marker=dict(
colors=hierarchical_df["known"],
colorscale="matter",
cmid=50,
showscale=True,
),
hovertemplate="""%{label}
Files: %{value}
Known: %{color:.2f}%""",
name="",
)
return sunburst
def offline_plot(graph_object: go):
"""Plot a graph object to an html file
"""
fig = go.Figure()
fig.add_trace(graph_object)
offline.plot(fig, filename="chart.html")
diff --git a/swh/scanner/scanner.py b/swh/scanner/scanner.py
index ea89524..10a56c3 100644
--- a/swh/scanner/scanner.py
+++ b/swh/scanner/scanner.py
@@ -1,250 +1,245 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
import fnmatch
import glob
import itertools
import os
from pathlib import Path
import re
-from typing import List, Dict, Tuple, Iterator, Union, Iterable, Pattern, Any
+from typing import Any, Dict, Iterable, Iterator, List, Pattern, Tuple, Union
import aiohttp
-from swh.model.from_disk import Directory, Content, accept_all_directories
-from swh.model.identifiers import (
- swhid,
- parse_swhid,
- DIRECTORY,
- CONTENT,
-)
+from swh.model.from_disk import Content, Directory, accept_all_directories
+from swh.model.identifiers import CONTENT, DIRECTORY, parse_swhid, swhid
+from .dashboard.dashboard import run_app
from .exceptions import InvalidDirectoryPath, error_response
from .model import Tree
from .plot import generate_sunburst
-from .dashboard.dashboard import run_app
async def swhids_discovery(
swhids: List[str], session: aiohttp.ClientSession, api_url: str,
) -> Dict[str, Dict[str, bool]]:
"""API Request to get information about the SoftWare Heritage persistent
IDentifiers (SWHIDs) given in input.
Args:
swhids: a list of SWHIDS
api_url: url for the API request
Returns:
A dictionary with:
key: SWHID searched
value:
value['known'] = True if the SWHID is found
value['known'] = False if the SWHID is not found
"""
endpoint = api_url + "known/"
chunk_size = 1000
requests = []
def get_chunk(swhids):
for i in range(0, len(swhids), chunk_size):
yield swhids[i : i + chunk_size]
async def make_request(swhids):
async with session.post(endpoint, json=swhids) as resp:
if resp.status != 200:
error_response(resp.reason, resp.status, endpoint)
return await resp.json()
if len(swhids) > chunk_size:
for swhids_chunk in get_chunk(swhids):
requests.append(asyncio.create_task(make_request(swhids_chunk)))
res = await asyncio.gather(*requests)
# concatenate list of dictionaries
return dict(itertools.chain.from_iterable(e.items() for e in res))
else:
return await make_request(swhids)
def directory_filter(
path_name: Union[str, bytes], exclude_patterns: Iterable[Pattern[str]]
) -> bool:
"""It checks if the path_name is matching with the patterns given in input.
It is also used as a `dir_filter` function when generating the directory
object from `swh.model.from_disk`
Returns:
False if the directory has to be ignored, True otherwise
"""
path = Path(path_name.decode() if isinstance(path_name, bytes) else path_name)
for sre_pattern in exclude_patterns:
if sre_pattern.match(str(path)):
return False
return True
def get_subpaths(
path: Path, exclude_patterns: Iterable[Pattern[str]]
) -> Iterator[Tuple[Path, str]]:
"""Find the SoftWare Heritage persistent IDentifier (SWHID) of
the directories and files under a given path.
Args:
path: the root path
Yields:
pairs of: path, the relative SWHID
"""
def swhid_of(path):
if path.is_dir():
if exclude_patterns:
def dir_filter(dirpath, *args):
return directory_filter(dirpath, exclude_patterns)
else:
dir_filter = accept_all_directories
obj = Directory.from_disk(
path=bytes(path), dir_filter=dir_filter
).get_data()
return swhid(DIRECTORY, obj)
else:
obj = Content.from_file(path=bytes(path)).get_data()
return swhid(CONTENT, obj)
dirpath, dnames, fnames = next(os.walk(path))
for node in itertools.chain(dnames, fnames):
sub_path = Path(dirpath).joinpath(node)
yield (sub_path, swhid_of(sub_path))
async def parse_path(
path: Path,
session: aiohttp.ClientSession,
api_url: str,
exclude_patterns: Iterable[Pattern[str]],
) -> Iterator[Tuple[str, str, bool]]:
"""Check if the sub paths of the given path are present in the
archive or not.
Args:
path: the source path
api_url: url for the API request
Returns:
a map containing tuples with: a subpath of the given path,
the SWHID of the subpath and the result of the api call
"""
parsed_paths = dict(get_subpaths(path, exclude_patterns))
parsed_swhids = await swhids_discovery(
list(parsed_paths.values()), session, api_url
)
def unpack(tup):
subpath, swhid = tup
return (subpath, swhid, parsed_swhids[swhid]["known"])
return map(unpack, parsed_paths.items())
async def run(
config: Dict[str, Any],
root: str,
source_tree: Tree,
exclude_patterns: Iterable[Pattern[str]],
) -> None:
"""Start scanning from the given root.
It fills the source tree with the path discovered.
Args:
root: the root path to scan
api_url: url for the API request
"""
api_url = config["web-api"]["url"]
async def _scan(root, session, api_url, source_tree, exclude_patterns):
for path, obj_swhid, known in await parse_path(
root, session, api_url, exclude_patterns
):
obj_type = parse_swhid(obj_swhid).object_type
if obj_type == CONTENT:
source_tree.addNode(path, obj_swhid, known)
elif obj_type == DIRECTORY and directory_filter(path, exclude_patterns):
source_tree.addNode(path, obj_swhid, known)
if not known:
await _scan(path, session, api_url, source_tree, exclude_patterns)
if config["web-api"]["auth-token"]:
headers = {"Authorization": f"Bearer {config['web-api']['auth-token']}"}
else:
headers = {}
async with aiohttp.ClientSession(headers=headers) as session:
await _scan(root, session, api_url, source_tree, exclude_patterns)
def extract_regex_objs(
root_path: Path, patterns: Iterable[str]
) -> Iterator[Pattern[str]]:
"""Generates a regex object for each pattern given in input and checks if
the path is a subdirectory or relative to the root path.
Yields:
an SRE_Pattern object
"""
for pattern in patterns:
for path in glob.glob(pattern):
dirpath = Path(path)
if root_path not in dirpath.parents:
error_msg = (
f'The path "{dirpath}" is not a subdirectory or relative '
f'to the root directory path: "{root_path}"'
)
raise InvalidDirectoryPath(error_msg)
regex = fnmatch.translate((pattern))
yield re.compile(regex)
def scan(
config: Dict[str, Any],
root_path: str,
exclude_patterns: Iterable[str],
out_fmt: str,
interactive: bool,
):
"""Scan a source code project to discover files and directories already
present in the archive"""
sre_patterns = set()
if exclude_patterns:
sre_patterns = {
reg_obj for reg_obj in extract_regex_objs(Path(root_path), exclude_patterns)
}
source_tree = Tree(Path(root_path))
loop = asyncio.get_event_loop()
loop.run_until_complete(run(config, root_path, source_tree, sre_patterns))
if interactive:
root = Path(root_path)
directories = source_tree.getDirectoriesInfo(root)
figure = generate_sunburst(directories, root)
run_app(figure, source_tree)
else:
source_tree.show(out_fmt)
diff --git a/swh/scanner/tests/conftest.py b/swh/scanner/tests/conftest.py
index b2f8e1c..ac17096 100644
--- a/swh/scanner/tests/conftest.py
+++ b/swh/scanner/tests/conftest.py
@@ -1,140 +1,141 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import pytest
import asyncio
-import aiohttp
import os
+from pathlib import Path
import shutil
-from pathlib import Path
+import aiohttp
from aioresponses import aioresponses # type: ignore
+import pytest
-from swh.model.cli import swhid_of_file, swhid_of_dir
+from swh.model.cli import swhid_of_dir, swhid_of_file
from swh.scanner.model import Tree
+
from .flask_api import create_app
@pytest.fixture
def mock_aioresponse():
with aioresponses() as m:
yield m
@pytest.fixture
def event_loop():
"""Fixture that generate an asyncio event loop."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
yield loop
loop.close()
@pytest.fixture
async def aiosession():
"""Fixture that generate an aiohttp Client Session."""
session = aiohttp.ClientSession()
yield session
session.detach()
@pytest.fixture(scope="session")
def temp_folder(tmp_path_factory):
"""Fixture that generates a temporary folder with the following
structure:
.. code-block:: python
root = {
subdir: {
subsubdir
filesample.txt
filesample2.txt
}
subdir2
subfile.txt
}
"""
root = tmp_path_factory.getbasetemp()
subdir = tmp_path_factory.mktemp("subdir")
subsubdir = subdir.joinpath("subsubdir")
subsubdir.mkdir()
subdir2 = tmp_path_factory.mktemp("subdir2")
subfile = root / "subfile.txt"
subfile.touch()
filesample = subdir / "filesample.txt"
filesample.touch()
filesample2 = subdir / "filesample2.txt"
filesample2.touch()
avail_path = {
subdir: swhid_of_dir(bytes(subdir)),
subsubdir: swhid_of_dir(bytes(subsubdir)),
subdir2: swhid_of_dir(bytes(subdir2)),
subfile: swhid_of_file(bytes(subfile)),
filesample: swhid_of_file(bytes(filesample)),
filesample2: swhid_of_file(bytes(filesample2)),
}
return {
"root": root,
"paths": avail_path,
"filesample": filesample,
"filesample2": filesample2,
"subsubdir": subsubdir,
"subdir": subdir,
}
@pytest.fixture(scope="function")
def example_tree(temp_folder):
"""Fixture that generate a Tree with the root present in the
session fixture "temp_folder".
"""
example_tree = Tree(temp_folder["root"])
assert example_tree.path == temp_folder["root"]
return example_tree
@pytest.fixture(scope="function")
def example_dirs(example_tree, temp_folder):
"""
Fixture that fill the fixture example_tree with the values contained in
the fixture temp_folder and returns the directories information of the
filled example_tree.
"""
root = temp_folder["root"]
filesample_path = temp_folder["filesample"]
filesample2_path = temp_folder["filesample2"]
subsubdir_path = temp_folder["subsubdir"]
known_paths = [filesample_path, filesample2_path, subsubdir_path]
for path, swhid in temp_folder["paths"].items():
if path in known_paths:
example_tree.addNode(path, swhid, True)
else:
example_tree.addNode(path, swhid, False)
return example_tree.getDirectoriesInfo(root)
@pytest.fixture
def test_sample_folder(datadir, tmp_path):
"""Location of the "data" folder """
archive_path = Path(os.path.join(datadir, "sample-folder.tgz"))
assert archive_path.exists()
shutil.unpack_archive(archive_path, extract_dir=tmp_path)
test_sample_folder = Path(os.path.join(tmp_path, "sample-folder"))
assert test_sample_folder.exists()
return test_sample_folder
@pytest.fixture(scope="session")
def app():
"""Flask backend API (used by live_server)."""
app = create_app()
return app
diff --git a/swh/scanner/tests/flask_api.py b/swh/scanner/tests/flask_api.py
index ffed42a..5ac7b51 100644
--- a/swh/scanner/tests/flask_api.py
+++ b/swh/scanner/tests/flask_api.py
@@ -1,32 +1,32 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from flask import Flask, request
-from .data import present_swhids
-
from swh.web.common.exc import LargePayloadExc
+from .data import present_swhids
+
def create_app():
app = Flask(__name__)
@app.route("/known/", methods=["POST"])
def known():
swhids = request.get_json()
if len(swhids) > 900:
raise LargePayloadExc(
"The maximum number of SWHIDs this endpoint can receive is 900"
)
res = {swhid: {"known": False} for swhid in swhids}
for swhid in swhids:
if swhid in present_swhids:
res[swhid]["known"] = True
return res
return app
diff --git a/swh/scanner/tests/test_dashboard.py b/swh/scanner/tests/test_dashboard.py
index ab89396..5d70354 100644
--- a/swh/scanner/tests/test_dashboard.py
+++ b/swh/scanner/tests/test_dashboard.py
@@ -1,51 +1,51 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from swh.scanner.dashboard.dashboard import generate_table_body
-
import dash_html_components as html
+from swh.scanner.dashboard.dashboard import generate_table_body
+
def test_generate_table_body(example_tree, temp_folder):
subdir_path = temp_folder["subdir"]
for path, swhid in temp_folder["paths"].items():
example_tree.addNode(path, swhid, True)
generated_body = generate_table_body(subdir_path, example_tree)
expected_body = [
html.Tbody(
[
html.Tr(
[
html.Td("✔"),
html.Td(
html.A(
children="filesample.txt",
href=f"file://{subdir_path}/filesample.txt",
)
),
html.Td("swh:1:cnt:e69de29bb2d1d6434b8b29ae775ad8c2e48c5391"),
]
),
html.Tr(
[
html.Td("✔"),
html.Td(
html.A(
children="filesample2.txt",
href=f"file://{subdir_path}/filesample2.txt",
)
),
html.Td("swh:1:cnt:e69de29bb2d1d6434b8b29ae775ad8c2e48c5391"),
]
),
]
)
]
# workaround: dash_html_component.__eq__ checks for object identity only
assert str(generated_body) == str(expected_body)
diff --git a/swh/scanner/tests/test_plot.py b/swh/scanner/tests/test_plot.py
index 68a926e..5ad5548 100644
--- a/swh/scanner/tests/test_plot.py
+++ b/swh/scanner/tests/test_plot.py
@@ -1,57 +1,57 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.scanner.plot import (
+ build_hierarchical_df,
compute_max_depth,
generate_df_from_dirs,
- build_hierarchical_df,
)
def test_max_depth(temp_folder, example_dirs):
root = temp_folder["root"]
max_depth = compute_max_depth(example_dirs, root)
assert max_depth == 2
def test_generate_df_from_dirs(temp_folder, example_dirs):
root = temp_folder["root"]
max_depth = compute_max_depth(example_dirs, root)
metrics_columns = ["contents", "known"]
levels_columns = ["lev" + str(i) for i in range(max_depth)]
df_columns = levels_columns + metrics_columns
actual_df = generate_df_from_dirs(example_dirs, df_columns, root, max_depth)
# assert root is empty
assert actual_df["lev0"][0] == ""
assert actual_df["lev1"][0] == ""
# assert subdir has correct contents information
assert actual_df["contents"][1] == 2
assert actual_df["known"][1] == 2
# assert subsubdir has correct level information
assert actual_df["lev0"][2] == "subdir0"
assert actual_df["lev1"][2] == "subdir0/subsubdir"
def test_build_hierarchical_df(temp_folder, example_dirs):
root = temp_folder["root"]
max_depth = compute_max_depth(example_dirs, root)
metrics_columns = ["contents", "known"]
levels_columns = ["lev" + str(i) for i in range(max_depth)]
df_columns = levels_columns + metrics_columns
actual_df = generate_df_from_dirs(example_dirs, df_columns, root, max_depth)
actual_result = build_hierarchical_df(
actual_df, levels_columns, metrics_columns, root
)
assert actual_result["parent"][1] == "subdir0"
assert actual_result["contents"][1] == 2
assert actual_result["id"][5] == root
assert actual_result["known"][5] == 75
diff --git a/swh/scanner/tests/test_scanner.py b/swh/scanner/tests/test_scanner.py
index ca58eca..29e1030 100644
--- a/swh/scanner/tests/test_scanner.py
+++ b/swh/scanner/tests/test_scanner.py
@@ -1,116 +1,117 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import pytest
import json
-from .data import correct_api_response, present_swhids, to_exclude_swhid
+import pytest
-from swh.scanner.scanner import swhids_discovery, get_subpaths, extract_regex_objs, run
-from swh.scanner.model import Tree
from swh.scanner.exceptions import APIError, InvalidDirectoryPath
+from swh.scanner.model import Tree
+from swh.scanner.scanner import extract_regex_objs, get_subpaths, run, swhids_discovery
+
+from .data import correct_api_response, present_swhids, to_exclude_swhid
aio_url = "http://example.org/api/known/"
def test_extract_regex_objs(temp_folder):
root_path = temp_folder["root"]
patterns = (str(temp_folder["subdir"]), "/none")
sre_patterns = [reg_obj for reg_obj in extract_regex_objs(root_path, patterns)]
assert len(sre_patterns) == 2
patterns = (*patterns, "/tmp")
with pytest.raises(InvalidDirectoryPath):
sre_patterns = [reg_obj for reg_obj in extract_regex_objs(root_path, patterns)]
def test_scanner_correct_api_request(mock_aioresponse, event_loop, aiosession):
mock_aioresponse.post(
aio_url,
status=200,
content_type="application/json",
body=json.dumps(correct_api_response),
)
actual_result = event_loop.run_until_complete(
swhids_discovery([], aiosession, "http://example.org/api/")
)
assert correct_api_response == actual_result
def test_scanner_raise_apierror(mock_aioresponse, event_loop, aiosession):
mock_aioresponse.post(aio_url, content_type="application/json", status=413)
with pytest.raises(APIError):
event_loop.run_until_complete(
swhids_discovery([], aiosession, "http://example.org/api/")
)
def test_scanner_raise_apierror_input_size_limit(event_loop, aiosession, live_server):
api_url = live_server.url() + "/"
request = [
"swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a" for i in range(901)
] # /known/ is limited at 900
with pytest.raises(APIError):
event_loop.run_until_complete(swhids_discovery(request, aiosession, api_url))
def test_scanner_get_subpaths(temp_folder):
root = temp_folder["root"]
actual_result = []
for subpath, swhid in get_subpaths(root, tuple()):
# also check if it's a symlink since pytest tmp_dir fixture create
# also a symlink to each directory inside the tmp_dir path
if subpath.is_dir() and not subpath.is_symlink():
actual_result.append((subpath, swhid))
assert len(actual_result) == 2
@pytest.mark.options(debug=False)
def test_app(app):
assert not app.debug
def test_scanner_result(live_server, event_loop, test_sample_folder):
api_url = live_server.url() + "/"
config = {"web-api": {"url": api_url, "auth-token": None}}
source_tree = Tree(test_sample_folder)
event_loop.run_until_complete(run(config, test_sample_folder, source_tree, set()))
for child_node in source_tree.iterate():
node_info = list(child_node.attributes.values())[0]
if node_info["swhid"] in present_swhids:
assert node_info["known"] is True
else:
assert node_info["known"] is False
def test_scanner_result_with_exclude_patterns(
live_server, event_loop, test_sample_folder
):
api_url = live_server.url() + "/"
config = {"web-api": {"url": api_url, "auth-token": None}}
patterns = (str(test_sample_folder) + "/toexclude",)
exclude_pattern = {
reg_obj for reg_obj in extract_regex_objs(test_sample_folder, patterns)
}
source_tree = Tree(test_sample_folder)
event_loop.run_until_complete(
run(config, test_sample_folder, source_tree, exclude_pattern)
)
for child_node in source_tree.iterate():
node_info = list(child_node.attributes.values())[0]
assert node_info["swhid"] != to_exclude_swhid