diff --git a/swh/loader/core/loader.py b/swh/loader/core/loader.py --- a/swh/loader/core/loader.py +++ b/swh/loader/core/loader.py @@ -14,10 +14,11 @@ from abc import ABCMeta, abstractmethod from retrying import retry -from typing import Any, Dict, Optional, Tuple +from typing import Any, Dict, Iterable, Mapping, Optional, Tuple, Union from swh.core import config from swh.storage import get_storage, HashCollision +from swh.loader.core.converters import content_for_storage def retry_loading(error): @@ -92,6 +93,7 @@ } }), + 'max_content_size': ('int', 100 * 1024 * 1024), 'save_data': ('bool', False), 'save_data_path': ('str', ''), @@ -99,7 +101,8 @@ ADDITIONAL_CONFIG = {} # type: Dict[str, Tuple[str, Any]] - def __init__(self, logging_class=None, config=None): + def __init__(self, logging_class: Optional[str] = None, + config: Optional[Dict[str, Any]] = None): if config: self.config = config else: @@ -122,6 +125,13 @@ 'revisions': 0, 'releases': 0, } + self.max_content_size = self.config['max_content_size'] + + # possibly overridden in self.prepare method + self.visit_date: Optional[Union[str, datetime.datetime]] = None + self.origin: Dict[str, Any] = {} + self.visit_type: Optional[str] = None + self.origin_metadata: Dict[str, Any] = {} # Make sure the config is sane save_data = self.config.get('save_data') @@ -131,14 +141,14 @@ if not os.access(path, os.R_OK | os.W_OK): raise PermissionError("Permission denied: %r" % path) - def save_data(self): + def save_data(self) -> None: """Save the data associated to the current load""" raise NotImplementedError - def get_save_data_path(self): + def get_save_data_path(self) -> str: """The path to which we archive the loader's raw data""" if not hasattr(self, '__save_data_path'): - year = str(self.visit_date.year) + year = str(self.visit_date.year) # type: ignore url = self.origin['url'].encode('utf-8') origin_url_hash = hashlib.sha1(url).hexdigest() @@ -156,7 +166,7 @@ return self.__save_data_path @retry(retry_on_exception=retry_loading, 
stop_max_attempt_number=3) - def send_origin(self, origin): + def send_origin(self, origin: Dict[str, Any]) -> None: log_id = str(uuid.uuid4()) self.log.debug('Creating origin for %s' % origin['url'], extra={ @@ -175,7 +185,8 @@ }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_origin_visit(self, visit_date, visit_type): + def send_origin_visit(self, visit_date: Union[str, datetime.datetime], + visit_type: str) -> Dict[str, Any]: log_id = str(uuid.uuid4()) self.log.debug( 'Creating origin_visit for origin %s at time %s' % ( @@ -201,7 +212,7 @@ return origin_visit @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_tool(self, tool): + def send_tool(self, tool: Dict[str, Any]) -> Any: log_id = str(uuid.uuid4()) self.log.debug( 'Creating tool with name %s version %s configuration %s' % ( @@ -228,7 +239,7 @@ return tool_id @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_provider(self, provider): + def send_provider(self, provider: Dict[str, Any]) -> None: log_id = str(uuid.uuid4()) self.log.debug( 'Creating metadata_provider with name %s type %s url %s' % ( @@ -290,7 +301,7 @@ }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def update_origin_visit(self, status): + def update_origin_visit(self, status: str) -> None: log_id = str(uuid.uuid4()) self.log.debug( 'Updating origin_visit for origin %s with status %s' % ( @@ -314,11 +325,12 @@ }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_contents(self, content_list): + def send_contents(self, contents: Iterable[Mapping[str, Any]]) -> None: """Actually send properly formatted contents to the database. 
""" - num_contents = len(content_list) + contents = list(contents) + num_contents = len(contents) if num_contents > 0: log_id = str(uuid.uuid4()) self.log.debug("Sending %d contents" % num_contents, @@ -328,7 +340,13 @@ 'swh_num': num_contents, 'swh_id': log_id, }) - result = self.storage.content_add(content_list) + # FIXME: deal with this in model at some point + result = self.storage.content_add([ + content_for_storage( + c, max_content_size=self.max_content_size, + origin_url=self.origin['url']) + for c in contents + ]) self.counters['contents'] += result.get('content:add', 0) self.log.debug("Done sending %d contents" % num_contents, extra={ @@ -339,11 +357,13 @@ }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_directories(self, directory_list): + def send_directories(self, + directories: Iterable[Mapping[str, Any]]) -> None: """Actually send properly formatted directories to the database. """ - num_directories = len(directory_list) + directories = list(directories) + num_directories = len(directories) if num_directories > 0: log_id = str(uuid.uuid4()) self.log.debug("Sending %d directories" % num_directories, @@ -353,7 +373,7 @@ 'swh_num': num_directories, 'swh_id': log_id, }) - result = self.storage.directory_add(directory_list) + result = self.storage.directory_add(directories) self.counters['directories'] += result.get('directory:add', 0) self.log.debug("Done sending %d directories" % num_directories, extra={ @@ -364,11 +384,12 @@ }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_revisions(self, revision_list): + def send_revisions(self, revisions: Iterable[Mapping[str, Any]]) -> None: """Actually send properly formatted revisions to the database. 
""" - num_revisions = len(revision_list) + revisions = list(revisions) + num_revisions = len(revisions) if num_revisions > 0: log_id = str(uuid.uuid4()) self.log.debug("Sending %d revisions" % num_revisions, @@ -378,7 +399,7 @@ 'swh_num': num_revisions, 'swh_id': log_id, }) - result = self.storage.revision_add(revision_list) + result = self.storage.revision_add(revisions) self.counters['revisions'] += result.get('revision:add', 0) self.log.debug("Done sending %d revisions" % num_revisions, extra={ @@ -389,11 +410,12 @@ }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_releases(self, release_list): + def send_releases(self, releases: Iterable[Mapping[str, Any]]) -> None: """Actually send properly formatted releases to the database. """ - num_releases = len(release_list) + releases = list(releases) + num_releases = len(releases) if num_releases > 0: log_id = str(uuid.uuid4()) self.log.debug("Sending %d releases" % num_releases, @@ -403,7 +425,7 @@ 'swh_num': num_releases, 'swh_id': log_id, }) - result = self.storage.release_add(release_list) + result = self.storage.release_add(releases) self.counters['releases'] += result.get('release:add', 0) self.log.debug("Done sending %d releases" % num_releases, extra={ @@ -414,13 +436,13 @@ }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_snapshot(self, snapshot): + def send_snapshot(self, snapshot: Mapping[str, Any]) -> None: self.flush() # to ensure the snapshot targets existing objects self.storage.snapshot_add([snapshot]) self.storage.origin_visit_update( self.origin['url'], self.visit, snapshot=snapshot['id']) - def flush(self): + def flush(self) -> None: """Flush any potential dangling data not sent to swh-storage. 
Bypass the maybe_load_* methods which awaits threshold reached @@ -431,7 +453,7 @@ if hasattr(self.storage, 'flush'): self.storage.flush() - def prepare_metadata(self): + def prepare_metadata(self) -> None: """First step for origin_metadata insertion, resolving the provider_id and the tool_id by fetching data from the storage or creating tool and provider on the fly if the data isn't available @@ -456,14 +478,14 @@ raise @abstractmethod - def cleanup(self): + def cleanup(self) -> None: """Last step executed by the loader. """ pass @abstractmethod - def prepare_origin_visit(self, *args, **kwargs): + def prepare_origin_visit(self, *args, **kwargs) -> None: """First step executed by the loader to prepare origin and visit references. Set/update self.origin, and optionally self.origin_url, self.visit_date. @@ -471,7 +493,7 @@ """ pass - def _store_origin_visit(self): + def _store_origin_visit(self) -> None: """Store origin and visit references. Sets the self.origin_visit and self.visit references. @@ -486,14 +508,14 @@ self.visit = self.origin_visit['visit'] @abstractmethod - def prepare(self, *args, **kwargs): + def prepare(self, *args, **kwargs) -> None: """Second step executed by the loader to prepare some state needed by the loader. """ pass - def get_origin(self): + def get_origin(self) -> Dict[str, Any]: """Get the origin that is currently being loaded. self.origin should be set in :func:`prepare_origin` @@ -504,7 +526,7 @@ return self.origin @abstractmethod - def fetch_data(self): + def fetch_data(self) -> bool: """Fetch the data from the source the loader is currently loading (ex: git/hg/svn/... repository). @@ -524,14 +546,14 @@ """ pass - def store_metadata(self): + def store_metadata(self) -> None: """Store fetched metadata in the database. For more information, see implementation in :class:`DepositLoader`. """ pass - def load_status(self): + def load_status(self) -> Dict[str, str]: """Detailed loading status. Defaults to logging an eventful load. 
@@ -544,7 +566,7 @@ 'status': 'eventful', } - def post_load(self, success=True): + def post_load(self, success: bool = True) -> None: """Permit the loader to do some additional actions according to status after the loading is done. The flag success indicates the loading's status. @@ -560,21 +582,21 @@ """ pass - def visit_status(self): + def visit_status(self) -> str: """Detailed visit status. Defaults to logging a full visit. """ return 'full' - def pre_cleanup(self): + def pre_cleanup(self) -> None: """As a first step, will try and check for dangling data to cleanup. This should do its best to avoid raising issues. """ pass - def load(self, *args, **kwargs): + def load(self, *args, **kwargs) -> Dict[str, str]: r"""Loading logic for the loader to follow: - 1. Call :meth:`prepare_origin_visit` to prepare the @@ -642,59 +664,51 @@ """ ADDITIONAL_CONFIG = {} # type: Dict[str, Tuple[str, Any]] - def __init__(self, logging_class=None, config=None): - super().__init__(logging_class=logging_class, config=config) - self.visit_date = None # possibly overridden in self.prepare method - - def cleanup(self): + def cleanup(self) -> None: """Clean up an eventual state installed for computations.""" pass - def has_contents(self): + def has_contents(self) -> bool: """Checks whether we need to load contents""" return True - def get_contents(self): + def get_contents(self) -> Iterable[Dict[str, Any]]: """Get the contents that need to be loaded""" raise NotImplementedError - def has_directories(self): + def has_directories(self) -> bool: """Checks whether we need to load directories""" return True - def get_directories(self): + def get_directories(self) -> Iterable[Dict[str, Any]]: """Get the directories that need to be loaded""" raise NotImplementedError - def has_revisions(self): + def has_revisions(self) -> bool: """Checks whether we need to load revisions""" return True - def get_revisions(self): + def get_revisions(self) -> Iterable[Dict[str, Any]]: """Get the revisions that 
need to be loaded""" raise NotImplementedError - def has_releases(self): + def has_releases(self) -> bool: """Checks whether we need to load releases""" return True - def get_releases(self): + def get_releases(self) -> Iterable[Dict[str, Any]]: """Get the releases that need to be loaded""" raise NotImplementedError - def get_snapshot(self): + def get_snapshot(self) -> Dict[str, Any]: """Get the snapshot that needs to be loaded""" raise NotImplementedError - def eventful(self): + def eventful(self) -> bool: """Whether the load was eventful""" raise NotImplementedError - def save_data(self): - """Save the data associated to the current load""" - raise NotImplementedError - - def store_data(self): + def store_data(self) -> None: if self.config['save_data']: self.save_data() @@ -707,3 +721,4 @@ if self.has_releases(): self.send_releases(self.get_releases()) self.send_snapshot(self.get_snapshot()) + self.flush() diff --git a/swh/loader/core/tests/test_loader.py b/swh/loader/core/tests/test_loader.py --- a/swh/loader/core/tests/test_loader.py +++ b/swh/loader/core/tests/test_loader.py @@ -45,6 +45,7 @@ """ def parse_config_file(self, *args, **kwargs): return { + 'max_content_size': 100 * 1024 * 1024, 'storage': { 'cls': 'pipeline', 'steps': [ @@ -65,6 +66,7 @@ """ def parse_config_file(self, *args, **kwargs): return { + 'max_content_size': 100 * 1024 * 1024, 'storage': { 'cls': 'pipeline', 'steps': [