diff --git a/swh/scanner/backend.py b/swh/scanner/backend.py --- a/swh/scanner/backend.py +++ b/swh/scanner/backend.py @@ -7,8 +7,7 @@ from .db import Db from .exceptions import LargePayloadExc - -LIMIT = 1000 +from .policy import QUERY_LIMIT def create_app(db: Db): @@ -20,9 +19,10 @@ def known(): swhids = request.get_json() - if len(swhids) > LIMIT: + if len(swhids) > QUERY_LIMIT: raise LargePayloadExc( - f"The maximum number of SWHIDs this endpoint can receive is {LIMIT}" + f"The maximum number of SWHIDs this endpoint can receive is " + f"{QUERY_LIMIT}" ) cur = db.conn.cursor() diff --git a/swh/scanner/cli.py b/swh/scanner/cli.py --- a/swh/scanner/cli.py +++ b/swh/scanner/cli.py @@ -137,9 +137,9 @@ @click.option( "-p", "--policy", - default="bfs", + default="auto", show_default=True, - type=click.Choice(["bfs", "filepriority", "dirpriority"]), + type=click.Choice(["auto", "bfs", "filepriority", "dirpriority"]), help="The scan policy.", ) @click.pass_context diff --git a/swh/scanner/policy.py b/swh/scanner/policy.py --- a/swh/scanner/policy.py +++ b/swh/scanner/policy.py @@ -16,6 +16,10 @@ from .data import MerkleNodeInfo from .exceptions import error_response +# Maximum number of SWHIDs that can be requested by a single call to the +# Web API endpoint /known/ +QUERY_LIMIT = 1000 + async def swhids_discovery( swhids: List[str], session: aiohttp.ClientSession, api_url: str, @@ -38,12 +42,11 @@ """ endpoint = api_url + "known/" - chunk_size = 1000 requests = [] def get_chunk(swhids): - for i in range(0, len(swhids), chunk_size): - yield swhids[i : i + chunk_size] + for i in range(0, len(swhids), QUERY_LIMIT): + yield swhids[i : i + QUERY_LIMIT] async def make_request(swhids): async with session.post(endpoint, json=swhids) as resp: @@ -52,7 +55,7 @@ return await resp.json() - if len(swhids) > chunk_size: + if len(swhids) > QUERY_LIMIT: for swhids_chunk in get_chunk(swhids): requests.append(asyncio.create_task(make_request(swhids_chunk))) @@ -86,6 +89,11 @@ class 
LazyBFS(Policy): + """Read nodes in the Merkle tree using the BFS algorithm. + Look up only directories that are unknown, otherwise set all the downstream + contents to known. + """ + async def run( self, session: aiohttp.ClientSession, api_url: str, ): @@ -112,6 +120,12 @@ class FilePriority(Policy): + """Check the Merkle tree querying all the file contents and set all the upstream + directories to unknown in the case a file content is unknown. + Finally check all the directories whose status is still unknown and set all the + sub-directories of known directories to known. + """ + @no_type_check async def run( self, session: aiohttp.ClientSession, api_url: str, @@ -169,6 +183,13 @@ class DirectoryPriority(Policy): + """Check the Merkle tree querying all the directories that have at least one file + content and set all the upstream directories to unknown in the case a directory + is unknown, otherwise set all the downstream contents to known. + Finally check the status of empty directories and all the remaining file + contents. + """ + @no_type_check async def run( self, session: aiohttp.ClientSession, api_url: str, @@ -248,3 +269,18 @@ for _, node in list(dir_.items()): if node.object_type == CONTENT: yield node + + +class QueryAll(Policy): + """Check the status of every node in the Merkle tree. 
+ """ + + @no_type_check + async def run( + self, session: aiohttp.ClientSession, api_url: str, + ): + all_nodes = [node for node in self.source_tree.iter_tree()] + all_swhids = [str(node.swhid()) for node in all_nodes] + swhids_res = await swhids_discovery(all_swhids, session, api_url) + for node in all_nodes: + self.data[node.swhid()]["known"] = swhids_res[str(node.swhid())]["known"] diff --git a/swh/scanner/scanner.py b/swh/scanner/scanner.py --- a/swh/scanner/scanner.py +++ b/swh/scanner/scanner.py @@ -13,7 +13,7 @@ from .data import MerkleNodeInfo from .output import Output -from .policy import DirectoryPriority, FilePriority, LazyBFS +from .policy import QUERY_LIMIT, DirectoryPriority, FilePriority, LazyBFS, QueryAll async def run(config: Dict[str, Any], policy) -> None: @@ -35,8 +35,18 @@ await policy.run(session, api_url) +def source_size(source_tree: Directory): + return len([n for n in source_tree.iter_tree(dedup=False)]) + + def get_policy_obj(source_tree: Directory, nodes_data: MerkleNodeInfo, policy: str): - if policy == "bfs": + if policy == "auto": + return ( + QueryAll(source_tree, nodes_data) + if source_size(source_tree) <= QUERY_LIMIT + else LazyBFS(source_tree, nodes_data) + ) + elif policy == "bfs": return LazyBFS(source_tree, nodes_data) elif policy == "filepriority": return FilePriority(source_tree, nodes_data) diff --git a/swh/scanner/tests/conftest.py b/swh/scanner/tests/conftest.py --- a/swh/scanner/tests/conftest.py +++ b/swh/scanner/tests/conftest.py @@ -14,6 +14,7 @@ from swh.model.cli import model_of_dir from swh.scanner.data import MerkleNodeInfo +from swh.scanner.policy import QUERY_LIMIT from .data import present_swhids from .flask_api import create_app @@ -72,6 +73,21 @@ return model_of_dir(str(test_sample_folder).encode()) +@pytest.fixture(scope="function") +def big_source_tree(tmp_path): + """Generate a model.from_disk.Directory from a "big" temporary directory + (more than 1000 nodes) + """ + dir_ = tmp_path / "big-directory" + 
dir_.mkdir() + for i in range(0, QUERY_LIMIT + 1): + file_ = dir_ / f"file_{i}.org" + file_.touch() + dir_obj = model_of_dir(str(dir_).encode()) + assert len(dir_obj) > QUERY_LIMIT + return dir_obj + + @pytest.fixture(scope="function") def source_tree_policy(test_sample_folder_policy): """Generate a model.from_disk.Directory object from the test sample diff --git a/swh/scanner/tests/test_backend.py b/swh/scanner/tests/test_backend.py --- a/swh/scanner/tests/test_backend.py +++ b/swh/scanner/tests/test_backend.py @@ -3,8 +3,9 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.scanner.backend import LIMIT, create_app +from swh.scanner.backend import create_app from swh.scanner.db import Db +from swh.scanner.policy import QUERY_LIMIT from .data import present_swhids @@ -13,7 +14,7 @@ tmp_dbfile = tmp_path / "tmp_db.sqlite" db = Db(tmp_dbfile) cur = db.conn.cursor() - db.create_from(test_swhids_sample, LIMIT, cur) + db.create_from(test_swhids_sample, QUERY_LIMIT, cur) app = create_app(db) @@ -31,7 +32,7 @@ db = Db(tmp_dbfile) cur = db.conn.cursor() - db.create_from(test_swhids_sample, LIMIT, cur) + db.create_from(test_swhids_sample, QUERY_LIMIT, cur) app = create_app(db) @@ -52,7 +53,7 @@ swhids = [swhid for n in range(1001)] db = Db(tmp_dbfile) cur = db.conn.cursor() - db.create_from(test_swhids_sample, LIMIT, cur) + db.create_from(test_swhids_sample, QUERY_LIMIT, cur) app = create_app(db) diff --git a/swh/scanner/tests/test_db.py b/swh/scanner/tests/test_db.py --- a/swh/scanner/tests/test_db.py +++ b/swh/scanner/tests/test_db.py @@ -4,18 +4,17 @@ # See top-level LICENSE file for more information from swh.scanner.db import Db +from swh.scanner.policy import QUERY_LIMIT from .data import present_swhids -CHUNK_SIZE = 1000 - def test_db_create_from(tmp_path, test_swhids_sample): tmp_dbfile = tmp_path / "tmp_db.sqlite" db = Db(tmp_dbfile) cur = db.conn.cursor() - 
db.create_from(test_swhids_sample, CHUNK_SIZE, cur) + db.create_from(test_swhids_sample, QUERY_LIMIT, cur) for swhid in present_swhids: cur = db.conn.cursor() @@ -30,7 +29,7 @@ db = Db(tmp_dbfile) cur = db.conn.cursor() - db.create_from(test_swhids_sample, CHUNK_SIZE, cur) + db.create_from(test_swhids_sample, QUERY_LIMIT, cur) for swhid in swhids: cur = db.conn.cursor() diff --git a/swh/scanner/tests/test_scanner.py b/swh/scanner/tests/test_scanner.py --- a/swh/scanner/tests/test_scanner.py +++ b/swh/scanner/tests/test_scanner.py @@ -7,8 +7,8 @@ import pytest from swh.scanner.data import MerkleNodeInfo -from swh.scanner.policy import DirectoryPriority, FilePriority, LazyBFS -from swh.scanner.scanner import run +from swh.scanner.policy import DirectoryPriority, FilePriority, LazyBFS, QueryAll +from swh.scanner.scanner import get_policy_obj, run from .data import unknown_swhids @@ -18,6 +18,16 @@ assert not app.debug +def test_get_policy_obj_auto(source_tree, nodes_data): + assert isinstance(get_policy_obj(source_tree, nodes_data, "auto"), QueryAll) + + +def test_get_policy_obj_bfs(big_source_tree, nodes_data): + # check that the policy object is the LazyBFS if the source tree contains more than + # 1000 nodes + assert isinstance(get_policy_obj(big_source_tree, nodes_data, "auto"), LazyBFS) + + def test_scanner_result_bfs(live_server, event_loop, source_tree): api_url = url_for("index", _external=True) config = {"web-api": {"url": api_url, "auth-token": None}} @@ -58,3 +68,17 @@ assert nodes_data[node.swhid()]["known"] is False else: assert nodes_data[node.swhid()]["known"] is True + + +def test_scanner_result_query_all(live_server, event_loop, source_tree): + api_url = url_for("index", _external=True) + config = {"web-api": {"url": api_url, "auth-token": None}} + + nodes_data = MerkleNodeInfo() + policy = QueryAll(source_tree, nodes_data) + event_loop.run_until_complete(run(config, policy)) + for node in source_tree.iter_tree(): + if str(node.swhid()) in 
unknown_swhids: + assert nodes_data[node.swhid()]["known"] is False + else: + assert nodes_data[node.swhid()]["known"] is True