diff --git a/swh/loader/tests/__init__.py b/swh/loader/tests/__init__.py index 2ddbbe4..d88648e 100644 --- a/swh/loader/tests/__init__.py +++ b/swh/loader/tests/__init__.py @@ -1,148 +1,167 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import subprocess from pathlib import PosixPath -from typing import Dict, Optional, Union +from typing import Any, Dict, Optional, Union -from swh.model.model import OriginVisitStatus -from swh.model.hashutil import hash_to_bytes, hash_to_hex +from swh.model.model import OriginVisitStatus, Snapshot +from swh.model.hashutil import hash_to_bytes +from swh.storage.interface import StorageInterface from swh.storage.algos.origin import origin_get_latest_visit_status def assert_last_visit_matches( storage, url: str, status: str, type: Optional[str] = None, snapshot: Optional[bytes] = None, ) -> OriginVisitStatus: """This retrieves the last visit and visit_status which are expected to exist. This also checks that the {visit|visit_status} have their respective properties correctly set. This returns the last visit_status for that given origin. Args: url: Origin url status: Check that the visit status has the given status type: Check that the returned visit has the given type snapshot: Check that the visit status points to the given snapshot Raises: AssertionError in case visit or visit status is not found, or any of the type, status and snapshot mismatch Returns: the visit status for further check during the remaining part of the test. 
""" visit_and_status = origin_get_latest_visit_status(storage, url) assert visit_and_status is not None, f"Origin {url} has no visits" visit, visit_status = visit_and_status if type: assert visit.type == type, f"Visit has type {visit.type} instead of {type}" assert ( visit_status.status == status ), f"Visit_status has status {visit_status.status} instead of {status}" if snapshot is not None: assert visit_status.snapshot is not None assert visit_status.snapshot == snapshot, ( f"Visit_status points to snapshot {visit_status.snapshot.hex()} " f"instead of {snapshot.hex()}" ) return visit_status def prepare_repository_from_archive( archive_path: str, filename: Optional[str] = None, tmp_path: Union[PosixPath, str] = "/tmp", ) -> str: """Given an existing archive_path, uncompress it. Returns a file repo url which can be used as origin url. This does not deal with the case where the archive passed along does not exist. """ if not isinstance(tmp_path, str): tmp_path = str(tmp_path) # uncompress folder/repositories/dump for the loader to ingest subprocess.check_output(["tar", "xf", archive_path, "-C", tmp_path]) # build the origin url (or some derivative form) _fname = filename if filename else os.path.basename(archive_path) repo_url = f"file://{tmp_path}/{_fname}" return repo_url -def decode_target(target): +def encode_target(target: Dict) -> Dict: """Test helper to ease readability in test """ if not target: return target target_type = target["target_type"] - - if target_type == "alias": - decoded_target = target["target"].decode("utf-8") + target_data = target["target"] + if target_type == "alias" and isinstance(target_data, str): + encoded_target = target_data.encode("utf-8") + elif isinstance(target_data, str): + encoded_target = hash_to_bytes(target_data) else: - decoded_target = hash_to_hex(target["target"]) + encoded_target = target_data - return {"target": decoded_target, "target_type": target_type} + return {"target": encoded_target, "target_type": target_type} 
-def check_snapshot(expected_snapshot, storage): +def check_snapshot( + snapshot: Union[Dict[str, Any], Snapshot], storage: StorageInterface +): """Check for snapshot match. - Provide the hashes as hexadecimal, the conversion is done - within the method. + The hashes can be either hex or bytes; the necessary conversion will happen prior + to the check. Args: - expected_snapshot (dict): full snapshot with hex ids - storage (Storage): expected storage + snapshot: full snapshot to check for existence and consistency + storage: storage to look information up in Returns: the snapshot stored in the storage for further test assertion if any is needed. """ - expected_snapshot_id = expected_snapshot["id"] - expected_branches = expected_snapshot["branches"] - snap = storage.snapshot_get(hash_to_bytes(expected_snapshot_id)) + if isinstance(snapshot, Snapshot): + expected_snapshot = snapshot + elif isinstance(snapshot, dict): + # dict must be snapshot compliant + snapshot_dict = {"id": hash_to_bytes(snapshot["id"])} + branches = {} + for branch, target in snapshot["branches"].items(): + if isinstance(branch, str): + branch = branch.encode("utf-8") + branches[branch] = encode_target(target) + snapshot_dict["branches"] = branches + expected_snapshot = Snapshot.from_dict(snapshot_dict) + else: + raise AssertionError(f"variable 'snapshot' must be a snapshot: {snapshot!r}") + + snap = storage.snapshot_get(expected_snapshot.id) if snap is None: - raise AssertionError(f"Snapshot {expected_snapshot_id} is not found") - - branches = { - branch.decode("utf-8"): decode_target(target) - for branch, target in snap["branches"].items() - } - assert expected_branches == branches - return snap + raise AssertionError(f"Snapshot {expected_snapshot.id.hex()} is not found") + + assert snap["next_branch"] is None # we don't deal with large snapshots in tests + snap.pop("next_branch") + actual_snap = Snapshot.from_dict(snap) + + assert expected_snapshot == actual_snap + + return snap # for retro
compat, returned the dict, remove when clients are migrated def get_stats(storage) -> Dict: """Adaptation utils to unify the stats counters across storage implementation. """ storage.refresh_stat_counters() stats = storage.stat_counters() keys = [ "content", "directory", "origin", "origin_visit", "person", "release", "revision", "skipped_content", "snapshot", ] return {k: stats.get(k) for k in keys} diff --git a/swh/loader/tests/test_init.py b/swh/loader/tests/test_init.py index ccc7d9e..cbd6693 100644 --- a/swh/loader/tests/test_init.py +++ b/swh/loader/tests/test_init.py @@ -1,250 +1,258 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import pytest import os import subprocess -from swh.loader.tests import prepare_repository_from_archive, assert_last_visit_matches from swh.model.model import ( OriginVisit, OriginVisitStatus, Snapshot, SnapshotBranch, TargetType, ) from swh.model.hashutil import hash_to_bytes from swh.loader.tests import ( - decode_target, + assert_last_visit_matches, + encode_target, check_snapshot, + prepare_repository_from_archive, ) hash_hex = "43e45d56f88993aae6a0198013efa80716fd8920" ORIGIN_VISIT = OriginVisit( origin="some-url", visit=1, date=datetime.datetime.now(tz=datetime.timezone.utc), type="archive", ) ORIGIN_VISIT_STATUS = OriginVisitStatus( origin="some-url", visit=1, date=datetime.datetime.now(tz=datetime.timezone.utc), status="full", snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), metadata=None, ) @pytest.fixture def mock_storage(mocker): mock_storage = mocker.patch("swh.loader.tests.origin_get_latest_visit_status") mock_storage.return_value = ORIGIN_VISIT, ORIGIN_VISIT_STATUS return mock_storage def test_assert_last_visit_matches_raise(mock_storage, mocker): """Not finding origin visit_and_statu 
should raise """ # overwrite so we raise because we do not find the right visit mock_storage.return_value = None with pytest.raises(AssertionError, match="Origin url has no visits"): assert_last_visit_matches(mock_storage, "url", status="full") assert mock_storage.called is True def test_assert_last_visit_matches_wrong_status(mock_storage, mocker): """Wrong visit detected should raise AssertionError """ expected_status = "partial" assert ORIGIN_VISIT_STATUS.status != expected_status with pytest.raises(AssertionError, match="Visit_status has status"): assert_last_visit_matches(mock_storage, "url", status=expected_status) assert mock_storage.called is True def test_assert_last_visit_matches_wrong_type(mock_storage, mocker): """Wrong visit detected should raise AssertionError """ expected_type = "git" assert ORIGIN_VISIT.type != expected_type with pytest.raises(AssertionError, match="Visit has type"): assert_last_visit_matches( mock_storage, "url", status=ORIGIN_VISIT_STATUS.status, type=expected_type, # mismatched type will raise ) assert mock_storage.called is True def test_assert_last_visit_matches_wrong_snapshot(mock_storage, mocker): """Wrong visit detected should raise AssertionError """ expected_snapshot_id = hash_to_bytes("e92cc0710eb6cf9efd5b920a8453e1e07157b6cd") assert ORIGIN_VISIT_STATUS.snapshot != expected_snapshot_id with pytest.raises(AssertionError, match="Visit_status points to snapshot"): assert_last_visit_matches( mock_storage, "url", status=ORIGIN_VISIT_STATUS.status, snapshot=expected_snapshot_id, # mismatched snapshot will raise ) assert mock_storage.called is True def test_assert_last_visit_matches(mock_storage, mocker): """Correct visit detected should return the visit_status """ visit_type = ORIGIN_VISIT.type visit_status = ORIGIN_VISIT_STATUS.status visit_snapshot = ORIGIN_VISIT_STATUS.snapshot actual_visit_status = assert_last_visit_matches( mock_storage, "url", type=visit_type, status=visit_status, snapshot=visit_snapshot, ) assert 
actual_visit_status == ORIGIN_VISIT_STATUS assert mock_storage.called is True def test_prepare_repository_from_archive_failure(): # does not deal with inexistent archive so raise assert os.path.exists("unknown-archive") is False with pytest.raises(subprocess.CalledProcessError, match="exit status 2"): prepare_repository_from_archive("unknown-archive") def test_prepare_repository_from_archive(datadir, tmp_path): archive_name = "0805nexter-1.1.0" archive_path = os.path.join(str(datadir), f"{archive_name}.tar.gz") assert os.path.exists(archive_path) is True tmp_path = str(tmp_path) # deals with path string repo_url = prepare_repository_from_archive( archive_path, filename=archive_name, tmp_path=tmp_path ) expected_uncompressed_archive_path = os.path.join(tmp_path, archive_name) assert repo_url == f"file://{expected_uncompressed_archive_path}" assert os.path.exists(expected_uncompressed_archive_path) def test_prepare_repository_from_archive_no_filename(datadir, tmp_path): archive_name = "0805nexter-1.1.0" archive_path = os.path.join(str(datadir), f"{archive_name}.tar.gz") assert os.path.exists(archive_path) is True # deals with path as posix path (for tmp_path) repo_url = prepare_repository_from_archive(archive_path, tmp_path=tmp_path) tmp_path = str(tmp_path) expected_uncompressed_archive_path = os.path.join(tmp_path, archive_name) expected_repo_url = os.path.join(tmp_path, f"{archive_name}.tar.gz") assert repo_url == f"file://{expected_repo_url}" # passing along the filename does not influence the on-disk extraction # just the repo-url computation assert os.path.exists(expected_uncompressed_archive_path) -def test_decode_target_edge(): - assert not decode_target(None) +def test_encode_target(): + assert encode_target(None) is None + for target_alias in ["something", b"something"]: + target = { + "target_type": "alias", + "target": target_alias, + } + actual_alias_encode_target = encode_target(target) + assert actual_alias_encode_target == { + "target_type": "alias", 
+ "target": b"something", + } -def test_decode_target(): - actual_alias_decode_target = decode_target( - {"target_type": "alias", "target": b"something",} - ) - - assert actual_alias_decode_target == { - "target_type": "alias", - "target": "something", - } - - actual_decode_target = decode_target( - {"target_type": "revision", "target": hash_to_bytes(hash_hex),} - ) - - assert actual_decode_target == { - "target_type": "revision", - "target": hash_hex, - } + for hash_ in [hash_hex, hash_to_bytes(hash_hex)]: + target = {"target_type": "revision", "target": hash_} + actual_encode_target = encode_target(target) + assert actual_encode_target == { + "target_type": "revision", + "target": hash_to_bytes(hash_hex), + } def test_check_snapshot(swh_storage): - snap_id = "2498dbf535f882bc7f9a18fb16c9ad27fda7bab7" + """Check snapshot should not raise when everything is fine""" snapshot = Snapshot( - id=hash_to_bytes(snap_id), + id=hash_to_bytes("2498dbf535f882bc7f9a18fb16c9ad27fda7bab7"), branches={ b"master": SnapshotBranch( target=hash_to_bytes(hash_hex), target_type=TargetType.REVISION, ), }, ) s = swh_storage.snapshot_add([snapshot]) assert s == { "snapshot:add": 1, } - expected_snapshot = { - "id": snap_id, - "branches": {"master": {"target": hash_hex, "target_type": "revision",}}, - } - check_snapshot(expected_snapshot, swh_storage) + for snap in [snapshot, snapshot.to_dict()]: + check_snapshot(snap, swh_storage) def test_check_snapshot_failure(swh_storage): + """check_snapshot should raise if something goes wrong""" + snap_id_hex = "2498dbf535f882bc7f9a18fb16c9ad27fda7bab7" snapshot = Snapshot( - id=hash_to_bytes("2498dbf535f882bc7f9a18fb16c9ad27fda7bab7"), + id=hash_to_bytes(snap_id_hex), branches={ b"master": SnapshotBranch( target=hash_to_bytes(hash_hex), target_type=TargetType.REVISION, ), }, ) s = swh_storage.snapshot_add([snapshot]) assert s == { "snapshot:add": 1, } unexpected_snapshot = { "id": "2498dbf535f882bc7f9a18fb16c9ad27fda7bab7", # id is correct 
"branches": { "master": {"target": hash_hex, "target_type": "release",} # wrong branch }, } - with pytest.raises(AssertionError, match="Differing items"): - check_snapshot(unexpected_snapshot, swh_storage) + # id is correct, the branch is wrong, that should raise nonetheless + for snap_id in [snap_id_hex, snapshot.id]: + with pytest.raises(AssertionError, match="Differing attributes"): + unexpected_snapshot["id"] = snap_id + check_snapshot(unexpected_snapshot, swh_storage) # snapshot id which does not exist - unexpected_snapshot["id"] = "999666f535f882bc7f9a18fb16c9ad27fda7bab7" - with pytest.raises(AssertionError, match="is not found"): - check_snapshot(unexpected_snapshot, swh_storage) + wrong_snap_id_hex = "999666f535f882bc7f9a18fb16c9ad27fda7bab7" + for snap_id in [wrong_snap_id_hex, hash_to_bytes(wrong_snap_id_hex)]: + unexpected_snapshot["id"] = snap_id + with pytest.raises(AssertionError, match="is not found"): + check_snapshot(unexpected_snapshot, swh_storage) + + # not a Snapshot object, raise! + with pytest.raises(AssertionError, match="variable 'snapshot' must be a snapshot"): + check_snapshot(ORIGIN_VISIT, swh_storage)