Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/cli.py
# Copyright (C) 2016-2019 The Software Heritage developers | # Copyright (C) 2016-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import click | import click | ||||
import functools | import functools | ||||
import logging | import logging | ||||
import os | import os | ||||
from swh.core import config | from swh.core import config | ||||
from swh.core.cli import CONTEXT_SETTINGS | from swh.core.cli import CONTEXT_SETTINGS | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.objstorage import get_obsstorage | |||||
from swh.journal.client import JournalClient | from swh.journal.client import JournalClient | ||||
from swh.journal.replay import process_replay_objects | from swh.journal.replay import process_replay_objects | ||||
from swh.journal.replay import process_replay_objects_content | |||||
from swh.journal.backfill import JournalBackfiller | from swh.journal.backfill import JournalBackfiller | ||||
@click.group(name='journal', context_settings=CONTEXT_SETTINGS) | @click.group(name='journal', context_settings=CONTEXT_SETTINGS) | ||||
@click.option('--config-file', '-C', default=None, | @click.option('--config-file', '-C', default=None, | ||||
type=click.Path(exists=True, dir_okay=False,), | type=click.Path(exists=True, dir_okay=False,), | ||||
help="Configuration file.") | help="Configuration file.") | ||||
@click.pass_context | @click.pass_context | ||||
Show All 18 Lines | def cli(ctx, config_file): | ||||
log_level = ctx.obj.get('log_level', logging.INFO) | log_level = ctx.obj.get('log_level', logging.INFO) | ||||
logging.root.setLevel(log_level) | logging.root.setLevel(log_level) | ||||
logging.getLogger('kafka').setLevel(logging.INFO) | logging.getLogger('kafka').setLevel(logging.INFO) | ||||
ctx.obj['config'] = conf | ctx.obj['config'] = conf | ||||
def get_journal_client(ctx, brokers, prefix, group_id):
    """Build a JournalClient from command-line options and the config file.

    Every argument left unset falls back to the key of the same name in the
    'journal' section of ``ctx.obj['config']``.

    Args:
        ctx: click context whose ``obj['config']`` is the loaded config dict.
        brokers: Kafka broker(s) from the command line: a string, a
            list/tuple of strings, or an empty value (``None`` or ``()``)
            when the option was not given.
        prefix: prefix of the Kafka topic names, or None to use the
            configured one (or JournalClient's own default).
        group_id: consumer/group id, or None to use the configured one.

    Returns:
        The JournalClient built from the resolved settings.

    Raises:
        Whatever ``ctx.fail`` raises (a usage error under click) when no
        broker or no group id can be determined.
    """
    # Tolerate a missing or empty 'journal' section.
    journal_conf = ctx.obj['config'].get('journal') or {}

    # click passes an empty tuple (not None) for an unset `multiple=True`
    # option, so test for emptiness to let the config file take over.
    if not brokers:
        brokers = journal_conf.get('brokers')
    if not brokers:
        ctx.fail('You must specify at least one kafka broker.')
    if not isinstance(brokers, (list, tuple)):
        brokers = [brokers]

    if group_id is None:
        group_id = journal_conf.get('group_id')
    if not group_id:
        # group_id is mandatory and has no sensible default.
        ctx.fail('You must specify a consumer/group id for the journal '
                 '(key "group_id" of the "journal" config section).')

    if prefix is None:
        prefix = journal_conf.get('prefix')

    client_args = {'brokers': brokers, 'group_id': group_id}
    if prefix is not None:
        # Only forward an explicit prefix: passing prefix=None would
        # override the default prefix of JournalClient.
        client_args['prefix'] = prefix
    return JournalClient(**client_args)
@cli.command() | @cli.command() | ||||
@click.option('--max-messages', '-m', default=None, type=int, | @click.option('--max-messages', '-m', default=None, type=int, | ||||
help='Maximum number of objects to replay. Default is to ' | help='Maximum number of objects to replay. Default is to ' | ||||
'run forever.') | 'run forever.') | ||||
@click.option('--broker', 'brokers', type=str, multiple=True, | @click.option('--broker', 'brokers', type=str, multiple=True, | ||||
hidden=True, # prefer config file | hidden=True, # prefer config file | ||||
help='Kafka broker to connect to.') | help='Kafka broker to connect to.') | ||||
@click.option('--prefix', type=str, default=None, | @click.option('--prefix', type=str, default=None, | ||||
Show All 11 Lines | def replay(ctx, brokers, prefix, group_id, max_messages): | ||||
""" | """ | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
conf = ctx.obj['config'] | conf = ctx.obj['config'] | ||||
try: | try: | ||||
storage = get_storage(**conf.pop('storage')) | storage = get_storage(**conf.pop('storage')) | ||||
except KeyError: | except KeyError: | ||||
ctx.fail('You must have a storage configured in your config file.') | ctx.fail('You must have a storage configured in your config file.') | ||||
if brokers is None: | client = get_journal_client(ctx, brokers, prefix, group_id) | ||||
brokers = conf.get('journal', {}).get('brokers') | |||||
if not brokers: | |||||
ctx.fail('You must specify at least one kafka broker.') | |||||
if not isinstance(brokers, (list, tuple)): | |||||
brokers = [brokers] | |||||
if prefix is None: | |||||
prefix = conf.get('journal', {}).get('prefix') | |||||
if group_id is None: | |||||
group_id = conf.get('journal', {}).get('group_id') | |||||
client = JournalClient(brokers=brokers, group_id=group_id, prefix=prefix) | |||||
worker_fn = functools.partial(process_replay_objects, storage=storage) | worker_fn = functools.partial(process_replay_objects, storage=storage) | ||||
try: | try: | ||||
nb_messages = 0 | nb_messages = 0 | ||||
while not max_messages or nb_messages < max_messages: | while not max_messages or nb_messages < max_messages: | ||||
nb_messages += client.process(worker_fn) | nb_messages += client.process(worker_fn) | ||||
logger.info('Processed %d messages.' % nb_messages) | logger.info('Processed %d messages.' % nb_messages) | ||||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||||
Show All 31 Lines | try: | ||||
backfiller.run( | backfiller.run( | ||||
object_type=object_type, | object_type=object_type, | ||||
start_object=start_object, end_object=end_object, | start_object=start_object, end_object=end_object, | ||||
dry_run=dry_run) | dry_run=dry_run) | ||||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||||
ctx.exit(0) | ctx.exit(0) | ||||
@cli.command()
@click.option('--broker', 'brokers', type=str, multiple=True,
              hidden=True,  # prefer config file
              help='Kafka broker to connect to.')
@click.option('--prefix', type=str, default=None,
              hidden=True,  # prefer config file
              help='Prefix of Kafka topic names to read from.')
@click.option('--group-id', '--consumer-id', type=str,
              hidden=True,  # prefer config file
              help='Name of the consumer/group id for reading from Kafka.')
@click.pass_context
def content_replay(ctx, brokers, prefix, group_id):
    """Fill a destination Object Storage (typically a mirror) by reading a
    Journal and retrieving objects from an existing source ObjStorage.

    There can be several 'replayers' filling a given ObjStorage as long as
    they use the same `group-id`.

    This service retrieves object ids to copy from the 'content' topic. It
    will only copy object's content if the object's description in the kafka
    message has the status:visible set.
    """
    # NOTE: the docstring above is the user-facing `--help` text, so
    # maintainer notes live in comments.
    #
    # ctx: click context; ctx.obj['config'] must contain the
    #      'objstorage_src' and 'objstorage_dst' sections.
    # brokers, prefix, group_id: forwarded to get_journal_client().
    # Runs until interrupted; Ctrl-C exits with status 0.
    logger = logging.getLogger(__name__)
    conf = ctx.obj['config']

    # Only the config lookups are guarded, so that a KeyError raised while
    # instantiating an objstorage is not misreported as a missing section.
    # The messages name the exact key to look for in the config file.
    try:
        src_conf = conf.pop('objstorage_src')
    except KeyError:
        ctx.fail('You must have a source objstorage configured in '
                 'your config file (missing key: "objstorage_src").')
    try:
        dst_conf = conf.pop('objstorage_dst')
    except KeyError:
        ctx.fail('You must have a destination objstorage configured '
                 'in your config file (missing key: "objstorage_dst").')

    # NOTE(review): `get_obsstorage` is the name imported at the top of this
    # file; it looks like a typo for `get_objstorage` -- confirm against
    # swh.objstorage and fix the import and both calls together.
    objstorage_src = get_obsstorage(**src_conf)
    objstorage_dst = get_obsstorage(**dst_conf)

    client = get_journal_client(ctx, brokers, prefix, group_id)
    worker_fn = functools.partial(process_replay_objects_content,
                                  src=objstorage_src,
                                  dst=objstorage_dst)

    # The loop only ends through an exception, so there is no "done" path
    # to report after it.
    try:
        nb_messages = 0
        while True:
            nb_messages += client.process(worker_fn)
            logger.info('Processed %d messages.', nb_messages)
    except KeyboardInterrupt:
        ctx.exit(0)
def main():
    """Console entry point for the `journal` command group.

    Installs the default logging configuration, then hands control to the
    click group `cli`, with 'SWH_JOURNAL' as its automatic environment
    variable prefix. Returns whatever `cli` returns.
    """
    logging.basicConfig()
    outcome = cli(auto_envvar_prefix='SWH_JOURNAL')
    return outcome


if __name__ == '__main__':
    main()
I think these error messages should tell the exact name of the missing key, so it's easier to look for them in an example config file.
(And we should also have an example config file)