Changeset View
Changeset View
Standalone View
Standalone View
swh/scanner/scanner.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import os | |||||
import itertools | |||||
import asyncio | import asyncio | ||||
import fnmatch | |||||
import glob | |||||
import itertools | |||||
import os | |||||
from pathlib import PosixPath | from pathlib import PosixPath | ||||
import re | |||||
from typing import List, Dict, Tuple, Iterator, Union, Iterable, Pattern, Any | from typing import List, Dict, Tuple, Iterator, Union, Iterable, Pattern, Any | ||||
import aiohttp | import aiohttp | ||||
from .exceptions import error_response | |||||
from .model import Tree | |||||
from swh.model.from_disk import Directory, Content, accept_all_directories | from swh.model.from_disk import Directory, Content, accept_all_directories | ||||
from swh.model.identifiers import ( | from swh.model.identifiers import ( | ||||
swhid, | swhid, | ||||
parse_swhid, | parse_swhid, | ||||
DIRECTORY, | DIRECTORY, | ||||
CONTENT, | CONTENT, | ||||
) | ) | ||||
from .exceptions import InvalidDirectoryPath, error_response | |||||
from .model import Tree | |||||
from .plot import generate_sunburst | |||||
from .dashboard.dashboard import run_app | |||||
async def swhids_discovery( | async def swhids_discovery( | ||||
swhids: List[str], session: aiohttp.ClientSession, api_url: str, | swhids: List[str], session: aiohttp.ClientSession, api_url: str, | ||||
) -> Dict[str, Dict[str, bool]]: | ) -> Dict[str, Dict[str, bool]]: | ||||
"""API Request to get information about the SoftWare Heritage persistent | """API Request to get information about the SoftWare Heritage persistent | ||||
IDentifiers (SWHIDs) given in input. | IDentifiers (SWHIDs) given in input. | ||||
Args: | Args: | ||||
▲ Show 20 Lines • Show All 154 Lines • ▼ Show 20 Lines | ) -> None: | ||||
if config["web-api"]["auth-token"]: | if config["web-api"]["auth-token"]: | ||||
headers = {"Authorization": f"Bearer {config['web-api']['auth-token']}"} | headers = {"Authorization": f"Bearer {config['web-api']['auth-token']}"} | ||||
else: | else: | ||||
headers = {} | headers = {} | ||||
async with aiohttp.ClientSession(headers=headers) as session: | async with aiohttp.ClientSession(headers=headers) as session: | ||||
await _scan(root, session, api_url, source_tree, exclude_patterns) | await _scan(root, session, api_url, source_tree, exclude_patterns) | ||||
def extract_regex_objs(
    root_path: PosixPath, patterns: Iterable[str]
) -> Iterator[Pattern[str]]:
    """Yield one compiled regular expression per pattern given in input.

    Before a pattern is compiled, every filesystem path it currently expands
    to (through ``glob.glob``) is checked: ``root_path`` must be one of the
    parents of that path.

    Args:
        root_path: directory that every path matched by a pattern must be
            located under
        patterns: shell-style wildcard patterns

    Yields:
        the compiled ``fnmatch.translate`` form of each pattern, in input
        order

    Raises:
        InvalidDirectoryPath: when a path matched by a pattern does not have
            ``root_path`` among its parents
    """
    for wildcard in patterns:
        # Validate every path the pattern expands to before compiling it.
        for dirpath in map(PosixPath, glob.glob(wildcard)):
            if root_path in dirpath.parents:
                continue
            raise InvalidDirectoryPath(
                f'The path "{dirpath}" is not a subdirectory or relative '
                f'to the root directory path: "{root_path}"'
            )

        yield re.compile(fnmatch.translate(wildcard))
def scan(
    config: Dict[str, Any],
    root_path: str,
    exclude_patterns: Iterable[str],
    out_fmt: str,
    interactive: bool,
):
    """Scan a source code project to discover files and directories already
    present in the archive.

    Args:
        config: configuration mapping, forwarded unchanged to ``run``
        root_path: path of the root directory of the project to scan
        exclude_patterns: wildcard patterns turned into regex objects with
            ``extract_regex_objs``; when empty, no pattern is used
        out_fmt: output format handed to ``Tree.show`` (non-interactive mode)
        interactive: when true, open the dashboard instead of printing the
            tree
    """
    if exclude_patterns:
        sre_patterns = set(extract_regex_objs(PosixPath(root_path), exclude_patterns))
    else:
        sre_patterns = set()

    source_tree = Tree(PosixPath(root_path))

    # Drive the asynchronous scan to completion before reporting anything.
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(run(config, root_path, source_tree, sre_patterns))

    if not interactive:
        source_tree.show(out_fmt)
        return

    # Interactive mode: build the sunburst figure and start the dashboard.
    root = PosixPath(root_path)
    directories = source_tree.getDirectoriesInfo(root)
    run_app(generate_sunburst(directories, root), source_tree)