Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/git/tests/test_from_disk.py
# Copyright (C) 2018-2020 The Software Heritage developers | # Copyright (C) 2018-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import os.path | import os.path | ||||
import subprocess | |||||
import dulwich.repo | |||||
from swh.loader.git.from_disk import GitLoaderFromDisk as OrigGitLoaderFromDisk | from swh.loader.git.from_disk import GitLoaderFromDisk as OrigGitLoaderFromDisk | ||||
from swh.loader.git.from_disk import GitLoaderFromArchive as OrigGitLoaderFromArchive | from swh.loader.git.from_disk import GitLoaderFromArchive as OrigGitLoaderFromArchive | ||||
from swh.loader.core.tests import BaseLoaderTest | from swh.loader.core.tests import BaseLoaderTest | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from . import TEST_LOADER_CONFIG | from . import TEST_LOADER_CONFIG | ||||
▲ Show 20 Lines • Show All 110 Lines • ▼ Show 20 Lines | class BaseDirGitLoaderFromDiskTest(BaseGitLoaderFromDiskTest): | ||||
def setUp(self): | def setUp(self): | ||||
super().setUp("testrepo.tgz", uncompress_archive=True) | super().setUp("testrepo.tgz", uncompress_archive=True) | ||||
self.loader = GitLoaderFromDiskTest( | self.loader = GitLoaderFromDiskTest( | ||||
url=self.repo_url, | url=self.repo_url, | ||||
visit_date="2016-05-03 15:16:32+00", | visit_date="2016-05-03 15:16:32+00", | ||||
directory=self.destination_path, | directory=self.destination_path, | ||||
) | ) | ||||
self.storage = self.loader.storage | self.storage = self.loader.storage | ||||
self.repo = dulwich.repo.Repo(self.destination_path) | |||||
def load(self): | def load(self): | ||||
return self.loader.load() | return self.loader.load() | ||||
class BaseGitLoaderFromArchiveTest(BaseGitLoaderFromDiskTest): | class BaseGitLoaderFromArchiveTest(BaseGitLoaderFromDiskTest): | ||||
"""Mixin base loader test to prepare the git | """Mixin base loader test to prepare the git | ||||
repository to uncompress, load and test the results. | repository to uncompress, load and test the results. | ||||
Show All 21 Lines | class GitLoaderFromDiskTests: | ||||
def test_load(self): | def test_load(self): | ||||
"""Loads a simple repository (made available by `setUp()`), | """Loads a simple repository (made available by `setUp()`), | ||||
and checks everything was added in the storage.""" | and checks everything was added in the storage.""" | ||||
res = self.load() | res = self.load() | ||||
self.assertEqual(res["status"], "eventful", res) | self.assertEqual(res["status"], "eventful", res) | ||||
self.assertContentsContain(CONTENT1) | self.assertContentsContain(CONTENT1) | ||||
self.assertCountDirectories(7) | self.assertCountDirectories(7) | ||||
self.assertCountReleases(0) # FIXME: why not 2? | self.assertCountReleases(0) # FIXME: should be 2 after T2059 | ||||
self.assertCountRevisions(7) | self.assertCountRevisions(7) | ||||
self.assertCountSnapshots(1) | self.assertCountSnapshots(1) | ||||
self.assertRevisionsContain(REVISIONS1) | self.assertRevisionsContain(REVISIONS1) | ||||
self.assertSnapshotEqual(SNAPSHOT1) | self.assertSnapshotEqual(SNAPSHOT1) | ||||
self.assertEqual(self.loader.load_status(), {"status": "eventful"}) | self.assertEqual(self.loader.load_status(), {"status": "eventful"}) | ||||
Show All 21 Lines | def test_load_unchanged(self): | ||||
self.assertEqual(visit["snapshot"], hash_to_bytes(SNAPSHOT1["id"])) | self.assertEqual(visit["snapshot"], hash_to_bytes(SNAPSHOT1["id"])) | ||||
self.assertEqual(visit["status"], "full") | self.assertEqual(visit["status"], "full") | ||||
class DirGitLoaderTest(BaseDirGitLoaderFromDiskTest, GitLoaderFromDiskTests): | class DirGitLoaderTest(BaseDirGitLoaderFromDiskTest, GitLoaderFromDiskTests): | ||||
"""Tests for the GitLoaderFromDisk. Includes the common ones, and | """Tests for the GitLoaderFromDisk. Includes the common ones, and | ||||
add others that only work with a local dir.""" | add others that only work with a local dir.""" | ||||
def _git(self, *cmd): | |||||
"""Small wrapper around subprocess to call Git.""" | |||||
try: | |||||
return subprocess.check_output( | |||||
["git", "-C", self.destination_path] + list(cmd) | |||||
) | |||||
except subprocess.CalledProcessError as e: | |||||
print(e.output) | |||||
print(e.stderr) | |||||
raise | |||||
def test_load_changed(self): | def test_load_changed(self): | ||||
"""Loads a repository, makes some changes by adding files, commits, | """Loads a repository, makes some changes by adding files, commits, | ||||
and merges, load it again, and check the storage contains everything | and merges, load it again, and check the storage contains everything | ||||
it should.""" | it should.""" | ||||
# Initial load | # Initial load | ||||
res = self.load() | res = self.load() | ||||
self.assertEqual(res["status"], "eventful", res) | self.assertEqual(res["status"], "eventful", res) | ||||
self._git("config", "--local", "user.email", "you@example.com") | |||||
self._git("config", "--local", "user.name", "Your Name") | |||||
# Load with a new file + revision | # Load with a new file + revision | ||||
with open(os.path.join(self.destination_path, "hello.py"), "a") as fd: | with open(os.path.join(self.destination_path, "hello.py"), "a") as fd: | ||||
fd.write("print('Hello world')\n") | fd.write("print('Hello world')\n") | ||||
self._git("add", "hello.py") | self.repo.stage([b"hello.py"]) | ||||
self._git("commit", "-m", "Hello world") | new_revision = self.repo.do_commit(b"Hello world\n").decode() | ||||
new_revision = self._git("rev-parse", "master").decode().strip() | new_dir = "85dae072a5aa9923ffa7a7568f819ff21bf49858" | ||||
assert self.repo[new_revision.encode()].tree == new_dir.encode() | |||||
revisions = REVISIONS1.copy() | revisions = REVISIONS1.copy() | ||||
assert new_revision not in revisions | assert new_revision not in revisions | ||||
revisions[new_revision] = "85dae072a5aa9923ffa7a7568f819ff21bf49858" | revisions[new_revision] = new_dir | ||||
res = self.load() | res = self.load() | ||||
self.assertEqual(res["status"], "eventful") | self.assertEqual(res["status"], "eventful") | ||||
self.assertCountContents(4 + 1) | self.assertCountContents(4 + 1) | ||||
self.assertCountDirectories(7 + 1) | self.assertCountDirectories(7 + 1) | ||||
self.assertCountReleases(0) # FIXME: why not 2? | self.assertCountReleases(0) # FIXME: should be 2 after T2059 | ||||
self.assertCountRevisions(7 + 1) | self.assertCountRevisions(7 + 1) | ||||
self.assertCountSnapshots(1 + 1) | self.assertCountSnapshots(1 + 1) | ||||
self.assertRevisionsContain(revisions) | self.assertRevisionsContain(revisions) | ||||
# TODO: how to check the snapshot id? | |||||
# self.assertSnapshotEqual(SNAPSHOT1) | |||||
self.assertEqual(self.loader.load_status(), {"status": "eventful"}) | self.assertEqual(self.loader.load_status(), {"status": "eventful"}) | ||||
self.assertEqual(self.loader.visit_status(), "full") | self.assertEqual(self.loader.visit_status(), "full") | ||||
visit = self.storage.origin_visit_get_latest(self.repo_url) | visit = self.storage.origin_visit_get_latest(self.repo_url) | ||||
self.assertIsNotNone(visit["snapshot"]) | self.assertIsNotNone(visit["snapshot"]) | ||||
self.assertEqual(visit["status"], "full") | self.assertEqual(visit["status"], "full") | ||||
# Load with a new merge | snapshot_id = visit["snapshot"] | ||||
self._git("merge", "branch1", "-m", "merge") | snapshot = self.storage.snapshot_get(snapshot_id) | ||||
new_revision = self._git("rev-parse", "master").decode().strip() | branches = snapshot["branches"] | ||||
assert branches[b"HEAD"] == { | |||||
"target": b"refs/heads/master", | |||||
"target_type": "alias", | |||||
} | |||||
assert branches[b"refs/heads/master"] == { | |||||
"target": hash_to_bytes(new_revision), | |||||
"target_type": "revision", | |||||
} | |||||
assert new_revision not in revisions | # Merge branch1 into HEAD. | ||||
revisions[new_revision] = "dab8a37df8db8666d4e277bef9a546f585b5bedd" | |||||
current = self.repo[b"HEAD"] | |||||
branch1 = self.repo[b"refs/heads/branch1"] | |||||
merged_tree = dulwich.objects.Tree() | |||||
for item in self.repo[current.tree].items(): | |||||
merged_tree.add(*item) | |||||
for item in self.repo[branch1.tree].items(): | |||||
merged_tree.add(*item) | |||||
merged_dir_id = "dab8a37df8db8666d4e277bef9a546f585b5bedd" | |||||
assert merged_tree.id.decode() == merged_dir_id | |||||
self.repo.object_store.add_object(merged_tree) | |||||
merge_commit = self.repo.do_commit( | |||||
b"merge.\n", tree=merged_tree.id, merge_heads=[branch1.id] | |||||
) | |||||
assert merge_commit.decode() not in revisions | |||||
revisions[merge_commit.decode()] = merged_tree.id.decode() | |||||
res = self.load() | res = self.load() | ||||
self.assertEqual(res["status"], "eventful") | self.assertEqual(res["status"], "eventful") | ||||
self.assertCountContents(4 + 1) | self.assertCountContents(4 + 1) | ||||
self.assertCountDirectories(7 + 2) | self.assertCountDirectories(7 + 2) | ||||
self.assertCountReleases(0) # FIXME: why not 2? | self.assertCountReleases(0) # FIXME: should be 2 after T2059 | ||||
self.assertCountRevisions(7 + 2) | self.assertCountRevisions(7 + 2) | ||||
self.assertCountSnapshots(1 + 1 + 1) | self.assertCountSnapshots(1 + 1 + 1) | ||||
self.assertRevisionsContain(revisions) | self.assertRevisionsContain(revisions) | ||||
# TODO: how to check the snapshot id? | |||||
# self.assertSnapshotEqual(SNAPSHOT1) | |||||
self.assertEqual(self.loader.load_status(), {"status": "eventful"}) | self.assertEqual(self.loader.load_status(), {"status": "eventful"}) | ||||
self.assertEqual(self.loader.visit_status(), "full") | self.assertEqual(self.loader.visit_status(), "full") | ||||
visit = self.storage.origin_visit_get_latest(self.repo_url) | visit = self.storage.origin_visit_get_latest(self.repo_url) | ||||
self.assertIsNotNone(visit["snapshot"]) | self.assertIsNotNone(visit["snapshot"]) | ||||
self.assertEqual(visit["status"], "full") | self.assertEqual(visit["status"], "full") | ||||
merge_snapshot_id = visit["snapshot"] | |||||
assert merge_snapshot_id != snapshot_id | |||||
merge_snapshot = self.storage.snapshot_get(merge_snapshot_id) | |||||
merge_branches = merge_snapshot["branches"] | |||||
assert merge_branches[b"HEAD"] == { | |||||
"target": b"refs/heads/master", | |||||
"target_type": "alias", | |||||
} | |||||
assert merge_branches[b"refs/heads/master"] == { | |||||
"target": hash_to_bytes(merge_commit.decode()), | |||||
"target_type": "revision", | |||||
} | |||||
class GitLoaderFromArchiveTest(BaseGitLoaderFromArchiveTest, GitLoaderFromDiskTests): | class GitLoaderFromArchiveTest(BaseGitLoaderFromArchiveTest, GitLoaderFromDiskTests): | ||||
"""Tests for GitLoaderFromArchive. Imports the common ones | """Tests for GitLoaderFromArchive. Imports the common ones | ||||
from GitLoaderTests.""" | from GitLoaderTests.""" | ||||
pass | pass |