diff --git a/swh/scanner/cli.py b/swh/scanner/cli.py
--- a/swh/scanner/cli.py
+++ b/swh/scanner/cli.py
@@ -4,12 +4,13 @@
 # See top-level LICENSE file for more information
 
 import click
+import os
 import asyncio
 import glob
 import re
 import fnmatch
-from pathlib import PosixPath
-from typing import Tuple
+from pathlib import Path
+from typing import Tuple, Dict, Any
 
 from .scanner import run
 from .model import Tree
@@ -17,14 +18,38 @@
 from .dashboard.dashboard import run_app
 from .exceptions import InvalidDirectoryPath
 
+from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS
 
+DEFAULT_CONFIG: Dict[str, Dict[str, Any]] = {
+    "web-client": {"api-url": "https://archive.softwareheritage.org/api/1/"}
+}
+
 
 @click.group(name="scanner", context_settings=CONTEXT_SETTINGS)
+@click.option(
+    "-C",
+    "--config-file",
+    default=None,
+    type=click.Path(exists=True, dir_okay=False,),
+    help="YAML configuration file",
+)
 @click.pass_context
-def scanner(ctx):
+def scanner(ctx, config_file: Path):
     """Software Heritage Scanner tools."""
-    pass
+    if not config_file:
+        config_file = Path(
+            os.environ.get(
+                "SWH_CONFIG_FILENAME", Path(click.get_app_dir("swh")) / "global"
+            )
+        )
+
+    # recursive merge not done by config.read; file values override defaults
+    conf = config.read(config_file)
+    conf = config.merge_configs(DEFAULT_CONFIG, conf)
+
+    ctx.ensure_object(dict)
+    ctx.obj["config"] = conf
 
 
 def parse_url(url):
@@ -35,7 +60,7 @@
     return url
 
 
-def extract_regex_objs(root_path: PosixPath, patterns: Tuple[str]) -> object:
+def extract_regex_objs(root_path: Path, patterns: Tuple[str]) -> object:
     """Generates a regex object for each pattern given in input and checks if
     the path is a subdirectory or relative to the root path.
 
@@ -44,7 +69,7 @@
     """
     for pattern in patterns:
         for path in glob.glob(pattern):
-            dirpath = PosixPath(path)
+            dirpath = Path(path)
             if root_path not in dirpath.parents:
                 error_msg = (
                     f'The path "{dirpath}" is not a subdirectory or relative '
@@ -53,7 +78,7 @@
                 raise InvalidDirectoryPath(error_msg)
 
         if glob.glob(pattern):
-            regex = fnmatch.translate(str(PosixPath(pattern)))
+            regex = fnmatch.translate(str(Path(pattern)))
             yield re.compile(regex)
 
 
@@ -62,7 +87,7 @@
 @click.option(
     "-u",
     "--api-url",
-    default="https://archive.softwareheritage.org/api/1",
+    default=None,
     metavar="API_URL",
     show_default=True,
     help="url for the api request",
@@ -77,7 +102,8 @@
 )
 @click.option(
     "-f",
-    "--format",
+    "--output-format",
+    "out_fmt",
     type=click.Choice(["text", "json", "ndjson", "sunburst"], case_sensitive=False),
     default="text",
     help="select the output format",
@@ -86,28 +112,35 @@
     "-i", "--interactive", is_flag=True, help="show the result in a dashboard"
 )
 @click.pass_context
-def scan(ctx, root_path, api_url, patterns, format, interactive):
+def scan(ctx, root_path, api_url, patterns, out_fmt, interactive):
     """Scan a source code project to discover files and directories already
     present in the archive"""
+    config = ctx.obj["config"]
+    if api_url:
+        config["web-client"]["api-url"] = parse_url(api_url)
+
     sre_patterns = set()
     if patterns:
         sre_patterns = {
-            reg_obj for reg_obj in extract_regex_objs(PosixPath(root_path), patterns)
+            reg_obj for reg_obj in extract_regex_objs(Path(root_path), patterns)
         }
 
-    api_url = parse_url(api_url)
-    source_tree = Tree(PosixPath(root_path))
+    source_tree = Tree(Path(root_path))
     loop = asyncio.get_event_loop()
-    loop.run_until_complete(run(root_path, api_url, source_tree, sre_patterns))
+    loop.run_until_complete(run(config, root_path, source_tree, sre_patterns))
 
     if interactive:
-        root = PosixPath(root_path)
+        root = Path(root_path)
         directories = source_tree.getDirectoriesInfo(root)
         figure = generate_sunburst(directories, root)
         run_app(figure, source_tree)
     else:
-        source_tree.show(format)
+        source_tree.show(out_fmt)
+
+
+def main():
+    return scanner(auto_envvar_prefix="SWH_SCANNER")
 
 
 if __name__ == "__main__":
-    scan()
+    main()
diff --git a/swh/scanner/dashboard/dashboard.py b/swh/scanner/dashboard/dashboard.py
--- a/swh/scanner/dashboard/dashboard.py
+++ b/swh/scanner/dashboard/dashboard.py
@@ -3,7 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from pathlib import PosixPath
+from pathlib import Path
 
 from ..model import Tree
 
@@ -15,7 +15,7 @@
 from dash.dependencies import Input, Output
 
 
-def generate_table_body(dir_path: PosixPath, source: Tree):
+def generate_table_body(dir_path: Path, source: Tree):
     """
     Generate the data_table from the path taken from the chart.
 
@@ -25,7 +25,7 @@
     data = []
     for file_info in source.getFilesFromDir(dir_path):
         for file_path, attr in file_info.items():
-            file_path = PosixPath(file_path)
+            file_path = Path(file_path)
             file_name = file_path.parts[len(file_path.parts) - 1]
             data.append(
                 html.Tr(
@@ -92,7 +92,7 @@
             full_path = (
                 source.path.joinpath(raw_path)
                 if raw_path != str(source.path)
-                else PosixPath(raw_path)
+                else Path(raw_path)
             )
             return table_header + generate_table_body(full_path, source), str(full_path)
         else:
diff --git a/swh/scanner/model.py b/swh/scanner/model.py
--- a/swh/scanner/model.py
+++ b/swh/scanner/model.py
@@ -6,7 +6,7 @@
 from __future__ import annotations
 import sys
 import json
-from pathlib import PosixPath
+from pathlib import Path
 from typing import Any, Dict, Tuple, Iterable, List
 from enum import Enum
 
@@ -32,20 +32,20 @@
     """Representation of a file system structure
     """
 
-    def __init__(self, path: PosixPath, father: Tree = None):
+    def __init__(self, path: Path, father: Tree = None):
         self.father = father
         self.path = path
         self.otype = DIRECTORY if path.is_dir() else CONTENT
         self.swhid = ""
         self.known = False
-        self.children: Dict[PosixPath, Tree] = {}
+        self.children: Dict[Path, Tree] = {}
 
-    def addNode(self, path: PosixPath, swhid: str, known: bool) -> None:
+    def addNode(self, path: Path, swhid: str, known: bool) -> None:
         """Recursively add a new path.
         """
         relative_path = path.relative_to(self.path)
 
-        if relative_path == PosixPath("."):
+        if relative_path == Path("."):
             self.swhid = swhid
             self.known = known
             return
@@ -172,7 +172,7 @@
             if child_node.otype == DIRECTORY:
                 yield from child_node.__iterNodesAttr()
 
-    def getFilesFromDir(self, dir_path: PosixPath) -> List:
+    def getFilesFromDir(self, dir_path: Path) -> List:
         """
         Retrieve files information about a specific directory path
 
@@ -215,7 +215,7 @@
             if child_node.has_dirs():
                 child_node.__getSubDirsInfo(root, directories)
 
-    def getDirectoriesInfo(self, root: PosixPath) -> Dict[PosixPath, Tuple[int, int]]:
+    def getDirectoriesInfo(self, root: Path) -> Dict[Path, Tuple[int, int]]:
         """Get information about all directories under the given root.
 
         Returns:
diff --git a/swh/scanner/plot.py b/swh/scanner/plot.py
--- a/swh/scanner/plot.py
+++ b/swh/scanner/plot.py
@@ -16,7 +16,7 @@
 """
 
 from typing import List, Dict, Tuple
-from pathlib import PosixPath
+from pathlib import Path
 
 from plotly.offline import offline
 import plotly.graph_objects as go
@@ -160,7 +160,7 @@
     return complete_df
 
 
-def compute_max_depth(dirs_path: List[PosixPath], root: PosixPath) -> int:
+def compute_max_depth(dirs_path: List[Path], root: Path) -> int:
     """Compute the maximum depth level of the given directory paths.
 
     Example: for `var/log/kernel/` the depth level is 3
@@ -179,10 +179,7 @@
 
 
 def generate_df_from_dirs(
-    dirs: Dict[PosixPath, Tuple[int, int]],
-    columns: List[str],
-    root: PosixPath,
-    max_depth: int,
+    dirs: Dict[Path, Tuple[int, int]], columns: List[str], root: Path, max_depth: int,
 ) -> pd.DataFrame:
     """Generate a dataframe from the directories given in input.
 
@@ -208,7 +205,7 @@
 
     """
 
-    def get_parents(path: PosixPath):
+    def get_parents(path: Path):
         parts = path.parts[1:] if path.parts[0] == "/" else path.parts
 
         for i in range(1, len(parts) + 1):
@@ -237,7 +234,7 @@
 
 
 def generate_sunburst(
-    directories: Dict[PosixPath, Tuple[int, int]], root: PosixPath
+    directories: Dict[Path, Tuple[int, int]], root: Path
 ) -> go.Sunburst:
     """Generate a sunburst chart from the directories given in input.
 
diff --git a/swh/scanner/scanner.py b/swh/scanner/scanner.py
--- a/swh/scanner/scanner.py
+++ b/swh/scanner/scanner.py
@@ -8,7 +8,7 @@
 import asyncio
 import aiohttp
 from typing import List, Dict, Tuple, Iterator, Union, Set, Any
-from pathlib import PosixPath
+from pathlib import Path
 
 from .exceptions import error_response
 from .model import Tree
@@ -76,16 +76,14 @@
         False if the directory has to be ignored, True otherwise
 
     """
-    path = PosixPath(path_name.decode() if isinstance(path_name, bytes) else path_name)
+    path = Path(path_name.decode() if isinstance(path_name, bytes) else path_name)
     for sre_pattern in exclude_patterns:
         if sre_pattern.match(str(path)):
             return False
     return True
 
 
-def get_subpaths(
-    path: PosixPath, exclude_patterns: Set[Any]
-) -> Iterator[Tuple[PosixPath, str]]:
+def get_subpaths(path: Path, exclude_patterns: Set[Any]) -> Iterator[Tuple[Path, str]]:
     """Find the SoftWare Heritage persistent IDentifier (SWHID) of
     the directories and files under a given path.
 
@@ -118,12 +116,12 @@
 
     dirpath, dnames, fnames = next(os.walk(path))
     for node in itertools.chain(dnames, fnames):
-        sub_path = PosixPath(dirpath).joinpath(node)
+        sub_path = Path(dirpath).joinpath(node)
         yield (sub_path, swhid_of(sub_path))
 
 
 async def parse_path(
-    path: PosixPath,
+    path: Path,
     session: aiohttp.ClientSession,
     api_url: str,
     exclude_patterns: Set[Any],
@@ -153,7 +151,7 @@
 
 
 async def run(
-    root: PosixPath, api_url: str, source_tree: Tree, exclude_patterns: Set[Any]
+    config: Dict[str, Any], root: Path, source_tree: Tree, exclude_patterns: Set[Any],
 ) -> None:
     """Start scanning from the given root.
 
@@ -164,6 +162,7 @@
         api_url: url for the API request
 
     """
+    api_url = config["web-client"]["api-url"]
 
     async def _scan(root, session, api_url, source_tree, exclude_patterns):
         for path, obj_swhid, known in await parse_path(
@@ -178,5 +177,12 @@
                 if not known:
                     await _scan(path, session, api_url, source_tree, exclude_patterns)
 
-    async with aiohttp.ClientSession() as session:
+    # The auth token is optional: only send an Authorization header when one
+    # is configured, instead of raising KeyError for anonymous users.
+    headers = {}
+    auth_token = config["web-client"].get("http-auth-token")
+    if auth_token:
+        headers["Authorization"] = f"Bearer {auth_token}"
+
+    async with aiohttp.ClientSession(headers=headers) as session:
         await _scan(root, session, api_url, source_tree, exclude_patterns)