diff --git a/swh/core/db/tests/test_cli.py b/swh/core/db/tests/test_cli.py index a29ebf2..a0c8a29 100644 --- a/swh/core/db/tests/test_cli.py +++ b/swh/core/db/tests/test_cli.py @@ -1,336 +1,359 @@ # Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import os import traceback import pytest import yaml from swh.core.cli.db import db as swhdb from swh.core.db import BaseDb from swh.core.db.db_utils import import_swhmodule, swh_db_module, swh_db_version from swh.core.tests.test_cli import assert_section_contains def test_cli_swh_help(swhmain, cli_runner): swhmain.add_command(swhdb) result = cli_runner.invoke(swhmain, ["-h"]) assert result.exit_code == 0 assert_section_contains( result.output, "Commands", "db Software Heritage database generic tools." ) help_db_snippets = ( ( "Usage", ( "Usage: swh db [OPTIONS] COMMAND [ARGS]...", "Software Heritage database generic tools.", ), ), ( "Commands", ( "create Create a database for the Software Heritage .", "init Initialize a database for the Software Heritage .", "init-admin Execute superuser-level initialization steps", ), ), ) def test_cli_swh_db_help(swhmain, cli_runner): swhmain.add_command(swhdb) result = cli_runner.invoke(swhmain, ["db", "-h"]) assert result.exit_code == 0 for section, snippets in help_db_snippets: for snippet in snippets: assert_section_contains(result.output, section, snippet) @pytest.fixture def swh_db_cli(cli_runner, monkeypatch, postgresql): """This initializes a cli_runner and sets the correct environment variable expected by the cli to run appropriately (when not specifying the --dbname flag) """ db_params = postgresql.get_dsn_parameters() monkeypatch.setenv("PGHOST", db_params["host"]) monkeypatch.setenv("PGUSER", db_params["user"]) monkeypatch.setenv("PGPORT", db_params["port"]) return cli_runner, db_params def craft_conninfo(test_db, dbname=None) -> str: """Craft conninfo string out of the test_db object. This also allows to override the dbname.""" db_params = test_db.get_dsn_parameters() if dbname: params = copy.deepcopy(db_params) params["dbname"] = dbname else: params = db_params return "postgresql://{user}@{host}:{port}/{dbname}".format(**params) def test_cli_swh_db_create_and_init_db(cli_runner, postgresql, mock_import_swhmodule): """Create a db then initializing it should be ok""" module_name = "test.cli" conninfo = craft_conninfo(postgresql, "new-db") # This creates the db and installs the necessary admin extensions result = cli_runner.invoke(swhdb, ["create", module_name, "--dbname", conninfo]) assert result.exit_code == 0, f"Unexpected output: {result.output}" # This initializes the schema and data result = cli_runner.invoke(swhdb, ["init", module_name, "--dbname", conninfo]) assert result.exit_code == 0, f"Unexpected output: {result.output}" # the origin value in the scripts uses a hash function (which implementation wise # uses a function from the pgcrypt extension, installed during db creation step) with BaseDb.connect(conninfo).cursor() as cur: cur.execute("select * from origin") origins = cur.fetchall() assert len(origins) == 1 def test_cli_swh_db_initialization_fail_without_creation_first( cli_runner, postgresql, mock_import_swhmodule ): """Init command on an inexisting db cannot work""" module_name = "test.cli" # it's mocked here conninfo = craft_conninfo(postgresql, "inexisting-db") result = cli_runner.invoke(swhdb, ["init", module_name, "--dbname", conninfo]) # Fails because we cannot connect to an inexisting db assert result.exit_code == 1, f"Unexpected output: {result.output}" def test_cli_swh_db_initialization_fail_without_extension( cli_runner, postgresql, mock_import_swhmodule ): """Init command cannot work without privileged extension. In this test, the schema needs privileged extension to work. """ module_name = "test.cli" # it's mocked here conninfo = craft_conninfo(postgresql) result = cli_runner.invoke(swhdb, ["init", module_name, "--dbname", conninfo]) # Fails as the function `public.digest` is not installed, init-admin calls is needed # first (the next tests show such behavior) assert result.exit_code == 1, f"Unexpected output: {result.output}" def test_cli_swh_db_initialization_works_with_flags( cli_runner, postgresql, mock_import_swhmodule ): """Init commands with carefully crafted libpq conninfo works""" module_name = "test.cli" # it's mocked here conninfo = craft_conninfo(postgresql) result = cli_runner.invoke(swhdb, ["init-admin", module_name, "--dbname", conninfo]) assert result.exit_code == 0, f"Unexpected output: {result.output}" result = cli_runner.invoke(swhdb, ["init", module_name, "--dbname", conninfo]) assert result.exit_code == 0, f"Unexpected output: {result.output}" # the origin values in the scripts uses a hash function (which implementation wise # uses a function from the pgcrypt extension, init-admin calls installs it) with BaseDb.connect(postgresql.dsn).cursor() as cur: cur.execute("select * from origin") origins = cur.fetchall() assert len(origins) == 1 def test_cli_swh_db_initialization_with_env( swh_db_cli, mock_import_swhmodule, postgresql ): """Init commands with standard environment variables works""" module_name = "test.cli" # it's mocked here cli_runner, db_params = swh_db_cli result = cli_runner.invoke( swhdb, ["init-admin", module_name, "--dbname", db_params["dbname"]] ) assert result.exit_code == 0, f"Unexpected output: {result.output}" result = cli_runner.invoke( swhdb, ["init", module_name, "--dbname", db_params["dbname"]] ) assert result.exit_code == 0, f"Unexpected output: {result.output}" # the origin values in the scripts uses a hash function (which implementation wise # uses a function from the pgcrypt extension, init-admin calls installs it) with BaseDb.connect(postgresql.dsn).cursor() as cur: cur.execute("select * from origin") origins = cur.fetchall() assert len(origins) == 1 def test_cli_swh_db_initialization_idempotent( swh_db_cli, mock_import_swhmodule, postgresql ): """Multiple runs of the init commands are idempotent""" module_name = "test.cli" # mocked cli_runner, db_params = swh_db_cli result = cli_runner.invoke( swhdb, ["init-admin", module_name, "--dbname", db_params["dbname"]] ) assert result.exit_code == 0, f"Unexpected output: {result.output}" result = cli_runner.invoke( swhdb, ["init", module_name, "--dbname", db_params["dbname"]] ) assert result.exit_code == 0, f"Unexpected output: {result.output}" result = cli_runner.invoke( swhdb, ["init-admin", module_name, "--dbname", db_params["dbname"]] ) assert result.exit_code == 0, f"Unexpected output: {result.output}" result = cli_runner.invoke( swhdb, ["init", module_name, "--dbname", db_params["dbname"]] ) assert result.exit_code == 0, f"Unexpected output: {result.output}" # the origin values in the scripts uses a hash function (which implementation wise # uses a function from the pgcrypt extension, init-admin calls installs it) with BaseDb.connect(postgresql.dsn).cursor() as cur: cur.execute("select * from origin") origins = cur.fetchall() assert len(origins) == 1 def test_cli_swh_db_create_and_init_db_new_api( cli_runner, postgresql, mock_import_swhmodule, mocker, tmp_path ): """Create a db then initializing it should be ok for a "new style" datastore""" module_name = "test.cli_new" conninfo = craft_conninfo(postgresql) # This initializes the schema and data cfgfile = tmp_path / "config.yml" cfgfile.write_text(yaml.dump({module_name: {"cls": "postgresql", "db": conninfo}})) result = cli_runner.invoke(swhdb, ["init-admin", module_name, "--dbname", conninfo]) assert result.exit_code == 0, f"Unexpected output: {result.output}" result = cli_runner.invoke(swhdb, ["-C", cfgfile, "init", module_name]) assert ( result.exit_code == 0 ), f"Unexpected output: {traceback.print_tb(result.exc_info[2])}" # the origin value in the scripts uses a hash function (which implementation wise # uses a function from the pgcrypt extension, installed during db creation step) with BaseDb.connect(conninfo).cursor() as cur: cur.execute("select * from origin") origins = cur.fetchall() assert len(origins) == 1 def test_cli_swh_db_upgrade_new_api(cli_runner, postgresql, datadir, mocker, tmp_path): """Upgrade scenario for a "new style" datastore""" module_name = "test.cli_new" # the `current_version` variable is the version that will be returned by # any call to `get_current_version()` in this test session, thanks to the # local mocked version of import_swhmodule() below. current_version = 1 # custom version of the mockup to make it easy to change the # current_version returned by get_current_version() # TODO: find a better solution for this... def import_swhmodule_mock(modname): if modname.startswith("test."): dirname = modname.split(".", 1)[1] def get_datastore(cls, **kw): return mocker.MagicMock(current_version=current_version) return mocker.MagicMock( __name__=modname, __file__=os.path.join(datadir, dirname, "__init__.py"), name=modname, get_datastore=get_datastore, ) return import_swhmodule(modname) mocker.patch("swh.core.db.db_utils.import_swhmodule", import_swhmodule_mock) conninfo = craft_conninfo(postgresql) # This initializes the schema and data cfgfile = tmp_path / "config.yml" cfgfile.write_text(yaml.dump({module_name: {"cls": "postgresql", "db": conninfo}})) result = cli_runner.invoke(swhdb, ["init-admin", module_name, "--dbname", conninfo]) assert result.exit_code == 0, f"Unexpected output: {result.output}" result = cli_runner.invoke(swhdb, ["-C", cfgfile, "init", module_name]) assert ( result.exit_code == 0 ), f"Unexpected output: {traceback.print_tb(result.exc_info[2])}" assert swh_db_version(conninfo) == 1 # the upgrade should not do anything because the datastore does advertise # version 1 result = cli_runner.invoke(swhdb, ["-C", cfgfile, "upgrade", module_name]) assert swh_db_version(conninfo) == 1 # advertise current version as 3, a simple upgrade should get us there, but # no further current_version = 3 result = cli_runner.invoke(swhdb, ["-C", cfgfile, "upgrade", module_name]) assert swh_db_version(conninfo) == 3 # an attempt to go further should not do anything result = cli_runner.invoke( swhdb, ["-C", cfgfile, "upgrade", module_name, "--to-version", 5] ) assert swh_db_version(conninfo) == 3 # an attempt to go lower should not do anything result = cli_runner.invoke( swhdb, ["-C", cfgfile, "upgrade", module_name, "--to-version", 2] ) assert swh_db_version(conninfo) == 3 # advertise current version as 6, an upgrade with --to-version 4 should # stick to the given version 4 and no further current_version = 6 result = cli_runner.invoke( swhdb, ["-C", cfgfile, "upgrade", module_name, "--to-version", 4] ) assert swh_db_version(conninfo) == 4 assert "migration was not complete" in result.output # attempt to upgrade to a newer version than current code version fails result = cli_runner.invoke( swhdb, ["-C", cfgfile, "upgrade", module_name, "--to-version", current_version + 1], ) assert result.exit_code != 0 assert swh_db_version(conninfo) == 4 cnx = BaseDb.connect(conninfo) with cnx.transaction() as cur: cur.execute("drop table dbmodule") assert swh_db_module(conninfo) is None # db migration should recreate the missing dbmodule table result = cli_runner.invoke(swhdb, ["-C", cfgfile, "upgrade", module_name]) assert result.exit_code == 0 assert "Warning: the database does not have a dbmodule table." in result.output assert ( "Write the module information (test.cli_new) in the database? [Y/n]" in result.output ) assert swh_db_module(conninfo) == module_name + + +def test_cli_swh_db_version(swh_db_cli, mock_import_swhmodule, postgresql): + module_name = "test.cli" + cli_runner, db_params = swh_db_cli + + conninfo = craft_conninfo(postgresql, "test-db-version") + # This creates the db and installs the necessary admin extensions + result = cli_runner.invoke(swhdb, ["create", module_name, "--dbname", conninfo]) + assert result.exit_code == 0, f"Unexpected output: {result.output}" + + # This initializes the schema and data + result = cli_runner.invoke(swhdb, ["init", module_name, "--dbname", conninfo]) + + actual_db_version = swh_db_version(conninfo) + + with BaseDb.connect(conninfo).cursor() as cur: + cur.execute("select version from dbversion order by version desc limit 1") + expected_version = cur.fetchone()[0] + assert actual_db_version == expected_version + + assert result.exit_code == 0, f"Unexpected output: {result.output}" + assert f"initialized at version {expected_version}" in result.output