Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/api/client.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import contextlib | |||||
from typing import Any, Dict, Iterable, Union | from typing import Any, Dict, Iterable, Union | ||||
from swh.core.api import RemoteException, RPCClient | from swh.core.api import AsyncRPCClient, BaseRPCClient, RemoteException, RPCClient | ||||
from swh.model.model import Content | from swh.model.model import Content | ||||
from ..exc import HashCollision, StorageAPIError, StorageArgumentException | from ..exc import HashCollision, StorageAPIError, StorageArgumentException | ||||
from ..interface import StorageInterface | from ..interface import StorageInterface | ||||
from .serializers import DECODERS, ENCODERS | from .serializers import DECODERS, ENCODERS | ||||
class RemoteStorage(RPCClient): | class BaseRemoteStorage(BaseRPCClient): | ||||
"""Proxy to a remote storage API""" | """Base class for :class:`RemoteStorage` and :class:`AsyncRemoteStorage`.""" | ||||
api_exception = StorageAPIError | api_exception = StorageAPIError | ||||
backend_class = StorageInterface | backend_class = StorageInterface | ||||
reraise_exceptions = [ | reraise_exceptions = [ | ||||
StorageArgumentException, | StorageArgumentException, | ||||
] | ] | ||||
extra_type_decoders = DECODERS | extra_type_decoders = DECODERS | ||||
extra_type_encoders = ENCODERS | extra_type_encoders = ENCODERS | ||||
def raise_for_status(self, response) -> None: | def raise_for_status(self, response): | ||||
try: | try: | ||||
super().raise_for_status(response) | super().raise_for_status(response) | ||||
except RemoteException as e: | except RemoteException as e: | ||||
if ( | if ( | ||||
e.response is not None | e.response is not None | ||||
and e.response.status_code == 500 | and self._get_status_code(e.response) == 500 | ||||
and e.args | and e.args | ||||
and e.args[0].get("type") == "HashCollision" | and e.args[0].get("type") == "HashCollision" | ||||
): | ): | ||||
# XXX: workaround until we fix these HashCollisions happening | # XXX: workaround until we fix these HashCollisions happening | ||||
# when they shouldn't | # when they shouldn't | ||||
raise HashCollision(*e.args[0]["args"]) | raise HashCollision(*e.args[0]["args"]) | ||||
else: | else: | ||||
raise | raise | ||||
def content_add(self, content: Iterable[Union[Content, Dict[str, Any]]]): | |||||
content = [c.with_data() if isinstance(c, Content) else c for c in content] | |||||
return self.post("content/add", {"content": content}) | |||||
def reset(self): | def reset(self): | ||||
return self.post("reset", {}) | return self.post("reset", {}) | ||||
def stat_counters(self): | def stat_counters(self): | ||||
return self.get("stat/counters") | return self.get("stat/counters") | ||||
def refresh_stat_counters(self): | def refresh_stat_counters(self): | ||||
return self.get("stat/refresh") | return self.get("stat/refresh") | ||||
class RemoteStorage(BaseRemoteStorage, RPCClient): | |||||
"""Sync proxy to a remote storage API""" | |||||
def __init__(self, *args, **kwargs): | |||||
super().__init__(*args, **kwargs) | |||||
self.async_storage = lambda: AsyncRemoteStorage(*args, **kwargs) | |||||
def content_add(self, content: Iterable[Union[Content, Dict[str, Any]]]): | |||||
content = [c.with_data() if isinstance(c, Content) else c for c in content] | |||||
return self.post("content/add", {"content": content}) | |||||
class AsyncRemoteStorage(BaseRemoteStorage, AsyncRPCClient): | |||||
"""Async proxy to a remote storage API""" | |||||
async def content_add(self, content: Iterable[Union[Content, Dict[str, Any]]]): | |||||
content = [c.with_data() if isinstance(c, Content) else c for c in content] | |||||
return await self.post("content/add", {"content": content}) |