Changeset View
Changeset View
Standalone View
Standalone View
swh/scanner/scanner.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import asyncio | import asyncio | ||||
import fnmatch | |||||
import glob | |||||
import itertools | import itertools | ||||
import os | import os | ||||
from pathlib import Path | from pathlib import Path | ||||
import re | |||||
from typing import Any, Dict, Iterable, Iterator, List, Pattern, Tuple, Union | from typing import Any, Dict, Iterable, Iterator, List, Pattern, Tuple, Union | ||||
import aiohttp | import aiohttp | ||||
from swh.model.from_disk import Content, Directory, accept_all_directories | from swh.model.from_disk import ( | ||||
Content, | |||||
Directory, | |||||
accept_all_directories, | |||||
extract_regex_objs, | |||||
) | |||||
from swh.model.identifiers import CoreSWHID, ObjectType | from swh.model.identifiers import CoreSWHID, ObjectType | ||||
from .dashboard.dashboard import run_app | from .dashboard.dashboard import run_app | ||||
from .exceptions import InvalidDirectoryPath, error_response | from .exceptions import error_response | ||||
from .model import Tree | from .model import Tree | ||||
from .plot import generate_sunburst | from .plot import generate_sunburst | ||||
async def swhids_discovery( | async def swhids_discovery( | ||||
swhids: List[str], session: aiohttp.ClientSession, api_url: str, | swhids: List[str], session: aiohttp.ClientSession, api_url: str, | ||||
) -> Dict[str, Dict[str, bool]]: | ) -> Dict[str, Dict[str, bool]]: | ||||
"""API Request to get information about the SoftWare Heritage persistent | """API Request to get information about the SoftWare Heritage persistent | ||||
Show All 33 Lines | if len(swhids) > chunk_size: | ||||
res = await asyncio.gather(*requests) | res = await asyncio.gather(*requests) | ||||
# concatenate list of dictionaries | # concatenate list of dictionaries | ||||
return dict(itertools.chain.from_iterable(e.items() for e in res)) | return dict(itertools.chain.from_iterable(e.items() for e in res)) | ||||
else: | else: | ||||
return await make_request(swhids) | return await make_request(swhids) | ||||
def directory_filter(
    path_name: Union[str, bytes], exclude_patterns: Iterable[Pattern[bytes]]
) -> bool:
    """Tell whether a directory must be kept, given the exclusion patterns.

    It is also used as a `dir_filter` function when generating the directory
    object from `swh.model.from_disk`.

    Args:
        path_name: path of the directory, either as text or as raw bytes
        exclude_patterns: compiled bytes patterns; a path matched by any of
            them is excluded

    Returns:
        False if the directory has to be ignored, True otherwise
    """
    # os.fsdecode()/os.fsencode() round-trip arbitrary filesystem bytes, while
    # a plain bytes.decode() raises on names that are not valid UTF-8.
    # Going through Path normalises the path (e.g. drops a trailing separator)
    # before it is matched.
    raw_path = os.fsencode(Path(os.fsdecode(path_name)))
    return not any(pattern.match(raw_path) for pattern in exclude_patterns)
def get_subpaths( | def get_subpaths( | ||||
path: Path, exclude_patterns: Iterable[Pattern[str]] | path: Path, exclude_patterns: Iterable[Pattern[bytes]] | ||||
) -> Iterator[Tuple[Path, str]]: | ) -> Iterator[Tuple[Path, str]]: | ||||
"""Find the SoftWare Heritage persistent IDentifier (SWHID) of | """Find the SoftWare Heritage persistent IDentifier (SWHID) of | ||||
the directories and files under a given path. | the directories and files under a given path. | ||||
Args: | Args: | ||||
path: the root path | path: the root path | ||||
Yields: | Yields: | ||||
pairs of: path, the relative SWHID | pairs of: path, the relative SWHID | ||||
""" | """ | ||||
def swhid_of(path: Path) -> str: | def swhid_of(path: Path) -> str: | ||||
if path.is_dir(): | if path.is_dir(): | ||||
if exclude_patterns: | if exclude_patterns: | ||||
def dir_filter(dirpath: str, *args) -> bool: | def dir_filter(dirpath: bytes, *args) -> bool: | ||||
return directory_filter(dirpath, exclude_patterns) | return directory_filter(dirpath, exclude_patterns) | ||||
else: | else: | ||||
dir_filter = accept_all_directories # type: ignore | dir_filter = accept_all_directories # type: ignore | ||||
obj = Directory.from_disk( | obj = Directory.from_disk( | ||||
path=bytes(path), dir_filter=dir_filter | path=bytes(path), dir_filter=dir_filter | ||||
).get_data() | ).get_data() | ||||
Show All 10 Lines | for node in itertools.chain(dnames, fnames): | ||||
sub_path = Path(dirpath).joinpath(node) | sub_path = Path(dirpath).joinpath(node) | ||||
yield (sub_path, swhid_of(sub_path)) | yield (sub_path, swhid_of(sub_path)) | ||||
async def parse_path( | async def parse_path( | ||||
path: Path, | path: Path, | ||||
session: aiohttp.ClientSession, | session: aiohttp.ClientSession, | ||||
api_url: str, | api_url: str, | ||||
exclude_patterns: Iterable[Pattern[str]], | exclude_patterns: Iterable[Pattern[bytes]], | ||||
) -> Iterator[Tuple[str, str, bool]]: | ) -> Iterator[Tuple[str, str, bool]]: | ||||
"""Check if the sub paths of the given path are present in the | """Check if the sub paths of the given path are present in the | ||||
archive or not. | archive or not. | ||||
Args: | Args: | ||||
path: the source path | path: the source path | ||||
api_url: url for the API request | api_url: url for the API request | ||||
Show All 13 Lines | ) -> Iterator[Tuple[str, str, bool]]: | ||||
return map(unpack, parsed_paths.items()) | return map(unpack, parsed_paths.items()) | ||||
async def run( | async def run( | ||||
config: Dict[str, Any], | config: Dict[str, Any], | ||||
root: str, | root: str, | ||||
source_tree: Tree, | source_tree: Tree, | ||||
exclude_patterns: Iterable[Pattern[str]], | exclude_patterns: Iterable[Pattern[bytes]], | ||||
) -> None: | ) -> None: | ||||
"""Start scanning from the given root. | """Start scanning from the given root. | ||||
It fills the source tree with the path discovered. | It fills the source tree with the path discovered. | ||||
Args: | Args: | ||||
root: the root path to scan | root: the root path to scan | ||||
api_url: url for the API request | api_url: url for the API request | ||||
Show All 20 Lines | if config["web-api"]["auth-token"]: | ||||
headers = {"Authorization": f"Bearer {config['web-api']['auth-token']}"} | headers = {"Authorization": f"Bearer {config['web-api']['auth-token']}"} | ||||
else: | else: | ||||
headers = {} | headers = {} | ||||
async with aiohttp.ClientSession(headers=headers, trust_env=True) as session: | async with aiohttp.ClientSession(headers=headers, trust_env=True) as session: | ||||
await _scan(root, session, api_url, source_tree, exclude_patterns) | await _scan(root, session, api_url, source_tree, exclude_patterns) | ||||
def extract_regex_objs(
    root_path: Path, patterns: Iterable[str]
) -> Iterator[Pattern[str]]:
    """Generates a regex object for each pattern given in input and checks if
    the paths it matches on disk are located under the root path.

    Args:
        root_path: the root directory of the scan
        patterns: shell-style patterns (they are expanded with `glob` and
            translated with `fnmatch`)

    Yields:
        an SRE_Pattern object for each pattern

    Raises:
        InvalidDirectoryPath: if a path matched by a pattern is not located
            under the root path
    """
    # glob.glob() returns paths relative to the current working directory when
    # the pattern is relative, so both sides are made absolute before the
    # containment check; otherwise an absolute root is never found among the
    # parents of a relative match.
    absolute_root = Path(os.path.abspath(root_path))
    for pattern in patterns:
        for matched in glob.glob(pattern):
            dirpath = Path(matched)
            if absolute_root not in Path(os.path.abspath(dirpath)).parents:
                error_msg = (
                    f'The path "{dirpath}" is not a subdirectory or relative '
                    f'to the root directory path: "{root_path}"'
                )
                raise InvalidDirectoryPath(error_msg)

        # one regex per pattern, whether or not it matched anything on disk
        yield re.compile(fnmatch.translate(pattern))
def scan(
    config: Dict[str, Any],
    root_path: str,
    exclude_patterns: Iterable[str],
    out_fmt: str,
    interactive: bool,
):
    """Scan a source code project to discover files and directories already
    present in the archive.

    Args:
        config: scanner configuration, forwarded unchanged to `run`
        root_path: path of the directory to scan
        exclude_patterns: patterns used to exclude directories from the scan
        out_fmt: output format given to `Tree.show` when not interactive
        interactive: if True, start the dashboard instead of printing the tree
    """
    root = Path(root_path)

    # Materialise the patterns once, as bytes, so the emptiness test below is
    # made on the collection that is actually handed to extract_regex_objs.
    converted_patterns = {pattern.encode() for pattern in exclude_patterns}

    sre_patterns = set()
    if converted_patterns:
        # Review thread on this call (from the changeset):
        #   vlorentz: why not `root_path.encode()`?
        #   DanSeraf: honestly I didn't think about it, but it is probably
        #   better to convert all the strings with `.encode()`.
        #   DanSeraf (follow-up): I just noticed that I don't need to convert
        #   the root_path there, thanks!
        # os.fsencode() is used instead of str.encode() so that a root path
        # holding surrogate-escaped (non-UTF-8) bytes is encoded instead of
        # raising UnicodeEncodeError.
        sre_patterns = set(
            extract_regex_objs(os.fsencode(root_path), converted_patterns)
        )

    source_tree = Tree(root)
    # NOTE(review): asyncio.get_event_loop() without a running loop is
    # deprecated in recent Python versions; kept as-is to preserve behaviour.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(config, root_path, source_tree, sre_patterns))

    if interactive:
        directories = source_tree.get_directories_info(root)
        figure = generate_sunburst(directories, root)
        run_app(figure, source_tree)
    else:
        source_tree.show(out_fmt)
vlorentz: why not `root_path.encode()`?