Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/deposit/tests/test_deposit.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import re | import re | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
Show All 11 Lines | def test_deposit_init_ok(swh_config, swh_loader_config): | ||||
deposit_id = 999 | deposit_id = 999 | ||||
loader = DepositLoader(url, deposit_id) # Something that does not exist | loader = DepositLoader(url, deposit_id) # Something that does not exist | ||||
assert loader.url == url | assert loader.url == url | ||||
assert loader.client is not None | assert loader.client is not None | ||||
assert loader.client.base_url == swh_loader_config['deposit']['url'] | assert loader.client.base_url == swh_loader_config['deposit']['url'] | ||||
def test_deposit_loading_failure_to_fetch_metadata(swh_config): | def test_deposit_loading_unknown_deposit( | ||||
"""Error during fetching artifact ends us with failed/partial visit | swh_config, requests_mock_datadir): | ||||
"""Loading an unknown deposit should fail | |||||
no origin, no visit, no snapshot | |||||
""" | """ | ||||
# private api url form: 'https://deposit.s.o/1/private/hal/666/raw/' | # private api url form: 'https://deposit.s.o/1/private/hal/666/raw/' | ||||
url = 'some-url' | url = 'some-url' | ||||
unknown_deposit_id = 666 | unknown_deposit_id = 667 | ||||
loader = DepositLoader(url, unknown_deposit_id) # does not exist | loader = DepositLoader(url, unknown_deposit_id) # does not exist | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
assert actual_load_status == {'status': 'failed'} | assert actual_load_status == {'status': 'failed'} | ||||
stats = get_stats(loader.storage) | stats = get_stats(loader.storage) | ||||
assert { | assert { | ||||
'content': 0, | 'content': 0, | ||||
'directory': 0, | 'directory': 0, | ||||
'origin': 1, | 'origin': 0, | ||||
'origin_visit': 1, | 'origin_visit': 0, | ||||
'person': 0, | 'person': 0, | ||||
'release': 0, | 'release': 0, | ||||
'revision': 0, | 'revision': 0, | ||||
'skipped_content': 0, | 'skipped_content': 0, | ||||
'snapshot': 0, | 'snapshot': 0, | ||||
} == stats | } == stats | ||||
ardumont: No more noise in the storage! \m/ | |||||
origin_visit = next(loader.storage.origin_visit_get(url)) | |||||
assert origin_visit['status'] == 'partial' | |||||
assert origin_visit['type'] == 'deposit' | |||||
requests_mock_datadir_missing_one = requests_mock_datadir_factory(ignore_urls=[ | requests_mock_datadir_missing_one = requests_mock_datadir_factory(ignore_urls=[ | ||||
'https://deposit.softwareheritage.org/1/private/666/raw/', | 'https://deposit.softwareheritage.org/1/private/666/raw/', | ||||
]) | ]) | ||||
def test_deposit_loading_failure_to_retrieve_1_artifact( | def test_deposit_loading_failure_to_retrieve_1_artifact( | ||||
swh_config, requests_mock_datadir_missing_one): | swh_config, requests_mock_datadir_missing_one): | ||||
▲ Show 20 Lines • Show All 141 Lines • Show Last 20 Lines |
No more noise in the storage! \m/