# diff --git a/swh/objstorage/api/server.py b/swh/objstorage/api/server.py
# index 30cffe6..beb8bef 100644
# --- a/swh/objstorage/api/server.py
# +++ b/swh/objstorage/api/server.py
# @@ -1,270 +1,284 @@

# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import functools
import json
import os

import aiohttp.web

from swh.core.config import read as config_read
from swh.core.api.asynchronous import (
    RPCServerApp,
    decode_request,
    encode_data_server as encode_data,
)
from swh.core.api.serializers import msgpack_loads, SWHJSONDecoder
from swh.model import hashutil
from swh.objstorage.factory import get_objstorage
from swh.objstorage.objstorage import DEFAULT_LIMIT
from swh.objstorage.exc import Error, ObjNotFoundError
from swh.core.statsd import statsd


def timed(f):
    """Wrap the coroutine handler ``f`` so every call is timed with statsd.

    The duration is reported under the
    ``swh_objstorage_request_duration_seconds`` metric, tagged with the
    handler's ``__name__`` as ``endpoint``.

    Args:
        f: coroutine function to wrap.

    Returns:
        A coroutine function with the same call signature as ``f``.

    """

    # functools.wraps keeps the handler's __name__/__doc__ on the wrapper;
    # without it every decorated handler is reported as "w".
    @functools.wraps(f)
    async def w(*a, **kw):
        with statsd.timed(
            "swh_objstorage_request_duration_seconds", tags={"endpoint": f.__name__}
        ):
            return await f(*a, **kw)

    return w


@timed
async def index(request):
    """Return a static banner identifying the server."""
    return aiohttp.web.Response(body="SWH Objstorage API server")


@timed
async def check_config(request):
    """Forward the decoded request arguments to ``objstorage.check_config``."""
    req = await decode_request(request)
    return encode_data(request.app["objstorage"].check_config(**req))


@timed
async def contains(request):
    """Forward the decoded request arguments to ``objstorage.__contains__``."""
    req = await decode_request(request)
    return encode_data(request.app["objstorage"].__contains__(**req))


@timed
async def add_bytes(request):
    """Forward the decoded request arguments to ``objstorage.add``.

    The length of the ``content`` argument is added to the
    ``swh_objstorage_in_bytes_total`` counter before the object is stored.

    """
    req = await decode_request(request)
    statsd.increment(
        "swh_objstorage_in_bytes_total",
        len(req["content"]),
        tags={"endpoint": "add_bytes"},
    )
    return encode_data(request.app["objstorage"].add(**req))


@timed
async def add_batch(request):
    """Forward the decoded request arguments to ``objstorage.add_batch``."""
    req = await decode_request(request)
    return encode_data(request.app["objstorage"].add_batch(**req))


@timed
async def get_bytes(request):
    """Forward the decoded request arguments to ``objstorage.get``.

    The length of the returned value is added to the
    ``swh_objstorage_out_bytes_total`` counter before it is encoded.

    """
    req = await decode_request(request)

    ret = request.app["objstorage"].get(**req)

    statsd.increment(
        "swh_objstorage_out_bytes_total", len(ret), tags={"endpoint": "get_bytes"}
    )
    return encode_data(ret)


@timed
async def get_batch(request):
    """Forward the decoded request arguments to ``objstorage.get_batch``."""
    req = await decode_request(request)
    return encode_data(request.app["objstorage"].get_batch(**req))


@timed
async def check(request):
    """Forward the decoded request arguments to ``objstorage.check``."""
    req = await decode_request(request)
    return encode_data(request.app["objstorage"].check(**req))


@timed
async def delete(request):
    """Forward the decoded request arguments to ``objstorage.delete``."""
    req = await decode_request(request)
    return encode_data(request.app["objstorage"].delete(**req))


# Management methods


@timed
async def get_random_contents(request):
    """Forward the decoded request arguments to ``objstorage.get_random``."""
    req = await decode_request(request)
    return encode_data(request.app["objstorage"].get_random(**req))


# Streaming methods


@timed
async def add_stream(request):
    """Store the object named by the ``hex_id`` URL component from a stream.

    When the ``check_presence`` query parameter is ``true`` (case
    insensitive) and the object is already in the objstorage, the request
    body is not read at all.

    Otherwise the body is read chunk by chunk; each complete chunk is
    decoded according to the ``Content-Type`` header (msgpack or JSON) and
    handed to the objstorage chunk writer.

    Raises:
        ValueError: if the ``Content-Type`` header is neither
            ``application/x-msgpack`` nor ``application/json``.

    Returns:
        The encoded object id.

    """
    hex_id = request.match_info["hex_id"]
    obj_id = hashutil.hash_to_bytes(hex_id)
    check_pres = request.query.get("check_presence", "").lower() == "true"
    objstorage = request.app["objstorage"]

    if check_pres and obj_id in objstorage:
        return encode_data(obj_id)

    # XXX this really should go in a decode_stream_request coroutine in
    # swh.core, but since py35 does not support async generators, it cannot
    # easily be made for now
    content_type = request.headers.get("Content-Type")
    if content_type == "application/x-msgpack":
        decode = msgpack_loads
    elif content_type == "application/json":
        decode = functools.partial(json.loads, cls=SWHJSONDecoder)
    else:
        raise ValueError("Wrong content type `%s` for API request" % content_type)

    # Collect the pieces of the current chunk and join them once, instead of
    # re-copying a growing bytes object on every read.
    pending = []
    with objstorage.chunk_writer(obj_id) as write:
        while not request.content.at_eof():
            data, eot = await request.content.readchunk()
            pending.append(data)
            if eot:
                write(decode(b"".join(pending)))
                pending.clear()
        # NOTE(review): data still pending when EOF is reached without an
        # end-of-chunk marker is not written -- confirm clients always send
        # complete chunks.

    return encode_data(obj_id)


@timed
async def get_stream(request):
    """Stream the content of the object named by the ``hex_id`` URL component.

    Returns:
        The prepared ``aiohttp.web.StreamResponse`` once every chunk yielded
        by ``objstorage.get_stream`` has been written to it.

    """
    hex_id = request.match_info["hex_id"]
    obj_id = hashutil.hash_to_bytes(hex_id)
    response = aiohttp.web.StreamResponse()
    await response.prepare(request)
    # 2 << 20 == 2097152; presumably the chunk size in bytes (2 MiB) --
    # confirm against the objstorage get_stream signature.
    for chunk in request.app["objstorage"].get_stream(obj_id, 2 << 20):
        await response.write(chunk)
    await response.write_eof()
    return response
@timed
async def list_content(request):
    """Stream object ids returned by ``objstorage.list_content``.

    Query parameters:
        last_obj_id: optional hexadecimal object id, converted to bytes and
            passed as first argument to ``list_content``.
        limit: optional integer, defaults to ``DEFAULT_LIMIT``.

    Returns:
        The chunk-encoded ``aiohttp.web.StreamResponse`` once every object id
        has been written to it.

    """
    last_obj_id = request.query.get("last_obj_id")
    if last_obj_id:
        last_obj_id = bytes.fromhex(last_obj_id)
    limit = int(request.query.get("limit", DEFAULT_LIMIT))
    response = aiohttp.web.StreamResponse()
    response.enable_chunked_encoding()
    await response.prepare(request)
    for obj_id in request.app["objstorage"].list_content(last_obj_id, limit=limit):
        await response.write(obj_id)
    await response.write_eof()
    return response


def make_app(config):
    """Initialize the remote api application.

    Args:
        config (dict): configuration holding an ``objstorage`` entry with
            ``cls`` and ``args`` keys; ``client_max_size`` is optional and
            defaults to 1 GiB.

    Returns:
        The ``RPCServerApp`` with the objstorage instance and all routes
        registered.

    """
    client_max_size = config.get("client_max_size", 1024 * 1024 * 1024)
    app = RPCServerApp(client_max_size=client_max_size)
    app.client_exception_classes = (ObjNotFoundError, Error)

    # retro compatibility configuration settings
    app["config"] = config
    _cfg = config["objstorage"]
    app["objstorage"] = get_objstorage(_cfg["cls"], _cfg["args"])

    app.router.add_route("GET", "/", index)
    app.router.add_route("POST", "/check_config", check_config)
    app.router.add_route("POST", "/content/contains", contains)
    app.router.add_route("POST", "/content/add", add_bytes)
    app.router.add_route("POST", "/content/add/batch", add_batch)
    app.router.add_route("POST", "/content/get", get_bytes)
    app.router.add_route("POST", "/content/get/batch", get_batch)
    app.router.add_route("POST", "/content/get/random", get_random_contents)
    app.router.add_route("POST", "/content/check", check)
    app.router.add_route("POST", "/content/delete", delete)
    app.router.add_route("GET", "/content", list_content)
    app.router.add_route("POST", "/content/add_stream/{hex_id}", add_stream)
    app.router.add_route("GET", "/content/get_stream/{hex_id}", get_stream)
    return app


def load_and_check_config(config_file):
    """Load the configuration file and check the minimal configuration is
       set to run the api, or raise an explanatory error.

    Args:
        config_file (str): Path to the configuration file to load

    Raises:
        EnvironmentError: if no configuration file is given
        FileNotFoundError: if the configuration file does not exist
        KeyError: if the loaded configuration is not as expected

    Returns:
        configuration as a dict

    """
    if not config_file:
        raise EnvironmentError("Configuration file must be defined")

    if not os.path.exists(config_file):
        raise FileNotFoundError("Configuration file %s does not exist" % (config_file,))

    cfg = config_read(config_file)
    return validate_config(cfg)


def validate_config(cfg):
    """Check the minimal configuration is set to run the api or raise an
       explanatory error.

    The ``objstorage`` entry must define ``cls`` and ``args``; when ``cls``
    is ``pathslicing``, ``args`` must additionally define ``root`` and
    ``slicing``.

    Args:
        cfg (dict): Loaded configuration.

    Raises:
        KeyError: if the setup is not as expected

    Returns:
        configuration as a dict

    """
    if "objstorage" not in cfg:
        raise KeyError("Invalid configuration; missing objstorage config entry")

    vcfg = cfg["objstorage"]
    missing_keys = [key for key in ("cls", "args") if vcfg.get(key) is None]
    if missing_keys:
        raise KeyError(
            "Invalid configuration; missing %s config entry"
            % (", ".join(missing_keys),)
        )

    if vcfg.get("cls") == "pathslicing":
        args = vcfg["args"]
        missing_keys = [key for key in ("root", "slicing") if args.get(key) is None]
        if missing_keys:
            raise KeyError(
                "Invalid configuration; missing args.%s config entry"
                % (", ".join(missing_keys),)
            )

    return cfg


def make_app_from_configfile():
    """Load configuration and then build application to run

    The configuration file path is read from the ``SWH_CONFIG_FILENAME``
    environment variable.

    """
    config_file = os.environ.get("SWH_CONFIG_FILENAME")
    config = load_and_check_config(config_file)
    return make_app(config=config)


if __name__ == "__main__":
    print("Deprecated. Use swh-objstorage")


# diff --git a/swh/objstorage/cli.py b/swh/objstorage/cli.py
# index dac97ae..793c075 100644
# --- a/swh/objstorage/cli.py
# +++ b/swh/objstorage/cli.py
# @@ -1,107 +1,118 @@

# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import os
import logging
import time

import click
import aiohttp.web

from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS

from swh.objstorage.exc import Error
from swh.objstorage.factory import get_objstorage
from swh.objstorage.api.server import validate_config, make_app


@click.group(name="objstorage", context_settings=CONTEXT_SETTINGS)
@click.option(
    "--config-file",
    "-C",
    default=None,
    type=click.Path(exists=True, dir_okay=False,),
    help="Configuration file.",
)
@click.pass_context
def cli(ctx, config_file):
    """Software Heritage Objstorage tools.
    """
    # Fall back on the environment when no --config-file option is given.
    if not config_file:
        config_file = os.environ.get("SWH_CONFIG_FILENAME")

    if config_file:
        # click only checks the path given as option, not the one coming
        # from the environment, hence the explicit existence check.
        if not os.path.exists(config_file):
            raise ValueError("%s does not exist" % config_file)
        conf = config.read(config_file)
    else:
        conf = {}

    ctx.ensure_object(dict)

    # The configuration is stored as read; subcommands validate what they
    # need (see ``serve``).
    ctx.obj["config"] = conf


@cli.command("rpc-serve")
@click.option(
    "--host",
    default="0.0.0.0",
    metavar="IP",
    show_default=True,
    help="Host ip address to bind the server on",
)
@click.option(
    "--port",
    "-p",
    default=5003,
    type=click.INT,
    metavar="PORT",
    show_default=True,
    help="Binding port of the server",
)
@click.pass_context
def serve(ctx, host, port):
    """Run a standalone objstorage server.

    This is not meant to be run on production systems.

    """
    app = make_app(validate_config(ctx.obj["config"]))
    # "log_level" is not set by this command group; use .get() so the command
    # also works when no parent command populated it.
    if ctx.obj.get("log_level") == "DEBUG":
        app.update(debug=True)
    aiohttp.web.run_app(app, host=host, port=int(port))


@cli.command("import")
@click.argument("directory", required=True, nargs=-1)
@click.pass_context
def import_directories(ctx, directory):
    """Import a local directory in an existing objstorage.

    """
    objstorage = get_objstorage(**ctx.obj["config"]["objstorage"])
    nobj = 0
    volume = 0
    t0 = time.time()
    for dirname in directory:
        for root, _dirs, files in os.walk(dirname):
            for name in files:
                path = os.path.join(root, name)
                with open(path, "rb") as f:
                    content = f.read()
                objstorage.add(content)
                # Count the bytes actually imported rather than stat()-ing
                # the file a second time.
                volume += len(content)
                nobj += 1
    click.echo(
        "Imported %d files for a volume of %s bytes in %d seconds"
        % (nobj, volume, time.time() - t0)
    )


@cli.command("fsck")
@click.pass_context
def fsck(ctx):
    """Check the objstorage is not corrupted.

    """
    objstorage = get_objstorage(**ctx.obj["config"]["objstorage"])
    for obj_id in objstorage:
        try:
            objstorage.check(obj_id)
        # Error is the exception class from swh.objstorage.exc; the original
        # looked it up as an attribute of the objstorage instance.
        except Error as err:
            logging.error(err)


def main():
    """Run the ``objstorage`` command group.

    Options may also be given through ``SWH_OBJSTORAGE``-prefixed
    environment variables.

    """
    return cli(auto_envvar_prefix="SWH_OBJSTORAGE")


if __name__ == "__main__":
    main()