Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/tests/test_init.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import pytest | import pytest | ||||
import os | import os | ||||
import subprocess | import subprocess | ||||
from swh.loader.tests import prepare_repository_from_archive, assert_last_visit_matches | |||||
from swh.model.model import ( | from swh.model.model import ( | ||||
OriginVisit, | OriginVisit, | ||||
OriginVisitStatus, | OriginVisitStatus, | ||||
Snapshot, | Snapshot, | ||||
SnapshotBranch, | SnapshotBranch, | ||||
TargetType, | TargetType, | ||||
) | ) | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.loader.tests import ( | from swh.loader.tests import ( | ||||
decode_target, | assert_last_visit_matches, | ||||
encode_target, | |||||
check_snapshot, | check_snapshot, | ||||
prepare_repository_from_archive, | |||||
) | ) | ||||
hash_hex = "43e45d56f88993aae6a0198013efa80716fd8920" | hash_hex = "43e45d56f88993aae6a0198013efa80716fd8920" | ||||
ORIGIN_VISIT = OriginVisit( | ORIGIN_VISIT = OriginVisit( | ||||
origin="some-url", | origin="some-url", | ||||
▲ Show 20 Lines • Show All 134 Lines • ▼ Show 20 Lines | def test_prepare_repository_from_archive_no_filename(datadir, tmp_path): | ||||
expected_repo_url = os.path.join(tmp_path, f"{archive_name}.tar.gz") | expected_repo_url = os.path.join(tmp_path, f"{archive_name}.tar.gz") | ||||
assert repo_url == f"file://{expected_repo_url}" | assert repo_url == f"file://{expected_repo_url}" | ||||
# passing along the filename does not influence the on-disk extraction | # passing along the filename does not influence the on-disk extraction | ||||
# just the repo-url computation | # just the repo-url computation | ||||
assert os.path.exists(expected_uncompressed_archive_path) | assert os.path.exists(expected_uncompressed_archive_path) | ||||
def test_decode_target_edge(): | def test_encode_target(): | ||||
assert not decode_target(None) | assert encode_target(None) is None | ||||
for target_alias in ["something", b"something"]: | |||||
def test_decode_target(): | target = { | ||||
actual_alias_decode_target = decode_target( | |||||
{"target_type": "alias", "target": b"something",} | |||||
) | |||||
assert actual_alias_decode_target == { | |||||
"target_type": "alias", | "target_type": "alias", | ||||
"target": "something", | "target": target_alias, | ||||
} | |||||
actual_alias_encode_target = encode_target(target) | |||||
assert actual_alias_encode_target == { | |||||
"target_type": "alias", | |||||
"target": b"something", | |||||
} | } | ||||
actual_decode_target = decode_target( | for hash_ in [hash_hex, hash_to_bytes(hash_hex)]: | ||||
{"target_type": "revision", "target": hash_to_bytes(hash_hex),} | target = {"target_type": "revision", "target": hash_} | ||||
) | actual_encode_target = encode_target(target) | ||||
assert actual_encode_target == { | |||||
assert actual_decode_target == { | |||||
"target_type": "revision", | "target_type": "revision", | ||||
"target": hash_hex, | "target": hash_to_bytes(hash_hex), | ||||
} | } | ||||
def test_check_snapshot(swh_storage): | def test_check_snapshot(swh_storage): | ||||
snap_id = "2498dbf535f882bc7f9a18fb16c9ad27fda7bab7" | """Check snapshot should not raise when everything is fine""" | ||||
snapshot = Snapshot( | snapshot = Snapshot( | ||||
id=hash_to_bytes(snap_id), | id=hash_to_bytes("2498dbf535f882bc7f9a18fb16c9ad27fda7bab7"), | ||||
branches={ | branches={ | ||||
b"master": SnapshotBranch( | b"master": SnapshotBranch( | ||||
target=hash_to_bytes(hash_hex), target_type=TargetType.REVISION, | target=hash_to_bytes(hash_hex), target_type=TargetType.REVISION, | ||||
), | ), | ||||
}, | }, | ||||
) | ) | ||||
s = swh_storage.snapshot_add([snapshot]) | s = swh_storage.snapshot_add([snapshot]) | ||||
assert s == { | assert s == { | ||||
"snapshot:add": 1, | "snapshot:add": 1, | ||||
} | } | ||||
expected_snapshot = { | for snap in [snapshot, snapshot.to_dict()]: | ||||
"id": snap_id, | check_snapshot(snap, swh_storage) | ||||
"branches": {"master": {"target": hash_hex, "target_type": "revision",}}, | |||||
} | |||||
check_snapshot(expected_snapshot, swh_storage) | |||||
def test_check_snapshot_failure(swh_storage): | def test_check_snapshot_failure(swh_storage): | ||||
"""check_snapshot should raise if something goes wrong""" | |||||
snap_id_hex = "2498dbf535f882bc7f9a18fb16c9ad27fda7bab7" | |||||
snapshot = Snapshot( | snapshot = Snapshot( | ||||
id=hash_to_bytes("2498dbf535f882bc7f9a18fb16c9ad27fda7bab7"), | id=hash_to_bytes(snap_id_hex), | ||||
branches={ | branches={ | ||||
b"master": SnapshotBranch( | b"master": SnapshotBranch( | ||||
target=hash_to_bytes(hash_hex), target_type=TargetType.REVISION, | target=hash_to_bytes(hash_hex), target_type=TargetType.REVISION, | ||||
), | ), | ||||
}, | }, | ||||
) | ) | ||||
s = swh_storage.snapshot_add([snapshot]) | s = swh_storage.snapshot_add([snapshot]) | ||||
assert s == { | assert s == { | ||||
"snapshot:add": 1, | "snapshot:add": 1, | ||||
} | } | ||||
unexpected_snapshot = { | unexpected_snapshot = { | ||||
"id": "2498dbf535f882bc7f9a18fb16c9ad27fda7bab7", # id is correct | "id": "2498dbf535f882bc7f9a18fb16c9ad27fda7bab7", # id is correct | ||||
"branches": { | "branches": { | ||||
"master": {"target": hash_hex, "target_type": "release",} # wrong branch | "master": {"target": hash_hex, "target_type": "release",} # wrong branch | ||||
}, | }, | ||||
} | } | ||||
with pytest.raises(AssertionError, match="Differing items"): | # id is correct, the branch is wrong, that should raise nonetheless | ||||
for snap_id in [snap_id_hex, snapshot.id]: | |||||
with pytest.raises(AssertionError, match="Differing attributes"): | |||||
unexpected_snapshot["id"] = snap_id | |||||
check_snapshot(unexpected_snapshot, swh_storage) | check_snapshot(unexpected_snapshot, swh_storage) | ||||
# snapshot id which does not exist | # snapshot id which does not exist | ||||
unexpected_snapshot["id"] = "999666f535f882bc7f9a18fb16c9ad27fda7bab7" | wrong_snap_id_hex = "999666f535f882bc7f9a18fb16c9ad27fda7bab7" | ||||
for snap_id in [wrong_snap_id_hex, hash_to_bytes(wrong_snap_id_hex)]: | |||||
unexpected_snapshot["id"] = wrong_snap_id_hex | |||||
with pytest.raises(AssertionError, match="is not found"): | with pytest.raises(AssertionError, match="is not found"): | ||||
check_snapshot(unexpected_snapshot, swh_storage) | check_snapshot(unexpected_snapshot, swh_storage) | ||||
# not a Snapshot object, raise! | |||||
with pytest.raises(AssertionError, match="variable 'snapshot' must be a snapshot"): | |||||
check_snapshot(ORIGIN_VISIT, swh_storage) |