Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9348556
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
76 KB
Subscribers
None
View Options
diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py
index 5d0eed5..bc1d05e 100644
--- a/swh/loader/git/loader.py
+++ b/swh/loader/git/loader.py
@@ -1,562 +1,604 @@
# Copyright (C) 2016-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from collections import defaultdict
from dataclasses import dataclass
import datetime
import logging
import os
import pickle
import sys
from tempfile import SpooledTemporaryFile
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Set, Type
import dulwich.client
from dulwich.errors import GitProtocolError, NotGitRepository
from dulwich.object_store import ObjectStoreGraphWalker
from dulwich.objects import ShaFile
from dulwich.pack import PackData, PackInflater
from swh.core.statsd import Statsd
from swh.loader.exception import NotFound
from swh.model import hashutil
from swh.model.model import (
BaseContent,
Directory,
Release,
Revision,
Snapshot,
SnapshotBranch,
TargetType,
)
from swh.storage.algos.snapshot import snapshot_get_latest
from swh.storage.interface import StorageInterface
from . import converters, dumb, utils
from .base import BaseGitLoader
from .utils import HexBytes
logger = logging.getLogger(__name__)
heads_logger = logger.getChild("refs")
class RepoRepresentation:
    """Repository representation for a Software Heritage origin.

    Args:
        storage: archive storage instance (only stored, not queried here)
        base_snapshots: snapshots from previous visits of this origin and/or
            of its parent origins, used as the baseline for incremental loading
        incremental: if False, ``base_snapshots`` is ignored and everything
            is fetched from scratch
        statsd: optional statsd client used to emit ref-filtering metrics
    """

    def __init__(
        self,
        storage: StorageInterface,
        base_snapshots: Optional[List[Snapshot]] = None,
        incremental: bool = True,
        statsd: Optional[Statsd] = None,
    ):
        self.storage = storage
        self.incremental = incremental
        self.statsd = statsd

        if base_snapshots and incremental:
            self.base_snapshots: List[Snapshot] = base_snapshots
        else:
            self.base_snapshots = []

        # Cache existing heads: every non-alias branch target of the base
        # snapshots, as bytehex sha1s, advertised to the server as "haves".
        self.local_heads: Set[HexBytes] = set()
        heads_logger.debug("Heads known in the archive:")
        for base_snapshot in self.base_snapshots:
            for branch_name, branch in base_snapshot.branches.items():
                if not branch or branch.target_type == TargetType.ALIAS:
                    continue
                heads_logger.debug("    %r: %s", branch_name, branch.target.hex())
                self.local_heads.add(HexBytes(hashutil.hash_to_bytehex(branch.target)))

    def graph_walker(self) -> ObjectStoreGraphWalker:
        """Graph walker advertising the local heads, without walking their
        history (``get_parents`` always returns an empty list)."""
        return ObjectStoreGraphWalker(self.local_heads, get_parents=lambda commit: [])

    def determine_wants(self, refs: Dict[bytes, HexBytes]) -> List[HexBytes]:
        """Get the list of bytehex sha1s that the git loader should fetch.

        This compares the remote refs sent by the server with the base snapshot
        provided by the loader.
        """
        if not refs:
            return []

        if heads_logger.isEnabledFor(logging.DEBUG):
            heads_logger.debug("Heads returned by the git remote:")
            for name, value in refs.items():
                heads_logger.debug("    %r: %s", name, value.decode())

        # Get the remote heads that we want to fetch
        remote_heads: Set[HexBytes] = set()
        for ref_name, ref_target in refs.items():
            if utils.ignore_branch_name(ref_name):
                continue
            remote_heads.add(ref_target)

        logger.debug("local_heads_count=%s", len(self.local_heads))
        logger.debug("remote_heads_count=%s", len(remote_heads))
        wanted_refs = list(remote_heads - self.local_heads)
        logger.debug("wanted_refs_count=%s", len(wanted_refs))
        if self.statsd is not None:
            # Ratio of refs dropped by ignore_branch_name over all remote refs.
            # NOTE: remote_heads is always a subset of refs.values(), so the
            # set difference must be taken in this direction; the previous
            # `remote_heads - set(refs.values())` was always empty, making the
            # metric constantly 0.
            self.statsd.histogram(
                "git_ignored_refs_percent",
                len(set(refs.values()) - remote_heads) / len(refs),
                tags={},
            )
            if remote_heads:
                # Ratio of non-ignored remote heads already known locally.
                # Guarded: remote_heads may be empty if every ref was ignored,
                # which previously raised ZeroDivisionError.
                self.statsd.histogram(
                    "git_known_refs_percent",
                    len(self.local_heads & remote_heads) / len(remote_heads),
                    tags={},
                )
        return wanted_refs
@dataclass
class FetchPackReturn:
    """Result of fetching a pack file from a remote git repository."""

    remote_refs: Dict[bytes, HexBytes]  # ref name -> target sha1 (bytehex)
    symbolic_refs: Dict[bytes, HexBytes]  # symbolic ref name -> target ref name
    pack_buffer: SpooledTemporaryFile  # fetched pack data, seeked back to 0
    pack_size: int  # size of the pack data, in bytes
class GitLoader(BaseGitLoader):
"""A bulk loader for a git repository
Emits the following statsd stats:
* increments ``swh_loader_git``
* histogram ``swh_loader_git_ignored_refs_percent`` is the ratio of refs ignored
over all refs of the remote repository
* histogram ``swh_loader_git_known_refs_percent`` is the ratio of (non-ignored)
remote heads that are already local over all non-ignored remote heads
All three are tagged with ``{{"incremental": "<incremental_mode>"}}`` where
``incremental_mode`` is one of:
* ``from_same_origin`` when the origin was already loaded
* ``from_parent_origin`` when the origin was not already loaded,
but it was detected as a forge-fork of an origin that was already loaded
* ``no_previous_snapshot`` when the origin was not already loaded,
and it was detected as a forge-fork of origins that were not already loaded either
* ``no_parent_origin`` when the origin was not already loaded, and it was not
detected as a forge-fork of any other origin
* ``disabled`` when incremental loading is disabled by configuration
"""
visit_type = "git"
def __init__(
    self,
    storage: StorageInterface,
    url: str,
    incremental: bool = True,
    repo_representation: Type[RepoRepresentation] = RepoRepresentation,
    pack_size_bytes: int = 4 * 1024 * 1024 * 1024,
    temp_file_cutoff: int = 100 * 1024 * 1024,
    **kwargs: Any,
):
    """Initialize the bulk updater.

    Args:
        storage: the archive storage objects are loaded into
        url: origin URL of the git repository to load
        incremental: If True, the default, this starts from the last known snapshot
            (if any) references. Otherwise, this loads the full repository.
        repo_representation: swh's repository representation
            which is in charge of filtering between known and remote
            data.
        pack_size_bytes: hard limit on the fetched pack size; the fetch
            aborts with IOError beyond it (default 4 GiB)
        temp_file_cutoff: in-memory size threshold above which the pack
            buffer spills to disk (default 100 MiB)
        kwargs: forwarded to the ``BaseGitLoader`` constructor
    """
    super().__init__(storage=storage, origin_url=url, **kwargs)
    self.incremental = incremental
    self.repo_representation = repo_representation
    self.pack_size_bytes = pack_size_bytes
    self.temp_file_cutoff = temp_file_cutoff

    # state initialized in fetch_data
    self.remote_refs: Dict[bytes, HexBytes] = {}
    self.symbolic_refs: Dict[bytes, HexBytes] = {}
    # ref target -> object type, filled in by the get_* iterators once the
    # corresponding object has been seen in the fetched data
    self.ref_object_types: Dict[bytes, Optional[TargetType]] = {}
def fetch_pack_from_origin(
    self,
    origin_url: str,
    base_repo: RepoRepresentation,
    do_activity: Callable[[bytes], None],
) -> FetchPackReturn:
    """Fetch a pack from the origin.

    Args:
        origin_url: URL of the remote git repository
        base_repo: local-state representation used for the fetch
            negotiation (wants / graph walker)
        do_activity: progress callback handed to dulwich

    Returns:
        the filtered refs and the fetched pack data.

    Raises:
        IOError: if the fetched pack would exceed ``self.pack_size_bytes``.
    """
    pack_buffer = SpooledTemporaryFile(max_size=self.temp_file_cutoff)

    transport_url = origin_url

    logger.debug("Transport url to communicate with server: %s", transport_url)

    client, path = dulwich.client.get_transport_and_path(
        transport_url, thin_packs=False
    )

    logger.debug("Client %s to fetch pack at %s", client, path)

    size_limit = self.pack_size_bytes

    def do_pack(data: bytes) -> None:
        # Accumulate the pack data, aborting once the configured size limit
        # would be exceeded.
        cur_size = pack_buffer.tell()
        would_write = len(data)
        if cur_size + would_write > size_limit:
            raise IOError(
                f"Pack file too big for repository {origin_url}, "
                f"limit is {size_limit} bytes, current size is {cur_size}, "
                f"would write {would_write}"
            )

        pack_buffer.write(data)

    pack_result = client.fetch_pack(
        path,
        base_repo.determine_wants,
        base_repo.graph_walker(),
        do_pack,
        progress=do_activity,
    )

    remote_refs = pack_result.refs or {}
    symbolic_refs = pack_result.symrefs or {}

    pack_buffer.flush()
    pack_size = pack_buffer.tell()
    pack_buffer.seek(0)

    logger.debug("fetched_pack_size=%s", pack_size)

    # check if repository only supports git dumb transfer protocol,
    # fetched pack file will be empty in that case as dulwich do
    # not support it and do not fetch any refs
    self.dumb = transport_url.startswith("http") and getattr(client, "dumb", False)

    return FetchPackReturn(
        remote_refs=utils.filter_refs(remote_refs),
        symbolic_refs=utils.filter_refs(symbolic_refs),
        pack_buffer=pack_buffer,
        pack_size=pack_size,
    )
def get_full_snapshot(self, origin_url) -> Optional[Snapshot]:
    """Return the latest snapshot the archive holds for ``origin_url``,
    or None if the origin was never fully visited."""
    latest_snapshot = snapshot_get_latest(self.storage, origin_url)
    return latest_snapshot
def prepare(self) -> None:
    """Set up the visit state: previous snapshot of this origin (if any),
    base snapshots of parent origins, and the statsd constant tags."""
    assert self.origin is not None
    self.prev_snapshot = Snapshot(branches={})
    """Last snapshot of this origin if any; empty snapshot otherwise"""
    self.base_snapshots = []
    """Last snapshot of this origin and all its parents, if any."""

    self.statsd.constant_tags["incremental_enabled"] = self.incremental
    self.statsd.constant_tags["has_parent_origins"] = bool(self.parent_origins)

    # May be set to True later
    self.statsd.constant_tags["has_parent_snapshot"] = False

    if self.incremental:
        prev_snapshot = self.get_full_snapshot(self.origin.url)
        self.statsd.constant_tags["has_previous_snapshot"] = bool(prev_snapshot)
        if prev_snapshot:
            self.prev_snapshot = prev_snapshot
            self.base_snapshots.append(prev_snapshot)

        if self.parent_origins is not None:
            # If this origin is a forge fork, load incrementally from the
            # origins it was forked from
            for parent_origin in self.parent_origins:
                parent_snapshot = self.get_full_snapshot(parent_origin.url)
                if parent_snapshot is not None:
                    self.statsd.constant_tags["has_parent_snapshot"] = True
                    self.base_snapshots.append(parent_snapshot)

    # Increments a metric with full name 'swh_loader_git'; which is useful to
    # count how many runs of the loader are with each incremental mode
    self.statsd.increment("git_total", tags={})
def fetch_data(self) -> bool:
    """Fetch all needed objects from the origin, via the smart protocol
    (pack file) or falling back to the dumb protocol when the server only
    supports that.

    Returns:
        False: a single fetch retrieves everything, so there is never more
        data to fetch.

    Raises:
        NotFound: when the error reported by the remote clearly indicates a
            missing or unavailable repository.
    """
    assert self.origin is not None

    base_repo = self.repo_representation(
        storage=self.storage,
        base_snapshots=self.base_snapshots,
        incremental=self.incremental,
        statsd=self.statsd,
    )

    def do_progress(msg: bytes) -> None:
        # Forward the server's sideband progress messages to stderr
        sys.stderr.buffer.write(msg)
        sys.stderr.flush()

    try:
        fetch_info = self.fetch_pack_from_origin(
            self.origin.url, base_repo, do_progress
        )
    except (dulwich.client.HTTPUnauthorized, NotGitRepository) as e:
        raise NotFound(e)
    except GitProtocolError as e:
        # unfortunately, that kind of error is not specific to a not found
        # scenario... It depends on the value of message within the exception.
        for msg in [
            "Repository unavailable",  # e.g DMCA takedown
            "Repository not found",
            "unexpected http resp 401",
        ]:
            if msg in e.args[0]:
                raise NotFound(e)
        # otherwise transmit the error
        raise
    except (AttributeError, NotImplementedError, ValueError):
        # with old dulwich versions, those exceptions types can be raised
        # by the fetch_pack operation when encountering a repository with
        # dumb transfer protocol so we check if the repository supports it
        # here to continue the loading if it is the case
        self.dumb = dumb.check_protocol(self.origin.url)
        if not self.dumb:
            raise

    logger.debug(
        "Protocol used for communication: %s", "dumb" if self.dumb else "smart"
    )
    if self.dumb:
        self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin.url, base_repo)
        self.dumb_fetcher.fetch_object_ids()
        self.remote_refs = utils.filter_refs(self.dumb_fetcher.refs)
        self.symbolic_refs = utils.filter_refs(self.dumb_fetcher.head)
    else:
        # fetch_info is always bound here: the dumb branch is only taken
        # after an exception, in which case we never reach this else branch
        self.pack_buffer = fetch_info.pack_buffer
        self.pack_size = fetch_info.pack_size
        self.remote_refs = fetch_info.remote_refs
        self.symbolic_refs = fetch_info.symbolic_refs

    # types filled in lazily by the get_* iterators as objects are parsed
    self.ref_object_types = {sha1: None for sha1 in self.remote_refs.values()}

    logger.info(
        "Listed %d refs for repo %s",
        len(self.remote_refs),
        self.origin.url,
        extra={
            "swh_type": "git_repo_list_refs",
            "swh_repo": self.origin.url,
            "swh_num_refs": len(self.remote_refs),
        },
    )

    # No more data to fetch
    return False
def save_data(self) -> None:
    """Archive the fetched pack file and the remote refs on disk."""
    assert isinstance(self.visit_date, datetime.datetime)

    chunk_size = 8192
    pack_dir = self.get_save_data_path()
    basename = self.visit_date.isoformat()

    with open(os.path.join(pack_dir, "%s.pack" % basename), "xb") as pack_file:
        self.pack_buffer.seek(0)
        # copy the spooled buffer chunk by chunk until EOF
        for chunk in iter(lambda: self.pack_buffer.read(chunk_size), b""):
            pack_file.write(chunk)
    self.pack_buffer.seek(0)

    with open(os.path.join(pack_dir, "%s.refs" % basename), "xb") as refs_file:
        pickle.dump(self.remote_refs, refs_file)
def iter_objects(self, object_type: bytes) -> Iterator[ShaFile]:
    """Read all the objects of type `object_type` from the packfile"""
    if self.dumb:
        # dumb protocol: objects were fetched individually, not as a pack
        yield from self.dumb_fetcher.iter_objects(object_type)
        return

    self.pack_buffer.seek(0)
    pack_data = PackData.from_file(self.pack_buffer, self.pack_size)
    matched = 0
    for pack_object in PackInflater.for_pack_data(pack_data):
        if pack_object.type_name == object_type:
            yield pack_object
            matched += 1
    logger.debug("packfile_read_count_%s=%s", object_type.decode(), matched)
def get_contents(self) -> Iterable[BaseContent]:
    """Format the blobs from the git repository as swh contents"""
    for blob in self.iter_objects(b"blob"):
        if blob.id in self.ref_object_types:
            # this blob is directly pointed at by a remote ref
            self.ref_object_types[blob.id] = TargetType.CONTENT
        yield converters.dulwich_blob_to_content(
            blob, max_content_size=self.max_content_size
        )
def get_directories(self) -> Iterable[Directory]:
    """Format the trees as swh directories"""
    for tree in self.iter_objects(b"tree"):
        if tree.id in self.ref_object_types:
            # this tree is directly pointed at by a remote ref
            self.ref_object_types[tree.id] = TargetType.DIRECTORY
        yield converters.dulwich_tree_to_directory(tree)
def get_revisions(self) -> Iterable[Revision]:
    """Format commits as swh revisions"""
    for commit in self.iter_objects(b"commit"):
        if commit.id in self.ref_object_types:
            # this commit is directly pointed at by a remote ref
            self.ref_object_types[commit.id] = TargetType.REVISION
        yield converters.dulwich_commit_to_revision(commit)
def get_releases(self) -> Iterable[Release]:
    """Retrieve all the release objects from the git repository"""
    for tag in self.iter_objects(b"tag"):
        if tag.id in self.ref_object_types:
            # this tag is directly pointed at by a remote ref
            self.ref_object_types[tag.id] = TargetType.RELEASE
        yield converters.dulwich_tag_to_release(tag)
def get_snapshot(self) -> Snapshot:
    """Get the snapshot for the current visit.

    The main complexity of this function is mapping target objects to their
    types, as the `refs` dictionaries returned by the git server only give
    us the identifiers for the target objects, and not their types.

    The loader itself only knows the types of the objects that it has
    fetched from the server (as it has parsed them while loading them to
    the archive). As we only fetched an increment between the previous
    snapshot and the current state of the server, we are missing the type
    information for the objects that would already have been referenced by
    the previous snapshot, and that the git server didn't send us. We infer
    the type of these objects from the previous snapshot.
    """
    branches: Dict[bytes, Optional[SnapshotBranch]] = {}

    # refs whose target object was not in the fetched data; resolved below
    unfetched_refs: Dict[bytes, bytes] = {}

    # Retrieve types from the objects loaded by the current loader
    for ref_name, ref_object in self.remote_refs.items():
        if ref_name in self.symbolic_refs:
            continue
        # ref targets are bytehex sha1s; convert to raw sha1 bytes
        target = hashutil.hash_to_bytes(ref_object.decode())
        target_type = self.ref_object_types.get(ref_object)
        if target_type:
            branches[ref_name] = SnapshotBranch(
                target=target, target_type=target_type
            )
        else:
            # The object pointed at by this ref was not fetched, supposedly
            # because it existed in the base snapshot. We record it here,
            # and we can get it from the base snapshot later.
            unfetched_refs[ref_name] = target

    dangling_branches = {}
    # Handle symbolic references as alias branches
    for ref_name, target in self.symbolic_refs.items():
        branches[ref_name] = SnapshotBranch(
            target_type=TargetType.ALIAS,
            target=target,
        )
        if target not in branches and target not in unfetched_refs:
            # This handles the case where the pointer is "dangling".
            # There's a chance that a further symbolic reference
            # override this default value, which is totally fine.
            dangling_branches[target] = ref_name
            branches[target] = None

    if unfetched_refs:
        # Handle inference of object types from the contents of the
        # previous snapshot
        unknown_objects = {}

        # later base snapshots take precedence (hence reversed())
        base_snapshot_reverse_branches = {
            branch.target: branch
            for base_snapshot in reversed(self.base_snapshots)
            for branch in base_snapshot.branches.values()
            if branch and branch.target_type != TargetType.ALIAS
        }
        assert all(
            base_snapshot_reverse_branches[branch.target] == branch
            for branch in self.prev_snapshot.branches.values()
            if branch and branch.target_type != TargetType.ALIAS
        ), "base_snapshot_reverse_branches is not a superset of prev_snapshot"

        for ref_name, target in unfetched_refs.items():
            branch = base_snapshot_reverse_branches.get(target)
            branches[ref_name] = branch
            if not branch:
                unknown_objects[ref_name] = target

        if unknown_objects and self.base_snapshots:
            # The remote has sent us a partial packfile. It will have skipped
            # objects that it knows are ancestors of the heads we have sent as
            # known. We can look these objects up in the archive, as they should
            # have had all their ancestors loaded when the previous snapshot was
            # loaded.
            refs_for_target = defaultdict(list)
            for ref_name, target in unknown_objects.items():
                refs_for_target[target].append(ref_name)

            targets_unknown = set(refs_for_target)

            # Probe the archive for each possible object type, narrowing the
            # set of still-unknown targets at each step.
            for method, target_type in (
                (self.storage.revision_missing, TargetType.REVISION),
                (self.storage.release_missing, TargetType.RELEASE),
                (self.storage.directory_missing, TargetType.DIRECTORY),
                (self.storage.content_missing_per_sha1_git, TargetType.CONTENT),
            ):
                missing = set(method(list(targets_unknown)))
                known = targets_unknown - missing

                for target in known:
                    for ref_name in refs_for_target[target]:
                        logger.debug(
                            "Inferred type %s for branch %s pointing at unfetched %s",
                            target_type.name,
                            ref_name.decode(),
                            hashutil.hash_to_hex(target),
                            extra={
                                "swh_type": "swh_loader_git_inferred_target_type"
                            },
                        )
                        branches[ref_name] = SnapshotBranch(
                            target=target, target_type=target_type
                        )
                        del unknown_objects[ref_name]

                targets_unknown = missing
                if not targets_unknown:
                    break

        if unknown_objects:
            # This object was referenced by the server; We did not fetch
            # it, and we do not know it from the previous snapshot. This is
            # likely a bug in the loader.
            raise RuntimeError(
                "Unknown objects referenced by remote refs: %s"
                % (
                    ", ".join(
                        f"{name.decode()}: {hashutil.hash_to_hex(obj)}"
                        for name, obj in unknown_objects.items()
                    )
                )
            )

    utils.warn_dangling_branches(
        branches, dangling_branches, logger, self.origin.url
    )

    self.snapshot = Snapshot(branches=branches)
    return self.snapshot
def load_status(self) -> Dict[str, Any]:
    """The load was eventful if the current snapshot is different to
    the one we retrieved at the beginning of the run"""
    if self.prev_snapshot and self.snapshot:
        # compare against the snapshot from the previous visit
        eventful = self.snapshot.id != self.prev_snapshot.id
    elif self.snapshot:
        # first visit: eventful as soon as anything was found
        eventful = bool(self.snapshot.branches)
    else:
        eventful = False

    return {"status": "eventful" if eventful else "uneventful"}
if __name__ == "__main__":
    import click

    logging.basicConfig(
        level=logging.DEBUG, format="%(asctime)s %(process)d %(message)s"
    )

    from deprecated import deprecated

    @deprecated(version="1.1", reason="Use `swh loader run git --help` instead")
    @click.command()
    @click.option("--origin-url", help="Origin url", required=True)
    @click.option("--base-url", default=None, help="Optional Base url")
    @click.option(
        "--ignore-history/--no-ignore-history",
        help="Ignore the repository history",
        default=False,
    )
    # BUG FIX: the parameters must match the declared click options
    # (origin_url, base_url, ignore_history); the previous signature
    # `main(origin_url, incremental)` made click raise a TypeError with
    # unexpected keyword arguments `base_url`/`ignore_history`.
    def main(origin_url: str, base_url: str, ignore_history: bool) -> Dict[str, Any]:
        """Load a git repository into an in-memory storage.

        ``--ignore-history`` disables incremental loading; ``--base-url`` is
        accepted for backward compatibility but currently unused.
        """
        from swh.storage import get_storage

        storage = get_storage(cls="memory")
        loader = GitLoader(
            storage,
            origin_url,
            incremental=not ignore_history,
        )
        return loader.load()

    main()
diff --git a/swh/loader/git/tests/test_from_disk.py b/swh/loader/git/tests/test_from_disk.py
index 89c91f8..49049f0 100644
--- a/swh/loader/git/tests/test_from_disk.py
+++ b/swh/loader/git/tests/test_from_disk.py
@@ -1,553 +1,601 @@
# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import copy
import datetime
import os.path
+from unittest.mock import patch
import dulwich.objects
import dulwich.porcelain
import dulwich.repo
import pytest
+from swh.loader.git import utils
from swh.loader.git.from_disk import GitLoaderFromArchive, GitLoaderFromDisk
from swh.loader.tests import (
assert_last_visit_matches,
check_snapshot,
get_stats,
prepare_repository_from_archive,
)
from swh.model.hashutil import bytehex_to_hash, hash_to_bytes
from swh.model.model import ObjectType, Release, Snapshot, SnapshotBranch, TargetType
from swh.storage.algos.snapshot import snapshot_get_all_branches
# Expected snapshot of the unmodified test repository: one HEAD alias plus
# branch and tag refs targeting revisions (ids are raw sha1 bytes).
SNAPSHOT1 = Snapshot(
    id=hash_to_bytes("a23699280a82a043f8c0994cf1631b568f716f95"),
    branches={
        b"HEAD": SnapshotBranch(
            target=b"refs/heads/master",
            target_type=TargetType.ALIAS,
        ),
        b"refs/heads/master": SnapshotBranch(
            target=hash_to_bytes("2f01f5ca7e391a2f08905990277faf81e709a649"),
            target_type=TargetType.REVISION,
        ),
        b"refs/heads/branch1": SnapshotBranch(
            target=hash_to_bytes("b0a77609903f767a2fd3d769904ef9ef68468b87"),
            target_type=TargetType.REVISION,
        ),
        b"refs/heads/branch2": SnapshotBranch(
            target=hash_to_bytes("bd746cd1913721b269b395a56a97baf6755151c2"),
            target_type=TargetType.REVISION,
        ),
        b"refs/tags/branch2-after-delete": SnapshotBranch(
            target=hash_to_bytes("bd746cd1913721b269b395a56a97baf6755151c2"),
            target_type=TargetType.REVISION,
        ),
        b"refs/tags/branch2-before-delete": SnapshotBranch(
            target=hash_to_bytes("1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b"),
            target_type=TargetType.REVISION,
        ),
    },
)
# directory hashes obtained with:
# gco b6f40292c4e94a8f7e7b4aff50e6c7429ab98e2a
# swh-hashtree --ignore '.git' --path .
# gco 2f01f5ca7e391a2f08905990277faf81e709a649
# swh-hashtree --ignore '.git' --path .
# gco bcdc5ebfde1a3cd6c96e0c2ea4eed19c13208777
# swh-hashtree --ignore '.git' --path .
# gco 1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b
# swh-hashtree --ignore '.git' --path .
# gco 79f65ac75f79dda6ff03d66e1242702ab67fb51c
# swh-hashtree --ignore '.git' --path .
# gco b0a77609903f767a2fd3d769904ef9ef68468b87
# swh-hashtree --ignore '.git' --path .
# gco bd746cd1913721b269b395a56a97baf6755151c2
# swh-hashtree --ignore '.git' --path .
# Mapping from revision id (hex) to the id (hex) of its root directory, for
# the revisions of the test repository; see the gco/swh-hashtree commands
# above for how the directory hashes were obtained.
REVISIONS1 = {
    "b6f40292c4e94a8f7e7b4aff50e6c7429ab98e2a": (
        "40dbdf55dfd4065422462cc74a949254aefa972e"
    ),
    "2f01f5ca7e391a2f08905990277faf81e709a649": (
        "e1d0d894835f91a0f887a4bc8b16f81feefdfbd5"
    ),
    "bcdc5ebfde1a3cd6c96e0c2ea4eed19c13208777": (
        "b43724545b4759244bb54be053c690649161411c"
    ),
    "1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b": (
        "fbf70528223d263661b5ad4b80f26caf3860eb8e"
    ),
    "79f65ac75f79dda6ff03d66e1242702ab67fb51c": (
        "5df34ec74d6f69072d9a0a6677d8efbed9b12e60"
    ),
    "b0a77609903f767a2fd3d769904ef9ef68468b87": (
        "9ca0c7d6ffa3f9f0de59fd7912e08f11308a1338"
    ),
    "bd746cd1913721b269b395a56a97baf6755151c2": (
        "e1d0d894835f91a0f887a4bc8b16f81feefdfbd5"
    ),
}
class CommonGitLoaderTests:
    """Common tests for all git loaders.

    Subclasses are expected to provide ``self.loader`` and ``self.repo_url``
    (e.g. via a ``setUp``/fixture) before these tests run.
    """

    def test_load(self):
        """Loads a simple repository (made available by `setUp()`),
        and checks everything was added in the storage."""
        res = self.loader.load()

        assert res == {"status": "eventful"}

        assert_last_visit_matches(
            self.loader.storage,
            self.repo_url,
            status="full",
            type="git",
            snapshot=SNAPSHOT1.id,
        )

        stats = get_stats(self.loader.storage)
        assert stats == {
            "content": 4,
            "directory": 7,
            "origin": 1,
            "origin_visit": 1,
            "release": 0,
            "revision": 7,
            "skipped_content": 0,
            "snapshot": 1,
        }

        check_snapshot(SNAPSHOT1, self.loader.storage)

    def test_load_unchanged(self):
        """Checks loading a repository a second time does not add
        any extra data."""
        res = self.loader.load()
        assert res == {"status": "eventful"}

        assert_last_visit_matches(
            self.loader.storage,
            self.repo_url,
            status="full",
            type="git",
            snapshot=SNAPSHOT1.id,
        )

        stats0 = get_stats(self.loader.storage)
        assert stats0 == {
            "content": 4,
            "directory": 7,
            "origin": 1,
            "origin_visit": 1,
            "release": 0,
            "revision": 7,
            "skipped_content": 0,
            "snapshot": 1,
        }

        res = self.loader.load()
        assert res == {"status": "uneventful"}
        stats1 = get_stats(self.loader.storage)
        expected_stats = copy.deepcopy(stats0)
        # second visit adds no objects, only a new origin_visit
        expected_stats["origin_visit"] += 1
        assert stats1 == expected_stats

        check_snapshot(SNAPSHOT1, self.loader.storage)

        assert_last_visit_matches(
            self.loader.storage,
            self.repo_url,
            status="full",
            type="git",
            snapshot=SNAPSHOT1.id,
        )

    def test_load_visit_without_snapshot_so_status_failed(self):
        # unfortunately, monkey-patch the hard way: self.loader is already
        # instantiated, so patching the class would not affect it.
        # Make get_contents fail for some reason
        self.loader.get_contents = None

        res = self.loader.load()
        assert res == {"status": "failed"}

        assert_last_visit_matches(
            self.loader.storage,
            self.repo_url,
            status="failed",
            type="git",
            snapshot=None,
        )

    def test_load_visit_with_snapshot_so_status_partial(self):
        # unfortunately, monkey-patch the hard way: self.loader is already
        # instantiated, so patching the class would not affect it.
        # fake store_data raising for some reason, so we could have a snapshot id
        # at this point in time
        self.loader.store_data = None
        # fake having a snapshot so the visit status is partial
        self.loader.loaded_snapshot_id = hash_to_bytes(
            "a23699280a82a043f8c0994cf1631b568f716f95"
        )

        res = self.loader.load()
        assert res == {"status": "failed"}

        assert_last_visit_matches(
            self.loader.storage,
            self.repo_url,
            status="partial",
            type="git",
            snapshot=None,
        )

    def test_load_incremental_partial_history(self):
        """Check that loading a partial snapshot, then negotiating a full snapshot, works."""

        # We pick this branch because it contains the target of the refs/heads/master
        # branch of the full snapshot in its history

        interesting_branch = b"refs/tags/branch2-before-delete"
        partial_snapshot = Snapshot(
            branches={interesting_branch: SNAPSHOT1.branches[interesting_branch]}
        )

        with patch.object(
            utils,
            "ignore_branch_name",
            lambda name: name != interesting_branch,
        ), patch.object(
            utils,
            "filter_refs",
            lambda refs: {
                ref_name: utils.HexBytes(target)
                for ref_name, target in refs.items()
                if ref_name == interesting_branch
            },
        ):
            # Ensure that only the interesting branch is loaded
            res = self.loader.load()

        assert res == {"status": "eventful"}

        assert self.loader.storage.snapshot_get_branches(partial_snapshot.id)

        res = self.loader.load()
        assert res == {"status": "eventful"}

        stats = get_stats(self.loader.storage)
        assert stats == {
            "content": 4,
            "directory": 7,
            "origin": 1,
            "origin_visit": 2,
            "release": 0,
            "revision": 7,
            "skipped_content": 0,
            "snapshot": 2,
        }
class FullGitLoaderTests(CommonGitLoaderTests):
"""Tests for GitLoader (from disk or not). Includes the common ones, and
add others that only work with a local dir.
"""
def test_load_changed(self):
"""Loads a repository, makes some changes by adding files, commits,
and merges, load it again, and check the storage contains everything
it should."""
# Initial load
res = self.loader.load()
assert res == {"status": "eventful"}
stats0 = get_stats(self.loader.storage)
assert stats0 == {
"content": 4,
"directory": 7,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 7,
"skipped_content": 0,
"snapshot": 1,
}
# Load with a new file + revision
with open(os.path.join(self.destination_path, "hello.py"), "a") as fd:
fd.write("print('Hello world')\n")
self.repo.stage([b"hello.py"])
new_revision = self.repo.do_commit(b"Hello world\n").decode()
new_dir = "85dae072a5aa9923ffa7a7568f819ff21bf49858"
assert self.repo[new_revision.encode()].tree == new_dir.encode()
revisions = REVISIONS1.copy()
assert new_revision not in revisions
revisions[new_revision] = new_dir
res = self.loader.load()
assert res == {"status": "eventful"}
stats1 = get_stats(self.loader.storage)
expected_stats = copy.deepcopy(stats0)
# did one new visit
expected_stats["origin_visit"] += 1
# with one more of the following objects
expected_stats["content"] += 1
expected_stats["directory"] += 1
expected_stats["revision"] += 1
# concluding into 1 new snapshot
expected_stats["snapshot"] += 1
assert stats1 == expected_stats
visit_status = assert_last_visit_matches(
self.loader.storage, self.repo_url, status="full", type="git"
)
assert visit_status.snapshot is not None
snapshot_id = visit_status.snapshot
snapshot = snapshot_get_all_branches(self.loader.storage, snapshot_id)
branches = snapshot.branches
assert branches[b"HEAD"] == SnapshotBranch(
target=b"refs/heads/master",
target_type=TargetType.ALIAS,
)
assert branches[b"refs/heads/master"] == SnapshotBranch(
target=hash_to_bytes(new_revision),
target_type=TargetType.REVISION,
)
# Merge branch1 into HEAD.
current = self.repo[b"HEAD"]
branch1 = self.repo[b"refs/heads/branch1"]
merged_tree = dulwich.objects.Tree()
for item in self.repo[current.tree].items():
merged_tree.add(*item)
for item in self.repo[branch1.tree].items():
merged_tree.add(*item)
merged_dir_id = "dab8a37df8db8666d4e277bef9a546f585b5bedd"
assert merged_tree.id.decode() == merged_dir_id
self.repo.object_store.add_object(merged_tree)
merge_commit = self.repo.do_commit(
b"merge.\n", tree=merged_tree.id, merge_heads=[branch1.id]
)
assert merge_commit.decode() not in revisions
revisions[merge_commit.decode()] = merged_tree.id.decode()
res = self.loader.load()
assert res == {"status": "eventful"}
stats2 = get_stats(self.loader.storage)
expected_stats = copy.deepcopy(stats1)
# one more visit
expected_stats["origin_visit"] += 1
# with 1 new directory and revision
expected_stats["directory"] += 1
expected_stats["revision"] += 1
# concluding into 1 new snapshot
expected_stats["snapshot"] += 1
assert stats2 == expected_stats
visit_status = assert_last_visit_matches(
self.loader.storage, self.repo_url, status="full", type="git"
)
assert visit_status.snapshot is not None
merge_snapshot_id = visit_status.snapshot
assert merge_snapshot_id != snapshot_id
merge_snapshot = snapshot_get_all_branches(
self.loader.storage, merge_snapshot_id
)
merge_branches = merge_snapshot.branches
assert merge_branches[b"HEAD"] == SnapshotBranch(
target=b"refs/heads/master",
target_type=TargetType.ALIAS,
)
assert merge_branches[b"refs/heads/master"] == SnapshotBranch(
target=hash_to_bytes(merge_commit.decode()),
target_type=TargetType.REVISION,
)
def test_load_filter_branches(self):
filtered_branches = {b"refs/pull/42/merge"}
unfiltered_branches = {b"refs/pull/42/head"}
# Add branches to the repository on disk; some should be filtered by
# the loader, some should not.
for branch_name in filtered_branches | unfiltered_branches:
self.repo[branch_name] = self.repo[b"refs/heads/master"]
# Generate the expected snapshot from SNAPSHOT1 (which is the original
# state of the git repo)...
branches = dict(SNAPSHOT1.branches)
# ... and the unfiltered_branches, which are all pointing to the same
# commit as "refs/heads/master".
for branch_name in unfiltered_branches:
branches[branch_name] = branches[b"refs/heads/master"]
expected_snapshot = Snapshot(branches=branches)
# Load the modified repository
res = self.loader.load()
assert res == {"status": "eventful"}
check_snapshot(expected_snapshot, self.loader.storage)
assert_last_visit_matches(
self.loader.storage,
self.repo_url,
status="full",
type="git",
snapshot=expected_snapshot.id,
)
def test_load_dangling_symref(self):
with open(os.path.join(self.destination_path, ".git/HEAD"), "wb") as f:
f.write(b"ref: refs/heads/dangling-branch\n")
res = self.loader.load()
assert res == {"status": "eventful"}
visit_status = assert_last_visit_matches(
self.loader.storage, self.repo_url, status="full", type="git"
)
snapshot_id = visit_status.snapshot
assert snapshot_id is not None
snapshot = snapshot_get_all_branches(self.loader.storage, snapshot_id)
branches = snapshot.branches
assert branches[b"HEAD"] == SnapshotBranch(
target=b"refs/heads/dangling-branch",
target_type=TargetType.ALIAS,
)
assert branches[b"refs/heads/dangling-branch"] is None
stats = get_stats(self.loader.storage)
assert stats == {
"content": 4,
"directory": 7,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 7,
"skipped_content": 0,
"snapshot": 1,
}
def test_load_empty_tree(self):
empty_dir_id = "4b825dc642cb6eb9a060e54bf8d69288fbee4904"
# Check the empty tree does not already exist for some reason
# (it would make this test pointless)
assert list(
self.loader.storage.directory_missing([hash_to_bytes(empty_dir_id)])
) == [hash_to_bytes(empty_dir_id)]
empty_tree = dulwich.objects.Tree()
assert empty_tree.id.decode() == empty_dir_id
self.repo.object_store.add_object(empty_tree)
self.repo.do_commit(b"remove all bugs\n", tree=empty_tree.id)
res = self.loader.load()
assert res == {"status": "eventful"}
assert (
list(self.loader.storage.directory_missing([hash_to_bytes(empty_dir_id)]))
== []
)
results = self.loader.storage.directory_get_entries(hash_to_bytes(empty_dir_id))
assert results.next_page_token is None
assert results.results == []
def test_load_tag(self):
with open(os.path.join(self.destination_path, "hello.py"), "a") as fd:
fd.write("print('Hello world')\n")
self.repo.stage([b"hello.py"])
new_revision = self.repo.do_commit(b"Hello world\n")
# Newer Dulwich versions always add a \n to tag messages.
if dulwich.__version__ >= (0, 20, 22):
message = b"First release!"
else:
message = b"First release!\n"
dulwich.porcelain.tag_create(
self.repo,
b"v1.0.0",
message=message,
annotated=True,
objectish=new_revision,
)
res = self.loader.load()
assert res == {"status": "eventful"}
branches = self.loader.storage.snapshot_get_branches(self.loader.snapshot.id)
branch = branches["branches"][b"refs/tags/v1.0.0"]
assert branch.target_type == TargetType.RELEASE
release = self.loader.storage.release_get([branch.target])[0]
assert release.date is not None
assert release.author is not None
assert release == Release(
name=b"v1.0.0",
message=b"First release!\n",
target_type=ObjectType.REVISION,
target=bytehex_to_hash(new_revision),
author=release.author,
date=release.date,
synthetic=False,
)
def test_load_tag_minimal(self):
with open(os.path.join(self.destination_path, "hello.py"), "a") as fd:
fd.write("print('Hello world')\n")
self.repo.stage([b"hello.py"])
new_revision = self.repo.do_commit(b"Hello world\n")
# dulwich.porcelain.tag_create doesn't allow creating tags without
# a tagger or a date, so we have to create it "manually"
tag = dulwich.objects.Tag()
tag.message = b"First release!\n"
tag.name = b"v1.0.0"
tag.object = (dulwich.objects.Commit, new_revision)
self.repo.object_store.add_object(tag)
self.repo[b"refs/tags/v1.0.0"] = tag.id
res = self.loader.load()
assert res == {"status": "eventful"}
branches = self.loader.storage.snapshot_get_branches(self.loader.snapshot.id)
branch = branches["branches"][b"refs/tags/v1.0.0"]
assert branch.target_type == TargetType.RELEASE
release = self.loader.storage.release_get([branch.target])[0]
assert release == Release(
id=bytehex_to_hash(tag.id),
name=b"v1.0.0",
message=b"First release!\n",
target_type=ObjectType.REVISION,
target=bytehex_to_hash(new_revision),
synthetic=False,
)
class TestGitLoaderFromDisk(FullGitLoaderTests):
"""Prepare a git directory repository to be loaded through a GitLoaderFromDisk.
This tests all git loader scenario.
"""
@pytest.fixture(autouse=True)
def init(self, swh_storage, datadir, tmp_path):
archive_name = "testrepo"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
tmp_path = str(tmp_path)
self.repo_url = prepare_repository_from_archive(
archive_path, archive_name, tmp_path=tmp_path
)
self.destination_path = os.path.join(tmp_path, archive_name)
self.loader = GitLoaderFromDisk(
swh_storage,
url=self.repo_url,
visit_date=datetime.datetime(
2016, 5, 3, 15, 16, 32, tzinfo=datetime.timezone.utc
),
directory=self.destination_path,
)
self.repo = dulwich.repo.Repo(self.destination_path)
class TestGitLoaderFromArchive(CommonGitLoaderTests):
"""Tests for GitLoaderFromArchive. Only tests common scenario."""
@pytest.fixture(autouse=True)
def init(self, swh_storage, datadir, tmp_path):
archive_name = "testrepo"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
self.repo_url = archive_path
self.loader = GitLoaderFromArchive(
swh_storage,
url=self.repo_url,
archive_path=archive_path,
visit_date=datetime.datetime(
2016, 5, 3, 15, 16, 32, tzinfo=datetime.timezone.utc
),
)
diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_loader.py
index 45ee3e9..0e7aa19 100644
--- a/swh/loader/git/tests/test_loader.py
+++ b/swh/loader/git/tests/test_loader.py
@@ -1,813 +1,777 @@
# Copyright (C) 2018-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from functools import partial
from http.server import HTTPServer, SimpleHTTPRequestHandler
+import logging
import os
import subprocess
import sys
from tempfile import SpooledTemporaryFile
from threading import Thread
from unittest.mock import MagicMock, call
from dulwich.errors import GitProtocolError, NotGitRepository, ObjectFormatException
from dulwich.porcelain import push
import dulwich.repo
import pytest
from swh.loader.git import converters, dumb
from swh.loader.git.loader import GitLoader
from swh.loader.git.tests.test_from_disk import SNAPSHOT1, FullGitLoaderTests
from swh.loader.tests import (
assert_last_visit_matches,
get_stats,
prepare_repository_from_archive,
)
-from swh.model.model import (
- Origin,
- OriginVisit,
- OriginVisitStatus,
- Snapshot,
- SnapshotBranch,
- TargetType,
-)
+from swh.model.model import Origin, OriginVisit, OriginVisitStatus, Snapshot
class CommonGitLoaderNotFound:
@pytest.fixture(autouse=True)
def __inject_fixtures(self, mocker):
"""Inject required fixtures in unittest.TestCase class"""
self.mocker = mocker
@pytest.mark.parametrize(
"failure_exception",
[
GitProtocolError("Repository unavailable"), # e.g DMCA takedown
GitProtocolError("Repository not found"),
GitProtocolError("unexpected http resp 401"),
NotGitRepository("not a git repo"),
],
)
def test_load_visit_not_found(self, failure_exception):
"""Ingesting an unknown url result in a visit with not_found status"""
# simulate an initial communication error (e.g no repository found, ...)
mock = self.mocker.patch(
"swh.loader.git.loader.GitLoader.fetch_pack_from_origin"
)
mock.side_effect = failure_exception
res = self.loader.load()
assert res == {"status": "uneventful"}
assert_last_visit_matches(
self.loader.storage,
self.repo_url,
status="not_found",
type="git",
snapshot=None,
)
@pytest.mark.parametrize(
"failure_exception",
[
IOError,
ObjectFormatException,
OSError,
ValueError,
GitProtocolError,
],
)
def test_load_visit_failure(self, failure_exception):
"""Failing during the fetch pack step result in failing visit"""
# simulate a fetch communication error after the initial connection
# server error (e.g IOError, ObjectFormatException, ...)
mock = self.mocker.patch(
"swh.loader.git.loader.GitLoader.fetch_pack_from_origin"
)
mock.side_effect = failure_exception("failure")
res = self.loader.load()
assert res == {"status": "failed"}
assert_last_visit_matches(
self.loader.storage,
self.repo_url,
status="failed",
type="git",
snapshot=None,
)
class TestGitLoader(FullGitLoaderTests, CommonGitLoaderNotFound):
"""Prepare a git directory repository to be loaded through a GitLoader.
This tests all git loader scenario.
"""
@pytest.fixture(autouse=True)
def init(self, swh_storage, datadir, tmp_path):
archive_name = "testrepo"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
tmp_path = str(tmp_path)
self.repo_url = prepare_repository_from_archive(
archive_path, archive_name, tmp_path=tmp_path
)
self.destination_path = os.path.join(tmp_path, archive_name)
self.loader = GitLoader(swh_storage, self.repo_url)
self.repo = dulwich.repo.Repo(self.destination_path)
def test_metrics(self, mocker):
statsd_report = mocker.patch.object(self.loader.statsd, "_report")
res = self.loader.load()
assert res == {"status": "eventful"}
# TODO: assert "incremental" is added to constant tags before these
# metrics are sent
statsd_calls = statsd_report.mock_calls
assert [c for c in statsd_calls if c[1][0].startswith("git_")] == [
call("git_total", "c", 1, {}, 1),
call("git_ignored_refs_percent", "h", 0.0, {}, 1),
call("git_known_refs_percent", "h", 0.0, {}, 1),
]
total_sum_name = "filtered_objects_total_sum"
total_count_name = "filtered_objects_total_count"
percent_name = "filtered_objects_percent"
assert [c for c in statsd_calls if c[1][0].startswith("filtered_")] == [
call(percent_name, "h", 0.0, {"object_type": "content"}, 1),
call(total_sum_name, "c", 0, {"object_type": "content"}, 1),
call(total_count_name, "c", 4, {"object_type": "content"}, 1),
call(percent_name, "h", 0.0, {"object_type": "directory"}, 1),
call(total_sum_name, "c", 0, {"object_type": "directory"}, 1),
call(total_count_name, "c", 7, {"object_type": "directory"}, 1),
call(percent_name, "h", 0.0, {"object_type": "revision"}, 1),
call(total_sum_name, "c", 0, {"object_type": "revision"}, 1),
call(total_count_name, "c", 7, {"object_type": "revision"}, 1),
call(percent_name, "h", 0.0, {"object_type": "snapshot"}, 1),
call(total_sum_name, "c", 0, {"object_type": "snapshot"}, 1),
call(total_count_name, "c", 1, {"object_type": "snapshot"}, 1),
]
assert self.loader.statsd.constant_tags == {
"visit_type": "git",
"incremental_enabled": True,
"has_parent_snapshot": False,
"has_previous_snapshot": False,
"has_parent_origins": False,
}
def test_metrics_filtered(self, mocker):
"""Tests that presence of some objects in the storage (but not referenced
by a snapshot) is reported"""
known_revs = [
converters.dulwich_commit_to_revision(self.repo[sha1])
for sha1 in [
b"b6f40292c4e94a8f7e7b4aff50e6c7429ab98e2a",
b"1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b",
]
]
known_dirs = [
converters.dulwich_tree_to_directory(self.repo[sha1])
for sha1 in [
b"fbf70528223d263661b5ad4b80f26caf3860eb8e",
b"9ca0c7d6ffa3f9f0de59fd7912e08f11308a1338",
b"5df34ec74d6f69072d9a0a6677d8efbed9b12e60",
]
]
known_cnts = [
converters.dulwich_blob_to_content(self.repo[sha1])
for sha1 in [
b"534d61ecee4f6da4d6ca6ddd8abf258208d2d1bc",
]
]
self.loader.storage.revision_add(known_revs)
self.loader.storage.directory_add(known_dirs)
self.loader.storage.content_add(known_cnts)
self.loader.storage.flush()
statsd_report = mocker.patch.object(self.loader.statsd, "_report")
res = self.loader.load()
assert res == {"status": "eventful"}
# TODO: assert "incremental" is added to constant tags before these
# metrics are sent
statsd_calls = statsd_report.mock_calls
assert [c for c in statsd_calls if c[1][0].startswith("git_")] == [
call("git_total", "c", 1, {}, 1),
call("git_ignored_refs_percent", "h", 0.0, {}, 1),
call("git_known_refs_percent", "h", 0.0, {}, 1),
]
total_sum_name = "filtered_objects_total_sum"
total_count_name = "filtered_objects_total_count"
percent_name = "filtered_objects_percent"
assert [c for c in statsd_calls if c[1][0].startswith("filtered_")] == [
call(percent_name, "h", 1 / 4, {"object_type": "content"}, 1),
call(total_sum_name, "c", 1, {"object_type": "content"}, 1),
call(total_count_name, "c", 4, {"object_type": "content"}, 1),
call(percent_name, "h", 3 / 7, {"object_type": "directory"}, 1),
call(total_sum_name, "c", 3, {"object_type": "directory"}, 1),
call(total_count_name, "c", 7, {"object_type": "directory"}, 1),
call(percent_name, "h", 2 / 7, {"object_type": "revision"}, 1),
call(total_sum_name, "c", 2, {"object_type": "revision"}, 1),
call(total_count_name, "c", 7, {"object_type": "revision"}, 1),
call(percent_name, "h", 0.0, {"object_type": "snapshot"}, 1),
call(total_sum_name, "c", 0, {"object_type": "snapshot"}, 1),
call(total_count_name, "c", 1, {"object_type": "snapshot"}, 1),
]
assert self.loader.statsd.constant_tags == {
"visit_type": "git",
"incremental_enabled": True,
"has_parent_snapshot": False,
"has_previous_snapshot": False,
"has_parent_origins": False,
}
+ def test_load_incremental_partial_history(self, caplog):
+ with caplog.at_level(logging.DEBUG, logger="swh.loader.git.loader"):
+ super().test_load_incremental_partial_history()
+
+ # Check that we've indeed inferred the target type for one of the snapshot
+ # branches
+ for record in caplog.records:
+ if (
+ hasattr(record, "swh_type")
+ and record.swh_type == "swh_loader_git_inferred_target_type"
+ ):
+ assert record.args == (
+ "REVISION",
+ "refs/heads/master",
+ SNAPSHOT1.branches[b"refs/heads/master"].target.hex(),
+ )
+ break
+ else:
+ assert False, "did not find log message for inferred branch target type"
+
class TestGitLoader2(FullGitLoaderTests, CommonGitLoaderNotFound):
"""Mostly the same loading scenario but with a ``parent_origin`` different from the
``origin``; as if the ``origin`` was a forge-fork of ``parent_origin``, detected
by the metadata loader.
To walk slightly different paths, the end result should stay the same.
"""
@pytest.fixture(autouse=True)
def init(self, swh_storage, datadir, tmp_path, mocker):
archive_name = "testrepo"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
tmp_path = str(tmp_path)
self.repo_url = prepare_repository_from_archive(
archive_path, archive_name, tmp_path=tmp_path
)
self.destination_path = os.path.join(tmp_path, archive_name)
self.fetcher = MagicMock()
self.fetcher.get_origin_metadata.return_value = []
self.fetcher.get_parent_origins.return_value = [
Origin(url=f"base://{self.repo_url}")
]
self.fetcher_cls = MagicMock(return_value=self.fetcher)
self.fetcher_cls.SUPPORTED_LISTERS = ["fake-lister"]
mocker.patch(
"swh.loader.core.metadata_fetchers._fetchers",
return_value=[self.fetcher_cls],
)
self.loader = GitLoader(
MagicMock(wraps=swh_storage),
self.repo_url,
lister_name="fake-lister",
lister_instance_name="",
)
self.repo = dulwich.repo.Repo(self.destination_path)
def test_no_previous_snapshot(self, mocker):
statsd_report = mocker.patch.object(self.loader.statsd, "_report")
res = self.loader.load()
assert res == {"status": "eventful"}
self.fetcher_cls.assert_called_once_with(
credentials={},
lister_name="fake-lister",
lister_instance_name="",
origin=Origin(url=self.repo_url),
)
self.fetcher.get_parent_origins.assert_called_once_with()
# First tries the same origin
assert self.loader.storage.origin_visit_get_latest.mock_calls == [
call(
self.repo_url,
allowed_statuses=None,
require_snapshot=True,
type=None,
),
# As it does not already have a snapshot, fall back to the parent origin
call(
f"base://{self.repo_url}",
allowed_statuses=None,
require_snapshot=True,
type=None,
),
]
# TODO: assert "incremental" is added to constant tags before these
# metrics are sent
assert [c for c in statsd_report.mock_calls if c[1][0].startswith("git_")] == [
call("git_total", "c", 1, {}, 1),
call("git_ignored_refs_percent", "h", 0.0, {}, 1),
call("git_known_refs_percent", "h", 0.0, {}, 1),
]
assert self.loader.statsd.constant_tags == {
"visit_type": "git",
"incremental_enabled": True,
"has_parent_snapshot": False,
"has_previous_snapshot": False,
"has_parent_origins": True,
}
def test_load_incremental(self, mocker):
statsd_report = mocker.patch.object(self.loader.statsd, "_report")
snapshot_id = b"\x01" * 20
now = datetime.datetime.now(tz=datetime.timezone.utc)
def ovgl(origin_url, allowed_statuses, require_snapshot, type):
if origin_url == f"base://{self.repo_url}":
return OriginVisit(origin=origin_url, visit=42, date=now, type="git")
else:
return None
self.loader.storage.origin_visit_get_latest.side_effect = ovgl
self.loader.storage.origin_visit_status_get_latest.return_value = (
OriginVisitStatus(
origin=f"base://{self.repo_url}",
visit=42,
snapshot=snapshot_id,
date=now,
status="full",
)
)
self.loader.storage.snapshot_get_branches.return_value = {
"id": snapshot_id,
"branches": {
b"refs/heads/master": SNAPSHOT1.branches[b"refs/heads/master"]
},
"next_branch": None,
}
res = self.loader.load()
assert res == {"status": "eventful"}
self.fetcher_cls.assert_called_once_with(
credentials={},
lister_name="fake-lister",
lister_instance_name="",
origin=Origin(url=self.repo_url),
)
self.fetcher.get_parent_origins.assert_called_once_with()
# First tries the same origin
assert self.loader.storage.origin_visit_get_latest.mock_calls == [
call(
self.repo_url,
allowed_statuses=None,
require_snapshot=True,
type=None,
),
# As it does not already have a snapshot, fall back to the parent origin
call(
f"base://{self.repo_url}",
allowed_statuses=None,
require_snapshot=True,
type=None,
),
]
# TODO: assert "incremental*" is added to constant tags before these
# metrics are sent
assert [c for c in statsd_report.mock_calls if c[1][0].startswith("git_")] == [
call("git_total", "c", 1, {}, 1),
call("git_ignored_refs_percent", "h", 0.0, {}, 1),
call("git_known_refs_percent", "h", 0.25, {}, 1),
]
assert self.loader.statsd.constant_tags == {
"visit_type": "git",
"incremental_enabled": True,
"has_parent_snapshot": True,
"has_previous_snapshot": False,
"has_parent_origins": True,
}
self.fetcher.reset_mock()
self.fetcher_cls.reset_mock()
if sys.version_info >= (3, 9, 0):
self.loader.storage.reset_mock(return_value=True, side_effect=True)
else:
# Reimplement https://github.com/python/cpython/commit/aef7dc89879d099dc704bd8037b8a7686fb72838 # noqa
# for old Python versions:
def reset_mock(m):
m.reset_mock(return_value=True, side_effect=True)
for child in m._mock_children.values():
reset_mock(child)
reset_mock(self.loader.storage)
statsd_report.reset_mock()
# Load again
res = self.loader.load()
assert res == {"status": "uneventful"}
self.fetcher_cls.assert_called_once_with(
credentials={},
lister_name="fake-lister",
lister_instance_name="",
origin=Origin(url=self.repo_url),
)
self.fetcher.get_parent_origins.assert_not_called()
assert self.loader.storage.origin_visit_get_latest.mock_calls == [
# Tries the same origin, and finds a snapshot
call(
self.repo_url,
type=None,
allowed_statuses=None,
require_snapshot=True,
),
# also fetches the parent, in case the origin was rebased on the parent
# since the last visit
call(
f"base://{self.repo_url}",
type=None,
allowed_statuses=None,
require_snapshot=True,
),
]
# TODO: assert "incremental*" is added to constant tags before these
# metrics are sent
assert [c for c in statsd_report.mock_calls if c[1][0].startswith("git_")] == [
call("git_total", "c", 1, {}, 1),
call("git_ignored_refs_percent", "h", 0.0, {}, 1),
call("git_known_refs_percent", "h", 1.0, {}, 1),
]
assert self.loader.statsd.constant_tags == {
"visit_type": "git",
"incremental_enabled": True,
"has_parent_snapshot": False, # Because we reset the mock since last time
"has_previous_snapshot": True,
"has_parent_origins": True,
}
@pytest.mark.parametrize(
"parent_snapshot,previous_snapshot,expected_git_known_refs_percent",
[
pytest.param(
Snapshot(
branches={
b"refs/heads/master": SNAPSHOT1.branches[b"refs/heads/master"]
}
),
Snapshot(branches={}),
0.25,
id="partial-parent-and-empty-previous",
),
pytest.param(
SNAPSHOT1,
Snapshot(
branches={
b"refs/heads/master": SNAPSHOT1.branches[b"refs/heads/master"]
}
),
1.0,
id="full-parent-and-partial-previous",
),
],
)
def test_load_incremental_from(
self,
parent_snapshot,
previous_snapshot,
expected_git_known_refs_percent,
mocker,
):
"""Snapshot of parent origin has all branches, but previous snapshot was
empty."""
statsd_report = mocker.patch.object(self.loader.statsd, "_report")
now = datetime.datetime.now(tz=datetime.timezone.utc)
self.loader.storage.snapshot_add([parent_snapshot, previous_snapshot])
self.loader.storage.origin_add(
[Origin(url=f"base://{self.repo_url}"), Origin(url=self.repo_url)]
)
self.loader.storage.origin_visit_add(
[
OriginVisit(
origin=f"base://{self.repo_url}",
visit=42,
date=now - datetime.timedelta(seconds=-1),
type="git",
),
OriginVisit(
origin=self.repo_url,
visit=42,
date=now - datetime.timedelta(seconds=-1),
type="git",
),
]
)
self.loader.storage.origin_visit_status_add(
[
OriginVisitStatus(
origin=f"base://{self.repo_url}",
visit=42,
type="git",
snapshot=parent_snapshot.id,
date=now,
status="full",
),
OriginVisitStatus(
origin=self.repo_url,
visit=42,
type="git",
snapshot=previous_snapshot.id,
date=now,
status="full",
),
]
)
self.loader.storage.flush()
res = self.loader.load()
assert res == {"status": "eventful"}
self.fetcher_cls.assert_called_once_with(
credentials={},
lister_name="fake-lister",
lister_instance_name="",
origin=Origin(url=self.repo_url),
)
self.fetcher.get_parent_origins.assert_called_once_with()
# First tries the same origin
assert self.loader.storage.origin_visit_get_latest.mock_calls == [
call(
self.repo_url,
allowed_statuses=None,
require_snapshot=True,
type=None,
),
# As it does not already have a snapshot, fall back to the parent origin
call(
f"base://{self.repo_url}",
allowed_statuses=None,
require_snapshot=True,
type=None,
),
]
assert self.loader.statsd.constant_tags == {
"visit_type": "git",
"incremental_enabled": True,
"has_parent_snapshot": True,
"has_previous_snapshot": True,
"has_parent_origins": True,
}
assert [c for c in statsd_report.mock_calls if c[1][0].startswith("git_")] == [
call("git_total", "c", 1, {}, 1),
call("git_ignored_refs_percent", "h", 0.0, {}, 1),
call("git_known_refs_percent", "h", expected_git_known_refs_percent, {}, 1),
]
- def test_load_incremental_negotiation(self):
- """Check that the packfile negotiated when running an incremental load only
- contains the "new" commits, and not all objects."""
-
- snapshot_id = b"\x01" * 20
- now = datetime.datetime.now(tz=datetime.timezone.utc)
-
- def ovgl(origin_url, allowed_statuses, require_snapshot, type):
- if origin_url == f"base://{self.repo_url}":
- return OriginVisit(origin=origin_url, visit=42, date=now, type="git")
- else:
- return None
-
- self.loader.storage.origin_visit_get_latest.side_effect = ovgl
- self.loader.storage.origin_visit_status_get_latest.return_value = (
- OriginVisitStatus(
- origin=f"base://{self.repo_url}",
- visit=42,
- snapshot=snapshot_id,
- date=now,
- status="full",
- )
- )
- self.loader.storage.snapshot_get_branches.return_value = {
- "id": snapshot_id,
- "branches": {
- b"refs/heads/master": SnapshotBranch(
- # id of the initial commit in the git repository fixture
- target=bytes.fromhex("b6f40292c4e94a8f7e7b4aff50e6c7429ab98e2a"),
- target_type=TargetType.REVISION,
- ),
- },
- "next_branch": None,
- }
-
- res = self.loader.load()
- assert res == {"status": "eventful"}
-
- stats = get_stats(self.loader.storage)
- assert stats == {
- "content": 3, # instead of 4 for the full repository
- "directory": 6, # instead of 7
- "origin": 1,
- "origin_visit": 1,
- "release": 0,
- "revision": 6, # instead of 7
- "skipped_content": 0,
- "snapshot": 1,
- }
-
class DumbGitLoaderTestBase(FullGitLoaderTests):
"""Prepare a git repository to be loaded using the HTTP dumb transfer protocol."""
@pytest.fixture(autouse=True)
def init(self, swh_storage, datadir, tmp_path):
# remove any proxy settings in order to successfully spawn a local HTTP server
http_proxy = os.environ.get("http_proxy")
https_proxy = os.environ.get("https_proxy")
if http_proxy:
del os.environ["http_proxy"]
if http_proxy:
del os.environ["https_proxy"]
# prepare test base repository using smart transfer protocol
archive_name = "testrepo"
archive_path = os.path.join(datadir, f"{archive_name}.tgz")
tmp_path = str(tmp_path)
base_repo_url = prepare_repository_from_archive(
archive_path, archive_name, tmp_path=tmp_path
)
destination_path = os.path.join(tmp_path, archive_name)
self.destination_path = destination_path
with_pack_files = self.with_pack_files
if with_pack_files:
# create a bare clone of that repository in another folder,
# all objects will be contained in one or two pack files in that case
http_root_dir = tmp_path
repo_name = archive_name + "_bare"
bare_repo_path = os.path.join(http_root_dir, repo_name)
subprocess.run(
["git", "clone", "--bare", base_repo_url, bare_repo_path],
check=True,
)
else:
# otherwise serve objects from the bare repository located in
# the .git folder of the base repository
http_root_dir = destination_path
repo_name = ".git"
bare_repo_path = os.path.join(http_root_dir, repo_name)
# spawn local HTTP server that will serve the bare repository files
hostname = "localhost"
handler = partial(SimpleHTTPRequestHandler, directory=http_root_dir)
httpd = HTTPServer((hostname, 0), handler, bind_and_activate=True)
def serve_forever(httpd):
with httpd:
httpd.serve_forever()
thread = Thread(target=serve_forever, args=(httpd,))
thread.start()
repo = dulwich.repo.Repo(self.destination_path)
class DumbGitLoaderTest(GitLoader):
def load(self):
"""
Override load method to ensure the bare repository will be synchronized
with the base one as tests can modify its content.
"""
if with_pack_files:
# ensure HEAD ref will be the same for both repositories
with open(os.path.join(bare_repo_path, "HEAD"), "wb") as fw:
with open(
os.path.join(destination_path, ".git/HEAD"), "rb"
) as fr:
head_ref = fr.read()
fw.write(head_ref)
# push possibly modified refs in the base repository to the bare one
for ref in repo.refs.allkeys():
if ref != b"HEAD" or head_ref in repo.refs:
push(
repo,
remote_location=f"file://{bare_repo_path}",
refspecs=ref,
)
# generate or update the info/refs file used in dumb protocol
subprocess.run(
["git", "-C", bare_repo_path, "update-server-info"],
check=True,
)
return super().load()
# bare repository with dumb protocol only URL
self.repo_url = f"http://{httpd.server_name}:{httpd.server_port}/{repo_name}"
self.loader = DumbGitLoaderTest(swh_storage, self.repo_url)
self.repo = repo
yield
# shutdown HTTP server
httpd.shutdown()
thread.join()
# restore HTTP proxy settings if any
if http_proxy:
os.environ["http_proxy"] = http_proxy
if https_proxy:
os.environ["https_proxy"] = https_proxy
@pytest.mark.parametrize(
"failure_exception", [AttributeError, NotImplementedError, ValueError]
)
def test_load_despite_dulwich_exception(self, mocker, failure_exception):
"""Checks repository can still be loaded when dulwich raises exception
when encountering a repository with dumb transfer protocol.
"""
fetch_pack_from_origin = mocker.patch(
"swh.loader.git.loader.GitLoader.fetch_pack_from_origin"
)
fetch_pack_from_origin.side_effect = failure_exception("failure")
res = self.loader.load()
assert res == {"status": "eventful"}
stats = get_stats(self.loader.storage)
assert stats == {
"content": 4,
"directory": 7,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 7,
"skipped_content": 0,
"snapshot": 1,
}
def test_load_empty_repository(self, mocker):
class GitObjectsFetcherNoRefs(dumb.GitObjectsFetcher):
def _get_refs(self):
return {}
mocker.patch.object(dumb, "GitObjectsFetcher", GitObjectsFetcherNoRefs)
res = self.loader.load()
assert res == {"status": "uneventful"}
stats = get_stats(self.loader.storage)
assert stats == {
"content": 0,
"directory": 0,
"origin": 1,
"origin_visit": 1,
"release": 0,
"revision": 0,
"skipped_content": 0,
"snapshot": 1,
}
class TestDumbGitLoaderWithPack(DumbGitLoaderTestBase):
@classmethod
def setup_class(cls):
cls.with_pack_files = True
def test_load_with_missing_pack(self, mocker):
"""Some dumb git servers might reference a no longer existing pack file
while it is possible to load a repository without it.
"""
class GitObjectsFetcherMissingPack(dumb.GitObjectsFetcher):
def _http_get(self, path: str) -> SpooledTemporaryFile:
buffer = super()._http_get(path)
if path == "objects/info/packs":
# prepend a non existing pack to the returned packs list
packs = buffer.read().decode("utf-8")
buffer.seek(0)
buffer.write(
(
"P pack-a70762ba1a901af3a0e76de02fc3a99226842745.pack\n"
+ packs
).encode()
)
buffer.flush()
buffer.seek(0)
return buffer
mocker.patch.object(dumb, "GitObjectsFetcher", GitObjectsFetcherMissingPack)
res = self.loader.load()
assert res == {"status": "eventful"}
class TestDumbGitLoaderWithoutPack(DumbGitLoaderTestBase):
@classmethod
def setup_class(cls):
cls.with_pack_files = False
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Jul 4 2025, 6:36 PM (5 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3246602
Attached To
rDLDG Git loader
Event Timeline
Log In to Comment