diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -3,11 +3,11 @@ # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html # remote storage API server -aiohttp >= 3 click requests psycopg2 sh +typing-extensions # optional dependencies # apache-libcloud diff --git a/swh/objstorage/api/client.py b/swh/objstorage/api/client.py --- a/swh/objstorage/api/client.py +++ b/swh/objstorage/api/client.py @@ -1,15 +1,21 @@ -# Copyright (C) 2015-2020 The Software Heritage developers +# Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.core.api import RPCClient +from swh.core.utils import iter_chunks from swh.model import hashutil from swh.objstorage.exc import Error, ObjNotFoundError, ObjStorageAPIError -from swh.objstorage.objstorage import DEFAULT_CHUNK_SIZE, DEFAULT_LIMIT +from swh.objstorage.interface import ObjStorageInterface +from swh.objstorage.objstorage import ( + DEFAULT_CHUNK_SIZE, + DEFAULT_LIMIT, + ID_DIGEST_LENGTH, +) -class RemoteObjStorage: +class RemoteObjStorage(RPCClient): """Proxy to a remote object storage. 
This class allows to connect to an object storage server via @@ -22,73 +28,29 @@ """ - def __init__(self, **kwargs): - self._proxy = RPCClient( - api_exception=ObjStorageAPIError, - reraise_exceptions=[ObjNotFoundError, Error], - **kwargs, - ) - - def check_config(self, *, check_write): - return self._proxy.post("check_config", {"check_write": check_write}) - - def __contains__(self, obj_id): - return self._proxy.post("content/contains", {"obj_id": obj_id}) - - def add(self, content, obj_id=None, check_presence=True): - return self._proxy.post( - "content/add", - {"content": content, "obj_id": obj_id, "check_presence": check_presence}, - ) + api_exception = ObjStorageAPIError + reraise_exceptions = [ObjNotFoundError, Error] + backend_class = ObjStorageInterface - def add_batch(self, contents, check_presence=True): - return self._proxy.post( - "content/add/batch", - {"contents": contents, "check_presence": check_presence,}, - ) - - def restore(self, content, obj_id=None, *args, **kwargs): + def restore(self, content, obj_id=None): return self.add(content, obj_id, check_presence=False) - def get(self, obj_id): - return self._proxy.post("content/get", {"obj_id": obj_id}) - - def get_batch(self, obj_ids): - return self._proxy.post("content/get/batch", {"obj_ids": obj_ids}) - - def check(self, obj_id): - return self._proxy.post("content/check", {"obj_id": obj_id}) - - def delete(self, obj_id): - # deletion permission are checked server-side - return self._proxy.post("content/delete", {"obj_id": obj_id}) - - # Management methods - - def get_random(self, batch_size): - return self._proxy.post("content/get/random", {"batch_size": batch_size}) - - # Streaming methods - def add_stream(self, content_iter, obj_id, check_presence=True): - obj_id = hashutil.hash_to_hex(obj_id) - return self._proxy.post_stream( - "content/add_stream/{}".format(obj_id), - params={"check_presence": check_presence}, - data=content_iter, - ) + raise NotImplementedError def get_stream(self, obj_id, 
chunk_size=DEFAULT_CHUNK_SIZE): obj_id = hashutil.hash_to_hex(obj_id) - return self._proxy.get_stream( + return self._get_stream( "content/get_stream/{}".format(obj_id), chunk_size=chunk_size ) def __iter__(self): - yield from self._proxy.get_stream("content") + yield from self.list_content() def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): params = {"limit": limit} if last_obj_id: params["last_obj_id"] = hashutil.hash_to_hex(last_obj_id) - yield from self._proxy.get_stream("content", params=params) + yield from iter_chunks( + self._get_stream("content", params=params), chunk_size=ID_DIGEST_LENGTH + ) diff --git a/swh/objstorage/api/server.py b/swh/objstorage/api/server.py --- a/swh/objstorage/api/server.py +++ b/swh/objstorage/api/server.py @@ -1,196 +1,122 @@ -# Copyright (C) 2015-2020 The Software Heritage developers +# Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import json +import contextlib +import functools +import logging import os -import aiohttp.web +from flask import request -from swh.core.api.asynchronous import RPCServerApp, decode_request -from swh.core.api.asynchronous import encode_data_server as encode_data -from swh.core.api.serializers import SWHJSONDecoder, msgpack_loads +from swh.core.api import RPCServerApp +from swh.core.api import encode_data_server as encode_data +from swh.core.api import error_handler from swh.core.config import read as config_read from swh.core.statsd import statsd from swh.model import hashutil from swh.objstorage.exc import Error, ObjNotFoundError -from swh.objstorage.factory import get_objstorage +from swh.objstorage.factory import get_objstorage as get_swhobjstorage +from swh.objstorage.interface import ObjStorageInterface from swh.objstorage.objstorage import DEFAULT_LIMIT def timed(f): - async def 
w(*a, **kw): + @functools.wraps(f) + def w(*a, **kw): with statsd.timed( "swh_objstorage_request_duration_seconds", tags={"endpoint": f.__name__} ): - return await f(*a, **kw) + return f(*a, **kw) return w -@timed -async def index(request): - return aiohttp.web.Response(body="SWH Objstorage API server") - - -@timed -async def check_config(request): - req = await decode_request(request) - return encode_data(request.app["objstorage"].check_config(**req)) - - -@timed -async def contains(request): - req = await decode_request(request) - return encode_data(request.app["objstorage"].__contains__(**req)) - - -@timed -async def add_bytes(request): - req = await decode_request(request) - statsd.increment( - "swh_objstorage_in_bytes_total", - len(req["content"]), - tags={"endpoint": "add_bytes"}, - ) - return encode_data(request.app["objstorage"].add(**req)) - +@contextlib.contextmanager +def timed_context(f_name): + with statsd.timed( + "swh_objstorage_request_duration_seconds", tags={"endpoint": f_name} + ): + yield -@timed -async def add_batch(request): - req = await decode_request(request) - return encode_data(request.app["objstorage"].add_batch(**req)) +def get_objstorage(): + global objstorage + if not objstorage: + objstorage = get_swhobjstorage(**app.config["objstorage"]) -@timed -async def get_bytes(request): - req = await decode_request(request) + return objstorage - ret = request.app["objstorage"].get(**req) - statsd.increment( - "swh_objstorage_out_bytes_total", len(ret), tags={"endpoint": "get_bytes"} - ) - return encode_data(ret) +class ObjStorageServerApp(RPCServerApp): + client_exception_classes = (ObjNotFoundError, Error) + method_decorators = [timed] + def pre_add(self, kw): + """Called before the 'add' method.""" + statsd.increment( + "swh_objstorage_in_bytes_total", + len(kw["content"]), + tags={"endpoint": "add_bytes"}, + ) -@timed -async def get_batch(request): - req = await decode_request(request) - return 
encode_data(request.app["objstorage"].get_batch(**req)) + def post_get(self, ret, kw): + """Called after the 'get' method.""" + statsd.increment( + "swh_objstorage_out_bytes_total", len(ret), tags={"endpoint": "get_bytes"} + ) -@timed -async def check(request): - req = await decode_request(request) - return encode_data(request.app["objstorage"].check(**req)) +app = ObjStorageServerApp( + __name__, backend_class=ObjStorageInterface, backend_factory=get_objstorage, +) +objstorage = None -@timed -async def delete(request): - req = await decode_request(request) - return encode_data(request.app["objstorage"].delete(**req)) +@app.errorhandler(Error) +def argument_error_handler(exception): + return error_handler(exception, encode_data, status_code=400) -# Management methods +@app.errorhandler(Exception) +def my_error_handler(exception): + return error_handler(exception, encode_data) +@app.route("/") @timed -async def get_random_contents(request): - req = await decode_request(request) - return encode_data(request.app["objstorage"].get_random(**req)) +def index(): + return "SWH Objstorage API server" # Streaming methods -@timed -async def add_stream(request): - hex_id = request.match_info["hex_id"] +@app.route("/content/get_stream/<hex_id>") +def get_stream(hex_id): obj_id = hashutil.hash_to_bytes(hex_id) - check_pres = request.query.get("check_presence", "").lower() == "true" - objstorage = request.app["objstorage"] - - if check_pres and obj_id in objstorage: - return encode_data(obj_id) - - # XXX this really should go in a decode_stream_request coroutine in - # swh.core, but since py35 does not support async generators, it cannot - # easily be made for now - content_type = request.headers.get("Content-Type") - if content_type == "application/x-msgpack": - decode = msgpack_loads - elif content_type == "application/json": - decode = lambda x: json.loads(x, cls=SWHJSONDecoder) # noqa - else: - raise ValueError("Wrong content type `%s` for API request" % content_type) - - buffer = 
b"" - with objstorage.chunk_writer(obj_id) as write: - while not request.content.at_eof(): - data, eot = await request.content.readchunk() - buffer += data - if eot: - write(decode(buffer)) - buffer = b"" - - return encode_data(obj_id) + def generate(): + with timed_context("get_stream"): + yield from objstorage.get_stream(obj_id, 2 << 20) -@timed -async def get_stream(request): - hex_id = request.match_info["hex_id"] - obj_id = hashutil.hash_to_bytes(hex_id) - response = aiohttp.web.StreamResponse() - await response.prepare(request) - for chunk in request.app["objstorage"].get_stream(obj_id, 2 << 20): - await response.write(chunk) - await response.write_eof() - return response + return app.response_class(generate()) -@timed -async def list_content(request): - last_obj_id = request.query.get("last_obj_id") +@app.route("/content") +def list_content(): + last_obj_id = request.args.get("last_obj_id") if last_obj_id: last_obj_id = bytes.fromhex(last_obj_id) - limit = int(request.query.get("limit", DEFAULT_LIMIT)) - response = aiohttp.web.StreamResponse() - response.enable_chunked_encoding() - await response.prepare(request) - for obj_id in request.app["objstorage"].list_content(last_obj_id, limit=limit): - await response.write(obj_id) - await response.write_eof() - return response - + limit = int(request.args.get("limit", DEFAULT_LIMIT)) -def make_app(config): - """Initialize the remote api application. 
+ def generate(): + yield b"" + with timed_context("get_stream"): + yield from objstorage.list_content(last_obj_id, limit=limit) - """ - client_max_size = config.get("client_max_size", 1024 * 1024 * 1024) - app = RPCServerApp(client_max_size=client_max_size) - app.client_exception_classes = (ObjNotFoundError, Error) - - # retro compatibility configuration settings - app["config"] = config - app["objstorage"] = get_objstorage(**config["objstorage"]) - - app.router.add_route("GET", "/", index) - app.router.add_route("POST", "/check_config", check_config) - app.router.add_route("POST", "/content/contains", contains) - app.router.add_route("POST", "/content/add", add_bytes) - app.router.add_route("POST", "/content/add/batch", add_batch) - app.router.add_route("POST", "/content/get", get_bytes) - app.router.add_route("POST", "/content/get/batch", get_batch) - app.router.add_route("POST", "/content/get/random", get_random_contents) - app.router.add_route("POST", "/content/check", check) - app.router.add_route("POST", "/content/delete", delete) - app.router.add_route("GET", "/content", list_content) - app.router.add_route("POST", "/content/add_stream/{hex_id}", add_stream) - app.router.add_route("GET", "/content/get_stream/{hex_id}", get_stream) - return app + return app.response_class(generate()) def load_and_check_config(config_file): @@ -263,8 +189,10 @@ """ config_file = os.environ.get("SWH_CONFIG_FILENAME") - config = load_and_check_config(config_file) - return make_app(config=config) + app.config = load_and_check_config(config_file) + handler = logging.StreamHandler() + app.logger.addHandler(handler) + return app if __name__ == "__main__": diff --git a/swh/objstorage/backends/in_memory.py b/swh/objstorage/backends/in_memory.py --- a/swh/objstorage/backends/in_memory.py +++ b/swh/objstorage/backends/in_memory.py @@ -24,13 +24,13 @@ def check_config(self, *, check_write): return True - def __contains__(self, obj_id, *args, **kwargs): + def __contains__(self, obj_id): 
return obj_id in self.state def __iter__(self): return iter(sorted(self.state)) - def add(self, content, obj_id=None, check_presence=True, *args, **kwargs): + def add(self, content, obj_id=None, check_presence=True): if obj_id is None: obj_id = compute_hash(content) @@ -41,20 +41,20 @@ return obj_id - def get(self, obj_id, *args, **kwargs): + def get(self, obj_id): if obj_id not in self: raise ObjNotFoundError(obj_id) return self.state[obj_id] - def check(self, obj_id, *args, **kwargs): + def check(self, obj_id): if obj_id not in self: raise ObjNotFoundError(obj_id) if compute_hash(self.state[obj_id]) != obj_id: raise Error("Corrupt object %s" % obj_id) return True - def delete(self, obj_id, *args, **kwargs): + def delete(self, obj_id): super().delete(obj_id) # Check delete permission if obj_id not in self: raise ObjNotFoundError(obj_id) diff --git a/swh/objstorage/backends/pathslicing.py b/swh/objstorage/backends/pathslicing.py --- a/swh/objstorage/backends/pathslicing.py +++ b/swh/objstorage/backends/pathslicing.py @@ -17,7 +17,7 @@ DEFAULT_CHUNK_SIZE, DEFAULT_LIMIT, ID_HASH_ALGO, - ID_HASH_LENGTH, + ID_HEXDIGEST_LENGTH, ObjStorage, compressors, compute_hash, @@ -74,7 +74,7 @@ max_char = max( max(bound.start or 0, bound.stop or 0) for bound in self.bounds ) - if ID_HASH_LENGTH < max_char: + if ID_HEXDIGEST_LENGTH < max_char: raise ValueError( "Algorithm %s has too short hash for slicing to char %d" % (ID_HASH_ALGO, max_char) @@ -119,7 +119,7 @@ a list. """ - assert len(hex_obj_id) == ID_HASH_LENGTH + assert len(hex_obj_id) == ID_HEXDIGEST_LENGTH return [hex_obj_id[bound] for bound in self.bounds] def __len__(self) -> int: @@ -335,6 +335,22 @@ f.write(compressor.flush()) def add_stream(self, content_iter, obj_id, check_presence=True): + """Add a new object to the object storage using streaming. + + This function is identical to add() except it takes a generator that + yields the chunked content instead of the whole content at once. 
+ + Args: + content (bytes): chunked generator that yields the object's raw + content to add in storage. + obj_id (bytes): object identifier + check_presence (bool): indicate if the presence of the + content should be verified before adding the file. + + Returns: + the id (bytes) of the object into the storage. + + """ if check_presence and obj_id in self: return obj_id diff --git a/swh/objstorage/backends/seaweedfs/http.py b/swh/objstorage/backends/seaweedfs/http.py --- a/swh/objstorage/backends/seaweedfs/http.py +++ b/swh/objstorage/backends/seaweedfs/http.py @@ -95,7 +95,5 @@ params["lastFileName"] = dircontent["LastFileName"] else: - LOGGER.error( - 'Error listing "%s". [HTTP %d]', self.url, rsp.status_code - ) + LOGGER.error('Error listing "%s". [HTTP %d]', self.url, rsp.status_code) break diff --git a/swh/objstorage/cli.py b/swh/objstorage/cli.py --- a/swh/objstorage/cli.py +++ b/swh/objstorage/cli.py @@ -66,20 +66,24 @@ show_default=True, help="Binding port of the server", ) +@click.option( + "--debug/--no-debug", + default=True, + help="Indicates if the server should run in debug mode", +) @click.pass_context -def serve(ctx, host, port): +def serve(ctx, host, port, debug): """Run a standalone objstorage server. This is not meant to be run on production systems. 
""" - import aiohttp.web - - from swh.objstorage.api.server import make_app, validate_config + from swh.objstorage.api.server import app, validate_config - app = make_app(validate_config(ctx.obj["config"])) - if ctx.obj["log_level"] == "DEBUG": - app.update(debug=True) - aiohttp.web.run_app(app, host=host, port=int(port)) + if "log_level" in ctx.obj: + logging.getLogger("werkzeug").setLevel(ctx.obj["log_level"]) + validate_config(ctx.obj["config"]) + app.config.update(ctx.obj["config"]) + app.run(host, port=int(port), debug=bool(debug)) @objstorage_cli_group.command("import") diff --git a/swh/objstorage/factory.py b/swh/objstorage/factory.py --- a/swh/objstorage/factory.py +++ b/swh/objstorage/factory.py @@ -16,7 +16,7 @@ from swh.objstorage.backends.winery import WineryObjStorage from swh.objstorage.multiplexer import MultiplexerObjStorage, StripingObjStorage from swh.objstorage.multiplexer.filter import add_filters -from swh.objstorage.objstorage import ID_HASH_LENGTH, ObjStorage # noqa +from swh.objstorage.objstorage import ID_HEXDIGEST_LENGTH, ObjStorage # noqa __all__ = ["get_objstorage", "ObjStorage"] diff --git a/swh/objstorage/objstorage.py b/swh/objstorage/interface.py copy from swh/objstorage/objstorage.py copy to swh/objstorage/interface.py --- a/swh/objstorage/objstorage.py +++ b/swh/objstorage/interface.py @@ -1,78 +1,18 @@ -# Copyright (C) 2015-2020 The Software Heritage developers +# Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import abc -import bz2 -from itertools import dropwhile, islice -import lzma from typing import Dict -import zlib -from swh.model import hashutil +from typing_extensions import Protocol, runtime_checkable -from .exc import ObjNotFoundError +from swh.core.api import remote_api_endpoint +from swh.objstorage.objstorage import 
DEFAULT_CHUNK_SIZE, DEFAULT_LIMIT -ID_HASH_ALGO = "sha1" -ID_HASH_LENGTH = 40 # Size in bytes of the hash hexadecimal representation. -DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024 # Size in bytes of the streaming chunks -DEFAULT_LIMIT = 10000 - -def compute_hash(content): - """Compute the content's hash. - - Args: - content (bytes): The raw content to hash - hash_name (str): Hash's name (default to ID_HASH_ALGO) - - Returns: - The ID_HASH_ALGO for the content - - """ - return ( - hashutil.MultiHash.from_data(content, hash_names=[ID_HASH_ALGO],) - .digest() - .get(ID_HASH_ALGO) - ) - - -class NullCompressor: - def compress(self, data): - return data - - def flush(self): - return b"" - - -class NullDecompressor: - def decompress(self, data): - return data - - @property - def unused_data(self): - return b"" - - -decompressors = { - "bz2": bz2.BZ2Decompressor, - "lzma": lzma.LZMADecompressor, - "gzip": lambda: zlib.decompressobj(wbits=31), - "zlib": zlib.decompressobj, - "none": NullDecompressor, -} - -compressors = { - "bz2": bz2.BZ2Compressor, - "lzma": lzma.LZMACompressor, - "gzip": lambda: zlib.compressobj(wbits=31), - "zlib": zlib.compressobj, - "none": NullCompressor, -} - - -class ObjStorage(metaclass=abc.ABCMeta): +@runtime_checkable +class ObjStorageInterface(Protocol): """ High-level API to manipulate the Software Heritage object storage. Conceptually, the object storage offers the following methods: @@ -92,20 +32,13 @@ Some of the methods have available streaming equivalents: - - add_stream() same as add() but with a chunked iterator - - restore_stream() same as add_stream() but erase already existing content - get_stream() same as get() but returns a chunked iterator Each implementation of this interface can have a different behavior and its own way to store the contents. 
""" - def __init__(self, *, allow_delete=False, **kwargs): - # A more complete permission system could be used in place of that if - # it becomes needed - self.allow_delete = allow_delete - - @abc.abstractmethod + @remote_api_endpoint("check_config") def check_config(self, *, check_write): """Check whether the object storage is properly configured. @@ -116,10 +49,10 @@ Returns: True if the configuration check worked, an exception if it didn't. """ - pass + ... - @abc.abstractmethod - def __contains__(self, obj_id, *args, **kwargs): + @remote_api_endpoint("content/contains") + def __contains__(self, obj_id): """Indicate if the given object is present in the storage. Args: @@ -130,10 +63,10 @@ storage. """ - pass + ... - @abc.abstractmethod - def add(self, content, obj_id=None, check_presence=True, *args, **kwargs): + @remote_api_endpoint("content/add") + def add(self, content, obj_id=None, check_presence=True): """Add a new object to the object storage. Args: @@ -149,8 +82,9 @@ the id (bytes) of the object into the storage. """ - pass + ... + @remote_api_endpoint("content/add/batch") def add_batch(self, contents, check_presence=True) -> Dict: """Add a batch of new objects to the object storage. @@ -162,16 +96,9 @@ count of bytes object) """ - summary = {"object:add": 0, "object:add:bytes": 0} - for obj_id, content in contents.items(): - if check_presence and obj_id in self: - continue - self.add(content, obj_id, check_presence=False) - summary["object:add"] += 1 - summary["object:add:bytes"] += len(content) - return summary - - def restore(self, content, obj_id=None, *args, **kwargs): + ... + + def restore(self, content, obj_id=None): """Restore a content that have been corrupted. This function is identical to add but does not check if @@ -187,11 +114,10 @@ the fly. """ - # check_presence to false will erase the potential previous content. - return self.add(content, obj_id, check_presence=False) + ... 
- @abc.abstractmethod - def get(self, obj_id, *args, **kwargs): + @remote_api_endpoint("content/get") + def get(self, obj_id): """Retrieve the content of a given object. Args: @@ -204,9 +130,10 @@ ObjNotFoundError: if the requested object is missing. """ - pass + ... - def get_batch(self, obj_ids, *args, **kwargs): + @remote_api_endpoint("content/get/batch") + def get_batch(self, obj_ids): """Retrieve objects' raw content in bulk from storage. Note: This function does have a default implementation in @@ -225,14 +152,10 @@ one content will not cancel the whole request. """ - for obj_id in obj_ids: - try: - yield self.get(obj_id) - except ObjNotFoundError: - yield None - - @abc.abstractmethod - def check(self, obj_id, *args, **kwargs): + ... + + @remote_api_endpoint("content/check") + def check(self, obj_id): """Perform an integrity check for a given object. Verify that the file object is in place and that the content matches @@ -246,10 +169,10 @@ Error: if the request object is corrupted. """ - pass + ... - @abc.abstractmethod - def delete(self, obj_id, *args, **kwargs): + @remote_api_endpoint("content/delete") + def delete(self, obj_id): """Delete an object. Args: @@ -259,12 +182,12 @@ ObjNotFoundError: if the requested object is missing. """ - if not self.allow_delete: - raise PermissionError("Delete is not allowed.") + ... # Management methods - def get_random(self, batch_size, *args, **kwargs): + @remote_api_endpoint("content/get/random") + def get_random(self, batch_size): """Get random ids of existing contents. This method is used in order to get random ids to perform @@ -278,46 +201,10 @@ current object storage. """ - pass + ... # Streaming methods - def add_stream(self, content_iter, obj_id, check_presence=True): - """Add a new object to the object storage using streaming. - - This function is identical to add() except it takes a generator that - yields the chunked content instead of the whole content at once. 
- - Args: - content (bytes): chunked generator that yields the object's raw - content to add in storage. - obj_id (bytes): object identifier - check_presence (bool): indicate if the presence of the - content should be verified before adding the file. - - Returns: - the id (bytes) of the object into the storage. - - """ - raise NotImplementedError - - def restore_stream(self, content_iter, obj_id=None): - """Restore a content that have been corrupted using streaming. - - This function is identical to restore() except it takes a generator - that yields the chunked content instead of the whole content at once. - The default implementation provided by the current class is - suitable for most cases. - - Args: - content (bytes): chunked generator that yields the object's raw - content to add in storage. - obj_id (bytes): object identifier - - """ - # check_presence to false will erase the potential previous content. - return self.add_stream(content_iter, obj_id, check_presence=False) - def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE): """Retrieve the content of a given object as a chunked iterator. @@ -331,7 +218,10 @@ ObjNotFoundError: if the requested object is missing. """ - raise NotImplementedError + ... + + def __iter__(self): + ... def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): """Generates known object ids. @@ -344,7 +234,4 @@ Generates: obj_id (bytes): object ids. """ - it = iter(self) - if last_obj_id: - it = dropwhile(lambda x: x <= last_obj_id, it) - return islice(it, limit) + ... 
diff --git a/swh/objstorage/multiplexer/multiplexer_objstorage.py b/swh/objstorage/multiplexer/multiplexer_objstorage.py --- a/swh/objstorage/multiplexer/multiplexer_objstorage.py +++ b/swh/objstorage/multiplexer/multiplexer_objstorage.py @@ -6,6 +6,7 @@ import queue import random import threading +from typing import Dict from swh.objstorage.exc import ObjNotFoundError from swh.objstorage.objstorage import ObjStorage @@ -251,7 +252,7 @@ continue return result - def add_batch(self, contents, check_presence=True): + def add_batch(self, contents, check_presence=True) -> Dict: """Add a batch of new objects to the object storage. """ diff --git a/swh/objstorage/multiplexer/striping_objstorage.py b/swh/objstorage/multiplexer/striping_objstorage.py --- a/swh/objstorage/multiplexer/striping_objstorage.py +++ b/swh/objstorage/multiplexer/striping_objstorage.py @@ -5,6 +5,7 @@ from collections import defaultdict import queue +from typing import Dict from swh.objstorage.multiplexer.multiplexer_objstorage import ( MultiplexerObjStorage, @@ -49,16 +50,16 @@ for i in range(self.num_storages): yield self.storage_threads[(idx + i) % self.num_storages] - def add_batch(self, contents, check_presence=True): + def add_batch(self, contents, check_presence=True) -> Dict: """Add a batch of new objects to the object storage. 
""" - content_by_storage_index = defaultdict(dict) + content_by_storage_index: Dict[bytes, Dict] = defaultdict(dict) for obj_id, content in contents.items(): storage_index = self.get_storage_index(obj_id) content_by_storage_index[storage_index][obj_id] = content - mailbox = queue.Queue() + mailbox: queue.Queue[Dict] = queue.Queue() for storage_index, contents in content_by_storage_index.items(): self.storage_threads[storage_index].queue_command( "add_batch", contents, check_presence=check_presence, mailbox=mailbox, diff --git a/swh/objstorage/objstorage.py b/swh/objstorage/objstorage.py --- a/swh/objstorage/objstorage.py +++ b/swh/objstorage/objstorage.py @@ -15,9 +15,18 @@ from .exc import ObjNotFoundError ID_HASH_ALGO = "sha1" -ID_HASH_LENGTH = 40 # Size in bytes of the hash hexadecimal representation. -DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024 # Size in bytes of the streaming chunks + +ID_HEXDIGEST_LENGTH = 40 +"""Size in bytes of the hash hexadecimal representation.""" + +ID_DIGEST_LENGTH = 20 +"""Size in bytes of the hash""" + +DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024 +"""Size in bytes of the streaming chunks""" + DEFAULT_LIMIT = 10000 +"""Default number of results of ``list_content``.""" def compute_hash(content): @@ -73,33 +82,6 @@ class ObjStorage(metaclass=abc.ABCMeta): - """ High-level API to manipulate the Software Heritage object storage. - - Conceptually, the object storage offers the following methods: - - - check_config() check if the object storage is properly configured - - __contains__() check if an object is present, by object id - - add() add a new object, returning an object id - - restore() same as add() but erase an already existed content - - get() retrieve the content of an object, by object id - - check() check the integrity of an object, by object id - - delete() remove an object - - And some management methods: - - - get_random() get random object id of existing contents (used for the - content integrity checker). 
- - Some of the methods have available streaming equivalents: - - - add_stream() same as add() but with a chunked iterator - - restore_stream() same as add_stream() but erase already existing content - - get_stream() same as get() but returns a chunked iterator - - Each implementation of this interface can have a different behavior and - its own way to store the contents. - """ - def __init__(self, *, allow_delete=False, **kwargs): # A more complete permission system could be used in place of that if # it becomes needed @@ -107,61 +89,17 @@ @abc.abstractmethod def check_config(self, *, check_write): - """Check whether the object storage is properly configured. - - Args: - check_write (bool): if True, check if writes to the object storage - can succeed. - - Returns: - True if the configuration check worked, an exception if it didn't. - """ pass @abc.abstractmethod - def __contains__(self, obj_id, *args, **kwargs): - """Indicate if the given object is present in the storage. - - Args: - obj_id (bytes): object identifier. - - Returns: - True if and only if the object is present in the current object - storage. - - """ + def __contains__(self, obj_id): pass @abc.abstractmethod - def add(self, content, obj_id=None, check_presence=True, *args, **kwargs): - """Add a new object to the object storage. - - Args: - content (bytes): object's raw content to add in storage. - obj_id (bytes): checksum of [bytes] using [ID_HASH_ALGO] - algorithm. When given, obj_id will be trusted to match - the bytes. If missing, obj_id will be computed on the - fly. - check_presence (bool): indicate if the presence of the - content should be verified before adding the file. - - Returns: - the id (bytes) of the object into the storage. - - """ + def add(self, content, obj_id=None, check_presence=True): pass def add_batch(self, contents, check_presence=True) -> Dict: - """Add a batch of new objects to the object storage. 
- - Args: - contents: mapping from obj_id to object contents - - Returns: - the summary of objects added to the storage (count of object, - count of bytes object) - - """ summary = {"object:add": 0, "object:add:bytes": 0} for obj_id, content in contents.items(): if check_presence and obj_id in self: @@ -171,60 +109,15 @@ summary["object:add:bytes"] += len(content) return summary - def restore(self, content, obj_id=None, *args, **kwargs): - """Restore a content that have been corrupted. - - This function is identical to add but does not check if - the object id is already in the file system. - The default implementation provided by the current class is - suitable for most cases. - - Args: - content (bytes): object's raw content to add in storage - obj_id (bytes): checksum of `bytes` as computed by - ID_HASH_ALGO. When given, obj_id will be trusted to - match bytes. If missing, obj_id will be computed on - the fly. - - """ + def restore(self, content, obj_id=None): # check_presence to false will erase the potential previous content. return self.add(content, obj_id, check_presence=False) @abc.abstractmethod - def get(self, obj_id, *args, **kwargs): - """Retrieve the content of a given object. - - Args: - obj_id (bytes): object id. - - Returns: - the content of the requested object as bytes. - - Raises: - ObjNotFoundError: if the requested object is missing. - - """ + def get(self, obj_id): pass - def get_batch(self, obj_ids, *args, **kwargs): - """Retrieve objects' raw content in bulk from storage. - - Note: This function does have a default implementation in - ObjStorage that is suitable for most cases. - - For object storages that needs to do the minimal number of - requests possible (ex: remote object storages), that method - can be overridden to perform a more efficient operation. - - Args: - obj_ids ([bytes]: list of object ids. - - Returns: - list of resulting contents, or None if the content could - not be retrieved. 
Do not raise any exception as a fail for - one content will not cancel the whole request. - - """ + def get_batch(self, obj_ids): for obj_id in obj_ids: try: yield self.get(obj_id) @@ -232,118 +125,28 @@ yield None @abc.abstractmethod - def check(self, obj_id, *args, **kwargs): - """Perform an integrity check for a given object. - - Verify that the file object is in place and that the content matches - the object id. - - Args: - obj_id (bytes): object identifier. - - Raises: - ObjNotFoundError: if the requested object is missing. - Error: if the request object is corrupted. - - """ + def check(self, obj_id): pass @abc.abstractmethod - def delete(self, obj_id, *args, **kwargs): - """Delete an object. - - Args: - obj_id (bytes): object identifier. - - Raises: - ObjNotFoundError: if the requested object is missing. - - """ + def delete(self, obj_id): if not self.allow_delete: raise PermissionError("Delete is not allowed.") # Management methods - def get_random(self, batch_size, *args, **kwargs): - """Get random ids of existing contents. - - This method is used in order to get random ids to perform - content integrity verifications on random contents. - - Args: - batch_size (int): Number of ids that will be given - - Yields: - An iterable of ids (bytes) of contents that are in the - current object storage. - - """ + def get_random(self, batch_size): pass # Streaming methods def add_stream(self, content_iter, obj_id, check_presence=True): - """Add a new object to the object storage using streaming. - - This function is identical to add() except it takes a generator that - yields the chunked content instead of the whole content at once. - - Args: - content (bytes): chunked generator that yields the object's raw - content to add in storage. - obj_id (bytes): object identifier - check_presence (bool): indicate if the presence of the - content should be verified before adding the file. - - Returns: - the id (bytes) of the object into the storage. 
- - """ raise NotImplementedError - def restore_stream(self, content_iter, obj_id=None): - """Restore a content that have been corrupted using streaming. - - This function is identical to restore() except it takes a generator - that yields the chunked content instead of the whole content at once. - The default implementation provided by the current class is - suitable for most cases. - - Args: - content (bytes): chunked generator that yields the object's raw - content to add in storage. - obj_id (bytes): object identifier - - """ - # check_presence to false will erase the potential previous content. - return self.add_stream(content_iter, obj_id, check_presence=False) - def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE): - """Retrieve the content of a given object as a chunked iterator. - - Args: - obj_id (bytes): object id. - - Returns: - the content of the requested object as bytes. - - Raises: - ObjNotFoundError: if the requested object is missing. - - """ raise NotImplementedError def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): - """Generates known object ids. - - Args: - last_obj_id (bytes): object id from which to iterate from - (excluded). - limit (int): max number of object ids to generate. - - Generates: - obj_id (bytes): object ids. 
- """ it = iter(self) if last_obj_id: it = dropwhile(lambda x: x <= last_obj_id, it) diff --git a/swh/objstorage/tests/objstorage_testing.py b/swh/objstorage/tests/objstorage_testing.py --- a/swh/objstorage/tests/objstorage_testing.py +++ b/swh/objstorage/tests/objstorage_testing.py @@ -4,13 +4,45 @@ # See top-level LICENSE file for more information from collections.abc import Iterator +import inspect import time from swh.objstorage import exc +from swh.objstorage.interface import ObjStorageInterface from swh.objstorage.objstorage import compute_hash class ObjStorageTestFixture: + def test_types(self): + """Checks all methods of ObjStorageInterface are implemented by this + backend, and that they have the same signature.""" + # Create an instance of the protocol (which cannot be instantiated + # directly, so this creates a subclass, then instantiates it) + interface = type("_", (ObjStorageInterface,), {})() + + assert "get_batch" in dir(interface) + + missing_methods = [] + + for meth_name in dir(interface): + if meth_name.startswith("_"): + continue + interface_meth = getattr(interface, meth_name) + concrete_meth = getattr(self.storage, meth_name) + + expected_signature = inspect.signature(interface_meth) + actual_signature = inspect.signature(concrete_meth) + + assert expected_signature == actual_signature, meth_name + + assert missing_methods == [] + + # If all the assertions above succeed, then this one should too. + # But there's no harm in double-checking. + # And we could replace the assertions above by this one, but unlike + # the assertions above, it doesn't explain what is missing. 
+ assert isinstance(self.storage, ObjStorageInterface) + def hash_content(self, content): obj_id = compute_hash(content) return content, obj_id diff --git a/swh/objstorage/tests/test_objstorage_api.py b/swh/objstorage/tests/test_objstorage_api.py --- a/swh/objstorage/tests/test_objstorage_api.py +++ b/swh/objstorage/tests/test_objstorage_api.py @@ -9,15 +9,13 @@ import pytest -from swh.core.api.tests.server_testing import ServerTestFixtureAsync -from swh.objstorage.api.server import make_app +from swh.core.api.tests.server_testing import ServerTestFixture +from swh.objstorage.api.server import app from swh.objstorage.factory import get_objstorage from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture -class TestRemoteObjStorage( - ServerTestFixtureAsync, ObjStorageTestFixture, unittest.TestCase -): +class TestRemoteObjStorage(ServerTestFixture, ObjStorageTestFixture, unittest.TestCase): """ Test the remote archive API. """ @@ -33,7 +31,7 @@ "client_max_size": 8 * 1024 * 1024, } - self.app = make_app(self.config) + self.app = app super().setUp() self.storage = get_objstorage("remote", {"url": self.url()}) diff --git a/swh/objstorage/tests/test_objstorage_pathslicing.py b/swh/objstorage/tests/test_objstorage_pathslicing.py --- a/swh/objstorage/tests/test_objstorage_pathslicing.py +++ b/swh/objstorage/tests/test_objstorage_pathslicing.py @@ -11,7 +11,7 @@ from swh.model import hashutil from swh.objstorage import exc from swh.objstorage.factory import get_objstorage -from swh.objstorage.objstorage import ID_HASH_LENGTH +from swh.objstorage.objstorage import ID_DIGEST_LENGTH from .objstorage_testing import ObjStorageTestFixture @@ -86,7 +86,7 @@ all_ids.append(obj_id) all_ids.sort() - ids = list(self.storage.iter_from(b"\x00" * (ID_HASH_LENGTH // 2))) + ids = list(self.storage.iter_from(b"\x00" * ID_DIGEST_LENGTH)) self.assertEqual(len(ids), len(all_ids)) self.assertEqual(ids, all_ids)