Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/tests/__init__.py
# Copyright (C) 2018-2020 The Software Heritage developers | # Copyright (C) 2018-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import os | import os | ||||
import subprocess | import subprocess | ||||
from pathlib import PosixPath | from pathlib import PosixPath | ||||
from typing import Optional, Union | from typing import Dict, Optional, Union | ||||
from swh.model.model import OriginVisitStatus | from swh.model.model import OriginVisitStatus | ||||
from swh.model.hashutil import hash_to_bytes, hash_to_hex | |||||
from swh.storage.algos.origin import origin_get_latest_visit_status | from swh.storage.algos.origin import origin_get_latest_visit_status | ||||
def assert_last_visit_matches( | def assert_last_visit_matches( | ||||
storage, | storage, | ||||
url: str, | url: str, | ||||
status: str, | status: str, | ||||
▲ Show 20 Lines • Show All 52 Lines • ▼ Show 20 Lines | ) -> str: | ||||
if not isinstance(tmp_path, str): | if not isinstance(tmp_path, str): | ||||
tmp_path = str(tmp_path) | tmp_path = str(tmp_path) | ||||
# uncompress folder/repositories/dump for the loader to ingest | # uncompress folder/repositories/dump for the loader to ingest | ||||
subprocess.check_output(["tar", "xf", archive_path, "-C", tmp_path]) | subprocess.check_output(["tar", "xf", archive_path, "-C", tmp_path]) | ||||
# build the origin url (or some derivative form) | # build the origin url (or some derivative form) | ||||
_fname = filename if filename else os.path.basename(archive_path) | _fname = filename if filename else os.path.basename(archive_path) | ||||
repo_url = f"file://{tmp_path}/{_fname}" | repo_url = f"file://{tmp_path}/{_fname}" | ||||
return repo_url | return repo_url | ||||
def decode_target(target): | |||||
"""Test helper to ease readability in test | |||||
""" | |||||
if not target: | |||||
return target | |||||
target_type = target["target_type"] | |||||
if target_type == "alias": | |||||
decoded_target = target["target"].decode("utf-8") | |||||
else: | |||||
decoded_target = hash_to_hex(target["target"]) | |||||
return {"target": decoded_target, "target_type": target_type} | |||||
def check_snapshot(expected_snapshot, storage): | |||||
"""Check for snapshot match. | |||||
Provide the hashes as hexadecimal, the conversion is done | |||||
within the method. | |||||
Args: | |||||
expected_snapshot (dict): full snapshot with hex ids | |||||
storage (Storage): expected storage | |||||
Returns: | |||||
the snapshot stored in the storage for further test assertion if any is | |||||
needed. | |||||
""" | |||||
expected_snapshot_id = expected_snapshot["id"] | |||||
expected_branches = expected_snapshot["branches"] | |||||
snap = storage.snapshot_get(hash_to_bytes(expected_snapshot_id)) | |||||
if snap is None: | |||||
# display known snapshots instead if possible | |||||
if hasattr(storage, "_snapshots"): # in-mem storage | |||||
from pprint import pprint | |||||
for snap_id, (_snap, _) in storage._snapshots.items(): | |||||
snapd = _snap.to_dict() | |||||
snapd["id"] = hash_to_hex(snapd["id"]) | |||||
branches = { | |||||
branch.decode("utf-8"): decode_target(target) | |||||
for branch, target in snapd["branches"].items() | |||||
} | |||||
snapd["branches"] = branches | |||||
pprint(snapd) | |||||
ardumont: Just noticed this is not covered.
It must be through the dvcs loader tests though, i guess. | |||||
Done Inline ActionsOh, i recall now, it's utility code to display the snapshot in case we don't find it. I'll let it be as it's not in the diff's perimeter. ardumont: Oh, i recall now, it's utility code to display the snapshot in case we don't find it.
So that… | |||||
raise AssertionError("Snapshot is not found") | |||||
branches = { | |||||
branch.decode("utf-8"): decode_target(target) | |||||
for branch, target in snap["branches"].items() | |||||
} | |||||
assert expected_branches == branches | |||||
return snap | |||||
def get_stats(storage) -> Dict: | |||||
"""Adaptation utils to unify the stats counters across storage | |||||
implementation. | |||||
""" | |||||
storage.refresh_stat_counters() | |||||
stats = storage.stat_counters() | |||||
keys = [ | |||||
"content", | |||||
"directory", | |||||
"origin", | |||||
"origin_visit", | |||||
"person", | |||||
"release", | |||||
"revision", | |||||
"skipped_content", | |||||
"snapshot", | |||||
] | |||||
return {k: stats.get(k) for k in keys} |
Just noticed this is not covered.
It must be through the dvcs loader tests though, i guess.
I'll add coverage in a follow-up diff.
In the mean time [1]
[1] T2486