diff --git a/requirements-swh.txt b/requirements-swh.txt index 7d53946f..cb0758cc 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,4 +1,4 @@ -swh.core[db,http] >= 2.10 +swh.core[db,http] >= 2.14 swh.counters >= v0.8.0 swh.model >= 6.3.0 swh.objstorage >= 0.2.2 diff --git a/swh/storage/api/server.py b/swh/storage/api/server.py index 22c69ec0..db416280 100644 --- a/swh/storage/api/server.py +++ b/swh/storage/api/server.py @@ -1,195 +1,205 @@ # Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os from typing import Any, Dict, Optional -from psycopg2.errors import QueryCanceled +from psycopg2.errors import OperationalError, QueryCanceled from swh.core import config from swh.core.api import RPCServerApp from swh.core.api import encode_data_server as encode_data from swh.core.api import error_handler, serializers from swh.storage import get_storage as get_swhstorage from ..exc import StorageArgumentException from ..interface import StorageInterface from ..metrics import send_metric, timed from .serializers import DECODERS, ENCODERS def get_storage(): global storage if not storage: storage = get_swhstorage(**app.config["storage"]) return storage class StorageServerApp(RPCServerApp): extra_type_decoders = DECODERS extra_type_encoders = ENCODERS method_decorators = [timed] def _process_metrics(self, metrics, endpoint): for metric, count in metrics.items(): send_metric(metric=metric, count=count, method_name=endpoint) def post_content_add(self, ret, kw): self._process_metrics(ret, "content_add") def post_content_add_metadata(self, ret, kw): self._process_metrics(ret, "content_add_metadata") def post_skipped_content_add(self, ret, kw): self._process_metrics(ret, "skipped_content_add") def post_directory_add(self, ret, kw): 
self._process_metrics(ret, "directory_add") def post_revision_add(self, ret, kw): self._process_metrics(ret, "revision_add") def post_release_add(self, ret, kw): self._process_metrics(ret, "release_add") def post_snapshot_add(self, ret, kw): self._process_metrics(ret, "snapshot_add") def post_origin_visit_status_add(self, ret, kw): self._process_metrics(ret, "origin_visit_status_add") def post_origin_add(self, ret, kw): self._process_metrics(ret, "origin_add") def post_raw_extrinsic_metadata_add(self, ret, kw): self._process_metrics(ret, "raw_extrinsic_metadata_add") def post_metadata_fetcher_add(self, ret, kw): self._process_metrics(ret, "metadata_fetcher_add") def post_metadata_authority_add(self, ret, kw): self._process_metrics(ret, "metadata_authority_add") def post_extid_add(self, ret, kw): self._process_metrics(ret, "extid_add") def post_origin_visit_add(self, ret, kw): nb_visits = len(ret) send_metric( "origin_visit:add", count=nb_visits, # method_name should be "origin_visit_add", but changing it now would break # existing metrics method_name="origin_visit", ) app = StorageServerApp( __name__, backend_class=StorageInterface, backend_factory=get_storage ) storage = None @app.errorhandler(StorageArgumentException) def argument_error_handler(exception): return error_handler(exception, encode_data, status_code=400) -@app.errorhandler(QueryCanceled) -def querycanceled_error_handler(exception): +@app.errorhandler(OperationalError) +def operationalerror_exception_handler(exception): # Same as error_handler(exception, encode_data); but does not log or send to Sentry. # These errors are noisy, and are better logged on the caller's side after it - # retried a few times + # retried a few times. + # Additionally, we return 503 instead of 500, telling clients they should retry. 
+ response = encode_data(serializers.exception_to_dict(exception)) + response.status_code = 503 + return response + + +@app.errorhandler(QueryCanceled) +def querycancelled_exception_handler(exception): + # Ditto, but 500 instead of 503, because this is usually caused by the query + # size instead of a transient failure response = encode_data(serializers.exception_to_dict(exception)) response.status_code = 500 return response @app.errorhandler(Exception) def default_error_handler(exception): return error_handler(exception, encode_data) @app.route("/") @timed def index(): return """ Software Heritage storage server

You have reached the Software Heritage storage server.
See its documentation and API for more information

""" @app.route("/stat/counters", methods=["GET"]) @timed def stat_counters(): return encode_data(get_storage().stat_counters()) @app.route("/stat/refresh", methods=["GET"]) @timed def refresh_stat_counters(): return encode_data(get_storage().refresh_stat_counters()) api_cfg = None def load_and_check_config(config_path: Optional[str]) -> Dict[str, Any]: """Check the minimal configuration is set to run the api or raise an error explanation. Args: config_path: Path to the configuration file to load Raises: Error if the setup is not as expected Returns: configuration as a dict """ if not config_path: raise EnvironmentError("Configuration file must be defined") if not os.path.exists(config_path): raise FileNotFoundError(f"Configuration file {config_path} does not exist") cfg = config.read(config_path) if "storage" not in cfg: raise KeyError("Missing 'storage' configuration") return cfg def make_app_from_configfile() -> StorageServerApp: """Run the WSGI app from the webserver, loading the configuration from a configuration file. SWH_CONFIG_FILENAME environment variable defines the configuration path to load. """ global api_cfg if not api_cfg: config_path = os.environ.get("SWH_CONFIG_FILENAME") api_cfg = load_and_check_config(config_path) app.config.update(api_cfg) handler = logging.StreamHandler() app.logger.addHandler(handler) return app if __name__ == "__main__": print("Deprecated. 
Use swh-storage") diff --git a/swh/storage/tests/test_api_client.py b/swh/storage/tests/test_api_client.py index 5c8a6390..e3663ac7 100644 --- a/swh/storage/tests/test_api_client.py +++ b/swh/storage/tests/test_api_client.py @@ -1,98 +1,140 @@ -# Copyright (C) 2015-2020 The Software Heritage developers +# Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import psycopg2.errors import pytest +from swh.core.api import RemoteException, TransientRemoteException import swh.storage from swh.storage import get_storage import swh.storage.api.server as server from swh.storage.tests.storage_tests import ( TestStorageGeneratedData as _TestStorageGeneratedData, ) from swh.storage.tests.storage_tests import TestStorage as _TestStorage # tests are executed using imported classes (TestStorage and # TestStorageGeneratedData) using overloaded swh_storage fixture # below @pytest.fixture def app_server(): server.storage = swh.storage.get_storage( cls="memory", journal_writer={"cls": "memory"} ) yield server @pytest.fixture def app(app_server): return app_server.app @pytest.fixture def swh_rpc_client_class(): def storage_factory(**kwargs): storage_config = { "cls": "remote", **kwargs, } return get_storage(**storage_config) return storage_factory @pytest.fixture def swh_storage(swh_rpc_client, app_server): # This version of the swh_storage fixture uses the swh_rpc_client fixture # to instantiate a RemoteStorage (see swh_rpc_client_class above) that # proxies, via the swh.core RPC mechanism, the local (in memory) storage # configured in the app_server fixture above. # # Also note that, for the sake of # making it easier to write tests, the in-memory journal writer of the # in-memory backend storage is attached to the RemoteStorage as its # journal_writer attribute. 
storage = swh_rpc_client journal_writer = getattr(storage, "journal_writer", None) storage.journal_writer = app_server.storage.journal_writer storage.objstorage = app_server.storage.objstorage yield storage storage.journal_writer = journal_writer class TestStorageApi(_TestStorage): @pytest.mark.skip( 'The "person" table of the pgsql is a legacy thing, and not ' "supported by the cassandra backend." ) def test_person_fullname_unicity(self): pass @pytest.mark.skip("content_update is not yet implemented for Cassandra") def test_content_update(self): pass @pytest.mark.skip("Not supported by Cassandra") def test_origin_count(self): pass + def test_exception(self, app_server, swh_storage, mocker): + """Checks the client re-raises unknown exceptions as a :exc:`RemoteException`""" + assert swh_storage.revision_get(["\x01" * 20]) == [None] + mocker.patch.object( + app_server.storage._cql_runner, + "revision_get", + side_effect=ValueError("crash"), + ) + with pytest.raises(RemoteException) as e: + swh_storage.revision_get(["\x01" * 20]) + assert not isinstance(e, TransientRemoteException) + + def test_operationalerror_exception(self, app_server, swh_storage, mocker): + """Checks the client re-raises as a :exc:`TransientRemoteException` + rather than the base :exc:`RemoteException`; so the retrying proxy + retries for longer.""" + assert swh_storage.revision_get(["\x01" * 20]) == [None] + mocker.patch.object( + app_server.storage._cql_runner, + "revision_get", + side_effect=psycopg2.errors.AdminShutdown("cluster is shutting down"), + ) + with pytest.raises(RemoteException) as excinfo: + swh_storage.revision_get(["\x01" * 20]) + assert isinstance(excinfo.value, TransientRemoteException) + + def test_querycancelled_exception(self, app_server, swh_storage, mocker): + """Checks the client re-raises as the base :exc:`RemoteException` + rather than a :exc:`TransientRemoteException`, since query cancellation + usually reflects an oversized query, not a transient failure.""" + assert swh_storage.revision_get(["\x01" * 20]) == 
[None] + mocker.patch.object( + app_server.storage._cql_runner, + "revision_get", + side_effect=psycopg2.errors.QueryCanceled("too big!"), + ) + with pytest.raises(RemoteException) as excinfo: + swh_storage.revision_get(["\x01" * 20]) + assert not isinstance(excinfo.value, TransientRemoteException) + class TestStorageApiGeneratedData(_TestStorageGeneratedData): @pytest.mark.skip("Not supported by Cassandra") def test_origin_count(self): pass @pytest.mark.skip("Not supported by Cassandra") def test_origin_count_with_visit_no_visits(self): pass @pytest.mark.skip("Not supported by Cassandra") def test_origin_count_with_visit_with_visits_and_snapshot(self): pass @pytest.mark.skip("Not supported by Cassandra") def test_origin_count_with_visit_with_visits_no_snapshot(self): pass