Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/cli.py
Show All 13 Lines | |||||
try: | try: | ||||
from systemd.daemon import notify | from systemd.daemon import notify | ||||
except ImportError: | except ImportError: | ||||
notify = None | notify = None | ||||
from swh.core import config | from swh.core import config | ||||
from swh.core.cli import CONTEXT_SETTINGS | from swh.core.cli import CONTEXT_SETTINGS | ||||
from swh.model.model import SHA1_SIZE | from swh.model.model import SHA1_SIZE | ||||
from swh.storage import get_storage | |||||
from swh.objstorage import get_objstorage | from swh.objstorage import get_objstorage | ||||
from swh.journal.client import get_journal_client as get_client | from swh.journal.client import get_journal_client as get_client | ||||
from swh.journal.replay import is_hash_in_bytearray | from swh.journal.replay import is_hash_in_bytearray | ||||
from swh.journal.replay import process_replay_objects | |||||
from swh.journal.replay import process_replay_objects_content | from swh.journal.replay import process_replay_objects_content | ||||
from swh.journal.backfill import JournalBackfiller | |||||
@click.group(name="journal", context_settings=CONTEXT_SETTINGS) | @click.group(name="journal", context_settings=CONTEXT_SETTINGS) | ||||
@click.option( | @click.option( | ||||
"--config-file", | "--config-file", | ||||
"-C", | "-C", | ||||
default=None, | default=None, | ||||
type=click.Path(exists=True, dir_okay=False,), | type=click.Path(exists=True, dir_okay=False,), | ||||
▲ Show 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | @click.option( | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
"-n", | "-n", | ||||
default=None, | default=None, | ||||
type=int, | type=int, | ||||
help="Stop after processing this many objects. Default is to " "run forever.", | help="Stop after processing this many objects. Default is to " "run forever.", | ||||
) | ) | ||||
@click.pass_context | @click.pass_context | ||||
def replay(ctx, stop_after_objects): | def replay(ctx, stop_after_objects): | ||||
"""Fill a Storage by reading a Journal. | """DEPRECATED: use `swh storage replay` instead. | ||||
There can be several 'replayers' filling a Storage as long as they use | Requires swh.storage >= 0.0.188. | ||||
the same `group-id`. | |||||
""" | """ | ||||
conf = ctx.obj["config"] | ctx.fail("DEPRECATED") | ||||
try: | |||||
storage = get_storage(**conf.pop("storage")) | |||||
except KeyError: | |||||
ctx.fail("You must have a storage configured in your config file.") | |||||
client = get_journal_client(ctx, stop_after_objects=stop_after_objects) | |||||
worker_fn = functools.partial(process_replay_objects, storage=storage) | |||||
if notify: | |||||
notify("READY=1") | |||||
try: | |||||
client.process(worker_fn) | |||||
except KeyboardInterrupt: | |||||
ctx.exit(0) | |||||
else: | |||||
print("Done.") | |||||
finally: | |||||
if notify: | |||||
notify("STOPPING=1") | |||||
client.close() | |||||
@cli.command() | @cli.command() | ||||
@click.argument("object_type") | @click.argument("object_type") | ||||
@click.option("--start-object", default=None) | @click.option("--start-object", default=None) | ||||
@click.option("--end-object", default=None) | @click.option("--end-object", default=None) | ||||
@click.option("--dry-run", is_flag=True, default=False) | @click.option("--dry-run", is_flag=True, default=False) | ||||
@click.pass_context | @click.pass_context | ||||
def backfiller(ctx, object_type, start_object, end_object, dry_run): | def backfiller(ctx, object_type, start_object, end_object, dry_run): | ||||
"""Run the backfiller | """DEPRECATED: use `swh storage backfill` instead. | ||||
The backfiller list objects from a Storage and produce journal entries from | |||||
there. | |||||
Typically used to rebuild a journal or compensate for missing objects in a | |||||
journal (eg. due to a downtime of this later). | |||||
The configuration file requires the following entries: | Requires swh.storage >= 0.0.188. | ||||
- brokers: a list of kafka endpoints (the journal) in which entries will be | |||||
added. | |||||
- storage_dbconn: URL to connect to the storage DB. | |||||
- prefix: the prefix of the topics (topics will be <prefix>.<object_type>). | |||||
- client_id: the kafka client ID. | |||||
""" | """ | ||||
conf = ctx.obj["config"] | ctx.fail("DEPRECATED") | ||||
backfiller = JournalBackfiller(conf) | |||||
if notify: | |||||
notify("READY=1") | |||||
try: | |||||
backfiller.run( | |||||
object_type=object_type, | |||||
start_object=start_object, | |||||
end_object=end_object, | |||||
dry_run=dry_run, | |||||
) | |||||
except KeyboardInterrupt: | |||||
if notify: | |||||
notify("STOPPING=1") | |||||
ctx.exit(0) | |||||
@cli.command("content-replay") | @cli.command("content-replay") | ||||
@click.option( | @click.option( | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
"-n", | "-n", | ||||
default=None, | default=None, | ||||
type=int, | type=int, | ||||
▲ Show 20 Lines • Show All 97 Lines • Show Last 20 Lines |