Page MenuHomeSoftware Heritage

D8521.id30787.diff
No OneTemporary

D8521.id30787.diff

diff --git a/swh/loader/core/discovery.py b/swh/loader/core/discovery.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/core/discovery.py
@@ -0,0 +1,207 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Primitives for finding the unknown parts of disk contents efficiently."""
+
+from collections import namedtuple
+import itertools
+import logging
+import random
+from typing import Any, Iterable, List, Mapping, NamedTuple, Set, Union
+
+from swh.model.from_disk import model
+from swh.model.model import Sha1Git
+from swh.storage.interface import StorageInterface
+
+logger = logging.getLogger(__name__)
+
+# Maximum amount when sampling from the undecided set of directory entries
+SAMPLE_SIZE = 1000
+
+# Sets of sha1 of contents, skipped contents and directories respectively
+Sample: NamedTuple = namedtuple(
+ "Sample", ["contents", "skipped_contents", "directories"]
+)
+
+
+class BaseDiscoveryGraph:
+ """Creates the base structures and methods needed for discovery algorithms.
+ Subclasses should override ``get_sample`` to affect how the discovery is made."""
+
+ def __init__(self, contents, skipped_contents, directories):
+ self._all_contents: Mapping[
+ Sha1Git, Union[model.Content, model.SkippedContent]
+ ] = {}
+ self._undecided_directories: Set[Sha1Git] = set()
+ self._children: Mapping[Sha1Git, model.DirectoryEntry] = {}
+ self._parents: Mapping[model.DirectoryEntry, Sha1Git] = {}
+ self.undecided: Set[Sha1Git] = set()
+
+ for content in itertools.chain(contents, skipped_contents):
+ self.undecided.add(content.sha1_git)
+ self._all_contents[content.sha1_git] = content
+
+ for directory in directories:
+ self.undecided.add(directory.id)
+ self._undecided_directories.add(directory.id)
+ self._children[directory.id] = {c.target for c in directory.entries}
+ for child in directory.entries:
+ self._parents.setdefault(child.target, set()).add(directory.id)
+
+ self.undecided |= self._undecided_directories
+ self.known: Set[Sha1Git] = set()
+ self.unknown: Set[Sha1Git] = set()
+
+ def mark_known(self, entries: Iterable[Sha1Git]):
+ """Mark ``entries`` and those they imply as known in the SWH archive"""
+ self._mark_entries(entries, self._children, self.known)
+
+ def mark_unknown(self, entries: Iterable[Sha1Git]):
+ """Mark ``entries`` and those they imply as unknown in the SWH archive"""
+ self._mark_entries(entries, self._parents, self.unknown)
+
+ def _mark_entries(
+ self,
+ entries: Iterable[Sha1Git],
+ transitive_mapping: Mapping[Any, Any],
+ target_set: Set[Any],
+ ):
+ """Use Merkle graph properties to mark a directory entry as known or unknown.
+
+ If an entry is known, then all of its descendants are known. If it's
+ unknown, then all of its ancestors are unknown.
+
+ - ``entries``: directory entries to mark along with their ancestors/descendants
+ where applicable.
+ - ``transitive_mapping``: mapping from an entry to the next entries to mark
+ in the hierarchy, if any.
+ - ``target_set``: set where marked entries will be added.
+
+ """
+ to_process = set(entries)
+ while to_process:
+ current = to_process.pop()
+ target_set.add(current)
+ self.undecided.discard(current)
+ self._undecided_directories.discard(current)
+ next_entries = transitive_mapping.get(current, set()) & self.undecided
+ to_process.update(next_entries)
+
+ def get_sample(
+ self,
+ ) -> Sample:
+ """Return a three-tuple of samples from the undecided sets of contents,
+ skipped contents and directories respectively.
+ These samples will be queried against the storage which will tell us
+ which are known."""
+ raise NotImplementedError()
+
+ def do_query(self, swh_storage: StorageInterface, sample: Sample) -> None:
+ """Given a three-tuple of samples, ask the storage which are known or
+ unknown and mark them as such."""
+ contents_sample, skipped_contents_sample, directories_sample = sample
+
+ # TODO unify those APIs, and make it so only have to manipulate hashes
+ if contents_sample:
+ known = set(contents_sample)
+ unknown = set(swh_storage.content_missing_per_sha1_git(contents_sample))
+ known -= unknown
+
+ self.mark_known(known)
+ self.mark_unknown(unknown)
+
+ if skipped_contents_sample:
+ known = set(skipped_contents_sample)
+ as_dicts = [
+ self._all_contents[s].to_dict() for s in skipped_contents_sample
+ ]
+ unknown = {
+ d["sha1_git"] for d in swh_storage.skipped_content_missing(as_dicts)
+ }
+ known -= unknown
+
+ self.mark_known(known)
+ self.mark_unknown(unknown)
+
+ if directories_sample:
+ known = set(directories_sample)
+ unknown = set(swh_storage.directory_missing(directories_sample))
+ known -= unknown
+
+ self.mark_known(known)
+ self.mark_unknown(unknown)
+
+
+class RandomDirSamplingDiscoveryGraph(BaseDiscoveryGraph):
+ """Use a random sampling using only directories.
+
+ This allows us to find a statistically good spread of entries in the graph
+ with a smaller population than using all types of entries. When there are
+ no more directories, only contents or skipped contents are undecided if any
+ are left: we send them directly to the storage since they should be few and
+ their structure flat."""
+
+ def get_sample(self) -> Sample:
+ if self._undecided_directories:
+ if len(self._undecided_directories) <= SAMPLE_SIZE:
+ return Sample(
+ contents=set(),
+ skipped_contents=set(),
+ directories=set(self._undecided_directories),
+ )
+ sample = random.sample(self._undecided_directories, SAMPLE_SIZE)
+ directories = {o for o in sample}
+ return Sample(
+ contents=set(), skipped_contents=set(), directories=directories
+ )
+
+ contents = set()
+ skipped_contents = set()
+
+ for sha1 in self.undecided:
+ obj = self._all_contents[sha1]
+ obj_type = obj.object_type
+ if obj_type == model.Content.object_type:
+ contents.add(sha1)
+ elif obj_type == model.SkippedContent.object_type:
+ skipped_contents.add(sha1)
+ else:
+ raise TypeError(f"Unexpected object type {obj_type}")
+
+ return Sample(
+ contents=contents, skipped_contents=skipped_contents, directories=set()
+ )
+
+
+def filter_known_objects(
+ swh_storage: StorageInterface,
+ contents: List[model.Content],
+ skipped_contents: List[model.SkippedContent],
+ directories: List[model.Directory],
+):
+ """Filter ``contents``, ``skipped_contents`` and ``directories`` to only return
+ those that are unknown to the SWH archive using a discovery algorithm."""
+ contents_count = len(contents)
+ skipped_contents_count = len(skipped_contents)
+ directories_count = len(directories)
+
+ graph = RandomDirSamplingDiscoveryGraph(contents, skipped_contents, directories)
+
+ while graph.undecided:
+ sample = graph.get_sample()
+ graph.do_query(swh_storage, sample)
+
+ contents = [c for c in contents if c.sha1_git in graph.unknown]
+ skipped_contents = [c for c in skipped_contents if c.sha1_git in graph.unknown]
+ directories = [c for c in directories if c.id in graph.unknown]
+
+ logger.debug(
+ "Filtered out %d contents, %d skipped contents and %d directories",
+ contents_count - len(contents),
+ skipped_contents_count - len(skipped_contents),
+ directories_count - len(directories),
+ )
+
+ return (contents, skipped_contents, directories)
diff --git a/swh/loader/package/archive/tests/test_archive.py b/swh/loader/package/archive/tests/test_archive.py
--- a/swh/loader/package/archive/tests/test_archive.py
+++ b/swh/loader/package/archive/tests/test_archive.py
@@ -93,6 +93,13 @@
}
+@pytest.fixture(autouse=True, scope="function")
+def lower_sample_rate(mocker):
+ """Lower the number of entries per discovery sample so the minimum threshold
+ for discovery is hit in tests without creating huge test data"""
+ mocker.patch("swh.loader.package.loader.discovery.SAMPLE_SIZE", 1)
+
+
def test_archive_visit_with_no_artifact_found(swh_storage, requests_mock_datadir):
url = URL
unknown_artifact_url = "https://ftp.g.o/unknown/8sync-0.1.0.tar.gz"
@@ -129,6 +136,122 @@
assert_last_visit_matches(swh_storage, url, status="partial", type="tar")
+def test_archive_visit_with_skipped_content(swh_storage, requests_mock_datadir):
+ """With no prior visit, load a gnu project and set the max content size
+ to something low to check that the loader skips "big" content."""
+ loader = ArchiveLoader(
+ swh_storage, URL, artifacts=GNU_ARTIFACTS[:1], max_content_size=10 * 1024
+ )
+
+ actual_load_status = loader.load()
+ assert actual_load_status["status"] == "eventful"
+
+ expected_snapshot_first_visit_id = hash_to_bytes(
+ "9efecc835e8f99254934f256b5301b94f348fd17"
+ )
+
+ assert actual_load_status["snapshot_id"] == hash_to_hex(
+ expected_snapshot_first_visit_id
+ )
+
+ assert_last_visit_matches(swh_storage, URL, status="full", type="tar")
+
+ _expected_new_non_skipped_contents_first_visit = [
+ "ae9be03bd2a06ed8f4f118d3fe76330bb1d77f62",
+ "809788434b433eb2e3cfabd5d591c9a659d5e3d8",
+ "1572607d456d7f633bc6065a2b3048496d679a31",
+ "27de3b3bc6545d2a797aeeb4657c0e215a0c2e55",
+ "fbd27c3f41f2668624ffc80b7ba5db9b92ff27ac",
+ "4f9709e64a9134fe8aefb36fd827b84d8b617ab5",
+ "84fb589b554fcb7f32b806951dcf19518d67b08f",
+ "3046e5d1f70297e2a507b98224b6222c9688d610",
+ "e08441aeab02704cfbd435d6445f7c072f8f524e",
+ "49d4c0ce1a16601f1e265d446b6c5ea6b512f27c",
+ "7d149b28eaa228b3871c91f0d5a95a2fa7cb0c68",
+ "f0c97052e567948adf03e641301e9983c478ccff",
+ "2e6db43f5cd764e677f416ff0d0c78c7a82ef19b",
+ "e9258d81faf5881a2f96a77ba609396f82cb97ad",
+ "7350628ccf194c2c3afba4ac588c33e3f3ac778d",
+ "0057bec9b5422aff9256af240b177ac0e3ac2608",
+ "6b5cc594ac466351450f7f64a0b79fdaf4435ad3",
+ ]
+
+ _expected_new_skipped_contents_first_visit = [
+ "1170cf105b04b7e2822a0e09d2acf71da7b9a130",
+ "2b8d0d0b43a1078fc708930c8ddc2956a86c566e",
+ "edeb33282b2bffa0e608e9d2fd960fd08093c0ea",
+ "d64e64d4c73679323f8d4cde2643331ba6c20af9",
+ "7a756602914be889c0a2d3952c710144b3e64cb0",
+ "8624bcdae55baeef00cd11d5dfcfa60f68710a02",
+ "f67935bc3a83a67259cda4b2d43373bd56703844",
+ "7d7c6c8c5ebaeff879f61f37083a3854184f6c41",
+ "b99fec102eb24bffd53ab61fc30d59e810f116a2",
+ "7fb724242e2b62b85ca64190c31dcae5303e19b3",
+ "0bb892d9391aa706dc2c3b1906567df43cbe06a2",
+ ]
+
+ # Check that the union of both sets make up the original set (without skipping)
+ union = set(_expected_new_non_skipped_contents_first_visit) | set(
+ _expected_new_skipped_contents_first_visit
+ )
+ assert union == set(_expected_new_contents_first_visit)
+
+ stats = get_stats(swh_storage)
+ assert {
+ "content": len(_expected_new_non_skipped_contents_first_visit),
+ "directory": len(_expected_new_directories_first_visit),
+ "origin": 1,
+ "origin_visit": 1,
+ "release": len(_expected_new_releases_first_visit),
+ "revision": 0,
+ "skipped_content": len(_expected_new_skipped_contents_first_visit),
+ "snapshot": 1,
+ } == stats
+
+ release_id = hash_to_bytes(list(_expected_new_releases_first_visit)[0])
+ expected_snapshot = Snapshot(
+ id=expected_snapshot_first_visit_id,
+ branches={
+ b"HEAD": SnapshotBranch(
+ target_type=TargetType.ALIAS,
+ target=b"releases/0.1.0",
+ ),
+ b"releases/0.1.0": SnapshotBranch(
+ target_type=TargetType.RELEASE,
+ target=release_id,
+ ),
+ },
+ )
+ check_snapshot(expected_snapshot, swh_storage)
+
+ assert swh_storage.release_get([release_id])[0] == Release(
+ id=release_id,
+ name=b"0.1.0",
+ message=(
+ b"Synthetic release for archive at "
+ b"https://ftp.gnu.org/gnu/8sync/8sync-0.1.0.tar.gz\n"
+ ),
+ target=hash_to_bytes("3aebc29ed1fccc4a6f2f2010fb8e57882406b528"),
+ target_type=ObjectType.DIRECTORY,
+ synthetic=True,
+ author=Person.from_fullname(b""),
+ date=TimestampWithTimezone.from_datetime(
+ datetime.datetime(1999, 12, 9, 8, 53, 30, tzinfo=datetime.timezone.utc)
+ ),
+ )
+
+ expected_contents = map(
+ hash_to_bytes, _expected_new_non_skipped_contents_first_visit
+ )
+ assert list(swh_storage.content_missing_per_sha1(expected_contents)) == []
+
+ expected_dirs = map(hash_to_bytes, _expected_new_directories_first_visit)
+ assert list(swh_storage.directory_missing(expected_dirs)) == []
+
+ expected_rels = map(hash_to_bytes, _expected_new_releases_first_visit)
+ assert list(swh_storage.release_missing(expected_rels)) == []
+
+
def test_archive_visit_with_release_artifact_no_prior_visit(
swh_storage, requests_mock_datadir
):
diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -31,6 +31,7 @@
import sentry_sdk
from swh.core.tarball import uncompress
+from swh.loader.core import discovery
from swh.loader.core.loader import BaseLoader
from swh.loader.exception import NotFound
from swh.loader.package.utils import download
@@ -817,6 +818,12 @@
contents, skipped_contents, directories = from_disk.iter_directory(directory)
+ # Instead of sending everything from the bottom up to the storage,
+ # use a Merkle graph discovery algorithm to filter out known objects.
+ contents, skipped_contents, directories = discovery.filter_known_objects(
+ self.storage, contents, skipped_contents, directories
+ )
+
logger.debug("Number of skipped contents: %s", len(skipped_contents))
self.storage.skipped_content_add(skipped_contents)
logger.debug("Number of contents: %s", len(contents))

File Metadata

Mime Type
text/plain
Expires
Thu, Dec 19, 12:52 PM (18 h, 45 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3223925

Event Timeline