diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -14,6 +14,8 @@
 from swh.journal.client import JournalClient
 from swh.journal.replay import process_replay_objects
+from swh.journal.replay import process_replay_objects_content
 from swh.journal.backfill import JournalBackfiller
+from swh.objstorage import get_objstorage
@@ -48,6 +49,24 @@
     ctx.obj['config'] = conf


+def get_journal_client(ctx, brokers, prefix, group_id):
+    conf = ctx.obj['config']
+    if brokers is None:
+        brokers = conf.get('journal', {}).get('brokers')
+    if not brokers:
+        ctx.fail('You must specify at least one kafka broker.')
+    if not isinstance(brokers, (list, tuple)):
+        brokers = [brokers]
+
+    if prefix is None:
+        prefix = conf.get('journal', {}).get('prefix')
+
+    if group_id is None:
+        group_id = conf.get('journal', {}).get('group_id')
+
+    return JournalClient(brokers=brokers, group_id=group_id, prefix=prefix)
+
+
 @cli.command()
 @click.option('--max-messages', '-m', default=None, type=int,
               help='Maximum number of objects to replay. Default is to '
@@ -75,20 +94,7 @@
     except KeyError:
         ctx.fail('You must have a storage configured in your config file.')

-    if brokers is None:
-        brokers = conf.get('journal', {}).get('brokers')
-    if not brokers:
-        ctx.fail('You must specify at least one kafka broker.')
-    if not isinstance(brokers, (list, tuple)):
-        brokers = [brokers]
-
-    if prefix is None:
-        prefix = conf.get('journal', {}).get('prefix')
-
-    if group_id is None:
-        group_id = conf.get('journal', {}).get('group_id')
-
-    client = JournalClient(brokers=brokers, group_id=group_id, prefix=prefix)
+    client = get_journal_client(ctx, brokers, prefix, group_id)

     worker_fn = functools.partial(process_replay_objects, storage=storage)
     try:
@@ -136,6 +142,57 @@
     ctx.exit(0)


+@cli.command()
+@click.option('--broker', 'brokers', type=str, multiple=True,
+              hidden=True,  # prefer config file
+              help='Kafka broker to connect to.')
+@click.option('--prefix', type=str, default=None,
+              hidden=True,  # prefer config file
+              help='Prefix of Kafka topic names to read from.')
+@click.option('--group-id', '--consumer-id', type=str,
+              hidden=True,  # prefer config file
+              help='Name of the consumer/group id for reading from Kafka.')
+@click.pass_context
+def content_replay(ctx, brokers, prefix, group_id):
+    """Fill a destination Object Storage (typically a mirror) by reading a
+    Journal and retrieving objects from an existing source ObjStorage.
+
+    There can be several 'replayers' filling a given ObjStorage as long as
+    they use the same `group-id`.
+
+    This service retrieves object ids to copy from the 'content' topic. It
+    will only copy an object's content if the object's description in the
+    kafka message has its status set to 'visible'.
+    """
+    logger = logging.getLogger(__name__)
+    conf = ctx.obj['config']
+    try:
+        objstorage_src = get_objstorage(**conf.pop('objstorage_src'))
+    except KeyError:
+        ctx.fail('You must have a source objstorage configured in '
+                 'your config file.')
+    try:
+        objstorage_dst = get_objstorage(**conf.pop('objstorage_dst'))
+    except KeyError:
+        ctx.fail('You must have a destination objstorage configured '
+                 'in your config file.')
+
+    client = get_journal_client(ctx, brokers, prefix, group_id)
+    worker_fn = functools.partial(process_replay_objects_content,
+                                  src=objstorage_src,
+                                  dst=objstorage_dst)
+
+    try:
+        nb_messages = 0
+        while True:
+            nb_messages += client.process(worker_fn)
+            logger.info('Processed %d messages.' % nb_messages)
+    except KeyboardInterrupt:
+        ctx.exit(0)
+    else:
+        print('Done.')
+
+
 def main():
     logging.basicConfig()
     return cli(auto_envvar_prefix='SWH_JOURNAL')
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -6,6 +6,7 @@
 import logging

 from swh.storage import HashCollision
+from swh.objstorage.objstorage import ID_HASH_ALGO

 logger = logging.getLogger(__name__)
@@ -38,4 +39,18 @@
             } for obj in objects])
     else:
-        assert False
+        logger.warning('Received a series of %s, this should not happen',
+                       object_type)
+
+
+def process_replay_objects_content(all_objects, *, src, dst):
+    for (object_type, objects) in all_objects.items():
+        if object_type != 'content':
+            logger.warning('Received a series of %s, this should not happen',
+                           object_type)
+            continue
+        for obj in objects:
+            if obj['status'] == 'visible':
+                obj_id = obj[ID_HASH_ALGO]
+                obj = src.get(obj_id)
+                dst.add(obj, obj_id=obj_id)
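For reference, a minimal sketch of how process_replay_objects_content consumes one
deserialized batch, the same {object_type: [dict, ...]} shape that
JournalClient.process hands to its worker function. The in-memory objstorage class
('memory') and the literal 'sha1' key (the value of ID_HASH_ALGO) are assumptions
about swh.objstorage, not part of this patch:

    from swh.objstorage import get_objstorage
    from swh.journal.replay import process_replay_objects_content

    # Two in-memory object storages standing in for source and mirror;
    # assumes the 'memory' class is registered in the swh.objstorage factory.
    src = get_objstorage(cls='memory', args={})
    dst = get_objstorage(cls='memory', args={})

    content = b'some content'
    obj_id = src.add(content)  # the objstorage computes the sha1 itself

    # One batch, as the journal client would pass it to the worker function:
    process_replay_objects_content(
        {'content': [{'sha1': obj_id, 'status': 'visible'}]},
        src=src, dst=dst)

    assert dst.get(obj_id) == content

Contents whose status is not 'visible' (e.g. hidden or absent) are skipped, and
batches for any other object type are only logged.
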
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -85,3 +85,6 @@
     assert storage.snapshot_get(snapshot['id']) == {
         **snapshot, 'next_branch': None}
+
+
+# TODO: write a test for the content-replay command
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -16,6 +16,7 @@
 from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
 from swh.journal.direct_writer import DirectKafkaWriter
 from swh.journal.replay import process_replay_objects
+from swh.journal.replay import process_replay_objects_content
 from swh.journal.serializers import (
     key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)
@@ -95,3 +96,31 @@


 # TODO: add test for hash collision
+
+
+@given(lists(object_dicts(), min_size=1))
+@settings(suppress_health_check=[HealthCheck.too_slow])
+def test_write_replay_content(objects):
+    replayer = MockedJournalClient()
+
+    storage1 = Storage()
+    storage1.journal_writer = replayer.consumer
+
+    for (obj_type, obj) in objects:
+        obj = obj.copy()
+        if obj_type == 'content':
+            storage1.content_add([obj])
+
+    queue_size = sum(len(partition)
+                     for batch in replayer.consumer.queue
+                     for partition in batch.values())
+
+    storage2 = Storage()
+    worker_fn = functools.partial(process_replay_objects_content,
+                                  src=storage1.objstorage,
+                                  dst=storage2.objstorage)
+    nb_messages = 0
+    while nb_messages < queue_size:
+        nb_messages += replayer.process(worker_fn)
+
+    assert storage1.objstorage.state == storage2.objstorage.state
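Outside of Click, the new content-replay command boils down to the following
wiring (a sketch only; the broker address, group id and topic prefix are
placeholder values, and the pathslicing arguments mirror a typical Software
Heritage layout rather than anything mandated by this patch):

    import functools

    from swh.journal.client import JournalClient
    from swh.journal.replay import process_replay_objects_content
    from swh.objstorage import get_objstorage

    # Source: a local pathslicing objstorage; destination: an in-memory one
    # (assumed to be registered in the swh.objstorage factory).
    src = get_objstorage(cls='pathslicing',
                         args={'root': '/srv/softwareheritage/objects',
                               'slicing': '0:2/2:4/4:6'})
    dst = get_objstorage(cls='memory', args={})

    client = JournalClient(brokers=['localhost:9092'],
                           group_id='content-replayer',
                           prefix='swh.journal.objects')
    worker_fn = functools.partial(process_replay_objects_content,
                                  src=src, dst=dst)
    while True:
        client.process(worker_fn)

In the deployed CLI the same values come from the objstorage_src, objstorage_dst
and journal sections of the configuration file, which is why the Kafka options
are hidden.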