Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cli.py
# Copyright (C) 2015-2019 The Software Heritage developers | # Copyright (C) 2015-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import functools | |||||
import logging | import logging | ||||
import os | import os | ||||
import warnings | import warnings | ||||
import click | import click | ||||
from swh.core import config | from swh.core import config | ||||
from swh.core.cli import CONTEXT_SETTINGS | from swh.core.cli import CONTEXT_SETTINGS | ||||
from swh.journal.cli import get_journal_client | |||||
from swh.storage import get_storage | |||||
from swh.storage.api.server import load_and_check_config, app | from swh.storage.api.server import load_and_check_config, app | ||||
try: | |||||
from systemd.daemon import notify | |||||
except ImportError: | |||||
notify = None | |||||
@click.group(name="storage", context_settings=CONTEXT_SETTINGS) | @click.group(name="storage", context_settings=CONTEXT_SETTINGS) | ||||
@click.option( | @click.option( | ||||
"--config-file", | "--config-file", | ||||
"-C", | "-C", | ||||
default=None, | default=None, | ||||
type=click.Path(exists=True, dir_okay=False,), | type=click.Path(exists=True, dir_okay=False,), | ||||
help="Configuration file.", | help="Configuration file.", | ||||
▲ Show 20 Lines • Show All 106 Lines • ▼ Show 20 Lines | try: | ||||
dry_run=dry_run, | dry_run=dry_run, | ||||
) | ) | ||||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||||
if notify: | if notify: | ||||
notify("STOPPING=1") | notify("STOPPING=1") | ||||
ctx.exit(0) | ctx.exit(0) | ||||
@storage.command() | |||||
@click.option( | |||||
"--stop-after-objects", | |||||
"-n", | |||||
default=None, | |||||
type=int, | |||||
help="Stop after processing this many objects. Default is to " "run forever.", | |||||
) | |||||
@click.pass_context | |||||
def replay(ctx, stop_after_objects): | |||||
"""Fill a Storage by reading a Journal. | |||||
There can be several 'replayers' filling a Storage as long as they use | |||||
the same `group-id`. | |||||
""" | |||||
from swh.storage.replay import process_replay_objects | |||||
conf = ctx.obj["config"] | |||||
try: | |||||
storage = get_storage(**conf.pop("storage")) | |||||
except KeyError: | |||||
ctx.fail("You must have a storage configured in your config file.") | |||||
client = get_journal_client(ctx, stop_after_objects=stop_after_objects) | |||||
worker_fn = functools.partial(process_replay_objects, storage=storage) | |||||
if notify: | |||||
notify("READY=1") | |||||
try: | |||||
client.process(worker_fn) | |||||
except KeyboardInterrupt: | |||||
ctx.exit(0) | |||||
else: | |||||
print("Done.") | |||||
finally: | |||||
if notify: | |||||
notify("STOPPING=1") | |||||
client.close() | |||||
def main(): | def main(): | ||||
logging.basicConfig() | logging.basicConfig() | ||||
return serve(auto_envvar_prefix="SWH_STORAGE") | return serve(auto_envvar_prefix="SWH_STORAGE") | ||||
if __name__ == "__main__": | if __name__ == "__main__": | ||||
main() | main() |