Changeset View
Changeset View
Standalone View
Standalone View
swh/core/db/tests/pytest_plugin/test_pytest_plugin.py
Show All 9 Lines | |||||
from swh.core.db.pytest_plugin import postgresql_fact | from swh.core.db.pytest_plugin import postgresql_fact | ||||
SQL_DIR = os.path.join(os.path.dirname(__file__), "data") | SQL_DIR = os.path.join(os.path.dirname(__file__), "data") | ||||
# db with special policy for tables dbversion and people | # db with special policy for tables dbversion and people | ||||
postgres_fun = postgresql_fact( | postgres_fun = postgresql_fact( | ||||
"postgresql_proc", | "postgresql_proc", | ||||
db_name="fun", | dbname="fun", | ||||
dump_files=f"{SQL_DIR}/*.sql", | dump_files=f"{SQL_DIR}/*.sql", | ||||
no_truncate_tables={"dbversion", "people"}, | no_truncate_tables={"dbversion", "people"}, | ||||
) | ) | ||||
postgres_fun2 = postgresql_fact( | postgres_fun2 = postgresql_fact( | ||||
"postgresql_proc", | "postgresql_proc", | ||||
db_name="fun2", | dbname="fun2", | ||||
dump_files=sorted(glob.glob(f"{SQL_DIR}/*.sql")), | dump_files=sorted(glob.glob(f"{SQL_DIR}/*.sql")), | ||||
no_truncate_tables={"dbversion", "people"}, | no_truncate_tables={"dbversion", "people"}, | ||||
) | ) | ||||
def test_smoke_test_fun_db_is_up(postgres_fun): | def test_smoke_test_fun_db_is_up(postgres_fun): | ||||
"""This ensures the db is created and configured according to its dumps files. | """This ensures the db is created and configured according to its dumps files. | ||||
▲ Show 20 Lines • Show All 67 Lines • ▼ Show 20 Lines | with BaseDb.connect(postgres_fun.dsn).cursor() as cur: | ||||
cur.execute("select nextval('serial')") | cur.execute("select nextval('serial')") | ||||
val = cur.fetchone()[0] | val = cur.fetchone()[0] | ||||
assert val == 1 | assert val == 1 | ||||
# db with no special policy for tables truncation, all tables are reset | # db with no special policy for tables truncation, all tables are reset | ||||
postgres_people = postgresql_fact( | postgres_people = postgresql_fact( | ||||
"postgresql_proc", | "postgresql_proc", | ||||
db_name="people", | dbname="people", | ||||
dump_files=f"{SQL_DIR}/*.sql", | dump_files=f"{SQL_DIR}/*.sql", | ||||
no_truncate_tables=set(), | no_truncate_tables=set(), | ||||
) | ) | ||||
def test_smoke_test_people_db_up(postgres_people): | def test_smoke_test_people_db_up(postgres_people): | ||||
"""'people' db is up and configured | """'people' db is up and configured | ||||
Show All 37 Lines | with BaseDb.connect(postgres_people.dsn).cursor() as cur: | ||||
assert nb_rows == 0 | assert nb_rows == 0 | ||||
cur.execute("select nextval('serial')") | cur.execute("select nextval('serial')") | ||||
val = cur.fetchone()[0] | val = cur.fetchone()[0] | ||||
assert val == 1 | assert val == 1 | ||||
# db with no initialization step, an empty db | # db with no initialization step, an empty db | ||||
postgres_no_init = postgresql_fact("postgresql_proc", db_name="something") | postgres_no_init = postgresql_fact("postgresql_proc", dbname="something") | ||||
def test_smoke_test_db_no_init(postgres_no_init): | def test_smoke_test_db_no_init(postgres_no_init): | ||||
"""We can connect to the db nonetheless | """We can connect to the db nonetheless | ||||
""" | """ | ||||
with BaseDb.connect(postgres_no_init.dsn).cursor() as cur: | with BaseDb.connect(postgres_no_init.dsn).cursor() as cur: | ||||
cur.execute("select now()") | cur.execute("select now()") | ||||
data = cur.fetchone()[0] | data = cur.fetchone()[0] | ||||
assert data is not None | assert data is not None |