Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/core/loader.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import hashlib | import hashlib | ||||
import logging | import logging | ||||
import os | import os | ||||
from abc import ABCMeta, abstractmethod | from abc import ABCMeta, abstractmethod | ||||
from typing import Any, Dict, Iterable, Optional, Tuple, Union | from typing import Any, Dict, Iterable, Optional, Tuple, Union | ||||
from swh.core import config | from swh.core import config | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseContent, Content, SkippedContent, Directory, Origin, Revision, | BaseContent, Content, SkippedContent, Directory, Origin, Revision, | ||||
Release, Snapshot) | Release, Sha1Git, Snapshot | ||||
) | |||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
class BaseLoader(config.SWHConfig, metaclass=ABCMeta): | class BaseLoader(config.SWHConfig, metaclass=ABCMeta): | ||||
"""Mixin base class for loader. | """Mixin base class for loader. | ||||
To use this class, you must: | To use this class, you must: | ||||
▲ Show 20 Lines • Show All 223 Lines • ▼ Show 20 Lines | class BaseLoader(config.SWHConfig, metaclass=ABCMeta): | ||||
def visit_status(self) -> str: | def visit_status(self) -> str: | ||||
"""Detailed visit status. | """Detailed visit status. | ||||
Defaults to logging a full visit. | Defaults to logging a full visit. | ||||
""" | """ | ||||
return 'full' | return 'full' | ||||
def get_snapshot_id(self) -> Optional[Sha1Git]: | |||||
"""Get the snapshot id that needs to be loaded""" | |||||
raise NotImplementedError | |||||
def pre_cleanup(self) -> None: | def pre_cleanup(self) -> None: | ||||
"""As a first step, will try and check for dangling data to cleanup. | """As a first step, will try and check for dangling data to cleanup. | ||||
This should do its best to avoid raising issues. | This should do its best to avoid raising issues. | ||||
""" | """ | ||||
pass | pass | ||||
def load(self, *args, **kwargs) -> Dict[str, str]: | def load(self, *args, **kwargs) -> Dict[str, str]: | ||||
Show All 31 Lines | def load(self, *args, **kwargs) -> Dict[str, str]: | ||||
while True: | while True: | ||||
more_data_to_fetch = self.fetch_data() | more_data_to_fetch = self.fetch_data() | ||||
self.store_data() | self.store_data() | ||||
if not more_data_to_fetch: | if not more_data_to_fetch: | ||||
break | break | ||||
self.store_metadata() | self.store_metadata() | ||||
self.storage.origin_visit_update( | self.storage.origin_visit_update( | ||||
self.origin.url, self.visit.visit, self.visit_status() | self.origin.url, self.visit.visit, self.visit_status(), | ||||
snapshot=self.get_snapshot_id() | |||||
) | ) | ||||
self.post_load() | self.post_load() | ||||
except Exception: | except Exception: | ||||
self.log.exception('Loading failure, updating to `partial` status', | self.log.exception('Loading failure, updating to `partial` status', | ||||
extra={ | extra={ | ||||
'swh_task_args': args, | 'swh_task_args': args, | ||||
'swh_task_kwargs': kwargs, | 'swh_task_kwargs': kwargs, | ||||
}) | }) | ||||
self.storage.origin_visit_update( | self.storage.origin_visit_update( | ||||
self.origin.url, self.visit.visit, 'partial' | self.origin.url, self.visit.visit, 'partial', | ||||
snapshot=self.get_snapshot_id() | |||||
) | ) | ||||
self.post_load(success=False) | self.post_load(success=False) | ||||
return {'status': 'failed'} | return {'status': 'failed'} | ||||
finally: | finally: | ||||
self.flush() | self.flush() | ||||
self.cleanup() | self.cleanup() | ||||
return self.load_status() | return self.load_status() | ||||
▲ Show 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | class DVCSLoader(BaseLoader): | ||||
def get_releases(self) -> Iterable[Release]: | def get_releases(self) -> Iterable[Release]: | ||||
"""Get the releases that need to be loaded""" | """Get the releases that need to be loaded""" | ||||
raise NotImplementedError | raise NotImplementedError | ||||
def get_snapshot(self) -> Snapshot: | def get_snapshot(self) -> Snapshot: | ||||
"""Get the snapshot that needs to be loaded""" | """Get the snapshot that needs to be loaded""" | ||||
raise NotImplementedError | raise NotImplementedError | ||||
def get_snapshot_id(self) -> Optional[Sha1Git]: | |||||
snapshot = self.get_snapshot() | |||||
return snapshot.id if snapshot else None | |||||
def eventful(self) -> bool: | def eventful(self) -> bool: | ||||
"""Whether the load was eventful""" | """Whether the load was eventful""" | ||||
raise NotImplementedError | raise NotImplementedError | ||||
def store_data(self) -> None: | def store_data(self) -> None: | ||||
assert self.origin | assert self.origin | ||||
if self.config['save_data']: | if self.config['save_data']: | ||||
self.save_data() | self.save_data() | ||||
Show All 25 Lines |