diff --git a/swh/objstorage/api/client.py b/swh/objstorage/api/client.py
--- a/swh/objstorage/api/client.py
+++ b/swh/objstorage/api/client.py
@@ -71,12 +71,7 @@
     # Streaming methods

     def add_stream(self, content_iter, obj_id, check_presence=True):
-        obj_id = hashutil.hash_to_hex(obj_id)
-        return self._proxy.post_stream(
-            "content/add_stream/{}".format(obj_id),
-            params={"check_presence": check_presence},
-            data=content_iter,
-        )
+        raise NotImplementedError

     def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE):
         obj_id = hashutil.hash_to_hex(obj_id)
diff --git a/swh/objstorage/api/server.py b/swh/objstorage/api/server.py
--- a/swh/objstorage/api/server.py
+++ b/swh/objstorage/api/server.py
@@ -3,14 +3,12 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

-import json
 import os

 import aiohttp.web

 from swh.core.api.asynchronous import RPCServerApp, decode_request
 from swh.core.api.asynchronous import encode_data_server as encode_data
-from swh.core.api.serializers import SWHJSONDecoder, msgpack_loads
 from swh.core.config import read as config_read
 from swh.core.statsd import statsd
 from swh.model import hashutil
@@ -105,39 +103,6 @@
 # Streaming methods


-@timed
-async def add_stream(request):
-    hex_id = request.match_info["hex_id"]
-    obj_id = hashutil.hash_to_bytes(hex_id)
-    check_pres = request.query.get("check_presence", "").lower() == "true"
-    objstorage = request.app["objstorage"]
-
-    if check_pres and obj_id in objstorage:
-        return encode_data(obj_id)
-
-    # XXX this really should go in a decode_stream_request coroutine in
-    # swh.core, but since py35 does not support async generators, it cannot
-    # easily be made for now
-    content_type = request.headers.get("Content-Type")
-    if content_type == "application/x-msgpack":
-        decode = msgpack_loads
-    elif content_type == "application/json":
-        decode = lambda x: json.loads(x, cls=SWHJSONDecoder)  # noqa
-    else:
-        raise ValueError("Wrong content type `%s` for API request" % content_type)
-
-    buffer = b""
-    with objstorage.chunk_writer(obj_id) as write:
-        while not request.content.at_eof():
-            data, eot = await request.content.readchunk()
-            buffer += data
-            if eot:
-                write(decode(buffer))
-                buffer = b""
-
-    return encode_data(obj_id)
-
-
 @timed
 async def get_stream(request):
     hex_id = request.match_info["hex_id"]
@@ -188,7 +153,6 @@
     app.router.add_route("POST", "/content/check", check)
     app.router.add_route("POST", "/content/delete", delete)
     app.router.add_route("GET", "/content", list_content)
-    app.router.add_route("POST", "/content/add_stream/{hex_id}", add_stream)
     app.router.add_route("GET", "/content/get_stream/{hex_id}", get_stream)

     return app
diff --git a/swh/objstorage/backends/pathslicing.py b/swh/objstorage/backends/pathslicing.py
--- a/swh/objstorage/backends/pathslicing.py
+++ b/swh/objstorage/backends/pathslicing.py
@@ -335,6 +335,22 @@
             f.write(compressor.flush())

     def add_stream(self, content_iter, obj_id, check_presence=True):
+        """Add a new object to the object storage using streaming.
+
+        This function is identical to add() except that it takes a generator
+        yielding the content in chunks instead of the whole content at once.
+
+        Args:
+            content_iter (Iterator[bytes]): generator yielding the object's
+                raw content, chunk by chunk.
+            obj_id (bytes): object identifier.
+            check_presence (bool): indicate whether the presence of the
+                content should be verified before adding the file.
+
+        Returns:
+            the id (bytes) of the object in the storage.
+
+        """
         if check_presence and obj_id in self:
             return obj_id

diff --git a/swh/objstorage/backends/seaweedfs/http.py b/swh/objstorage/backends/seaweedfs/http.py
--- a/swh/objstorage/backends/seaweedfs/http.py
+++ b/swh/objstorage/backends/seaweedfs/http.py
@@ -95,7 +95,5 @@
                     params["lastFileName"] = dircontent["LastFileName"]

             else:
-                LOGGER.error(
-                    'Error listing "%s". [HTTP %d]', self.url, rsp.status_code
-                )
+                LOGGER.error('Error listing "%s". [HTTP %d]', self.url, rsp.status_code)
                 break