swh/scanner/scanner.py
-# Copyright (C) 2020 The Software Heritage developers
+# Copyright (C) 2020-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import asyncio
 import itertools
-import os
-from pathlib import Path
-from typing import Any, Dict, Iterable, Iterator, List, Pattern, Tuple, Union
+from typing import Any, Dict, Iterable, List

 import aiohttp

-from swh.model.from_disk import (
-    Content,
-    Directory,
-    accept_all_directories,
-    extract_regex_objs,
-)
-from swh.model.identifiers import CoreSWHID, ObjectType
+from swh.model.cli import model_of_dir
+from swh.model.from_disk import Directory
+from swh.model.identifiers import DIRECTORY

-from .dashboard.dashboard import run_app
+from .data import MerkleNodeInfo
 from .exceptions import error_response
-from .model import Tree
-from .plot import generate_sunburst
+from .output import Output
zack (inline): While we are at this, I think we need a better name for this algo. How about simply `bfs`? Or maybe `lazy_bfs` if you want to highlight the "stop" part more.
DanSeraf (inline): OK!
+async def lazy_bfs(
+    source_tree: Directory,
+    data: MerkleNodeInfo,
+    session: aiohttp.ClientSession,
+    api_url: str,
+):
+    queue = []
+    queue.append(source_tree)
zack (inline): better: `while queue`
+    while queue:
+        swhids = [str(node.swhid()) for node in queue]
+        swhids_res = await swhids_discovery(swhids, session, api_url)
+        for node in queue.copy():
+            queue.remove(node)
+            data[node.swhid()]["known"] = swhids_res[str(node.swhid())]["known"]
+            if node.object_type == DIRECTORY:
zack (inline): constant
+                if not data[node.swhid()]["known"]:
+                    children = [n[1] for n in list(node.items())]
+                    queue.extend(children)
+                else:
+                    for sub_node in node.iter_tree(dedup=False):
+                        if sub_node == node:
+                            continue
+                        data[sub_node.swhid()]["known"] = True  # type: ignore
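
The "stop" behaviour the comment thread refers to can be illustrated on a toy tree, independent of the swh.model types. This is a minimal sketch with hypothetical names (plain dicts instead of Directory nodes, and the per-level batching of swhids_discovery calls is omitted):

    def mark_subtree(nodes, results):
        # a known directory implies everything below it is known too
        for n in nodes:
            results[n["id"]] = True
            mark_subtree(n.get("children", []), results)

    def toy_lazy_bfs(root, known_ids):
        results = {}
        queue = [root]
        while queue:
            node = queue.pop(0)
            known = node["id"] in known_ids
            results[node["id"]] = known
            children = node.get("children", [])
            if children and not known:
                # unknown directory: its children still need a lookup
                queue.extend(children)
            elif children:
                # known directory: prune the search, no further lookups
                mark_subtree(children, results)
        return results

    tree = {
        "id": "d1",
        "children": [{"id": "f1"}, {"id": "d2", "children": [{"id": "f2"}]}],
    }
    assert toy_lazy_bfs(tree, {"d2"}) == {
        "d1": False, "f1": False, "d2": True, "f2": True,
    }
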
 async def swhids_discovery(
     swhids: List[str], session: aiohttp.ClientSession, api_url: str,
 ) -> Dict[str, Dict[str, bool]]:
     """API Request to get information about the SoftWare Heritage persistent
     IDentifiers (SWHIDs) given in input.

     [... 32 unchanged lines not shown ...]

     if len(swhids) > chunk_size:
         res = await asyncio.gather(*requests)
         # concatenate list of dictionaries
         return dict(itertools.chain.from_iterable(e.items() for e in res))
     else:
         return await make_request(swhids)
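
For context, swhids_discovery resolves each input SWHID to a `known` flag via the Web API. A sketch of the request and response shape (the SWHIDs and flags below are illustrative values, not real lookups):

    swhids = [
        "swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505",
        "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2",
    ]
    # known = await swhids_discovery(swhids, session, api_url)
    # known == {
    #     "swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505": {"known": True},
    #     "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2": {"known": False},
    # }
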
-def directory_filter(
-    path_name: Union[str, bytes], exclude_patterns: Iterable[Pattern[bytes]]
-) -> bool:
-    """It checks if the path_name is matching with the patterns given in input.
-    It is also used as a `dir_filter` function when generating the directory
-    object from `swh.model.from_disk`
-
-    Returns:
-        False if the directory has to be ignored, True otherwise
-    """
-    path = Path(path_name.decode() if isinstance(path_name, bytes) else path_name)
-    for sre_pattern in exclude_patterns:
-        if sre_pattern.match(bytes(path)):
-            return False
-    return True
-
-
-def get_subpaths(
-    path: Path, exclude_patterns: Iterable[Pattern[bytes]]
-) -> Iterator[Tuple[Path, str]]:
-    """Find the SoftWare Heritage persistent IDentifier (SWHID) of
-    the directories and files under a given path.
-
-    Args:
-        path: the root path
-
-    Yields:
-        pairs of: path, the relative SWHID
-    """
-
-    def swhid_of(path: Path) -> str:
-        if path.is_dir():
-            if exclude_patterns:
-
-                def dir_filter(dirpath: bytes, *args) -> bool:
-                    return directory_filter(dirpath, exclude_patterns)
-
-            else:
-                dir_filter = accept_all_directories  # type: ignore
-
-            obj = Directory.from_disk(
-                path=bytes(path), dir_filter=dir_filter
-            ).get_data()
-            return str(CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj["id"]))
-        else:
-            obj = Content.from_file(path=bytes(path)).get_data()
-            return str(
-                CoreSWHID(object_type=ObjectType.CONTENT, object_id=obj["sha1_git"])
-            )
-
-    dirpath, dnames, fnames = next(os.walk(path))
-    for node in itertools.chain(dnames, fnames):
-        sub_path = Path(dirpath).joinpath(node)
-        yield (sub_path, swhid_of(sub_path))
-
-
-async def parse_path(
-    path: Path,
-    session: aiohttp.ClientSession,
-    api_url: str,
-    exclude_patterns: Iterable[Pattern[bytes]],
-) -> Iterator[Tuple[str, str, bool]]:
-    """Check if the sub paths of the given path are present in the
-    archive or not.
-
-    Args:
-        path: the source path
-        api_url: url for the API request
-
-    Returns:
-        a map containing tuples with: a subpath of the given path,
-        the SWHID of the subpath and the result of the api call
-    """
-    parsed_paths = dict(get_subpaths(path, exclude_patterns))
-    parsed_swhids = await swhids_discovery(
-        list(parsed_paths.values()), session, api_url
-    )
-
-    def unpack(tup):
-        subpath, swhid = tup
-        return (subpath, swhid, parsed_swhids[swhid]["known"])
-
-    return map(unpack, parsed_paths.items())
 async def run(
-    config: Dict[str, Any],
-    root: str,
-    source_tree: Tree,
-    exclude_patterns: Iterable[Pattern[bytes]],
+    config: Dict[str, Any], source_tree: Directory, nodes_data: MerkleNodeInfo
 ) -> None:
     """Start scanning from the given root.

     It fills the source tree with the path discovered.

     Args:
         root: the root path to scan
         api_url: url for the API request
     """
     api_url = config["web-api"]["url"]
-    async def _scan(root, session, api_url, source_tree, exclude_patterns):
-        for path, obj_swhid, known in await parse_path(
-            root, session, api_url, exclude_patterns
-        ):
-            obj_type = CoreSWHID.from_string(obj_swhid).object_type
-
-            if obj_type == ObjectType.CONTENT:
-                source_tree.add_node(path, obj_swhid, known)
-            elif obj_type == ObjectType.DIRECTORY and directory_filter(
-                path, exclude_patterns
-            ):
-                source_tree.add_node(path, obj_swhid, known)
-                if not known:
-                    await _scan(path, session, api_url, source_tree, exclude_patterns)
     if config["web-api"]["auth-token"]:
         headers = {"Authorization": f"Bearer {config['web-api']['auth-token']}"}
     else:
         headers = {}

+    for node in source_tree.iter_tree():
+        nodes_data[node.swhid()] = {}  # type: ignore
+
     async with aiohttp.ClientSession(headers=headers, trust_env=True) as session:
-        await _scan(root, session, api_url, source_tree, exclude_patterns)
+        await lazy_bfs(source_tree, nodes_data, session, api_url)
 def scan(
     config: Dict[str, Any],
     root_path: str,
     exclude_patterns: Iterable[str],
     out_fmt: str,
     interactive: bool,
 ):
     """Scan a source code project to discover files and directories already
     present in the archive"""
-    converted_patterns = set(pattern.encode() for pattern in exclude_patterns)
-    sre_patterns = set()
-    if exclude_patterns:
-        sre_patterns = {
-            reg_obj
-            for reg_obj in extract_regex_objs(root_path.encode(), converted_patterns)
-        }
-    source_tree = Tree(Path(root_path))
+    converted_patterns = [pattern.encode() for pattern in exclude_patterns]
+    source_tree = model_of_dir(root_path.encode(), converted_patterns)
+    nodes_data = MerkleNodeInfo()
     loop = asyncio.get_event_loop()
-    loop.run_until_complete(run(config, root_path, source_tree, sre_patterns))
+    loop.run_until_complete(run(config, source_tree, nodes_data))
+    out = Output(root_path, nodes_data, source_tree)

     if interactive:
-        root = Path(root_path)
-        directories = source_tree.get_directories_info(root)
-        figure = generate_sunburst(directories, root)
-        run_app(figure, source_tree)
+        out.show("interactive")
     else:
-        source_tree.show(out_fmt)
+        out.show(out_fmt)
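
For orientation, a hypothetical invocation of the new scan() entry point; the `web-api` keys mirror those read in run(), while the exclude pattern and output format values below are assumptions:

    config = {
        "web-api": {
            "url": "https://archive.softwareheritage.org/api/1/",
            "auth-token": None,  # or a bearer token string
        }
    }
    # scan a local checkout, skipping its .git directory, plain-text report
    scan(config, "/path/to/project", ["*.git"], out_fmt="text", interactive=False)
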