diff --git a/swh/loader/core/discovery.py b/swh/loader/core/discovery.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/core/discovery.py
@@ -0,0 +1,207 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Primitives for efficiently finding the unknown parts of on-disk contents."""
+
+from collections import namedtuple
+import itertools
+import logging
+import random
+from typing import Any, Iterable, List, Mapping, NamedTuple, Set, Union
+
+from swh.model.from_disk import model
+from swh.model.model import Sha1Git
+from swh.storage.interface import StorageInterface
+
+logger = logging.getLogger(__name__)
+
+# Maximum number of entries to sample from the undecided set of directories
+SAMPLE_SIZE = 1000
+
+# Sets of sha1_git of contents, skipped contents and directories, respectively
+Sample: NamedTuple = namedtuple(
+    "Sample", ["contents", "skipped_contents", "directories"]
+)
+
+
+class BaseDiscoveryGraph:
+    """Creates the base structures and methods needed by discovery algorithms.
+    Subclasses should override ``get_sample`` to affect how the discovery is made."""
+
+    def __init__(self, contents, skipped_contents, directories):
+        self._all_contents: Mapping[
+            Sha1Git, Union[model.Content, model.SkippedContent]
+        ] = {}
+        self._undecided_directories: Set[Sha1Git] = set()
+        self._children: Mapping[Sha1Git, Set[Sha1Git]] = {}
+        self._parents: Mapping[Sha1Git, Set[Sha1Git]] = {}
+        self.undecided: Set[Sha1Git] = set()
+
+        for content in itertools.chain(contents, skipped_contents):
+            self.undecided.add(content.sha1_git)
+            self._all_contents[content.sha1_git] = content
+
+        for directory in directories:
+            self.undecided.add(directory.id)
+            self._undecided_directories.add(directory.id)
+            self._children[directory.id] = {c.target for c in directory.entries}
+            for child in directory.entries:
+                self._parents.setdefault(child.target, set()).add(directory.id)
+
+        self.undecided |= self._undecided_directories
+        self.known: Set[Sha1Git] = set()
+        self.unknown: Set[Sha1Git] = set()
+
+    def mark_known(self, entries: Iterable[Sha1Git]):
+        """Mark ``entries`` and those they imply as known in the SWH archive"""
+        self._mark_entries(entries, self._children, self.known)
+
+    def mark_unknown(self, entries: Iterable[Sha1Git]):
+        """Mark ``entries`` and those they imply as unknown in the SWH archive"""
+        self._mark_entries(entries, self._parents, self.unknown)
+
+    def _mark_entries(
+        self,
+        entries: Iterable[Sha1Git],
+        transitive_mapping: Mapping[Any, Any],
+        target_set: Set[Any],
+    ):
+        """Use Merkle graph properties to mark directory entries as known or unknown.
+
+        If an entry is known, then all of its descendants are known. If it is
+        unknown, then all of its ancestors are unknown.
+
+        - ``entries``: directory entries to mark, along with their ancestors or
+          descendants where applicable.
+        - ``transitive_mapping``: mapping from an entry to the next entries to mark
+          in the hierarchy, if any.
+        - ``target_set``: set where marked entries will be added.
+ + """ + to_process = set(entries) + while to_process: + current = to_process.pop() + target_set.add(current) + self.undecided.discard(current) + self._undecided_directories.discard(current) + next_entries = transitive_mapping.get(current, set()) & self.undecided + to_process.update(next_entries) + + def get_sample( + self, + ) -> Sample: + """Return a three-tuple of samples from the undecided sets of contents, + skipped contents and directories respectively. + These samples will be queried against the storage which will tell us + which are known.""" + raise NotImplementedError() + + def do_query(self, swh_storage: StorageInterface, sample: Sample) -> None: + """Given a three-tuple of samples, ask the storage which are known or + unknown and mark them as such.""" + contents_sample, skipped_contents_sample, directories_sample = sample + + # TODO unify those APIs, and make it so only have to manipulate hashes + if contents_sample: + known = set(contents_sample) + unknown = set(swh_storage.content_missing_per_sha1_git(contents_sample)) + known -= unknown + + self.mark_known(known) + self.mark_unknown(unknown) + + if skipped_contents_sample: + known = set(skipped_contents_sample) + as_dicts = [ + self._all_contents[s].to_dict() for s in skipped_contents_sample + ] + unknown = { + d["sha1_git"] for d in swh_storage.skipped_content_missing(as_dicts) + } + known -= unknown + + self.mark_known(known) + self.mark_unknown(unknown) + + if directories_sample: + known = set(directories_sample) + unknown = set(swh_storage.directory_missing(directories_sample)) + known -= unknown + + self.mark_known(known) + self.mark_unknown(unknown) + + +class RandomDirSamplingDiscoveryGraph(BaseDiscoveryGraph): + """Use a random sampling using only directories. + + This allows us to find a statistically good spread of entries in the graph + with a smaller population than using all types of entries. 
+    Once no directories are left undecided, any remaining undecided entries
+    are contents or skipped contents: we send them to the storage directly,
+    since they should be few and their structure is flat."""
+
+    def get_sample(self) -> Sample:
+        if self._undecided_directories:
+            if len(self._undecided_directories) <= SAMPLE_SIZE:
+                return Sample(
+                    contents=set(),
+                    skipped_contents=set(),
+                    directories=set(self._undecided_directories),
+                )
+            sample = random.sample(list(self._undecided_directories), SAMPLE_SIZE)
+            return Sample(
+                contents=set(), skipped_contents=set(), directories=set(sample)
+            )
+
+        contents = set()
+        skipped_contents = set()
+
+        for sha1 in self.undecided:
+            obj = self._all_contents[sha1]
+            obj_type = obj.object_type
+            if obj_type == model.Content.object_type:
+                contents.add(sha1)
+            elif obj_type == model.SkippedContent.object_type:
+                skipped_contents.add(sha1)
+            else:
+                raise TypeError(f"Unexpected object type {obj_type}")
+
+        return Sample(
+            contents=contents, skipped_contents=skipped_contents, directories=set()
+        )
+
+
+def filter_known_objects(
+    swh_storage: StorageInterface,
+    contents: List[model.Content],
+    skipped_contents: List[model.SkippedContent],
+    directories: List[model.Directory],
+):
+    """Filter ``contents``, ``skipped_contents`` and ``directories`` down to
+    those unknown to the SWH archive, using a discovery algorithm."""
+    contents_count = len(contents)
+    skipped_contents_count = len(skipped_contents)
+    directories_count = len(directories)
+
+    graph = RandomDirSamplingDiscoveryGraph(contents, skipped_contents, directories)
+
+    while graph.undecided:
+        sample = graph.get_sample()
+        graph.do_query(swh_storage, sample)
+
+    contents = [c for c in contents if c.sha1_git in graph.unknown]
+    skipped_contents = [c for c in skipped_contents if c.sha1_git in graph.unknown]
+    directories = [d for d in directories if d.id in graph.unknown]
+
+    logger.debug(
+        "Filtered out %d contents, %d skipped contents and %d directories",
+        contents_count - len(contents),
+        skipped_contents_count - len(skipped_contents),
+        directories_count - len(directories),
+    )
+
+    return (contents, skipped_contents, directories)
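For intuition, here is a minimal standalone sketch of the Merkle rule that `_mark_entries` implements, on a toy graph (the ids `root`, `sub`, `blob1` and `blob2` are made up for illustration): marking an entry known decides its whole subtree, while marking an entry unknown decides all of its ancestors.

```python
# Toy Merkle propagation, mirroring _mark_entries (ids are hypothetical).
children = {"root": {"sub", "blob1"}, "sub": {"blob2"}}
parents = {"sub": {"root"}, "blob1": {"root"}, "blob2": {"sub"}}
undecided = {"root", "sub", "blob1", "blob2"}

def mark(entry, transitive_mapping, target_set):
    # Walk the mapping (children when marking "known", parents when
    # marking "unknown"), only following edges that are still undecided.
    to_process = {entry}
    while to_process:
        current = to_process.pop()
        target_set.add(current)
        undecided.discard(current)
        to_process |= transitive_mapping.get(current, set()) & undecided

known, unknown = set(), set()
mark("sub", children, known)     # "sub" in the archive => so is "blob2"
mark("blob1", parents, unknown)  # "blob1" missing => "root" cannot be known
assert known == {"sub", "blob2"}
assert unknown == {"blob1", "root"}
assert not undecided  # two answers decided all four entries
```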
actual_load_status["snapshot_id"] == hash_to_hex( + expected_snapshot_first_visit_id + ) + + assert_last_visit_matches(swh_storage, URL, status="full", type="tar") + + _expected_new_non_skipped_contents_first_visit = [ + "ae9be03bd2a06ed8f4f118d3fe76330bb1d77f62", + "809788434b433eb2e3cfabd5d591c9a659d5e3d8", + "1572607d456d7f633bc6065a2b3048496d679a31", + "27de3b3bc6545d2a797aeeb4657c0e215a0c2e55", + "fbd27c3f41f2668624ffc80b7ba5db9b92ff27ac", + "4f9709e64a9134fe8aefb36fd827b84d8b617ab5", + "84fb589b554fcb7f32b806951dcf19518d67b08f", + "3046e5d1f70297e2a507b98224b6222c9688d610", + "e08441aeab02704cfbd435d6445f7c072f8f524e", + "49d4c0ce1a16601f1e265d446b6c5ea6b512f27c", + "7d149b28eaa228b3871c91f0d5a95a2fa7cb0c68", + "f0c97052e567948adf03e641301e9983c478ccff", + "2e6db43f5cd764e677f416ff0d0c78c7a82ef19b", + "e9258d81faf5881a2f96a77ba609396f82cb97ad", + "7350628ccf194c2c3afba4ac588c33e3f3ac778d", + "0057bec9b5422aff9256af240b177ac0e3ac2608", + "6b5cc594ac466351450f7f64a0b79fdaf4435ad3", + ] + + _expected_new_skipped_contents_first_visit = [ + "1170cf105b04b7e2822a0e09d2acf71da7b9a130", + "2b8d0d0b43a1078fc708930c8ddc2956a86c566e", + "edeb33282b2bffa0e608e9d2fd960fd08093c0ea", + "d64e64d4c73679323f8d4cde2643331ba6c20af9", + "7a756602914be889c0a2d3952c710144b3e64cb0", + "8624bcdae55baeef00cd11d5dfcfa60f68710a02", + "f67935bc3a83a67259cda4b2d43373bd56703844", + "7d7c6c8c5ebaeff879f61f37083a3854184f6c41", + "b99fec102eb24bffd53ab61fc30d59e810f116a2", + "7fb724242e2b62b85ca64190c31dcae5303e19b3", + "0bb892d9391aa706dc2c3b1906567df43cbe06a2", + ] + + # Check that the union of both sets make up the original set (without skipping) + union = set(_expected_new_non_skipped_contents_first_visit) | set( + _expected_new_skipped_contents_first_visit + ) + assert union == set(_expected_new_contents_first_visit) + + stats = get_stats(swh_storage) + assert { + "content": len(_expected_new_non_skipped_contents_first_visit), + "directory": len(_expected_new_directories_first_visit), + "origin": 1, + "origin_visit": 1, + "release": len(_expected_new_releases_first_visit), + "revision": 0, + "skipped_content": len(_expected_new_skipped_contents_first_visit), + "snapshot": 1, + } == stats + + release_id = hash_to_bytes(list(_expected_new_releases_first_visit)[0]) + expected_snapshot = Snapshot( + id=expected_snapshot_first_visit_id, + branches={ + b"HEAD": SnapshotBranch( + target_type=TargetType.ALIAS, + target=b"releases/0.1.0", + ), + b"releases/0.1.0": SnapshotBranch( + target_type=TargetType.RELEASE, + target=release_id, + ), + }, + ) + check_snapshot(expected_snapshot, swh_storage) + + assert swh_storage.release_get([release_id])[0] == Release( + id=release_id, + name=b"0.1.0", + message=( + b"Synthetic release for archive at " + b"https://ftp.gnu.org/gnu/8sync/8sync-0.1.0.tar.gz\n" + ), + target=hash_to_bytes("3aebc29ed1fccc4a6f2f2010fb8e57882406b528"), + target_type=ObjectType.DIRECTORY, + synthetic=True, + author=Person.from_fullname(b""), + date=TimestampWithTimezone.from_datetime( + datetime.datetime(1999, 12, 9, 8, 53, 30, tzinfo=datetime.timezone.utc) + ), + ) + + expected_contents = map( + hash_to_bytes, _expected_new_non_skipped_contents_first_visit + ) + assert list(swh_storage.content_missing_per_sha1(expected_contents)) == [] + + expected_dirs = map(hash_to_bytes, _expected_new_directories_first_visit) + assert list(swh_storage.directory_missing(expected_dirs)) == [] + + expected_rels = map(hash_to_bytes, _expected_new_releases_first_visit) + assert 
diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -31,6 +31,7 @@
 import sentry_sdk
 
 from swh.core.tarball import uncompress
+from swh.loader.core import discovery
 from swh.loader.core.loader import BaseLoader
 from swh.loader.exception import NotFound
 from swh.loader.package.utils import download
@@ -817,6 +818,12 @@
 
         contents, skipped_contents, directories = from_disk.iter_directory(directory)
 
+        # Instead of sending everything from the bottom up to the storage,
+        # use a Merkle graph discovery algorithm to filter out known objects.
+        contents, skipped_contents, directories = discovery.filter_known_objects(
+            self.storage, contents, skipped_contents, directories
+        )
+
         logger.debug("Number of skipped contents: %s", len(skipped_contents))
         self.storage.skipped_content_add(skipped_contents)
         logger.debug("Number of contents: %s", len(contents))
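Since `BaseDiscoveryGraph` documents that subclasses override `get_sample`, alternative strategies can slot in without touching the loader hunk above. As a hypothetical sketch (not part of this patch), an exhaustive variant that queries every undecided object in a single round trip, reusing the graph's internal sets:

```python
from swh.loader.core.discovery import BaseDiscoveryGraph, Sample
from swh.model.from_disk import model


class ExhaustiveDiscoveryGraph(BaseDiscoveryGraph):
    """Hypothetical strategy: send everything undecided in one query."""

    def get_sample(self) -> Sample:
        directories = set(self._undecided_directories)
        contents = set()
        skipped_contents = set()
        # Everything undecided that is not a directory is a (skipped) content.
        for sha1 in self.undecided - directories:
            obj = self._all_contents[sha1]
            if obj.object_type == model.Content.object_type:
                contents.add(sha1)
            else:
                skipped_contents.add(sha1)
        return Sample(
            contents=contents,
            skipped_contents=skipped_contents,
            directories=directories,
        )
```

Whether one large query beats many sampled ones depends on the storage backend's batching limits; the random directory sampling used by the patch is the conservative default.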