# Copyright (C) 2015-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import math
import threading
from typing import Any, Dict, List, Tuple, Type

import attr
import pytest

from swh.indexer.storage.exc import DuplicateId, IndexerStorageArgumentException
from swh.indexer.storage.interface import IndexerStorageInterface, PagedResult
from swh.indexer.storage.model import (
BaseRow,
ContentLicenseRow,
ContentMetadataRow,
ContentMimetypeRow,
DirectoryIntrinsicMetadataRow,
OriginExtrinsicMetadataRow,
OriginIntrinsicMetadataRow,
)
from swh.model.hashutil import hash_to_bytes


def prepare_mimetypes_from_licenses(
fossology_licenses: List[ContentLicenseRow],
) -> List[ContentMimetypeRow]:
"""Fossology license needs some consistent data in db to run."""
mimetypes = []
for c in fossology_licenses:
mimetypes.append(
ContentMimetypeRow(
id=c.id,
mimetype="text/plain", # for filtering on textual data to work
encoding="utf-8",
indexer_configuration_id=c.indexer_configuration_id,
)
)
    return mimetypes


def endpoint_name(etype: str, ename: str) -> str:
"""Compute the storage's endpoint's name
>>> endpoint_name('content_mimetype', 'add')
'content_mimetype_add'
>>> endpoint_name('content_fosso_license', 'delete')
'content_fosso_license_delete'
"""
return f"{etype}_{ename}"
def endpoint(storage, etype: str, ename: str):
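    """Return the storage method for the given endpoint type and name."""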
    return getattr(storage, endpoint_name(etype, ename))


def expected_summary(count: int, etype: str, ename: str = "add") -> Dict[str, int]:
"""Compute the expected summary
    The key is determined according to etype and ename.
>>> expected_summary(10, 'content_mimetype', 'add')
{'content_mimetype:add': 10}
>>> expected_summary(9, 'origin_intrinsic_metadata', 'delete')
{'origin_intrinsic_metadata:del': 9}
"""
pattern = ename[0:3]
key = endpoint_name(etype, ename).replace(f"_{ename}", f":{pattern}")
    return {key: count}


def test_check_config(swh_indexer_storage) -> None:
assert swh_indexer_storage.check_config(check_write=True)
    assert swh_indexer_storage.check_config(check_write=False)


class StorageETypeTester:
"""Base class for testing a series of common behaviour between a bunch of
endpoint types supported by an IndexerStorage.
This is supposed to be inherited with the following class attributes:
- endpoint_type
- tool_name
- example_data
See below for example usage.
"""
endpoint_type: str
tool_name: str
example_data: List[Dict]
row_class: Type[BaseRow]
def test_missing(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
etype = self.endpoint_type
tool_id = data.tools[self.tool_name]["id"]
# given 2 (hopefully) unknown objects
query = [
{
"id": data.sha1_1,
"indexer_configuration_id": tool_id,
},
{
"id": data.sha1_2,
"indexer_configuration_id": tool_id,
},
]
        # we expect both to be returned by the xxx_missing endpoint
actual_missing = endpoint(storage, etype, "missing")(query)
assert list(actual_missing) == [
data.sha1_1,
data.sha1_2,
]
# now, when we add one of them
summary = endpoint(storage, etype, "add")(
[
self.row_class.from_dict(
{
"id": data.sha1_2,
**self.example_data[0],
"indexer_configuration_id": tool_id,
}
)
]
)
assert summary == expected_summary(1, etype)
# we expect only the other one returned
actual_missing = endpoint(storage, etype, "missing")(query)
assert list(actual_missing) == [data.sha1_1]
def test_add__update_in_place_duplicate(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
etype = self.endpoint_type
tool = data.tools[self.tool_name]
data_v1 = {
"id": data.sha1_2,
**self.example_data[0],
"indexer_configuration_id": tool["id"],
}
# given
summary = endpoint(storage, etype, "add")([self.row_class.from_dict(data_v1)])
        assert summary == expected_summary(1, etype)  # first insertion is counted
# when
actual_data = list(endpoint(storage, etype, "get")([data.sha1_2]))
expected_data_v1 = [
self.row_class.from_dict(
{"id": data.sha1_2, **self.example_data[0], "tool": tool}
)
]
# then
assert actual_data == expected_data_v1
# given
data_v2 = data_v1.copy()
data_v2.update(self.example_data[1])
        summary = endpoint(storage, etype, "add")([self.row_class.from_dict(data_v2)])
        assert summary == expected_summary(1, etype)  # modified so counted
actual_data = list(endpoint(storage, etype, "get")([data.sha1_2]))
expected_data_v2 = [
self.row_class.from_dict(
{
"id": data.sha1_2,
**self.example_data[1],
"tool": tool,
}
)
]
# data did change as the v2 was used to overwrite v1
assert actual_data == expected_data_v2
def test_add_deadlock(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
etype = self.endpoint_type
tool = data.tools[self.tool_name]
hashes = [
hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4{:03d}".format(i))
for i in range(1000)
]
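        # 1000 distinct, valid sha1 hex digests: 37 fixed characters plus a 3-digit decimal suffix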
data_v1 = [
self.row_class.from_dict(
{
"id": hash_,
**self.example_data[0],
"indexer_configuration_id": tool["id"],
}
)
for hash_ in hashes
]
data_v2 = [
self.row_class.from_dict(
{
"id": hash_,
**self.example_data[1],
"indexer_configuration_id": tool["id"],
}
)
for hash_ in hashes
]
# Remove one item from each, so that both queries have to succeed for
# all items to be in the DB.
data_v2a = data_v2[1:]
data_v2b = list(reversed(data_v2[0:-1]))
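        # data_v2b is reversed so the two concurrent adds below insert rows in
        # opposite orders, which is what can make the backend deadlock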
# given
endpoint(storage, etype, "add")(data_v1)
# when
actual_data = sorted(
endpoint(storage, etype, "get")(hashes),
key=lambda x: x.id,
)
expected_data_v1 = [
self.row_class.from_dict(
{"id": hash_, **self.example_data[0], "tool": tool}
)
for hash_ in hashes
]
# then
assert actual_data == expected_data_v1
# given
def f1() -> None:
endpoint(storage, etype, "add")(data_v2a)
def f2() -> None:
endpoint(storage, etype, "add")(data_v2b)
t1 = threading.Thread(target=f1)
t2 = threading.Thread(target=f2)
t2.start()
t1.start()
t1.join()
t2.join()
actual_data = sorted(
endpoint(storage, etype, "get")(hashes),
key=lambda x: x.id,
)
expected_data_v2 = [
self.row_class.from_dict(
{"id": hash_, **self.example_data[1], "tool": tool}
)
for hash_ in hashes
]
assert len(actual_data) == len(expected_data_v1) == len(expected_data_v2)
for (item, expected_item_v1, expected_item_v2) in zip(
actual_data, expected_data_v1, expected_data_v2
):
assert item in (expected_item_v1, expected_item_v2)
def test_add__duplicate_twice(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
etype = self.endpoint_type
tool = data.tools[self.tool_name]
data_dir1 = self.row_class.from_dict(
{
"id": data.directory_id_2,
**self.example_data[0],
"indexer_configuration_id": tool["id"],
}
)
data_dir2 = self.row_class.from_dict(
{
"id": data.directory_id_2,
**self.example_data[1],
"indexer_configuration_id": tool["id"],
}
)
# when
summary = endpoint(storage, etype, "add")([data_dir1])
assert summary == expected_summary(1, etype)
with pytest.raises(DuplicateId):
endpoint(storage, etype, "add")([data_dir2, data_dir2])
# then
actual_data = list(
endpoint(storage, etype, "get")([data.directory_id_2, data.directory_id_1])
)
expected_data = [
self.row_class.from_dict(
{"id": data.directory_id_2, **self.example_data[0], "tool": tool}
)
]
assert actual_data == expected_data
def test_add(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
etype = self.endpoint_type
tool = data.tools[self.tool_name]
# conftest fills it with mimetypes
storage.journal_writer.journal.objects = [] # type: ignore
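        # empty it so that only objects written by this test show up in the
        # journal checks below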
query = [data.sha1_2, data.sha1_1]
data1 = self.row_class.from_dict(
{
"id": data.sha1_2,
**self.example_data[0],
"indexer_configuration_id": tool["id"],
}
)
# when
summary = endpoint(storage, etype, "add")([data1])
assert summary == expected_summary(1, etype)
# then
actual_data = list(endpoint(storage, etype, "get")(query))
expected_data = [
self.row_class.from_dict(
{"id": data.sha1_2, **self.example_data[0], "tool": tool}
)
]
assert actual_data == expected_data
journal_objects = storage.journal_writer.journal.objects # type: ignore
actual_journal_data = [
obj for (obj_type, obj) in journal_objects if obj_type == self.endpoint_type
]
        assert sorted(actual_journal_data) == sorted(expected_data)


class TestIndexerStorageContentMimetypes(StorageETypeTester):
"""Test Indexer Storage content_mimetype related methods"""
endpoint_type = "content_mimetype"
tool_name = "file"
example_data = [
{
"mimetype": "text/plain",
"encoding": "utf-8",
},
{
"mimetype": "text/html",
"encoding": "us-ascii",
},
]
row_class = ContentMimetypeRow
def test_generate_content_mimetype_get_partition_failure(
self, swh_indexer_storage: IndexerStorageInterface
) -> None:
"""get_partition call with wrong limit input should fail"""
storage = swh_indexer_storage
indexer_configuration_id = 42
with pytest.raises(
IndexerStorageArgumentException, match="limit should not be None"
):
storage.content_mimetype_get_partition(
indexer_configuration_id, 0, 3, limit=None # type: ignore
)
def test_generate_content_mimetype_get_partition_no_limit(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
"""get_partition should return result"""
storage, data = swh_indexer_storage_with_data
mimetypes = data.mimetypes
expected_ids = set([c.id for c in mimetypes])
indexer_configuration_id = mimetypes[0].indexer_configuration_id
assert len(mimetypes) == 16
nb_partitions = 16
actual_ids = []
for partition_id in range(nb_partitions):
actual_result = storage.content_mimetype_get_partition(
indexer_configuration_id, partition_id, nb_partitions
)
assert actual_result.next_page_token is None
actual_ids.extend(actual_result.results)
assert len(actual_ids) == len(expected_ids)
for actual_id in actual_ids:
assert actual_id in expected_ids
def test_generate_content_mimetype_get_partition_full(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
"""get_partition for a single partition should return available ids"""
storage, data = swh_indexer_storage_with_data
mimetypes = data.mimetypes
expected_ids = set([c.id for c in mimetypes])
indexer_configuration_id = mimetypes[0].indexer_configuration_id
actual_result = storage.content_mimetype_get_partition(
indexer_configuration_id, 0, 1
)
assert actual_result.next_page_token is None
actual_ids = actual_result.results
assert len(actual_ids) == len(expected_ids)
for actual_id in actual_ids:
assert actual_id in expected_ids
def test_generate_content_mimetype_get_partition_empty(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
"""get_partition when at least one of the partitions is empty"""
storage, data = swh_indexer_storage_with_data
mimetypes = data.mimetypes
expected_ids = set([c.id for c in mimetypes])
indexer_configuration_id = mimetypes[0].indexer_configuration_id
# nb_partitions = smallest power of 2 such that at least one of
# the partitions is empty
nb_mimetypes = len(mimetypes)
nb_partitions = 1 << math.floor(math.log2(nb_mimetypes) + 1)
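        # e.g. with 16 mimetypes: floor(log2(16) + 1) == 5, so nb_partitions == 32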
seen_ids = []
for partition_id in range(nb_partitions):
actual_result = storage.content_mimetype_get_partition(
indexer_configuration_id,
partition_id,
nb_partitions,
limit=nb_mimetypes + 1,
)
for actual_id in actual_result.results:
seen_ids.append(actual_id)
# Limit is higher than the max number of results
assert actual_result.next_page_token is None
assert set(seen_ids) == expected_ids
def test_generate_content_mimetype_get_partition_with_pagination(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
"""get_partition should return ids provided with pagination"""
storage, data = swh_indexer_storage_with_data
mimetypes = data.mimetypes
expected_ids = set([c.id for c in mimetypes])
indexer_configuration_id = mimetypes[0].indexer_configuration_id
nb_partitions = 4
actual_ids = []
for partition_id in range(nb_partitions):
next_page_token = None
while True:
actual_result = storage.content_mimetype_get_partition(
indexer_configuration_id,
partition_id,
nb_partitions,
limit=2,
page_token=next_page_token,
)
actual_ids.extend(actual_result.results)
next_page_token = actual_result.next_page_token
if next_page_token is None:
break
assert len(set(actual_ids)) == len(set(expected_ids))
for actual_id in actual_ids:
            assert actual_id in expected_ids


class TestIndexerStorageContentMetadata(StorageETypeTester):
"""Test Indexer Storage content_metadata related methods"""
tool_name = "swh-metadata-detector"
endpoint_type = "content_metadata"
example_data = [
{
"metadata": {
"other": {},
"codeRepository": {
"type": "git",
"url": "https://github.com/moranegg/metadata_test",
},
"description": "Simple package.json test for indexer",
"name": "test_metadata",
"version": "0.0.1",
},
},
{
"metadata": {"other": {}, "name": "test_metadata", "version": "0.0.1"},
},
]
row_class = ContentMetadataRow
def test_add_with_null(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
etype = self.endpoint_type
tool = data.tools[self.tool_name]
# conftest fills it with mimetypes
storage.journal_writer.journal.objects = [] # type: ignore
query = [data.sha1_2, data.sha1_1]
data1 = self.row_class.from_dict(
{
"id": data.sha1_2,
"metadata": {"description": "with\u0000nul"},
"indexer_configuration_id": tool["id"],
}
)
# when
summary = endpoint(storage, etype, "add")([data1])
assert summary == expected_summary(1, etype)
# then
actual_data = list(endpoint(storage, etype, "get")(query))
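        # postgresql strips the NUL character from JSON data (jsonb cannot
        # store \u0000), while other backends keep it verbatim; both results
        # are accepted here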
expected_data_postgresql = [
self.row_class.from_dict(
{
"id": data.sha1_2,
"metadata": {"description": "withnul"},
"tool": tool,
}
)
]
expected_data_verbatim = [
self.row_class.from_dict(
{
"id": data.sha1_2,
"metadata": {"description": "with\u0000nul"},
"tool": tool,
}
)
]
assert actual_data in (expected_data_postgresql, expected_data_verbatim)
journal_objects = storage.journal_writer.journal.objects # type: ignore
actual_journal_data = [
obj for (obj_type, obj) in journal_objects if obj_type == self.endpoint_type
]
        assert sorted(actual_journal_data) == sorted(expected_data_verbatim)


class TestIndexerStorageDirectoryIntrinsicMetadata(StorageETypeTester):
"""Test Indexer Storage directory_intrinsic_metadata related methods"""
tool_name = "swh-metadata-detector"
endpoint_type = "directory_intrinsic_metadata"
example_data = [
{
"metadata": {
"other": {},
"codeRepository": {
"type": "git",
"url": "https://github.com/moranegg/metadata_test",
},
"description": "Simple package.json test for indexer",
"name": "test_metadata",
"version": "0.0.1",
},
"mappings": ["mapping1"],
},
{
"metadata": {"other": {}, "name": "test_metadata", "version": "0.0.1"},
"mappings": ["mapping2"],
},
]
    row_class = DirectoryIntrinsicMetadataRow


class TestIndexerStorageContentFossologyLicense(StorageETypeTester):
endpoint_type = "content_fossology_license"
tool_name = "nomos"
example_data = [
{"license": "Apache-2.0"},
{"license": "BSD-2-Clause"},
]
row_class = ContentLicenseRow
    # the following tests are disabled because the licenses endpoint behaves differently
@pytest.mark.skip
def test_add__update_in_place_duplicate(self):
pass
@pytest.mark.skip
def test_add_deadlock(self):
pass
# content_fossology_license_missing does not exist
@pytest.mark.skip
def test_missing(self):
pass
def test_content_fossology_license_add__new_license_added(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
# given
tool = data.tools["nomos"]
tool_id = tool["id"]
license1 = ContentLicenseRow(
id=data.sha1_1,
license="Apache-2.0",
indexer_configuration_id=tool_id,
)
# given
storage.content_fossology_license_add([license1])
# conflict does nothing
storage.content_fossology_license_add([license1])
# when
actual_licenses = list(storage.content_fossology_license_get([data.sha1_1]))
# then
expected_licenses = [
ContentLicenseRow(
id=data.sha1_1,
license="Apache-2.0",
tool=tool,
)
]
assert actual_licenses == expected_licenses
# given
license2 = ContentLicenseRow(
id=data.sha1_1,
license="BSD-2-Clause",
indexer_configuration_id=tool_id,
)
storage.content_fossology_license_add([license2])
actual_licenses = list(storage.content_fossology_license_get([data.sha1_1]))
expected_licenses.append(
ContentLicenseRow(
id=data.sha1_1,
license="BSD-2-Clause",
tool=tool,
)
)
# first license was not removed when the second one was added
assert sorted(actual_licenses) == sorted(expected_licenses)
def test_generate_content_fossology_license_get_partition_failure(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
"""get_partition call with wrong limit input should fail"""
storage, data = swh_indexer_storage_with_data
indexer_configuration_id = 42
with pytest.raises(
IndexerStorageArgumentException, match="limit should not be None"
):
storage.content_fossology_license_get_partition(
indexer_configuration_id,
0,
3,
limit=None, # type: ignore
)
def test_generate_content_fossology_license_get_partition_no_limit(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
"""get_partition should return results"""
storage, data = swh_indexer_storage_with_data
# craft some consistent mimetypes
fossology_licenses = data.fossology_licenses
mimetypes = prepare_mimetypes_from_licenses(fossology_licenses)
indexer_configuration_id = fossology_licenses[0].indexer_configuration_id
storage.content_mimetype_add(mimetypes)
# add fossology_licenses to storage
storage.content_fossology_license_add(fossology_licenses)
# All ids from the db
expected_ids = set([c.id for c in fossology_licenses])
assert len(fossology_licenses) == 10
assert len(mimetypes) == 10
nb_partitions = 4
actual_ids = []
for partition_id in range(nb_partitions):
actual_result = storage.content_fossology_license_get_partition(
indexer_configuration_id, partition_id, nb_partitions
)
assert actual_result.next_page_token is None
actual_ids.extend(actual_result.results)
assert len(set(actual_ids)) == len(expected_ids)
for actual_id in actual_ids:
assert actual_id in expected_ids
def test_generate_content_fossology_license_get_partition_full(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
"""get_partition for a single partition should return available ids"""
storage, data = swh_indexer_storage_with_data
# craft some consistent mimetypes
fossology_licenses = data.fossology_licenses
mimetypes = prepare_mimetypes_from_licenses(fossology_licenses)
indexer_configuration_id = fossology_licenses[0].indexer_configuration_id
storage.content_mimetype_add(mimetypes)
# add fossology_licenses to storage
storage.content_fossology_license_add(fossology_licenses)
# All ids from the db
expected_ids = set([c.id for c in fossology_licenses])
actual_result = storage.content_fossology_license_get_partition(
indexer_configuration_id, 0, 1
)
assert actual_result.next_page_token is None
actual_ids = actual_result.results
assert len(set(actual_ids)) == len(expected_ids)
for actual_id in actual_ids:
assert actual_id in expected_ids
def test_generate_content_fossology_license_get_partition_empty(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
"""get_partition when at least one of the partitions is empty"""
storage, data = swh_indexer_storage_with_data
# craft some consistent mimetypes
fossology_licenses = data.fossology_licenses
mimetypes = prepare_mimetypes_from_licenses(fossology_licenses)
indexer_configuration_id = fossology_licenses[0].indexer_configuration_id
storage.content_mimetype_add(mimetypes)
# add fossology_licenses to storage
storage.content_fossology_license_add(fossology_licenses)
# All ids from the db
expected_ids = set([c.id for c in fossology_licenses])
# nb_partitions = smallest power of 2 such that at least one of
# the partitions is empty
nb_licenses = len(fossology_licenses)
nb_partitions = 1 << math.floor(math.log2(nb_licenses) + 1)
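        # e.g. with 10 licenses: floor(log2(10) + 1) == 4, so nb_partitions == 16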
seen_ids = []
for partition_id in range(nb_partitions):
actual_result = storage.content_fossology_license_get_partition(
indexer_configuration_id,
partition_id,
nb_partitions,
limit=nb_licenses + 1,
)
for actual_id in actual_result.results:
seen_ids.append(actual_id)
# Limit is higher than the max number of results
assert actual_result.next_page_token is None
assert set(seen_ids) == expected_ids
def test_generate_content_fossology_license_get_partition_with_pagination(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
"""get_partition should return ids provided with paginationv"""
storage, data = swh_indexer_storage_with_data
# craft some consistent mimetypes
fossology_licenses = data.fossology_licenses
mimetypes = prepare_mimetypes_from_licenses(fossology_licenses)
indexer_configuration_id = fossology_licenses[0].indexer_configuration_id
storage.content_mimetype_add(mimetypes)
# add fossology_licenses to storage
storage.content_fossology_license_add(fossology_licenses)
# All ids from the db
expected_ids = [c.id for c in fossology_licenses]
nb_partitions = 4
actual_ids = []
for partition_id in range(nb_partitions):
next_page_token = None
while True:
actual_result = storage.content_fossology_license_get_partition(
indexer_configuration_id,
partition_id,
nb_partitions,
limit=2,
page_token=next_page_token,
)
actual_ids.extend(actual_result.results)
next_page_token = actual_result.next_page_token
if next_page_token is None:
break
assert len(set(actual_ids)) == len(set(expected_ids))
for actual_id in actual_ids:
assert actual_id in expected_ids
def test_add_empty(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
(storage, data) = swh_indexer_storage_with_data
etype = self.endpoint_type
summary = endpoint(storage, etype, "add")([])
assert summary == {"content_fossology_license:add": 0}
actual_license = list(endpoint(storage, etype, "get")([data.sha1_2]))
assert actual_license == []
def test_get_unknown(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
(storage, data) = swh_indexer_storage_with_data
etype = self.endpoint_type
actual_license = list(endpoint(storage, etype, "get")([data.sha1_2]))
        assert actual_license == []


class TestIndexerStorageOriginIntrinsicMetadata:
def test_origin_intrinsic_metadata_add(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
# given
tool_id = data.tools["swh-metadata-detector"]["id"]
metadata = {
"version": None,
"name": None,
}
metadata_dir = DirectoryIntrinsicMetadataRow(
id=data.directory_id_2,
metadata=metadata,
mappings=["mapping1"],
indexer_configuration_id=tool_id,
)
metadata_origin = OriginIntrinsicMetadataRow(
id=data.origin_url_1,
metadata=metadata,
indexer_configuration_id=tool_id,
mappings=["mapping1"],
from_directory=data.directory_id_2,
)
# when
storage.directory_intrinsic_metadata_add([metadata_dir])
storage.origin_intrinsic_metadata_add([metadata_origin])
# then
actual_metadata = list(
storage.origin_intrinsic_metadata_get([data.origin_url_1, "no://where"])
)
expected_metadata = [
OriginIntrinsicMetadataRow(
id=data.origin_url_1,
metadata=metadata,
tool=data.tools["swh-metadata-detector"],
from_directory=data.directory_id_2,
mappings=["mapping1"],
)
]
assert actual_metadata == expected_metadata
journal_objects = storage.journal_writer.journal.objects # type: ignore
actual_journal_metadata = [
obj
for (obj_type, obj) in journal_objects
if obj_type == "origin_intrinsic_metadata"
]
        assert sorted(actual_journal_metadata) == sorted(expected_metadata)
def test_origin_intrinsic_metadata_add_update_in_place_duplicate(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
# given
tool_id = data.tools["swh-metadata-detector"]["id"]
metadata_v1: Dict[str, Any] = {
"version": None,
"name": None,
}
metadata_dir_v1 = DirectoryIntrinsicMetadataRow(
id=data.directory_id_2,
metadata=metadata_v1,
mappings=[],
indexer_configuration_id=tool_id,
)
metadata_origin_v1 = OriginIntrinsicMetadataRow(
id=data.origin_url_1,
metadata=metadata_v1.copy(),
indexer_configuration_id=tool_id,
mappings=[],
from_directory=data.directory_id_2,
)
# given
storage.directory_intrinsic_metadata_add([metadata_dir_v1])
storage.origin_intrinsic_metadata_add([metadata_origin_v1])
# when
actual_metadata = list(
storage.origin_intrinsic_metadata_get([data.origin_url_1])
)
# then
expected_metadata_v1 = [
OriginIntrinsicMetadataRow(
id=data.origin_url_1,
metadata=metadata_v1,
tool=data.tools["swh-metadata-detector"],
from_directory=data.directory_id_2,
mappings=[],
)
]
assert actual_metadata == expected_metadata_v1
# given
metadata_v2 = metadata_v1.copy()
metadata_v2.update(
{
"name": "test_update_duplicated_metadata",
"author": "MG",
}
)
metadata_dir_v2 = attr.evolve(metadata_dir_v1, metadata=metadata_v2)
metadata_origin_v2 = OriginIntrinsicMetadataRow(
id=data.origin_url_1,
metadata=metadata_v2.copy(),
indexer_configuration_id=tool_id,
mappings=["npm"],
from_directory=data.directory_id_1,
)
storage.directory_intrinsic_metadata_add([metadata_dir_v2])
storage.origin_intrinsic_metadata_add([metadata_origin_v2])
actual_metadata = list(
storage.origin_intrinsic_metadata_get([data.origin_url_1])
)
expected_metadata_v2 = [
OriginIntrinsicMetadataRow(
id=data.origin_url_1,
metadata=metadata_v2,
tool=data.tools["swh-metadata-detector"],
from_directory=data.directory_id_1,
mappings=["npm"],
)
]
# metadata did change as the v2 was used to overwrite v1
assert actual_metadata == expected_metadata_v2
def test_origin_intrinsic_metadata_add__deadlock(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
# given
tool_id = data.tools["swh-metadata-detector"]["id"]
origins = ["file:///tmp/origin{:02d}".format(i) for i in range(100)]
example_data1: Dict[str, Any] = {
"metadata": {
"version": None,
"name": None,
},
"mappings": [],
}
example_data2: Dict[str, Any] = {
"metadata": {
"version": "v1.1.1",
"name": "foo",
},
"mappings": [],
}
metadata_dir_v1 = DirectoryIntrinsicMetadataRow(
id=data.directory_id_2,
metadata={
"version": None,
"name": None,
},
mappings=[],
indexer_configuration_id=tool_id,
)
data_v1 = [
OriginIntrinsicMetadataRow(
id=origin,
from_directory=data.directory_id_2,
indexer_configuration_id=tool_id,
**example_data1,
)
for origin in origins
]
data_v2 = [
OriginIntrinsicMetadataRow(
id=origin,
from_directory=data.directory_id_2,
indexer_configuration_id=tool_id,
**example_data2,
)
for origin in origins
]
# Remove one item from each, so that both queries have to succeed for
# all items to be in the DB.
data_v2a = data_v2[1:]
data_v2b = list(reversed(data_v2[0:-1]))
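        # as in StorageETypeTester.test_add_deadlock, the reversed copy makes
        # the two concurrent adds insert rows in opposite orders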
# given
storage.directory_intrinsic_metadata_add([metadata_dir_v1])
storage.origin_intrinsic_metadata_add(data_v1)
# when
actual_data = list(storage.origin_intrinsic_metadata_get(origins))
expected_data_v1 = [
OriginIntrinsicMetadataRow(
id=origin,
from_directory=data.directory_id_2,
tool=data.tools["swh-metadata-detector"],
**example_data1,
)
for origin in origins
]
# then
assert actual_data == expected_data_v1
# given
def f1() -> None:
storage.origin_intrinsic_metadata_add(data_v2a)
def f2() -> None:
storage.origin_intrinsic_metadata_add(data_v2b)
t1 = threading.Thread(target=f1)
t2 = threading.Thread(target=f2)
t2.start()
t1.start()
t1.join()
t2.join()
actual_data = list(storage.origin_intrinsic_metadata_get(origins))
expected_data_v2 = [
OriginIntrinsicMetadataRow(
id=origin,
from_directory=data.directory_id_2,
tool=data.tools["swh-metadata-detector"],
**example_data2,
)
for origin in origins
]
actual_data.sort(key=lambda item: item.id)
assert len(actual_data) == len(expected_data_v1) == len(expected_data_v2)
for (item, expected_item_v1, expected_item_v2) in zip(
actual_data, expected_data_v1, expected_data_v2
):
assert item in (expected_item_v1, expected_item_v2)
def test_origin_intrinsic_metadata_add__duplicate_twice(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
# given
tool_id = data.tools["swh-metadata-detector"]["id"]
metadata = {
"developmentStatus": None,
"name": None,
}
metadata_dir = DirectoryIntrinsicMetadataRow(
id=data.directory_id_2,
metadata=metadata,
mappings=["mapping1"],
indexer_configuration_id=tool_id,
)
metadata_origin = OriginIntrinsicMetadataRow(
id=data.origin_url_1,
metadata=metadata,
indexer_configuration_id=tool_id,
mappings=["mapping1"],
from_directory=data.directory_id_2,
)
# when
storage.directory_intrinsic_metadata_add([metadata_dir])
with pytest.raises(DuplicateId):
storage.origin_intrinsic_metadata_add([metadata_origin, metadata_origin])
def test_origin_intrinsic_metadata_search_fulltext(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
# given
tool_id = data.tools["swh-metadata-detector"]["id"]
metadata1 = {
"author": "John Doe",
}
metadata1_dir = DirectoryIntrinsicMetadataRow(
id=data.directory_id_1,
metadata=metadata1,
mappings=[],
indexer_configuration_id=tool_id,
)
metadata1_origin = OriginIntrinsicMetadataRow(
id=data.origin_url_1,
metadata=metadata1,
mappings=[],
indexer_configuration_id=tool_id,
from_directory=data.directory_id_1,
)
metadata2 = {
"author": "Jane Doe",
}
metadata2_dir = DirectoryIntrinsicMetadataRow(
id=data.directory_id_2,
metadata=metadata2,
mappings=[],
indexer_configuration_id=tool_id,
)
metadata2_origin = OriginIntrinsicMetadataRow(
id=data.origin_url_2,
metadata=metadata2,
mappings=[],
indexer_configuration_id=tool_id,
from_directory=data.directory_id_2,
)
# when
storage.directory_intrinsic_metadata_add([metadata1_dir])
storage.origin_intrinsic_metadata_add([metadata1_origin])
storage.directory_intrinsic_metadata_add([metadata2_dir])
storage.origin_intrinsic_metadata_add([metadata2_origin])
# then
search = storage.origin_intrinsic_metadata_search_fulltext
assert set([res.id for res in search(["Doe"])]) == set(
[data.origin_url_1, data.origin_url_2]
)
assert [res.id for res in search(["John", "Doe"])] == [data.origin_url_1]
assert [res.id for res in search(["John"])] == [data.origin_url_1]
assert not list(search(["John", "Jane"]))
def test_origin_intrinsic_metadata_search_fulltext_rank(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
# given
tool_id = data.tools["swh-metadata-detector"]["id"]
        # The following author lists include "Random Person" to add some more
        # content to the JSON data, working around normalization quirks when
        # there are few words (rank/(1+ln(nb_words)) is very sensitive to
        # nb_words for small values of nb_words).
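        # (for instance, the divisor 1 + ln(nb_words) is ~1.69 for 2 words but
        # ~2.39 for 4 words, a ~40% swing)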
metadata1 = {
"author": [
"Random Person",
"John Doe",
"Jane Doe",
]
}
metadata1_dir = DirectoryIntrinsicMetadataRow(
id=data.directory_id_1,
metadata=metadata1,
mappings=[],
indexer_configuration_id=tool_id,
)
metadata1_origin = OriginIntrinsicMetadataRow(
id=data.origin_url_1,
metadata=metadata1,
mappings=[],
indexer_configuration_id=tool_id,
from_directory=data.directory_id_1,
)
metadata2 = {
"author": [
"Random Person",
"Jane Doe",
]
}
metadata2_dir = DirectoryIntrinsicMetadataRow(
id=data.directory_id_2,
metadata=metadata2,
mappings=[],
indexer_configuration_id=tool_id,
)
metadata2_origin = OriginIntrinsicMetadataRow(
id=data.origin_url_2,
metadata=metadata2,
mappings=[],
indexer_configuration_id=tool_id,
from_directory=data.directory_id_2,
)
# when
storage.directory_intrinsic_metadata_add([metadata1_dir])
storage.origin_intrinsic_metadata_add([metadata1_origin])
storage.directory_intrinsic_metadata_add([metadata2_dir])
storage.origin_intrinsic_metadata_add([metadata2_origin])
# then
search = storage.origin_intrinsic_metadata_search_fulltext
assert [res.id for res in search(["Doe"])] == [
data.origin_url_1,
data.origin_url_2,
]
assert [res.id for res in search(["Doe"], limit=1)] == [data.origin_url_1]
assert [res.id for res in search(["John"])] == [data.origin_url_1]
assert [res.id for res in search(["Jane"])] == [
data.origin_url_2,
data.origin_url_1,
]
assert [res.id for res in search(["John", "Jane"])] == [data.origin_url_1]
def _fill_origin_intrinsic_metadata(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
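        """Insert intrinsic metadata for three origins: two with an author
        (indexed by tool1 and tool2 respectively) and one whose metadata only
        contains "@context"."""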
storage, data = swh_indexer_storage_with_data
tool1_id = data.tools["swh-metadata-detector"]["id"]
tool2_id = data.tools["swh-metadata-detector2"]["id"]
metadata1 = {
"@context": "foo",
"author": "John Doe",
}
metadata1_dir = DirectoryIntrinsicMetadataRow(
id=data.directory_id_1,
metadata=metadata1,
mappings=["npm"],
indexer_configuration_id=tool1_id,
)
metadata1_origin = OriginIntrinsicMetadataRow(
id=data.origin_url_1,
metadata=metadata1,
mappings=["npm"],
indexer_configuration_id=tool1_id,
from_directory=data.directory_id_1,
)
metadata2 = {
"@context": "foo",
"author": "Jane Doe",
}
metadata2_dir = DirectoryIntrinsicMetadataRow(
id=data.directory_id_2,
metadata=metadata2,
mappings=["npm", "gemspec"],
indexer_configuration_id=tool2_id,
)
metadata2_origin = OriginIntrinsicMetadataRow(
id=data.origin_url_2,
metadata=metadata2,
mappings=["npm", "gemspec"],
indexer_configuration_id=tool2_id,
from_directory=data.directory_id_2,
)
metadata3 = {
"@context": "foo",
}
metadata3_dir = DirectoryIntrinsicMetadataRow(
id=data.directory_id_3,
metadata=metadata3,
mappings=["npm", "gemspec"],
indexer_configuration_id=tool2_id,
)
metadata3_origin = OriginIntrinsicMetadataRow(
id=data.origin_url_3,
metadata=metadata3,
mappings=["pkg-info"],
indexer_configuration_id=tool2_id,
from_directory=data.directory_id_3,
)
storage.directory_intrinsic_metadata_add([metadata1_dir])
storage.origin_intrinsic_metadata_add([metadata1_origin])
storage.directory_intrinsic_metadata_add([metadata2_dir])
storage.origin_intrinsic_metadata_add([metadata2_origin])
storage.directory_intrinsic_metadata_add([metadata3_dir])
storage.origin_intrinsic_metadata_add([metadata3_origin])
def test_origin_intrinsic_metadata_search_by_producer(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
self._fill_origin_intrinsic_metadata(swh_indexer_storage_with_data)
tool1 = data.tools["swh-metadata-detector"]
tool2 = data.tools["swh-metadata-detector2"]
endpoint = storage.origin_intrinsic_metadata_search_by_producer
# test pagination
# no 'page_token' param, return all origins
result = endpoint(ids_only=True)
assert result == PagedResult(
results=[
data.origin_url_1,
data.origin_url_2,
data.origin_url_3,
],
next_page_token=None,
)
        # 'page_token' sorts before origin_1, return everything
result = endpoint(page_token=data.origin_url_1[:-1], ids_only=True)
assert result == PagedResult(
results=[
data.origin_url_1,
data.origin_url_2,
data.origin_url_3,
],
next_page_token=None,
)
# 'page_token' is origin_3, return nothing
result = endpoint(page_token=data.origin_url_3, ids_only=True)
assert result == PagedResult(results=[], next_page_token=None)
# test limit argument
result = endpoint(page_token=data.origin_url_1[:-1], limit=2, ids_only=True)
assert result == PagedResult(
results=[data.origin_url_1, data.origin_url_2],
next_page_token=data.origin_url_2,
)
result = endpoint(page_token=data.origin_url_1, limit=2, ids_only=True)
assert result == PagedResult(
results=[data.origin_url_2, data.origin_url_3],
next_page_token=None,
)
result = endpoint(page_token=data.origin_url_2, limit=2, ids_only=True)
assert result == PagedResult(
results=[data.origin_url_3],
next_page_token=None,
)
# test mappings filtering
result = endpoint(mappings=["npm"], ids_only=True)
assert result == PagedResult(
results=[data.origin_url_1, data.origin_url_2],
next_page_token=None,
)
result = endpoint(mappings=["npm", "gemspec"], ids_only=True)
assert result == PagedResult(
results=[data.origin_url_1, data.origin_url_2],
next_page_token=None,
)
result = endpoint(mappings=["gemspec"], ids_only=True)
assert result == PagedResult(
results=[data.origin_url_2],
next_page_token=None,
)
result = endpoint(mappings=["pkg-info"], ids_only=True)
assert result == PagedResult(
results=[data.origin_url_3],
next_page_token=None,
)
result = endpoint(mappings=["foobar"], ids_only=True)
assert result == PagedResult(
results=[],
next_page_token=None,
)
# test pagination + mappings
result = endpoint(mappings=["npm"], limit=1, ids_only=True)
assert result == PagedResult(
results=[data.origin_url_1],
next_page_token=data.origin_url_1,
)
# test tool filtering
result = endpoint(tool_ids=[tool1["id"]], ids_only=True)
assert result == PagedResult(
results=[data.origin_url_1],
next_page_token=None,
)
result = endpoint(tool_ids=[tool2["id"]], ids_only=True)
assert sorted(result.results) == [data.origin_url_2, data.origin_url_3]
assert result.next_page_token is None
result = endpoint(tool_ids=[tool1["id"], tool2["id"]], ids_only=True)
assert sorted(result.results) == [
data.origin_url_1,
data.origin_url_2,
data.origin_url_3,
]
assert result.next_page_token is None
# test ids_only=False
assert endpoint(mappings=["gemspec"]) == PagedResult(
results=[
OriginIntrinsicMetadataRow(
id=data.origin_url_2,
metadata={
"@context": "foo",
"author": "Jane Doe",
},
mappings=["npm", "gemspec"],
tool=tool2,
from_directory=data.directory_id_2,
)
],
next_page_token=None,
)
def test_origin_intrinsic_metadata_stats(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
self._fill_origin_intrinsic_metadata(swh_indexer_storage_with_data)
result = storage.origin_intrinsic_metadata_stats()
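        # per _fill_origin_intrinsic_metadata: origin 1 -> npm, origin 2 ->
        # npm + gemspec, origin 3 -> pkg-info; origin 3's metadata contains
        # only "@context", hence 2 of the 3 origins are non-empty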
assert result == {
"per_mapping": {
"cff": 0,
"gemspec": 1,
"npm": 2,
"pkg-info": 1,
"codemeta": 0,
"maven": 0,
},
"total": 3,
"non_empty": 2,
        }


class TestIndexerStorageOriginExtrinsicMetadata:
def test_origin_extrinsic_metadata_add(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
# given
tool_id = data.tools["swh-metadata-detector"]["id"]
metadata = {
"version": None,
"name": None,
}
metadata_origin = OriginExtrinsicMetadataRow(
id=data.origin_url_1,
metadata=metadata,
indexer_configuration_id=tool_id,
mappings=["mapping1"],
from_remd_id=b"\x02" * 20,
)
# when
storage.origin_extrinsic_metadata_add([metadata_origin])
# then
actual_metadata = list(
storage.origin_extrinsic_metadata_get([data.origin_url_1, "no://where"])
)
expected_metadata = [
OriginExtrinsicMetadataRow(
id=data.origin_url_1,
metadata=metadata,
tool=data.tools["swh-metadata-detector"],
from_remd_id=b"\x02" * 20,
mappings=["mapping1"],
)
]
assert actual_metadata == expected_metadata
journal_objects = storage.journal_writer.journal.objects # type: ignore
actual_journal_metadata = [
obj
for (obj_type, obj) in journal_objects
if obj_type == "origin_extrinsic_metadata"
]
        assert sorted(actual_journal_metadata) == sorted(expected_metadata)
def test_origin_extrinsic_metadata_add_update_in_place_duplicate(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
# given
tool_id = data.tools["swh-metadata-detector"]["id"]
metadata_v1: Dict[str, Any] = {
"version": None,
"name": None,
}
metadata_origin_v1 = OriginExtrinsicMetadataRow(
id=data.origin_url_1,
metadata=metadata_v1.copy(),
indexer_configuration_id=tool_id,
mappings=[],
from_remd_id=b"\x02" * 20,
)
# given
storage.origin_extrinsic_metadata_add([metadata_origin_v1])
# when
actual_metadata = list(
storage.origin_extrinsic_metadata_get([data.origin_url_1])
)
# then
expected_metadata_v1 = [
OriginExtrinsicMetadataRow(
id=data.origin_url_1,
metadata=metadata_v1,
tool=data.tools["swh-metadata-detector"],
from_remd_id=b"\x02" * 20,
mappings=[],
)
]
assert actual_metadata == expected_metadata_v1
# given
metadata_v2 = metadata_v1.copy()
metadata_v2.update(
{
"name": "test_update_duplicated_metadata",
"author": "MG",
}
)
metadata_origin_v2 = OriginExtrinsicMetadataRow(
id=data.origin_url_1,
metadata=metadata_v2.copy(),
indexer_configuration_id=tool_id,
mappings=["github"],
from_remd_id=b"\x02" * 20,
)
storage.origin_extrinsic_metadata_add([metadata_origin_v2])
actual_metadata = list(
storage.origin_extrinsic_metadata_get([data.origin_url_1])
)
expected_metadata_v2 = [
OriginExtrinsicMetadataRow(
id=data.origin_url_1,
metadata=metadata_v2,
tool=data.tools["swh-metadata-detector"],
from_remd_id=b"\x02" * 20,
mappings=["github"],
)
]
# metadata did change as the v2 was used to overwrite v1
assert actual_metadata == expected_metadata_v2
def test_origin_extrinsic_metadata_add__deadlock(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
# given
tool_id = data.tools["swh-metadata-detector"]["id"]
origins = ["file:///tmp/origin{:02d}".format(i) for i in range(100)]
example_data1: Dict[str, Any] = {
"metadata": {
"version": None,
"name": None,
},
"mappings": [],
}
example_data2: Dict[str, Any] = {
"metadata": {
"version": "v1.1.1",
"name": "foo",
},
"mappings": [],
}
data_v1 = [
OriginExtrinsicMetadataRow(
id=origin,
from_remd_id=b"\x02" * 20,
indexer_configuration_id=tool_id,
**example_data1,
)
for origin in origins
]
data_v2 = [
OriginExtrinsicMetadataRow(
id=origin,
from_remd_id=b"\x02" * 20,
indexer_configuration_id=tool_id,
**example_data2,
)
for origin in origins
]
# Remove one item from each, so that both queries have to succeed for
# all items to be in the DB.
data_v2a = data_v2[1:]
data_v2b = list(reversed(data_v2[0:-1]))
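        # again, the reversed copy makes the two concurrent adds insert rows
        # in opposite orders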
# given
storage.origin_extrinsic_metadata_add(data_v1)
# when
actual_data = list(storage.origin_extrinsic_metadata_get(origins))
expected_data_v1 = [
OriginExtrinsicMetadataRow(
id=origin,
from_remd_id=b"\x02" * 20,
tool=data.tools["swh-metadata-detector"],
**example_data1,
)
for origin in origins
]
# then
assert actual_data == expected_data_v1
# given
def f1() -> None:
storage.origin_extrinsic_metadata_add(data_v2a)
def f2() -> None:
storage.origin_extrinsic_metadata_add(data_v2b)
t1 = threading.Thread(target=f1)
t2 = threading.Thread(target=f2)
t2.start()
t1.start()
t1.join()
t2.join()
actual_data = list(storage.origin_extrinsic_metadata_get(origins))
expected_data_v2 = [
OriginExtrinsicMetadataRow(
id=origin,
from_remd_id=b"\x02" * 20,
tool=data.tools["swh-metadata-detector"],
**example_data2,
)
for origin in origins
]
actual_data.sort(key=lambda item: item.id)
assert len(actual_data) == len(expected_data_v1) == len(expected_data_v2)
for (item, expected_item_v1, expected_item_v2) in zip(
actual_data, expected_data_v1, expected_data_v2
):
assert item in (expected_item_v1, expected_item_v2)
def test_origin_extrinsic_metadata_add__duplicate_twice(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
# given
tool_id = data.tools["swh-metadata-detector"]["id"]
metadata = {
"developmentStatus": None,
"name": None,
}
metadata_origin = OriginExtrinsicMetadataRow(
id=data.origin_url_1,
metadata=metadata,
indexer_configuration_id=tool_id,
mappings=["mapping1"],
from_remd_id=b"\x02" * 20,
)
# when
with pytest.raises(DuplicateId):
            storage.origin_extrinsic_metadata_add([metadata_origin, metadata_origin])


class TestIndexerStorageIndexerConfiguration:
def test_indexer_configuration_add(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
tool = {
"tool_name": "some-unknown-tool",
"tool_version": "some-version",
"tool_configuration": {"debian-package": "some-package"},
}
actual_tool = storage.indexer_configuration_get(tool)
assert actual_tool is None # does not exist
# add it
actual_tools = list(storage.indexer_configuration_add([tool]))
assert len(actual_tools) == 1
actual_tool = actual_tools[0]
assert actual_tool is not None # now it exists
new_id = actual_tool.pop("id")
assert actual_tool == tool
actual_tools2 = list(storage.indexer_configuration_add([tool]))
actual_tool2 = actual_tools2[0]
assert actual_tool2 is not None # now it exists
new_id2 = actual_tool2.pop("id")
assert new_id == new_id2
assert actual_tool == actual_tool2
def test_indexer_configuration_add_multiple(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
tool = {
"tool_name": "some-unknown-tool",
"tool_version": "some-version",
"tool_configuration": {"debian-package": "some-package"},
}
actual_tools = list(storage.indexer_configuration_add([tool]))
assert len(actual_tools) == 1
new_tools = [
tool,
{
"tool_name": "yet-another-tool",
"tool_version": "version",
"tool_configuration": {},
},
]
actual_tools = list(storage.indexer_configuration_add(new_tools))
assert len(actual_tools) == 2
# order not guaranteed, so we iterate over results to check
for tool in actual_tools:
_id = tool.pop("id")
assert _id is not None
assert tool in new_tools
def test_indexer_configuration_get_missing(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
tool = {
"tool_name": "unknown-tool",
"tool_version": "3.1.0rc2-31-ga2cbb8c",
"tool_configuration": {"command_line": "nomossa <filepath>"},
}
actual_tool = storage.indexer_configuration_get(tool)
assert actual_tool is None
def test_indexer_configuration_get(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
tool = {
"tool_name": "nomos",
"tool_version": "3.1.0rc2-31-ga2cbb8c",
"tool_configuration": {"command_line": "nomossa <filepath>"},
}
actual_tool = storage.indexer_configuration_get(tool)
assert actual_tool
expected_tool = tool.copy()
del actual_tool["id"]
assert expected_tool == actual_tool
def test_indexer_configuration_metadata_get_missing_context(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
tool = {
"tool_name": "swh-metadata-translator",
"tool_version": "0.0.1",
"tool_configuration": {"context": "unknown-context"},
}
actual_tool = storage.indexer_configuration_get(tool)
assert actual_tool is None
def test_indexer_configuration_metadata_get(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
) -> None:
storage, data = swh_indexer_storage_with_data
tool = {
"tool_name": "swh-metadata-translator",
"tool_version": "0.0.1",
"tool_configuration": {"type": "local", "context": "NpmMapping"},
}
storage.indexer_configuration_add([tool])
actual_tool = storage.indexer_configuration_get(tool)
assert actual_tool
expected_tool = tool.copy()
expected_tool["id"] = actual_tool["id"]
assert expected_tool == actual_tool
