Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/git/tests/test_from_disk.py
# Copyright (C) 2018-2020 The Software Heritage developers | # Copyright (C) 2018-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import copy | |||||
import datetime | import datetime | ||||
import os.path | import os.path | ||||
import dulwich.repo | import dulwich.repo | ||||
import pytest | |||||
from unittest import TestCase | |||||
from swh.model.model import Snapshot, SnapshotBranch, TargetType | from swh.model.model import Snapshot, SnapshotBranch, TargetType | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.loader.core.tests import BaseLoaderTest | |||||
from swh.loader.tests.common import assert_last_visit_matches | from swh.loader.tests.common import assert_last_visit_matches | ||||
from swh.loader.git.from_disk import GitLoaderFromDisk | |||||
from swh.loader.git.from_disk import GitLoaderFromArchive | |||||
from swh.loader.package.tests.common import check_snapshot, get_stats | |||||
from swh.loader.git.from_disk import GitLoaderFromDisk as OrigGitLoaderFromDisk | from swh.loader.git.tests import prepare_repository_from_archive | ||||
from swh.loader.git.from_disk import GitLoaderFromArchive as OrigGitLoaderFromArchive | |||||
from . import TEST_LOADER_CONFIG | |||||
class GitLoaderFromArchive(OrigGitLoaderFromArchive): | |||||
def project_name_from_archive(self, archive_path): | |||||
# We don't want the project name to be 'resources'. | |||||
return "testrepo" | |||||
def parse_config_file(self, *args, **kwargs): | |||||
return TEST_LOADER_CONFIG | |||||
CONTENT1 = { | |||||
"33ab5639bfd8e7b95eb1d8d0b87781d4ffea4d5d", # README v1 | |||||
"349c4ff7d21f1ec0eda26f3d9284c293e3425417", # README v2 | |||||
"799c11e348d39f1704022b8354502e2f81f3c037", # file1.txt | |||||
"4bdb40dfd6ec75cb730e678b5d7786e30170c5fb", # file2.txt | |||||
} | |||||
SNAPSHOT_ID = "a23699280a82a043f8c0994cf1631b568f716f95" | SNAPSHOT_ID = "a23699280a82a043f8c0994cf1631b568f716f95" | ||||
SNAPSHOT1 = { | SNAPSHOT1 = { | ||||
"id": SNAPSHOT_ID, | "id": SNAPSHOT_ID, | ||||
"branches": { | "branches": { | ||||
"HEAD": {"target": "refs/heads/master", "target_type": "alias",}, | "HEAD": {"target": "refs/heads/master", "target_type": "alias",}, | ||||
"refs/heads/master": { | "refs/heads/master": { | ||||
▲ Show 20 Lines • Show All 54 Lines • ▼ Show 20 Lines | "b0a77609903f767a2fd3d769904ef9ef68468b87": ( | ||||
"9ca0c7d6ffa3f9f0de59fd7912e08f11308a1338" | "9ca0c7d6ffa3f9f0de59fd7912e08f11308a1338" | ||||
), | ), | ||||
"bd746cd1913721b269b395a56a97baf6755151c2": ( | "bd746cd1913721b269b395a56a97baf6755151c2": ( | ||||
"e1d0d894835f91a0f887a4bc8b16f81feefdfbd5" | "e1d0d894835f91a0f887a4bc8b16f81feefdfbd5" | ||||
), | ), | ||||
} | } | ||||
class BaseGitLoaderFromDiskTest(BaseLoaderTest): | class CommonGitLoaderTests: | ||||
def setUp(self, archive_name, uncompress_archive, filename="testrepo"): | |||||
super().setUp( | |||||
archive_name=archive_name, | |||||
filename=filename, | |||||
prefix_tmp_folder_name="swh.loader.git.", | |||||
start_path=os.path.dirname(__file__), | |||||
uncompress_archive=uncompress_archive, | |||||
) | |||||
class GitLoaderFromDiskTest(OrigGitLoaderFromDisk): | |||||
def parse_config_file(self, *args, **kwargs): | |||||
return TEST_LOADER_CONFIG | |||||
class BaseDirGitLoaderFromDiskTest(BaseGitLoaderFromDiskTest): | |||||
"""Mixin base loader test to prepare the git | |||||
repository to uncompress, load and test the results. | |||||
This sets up | |||||
""" | |||||
def setUp(self): | |||||
super().setUp("testrepo.tgz", uncompress_archive=True) | |||||
self.loader = GitLoaderFromDiskTest( | |||||
url=self.repo_url, | |||||
visit_date=datetime.datetime( | |||||
2016, 5, 3, 15, 16, 32, tzinfo=datetime.timezone.utc | |||||
), | |||||
directory=self.destination_path, | |||||
) | |||||
self.storage = self.loader.storage | |||||
self.repo = dulwich.repo.Repo(self.destination_path) | |||||
def load(self): | |||||
return self.loader.load() | |||||
class BaseGitLoaderFromArchiveTest(BaseGitLoaderFromDiskTest): | |||||
"""Mixin base loader test to prepare the git | |||||
repository to uncompress, load and test the results. | |||||
This sets up | |||||
""" | |||||
def setUp(self): | |||||
super().setUp("testrepo.tgz", uncompress_archive=False) | |||||
self.loader = GitLoaderFromArchive( | |||||
url=self.repo_url, | |||||
visit_date=datetime.datetime( | |||||
2016, 5, 3, 15, 16, 32, tzinfo=datetime.timezone.utc | |||||
), | |||||
archive_path=self.destination_path, | |||||
) | |||||
self.storage = self.loader.storage | |||||
def load(self): | |||||
return self.loader.load() | |||||
class GitLoaderFromDiskTests: | |||||
"""Common tests for all git loaders.""" | """Common tests for all git loaders.""" | ||||
def test_load(self): | def test_load(self): | ||||
"""Loads a simple repository (made available by `setUp()`), | """Loads a simple repository (made available by `setUp()`), | ||||
and checks everything was added in the storage.""" | and checks everything was added in the storage.""" | ||||
res = self.load() | res = self.loader.load() | ||||
self.assertEqual(res["status"], "eventful", res) | |||||
self.assertContentsContain(CONTENT1) | |||||
self.assertCountDirectories(7) | |||||
self.assertCountReleases(0) # FIXME: should be 2 after T2059 | |||||
self.assertCountRevisions(7) | |||||
self.assertCountSnapshots(1) | |||||
self.assertRevisionsContain(REVISIONS1) | |||||
self.assertSnapshotEqual(SNAPSHOT1) | assert res == {"status": "eventful"} | ||||
self.assertEqual(self.loader.load_status(), {"status": "eventful"}) | |||||
self.assertEqual(self.loader.visit_status(), "full") | |||||
assert_last_visit_matches( | assert_last_visit_matches( | ||||
self.storage, | self.loader.storage, | ||||
self.repo_url, | self.repo_url, | ||||
status="full", | status="full", | ||||
type="git", | type="git", | ||||
snapshot=hash_to_bytes(SNAPSHOT1["id"]), | snapshot=hash_to_bytes(SNAPSHOT1["id"]), | ||||
) | ) | ||||
stats = get_stats(self.loader.storage) | |||||
assert stats == { | |||||
"content": 4, | |||||
"directory": 7, | |||||
"origin": 1, | |||||
"origin_visit": 1, | |||||
"person": 1, | |||||
"release": 0, | |||||
"revision": 7, | |||||
"skipped_content": 0, | |||||
"snapshot": 1, | |||||
} | |||||
check_snapshot(SNAPSHOT1, self.loader.storage) | |||||
def test_load_unchanged(self): | def test_load_unchanged(self): | ||||
"""Checks loading a repository a second time does not add | """Checks loading a repository a second time does not add | ||||
any extra data.""" | any extra data.""" | ||||
res = self.load() | res = self.loader.load() | ||||
self.assertEqual(res["status"], "eventful") | assert res == {"status": "eventful"} | ||||
assert_last_visit_matches( | assert_last_visit_matches( | ||||
self.storage, | self.loader.storage, | ||||
self.repo_url, | self.repo_url, | ||||
status="full", | status="full", | ||||
type="git", | type="git", | ||||
snapshot=hash_to_bytes(SNAPSHOT1["id"]), | snapshot=hash_to_bytes(SNAPSHOT1["id"]), | ||||
) | ) | ||||
res = self.load() | stats0 = get_stats(self.loader.storage) | ||||
self.assertEqual(res["status"], "uneventful") | assert stats0 == { | ||||
self.assertCountSnapshots(1) | "content": 4, | ||||
"directory": 7, | |||||
"origin": 1, | |||||
"origin_visit": 1, | |||||
"person": 1, | |||||
"release": 0, | |||||
"revision": 7, | |||||
"skipped_content": 0, | |||||
"snapshot": 1, | |||||
} | |||||
res = self.loader.load() | |||||
assert res == {"status": "uneventful"} | |||||
stats1 = get_stats(self.loader.storage) | |||||
expected_stats = copy.deepcopy(stats0) | |||||
expected_stats["origin_visit"] += 1 | |||||
assert stats1 == expected_stats | |||||
check_snapshot(SNAPSHOT1, self.loader.storage) | |||||
assert_last_visit_matches( | assert_last_visit_matches( | ||||
self.storage, | self.loader.storage, | ||||
self.repo_url, | self.repo_url, | ||||
status="full", | status="full", | ||||
type="git", | type="git", | ||||
snapshot=hash_to_bytes(SNAPSHOT1["id"]), | snapshot=hash_to_bytes(SNAPSHOT1["id"]), | ||||
) | ) | ||||
class DirGitLoaderTest(BaseDirGitLoaderFromDiskTest, GitLoaderFromDiskTests): | class FullGitLoaderTests(CommonGitLoaderTests): | ||||
"""Tests for the GitLoaderFromDisk. Includes the common ones, and | """Tests for GitLoader (from disk or not). Includes the common ones, and | ||||
add others that only work with a local dir.""" | add others that only work with a local dir. | ||||
""" | |||||
def test_load_changed(self): | def test_load_changed(self): | ||||
"""Loads a repository, makes some changes by adding files, commits, | """Loads a repository, makes some changes by adding files, commits, | ||||
and merges, load it again, and check the storage contains everything | and merges, load it again, and check the storage contains everything | ||||
it should.""" | it should.""" | ||||
# Initial load | # Initial load | ||||
res = self.load() | res = self.loader.load() | ||||
self.assertEqual(res["status"], "eventful", res) | assert res == {"status": "eventful"} | ||||
stats0 = get_stats(self.loader.storage) | |||||
assert stats0 == { | |||||
"content": 4, | |||||
"directory": 7, | |||||
"origin": 1, | |||||
"origin_visit": 1, | |||||
"person": 1, | |||||
"release": 0, | |||||
"revision": 7, | |||||
"skipped_content": 0, | |||||
"snapshot": 1, | |||||
} | |||||
# Load with a new file + revision | # Load with a new file + revision | ||||
with open(os.path.join(self.destination_path, "hello.py"), "a") as fd: | with open(os.path.join(self.destination_path, "hello.py"), "a") as fd: | ||||
fd.write("print('Hello world')\n") | fd.write("print('Hello world')\n") | ||||
self.repo.stage([b"hello.py"]) | self.repo.stage([b"hello.py"]) | ||||
new_revision = self.repo.do_commit(b"Hello world\n").decode() | new_revision = self.repo.do_commit(b"Hello world\n").decode() | ||||
new_dir = "85dae072a5aa9923ffa7a7568f819ff21bf49858" | new_dir = "85dae072a5aa9923ffa7a7568f819ff21bf49858" | ||||
assert self.repo[new_revision.encode()].tree == new_dir.encode() | assert self.repo[new_revision.encode()].tree == new_dir.encode() | ||||
revisions = REVISIONS1.copy() | revisions = REVISIONS1.copy() | ||||
assert new_revision not in revisions | assert new_revision not in revisions | ||||
revisions[new_revision] = new_dir | revisions[new_revision] = new_dir | ||||
res = self.load() | res = self.loader.load() | ||||
self.assertEqual(res["status"], "eventful") | assert res == {"status": "eventful"} | ||||
self.assertCountContents(4 + 1) | |||||
self.assertCountDirectories(7 + 1) | |||||
self.assertCountReleases(0) # FIXME: should be 2 after T2059 | |||||
self.assertCountRevisions(7 + 1) | |||||
self.assertCountSnapshots(1 + 1) | |||||
self.assertRevisionsContain(revisions) | stats1 = get_stats(self.loader.storage) | ||||
expected_stats = copy.deepcopy(stats0) | |||||
# did one new visit | |||||
expected_stats["origin_visit"] += 1 | |||||
# with one more of the following objects | |||||
expected_stats["person"] += 1 | |||||
expected_stats["content"] += 1 | |||||
expected_stats["directory"] += 1 | |||||
expected_stats["revision"] += 1 | |||||
# concluding into 1 new snapshot | |||||
expected_stats["snapshot"] += 1 | |||||
self.assertEqual(self.loader.load_status(), {"status": "eventful"}) | assert stats1 == expected_stats | ||||
self.assertEqual(self.loader.visit_status(), "full") | |||||
visit_status = assert_last_visit_matches( | visit_status = assert_last_visit_matches( | ||||
self.storage, self.repo_url, status="full", type="git" | self.loader.storage, self.repo_url, status="full", type="git" | ||||
) | ) | ||||
self.assertIsNotNone(visit_status.snapshot) | assert visit_status.snapshot is not None | ||||
snapshot_id = visit_status.snapshot | snapshot_id = visit_status.snapshot | ||||
snapshot = self.storage.snapshot_get(snapshot_id) | snapshot = self.loader.storage.snapshot_get(snapshot_id) | ||||
branches = snapshot["branches"] | branches = snapshot["branches"] | ||||
assert branches[b"HEAD"] == { | assert branches[b"HEAD"] == { | ||||
"target": b"refs/heads/master", | "target": b"refs/heads/master", | ||||
"target_type": "alias", | "target_type": "alias", | ||||
} | } | ||||
assert branches[b"refs/heads/master"] == { | assert branches[b"refs/heads/master"] == { | ||||
"target": hash_to_bytes(new_revision), | "target": hash_to_bytes(new_revision), | ||||
"target_type": "revision", | "target_type": "revision", | ||||
Show All 16 Lines | def test_load_changed(self): | ||||
merge_commit = self.repo.do_commit( | merge_commit = self.repo.do_commit( | ||||
b"merge.\n", tree=merged_tree.id, merge_heads=[branch1.id] | b"merge.\n", tree=merged_tree.id, merge_heads=[branch1.id] | ||||
) | ) | ||||
assert merge_commit.decode() not in revisions | assert merge_commit.decode() not in revisions | ||||
revisions[merge_commit.decode()] = merged_tree.id.decode() | revisions[merge_commit.decode()] = merged_tree.id.decode() | ||||
res = self.load() | res = self.loader.load() | ||||
self.assertEqual(res["status"], "eventful") | assert res == {"status": "eventful"} | ||||
self.assertCountContents(4 + 1) | stats2 = get_stats(self.loader.storage) | ||||
self.assertCountDirectories(7 + 2) | expected_stats = copy.deepcopy(stats1) | ||||
self.assertCountReleases(0) # FIXME: should be 2 after T2059 | # one more visit | ||||
self.assertCountRevisions(7 + 2) | expected_stats["origin_visit"] += 1 | ||||
self.assertCountSnapshots(1 + 1 + 1) | # with 1 new directory and revision | ||||
expected_stats["directory"] += 1 | |||||
expected_stats["revision"] += 1 | |||||
# concluding into 1 new snapshot | |||||
expected_stats["snapshot"] += 1 | |||||
self.assertRevisionsContain(revisions) | assert stats2 == expected_stats | ||||
self.assertEqual(self.loader.load_status(), {"status": "eventful"}) | |||||
self.assertEqual(self.loader.visit_status(), "full") | |||||
visit_status = assert_last_visit_matches( | visit_status = assert_last_visit_matches( | ||||
self.storage, self.repo_url, status="full", type="git" | self.loader.storage, self.repo_url, status="full", type="git" | ||||
) | ) | ||||
self.assertIsNotNone(visit_status.snapshot) | assert visit_status.snapshot is not None | ||||
merge_snapshot_id = visit_status.snapshot | merge_snapshot_id = visit_status.snapshot | ||||
assert merge_snapshot_id != snapshot_id | assert merge_snapshot_id != snapshot_id | ||||
merge_snapshot = self.storage.snapshot_get(merge_snapshot_id) | merge_snapshot = self.loader.storage.snapshot_get(merge_snapshot_id) | ||||
merge_branches = merge_snapshot["branches"] | merge_branches = merge_snapshot["branches"] | ||||
assert merge_branches[b"HEAD"] == { | assert merge_branches[b"HEAD"] == { | ||||
"target": b"refs/heads/master", | "target": b"refs/heads/master", | ||||
"target_type": "alias", | "target_type": "alias", | ||||
} | } | ||||
assert merge_branches[b"refs/heads/master"] == { | assert merge_branches[b"refs/heads/master"] == { | ||||
"target": hash_to_bytes(merge_commit.decode()), | "target": hash_to_bytes(merge_commit.decode()), | ||||
"target_type": "revision", | "target_type": "revision", | ||||
Show All 29 Lines | def test_load_filter_branches(self): | ||||
# ... and the unfiltered_branches, which are all pointing to the same | # ... and the unfiltered_branches, which are all pointing to the same | ||||
# commit as "refs/heads/master". | # commit as "refs/heads/master". | ||||
for branch_name in unfiltered_branches: | for branch_name in unfiltered_branches: | ||||
branches[branch_name] = branches[b"refs/heads/master"] | branches[branch_name] = branches[b"refs/heads/master"] | ||||
expected_snapshot = Snapshot(branches=branches) | expected_snapshot = Snapshot(branches=branches) | ||||
# Load the modified repository | # Load the modified repository | ||||
res = self.load() | res = self.loader.load() | ||||
assert res["status"] == "eventful" | assert res == {"status": "eventful"} | ||||
assert self.loader.load_status() == {"status": "eventful"} | |||||
assert self.loader.visit_status() == "full" | |||||
assert_last_visit_matches( | assert_last_visit_matches( | ||||
self.storage, | self.loader.storage, | ||||
self.repo_url, | self.repo_url, | ||||
status="full", | status="full", | ||||
type="git", | type="git", | ||||
snapshot=expected_snapshot.id, | snapshot=expected_snapshot.id, | ||||
) | ) | ||||
def test_load_dangling_symref(self): | def test_load_dangling_symref(self): | ||||
with open(os.path.join(self.destination_path, ".git/HEAD"), "wb") as f: | with open(os.path.join(self.destination_path, ".git/HEAD"), "wb") as f: | ||||
f.write(b"ref: refs/heads/dangling-branch\n") | f.write(b"ref: refs/heads/dangling-branch\n") | ||||
res = self.load() | res = self.loader.load() | ||||
self.assertEqual(res["status"], "eventful", res) | assert res == {"status": "eventful"} | ||||
self.assertContentsContain(CONTENT1) | |||||
self.assertCountDirectories(7) | |||||
self.assertCountReleases(0) # FIXME: should be 2 after T2059 | |||||
self.assertCountRevisions(7) | |||||
self.assertCountSnapshots(1) | |||||
visit_status = assert_last_visit_matches( | visit_status = assert_last_visit_matches( | ||||
self.storage, self.repo_url, status="full", type="git" | self.loader.storage, self.repo_url, status="full", type="git" | ||||
) | ) | ||||
snapshot_id = visit_status.snapshot | snapshot_id = visit_status.snapshot | ||||
assert snapshot_id is not None | assert snapshot_id is not None | ||||
snapshot = self.storage.snapshot_get(snapshot_id) | snapshot = self.loader.storage.snapshot_get(snapshot_id) | ||||
branches = snapshot["branches"] | branches = snapshot["branches"] | ||||
assert branches[b"HEAD"] == { | assert branches[b"HEAD"] == { | ||||
"target": b"refs/heads/dangling-branch", | "target": b"refs/heads/dangling-branch", | ||||
"target_type": "alias", | "target_type": "alias", | ||||
} | } | ||||
assert branches[b"refs/heads/dangling-branch"] is None | assert branches[b"refs/heads/dangling-branch"] is None | ||||
stats = get_stats(self.loader.storage) | |||||
assert stats == { | |||||
"content": 4, | |||||
"directory": 7, | |||||
"origin": 1, | |||||
"origin_visit": 1, | |||||
"person": 1, | |||||
"release": 0, | |||||
"revision": 7, | |||||
"skipped_content": 0, | |||||
"snapshot": 1, | |||||
} | |||||
class GitLoaderFromArchiveTest(BaseGitLoaderFromArchiveTest, GitLoaderFromDiskTests): | class GitLoaderFromDiskTest(TestCase, FullGitLoaderTests): | ||||
"""Tests for GitLoaderFromArchive. Imports the common ones | """Prepare a git directory repository to be loaded through a GitLoaderFromDisk. | ||||
from GitLoaderTests.""" | This tests all git loader scenario. | ||||
pass | """ | ||||
@pytest.fixture(autouse=True) | |||||
def init(self, swh_config, datadir, tmp_path): | |||||
archive_name = "testrepo" | |||||
archive_path = os.path.join(datadir, f"{archive_name}.tgz") | |||||
tmp_path = str(tmp_path) | |||||
self.repo_url = prepare_repository_from_archive( | |||||
archive_path, archive_name, tmp_path=tmp_path | |||||
) | |||||
self.destination_path = os.path.join(tmp_path, archive_name) | |||||
self.loader = GitLoaderFromDisk( | |||||
url=self.repo_url, | |||||
visit_date=datetime.datetime( | |||||
2016, 5, 3, 15, 16, 32, tzinfo=datetime.timezone.utc | |||||
), | |||||
directory=self.destination_path, | |||||
) | |||||
self.repo = dulwich.repo.Repo(self.destination_path) | |||||
class GitLoaderFromArchiveTest(TestCase, CommonGitLoaderTests): | |||||
"""Tests for GitLoaderFromArchive. Only tests common scenario.""" | |||||
@pytest.fixture(autouse=True) | |||||
def init(self, swh_config, datadir, tmp_path): | |||||
archive_name = "testrepo" | |||||
archive_path = os.path.join(datadir, f"{archive_name}.tgz") | |||||
self.repo_url = archive_path | |||||
self.loader = GitLoaderFromArchive( | |||||
url=self.repo_url, | |||||
archive_path=archive_path, | |||||
visit_date=datetime.datetime( | |||||
2016, 5, 3, 15, 16, 32, tzinfo=datetime.timezone.utc | |||||
), | |||||
) |