diff --git a/swh/loader/core/discovery.py b/swh/loader/core/discovery.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/core/discovery.py
@@ -0,0 +1,189 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Primitives for finding the unknown parts of disk contents efficiently."""
+
+import itertools
+import logging
+import random
+from typing import List
+
+from swh.model.from_disk import model
+
+logger = logging.getLogger(__name__)
+
+# Maximum amount when sampling from the undecided set of Merkle nodes
+SAMPLE_SIZE = 1000
+
+
+class BaseDiscoveryGraph:
+    """Creates the base structures and methods needed for discovery algorithms.
+
+    Every node (content, skipped content or directory) starts out undecided and
+    is moved to either `known` or `unknown` as the storage gets queried.
+    Subclasses should override `get_sample` to affect how the discovery is made.
+    """
+
+    def __init__(self, contents, skipped_contents, directories):
+        # Maps every node identifier (`sha1_git` for contents, `id` for
+        # directories) to its model object.
+        self._all = {}
+        self._undecided_directories = {d.id for d in directories}
+        for content in itertools.chain(contents, skipped_contents):
+            self._all[content.sha1_git] = content
+        for directory in directories:
+            self._all[directory.id] = directory
+
+        # Directory ids are keys of `self._all` too, so this covers every node.
+        self.undecided = set(self._all)
+        self.known = set()
+        self.unknown = set()
+
+        # Edges of the Merkle graph in both directions, keyed by identifier.
+        self._children = {}
+        self._parents = {}
+
+        for directory in directories:
+            self._children[directory.id] = {c.target for c in directory.entries}
+            for child in directory.entries:
+                self._parents.setdefault(child.target, set()).add(directory.id)
+
+    def mark_known(self, nodes):
+        """Mark `nodes` and those they imply as known in the SWH archive"""
+        self._mark_nodes(nodes, self.known, self._children)
+
+    def mark_unknown(self, nodes):
+        """Mark `nodes` and those they imply as unknown in the SWH archive"""
+        self._mark_nodes(nodes, self.unknown, self._parents)
+
+    def _mark_nodes(self, nodes, target_set, transitive_mapping):
+        """Uses Merkle graph properties to mark a node as known or unknown.
+
+        If a node is known, then all of its descendants are known. If a node is
+        unknown, then all of its ancestors are unknown.
+
+        `target_set` is `self.known` or `self.unknown`; `transitive_mapping`
+        is respectively `self._children` or `self._parents`.
+        """
+        to_proceed = set(nodes)
+        while to_proceed:
+            current = to_proceed.pop()
+            target_set.add(current)
+            self.undecided.discard(current)
+            self._undecided_directories.discard(current)
+            # Only propagate to nodes that are still undecided; targets that
+            # are not part of this graph are never in `self.undecided`.
+            next_nodes = transitive_mapping.get(current, set()) & self.undecided
+            to_proceed.update(next_nodes)
+
+    def get_sample(self):
+        """Returns a three-tuple of samples from the undecided sets of contents,
+        skipped contents and directories respectively.
+        These samples will be queried against the storage which will tell us
+        which are known."""
+        raise NotImplementedError()
+
+    def _mark_sample(self, sample, missing):
+        """Mark the `missing` identifiers as unknown and the rest of `sample`
+        as known."""
+        unknown = set(missing)
+        known = set(sample) - unknown
+        self.mark_known(known)
+        self.mark_unknown(unknown)
+
+    def do_query(self, swh_storage, sample):
+        """Given a three-tuple of samples, asks the storage which are known or
+        unknown and mark them as such.
+
+        Whatever the storage reports as missing is marked unknown (with its
+        ancestors); the rest of the sample is marked known (with its
+        descendants).
+        """
+        contents_sample, skipped_contents_sample, directories_sample = sample
+
+        # TODO unify those APIs, and make it so we only have to manipulate small hashes
+        if contents_sample:
+            missing = swh_storage.content_missing_per_sha1_git(contents_sample)
+            self._mark_sample(contents_sample, missing)
+
+        if skipped_contents_sample:
+            # Unlike the other endpoints, this one takes and returns dicts.
+            as_dicts = [self._all[s].to_dict() for s in skipped_contents_sample]
+            missing = (
+                d["sha1_git"] for d in swh_storage.skipped_content_missing(as_dicts)
+            )
+            self._mark_sample(skipped_contents_sample, missing)
+
+        if directories_sample:
+            missing = swh_storage.directory_missing(directories_sample)
+            self._mark_sample(directories_sample, missing)
+
+
+class RandomDirSamplingDiscoveryGraph(BaseDiscoveryGraph):
+    """Uses a random sampling of only directories until only contents are undecided."""
+
+    def get_sample(self):
+        """Samples up to `SAMPLE_SIZE` undecided directories at random. Once no
+        directory is undecided, returns all undecided contents and skipped
+        contents instead, each in its own list."""
+        if self._undecided_directories:
+            if len(self._undecided_directories) <= SAMPLE_SIZE:
+                return ([], [], list(self._undecided_directories))
+            # random.sample() does not accept sets on recent Pythons
+            # (deprecated in 3.9, TypeError since 3.11).
+            sample = random.sample(list(self._undecided_directories), SAMPLE_SIZE)
+            return ([], [], sample)
+
+        contents = []
+        skipped_contents = []
+
+        for sha1 in self.undecided:
+            obj = self._all[sha1]
+            obj_type = obj.object_type
+            if obj_type == model.Content.object_type:
+                contents.append(sha1)
+            elif obj_type == model.SkippedContent.object_type:
+                skipped_contents.append(sha1)
+            else:
+                raise TypeError(f"Unexpected object type {obj_type}")
+
+        return (contents, skipped_contents, [])
+
+
+def filter_known_objects(
+    swh_storage,
+    contents: List[model.Content],
+    skipped_contents: List[model.SkippedContent],
+    directories: List[model.Directory],
+):
+    """Filters `contents`, `skipped_contents` and `directories` to only return
+    those that are unknown to the SWH archive using a discovery algorithm.
+
+    Returns a `(contents, skipped_contents, directories)` tuple of lists that
+    keep the input order."""
+    contents_count = len(contents)
+    skipped_contents_count = len(skipped_contents)
+    directories_count = len(directories)
+
+    graph = RandomDirSamplingDiscoveryGraph(contents, skipped_contents, directories)
+
+    while graph.undecided:
+        sample = graph.get_sample()
+        graph.do_query(swh_storage, sample)
+
+    contents = [c for c in contents if c.sha1_git in graph.unknown]
+    skipped_contents = [c for c in skipped_contents if c.sha1_git in graph.unknown]
+    directories = [d for d in directories if d.id in graph.unknown]
+
+    # Pass the values as logging arguments so that the message is only
+    # formatted when DEBUG logging is enabled.
+    logger.debug(
+        "Filtered out %d contents, %d skipped contents and %d directories",
+        contents_count - len(contents),
+        skipped_contents_count - len(skipped_contents),
+        directories_count - len(directories),
+    )
+
+    return (contents, skipped_contents, directories)
diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -31,6 +31,7 @@
 import sentry_sdk
 
 from swh.core.tarball import uncompress
+from swh.loader.core import discovery
 from swh.loader.core.loader import BaseLoader
 from swh.loader.exception import NotFound
 from swh.loader.package.utils import download
@@ -817,6 +818,12 @@
 
         contents, skipped_contents, directories = from_disk.iter_directory(directory)
 
+        # Instead of sending everything from the bottom up to the storage,
+        # use a Merkle graph discovery algorithm to filter out known objects.
+        contents, skipped_contents, directories = discovery.filter_known_objects(
+            self.storage, contents, skipped_contents, directories
+        )
+
         logger.debug("Number of skipped contents: %s", len(skipped_contents))
         self.storage.skipped_content_add(skipped_contents)
         logger.debug("Number of contents: %s", len(contents))