Changeset View
Changeset View
Standalone View
Standalone View
swh/scanner/scanner.py
Show All 9 Lines | |||||
from typing import List, Dict, Tuple, Iterator, Union, Set, Any | from typing import List, Dict, Tuple, Iterator, Union, Set, Any | ||||
from pathlib import PosixPath | from pathlib import PosixPath | ||||
from .exceptions import error_response | from .exceptions import error_response | ||||
from .model import Tree | from .model import Tree | ||||
from swh.model.from_disk import Directory, Content, accept_all_directories | from swh.model.from_disk import Directory, Content, accept_all_directories | ||||
from swh.model.identifiers import ( | from swh.model.identifiers import ( | ||||
persistent_identifier, | swhid, | ||||
parse_persistent_identifier, | parse_swhid, | ||||
DIRECTORY, | DIRECTORY, | ||||
CONTENT, | CONTENT, | ||||
) | ) | ||||
async def swhids_discovery(
    swhids: List[str], session: aiohttp.ClientSession, api_url: str,
) -> Dict[str, Dict[str, bool]]:
    """Query the Software Heritage API about a batch of SWHIDs.

    Args:
        swhids: a list of SWHIDs to look up
        session: the aiohttp client session used for the requests
        api_url: base url for the API request

    Returns:
        A dictionary mapping each searched SWHID to a dict whose
        ``known`` key is True when the SWHID was found in the archive,
        False otherwise.
    """
    endpoint = api_url + "known/"
    chunk_size = 1000

    async def query(batch):
        # POST one batch of SWHIDs; a non-200 answer is reported as an error
        async with session.post(endpoint, json=batch) as resp:
            if resp.status != 200:
                error_response(resp.reason, resp.status, endpoint)
            return await resp.json()

    # small enough to fit in a single request
    if len(swhids) <= chunk_size:
        return await query(swhids)

    # split into fixed-size batches and query them concurrently
    tasks = [
        asyncio.create_task(query(swhids[start : start + chunk_size]))
        for start in range(0, len(swhids), chunk_size)
    ]
    results = await asyncio.gather(*tasks)
    # merge the per-batch dictionaries into a single mapping
    return dict(itertools.chain.from_iterable(d.items() for d in results))
def directory_filter(path_name: Union[str, bytes], exclude_patterns: Set[Any]) -> bool:
    """Tell whether a directory path should be kept or ignored.

    Checks path_name against the exclusion patterns given in input.
    Also used as the `dir_filter` callback when generating directory
    objects with `swh.model.from_disk`.

    Returns:
        False if the directory has to be ignored, True otherwise
    """
    if isinstance(path_name, bytes):
        path_name = path_name.decode()
    # round-trip through PosixPath to normalize the textual form
    normalized = str(PosixPath(path_name))
    return not any(pattern.match(normalized) for pattern in exclude_patterns)
def get_subpaths(
    path: PosixPath, exclude_patterns: Set[Any]
) -> Iterator[Tuple[PosixPath, str]]:
    """Compute the SWHID of every entry directly under a given path.

    Args:
        path: the root path
        exclude_patterns: compiled patterns of directories to exclude

    Yields:
        pairs of: entry path, SWHID of the entry
    """

    def compute_swhid(entry):
        if not entry.is_dir():
            # regular file: identify its content
            data = Content.from_file(path=bytes(entry)).get_data()
            return swhid(CONTENT, data)
        # directory: honor the exclusion patterns, if any were given
        if exclude_patterns:

            def dir_filter(dirpath, *args):
                return directory_filter(dirpath, exclude_patterns)

        else:
            dir_filter = accept_all_directories
        data = Directory.from_disk(
            path=bytes(entry), dir_filter=dir_filter
        ).get_data()
        return swhid(DIRECTORY, data)

    # only the first level under path is listed; recursion happens elsewhere
    root, dir_names, file_names = next(os.walk(path))
    for name in itertools.chain(dir_names, file_names):
        entry = PosixPath(root) / name
        yield (entry, compute_swhid(entry))
async def parse_path(
    path: PosixPath,
    session: aiohttp.ClientSession,
    api_url: str,
    exclude_patterns: Set[Any],
) -> Iterator[Tuple[str, str, bool]]:
    """Check whether the sub paths of the given path are present in the
    archive or not.

    Args:
        path: the source path
        session: the aiohttp client session used for the requests
        api_url: url for the API request
        exclude_patterns: compiled patterns of directories to exclude

    Returns:
        an iterator of tuples with: a subpath of the given path, the
        SWHID of the subpath and the result of the api call
    """
    path_to_swhid = dict(get_subpaths(path, exclude_patterns))
    lookup = await swhids_discovery(list(path_to_swhid.values()), session, api_url)
    return (
        (subpath, obj_swhid, lookup[obj_swhid]["known"])
        for subpath, obj_swhid in path_to_swhid.items()
    )
async def run(
    root: PosixPath, api_url: str, source_tree: Tree, exclude_patterns: Set[Any]
) -> None:
    """Start scanning from the given root.

    Fills the source tree with every path discovered, recursing into
    directories unknown to the archive.

    Args:
        root: the root path to scan
        api_url: url for the API request
        source_tree: the tree filled with the discovered nodes
        exclude_patterns: compiled patterns of directories to exclude
    """

    async def _scan(current, session, api_url, source_tree, exclude_patterns):
        results = await parse_path(current, session, api_url, exclude_patterns)
        for sub_path, obj_swhid, known in results:
            obj_type = parse_swhid(obj_swhid).object_type
            if obj_type == CONTENT:
                source_tree.addNode(sub_path, obj_swhid, known)
            elif obj_type == DIRECTORY and directory_filter(sub_path, exclude_patterns):
                source_tree.addNode(sub_path, obj_swhid, known)
            # an unknown object may still contain known parts: descend
            if not known:
                await _scan(sub_path, session, api_url, source_tree, exclude_patterns)

    async with aiohttp.ClientSession() as session:
        await _scan(root, session, api_url, source_tree, exclude_patterns)