Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/tests/test_metadata.py
Show First 20 Lines • Show All 384 Lines • ▼ Show 20 Lines | def test_extrinsic_metadata_indexer_thirdparty_authority(self, mocker): | ||||
results = metadata_indexer.index(GITHUB_REMD.id, data=GITHUB_REMD) | results = metadata_indexer.index(GITHUB_REMD.id, data=GITHUB_REMD) | ||||
assert metadata_indexer.storage.method_calls == [ | assert metadata_indexer.storage.method_calls == [ | ||||
call.origin_get_by_sha1([b"\x01" * 20]) | call.origin_get_by_sha1([b"\x01" * 20]) | ||||
] | ] | ||||
assert results == [] | assert results == [] | ||||
def test_extrinsic_metadata_indexer_duplicate_origin(self, mocker): | def test_extrinsic_metadata_indexer_duplicate_origin(self, mocker): | ||||
"""Nominal case, calling the mapping and storing the result""" | """Two metadata objects with the same origin target""" | ||||
origin = "https://example.org/jdoe/myrepo" | origin = "https://example.org/jdoe/myrepo" | ||||
metadata_indexer = ExtrinsicMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) | metadata_indexer = ExtrinsicMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) | ||||
metadata_indexer.catch_exceptions = False | metadata_indexer.catch_exceptions = False | ||||
metadata_indexer.storage = mocker.patch.object(metadata_indexer, "storage") | metadata_indexer.storage = mocker.patch.object(metadata_indexer, "storage") | ||||
metadata_indexer.storage.origin_get_by_sha1.return_value = [{"url": origin}] | metadata_indexer.storage.origin_get_by_sha1.return_value = [{"url": origin}] | ||||
tool = metadata_indexer.idx_storage.indexer_configuration_get( | tool = metadata_indexer.idx_storage.indexer_configuration_get( | ||||
Show All 10 Lines | def test_extrinsic_metadata_indexer_duplicate_origin(self, mocker): | ||||
} | } | ||||
) == {"status": "eventful", "origin_extrinsic_metadata:add": 1} | ) == {"status": "eventful", "origin_extrinsic_metadata:add": 1} | ||||
results = list( | results = list( | ||||
metadata_indexer.idx_storage.origin_extrinsic_metadata_get([origin]) | metadata_indexer.idx_storage.origin_extrinsic_metadata_get([origin]) | ||||
) | ) | ||||
assert len(results) == 1, results | assert len(results) == 1, results | ||||
assert results[0].from_remd_id == b"\x00" * 20 | assert results[0].from_remd_id == b"\x00" * 20 | ||||
def test_extrinsic_directory_metadata_indexer_duplicate_origin(self, mocker): | |||||
"""Two metadata objects on directories, but with an origin context""" | |||||
origin = DEPOSIT_REMD.origin | |||||
metadata_indexer = ExtrinsicMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) | |||||
metadata_indexer.catch_exceptions = False | |||||
metadata_indexer.storage = mocker.patch.object(metadata_indexer, "storage") | |||||
metadata_indexer.storage.origin_get_by_sha1.return_value = [{"url": origin}] | |||||
tool = metadata_indexer.idx_storage.indexer_configuration_get( | |||||
{f"tool_{k}": v for (k, v) in TRANSLATOR_TOOL.items()} | |||||
) | |||||
assert tool is not None | |||||
assert metadata_indexer.process_journal_objects( | |||||
{ | |||||
"raw_extrinsic_metadata": [ | |||||
DEPOSIT_REMD.to_dict(), | |||||
{ | |||||
**DEPOSIT_REMD.to_dict(), | |||||
"id": b"\x00" * 20, | |||||
"target": "swh:1:dir:" + "01" * 20, | |||||
}, | |||||
] | |||||
} | |||||
) == {"status": "eventful", "origin_extrinsic_metadata:add": 1} | |||||
results = list( | |||||
metadata_indexer.idx_storage.origin_extrinsic_metadata_get([origin]) | |||||
) | |||||
assert len(results) == 1, results | |||||
assert results[0].from_remd_id == b"\x00" * 20 |