Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/tests/test_init.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import pytest | import pytest | ||||
import os | import os | ||||
import subprocess | import subprocess | ||||
from swh.storage import get_storage | |||||
from swh.loader.tests import prepare_repository_from_archive, assert_last_visit_matches | from swh.loader.tests import prepare_repository_from_archive, assert_last_visit_matches | ||||
from swh.model.model import OriginVisit, OriginVisitStatus | from swh.model.model import ( | ||||
OriginVisit, | |||||
OriginVisitStatus, | |||||
Snapshot, | |||||
SnapshotBranch, | |||||
TargetType, | |||||
) | |||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.loader.tests import ( | |||||
decode_target, | |||||
check_snapshot, | |||||
) | |||||
hash_hex = "43e45d56f88993aae6a0198013efa80716fd8920" | |||||
storage_config = {"cls": "pipeline", "steps": [{"cls": "memory",}]} | |||||
ORIGIN_VISIT = OriginVisit( | ORIGIN_VISIT = OriginVisit( | ||||
origin="some-url", | origin="some-url", | ||||
visit=1, | visit=1, | ||||
date=datetime.datetime.now(tz=datetime.timezone.utc), | date=datetime.datetime.now(tz=datetime.timezone.utc), | ||||
type="archive", | type="archive", | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 127 Lines • ▼ Show 20 Lines | def test_prepare_repository_from_archive_no_filename(datadir, tmp_path): | ||||
tmp_path = str(tmp_path) | tmp_path = str(tmp_path) | ||||
expected_uncompressed_archive_path = os.path.join(tmp_path, archive_name) | expected_uncompressed_archive_path = os.path.join(tmp_path, archive_name) | ||||
expected_repo_url = os.path.join(tmp_path, f"{archive_name}.tar.gz") | expected_repo_url = os.path.join(tmp_path, f"{archive_name}.tar.gz") | ||||
assert repo_url == f"file://{expected_repo_url}" | assert repo_url == f"file://{expected_repo_url}" | ||||
# passing along the filename does not influence the on-disk extraction | # passing along the filename does not influence the on-disk extraction | ||||
# just the repo-url computation | # just the repo-url computation | ||||
assert os.path.exists(expected_uncompressed_archive_path) | assert os.path.exists(expected_uncompressed_archive_path) | ||||
def test_decode_target_edge(): | |||||
assert not decode_target(None) | |||||
def test_decode_target(): | |||||
actual_alias_decode_target = decode_target( | |||||
{"target_type": "alias", "target": b"something",} | |||||
) | |||||
assert actual_alias_decode_target == { | |||||
"target_type": "alias", | |||||
"target": "something", | |||||
} | |||||
actual_decode_target = decode_target( | |||||
{"target_type": "revision", "target": hash_to_bytes(hash_hex),} | |||||
) | |||||
assert actual_decode_target == { | |||||
"target_type": "revision", | |||||
"target": hash_hex, | |||||
} | |||||
def test_check_snapshot(): | |||||
storage = get_storage(**storage_config) | |||||
snap_id = "2498dbf535f882bc7f9a18fb16c9ad27fda7bab7" | |||||
snapshot = Snapshot( | |||||
id=hash_to_bytes(snap_id), | |||||
branches={ | |||||
b"master": SnapshotBranch( | |||||
target=hash_to_bytes(hash_hex), target_type=TargetType.REVISION, | |||||
), | |||||
}, | |||||
) | |||||
s = storage.snapshot_add([snapshot]) | |||||
assert s == { | |||||
"snapshot:add": 1, | |||||
} | |||||
expected_snapshot = { | |||||
"id": snap_id, | |||||
"branches": {"master": {"target": hash_hex, "target_type": "revision",}}, | |||||
} | |||||
check_snapshot(expected_snapshot, storage) | |||||
def test_check_snapshot_failure(): | |||||
storage = get_storage(**storage_config) | |||||
snapshot = Snapshot( | |||||
id=hash_to_bytes("2498dbf535f882bc7f9a18fb16c9ad27fda7bab7"), | |||||
branches={ | |||||
b"master": SnapshotBranch( | |||||
target=hash_to_bytes(hash_hex), target_type=TargetType.REVISION, | |||||
), | |||||
}, | |||||
) | |||||
s = storage.snapshot_add([snapshot]) | |||||
assert s == { | |||||
"snapshot:add": 1, | |||||
} | |||||
unexpected_snapshot = { | |||||
"id": "2498dbf535f882bc7f9a18fb16c9ad27fda7bab7", | |||||
"branches": { | |||||
"master": {"target": hash_hex, "target_type": "release",} # wrong value | |||||
}, | |||||
} | |||||
with pytest.raises(AssertionError): | |||||
check_snapshot(unexpected_snapshot, storage) |