Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/cli.py
# Copyright (C) 2016-2019 The Software Heritage developers | # Copyright (C) 2016-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import functools | import functools | ||||
import logging | import logging | ||||
import mmap | import mmap | ||||
import os | import os | ||||
import time | |||||
import click | import click | ||||
from swh.core import config | from swh.core import config | ||||
from swh.core.cli import CONTEXT_SETTINGS | from swh.core.cli import CONTEXT_SETTINGS | ||||
from swh.model.model import SHA1_SIZE | from swh.model.model import SHA1_SIZE | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.objstorage import get_objstorage | from swh.objstorage import get_objstorage | ||||
▲ Show 20 Lines • Show All 74 Lines • ▼ Show 20 Lines | except KeyError: | ||||
ctx.fail('You must have a storage configured in your config file.') | ctx.fail('You must have a storage configured in your config file.') | ||||
client = get_journal_client( | client = get_journal_client( | ||||
ctx, brokers=brokers, prefix=prefix, group_id=group_id) | ctx, brokers=brokers, prefix=prefix, group_id=group_id) | ||||
worker_fn = functools.partial(process_replay_objects, storage=storage) | worker_fn = functools.partial(process_replay_objects, storage=storage) | ||||
try: | try: | ||||
nb_messages = 0 | nb_messages = 0 | ||||
last_log_time = 0 | |||||
while not max_messages or nb_messages < max_messages: | while not max_messages or nb_messages < max_messages: | ||||
nb_messages += client.process(worker_fn) | nb_messages += client.process(worker_fn) | ||||
if time.time() - last_log_time >= 60: | |||||
# Log at most once per minute. | |||||
logger.info('Processed %d messages.' % nb_messages) | logger.info('Processed %d messages.' % nb_messages) | ||||
last_log_time = time.time() | |||||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||||
ctx.exit(0) | ctx.exit(0) | ||||
else: | else: | ||||
print('Done.') | print('Done.') | ||||
@cli.command() | @cli.command() | ||||
@click.argument('object_type') | @click.argument('object_type') | ||||
▲ Show 20 Lines • Show All 98 Lines • ▼ Show 20 Lines | def content_replay(ctx, max_messages, concurrency, | ||||
worker_fn = functools.partial(process_replay_objects_content, | worker_fn = functools.partial(process_replay_objects_content, | ||||
src=objstorage_src, | src=objstorage_src, | ||||
dst=objstorage_dst, | dst=objstorage_dst, | ||||
concurrency=concurrency, | concurrency=concurrency, | ||||
exclude_fn=exclude_fn) | exclude_fn=exclude_fn) | ||||
try: | try: | ||||
nb_messages = 0 | nb_messages = 0 | ||||
last_log_time = 0 | |||||
while not max_messages or nb_messages < max_messages: | while not max_messages or nb_messages < max_messages: | ||||
nb_messages += client.process(worker_fn) | nb_messages += client.process(worker_fn) | ||||
if time.time() - last_log_time >= 60: | |||||
# Log at most once per minute. | |||||
logger.info('Processed %d messages.' % nb_messages) | logger.info('Processed %d messages.' % nb_messages) | ||||
last_log_time = time.time() | |||||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||||
ctx.exit(0) | ctx.exit(0) | ||||
else: | else: | ||||
print('Done.') | print('Done.') | ||||
def main(): | def main(): | ||||
logging.basicConfig() | logging.basicConfig() | ||||
return cli(auto_envvar_prefix='SWH_JOURNAL') | return cli(auto_envvar_prefix='SWH_JOURNAL') | ||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
main() | main() |