diff --git a/swh/loader/core/loader.py b/swh/loader/core/loader.py --- a/swh/loader/core/loader.py +++ b/swh/loader/core/loader.py @@ -80,6 +80,7 @@ lister_instance_name: Name of the lister instance which triggered this load. Must be None iff lister_name is, but it may be the empty string for listers with a single instance. + """ visit_type: str @@ -101,6 +102,7 @@ lister_name: Optional[str] = None, lister_instance_name: Optional[str] = None, metadata_fetcher_credentials: CredentialsType = None, + create_partial_snapshot: bool = False, ): if lister_name == "": raise ValueError("lister_name must not be the empty string") @@ -119,6 +121,7 @@ self.lister_name = lister_name self.lister_instance_name = lister_instance_name self.metadata_fetcher_credentials = metadata_fetcher_credentials or {} + self.create_partial_snapshot = create_partial_snapshot if logging_class is None: logging_class = "%s.%s" % ( @@ -284,11 +287,14 @@ """ return True - def store_data(self) -> None: + def store_data(self, create_partial_snapshot: bool = False) -> None: """Store fetched data in the database. - Should call the :func:`maybe_load_xyz` methods, which handle the - bundles sent to storage, rather than send directly. + Use the create_partial_snapshot boolean to create a partial snapshot at the end + of each call of this method. This helps when a crash occurs while loading a + large repository (running out of disk, memory, etc.), as it should ease the + next visit. 
+ """ raise NotImplementedError @@ -411,7 +417,9 @@ t3 = time.monotonic() total_time_process_data += t3 - t2 - self.store_data() + self.store_data( + self.create_partial_snapshot and more_data_to_fetch + ) t4 = time.monotonic() total_time_store_data += t4 - t3 if not more_data_to_fetch: @@ -625,7 +633,7 @@ """Whether the load was eventful""" raise NotImplementedError - def store_data(self) -> None: + def store_data(self, create_partial_snapshot: bool = False) -> None: assert self.origin if self.save_data_path: self.save_data() @@ -647,8 +655,23 @@ if self.has_releases(): for release in self.get_releases(): self.storage.release_add([release]) + snapshot = self.get_snapshot() self.storage.snapshot_add([snapshot]) + + # More work to do, we make a partial visit targeting the snapshot though. That + # should ease further visit if we somehow can't make it through the ingestion. + if create_partial_snapshot: + assert self.visit.visit is not None + visit_status = OriginVisitStatus( + origin=self.origin.url, + visit=self.visit.visit, + type=self.visit_type, + date=now(), + status="partial", + snapshot=snapshot.id, + ) + self.storage.origin_visit_status_add([visit_status]) self.flush() self.loaded_snapshot_id = snapshot.id @@ -815,7 +838,7 @@ return False # no more data to process - def store_data(self) -> None: + def store_data(self, create_partial_snapshot: bool = False) -> None: """Store newly retrieved Content and Snapshot.""" assert self.content is not None self.storage.content_add([self.content]) @@ -951,7 +974,7 @@ return False # no more data to process - def store_data(self) -> None: + def store_data(self, create_partial_snapshot: bool = False) -> None: """Store newly retrieved Content and Snapshot.""" self.log.debug("Number of skipped contents: %s", len(self.skipped_cnts)) self.storage.skipped_content_add(self.skipped_cnts) diff --git a/swh/loader/core/tests/test_loader.py b/swh/loader/core/tests/test_loader.py --- 
a/swh/loader/core/tests/test_loader.py +++ b/swh/loader/core/tests/test_loader.py @@ -102,7 +102,7 @@ class DummyBaseLoader(DummyLoader, BaseLoader): """Buffered loader will send new data when threshold is reached""" - def store_data(self): + def store_data(self, create_partial_snapshot: bool = False) -> None: pass