Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/tests/test_fossology_license.py
Show All 19 Lines | from swh.indexer.tests.utils import ( | ||||
BASE_TEST_CONFIG, | BASE_TEST_CONFIG, | ||||
SHA1_TO_LICENSES, | SHA1_TO_LICENSES, | ||||
CommonContentIndexerPartitionTest, | CommonContentIndexerPartitionTest, | ||||
CommonContentIndexerTest, | CommonContentIndexerTest, | ||||
fill_obj_storage, | fill_obj_storage, | ||||
fill_storage, | fill_storage, | ||||
filter_dict, | filter_dict, | ||||
) | ) | ||||
from swh.model.hashutil import hash_to_bytes | |||||
class BasicTest(unittest.TestCase): | class BasicTest(unittest.TestCase): | ||||
@patch("swh.indexer.fossology_license.subprocess") | @patch("swh.indexer.fossology_license.subprocess") | ||||
def test_compute_license(self, mock_subprocess): | def test_compute_license(self, mock_subprocess): | ||||
"""Computing licenses from a raw content should return results | """Computing licenses from a raw content should return results | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 57 Lines • ▼ Show 20 Lines | def setUp(self): | ||||
fill_obj_storage(self.indexer.objstorage) | fill_obj_storage(self.indexer.objstorage) | ||||
self.id0 = "01c9379dfc33803963d07c1ccc748d3fe4c96bb5" | self.id0 = "01c9379dfc33803963d07c1ccc748d3fe4c96bb5" | ||||
self.id1 = "688a5ef812c53907562fe379d4b3851e69c7cb15" | self.id1 = "688a5ef812c53907562fe379d4b3851e69c7cb15" | ||||
self.id2 = "da39a3ee5e6b4b0d3255bfef95601890afd80709" # empty content | self.id2 = "da39a3ee5e6b4b0d3255bfef95601890afd80709" # empty content | ||||
tool = {k.replace("tool_", ""): v for (k, v) in self.indexer.tool.items()} | tool = {k.replace("tool_", ""): v for (k, v) in self.indexer.tool.items()} | ||||
# then | # then | ||||
self.expected_results = { | self.expected_results = [ | ||||
self.id0: {"tool": tool, "licenses": SHA1_TO_LICENSES[self.id0],}, | *[ | ||||
self.id1: {"tool": tool, "licenses": SHA1_TO_LICENSES[self.id1],}, | ContentLicenseRow( | ||||
self.id2: None, | id=hash_to_bytes(self.id0), tool=tool, license=license | ||||
} | ) | ||||
for license in SHA1_TO_LICENSES[self.id0] | |||||
], | |||||
*[ | |||||
ContentLicenseRow( | |||||
id=hash_to_bytes(self.id1), tool=tool, license=license | |||||
) | |||||
for license in SHA1_TO_LICENSES[self.id1] | |||||
], | |||||
*[], # self.id2 | |||||
] | |||||
def tearDown(self): | def tearDown(self): | ||||
super().tearDown() | super().tearDown() | ||||
fossology_license.compute_license = self.orig_compute_license | fossology_license.compute_license = self.orig_compute_license | ||||
class TestFossologyLicensePartitionIndexer( | class TestFossologyLicensePartitionIndexer( | ||||
CommonContentIndexerPartitionTest, unittest.TestCase | CommonContentIndexerPartitionTest, unittest.TestCase | ||||
Show All 18 Lines | def setUp(self): | ||||
self.indexer.catch_exceptions = False | self.indexer.catch_exceptions = False | ||||
fill_storage(self.indexer.storage) | fill_storage(self.indexer.storage) | ||||
fill_obj_storage(self.indexer.objstorage) | fill_obj_storage(self.indexer.objstorage) | ||||
def tearDown(self): | def tearDown(self): | ||||
super().tearDown() | super().tearDown() | ||||
fossology_license.compute_license = self.orig_compute_license | fossology_license.compute_license = self.orig_compute_license | ||||
def assert_results_ok(self, partition_id, nb_partitions, actual_results): | |||||
# TODO: remove this method when fossology_license endpoints moved away | |||||
# from dicts. | |||||
actual_result_rows = [] | |||||
for res in actual_results: | |||||
for license in res["licenses"]: | |||||
actual_result_rows.append( | |||||
ContentLicenseRow( | |||||
id=res["id"], | |||||
indexer_configuration_id=res["indexer_configuration_id"], | |||||
license=license, | |||||
) | |||||
) | |||||
super().assert_results_ok(partition_id, nb_partitions, actual_result_rows) | |||||
def test_fossology_w_no_tool(): | def test_fossology_w_no_tool(): | ||||
with pytest.raises(ValueError): | with pytest.raises(ValueError): | ||||
FossologyLicenseIndexer(config=filter_dict(CONFIG, "tools")) | FossologyLicenseIndexer(config=filter_dict(CONFIG, "tools")) | ||||
def test_fossology_range_w_no_tool(): | def test_fossology_range_w_no_tool(): | ||||
with pytest.raises(ValueError): | with pytest.raises(ValueError): | ||||
FossologyLicensePartitionIndexer(config=filter_dict(RANGE_CONFIG, "tools")) | FossologyLicensePartitionIndexer(config=filter_dict(RANGE_CONFIG, "tools")) |