diff --git a/swh/indexer/cli.py b/swh/indexer/cli.py
--- a/swh/indexer/cli.py
+++ b/swh/indexer/cli.py
@@ -3,20 +3,17 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import functools
-import time
+import json
 
 import click
 
 from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup
-from swh.journal.cli import get_journal_client
 from swh.scheduler import get_scheduler
 from swh.scheduler.cli_utils import schedule_origin_batches
 from swh.storage import get_storage
 
 from swh.indexer import metadata_dictionary
-from swh.indexer.journal_client import process_journal_objects
 from swh.indexer.storage import get_indexer_storage
 from swh.indexer.storage.api.server import load_and_check_config, app
 
@@ -89,6 +86,22 @@
     click.echo('\t' + ', '.join(sorted(supported_mappings)))
 
 
+@mapping.command('translate')
+@click.argument('mapping-name')
+@click.argument('file', type=click.File('rb'))
+def mapping_translate(mapping_name, file):
+    """Translates the given file to CodeMeta using the given mapping."""
+    mapping_cls = [cls for cls in metadata_dictionary.MAPPINGS.values()
+                   if cls.name == mapping_name]
+    if not mapping_cls:
+        raise click.ClickException('Unknown mapping {}'.format(mapping_name))
+    assert len(mapping_cls) == 1
+    mapping_cls = mapping_cls[0]
+    mapping = mapping_cls()
+    codemeta_doc = mapping.translate(file.read())
+    click.echo(json.dumps(codemeta_doc, indent=4))
+
+
 @cli.group('schedule')
 @click.option('--scheduler-url', '-s', default=None,
               help="URL of the scheduler API")
@@ -162,64 +175,11 @@
 
     origins = list_origins_by_producer(idx_storage, mappings, tool_ids)
 
-    kwargs = {"policy_update": "update-dups"}
+    kwargs = {"policy_update": "update-dups", "parse_ids": False}
     schedule_origin_batches(
         scheduler, task_type, origins, origin_batch_size, kwargs)
 
 
-@cli.command('journal-client')
-@click.option('--scheduler-url', '-s', default=None,
-              help="URL of the scheduler API")
-@click.option('--origin-metadata-task-type',
-              default='index-origin-metadata',
-              help='Name of the task running the origin metadata indexer.')
-@click.option('--broker', 'brokers', type=str, multiple=True,
-              help='Kafka broker to connect to.')
-@click.option('--prefix', type=str, default=None,
-              help='Prefix of Kafka topic names to read from.')
-@click.option('--group-id', type=str,
-              help='Consumer/group id for reading from Kafka.')
-@click.option('--max-messages', '-m', default=None, type=int,
-              help='Maximum number of objects to replay. Default is to '
-                   'run forever.')
-@click.pass_context
-def journal_client(ctx, scheduler_url, origin_metadata_task_type,
-                   brokers, prefix, group_id, max_messages):
-    """Listens for new objects from the SWH Journal, and schedules tasks
-    to run relevant indexers (currently, only origin-intrinsic-metadata)
-    on these new objects."""
-    scheduler = _get_api(
-        get_scheduler,
-        ctx.obj['config'],
-        'scheduler',
-        scheduler_url
-    )
-
-    client = get_journal_client(
-        ctx, brokers=brokers, prefix=prefix, group_id=group_id,
-        object_types=['origin_visit'])
-
-    worker_fn = functools.partial(
-        process_journal_objects,
-        scheduler=scheduler,
-        task_names={
-            'origin_metadata': origin_metadata_task_type,
-        }
-    )
-    nb_messages = 0
-    last_log_time = 0
-    try:
-        while not max_messages or nb_messages < max_messages:
-            nb_messages += client.process(worker_fn)
-            if time.monotonic() - last_log_time >= 60:
-                print('Processed %d messages.' % nb_messages)
-                last_log_time = time.monotonic()
-    except KeyboardInterrupt:
-        ctx.exit(0)
-    else:
-        print('Done.')
-
-
 @cli.command('rpc-serve')
 @click.argument('config-path', required=1)
 @click.option('--host', default='0.0.0.0', help="Host to run the server")
@@ -234,6 +194,10 @@
     app.run(host, port=int(port), debug=bool(debug))
 
 
+cli.add_alias(rpc_server, 'api-server')
+cli.add_alias(rpc_server, 'serve')
+
+
 def main():
     return cli(auto_envvar_prefix='SWH_INDEXER')
 