Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7066513
D8521.id30687.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
7 KB
Subscribers
None
D8521.id30687.diff
View Options
diff --git a/swh/loader/core/discovery.py b/swh/loader/core/discovery.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/core/discovery.py
@@ -0,0 +1,167 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Primitives for finding the unknown parts of disk contents efficiently."""
+
+import itertools
+import logging
+import random
+from typing import List
+
+from swh.model.from_disk import model
+
+logger = logging.getLogger(__name__)
+
+# Upper bound on the number of undecided directory identifiers that
+# `RandomDirSamplingDiscoveryGraph.get_sample` draws at random for one sample
+SAMPLE_SIZE = 1000
+
+
+class BaseDiscoveryGraph:
+    """Shared state and bookkeeping for Merkle discovery algorithms.
+
+    Objects are tracked by identifier (``sha1_git`` for contents and skipped
+    contents, ``id`` for directories). Every identifier starts in
+    ``undecided`` and is moved to ``known`` or ``unknown`` as answers come in.
+
+    Subclasses pick the sampling strategy by overriding `get_sample`.
+    """
+
+    def __init__(self, contents, skipped_contents, directories):
+        # Identifiers of the directories that have not been decided yet
+        self._undecided_directories = {d.id for d in directories}
+
+        # Identifier -> object, for contents, skipped contents and directories
+        self._all = {}
+        for blob in itertools.chain(contents, skipped_contents):
+            self._all[blob.sha1_git] = blob
+        for directory in directories:
+            self._all[directory.id] = directory
+
+        self.known = set()
+        self.unknown = set()
+        self.undecided = set(self._all) | self._undecided_directories
+
+        # Directory id -> targets of its entries, and the reverse mapping
+        # target -> ids of the directories that reference it
+        self._children = {}
+        self._parents = {}
+        for directory in directories:
+            dir_id = directory.id
+            self._children[dir_id] = {entry.target for entry in directory.entries}
+            for entry in directory.entries:
+                self._parents.setdefault(entry.target, set()).add(dir_id)
+
+    def mark_known(self, nodes):
+        """Record `nodes`, and every node this implies, as known in the SWH archive"""
+        self._mark_nodes(nodes, self.known, self._children)
+
+    def mark_unknown(self, nodes):
+        """Record `nodes`, and every node this implies, as unknown in the SWH archive"""
+        self._mark_nodes(nodes, self.unknown, self._parents)
+
+    def _mark_nodes(self, nodes, target_set, transitive_mapping):
+        """Move `nodes` into `target_set` and propagate through the Merkle graph.
+
+        A known node implies that all of its descendants are known; an unknown
+        node implies that all of its ancestors are unknown. `transitive_mapping`
+        is the adjacency mapping to follow for that propagation.
+        """
+        pending = set(nodes)
+        while pending:
+            node = pending.pop()
+            target_set.add(node)
+            self.undecided.discard(node)
+            self._undecided_directories.discard(node)
+            neighbours = transitive_mapping.get(node)
+            if neighbours:
+                # Only follow nodes that still need a decision
+                pending |= neighbours & self.undecided
+
+    def get_sample(self):
+        """Return a three-tuple of samples taken from the undecided contents,
+        skipped contents and directories, in that order.
+
+        The samples are then queried against the storage, which tells us which
+        of them are known."""
+        raise NotImplementedError()
+
+    def do_query(self, swh_storage, sample):
+        """Ask the storage which objects of a three-tuple `sample` are missing,
+        then mark every sampled object as known or unknown accordingly."""
+        content_ids, skipped_ids, directory_ids = sample
+
+        # TODO unify those APIs, so that we only have to manipulate small hashes
+        if content_ids:
+            missing = set(swh_storage.content_missing_per_sha1_git(content_ids))
+            self._record_answer(content_ids, missing)
+
+        if skipped_ids:
+            # This endpoint takes and returns dicts rather than bare hashes
+            payload = [self._all[sha1].to_dict() for sha1 in skipped_ids]
+            missing = {
+                row["sha1_git"] for row in swh_storage.skipped_content_missing(payload)
+            }
+            self._record_answer(skipped_ids, missing)
+
+        if directory_ids:
+            missing = set(swh_storage.directory_missing(directory_ids))
+            self._record_answer(directory_ids, missing)
+
+    def _record_answer(self, queried, missing):
+        """Mark the ids of `queried` that are not in `missing` as known, and
+        the ids of `missing` as unknown."""
+        self.mark_known(set(queried) - missing)
+        self.mark_unknown(missing)
+
+
+class RandomDirSamplingDiscoveryGraph(BaseDiscoveryGraph):
+    """Uses a random sampling of only directories until only contents are undecided."""
+
+    def get_sample(self):
+        """Return the next ``(contents, skipped_contents, directories)`` sample.
+
+        While some directories are undecided, only directory ids are returned:
+        all of them if there are at most `SAMPLE_SIZE`, otherwise a random
+        selection of `SAMPLE_SIZE` of them. Once no directory is undecided,
+        every remaining undecided id is returned, split between contents and
+        skipped contents.
+
+        Raises:
+            TypeError: if an undecided object is neither a content nor a
+                skipped content once all directories are decided.
+        """
+        if self._undecided_directories:
+            if len(self._undecided_directories) <= SAMPLE_SIZE:
+                return ([], [], list(self._undecided_directories))
+            # random.sample() needs a sequence: sampling directly from a set is
+            # deprecated since Python 3.9 and raises TypeError from Python 3.11
+            sample = random.sample(list(self._undecided_directories), SAMPLE_SIZE)
+            return ([], [], sample)
+
+        contents = []
+        skipped_contents = []
+
+        for sha1 in self.undecided:
+            obj_type = self._all[sha1].object_type
+            if obj_type == model.Content.object_type:
+                contents.append(sha1)
+            elif obj_type == model.SkippedContent.object_type:
+                # Skipped contents go in their own list, because `do_query`
+                # checks them through a different storage endpoint
+                skipped_contents.append(sha1)
+            else:
+                raise TypeError(f"Unexpected object type {obj_type}")
+
+        return (contents, skipped_contents, [])
+
+
+def filter_known_objects(
+    swh_storage,
+    contents: List[model.Content],
+    skipped_contents: List[model.SkippedContent],
+    directories: List[model.Directory],
+):
+    """Filters `contents`, `skipped_contents` and `directories` to only return
+    those that are unknown to the SWH archive using a discovery algorithm.
+
+    A `RandomDirSamplingDiscoveryGraph` is built from the given objects, then
+    `swh_storage` is queried with successive samples until no object is left
+    undecided.
+
+    Args:
+        swh_storage: storage that the discovery graph queries for missing objects
+        contents: candidate contents
+        skipped_contents: candidate skipped contents
+        directories: candidate directories
+
+    Returns:
+        A ``(contents, skipped_contents, directories)`` tuple of lists keeping,
+        in their input order, only the objects that ended up marked as unknown.
+    """
+    contents_count = len(contents)
+    skipped_contents_count = len(skipped_contents)
+    directories_count = len(directories)
+
+    graph = RandomDirSamplingDiscoveryGraph(contents, skipped_contents, directories)
+
+    # Each round decides at least the sampled objects, so `undecided` shrinks
+    while graph.undecided:
+        sample = graph.get_sample()
+        graph.do_query(swh_storage, sample)
+
+    contents = [c for c in contents if c.sha1_git in graph.unknown]
+    skipped_contents = [c for c in skipped_contents if c.sha1_git in graph.unknown]
+    directories = [d for d in directories if d.id in graph.unknown]
+
+    # Pass the counts as logging arguments, so that the message is only
+    # formatted when debug logging is actually enabled
+    logger.debug(
+        "Filtered out %d contents, %d skipped contents and %d directories",
+        contents_count - len(contents),
+        skipped_contents_count - len(skipped_contents),
+        directories_count - len(directories),
+    )
+
+    return (contents, skipped_contents, directories)
diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -31,6 +31,7 @@
import sentry_sdk
from swh.core.tarball import uncompress
+from swh.loader.core import discovery
from swh.loader.core.loader import BaseLoader
from swh.loader.exception import NotFound
from swh.loader.package.utils import download
@@ -817,6 +818,12 @@
contents, skipped_contents, directories = from_disk.iter_directory(directory)
+ # Instead of sending everything from the bottom up to the storage,
+ # use a Merkle graph discovery algorithm to filter out known objects.
+ contents, skipped_contents, directories = discovery.filter_known_objects(
+ self.storage, contents, skipped_contents, directories
+ )
+
logger.debug("Number of skipped contents: %s", len(skipped_contents))
self.storage.skipped_content_add(skipped_contents)
logger.debug("Number of contents: %s", len(contents))
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Nov 5 2024, 1:49 PM (11 w, 13 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3234461
Attached To
D8521: Use a Merkle discovery algorithm with archives
Event Timeline
Log In to Comment