diff --git a/mypy.ini b/mypy.ini index 46f8db8..354a135 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,15 +1,15 @@ [mypy] namespace_packages = True warn_unused_ignores = True # 3rd party libraries without stubs (yet) [mypy-pkg_resources.*] ignore_missing_imports = True [mypy-pytest.*] ignore_missing_imports = True -# [mypy-add_your_lib_here.*] -# ignore_missing_imports = True +[mypy-ndjson.*] +ignore_missing_imports = True diff --git a/requirements.txt b/requirements.txt index 6800cb7..822dad4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,11 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html vcversioner requests aiohttp +ndjson plotly pandas numpy dulwich diff --git a/swh/scanner/cli.py b/swh/scanner/cli.py index 4316a1c..e756664 100644 --- a/swh/scanner/cli.py +++ b/swh/scanner/cli.py @@ -1,102 +1,102 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import asyncio import glob import re import fnmatch from pathlib import PosixPath from typing import Tuple from .scanner import run from .model import Tree from .exceptions import InvalidDirectoryPath from swh.core.cli import CONTEXT_SETTINGS @click.group(name="scanner", context_settings=CONTEXT_SETTINGS) @click.pass_context def scanner(ctx): """Software Heritage Scanner tools.""" pass def parse_url(url): if not url.startswith("https://"): url = "https://" + url if not url.endswith("/"): url += "/" return url def extract_regex_objs(root_path: PosixPath, patterns: Tuple[str]) -> object: """Generates a regex object for each pattern given in input and checks if the path is a subdirectory or relative to the root path. Yields: an SRE_Pattern object """ for pattern in patterns: for path in glob.glob(pattern): dirpath = PosixPath(path) if root_path not in dirpath.parents: error_msg = ( f'The path "{dirpath}" is not a subdirectory or relative ' f'to the root directory path: "{root_path}"' ) raise InvalidDirectoryPath(error_msg) if glob.glob(pattern): regex = fnmatch.translate(str(PosixPath(pattern))) yield re.compile(regex) @scanner.command(name="scan") @click.argument("root_path", required=True, type=click.Path(exists=True)) @click.option( "-u", "--api-url", default="https://archive.softwareheritage.org/api/1", metavar="API_URL", show_default=True, help="url for the api request", ) @click.option( "--exclude", "-x", "patterns", metavar="PATTERN", multiple=True, help="recursively exclude a specific pattern", ) @click.option( "-f", "--format", - type=click.Choice(["text", "json", "sunburst"], case_sensitive=False), + type=click.Choice(["text", "json", "ndjson", "sunburst"], case_sensitive=False), default="text", help="select the output format", ) @click.pass_context def scan(ctx, root_path, api_url, patterns, format): """Scan a source code project to discover files and directories already present in the archive""" sre_patterns = set() if patterns: sre_patterns = { reg_obj for reg_obj in extract_regex_objs(PosixPath(root_path), patterns) } api_url = parse_url(api_url) source_tree = Tree(PosixPath(root_path)) loop = asyncio.get_event_loop() loop.run_until_complete(run(root_path, api_url, source_tree, sre_patterns)) source_tree.show(format) if __name__ == "__main__": scan() diff --git a/swh/scanner/model.py b/swh/scanner/model.py index ec67c23..3267193 100644 --- a/swh/scanner/model.py +++ b/swh/scanner/model.py @@ -1,224 +1,228 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations import sys import json from pathlib import PosixPath from typing import Any, Dict, Tuple, Iterable from enum import Enum +import ndjson + from .plot import sunburst from .exceptions import InvalidObjectType - from swh.model.identifiers import DIRECTORY, CONTENT class Color(Enum): blue = "\033[94m" green = "\033[92m" red = "\033[91m" end = "\033[0m" def colorize(text: str, color: Color): return color.value + text + Color.end.value class Tree: """Representation of a file system structure """ def __init__(self, path: PosixPath, father: Tree = None): self.father = father self.path = path self.otype = DIRECTORY if path.is_dir() else CONTENT self.swhid = "" self.known = False self.children: Dict[PosixPath, Tree] = {} def addNode(self, path: PosixPath, swhid: str, known: bool) -> None: """Recursively add a new path. """ relative_path = path.relative_to(self.path) if relative_path == PosixPath("."): self.swhid = swhid self.known = known return new_path = self.path.joinpath(relative_path.parts[0]) if new_path not in self.children: self.children[new_path] = Tree(new_path, self) self.children[new_path].addNode(path, swhid, known) def show(self, format) -> None: """Show tree in different formats""" if format == "json": print(json.dumps(self.toDict(), indent=4, sort_keys=True)) + if format == "ndjson": + print(ndjson.dumps(dict_path for dict_path in self.iterate())) + elif format == "text": isatty = sys.stdout.isatty() print(colorize(str(self.path), Color.blue) if isatty else str(self.path)) self.printChildren(isatty) elif format == "sunburst": root = self.path directories = self.getDirectoriesInfo(root) sunburst(directories, root) def printChildren(self, isatty: bool, inc: int = 1) -> None: for path, node in self.children.items(): self.printNode(node, isatty, inc) if node.children: node.printChildren(isatty, inc + 1) def printNode(self, node: Any, isatty: bool, inc: int) -> None: rel_path = str(node.path.relative_to(self.path)) begin = "│ " * inc end = "/" if node.otype == DIRECTORY else "" if isatty: if not node.known: rel_path = colorize(rel_path, Color.red) elif node.otype == DIRECTORY: rel_path = colorize(rel_path, Color.blue) elif node.otype == CONTENT: rel_path = colorize(rel_path, Color.green) print(f"{begin}{rel_path}{end}") @property def attributes(self): """ Get the attributes of the current node grouped by the relative path. Returns: a dictionary containing a path as key and its known/unknown status and the Software Heritage persistent identifier as values. """ return {str(self.path): {"swhid": self.swhid, "known": self.known,}} def toDict(self, dict_nodes={}) -> Dict[str, Dict[str, Dict]]: """ Recursively groups the current child nodes inside a dictionary. For example, if you have the following structure: .. code-block:: none root { subdir: { file.txt } } The generated dictionary will be: .. code-block:: none { "root": { "swhid": "...", "known": True/False } "root/subdir": { "swhid": "...", "known": True/False } "root/subdir/file.txt": { "swhid": "...", "known": True/False } } """ for node_dict in self.iterate(): dict_nodes.update(node_dict) return dict_nodes def iterate(self) -> Iterable[Dict[str, Dict]]: """ Recursively iterate through the children of the current node Yields: a dictionary containing a path with its known/unknown status and the Software Heritage persistent identifier """ for _, child_node in self.children.items(): yield child_node.attributes if child_node.otype == DIRECTORY: yield from child_node.iterate() def __getSubDirsInfo(self, root, directories): """Fills the directories given in input with the contents information stored inside the directory child, only if they have contents. """ for path, child_node in self.children.items(): if child_node.otype == DIRECTORY: rel_path = path.relative_to(root) contents_info = child_node.count_contents() # checks the first element of the tuple # (the number of contents in a directory) # if it is equal to zero it means that there are no contents # in that directory. if not contents_info[0] == 0: directories[rel_path] = contents_info if child_node.has_dirs(): child_node.__getSubDirsInfo(root, directories) def getDirectoriesInfo(self, root: PosixPath) -> Dict[PosixPath, Tuple[int, int]]: """Get information about all directories under the given root. Returns: A dictionary with a directory path as key and the relative contents information (the result of count_contents) as values. """ directories = {root: self.count_contents()} self.__getSubDirsInfo(root, directories) return directories def count_contents(self) -> Tuple[int, int]: """Count how many contents are present inside a directory. If a directory has a pid returns as it has all the contents. Returns: A tuple with the total number of the contents and the number of contents known (the ones that have a persistent identifier). """ contents = 0 discovered = 0 if not self.otype == DIRECTORY: raise InvalidObjectType( "Can't calculate contents of the " "object type: %s" % self.otype ) if self.known: # to identify a directory with all files/directories present return (1, 1) else: for _, child_node in self.children.items(): if child_node.otype == CONTENT: contents += 1 if child_node.known: discovered += 1 return (contents, discovered) def has_dirs(self) -> bool: """Checks if node has directories """ for _, child_node in self.children.items(): if child_node.otype == DIRECTORY: return True return False