Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/tests/test_common.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import pytest | import pytest | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.loader.package.tests.common import ( | from swh.loader.package.tests.common import ( | ||||
decode_target, check_snapshot, check_metadata, check_metadata_paths | decode_target, check_snapshot, check_metadata, check_metadata_paths | ||||
) | ) | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
hash_hex = '43e45d56f88993aae6a0198013efa80716fd8920' | hash_hex = '43e45d56f88993aae6a0198013efa80716fd8920' | ||||
storage_config = { | |||||
'cls': 'pipeline', | |||||
'steps': [ | |||||
{ | |||||
'cls': 'validate', | |||||
}, | |||||
{ | |||||
'cls': 'memory', | |||||
} | |||||
] | |||||
} | |||||
def test_decode_target_edge(): | def test_decode_target_edge(): | ||||
assert not decode_target(None) | assert not decode_target(None) | ||||
def test_decode_target(): | def test_decode_target(): | ||||
actual_alias_decode_target = decode_target({ | actual_alias_decode_target = decode_target({ | ||||
'target_type': 'alias', | 'target_type': 'alias', | ||||
'target': b'something', | 'target': b'something', | ||||
Show All 11 Lines | def test_decode_target(): | ||||
assert actual_decode_target == { | assert actual_decode_target == { | ||||
'target_type': 'revision', | 'target_type': 'revision', | ||||
'target': hash_hex, | 'target': hash_hex, | ||||
} | } | ||||
def test_check_snapshot(): | def test_check_snapshot(): | ||||
storage = get_storage(cls='memory') | storage = get_storage(**storage_config) | ||||
snap_id = '2498dbf535f882bc7f9a18fb16c9ad27fda7bab7' | snap_id = '2498dbf535f882bc7f9a18fb16c9ad27fda7bab7' | ||||
snapshot = { | snapshot = { | ||||
'id': hash_to_bytes(snap_id), | 'id': hash_to_bytes(snap_id), | ||||
'branches': { | 'branches': { | ||||
b'master': { | b'master': { | ||||
'target': hash_to_bytes(hash_hex), | 'target': hash_to_bytes(hash_hex), | ||||
'target_type': 'revision', | 'target_type': 'revision', | ||||
Show All 14 Lines | expected_snapshot = { | ||||
'target_type': 'revision', | 'target_type': 'revision', | ||||
} | } | ||||
} | } | ||||
} | } | ||||
check_snapshot(expected_snapshot, storage) | check_snapshot(expected_snapshot, storage) | ||||
def test_check_snapshot_failure(): | def test_check_snapshot_failure(): | ||||
storage = get_storage(cls='memory') | storage = get_storage(**storage_config) | ||||
snapshot = { | snapshot = { | ||||
'id': hash_to_bytes('2498dbf535f882bc7f9a18fb16c9ad27fda7bab7'), | 'id': hash_to_bytes('2498dbf535f882bc7f9a18fb16c9ad27fda7bab7'), | ||||
'branches': { | 'branches': { | ||||
b'master': { | b'master': { | ||||
'target': hash_to_bytes(hash_hex), | 'target': hash_to_bytes(hash_hex), | ||||
'target_type': 'revision', | 'target_type': 'revision', | ||||
}, | }, | ||||
▲ Show 20 Lines • Show All 92 Lines • Show Last 20 Lines |