diff --git a/swh/indexer/tests/test_ctags.py b/swh/indexer/tests/test_ctags.py index aa1b175..cf3b9cc 100644 --- a/swh/indexer/tests/test_ctags.py +++ b/swh/indexer/tests/test_ctags.py @@ -1,192 +1,183 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import unittest from unittest.mock import patch +import pytest + import swh.indexer.ctags from swh.indexer.ctags import ( CtagsIndexer, run_ctags ) from swh.indexer.tests.utils import ( CommonContentIndexerTest, - CommonIndexerWithErrorsTest, CommonIndexerNoTool, SHA1_TO_CTAGS, BASE_TEST_CONFIG, - OBJ_STORAGE_DATA, fill_storage, fill_obj_storage + OBJ_STORAGE_DATA, fill_storage, fill_obj_storage, + filter_dict, ) class BasicTest(unittest.TestCase): @patch('swh.indexer.ctags.subprocess') def test_run_ctags(self, mock_subprocess): """Computing licenses from a raw content should return results """ output0 = """ {"name":"defun","kind":"function","line":1,"language":"scheme"} {"name":"name","kind":"symbol","line":5,"language":"else"}""" output1 = """ {"name":"let","kind":"var","line":10,"language":"something"}""" expected_result0 = [ { 'name': 'defun', 'kind': 'function', 'line': 1, 'lang': 'scheme' }, { 'name': 'name', 'kind': 'symbol', 'line': 5, 'lang': 'else' } ] expected_result1 = [ { 'name': 'let', 'kind': 'var', 'line': 10, 'lang': 'something' } ] for path, lang, intermediary_result, expected_result in [ (b'some/path', 'lisp', output0, expected_result0), (b'some/path/2', 'markdown', output1, expected_result1) ]: mock_subprocess.check_output.return_value = intermediary_result actual_result = list(run_ctags(path, lang=lang)) self.assertEqual(actual_result, expected_result) class InjectCtagsIndexer: """Override ctags computations. """ def compute_ctags(self, path, lang): """Inject fake ctags given path (sha1 identifier). """ return { 'lang': lang, **SHA1_TO_CTAGS.get(path) } -class CtagsIndexerTest(InjectCtagsIndexer, CtagsIndexer): - """Specific language whose configuration is enough to satisfy the - indexing tests. - """ - def parse_config_file(self, *args, **kwargs): - return { - **BASE_TEST_CONFIG, - 'tools': { - 'name': 'universal-ctags', - 'version': '~git7859817b', - 'configuration': { - 'command_line': '''ctags --fields=+lnz --sort=no ''' - ''' --links=no ''', - 'max_content_size': 1000, - }, - }, - 'languages': { - 'python': 'python', - 'haskell': 'haskell', - 'bar': 'bar', - }, - 'workdir': '/tmp', - } +CONFIG = { + **BASE_TEST_CONFIG, + 'tools': { + 'name': 'universal-ctags', + 'version': '~git7859817b', + 'configuration': { + 'command_line': '''ctags --fields=+lnz --sort=no ''' + ''' --links=no ''', + 'max_content_size': 1000, + }, + }, + 'languages': { + 'python': 'python', + 'haskell': 'haskell', + 'bar': 'bar', + }, + 'workdir': '/tmp', +} class TestCtagsIndexer(CommonContentIndexerTest, unittest.TestCase): """Ctags indexer test scenarios: - Known sha1s in the input list have their data indexed - Unknown sha1 in the input list are not indexed """ legacy_get_format = True def get_indexer_results(self, ids): yield from self.idx_storage.content_ctags_get(ids) def setUp(self): super().setUp() - self.indexer = CtagsIndexerTest() + self.indexer = CtagsIndexer(config=CONFIG) self.idx_storage = self.indexer.idx_storage fill_storage(self.indexer.storage) fill_obj_storage(self.indexer.objstorage) # Prepare test input self.id0 = '01c9379dfc33803963d07c1ccc748d3fe4c96bb5' self.id1 = 'd4c647f0fc257591cc9ba1722484229780d1c607' self.id2 = '688a5ef812c53907562fe379d4b3851e69c7cb15' tool = {k.replace('tool_', ''): v for (k, v) in self.indexer.tool.items()} self.expected_results = { self.id0: { 'id': self.id0, 'tool': tool, **SHA1_TO_CTAGS[self.id0][0], }, self.id1: { 'id': self.id1, 'tool': tool, **SHA1_TO_CTAGS[self.id1][0], }, self.id2: { 'id': self.id2, 'tool': tool, **SHA1_TO_CTAGS[self.id2][0], } } self._set_mocks() def _set_mocks(self): def find_ctags_for_content(raw_content): for (sha1, ctags) in SHA1_TO_CTAGS.items(): if OBJ_STORAGE_DATA[sha1] == raw_content: return ctags else: raise ValueError(('%r not found in objstorage, can\'t mock ' 'its ctags.') % raw_content) def fake_language(raw_content, *args, **kwargs): ctags = find_ctags_for_content(raw_content) return {'lang': ctags[0]['lang']} self._real_compute_language = swh.indexer.ctags.compute_language swh.indexer.ctags.compute_language = fake_language def fake_check_output(cmd, *args, **kwargs): print(cmd) id_ = cmd[-1].split('/')[-1] return '\n'.join( json.dumps({'language': ctag['lang'], **ctag}) for ctag in SHA1_TO_CTAGS[id_]) self._real_check_output = swh.indexer.ctags.subprocess.check_output swh.indexer.ctags.subprocess.check_output = fake_check_output def tearDown(self): swh.indexer.ctags.compute_language = self._real_compute_language swh.indexer.ctags.subprocess.check_output = self._real_check_output super().tearDown() -class CtagsIndexerUnknownToolTestStorage( - CommonIndexerNoTool, CtagsIndexerTest): - """Fossology license indexer with wrong configuration""" - - -class TestCtagsIndexersErrors( - CommonIndexerWithErrorsTest, unittest.TestCase): - """Test the indexer raise the right errors when wrongly initialized""" - Indexer = CtagsIndexerUnknownToolTestStorage +def test_ctags_w_no_tool(): + with pytest.raises(ValueError): + CtagsIndexer(config=filter_dict(CONFIG, 'tools')) diff --git a/swh/indexer/tests/test_fossology_license.py b/swh/indexer/tests/test_fossology_license.py index 697542b..0ba43f3 100644 --- a/swh/indexer/tests/test_fossology_license.py +++ b/swh/indexer/tests/test_fossology_license.py @@ -1,208 +1,178 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest - from unittest.mock import patch +import pytest + from swh.indexer import fossology_license from swh.indexer.fossology_license import ( FossologyLicenseIndexer, FossologyLicenseRangeIndexer, compute_license ) from swh.indexer.tests.utils import ( SHA1_TO_LICENSES, CommonContentIndexerTest, CommonContentIndexerRangeTest, - CommonIndexerWithErrorsTest, CommonIndexerNoTool, - BASE_TEST_CONFIG, fill_storage, fill_obj_storage + BASE_TEST_CONFIG, fill_storage, fill_obj_storage, filter_dict, ) class BasicTest(unittest.TestCase): @patch('swh.indexer.fossology_license.subprocess') def test_compute_license(self, mock_subprocess): """Computing licenses from a raw content should return results """ for path, intermediary_result, output in [ (b'some/path', None, []), (b'some/path/2', [], []), (b'other/path', ' contains license(s) GPL,AGPL', ['GPL', 'AGPL'])]: mock_subprocess.check_output.return_value = intermediary_result actual_result = compute_license(path, log=None) self.assertEqual(actual_result, { 'licenses': output, 'path': path, }) def mock_compute_license(path, log=None): """path is the content identifier """ if isinstance(id, bytes): path = path.decode('utf-8') # path is something like /tmp/tmpXXX/ so we keep only the sha1 part path = path.split('/')[-1] return { 'licenses': SHA1_TO_LICENSES.get(path) } -class FossologyLicenseTestIndexer(FossologyLicenseIndexer): - """Specific fossology license whose configuration is enough to satisfy - the indexing checks. +CONFIG = { + **BASE_TEST_CONFIG, + 'workdir': '/tmp', + 'tools': { + 'name': 'nomos', + 'version': '3.1.0rc2-31-ga2cbb8c', + 'configuration': { + 'command_line': 'nomossa ', + }, + }, +} - """ - def parse_config_file(self, *args, **kwargs): - return { - **BASE_TEST_CONFIG, - 'workdir': '/tmp', - 'tools': { - 'name': 'nomos', - 'version': '3.1.0rc2-31-ga2cbb8c', - 'configuration': { - 'command_line': 'nomossa ', - }, - }, - } +RANGE_CONFIG = dict(list(CONFIG.items()) + [('write_batch_size', 100)]) class TestFossologyLicenseIndexer(CommonContentIndexerTest, unittest.TestCase): """Language indexer test scenarios: - Known sha1s in the input list have their data indexed - Unknown sha1 in the input list are not indexed """ def get_indexer_results(self, ids): yield from self.idx_storage.content_fossology_license_get(ids) def setUp(self): super().setUp() # replace actual license computation with a mock self.orig_compute_license = fossology_license.compute_license fossology_license.compute_license = mock_compute_license - self.indexer = FossologyLicenseTestIndexer() + self.indexer = FossologyLicenseIndexer(CONFIG) self.idx_storage = self.indexer.idx_storage fill_storage(self.indexer.storage) fill_obj_storage(self.indexer.objstorage) self.id0 = '01c9379dfc33803963d07c1ccc748d3fe4c96bb5' self.id1 = '688a5ef812c53907562fe379d4b3851e69c7cb15' self.id2 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709' # empty content tool = {k.replace('tool_', ''): v for (k, v) in self.indexer.tool.items()} # then self.expected_results = { self.id0: { 'tool': tool, 'licenses': SHA1_TO_LICENSES[self.id0], }, self.id1: { 'tool': tool, 'licenses': SHA1_TO_LICENSES[self.id1], }, self.id2: { 'tool': tool, 'licenses': SHA1_TO_LICENSES[self.id2], } } def tearDown(self): super().tearDown() fossology_license.compute_license = self.orig_compute_license -class FossologyLicenseRangeIndexerTest(FossologyLicenseRangeIndexer): - """Testing the range indexer on fossology license. - - """ - def parse_config_file(self, *args, **kwargs): - return { - **BASE_TEST_CONFIG, - 'workdir': '/tmp', - 'tools': { - 'name': 'nomos', - 'version': '3.1.0rc2-31-ga2cbb8c', - 'configuration': { - 'command_line': 'nomossa ', - }, - }, - 'write_batch_size': 100, - } - - class TestFossologyLicenseRangeIndexer( CommonContentIndexerRangeTest, unittest.TestCase): """Range Fossology License Indexer tests. - new data within range are indexed - no data outside a range are indexed - with filtering existing indexed data prior to compute new index - without filtering existing indexed data prior to compute new index """ def setUp(self): super().setUp() # replace actual license computation with a mock self.orig_compute_license = fossology_license.compute_license fossology_license.compute_license = mock_compute_license - self.indexer = FossologyLicenseRangeIndexerTest() + self.indexer = FossologyLicenseRangeIndexer(config=RANGE_CONFIG) fill_storage(self.indexer.storage) fill_obj_storage(self.indexer.objstorage) self.id0 = '01c9379dfc33803963d07c1ccc748d3fe4c96bb5' self.id1 = '02fb2c89e14f7fab46701478c83779c7beb7b069' self.id2 = '103bc087db1d26afc3a0283f38663d081e9b01e6' tool_id = self.indexer.tool['id'] self.expected_results = { self.id0: { 'id': self.id0, 'indexer_configuration_id': tool_id, 'licenses': SHA1_TO_LICENSES[self.id0] }, self.id1: { 'id': self.id1, 'indexer_configuration_id': tool_id, 'licenses': SHA1_TO_LICENSES[self.id1] }, self.id2: { 'id': self.id2, 'indexer_configuration_id': tool_id, 'licenses': SHA1_TO_LICENSES[self.id2] } } def tearDown(self): super().tearDown() fossology_license.compute_license = self.orig_compute_license -class FossologyLicenseIndexerUnknownToolTestStorage( - CommonIndexerNoTool, FossologyLicenseTestIndexer): - """Fossology license indexer with wrong configuration""" - - -class FossologyLicenseRangeIndexerUnknownToolTestStorage( - CommonIndexerNoTool, FossologyLicenseRangeIndexerTest): - """Fossology license range indexer with wrong configuration""" +def test_fossology_w_no_tool(): + with pytest.raises(ValueError): + FossologyLicenseIndexer(config=filter_dict(CONFIG, 'tools')) -class TestFossologyLicenseIndexersErrors( - CommonIndexerWithErrorsTest, unittest.TestCase): - """Test the indexer raise the right errors when wrongly initialized""" - Indexer = FossologyLicenseIndexerUnknownToolTestStorage - RangeIndexer = FossologyLicenseRangeIndexerUnknownToolTestStorage +def test_fossology_range_w_no_tool(): + with pytest.raises(ValueError): + FossologyLicenseRangeIndexer(config=filter_dict(RANGE_CONFIG, 'tools')) diff --git a/swh/indexer/tests/test_language.py b/swh/indexer/tests/test_language.py index d44a0c8..a6e78c7 100644 --- a/swh/indexer/tests/test_language.py +++ b/swh/indexer/tests/test_language.py @@ -1,102 +1,93 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest +import pytest + from swh.indexer import language from swh.indexer.language import LanguageIndexer from swh.indexer.tests.utils import ( - CommonContentIndexerTest, CommonIndexerWithErrorsTest, - CommonIndexerNoTool, BASE_TEST_CONFIG, fill_storage, fill_obj_storage + CommonContentIndexerTest, + BASE_TEST_CONFIG, fill_storage, fill_obj_storage, filter_dict, ) -class LanguageTestIndexer(LanguageIndexer): - """Specific language whose configuration is enough to satisfy the - indexing tests. - """ - def parse_config_file(self, *args, **kwargs): - return { - **BASE_TEST_CONFIG, - 'tools': { - 'name': 'pygments', - 'version': '2.0.1+dfsg-1.1+deb8u1', - 'configuration': { - 'type': 'library', - 'debian-package': 'python3-pygments', - 'max_content_size': 10240, - }, - } - } +CONFIG = { + **BASE_TEST_CONFIG, + 'tools': { + 'name': 'pygments', + 'version': '2.0.1+dfsg-1.1+deb8u1', + 'configuration': { + 'type': 'library', + 'debian-package': 'python3-pygments', + 'max_content_size': 10240, + }, + } +} class Language(unittest.TestCase): """Tests pygments tool for language detection """ def test_compute_language_none(self): # given self.content = "" self.declared_language = { 'lang': None } # when result = language.compute_language(self.content) # then self.assertEqual(self.declared_language, result) class TestLanguageIndexer(CommonContentIndexerTest, unittest.TestCase): """Language indexer test scenarios: - Known sha1s in the input list have their data indexed - Unknown sha1 in the input list are not indexed """ legacy_get_format = True def get_indexer_results(self, ids): yield from self.indexer.idx_storage.content_language_get(ids) def setUp(self): - self.indexer = LanguageTestIndexer() + self.indexer = LanguageIndexer(config=CONFIG) fill_storage(self.indexer.storage) fill_obj_storage(self.indexer.objstorage) self.id0 = '02fb2c89e14f7fab46701478c83779c7beb7b069' self.id1 = '103bc087db1d26afc3a0283f38663d081e9b01e6' self.id2 = 'd4c647f0fc257591cc9ba1722484229780d1c607' tool = {k.replace('tool_', ''): v for (k, v) in self.indexer.tool.items()} self.expected_results = { self.id0: { 'id': self.id0, 'tool': tool, 'lang': 'python', }, self.id1: { 'id': self.id1, 'tool': tool, 'lang': 'c' }, self.id2: { 'id': self.id2, 'tool': tool, 'lang': 'text-only' } } -class LanguageIndexerUnknownToolTestStorage( - CommonIndexerNoTool, LanguageTestIndexer): - """Fossology license indexer with wrong configuration""" - - -class TestLanguageIndexersErrors( - CommonIndexerWithErrorsTest, unittest.TestCase): - """Test the indexer raise the right errors when wrongly initialized""" - Indexer = LanguageIndexerUnknownToolTestStorage +def test_language_w_no_tool(): + with pytest.raises(ValueError): + LanguageIndexer(config=filter_dict(CONFIG, 'tools')) diff --git a/swh/indexer/tests/test_mimetype.py b/swh/indexer/tests/test_mimetype.py index 39db531..f8647e7 100644 --- a/swh/indexer/tests/test_mimetype.py +++ b/swh/indexer/tests/test_mimetype.py @@ -1,173 +1,143 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import pytest import unittest from swh.indexer.mimetype import ( MimetypeIndexer, MimetypeRangeIndexer, compute_mimetype_encoding ) from swh.indexer.tests.utils import ( CommonContentIndexerTest, CommonContentIndexerRangeTest, - CommonIndexerWithErrorsTest, CommonIndexerNoTool, - BASE_TEST_CONFIG, fill_storage, fill_obj_storage + BASE_TEST_CONFIG, fill_storage, fill_obj_storage, filter_dict, ) class BasicTest(unittest.TestCase): def test_compute_mimetype_encoding(self): """Compute mimetype encoding should return results""" for _input, _mimetype, _encoding in [ ('du français'.encode(), 'text/plain', 'utf-8'), (b'def __init__(self):', 'text/x-python', 'us-ascii')]: actual_result = compute_mimetype_encoding(_input) self.assertEqual(actual_result, { 'mimetype': _mimetype, 'encoding': _encoding }) -class MimetypeTestIndexer(MimetypeIndexer): - """Specific mimetype indexer instance whose configuration is enough to - satisfy the indexing tests. - - """ - def parse_config_file(self, *args, **kwargs): - return { - **BASE_TEST_CONFIG, - 'tools': { - 'name': 'file', - 'version': '1:5.30-1+deb9u1', - 'configuration': { - "type": "library", - "debian-package": "python3-magic" - }, - }, - } +CONFIG = { + **BASE_TEST_CONFIG, + 'tools': { + 'name': 'file', + 'version': '1:5.30-1+deb9u1', + 'configuration': { + "type": "library", + "debian-package": "python3-magic" + }, + }, +} class TestMimetypeIndexer(CommonContentIndexerTest, unittest.TestCase): """Mimetype indexer test scenarios: - Known sha1s in the input list have their data indexed - Unknown sha1 in the input list are not indexed """ legacy_get_format = True def get_indexer_results(self, ids): yield from self.idx_storage.content_mimetype_get(ids) def setUp(self): - self.indexer = MimetypeTestIndexer() + self.indexer = MimetypeIndexer(config=CONFIG) self.idx_storage = self.indexer.idx_storage fill_storage(self.indexer.storage) fill_obj_storage(self.indexer.objstorage) self.id0 = '01c9379dfc33803963d07c1ccc748d3fe4c96bb5' self.id1 = '688a5ef812c53907562fe379d4b3851e69c7cb15' self.id2 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709' tool = {k.replace('tool_', ''): v for (k, v) in self.indexer.tool.items()} self.expected_results = { self.id0: { 'id': self.id0, 'tool': tool, 'mimetype': 'text/plain', 'encoding': 'us-ascii', }, self.id1: { 'id': self.id1, 'tool': tool, 'mimetype': 'text/plain', 'encoding': 'us-ascii', }, self.id2: { 'id': self.id2, 'tool': tool, 'mimetype': 'application/x-empty', 'encoding': 'binary', } } -class MimetypeRangeIndexerTest(MimetypeRangeIndexer): - """Specific mimetype whose configuration is enough to satisfy the - indexing tests. - - """ - def parse_config_file(self, *args, **kwargs): - return { - **BASE_TEST_CONFIG, - 'tools': { - 'name': 'file', - 'version': '1:5.30-1+deb9u1', - 'configuration': { - "type": "library", - "debian-package": "python3-magic" - }, - }, - 'write_batch_size': 100, - } +RANGE_CONFIG = dict(list(CONFIG.items()) + [('write_batch_size', 100)]) class TestMimetypeRangeIndexer( CommonContentIndexerRangeTest, unittest.TestCase): """Range Mimetype Indexer tests. - new data within range are indexed - no data outside a range are indexed - with filtering existing indexed data prior to compute new index - without filtering existing indexed data prior to compute new index """ def setUp(self): super().setUp() - self.indexer = MimetypeRangeIndexerTest() + self.indexer = MimetypeRangeIndexer(config=RANGE_CONFIG) fill_storage(self.indexer.storage) fill_obj_storage(self.indexer.objstorage) self.id0 = '01c9379dfc33803963d07c1ccc748d3fe4c96bb5' self.id1 = '02fb2c89e14f7fab46701478c83779c7beb7b069' self.id2 = '103bc087db1d26afc3a0283f38663d081e9b01e6' tool_id = self.indexer.tool['id'] self.expected_results = { self.id0: { 'encoding': 'us-ascii', 'id': self.id0, 'indexer_configuration_id': tool_id, 'mimetype': 'text/plain'}, self.id1: { 'encoding': 'us-ascii', 'id': self.id1, 'indexer_configuration_id': tool_id, 'mimetype': 'text/x-python'}, self.id2: { 'encoding': 'us-ascii', 'id': self.id2, 'indexer_configuration_id': tool_id, 'mimetype': 'text/plain'} } -class MimetypeIndexerUnknownToolTestStorage( - CommonIndexerNoTool, MimetypeTestIndexer): - """Mimetype indexer with wrong configuration""" - - -class MimetypeRangeIndexerUnknownToolTestStorage( - CommonIndexerNoTool, MimetypeRangeIndexerTest): - """Mimetype range indexer with wrong configuration""" +def test_mimetype_w_no_tool(): + with pytest.raises(ValueError): + MimetypeIndexer(config=filter_dict(CONFIG, 'tools')) -class TestMimetypeIndexersErrors( - CommonIndexerWithErrorsTest, unittest.TestCase): - """Test the indexer raise the right errors when wrongly initialized""" - Indexer = MimetypeIndexerUnknownToolTestStorage - RangeIndexer = MimetypeRangeIndexerUnknownToolTestStorage +def test_mimetype_range_w_no_tool(): + with pytest.raises(ValueError): + MimetypeRangeIndexer(config=filter_dict(CONFIG, 'tools')) diff --git a/swh/indexer/tests/utils.py b/swh/indexer/tests/utils.py index 63fce88..c374307 100644 --- a/swh/indexer/tests/utils.py +++ b/swh/indexer/tests/utils.py @@ -1,660 +1,640 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import datetime import hashlib import random from swh.model import hashutil from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.indexer.storage import INDEXER_CFG_KEY BASE_TEST_CONFIG = { 'storage': { 'cls': 'memory', 'args': { }, }, 'objstorage': { 'cls': 'memory', 'args': { }, }, INDEXER_CFG_KEY: { 'cls': 'memory', 'args': { }, }, } ORIGINS = [ { 'id': 52189575, 'lister': None, 'project': None, 'type': 'git', 'url': 'https://github.com/SoftwareHeritage/swh-storage'}, { 'id': 4423668, 'lister': None, 'project': None, 'type': 'ftp', 'url': 'rsync://ftp.gnu.org/gnu/3dldf'}, { 'id': 77775770, 'lister': None, 'project': None, 'type': 'deposit', 'url': 'https://forge.softwareheritage.org/source/jesuisgpl/'}, { 'id': 85072327, 'lister': None, 'project': None, 'type': 'pypi', 'url': 'https://pypi.org/project/limnoria/'}, { 'id': 49908349, 'lister': None, 'project': None, 'type': 'svn', 'url': 'http://0-512-md.googlecode.com/svn/'}, { 'id': 54974445, 'lister': None, 'project': None, 'type': 'git', 'url': 'https://github.com/librariesio/yarn-parser'}, ] SNAPSHOTS = { 52189575: { 'branches': { b'refs/heads/add-revision-origin-cache': { 'target': b'L[\xce\x1c\x88\x8eF\t\xf1"\x19\x1e\xfb\xc0' b's\xe7/\xe9l\x1e', 'target_type': 'revision'}, b'HEAD': { 'target': b'8K\x12\x00d\x03\xcc\xe4]bS\xe3\x8f{\xd7}' b'\xac\xefrm', 'target_type': 'revision'}, b'refs/tags/v0.0.103': { 'target': b'\xb6"Im{\xfdLb\xb0\x94N\xea\x96m\x13x\x88+' b'\x0f\xdd', 'target_type': 'release'}, }}, 4423668: { 'branches': { b'3DLDF-1.1.4.tar.gz': { 'target': b'dJ\xfb\x1c\x91\xf4\x82B%]6\xa2\x90|\xd3\xfc' b'"G\x99\x11', 'target_type': 'revision'}, b'3DLDF-2.0.2.tar.gz': { 'target': b'\xb6\x0e\xe7\x9e9\xac\xaa\x19\x9e=' b'\xd1\xc5\x00\\\xc6\xfc\xe0\xa6\xb4V', 'target_type': 'revision'}, b'3DLDF-2.0.3-examples.tar.gz': { 'target': b'!H\x19\xc0\xee\x82-\x12F1\xbd\x97' b'\xfe\xadZ\x80\x80\xc1\x83\xff', 'target_type': 'revision'}, b'3DLDF-2.0.3.tar.gz': { 'target': b'\x8e\xa9\x8e/\xea}\x9feF\xf4\x9f\xfd\xee' b'\xcc\x1a\xb4`\x8c\x8by', 'target_type': 'revision'}, b'3DLDF-2.0.tar.gz': { 'target': b'F6*\xff(?\x19a\xef\xb6\xc2\x1fv$S\xe3G' b'\xd3\xd1m', b'target_type': 'revision'} }}, 77775770: { 'branches': { b'master': { 'target': b'\xe7n\xa4\x9c\x9f\xfb\xb7\xf76\x11\x08{' b'\xa6\xe9\x99\xb1\x9e]q\xeb', 'target_type': 'revision'} }, 'id': b"h\xc0\xd2a\x04\xd4~'\x8d\xd6\xbe\x07\xeda\xfa\xfbV" b"\x1d\r "}, 85072327: { 'branches': { b'HEAD': { 'target': b'releases/2018.09.09', 'target_type': 'alias'}, b'releases/2018.09.01': { 'target': b'<\xee1(\xe8\x8d_\xc1\xc9\xa6rT\xf1\x1d' b'\xbb\xdfF\xfdw\xcf', 'target_type': 'revision'}, b'releases/2018.09.09': { 'target': b'\x83\xb9\xb6\xc7\x05\xb1%\xd0\xfem\xd8k' b'A\x10\x9d\xc5\xfa2\xf8t', 'target_type': 'revision'}}, 'id': b'{\xda\x8e\x84\x7fX\xff\x92\x80^\x93V\x18\xa3\xfay' b'\x12\x9e\xd6\xb3'}, 49908349: { 'branches': { b'master': { 'target': b'\xe4?r\xe1,\x88\xab\xec\xe7\x9a\x87\xb8' b'\xc9\xad#.\x1bw=\x18', 'target_type': 'revision'}}, 'id': b'\xa1\xa2\x8c\n\xb3\x87\xa8\xf9\xe0a\x8c\xb7' b'\x05\xea\xb8\x1f\xc4H\xf4s'}, 54974445: { 'branches': { b'HEAD': { 'target': hash_to_bytes( '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'), 'target_type': 'revision'}}} } REVISIONS = [{ 'id': hash_to_bytes('8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f'), 'author': { 'id': 26, 'name': b'Andrew Nesbitt', 'fullname': b'Andrew Nesbitt ', 'email': b'andrewnez@gmail.com' }, 'committer': { 'id': 26, 'name': b'Andrew Nesbitt', 'fullname': b'Andrew Nesbitt ', 'email': b'andrewnez@gmail.com' }, 'synthetic': False, 'date': { 'negative_utc': False, 'timestamp': { 'seconds': 1487596456, 'microseconds': 0 }, 'offset': 0 }, 'directory': b'10' }] DIRECTORY_ID = b'10' DIRECTORY = [{ 'sha1_git': b'abc', 'name': b'index.js', 'target': b'abc', 'length': 897, 'status': 'visible', 'type': 'file', 'perms': 33188, 'sha1': b'bcd' }, { 'sha1_git': b'aab', 'name': b'package.json', 'target': b'aab', 'length': 712, 'status': 'visible', 'type': 'file', 'perms': 33188, 'sha1': b'cde' }, { 'target': b'11', 'type': 'dir', 'length': None, 'name': b'.github', 'sha1': None, 'perms': 16384, 'sha1_git': None, 'status': None, 'sha256': None } ] SHA1_TO_LICENSES = { '01c9379dfc33803963d07c1ccc748d3fe4c96bb5': ['GPL'], '02fb2c89e14f7fab46701478c83779c7beb7b069': ['Apache2.0'], '103bc087db1d26afc3a0283f38663d081e9b01e6': ['MIT'], '688a5ef812c53907562fe379d4b3851e69c7cb15': ['AGPL'], 'da39a3ee5e6b4b0d3255bfef95601890afd80709': [], } SHA1_TO_CTAGS = { '01c9379dfc33803963d07c1ccc748d3fe4c96bb5': [{ 'name': 'foo', 'kind': 'str', 'line': 10, 'lang': 'bar', }], 'd4c647f0fc257591cc9ba1722484229780d1c607': [{ 'name': 'let', 'kind': 'int', 'line': 100, 'lang': 'haskell', }], '688a5ef812c53907562fe379d4b3851e69c7cb15': [{ 'name': 'symbol', 'kind': 'float', 'line': 99, 'lang': 'python', }], } OBJ_STORAGE_DATA = { '01c9379dfc33803963d07c1ccc748d3fe4c96bb5': b'this is some text', '688a5ef812c53907562fe379d4b3851e69c7cb15': b'another text', '8986af901dd2043044ce8f0d8fc039153641cf17': b'yet another text', '02fb2c89e14f7fab46701478c83779c7beb7b069': b""" import unittest import logging from swh.indexer.mimetype import MimetypeIndexer from swh.indexer.tests.test_utils import MockObjStorage class MockStorage(): def content_mimetype_add(self, mimetypes): self.state = mimetypes self.conflict_update = conflict_update def indexer_configuration_add(self, tools): return [{ 'id': 10, }] """, '103bc087db1d26afc3a0283f38663d081e9b01e6': b""" #ifndef __AVL__ #define __AVL__ typedef struct _avl_tree avl_tree; typedef struct _data_t { int content; } data_t; """, '93666f74f1cf635c8c8ac118879da6ec5623c410': b""" (should 'pygments (recognize 'lisp 'easily)) """, '26a9f72a7c87cc9205725cfd879f514ff4f3d8d5': b""" { "name": "test_metadata", "version": "0.0.1", "description": "Simple package.json test for indexer", "repository": { "type": "git", "url": "https://github.com/moranegg/metadata_test" } } """, 'd4c647f0fc257591cc9ba1722484229780d1c607': b""" { "version": "5.0.3", "name": "npm", "description": "a package manager for JavaScript", "keywords": [ "install", "modules", "package manager", "package.json" ], "preferGlobal": true, "config": { "publishtest": false }, "homepage": "https://docs.npmjs.com/", "author": "Isaac Z. Schlueter (http://blog.izs.me)", "repository": { "type": "git", "url": "https://github.com/npm/npm" }, "bugs": { "url": "https://github.com/npm/npm/issues" }, "dependencies": { "JSONStream": "~1.3.1", "abbrev": "~1.1.0", "ansi-regex": "~2.1.1", "ansicolors": "~0.3.2", "ansistyles": "~0.1.3" }, "devDependencies": { "tacks": "~1.2.6", "tap": "~10.3.2" }, "license": "Artistic-2.0" } """, 'a7ab314d8a11d2c93e3dcf528ca294e7b431c449': b""" """, 'da39a3ee5e6b4b0d3255bfef95601890afd80709': b'', '636465': b""" { "name": "yarn-parser", "version": "1.0.0", "description": "Tiny web service for parsing yarn.lock files", "main": "index.js", "scripts": { "start": "node index.js", "test": "mocha" }, "engines": { "node": "9.8.0" }, "repository": { "type": "git", "url": "git+https://github.com/librariesio/yarn-parser.git" }, "keywords": [ "yarn", "parse", "lock", "dependencies" ], "author": "Andrew Nesbitt", "license": "AGPL-3.0", "bugs": { "url": "https://github.com/librariesio/yarn-parser/issues" }, "homepage": "https://github.com/librariesio/yarn-parser#readme", "dependencies": { "@yarnpkg/lockfile": "^1.0.0", "body-parser": "^1.15.2", "express": "^4.14.0" }, "devDependencies": { "chai": "^4.1.2", "mocha": "^5.2.0", "request": "^2.87.0", "test": "^0.6.0" } } """ } CONTENT_METADATA = [{ 'tool': { 'configuration': { 'type': 'local', 'context': 'NpmMapping' }, 'version': '0.0.1', 'id': 6, 'name': 'swh-metadata-translator' }, 'id': b'cde', 'translated_metadata': { '@context': 'https://doi.org/10.5063/schema/codemeta-2.0', 'type': 'SoftwareSourceCode', 'codemeta:issueTracker': 'https://github.com/librariesio/yarn-parser/issues', 'version': '1.0.0', 'name': 'yarn-parser', 'schema:author': 'Andrew Nesbitt', 'url': 'https://github.com/librariesio/yarn-parser#readme', 'processorRequirements': {'node': '7.5'}, 'license': 'AGPL-3.0', 'keywords': ['yarn', 'parse', 'lock', 'dependencies'], 'schema:codeRepository': 'git+https://github.com/librariesio/yarn-parser.git', 'description': 'Tiny web service for parsing yarn.lock files', } }] +def filter_dict(d, keys): + 'return a copy of the dict with keys deleted' + if not isinstance(keys, (list, tuple)): + keys = (keys, ) + return dict((k, v) for (k, v) in d.items() if k not in keys) + + def fill_obj_storage(obj_storage): """Add some content in an object storage.""" for (obj_id, content) in OBJ_STORAGE_DATA.items(): obj_storage.add(content, obj_id=hash_to_bytes(obj_id)) def fill_storage(storage): for origin in ORIGINS: origin = origin.copy() del origin['id'] storage.origin_add_one(origin) for (orig_pseudo_id, snap) in SNAPSHOTS.items(): for orig in ORIGINS: if orig_pseudo_id == orig['id']: origin_id = storage.origin_get( {'type': orig['type'], 'url': orig['url']})['id'] break else: assert False visit = storage.origin_visit_add(origin_id, datetime.datetime.now()) snap_id = snap.get('id') or \ bytes([random.randint(0, 255) for _ in range(32)]) storage.snapshot_add(origin_id, visit['visit'], { 'id': snap_id, 'branches': snap['branches'] }) storage.revision_add(REVISIONS) storage.directory_add([{ 'id': DIRECTORY_ID, 'entries': DIRECTORY, }]) for (obj_id, content) in OBJ_STORAGE_DATA.items(): # TODO: use MultiHash if hasattr(hashlib, 'blake2s'): blake2s256 = hashlib.blake2s(content, digest_size=32).digest() else: # fallback for Python <3.6 blake2s256 = bytes([random.randint(0, 255) for _ in range(32)]) storage.content_add([{ 'data': content, 'length': len(content), 'status': 'visible', 'sha1': hash_to_bytes(obj_id), 'sha1_git': hash_to_bytes(obj_id), 'sha256': hashlib.sha256(content).digest(), 'blake2s256': blake2s256 }]) -class CommonIndexerNoTool: - """Mixin to wronly initialize content indexer""" - def prepare(self): - super().prepare() - self.tools = None - - -class CommonIndexerWithErrorsTest: - """Test indexer configuration checks. - - """ - Indexer = None - RangeIndexer = None - - def test_wrong_unknown_configuration_tool(self): - """Indexer with unknown configuration tool fails check""" - with self.assertRaisesRegex(ValueError, 'Tools None is unknown'): - print('indexer: %s' % self.Indexer) - self.Indexer() - - def test_wrong_unknown_configuration_tool_range(self): - """Range Indexer with unknown configuration tool fails check""" - if self.RangeIndexer is not None: - with self.assertRaisesRegex(ValueError, 'Tools None is unknown'): - self.RangeIndexer() - - class CommonContentIndexerTest(metaclass=abc.ABCMeta): legacy_get_format = False """True iff the tested indexer uses the legacy format. see: https://forge.softwareheritage.org/T1433""" def get_indexer_results(self, ids): """Override this for indexers that don't have a mock storage.""" return self.indexer.idx_storage.state def assert_legacy_results_ok(self, sha1s, expected_results=None): # XXX old format, remove this when all endpoints are # updated to the new one # see: https://forge.softwareheritage.org/T1433 sha1s = [sha1 if isinstance(sha1, bytes) else hash_to_bytes(sha1) for sha1 in sha1s] actual_results = list(self.get_indexer_results(sha1s)) if expected_results is None: expected_results = self.expected_results self.assertEqual(len(expected_results), len(actual_results), (expected_results, actual_results)) for indexed_data in actual_results: _id = indexed_data['id'] expected_data = expected_results[hashutil.hash_to_hex(_id)].copy() expected_data['id'] = _id self.assertEqual(indexed_data, expected_data) def assert_results_ok(self, sha1s, expected_results=None): if self.legacy_get_format: self.assert_legacy_results_ok(sha1s, expected_results) return sha1s = [sha1 if isinstance(sha1, bytes) else hash_to_bytes(sha1) for sha1 in sha1s] actual_results = list(self.get_indexer_results(sha1s)) if expected_results is None: expected_results = self.expected_results self.assertEqual(len(expected_results), len(actual_results), (expected_results, actual_results)) for indexed_data in actual_results: (_id, indexed_data) = list(indexed_data.items())[0] expected_data = expected_results[hashutil.hash_to_hex(_id)].copy() expected_data = [expected_data] self.assertEqual(indexed_data, expected_data) def test_index(self): """Known sha1 have their data indexed """ sha1s = [self.id0, self.id1, self.id2] # when self.indexer.run(sha1s, policy_update='update-dups') self.assert_results_ok(sha1s) # 2nd pass self.indexer.run(sha1s, policy_update='ignore-dups') self.assert_results_ok(sha1s) def test_index_one_unknown_sha1(self): """Unknown sha1 are not indexed""" sha1s = [self.id1, '799a5ef812c53907562fe379d4b3851e69c7cb15', # unknown '800a5ef812c53907562fe379d4b3851e69c7cb15'] # unknown # when self.indexer.run(sha1s, policy_update='update-dups') # then expected_results = { k: v for k, v in self.expected_results.items() if k in sha1s } self.assert_results_ok(sha1s, expected_results) class CommonContentIndexerRangeTest: """Allows to factorize tests on range indexer. """ def setUp(self): self.contents = sorted(OBJ_STORAGE_DATA) def assert_results_ok(self, start, end, actual_results, expected_results=None): if expected_results is None: expected_results = self.expected_results actual_results = list(actual_results) for indexed_data in actual_results: _id = indexed_data['id'] assert isinstance(_id, bytes) indexed_data = indexed_data.copy() indexed_data['id'] = hash_to_hex(indexed_data['id']) self.assertEqual(indexed_data, expected_results[hash_to_hex(_id)]) self.assertTrue(start <= _id <= end) _tool_id = indexed_data['indexer_configuration_id'] self.assertEqual(_tool_id, self.indexer.tool['id']) def test__index_contents(self): """Indexing contents without existing data results in indexed data """ _start, _end = [self.contents[0], self.contents[2]] # output hex ids start, end = map(hashutil.hash_to_bytes, (_start, _end)) # given actual_results = list(self.indexer._index_contents( start, end, indexed={})) self.assert_results_ok(start, end, actual_results) def test__index_contents_with_indexed_data(self): """Indexing contents with existing data results in less indexed data """ _start, _end = [self.contents[0], self.contents[2]] # output hex ids start, end = map(hashutil.hash_to_bytes, (_start, _end)) data_indexed = [self.id0, self.id2] # given actual_results = self.indexer._index_contents( start, end, indexed=set(map(hash_to_bytes, data_indexed))) # craft the expected results expected_results = self.expected_results.copy() for already_indexed_key in data_indexed: expected_results.pop(already_indexed_key) self.assert_results_ok( start, end, actual_results, expected_results) def test_generate_content_get(self): """Optimal indexing should result in indexed data """ _start, _end = [self.contents[0], self.contents[2]] # output hex ids start, end = map(hashutil.hash_to_bytes, (_start, _end)) # given actual_results = self.indexer.run(start, end) # then self.assertTrue(actual_results) def test_generate_content_get_input_as_bytes(self): """Optimal indexing should result in indexed data Input are in bytes here. """ _start, _end = [self.contents[0], self.contents[2]] # output hex ids start, end = map(hashutil.hash_to_bytes, (_start, _end)) # given actual_results = self.indexer.run( # checks the bytes input this time start, end, skip_existing=False) # no already indexed data so same result as prior test # then self.assertTrue(actual_results) def test_generate_content_get_no_result(self): """No result indexed returns False""" _start, _end = ['0000000000000000000000000000000000000000', '0000000000000000000000000000000000000001'] start, end = map(hashutil.hash_to_bytes, (_start, _end)) # given actual_results = self.indexer.run( start, end, incremental=False) # then self.assertFalse(actual_results)