Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/mercurial/tests/test_from_disk.py
# Copyright (C) 2020-2021 The Software Heritage developers | # Copyright (C) 2020-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from datetime import datetime | from datetime import datetime | ||||
from hashlib import sha1 | from hashlib import sha1 | ||||
import os | import os | ||||
from pathlib import Path | |||||
import subprocess | |||||
import attr | import attr | ||||
import pytest | import pytest | ||||
from swh.loader.mercurial.utils import parse_visit_date | from swh.loader.mercurial.utils import parse_visit_date | ||||
from swh.loader.tests import ( | from swh.loader.tests import ( | ||||
assert_last_visit_matches, | assert_last_visit_matches, | ||||
check_snapshot, | check_snapshot, | ||||
▲ Show 20 Lines • Show All 412 Lines • ▼ Show 20 Lines | loader = HgLoaderFromDisk( | ||||
directory=directory, # specify directory to avoid clone | directory=directory, # specify directory to avoid clone | ||||
visit_date=VISIT_DATE, | visit_date=VISIT_DATE, | ||||
) | ) | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
assert actual_load_status == {"status": "eventful"} | assert actual_load_status == {"status": "eventful"} | ||||
assert_last_visit_matches(swh_storage, repo_url, status="partial", type="hg") | assert_last_visit_matches(swh_storage, repo_url, status="partial", type="hg") | ||||
def hg_strip(repo: str, rev: str) -> None: | |||||
subprocess.check_call(["hg", "debugstrip", rev], cwd=repo) | |||||
def test_load_repo_with_new_commits(swh_storage, datadir, tmp_path): | |||||
archive_name = "hello" | |||||
archive_path = Path(datadir, f"{archive_name}.tgz") | |||||
json_path = Path(datadir, f"{archive_name}.json") | |||||
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) | |||||
# first load with missing commits | |||||
hg_strip(repo_url.replace("file://", ""), "tip") | |||||
vlorentz: What does this do? | |||||
AlphareAuthorUnsubmitted Done Inline ActionsIt appears I forgot to add a docstring! Will do. Alphare: It appears I forgot to add a docstring! Will do.
(it straight-up removes the tip of the… | |||||
loader = HgLoaderFromDisk(swh_storage, repo_url) | |||||
assert loader.load() == {"status": "eventful"} | |||||
assert get_stats(loader.storage) == { | |||||
"content": 2, | |||||
"directory": 2, | |||||
"origin": 1, | |||||
"origin_visit": 1, | |||||
"release": 0, | |||||
"revision": 2, | |||||
"skipped_content": 0, | |||||
"snapshot": 1, | |||||
} | |||||
# second load with all commits | |||||
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) | |||||
loader = HgLoaderFromDisk(swh_storage, repo_url) | |||||
checker = LoaderChecker( | |||||
loader=HgLoaderFromDisk(swh_storage, repo_url), | |||||
expected=ExpectedSwhids.load(json_path), | |||||
) | |||||
checker.check() | |||||
assert get_stats(loader.storage) == { | |||||
"content": 3, | |||||
"directory": 3, | |||||
"origin": 1, | |||||
"origin_visit": 2, | |||||
"release": 1, | |||||
"revision": 3, | |||||
"skipped_content": 0, | |||||
"snapshot": 2, | |||||
} |
What does this do?