Page MenuHomeSoftware Heritage

D8521.id30759.diff
No OneTemporary

D8521.id30759.diff

diff --git a/swh/loader/core/discovery.py b/swh/loader/core/discovery.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/core/discovery.py
@@ -0,0 +1,207 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Primitives for finding the unknown parts of disk contents efficiently."""
+
+from collections import namedtuple
+import itertools
+import logging
+import random
+from typing import Any, Dict, Iterable, List, Mapping, NamedTuple, Set, Union
+
+from swh.model.from_disk import model
+from swh.model.model import Sha1Git
+from swh.storage.interface import StorageInterface
+
+logger = logging.getLogger(__name__)
+
+# Maximum number of directory ids drawn at once when sampling from the
+# undecided set of directory entries
+SAMPLE_SIZE = 1000
+
+
+class Sample(NamedTuple):
+    """Sets of object ids to query the storage with, grouped by object type.
+
+    The fields are, in order, the ids of contents, skipped contents and
+    directories, so a ``Sample`` can still be unpacked as a three-tuple."""
+
+    # The class-based form is used because ``typing.NamedTuple`` cannot be
+    # subscripted: a ``NamedTuple[...]`` annotation at module level is
+    # evaluated at import time and raises ``TypeError``.
+    contents: Set[bytes]
+    skipped_contents: Set[bytes]
+    directories: Set[bytes]
+
+
+class BaseDiscoveryGraph:
+    """Creates the base structures and methods needed for discovery algorithms.
+
+    Every content, skipped content and directory id starts out in
+    ``undecided``. Answers from the storage move ids to ``known`` or
+    ``unknown``, along with the ids that those answers imply.
+
+    Subclasses should override ``get_sample`` to affect how the discovery is
+    made."""
+
+    def __init__(
+        self,
+        contents: Iterable[model.Content],
+        skipped_contents: Iterable[model.SkippedContent],
+        directories: Iterable[model.Directory],
+    ) -> None:
+        # sha1_git -> content or skipped content object; ``do_query`` needs
+        # the objects themselves to build the skipped contents query
+        self._all_contents: Dict[
+            Sha1Git, Union[model.Content, model.SkippedContent]
+        ] = {}
+        self._undecided_directories: Set[Sha1Git] = set()
+        # directory id -> targets of its entries
+        self._children: Dict[Sha1Git, Set[Sha1Git]] = {}
+        # entry target -> ids of the directories referencing it
+        self._parents: Dict[Sha1Git, Set[Sha1Git]] = {}
+        self.undecided: Set[Sha1Git] = set()
+        self.known: Set[Sha1Git] = set()
+        self.unknown: Set[Sha1Git] = set()
+
+        for content in itertools.chain(contents, skipped_contents):
+            self.undecided.add(content.sha1_git)
+            self._all_contents[content.sha1_git] = content
+
+        for directory in directories:
+            self.undecided.add(directory.id)
+            self._undecided_directories.add(directory.id)
+            targets = {entry.target for entry in directory.entries}
+            self._children[directory.id] = targets
+            for target in targets:
+                self._parents.setdefault(target, set()).add(directory.id)
+
+    def mark_known(self, entries: Iterable[Sha1Git]) -> None:
+        """Mark ``entries`` and those they imply as known in the SWH archive"""
+        self._mark_entries(entries, self._children, self.known)
+
+    def mark_unknown(self, entries: Iterable[Sha1Git]) -> None:
+        """Mark ``entries`` and those they imply as unknown in the SWH archive"""
+        self._mark_entries(entries, self._parents, self.unknown)
+
+    def _mark_entries(
+        self,
+        entries: Iterable[Sha1Git],
+        transitive_mapping: Mapping[Any, Any],
+        target_set: Set[Any],
+    ) -> None:
+        """Use Merkle graph properties to mark a directory entry as known or unknown.
+
+        If an entry is known, then all of its descendants are known. If it's
+        unknown, then all of its ancestors are unknown.
+
+        - ``entries``: directory entries to mark along with their ancestors/descendants
+          where applicable.
+        - ``transitive_mapping``: mapping from an entry to the next entries to mark
+          in the hierarchy, if any.
+        - ``target_set``: set where marked entries will be added.
+
+        """
+        to_process = set(entries)
+        while to_process:
+            current = to_process.pop()
+            target_set.add(current)
+            self.undecided.discard(current)
+            self._undecided_directories.discard(current)
+            # Only follow entries that are still undecided: decided ones have
+            # already been removed from ``undecided`` by an earlier marking.
+            next_entries = transitive_mapping.get(current, set()) & self.undecided
+            to_process.update(next_entries)
+
+    def get_sample(
+        self,
+    ) -> Sample:
+        """Return a three-tuple of samples from the undecided sets of contents,
+        skipped contents and directories respectively.
+        These samples will be queried against the storage which will tell us
+        which are known."""
+        raise NotImplementedError()
+
+    def do_query(self, swh_storage: StorageInterface, sample: Sample) -> None:
+        """Given a three-tuple of samples, ask the storage which are known or
+        unknown and mark them as such."""
+        contents_sample, skipped_contents_sample, directories_sample = sample
+
+        # NOTE(review): samples are sets; they are converted to lists before
+        # being sent, on the assumption that the storage API expects lists --
+        # confirm against StorageInterface.
+        # TODO unify those APIs, and make it so only have to manipulate hashes
+        if contents_sample:
+            missing = swh_storage.content_missing_per_sha1_git(list(contents_sample))
+            self._mark_from_missing(contents_sample, missing)
+
+        if skipped_contents_sample:
+            as_dicts = [
+                self._all_contents[s].to_dict() for s in skipped_contents_sample
+            ]
+            missing_dicts = swh_storage.skipped_content_missing(as_dicts)
+            self._mark_from_missing(
+                skipped_contents_sample, (d["sha1_git"] for d in missing_dicts)
+            )
+
+        if directories_sample:
+            missing = swh_storage.directory_missing(list(directories_sample))
+            self._mark_from_missing(directories_sample, missing)
+
+    def _mark_from_missing(
+        self, sample: Iterable[Sha1Git], missing: Iterable[Sha1Git]
+    ) -> None:
+        """Mark the ids in ``missing`` as unknown and the rest of ``sample``
+        as known."""
+        unknown = set(missing)
+        self.mark_known(set(sample) - unknown)
+        self.mark_unknown(unknown)
+
+
+class RandomDirSamplingDiscoveryGraph(BaseDiscoveryGraph):
+    """Use a random sampling using only directories.
+
+    This allows us to find a statistically good spread of entries in the graph
+    with a smaller population than using all types of entries. When there are
+    no more directories, only contents or skipped contents are undecided if any
+    are left: we send them directly to the storage since they should be few and
+    their structure flat."""
+
+    def get_sample(self) -> Sample:
+        """Return the next sample to query the storage with.
+
+        While directories are undecided, the sample holds directories only:
+        all of them if there are at most ``SAMPLE_SIZE``, otherwise
+        ``SAMPLE_SIZE`` of them picked at random. Once no directory is
+        undecided, the sample holds every remaining undecided content and
+        skipped content.
+
+        Raises ``TypeError`` if an undecided object is neither a content nor
+        a skipped content."""
+        if self._undecided_directories:
+            if len(self._undecided_directories) <= SAMPLE_SIZE:
+                directories = set(self._undecided_directories)
+            else:
+                # random.sample() requires a sequence: passing a set is
+                # deprecated since Python 3.9 and raises TypeError since 3.11.
+                population = tuple(self._undecided_directories)
+                directories = set(random.sample(population, SAMPLE_SIZE))
+            return Sample(
+                contents=set(), skipped_contents=set(), directories=directories
+            )
+
+        contents = set()
+        skipped_contents = set()
+
+        for sha1 in self.undecided:
+            obj = self._all_contents[sha1]
+            obj_type = obj.object_type
+            if obj_type == model.Content.object_type:
+                contents.add(sha1)
+            elif obj_type == model.SkippedContent.object_type:
+                skipped_contents.add(sha1)
+            else:
+                raise TypeError(f"Unexpected object type {obj_type}")
+
+        return Sample(
+            contents=contents, skipped_contents=skipped_contents, directories=set()
+        )
+
+
+def filter_known_objects(
+    swh_storage: StorageInterface,
+    contents: List[model.Content],
+    skipped_contents: List[model.SkippedContent],
+    directories: List[model.Directory],
+):
+    """Run a discovery algorithm against ``swh_storage`` and return the
+    subset of ``contents``, ``skipped_contents`` and ``directories`` that the
+    SWH archive does not know yet, as a three-tuple of lists in that order."""
+    graph = RandomDirSamplingDiscoveryGraph(contents, skipped_contents, directories)
+
+    # Every sampled id ends up either known or unknown after a query, so the
+    # undecided set shrinks on each round.
+    while graph.undecided:
+        graph.do_query(swh_storage, graph.get_sample())
+
+    missing = graph.unknown
+    unknown_contents = [obj for obj in contents if obj.sha1_git in missing]
+    unknown_skipped = [obj for obj in skipped_contents if obj.sha1_git in missing]
+    unknown_directories = [obj for obj in directories if obj.id in missing]
+
+    logger.debug(
+        "Filtered out %d contents, %d skipped contents and %d directories",
+        len(contents) - len(unknown_contents),
+        len(skipped_contents) - len(unknown_skipped),
+        len(directories) - len(unknown_directories),
+    )
+
+    return (unknown_contents, unknown_skipped, unknown_directories)
diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -31,6 +31,7 @@
import sentry_sdk
from swh.core.tarball import uncompress
+from swh.loader.core import discovery
from swh.loader.core.loader import BaseLoader
from swh.loader.exception import NotFound
from swh.loader.package.utils import download
@@ -817,6 +818,12 @@
contents, skipped_contents, directories = from_disk.iter_directory(directory)
+ # Instead of sending everything from the bottom up to the storage,
+ # use a Merkle graph discovery algorithm to filter out known objects.
+ contents, skipped_contents, directories = discovery.filter_known_objects(
+ self.storage, contents, skipped_contents, directories
+ )
+
logger.debug("Number of skipped contents: %s", len(skipped_contents))
self.storage.skipped_content_add(skipped_contents)
logger.debug("Number of contents: %s", len(contents))

File Metadata

Mime Type
text/plain
Expires
Thu, Dec 19, 11:55 AM (18 h, 19 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3227370

Event Timeline