diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,6 @@ # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html # remote storage API server -aiohttp >= 3 click requests psycopg2 diff --git a/swh/objstorage/api/client.py b/swh/objstorage/api/client.py --- a/swh/objstorage/api/client.py +++ b/swh/objstorage/api/client.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2020 The Software Heritage developers +# Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -7,12 +7,13 @@ from swh.core.utils import iter_chunks from swh.model import hashutil from swh.objstorage.exc import Error, ObjNotFoundError, ObjStorageAPIError +from swh.objstorage.interface import ObjStorageInterface from swh.objstorage.objstorage import DEFAULT_CHUNK_SIZE, DEFAULT_LIMIT SHA1_SIZE = 20 -class RemoteObjStorage: +class RemoteObjStorage(RPCClient): """Proxy to a remote object storage. This class allows to connect to an object storage server via @@ -25,60 +26,19 @@ """ - def __init__(self, **kwargs): - self._proxy = RPCClient( - api_exception=ObjStorageAPIError, - reraise_exceptions=[ObjNotFoundError, Error], - **kwargs, - ) - - def check_config(self, *, check_write): - return self._proxy.post("check_config", {"check_write": check_write}) - - def __contains__(self, obj_id): - return self._proxy.post("content/contains", {"obj_id": obj_id}) - - def add(self, content, obj_id=None, check_presence=True): - return self._proxy.post( - "content/add", - {"content": content, "obj_id": obj_id, "check_presence": check_presence}, - ) - - def add_batch(self, contents, check_presence=True): - return self._proxy.post( - "content/add/batch", - {"contents": contents, "check_presence": check_presence,}, - ) + api_exception = ObjStorageAPIError + reraise_exceptions = [ObjNotFoundError, Error] + backend_class = ObjStorageInterface def restore(self, content, obj_id=None, *args, **kwargs): return self.add(content, obj_id, check_presence=False) - def get(self, obj_id): - return self._proxy.post("content/get", {"obj_id": obj_id}) - - def get_batch(self, obj_ids): - return self._proxy.post("content/get/batch", {"obj_ids": obj_ids}) - - def check(self, obj_id): - return self._proxy.post("content/check", {"obj_id": obj_id}) - - def delete(self, obj_id): - # deletion permission are checked server-side - return self._proxy.post("content/delete", {"obj_id": obj_id}) - - # Management methods - - def get_random(self, batch_size): - return self._proxy.post("content/get/random", {"batch_size": batch_size}) - - # Streaming methods - def add_stream(self, content_iter, obj_id, check_presence=True): raise NotImplementedError def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE): obj_id = hashutil.hash_to_hex(obj_id) - return self._proxy.get_stream( + return self._get_stream( "content/get_stream/{}".format(obj_id), chunk_size=chunk_size ) @@ -90,5 +50,5 @@ if last_obj_id: params["last_obj_id"] = hashutil.hash_to_hex(last_obj_id) yield from iter_chunks( - self._proxy.get_stream("content", params=params), chunk_size=SHA1_SIZE + self._get_stream("content", params=params), chunk_size=SHA1_SIZE ) diff --git a/swh/objstorage/api/server.py b/swh/objstorage/api/server.py --- a/swh/objstorage/api/server.py +++ b/swh/objstorage/api/server.py @@ -1,163 +1,122 @@ -# Copyright (C) 2015-2020 The Software Heritage developers +# Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import contextlib +import functools +import logging import os -import aiohttp.web +from flask import request -from swh.core.api.asynchronous import RPCServerApp, decode_request -from swh.core.api.asynchronous import encode_data_server as encode_data +from swh.core.api import RPCServerApp +from swh.core.api import encode_data_server as encode_data +from swh.core.api import error_handler from swh.core.config import read as config_read from swh.core.statsd import statsd -from swh.core.utils import grouper from swh.model import hashutil from swh.objstorage.exc import Error, ObjNotFoundError -from swh.objstorage.factory import get_objstorage +from swh.objstorage.factory import get_objstorage as get_swhobjstorage +from swh.objstorage.interface import ObjStorageInterface from swh.objstorage.objstorage import DEFAULT_LIMIT def timed(f): - async def w(*a, **kw): + @functools.wraps(f) + def w(*a, **kw): with statsd.timed( "swh_objstorage_request_duration_seconds", tags={"endpoint": f.__name__} ): - return await f(*a, **kw) + return f(*a, **kw) return w -@timed -async def index(request): - return aiohttp.web.Response(body="SWH Objstorage API server") - - -@timed -async def check_config(request): - req = await decode_request(request) - return encode_data(request.app["objstorage"].check_config(**req)) - - -@timed -async def contains(request): - req = await decode_request(request) - return encode_data(request.app["objstorage"].__contains__(**req)) - - -@timed -async def add_bytes(request): - req = await decode_request(request) - statsd.increment( - "swh_objstorage_in_bytes_total", - len(req["content"]), - tags={"endpoint": "add_bytes"}, - ) - return encode_data(request.app["objstorage"].add(**req)) - +@contextlib.contextmanager +def timed_context(f_name): + with statsd.timed( + "swh_objstorage_request_duration_seconds", tags={"endpoint": f_name} + ): + yield -@timed -async def add_batch(request): - req = await decode_request(request) - return encode_data(request.app["objstorage"].add_batch(**req)) +def get_objstorage(): + global objstorage + if not objstorage: + objstorage = get_swhobjstorage(**app.config["objstorage"]) -@timed -async def get_bytes(request): - req = await decode_request(request) + return objstorage - ret = request.app["objstorage"].get(**req) - statsd.increment( - "swh_objstorage_out_bytes_total", len(ret), tags={"endpoint": "get_bytes"} - ) - return encode_data(ret) +class ObjStorageServerApp(RPCServerApp): + client_exception_classes = (ObjNotFoundError, Error) + method_decorators = [timed] + def pre_add(self, kw): + """Called before the 'add' method.""" + statsd.increment( + "swh_objstorage_in_bytes_total", + len(kw["content"]), + tags={"endpoint": "add_bytes"}, + ) -@timed -async def get_batch(request): - req = await decode_request(request) - return encode_data(request.app["objstorage"].get_batch(**req)) + def post_get(self, ret, kw): + """Called after the 'get' method.""" + statsd.increment( + "swh_objstorage_out_bytes_total", len(ret), tags={"endpoint": "get_bytes"} + ) -@timed -async def check(request): - req = await decode_request(request) - return encode_data(request.app["objstorage"].check(**req)) +app = ObjStorageServerApp( + __name__, backend_class=ObjStorageInterface, backend_factory=get_objstorage, +) +objstorage = None -@timed -async def delete(request): - req = await decode_request(request) - return encode_data(request.app["objstorage"].delete(**req)) +@app.errorhandler(Error) +def argument_error_handler(exception): + return error_handler(exception, encode_data, status_code=400) -# Management methods +@app.errorhandler(Exception) +def my_error_handler(exception): + return error_handler(exception, encode_data) +@app.route("/") @timed -async def get_random_contents(request): - req = await decode_request(request) - return encode_data(request.app["objstorage"].get_random(**req)) +def index(): + return "SWH Objstorage API server" # Streaming methods -@timed -async def get_stream(request): - hex_id = request.match_info["hex_id"] +@app.route("/content/get_stream/") +def get_stream(hex_id): obj_id = hashutil.hash_to_bytes(hex_id) - response = aiohttp.web.StreamResponse() - await response.prepare(request) - for chunk in request.app["objstorage"].get_stream(obj_id, 2 << 20): - await response.write(chunk) - await response.write_eof() - return response + def generate(): + with timed_context("get_stream"): + yield from objstorage.get_stream(obj_id, 2 << 20) -@timed -async def list_content(request): - last_obj_id = request.query.get("last_obj_id") + return app.response_class(generate()) + + +@app.route("/content") +def list_content(): + last_obj_id = request.args.get("last_obj_id") if last_obj_id: last_obj_id = bytes.fromhex(last_obj_id) - limit = int(request.query.get("limit", DEFAULT_LIMIT)) - response = aiohttp.web.StreamResponse() - response.enable_chunked_encoding() - await response.prepare(request) - for group in grouper( - request.app["objstorage"].list_content(last_obj_id, limit=limit), 100 - ): - await response.write(b"".join(group)) - await response.write_eof() - return response - + limit = int(request.args.get("limit", DEFAULT_LIMIT)) -def make_app(config): - """Initialize the remote api application. + def generate(): + yield b"" + with timed_context("get_stream"): + yield from objstorage.list_content(last_obj_id, limit=limit) - """ - client_max_size = config.get("client_max_size", 1024 * 1024 * 1024) - app = RPCServerApp(client_max_size=client_max_size) - app.client_exception_classes = (ObjNotFoundError, Error) - - # retro compatibility configuration settings - app["config"] = config - app["objstorage"] = get_objstorage(**config["objstorage"]) - - app.router.add_route("GET", "/", index) - app.router.add_route("POST", "/check_config", check_config) - app.router.add_route("POST", "/content/contains", contains) - app.router.add_route("POST", "/content/add", add_bytes) - app.router.add_route("POST", "/content/add/batch", add_batch) - app.router.add_route("POST", "/content/get", get_bytes) - app.router.add_route("POST", "/content/get/batch", get_batch) - app.router.add_route("POST", "/content/get/random", get_random_contents) - app.router.add_route("POST", "/content/check", check) - app.router.add_route("POST", "/content/delete", delete) - app.router.add_route("GET", "/content", list_content) - app.router.add_route("GET", "/content/get_stream/{hex_id}", get_stream) - return app + return app.response_class(generate()) def load_and_check_config(config_file): @@ -230,8 +189,10 @@ """ config_file = os.environ.get("SWH_CONFIG_FILENAME") - config = load_and_check_config(config_file) - return make_app(config=config) + app.config = load_and_check_config(config_file) + handler = logging.StreamHandler() + app.logger.addHandler(handler) + return app if __name__ == "__main__": diff --git a/swh/objstorage/cli.py b/swh/objstorage/cli.py --- a/swh/objstorage/cli.py +++ b/swh/objstorage/cli.py @@ -66,20 +66,24 @@ show_default=True, help="Binding port of the server", ) +@click.option( + "--debug/--no-debug", + default=True, + help="Indicates if the server should run in debug mode", +) @click.pass_context -def serve(ctx, host, port): +def serve(ctx, host, port, debug): """Run a standalone objstorage server. This is not meant to be run on production systems. """ - import aiohttp.web - - from swh.objstorage.api.server import make_app, validate_config + from swh.objstorage.api.server import app, validate_config - app = make_app(validate_config(ctx.obj["config"])) - if ctx.obj["log_level"] == "DEBUG": - app.update(debug=True) - aiohttp.web.run_app(app, host=host, port=int(port)) + if "log_level" in ctx.obj: + logging.getLogger("werkzeug").setLevel(ctx.obj["log_level"]) + validate_config(ctx.obj["config"]) + app.config.update(ctx.obj["config"]) + app.run(host, port=int(port), debug=bool(debug)) @objstorage_cli_group.command("import") diff --git a/swh/objstorage/objstorage.py b/swh/objstorage/interface.py copy from swh/objstorage/objstorage.py copy to swh/objstorage/interface.py --- a/swh/objstorage/objstorage.py +++ b/swh/objstorage/interface.py @@ -1,78 +1,13 @@ -# Copyright (C) 2015-2020 The Software Heritage developers +# Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import abc -import bz2 -from itertools import dropwhile, islice -import lzma -from typing import Dict -import zlib +from swh.core.api import remote_api_endpoint +from swh.objstorage.objstorage import DEFAULT_CHUNK_SIZE, DEFAULT_LIMIT -from swh.model import hashutil -from .exc import ObjNotFoundError - -ID_HASH_ALGO = "sha1" -ID_HASH_LENGTH = 40 # Size in bytes of the hash hexadecimal representation. -DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024 # Size in bytes of the streaming chunks -DEFAULT_LIMIT = 10000 - - -def compute_hash(content): - """Compute the content's hash. - - Args: - content (bytes): The raw content to hash - hash_name (str): Hash's name (default to ID_HASH_ALGO) - - Returns: - The ID_HASH_ALGO for the content - - """ - return ( - hashutil.MultiHash.from_data(content, hash_names=[ID_HASH_ALGO],) - .digest() - .get(ID_HASH_ALGO) - ) - - -class NullCompressor: - def compress(self, data): - return data - - def flush(self): - return b"" - - -class NullDecompressor: - def decompress(self, data): - return data - - @property - def unused_data(self): - return b"" - - -decompressors = { - "bz2": bz2.BZ2Decompressor, - "lzma": lzma.LZMADecompressor, - "gzip": lambda: zlib.decompressobj(wbits=31), - "zlib": zlib.decompressobj, - "none": NullDecompressor, -} - -compressors = { - "bz2": bz2.BZ2Compressor, - "lzma": lzma.LZMACompressor, - "gzip": lambda: zlib.compressobj(wbits=31), - "zlib": zlib.compressobj, - "none": NullCompressor, -} - - -class ObjStorage(metaclass=abc.ABCMeta): +class ObjStorageInterface: """ High-level API to manipulate the Software Heritage object storage. Conceptually, the object storage offers the following methods: @@ -92,20 +27,13 @@ Some of the methods have available streaming equivalents: - - add_stream() same as add() but with a chunked iterator - - restore_stream() same as add_stream() but erase already existing content - get_stream() same as get() but returns a chunked iterator Each implementation of this interface can have a different behavior and its own way to store the contents. """ - def __init__(self, *, allow_delete=False, **kwargs): - # A more complete permission system could be used in place of that if - # it becomes needed - self.allow_delete = allow_delete - - @abc.abstractmethod + @remote_api_endpoint("check_config") def check_config(self, *, check_write): """Check whether the object storage is properly configured. @@ -116,10 +44,10 @@ Returns: True if the configuration check worked, an exception if it didn't. """ - pass + ... - @abc.abstractmethod - def __contains__(self, obj_id, *args, **kwargs): + @remote_api_endpoint("content/contains") + def __contains__(self, obj_id): """Indicate if the given object is present in the storage. Args: @@ -130,10 +58,10 @@ storage. """ - pass + ... - @abc.abstractmethod - def add(self, content, obj_id=None, check_presence=True, *args, **kwargs): + @remote_api_endpoint("content/add") + def add(self, content, obj_id=None, check_presence=True): """Add a new object to the object storage. Args: @@ -149,9 +77,10 @@ the id (bytes) of the object into the storage. """ - pass + ... - def add_batch(self, contents, check_presence=True) -> Dict: + @remote_api_endpoint("content/add/batch") + def add_batch(self, contents, check_presence=True): """Add a batch of new objects to the object storage. Args: @@ -162,16 +91,9 @@ count of bytes object) """ - summary = {"object:add": 0, "object:add:bytes": 0} - for obj_id, content in contents.items(): - if check_presence and obj_id in self: - continue - self.add(content, obj_id, check_presence=False) - summary["object:add"] += 1 - summary["object:add:bytes"] += len(content) - return summary - - def restore(self, content, obj_id=None, *args, **kwargs): + ... + + def restore(self, content, obj_id=None): """Restore a content that have been corrupted. This function is identical to add but does not check if @@ -187,11 +109,10 @@ the fly. """ - # check_presence to false will erase the potential previous content. - return self.add(content, obj_id, check_presence=False) + ... - @abc.abstractmethod - def get(self, obj_id, *args, **kwargs): + @remote_api_endpoint("content/get") + def get(self, obj_id): """Retrieve the content of a given object. Args: @@ -204,9 +125,10 @@ ObjNotFoundError: if the requested object is missing. """ - pass + ... - def get_batch(self, obj_ids, *args, **kwargs): + @remote_api_endpoint("content/get/batch") + def get_batch(self, obj_ids): """Retrieve objects' raw content in bulk from storage. Note: This function does have a default implementation in @@ -225,14 +147,10 @@ one content will not cancel the whole request. """ - for obj_id in obj_ids: - try: - yield self.get(obj_id) - except ObjNotFoundError: - yield None - - @abc.abstractmethod - def check(self, obj_id, *args, **kwargs): + ... + + @remote_api_endpoint("content/check") + def check(self, obj_id): """Perform an integrity check for a given object. Verify that the file object is in place and that the content matches @@ -246,10 +164,10 @@ Error: if the request object is corrupted. """ - pass + ... - @abc.abstractmethod - def delete(self, obj_id, *args, **kwargs): + @remote_api_endpoint("content/delete") + def delete(self, obj_id): """Delete an object. Args: @@ -259,12 +177,12 @@ ObjNotFoundError: if the requested object is missing. """ - if not self.allow_delete: - raise PermissionError("Delete is not allowed.") + ... # Management methods - def get_random(self, batch_size, *args, **kwargs): + @remote_api_endpoint("content/get/random") + def get_random(self, batch_size): """Get random ids of existing contents. This method is used in order to get random ids to perform @@ -278,46 +196,10 @@ current object storage. """ - pass + ... # Streaming methods - def add_stream(self, content_iter, obj_id, check_presence=True): - """Add a new object to the object storage using streaming. - - This function is identical to add() except it takes a generator that - yields the chunked content instead of the whole content at once. - - Args: - content (bytes): chunked generator that yields the object's raw - content to add in storage. - obj_id (bytes): object identifier - check_presence (bool): indicate if the presence of the - content should be verified before adding the file. - - Returns: - the id (bytes) of the object into the storage. - - """ - raise NotImplementedError - - def restore_stream(self, content_iter, obj_id=None): - """Restore a content that have been corrupted using streaming. - - This function is identical to restore() except it takes a generator - that yields the chunked content instead of the whole content at once. - The default implementation provided by the current class is - suitable for most cases. - - Args: - content (bytes): chunked generator that yields the object's raw - content to add in storage. - obj_id (bytes): object identifier - - """ - # check_presence to false will erase the potential previous content. - return self.add_stream(content_iter, obj_id, check_presence=False) - def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE): """Retrieve the content of a given object as a chunked iterator. @@ -331,7 +213,10 @@ ObjNotFoundError: if the requested object is missing. """ - raise NotImplementedError + ... + + def __iter__(self): + ... def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): """Generates known object ids. @@ -344,7 +229,4 @@ Generates: obj_id (bytes): object ids. """ - it = iter(self) - if last_obj_id: - it = dropwhile(lambda x: x <= last_obj_id, it) - return islice(it, limit) + ... diff --git a/swh/objstorage/objstorage.py b/swh/objstorage/objstorage.py --- a/swh/objstorage/objstorage.py +++ b/swh/objstorage/objstorage.py @@ -73,33 +73,6 @@ class ObjStorage(metaclass=abc.ABCMeta): - """ High-level API to manipulate the Software Heritage object storage. - - Conceptually, the object storage offers the following methods: - - - check_config() check if the object storage is properly configured - - __contains__() check if an object is present, by object id - - add() add a new object, returning an object id - - restore() same as add() but erase an already existed content - - get() retrieve the content of an object, by object id - - check() check the integrity of an object, by object id - - delete() remove an object - - And some management methods: - - - get_random() get random object id of existing contents (used for the - content integrity checker). - - Some of the methods have available streaming equivalents: - - - add_stream() same as add() but with a chunked iterator - - restore_stream() same as add_stream() but erase already existing content - - get_stream() same as get() but returns a chunked iterator - - Each implementation of this interface can have a different behavior and - its own way to store the contents. - """ - def __init__(self, *, allow_delete=False, **kwargs): # A more complete permission system could be used in place of that if # it becomes needed @@ -107,61 +80,17 @@ @abc.abstractmethod def check_config(self, *, check_write): - """Check whether the object storage is properly configured. - - Args: - check_write (bool): if True, check if writes to the object storage - can succeed. - - Returns: - True if the configuration check worked, an exception if it didn't. - """ pass @abc.abstractmethod def __contains__(self, obj_id, *args, **kwargs): - """Indicate if the given object is present in the storage. - - Args: - obj_id (bytes): object identifier. - - Returns: - True if and only if the object is present in the current object - storage. - - """ pass @abc.abstractmethod def add(self, content, obj_id=None, check_presence=True, *args, **kwargs): - """Add a new object to the object storage. - - Args: - content (bytes): object's raw content to add in storage. - obj_id (bytes): checksum of [bytes] using [ID_HASH_ALGO] - algorithm. When given, obj_id will be trusted to match - the bytes. If missing, obj_id will be computed on the - fly. - check_presence (bool): indicate if the presence of the - content should be verified before adding the file. - - Returns: - the id (bytes) of the object into the storage. - - """ pass def add_batch(self, contents, check_presence=True) -> Dict: - """Add a batch of new objects to the object storage. - - Args: - contents: mapping from obj_id to object contents - - Returns: - the summary of objects added to the storage (count of object, - count of bytes object) - - """ summary = {"object:add": 0, "object:add:bytes": 0} for obj_id, content in contents.items(): if check_presence and obj_id in self: @@ -172,59 +101,14 @@ return summary def restore(self, content, obj_id=None, *args, **kwargs): - """Restore a content that have been corrupted. - - This function is identical to add but does not check if - the object id is already in the file system. - The default implementation provided by the current class is - suitable for most cases. - - Args: - content (bytes): object's raw content to add in storage - obj_id (bytes): checksum of `bytes` as computed by - ID_HASH_ALGO. When given, obj_id will be trusted to - match bytes. If missing, obj_id will be computed on - the fly. - - """ # check_presence to false will erase the potential previous content. return self.add(content, obj_id, check_presence=False) @abc.abstractmethod def get(self, obj_id, *args, **kwargs): - """Retrieve the content of a given object. - - Args: - obj_id (bytes): object id. - - Returns: - the content of the requested object as bytes. - - Raises: - ObjNotFoundError: if the requested object is missing. - - """ pass def get_batch(self, obj_ids, *args, **kwargs): - """Retrieve objects' raw content in bulk from storage. - - Note: This function does have a default implementation in - ObjStorage that is suitable for most cases. - - For object storages that needs to do the minimal number of - requests possible (ex: remote object storages), that method - can be overridden to perform a more efficient operation. - - Args: - obj_ids ([bytes]: list of object ids. - - Returns: - list of resulting contents, or None if the content could - not be retrieved. Do not raise any exception as a fail for - one content will not cancel the whole request. - - """ for obj_id in obj_ids: try: yield self.get(obj_id) @@ -233,117 +117,27 @@ @abc.abstractmethod def check(self, obj_id, *args, **kwargs): - """Perform an integrity check for a given object. - - Verify that the file object is in place and that the content matches - the object id. - - Args: - obj_id (bytes): object identifier. - - Raises: - ObjNotFoundError: if the requested object is missing. - Error: if the request object is corrupted. - - """ pass @abc.abstractmethod def delete(self, obj_id, *args, **kwargs): - """Delete an object. - - Args: - obj_id (bytes): object identifier. - - Raises: - ObjNotFoundError: if the requested object is missing. - - """ if not self.allow_delete: raise PermissionError("Delete is not allowed.") # Management methods def get_random(self, batch_size, *args, **kwargs): - """Get random ids of existing contents. - - This method is used in order to get random ids to perform - content integrity verifications on random contents. - - Args: - batch_size (int): Number of ids that will be given - - Yields: - An iterable of ids (bytes) of contents that are in the - current object storage. - - """ pass # Streaming methods def add_stream(self, content_iter, obj_id, check_presence=True): - """Add a new object to the object storage using streaming. - - This function is identical to add() except it takes a generator that - yields the chunked content instead of the whole content at once. - - Args: - content (bytes): chunked generator that yields the object's raw - content to add in storage. - obj_id (bytes): object identifier - check_presence (bool): indicate if the presence of the - content should be verified before adding the file. - - Returns: - the id (bytes) of the object into the storage. - - """ raise NotImplementedError - def restore_stream(self, content_iter, obj_id=None): - """Restore a content that have been corrupted using streaming. - - This function is identical to restore() except it takes a generator - that yields the chunked content instead of the whole content at once. - The default implementation provided by the current class is - suitable for most cases. - - Args: - content (bytes): chunked generator that yields the object's raw - content to add in storage. - obj_id (bytes): object identifier - - """ - # check_presence to false will erase the potential previous content. - return self.add_stream(content_iter, obj_id, check_presence=False) - def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE): - """Retrieve the content of a given object as a chunked iterator. - - Args: - obj_id (bytes): object id. - - Returns: - the content of the requested object as bytes. - - Raises: - ObjNotFoundError: if the requested object is missing. - - """ raise NotImplementedError def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): - """Generates known object ids. - - Args: - last_obj_id (bytes): object id from which to iterate from - (excluded). - limit (int): max number of object ids to generate. - - Generates: - obj_id (bytes): object ids. - """ it = iter(self) if last_obj_id: it = dropwhile(lambda x: x <= last_obj_id, it) diff --git a/swh/objstorage/tests/test_objstorage_api.py b/swh/objstorage/tests/test_objstorage_api.py --- a/swh/objstorage/tests/test_objstorage_api.py +++ b/swh/objstorage/tests/test_objstorage_api.py @@ -9,15 +9,13 @@ import pytest -from swh.core.api.tests.server_testing import ServerTestFixtureAsync -from swh.objstorage.api.server import make_app +from swh.core.api.tests.server_testing import ServerTestFixture +from swh.objstorage.api.server import app from swh.objstorage.factory import get_objstorage from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture -class TestRemoteObjStorage( - ServerTestFixtureAsync, ObjStorageTestFixture, unittest.TestCase -): +class TestRemoteObjStorage(ServerTestFixture, ObjStorageTestFixture, unittest.TestCase): """ Test the remote archive API. """ @@ -33,7 +31,7 @@ "client_max_size": 8 * 1024 * 1024, } - self.app = make_app(self.config) + self.app = app super().setUp() self.storage = get_objstorage("remote", {"url": self.url()})