class Color(Enum):
    """ANSI escape sequences used to colorize terminal output."""

    blue = "\033[94m"
    green = "\033[92m"
    red = "\033[91m"
    end = "\033[0m"


def colorize(text: str, color: Color) -> str:
    """Return ``text`` wrapped in the escape code of ``color`` and a reset."""
    return color.value + text + Color.end.value


class Tree:
    """Representation of a file system structure.

    Each node stores its path, its object type (directory or content), its
    Software Heritage persistent identifier, whether it is known, a reference
    to its father node and its direct children indexed by path.
    """

    def __init__(self, path: PosixPath, father: "Tree" = None):
        self.father = father
        self.path = path
        self.otype = DIRECTORY if path.is_dir() else CONTENT
        self.swhid = ""
        self.known = False
        self.children: Dict[PosixPath, "Tree"] = {}

    def addNode(self, path: PosixPath, swhid: str, known: bool) -> None:
        """Recursively add a new path.

        Args:
            path: path to add; it must be located under ``self.path``
                (``PosixPath.relative_to`` raises ``ValueError`` otherwise)
            swhid: persistent identifier to store on the node for ``path``
            known: known/unknown status to store on the node for ``path``
        """
        relative_path = path.relative_to(self.path)

        # The path designates the current node: record its information.
        if relative_path == PosixPath("."):
            self.swhid = swhid
            self.known = known
            return

        # Otherwise descend one level, creating the intermediate node if needed.
        new_path = self.path.joinpath(relative_path.parts[0])
        if new_path not in self.children:
            self.children[new_path] = Tree(new_path, self)

        self.children[new_path].addNode(path, swhid, known)

    def show(self, format) -> None:
        """Show tree in different formats.

        Args:
            format: one of "json", "ndjson", "text" or "sunburst"; any other
                value prints nothing
        """
        if format == "json":
            print(json.dumps(self.toDict(), indent=4, sort_keys=True))

        elif format == "ndjson":
            print(ndjson.dumps(self.__iterNodesAttr()))

        elif format == "text":
            isatty = sys.stdout.isatty()

            print(colorize(str(self.path), Color.blue) if isatty else str(self.path))
            self.printChildren(isatty)

        elif format == "sunburst":
            root = self.path
            directories = self.getDirectoriesInfo(root)
            sunburst = generate_sunburst(directories, root)
            offline_plot(sunburst)

    def printChildren(self, isatty: bool, inc: int = 1) -> None:
        """Print every descendant, one per line, indented by its depth."""
        for node in self.children.values():
            self.printNode(node, isatty, inc)
            if node.children:
                node.printChildren(isatty, inc + 1)

    def printNode(self, node: Any, isatty: bool, inc: int) -> None:
        """Print ``node`` relative to the current node.

        Colors are only used when ``isatty`` is true: red for unknown nodes,
        blue for known directories and green for known contents.
        """
        rel_path = str(node.path.relative_to(self.path))
        begin = "│ " * inc
        end = "/" if node.otype == DIRECTORY else ""

        if isatty:
            if not node.known:
                rel_path = colorize(rel_path, Color.red)
            elif node.otype == DIRECTORY:
                rel_path = colorize(rel_path, Color.blue)
            elif node.otype == CONTENT:
                rel_path = colorize(rel_path, Color.green)

        print(f"{begin}{rel_path}{end}")

    @property
    def attributes(self):
        """
        Get the attributes of the current node grouped by the relative path.

        Returns:
            a dictionary containing a path as key and its known/unknown status
            and the Software Heritage persistent identifier as values.
        """
        return {str(self.path): {"swhid": self.swhid, "known": self.known}}

    def toDict(self, dict_nodes=None) -> Dict[str, Dict[str, Dict]]:
        """
        Recursively groups the current child nodes inside a dictionary.

        For example, if you have the following structure:

        .. code-block:: none

            root {
                subdir: {
                    file.txt
                }
            }

        The generated dictionary will be:

        .. code-block:: none

            {
                "root": {
                    "swhid": "...",
                    "known": True/False
                }
                "root/subdir": {
                    "swhid": "...",
                    "known": True/False
                }
                "root/subdir/file.txt": {
                    "swhid": "...",
                    "known": True/False
                }
            }

        Args:
            dict_nodes: optional dictionary to fill in place; a new one is
                created on every call when omitted

        Returns:
            the dictionary holding the attributes of every descendant node
        """
        # A literal ``{}`` default would be shared by every call, leaking
        # entries from previous calls (and other trees) into the result.
        if dict_nodes is None:
            dict_nodes = {}

        for node_dict in self.__iterNodesAttr():
            dict_nodes.update(node_dict)
        return dict_nodes

    def iterate(self) -> Iterable["Tree"]:
        """
        Recursively iterate through the children of the current node

        Yields:
            every descendant node, each directory being followed by its own
            descendants
        """
        for child_node in self.children.values():
            yield child_node
            if child_node.otype == DIRECTORY:
                yield from child_node.iterate()

    def __iterNodesAttr(self) -> Iterable[Dict[str, Dict]]:
        """
        Recursively iterate through the children of the current node returning
        an iterable of the children nodes attributes

        Yields:
            a dictionary containing a path with its known/unknown status and the
            Software Heritage persistent identifier
        """
        # iterate() already walks the whole subtree: recursing again from each
        # directory would yield the nested nodes more than once.
        for child_node in self.iterate():
            yield child_node.attributes

    def __getSubDirsInfo(self, root, directories):
        """Fills the directories given in input with the contents information
           stored inside the directory child, only if they have contents.
        """
        for path, child_node in self.children.items():
            if child_node.otype == DIRECTORY:
                rel_path = path.relative_to(root)
                contents_info = child_node.count_contents()
                # checks the first element of the tuple
                # (the number of contents in a directory)
                # if it is equal to zero it means that there are no contents
                # in that directory.
                if contents_info[0] != 0:
                    directories[rel_path] = contents_info
                if child_node.has_dirs():
                    child_node.__getSubDirsInfo(root, directories)

    def getDirectoriesInfo(self, root: PosixPath) -> Dict[PosixPath, Tuple[int, int]]:
        """Get information about all directories under the given root.

        Returns:
            A dictionary with a directory path as key and the relative
            contents information (the result of count_contents) as values.
        """
        directories = {root: self.count_contents()}
        self.__getSubDirsInfo(root, directories)
        return directories

    def count_contents(self) -> Tuple[int, int]:
        """Count how many contents are present inside a directory.
           A known directory is reported as ``(1, 1)``, i.e. as if all its
           contents were present.

        Returns:
            A tuple with the total number of the contents and the number
            of contents known (the ones that have a persistent identifier).

        Raises:
            InvalidObjectType: if the current node is not a directory
        """
        if self.otype != DIRECTORY:
            raise InvalidObjectType(
                "Can't calculate contents of the " "object type: %s" % self.otype
            )

        if self.known:
            # to identify a directory with all files/directories present
            return (1, 1)

        contents = 0
        discovered = 0
        # Only direct children that are contents are counted.
        for child_node in self.children.values():
            if child_node.otype == CONTENT:
                contents += 1
                if child_node.known:
                    discovered += 1

        return (contents, discovered)

    def has_dirs(self) -> bool:
        """Checks if node has directories
        """
        return any(
            child_node.otype == DIRECTORY for child_node in self.children.values()
        )
from swh.scanner.scanner import pids_discovery, get_subpaths, run
from swh.scanner.model import Tree
from swh.scanner.cli import extract_regex_objs
from swh.scanner.exceptions import APIError

aio_url = "http://example.org/api/known/"


def test_scanner_correct_api_request(mock_aioresponse, event_loop, aiosession):
    """A 200 answer from the mocked endpoint is returned unchanged."""
    mock_aioresponse.post(
        aio_url,
        status=200,
        content_type="application/json",
        body=json.dumps(correct_api_response),
    )

    discovery = pids_discovery([], aiosession, "http://example.org/api/")
    actual_result = event_loop.run_until_complete(discovery)

    assert correct_api_response == actual_result


def test_scanner_raise_apierror(mock_aioresponse, event_loop, aiosession):
    """A 413 answer from the mocked endpoint raises APIError."""
    mock_aioresponse.post(aio_url, content_type="application/json", status=413)

    with pytest.raises(APIError):
        event_loop.run_until_complete(
            pids_discovery([], aiosession, "http://example.org/api/")
        )


def test_scanner_raise_apierror_input_size_limit(event_loop, aiosession, live_server):
    """Sending 901 identifiers to the live server raises APIError."""
    api_url = live_server.url() + "/"
    # /known/ is limited at 900
    request = ["swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a"] * 901

    with pytest.raises(APIError):
        event_loop.run_until_complete(pids_discovery(request, aiosession, api_url))


def test_scanner_get_subpaths(temp_folder):
    """get_subpaths yields two real (non-symlink) directories for the root."""
    root = temp_folder["root"]

    # also check if it's a symlink since pytest tmp_dir fixture create
    # also a symlink to each directory inside the tmp_dir path
    actual_result = [
        (subpath, pid)
        for subpath, pid in get_subpaths(root, tuple())
        if subpath.is_dir() and not subpath.is_symlink()
    ]

    assert len(actual_result) == 2


@pytest.mark.options(debug=False)
def test_app(app):
    """The test application is not in debug mode."""
    assert not app.debug


def test_scanner_result(live_server, event_loop, test_folder):
    """Nodes are flagged as known exactly when their swhid is a present one."""
    api_url = live_server.url() + "/"

    sample_folder = test_folder.joinpath(PosixPath("sample-folder"))

    source_tree = Tree(sample_folder)
    event_loop.run_until_complete(run(sample_folder, api_url, source_tree, set()))

    for child_node in source_tree.iterate():
        # attributes maps the node path to its information dictionary
        node_info = next(iter(child_node.attributes.values()))
        expected_known = node_info["swhid"] in present_swhids
        assert node_info["known"] is expected_known


def test_scanner_result_with_exclude_patterns(live_server, event_loop, test_folder):
    """The excluded path never shows up among the scanned nodes."""
    api_url = live_server.url() + "/"

    sample_folder = test_folder.joinpath(PosixPath("sample-folder"))
    patterns = (str(sample_folder) + "/toexclude",)
    exclude_pattern = set(extract_regex_objs(sample_folder, patterns))

    source_tree = Tree(sample_folder)
    event_loop.run_until_complete(
        run(sample_folder, api_url, source_tree, exclude_pattern)
    )

    for child_node in source_tree.iterate():
        node_info = next(iter(child_node.attributes.values()))
        assert node_info["swhid"] != to_exclude_swhid