# Copyright (C) 2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import asyncio
import itertools
import os
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator, List, Pattern, Tuple, Union

import aiohttp

from swh.model.from_disk import (
    Content,
    Directory,
    accept_all_directories,
    extract_regex_objs,
)
from swh.model.identifiers import CoreSWHID, ObjectType

from .dashboard.dashboard import run_app
from .exceptions import error_response
from .model import Tree
from .plot import generate_sunburst


async def swhids_discovery(
    swhids: List[str],
    session: aiohttp.ClientSession,
    api_url: str,
) -> Dict[str, Dict[str, bool]]:
    """API Request to get information about the SoftWare Heritage persistent
    IDentifiers (SWHIDs) given in input.

    Lists longer than 1000 SWHIDs are split into chunks of at most 1000
    entries that are posted concurrently; the per-chunk answers are merged
    into a single dictionary.

    Args:
        swhids: a list of SWHIDS
        session: the aiohttp client session used to post the requests
        api_url: url for the API request ("known/" is appended to it)

    Returns:
        A dictionary with:
        key: SWHID searched
        value: value['known'] = True if the SWHID is found
               value['known'] = False if the SWHID is not found

    """
    endpoint = api_url + "known/"
    chunk_size = 1000

    async def make_request(swhids_chunk):
        async with session.post(endpoint, json=swhids_chunk) as resp:
            if resp.status != 200:
                # NOTE(review): error_response is defined in .exceptions and
                # is presumably raising here -- confirm against that module.
                error_response(resp.reason, resp.status, endpoint)

            return await resp.json()

    if len(swhids) <= chunk_size:
        return await make_request(swhids)

    tasks = [
        asyncio.create_task(make_request(swhids[start : start + chunk_size]))
        for start in range(0, len(swhids), chunk_size)
    ]
    responses = await asyncio.gather(*tasks)
    # concatenate list of dictionaries
    return dict(itertools.chain.from_iterable(r.items() for r in responses))


def directory_filter(
    path_name: Union[str, bytes, Path], exclude_patterns: Iterable[Pattern[bytes]]
) -> bool:
    """It checks if the path_name is matching with the patterns given in input.

    It is also used as a `dir_filter` function when generating the directory
    object from `swh.model.from_disk`

    Args:
        path_name: the directory path, as text, bytes or a Path object
        exclude_patterns: compiled bytes patterns, tried with `match()`

    Returns:
        False if the directory has to be ignored, True otherwise

    """
    # Decode with the filesystem codec (not strict UTF-8): bytes(Path) encodes
    # with that same codec, so any file name round-trips instead of raising
    # UnicodeDecodeError. Going through Path keeps the path normalisation.
    path = bytes(Path(os.fsdecode(path_name)))
    return not any(sre_pattern.match(path) for sre_pattern in exclude_patterns)


def get_subpaths(
    path: Path, exclude_patterns: Iterable[Pattern[bytes]]
) -> Iterator[Tuple[Path, str]]:
    """Find the SoftWare Heritage persistent IDentifier (SWHID) of
    the directories and files under a given path.

    Only the direct children of `path` are visited (first level of
    `os.walk`), directories first, then files.

    Args:
        path: the root path
        exclude_patterns: compiled bytes patterns of the directories to leave
            out when computing directory identifiers; it is tested for
            truthiness and iterated several times, so it must be a
            collection, not a one-shot iterator

    Yields:
        pairs of: path, the relative SWHID

    """

    def swhid_of(path: Path) -> str:
        if path.is_dir():
            if exclude_patterns:

                def dir_filter(dirpath: bytes, *args) -> bool:
                    return directory_filter(dirpath, exclude_patterns)

            else:
                dir_filter = accept_all_directories  # type: ignore

            obj = Directory.from_disk(
                path=bytes(path), dir_filter=dir_filter
            ).get_data()
            return str(CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj["id"]))

        obj = Content.from_file(path=bytes(path)).get_data()
        return str(
            CoreSWHID(object_type=ObjectType.CONTENT, object_id=obj["sha1_git"])
        )

    # os.walk yields nothing when the top directory cannot be listed; taking
    # the first level with islice (rather than next()) keeps a StopIteration
    # from escaping this generator, which would surface as a RuntimeError.
    for dirpath, dnames, fnames in itertools.islice(os.walk(path), 1):
        for node in itertools.chain(dnames, fnames):
            sub_path = Path(dirpath).joinpath(node)
            yield (sub_path, swhid_of(sub_path))


async def parse_path(
    path: Path,
    session: aiohttp.ClientSession,
    api_url: str,
    exclude_patterns: Iterable[Pattern[bytes]],
) -> Iterator[Tuple[Path, str, bool]]:
    """Check if the sub paths of the given path are present in the
    archive or not.

    Args:
        path: the source path
        session: the aiohttp client session used to query the API
        api_url: url for the API request
        exclude_patterns: compiled bytes patterns of the directories to ignore

    Returns:
        an iterator of tuples with: a subpath of the given path,
        the SWHID of the subpath and the result of the api call

    """
    parsed_paths = dict(get_subpaths(path, exclude_patterns))
    parsed_swhids = await swhids_discovery(
        list(parsed_paths.values()), session, api_url
    )

    return (
        (subpath, swhid, parsed_swhids[swhid]["known"])
        for subpath, swhid in parsed_paths.items()
    )


async def run(
    config: Dict[str, Any],
    root: str,
    source_tree: Tree,
    exclude_patterns: Iterable[Pattern[bytes]],
) -> None:
    """Start scanning from the given root.

    It fills the source tree with the path discovered.

    Args:
        config: the scanner configuration; config["web-api"]["url"] is the
            url for the API request and config["web-api"]["auth-token"],
            when set, is sent as a bearer token
        root: the root path to scan
        source_tree: the tree that receives every discovered node
        exclude_patterns: compiled bytes patterns of the directories to ignore

    """
    api_url = config["web-api"]["url"]

    async def _scan(root, session, api_url, source_tree, exclude_patterns):
        for path, obj_swhid, known in await parse_path(
            root, session, api_url, exclude_patterns
        ):
            obj_type = CoreSWHID.from_string(obj_swhid).object_type

            if obj_type == ObjectType.CONTENT:
                source_tree.add_node(path, obj_swhid, known)
            elif obj_type == ObjectType.DIRECTORY and directory_filter(
                path, exclude_patterns
            ):
                source_tree.add_node(path, obj_swhid, known)
                # only the content of unknown directories is explored further
                if not known:
                    await _scan(path, session, api_url, source_tree, exclude_patterns)

    # a configuration without the "auth-token" key means anonymous access
    auth_token = config["web-api"].get("auth-token")
    headers = {"Authorization": f"Bearer {auth_token}"} if auth_token else {}

    async with aiohttp.ClientSession(headers=headers, trust_env=True) as session:
        await _scan(root, session, api_url, source_tree, exclude_patterns)
def scan(
    config: Dict[str, Any],
    root_path: str,
    exclude_patterns: Iterable[str],
    out_fmt: str,
    interactive: bool,
):
    """Scan a source code project to discover files and directories already
    present in the archive

    Args:
        config: the scanner configuration, handed over to `run`
        root_path: the root directory of the project to scan
        exclude_patterns: patterns of the directories to ignore; may be
            empty or None
        out_fmt: the output format given to `Tree.show` when the scan is
            not interactive
        interactive: when True the result is displayed through the dashboard
            application instead of being printed

    """
    # Encode with the filesystem codec so that surrogate-escaped arguments
    # (non UTF-8 names) are accepted. The emptiness check is made on the
    # converted set, so a None or one-shot iterable is handled correctly.
    converted_patterns = {os.fsencode(pattern) for pattern in exclude_patterns or ()}
    sre_patterns = set()
    if converted_patterns:
        sre_patterns = set(
            extract_regex_objs(os.fsencode(root_path), converted_patterns)
        )

    root = Path(root_path)
    source_tree = Tree(root)
    # asyncio.run creates and closes a dedicated event loop; obtaining one
    # through get_event_loop() outside of a running loop is deprecated.
    asyncio.run(run(config, root_path, source_tree, sre_patterns))

    if interactive:
        directories = source_tree.get_directories_info(root)
        figure = generate_sunburst(directories, root)
        run_app(figure, source_tree)
    else:
        source_tree.show(out_fmt)
# Copyright (C) 2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import json

from flask import url_for
import pytest

from swh.model.exceptions import InvalidDirectoryPath
from swh.scanner.exceptions import APIError
from swh.scanner.model import Tree
from swh.scanner.scanner import extract_regex_objs, get_subpaths, run, swhids_discovery

from .data import correct_api_response, present_swhids, to_exclude_swhid

aio_url = "http://example.org/api/known/"


def test_extract_regex_objs(temp_folder):
    """One pattern object per pattern; a pattern outside the root is refused."""
    root_path = bytes(temp_folder["root"])

    patterns = (bytes(temp_folder["subdir"]), b"/none")

    sre_patterns = list(extract_regex_objs(root_path, patterns))
    assert len(sre_patterns) == 2

    patterns = (*patterns, b"/tmp")
    with pytest.raises(InvalidDirectoryPath):
        # extract_regex_objs is lazy: it has to be consumed to raise
        list(extract_regex_objs(root_path, patterns))


def test_scanner_correct_api_request(mock_aioresponse, event_loop, aiosession):
    """A 200 answer is returned to the caller as decoded JSON."""
    mock_aioresponse.post(
        aio_url,
        status=200,
        content_type="application/json",
        body=json.dumps(correct_api_response),
    )

    actual_result = event_loop.run_until_complete(
        swhids_discovery([], aiosession, "http://example.org/api/")
    )

    assert correct_api_response == actual_result


def test_scanner_raise_apierror(mock_aioresponse, event_loop, aiosession):
    """A non-200 answer is turned into an APIError."""
    mock_aioresponse.post(aio_url, content_type="application/json", status=413)

    with pytest.raises(APIError):
        event_loop.run_until_complete(
            swhids_discovery([], aiosession, "http://example.org/api/")
        )


def test_scanner_raise_apierror_input_size_limit(event_loop, aiosession, live_server):
    """Posting more SWHIDs than the endpoint accepts raises an APIError."""
    api_url = url_for("index", _external=True)
    # /known/ is limited at 900
    request = ["swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a"] * 901

    with pytest.raises(APIError):
        event_loop.run_until_complete(swhids_discovery(request, aiosession, api_url))


def test_scanner_get_subpaths(temp_folder):
    """Only the real sub-directories of the root are counted."""
    root = temp_folder["root"]

    actual_result = []
    for subpath, swhid in get_subpaths(root, ()):
        # also check that it is not a symlink, since the pytest tmp_dir
        # fixture creates a symlink to each directory inside the tmp_dir path
        if subpath.is_dir() and not subpath.is_symlink():
            actual_result.append((subpath, swhid))

    assert len(actual_result) == 2


@pytest.mark.options(debug=False)
def test_app(app):
    assert not app.debug


def test_scanner_result(live_server, event_loop, test_sample_folder):
    """Every node is flagged as known exactly when its SWHID is archived."""
    api_url = url_for("index", _external=True)
    config = {"web-api": {"url": api_url, "auth-token": None}}

    source_tree = Tree(test_sample_folder)
    event_loop.run_until_complete(run(config, test_sample_folder, source_tree, set()))

    for child_node in source_tree.iterate():
        node_info = list(child_node.attributes.values())[0]
        if node_info["swhid"] in present_swhids:
            assert node_info["known"] is True
        else:
            assert node_info["known"] is False


def test_scanner_result_with_exclude_patterns(
    live_server, event_loop, test_sample_folder
):
    """An excluded directory never shows up in the resulting tree."""
    api_url = url_for("index", _external=True)
    config = {"web-api": {"url": api_url, "auth-token": None}}
    to_exclude_dir = str(test_sample_folder) + "/toexclude"

    patterns = (to_exclude_dir.encode(),)
    exclude_pattern = set(extract_regex_objs(bytes(test_sample_folder), patterns))

    source_tree = Tree(test_sample_folder)
    event_loop.run_until_complete(
        run(config, test_sample_folder, source_tree, exclude_pattern)
    )

    for child_node in source_tree.iterate():
        node_info = list(child_node.attributes.values())[0]
        assert node_info["swhid"] != to_exclude_swhid