diff --git a/swh/scanner/model.py b/swh/scanner/model.py index 9ed9e2f..435ec3e 100644 --- a/swh/scanner/model.py +++ b/swh/scanner/model.py @@ -1,239 +1,265 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations import sys import json from pathlib import PosixPath -from typing import Any, Dict, Tuple, Iterable +from typing import Any, Dict, Tuple, Iterable, List from enum import Enum import ndjson from .plot import generate_sunburst, offline_plot -from .exceptions import InvalidObjectType +from .exceptions import InvalidObjectType, InvalidDirectoryPath from swh.model.identifiers import DIRECTORY, CONTENT class Color(Enum): blue = "\033[94m" green = "\033[92m" red = "\033[91m" end = "\033[0m" def colorize(text: str, color: Color): return color.value + text + Color.end.value class Tree: """Representation of a file system structure """ def __init__(self, path: PosixPath, father: Tree = None): self.father = father self.path = path self.otype = DIRECTORY if path.is_dir() else CONTENT self.swhid = "" self.known = False self.children: Dict[PosixPath, Tree] = {} def addNode(self, path: PosixPath, swhid: str, known: bool) -> None: """Recursively add a new path. 
""" relative_path = path.relative_to(self.path) if relative_path == PosixPath("."): self.swhid = swhid self.known = known return new_path = self.path.joinpath(relative_path.parts[0]) if new_path not in self.children: self.children[new_path] = Tree(new_path, self) self.children[new_path].addNode(path, swhid, known) def show(self, format) -> None: """Show tree in different formats""" if format == "json": print(json.dumps(self.toDict(), indent=4, sort_keys=True)) if format == "ndjson": print(ndjson.dumps(dict_path for dict_path in self.__iterNodesAttr())) elif format == "text": isatty = sys.stdout.isatty() print(colorize(str(self.path), Color.blue) if isatty else str(self.path)) self.printChildren(isatty) elif format == "sunburst": root = self.path directories = self.getDirectoriesInfo(root) sunburst = generate_sunburst(directories, root) offline_plot(sunburst) def printChildren(self, isatty: bool, inc: int = 1) -> None: for path, node in self.children.items(): self.printNode(node, isatty, inc) if node.children: node.printChildren(isatty, inc + 1) def printNode(self, node: Any, isatty: bool, inc: int) -> None: rel_path = str(node.path.relative_to(self.path)) begin = "│ " * inc end = "/" if node.otype == DIRECTORY else "" if isatty: if not node.known: rel_path = colorize(rel_path, Color.red) elif node.otype == DIRECTORY: rel_path = colorize(rel_path, Color.blue) elif node.otype == CONTENT: rel_path = colorize(rel_path, Color.green) print(f"{begin}{rel_path}{end}") @property def attributes(self): """ Get the attributes of the current node grouped by the relative path. Returns: a dictionary containing a path as key and its known/unknown status and the Software Heritage persistent identifier as values. """ return {str(self.path): {"swhid": self.swhid, "known": self.known,}} def toDict(self, dict_nodes={}) -> Dict[str, Dict[str, Dict]]: """ Recursively groups the current child nodes inside a dictionary. For example, if you have the following structure: .. 
code-block:: none root { subdir: { file.txt } } The generated dictionary will be: .. code-block:: none { "root": { "swhid": "...", "known": True/False } "root/subdir": { "swhid": "...", "known": True/False } "root/subdir/file.txt": { "swhid": "...", "known": True/False } } """ for node_dict in self.__iterNodesAttr(): dict_nodes.update(node_dict) return dict_nodes def iterate(self) -> Iterable[Tree]: """ Recursively iterate through the children of the current node """ for _, child_node in self.children.items(): yield child_node if child_node.otype == DIRECTORY: yield from child_node.iterate() def __iterNodesAttr(self) -> Iterable[Dict[str, Dict]]: """ Recursively iterate through the children of the current node returning an iterable of the children nodes attributes Yields: a dictionary containing a path with its known/unknown status and the Software Heritage persistent identifier """ for child_node in self.iterate(): yield child_node.attributes if child_node.otype == DIRECTORY: yield from child_node.__iterNodesAttr() + def getFilesFromDir(self, dir_path: PosixPath) -> List: + """ + Retrieve files information about a specific directory path + + Returns: + A list containing the files attributes present inside the directory given + in input + """ + + def getFiles(node): + files = [] + for _, node in node.children.items(): + if node.otype == CONTENT: + files.append(node.attributes) + return files + + if dir_path == self.path: + return getFiles(self) + else: + for node in self.iterate(): + if node.path == dir_path: + return getFiles(node) + raise InvalidDirectoryPath( + "The directory provided doesn't match any stored directory" + ) + def __getSubDirsInfo(self, root, directories): """Fills the directories given in input with the contents information stored inside the directory child, only if they have contents. 
""" for path, child_node in self.children.items(): if child_node.otype == DIRECTORY: rel_path = path.relative_to(root) contents_info = child_node.count_contents() # checks the first element of the tuple # (the number of contents in a directory) # if it is equal to zero it means that there are no contents # in that directory. if not contents_info[0] == 0: directories[rel_path] = contents_info if child_node.has_dirs(): child_node.__getSubDirsInfo(root, directories) def getDirectoriesInfo(self, root: PosixPath) -> Dict[PosixPath, Tuple[int, int]]: """Get information about all directories under the given root. Returns: A dictionary with a directory path as key and the relative contents information (the result of count_contents) as values. """ directories = {root: self.count_contents()} self.__getSubDirsInfo(root, directories) return directories def count_contents(self) -> Tuple[int, int]: """Count how many contents are present inside a directory. If a directory has a pid returns as it has all the contents. Returns: A tuple with the total number of the contents and the number of contents known (the ones that have a persistent identifier). 
""" contents = 0 discovered = 0 if not self.otype == DIRECTORY: raise InvalidObjectType( "Can't calculate contents of the " "object type: %s" % self.otype ) if self.known: # to identify a directory with all files/directories present return (1, 1) else: for _, child_node in self.children.items(): if child_node.otype == CONTENT: contents += 1 if child_node.known: discovered += 1 return (contents, discovered) def has_dirs(self) -> bool: """Checks if node has directories """ for _, child_node in self.children.items(): if child_node.otype == DIRECTORY: return True return False diff --git a/swh/scanner/tests/test_model.py b/swh/scanner/tests/test_model.py index 730d3bf..49b725b 100644 --- a/swh/scanner/tests/test_model.py +++ b/swh/scanner/tests/test_model.py @@ -1,78 +1,108 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import pytest + +from swh.scanner.exceptions import InvalidDirectoryPath + def test_tree_add_node(example_tree, temp_folder): avail_paths = temp_folder["paths"].keys() for path, pid in temp_folder["paths"].items(): example_tree.addNode(path, pid, False) for path, node in example_tree.children.items(): assert path in avail_paths if node.children: for subpath, subnode in node.children.items(): assert subpath in avail_paths def test_to_json_no_one_present(example_tree, temp_folder): for path, pid in temp_folder["paths"].items(): example_tree.addNode(path, pid, False) result = example_tree.toDict() assert len(result) == 6 for _, node_info in result.items(): assert node_info["known"] is False def test_get_json_tree_all_present(example_tree, temp_folder): for path, pid in temp_folder["paths"].items(): example_tree.addNode(path, pid, True) result = example_tree.toDict() assert len(result) == 6 for _, node_info in result.items(): assert node_info["known"] is True 
def test_get_json_tree_only_one_present(example_tree, temp_folder):
    """Only the sample file is flagged as known; every other node is not."""
    root = temp_folder["root"]
    filesample_path = temp_folder["filesample"]

    for path, pid in temp_folder["paths"].items():
        example_tree.addNode(path, pid, path == filesample_path)

    result = example_tree.toDict()

    assert len(result) == 6

    known_key = str(root) + "/subdir0/filesample.txt"
    for path, node_attr in result.items():
        expected = path == known_key
        assert node_attr["known"] is expected


def test_get_directories_info(example_tree, temp_folder):
    """Directories without contents are skipped; counts are (total, known)."""
    root_path = temp_folder["root"]
    known_paths = (temp_folder["filesample"], temp_folder["filesample2"])
    subdir_path = temp_folder["subdir"].relative_to(root_path)
    subsubdir_path = temp_folder["subsubdir"].relative_to(root_path)

    for path, pid in temp_folder["paths"].items():
        example_tree.addNode(path, pid, path in known_paths)

    directories = example_tree.getDirectoriesInfo(example_tree.path)

    assert subsubdir_path not in directories
    assert directories[subdir_path] == (2, 2)


def test_get_files_from_dir(example_tree, temp_folder):
    """Asking for a sub-directory returns the files it directly contains."""
    subdir_path = temp_folder["subdir"]

    for path, pid in temp_folder["paths"].items():
        example_tree.addNode(path, pid, True)

    files = example_tree.getFilesFromDir(subdir_path)
    assert len(files) == 2


def test_get_files_source_path(example_tree, temp_folder):
    """Asking for the tree's own path returns the files of the root."""
    for path, pid in temp_folder["paths"].items():
        example_tree.addNode(path, pid, True)

    files = example_tree.getFilesFromDir(example_tree.path)
    assert len(files) == 1


def test_get_files_from_dir_raise_exception(example_tree, temp_folder):
    """A path that matches no stored node raises InvalidDirectoryPath."""
    for path, pid in temp_folder["paths"].items():
        example_tree.addNode(path, pid, True)

    with pytest.raises(InvalidDirectoryPath):
        example_tree.getFilesFromDir("test/")