Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7123606
D8521.id30787.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
14 KB
Subscribers
None
D8521.id30787.diff
View Options
diff --git a/swh/loader/core/discovery.py b/swh/loader/core/discovery.py
new file mode 100644
--- /dev/null
+++ b/swh/loader/core/discovery.py
@@ -0,0 +1,207 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Primitives for finding the unknown parts of disk contents efficiently."""
+
+from collections import namedtuple
+import itertools
+import logging
+import random
+from typing import Any, Iterable, List, Mapping, NamedTuple, Set, Union
+
+from swh.model.from_disk import model
+from swh.model.model import Sha1Git
+from swh.storage.interface import StorageInterface
+
+logger = logging.getLogger(__name__)
+
+# Maximum number of directory identifiers returned in a single sample drawn
+# from the undecided set of directories
+SAMPLE_SIZE = 1000
+
+# Sets of identifiers of contents, skipped contents and directories
+# respectively (``sha1_git`` for both kinds of contents, ``id`` for directories)
+Sample: NamedTuple = namedtuple(
+ "Sample", ["contents", "skipped_contents", "directories"]
+)
+
+
+class BaseDiscoveryGraph:
+    """Creates the base structures and methods needed for discovery algorithms.
+
+    The graph tracks, by identifier, which objects are still ``undecided`` and
+    which ones have been classified as ``known`` or ``unknown``.
+    Subclasses should override ``get_sample`` to affect how the discovery is made."""
+
+    def __init__(self, contents, skipped_contents, directories):
+        # sha1_git -> object, for contents and skipped contents alike
+        self._all_contents: Mapping[
+            Sha1Git, Union[model.Content, model.SkippedContent]
+        ] = {}
+        self._undecided_directories: Set[Sha1Git] = set()
+        # directory id -> targets of its entries
+        self._children: Mapping[Sha1Git, Set[Sha1Git]] = {}
+        # entry target -> ids of the directories referencing it
+        self._parents: Mapping[Sha1Git, Set[Sha1Git]] = {}
+        self.undecided: Set[Sha1Git] = set()
+
+        for content in itertools.chain(contents, skipped_contents):
+            self.undecided.add(content.sha1_git)
+            self._all_contents[content.sha1_git] = content
+
+        for directory in directories:
+            # Directories live both in the global undecided set and in a
+            # dedicated one, so that samplers can draw from directories only.
+            self.undecided.add(directory.id)
+            self._undecided_directories.add(directory.id)
+            self._children[directory.id] = {c.target for c in directory.entries}
+            for child in directory.entries:
+                self._parents.setdefault(child.target, set()).add(directory.id)
+
+        self.known: Set[Sha1Git] = set()
+        self.unknown: Set[Sha1Git] = set()
+
+    def mark_known(self, entries: Iterable[Sha1Git]):
+        """Mark ``entries`` and those they imply as known in the SWH archive"""
+        self._mark_entries(entries, self._children, self.known)
+
+    def mark_unknown(self, entries: Iterable[Sha1Git]):
+        """Mark ``entries`` and those they imply as unknown in the SWH archive"""
+        self._mark_entries(entries, self._parents, self.unknown)
+
+    def _mark_entries(
+        self,
+        entries: Iterable[Sha1Git],
+        transitive_mapping: Mapping[Any, Any],
+        target_set: Set[Any],
+    ):
+        """Use Merkle graph properties to mark a directory entry as known or unknown.
+
+        If an entry is known, then all of its descendants are known. If it's
+        unknown, then all of its ancestors are unknown.
+
+        - ``entries``: directory entries to mark along with their ancestors/descendants
+          where applicable.
+        - ``transitive_mapping``: mapping from an entry to the next entries to mark
+          in the hierarchy, if any.
+        - ``target_set``: set where marked entries will be added.
+
+        """
+        to_process = set(entries)
+        while to_process:
+            current = to_process.pop()
+            target_set.add(current)
+            self.undecided.discard(current)
+            self._undecided_directories.discard(current)
+            # Only propagate to entries that have not been decided yet, which
+            # also guarantees that the traversal terminates.
+            next_entries = transitive_mapping.get(current, set()) & self.undecided
+            to_process.update(next_entries)
+
+    def get_sample(
+        self,
+    ) -> Sample:
+        """Return a three-tuple of samples from the undecided sets of contents,
+        skipped contents and directories respectively.
+        These samples will be queried against the storage which will tell us
+        which are known."""
+        raise NotImplementedError()
+
+    def do_query(self, swh_storage: StorageInterface, sample: Sample) -> None:
+        """Given a three-tuple of samples, ask the storage which are known or
+        unknown and mark them as such."""
+        contents_sample, skipped_contents_sample, directories_sample = sample
+
+        # TODO unify those APIs, and make it so only have to manipulate hashes
+        if contents_sample:
+            known = set(contents_sample)
+            unknown = set(swh_storage.content_missing_per_sha1_git(contents_sample))
+            known -= unknown
+
+            self.mark_known(known)
+            self.mark_unknown(unknown)
+
+        if skipped_contents_sample:
+            known = set(skipped_contents_sample)
+            # This storage endpoint is given dicts rather than bare hashes
+            as_dicts = [
+                self._all_contents[s].to_dict() for s in skipped_contents_sample
+            ]
+            unknown = {
+                d["sha1_git"] for d in swh_storage.skipped_content_missing(as_dicts)
+            }
+            known -= unknown
+
+            self.mark_known(known)
+            self.mark_unknown(unknown)
+
+        if directories_sample:
+            known = set(directories_sample)
+            unknown = set(swh_storage.directory_missing(directories_sample))
+            known -= unknown
+
+            self.mark_known(known)
+            self.mark_unknown(unknown)
+
+
+class RandomDirSamplingDiscoveryGraph(BaseDiscoveryGraph):
+    """Use a random sampling using only directories.
+
+    This allows us to find a statistically good spread of entries in the graph
+    with a smaller population than using all types of entries. When there are
+    no more directories, only contents or skipped contents are undecided if any
+    are left: we send them directly to the storage since they should be few and
+    their structure flat."""
+
+    def get_sample(self) -> Sample:
+        """Return at most ``SAMPLE_SIZE`` undecided directories, or, once no
+        directory is left undecided, all remaining undecided contents and
+        skipped contents.
+
+        Raises ``TypeError`` if an undecided object is neither a content nor a
+        skipped content."""
+        if self._undecided_directories:
+            if len(self._undecided_directories) <= SAMPLE_SIZE:
+                return Sample(
+                    contents=set(),
+                    skipped_contents=set(),
+                    directories=set(self._undecided_directories),
+                )
+            # random.sample() requires a sequence: sampling directly from a set
+            # was deprecated in Python 3.9 and raises TypeError since 3.11.
+            sample = random.sample(tuple(self._undecided_directories), SAMPLE_SIZE)
+            return Sample(
+                contents=set(), skipped_contents=set(), directories=set(sample)
+            )
+
+        contents = set()
+        skipped_contents = set()
+
+        # No directory left: everything still undecided is a (skipped) content
+        for sha1 in self.undecided:
+            obj = self._all_contents[sha1]
+            obj_type = obj.object_type
+            if obj_type == model.Content.object_type:
+                contents.add(sha1)
+            elif obj_type == model.SkippedContent.object_type:
+                skipped_contents.add(sha1)
+            else:
+                raise TypeError(f"Unexpected object type {obj_type}")
+
+        return Sample(
+            contents=contents, skipped_contents=skipped_contents, directories=set()
+        )
+
+
+def filter_known_objects(
+    swh_storage: StorageInterface,
+    contents: List[model.Content],
+    skipped_contents: List[model.SkippedContent],
+    directories: List[model.Directory],
+):
+    """Run the discovery algorithm against ``swh_storage`` and keep only the
+    objects that the SWH archive does not know yet.
+
+    Returns a ``(contents, skipped_contents, directories)`` tuple of lists
+    holding, in their original order, the entries of each input whose
+    identifier ended up marked as unknown."""
+    initial_counts = (len(contents), len(skipped_contents), len(directories))
+
+    discovery_graph = RandomDirSamplingDiscoveryGraph(
+        contents, skipped_contents, directories
+    )
+
+    # Keep querying the storage until every object has been classified
+    while discovery_graph.undecided:
+        discovery_graph.do_query(swh_storage, discovery_graph.get_sample())
+
+    missing = discovery_graph.unknown
+    new_contents = [obj for obj in contents if obj.sha1_git in missing]
+    new_skipped_contents = [
+        obj for obj in skipped_contents if obj.sha1_git in missing
+    ]
+    new_directories = [obj for obj in directories if obj.id in missing]
+
+    logger.debug(
+        "Filtered out %d contents, %d skipped contents and %d directories",
+        initial_counts[0] - len(new_contents),
+        initial_counts[1] - len(new_skipped_contents),
+        initial_counts[2] - len(new_directories),
+    )
+
+    return (new_contents, new_skipped_contents, new_directories)
diff --git a/swh/loader/package/archive/tests/test_archive.py b/swh/loader/package/archive/tests/test_archive.py
--- a/swh/loader/package/archive/tests/test_archive.py
+++ b/swh/loader/package/archive/tests/test_archive.py
@@ -93,6 +93,13 @@
}
+@pytest.fixture(autouse=True, scope="function")
+def lower_sample_rate(mocker):
+ """Lower the number of entries per discovery sample so the minimum threshold
+ for discovery is hit in tests without creating huge test data.
+
+ Despite the fixture name, what is patched is the sample *size*: ``SAMPLE_SIZE``
+ is set to 1 on the ``discovery`` module as referenced from
+ ``swh.loader.package.loader``. The fixture is autouse and function-scoped."""
+ mocker.patch("swh.loader.package.loader.discovery.SAMPLE_SIZE", 1)
+
+
def test_archive_visit_with_no_artifact_found(swh_storage, requests_mock_datadir):
url = URL
unknown_artifact_url = "https://ftp.g.o/unknown/8sync-0.1.0.tar.gz"
@@ -129,6 +136,122 @@
assert_last_visit_matches(swh_storage, url, status="partial", type="tar")
+def test_archive_visit_with_skipped_content(swh_storage, requests_mock_datadir):
+ """With no prior visit, load a gnu project and set the max content size
+ to something low to check that the loader skips "big" content."""
+ # Only the first artifact is loaded; the content size limit is 10 * 1024
+ # (presumably bytes -- confirm against the loader's documentation)
+ loader = ArchiveLoader(
+ swh_storage, URL, artifacts=GNU_ARTIFACTS[:1], max_content_size=10 * 1024
+ )
+
+ actual_load_status = loader.load()
+ assert actual_load_status["status"] == "eventful"
+
+ expected_snapshot_first_visit_id = hash_to_bytes(
+ "9efecc835e8f99254934f256b5301b94f348fd17"
+ )
+
+ assert actual_load_status["snapshot_id"] == hash_to_hex(
+ expected_snapshot_first_visit_id
+ )
+
+ assert_last_visit_matches(swh_storage, URL, status="full", type="tar")
+
+ # Hex ids of the contents expected to be stored as regular contents
+ _expected_new_non_skipped_contents_first_visit = [
+ "ae9be03bd2a06ed8f4f118d3fe76330bb1d77f62",
+ "809788434b433eb2e3cfabd5d591c9a659d5e3d8",
+ "1572607d456d7f633bc6065a2b3048496d679a31",
+ "27de3b3bc6545d2a797aeeb4657c0e215a0c2e55",
+ "fbd27c3f41f2668624ffc80b7ba5db9b92ff27ac",
+ "4f9709e64a9134fe8aefb36fd827b84d8b617ab5",
+ "84fb589b554fcb7f32b806951dcf19518d67b08f",
+ "3046e5d1f70297e2a507b98224b6222c9688d610",
+ "e08441aeab02704cfbd435d6445f7c072f8f524e",
+ "49d4c0ce1a16601f1e265d446b6c5ea6b512f27c",
+ "7d149b28eaa228b3871c91f0d5a95a2fa7cb0c68",
+ "f0c97052e567948adf03e641301e9983c478ccff",
+ "2e6db43f5cd764e677f416ff0d0c78c7a82ef19b",
+ "e9258d81faf5881a2f96a77ba609396f82cb97ad",
+ "7350628ccf194c2c3afba4ac588c33e3f3ac778d",
+ "0057bec9b5422aff9256af240b177ac0e3ac2608",
+ "6b5cc594ac466351450f7f64a0b79fdaf4435ad3",
+ ]
+
+ # Hex ids of the contents expected to be stored as skipped contents
+ _expected_new_skipped_contents_first_visit = [
+ "1170cf105b04b7e2822a0e09d2acf71da7b9a130",
+ "2b8d0d0b43a1078fc708930c8ddc2956a86c566e",
+ "edeb33282b2bffa0e608e9d2fd960fd08093c0ea",
+ "d64e64d4c73679323f8d4cde2643331ba6c20af9",
+ "7a756602914be889c0a2d3952c710144b3e64cb0",
+ "8624bcdae55baeef00cd11d5dfcfa60f68710a02",
+ "f67935bc3a83a67259cda4b2d43373bd56703844",
+ "7d7c6c8c5ebaeff879f61f37083a3854184f6c41",
+ "b99fec102eb24bffd53ab61fc30d59e810f116a2",
+ "7fb724242e2b62b85ca64190c31dcae5303e19b3",
+ "0bb892d9391aa706dc2c3b1906567df43cbe06a2",
+ ]
+
+ # Check that the union of both sets make up the original set (without skipping)
+ union = set(_expected_new_non_skipped_contents_first_visit) | set(
+ _expected_new_skipped_contents_first_visit
+ )
+ assert union == set(_expected_new_contents_first_visit)
+
+ # Storage counters must reflect the split between regular and skipped contents
+ stats = get_stats(swh_storage)
+ assert {
+ "content": len(_expected_new_non_skipped_contents_first_visit),
+ "directory": len(_expected_new_directories_first_visit),
+ "origin": 1,
+ "origin_visit": 1,
+ "release": len(_expected_new_releases_first_visit),
+ "revision": 0,
+ "skipped_content": len(_expected_new_skipped_contents_first_visit),
+ "snapshot": 1,
+ } == stats
+
+ # The snapshot has a HEAD alias pointing at the single release branch
+ release_id = hash_to_bytes(list(_expected_new_releases_first_visit)[0])
+ expected_snapshot = Snapshot(
+ id=expected_snapshot_first_visit_id,
+ branches={
+ b"HEAD": SnapshotBranch(
+ target_type=TargetType.ALIAS,
+ target=b"releases/0.1.0",
+ ),
+ b"releases/0.1.0": SnapshotBranch(
+ target_type=TargetType.RELEASE,
+ target=release_id,
+ ),
+ },
+ )
+ check_snapshot(expected_snapshot, swh_storage)
+
+ assert swh_storage.release_get([release_id])[0] == Release(
+ id=release_id,
+ name=b"0.1.0",
+ message=(
+ b"Synthetic release for archive at "
+ b"https://ftp.gnu.org/gnu/8sync/8sync-0.1.0.tar.gz\n"
+ ),
+ target=hash_to_bytes("3aebc29ed1fccc4a6f2f2010fb8e57882406b528"),
+ target_type=ObjectType.DIRECTORY,
+ synthetic=True,
+ author=Person.from_fullname(b""),
+ date=TimestampWithTimezone.from_datetime(
+ datetime.datetime(1999, 12, 9, 8, 53, 30, tzinfo=datetime.timezone.utc)
+ ),
+ )
+
+ # None of the expected contents, directories or releases may be reported
+ # as missing by the storage after the load
+ expected_contents = map(
+ hash_to_bytes, _expected_new_non_skipped_contents_first_visit
+ )
+ assert list(swh_storage.content_missing_per_sha1(expected_contents)) == []
+
+ expected_dirs = map(hash_to_bytes, _expected_new_directories_first_visit)
+ assert list(swh_storage.directory_missing(expected_dirs)) == []
+
+ expected_rels = map(hash_to_bytes, _expected_new_releases_first_visit)
+ assert list(swh_storage.release_missing(expected_rels)) == []
+
+
def test_archive_visit_with_release_artifact_no_prior_visit(
swh_storage, requests_mock_datadir
):
diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -31,6 +31,7 @@
import sentry_sdk
from swh.core.tarball import uncompress
+from swh.loader.core import discovery
from swh.loader.core.loader import BaseLoader
from swh.loader.exception import NotFound
from swh.loader.package.utils import download
@@ -817,6 +818,12 @@
contents, skipped_contents, directories = from_disk.iter_directory(directory)
+ # Instead of sending everything from the bottom up to the storage,
+ # use a Merkle graph discovery algorithm to filter out known objects.
+ contents, skipped_contents, directories = discovery.filter_known_objects(
+ self.storage, contents, skipped_contents, directories
+ )
+
logger.debug("Number of skipped contents: %s", len(skipped_contents))
self.storage.skipped_content_add(skipped_contents)
logger.debug("Number of contents: %s", len(contents))
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Dec 19, 12:52 PM (16 h, 8 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3223925
Attached To
D8521: Use a Merkle discovery algorithm with archives
Event Timeline
Log In to Comment