diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -11,9 +11,11 @@
 from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS
 from swh.storage import get_storage
+from swh.objstorage import get_objstorage

 from swh.journal.client import JournalClient
 from swh.journal.replay import process_replay_objects
+from swh.journal.replay import process_replay_objects_content
 from swh.journal.backfill import JournalBackfiller


@@ -136,6 +138,68 @@
         ctx.exit(0)


+@cli.command()
+@click.option('--broker', 'brokers', type=str, multiple=True,
+              hidden=True,  # prefer config file
+              help='Kafka broker to connect to.')
+@click.option('--prefix', type=str, default=None,
+              hidden=True,  # prefer config file
+              help='Prefix of Kafka topic names to read from.')
+@click.option('--group-id', '--consumer-id', type=str,
+              hidden=True,  # prefer config file
+              help='Name of the consumer/group id for reading from Kafka.')
+@click.pass_context
+def content_replay(ctx, brokers, prefix, group_id):
+    """Fill a destination Object Storage (typically a mirror) by reading a
+    Journal and retrieving objects from an existing source ObjStorage.
+
+    There can be several 'replayers' filling a given ObjStorage as long as
+    they use the same `group-id`.
+
+    This service retrieves the object ids to copy from the 'content' topic.
+    It only copies an object's content if the object's description in the
+    Kafka message has its status set to 'visible'.
+    """
+    logger = logging.getLogger(__name__)
+    conf = ctx.obj['config']
+    try:
+        objstorage_src = get_objstorage(**conf.pop('objstorage_src'))
+    except KeyError:
+        ctx.fail('You must have a source objstorage configured in '
+                 'your config file.')
+    try:
+        objstorage_dst = get_objstorage(**conf.pop('objstorage_dst'))
+    except KeyError:
+        ctx.fail('You must have a destination objstorage configured '
+                 'in your config file.')
+
+    if brokers is None:
+        brokers = conf.get('journal', {}).get('brokers')
+    if not brokers:
+        ctx.fail('You must specify at least one kafka broker.')
+
+    if prefix is None:
+        prefix = conf.get('journal', {}).get('prefix')
+
+    if group_id is None:
+        group_id = conf.get('journal', {}).get('group_id')
+
+    client = JournalClient(brokers=brokers, group_id=group_id, prefix=prefix)
+    worker_fn = functools.partial(process_replay_objects_content,
+                                  src=objstorage_src,
+                                  dst=objstorage_dst)
+
+    try:
+        nb_messages = 0
+        while True:
+            nb_messages += client.process(worker_fn)
+            logger.info('Processed %d messages.', nb_messages)
+    except KeyboardInterrupt:
+        ctx.exit(0)
+    else:
+        print('Done.')
+
+
 def main():
     logging.basicConfig()
     return cli(auto_envvar_prefix='SWH_JOURNAL')
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -6,6 +6,7 @@
 import logging

 from swh.storage import HashCollision
+from swh.objstorage.objstorage import ID_HASH_ALGO

 logger = logging.getLogger(__name__)

@@ -39,3 +40,16 @@
                  for obj in objects])
         else:
             assert False
+
+
+def process_replay_objects_content(all_objects, *, src, dst):
+    for (object_type, objects) in all_objects.items():
+        if object_type != 'content':
+            logger.warning('Received a series of %s, this should not happen',
+                           object_type)
+            continue
+        for obj in objects:
+            if obj['status'] == 'visible':
+                obj_id = obj[ID_HASH_ALGO]
+                obj = src.get(obj_id)
+                dst.add(obj, obj_id=obj_id)
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -85,3 +85,6 @@
     assert storage.snapshot_get(snapshot['id']) == {
         **snapshot, 'next_branch': None}
+
+
+# TODO: write a test for the content-replay command
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -16,6 +16,7 @@
 from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
 from swh.journal.direct_writer import DirectKafkaWriter
 from swh.journal.replay import process_replay_objects
+from swh.journal.replay import process_replay_objects_content
 from swh.journal.serializers import (
     key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)

@@ -101,3 +102,32 @@


 # TODO: add test for hash collision
+
+
+@given(lists(object_dicts(), min_size=1))
+@settings(suppress_health_check=[HealthCheck.too_slow])
+def test_write_replay_content(objects):
+    queue = []
+    replayer = MockedJournalClient(queue)
+
+    storage1 = Storage()
+    storage1.journal_writer = MockedKafkaWriter(queue)
+
+    for (obj_type, obj) in objects:
+        obj = obj.copy()
+        if obj_type == 'content':
+            storage1.content_add([obj])
+
+    queue_size = sum(len(partition)
+                     for batch in queue
+                     for partition in batch.values())
+
+    storage2 = Storage()
+    worker_fn = functools.partial(process_replay_objects_content,
+                                  src=storage1.objstorage,
+                                  dst=storage2.objstorage)
+    nb_messages = 0
+    while nb_messages < queue_size:
+        nb_messages += replayer.process(worker_fn)
+
+    assert storage1.objstorage.state == storage2.objstorage.state
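
A sketch of the configuration dict the new content-replay command reads from
ctx.obj['config'], assuming the usual cls/args layout accepted by
swh.objstorage.get_objstorage; only the top-level key names come from the
patch above, and the objstorage class, paths, broker host and group id are
illustrative placeholders, not values defined by this change.

    # Hypothetical config for the content-replay command (placeholder values).
    config = {
        'objstorage_src': {  # objects are read from this objstorage
            'cls': 'pathslicing',
            'args': {'root': '/srv/softwareheritage/objects',
                     'slicing': '0:2/2:4'},
        },
        'objstorage_dst': {  # objects are copied into this objstorage
            'cls': 'pathslicing',
            'args': {'root': '/srv/mirror/objects',
                     'slicing': '0:2/2:4'},
        },
        'journal': {  # fallback values for --broker, --group-id and --prefix
            'brokers': ['kafka1.example.org:9092'],
            'group_id': 'swh-content-replayer',
            'prefix': 'swh.journal.objects',
        },
    }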