Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/cli.py
Show First 20 Lines • Show All 87 Lines • ▼ Show 20 Lines | def replay(ctx, brokers, prefix, group_id, max_messages): | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
conf = ctx.obj['config'] | conf = ctx.obj['config'] | ||||
try: | try: | ||||
storage = get_storage(**conf.pop('storage')) | storage = get_storage(**conf.pop('storage')) | ||||
except KeyError: | except KeyError: | ||||
ctx.fail('You must have a storage configured in your config file.') | ctx.fail('You must have a storage configured in your config file.') | ||||
client = get_journal_client( | client = get_journal_client( | ||||
ctx, brokers=brokers, prefix=prefix, group_id=group_id) | ctx, brokers=brokers, prefix=prefix, group_id=group_id, | ||||
max_messages=max_messages) | |||||
worker_fn = functools.partial(process_replay_objects, storage=storage) | worker_fn = functools.partial(process_replay_objects, storage=storage) | ||||
try: | try: | ||||
nb_messages = 0 | nb_messages = 0 | ||||
last_log_time = 0 | last_log_time = 0 | ||||
while not max_messages or nb_messages < max_messages: | while not max_messages or nb_messages < max_messages: | ||||
nb_messages += client.process(worker_fn) | nb_messages += client.process(worker_fn) | ||||
if time.time() - last_log_time >= 60: | if time.time() - last_log_time >= 60: | ||||
▲ Show 20 Lines • Show All 97 Lines • ▼ Show 20 Lines | if exclude_sha1_file: | ||||
def exclude_fn(obj): | def exclude_fn(obj): | ||||
return is_hash_in_bytearray(obj['sha1'], map_, nb_excluded_hashes) | return is_hash_in_bytearray(obj['sha1'], map_, nb_excluded_hashes) | ||||
else: | else: | ||||
exclude_fn = None | exclude_fn = None | ||||
client = get_journal_client( | client = get_journal_client( | ||||
ctx, brokers=brokers, prefix=prefix, group_id=group_id, | ctx, brokers=brokers, prefix=prefix, group_id=group_id, | ||||
object_types=('content',)) | max_messages=max_messages, object_types=('content',)) | ||||
worker_fn = functools.partial(process_replay_objects_content, | worker_fn = functools.partial(process_replay_objects_content, | ||||
src=objstorage_src, | src=objstorage_src, | ||||
dst=objstorage_dst, | dst=objstorage_dst, | ||||
exclude_fn=exclude_fn) | exclude_fn=exclude_fn) | ||||
try: | try: | ||||
nb_messages = 0 | nb_messages = 0 | ||||
last_log_time = 0 | last_log_time = 0 | ||||
Show All 19 Lines |