diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,2 +1,2 @@
 swh.core[db,http] >= 0.0.60
-swh.storage >= 0.0.138
+swh.storage >= 0.0.141
diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -13,7 +13,7 @@
 
 from swh.storage import get_storage
 from swh.objstorage import get_objstorage
-from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
+from swh.journal.client import JournalClient
 from swh.journal.replay import process_replay_objects
 from swh.journal.replay import process_replay_objects_content
 from swh.journal.backfill import JournalBackfiller
@@ -50,10 +50,9 @@
     ctx.obj['config'] = conf
 
 
-def get_journal_client(ctx, brokers, prefix, group_id,
-                       object_types=ACCEPTED_OBJECT_TYPES):
+def get_journal_client(ctx, brokers, prefix, group_id, object_types=None):
     conf = ctx.obj['config']
-    if brokers is None:
+    if not brokers:
         brokers = conf.get('journal', {}).get('brokers')
     if not brokers:
         ctx.fail('You must specify at least one kafka broker.')
@@ -66,9 +65,10 @@
     if group_id is None:
         group_id = conf.get('journal', {}).get('group_id')
 
-    return JournalClient(
-        brokers=brokers, group_id=group_id, prefix=prefix,
-        object_types=object_types)
+    kwargs = dict(brokers=brokers, group_id=group_id, prefix=prefix)
+    if object_types:
+        kwargs['object_types'] = object_types
+    return JournalClient(**kwargs)
 
 
 @cli.command()
@@ -181,7 +181,8 @@
         ctx.fail('You must have a destination objstorage configured '
                  'in your config file.')
 
-    client = get_journal_client(ctx, brokers, prefix, group_id)
+    client = get_journal_client(ctx, brokers, prefix, group_id,
+                                object_types=('content',))
     worker_fn = functools.partial(process_replay_objects_content,
                                   src=objstorage_src,
                                   dst=objstorage_dst)
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -32,24 +32,28 @@
     The current implementation of the journal uses Apache Kafka
     brokers to publish messages under a given topic prefix, with each
-    object type using a specific topic under that prefix.
+    object type using a specific topic under that prefix. If the 'prefix'
+    argument is None (the default), it falls back to the default value
+    'swh.journal.objects'.
 
-    Clients subscribe to events specific to each object type by using
-    the `object_types` configuration variable.
+    Clients subscribe to events specific to each object type as listed in the
+    `object_types` argument (if unset, defaults to all accepted object types).
 
-    Clients can be sharded by setting the `client_id` to a common
+    Clients can be sharded by setting the `group_id` to a common
     value across instances. The journal will share the message
-    throughput across the nodes sharing the same client_id.
+    throughput across the nodes sharing the same group_id.
 
     Messages are processed by the `process_objects` method in batches
     of maximum `max_messages`.
 
     """
     def __init__(
-            self, brokers, group_id, prefix=DEFAULT_PREFIX,
-            object_types=ACCEPTED_OBJECT_TYPES,
+            self, brokers, group_id, prefix=None, object_types=None,
             max_messages=0, auto_offset_reset='earliest'):
-
+        if prefix is None:
+            prefix = DEFAULT_PREFIX
+        if object_types is None:
+            object_types = ACCEPTED_OBJECT_TYPES
         if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
             raise ValueError(
                 'Option \'auto_offset_reset\' only accept %s.'
                 % ACCEPTED_OFFSET_RESET)
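Review note (not part of the patch): `get_journal_client` now only forwards `object_types` when the caller actually sets it, so the defaulting logic lives in exactly one place, `JournalClient.__init__`. A minimal sketch of the resulting call-site behaviour; the broker address and group ids below are made-up placeholders, not values from the patch:

```python
from swh.journal.client import JournalClient

# object_types left unset: the client falls back to ACCEPTED_OBJECT_TYPES
# and to the default 'swh.journal.objects' topic prefix.
client = JournalClient(brokers=['broker1:9092'], group_id='my-group')

# object_types given explicitly, as the content-replay command now does:
content_client = JournalClient(brokers=['broker1:9092'],
                               group_id='content-replayers',
                               object_types=('content',))
```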
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -6,6 +6,7 @@
 import logging
 
 from swh.storage import HashCollision
+from swh.model.hashutil import hash_to_hex
 from swh.objstorage.objstorage import ID_HASH_ALGO
 
 
@@ -49,8 +50,16 @@
             logger.warning('Received a series of %s, this should not happen',
                            object_type)
             continue
+        logger.info('processing %s content objects', len(objects))
         for obj in objects:
+            obj_id = obj[ID_HASH_ALGO]
             if obj['status'] == 'visible':
-                obj_id = obj[ID_HASH_ALGO]
-                obj = src.get(obj_id)
-                dst.add(obj, obj_id=obj_id)
+                try:
+                    obj = src.get(obj_id)
+                    dst.add(obj, obj_id=obj_id)
+                    logger.debug('copied %s', hash_to_hex(obj_id))
+                except Exception:
+                    logger.exception('Failed to copy %s', hash_to_hex(obj_id))
+            else:
+                logger.debug('skipped %s (%s)',
+                             hash_to_hex(obj_id), obj['status'])
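Review note (not part of the patch): the replay loop now wraps each copy in its own try/except, so one failing object is logged and skipped instead of aborting the whole batch. A self-contained sketch of that behaviour, using dict stand-ins for the two objstorages; all ids, payloads, and the logger name are invented for illustration:

```python
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('replay-sketch')

# Dict stand-ins for the source and destination objstorages.
src = {b'\x01': b'payload-1', b'\x03': b'payload-3'}  # b'\x02' is missing
dst = {}

objects = [
    {'sha1': b'\x01', 'status': 'visible'},
    {'sha1': b'\x02', 'status': 'visible'},  # lookup raises, gets logged
    {'sha1': b'\x03', 'status': 'hidden'},   # skipped, only logged
]

for obj in objects:
    obj_id = obj['sha1']
    if obj['status'] == 'visible':
        try:
            payload = src[obj_id]      # src.get(obj_id) in the real code
            dst[obj_id] = payload      # dst.add(payload, obj_id=obj_id)
            logger.debug('copied %s', obj_id.hex())
        except Exception:
            logger.exception('Failed to copy %s', obj_id.hex())
    else:
        logger.debug('skipped %s (%s)', obj_id.hex(), obj['status'])

assert dst == {b'\x01': b'payload-1'}  # the failure did not stop the loop
```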