diff --git a/swh/scanner/dashboard/dashboard.py b/swh/scanner/dashboard/dashboard.py
--- a/swh/scanner/dashboard/dashboard.py
+++ b/swh/scanner/dashboard/dashboard.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2020 The Software Heritage developers
+# Copyright (C) 2020-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -12,37 +12,41 @@
 import dash_html_components as html
 import plotly.graph_objects as go
 
-from ..model import Tree
+from swh.model.from_disk import Directory
+
+from ..data import MerkleNodeData, get_content_from
 
 
-def generate_table_body(dir_path: Path, source: Tree):
+def generate_table_body(
+    dir_path: bytes, source_tree: Directory, nodes_data: MerkleNodeData
+):
     """
     Generate the data_table from the path taken from the chart.
 
     For each file builds the html table rows showing the known status,
     a local link to the file and the relative SoftWare Heritage persistent
     IDentifier (SWHID).
     """
+    contents = get_content_from(dir_path, source_tree, nodes_data)
     data = []
-    for file_info in source.get_files_from_dir(dir_path):
-        for file_path, attr in file_info.items():
-            file_path = Path(file_path)
-            file_name = file_path.parts[len(file_path.parts) - 1]
-            data.append(
-                html.Tr(
-                    [
-                        html.Td("✔" if attr["known"] else ""),
-                        html.Td(
-                            html.A(file_name, href="file://" + str(file_path.resolve()))
-                        ),
-                        html.Td(attr["swhid"]),
-                    ]
-                )
+    for file_path, attr in contents.items():
+        file_path = Path(file_path)
+        file_name = file_path.name
+        data.append(
+            html.Tr(
+                [
+                    html.Td("✔" if attr["known"] else ""),
+                    html.Td(
+                        html.A(file_name, href="file://" + str(file_path.resolve()))
+                    ),
+                    html.Td(attr["swhid"]),
+                ]
             )
+        )
 
     return [html.Tbody(data)]
 
 
-def run_app(graph_obj: go, source: Tree):
+def run_app(graph_obj: go, source_tree: Directory, nodes_data: MerkleNodeData):
     app = dash.Dash(__name__)
 
     fig = go.Figure().add_trace(graph_obj)
@@ -88,13 +92,12 @@
         """
         if click_data is not None:
-            raw_path = click_data["points"][0]["label"]
-            full_path = (
-                source.path.joinpath(raw_path)
-                if raw_path != str(source.path)
-                else Path(raw_path)
+            full_path = click_data["points"][0]["label"]
+            return (
+                table_header
+                + generate_table_body(full_path.encode(), source_tree, nodes_data),
+                full_path,
             )
-            return table_header + generate_table_body(full_path, source), str(full_path)
         else:
             return "", ""
 
 
diff --git a/swh/scanner/data.py b/swh/scanner/data.py
new file mode 100644
--- /dev/null
+++ b/swh/scanner/data.py
@@ -0,0 +1,115 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from pathlib import Path
+from typing import Dict, Optional, Tuple
+
+from swh.model.from_disk import Directory
+from swh.model.identifiers import SWHID_RE
+
+
+class MerkleNodeData(dict):
+    """Store additional data for Merkle nodes, indexed by their SWHID string.
+    """
+
+    def __init__(self, *arg, **kwargs):
+        super(MerkleNodeData, self).__init__(*arg, **kwargs)
+
+    def __setitem__(self, key, value):
+        """Keys must be valid SWHIDs in string form; values must be
+        dictionaries of node attributes.
+        """
+        if not SWHID_RE.fullmatch(key):
+            raise ValueError("invalid key (expected a SWHID string): %s" % str(key))
+
+        if not isinstance(value, dict):
+            raise ValueError("value must be a dict")
+
+        super(MerkleNodeData, self).__setitem__(key, value)
+
+
+def get_directory_data(
+    root_path: Path,
+    source_tree: Directory,
+    nodes_data: MerkleNodeData,
+    directory_data: Optional[Dict] = None,
+) -> Dict[Path, Tuple[int, int]]:
+    """Get content information for each directory inside model.from_disk
+
+    Returns:
+        A dictionary with a directory path as key and the relative
+        contents information as values.
+    """
+    if directory_data is None:
+        directory_data = {}
+
+    def _get_directory_data(
+        source_tree: Directory, nodes_data: MerkleNodeData, directory_data: Dict
+    ):
+        directories = list(
+            filter(
+                lambda n: n.object_type == "directory",
+                map(lambda n: n[1], source_tree.items()),
+            )
+        )
+        for node in directories:
+            directory_info = directory_content(node, nodes_data)
+            rel_path = Path(node.data["path"].decode()).relative_to(root_path)
+            directory_data[rel_path] = directory_info
+            if has_dirs(node):
+                _get_directory_data(node, nodes_data, directory_data)
+
+    _get_directory_data(source_tree, nodes_data, directory_data)
+    return directory_data
+
+
+def directory_content(node: Directory, nodes_data: MerkleNodeData) -> Tuple[int, int]:
+    """Count the contents of a directory node.
+
+    Returns:
+        A tuple with the total number of contents inside the directory and
+        the number of known contents.
+    """
+    known_cnt = 0
+    node_contents = list(
+        filter(lambda n: n.object_type == "content", map(lambda n: n[1], node.items()))
+    )
+    for sub_node in node_contents:
+        if nodes_data[str(sub_node.swhid())]["known"]:
+            known_cnt += 1
+
+    return (len(node_contents), known_cnt)
+
+
+def has_dirs(node: Directory) -> bool:
+    """Check if the given directory node contains sub-directories."""
+    for _, sub_node in node.items():
+        if isinstance(sub_node, Directory):
+            return True
+    return False
+
+
+def get_content_from(
+    node_path: bytes, source_tree: Directory, nodes_data: MerkleNodeData
+) -> Dict[str, Dict]:
+    """Get content information for the directory at node_path.
+
+    Returns:
+        A dictionary mapping each file path to its SWHID and known status.
+    """
+    directory = source_tree[node_path if node_path != source_tree.data["path"] else b""]
+    node_contents = list(
+        filter(
+            lambda n: n.object_type == "content", map(lambda n: n[1], directory.items())
+        )
+    )
+    files_data = {}
+    for node in node_contents:
+        node_swhid = str(node.swhid())
+        node_info = nodes_data[node_swhid]
+        node_info["swhid"] = node_swhid
+        files_data[node.data["path"].decode()] = node_info
+
+    return files_data
diff --git a/swh/scanner/model.py b/swh/scanner/model.py
deleted file mode 100644
--- a/swh/scanner/model.py
+++ /dev/null
@@ -1,259 +0,0 @@
-# Copyright (C) 2020 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from __future__ import annotations
-
-from enum import Enum
-import json
-from pathlib import Path
-import sys
-from typing import Any, Dict, Iterator, List, Tuple
-
-import ndjson
-
-from swh.model.identifiers import CONTENT, DIRECTORY
-
-from .exceptions import InvalidDirectoryPath, InvalidObjectType
-from .plot import generate_sunburst, offline_plot
-
-
-class Color(Enum):
-    blue = "\033[94m"
-    green = "\033[92m"
-    red = "\033[91m"
-    end = "\033[0m"
-
-
-def colorize(text: str, color: Color):
-    return color.value + text + Color.end.value
-
-
-class Tree:
-    """Representation of a file system structure
-    """
-
-    def __init__(self, path: Path, father: Tree = None):
-        self.father = father
-        self.path = path
-        self.otype = 
DIRECTORY if path.is_dir() else CONTENT - self.swhid = "" - self.known = False - self.children: Dict[Path, Tree] = {} - - def add_node(self, path: Path, swhid: str, known: bool) -> None: - """Recursively add a new path. - """ - relative_path = path.relative_to(self.path) - - if relative_path == Path("."): - self.swhid = swhid - self.known = known - return - - new_path = self.path.joinpath(relative_path.parts[0]) - if new_path not in self.children: - self.children[new_path] = Tree(new_path, self) - - self.children[new_path].add_node(path, swhid, known) - - def show(self, fmt) -> None: - """Show tree in different formats""" - if fmt == "json": - print(json.dumps(self.to_dict(), indent=4, sort_keys=True)) - - if fmt == "ndjson": - print( - ndjson.dumps( - {str(Path(k).relative_to(self.path)): v} - for node in self.iterate() - for k, v in node.attributes.items() - ) - ) - - elif fmt == "text": - isatty = sys.stdout.isatty() - root_dir = self.path.relative_to(self.path.parent) - print(colorize(str(root_dir), Color.blue) if isatty else str(root_dir)) - self.print_children(isatty) - - elif fmt == "sunburst": - root = self.path - directories = self.get_directories_info(root) - sunburst = generate_sunburst(directories, root) - offline_plot(sunburst) - - def print_children(self, isatty: bool, inc: int = 1) -> None: - for path, node in self.children.items(): - self.print_node(node, isatty, inc) - if node.children: - node.print_children(isatty, inc + 1) - - def print_node(self, node: Any, isatty: bool, inc: int) -> None: - rel_path = str(node.path.relative_to(self.path)) - begin = "│ " * inc - end = "/" if node.otype == DIRECTORY else "" - - if isatty: - if not node.known: - rel_path = colorize(rel_path, Color.red) - elif node.otype == DIRECTORY: - rel_path = colorize(rel_path, Color.blue) - elif node.otype == CONTENT: - rel_path = colorize(rel_path, Color.green) - - print(f"{begin}{rel_path}{end}") - - @property - def attributes(self) -> Dict[str, Dict[str, Any]]: - """ - Get the attributes of the current node grouped by the relative path. - - Returns: - a dictionary containing a path as key and its known/unknown status and the - SWHID as values. - - """ - return {str(self.path): {"swhid": self.swhid, "known": self.known,}} - - def to_dict(self) -> Dict[str, Dict[str, Any]]: - """ - Recursively flatten the current tree nodes into a dictionary. - - For example, if you have the following structure: - - .. code-block:: none - - root { - subdir: { - file.txt - } - } - - The generated dictionary will be: - - .. 
code-block:: none - - { - "root": { - "swhid": "...", - "known": True/False - } - "root/subdir": { - "swhid": "...", - "known": True/False - } - "root/subdir/file.txt": { - "swhid": "...", - "known": True/False - } - } - """ - return { - str(Path(k).relative_to(self.path)): v - for node in self.iterate() - for k, v in node.attributes.items() - } - - def iterate(self) -> Iterator[Tree]: - """ - Recursively iterate through the children of the current node - - """ - for _, child_node in self.children.items(): - yield child_node - if child_node.otype == DIRECTORY: - yield from child_node.iterate() - - def get_files_from_dir(self, dir_path: Path) -> List: - """ - Retrieve files information about a specific directory path - - Returns: - A list containing the files attributes present inside the directory given - in input - """ - - def get_files(node): - files = [] - for _, node in node.children.items(): - if node.otype == CONTENT: - files.append(node.attributes) - return files - - if dir_path == self.path: - return get_files(self) - else: - for node in self.iterate(): - if node.path == dir_path: - return get_files(node) - raise InvalidDirectoryPath( - "The directory provided doesn't match any stored directory" - ) - - def _get_sub_dirs_info(self, root, directories): - """Fills the directories given in input with the contents information - stored inside the directory child, only if they have contents. - """ - for path, child_node in self.children.items(): - if child_node.otype == DIRECTORY: - rel_path = path.relative_to(root) - contents_info = child_node.count_contents() - # checks the first element of the tuple - # (the number of contents in a directory) - # if it is equal to zero it means that there are no contents - # in that directory. - if not contents_info[0] == 0: - directories[rel_path] = contents_info - if child_node.has_dirs(): - child_node._get_sub_dirs_info(root, directories) - - def get_directories_info(self, root: Path) -> Dict[Path, Tuple[int, int]]: - """Get information about all directories under the given root. - - Returns: - A dictionary with a directory path as key and the relative - contents information (the result of count_contents) as values. - - """ - directories = {root: self.count_contents()} - self._get_sub_dirs_info(root, directories) - return directories - - def count_contents(self) -> Tuple[int, int]: - """Count how many contents are present inside a directory. - If a directory has a SWHID returns as it has all the contents. - - Returns: - A tuple with the total number of the contents and the number - of contents known (the ones that have a persistent identifier). 
-
-        """
-        contents = 0
-        discovered = 0
-
-        if not self.otype == DIRECTORY:
-            raise InvalidObjectType(
-                "Can't count contents of the object type: %s" % self.otype
-            )
-
-        if self.known:
-            # to identify a directory with all files/directories present
-            return (1, 1)
-        else:
-            for _, child_node in self.children.items():
-                if child_node.otype == CONTENT:
-                    contents += 1
-                    if child_node.known:
-                        discovered += 1
-
-        return (contents, discovered)
-
-    def has_dirs(self) -> bool:
-        """Checks if node has directories
-        """
-        for _, child_node in self.children.items():
-            if child_node.otype == DIRECTORY:
-                return True
-        return False
diff --git a/swh/scanner/output.py b/swh/scanner/output.py
new file mode 100644
--- /dev/null
+++ b/swh/scanner/output.py
@@ -0,0 +1,82 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from enum import Enum
+import os
+from pathlib import Path
+import sys
+from typing import Any
+
+from swh.model.from_disk import Directory
+
+from .dashboard.dashboard import run_app
+from .data import MerkleNodeData, get_directory_data
+from .plot import generate_sunburst, offline_plot
+
+DEFAULT_OUTPUT = "text"
+
+
+class Color(Enum):
+    BLUE = "\033[94m"
+    GREEN = "\033[92m"
+    RED = "\033[91m"
+    END = "\033[0m"
+
+
+def colorize(text: str, color: Color):
+    return color.value + text + Color.END.value
+
+
+class Output:
+    def __init__(
+        self, root_path: Path, nodes_data: MerkleNodeData, source_tree: Directory
+    ):
+        self.root_path = root_path
+        self.nodes_data = nodes_data
+        self.source_tree = source_tree
+
+    def show(self, mode=DEFAULT_OUTPUT):
+        if mode == "text":
+            isatty = sys.stdout.isatty()
+            root_dir = self.source_tree.data["path"].decode()
+            print(colorize(root_dir, Color.BLUE) if isatty else root_dir)
+            self.print_children(self.source_tree, isatty)
+        elif mode == "sunburst":
+            directory_data = get_directory_data(
+                self.root_path, self.source_tree, self.nodes_data
+            )
+            sunburst_figure = generate_sunburst(directory_data, self.root_path)
+            offline_plot(sunburst_figure)
+        elif mode == "interactive":
+            directory_data = get_directory_data(
+                self.root_path, self.source_tree, self.nodes_data
+            )
+            sunburst_figure = generate_sunburst(directory_data, self.root_path)
+            run_app(sunburst_figure, self.source_tree, self.nodes_data)
+
+    def print_children(self, node: Any, isatty: bool, inc: int = 1) -> None:
+        for _, sub_node in list(node.items()):
+            if sub_node == node:
+                continue
+            self.print_node(sub_node, isatty, inc)
+            if sub_node.object_type == "directory" and sub_node.items():
+                self.print_children(sub_node, isatty, inc + 1)
+
+    def print_node(self, node: Any, isatty: bool, inc: int) -> None:
+        path_name = "path" if "path" in node.data else "data"
+        rel_path = os.path.basename(node.data[path_name])
+        rel_path = rel_path.decode()
+        begin = "│ " * inc
+        end = "/" if node.object_type == "directory" else ""
+
+        if isatty:
+            if not self.nodes_data[str(node.swhid())]["known"]:
+                rel_path = colorize(rel_path, Color.RED)
+            elif node.object_type == "directory":
+                rel_path = colorize(rel_path, Color.BLUE)
+            elif node.object_type == "content":
+                rel_path = colorize(rel_path, Color.GREEN)
+
+        print(f"{begin}{rel_path}{end}")
diff --git a/swh/scanner/scanner.py b/swh/scanner/scanner.py
--- a/swh/scanner/scanner.py
+++ b/swh/scanner/scanner.py
@@ -1,28 +1,55 @@
-# Copyright (C) 2020 The Software Heritage developers
+# Copyright (C) 2020-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import asyncio
 import itertools
-import os
 from pathlib import Path
-from typing import Any, Dict, Iterable, Iterator, List, Pattern, Tuple, Union
+from typing import Any, Dict, Iterable, List
 
 import aiohttp
 
-from swh.model.from_disk import (
-    Content,
-    Directory,
-    accept_all_directories,
-    extract_regex_objs,
-)
-from swh.model.identifiers import CoreSWHID, ObjectType
+from swh.model.cli import model_of_dir
+from swh.model.from_disk import Directory
 
-from .dashboard.dashboard import run_app
+from .data import MerkleNodeData
 from .exceptions import error_response
-from .model import Tree
-from .plot import generate_sunburst
+from .output import Output
+
+
+async def stop_and_go(
+    source_tree: Directory,
+    data: MerkleNodeData,
+    session: aiohttp.ClientSession,
+    api_url: str,
+):
+    """Traverse the source tree, querying the archive level by level.
+
+    All the nodes at the same depth are checked in a single batch; the
+    children of a directory are queued only when the directory is not known,
+    while every node under a known directory is marked as known directly.
+    """
+    queue = [source_tree]
+
+    while queue:
+        swhids = [str(node.swhid()) for node in queue]
+        swhids_res = await swhids_discovery(swhids, session, api_url)
+        next_queue = []
+        for node in queue:
+            node_swhid = str(node.swhid())
+            data[node_swhid]["known"] = swhids_res[node_swhid]["known"]
+            if node.object_type == "directory":
+                if not data[node_swhid]["known"]:
+                    # unknown directory: its children must be checked next
+                    next_queue.extend(child for _, child in node.items())
+                else:
+                    # known directory: the whole subtree is known as well
+                    for sub_node in node.iter_tree():
+                        if sub_node == node:
+                            continue
+                        data[str(sub_node.swhid())]["known"] = True
+        queue = next_queue
 
 
 async def swhids_discovery(
@@ -71,102 +98,8 @@
     return await make_request(swhids)
-
-
-def directory_filter(
-    path_name: Union[str, bytes], exclude_patterns: Iterable[Pattern[bytes]]
-) -> bool:
-    """It checks if the path_name is matching with the patterns given in input.
-
-    It is also used as a `dir_filter` function when generating the directory
-    object from `swh.model.from_disk`
-
-    Returns:
-        False if the directory has to be ignored, True otherwise
-
-    """
-    path = Path(path_name.decode() if isinstance(path_name, bytes) else path_name)
-
-    for sre_pattern in exclude_patterns:
-        if sre_pattern.match(bytes(path)):
-            return False
-    return True
-
-
-def get_subpaths(
-    path: Path, exclude_patterns: Iterable[Pattern[bytes]]
-) -> Iterator[Tuple[Path, str]]:
-    """Find the SoftWare Heritage persistent IDentifier (SWHID) of
-    the directories and files under a given path.
- - Args: - path: the root path - - Yields: - pairs of: path, the relative SWHID - - """ - - def swhid_of(path: Path) -> str: - if path.is_dir(): - if exclude_patterns: - - def dir_filter(dirpath: bytes, *args) -> bool: - return directory_filter(dirpath, exclude_patterns) - - else: - dir_filter = accept_all_directories # type: ignore - - obj = Directory.from_disk( - path=bytes(path), dir_filter=dir_filter - ).get_data() - - return str(CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj["id"])) - else: - obj = Content.from_file(path=bytes(path)).get_data() - return str( - CoreSWHID(object_type=ObjectType.CONTENT, object_id=obj["sha1_git"]) - ) - - dirpath, dnames, fnames = next(os.walk(path)) - for node in itertools.chain(dnames, fnames): - sub_path = Path(dirpath).joinpath(node) - yield (sub_path, swhid_of(sub_path)) - - -async def parse_path( - path: Path, - session: aiohttp.ClientSession, - api_url: str, - exclude_patterns: Iterable[Pattern[bytes]], -) -> Iterator[Tuple[str, str, bool]]: - """Check if the sub paths of the given path are present in the - archive or not. - - Args: - path: the source path - api_url: url for the API request - - Returns: - a map containing tuples with: a subpath of the given path, - the SWHID of the subpath and the result of the api call - - """ - parsed_paths = dict(get_subpaths(path, exclude_patterns)) - parsed_swhids = await swhids_discovery( - list(parsed_paths.values()), session, api_url - ) - - def unpack(tup): - subpath, swhid = tup - return (subpath, swhid, parsed_swhids[swhid]["known"]) - - return map(unpack, parsed_paths.items()) - - async def run( - config: Dict[str, Any], - root: str, - source_tree: Tree, - exclude_patterns: Iterable[Pattern[bytes]], + config: Dict[str, Any], source_tree: Directory, nodes_data: MerkleNodeData ) -> None: """Start scanning from the given root. 
@@ -179,28 +112,16 @@
     """
     api_url = config["web-api"]["url"]
 
-    async def _scan(root, session, api_url, source_tree, exclude_patterns):
-        for path, obj_swhid, known in await parse_path(
-            root, session, api_url, exclude_patterns
-        ):
-            obj_type = CoreSWHID.from_string(obj_swhid).object_type
-
-            if obj_type == ObjectType.CONTENT:
-                source_tree.add_node(path, obj_swhid, known)
-            elif obj_type == ObjectType.DIRECTORY and directory_filter(
-                path, exclude_patterns
-            ):
-                source_tree.add_node(path, obj_swhid, known)
-                if not known:
-                    await _scan(path, session, api_url, source_tree, exclude_patterns)
-
     if config["web-api"]["auth-token"]:
         headers = {"Authorization": f"Bearer {config['web-api']['auth-token']}"}
     else:
         headers = {}
 
+    for node in source_tree.iter_tree():
+        nodes_data[str(node.swhid())] = {}
+
     async with aiohttp.ClientSession(headers=headers, trust_env=True) as session:
-        await _scan(root, session, api_url, source_tree, exclude_patterns)
+        await stop_and_go(source_tree, nodes_data, session, api_url)
 
 
 def scan(
@@ -212,22 +133,15 @@
 ):
     """Scan a source code project to discover files and directories already
     present in the archive"""
-    converted_patterns = set(pattern.encode() for pattern in exclude_patterns)
-    sre_patterns = set()
-    if exclude_patterns:
-        sre_patterns = {
-            reg_obj
-            for reg_obj in extract_regex_objs(root_path.encode(), converted_patterns)
-        }
-
-    source_tree = Tree(Path(root_path))
+    converted_patterns = [pattern.encode() for pattern in exclude_patterns]
+    source_tree = model_of_dir(str(Path(root_path)).encode(), converted_patterns)
+    nodes_data = MerkleNodeData()
+
     loop = asyncio.get_event_loop()
-    loop.run_until_complete(run(config, root_path, source_tree, sre_patterns))
+    loop.run_until_complete(run(config, source_tree, nodes_data))
+    out = Output(Path(root_path), nodes_data, source_tree)
 
     if interactive:
-        root = Path(root_path)
-        directories = source_tree.get_directories_info(root)
-        figure = generate_sunburst(directories, root)
-        run_app(figure, source_tree)
+        out.show("interactive")
     else:
-        source_tree.show(out_fmt)
+        out.show(out_fmt)
diff --git a/swh/scanner/tests/data.py b/swh/scanner/tests/data.py
--- a/swh/scanner/tests/data.py
+++ b/swh/scanner/tests/data.py
@@ -9,7 +9,6 @@
     "swh:1:dir:4b825dc642cb6eb9a060e54bf8d69288fbee4904": {"known": True},
 }
 
-# present SWHIDs inside /data/sample-folder
 present_swhids = [
     "swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a",  # quotes.md
     "swh:1:cnt:68769579c3eaadbe555379b9c3538e6628bae1eb",  # some-binary
@@ -17,5 +16,11 @@
     "swh:1:dir:07d4d9ec5c406632d203dbd4631e7863612a0326",  # toexclude/
 ]
 
+# SWHIDs that the fake backend (swh.scanner.tests.flask_api) reports as unknown
+unknown_swhids = [
+    "swh:1:dir:0a7b61ef5780b03aa274d11069564980246445ce",  # root directory
+    "swh:1:cnt:5f1cfce26640056bed3710cfaf3062a6a326a119",  # toexclude/example.txt
+    "swh:1:dir:07d4d9ec5c406632d203dbd4631e7863612a0326",  # toexclude/
+]
 
 to_exclude_swhid = "swh:1:dir:07d4d9ec5c406632d203dbd4631e7863612a0326"
diff --git a/swh/scanner/tests/flask_api.py b/swh/scanner/tests/flask_api.py
--- a/swh/scanner/tests/flask_api.py
+++ b/swh/scanner/tests/flask_api.py
@@ -7,7 +7,7 @@
 
 from swh.scanner.exceptions import LargePayloadExc
 
-from .data import present_swhids
+from .data import unknown_swhids
 
 
 def create_app():
@@ -28,7 +28,7 @@
     res = {swhid: {"known": False} for swhid in swhids}
 
     for swhid in swhids:
-        if swhid in present_swhids:
+        if swhid not in unknown_swhids:
             res[swhid]["known"] = True
 
     return res
diff --git a/swh/scanner/tests/test_model.py 
b/swh/scanner/tests/test_model.py deleted file mode 100644 --- a/swh/scanner/tests/test_model.py +++ /dev/null @@ -1,107 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import pytest - -from swh.scanner.exceptions import InvalidDirectoryPath - - -def test_tree_add_node(example_tree, temp_folder): - avail_paths = temp_folder["paths"].keys() - - for path, swhid in temp_folder["paths"].items(): - example_tree.add_node(path, swhid, False) - - for path, node in example_tree.children.items(): - assert path in avail_paths - if node.children: - for subpath, subnode in node.children.items(): - assert subpath in avail_paths - - -def test_to_json_no_one_present(example_tree, temp_folder): - for path, swhid in temp_folder["paths"].items(): - example_tree.add_node(path, swhid, False) - - result = example_tree.to_dict() - - assert len(result) == 6 - - for _, node_info in result.items(): - assert node_info["known"] is False - - -def test_get_json_tree_all_present(example_tree, temp_folder): - for path, swhid in temp_folder["paths"].items(): - example_tree.add_node(path, swhid, True) - - result = example_tree.to_dict() - - assert len(result) == 6 - - for _, node_info in result.items(): - assert node_info["known"] is True - - -def test_get_json_tree_only_one_present(example_tree, temp_folder): - filesample_path = temp_folder["filesample"] - - for path, swhid in temp_folder["paths"].items(): - example_tree.add_node(path, swhid, True if path == filesample_path else False) - - result = example_tree.to_dict() - - assert len(result) == 6 - - for path, node_attr in result.items(): - if path == "subdir/filesample.txt": - assert node_attr["known"] is True - else: - assert node_attr["known"] is False - - -def test_get_directories_info(example_tree, temp_folder): - root_path = temp_folder["root"] - filesample_path = temp_folder["filesample"] - filesample2_path = temp_folder["filesample2"] - subdir_path = temp_folder["subdir"].relative_to(root_path) - subsubdir_path = temp_folder["subsubdir"].relative_to(root_path) - - for path, swhid in temp_folder["paths"].items(): - if path == filesample_path or path == filesample2_path: - example_tree.add_node(path, swhid, True) - else: - example_tree.add_node(path, swhid, False) - - directories = example_tree.get_directories_info(example_tree.path) - - assert subsubdir_path not in directories - assert directories[subdir_path] == (2, 2) - - -def test_get_files_from_dir(example_tree, temp_folder): - subdir_path = temp_folder["subdir"] - - for path, swhid in temp_folder["paths"].items(): - example_tree.add_node(path, swhid, True) - - files = example_tree.get_files_from_dir(subdir_path) - assert len(files) == 2 - - -def test_get_files_source_path(example_tree, temp_folder): - for path, swhid in temp_folder["paths"].items(): - example_tree.add_node(path, swhid, True) - - files = example_tree.get_files_from_dir(example_tree.path) - assert len(files) == 1 - - -def test_get_files_from_dir_raise_exception(example_tree, temp_folder): - for path, swhid in temp_folder["paths"].items(): - example_tree.add_node(path, swhid, True) - - with pytest.raises(InvalidDirectoryPath): - example_tree.get_files_from_dir("test/") diff --git a/swh/scanner/tests/test_scanner.py b/swh/scanner/tests/test_scanner.py --- a/swh/scanner/tests/test_scanner.py +++ b/swh/scanner/tests/test_scanner.py @@ 
-8,29 +8,16 @@ from flask import url_for import pytest -from swh.model.exceptions import InvalidDirectoryPath +from swh.model.cli import model_of_dir +from swh.scanner.data import MerkleNodeData from swh.scanner.exceptions import APIError -from swh.scanner.model import Tree -from swh.scanner.scanner import extract_regex_objs, get_subpaths, run, swhids_discovery +from swh.scanner.scanner import run, swhids_discovery -from .data import correct_api_response, present_swhids, to_exclude_swhid +from .data import correct_api_response, unknown_swhids aio_url = "http://example.org/api/known/" -def test_extract_regex_objs(temp_folder): - root_path = bytes(temp_folder["root"]) - - patterns = (bytes(temp_folder["subdir"]), b"/none") - - sre_patterns = [reg_obj for reg_obj in extract_regex_objs(root_path, patterns)] - assert len(sre_patterns) == 2 - - patterns = (*patterns, b"/tmp") - with pytest.raises(InvalidDirectoryPath): - sre_patterns = [reg_obj for reg_obj in extract_regex_objs(root_path, patterns)] - - def test_scanner_correct_api_request(mock_aioresponse, event_loop, aiosession): mock_aioresponse.post( aio_url, @@ -66,19 +53,6 @@ event_loop.run_until_complete(swhids_discovery(request, aiosession, api_url)) -def test_scanner_get_subpaths(temp_folder): - root = temp_folder["root"] - - actual_result = [] - for subpath, swhid in get_subpaths(root, tuple()): - # also check if it's a symlink since pytest tmp_dir fixture create - # also a symlink to each directory inside the tmp_dir path - if subpath.is_dir() and not subpath.is_symlink(): - actual_result.append((subpath, swhid)) - - assert len(actual_result) == 2 - - @pytest.mark.options(debug=False) def test_app(app): assert not app.debug @@ -88,34 +62,13 @@ api_url = url_for("index", _external=True) config = {"web-api": {"url": api_url, "auth-token": None}} - source_tree = Tree(test_sample_folder) - event_loop.run_until_complete(run(config, test_sample_folder, source_tree, set())) + source_tree = model_of_dir(str(test_sample_folder).encode()) + nodes_data = MerkleNodeData() + event_loop.run_until_complete(run(config, source_tree, nodes_data)) - for child_node in source_tree.iterate(): - node_info = list(child_node.attributes.values())[0] - if node_info["swhid"] in present_swhids: - assert node_info["known"] is True + for node in source_tree.iter_tree(): + node_swhid = str(node.swhid()) + if node_swhid in unknown_swhids: + assert nodes_data[node_swhid]["known"] is False else: - assert node_info["known"] is False - - -def test_scanner_result_with_exclude_patterns( - live_server, event_loop, test_sample_folder -): - api_url = url_for("index", _external=True) - config = {"web-api": {"url": api_url, "auth-token": None}} - to_exclude_dir = str(test_sample_folder) + "/toexclude" - - patterns = (to_exclude_dir.encode(),) - exclude_pattern = { - reg_obj for reg_obj in extract_regex_objs(bytes(test_sample_folder), patterns) - } - - source_tree = Tree(test_sample_folder) - event_loop.run_until_complete( - run(config, test_sample_folder, source_tree, exclude_pattern) - ) - - for child_node in source_tree.iterate(): - node_info = list(child_node.attributes.values())[0] - assert node_info["swhid"] != to_exclude_swhid + assert nodes_data[node_swhid]["known"] is True
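
Reviewer note: a minimal sketch of how the new pieces introduced by this diff fit
together, mirroring what scan() in swh/scanner/scanner.py now does. The project
path and the web API configuration below are illustrative placeholders, not part
of the diff; everything else comes from the code above.

    import asyncio
    from pathlib import Path

    from swh.model.cli import model_of_dir

    from swh.scanner.data import MerkleNodeData
    from swh.scanner.output import Output
    from swh.scanner.scanner import run

    # Example values only: any local directory and any web API endpoint with
    # the layout expected by config["web-api"].
    root_path = "/tmp/project-to-scan"
    config = {
        "web-api": {
            "url": "https://archive.softwareheritage.org/api/1/",
            "auth-token": None,
        }
    }

    # Build the from_disk Merkle tree and an empty per-node data store.
    source_tree = model_of_dir(str(Path(root_path)).encode(), [])
    nodes_data = MerkleNodeData()

    # run() creates an entry per node and lets stop_and_go() fill in
    # nodes_data["swh:1:..."]["known"] by querying the archive level by level.
    asyncio.get_event_loop().run_until_complete(run(config, source_tree, nodes_data))

    # Render the result; "text", "sunburst" and "interactive" are supported.
    Output(Path(root_path), nodes_data, source_tree).show("text")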