Add selectable scan policies to swh-scanner.

What this patch changes:
* cli.py: new -p/--policy option (choices: bfs, filepriority, dirpriority;
  default: bfs) that is forwarded to scanner.scan().
* policy.py (new file): swhids_discovery() moves here from scanner.py, next to
  an abstract Policy base class and three implementations: LazyBFS (the former
  lazy_bfs function), FilePriority and DirectoryPriority. Policy.__init__ seeds
  the "known" entry of every node with "" meaning "not decided yet"; each
  policy's run() replaces it with True or False.
* scanner.py: run() now receives a policy object, get_policy_obj() maps the
  option value to a policy class, and scan() takes the extra policy argument.
* tests: the swhids_discovery tests are kept in the new test_policy.py and
  test_scanner.py checks the scan result once per policy.

Review notes:
* FilePriority.run and DirectoryPriority.run: the first parent lookup is
  guarded in the same way as the lookups inside the loop that follows it, so
  a node with an empty "parents" list no longer raises IndexError.
* NOTE(review): LazyBFS.run calls node.iter_tree() while the removed lazy_bfs
  called node.iter_tree(dedup=False). Confirm that the change is intended.
* NOTE(review): has_contents() compares entry["type"] with ObjectType.CONTENT,
  while every other check in policy.py compares node.object_type with CONTENT.
  Confirm against swh.model which value Directory.entries actually carries.
* NOTE(review): this text was re-flowed from a whitespace-flattened copy. Blank
  context lines and the indentation of context lines were inferred from the
  hunk line counts; verify them against the target tree before applying.

diff --git a/swh/scanner/cli.py b/swh/scanner/cli.py
--- a/swh/scanner/cli.py
+++ b/swh/scanner/cli.py
@@ -134,14 +134,22 @@
 @click.option(
     "-i", "--interactive", is_flag=True, help="Show the result in a dashboard"
 )
+@click.option(
+    "-p",
+    "--policy",
+    default="bfs",
+    show_default=True,
+    type=click.Choice(["bfs", "filepriority", "dirpriority"]),
+    help="The scan policy.",
+)
 @click.pass_context
-def scan(ctx, root_path, api_url, patterns, out_fmt, interactive):
+def scan(ctx, root_path, api_url, patterns, out_fmt, interactive, policy):
     """Scan a source code project to discover files and directories already
     present in the archive"""
     import swh.scanner.scanner as scanner
 
     config = setup_config(ctx, api_url)
-    scanner.scan(config, root_path, patterns, out_fmt, interactive)
+    scanner.scan(config, root_path, patterns, out_fmt, interactive, policy)
 
 
 @scanner.group("db", help="Manage local knowledge base for swh-scanner")
diff --git a/swh/scanner/policy.py b/swh/scanner/policy.py
new file mode 100644
--- /dev/null
+++ b/swh/scanner/policy.py
@@ -0,0 +1,258 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import abc
+import asyncio
+import itertools
+from typing import Dict, List, no_type_check
+
+import aiohttp
+
+from swh.model.from_disk import Directory
+from swh.model.identifiers import CONTENT, DIRECTORY, ObjectType
+
+from .data import MerkleNodeInfo
+from .exceptions import error_response
+
+
+async def swhids_discovery(
+    swhids: List[str], session: aiohttp.ClientSession, api_url: str,
+) -> Dict[str, Dict[str, bool]]:
+    """API Request to get information about the SoftWare Heritage persistent
+    IDentifiers (SWHIDs) given in input.
+
+    Args:
+        swhids: a list of SWHIDS
+        api_url: url for the API request
+
+    Returns:
+        A dictionary with:
+
+        key:
+            SWHID searched
+        value:
+            value['known'] = True if the SWHID is found
+            value['known'] = False if the SWHID is not found
+
+    """
+    endpoint = api_url + "known/"
+    chunk_size = 1000
+    requests = []
+
+    def get_chunk(swhids):
+        for i in range(0, len(swhids), chunk_size):
+            yield swhids[i : i + chunk_size]
+
+    async def make_request(swhids):
+        async with session.post(endpoint, json=swhids) as resp:
+            if resp.status != 200:
+                error_response(resp.reason, resp.status, endpoint)
+
+            return await resp.json()
+
+    if len(swhids) > chunk_size:
+        for swhids_chunk in get_chunk(swhids):
+            requests.append(asyncio.create_task(make_request(swhids_chunk)))
+
+        res = await asyncio.gather(*requests)
+        # concatenate list of dictionaries
+        return dict(itertools.chain.from_iterable(e.items() for e in res))
+    else:
+        return await make_request(swhids)
+
+
+class Policy(metaclass=abc.ABCMeta):
+
+    data: MerkleNodeInfo
+    """information about contents and directories of the merkle tree"""
+
+    source_tree: Directory
+    """representation of a source code project directory in the merkle tree"""
+
+    def __init__(self, source_tree: Directory, data: MerkleNodeInfo):
+        self.data = data
+        self.source_tree = source_tree
+        for node in source_tree.iter_tree():
+            self.data[node.swhid()] = {"known": ""}  # type: ignore
+
+    @abc.abstractmethod
+    async def run(
+        self, session: aiohttp.ClientSession, api_url: str,
+    ):
+        """Scan a source code project"""
+        raise NotImplementedError("Must implement run method")
+
+
+class LazyBFS(Policy):
+    async def run(
+        self, session: aiohttp.ClientSession, api_url: str,
+    ):
+        queue = []
+        queue.append(self.source_tree)
+
+        while queue:
+            swhids = [str(node.swhid()) for node in queue]
+            swhids_res = await swhids_discovery(swhids, session, api_url)
+            for node in queue.copy():
+                queue.remove(node)
+                self.data[node.swhid()]["known"] = swhids_res[str(node.swhid())][
+                    "known"
+                ]
+                if node.object_type == DIRECTORY:
+                    if not self.data[node.swhid()]["known"]:
+                        children = [n[1] for n in list(node.items())]
+                        queue.extend(children)
+                    else:
+                        for sub_node in node.iter_tree():
+                            if sub_node == node:
+                                continue
+                            self.data[sub_node.swhid()]["known"] = True  # type: ignore
+
+
+class FilePriority(Policy):
+    @no_type_check
+    async def run(
+        self, session: aiohttp.ClientSession, api_url: str,
+    ):
+        # get all the files
+        all_contents = list(
+            filter(
+                lambda node: node.object_type == CONTENT, self.source_tree.iter_tree()
+            )
+        )
+        all_contents.reverse()  # check nodes from the deepest
+
+        # query the backend to get all file contents status
+        cnt_swhids = [str(node.swhid()) for node in all_contents]
+        cnt_status_res = await swhids_discovery(cnt_swhids, session, api_url)
+        # set all the file contents status
+        for cnt in all_contents:
+            self.data[cnt.swhid()]["known"] = cnt_status_res[str(cnt.swhid())]["known"]
+            # set all the upstream directories of unknown file contents to unknown
+            if not self.data[cnt.swhid()]["known"]:
+                parent = cnt.parents[0] if cnt.parents else None
+                while parent:
+                    self.data[parent.swhid()]["known"] = False
+                    parents = parent.parents
+                    if parents:
+                        parent = parents[0]
+                    else:
+                        parent = None
+
+        # get all unset directories and check their status
+        # (update children directories accordingly)
+        unset_dirs = list(
+            filter(
+                lambda node: node.object_type == DIRECTORY
+                and self.data[node.swhid()]["known"] == "",
+                self.source_tree.iter_tree(),
+            )
+        )
+
+        # check unset directories
+        for dir_ in unset_dirs:
+            if self.data[dir_.swhid()]["known"] == "":
+                # update directory status
+                dir_status = await swhids_discovery(
+                    [str(dir_.swhid())], session, api_url
+                )
+                dir_known = dir_status[str(dir_.swhid())]["known"]
+                self.data[dir_.swhid()]["known"] = dir_known
+                if dir_known:
+                    sub_dirs = list(
+                        filter(
+                            lambda n: n.object_type == DIRECTORY
+                            and self.data[n.swhid()]["known"] == "",
+                            dir_.iter_tree(),
+                        )
+                    )
+                    for node in sub_dirs:
+                        self.data[node.swhid()]["known"] = True
+
+
+class DirectoryPriority(Policy):
+    @no_type_check
+    async def run(
+        self, session: aiohttp.ClientSession, api_url: str,
+    ):
+        # get all directory contents that have at least one file content
+        unknown_dirs = list(
+            filter(
+                lambda dir_: dir_.object_type == DIRECTORY and self.has_contents(dir_),
+                self.source_tree.iter_tree(),
+            )
+        )
+        unknown_dirs.reverse()
+
+        for dir_ in unknown_dirs:
+            if self.data[dir_.swhid()]["known"] == "":
+                dir_status = await swhids_discovery(
+                    [str(dir_.swhid())], session, api_url
+                )
+                dir_known = dir_status[str(dir_.swhid())]["known"]
+                self.data[dir_.swhid()]["known"] = dir_known
+                # set all the downstream file contents to known
+                if dir_known:
+                    for cnt in self.get_contents(dir_):
+                        self.data[cnt.swhid()]["known"] = True
+                # otherwise set all the upstream directories to unknown
+                else:
+                    parent = dir_.parents[0] if dir_.parents else None
+                    while parent:
+                        self.data[parent.swhid()]["known"] = False
+                        parents = parent.parents
+                        if parents:
+                            parent = parents[0]
+                        else:
+                            parent = None
+
+        # get remaining directories that have no file contents
+        empty_dirs = list(
+            filter(
+                lambda n: n.object_type == DIRECTORY
+                and not self.has_contents(n)
+                and self.data[n.swhid()]["known"] == "",
+                self.source_tree.iter_tree(),
+            )
+        )
+        empty_dirs_swhids = [str(n.swhid()) for n in empty_dirs]
+        empty_dir_status = await swhids_discovery(empty_dirs_swhids, session, api_url)
+
+        # update status of directories that have no file contents
+        for dir_ in empty_dirs:
+            self.data[dir_.swhid()]["known"] = empty_dir_status[str(dir_.swhid())][
+                "known"
+            ]
+
+        # check unknown file contents
+        unknown_cnts = list(
+            filter(
+                lambda n: n.object_type == CONTENT
+                and self.data[n.swhid()]["known"] == "",
+                self.source_tree.iter_tree(),
+            )
+        )
+        unknown_cnts_swhids = [str(n.swhid()) for n in unknown_cnts]
+        unknown_cnts_status = await swhids_discovery(
+            unknown_cnts_swhids, session, api_url
+        )
+
+        for cnt in unknown_cnts:
+            self.data[cnt.swhid()]["known"] = unknown_cnts_status[str(cnt.swhid())][
+                "known"
+            ]
+
+    def has_contents(self, directory: Directory):
+        """Check if the directory given in input has contents"""
+        for entry in directory.entries:
+            if entry["type"] == ObjectType.CONTENT:
+                return True
+        return False
+
+    def get_contents(self, dir_: Directory):
+        """Get all the contents of a given directory"""
+        for _, node in list(dir_.items()):
+            if node.object_type == CONTENT:
+                yield node
diff --git a/swh/scanner/scanner.py b/swh/scanner/scanner.py
--- a/swh/scanner/scanner.py
+++ b/swh/scanner/scanner.py
@@ -4,99 +4,20 @@
 # See top-level LICENSE file for more information
 
 import asyncio
-import itertools
-from typing import Any, Dict, Iterable, List
+from typing import Any, Dict, Iterable
 
 import aiohttp
 
 from swh.model.cli import model_of_dir
 from swh.model.from_disk import Directory
-from swh.model.identifiers import DIRECTORY
 
 from .data import MerkleNodeInfo
-from .exceptions import error_response
 from .output import Output
+from .policy import DirectoryPriority, FilePriority, LazyBFS
 
 
-async def lazy_bfs(
-    source_tree: Directory,
-    data: MerkleNodeInfo,
-    session: aiohttp.ClientSession,
-    api_url: str,
-):
-
-    queue = []
-    queue.append(source_tree)
-
-    while queue:
-        swhids = [str(node.swhid()) for node in queue]
-        swhids_res = await swhids_discovery(swhids, session, api_url)
-        for node in queue.copy():
-            queue.remove(node)
-            data[node.swhid()]["known"] = swhids_res[str(node.swhid())]["known"]
-            if node.object_type == DIRECTORY:
-                if not data[node.swhid()]["known"]:
-                    children = [n[1] for n in list(node.items())]
-                    queue.extend(children)
-                else:
-                    for sub_node in node.iter_tree(dedup=False):
-                        if sub_node == node:
-                            continue
-                        data[sub_node.swhid()]["known"] = True  # type: ignore
-
-
-async def swhids_discovery(
-    swhids: List[str], session: aiohttp.ClientSession, api_url: str,
-) -> Dict[str, Dict[str, bool]]:
-    """API Request to get information about the SoftWare Heritage persistent
-    IDentifiers (SWHIDs) given in input.
-
-    Args:
-        swhids: a list of SWHIDS
-        api_url: url for the API request
-
-    Returns:
-        A dictionary with:
-
-        key:
-            SWHID searched
-        value:
-            value['known'] = True if the SWHID is found
-            value['known'] = False if the SWHID is not found
-
-    """
-    endpoint = api_url + "known/"
-    chunk_size = 1000
-    requests = []
-
-    def get_chunk(swhids):
-        for i in range(0, len(swhids), chunk_size):
-            yield swhids[i : i + chunk_size]
-
-    async def make_request(swhids):
-        async with session.post(endpoint, json=swhids) as resp:
-            if resp.status != 200:
-                error_response(resp.reason, resp.status, endpoint)
-
-            return await resp.json()
-
-    if len(swhids) > chunk_size:
-        for swhids_chunk in get_chunk(swhids):
-            requests.append(asyncio.create_task(make_request(swhids_chunk)))
-
-        res = await asyncio.gather(*requests)
-        # concatenate list of dictionaries
-        return dict(itertools.chain.from_iterable(e.items() for e in res))
-    else:
-        return await make_request(swhids)
-
-
-async def run(
-    config: Dict[str, Any], source_tree: Directory, nodes_data: MerkleNodeInfo
-) -> None:
-    """Start scanning from the given root.
-
-    It fills the source tree with the path discovered.
+async def run(config: Dict[str, Any], policy) -> None:
+    """Scan a given source code according to the policy given in input.
 
     Args:
         root: the root path to scan
@@ -110,11 +31,19 @@
     else:
         headers = {}
 
-    for node in source_tree.iter_tree():
-        nodes_data[node.swhid()] = {}  # type: ignore
-
     async with aiohttp.ClientSession(headers=headers, trust_env=True) as session:
-        await lazy_bfs(source_tree, nodes_data, session, api_url)
+        await policy.run(session, api_url)
+
+
+def get_policy_obj(source_tree: Directory, nodes_data: MerkleNodeInfo, policy: str):
+    if policy == "bfs":
+        return LazyBFS(source_tree, nodes_data)
+    elif policy == "filepriority":
+        return FilePriority(source_tree, nodes_data)
+    elif policy == "dirpriority":
+        return DirectoryPriority(source_tree, nodes_data)
+    else:
+        raise Exception(f"policy '{policy}' not found")
 
 
 def scan(
@@ -123,15 +52,17 @@
     exclude_patterns: Iterable[str],
     out_fmt: str,
     interactive: bool,
+    policy: str,
 ):
     """Scan a source code project to discover files and directories already
     present in the archive"""
     converted_patterns = [pattern.encode() for pattern in exclude_patterns]
     source_tree = model_of_dir(root_path.encode(), converted_patterns)
     nodes_data = MerkleNodeInfo()
+    policy = get_policy_obj(source_tree, nodes_data, policy)
 
     loop = asyncio.get_event_loop()
-    loop.run_until_complete(run(config, source_tree, nodes_data))
+    loop.run_until_complete(run(config, policy))
 
     out = Output(root_path, nodes_data, source_tree)
     if interactive:
diff --git a/swh/scanner/tests/test_scanner.py b/swh/scanner/tests/test_policy.py
copy from swh/scanner/tests/test_scanner.py
copy to swh/scanner/tests/test_policy.py
--- a/swh/scanner/tests/test_scanner.py
+++ b/swh/scanner/tests/test_policy.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2020 The Software Heritage developers
+# Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -8,11 +8,10 @@
 from flask import url_for
 import pytest
 
-from swh.scanner.data import MerkleNodeInfo
 from swh.scanner.exceptions import APIError
-from swh.scanner.scanner import run, swhids_discovery
+from swh.scanner.policy import swhids_discovery
 
-from .data import correct_api_response, unknown_swhids
+from .data import correct_api_response
 
 aio_url = "http://example.org/api/known/"
 
@@ -50,21 +49,3 @@
 
     with pytest.raises(APIError):
         event_loop.run_until_complete(swhids_discovery(request, aiosession, api_url))
-
-
-@pytest.mark.options(debug=False)
-def test_app(app):
-    assert not app.debug
-
-
-def test_scanner_result(live_server, event_loop, source_tree):
-    api_url = url_for("index", _external=True)
-    config = {"web-api": {"url": api_url, "auth-token": None}}
-
-    nodes_data = MerkleNodeInfo()
-    event_loop.run_until_complete(run(config, source_tree, nodes_data))
-    for node in source_tree.iter_tree():
-        if str(node.swhid()) in unknown_swhids:
-            assert nodes_data[node.swhid()]["known"] is False
-        else:
-            assert nodes_data[node.swhid()]["known"] is True
diff --git a/swh/scanner/tests/test_scanner.py b/swh/scanner/tests/test_scanner.py
--- a/swh/scanner/tests/test_scanner.py
+++ b/swh/scanner/tests/test_scanner.py
@@ -1,68 +1,58 @@
-# Copyright (C) 2020 The Software Heritage developers
+# Copyright (C) 2020-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import json
-
 from flask import url_for
 import pytest
 
 from swh.scanner.data import MerkleNodeInfo
-from swh.scanner.exceptions import APIError
-from swh.scanner.scanner import run, swhids_discovery
-
-from .data import correct_api_response, unknown_swhids
-
-aio_url = "http://example.org/api/known/"
-
+from swh.scanner.policy import DirectoryPriority, FilePriority, LazyBFS
+from swh.scanner.scanner import run
 
-def test_scanner_correct_api_request(mock_aioresponse, event_loop, aiosession):
-    mock_aioresponse.post(
-        aio_url,
-        status=200,
-        content_type="application/json",
-        body=json.dumps(correct_api_response),
-    )
+from .data import unknown_swhids
 
-    actual_result = event_loop.run_until_complete(
-        swhids_discovery([], aiosession, "http://example.org/api/")
-    )
-
-    assert correct_api_response == actual_result
 
+@pytest.mark.options(debug=False)
+def test_app(app):
+    assert not app.debug
 
-def test_scanner_raise_apierror(mock_aioresponse, event_loop, aiosession):
-    mock_aioresponse.post(aio_url, content_type="application/json", status=413)
 
-    with pytest.raises(APIError):
-        event_loop.run_until_complete(
-            swhids_discovery([], aiosession, "http://example.org/api/")
-        )
+def test_scanner_result_bfs(live_server, event_loop, source_tree):
+    api_url = url_for("index", _external=True)
+    config = {"web-api": {"url": api_url, "auth-token": None}}
 
+    nodes_data = MerkleNodeInfo()
+    policy = LazyBFS(source_tree, nodes_data)
+    event_loop.run_until_complete(run(config, policy))
+    for node in source_tree.iter_tree():
+        if str(node.swhid()) in unknown_swhids:
+            assert nodes_data[node.swhid()]["known"] is False
+        else:
+            assert nodes_data[node.swhid()]["known"] is True
 
-def test_scanner_raise_apierror_input_size_limit(event_loop, aiosession, live_server):
 
+def test_scanner_result_file_priority(live_server, event_loop, source_tree):
     api_url = url_for("index", _external=True)
-    request = [
-        "swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a" for i in range(901)
-    ]  # /known/ is limited at 900
-
-    with pytest.raises(APIError):
-        event_loop.run_until_complete(swhids_discovery(request, aiosession, api_url))
-
+    config = {"web-api": {"url": api_url, "auth-token": None}}
 
-@pytest.mark.options(debug=False)
-def test_app(app):
-    assert not app.debug
+    nodes_data = MerkleNodeInfo()
+    policy = FilePriority(source_tree, nodes_data)
+    event_loop.run_until_complete(run(config, policy))
+    for node in source_tree.iter_tree():
+        if str(node.swhid()) in unknown_swhids:
+            assert nodes_data[node.swhid()]["known"] is False
+        else:
+            assert nodes_data[node.swhid()]["known"] is True
 
 
-def test_scanner_result(live_server, event_loop, source_tree):
+def test_scanner_result_directory_priority(live_server, event_loop, source_tree):
     api_url = url_for("index", _external=True)
     config = {"web-api": {"url": api_url, "auth-token": None}}
 
     nodes_data = MerkleNodeInfo()
-    event_loop.run_until_complete(run(config, source_tree, nodes_data))
+    policy = DirectoryPriority(source_tree, nodes_data)
+    event_loop.run_until_complete(run(config, policy))
     for node in source_tree.iter_tree():
         if str(node.swhid()) in unknown_swhids:
             assert nodes_data[node.swhid()]["known"] is False