diff --git a/swh/loader/core/discovery.py b/swh/loader/core/discovery.py
--- a/swh/loader/core/discovery.py
+++ b/swh/loader/core/discovery.py
@@ -11,6 +11,8 @@
 import random
 from typing import Any, Iterable, List, Mapping, NamedTuple, Set, Union
 
+from typing_extensions import Protocol, runtime_checkable
+
 from swh.model.from_disk import model
 from swh.model.model import Sha1Git
 from swh.storage.interface import StorageInterface
@@ -26,6 +28,59 @@
 )
 
 
+@runtime_checkable
+class ArchiveDiscoveryInterface(Protocol):
+    contents: List[model.Content]
+    skipped_contents: List[model.SkippedContent]
+    directories: List[model.Directory]
+
+    async def content_missing(self, contents: List[Sha1Git]) -> Iterable[Sha1Git]:
+        """List content missing from the archive by sha1"""
+
+    async def skipped_content_missing(
+        self, skipped_contents: List[Sha1Git]
+    ) -> Iterable[Sha1Git]:
+        """List skipped content missing from the archive by sha1"""
+
+    async def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
+        """List directories missing from the archive by sha1"""
+
+
+class DiscoveryStorageConnection(ArchiveDiscoveryInterface):
+    """Use the storage APIs to query the archive"""
+
+    def __init__(
+        self,
+        contents: List[model.Content],
+        skipped_contents: List[model.SkippedContent],
+        directories: List[model.Directory],
+        swh_storage: StorageInterface,
+    ) -> None:
+        super().__init__()
+        self.storage = swh_storage
+        self.contents = contents
+        self.skipped_contents = skipped_contents
+        self.directories = directories
+
+    async def content_missing(self, contents: List[Sha1Git]) -> Iterable[Sha1Git]:
+        """List content missing from the archive by sha1"""
+        return self.storage.content_missing_per_sha1_git(contents)
+
+    async def skipped_content_missing(
+        self, skipped_contents: List[Sha1Git]
+    ) -> Iterable[Sha1Git]:
+        """List skipped content missing from the archive by sha1"""
+        # TODO remove type ignore when workaround for inconsistent APIs is removed
+        # in ``do_query``
+        matcher = self._all_contents  # type: ignore
+        contents = [matcher[s].to_dict() for s in skipped_contents]
+        return (d["sha1_git"] for d in self.storage.skipped_content_missing(contents))
+
+    async def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
+        """List directories missing from the archive by sha1"""
+        return self.storage.directory_missing(directories)
+
+
 class BaseDiscoveryGraph:
     """Creates the base structures and methods needed for discovery algorithms.
     Subclasses should override ``get_sample`` to affect how the discovery is made."""
@@ -89,7 +144,7 @@
                 next_entries = transitive_mapping.get(current, set()) & self.undecided
                 to_process.update(next_entries)
 
-    def get_sample(
+    async def get_sample(
         self,
     ) -> Sample:
         """Return a three-tuple of samples from the undecided sets of contents,
@@ -98,36 +153,27 @@
         which are known."""
         raise NotImplementedError()
 
-    def do_query(self, swh_storage: StorageInterface, sample: Sample) -> None:
-        """Given a three-tuple of samples, ask the storage which are known or
+    async def do_query(
+        self, archive: ArchiveDiscoveryInterface, sample: Sample
+    ) -> None:
+        """Given a three-tuple of samples, ask the archive which are known or
         unknown and mark them as such."""
 
-        contents_sample, skipped_contents_sample, directories_sample = sample
-        # TODO unify those APIs, and make it so only have to manipulate hashes
-        if contents_sample:
-            known = set(contents_sample)
-            unknown = set(swh_storage.content_missing_per_sha1_git(contents_sample))
-            known -= unknown
-
-            self.mark_known(known)
-            self.mark_unknown(unknown)
-
-        if skipped_contents_sample:
-            known = set(skipped_contents_sample)
-            as_dicts = [
-                self._all_contents[s].to_dict() for s in skipped_contents_sample
-            ]
-            unknown = {
-                d["sha1_git"] for d in swh_storage.skipped_content_missing(as_dicts)
-            }
-            known -= unknown
+        methods = (
+            archive.content_missing,
+            archive.skipped_content_missing,
+            archive.directory_missing,
+        )
 
-            self.mark_known(known)
-            self.mark_unknown(unknown)
+        # hack to work around the inconsistent APIs without modifying
+        # ArchiveDiscoveryInterface. TODO remove this when the APIs are aligned
+        archive._all_contents = self._all_contents  # type: ignore
 
-        if directories_sample:
-            known = set(directories_sample)
-            unknown = set(swh_storage.directory_missing(directories_sample))
+        for sample_per_type, method in zip(sample, methods):
+            if not sample_per_type:
+                continue
+            known = set(sample_per_type)
+            unknown = set(await method(sample_per_type))
             known -= unknown
 
             self.mark_known(known)
@@ -143,7 +189,7 @@
     are left: we send them directly to the storage since they should be few and
     their structure flat."""
 
-    def get_sample(self) -> Sample:
+    async def get_sample(self) -> Sample:
         if self._undecided_directories:
             if len(self._undecided_directories) <= SAMPLE_SIZE:
                 return Sample(
@@ -175,14 +221,14 @@
         )
 
 
-def filter_known_objects(
-    swh_storage: StorageInterface,
-    contents: List[model.Content],
-    skipped_contents: List[model.SkippedContent],
-    directories: List[model.Directory],
-):
-    """Filter ``contents``, ``skipped_contents`` and ``directories`` to only return
-    those that are unknown to the SWH archive using a discovery algorithm."""
+async def filter_known_objects(archive: ArchiveDiscoveryInterface):
+    """Filter ``archive``'s ``contents``, ``skipped_contents`` and ``directories``
+    to only return those that are unknown to the SWH archive using a discovery
+    algorithm."""
+    contents = archive.contents
+    skipped_contents = archive.skipped_contents
+    directories = archive.directories
+
     contents_count = len(contents)
     skipped_contents_count = len(skipped_contents)
     directories_count = len(directories)
@@ -190,8 +236,8 @@
     graph = RandomDirSamplingDiscoveryGraph(contents, skipped_contents, directories)
 
     while graph.undecided:
-        sample = graph.get_sample()
-        graph.do_query(swh_storage, sample)
+        sample = await graph.get_sample()
+        await graph.do_query(archive, sample)
 
     contents = [c for c in contents if c.sha1_git in graph.unknown]
    skipped_contents = [c for c in skipped_contents if c.sha1_git in graph.unknown]
diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -3,6 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import asyncio
 import datetime
 import hashlib
 from itertools import islice
@@ -820,8 +821,12 @@
 
         # Instead of sending everything from the bottom up to the storage,
         # use a Merkle graph discovery algorithm to filter out known objects.
-        contents, skipped_contents, directories = discovery.filter_known_objects(
-            self.storage, contents, skipped_contents, directories
+        contents, skipped_contents, directories = asyncio.run(
+            discovery.filter_known_objects(
+                discovery.DiscoveryStorageConnection(
+                    contents, skipped_contents, directories, self.storage
+                ),
+            )
         )
 
         logger.debug("Number of skipped contents: %s", len(skipped_contents))
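
Usage note: a minimal sketch of how synchronous loader code is expected to drive the new interface. The in-memory storage, the on-disk path and the iter_directory call below are illustrative assumptions and not part of this patch; only DiscoveryStorageConnection, filter_known_objects and the asyncio.run wrapper mirror the changes above.

    import asyncio

    from swh.loader.core import discovery
    from swh.model.from_disk import Directory, iter_directory
    from swh.storage import get_storage

    # Illustrative setup: an in-memory storage and objects scanned from disk.
    storage = get_storage("memory")
    contents, skipped_contents, directories = iter_directory(
        Directory.from_disk(path=b"/tmp/some-package")
    )

    # The connection object carries both the objects to test and the storage
    # backend that answers the *_missing queries.
    connection = discovery.DiscoveryStorageConnection(
        contents, skipped_contents, directories, storage
    )

    # filter_known_objects is now a coroutine, so synchronous loader code
    # wraps it in asyncio.run, as PackageLoader does in the hunk above.
    contents, skipped_contents, directories = asyncio.run(
        discovery.filter_known_objects(connection)
    )

Since DiscoveryStorageConnection's async methods call the synchronous storage API internally, driving them with a single asyncio.run keeps the loaders' behaviour unchanged while letting other ArchiveDiscoveryInterface implementations perform真 asynchronous queries.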