Changeset View
Changeset View
Standalone View
Standalone View
swh/scanner/tests/test_scanner.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import json | |||||
from flask import url_for | from flask import url_for | ||||
import pytest | import pytest | ||||
from swh.scanner.data import MerkleNodeInfo | from swh.scanner.data import MerkleNodeInfo | ||||
from swh.scanner.exceptions import APIError | from swh.scanner.policy import DirectoryPriority, FilePriority, LazyBFS | ||||
from swh.scanner.scanner import run, swhids_discovery | from swh.scanner.scanner import run | ||||
from .data import correct_api_response, unknown_swhids | |||||
aio_url = "http://example.org/api/known/" | |||||
def test_scanner_correct_api_request(mock_aioresponse, event_loop, aiosession): | from .data import unknown_swhids | ||||
mock_aioresponse.post( | |||||
aio_url, | |||||
status=200, | |||||
content_type="application/json", | |||||
body=json.dumps(correct_api_response), | |||||
) | |||||
actual_result = event_loop.run_until_complete( | |||||
swhids_discovery([], aiosession, "http://example.org/api/") | |||||
) | |||||
assert correct_api_response == actual_result | |||||
@pytest.mark.options(debug=False) | |||||
def test_app(app): | |||||
assert not app.debug | |||||
def test_scanner_raise_apierror(mock_aioresponse, event_loop, aiosession): | |||||
mock_aioresponse.post(aio_url, content_type="application/json", status=413) | |||||
with pytest.raises(APIError): | def test_scanner_result_bfs(live_server, event_loop, source_tree): | ||||
event_loop.run_until_complete( | api_url = url_for("index", _external=True) | ||||
swhids_discovery([], aiosession, "http://example.org/api/") | config = {"web-api": {"url": api_url, "auth-token": None}} | ||||
) | |||||
nodes_data = MerkleNodeInfo() | |||||
policy = LazyBFS(source_tree, nodes_data) | |||||
event_loop.run_until_complete(run(config, policy)) | |||||
for node in source_tree.iter_tree(): | |||||
if str(node.swhid()) in unknown_swhids: | |||||
assert nodes_data[node.swhid()]["known"] is False | |||||
else: | |||||
assert nodes_data[node.swhid()]["known"] is True | |||||
def test_scanner_raise_apierror_input_size_limit(event_loop, aiosession, live_server): | |||||
def test_scanner_result_file_priority(live_server, event_loop, source_tree): | |||||
api_url = url_for("index", _external=True) | api_url = url_for("index", _external=True) | ||||
request = [ | config = {"web-api": {"url": api_url, "auth-token": None}} | ||||
"swh:1:cnt:7c4c57ba9ff496ad179b8f65b1d286edbda34c9a" for i in range(901) | |||||
] # /known/ is limited at 900 | |||||
with pytest.raises(APIError): | |||||
event_loop.run_until_complete(swhids_discovery(request, aiosession, api_url)) | |||||
@pytest.mark.options(debug=False) | nodes_data = MerkleNodeInfo() | ||||
def test_app(app): | policy = FilePriority(source_tree, nodes_data) | ||||
assert not app.debug | event_loop.run_until_complete(run(config, policy)) | ||||
for node in source_tree.iter_tree(): | |||||
if str(node.swhid()) in unknown_swhids: | |||||
assert nodes_data[node.swhid()]["known"] is False | |||||
else: | |||||
assert nodes_data[node.swhid()]["known"] is True | |||||
def test_scanner_result(live_server, event_loop, source_tree): | def test_scanner_result_directory_priority(live_server, event_loop, source_tree): | ||||
api_url = url_for("index", _external=True) | api_url = url_for("index", _external=True) | ||||
config = {"web-api": {"url": api_url, "auth-token": None}} | config = {"web-api": {"url": api_url, "auth-token": None}} | ||||
nodes_data = MerkleNodeInfo() | nodes_data = MerkleNodeInfo() | ||||
event_loop.run_until_complete(run(config, source_tree, nodes_data)) | policy = DirectoryPriority(source_tree, nodes_data) | ||||
event_loop.run_until_complete(run(config, policy)) | |||||
for node in source_tree.iter_tree(): | for node in source_tree.iter_tree(): | ||||
if str(node.swhid()) in unknown_swhids: | if str(node.swhid()) in unknown_swhids: | ||||
assert nodes_data[node.swhid()]["known"] is False | assert nodes_data[node.swhid()]["known"] is False | ||||
else: | else: | ||||
assert nodes_data[node.swhid()]["known"] is True | assert nodes_data[node.swhid()]["known"] is True |