diff --git a/requirements-db-pytestplugin.txt b/requirements-db-pytestplugin.txt index a15d41b..815fedd 100644 --- a/requirements-db-pytestplugin.txt +++ b/requirements-db-pytestplugin.txt @@ -1,2 +1,2 @@ # requirements for swh.core.db.pytest_plugin -pytest-postgresql < 4.0.0 # version 4.0 depends on psycopg 3. https://github.com/ClearcodeHQ/pytest-postgresql/blob/main/CHANGES.rst#400 +pytest-postgresql >=3, < 4.0.0 # version 4.0 depends on psycopg 3. https://github.com/ClearcodeHQ/pytest-postgresql/blob/main/CHANGES.rst#400 diff --git a/swh/core/db/pytest_plugin.py b/swh/core/db/pytest_plugin.py index 0792d0b..9e5b2cb 100644 --- a/swh/core/db/pytest_plugin.py +++ b/swh/core/db/pytest_plugin.py @@ -1,190 +1,282 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import glob from importlib import import_module import logging import subprocess -from typing import List, Optional, Set, Union +from typing import Callable, Iterable, Iterator, List, Optional, Sequence, Set, Union from _pytest.fixtures import FixtureRequest import psycopg2 import pytest +from pytest_postgresql.compat import check_for_psycopg2, connection +from pytest_postgresql.executor import PostgreSQLExecutor +from pytest_postgresql.executor_noop import NoopExecutor from pytest_postgresql.janitor import DatabaseJanitor -from swh.core.utils import numfile_sortkey as sortkey +from swh.core.db.db_utils import ( + init_admin_extensions, + populate_database_for_package, + swh_set_db_version, +) +from swh.core.utils import basename_sortkey # to keep mypy happy regardless pytest-postgresql version try: _pytest_pgsql_get_config_module = import_module("pytest_postgresql.config") except ImportError: # pytest_postgresql < 3.0.0 _pytest_pgsql_get_config_module = import_module("pytest_postgresql.factories") 
_pytest_postgresql_get_config = getattr(_pytest_pgsql_get_config_module, "get_config") logger = logging.getLogger(__name__) class SWHDatabaseJanitor(DatabaseJanitor): """SWH database janitor implementation with a a different setup/teardown policy than than the stock one. Instead of dropping, creating and initializing the database for each test, it creates and initializes the db once, then truncates the tables (and sequences) in between tests. This is needed to have acceptable test performances. """ def __init__( self, user: str, host: str, - port: str, + port: int, dbname: str, version: Union[str, float], - dump_files: Union[None, str, List[str]] = None, + password: Optional[str] = None, + isolation_level: Optional[int] = None, + connection_timeout: int = 60, + dump_files: Optional[Union[str, Sequence[str]]] = None, no_truncate_tables: Set[str] = set(), + no_db_drop: bool = False, ) -> None: super().__init__(user, host, port, dbname, version) - if not hasattr(self, "dbname") and hasattr(self, "db_name"): - # pytest_postgresql < 3.0.0 - self.dbname = getattr(self, "db_name") - if dump_files is None: - self.dump_files = [] - elif isinstance(dump_files, str): - self.dump_files = sorted(glob.glob(dump_files), key=sortkey) - else: - self.dump_files = dump_files # do no truncate the following tables self.no_truncate_tables = set(no_truncate_tables) + self.no_db_drop = no_db_drop + self.dump_files = dump_files - def db_setup(self): + def psql_exec(self, fname: str) -> None: conninfo = ( f"host={self.host} user={self.user} port={self.port} dbname={self.dbname}" ) - for fname in self.dump_files: - subprocess.check_call( - [ - "psql", - "--quiet", - "--no-psqlrc", - "-v", - "ON_ERROR_STOP=1", - "-d", - conninfo, - "-f", - fname, - ] - ) + subprocess.check_call( + [ + "psql", + "--quiet", + "--no-psqlrc", + "-v", + "ON_ERROR_STOP=1", + "-d", + conninfo, + "-f", + fname, + ] + ) - def db_reset(self): + def db_reset(self) -> None: """Truncate tables (all but 
self.no_truncate_tables set) and sequences """ with psycopg2.connect( dbname=self.dbname, user=self.user, host=self.host, port=self.port, ) as cnx: with cnx.cursor() as cur: cur.execute( "SELECT table_name FROM information_schema.tables " "WHERE table_schema = %s", ("public",), ) all_tables = set(table for (table,) in cur.fetchall()) tables_to_truncate = all_tables - self.no_truncate_tables for table in tables_to_truncate: cur.execute("TRUNCATE TABLE %s CASCADE" % table) cur.execute( "SELECT sequence_name FROM information_schema.sequences " "WHERE sequence_schema = %s", ("public",), ) seqs = set(seq for (seq,) in cur.fetchall()) for seq in seqs: cur.execute("ALTER SEQUENCE %s RESTART;" % seq) cnx.commit() - def init(self): - """Initialize db. Create the db if it does not exist. Reset it if it exists.""" - with self.cursor() as cur: - cur.execute( - "SELECT COUNT(1) FROM pg_database WHERE datname=%s;", (self.dbname,) - ) - db_exists = cur.fetchone()[0] == 1 - if db_exists: - cur.execute( - "UPDATE pg_database SET datallowconn=true WHERE datname = %s;", - (self.dbname,), - ) - self.db_reset() - return + def _db_exists(self, cur, dbname): + cur.execute( + "SELECT EXISTS " + "(SELECT datname FROM pg_catalog.pg_database WHERE datname= %s);", + (dbname,), + ) + row = cur.fetchone() + return (row is not None) and row[0] - # initialize the inexistent db + def init(self) -> None: + """Create database in postgresql out of a template it if it exists, bare + creation otherwise.""" + template_name = f"{self.dbname}_tmpl" + logger.debug("Initialize DB %s", self.dbname) with self.cursor() as cur: - cur.execute('CREATE DATABASE "{}";'.format(self.dbname)) - self.db_setup() - - def drop(self): - """The original DatabaseJanitor implementation prevents new connections from happening, - destroys current opened connections and finally drops the database. 
- - We actually do not want to drop the db so we instead do nothing and resets - (truncate most tables and sequences) the db instead, in order to have some - acceptable performance. + tmpl_exists = self._db_exists(cur, template_name) + db_exists = self._db_exists(cur, self.dbname) + if not db_exists: + if tmpl_exists: + logger.debug( + "Create %s from template %s", self.dbname, template_name + ) + cur.execute( + f'CREATE DATABASE "{self.dbname}" TEMPLATE "{template_name}";' + ) + else: + logger.debug("Create %s from scratch", self.dbname) + cur.execute(f'CREATE DATABASE "{self.dbname}";') + if self.dump_files: + logger.warning( + "Using dump_files on the postgresql_fact fixture " + "is deprecated. See swh.core documentation for more " + "details." + ) + for dump_file in gen_dump_files(self.dump_files): + logger.info(f"Loading {dump_file}") + self.psql_exec(dump_file) + else: + logger.debug("Reset %s", self.dbname) + self.db_reset() - """ - pass + def drop(self) -> None: + """Drop database in postgresql.""" + if self.no_db_drop: + with self.cursor() as cur: + self._terminate_connection(cur, self.dbname) + else: + super().drop() # the postgres_fact factory fixture below is mostly a copy of the code # from pytest-postgresql. We need a custom version here to be able to # specify our version of the DBJanitor we use. def postgresql_fact( process_fixture_name: str, dbname: Optional[str] = None, - dump_files: Union[str, List[str]] = "", + load: Optional[Sequence[Union[Callable, str]]] = None, + isolation_level: Optional[int] = None, + modname: Optional[str] = None, + dump_files: Optional[Union[str, List[str]]] = None, no_truncate_tables: Set[str] = {"dbversion"}, -): + no_db_drop: bool = False, +) -> Callable[[FixtureRequest], Iterator[connection]]: + """ + Return connection fixture factory for PostgreSQL. 
+ + :param process_fixture_name: name of the process fixture + :param dbname: database name + :param load: SQL, function or function import paths to automatically load + into our test database + :param isolation_level: optional postgresql isolation level + defaults to server's default + :param modname: (swh) module name for which the database is created + :dump_files: (deprecated, use load instead) list of sql script files to + execute after the database has been created + :no_truncate_tables: list of table not to truncate between tests (only used + when no_db_drop is True) + :no_db_drop: if True, keep the database between tests; in which case, the + database is reset (see SWHDatabaseJanitor.db_reset()) by truncating + most of the tables. Note that this makes de facto tests (potentially) + interdependent, use with extra caution. + :returns: function which makes a connection to postgresql + """ + @pytest.fixture - def postgresql_factory(request: FixtureRequest): - """Fixture factory for PostgreSQL. + def postgresql_factory(request: FixtureRequest) -> Iterator[connection]: + """ + Fixture factory for PostgreSQL. 
- :param FixtureRequest request: fixture request object - :rtype: psycopg2.connection + :param request: fixture request object :returns: postgresql client """ - config = _pytest_postgresql_get_config(request) - proc_fixture = request.getfixturevalue(process_fixture_name) + check_for_psycopg2() + proc_fixture: Union[PostgreSQLExecutor, NoopExecutor] = request.getfixturevalue( + process_fixture_name + ) pg_host = proc_fixture.host pg_port = proc_fixture.port pg_user = proc_fixture.user + pg_password = proc_fixture.password pg_options = proc_fixture.options - pg_db = dbname or config["dbname"] + pg_db = dbname or proc_fixture.dbname + pg_load = load or [] + assert pg_db is not None + with SWHDatabaseJanitor( pg_user, pg_host, pg_port, pg_db, proc_fixture.version, + pg_password, + isolation_level=isolation_level, dump_files=dump_files, no_truncate_tables=no_truncate_tables, - ): - connection = psycopg2.connect( + no_db_drop=no_db_drop, + ) as janitor: + db_connection: connection = psycopg2.connect( dbname=pg_db, user=pg_user, + password=pg_password, host=pg_host, port=pg_port, options=pg_options, ) - yield connection - connection.close() + for load_element in pg_load: + janitor.load(load_element) + try: + yield db_connection + finally: + db_connection.close() return postgresql_factory + + +def initialize_database_for_module(modname, version, **kwargs): + conninfo = psycopg2.connect(**kwargs).dsn + init_admin_extensions(modname, conninfo) + populate_database_for_package(modname, conninfo) + try: + swh_set_db_version(conninfo, version) + except psycopg2.errors.UniqueViolation: + logger.warning( + "Version already set by db init scripts. 
" + "This generally means the swh.{modname} package needs to be " + "updated for swh.core>=1.2" + ) + + +def gen_dump_files(dump_files: Union[str, Iterable[str]]) -> Iterator[str]: + """Generate files potentially resolving glob patterns if any + + """ + if isinstance(dump_files, str): + dump_files = [dump_files] + for dump_file in dump_files: + if glob.has_magic(dump_file): + # if the dump_file is a glob pattern one, resolve it + yield from ( + fname for fname in sorted(glob.glob(dump_file), key=basename_sortkey) + ) + else: + # otherwise, just return the filename + yield dump_file diff --git a/swh/core/db/tests/pytest_plugin/test_pytest_plugin.py b/swh/core/db/tests/pytest_plugin/test_pytest_plugin.py index 84b0039..67a3fb5 100644 --- a/swh/core/db/tests/pytest_plugin/test_pytest_plugin.py +++ b/swh/core/db/tests/pytest_plugin/test_pytest_plugin.py @@ -1,173 +1,185 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import glob import os +from pytest_postgresql import factories + from swh.core.db import BaseDb -from swh.core.db.pytest_plugin import postgresql_fact +from swh.core.db.pytest_plugin import gen_dump_files, postgresql_fact SQL_DIR = os.path.join(os.path.dirname(__file__), "data") +test_postgresql_proc = factories.postgresql_proc( + dbname="fun", + load=sorted(glob.glob(f"{SQL_DIR}/*.sql")), # type: ignore[arg-type] + # type ignored because load is typed as Optional[List[...]] instead of an + # Optional[Sequence[...]] in pytest_postgresql<4 +) # db with special policy for tables dbversion and people postgres_fun = postgresql_fact( - "postgresql_proc", - dbname="fun", - dump_files=f"{SQL_DIR}/*.sql", - no_truncate_tables={"dbversion", "people"}, + "test_postgresql_proc", no_db_drop=True, no_truncate_tables={"dbversion", "people"}, ) postgres_fun2 = 
postgresql_fact( - "postgresql_proc", + "test_postgresql_proc", dbname="fun2", - dump_files=sorted(glob.glob(f"{SQL_DIR}/*.sql")), + load=sorted(glob.glob(f"{SQL_DIR}/*.sql")), no_truncate_tables={"dbversion", "people"}, + no_db_drop=True, ) def test_smoke_test_fun_db_is_up(postgres_fun): """This ensures the db is created and configured according to its dumps files. """ with BaseDb.connect(postgres_fun.dsn).cursor() as cur: cur.execute("select count(*) from dbversion") nb_rows = cur.fetchone()[0] assert nb_rows == 5 cur.execute("select count(*) from fun") nb_rows = cur.fetchone()[0] assert nb_rows == 3 cur.execute("select count(*) from people") nb_rows = cur.fetchone()[0] assert nb_rows == 2 # in data, we requested a value already so it starts at 2 cur.execute("select nextval('serial')") val = cur.fetchone()[0] assert val == 2 def test_smoke_test_fun2_db_is_up(postgres_fun2): """This ensures the db is created and configured according to its dumps files. """ with BaseDb.connect(postgres_fun2.dsn).cursor() as cur: cur.execute("select count(*) from dbversion") nb_rows = cur.fetchone()[0] assert nb_rows == 5 cur.execute("select count(*) from fun") nb_rows = cur.fetchone()[0] assert nb_rows == 3 cur.execute("select count(*) from people") nb_rows = cur.fetchone()[0] assert nb_rows == 2 # in data, we requested a value already so it starts at 2 cur.execute("select nextval('serial')") val = cur.fetchone()[0] assert val == 2 def test_smoke_test_fun_db_is_still_up_and_got_reset(postgres_fun): """This ensures that within another tests, the 'fun' db is still up, created (and not configured again). 
This time, most of the data has been reset: - except for tables 'dbversion' and 'people' which were left as is - the other tables from the schema (here only "fun") got truncated - the sequences got truncated as well """ with BaseDb.connect(postgres_fun.dsn).cursor() as cur: # db version is excluded from the truncate cur.execute("select count(*) from dbversion") nb_rows = cur.fetchone()[0] assert nb_rows == 5 # people is also allowed not to be truncated cur.execute("select count(*) from people") nb_rows = cur.fetchone()[0] assert nb_rows == 2 # table and sequence are reset cur.execute("select count(*) from fun") nb_rows = cur.fetchone()[0] assert nb_rows == 0 cur.execute("select nextval('serial')") val = cur.fetchone()[0] assert val == 1 # db with no special policy for tables truncation, all tables are reset postgres_people = postgresql_fact( "postgresql_proc", dbname="people", dump_files=f"{SQL_DIR}/*.sql", no_truncate_tables=set(), + no_db_drop=True, ) +def test_gen_dump_files(): + files = [os.path.basename(fn) for fn in gen_dump_files(f"{SQL_DIR}/*.sql")] + assert files == ["0-schema.sql", "1-data.sql"] + + def test_smoke_test_people_db_up(postgres_people): """'people' db is up and configured """ with BaseDb.connect(postgres_people.dsn).cursor() as cur: cur.execute("select count(*) from dbversion") nb_rows = cur.fetchone()[0] assert nb_rows == 5 cur.execute("select count(*) from people") nb_rows = cur.fetchone()[0] assert nb_rows == 2 cur.execute("select count(*) from fun") nb_rows = cur.fetchone()[0] assert nb_rows == 3 cur.execute("select nextval('serial')") val = cur.fetchone()[0] assert val == 2 def test_smoke_test_people_db_up_and_reset(postgres_people): """'people' db is up and got reset on every tables and sequences """ with BaseDb.connect(postgres_people.dsn).cursor() as cur: # tables are truncated after the first round cur.execute("select count(*) from dbversion") nb_rows = cur.fetchone()[0] assert nb_rows == 0 # tables are truncated after the first 
round cur.execute("select count(*) from people") nb_rows = cur.fetchone()[0] assert nb_rows == 0 # table and sequence are reset cur.execute("select count(*) from fun") nb_rows = cur.fetchone()[0] assert nb_rows == 0 cur.execute("select nextval('serial')") val = cur.fetchone()[0] assert val == 1 # db with no initialization step, an empty db postgres_no_init = postgresql_fact("postgresql_proc", dbname="something") def test_smoke_test_db_no_init(postgres_no_init): """We can connect to the db nonetheless """ with BaseDb.connect(postgres_no_init.dsn).cursor() as cur: cur.execute("select now()") data = cur.fetchone()[0] assert data is not None diff --git a/swh/core/tests/test_utils.py b/swh/core/tests/test_utils.py index 3a7c501..1933d38 100644 --- a/swh/core/tests/test_utils.py +++ b/swh/core/tests/test_utils.py @@ -1,133 +1,138 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from swh.core import utils def test_grouper(): # given actual_data = utils.grouper((i for i in range(0, 9)), 2) out = [] for d in actual_data: out.append(list(d)) # force generator resolution for checks assert out == [[0, 1], [2, 3], [4, 5], [6, 7], [8]] # given actual_data = utils.grouper((i for i in range(9, 0, -1)), 4) out = [] for d in actual_data: out.append(list(d)) # force generator resolution for checks assert out == [[9, 8, 7, 6], [5, 4, 3, 2], [1]] def test_grouper_with_stop_value(): # given actual_data = utils.grouper(((i, i + 1) for i in range(0, 9)), 2) out = [] for d in actual_data: out.append(list(d)) # force generator resolution for checks assert out == [ [(0, 1), (1, 2)], [(2, 3), (3, 4)], [(4, 5), (5, 6)], [(6, 7), (7, 8)], [(8, 9)], ] # given actual_data = utils.grouper((i for i in range(9, 0, -1)), 4) out = [] for d in actual_data: out.append(list(d)) # force 
generator resolution for checks assert out == [[9, 8, 7, 6], [5, 4, 3, 2], [1]] def test_backslashescape_errors(): raw_data_err = b"abcd\x80" with pytest.raises(UnicodeDecodeError): raw_data_err.decode("utf-8", "strict") assert raw_data_err.decode("utf-8", "backslashescape") == "abcd\\x80" raw_data_ok = b"abcd\xc3\xa9" assert raw_data_ok.decode("utf-8", "backslashescape") == raw_data_ok.decode( "utf-8", "strict" ) unicode_data = "abcdef\u00a3" assert unicode_data.encode("ascii", "backslashescape") == b"abcdef\\xa3" def test_encode_with_unescape(): valid_data = "\\x01020304\\x00" valid_data_encoded = b"\x01020304\x00" assert valid_data_encoded == utils.encode_with_unescape(valid_data) def test_encode_with_unescape_invalid_escape(): invalid_data = "test\\abcd" with pytest.raises(ValueError) as exc: utils.encode_with_unescape(invalid_data) assert "invalid escape" in exc.value.args[0] assert "position 4" in exc.value.args[0] def test_decode_with_escape(): backslashes = b"foo\\bar\\\\baz" backslashes_escaped = "foo\\\\bar\\\\\\\\baz" assert backslashes_escaped == utils.decode_with_escape(backslashes) valid_utf8 = b"foo\xc3\xa2" valid_utf8_escaped = "foo\u00e2" assert valid_utf8_escaped == utils.decode_with_escape(valid_utf8) invalid_utf8 = b"foo\xa2" invalid_utf8_escaped = "foo\\xa2" assert invalid_utf8_escaped == utils.decode_with_escape(invalid_utf8) valid_utf8_nul = b"foo\xc3\xa2\x00" valid_utf8_nul_escaped = "foo\u00e2\\x00" assert valid_utf8_nul_escaped == utils.decode_with_escape(valid_utf8_nul) def test_commonname(): # when actual_commonname = utils.commonname("/some/where/to/", "/some/where/to/go/to") # then assert "go/to" == actual_commonname # when actual_commonname2 = utils.commonname(b"/some/where/to/", b"/some/where/to/go/to") # then assert b"go/to" == actual_commonname2 def test_numfile_sotkey(): assert utils.numfile_sortkey("00-xxx.sql") == (0, "-xxx.sql") assert utils.numfile_sortkey("01-xxx.sql") == (1, "-xxx.sql") assert 
utils.numfile_sortkey("10-xxx.sql") == (10, "-xxx.sql") assert utils.numfile_sortkey("99-xxx.sql") == (99, "-xxx.sql") assert utils.numfile_sortkey("100-xxx.sql") == (100, "-xxx.sql") assert utils.numfile_sortkey("00100-xxx.sql") == (100, "-xxx.sql") assert utils.numfile_sortkey("1.sql") == (1, ".sql") assert utils.numfile_sortkey("1") == (1, "") assert utils.numfile_sortkey("toto-01.sql") == (999999, "toto-01.sql") + + +def test_basename_sotkey(): + assert utils.basename_sortkey("00-xxx.sql") == (0, "-xxx.sql") + assert utils.basename_sortkey("path/to/00-xxx.sql") == (0, "-xxx.sql") diff --git a/swh/core/utils.py b/swh/core/utils.py index 60a35ea..79f41cd 100644 --- a/swh/core/utils.py +++ b/swh/core/utils.py @@ -1,132 +1,137 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import codecs from contextlib import contextmanager import itertools import os import re from typing import Tuple @contextmanager def cwd(path): """Contextually change the working directory to do thy bidding. Then gets back to the original location. """ prev_cwd = os.getcwd() os.chdir(path) try: yield finally: os.chdir(prev_cwd) def grouper(iterable, n): """Collect data into fixed-length size iterables. The last block might contain less elements as it will hold only the remaining number of elements. The invariant here is that the number of elements in the input iterable and the sum of the number of elements of all iterables generated from this function should be equal. Args: iterable (Iterable): an iterable n (int): size of block to slice the iterable into Yields: fixed-length blocks as iterables. As mentioned, the last iterable might be less populated. 
""" args = [iter(iterable)] * n stop_value = object() for _data in itertools.zip_longest(*args, fillvalue=stop_value): yield (d for d in _data if d is not stop_value) def backslashescape_errors(exception): if isinstance(exception, UnicodeDecodeError): bad_data = exception.object[exception.start : exception.end] escaped = "".join(r"\x%02x" % x for x in bad_data) return escaped, exception.end return codecs.backslashreplace_errors(exception) codecs.register_error("backslashescape", backslashescape_errors) def encode_with_unescape(value): """Encode an unicode string containing \\x backslash escapes""" slices = [] start = 0 odd_backslashes = False i = 0 while i < len(value): if value[i] == "\\": odd_backslashes = not odd_backslashes else: if odd_backslashes: if value[i] != "x": raise ValueError( "invalid escape for %r at position %d" % (value, i - 1) ) slices.append( value[start : i - 1].replace("\\\\", "\\").encode("utf-8") ) slices.append(bytes.fromhex(value[i + 1 : i + 3])) odd_backslashes = False start = i = i + 3 continue i += 1 slices.append(value[start:i].replace("\\\\", "\\").encode("utf-8")) return b"".join(slices) def decode_with_escape(value): """Decode a bytestring as utf-8, escaping the bytes of invalid utf-8 sequences as \\x. We also escape NUL bytes as they are invalid in JSON strings. """ # escape backslashes value = value.replace(b"\\", b"\\\\") value = value.replace(b"\x00", b"\\x00") return value.decode("utf-8", "backslashescape") def commonname(path0, path1, as_str=False): """Compute the commonname between the path0 and path1. """ return path1.split(path0)[1] def numfile_sortkey(fname: str) -> Tuple[int, str]: """Simple function to sort filenames of the form: nnxxx.ext where nn is a number according to the numbers. Returns a tuple (order, remaining), where 'order' is the numeric (int) value extracted from the file name, and 'remaining' is the remaining part of the file name. Typically used to sort sql/nn-swh-xxx.sql files. 
Unmatched file names will return 999999 as order value. """ m = re.match(r"(\d*)(.*)", fname) assert m is not None num, rem = m.groups() return (int(num) if num else 999999, rem) + + +def basename_sortkey(fname: str) -> Tuple[int, str]: + "like numfile_sortkey but on basenames" + return numfile_sortkey(os.path.basename(fname))