# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import inspect
import math
import threading
from typing import Dict
import pytest
from swh.model.hashutil import hash_to_bytes
from swh.indexer.storage.exc import (
IndexerStorageArgumentException,
DuplicateId,
)
from swh.indexer.storage.interface import IndexerStorageInterface
def prepare_mimetypes_from(fossology_licenses):
"""Fossology license needs some consistent data in db to run.
"""
mimetypes = []
for c in fossology_licenses:
mimetypes.append(
{
"id": c["id"],
"mimetype": "text/plain", # for filtering on textual data to work
"encoding": "utf-8",
"indexer_configuration_id": c["indexer_configuration_id"],
}
)
return mimetypes
def endpoint_name(etype: str, ename: str) -> str:
"""Compute the storage's endpoint's name
>>> endpoint_name('content_mimetype', 'add')
'content_mimetype_add'
>>> endpoint_name('content_fosso_license', 'delete')
'content_fosso_license_delete'
"""
return f"{etype}_{ename}"
def endpoint(storage, etype: str, ename: str):
return getattr(storage, endpoint_name(etype, ename))
def expected_summary(count: int, etype: str, ename: str = "add") -> Dict[str, int]:
"""Compute the expected summary
The key is determine according to etype and ename
>>> expected_summary(10, 'content_mimetype', 'add')
{'content_mimetype:add': 10}
>>> expected_summary(9, 'origin_intrinsic_metadata', 'delete')
{'origin_intrinsic_metadata:del': 9}
"""
pattern = ename[0:3]
key = endpoint_name(etype, ename).replace(f"_{ename}", f":{pattern}")
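# the summary key keeps only the first three letters of ename, e.g. "delete" -> ":del"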
return {key: count}
def test_check_config(swh_indexer_storage):
assert swh_indexer_storage.check_config(check_write=True)
assert swh_indexer_storage.check_config(check_write=False)
def test_types(swh_indexer_storage):
"""Checks all methods of StorageInterface are implemented by this
backend, and that they have the same signature."""
# Create an instance of the protocol (which cannot be instantiated
# directly, so this creates a subclass, then instantiates it)
interface = type("_", (IndexerStorageInterface,), {})()
assert "content_mimetype_add" in dir(interface)
missing_methods = []
for meth_name in dir(interface):
if meth_name.startswith("_"):
continue
interface_meth = getattr(interface, meth_name)
try:
concrete_meth = getattr(swh_indexer_storage, meth_name)
except AttributeError:
missing_methods.append(meth_name)
continue
expected_signature = inspect.signature(interface_meth)
actual_signature = inspect.signature(concrete_meth)
assert expected_signature == actual_signature, meth_name
assert missing_methods == []
class StorageETypeTester:
"""Base class for testing a series of common behaviour between a bunch of
endpoint types supported by an IndexerStorage.
This is supposed to be inherited with the following class attributes:
- endpoint_type
- tool_name
- example_data
See below for example usage.
"""
def test_missing(self, swh_indexer_storage_with_data):
storage, data = swh_indexer_storage_with_data
etype = self.endpoint_type
tool_id = data.tools[self.tool_name]["id"]
# given 2 (hopefully) unknown objects
query = [
{"id": data.sha1_1, "indexer_configuration_id": tool_id,},
{"id": data.sha1_2, "indexer_configuration_id": tool_id,},
]
# we expect these are both returned by the xxx_missing endpoint
actual_missing = endpoint(storage, etype, "missing")(query)
assert list(actual_missing) == [
data.sha1_1,
data.sha1_2,
]
# now, when we add one of them
summary = endpoint(storage, etype, "add")(
[
{
"id": data.sha1_2,
**self.example_data[0],
"indexer_configuration_id": tool_id,
}
]
)
assert summary == expected_summary(1, etype)
# we expect only the other one returned
actual_missing = endpoint(storage, etype, "missing")(query)
assert list(actual_missing) == [data.sha1_1]
def test_add__drop_duplicate(self, swh_indexer_storage_with_data):
storage, data = swh_indexer_storage_with_data
etype = self.endpoint_type
tool_id = data.tools[self.tool_name]["id"]
# add the first object
data_v1 = {
"id": data.sha1_2,
**self.example_data[0],
"indexer_configuration_id": tool_id,
}
summary = endpoint(storage, etype, "add")([data_v1])
assert summary == expected_summary(1, etype)
# should be able to retrieve it
actual_data = list(endpoint(storage, etype, "get")([data.sha1_2]))
expected_data_v1 = [
{
"id": data.sha1_2,
**self.example_data[0],
"tool": data.tools[self.tool_name],
}
]
assert actual_data == expected_data_v1
# now if we add a modified version of the same object (same id)
data_v2 = data_v1.copy()
data_v2.update(self.example_data[1])
summary2 = endpoint(storage, etype, "add")([data_v2])
assert summary2 == expected_summary(0, etype) # not added
# we expect to retrieve the original data, not the modified one
actual_data = list(endpoint(storage, etype, "get")([data.sha1_2]))
assert actual_data == expected_data_v1
def test_add__update_in_place_duplicate(self, swh_indexer_storage_with_data):
storage, data = swh_indexer_storage_with_data
etype = self.endpoint_type
tool = data.tools[self.tool_name]
data_v1 = {
"id": data.sha1_2,
**self.example_data[0],
"indexer_configuration_id": tool["id"],
}
# given
summary = endpoint(storage, etype, "add")([data_v1])
assert summary == expected_summary(1, etype)  # one entry added
# when
actual_data = list(endpoint(storage, etype, "get")([data.sha1_2]))
expected_data_v1 = [{"id": data.sha1_2, **self.example_data[0], "tool": tool,}]
# then
assert actual_data == expected_data_v1
# given
data_v2 = data_v1.copy()
data_v2.update(self.example_data[1])
summary2 = endpoint(storage, etype, "add")([data_v2], conflict_update=True)
assert summary2 == expected_summary(1, etype)  # modified so counted
actual_data = list(endpoint(storage, etype, "get")([data.sha1_2]))
expected_data_v2 = [{"id": data.sha1_2, **self.example_data[1], "tool": tool,}]
# data did change as the v2 was used to overwrite v1
assert actual_data == expected_data_v2
def test_add__update_in_place_deadlock(self, swh_indexer_storage_with_data):
storage, data = swh_indexer_storage_with_data
etype = self.endpoint_type
tool = data.tools[self.tool_name]
hashes = [
hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4{:03d}".format(i))
for i in range(1000)
]
data_v1 = [
{
"id": hash_,
**self.example_data[0],
"indexer_configuration_id": tool["id"],
}
for hash_ in hashes
]
data_v2 = [
{
"id": hash_,
**self.example_data[1],
"indexer_configuration_id": tool["id"],
}
for hash_ in hashes
]
# Remove one item from each, so that both queries have to succeed for
# all items to be in the DB.
data_v2a = data_v2[1:]
data_v2b = list(reversed(data_v2[0:-1]))
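# The two writers below update overlapping rows in opposite orders; if the
# backend does not take row locks in a consistent order this can deadlock.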
# given
endpoint(storage, etype, "add")(data_v1)
# when
actual_data = list(endpoint(storage, etype, "get")(hashes))
expected_data_v1 = [
{"id": hash_, **self.example_data[0], "tool": tool,} for hash_ in hashes
]
# then
assert actual_data == expected_data_v1
# given
def f1():
endpoint(storage, etype, "add")(data_v2a, conflict_update=True)
def f2():
endpoint(storage, etype, "add")(data_v2b, conflict_update=True)
t1 = threading.Thread(target=f1)
t2 = threading.Thread(target=f2)
t2.start()
t1.start()
t1.join()
t2.join()
actual_data = sorted(
endpoint(storage, etype, "get")(hashes), key=lambda x: x["id"]
)
expected_data_v2 = [
{"id": hash_, **self.example_data[1], "tool": tool,} for hash_ in hashes
]
assert actual_data == expected_data_v2
def test_add__duplicate_twice(self, swh_indexer_storage_with_data):
storage, data = swh_indexer_storage_with_data
etype = self.endpoint_type
tool = data.tools[self.tool_name]
data_rev1 = {
"id": data.revision_id_2,
**self.example_data[0],
"indexer_configuration_id": tool["id"],
}
data_rev2 = {
"id": data.revision_id_2,
**self.example_data[1],
"indexer_configuration_id": tool["id"],
}
# when
summary = endpoint(storage, etype, "add")([data_rev1])
assert summary == expected_summary(1, etype)
with pytest.raises(DuplicateId):
endpoint(storage, etype, "add")(
[data_rev2, data_rev2], conflict_update=True
)
# then
actual_data = list(
endpoint(storage, etype, "get")([data.revision_id_2, data.revision_id_1])
)
expected_data = [
{"id": data.revision_id_2, **self.example_data[0], "tool": tool,}
]
assert actual_data == expected_data
def test_get(self, swh_indexer_storage_with_data):
storage, data = swh_indexer_storage_with_data
etype = self.endpoint_type
tool = data.tools[self.tool_name]
query = [data.sha1_2, data.sha1_1]
data1 = {
"id": data.sha1_2,
**self.example_data[0],
"indexer_configuration_id": tool["id"],
}
# when
summary = endpoint(storage, etype, "add")([data1])
assert summary == expected_summary(1, etype)
# then
actual_data = list(endpoint(storage, etype, "get")(query))
# then
expected_data = [{"id": data.sha1_2, **self.example_data[0], "tool": tool,}]
assert actual_data == expected_data
class TestIndexerStorageContentMimetypes(StorageETypeTester):
"""Test Indexer Storage content_mimetype related methods
"""
endpoint_type = "content_mimetype"
tool_name = "file"
example_data = [
{"mimetype": "text/plain", "encoding": "utf-8",},
{"mimetype": "text/html", "encoding": "us-ascii",},
]
def test_generate_content_mimetype_get_partition_failure(self, swh_indexer_storage):
"""get_partition call with wrong limit input should fail"""
storage = swh_indexer_storage
indexer_configuration_id = None
with pytest.raises(
IndexerStorageArgumentException, match="limit should not be None"
):
storage.content_mimetype_get_partition(
indexer_configuration_id, 0, 3, limit=None
)
def test_generate_content_mimetype_get_partition_no_limit(
self, swh_indexer_storage_with_data
):
"""get_partition should return result"""
storage, data = swh_indexer_storage_with_data
mimetypes = data.mimetypes
expected_ids = set([c["id"] for c in mimetypes])
indexer_configuration_id = mimetypes[0]["indexer_configuration_id"]
assert len(mimetypes) == 16
nb_partitions = 16
actual_ids = []
for partition_id in range(nb_partitions):
actual_result = storage.content_mimetype_get_partition(
indexer_configuration_id, partition_id, nb_partitions
)
assert actual_result.next_page_token is None
actual_ids.extend(actual_result.results)
assert len(actual_ids) == len(expected_ids)
for actual_id in actual_ids:
assert actual_id in expected_ids
def test_generate_content_mimetype_get_partition_full(
self, swh_indexer_storage_with_data
):
"""get_partition for a single partition should return available ids
"""
storage, data = swh_indexer_storage_with_data
mimetypes = data.mimetypes
expected_ids = set([c["id"] for c in mimetypes])
indexer_configuration_id = mimetypes[0]["indexer_configuration_id"]
actual_result = storage.content_mimetype_get_partition(
indexer_configuration_id, 0, 1
)
assert actual_result.next_page_token is None
actual_ids = actual_result.results
assert len(actual_ids) == len(expected_ids)
for actual_id in actual_ids:
assert actual_id in expected_ids
def test_generate_content_mimetype_get_partition_empty(
self, swh_indexer_storage_with_data
):
"""get_partition when at least one of the partitions is empty"""
storage, data = swh_indexer_storage_with_data
mimetypes = data.mimetypes
expected_ids = set([c["id"] for c in mimetypes])
indexer_configuration_id = mimetypes[0]["indexer_configuration_id"]
# nb_partitions = smallest power of 2 such that at least one of
# the partitions is empty
nb_mimetypes = len(mimetypes)
nb_partitions = 1 << math.floor(math.log2(nb_mimetypes) + 1)
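# e.g. with 16 mimetypes: 1 << floor(log2(16) + 1) == 32 partitions, so by
# the pigeonhole principle at least one partition holds no id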
seen_ids = []
for partition_id in range(nb_partitions):
actual_result = storage.content_mimetype_get_partition(
indexer_configuration_id,
partition_id,
nb_partitions,
limit=nb_mimetypes + 1,
)
for actual_id in actual_result.results:
seen_ids.append(actual_id)
# Limit is higher than the max number of results
assert actual_result.next_page_token is None
assert set(seen_ids) == expected_ids
def test_generate_content_mimetype_get_partition_with_pagination(
self, swh_indexer_storage_with_data
):
"""get_partition should return ids provided with pagination
"""
storage, data = swh_indexer_storage_with_data
mimetypes = data.mimetypes
expected_ids = set([c["id"] for c in mimetypes])
indexer_configuration_id = mimetypes[0]["indexer_configuration_id"]
nb_partitions = 4
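# walk every page of every partition, two results at a time, following
# next_page_token until each partition is exhausted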
actual_ids = []
for partition_id in range(nb_partitions):
next_page_token = None
while True:
actual_result = storage.content_mimetype_get_partition(
indexer_configuration_id,
partition_id,
nb_partitions,
limit=2,
page_token=next_page_token,
)
actual_ids.extend(actual_result.results)
next_page_token = actual_result.next_page_token
if next_page_token is None:
break
assert len(set(actual_ids)) == len(set(expected_ids))
for actual_id in actual_ids:
assert actual_id in expected_ids
class TestIndexerStorageContentLanguage(StorageETypeTester):
"""Test Indexer Storage content_language related methods
"""
endpoint_type = "content_language"
tool_name = "pygments"
example_data = [
{"lang": "haskell",},
{"lang": "common-lisp",},
]
class TestIndexerStorageContentCTags(StorageETypeTester):
"""Test Indexer Storage content_ctags related methods
"""
endpoint_type = "content_ctags"
tool_name = "universal-ctags"
example_data = [
{
"ctags": [
{"name": "done", "kind": "variable", "line": 119, "lang": "OCaml",}
]
},
{
"ctags": [
{"name": "done", "kind": "variable", "line": 100, "lang": "Python",},
{"name": "main", "kind": "function", "line": 119, "lang": "Python",},
]
},
]
# the following tests are disabled because CTAGS behaves differently
@pytest.mark.skip
def test_add__drop_duplicate(self):
pass
@pytest.mark.skip
def test_add__update_in_place_duplicate(self):
pass
@pytest.mark.skip
def test_add__update_in_place_deadlock(self):
pass
@pytest.mark.skip
def test_add__duplicate_twice(self):
pass
@pytest.mark.skip
def test_get(self):
pass
def test_content_ctags_search(self, swh_indexer_storage_with_data):
storage, data = swh_indexer_storage_with_data
# 1. given
tool = data.tools["universal-ctags"]
tool_id = tool["id"]
ctag1 = {
"id": data.sha1_1,
"indexer_configuration_id": tool_id,
"ctags": [
{"name": "hello", "kind": "function", "line": 133, "lang": "Python",},
{"name": "counter", "kind": "variable", "line": 119, "lang": "Python",},
{"name": "hello", "kind": "variable", "line": 210, "lang": "Python",},
],
}
ctag2 = {
"id": data.sha1_2,
"indexer_configuration_id": tool_id,
"ctags": [
{"name": "hello", "kind": "variable", "line": 100, "lang": "C",},
{"name": "result", "kind": "variable", "line": 120, "lang": "C",},
],
}
storage.content_ctags_add([ctag1, ctag2])
# 1. when
actual_ctags = list(storage.content_ctags_search("hello", limit=1))
# 1. then
assert actual_ctags == [
{
"id": ctag1["id"],
"tool": tool,
"name": "hello",
"kind": "function",
"line": 133,
"lang": "Python",
}
]
# 2. when
actual_ctags = list(
storage.content_ctags_search("hello", limit=1, last_sha1=ctag1["id"])
)
# 2. then
assert actual_ctags == [
{
"id": ctag2["id"],
"tool": tool,
"name": "hello",
"kind": "variable",
"line": 100,
"lang": "C",
}
]
# 3. when
actual_ctags = list(storage.content_ctags_search("hello"))
# 3. then
assert actual_ctags == [
{
"id": ctag1["id"],
"tool": tool,
"name": "hello",
"kind": "function",
"line": 133,
"lang": "Python",
},
{
"id": ctag1["id"],
"tool": tool,
"name": "hello",
"kind": "variable",
"line": 210,
"lang": "Python",
},
{
"id": ctag2["id"],
"tool": tool,
"name": "hello",
"kind": "variable",
"line": 100,
"lang": "C",
},
]
# 4. when
actual_ctags = list(storage.content_ctags_search("counter"))
# then
assert actual_ctags == [
{
"id": ctag1["id"],
"tool": tool,
"name": "counter",
"kind": "variable",
"line": 119,
"lang": "Python",
}
]
# 5. when
actual_ctags = list(storage.content_ctags_search("result", limit=1))
# then
assert actual_ctags == [
{
"id": ctag2["id"],
"tool": tool,
"name": "result",
"kind": "variable",
"line": 120,
"lang": "C",
}
]
def test_content_ctags_search_no_result(self, swh_indexer_storage):
storage = swh_indexer_storage
actual_ctags = list(storage.content_ctags_search("counter"))
assert not actual_ctags
def test_content_ctags_add__add_new_ctags_added(
self, swh_indexer_storage_with_data
):
storage, data = swh_indexer_storage_with_data
# given
tool = data.tools["universal-ctags"]
tool_id = tool["id"]
ctag_v1 = {
"id": data.sha1_2,
"indexer_configuration_id": tool_id,
"ctags": [
{"name": "done", "kind": "variable", "line": 100, "lang": "Scheme",}
],
}
# given
storage.content_ctags_add([ctag_v1])
storage.content_ctags_add([ctag_v1]) # conflict does nothing
# when
actual_ctags = list(storage.content_ctags_get([data.sha1_2]))
# then
expected_ctags = [
{
"id": data.sha1_2,
"name": "done",
"kind": "variable",
"line": 100,
"lang": "Scheme",
"tool": tool,
}
]
assert actual_ctags == expected_ctags
# given
ctag_v2 = ctag_v1.copy()
ctag_v2.update(
{
"ctags": [
{"name": "defn", "kind": "function", "line": 120, "lang": "Scheme",}
]
}
)
storage.content_ctags_add([ctag_v2])
expected_ctags = [
{
"id": data.sha1_2,
"name": "done",
"kind": "variable",
"line": 100,
"lang": "Scheme",
"tool": tool,
},
{
"id": data.sha1_2,
"name": "defn",
"kind": "function",
"line": 120,
"lang": "Scheme",
"tool": tool,
},
]
actual_ctags = list(storage.content_ctags_get([data.sha1_2]))
assert actual_ctags == expected_ctags
def test_content_ctags_add__update_in_place(self, swh_indexer_storage_with_data):
storage, data = swh_indexer_storage_with_data
# given
tool = data.tools["universal-ctags"]
tool_id = tool["id"]
ctag_v1 = {
"id": data.sha1_2,
"indexer_configuration_id": tool_id,
"ctags": [
{"name": "done", "kind": "variable", "line": 100, "lang": "Scheme",}
],
}
# given
storage.content_ctags_add([ctag_v1])
# when
actual_ctags = list(storage.content_ctags_get([data.sha1_2]))
# then
expected_ctags = [
{
"id": data.sha1_2,
"name": "done",
"kind": "variable",
"line": 100,
"lang": "Scheme",
"tool": tool,
}
]
assert actual_ctags == expected_ctags
# given
ctag_v2 = ctag_v1.copy()
ctag_v2.update(
{
"ctags": [
{
"name": "done",
"kind": "variable",
"line": 100,
"lang": "Scheme",
},
{
"name": "defn",
"kind": "function",
"line": 120,
"lang": "Scheme",
},
]
}
)
storage.content_ctags_add([ctag_v2], conflict_update=True)
actual_ctags = list(storage.content_ctags_get([data.sha1_2]))
# ctag did change as the v2 was used to overwrite v1
expected_ctags = [
{
"id": data.sha1_2,
"name": "done",
"kind": "variable",
"line": 100,
"lang": "Scheme",
"tool": tool,
},
{
"id": data.sha1_2,
"name": "defn",
"kind": "function",
"line": 120,
"lang": "Scheme",
"tool": tool,
},
]
assert actual_ctags == expected_ctags
class TestIndexerStorageContentMetadata(StorageETypeTester):
"""Test Indexer Storage content_metadata related methods
"""
tool_name = "swh-metadata-detector"
endpoint_type = "content_metadata"
example_data = [
{
"metadata": {
"other": {},
"codeRepository": {
"type": "git",
"url": "https://github.com/moranegg/metadata_test",
},
"description": "Simple package.json test for indexer",
"name": "test_metadata",
"version": "0.0.1",
},
},
{"metadata": {"other": {}, "name": "test_metadata", "version": "0.0.1"},},
]
class TestIndexerStorageRevisionIntrinsicMetadata(StorageETypeTester):
"""Test Indexer Storage revision_intrinsic_metadata related methods
"""
tool_name = "swh-metadata-detector"
endpoint_type = "revision_intrinsic_metadata"
example_data = [
{
"metadata": {
"other": {},
"codeRepository": {
"type": "git",
"url": "https://github.com/moranegg/metadata_test",
},
"description": "Simple package.json test for indexer",
"name": "test_metadata",
"version": "0.0.1",
},
"mappings": ["mapping1"],
},
{
"metadata": {"other": {}, "name": "test_metadata", "version": "0.0.1"},
"mappings": ["mapping2"],
},
]
def test_revision_intrinsic_metadata_delete(self, swh_indexer_storage_with_data):
storage, data = swh_indexer_storage_with_data
etype = self.endpoint_type
tool = data.tools[self.tool_name]
query = [data.sha1_2, data.sha1_1]
data1 = {
"id": data.sha1_2,
**self.example_data[0],
"indexer_configuration_id": tool["id"],
}
# when
summary = endpoint(storage, etype, "add")([data1])
assert summary == expected_summary(1, etype)
summary2 = endpoint(storage, etype, "delete")(
[{"id": data.sha1_2, "indexer_configuration_id": tool["id"],}]
)
assert summary2 == expected_summary(1, etype, "del")
# then
actual_data = list(endpoint(storage, etype, "get")(query))
# then
assert not actual_data
def test_revision_intrinsic_metadata_delete_nonexisting(
self, swh_indexer_storage_with_data
):
storage, data = swh_indexer_storage_with_data
etype = self.endpoint_type
tool = data.tools[self.tool_name]
endpoint(storage, etype, "delete")(
[{"id": data.sha1_2, "indexer_configuration_id": tool["id"],}]
)
class TestIndexerStorageContentFossologyLicense:
def test_content_fossology_license_add__new_license_added(
self, swh_indexer_storage_with_data
):
storage, data = swh_indexer_storage_with_data
# given
tool = data.tools["nomos"]
tool_id = tool["id"]
license_v1 = {
"id": data.sha1_1,
"licenses": ["Apache-2.0"],
"indexer_configuration_id": tool_id,
}
# given
storage.content_fossology_license_add([license_v1])
# conflict does nothing
storage.content_fossology_license_add([license_v1])
# when
actual_licenses = list(storage.content_fossology_license_get([data.sha1_1]))
# then
expected_license = {data.sha1_1: [{"licenses": ["Apache-2.0"], "tool": tool,}]}
assert actual_licenses == [expected_license]
# given
license_v2 = license_v1.copy()
license_v2.update(
{"licenses": ["BSD-2-Clause"],}
)
storage.content_fossology_license_add([license_v2])
actual_licenses = list(storage.content_fossology_license_get([data.sha1_1]))
expected_license = {
data.sha1_1: [{"licenses": ["Apache-2.0", "BSD-2-Clause"], "tool": tool}]
}
# the v2 licenses were merged with the existing ones, not dropped.
assert actual_licenses == [expected_license]
def test_generate_content_fossology_license_get_partition_failure(
self, swh_indexer_storage_with_data
):
"""get_partition call with wrong limit input should fail"""
storage, data = swh_indexer_storage_with_data
indexer_configuration_id = None
with pytest.raises(
IndexerStorageArgumentException, match="limit should not be None"
):
storage.content_fossology_license_get_partition(
indexer_configuration_id, 0, 3, limit=None,
)
def test_generate_content_fossology_license_get_partition_no_limit(
self, swh_indexer_storage_with_data
):
"""get_partition should return results"""
storage, data = swh_indexer_storage_with_data
# craft some consistent mimetypes
fossology_licenses = data.fossology_licenses
mimetypes = prepare_mimetypes_from(fossology_licenses)
indexer_configuration_id = fossology_licenses[0]["indexer_configuration_id"]
storage.content_mimetype_add(mimetypes, conflict_update=True)
# add fossology_licenses to storage
storage.content_fossology_license_add(fossology_licenses)
# All ids from the db
expected_ids = set([c["id"] for c in fossology_licenses])
assert len(fossology_licenses) == 10
assert len(mimetypes) == 10
nb_partitions = 4
actual_ids = []
for partition_id in range(nb_partitions):
actual_result = storage.content_fossology_license_get_partition(
indexer_configuration_id, partition_id, nb_partitions
)
assert actual_result.next_page_token is None
actual_ids.extend(actual_result.results)
assert len(set(actual_ids)) == len(expected_ids)
for actual_id in actual_ids:
assert actual_id in expected_ids
def test_generate_content_fossology_license_get_partition_full(
self, swh_indexer_storage_with_data
):
"""get_partition for a single partition should return available ids
"""
storage, data = swh_indexer_storage_with_data
# craft some consistent mimetypes
fossology_licenses = data.fossology_licenses
mimetypes = prepare_mimetypes_from(fossology_licenses)
indexer_configuration_id = fossology_licenses[0]["indexer_configuration_id"]
storage.content_mimetype_add(mimetypes, conflict_update=True)
# add fossology_licenses to storage
storage.content_fossology_license_add(fossology_licenses)
# All ids from the db
expected_ids = set([c["id"] for c in fossology_licenses])
actual_result = storage.content_fossology_license_get_partition(
indexer_configuration_id, 0, 1
)
assert actual_result.next_page_token is None
actual_ids = actual_result.results
assert len(set(actual_ids)) == len(expected_ids)
for actual_id in actual_ids:
assert actual_id in expected_ids
def test_generate_content_fossology_license_get_partition_empty(
self, swh_indexer_storage_with_data
):
"""get_partition when at least one of the partitions is empty"""
storage, data = swh_indexer_storage_with_data
# craft some consistent mimetypes
fossology_licenses = data.fossology_licenses
mimetypes = prepare_mimetypes_from(fossology_licenses)
indexer_configuration_id = fossology_licenses[0]["indexer_configuration_id"]
storage.content_mimetype_add(mimetypes, conflict_update=True)
# add fossology_licenses to storage
storage.content_fossology_license_add(fossology_licenses)
# All ids from the db
expected_ids = set([c["id"] for c in fossology_licenses])
# nb_partitions = smallest power of 2 such that at least one of
# the partitions is empty
nb_licenses = len(fossology_licenses)
nb_partitions = 1 << math.floor(math.log2(nb_licenses) + 1)
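# e.g. with 10 licenses: 1 << floor(log2(10) + 1) == 16 partitions, so at
# least one partition is necessarily empty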
seen_ids = []
for partition_id in range(nb_partitions):
actual_result = storage.content_fossology_license_get_partition(
indexer_configuration_id,
partition_id,
nb_partitions,
limit=nb_licenses + 1,
)
for actual_id in actual_result.results:
seen_ids.append(actual_id)
# Limit is higher than the max number of results
assert actual_result.next_page_token is None
assert set(seen_ids) == expected_ids
def test_generate_content_fossology_license_get_partition_with_pagination(
self, swh_indexer_storage_with_data
):
"""get_partition should return ids provided with paginationv
"""
storage, data = swh_indexer_storage_with_data
# craft some consistent mimetypes
fossology_licenses = data.fossology_licenses
mimetypes = prepare_mimetypes_from(fossology_licenses)
indexer_configuration_id = fossology_licenses[0]["indexer_configuration_id"]
storage.content_mimetype_add(mimetypes, conflict_update=True)
# add fossology_licenses to storage
storage.content_fossology_license_add(fossology_licenses)
# All ids from the db
expected_ids = [c["id"] for c in fossology_licenses]
nb_partitions = 4
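# page through each partition with a small limit, following next_page_token
# until the partition is exhausted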
actual_ids = []
for partition_id in range(nb_partitions):
next_page_token = None
while True:
actual_result = storage.content_fossology_license_get_partition(
indexer_configuration_id,
partition_id,
nb_partitions,
limit=2,
page_token=next_page_token,
)
actual_ids.extend(actual_result.results)
next_page_token = actual_result.next_page_token
if next_page_token is None:
break
assert len(set(actual_ids)) == len(set(expected_ids))
for actual_id in actual_ids:
assert actual_id in expected_ids
class TestIndexerStorageOriginIntrinsicMetadata:
def test_origin_intrinsic_metadata_get(self, swh_indexer_storage_with_data):
storage, data = swh_indexer_storage_with_data
# given
tool_id = data.tools["swh-metadata-detector"]["id"]
metadata = {
"version": None,
"name": None,
}
metadata_rev = {
"id": data.revision_id_2,
"metadata": metadata,
"mappings": ["mapping1"],
"indexer_configuration_id": tool_id,
}
metadata_origin = {
"id": data.origin_url_1,
"metadata": metadata,
"indexer_configuration_id": tool_id,
"mappings": ["mapping1"],
"from_revision": data.revision_id_2,
}
# when
storage.revision_intrinsic_metadata_add([metadata_rev])
storage.origin_intrinsic_metadata_add([metadata_origin])
# then
actual_metadata = list(
storage.origin_intrinsic_metadata_get([data.origin_url_1, "no://where"])
)
expected_metadata = [
{
"id": data.origin_url_1,
"metadata": metadata,
"tool": data.tools["swh-metadata-detector"],
"from_revision": data.revision_id_2,
"mappings": ["mapping1"],
}
]
assert actual_metadata == expected_metadata
def test_origin_intrinsic_metadata_delete(self, swh_indexer_storage_with_data):
storage, data = swh_indexer_storage_with_data
# given
tool_id = data.tools["swh-metadata-detector"]["id"]
metadata = {
"version": None,
"name": None,
}
metadata_rev = {
"id": data.revision_id_2,
"metadata": metadata,
"mappings": ["mapping1"],
"indexer_configuration_id": tool_id,
}
metadata_origin = {
"id": data.origin_url_1,
"metadata": metadata,
"indexer_configuration_id": tool_id,
"mappings": ["mapping1"],
"from_revision": data.revision_id_2,
}
metadata_origin2 = metadata_origin.copy()
metadata_origin2["id"] = data.origin_url_2
# when
storage.revision_intrinsic_metadata_add([metadata_rev])
storage.origin_intrinsic_metadata_add([metadata_origin, metadata_origin2])
storage.origin_intrinsic_metadata_delete(
[{"id": data.origin_url_1, "indexer_configuration_id": tool_id}]
)
# then
actual_metadata = list(
storage.origin_intrinsic_metadata_get(
[data.origin_url_1, data.origin_url_2, "no://where"]
)
)
for item in actual_metadata:
item["indexer_configuration_id"] = item.pop("tool")["id"]
assert actual_metadata == [metadata_origin2]
def test_origin_intrinsic_metadata_delete_nonexisting(
self, swh_indexer_storage_with_data
):
storage, data = swh_indexer_storage_with_data
tool_id = data.tools["swh-metadata-detector"]["id"]
storage.origin_intrinsic_metadata_delete(
[{"id": data.origin_url_1, "indexer_configuration_id": tool_id}]
)
def test_origin_intrinsic_metadata_add_drop_duplicate(
self, swh_indexer_storage_with_data
):
storage, data = swh_indexer_storage_with_data
# given
tool_id = data.tools["swh-metadata-detector"]["id"]
metadata_v1 = {
"version": None,
"name": None,
}
metadata_rev_v1 = {
"id": data.revision_id_1,
"metadata": metadata_v1.copy(),
"mappings": [],
"indexer_configuration_id": tool_id,
}
metadata_origin_v1 = {
"id": data.origin_url_1,
"metadata": metadata_v1.copy(),
"indexer_configuration_id": tool_id,
"mappings": [],
"from_revision": data.revision_id_1,
}
# given
storage.revision_intrinsic_metadata_add([metadata_rev_v1])
storage.origin_intrinsic_metadata_add([metadata_origin_v1])
# when
actual_metadata = list(
storage.origin_intrinsic_metadata_get([data.origin_url_1, "no://where"])
)
expected_metadata_v1 = [
{
"id": data.origin_url_1,
"metadata": metadata_v1,
"tool": data.tools["swh-metadata-detector"],
"from_revision": data.revision_id_1,
"mappings": [],
}
]
assert actual_metadata == expected_metadata_v1
# given
metadata_v2 = metadata_v1.copy()
metadata_v2.update(
{"name": "test_metadata", "author": "MG",}
)
metadata_rev_v2 = metadata_rev_v1.copy()
metadata_origin_v2 = metadata_origin_v1.copy()
metadata_rev_v2["metadata"] = metadata_v2
metadata_origin_v2["metadata"] = metadata_v2
storage.revision_intrinsic_metadata_add([metadata_rev_v2])
storage.origin_intrinsic_metadata_add([metadata_origin_v2])
# then
actual_metadata = list(
storage.origin_intrinsic_metadata_get([data.origin_url_1])
)
# metadata did not change as the v2 was dropped.
assert actual_metadata == expected_metadata_v1
def test_origin_intrinsic_metadata_add_update_in_place_duplicate(
self, swh_indexer_storage_with_data
):
storage, data = swh_indexer_storage_with_data
# given
tool_id = data.tools["swh-metadata-detector"]["id"]
metadata_v1 = {
"version": None,
"name": None,
}
metadata_rev_v1 = {
"id": data.revision_id_2,
"metadata": metadata_v1,
"mappings": [],
"indexer_configuration_id": tool_id,
}
metadata_origin_v1 = {
"id": data.origin_url_1,
"metadata": metadata_v1.copy(),
"indexer_configuration_id": tool_id,
"mappings": [],
"from_revision": data.revision_id_2,
}
# given
storage.revision_intrinsic_metadata_add([metadata_rev_v1])
storage.origin_intrinsic_metadata_add([metadata_origin_v1])
# when
actual_metadata = list(
storage.origin_intrinsic_metadata_get([data.origin_url_1])
)
# then
expected_metadata_v1 = [
{
"id": data.origin_url_1,
"metadata": metadata_v1,
"tool": data.tools["swh-metadata-detector"],
"from_revision": data.revision_id_2,
"mappings": [],
}
]
assert actual_metadata == expected_metadata_v1
# given
metadata_v2 = metadata_v1.copy()
metadata_v2.update(
{"name": "test_update_duplicated_metadata", "author": "MG",}
)
metadata_rev_v2 = metadata_rev_v1.copy()
metadata_origin_v2 = metadata_origin_v1.copy()
metadata_rev_v2["metadata"] = metadata_v2
metadata_origin_v2 = {
"id": data.origin_url_1,
"metadata": metadata_v2.copy(),
"indexer_configuration_id": tool_id,
"mappings": ["npm"],
"from_revision": data.revision_id_1,
}
storage.revision_intrinsic_metadata_add([metadata_rev_v2], conflict_update=True)
storage.origin_intrinsic_metadata_add(
[metadata_origin_v2], conflict_update=True
)
actual_metadata = list(
storage.origin_intrinsic_metadata_get([data.origin_url_1])
)
expected_metadata_v2 = [
{
"id": data.origin_url_1,
"metadata": metadata_v2,
"tool": data.tools["swh-metadata-detector"],
"from_revision": data.revision_id_1,
"mappings": ["npm"],
}
]
# metadata did change as the v2 was used to overwrite v1
assert actual_metadata == expected_metadata_v2
def test_origin_intrinsic_metadata_add__update_in_place_deadlock(
self, swh_indexer_storage_with_data
):
storage, data = swh_indexer_storage_with_data
# given
tool_id = data.tools["swh-metadata-detector"]["id"]
ids = list(range(10))
example_data1 = {
"metadata": {"version": None, "name": None,},
"mappings": [],
}
example_data2 = {
"metadata": {"version": "v1.1.1", "name": "foo",},
"mappings": [],
}
metadata_rev_v1 = {
"id": data.revision_id_2,
"metadata": {"version": None, "name": None,},
"mappings": [],
"indexer_configuration_id": tool_id,
}
data_v1 = [
{
"id": "file:///tmp/origin%d" % id_,
"from_revision": data.revision_id_2,
**example_data1,
"indexer_configuration_id": tool_id,
}
for id_ in ids
]
data_v2 = [
{
"id": "file:///tmp/origin%d" % id_,
"from_revision": data.revision_id_2,
**example_data2,
"indexer_configuration_id": tool_id,
}
for id_ in ids
]
# Remove one item from each, so that both queries have to succeed for
# all items to be in the DB.
data_v2a = data_v2[1:]
data_v2b = list(reversed(data_v2[0:-1]))
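# As in the generic deadlock test, two threads write overlapping origins in
# opposite orders to exercise locking behaviour under concurrent updates.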
# given
storage.revision_intrinsic_metadata_add([metadata_rev_v1])
storage.origin_intrinsic_metadata_add(data_v1)
# when
origins = ["file:///tmp/origin%d" % i for i in ids]
actual_data = list(storage.origin_intrinsic_metadata_get(origins))
expected_data_v1 = [
{
"id": "file:///tmp/origin%d" % id_,
"from_revision": data.revision_id_2,
**example_data1,
"tool": data.tools["swh-metadata-detector"],
}
for id_ in ids
]
# then
assert actual_data == expected_data_v1
# given
def f1():
storage.origin_intrinsic_metadata_add(data_v2a, conflict_update=True)
def f2():
storage.origin_intrinsic_metadata_add(data_v2b, conflict_update=True)
t1 = threading.Thread(target=f1)
t2 = threading.Thread(target=f2)
t2.start()
t1.start()
t1.join()
t2.join()
actual_data = list(storage.origin_intrinsic_metadata_get(origins))
expected_data_v2 = [
{
"id": "file:///tmp/origin%d" % id_,
"from_revision": data.revision_id_2,
**example_data2,
"tool": data.tools["swh-metadata-detector"],
}
for id_ in ids
]
assert len(actual_data) == len(expected_data_v2)
assert sorted(actual_data, key=lambda x: x["id"]) == expected_data_v2
def test_origin_intrinsic_metadata_add__duplicate_twice(
self, swh_indexer_storage_with_data
):
storage, data = swh_indexer_storage_with_data
# given
tool_id = data.tools["swh-metadata-detector"]["id"]
metadata = {
"developmentStatus": None,
"name": None,
}
metadata_rev = {
"id": data.revision_id_2,
"metadata": metadata,
"mappings": ["mapping1"],
"indexer_configuration_id": tool_id,
}
metadata_origin = {
"id": data.origin_url_1,
"metadata": metadata,
"indexer_configuration_id": tool_id,
"mappings": ["mapping1"],
"from_revision": data.revision_id_2,
}
# when
storage.revision_intrinsic_metadata_add([metadata_rev])
with pytest.raises(DuplicateId):
storage.origin_intrinsic_metadata_add([metadata_origin, metadata_origin])
def test_origin_intrinsic_metadata_search_fulltext(
self, swh_indexer_storage_with_data
):
storage, data = swh_indexer_storage_with_data
# given
tool_id = data.tools["swh-metadata-detector"]["id"]
metadata1 = {
"author": "John Doe",
}
metadata1_rev = {
"id": data.revision_id_1,
"metadata": metadata1,
"mappings": [],
"indexer_configuration_id": tool_id,
}
metadata1_origin = {
"id": data.origin_url_1,
"metadata": metadata1,
"mappings": [],
"indexer_configuration_id": tool_id,
"from_revision": data.revision_id_1,
}
metadata2 = {
"author": "Jane Doe",
}
metadata2_rev = {
"id": data.revision_id_2,
"metadata": metadata2,
"mappings": [],
"indexer_configuration_id": tool_id,
}
metadata2_origin = {
"id": data.origin_url_2,
"metadata": metadata2,
"mappings": [],
"indexer_configuration_id": tool_id,
"from_revision": data.revision_id_2,
}
# when
storage.revision_intrinsic_metadata_add([metadata1_rev])
storage.origin_intrinsic_metadata_add([metadata1_origin])
storage.revision_intrinsic_metadata_add([metadata2_rev])
storage.origin_intrinsic_metadata_add([metadata2_origin])
# then
search = storage.origin_intrinsic_metadata_search_fulltext
assert set([res["id"] for res in search(["Doe"])]) == set(
[data.origin_url_1, data.origin_url_2]
)
assert [res["id"] for res in search(["John", "Doe"])] == [data.origin_url_1]
assert [res["id"] for res in search(["John"])] == [data.origin_url_1]
assert not list(search(["John", "Jane"]))
def test_origin_intrinsic_metadata_search_fulltext_rank(
self, swh_indexer_storage_with_data
):
storage, data = swh_indexer_storage_with_data
# given
tool_id = data.tools["swh-metadata-detector"]["id"]
# The following authors have "Random Person" to add some more content
# to the JSON data, to work around normalization quirks when there
# are few words (rank/(1+ln(nb_words)) is very sensitive to nb_words
# for small values of nb_words).
metadata1 = {"author": ["Random Person", "John Doe", "Jane Doe",]}
metadata1_rev = {
"id": data.revision_id_1,
"metadata": metadata1,
"mappings": [],
"indexer_configuration_id": tool_id,
}
metadata1_origin = {
"id": data.origin_url_1,
"metadata": metadata1,
"mappings": [],
"indexer_configuration_id": tool_id,
"from_revision": data.revision_id_1,
}
metadata2 = {"author": ["Random Person", "Jane Doe",]}
metadata2_rev = {
"id": data.revision_id_2,
"metadata": metadata2,
"mappings": [],
"indexer_configuration_id": tool_id,
}
metadata2_origin = {
"id": data.origin_url_2,
"metadata": metadata2,
"mappings": [],
"indexer_configuration_id": tool_id,
"from_revision": data.revision_id_2,
}
# when
storage.revision_intrinsic_metadata_add([metadata1_rev])
storage.origin_intrinsic_metadata_add([metadata1_origin])
storage.revision_intrinsic_metadata_add([metadata2_rev])
storage.origin_intrinsic_metadata_add([metadata2_origin])
# then
search = storage.origin_intrinsic_metadata_search_fulltext
assert [res["id"] for res in search(["Doe"])] == [
data.origin_url_1,
data.origin_url_2,
]
assert [res["id"] for res in search(["Doe"], limit=1)] == [data.origin_url_1]
assert [res["id"] for res in search(["John"])] == [data.origin_url_1]
assert [res["id"] for res in search(["Jane"])] == [
data.origin_url_2,
data.origin_url_1,
]
assert [res["id"] for res in search(["John", "Jane"])] == [data.origin_url_1]
def _fill_origin_intrinsic_metadata(self, swh_indexer_storage_with_data):
storage, data = swh_indexer_storage_with_data
tool1_id = data.tools["swh-metadata-detector"]["id"]
tool2_id = data.tools["swh-metadata-detector2"]["id"]
metadata1 = {
"@context": "foo",
"author": "John Doe",
}
metadata1_rev = {
"id": data.revision_id_1,
"metadata": metadata1,
"mappings": ["npm"],
"indexer_configuration_id": tool1_id,
}
metadata1_origin = {
"id": data.origin_url_1,
"metadata": metadata1,
"mappings": ["npm"],
"indexer_configuration_id": tool1_id,
"from_revision": data.revision_id_1,
}
metadata2 = {
"@context": "foo",
"author": "Jane Doe",
}
metadata2_rev = {
"id": data.revision_id_2,
"metadata": metadata2,
"mappings": ["npm", "gemspec"],
"indexer_configuration_id": tool2_id,
}
metadata2_origin = {
"id": data.origin_url_2,
"metadata": metadata2,
"mappings": ["npm", "gemspec"],
"indexer_configuration_id": tool2_id,
"from_revision": data.revision_id_2,
}
metadata3 = {
"@context": "foo",
}
metadata3_rev = {
"id": data.revision_id_3,
"metadata": metadata3,
"mappings": ["npm", "gemspec"],
"indexer_configuration_id": tool2_id,
}
metadata3_origin = {
"id": data.origin_url_3,
"metadata": metadata3,
"mappings": ["pkg-info"],
"indexer_configuration_id": tool2_id,
"from_revision": data.revision_id_3,
}
storage.revision_intrinsic_metadata_add([metadata1_rev])
storage.origin_intrinsic_metadata_add([metadata1_origin])
storage.revision_intrinsic_metadata_add([metadata2_rev])
storage.origin_intrinsic_metadata_add([metadata2_origin])
storage.revision_intrinsic_metadata_add([metadata3_rev])
storage.origin_intrinsic_metadata_add([metadata3_origin])
def test_origin_intrinsic_metadata_search_by_producer(
self, swh_indexer_storage_with_data
):
storage, data = swh_indexer_storage_with_data
self._fill_origin_intrinsic_metadata(swh_indexer_storage_with_data)
tool1 = data.tools["swh-metadata-detector"]
tool2 = data.tools["swh-metadata-detector2"]
endpoint = storage.origin_intrinsic_metadata_search_by_producer
# test pagination
# no 'page_token' param, return all origins
result = endpoint(ids_only=True)
assert result["origins"] == [
data.origin_url_1,
data.origin_url_2,
data.origin_url_3,
]
assert "next_page_token" not in result
# 'page_token' is < than origin_1, return everything
result = endpoint(page_token=data.origin_url_1[:-1], ids_only=True)
assert result["origins"] == [
data.origin_url_1,
data.origin_url_2,
data.origin_url_3,
]
assert "next_page_token" not in result
# 'page_token' is origin_3, return nothing
result = endpoint(page_token=data.origin_url_3, ids_only=True)
assert not result["origins"]
assert "next_page_token" not in result
# test limit argument
result = endpoint(page_token=data.origin_url_1[:-1], limit=2, ids_only=True)
assert result["origins"] == [data.origin_url_1, data.origin_url_2]
assert result["next_page_token"] == result["origins"][-1]
result = endpoint(page_token=data.origin_url_1, limit=2, ids_only=True)
assert result["origins"] == [data.origin_url_2, data.origin_url_3]
assert "next_page_token" not in result
result = endpoint(page_token=data.origin_url_2, limit=2, ids_only=True)
assert result["origins"] == [data.origin_url_3]
assert "next_page_token" not in result
# test mappings filtering
result = endpoint(mappings=["npm"], ids_only=True)
assert result["origins"] == [data.origin_url_1, data.origin_url_2]
assert "next_page_token" not in result
result = endpoint(mappings=["npm", "gemspec"], ids_only=True)
assert result["origins"] == [data.origin_url_1, data.origin_url_2]
assert "next_page_token" not in result
result = endpoint(mappings=["gemspec"], ids_only=True)
assert result["origins"] == [data.origin_url_2]
assert "next_page_token" not in result
result = endpoint(mappings=["pkg-info"], ids_only=True)
assert result["origins"] == [data.origin_url_3]
assert "next_page_token" not in result
result = endpoint(mappings=["foobar"], ids_only=True)
assert not result["origins"]
assert "next_page_token" not in result
# test pagination + mappings
result = endpoint(mappings=["npm"], limit=1, ids_only=True)
assert result["origins"] == [data.origin_url_1]
assert result["next_page_token"] == result["origins"][-1]
# test tool filtering
result = endpoint(tool_ids=[tool1["id"]], ids_only=True)
assert result["origins"] == [data.origin_url_1]
assert "next_page_token" not in result
result = endpoint(tool_ids=[tool2["id"]], ids_only=True)
assert sorted(result["origins"]) == [data.origin_url_2, data.origin_url_3]
assert "next_page_token" not in result
result = endpoint(tool_ids=[tool1["id"], tool2["id"]], ids_only=True)
assert sorted(result["origins"]) == [
data.origin_url_1,
data.origin_url_2,
data.origin_url_3,
]
assert "next_page_token" not in result
# test ids_only=False
assert endpoint(mappings=["gemspec"])["origins"] == [
{
"id": data.origin_url_2,
"metadata": {"@context": "foo", "author": "Jane Doe",},
"mappings": ["npm", "gemspec"],
"tool": tool2,
"from_revision": data.revision_id_2,
}
]
def test_origin_intrinsic_metadata_stats(self, swh_indexer_storage_with_data):
storage, data = swh_indexer_storage_with_data
self._fill_origin_intrinsic_metadata(swh_indexer_storage_with_data)
result = storage.origin_intrinsic_metadata_stats()
assert result == {
"per_mapping": {
"gemspec": 1,
"npm": 2,
"pkg-info": 1,
"codemeta": 0,
"maven": 0,
},
"total": 3,
"non_empty": 2,
}
class TestIndexerStorageIndexerConfiguration:
def test_indexer_configuration_add(self, swh_indexer_storage_with_data):
storage, data = swh_indexer_storage_with_data
tool = {
"tool_name": "some-unknown-tool",
"tool_version": "some-version",
"tool_configuration": {"debian-package": "some-package"},
}
actual_tool = storage.indexer_configuration_get(tool)
assert actual_tool is None # does not exist
# add it
actual_tools = list(storage.indexer_configuration_add([tool]))
assert len(actual_tools) == 1
actual_tool = actual_tools[0]
assert actual_tool is not None # now it exists
new_id = actual_tool.pop("id")
assert actual_tool == tool
actual_tools2 = list(storage.indexer_configuration_add([tool]))
actual_tool2 = actual_tools2[0]
assert actual_tool2 is not None # now it exists
new_id2 = actual_tool2.pop("id")
assert new_id == new_id2
assert actual_tool == actual_tool2
def test_indexer_configuration_add_multiple(self, swh_indexer_storage_with_data):
storage, data = swh_indexer_storage_with_data
tool = {
"tool_name": "some-unknown-tool",
"tool_version": "some-version",
"tool_configuration": {"debian-package": "some-package"},
}
actual_tools = list(storage.indexer_configuration_add([tool]))
assert len(actual_tools) == 1
new_tools = [
tool,
{
"tool_name": "yet-another-tool",
"tool_version": "version",
"tool_configuration": {},
},
]
actual_tools = list(storage.indexer_configuration_add(new_tools))
assert len(actual_tools) == 2
# order not guaranteed, so we iterate over results to check
for tool in actual_tools:
_id = tool.pop("id")
assert _id is not None
assert tool in new_tools
def test_indexer_configuration_get_missing(self, swh_indexer_storage_with_data):
storage, data = swh_indexer_storage_with_data
tool = {
"tool_name": "unknown-tool",
"tool_version": "3.1.0rc2-31-ga2cbb8c",
"tool_configuration": {"command_line": "nomossa <filepath>"},
}
actual_tool = storage.indexer_configuration_get(tool)
assert actual_tool is None
def test_indexer_configuration_get(self, swh_indexer_storage_with_data):
storage, data = swh_indexer_storage_with_data
tool = {
"tool_name": "nomos",
"tool_version": "3.1.0rc2-31-ga2cbb8c",
"tool_configuration": {"command_line": "nomossa <filepath>"},
}
actual_tool = storage.indexer_configuration_get(tool)
assert actual_tool
expected_tool = tool.copy()
del actual_tool["id"]
assert expected_tool == actual_tool
def test_indexer_configuration_metadata_get_missing_context(
self, swh_indexer_storage_with_data
):
storage, data = swh_indexer_storage_with_data
tool = {
"tool_name": "swh-metadata-translator",
"tool_version": "0.0.1",
"tool_configuration": {"context": "unknown-context"},
}
actual_tool = storage.indexer_configuration_get(tool)
assert actual_tool is None
def test_indexer_configuration_metadata_get(self, swh_indexer_storage_with_data):
storage, data = swh_indexer_storage_with_data
tool = {
"tool_name": "swh-metadata-translator",
"tool_version": "0.0.1",
"tool_configuration": {"type": "local", "context": "NpmMapping"},
}
storage.indexer_configuration_add([tool])
actual_tool = storage.indexer_configuration_get(tool)
assert actual_tool
expected_tool = tool.copy()
expected_tool["id"] = actual_tool["id"]
assert expected_tool == actual_tool
