diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,5 +1,5 @@ swh.core >= 0.0.7 -swh.loader.core >= 3.5.0 +swh.loader.core >= 3.6.0 swh.model >= 4.3.0 swh.scheduler >= 0.0.39 swh.storage >= 0.22.0 diff --git a/swh/loader/git/base.py b/swh/loader/git/base.py --- a/swh/loader/git/base.py +++ b/swh/loader/git/base.py @@ -128,7 +128,7 @@ self.statsd.increment("filtered_objects_total_count", total, tags=tags) self.log.info( - "Fetched %d objects; %d are new", + "store_data got %d objects; %d are new", sum(counts.values()), sum(storage_summary[f"{object_type}:add"] for object_type in counts), ) diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py --- a/swh/loader/git/loader.py +++ b/swh/loader/git/loader.py @@ -179,6 +179,10 @@ self.symbolic_refs: Dict[bytes, HexBytes] = {} self.ref_object_types: Dict[bytes, Optional[TargetType]] = {} + self.reachable_from_known_refs: Set[bytes] = set() + """Set of all git objects that are referenced by objects known to be already + in storage prior to this load.""" + def fetch_pack_from_origin( self, origin_url: str, @@ -282,6 +286,7 @@ prev_snapshot = self.get_full_snapshot(self.origin.url) self.statsd.constant_tags["has_previous_snapshot"] = bool(prev_snapshot) if prev_snapshot: + logger.info("Loading from previous snapshot") self.prev_snapshot = prev_snapshot self.base_snapshots.append(prev_snapshot) @@ -292,9 +297,15 @@ for parent_origin in self.parent_origins: parent_snapshot = self.get_full_snapshot(parent_origin.url) if parent_snapshot is not None: + logger.info("Loading from parent origin: %s", parent_origin.url) self.statsd.constant_tags["has_parent_snapshot"] = True self.base_snapshots.append(parent_snapshot) + # Initialize the set of reachable objects + for snapshot in self.base_snapshots: + for branch in snapshot.branches.values(): + self.reachable_from_known_refs.add(branch.target) + # Increments a metric with full name 'swh_loader_git'; which is 
useful to # count how many runs of the loader are with each incremental mode self.statsd.increment("git_total", tags={}) @@ -367,6 +378,8 @@ }, ) + if not self.dumb: + self.compute_known_objects() # No more data to fetch return False @@ -393,6 +406,57 @@ with open(os.path.join(pack_dir, refs_name), "xb") as f: pickle.dump(self.remote_refs, f) + def process_data(self) -> bool: + self.compute_known_objects() + return True + + def compute_known_objects(self): + """Runs a graph traversal on all objects sent by the remote, to update + :attr:`reachable_from_known_refs`. + """ + if self.dumb: + # This only matters for the smart protocol, where the remote may send us + # many objects we told it we already have. + return + + # Adjacency lists of the graph of all objects received from the remote. + # TODO: use tuple() instead of set() for small sets? they are almost as fast, + # and have 160 fewer bytes of overhead + edges: Dict[bytes, Set[bytes]] = {} + + for obj in PackInflater.for_pack_data( + PackData.from_file(self.pack_buffer, self.pack_size) + ): + id_ = obj.sha().digest() + if id_ in edges: + # Duplicate object? 
Skip it + continue + elif obj.type_name == b"blob": + continue + elif obj.type_name == b"tree": + edges[id_] = { + hashutil.hash_to_bytes(entry.sha.decode("ascii")) + for entry in obj.iteritems() + } + elif obj.type_name == b"commit": + edges[id_] = { + hashutil.hash_to_bytes(parent.decode("ascii")) + for parent in obj.parents + } + edges[id_].add(hashutil.hash_to_bytes(obj.tree.decode("ascii"))) + elif obj.type_name == b"tag": + (target_type, target) = obj.object + edges[id_] = {hashutil.hash_to_bytes(target.decode("ascii"))} + else: + raise ValueError(f"Unexpected object type: {obj.type_name!r}") + + to_visit = set(self.reachable_from_known_refs) + while to_visit: + node = to_visit.pop() + parents = edges.pop(node, set()) + self.reachable_from_known_refs.update(parents) + to_visit.update(parents) + def iter_objects(self, object_type: bytes) -> Iterator[ShaFile]: """Read all the objects of type `object_type` from the packfile""" if self.dumb: @@ -409,9 +473,45 @@ count += 1 logger.debug("packfile_read_count_%s=%s", object_type.decode(), count) + def iter_new_objects(self, object_type: bytes) -> Iterator[ShaFile]: + """Read all the objects of type `object_type` from the packfile, + yielding only those not reachable from any of the base snapshots. 
+ + This differs from :meth:`iter_objects` when origins send objects + reachable from the ``known`` set in the packfiles.""" + total = 0 + known_reachable = 0 + + for raw_obj in self.iter_objects(object_type): + total += 1 + id_ = raw_obj.id.decode("ascii") + if hashutil.hash_to_bytes(id_) in self.reachable_from_known_refs: + known_reachable += 1 + continue + + yield raw_obj + + tags = {"object_type": object_type.decode()} + + if not self.dumb: + # average weighted by total + self.statsd.increment( + "unwanted_packfile_objects_total_sum", known_reachable, tags=tags + ) + self.statsd.increment( + "unwanted_packfile_objects_total_count", total, tags=tags + ) + + self.log.info( + "packfile contains %d %s objects; %d are reachable from known snapshots", + total, + object_type.decode(), + known_reachable, + ) + def get_contents(self) -> Iterable[BaseContent]: """Format the blobs from the git repository as swh contents""" - for raw_obj in self.iter_objects(b"blob"): + for raw_obj in self.iter_new_objects(b"blob"): if raw_obj.id in self.ref_object_types: self.ref_object_types[raw_obj.id] = TargetType.CONTENT @@ -421,7 +521,7 @@ def get_directories(self) -> Iterable[Directory]: """Format the trees as swh directories""" - for raw_obj in self.iter_objects(b"tree"): + for raw_obj in self.iter_new_objects(b"tree"): if raw_obj.id in self.ref_object_types: self.ref_object_types[raw_obj.id] = TargetType.DIRECTORY @@ -429,7 +529,7 @@ def get_revisions(self) -> Iterable[Revision]: """Format commits as swh revisions""" - for raw_obj in self.iter_objects(b"commit"): + for raw_obj in self.iter_new_objects(b"commit"): if raw_obj.id in self.ref_object_types: self.ref_object_types[raw_obj.id] = TargetType.REVISION @@ -437,7 +537,7 @@ def get_releases(self) -> Iterable[Release]: """Retrieve all the release objects from the git repository""" - for raw_obj in self.iter_objects(b"tag"): + for raw_obj in self.iter_new_objects(b"tag"): if raw_obj.id in self.ref_object_types: 
self.ref_object_types[raw_obj.id] = TargetType.RELEASE diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_loader.py --- a/swh/loader/git/tests/test_loader.py +++ b/swh/loader/git/tests/test_loader.py @@ -7,6 +7,7 @@ from functools import partial from http.server import HTTPServer, SimpleHTTPRequestHandler import os +import re import subprocess import sys from tempfile import SpooledTemporaryFile @@ -126,6 +127,19 @@ call("git_ignored_refs_percent", "h", 0.0, {}, 1), call("git_known_refs_percent", "h", 0.0, {}, 1), ] + + # all 0, because they are not referenced by any snapshot + assert [c for c in statsd_calls if re.match("unwanted_.*_sum", c[1][0])] == [ + call( + "unwanted_packfile_objects_total_sum", + "c", + 0, + {"object_type": object_type}, + 1, + ) + for object_type in ("blob", "tree", "commit", "tag") + ] + total_sum_name = "filtered_objects_total_sum" total_count_name = "filtered_objects_total_count" percent_sum_name = "filtered_objects_percent_sum" @@ -148,6 +162,7 @@ call(total_sum_name, "c", 0, {"object_type": "snapshot"}, 1), call(total_count_name, "c", 1, {"object_type": "snapshot"}, 1), ] + assert self.loader.statsd.constant_tags == { "visit_type": "git", "incremental_enabled": True, @@ -220,6 +235,7 @@ call(total_sum_name, "c", 0, {"object_type": "snapshot"}, 1), call(total_count_name, "c", 1, {"object_type": "snapshot"}, 1), ] + assert self.loader.statsd.constant_tags == { "visit_type": "git", "incremental_enabled": True, @@ -300,11 +316,24 @@ # TODO: assert "incremental" is added to constant tags before these # metrics are sent - assert [c for c in statsd_report.mock_calls if c[1][0].startswith("git_")] == [ + statsd_calls = statsd_report.mock_calls + assert [c for c in statsd_calls if c[1][0].startswith("git_")] == [ call("git_total", "c", 1, {}, 1), call("git_ignored_refs_percent", "h", 0.0, {}, 1), call("git_known_refs_percent", "h", 0.0, {}, 1), ] + + assert [c for c in statsd_calls if re.match("unwanted_.*_sum", 
c[1][0])] == [ + call( + "unwanted_packfile_objects_total_sum", + "c", + 0, + {"object_type": object_type}, + 1, + ) + for object_type in ("blob", "tree", "commit", "tag") + ] + assert self.loader.statsd.constant_tags == { "visit_type": "git", "incremental_enabled": True, @@ -433,11 +462,24 @@ # TODO: assert "incremental*" is added to constant tags before these # metrics are sent - assert [c for c in statsd_report.mock_calls if c[1][0].startswith("git_")] == [ + statsd_calls = statsd_report.mock_calls + assert [c for c in statsd_calls if c[1][0].startswith("git_")] == [ call("git_total", "c", 1, {}, 1), call("git_ignored_refs_percent", "h", 0.0, {}, 1), call("git_known_refs_percent", "h", 1.0, {}, 1), ] + + assert [c for c in statsd_calls if re.match("unwanted_.*_sum", c[1][0])] == [ + call( + "unwanted_packfile_objects_total_sum", + "c", + 0, + {"object_type": object_type}, + 1, + ) + for object_type in ("blob", "tree", "commit", "tag") + ] + assert self.loader.statsd.constant_tags == { "visit_type": "git", "incremental_enabled": True, @@ -447,7 +489,8 @@ } @pytest.mark.parametrize( - "parent_snapshot,previous_snapshot,expected_git_known_refs_percent", + "parent_snapshot,previous_snapshot,expected_git_known_refs_percent," + "expected_unwanted", [ pytest.param( Snapshot( @@ -457,6 +500,8 @@ ), Snapshot(branches={}), 0.25, + # not zero, because it reuses an existing packfile + dict(blob=2, tree=2, commit=2, tag=0), id="partial-parent-and-empty-previous", ), pytest.param( @@ -467,6 +512,9 @@ } ), 1.0, + # all zeros, because there is no object to fetch at all (SNAPSHOT1 + # is the complete set of refs) + dict(blob=0, tree=0, commit=0, tag=0), id="full-parent-and-partial-previous", ), ], @@ -476,6 +524,7 @@ parent_snapshot, previous_snapshot, expected_git_known_refs_percent, + expected_unwanted, mocker, ): """Snapshot of parent origin has all branches, but previous snapshot was @@ -561,12 +610,25 @@ "has_previous_snapshot": True, "has_parent_origins": True, } - assert 
[c for c in statsd_report.mock_calls if c[1][0].startswith("git_")] == [ + + statsd_calls = statsd_report.mock_calls + assert [c for c in statsd_calls if c[1][0].startswith("git_")] == [ call("git_total", "c", 1, {}, 1), call("git_ignored_refs_percent", "h", 0.0, {}, 1), call("git_known_refs_percent", "h", expected_git_known_refs_percent, {}, 1), ] + assert [c for c in statsd_calls if re.match("unwanted_.*_sum", c[1][0])] == [ + call( + "unwanted_packfile_objects_total_sum", + "c", + nb, + {"object_type": object_type}, + 1, + ) + for (object_type, nb) in expected_unwanted.items() + ] + class DumbGitLoaderTestBase(FullGitLoaderTests): """Prepare a git repository to be loaded using the HTTP dumb transfer protocol."""