Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/tests/test_loader_metadata.py
Show First 20 Lines • Show All 118 Lines • ▼ Show 20 Lines | def test_load_metadata(swh_config, caplog): | ||||
assert load_status == { | assert load_status == { | ||||
"status": "eventful", | "status": "eventful", | ||||
"snapshot_id": FULL_SNAPSHOT_ID, | "snapshot_id": FULL_SNAPSHOT_ID, | ||||
} | } | ||||
result = storage.raw_extrinsic_metadata_get( | result = storage.raw_extrinsic_metadata_get( | ||||
MetadataTargetType.REVISION, REVISION_SWHID, AUTHORITY, | MetadataTargetType.REVISION, REVISION_SWHID, AUTHORITY, | ||||
) | ) | ||||
assert result["next_page_token"] is None | assert result.next_page_token is None | ||||
assert result["results"] == REVISION_METADATA | assert result.results == REVISION_METADATA | ||||
result = storage.raw_extrinsic_metadata_get( | result = storage.raw_extrinsic_metadata_get( | ||||
MetadataTargetType.ORIGIN, ORIGIN_URL, AUTHORITY, | MetadataTargetType.ORIGIN, ORIGIN_URL, AUTHORITY, | ||||
) | ) | ||||
assert result["next_page_token"] is None | assert result.next_page_token is None | ||||
assert result["results"] == ORIGIN_METADATA | assert result.results == ORIGIN_METADATA | ||||
assert caplog.text == "" | assert caplog.text == "" | ||||
def test_existing_authority(swh_config, caplog): | def test_existing_authority(swh_config, caplog): | ||||
storage = get_storage("memory") | storage = get_storage("memory") | ||||
loader = MetadataTestLoader(ORIGIN_URL) | loader = MetadataTestLoader(ORIGIN_URL) | ||||
loader.storage = storage | loader.storage = storage | ||||
loader.config["create_authorities"] = False | loader.config["create_authorities"] = False | ||||
storage.metadata_authority_add([attr.evolve(AUTHORITY, metadata={})]) | storage.metadata_authority_add([attr.evolve(AUTHORITY, metadata={})]) | ||||
load_status = loader.load() | load_status = loader.load() | ||||
assert load_status == { | assert load_status == { | ||||
"status": "eventful", | "status": "eventful", | ||||
"snapshot_id": FULL_SNAPSHOT_ID, | "snapshot_id": FULL_SNAPSHOT_ID, | ||||
} | } | ||||
result = storage.raw_extrinsic_metadata_get( | result = storage.raw_extrinsic_metadata_get( | ||||
MetadataTargetType.REVISION, REVISION_SWHID, AUTHORITY, | MetadataTargetType.REVISION, REVISION_SWHID, AUTHORITY, | ||||
) | ) | ||||
assert result["next_page_token"] is None | assert result.next_page_token is None | ||||
assert result["results"] == REVISION_METADATA | assert result.results == REVISION_METADATA | ||||
assert caplog.text == "" | assert caplog.text == "" | ||||
def test_existing_fetcher(swh_config, caplog): | def test_existing_fetcher(swh_config, caplog): | ||||
storage = get_storage("memory") | storage = get_storage("memory") | ||||
loader = MetadataTestLoader(ORIGIN_URL) | loader = MetadataTestLoader(ORIGIN_URL) | ||||
loader.storage = storage | loader.storage = storage | ||||
loader.config["create_fetchers"] = False | loader.config["create_fetchers"] = False | ||||
storage.metadata_fetcher_add([attr.evolve(FETCHER, metadata={})]) | storage.metadata_fetcher_add([attr.evolve(FETCHER, metadata={})]) | ||||
load_status = loader.load() | load_status = loader.load() | ||||
assert load_status == { | assert load_status == { | ||||
"status": "eventful", | "status": "eventful", | ||||
"snapshot_id": FULL_SNAPSHOT_ID, | "snapshot_id": FULL_SNAPSHOT_ID, | ||||
} | } | ||||
result = storage.raw_extrinsic_metadata_get( | result = storage.raw_extrinsic_metadata_get( | ||||
MetadataTargetType.REVISION, REVISION_SWHID, AUTHORITY, | MetadataTargetType.REVISION, REVISION_SWHID, AUTHORITY, | ||||
) | ) | ||||
assert result["next_page_token"] is None | assert result.next_page_token is None | ||||
assert result["results"] == REVISION_METADATA | assert result.results == REVISION_METADATA | ||||
assert caplog.text == "" | assert caplog.text == "" |