diff --git a/swh/indexer/__init__.py b/swh/indexer/__init__.py index 0ea142a..7a7e0d3 100644 --- a/swh/indexer/__init__.py +++ b/swh/indexer/__init__.py @@ -1,57 +1,56 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information INDEXER_CLASSES = { 'mimetype': 'swh.indexer.mimetype.ContentMimetypeIndexer', 'language': 'swh.indexer.language.ContentLanguageIndexer', 'ctags': 'swh.indexer.ctags.CtagsIndexer', 'fossology_license': 'swh.indexer.fossology_license.ContentFossologyLicenseIndexer', } TASK_NAMES = { 'orchestrator_all': 'swh.indexer.tasks.SWHOrchestratorAllContentsTask', 'orchestrator_text': 'swh.indexer.tasks.SWHOrchestratorTextContentsTask', 'mimetype': 'swh.indexer.tasks.SWHContentMimetypeTask', 'language': 'swh.indexer.tasks.SWHContentLanguageTask', 'ctags': 'swh.indexer.tasks.SWHCtagsTask', 'fossology_license': 'swh.indexer.tasks.SWHContentFossologyLicenseTask', 'rehash': 'swh.indexer.tasks.SWHRecomputeChecksumsTask', } __all__ = [ 'INDEXER_CLASSES', 'TASK_NAMES', ] def get_storage(cls, args): - """ - Get a storage object of class `storage_class` with arguments - `storage_args`. + """Get an indexer storage object of class `storage_class` with + arguments `storage_args`. Args: storage (dict): dictionary with keys: - cls (str): storage's class, either 'local' or 'remote' - args (dict): dictionary with keys Returns: an instance of swh.indexer's storage (either local or remote) Raises: ValueError if passed an unknown storage class. 
""" if cls == 'remote': - from .api.client import RemoteStorage as IndexerStorage + from .storage.api.client import RemoteStorage as IndexerStorage elif cls == 'local': from .storage import IndexerStorage else: raise ValueError('Unknown storage class `%s`' % cls) return IndexerStorage(**args) diff --git a/swh/indexer/storage.py b/swh/indexer/storage/__init__.py similarity index 100% rename from swh/indexer/storage.py rename to swh/indexer/storage/__init__.py diff --git a/swh/indexer/api/__init__.py b/swh/indexer/storage/api/__init__.py similarity index 100% copy from swh/indexer/api/__init__.py copy to swh/indexer/storage/api/__init__.py diff --git a/swh/indexer/api/client.py b/swh/indexer/storage/api/client.py similarity index 100% rename from swh/indexer/api/client.py rename to swh/indexer/storage/api/client.py diff --git a/swh/indexer/api/server.py b/swh/indexer/storage/api/server.py similarity index 100% rename from swh/indexer/api/server.py rename to swh/indexer/storage/api/server.py diff --git a/swh/indexer/converters.py b/swh/indexer/storage/converters.py similarity index 100% rename from swh/indexer/converters.py rename to swh/indexer/storage/converters.py diff --git a/swh/indexer/db.py b/swh/indexer/storage/db.py similarity index 100% rename from swh/indexer/db.py rename to swh/indexer/storage/db.py diff --git a/swh/indexer/tests/common.py b/swh/indexer/tests/common.py deleted file mode 100644 index e397b48..0000000 --- a/swh/indexer/tests/common.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright (C) 2015-2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import pathlib - -from swh.indexer import get_storage - - -class StorageTestFixture: - """Mix this in a test subject class to get Storage testing support. 
- - This fixture requires to come before DbTestFixture in the inheritance list - as it uses its methods to setup its own internal database. - - Usage example: - - class TestStorage(StorageTestFixture, DbTestFixture): - ... - """ - TEST_STORAGE_DB_NAME = 'softwareheritage-test-indexer' - - @classmethod - def setUpClass(cls): - if not hasattr(cls, 'DB_TEST_FIXTURE_IMPORTED'): - raise RuntimeError("StorageTestFixture needs to be followed by " - "DbTestFixture in the inheritance list.") - - test_dir = pathlib.Path(__file__).absolute().parent - test_data_dir = test_dir / '../../../../swh-storage-testdata' - test_db_dump = (test_data_dir / 'dumps/swh.dump').absolute() - cls.add_db(cls.TEST_STORAGE_DB_NAME, str(test_db_dump), 'pg_dump') - super().setUpClass() - - def setUp(self): - super().setUp() - - self.storage_config = { - 'cls': 'local', - 'args': { - 'db': self.test_db[self.TEST_STORAGE_DB_NAME].conn, - }, - } - self.storage = get_storage(**self.storage_config) - - def tearDown(self): - self.objtmp.cleanup() - super().tearDown() - - def reset_storage_tables(self): - excluded = {'indexer_configuration'} - self.reset_db_tables(self.TEST_STORAGE_DB_NAME, excluded=excluded) - - db = self.test_db[self.TEST_STORAGE_DB_NAME] - db.conn.commit() diff --git a/swh/indexer/api/__init__.py b/swh/indexer/tests/storage/__init__.py similarity index 100% rename from swh/indexer/api/__init__.py rename to swh/indexer/tests/storage/__init__.py diff --git a/swh/indexer/tests/test_api_client.py b/swh/indexer/tests/storage/test_api_client.py similarity index 83% rename from swh/indexer/tests/test_api_client.py rename to swh/indexer/tests/storage/test_api_client.py index 5c77994..9e47975 100644 --- a/swh/indexer/tests/test_api_client.py +++ b/swh/indexer/tests/storage/test_api_client.py @@ -1,35 +1,36 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any 
later version # See top-level LICENSE file for more information import unittest from .test_storage import CommonTestStorage from swh.storage.tests.server_testing import ServerTestFixture -from swh.indexer.api.client import RemoteStorage -from swh.indexer.api.server import app +from swh.indexer.storage.api.client import RemoteStorage +from swh.indexer.storage.api.server import app class TestRemoteStorage(CommonTestStorage, ServerTestFixture, unittest.TestCase): """Test the indexer's remote storage API. This class doesn't define any tests as we want identical functionality between local and remote storage. All the tests are - therefore defined in CommonTestStorage. + therefore defined in + `class`:swh.indexer.storage.test_storage.CommonTestStorage. """ def setUp(self): self.config = { 'storage': { 'cls': 'local', 'args': { 'db': 'dbname=%s' % self.TEST_STORAGE_DB_NAME, } } } self.app = app super().setUp() self.storage = RemoteStorage(self.url()) diff --git a/swh/indexer/tests/test_converters.py b/swh/indexer/tests/storage/test_converters.py similarity index 99% rename from swh/indexer/tests/test_converters.py rename to swh/indexer/tests/storage/test_converters.py index 61410eb..89946d4 100644 --- a/swh/indexer/tests/test_converters.py +++ b/swh/indexer/tests/storage/test_converters.py @@ -1,199 +1,199 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from nose.tools import istest from nose.plugins.attrib import attr -from swh.indexer import converters +from swh.indexer.storage import converters @attr('!db') class TestConverters(unittest.TestCase): def setUp(self): self.maxDiff = None @istest def ctags_to_db(self): input_ctag = { 'id': b'some-id', 'indexer_configuration_id': 100, 'ctags': [ { 'name': 'some-name', 'kind': 'some-kind', 'line': 10, 'lang': 
'Yaml', }, { 'name': 'main', 'kind': 'function', 'line': 12, 'lang': 'Yaml', }, ] } expected_ctags = [ { 'id': b'some-id', 'name': 'some-name', 'kind': 'some-kind', 'line': 10, 'lang': 'Yaml', 'indexer_configuration_id': 100, }, { 'id': b'some-id', 'name': 'main', 'kind': 'function', 'line': 12, 'lang': 'Yaml', 'indexer_configuration_id': 100, }] # when actual_ctags = list(converters.ctags_to_db(input_ctag)) # then self.assertEquals(actual_ctags, expected_ctags) @istest def db_to_ctags(self): input_ctags = { 'id': b'some-id', 'name': 'some-name', 'kind': 'some-kind', 'line': 10, 'lang': 'Yaml', 'tool_id': 200, 'tool_name': 'some-toolname', 'tool_version': 'some-toolversion', 'tool_configuration': {} } expected_ctags = { 'id': b'some-id', 'name': 'some-name', 'kind': 'some-kind', 'line': 10, 'lang': 'Yaml', 'tool': { 'id': 200, 'name': 'some-toolname', 'version': 'some-toolversion', 'configuration': {}, } } # when actual_ctags = converters.db_to_ctags(input_ctags) # then self.assertEquals(actual_ctags, expected_ctags) @istest def db_to_mimetype(self): input_mimetype = { 'id': b'some-id', 'tool_id': 10, 'tool_name': 'some-toolname', 'tool_version': 'some-toolversion', 'tool_configuration': {}, 'encoding': b'ascii', 'mimetype': b'text/plain', } expected_mimetype = { 'id': b'some-id', 'encoding': b'ascii', 'mimetype': b'text/plain', 'tool': { 'id': 10, 'name': 'some-toolname', 'version': 'some-toolversion', 'configuration': {}, } } actual_mimetype = converters.db_to_mimetype(input_mimetype) self.assertEquals(actual_mimetype, expected_mimetype) @istest def db_to_language(self): input_language = { 'id': b'some-id', 'tool_id': 20, 'tool_name': 'some-toolname', 'tool_version': 'some-toolversion', 'tool_configuration': {}, 'lang': b'css', } expected_language = { 'id': b'some-id', 'lang': b'css', 'tool': { 'id': 20, 'name': 'some-toolname', 'version': 'some-toolversion', 'configuration': {}, } } actual_language = converters.db_to_language(input_language) 
self.assertEquals(actual_language, expected_language) @istest def db_to_fossology_license(self): input_license = { 'id': b'some-id', 'tool_id': 20, 'tool_name': 'nomossa', 'tool_version': '5.22', 'tool_configuration': {}, 'licenses': ['GPL2.0'], } expected_license = { 'id': b'some-id', 'licenses': ['GPL2.0'], 'tool': { 'id': 20, 'name': 'nomossa', 'version': '5.22', 'configuration': {}, } } actual_license = converters.db_to_fossology_license(input_license) self.assertEquals(actual_license, expected_license) @istest def db_to_metadata(self): input_metadata = { 'id': b'some-id', 'tool_id': 20, 'tool_name': 'some-toolname', 'tool_version': 'some-toolversion', 'tool_configuration': {}, 'translated_metadata': b'translated_metadata', } expected_metadata = { 'id': b'some-id', 'translated_metadata': b'translated_metadata', 'tool': { 'id': 20, 'name': 'some-toolname', 'version': 'some-toolversion', 'configuration': {}, } } actual_metadata = converters.db_to_metadata(input_metadata) self.assertEquals(actual_metadata, expected_metadata) diff --git a/swh/indexer/tests/test_storage.py b/swh/indexer/tests/storage/test_storage.py similarity index 95% rename from swh/indexer/tests/test_storage.py rename to swh/indexer/tests/storage/test_storage.py index 7b95de5..b75b3b5 100644 --- a/swh/indexer/tests/test_storage.py +++ b/swh/indexer/tests/storage/test_storage.py @@ -1,1446 +1,1504 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import pathlib import unittest from nose.tools import istest from nose.plugins.attrib import attr from swh.model.hashutil import hash_to_bytes +from swh.indexer import get_storage from swh.core.tests.db_testing import DbTestFixture -from .test_utils import StorageTestFixture + + +PATH_TO_STORAGE_TEST_DATA = '../../../../../swh-storage-testdata' + + 
+class StorageTestFixture: + """Mix this in a test subject class to get Storage testing support. + + This fixture requires to come before DbTestFixture in the inheritance list + as it uses its methods to setup its own internal database. + + Usage example: + + class TestStorage(StorageTestFixture, DbTestFixture): + ... + """ + TEST_STORAGE_DB_NAME = 'softwareheritage-test-indexer' + + @classmethod + def setUpClass(cls): + if not hasattr(cls, 'DB_TEST_FIXTURE_IMPORTED'): + raise RuntimeError("StorageTestFixture needs to be followed by " + "DbTestFixture in the inheritance list.") + + test_dir = pathlib.Path(__file__).absolute().parent + test_data_dir = test_dir / PATH_TO_STORAGE_TEST_DATA + test_db_dump = (test_data_dir / 'dumps/swh-indexer.dump').absolute() + cls.add_db(cls.TEST_STORAGE_DB_NAME, str(test_db_dump), 'pg_dump') + super().setUpClass() + + def setUp(self): + super().setUp() + + self.storage_config = { + 'cls': 'local', + 'args': { + 'db': self.test_db[self.TEST_STORAGE_DB_NAME].conn, + }, + } + self.storage = get_storage(**self.storage_config) + + def tearDown(self): + super().tearDown() + + def reset_storage_tables(self): + excluded = {'indexer_configuration'} + self.reset_db_tables(self.TEST_STORAGE_DB_NAME, excluded=excluded) + + db = self.test_db[self.TEST_STORAGE_DB_NAME] + db.conn.commit() class BaseTestStorage(StorageTestFixture, DbTestFixture): def setUp(self): super().setUp() db = self.test_db[self.TEST_STORAGE_DB_NAME] self.conn = db.conn self.cursor = db.cursor self.sha1_1 = hash_to_bytes('34973274ccef6ab4dfaaf86599792fa9c3fe4689') self.sha1_2 = hash_to_bytes('61c2b3a30496d329e21af70dd2d7e097046d07b7') self.revision_id_1 = hash_to_bytes( '7026b7c1a2af56521e951c01ed20f255fa054238') self.revision_id_2 = hash_to_bytes( '7026b7c1a2af56521e9587659012345678904321') def tearDown(self): self.reset_storage_tables() super().tearDown() def fetch_tools(self): tools = {} self.cursor.execute(''' select tool_name, id, tool_version, tool_configuration from 
indexer_configuration order by id''') for row in self.cursor.fetchall(): key = row[0] while key in tools: key = '_' + key tools[key] = { 'id': row[1], 'name': row[0], 'version': row[2], 'configuration': row[3] } return tools @attr('db') class CommonTestStorage(BaseTestStorage): """Base class for Indexer Storage testing. """ @istest def check_config(self): self.assertTrue(self.storage.check_config(check_write=True)) self.assertTrue(self.storage.check_config(check_write=False)) @istest def content_mimetype_missing(self): # given tools = self.fetch_tools() tool_id = tools['file']['id'] mimetypes = [ { 'id': self.sha1_1, 'indexer_configuration_id': tool_id, }, { 'id': self.sha1_2, 'indexer_configuration_id': tool_id, }] # when actual_missing = self.storage.content_mimetype_missing(mimetypes) # then self.assertEqual(list(actual_missing), [ self.sha1_1, self.sha1_2, ]) # given self.storage.content_mimetype_add([{ 'id': self.sha1_2, 'mimetype': b'text/plain', 'encoding': b'utf-8', 'indexer_configuration_id': tool_id, }]) # when actual_missing = self.storage.content_mimetype_missing(mimetypes) # then self.assertEqual(list(actual_missing), [self.sha1_1]) @istest def content_mimetype_add__drop_duplicate(self): # given tools = self.fetch_tools() tool_id = tools['file']['id'] mimetype_v1 = { 'id': self.sha1_2, 'mimetype': b'text/plain', 'encoding': b'utf-8', 'indexer_configuration_id': tool_id, } # given self.storage.content_mimetype_add([mimetype_v1]) # when actual_mimetypes = list(self.storage.content_mimetype_get( [self.sha1_2])) # then expected_mimetypes_v1 = [{ 'id': self.sha1_2, 'mimetype': b'text/plain', 'encoding': b'utf-8', 'tool': tools['file'], }] self.assertEqual(actual_mimetypes, expected_mimetypes_v1) # given mimetype_v2 = mimetype_v1.copy() mimetype_v2.update({ 'mimetype': b'text/html', 'encoding': b'us-ascii', }) self.storage.content_mimetype_add([mimetype_v2]) actual_mimetypes = list(self.storage.content_mimetype_get( [self.sha1_2])) # mimetype did not change 
as the v2 was dropped. self.assertEqual(actual_mimetypes, expected_mimetypes_v1) @istest def content_mimetype_add__update_in_place_duplicate(self): # given tools = self.fetch_tools() tool_id = tools['file']['id'] mimetype_v1 = { 'id': self.sha1_2, 'mimetype': b'text/plain', 'encoding': b'utf-8', 'indexer_configuration_id': tool_id, } # given self.storage.content_mimetype_add([mimetype_v1]) # when actual_mimetypes = list(self.storage.content_mimetype_get( [self.sha1_2])) expected_mimetypes_v1 = [{ 'id': self.sha1_2, 'mimetype': b'text/plain', 'encoding': b'utf-8', 'tool': tools['file'], }] # then self.assertEqual(actual_mimetypes, expected_mimetypes_v1) # given mimetype_v2 = mimetype_v1.copy() mimetype_v2.update({ 'mimetype': b'text/html', 'encoding': b'us-ascii', }) self.storage.content_mimetype_add([mimetype_v2], conflict_update=True) actual_mimetypes = list(self.storage.content_mimetype_get( [self.sha1_2])) expected_mimetypes_v2 = [{ 'id': self.sha1_2, 'mimetype': b'text/html', 'encoding': b'us-ascii', 'tool': { 'id': 2, 'name': 'file', 'version': '5.22', 'configuration': {'command_line': 'file --mime '} } }] # mimetype did change as the v2 was used to overwrite v1 self.assertEqual(actual_mimetypes, expected_mimetypes_v2) @istest def content_mimetype_get(self): # given tools = self.fetch_tools() tool_id = tools['file']['id'] mimetypes = [self.sha1_2, self.sha1_1] mimetype1 = { 'id': self.sha1_2, 'mimetype': b'text/plain', 'encoding': b'utf-8', 'indexer_configuration_id': tool_id, } # when self.storage.content_mimetype_add([mimetype1]) # then actual_mimetypes = list(self.storage.content_mimetype_get(mimetypes)) # then expected_mimetypes = [{ 'id': self.sha1_2, 'mimetype': b'text/plain', 'encoding': b'utf-8', 'tool': tools['file'] }] self.assertEqual(actual_mimetypes, expected_mimetypes) @istest def content_language_missing(self): # given tools = self.fetch_tools() tool_id = tools['pygments']['id'] languages = [ { 'id': self.sha1_2, 'indexer_configuration_id': 
tool_id, }, { 'id': self.sha1_1, 'indexer_configuration_id': tool_id, } ] # when actual_missing = list(self.storage.content_language_missing(languages)) # then self.assertEqual(list(actual_missing), [ self.sha1_2, self.sha1_1, ]) # given self.storage.content_language_add([{ 'id': self.sha1_2, 'lang': 'haskell', 'indexer_configuration_id': tool_id, }]) # when actual_missing = list(self.storage.content_language_missing(languages)) # then self.assertEqual(actual_missing, [self.sha1_1]) @istest def content_language_get(self): # given tools = self.fetch_tools() tool_id = tools['pygments']['id'] language1 = { 'id': self.sha1_2, 'lang': 'common-lisp', 'indexer_configuration_id': tool_id, } # when self.storage.content_language_add([language1]) # then actual_languages = list(self.storage.content_language_get( [self.sha1_2, self.sha1_1])) # then expected_languages = [{ 'id': self.sha1_2, 'lang': 'common-lisp', 'tool': tools['pygments'] }] self.assertEqual(actual_languages, expected_languages) @istest def content_language_add__drop_duplicate(self): # given tools = self.fetch_tools() tool_id = tools['pygments']['id'] language_v1 = { 'id': self.sha1_2, 'lang': 'emacslisp', 'indexer_configuration_id': tool_id, } # given self.storage.content_language_add([language_v1]) # when actual_languages = list(self.storage.content_language_get( [self.sha1_2])) # then expected_languages_v1 = [{ 'id': self.sha1_2, 'lang': 'emacslisp', 'tool': tools['pygments'] }] self.assertEqual(actual_languages, expected_languages_v1) # given language_v2 = language_v1.copy() language_v2.update({ 'lang': 'common-lisp', }) self.storage.content_language_add([language_v2]) actual_languages = list(self.storage.content_language_get( [self.sha1_2])) # language did not change as the v2 was dropped. 
self.assertEqual(actual_languages, expected_languages_v1) @istest def content_language_add__update_in_place_duplicate(self): # given tools = self.fetch_tools() tool_id = tools['pygments']['id'] language_v1 = { 'id': self.sha1_2, 'lang': 'common-lisp', 'indexer_configuration_id': tool_id, } # given self.storage.content_language_add([language_v1]) # when actual_languages = list(self.storage.content_language_get( [self.sha1_2])) # then expected_languages_v1 = [{ 'id': self.sha1_2, 'lang': 'common-lisp', 'tool': tools['pygments'] }] self.assertEqual(actual_languages, expected_languages_v1) # given language_v2 = language_v1.copy() language_v2.update({ 'lang': 'emacslisp', }) self.storage.content_language_add([language_v2], conflict_update=True) actual_languages = list(self.storage.content_language_get( [self.sha1_2])) # language did not change as the v2 was dropped. expected_languages_v2 = [{ 'id': self.sha1_2, 'lang': 'emacslisp', 'tool': tools['pygments'] }] # language did change as the v2 was used to overwrite v1 self.assertEqual(actual_languages, expected_languages_v2) @istest def content_ctags_missing(self): # given tools = self.fetch_tools() tool_id = tools['universal-ctags']['id'] ctags = [ { 'id': self.sha1_2, 'indexer_configuration_id': tool_id, }, { 'id': self.sha1_1, 'indexer_configuration_id': tool_id, } ] # when actual_missing = self.storage.content_ctags_missing(ctags) # then self.assertEqual(list(actual_missing), [ self.sha1_2, self.sha1_1 ]) # given self.storage.content_ctags_add([ { 'id': self.sha1_2, 'indexer_configuration_id': tool_id, 'ctags': [{ 'name': 'done', 'kind': 'variable', 'line': 119, 'lang': 'OCaml', }] }, ]) # when actual_missing = self.storage.content_ctags_missing(ctags) # then self.assertEqual(list(actual_missing), [self.sha1_1]) @istest def content_ctags_get(self): # given tools = self.fetch_tools() tool_id = tools['universal-ctags']['id'] ctags = [self.sha1_2, self.sha1_1] ctag1 = { 'id': self.sha1_2, 'indexer_configuration_id': 
tool_id, 'ctags': [ { 'name': 'done', 'kind': 'variable', 'line': 100, 'lang': 'Python', }, { 'name': 'main', 'kind': 'function', 'line': 119, 'lang': 'Python', }] } # when self.storage.content_ctags_add([ctag1]) # then actual_ctags = list(self.storage.content_ctags_get(ctags)) # then expected_ctags = [ { 'id': self.sha1_2, 'tool': tools['universal-ctags'], 'name': 'done', 'kind': 'variable', 'line': 100, 'lang': 'Python', }, { 'id': self.sha1_2, 'tool': tools['universal-ctags'], 'name': 'main', 'kind': 'function', 'line': 119, 'lang': 'Python', } ] self.assertEqual(actual_ctags, expected_ctags) @istest def content_ctags_search(self): # 1. given tools = self.fetch_tools() tool = tools['universal-ctags'] tool_id = tool['id'] ctag1 = { 'id': self.sha1_1, 'indexer_configuration_id': tool_id, 'ctags': [ { 'name': 'hello', 'kind': 'function', 'line': 133, 'lang': 'Python', }, { 'name': 'counter', 'kind': 'variable', 'line': 119, 'lang': 'Python', }, ] } ctag2 = { 'id': self.sha1_2, 'indexer_configuration_id': tool_id, 'ctags': [ { 'name': 'hello', 'kind': 'variable', 'line': 100, 'lang': 'C', }, ] } self.storage.content_ctags_add([ctag1, ctag2]) # 1. when actual_ctags = list(self.storage.content_ctags_search('hello', limit=1)) # 1. then self.assertEqual(actual_ctags, [ { 'id': ctag1['id'], 'tool': tool, 'name': 'hello', 'kind': 'function', 'line': 133, 'lang': 'Python', } ]) # 2. when actual_ctags = list(self.storage.content_ctags_search( 'hello', limit=1, last_sha1=ctag1['id'])) # 2. then self.assertEqual(actual_ctags, [ { 'id': ctag2['id'], 'tool': tool, 'name': 'hello', 'kind': 'variable', 'line': 100, 'lang': 'C', } ]) # 3. when actual_ctags = list(self.storage.content_ctags_search('hello')) # 3. then self.assertEqual(actual_ctags, [ { 'id': ctag1['id'], 'tool': tool, 'name': 'hello', 'kind': 'function', 'line': 133, 'lang': 'Python', }, { 'id': ctag2['id'], 'tool': tool, 'name': 'hello', 'kind': 'variable', 'line': 100, 'lang': 'C', }, ]) # 4. 
when actual_ctags = list(self.storage.content_ctags_search('counter')) # then self.assertEqual(actual_ctags, [{ 'id': ctag1['id'], 'tool': tool, 'name': 'counter', 'kind': 'variable', 'line': 119, 'lang': 'Python', }]) @istest def content_ctags_search_no_result(self): actual_ctags = list(self.storage.content_ctags_search('counter')) self.assertEquals(actual_ctags, []) @istest def content_ctags_add__add_new_ctags_added(self): # given tools = self.fetch_tools() tool = tools['universal-ctags'] tool_id = tool['id'] ctag_v1 = { 'id': self.sha1_2, 'indexer_configuration_id': tool_id, 'ctags': [{ 'name': 'done', 'kind': 'variable', 'line': 100, 'lang': 'Scheme', }] } # given self.storage.content_ctags_add([ctag_v1]) self.storage.content_ctags_add([ctag_v1]) # conflict does nothing # when actual_ctags = list(self.storage.content_ctags_get( [self.sha1_2])) # then expected_ctags = [{ 'id': self.sha1_2, 'name': 'done', 'kind': 'variable', 'line': 100, 'lang': 'Scheme', 'tool': tool, }] self.assertEqual(actual_ctags, expected_ctags) # given ctag_v2 = ctag_v1.copy() ctag_v2.update({ 'ctags': [ { 'name': 'defn', 'kind': 'function', 'line': 120, 'lang': 'Scheme', } ] }) self.storage.content_ctags_add([ctag_v2]) expected_ctags = [ { 'id': self.sha1_2, 'name': 'done', 'kind': 'variable', 'line': 100, 'lang': 'Scheme', 'tool': tool, }, { 'id': self.sha1_2, 'name': 'defn', 'kind': 'function', 'line': 120, 'lang': 'Scheme', 'tool': tool, } ] actual_ctags = list(self.storage.content_ctags_get( [self.sha1_2])) self.assertEqual(actual_ctags, expected_ctags) @istest def content_ctags_add__update_in_place(self): # given tools = self.fetch_tools() tool = tools['universal-ctags'] tool_id = tool['id'] ctag_v1 = { 'id': self.sha1_2, 'indexer_configuration_id': tool_id, 'ctags': [{ 'name': 'done', 'kind': 'variable', 'line': 100, 'lang': 'Scheme', }] } # given self.storage.content_ctags_add([ctag_v1]) # when actual_ctags = list(self.storage.content_ctags_get( [self.sha1_2])) # then 
expected_ctags = [ { 'id': self.sha1_2, 'name': 'done', 'kind': 'variable', 'line': 100, 'lang': 'Scheme', 'tool': tool } ] self.assertEqual(actual_ctags, expected_ctags) # given ctag_v2 = ctag_v1.copy() ctag_v2.update({ 'ctags': [ { 'name': 'done', 'kind': 'variable', 'line': 100, 'lang': 'Scheme', }, { 'name': 'defn', 'kind': 'function', 'line': 120, 'lang': 'Scheme', } ] }) self.storage.content_ctags_add([ctag_v2], conflict_update=True) actual_ctags = list(self.storage.content_ctags_get( [self.sha1_2])) # ctag did change as the v2 was used to overwrite v1 expected_ctags = [ { 'id': self.sha1_2, 'name': 'done', 'kind': 'variable', 'line': 100, 'lang': 'Scheme', 'tool': tool, }, { 'id': self.sha1_2, 'name': 'defn', 'kind': 'function', 'line': 120, 'lang': 'Scheme', 'tool': tool, } ] self.assertEqual(actual_ctags, expected_ctags) @istest def content_fossology_license_get(self): # given tools = self.fetch_tools() tool = tools['nomos'] tool_id = tool['id'] license1 = { 'id': self.sha1_1, 'licenses': ['GPL-2.0+'], 'indexer_configuration_id': tool_id, } # when self.storage.content_fossology_license_add([license1]) # then actual_licenses = list(self.storage.content_fossology_license_get( [self.sha1_2, self.sha1_1])) expected_license = { 'id': self.sha1_1, 'licenses': ['GPL-2.0+'], 'tool': tool, } # then self.assertEqual(actual_licenses, [expected_license]) @istest def content_fossology_license_add__new_license_added(self): # given tools = self.fetch_tools() tool = tools['nomos'] tool_id = tool['id'] license_v1 = { 'id': self.sha1_1, 'licenses': ['Apache-2.0'], 'indexer_configuration_id': tool_id, } # given self.storage.content_fossology_license_add([license_v1]) # conflict does nothing self.storage.content_fossology_license_add([license_v1]) # when actual_licenses = list(self.storage.content_fossology_license_get( [self.sha1_1])) # then expected_license = { 'id': self.sha1_1, 'licenses': ['Apache-2.0'], 'tool': tool, } self.assertEqual(actual_licenses, 
[expected_license]) # given license_v2 = license_v1.copy() license_v2.update({ 'licenses': ['BSD-2-Clause'], }) self.storage.content_fossology_license_add([license_v2]) actual_licenses = list(self.storage.content_fossology_license_get( [self.sha1_1])) expected_license.update({ 'licenses': ['Apache-2.0', 'BSD-2-Clause'], }) # license did not change as the v2 was dropped. self.assertEqual(actual_licenses, [expected_license]) @istest def content_fossology_license_add__update_in_place_duplicate(self): # given tools = self.fetch_tools() tool = tools['nomos'] tool_id = tool['id'] license_v1 = { 'id': self.sha1_1, 'licenses': ['CECILL'], 'indexer_configuration_id': tool_id, } # given self.storage.content_fossology_license_add([license_v1]) # conflict does nothing self.storage.content_fossology_license_add([license_v1]) # when actual_licenses = list(self.storage.content_fossology_license_get( [self.sha1_1])) # then expected_license = { 'id': self.sha1_1, 'licenses': ['CECILL'], 'tool': tool, } self.assertEqual(actual_licenses, [expected_license]) # given license_v2 = license_v1.copy() license_v2.update({ 'licenses': ['CECILL-2.0'] }) self.storage.content_fossology_license_add([license_v2], conflict_update=True) actual_licenses = list(self.storage.content_fossology_license_get( [self.sha1_1])) # license did change as the v2 was used to overwrite v1 expected_license.update({ 'licenses': ['CECILL-2.0'] }) self.assertEqual(actual_licenses, [expected_license]) @istest def content_metadata_missing(self): # given tools = self.fetch_tools() tool_id = tools['swh-metadata-translator']['id'] metadatas = [ { 'id': self.sha1_2, 'indexer_configuration_id': tool_id, }, { 'id': self.sha1_1, 'indexer_configuration_id': tool_id, } ] # when actual_missing = list(self.storage.content_metadata_missing(metadatas)) # then self.assertEqual(list(actual_missing), [ self.sha1_2, self.sha1_1, ]) # given self.storage.content_metadata_add([{ 'id': self.sha1_2, 'translated_metadata': { 'other': {}, 
'codeRepository': { 'type': 'git', 'url': 'https://github.com/moranegg/metadata_test' }, 'description': 'Simple package.json test for indexer', 'name': 'test_metadata', 'version': '0.0.1' }, 'indexer_configuration_id': tool_id }]) # when actual_missing = list(self.storage.content_metadata_missing(metadatas)) # then self.assertEqual(actual_missing, [self.sha1_1]) @istest def content_metadata_get(self): # given tools = self.fetch_tools() tool_id = tools['swh-metadata-translator']['id'] metadata1 = { 'id': self.sha1_2, 'translated_metadata': { 'other': {}, 'codeRepository': { 'type': 'git', 'url': 'https://github.com/moranegg/metadata_test' }, 'description': 'Simple package.json test for indexer', 'name': 'test_metadata', 'version': '0.0.1' }, 'indexer_configuration_id': tool_id, } # when self.storage.content_metadata_add([metadata1]) # then actual_metadatas = list(self.storage.content_metadata_get( [self.sha1_2, self.sha1_1])) expected_metadatas = [{ 'id': self.sha1_2, 'translated_metadata': { 'other': {}, 'codeRepository': { 'type': 'git', 'url': 'https://github.com/moranegg/metadata_test' }, 'description': 'Simple package.json test for indexer', 'name': 'test_metadata', 'version': '0.0.1' }, 'tool': tools['swh-metadata-translator'] }] self.assertEqual(actual_metadatas, expected_metadatas) @istest def content_metadata_add_drop_duplicate(self): # given tools = self.fetch_tools() tool_id = tools['swh-metadata-translator']['id'] metadata_v1 = { 'id': self.sha1_2, 'translated_metadata': { 'other': {}, 'name': 'test_metadata', 'version': '0.0.1' }, 'indexer_configuration_id': tool_id, } # given self.storage.content_metadata_add([metadata_v1]) # when actual_metadatas = list(self.storage.content_metadata_get( [self.sha1_2])) expected_metadatas_v1 = [{ 'id': self.sha1_2, 'translated_metadata': { 'other': {}, 'name': 'test_metadata', 'version': '0.0.1' }, 'tool': tools['swh-metadata-translator'] }] self.assertEqual(actual_metadatas, expected_metadatas_v1) # given metadata_v2 
= metadata_v1.copy() metadata_v2.update({ 'translated_metadata': { 'other': {}, 'name': 'test_drop_duplicated_metadata', 'version': '0.0.1' }, }) self.storage.content_metadata_add([metadata_v2]) # then actual_metadatas = list(self.storage.content_metadata_get( [self.sha1_2])) # metadata did not change as the v2 was dropped. self.assertEqual(actual_metadatas, expected_metadatas_v1) @istest def content_metadata_add_update_in_place_duplicate(self): # given tools = self.fetch_tools() tool_id = tools['swh-metadata-translator']['id'] metadata_v1 = { 'id': self.sha1_2, 'translated_metadata': { 'other': {}, 'name': 'test_metadata', 'version': '0.0.1' }, 'indexer_configuration_id': tool_id, } # given self.storage.content_metadata_add([metadata_v1]) # when actual_metadatas = list(self.storage.content_metadata_get( [self.sha1_2])) # then expected_metadatas_v1 = [{ 'id': self.sha1_2, 'translated_metadata': { 'other': {}, 'name': 'test_metadata', 'version': '0.0.1' }, 'tool': tools['swh-metadata-translator'] }] self.assertEqual(actual_metadatas, expected_metadatas_v1) # given metadata_v2 = metadata_v1.copy() metadata_v2.update({ 'translated_metadata': { 'other': {}, 'name': 'test_update_duplicated_metadata', 'version': '0.0.1' }, }) self.storage.content_metadata_add([metadata_v2], conflict_update=True) actual_metadatas = list(self.storage.content_metadata_get( [self.sha1_2])) # language did not change as the v2 was dropped. 
        expected_metadatas_v2 = [{
            'id': self.sha1_2,
            'translated_metadata': {
                'other': {},
                'name': 'test_update_duplicated_metadata',
                'version': '0.0.1'
            },
            'tool': tools['swh-metadata-translator']
        }]

        # metadata did change as the v2 was used to overwrite v1
        self.assertEqual(actual_metadatas, expected_metadatas_v2)

    @istest
    def revision_metadata_missing(self):
        """revision_metadata_missing returns only the revision ids that have
        no stored metadata for the given tool.

        """
        # given
        tools = self.fetch_tools()
        tool_id = tools['swh-metadata-detector']['id']
        metadatas = [
            {
                'id': self.revision_id_1,
                'indexer_configuration_id': tool_id,
            },
            {
                'id': self.revision_id_2,
                'indexer_configuration_id': tool_id,
            }
        ]

        # when
        actual_missing = list(self.storage.revision_metadata_missing(
            metadatas))

        # then
        self.assertEqual(list(actual_missing), [
            self.revision_id_1,
            self.revision_id_2,
        ])

        # given
        self.storage.revision_metadata_add([{
            'id': self.revision_id_1,
            'translated_metadata': {
                'developmentStatus': None,
                'version': None,
                'operatingSystem': None,
                'description': None,
                'keywords': None,
                'issueTracker': None,
                'name': None,
                'author': None,
                'relatedLink': None,
                'url': None,
                'type': None,
                'license': None,
                'maintainer': None,
                'email': None,
                'softwareRequirements': None,
                'identifier': None
            },
            'indexer_configuration_id': tool_id
        }])

        # when
        actual_missing = list(self.storage.revision_metadata_missing(
            metadatas))

        # then: revision_id_1 now has metadata, only revision_id_2 is missing.
        self.assertEqual(actual_missing, [self.revision_id_2])

    @istest
    def revision_metadata_get(self):
        """Adding one revision's metadata then reading it back returns only
        that revision, with the tool row expanded.

        """
        # given
        tools = self.fetch_tools()
        tool_id = tools['swh-metadata-detector']['id']

        metadata_rev = {
            'id': self.revision_id_2,
            'translated_metadata': {
                'developmentStatus': None,
                'version': None,
                'operatingSystem': None,
                'description': None,
                'keywords': None,
                'issueTracker': None,
                'name': None,
                'author': None,
                'relatedLink': None,
                'url': None,
                'type': None,
                'license': None,
                'maintainer': None,
                'email': None,
                'softwareRequirements': None,
                'identifier': None
            },
            'indexer_configuration_id': tool_id
        }

        # when
        self.storage.revision_metadata_add([metadata_rev])

        # then
        actual_metadatas = list(self.storage.revision_metadata_get(
            [self.revision_id_2, self.revision_id_1]))

        expected_metadatas = [{
            'id': self.revision_id_2,
            'translated_metadata': metadata_rev['translated_metadata'],
            'tool': tools['swh-metadata-detector']
        }]

        self.assertEqual(actual_metadatas, expected_metadatas)

    @istest
    def revision_metadata_add_drop_duplicate(self):
        """Re-adding metadata for the same revision without conflict_update
        keeps the first stored version (the duplicate is dropped).

        """
        # given
        tools = self.fetch_tools()
        tool_id = tools['swh-metadata-detector']['id']

        metadata_v1 = {
            'id': self.revision_id_1,
            'translated_metadata': {
                'developmentStatus': None,
                'version': None,
                'operatingSystem': None,
                'description': None,
                'keywords': None,
                'issueTracker': None,
                'name': None,
                'author': None,
                'relatedLink': None,
                'url': None,
                'type': None,
                'license': None,
                'maintainer': None,
                'email': None,
                'softwareRequirements': None,
                'identifier': None
            },
            'indexer_configuration_id': tool_id,
        }

        # given
        self.storage.revision_metadata_add([metadata_v1])

        # when
        actual_metadatas = list(self.storage.revision_metadata_get(
            [self.revision_id_1]))

        expected_metadatas_v1 = [{
            'id': self.revision_id_1,
            'translated_metadata': metadata_v1['translated_metadata'],
            'tool': tools['swh-metadata-detector']
        }]

        self.assertEqual(actual_metadatas, expected_metadatas_v1)

        # given
        metadata_v2 = metadata_v1.copy()
        metadata_v2.update({
            'translated_metadata': {
                'name': 'test_metadata',
                'author': 'MG',
            },
        })

        self.storage.revision_metadata_add([metadata_v2])

        # then
        actual_metadatas = list(self.storage.revision_metadata_get(
            [self.revision_id_1]))

        # metadata did not change as the v2 was dropped.
        self.assertEqual(actual_metadatas, expected_metadatas_v1)

    @istest
    def revision_metadata_add_update_in_place_duplicate(self):
        """Re-adding metadata for the same revision with conflict_update=True
        overwrites the stored version.

        """
        # given
        tools = self.fetch_tools()
        tool_id = tools['swh-metadata-detector']['id']

        metadata_v1 = {
            'id': self.revision_id_2,
            'translated_metadata': {
                'developmentStatus': None,
                'version': None,
                'operatingSystem': None,
                'description': None,
                'keywords': None,
                'issueTracker': None,
                'name': None,
                'author': None,
                'relatedLink': None,
                'url': None,
                'type': None,
                'license': None,
                'maintainer': None,
                'email': None,
                'softwareRequirements': None,
                'identifier': None
            },
            'indexer_configuration_id': tool_id,
        }

        # given
        self.storage.revision_metadata_add([metadata_v1])

        # when
        actual_metadatas = list(self.storage.revision_metadata_get(
            [self.revision_id_2]))

        # then
        expected_metadatas_v1 = [{
            'id': self.revision_id_2,
            'translated_metadata': metadata_v1['translated_metadata'],
            'tool': tools['swh-metadata-detector']
        }]

        self.assertEqual(actual_metadatas, expected_metadatas_v1)

        # given
        metadata_v2 = metadata_v1.copy()
        metadata_v2.update({
            'translated_metadata': {
                'name': 'test_update_duplicated_metadata',
                'author': 'MG'
            },
        })

        self.storage.revision_metadata_add([metadata_v2], conflict_update=True)

        actual_metadatas = list(self.storage.revision_metadata_get(
            [self.revision_id_2]))

        # then: v2 was written with conflict_update=True, so the read below
        # must reflect v2, not v1.
expected_metadatas_v2 = [{ 'id': self.revision_id_2, 'translated_metadata': metadata_v2['translated_metadata'], 'tool': tools['swh-metadata-detector'] }] # metadata did change as the v2 was used to overwrite v1 self.assertEqual(actual_metadatas, expected_metadatas_v2) @istest def indexer_configuration_add(self): tool = { 'tool_name': 'some-unknown-tool', 'tool_version': 'some-version', 'tool_configuration': {"debian-package": "some-package"}, } actual_tool = self.storage.indexer_configuration_get(tool) self.assertIsNone(actual_tool) # does not exist # add it actual_tools = list(self.storage.indexer_configuration_add([tool])) self.assertEquals(len(actual_tools), 1) actual_tool = actual_tools[0] self.assertIsNotNone(actual_tool) # now it exists new_id = actual_tool.pop('id') self.assertEquals(actual_tool, tool) actual_tools2 = list(self.storage.indexer_configuration_add([tool])) actual_tool2 = actual_tools2[0] self.assertIsNotNone(actual_tool2) # now it exists new_id2 = actual_tool2.pop('id') self.assertEqual(new_id, new_id2) self.assertEqual(actual_tool, actual_tool2) @istest def indexer_configuration_add_multiple(self): tool = { 'tool_name': 'some-unknown-tool', 'tool_version': 'some-version', 'tool_configuration': {"debian-package": "some-package"}, } actual_tools = list(self.storage.indexer_configuration_add([tool])) self.assertEqual(len(actual_tools), 1) new_tools = [tool, { 'tool_name': 'yet-another-tool', 'tool_version': 'version', 'tool_configuration': {}, }] actual_tools = list(self.storage.indexer_configuration_add(new_tools)) self.assertEqual(len(actual_tools), 2) # order not guaranteed, so we iterate over results to check for tool in actual_tools: _id = tool.pop('id') self.assertIsNotNone(_id) self.assertIn(tool, new_tools) @istest def indexer_configuration_get_missing(self): tool = { 'tool_name': 'unknown-tool', 'tool_version': '3.1.0rc2-31-ga2cbb8c', 'tool_configuration': {"command_line": "nomossa "}, } actual_tool = 
self.storage.indexer_configuration_get(tool) self.assertIsNone(actual_tool) @istest def indexer_configuration_get(self): tool = { 'tool_name': 'nomos', 'tool_version': '3.1.0rc2-31-ga2cbb8c', 'tool_configuration': {"command_line": "nomossa "}, } actual_tool = self.storage.indexer_configuration_get(tool) expected_tool = tool.copy() expected_tool['id'] = 1 self.assertEqual(expected_tool, actual_tool) @istest def indexer_configuration_metadata_get_missing_context(self): tool = { 'tool_name': 'swh-metadata-translator', 'tool_version': '0.0.1', 'tool_configuration': {"context": "unknown-context"}, } actual_tool = self.storage.indexer_configuration_get(tool) self.assertIsNone(actual_tool) @istest def indexer_configuration_metadata_get(self): tool = { 'tool_name': 'swh-metadata-translator', 'tool_version': '0.0.1', 'tool_configuration': {"type": "local", "context": "npm"}, } actual_tool = self.storage.indexer_configuration_get(tool) expected_tool = tool.copy() expected_tool['id'] = actual_tool['id'] self.assertEqual(expected_tool, actual_tool) class IndexerTestStorage(CommonTestStorage, unittest.TestCase): + """Running the tests locally. + + For the client api tests (remote storage), see + `class`:swh.indexer.storage.test_api_client:TestRemoteStorage + class. + + """ pass diff --git a/swh/indexer/tests/test_utils.py b/swh/indexer/tests/test_utils.py index f7599de..328a062 100644 --- a/swh/indexer/tests/test_utils.py +++ b/swh/indexer/tests/test_utils.py @@ -1,303 +1,254 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import pathlib from swh.objstorage.exc import ObjNotFoundError -from swh.indexer import get_storage - - -class StorageTestFixture: - """Mix this in a test subject class to get Storage testing support. 
- - This fixture requires to come before DbTestFixture in the inheritance list - as it uses its methods to setup its own internal database. - - Usage example: - - class TestStorage(StorageTestFixture, DbTestFixture): - ... - """ - TEST_STORAGE_DB_NAME = 'softwareheritage-test-indexer' - - @classmethod - def setUpClass(cls): - if not hasattr(cls, 'DB_TEST_FIXTURE_IMPORTED'): - raise RuntimeError("StorageTestFixture needs to be followed by " - "DbTestFixture in the inheritance list.") - - test_dir = pathlib.Path(__file__).absolute().parent - test_data_dir = test_dir / '../../../../swh-storage-testdata' - test_db_dump = (test_data_dir / 'dumps/swh-indexer.dump').absolute() - cls.add_db(cls.TEST_STORAGE_DB_NAME, str(test_db_dump), 'pg_dump') - super().setUpClass() - - def setUp(self): - super().setUp() - - self.storage_config = { - 'cls': 'local', - 'args': { - 'db': self.test_db[self.TEST_STORAGE_DB_NAME].conn, - }, - } - self.storage = get_storage(**self.storage_config) - - def tearDown(self): - super().tearDown() - - def reset_storage_tables(self): - excluded = {'indexer_configuration'} - self.reset_db_tables(self.TEST_STORAGE_DB_NAME, excluded=excluded) - - db = self.test_db[self.TEST_STORAGE_DB_NAME] - db.conn.commit() class MockObjStorage: """Mock objstorage with predefined contents. 
""" data = {} def __init__(self): self.data = { '01c9379dfc33803963d07c1ccc748d3fe4c96bb5': b'this is some text', '688a5ef812c53907562fe379d4b3851e69c7cb15': b'another text', '8986af901dd2043044ce8f0d8fc039153641cf17': b'yet another text', '02fb2c89e14f7fab46701478c83779c7beb7b069': b""" import unittest import logging from nose.tools import istest from swh.indexer.mimetype import ContentMimetypeIndexer from swh.indexer.tests.test_utils import MockObjStorage class MockStorage(): def content_mimetype_add(self, mimetypes): self.state = mimetypes self.conflict_update = conflict_update def indexer_configuration_add(self, tools): return [{ 'id': 10, }] """, '103bc087db1d26afc3a0283f38663d081e9b01e6': b""" #ifndef __AVL__ #define __AVL__ typedef struct _avl_tree avl_tree; typedef struct _data_t { int content; } data_t; """, '93666f74f1cf635c8c8ac118879da6ec5623c410': b""" (should 'pygments (recognize 'lisp 'easily)) """, '26a9f72a7c87cc9205725cfd879f514ff4f3d8d5': b""" { "name": "test_metadata", "version": "0.0.1", "description": "Simple package.json test for indexer", "repository": { "type": "git", "url": "https://github.com/moranegg/metadata_test" } } """, 'd4c647f0fc257591cc9ba1722484229780d1c607': b""" { "version": "5.0.3", "name": "npm", "description": "a package manager for JavaScript", "keywords": [ "install", "modules", "package manager", "package.json" ], "preferGlobal": true, "config": { "publishtest": false }, "homepage": "https://docs.npmjs.com/", "author": "Isaac Z. 
Schlueter (http://blog.izs.me)", "repository": { "type": "git", "url": "https://github.com/npm/npm" }, "bugs": { "url": "https://github.com/npm/npm/issues" }, "dependencies": { "JSONStream": "~1.3.1", "abbrev": "~1.1.0", "ansi-regex": "~2.1.1", "ansicolors": "~0.3.2", "ansistyles": "~0.1.3" }, "devDependencies": { "tacks": "~1.2.6", "tap": "~10.3.2" }, "license": "Artistic-2.0" } """, 'a7ab314d8a11d2c93e3dcf528ca294e7b431c449': b""" """, 'da39a3ee5e6b4b0d3255bfef95601890afd80709': b'', } def __iter__(self): yield from self.data.keys() def __contains__(self, sha1): return self.data.get(sha1) is not None def get(self, sha1): raw_content = self.data.get(sha1) if raw_content is None: raise ObjNotFoundError(sha1) return raw_content class MockStorage(): """Mock storage to simplify reading indexers' outputs. """ def content_metadata_missing(self, sha1s): yield from [] def content_metadata_add(self, metadata, conflict_update=None): self.state = metadata self.conflict_update = conflict_update def revision_metadata_add(self, metadata, conflict_update=None): self.state = metadata self.conflict_update = conflict_update def indexer_configuration_add(self, tools): tool = tools[0] if tool['tool_name'] == 'swh-metadata-translator': return [{ 'id': 30, 'tool_name': 'swh-metadata-translator', 'tool_version': '0.0.1', 'tool_configuration': { 'type': 'local', 'context': 'npm' }, }] elif tool['tool_name'] == 'swh-metadata-detector': return [{ 'id': 7, 'tool_name': 'swh-metadata-detector', 'tool_version': '0.0.1', 'tool_configuration': { 'type': 'local', 'context': 'npm' }, }] def revision_get(self, revisions): return [{ 'id': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', 'committer': { 'id': 26, 'name': b'Andrew Nesbitt', 'fullname': b'Andrew Nesbitt ', 'email': b'andrewnez@gmail.com' }, 'synthetic': False, 'date': { 'negative_utc': False, 'timestamp': { 'seconds': 1487596456, 'microseconds': 0 }, 'offset': 0 }, 'directory': b'10' }] def directory_ls(self, directory, recursive=False, 
cur=None): # with directory: b'\x9d', return [{ 'sha1_git': b'abc', 'name': b'index.js', 'target': b'abc', 'length': 897, 'status': 'visible', 'type': 'file', 'perms': 33188, 'dir_id': b'10', 'sha1': b'bcd' }, { 'sha1_git': b'aab', 'name': b'package.json', 'target': b'aab', 'length': 712, 'status': 'visible', 'type': 'file', 'perms': 33188, 'dir_id': b'10', 'sha1': b'cde' }, { 'dir_id': b'10', 'target': b'11', 'type': 'dir', 'length': None, 'name': b'.github', 'sha1': None, 'perms': 16384, 'sha1_git': None, 'status': None, 'sha256': None }] def content_metadata_get(self, sha1s): return [{ 'tool': { 'configuration': { 'type': 'local', 'context': 'npm' }, 'version': '0.0.1', 'id': 6, 'name': 'swh-metadata-translator' }, 'id': b'cde', 'translated_metadata': { 'issueTracker': { 'url': 'https://github.com/librariesio/yarn-parser/issues' }, 'version': '1.0.0', 'name': 'yarn-parser', 'author': 'Andrew Nesbitt', 'url': 'https://github.com/librariesio/yarn-parser#readme', 'processorRequirements': {'node': '7.5'}, 'other': { 'scripts': { 'start': 'node index.js' }, 'main': 'index.js' }, 'license': 'AGPL-3.0', 'keywords': ['yarn', 'parse', 'lock', 'dependencies'], 'codeRepository': { 'type': 'git', 'url': 'git+https://github.com/librariesio/yarn-parser.git' }, 'description': 'Tiny web service for parsing yarn.lock files', 'softwareRequirements': { 'yarn': '^0.21.0', 'express': '^4.14.0', 'body-parser': '^1.15.2'} } }]