diff --git a/swh/loader/core/loader.py b/swh/loader/core/loader.py
--- a/swh/loader/core/loader.py
+++ b/swh/loader/core/loader.py
@@ -80,6 +80,9 @@
       lister_instance_name: Name of the lister instance which triggered this load.
         Must be None iff lister_name is, but it may be the empty string for listers
         with a single instance.
+      create_partial_snapshot: If True, :meth:`store_data` is called with
+        ``create_partial_snapshot=True`` while more data remains to be fetched, so
+        that it can record a partial visit targeting the snapshot stored so far.
     """
 
     visit_type: str
@@ -101,6 +104,7 @@
         lister_name: Optional[str] = None,
         lister_instance_name: Optional[str] = None,
         metadata_fetcher_credentials: CredentialsType = None,
+        create_partial_snapshot: bool = False,
     ):
         if lister_name == "":
             raise ValueError("lister_name must not be the empty string")
@@ -119,6 +123,7 @@
         self.lister_name = lister_name
         self.lister_instance_name = lister_instance_name
         self.metadata_fetcher_credentials = metadata_fetcher_credentials or {}
+        self.create_partial_snapshot = create_partial_snapshot
 
         if logging_class is None:
             logging_class = "%s.%s" % (
@@ -284,11 +289,14 @@
         """
         return True
 
-    def store_data(self) -> None:
+    def store_data(self, create_partial_snapshot: bool = False) -> None:
         """Store fetched data in the database.
 
-        Should call the :func:`maybe_load_xyz` methods, which handle the
-        bundles sent to storage, rather than send directly.
+        When the loader is configured with ``create_partial_snapshot``, this is called
+        with ``create_partial_snapshot=True`` while more data remains to be fetched.
+        Implementations may then record a "partial" visit targeting the snapshot stored
+        so far, which helps when a long ingestion crashes (out of disk, memory, ...).
+
         """
         raise NotImplementedError
 
@@ -411,7 +419,12 @@
                 t3 = time.monotonic()
                 total_time_process_data += t3 - t2
 
-                self.store_data()
+                if self.create_partial_snapshot:
+                    # Request a partial snapshot only while more data remains to fetch
+                    self.store_data(create_partial_snapshot=bool(more_data_to_fetch))
+                else:
+                    # Subclasses not opting in may still define store_data(self)
+                    self.store_data()
                 t4 = time.monotonic()
                 total_time_store_data += t4 - t3
                 if not more_data_to_fetch:
@@ -625,7 +638,7 @@
         """Whether the load was eventful"""
         raise NotImplementedError
 
-    def store_data(self) -> None:
+    def store_data(self, create_partial_snapshot: bool = False) -> None:
         assert self.origin
         if self.save_data_path:
             self.save_data()
@@ -647,8 +660,25 @@
         if self.has_releases():
             for release in self.get_releases():
                 self.storage.release_add([release])
+
         snapshot = self.get_snapshot()
         self.storage.snapshot_add([snapshot])
+
+        # More data remains to be fetched: record a "partial" visit status targeting
+        # this snapshot, so that it is kept if the ingestion does not complete.
+        if create_partial_snapshot:
+            # Flush first: the snapshot must be written before a visit targets it.
+            self.flush()
+            assert self.visit.visit is not None
+            visit_status = OriginVisitStatus(
+                origin=self.origin.url,
+                visit=self.visit.visit,
+                type=self.visit_type,
+                date=now(),
+                status="partial",
+                snapshot=snapshot.id,
+            )
+            self.storage.origin_visit_status_add([visit_status])
         self.flush()
         self.loaded_snapshot_id = snapshot.id
 
@@ -815,7 +845,7 @@
 
         return False  # no more data to process
 
-    def store_data(self) -> None:
+    def store_data(self, create_partial_snapshot: bool = False) -> None:
         """Store newly retrieved Content and Snapshot."""
         assert self.content is not None
         self.storage.content_add([self.content])
@@ -951,7 +981,7 @@
 
         return False  # no more data to process
 
-    def store_data(self) -> None:
+    def store_data(self, create_partial_snapshot: bool = False) -> None:
         """Store newly retrieved Content and Snapshot."""
         self.log.debug("Number of skipped contents: %s", len(self.skipped_cnts))
         self.storage.skipped_content_add(self.skipped_cnts)
diff --git a/swh/loader/core/tests/test_loader.py b/swh/loader/core/tests/test_loader.py
--- a/swh/loader/core/tests/test_loader.py
+++ b/swh/loader/core/tests/test_loader.py
@@ -22,7 +22,7 @@
 )
 from swh.loader.core.metadata_fetchers import MetadataFetcherProtocol
 from swh.loader.exception import NotFound, UnsupportedChecksumComputation
-from swh.loader.tests import assert_last_visit_matches
+from swh.loader.tests import assert_last_visit_matches, get_stats
 from swh.model.hashutil import hash_to_bytes
 from swh.model.model import (
     MetadataAuthority,
@@ -31,6 +31,8 @@
     Origin,
     RawExtrinsicMetadata,
     Snapshot,
+    SnapshotBranch,
+    TargetType,
 )
 import swh.storage.exc
 
@@ -102,7 +104,7 @@
 class DummyBaseLoader(DummyLoader, BaseLoader):
     """Buffered loader will send new data when threshold is reached"""
 
-    def store_data(self):
+    def store_data(self, create_partial_snapshot: bool = False):
         pass
 
 
@@ -134,6 +136,25 @@
         return [PARENT_ORIGIN]
 
 
+class DummyLoaderWithPartialSnapshot(DummyLoader, BaseLoader):
+    """Dummy loader configured to create partial snapshots."""
+
+    call = 0
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, create_partial_snapshot=True, **kwargs)
+
+    def fetch_data(self):
+        # more data remains to be fetched after the first batch only
+        return self.call == 0
+
+    def store_data(self, create_partial_snapshot: bool = False):
+        # A partial snapshot is only requested while more data remains to be
+        # fetched, i.e. for the first batch but not for the final one.
+        assert create_partial_snapshot is (self.call == 0)
+        self.call += 1
+
+
 def test_types():
     assert isinstance(
         DummyMetadataFetcher(None, None, None, None), MetadataFetcherProtocol
@@ -143,6 +164,16 @@
     )
 
 
+def test_dummy_loader_with_incremental_snapshots(swh_storage):
+    loader = DummyLoaderWithPartialSnapshot(swh_storage)
+    assert loader.create_partial_snapshot is True
+
+    result = loader.load()
+    assert result == {"status": "eventful"}
+    # both the intermediate and the final batch were stored
+    assert loader.call == 2
+
+
 def test_base_loader(swh_storage):
     loader = DummyBaseLoader(swh_storage)
     result = loader.load()
@@ -440,6 +471,63 @@
     )
 
 
+class DummyDVCSLoaderWithSnapshots(DummyDVCSLoader):
+    """Dummy DVCS loader which simulates one visit creating multiple snapshots
+    during ingestion."""
+
+    call = 0
+
+    def get_snapshot(self):
+        """Simulate a different built snapshot depending on the loader state."""
+        if self.call == 0:
+            return Snapshot(branches={})
+        # Another dummy snapshot
+        return Snapshot(
+            branches={
+                b"alias": SnapshotBranch(
+                    target=hash_to_bytes("0" * 40),
+                    target_type=TargetType.DIRECTORY,
+                )
+            }
+        )
+
+    def fetch_data(self) -> bool:
+        # Simulate a first batch with more data to fetch, then a final one
+        return self.call == 0
+
+    def store_data(self, create_partial_snapshot: bool = False) -> None:
+        """Store data as the DVCS loader does, then move on to the next batch (and
+        thus to a different snapshot)."""
+        super().store_data(create_partial_snapshot)
+        self.call += 1
+
+
+def test_dvcs_loader_ingestion_with_partial_snapshots(swh_storage):
+    loader = DummyDVCSLoaderWithSnapshots(
+        swh_storage, "dummy-url", create_partial_snapshot=True
+    )
+    result = loader.load()
+
+    # loading succeeded, in two batches
+    assert result == {"status": "eventful"}
+    assert loader.call == 2
+
+    expected_stats = {
+        "origin": 1,  # only 1 origin
+        "origin_visit": 1,  # with 1 visit
+        "snapshot": 1 + 1,  # but 2 snapshots
+    }
+    actual_stats = get_stats(swh_storage)
+    for key, value in expected_stats.items():
+        assert actual_stats[key] == value
+
+    # the intermediate snapshot is targeted by a "partial" visit status
+    visit_statuses = swh_storage.origin_visit_status_get("dummy-url", 1).results
+    assert [
+        status.snapshot for status in visit_statuses if status.status == "partial"
+    ] == [Snapshot(branches={}).id]
+
+
 class BrokenStorageProxy:
     def __init__(self, storage):
         self.storage = storage