Changeset View
Changeset View
Standalone View
Standalone View
swh/scanner/tests/test_scanner.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import json | import json | ||||
from flask import url_for | from flask import url_for | ||||
import pytest | import pytest | ||||
from swh.model.exceptions import InvalidDirectoryPath | from swh.scanner.data import MerkleNodeInfo | ||||
from swh.scanner.exceptions import APIError | from swh.scanner.exceptions import APIError | ||||
from swh.scanner.model import Tree | from swh.scanner.scanner import run, swhids_discovery | ||||
from swh.scanner.scanner import extract_regex_objs, get_subpaths, run, swhids_discovery | |||||
from .data import correct_api_response, present_swhids, to_exclude_swhid | from .data import correct_api_response, unknown_swhids | ||||
aio_url = "http://example.org/api/known/" | aio_url = "http://example.org/api/known/" | ||||
def test_extract_regex_objs(temp_folder): | |||||
root_path = bytes(temp_folder["root"]) | |||||
patterns = (bytes(temp_folder["subdir"]), b"/none") | |||||
sre_patterns = [reg_obj for reg_obj in extract_regex_objs(root_path, patterns)] | |||||
assert len(sre_patterns) == 2 | |||||
patterns = (*patterns, b"/tmp") | |||||
with pytest.raises(InvalidDirectoryPath): | |||||
sre_patterns = [reg_obj for reg_obj in extract_regex_objs(root_path, patterns)] | |||||
def test_scanner_correct_api_request(mock_aioresponse, event_loop, aiosession): | def test_scanner_correct_api_request(mock_aioresponse, event_loop, aiosession): | ||||
mock_aioresponse.post( | mock_aioresponse.post( | ||||
aio_url, | aio_url, | ||||
status=200, | status=200, | ||||
content_type="application/json", | content_type="application/json", | ||||
body=json.dumps(correct_api_response), | body=json.dumps(correct_api_response), | ||||
) | ) | ||||
Show All 19 Lines | def test_scanner_raise_apierror_input_size_limit(event_loop, aiosession, live_server): | ||||
request = [ | request = [ | ||||
"swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a" for i in range(901) | "swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a" for i in range(901) | ||||
] # /known/ is limited at 900 | ] # /known/ is limited at 900 | ||||
with pytest.raises(APIError): | with pytest.raises(APIError): | ||||
event_loop.run_until_complete(swhids_discovery(request, aiosession, api_url)) | event_loop.run_until_complete(swhids_discovery(request, aiosession, api_url)) | ||||
def test_scanner_get_subpaths(temp_folder): | |||||
root = temp_folder["root"] | |||||
actual_result = [] | |||||
for subpath, swhid in get_subpaths(root, tuple()): | |||||
# also check if it's a symlink since pytest tmp_dir fixture create | |||||
# also a symlink to each directory inside the tmp_dir path | |||||
if subpath.is_dir() and not subpath.is_symlink(): | |||||
actual_result.append((subpath, swhid)) | |||||
assert len(actual_result) == 2 | |||||
@pytest.mark.options(debug=False) | @pytest.mark.options(debug=False) | ||||
def test_app(app): | def test_app(app): | ||||
assert not app.debug | assert not app.debug | ||||
def test_scanner_result(live_server, event_loop, test_sample_folder): | def test_scanner_result(live_server, event_loop, source_tree): | ||||
zack: We discussed how iter_tree should not be used to output stuff, because it deduplicates and… | |||||
Not Done · Inline Actions · DanSeraf: Yes, but we first need to change `swh.model.merkle.iter_tree` to output *all* the nodes. | |||||
api_url = url_for("index", _external=True) | api_url = url_for("index", _external=True) | ||||
config = {"web-api": {"url": api_url, "auth-token": None}} | config = {"web-api": {"url": api_url, "auth-token": None}} | ||||
source_tree = Tree(test_sample_folder) | nodes_data = MerkleNodeInfo() | ||||
event_loop.run_until_complete(run(config, test_sample_folder, source_tree, set())) | event_loop.run_until_complete(run(config, source_tree, nodes_data)) | ||||
for node in source_tree.iter_tree(): | |||||
for child_node in source_tree.iterate(): | node_swhid = str(node.swhid()) | ||||
node_info = list(child_node.attributes.values())[0] | if node_swhid in unknown_swhids: | ||||
if node_info["swhid"] in present_swhids: | assert nodes_data[node_swhid]["known"] is False | ||||
assert node_info["known"] is True | |||||
else: | else: | ||||
assert node_info["known"] is False | assert nodes_data[node_swhid]["known"] is True | ||||
def test_scanner_result_with_exclude_patterns( | |||||
live_server, event_loop, test_sample_folder | |||||
): | |||||
api_url = url_for("index", _external=True) | |||||
config = {"web-api": {"url": api_url, "auth-token": None}} | |||||
to_exclude_dir = str(test_sample_folder) + "/toexclude" | |||||
patterns = (to_exclude_dir.encode(),) | |||||
exclude_pattern = { | |||||
reg_obj for reg_obj in extract_regex_objs(bytes(test_sample_folder), patterns) | |||||
} | |||||
source_tree = Tree(test_sample_folder) | |||||
event_loop.run_until_complete( | |||||
run(config, test_sample_folder, source_tree, exclude_pattern) | |||||
) | |||||
for child_node in source_tree.iterate(): | |||||
node_info = list(child_node.attributes.values())[0] | |||||
assert node_info["swhid"] != to_exclude_swhid |
We discussed that iter_tree should not be used to produce output, because it deduplicates nodes and might therefore skip sub-trees. Could you add an explicit test case with sharing (ideally of a sub-directory somewhere) and make the test verify that all nodes are present in the output, even if they are deduplicated in the in-memory data model?