diff --git a/swh/loader/git/tests/test_converters.py b/swh/loader/git/tests/test_converters.py
index 21c8f89..45de2ec 100644
--- a/swh/loader/git/tests/test_converters.py
+++ b/swh/loader/git/tests/test_converters.py
@@ -1,473 +1,471 @@
 # Copyright (C) 2015-2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import os
 import shutil
 import subprocess
 import tempfile
-import unittest
 
 import dulwich.repo
 import pytest
 
 import swh.loader.git.converters as converters
 from swh.model.hashutil import bytehex_to_hash, hash_to_bytes
 from swh.model.model import (
     Content,
     ObjectType,
     Person,
     Release,
     Revision,
     RevisionType,
     Timestamp,
     TimestampWithTimezone,
 )
 
 TEST_DATA = os.path.join(os.path.dirname(__file__), "data")
 
 GPGSIG = (
     b"-----BEGIN PGP SIGNATURE-----\n"
     b"\n"
     b"iQJLBAABCAA1FiEEAOWDevQbOk/9ITMF6ImSleOlnUcFAl8EnS4XHGRhdmlkLmRv\n"
     b"dWFyZEBzZGZhMy5vcmcACgkQ6ImSleOlnUdrqQ/8C5RO4NZ5Qr/dwAy2cPA7ktkY\n"
     b"1oUjKtspQoPbC1X3MXVa1aWo9B3KuOMR2URw44RhMNFwjccLOhfss06E8p7CZr2H\n"
     b"uR3CzdDw7i52jHLCL2M2ZMaPAEbQuHjXWiUWIUXz9So8YwpTyd2XQneyOC2RDDEI\n"
     b"I2NVbmiMeDz33jJYPrQO0QayW+ErW+xgBF7N/qS9jFWsdV1ZNfn9NxkTH8UdGuAX\n"
     b"583P+0tVC2DjXc6vORVhyFzyfn1A9wHosbtWI2Mpa+zezPjoPSkcyQAJu2GyOkMC\n"
     b"YzSjJdQVqyovo+INkIf6PuUNdp41886BG/06xwT8fl4sVsyO51lNIfgH0DMwfTTB\n"
     b"ZgThYnvvO7SrXDm3QzBTXkvAiHiFFl3iNyGkCyxvgVmaTntuFT+cP+HD/pCiGaC+\n"
     b"jHzRwfUrmuLd/lLPyq3JXBibyjnfd3SVS+7q1NZHJ4WUmCboZ0+pfrEl65mEQ/Hz\n"
     b"J1qCwQ/3SsTB77ANf6lLzGSowjjrtHcBTkTbFxR4ACUhiBbosyDKpHTM7fzGFGjo\n"
     b"EIjohzrEnqR3bbyxJkK+nxoOByhIRdowgyeJ02I4neMyLJqcaup8NMWCddxqjaPt\n"
     b"YobghnjaDqEd+suL/v83hbZUAZHNO3i1OZYGMqzp1WHikDPoTwGP76baqBoXi56T\n"
     b"4WSpxCAJRDODHLk1HgU=\n"
     b"=73wF"
     b"\n"
     b"-----END PGP SIGNATURE-----"
 )
 
 MERGETAG = (
     b"object 9768d0b576dbaaecd80abedad6dfd0d72f1476da\n"
     b"type commit\n"
     b"tag v0.0.1\n"
     b"tagger David Douard <david.douard@sdfa3.org> 1594138133 +0200\n"
     b"\n"
     b"v0.0.1\n"
     b"-----BEGIN PGP SIGNATURE-----\n"
     b"\n"
     b"iQJLBAABCAA1FiEEAOWDevQbOk/9ITMF6ImSleOlnUcFAl8EnhkXHGRhdmlkLmRv\n"
     b"dWFyZEBzZGZhMy5vcmcACgkQ6ImSleOlnUcdzg//ZW9y2xU5JFQuUsBe/LfKrs+m\n"
     b"0ohVInPKXwAfpB3+gn/XtTSLe+Nnr8+QEZyVRCUz2gpGZ2tNqRjhYLIX4x5KKlaV\n"
     b"rfl/6Cy7zibsxxuzA1h7HylCs3IPsueQpznVHUwD9jQ5baGJSc2Lt1LufXTueHZJ\n"
     b"Oc0oLiP5xCZcPqeX8R/4zUUImJZ1QrPeKmQ/3F+Iq62iWp7nWDp8PtwpykSiYlNf\n"
     b"KrJM8omGvrlrWLtfPNUaQFClXwnwK1/HyNY2kYan6K5NtsIl2UX0LZ42GkRjJIrb\n"
     b"q4TFIZWZ6xndtEhHEX6B8Q5TZV6sqPgNnfGpbhj8BDoZgjD0Y43fzfDiZ0Bl2tph\n"
     b"tXaLg3SX/UUjFVzC1zkoQ2MR7+j8NVKauAsBINpKF4pMGsrsVRk8764pgO49iQ+S\n"
     b"8JVCVV76dNNm1gd7BbhFAdIAiegBtsEF69niJBoHKYLlrT8E8hDkF/gk4IkimPqf\n"
     b"UHtw/fPhVW3B4G2skd013NJGcnRj5oKtaM99d2Roxc3vhSRiTsoaM8BM9NDvLmJg\n"
     b"35rWEOnet39iJIMCHk3AYaJl8QmUhllDdr6vygaBVeVEf27m2c3NzONmIKpWqa2J\n"
     b"kTpF4cmzHYro34G7WuJ1bYvmLb6qWNQt9wd8RW+J1kVm5I8dkjPzLUougBpOd0YL\n"
     b"Bl5UTQILbV4Tv8ZlmJM=\n"
     b"=s1lv\n"
     b"-----END PGP SIGNATURE-----"
 )
 
 
 class SWHObjectType:
     """Dulwich lookalike ObjectType class
 
     """
 
     def __init__(self, type_name):
         self.type_name = type_name
 
 
 class SWHTag:
     """Dulwich lookalike tag class
 
     """
 
     def __init__(
         self,
         name,
         type_name,
         target,
         target_type,
         tagger,
         tag_time,
         tag_timezone,
         message,
         signature,
     ):
         self.name = name
         self.type_name = type_name
         self.object = SWHObjectType(target_type), target
         self.tagger = tagger
         self.message = message
         self.signature = signature
         self.tag_time = tag_time
         self.tag_timezone = tag_timezone
         self._tag_timezone_neg_utc = False
 
     def sha(self):
         from hashlib import sha1
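+        # Hashing nothing yields the well-known empty-input SHA-1
+        # (da39a3ee5e6b4b0d3255bfef95601890afd80709), which is why every
+        # expected Release id below uses exactly that digest.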
         return sha1()
 
 
 @pytest.mark.fs
-class TestConverters(unittest.TestCase):
+class TestConverters:
     @classmethod
-    def setUpClass(cls):
-        super().setUpClass()
+    def setup_class(cls):
         cls.repo_path = tempfile.mkdtemp()
         bundle = os.path.join(TEST_DATA, "git-repos", "example-submodule.bundle")
 
         git = subprocess.Popen(
             ["git", "clone", "--quiet", "--bare", "--mirror", bundle, cls.repo_path],
             cwd=TEST_DATA,
         )
 
-        # flush stdout of xz
+        # wait for the clone to complete
         git.communicate()
         cls.repo = dulwich.repo.Repo(cls.repo_path)
 
     @classmethod
-    def tearDownClass(cls):
-        super().tearDownClass()
+    def teardown_class(cls):
         shutil.rmtree(cls.repo_path)
 
     def test_blob_to_content(self):
         content_id = b"28c6f4023d65f74e3b59a2dea3c4277ed9ee07b0"
         content = converters.dulwich_blob_to_content(self.repo[content_id])
         expected_content = Content(
             sha1_git=bytehex_to_hash(content_id),
             sha1=hash_to_bytes("4850a3420a2262ff061cb296fb915430fa92301c"),
             sha256=hash_to_bytes(
                 "fee7c8a485a10321ad94b64135073cb5" "5f22cb9f57fa2417d2adfb09d310adef"
             ),
             blake2s256=hash_to_bytes(
                 "5d71873f42a137f6d89286e43677721e574" "1fa05ce4cd5e3c7ea7c44d4c2d10b"
             ),
             data=(
                 b'[submodule "example-dependency"]\n'
                 b"\tpath = example-dependency\n"
                 b"\turl = https://github.com/githubtraining/"
                 b"example-dependency.git\n"
             ),
             length=124,
             status="visible",
         )
-        self.assertEqual(content, expected_content)
+        assert content == expected_content
 
-    def test_convertion_wrong_input(self):
+    def test_conversion_wrong_input(self):
         class Something:
             type_name = b"something-not-the-right-type"
 
         m = {
             "blob": converters.dulwich_blob_to_content,
             "blob2": converters.dulwich_blob_to_content_id,
             "tree": converters.dulwich_tree_to_directory,
             "commit": converters.dulwich_tree_to_directory,
             "tag": converters.dulwich_tag_to_release,
         }
 
         for _callable in m.values():
-            with self.assertRaises(ValueError):
+            with pytest.raises(ValueError):
                 _callable(Something())
 
     def test_commit_to_revision(self):
         sha1 = b"9768d0b576dbaaecd80abedad6dfd0d72f1476da"
 
         revision = converters.dulwich_commit_to_revision(self.repo[sha1])
         expected_revision = Revision(
             id=hash_to_bytes("9768d0b576dbaaecd80abedad6dfd0d72f1476da"),
             directory=b"\xf0i\\./\xa7\xce\x9dW@#\xc3A7a\xa4s\xe5\x00\xca",
             type=RevisionType.GIT,
             committer=Person(
                 name=b"Stefano Zacchiroli",
                 fullname=b"Stefano Zacchiroli <zack@upsilon.cc>",
                 email=b"zack@upsilon.cc",
             ),
             author=Person(
                 name=b"Stefano Zacchiroli",
                 fullname=b"Stefano Zacchiroli <zack@upsilon.cc>",
                 email=b"zack@upsilon.cc",
             ),
             committer_date=TimestampWithTimezone(
                 timestamp=Timestamp(seconds=1443083765, microseconds=0,),
                 negative_utc=False,
                 offset=120,
             ),
             message=b"add submodule dependency\n",
             metadata=None,
             extra_headers=(),
             date=TimestampWithTimezone(
                 timestamp=Timestamp(seconds=1443083765, microseconds=0,),
                 negative_utc=False,
                 offset=120,
             ),
             parents=(b"\xc3\xc5\x88q23`\x9f[\xbb\xb2\xd9\xe7\xf3\xfbJf\x0f?r",),
             synthetic=False,
         )
-        self.assertEqual(revision, expected_revision)
+        assert revision == expected_revision
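 
+    # Git commits can carry headers beyond tree/parent/author/committer
+    # (e.g. "encoding", "gpgsig", "mergetag"); the converter surfaces them
+    # unchanged as Revision.extra_headers, which the next two tests check.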
extra_headers=((b"encoding", b"ISO-8859-15"), (b"gpgsig", GPGSIG)), date=TimestampWithTimezone( timestamp=Timestamp(seconds=1594136900, microseconds=0,), negative_utc=False, offset=120, ), parents=(bytes.fromhex("c730509025c6e81947102b2d77bc4dc1cade9489"),), synthetic=False, ) assert revision == expected_revision def test_commit_to_revision_with_extra_headers_mergetag(self): sha1 = b"3ab3da4bf0f81407be16969df09cd1c8af9ac703" revision = converters.dulwich_commit_to_revision(self.repo[sha1]) expected_revision = Revision( id=hash_to_bytes(sha1.decode()), directory=bytes.fromhex("faa4b64a841ca3e3f07d6501caebda2e3e8e544e"), type=RevisionType.GIT, committer=Person( name=b"David Douard", fullname=b"David Douard ", email=b"david.douard@sdfa3.org", ), author=Person( name=b"David Douard", fullname=b"David Douard ", email=b"david.douard@sdfa3.org", ), committer_date=TimestampWithTimezone( timestamp=Timestamp(seconds=1594138183, microseconds=0,), negative_utc=False, offset=120, ), message=b"Merge tag 'v0.0.1' into readme\n\nv0.0.1\n", metadata=None, extra_headers=((b"encoding", b"ISO-8859-15"), (b"mergetag", MERGETAG)), date=TimestampWithTimezone( timestamp=Timestamp(seconds=1594138183, microseconds=0,), negative_utc=False, offset=120, ), parents=( bytes.fromhex("322f5bc915e50fc25e85226b5a182bded0e98e4b"), bytes.fromhex("9768d0b576dbaaecd80abedad6dfd0d72f1476da"), ), synthetic=False, ) assert revision == expected_revision def test_author_line_to_author(self): # edge case out of the way - with self.assertRaises(TypeError): + with pytest.raises(TypeError): converters.parse_author(None) tests = { b"a ": Person( name=b"a", email=b"b@c.com", fullname=b"a ", ), b"": Person( name=None, email=b"foo@bar.com", fullname=b"", ), b"malformed ": Person( name=b"trailing", email=b"sp@c.e", fullname=b"trailing ", ), b"no": Person(name=b"no", email=b"sp@c.e", fullname=b"no",), b" <>": Person(name=None, email=None, fullname=b" <>",), b"something": Person(name=b"something", email=None, fullname=b"something"), } for author in sorted(tests): parsed_author = tests[author] - self.assertEqual(parsed_author, converters.parse_author(author)) + assert parsed_author == converters.parse_author(author) def test_dulwich_tag_to_release_no_author_no_date(self): target = b"641fb6e08ddb2e4fd096dcf18e80b894bf" message = b"some release message" tag = SWHTag( name=b"blah", type_name=b"tag", target=target, target_type=b"commit", message=message, signature=None, tagger=None, tag_time=None, tag_timezone=None, ) # when actual_release = converters.dulwich_tag_to_release(tag) # then expected_release = Release( author=None, date=None, id=b"\xda9\xa3\xee^kK\r2U\xbf\xef\x95`\x18\x90\xaf\xd8\x07\t", message=message, metadata=None, name=b"blah", synthetic=False, target=hash_to_bytes(target.decode()), target_type=ObjectType.REVISION, ) - self.assertEqual(actual_release, expected_release) + assert actual_release == expected_release def test_dulwich_tag_to_release_author_and_date(self): tagger = b"hey dude " target = b"641fb6e08ddb2e4fd096dcf18e80b894bf" message = b"some release message" import datetime date = datetime.datetime(2007, 12, 5, tzinfo=datetime.timezone.utc).timestamp() tag = SWHTag( name=b"blah", type_name=b"tag", target=target, target_type=b"commit", message=message, signature=None, tagger=tagger, tag_time=date, tag_timezone=0, ) # when actual_release = converters.dulwich_tag_to_release(tag) # then expected_release = Release( author=Person( email=b"hello@mail.org", fullname=b"hey dude ", name=b"hey dude", ), date=TimestampWithTimezone( 
         expected_release = Release(
             author=Person(
                 email=b"hello@mail.org",
                 fullname=b"hey dude <hello@mail.org>",
                 name=b"hey dude",
             ),
             date=TimestampWithTimezone(
                 negative_utc=False,
                 offset=0,
                 timestamp=Timestamp(seconds=1196812800, microseconds=0,),
             ),
             id=b"\xda9\xa3\xee^kK\r2U\xbf\xef\x95`\x18\x90\xaf\xd8\x07\t",
             message=message,
             metadata=None,
             name=b"blah",
             synthetic=False,
             target=hash_to_bytes(target.decode()),
             target_type=ObjectType.REVISION,
         )
-        self.assertEqual(actual_release, expected_release)
+        assert actual_release == expected_release
 
     def test_dulwich_tag_to_release_author_no_date(self):
         # to reproduce bug T815 (fixed)
         tagger = b"hey dude <hello@mail.org>"
         target = b"641fb6e08ddb2e4fd096dcf18e80b894bf"
         message = b"some release message"
         tag = SWHTag(
             name=b"blah",
             type_name=b"tag",
             target=target,
             target_type=b"commit",
             message=message,
             signature=None,
             tagger=tagger,
             tag_time=None,
             tag_timezone=None,
         )
 
         # when
         actual_release = converters.dulwich_tag_to_release(tag)
 
         # then
         expected_release = Release(
             author=Person(
                 email=b"hello@mail.org",
                 fullname=b"hey dude <hello@mail.org>",
                 name=b"hey dude",
             ),
             date=None,
             id=b"\xda9\xa3\xee^kK\r2U\xbf\xef\x95`\x18\x90\xaf\xd8\x07\t",
             message=message,
             metadata=None,
             name=b"blah",
             synthetic=False,
             target=hash_to_bytes(target.decode()),
             target_type=ObjectType.REVISION,
         )
-        self.assertEqual(actual_release, expected_release)
+        assert actual_release == expected_release
 
     def test_dulwich_tag_to_release_signature(self):
         target = b"641fb6e08ddb2e4fd096dcf18e80b894bf"
         message = b"some release message"
         tag = SWHTag(
             name=b"blah",
             type_name=b"tag",
             target=target,
             target_type=b"commit",
             message=message,
             signature=GPGSIG,
             tagger=None,
             tag_time=None,
             tag_timezone=None,
         )
 
         # when
         actual_release = converters.dulwich_tag_to_release(tag)
 
         # then
         expected_release = Release(
             author=None,
             date=None,
             id=b"\xda9\xa3\xee^kK\r2U\xbf\xef\x95`\x18\x90\xaf\xd8\x07\t",
             message=message + GPGSIG,
             metadata=None,
             name=b"blah",
             synthetic=False,
             target=hash_to_bytes(target.decode()),
             target_type=ObjectType.REVISION,
         )
-        self.assertEqual(actual_release, expected_release)
+        assert actual_release == expected_release
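
Note on the conversion pattern above: once `unittest.TestCase` is dropped,
pytest invokes the xunit-style hooks `setup_class`/`teardown_class` on plain
classes, while `setUpClass`/`tearDownClass` would silently never run. A
minimal, self-contained sketch of the pair; `TestExample` is a hypothetical
illustration, not part of this patch:

    import shutil
    import tempfile

    class TestExample:
        @classmethod
        def setup_class(cls):
            # pytest calls this once, before any test of the class runs
            cls.tmpdir = tempfile.mkdtemp()

        @classmethod
        def teardown_class(cls):
            # ... and this once, after the last test of the class
            shutil.rmtree(cls.tmpdir)

        def test_tmpdir_exists(self):
            import os

            assert os.path.isdir(self.tmpdir)
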
target=hash_to_bytes("bd746cd1913721b269b395a56a97baf6755151c2"), target_type=TargetType.REVISION, ), b"refs/tags/branch2-after-delete": SnapshotBranch( target=hash_to_bytes("bd746cd1913721b269b395a56a97baf6755151c2"), target_type=TargetType.REVISION, ), b"refs/tags/branch2-before-delete": SnapshotBranch( target=hash_to_bytes("1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b"), target_type=TargetType.REVISION, ), }, ) # directory hashes obtained with: # gco b6f40292c4e94a8f7e7b4aff50e6c7429ab98e2a # swh-hashtree --ignore '.git' --path . # gco 2f01f5ca7e391a2f08905990277faf81e709a649 # swh-hashtree --ignore '.git' --path . # gco bcdc5ebfde1a3cd6c96e0c2ea4eed19c13208777 # swh-hashtree --ignore '.git' --path . # gco 1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b # swh-hashtree --ignore '.git' --path . # gco 79f65ac75f79dda6ff03d66e1242702ab67fb51c # swh-hashtree --ignore '.git' --path . # gco b0a77609903f767a2fd3d769904ef9ef68468b87 # swh-hashtree --ignore '.git' --path . # gco bd746cd1913721b269b395a56a97baf6755151c2 # swh-hashtree --ignore '.git' --path . REVISIONS1 = { "b6f40292c4e94a8f7e7b4aff50e6c7429ab98e2a": ( "40dbdf55dfd4065422462cc74a949254aefa972e" ), "2f01f5ca7e391a2f08905990277faf81e709a649": ( "e1d0d894835f91a0f887a4bc8b16f81feefdfbd5" ), "bcdc5ebfde1a3cd6c96e0c2ea4eed19c13208777": ( "b43724545b4759244bb54be053c690649161411c" ), "1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b": ( "fbf70528223d263661b5ad4b80f26caf3860eb8e" ), "79f65ac75f79dda6ff03d66e1242702ab67fb51c": ( "5df34ec74d6f69072d9a0a6677d8efbed9b12e60" ), "b0a77609903f767a2fd3d769904ef9ef68468b87": ( "9ca0c7d6ffa3f9f0de59fd7912e08f11308a1338" ), "bd746cd1913721b269b395a56a97baf6755151c2": ( "e1d0d894835f91a0f887a4bc8b16f81feefdfbd5" ), } class CommonGitLoaderTests: """Common tests for all git loaders.""" def test_load(self): """Loads a simple repository (made available by `setUp()`), and checks everything was added in the storage.""" res = self.loader.load() assert res == {"status": "eventful"} assert_last_visit_matches( self.loader.storage, self.repo_url, status="full", type="git", snapshot=SNAPSHOT1.id, ) stats = get_stats(self.loader.storage) assert stats == { "content": 4, "directory": 7, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } check_snapshot(SNAPSHOT1, self.loader.storage) def test_load_unchanged(self): """Checks loading a repository a second time does not add any extra data.""" res = self.loader.load() assert res == {"status": "eventful"} assert_last_visit_matches( self.loader.storage, self.repo_url, status="full", type="git", snapshot=SNAPSHOT1.id, ) stats0 = get_stats(self.loader.storage) assert stats0 == { "content": 4, "directory": 7, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } res = self.loader.load() assert res == {"status": "uneventful"} stats1 = get_stats(self.loader.storage) expected_stats = copy.deepcopy(stats0) expected_stats["origin_visit"] += 1 assert stats1 == expected_stats check_snapshot(SNAPSHOT1, self.loader.storage) assert_last_visit_matches( self.loader.storage, self.repo_url, status="full", type="git", snapshot=SNAPSHOT1.id, ) def test_load_visit_without_snapshot_so_status_failed(self): # unfortunately, monkey-patch the hard way, self.loader is already instantiated # (patching won't work self.loader is already instantiated) # Make get_contents fail for some reason self.loader.get_contents = None res = self.loader.load() assert res == {"status": "failed"} assert_last_visit_matches( 
 
     def test_load_visit_without_snapshot_so_status_failed(self):
         # self.loader is already instantiated, so monkey-patch the instance
         # the hard way (mocker.patch on the class would not affect it).
         # Make get_contents fail for some reason
         self.loader.get_contents = None
 
         res = self.loader.load()
         assert res == {"status": "failed"}
 
         assert_last_visit_matches(
             self.loader.storage,
             self.repo_url,
             status="failed",
             type="git",
             snapshot=None,
         )
 
     def test_load_visit_with_snapshot_so_status_partial(self):
         # self.loader is already instantiated, so monkey-patch the instance
         # the hard way (mocker.patch on the class would not affect it).
         # fake store_metadata raising for some reason, so we could have a
         # snapshot id at this point in time
         self.loader.store_metadata = None
         # fake having a snapshot so the visit status is partial
         self.loader.loaded_snapshot_id = hash_to_bytes(
             "a23699280a82a043f8c0994cf1631b568f716f95"
         )
 
         res = self.loader.load()
         assert res == {"status": "failed"}
 
         assert_last_visit_matches(
             self.loader.storage,
             self.repo_url,
             status="partial",
             type="git",
             snapshot=None,
         )
 
 
 class FullGitLoaderTests(CommonGitLoaderTests):
     """Tests for GitLoader (from disk or not). Includes the common ones, and
        adds others that only work with a local dir.
 
     """
 
     def test_load_changed(self):
         """Loads a repository, makes some changes by adding files, commits,
         and merges, loads it again, and checks the storage contains everything
         it should."""
         # Initial load
         res = self.loader.load()
         assert res == {"status": "eventful"}
 
         stats0 = get_stats(self.loader.storage)
         assert stats0 == {
             "content": 4,
             "directory": 7,
             "origin": 1,
             "origin_visit": 1,
             "release": 0,
             "revision": 7,
             "skipped_content": 0,
             "snapshot": 1,
         }
 
         # Load with a new file + revision
         with open(os.path.join(self.destination_path, "hello.py"), "a") as fd:
             fd.write("print('Hello world')\n")
 
         self.repo.stage([b"hello.py"])
         new_revision = self.repo.do_commit(b"Hello world\n").decode()
         new_dir = "85dae072a5aa9923ffa7a7568f819ff21bf49858"
 
         assert self.repo[new_revision.encode()].tree == new_dir.encode()
 
         revisions = REVISIONS1.copy()
         assert new_revision not in revisions
         revisions[new_revision] = new_dir
 
         res = self.loader.load()
         assert res == {"status": "eventful"}
 
         stats1 = get_stats(self.loader.storage)
         expected_stats = copy.deepcopy(stats0)
         # did one new visit
         expected_stats["origin_visit"] += 1
         # with one more of the following objects
         expected_stats["content"] += 1
         expected_stats["directory"] += 1
         expected_stats["revision"] += 1
         # concluding into 1 new snapshot
         expected_stats["snapshot"] += 1
         assert stats1 == expected_stats
 
         visit_status = assert_last_visit_matches(
             self.loader.storage, self.repo_url, status="full", type="git"
         )
         assert visit_status.snapshot is not None
 
         snapshot_id = visit_status.snapshot
         snapshot = snapshot_get_all_branches(self.loader.storage, snapshot_id)
         branches = snapshot.branches
         assert branches[b"HEAD"] == SnapshotBranch(
             target=b"refs/heads/master", target_type=TargetType.ALIAS,
         )
         assert branches[b"refs/heads/master"] == SnapshotBranch(
             target=hash_to_bytes(new_revision), target_type=TargetType.REVISION,
         )
 
         # Merge branch1 into HEAD.
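+        # The merge commit is built by hand: a tree holding the union of
+        # both parents' top-level entries is added to the object store,
+        # then committed with branch1 as an extra merge head.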
current = self.repo[b"HEAD"] branch1 = self.repo[b"refs/heads/branch1"] merged_tree = dulwich.objects.Tree() for item in self.repo[current.tree].items(): merged_tree.add(*item) for item in self.repo[branch1.tree].items(): merged_tree.add(*item) merged_dir_id = "dab8a37df8db8666d4e277bef9a546f585b5bedd" assert merged_tree.id.decode() == merged_dir_id self.repo.object_store.add_object(merged_tree) merge_commit = self.repo.do_commit( b"merge.\n", tree=merged_tree.id, merge_heads=[branch1.id] ) assert merge_commit.decode() not in revisions revisions[merge_commit.decode()] = merged_tree.id.decode() res = self.loader.load() assert res == {"status": "eventful"} stats2 = get_stats(self.loader.storage) expected_stats = copy.deepcopy(stats1) # one more visit expected_stats["origin_visit"] += 1 # with 1 new directory and revision expected_stats["directory"] += 1 expected_stats["revision"] += 1 # concluding into 1 new snapshot expected_stats["snapshot"] += 1 assert stats2 == expected_stats visit_status = assert_last_visit_matches( self.loader.storage, self.repo_url, status="full", type="git" ) assert visit_status.snapshot is not None merge_snapshot_id = visit_status.snapshot assert merge_snapshot_id != snapshot_id merge_snapshot = snapshot_get_all_branches( self.loader.storage, merge_snapshot_id ) merge_branches = merge_snapshot.branches assert merge_branches[b"HEAD"] == SnapshotBranch( target=b"refs/heads/master", target_type=TargetType.ALIAS, ) assert merge_branches[b"refs/heads/master"] == SnapshotBranch( target=hash_to_bytes(merge_commit.decode()), target_type=TargetType.REVISION, ) def test_load_filter_branches(self): filtered_branches = {b"refs/pull/42/merge"} unfiltered_branches = {b"refs/pull/42/head"} # Add branches to the repository on disk; some should be filtered by # the loader, some should not. for branch_name in filtered_branches | unfiltered_branches: self.repo[branch_name] = self.repo[b"refs/heads/master"] # Generate the expected snapshot from SNAPSHOT1 (which is the original # state of the git repo)... branches = dict(SNAPSHOT1.branches) # ... and the unfiltered_branches, which are all pointing to the same # commit as "refs/heads/master". 
         for branch_name in unfiltered_branches:
             branches[branch_name] = branches[b"refs/heads/master"]
 
         expected_snapshot = Snapshot(branches=branches)
 
         # Load the modified repository
         res = self.loader.load()
         assert res == {"status": "eventful"}
 
         check_snapshot(expected_snapshot, self.loader.storage)
         assert_last_visit_matches(
             self.loader.storage,
             self.repo_url,
             status="full",
             type="git",
             snapshot=expected_snapshot.id,
         )
 
     def test_load_dangling_symref(self):
         with open(os.path.join(self.destination_path, ".git/HEAD"), "wb") as f:
             f.write(b"ref: refs/heads/dangling-branch\n")
 
         res = self.loader.load()
         assert res == {"status": "eventful"}
 
         visit_status = assert_last_visit_matches(
             self.loader.storage, self.repo_url, status="full", type="git"
         )
         snapshot_id = visit_status.snapshot
         assert snapshot_id is not None
 
         snapshot = snapshot_get_all_branches(self.loader.storage, snapshot_id)
         branches = snapshot.branches
 
         assert branches[b"HEAD"] == SnapshotBranch(
             target=b"refs/heads/dangling-branch", target_type=TargetType.ALIAS,
         )
         assert branches[b"refs/heads/dangling-branch"] is None
 
         stats = get_stats(self.loader.storage)
         assert stats == {
             "content": 4,
             "directory": 7,
             "origin": 1,
             "origin_visit": 1,
             "release": 0,
             "revision": 7,
             "skipped_content": 0,
             "snapshot": 1,
         }
 
     def test_load_empty_tree(self):
         empty_dir_id = "4b825dc642cb6eb9a060e54bf8d69288fbee4904"
 
         # Check the empty tree does not already exist for some reason
         # (it would make this test pointless)
         assert list(
             self.loader.storage.directory_missing([hash_to_bytes(empty_dir_id)])
         ) == [hash_to_bytes(empty_dir_id)]
 
         empty_tree = dulwich.objects.Tree()
         assert empty_tree.id.decode() == empty_dir_id
         self.repo.object_store.add_object(empty_tree)
 
         self.repo.do_commit(b"remove all bugs\n", tree=empty_tree.id)
 
         res = self.loader.load()
         assert res == {"status": "eventful"}
 
         assert (
             list(self.loader.storage.directory_missing([hash_to_bytes(empty_dir_id)]))
             == []
         )
 
         results = self.loader.storage.directory_get_entries(hash_to_bytes(empty_dir_id))
         assert results.next_page_token is None
         assert results.results == []
 
     def test_load_tag(self):
         with open(os.path.join(self.destination_path, "hello.py"), "a") as fd:
             fd.write("print('Hello world')\n")
 
         self.repo.stage([b"hello.py"])
         new_revision = self.repo.do_commit(b"Hello world\n")
 
         dulwich.porcelain.tag_create(
             self.repo,
             b"v1.0.0",
             message=b"First release!",
             annotated=True,
             objectish=new_revision,
         )
 
         res = self.loader.load()
         assert res == {"status": "eventful"}
 
         branches = self.loader.storage.snapshot_get_branches(self.loader.snapshot.id)
 
         branch = branches["branches"][b"refs/tags/v1.0.0"]
         assert branch.target_type == TargetType.RELEASE
 
         release = self.loader.storage.release_get([branch.target])[0]
         assert release.date is not None
         assert release.author is not None
         assert release == Release(
             name=b"v1.0.0",
             message=b"First release!\n",
             target_type=ObjectType.REVISION,
             target=bytehex_to_hash(new_revision),
             author=release.author,
             date=release.date,
             synthetic=False,
         )
 
     def test_load_tag_minimal(self):
         with open(os.path.join(self.destination_path, "hello.py"), "a") as fd:
             fd.write("print('Hello world')\n")
 
         self.repo.stage([b"hello.py"])
         new_revision = self.repo.do_commit(b"Hello world\n")
 
         # dulwich.porcelain.tag_create doesn't allow creating tags without
         # a tagger or a date, so we have to create it "manually"
         tag = dulwich.objects.Tag()
         tag.message = b"First release!\n"
         tag.name = b"v1.0.0"
         tag.object = (dulwich.objects.Commit, new_revision)
         self.repo.object_store.add_object(tag)
         self.repo[b"refs/tags/v1.0.0"] = tag.id
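+        # A tag with no tagger and no date is valid in git; the loader must
+        # still produce a Release (with author and date left unset, as the
+        # assertion below expects).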
assert res == {"status": "eventful"} branches = self.loader.storage.snapshot_get_branches(self.loader.snapshot.id) print(list(branches["branches"])) branch = branches["branches"][b"refs/tags/v1.0.0"] assert branch.target_type == TargetType.RELEASE release = self.loader.storage.release_get([branch.target])[0] assert release == Release( id=bytehex_to_hash(tag.id), name=b"v1.0.0", message=b"First release!\n", target_type=ObjectType.REVISION, target=bytehex_to_hash(new_revision), synthetic=False, ) -class GitLoaderFromDiskTest(TestCase, FullGitLoaderTests): +class TestGitLoaderFromDisk(FullGitLoaderTests): """Prepare a git directory repository to be loaded through a GitLoaderFromDisk. This tests all git loader scenario. """ @pytest.fixture(autouse=True) def init(self, swh_storage, datadir, tmp_path): archive_name = "testrepo" archive_path = os.path.join(datadir, f"{archive_name}.tgz") tmp_path = str(tmp_path) self.repo_url = prepare_repository_from_archive( archive_path, archive_name, tmp_path=tmp_path ) self.destination_path = os.path.join(tmp_path, archive_name) self.loader = GitLoaderFromDisk( swh_storage, url=self.repo_url, visit_date=datetime.datetime( 2016, 5, 3, 15, 16, 32, tzinfo=datetime.timezone.utc ), directory=self.destination_path, ) self.repo = dulwich.repo.Repo(self.destination_path) -class GitLoaderFromArchiveTest(TestCase, CommonGitLoaderTests): +class TestGitLoaderFromArchive(CommonGitLoaderTests): """Tests for GitLoaderFromArchive. Only tests common scenario.""" @pytest.fixture(autouse=True) def init(self, swh_storage, datadir, tmp_path): archive_name = "testrepo" archive_path = os.path.join(datadir, f"{archive_name}.tgz") self.repo_url = archive_path self.loader = GitLoaderFromArchive( swh_storage, url=self.repo_url, archive_path=archive_path, visit_date=datetime.datetime( 2016, 5, 3, 15, 16, 32, tzinfo=datetime.timezone.utc ), ) diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_loader.py index b0b0881..f18d9c2 100644 --- a/swh/loader/git/tests/test_loader.py +++ b/swh/loader/git/tests/test_loader.py @@ -1,124 +1,119 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os -from unittest import TestCase from dulwich.errors import GitProtocolError, NotGitRepository, ObjectFormatException import dulwich.repo import pytest from swh.loader.git.loader import GitLoader from swh.loader.git.tests.test_from_disk import FullGitLoaderTests from swh.loader.tests import assert_last_visit_matches, prepare_repository_from_archive class CommonGitLoaderNotFound: @pytest.fixture(autouse=True) def __inject_fixtures(self, mocker): """Inject required fixtures in unittest.TestCase class """ self.mocker = mocker - def test_load_visit_not_found(self): - """Ingesting an unknown url result in a visit with not_found status - - """ - for failure_exception in [ + @pytest.mark.parametrize( + "failure_exception", + [ GitProtocolError("Repository unavailable"), # e.g DMCA takedown GitProtocolError("Repository not found"), GitProtocolError("unexpected http resp 401"), NotGitRepository("not a git repo"), - ]: - with self.subTest(failure_exception=failure_exception): - # simulate an initial communication error (e.g no repository found, ...) 
diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_loader.py
index b0b0881..f18d9c2 100644
--- a/swh/loader/git/tests/test_loader.py
+++ b/swh/loader/git/tests/test_loader.py
@@ -1,124 +1,119 @@
 # Copyright (C) 2018-2021  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import os
-from unittest import TestCase
 
 from dulwich.errors import GitProtocolError, NotGitRepository, ObjectFormatException
 import dulwich.repo
 import pytest
 
 from swh.loader.git.loader import GitLoader
 from swh.loader.git.tests.test_from_disk import FullGitLoaderTests
 from swh.loader.tests import assert_last_visit_matches, prepare_repository_from_archive
 
 
 class CommonGitLoaderNotFound:
     @pytest.fixture(autouse=True)
     def __inject_fixtures(self, mocker):
-        """Inject required fixtures in unittest.TestCase class
+        """Inject required fixtures in these pytest-style test classes
 
         """
         self.mocker = mocker
 
-    def test_load_visit_not_found(self):
-        """Ingesting an unknown url result in a visit with not_found status
-
-        """
-        for failure_exception in [
+    @pytest.mark.parametrize(
+        "failure_exception",
+        [
             GitProtocolError("Repository unavailable"),  # e.g DMCA takedown
             GitProtocolError("Repository not found"),
             GitProtocolError("unexpected http resp 401"),
             NotGitRepository("not a git repo"),
-        ]:
-            with self.subTest(failure_exception=failure_exception):
-                # simulate an initial communication error (e.g no repository found, ...)
-                mock = self.mocker.patch(
-                    "swh.loader.git.loader.GitLoader.fetch_pack_from_origin"
-                )
-                mock.side_effect = failure_exception
-
-                res = self.loader.load()
-                assert res == {"status": "uneventful"}
-
-                assert_last_visit_matches(
-                    self.loader.storage,
-                    self.repo_url,
-                    status="not_found",
-                    type="git",
-                    snapshot=None,
-                )
-
-    def test_load_visit_failure(self):
+        ],
+    )
+    def test_load_visit_not_found(self, failure_exception):
+        """Ingesting an unknown url results in a visit with not_found status
+
+        """
+        # simulate an initial communication error (e.g no repository found, ...)
+        mock = self.mocker.patch(
+            "swh.loader.git.loader.GitLoader.fetch_pack_from_origin"
+        )
+        mock.side_effect = failure_exception
+
+        res = self.loader.load()
+        assert res == {"status": "uneventful"}
+
+        assert_last_visit_matches(
+            self.loader.storage,
+            self.repo_url,
+            status="not_found",
+            type="git",
+            snapshot=None,
+        )
+
+    @pytest.mark.parametrize(
+        "failure_exception",
+        [IOError, ObjectFormatException, OSError, ValueError, GitProtocolError,],
+    )
+    def test_load_visit_failure(self, failure_exception):
         """Failing during the fetch pack step results in a failed visit
 
         """
-        for failure_exception in [
-            IOError,
-            ObjectFormatException,
-            OSError,
-            ValueError,
-            GitProtocolError,
-        ]:
-            with self.subTest(failure_exception=failure_exception):
-                # simulate a fetch communication error after the initial connection
-                # server error (e.g IOError, ObjectFormatException, ...)
-                mock = self.mocker.patch(
-                    "swh.loader.git.loader.GitLoader.fetch_pack_from_origin"
-                )
-
-                mock.side_effect = failure_exception("failure")
-
-                res = self.loader.load()
-                assert res == {"status": "failed"}
-
-                assert_last_visit_matches(
-                    self.loader.storage,
-                    self.repo_url,
-                    status="failed",
-                    type="git",
-                    snapshot=None,
-                )
-
-
-class GitLoaderTest(TestCase, FullGitLoaderTests, CommonGitLoaderNotFound):
+        # simulate a fetch communication error after the initial connection
+        # server error (e.g IOError, ObjectFormatException, ...)
+        mock = self.mocker.patch(
+            "swh.loader.git.loader.GitLoader.fetch_pack_from_origin"
+        )
+
+        mock.side_effect = failure_exception("failure")
+
+        res = self.loader.load()
+        assert res == {"status": "failed"}
+
+        assert_last_visit_matches(
+            self.loader.storage,
+            self.repo_url,
+            status="failed",
+            type="git",
+            snapshot=None,
+        )
+
+
+class TestGitLoader(FullGitLoaderTests, CommonGitLoaderNotFound):
     """Prepare a git directory repository to be loaded through a GitLoader.
     This tests all git loader scenarios.
 
     """
 
     @pytest.fixture(autouse=True)
     def init(self, swh_storage, datadir, tmp_path):
-        super().setUp()
         archive_name = "testrepo"
         archive_path = os.path.join(datadir, f"{archive_name}.tgz")
         tmp_path = str(tmp_path)
         self.repo_url = prepare_repository_from_archive(
             archive_path, archive_name, tmp_path=tmp_path
         )
         self.destination_path = os.path.join(tmp_path, archive_name)
         self.loader = GitLoader(swh_storage, self.repo_url)
         self.repo = dulwich.repo.Repo(self.destination_path)
 
 
-class GitLoader2Test(TestCase, FullGitLoaderTests, CommonGitLoaderNotFound):
+class TestGitLoader2(FullGitLoaderTests, CommonGitLoaderNotFound):
     """Mostly the same loading scenario but with a base-url different than the
        repo-url.
     To walk slightly different paths, the end result should stay the same.
""" @pytest.fixture(autouse=True) def init(self, swh_storage, datadir, tmp_path): - super().setUp() archive_name = "testrepo" archive_path = os.path.join(datadir, f"{archive_name}.tgz") tmp_path = str(tmp_path) self.repo_url = prepare_repository_from_archive( archive_path, archive_name, tmp_path=tmp_path ) self.destination_path = os.path.join(tmp_path, archive_name) base_url = f"base://{self.repo_url}" self.loader = GitLoader(swh_storage, self.repo_url, base_url=base_url) self.repo = dulwich.repo.Repo(self.destination_path) diff --git a/swh/loader/git/tests/test_utils.py b/swh/loader/git/tests/test_utils.py index a994eb6..7b1acb1 100644 --- a/swh/loader/git/tests/test_utils.py +++ b/swh/loader/git/tests/test_utils.py @@ -1,30 +1,30 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import unittest +import pytest from swh.loader.git import utils -class TestUtils(unittest.TestCase): +class TestUtils: def test_check_date_time(self): """A long as datetime is fine, date time check does not raise """ for e in range(32, 37): ts = 2 ** e utils.check_date_time(ts) def test_check_date_time_empty_value(self): - self.assertIsNone(utils.check_date_time(None)) + assert utils.check_date_time(None) is None def test_check_date_time_raises(self): """From a give threshold, check will no longer works. """ exp = 38 timestamp = 2 ** exp - with self.assertRaisesRegex(ValueError, "is out of range"): + with pytest.raises(ValueError, match=".*is out of range.*"): utils.check_date_time(timestamp)