diff --git a/swh/scanner/cli.py b/swh/scanner/cli.py index ea61a96..032a829 100644 --- a/swh/scanner/cli.py +++ b/swh/scanner/cli.py @@ -1,155 +1,109 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import os from typing import Any, Dict import click -from pathlib import PosixPath -from typing import Tuple from swh.core import config from swh.core.cli import CONTEXT_SETTINGS # All generic config code should reside in swh.core.config DEFAULT_CONFIG_PATH = os.environ.get( "SWH_CONFIG_FILE", os.path.join(click.get_app_dir("swh"), "global.yml") ) DEFAULT_CONFIG: Dict[str, Any] = { "web-api": { "url": "https://archive.softwareheritage.org/api/1/", "auth-token": None, } } def parse_url(url): + """CLI-specific helper to 'autocomplete' the provided url.""" if not url.startswith("https://"): url = "https://" + url if not url.endswith("/"): url += "/" return url -def extract_regex_objs(root_path: PosixPath, patterns: Tuple[str]) -> object: - """Generates a regex object for each pattern given in input and checks if - the path is a subdirectory or relative to the root path. - - Yields: - an SRE_Pattern object - """ - import glob - import fnmatch - import re - from .exceptions import InvalidDirectoryPath - - for pattern in patterns: - for path in glob.glob(pattern): - dirpath = PosixPath(path) - if root_path not in dirpath.parents: - error_msg = ( - f'The path "{dirpath}" is not a subdirectory or relative ' - f'to the root directory path: "{root_path}"' - ) - raise InvalidDirectoryPath(error_msg) - - regex = fnmatch.translate(str(PosixPath(pattern))) - yield re.compile(regex) - - @click.group(name="scanner", context_settings=CONTEXT_SETTINGS) @click.option( "-C", "--config-file", default=DEFAULT_CONFIG_PATH, type=click.Path(exists=True, dir_okay=False, path_type=str), help="YAML configuration file", ) @click.pass_context def scanner(ctx, config_file: str): """Software Heritage Scanner tools.""" # recursive merge not done by config.read conf = config.read_raw_config(config.config_basepath(config_file)) conf = config.merge_configs(DEFAULT_CONFIG, conf) ctx.ensure_object(dict) ctx.obj["config"] = conf @scanner.command(name="scan") @click.argument("root_path", required=True, type=click.Path(exists=True)) @click.option( "-u", "--api-url", default=None, metavar="API_URL", show_default=True, help="URL for the api request", ) @click.option( "--exclude", "-x", "patterns", metavar="PATTERN", multiple=True, help="Exclude directories using glob patterns \ (e.g., '*.git' to exclude all .git directories)", ) @click.option( "-f", - "--format", + "--output-format", + "out_fmt", default="text", show_default=True, type=click.Choice(["text", "json", "ndjson", "sunburst"], case_sensitive=False), help="The output format", ) @click.option( "-i", "--interactive", is_flag=True, help="Show the result in a dashboard" ) @click.pass_context -def scan(ctx, root_path, api_url, patterns, format, interactive): +def scan(ctx, root_path, api_url, patterns, out_fmt, interactive): """Scan a source code project to discover files and directories already present in the archive""" - import asyncio - from .scanner import run - from .model import Tree - from .plot import generate_sunburst - from .dashboard.dashboard import run_app + from .scanner import scan config = ctx.obj["config"] if api_url: config["web-api"]["url"] = parse_url(api_url) - sre_patterns = set() - if patterns: - sre_patterns = { - reg_obj for reg_obj in extract_regex_objs(PosixPath(root_path), patterns) - } - - source_tree = Tree(PosixPath(root_path)) - loop = asyncio.get_event_loop() - loop.run_until_complete(run(config, root_path, source_tree, sre_patterns)) - - if interactive: - root = PosixPath(root_path) - directories = source_tree.getDirectoriesInfo(root) - figure = generate_sunburst(directories, root) - run_app(figure, source_tree) - else: - source_tree.show(format) + scan(config, root_path, patterns, out_fmt, interactive) def main(): return scanner(auto_envvar_prefix="SWH_SCANNER") if __name__ == "__main__": main() diff --git a/swh/scanner/scanner.py b/swh/scanner/scanner.py index 8e044b1..8e0a805 100644 --- a/swh/scanner/scanner.py +++ b/swh/scanner/scanner.py @@ -1,194 +1,251 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import os -import itertools import asyncio +import fnmatch +import glob +import itertools +import os from pathlib import PosixPath +import re from typing import List, Dict, Tuple, Iterator, Union, Iterable, Pattern, Any import aiohttp -from .exceptions import error_response -from .model import Tree - from swh.model.from_disk import Directory, Content, accept_all_directories from swh.model.identifiers import ( swhid, parse_swhid, DIRECTORY, CONTENT, ) +from .exceptions import InvalidDirectoryPath, error_response +from .model import Tree +from .plot import generate_sunburst +from .dashboard.dashboard import run_app + async def swhids_discovery( swhids: List[str], session: aiohttp.ClientSession, api_url: str, ) -> Dict[str, Dict[str, bool]]: """API Request to get information about the SoftWare Heritage persistent IDentifiers (SWHIDs) given in input. Args: swhids: a list of SWHIDS api_url: url for the API request Returns: A dictionary with: key: SWHID searched value: value['known'] = True if the SWHID is found value['known'] = False if the SWHID is not found """ endpoint = api_url + "known/" chunk_size = 1000 requests = [] def get_chunk(swhids): for i in range(0, len(swhids), chunk_size): yield swhids[i : i + chunk_size] async def make_request(swhids): async with session.post(endpoint, json=swhids) as resp: if resp.status != 200: error_response(resp.reason, resp.status, endpoint) return await resp.json() if len(swhids) > chunk_size: for swhids_chunk in get_chunk(swhids): requests.append(asyncio.create_task(make_request(swhids_chunk))) res = await asyncio.gather(*requests) # concatenate list of dictionaries return dict(itertools.chain.from_iterable(e.items() for e in res)) else: return await make_request(swhids) def directory_filter( path_name: Union[str, bytes], exclude_patterns: Iterable[Pattern[str]] ) -> bool: """It checks if the path_name is matching with the patterns given in input. It is also used as a `dir_filter` function when generating the directory object from `swh.model.from_disk` Returns: False if the directory has to be ignored, True otherwise """ path = PosixPath(path_name.decode() if isinstance(path_name, bytes) else path_name) for sre_pattern in exclude_patterns: if sre_pattern.match(str(path)): return False return True def get_subpaths( path: PosixPath, exclude_patterns: Iterable[Pattern[str]] ) -> Iterator[Tuple[PosixPath, str]]: """Find the SoftWare Heritage persistent IDentifier (SWHID) of the directories and files under a given path. Args: path: the root path Yields: pairs of: path, the relative SWHID """ def swhid_of(path): if path.is_dir(): if exclude_patterns: def dir_filter(dirpath, *args): return directory_filter(dirpath, exclude_patterns) else: dir_filter = accept_all_directories obj = Directory.from_disk( path=bytes(path), dir_filter=dir_filter ).get_data() return swhid(DIRECTORY, obj) else: obj = Content.from_file(path=bytes(path)).get_data() return swhid(CONTENT, obj) dirpath, dnames, fnames = next(os.walk(path)) for node in itertools.chain(dnames, fnames): sub_path = PosixPath(dirpath).joinpath(node) yield (sub_path, swhid_of(sub_path)) async def parse_path( path: PosixPath, session: aiohttp.ClientSession, api_url: str, exclude_patterns: Iterable[Pattern[str]], ) -> Iterator[Tuple[str, str, bool]]: """Check if the sub paths of the given path are present in the archive or not. Args: path: the source path api_url: url for the API request Returns: a map containing tuples with: a subpath of the given path, the SWHID of the subpath and the result of the api call """ parsed_paths = dict(get_subpaths(path, exclude_patterns)) parsed_swhids = await swhids_discovery( list(parsed_paths.values()), session, api_url ) def unpack(tup): subpath, swhid = tup return (subpath, swhid, parsed_swhids[swhid]["known"]) return map(unpack, parsed_paths.items()) async def run( config: Dict[str, Any], root: str, source_tree: Tree, exclude_patterns: Iterable[Pattern[str]], ) -> None: """Start scanning from the given root. It fills the source tree with the path discovered. Args: root: the root path to scan api_url: url for the API request """ api_url = config["web-api"]["url"] async def _scan(root, session, api_url, source_tree, exclude_patterns): for path, obj_swhid, known in await parse_path( root, session, api_url, exclude_patterns ): obj_type = parse_swhid(obj_swhid).object_type if obj_type == CONTENT: source_tree.addNode(path, obj_swhid, known) elif obj_type == DIRECTORY and directory_filter(path, exclude_patterns): source_tree.addNode(path, obj_swhid, known) if not known: await _scan(path, session, api_url, source_tree, exclude_patterns) if config["web-api"]["auth-token"]: headers = {"Authorization": f"Bearer {config['web-api']['auth-token']}"} else: headers = {} async with aiohttp.ClientSession(headers=headers) as session: await _scan(root, session, api_url, source_tree, exclude_patterns) + + +def extract_regex_objs( + root_path: PosixPath, patterns: Iterable[str] +) -> Iterator[Pattern[str]]: + """Generates a regex object for each pattern given in input and checks if + the path is a subdirectory or relative to the root path. + + Yields: + an SRE_Pattern object + """ + for pattern in patterns: + for path in glob.glob(pattern): + dirpath = PosixPath(path) + if root_path not in dirpath.parents: + error_msg = ( + f'The path "{dirpath}" is not a subdirectory or relative ' + f'to the root directory path: "{root_path}"' + ) + raise InvalidDirectoryPath(error_msg) + + regex = fnmatch.translate((pattern)) + yield re.compile(regex) + + +def scan( + config: Dict[str, Any], + root_path: str, + exclude_patterns: Iterable[str], + out_fmt: str, + interactive: bool, +): + """Scan a source code project to discover files and directories already + present in the archive""" + sre_patterns = set() + if exclude_patterns: + sre_patterns = { + reg_obj + for reg_obj in extract_regex_objs(PosixPath(root_path), exclude_patterns) + } + + source_tree = Tree(PosixPath(root_path)) + loop = asyncio.get_event_loop() + loop.run_until_complete(run(config, root_path, source_tree, sre_patterns)) + + if interactive: + root = PosixPath(root_path) + directories = source_tree.getDirectoriesInfo(root) + figure = generate_sunburst(directories, root) + run_app(figure, source_tree) + else: + source_tree.show(out_fmt) diff --git a/swh/scanner/tests/test_cli.py b/swh/scanner/tests/test_cli.py deleted file mode 100644 index a8c4059..0000000 --- a/swh/scanner/tests/test_cli.py +++ /dev/null @@ -1,16 +0,0 @@ -import pytest - -from swh.scanner.cli import extract_regex_objs -from swh.scanner.exceptions import InvalidDirectoryPath - - -def test_extract_regex_objs(temp_folder): - root_path = temp_folder["root"] - - patterns = (str(temp_folder["subdir"]), "/none") - sre_patterns = [reg_obj for reg_obj in extract_regex_objs(root_path, patterns)] - assert len(sre_patterns) == 2 - - patterns = (*patterns, "/tmp") - with pytest.raises(InvalidDirectoryPath): - sre_patterns = [reg_obj for reg_obj in extract_regex_objs(root_path, patterns)] diff --git a/swh/scanner/tests/test_scanner.py b/swh/scanner/tests/test_scanner.py index 16c1a56..ca58eca 100644 --- a/swh/scanner/tests/test_scanner.py +++ b/swh/scanner/tests/test_scanner.py @@ -1,105 +1,116 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest import json from .data import correct_api_response, present_swhids, to_exclude_swhid -from swh.scanner.scanner import swhids_discovery, get_subpaths, run +from swh.scanner.scanner import swhids_discovery, get_subpaths, extract_regex_objs, run from swh.scanner.model import Tree -from swh.scanner.cli import extract_regex_objs -from swh.scanner.exceptions import APIError +from swh.scanner.exceptions import APIError, InvalidDirectoryPath aio_url = "http://example.org/api/known/" +def test_extract_regex_objs(temp_folder): + root_path = temp_folder["root"] + + patterns = (str(temp_folder["subdir"]), "/none") + sre_patterns = [reg_obj for reg_obj in extract_regex_objs(root_path, patterns)] + assert len(sre_patterns) == 2 + + patterns = (*patterns, "/tmp") + with pytest.raises(InvalidDirectoryPath): + sre_patterns = [reg_obj for reg_obj in extract_regex_objs(root_path, patterns)] + + def test_scanner_correct_api_request(mock_aioresponse, event_loop, aiosession): mock_aioresponse.post( aio_url, status=200, content_type="application/json", body=json.dumps(correct_api_response), ) actual_result = event_loop.run_until_complete( swhids_discovery([], aiosession, "http://example.org/api/") ) assert correct_api_response == actual_result def test_scanner_raise_apierror(mock_aioresponse, event_loop, aiosession): mock_aioresponse.post(aio_url, content_type="application/json", status=413) with pytest.raises(APIError): event_loop.run_until_complete( swhids_discovery([], aiosession, "http://example.org/api/") ) def test_scanner_raise_apierror_input_size_limit(event_loop, aiosession, live_server): api_url = live_server.url() + "/" request = [ "swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a" for i in range(901) ] # /known/ is limited at 900 with pytest.raises(APIError): event_loop.run_until_complete(swhids_discovery(request, aiosession, api_url)) def test_scanner_get_subpaths(temp_folder): root = temp_folder["root"] actual_result = [] for subpath, swhid in get_subpaths(root, tuple()): # also check if it's a symlink since pytest tmp_dir fixture create # also a symlink to each directory inside the tmp_dir path if subpath.is_dir() and not subpath.is_symlink(): actual_result.append((subpath, swhid)) assert len(actual_result) == 2 @pytest.mark.options(debug=False) def test_app(app): assert not app.debug def test_scanner_result(live_server, event_loop, test_sample_folder): api_url = live_server.url() + "/" config = {"web-api": {"url": api_url, "auth-token": None}} source_tree = Tree(test_sample_folder) event_loop.run_until_complete(run(config, test_sample_folder, source_tree, set())) for child_node in source_tree.iterate(): node_info = list(child_node.attributes.values())[0] if node_info["swhid"] in present_swhids: assert node_info["known"] is True else: assert node_info["known"] is False def test_scanner_result_with_exclude_patterns( live_server, event_loop, test_sample_folder ): api_url = live_server.url() + "/" config = {"web-api": {"url": api_url, "auth-token": None}} patterns = (str(test_sample_folder) + "/toexclude",) exclude_pattern = { reg_obj for reg_obj in extract_regex_objs(test_sample_folder, patterns) } source_tree = Tree(test_sample_folder) event_loop.run_until_complete( run(config, test_sample_folder, source_tree, exclude_pattern) ) for child_node in source_tree.iterate(): node_info = list(child_node.attributes.values())[0] assert node_info["swhid"] != to_exclude_swhid