
diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
index e78b4de8..0d78c906 100644
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -1,105 +1,106 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import importlib
-from typing import Any, Dict, List
+from typing import Any, Dict, List, TYPE_CHECKING
import warnings

-from .interface import StorageInterface
+if TYPE_CHECKING:
+    from .interface import StorageInterface

STORAGE_IMPLEMENTATIONS = {
    "local": ".postgresql.storage.Storage",
    "remote": ".api.client.RemoteStorage",
    "memory": ".in_memory.InMemoryStorage",
    "filter": ".filter.FilteringProxyStorage",
    "buffer": ".buffer.BufferingProxyStorage",
    "retry": ".retry.RetryingProxyStorage",
    "cassandra": ".cassandra.CassandraStorage",
    "validate": ".validate.ValidatingProxyStorage",
}

-def get_storage(cls: str, **kwargs) -> StorageInterface:
+def get_storage(cls: str, **kwargs) -> "StorageInterface":
    """Get a storage object of class `cls` with arguments `kwargs`.

    Args:
        cls (str): storage's class, either local, remote, memory, filter,
            buffer, retry, cassandra, validate or pipeline
        kwargs: keyword arguments passed to the storage class constructor

    Returns:
        an instance of swh.storage.Storage or compatible class

    Raises:
        ValueError if passed an unknown storage class.

    """
if "args" in kwargs:
warnings.warn(
'Explicit "args" key is deprecated, use keys directly instead.',
DeprecationWarning,
)
kwargs = kwargs["args"]
if cls == "pipeline":
return get_storage_pipeline(**kwargs)
class_path = STORAGE_IMPLEMENTATIONS.get(cls)
if class_path is None:
raise ValueError(
"Unknown storage class `%s`. Supported: %s"
% (cls, ", ".join(STORAGE_IMPLEMENTATIONS))
)
(module_path, class_name) = class_path.rsplit(".", 1)
module = importlib.import_module(module_path, package=__package__)
Storage = getattr(module, class_name)
check_config = kwargs.pop("check_config", {})
storage = Storage(**kwargs)
if check_config:
if not storage.check_config(**check_config):
raise EnvironmentError("storage check config failed")
return storage
def get_storage_pipeline(
steps: List[Dict[str, Any]], check_config=None
-) -> StorageInterface:
+) -> "StorageInterface":
"""Recursively get a storage object that may use other storage objects
as backends.
Args:
steps (List[dict]): List of dicts that may be used as kwargs for
`get_storage`.
Returns:
an instance of swh.storage.Storage or compatible class
Raises:
ValueError if passed an unknown storage class.
"""
storage_config = None
for step in reversed(steps):
if "args" in step:
warnings.warn(
'Explicit "args" key is deprecated, use keys directly ' "instead.",
DeprecationWarning,
)
step = {
"cls": step["cls"],
**step["args"],
}
if storage_config:
step["storage"] = storage_config
step["check_config"] = check_config
storage_config = step
if storage_config is None:
raise ValueError("'pipeline' has no steps.")
return get_storage(**storage_config)
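Note on the change above: importing `StorageInterface` only under `typing.TYPE_CHECKING` means the name no longer exists at runtime, which is why the return annotations become the quoted string form `"StorageInterface"`; type checkers still resolve them, while importing `swh.storage` stops pulling in `.interface` eagerly. As a reading aid, here is a hedged sketch of how a caller would exercise `get_storage` and the pipeline path described in the docstrings; the proxy order and the `memory` backend are illustrative choices, not part of this diff, and the nested instantiation assumes each proxy builds its backend from the `storage` kwarg that `get_storage_pipeline` wires in:

    from swh.storage import get_storage

    # Direct instantiation: "memory" resolves through
    # STORAGE_IMPLEMENTATIONS to .in_memory.InMemoryStorage.
    storage = get_storage(cls="memory")

    # Pipeline instantiation: get_storage_pipeline() walks the steps in
    # reverse and wires each proxy's "storage" kwarg to the step below,
    # building filter -> buffer -> memory here.
    storage = get_storage(
        cls="pipeline",
        steps=[{"cls": "filter"}, {"cls": "buffer"}, {"cls": "memory"}],
    )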
diff --git a/swh/storage/cli.py b/swh/storage/cli.py
index a35f5d84..f191e8d3 100644
--- a/swh/storage/cli.py
+++ b/swh/storage/cli.py
@@ -1,221 +1,226 @@
-# Copyright (C) 2015-2019 The Software Heritage developers
+# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

-import functools
+# WARNING: do not import unnecessary things here to keep cli startup time under
+# control
import logging
import os
from typing import Dict, Optional

import click

-from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS
-from swh.journal.client import get_journal_client
-from swh.storage import get_storage
-from swh.storage.api.server import app

try:
    from systemd.daemon import notify
except ImportError:
    notify = None

@click.group(name="storage", context_settings=CONTEXT_SETTINGS)
@click.option(
    "--config-file",
    "-C",
    default=None,
    type=click.Path(exists=True, dir_okay=False,),
    help="Configuration file.",
)
@click.option(
    "--check-config",
    default=None,
    type=click.Choice(["no", "read", "write"]),
    help=(
        "Check the configuration of the storage at startup for read or write access; "
        "if set, override the value present in the configuration file if any. "
        "Defaults to 'read' for the 'backfill' command, and 'write' for 'rpc-server' "
        "and 'replay' commands."
    ),
)
@click.pass_context
def storage(ctx, config_file, check_config):
    """Software Heritage Storage tools."""
+    from swh.core import config
+
    if not config_file:
        config_file = os.environ.get("SWH_CONFIG_FILENAME")

    if config_file:
        if not os.path.exists(config_file):
            raise ValueError("%s does not exist" % config_file)
        conf = config.read(config_file)
    else:
        conf = {}

    if "storage" not in conf:
        ctx.fail("You must have a storage configured in your config file.")

    ctx.ensure_object(dict)
    ctx.obj["config"] = conf
    ctx.obj["check_config"] = check_config


@storage.command(name="rpc-serve")
@click.option(
    "--host",
    default="0.0.0.0",
    metavar="IP",
    show_default=True,
    help="Host ip address to bind the server on",
)
@click.option(
    "--port",
    default=5002,
    type=click.INT,
    metavar="PORT",
    show_default=True,
    help="Binding port of the server",
)
@click.option(
    "--debug/--no-debug",
    default=True,
    help="Indicates if the server should run in debug mode",
)
@click.pass_context
def serve(ctx, host, port, debug):
    """Software Heritage Storage RPC server.

    Do NOT use this in a production environment.

    """
+    from swh.storage.api.server import app
+
    if "log_level" in ctx.obj:
        logging.getLogger("werkzeug").setLevel(ctx.obj["log_level"])
    ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "write")

    app.config.update(ctx.obj["config"])
    app.run(host, port=int(port), debug=bool(debug))


@storage.command()
@click.argument("object_type")
@click.option("--start-object", default=None)
@click.option("--end-object", default=None)
@click.option("--dry-run", is_flag=True, default=False)
@click.pass_context
def backfill(ctx, object_type, start_object, end_object, dry_run):
    """Run the backfiller

    The backfiller lists objects from a Storage and produces journal entries
    from there.

    Typically used to rebuild a journal or compensate for missing objects in a
    journal (e.g. due to a downtime of the latter).

    The configuration file requires the following entries:

    - brokers: a list of kafka endpoints (the journal) in which entries will be
      added.
    - storage_dbconn: URL to connect to the storage DB.
    - prefix: the prefix of the topics (topics will be <prefix>.<object_type>).
    - client_id: the kafka client ID.

    """
    ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "read")

    # for "lazy" loading
    from swh.storage.backfill import JournalBackfiller

    try:
        from systemd.daemon import notify
    except ImportError:
        notify = None

    conf = ctx.obj["config"]
    backfiller = JournalBackfiller(conf)

    if notify:
        notify("READY=1")

    try:
        backfiller.run(
            object_type=object_type,
            start_object=start_object,
            end_object=end_object,
            dry_run=dry_run,
        )
    except KeyboardInterrupt:
        if notify:
            notify("STOPPING=1")
        ctx.exit(0)


@storage.command()
@click.option(
    "--stop-after-objects",
    "-n",
    default=None,
    type=int,
    help="Stop after processing this many objects. Default is to " "run forever.",
)
@click.pass_context
def replay(ctx, stop_after_objects):
    """Fill a Storage by reading a Journal.

    There can be several 'replayers' filling a Storage as long as they use
    the same `group-id`.

    """
+    import functools
+
+    from swh.journal.client import get_journal_client
+    from swh.storage import get_storage
    from swh.storage.replay import process_replay_objects

    ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "write")

    conf = ctx.obj["config"]
    storage = get_storage(**conf.pop("storage"))

    client_cfg = conf.pop("journal_client")

    if stop_after_objects:
        client_cfg["stop_after_objects"] = stop_after_objects

    try:
        client = get_journal_client(**client_cfg)
    except ValueError as exc:
        ctx.fail(exc)

    worker_fn = functools.partial(process_replay_objects, storage=storage)

    if notify:
        notify("READY=1")

    try:
        client.process(worker_fn)
    except KeyboardInterrupt:
        ctx.exit(0)
    else:
        print("Done.")
    finally:
        if notify:
            notify("STOPPING=1")
        client.close()


def ensure_check_config(storage_cfg: Dict, check_config: Optional[str], default: str):
    """Helper function to inject the check_config option into the storage
    config dict, according to the command's expected default value (e.g.
    backfill can be read-only).

    """
    if check_config is not None:
        if check_config == "no":
            storage_cfg.pop("check_config", None)
        else:
            storage_cfg["check_config"] = {"check_write": check_config == "write"}
    else:
        if "check_config" not in storage_cfg:
            storage_cfg["check_config"] = {"check_write": default == "write"}


def main():
    logging.basicConfig()
    return serve(auto_envvar_prefix="SWH_STORAGE")


if __name__ == "__main__":
    main()
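To make the `--check-config` precedence concrete, here is a small usage sketch of `ensure_check_config` as defined above (the config dicts are made-up examples):

    from swh.storage.cli import ensure_check_config

    # No CLI flag given: fall back to the command's default ("write" here).
    cfg = {"cls": "local"}
    ensure_check_config(cfg, check_config=None, default="write")
    assert cfg["check_config"] == {"check_write": True}

    # Explicit --check-config=no: drop any check_config entry entirely.
    cfg = {"cls": "local", "check_config": {"check_write": True}}
    ensure_check_config(cfg, check_config="no", default="write")
    assert "check_config" not in cfg

    # Explicit --check-config=read: force a read-only check, overriding
    # both the config file and the command default.
    cfg = {"cls": "local"}
    ensure_check_config(cfg, check_config="read", default="write")
    assert cfg["check_config"] == {"check_write": False}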
diff --git a/swh/storage/tests/test_cli.py b/swh/storage/tests/test_cli.py
index c51ce877..02d6bd33 100644
--- a/swh/storage/tests/test_cli.py
+++ b/swh/storage/tests/test_cli.py
@@ -1,114 +1,114 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import copy
import logging
import re
import tempfile

import yaml
from unittest.mock import patch

import pytest
from click.testing import CliRunner
from confluent_kafka import Producer

from swh.model.model import Snapshot, SnapshotBranch, TargetType
from swh.journal.serializers import key_to_kafka, value_to_kafka
from swh.storage import get_storage
from swh.storage.cli import storage as cli

logger = logging.getLogger(__name__)

CLI_CONFIG = {
    "storage": {"cls": "memory",},
}


@pytest.fixture
def swh_storage():
    """An swh-storage object that gets injected into the CLI functions."""
    storage = get_storage(**CLI_CONFIG["storage"])
-    with patch("swh.storage.cli.get_storage") as get_storage_mock:
+    with patch("swh.storage.get_storage") as get_storage_mock:
        get_storage_mock.return_value = storage
        yield storage


@pytest.fixture
def monkeypatch_retry_sleep(monkeypatch):
    from swh.journal.replay import copy_object, obj_in_objstorage

    monkeypatch.setattr(copy_object.retry, "sleep", lambda x: None)
    monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None)


def invoke(*args, env=None, journal_config=None):
    config = copy.deepcopy(CLI_CONFIG)
    if journal_config:
        config["journal_client"] = journal_config.copy()
        config["journal_client"]["cls"] = "kafka"

    runner = CliRunner()
    with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
        yaml.dump(config, config_fd)
        config_fd.seek(0)
        args = ["-C" + config_fd.name] + list(args)
        ret = runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=env,)
        return ret


def test_replay(
    swh_storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str,
):
    kafka_prefix += ".swh.journal.objects"

    producer = Producer(
        {
            "bootstrap.servers": kafka_server,
            "client.id": "test-producer",
            "acks": "all",
        }
    )

    snapshot = Snapshot(
        branches={
            b"HEAD": SnapshotBranch(
                target_type=TargetType.REVISION, target=b"\x01" * 20,
            )
        },
    )
    snapshot_dict = snapshot.to_dict()

    producer.produce(
        topic=kafka_prefix + ".snapshot",
        key=key_to_kafka(snapshot.id),
        value=value_to_kafka(snapshot_dict),
    )
    producer.flush()
    logger.debug("Flushed producer")

    result = invoke(
        "replay",
        "--stop-after-objects",
        "1",
        journal_config={
            "brokers": [kafka_server],
            "group_id": kafka_consumer_group,
            "prefix": kafka_prefix,
        },
    )

    expected = r"Done.\n"
    assert result.exit_code == 0, result.output
    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

    assert swh_storage.snapshot_get(snapshot.id) == {
        **snapshot_dict,
        "next_branch": None,
    }
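The one-line change in the `swh_storage` fixture follows from the lazy import in `replay()`: `get_storage` is no longer a module-level attribute of `swh.storage.cli`, so the mock must patch the name on its defining module, `swh.storage`, where the deferred import looks it up at call time. A self-contained sketch of the same mechanics using only the standard library (the toy `load()` function is hypothetical):

    from unittest.mock import patch

    def load():
        # Deferred import: the name is resolved on the json module at
        # call time, like `from swh.storage import get_storage` inside
        # replay().
        from json import loads

        return loads("[1]")

    # Patching the defining module is visible to the deferred import...
    with patch("json.loads", return_value=[2]):
        assert load() == [2]

    # ...and the patch is reverted once the context manager exits.
    assert load() == [1]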
