diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,5 @@
 swh.core >= 0.0.7
-swh.loader.core >= 3.5.0
+swh.loader.core >= 3.6.0
 swh.model >= 4.3.0
 swh.scheduler >= 0.0.39
 swh.storage >= 0.22.0
diff --git a/swh/loader/git/base.py b/swh/loader/git/base.py
--- a/swh/loader/git/base.py
+++ b/swh/loader/git/base.py
@@ -128,7 +128,7 @@
         self.statsd.increment("filtered_objects_total_count", total, tags=tags)
 
         self.log.info(
-            "Fetched %d objects; %d are new",
+            "store_data got %d objects; %d are new",
             sum(counts.values()),
             sum(storage_summary[f"{object_type}:add"] for object_type in counts),
         )
diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py
--- a/swh/loader/git/loader.py
+++ b/swh/loader/git/loader.py
@@ -3,20 +3,21 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import collections
 from dataclasses import dataclass
 import datetime
 import logging
 import os
 import pickle
 import sys
-from tempfile import SpooledTemporaryFile
+import tempfile
 from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Set, Type
 
 import dulwich.client
 from dulwich.errors import GitProtocolError, NotGitRepository
 from dulwich.object_store import ObjectStoreGraphWalker
 from dulwich.objects import ShaFile
-from dulwich.pack import PackData, PackInflater
+from dulwich.pack import PackData, PackInflater, PackIndex2, Pack, MemoryPackIndex
 
 from swh.core.statsd import Statsd
 from swh.loader.exception import NotFound
@@ -113,11 +114,18 @@
         return wanted_refs
 
 
+class BetterMemoryPackIndex(MemoryPackIndex):
+    """Like MemoryPackIndex, but with an ``_object_index`` that does not crash."""
+
+    def _object_index(self, sha):
+        return self._by_sha[sha]
+
+
 @dataclass
 class FetchPackReturn:
     remote_refs: Dict[bytes, HexBytes]
     symbolic_refs: Dict[bytes, HexBytes]
-    pack_buffer: SpooledTemporaryFile
+    pack_buffer: tempfile.SpooledTemporaryFile
     pack_size: int
 
 
@@ -155,6 +163,7 @@
         repo_representation: Type[RepoRepresentation] = RepoRepresentation,
         pack_size_bytes: int = 4 * 1024 * 1024 * 1024,
         temp_file_cutoff: int = 100 * 1024 * 1024,
+        traverse_graph: bool = True,
         **kwargs: Any,
     ):
         """Initialize the bulk updater.
@@ -179,6 +188,11 @@
         self.symbolic_refs: Dict[bytes, HexBytes] = {}
         self.ref_object_types: Dict[bytes, Optional[TargetType]] = {}
 
+        self.traverse_graph = traverse_graph
+        self.reachable_from_known_refs: Set[bytes] = set()
+        """Set of all git objects that are referenced by objects known to be already
+        in storage prior to this load."""
+
     def fetch_pack_from_origin(
         self,
         origin_url: str,
@@ -187,7 +201,7 @@
     ) -> FetchPackReturn:
         """Fetch a pack from the origin"""
 
-        pack_buffer = SpooledTemporaryFile(max_size=self.temp_file_cutoff)
+        pack_buffer = tempfile.SpooledTemporaryFile(max_size=self.temp_file_cutoff)
 
         # Hardcode the use of the tcp transport (for GitHub origins)
@@ -282,6 +296,7 @@
             prev_snapshot = self.get_full_snapshot(self.origin.url)
             self.statsd.constant_tags["has_previous_snapshot"] = bool(prev_snapshot)
             if prev_snapshot:
+                logger.info("Loading from previous snapshot")
                 self.prev_snapshot = prev_snapshot
                 self.base_snapshots.append(prev_snapshot)
 
@@ -292,9 +307,15 @@
             for parent_origin in self.parent_origins:
                 parent_snapshot = self.get_full_snapshot(parent_origin.url)
                 if parent_snapshot is not None:
+                    logger.info("Loading from parent origin: %s", parent_origin.url)
                     self.statsd.constant_tags["has_parent_snapshot"] = True
                     self.base_snapshots.append(parent_snapshot)
 
+        # Initialize the set of reachable objects
+        for snapshot in self.base_snapshots:
+            for branch in snapshot.branches.values():
+                self.reachable_from_known_refs.add(branch.target)
+
         # Increments a metric with full name 'swh_loader_git'; which is useful to
         # count how many runs of the loader are with each incremental mode
         self.statsd.increment("git_total", tags={})
@@ -393,6 +414,91 @@
             with open(os.path.join(pack_dir, refs_name), "xb") as f:
                 pickle.dump(self.remote_refs, f)
 
+    def process_data(self) -> bool:
+        self.compute_known_objects()
+        return True
+
+    def compute_known_objects(self):
+        """Runs a graph traversal on all objects sent by the remote, to update
+        :attr:`reachable_from_known_refs`.
+        """
+        if self.dumb:
+            # When using the smart protocol, the remote may send us many objects
+            # we told it we already have.
+            return
+
+        self.total_objects_in_pack = collections.defaultdict(int)
+
+        if self.traverse_graph:
+            logger.debug(
+                "computing reachable_from_known_refs (starting from %s)",
+                len(self.reachable_from_known_refs),
+            )
+        else:
+            logger.debug(
+                "skipping computation of reachable_from_known_refs (starting from %s)",
+                len(self.reachable_from_known_refs),
+            )
+            return
+
+        # Adjacency lists of the graph of all objects received from the remote.
+        # TODO: use tuple() instead of set() for small sets? they are almost as fast,
+        # and have 160 fewer bytes of overhead
+        edges: Dict[bytes, Set[bytes]] = {}
+
+        self.pack_buffer.seek(0)
+        for obj in PackInflater.for_pack_data(
+            PackData.from_file(self.pack_buffer, self.pack_size)
+        ):
+            id_ = obj.sha().digest()
+            self.total_objects_in_pack[obj.type_name] += 1
+            if id_ in edges:
+                # Duplicate object? Skip it
+                continue
+            elif obj.type_name == b"blob":
+                continue
+            elif obj.type_name == b"tree":
+                # skip it, takes too much memory
+                continue
+                edges[id_] = {
+                    hashutil.hash_to_bytes(entry.sha.decode("ascii"))
+                    for entry in obj.iteritems()
+                }
+            elif obj.type_name == b"commit":
+                edges[id_] = {
+                    hashutil.hash_to_bytes(parent.decode("ascii"))
+                    for parent in obj.parents
+                }
+                # skip it, takes too much memory
+                continue
+                edges[id_].add(hashutil.hash_to_bytes(obj.tree.decode("ascii")))
+            elif obj.type_name == b"tag":
+                (target_type, target) = obj.object
+                edges[id_] = {hashutil.hash_to_bytes(target.decode("ascii"))}
+            else:
+                raise ValueError(f"Unexpected object type: {obj.type_name!r}")
+
+        logger.debug(
+            "nodes=%s edges=%s",
+            len(edges),
+            sum(map(len, edges.values())),
+        )
+
+        to_visit = set(self.reachable_from_known_refs)
+        while to_visit:
+            node = to_visit.pop()
+            parents = edges.pop(node, set())
+
+            # already processed or about to be, don't visit them again
+            parents.difference_update(self.reachable_from_known_refs)
+
+            self.reachable_from_known_refs.update(parents)
+            to_visit.update(parents)
+
+        logger.debug(
+            "result: reachable_from_known_refs=%s", len(self.reachable_from_known_refs)
+        )
+
     def iter_objects(self, object_type: bytes) -> Iterator[ShaFile]:
         """Read all the objects of type `object_type` from the packfile"""
         if self.dumb:
@@ -409,9 +515,89 @@
                 count += 1
         logger.debug("packfile_read_count_%s=%s", object_type.decode(), count)
 
+    def iter_new_objects(self, object_type: bytes) -> Iterator[ShaFile]:
+        """Read all the objects of type `object_type` from the packfile,
+        yielding only those not reachable from any of the base snapshots.
+
+        This differs from :meth:`iter_objects` when origins send objects
+        reachable from the ``known`` set in the packfiles."""
+        if self.dumb or not self.traverse_graph:
+            yield from self.iter_objects(object_type)
+        else:
+            to_visit = set()
+            seen = set()
+            self.pack_buffer.seek(0)
+            pack_data = PackData.from_file(self.pack_buffer, self.pack_size)
+            for obj in PackInflater.for_pack_data(pack_data):
+                if obj.type_name in (b"commit", b"tag"):
+                    id_ = obj.sha().digest()
+                    if (
+                        hashutil.hash_to_bytes(id_)
+                        not in self.reachable_from_known_refs
+                    ):
+                        to_visit.add(id_)
+
+            index_file = tempfile.NamedTemporaryFile()
+            # pack_data.create_index_v2(index_file.name)
+            # pack_index = PackIndex2(index_file.name)
+            pack_index = BetterMemoryPackIndex(
+                pack_data.sorted_entries(),
+                pack_data.get_stored_checksum(),  # why is this argument needed?
+            )
+            pack = Pack.from_objects(pack_data, pack_index)
+            while to_visit:
+                id_ = hashutil.hash_to_bytehex(to_visit.pop())
+                # offset = pack_index.object_index(id_)
+                # (offset, type_, (raw_obj,)) = pack_data.get_ref(id_)
+                obj = pack[id_]
+
+                assert id_ == obj.id
+                assert hashutil.hash_to_bytes(id_) not in self.reachable_from_known_refs
+                if obj.type_name == object_type:
+                    if id_ not in seen:
+                        seen.add(id_)
+                        yield obj
+
+                if obj.type_name == b"tree":
+                    to_visit.update(
+                        {
+                            hashutil.hash_to_bytes(entry.sha.decode("ascii"))
+                            for entry in obj.iteritems()
+                        }
+                    )
+                elif obj.type_name == b"commit":
+                    # relevant parents are already in to_visit
+                    to_visit.add(hashutil.hash_to_bytes(obj.tree.decode("ascii")))
+                elif obj.type_name == b"tag":
+                    (target_type, target) = obj.object
+                    # TODO: don't do it if it is a commit
+                    to_visit.add(hashutil.hash_to_bytes(target.decode("ascii")))
+
+            tags = {"object_type": object_type.decode()}
+
+            # average weighted by total
+            self.statsd.increment(
+                "unwanted_packfile_objects_total_sum",
+                self.total_objects_in_pack[object_type] - len(seen),
+                tags=tags,
+            )
+            self.statsd.increment(
+                "unwanted_packfile_objects_total_count",
+                self.total_objects_in_pack[object_type],
+                tags=tags,
+            )
+
+            self.log.info(
+                "packfile contains %d %s objects; %d appear to be unreachable "
+                "from known snapshots",
+                self.total_objects_in_pack[object_type],
+                object_type.decode(),
+                len(seen),
+            )
+
     def get_contents(self) -> Iterable[BaseContent]:
         """Format the blobs from the git repository as swh contents"""
-        for raw_obj in self.iter_objects(b"blob"):
+        for raw_obj in self.iter_new_objects(b"blob"):
             if raw_obj.id in self.ref_object_types:
                 self.ref_object_types[raw_obj.id] = TargetType.CONTENT
@@ -421,7 +607,7 @@
 
     def get_directories(self) -> Iterable[Directory]:
         """Format the trees as swh directories"""
-        for raw_obj in self.iter_objects(b"tree"):
+        for raw_obj in self.iter_new_objects(b"tree"):
             if raw_obj.id in self.ref_object_types:
                 self.ref_object_types[raw_obj.id] = TargetType.DIRECTORY
@@ -429,7 +615,7 @@
 
     def get_revisions(self) -> Iterable[Revision]:
        """Format commits as swh revisions"""
-        for raw_obj in self.iter_objects(b"commit"):
+        for raw_obj in self.iter_new_objects(b"commit"):
             if raw_obj.id in self.ref_object_types:
                 self.ref_object_types[raw_obj.id] = TargetType.REVISION
@@ -437,7 +623,7 @@
 
     def get_releases(self) -> Iterable[Release]:
         """Retrieve all the release objects from the git repository"""
-        for raw_obj in self.iter_objects(b"tag"):
+        for raw_obj in self.iter_new_objects(b"tag"):
             if raw_obj.id in self.ref_object_types:
                 self.ref_object_types[raw_obj.id] = TargetType.RELEASE
diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_loader.py
--- a/swh/loader/git/tests/test_loader.py
+++ b/swh/loader/git/tests/test_loader.py
@@ -7,6 +7,7 @@
 from functools import partial
 from http.server import HTTPServer, SimpleHTTPRequestHandler
 import os
+import re
 import subprocess
 import sys
 from tempfile import SpooledTemporaryFile
@@ -126,6 +127,19 @@
             call("git_ignored_refs_percent", "h", 0.0, {}, 1),
             call("git_known_refs_percent", "h", 0.0, {}, 1),
         ]
+
+        # all 0, because the packfile objects are not referenced by any known snapshot
+        assert [c for c in statsd_calls if re.match("unwanted_.*_sum", c[1][0])] == [
+            call(
+                "unwanted_packfile_objects_total_sum",
+                "c",
+                0,
+                {"object_type": object_type},
+                1,
+            )
+            for object_type in ("blob", "tree", "commit", "tag")
+        ]
+
         total_sum_name = "filtered_objects_total_sum"
         total_count_name = "filtered_objects_total_count"
percent_sum_name = "filtered_objects_percent_sum" @@ -148,6 +162,7 @@ call(total_sum_name, "c", 0, {"object_type": "snapshot"}, 1), call(total_count_name, "c", 1, {"object_type": "snapshot"}, 1), ] + assert self.loader.statsd.constant_tags == { "visit_type": "git", "incremental_enabled": True, @@ -220,6 +235,7 @@ call(total_sum_name, "c", 0, {"object_type": "snapshot"}, 1), call(total_count_name, "c", 1, {"object_type": "snapshot"}, 1), ] + assert self.loader.statsd.constant_tags == { "visit_type": "git", "incremental_enabled": True, @@ -300,11 +316,24 @@ # TODO: assert "incremental" is added to constant tags before these # metrics are sent - assert [c for c in statsd_report.mock_calls if c[1][0].startswith("git_")] == [ + statsd_calls = statsd_report.mock_calls + assert [c for c in statsd_calls if c[1][0].startswith("git_")] == [ call("git_total", "c", 1, {}, 1), call("git_ignored_refs_percent", "h", 0.0, {}, 1), call("git_known_refs_percent", "h", 0.0, {}, 1), ] + + assert [c for c in statsd_calls if re.match("unwanted_.*_sum", c[1][0])] == [ + call( + "unwanted_packfile_objects_total_sum", + "c", + 0, + {"object_type": object_type}, + 1, + ) + for object_type in ("blob", "tree", "commit", "tag") + ] + assert self.loader.statsd.constant_tags == { "visit_type": "git", "incremental_enabled": True, @@ -433,11 +462,24 @@ # TODO: assert "incremental*" is added to constant tags before these # metrics are sent - assert [c for c in statsd_report.mock_calls if c[1][0].startswith("git_")] == [ + statsd_calls = statsd_report.mock_calls + assert [c for c in statsd_calls if c[1][0].startswith("git_")] == [ call("git_total", "c", 1, {}, 1), call("git_ignored_refs_percent", "h", 0.0, {}, 1), call("git_known_refs_percent", "h", 1.0, {}, 1), ] + + assert [c for c in statsd_calls if re.match("unwanted_.*_sum", c[1][0])] == [ + call( + "unwanted_packfile_objects_total_sum", + "c", + 0, + {"object_type": object_type}, + 1, + ) + for object_type in ("blob", "tree", "commit", "tag") + ] + assert self.loader.statsd.constant_tags == { "visit_type": "git", "incremental_enabled": True, @@ -447,7 +489,8 @@ } @pytest.mark.parametrize( - "parent_snapshot,previous_snapshot,expected_git_known_refs_percent", + "parent_snapshot,previous_snapshot,expected_git_known_refs_percent," + "expected_unwanted", [ pytest.param( Snapshot( @@ -457,6 +500,8 @@ ), Snapshot(branches={}), 0.25, + # not zero, because it reuses an existing packfile + dict(blob=2, tree=2, commit=2, tag=0), id="partial-parent-and-empty-previous", ), pytest.param( @@ -467,6 +512,9 @@ } ), 1.0, + # all zeros, because there is no object to fetch at all (SNAPSHOT1 + # is the complete set of refs) + dict(blob=0, tree=0, commit=0, tag=0), id="full-parent-and-partial-previous", ), ], @@ -476,6 +524,7 @@ parent_snapshot, previous_snapshot, expected_git_known_refs_percent, + expected_unwanted, mocker, ): """Snapshot of parent origin has all branches, but previous snapshot was @@ -561,12 +610,25 @@ "has_previous_snapshot": True, "has_parent_origins": True, } - assert [c for c in statsd_report.mock_calls if c[1][0].startswith("git_")] == [ + + statsd_calls = statsd_report.mock_calls + assert [c for c in statsd_calls if c[1][0].startswith("git_")] == [ call("git_total", "c", 1, {}, 1), call("git_ignored_refs_percent", "h", 0.0, {}, 1), call("git_known_refs_percent", "h", expected_git_known_refs_percent, {}, 1), ] + assert [c for c in statsd_calls if re.match("unwanted_.*_sum", c[1][0])] == [ + call( + "unwanted_packfile_objects_total_sum", + "c", + nb, + 
{"object_type": object_type}, + 1, + ) + for (object_type, nb) in expected_unwanted.items() + ] + class DumbGitLoaderTestBase(FullGitLoaderTests): """Prepare a git repository to be loaded using the HTTP dumb transfer protocol."""