diff --git a/swh/loader/tests/__init__.py b/swh/loader/tests/__init__.py index e824492..2ddbbe4 100644 --- a/swh/loader/tests/__init__.py +++ b/swh/loader/tests/__init__.py @@ -1,161 +1,148 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import subprocess from pathlib import PosixPath from typing import Dict, Optional, Union from swh.model.model import OriginVisitStatus from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.storage.algos.origin import origin_get_latest_visit_status def assert_last_visit_matches( storage, url: str, status: str, type: Optional[str] = None, snapshot: Optional[bytes] = None, ) -> OriginVisitStatus: """This retrieves the last visit and visit_status which are expected to exist. This also checks that the {visit|visit_status} have their respective properties correctly set. This returns the last visit_status for that given origin. Args: url: Origin url status: Check that the visit status has the given status type: Check that the returned visit has the given type snapshot: Check that the visit status points to the given snapshot Raises: AssertionError in case visit or visit status is not found, or any of the type, status and snapshot mismatch Returns: the visit status for further check during the remaining part of the test. """ visit_and_status = origin_get_latest_visit_status(storage, url) assert visit_and_status is not None, f"Origin {url} has no visits" visit, visit_status = visit_and_status if type: assert visit.type == type, f"Visit has type {visit.type} instead of {type}" assert ( visit_status.status == status ), f"Visit_status has status {visit_status.status} instead of {status}" if snapshot is not None: assert visit_status.snapshot is not None assert visit_status.snapshot == snapshot, ( f"Visit_status points to snapshot {visit_status.snapshot.hex()} " f"instead of {snapshot.hex()}" ) return visit_status def prepare_repository_from_archive( archive_path: str, filename: Optional[str] = None, tmp_path: Union[PosixPath, str] = "/tmp", ) -> str: """Given an existing archive_path, uncompress it. Returns a file repo url which can be used as origin url. This does not deal with the case where the archive passed along does not exist. """ if not isinstance(tmp_path, str): tmp_path = str(tmp_path) # uncompress folder/repositories/dump for the loader to ingest subprocess.check_output(["tar", "xf", archive_path, "-C", tmp_path]) # build the origin url (or some derivative form) _fname = filename if filename else os.path.basename(archive_path) repo_url = f"file://{tmp_path}/{_fname}" return repo_url def decode_target(target): """Test helper to ease readability in test """ if not target: return target target_type = target["target_type"] if target_type == "alias": decoded_target = target["target"].decode("utf-8") else: decoded_target = hash_to_hex(target["target"]) return {"target": decoded_target, "target_type": target_type} def check_snapshot(expected_snapshot, storage): """Check for snapshot match. Provide the hashes as hexadecimal, the conversion is done within the method. Args: expected_snapshot (dict): full snapshot with hex ids storage (Storage): expected storage Returns: the snapshot stored in the storage for further test assertion if any is needed. """ expected_snapshot_id = expected_snapshot["id"] expected_branches = expected_snapshot["branches"] snap = storage.snapshot_get(hash_to_bytes(expected_snapshot_id)) if snap is None: - # display known snapshots instead if possible - if hasattr(storage, "_snapshots"): # in-mem storage - from pprint import pprint - - for snap_id, (_snap, _) in storage._snapshots.items(): - snapd = _snap.to_dict() - snapd["id"] = hash_to_hex(snapd["id"]) - branches = { - branch.decode("utf-8"): decode_target(target) - for branch, target in snapd["branches"].items() - } - snapd["branches"] = branches - pprint(snapd) - raise AssertionError("Snapshot is not found") + raise AssertionError(f"Snapshot {expected_snapshot_id} is not found") branches = { branch.decode("utf-8"): decode_target(target) for branch, target in snap["branches"].items() } assert expected_branches == branches return snap def get_stats(storage) -> Dict: """Adaptation utils to unify the stats counters across storage implementation. """ storage.refresh_stat_counters() stats = storage.stat_counters() keys = [ "content", "directory", "origin", "origin_visit", "person", "release", "revision", "skipped_content", "snapshot", ] return {k: stats.get(k) for k in keys} diff --git a/swh/loader/tests/test_init.py b/swh/loader/tests/test_init.py index 6bc8274..f35902f 100644 --- a/swh/loader/tests/test_init.py +++ b/swh/loader/tests/test_init.py @@ -1,253 +1,258 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import pytest import os import subprocess from swh.storage import get_storage from swh.loader.tests import prepare_repository_from_archive, assert_last_visit_matches from swh.model.model import ( OriginVisit, OriginVisitStatus, Snapshot, SnapshotBranch, TargetType, ) from swh.model.hashutil import hash_to_bytes from swh.loader.tests import ( decode_target, check_snapshot, ) hash_hex = "43e45d56f88993aae6a0198013efa80716fd8920" storage_config = {"cls": "pipeline", "steps": [{"cls": "memory",}]} ORIGIN_VISIT = OriginVisit( origin="some-url", visit=1, date=datetime.datetime.now(tz=datetime.timezone.utc), type="archive", ) ORIGIN_VISIT_STATUS = OriginVisitStatus( origin="some-url", visit=1, date=datetime.datetime.now(tz=datetime.timezone.utc), status="full", snapshot=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), metadata=None, ) @pytest.fixture def mock_storage(mocker): mock_storage = mocker.patch("swh.loader.tests.origin_get_latest_visit_status") mock_storage.return_value = ORIGIN_VISIT, ORIGIN_VISIT_STATUS return mock_storage def test_assert_last_visit_matches_raise(mock_storage, mocker): """Not finding origin visit_and_statu should raise """ # overwrite so we raise because we do not find the right visit mock_storage.return_value = None with pytest.raises(AssertionError, match="Origin url has no visits"): assert_last_visit_matches(mock_storage, "url", status="full") assert mock_storage.called is True def test_assert_last_visit_matches_wrong_status(mock_storage, mocker): """Wrong visit detected should raise AssertionError """ expected_status = "partial" assert ORIGIN_VISIT_STATUS.status != expected_status with pytest.raises(AssertionError, match="Visit_status has status"): assert_last_visit_matches(mock_storage, "url", status=expected_status) assert mock_storage.called is True def test_assert_last_visit_matches_wrong_type(mock_storage, mocker): """Wrong visit detected should raise AssertionError """ expected_type = "git" assert ORIGIN_VISIT.type != expected_type with pytest.raises(AssertionError, match="Visit has type"): assert_last_visit_matches( mock_storage, "url", status=ORIGIN_VISIT_STATUS.status, type=expected_type, # mismatched type will raise ) assert mock_storage.called is True def test_assert_last_visit_matches_wrong_snapshot(mock_storage, mocker): """Wrong visit detected should raise AssertionError """ expected_snapshot_id = hash_to_bytes("e92cc0710eb6cf9efd5b920a8453e1e07157b6cd") assert ORIGIN_VISIT_STATUS.snapshot != expected_snapshot_id with pytest.raises(AssertionError, match="Visit_status points to snapshot"): assert_last_visit_matches( mock_storage, "url", status=ORIGIN_VISIT_STATUS.status, snapshot=expected_snapshot_id, # mismatched snapshot will raise ) assert mock_storage.called is True def test_assert_last_visit_matches(mock_storage, mocker): """Correct visit detected should return the visit_status """ visit_type = ORIGIN_VISIT.type visit_status = ORIGIN_VISIT_STATUS.status visit_snapshot = ORIGIN_VISIT_STATUS.snapshot actual_visit_status = assert_last_visit_matches( mock_storage, "url", type=visit_type, status=visit_status, snapshot=visit_snapshot, ) assert actual_visit_status == ORIGIN_VISIT_STATUS assert mock_storage.called is True def test_prepare_repository_from_archive_failure(): # does not deal with inexistent archive so raise assert os.path.exists("unknown-archive") is False with pytest.raises(subprocess.CalledProcessError, match="exit status 2"): prepare_repository_from_archive("unknown-archive") def test_prepare_repository_from_archive(datadir, tmp_path): archive_name = "0805nexter-1.1.0" archive_path = os.path.join(str(datadir), f"{archive_name}.tar.gz") assert os.path.exists(archive_path) is True tmp_path = str(tmp_path) # deals with path string repo_url = prepare_repository_from_archive( archive_path, filename=archive_name, tmp_path=tmp_path ) expected_uncompressed_archive_path = os.path.join(tmp_path, archive_name) assert repo_url == f"file://{expected_uncompressed_archive_path}" assert os.path.exists(expected_uncompressed_archive_path) def test_prepare_repository_from_archive_no_filename(datadir, tmp_path): archive_name = "0805nexter-1.1.0" archive_path = os.path.join(str(datadir), f"{archive_name}.tar.gz") assert os.path.exists(archive_path) is True # deals with path as posix path (for tmp_path) repo_url = prepare_repository_from_archive(archive_path, tmp_path=tmp_path) tmp_path = str(tmp_path) expected_uncompressed_archive_path = os.path.join(tmp_path, archive_name) expected_repo_url = os.path.join(tmp_path, f"{archive_name}.tar.gz") assert repo_url == f"file://{expected_repo_url}" # passing along the filename does not influence the on-disk extraction # just the repo-url computation assert os.path.exists(expected_uncompressed_archive_path) def test_decode_target_edge(): assert not decode_target(None) def test_decode_target(): actual_alias_decode_target = decode_target( {"target_type": "alias", "target": b"something",} ) assert actual_alias_decode_target == { "target_type": "alias", "target": "something", } actual_decode_target = decode_target( {"target_type": "revision", "target": hash_to_bytes(hash_hex),} ) assert actual_decode_target == { "target_type": "revision", "target": hash_hex, } def test_check_snapshot(): storage = get_storage(**storage_config) snap_id = "2498dbf535f882bc7f9a18fb16c9ad27fda7bab7" snapshot = Snapshot( id=hash_to_bytes(snap_id), branches={ b"master": SnapshotBranch( target=hash_to_bytes(hash_hex), target_type=TargetType.REVISION, ), }, ) s = storage.snapshot_add([snapshot]) assert s == { "snapshot:add": 1, } expected_snapshot = { "id": snap_id, "branches": {"master": {"target": hash_hex, "target_type": "revision",}}, } check_snapshot(expected_snapshot, storage) def test_check_snapshot_failure(): storage = get_storage(**storage_config) snapshot = Snapshot( id=hash_to_bytes("2498dbf535f882bc7f9a18fb16c9ad27fda7bab7"), branches={ b"master": SnapshotBranch( target=hash_to_bytes(hash_hex), target_type=TargetType.REVISION, ), }, ) s = storage.snapshot_add([snapshot]) assert s == { "snapshot:add": 1, } unexpected_snapshot = { - "id": "2498dbf535f882bc7f9a18fb16c9ad27fda7bab7", + "id": "2498dbf535f882bc7f9a18fb16c9ad27fda7bab7", # id is correct "branches": { - "master": {"target": hash_hex, "target_type": "release",} # wrong value + "master": {"target": hash_hex, "target_type": "release",} # wrong branch }, } - with pytest.raises(AssertionError): + with pytest.raises(AssertionError, match="Differing items"): + check_snapshot(unexpected_snapshot, storage) + + # snapshot id which does not exist + unexpected_snapshot["id"] = "999666f535f882bc7f9a18fb16c9ad27fda7bab7" + with pytest.raises(AssertionError, match="is not found"): check_snapshot(unexpected_snapshot, storage)