Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7123029
D6380.id31886.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Subscribers
None
D6380.id31886.diff
View Options
diff --git a/swh/loader/core/loader.py b/swh/loader/core/loader.py
--- a/swh/loader/core/loader.py
+++ b/swh/loader/core/loader.py
@@ -77,6 +77,7 @@
lister_instance_name: Name of the lister instance which triggered this load.
Must be None iff lister_name is, but it may be the empty string for listers
with a single instance.
+
"""
visit_type: str
@@ -98,6 +99,7 @@
lister_name: Optional[str] = None,
lister_instance_name: Optional[str] = None,
metadata_fetcher_credentials: CredentialsType = None,
+ create_partial_snapshot: bool = False,
):
if lister_name == "":
raise ValueError("lister_name must not be the empty string")
@@ -116,6 +118,7 @@
self.lister_name = lister_name
self.lister_instance_name = lister_instance_name
self.metadata_fetcher_credentials = metadata_fetcher_credentials or {}
+ self.create_partial_snapshot = create_partial_snapshot
if logging_class is None:
logging_class = "%s.%s" % (
@@ -275,17 +278,18 @@
"""Run any additional processing between fetching and storing the data
Returns:
- a value that is interpreted as a boolean. If True, fetch_data needs
- to be called again to complete loading.
- Ignored if ``fetch_data`` already returned :const:`False`.
+ a value that is interpreted as a boolean. If True, :meth:`fetch_data` needs
+ to be called again to complete loading. Ignored if :meth:`fetch_data`
+ already returned :const:`False`.
"""
return True
def store_data(self) -> None:
- """Store fetched data in the database.
+ """Store fetched and processed data in the storage.
+
+ This should call the ``storage.<object>_add`` methods, which write the given
+ objects to the storage.
- Should call the :func:`maybe_load_xyz` methods, which handle the
- bundles sent to storage, rather than send directly.
"""
raise NotImplementedError
@@ -332,6 +336,16 @@
"""
pass
+ def build_partial_snapshot(self) -> Optional[Snapshot]:
+ """When the loader is configured to serialize partial snapshots, this allows the
+ loader to give an implementation that builds a partial snapshot. This is used
+ when the ingestion takes multiple calls to :meth:`fetch_data` and
+ :meth:`store_data`. Ignored when the loader is not configured to serialize
+ partial snapshots.
+
+ """
+ return None
+
def load(self) -> Dict[str, str]:
r"""Loading logic for the loader to follow:
@@ -411,6 +425,26 @@
self.store_data()
t4 = time.monotonic()
total_time_store_data += t4 - t3
+
+ # At the end of each ingestion loop, if the loader is configured for
+ # partial snapshots (see self.create_partial_snapshot) and there is more
+ # data to fetch, allow the loader to record an intermediate snapshot of
+ # the ingestion. This could help when failing to load large repositories
+ # for technical reasons (running out of disk, memory, etc.).
+ if more_data_to_fetch and self.create_partial_snapshot:
+ partial_snapshot = self.build_partial_snapshot()
+ if partial_snapshot is not None:
+ self.storage.snapshot_add([partial_snapshot])
+ visit_status = OriginVisitStatus(
+ origin=self.origin.url,
+ visit=self.visit.visit,
+ type=self.visit_type,
+ date=now(),
+ status="partial",
+ snapshot=partial_snapshot.id,
+ )
+ self.storage.origin_visit_status_add([visit_status])
+
if not more_data_to_fetch:
break
diff --git a/swh/loader/core/tests/test_loader.py b/swh/loader/core/tests/test_loader.py
--- a/swh/loader/core/tests/test_loader.py
+++ b/swh/loader/core/tests/test_loader.py
@@ -21,13 +21,17 @@
)
from swh.loader.core.metadata_fetchers import MetadataFetcherProtocol
from swh.loader.exception import NotFound, UnsupportedChecksumComputation
-from swh.loader.tests import assert_last_visit_matches
+from swh.loader.tests import assert_last_visit_matches, get_stats
+from swh.model.hashutil import hash_to_bytes
from swh.model.model import (
MetadataAuthority,
MetadataAuthorityType,
MetadataFetcher,
Origin,
RawExtrinsicMetadata,
+ Snapshot,
+ SnapshotBranch,
+ TargetType,
)
import swh.storage.exc
@@ -77,7 +81,7 @@
class DummyBaseLoader(DummyLoader, BaseLoader):
"""Buffered loader will send new data when threshold is reached"""
- def store_data(self):
+ def store_data(self) -> None:
pass
@@ -365,6 +369,51 @@
assert loader.statsd.constant_tags == {"visit_type": "my-visit-type"}
+class DummyLoaderWithPartialSnapshot(DummyBaseLoader):
+ call = 0
+
+ def fetch_data(self):
+ self.call += 1
+ # Let's have one call to fetch data and then another to fetch further data
+ return self.call == 1
+
+ def store_data(self) -> None:
+ # First call does nothing and the last one flushes the final snapshot
+ if self.call != 1:
+ self.storage.snapshot_add([Snapshot(branches={})])
+
+ def build_partial_snapshot(self):
+ """Build partial snapshot to serialize during loading."""
+ return Snapshot(
+ branches={
+ b"alias": SnapshotBranch(
+ target=hash_to_bytes(b"0" * 20),
+ target_type=TargetType.DIRECTORY,
+ )
+ }
+ )
+
+
+def test_loader_with_partial_snapshot(swh_storage, sentry_events):
+ """Ensure loader can write partial snapshot when configured to."""
+ loader = DummyLoaderWithPartialSnapshot(
+ swh_storage, "dummy-url", create_partial_snapshot=True
+ )
+ status = loader.load()
+
+ assert status == {"status": "eventful"}
+
+ actual_stats = get_stats(swh_storage)
+
+ expected_stats = {
+ "origin": 1, # only 1 origin
+ "origin_visit": 1, # with 1 visit
+ "snapshot": 1 + 1, # 1 partial snapshot and 1 final snapshot
+ }
+ for key in expected_stats.keys():
+ assert actual_stats[key] == expected_stats[key]
+
+
class DummyLoaderWithError(DummyBaseLoader):
def prepare(self, *args, **kwargs):
raise Exception("error")
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, Dec 17, 4:19 PM (1 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3226121
Attached To
D6380: Allow partial snapshot creation during ingestion
Event Timeline
Log In to Comment