diff --git a/swh/storage/cli.py b/swh/storage/cli.py
index 5d1a7d37..d21b9071 100644
--- a/swh/storage/cli.py
+++ b/swh/storage/cli.py
@@ -1,225 +1,226 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import logging
import os
from typing import Dict, Optional

import click

-from swh.core.cli import CONTEXT_SETTINGS, swh as swh_cli_group
+from swh.core.cli import CONTEXT_SETTINGS
+from swh.core.cli import swh as swh_cli_group

try:
    from systemd.daemon import notify
except ImportError:
    notify = None


@swh_cli_group.group(name="storage", context_settings=CONTEXT_SETTINGS)
@click.option(
    "--config-file",
    "-C",
    default=None,
    type=click.Path(exists=True, dir_okay=False,),
    help="Configuration file.",
)
@click.option(
    "--check-config",
    default=None,
    type=click.Choice(["no", "read", "write"]),
    help=(
        "Check the configuration of the storage at startup for read or write access; "
        "if set, override the value present in the configuration file if any. "
        "Defaults to 'read' for the 'backfill' command, and 'write' for 'rpc-server' "
        "and 'replay' commands."
    ),
)
@click.pass_context
def storage(ctx, config_file, check_config):
    """Software Heritage Storage tools."""
    from swh.core import config

    if not config_file:
        config_file = os.environ.get("SWH_CONFIG_FILENAME")

    if config_file:
        if not os.path.exists(config_file):
            raise ValueError("%s does not exist" % config_file)
        conf = config.read(config_file)
    else:
        conf = {}

    if "storage" not in conf:
        ctx.fail("You must have a storage configured in your config file.")

    ctx.ensure_object(dict)
    ctx.obj["config"] = conf
    ctx.obj["check_config"] = check_config


@storage.command(name="rpc-serve")
@click.option(
    "--host",
    default="0.0.0.0",
    metavar="IP",
    show_default=True,
    help="Host ip address to bind the server on",
)
@click.option(
    "--port",
    default=5002,
    type=click.INT,
    metavar="PORT",
    show_default=True,
    help="Binding port of the server",
)
@click.option(
    "--debug/--no-debug",
    default=True,
    help="Indicates if the server should run in debug mode",
)
@click.pass_context
def serve(ctx, host, port, debug):
    """Software Heritage Storage RPC server.

    Do NOT use this in a production environment.
    """
    from swh.storage.api.server import app

    if "log_level" in ctx.obj:
        logging.getLogger("werkzeug").setLevel(ctx.obj["log_level"])
    ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "write")
    app.config.update(ctx.obj["config"])
    app.run(host, port=int(port), debug=bool(debug))


@storage.command()
@click.argument("object_type")
@click.option("--start-object", default=None)
@click.option("--end-object", default=None)
@click.option("--dry-run", is_flag=True, default=False)
@click.pass_context
def backfill(ctx, object_type, start_object, end_object, dry_run):
    """Run the backfiller.

    The backfiller lists objects from a Storage and produces journal entries
    from there.

    Typically used to rebuild a journal or compensate for missing objects in a
    journal (e.g. due to a downtime of the latter).

    The configuration file requires the following entries:

    - brokers: a list of kafka endpoints (the journal) in which entries will
      be added.
    - storage_dbconn: URL to connect to the storage DB.
    - prefix: the prefix of the topics (topics will be <prefix>.<object_type>).
    - client_id: the kafka client ID.
    """
    ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "read")

    # for "lazy" loading
    from swh.storage.backfill import JournalBackfiller

    try:
        from systemd.daemon import notify
    except ImportError:
        notify = None

    conf = ctx.obj["config"]
    backfiller = JournalBackfiller(conf)

    if notify:
        notify("READY=1")

    try:
        backfiller.run(
            object_type=object_type,
            start_object=start_object,
            end_object=end_object,
            dry_run=dry_run,
        )
    except KeyboardInterrupt:
        if notify:
            notify("STOPPING=1")
        ctx.exit(0)
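
For reference, a minimal sketch of the configuration the backfill command
expects, mirroring the four entries listed in its docstring. All values below
are hypothetical placeholders, not project defaults:

    # Hypothetical backfiller configuration (placeholder values only).
    config = {
        "brokers": ["broker1.journal.example.org:9092"],  # kafka endpoints
        "storage_dbconn": "service=swh-storage",  # URL of the storage DB
        "prefix": "swh.journal.objects",  # prefix of the kafka topics
        "client_id": "swh.storage.backfiller",  # kafka client ID
    }
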
""" ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "read") # for "lazy" loading from swh.storage.backfill import JournalBackfiller try: from systemd.daemon import notify except ImportError: notify = None conf = ctx.obj["config"] backfiller = JournalBackfiller(conf) if notify: notify("READY=1") try: backfiller.run( object_type=object_type, start_object=start_object, end_object=end_object, dry_run=dry_run, ) except KeyboardInterrupt: if notify: notify("STOPPING=1") ctx.exit(0) @storage.command() @click.option( "--stop-after-objects", "-n", default=None, type=int, help="Stop after processing this many objects. Default is to " "run forever.", ) @click.pass_context def replay(ctx, stop_after_objects): """Fill a Storage by reading a Journal. There can be several 'replayers' filling a Storage as long as they use the same `group-id`. """ import functools from swh.journal.client import get_journal_client from swh.storage import get_storage from swh.storage.replay import process_replay_objects ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "write") conf = ctx.obj["config"] storage = get_storage(**conf.pop("storage")) client_cfg = conf.pop("journal_client") if stop_after_objects: client_cfg["stop_after_objects"] = stop_after_objects try: client = get_journal_client(**client_cfg) except ValueError as exc: ctx.fail(exc) worker_fn = functools.partial(process_replay_objects, storage=storage) if notify: notify("READY=1") try: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: if notify: notify("STOPPING=1") client.close() def ensure_check_config(storage_cfg: Dict, check_config: Optional[str], default: str): """Helper function to inject the setting of check_config option in the storage config dict according to the expected default value (default value depends on the command, eg. backfill can be read-only). """ if check_config is not None: if check_config == "no": storage_cfg.pop("check_config", None) else: storage_cfg["check_config"] = {"check_write": check_config == "write"} else: if "check_config" not in storage_cfg: storage_cfg["check_config"] = {"check_write": default == "write"} def main(): logging.basicConfig() return serve(auto_envvar_prefix="SWH_STORAGE") if __name__ == "__main__": main() diff --git a/swh/storage/pytest_plugin.py b/swh/storage/pytest_plugin.py index 7c5ec0c7..70e7ac47 100644 --- a/swh/storage/pytest_plugin.py +++ b/swh/storage/pytest_plugin.py @@ -1,201 +1,200 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import glob from os import environ, path - import subprocess from typing import Union import pytest from pytest_postgresql import factories from pytest_postgresql.janitor import DatabaseJanitor, Version, psycopg2 from swh.core.utils import numfile_sortkey as sortkey import swh.storage from swh.storage import get_storage from swh.storage.tests.storage_data import StorageData SQL_DIR = path.join(path.dirname(swh.storage.__file__), "sql") environ["LC_ALL"] = "C.UTF-8" DUMP_FILES = path.join(SQL_DIR, "*.sql") # the postgres_fact factory fixture below is mostly a copy of the code # from pytest-postgresql. We need a custom version here to be able to # specify our version of the DBJanitor we use. 
diff --git a/swh/storage/pytest_plugin.py b/swh/storage/pytest_plugin.py
index 7c5ec0c7..70e7ac47 100644
--- a/swh/storage/pytest_plugin.py
+++ b/swh/storage/pytest_plugin.py
@@ -1,201 +1,200 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import glob
from os import environ, path
-
import subprocess
from typing import Union

import pytest
from pytest_postgresql import factories
from pytest_postgresql.janitor import DatabaseJanitor, Version, psycopg2

from swh.core.utils import numfile_sortkey as sortkey
import swh.storage
from swh.storage import get_storage
from swh.storage.tests.storage_data import StorageData

SQL_DIR = path.join(path.dirname(swh.storage.__file__), "sql")

environ["LC_ALL"] = "C.UTF-8"

DUMP_FILES = path.join(SQL_DIR, "*.sql")


# The postgresql_fact factory fixture below is mostly a copy of the code
# from pytest-postgresql. We need a custom version here to be able to
# specify our own version of the DatabaseJanitor.
def postgresql_fact(process_fixture_name, db_name=None, dump_files=DUMP_FILES):
    @pytest.fixture
    def postgresql_factory(request):
        """
        Fixture factory for PostgreSQL.

        :param FixtureRequest request: fixture request object
        :rtype: psycopg2.connection
        :returns: postgresql client
        """
        config = factories.get_config(request)
        if not psycopg2:
            raise ImportError("No module named psycopg2. Please install it.")
        proc_fixture = request.getfixturevalue(process_fixture_name)

        # _, config = try_import('psycopg2', request)
        pg_host = proc_fixture.host
        pg_port = proc_fixture.port
        pg_user = proc_fixture.user
        pg_options = proc_fixture.options
        pg_db = db_name or config["dbname"]

        with SwhDatabaseJanitor(
            pg_user,
            pg_host,
            pg_port,
            pg_db,
            proc_fixture.version,
            dump_files=dump_files,
        ):
            connection = psycopg2.connect(
                dbname=pg_db,
                user=pg_user,
                host=pg_host,
                port=pg_port,
                options=pg_options,
            )
            yield connection
            connection.close()

    return postgresql_factory


swh_storage_postgresql = postgresql_fact("postgresql_proc", db_name="storage")


@pytest.fixture
def swh_storage_backend_config(swh_storage_postgresql):
    """Basic pg storage configuration with no journal collaborator
    (to avoid pulling in an optional dependency for clients of this fixture)
    """
    yield {
        "cls": "local",
        "db": swh_storage_postgresql.dsn,
        "objstorage": {"cls": "memory", "args": {}},
        "check_config": {"check_write": True},
    }


@pytest.fixture
def swh_storage(swh_storage_backend_config):
    return get_storage(**swh_storage_backend_config)
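
The two fixtures above are what most test modules consume. A hypothetical test
using the plugin could look like this (the test body is illustrative, not part
of the plugin):

    # Hypothetical test: swh_storage is a fully initialized storage backed by
    # the per-session PostgreSQL database, reset between tests.
    def test_storage_is_writable(swh_storage):
        assert swh_storage.check_config(check_write=True)
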
# This version of the DatabaseJanitor implements a different setup/teardown
# behavior than the stock one: instead of dropping, creating and initializing
# the database for each test, it creates and initializes the db only once,
# then truncates the tables between tests. This is needed to have acceptable
# test performance.
class SwhDatabaseJanitor(DatabaseJanitor):
    def __init__(
        self,
        user: str,
        host: str,
        port: str,
        db_name: str,
        version: Union[str, float, Version],
        dump_files: str = DUMP_FILES,
    ) -> None:
        super().__init__(user, host, port, db_name, version)
        self.dump_files = sorted(glob.glob(dump_files), key=sortkey)

    def db_setup(self):
        conninfo = (
            f"host={self.host} user={self.user} port={self.port} dbname={self.db_name}"
        )
        for fname in self.dump_files:
            subprocess.check_call(
                [
                    "psql",
                    "--quiet",
                    "--no-psqlrc",
                    "-v",
                    "ON_ERROR_STOP=1",
                    "-d",
                    conninfo,
                    "-f",
                    fname,
                ]
            )

    def db_reset(self):
        with psycopg2.connect(
            dbname=self.db_name, user=self.user, host=self.host, port=self.port,
        ) as cnx:
            with cnx.cursor() as cur:
                cur.execute(
                    "SELECT table_name FROM information_schema.tables "
                    "WHERE table_schema = %s",
                    ("public",),
                )
                tables = set(table for (table,) in cur.fetchall()) - {"dbversion"}
                for table in tables:
                    cur.execute("truncate table %s cascade" % table)

                cur.execute(
                    "SELECT sequence_name FROM information_schema.sequences "
                    "WHERE sequence_schema = %s",
                    ("public",),
                )
                seqs = set(seq for (seq,) in cur.fetchall())
                for seq in seqs:
                    cur.execute("ALTER SEQUENCE %s RESTART;" % seq)
            cnx.commit()

    def init(self):
        with self.cursor() as cur:
            cur.execute(
                "SELECT COUNT(1) FROM pg_database WHERE datname=%s;", (self.db_name,)
            )
            db_exists = cur.fetchone()[0] == 1
            if db_exists:
                cur.execute(
                    "UPDATE pg_database SET datallowconn=true "
                    "WHERE datname = %s;",
                    (self.db_name,),
                )

        if db_exists:
            self.db_reset()
        else:
            with self.cursor() as cur:
                cur.execute('CREATE DATABASE "{}";'.format(self.db_name))
            self.db_setup()

    def drop(self):
        pid_column = "pid"
        with self.cursor() as cur:
            cur.execute(
                "UPDATE pg_database SET datallowconn=false "
                "WHERE datname = %s;",
                (self.db_name,),
            )
            cur.execute(
                "SELECT pg_terminate_backend(pg_stat_activity.{})"
                "FROM pg_stat_activity "
                "WHERE pg_stat_activity.datname = %s;".format(pid_column),
                (self.db_name,),
            )


@pytest.fixture
def sample_data() -> StorageData:
    """Pre-defined sample storage object data to manipulate

    Returns:
        StorageData whose attributes are data model objects. Either
        multiple objects: contents, directories, revisions, releases, ...,
        or simple ones: content, directory, revision, release, ...
    """
    return StorageData()
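
Finally, `sample_data` pairs naturally with `swh_storage`. A sketch, assuming
the usual swh.storage `content_add` summary keys (the test itself is
hypothetical):

    # Hypothetical test combining the sample_data and swh_storage fixtures.
    def test_add_sample_contents(swh_storage, sample_data):
        # `contents` is one of the plural attributes described in the docstring.
        summary = swh_storage.content_add(sample_data.contents)
        assert summary["content:add"] == len(sample_data.contents)
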