D8539.id30878.diff

diff --git a/swh/scanner/cli.py b/swh/scanner/cli.py
--- a/swh/scanner/cli.py
+++ b/swh/scanner/cli.py
@@ -142,7 +142,9 @@
"--policy",
default="auto",
show_default=True,
- type=click.Choice(["auto", "bfs", "greedybfs", "filepriority", "dirpriority"]),
+ type=click.Choice(
+ ["auto", "bfs", "greedybfs", "filepriority", "dirpriority", "randomdir"]
+ ),
help="The scan policy.",
)
@click.option(
@@ -178,6 +180,8 @@
dirpriority: scan all the source code directories and check only unknown
directory contents.
+ randomdir: scan the source code using a random Merkle search on directories.
+
Other information about software artifacts could be specified with the -e/
--extra-info option:\n
\b
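
For context, a hedged sketch of exercising the new option through click's test
runner; the "scanner" group and "scan" command names and the positional root
path are assumptions based on the existing CLI, not something this diff shows.

    # Hedged sketch: selecting the new policy from the command line via
    # click's CliRunner. The group/command names and the root-path argument
    # are assumptions, not part of this diff.
    from click.testing import CliRunner

    from swh.scanner.cli import scanner

    runner = CliRunner()
    result = runner.invoke(scanner, ["scan", "--policy", "randomdir", "."])
    print(result.output)
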
diff --git a/swh/scanner/policy.py b/swh/scanner/policy.py
--- a/swh/scanner/policy.py
+++ b/swh/scanner/policy.py
@@ -4,16 +4,20 @@
# See top-level LICENSE file for more information
import abc
-from typing import no_type_check
+import itertools
+from typing import Iterable, List, no_type_check
from swh.core.utils import grouper
-from swh.model.from_disk import Directory
+from swh.loader.core import discovery
+from swh.model import from_disk
+from swh.model.from_disk import model
+from swh.model.model import Sha1Git
from .client import QUERY_LIMIT, Client
from .data import MerkleNodeInfo
-def source_size(source_tree: Directory):
+def source_size(source_tree: from_disk.Directory):
"""return the size of a source tree as the number of nodes it contains"""
return sum(1 for n in source_tree.iter_tree(dedup=False))
@@ -23,10 +27,10 @@
data: MerkleNodeInfo
"""information about contents and directories of the merkle tree"""
- source_tree: Directory
+ source_tree: from_disk.Directory
"""representation of a source code project directory in the merkle tree"""
- def __init__(self, source_tree: Directory, data: MerkleNodeInfo):
+ def __init__(self, source_tree: from_disk.Directory, data: MerkleNodeInfo):
self.source_tree = source_tree
self.data = data
@@ -232,20 +236,85 @@
"known"
]
- def has_contents(self, directory: Directory):
+ def has_contents(self, directory: from_disk.Directory):
"""Check if the directory given in input has contents"""
for entry in directory.entries:
if entry["type"] == "file":
return True
return False
- def get_contents(self, dir_: Directory):
+ def get_contents(self, dir_: from_disk.Directory):
"""Get all the contents of a given directory"""
for _, node in list(dir_.items()):
if node.object_type == "content":
yield node
+class WebAPIConnection(discovery.ArchiveDiscoveryInterface):
+ """Use the web APIs to query the archive"""
+
+ def __init__(
+ self,
+ contents: List[model.Content],
+ skipped_contents: List[model.SkippedContent],
+ directories: List[model.Directory],
+ client: Client,
+ ) -> None:
+ super().__init__(contents, skipped_contents, directories)
+ self.client = client
+
+ self.sha_to_swhid = {}
+ self.swhid_to_sha = {}
+ for content in contents:
+ self.sha_to_swhid[content.sha1_git] = str(content.swhid())
+ self.swhid_to_sha[str(content.swhid())] = content.sha1_git
+
+ for directory in directories:
+ self.sha_to_swhid[directory.id] = str(directory.swhid())
+ self.swhid_to_sha[str(directory.swhid())] = directory.id
+
+ async def content_missing(self, contents: List[Sha1Git]) -> List[Sha1Git]:
+ """List content missing from the archive by sha1"""
+ return await self._missing(contents)
+
+ async def skipped_content_missing(
+ self, skipped_contents: List[Sha1Git]
+ ) -> Iterable[Sha1Git]:
+ """List skipped content missing from the archive by sha1"""
+ # TODO what should we do about skipped contents?
+ return skipped_contents
+
+ async def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
+ """List directories missing from the archive by sha1"""
+ return await self._missing(directories)
+
+ async def _missing(self, shas):
+ res = await self.client.known([self.sha_to_swhid[o] for o in shas])
+ return [self.swhid_to_sha[k] for k, v in res.items() if not v["known"]]
+
+
+class RandomDirSamplingPriority(Policy):
+ """Check the Merkle tree querying random directories. Set all ancestors to
+ unknown for unknown directories, otherwise set all descendants to known.
+ Finally check all the remaining file contents.
+ """
+
+ @no_type_check
+ async def run(self, client: Client):
+ contents, skipped_contents, directories = from_disk.iter_directory(
+ self.source_tree
+ )
+
+ get_unknowns = discovery.filter_known_objects(
+ WebAPIConnection(contents, skipped_contents, directories, client),
+ )
+
+ unknowns = set(itertools.chain(*await get_unknowns))
+
+ for obj in itertools.chain(contents, skipped_contents, directories):
+ self.data[obj.swhid()]["known"] = obj not in unknowns
+
+
class QueryAll(Policy):
"""Check the status of every node in the Merkle tree."""
diff --git a/swh/scanner/scanner.py b/swh/scanner/scanner.py
--- a/swh/scanner/scanner.py
+++ b/swh/scanner/scanner.py
@@ -21,6 +21,7 @@
GreedyBFS,
LazyBFS,
QueryAll,
+ RandomDirSamplingPriority,
source_size,
)
@@ -66,6 +67,8 @@
return FilePriority(source_tree, nodes_data)
elif policy == "dirpriority":
return DirectoryPriority(source_tree, nodes_data)
+ elif policy == "randomdir":
+ return RandomDirSamplingPriority(source_tree, nodes_data)
else:
raise Exception(f"policy '{policy}' not found")
diff --git a/swh/scanner/tests/conftest.py b/swh/scanner/tests/conftest.py
--- a/swh/scanner/tests/conftest.py
+++ b/swh/scanner/tests/conftest.py
@@ -144,12 +144,21 @@
@pytest.fixture(scope="session")
def tmp_requests(tmpdir_factory):
+ """Logs each SWHID per line in every request made to the `known` endpoint"""
requests_file = tmpdir_factory.mktemp("data").join("requests.json")
return requests_file
@pytest.fixture(scope="session")
-def app(tmp_requests):
+def tmp_accesses(tmpdir_factory):
+ """Logs each request made to the `known` endpoint, writing the number
+ of SWHIDs queried, one per line."""
+ requests_file = tmpdir_factory.mktemp("data").join("accesses.json")
+ return requests_file
+
+
+@pytest.fixture(scope="session")
+def app(tmp_requests, tmp_accesses):
"""Flask backend API (used by live_server)."""
- app = create_app(tmp_requests)
+ app = create_app(tmp_requests, tmp_accesses)
return app
diff --git a/swh/scanner/tests/flask_api.py b/swh/scanner/tests/flask_api.py
--- a/swh/scanner/tests/flask_api.py
+++ b/swh/scanner/tests/flask_api.py
@@ -11,7 +11,7 @@
from .data import fake_origin, unknown_swhids
-def create_app(tmp_requests):
+def create_app(tmp_requests, tmp_accesses):
app = Flask(__name__)
@app.route("/")
@@ -25,6 +25,9 @@
for swhid in swhids:
f.write(swhid + "\n")
+ with open(tmp_accesses, "a") as f:
+ f.write(f"{len(swhids)}\n")
+
if len(swhids) > QUERY_LIMIT:
raise LargePayloadExc(
f"The maximum number of SWHIDs this endpoint can receive is "
diff --git a/swh/scanner/tests/test_policy.py b/swh/scanner/tests/test_policy.py
--- a/swh/scanner/tests/test_policy.py
+++ b/swh/scanner/tests/test_policy.py
@@ -15,6 +15,7 @@
FilePriority,
GreedyBFS,
LazyBFS,
+ RandomDirSamplingPriority,
source_size,
)
@@ -32,6 +33,13 @@
return [x.strip() for x in backend_swhids_order]
+def get_backend_known_requests(tmp_accesses):
+ with open(tmp_accesses, "r") as f:
+ calls = f.readlines()
+
+ return [int(call.strip()) for call in calls]
+
+
def test_lazybfs_policy(
live_server, aiosession, event_loop, source_tree_policy, tmp_requests
):
@@ -130,6 +138,63 @@
assert CoreSWHID.from_string(last_swhid).object_type == ObjectType.CONTENT
+def test_randomdir_policy(
+ live_server,
+ event_loop,
+ aiosession,
+ big_source_tree,
+ tmp_requests,
+ tmp_accesses,
+ mocker,
+):
+ # This is harder to test with exact assertions due to the random nature
+ # of our sampling algorithm and everything else that can be random.
+ # Setting random.seed has failed to produce stable results.
+ # TODO figure out why?
+
+ open(tmp_requests, "w").close()
+ open(tmp_accesses, "w").close()
+ api_url = url_for("index", _external=True)
+
+ nodes_data = MerkleNodeInfo()
+ init_merkle_node_info(big_source_tree, nodes_data, {"known"})
+ policy = RandomDirSamplingPriority(big_source_tree, nodes_data)
+ client = Client(api_url, aiosession)
+ event_loop.run_until_complete(policy.run(client))
+
+ backend_swhids_requests = get_backend_swhids_order(tmp_requests)
+ # Check that we only query directories in the case where all directories
+ # fit in a single request
+ assert all(
+ CoreSWHID.from_string(swhid).object_type == ObjectType.DIRECTORY
+ for swhid in backend_swhids_requests
+ )
+
+ assert all(v["known"] is True for k, v in policy.data.items())
+
+ # Check that we only do a single query of 1000 items
+ backend_known_requests = get_backend_known_requests(tmp_accesses)
+ assert [1000] == backend_known_requests
+
+ # Test with smaller sample sizes to actually trigger the random sampling
+ open(tmp_requests, "w").close()
+ open(tmp_accesses, "w").close()
+ mocker.patch("swh.scanner.policy.discovery.SAMPLE_SIZE", 10)
+
+ nodes_data = MerkleNodeInfo()
+ init_merkle_node_info(big_source_tree, nodes_data, {"known"})
+ policy = RandomDirSamplingPriority(big_source_tree, nodes_data)
+ client = Client(api_url, aiosession)
+ event_loop.run_until_complete(policy.run(client))
+
+ assert all(v["known"] is True for k, v in policy.data.items())
+
+ # Check that we do at least two queries, each of at most 10 items
+ backend_known_requests = get_backend_known_requests(tmp_accesses)
+ assert len(backend_known_requests) >= 2
+ assert all(length <= 10 for length in backend_known_requests)
+
+
@pytest.mark.asyncio
async def test_greedy_bfs_get_nodes_chunks(live_server, aiosession, big_source_tree):
api_url = url_for("index", _external=True)
