# Copyright (C) 2016-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import functools
import logging
import mmap
import os
import time

import click

from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS
from swh.model.model import SHA1_SIZE
from swh.storage import get_storage
from swh.objstorage import get_objstorage

from swh.journal.client import JournalClient
from swh.journal.replay import is_hash_in_bytearray
from swh.journal.replay import process_replay_objects
from swh.journal.replay import process_replay_objects_content
from swh.journal.backfill import JournalBackfiller


@click.group(name='journal', context_settings=CONTEXT_SETTINGS)
@click.option('--config-file', '-C', default=None,
              type=click.Path(exists=True, dir_okay=False,),
              help="Configuration file.")
@click.pass_context
def cli(ctx, config_file):
    """Software Heritage Journal tools.

    The journal is a persistent logger of changes to the archive, with
    publish-subscribe support.

    """
    if not config_file:
        config_file = os.environ.get('SWH_CONFIG_FILENAME')

    if config_file:
        if not os.path.exists(config_file):
            raise ValueError('%s does not exist' % config_file)
        conf = config.read(config_file)
    else:
        conf = {}

    ctx.ensure_object(dict)

    log_level = ctx.obj.get('log_level', logging.INFO)
    logging.root.setLevel(log_level)
    logging.getLogger('kafka').setLevel(logging.INFO)

    ctx.obj['config'] = conf


def get_journal_client(ctx, **kwargs):
    conf = ctx.obj['config'].get('journal', {})
    conf.update({k: v for (k, v) in kwargs.items() if v not in (None, ())})
    if not conf.get('brokers'):
        ctx.fail('You must specify at least one kafka broker.')
    if not isinstance(conf['brokers'], (list, tuple)):
        conf['brokers'] = [conf['brokers']]
    return JournalClient(**conf)


@cli.command()
@click.option('--max-messages', '-m', default=None, type=int,
              help='Maximum number of objects to replay. Default is to '
                   'run forever.')
@click.option('--broker', 'brokers', type=str, multiple=True,
              help='Kafka broker to connect to. '
                   '(deprecated, use the config file instead)')
@click.option('--prefix', type=str, default=None,
              help='Prefix of Kafka topic names to read from. '
                   '(deprecated, use the config file instead)')
@click.option('--group-id', type=str,
              help='Name of the group id for reading from Kafka. '
                   '(deprecated, use the config file instead)')
@click.pass_context
def replay(ctx, brokers, prefix, group_id, max_messages):
    """Fill a Storage by reading a Journal.

    There can be several 'replayers' filling a Storage as long as they use
    the same `group-id`.
    """
    logger = logging.getLogger(__name__)
    conf = ctx.obj['config']
    try:
        storage = get_storage(**conf.pop('storage'))
    except KeyError:
        ctx.fail('You must have a storage configured in your config file.')

    client = get_journal_client(
        ctx, brokers=brokers, prefix=prefix, group_id=group_id,
        max_messages=max_messages)
    worker_fn = functools.partial(process_replay_objects, storage=storage)

    try:
        nb_messages = 0
        last_log_time = 0
        while not max_messages or nb_messages < max_messages:
            nb_messages += client.process(worker_fn)
            if time.time() - last_log_time >= 60:
                # Log at most once per minute.
                logger.info('Processed %d messages.' % nb_messages)
                last_log_time = time.time()
    except KeyboardInterrupt:
        ctx.exit(0)
    else:
        print('Done.')
    finally:
        client.close()


@cli.command()
@click.argument('object_type')
@click.option('--start-object', default=None)
@click.option('--end-object', default=None)
@click.option('--dry-run', is_flag=True, default=False)
@click.pass_context
def backfiller(ctx, object_type, start_object, end_object, dry_run):
    """Run the backfiller

    The backfiller list objects from a Storage and produce journal entries from
    there.

    Typically used to rebuild a journal or compensate for missing objects in a
    journal (eg. due to a downtime of this later).

    The configuration file requires the following entries:
    - brokers: a list of kafka endpoints (the journal) in which entries will be
               added.
    - storage_dbconn: URL to connect to the storage DB.
    - prefix: the prefix of the topics (topics will be <prefix>.<object_type>).
    - client_id: the kafka client ID.

    """
    conf = ctx.obj['config']
    backfiller = JournalBackfiller(conf)
    try:
        backfiller.run(
            object_type=object_type,
            start_object=start_object, end_object=end_object,
            dry_run=dry_run)
    except KeyboardInterrupt:
        ctx.exit(0)


@cli.command('content-replay')
@click.option('--max-messages', '-m', default=None, type=int,
              help='Maximum number of objects to replay. Default is to '
                   'run forever.')
@click.option('--broker', 'brokers', type=str, multiple=True,
              help='Kafka broker to connect to.'
                   '(deprecated, use the config file instead)')
@click.option('--prefix', type=str, default=None,
              help='Prefix of Kafka topic names to read from.'
                   '(deprecated, use the config file instead)')
@click.option('--group-id', type=str,
              help='Name of the group id for reading from Kafka.'
                   '(deprecated, use the config file instead)')
@click.option('--exclude-sha1-file', default=None, type=click.File('rb'),
              help='File containing a sorted array of hashes to be excluded.')
@click.pass_context
def content_replay(ctx, max_messages,
                   brokers, prefix, group_id, exclude_sha1_file):
    """Fill a destination Object Storage (typically a mirror) by reading a Journal
    and retrieving objects from an existing source ObjStorage.

    There can be several 'replayers' filling a given ObjStorage as long as they
    use the same `group-id`.

    This service retrieves object ids to copy from the 'content' topic. It will
    only copy object's content if the object's description in the kafka
    nmessage has the status:visible set.

    `--exclude-sha1-file` may be used to exclude some hashes to speed-up the
    replay in case many of the contents are already in the destination
    objstorage. It must contain a concatenation of all (sha1) hashes,
    and it must be sorted.
    This file will not be fully loaded into memory at any given time,
    so it can be arbitrarily large.
    """
    logger = logging.getLogger(__name__)
    conf = ctx.obj['config']
    try:
        objstorage_src = get_objstorage(**conf.pop('objstorage_src'))
    except KeyError:
        ctx.fail('You must have a source objstorage configured in '
                 'your config file.')
    try:
        objstorage_dst = get_objstorage(**conf.pop('objstorage_dst'))
    except KeyError:
        ctx.fail('You must have a destination objstorage configured '
                 'in your config file.')

    if exclude_sha1_file:
        map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ)
        if map_.size() % SHA1_SIZE != 0:
            ctx.fail('--exclude-sha1 must link to a file whose size is an '
                     'exact multiple of %d bytes.' % SHA1_SIZE)
        nb_excluded_hashes = int(map_.size()/SHA1_SIZE)

        def exclude_fn(obj):
            return is_hash_in_bytearray(obj['sha1'], map_, nb_excluded_hashes)
    else:
        exclude_fn = None

    client = get_journal_client(
        ctx, brokers=brokers, prefix=prefix, group_id=group_id,
        max_messages=max_messages, object_types=('content',))
    worker_fn = functools.partial(process_replay_objects_content,
                                  src=objstorage_src,
                                  dst=objstorage_dst,
                                  exclude_fn=exclude_fn)

    try:
        nb_messages = 0
        last_log_time = 0
        while not max_messages or nb_messages < max_messages:
            nb_messages += client.process(worker_fn)
            if time.time() - last_log_time >= 60:
                # Log at most once per minute.
                logger.info('Processed %d messages.' % nb_messages)
                last_log_time = time.time()
    except KeyboardInterrupt:
        ctx.exit(0)
    else:
        print('Done.')


def main():
    logging.basicConfig()
    return cli(auto_envvar_prefix='SWH_JOURNAL')


if __name__ == '__main__':
    main()
