Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/tests/storage/test_storage.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import math | import math | ||||
import threading | import threading | ||||
from typing import Any, Dict, List, Tuple, Type, cast | from typing import Any, Dict, List, Tuple, Type | ||||
import attr | import attr | ||||
import pytest | import pytest | ||||
from swh.indexer.storage.exc import DuplicateId, IndexerStorageArgumentException | from swh.indexer.storage.exc import DuplicateId, IndexerStorageArgumentException | ||||
from swh.indexer.storage.interface import IndexerStorageInterface, PagedResult | from swh.indexer.storage.interface import IndexerStorageInterface, PagedResult | ||||
from swh.indexer.storage.model import ( | from swh.indexer.storage.model import ( | ||||
BaseRow, | BaseRow, | ||||
▲ Show 20 Lines • Show All 795 Lines • ▼ Show 20 Lines | example_data = [ | ||||
}, | }, | ||||
{ | { | ||||
"metadata": {"other": {}, "name": "test_metadata", "version": "0.0.1"}, | "metadata": {"other": {}, "name": "test_metadata", "version": "0.0.1"}, | ||||
"mappings": ["mapping2"], | "mappings": ["mapping2"], | ||||
}, | }, | ||||
] | ] | ||||
row_class = RevisionIntrinsicMetadataRow | row_class = RevisionIntrinsicMetadataRow | ||||
def test_revision_intrinsic_metadata_delete( | |||||
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any] | |||||
) -> None: | |||||
storage, data = swh_indexer_storage_with_data | |||||
etype = self.endpoint_type | |||||
tool = data.tools[self.tool_name] | |||||
query = [data.sha1_2, data.sha1_1] | |||||
data1 = RevisionIntrinsicMetadataRow( | |||||
id=data.sha1_2, | |||||
indexer_configuration_id=tool["id"], | |||||
**self.example_data[0], # type: ignore | |||||
) | |||||
# when | |||||
summary = endpoint(storage, etype, "add")([data1]) | |||||
assert summary == expected_summary(1, etype) | |||||
summary2 = endpoint(storage, etype, "delete")( | |||||
[{"id": data.sha1_2, "indexer_configuration_id": tool["id"],}] | |||||
) | |||||
assert summary2 == expected_summary(1, etype, "del") | |||||
# then | |||||
actual_data = list(endpoint(storage, etype, "get")(query)) | |||||
# then | |||||
assert not actual_data | |||||
def test_revision_intrinsic_metadata_delete_nonexisting( | |||||
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any] | |||||
) -> None: | |||||
storage, data = swh_indexer_storage_with_data | |||||
etype = self.endpoint_type | |||||
tool = data.tools[self.tool_name] | |||||
endpoint(storage, etype, "delete")( | |||||
[{"id": data.sha1_2, "indexer_configuration_id": tool["id"],}] | |||||
) | |||||
class TestIndexerStorageContentFossologyLicense: | class TestIndexerStorageContentFossologyLicense: | ||||
endpoint_type = "content_fossology_license" | endpoint_type = "content_fossology_license" | ||||
tool_name = "nomos" | tool_name = "nomos" | ||||
row_class = ContentLicenseRow | row_class = ContentLicenseRow | ||||
def test_content_fossology_license_add__new_license_added( | def test_content_fossology_license_add__new_license_added( | ||||
▲ Show 20 Lines • Show All 260 Lines • ▼ Show 20 Lines | ) -> None: | ||||
tool=data.tools["swh-metadata-detector"], | tool=data.tools["swh-metadata-detector"], | ||||
from_revision=data.revision_id_2, | from_revision=data.revision_id_2, | ||||
mappings=["mapping1"], | mappings=["mapping1"], | ||||
) | ) | ||||
] | ] | ||||
assert actual_metadata == expected_metadata | assert actual_metadata == expected_metadata | ||||
def test_origin_intrinsic_metadata_delete( | |||||
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any] | |||||
) -> None: | |||||
storage, data = swh_indexer_storage_with_data | |||||
# given | |||||
tool_id = data.tools["swh-metadata-detector"]["id"] | |||||
metadata = { | |||||
"version": None, | |||||
"name": None, | |||||
} | |||||
metadata_rev = RevisionIntrinsicMetadataRow( | |||||
id=data.revision_id_2, | |||||
indexer_configuration_id=tool_id, | |||||
metadata=metadata, | |||||
mappings=["mapping1"], | |||||
) | |||||
metadata_origin = OriginIntrinsicMetadataRow( | |||||
id=data.origin_url_1, | |||||
metadata=metadata, | |||||
indexer_configuration_id=tool_id, | |||||
mappings=["mapping1"], | |||||
from_revision=data.revision_id_2, | |||||
) | |||||
metadata_origin2 = attr.evolve(metadata_origin, id=data.origin_url_2) | |||||
# when | |||||
storage.revision_intrinsic_metadata_add([metadata_rev]) | |||||
storage.origin_intrinsic_metadata_add([metadata_origin, metadata_origin2]) | |||||
storage.origin_intrinsic_metadata_delete( | |||||
[{"id": data.origin_url_1, "indexer_configuration_id": tool_id}] | |||||
) | |||||
# then | |||||
actual_metadata = list( | |||||
storage.origin_intrinsic_metadata_get( | |||||
[data.origin_url_1, data.origin_url_2, "no://where"] | |||||
) | |||||
) | |||||
assert [ | |||||
attr.evolve(m, indexer_configuration_id=cast(Dict, m.tool)["id"], tool=None) | |||||
for m in actual_metadata | |||||
] == [metadata_origin2] | |||||
def test_origin_intrinsic_metadata_delete_nonexisting( | |||||
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any] | |||||
) -> None: | |||||
storage, data = swh_indexer_storage_with_data | |||||
tool_id = data.tools["swh-metadata-detector"]["id"] | |||||
storage.origin_intrinsic_metadata_delete( | |||||
[{"id": data.origin_url_1, "indexer_configuration_id": tool_id}] | |||||
) | |||||
def test_origin_intrinsic_metadata_add_drop_duplicate( | def test_origin_intrinsic_metadata_add_drop_duplicate( | ||||
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any] | self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any] | ||||
) -> None: | ) -> None: | ||||
storage, data = swh_indexer_storage_with_data | storage, data = swh_indexer_storage_with_data | ||||
# given | # given | ||||
tool_id = data.tools["swh-metadata-detector"]["id"] | tool_id = data.tools["swh-metadata-detector"]["id"] | ||||
metadata_v1: Dict[str, Any] = { | metadata_v1: Dict[str, Any] = { | ||||
▲ Show 20 Lines • Show All 695 Lines • Show Last 20 Lines |