diff --git a/swh/scanner/cli.py b/swh/scanner/cli.py --- a/swh/scanner/cli.py +++ b/swh/scanner/cli.py @@ -5,21 +5,31 @@ # WARNING: do not import unnecessary things here to keep cli startup time under # control +import os +from typing import Any, Dict + import click -from pathlib import PosixPath -from typing import Tuple +from swh.core import config from swh.core.cli import CONTEXT_SETTINGS -@click.group(name="scanner", context_settings=CONTEXT_SETTINGS) -@click.pass_context -def scanner(ctx): - """Software Heritage Scanner tools.""" - pass +# All generic config code should reside in swh.core.config +DEFAULT_CONFIG_PATH = os.environ.get( + "SWH_CONFIG_FILE", os.path.join(click.get_app_dir("swh"), "global.yml") +) + + +DEFAULT_CONFIG: Dict[str, Any] = { + "web-api": { + "url": "https://archive.softwareheritage.org/api/1/", + "auth-token": None, + } +} def parse_url(url): + """CLI-specific helper to 'autocomplete' the provided url.""" if not url.startswith("https://"): url = "https://" + url if not url.endswith("/"): @@ -27,30 +37,24 @@ return url -def extract_regex_objs(root_path: PosixPath, patterns: Tuple[str]) -> object: - """Generates a regex object for each pattern given in input and checks if - the path is a subdirectory or relative to the root path. 
- - Yields: - an SRE_Pattern object - """ - import glob - import fnmatch - import re - from .exceptions import InvalidDirectoryPath +@click.group(name="scanner", context_settings=CONTEXT_SETTINGS) +@click.option( + "-C", + "--config-file", + default=DEFAULT_CONFIG_PATH, + type=click.Path(exists=True, dir_okay=False, path_type=str), + help="YAML configuration file", +) +@click.pass_context +def scanner(ctx, config_file: str): + """Software Heritage Scanner tools.""" - for pattern in patterns: - for path in glob.glob(pattern): - dirpath = PosixPath(path) - if root_path not in dirpath.parents: - error_msg = ( - f'The path "{dirpath}" is not a subdirectory or relative ' - f'to the root directory path: "{root_path}"' - ) - raise InvalidDirectoryPath(error_msg) + # recursive merge not done by config.read + conf = config.read_raw_config(config.config_basepath(config_file)) + conf = config.merge_configs(DEFAULT_CONFIG, conf) - regex = fnmatch.translate(str(PosixPath(pattern))) - yield re.compile(regex) + ctx.ensure_object(dict) + ctx.obj["config"] = conf @scanner.command(name="scan") @@ -58,7 +62,7 @@ @click.option( "-u", "--api-url", - default="https://archive.softwareheritage.org/api/1", + default=None, metavar="API_URL", show_default=True, help="URL for the api request", @@ -74,7 +78,8 @@ ) @click.option( "-f", - "--format", + "--output-format", + "out_fmt", default="text", show_default=True, type=click.Choice(["text", "json", "ndjson", "sunburst"], case_sensitive=False), @@ -84,34 +89,21 @@ "-i", "--interactive", is_flag=True, help="Show the result in a dashboard" ) @click.pass_context -def scan(ctx, root_path, api_url, patterns, format, interactive): +def scan(ctx, root_path, api_url, patterns, out_fmt, interactive): """Scan a source code project to discover files and directories already present in the archive""" - import asyncio - from .scanner import run - from .model import Tree - from .plot import generate_sunburst - from .dashboard.dashboard import run_app - - 
sre_patterns = set() - if patterns: - sre_patterns = { - reg_obj for reg_obj in extract_regex_objs(PosixPath(root_path), patterns) - } - - api_url = parse_url(api_url) - source_tree = Tree(PosixPath(root_path)) - loop = asyncio.get_event_loop() - loop.run_until_complete(run(root_path, api_url, source_tree, sre_patterns)) - - if interactive: - root = PosixPath(root_path) - directories = source_tree.getDirectoriesInfo(root) - figure = generate_sunburst(directories, root) - run_app(figure, source_tree) - else: - source_tree.show(format) + from .scanner import scan + + config = ctx.obj["config"] + if api_url: + config["web-api"]["url"] = parse_url(api_url) + + scan(config, root_path, patterns, out_fmt, interactive) + + +def main(): + return scanner(auto_envvar_prefix="SWH_SCANNER") if __name__ == "__main__": - scan() + main() diff --git a/swh/scanner/dashboard/dashboard.py b/swh/scanner/dashboard/dashboard.py --- a/swh/scanner/dashboard/dashboard.py +++ b/swh/scanner/dashboard/dashboard.py @@ -3,7 +3,7 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from pathlib import PosixPath +from pathlib import Path from ..model import Tree @@ -15,7 +15,7 @@ from dash.dependencies import Input, Output -def generate_table_body(dir_path: PosixPath, source: Tree): +def generate_table_body(dir_path: Path, source: Tree): """ Generate the data_table from the path taken from the chart. 
@@ -25,7 +25,7 @@ data = [] for file_info in source.getFilesFromDir(dir_path): for file_path, attr in file_info.items(): - file_path = PosixPath(file_path) + file_path = Path(file_path) file_name = file_path.parts[len(file_path.parts) - 1] data.append( html.Tr( @@ -92,7 +92,7 @@ full_path = ( source.path.joinpath(raw_path) if raw_path != str(source.path) - else PosixPath(raw_path) + else Path(raw_path) ) return table_header + generate_table_body(full_path, source), str(full_path) else: diff --git a/swh/scanner/model.py b/swh/scanner/model.py --- a/swh/scanner/model.py +++ b/swh/scanner/model.py @@ -6,7 +6,7 @@ from __future__ import annotations import sys import json -from pathlib import PosixPath +from pathlib import Path from typing import Any, Dict, Tuple, Iterable, List from enum import Enum @@ -32,20 +32,20 @@ """Representation of a file system structure """ - def __init__(self, path: PosixPath, father: Tree = None): + def __init__(self, path: Path, father: Tree = None): self.father = father self.path = path self.otype = DIRECTORY if path.is_dir() else CONTENT self.swhid = "" self.known = False - self.children: Dict[PosixPath, Tree] = {} + self.children: Dict[Path, Tree] = {} - def addNode(self, path: PosixPath, swhid: str, known: bool) -> None: + def addNode(self, path: Path, swhid: str, known: bool) -> None: """Recursively add a new path. 
""" relative_path = path.relative_to(self.path) - if relative_path == PosixPath("."): + if relative_path == Path("."): self.swhid = swhid self.known = known return @@ -172,7 +172,7 @@ if child_node.otype == DIRECTORY: yield from child_node.__iterNodesAttr() - def getFilesFromDir(self, dir_path: PosixPath) -> List: + def getFilesFromDir(self, dir_path: Path) -> List: """ Retrieve files information about a specific directory path @@ -215,7 +215,7 @@ if child_node.has_dirs(): child_node.__getSubDirsInfo(root, directories) - def getDirectoriesInfo(self, root: PosixPath) -> Dict[PosixPath, Tuple[int, int]]: + def getDirectoriesInfo(self, root: Path) -> Dict[Path, Tuple[int, int]]: """Get information about all directories under the given root. Returns: diff --git a/swh/scanner/plot.py b/swh/scanner/plot.py --- a/swh/scanner/plot.py +++ b/swh/scanner/plot.py @@ -16,7 +16,7 @@ """ from typing import List, Dict, Tuple -from pathlib import PosixPath +from pathlib import Path from plotly.offline import offline import plotly.graph_objects as go @@ -160,7 +160,7 @@ return complete_df -def compute_max_depth(dirs_path: List[PosixPath], root: PosixPath) -> int: +def compute_max_depth(dirs_path: List[Path], root: Path) -> int: """Compute the maximum depth level of the given directory paths. Example: for `var/log/kernel/` the depth level is 3 @@ -179,10 +179,7 @@ def generate_df_from_dirs( - dirs: Dict[PosixPath, Tuple[int, int]], - columns: List[str], - root: PosixPath, - max_depth: int, + dirs: Dict[Path, Tuple[int, int]], columns: List[str], root: Path, max_depth: int, ) -> pd.DataFrame: """Generate a dataframe from the directories given in input. 
@@ -208,7 +205,7 @@ """ - def get_parents(path: PosixPath): + def get_parents(path: Path): parts = path.parts[1:] if path.parts[0] == "/" else path.parts for i in range(1, len(parts) + 1): @@ -237,7 +234,7 @@ def generate_sunburst( - directories: Dict[PosixPath, Tuple[int, int]], root: PosixPath + directories: Dict[Path, Tuple[int, int]], root: Path ) -> go.Sunburst: """Generate a sunburst chart from the directories given in input. diff --git a/swh/scanner/scanner.py b/swh/scanner/scanner.py --- a/swh/scanner/scanner.py +++ b/swh/scanner/scanner.py @@ -3,15 +3,16 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import os -import itertools import asyncio -import aiohttp -from typing import List, Dict, Tuple, Iterator, Union, Set, Any -from pathlib import PosixPath +import fnmatch +import glob +import itertools +import os +from pathlib import Path +import re +from typing import List, Dict, Tuple, Iterator, Union, Iterable, Pattern, Any -from .exceptions import error_response -from .model import Tree +import aiohttp from swh.model.from_disk import Directory, Content, accept_all_directories from swh.model.identifiers import ( @@ -21,6 +22,11 @@ CONTENT, ) +from .exceptions import InvalidDirectoryPath, error_response +from .model import Tree +from .plot import generate_sunburst +from .dashboard.dashboard import run_app + async def swhids_discovery( swhids: List[str], session: aiohttp.ClientSession, api_url: str, @@ -66,7 +72,9 @@ return await make_request(swhids) -def directory_filter(path_name: Union[str, bytes], exclude_patterns: Set[Any]) -> bool: +def directory_filter( + path_name: Union[str, bytes], exclude_patterns: Iterable[Pattern[str]] +) -> bool: """It checks if the path_name is matching with the patterns given in input. 
It is also used as a `dir_filter` function when generating the directory @@ -76,7 +84,7 @@ False if the directory has to be ignored, True otherwise """ - path = PosixPath(path_name.decode() if isinstance(path_name, bytes) else path_name) + path = Path(path_name.decode() if isinstance(path_name, bytes) else path_name) for sre_pattern in exclude_patterns: if sre_pattern.match(str(path)): return False @@ -84,8 +92,8 @@ def get_subpaths( - path: PosixPath, exclude_patterns: Set[Any] -) -> Iterator[Tuple[PosixPath, str]]: + path: Path, exclude_patterns: Iterable[Pattern[str]] +) -> Iterator[Tuple[Path, str]]: """Find the SoftWare Heritage persistent IDentifier (SWHID) of the directories and files under a given path. @@ -118,15 +126,15 @@ dirpath, dnames, fnames = next(os.walk(path)) for node in itertools.chain(dnames, fnames): - sub_path = PosixPath(dirpath).joinpath(node) + sub_path = Path(dirpath).joinpath(node) yield (sub_path, swhid_of(sub_path)) async def parse_path( - path: PosixPath, + path: Path, session: aiohttp.ClientSession, api_url: str, - exclude_patterns: Set[Any], + exclude_patterns: Iterable[Pattern[str]], ) -> Iterator[Tuple[str, str, bool]]: """Check if the sub paths of the given path are present in the archive or not. @@ -153,7 +161,10 @@ async def run( - root: PosixPath, api_url: str, source_tree: Tree, exclude_patterns: Set[Any] + config: Dict[str, Any], + root: str, + source_tree: Tree, + exclude_patterns: Iterable[Pattern[str]], ) -> None: """Start scanning from the given root. 
@@ -164,6 +175,7 @@ api_url: url for the API request """ + api_url = config["web-api"]["url"] async def _scan(root, session, api_url, source_tree, exclude_patterns): for path, obj_swhid, known in await parse_path( @@ -178,5 +190,61 @@ if not known: await _scan(path, session, api_url, source_tree, exclude_patterns) - async with aiohttp.ClientSession() as session: + if config["web-api"]["auth-token"]: + headers = {"Authorization": f"Bearer {config['web-api']['auth-token']}"} + else: + headers = {} + + async with aiohttp.ClientSession(headers=headers) as session: await _scan(root, session, api_url, source_tree, exclude_patterns) + + +def extract_regex_objs( + root_path: Path, patterns: Iterable[str] +) -> Iterator[Pattern[str]]: + """Generates a regex object for each pattern given in input and checks if + the path is a subdirectory or relative to the root path. + + Yields: + an SRE_Pattern object + """ + for pattern in patterns: + for path in glob.glob(pattern): + dirpath = Path(path) + if root_path not in dirpath.parents: + error_msg = ( + f'The path "{dirpath}" is not a subdirectory or relative ' + f'to the root directory path: "{root_path}"' + ) + raise InvalidDirectoryPath(error_msg) + + regex = fnmatch.translate(str(Path(pattern))) + yield re.compile(regex) + + +def scan( + config: Dict[str, Any], + root_path: str, + exclude_patterns: Iterable[str], + out_fmt: str, + interactive: bool, +): + """Scan a source code project to discover files and directories already + present in the archive""" + sre_patterns = set() + if exclude_patterns: + sre_patterns = { + reg_obj for reg_obj in extract_regex_objs(Path(root_path), exclude_patterns) + } + + source_tree = Tree(Path(root_path)) + loop = asyncio.get_event_loop() + loop.run_until_complete(run(config, root_path, source_tree, sre_patterns)) + + if interactive: + root = Path(root_path) + directories = source_tree.getDirectoriesInfo(root) + figure = generate_sunburst(directories, root) + run_app(figure, source_tree) + 
else: + source_tree.show(out_fmt) diff --git a/swh/scanner/tests/test_scanner.py b/swh/scanner/tests/test_scanner.py --- a/swh/scanner/tests/test_scanner.py +++ b/swh/scanner/tests/test_scanner.py @@ -8,14 +8,25 @@ from .data import correct_api_response, present_swhids, to_exclude_swhid -from swh.scanner.scanner import swhids_discovery, get_subpaths, run +from swh.scanner.scanner import swhids_discovery, get_subpaths, extract_regex_objs, run from swh.scanner.model import Tree -from swh.scanner.cli import extract_regex_objs -from swh.scanner.exceptions import APIError +from swh.scanner.exceptions import APIError, InvalidDirectoryPath aio_url = "http://example.org/api/known/" +def test_extract_regex_objs(temp_folder): + root_path = temp_folder["root"] + + patterns = (str(temp_folder["subdir"]), "/none") + sre_patterns = [reg_obj for reg_obj in extract_regex_objs(root_path, patterns)] + assert len(sre_patterns) == 2 + + patterns = (*patterns, "/tmp") + with pytest.raises(InvalidDirectoryPath): + sre_patterns = [reg_obj for reg_obj in extract_regex_objs(root_path, patterns)] + + def test_scanner_correct_api_request(mock_aioresponse, event_loop, aiosession): mock_aioresponse.post( aio_url, @@ -71,9 +82,10 @@ def test_scanner_result(live_server, event_loop, test_sample_folder): api_url = live_server.url() + "/" + config = {"web-api": {"url": api_url, "auth-token": None}} source_tree = Tree(test_sample_folder) - event_loop.run_until_complete(run(test_sample_folder, api_url, source_tree, set())) + event_loop.run_until_complete(run(config, test_sample_folder, source_tree, set())) for child_node in source_tree.iterate(): node_info = list(child_node.attributes.values())[0] @@ -87,6 +99,7 @@ live_server, event_loop, test_sample_folder ): api_url = live_server.url() + "/" + config = {"web-api": {"url": api_url, "auth-token": None}} patterns = (str(test_sample_folder) + "/toexclude",) exclude_pattern = { @@ -95,7 +108,7 @@ source_tree = Tree(test_sample_folder) 
event_loop.run_until_complete( - run(test_sample_folder, api_url, source_tree, exclude_pattern) + run(config, test_sample_folder, source_tree, exclude_pattern) ) for child_node in source_tree.iterate():