diff --git a/swh/objstorage/api/client.py b/swh/objstorage/api/client.py index 6e28e5f..1bed899 100644 --- a/swh/objstorage/api/client.py +++ b/swh/objstorage/api/client.py @@ -1,89 +1,94 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.core.api import RPCClient +from swh.core.utils import iter_chunks from swh.model import hashutil from swh.objstorage.exc import Error, ObjNotFoundError, ObjStorageAPIError from swh.objstorage.objstorage import DEFAULT_CHUNK_SIZE, DEFAULT_LIMIT +SHA1_SIZE = 20 + class RemoteObjStorage: """Proxy to a remote object storage. This class allows to connect to an object storage server via http protocol. Attributes: url (string): The url of the server to connect. Must end with a '/' session: The session to send requests. """ def __init__(self, **kwargs): self._proxy = RPCClient( api_exception=ObjStorageAPIError, reraise_exceptions=[ObjNotFoundError, Error], **kwargs, ) def check_config(self, *, check_write): return self._proxy.post("check_config", {"check_write": check_write}) def __contains__(self, obj_id): return self._proxy.post("content/contains", {"obj_id": obj_id}) def add(self, content, obj_id=None, check_presence=True): return self._proxy.post( "content/add", {"content": content, "obj_id": obj_id, "check_presence": check_presence}, ) def add_batch(self, contents, check_presence=True): return self._proxy.post( "content/add/batch", {"contents": contents, "check_presence": check_presence,}, ) def restore(self, content, obj_id=None, *args, **kwargs): return self.add(content, obj_id, check_presence=False) def get(self, obj_id): return self._proxy.post("content/get", {"obj_id": obj_id}) def get_batch(self, obj_ids): return self._proxy.post("content/get/batch", {"obj_ids": obj_ids}) def check(self, obj_id): return self._proxy.post("content/check", {"obj_id": obj_id}) def delete(self, obj_id): # deletion permission are checked server-side return self._proxy.post("content/delete", {"obj_id": obj_id}) # Management methods def get_random(self, batch_size): return self._proxy.post("content/get/random", {"batch_size": batch_size}) # Streaming methods def add_stream(self, content_iter, obj_id, check_presence=True): raise NotImplementedError def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE): obj_id = hashutil.hash_to_hex(obj_id) return self._proxy.get_stream( "content/get_stream/{}".format(obj_id), chunk_size=chunk_size ) def __iter__(self): - yield from self._proxy.get_stream("content") + yield from self.list_content() def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): params = {"limit": limit} if last_obj_id: params["last_obj_id"] = hashutil.hash_to_hex(last_obj_id) - yield from self._proxy.get_stream("content", params=params) + yield from iter_chunks( + self._proxy.get_stream("content", params=params), chunk_size=SHA1_SIZE + )