Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/core/loader.py
Show First 20 Lines • Show All 154 Lines • ▼ Show 20 Lines | def prepare_origin_visit(self, *args, **kwargs) -> None: | ||||
"""First step executed by the loader to prepare origin and visit | """First step executed by the loader to prepare origin and visit | ||||
references. Set/update self.origin, and | references. Set/update self.origin, and | ||||
optionally self.origin_url, self.visit_date. | optionally self.origin_url, self.visit_date. | ||||
""" | """ | ||||
pass | pass | ||||
def _store_origin_visit(self) -> None: | def _store_origin_visit(self) -> None: | ||||
"""Store origin and visit references. Sets the self.origin_visit and | """Store origin and visit references. Sets the self.visit references. | ||||
self.visit references. | |||||
""" | """ | ||||
assert self.origin | assert self.origin | ||||
self.storage.origin_add_one(self.origin) | self.storage.origin_add_one(self.origin) | ||||
if not self.visit_date: # now as default visit_date if not provided | if not self.visit_date: # now as default visit_date if not provided | ||||
self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) | self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
self.origin_visit = self.storage.origin_visit_add( | self.visit = self.storage.origin_visit_add( | ||||
self.origin.url, self.visit_date, self.visit_type) | self.origin.url, self.visit_date, self.visit_type) | ||||
self.visit = self.origin_visit['visit'] | |||||
@abstractmethod | @abstractmethod | ||||
def prepare(self, *args, **kwargs) -> None: | def prepare(self, *args, **kwargs) -> None: | ||||
"""Second step executed by the loader to prepare some state needed by | """Second step executed by the loader to prepare some state needed by | ||||
the loader. | the loader. | ||||
""" | """ | ||||
pass | pass | ||||
▲ Show 20 Lines • Show All 115 Lines • ▼ Show 20 Lines | def load(self, *args, **kwargs) -> Dict[str, str]: | ||||
while True: | while True: | ||||
more_data_to_fetch = self.fetch_data() | more_data_to_fetch = self.fetch_data() | ||||
self.store_data() | self.store_data() | ||||
if not more_data_to_fetch: | if not more_data_to_fetch: | ||||
break | break | ||||
self.store_metadata() | self.store_metadata() | ||||
self.storage.origin_visit_update( | self.storage.origin_visit_update( | ||||
self.origin.url, self.visit, self.visit_status() | self.origin.url, self.visit.visit, self.visit_status() | ||||
) | ) | ||||
self.post_load() | self.post_load() | ||||
except Exception: | except Exception: | ||||
self.log.exception('Loading failure, updating to `partial` status', | self.log.exception('Loading failure, updating to `partial` status', | ||||
extra={ | extra={ | ||||
'swh_task_args': args, | 'swh_task_args': args, | ||||
'swh_task_kwargs': kwargs, | 'swh_task_kwargs': kwargs, | ||||
}) | }) | ||||
self.storage.origin_visit_update( | self.storage.origin_visit_update( | ||||
self.origin.url, self.visit, 'partial' | self.origin.url, self.visit.visit, 'partial' | ||||
) | ) | ||||
self.post_load(success=False) | self.post_load(success=False) | ||||
return {'status': 'failed'} | return {'status': 'failed'} | ||||
finally: | finally: | ||||
self.flush() | self.flush() | ||||
self.cleanup() | self.cleanup() | ||||
return self.load_status() | return self.load_status() | ||||
▲ Show 20 Lines • Show All 77 Lines • ▼ Show 20 Lines | def store_data(self) -> None: | ||||
if self.has_revisions(): | if self.has_revisions(): | ||||
self.storage.revision_add(self.get_revisions()) | self.storage.revision_add(self.get_revisions()) | ||||
if self.has_releases(): | if self.has_releases(): | ||||
self.storage.release_add(self.get_releases()) | self.storage.release_add(self.get_releases()) | ||||
self.flush() # to ensure the snapshot targets existing objects | self.flush() # to ensure the snapshot targets existing objects | ||||
snapshot = self.get_snapshot() | snapshot = self.get_snapshot() | ||||
self.storage.snapshot_add([snapshot]) | self.storage.snapshot_add([snapshot]) | ||||
self.storage.origin_visit_update( | self.storage.origin_visit_update( | ||||
self.origin.url, self.visit, snapshot=snapshot.id) | self.origin.url, self.visit.visit, snapshot=snapshot.id) | ||||
self.flush() | self.flush() |