diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,4 @@
 # Add here internal Software Heritage dependencies, one per line.
 swh.core >= 0.3
 swh.model >= 2.6.4
+swh.loader.core @ git+https://forge.softwareheritage.org/source/swh-loader-core.git@1facea3cd215155f77ae4083a33837b7c6f642b0#egg=swh.loader.core
\ No newline at end of file
diff --git a/swh/scanner/cli.py b/swh/scanner/cli.py
--- a/swh/scanner/cli.py
+++ b/swh/scanner/cli.py
@@ -142,7 +142,9 @@
     "--policy",
     default="auto",
     show_default=True,
-    type=click.Choice(["auto", "bfs", "greedybfs", "filepriority", "dirpriority"]),
+    type=click.Choice(
+        ["auto", "bfs", "greedybfs", "filepriority", "dirpriority", "randomdir"]
+    ),
     help="The scan policy.",
 )
 @click.option(
@@ -178,6 +180,8 @@
     dirpriority: scan all the source code directories and check only unknown
     directory contents.

+    randomdir: scan the source code using a random Merkle search on directories.
+
     Other information about software artifacts could be specified with the -e/
     --extra-info option:\n
     \b
diff --git a/swh/scanner/policy.py b/swh/scanner/policy.py
--- a/swh/scanner/policy.py
+++ b/swh/scanner/policy.py
@@ -4,16 +4,20 @@
 # See top-level LICENSE file for more information

 import abc
-from typing import no_type_check
+import itertools
+from typing import Iterable, List, no_type_check

 from swh.core.utils import grouper
-from swh.model.from_disk import Directory
+from swh.loader.core import discovery
+from swh.model import from_disk
+from swh.model.from_disk import model
+from swh.model.model import Sha1Git

 from .client import QUERY_LIMIT, Client
 from .data import MerkleNodeInfo


-def source_size(source_tree: Directory):
+def source_size(source_tree: from_disk.Directory):
     """return the size of a source tree as the number of nodes it contains"""
     return sum(1 for n in source_tree.iter_tree(dedup=False))

@@ -23,10 +27,10 @@
     data: MerkleNodeInfo
     """information about contents and directories of the merkle tree"""

-    source_tree: Directory
+    source_tree: from_disk.Directory
     """representation of a source code project directory in the merkle tree"""

-    def __init__(self, source_tree: Directory, data: MerkleNodeInfo):
+    def __init__(self, source_tree: from_disk.Directory, data: MerkleNodeInfo):
         self.source_tree = source_tree
         self.data = data

@@ -232,20 +236,87 @@
                 "known"
             ]

-    def has_contents(self, directory: Directory):
+    def has_contents(self, directory: from_disk.Directory):
         """Check if the directory given in input has contents"""
         for entry in directory.entries:
             if entry["type"] == "file":
                 return True
         return False

-    def get_contents(self, dir_: Directory):
+    def get_contents(self, dir_: from_disk.Directory):
         """Get all the contents of a given directory"""
         for _, node in list(dir_.items()):
             if node.object_type == "content":
                 yield node


+class WebAPIConnection(discovery.ArchiveDiscoveryInterface):
+    """Use the web APIs to query the archive"""
+
+    def __init__(
+        self,
+        contents: List[model.Content],
+        skipped_contents: List[model.SkippedContent],
+        directories: List[model.Directory],
+        client: Client,
+    ) -> None:
+        super().__init__(contents, skipped_contents, directories)
+        self.client = client
+
+        self.sha_to_swhid = {}
+        self.swhid_to_sha = {}
+        for content in contents:
+            swhid = str(content.swhid())
+            self.sha_to_swhid[content.sha1_git] = swhid
+            self.swhid_to_sha[swhid] = content.sha1_git
+
+        for directory in directories:
+            swhid = str(directory.swhid())
+            self.sha_to_swhid[directory.id] = swhid
+            self.swhid_to_sha[swhid] = directory.id
+
+    async def content_missing(self, contents: List[Sha1Git]) -> List[Sha1Git]:
+        """List content missing from the archive by sha1"""
+        return await self._missing(contents)
+
+    async def skipped_content_missing(
+        self, skipped_contents: List[Sha1Git]
+    ) -> Iterable[Sha1Git]:
+        """List skipped content missing from the archive by sha1"""
+        # TODO what should we do about skipped contents?
+        return skipped_contents
+
+    async def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
+        """List directories missing from the archive by sha1"""
+        return await self._missing(directories)
+
+    async def _missing(self, shas):
+        res = await self.client.known([self.sha_to_swhid[o] for o in shas])
+        return [self.swhid_to_sha[k] for k, v in res.items() if not v["known"]]
+
+
+class RandomDirSamplingPriority(Policy):
+    """Check the Merkle tree by querying random directories. Set all ancestors
+    to unknown for unknown directories, otherwise set all descendants to known.
+    Finally check all the remaining file contents.
+    """
+
+    @no_type_check
+    async def run(self, client: Client):
+        contents, skipped_contents, directories = from_disk.iter_directory(
+            self.source_tree
+        )
+
+        get_unknowns = discovery.filter_known_objects(
+            WebAPIConnection(contents, skipped_contents, directories, client),
+        )
+
+        unknowns = set(itertools.chain(*await get_unknowns))
+
+        for obj in itertools.chain(contents, skipped_contents, directories):
+            self.data[obj.swhid()]["known"] = obj not in unknowns
+
+
 class QueryAll(Policy):
     """Check the status of every node in the Merkle tree."""

diff --git a/swh/scanner/scanner.py b/swh/scanner/scanner.py
--- a/swh/scanner/scanner.py
+++ b/swh/scanner/scanner.py
@@ -21,6 +21,7 @@
     GreedyBFS,
     LazyBFS,
     QueryAll,
+    RandomDirSamplingPriority,
     source_size,
 )

@@ -66,6 +67,8 @@
         return FilePriority(source_tree, nodes_data)
     elif policy == "dirpriority":
         return DirectoryPriority(source_tree, nodes_data)
+    elif policy == "randomdir":
+        return RandomDirSamplingPriority(source_tree, nodes_data)
     else:
         raise Exception(f"policy '{policy}' not found")

diff --git a/swh/scanner/tests/conftest.py b/swh/scanner/tests/conftest.py
--- a/swh/scanner/tests/conftest.py
+++ b/swh/scanner/tests/conftest.py
@@ -144,12 +144,21 @@

 @pytest.fixture(scope="session")
 def tmp_requests(tmpdir_factory):
+    """Logs each SWHID per line in every request made to the `known` endpoint"""
     requests_file = tmpdir_factory.mktemp("data").join("requests.json")
     return requests_file


 @pytest.fixture(scope="session")
-def app(tmp_requests):
+def tmp_accesses(tmpdir_factory):
+    """Logs each request made to the `known` endpoint, writing the number
+    of SWHIDs queried, one per line."""
+    requests_file = tmpdir_factory.mktemp("data").join("accesses.json")
+    return requests_file
+
+
+@pytest.fixture(scope="session")
+def app(tmp_requests, tmp_accesses):
     """Flask backend API (used by live_server)."""
-    app = create_app(tmp_requests)
+    app = create_app(tmp_requests, tmp_accesses)
     return app

diff --git a/swh/scanner/tests/flask_api.py b/swh/scanner/tests/flask_api.py
--- a/swh/scanner/tests/flask_api.py
+++ b/swh/scanner/tests/flask_api.py
@@ -11,7 +11,7 @@
 from .data import fake_origin, unknown_swhids


-def create_app(tmp_requests):
+def create_app(tmp_requests, tmp_accesses):
     app = Flask(__name__)

     @app.route("/")
@@ -25,6 +25,9 @@
             for swhid in swhids:
                 f.write(swhid + "\n")

+        with open(tmp_accesses, "a") as f:
+            f.write(f"{len(swhids)}\n")
+
         if len(swhids) > QUERY_LIMIT:
             raise LargePayloadExc(
                 f"The maximum number of SWHIDs this endpoint can receive is "
diff --git a/swh/scanner/tests/test_policy.py b/swh/scanner/tests/test_policy.py
--- a/swh/scanner/tests/test_policy.py
+++ b/swh/scanner/tests/test_policy.py
@@ -15,6 +15,7 @@
     FilePriority,
     GreedyBFS,
     LazyBFS,
+    RandomDirSamplingPriority,
     source_size,
 )

@@ -32,6 +33,13 @@
     return [x.strip() for x in backend_swhids_order]


+def get_backend_known_requests(tmp_accesses):
+    with open(tmp_accesses, "r") as f:
+        calls = f.readlines()
+
+    return [int(call.strip()) for call in calls]
+
+
 def test_lazybfs_policy(
     live_server, aiosession, event_loop, source_tree_policy, tmp_requests
 ):
@@ -130,6 +138,63 @@
     assert CoreSWHID.from_string(last_swhid).object_type == ObjectType.CONTENT


+def test_randomdir_policy(
+    live_server,
+    event_loop,
+    aiosession,
+    big_source_tree,
+    tmp_requests,
+    tmp_accesses,
+    mocker,
+):
+    # This is harder to test with exact assertions due to the random nature
+    # of our sampling algorithm and everything else that can be random.
+    # Setting random.seed has failed to produce stable results.
+    # TODO figure out why?
+
+    open(tmp_requests, "w").close()
+    open(tmp_accesses, "w").close()
+    api_url = url_for("index", _external=True)
+
+    nodes_data = MerkleNodeInfo()
+    init_merkle_node_info(big_source_tree, nodes_data, {"known"})
+    policy = RandomDirSamplingPriority(big_source_tree, nodes_data)
+    client = Client(api_url, aiosession)
+    event_loop.run_until_complete(policy.run(client))
+
+    backend_swhids_requests = get_backend_swhids_order(tmp_requests)
+    # Check that we only query directories in the case where all directories
+    # fit in a single request
+    assert all(
+        CoreSWHID.from_string(swhid).object_type == ObjectType.DIRECTORY
+        for swhid in backend_swhids_requests
+    )
+
+    assert all(v["known"] is True for k, v in policy.data.items())
+
+    # Check that we only do a single query of 1000 items
+    backend_known_requests = get_backend_known_requests(tmp_accesses)
+    assert [1000] == backend_known_requests
+
+    # Test with smaller sample sizes to actually trigger the random sampling
+    open(tmp_requests, "w").close()
+    open(tmp_accesses, "w").close()
+    mocker.patch("swh.scanner.policy.discovery.SAMPLE_SIZE", 10)
+
+    nodes_data = MerkleNodeInfo()
+    init_merkle_node_info(big_source_tree, nodes_data, {"known"})
+    policy = RandomDirSamplingPriority(big_source_tree, nodes_data)
+    client = Client(api_url, aiosession)
+    event_loop.run_until_complete(policy.run(client))
+
+    assert all(v["known"] is True for k, v in policy.data.items())
+
+    # Check that we make at least two queries, each of at most 10 items
+    backend_known_requests = get_backend_known_requests(tmp_accesses)
+    assert len(backend_known_requests) >= 2
+    assert all(length <= 10 for length in backend_known_requests)
+
+
 @pytest.mark.asyncio
 async def test_greedy_bfs_get_nodes_chunks(live_server, aiosession, big_source_tree):
     api_url = url_for("index", _external=True)
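
Usage illustration for the new policy: a minimal sketch, mirroring the setup used in test_randomdir_policy above, of how RandomDirSamplingPriority is driven end to end. The aiohttp session handling, the archive API URL, and the import location of init_merkle_node_info are assumptions made for illustration; they are not part of this diff.

import asyncio

import aiohttp

from swh.model.from_disk import Directory
from swh.scanner.client import Client
from swh.scanner.data import MerkleNodeInfo, init_merkle_node_info  # assumed location
from swh.scanner.policy import RandomDirSamplingPriority


async def scan_with_randomdir(source_path: bytes, api_url: str) -> MerkleNodeInfo:
    # Build the on-disk Merkle tree and the per-node "known" table,
    # as the tests in this diff do.
    source_tree = Directory.from_disk(path=source_path)
    nodes_data = MerkleNodeInfo()
    init_merkle_node_info(source_tree, nodes_data, {"known"})

    # Query the archive through the web API client; the policy samples
    # random directories and propagates known/unknown status to the rest.
    async with aiohttp.ClientSession() as session:
        client = Client(api_url, session)
        policy = RandomDirSamplingPriority(source_tree, nodes_data)
        await policy.run(client)

    return nodes_data


if __name__ == "__main__":
    # Example values only; the scanner normally wires these up itself.
    asyncio.run(
        scan_with_randomdir(b"/tmp/project", "https://archive.softwareheritage.org/api/1/")
    )

With the cli.py change above, the same policy should also be selectable from the command line through the --policy randomdir option.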