diff --git a/swh/journal/cli.py b/swh/journal/cli.py
index ce11024..d48a474 100644
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -1,202 +1,205 @@
 # Copyright (C) 2016-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import click
 import functools
 import logging
 import os
 
 from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS
 from swh.storage import get_storage
 from swh.objstorage import get_objstorage
 
 from swh.journal.client import JournalClient
 from swh.journal.replay import process_replay_objects
 from swh.journal.replay import process_replay_objects_content
 from swh.journal.backfill import JournalBackfiller
 
 
 @click.group(name='journal', context_settings=CONTEXT_SETTINGS)
 @click.option('--config-file', '-C', default=None,
               type=click.Path(exists=True, dir_okay=False,),
               help="Configuration file.")
 @click.pass_context
 def cli(ctx, config_file):
     """Software Heritage Journal tools.
 
     The journal is a persistent logger of changes to the archive, with
     publish-subscribe support.
     """
     if not config_file:
         config_file = os.environ.get('SWH_CONFIG_FILENAME')
 
     if config_file:
         if not os.path.exists(config_file):
             raise ValueError('%s does not exist' % config_file)
         conf = config.read(config_file)
     else:
         conf = {}
 
     ctx.ensure_object(dict)
 
     log_level = ctx.obj.get('log_level', logging.INFO)
     logging.root.setLevel(log_level)
     logging.getLogger('kafka').setLevel(logging.INFO)
 
     ctx.obj['config'] = conf
 
 
 def get_journal_client(ctx, **kwargs):
     conf = ctx.obj['config'].get('journal', {})
     conf.update({k: v for (k, v) in kwargs.items() if v not in (None, ())})
     if not conf.get('brokers'):
         ctx.fail('You must specify at least one kafka broker.')
     if not isinstance(conf['brokers'], (list, tuple)):
         conf['brokers'] = [conf['brokers']]
     return JournalClient(**conf)
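
Editor's note: get_journal_client lets explicit CLI flags override the config file, while unset options (None for scalar options, () for multiple=True options) fall through to the file's values. A minimal standalone sketch of that merge rule, outside the diff (merge_conf is a hypothetical name introduced here for illustration):

def merge_conf(file_conf, **cli_kwargs):
    # Same filter as get_journal_client above: only CLI values that were
    # actually given (not None, not the empty tuple) override the file.
    conf = dict(file_conf)
    conf.update({k: v for (k, v) in cli_kwargs.items() if v not in (None, ())})
    return conf


# Nothing given on the CLI: the config file wins.
assert merge_conf({'brokers': ['kafka1:9092'], 'group_id': 'mirror'},
                  brokers=(), group_id=None) \
    == {'brokers': ['kafka1:9092'], 'group_id': 'mirror'}

# --broker given: it overrides the file's brokers.
assert merge_conf({'brokers': ['kafka1:9092']},
                  brokers=('kafka2:9092',), group_id=None) \
    == {'brokers': ('kafka2:9092',)}
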
 
 
 @cli.command()
 @click.option('--max-messages', '-m', default=None, type=int,
               help='Maximum number of objects to replay. Default is to '
                    'run forever.')
 @click.option('--broker', 'brokers', type=str, multiple=True,
               help='Kafka broker to connect to. '
                    '(deprecated, use the config file instead)')
 @click.option('--prefix', type=str, default=None,
               help='Prefix of Kafka topic names to read from. '
                    '(deprecated, use the config file instead)')
 @click.option('--group-id', type=str,
               help='Name of the group id for reading from Kafka. '
                    '(deprecated, use the config file instead)')
 @click.pass_context
 def replay(ctx, brokers, prefix, group_id, max_messages):
     """Fill a Storage by reading a Journal.
 
     There can be several 'replayers' filling a Storage as long as they use
     the same `group-id`.
     """
     logger = logging.getLogger(__name__)
     conf = ctx.obj['config']
     try:
         storage = get_storage(**conf.pop('storage'))
     except KeyError:
         ctx.fail('You must have a storage configured in your config file.')
     client = get_journal_client(
         ctx, brokers=brokers, prefix=prefix, group_id=group_id)
     worker_fn = functools.partial(process_replay_objects, storage=storage)
 
     try:
         nb_messages = 0
         while not max_messages or nb_messages < max_messages:
             nb_messages += client.process(worker_fn)
             logger.info('Processed %d messages.'
                         % nb_messages)
     except KeyboardInterrupt:
         ctx.exit(0)
     else:
         print('Done.')
 
 
 @cli.command()
 @click.argument('object_type')
 @click.option('--start-object', default=None)
 @click.option('--end-object', default=None)
 @click.option('--dry-run', is_flag=True, default=False)
 @click.pass_context
 def backfiller(ctx, object_type, start_object, end_object, dry_run):
     """Run the backfiller.
 
     The backfiller lists objects from a Storage and produces journal entries
     from there.
 
     Typically used to rebuild a journal or compensate for missing objects in
     a journal (e.g. due to a downtime of the latter).
 
     The configuration file requires the following entries:
     - brokers: a list of kafka endpoints (the journal) in which entries
       will be added.
     - storage_dbconn: URL to connect to the storage DB.
     - prefix: the prefix of the topics (topics will be
       <prefix>.<object_type>).
     - client_id: the kafka client ID.
     """
     conf = ctx.obj['config']
     backfiller = JournalBackfiller(conf)
     try:
         backfiller.run(
             object_type=object_type,
             start_object=start_object, end_object=end_object,
             dry_run=dry_run)
     except KeyboardInterrupt:
         ctx.exit(0)
 
 
 @cli.command()
+@click.option('--max-messages', '-m', default=None, type=int,
+              help='Maximum number of objects to replay. Default is to '
+                   'run forever.')
 @click.option('--concurrency', type=int, default=8,
               help='Concurrency level.')
 @click.option('--broker', 'brokers', type=str, multiple=True,
               help='Kafka broker to connect to. '
                    '(deprecated, use the config file instead)')
 @click.option('--prefix', type=str, default=None,
               help='Prefix of Kafka topic names to read from. '
                    '(deprecated, use the config file instead)')
 @click.option('--group-id', type=str,
               help='Name of the group id for reading from Kafka. '
                    '(deprecated, use the config file instead)')
 @click.pass_context
-def content_replay(ctx, concurrency, brokers, prefix, group_id):
+def content_replay(ctx, max_messages, concurrency, brokers, prefix, group_id):
     """Fill a destination Object Storage (typically a mirror) by reading a
     Journal and retrieving objects from an existing source ObjStorage.
 
     There can be several 'replayers' filling a given ObjStorage as long as
     they use the same `group-id`.
 
     This service retrieves object ids to copy from the 'content' topic. It
     will only copy an object's content if the object's description in the
     kafka message has the status:visible flag set.
     """
     logger = logging.getLogger(__name__)
     conf = ctx.obj['config']
     try:
         objstorage_src = get_objstorage(**conf.pop('objstorage_src'))
     except KeyError:
         ctx.fail('You must have a source objstorage configured in '
                  'your config file.')
     try:
         objstorage_dst = get_objstorage(**conf.pop('objstorage_dst'))
     except KeyError:
         ctx.fail('You must have a destination objstorage configured '
                  'in your config file.')
 
     client = get_journal_client(
         ctx, brokers=brokers, prefix=prefix, group_id=group_id,
         object_types=('content',))
     worker_fn = functools.partial(process_replay_objects_content,
                                   src=objstorage_src,
                                   dst=objstorage_dst,
                                   concurrency=concurrency)
 
     try:
         nb_messages = 0
-        while True:
+        while not max_messages or nb_messages < max_messages:
             nb_messages += client.process(worker_fn)
             logger.info('Processed %d messages.'
                         % nb_messages)
     except KeyboardInterrupt:
         ctx.exit(0)
     else:
-        logger.info('Done.')
+        print('Done.')
 
 
 def main():
     logging.basicConfig()
     return cli(auto_envvar_prefix='SWH_JOURNAL')
 
 
 if __name__ == '__main__':
     main()
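
Editor's note: 'replay' and 'content-replay' now share the same stop condition. One subtlety: client.process(worker_fn) is assumed to return the number of messages handled in one batch, so the loop only checks the threshold between batches; --max-messages is a stopping point, not an exact cap. A minimal standalone sketch, outside the diff (replay_loop and _FakeClient are hypothetical names):

def replay_loop(client, worker_fn, max_messages=None):
    # Same shape as the loop in 'replay' and 'content-replay' above.
    nb_messages = 0
    while not max_messages or nb_messages < max_messages:
        nb_messages += client.process(worker_fn)
    return nb_messages


class _FakeClient:
    """Stand-in client whose process() always handles a fixed-size batch."""
    def __init__(self, batch_size):
        self.batch_size = batch_size

    def process(self, worker_fn):
        worker_fn()
        return self.batch_size


# 4 + 4 + 4 = 12: the loop overshoots max_messages=10 by part of a batch.
assert replay_loop(_FakeClient(batch_size=4), lambda: None,
                   max_messages=10) == 12
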
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
index 966a57f..5aba92b 100644
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -1,90 +1,167 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import functools
 import re
 import tempfile
 from subprocess import Popen
 from typing import Tuple
 from unittest.mock import patch
 
 from click.testing import CliRunner
 from kafka import KafkaProducer
 import pytest
 
+from swh.objstorage.backends.in_memory import InMemoryObjStorage
 from swh.storage.in_memory import Storage
 
 from swh.journal.cli import cli
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 
 
 CLI_CONFIG = '''
 storage:
     cls: memory
     args: {}
+objstorage_src:
+    cls: mocked
+    args:
+        name: src
+objstorage_dst:
+    cls: mocked
+    args:
+        name: dst
 '''
 
 
 @pytest.fixture
 def storage():
     """An instance of swh.storage.in_memory.Storage that gets injected
     into the CLI functions."""
     storage = Storage()
     with patch('swh.journal.cli.get_storage') as get_storage_mock:
         get_storage_mock.return_value = storage
         yield storage
 
 
 def invoke(catch_exceptions, args):
     runner = CliRunner()
     with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd:
         config_fd.write(CLI_CONFIG)
         config_fd.seek(0)
         args = ['-C' + config_fd.name] + args
         result = runner.invoke(cli, args)
     if not catch_exceptions and result.exception:
         print(result.output)
         raise result.exception
     return result
 
 
 def test_replay(
         storage: Storage,
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int]):
     (_, port) = kafka_server
     kafka_prefix += '.swh.journal.objects'
 
     producer = KafkaProducer(
         bootstrap_servers='localhost:{}'.format(port),
         key_serializer=key_to_kafka,
         value_serializer=value_to_kafka,
         client_id='test-producer',
     )
 
     snapshot = {'id': b'foo', 'branches': {
         b'HEAD': {
             'target_type': 'revision',
             'target': b'bar',
         }
     }}
     producer.send(
         topic=kafka_prefix+'.snapshot', key=snapshot['id'], value=snapshot)
 
     result = invoke(False, [
         'replay',
         '--broker', 'localhost:%d' % port,
         '--group-id', 'test-cli-consumer',
         '--prefix', kafka_prefix,
         '--max-messages', '1',
     ])
     expected = r'Done.\n'
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
 
     assert storage.snapshot_get(snapshot['id']) == {
         **snapshot, 'next_branch': None}
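
Editor's note: the helpers added below replace swh.journal.cli.get_objstorage with a dispatching fake, so the CLI under test receives the very InMemoryObjStorage instances the test already holds. A standalone sketch of that mock side_effect dispatch technique, outside the diff (fake_factory and the instances dict are illustrative names, not swh code):

from unittest.mock import MagicMock

instances = {'src': object(), 'dst': object()}

def fake_factory(cls, args):
    # Same dispatch as get_mock_objstorage below: 'cls' must be the
    # sentinel 'mocked', and args['name'] selects which instance to return.
    assert cls == 'mocked', cls
    return instances[args['name']]

# Calling a mock with a side_effect invokes it and returns its result,
# so callers transparently get the pre-built instances.
factory_mock = MagicMock(side_effect=fake_factory)
assert factory_mock('mocked', {'name': 'src'}) is instances['src']
assert factory_mock('mocked', {'name': 'dst'}) is instances['dst']
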
 
 
-# TODO: write a test for the content-replay command
+def _patch_objstorages(names):
+    objstorages = {name: InMemoryObjStorage() for name in names}
+
+    def get_mock_objstorage(cls, args):
+        assert cls == 'mocked', cls
+        return objstorages[args['name']]
+
+    def decorator(f):
+        @functools.wraps(f)
+        @patch('swh.journal.cli.get_objstorage')
+        def newf(get_objstorage_mock, *args, **kwargs):
+            get_objstorage_mock.side_effect = get_mock_objstorage
+            f(*args, objstorages=objstorages, **kwargs)
+
+        return newf
+
+    return decorator
+
+
+def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages):
+    producer = KafkaProducer(
+        bootstrap_servers='localhost:{}'.format(kafka_port),
+        key_serializer=key_to_kafka,
+        value_serializer=value_to_kafka,
+        client_id='test-producer',
+    )
+
+    contents = {}
+    for i in range(10):
+        content = b'\x00'*19 + bytes([i])
+        sha1 = objstorages['src'].add(content)
+        contents[sha1] = content
+        producer.send(topic=kafka_prefix+'.content', key=sha1, value={
+            'sha1': sha1,
+            'status': 'visible',
+        })
+
+    producer.flush()
+
+    return contents
+
+
+@_patch_objstorages(['src', 'dst'])
+def test_replay_content(
+        objstorages,
+        storage: Storage,
+        kafka_prefix: str,
+        kafka_server: Tuple[Popen, int]):
+    (_, kafka_port) = kafka_server
+    kafka_prefix += '.swh.journal.objects'
+
+    contents = _fill_objstorage_and_kafka(
+        kafka_port, kafka_prefix, objstorages)
+
+    result = invoke(False, [
+        'content-replay',
+        '--broker', 'localhost:%d' % kafka_port,
+        '--group-id', 'test-cli-consumer',
+        '--prefix', kafka_prefix,
+        '--max-messages', '10',
+    ])
+    expected = r'Done.\n'
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    for (sha1, content) in contents.items():
+        assert sha1 in objstorages['dst'], sha1
+        assert objstorages['dst'].get(sha1) == content
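
Editor's note: the content-replay docstring above promises that only objects whose journal description carries status 'visible' get copied from the source to the destination objstorage, which is why every test message is produced with 'status': 'visible'. A rough sketch of that filtering behaviour (this is an assumption about the semantics, not the actual swh.journal.replay.process_replay_objects_content implementation; copy_visible and _DictStore are made-up names):

def copy_visible(objects, src, dst):
    """objects: iterable of dicts with 'sha1' and 'status' keys."""
    copied = 0
    for obj in objects:
        if obj.get('status') != 'visible':
            continue  # hidden or absent contents are skipped, not copied
        dst.add(src.get(obj['sha1']), obj_id=obj['sha1'])
        copied += 1
    return copied


class _DictStore(dict):
    """Toy stand-in for an objstorage: add/get keyed by obj_id."""
    def add(self, content, obj_id):
        self[obj_id] = content

    def get(self, obj_id):
        return self[obj_id]


src, dst = _DictStore(), _DictStore()
src.add(b'data', obj_id=b'\x01' * 20)
assert copy_visible([{'sha1': b'\x01' * 20, 'status': 'visible'},
                     {'sha1': b'\x02' * 20, 'status': 'hidden'}],
                    src, dst) == 1  # only the visible object is copied
assert dst.get(b'\x01' * 20) == b'data'
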