NOTE(review): the line breaks, blank lines and indentation in this patch were
reconstructed from a whitespace-collapsed copy. Hunk line counts agree with the
@@ headers, but the exact whitespace is an assumption; confirm against the
repository before applying.

diff --git a/swh/scanner/cli.py b/swh/scanner/cli.py
index 034b53b..ea61a96 100644
--- a/swh/scanner/cli.py
+++ b/swh/scanner/cli.py
@@ -1,117 +1,155 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
+import os
+from typing import Any, Dict
+
 import click
 from pathlib import PosixPath
 from typing import Tuple
 
+from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS
 
 
-@click.group(name="scanner", context_settings=CONTEXT_SETTINGS)
-@click.pass_context
-def scanner(ctx):
-    """Software Heritage Scanner tools."""
-    pass
+# All generic config code should reside in swh.core.config
+DEFAULT_CONFIG_PATH = os.environ.get(
+    "SWH_CONFIG_FILE", os.path.join(click.get_app_dir("swh"), "global.yml")
+)
+
+
+DEFAULT_CONFIG: Dict[str, Any] = {
+    "web-api": {
+        "url": "https://archive.softwareheritage.org/api/1/",
+        "auth-token": None,
+    }
+}
 
 
 def parse_url(url):
     if not url.startswith("https://"):
         url = "https://" + url
     if not url.endswith("/"):
         url += "/"
     return url
 
 
 def extract_regex_objs(root_path: PosixPath, patterns: Tuple[str]) -> object:
     """Generates a regex object for each pattern given in input and checks if
     the path is a subdirectory or relative to the root path.
 
     Yields:
         an SRE_Pattern object
     """
     import glob
     import fnmatch
     import re
     from .exceptions import InvalidDirectoryPath
 
     for pattern in patterns:
         for path in glob.glob(pattern):
             dirpath = PosixPath(path)
             if root_path not in dirpath.parents:
                 error_msg = (
                     f'The path "{dirpath}" is not a subdirectory or relative '
                     f'to the root directory path: "{root_path}"'
                 )
                 raise InvalidDirectoryPath(error_msg)
 
         regex = fnmatch.translate(str(PosixPath(pattern)))
         yield re.compile(regex)
 
 
+@click.group(name="scanner", context_settings=CONTEXT_SETTINGS)
+@click.option(
+    "-C",
+    "--config-file",
+    default=DEFAULT_CONFIG_PATH,
+    type=click.Path(exists=True, dir_okay=False, path_type=str),
+    help="YAML configuration file",
+)
+@click.pass_context
+def scanner(ctx, config_file: str):
+    """Software Heritage Scanner tools."""
+
+    # recursive merge not done by config.read
+    conf = config.read_raw_config(config.config_basepath(config_file))
+    conf = config.merge_configs(DEFAULT_CONFIG, conf)
+
+    ctx.ensure_object(dict)
+    ctx.obj["config"] = conf
+
+
 @scanner.command(name="scan")
 @click.argument("root_path", required=True, type=click.Path(exists=True))
 @click.option(
     "-u",
     "--api-url",
-    default="https://archive.softwareheritage.org/api/1",
+    default=None,
     metavar="API_URL",
     show_default=True,
     help="URL for the api request",
 )
 @click.option(
     "--exclude",
     "-x",
     "patterns",
     metavar="PATTERN",
     multiple=True,
     help="Exclude directories using glob patterns \
     (e.g., '*.git' to exclude all .git directories)",
 )
 @click.option(
     "-f",
     "--format",
     default="text",
     show_default=True,
     type=click.Choice(["text", "json", "ndjson", "sunburst"], case_sensitive=False),
     help="The output format",
 )
 @click.option(
     "-i", "--interactive", is_flag=True, help="Show the result in a dashboard"
 )
 @click.pass_context
 def scan(ctx, root_path, api_url, patterns, format, interactive):
     """Scan a source code project to discover files and directories already
     present in the archive"""
     import asyncio
     from .scanner import run
     from .model import Tree
     from .plot import generate_sunburst
     from .dashboard.dashboard import run_app
 
+    config = ctx.obj["config"]
+    if api_url:
+        config["web-api"]["url"] = parse_url(api_url)
+
     sre_patterns = set()
     if patterns:
         sre_patterns = {
             reg_obj for reg_obj in extract_regex_objs(PosixPath(root_path), patterns)
         }
 
-    api_url = parse_url(api_url)
     source_tree = Tree(PosixPath(root_path))
     loop = asyncio.get_event_loop()
-    loop.run_until_complete(run(root_path, api_url, source_tree, sre_patterns))
+    loop.run_until_complete(run(config, root_path, source_tree, sre_patterns))
 
     if interactive:
         root = PosixPath(root_path)
         directories = source_tree.getDirectoriesInfo(root)
         figure = generate_sunburst(directories, root)
         run_app(figure, source_tree)
     else:
         source_tree.show(format)
 
 
+def main():
+    return scanner(auto_envvar_prefix="SWH_SCANNER")
+
+
 if __name__ == "__main__":
-    scan()
+    main()
diff --git a/swh/scanner/scanner.py b/swh/scanner/scanner.py
index da316b4..8e044b1 100644
--- a/swh/scanner/scanner.py
+++ b/swh/scanner/scanner.py
@@ -1,182 +1,194 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import os
 import itertools
 import asyncio
-import aiohttp
-from typing import List, Dict, Tuple, Iterator, Union, Set, Any
 from pathlib import PosixPath
+from typing import List, Dict, Tuple, Iterator, Union, Iterable, Pattern, Any
+
+import aiohttp
 
 from .exceptions import error_response
 from .model import Tree
 
 from swh.model.from_disk import Directory, Content, accept_all_directories
 from swh.model.identifiers import (
     swhid,
     parse_swhid,
     DIRECTORY,
     CONTENT,
 )
 
 
 async def swhids_discovery(
     swhids: List[str], session: aiohttp.ClientSession, api_url: str,
 ) -> Dict[str, Dict[str, bool]]:
     """API Request to get information about the SoftWare Heritage persistent
     IDentifiers (SWHIDs) given in input.
 
     Args:
         swhids: a list of SWHIDS
         api_url: url for the API request
 
     Returns:
         A dictionary with:
         key: SWHID searched
         value:
             value['known'] = True if the SWHID is found
             value['known'] = False if the SWHID is not found
 
     """
     endpoint = api_url + "known/"
     chunk_size = 1000
     requests = []
 
     def get_chunk(swhids):
         for i in range(0, len(swhids), chunk_size):
             yield swhids[i : i + chunk_size]
 
     async def make_request(swhids):
         async with session.post(endpoint, json=swhids) as resp:
             if resp.status != 200:
                 error_response(resp.reason, resp.status, endpoint)
 
             return await resp.json()
 
     if len(swhids) > chunk_size:
         for swhids_chunk in get_chunk(swhids):
             requests.append(asyncio.create_task(make_request(swhids_chunk)))
 
         res = await asyncio.gather(*requests)
         # concatenate list of dictionaries
         return dict(itertools.chain.from_iterable(e.items() for e in res))
     else:
         return await make_request(swhids)
 
 
-def directory_filter(path_name: Union[str, bytes], exclude_patterns: Set[Any]) -> bool:
+def directory_filter(
+    path_name: Union[str, bytes], exclude_patterns: Iterable[Pattern[str]]
+) -> bool:
     """It checks if the path_name is matching with the patterns given in input.
 
     It is also used as a `dir_filter` function when generating the directory
     object from `swh.model.from_disk`
 
     Returns:
         False if the directory has to be ignored, True otherwise
 
     """
     path = PosixPath(path_name.decode() if isinstance(path_name, bytes) else path_name)
     for sre_pattern in exclude_patterns:
         if sre_pattern.match(str(path)):
             return False
     return True
 
 
 def get_subpaths(
-    path: PosixPath, exclude_patterns: Set[Any]
+    path: PosixPath, exclude_patterns: Iterable[Pattern[str]]
 ) -> Iterator[Tuple[PosixPath, str]]:
     """Find the SoftWare Heritage persistent IDentifier (SWHID) of
     the directories and files under a given path.
 
     Args:
         path: the root path
 
     Yields:
         pairs of: path, the relative SWHID
 
     """
 
     def swhid_of(path):
         if path.is_dir():
             if exclude_patterns:
 
                 def dir_filter(dirpath, *args):
                     return directory_filter(dirpath, exclude_patterns)
 
             else:
                 dir_filter = accept_all_directories
 
             obj = Directory.from_disk(
                 path=bytes(path), dir_filter=dir_filter
             ).get_data()
 
             return swhid(DIRECTORY, obj)
         else:
             obj = Content.from_file(path=bytes(path)).get_data()
             return swhid(CONTENT, obj)
 
     dirpath, dnames, fnames = next(os.walk(path))
     for node in itertools.chain(dnames, fnames):
         sub_path = PosixPath(dirpath).joinpath(node)
         yield (sub_path, swhid_of(sub_path))
 
 
 async def parse_path(
     path: PosixPath,
     session: aiohttp.ClientSession,
     api_url: str,
-    exclude_patterns: Set[Any],
+    exclude_patterns: Iterable[Pattern[str]],
 ) -> Iterator[Tuple[str, str, bool]]:
     """Check if the sub paths of the given path are present in the
     archive or not.
 
     Args:
         path: the source path
         api_url: url for the API request
 
     Returns:
         a map containing tuples with: a subpath of the given path,
         the SWHID of the subpath and the result of the api call
 
     """
     parsed_paths = dict(get_subpaths(path, exclude_patterns))
     parsed_swhids = await swhids_discovery(
         list(parsed_paths.values()), session, api_url
     )
 
     def unpack(tup):
         subpath, swhid = tup
         return (subpath, swhid, parsed_swhids[swhid]["known"])
 
     return map(unpack, parsed_paths.items())
 
 
 async def run(
-    root: PosixPath, api_url: str, source_tree: Tree, exclude_patterns: Set[Any]
+    config: Dict[str, Any],
+    root: str,
+    source_tree: Tree,
+    exclude_patterns: Iterable[Pattern[str]],
 ) -> None:
     """Start scanning from the given root.
 
     It fills the source tree with the path discovered.
 
     Args:
         root: the root path to scan
         api_url: url for the API request
 
     """
+    api_url = config["web-api"]["url"]
 
     async def _scan(root, session, api_url, source_tree, exclude_patterns):
         for path, obj_swhid, known in await parse_path(
             root, session, api_url, exclude_patterns
         ):
             obj_type = parse_swhid(obj_swhid).object_type
 
             if obj_type == CONTENT:
                 source_tree.addNode(path, obj_swhid, known)
             elif obj_type == DIRECTORY and directory_filter(path, exclude_patterns):
                 source_tree.addNode(path, obj_swhid, known)
                 if not known:
                     await _scan(path, session, api_url, source_tree, exclude_patterns)
 
-    async with aiohttp.ClientSession() as session:
+    if config["web-api"]["auth-token"]:
+        headers = {"Authorization": f"Bearer {config['web-api']['auth-token']}"}
+    else:
+        headers = {}
+
+    async with aiohttp.ClientSession(headers=headers) as session:
         await _scan(root, session, api_url, source_tree, exclude_patterns)
diff --git a/swh/scanner/tests/test_scanner.py b/swh/scanner/tests/test_scanner.py
index 0e26780..16c1a56 100644
--- a/swh/scanner/tests/test_scanner.py
+++ b/swh/scanner/tests/test_scanner.py
@@ -1,103 +1,105 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import pytest
 import json
 
 from .data import correct_api_response, present_swhids, to_exclude_swhid
 
 from swh.scanner.scanner import swhids_discovery, get_subpaths, run
 from swh.scanner.model import Tree
 from swh.scanner.cli import extract_regex_objs
 from swh.scanner.exceptions import APIError
 
 aio_url = "http://example.org/api/known/"
 
 
 def test_scanner_correct_api_request(mock_aioresponse, event_loop, aiosession):
     mock_aioresponse.post(
         aio_url,
         status=200,
         content_type="application/json",
         body=json.dumps(correct_api_response),
     )
 
     actual_result = event_loop.run_until_complete(
         swhids_discovery([], aiosession, "http://example.org/api/")
     )
 
     assert correct_api_response == actual_result
 
 
 def test_scanner_raise_apierror(mock_aioresponse, event_loop, aiosession):
     mock_aioresponse.post(aio_url, content_type="application/json", status=413)
 
     with pytest.raises(APIError):
         event_loop.run_until_complete(
             swhids_discovery([], aiosession, "http://example.org/api/")
         )
 
 
 def test_scanner_raise_apierror_input_size_limit(event_loop, aiosession, live_server):
     api_url = live_server.url() + "/"
 
     request = [
         "swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a" for i in range(901)
     ]  # /known/ is limited at 900
 
     with pytest.raises(APIError):
         event_loop.run_until_complete(swhids_discovery(request, aiosession, api_url))
 
 
 def test_scanner_get_subpaths(temp_folder):
     root = temp_folder["root"]
 
     actual_result = []
     for subpath, swhid in get_subpaths(root, tuple()):
         # also check if it's a symlink since pytest tmp_dir fixture create
         # also a symlink to each directory inside the tmp_dir path
         if subpath.is_dir() and not subpath.is_symlink():
             actual_result.append((subpath, swhid))
 
     assert len(actual_result) == 2
 
 
 @pytest.mark.options(debug=False)
 def test_app(app):
     assert not app.debug
 
 
 def test_scanner_result(live_server, event_loop, test_sample_folder):
     api_url = live_server.url() + "/"
+    config = {"web-api": {"url": api_url, "auth-token": None}}
 
     source_tree = Tree(test_sample_folder)
-    event_loop.run_until_complete(run(test_sample_folder, api_url, source_tree, set()))
+    event_loop.run_until_complete(run(config, test_sample_folder, source_tree, set()))
 
     for child_node in source_tree.iterate():
         node_info = list(child_node.attributes.values())[0]
         if node_info["swhid"] in present_swhids:
             assert node_info["known"] is True
         else:
             assert node_info["known"] is False
 
 
 def test_scanner_result_with_exclude_patterns(
     live_server, event_loop, test_sample_folder
 ):
     api_url = live_server.url() + "/"
+    config = {"web-api": {"url": api_url, "auth-token": None}}
 
     patterns = (str(test_sample_folder) + "/toexclude",)
     exclude_pattern = {
         reg_obj for reg_obj in extract_regex_objs(test_sample_folder, patterns)
     }
 
     source_tree = Tree(test_sample_folder)
     event_loop.run_until_complete(
-        run(test_sample_folder, api_url, source_tree, exclude_pattern)
+        run(config, test_sample_folder, source_tree, exclude_pattern)
     )
 
     for child_node in source_tree.iterate():
         node_info = list(child_node.attributes.values())[0]
         assert node_info["swhid"] != to_exclude_swhid