diff --git a/swh/scanner/scanner.py b/swh/scanner/scanner.py --- a/swh/scanner/scanner.py +++ b/swh/scanner/scanner.py @@ -4,21 +4,23 @@ # See top-level LICENSE file for more information import asyncio -import fnmatch -import glob import itertools import os from pathlib import Path -import re from typing import Any, Dict, Iterable, Iterator, List, Pattern, Tuple, Union import aiohttp -from swh.model.from_disk import Content, Directory, accept_all_directories +from swh.model.from_disk import ( + Content, + Directory, + accept_all_directories, + extract_regex_objs, +) from swh.model.identifiers import CoreSWHID, ObjectType from .dashboard.dashboard import run_app -from .exceptions import InvalidDirectoryPath, error_response +from .exceptions import error_response from .model import Tree from .plot import generate_sunburst @@ -68,7 +70,7 @@ def directory_filter( - path_name: Union[str, bytes], exclude_patterns: Iterable[Pattern[str]] + path_name: Union[str, bytes], exclude_patterns: Iterable[Pattern[bytes]] ) -> bool: """It checks if the path_name is matching with the patterns given in input. @@ -80,14 +82,15 @@ """ path = Path(path_name.decode() if isinstance(path_name, bytes) else path_name) + for sre_pattern in exclude_patterns: - if sre_pattern.match(str(path)): + if sre_pattern.match(bytes(path)): return False return True def get_subpaths( - path: Path, exclude_patterns: Iterable[Pattern[str]] + path: Path, exclude_patterns: Iterable[Pattern[bytes]] ) -> Iterator[Tuple[Path, str]]: """Find the SoftWare Heritage persistent IDentifier (SWHID) of the directories and files under a given path. @@ -104,7 +107,7 @@ if path.is_dir(): if exclude_patterns: - def dir_filter(dirpath: str, *args) -> bool: + def dir_filter(dirpath: bytes, *args) -> bool: return directory_filter(dirpath, exclude_patterns) else: @@ -131,7 +134,7 @@ path: Path, session: aiohttp.ClientSession, api_url: str, - exclude_patterns: Iterable[Pattern[str]], + exclude_patterns: Iterable[Pattern[bytes]], ) -> Iterator[Tuple[str, str, bool]]: """Check if the sub paths of the given path are present in the archive or not. @@ -161,7 +164,7 @@ config: Dict[str, Any], root: str, source_tree: Tree, - exclude_patterns: Iterable[Pattern[str]], + exclude_patterns: Iterable[Pattern[bytes]], ) -> None: """Start scanning from the given root. @@ -198,29 +201,6 @@ await _scan(root, session, api_url, source_tree, exclude_patterns) -def extract_regex_objs( - root_path: Path, patterns: Iterable[str] -) -> Iterator[Pattern[str]]: - """Generates a regex object for each pattern given in input and checks if - the path is a subdirectory or relative to the root path. - - Yields: - an SRE_Pattern object - """ - for pattern in patterns: - for path in glob.glob(pattern): - dirpath = Path(path) - if root_path not in dirpath.parents: - error_msg = ( - f'The path "{dirpath}" is not a subdirectory or relative ' - f'to the root directory path: "{root_path}"' - ) - raise InvalidDirectoryPath(error_msg) - - regex = fnmatch.translate((pattern)) - yield re.compile(regex) - - def scan( config: Dict[str, Any], root_path: str, @@ -230,10 +210,12 @@ ): """Scan a source code project to discover files and directories already present in the archive""" + converted_patterns = set(pattern.encode() for pattern in exclude_patterns) sre_patterns = set() if exclude_patterns: sre_patterns = { - reg_obj for reg_obj in extract_regex_objs(Path(root_path), exclude_patterns) + reg_obj + for reg_obj in extract_regex_objs(root_path.encode(), converted_patterns) } source_tree = Tree(Path(root_path)) diff --git a/swh/scanner/tests/test_scanner.py b/swh/scanner/tests/test_scanner.py --- a/swh/scanner/tests/test_scanner.py +++ b/swh/scanner/tests/test_scanner.py @@ -8,7 +8,8 @@ from flask import url_for import pytest -from swh.scanner.exceptions import APIError, InvalidDirectoryPath +from swh.model.exceptions import InvalidDirectoryPath +from swh.scanner.exceptions import APIError from swh.scanner.model import Tree from swh.scanner.scanner import extract_regex_objs, get_subpaths, run, swhids_discovery @@ -18,13 +19,14 @@ def test_extract_regex_objs(temp_folder): - root_path = temp_folder["root"] + root_path = bytes(temp_folder["root"]) + + patterns = (bytes(temp_folder["subdir"]), b"/none") - patterns = (str(temp_folder["subdir"]), "/none") sre_patterns = [reg_obj for reg_obj in extract_regex_objs(root_path, patterns)] assert len(sre_patterns) == 2 - patterns = (*patterns, "/tmp") + patterns = (*patterns, b"/tmp") with pytest.raises(InvalidDirectoryPath): sre_patterns = [reg_obj for reg_obj in extract_regex_objs(root_path, patterns)] @@ -102,10 +104,11 @@ ): api_url = url_for("index", _external=True) config = {"web-api": {"url": api_url, "auth-token": None}} + to_exclude_dir = str(test_sample_folder) + "/toexclude" - patterns = (str(test_sample_folder) + "/toexclude",) + patterns = (to_exclude_dir.encode(),) exclude_pattern = { - reg_obj for reg_obj in extract_regex_objs(test_sample_folder, patterns) + reg_obj for reg_obj in extract_regex_objs(bytes(test_sample_folder), patterns) } source_tree = Tree(test_sample_folder)