Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/pytest_plugin.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import glob | import glob | ||||
from os import environ, path | from os import environ, path | ||||
import subprocess | |||||
from typing import Union | from typing import Union | ||||
import pytest | import pytest | ||||
from pytest_postgresql import factories | from pytest_postgresql import factories | ||||
from pytest_postgresql.janitor import DatabaseJanitor, Version, psycopg2 | from pytest_postgresql.janitor import DatabaseJanitor, Version, psycopg2 | ||||
from swh.core.utils import numfile_sortkey as sortkey | from swh.core.utils import numfile_sortkey as sortkey | ||||
import swh.storage | import swh.storage | ||||
▲ Show 20 Lines • Show All 88 Lines • ▼ Show 20 Lines | def __init__( | ||||
db_name: str, | db_name: str, | ||||
version: Union[str, float, Version], | version: Union[str, float, Version], | ||||
dump_files: str = DUMP_FILES, | dump_files: str = DUMP_FILES, | ||||
) -> None: | ) -> None: | ||||
super().__init__(user, host, port, db_name, version) | super().__init__(user, host, port, db_name, version) | ||||
self.dump_files = sorted(glob.glob(dump_files), key=sortkey) | self.dump_files = sorted(glob.glob(dump_files), key=sortkey) | ||||
def db_setup(self): | def db_setup(self): | ||||
with psycopg2.connect( | conninfo = ( | ||||
dbname=self.db_name, user=self.user, host=self.host, port=self.port, | f"host={self.host} user={self.user} port={self.port} dbname={self.db_name}" | ||||
) as cnx: | ) | ||||
with cnx.cursor() as cur: | |||||
for fname in self.dump_files: | for fname in self.dump_files: | ||||
with open(fname) as fobj: | subprocess.check_call( | ||||
sql = fobj.read().replace("concurrently", "").strip() | [ | ||||
if sql: | "psql", | ||||
cur.execute(sql) | "--quiet", | ||||
cnx.commit() | "--no-psqlrc", | ||||
"-v", | |||||
"ON_ERROR_STOP=1", | |||||
"-d", | |||||
conninfo, | |||||
"-f", | |||||
fname, | |||||
] | |||||
) | |||||
def db_reset(self): | def db_reset(self): | ||||
with psycopg2.connect( | with psycopg2.connect( | ||||
dbname=self.db_name, user=self.user, host=self.host, port=self.port, | dbname=self.db_name, user=self.user, host=self.host, port=self.port, | ||||
) as cnx: | ) as cnx: | ||||
with cnx.cursor() as cur: | with cnx.cursor() as cur: | ||||
cur.execute( | cur.execute( | ||||
"SELECT table_name FROM information_schema.tables " | "SELECT table_name FROM information_schema.tables " | ||||
▲ Show 20 Lines • Show All 62 Lines • Show Last 20 Lines |