Page MenuHomeSoftware Heritage

test_from_disk.py
No OneTemporary

test_from_disk.py

# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import copy
import datetime
import os.path
import dulwich.objects
import dulwich.porcelain
import dulwich.repo
import pytest
from swh.loader.git.from_disk import GitLoaderFromArchive, GitLoaderFromDisk
from swh.loader.tests import (
assert_last_visit_matches,
check_snapshot,
get_stats,
prepare_repository_from_archive,
)
from swh.model.hashutil import bytehex_to_hash, hash_to_bytes
from swh.model.model import ObjectType, Release, Snapshot, SnapshotBranch, TargetType
from swh.storage.algos.snapshot import snapshot_get_all_branches
SNAPSHOT1 = Snapshot(
id=hash_to_bytes("a23699280a82a043f8c0994cf1631b568f716f95"),
branches={
b"HEAD": SnapshotBranch(
target=b"refs/heads/master", target_type=TargetType.ALIAS,
),
b"refs/heads/master": SnapshotBranch(
target=hash_to_bytes("2f01f5ca7e391a2f08905990277faf81e709a649"),
target_type=TargetType.REVISION,
),
b"refs/heads/branch1": SnapshotBranch(
target=hash_to_bytes("b0a77609903f767a2fd3d769904ef9ef68468b87"),
target_type=TargetType.REVISION,
),
b"refs/heads/branch2": SnapshotBranch(
target=hash_to_bytes("bd746cd1913721b269b395a56a97baf6755151c2"),
target_type=TargetType.REVISION,
),
b"refs/tags/branch2-after-delete": SnapshotBranch(
target=hash_to_bytes("bd746cd1913721b269b395a56a97baf6755151c2"),
target_type=TargetType.REVISION,
),
b"refs/tags/branch2-before-delete": SnapshotBranch(
target=hash_to_bytes("1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b"),
target_type=TargetType.REVISION,
),
},
)
# directory hashes obtained with:
# gco b6f40292c4e94a8f7e7b4aff50e6c7429ab98e2a
# swh-hashtree --ignore '.git' --path .
# gco 2f01f5ca7e391a2f08905990277faf81e709a649
# swh-hashtree --ignore '.git' --path .
# gco bcdc5ebfde1a3cd6c96e0c2ea4eed19c13208777
# swh-hashtree --ignore '.git' --path .
# gco 1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b
# swh-hashtree --ignore '.git' --path .
# gco 79f65ac75f79dda6ff03d66e1242702ab67fb51c
# swh-hashtree --ignore '.git' --path .
# gco b0a77609903f767a2fd3d769904ef9ef68468b87
# swh-hashtree --ignore '.git' --path .
# gco bd746cd1913721b269b395a56a97baf6755151c2
# swh-hashtree --ignore '.git' --path .
REVISIONS1 = {
"b6f40292c4e94a8f7e7b4aff50e6c7429ab98e2a": (
"40dbdf55dfd4065422462cc74a949254aefa972e"
),
"2f01f5ca7e391a2f08905990277faf81e709a649": (
"e1d0d894835f91a0f887a4bc8b16f81feefdfbd5"
),
"bcdc5ebfde1a3cd6c96e0c2ea4eed19c13208777": (
"b43724545b4759244bb54be053c690649161411c"
),
"1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b": (
"fbf70528223d263661b5ad4b80f26caf3860eb8e"
),
"79f65ac75f79dda6ff03d66e1242702ab67fb51c": (
"5df34ec74d6f69072d9a0a6677d8efbed9b12e60"
),
"b0a77609903f767a2fd3d769904ef9ef68468b87": (
"9ca0c7d6ffa3f9f0de59fd7912e08f11308a1338"
),
"bd746cd1913721b269b395a56a97baf6755151c2": (
"e1d0d894835f91a0f887a4bc8b16f81feefdfbd5"
),
}
class CommonGitLoaderTests:
"""Common tests for all git loaders."""
def test_load(self):
"""Loads a simple repository (made available by `setUp()`),
and checks everything was added in the storage."""
res = self.loader.load()
assert res == {"status": "eventful"}
assert_last_visit_matches(
self.loader.storage,
self.repo_url,
status="full",
type="git",
snapshot=SNAPSHOT1.id,
)
stats = get_stats(self.loader.storage)
assert stats == {
"content": 4,
"directory": 7,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 7,
"skipped_content": 0,
"snapshot": 1,
}
check_snapshot(SNAPSHOT1, self.loader.storage)
def test_load_unchanged(self):
"""Checks loading a repository a second time does not add
any extra data."""
res = self.loader.load()
assert res == {"status": "eventful"}
assert_last_visit_matches(
self.loader.storage,
self.repo_url,
status="full",
type="git",
snapshot=SNAPSHOT1.id,
)
stats0 = get_stats(self.loader.storage)
assert stats0 == {
"content": 4,
"directory": 7,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 7,
"skipped_content": 0,
"snapshot": 1,
}
res = self.loader.load()
assert res == {"status": "uneventful"}
stats1 = get_stats(self.loader.storage)
expected_stats = copy.deepcopy(stats0)
expected_stats["origin_visit"] += 1
assert stats1 == expected_stats
check_snapshot(SNAPSHOT1, self.loader.storage)
assert_last_visit_matches(
self.loader.storage,
self.repo_url,
status="full",
type="git",
snapshot=SNAPSHOT1.id,
)
def test_load_visit_without_snapshot_so_status_failed(self):
# unfortunately, monkey-patch the hard way, self.loader is already instantiated
# (patching won't work self.loader is already instantiated)
# Make get_contents fail for some reason
self.loader.get_contents = None
res = self.loader.load()
assert res == {"status": "failed"}
assert_last_visit_matches(
self.loader.storage,
self.repo_url,
status="failed",
type="git",
snapshot=None,
)
def test_load_visit_with_snapshot_so_status_partial(self):
# unfortunately, monkey-patch the hard way, self.loader is already instantiated
# (patching won't work self.loader is already instantiated)
# fake store_metadata raising for some reason, so we could have a snapshot id
# at this point in time
self.loader.store_metadata = None
# fake having a snapshot so the visit status is partial
self.loader.loaded_snapshot_id = hash_to_bytes(
"a23699280a82a043f8c0994cf1631b568f716f95"
)
res = self.loader.load()
assert res == {"status": "failed"}
assert_last_visit_matches(
self.loader.storage,
self.repo_url,
status="partial",
type="git",
snapshot=None,
)
class FullGitLoaderTests(CommonGitLoaderTests):
"""Tests for GitLoader (from disk or not). Includes the common ones, and
add others that only work with a local dir.
"""
def test_load_changed(self):
"""Loads a repository, makes some changes by adding files, commits,
and merges, load it again, and check the storage contains everything
it should."""
# Initial load
res = self.loader.load()
assert res == {"status": "eventful"}
stats0 = get_stats(self.loader.storage)
assert stats0 == {
"content": 4,
"directory": 7,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 7,
"skipped_content": 0,
"snapshot": 1,
}
# Load with a new file + revision
with open(os.path.join(self.destination_path, "hello.py"), "a") as fd:
fd.write("print('Hello world')\n")
self.repo.stage([b"hello.py"])
new_revision = self.repo.do_commit(b"Hello world\n").decode()
new_dir = "85dae072a5aa9923ffa7a7568f819ff21bf49858"
assert self.repo[new_revision.encode()].tree == new_dir.encode()
revisions = REVISIONS1.copy()
assert new_revision not in revisions
revisions[new_revision] = new_dir
res = self.loader.load()
assert res == {"status": "eventful"}
stats1 = get_stats(self.loader.storage)
expected_stats = copy.deepcopy(stats0)
# did one new visit
expected_stats["origin_visit"] += 1
# with one more of the following objects
expected_stats["content"] += 1
expected_stats["directory"] += 1
expected_stats["revision"] += 1
# concluding into 1 new snapshot
expected_stats["snapshot"] += 1
assert stats1 == expected_stats
visit_status = assert_last_visit_matches(
self.loader.storage, self.repo_url, status="full", type="git"
)
assert visit_status.snapshot is not None
snapshot_id = visit_status.snapshot
snapshot = snapshot_get_all_branches(self.loader.storage, snapshot_id)
branches = snapshot.branches
assert branches[b"HEAD"] == SnapshotBranch(
target=b"refs/heads/master", target_type=TargetType.ALIAS,
)
assert branches[b"refs/heads/master"] == SnapshotBranch(
target=hash_to_bytes(new_revision), target_type=TargetType.REVISION,
)
# Merge branch1 into HEAD.
current = self.repo[b"HEAD"]
branch1 = self.repo[b"refs/heads/branch1"]
merged_tree = dulwich.objects.Tree()
for item in self.repo[current.tree].items():
merged_tree.add(*item)
for item in self.repo[branch1.tree].items():
merged_tree.add(*item)
merged_dir_id = "dab8a37df8db8666d4e277bef9a546f585b5bedd"
assert merged_tree.id.decode() == merged_dir_id
self.repo.object_store.add_object(merged_tree)
merge_commit = self.repo.do_commit(
b"merge.\n", tree=merged_tree.id, merge_heads=[branch1.id]
)
assert merge_commit.decode() not in revisions
revisions[merge_commit.decode()] = merged_tree.id.decode()
res = self.loader.load()
assert res == {"status": "eventful"}
stats2 = get_stats(self.loader.storage)
expected_stats = copy.deepcopy(stats1)
# one more visit
expected_stats["origin_visit"] += 1
# with 1 new directory and revision
expected_stats["directory"] += 1
expected_stats["revision"] += 1
# concluding into 1 new snapshot
expected_stats["snapshot"] += 1
assert stats2 == expected_stats
visit_status = assert_last_visit_matches(
self.loader.storage, self.repo_url, status="full", type="git"
)
assert visit_status.snapshot is not None
merge_snapshot_id = visit_status.snapshot
assert merge_snapshot_id != snapshot_id
merge_snapshot = snapshot_get_all_branches(
self.loader.storage, merge_snapshot_id
)
merge_branches = merge_snapshot.branches
assert merge_branches[b"HEAD"] == SnapshotBranch(
target=b"refs/heads/master", target_type=TargetType.ALIAS,
)
assert merge_branches[b"refs/heads/master"] == SnapshotBranch(
target=hash_to_bytes(merge_commit.decode()),
target_type=TargetType.REVISION,
)
def test_load_filter_branches(self):
filtered_branches = {b"refs/pull/42/merge"}
unfiltered_branches = {b"refs/pull/42/head"}
# Add branches to the repository on disk; some should be filtered by
# the loader, some should not.
for branch_name in filtered_branches | unfiltered_branches:
self.repo[branch_name] = self.repo[b"refs/heads/master"]
# Generate the expected snapshot from SNAPSHOT1 (which is the original
# state of the git repo)...
branches = dict(SNAPSHOT1.branches)
# ... and the unfiltered_branches, which are all pointing to the same
# commit as "refs/heads/master".
for branch_name in unfiltered_branches:
branches[branch_name] = branches[b"refs/heads/master"]
expected_snapshot = Snapshot(branches=branches)
# Load the modified repository
res = self.loader.load()
assert res == {"status": "eventful"}
check_snapshot(expected_snapshot, self.loader.storage)
assert_last_visit_matches(
self.loader.storage,
self.repo_url,
status="full",
type="git",
snapshot=expected_snapshot.id,
)
def test_load_dangling_symref(self):
with open(os.path.join(self.destination_path, ".git/HEAD"), "wb") as f:
f.write(b"ref: refs/heads/dangling-branch\n")
res = self.loader.load()
assert res == {"status": "eventful"}
visit_status = assert_last_visit_matches(
self.loader.storage, self.repo_url, status="full", type="git"
)
snapshot_id = visit_status.snapshot
assert snapshot_id is not None
snapshot = snapshot_get_all_branches(self.loader.storage, snapshot_id)
branches = snapshot.branches
assert branches[b"HEAD"] == SnapshotBranch(
target=b"refs/heads/dangling-branch", target_type=TargetType.ALIAS,
)
assert branches[b"refs/heads/dangling-branch"] is None
stats = get_stats(self.loader.storage)
assert stats == {
"content": 4,
"directory": 7,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 7,
"skipped_content": 0,
"snapshot": 1,
}
def test_load_empty_tree(self):
empty_dir_id = "4b825dc642cb6eb9a060e54bf8d69288fbee4904"
# Check the empty tree does not already exist for some reason
# (it would make this test pointless)
assert list(
self.loader.storage.directory_missing([hash_to_bytes(empty_dir_id)])
) == [hash_to_bytes(empty_dir_id)]
empty_tree = dulwich.objects.Tree()
assert empty_tree.id.decode() == empty_dir_id
self.repo.object_store.add_object(empty_tree)
self.repo.do_commit(b"remove all bugs\n", tree=empty_tree.id)
res = self.loader.load()
assert res == {"status": "eventful"}
assert (
list(self.loader.storage.directory_missing([hash_to_bytes(empty_dir_id)]))
== []
)
results = self.loader.storage.directory_get_entries(hash_to_bytes(empty_dir_id))
assert results.next_page_token is None
assert results.results == []
def test_load_tag(self):
with open(os.path.join(self.destination_path, "hello.py"), "a") as fd:
fd.write("print('Hello world')\n")
self.repo.stage([b"hello.py"])
new_revision = self.repo.do_commit(b"Hello world\n")
dulwich.porcelain.tag_create(
self.repo,
b"v1.0.0",
message=b"First release!",
annotated=True,
objectish=new_revision,
)
res = self.loader.load()
assert res == {"status": "eventful"}
branches = self.loader.storage.snapshot_get_branches(self.loader.snapshot.id)
branch = branches["branches"][b"refs/tags/v1.0.0"]
assert branch.target_type == TargetType.RELEASE
release = self.loader.storage.release_get([branch.target])[0]
assert release.date is not None
assert release.author is not None
assert release == Release(
name=b"v1.0.0",
message=b"First release!\n",
target_type=ObjectType.REVISION,
target=bytehex_to_hash(new_revision),
author=release.author,
date=release.date,
synthetic=False,
)
def test_load_tag_minimal(self):
with open(os.path.join(self.destination_path, "hello.py"), "a") as fd:
fd.write("print('Hello world')\n")
self.repo.stage([b"hello.py"])
new_revision = self.repo.do_commit(b"Hello world\n")
# dulwich.porcelain.tag_create doesn't allow creating tags without
# a tagger or a date, so we have to create it "manually"
tag = dulwich.objects.Tag()
tag.message = b"First release!\n"
tag.name = b"v1.0.0"
tag.object = (dulwich.objects.Commit, new_revision)
self.repo.object_store.add_object(tag)
self.repo[b"refs/tags/v1.0.0"] = tag.id
res = self.loader.load()
assert res == {"status": "eventful"}
branches = self.loader.storage.snapshot_get_branches(self.loader.snapshot.id)
print(list(branches["branches"]))
branch = branches["branches"][b"refs/tags/v1.0.0"]
assert branch.target_type == TargetType.RELEASE
release = self.loader.storage.release_get([branch.target])[0]
assert release == Release(
id=bytehex_to_hash(tag.id),
name=b"v1.0.0",
message=b"First release!\n",
target_type=ObjectType.REVISION,
target=bytehex_to_hash(new_revision),
synthetic=False,
)
class TestGitLoaderFromDisk(FullGitLoaderTests):
"""Prepare a git directory repository to be loaded through a GitLoaderFromDisk.
This tests all git loader scenario.
"""
@pytest.fixture(autouse=True)
def init(self, swh_storage, datadir, tmp_path):
archive_name = "testrepo"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
tmp_path = str(tmp_path)
self.repo_url = prepare_repository_from_archive(
archive_path, archive_name, tmp_path=tmp_path
)
self.destination_path = os.path.join(tmp_path, archive_name)
self.loader = GitLoaderFromDisk(
swh_storage,
url=self.repo_url,
visit_date=datetime.datetime(
2016, 5, 3, 15, 16, 32, tzinfo=datetime.timezone.utc
),
directory=self.destination_path,
)
self.repo = dulwich.repo.Repo(self.destination_path)
class TestGitLoaderFromArchive(CommonGitLoaderTests):
"""Tests for GitLoaderFromArchive. Only tests common scenario."""
@pytest.fixture(autouse=True)
def init(self, swh_storage, datadir, tmp_path):
archive_name = "testrepo"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
self.repo_url = archive_path
self.loader = GitLoaderFromArchive(
swh_storage,
url=self.repo_url,
archive_path=archive_path,
visit_date=datetime.datetime(
2016, 5, 3, 15, 16, 32, tzinfo=datetime.timezone.utc
),
)

File Metadata

Mime Type
text/x-python
Expires
Thu, Jul 3, 12:21 PM (2 d, 2 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3405668

Event Timeline