diff --git a/docs/cli.rst b/docs/cli.rst index 0e8d528..00d98bd 100644 --- a/docs/cli.rst +++ b/docs/cli.rst @@ -1,6 +1,6 @@ Command-line interface ====================== .. click:: swh.scanner.cli:scan - :prog: swh scanner scan - :show-nested: + :prog: swh scanner scan + :show-nested: diff --git a/swh/scanner/cli.py b/swh/scanner/cli.py index 9acc065..222af8f 100644 --- a/swh/scanner/cli.py +++ b/swh/scanner/cli.py @@ -1,243 +1,243 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import os from typing import Any, Dict, Optional import click from importlib_metadata import version import yaml from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from .exceptions import DBError # Config for the "serve" option BACKEND_DEFAULT_PORT = 5011 # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILE" DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml") DEFAULT_CONFIG: Dict[str, Any] = { "web-api": { "url": "https://archive.softwareheritage.org/api/1/", "auth-token": None, } } CONFIG_FILE_HELP = f"""Configuration file: \b The CLI option or the environment variable will fail if invalid. CLI option is checked first. Then, environment variable {CONFIG_ENVVAR} is checked. Then, if cannot load the default path, a set of default values are used. Default config path is {DEFAULT_CONFIG_PATH}. Default config values are: \b {yaml.dump(DEFAULT_CONFIG)}""" SCANNER_HELP = f"""Software Heritage Scanner tools. {CONFIG_FILE_HELP}""" def setup_config(ctx, api_url): config = ctx.obj["config"] if api_url: if not api_url.endswith("/"): api_url += "/" config["web-api"]["url"] = api_url return config @swh_cli_group.group( name="scanner", context_settings=CONTEXT_SETTINGS, help=SCANNER_HELP, ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""YAML configuration file""", ) @click.version_option( version=version("swh-scanner"), prog_name="swh-scanner", ) @click.pass_context def scanner(ctx, config_file: Optional[str]): env_config_path = os.environ.get(CONFIG_ENVVAR) # read_raw_config do not fail if file does not exist, so check it beforehand # while enforcing loading priority if config_file: if not config.config_exists(config_file): raise click.BadParameter( f"File '{config_file}' cannot be opened.", param_hint="--config-file" ) elif env_config_path: if not config.config_exists(env_config_path): raise click.BadParameter( f"File '{env_config_path}' cannot be opened.", param_hint=CONFIG_ENVVAR ) config_file = env_config_path elif config.config_exists(DEFAULT_CONFIG_PATH): config_file = DEFAULT_CONFIG_PATH conf = DEFAULT_CONFIG if config_file is not None: conf = config.read_raw_config(config.config_basepath(config_file)) conf = config.merge_configs(DEFAULT_CONFIG, conf) ctx.ensure_object(dict) ctx.obj["config"] = conf @scanner.command(name="scan") @click.argument("root_path", required=True, type=click.Path(exists=True)) @click.option( "-u", "--api-url", default=None, metavar="API_URL", show_default=True, help="URL for the api request", ) @click.option( "--exclude", "-x", "patterns", metavar="PATTERN", multiple=True, help="Exclude directories using glob patterns \ - (e.g., '*.git' to exclude all .git directories)", + (e.g., ``*.git`` to exclude all .git directories)", ) @click.option( "-f", "--output-format", "out_fmt", default="text", show_default=True, type=click.Choice(["text", "json", "ndjson", "sunburst"], case_sensitive=False), help="The output format", ) @click.option( "-i", "--interactive", is_flag=True, help="Show the result in a dashboard" ) @click.pass_context def scan(ctx, root_path, api_url, patterns, out_fmt, interactive): """Scan a source code project to discover files and directories already present in the archive""" import swh.scanner.scanner as scanner config = setup_config(ctx, api_url) scanner.scan(config, root_path, patterns, out_fmt, interactive) @scanner.group("db", help="Manage local knowledge base for swh-scanner") @click.pass_context def db(ctx): pass @db.command("import") @click.option( "-i", "--input", "input_file", metavar="INPUT_FILE", required=True, type=click.File("r"), help="A file containing SWHIDs", ) @click.option( "-o", "--output", "output_file_db", metavar="OUTPUT_DB_FILE", required=True, show_default=True, help="The name of the generated sqlite database", ) @click.option( "-s", "--chunk-size", "chunk_size", default="10000", metavar="SIZE", show_default=True, type=int, help="The chunk size ", ) @click.pass_context def import_(ctx, chunk_size, input_file, output_file_db): """Create SQLite database of known SWHIDs from a textual list of SWHIDs """ from .db import Db db = Db(output_file_db) cur = db.conn.cursor() try: db.create_from(input_file, chunk_size, cur) db.close() except DBError as e: ctx.fail("Failed to import SWHIDs into database: {0}".format(e)) @db.command("serve") @click.option( "-h", "--host", metavar="HOST", default="127.0.0.1", show_default=True, help="The host of the API server", ) @click.option( "-p", "--port", metavar="PORT", default=f"{BACKEND_DEFAULT_PORT}", show_default=True, help="The port of the API server", ) @click.option( "-f", "--db-file", "db_file", metavar="DB_FILE", default="SWHID_DB.sqlite", show_default=True, type=click.Path(exists=True), help="An sqlite database file (it can be generated with: 'swh scanner db import')", ) @click.pass_context def serve(ctx, host, port, db_file): """Start an API service using the sqlite database generated with the "db import" option.""" import swh.scanner.backend as backend from .db import Db db = Db(db_file) backend.run(host, port, db) db.close() def main(): return scanner(auto_envvar_prefix="SWH_SCANNER") if __name__ == "__main__": main() diff --git a/swh/scanner/model.py b/swh/scanner/model.py index c0adc61..6cf7ef6 100644 --- a/swh/scanner/model.py +++ b/swh/scanner/model.py @@ -1,261 +1,259 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from enum import Enum import json from pathlib import Path import sys from typing import Any, Dict, Iterator, List, Tuple import ndjson from swh.model.identifiers import CONTENT, DIRECTORY from .exceptions import InvalidDirectoryPath, InvalidObjectType from .plot import generate_sunburst, offline_plot class Color(Enum): blue = "\033[94m" green = "\033[92m" red = "\033[91m" end = "\033[0m" def colorize(text: str, color: Color): return color.value + text + Color.end.value class Tree: """Representation of a file system structure """ def __init__(self, path: Path, father: Tree = None): self.father = father self.path = path self.otype = DIRECTORY if path.is_dir() else CONTENT self.swhid = "" self.known = False self.children: Dict[Path, Tree] = {} def add_node(self, path: Path, swhid: str, known: bool) -> None: """Recursively add a new path. """ relative_path = path.relative_to(self.path) if relative_path == Path("."): self.swhid = swhid self.known = known return new_path = self.path.joinpath(relative_path.parts[0]) if new_path not in self.children: self.children[new_path] = Tree(new_path, self) self.children[new_path].add_node(path, swhid, known) def show(self, fmt) -> None: """Show tree in different formats""" if fmt == "json": print(json.dumps(self.to_dict(), indent=4, sort_keys=True)) if fmt == "ndjson": print( ndjson.dumps( {str(Path(k).relative_to(self.path)): v} for node in self.iterate() for k, v in node.attributes.items() ) ) elif fmt == "text": isatty = sys.stdout.isatty() root_dir = self.path.relative_to(self.path.parent) print(colorize(str(root_dir), Color.blue) if isatty else str(root_dir)) self.print_children(isatty) elif fmt == "sunburst": root = self.path directories = self.get_directories_info(root) sunburst = generate_sunburst(directories, root) offline_plot(sunburst) def print_children(self, isatty: bool, inc: int = 1) -> None: for path, node in self.children.items(): self.print_node(node, isatty, inc) if node.children: node.print_children(isatty, inc + 1) def print_node(self, node: Any, isatty: bool, inc: int) -> None: rel_path = str(node.path.relative_to(self.path)) begin = "│ " * inc end = "/" if node.otype == DIRECTORY else "" if isatty: if not node.known: rel_path = colorize(rel_path, Color.red) elif node.otype == DIRECTORY: rel_path = colorize(rel_path, Color.blue) elif node.otype == CONTENT: rel_path = colorize(rel_path, Color.green) print(f"{begin}{rel_path}{end}") @property def attributes(self) -> Dict[str, Dict[str, Any]]: """ Get the attributes of the current node grouped by the relative path. Returns: a dictionary containing a path as key and its known/unknown status and the SWHID as values. """ return {str(self.path): {"swhid": self.swhid, "known": self.known,}} def to_dict(self) -> Dict[str, Dict[str, Any]]: """ Recursively flatten the current tree nodes into a dictionary. For example, if you have the following structure: .. code-block:: none - root { - subdir: { - file.txt + root { + subdir: { + file.txt + } } - } The generated dictionary will be: .. code-block:: none - { - "root": { - "swhid": "...", - "known": True/False - } - "root/subdir": { - "swhid": "...", - "known": True/False - } - "root/subdir/file.txt": { - "swhid": "...", - "known": True/False + { + "root": { + "swhid": "...", + "known": True/False + } + "root/subdir": { + "swhid": "...", + "known": True/False + } + "root/subdir/file.txt": { + "swhid": "...", + "known": True/False + } } - } - - """ return { str(Path(k).relative_to(self.path)): v for node in self.iterate() for k, v in node.attributes.items() } def iterate(self) -> Iterator[Tree]: """ Recursively iterate through the children of the current node """ for _, child_node in self.children.items(): yield child_node if child_node.otype == DIRECTORY: yield from child_node.iterate() def get_files_from_dir(self, dir_path: Path) -> List: """ Retrieve files information about a specific directory path Returns: A list containing the files attributes present inside the directory given in input """ def get_files(node): files = [] for _, node in node.children.items(): if node.otype == CONTENT: files.append(node.attributes) return files if dir_path == self.path: return get_files(self) else: for node in self.iterate(): if node.path == dir_path: return get_files(node) raise InvalidDirectoryPath( "The directory provided doesn't match any stored directory" ) def _get_sub_dirs_info(self, root, directories): """Fills the directories given in input with the contents information stored inside the directory child, only if they have contents. """ for path, child_node in self.children.items(): if child_node.otype == DIRECTORY: rel_path = path.relative_to(root) contents_info = child_node.count_contents() # checks the first element of the tuple # (the number of contents in a directory) # if it is equal to zero it means that there are no contents # in that directory. if not contents_info[0] == 0: directories[rel_path] = contents_info if child_node.has_dirs(): child_node._get_sub_dirs_info(root, directories) def get_directories_info(self, root: Path) -> Dict[Path, Tuple[int, int]]: """Get information about all directories under the given root. Returns: A dictionary with a directory path as key and the relative contents information (the result of count_contents) as values. """ directories = {root: self.count_contents()} self._get_sub_dirs_info(root, directories) return directories def count_contents(self) -> Tuple[int, int]: """Count how many contents are present inside a directory. If a directory has a SWHID returns as it has all the contents. Returns: A tuple with the total number of the contents and the number of contents known (the ones that have a persistent identifier). """ contents = 0 discovered = 0 if not self.otype == DIRECTORY: raise InvalidObjectType( "Can't count contents of the object type: %s" % self.otype ) if self.known: # to identify a directory with all files/directories present return (1, 1) else: for _, child_node in self.children.items(): if child_node.otype == CONTENT: contents += 1 if child_node.known: discovered += 1 return (contents, discovered) def has_dirs(self) -> bool: """Checks if node has directories """ for _, child_node in self.children.items(): if child_node.otype == DIRECTORY: return True return False diff --git a/swh/scanner/scanner.py b/swh/scanner/scanner.py index e8cb60f..8ce2b0d 100644 --- a/swh/scanner/scanner.py +++ b/swh/scanner/scanner.py @@ -1,231 +1,233 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import itertools import os from pathlib import Path from typing import Any, Dict, Iterable, Iterator, List, Pattern, Tuple, Union import aiohttp from swh.model.from_disk import ( Content, Directory, accept_all_directories, extract_regex_objs, ) from swh.model.identifiers import CoreSWHID, ObjectType from .dashboard.dashboard import run_app from .exceptions import error_response from .model import Tree from .plot import generate_sunburst async def swhids_discovery( swhids: List[str], session: aiohttp.ClientSession, api_url: str, ) -> Dict[str, Dict[str, bool]]: """API Request to get information about the SoftWare Heritage persistent IDentifiers (SWHIDs) given in input. Args: swhids: a list of SWHIDS api_url: url for the API request Returns: A dictionary with: - key: SWHID searched + + key: + SWHID searched value: value['known'] = True if the SWHID is found value['known'] = False if the SWHID is not found """ endpoint = api_url + "known/" chunk_size = 1000 requests = [] def get_chunk(swhids): for i in range(0, len(swhids), chunk_size): yield swhids[i : i + chunk_size] async def make_request(swhids): async with session.post(endpoint, json=swhids) as resp: if resp.status != 200: error_response(resp.reason, resp.status, endpoint) return await resp.json() if len(swhids) > chunk_size: for swhids_chunk in get_chunk(swhids): requests.append(asyncio.create_task(make_request(swhids_chunk))) res = await asyncio.gather(*requests) # concatenate list of dictionaries return dict(itertools.chain.from_iterable(e.items() for e in res)) else: return await make_request(swhids) def directory_filter( path_name: Union[str, bytes], exclude_patterns: Iterable[Pattern[bytes]] ) -> bool: """It checks if the path_name is matching with the patterns given in input. It is also used as a `dir_filter` function when generating the directory object from `swh.model.from_disk` Returns: False if the directory has to be ignored, True otherwise """ path = Path(path_name.decode() if isinstance(path_name, bytes) else path_name) for sre_pattern in exclude_patterns: if sre_pattern.match(bytes(path)): return False return True def get_subpaths( path: Path, exclude_patterns: Iterable[Pattern[bytes]] ) -> Iterator[Tuple[Path, str]]: """Find the SoftWare Heritage persistent IDentifier (SWHID) of the directories and files under a given path. Args: path: the root path Yields: pairs of: path, the relative SWHID """ def swhid_of(path: Path) -> str: if path.is_dir(): if exclude_patterns: def dir_filter(dirpath: bytes, *args) -> bool: return directory_filter(dirpath, exclude_patterns) else: dir_filter = accept_all_directories # type: ignore obj = Directory.from_disk( path=bytes(path), dir_filter=dir_filter ).get_data() return str(CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj["id"])) else: obj = Content.from_file(path=bytes(path)).get_data() return str( CoreSWHID(object_type=ObjectType.CONTENT, object_id=obj["sha1_git"]) ) dirpath, dnames, fnames = next(os.walk(path)) for node in itertools.chain(dnames, fnames): sub_path = Path(dirpath).joinpath(node) yield (sub_path, swhid_of(sub_path)) async def parse_path( path: Path, session: aiohttp.ClientSession, api_url: str, exclude_patterns: Iterable[Pattern[bytes]], ) -> Iterator[Tuple[str, str, bool]]: """Check if the sub paths of the given path are present in the archive or not. Args: path: the source path api_url: url for the API request Returns: a map containing tuples with: a subpath of the given path, the SWHID of the subpath and the result of the api call """ parsed_paths = dict(get_subpaths(path, exclude_patterns)) parsed_swhids = await swhids_discovery( list(parsed_paths.values()), session, api_url ) def unpack(tup): subpath, swhid = tup return (subpath, swhid, parsed_swhids[swhid]["known"]) return map(unpack, parsed_paths.items()) async def run( config: Dict[str, Any], root: str, source_tree: Tree, exclude_patterns: Iterable[Pattern[bytes]], ) -> None: """Start scanning from the given root. It fills the source tree with the path discovered. Args: root: the root path to scan api_url: url for the API request """ api_url = config["web-api"]["url"] async def _scan(root, session, api_url, source_tree, exclude_patterns): for path, obj_swhid, known in await parse_path( root, session, api_url, exclude_patterns ): obj_type = CoreSWHID.from_string(obj_swhid).object_type if obj_type == ObjectType.CONTENT: source_tree.add_node(path, obj_swhid, known) elif obj_type == ObjectType.DIRECTORY and directory_filter( path, exclude_patterns ): source_tree.add_node(path, obj_swhid, known) if not known: await _scan(path, session, api_url, source_tree, exclude_patterns) if config["web-api"]["auth-token"]: headers = {"Authorization": f"Bearer {config['web-api']['auth-token']}"} else: headers = {} async with aiohttp.ClientSession(headers=headers, trust_env=True) as session: await _scan(root, session, api_url, source_tree, exclude_patterns) def scan( config: Dict[str, Any], root_path: str, exclude_patterns: Iterable[str], out_fmt: str, interactive: bool, ): """Scan a source code project to discover files and directories already present in the archive""" converted_patterns = set(pattern.encode() for pattern in exclude_patterns) sre_patterns = set() if exclude_patterns: sre_patterns = { reg_obj for reg_obj in extract_regex_objs(root_path.encode(), converted_patterns) } source_tree = Tree(Path(root_path)) loop = asyncio.get_event_loop() loop.run_until_complete(run(config, root_path, source_tree, sre_patterns)) if interactive: root = Path(root_path) directories = source_tree.get_directories_info(root) figure = generate_sunburst(directories, root) run_app(figure, source_tree) else: source_tree.show(out_fmt)