Changeset View
Changeset View
Standalone View
Standalone View
swh/scanner/scanner.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import asyncio
import fnmatch
import glob
import itertools
import os
from pathlib import Path, PosixPath
import re
from typing import List, Dict, Tuple, Iterator, Union, Iterable, Pattern, Any

import aiohttp

from swh.model.from_disk import Directory, Content, accept_all_directories
from swh.model.identifiers import ( | from swh.model.identifiers import ( | ||||
swhid, | swhid, | ||||
▲ Show 20 Lines • Show All 59 Lines • ▼ Show 20 Lines | ) -> bool: | ||||
It is also used as a `dir_filter` function when generating the directory | It is also used as a `dir_filter` function when generating the directory | ||||
object from `swh.model.from_disk` | object from `swh.model.from_disk` | ||||
Returns: | Returns: | ||||
False if the directory has to be ignored, True otherwise | False if the directory has to be ignored, True otherwise | ||||
""" | """ | ||||
path = PosixPath(path_name.decode() if isinstance(path_name, bytes) else path_name) | path = Path(path_name.decode() if isinstance(path_name, bytes) else path_name) | ||||
for sre_pattern in exclude_patterns: | for sre_pattern in exclude_patterns: | ||||
if sre_pattern.match(str(path)): | if sre_pattern.match(str(path)): | ||||
return False | return False | ||||
return True | return True | ||||
def get_subpaths( | def get_subpaths( | ||||
path: PosixPath, exclude_patterns: Iterable[Pattern[str]] | path: Path, exclude_patterns: Iterable[Pattern[str]] | ||||
) -> Iterator[Tuple[PosixPath, str]]: | ) -> Iterator[Tuple[Path, str]]: | ||||
"""Find the SoftWare Heritage persistent IDentifier (SWHID) of | """Find the SoftWare Heritage persistent IDentifier (SWHID) of | ||||
the directories and files under a given path. | the directories and files under a given path. | ||||
Args: | Args: | ||||
path: the root path | path: the root path | ||||
Yields: | Yields: | ||||
pairs of: path, the relative SWHID | pairs of: path, the relative SWHID | ||||
Show All 16 Lines | def swhid_of(path): | ||||
return swhid(DIRECTORY, obj) | return swhid(DIRECTORY, obj) | ||||
else: | else: | ||||
obj = Content.from_file(path=bytes(path)).get_data() | obj = Content.from_file(path=bytes(path)).get_data() | ||||
return swhid(CONTENT, obj) | return swhid(CONTENT, obj) | ||||
dirpath, dnames, fnames = next(os.walk(path)) | dirpath, dnames, fnames = next(os.walk(path)) | ||||
for node in itertools.chain(dnames, fnames): | for node in itertools.chain(dnames, fnames): | ||||
sub_path = PosixPath(dirpath).joinpath(node) | sub_path = Path(dirpath).joinpath(node) | ||||
yield (sub_path, swhid_of(sub_path)) | yield (sub_path, swhid_of(sub_path)) | ||||
async def parse_path( | async def parse_path( | ||||
path: PosixPath, | path: Path, | ||||
session: aiohttp.ClientSession, | session: aiohttp.ClientSession, | ||||
api_url: str, | api_url: str, | ||||
exclude_patterns: Iterable[Pattern[str]], | exclude_patterns: Iterable[Pattern[str]], | ||||
) -> Iterator[Tuple[str, str, bool]]: | ) -> Iterator[Tuple[str, str, bool]]: | ||||
"""Check if the sub paths of the given path are present in the | """Check if the sub paths of the given path are present in the | ||||
archive or not. | archive or not. | ||||
Args: | Args: | ||||
▲ Show 20 Lines • Show All 52 Lines • ▼ Show 20 Lines | ) -> None: | ||||
else: | else: | ||||
headers = {} | headers = {} | ||||
async with aiohttp.ClientSession(headers=headers) as session: | async with aiohttp.ClientSession(headers=headers) as session: | ||||
await _scan(root, session, api_url, source_tree, exclude_patterns) | await _scan(root, session, api_url, source_tree, exclude_patterns) | ||||
def extract_regex_objs(
    root_path: Path, patterns: Iterable[str]
) -> Iterator[Pattern[str]]:
    """Generate a compiled regex object for each pattern given in input.

    Each pattern is first expanded with `glob.glob`; every path it matches
    on disk must have `root_path` among its parents, otherwise the pattern
    is rejected. The pattern itself is then translated with
    `fnmatch.translate` and compiled.

    Args:
        root_path: the root directory path the patterns must stay under
        patterns: shell-style (glob/fnmatch) patterns to convert

    Yields:
        an SRE_Pattern object for each pattern

    Raises:
        InvalidDirectoryPath: if a path matched by a pattern is not a
            subdirectory or relative to `root_path`
    """
    for pattern in patterns:
        # Validate only what the pattern matches on disk right now; a pattern
        # that matches nothing passes this check trivially.
        for matched in glob.glob(pattern):
            dirpath = Path(matched)
            if root_path not in dirpath.parents:
                raise InvalidDirectoryPath(
                    f'The path "{dirpath}" is not a subdirectory or relative '
                    f'to the root directory path: "{root_path}"'
                )

        # NOTE(review): nesting was reconstructed from a flattened diff view;
        # the regex is assumed to be yielded once per pattern (not once per
        # matched path) -- confirm against the repository.
        yield re.compile(fnmatch.translate(pattern))
def scan(
    config: Dict[str, Any],
    root_path: str,
    exclude_patterns: Iterable[str],
    out_fmt: str,
    interactive: bool,
):
    """Scan a source code project to discover files and directories already
    present in the archive.

    Args:
        config: configuration forwarded unchanged to `run`
        root_path: path of the project root to scan
        exclude_patterns: shell-style patterns to exclude; they are converted
            to compiled regexes through `extract_regex_objs`
        out_fmt: output format handed to `source_tree.show` when not
            interactive
        interactive: if True, build a sunburst figure of the directories and
            start the app; otherwise print the tree using `out_fmt`
    """
    # Build the Path once and reuse it everywhere it is needed.
    root = Path(root_path)

    sre_patterns = set()
    if exclude_patterns:
        sre_patterns = set(extract_regex_objs(root, exclude_patterns))

    source_tree = Tree(root)
    loop = asyncio.get_event_loop()
    # `run` receives the root path as the original string, as before.
    loop.run_until_complete(run(config, root_path, source_tree, sre_patterns))

    if interactive:
        directories = source_tree.getDirectoriesInfo(root)
        figure = generate_sunburst(directories, root)
        run_app(figure, source_tree)
    else:
        source_tree.show(out_fmt)