Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9312265
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
13 KB
Subscribers
None
View Options
diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
index e78b4de8..0d78c906 100644
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -1,105 +1,106 @@
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import importlib
-from typing import Any, Dict, List
+from typing import Any, Dict, List, TYPE_CHECKING
import warnings
-from .interface import StorageInterface
+if TYPE_CHECKING:
+ from .interface import StorageInterface
STORAGE_IMPLEMENTATIONS = {
"local": ".postgresql.storage.Storage",
"remote": ".api.client.RemoteStorage",
"memory": ".in_memory.InMemoryStorage",
"filter": ".filter.FilteringProxyStorage",
"buffer": ".buffer.BufferingProxyStorage",
"retry": ".retry.RetryingProxyStorage",
"cassandra": ".cassandra.CassandraStorage",
"validate": ".validate.ValidatingProxyStorage",
}
-def get_storage(cls: str, **kwargs) -> StorageInterface:
+def get_storage(cls: str, **kwargs) -> "StorageInterface":
"""Get a storage object of class `storage_class` with arguments
`storage_args`.
Args:
storage (dict): dictionary with keys:
- cls (str): storage's class, either local, remote, memory, filter,
buffer
- args (dict): dictionary with keys
Returns:
an instance of swh.storage.Storage or compatible class
Raises:
ValueError if passed an unknown storage class.
"""
if "args" in kwargs:
warnings.warn(
'Explicit "args" key is deprecated, use keys directly instead.',
DeprecationWarning,
)
kwargs = kwargs["args"]
if cls == "pipeline":
return get_storage_pipeline(**kwargs)
class_path = STORAGE_IMPLEMENTATIONS.get(cls)
if class_path is None:
raise ValueError(
"Unknown storage class `%s`. Supported: %s"
% (cls, ", ".join(STORAGE_IMPLEMENTATIONS))
)
(module_path, class_name) = class_path.rsplit(".", 1)
module = importlib.import_module(module_path, package=__package__)
Storage = getattr(module, class_name)
check_config = kwargs.pop("check_config", {})
storage = Storage(**kwargs)
if check_config:
if not storage.check_config(**check_config):
raise EnvironmentError("storage check config failed")
return storage
def get_storage_pipeline(
steps: List[Dict[str, Any]], check_config=None
-) -> StorageInterface:
+) -> "StorageInterface":
"""Recursively get a storage object that may use other storage objects
as backends.
Args:
steps (List[dict]): List of dicts that may be used as kwargs for
`get_storage`.
Returns:
an instance of swh.storage.Storage or compatible class
Raises:
ValueError if passed an unknown storage class.
"""
storage_config = None
for step in reversed(steps):
if "args" in step:
warnings.warn(
'Explicit "args" key is deprecated, use keys directly ' "instead.",
DeprecationWarning,
)
step = {
"cls": step["cls"],
**step["args"],
}
if storage_config:
step["storage"] = storage_config
step["check_config"] = check_config
storage_config = step
if storage_config is None:
raise ValueError("'pipeline' has no steps.")
return get_storage(**storage_config)
diff --git a/swh/storage/cli.py b/swh/storage/cli.py
index a35f5d84..f191e8d3 100644
--- a/swh/storage/cli.py
+++ b/swh/storage/cli.py
@@ -1,221 +1,226 @@
-# Copyright (C) 2015-2019 The Software Heritage developers
+# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import functools
+# WARNING: do not import unnecessary things here to keep cli startup time under
+# control
import logging
import os
from typing import Dict, Optional
import click
-from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS
-from swh.journal.client import get_journal_client
-from swh.storage import get_storage
-from swh.storage.api.server import app
try:
from systemd.daemon import notify
except ImportError:
notify = None
@click.group(name="storage", context_settings=CONTEXT_SETTINGS)
@click.option(
"--config-file",
"-C",
default=None,
type=click.Path(exists=True, dir_okay=False,),
help="Configuration file.",
)
@click.option(
"--check-config",
default=None,
type=click.Choice(["no", "read", "write"]),
help=(
"Check the configuration of the storage at startup for read or write access; "
"if set, override the value present in the configuration file if any. "
"Defaults to 'read' for the 'backfill' command, and 'write' for 'rpc-server' "
"and 'replay' commands."
),
)
@click.pass_context
def storage(ctx, config_file, check_config):
"""Software Heritage Storage tools."""
+ from swh.core import config
+
if not config_file:
config_file = os.environ.get("SWH_CONFIG_FILENAME")
if config_file:
if not os.path.exists(config_file):
raise ValueError("%s does not exist" % config_file)
conf = config.read(config_file)
else:
conf = {}
if "storage" not in conf:
ctx.fail("You must have a storage configured in your config file.")
ctx.ensure_object(dict)
ctx.obj["config"] = conf
ctx.obj["check_config"] = check_config
@storage.command(name="rpc-serve")
@click.option(
"--host",
default="0.0.0.0",
metavar="IP",
show_default=True,
help="Host ip address to bind the server on",
)
@click.option(
"--port",
default=5002,
type=click.INT,
metavar="PORT",
show_default=True,
help="Binding port of the server",
)
@click.option(
"--debug/--no-debug",
default=True,
help="Indicates if the server should run in debug mode",
)
@click.pass_context
def serve(ctx, host, port, debug):
"""Software Heritage Storage RPC server.
Do NOT use this in a production environment.
"""
+ from swh.storage.api.server import app
+
if "log_level" in ctx.obj:
logging.getLogger("werkzeug").setLevel(ctx.obj["log_level"])
ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "write")
app.config.update(ctx.obj["config"])
app.run(host, port=int(port), debug=bool(debug))
@storage.command()
@click.argument("object_type")
@click.option("--start-object", default=None)
@click.option("--end-object", default=None)
@click.option("--dry-run", is_flag=True, default=False)
@click.pass_context
def backfill(ctx, object_type, start_object, end_object, dry_run):
"""Run the backfiller
The backfiller list objects from a Storage and produce journal entries from
there.
Typically used to rebuild a journal or compensate for missing objects in a
journal (eg. due to a downtime of this later).
The configuration file requires the following entries:
- brokers: a list of kafka endpoints (the journal) in which entries will be
added.
- storage_dbconn: URL to connect to the storage DB.
- prefix: the prefix of the topics (topics will be <prefix>.<object_type>).
- client_id: the kafka client ID.
"""
ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "read")
# for "lazy" loading
from swh.storage.backfill import JournalBackfiller
try:
from systemd.daemon import notify
except ImportError:
notify = None
conf = ctx.obj["config"]
backfiller = JournalBackfiller(conf)
if notify:
notify("READY=1")
try:
backfiller.run(
object_type=object_type,
start_object=start_object,
end_object=end_object,
dry_run=dry_run,
)
except KeyboardInterrupt:
if notify:
notify("STOPPING=1")
ctx.exit(0)
@storage.command()
@click.option(
"--stop-after-objects",
"-n",
default=None,
type=int,
help="Stop after processing this many objects. Default is to " "run forever.",
)
@click.pass_context
def replay(ctx, stop_after_objects):
"""Fill a Storage by reading a Journal.
There can be several 'replayers' filling a Storage as long as they use
the same `group-id`.
"""
+ import functools
+
+ from swh.journal.client import get_journal_client
+ from swh.storage import get_storage
from swh.storage.replay import process_replay_objects
ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "write")
conf = ctx.obj["config"]
storage = get_storage(**conf.pop("storage"))
client_cfg = conf.pop("journal_client")
if stop_after_objects:
client_cfg["stop_after_objects"] = stop_after_objects
try:
client = get_journal_client(**client_cfg)
except ValueError as exc:
ctx.fail(exc)
worker_fn = functools.partial(process_replay_objects, storage=storage)
if notify:
notify("READY=1")
try:
client.process(worker_fn)
except KeyboardInterrupt:
ctx.exit(0)
else:
print("Done.")
finally:
if notify:
notify("STOPPING=1")
client.close()
def ensure_check_config(storage_cfg: Dict, check_config: Optional[str], default: str):
"""Helper function to inject the setting of check_config option in the storage config
dict according to the expected default value (default value depends on the command,
eg. backfill can be read-only).
"""
if check_config is not None:
if check_config == "no":
storage_cfg.pop("check_config", None)
else:
storage_cfg["check_config"] = {"check_write": check_config == "write"}
else:
if "check_config" not in storage_cfg:
storage_cfg["check_config"] = {"check_write": default == "write"}
def main():
logging.basicConfig()
return serve(auto_envvar_prefix="SWH_STORAGE")
if __name__ == "__main__":
main()
diff --git a/swh/storage/tests/test_cli.py b/swh/storage/tests/test_cli.py
index c51ce877..02d6bd33 100644
--- a/swh/storage/tests/test_cli.py
+++ b/swh/storage/tests/test_cli.py
@@ -1,114 +1,114 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import copy
import logging
import re
import tempfile
import yaml
from unittest.mock import patch
import pytest
from click.testing import CliRunner
from confluent_kafka import Producer
from swh.model.model import Snapshot, SnapshotBranch, TargetType
from swh.journal.serializers import key_to_kafka, value_to_kafka
from swh.storage import get_storage
from swh.storage.cli import storage as cli
logger = logging.getLogger(__name__)
CLI_CONFIG = {
"storage": {"cls": "memory",},
}
@pytest.fixture
def swh_storage():
"""An swh-storage object that gets injected into the CLI functions."""
storage = get_storage(**CLI_CONFIG["storage"])
- with patch("swh.storage.cli.get_storage") as get_storage_mock:
+ with patch("swh.storage.get_storage") as get_storage_mock:
get_storage_mock.return_value = storage
yield storage
@pytest.fixture
def monkeypatch_retry_sleep(monkeypatch):
from swh.journal.replay import copy_object, obj_in_objstorage
monkeypatch.setattr(copy_object.retry, "sleep", lambda x: None)
monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None)
def invoke(*args, env=None, journal_config=None):
config = copy.deepcopy(CLI_CONFIG)
if journal_config:
config["journal_client"] = journal_config.copy()
config["journal_client"]["cls"] = "kafka"
runner = CliRunner()
with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
yaml.dump(config, config_fd)
config_fd.seek(0)
args = ["-C" + config_fd.name] + list(args)
ret = runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=env,)
return ret
def test_replay(
swh_storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str,
):
kafka_prefix += ".swh.journal.objects"
producer = Producer(
{
"bootstrap.servers": kafka_server,
"client.id": "test-producer",
"acks": "all",
}
)
snapshot = Snapshot(
branches={
b"HEAD": SnapshotBranch(
target_type=TargetType.REVISION, target=b"\x01" * 20,
)
},
)
snapshot_dict = snapshot.to_dict()
producer.produce(
topic=kafka_prefix + ".snapshot",
key=key_to_kafka(snapshot.id),
value=value_to_kafka(snapshot_dict),
)
producer.flush()
logger.debug("Flushed producer")
result = invoke(
"replay",
"--stop-after-objects",
"1",
journal_config={
"brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
assert swh_storage.snapshot_get(snapshot.id) == {
**snapshot_dict,
"next_branch": None,
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Thu, Jul 3, 10:47 AM (1 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3261731
Attached To
rDSTO Storage manager
Event Timeline
Log In to Comment