# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Scan policies: strategies deciding which SWHIDs are sent to the backend,
and in which order, to find out what is already known by the archive."""

import abc
import asyncio
import itertools
from typing import Dict, Iterator, List, no_type_check

import aiohttp

from swh.model.from_disk import Directory
from swh.model.identifiers import CONTENT, DIRECTORY

from .data import MerkleNodeInfo
from .exceptions import error_response


async def swhids_discovery(
    swhids: List[str], session: aiohttp.ClientSession, api_url: str,
) -> Dict[str, Dict[str, bool]]:
    """API Request to get information about the SoftWare Heritage persistent
    IDentifiers (SWHIDs) given in input.

    Args:
        swhids: a list of SWHIDS
        session: the aiohttp session used to POST the requests
        api_url: url for the API request

    Returns:
        A dictionary with:

        key:
            SWHID searched
        value:
            value['known'] = True if the SWHID is found
            value['known'] = False if the SWHID is not found

    Note:
        Any response whose status is not 200 is handed to ``error_response``
        (the test-suite expects this to surface as an ``APIError``).

    """
    endpoint = api_url + "known/"
    chunk_size = 1000

    def get_chunk(swhids):
        for i in range(0, len(swhids), chunk_size):
            yield swhids[i : i + chunk_size]

    async def make_request(swhids):
        async with session.post(endpoint, json=swhids) as resp:
            if resp.status != 200:
                error_response(resp.reason, resp.status, endpoint)

            return await resp.json()

    # Small inputs (including the empty list) go out as one single request.
    if len(swhids) <= chunk_size:
        return await make_request(swhids)

    # Larger inputs are split in chunks that are requested concurrently.
    requests = [
        asyncio.create_task(make_request(swhids_chunk))
        for swhids_chunk in get_chunk(swhids)
    ]
    res = await asyncio.gather(*requests)
    # concatenate list of dictionaries
    return dict(itertools.chain.from_iterable(e.items() for e in res))


class Policy(metaclass=abc.ABCMeta):
    """Base class of the scan policies.

    On construction every node of the source tree gets an entry in ``data``
    whose ``"known"`` status is ``None`` (not yet determined); ``run`` is
    responsible for resolving those statuses to ``True`` or ``False``.
    """

    data: MerkleNodeInfo
    """information about contents and directories of the merkle tree"""

    source_tree: Directory
    """representation of a source code project directory in the merkle tree"""

    def __init__(self, source_tree: Directory, data: MerkleNodeInfo):
        self.data = data
        self.source_tree = source_tree
        for node in source_tree.iter_tree():
            self.data[node.swhid()] = {"known": None}  # type: ignore

    @no_type_check
    def _mark_ancestors_unknown(self, node) -> None:
        """Set the status of every ancestor of ``node`` to unknown.

        The chain is followed through the first parent of each node and stops
        at a node without parents (the root). A node that has no parent at all
        is left alone instead of raising ``IndexError``.
        """
        parent = node.parents[0] if node.parents else None
        while parent is not None:
            self.data[parent.swhid()]["known"] = False
            parent = parent.parents[0] if parent.parents else None

    @abc.abstractmethod
    async def run(
        self, session: aiohttp.ClientSession, api_url: str,
    ):
        """Scan a source code project"""
        raise NotImplementedError("Must implement run method")


class LazyBFS(Policy):
    """Breadth-first scan: each level of the tree is sent in one discovery
    call, and only the children of unknown directories are explored further."""

    async def run(
        self, session: aiohttp.ClientSession, api_url: str,
    ):
        queue = [self.source_tree]

        while queue:
            swhids = [str(node.swhid()) for node in queue]
            swhids_res = await swhids_discovery(swhids, session, api_url)

            # Start a fresh queue for the next level rather than removing the
            # nodes one by one from the list being processed (list.remove is
            # linear, which made each level quadratic).
            current_level, queue = queue, []
            for node in current_level:
                known = swhids_res[str(node.swhid())]["known"]
                self.data[node.swhid()]["known"] = known
                if node.object_type != DIRECTORY:
                    continue
                if not known:
                    # unknown directory: its direct children must be checked
                    queue.extend(child for _, child in node.items())
                else:
                    # known directory: everything below it is known as well
                    for sub_node in node.iter_tree():
                        if sub_node == node:
                            continue
                        self.data[sub_node.swhid()]["known"] = True  # type: ignore


class FilePriority(Policy):
    """Check all the file contents first, then only the directories whose
    status could not be deduced from their contents."""

    @no_type_check
    async def run(
        self, session: aiohttp.ClientSession, api_url: str,
    ):
        # get all the files
        all_contents = [
            node
            for node in self.source_tree.iter_tree()
            if node.object_type == CONTENT
        ]
        all_contents.reverse()  # check nodes from the deepest

        # query the backend to get all file contents status
        # (nothing to ask when the tree holds no file at all)
        cnt_swhids = [str(node.swhid()) for node in all_contents]
        cnt_status_res = (
            await swhids_discovery(cnt_swhids, session, api_url) if cnt_swhids else {}
        )
        # set all the file contents status
        for cnt in all_contents:
            cnt_known = cnt_status_res[str(cnt.swhid())]["known"]
            self.data[cnt.swhid()]["known"] = cnt_known
            # set all the upstream directories of unknown file contents to unknown
            if not cnt_known:
                self._mark_ancestors_unknown(cnt)

        # get all unset directories and check their status
        # (update children directories accordingly)
        unset_dirs = [
            node
            for node in self.source_tree.iter_tree()
            if node.object_type == DIRECTORY
            and self.data[node.swhid()]["known"] is None
        ]

        # check unset directories
        for dir_ in unset_dirs:
            # may have been resolved meanwhile through a known parent directory
            if self.data[dir_.swhid()]["known"] is not None:
                continue
            # update directory status
            dir_status = await swhids_discovery([str(dir_.swhid())], session, api_url)
            dir_known = dir_status[str(dir_.swhid())]["known"]
            self.data[dir_.swhid()]["known"] = dir_known
            if dir_known:
                # a known directory implies its unset sub-directories are known
                for node in dir_.iter_tree():
                    if (
                        node.object_type == DIRECTORY
                        and self.data[node.swhid()]["known"] is None
                    ):
                        self.data[node.swhid()]["known"] = True


class DirectoryPriority(Policy):
    """Check the directories holding file contents first (deepest first), then
    the remaining directories, and finally the still undetermined contents."""

    @no_type_check
    async def run(
        self, session: aiohttp.ClientSession, api_url: str,
    ):
        # get all directory contents that have at least one file content
        unknown_dirs = [
            dir_
            for dir_ in self.source_tree.iter_tree()
            if dir_.object_type == DIRECTORY and self.has_contents(dir_)
        ]
        unknown_dirs.reverse()

        for dir_ in unknown_dirs:
            # already marked unknown by one of its descendants
            if self.data[dir_.swhid()]["known"] is not None:
                continue
            dir_status = await swhids_discovery([str(dir_.swhid())], session, api_url)
            dir_known = dir_status[str(dir_.swhid())]["known"]
            self.data[dir_.swhid()]["known"] = dir_known
            if dir_known:
                # set all the downstream file contents to known
                for cnt in self.get_contents(dir_):
                    self.data[cnt.swhid()]["known"] = True
            else:
                # otherwise set all the upstream directories to unknown; the
                # helper copes with the root directory, which has no parent
                # (indexing parents[0] there raised IndexError)
                self._mark_ancestors_unknown(dir_)

        # get remaining directories that have no file contents
        empty_dirs = [
            n
            for n in self.source_tree.iter_tree()
            if n.object_type == DIRECTORY
            and not self.has_contents(n)
            and self.data[n.swhid()]["known"] is None
        ]
        empty_dirs_swhids = [str(n.swhid()) for n in empty_dirs]
        # skip the request when there is nothing to ask
        empty_dir_status = (
            await swhids_discovery(empty_dirs_swhids, session, api_url)
            if empty_dirs_swhids
            else {}
        )

        # update status of directories that have no file contents
        for dir_ in empty_dirs:
            self.data[dir_.swhid()]["known"] = empty_dir_status[str(dir_.swhid())][
                "known"
            ]

        # check unknown file contents
        unknown_cnts = [
            n
            for n in self.source_tree.iter_tree()
            if n.object_type == CONTENT and self.data[n.swhid()]["known"] is None
        ]
        unknown_cnts_swhids = [str(n.swhid()) for n in unknown_cnts]
        unknown_cnts_status = (
            await swhids_discovery(unknown_cnts_swhids, session, api_url)
            if unknown_cnts_swhids
            else {}
        )

        for cnt in unknown_cnts:
            self.data[cnt.swhid()]["known"] = unknown_cnts_status[str(cnt.swhid())][
                "known"
            ]

    def has_contents(self, directory: Directory) -> bool:
        """Check if the directory given in input has contents, i.e. if at
        least one of its entries is of type "file"."""
        return any(entry["type"] == "file" for entry in directory.entries)

    def get_contents(self, dir_: Directory) -> Iterator:
        """Get all the contents of a given directory (only the nodes directly
        listed by ``dir_.items()``, sub-directories are not visited)."""
        for _, node in dir_.items():
            if node.object_type == CONTENT:
                yield node
IDentifiers (SWHIDs) given in input. - - Args: - swhids: a list of SWHIDS - api_url: url for the API request - - Returns: - A dictionary with: - - key: - SWHID searched - value: - value['known'] = True if the SWHID is found - value['known'] = False if the SWHID is not found - - """ - endpoint = api_url + "known/" - chunk_size = 1000 - requests = [] - - def get_chunk(swhids): - for i in range(0, len(swhids), chunk_size): - yield swhids[i : i + chunk_size] - - async def make_request(swhids): - async with session.post(endpoint, json=swhids) as resp: - if resp.status != 200: - error_response(resp.reason, resp.status, endpoint) - - return await resp.json() - - if len(swhids) > chunk_size: - for swhids_chunk in get_chunk(swhids): - requests.append(asyncio.create_task(make_request(swhids_chunk))) - - res = await asyncio.gather(*requests) - # concatenate list of dictionaries - return dict(itertools.chain.from_iterable(e.items() for e in res)) - else: - return await make_request(swhids) - - -async def run( - config: Dict[str, Any], source_tree: Directory, nodes_data: MerkleNodeInfo -) -> None: - """Start scanning from the given root. - - It fills the source tree with the path discovered. +async def run(config: Dict[str, Any], policy) -> None: + """Scan a given source code according to the policy given in input. 
Args: root: the root path to scan @@ -110,11 +31,19 @@ else: headers = {} - for node in source_tree.iter_tree(): - nodes_data[node.swhid()] = {} # type: ignore - async with aiohttp.ClientSession(headers=headers, trust_env=True) as session: - await lazy_bfs(source_tree, nodes_data, session, api_url) + await policy.run(session, api_url) + + +def get_policy_obj(source_tree: Directory, nodes_data: MerkleNodeInfo, policy: str): + if policy == "bfs": + return LazyBFS(source_tree, nodes_data) + elif policy == "filepriority": + return FilePriority(source_tree, nodes_data) + elif policy == "dirpriority": + return DirectoryPriority(source_tree, nodes_data) + else: + raise Exception(f"policy '{policy}' not found") def scan( @@ -123,15 +52,17 @@ exclude_patterns: Iterable[str], out_fmt: str, interactive: bool, + policy: str, ): """Scan a source code project to discover files and directories already present in the archive""" converted_patterns = [pattern.encode() for pattern in exclude_patterns] source_tree = model_of_dir(root_path.encode(), converted_patterns) nodes_data = MerkleNodeInfo() + policy = get_policy_obj(source_tree, nodes_data, policy) loop = asyncio.get_event_loop() - loop.run_until_complete(run(config, source_tree, nodes_data)) + loop.run_until_complete(run(config, policy)) out = Output(root_path, nodes_data, source_tree) if interactive: diff --git a/swh/scanner/tests/conftest.py b/swh/scanner/tests/conftest.py --- a/swh/scanner/tests/conftest.py +++ b/swh/scanner/tests/conftest.py @@ -103,7 +103,13 @@ @pytest.fixture(scope="session") -def app(): +def tmp_requests(tmpdir_factory): + requests_file = tmpdir_factory.mktemp("data").join("requests.json") + return requests_file + + +@pytest.fixture(scope="session") +def app(tmp_requests): """Flask backend API (used by live_server).""" - app = create_app() + app = create_app(tmp_requests) return app diff --git a/swh/scanner/tests/flask_api.py b/swh/scanner/tests/flask_api.py --- a/swh/scanner/tests/flask_api.py +++ 
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import json

from flask import url_for
import pytest

from swh.scanner.data import MerkleNodeInfo
from swh.scanner.exceptions import APIError
from swh.scanner.policy import (
    DirectoryPriority,
    FilePriority,
    LazyBFS,
    swhids_discovery,
)

from .data import correct_api_response

aio_url = "http://example.org/api/known/"


def test_scanner_correct_api_request(mock_aioresponse, event_loop, aiosession):
    """A 200 response is returned to the caller as decoded JSON."""
    mock_aioresponse.post(
        aio_url,
        status=200,
        content_type="application/json",
        body=json.dumps(correct_api_response),
    )

    actual_result = event_loop.run_until_complete(
        swhids_discovery([], aiosession, "http://example.org/api/")
    )

    assert correct_api_response == actual_result


def test_scanner_raise_apierror(mock_aioresponse, event_loop, aiosession):
    """A non-200 response is reported as an APIError."""
    mock_aioresponse.post(aio_url, content_type="application/json", status=413)

    with pytest.raises(APIError):
        event_loop.run_until_complete(
            swhids_discovery([], aiosession, "http://example.org/api/")
        )


def test_scanner_raise_apierror_input_size_limit(event_loop, aiosession, live_server):
    """A payload over the test backend limit is rejected with an APIError."""
    api_url = url_for("index", _external=True)
    request = [
        "swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a" for i in range(901)
    ]  # /known/ is limited at 900

    with pytest.raises(APIError):
        event_loop.run_until_complete(swhids_discovery(request, aiosession, api_url))


def test_scanner_directory_priority_has_contents(source_tree):
    nodes_data = MerkleNodeInfo()
    policy = DirectoryPriority(source_tree, nodes_data)
    assert policy.has_contents(source_tree[b"/bar/barfoo"])


def assertNodeStatus(nodes_data, node, status):
    """Assert that the status recorded for ``node`` equals ``status``.

    The policies store the status under the "known" key; the previous lookup
    of a "status" key could only fail with a KeyError.
    """
    assert nodes_data[node.swhid()]["known"] == status


def reset_backend_requests(tmp_requests):
    """Empty the file in which the test backend logs the SWHIDs it receives."""
    with open(tmp_requests, "w"):
        pass


def get_backend_swhids_order(tmp_requests):
    """Return the SWHIDs logged by the test backend, in reception order."""
    with open(tmp_requests, "r") as f:
        return [line.strip() for line in f]


def test_lazybfs_policy(live_server, aiosession, event_loop, source_tree, tmp_requests):
    reset_backend_requests(tmp_requests)
    api_url = url_for("index", _external=True)

    nodes_data = MerkleNodeInfo()
    policy = LazyBFS(source_tree, nodes_data)
    event_loop.run_until_complete(policy.run(aiosession, api_url))

    expected_order = [
        "swh:1:dir:0a7b61ef5780b03aa274d11069564980246445ce",
        "swh:1:cnt:7d5c08111e21c8a9f71540939998551683375fad",
        "swh:1:cnt:68769579c3eaadbe555379b9c3538e6628bae1eb",
        "swh:1:dir:1bfb212e59d0c6d569a94557e3ad9acce73f8357",
        "swh:1:dir:07d4d9ec5c406632d203dbd4631e7863612a0326",
        "swh:1:cnt:19102815663d23f8b75a47e7a01965dcdc96468c",
        "swh:1:dir:2b41c40f0d1fbffcba12497db71fba83fcca96e5",
        "swh:1:cnt:5f1cfce26640056bed3710cfaf3062a6a326a119",
    ]

    assert get_backend_swhids_order(tmp_requests) == expected_order


def test_directory_priority_policy(
    live_server, aiosession, event_loop, source_tree, tmp_requests
):
    reset_backend_requests(tmp_requests)
    api_url = url_for("index", _external=True)

    nodes_data = MerkleNodeInfo()
    policy = DirectoryPriority(source_tree, nodes_data)
    event_loop.run_until_complete(policy.run(aiosession, api_url))

    expected_order = [
        "swh:1:dir:2b41c40f0d1fbffcba12497db71fba83fcca96e5",
        "swh:1:dir:07d4d9ec5c406632d203dbd4631e7863612a0326",
        "swh:1:dir:9619a28687b2462efbb5be816bc1185b95753d93",
        "swh:1:dir:c3020f6bf135a38c6df3afeb5fb38232c5e07087",
        "swh:1:dir:1bfb212e59d0c6d569a94557e3ad9acce73f8357",
        "swh:1:cnt:7d5c08111e21c8a9f71540939998551683375fad",
        "swh:1:cnt:68769579c3eaadbe555379b9c3538e6628bae1eb",
        "swh:1:cnt:5f1cfce26640056bed3710cfaf3062a6a326a119",
        "swh:1:cnt:19102815663d23f8b75a47e7a01965dcdc96468c",
    ]

    assert get_backend_swhids_order(tmp_requests) == expected_order


def test_file_priority_policy(
    live_server, aiosession, event_loop, source_tree, tmp_requests
):
    reset_backend_requests(tmp_requests)
    api_url = url_for("index", _external=True)

    nodes_data = MerkleNodeInfo()
    policy = FilePriority(source_tree, nodes_data)
    event_loop.run_until_complete(policy.run(aiosession, api_url))

    expected_order = [
        "swh:1:cnt:acac326ddd63b0bc70840659d4ac43619484e69f",
        "swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a",
        "swh:1:cnt:8185dfb2c0c2c597d16f75a8a0c37668567c3d7e",
        "swh:1:cnt:19102815663d23f8b75a47e7a01965dcdc96468c",
        "swh:1:cnt:5f1cfce26640056bed3710cfaf3062a6a326a119",
        "swh:1:cnt:620d4582bfbf773ef15f9b52ac434906a3cdf9c3",
        "swh:1:cnt:133693b125bad2b4ac318535b84901ebb1f6b638",
        "swh:1:cnt:68769579c3eaadbe555379b9c3538e6628bae1eb",
        "swh:1:cnt:7d5c08111e21c8a9f71540939998551683375fad",
        "swh:1:dir:1bfb212e59d0c6d569a94557e3ad9acce73f8357",
        "swh:1:dir:2b41c40f0d1fbffcba12497db71fba83fcca96e5",
    ]

    assert get_backend_swhids_order(tmp_requests) == expected_order
See top-level LICENSE file for more information -import json - from flask import url_for import pytest from swh.scanner.data import MerkleNodeInfo -from swh.scanner.exceptions import APIError -from swh.scanner.scanner import run, swhids_discovery - -from .data import correct_api_response, unknown_swhids - -aio_url = "http://example.org/api/known/" - +from swh.scanner.policy import DirectoryPriority, FilePriority, LazyBFS +from swh.scanner.scanner import run -def test_scanner_correct_api_request(mock_aioresponse, event_loop, aiosession): - mock_aioresponse.post( - aio_url, - status=200, - content_type="application/json", - body=json.dumps(correct_api_response), - ) +from .data import unknown_swhids - actual_result = event_loop.run_until_complete( - swhids_discovery([], aiosession, "http://example.org/api/") - ) - - assert correct_api_response == actual_result +@pytest.mark.options(debug=False) +def test_app(app): + assert not app.debug -def test_scanner_raise_apierror(mock_aioresponse, event_loop, aiosession): - mock_aioresponse.post(aio_url, content_type="application/json", status=413) - with pytest.raises(APIError): - event_loop.run_until_complete( - swhids_discovery([], aiosession, "http://example.org/api/") - ) +def test_scanner_result_bfs(live_server, event_loop, source_tree): + api_url = url_for("index", _external=True) + config = {"web-api": {"url": api_url, "auth-token": None}} + nodes_data = MerkleNodeInfo() + policy = LazyBFS(source_tree, nodes_data) + event_loop.run_until_complete(run(config, policy)) + for node in source_tree.iter_tree(): + if str(node.swhid()) in unknown_swhids: + assert nodes_data[node.swhid()]["known"] is False + else: + assert nodes_data[node.swhid()]["known"] is True -def test_scanner_raise_apierror_input_size_limit(event_loop, aiosession, live_server): +def test_scanner_result_file_priority(live_server, event_loop, source_tree): api_url = url_for("index", _external=True) - request = [ - 
"swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a" for i in range(901) - ] # /known/ is limited at 900 - - with pytest.raises(APIError): - event_loop.run_until_complete(swhids_discovery(request, aiosession, api_url)) - + config = {"web-api": {"url": api_url, "auth-token": None}} -@pytest.mark.options(debug=False) -def test_app(app): - assert not app.debug + nodes_data = MerkleNodeInfo() + policy = FilePriority(source_tree, nodes_data) + event_loop.run_until_complete(run(config, policy)) + for node in source_tree.iter_tree(): + if str(node.swhid()) in unknown_swhids: + assert nodes_data[node.swhid()]["known"] is False + else: + assert nodes_data[node.swhid()]["known"] is True -def test_scanner_result(live_server, event_loop, source_tree): +def test_scanner_result_directory_priority(live_server, event_loop, source_tree): api_url = url_for("index", _external=True) config = {"web-api": {"url": api_url, "auth-token": None}} nodes_data = MerkleNodeInfo() - event_loop.run_until_complete(run(config, source_tree, nodes_data)) + policy = DirectoryPriority(source_tree, nodes_data) + event_loop.run_until_complete(run(config, policy)) for node in source_tree.iter_tree(): if str(node.swhid()) in unknown_swhids: assert nodes_data[node.swhid()]["known"] is False