Page Menu · Home · Software Heritage

D8538.diff
No One · Temporary

D8538.diff

diff --git a/swh/loader/core/discovery.py b/swh/loader/core/discovery.py
--- a/swh/loader/core/discovery.py
+++ b/swh/loader/core/discovery.py
@@ -5,6 +5,7 @@
"""Primitives for finding the unknown parts of disk contents efficiently."""
+import abc
from collections import namedtuple
import itertools
import logging
@@ -26,6 +27,72 @@
)
+class ArchiveDiscoveryInterface(abc.ABC):
+ """Interface used in discovery code to abstract over ways of connecting to
+ the SWH archive (direct storage, web API, etc.) for all methods needed by
+ discovery algorithms; it also carries the objects that are to be filtered."""
+
+ contents: List[model.Content]  # candidate contents to filter
+ skipped_contents: List[model.SkippedContent]  # candidate skipped contents to filter
+ directories: List[model.Directory]  # candidate directories to filter
+
+ def __init__(
+ self,
+ contents: List[model.Content],
+ skipped_contents: List[model.SkippedContent],
+ directories: List[model.Directory],
+ ) -> None:
+ self.contents = contents
+ self.skipped_contents = skipped_contents
+ self.directories = directories
+
+ @abc.abstractmethod
+ async def content_missing(self, contents: List[Sha1Git]) -> Iterable[Sha1Git]:
+ """List content missing from the archive, identified by sha1_git"""
+
+ @abc.abstractmethod
+ async def skipped_content_missing(
+ self, skipped_contents: List[Sha1Git]
+ ) -> Iterable[Sha1Git]:
+ """List skipped content missing from the archive, identified by sha1_git"""
+
+ @abc.abstractmethod
+ async def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
+ """List directories missing from the archive, identified by sha1_git"""
+
+
+class DiscoveryStorageConnection(ArchiveDiscoveryInterface):
+ """Query the archive through a ``StorageInterface`` (storage calls are synchronous, not awaited)"""
+
+ def __init__(
+ self,
+ contents: List[model.Content],
+ skipped_contents: List[model.SkippedContent],
+ directories: List[model.Directory],
+ swh_storage: StorageInterface,
+ ) -> None:
+ super().__init__(contents, skipped_contents, directories)
+ self.storage = swh_storage
+
+ async def content_missing(self, contents: List[Sha1Git]) -> Iterable[Sha1Git]:
+ """List content missing from the archive, identified by sha1_git"""
+ return self.storage.content_missing_per_sha1_git(contents)
+
+ async def skipped_content_missing(
+ self, skipped_contents: List[Sha1Git]
+ ) -> Iterable[Sha1Git]:
+ """List skipped content missing from the archive, identified by sha1_git"""
+ contents = [  # only the sha1_git is known here; the other hashes are sent as None
+ {"sha1_git": s, "sha1": None, "sha256": None, "blake2s256": None}
+ for s in skipped_contents
+ ]  # NOTE(review): previously the full to_dict() was sent; assumes storage matches on sha1_git when other hashes are None — confirm
+ return (d["sha1_git"] for d in self.storage.skipped_content_missing(contents))
+
+ async def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
+ """List directories missing from the archive, identified by sha1_git"""
+ return self.storage.directory_missing(directories)
+
+
class BaseDiscoveryGraph:
"""Creates the base structures and methods needed for discovery algorithms.
Subclasses should override ``get_sample`` to affect how the discovery is made."""
@@ -89,7 +156,7 @@
next_entries = transitive_mapping.get(current, set()) & self.undecided
to_process.update(next_entries)
- def get_sample(
+ async def get_sample(  # now a coroutine: callers must ``await`` it
self,
) -> Sample:
"""Return a three-tuple of samples from the undecided sets of contents,
@@ -98,36 +165,23 @@
which are known."""
raise NotImplementedError()
- def do_query(self, swh_storage: StorageInterface, sample: Sample) -> None:
- """Given a three-tuple of samples, ask the storage which are known or
+ async def do_query(
+ self, archive: ArchiveDiscoveryInterface, sample: Sample
+ ) -> None:
+ """Given a three-tuple of samples, ask the archive which are known or
unknown and mark them as such."""
- contents_sample, skipped_contents_sample, directories_sample = sample
- # TODO unify those APIs, and make it so only have to manipulate hashes
- if contents_sample:
- known = set(contents_sample)
- unknown = set(swh_storage.content_missing_per_sha1_git(contents_sample))
- known -= unknown
-
- self.mark_known(known)
- self.mark_unknown(unknown)
-
- if skipped_contents_sample:
- known = set(skipped_contents_sample)
- as_dicts = [
- self._all_contents[s].to_dict() for s in skipped_contents_sample
- ]
- unknown = {
- d["sha1_git"] for d in swh_storage.skipped_content_missing(as_dicts)
- }
- known -= unknown
-
- self.mark_known(known)
- self.mark_unknown(unknown)
+ methods = (  # must follow the order of the sample: contents, skipped contents, directories
+ archive.content_missing,
+ archive.skipped_content_missing,
+ archive.directory_missing,
+ )
- if directories_sample:
- known = set(directories_sample)
- unknown = set(swh_storage.directory_missing(directories_sample))
+ for sample_per_type, method in zip(sample, methods):  # pair each sub-sample with its query
+ if not sample_per_type:
+ continue
+ known = set(sample_per_type)  # start from all sampled ids; missing ones are subtracted below
+ unknown = set(await method(sample_per_type))
known -= unknown
self.mark_known(known)
@@ -143,7 +197,7 @@
are left: we send them directly to the storage since they should be few and
their structure flat."""
- def get_sample(self) -> Sample:
+ async def get_sample(self) -> Sample:
if self._undecided_directories:
if len(self._undecided_directories) <= SAMPLE_SIZE:
return Sample(
@@ -175,14 +229,14 @@
)
-def filter_known_objects(
- swh_storage: StorageInterface,
- contents: List[model.Content],
- skipped_contents: List[model.SkippedContent],
- directories: List[model.Directory],
-):
- """Filter ``contents``, ``skipped_contents`` and ``directories`` to only return
- those that are unknown to the SWH archive using a discovery algorithm."""
+async def filter_known_objects(archive: ArchiveDiscoveryInterface):
+ """Coroutine filtering ``archive``'s ``contents``, ``skipped_contents`` and
+ ``directories`` down to those unknown to the SWH archive, by repeatedly
+ sampling a discovery graph and querying ``archive`` until nothing is undecided."""
+ contents = archive.contents  # the objects to filter are carried by the archive connection
+ skipped_contents = archive.skipped_contents
+ directories = archive.directories
+
contents_count = len(contents)
skipped_contents_count = len(skipped_contents)
directories_count = len(directories)
@@ -190,8 +244,8 @@
graph = RandomDirSamplingDiscoveryGraph(contents, skipped_contents, directories)
while graph.undecided:
- sample = graph.get_sample()
- graph.do_query(swh_storage, sample)
+ sample = await graph.get_sample()
+ await graph.do_query(archive, sample)
contents = [c for c in contents if c.sha1_git in graph.unknown]
skipped_contents = [c for c in skipped_contents if c.sha1_git in graph.unknown]
diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -3,6 +3,7 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import asyncio
import datetime
import hashlib
from itertools import islice
@@ -820,8 +821,12 @@
# Instead of sending everything from the bottom up to the storage,
# use a Merkle graph discovery algorithm to filter out known objects.
- contents, skipped_contents, directories = discovery.filter_known_objects(
- self.storage, contents, skipped_contents, directories
+ contents, skipped_contents, directories = asyncio.run(  # runs a fresh event loop; raises RuntimeError if a loop is already running in this thread
+ discovery.filter_known_objects(
+ discovery.DiscoveryStorageConnection(
+ contents, skipped_contents, directories, self.storage
+ ),
+ )
+ )
logger.debug("Number of skipped contents: %s", len(skipped_contents))

File Metadata

Mime Type
text/plain
Expires
Tue, Dec 17, 5:37 PM (1 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3221327

Event Timeline