D7882: Run a graph traversal to compute objects in packfiles that we know we have
Raw diff: D7882.id.diff (18 KB)
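The main change is in swh/loader/git/loader.py: before storing anything, the loader now builds an adjacency graph of the commits and tags received from the remote and runs a traversal from the branch targets of already-known snapshots, recording everything reachable in reachable_from_known_refs (see compute_known_objects in the diff below). A minimal self-contained sketch of that traversal, using plain dicts and sets instead of the loader's dulwich pack objects (all names in the sketch are illustrative only, not part of the loader):

    from typing import Dict, Set

    def reachable_from(known_refs: Set[bytes], edges: Dict[bytes, Set[bytes]]) -> Set[bytes]:
        # `edges` maps a commit/tag id to the ids it references, mirroring the
        # adjacency lists built by compute_known_objects(); the result is every
        # object id reachable from the already-known refs.
        reachable = set(known_refs)
        to_visit = set(known_refs)
        while to_visit:
            node = to_visit.pop()
            # drop ids already marked reachable so each node is expanded once
            successors = edges.get(node, set()) - reachable
            reachable |= successors
            to_visit |= successors
        return reachable

    # Hypothetical toy history: c2 -> c1 -> c0, with only c0 already in storage.
    edges = {b"c2": {b"c1"}, b"c1": {b"c0"}, b"c0": set()}
    assert reachable_from({b"c0"}, edges) == {b"c0"}
    assert reachable_from({b"c1"}, edges) == {b"c1", b"c0"}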
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,5 @@
swh.core >= 0.0.7
-swh.loader.core >= 3.5.0
+swh.loader.core >= 3.6.0
swh.model >= 4.3.0
swh.scheduler >= 0.0.39
swh.storage >= 0.22.0
diff --git a/swh/loader/git/base.py b/swh/loader/git/base.py
--- a/swh/loader/git/base.py
+++ b/swh/loader/git/base.py
@@ -128,7 +128,7 @@
self.statsd.increment("filtered_objects_total_count", total, tags=tags)
self.log.info(
- "Fetched %d objects; %d are new",
+ "store_data got %d objects; %d are new",
sum(counts.values()),
sum(storage_summary[f"{object_type}:add"] for object_type in counts),
)
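This hunk only rewords a log message next to the existing filtered_objects_total_{sum,count} counters. The unwanted_packfile_objects_total_{sum,count} pair added in loader.py below follows the same statsd convention ("average weighted by total"): a sum/count pair from which a dashboard can recover a ratio weighted by packfile size. A small worked example with made-up per-load numbers for one object type:

    # Hypothetical per-load increments, mirroring the
    # unwanted_packfile_objects_total_{sum,count} pair emitted per load:
    # (objects already known, total objects of that type in the pack).
    loads = [(0, 120), (35, 50), (5, 830)]

    already_known = sum(unwanted for unwanted, _ in loads)  # 40
    total_received = sum(total for _, total in loads)       # 1000

    # Ratio weighted by pack size, rather than an unweighted mean of
    # the per-load percentages.
    print(already_known / total_received)                   # 0.04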
diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py
--- a/swh/loader/git/loader.py
+++ b/swh/loader/git/loader.py
@@ -3,20 +3,21 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import collections
from dataclasses import dataclass
import datetime
import logging
import os
import pickle
import sys
-from tempfile import SpooledTemporaryFile
+import tempfile
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Set, Type
import dulwich.client
from dulwich.errors import GitProtocolError, NotGitRepository
from dulwich.object_store import ObjectStoreGraphWalker
from dulwich.objects import ShaFile
-from dulwich.pack import PackData, PackInflater
+from dulwich.pack import PackData, PackInflater, PackIndex2, Pack, MemoryPackIndex
from swh.core.statsd import Statsd
from swh.loader.exception import NotFound
@@ -113,11 +114,18 @@
return wanted_refs
+class BetterMemoryPackIndex(MemoryPackIndex):
+    """It's better because it doesn't crash"""
+
+    def _object_index(self, sha):
+        return self._by_sha[sha]
+
+
@dataclass
class FetchPackReturn:
remote_refs: Dict[bytes, HexBytes]
symbolic_refs: Dict[bytes, HexBytes]
- pack_buffer: SpooledTemporaryFile
+ pack_buffer: tempfile.SpooledTemporaryFile
pack_size: int
@@ -155,6 +163,7 @@
repo_representation: Type[RepoRepresentation] = RepoRepresentation,
pack_size_bytes: int = 4 * 1024 * 1024 * 1024,
temp_file_cutoff: int = 100 * 1024 * 1024,
+ traverse_graph: bool = True,
**kwargs: Any,
):
"""Initialize the bulk updater.
@@ -179,6 +188,11 @@
self.symbolic_refs: Dict[bytes, HexBytes] = {}
self.ref_object_types: Dict[bytes, Optional[TargetType]] = {}
+ self.traverse_graph = traverse_graph
+ self.reachable_from_known_refs: Set[bytes] = set()
+ """Set of all git objects that are referenced by objects known to be already
+ in storage prior to this load."""
+
def fetch_pack_from_origin(
self,
origin_url: str,
@@ -187,7 +201,7 @@
) -> FetchPackReturn:
"""Fetch a pack from the origin"""
- pack_buffer = SpooledTemporaryFile(max_size=self.temp_file_cutoff)
+ pack_buffer = tempfile.SpooledTemporaryFile(max_size=self.temp_file_cutoff)
# Hardcode the use of the tcp transport (for GitHub origins)
@@ -282,6 +296,7 @@
prev_snapshot = self.get_full_snapshot(self.origin.url)
self.statsd.constant_tags["has_previous_snapshot"] = bool(prev_snapshot)
if prev_snapshot:
+ logger.info("Loading from previous snapshot")
self.prev_snapshot = prev_snapshot
self.base_snapshots.append(prev_snapshot)
@@ -292,9 +307,15 @@
for parent_origin in self.parent_origins:
parent_snapshot = self.get_full_snapshot(parent_origin.url)
if parent_snapshot is not None:
+ logger.info("Loading from parent origin: %s", parent_origin.url)
self.statsd.constant_tags["has_parent_snapshot"] = True
self.base_snapshots.append(parent_snapshot)
+ # Initialize the set of reachable objects
+ for snapshot in self.base_snapshots:
+ for branch in snapshot.branches.values():
+ self.reachable_from_known_refs.add(branch.target)
+
# Increments a metric with full name 'swh_loader_git'; which is useful to
# count how many runs of the loader are with each incremental mode
self.statsd.increment("git_total", tags={})
@@ -393,6 +414,91 @@
with open(os.path.join(pack_dir, refs_name), "xb") as f:
pickle.dump(self.remote_refs, f)
+    def process_data(self) -> bool:
+        self.compute_known_objects()
+        return True
+
+    def compute_known_objects(self):
+        """Runs a graph traversal on all objects sent by the remote, to update
+        :attr:`reachable_from_known_refs`.
+        """
+        if self.dumb:
+            # When using the smart protocol, the remote may send us many objects
+            # we told it we already have.
+            return
+
+        self.total_objects_in_pack = collections.defaultdict(int)
+
+        if self.traverse_graph:
+            logger.debug(
+                "computing reachable_from_known_refs (starting from %s)",
+                len(self.reachable_from_known_refs),
+            )
+        else:
+            logger.debug(
+                "skipping computation of reachable_from_known_refs (starting from %s)",
+                len(self.reachable_from_known_refs),
+            )
+            return
+
+        # Adjacency lists of the graph of all objects received from the remote.
+        # TODO: use tuple() instead of set() for small sets? they are almost as fast,
+        # and have 160 fewer bytes of overhead
+        edges: Dict[bytes, Set[bytes]] = {}
+
+        self.pack_buffer.seek(0)
+        for obj in PackInflater.for_pack_data(
+            PackData.from_file(self.pack_buffer, self.pack_size)
+        ):
+            id_ = obj.sha().digest()
+            self.total_objects_in_pack[obj.type_name] += 1
+            if id_ in edges:
+                # Duplicate object? Skip it
+                continue
+            elif obj.type_name == b"blob":
+                continue
+            elif obj.type_name == b"tree":
+                # skip it, takes too much memory
+                continue
+                edges[id_] = {
+                    hashutil.hash_to_bytes(entry.sha.decode("ascii"))
+                    for entry in obj.iteritems()
+                }
+            elif obj.type_name == b"commit":
+                edges[id_] = {
+                    hashutil.hash_to_bytes(parent.decode("ascii"))
+                    for parent in obj.parents
+                }
+                # skip it, takes too much memory
+                continue
+                edges[id_].add(hashutil.hash_to_bytes(obj.tree.decode("ascii")))
+            elif obj.type_name == b"tag":
+                (target_type, target) = obj.object
+                edges[id_] = {hashutil.hash_to_bytes(target.decode("ascii"))}
+            else:
+                raise ValueError(f"Unexpected object type: {obj.type_name!r}")
+
+        logger.debug(
+            "nodes=%s edges=%s",
+            len(edges),
+            sum(map(len, edges.values())),
+        )
+
+        to_visit = set(self.reachable_from_known_refs)
+        while to_visit:
+            node = to_visit.pop()
+            parents = edges.pop(node, set())
+
+            # already processed or about to be, don't visit them again
+            parents.difference_update(self.reachable_from_known_refs)
+
+            self.reachable_from_known_refs.update(parents)
+            to_visit.update(parents)
+
+        logger.debug(
+            "result: reachable_from_known_refs=%s", len(self.reachable_from_known_refs)
+        )
+
def iter_objects(self, object_type: bytes) -> Iterator[ShaFile]:
"""Read all the objects of type `object_type` from the packfile"""
if self.dumb:
@@ -409,9 +515,89 @@
count += 1
logger.debug("packfile_read_count_%s=%s", object_type.decode(), count)
+    def iter_new_objects(self, object_type: bytes) -> Iterator[ShaFile]:
+        """Read all the objects of type `object_type` from the packfile,
+        yielding only those not reachable from any of the base snapshots.
+
+        This differs from :meth:`iter_objects` when origins send objects
+        reachable from the ``known`` set in the packfiles."""
+        if self.dumb or not self.traverse_graph:
+            yield from self.iter_objects(object_type)
+        else:
+            to_visit = set()
+            seen = set()
+            self.pack_buffer.seek(0)
+            pack_data = PackData.from_file(self.pack_buffer, self.pack_size)
+            for obj in PackInflater.for_pack_data(pack_data):
+                if obj.type_name in (b"commit", b"tag"):
+                    id_ = obj.sha().digest()
+                    if (
+                        hashutil.hash_to_bytes(id_)
+                        not in self.reachable_from_known_refs
+                    ):
+                        to_visit.add(id_)
+
+            index_file = tempfile.NamedTemporaryFile()
+            # pack_data.create_index_v2(index_file.name)
+            # pack_index = PackIndex2(index_file.name)
+            pack_index = BetterMemoryPackIndex(
+                pack_data.sorted_entries(),
+                pack_data.get_stored_checksum(),  # why is this argument needed?
+            )
+            pack = Pack.from_objects(pack_data, pack_index)
+            while to_visit:
+                id_ = hashutil.hash_to_bytehex(to_visit.pop())
+                # offset = pack_index.object_index(id_)
+                # (offset, type_, (raw_obj,)) = pack_data.get_ref(id_)
+                obj = pack[id_]
+
+                assert id_ == obj.id
+                assert hashutil.hash_to_bytes(id_) not in self.reachable_from_known_refs
+                if obj.type_name == object_type:
+                    if id_ not in seen:
+                        seen.add(id_)
+                        yield obj
+
+                if obj.type_name == b"tree":
+                    to_visit.update(
+                        {
+                            hashutil.hash_to_bytes(entry.sha.decode("ascii"))
+                            for entry in obj.iteritems()
+                        }
+                    )
+                elif obj.type_name == b"commit":
+                    # relevant parents are already in to_visit
+                    to_visit.add(hashutil.hash_to_bytes(obj.tree.decode("ascii")))
+                elif obj.type_name == b"tag":
+                    (target_type, target) = obj.object
+                    # TODO: don't do it if it is a commit
+                    to_visit.add(hashutil.hash_to_bytes(target.decode("ascii")))
+
+            tags = {"object_type": object_type.decode()}
+
+            # average weighted by total
+            self.statsd.increment(
+                "unwanted_packfile_objects_total_sum",
+                self.total_objects_in_pack[object_type] - len(seen),
+                tags=tags,
+            )
+            self.statsd.increment(
+                "unwanted_packfile_objects_total_count",
+                self.total_objects_in_pack[object_type],
+                tags=tags,
+            )
+
+            self.log.info(
+                "packfile contains %d %s objects; %d appear to be unreachable "
+                "from known snapshots",
+                self.total_objects_in_pack[object_type],
+                object_type.decode(),
+                len(seen),
+            )
+
def get_contents(self) -> Iterable[BaseContent]:
"""Format the blobs from the git repository as swh contents"""
- for raw_obj in self.iter_objects(b"blob"):
+ for raw_obj in self.iter_new_objects(b"blob"):
if raw_obj.id in self.ref_object_types:
self.ref_object_types[raw_obj.id] = TargetType.CONTENT
@@ -421,7 +607,7 @@
def get_directories(self) -> Iterable[Directory]:
"""Format the trees as swh directories"""
- for raw_obj in self.iter_objects(b"tree"):
+ for raw_obj in self.iter_new_objects(b"tree"):
if raw_obj.id in self.ref_object_types:
self.ref_object_types[raw_obj.id] = TargetType.DIRECTORY
@@ -429,7 +615,7 @@
def get_revisions(self) -> Iterable[Revision]:
"""Format commits as swh revisions"""
- for raw_obj in self.iter_objects(b"commit"):
+ for raw_obj in self.iter_new_objects(b"commit"):
if raw_obj.id in self.ref_object_types:
self.ref_object_types[raw_obj.id] = TargetType.REVISION
@@ -437,7 +623,7 @@
def get_releases(self) -> Iterable[Release]:
"""Retrieve all the release objects from the git repository"""
- for raw_obj in self.iter_objects(b"tag"):
+ for raw_obj in self.iter_new_objects(b"tag"):
if raw_obj.id in self.ref_object_types:
self.ref_object_types[raw_obj.id] = TargetType.RELEASE
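Because blobs and trees are left out of the adjacency graph for memory reasons, iter_new_objects() re-reads the packfile: it seeds a worklist with the commits and tags that are not in reachable_from_known_refs, then walks down through trees to blobs, so get_contents()/get_directories()/get_revisions()/get_releases() only see objects that appear to be genuinely new. A simplified sketch of that walk, with a plain dict standing in for the dulwich Pack (the object layout is invented for the example and is not the loader's API):

    from typing import Dict, Iterator, Set, Tuple

    # Toy packfile: object id -> (object type, ids it references).
    ToyPack = Dict[bytes, Tuple[bytes, Set[bytes]]]

    def iter_new_ids(pack: ToyPack, known: Set[bytes], object_type: bytes) -> Iterator[bytes]:
        # Seed with the commits/tags the remote sent that are not already known,
        # mirroring the first pass over the packfile in iter_new_objects().
        to_visit = {
            id_
            for id_, (type_, _) in pack.items()
            if type_ in (b"commit", b"tag") and id_ not in known
        }
        seen: Set[bytes] = set()
        while to_visit:
            id_ = to_visit.pop()
            type_, children = pack[id_]
            if type_ == object_type and id_ not in seen:
                seen.add(id_)
                yield id_
            # Commits descend into their tree only (new parent commits were
            # already seeded above); trees descend into their entries.
            to_visit.update(children)

    # Hypothetical pack: one new commit pointing at one tree and one blob.
    pack: ToyPack = {
        b"c1": (b"commit", {b"t1"}),
        b"t1": (b"tree", {b"b1"}),
        b"b1": (b"blob", set()),
    }
    assert set(iter_new_ids(pack, known=set(), object_type=b"blob")) == {b"b1"}
    assert set(iter_new_ids(pack, known={b"c1"}, object_type=b"blob")) == set()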
diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_loader.py
--- a/swh/loader/git/tests/test_loader.py
+++ b/swh/loader/git/tests/test_loader.py
@@ -7,6 +7,7 @@
from functools import partial
from http.server import HTTPServer, SimpleHTTPRequestHandler
import os
+import re
import subprocess
import sys
from tempfile import SpooledTemporaryFile
@@ -126,6 +127,19 @@
call("git_ignored_refs_percent", "h", 0.0, {}, 1),
call("git_known_refs_percent", "h", 0.0, {}, 1),
]
+
+ # all 0, because they are not referenced by any snapshot
+ assert [c for c in statsd_calls if re.match("unwanted_.*_sum", c[1][0])] == [
+ call(
+ "unwanted_packfile_objects_total_sum",
+ "c",
+ 0,
+ {"object_type": object_type},
+ 1,
+ )
+ for object_type in ("blob", "tree", "commit", "tag")
+ ]
+
total_sum_name = "filtered_objects_total_sum"
total_count_name = "filtered_objects_total_count"
percent_sum_name = "filtered_objects_percent_sum"
@@ -148,6 +162,7 @@
call(total_sum_name, "c", 0, {"object_type": "snapshot"}, 1),
call(total_count_name, "c", 1, {"object_type": "snapshot"}, 1),
]
+
assert self.loader.statsd.constant_tags == {
"visit_type": "git",
"incremental_enabled": True,
@@ -220,6 +235,7 @@
call(total_sum_name, "c", 0, {"object_type": "snapshot"}, 1),
call(total_count_name, "c", 1, {"object_type": "snapshot"}, 1),
]
+
assert self.loader.statsd.constant_tags == {
"visit_type": "git",
"incremental_enabled": True,
@@ -300,11 +316,24 @@
# TODO: assert "incremental" is added to constant tags before these
# metrics are sent
- assert [c for c in statsd_report.mock_calls if c[1][0].startswith("git_")] == [
+ statsd_calls = statsd_report.mock_calls
+ assert [c for c in statsd_calls if c[1][0].startswith("git_")] == [
call("git_total", "c", 1, {}, 1),
call("git_ignored_refs_percent", "h", 0.0, {}, 1),
call("git_known_refs_percent", "h", 0.0, {}, 1),
]
+
+ assert [c for c in statsd_calls if re.match("unwanted_.*_sum", c[1][0])] == [
+ call(
+ "unwanted_packfile_objects_total_sum",
+ "c",
+ 0,
+ {"object_type": object_type},
+ 1,
+ )
+ for object_type in ("blob", "tree", "commit", "tag")
+ ]
+
assert self.loader.statsd.constant_tags == {
"visit_type": "git",
"incremental_enabled": True,
@@ -433,11 +462,24 @@
# TODO: assert "incremental*" is added to constant tags before these
# metrics are sent
- assert [c for c in statsd_report.mock_calls if c[1][0].startswith("git_")] == [
+ statsd_calls = statsd_report.mock_calls
+ assert [c for c in statsd_calls if c[1][0].startswith("git_")] == [
call("git_total", "c", 1, {}, 1),
call("git_ignored_refs_percent", "h", 0.0, {}, 1),
call("git_known_refs_percent", "h", 1.0, {}, 1),
]
+
+ assert [c for c in statsd_calls if re.match("unwanted_.*_sum", c[1][0])] == [
+ call(
+ "unwanted_packfile_objects_total_sum",
+ "c",
+ 0,
+ {"object_type": object_type},
+ 1,
+ )
+ for object_type in ("blob", "tree", "commit", "tag")
+ ]
+
assert self.loader.statsd.constant_tags == {
"visit_type": "git",
"incremental_enabled": True,
@@ -447,7 +489,8 @@
}
@pytest.mark.parametrize(
- "parent_snapshot,previous_snapshot,expected_git_known_refs_percent",
+ "parent_snapshot,previous_snapshot,expected_git_known_refs_percent,"
+ "expected_unwanted",
[
pytest.param(
Snapshot(
@@ -457,6 +500,8 @@
),
Snapshot(branches={}),
0.25,
+ # not zero, because it reuses an existing packfile
+ dict(blob=2, tree=2, commit=2, tag=0),
id="partial-parent-and-empty-previous",
),
pytest.param(
@@ -467,6 +512,9 @@
}
),
1.0,
+ # all zeros, because there is no object to fetch at all (SNAPSHOT1
+ # is the complete set of refs)
+ dict(blob=0, tree=0, commit=0, tag=0),
id="full-parent-and-partial-previous",
),
],
@@ -476,6 +524,7 @@
parent_snapshot,
previous_snapshot,
expected_git_known_refs_percent,
+ expected_unwanted,
mocker,
):
"""Snapshot of parent origin has all branches, but previous snapshot was
@@ -561,12 +610,25 @@
"has_previous_snapshot": True,
"has_parent_origins": True,
}
- assert [c for c in statsd_report.mock_calls if c[1][0].startswith("git_")] == [
+
+ statsd_calls = statsd_report.mock_calls
+ assert [c for c in statsd_calls if c[1][0].startswith("git_")] == [
call("git_total", "c", 1, {}, 1),
call("git_ignored_refs_percent", "h", 0.0, {}, 1),
call("git_known_refs_percent", "h", expected_git_known_refs_percent, {}, 1),
]
+ assert [c for c in statsd_calls if re.match("unwanted_.*_sum", c[1][0])] == [
+ call(
+ "unwanted_packfile_objects_total_sum",
+ "c",
+ nb,
+ {"object_type": object_type},
+ 1,
+ )
+ for (object_type, nb) in expected_unwanted.items()
+ ]
+
class DumbGitLoaderTestBase(FullGitLoaderTests):
"""Prepare a git repository to be loaded using the HTTP dumb transfer protocol."""