D7882.id.diff

diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,5 @@
swh.core >= 0.0.7
-swh.loader.core >= 3.5.0
+swh.loader.core >= 3.6.0
swh.model >= 4.3.0
swh.scheduler >= 0.0.39
swh.storage >= 0.22.0
diff --git a/swh/loader/git/base.py b/swh/loader/git/base.py
--- a/swh/loader/git/base.py
+++ b/swh/loader/git/base.py
@@ -128,7 +128,7 @@
            self.statsd.increment("filtered_objects_total_count", total, tags=tags)
        self.log.info(
-            "Fetched %d objects; %d are new",
+            "store_data got %d objects; %d are new",
            sum(counts.values()),
            sum(storage_summary[f"{object_type}:add"] for object_type in counts),
        )
diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py
--- a/swh/loader/git/loader.py
+++ b/swh/loader/git/loader.py
@@ -3,20 +3,21 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import collections
from dataclasses import dataclass
import datetime
import logging
import os
import pickle
import sys
-from tempfile import SpooledTemporaryFile
+import tempfile
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Set, Type
import dulwich.client
from dulwich.errors import GitProtocolError, NotGitRepository
from dulwich.object_store import ObjectStoreGraphWalker
from dulwich.objects import ShaFile
-from dulwich.pack import PackData, PackInflater
+from dulwich.pack import PackData, PackInflater, PackIndex2, Pack, MemoryPackIndex
from swh.core.statsd import Statsd
from swh.loader.exception import NotFound
@@ -113,11 +114,18 @@
        return wanted_refs


+class BetterMemoryPackIndex(MemoryPackIndex):
+    """Like dulwich's ``MemoryPackIndex``, but with an ``_object_index`` that
+    looks entries up in ``_by_sha`` directly, so it does not crash."""
+
+    def _object_index(self, sha):
+        return self._by_sha[sha]
+
+
@dataclass
class FetchPackReturn:
    remote_refs: Dict[bytes, HexBytes]
    symbolic_refs: Dict[bytes, HexBytes]
-    pack_buffer: SpooledTemporaryFile
+    pack_buffer: tempfile.SpooledTemporaryFile
    pack_size: int
@@ -155,6 +163,7 @@
        repo_representation: Type[RepoRepresentation] = RepoRepresentation,
        pack_size_bytes: int = 4 * 1024 * 1024 * 1024,
        temp_file_cutoff: int = 100 * 1024 * 1024,
+        traverse_graph: bool = True,
        **kwargs: Any,
    ):
        """Initialize the bulk updater.
@@ -179,6 +188,11 @@
        self.symbolic_refs: Dict[bytes, HexBytes] = {}
        self.ref_object_types: Dict[bytes, Optional[TargetType]] = {}
+        self.traverse_graph = traverse_graph
+        self.reachable_from_known_refs: Set[bytes] = set()
+        """Set of all git objects that are referenced by objects known to be already
+        in storage prior to this load."""
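+        # (Seeded from the base snapshots' branch targets in prepare(), then
+        # expanded by compute_known_objects() to every object reachable from
+        # those targets.)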
+
    def fetch_pack_from_origin(
        self,
        origin_url: str,
@@ -187,7 +201,7 @@
    ) -> FetchPackReturn:
        """Fetch a pack from the origin"""
-        pack_buffer = SpooledTemporaryFile(max_size=self.temp_file_cutoff)
+        pack_buffer = tempfile.SpooledTemporaryFile(max_size=self.temp_file_cutoff)
        # Hardcode the use of the tcp transport (for GitHub origins)
@@ -282,6 +296,7 @@
            prev_snapshot = self.get_full_snapshot(self.origin.url)
            self.statsd.constant_tags["has_previous_snapshot"] = bool(prev_snapshot)
            if prev_snapshot:
+                logger.info("Loading from previous snapshot")
                self.prev_snapshot = prev_snapshot
                self.base_snapshots.append(prev_snapshot)
@@ -292,9 +307,15 @@
                for parent_origin in self.parent_origins:
                    parent_snapshot = self.get_full_snapshot(parent_origin.url)
                    if parent_snapshot is not None:
+                        logger.info("Loading from parent origin: %s", parent_origin.url)
                        self.statsd.constant_tags["has_parent_snapshot"] = True
                        self.base_snapshots.append(parent_snapshot)

+        # Initialize the set of reachable objects
+        for snapshot in self.base_snapshots:
+            for branch in snapshot.branches.values():
+                self.reachable_from_known_refs.add(branch.target)
+
        # Increments a metric with full name 'swh_loader_git'; which is useful to
        # count how many runs of the loader are with each incremental mode
        self.statsd.increment("git_total", tags={})
@@ -393,6 +414,91 @@
        with open(os.path.join(pack_dir, refs_name), "xb") as f:
            pickle.dump(self.remote_refs, f)
+    def process_data(self) -> bool:
+        self.compute_known_objects()
+        return True
+
+    def compute_known_objects(self):
+        """Runs a graph traversal on all objects sent by the remote, to update
+        :attr:`reachable_from_known_refs`.
+        """
+        if self.dumb:
+            # This is only useful with the smart protocol, where the remote may
+            # send us many objects we told it we already have.
+            return
+
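+        # Per-type count of every object in the pack, filled by the loop below
+        # and used by iter_new_objects() to report the
+        # unwanted_packfile_objects_* metrics.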
+        self.total_objects_in_pack = collections.defaultdict(int)
+
+        if self.traverse_graph:
+            logger.debug(
+                "computing reachable_from_known_refs (starting from %s)",
+                len(self.reachable_from_known_refs),
+            )
+        else:
+            logger.debug(
+                "skipping computation of reachable_from_known_refs (starting from %s)",
+                len(self.reachable_from_known_refs),
+            )
+            return
+
+        # Adjacency lists of the graph of all objects received from the remote.
+        # TODO: use tuple() instead of set() for small sets? they are almost as fast,
+        # and have 160 fewer bytes of overhead
+        edges: Dict[bytes, Set[bytes]] = {}
+
+        self.pack_buffer.seek(0)
+        for obj in PackInflater.for_pack_data(
+            PackData.from_file(self.pack_buffer, self.pack_size)
+        ):
+            id_ = obj.sha().digest()
+            self.total_objects_in_pack[obj.type_name] += 1
+            if id_ in edges:
+                # Duplicate object? Skip it
+                continue
+            elif obj.type_name == b"blob":
+                continue
+            elif obj.type_name == b"tree":
+                # skip it, takes too much memory
+                continue
+                edges[id_] = {
+                    hashutil.hash_to_bytes(entry.sha.decode("ascii"))
+                    for entry in obj.iteritems()
+                }
+            elif obj.type_name == b"commit":
+                edges[id_] = {
+                    hashutil.hash_to_bytes(parent.decode("ascii"))
+                    for parent in obj.parents
+                }
+                # skip it, takes too much memory
+                continue
+                edges[id_].add(hashutil.hash_to_bytes(obj.tree.decode("ascii")))
+            elif obj.type_name == b"tag":
+                (target_type, target) = obj.object
+                edges[id_] = {hashutil.hash_to_bytes(target.decode("ascii"))}
+            else:
+                raise ValueError(f"Unexpected object type: {obj.type_name!r}")
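+        # At this point `edges` maps each commit to its parents and each tag to
+        # its target; blob and tree edges are left out (per the "takes too much
+        # memory" comments above).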
+
+        logger.debug(
+            "nodes=%s edges=%s",
+            len(edges),
+            sum(map(len, edges.values())),
+        )
+
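+        # Worklist traversal: pop an object already known to be in storage, mark
+        # everything it points to as known too, and keep going until no new
+        # object is reached.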
+        to_visit = set(self.reachable_from_known_refs)
+        while to_visit:
+            node = to_visit.pop()
+            parents = edges.pop(node, set())
+
+            # already processed or about to be, don't visit them again
+            parents.difference_update(self.reachable_from_known_refs)
+
+            self.reachable_from_known_refs.update(parents)
+            to_visit.update(parents)
+
+        logger.debug(
+            "result: reachable_from_known_refs=%s", len(self.reachable_from_known_refs)
+        )
+
    def iter_objects(self, object_type: bytes) -> Iterator[ShaFile]:
        """Read all the objects of type `object_type` from the packfile"""
        if self.dumb:
@@ -409,9 +515,89 @@
            count += 1
        logger.debug("packfile_read_count_%s=%s", object_type.decode(), count)
+    def iter_new_objects(self, object_type: bytes) -> Iterator[ShaFile]:
+        """Read all the objects of type `object_type` from the packfile,
+        yielding only those not reachable from any of the base snapshots.
+
+        This only differs from :meth:`iter_objects` when the remote sends
+        packfiles containing objects already reachable from the ``known`` set."""
+        if self.dumb or not self.traverse_graph:
+            yield from self.iter_objects(object_type)
+        else:
+            to_visit = set()
+            seen = set()
+            self.pack_buffer.seek(0)
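+            # Seed the traversal with every commit and tag in the pack that is
+            # not already reachable from the base snapshots.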
+            pack_data = PackData.from_file(self.pack_buffer, self.pack_size)
+            for obj in PackInflater.for_pack_data(pack_data):
+                if obj.type_name in (b"commit", b"tag"):
+                    id_ = obj.sha().digest()
+                    if (
+                        hashutil.hash_to_bytes(id_)
+                        not in self.reachable_from_known_refs
+                    ):
+                        to_visit.add(id_)
+
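+            # Index the pack so objects can be fetched by SHA during the walk;
+            # the commented-out lines are an alternative using an on-disk index.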
+            index_file = tempfile.NamedTemporaryFile()
+            # pack_data.create_index_v2(index_file.name)
+            # pack_index = PackIndex2(index_file.name)
+            pack_index = BetterMemoryPackIndex(
+                pack_data.sorted_entries(),
+                pack_data.get_stored_checksum(),  # why is this argument needed?
+            )
+            pack = Pack.from_objects(pack_data, pack_index)
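+            # Walk the graph from the unknown commits/tags: yield objects of the
+            # requested type, and queue the children of trees, commits and tags.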
+            while to_visit:
+                id_ = hashutil.hash_to_bytehex(to_visit.pop())
+                # offset = pack_index.object_index(id_)
+                # (offset, type_, (raw_obj,)) = pack_data.get_ref(id_)
+                obj = pack[id_]
+
+                assert id_ == obj.id
+                assert hashutil.hash_to_bytes(id_) not in self.reachable_from_known_refs
+                if obj.type_name == object_type:
+                    if id_ not in seen:
+                        seen.add(id_)
+                        yield obj
+
+                if obj.type_name == b"tree":
+                    to_visit.update(
+                        {
+                            hashutil.hash_to_bytes(entry.sha.decode("ascii"))
+                            for entry in obj.iteritems()
+                        }
+                    )
+                elif obj.type_name == b"commit":
+                    # relevant parents are already in to_visit
+                    to_visit.add(hashutil.hash_to_bytes(obj.tree.decode("ascii")))
+                elif obj.type_name == b"tag":
+                    (target_type, target) = obj.object
+                    # TODO: don't do it if it is a commit
+                    to_visit.add(hashutil.hash_to_bytes(target.decode("ascii")))
+
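+            # As with the filtered_objects_* metrics, this _sum/_count pair is
+            # presumably meant to be combined into an average number of unwanted
+            # (already-known) objects per packfile, weighted by pack size.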
+            tags = {"object_type": object_type.decode()}
+
+            # average weighted by total
+            self.statsd.increment(
+                "unwanted_packfile_objects_total_sum",
+                self.total_objects_in_pack[object_type] - len(seen),
+                tags=tags,
+            )
+            self.statsd.increment(
+                "unwanted_packfile_objects_total_count",
+                self.total_objects_in_pack[object_type],
+                tags=tags,
+            )
+
+            self.log.info(
+                "packfile contains %d %s objects; %d appear to be unreachable "
+                "from known snapshots",
+                self.total_objects_in_pack[object_type],
+                object_type.decode(),
+                len(seen),
+            )
+
    def get_contents(self) -> Iterable[BaseContent]:
        """Format the blobs from the git repository as swh contents"""
-        for raw_obj in self.iter_objects(b"blob"):
+        for raw_obj in self.iter_new_objects(b"blob"):
            if raw_obj.id in self.ref_object_types:
                self.ref_object_types[raw_obj.id] = TargetType.CONTENT
@@ -421,7 +607,7 @@
    def get_directories(self) -> Iterable[Directory]:
        """Format the trees as swh directories"""
-        for raw_obj in self.iter_objects(b"tree"):
+        for raw_obj in self.iter_new_objects(b"tree"):
            if raw_obj.id in self.ref_object_types:
                self.ref_object_types[raw_obj.id] = TargetType.DIRECTORY
@@ -429,7 +615,7 @@
    def get_revisions(self) -> Iterable[Revision]:
        """Format commits as swh revisions"""
-        for raw_obj in self.iter_objects(b"commit"):
+        for raw_obj in self.iter_new_objects(b"commit"):
            if raw_obj.id in self.ref_object_types:
                self.ref_object_types[raw_obj.id] = TargetType.REVISION
@@ -437,7 +623,7 @@
    def get_releases(self) -> Iterable[Release]:
        """Retrieve all the release objects from the git repository"""
-        for raw_obj in self.iter_objects(b"tag"):
+        for raw_obj in self.iter_new_objects(b"tag"):
            if raw_obj.id in self.ref_object_types:
                self.ref_object_types[raw_obj.id] = TargetType.RELEASE
diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_loader.py
--- a/swh/loader/git/tests/test_loader.py
+++ b/swh/loader/git/tests/test_loader.py
@@ -7,6 +7,7 @@
from functools import partial
from http.server import HTTPServer, SimpleHTTPRequestHandler
import os
+import re
import subprocess
import sys
from tempfile import SpooledTemporaryFile
@@ -126,6 +127,19 @@
call("git_ignored_refs_percent", "h", 0.0, {}, 1),
call("git_known_refs_percent", "h", 0.0, {}, 1),
]
+
+ # all 0, because they are not referenced by any snapshot
+ assert [c for c in statsd_calls if re.match("unwanted_.*_sum", c[1][0])] == [
+ call(
+ "unwanted_packfile_objects_total_sum",
+ "c",
+ 0,
+ {"object_type": object_type},
+ 1,
+ )
+ for object_type in ("blob", "tree", "commit", "tag")
+ ]
+
total_sum_name = "filtered_objects_total_sum"
total_count_name = "filtered_objects_total_count"
percent_sum_name = "filtered_objects_percent_sum"
@@ -148,6 +162,7 @@
            call(total_sum_name, "c", 0, {"object_type": "snapshot"}, 1),
            call(total_count_name, "c", 1, {"object_type": "snapshot"}, 1),
        ]
+
        assert self.loader.statsd.constant_tags == {
            "visit_type": "git",
            "incremental_enabled": True,
@@ -220,6 +235,7 @@
            call(total_sum_name, "c", 0, {"object_type": "snapshot"}, 1),
            call(total_count_name, "c", 1, {"object_type": "snapshot"}, 1),
        ]
+
        assert self.loader.statsd.constant_tags == {
            "visit_type": "git",
            "incremental_enabled": True,
@@ -300,11 +316,24 @@
# TODO: assert "incremental" is added to constant tags before these
# metrics are sent
- assert [c for c in statsd_report.mock_calls if c[1][0].startswith("git_")] == [
+ statsd_calls = statsd_report.mock_calls
+ assert [c for c in statsd_calls if c[1][0].startswith("git_")] == [
call("git_total", "c", 1, {}, 1),
call("git_ignored_refs_percent", "h", 0.0, {}, 1),
call("git_known_refs_percent", "h", 0.0, {}, 1),
]
+
+ assert [c for c in statsd_calls if re.match("unwanted_.*_sum", c[1][0])] == [
+ call(
+ "unwanted_packfile_objects_total_sum",
+ "c",
+ 0,
+ {"object_type": object_type},
+ 1,
+ )
+ for object_type in ("blob", "tree", "commit", "tag")
+ ]
+
assert self.loader.statsd.constant_tags == {
"visit_type": "git",
"incremental_enabled": True,
@@ -433,11 +462,24 @@
# TODO: assert "incremental*" is added to constant tags before these
# metrics are sent
- assert [c for c in statsd_report.mock_calls if c[1][0].startswith("git_")] == [
+ statsd_calls = statsd_report.mock_calls
+ assert [c for c in statsd_calls if c[1][0].startswith("git_")] == [
call("git_total", "c", 1, {}, 1),
call("git_ignored_refs_percent", "h", 0.0, {}, 1),
call("git_known_refs_percent", "h", 1.0, {}, 1),
]
+
+ assert [c for c in statsd_calls if re.match("unwanted_.*_sum", c[1][0])] == [
+ call(
+ "unwanted_packfile_objects_total_sum",
+ "c",
+ 0,
+ {"object_type": object_type},
+ 1,
+ )
+ for object_type in ("blob", "tree", "commit", "tag")
+ ]
+
assert self.loader.statsd.constant_tags == {
"visit_type": "git",
"incremental_enabled": True,
@@ -447,7 +489,8 @@
        }

    @pytest.mark.parametrize(
-        "parent_snapshot,previous_snapshot,expected_git_known_refs_percent",
+        "parent_snapshot,previous_snapshot,expected_git_known_refs_percent,"
+        "expected_unwanted",
        [
            pytest.param(
                Snapshot(
@@ -457,6 +500,8 @@
                ),
                Snapshot(branches={}),
                0.25,
+                # not zero, because it reuses an existing packfile
+                dict(blob=2, tree=2, commit=2, tag=0),
                id="partial-parent-and-empty-previous",
            ),
            pytest.param(
@@ -467,6 +512,9 @@
                    }
                ),
                1.0,
+                # all zeros, because there is no object to fetch at all (SNAPSHOT1
+                # is the complete set of refs)
+                dict(blob=0, tree=0, commit=0, tag=0),
                id="full-parent-and-partial-previous",
            ),
        ],
@@ -476,6 +524,7 @@
        parent_snapshot,
        previous_snapshot,
        expected_git_known_refs_percent,
+        expected_unwanted,
        mocker,
    ):
        """Snapshot of parent origin has all branches, but previous snapshot was
@@ -561,12 +610,25 @@
"has_previous_snapshot": True,
"has_parent_origins": True,
}
- assert [c for c in statsd_report.mock_calls if c[1][0].startswith("git_")] == [
+
+ statsd_calls = statsd_report.mock_calls
+ assert [c for c in statsd_calls if c[1][0].startswith("git_")] == [
call("git_total", "c", 1, {}, 1),
call("git_ignored_refs_percent", "h", 0.0, {}, 1),
call("git_known_refs_percent", "h", expected_git_known_refs_percent, {}, 1),
]
+ assert [c for c in statsd_calls if re.match("unwanted_.*_sum", c[1][0])] == [
+ call(
+ "unwanted_packfile_objects_total_sum",
+ "c",
+ nb,
+ {"object_type": object_type},
+ 1,
+ )
+ for (object_type, nb) in expected_unwanted.items()
+ ]
+
class DumbGitLoaderTestBase(FullGitLoaderTests):
"""Prepare a git repository to be loaded using the HTTP dumb transfer protocol."""
