diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,3 +1,3 @@ # Add here internal Software Heritage dependencies, one per line. swh.core >= 0.3 -swh.model >= 1.0.0 +swh.model >= 2.6.4 diff --git a/swh/scanner/backend.py b/swh/scanner/backend.py --- a/swh/scanner/backend.py +++ b/swh/scanner/backend.py @@ -13,7 +13,7 @@ def create_app(db: Db): """Backend for swh-scanner, implementing the /known endpoint of the - Software Heritage Web API""" + Software Heritage Web API""" app = Flask(__name__) @app.route("/api/1/known/", methods=["POST"]) @@ -35,7 +35,6 @@ def run(host: str, port: int, db: Db): - """Serve the local database - """ + """Serve the local database""" app = create_app(db) app.run(host, port, debug=True) diff --git a/swh/scanner/cli.py b/swh/scanner/cli.py --- a/swh/scanner/cli.py +++ b/swh/scanner/cli.py @@ -181,9 +181,7 @@ ) @click.pass_context def import_(ctx, chunk_size, input_file, output_file_db): - """Create SQLite database of known SWHIDs from a textual list of SWHIDs - - """ + """Create SQLite database of known SWHIDs from a textual list of SWHIDs""" from .db import Db db = Db(output_file_db) diff --git a/swh/scanner/dashboard/dashboard.py b/swh/scanner/dashboard/dashboard.py --- a/swh/scanner/dashboard/dashboard.py +++ b/swh/scanner/dashboard/dashboard.py @@ -1,4 +1,4 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -12,37 +12,40 @@ import dash_html_components as html import plotly.graph_objects as go -from ..model import Tree +from swh.model.from_disk import Directory +from ..data import MerkleNodeInfo, get_content_from -def generate_table_body(dir_path: Path, source: Tree): + +def generate_table_body( + dir_path: bytes, 
source_tree: Directory, nodes_data: MerkleNodeInfo +): """ Generate the data_table from the path taken from the chart. For each file builds the html table rows showing the known status, a local link to the file and the relative SoftWare Heritage persistent IDentifier (SWHID). """ + contents = get_content_from(dir_path, source_tree, nodes_data) data = [] - for file_info in source.get_files_from_dir(dir_path): - for file_path, attr in file_info.items(): - file_path = Path(file_path) - file_name = file_path.parts[len(file_path.parts) - 1] - data.append( - html.Tr( - [ - html.Td("✔" if attr["known"] else ""), - html.Td( - html.A(file_name, href="file://" + str(file_path.resolve())) - ), - html.Td(attr["swhid"]), - ] - ) + for cnt, attr in contents.items(): + file_path = Path(cnt.decode()) + file_name = file_path.parts[len(file_path.parts) - 1] + full_file_path = Path(Path(dir_path.decode()), file_path) + data.append( + html.Tr( + [ + html.Td("✔" if attr["known"] else ""), + html.Td(html.A(file_name, href="file://" + str(full_file_path))), + html.Td(attr["swhid"]), + ] ) + ) return [html.Tbody(data)] -def run_app(graph_obj: go, source: Tree): +def run_app(graph_obj: go, source_tree: Directory, nodes_data: MerkleNodeInfo): app = dash.Dash(__name__) fig = go.Figure().add_trace(graph_obj) @@ -88,13 +91,12 @@ """ if click_data is not None: - raw_path = click_data["points"][0]["label"] - full_path = ( - source.path.joinpath(raw_path) - if raw_path != str(source.path) - else Path(raw_path) + full_path = click_data["points"][0]["label"] + return ( + table_header + + generate_table_body(full_path.encode(), source_tree, nodes_data), + full_path, ) - return table_header + generate_table_body(full_path, source), str(full_path) else: return "", "" diff --git a/swh/scanner/data.py b/swh/scanner/data.py new file mode 100644 --- /dev/null +++ b/swh/scanner/data.py @@ -0,0 +1,107 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level 
directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from pathlib import Path +from typing import Dict, Tuple + +from swh.model.exceptions import ValidationError +from swh.model.from_disk import Directory +from swh.model.identifiers import CONTENT, DIRECTORY, CoreSWHID + + +class MerkleNodeInfo(dict): + """Store additional information about Merkle DAG nodes, using SWHIDs as keys""" + + def __setitem__(self, key, value): + """The keys must be valid Software Heritage Persistent Identifiers + while values must be dict. + """ + if not isinstance(key, CoreSWHID): + raise ValidationError("keys must be valid SWHID(s)") + + if not isinstance(value, dict): + raise ValidationError(f"values must be dict, not {type(value)}") + + super(MerkleNodeInfo, self).__setitem__(key, value) + + +def get_directory_data( + root_path: str, + source_tree: Directory, + nodes_data: MerkleNodeInfo, + directory_data: Dict = {}, +) -> Dict[Path, dict]: + """Get content information for each directory inside source_tree. + + Returns: + A dictionary with a directory path as key and the relative + contents information as values. + """ + + def _get_directory_data( + source_tree: Directory, nodes_data: MerkleNodeInfo, directory_data: Dict + ): + directories = list( + filter( + lambda n: n.object_type == DIRECTORY, + map(lambda n: n[1], source_tree.items()), + ) + ) + for node in directories: + directory_info = directory_content(node, nodes_data) + rel_path = Path(node.data["path"].decode()).relative_to(Path(root_path)) + directory_data[rel_path] = directory_info + if has_dirs(node): + _get_directory_data(node, nodes_data, directory_data) + + _get_directory_data(source_tree, nodes_data, directory_data) + return directory_data + + +def directory_content(node: Directory, nodes_data: MerkleNodeInfo) -> Tuple[int, int]: + """Count known contents inside the given directory. 
+ + Returns: + A tuple with the total number of contents inside the directory and the number + of known contents. + """ + known_cnt = 0 + node_contents = list( + filter(lambda n: n.object_type == CONTENT, map(lambda n: n[1], node.items())) + ) + for sub_node in node_contents: + if nodes_data[sub_node.swhid()]["known"]: + known_cnt += 1 + + return (len(node_contents), known_cnt) + + +def has_dirs(node: Directory) -> bool: + """Check if the given directory has other directories inside.""" + for _, sub_node in node.items(): + if isinstance(sub_node, Directory): + return True + return False + + +def get_content_from( + node_path: bytes, source_tree: Directory, nodes_data: MerkleNodeInfo +) -> Dict[bytes, dict]: + """Get content information from the given directory node.""" + # root in model.from_disk.Directory should be accessed with b"" + directory = source_tree[node_path if node_path != source_tree.data["path"] else b""] + node_contents = list( + filter( + lambda n: n.object_type == CONTENT, map(lambda n: n[1], directory.items()) + ) + ) + files_data = {} + for node in node_contents: + node_info = nodes_data[node.swhid()] + node_info["swhid"] = str(node.swhid()) + path_name = "path" if "path" in node.data.keys() else "data" + files_data[node.data[path_name]] = node_info + + return files_data diff --git a/swh/scanner/model.py b/swh/scanner/model.py deleted file mode 100644 --- a/swh/scanner/model.py +++ /dev/null @@ -1,259 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from __future__ import annotations - -from enum import Enum -import json -from pathlib import Path -import sys -from typing import Any, Dict, Iterator, List, Tuple - -import ndjson - -from swh.model.identifiers import CONTENT, DIRECTORY - -from .exceptions import InvalidDirectoryPath, 
InvalidObjectType -from .plot import generate_sunburst, offline_plot - - -class Color(Enum): - blue = "\033[94m" - green = "\033[92m" - red = "\033[91m" - end = "\033[0m" - - -def colorize(text: str, color: Color): - return color.value + text + Color.end.value - - -class Tree: - """Representation of a file system structure - """ - - def __init__(self, path: Path, father: Tree = None): - self.father = father - self.path = path - self.otype = DIRECTORY if path.is_dir() else CONTENT - self.swhid = "" - self.known = False - self.children: Dict[Path, Tree] = {} - - def add_node(self, path: Path, swhid: str, known: bool) -> None: - """Recursively add a new path. - """ - relative_path = path.relative_to(self.path) - - if relative_path == Path("."): - self.swhid = swhid - self.known = known - return - - new_path = self.path.joinpath(relative_path.parts[0]) - if new_path not in self.children: - self.children[new_path] = Tree(new_path, self) - - self.children[new_path].add_node(path, swhid, known) - - def show(self, fmt) -> None: - """Show tree in different formats""" - if fmt == "json": - print(json.dumps(self.to_dict(), indent=4, sort_keys=True)) - - if fmt == "ndjson": - print( - ndjson.dumps( - {str(Path(k).relative_to(self.path)): v} - for node in self.iterate() - for k, v in node.attributes.items() - ) - ) - - elif fmt == "text": - isatty = sys.stdout.isatty() - root_dir = self.path.relative_to(self.path.parent) - print(colorize(str(root_dir), Color.blue) if isatty else str(root_dir)) - self.print_children(isatty) - - elif fmt == "sunburst": - root = self.path - directories = self.get_directories_info(root) - sunburst = generate_sunburst(directories, root) - offline_plot(sunburst) - - def print_children(self, isatty: bool, inc: int = 1) -> None: - for path, node in self.children.items(): - self.print_node(node, isatty, inc) - if node.children: - node.print_children(isatty, inc + 1) - - def print_node(self, node: Any, isatty: bool, inc: int) -> None: - rel_path = 
str(node.path.relative_to(self.path)) - begin = "│ " * inc - end = "/" if node.otype == DIRECTORY else "" - - if isatty: - if not node.known: - rel_path = colorize(rel_path, Color.red) - elif node.otype == DIRECTORY: - rel_path = colorize(rel_path, Color.blue) - elif node.otype == CONTENT: - rel_path = colorize(rel_path, Color.green) - - print(f"{begin}{rel_path}{end}") - - @property - def attributes(self) -> Dict[str, Dict[str, Any]]: - """ - Get the attributes of the current node grouped by the relative path. - - Returns: - a dictionary containing a path as key and its known/unknown status and the - SWHID as values. - - """ - return {str(self.path): {"swhid": self.swhid, "known": self.known,}} - - def to_dict(self) -> Dict[str, Dict[str, Any]]: - """ - Recursively flatten the current tree nodes into a dictionary. - - For example, if you have the following structure: - - .. code-block:: none - - root { - subdir: { - file.txt - } - } - - The generated dictionary will be: - - .. code-block:: none - - { - "root": { - "swhid": "...", - "known": True/False - } - "root/subdir": { - "swhid": "...", - "known": True/False - } - "root/subdir/file.txt": { - "swhid": "...", - "known": True/False - } - } - """ - return { - str(Path(k).relative_to(self.path)): v - for node in self.iterate() - for k, v in node.attributes.items() - } - - def iterate(self) -> Iterator[Tree]: - """ - Recursively iterate through the children of the current node - - """ - for _, child_node in self.children.items(): - yield child_node - if child_node.otype == DIRECTORY: - yield from child_node.iterate() - - def get_files_from_dir(self, dir_path: Path) -> List: - """ - Retrieve files information about a specific directory path - - Returns: - A list containing the files attributes present inside the directory given - in input - """ - - def get_files(node): - files = [] - for _, node in node.children.items(): - if node.otype == CONTENT: - files.append(node.attributes) - return files - - if dir_path == 
self.path: - return get_files(self) - else: - for node in self.iterate(): - if node.path == dir_path: - return get_files(node) - raise InvalidDirectoryPath( - "The directory provided doesn't match any stored directory" - ) - - def _get_sub_dirs_info(self, root, directories): - """Fills the directories given in input with the contents information - stored inside the directory child, only if they have contents. - """ - for path, child_node in self.children.items(): - if child_node.otype == DIRECTORY: - rel_path = path.relative_to(root) - contents_info = child_node.count_contents() - # checks the first element of the tuple - # (the number of contents in a directory) - # if it is equal to zero it means that there are no contents - # in that directory. - if not contents_info[0] == 0: - directories[rel_path] = contents_info - if child_node.has_dirs(): - child_node._get_sub_dirs_info(root, directories) - - def get_directories_info(self, root: Path) -> Dict[Path, Tuple[int, int]]: - """Get information about all directories under the given root. - - Returns: - A dictionary with a directory path as key and the relative - contents information (the result of count_contents) as values. - - """ - directories = {root: self.count_contents()} - self._get_sub_dirs_info(root, directories) - return directories - - def count_contents(self) -> Tuple[int, int]: - """Count how many contents are present inside a directory. - If a directory has a SWHID returns as it has all the contents. - - Returns: - A tuple with the total number of the contents and the number - of contents known (the ones that have a persistent identifier). 
- - """ - contents = 0 - discovered = 0 - - if not self.otype == DIRECTORY: - raise InvalidObjectType( - "Can't count contents of the object type: %s" % self.otype - ) - - if self.known: - # to identify a directory with all files/directories present - return (1, 1) - else: - for _, child_node in self.children.items(): - if child_node.otype == CONTENT: - contents += 1 - if child_node.known: - discovered += 1 - - return (contents, discovered) - - def has_dirs(self) -> bool: - """Checks if node has directories - """ - for _, child_node in self.children.items(): - if child_node.otype == DIRECTORY: - return True - return False diff --git a/swh/scanner/output.py b/swh/scanner/output.py new file mode 100644 --- /dev/null +++ b/swh/scanner/output.py @@ -0,0 +1,108 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from enum import Enum +import json +import os +import sys +from typing import Any + +import ndjson + +from swh.model.from_disk import Directory + +from .dashboard.dashboard import run_app +from .data import MerkleNodeInfo, get_directory_data +from .plot import generate_sunburst, offline_plot + +DEFAULT_OUTPUT = "text" + + +class Color(Enum): + BLUE = "\033[94m" + GREEN = "\033[92m" + RED = "\033[91m" + END = "\033[0m" + + +def colorize(text: str, color: Color): + return color.value + text + Color.END.value + + +class Output: + def __init__( + self, root_path: str, nodes_data: MerkleNodeInfo, source_tree: Directory + ): + self.root_path = root_path + self.nodes_data = nodes_data + self.source_tree = source_tree + + def show(self, mode=DEFAULT_OUTPUT): + if mode == "text": + isatty = sys.stdout.isatty() + self.print_text(isatty) + elif mode == "sunburst": + directory_data = get_directory_data( + self.root_path, self.source_tree, self.nodes_data + ) + 
sunburst_figure = generate_sunburst(directory_data, self.root_path) + offline_plot(sunburst_figure) + elif mode == "interactive": + directory_data = get_directory_data( + self.root_path, self.source_tree, self.nodes_data + ) + sunburst_figure = generate_sunburst(directory_data, self.root_path) + run_app(sunburst_figure, self.source_tree, self.nodes_data) + elif mode == "json": + self.print_json() + elif mode == "ndjson": + self.print_ndjson() + else: + raise Exception(f"mode {mode} is not an output format") + + def get_path_name(self, node): + return "path" if "path" in node.data.keys() else "data" + + def print_text(self, isatty: bool) -> None: + def compute_level(node): + node_path = str(node.data[self.get_path_name(node)]).split("/") + source_path = str(self.source_tree.data["path"]).split("/") + return len(node_path) - len(source_path) + + for node in self.source_tree.iter_tree(): + self.print_node(node, isatty, compute_level(node)) + + def print_node(self, node: Any, isatty: bool, level: int) -> None: + rel_path = os.path.basename(node.data[self.get_path_name(node)]) + rel_path = rel_path.decode() + begin = "│ " * level + end = "/" if node.object_type == "directory" else "" + + if isatty: + if not self.nodes_data[str(node.swhid())]["known"]: + rel_path = colorize(rel_path, Color.RED) + elif node.object_type == "directory": + rel_path = colorize(rel_path, Color.BLUE) + elif node.object_type == "content": + rel_path = colorize(rel_path, Color.GREEN) + + print(f"{begin}{rel_path}{end}") + + def data_as_json(self): + json = {} + for node in self.source_tree.iter_tree(): + node_known = self.nodes_data[node.swhid()]["known"] + rel_path = os.path.relpath( + node.data[self.get_path_name(node)].decode(), + self.source_tree.data["path"].decode(), + ) + json[rel_path] = {"swhid": str(node.swhid()), "known": node_known} + return json + + def print_json(self): + print(json.dumps(self.data_as_json(), indent=4, sort_keys=True)) + + def print_ndjson(self): + 
print(ndjson.dumps({k: v} for k, v in self.data_as_json().items())) diff --git a/swh/scanner/plot.py b/swh/scanner/plot.py --- a/swh/scanner/plot.py +++ b/swh/scanner/plot.py @@ -31,81 +31,81 @@ root_name: str, ) -> pd.DataFrame: """ - Build a hierarchy of levels for Sunburst or Treemap charts. + Build a hierarchy of levels for Sunburst or Treemap charts. - For each directory the new dataframe will have the following - information: + For each directory the new dataframe will have the following + information: - id: the directory name - parent: the parent directory of id - contents: the total number of contents of the directory id and - the relative subdirectories - known: the percentage of contents known relative to computed - 'contents' + id: the directory name + parent: the parent directory of id + contents: the total number of contents of the directory id and + the relative subdirectories + known: the percentage of contents known relative to computed + 'contents' - Example: - Given the following dataframe: + Example: + Given the following dataframe: - .. code-block:: none + .. code-block:: none - lev0 lev1 contents known - '' '' 20 2 //root - kernel kernel/subdirker 5 0 - telnet telnet/subdirtel 10 4 + lev0 lev1 contents known + '' '' 20 2 //root + kernel kernel/subdirker 5 0 + telnet telnet/subdirtel 10 4 - The output hierarchical dataframe will be like the following: + The output hierarchical dataframe will be like the following: - .. code-block:: none + .. 
code-block:: none - id parent contents known - 20 10.00 - kernel/subdirker kernel 5 0.00 - telnet/subdirtel telnet 10 40.00 - total 20 10.00 - kernel total 5 0.00 - telnet total 10 40.00 - total 35 17.14 + id parent contents known + 20 10.00 + kernel/subdirker kernel 5 0.00 + telnet/subdirtel telnet 10 40.00 + total 20 10.00 + kernel total 5 0.00 + telnet total 10 40.00 + total 35 17.14 - To create the hierarchical dataframe we need to iterate through - the dataframe given in input relying on the number of levels. + To create the hierarchical dataframe we need to iterate through + the dataframe given in input relying on the number of levels. - Based on the previous example we have to do two iterations: + Based on the previous example we have to do two iterations: - iteration 1 - The generated dataframe 'df_tree' will be: + iteration 1 + The generated dataframe 'df_tree' will be: - .. code-block:: none + .. code-block:: none - id parent contents known - 20 10.0 - kernel/subdirker kernel 5 0.0 - telnet/subdirtel telnet 10 40.0 + id parent contents known + 20 10.0 + kernel/subdirker kernel 5 0.0 + telnet/subdirtel telnet 10 40.0 - iteration 2 - The generated dataframe 'df_tree' will be: + iteration 2 + The generated dataframe 'df_tree' will be: - .. code-block:: none + .. code-block:: none - id parent contents known - total 20 10.0 - kernel total 5 0.0 - telnet total 10 40.0 + id parent contents known + total 20 10.0 + kernel total 5 0.0 + telnet total 10 40.0 - Note that since we have reached the last level, the parent given - to the directory id is the directory root. + Note that since we have reached the last level, the parent given + to the directory id is the directory root. - The 'total' row il computed by adding the number of contents of the - dataframe given in input and the average of the contents known on - the total number of contents. 
+ The 'total' row is computed by adding the number of contents of the + dataframe given in input and the average of the contents known on + the total number of contents. """ def compute_known_percentage(contents: pd.Series, known: pd.Series) -> pd.Series: """This function compute the percentage of known contents and generate - the new known column with the percentage values. + the new known column with the percentage values. - It also assures that if there is no contents inside a directory - the percentage is zero + It also assures that if there is no contents inside a directory + the percentage is zero """ known_values = [] @@ -160,18 +160,17 @@ return complete_df -def compute_max_depth(dirs_path: List[Path], root: Path) -> int: +def compute_max_depth(dirs_path: List[Path]) -> int: """Compute the maximum depth level of the given directory paths. - Example: for `var/log/kernel/` the depth level is 3 + Example: for `var/log/kernel/` the depth level is 3 """ max_depth = 0 for dir_path in dirs_path: - if dir_path == root: - continue - - dir_depth = len(dir_path.parts) + dir_depth = len( + dir_path.parts[1:] if dir_path.parts[0] == "/" else dir_path.parts + ) if dir_depth > max_depth: max_depth = dir_depth @@ -179,7 +178,7 @@ def generate_df_from_dirs( - dirs: Dict[Path, Tuple[int, int]], columns: List[str], root: Path, max_depth: int, + dirs: Dict[Path, Tuple[int, int]], columns: List[str], max_depth: int, ) -> pd.DataFrame: """Generate a dataframe from the directories given in input. 
@@ -215,13 +214,7 @@ for dir_path, contents_info in dirs.items(): empty_lvl = max_depth - len(dir_path.parts) - if dir_path == root: - # ignore the root but store contents information - yield [""] * (max_depth) + list(contents_info) - else: - yield list(get_parents(dir_path)) + [""] * empty_lvl + list( - contents_info - ) + yield list(get_parents(dir_path)) + [""] * empty_lvl + list(contents_info) df = pd.DataFrame( np.array([dir_array for dir_array in get_dirs_array()]), columns=columns @@ -236,15 +229,13 @@ def generate_sunburst( directories: Dict[Path, Tuple[int, int]], root: Path ) -> go.Sunburst: - """Generate a sunburst chart from the directories given in input. - - """ - max_depth = compute_max_depth(list(directories.keys()), root) + """Generate a sunburst chart from the directories given in input.""" + max_depth = compute_max_depth(list(directories.keys())) metrics_columns = ["contents", "known"] levels_columns = ["lev" + str(i) for i in range(max_depth)] df_columns = levels_columns + metrics_columns - dirs_df = generate_df_from_dirs(directories, df_columns, root, max_depth) + dirs_df = generate_df_from_dirs(directories, df_columns, max_depth) hierarchical_df = build_hierarchical_df( dirs_df, levels_columns, metrics_columns, str(root) @@ -271,8 +262,7 @@ def offline_plot(graph_object: go): - """Plot a graph object to an html file - """ + """Plot a graph object to an html file""" fig = go.Figure() fig.add_trace(graph_object) offline.plot(fig, filename="chart.html") diff --git a/swh/scanner/scanner.py b/swh/scanner/scanner.py --- a/swh/scanner/scanner.py +++ b/swh/scanner/scanner.py @@ -1,28 +1,48 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import itertools -import os -from pathlib 
import Path -from typing import Any, Dict, Iterable, Iterator, List, Pattern, Tuple, Union +from typing import Any, Dict, Iterable, List import aiohttp -from swh.model.from_disk import ( - Content, - Directory, - accept_all_directories, - extract_regex_objs, -) -from swh.model.identifiers import CoreSWHID, ObjectType +from swh.model.cli import model_of_dir +from swh.model.from_disk import Directory +from swh.model.identifiers import DIRECTORY -from .dashboard.dashboard import run_app +from .data import MerkleNodeInfo from .exceptions import error_response -from .model import Tree -from .plot import generate_sunburst +from .output import Output + + +async def lazy_bfs( + source_tree: Directory, + data: MerkleNodeInfo, + session: aiohttp.ClientSession, + api_url: str, +): + + queue = [] + queue.append(source_tree) + + while queue: + swhids = [str(node.swhid()) for node in queue] + swhids_res = await swhids_discovery(swhids, session, api_url) + for node in queue.copy(): + queue.remove(node) + data[node.swhid()]["known"] = swhids_res[str(node.swhid())]["known"] + if node.object_type == DIRECTORY: + if not data[node.swhid()]["known"]: + children = [n[1] for n in list(node.items())] + queue.extend(children) + else: + for sub_node in node.iter_tree(dedup=False): + if sub_node == node: + continue + data[sub_node.swhid()]["known"] = True # type: ignore async def swhids_discovery( @@ -71,102 +91,8 @@ return await make_request(swhids) -def directory_filter( - path_name: Union[str, bytes], exclude_patterns: Iterable[Pattern[bytes]] -) -> bool: - """It checks if the path_name is matching with the patterns given in input. 
- - It is also used as a `dir_filter` function when generating the directory - object from `swh.model.from_disk` - - Returns: - False if the directory has to be ignored, True otherwise - - """ - path = Path(path_name.decode() if isinstance(path_name, bytes) else path_name) - - for sre_pattern in exclude_patterns: - if sre_pattern.match(bytes(path)): - return False - return True - - -def get_subpaths( - path: Path, exclude_patterns: Iterable[Pattern[bytes]] -) -> Iterator[Tuple[Path, str]]: - """Find the SoftWare Heritage persistent IDentifier (SWHID) of - the directories and files under a given path. - - Args: - path: the root path - - Yields: - pairs of: path, the relative SWHID - - """ - - def swhid_of(path: Path) -> str: - if path.is_dir(): - if exclude_patterns: - - def dir_filter(dirpath: bytes, *args) -> bool: - return directory_filter(dirpath, exclude_patterns) - - else: - dir_filter = accept_all_directories # type: ignore - - obj = Directory.from_disk( - path=bytes(path), dir_filter=dir_filter - ).get_data() - - return str(CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj["id"])) - else: - obj = Content.from_file(path=bytes(path)).get_data() - return str( - CoreSWHID(object_type=ObjectType.CONTENT, object_id=obj["sha1_git"]) - ) - - dirpath, dnames, fnames = next(os.walk(path)) - for node in itertools.chain(dnames, fnames): - sub_path = Path(dirpath).joinpath(node) - yield (sub_path, swhid_of(sub_path)) - - -async def parse_path( - path: Path, - session: aiohttp.ClientSession, - api_url: str, - exclude_patterns: Iterable[Pattern[bytes]], -) -> Iterator[Tuple[str, str, bool]]: - """Check if the sub paths of the given path are present in the - archive or not. 
- - Args: - path: the source path - api_url: url for the API request - - Returns: - a map containing tuples with: a subpath of the given path, - the SWHID of the subpath and the result of the api call - - """ - parsed_paths = dict(get_subpaths(path, exclude_patterns)) - parsed_swhids = await swhids_discovery( - list(parsed_paths.values()), session, api_url - ) - - def unpack(tup): - subpath, swhid = tup - return (subpath, swhid, parsed_swhids[swhid]["known"]) - - return map(unpack, parsed_paths.items()) - - async def run( - config: Dict[str, Any], - root: str, - source_tree: Tree, - exclude_patterns: Iterable[Pattern[bytes]], + config: Dict[str, Any], source_tree: Directory, nodes_data: MerkleNodeInfo ) -> None: """Start scanning from the given root. @@ -179,28 +105,16 @@ """ api_url = config["web-api"]["url"] - async def _scan(root, session, api_url, source_tree, exclude_patterns): - for path, obj_swhid, known in await parse_path( - root, session, api_url, exclude_patterns - ): - obj_type = CoreSWHID.from_string(obj_swhid).object_type - - if obj_type == ObjectType.CONTENT: - source_tree.add_node(path, obj_swhid, known) - elif obj_type == ObjectType.DIRECTORY and directory_filter( - path, exclude_patterns - ): - source_tree.add_node(path, obj_swhid, known) - if not known: - await _scan(path, session, api_url, source_tree, exclude_patterns) - if config["web-api"]["auth-token"]: headers = {"Authorization": f"Bearer {config['web-api']['auth-token']}"} else: headers = {} + for node in source_tree.iter_tree(): + nodes_data[node.swhid()] = {} # type: ignore + async with aiohttp.ClientSession(headers=headers, trust_env=True) as session: - await _scan(root, session, api_url, source_tree, exclude_patterns) + await lazy_bfs(source_tree, nodes_data, session, api_url) def scan( @@ -212,22 +126,15 @@ ): """Scan a source code project to discover files and directories already present in the archive""" - converted_patterns = set(pattern.encode() for pattern in exclude_patterns) - 
sre_patterns = set() - if exclude_patterns: - sre_patterns = { - reg_obj - for reg_obj in extract_regex_objs(root_path.encode(), converted_patterns) - } - - source_tree = Tree(Path(root_path)) + converted_patterns = [pattern.encode() for pattern in exclude_patterns] + source_tree = model_of_dir(root_path.encode(), converted_patterns) + nodes_data = MerkleNodeInfo() + loop = asyncio.get_event_loop() - loop.run_until_complete(run(config, root_path, source_tree, sre_patterns)) + loop.run_until_complete(run(config, source_tree, nodes_data)) + out = Output(root_path, nodes_data, source_tree) if interactive: - root = Path(root_path) - directories = source_tree.get_directories_info(root) - figure = generate_sunburst(directories, root) - run_app(figure, source_tree) + out.show("interactive") else: - source_tree.show(out_fmt) + out.show(out_fmt) diff --git a/swh/scanner/tests/conftest.py b/swh/scanner/tests/conftest.py --- a/swh/scanner/tests/conftest.py +++ b/swh/scanner/tests/conftest.py @@ -12,8 +12,8 @@ from aioresponses import aioresponses # type: ignore import pytest -from swh.model.cli import swhid_of_dir, swhid_of_file -from swh.scanner.model import Tree +from swh.model.cli import model_of_dir +from swh.scanner.data import MerkleNodeInfo from .data import present_swhids from .flask_api import create_app @@ -43,104 +43,55 @@ @pytest.fixture(scope="function") -def temp_folder(tmp_path): - """Fixture that generates a temporary folder with the following - structure: - - .. 
code-block:: python - - root = { - subdir: { - subsubdir - filesample.txt - filesample2.txt - } - subdir2 - subfile.txt - } - """ - root = tmp_path - subdir = root / "subdir" - subdir.mkdir() - subsubdir = subdir / "subsubdir" - subsubdir.mkdir() - subdir2 = root / "subdir2" - subdir2.mkdir() - subfile = root / "subfile.txt" - subfile.touch() - filesample = subdir / "filesample.txt" - filesample.touch() - filesample2 = subdir / "filesample2.txt" - filesample2.touch() - - avail_path = { - subdir: str(swhid_of_dir(bytes(subdir))), - subsubdir: str(swhid_of_dir(bytes(subsubdir))), - subdir2: str(swhid_of_dir(bytes(subdir2))), - subfile: str(swhid_of_file(bytes(subfile))), - filesample: str(swhid_of_file(bytes(filesample))), - filesample2: str(swhid_of_file(bytes(filesample2))), - } - - return { - "root": root, - "paths": avail_path, - "filesample": filesample, - "filesample2": filesample2, - "subsubdir": subsubdir, - "subdir": subdir, - } +def test_sample_folder(datadir, tmp_path): + """Location of the "data" folder""" + archive_path = Path(os.path.join(datadir, "sample-folder.tgz")) + assert archive_path.exists() + shutil.unpack_archive(archive_path, extract_dir=tmp_path) + test_sample_folder = Path(os.path.join(tmp_path, "sample-folder")) + assert test_sample_folder.exists() + return test_sample_folder @pytest.fixture(scope="function") -def example_tree(temp_folder): - """Fixture that generate a Tree with the root present in the - session fixture "temp_folder". 
+def source_tree(test_sample_folder): + """Generate a model.from_disk.Directory object from the test sample + folder """ - example_tree = Tree(temp_folder["root"]) - assert example_tree.path == temp_folder["root"] - - return example_tree + return model_of_dir(str(test_sample_folder).encode()) @pytest.fixture(scope="function") -def example_dirs(example_tree, temp_folder): - """ - Fixture that fill the fixture example_tree with the values contained in - the fixture temp_folder and returns the directories information of the - filled example_tree. - +def source_tree_dirs(source_tree): + """Returns a list of all directories contained inside the test sample + folder """ - root = temp_folder["root"] - filesample_path = temp_folder["filesample"] - filesample2_path = temp_folder["filesample2"] - subsubdir_path = temp_folder["subsubdir"] - known_paths = [filesample_path, filesample2_path, subsubdir_path] + root = source_tree.data["path"] + return list( + map( + lambda n: Path(n.data["path"].decode()).relative_to(Path(root.decode())), + filter( + lambda n: n.object_type == "directory" + and not n.data["path"] == source_tree.data["path"], + source_tree.iter_tree(dedup=False), + ), + ) + ) - for path, swhid in temp_folder["paths"].items(): - if path in known_paths: - example_tree.add_node(path, swhid, True) - else: - example_tree.add_node(path, swhid, False) - return example_tree.get_directories_info(root) - - -@pytest.fixture -def test_sample_folder(datadir, tmp_path): - """Location of the "data" folder """ - archive_path = Path(os.path.join(datadir, "sample-folder.tgz")) - assert archive_path.exists() - shutil.unpack_archive(archive_path, extract_dir=tmp_path) - test_sample_folder = Path(os.path.join(tmp_path, "sample-folder")) - assert test_sample_folder.exists() - return test_sample_folder +@pytest.fixture(scope="function") +def nodes_data(source_tree): + """mock known status of file/dirs in test_sample_folder""" + nodes_data = MerkleNodeInfo() + for node in 
source_tree.iter_tree(): + nodes_data[node.swhid()] = {"known": True} + return nodes_data @pytest.fixture def test_swhids_sample(tmp_path): """Create and return the opened "swhids_sample" file, - filled with present swhids present in data.py + filled with present swhids present in data.py """ test_swhids_sample = Path(os.path.join(tmp_path, "swhids_sample.txt")) diff --git a/swh/scanner/tests/data.py b/swh/scanner/tests/data.py --- a/swh/scanner/tests/data.py +++ b/swh/scanner/tests/data.py @@ -9,7 +9,6 @@ "swh:1:dir:4b825dc642cb6eb9a060e54bf8d69288fbee4904": {"known": True}, } -# present SWHIDs inside /data/sample-folder present_swhids = [ "swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a", # quotes.md "swh:1:cnt:68769579c3eaadbe555379b9c3538e6628bae1eb", # some-binary @@ -17,5 +16,11 @@ "swh:1:dir:07d4d9ec5c406632d203dbd4631e7863612a0326", # toexclude/ ] +# these SWHIDs are considered unknown by the fake backend (scanner.tests.flask_api) +unknown_swhids = [ + "swh:1:dir:0a7b61ef5780b03aa274d11069564980246445ce", # root directory + "swh:1:cnt:5f1cfce26640056bed3710cfaf3062a6a326a119", # toexclude/example.txt + "swh:1:dir:07d4d9ec5c406632d203dbd4631e7863612a0326", # toexclude/ +] to_exclude_swhid = "swh:1:dir:07d4d9ec5c406632d203dbd4631e7863612a0326" diff --git a/swh/scanner/tests/flask_api.py b/swh/scanner/tests/flask_api.py --- a/swh/scanner/tests/flask_api.py +++ b/swh/scanner/tests/flask_api.py @@ -7,7 +7,7 @@ from swh.scanner.exceptions import LargePayloadExc -from .data import present_swhids +from .data import unknown_swhids def create_app(): @@ -28,7 +28,7 @@ res = {swhid: {"known": False} for swhid in swhids} for swhid in swhids: - if swhid in present_swhids: + if swhid not in unknown_swhids: res[swhid]["known"] = True return res diff --git a/swh/scanner/tests/test_dashboard.py b/swh/scanner/tests/test_dashboard.py --- a/swh/scanner/tests/test_dashboard.py +++ b/swh/scanner/tests/test_dashboard.py @@ -5,16 +5,23 @@ import dash_html_components as html +from 
swh.model.identifiers import CoreSWHID, ObjectType from swh.scanner.dashboard.dashboard import generate_table_body +from swh.scanner.data import MerkleNodeInfo -def test_generate_table_body(example_tree, temp_folder): - subdir_path = temp_folder["subdir"] +def test_generate_table_body(source_tree): + chart_path = b"/bar/barfoo" + dir_path = source_tree[b"/bar/barfoo"].data["path"].decode() + nodes_data = MerkleNodeInfo() + # CoreSWHID of 'another-quote.org' + known_cnt_swhid = CoreSWHID( + object_type=ObjectType.CONTENT, + object_id=b"\x136\x93\xb1%\xba\xd2\xb4\xac1\x855\xb8I\x01\xeb\xb1\xf6\xb68", + ) + nodes_data[known_cnt_swhid] = {"known": True} - for path, swhid in temp_folder["paths"].items(): - example_tree.add_node(path, swhid, True) - - generated_body = generate_table_body(subdir_path, example_tree) + generated_body = generate_table_body(chart_path, source_tree, nodes_data) expected_body = [ html.Tbody( @@ -24,23 +31,11 @@ html.Td("✔"), html.Td( html.A( - children="filesample.txt", - href=f"file://{subdir_path}/filesample.txt", - ) - ), - html.Td("swh:1:cnt:e69de29bb2d1d6434b8b29ae775ad8c2e48c5391"), - ] - ), - html.Tr( - [ - html.Td("✔"), - html.Td( - html.A( - children="filesample2.txt", - href=f"file://{subdir_path}/filesample2.txt", + children="another-quote.org", + href=f"file://{dir_path}/another-quote.org", ) ), - html.Td("swh:1:cnt:e69de29bb2d1d6434b8b29ae775ad8c2e48c5391"), + html.Td("swh:1:cnt:133693b125bad2b4ac318535b84901ebb1f6b638"), ] ), ] diff --git a/swh/scanner/tests/test_data.py b/swh/scanner/tests/test_data.py new file mode 100644 --- /dev/null +++ b/swh/scanner/tests/test_data.py @@ -0,0 +1,44 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from pathlib import Path + +import pytest + +from swh.model.exceptions import ValidationError 
+from swh.scanner.data import ( + MerkleNodeInfo, + directory_content, + get_directory_data, + has_dirs, +) + + +def test_merkle_node_data_wrong_args(): + nodes_data = MerkleNodeInfo() + + with pytest.raises(ValidationError): + nodes_data["wrong key"] = {"known": True} + + with pytest.raises(ValidationError): + nodes_data["swh:1:dir:17d207da3804cc60a77cba58e76c3b2f767cb112"] = "wrong value" + + +def test_get_directory_data(source_tree, nodes_data): + root = Path(source_tree.data["path"].decode()) + dirs_data = get_directory_data(root, source_tree, nodes_data) + + assert len(dirs_data) == 5 + + +def test_directory_content(source_tree, nodes_data): + foo_dir = source_tree[b"foo"] + foo_content = directory_content(foo_dir, nodes_data) + assert foo_content[0] == 3 + assert foo_content[1] == 3 + + +def test_has_dirs(source_tree): + assert has_dirs(source_tree) diff --git a/swh/scanner/tests/test_model.py b/swh/scanner/tests/test_model.py deleted file mode 100644 --- a/swh/scanner/tests/test_model.py +++ /dev/null @@ -1,107 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import pytest - -from swh.scanner.exceptions import InvalidDirectoryPath - - -def test_tree_add_node(example_tree, temp_folder): - avail_paths = temp_folder["paths"].keys() - - for path, swhid in temp_folder["paths"].items(): - example_tree.add_node(path, swhid, False) - - for path, node in example_tree.children.items(): - assert path in avail_paths - if node.children: - for subpath, subnode in node.children.items(): - assert subpath in avail_paths - - -def test_to_json_no_one_present(example_tree, temp_folder): - for path, swhid in temp_folder["paths"].items(): - example_tree.add_node(path, swhid, False) - - result = example_tree.to_dict() - - assert len(result) == 6 - - for _, node_info in 
result.items(): - assert node_info["known"] is False - - -def test_get_json_tree_all_present(example_tree, temp_folder): - for path, swhid in temp_folder["paths"].items(): - example_tree.add_node(path, swhid, True) - - result = example_tree.to_dict() - - assert len(result) == 6 - - for _, node_info in result.items(): - assert node_info["known"] is True - - -def test_get_json_tree_only_one_present(example_tree, temp_folder): - filesample_path = temp_folder["filesample"] - - for path, swhid in temp_folder["paths"].items(): - example_tree.add_node(path, swhid, True if path == filesample_path else False) - - result = example_tree.to_dict() - - assert len(result) == 6 - - for path, node_attr in result.items(): - if path == "subdir/filesample.txt": - assert node_attr["known"] is True - else: - assert node_attr["known"] is False - - -def test_get_directories_info(example_tree, temp_folder): - root_path = temp_folder["root"] - filesample_path = temp_folder["filesample"] - filesample2_path = temp_folder["filesample2"] - subdir_path = temp_folder["subdir"].relative_to(root_path) - subsubdir_path = temp_folder["subsubdir"].relative_to(root_path) - - for path, swhid in temp_folder["paths"].items(): - if path == filesample_path or path == filesample2_path: - example_tree.add_node(path, swhid, True) - else: - example_tree.add_node(path, swhid, False) - - directories = example_tree.get_directories_info(example_tree.path) - - assert subsubdir_path not in directories - assert directories[subdir_path] == (2, 2) - - -def test_get_files_from_dir(example_tree, temp_folder): - subdir_path = temp_folder["subdir"] - - for path, swhid in temp_folder["paths"].items(): - example_tree.add_node(path, swhid, True) - - files = example_tree.get_files_from_dir(subdir_path) - assert len(files) == 2 - - -def test_get_files_source_path(example_tree, temp_folder): - for path, swhid in temp_folder["paths"].items(): - example_tree.add_node(path, swhid, True) - - files = 
example_tree.get_files_from_dir(example_tree.path) - assert len(files) == 1 - - -def test_get_files_from_dir_raise_exception(example_tree, temp_folder): - for path, swhid in temp_folder["paths"].items(): - example_tree.add_node(path, swhid, True) - - with pytest.raises(InvalidDirectoryPath): - example_tree.get_files_from_dir("test/") diff --git a/swh/scanner/tests/test_plot.py b/swh/scanner/tests/test_plot.py --- a/swh/scanner/tests/test_plot.py +++ b/swh/scanner/tests/test_plot.py @@ -1,8 +1,11 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from pathlib import Path + +from swh.scanner.data import get_directory_data from swh.scanner.plot import ( build_hierarchical_df, compute_max_depth, @@ -10,48 +13,56 @@ ) -def test_max_depth(temp_folder, example_dirs): - root = temp_folder["root"] - max_depth = compute_max_depth(example_dirs, root) +def test_max_depth(source_tree, source_tree_dirs): + dirs = [Path(dir_path) for dir_path in source_tree_dirs] + max_depth = compute_max_depth(dirs) assert max_depth == 2 -def test_generate_df_from_dirs(temp_folder, example_dirs): - root = temp_folder["root"] - max_depth = compute_max_depth(example_dirs, root) +def test_generate_df_from_dirs(source_tree, source_tree_dirs, nodes_data): + root = Path(source_tree.data["path"].decode()) + dirs = [Path(dir_path) for dir_path in source_tree_dirs] + dirs_data = get_directory_data(root, source_tree, nodes_data) + max_depth = compute_max_depth(dirs) metrics_columns = ["contents", "known"] levels_columns = ["lev" + str(i) for i in range(max_depth)] df_columns = levels_columns + metrics_columns - actual_df = generate_df_from_dirs(example_dirs, df_columns, root, max_depth) + actual_df = generate_df_from_dirs(dirs_data, df_columns, max_depth) 
+ + expected_lev0_path = ["bar", "foo", "toexclude"] + expected_lev1_path = ["bar/barfoo", "bar/barfoo2"] + + df_lev0 = actual_df["lev0"].tolist() + df_lev1 = actual_df["lev1"].tolist() - # assert root is empty - assert actual_df["lev0"][0] == "" - assert actual_df["lev1"][0] == "" + for path in expected_lev0_path: + assert path in df_lev0 - # assert subdir has correct contents information - assert actual_df["contents"][1] == 2 - assert actual_df["known"][1] == 2 + for path in expected_lev1_path: + assert path in df_lev1 - # assert subsubdir has correct level information - assert actual_df["lev0"][2] == "subdir" - assert actual_df["lev1"][2] == "subdir/subsubdir" + assert actual_df["contents"].sum() == 6 + assert actual_df["known"].sum() == 6 -def test_build_hierarchical_df(temp_folder, example_dirs): - root = temp_folder["root"] - max_depth = compute_max_depth(example_dirs, root) +def test_build_hierarchical_df(source_tree, source_tree_dirs, nodes_data): + root = Path(source_tree.data["path"].decode()) + dirs = [Path(dir_path) for dir_path in source_tree_dirs] + dirs_data = get_directory_data(root, source_tree, nodes_data) + max_depth = compute_max_depth(dirs) metrics_columns = ["contents", "known"] levels_columns = ["lev" + str(i) for i in range(max_depth)] df_columns = levels_columns + metrics_columns - actual_df = generate_df_from_dirs(example_dirs, df_columns, root, max_depth) + actual_df = generate_df_from_dirs(dirs_data, df_columns, max_depth) actual_result = build_hierarchical_df( actual_df, levels_columns, metrics_columns, root ) - assert actual_result["parent"][1] == "subdir" - assert actual_result["contents"][1] == 2 - assert actual_result["id"][5] == root - assert actual_result["known"][5] == 75 + assert actual_result["parent"][0] == "bar" + assert actual_result["parent"][1] == "foo" + assert actual_result["contents"][1] == 3 + assert actual_result["id"][8] == root + assert actual_result["known"][8] == 100 diff --git a/swh/scanner/tests/test_scanner.py 
b/swh/scanner/tests/test_scanner.py --- a/swh/scanner/tests/test_scanner.py +++ b/swh/scanner/tests/test_scanner.py @@ -8,29 +8,15 @@ from flask import url_for import pytest -from swh.model.exceptions import InvalidDirectoryPath +from swh.scanner.data import MerkleNodeInfo from swh.scanner.exceptions import APIError -from swh.scanner.model import Tree -from swh.scanner.scanner import extract_regex_objs, get_subpaths, run, swhids_discovery +from swh.scanner.scanner import run, swhids_discovery -from .data import correct_api_response, present_swhids, to_exclude_swhid +from .data import correct_api_response, unknown_swhids aio_url = "http://example.org/api/known/" -def test_extract_regex_objs(temp_folder): - root_path = bytes(temp_folder["root"]) - - patterns = (bytes(temp_folder["subdir"]), b"/none") - - sre_patterns = [reg_obj for reg_obj in extract_regex_objs(root_path, patterns)] - assert len(sre_patterns) == 2 - - patterns = (*patterns, b"/tmp") - with pytest.raises(InvalidDirectoryPath): - sre_patterns = [reg_obj for reg_obj in extract_regex_objs(root_path, patterns)] - - def test_scanner_correct_api_request(mock_aioresponse, event_loop, aiosession): mock_aioresponse.post( aio_url, @@ -66,56 +52,19 @@ event_loop.run_until_complete(swhids_discovery(request, aiosession, api_url)) -def test_scanner_get_subpaths(temp_folder): - root = temp_folder["root"] - - actual_result = [] - for subpath, swhid in get_subpaths(root, tuple()): - # also check if it's a symlink since pytest tmp_dir fixture create - # also a symlink to each directory inside the tmp_dir path - if subpath.is_dir() and not subpath.is_symlink(): - actual_result.append((subpath, swhid)) - - assert len(actual_result) == 2 - - @pytest.mark.options(debug=False) def test_app(app): assert not app.debug -def test_scanner_result(live_server, event_loop, test_sample_folder): +def test_scanner_result(live_server, event_loop, source_tree): api_url = url_for("index", _external=True) config = {"web-api": {"url": 
api_url, "auth-token": None}} - source_tree = Tree(test_sample_folder) - event_loop.run_until_complete(run(config, test_sample_folder, source_tree, set())) - - for child_node in source_tree.iterate(): - node_info = list(child_node.attributes.values())[0] - if node_info["swhid"] in present_swhids: - assert node_info["known"] is True + nodes_data = MerkleNodeInfo() + event_loop.run_until_complete(run(config, source_tree, nodes_data)) + for node in source_tree.iter_tree(): + if str(node.swhid()) in unknown_swhids: + assert nodes_data[node.swhid()]["known"] is False else: - assert node_info["known"] is False - - -def test_scanner_result_with_exclude_patterns( - live_server, event_loop, test_sample_folder -): - api_url = url_for("index", _external=True) - config = {"web-api": {"url": api_url, "auth-token": None}} - to_exclude_dir = str(test_sample_folder) + "/toexclude" - - patterns = (to_exclude_dir.encode(),) - exclude_pattern = { - reg_obj for reg_obj in extract_regex_objs(bytes(test_sample_folder), patterns) - } - - source_tree = Tree(test_sample_folder) - event_loop.run_until_complete( - run(config, test_sample_folder, source_tree, exclude_pattern) - ) - - for child_node in source_tree.iterate(): - node_info = list(child_node.attributes.values())[0] - assert node_info["swhid"] != to_exclude_swhid + assert nodes_data[node.swhid()]["known"] is True