Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/mercurial/tests/test_from_disk.py
# Copyright (C) 2020-2021 The Software Heritage developers | # Copyright (C) 2020-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from datetime import datetime | from datetime import datetime | ||||
from hashlib import sha1 | from hashlib import sha1 | ||||
import os | import os | ||||
from pathlib import Path | from pathlib import Path | ||||
import subprocess | import subprocess | ||||
import attr | import attr | ||||
import pytest | import pytest | ||||
from swh.loader.mercurial.loader import HgBundle20Loader | |||||
from swh.loader.mercurial.utils import parse_visit_date | from swh.loader.mercurial.utils import parse_visit_date | ||||
from swh.loader.tests import ( | from swh.loader.tests import ( | ||||
assert_last_visit_matches, | assert_last_visit_matches, | ||||
check_snapshot, | check_snapshot, | ||||
get_stats, | get_stats, | ||||
prepare_repository_from_archive, | prepare_repository_from_archive, | ||||
) | ) | ||||
from swh.model.from_disk import Content, DentryPerms | from swh.model.from_disk import Content, DentryPerms | ||||
▲ Show 20 Lines • Show All 381 Lines • ▼ Show 20 Lines | def test_closed_branch_incremental(swh_storage, datadir, tmp_path): | ||||
} | } | ||||
assert get_stats(loader.storage) == expected_stats | assert get_stats(loader.storage) == expected_stats | ||||
assert loader.load() == {"status": "uneventful"} | assert loader.load() == {"status": "uneventful"} | ||||
assert get_stats(loader.storage) == {**expected_stats, "origin_visit": 1 + 1} | assert get_stats(loader.storage) == {**expected_stats, "origin_visit": 1 + 1} | ||||
assert loader.load() == {"status": "uneventful"} | assert loader.load() == {"status": "uneventful"} | ||||
assert get_stats(loader.storage) == {**expected_stats, "origin_visit": 2 + 1} | assert get_stats(loader.storage) == {**expected_stats, "origin_visit": 2 + 1} | ||||
def test_old_loader_new_loader(swh_storage, datadir, tmp_path): | |||||
archive_name = "example" | |||||
archive_path = os.path.join(datadir, f"{archive_name}.tgz") | |||||
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) | |||||
repo_path = repo_url.replace("file://", "") | |||||
old_loader = HgBundle20Loader(swh_storage, repo_path) | |||||
assert old_loader.load() == {"status": "eventful"} | |||||
expected_stats = { | |||||
"content": 7, | |||||
"directory": 16, | |||||
"origin": 1, | |||||
"origin_visit": 1, | |||||
"release": 0, | |||||
"revision": 9, | |||||
"skipped_content": 0, | |||||
"snapshot": 1, | |||||
} | |||||
assert get_stats(old_loader.storage) == expected_stats | |||||
# Shouldn't pick up anything | |||||
loader = HgLoaderFromDisk(swh_storage, repo_path) | |||||
assert loader.load() == {"status": "uneventful"} | |||||
# Shouldn't pick up anything either | |||||
loader = HgLoaderFromDisk(swh_storage, repo_path) | |||||
assert loader.load() == {"status": "uneventful"} | |||||
def test_load_unchanged_repo__dangling_extid(swh_storage, datadir, tmp_path): | def test_load_unchanged_repo__dangling_extid(swh_storage, datadir, tmp_path): | ||||
"""Checks the loader will load revisions targeted by an ExtID if the | """Checks the loader will load revisions targeted by an ExtID if the | ||||
revisions are missing from the storage""" | revisions are missing from the storage""" | ||||
archive_name = "hello" | archive_name = "hello" | ||||
archive_path = os.path.join(datadir, f"{archive_name}.tgz") | archive_path = os.path.join(datadir, f"{archive_name}.tgz") | ||||
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) | repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) | ||||
repo_path = repo_url.replace("file://", "") | repo_path = repo_url.replace("file://", "") | ||||
▲ Show 20 Lines • Show All 117 Lines • Show Last 20 Lines |