Page MenuHomeSoftware Heritage

D8521.id30687.diff
No OneTemporary

D8521.id30687.diff

diff --git a/swh/loader/core/discovery.py b/swh/loader/core/discovery.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/core/discovery.py
@@ -0,0 +1,167 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Primitives for finding the unknown parts of disk contents efficiently."""
+
+import itertools
+import logging
+import random
+from typing import List
+
+from swh.model.from_disk import model
+
+logger = logging.getLogger(__name__)
+
+# Maximum number of undecided directories drawn in a single random sample
+SAMPLE_SIZE = 1000
+
+
+class BaseDiscoveryGraph:
+    """Creates the base structures and methods needed for discovery algorithms.
+
+    Every content, skipped content and directory starts out in `undecided`.
+    Nodes are then moved to `known` or `unknown` as storage answers come in,
+    and the parent/child relations between directories and their entries are
+    used to decide related nodes without querying them.
+
+    Subclasses should override `get_sample` to affect how the discovery is made.
+    """
+
+    def __init__(self, contents, skipped_contents, directories):
+        """Index all nodes and record the directory -> entry relations.
+
+        Args:
+            contents: objects exposing a `sha1_git` attribute
+            skipped_contents: objects exposing a `sha1_git` attribute
+            directories: objects exposing `id` and `entries` (each entry
+                exposing a `target` identifier)
+        """
+        # `directories` is traversed several times below; materialize it so a
+        # one-shot iterator does not silently yield an incomplete graph.
+        directories = list(directories)
+
+        # Maps every node identifier to its object
+        self._all = {}
+        for content in itertools.chain(contents, skipped_contents):
+            self._all[content.sha1_git] = content
+        for directory in directories:
+            self._all[directory.id] = directory
+
+        self._undecided_directories = {d.id for d in directories}
+        # Directory ids are already keys of `_all`, so this covers every node
+        self.undecided = set(self._all)
+        self.known = set()
+        self.unknown = set()
+
+        # directory id -> targets of its entries, and the reverse relation
+        self._children = {}
+        self._parents = {}
+        for directory in directories:
+            targets = {entry.target for entry in directory.entries}
+            self._children[directory.id] = targets
+            for target in targets:
+                self._parents.setdefault(target, set()).add(directory.id)
+
+    def mark_known(self, nodes):
+        """Mark `nodes` and those they imply as known in the SWH archive"""
+        self._mark_nodes(nodes, self.known, self._children)
+
+    def mark_unknown(self, nodes):
+        """Mark `nodes` and those they imply as unknown in the SWH archive"""
+        self._mark_nodes(nodes, self.unknown, self._parents)
+
+    def _mark_nodes(self, nodes, target_set, transitive_mapping):
+        """Uses Merkle graph properties to mark a node as known or unknown.
+
+        If a node is known, then all of its descendants are known. If a node is
+        unknown, then all of its ancestors are unknown.
+
+        Args:
+            nodes: identifiers to start marking from
+            target_set: either `self.known` or `self.unknown`
+            transitive_mapping: relation to follow from each marked node
+                (`self._children` for known, `self._parents` for unknown)
+        """
+        to_proceed = set(nodes)
+        while to_proceed:
+            current = to_proceed.pop()
+            target_set.add(current)
+            self.undecided.discard(current)
+            self._undecided_directories.discard(current)
+            # Only propagate to nodes that have not been decided yet
+            next_nodes = transitive_mapping.get(current, set()) & self.undecided
+            to_proceed.update(next_nodes)
+
+    def get_sample(self):
+        """Returns a three-tuple of samples from the undecided sets of contents,
+        skipped contents and directories respectively.
+        These samples will be queried against the storage which will tell us
+        which are known."""
+        raise NotImplementedError()
+
+    def do_query(self, swh_storage, sample):
+        """Given a three-tuple of samples, asks the storage which are known or
+        unknown and mark them as such."""
+        contents_sample, skipped_contents_sample, directories_sample = sample
+
+        # TODO unify those APIs, and make it so only have to manipulate small hashes
+        if contents_sample:
+            missing = swh_storage.content_missing_per_sha1_git(contents_sample)
+            self._mark_query_result(contents_sample, missing)
+
+        if skipped_contents_sample:
+            # This endpoint takes dicts and returns dicts, unlike the other two
+            as_dicts = [self._all[s].to_dict() for s in skipped_contents_sample]
+            missing = (
+                d["sha1_git"] for d in swh_storage.skipped_content_missing(as_dicts)
+            )
+            self._mark_query_result(skipped_contents_sample, missing)
+
+        if directories_sample:
+            missing = swh_storage.directory_missing(directories_sample)
+            self._mark_query_result(directories_sample, missing)
+
+    def _mark_query_result(self, sample, missing):
+        """Mark the ids of `sample` as known, except those in `missing`
+        which are marked as unknown."""
+        unknown = set(missing)
+        known = set(sample) - unknown
+
+        self.mark_known(known)
+        self.mark_unknown(unknown)
+
+
+class RandomDirSamplingDiscoveryGraph(BaseDiscoveryGraph):
+    """Uses a random sampling of only directories until only contents are undecided."""
+
+    def get_sample(self):
+        """Return the next `(contents, skipped_contents, directories)` sample.
+
+        While some directories are undecided, only directory ids are returned:
+        all of them if there are at most `SAMPLE_SIZE`, otherwise a random
+        subset of `SAMPLE_SIZE`. Once no directory is undecided, all remaining
+        undecided contents and skipped contents are returned at once.
+
+        Raises:
+            TypeError: if an undecided node is neither a content nor a
+                skipped content once all directories are decided
+        """
+        if self._undecided_directories:
+            # `random.sample` needs a sequence: passing a set is deprecated
+            # since Python 3.9 and raises TypeError from Python 3.11 on.
+            candidates = list(self._undecided_directories)
+            if len(candidates) <= SAMPLE_SIZE:
+                return ([], [], candidates)
+            return ([], [], random.sample(candidates, SAMPLE_SIZE))
+
+        contents = []
+        skipped_contents = []
+
+        for node_id in self.undecided:
+            obj_type = self._all[node_id].object_type
+            if obj_type == model.Content.object_type:
+                contents.append(node_id)
+            elif obj_type == model.SkippedContent.object_type:
+                # Skipped contents go in their own list so that `do_query`
+                # checks them against the skipped content endpoint.
+                skipped_contents.append(node_id)
+            else:
+                raise TypeError(f"Unexpected object type {obj_type}")
+
+        return (contents, skipped_contents, [])
+
+
+def filter_known_objects(
+    swh_storage,
+    contents: List[model.Content],
+    skipped_contents: List[model.SkippedContent],
+    directories: List[model.Directory],
+):
+    """Filters `contents`, `skipped_contents` and `directories` to only return
+    those that are unknown to the SWH archive using a discovery algorithm.
+
+    Args:
+        swh_storage: storage queried for missing objects
+        contents: candidate contents
+        skipped_contents: candidate skipped contents
+        directories: candidate directories
+
+    Returns:
+        A three-tuple `(contents, skipped_contents, directories)` of lists
+        restricted to the objects the discovery marked as unknown, in their
+        original order.
+    """
+    contents_count = len(contents)
+    skipped_contents_count = len(skipped_contents)
+    directories_count = len(directories)
+
+    graph = RandomDirSamplingDiscoveryGraph(contents, skipped_contents, directories)
+
+    # Each round decides at least the sampled nodes, so `undecided` shrinks
+    while graph.undecided:
+        sample = graph.get_sample()
+        graph.do_query(swh_storage, sample)
+
+    contents = [c for c in contents if c.sha1_git in graph.unknown]
+    skipped_contents = [c for c in skipped_contents if c.sha1_git in graph.unknown]
+    directories = [d for d in directories if d.id in graph.unknown]
+
+    # Pass the values as logging arguments so the message is only formatted
+    # when debug logging is actually enabled.
+    logger.debug(
+        "Filtered out %d contents, %d skipped contents and %d directories",
+        contents_count - len(contents),
+        skipped_contents_count - len(skipped_contents),
+        directories_count - len(directories),
+    )
+
+    return (contents, skipped_contents, directories)
diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -31,6 +31,7 @@
import sentry_sdk
from swh.core.tarball import uncompress
+from swh.loader.core import discovery
from swh.loader.core.loader import BaseLoader
from swh.loader.exception import NotFound
from swh.loader.package.utils import download
@@ -817,6 +818,12 @@
contents, skipped_contents, directories = from_disk.iter_directory(directory)
+ # Instead of sending everything from the bottom up to the storage,
+ # use a Merkle graph discovery algorithm to filter out known objects.
+ contents, skipped_contents, directories = discovery.filter_known_objects(
+ self.storage, contents, skipped_contents, directories
+ )
+
logger.debug("Number of skipped contents: %s", len(skipped_contents))
self.storage.skipped_content_add(skipped_contents)
logger.debug("Number of contents: %s", len(contents))

File Metadata

Mime Type
text/plain
Expires
Nov 5 2024, 1:49 PM (11 w, 13 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3234461

Event Timeline