# Copyright (C) 2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import os
from typing import Any, Dict, Optional

import click
from importlib_metadata import version
import yaml

from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group

from .exceptions import DBError

# Config for the "serve" option
BACKEND_DEFAULT_PORT = 5011

# All generic config code should reside in swh.core.config
CONFIG_ENVVAR = "SWH_CONFIG_FILE"
DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml")

# Fallback configuration used when no config file is found anywhere.
DEFAULT_CONFIG: Dict[str, Any] = {
    "web-api": {
        "url": "https://archive.softwareheritage.org/api/1/",
        "auth-token": None,
    }
}

CONFIG_FILE_HELP = f"""Configuration file:

\b
The CLI option or the environment variable will fail if invalid.
CLI option is checked first.
Then, environment variable {CONFIG_ENVVAR} is checked.
Then, if the default path cannot be loaded, a set of default values is used.
Default config path is {DEFAULT_CONFIG_PATH}.
Default config values are:

\b
{yaml.dump(DEFAULT_CONFIG)}"""

SCANNER_HELP = f"""Software Heritage Scanner tools.

{CONFIG_FILE_HELP}"""
def setup_config(ctx, api_url):
    """Return the effective scanner configuration from the click context,
    overriding the web API URL with ``api_url`` when one was given.

    A trailing slash is appended to ``api_url`` when missing, since the web
    API client builds request URLs by simple concatenation.
    """
    config = ctx.obj["config"]
    if api_url:
        if not api_url.endswith("/"):
            api_url += "/"
        config["web-api"]["url"] = api_url

    return config


@swh_cli_group.group(
    name="scanner",
    context_settings=CONTEXT_SETTINGS,
    help=SCANNER_HELP,
)
@click.option(
    "-C",
    "--config-file",
    default=None,
    type=click.Path(exists=False, dir_okay=False, path_type=str),
    help="""YAML configuration file""",
)
@click.version_option(
    version=version("swh.scanner"),
    prog_name="swh.scanner",
)
@click.pass_context
def scanner(ctx, config_file: Optional[str]):
    # Load the configuration and stash it on the click context for the
    # subcommands.
    from copy import deepcopy

    env_config_path = os.environ.get(CONFIG_ENVVAR)

    # read_raw_config does not fail if the file does not exist, so check it
    # beforehand, while enforcing the loading priority: CLI option first,
    # then the environment variable, then the default path.
    if config_file:
        if not config.config_exists(config_file):
            raise click.BadParameter(
                f"File '{config_file}' cannot be opened.", param_hint="--config-file"
            )
    elif env_config_path:
        if not config.config_exists(env_config_path):
            raise click.BadParameter(
                f"File '{env_config_path}' cannot be opened.", param_hint=CONFIG_ENVVAR
            )
        config_file = env_config_path
    elif config.config_exists(DEFAULT_CONFIG_PATH):
        config_file = DEFAULT_CONFIG_PATH

    # Deep-copy the defaults: setup_config() later mutates the selected
    # config in place, and aliasing DEFAULT_CONFIG here would silently
    # corrupt the module-level defaults for any later in-process invocation.
    conf = deepcopy(DEFAULT_CONFIG)
    if config_file is not None:
        conf = config.read_raw_config(config.config_basepath(config_file))
        conf = config.merge_configs(DEFAULT_CONFIG, conf)

    ctx.ensure_object(dict)
    ctx.obj["config"] = conf
["summary", "text", "json", "ndjson", "sunburst"], case_sensitive=False + ), help="The output format", ) @click.option( "-i", "--interactive", is_flag=True, help="Show the result in a dashboard" ) @click.option( "-p", "--policy", default="auto", show_default=True, type=click.Choice(["auto", "bfs", "greedybfs", "filepriority", "dirpriority"]), help="The scan policy.", ) @click.option( "-e", "--extra-info", "extra_info", multiple=True, type=click.Choice(["origin"]), help="Add selected additional information about known software artifacts.", ) @click.pass_context def scan(ctx, root_path, api_url, patterns, out_fmt, interactive, policy, extra_info): """Scan a source code project to discover files and directories already present in the archive. The command can provide different output using the --output-format option:\n \b + summary: display a general summary of what the scanner found + text: display the scan result as a text based tree-like view of all the file, using color to indicate the file status. json: write all collected data on standard output as JSON json: write all collected data on standard output as Newline Delimited JSON sunburst: produce a dynamic chart as .html file. (in $PWD/chart.html) The source code project can be checked using different policies that can be set using the -p/--policy option:\n \b auto: it selects the best policy based on the source code, for codebase(s) with less than 1000 file/dir contents all the nodes will be queried. bfs: scan the source code in the BFS order, checking unknown directories only. \b greedybfs: same as "bfs" policy, but lookup the status of source code artifacts in chunks, in order to minimize the number of Web API round-trips with the archive. \b filepriority: scan all the source code file contents, checking only unset directories. (useful if the codebase contains a lot of source files) dirpriority: scan all the source code directories and check only unknown directory contents. 
@scanner.group("db", help="Manage local knowledge base for swh-scanner")
@click.pass_context
def db(ctx):
    # Group entry point only; the subcommands do all the work.
    pass


@db.command("import")
@click.option(
    "-i",
    "--input",
    "input_file",
    metavar="INPUT_FILE",
    required=True,
    type=click.File("r"),
    help="A file containing SWHIDs",
)
@click.option(
    "-o",
    "--output",
    "output_file_db",
    metavar="OUTPUT_DB_FILE",
    required=True,
    show_default=True,
    help="The name of the generated sqlite database",
)
@click.option(
    "-s",
    "--chunk-size",
    "chunk_size",
    default="10000",
    metavar="SIZE",
    show_default=True,
    type=int,
    help="The chunk size ",
)
@click.pass_context
def import_(ctx, chunk_size, input_file, output_file_db):
    """Create SQLite database of known SWHIDs from a textual list of SWHIDs"""
    from .db import Db

    db = Db(output_file_db)
    cur = db.conn.cursor()
    try:
        db.create_from(input_file, chunk_size, cur)
    except DBError as e:
        ctx.fail("Failed to import SWHIDs into database: {0}".format(e))
    finally:
        # Close the connection on failure too, so the sqlite file is not
        # left open/locked when create_from raises.
        db.close()
@db.command("serve")
@click.option(
    "-h",
    "--host",
    metavar="HOST",
    default="127.0.0.1",
    show_default=True,
    help="The host of the API server",
)
@click.option(
    "-p",
    "--port",
    metavar="PORT",
    default=f"{BACKEND_DEFAULT_PORT}",
    show_default=True,
    help="The port of the API server",
)
@click.option(
    "-f",
    "--db-file",
    "db_file",
    metavar="DB_FILE",
    default="SWHID_DB.sqlite",
    show_default=True,
    type=click.Path(exists=True),
    help="An sqlite database file (it can be generated with: 'swh scanner db import')",
)
@click.pass_context
def serve(ctx, host, port, db_file):
    """Start an API service using the sqlite database generated with the
    "db import" option."""
    import swh.scanner.backend as backend

    from .db import Db

    db = Db(db_file)
    try:
        # backend.run blocks until the server stops; release the sqlite
        # connection even when it exits with an error.
        backend.run(host, port, db)
    finally:
        db.close()


def main():
    # Console-script entry point.
    return scanner(auto_envvar_prefix="SWH_SCANNER")


if __name__ == "__main__":
    main()


# ===========================================================================
# swh/scanner/output.py (second file of the original patch)
# ===========================================================================

# Copyright (C) 2021  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from enum import Enum
import json
import os
import sys
from typing import Any

import ndjson

from swh.model.from_disk import Directory

from .dashboard.dashboard import run_app
from .data import MerkleNodeInfo, get_directory_data
from .plot import generate_sunburst, offline_plot

DEFAULT_OUTPUT = "text"


class Color(Enum):
    """ANSI escape sequences used to colorize terminal output."""

    BLUE = "\033[94m"
    GREEN = "\033[92m"
    RED = "\033[91m"
    END = "\033[0m"


def colorize(text: str, color: Color):
    """Wrap ``text`` in the given ANSI color and reset afterwards."""
    return color.value + text + Color.END.value


class Output:
    """Render a scan result (``nodes_data`` over ``source_tree``) in one of
    several formats: summary, text tree, sunburst chart, dashboard, JSON or
    newline-delimited JSON."""

    def __init__(
        self, root_path: str, nodes_data: MerkleNodeInfo, source_tree: Directory
    ):
        self.root_path = root_path
        self.nodes_data = nodes_data
        self.source_tree = source_tree

    def show(self, mode=DEFAULT_OUTPUT):
        """Dispatch to the renderer selected by ``mode``.

        Raises ``Exception`` for an unknown mode (kept as a bare
        ``Exception`` for backward compatibility with existing callers).
        """
        if mode == "summary":
            self.summary()
        elif mode == "text":
            isatty = sys.stdout.isatty()
            self.print_text(isatty)
        elif mode == "sunburst":
            directory_data = get_directory_data(
                self.root_path, self.source_tree, self.nodes_data
            )
            sunburst_figure = generate_sunburst(directory_data, self.root_path)
            offline_plot(sunburst_figure)
        elif mode == "interactive":
            directory_data = get_directory_data(
                self.root_path, self.source_tree, self.nodes_data
            )
            sunburst_figure = generate_sunburst(directory_data, self.root_path)
            run_app(sunburst_figure, self.source_tree, self.nodes_data)
        elif mode == "json":
            self.print_json()
        elif mode == "ndjson":
            self.print_ndjson()
        else:
            raise Exception(f"mode {mode} is not an output format")

    def get_path_name(self, node):
        # Nodes store their filesystem path under the "path" key when
        # available, otherwise under "data".
        return "path" if "path" in node.data.keys() else "data"

    def print_text(self, isatty: bool) -> None:
        """Print the tree-like text view; colorized when ``isatty``."""

        def compute_level(node):
            # Depth relative to the scan root, derived from path components.
            node_path = str(node.data[self.get_path_name(node)]).split("/")
            source_path = str(self.source_tree.data["path"]).split("/")
            return len(node_path) - len(source_path)

        for node in self.source_tree.iter_tree():
            self.print_node(node, isatty, compute_level(node))

    def print_node(self, node: Any, isatty: bool, level: int) -> None:
        """Print one tree row: red = unknown, blue = known directory,
        green = known content (colors only on a tty)."""
        rel_path = os.path.basename(node.data[self.get_path_name(node)])
        rel_path = rel_path.decode()
        begin = "│ " * level
        end = "/" if node.object_type == "directory" else ""

        if isatty:
            if not self.nodes_data[node.swhid()]["known"]:
                rel_path = colorize(rel_path, Color.RED)
            elif node.object_type == "directory":
                rel_path = colorize(rel_path, Color.BLUE)
            elif node.object_type == "content":
                rel_path = colorize(rel_path, Color.GREEN)

        print(f"{begin}{rel_path}{end}")

    def summary(self):
        """Print counts and percentages of known files and of fully /
        partially known directories."""
        directories_with_known_files = set()

        total_files = 0
        total_directories = 0
        known_files = 0
        full_known_directories = 0
        partially_known_directories = 0

        contents = []
        directories = []

        for node in self.source_tree.iter_tree():
            if node.object_type == "content":
                contents.append(node)
            elif node.object_type == "directory":
                directories.append(node)
            else:
                assert False, "unreachable"

        total_files = len(contents)
        for c in contents:
            if self.nodes_data[c.swhid()]["known"]:
                known_files += 1
                path = c.data[self.get_path_name(c)]
                dir_name = os.path.dirname(path)
                directories_with_known_files.add(dir_name)

        total_directories = len(directories)
        for d in directories:
            if self.nodes_data[d.swhid()]["known"]:
                full_known_directories += 1
            else:
                path = d.data[self.get_path_name(d)]
                # NOTE(review): only the *direct* parents of known files are
                # counted as partially known; more distant ancestors are not.
                if path in directories_with_known_files:
                    partially_known_directories += 1

        # Guard the percentages: an empty tree (no files, or no directories)
        # used to raise ZeroDivisionError here.
        kp = known_files * 100 // total_files if total_files else 0
        fkp = (
            full_known_directories * 100 // total_directories
            if total_directories
            else 0
        )
        pkp = (
            partially_known_directories * 100 // total_directories
            if total_directories
            else 0
        )
        print(f"Files: {total_files:10d}")
        print(f" known: {known_files:10d} ({kp:3d}%)")
        print(f"directories: {total_directories:10d}")
        print(f" fully-known: {full_known_directories:10d} ({fkp:3d}%)")
        print(f" partially-known: {partially_known_directories:10d} ({pkp:3d}%)")
        print("(see other --output-format for more details)")

    def data_as_json(self):
        """Return a dict mapping each node's path (relative to the scan
        root) to its SWHID plus all collected node data."""
        # Renamed from `json` to avoid shadowing the imported json module.
        result = {}
        for node in self.source_tree.iter_tree():
            rel_path = os.path.relpath(
                node.data[self.get_path_name(node)].decode(),
                self.source_tree.data["path"].decode(),
            )
            result[rel_path] = {"swhid": str(node.swhid())}
            for k, v in self.nodes_data[node.swhid()].items():
                result[rel_path][k] = v
        return result

    def print_json(self):
        print(json.dumps(self.data_as_json(), indent=4, sort_keys=True))

    def print_ndjson(self):
        print(ndjson.dumps({k: v} for k, v in self.data_as_json().items()))