diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -2,6 +2,9 @@
 aioresponses
 pytest_asyncio
 pytest_flask
+plotly
+pandas
+numpy
 swh.core[testing-core]
 swh.model[testing]
 swh.storage[testing]
diff --git a/swh/scanner/model.py b/swh/scanner/model.py
--- a/swh/scanner/model.py
+++ b/swh/scanner/model.py
@@ -7,9 +7,11 @@
 import sys
 import json
 from pathlib import PosixPath
-from typing import Any, Dict
+from typing import Any, Dict, List
 from enum import Enum
 
+from .plot import sunburst
+
 from swh.model.identifiers import (
     DIRECTORY, CONTENT
 )
@@ -37,7 +39,7 @@
         self.children: Dict[PosixPath, Tree] = {}
 
     def addNode(self, path: PosixPath, pid: str = None) -> None:
-        """Recursively add a new node path
+        """Recursively add a new path.
         """
         relative_path = path.relative_to(self.path)
 
@@ -53,9 +55,10 @@
             self.children[new_path].addNode(path, pid)
 
     def show(self, format) -> None:
-        """Print all the tree"""
+        """Show tree in different formats"""
         if format == 'json':
             print(json.dumps(self.getTree(), indent=4, sort_keys=True))
+
         elif format == 'text':
             isatty = sys.stdout.isatty()
 
@@ -63,7 +66,13 @@
                   else str(self.path))
             self.printChildren(isatty)
 
-    def printChildren(self, isatty: bool, inc: int = 0) -> None:
+        elif format == 'sunburst':
+            root = self.path
+            directories = {root: self.count_contents()}
+            directories = self.getDirectoriesInfo(directories, root)
+            sunburst(directories, root)
+
+    def printChildren(self, isatty: bool, inc: int = 1) -> None:
         for path, node in self.children.items():
             self.printNode(node, isatty, inc)
             if node.children:
@@ -104,3 +113,60 @@
             child_tree[rel_path] = next_tree
 
         return child_tree
+
+    def getDirectoriesInfo(self, directories, root) -> Dict[PosixPath, List]:
+        """Collect content counts for every directory below this node.
+
+        Args:
+            directories: mapping to fill in; it is updated in place.
+            root: path that the keys are made relative to.
+
+        Returns:
+            The same ``directories`` mapping, with one entry per directory
+            that has a non-zero number of contents: its path relative to
+            ``root`` as key and ``count_contents()`` as value.
+
+        """
+        for path, child_node in self.children.items():
+            if child_node.otype == DIRECTORY:
+                rel_path = path.relative_to(root)
+                contents_info = child_node.count_contents()
+                # directories with no contents are left out of the result
+                if contents_info[0] != 0:
+                    directories[rel_path] = contents_info
+                if child_node.has_dirs():
+                    child_node.getDirectoriesInfo(directories, root)
+
+        return directories
+
+    def count_contents(self) -> List[int]:
+        """Count the contents that are direct children of this node.
+
+        A directory that has a pid is reported as ``[1, 1]`` without
+        looking at its children.
+
+        Returns:
+            A list with the number of contents and the number of contents
+            that have a pid.
+
+        """
+        # a directory with a pid is counted as a single discovered item
+        if self.otype == DIRECTORY and self.pid:
+            return [1, 1]
+
+        contents = 0
+        discovered = 0
+        for child_node in self.children.values():
+            if child_node.otype == CONTENT:
+                contents += 1
+                if child_node.pid:
+                    discovered += 1
+
+        return [contents, discovered]
+
+    def has_dirs(self) -> bool:
+        """Check whether any direct child of this node is a directory."""
+        return any(
+            child_node.otype == DIRECTORY
+            for child_node in self.children.values()
+        )
diff --git a/swh/scanner/tests/test_model.py b/swh/scanner/tests/test_model.py
--- a/swh/scanner/tests/test_model.py
+++ b/swh/scanner/tests/test_model.py
@@ -65,3 +65,24 @@
 
     assert len(tree_dict) == 1
     assert tree_dict['subdir0']['filesample.txt']
+
+
+def test_get_directories_info(example_tree, temp_folder):
+    root_path = temp_folder['root']
+    filesample_path = temp_folder['filesample']
+    filesample2_path = temp_folder['filesample2']
+    subdir_path = temp_folder['subdir'].relative_to(root_path)
+    subsubdir_path = temp_folder['subsubdir'].relative_to(root_path)
+
+    for path, pid in temp_folder['paths'].items():
+        if path in (filesample_path, filesample2_path):
+            example_tree.addNode(path, pid)
+        else:
+            example_tree.addNode(path)
+
+    tree_root = example_tree
+    directories = {tree_root.path: tree_root.count_contents()}
+    directories = tree_root.getDirectoriesInfo(directories, tree_root.path)
+
+    assert subsubdir_path not in directories
+    assert directories[subdir_path] == [2, 2]