diff --git a/swh/storage/cli.py b/swh/storage/cli.py index a3ab903d..1dc7b656 100644 --- a/swh/storage/cli.py +++ b/swh/storage/cli.py @@ -1,133 +1,144 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os +import warnings import click from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.storage.api.server import load_and_check_config, app @click.group(name="storage", context_settings=CONTEXT_SETTINGS) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.pass_context def storage(ctx, config_file): """Software Heritage Storage tools.""" if not config_file: config_file = os.environ.get("SWH_CONFIG_FILENAME") if config_file: if not os.path.exists(config_file): raise ValueError("%s does not exist" % config_file) conf = config.read(config_file) else: conf = {} ctx.ensure_object(dict) ctx.obj["config"] = conf @storage.command(name="rpc-serve") -@click.argument("config-path", required=True) +@click.argument("config-path", default=None, required=False) @click.option( "--host", default="0.0.0.0", metavar="IP", show_default=True, help="Host ip address to bind the server on", ) @click.option( "--port", default=5002, type=click.INT, metavar="PORT", show_default=True, help="Binding port of the server", ) @click.option( "--debug/--no-debug", default=True, help="Indicates if the server should run in debug mode", ) @click.pass_context def serve(ctx, config_path, host, port, debug): """Software Heritage Storage RPC server. Do NOT use this in a production environment. """ if "log_level" in ctx.obj: logging.getLogger("werkzeug").setLevel(ctx.obj["log_level"]) - api_cfg = load_and_check_config(config_path, type="any") - app.config.update(api_cfg) + if config_path: + # for bw compat + warnings.warn( + "The `config_path` argument of the `swh storage rpc-server` is now " + "deprecated. Please use the --config option of `swh storage` instead.", + DeprecationWarning, + ) + api_cfg = load_and_check_config(config_path, type="any") + app.config.update(api_cfg) + else: + app.config.update(ctx.obj["config"]) + app.run(host, port=int(port), debug=bool(debug)) @storage.command() @click.argument("object_type") @click.option("--start-object", default=None) @click.option("--end-object", default=None) @click.option("--dry-run", is_flag=True, default=False) @click.pass_context def backfill(ctx, object_type, start_object, end_object, dry_run): """Run the backfiller The backfiller list objects from a Storage and produce journal entries from there. Typically used to rebuild a journal or compensate for missing objects in a journal (eg. due to a downtime of this later). The configuration file requires the following entries: - brokers: a list of kafka endpoints (the journal) in which entries will be added. - storage_dbconn: URL to connect to the storage DB. - prefix: the prefix of the topics (topics will be .). - client_id: the kafka client ID. """ # for "lazy" loading from swh.storage.backfill import JournalBackfiller try: from systemd.daemon import notify except ImportError: notify = None conf = ctx.obj["config"] backfiller = JournalBackfiller(conf) if notify: notify("READY=1") try: backfiller.run( object_type=object_type, start_object=start_object, end_object=end_object, dry_run=dry_run, ) except KeyboardInterrupt: if notify: notify("STOPPING=1") ctx.exit(0) def main(): logging.basicConfig() return serve(auto_envvar_prefix="SWH_STORAGE") if __name__ == "__main__": main() diff --git a/swh/storage/tests/test_cli.py b/swh/storage/tests/test_cli.py new file mode 100644 index 00000000..138ab57c --- /dev/null +++ b/swh/storage/tests/test_cli.py @@ -0,0 +1,119 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import copy +import logging +import tempfile +import requests +import threading +import time + +from contextlib import contextmanager +from aiohttp.test_utils import unused_port + +import yaml + +from click.testing import CliRunner + +from swh.storage.cli import storage as cli + + +logger = logging.getLogger(__name__) + + +CLI_CONFIG = { + "storage": {"cls": "memory",}, +} + + +def invoke(*args, env=None): + config = copy.deepcopy(CLI_CONFIG) + runner = CliRunner() + with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: + yaml.dump(config, config_fd) + config_fd.seek(0) + args = ["-C" + config_fd.name] + list(args) + return runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=env,) + + +@contextmanager +def check_rpc_serve(): + # this context manager adds, if needed, a /quit route to the flask application that + # uses the werzeuk tric to exit the test server started by app.run() + # + # The /testing/ gathering code is executed in a thread while the main thread runs + # the test server. Results of the tests consist in a list of requests Response + # objects stored in the `response` shared list. + # + # This convoluted execution code is needed because the flask app needs to run in the + # main thread. + # + # The context manager will yield the port on which tests (GET queries) will be done, + # so the test function should start the RPC server on this port in the body of the + # context manager. + + from swh.storage.api.server import app + from flask import request + + if "/quit" not in [r.rule for r in app.url_map.iter_rules()]: + + @app.route("/quit") + def quit_app(): + request.environ["werkzeug.server.shutdown"]() + return "Bye" + + port = unused_port() + responses = [] + + def run_tests(): + # we do run the "test" part in the thread because flask does not like the + # app.run() to be executed in a (non-main) thread + def get(path): + for i in range(5): + try: + resp = requests.get(f"http://127.0.0.1:{port}{path}") + break + except requests.exceptions.ConnectionError: + time.sleep(0.2) + responses.append(resp) + + get("/") # ensure the server starts and can reply the '/' path + + get("/quit") # ask the test server to quit gracefully + + t = threading.Thread(target=run_tests) + t.start() + yield port # this is where the caller should start the server listening on "port" + + # we expect to reach this point because the /quit endpoint should have been called, + # thus the server, executed in the caller's context manager's body, should now + # return + t.join() + + # check the GET requests we made in the thread have expected results + assert len(responses) == 2 + assert responses[0].status_code == 200 + assert "Software Heritage storage server" in responses[0].text + assert responses[1].status_code == 200 + assert responses[1].text == "Bye" + + +def test_rpc_serve(): + with check_rpc_serve() as port: + invoke("rpc-serve", "--host", "127.0.0.1", "--port", port) + + +def test_rpc_serve_bwcompat(): + def invoke(*args, env=None): + config = copy.deepcopy(CLI_CONFIG) + runner = CliRunner() + with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: + yaml.dump(config, config_fd) + config_fd.seek(0) + args = list(args) + [config_fd.name] + return runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=env,) + + with check_rpc_serve() as port: + invoke("rpc-serve", "--host", "127.0.0.1", "--port", port)