diff --git a/swh/scanner/cli.py b/swh/scanner/cli.py --- a/swh/scanner/cli.py +++ b/swh/scanner/cli.py @@ -139,7 +139,7 @@ "--policy", default="auto", show_default=True, - type=click.Choice(["auto", "bfs", "filepriority", "dirpriority"]), + type=click.Choice(["auto", "bfs", "greedybfs", "filepriority", "dirpriority"]), help="The scan policy.", ) @click.pass_context @@ -155,6 +155,9 @@ bfs: scan the source code in the BFS order, checking unknown directories only. + greedybfs: same as "bfs" policy, but look up the status of source code artifacts in + chunks, in order to minimize the number of Web API round-trips with the archive. + filepriority: scan all the source code file contents, checking only unset directories. (useful if the codebase contains a lot of source files) diff --git a/swh/scanner/policy.py b/swh/scanner/policy.py --- a/swh/scanner/policy.py +++ b/swh/scanner/policy.py @@ -10,6 +10,7 @@ import aiohttp +from swh.core.utils import grouper from swh.model.from_disk import Directory from swh.model.identifiers import CONTENT, DIRECTORY, CoreSWHID @@ -67,6 +68,12 @@ return await make_request(swhids) +def source_size(source_tree: Directory): + """Return the size of a source tree as the number of nodes it contains + """ + return sum(1 for n in source_tree.iter_tree(dedup=False)) + + class Policy(metaclass=abc.ABCMeta): data: MerkleNodeInfo @@ -120,6 +127,48 @@ self.data[sub_node.swhid()]["known"] = True # type: ignore +class GreedyBFS(Policy): + """Query graph nodes in chunks (to maximize the Web API rate limit use) and set the + downstream contents of known directories to known. 
+ """ + + async def run( + self, session: aiohttp.ClientSession, api_url: str, + ): + ssize = source_size(self.source_tree) + seen = [] + + async for nodes_chunk in self.get_nodes_chunks(session, api_url, ssize): + for node in nodes_chunk: + seen.append(node) + if len(seen) == ssize: + return + if node.object_type == DIRECTORY and self.data[node.swhid()]["known"]: + sub_nodes = [n for n in node.iter_tree(dedup=False)] + sub_nodes.remove(node) # remove root node + for sub_node in sub_nodes: + seen.append(sub_node) + self.data[sub_node.swhid()]["known"] = True + + @no_type_check + async def get_nodes_chunks( + self, session: aiohttp.ClientSession, api_url: str, ssize: int + ): + """Query chunks of QUERY_LIMIT nodes at once in order to fill the Web API + rate limit. It queries all the nodes in the case the source code contains + fewer than QUERY_LIMIT nodes. + """ + nodes = self.source_tree.iter_tree(dedup=False) + for nodes_chunk in grouper(nodes, QUERY_LIMIT): + nodes_chunk = [n for n in nodes_chunk] + swhids = [node.swhid() for node in nodes_chunk] + swhids_res = await swhids_discovery(swhids, session, api_url) + for node in nodes_chunk: + swhid = node.swhid() + self.data[swhid]["known"] = swhids_res[str(swhid)]["known"] + yield nodes_chunk + + class FilePriority(Policy): """Check the Merkle tree querying all the file contents and set all the upstream directories to unknown in the case a file content is unknown. 
diff --git a/swh/scanner/scanner.py b/swh/scanner/scanner.py --- a/swh/scanner/scanner.py +++ b/swh/scanner/scanner.py @@ -13,7 +13,15 @@ from .data import MerkleNodeInfo from .output import Output -from .policy import QUERY_LIMIT, DirectoryPriority, FilePriority, LazyBFS, QueryAll +from .policy import ( + QUERY_LIMIT, + DirectoryPriority, + FilePriority, + GreedyBFS, + LazyBFS, + QueryAll, + source_size, +) async def run(config: Dict[str, Any], policy) -> None: @@ -35,10 +43,6 @@ await policy.run(session, api_url) -def source_size(source_tree: Directory): - return len([n for n in source_tree.iter_tree(dedup=False)]) - - def get_policy_obj(source_tree: Directory, nodes_data: MerkleNodeInfo, policy: str): if policy == "auto": return ( @@ -48,6 +52,8 @@ ) elif policy == "bfs": return LazyBFS(source_tree, nodes_data) + elif policy == "greedybfs": + return GreedyBFS(source_tree, nodes_data) elif policy == "filepriority": return FilePriority(source_tree, nodes_data) elif policy == "dirpriority": diff --git a/swh/scanner/tests/conftest.py b/swh/scanner/tests/conftest.py --- a/swh/scanner/tests/conftest.py +++ b/swh/scanner/tests/conftest.py @@ -7,6 +7,7 @@ import os from pathlib import Path import shutil +import sys import aiohttp from aioresponses import aioresponses # type: ignore @@ -78,13 +79,17 @@ """Generate a model.from_disk.Directory from a "big" temporary directory (more than 1000 nodes) """ + # workaround to avoid a RecursionError that could be generated while creating + # a large number of directories + sys.setrecursionlimit(1100) dir_ = tmp_path / "big-directory" - dir_.mkdir() + sub_dirs = dir_ for i in range(0, QUERY_LIMIT + 1): - file_ = dir_ / f"file_{i}.org" - file_.touch() + sub_dirs = sub_dirs / "dir" + sub_dirs.mkdir(parents=True, exist_ok=True) + file_ = sub_dirs / "file.org" + file_.touch() dir_obj = model_of_dir(str(dir_).encode()) - assert len(dir_obj) > QUERY_LIMIT return dir_obj diff --git a/swh/scanner/tests/flask_api.py 
b/swh/scanner/tests/flask_api.py --- a/swh/scanner/tests/flask_api.py +++ b/swh/scanner/tests/flask_api.py @@ -6,6 +6,7 @@ from flask import Flask, request from swh.scanner.exceptions import LargePayloadExc +from swh.scanner.policy import QUERY_LIMIT from .data import unknown_swhids @@ -24,9 +25,10 @@ for swhid in swhids: f.write(swhid + "\n") - if len(swhids) > 900: + if len(swhids) > QUERY_LIMIT: raise LargePayloadExc( - "The maximum number of SWHIDs this endpoint can receive is 900" + f"The maximum number of SWHIDs this endpoint can receive is " + f"{QUERY_LIMIT}" ) res = {swhid: {"known": False} for swhid in swhids} diff --git a/swh/scanner/tests/test_policy.py b/swh/scanner/tests/test_policy.py --- a/swh/scanner/tests/test_policy.py +++ b/swh/scanner/tests/test_policy.py @@ -8,13 +8,15 @@ from flask import url_for import pytest -from swh.model.identifiers import CoreSWHID, ObjectType +from swh.model.identifiers import CONTENT, CoreSWHID, ObjectType from swh.scanner.data import MerkleNodeInfo from swh.scanner.exceptions import APIError from swh.scanner.policy import ( DirectoryPriority, FilePriority, + GreedyBFS, LazyBFS, + source_size, swhids_discovery, ) @@ -47,17 +49,6 @@ ) -def test_scanner_raise_apierror_input_size_limit(event_loop, aiosession, live_server): - - api_url = url_for("index", _external=True) - request = [ - "swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a" for i in range(901) - ] # /known/ is limited at 900 - - with pytest.raises(APIError): - event_loop.run_until_complete(swhids_discovery(request, aiosession, api_url)) - - def test_scanner_directory_priority_has_contents(source_tree): nodes_data = MerkleNodeInfo() policy = DirectoryPriority(source_tree, nodes_data) @@ -143,3 +134,35 @@ for swhid in backend_swhids_requests[5:]: assert CoreSWHID.from_string(swhid).object_type == ObjectType.DIRECTORY + + +def test_greedy_bfs_policy( + live_server, event_loop, aiosession, big_source_tree, tmp_requests +): + open(tmp_requests, "w").close() + 
api_url = url_for("index", _external=True) + + nodes_data = MerkleNodeInfo() + policy = GreedyBFS(big_source_tree, nodes_data) + event_loop.run_until_complete(policy.run(aiosession, api_url)) + + backend_swhids_requests = get_backend_swhids_order(tmp_requests) + + last_swhid = backend_swhids_requests[-1] + assert CoreSWHID.from_string(last_swhid).object_type == ObjectType.CONTENT + + +@pytest.mark.asyncio +async def test_greedy_bfs_get_nodes_chunks(live_server, aiosession, big_source_tree): + api_url = url_for("index", _external=True) + + nodes_data = MerkleNodeInfo() + policy = GreedyBFS(big_source_tree, nodes_data) + chunks = [ + n_chunk + async for n_chunk in policy.get_nodes_chunks( + aiosession, api_url, source_size(big_source_tree) + ) + ] + assert len(chunks) == 2 + assert chunks[1][-1].object_type == CONTENT