Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7123045
D8538.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
7 KB
Subscribers
None
D8538.diff
View Options
diff --git a/swh/loader/core/discovery.py b/swh/loader/core/discovery.py
--- a/swh/loader/core/discovery.py
+++ b/swh/loader/core/discovery.py
@@ -5,6 +5,7 @@
"""Primitives for finding the unknown parts of disk contents efficiently."""
+import abc
from collections import namedtuple
import itertools
import logging
@@ -26,6 +27,72 @@
)
+class ArchiveDiscoveryInterface(abc.ABC):
+ """Interface used in discovery code to abstract over ways of connecting to
+ the SWH archive (direct storage, web API, etc.) for all methods needed by
+ discovery algorithms."""
+
+ contents: List[model.Content]
+ skipped_contents: List[model.SkippedContent]
+ directories: List[model.Directory]
+
+ def __init__(
+ self,
+ contents: List[model.Content],
+ skipped_contents: List[model.SkippedContent],
+ directories: List[model.Directory],
+ ) -> None:
+ self.contents = contents
+ self.skipped_contents = skipped_contents
+ self.directories = directories
+
+ @abc.abstractmethod
+ async def content_missing(self, contents: List[Sha1Git]) -> Iterable[Sha1Git]:
+ """List content missing from the archive by sha1_git"""
+
+ @abc.abstractmethod
+ async def skipped_content_missing(
+ self, skipped_contents: List[Sha1Git]
+ ) -> Iterable[Sha1Git]:
+ """List skipped content missing from the archive by sha1_git"""
+
+ @abc.abstractmethod
+ async def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
+ """List directories missing from the archive by sha1_git"""
+
+
+class DiscoveryStorageConnection(ArchiveDiscoveryInterface):
+ """Use the storage APIs to query the archive"""
+
+ def __init__(
+ self,
+ contents: List[model.Content],
+ skipped_contents: List[model.SkippedContent],
+ directories: List[model.Directory],
+ swh_storage: StorageInterface,
+ ) -> None:
+ super().__init__(contents, skipped_contents, directories)
+ self.storage = swh_storage
+
+ async def content_missing(self, contents: List[Sha1Git]) -> Iterable[Sha1Git]:
+ """List content missing from the archive by sha1_git"""
+ return self.storage.content_missing_per_sha1_git(contents)
+
+ async def skipped_content_missing(
+ self, skipped_contents: List[Sha1Git]
+ ) -> Iterable[Sha1Git]:
+ """List skipped content missing from the archive by sha1_git"""
+ contents = [
+ {"sha1_git": s, "sha1": None, "sha256": None, "blake2s256": None}
+ for s in skipped_contents
+ ]
+ return (d["sha1_git"] for d in self.storage.skipped_content_missing(contents))
+
+ async def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
+ """List directories missing from the archive by sha1_git"""
+ return self.storage.directory_missing(directories)
+
+
class BaseDiscoveryGraph:
"""Creates the base structures and methods needed for discovery algorithms.
Subclasses should override ``get_sample`` to affect how the discovery is made."""
@@ -89,7 +156,7 @@
next_entries = transitive_mapping.get(current, set()) & self.undecided
to_process.update(next_entries)
- def get_sample(
+ async def get_sample(
self,
) -> Sample:
"""Return a three-tuple of samples from the undecided sets of contents,
@@ -98,36 +165,23 @@
which are known."""
raise NotImplementedError()
- def do_query(self, swh_storage: StorageInterface, sample: Sample) -> None:
- """Given a three-tuple of samples, ask the storage which are known or
+ async def do_query(
+ self, archive: ArchiveDiscoveryInterface, sample: Sample
+ ) -> None:
+ """Given a three-tuple of samples, ask the archive which are known or
unknown and mark them as such."""
- contents_sample, skipped_contents_sample, directories_sample = sample
- # TODO unify those APIs, and make it so only have to manipulate hashes
- if contents_sample:
- known = set(contents_sample)
- unknown = set(swh_storage.content_missing_per_sha1_git(contents_sample))
- known -= unknown
-
- self.mark_known(known)
- self.mark_unknown(unknown)
-
- if skipped_contents_sample:
- known = set(skipped_contents_sample)
- as_dicts = [
- self._all_contents[s].to_dict() for s in skipped_contents_sample
- ]
- unknown = {
- d["sha1_git"] for d in swh_storage.skipped_content_missing(as_dicts)
- }
- known -= unknown
-
- self.mark_known(known)
- self.mark_unknown(unknown)
+ methods = (
+ archive.content_missing,
+ archive.skipped_content_missing,
+ archive.directory_missing,
+ )
- if directories_sample:
- known = set(directories_sample)
- unknown = set(swh_storage.directory_missing(directories_sample))
+ for sample_per_type, method in zip(sample, methods):
+ if not sample_per_type:
+ continue
+ known = set(sample_per_type)
+ unknown = set(await method(sample_per_type))
known -= unknown
self.mark_known(known)
@@ -143,7 +197,7 @@
are left: we send them directly to the storage since they should be few and
their structure flat."""
- def get_sample(self) -> Sample:
+ async def get_sample(self) -> Sample:
if self._undecided_directories:
if len(self._undecided_directories) <= SAMPLE_SIZE:
return Sample(
@@ -175,14 +229,14 @@
)
-def filter_known_objects(
- swh_storage: StorageInterface,
- contents: List[model.Content],
- skipped_contents: List[model.SkippedContent],
- directories: List[model.Directory],
-):
- """Filter ``contents``, ``skipped_contents`` and ``directories`` to only return
- those that are unknown to the SWH archive using a discovery algorithm."""
+async def filter_known_objects(archive: ArchiveDiscoveryInterface):
+ """Filter ``archive``'s ``contents``, ``skipped_contents`` and ``directories``
+ to only return those that are unknown to the SWH archive using a discovery
+ algorithm."""
+ contents = archive.contents
+ skipped_contents = archive.skipped_contents
+ directories = archive.directories
+
contents_count = len(contents)
skipped_contents_count = len(skipped_contents)
directories_count = len(directories)
@@ -190,8 +244,8 @@
graph = RandomDirSamplingDiscoveryGraph(contents, skipped_contents, directories)
while graph.undecided:
- sample = graph.get_sample()
- graph.do_query(swh_storage, sample)
+ sample = await graph.get_sample()
+ await graph.do_query(archive, sample)
contents = [c for c in contents if c.sha1_git in graph.unknown]
skipped_contents = [c for c in skipped_contents if c.sha1_git in graph.unknown]
diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -3,6 +3,7 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import asyncio
import datetime
import hashlib
from itertools import islice
@@ -820,8 +821,12 @@
# Instead of sending everything from the bottom up to the storage,
# use a Merkle graph discovery algorithm to filter out known objects.
- contents, skipped_contents, directories = discovery.filter_known_objects(
- self.storage, contents, skipped_contents, directories
+ contents, skipped_contents, directories = asyncio.run(
+ discovery.filter_known_objects(
+ discovery.DiscoveryStorageConnection(
+ contents, skipped_contents, directories, self.storage
+ ),
+ )
)
logger.debug("Number of skipped contents: %s", len(skipped_contents))
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, Dec 17, 5:37 PM (2 d, 18 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3221327
Attached To
D8538: Setup async interface for discovery module
Event Timeline
Log In to Comment