D6065.diff

diff --git a/swh/scanner/cli.py b/swh/scanner/cli.py
--- a/swh/scanner/cli.py
+++ b/swh/scanner/cli.py
@@ -139,7 +139,7 @@
"--policy",
default="auto",
show_default=True,
- type=click.Choice(["auto", "bfs", "filepriority", "dirpriority"]),
+ type=click.Choice(["auto", "bfs", "greedybfs", "filepriority", "dirpriority"]),
help="The scan policy.",
)
@click.pass_context
@@ -155,6 +155,9 @@
bfs: scan the source code in the BFS order, checking unknown directories only.
+ greedybfs: same as "bfs" policy, but looks up the status of source code artifacts in
+ chunks, to minimize the number of Web API round trips to the archive.
+
filepriority: scan all the source code file contents, checking only unset
directories. (useful if the codebase contains a lot of source files)
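
The point of the new policy is to cut the number of /known/ requests for large source trees by batching SWHIDs. A rough, hedged illustration of the arithmetic (the chunk size below is an assumed example value, not necessarily the real QUERY_LIMIT constant):

# Hypothetical numbers only: shows why chunked lookups need far fewer round trips.
import math

chunk_size = 1000        # assumed example value standing in for QUERY_LIMIT
nodes_in_tree = 2500     # imaginary source tree size
requests_needed = math.ceil(nodes_in_tree / chunk_size)
print(requests_needed)   # 3 round trips cover the whole tree
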
diff --git a/swh/scanner/policy.py b/swh/scanner/policy.py
--- a/swh/scanner/policy.py
+++ b/swh/scanner/policy.py
@@ -10,6 +10,7 @@
import aiohttp
+from swh.core.utils import grouper
from swh.model.from_disk import Directory
from swh.model.identifiers import CONTENT, DIRECTORY, CoreSWHID
@@ -67,6 +68,12 @@
return await make_request(swhids)
+def source_size(source_tree: Directory):
+ """return the size of a source tree as the number of nodes it contains
+ """
+ return sum(1 for n in source_tree.iter_tree(dedup=False))
+
+
class Policy(metaclass=abc.ABCMeta):
data: MerkleNodeInfo
@@ -120,6 +127,48 @@
self.data[sub_node.swhid()]["known"] = True # type: ignore
+class GreedyBFS(Policy):
+ """Query graph nodes in chunks (to maximize the Web API rate limit use) and set the
+ downstream contents of known directories to known.
+ """
+
+ async def run(
+ self, session: aiohttp.ClientSession, api_url: str,
+ ):
+ ssize = source_size(self.source_tree)
+ seen = []
+
+ async for nodes_chunk in self.get_nodes_chunks(session, api_url, ssize):
+ for node in nodes_chunk:
+ seen.append(node)
+ if len(seen) == ssize:
+ return
+ if node.object_type == DIRECTORY and self.data[node.swhid()]["known"]:
+ sub_nodes = [n for n in node.iter_tree(dedup=False)]
+ sub_nodes.remove(node) # remove root node
+ for sub_node in sub_nodes:
+ seen.append(sub_node)
+ self.data[sub_node.swhid()]["known"] = True
+
+ @no_type_check
+ async def get_nodes_chunks(
+ self, session: aiohttp.ClientSession, api_url: str, ssize: int
+ ):
+ """Query chunks of QUERY_LIMIT nodes at once in order to fill the Web API
+ rate limit. It query all the nodes in the case the source code contains
+ less than QUERY_LIMIT nodes.
+ """
+ nodes = self.source_tree.iter_tree(dedup=False)
+ for nodes_chunk in grouper(nodes, QUERY_LIMIT):
+ nodes_chunk = [n for n in nodes_chunk]
+ swhids = [node.swhid() for node in nodes_chunk]
+ swhids_res = await swhids_discovery(swhids, session, api_url)
+ for node in nodes_chunk:
+ swhid = node.swhid()
+ self.data[swhid]["known"] = swhids_res[str(swhid)]["known"]
+ yield nodes_chunk
+
+
class FilePriority(Policy):
"""Check the Merkle tree querying all the file contents and set all the upstream
directories to unknown in the case a file content is unknown.
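
For readers unfamiliar with swh.core.utils.grouper, here is a minimal, self-contained sketch of the chunked-lookup flow GreedyBFS relies on; chunks() and lookup_known are hypothetical stand-ins for grouper and the real Web API call, not the scanner's actual API:

from itertools import islice
from typing import Callable, Dict, Iterable, Iterator, List

def chunks(items: Iterable[str], size: int) -> Iterator[List[str]]:
    # Yield successive lists of at most `size` items (same idea as grouper).
    it = iter(items)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

def mark_known(swhids: Iterable[str],
               lookup_known: Callable[[List[str]], Dict[str, bool]],
               chunk_size: int) -> Dict[str, bool]:
    # One lookup request per chunk of SWHIDs instead of one per SWHID.
    known: Dict[str, bool] = {}
    for chunk in chunks(swhids, chunk_size):
        known.update(lookup_known(chunk))
    return known
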
diff --git a/swh/scanner/scanner.py b/swh/scanner/scanner.py
--- a/swh/scanner/scanner.py
+++ b/swh/scanner/scanner.py
@@ -13,7 +13,15 @@
from .data import MerkleNodeInfo
from .output import Output
-from .policy import QUERY_LIMIT, DirectoryPriority, FilePriority, LazyBFS, QueryAll
+from .policy import (
+ QUERY_LIMIT,
+ DirectoryPriority,
+ FilePriority,
+ GreedyBFS,
+ LazyBFS,
+ QueryAll,
+ source_size,
+)
async def run(config: Dict[str, Any], policy) -> None:
@@ -35,10 +43,6 @@
await policy.run(session, api_url)
-def source_size(source_tree: Directory):
- return len([n for n in source_tree.iter_tree(dedup=False)])
-
-
def get_policy_obj(source_tree: Directory, nodes_data: MerkleNodeInfo, policy: str):
if policy == "auto":
return (
@@ -48,6 +52,8 @@
)
elif policy == "bfs":
return LazyBFS(source_tree, nodes_data)
+ elif policy == "greedybfs":
+ return GreedyBFS(source_tree, nodes_data)
elif policy == "filepriority":
return FilePriority(source_tree, nodes_data)
elif policy == "dirpriority":
diff --git a/swh/scanner/tests/conftest.py b/swh/scanner/tests/conftest.py
--- a/swh/scanner/tests/conftest.py
+++ b/swh/scanner/tests/conftest.py
@@ -7,6 +7,7 @@
import os
from pathlib import Path
import shutil
+import sys
import aiohttp
from aioresponses import aioresponses # type: ignore
@@ -78,13 +79,17 @@
"""Generate a model.from_disk.Directory from a "big" temporary directory
(more than 1000 nodes)
"""
+ # workaround to avoid a RecursionError that could be raised while creating
+ # a deeply nested directory hierarchy
+ sys.setrecursionlimit(1100)
dir_ = tmp_path / "big-directory"
- dir_.mkdir()
+ sub_dirs = dir_
for i in range(0, QUERY_LIMIT + 1):
- file_ = dir_ / f"file_{i}.org"
- file_.touch()
+ sub_dirs = sub_dirs / "dir"
+ sub_dirs.mkdir(parents=True, exist_ok=True)
+ file_ = sub_dirs / "file.org"
+ file_.touch()
dir_obj = model_of_dir(str(dir_).encode())
- assert len(dir_obj) > QUERY_LIMIT
return dir_obj
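
The recursion-limit bump in the fixture above is needed because the tree is now a chain of QUERY_LIMIT + 1 nested directories instead of a flat directory of files, so building the from_disk model recurses roughly one frame per level. A quick, hedged sanity check (assuming CPython's usual default):

import sys

print(sys.getrecursionlimit())  # 1000 by default in CPython
# With QUERY_LIMIT + 1 nesting levels the model construction can exceed that
# default, hence the temporary sys.setrecursionlimit(1100) in the fixture.
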
diff --git a/swh/scanner/tests/flask_api.py b/swh/scanner/tests/flask_api.py
--- a/swh/scanner/tests/flask_api.py
+++ b/swh/scanner/tests/flask_api.py
@@ -6,6 +6,7 @@
from flask import Flask, request
from swh.scanner.exceptions import LargePayloadExc
+from swh.scanner.policy import QUERY_LIMIT
from .data import unknown_swhids
@@ -24,9 +25,10 @@
for swhid in swhids:
f.write(swhid + "\n")
- if len(swhids) > 900:
+ if len(swhids) > QUERY_LIMIT:
raise LargePayloadExc(
- "The maximum number of SWHIDs this endpoint can receive is 900"
+ f"The maximum number of SWHIDs this endpoint can receive is "
+ f"{QUERY_LIMIT}"
)
res = {swhid: {"known": False} for swhid in swhids}
diff --git a/swh/scanner/tests/test_policy.py b/swh/scanner/tests/test_policy.py
--- a/swh/scanner/tests/test_policy.py
+++ b/swh/scanner/tests/test_policy.py
@@ -8,13 +8,15 @@
from flask import url_for
import pytest
-from swh.model.identifiers import CoreSWHID, ObjectType
+from swh.model.identifiers import CONTENT, CoreSWHID, ObjectType
from swh.scanner.data import MerkleNodeInfo
from swh.scanner.exceptions import APIError
from swh.scanner.policy import (
DirectoryPriority,
FilePriority,
+ GreedyBFS,
LazyBFS,
+ source_size,
swhids_discovery,
)
@@ -47,17 +49,6 @@
)
-def test_scanner_raise_apierror_input_size_limit(event_loop, aiosession, live_server):
-
- api_url = url_for("index", _external=True)
- request = [
- "swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a" for i in range(901)
- ] # /known/ is limited at 900
-
- with pytest.raises(APIError):
- event_loop.run_until_complete(swhids_discovery(request, aiosession, api_url))
-
-
def test_scanner_directory_priority_has_contents(source_tree):
nodes_data = MerkleNodeInfo()
policy = DirectoryPriority(source_tree, nodes_data)
@@ -143,3 +134,35 @@
for swhid in backend_swhids_requests[5:]:
assert CoreSWHID.from_string(swhid).object_type == ObjectType.DIRECTORY
+
+
+def test_greedy_bfs_policy(
+ live_server, event_loop, aiosession, big_source_tree, tmp_requests
+):
+ open(tmp_requests, "w").close()
+ api_url = url_for("index", _external=True)
+
+ nodes_data = MerkleNodeInfo()
+ policy = GreedyBFS(big_source_tree, nodes_data)
+ event_loop.run_until_complete(policy.run(aiosession, api_url))
+
+ backend_swhids_requests = get_backend_swhids_order(tmp_requests)
+
+ last_swhid = backend_swhids_requests[-1]
+ assert CoreSWHID.from_string(last_swhid).object_type == ObjectType.CONTENT
+
+
+@pytest.mark.asyncio
+async def test_greedy_bfs_get_nodes_chunks(live_server, aiosession, big_source_tree):
+ api_url = url_for("index", _external=True)
+
+ nodes_data = MerkleNodeInfo()
+ policy = GreedyBFS(big_source_tree, nodes_data)
+ chunks = [
+ n_chunk
+ async for n_chunk in policy.get_nodes_chunks(
+ aiosession, api_url, source_size(big_source_tree)
+ )
+ ]
+ assert len(chunks) == 2
+ assert chunks[1][-1].object_type == CONTENT
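
Outside the test suite, driving the new policy directly would look roughly like the sketch below, mirroring what test_greedy_bfs_policy does; the archive URL is a placeholder and error handling is omitted, so treat it as an illustration rather than the scanner's real entry point (scanner.run wires this up for real):

import aiohttp

from swh.scanner.data import MerkleNodeInfo
from swh.scanner.policy import GreedyBFS

async def scan_with_greedy_bfs(source_tree, api_url: str) -> MerkleNodeInfo:
    nodes_data = MerkleNodeInfo()
    policy = GreedyBFS(source_tree, nodes_data)
    async with aiohttp.ClientSession() as session:
        await policy.run(session, api_url)
    return nodes_data

A caller would then run it with asyncio.run(scan_with_greedy_bfs(tree, api_url)) and read the per-node "known" flags from the returned MerkleNodeInfo.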
