diff --git a/swh/loader/git/from_disk.py b/swh/loader/git/from_disk.py index 7dd1d10..70995f8 100644 --- a/swh/loader/git/from_disk.py +++ b/swh/loader/git/from_disk.py @@ -1,390 +1,448 @@ # Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from datetime import datetime import os import shutil from typing import Dict, Optional from dulwich.errors import ObjectFormatException try: from dulwich.errors import EmptyFileException # type: ignore except ImportError: # dulwich >= 0.20 from dulwich.objects import EmptyFileException +import dulwich.objects import dulwich.repo from swh.loader.core.loader import DVCSLoader from swh.model import hashutil from swh.model.model import Origin, Snapshot, SnapshotBranch, TargetType from swh.storage.algos.origin import origin_get_latest_visit_status from swh.storage.interface import StorageInterface from . import converters, utils +def _check_tag(tag): + """Copy-paste of dulwich.objects.Tag, minus the tagger and time checks, + which are too strict and error on old tags.""" + # Copyright (C) 2007 James Westby + # Copyright (C) 2008-2013 Jelmer Vernooij + # + # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU + # General Public License as public by the Free Software Foundation; version 2.0 + # or (at your option) any later version. You can redistribute it and/or + # modify it under the terms of either of these two licenses. + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + # + # You should have received a copy of the licenses; if not, see + # for a copy of the GNU General Public License + # and for a copy of the Apache + # License, Version 2.0. + dulwich.objects.ShaFile.check(tag) + tag._check_has_member("_object_sha", "missing object sha") + tag._check_has_member("_object_class", "missing object type") + tag._check_has_member("_name", "missing tag name") + + if not tag._name: + raise ObjectFormatException("empty tag name") + + dulwich.objects.check_hexsha(tag._object_sha, "invalid object sha") + + if tag._tag_time is not None: + dulwich.objects.check_time(tag._tag_time) + + from dulwich.objects import ( + _OBJECT_HEADER, + _TAG_HEADER, + _TAGGER_HEADER, + _TYPE_HEADER, + ) + + last = None + for field, _ in dulwich.objects._parse_message(tag._chunked_text): + if field == _OBJECT_HEADER and last is not None: + raise ObjectFormatException("unexpected object") + elif field == _TYPE_HEADER and last != _OBJECT_HEADER: + raise ObjectFormatException("unexpected type") + elif field == _TAG_HEADER and last != _TYPE_HEADER: + raise ObjectFormatException("unexpected tag name") + elif field == _TAGGER_HEADER and last != _TAG_HEADER: + raise ObjectFormatException("unexpected tagger") + last = field + + class GitLoaderFromDisk(DVCSLoader): """Load a git repository from a directory. """ visit_type = "git" def __init__( self, storage: StorageInterface, url: str, visit_date: Optional[datetime] = None, directory: Optional[str] = None, save_data_path: Optional[str] = None, max_content_size: Optional[int] = None, ): super().__init__( storage=storage, save_data_path=save_data_path, max_content_size=max_content_size, ) self.origin_url = url self.visit_date = visit_date self.directory = directory def prepare_origin_visit(self): self.origin = Origin(url=self.origin_url) def prepare(self): self.repo = dulwich.repo.Repo(self.directory) def iter_objects(self): object_store = self.repo.object_store for pack in object_store.packs: objs = list(pack.index.iterentries()) objs.sort(key=lambda x: x[1]) for sha, offset, crc32 in objs: yield hashutil.hash_to_bytehex(sha) yield from object_store._iter_loose_objects() yield from object_store._iter_alternate_objects() def _check(self, obj): """Check the object's repository representation. If any errors in check exists, an ObjectFormatException is raised. Args: obj (object): Dulwich object read from the repository. """ - obj.check() - from dulwich.objects import Commit, Tag + if isinstance(obj, dulwich.objects.Tag): + _check_tag(obj) + else: + obj.check() try: # For additional checks on dulwich objects with date # for now, only checks on *time - if isinstance(obj, Commit): + if isinstance(obj, dulwich.objects.Commit): commit_time = obj._commit_time utils.check_date_time(commit_time) author_time = obj._author_time utils.check_date_time(author_time) - elif isinstance(obj, Tag): + elif isinstance(obj, dulwich.objects.Tag): tag_time = obj._tag_time - utils.check_date_time(tag_time) + if tag_time: + utils.check_date_time(tag_time) except Exception as e: raise ObjectFormatException(e) def get_object(self, oid): """Given an object id, return the object if it is found and not malformed in some way. Args: oid (bytes): the object's identifier Returns: The object if found without malformation """ try: # some errors are raised when reading the object obj = self.repo[oid] # some we need to check ourselves self._check(obj) except KeyError: _id = oid.decode("utf-8") self.log.warn( "object %s not found, skipping" % _id, extra={ "swh_type": "swh_loader_git_missing_object", "swh_object_id": _id, "origin_url": self.origin.url, }, ) return None except ObjectFormatException as e: id_ = oid.decode("utf-8") self.log.warn( "object %s malformed (%s), skipping", id_, e.args[0], extra={ "swh_type": "swh_loader_git_missing_object", "swh_object_id": id_, "origin_url": self.origin.url, }, ) return None except EmptyFileException: id_ = oid.decode("utf-8") self.log.warn( "object %s corrupted (empty file), skipping", id_, extra={ "swh_type": "swh_loader_git_missing_object", "swh_object_id": id_, "origin_url": self.origin.url, }, ) else: return obj def fetch_data(self): """Fetch the data from the data source""" visit_status = origin_get_latest_visit_status( self.storage, self.origin_url, require_snapshot=True ) self.previous_snapshot_id = ( None if visit_status is None else visit_status.snapshot ) type_to_ids = defaultdict(list) for oid in self.iter_objects(): obj = self.get_object(oid) if obj is None: continue type_name = obj.type_name type_to_ids[type_name].append(oid) self.type_to_ids = type_to_ids def has_contents(self): """Checks whether we need to load contents""" return bool(self.type_to_ids[b"blob"]) def get_content_ids(self): """Get the content identifiers from the git repository""" for oid in self.type_to_ids[b"blob"]: yield converters.dulwich_blob_to_content_id(self.repo[oid]) def get_contents(self): """Get the contents that need to be loaded""" missing_contents = set( self.storage.content_missing(self.get_content_ids(), "sha1_git") ) for oid in missing_contents: yield converters.dulwich_blob_to_content( self.repo[hashutil.hash_to_bytehex(oid)] ) def has_directories(self): """Checks whether we need to load directories""" return bool(self.type_to_ids[b"tree"]) def get_directory_ids(self): """Get the directory identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b"tree"]) def get_directories(self): """Get the directories that need to be loaded""" missing_dirs = set( self.storage.directory_missing(sorted(self.get_directory_ids())) ) for oid in missing_dirs: yield converters.dulwich_tree_to_directory( self.repo[hashutil.hash_to_bytehex(oid)], log=self.log ) def has_revisions(self): """Checks whether we need to load revisions""" return bool(self.type_to_ids[b"commit"]) def get_revision_ids(self): """Get the revision identifiers from the git repository""" return ( hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b"commit"] ) def get_revisions(self): """Get the revisions that need to be loaded""" missing_revs = set( self.storage.revision_missing(sorted(self.get_revision_ids())) ) for oid in missing_revs: yield converters.dulwich_commit_to_revision( self.repo[hashutil.hash_to_bytehex(oid)], log=self.log ) def has_releases(self): """Checks whether we need to load releases""" return bool(self.type_to_ids[b"tag"]) def get_release_ids(self): """Get the release identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b"tag"]) def get_releases(self): """Get the releases that need to be loaded""" missing_rels = set(self.storage.release_missing(sorted(self.get_release_ids()))) for oid in missing_rels: yield converters.dulwich_tag_to_release( self.repo[hashutil.hash_to_bytehex(oid)], log=self.log ) def get_snapshot(self): """Turn the list of branches into a snapshot to load""" branches: Dict[bytes, Optional[SnapshotBranch]] = {} for ref, target in self.repo.refs.as_dict().items(): if utils.ignore_branch_name(ref): continue obj = self.get_object(target) if obj: target_type = converters.DULWICH_TARGET_TYPES[obj.type_name] branches[ref] = SnapshotBranch( target=hashutil.bytehex_to_hash(target), target_type=target_type, ) else: branches[ref] = None dangling_branches = {} for ref, target in self.repo.refs.get_symrefs().items(): if utils.ignore_branch_name(ref): continue branches[ref] = SnapshotBranch(target=target, target_type=TargetType.ALIAS) if target not in branches: # This handles the case where the pointer is "dangling". # There's a chance that a further symbolic reference will # override this default value, which is totally fine. dangling_branches[target] = ref branches[target] = None utils.warn_dangling_branches( branches, dangling_branches, self.log, self.origin_url ) self.snapshot = Snapshot(branches=branches) return self.snapshot def save_data(self): """We already have the data locally, no need to save it""" pass def load_status(self): """The load was eventful if the current occurrences are different to the ones we retrieved at the beginning of the run""" eventful = False if self.previous_snapshot_id: eventful = self.snapshot.id != self.previous_snapshot_id else: eventful = bool(self.snapshot.branches) return {"status": ("eventful" if eventful else "uneventful")} class GitLoaderFromArchive(GitLoaderFromDisk): """Load a git repository from an archive. This loader ingests a git repository compressed into an archive. The supported archive formats are ``.zip`` and ``.tar.gz``. From an input tarball named ``my-git-repo.zip``, the following layout is expected in it:: my-git-repo/ ├── .git │ ├── branches │ ├── COMMIT_EDITMSG │ ├── config │ ├── description │ ├── HEAD ... Nevertheless, the loader is able to ingest tarballs with the following layouts too:: . ├── .git │ ├── branches │ ├── COMMIT_EDITMSG │ ├── config │ ├── description │ ├── HEAD ... or:: other-repo-name/ ├── .git │ ├── branches │ ├── COMMIT_EDITMSG │ ├── config │ ├── description │ ├── HEAD ... """ def __init__(self, *args, archive_path, **kwargs): super().__init__(*args, **kwargs) self.temp_dir = self.repo_path = None self.archive_path = archive_path def project_name_from_archive(self, archive_path): """Compute the project name from the archive's path. """ archive_name = os.path.basename(archive_path) for ext in (".zip", ".tar.gz", ".tgz"): if archive_name.lower().endswith(ext): archive_name = archive_name[: -len(ext)] break return archive_name def prepare(self): """1. Uncompress the archive in temporary location. 2. Prepare as the GitLoaderFromDisk does 3. Load as GitLoaderFromDisk does """ project_name = self.project_name_from_archive(self.archive_path) self.temp_dir, self.repo_path = utils.init_git_repo_from_archive( project_name, self.archive_path ) self.log.info( "Project %s - Uncompressing archive %s at %s", self.origin_url, os.path.basename(self.archive_path), self.repo_path, ) self.directory = self.repo_path super().prepare() def cleanup(self): """Cleanup the temporary location (if it exists). """ if self.temp_dir and os.path.exists(self.temp_dir): shutil.rmtree(self.temp_dir) self.log.info( "Project %s - Done injecting %s" % (self.origin_url, self.repo_path) ) diff --git a/swh/loader/git/tests/test_from_disk.py b/swh/loader/git/tests/test_from_disk.py index a0e9b28..7bbf572 100644 --- a/swh/loader/git/tests/test_from_disk.py +++ b/swh/loader/git/tests/test_from_disk.py @@ -1,471 +1,545 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime import os.path from unittest import TestCase +import dulwich.objects +import dulwich.porcelain import dulwich.repo import pytest from swh.loader.git.from_disk import GitLoaderFromArchive, GitLoaderFromDisk from swh.loader.tests import ( assert_last_visit_matches, check_snapshot, get_stats, prepare_repository_from_archive, ) -from swh.model.hashutil import hash_to_bytes -from swh.model.model import Snapshot, SnapshotBranch, TargetType +from swh.model.hashutil import bytehex_to_hash, hash_to_bytes +from swh.model.model import ObjectType, Release, Snapshot, SnapshotBranch, TargetType from swh.storage.algos.snapshot import snapshot_get_all_branches SNAPSHOT1 = Snapshot( id=hash_to_bytes("a23699280a82a043f8c0994cf1631b568f716f95"), branches={ b"HEAD": SnapshotBranch( target=b"refs/heads/master", target_type=TargetType.ALIAS, ), b"refs/heads/master": SnapshotBranch( target=hash_to_bytes("2f01f5ca7e391a2f08905990277faf81e709a649"), target_type=TargetType.REVISION, ), b"refs/heads/branch1": SnapshotBranch( target=hash_to_bytes("b0a77609903f767a2fd3d769904ef9ef68468b87"), target_type=TargetType.REVISION, ), b"refs/heads/branch2": SnapshotBranch( target=hash_to_bytes("bd746cd1913721b269b395a56a97baf6755151c2"), target_type=TargetType.REVISION, ), b"refs/tags/branch2-after-delete": SnapshotBranch( target=hash_to_bytes("bd746cd1913721b269b395a56a97baf6755151c2"), target_type=TargetType.REVISION, ), b"refs/tags/branch2-before-delete": SnapshotBranch( target=hash_to_bytes("1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b"), target_type=TargetType.REVISION, ), }, ) # directory hashes obtained with: # gco b6f40292c4e94a8f7e7b4aff50e6c7429ab98e2a # swh-hashtree --ignore '.git' --path . # gco 2f01f5ca7e391a2f08905990277faf81e709a649 # swh-hashtree --ignore '.git' --path . # gco bcdc5ebfde1a3cd6c96e0c2ea4eed19c13208777 # swh-hashtree --ignore '.git' --path . # gco 1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b # swh-hashtree --ignore '.git' --path . # gco 79f65ac75f79dda6ff03d66e1242702ab67fb51c # swh-hashtree --ignore '.git' --path . # gco b0a77609903f767a2fd3d769904ef9ef68468b87 # swh-hashtree --ignore '.git' --path . # gco bd746cd1913721b269b395a56a97baf6755151c2 # swh-hashtree --ignore '.git' --path . REVISIONS1 = { "b6f40292c4e94a8f7e7b4aff50e6c7429ab98e2a": ( "40dbdf55dfd4065422462cc74a949254aefa972e" ), "2f01f5ca7e391a2f08905990277faf81e709a649": ( "e1d0d894835f91a0f887a4bc8b16f81feefdfbd5" ), "bcdc5ebfde1a3cd6c96e0c2ea4eed19c13208777": ( "b43724545b4759244bb54be053c690649161411c" ), "1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b": ( "fbf70528223d263661b5ad4b80f26caf3860eb8e" ), "79f65ac75f79dda6ff03d66e1242702ab67fb51c": ( "5df34ec74d6f69072d9a0a6677d8efbed9b12e60" ), "b0a77609903f767a2fd3d769904ef9ef68468b87": ( "9ca0c7d6ffa3f9f0de59fd7912e08f11308a1338" ), "bd746cd1913721b269b395a56a97baf6755151c2": ( "e1d0d894835f91a0f887a4bc8b16f81feefdfbd5" ), } class CommonGitLoaderTests: """Common tests for all git loaders.""" def test_load(self): """Loads a simple repository (made available by `setUp()`), and checks everything was added in the storage.""" res = self.loader.load() assert res == {"status": "eventful"} assert_last_visit_matches( self.loader.storage, self.repo_url, status="full", type="git", snapshot=SNAPSHOT1.id, ) stats = get_stats(self.loader.storage) assert stats == { "content": 4, "directory": 7, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } check_snapshot(SNAPSHOT1, self.loader.storage) def test_load_unchanged(self): """Checks loading a repository a second time does not add any extra data.""" res = self.loader.load() assert res == {"status": "eventful"} assert_last_visit_matches( self.loader.storage, self.repo_url, status="full", type="git", snapshot=SNAPSHOT1.id, ) stats0 = get_stats(self.loader.storage) assert stats0 == { "content": 4, "directory": 7, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } res = self.loader.load() assert res == {"status": "uneventful"} stats1 = get_stats(self.loader.storage) expected_stats = copy.deepcopy(stats0) expected_stats["origin_visit"] += 1 assert stats1 == expected_stats check_snapshot(SNAPSHOT1, self.loader.storage) assert_last_visit_matches( self.loader.storage, self.repo_url, status="full", type="git", snapshot=SNAPSHOT1.id, ) def test_load_visit_without_snapshot_so_status_failed(self): # unfortunately, monkey-patch the hard way, self.loader is already instantiated # (patching won't work self.loader is already instantiated) # Make get_contents fail for some reason self.loader.get_contents = None res = self.loader.load() assert res == {"status": "failed"} assert_last_visit_matches( self.loader.storage, self.repo_url, status="failed", type="git", snapshot=None, ) def test_load_visit_with_snapshot_so_status_partial(self): # unfortunately, monkey-patch the hard way, self.loader is already instantiated # (patching won't work self.loader is already instantiated) # fake store_metadata raising for some reason, so we could have a snapshot id # at this point in time self.loader.store_metadata = None # fake having a snapshot so the visit status is partial self.loader.loaded_snapshot_id = hash_to_bytes( "a23699280a82a043f8c0994cf1631b568f716f95" ) res = self.loader.load() assert res == {"status": "failed"} assert_last_visit_matches( self.loader.storage, self.repo_url, status="partial", type="git", snapshot=None, ) class FullGitLoaderTests(CommonGitLoaderTests): """Tests for GitLoader (from disk or not). Includes the common ones, and add others that only work with a local dir. """ def test_load_changed(self): """Loads a repository, makes some changes by adding files, commits, and merges, load it again, and check the storage contains everything it should.""" # Initial load res = self.loader.load() assert res == {"status": "eventful"} stats0 = get_stats(self.loader.storage) assert stats0 == { "content": 4, "directory": 7, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } # Load with a new file + revision with open(os.path.join(self.destination_path, "hello.py"), "a") as fd: fd.write("print('Hello world')\n") self.repo.stage([b"hello.py"]) new_revision = self.repo.do_commit(b"Hello world\n").decode() new_dir = "85dae072a5aa9923ffa7a7568f819ff21bf49858" assert self.repo[new_revision.encode()].tree == new_dir.encode() revisions = REVISIONS1.copy() assert new_revision not in revisions revisions[new_revision] = new_dir res = self.loader.load() assert res == {"status": "eventful"} stats1 = get_stats(self.loader.storage) expected_stats = copy.deepcopy(stats0) # did one new visit expected_stats["origin_visit"] += 1 # with one more of the following objects expected_stats["content"] += 1 expected_stats["directory"] += 1 expected_stats["revision"] += 1 # concluding into 1 new snapshot expected_stats["snapshot"] += 1 assert stats1 == expected_stats visit_status = assert_last_visit_matches( self.loader.storage, self.repo_url, status="full", type="git" ) assert visit_status.snapshot is not None snapshot_id = visit_status.snapshot snapshot = snapshot_get_all_branches(self.loader.storage, snapshot_id) branches = snapshot.branches assert branches[b"HEAD"] == SnapshotBranch( target=b"refs/heads/master", target_type=TargetType.ALIAS, ) assert branches[b"refs/heads/master"] == SnapshotBranch( target=hash_to_bytes(new_revision), target_type=TargetType.REVISION, ) # Merge branch1 into HEAD. current = self.repo[b"HEAD"] branch1 = self.repo[b"refs/heads/branch1"] merged_tree = dulwich.objects.Tree() for item in self.repo[current.tree].items(): merged_tree.add(*item) for item in self.repo[branch1.tree].items(): merged_tree.add(*item) merged_dir_id = "dab8a37df8db8666d4e277bef9a546f585b5bedd" assert merged_tree.id.decode() == merged_dir_id self.repo.object_store.add_object(merged_tree) merge_commit = self.repo.do_commit( b"merge.\n", tree=merged_tree.id, merge_heads=[branch1.id] ) assert merge_commit.decode() not in revisions revisions[merge_commit.decode()] = merged_tree.id.decode() res = self.loader.load() assert res == {"status": "eventful"} stats2 = get_stats(self.loader.storage) expected_stats = copy.deepcopy(stats1) # one more visit expected_stats["origin_visit"] += 1 # with 1 new directory and revision expected_stats["directory"] += 1 expected_stats["revision"] += 1 # concluding into 1 new snapshot expected_stats["snapshot"] += 1 assert stats2 == expected_stats visit_status = assert_last_visit_matches( self.loader.storage, self.repo_url, status="full", type="git" ) assert visit_status.snapshot is not None merge_snapshot_id = visit_status.snapshot assert merge_snapshot_id != snapshot_id merge_snapshot = snapshot_get_all_branches( self.loader.storage, merge_snapshot_id ) merge_branches = merge_snapshot.branches assert merge_branches[b"HEAD"] == SnapshotBranch( target=b"refs/heads/master", target_type=TargetType.ALIAS, ) assert merge_branches[b"refs/heads/master"] == SnapshotBranch( target=hash_to_bytes(merge_commit.decode()), target_type=TargetType.REVISION, ) def test_load_filter_branches(self): filtered_branches = {b"refs/pull/42/merge"} unfiltered_branches = {b"refs/pull/42/head"} # Add branches to the repository on disk; some should be filtered by # the loader, some should not. for branch_name in filtered_branches | unfiltered_branches: self.repo[branch_name] = self.repo[b"refs/heads/master"] # Generate the expected snapshot from SNAPSHOT1 (which is the original # state of the git repo)... branches = dict(SNAPSHOT1.branches) # ... and the unfiltered_branches, which are all pointing to the same # commit as "refs/heads/master". for branch_name in unfiltered_branches: branches[branch_name] = branches[b"refs/heads/master"] expected_snapshot = Snapshot(branches=branches) # Load the modified repository res = self.loader.load() assert res == {"status": "eventful"} check_snapshot(expected_snapshot, self.loader.storage) assert_last_visit_matches( self.loader.storage, self.repo_url, status="full", type="git", snapshot=expected_snapshot.id, ) def test_load_dangling_symref(self): with open(os.path.join(self.destination_path, ".git/HEAD"), "wb") as f: f.write(b"ref: refs/heads/dangling-branch\n") res = self.loader.load() assert res == {"status": "eventful"} visit_status = assert_last_visit_matches( self.loader.storage, self.repo_url, status="full", type="git" ) snapshot_id = visit_status.snapshot assert snapshot_id is not None snapshot = snapshot_get_all_branches(self.loader.storage, snapshot_id) branches = snapshot.branches assert branches[b"HEAD"] == SnapshotBranch( target=b"refs/heads/dangling-branch", target_type=TargetType.ALIAS, ) assert branches[b"refs/heads/dangling-branch"] is None stats = get_stats(self.loader.storage) assert stats == { "content": 4, "directory": 7, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } def test_load_empty_tree(self): empty_dir_id = "4b825dc642cb6eb9a060e54bf8d69288fbee4904" # Check the empty tree does not already exist for some reason # (it would make this test pointless) assert list( self.loader.storage.directory_missing([hash_to_bytes(empty_dir_id)]) ) == [hash_to_bytes(empty_dir_id)] empty_tree = dulwich.objects.Tree() assert empty_tree.id.decode() == empty_dir_id self.repo.object_store.add_object(empty_tree) self.repo.do_commit(b"remove all bugs\n", tree=empty_tree.id) res = self.loader.load() assert res == {"status": "eventful"} assert ( list(self.loader.storage.directory_missing([hash_to_bytes(empty_dir_id)])) == [] ) results = self.loader.storage.directory_get_entries(hash_to_bytes(empty_dir_id)) assert results.next_page_token is None assert results.results == [] + def test_load_tag(self): + with open(os.path.join(self.destination_path, "hello.py"), "a") as fd: + fd.write("print('Hello world')\n") + + self.repo.stage([b"hello.py"]) + new_revision = self.repo.do_commit(b"Hello world\n") + + dulwich.porcelain.tag_create( + self.repo, + b"v1.0.0", + message=b"First release!", + annotated=True, + objectish=new_revision, + ) + + res = self.loader.load() + assert res == {"status": "eventful"} + + branches = self.loader.storage.snapshot_get_branches(self.loader.snapshot.id) + + print(list(branches["branches"])) + branch = branches["branches"][b"refs/tags/v1.0.0"] + assert branch.target_type == TargetType.RELEASE + + release = self.loader.storage.release_get([branch.target])[0] + assert release.date is not None + assert release.author is not None + assert release == Release( + name=b"v1.0.0", + message=b"First release!\n", + target_type=ObjectType.REVISION, + target=bytehex_to_hash(new_revision), + author=release.author, + date=release.date, + synthetic=False, + ) + + def test_load_tag_minimal(self): + with open(os.path.join(self.destination_path, "hello.py"), "a") as fd: + fd.write("print('Hello world')\n") + + self.repo.stage([b"hello.py"]) + new_revision = self.repo.do_commit(b"Hello world\n") + + # dulwich.porcelain.tag_create doesn't allow creating tags without + # a tagger or a date, so we have to create it "manually" + tag = dulwich.objects.Tag() + tag.message = b"First release!\n" + tag.name = b"v1.0.0" + tag.object = (dulwich.objects.Commit, new_revision) + self.repo.object_store.add_object(tag) + self.repo[b"refs/tags/v1.0.0"] = tag.id + + res = self.loader.load() + assert res == {"status": "eventful"} + + branches = self.loader.storage.snapshot_get_branches(self.loader.snapshot.id) + + print(list(branches["branches"])) + branch = branches["branches"][b"refs/tags/v1.0.0"] + assert branch.target_type == TargetType.RELEASE + + release = self.loader.storage.release_get([branch.target])[0] + assert release == Release( + id=bytehex_to_hash(tag.id), + name=b"v1.0.0", + message=b"First release!\n", + target_type=ObjectType.REVISION, + target=bytehex_to_hash(new_revision), + synthetic=False, + ) + class GitLoaderFromDiskTest(TestCase, FullGitLoaderTests): """Prepare a git directory repository to be loaded through a GitLoaderFromDisk. This tests all git loader scenario. """ @pytest.fixture(autouse=True) def init(self, swh_storage, datadir, tmp_path): archive_name = "testrepo" archive_path = os.path.join(datadir, f"{archive_name}.tgz") tmp_path = str(tmp_path) self.repo_url = prepare_repository_from_archive( archive_path, archive_name, tmp_path=tmp_path ) self.destination_path = os.path.join(tmp_path, archive_name) self.loader = GitLoaderFromDisk( swh_storage, url=self.repo_url, visit_date=datetime.datetime( 2016, 5, 3, 15, 16, 32, tzinfo=datetime.timezone.utc ), directory=self.destination_path, ) self.repo = dulwich.repo.Repo(self.destination_path) class GitLoaderFromArchiveTest(TestCase, CommonGitLoaderTests): """Tests for GitLoaderFromArchive. Only tests common scenario.""" @pytest.fixture(autouse=True) def init(self, swh_storage, datadir, tmp_path): archive_name = "testrepo" archive_path = os.path.join(datadir, f"{archive_name}.tgz") self.repo_url = archive_path self.loader = GitLoaderFromArchive( swh_storage, url=self.repo_url, archive_path=archive_path, visit_date=datetime.datetime( 2016, 5, 3, 15, 16, 32, tzinfo=datetime.timezone.utc ), )