diff --git a/swh/storage/api/client.py b/swh/storage/api/client.py
--- a/swh/storage/api/client.py
+++ b/swh/storage/api/client.py
@@ -1,11 +1,12 @@
-# Copyright (C) 2015-2020 The Software Heritage developers
+# Copyright (C) 2015-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import contextlib
 from typing import Any, Dict, Iterable, Union
 
-from swh.core.api import RemoteException, RPCClient
+from swh.core.api import AsyncRPCClient, BaseRPCClient, RemoteException, RPCClient
 from swh.model.model import Content
 
 from ..exc import HashCollision, StorageAPIError, StorageArgumentException
@@ -13,8 +14,8 @@
 from .serializers import DECODERS, ENCODERS
 
 
-class RemoteStorage(RPCClient):
-    """Proxy to a remote storage API"""
+class BaseRemoteStorage(BaseRPCClient):
+    """Base class for :class:`RemoteStorage` and :class:`AsyncRemoteStorage`."""
 
     api_exception = StorageAPIError
     backend_class = StorageInterface
@@ -24,13 +25,13 @@
     extra_type_decoders = DECODERS
     extra_type_encoders = ENCODERS
 
-    def raise_for_status(self, response) -> None:
+    def raise_for_status(self, response):
         try:
             super().raise_for_status(response)
         except RemoteException as e:
             if (
                 e.response is not None
-                and e.response.status_code == 500
+                and self._get_status_code(e.response) == 500
                 and e.args
                 and e.args[0].get("type") == "HashCollision"
             ):
@@ -40,10 +41,6 @@
             else:
                 raise
 
-    def content_add(self, content: Iterable[Union[Content, Dict[str, Any]]]):
-        content = [c.with_data() if isinstance(c, Content) else c for c in content]
-        return self.post("content/add", {"content": content})
-
     def reset(self):
         return self.post("reset", {})
 
@@ -52,3 +49,27 @@
 
     def refresh_stat_counters(self):
         return self.get("stat/refresh")
+
+
+class RemoteStorage(BaseRemoteStorage, RPCClient):
+    """Sync proxy to a remote storage API"""
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        # Zero-argument factory: each call builds a new AsyncRemoteStorage
+        # from the same constructor arguments this client was given.
+        self.async_storage = lambda: AsyncRemoteStorage(*args, **kwargs)
+
+    def content_add(self, content: Iterable[Union[Content, Dict[str, Any]]]):
+        # Content objects are replaced by c.with_data(); anything else is sent unchanged.
+        content = [c.with_data() if isinstance(c, Content) else c for c in content]
+        return self.post("content/add", {"content": content})
+
+
+class AsyncRemoteStorage(BaseRemoteStorage, AsyncRPCClient):
+    """Async proxy to a remote storage API"""
+
+    async def content_add(self, content: Iterable[Union[Content, Dict[str, Any]]]):
+        # Same payload preparation as RemoteStorage.content_add; the POST is awaited.
+        content = [c.with_data() if isinstance(c, Content) else c for c in content]
+        return await self.post("content/add", {"content": content})