diff --git a/requirements-test.txt b/requirements-test.txt index 04a3b9a..c01dc5e 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,8 +1,11 @@ pytest aioresponses pytest_asyncio pytest_flask +plotly +pandas +numpy swh.core[testing-core] swh.model[testing] swh.storage[testing] swh.web[testing] diff --git a/swh/scanner/model.py b/swh/scanner/model.py index 94d4006..620c4c4 100644 --- a/swh/scanner/model.py +++ b/swh/scanner/model.py @@ -1,106 +1,165 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations import sys import json from pathlib import PosixPath -from typing import Any, Dict +from typing import Any, Dict, List from enum import Enum +from .plot import sunburst + from swh.model.identifiers import ( DIRECTORY, CONTENT ) class Color(Enum): blue = '\033[94m' green = '\033[92m' red = '\033[91m' end = '\033[0m' def colorize(text: str, color: Color): return color.value + text + Color.end.value class Tree: """Representation of a file system structure """ def __init__(self, path: PosixPath, father: Tree = None): self.father = father self.path = path self.otype = DIRECTORY if path.is_dir() else CONTENT self.pid = '' self.children: Dict[PosixPath, Tree] = {} def addNode(self, path: PosixPath, pid: str = None) -> None: - """Recursively add a new node path + """Recursively add a new path. 
""" relative_path = path.relative_to(self.path) if relative_path == PosixPath('.'): if pid is not None: self.pid = pid return new_path = self.path.joinpath(relative_path.parts[0]) if new_path not in self.children: self.children[new_path] = Tree(new_path, self) self.children[new_path].addNode(path, pid) def show(self, format) -> None: - """Print all the tree""" + """Show tree in different formats""" if format == 'json': print(json.dumps(self.getTree(), indent=4, sort_keys=True)) + elif format == 'text': isatty = sys.stdout.isatty() print(colorize(str(self.path), Color.blue) if isatty else str(self.path)) self.printChildren(isatty) - def printChildren(self, isatty: bool, inc: int = 0) -> None: + elif format == 'sunburst': + root = self.path + directories = {root: self.count_contents()} + directories = self.getDirectoriesInfo(directories, root) + sunburst(directories, root) + + def printChildren(self, isatty: bool, inc: int = 1) -> None: for path, node in self.children.items(): self.printNode(node, isatty, inc) if node.children: node.printChildren(isatty, inc+1) def printNode(self, node: Any, isatty: bool, inc: int) -> None: rel_path = str(node.path.relative_to(self.path)) begin = '│ ' * inc end = '/' if node.otype == DIRECTORY else '' if isatty: if not node.pid: rel_path = colorize(rel_path, Color.red) elif node.otype == DIRECTORY: rel_path = colorize(rel_path, Color.blue) elif node.otype == CONTENT: rel_path = colorize(rel_path, Color.green) print(f'{begin}{rel_path}{end}') def getTree(self): """Walk through the tree to discover content or directory that have a persistent identifier. If a persistent identifier is found it saves the path with the relative PID. 
Returns: child_tree: the tree with the content/directory found """ child_tree = {} for path, child_node in self.children.items(): rel_path = str(child_node.path.relative_to(self.path)) if child_node.pid: child_tree[rel_path] = child_node.pid else: next_tree = child_node.getTree() if next_tree: child_tree[rel_path] = next_tree return child_tree + + def getDirectoriesInfo(self, directories, root) -> Dict[PosixPath, List]: + """Get information about all directories stored inside the tree. + + Returns: + A dictionary with the path as key and the contents information + as values. + + """ + for path, child_node in self.children.items(): + if child_node.otype == DIRECTORY: + rel_path = path.relative_to(root) + contents_info = child_node.count_contents() + if not contents_info[0] == 0: + directories[rel_path] = contents_info + if child_node.has_dirs(): + child_node.getDirectoriesInfo(directories, root) + + return directories + + def count_contents(self) -> List[int]: + """Count how many contents are present inside a directory. + If a directory has a pid, it is counted as having all its contents. + + Returns: + A list with the number of contents / discovered contents. 
+ + """ + contents = 0 + discovered = 0 + + # to identify a directory with all files/directories present + if self.otype == DIRECTORY and self.pid: + return [1, 1] + + for _, child_node in self.children.items(): + if child_node.otype == CONTENT: + contents += 1 + if child_node.pid: + discovered += 1 + + return [contents, discovered] + + def has_dirs(self) -> bool: + """Checks if node has directories + """ + for _, child_node in self.children.items(): + if child_node.otype == DIRECTORY: + return True + return False diff --git a/swh/scanner/tests/test_model.py b/swh/scanner/tests/test_model.py index ebb3817..e7c37c5 100644 --- a/swh/scanner/tests/test_model.py +++ b/swh/scanner/tests/test_model.py @@ -1,67 +1,89 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from swh.scanner.model import Tree @pytest.fixture(scope='function') def example_tree(temp_folder): """Fixture that generate a Tree with the root present in the session fixture "temp_folder". 
""" example_tree = Tree(temp_folder['root']) assert example_tree.path == temp_folder['root'] return example_tree def test_tree_add_node(example_tree, temp_folder): avail_paths = temp_folder['paths'].keys() for path, pid in temp_folder['paths'].items(): example_tree.addNode(path, pid) for path, node in example_tree.children.items(): assert path in avail_paths if node.children: for subpath, subnode in node.children.items(): assert subpath in avail_paths def test_get_json_tree_all_not_present(example_tree, temp_folder): for path, pid in temp_folder['paths'].items(): example_tree.addNode(path) json_tree = example_tree.getTree() assert len(json_tree) == 0 def test_get_json_tree_all_present(example_tree, temp_folder): for path, pid in temp_folder['paths'].items(): example_tree.addNode(path, pid) tree_dict = example_tree.getTree() assert len(tree_dict) == 3 # since subdir have a pid, it can't have a children path assert tree_dict['subdir0'] is not dict def test_get_json_tree_only_one_present(example_tree, temp_folder): filesample_path = temp_folder['filesample'] for path, pid in temp_folder['paths'].items(): if path == filesample_path: example_tree.addNode(path, pid) else: example_tree.addNode(path) tree_dict = example_tree.getTree() assert len(tree_dict) == 1 assert tree_dict['subdir0']['filesample.txt'] + + +def test_get_directories_info(example_tree, temp_folder): + root_path = temp_folder['root'] + filesample_path = temp_folder['filesample'] + filesample2_path = temp_folder['filesample2'] + subdir_path = temp_folder['subdir'].relative_to(root_path) + subsubdir_path = temp_folder['subsubdir'].relative_to(root_path) + + for path, pid in temp_folder['paths'].items(): + if path == filesample_path or path == filesample2_path: + print(path) + example_tree.addNode(path, pid) + else: + example_tree.addNode(path) + + tree_root = example_tree + directories = {tree_root.path: tree_root.count_contents()} + directories = tree_root.getDirectoriesInfo(directories, tree_root.path) + 
+ assert subsubdir_path not in directories + assert directories[subdir_path] == [2, 2]