diff --git a/swh/loader/core/loader.py b/swh/loader/core/loader.py
--- a/swh/loader/core/loader.py
+++ b/swh/loader/core/loader.py
@@ -77,6 +77,7 @@
         lister_instance_name: Name of the lister instance which triggered this
           load. Must be None iff lister_name is, but it may be the empty string
           for listers with a single instance.
+
     """

     visit_type: str
@@ -98,6 +99,7 @@
         lister_name: Optional[str] = None,
         lister_instance_name: Optional[str] = None,
         metadata_fetcher_credentials: CredentialsType = None,
+        create_partial_snapshot: bool = False,
     ):
         if lister_name == "":
             raise ValueError("lister_name must not be the empty string")
@@ -116,6 +118,7 @@
         self.lister_name = lister_name
         self.lister_instance_name = lister_instance_name
         self.metadata_fetcher_credentials = metadata_fetcher_credentials or {}
+        self.create_partial_snapshot = create_partial_snapshot

         if logging_class is None:
             logging_class = "%s.%s" % (
@@ -275,17 +278,18 @@
         """Run any additional processing between fetching and storing the data

         Returns:
-            a value that is interpreted as a boolean. If True, fetch_data needs
-            to be called again to complete loading.
-            Ignored if ``fetch_data`` already returned :const:`False`.
+            a value that is interpreted as a boolean. If True, :meth:`fetch_data` needs
+            to be called again to complete loading. Ignored if :meth:`fetch_data`
+            already returned :const:`False`.
         """
         return True

     def store_data(self) -> None:
-        """Store fetched data in the database.
+        """Store fetched and processed data in the storage.
+
+        This should call the ``storage.<object>_add`` methods, which handle sending
+        the fetched objects to the storage.

-        Should call the :func:`maybe_load_xyz` methods, which handle the
-        bundles sent to storage, rather than send directly.
         """
         raise NotImplementedError

@@ -332,6 +336,16 @@
         """
         pass

+    def build_partial_snapshot(self) -> Optional[Snapshot]:
+        """Build a partial snapshot of the data ingested so far. When the loader is
+        configured to create partial snapshots (see ``create_partial_snapshot``), this
+        method is called after each :meth:`store_data` call for which :meth:`fetch_data`
+        reported more data to fetch. Ignored when the loader is not configured to
+        create partial snapshots.
+
+        """
+        return None
+
     def load(self) -> Dict[str, str]:
         r"""Loading logic for the loader to follow:

@@ -411,6 +425,26 @@
             self.store_data()
             t4 = time.monotonic()
             total_time_store_data += t4 - t3
+
+            # At the end of each ingestion loop, if the loader is configured to
+            # create partial snapshots (see self.create_partial_snapshot) and there
+            # is more data to fetch, record an intermediary snapshot of the
+            # ingestion. This helps when the loading of large repositories fails
+            # for technical reasons (running out of disk, memory, etc.).
+            if more_data_to_fetch and self.create_partial_snapshot:
+                partial_snapshot = self.build_partial_snapshot()
+                if partial_snapshot is not None:
+                    self.storage.snapshot_add([partial_snapshot])
+                    visit_status = OriginVisitStatus(
+                        origin=self.origin.url,
+                        visit=self.visit.visit,
+                        type=self.visit_type,
+                        date=now(),
+                        status="partial",
+                        snapshot=partial_snapshot.id,
+                    )
+                    self.storage.origin_visit_status_add([visit_status])
+
             if not more_data_to_fetch:
                 break
diff --git a/swh/loader/core/tests/test_loader.py b/swh/loader/core/tests/test_loader.py
--- a/swh/loader/core/tests/test_loader.py
+++ b/swh/loader/core/tests/test_loader.py
@@ -21,13 +21,17 @@
 )
 from swh.loader.core.metadata_fetchers import MetadataFetcherProtocol
 from swh.loader.exception import NotFound, UnsupportedChecksumComputation
-from swh.loader.tests import assert_last_visit_matches
+from swh.loader.tests import assert_last_visit_matches, get_stats
+from swh.model.hashutil import hash_to_bytes
 from swh.model.model import (
     MetadataAuthority,
     MetadataAuthorityType,
     MetadataFetcher,
     Origin,
     RawExtrinsicMetadata,
+    Snapshot,
+    SnapshotBranch,
+    TargetType,
 )

 import swh.storage.exc
@@ -77,7 +81,7 @@
 class DummyBaseLoader(DummyLoader, BaseLoader):
     """Buffered loader will send new data when threshold is reached"""

-    def store_data(self):
+    def store_data(self) -> None:
         pass

@@ -365,6 +369,51 @@
     assert loader.statsd.constant_tags == {"visit_type": "my-visit-type"}


+class DummyLoaderWithPartialSnapshot(DummyBaseLoader):
+    call = 0
+
+    def fetch_data(self):
+        self.call += 1
+        # The first call reports more data to fetch, the second one does not
+        return self.call == 1
+
+    def store_data(self) -> None:
+        # The first call stores nothing; the last one writes the final snapshot
+        if self.call != 1:
+            self.storage.snapshot_add([Snapshot(branches={})])
+
+    def build_partial_snapshot(self):
+        """Build a partial snapshot to record during loading."""
+        return Snapshot(
+            branches={
+                b"alias": SnapshotBranch(
+                    target=hash_to_bytes("0" * 40),
+                    target_type=TargetType.DIRECTORY,
+                )
+            }
+        )
+
+
+def test_loader_with_partial_snapshot(swh_storage, sentry_events):
+    """Ensure the loader records a partial snapshot when configured to."""
+    loader = DummyLoaderWithPartialSnapshot(
+        swh_storage, "dummy-url", create_partial_snapshot=True
+    )
+    status = loader.load()
+
+    assert status == {"status": "eventful"}
+
+    actual_stats = get_stats(swh_storage)
+
+    expected_stats = {
+        "origin": 1,  # only 1 origin
+        "origin_visit": 1,  # with 1 visit
+        "snapshot": 1 + 1,  # 1 partial snapshot and 1 final snapshot
+    }
+    for key in expected_stats.keys():
+        assert actual_stats[key] == expected_stats[key]
+
+
 class DummyLoaderWithError(DummyBaseLoader):
     def prepare(self, *args, **kwargs):
         raise Exception("error")
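
For reference, a minimal usage sketch of the new knob (not part of the patch): a concrete loader opts in by passing create_partial_snapshot=True and overriding build_partial_snapshot(). ExampleLoader, its two-pass fetch logic, and the origin URL are assumptions made up for illustration; a real loader would also implement the remaining BaseLoader hooks it relies on, which are elided here.

from typing import Dict, Optional

from swh.loader.core.loader import BaseLoader
from swh.model.model import Snapshot, SnapshotBranch


class ExampleLoader(BaseLoader):
    """Hypothetical loader ingesting its origin in two fetch/store passes."""

    visit_type = "example"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.pass_number = 0
        self.branches: Dict[bytes, Optional[SnapshotBranch]] = {}

    def prepare(self) -> None:
        pass  # nothing to prepare in this sketch

    def fetch_data(self) -> bool:
        # Returning True signals that fetch_data/store_data must run again;
        # that is when the base loader records the partial snapshot.
        self.pass_number += 1
        return self.pass_number == 1

    def store_data(self) -> None:
        pass  # a real loader would call the storage *_add endpoints here

    def build_partial_snapshot(self) -> Optional[Snapshot]:
        # Snapshot of the branches ingested so far, recorded together with a
        # "partial" origin visit status between the two passes.
        return Snapshot(branches=self.branches)


# loader = ExampleLoader(storage, "https://example.org/repo",
#                        create_partial_snapshot=True)
# loader.load()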