Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/core/loader.py
# Copyright (C) 2015-2019 The Software Heritage developers | # Copyright (C) 2015-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import hashlib | import hashlib | ||||
import logging | import logging | ||||
import os | import os | ||||
import psycopg2 | import psycopg2 | ||||
import requests | import requests | ||||
import traceback | import traceback | ||||
import uuid | import uuid | ||||
from abc import ABCMeta, abstractmethod | from abc import ABCMeta, abstractmethod | ||||
from retrying import retry | from retrying import retry | ||||
from typing import Any, Dict, Optional, Tuple | from typing import Any, Dict, Iterable, Mapping, Optional, Tuple, Union | ||||
from swh.core import config | from swh.core import config | ||||
from swh.storage import get_storage, HashCollision | from swh.storage import get_storage, HashCollision | ||||
from swh.loader.core.converters import content_for_storage | |||||
def retry_loading(error): | def retry_loading(error): | ||||
"""Retry policy when the database raises an integrity error""" | """Retry policy when the database raises an integrity error""" | ||||
exception_classes = [ | exception_classes = [ | ||||
# raised when two parallel insertions insert the same data. | # raised when two parallel insertions insert the same data. | ||||
psycopg2.IntegrityError, | psycopg2.IntegrityError, | ||||
HashCollision, | HashCollision, | ||||
▲ Show 20 Lines • Show All 58 Lines • ▼ Show 20 Lines | class BufferedLoader(config.SWHConfig, metaclass=ABCMeta): | ||||
# Default runtime configuration; entries are (type, default) pairs
# consumed by swh.core.config's parser.
DEFAULT_CONFIG = {
    'storage': ('dict', {
        'cls': 'remote',
        'args': {'url': 'http://localhost:5002/'},
    }),
    'save_data': ('bool', False),
    'save_data_path': ('str', ''),
}  # type: Dict[str, Tuple[str, Any]]

# Extra configuration entries contributed by concrete loaders.
ADDITIONAL_CONFIG = {}  # type: Dict[str, Tuple[str, Any]]
def __init__(self, logging_class: Optional[str] = None,
             config: Optional[Dict[str, Any]] = None):
    """Instantiate the loader: resolve configuration, connect to
    storage, and set up logging and per-object-type counters.

    Args:
        logging_class: dotted name for the logger; defaults to the
            concrete class's module-qualified name.
        config: explicit configuration dict; when falsy, the
            configuration file is parsed instead (merged with
            ``ADDITIONAL_CONFIG``).
    """
    # Use None (not {}) as the default: a mutable default argument is
    # shared across calls. An explicit empty dict behaves the same as
    # before (falls through to the config file).
    if config:
        self.config = config
    else:
        self.config = self.parse_config_file(
            additional_configs=[self.ADDITIONAL_CONFIG])
    self.storage = get_storage(**self.config['storage'])
    if logging_class is None:
        logging_class = '%s.%s' % (self.__class__.__module__,
                                   self.__class__.__name__)
    self.log = logging.getLogger(logging_class)
    # Silence noisy per-connection logging from requests/urllib3.
    _log = logging.getLogger('requests.packages.urllib3.connectionpool')
    _log.setLevel(logging.WARN)
    # Tally of objects actually added to storage during this load.
    self.counters = {
        'contents': 0,
        'directories': 0,
        'revisions': 0,
        'releases': 0,
    }
    # 'max_content_size' is no longer part of DEFAULT_CONFIG, so a
    # plain self.config['max_content_size'] would raise KeyError for
    # loaders that do not define it; default to None (no size limit)
    # via .get(). TODO confirm downstream handling of None.
    self.max_content_size = self.config.get('max_content_size')
    # possibly overridden in self.prepare method
    self.visit_date: Optional[Union[str, datetime.datetime]] = None
    self.origin: Dict[str, Any] = {}
    self.visit_type: Optional[str] = None
    self.origin_metadata: Dict[str, Any] = {}
    # Make sure the config is sane: the save_data path must exist and
    # be readable/writable before any loading starts.
    save_data = self.config.get('save_data')
    if save_data:
        path = self.config['save_data_path']
        os.stat(path)
        if not os.access(path, os.R_OK | os.W_OK):
            raise PermissionError("Permission denied: %r" % path)
def save_data(self) -> None:
    """Save the data associated to the current load.

    Hook for subclasses that enable the 'save_data' configuration
    flag; the base class provides no implementation.
    """
    raise NotImplementedError
def get_save_data_path(self) -> str:
    """The path to which we archive the loader's raw data.

    The path is derived from the configured 'save_data_path', the
    sha1 of the origin URL and the visit year; it is created on
    first use and cached on the instance.
    """
    # BUG FIX: the previous check was hasattr(self, '__save_data_path'),
    # but inside a class body `self.__save_data_path = path` is
    # name-mangled to _<ClassName>__save_data_path, so the literal
    # '__save_data_path' attribute never existed and the path (and
    # os.makedirs call) was recomputed on every invocation. Use a
    # single-underscore name consistently so the cache actually works.
    if not hasattr(self, '_save_data_path'):
        year = str(self.visit_date.year)  # type: ignore
        url = self.origin['url'].encode('utf-8')
        origin_url_hash = hashlib.sha1(url).hexdigest()
        path = '%s/sha1:%s/%s/%s' % (
            self.config['save_data_path'],
            origin_url_hash[0:2],
            origin_url_hash,
            year,
        )
        os.makedirs(path, exist_ok=True)
        self._save_data_path = path
    return self._save_data_path
@retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
def send_origin(self, origin: Dict[str, Any]) -> None:
    """Record one origin in storage, with start/end debug logging."""
    log_id = str(uuid.uuid4())

    def _extra(phase):
        # Structured-logging payload shared by both messages.
        return {
            'swh_type': phase,
            'swh_content_type': 'origin',
            'swh_num': 1,
            'swh_id': log_id,
        }

    self.log.debug('Creating origin for %s' % origin['url'],
                   extra=_extra('storage_send_start'))
    self.storage.origin_add_one(origin)
    self.log.debug('Done creating origin for %s' % origin['url'],
                   extra=_extra('storage_send_end'))
@retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) | @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) | ||||
def send_origin_visit(self, visit_date, visit_type): | def send_origin_visit(self, visit_date: Union[str, datetime.datetime], | ||||
visit_type: str) -> Dict[str, Any]: | |||||
log_id = str(uuid.uuid4()) | log_id = str(uuid.uuid4()) | ||||
self.log.debug( | self.log.debug( | ||||
'Creating origin_visit for origin %s at time %s' % ( | 'Creating origin_visit for origin %s at time %s' % ( | ||||
self.origin['url'], visit_date), | self.origin['url'], visit_date), | ||||
extra={ | extra={ | ||||
'swh_type': 'storage_send_start', | 'swh_type': 'storage_send_start', | ||||
'swh_content_type': 'origin_visit', | 'swh_content_type': 'origin_visit', | ||||
'swh_num': 1, | 'swh_num': 1, | ||||
Show All 9 Lines | def send_origin_visit(self, visit_date: Union[str, datetime.datetime], | ||||
'swh_content_type': 'origin_visit', | 'swh_content_type': 'origin_visit', | ||||
'swh_num': 1, | 'swh_num': 1, | ||||
'swh_id': log_id | 'swh_id': log_id | ||||
}) | }) | ||||
return origin_visit | return origin_visit | ||||
@retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) | @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) | ||||
def send_tool(self, tool): | def send_tool(self, tool: Dict[str, Any]) -> None: | ||||
log_id = str(uuid.uuid4()) | log_id = str(uuid.uuid4()) | ||||
self.log.debug( | self.log.debug( | ||||
'Creating tool with name %s version %s configuration %s' % ( | 'Creating tool with name %s version %s configuration %s' % ( | ||||
tool['name'], tool['version'], tool['configuration']), | tool['name'], tool['version'], tool['configuration']), | ||||
extra={ | extra={ | ||||
'swh_type': 'storage_send_start', | 'swh_type': 'storage_send_start', | ||||
'swh_content_type': 'tool', | 'swh_content_type': 'tool', | ||||
'swh_num': 1, | 'swh_num': 1, | ||||
Show All 10 Lines | def send_tool(self, tool: Dict[str, Any]) -> None: | ||||
'swh_type': 'storage_send_end', | 'swh_type': 'storage_send_end', | ||||
'swh_content_type': 'tool', | 'swh_content_type': 'tool', | ||||
'swh_num': 1, | 'swh_num': 1, | ||||
'swh_id': log_id | 'swh_id': log_id | ||||
}) | }) | ||||
return tool_id | return tool_id | ||||
@retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) | @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) | ||||
def send_provider(self, provider): | def send_provider(self, provider: Dict[str, Any]) -> None: | ||||
log_id = str(uuid.uuid4()) | log_id = str(uuid.uuid4()) | ||||
self.log.debug( | self.log.debug( | ||||
'Creating metadata_provider with name %s type %s url %s' % ( | 'Creating metadata_provider with name %s type %s url %s' % ( | ||||
provider['provider_name'], provider['provider_type'], | provider['provider_name'], provider['provider_type'], | ||||
provider['provider_url']), | provider['provider_url']), | ||||
extra={ | extra={ | ||||
'swh_type': 'storage_send_start', | 'swh_type': 'storage_send_start', | ||||
'swh_content_type': 'metadata_provider', | 'swh_content_type': 'metadata_provider', | ||||
▲ Show 20 Lines • Show All 45 Lines • ▼ Show 20 Lines | def send_origin_metadata(self, visit_date, provider_id, | ||||
extra={ | extra={ | ||||
'swh_type': 'storage_send_end', | 'swh_type': 'storage_send_end', | ||||
'swh_content_type': 'origin_metadata', | 'swh_content_type': 'origin_metadata', | ||||
'swh_num': 1, | 'swh_num': 1, | ||||
'swh_id': log_id | 'swh_id': log_id | ||||
}) | }) | ||||
@retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
def update_origin_visit(self, status: str) -> None:
    """Update the current origin visit's status in storage."""
    log_id = str(uuid.uuid4())

    def _extra(phase):
        # Structured-logging payload shared by both messages.
        return {
            'swh_type': phase,
            'swh_content_type': 'origin_visit',
            'swh_num': 1,
            'swh_id': log_id,
        }

    self.log.debug(
        'Updating origin_visit for origin %s with status %s' % (
            self.origin['url'], status),
        extra=_extra('storage_send_start'))
    self.storage.origin_visit_update(
        self.origin['url'], self.visit, status)
    self.log.debug(
        'Done updating origin_visit for origin %s with status %s' % (
            self.origin['url'], status),
        extra=_extra('storage_send_end'))
@retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
def send_contents(self, contents: Iterable[Mapping[str, Any]]) -> None:
    """Actually send properly formatted contents to the database.

    """
    contents = list(contents)
    count = len(contents)
    if not contents:
        return
    log_id = str(uuid.uuid4())

    def _extra(phase):
        # Structured-logging payload shared by both messages.
        return {
            'swh_type': phase,
            'swh_content_type': 'content',
            'swh_num': count,
            'swh_id': log_id,
        }

    self.log.debug("Sending %d contents" % count,
                   extra=_extra('storage_send_start'))
    # FIXME: deal with this in model at some point
    result = self.storage.content_add([
        content_for_storage(
            c, max_content_size=self.max_content_size,
            origin_url=self.origin['url'])
        for c in contents
    ])
    self.counters['contents'] += result.get('content:add', 0)
    self.log.debug("Done sending %d contents" % count,
                   extra=_extra('storage_send_end'))
@retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
def send_directories(self,
                     directories: Iterable[Mapping[str, Any]]) -> None:
    """Actually send properly formatted directories to the database.

    """
    directories = list(directories)
    count = len(directories)
    if not directories:
        return
    log_id = str(uuid.uuid4())

    def _extra(phase):
        # Structured-logging payload shared by both messages.
        return {
            'swh_type': phase,
            'swh_content_type': 'directory',
            'swh_num': count,
            'swh_id': log_id,
        }

    self.log.debug("Sending %d directories" % count,
                   extra=_extra('storage_send_start'))
    result = self.storage.directory_add(directories)
    self.counters['directories'] += result.get('directory:add', 0)
    self.log.debug("Done sending %d directories" % count,
                   extra=_extra('storage_send_end'))
@retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
def send_revisions(self, revisions: Iterable[Mapping[str, Any]]) -> None:
    """Actually send properly formatted revisions to the database.

    """
    revisions = list(revisions)
    count = len(revisions)
    if not revisions:
        return
    log_id = str(uuid.uuid4())

    def _extra(phase):
        # Structured-logging payload shared by both messages.
        return {
            'swh_type': phase,
            'swh_content_type': 'revision',
            'swh_num': count,
            'swh_id': log_id,
        }

    self.log.debug("Sending %d revisions" % count,
                   extra=_extra('storage_send_start'))
    result = self.storage.revision_add(revisions)
    self.counters['revisions'] += result.get('revision:add', 0)
    self.log.debug("Done sending %d revisions" % count,
                   extra=_extra('storage_send_end'))
@retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
def send_releases(self, releases: Iterable[Mapping[str, Any]]) -> None:
    """Actually send properly formatted releases to the database.

    """
    releases = list(releases)
    count = len(releases)
    if not releases:
        return
    log_id = str(uuid.uuid4())

    def _extra(phase):
        # Structured-logging payload shared by both messages.
        return {
            'swh_type': phase,
            'swh_content_type': 'release',
            'swh_num': count,
            'swh_id': log_id,
        }

    self.log.debug("Sending %d releases" % count,
                   extra=_extra('storage_send_start'))
    result = self.storage.release_add(releases)
    self.counters['releases'] += result.get('release:add', 0)
    self.log.debug("Done sending %d releases" % count,
                   extra=_extra('storage_send_end'))
@retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
def send_snapshot(self, snapshot: Mapping[str, Any]) -> None:
    """Send the snapshot and attach it to the current origin visit."""
    # Ensure every object the snapshot references has reached storage
    # before the snapshot itself is recorded.
    self.flush()
    self.storage.snapshot_add([snapshot])
    self.storage.origin_visit_update(
        self.origin['url'], self.visit, snapshot=snapshot['id'])
def flush(self) -> None:
    """Flush any potential dangling data not sent to swh-storage.

    Bypass the maybe_load_* methods which awaits threshold reached
    signal. We actually want to store those as we are done
    loading.
    """
    # Only buffering storage implementations expose a flush() method;
    # plain storages have nothing to drain.
    if hasattr(self.storage, 'flush'):
        self.storage.flush()
def prepare_metadata(self) -> None:
    """First step for origin_metadata insertion, resolving the
    provider_id and the tool_id by fetching data from the storage
    or creating tool and provider on the fly if the data isn't available
    """
    metadata = self.origin_metadata
    tool = metadata['tool']
    try:
        tool['tool_id'] = self.send_tool(tool)
    except Exception:
        self.log.exception('Problem when storing new tool')
        raise
    provider = metadata['provider']
    try:
        provider['provider_id'] = self.send_provider(provider)
    except Exception:
        self.log.exception('Problem when storing new provider')
        raise
@abstractmethod
def cleanup(self) -> None:
    """Last step executed by the loader: release any state installed
    for the load."""
@abstractmethod
def prepare_origin_visit(self, *args, **kwargs) -> None:
    """First step executed by the loader to prepare origin and visit
    references. Implementations set/update self.origin, and
    optionally self.origin_url and self.visit_date.
    """
def _store_origin_visit(self): | def _store_origin_visit(self) -> None: | ||||
"""Store origin and visit references. Sets the self.origin_visit and | """Store origin and visit references. Sets the self.origin_visit and | ||||
self.visit references. | self.visit references. | ||||
""" | """ | ||||
origin = self.origin.copy() | origin = self.origin.copy() | ||||
self.send_origin(origin) | self.send_origin(origin) | ||||
if not self.visit_date: # now as default visit_date if not provided | if not self.visit_date: # now as default visit_date if not provided | ||||
self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) | self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
self.origin_visit = self.send_origin_visit( | self.origin_visit = self.send_origin_visit( | ||||
self.visit_date, self.visit_type) | self.visit_date, self.visit_type) | ||||
self.visit = self.origin_visit['visit'] | self.visit = self.origin_visit['visit'] | ||||
@abstractmethod
def prepare(self, *args, **kwargs) -> None:
    """Second step executed by the loader: set up any state the load
    itself will need.
    """
def get_origin(self) -> Dict[str, Any]:
    """Get the origin that is currently being loaded.
    self.origin should be set in :func:`prepare_origin`

    Returns:
        dict: an origin ready to be sent to storage by
        :func:`origin_add_one`.
    """
    return self.origin
@abstractmethod
def fetch_data(self) -> bool:
    """Fetch the data from the source the loader is currently loading
    (ex: git/hg/svn/... repository).

    Returns:
        a value that is interpreted as a boolean. If True, fetch_data needs
        to be called again to complete loading.
    """
@abstractmethod
def store_data(self):
    """Store fetched data in the database.

    Implementations should call the :func:`maybe_load_xyz` methods,
    which handle the bundles sent to storage, rather than send directly.
    """
def store_metadata(self) -> None:
    """Store fetched metadata in the database (no-op by default).

    For more information, see implementation in :class:`DepositLoader`.
    """
def load_status(self) -> Dict[str, str]:
    """Detailed loading status.

    Defaults to logging an eventful load. The returned dictionary is
    eventually passed back as the task's result to the scheduler,
    allowing tuning of the task recurrence mechanism.
    """
    return {'status': 'eventful'}
def post_load(self, success: bool = True) -> None:
    """Hook run after loading is done, allowing additional actions
    depending on the outcome.

    Defaults to doing nothing. Implementers must make sure their
    override does not break.

    Args:
        success (bool): the success status of the loading
    """
def visit_status(self) -> str:
    """Detailed visit status; defaults to logging a full visit."""
    return 'full'
def pre_cleanup(self) -> None:
    """As a first step, will try and check for dangling data to cleanup.
    This should do its best to avoid raising issues.
    """
def load(self, *args, **kwargs): | def load(self, *args, **kwargs) -> Dict[str, str]: | ||||
r"""Loading logic for the loader to follow: | r"""Loading logic for the loader to follow: | ||||
- 1. Call :meth:`prepare_origin_visit` to prepare the | - 1. Call :meth:`prepare_origin_visit` to prepare the | ||||
origin and visit we will associate loading data to | origin and visit we will associate loading data to | ||||
- 2. Store the actual ``origin_visit`` to storage | - 2. Store the actual ``origin_visit`` to storage | ||||
- 3. Call :meth:`prepare` to prepare any eventual state | - 3. Call :meth:`prepare` to prepare any eventual state | ||||
- 4. Call :meth:`get_origin` to get the origin we work with and store | - 4. Call :meth:`get_origin` to get the origin we work with and store | ||||
▲ Show 20 Lines • Show All 51 Lines • ▼ Show 20 Lines | class UnbufferedLoader(BufferedLoader): | ||||
:class:`BulkUpdater`. | :class:`BulkUpdater`. | ||||
For other loaders (stateful one, (e.g :class:`SWHSvnLoader`), | For other loaders (stateful one, (e.g :class:`SWHSvnLoader`), | ||||
inherit directly from :class:`BufferedLoader`. | inherit directly from :class:`BufferedLoader`. | ||||
""" | """ | ||||
# No loader-specific configuration entries by default.
ADDITIONAL_CONFIG = {}  # type: Dict[str, Tuple[str, Any]]
def cleanup(self) -> None:
    """Clean up an eventual state installed for computations."""
def has_contents(self) -> bool:
    """Checks whether we need to load contents."""
    return True
def get_contents(self) -> Iterable[Dict[str, Any]]:
    """Get the contents that need to be loaded (must be overridden)."""
    raise NotImplementedError
def has_directories(self) -> bool:
    """Checks whether we need to load directories."""
    return True
def get_directories(self) -> Iterable[Dict[str, Any]]:
    """Get the directories that need to be loaded (must be overridden)."""
    raise NotImplementedError
def has_revisions(self) -> bool:
    """Checks whether we need to load revisions."""
    return True
def get_revisions(self) -> Iterable[Dict[str, Any]]:
    """Get the revisions that need to be loaded (must be overridden)."""
    raise NotImplementedError
def has_releases(self) -> bool:
    """Checks whether we need to load releases."""
    return True
def get_releases(self) -> Iterable[Dict[str, Any]]:
    """Get the releases that need to be loaded (must be overridden)."""
    raise NotImplementedError
def get_snapshot(self) -> Dict[str, Any]:
    """Get the snapshot that needs to be loaded (must be overridden)."""
    raise NotImplementedError
def eventful(self) -> bool:
    """Whether the load was eventful (must be overridden)."""
    raise NotImplementedError
def store_data(self) -> None:
    """Store fetched objects: optionally archive the raw data, then
    send contents, directories, revisions and releases, and finally
    the snapshot that ties them together.
    """
    if self.config['save_data']:
        self.save_data()
    # Each object type is sent only when the loader reports it has
    # something to load for that type.
    stages = (
        (self.has_contents, self.get_contents, self.send_contents),
        (self.has_directories, self.get_directories,
         self.send_directories),
        (self.has_revisions, self.get_revisions, self.send_revisions),
        (self.has_releases, self.get_releases, self.send_releases),
    )
    for has, get, send in stages:
        if has():
            send(get())
    self.send_snapshot(self.get_snapshot())
    self.flush()