Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/tests/test_tar.py
- This file was added.
# Copyright (C) 2019 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
from typing import Tuple | |||||
from swh.core.pytest_plugin import requests_mock_datadir_factory | |||||
from swh.loader.package.tar import TarLoader | |||||
from swh.loader.package.tests.common import check_snapshot | |||||
URL = 'https://deposit.softwareheritage.org/hello/2.10.orig.tar.gz' | |||||
PACKAGES = [ | |||||
{ | |||||
'version': '2.10', | |||||
'url': URL, | |||||
'date': '2014-10-19T16:52:35+02:00', | |||||
} | |||||
] | |||||
def integrity_to_hash(integrity_value: str) -> Tuple[str, str]: | |||||
hash_name, base64_value = integrity_value.split('-') | |||||
from base64 import b64decode | |||||
from binascii import hexlify | |||||
hash_hex = hexlify(b64decode(base64_value)).decode('utf-8') | |||||
return hash_name, hash_hex | |||||
def test_integrity_to_hash(): | |||||
hash_name, hash_hex = integrity_to_hash( | |||||
'sha256-MeBmE3qWJnbon2nRtlOC3pWn732RS4y5VvQepy4PUWs=') | |||||
assert hash_name == 'sha256' | |||||
assert hash_hex == '31e066137a962676e89f69d1b65382de95a7ef7d914b8cb956f41ea72e0f516b' # noqa | |||||
requests_mock_datadir_missing = requests_mock_datadir_factory(ignore_urls=[ | |||||
URL | |||||
]) | |||||
def test_tar_visit_with_no_artifact_found( | |||||
swh_config, requests_mock_datadir_missing): | |||||
loader = TarLoader(url=URL, packages=PACKAGES) | |||||
actual_load_status = loader.load() | |||||
assert actual_load_status['status'] == 'uneventful' | |||||
stats = loader.storage.stat_counters() | |||||
assert { | |||||
'content': 0, | |||||
'directory': 0, | |||||
'origin': 1, | |||||
'origin_visit': 1, | |||||
'person': 0, | |||||
'release': 0, | |||||
'revision': 0, | |||||
'skipped_content': 0, | |||||
'snapshot': 1, | |||||
} == stats | |||||
origin_visit = next(loader.storage.origin_visit_get(URL)) | |||||
assert origin_visit['status'] == 'partial' | |||||
def test_tar_visit_with_artifact_found(swh_config, requests_mock_datadir): | |||||
loader = TarLoader(url=URL, packages=PACKAGES) | |||||
actual_load_status = loader.load() | |||||
assert actual_load_status['status'] == 'eventful' | |||||
stats = loader.storage.stat_counters() | |||||
assert { | |||||
'content': 303, | |||||
'directory': 12, | |||||
'origin': 1, | |||||
'origin_visit': 1, | |||||
'person': 1, | |||||
'release': 0, | |||||
'revision': 1, | |||||
'skipped_content': 0, | |||||
'snapshot': 1, | |||||
} == stats | |||||
origin_visit = next(loader.storage.origin_visit_get(URL)) | |||||
assert origin_visit['status'] == 'full' | |||||
expected_snapshot = { | |||||
'id': 'e759f554b660f03ebaf2e5bf62c34e0fe0ee5748', | |||||
'branches': { | |||||
'HEAD': { | |||||
'target_type': 'alias', | |||||
'target': 'releases/2.10' | |||||
}, | |||||
'releases/2.10': { | |||||
'target_type': 'revision', | |||||
'target': '326260b671e595403f03b9e673af519e23011c52', | |||||
} | |||||
}, | |||||
} | |||||
check_snapshot(expected_snapshot, loader.storage) |