D2228.id7702.diff (F7163502, 103 KB)
diff --git a/swh/indexer/tests/storage/conftest.py b/swh/indexer/tests/storage/conftest.py
new file mode 100644
--- /dev/null
+++ b/swh/indexer/tests/storage/conftest.py
@@ -0,0 +1,120 @@
+from os.path import join
+import pytest
+
+from . import SQL_DIR
+from swh.storage.tests.conftest import postgresql_fact
+from swh.indexer.storage import get_indexer_storage
+from swh.model.hashutil import hash_to_bytes
+from .generate_data_test import MIMETYPE_OBJECTS, FOSSOLOGY_LICENSES
+
+
+DUMP_FILES = join(SQL_DIR, '*.sql')
+
+TOOLS = [
+ {
+ 'tool_name': 'universal-ctags',
+ 'tool_version': '~git7859817b',
+ 'tool_configuration': {
+ "command_line": "ctags --fields=+lnz --sort=no --links=no "
+ "--output-format=json <filepath>"}
+ },
+ {
+ 'tool_name': 'swh-metadata-translator',
+ 'tool_version': '0.0.1',
+ 'tool_configuration': {"type": "local", "context": "NpmMapping"},
+ },
+ {
+ 'tool_name': 'swh-metadata-detector',
+ 'tool_version': '0.0.1',
+ 'tool_configuration': {
+ "type": "local", "context": ["NpmMapping", "CodemetaMapping"]},
+ },
+ {
+ 'tool_name': 'swh-metadata-detector2',
+ 'tool_version': '0.0.1',
+ 'tool_configuration': {
+ "type": "local", "context": ["NpmMapping", "CodemetaMapping"]},
+ },
+ {
+ 'tool_name': 'file',
+ 'tool_version': '5.22',
+ 'tool_configuration': {"command_line": "file --mime <filepath>"},
+ },
+ {
+ 'tool_name': 'pygments',
+ 'tool_version': '2.0.1+dfsg-1.1+deb8u1',
+ 'tool_configuration': {
+ "type": "library", "debian-package": "python3-pygments"},
+ },
+ {
+ 'tool_name': 'pygments2',
+ 'tool_version': '2.0.1+dfsg-1.1+deb8u1',
+ 'tool_configuration': {
+ "type": "library",
+ "debian-package": "python3-pygments",
+ "max_content_size": 10240
+ },
+ },
+ {
+ 'tool_name': 'nomos',
+ 'tool_version': '3.1.0rc2-31-ga2cbb8c',
+ 'tool_configuration': {"command_line": "nomossa <filepath>"},
+ }
+]
+
+
+class DataObj(dict):
+ def __getattr__(self, key):
+ return self.__getitem__(key)
+
+ def __setattr__(self, key, value):
+ return self.__setitem__(key, value)
+
+
+@pytest.fixture
+def swh_indexer_storage_w_data(swh_indexer_storage):
+ data = DataObj()
+ tools = {
+ tool['tool_name']: {
+ 'id': tool['id'],
+ 'name': tool['tool_name'],
+ 'version': tool['tool_version'],
+ 'configuration': tool['tool_configuration'],
+ }
+ for tool in swh_indexer_storage.indexer_configuration_add(TOOLS)}
+ data.tools = tools
+ data.sha1_1 = hash_to_bytes(
+ '34973274ccef6ab4dfaaf86599792fa9c3fe4689')
+ data.sha1_2 = hash_to_bytes(
+ '61c2b3a30496d329e21af70dd2d7e097046d07b7')
+ data.revision_id_1 = hash_to_bytes(
+ '7026b7c1a2af56521e951c01ed20f255fa054238')
+ data.revision_id_2 = hash_to_bytes(
+ '7026b7c1a2af56521e9587659012345678904321')
+ data.revision_id_3 = hash_to_bytes(
+ '7026b7c1a2af56521e9587659012345678904320')
+ data.origin_url_1 = 'file:///dev/0/zero' # 44434341
+ data.origin_url_2 = 'file:///dev/1/one' # 44434342
+ data.origin_url_3 = 'file:///dev/2/two' # 54974445
+ data.mimetypes = MIMETYPE_OBJECTS[:]
+ swh_indexer_storage.content_mimetype_add(
+ MIMETYPE_OBJECTS)
+ data.fossology_licenses = FOSSOLOGY_LICENSES[:]
+ swh_indexer_storage._test_data = data
+
+ return (swh_indexer_storage, data)
+
+
+swh_indexer_storage_postgresql = postgresql_fact(
+ 'postgresql_proc', dump_files=DUMP_FILES)
+
+
+@pytest.fixture
+def swh_indexer_storage(swh_indexer_storage_postgresql):
+ storage_config = {
+ 'cls': 'local',
+ 'args': {
+ 'db': swh_indexer_storage_postgresql.dsn,
+ },
+ }
+ return get_indexer_storage(**storage_config)
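
For orientation, a minimal sketch (hypothetical test, not part of this diff) of how the swh_indexer_storage_w_data fixture defined above is consumed: pytest injects the (storage, data) pair, and the DataObj gives attribute access to the registered tools, hashes and pre-loaded mimetype rows.

    def test_fixture_sketch(swh_indexer_storage_w_data):
        # the fixture yields the storage plus the DataObj built above
        storage, data = swh_indexer_storage_w_data
        # tools were registered through indexer_configuration_add(TOOLS)
        assert data.tools['file']['name'] == 'file'
        # the MIMETYPE_OBJECTS rows were inserted by the fixture, so they
        # can be read back through the content_mimetype_get endpoint
        first = data.mimetypes[0]
        assert list(storage.content_mimetype_get([first['id']])) != []
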
diff --git a/swh/indexer/tests/storage/generate_data_test.py b/swh/indexer/tests/storage/generate_data_test.py
--- a/swh/indexer/tests/storage/generate_data_test.py
+++ b/swh/indexer/tests/storage/generate_data_test.py
@@ -3,6 +3,8 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from uuid import uuid1
+
from swh.model.hashutil import MultiHash
from hypothesis.strategies import (composite, sets, one_of, uuids,
tuples, sampled_from)
@@ -84,7 +86,16 @@
return content_mimetypes
-FOSSOLOGY_LICENSES = [
+MIMETYPE_OBJECTS = [
+ {'id': MultiHash.from_data(uuid1().bytes, {'sha1'}).digest()['sha1'],
+ 'indexer_configuration_id': 1,
+ 'mimetype': mt,
+ 'encoding': enc,
+ }
+ for mt in MIMETYPES
+ for enc in ENCODINGS]
+
+LICENSES = [
b'3DFX',
b'BSD',
b'GPL',
@@ -92,9 +103,17 @@
b'MIT',
]
+FOSSOLOGY_LICENSES = [
+ {'id': MultiHash.from_data(uuid1().bytes, {'sha1'}).digest()['sha1'],
+ 'indexer_configuration_id': 1,
+ 'licenses': [LICENSES[i % len(LICENSES)], ],
+ }
+ for i in range(10)
+ ]
+
def gen_license():
- return one_of(sampled_from(FOSSOLOGY_LICENSES))
+ return one_of(sampled_from(LICENSES))
@composite
@@ -130,6 +149,5 @@
content_licenses.append({
**_init_content(uuid),
'licenses': [license],
- 'indexer_configuration_id': 1,
})
return content_licenses
diff --git a/swh/indexer/tests/storage/test_api_client.py b/swh/indexer/tests/storage/test_api_client.py
--- a/swh/indexer/tests/storage/test_api_client.py
+++ b/swh/indexer/tests/storage/test_api_client.py
@@ -3,36 +3,40 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import unittest
+import pytest
-from swh.core.api.tests.server_testing import ServerTestFixture
-from swh.indexer.storage import INDEXER_CFG_KEY
from swh.indexer.storage.api.client import RemoteStorage
-from swh.indexer.storage.api.server import app
-
-from .test_storage import CommonTestStorage, BasePgTestStorage
-
-
-class TestRemoteStorage(CommonTestStorage, ServerTestFixture,
- BasePgTestStorage, unittest.TestCase):
- """Test the indexer's remote storage API.
-
- This class doesn't define any tests as we want identical
- functionality between local and remote storage. All the tests are
- therefore defined in
- `class`:swh.indexer.storage.test_storage.CommonTestStorage.
-
- """
-
- def setUp(self):
- self.config = {
- INDEXER_CFG_KEY: {
- 'cls': 'local',
- 'args': {
- 'db': 'dbname=%s' % self.TEST_DB_NAME,
- }
- }
- }
- self.app = app
- super().setUp()
- self.storage = RemoteStorage(self.url())
+import swh.indexer.storage.api.server as server
+
+from swh.indexer.storage import get_indexer_storage
+
+from .test_storage import * # noqa
+
+
+@pytest.fixture
+def app(swh_indexer_storage_postgresql):
+ storage_config = {
+ 'cls': 'local',
+ 'args': {
+ 'db': swh_indexer_storage_postgresql.dsn,
+ },
+ }
+ server.storage = get_indexer_storage(**storage_config)
+ return server.app
+
+
+@pytest.fixture
+def swh_rpc_client_class():
+ # these are needed for the swh_indexer_storage_w_data fixture
+ assert hasattr(RemoteStorage, 'indexer_configuration_add')
+ assert hasattr(RemoteStorage, 'content_mimetype_add')
+ return RemoteStorage
+
+
+@pytest.fixture
+def swh_indexer_storage(swh_rpc_client, app):
+ # This version of the swh_indexer_storage fixture uses the swh_rpc_client
+ # fixture to instantiate a RemoteStorage (see swh_rpc_client_class above)
+ # that proxies, via the swh.core RPC mechanism, the local
+ # (postgresql-backed) storage configured in the app fixture above.
+ return swh_rpc_client
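
As the comment above notes, these fixtures wire the shared tests in test_storage.py to the HTTP API: the app fixture builds the server-side postgresql-backed storage, and the swh_rpc_client fixture (assumed to be provided by the swh.core test helpers) instantiates the RemoteStorage declared in swh_rpc_client_class against it. A rough sketch (hypothetical test, not part of this diff) of what a test then sees:

    def test_remote_storage_proxy_sketch(swh_indexer_storage):
        # with these fixtures, swh_indexer_storage is a RemoteStorage whose
        # calls go over the RPC layer to the storage behind the app fixture
        assert isinstance(swh_indexer_storage, RemoteStorage)
        assert swh_indexer_storage.check_config(check_write=False)
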
diff --git a/swh/indexer/tests/storage/test_in_memory.py b/swh/indexer/tests/storage/test_in_memory.py
--- a/swh/indexer/tests/storage/test_in_memory.py
+++ b/swh/indexer/tests/storage/test_in_memory.py
@@ -1,19 +1,16 @@
-from unittest import TestCase
+import pytest
-from .test_storage import CommonTestStorage
+from swh.indexer.storage import get_indexer_storage
+from .test_storage import * # noqa
-class IndexerTestInMemoryStorage(CommonTestStorage, TestCase):
- def setUp(self):
- self.storage_config = {
- 'cls': 'memory',
- 'args': {
- },
- }
- super().setUp()
- def reset_storage_tables(self):
- self.storage = self.storage.__class__()
-
- def test_check_config(self):
- pass
+@pytest.fixture
+def swh_indexer_storage(swh_indexer_storage_postgresql):
+ storage_config = {
+ 'cls': 'local',
+ 'args': {
+ 'db': swh_indexer_storage_postgresql.dsn,
+ },
+ }
+ return get_indexer_storage(**storage_config)
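
The deleted IndexerTestInMemoryStorage class configured the storage with cls 'memory'; the fixture above redefines swh_indexer_storage over the postgresql-backed fixture from conftest instead. For reference, a minimal sketch of instantiating the in-memory backend directly, assuming it still accepts indexer_configuration_add as the old test class relied on:

    from swh.indexer.storage import get_indexer_storage

    # the configuration the removed IndexerTestInMemoryStorage class used
    memory_storage = get_indexer_storage(cls='memory', args={})
    tools = memory_storage.indexer_configuration_add([{
        'tool_name': 'file',
        'tool_version': '5.22',
        'tool_configuration': {"command_line": "file --mime <filepath>"},
    }])
    assert tools and 'id' in tools[0]
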
diff --git a/swh/indexer/tests/storage/test_storage.py b/swh/indexer/tests/storage/test_storage.py
--- a/swh/indexer/tests/storage/test_storage.py
+++ b/swh/indexer/tests/storage/test_storage.py
@@ -1,522 +1,378 @@
-# Copyright (C) 2015-2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import os
import threading
-import unittest
-
import pytest
-from hypothesis import given
-
from swh.model.hashutil import hash_to_bytes
-from swh.indexer.storage import get_indexer_storage, MAPPING_NAMES
-from swh.core.db.tests.db_testing import SingleDbTestFixture
-from swh.indexer.tests.storage.generate_data_test import (
- gen_content_mimetypes, gen_content_fossology_licenses
-)
-from swh.indexer.tests.storage import SQL_DIR
-from swh.indexer.metadata_dictionary import MAPPINGS
-
-TOOLS = [
- {
- 'tool_name': 'universal-ctags',
- 'tool_version': '~git7859817b',
- 'tool_configuration': {
- "command_line": "ctags --fields=+lnz --sort=no --links=no "
- "--output-format=json <filepath>"}
- },
- {
- 'tool_name': 'swh-metadata-translator',
- 'tool_version': '0.0.1',
- 'tool_configuration': {"type": "local", "context": "NpmMapping"},
- },
- {
- 'tool_name': 'swh-metadata-detector',
- 'tool_version': '0.0.1',
- 'tool_configuration': {
- "type": "local", "context": ["NpmMapping", "CodemetaMapping"]},
- },
- {
- 'tool_name': 'swh-metadata-detector2',
- 'tool_version': '0.0.1',
- 'tool_configuration': {
- "type": "local", "context": ["NpmMapping", "CodemetaMapping"]},
- },
- {
- 'tool_name': 'file',
- 'tool_version': '5.22',
- 'tool_configuration': {"command_line": "file --mime <filepath>"},
- },
- {
- 'tool_name': 'pygments',
- 'tool_version': '2.0.1+dfsg-1.1+deb8u1',
- 'tool_configuration': {
- "type": "library", "debian-package": "python3-pygments"},
- },
- {
- 'tool_name': 'pygments',
- 'tool_version': '2.0.1+dfsg-1.1+deb8u1',
- 'tool_configuration': {
- "type": "library",
- "debian-package": "python3-pygments",
- "max_content_size": 10240
- },
- },
- {
- 'tool_name': 'nomos',
- 'tool_version': '3.1.0rc2-31-ga2cbb8c',
- 'tool_configuration': {"command_line": "nomossa <filepath>"},
- }
-]
+def prepare_mimetypes_from(fossology_licenses):
+ """Fossology license needs some consistent data in db to run.
-@pytest.mark.db
-class BasePgTestStorage(SingleDbTestFixture):
- """Base test class for most indexer tests.
-
- It adds support for Storage testing to the SingleDbTestFixture class.
- It will also build the database from the swh-indexed/sql/*.sql files.
"""
+ mimetypes = []
+ for c in fossology_licenses:
+ mimetypes.append({
+ 'id': c['id'],
+ 'mimetype': 'text/plain',
+ 'encoding': 'utf-8',
+ 'indexer_configuration_id': c['indexer_configuration_id'],
+ })
+ return mimetypes
- TEST_DB_NAME = 'softwareheritage-test-indexer'
- TEST_DB_DUMP = os.path.join(SQL_DIR, '*.sql')
-
- def setUp(self):
- super().setUp()
- self.storage_config = {
- 'cls': 'local',
- 'args': {
- 'db': 'dbname=%s' % self.TEST_DB_NAME,
- },
- }
-
- def tearDown(self):
- self.reset_storage_tables()
- self.storage = None
- super().tearDown()
-
- def reset_storage_tables(self):
- excluded = {'indexer_configuration'}
- self.reset_db_tables(self.TEST_DB_NAME, excluded=excluded)
-
- db = self.test_db[self.TEST_DB_NAME]
- db.conn.commit()
-
-
-def gen_generic_endpoint_tests(endpoint_type, tool_name,
- example_data1, example_data2):
- def rename(f):
- f.__name__ = 'test_' + endpoint_type + f.__name__
- return f
-
- def endpoint(self, endpoint_name):
- return getattr(self.storage, endpoint_type + '_' + endpoint_name)
-
- @rename
- def missing(self):
- # given
- tool_id = self.tools[tool_name]['id']
- query = [
- {
- 'id': self.sha1_1,
- 'indexer_configuration_id': tool_id,
- },
- {
- 'id': self.sha1_2,
- 'indexer_configuration_id': tool_id,
- }]
+def endpoint(storage, endpoint_type, endpoint_name):
+ return getattr(storage, endpoint_type + '_' + endpoint_name)
- # when
- actual_missing = endpoint(self, 'missing')(query)
- # then
- self.assertEqual(list(actual_missing), [
- self.sha1_1,
- self.sha1_2,
- ])
+def check_missing(self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
+ etype = self.endpoint_type
+ tool_id = data.tools[self.tool_name]['id']
- # given
- endpoint(self, 'add')([{
- 'id': self.sha1_2,
- **example_data1,
+ # given 2 (hopefully) unknown objects
+ query = [
+ {
+ 'id': data.sha1_1,
'indexer_configuration_id': tool_id,
- }])
-
- # when
- actual_missing = endpoint(self, 'missing')(query)
-
- # then
- self.assertEqual(list(actual_missing), [self.sha1_1])
-
- @rename
- def add__drop_duplicate(self):
- # given
- tool_id = self.tools[tool_name]['id']
-
- data_v1 = {
- 'id': self.sha1_2,
- **example_data1,
+ },
+ {
+ 'id': data.sha1_2,
'indexer_configuration_id': tool_id,
- }
-
- # given
- endpoint(self, 'add')([data_v1])
-
- # when
- actual_data = list(endpoint(self, 'get')([self.sha1_2]))
-
- # then
- expected_data_v1 = [{
- 'id': self.sha1_2,
- **example_data1,
- 'tool': self.tools[tool_name],
}]
- self.assertEqual(actual_data, expected_data_v1)
-
- # given
- data_v2 = data_v1.copy()
- data_v2.update(example_data2)
-
- endpoint(self, 'add')([data_v2])
- actual_data = list(endpoint(self, 'get')([self.sha1_2]))
+ # we expect both of them to be returned by the xxx_missing endpoint
+ actual_missing = endpoint(storage, etype, 'missing')(query)
+ assert list(actual_missing) == [
+ data.sha1_1,
+ data.sha1_2,
+ ]
+
+ # now, when we add one of them
+ endpoint(storage, etype, 'add')([{
+ 'id': data.sha1_2,
+ **self.example_data[0],
+ 'indexer_configuration_id': tool_id,
+ }])
+
+ # we expect only the other one returned
+ actual_missing = endpoint(storage, etype, 'missing')(query)
+ assert list(actual_missing) == [data.sha1_1]
+
+
+def check_add__drop_duplicate(self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
+ etype = self.endpoint_type
+ tool_id = data.tools[self.tool_name]['id']
+
+ # add the first object
+ data_v1 = {
+ 'id': data.sha1_2,
+ **self.example_data[0],
+ 'indexer_configuration_id': tool_id,
+ }
+ endpoint(storage, etype, 'add')([data_v1])
+
+ # should be able to retrieve it
+ actual_data = list(endpoint(storage, etype, 'get')([data.sha1_2]))
+ expected_data_v1 = [{
+ 'id': data.sha1_2,
+ **self.example_data[0],
+ 'tool': data.tools[self.tool_name],
+ }]
+ assert actual_data == expected_data_v1
+
+ # now if we add a modified version of the same object (same id)
+ data_v2 = data_v1.copy()
+ data_v2.update(self.example_data[1])
+ endpoint(storage, etype, 'add')([data_v2])
+
+ # we expect to retrieve the original data, not the modified one
+ actual_data = list(endpoint(storage, etype, 'get')([data.sha1_2]))
+ assert actual_data == expected_data_v1
+
+
+def check_add__update_in_place_duplicate(self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
+ etype = self.endpoint_type
+ tool = data.tools[self.tool_name]
+
+ data_v1 = {
+ 'id': data.sha1_2,
+ **self.example_data[0],
+ 'indexer_configuration_id': tool['id'],
+ }
- # data did not change as the v2 was dropped.
- self.assertEqual(actual_data, expected_data_v1)
+ # given
+ endpoint(storage, etype, 'add')([data_v1])
- @rename
- def add__update_in_place_duplicate(self):
- # given
- tool_id = self.tools[tool_name]['id']
+ # when
+ actual_data = list(endpoint(storage, etype, 'get')([data.sha1_2]))
- data_v1 = {
- 'id': self.sha1_2,
- **example_data1,
- 'indexer_configuration_id': tool_id,
- }
+ expected_data_v1 = [{
+ 'id': data.sha1_2,
+ **self.example_data[0],
+ 'tool': tool,
+ }]
- # given
- endpoint(self, 'add')([data_v1])
+ # then
+ assert actual_data == expected_data_v1
- # when
- actual_data = list(endpoint(self, 'get')([self.sha1_2]))
+ # given
+ data_v2 = data_v1.copy()
+ data_v2.update(self.example_data[1])
- expected_data_v1 = [{
- 'id': self.sha1_2,
- **example_data1,
- 'tool': self.tools[tool_name],
- }]
+ endpoint(storage, etype, 'add')([data_v2], conflict_update=True)
- # then
- self.assertEqual(actual_data, expected_data_v1)
+ actual_data = list(endpoint(storage, etype, 'get')([data.sha1_2]))
- # given
- data_v2 = data_v1.copy()
- data_v2.update(example_data2)
+ expected_data_v2 = [{
+ 'id': data.sha1_2,
+ **self.example_data[1],
+ 'tool': tool,
+ }]
- endpoint(self, 'add')([data_v2], conflict_update=True)
+ # data did change as the v2 was used to overwrite v1
+ assert actual_data == expected_data_v2
- actual_data = list(endpoint(self, 'get')([self.sha1_2]))
- expected_data_v2 = [{
- 'id': self.sha1_2,
- **example_data2,
- 'tool': self.tools[tool_name],
- }]
+def check_add__update_in_place_deadlock(self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
+ etype = self.endpoint_type
+ tool = data.tools[self.tool_name]
- # data did change as the v2 was used to overwrite v1
- self.assertEqual(actual_data, expected_data_v2)
+ hashes = [
+ hash_to_bytes(
+ '34973274ccef6ab4dfaaf86599792fa9c3fe4{:03d}'.format(i))
+ for i in range(1000)]
- @rename
- def add__update_in_place_deadlock(self):
- # given
- tool_id = self.tools[tool_name]['id']
+ data_v1 = [
+ {
+ 'id': hash_,
+ **self.example_data[0],
+ 'indexer_configuration_id': tool['id'],
+ }
+ for hash_ in hashes
+ ]
+ data_v2 = [
+ {
+ 'id': hash_,
+ **self.example_data[1],
+ 'indexer_configuration_id': tool['id'],
+ }
+ for hash_ in hashes
+ ]
- hashes = [
- hash_to_bytes(
- '34973274ccef6ab4dfaaf86599792fa9c3fe4{:03d}'.format(i))
- for i in range(1000)]
+ # Remove one item from each, so that both queries have to succeed for
+ # all items to be in the DB.
+ data_v2a = data_v2[1:]
+ data_v2b = list(reversed(data_v2[0:-1]))
- data_v1 = [
- {
- 'id': hash_,
- **example_data1,
- 'indexer_configuration_id': tool_id,
- }
- for hash_ in hashes
- ]
- data_v2 = [
- {
- 'id': hash_,
- **example_data2,
- 'indexer_configuration_id': tool_id,
- }
- for hash_ in hashes
- ]
+ # given
+ endpoint(storage, etype, 'add')(data_v1)
- # Remove one item from each, so that both queries have to succeed for
- # all items to be in the DB.
- data_v2a = data_v2[1:]
- data_v2b = list(reversed(data_v2[0:-1]))
+ # when
+ actual_data = list(endpoint(storage, etype, 'get')(hashes))
- # given
- endpoint(self, 'add')(data_v1)
+ expected_data_v1 = [
+ {
+ 'id': hash_,
+ **self.example_data[0],
+ 'tool': tool,
+ }
+ for hash_ in hashes
+ ]
- # when
- actual_data = list(endpoint(self, 'get')(hashes))
+ # then
+ assert actual_data == expected_data_v1
- expected_data_v1 = [
- {
- 'id': hash_,
- **example_data1,
- 'tool': self.tools[tool_name],
- }
- for hash_ in hashes
- ]
+ # given
+ def f1():
+ endpoint(storage, etype, 'add')(data_v2a, conflict_update=True)
- # then
- self.assertEqual(actual_data, expected_data_v1)
+ def f2():
+ endpoint(storage, etype, 'add')(data_v2b, conflict_update=True)
- # given
- def f1():
- endpoint(self, 'add')(data_v2a, conflict_update=True)
+ t1 = threading.Thread(target=f1)
+ t2 = threading.Thread(target=f2)
+ t2.start()
+ t1.start()
- def f2():
- endpoint(self, 'add')(data_v2b, conflict_update=True)
+ t1.join()
+ t2.join()
- t1 = threading.Thread(target=f1)
- t2 = threading.Thread(target=f2)
- t2.start()
- t1.start()
+ actual_data = sorted(endpoint(storage, etype, 'get')(hashes),
+ key=lambda x: x['id'])
- t1.join()
- t2.join()
+ expected_data_v2 = [
+ {
+ 'id': hash_,
+ **self.example_data[1],
+ 'tool': tool,
+ }
+ for hash_ in hashes
+ ]
- actual_data = list(endpoint(self, 'get')(hashes))
+ assert actual_data == expected_data_v2
- expected_data_v2 = [
- {
- 'id': hash_,
- **example_data2,
- 'tool': self.tools[tool_name],
- }
- for hash_ in hashes
- ]
- self.assertCountEqual(actual_data, expected_data_v2)
+def check_add__duplicate_twice(self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
+ etype = self.endpoint_type
+ tool = data.tools[self.tool_name]
- def add__duplicate_twice(self):
- # given
- tool_id = self.tools[tool_name]['id']
+ data_rev1 = {
+ 'id': data.revision_id_2,
+ **self.example_data[0],
+ 'indexer_configuration_id': tool['id']
+ }
- data_rev1 = {
- 'id': self.revision_id_2,
- **example_data1,
- 'indexer_configuration_id': tool_id
- }
+ data_rev2 = {
+ 'id': data.revision_id_2,
+ **self.example_data[1],
+ 'indexer_configuration_id': tool['id']
+ }
- data_rev2 = {
- 'id': self.revision_id_2,
- **example_data2,
- 'indexer_configuration_id': tool_id
- }
+ # when
+ endpoint(storage, etype, 'add')([data_rev1])
- # when
- endpoint(self, 'add')([data_rev1])
+ with pytest.raises(ValueError):
+ endpoint(storage, etype, 'add')(
+ [data_rev2, data_rev2],
+ conflict_update=True)
- with self.assertRaises(ValueError):
- endpoint(self, 'add')(
- [data_rev2, data_rev2],
- conflict_update=True)
+ # then
+ actual_data = list(endpoint(storage, etype, 'get')(
+ [data.revision_id_2, data.revision_id_1]))
- # then
- actual_data = list(endpoint(self, 'get')(
- [self.revision_id_2, self.revision_id_1]))
+ expected_data = [{
+ 'id': data.revision_id_2,
+ **self.example_data[0],
+ 'tool': tool,
+ }]
+ assert actual_data == expected_data
- expected_data = [{
- 'id': self.revision_id_2,
- **example_data1,
- 'tool': self.tools[tool_name]
- }]
- self.assertEqual(actual_data, expected_data)
- @rename
- def get(self):
- # given
- tool_id = self.tools[tool_name]['id']
+def check_get(self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
+ etype = self.endpoint_type
+ tool = data.tools[self.tool_name]
- query = [self.sha1_2, self.sha1_1]
+ query = [data.sha1_2, data.sha1_1]
+ data1 = {
+ 'id': data.sha1_2,
+ **self.example_data[0],
+ 'indexer_configuration_id': tool['id'],
+ }
- data1 = {
- 'id': self.sha1_2,
- **example_data1,
- 'indexer_configuration_id': tool_id,
- }
+ # when
+ endpoint(storage, etype, 'add')([data1])
- # when
- endpoint(self, 'add')([data1])
+ # then
+ actual_data = list(endpoint(storage, etype, 'get')(query))
- # then
- actual_data = list(endpoint(self, 'get')(query))
+ # then
+ expected_data = [{
+ 'id': data.sha1_2,
+ **self.example_data[0],
+ 'tool': tool,
+ }]
- # then
- expected_data = [{
- 'id': self.sha1_2,
- **example_data1,
- 'tool': self.tools[tool_name]
- }]
+ assert actual_data == expected_data
- self.assertEqual(actual_data, expected_data)
- @rename
- def delete(self):
- # given
- tool_id = self.tools[tool_name]['id']
+def check_revision_intrinsic_metadata_delete(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
+ etype = self.endpoint_type
+ tool = data.tools[self.tool_name]
- query = [self.sha1_2, self.sha1_1]
+ query = [data.sha1_2, data.sha1_1]
+ data1 = {
+ 'id': data.sha1_2,
+ **self.example_data[0],
+ 'indexer_configuration_id': tool['id'],
+ }
- data1 = {
- 'id': self.sha1_2,
- **example_data1,
- 'indexer_configuration_id': tool_id,
+ # when
+ endpoint(storage, etype, 'add')([data1])
+ endpoint(storage, etype, 'delete')([
+ {
+ 'id': data.sha1_2,
+ 'indexer_configuration_id': tool['id'],
}
+ ])
- # when
- endpoint(self, 'add')([data1])
- endpoint(self, 'delete')([
- {
- 'id': self.sha1_2,
- 'indexer_configuration_id': tool_id,
- }
- ])
+ # then
+ actual_data = list(endpoint(storage, etype, 'get')(query))
- # then
- actual_data = list(endpoint(self, 'get')(query))
+ # then
+ assert not actual_data
- # then
- self.assertEqual(actual_data, [])
-
- @rename
- def delete_nonexisting(self):
- tool_id = self.tools[tool_name]['id']
- endpoint(self, 'delete')([
- {
- 'id': self.sha1_2,
- 'indexer_configuration_id': tool_id,
- }
- ])
-
- return (
- missing,
- add__drop_duplicate,
- add__update_in_place_duplicate,
- add__update_in_place_deadlock,
- add__duplicate_twice,
- get,
- delete,
- delete_nonexisting,
- )
+def check_revision_intrinsic_metadata_delete_nonexisting(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
+ etype = self.endpoint_type
+ tool = data.tools[self.tool_name]
+ endpoint(storage, etype, 'delete')([
+ {
+ 'id': data.sha1_2,
+ 'indexer_configuration_id': tool['id'],
+ }
+ ])
-class CommonTestStorage:
- """Base class for Indexer Storage testing.
+class TestIndexerStorageContentMimetypes:
+ """Test Indexer Storage content_mimetype related methods
"""
- def setUp(self, *args, **kwargs):
- super().setUp()
- self.storage = get_indexer_storage(**self.storage_config)
- tools = self.storage.indexer_configuration_add(TOOLS)
- self.tools = {}
- for tool in tools:
- tool_name = tool['tool_name']
- while tool_name in self.tools:
- tool_name += '_'
- self.tools[tool_name] = {
- 'id': tool['id'],
- 'name': tool['tool_name'],
- 'version': tool['tool_version'],
- 'configuration': tool['tool_configuration'],
- }
-
- self.sha1_1 = hash_to_bytes('34973274ccef6ab4dfaaf86599792fa9c3fe4689')
- self.sha1_2 = hash_to_bytes('61c2b3a30496d329e21af70dd2d7e097046d07b7')
- self.revision_id_1 = hash_to_bytes(
- '7026b7c1a2af56521e951c01ed20f255fa054238')
- self.revision_id_2 = hash_to_bytes(
- '7026b7c1a2af56521e9587659012345678904321')
- self.revision_id_3 = hash_to_bytes(
- '7026b7c1a2af56521e9587659012345678904320')
- self.origin_url_1 = 'file:///dev/0/zero' # 44434341
- self.origin_url_2 = 'file:///dev/1/one' # 44434342
- self.origin_url_3 = 'file:///dev/2/two' # 54974445
-
- def test_check_config(self):
- self.assertTrue(self.storage.check_config(check_write=True))
- self.assertTrue(self.storage.check_config(check_write=False))
-
- # generate content_mimetype tests
- (
- test_content_mimetype_missing,
- test_content_mimetype_add__drop_duplicate,
- test_content_mimetype_add__update_in_place_duplicate,
- test_content_mimetype_add__update_in_place_deadlock,
- test_content_mimetype_add__duplicate_twice,
- test_content_mimetype_get,
- _, # content_mimetype_detete,
- _, # content_mimetype_detete_nonexisting,
- ) = gen_generic_endpoint_tests(
- endpoint_type='content_mimetype',
- tool_name='file',
- example_data1={
+ endpoint_type = 'content_mimetype'
+ tool_name = 'file'
+ example_data = [
+ {
'mimetype': 'text/plain',
'encoding': 'utf-8',
},
- example_data2={
+ {
'mimetype': 'text/html',
'encoding': 'us-ascii',
},
- )
-
- # content_language tests
- (
- test_content_language_missing,
- test_content_language_add__drop_duplicate,
- test_content_language_add__update_in_place_duplicate,
- test_content_language_add__update_in_place_deadlock,
- test_content_language_add__duplicate_twice,
- test_content_language_get,
- _, # test_content_language_delete,
- _, # test_content_language_delete_nonexisting,
- ) = gen_generic_endpoint_tests(
- endpoint_type='content_language',
- tool_name='pygments',
- example_data1={
+ ]
+
+ test_missing = check_missing
+ test_add__drop_duplicate = check_add__drop_duplicate
+ test_add__update_in_place_duplicate = check_add__update_in_place_duplicate
+ test_add__update_in_place_deadlock = check_add__update_in_place_deadlock
+ test_add__duplicate_twice = check_add__duplicate_twice
+ test_get = check_get
+
+
+class TestIndexerStorageContentLanguage:
+ """Test Indexer Storage content_language related methods
+ """
+ endpoint_type = 'content_language'
+ tool_name = 'pygments'
+ example_data = [
+ {
'lang': 'haskell',
},
- example_data2={
+ {
'lang': 'common-lisp',
},
- )
-
- # content_ctags tests
- (
- test_content_ctags_missing,
- # the following tests are disabled because CTAGS behave differently
- _, # test_content_ctags_add__drop_duplicate,
- _, # test_content_ctags_add__update_in_place_duplicate,
- _, # test_content_ctags_add__update_in_place_deadlock,
- _, # test_content_ctags_add__duplicate_twice,
- _, # test_content_ctags_get,
- _, # test_content_ctags_delete,
- _, # test_content_ctags_delete_nonexisting,
- ) = gen_generic_endpoint_tests(
- endpoint_type='content_ctags',
- tool_name='universal-ctags',
- example_data1={
+ ]
+
+ test_missing = check_missing
+ test_add__drop_duplicate = check_add__drop_duplicate
+ test_add__update_in_place_duplicate = check_add__update_in_place_duplicate
+ test_add__update_in_place_deadlock = check_add__update_in_place_deadlock
+ test_add__duplicate_twice = check_add__duplicate_twice
+ test_get = check_get
+
+
+class TestIndexerStorageContentCTags:
+ """Test Indexer Storage content_ctags related methods
+ """
+ endpoint_type = 'content_ctags'
+ tool_name = 'universal-ctags'
+ example_data = [
+ {
'ctags': [{
'name': 'done',
'kind': 'variable',
@@ -524,7 +380,7 @@
'lang': 'OCaml',
}]
},
- example_data2={
+ {
'ctags': [
{
'name': 'done',
@@ -539,15 +395,103 @@
'lang': 'Python',
}]
},
- )
+ ]
+
+ test_missing = check_missing
+
+
+class TestIndexerStorageContentMetadata:
+ """Test Indexer Storage content_metadata related methods
+ """
+ tool_name = 'swh-metadata-detector'
+ endpoint_type = 'content_metadata'
+ example_data = [
+ {
+ 'metadata': {
+ 'other': {},
+ 'codeRepository': {
+ 'type': 'git',
+ 'url': 'https://github.com/moranegg/metadata_test'
+ },
+ 'description': 'Simple package.json test for indexer',
+ 'name': 'test_metadata',
+ 'version': '0.0.1'
+ },
+ },
+ {
+ 'metadata': {
+ 'other': {},
+ 'name': 'test_metadata',
+ 'version': '0.0.1'
+ },
+ },
+ ]
+
+ test_missing = check_missing
+ test_add__drop_duplicate = check_add__drop_duplicate
+ test_add__update_in_place_duplicate = check_add__update_in_place_duplicate
+ test_add__update_in_place_deadlock = check_add__update_in_place_deadlock
+ test_add__duplicate_twice = check_add__duplicate_twice
+ test_get = check_get
- def test_content_ctags_search(self):
+
+class TestIndexerStorageRevisionIntrinsicMetadata:
+ """Test Indexer Storage revision_intrinsic_metadata related methods
+ """
+ tool_name = 'swh-metadata-detector'
+ endpoint_type = 'revision_intrinsic_metadata'
+ example_data = [
+ {
+ 'metadata': {
+ 'other': {},
+ 'codeRepository': {
+ 'type': 'git',
+ 'url': 'https://github.com/moranegg/metadata_test'
+ },
+ 'description': 'Simple package.json test for indexer',
+ 'name': 'test_metadata',
+ 'version': '0.0.1'
+ },
+ 'mappings': ['mapping1'],
+ },
+ {
+ 'metadata': {
+ 'other': {},
+ 'name': 'test_metadata',
+ 'version': '0.0.1'
+ },
+ 'mappings': ['mapping2'],
+ },
+ ]
+ test_missing = check_missing
+ test_add__drop_duplicate = check_add__drop_duplicate
+ test_add__update_in_place_duplicate = check_add__update_in_place_duplicate
+ test_add__update_in_place_deadlock = check_add__update_in_place_deadlock
+ test_add__duplicate_twice = check_add__duplicate_twice
+ test_get = check_get
+ test_revision_intrinsic_metadata_delete = \
+ check_revision_intrinsic_metadata_delete
+ test_revision_intrinsic_metadata_delete_nonexisting = \
+ check_revision_intrinsic_metadata_delete_nonexisting
+
+
+class TestIndexerStorageOthers:
+ """Non generic tests for the IndexerStorage.
+ """
+
+ def test_check_config(self, swh_indexer_storage):
+ storage = swh_indexer_storage
+ assert storage.check_config(check_write=True)
+ assert storage.check_config(check_write=False)
+
+ def test_content_ctags_search(self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
# 1. given
- tool = self.tools['universal-ctags']
+ tool = data.tools['universal-ctags']
tool_id = tool['id']
ctag1 = {
- 'id': self.sha1_1,
+ 'id': data.sha1_1,
'indexer_configuration_id': tool_id,
'ctags': [
{
@@ -572,7 +516,7 @@
}
ctag2 = {
- 'id': self.sha1_2,
+ 'id': data.sha1_2,
'indexer_configuration_id': tool_id,
'ctags': [
{
@@ -590,14 +534,13 @@
]
}
- self.storage.content_ctags_add([ctag1, ctag2])
+ storage.content_ctags_add([ctag1, ctag2])
# 1. when
- actual_ctags = list(self.storage.content_ctags_search('hello',
- limit=1))
+ actual_ctags = list(storage.content_ctags_search('hello', limit=1))
# 1. then
- self.assertEqual(actual_ctags, [
+ assert actual_ctags == [
{
'id': ctag1['id'],
'tool': tool,
@@ -606,16 +549,16 @@
'line': 133,
'lang': 'Python',
}
- ])
+ ]
# 2. when
- actual_ctags = list(self.storage.content_ctags_search(
+ actual_ctags = list(storage.content_ctags_search(
'hello',
limit=1,
last_sha1=ctag1['id']))
# 2. then
- self.assertEqual(actual_ctags, [
+ assert actual_ctags == [
{
'id': ctag2['id'],
'tool': tool,
@@ -624,13 +567,13 @@
'line': 100,
'lang': 'C',
}
- ])
+ ]
# 3. when
- actual_ctags = list(self.storage.content_ctags_search('hello'))
+ actual_ctags = list(storage.content_ctags_search('hello'))
# 3. then
- self.assertEqual(actual_ctags, [
+ assert actual_ctags == [
{
'id': ctag1['id'],
'tool': tool,
@@ -655,47 +598,50 @@
'line': 100,
'lang': 'C',
},
- ])
+ ]
# 4. when
- actual_ctags = list(self.storage.content_ctags_search('counter'))
+ actual_ctags = list(storage.content_ctags_search('counter'))
# then
- self.assertEqual(actual_ctags, [{
+ assert actual_ctags == [{
'id': ctag1['id'],
'tool': tool,
'name': 'counter',
'kind': 'variable',
'line': 119,
'lang': 'Python',
- }])
+ }]
# 5. when
- actual_ctags = list(self.storage.content_ctags_search('result',
- limit=1))
+ actual_ctags = list(storage.content_ctags_search('result', limit=1))
# then
- self.assertEqual(actual_ctags, [{
+ assert actual_ctags == [{
'id': ctag2['id'],
'tool': tool,
'name': 'result',
'kind': 'variable',
'line': 120,
'lang': 'C',
- }])
+ }]
+
+ def test_content_ctags_search_no_result(self, swh_indexer_storage):
+ storage = swh_indexer_storage
+ actual_ctags = list(storage.content_ctags_search('counter'))
- def test_content_ctags_search_no_result(self):
- actual_ctags = list(self.storage.content_ctags_search('counter'))
+ assert not actual_ctags
- self.assertEqual(actual_ctags, [])
+ def test_content_ctags_add__add_new_ctags_added(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
- def test_content_ctags_add__add_new_ctags_added(self):
# given
- tool = self.tools['universal-ctags']
+ tool = data.tools['universal-ctags']
tool_id = tool['id']
ctag_v1 = {
- 'id': self.sha1_2,
+ 'id': data.sha1_2,
'indexer_configuration_id': tool_id,
'ctags': [{
'name': 'done',
@@ -706,16 +652,15 @@
}
# given
- self.storage.content_ctags_add([ctag_v1])
- self.storage.content_ctags_add([ctag_v1]) # conflict does nothing
+ storage.content_ctags_add([ctag_v1])
+ storage.content_ctags_add([ctag_v1]) # conflict does nothing
# when
- actual_ctags = list(self.storage.content_ctags_get(
- [self.sha1_2]))
+ actual_ctags = list(storage.content_ctags_get([data.sha1_2]))
# then
expected_ctags = [{
- 'id': self.sha1_2,
+ 'id': data.sha1_2,
'name': 'done',
'kind': 'variable',
'line': 100,
@@ -723,7 +668,7 @@
'tool': tool,
}]
- self.assertEqual(actual_ctags, expected_ctags)
+ assert actual_ctags == expected_ctags
# given
ctag_v2 = ctag_v1.copy()
@@ -738,18 +683,18 @@
]
})
- self.storage.content_ctags_add([ctag_v2])
+ storage.content_ctags_add([ctag_v2])
expected_ctags = [
{
- 'id': self.sha1_2,
+ 'id': data.sha1_2,
'name': 'done',
'kind': 'variable',
'line': 100,
'lang': 'Scheme',
'tool': tool,
}, {
- 'id': self.sha1_2,
+ 'id': data.sha1_2,
'name': 'defn',
'kind': 'function',
'line': 120,
@@ -758,18 +703,20 @@
}
]
- actual_ctags = list(self.storage.content_ctags_get(
- [self.sha1_2]))
+ actual_ctags = list(storage.content_ctags_get(
+ [data.sha1_2]))
- self.assertEqual(actual_ctags, expected_ctags)
+ assert actual_ctags == expected_ctags
- def test_content_ctags_add__update_in_place(self):
+ def test_content_ctags_add__update_in_place(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
# given
- tool = self.tools['universal-ctags']
+ tool = data.tools['universal-ctags']
tool_id = tool['id']
ctag_v1 = {
- 'id': self.sha1_2,
+ 'id': data.sha1_2,
'indexer_configuration_id': tool_id,
'ctags': [{
'name': 'done',
@@ -780,16 +727,16 @@
}
# given
- self.storage.content_ctags_add([ctag_v1])
+ storage.content_ctags_add([ctag_v1])
# when
- actual_ctags = list(self.storage.content_ctags_get(
- [self.sha1_2]))
+ actual_ctags = list(storage.content_ctags_get(
+ [data.sha1_2]))
# then
expected_ctags = [
{
- 'id': self.sha1_2,
+ 'id': data.sha1_2,
'name': 'done',
'kind': 'variable',
'line': 100,
@@ -797,7 +744,7 @@
'tool': tool
}
]
- self.assertEqual(actual_ctags, expected_ctags)
+ assert actual_ctags == expected_ctags
# given
ctag_v2 = ctag_v1.copy()
@@ -818,15 +765,15 @@
]
})
- self.storage.content_ctags_add([ctag_v2], conflict_update=True)
+ storage.content_ctags_add([ctag_v2], conflict_update=True)
- actual_ctags = list(self.storage.content_ctags_get(
- [self.sha1_2]))
+ actual_ctags = list(storage.content_ctags_get(
+ [data.sha1_2]))
# ctag did change as the v2 was used to overwrite v1
expected_ctags = [
{
- 'id': self.sha1_2,
+ 'id': data.sha1_2,
'name': 'done',
'kind': 'variable',
'line': 100,
@@ -834,7 +781,7 @@
'tool': tool,
},
{
- 'id': self.sha1_2,
+ 'id': data.sha1_2,
'name': 'defn',
'kind': 'function',
'line': 120,
@@ -842,59 +789,38 @@
'tool': tool,
}
]
- self.assertEqual(actual_ctags, expected_ctags)
-
- # content_fossology_license tests
- (
- _, # The endpoint content_fossology_license_missing does not exist
- # the following tests are disabled because fossology_license tests
- # behave differently
- _, # test_content_fossology_license_add__drop_duplicate,
- _, # test_content_fossology_license_add__update_in_place_duplicate,
- _, # test_content_fossology_license_add__update_in_place_deadlock,
- _, # test_content_metadata_add__duplicate_twice,
- _, # test_content_fossology_license_get,
- _, # test_content_fossology_license_delete,
- _, # test_content_fossology_license_delete_nonexisting,
- ) = gen_generic_endpoint_tests(
- endpoint_type='content_fossology_license',
- tool_name='nomos',
- example_data1={
- 'licenses': ['Apache-2.0'],
- },
- example_data2={
- 'licenses': ['BSD-2-Clause'],
- },
- )
+ assert actual_ctags == expected_ctags
- def test_content_fossology_license_add__new_license_added(self):
+ def test_content_fossology_license_add__new_license_added(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
# given
- tool = self.tools['nomos']
+ tool = data.tools['nomos']
tool_id = tool['id']
license_v1 = {
- 'id': self.sha1_1,
+ 'id': data.sha1_1,
'licenses': ['Apache-2.0'],
'indexer_configuration_id': tool_id,
}
# given
- self.storage.content_fossology_license_add([license_v1])
+ storage.content_fossology_license_add([license_v1])
# conflict does nothing
- self.storage.content_fossology_license_add([license_v1])
+ storage.content_fossology_license_add([license_v1])
# when
- actual_licenses = list(self.storage.content_fossology_license_get(
- [self.sha1_1]))
+ actual_licenses = list(storage.content_fossology_license_get(
+ [data.sha1_1]))
# then
expected_license = {
- self.sha1_1: [{
+ data.sha1_1: [{
'licenses': ['Apache-2.0'],
'tool': tool,
}]
}
- self.assertEqual(actual_licenses, [expected_license])
+ assert actual_licenses == [expected_license]
# given
license_v2 = license_v1.copy()
@@ -902,222 +828,160 @@
'licenses': ['BSD-2-Clause'],
})
- self.storage.content_fossology_license_add([license_v2])
+ storage.content_fossology_license_add([license_v2])
- actual_licenses = list(self.storage.content_fossology_license_get(
- [self.sha1_1]))
+ actual_licenses = list(storage.content_fossology_license_get(
+ [data.sha1_1]))
expected_license = {
- self.sha1_1: [{
+ data.sha1_1: [{
'licenses': ['Apache-2.0', 'BSD-2-Clause'],
'tool': tool
}]
}
# license did not change as the v2 was dropped.
- self.assertEqual(actual_licenses, [expected_license])
-
- # content_metadata tests
- (
- test_content_metadata_missing,
- test_content_metadata_add__drop_duplicate,
- test_content_metadata_add__update_in_place_duplicate,
- test_content_metadata_add__update_in_place_deadlock,
- test_content_metadata_add__duplicate_twice,
- test_content_metadata_get,
- _, # test_content_metadata_delete,
- _, # test_content_metadata_delete_nonexisting,
- ) = gen_generic_endpoint_tests(
- endpoint_type='content_metadata',
- tool_name='swh-metadata-detector',
- example_data1={
- 'metadata': {
- 'other': {},
- 'codeRepository': {
- 'type': 'git',
- 'url': 'https://github.com/moranegg/metadata_test'
- },
- 'description': 'Simple package.json test for indexer',
- 'name': 'test_metadata',
- 'version': '0.0.1'
- },
- },
- example_data2={
- 'metadata': {
- 'other': {},
- 'name': 'test_metadata',
- 'version': '0.0.1'
- },
- },
- )
-
- # revision_intrinsic_metadata tests
- (
- test_revision_intrinsic_metadata_missing,
- test_revision_intrinsic_metadata_add__drop_duplicate,
- test_revision_intrinsic_metadata_add__update_in_place_duplicate,
- test_revision_intrinsic_metadata_add__update_in_place_deadlock,
- test_revision_intrinsic_metadata_add__duplicate_twice,
- test_revision_intrinsic_metadata_get,
- test_revision_intrinsic_metadata_delete,
- test_revision_intrinsic_metadata_delete_nonexisting,
- ) = gen_generic_endpoint_tests(
- endpoint_type='revision_intrinsic_metadata',
- tool_name='swh-metadata-detector',
- example_data1={
- 'metadata': {
- 'other': {},
- 'codeRepository': {
- 'type': 'git',
- 'url': 'https://github.com/moranegg/metadata_test'
- },
- 'description': 'Simple package.json test for indexer',
- 'name': 'test_metadata',
- 'version': '0.0.1'
- },
- 'mappings': ['mapping1'],
- },
- example_data2={
- 'metadata': {
- 'other': {},
- 'name': 'test_metadata',
- 'version': '0.0.1'
- },
- 'mappings': ['mapping2'],
- },
- )
+ assert actual_licenses == [expected_license]
- def test_origin_intrinsic_metadata_get(self):
+ def test_origin_intrinsic_metadata_get(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
# given
- tool_id = self.tools['swh-metadata-detector']['id']
+ tool_id = data.tools['swh-metadata-detector']['id']
metadata = {
'version': None,
'name': None,
}
metadata_rev = {
- 'id': self.revision_id_2,
+ 'id': data.revision_id_2,
'metadata': metadata,
'mappings': ['mapping1'],
'indexer_configuration_id': tool_id,
}
metadata_origin = {
- 'id': self.origin_url_1,
+ 'id': data.origin_url_1,
'metadata': metadata,
'indexer_configuration_id': tool_id,
'mappings': ['mapping1'],
- 'from_revision': self.revision_id_2,
+ 'from_revision': data.revision_id_2,
}
# when
- self.storage.revision_intrinsic_metadata_add([metadata_rev])
- self.storage.origin_intrinsic_metadata_add([metadata_origin])
+ storage.revision_intrinsic_metadata_add([metadata_rev])
+ storage.origin_intrinsic_metadata_add([metadata_origin])
# then
- actual_metadata = list(self.storage.origin_intrinsic_metadata_get(
- [self.origin_url_1, 'no://where']))
+ actual_metadata = list(storage.origin_intrinsic_metadata_get(
+ [data.origin_url_1, 'no://where']))
expected_metadata = [{
- 'id': self.origin_url_1,
+ 'id': data.origin_url_1,
'metadata': metadata,
- 'tool': self.tools['swh-metadata-detector'],
- 'from_revision': self.revision_id_2,
+ 'tool': data.tools['swh-metadata-detector'],
+ 'from_revision': data.revision_id_2,
'mappings': ['mapping1'],
}]
- self.assertEqual(actual_metadata, expected_metadata)
+ assert actual_metadata == expected_metadata
- def test_origin_intrinsic_metadata_delete(self):
+ def test_origin_intrinsic_metadata_delete(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
# given
- tool_id = self.tools['swh-metadata-detector']['id']
+ tool_id = data.tools['swh-metadata-detector']['id']
metadata = {
'version': None,
'name': None,
}
metadata_rev = {
- 'id': self.revision_id_2,
+ 'id': data.revision_id_2,
'metadata': metadata,
'mappings': ['mapping1'],
'indexer_configuration_id': tool_id,
}
metadata_origin = {
- 'id': self.origin_url_1,
+ 'id': data.origin_url_1,
'metadata': metadata,
'indexer_configuration_id': tool_id,
'mappings': ['mapping1'],
- 'from_revision': self.revision_id_2,
+ 'from_revision': data.revision_id_2,
}
metadata_origin2 = metadata_origin.copy()
- metadata_origin2['id'] = self.origin_url_2
+ metadata_origin2['id'] = data.origin_url_2
# when
- self.storage.revision_intrinsic_metadata_add([metadata_rev])
- self.storage.origin_intrinsic_metadata_add([
+ storage.revision_intrinsic_metadata_add([metadata_rev])
+ storage.origin_intrinsic_metadata_add([
metadata_origin, metadata_origin2])
- self.storage.origin_intrinsic_metadata_delete([
+ storage.origin_intrinsic_metadata_delete([
{
- 'id': self.origin_url_1,
+ 'id': data.origin_url_1,
'indexer_configuration_id': tool_id
}
])
# then
- actual_metadata = list(self.storage.origin_intrinsic_metadata_get(
- [self.origin_url_1, self.origin_url_2, 'no://where']))
+ actual_metadata = list(storage.origin_intrinsic_metadata_get(
+ [data.origin_url_1, data.origin_url_2, 'no://where']))
for item in actual_metadata:
item['indexer_configuration_id'] = item.pop('tool')['id']
- self.assertEqual(actual_metadata, [metadata_origin2])
+ assert actual_metadata == [metadata_origin2]
- def test_origin_intrinsic_metadata_delete_nonexisting(self):
- tool_id = self.tools['swh-metadata-detector']['id']
- self.storage.origin_intrinsic_metadata_delete([
+ def test_origin_intrinsic_metadata_delete_nonexisting(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
+ tool_id = data.tools['swh-metadata-detector']['id']
+ storage.origin_intrinsic_metadata_delete([
{
- 'id': self.origin_url_1,
+ 'id': data.origin_url_1,
'indexer_configuration_id': tool_id
}
])
- def test_origin_intrinsic_metadata_add_drop_duplicate(self):
+ def test_origin_intrinsic_metadata_add_drop_duplicate(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
# given
- tool_id = self.tools['swh-metadata-detector']['id']
+ tool_id = data.tools['swh-metadata-detector']['id']
metadata_v1 = {
'version': None,
'name': None,
}
metadata_rev_v1 = {
- 'id': self.revision_id_1,
+ 'id': data.revision_id_1,
'metadata': metadata_v1.copy(),
'mappings': [],
'indexer_configuration_id': tool_id,
}
metadata_origin_v1 = {
- 'id': self.origin_url_1,
+ 'id': data.origin_url_1,
'metadata': metadata_v1.copy(),
'indexer_configuration_id': tool_id,
'mappings': [],
- 'from_revision': self.revision_id_1,
+ 'from_revision': data.revision_id_1,
}
# given
- self.storage.revision_intrinsic_metadata_add([metadata_rev_v1])
- self.storage.origin_intrinsic_metadata_add([metadata_origin_v1])
+ storage.revision_intrinsic_metadata_add([metadata_rev_v1])
+ storage.origin_intrinsic_metadata_add([metadata_origin_v1])
# when
- actual_metadata = list(self.storage.origin_intrinsic_metadata_get(
- [self.origin_url_1, 'no://where']))
+ actual_metadata = list(storage.origin_intrinsic_metadata_get(
+ [data.origin_url_1, 'no://where']))
expected_metadata_v1 = [{
- 'id': self.origin_url_1,
+ 'id': data.origin_url_1,
'metadata': metadata_v1,
- 'tool': self.tools['swh-metadata-detector'],
- 'from_revision': self.revision_id_1,
+ 'tool': data.tools['swh-metadata-detector'],
+ 'from_revision': data.revision_id_1,
'mappings': [],
}]
- self.assertEqual(actual_metadata, expected_metadata_v1)
+ assert actual_metadata == expected_metadata_v1
# given
metadata_v2 = metadata_v1.copy()
@@ -1130,55 +994,57 @@
metadata_rev_v2['metadata'] = metadata_v2
metadata_origin_v2['metadata'] = metadata_v2
- self.storage.revision_intrinsic_metadata_add([metadata_rev_v2])
- self.storage.origin_intrinsic_metadata_add([metadata_origin_v2])
+ storage.revision_intrinsic_metadata_add([metadata_rev_v2])
+ storage.origin_intrinsic_metadata_add([metadata_origin_v2])
# then
- actual_metadata = list(self.storage.origin_intrinsic_metadata_get(
- [self.origin_url_1]))
+ actual_metadata = list(storage.origin_intrinsic_metadata_get(
+ [data.origin_url_1]))
# metadata did not change as the v2 was dropped.
- self.assertEqual(actual_metadata, expected_metadata_v1)
+ assert actual_metadata == expected_metadata_v1
- def test_origin_intrinsic_metadata_add_update_in_place_duplicate(self):
+ def test_origin_intrinsic_metadata_add_update_in_place_duplicate(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
# given
- tool_id = self.tools['swh-metadata-detector']['id']
+ tool_id = data.tools['swh-metadata-detector']['id']
metadata_v1 = {
'version': None,
'name': None,
}
metadata_rev_v1 = {
- 'id': self.revision_id_2,
+ 'id': data.revision_id_2,
'metadata': metadata_v1,
'mappings': [],
'indexer_configuration_id': tool_id,
}
metadata_origin_v1 = {
- 'id': self.origin_url_1,
+ 'id': data.origin_url_1,
'metadata': metadata_v1.copy(),
'indexer_configuration_id': tool_id,
'mappings': [],
- 'from_revision': self.revision_id_2,
+ 'from_revision': data.revision_id_2,
}
# given
- self.storage.revision_intrinsic_metadata_add([metadata_rev_v1])
- self.storage.origin_intrinsic_metadata_add([metadata_origin_v1])
+ storage.revision_intrinsic_metadata_add([metadata_rev_v1])
+ storage.origin_intrinsic_metadata_add([metadata_origin_v1])
# when
- actual_metadata = list(self.storage.origin_intrinsic_metadata_get(
- [self.origin_url_1]))
+ actual_metadata = list(storage.origin_intrinsic_metadata_get(
+ [data.origin_url_1]))
# then
expected_metadata_v1 = [{
- 'id': self.origin_url_1,
+ 'id': data.origin_url_1,
'metadata': metadata_v1,
- 'tool': self.tools['swh-metadata-detector'],
- 'from_revision': self.revision_id_2,
+ 'tool': data.tools['swh-metadata-detector'],
+ 'from_revision': data.revision_id_2,
'mappings': [],
}]
- self.assertEqual(actual_metadata, expected_metadata_v1)
+ assert actual_metadata == expected_metadata_v1
# given
metadata_v2 = metadata_v1.copy()
@@ -1190,35 +1056,37 @@
metadata_origin_v2 = metadata_origin_v1.copy()
metadata_rev_v2['metadata'] = metadata_v2
metadata_origin_v2 = {
- 'id': self.origin_url_1,
+ 'id': data.origin_url_1,
'metadata': metadata_v2.copy(),
'indexer_configuration_id': tool_id,
'mappings': ['npm'],
- 'from_revision': self.revision_id_1,
+ 'from_revision': data.revision_id_1,
}
- self.storage.revision_intrinsic_metadata_add(
+ storage.revision_intrinsic_metadata_add(
[metadata_rev_v2], conflict_update=True)
- self.storage.origin_intrinsic_metadata_add(
+ storage.origin_intrinsic_metadata_add(
[metadata_origin_v2], conflict_update=True)
- actual_metadata = list(self.storage.origin_intrinsic_metadata_get(
- [self.origin_url_1]))
+ actual_metadata = list(storage.origin_intrinsic_metadata_get(
+ [data.origin_url_1]))
expected_metadata_v2 = [{
- 'id': self.origin_url_1,
+ 'id': data.origin_url_1,
'metadata': metadata_v2,
- 'tool': self.tools['swh-metadata-detector'],
- 'from_revision': self.revision_id_1,
+ 'tool': data.tools['swh-metadata-detector'],
+ 'from_revision': data.revision_id_1,
'mappings': ['npm'],
}]
# metadata did change as the v2 was used to overwrite v1
- self.assertEqual(actual_metadata, expected_metadata_v2)
+ assert actual_metadata == expected_metadata_v2
- def test_origin_intrinsic_metadata_add__update_in_place_deadlock(self):
+ def test_origin_intrinsic_metadata_add__update_in_place_deadlock(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
# given
- tool_id = self.tools['swh-metadata-detector']['id']
+ tool_id = data.tools['swh-metadata-detector']['id']
ids = list(range(10))
@@ -1238,7 +1106,7 @@
}
metadata_rev_v1 = {
- 'id': self.revision_id_2,
+ 'id': data.revision_id_2,
'metadata': {
'version': None,
'name': None,
@@ -1250,7 +1118,7 @@
data_v1 = [
{
'id': 'file:///tmp/origin%d' % id_,
- 'from_revision': self.revision_id_2,
+ 'from_revision': data.revision_id_2,
**example_data1,
'indexer_configuration_id': tool_id,
}
@@ -1259,7 +1127,7 @@
data_v2 = [
{
'id': 'file:///tmp/origin%d' % id_,
- 'from_revision': self.revision_id_2,
+ 'from_revision': data.revision_id_2,
**example_data2,
'indexer_configuration_id': tool_id,
}
@@ -1272,33 +1140,33 @@
data_v2b = list(reversed(data_v2[0:-1]))
# given
- self.storage.revision_intrinsic_metadata_add([metadata_rev_v1])
- self.storage.origin_intrinsic_metadata_add(data_v1)
+ storage.revision_intrinsic_metadata_add([metadata_rev_v1])
+ storage.origin_intrinsic_metadata_add(data_v1)
# when
origins = ['file:///tmp/origin%d' % i for i in ids]
- actual_data = list(self.storage.origin_intrinsic_metadata_get(origins))
+ actual_data = list(storage.origin_intrinsic_metadata_get(origins))
expected_data_v1 = [
{
'id': 'file:///tmp/origin%d' % id_,
- 'from_revision': self.revision_id_2,
+ 'from_revision': data.revision_id_2,
**example_data1,
- 'tool': self.tools['swh-metadata-detector'],
+ 'tool': data.tools['swh-metadata-detector'],
}
for id_ in ids
]
# then
- self.assertEqual(actual_data, expected_data_v1)
+ assert actual_data == expected_data_v1
# given
def f1():
- self.storage.origin_intrinsic_metadata_add(
+ storage.origin_intrinsic_metadata_add(
data_v2a, conflict_update=True)
def f2():
- self.storage.origin_intrinsic_metadata_add(
+ storage.origin_intrinsic_metadata_add(
data_v2b, conflict_update=True)
t1 = threading.Thread(target=f1)
@@ -1309,112 +1177,112 @@
t1.join()
t2.join()
- actual_data = list(self.storage.origin_intrinsic_metadata_get(origins))
+ actual_data = list(storage.origin_intrinsic_metadata_get(origins))
expected_data_v2 = [
{
'id': 'file:///tmp/origin%d' % id_,
- 'from_revision': self.revision_id_2,
+ 'from_revision': data.revision_id_2,
**example_data2,
- 'tool': self.tools['swh-metadata-detector'],
+ 'tool': data.tools['swh-metadata-detector'],
}
for id_ in ids
]
- self.maxDiff = None
- self.assertCountEqual(actual_data, expected_data_v2)
+ assert len(actual_data) == len(expected_data_v2)
+ assert sorted(actual_data, key=lambda x: x['id']) == expected_data_v2
- def test_origin_intrinsic_metadata_add__duplicate_twice(self):
+ def test_origin_intrinsic_metadata_add__duplicate_twice(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
# given
- tool_id = self.tools['swh-metadata-detector']['id']
+ tool_id = data.tools['swh-metadata-detector']['id']
metadata = {
'developmentStatus': None,
'name': None,
}
metadata_rev = {
- 'id': self.revision_id_2,
+ 'id': data.revision_id_2,
'metadata': metadata,
'mappings': ['mapping1'],
'indexer_configuration_id': tool_id,
}
metadata_origin = {
- 'id': self.origin_url_1,
+ 'id': data.origin_url_1,
'metadata': metadata,
'indexer_configuration_id': tool_id,
'mappings': ['mapping1'],
- 'from_revision': self.revision_id_2,
+ 'from_revision': data.revision_id_2,
}
# when
- self.storage.revision_intrinsic_metadata_add([metadata_rev])
+ storage.revision_intrinsic_metadata_add([metadata_rev])
- with self.assertRaises(ValueError):
- self.storage.origin_intrinsic_metadata_add([
+ with pytest.raises(ValueError):
+ storage.origin_intrinsic_metadata_add([
metadata_origin, metadata_origin])
- def test_origin_intrinsic_metadata_search_fulltext(self):
+ def test_origin_intrinsic_metadata_search_fulltext(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
# given
- tool_id = self.tools['swh-metadata-detector']['id']
+ tool_id = data.tools['swh-metadata-detector']['id']
metadata1 = {
'author': 'John Doe',
}
metadata1_rev = {
- 'id': self.revision_id_1,
+ 'id': data.revision_id_1,
'metadata': metadata1,
'mappings': [],
'indexer_configuration_id': tool_id,
}
metadata1_origin = {
- 'id': self.origin_url_1,
+ 'id': data.origin_url_1,
'metadata': metadata1,
'mappings': [],
'indexer_configuration_id': tool_id,
- 'from_revision': self.revision_id_1,
+ 'from_revision': data.revision_id_1,
}
metadata2 = {
'author': 'Jane Doe',
}
metadata2_rev = {
- 'id': self.revision_id_2,
- 'origin': self.origin_url_1,
+ 'id': data.revision_id_2,
'metadata': metadata2,
'mappings': [],
'indexer_configuration_id': tool_id,
}
metadata2_origin = {
- 'id': self.origin_url_2,
+ 'id': data.origin_url_2,
'metadata': metadata2,
'mappings': [],
'indexer_configuration_id': tool_id,
- 'from_revision': self.revision_id_2,
+ 'from_revision': data.revision_id_2,
}
# when
- self.storage.revision_intrinsic_metadata_add([metadata1_rev])
- self.storage.origin_intrinsic_metadata_add([metadata1_origin])
- self.storage.revision_intrinsic_metadata_add([metadata2_rev])
- self.storage.origin_intrinsic_metadata_add([metadata2_origin])
+ storage.revision_intrinsic_metadata_add([metadata1_rev])
+ storage.origin_intrinsic_metadata_add([metadata1_origin])
+ storage.revision_intrinsic_metadata_add([metadata2_rev])
+ storage.origin_intrinsic_metadata_add([metadata2_origin])
# then
- search = self.storage.origin_intrinsic_metadata_search_fulltext
- self.assertCountEqual(
- [res['id'] for res in search(['Doe'])],
- [self.origin_url_1, self.origin_url_2])
- self.assertEqual(
- [res['id'] for res in search(['John', 'Doe'])],
- [self.origin_url_1])
- self.assertEqual(
- [res['id'] for res in search(['John'])],
- [self.origin_url_1])
- self.assertEqual(
- [res['id'] for res in search(['John', 'Jane'])],
- [])
-
- def test_origin_intrinsic_metadata_search_fulltext_rank(self):
+ search = storage.origin_intrinsic_metadata_search_fulltext
+ assert set([res['id'] for res in search(['Doe'])]) \
+ == set([data.origin_url_1, data.origin_url_2])
+ assert [res['id'] for res in search(['John', 'Doe'])] \
+ == [data.origin_url_1]
+ assert [res['id'] for res in search(['John'])] \
+ == [data.origin_url_1]
+ assert not list(search(['John', 'Jane']))
+
+ def test_origin_intrinsic_metadata_search_fulltext_rank(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
# given
- tool_id = self.tools['swh-metadata-detector']['id']
+ tool_id = data.tools['swh-metadata-detector']['id']
# The following authors have "Random Person" to add some more content
# to the JSON data, to work around normalization quirks when there
@@ -1428,17 +1296,17 @@
]
}
metadata1_rev = {
- 'id': self.revision_id_1,
+ 'id': data.revision_id_1,
'metadata': metadata1,
'mappings': [],
'indexer_configuration_id': tool_id,
}
metadata1_origin = {
- 'id': self.origin_url_1,
+ 'id': data.origin_url_1,
'metadata': metadata1,
'mappings': [],
'indexer_configuration_id': tool_id,
- 'from_revision': self.revision_id_1,
+ 'from_revision': data.revision_id_1,
}
metadata2 = {
'author': [
@@ -1447,191 +1315,182 @@
]
}
metadata2_rev = {
- 'id': self.revision_id_2,
+ 'id': data.revision_id_2,
'metadata': metadata2,
'mappings': [],
'indexer_configuration_id': tool_id,
}
metadata2_origin = {
- 'id': self.origin_url_2,
+ 'id': data.origin_url_2,
'metadata': metadata2,
'mappings': [],
'indexer_configuration_id': tool_id,
- 'from_revision': self.revision_id_2,
+ 'from_revision': data.revision_id_2,
}
# when
- self.storage.revision_intrinsic_metadata_add([metadata1_rev])
- self.storage.origin_intrinsic_metadata_add([metadata1_origin])
- self.storage.revision_intrinsic_metadata_add([metadata2_rev])
- self.storage.origin_intrinsic_metadata_add([metadata2_origin])
+ storage.revision_intrinsic_metadata_add([metadata1_rev])
+ storage.origin_intrinsic_metadata_add([metadata1_origin])
+ storage.revision_intrinsic_metadata_add([metadata2_rev])
+ storage.origin_intrinsic_metadata_add([metadata2_origin])
# then
- search = self.storage.origin_intrinsic_metadata_search_fulltext
- self.assertEqual(
- [res['id'] for res in search(['Doe'])],
- [self.origin_url_1, self.origin_url_2])
- self.assertEqual(
- [res['id'] for res in search(['Doe'], limit=1)],
- [self.origin_url_1])
- self.assertEqual(
- [res['id'] for res in search(['John'])],
- [self.origin_url_1])
- self.assertEqual(
- [res['id'] for res in search(['Jane'])],
- [self.origin_url_2, self.origin_url_1])
- self.assertEqual(
- [res['id'] for res in search(['John', 'Jane'])],
- [self.origin_url_1])
-
- def _fill_origin_intrinsic_metadata(self):
- tool1_id = self.tools['swh-metadata-detector']['id']
- tool2_id = self.tools['swh-metadata-detector2']['id']
+ search = storage.origin_intrinsic_metadata_search_fulltext
+ assert [res['id'] for res in search(['Doe'])] \
+ == [data.origin_url_1, data.origin_url_2]
+ assert [res['id'] for res in search(['Doe'], limit=1)] \
+ == [data.origin_url_1]
+ assert [res['id'] for res in search(['John'])] \
+ == [data.origin_url_1]
+ assert [res['id'] for res in search(['Jane'])] \
+ == [data.origin_url_2, data.origin_url_1]
+ assert [res['id'] for res in search(['John', 'Jane'])] \
+ == [data.origin_url_1]
+
+ def _fill_origin_intrinsic_metadata(
+ self, swh_indexer_storage_w_data):
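+ # shared helper for the search_by_producer and stats tests below;
+ # the storage fixture is passed in explicitly now that tests no longer keep state on self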
+ storage, data = swh_indexer_storage_w_data
+ tool1_id = data.tools['swh-metadata-detector']['id']
+ tool2_id = data.tools['swh-metadata-detector2']['id']
metadata1 = {
'@context': 'foo',
'author': 'John Doe',
}
metadata1_rev = {
- 'id': self.revision_id_1,
+ 'id': data.revision_id_1,
'metadata': metadata1,
'mappings': ['npm'],
'indexer_configuration_id': tool1_id,
}
metadata1_origin = {
- 'id': self.origin_url_1,
+ 'id': data.origin_url_1,
'metadata': metadata1,
'mappings': ['npm'],
'indexer_configuration_id': tool1_id,
- 'from_revision': self.revision_id_1,
+ 'from_revision': data.revision_id_1,
}
metadata2 = {
'@context': 'foo',
'author': 'Jane Doe',
}
metadata2_rev = {
- 'id': self.revision_id_2,
+ 'id': data.revision_id_2,
'metadata': metadata2,
'mappings': ['npm', 'gemspec'],
'indexer_configuration_id': tool2_id,
}
metadata2_origin = {
- 'id': self.origin_url_2,
+ 'id': data.origin_url_2,
'metadata': metadata2,
'mappings': ['npm', 'gemspec'],
'indexer_configuration_id': tool2_id,
- 'from_revision': self.revision_id_2,
+ 'from_revision': data.revision_id_2,
}
metadata3 = {
'@context': 'foo',
}
metadata3_rev = {
- 'id': self.revision_id_3,
+ 'id': data.revision_id_3,
'metadata': metadata3,
'mappings': ['npm', 'gemspec'],
'indexer_configuration_id': tool2_id,
}
metadata3_origin = {
- 'id': self.origin_url_3,
+ 'id': data.origin_url_3,
'metadata': metadata3,
'mappings': ['pkg-info'],
'indexer_configuration_id': tool2_id,
- 'from_revision': self.revision_id_3,
+ 'from_revision': data.revision_id_3,
}
- self.storage.revision_intrinsic_metadata_add([metadata1_rev])
- self.storage.origin_intrinsic_metadata_add([metadata1_origin])
- self.storage.revision_intrinsic_metadata_add([metadata2_rev])
- self.storage.origin_intrinsic_metadata_add([metadata2_origin])
- self.storage.revision_intrinsic_metadata_add([metadata3_rev])
- self.storage.origin_intrinsic_metadata_add([metadata3_origin])
-
- def test_origin_intrinsic_metadata_search_by_producer(self):
- self._fill_origin_intrinsic_metadata()
- tool1 = self.tools['swh-metadata-detector']
- tool2 = self.tools['swh-metadata-detector2']
- endpoint = self.storage.origin_intrinsic_metadata_search_by_producer
+ storage.revision_intrinsic_metadata_add([metadata1_rev])
+ storage.origin_intrinsic_metadata_add([metadata1_origin])
+ storage.revision_intrinsic_metadata_add([metadata2_rev])
+ storage.origin_intrinsic_metadata_add([metadata2_origin])
+ storage.revision_intrinsic_metadata_add([metadata3_rev])
+ storage.origin_intrinsic_metadata_add([metadata3_origin])
+
+ def test_origin_intrinsic_metadata_search_by_producer(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
+ self._fill_origin_intrinsic_metadata(
+ swh_indexer_storage_w_data)
+ tool1 = data.tools['swh-metadata-detector']
+ tool2 = data.tools['swh-metadata-detector2']
+ endpoint = storage.origin_intrinsic_metadata_search_by_producer
# test pagination
# no 'page_token' param, return all origins
- self.assertCountEqual(
- endpoint(ids_only=True)['origins'],
- [self.origin_url_1, self.origin_url_2, self.origin_url_3])
+ assert endpoint(ids_only=True)['origins'] \
+ == [data.origin_url_1, data.origin_url_2, data.origin_url_3]
# 'page_token' is lower than origin_1, return everything
- self.assertCountEqual(
- endpoint(
- page_token=self.origin_url_1[:-1], ids_only=True)['origins'],
- [self.origin_url_1, self.origin_url_2, self.origin_url_3])
+ assert endpoint(
+ page_token=data.origin_url_1[:-1], ids_only=True)['origins'] \
+ == [data.origin_url_1, data.origin_url_2, data.origin_url_3]
# 'page_token' is origin_3, return nothing
- self.assertCountEqual(
- endpoint(page_token=self.origin_url_3, ids_only=True)['origins'],
- [])
+ assert not endpoint(
+ page_token=data.origin_url_3, ids_only=True)['origins']
# test limit argument
- self.assertCountEqual(
- endpoint(page_token=self.origin_url_1[:-1],
- limit=2, ids_only=True)['origins'],
- [self.origin_url_1, self.origin_url_2])
- self.assertCountEqual(
- endpoint(page_token=self.origin_url_1,
- limit=2, ids_only=True)['origins'],
- [self.origin_url_2, self.origin_url_3])
- self.assertCountEqual(
- endpoint(page_token=self.origin_url_2,
- limit=2, ids_only=True)['origins'],
- [self.origin_url_3])
+ assert endpoint(page_token=data.origin_url_1[:-1],
+ limit=2, ids_only=True)['origins'] \
+ == [data.origin_url_1, data.origin_url_2]
+ assert endpoint(page_token=data.origin_url_1,
+ limit=2, ids_only=True)['origins'] \
+ == [data.origin_url_2, data.origin_url_3]
+ assert endpoint(page_token=data.origin_url_2,
+ limit=2, ids_only=True)['origins'] \
+ == [data.origin_url_3]
# test mappings filtering
- self.assertCountEqual(
- endpoint(mappings=['npm'], ids_only=True)['origins'],
- [self.origin_url_1, self.origin_url_2])
- self.assertCountEqual(
- endpoint(mappings=['npm', 'gemspec'], ids_only=True)['origins'],
- [self.origin_url_1, self.origin_url_2])
- self.assertCountEqual(
- endpoint(mappings=['gemspec'], ids_only=True)['origins'],
- [self.origin_url_2])
- self.assertCountEqual(
- endpoint(mappings=['pkg-info'], ids_only=True)['origins'],
- [self.origin_url_3])
- self.assertCountEqual(
- endpoint(mappings=['foobar'], ids_only=True)['origins'],
- [])
+ assert endpoint(mappings=['npm'], ids_only=True)['origins'] \
+ == [data.origin_url_1, data.origin_url_2]
+ assert endpoint(mappings=['npm', 'gemspec'],
+ ids_only=True)['origins'] \
+ == [data.origin_url_1, data.origin_url_2]
+ assert endpoint(mappings=['gemspec'], ids_only=True)['origins'] \
+ == [data.origin_url_2]
+ assert endpoint(mappings=['pkg-info'], ids_only=True)['origins'] \
+ == [data.origin_url_3]
+ assert not endpoint(mappings=['foobar'], ids_only=True)['origins']
# test pagination + mappings
- self.assertCountEqual(
- endpoint(mappings=['npm'], limit=1, ids_only=True)['origins'],
- [self.origin_url_1])
+ assert endpoint(mappings=['npm'], limit=1, ids_only=True)['origins'] \
+ == [data.origin_url_1]
# test tool filtering
- self.assertCountEqual(
- endpoint(tool_ids=[tool1['id']], ids_only=True)['origins'],
- [self.origin_url_1])
- self.assertCountEqual(
- endpoint(tool_ids=[tool2['id']], ids_only=True)['origins'],
- [self.origin_url_2, self.origin_url_3])
- self.assertCountEqual(
- endpoint(tool_ids=[tool1['id'], tool2['id']],
- ids_only=True)['origins'],
- [self.origin_url_1, self.origin_url_2, self.origin_url_3])
+ assert endpoint(
+ tool_ids=[tool1['id']], ids_only=True)['origins'] \
+ == [data.origin_url_1]
+ assert sorted(endpoint(
+ tool_ids=[tool2['id']], ids_only=True)['origins']) \
+ == [data.origin_url_2, data.origin_url_3]
+ assert sorted(endpoint(
+ tool_ids=[tool1['id'], tool2['id']], ids_only=True)['origins']) \
+ == [data.origin_url_1, data.origin_url_2, data.origin_url_3]
# test ids_only=False
- self.assertEqual(endpoint(mappings=['gemspec'])['origins'], [{
- 'id': self.origin_url_2,
- 'metadata': {
- '@context': 'foo',
- 'author': 'Jane Doe',
- },
- 'mappings': ['npm', 'gemspec'],
- 'tool': tool2,
- 'from_revision': self.revision_id_2,
- }])
+ assert endpoint(mappings=['gemspec'])['origins'] \
+ == [{
+ 'id': data.origin_url_2,
+ 'metadata': {
+ '@context': 'foo',
+ 'author': 'Jane Doe',
+ },
+ 'mappings': ['npm', 'gemspec'],
+ 'tool': tool2,
+ 'from_revision': data.revision_id_2,
+ }]
- def test_origin_intrinsic_metadata_stats(self):
- self._fill_origin_intrinsic_metadata()
+ def test_origin_intrinsic_metadata_stats(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
+ self._fill_origin_intrinsic_metadata(
+ swh_indexer_storage_w_data)
- result = self.storage.origin_intrinsic_metadata_stats()
- self.assertEqual(result, {
+ result = storage.origin_intrinsic_metadata_stats()
+ assert result == {
'per_mapping': {
'gemspec': 1,
'npm': 2,
@@ -1641,44 +1500,48 @@
},
'total': 3,
'non_empty': 2,
- })
+ }
- def test_indexer_configuration_add(self):
+ def test_indexer_configuration_add(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
tool = {
'tool_name': 'some-unknown-tool',
'tool_version': 'some-version',
'tool_configuration': {"debian-package": "some-package"},
}
- actual_tool = self.storage.indexer_configuration_get(tool)
- self.assertIsNone(actual_tool) # does not exist
+ actual_tool = storage.indexer_configuration_get(tool)
+ assert actual_tool is None # does not exist
# add it
- actual_tools = list(self.storage.indexer_configuration_add([tool]))
+ actual_tools = list(storage.indexer_configuration_add([tool]))
- self.assertEqual(len(actual_tools), 1)
+ assert len(actual_tools) == 1
actual_tool = actual_tools[0]
- self.assertIsNotNone(actual_tool) # now it exists
+ assert actual_tool is not None # now it exists
new_id = actual_tool.pop('id')
- self.assertEqual(actual_tool, tool)
+ assert actual_tool == tool
- actual_tools2 = list(self.storage.indexer_configuration_add([tool]))
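+ # re-adding the same tool must be idempotent and return the same id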
+ actual_tools2 = list(storage.indexer_configuration_add([tool]))
actual_tool2 = actual_tools2[0]
- self.assertIsNotNone(actual_tool2) # now it exists
+ assert actual_tool2 is not None # now it exists
new_id2 = actual_tool2.pop('id')
- self.assertEqual(new_id, new_id2)
- self.assertEqual(actual_tool, actual_tool2)
+ assert new_id == new_id2
+ assert actual_tool == actual_tool2
- def test_indexer_configuration_add_multiple(self):
+ def test_indexer_configuration_add_multiple(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
tool = {
'tool_name': 'some-unknown-tool',
'tool_version': 'some-version',
'tool_configuration': {"debian-package": "some-package"},
}
- actual_tools = list(self.storage.indexer_configuration_add([tool]))
- self.assertEqual(len(actual_tools), 1)
+ actual_tools = list(storage.indexer_configuration_add([tool]))
+ assert len(actual_tools) == 1
new_tools = [tool, {
'tool_name': 'yet-another-tool',
@@ -1686,85 +1549,93 @@
'tool_configuration': {},
}]
- actual_tools = list(self.storage.indexer_configuration_add(new_tools))
- self.assertEqual(len(actual_tools), 2)
+ actual_tools = list(storage.indexer_configuration_add(new_tools))
+ assert len(actual_tools) == 2
# order not guaranteed, so we iterate over results to check
for tool in actual_tools:
_id = tool.pop('id')
- self.assertIsNotNone(_id)
- self.assertIn(tool, new_tools)
+ assert _id is not None
+ assert tool in new_tools
- def test_indexer_configuration_get_missing(self):
+ def test_indexer_configuration_get_missing(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
tool = {
'tool_name': 'unknown-tool',
'tool_version': '3.1.0rc2-31-ga2cbb8c',
'tool_configuration': {"command_line": "nomossa <filepath>"},
}
- actual_tool = self.storage.indexer_configuration_get(tool)
+ actual_tool = storage.indexer_configuration_get(tool)
- self.assertIsNone(actual_tool)
+ assert actual_tool is None
- def test_indexer_configuration_get(self):
+ def test_indexer_configuration_get(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
tool = {
'tool_name': 'nomos',
'tool_version': '3.1.0rc2-31-ga2cbb8c',
'tool_configuration': {"command_line": "nomossa <filepath>"},
}
- self.storage.indexer_configuration_add([tool])
- actual_tool = self.storage.indexer_configuration_get(tool)
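+ # the storage fixture already registers this tool, so no explicit add is needed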
+ actual_tool = storage.indexer_configuration_get(tool)
+ assert actual_tool
expected_tool = tool.copy()
del actual_tool['id']
- self.assertEqual(expected_tool, actual_tool)
+ assert expected_tool == actual_tool
- def test_indexer_configuration_metadata_get_missing_context(self):
+ def test_indexer_configuration_metadata_get_missing_context(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
tool = {
'tool_name': 'swh-metadata-translator',
'tool_version': '0.0.1',
'tool_configuration': {"context": "unknown-context"},
}
- actual_tool = self.storage.indexer_configuration_get(tool)
+ actual_tool = storage.indexer_configuration_get(tool)
- self.assertIsNone(actual_tool)
+ assert actual_tool is None
- def test_indexer_configuration_metadata_get(self):
+ def test_indexer_configuration_metadata_get(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
tool = {
'tool_name': 'swh-metadata-translator',
'tool_version': '0.0.1',
'tool_configuration': {"type": "local", "context": "NpmMapping"},
}
- self.storage.indexer_configuration_add([tool])
- actual_tool = self.storage.indexer_configuration_get(tool)
+ storage.indexer_configuration_add([tool])
+ actual_tool = storage.indexer_configuration_get(tool)
+ assert actual_tool
expected_tool = tool.copy()
expected_tool['id'] = actual_tool['id']
- self.assertEqual(expected_tool, actual_tool)
+ assert expected_tool == actual_tool
- @pytest.mark.property_based
- def test_generate_content_mimetype_get_range_limit_none(self):
+ def test_generate_content_mimetype_get_range_limit_none(
+ self, swh_indexer_storage):
+ storage = swh_indexer_storage
"""mimetype_get_range call with wrong limit input should fail"""
- with self.assertRaises(ValueError) as e:
- self.storage.content_mimetype_get_range(
+ with pytest.raises(ValueError) as e:
+ storage.content_mimetype_get_range(
start=None, end=None, indexer_configuration_id=None,
limit=None)
- self.assertEqual(e.exception.args, (
- 'Development error: limit should not be None',))
+ assert e.value.args == (
+ 'Development error: limit should not be None',)
- @pytest.mark.property_based
- @given(gen_content_mimetypes(min_size=1, max_size=4))
- def test_generate_content_mimetype_get_range_no_limit(self, mimetypes):
+ def test_generate_content_mimetype_get_range_no_limit(
+ self, swh_indexer_storage_w_data):
"""mimetype_get_range returns mimetypes within range provided"""
- self.reset_storage_tables()
- # add mimetypes to storage
- self.storage.content_mimetype_add(mimetypes)
+ storage, data = swh_indexer_storage_w_data
+ mimetypes = data.mimetypes
# All ids from the db
content_ids = sorted([c['id'] for c in mimetypes])
@@ -1774,95 +1645,81 @@
# retrieve mimetypes
tool_id = mimetypes[0]['indexer_configuration_id']
- actual_result = self.storage.content_mimetype_get_range(
+ actual_result = storage.content_mimetype_get_range(
start, end, indexer_configuration_id=tool_id)
actual_ids = actual_result['ids']
actual_next = actual_result['next']
- self.assertEqual(len(mimetypes), len(actual_ids))
- self.assertIsNone(actual_next)
- self.assertEqual(content_ids, actual_ids)
+ assert len(mimetypes) == len(actual_ids)
+ assert actual_next is None
+ assert content_ids == actual_ids
- @pytest.mark.property_based
- @given(gen_content_mimetypes(min_size=4, max_size=4))
- def test_generate_content_mimetype_get_range_limit(self, mimetypes):
+ def test_generate_content_mimetype_get_range_limit(
+ self, swh_indexer_storage_w_data):
"""mimetype_get_range paginates results if limit exceeded"""
- self.reset_storage_tables()
-
- # add mimetypes to storage
- self.storage.content_mimetype_add(mimetypes)
+ storage, data = swh_indexer_storage_w_data
# input the list of sha1s we want from storage
- content_ids = sorted([c['id'] for c in mimetypes])
+ content_ids = sorted(
+ [c['id'] for c in data.mimetypes])
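+ # read the fixture-loaded mimetypes back from storage as a sanity check before paginating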
+ mimetypes = list(storage.content_mimetype_get(content_ids))
+ assert len(mimetypes) == len(data.mimetypes)
+
start = content_ids[0]
end = content_ids[-1]
-
- # retrieve mimetypes limited to 3 results
- limited_results = len(mimetypes) - 1
- tool_id = mimetypes[0]['indexer_configuration_id']
- actual_result = self.storage.content_mimetype_get_range(
+ # retrieve mimetypes limited to 10 results
+ actual_result = storage.content_mimetype_get_range(
start, end,
- indexer_configuration_id=tool_id, limit=limited_results)
+ indexer_configuration_id=1,
+ limit=10)
+ assert actual_result
+ assert set(actual_result.keys()) == {'ids', 'next'}
actual_ids = actual_result['ids']
actual_next = actual_result['next']
- self.assertEqual(limited_results, len(actual_ids))
- self.assertIsNotNone(actual_next)
- self.assertEqual(actual_next, content_ids[-1])
+ assert len(actual_ids) == 10
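+ # 'next' is the first id not returned in this page, i.e. where the next call should resume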
+ assert actual_next is not None
+ assert actual_next == content_ids[10]
- expected_mimetypes = content_ids[:-1]
- self.assertEqual(expected_mimetypes, actual_ids)
+ expected_mimetypes = content_ids[:10]
+ assert expected_mimetypes == actual_ids
# retrieve next part
- actual_results2 = self.storage.content_mimetype_get_range(
- start=end, end=end, indexer_configuration_id=tool_id)
- actual_ids2 = actual_results2['ids']
- actual_next2 = actual_results2['next']
+ actual_result = storage.content_mimetype_get_range(
+ start=end, end=end, indexer_configuration_id=1)
+ assert set(actual_result.keys()) == {'ids', 'next'}
+ actual_ids = actual_result['ids']
+ actual_next = actual_result['next']
- self.assertIsNone(actual_next2)
- expected_mimetypes2 = [content_ids[-1]]
- self.assertEqual(expected_mimetypes2, actual_ids2)
+ assert actual_next is None
+ expected_mimetypes = [content_ids[-1]]
+ assert expected_mimetypes == actual_ids
- @pytest.mark.property_based
- def test_generate_content_fossology_license_get_range_limit_none(self):
+ def test_generate_content_fossology_license_get_range_limit_none(
+ self, swh_indexer_storage_w_data):
+ storage, data = swh_indexer_storage_w_data
"""license_get_range call with wrong limit input should fail"""
- with self.assertRaises(ValueError) as e:
- self.storage.content_fossology_license_get_range(
+ with pytest.raises(ValueError) as e:
+ storage.content_fossology_license_get_range(
start=None, end=None, indexer_configuration_id=None,
limit=None)
- self.assertEqual(e.exception.args, (
- 'Development error: limit should not be None',))
-
- @pytest.mark.property_based
- def prepare_mimetypes_from(self, fossology_licenses):
- """Fossology license needs some consistent data in db to run.
-
- """
- mimetypes = []
- for c in fossology_licenses:
- mimetypes.append({
- 'id': c['id'],
- 'mimetype': 'text/plain',
- 'encoding': 'utf-8',
- 'indexer_configuration_id': c['indexer_configuration_id'],
- })
- return mimetypes
+ assert e.value.args == (
+ 'Development error: limit should not be None',)
- @pytest.mark.property_based
- @given(gen_content_fossology_licenses(min_size=1, max_size=4))
def test_generate_content_fossology_license_get_range_no_limit(
- self, fossology_licenses):
+ self, swh_indexer_storage_w_data):
"""license_get_range returns licenses within range provided"""
- self.reset_storage_tables()
+ storage, data = swh_indexer_storage_w_data
# craft some consistent mimetypes
- mimetypes = self.prepare_mimetypes_from(fossology_licenses)
+ fossology_licenses = data.fossology_licenses
+ mimetypes = prepare_mimetypes_from(fossology_licenses)
- self.storage.content_mimetype_add(mimetypes)
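+ # conflict_update=True overwrites the mimetype rows the fixture already loaded for these ids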
+ storage.content_mimetype_add(mimetypes, conflict_update=True)
# add fossology_licenses to storage
- self.storage.content_fossology_license_add(fossology_licenses)
+ storage.content_fossology_license_add(fossology_licenses)
# All ids from the db
content_ids = sorted([c['id'] for c in fossology_licenses])
@@ -1872,26 +1729,25 @@
# retrieve fossology_licenses
tool_id = fossology_licenses[0]['indexer_configuration_id']
- actual_result = self.storage.content_fossology_license_get_range(
+ actual_result = storage.content_fossology_license_get_range(
start, end, indexer_configuration_id=tool_id)
actual_ids = actual_result['ids']
actual_next = actual_result['next']
- self.assertEqual(len(fossology_licenses), len(actual_ids))
- self.assertIsNone(actual_next)
- self.assertEqual(content_ids, actual_ids)
+ assert len(fossology_licenses) == len(actual_ids)
+ assert actual_next is None
+ assert content_ids == actual_ids
- @pytest.mark.property_based
- @given(gen_content_fossology_licenses(min_size=1, max_size=4),
- gen_content_mimetypes(min_size=1, max_size=1))
def test_generate_content_fossology_license_get_range_no_limit_with_filter(
- self, fossology_licenses, mimetypes):
+ self, swh_indexer_storage_w_data):
"""This filters non textual, then returns results within range"""
- self.reset_storage_tables()
+ storage, data = swh_indexer_storage_w_data
+ fossology_licenses = data.fossology_licenses
+ mimetypes = data.mimetypes
# craft some consistent mimetypes
- _mimetypes = self.prepare_mimetypes_from(fossology_licenses)
+ _mimetypes = prepare_mimetypes_from(fossology_licenses)
# add binary mimetypes which will get filtered out in results
for m in mimetypes:
_mimetypes.append({
@@ -1899,9 +1755,9 @@
**m,
})
- self.storage.content_mimetype_add(_mimetypes)
+ storage.content_mimetype_add(_mimetypes, conflict_update=True)
# add fossology_licenses to storage
- self.storage.content_fossology_license_add(fossology_licenses)
+ storage.content_fossology_license_add(fossology_licenses)
# All ids from the db
content_ids = sorted([c['id'] for c in fossology_licenses])
@@ -1911,28 +1767,28 @@
# retrieve fossology_licenses
tool_id = fossology_licenses[0]['indexer_configuration_id']
- actual_result = self.storage.content_fossology_license_get_range(
+ actual_result = storage.content_fossology_license_get_range(
start, end, indexer_configuration_id=tool_id)
actual_ids = actual_result['ids']
actual_next = actual_result['next']
- self.assertEqual(len(fossology_licenses), len(actual_ids))
- self.assertIsNone(actual_next)
- self.assertEqual(content_ids, actual_ids)
+ assert len(fossology_licenses) == len(actual_ids)
+ assert actual_next is None
+ assert content_ids == actual_ids
- @pytest.mark.property_based
- @given(gen_content_fossology_licenses(min_size=4, max_size=4))
def test_generate_fossology_license_get_range_limit(
- self, fossology_licenses):
+ self, swh_indexer_storage_w_data):
"""fossology_license_get_range paginates results if limit exceeded"""
- self.reset_storage_tables()
+ storage, data = swh_indexer_storage_w_data
+ fossology_licenses = data.fossology_licenses
+
# craft some consistent mimetypes
- mimetypes = self.prepare_mimetypes_from(fossology_licenses)
+ mimetypes = prepare_mimetypes_from(fossology_licenses)
# add fossology_licenses to storage
- self.storage.content_mimetype_add(mimetypes)
- self.storage.content_fossology_license_add(fossology_licenses)
+ storage.content_mimetype_add(mimetypes, conflict_update=True)
+ storage.content_fossology_license_add(fossology_licenses)
# input the list of sha1s we want from storage
content_ids = sorted([c['id'] for c in fossology_licenses])
@@ -1942,43 +1798,26 @@
# retrieve all fossology_licenses except the last one
limited_results = len(fossology_licenses) - 1
tool_id = fossology_licenses[0]['indexer_configuration_id']
- actual_result = self.storage.content_fossology_license_get_range(
+ actual_result = storage.content_fossology_license_get_range(
start, end,
indexer_configuration_id=tool_id, limit=limited_results)
actual_ids = actual_result['ids']
actual_next = actual_result['next']
- self.assertEqual(limited_results, len(actual_ids))
- self.assertIsNotNone(actual_next)
- self.assertEqual(actual_next, content_ids[-1])
+ assert limited_results == len(actual_ids)
+ assert actual_next is not None
+ assert actual_next == content_ids[-1]
expected_fossology_licenses = content_ids[:-1]
- self.assertEqual(expected_fossology_licenses, actual_ids)
+ assert expected_fossology_licenses == actual_ids
# retrieve next part
- actual_results2 = self.storage.content_fossology_license_get_range(
+ actual_results2 = storage.content_fossology_license_get_range(
start=end, end=end, indexer_configuration_id=tool_id)
actual_ids2 = actual_results2['ids']
actual_next2 = actual_results2['next']
- self.assertIsNone(actual_next2)
+ assert actual_next2 is None
expected_fossology_licenses2 = [content_ids[-1]]
- self.assertEqual(expected_fossology_licenses2, actual_ids2)
-
-
-@pytest.mark.db
-class IndexerTestStorage(CommonTestStorage, BasePgTestStorage,
- unittest.TestCase):
- """Running the tests locally.
-
- For the client api tests (remote storage), see
- `class`:swh.indexer.storage.test_api_client:TestRemoteStorage
- class.
-
- """
- pass
-
-
-def test_mapping_names():
- assert set(MAPPING_NAMES) == {m.name for m in MAPPINGS.values()}
+ assert expected_fossology_licenses2 == actual_ids2