diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py
--- a/swh/loader/git/loader.py
+++ b/swh/loader/git/loader.py
@@ -4,13 +4,14 @@
 # See top-level LICENSE file for more information
 
 from collections import defaultdict
+from dataclasses import dataclass
 import datetime
 from io import BytesIO
 import logging
 import os
 import pickle
 import sys
-from typing import Any, Dict, Iterable, List, Optional, Union
+from typing import Any, Dict, Iterable, List, Optional, Set
 
 import dulwich.client
 from dulwich.object_store import ObjectStoreGraphWalker
@@ -34,180 +35,83 @@
 from . import converters
 
 
+def ignore_branch_name(branch_name: bytes) -> bool:
+    """Should the git loader ignore the branch named `branch_name`?"""
+    if branch_name.endswith(b"^{}"):
+        # Peeled refs make the git protocol explode
+        return True
+    elif branch_name.startswith(b"refs/pull/") and branch_name.endswith(b"/merge"):
+        # We filter-out auto-merged GitHub pull requests
+        return True
+
+    return False
+
+
+def filter_refs(refs: Dict[bytes, bytes]) -> Dict[bytes, bytes]:
+    """Filter the refs dictionary using the policy set in `ignore_branch_name`"""
+    return {
+        name: target for name, target in refs.items() if not ignore_branch_name(name)
+    }
+
+
 class RepoRepresentation:
     """Repository representation for a Software Heritage origin."""
 
-    def __init__(self, storage, base_snapshot=None, ignore_history=False):
+    def __init__(
+        self, storage, base_snapshot: Optional[Snapshot] = None, ignore_history=False
+    ):
         self.storage = storage
-
-        self._parents_cache = {}
-        self._type_cache: Dict[bytes, TargetType] = {}
-
         self.ignore_history = ignore_history
 
         if base_snapshot and not ignore_history:
-            self.heads = set(self._cache_heads(base_snapshot))
+            self.base_snapshot: Snapshot = base_snapshot
         else:
-            self.heads = set()
-
-    def _fill_parents_cache(self, commits):
-        """When querying for a commit's parents, we fill the cache to a depth of 1000
-        commits."""
-        root_revs = self._encode_for_storage(commits)
-        for rev, parents in self.storage.revision_shortlog(root_revs, 1000):
-            rev_id = hashutil.hash_to_bytehex(rev)
-            if rev_id not in self._parents_cache:
-                self._parents_cache[rev_id] = [
-                    hashutil.hash_to_bytehex(parent) for parent in parents
-                ]
-        for rev in commits:
-            if rev not in self._parents_cache:
-                self._parents_cache[rev] = []
-
-    def _cache_heads(self, base_snapshot):
-        """Return all the known head commits for the given snapshot"""
-        _git_types = list(converters.DULWICH_TARGET_TYPES.values())
-
-        if not base_snapshot:
-            return []
-
-        snapshot_targets = set()
-        for branch in base_snapshot.branches.values():
-            if branch and branch.target_type in _git_types:
-                snapshot_targets.add(branch.target)
+            self.base_snapshot = Snapshot(branches={})
 
-        decoded_targets = self._decode_from_storage(snapshot_targets)
+        self.heads: Set[bytes] = set()
 
-        for id, objs in self.get_stored_objects(decoded_targets).items():
-            if not objs:
-                logging.warn("Missing head: %s" % hashutil.hash_to_hex(id))
-                return []
-
-        return decoded_targets
-
-    def get_parents(self, commit):
-        """Bogus method to prevent expensive recursion, at the expense of less
-        efficient downloading"""
+    def get_parents(self, commit: bytes) -> List[bytes]:
+        """This method should return the list of known parents"""
         return []
 
-    def get_heads(self):
-        return self.heads
-
-    @staticmethod
-    def _encode_for_storage(objects):
-        return [hashutil.bytehex_to_hash(object) for object in objects]
+    def graph_walker(self) -> ObjectStoreGraphWalker:
+        return ObjectStoreGraphWalker(self.heads, self.get_parents)
 
-    @staticmethod
-    def _decode_from_storage(objects):
-        return set(hashutil.hash_to_bytehex(object) for object in objects)
+    def determine_wants(self, refs: Dict[bytes, bytes]) -> List[bytes]:
+        """Get the list of bytehex sha1s that the git loader should fetch.
 
-    def graph_walker(self):
-        return ObjectStoreGraphWalker(self.get_heads(), self.get_parents)
+        This compares the remote refs sent by the server with the base snapshot
+        provided by the loader.
 
-    @staticmethod
-    def filter_unwanted_refs(refs):
-        """Filter the unwanted references from refs"""
-        ret = {}
-        for ref, val in refs.items():
-            if ref.endswith(b"^{}"):
-                # Peeled refs make the git protocol explode
-                continue
-            elif ref.startswith(b"refs/pull/") and ref.endswith(b"/merge"):
-                # We filter-out auto-merged GitHub pull requests
-                continue
-            else:
-                ret[ref] = val
-
-        return ret
-
-    def determine_wants(self, refs):
-        """Filter the remote references to figure out which ones
-        Software Heritage needs.
         """
         if not refs:
             return []
 
-        # Find what objects Software Heritage has
-        refs = self.find_remote_ref_types_in_swh(refs)
-
-        # Cache the objects found in swh as existing heads
-        for target in refs.values():
-            if target["target_type"] is not None:
-                self.heads.add(target["target"])
-
-        ret = set()
-        for target in self.filter_unwanted_refs(refs).values():
-            if target["target_type"] is None:
-                # The target doesn't exist in Software Heritage, let's retrieve
-                # it.
-                ret.add(target["target"])
-
-        return list(ret)
-
-    def _get_stored_objects_batch(
-        self, query
-    ) -> Dict[bytes, List[Dict[str, Union[bytes, TargetType]]]]:
-        results = self.storage.object_find_by_sha1_git(self._encode_for_storage(query))
-        ret: Dict[bytes, List[Dict[str, Union[bytes, TargetType]]]] = {}
-        for (id, objects) in results.items():
-            assert id not in ret
-            ret[id] = [
-                {"sha1_git": obj["sha1_git"], "type": TargetType(obj["type"]),}
-                for obj in objects
-            ]
-        return ret
-
-    def get_stored_objects(
-        self, objects
-    ) -> Dict[bytes, List[Dict[str, Union[bytes, TargetType]]]]:
-        """Find which of these objects were stored in the archive.
-
-        Do the request in packets to avoid a server timeout.
-        """
-        if self.ignore_history:
-            return {}
-
-        packet_size = 1000
-
-        ret: Dict[bytes, List[Dict[str, Union[bytes, TargetType]]]] = {}
-        query = []
-        for object in objects:
-            query.append(object)
-            if len(query) >= packet_size:
-                ret.update(self._get_stored_objects_batch(query))
-                query = []
-        if query:
-            ret.update(self._get_stored_objects_batch(query))
-        return ret
-
-    def find_remote_ref_types_in_swh(self, remote_refs) -> Dict[bytes, Dict[str, Any]]:
-        """Parse the remote refs information and list the objects that exist in
-        Software Heritage.
-
-        Returns:
-            dict whose keys are branch names, and values are tuples
-            `(target, target_type)`.
- """ + # Cache existing heads + local_heads: Set[bytes] = set() + for branch_name, branch in self.base_snapshot.branches.items(): + if not branch or branch.target_type == TargetType.ALIAS: + continue + local_heads.add(hashutil.hash_to_hex(branch.target).encode()) - all_objs = set(remote_refs.values()) - set(self._type_cache) - type_by_id: Dict[bytes, TargetType] = {} + self.heads = local_heads - for id, objs in self.get_stored_objects(all_objs).items(): - id = hashutil.hash_to_bytehex(id) - if objs: - type_ = objs[0]["type"] - assert isinstance(type_, TargetType) - type_by_id[id] = type_ + # Get the remote heads that we want to fetch + remote_heads: Set[bytes] = set() + for ref_name, ref_target in refs.items(): + if ignore_branch_name(ref_name): + continue + remote_heads.add(ref_target) - self._type_cache.update(type_by_id) + return list(remote_heads - local_heads) - ret = {} - for ref, id in remote_refs.items(): - ret[ref] = { - "target": id, - "target_type": self._type_cache.get(id), - } - return ret + +@dataclass +class FetchPackReturn: + remote_refs: Dict[bytes, bytes] + symbolic_refs: Dict[bytes, bytes] + pack_buffer: BytesIO + pack_size: int class GitLoader(DVCSLoader): @@ -242,9 +146,9 @@ self.base_url = base_url self.ignore_history = ignore_history self.repo_representation = repo_representation + # state initialized in fetch_data self.remote_refs = [] - self.local_refs = {} self.symbolic_refs = {} def fetch_pack_from_origin(self, origin_url, base_snapshot, do_activity): @@ -283,25 +187,19 @@ progress=do_activity, ) - remote_refs = pack_result.refs - symbolic_refs = pack_result.symrefs - - if remote_refs: - local_refs = base_repo.find_remote_ref_types_in_swh(remote_refs) - else: - local_refs = remote_refs = {} + remote_refs = pack_result.refs or {} + symbolic_refs = pack_result.symrefs or {} pack_buffer.flush() pack_size = pack_buffer.tell() pack_buffer.seek(0) - return { - "remote_refs": base_repo.filter_unwanted_refs(remote_refs), - "local_refs": local_refs, - "symbolic_refs": symbolic_refs, - "pack_buffer": pack_buffer, - "pack_size": pack_size, - } + return FetchPackReturn( + remote_refs=filter_refs(remote_refs), + symbolic_refs=filter_refs(symbolic_refs), + pack_buffer=pack_buffer, + pack_size=pack_size, + ) def list_pack(self, pack_data, pack_size): id_to_type = {} @@ -338,15 +236,17 @@ if not self.ignore_history: prev_snapshot = self.get_full_snapshot(origin_url) - if self.base_url and not prev_snapshot: + if self.base_url and prev_snapshot is None: base_origin = Origin(url=self.base_url) base_origin = self.storage.origin_get(base_origin) if base_origin: - base_origin_url = base_origin["url"] + base_origin_url = base_origin.url prev_snapshot = self.get_full_snapshot(base_origin_url) - self.base_snapshot = prev_snapshot - self.base_origin_url = base_origin_url + if prev_snapshot is not None: + self.base_snapshot = prev_snapshot + else: + self.base_snapshot = Snapshot(branches={}) def fetch_data(self): def do_progress(msg): @@ -357,12 +257,11 @@ self.origin.url, self.base_snapshot, do_progress ) - self.pack_buffer = fetch_info["pack_buffer"] - self.pack_size = fetch_info["pack_size"] + self.pack_buffer = fetch_info.pack_buffer + self.pack_size = fetch_info.pack_size - self.remote_refs = fetch_info["remote_refs"] - self.local_refs = fetch_info["local_refs"] - self.symbolic_refs = fetch_info["symbolic_refs"] + self.remote_refs = fetch_info.remote_refs + self.symbolic_refs = fetch_info.symbolic_refs origin_url = self.origin.url @@ -381,6 +280,9 @@ self.id_to_type = id_to_type 
         self.type_to_ids = type_to_ids
 
+        # No more data to fetch
+        return False
+
     def save_data(self):
         """Store a pack for archival"""
 
@@ -505,22 +407,77 @@
             yield converters.dulwich_tag_to_release(raw_obj, log=self.log)
 
     def get_snapshot(self) -> Snapshot:
-        branches: Dict[bytes, Optional[SnapshotBranch]] = {}
+        """Get the snapshot for the current visit.
+
+        The main complexity of this function is mapping target objects to their
+        types, as the `refs` dictionaries returned by the git server only give
+        us the identifiers for the target objects, and not their types.
+
+        The loader itself only knows the types of the objects that it has
+        fetched from the server (as it has parsed them while loading them to
+        the archive). As we only fetched an increment between the previous
+        snapshot and the current state of the server, we are missing the type
+        information for the objects that would already have been referenced by
+        the previous snapshot, and that the git server didn't send us. We infer
+        the type of these objects from the previous snapshot.
 
-        for ref in self.remote_refs:
-            ret_ref = self.local_refs[ref].copy()
-            if not ret_ref["target_type"]:
-                target_type = self.id_to_type[ret_ref["target"]]
-                ret_ref["target_type"] = converters.DULWICH_TARGET_TYPES[target_type]
+        """
+        branches: Dict[bytes, Optional[SnapshotBranch]] = {}
 
-            ret_ref["target"] = hashutil.bytehex_to_hash(ret_ref["target"])
+        unfetched_refs: Dict[bytes, bytes] = {}
 
-            branches[ref] = SnapshotBranch(
-                target_type=ret_ref["target_type"], target=ret_ref["target"],
+        # Retrieve types from the objects loaded by the current loader
+        for ref_name, ref_object in self.remote_refs.items():
+            if ref_name in self.symbolic_refs:
+                continue
+            target = hashutil.hash_to_bytes(ref_object.decode())
+            object_type = self.id_to_type.get(ref_object)
+            if object_type:
+                branches[ref_name] = SnapshotBranch(
+                    target=target,
+                    target_type=converters.DULWICH_TARGET_TYPES[object_type],
+                )
+            else:
+                # The object pointed at by this ref was not fetched, presumably
+                # because it existed in the base snapshot. We record it here,
+                # and we can get it from the base snapshot later.
+                unfetched_refs[ref_name] = target
+
+        # Handle symbolic references as alias branches
+        for ref_name, target in self.symbolic_refs.items():
+            branches[ref_name] = SnapshotBranch(
+                target_type=TargetType.ALIAS, target=target,
             )
 
-        for ref, target in self.symbolic_refs.items():
-            branches[ref] = SnapshotBranch(target_type=TargetType.ALIAS, target=target,)
+        if unfetched_refs:
+            # Handle inference of object types from the contents of the
+            # previous snapshot
+            unknown_objects = {}
+            base_snapshot_reverse_branches = {
+                branch.target: branch
+                for branch in self.base_snapshot.branches.values()
+                if branch and branch.target_type != TargetType.ALIAS
+            }
+
+            for ref_name, target in unfetched_refs.items():
+                branch = base_snapshot_reverse_branches.get(target)
+                branches[ref_name] = branch
+                if not branch:
+                    unknown_objects[ref_name] = target
+
+            if unknown_objects:
+                # These objects were referenced by the server; we did not
+                # fetch them, and we do not know them from the previous
+                # snapshot. This is likely a bug in the loader.
+                raise RuntimeError(
+                    "Unknown objects referenced by remote refs: %s"
+                    % (
+                        ", ".join(
+                            f"{name.decode()}: {hashutil.hash_to_hex(obj)}"
+                            for name, obj in unknown_objects.items()
+                        )
+                    )
+                )
 
         self.snapshot = Snapshot(branches=branches)
         return self.snapshot
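
Reviewer note: the sketch below replays, outside the loader, the ref-selection policy that the patch introduces through `ignore_branch_name`, `filter_refs` and the rewritten `determine_wants` (wanted objects = remote heads that survive filtering, minus heads already known locally). It is only an illustration of the intended behaviour under simplified assumptions: the ref names and sha1 values are made up, and the real method additionally stores the local heads on `self.heads` for the graph walker.

from typing import Dict, List, Set


def ignore_branch_name(branch_name: bytes) -> bool:
    # Same policy as the patch: drop peeled refs and GitHub auto-merge refs
    if branch_name.endswith(b"^{}"):
        return True
    if branch_name.startswith(b"refs/pull/") and branch_name.endswith(b"/merge"):
        return True
    return False


def determine_wants(refs: Dict[bytes, bytes], local_heads: Set[bytes]) -> List[bytes]:
    # Remote heads we care about, minus the heads we already know about locally
    remote_heads = {
        target for name, target in refs.items() if not ignore_branch_name(name)
    }
    return list(remote_heads - local_heads)


refs = {
    b"refs/heads/master": b"aa" * 20,   # made-up bytehex sha1
    b"refs/tags/v1.0^{}": b"bb" * 20,   # peeled ref: ignored
    b"refs/pull/1/merge": b"cc" * 20,   # auto-merged PR ref: ignored
}
print(determine_wants(refs, local_heads=set()))         # only the master target
print(determine_wants(refs, local_heads={b"aa" * 20}))  # [] -- nothing new to fetch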
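
The `get_snapshot` docstring above describes a two-step type resolution: a branch target fetched during this visit gets its type from `id_to_type`, a target that was already in the base snapshot reuses the branch recorded there, and anything left over raises an error. A minimal standalone sketch of that fallback, using plain tuples in place of the swh.model `SnapshotBranch` objects and invented identifiers (it also glosses over the bytehex-to-bytes conversion the loader performs):

from typing import Dict, Optional, Tuple

Branch = Tuple[str, bytes]  # (target_type, target), standing in for SnapshotBranch


def resolve_branches(
    remote_refs: Dict[bytes, bytes],
    id_to_type: Dict[bytes, str],
    base_branches: Dict[bytes, Branch],
) -> Dict[bytes, Optional[Branch]]:
    # Reverse index of the previous snapshot, keyed by target id
    base_by_target = {
        target: (type_, target) for (type_, target) in base_branches.values()
    }

    branches: Dict[bytes, Optional[Branch]] = {}
    unknown: Dict[bytes, bytes] = {}
    for ref_name, target in remote_refs.items():
        if target in id_to_type:          # object fetched during this visit
            branches[ref_name] = (id_to_type[target], target)
        elif target in base_by_target:    # object known from the previous snapshot
            branches[ref_name] = base_by_target[target]
        else:                             # should not happen: likely a loader bug
            unknown[ref_name] = target
    if unknown:
        raise RuntimeError("Unknown objects referenced by remote refs: %r" % unknown)
    return branches


branches = resolve_branches(
    remote_refs={b"refs/heads/master": b"11" * 20, b"refs/tags/v1.0": b"22" * 20},
    id_to_type={b"11" * 20: "revision"},                         # fetched this visit
    base_branches={b"refs/tags/v1.0": ("release", b"22" * 20)},  # from the base snapshot
)
assert branches[b"refs/heads/master"] == ("revision", b"11" * 20)
assert branches[b"refs/tags/v1.0"] == ("release", b"22" * 20)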