diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,6 @@ # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html vcversioner -retrying psutil requests iso8601 diff --git a/swh/loader/core/loader.py b/swh/loader/core/loader.py --- a/swh/loader/core/loader.py +++ b/swh/loader/core/loader.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2019 The Software Heritage developers +# Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -7,50 +7,16 @@ import hashlib import logging import os -import psycopg2 -import requests -import traceback -import uuid from abc import ABCMeta, abstractmethod -from retrying import retry -from typing import Any, Dict, Iterable, Mapping, Optional, Tuple, Union +from typing import Any, Dict, Iterable, Optional, Tuple, Union from swh.core import config -from swh.storage import get_storage, HashCollision +from swh.storage import get_storage from swh.loader.core.converters import content_for_storage -def retry_loading(error): - """Retry policy when the database raises an integrity error""" - exception_classes = [ - # raised when two parallel insertions insert the same data. - psycopg2.IntegrityError, - HashCollision, - # raised when uWSGI restarts and hungs up on the worker. - requests.exceptions.ConnectionError, - ] - - if not any(isinstance(error, exc) for exc in exception_classes): - return False - - logger = logging.getLogger('swh.loader') - - error_name = error.__module__ + '.' + error.__class__.__name__ - logger.warning('Retry loading a batch', exc_info=False, extra={ - 'swh_type': 'storage_retry', - 'swh_exception_type': error_name, - 'swh_exception': traceback.format_exception( - error.__class__, - error, - error.__traceback__, - ), - }) - - return True - - -class BufferedLoader(config.SWHConfig, metaclass=ABCMeta): +class BaseLoader(config.SWHConfig, metaclass=ABCMeta): """Mixin base class for loader. To use this class, you must: @@ -119,12 +85,6 @@ _log = logging.getLogger('requests.packages.urllib3.connectionpool') _log.setLevel(logging.WARN) - self.counters = { - 'contents': 0, - 'directories': 0, - 'revisions': 0, - 'releases': 0, - } self.max_content_size = self.config['max_content_size'] # possibly overridden in self.prepare method @@ -169,194 +129,6 @@ return self.__save_data_path - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_origin(self, origin: Dict[str, Any]) -> None: - log_id = str(uuid.uuid4()) - self.log.debug('Creating origin for %s' % origin['url'], - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'origin', - 'swh_num': 1, - 'swh_id': log_id - }) - self.storage.origin_add_one(origin) - self.log.debug('Done creating origin for %s' % origin['url'], - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'origin', - 'swh_num': 1, - 'swh_id': log_id - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_origin_visit(self, visit_date: Union[str, datetime.datetime], - visit_type: str) -> Dict[str, Any]: - log_id = str(uuid.uuid4()) - self.log.debug( - 'Creating origin_visit for origin %s at time %s' % ( - self.origin['url'], visit_date), - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'origin_visit', - 'swh_num': 1, - 'swh_id': log_id - }) - origin_visit = self.storage.origin_visit_add( - self.origin['url'], visit_date, visit_type) - self.log.debug( - 'Done Creating %s origin_visit for origin %s at time %s' % ( - visit_type, self.origin['url'], visit_date), - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'origin_visit', - 'swh_num': 1, - 'swh_id': log_id - }) - - return origin_visit - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def update_origin_visit(self, status: str) -> None: - log_id = str(uuid.uuid4()) - self.log.debug( - 'Updating origin_visit for origin %s with status %s' % ( - self.origin['url'], status), - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'origin_visit', - 'swh_num': 1, - 'swh_id': log_id - }) - self.storage.origin_visit_update( - self.origin['url'], self.visit, status) - self.log.debug( - 'Done updating origin_visit for origin %s with status %s' % ( - self.origin['url'], status), - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'origin_visit', - 'swh_num': 1, - 'swh_id': log_id - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_contents(self, contents: Iterable[Mapping[str, Any]]) -> None: - """Actually send properly formatted contents to the database. - - """ - contents = list(contents) - num_contents = len(contents) - if num_contents > 0: - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d contents" % num_contents, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'content', - 'swh_num': num_contents, - 'swh_id': log_id, - }) - # FIXME: deal with this in model at some point - result = self.storage.content_add([ - content_for_storage( - c, max_content_size=self.max_content_size, - origin_url=self.origin['url']) - for c in contents - ]) - self.counters['contents'] += result.get('content:add', 0) - self.log.debug("Done sending %d contents" % num_contents, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'content', - 'swh_num': num_contents, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_directories(self, - directories: Iterable[Mapping[str, Any]]) -> None: - """Actually send properly formatted directories to the database. - - """ - directories = list(directories) - num_directories = len(directories) - if num_directories > 0: - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d directories" % num_directories, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'directory', - 'swh_num': num_directories, - 'swh_id': log_id, - }) - result = self.storage.directory_add(directories) - self.counters['directories'] += result.get('directory:add', 0) - self.log.debug("Done sending %d directories" % num_directories, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'directory', - 'swh_num': num_directories, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_revisions(self, revisions: Iterable[Mapping[str, Any]]) -> None: - """Actually send properly formatted revisions to the database. - - """ - revisions = list(revisions) - num_revisions = len(revisions) - if num_revisions > 0: - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d revisions" % num_revisions, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'revision', - 'swh_num': num_revisions, - 'swh_id': log_id, - }) - result = self.storage.revision_add(revisions) - self.counters['revisions'] += result.get('revision:add', 0) - self.log.debug("Done sending %d revisions" % num_revisions, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'revision', - 'swh_num': num_revisions, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_releases(self, releases: Iterable[Mapping[str, Any]]) -> None: - """Actually send properly formatted releases to the database. - - """ - releases = list(releases) - num_releases = len(releases) - if num_releases > 0: - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d releases" % num_releases, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'release', - 'swh_num': num_releases, - 'swh_id': log_id, - }) - result = self.storage.release_add(releases) - self.counters['releases'] += result.get('release:add', 0) - self.log.debug("Done sending %d releases" % num_releases, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'release', - 'swh_num': num_releases, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_snapshot(self, snapshot: Mapping[str, Any]) -> None: - self.flush() # to ensure the snapshot targets existing objects - self.storage.snapshot_add([snapshot]) - self.storage.origin_visit_update( - self.origin['url'], self.visit, snapshot=snapshot['id']) - def flush(self) -> None: """Flush any potential dangling data not sent to swh-storage. @@ -390,12 +162,12 @@ """ origin = self.origin.copy() - self.send_origin(origin) + self.storage.origin_add_one(origin) if not self.visit_date: # now as default visit_date if not provided self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) - self.origin_visit = self.send_origin_visit( - self.visit_date, self.visit_type) + self.origin_visit = self.storage.origin_visit_add( + origin['url'], self.visit_date, self.visit_type) self.visit = self.origin_visit['visit'] @abstractmethod @@ -524,7 +296,9 @@ break self.store_metadata() - self.update_origin_visit(status=self.visit_status()) + self.storage.origin_visit_update( + self.origin['url'], self.visit, self.visit_status() + ) self.post_load() except Exception: self.log.exception('Loading failure, updating to `partial` status', @@ -532,7 +306,9 @@ 'swh_task_args': args, 'swh_task_kwargs': kwargs, }) - self.update_origin_visit(status='partial') + self.storage.origin_visit_update( + self.origin['url'], self.visit, 'partial' + ) self.post_load(success=False) return {'status': 'failed'} finally: @@ -542,15 +318,18 @@ return self.load_status() -class UnbufferedLoader(BufferedLoader): - """This base class is a pattern for unbuffered loaders. +# retro-compatibility +BufferedLoader = BaseLoader + + +class DVCSLoader(BaseLoader): + """This base class is a pattern for dvcs loaders (e.g. git, mercurial). - UnbufferedLoader loaders are able to load all the data in one go. For - example, the loader defined in swh-loader-git - :class:`BulkUpdater`. + Those loaders are able to load all the data in one go. For example, the + loader defined in swh-loader-git :class:`BulkUpdater`. For other loaders (stateful one, (e.g :class:`SWHSvnLoader`), - inherit directly from :class:`BufferedLoader`. + inherit directly from :class:`BaseLoader`. """ ADDITIONAL_CONFIG = {} # type: Dict[str, Tuple[str, Any]] @@ -604,12 +383,25 @@ self.save_data() if self.has_contents(): - self.send_contents(self.get_contents()) + self.storage.content_add([ + content_for_storage( + c, max_content_size=self.max_content_size, + origin_url=self.origin['url']) + for c in self.get_contents() + ]) if self.has_directories(): - self.send_directories(self.get_directories()) + self.storage.directory_add(self.get_directories()) if self.has_revisions(): - self.send_revisions(self.get_revisions()) + self.storage.revision_add(self.get_revisions()) if self.has_releases(): - self.send_releases(self.get_releases()) - self.send_snapshot(self.get_snapshot()) + self.storage.release_add(self.get_releases()) + self.flush() # to ensure the snapshot targets existing objects + snapshot = self.get_snapshot() + self.storage.snapshot_add([snapshot]) + self.storage.origin_visit_update( + self.origin['url'], self.visit, snapshot=snapshot['id']) self.flush() + + +# Deprecated name +UnbufferedLoader = DVCSLoader diff --git a/swh/loader/core/tests/test_loader.py b/swh/loader/core/tests/test_loader.py --- a/swh/loader/core/tests/test_loader.py +++ b/swh/loader/core/tests/test_loader.py @@ -1,4 +1,4 @@ -# Copyright (C) 2018-2019 The Software Heritage developers +# Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -8,11 +8,7 @@ import logging import pytest -from swh.model.hashutil import hash_to_bytes - -from swh.loader.core.loader import BufferedLoader, UnbufferedLoader - -from . import BaseLoaderTest +from swh.loader.core.loader import BaseLoader, DVCSLoader class DummyLoader: @@ -29,8 +25,7 @@ pass def prepare_origin_visit(self, *args, **kwargs): - origin = self.storage.origin_get( - self._test_prepare_origin_visit_data['origin']) + origin = {'url': 'some-url'} self.origin = origin self.origin_url = origin['url'] self.visit_date = datetime.datetime.utcnow() @@ -39,7 +34,7 @@ self.visit_type) -class DummyUnbufferedLoader(DummyLoader, UnbufferedLoader): +class DummyDVCSLoader(DummyLoader, DVCSLoader): """Unbuffered loader will send directly to storage new data """ @@ -50,17 +45,20 @@ 'cls': 'pipeline', 'steps': [ { - 'cls': 'filter' + 'cls': 'retry', + }, + { + 'cls': 'filter', }, { - 'cls': 'memory' + 'cls': 'memory', }, ] }, } -class DummyBufferedLoader(DummyLoader, BufferedLoader): +class DummyBaseLoader(DummyLoader, BaseLoader): """Buffered loader will send new data when threshold is reached """ @@ -71,7 +69,10 @@ 'cls': 'pipeline', 'steps': [ { - 'cls': 'filter' + 'cls': 'retry', + }, + { + 'cls': 'filter', }, { 'cls': 'buffer', @@ -84,264 +85,40 @@ }, }, { - 'cls': 'memory' + 'cls': 'memory', }, ] }, } -class DummyBaseLoaderTest(BaseLoaderTest): - def setUp(self): - self.loader = self.loader_class(logging_class='dummyloader') - self.loader.visit_type = 'git' - # do not call voluntarily super().setUp() - self.storage = self.loader.storage - contents = [ - { - 'sha1': '34973274ccef6ab4dfaaf86599792fa9c3fe4689', - 'sha1_git': b'bar1', - 'sha256': b'baz1', - 'blake2s256': b'qux1', - 'status': 'visible', - 'data': b'data1', - 'length': 5, - }, - { - 'sha1': '61c2b3a30496d329e21af70dd2d7e097046d07b7', - 'sha1_git': b'bar2', - 'sha256': b'baz2', - 'blake2s256': b'qux2', - 'status': 'visible', - 'data': b'data2', - 'length': 5, - }, - ] - self.expected_contents = [content['sha1'] for content in contents] - self.in_contents = contents.copy() - for content in self.in_contents: - content['sha1'] = hash_to_bytes(content['sha1']) - self.in_directories = [ - {'id': hash_to_bytes(id_), 'entries': []} - for id_ in [ - '44e45d56f88993aae6a0198013efa80716fd8921', - '54e45d56f88993aae6a0198013efa80716fd8920', - '43e45d56f88993aae6a0198013efa80716fd8920', - ] - ] - person = { - 'name': b'John Doe', - 'email': b'john.doe@institute.org', - 'fullname': b'John Doe ' - } - rev1_id = b'\x00'*20 - rev2_id = b'\x01'*20 - self.in_revisions = [ - { - 'id': rev1_id, - 'type': 'git', - 'date': 1567591673, - 'committer_date': 1567591673, - 'author': person, - 'committer': person, - 'message': b'msg1', - 'directory': None, - 'synthetic': False, - 'metadata': None, - 'parents': [], - }, - { - 'id': rev2_id, - 'type': 'hg', - 'date': 1567591673, - 'committer_date': 1567591673, - 'author': person, - 'committer': person, - 'message': b'msg2', - 'directory': None, - 'synthetic': False, - 'metadata': None, - 'parents': [], - }, - ] - self.in_releases = [ - { - 'name': b'rel1', - 'id': b'\x02'*20, - 'date': None, - 'author': person, - 'target_type': 'revision', - 'target': rev1_id, - 'message': None, - 'synthetic': False, - }, - { - 'name': b'rel2', - 'id': b'\x03'*20, - 'date': None, - 'author': person, - 'target_type': 'revision', - 'target': rev2_id, - 'message': None, - 'synthetic': False, - }, - ] - self.in_origin = { - 'type': self.loader.visit_type, - 'url': 'http://example.com/', - } - self.in_snapshot = { - 'id': b'snap1', - 'branches': {}, - } - self.in_provider = { - 'provider_name': 'Test Provider', - 'provider_type': 'test_provider', - 'provider_url': 'http://example.org/metadata_provider', - 'metadata': {'working': True}, - } - self.in_tool = { - 'name': 'Test Tool', - 'version': 'v1.2.3', - 'configuration': {'in_the_Matrix': 'maybe'}, - } - - self.storage.origin_add([self.in_origin]) - - # used by prepare_origin_visit() when it gets called - self.loader._test_prepare_origin_visit_data = { - 'origin': self.in_origin, - } - - def tearDown(self): - # do not call voluntarily super().tearDown() - pass - - -class CoreUnbufferedLoaderTest(DummyBaseLoaderTest): - loader_class = DummyUnbufferedLoader - - def test_unbuffered_loader(self): - self.loader.load() # initialize the loader - - self.loader.send_contents(self.in_contents[0:1]) - self.loader.send_directories(self.in_directories[0:1]) - self.loader.send_revisions(self.in_revisions[0:1]) - self.loader.send_releases(self.in_releases[0:1]) - - self.assertCountContents(1) - self.assertCountDirectories(1) - self.assertCountRevisions(1) - self.assertCountReleases(1) - - self.loader.send_contents(self.in_contents[1:]) - self.loader.send_directories(self.in_directories[1:]) - self.loader.send_revisions(self.in_revisions[1:]) - self.loader.send_releases(self.in_releases[1:]) - - self.assertCountContents(len(self.in_contents)) - self.assertCountDirectories(len(self.in_directories)) - self.assertCountRevisions(len(self.in_revisions)) - self.assertCountReleases(len(self.in_releases)) - - -class CoreBufferedLoaderTest(DummyBaseLoaderTest): - loader_class = DummyBufferedLoader - - def test_buffered_loader(self): - self.loader.load() # initialize the loader - - self.loader.send_contents(self.in_contents[0:1]) - self.loader.send_directories(self.in_directories[0:1]) - self.loader.send_revisions(self.in_revisions[0:1]) - self.loader.send_releases(self.in_releases[0:1]) - - self.assertCountContents(0) - self.assertCountDirectories(0) - self.assertCountRevisions(0) - self.assertCountReleases(0) - - self.loader.send_contents(self.in_contents[1:]) - self.loader.send_directories(self.in_directories[1:]) - self.loader.send_revisions(self.in_revisions[1:]) - self.loader.send_releases(self.in_releases) - - self.assertCountContents(len(self.in_contents)) - self.assertCountDirectories(len(self.in_directories)) - self.assertCountRevisions(len(self.in_revisions)) - self.assertCountReleases(len(self.in_releases)) - - def test_directory_cascade(self): - """Checks that sending a directory triggers sending contents""" - self.loader.load() # initialize the loader - - self.loader.send_contents(self.in_contents[0:1]) - self.loader.send_directories(self.in_directories) - - self.assertCountContents(1) - self.assertCountDirectories(len(self.in_directories)) - - def test_revision_cascade(self): - """Checks that sending a revision triggers sending contents and - directories.""" - - self.loader.load() # initialize the loader - - self.loader.send_contents(self.in_contents[0:1]) - self.loader.send_directories(self.in_directories[0:1]) - self.loader.send_revisions(self.in_revisions) - - self.assertCountContents(1) - self.assertCountDirectories(1) - self.assertCountRevisions(len(self.in_revisions)) - - def test_release_cascade(self): - """Checks that sending a release triggers sending revisions, - contents, and directories.""" - self.loader.load() # initialize the loader - - self.loader.send_contents(self.in_contents[0:1]) - self.loader.send_directories(self.in_directories[0:1]) - self.loader.send_revisions(self.in_revisions[0:1]) - self.loader.send_releases(self.in_releases) - - self.assertCountContents(1) - self.assertCountDirectories(1) - self.assertCountRevisions(1) - self.assertCountReleases(len(self.in_releases)) +def test_base_loader(): + loader = DummyBaseLoader() + result = loader.load() - def test_snapshot_cascade(self): - """Checks that sending a snapshot triggers sending releases, - revisions, contents, and directories.""" - self.loader.load() # initialize the loader + assert result == {'status': 'eventful'} - self.loader.send_contents(self.in_contents[0:1]) - self.loader.send_directories(self.in_directories[0:1]) - self.loader.send_revisions(self.in_revisions[0:1]) - self.loader.send_releases(self.in_releases[0:1]) - self.loader.send_snapshot(self.in_snapshot) - self.assertCountContents(1) - self.assertCountDirectories(1) - self.assertCountRevisions(1) - self.assertCountReleases(1) - self.assertCountSnapshots(1) +def test_dvcs_loader(): + loader = DummyDVCSLoader() + result = loader.load() + assert result == {'status': 'eventful'} def test_loader_logger_default_name(): - loader = DummyBufferedLoader() + loader = DummyBaseLoader() assert isinstance(loader.log, logging.Logger) assert loader.log.name == \ - 'swh.loader.core.tests.test_loader.DummyBufferedLoader' + 'swh.loader.core.tests.test_loader.DummyBaseLoader' - loader = DummyUnbufferedLoader() + loader = DummyDVCSLoader() assert isinstance(loader.log, logging.Logger) assert loader.log.name == \ - 'swh.loader.core.tests.test_loader.DummyUnbufferedLoader' + 'swh.loader.core.tests.test_loader.DummyDVCSLoader' def test_loader_logger_with_name(): - loader = DummyBufferedLoader('some.logger.name') + loader = DummyBaseLoader('some.logger.name') assert isinstance(loader.log, logging.Logger) assert loader.log.name == \ 'some.logger.name' @@ -349,7 +126,7 @@ @pytest.mark.fs def test_loader_save_data_path(tmp_path): - loader = DummyBufferedLoader('some.logger.name.1') + loader = DummyBaseLoader('some.logger.name.1') url = 'http://bitbucket.org/something' loader.origin = { 'url': url,