# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#!/usr/bin/env python

# ===================================
# kafka_system_test_utils.py
# ===================================

import datetime
import getpass
import hashlib
import inspect
import json
import logging
import os
import pprint
import re
import subprocess
import sys
import thread
import time
import traceback

import system_test_utils
import metrics

from datetime  import datetime
from time      import mktime

# ====================================================================
# Two logging formats are defined in system_test/system_test_runner.py
# ====================================================================

# 1. "namedLogger" is defined to log message in this format:
#    "%(asctime)s - %(levelname)s - %(message)s %(name_of_class)s"
#    usage: to log message and showing the class name of the message

logger     = logging.getLogger("namedLogger")
thisClassName = '(kafka_system_test_utils)'
d = {'name_of_class': thisClassName}

# 2. "anonymousLogger" is defined to log message in this format:
#    "%(asctime)s - %(levelname)s - %(message)s"
#    usage: to log message without showing class name and it's appropriate
#           for logging generic message such as "sleeping for 5 seconds"

anonLogger = logging.getLogger("anonymousLogger")


# =====================================
# Sample usage of getting testcase env
# =====================================
def get_testcase_env(testcaseEnv):
    anonLogger.info("================================================")
    anonLogger.info("systemTestBaseDir     : " + testcaseEnv.systemTestBaseDir)
    anonLogger.info("testSuiteBaseDir      : " + testcaseEnv.testSuiteBaseDir)
    anonLogger.info("testCaseBaseDir       : " + testcaseEnv.testCaseBaseDir)
    anonLogger.info("testCaseLogsDir       : " + testcaseEnv.testCaseLogsDir)
    anonLogger.info("userDefinedEnvVarDict : (testcaseEnv.userDefinedEnvVarDict)")
    anonLogger.info("================================================")


def get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, type):

    defaultLogDir = testcaseEnv.testCaseLogsDir + "/" + role + "-" + entityId

    # type is either "metrics" or "dashboards" or "default"
    if type == "metrics":
        return testcaseEnv.testCaseLogsDir + "/" + role + "-" + entityId + "/metrics"
    elif type == "log_segments" :
        return testcaseEnv.testCaseLogsDir + "/" + role + "-" + entityId + "/log_segments"
    elif type == "default" :
        return testcaseEnv.testCaseLogsDir + "/" + role + "-" + entityId
    elif type == "dashboards":
        return testcaseEnv.testCaseLogsDir + "/dashboards"
    elif type == "config":
        return testcaseEnv.testCaseBaseDir + "/config"
    else:
        logger.error("unrecognized log directory type : " + type, extra=d)
        logger.error("returning default log dir : " + defaultLogDir, extra=d)
        return defaultLogDir


def generate_testcase_log_dirs(systemTestEnv, testcaseEnv):

    testcasePathName = testcaseEnv.testCaseBaseDir
    logger.debug("testcase pathname: " + testcasePathName, extra=d)

    if not os.path.exists(testcasePathName + "/config") : os.makedirs(testcasePathName + "/config")
    if not os.path.exists(testcasePathName + "/logs")   : os.makedirs(testcasePathName + "/logs")
    if not os.path.exists(testcasePathName + "/dashboards")   : os.makedirs(testcasePathName + "/dashboards")

    dashboardsPathName = testcasePathName + "/dashboards"
    if not os.path.exists(dashboardsPathName) : os.makedirs(dashboardsPathName)

    for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
        entityId = clusterEntityConfigDict["entity_id"]
        role     = clusterEntityConfigDict["role"]

        metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "metrics")
        if not os.path.exists(metricsPathName) : os.makedirs(metricsPathName)

        # create the role directory under dashboards
        dashboardsRoleDir = dashboardsPathName + "/" + role
        if not os.path.exists(dashboardsRoleDir) : os.makedirs(dashboardsRoleDir)
        

def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv):
    anonLogger.info("================================================")
    anonLogger.info("collecting logs from remote machines")
    anonLogger.info("================================================")

    testCaseBaseDir = testcaseEnv.testCaseBaseDir
    tcConfigsList   = testcaseEnv.testcaseConfigsList

    for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
        hostname   = clusterEntityConfigDict["hostname"]
        entity_id  = clusterEntityConfigDict["entity_id"]
        role       = clusterEntityConfigDict["role"]
        kafkaHome  = clusterEntityConfigDict["kafka_home"]

        logger.debug("entity_id : " + entity_id, extra=d)
        logger.debug("hostname  : " + hostname,  extra=d)
        logger.debug("role      : " + role,      extra=d)

        configPathName     = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "config")
        metricsPathName    = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "metrics")
        logPathName        = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "default")
        rmtLogPathName     = logPathName
        rmtMetricsPathName = metricsPathName

        if hostname != "localhost":
            rmtConfigPathName  = replace_kafka_home(configPathName, kafkaHome)
            rmtMetricsPathName = replace_kafka_home(metricsPathName, kafkaHome)
            rmtLogPathName     = replace_kafka_home(logPathName, kafkaHome)

        # ==============================
        # collect entity log file
        # ==============================
        cmdList = ["scp",
                   hostname + ":" + rmtLogPathName + "/*",
                   logPathName]
        cmdStr  = " ".join(cmdList)
        logger.debug("executing command [" + cmdStr + "]", extra=d)
        system_test_utils.sys_call(cmdStr)

        # ==============================
        # collect entity metrics file
        # ==============================
        cmdList = ["scp",
                   hostname + ":" + rmtMetricsPathName + "/*",
                   metricsPathName]
        cmdStr  = " ".join(cmdList)
        logger.debug("executing command [" + cmdStr + "]", extra=d)
        system_test_utils.sys_call(cmdStr)

        # ==============================
        # collect broker log segment file
        # ==============================
        if role == "broker":
            dataLogPathName = system_test_utils.get_data_by_lookup_keyval(
                                  testcaseEnv.testcaseConfigsList, "entity_id", entity_id, "log.dir")

            cmdList = ["scp -r",
                       hostname + ":" + dataLogPathName,
                       logPathName]
            cmdStr  = " ".join(cmdList)
            logger.debug("executing command [" + cmdStr + "]", extra=d)
            system_test_utils.sys_call(cmdStr)

        # ==============================
        # collect ZK log
        # ==============================
        if role == "zookeeper":
            dataLogPathName = system_test_utils.get_data_by_lookup_keyval(
                                  testcaseEnv.testcaseConfigsList, "entity_id", entity_id, "dataDir")

            cmdList = ["scp -r",
                       hostname + ":" + dataLogPathName,
                       logPathName]
            cmdStr  = " ".join(cmdList)
            logger.debug("executing command [" + cmdStr + "]", extra=d)
            system_test_utils.sys_call(cmdStr)

    # ==============================
    # collect dashboards file
    # ==============================
    dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards")
    rmtDashboardsPathName = dashboardsPathName

    if hostname != "localhost":
        rmtDashboardsPathName  = replace_kafka_home(dashboardsPathName, kafkaHome)

    cmdList = ["scp",
               hostname + ":" + rmtDashboardsPathName + "/*",
               dashboardsPathName]
    cmdStr  = " ".join(cmdList)
    logger.debug("executing command [" + cmdStr + "]", extra=d)
    system_test_utils.sys_call(cmdStr)

 
def generate_testcase_log_dirs_in_remote_hosts(systemTestEnv, testcaseEnv):
    testCaseBaseDir = testcaseEnv.testCaseBaseDir

    for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
        hostname   = clusterEntityConfigDict["hostname"]
        entity_id  = clusterEntityConfigDict["entity_id"]
        role       = clusterEntityConfigDict["role"]
        kafkaHome  = clusterEntityConfigDict["kafka_home"]

        logger.debug("entity_id : " + entity_id, extra=d)
        logger.debug("hostname  : " + hostname, extra=d)
        logger.debug("role      : " + role, extra=d)

        configPathName     = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "config")
        metricsPathName    = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "metrics")
        dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards")

        if hostname != "localhost":
            configPathName     = replace_kafka_home(configPathName, kafkaHome)
            metricsPathName    = replace_kafka_home(metricsPathName, kafkaHome)
            dashboardsPathName = replace_kafka_home(dashboardsPathName, kafkaHome)

        cmdList = ["ssh " + hostname,
                   "'mkdir -p",
                   configPathName,
                   metricsPathName,
                   dashboardsPathName + "'"]
        cmdStr  = " ".join(cmdList)
        logger.debug("executing command [" + cmdStr + "]", extra=d)
        system_test_utils.sys_call(cmdStr)


def init_entity_props(systemTestEnv, testcaseEnv):
    clusterConfigsList  = systemTestEnv.clusterEntityConfigDictList
    testcaseConfigsList = testcaseEnv.testcaseConfigsList
    testcasePathName    = testcaseEnv.testCaseBaseDir

    try:
        # consumer config / log files location
        consEntityIdList   = system_test_utils.get_data_from_list_of_dicts( \
                             clusterConfigsList, "role", "console_consumer", "entity_id")
        consLogList        = system_test_utils.get_data_from_list_of_dicts( \
                             testcaseConfigsList, "entity_id", consEntityIdList[0], "log_filename")
        consLogPathname    = testcasePathName + "/logs/" + consLogList[0]
        consCfgList        = system_test_utils.get_data_from_list_of_dicts( \
                             testcaseConfigsList, "entity_id", consEntityIdList[0], "config_filename")
        consCfgPathname    = testcasePathName + "/config/" + consCfgList[0]

        # producer config / log files location
        prodEntityIdList   = system_test_utils.get_data_from_list_of_dicts( \
                             clusterConfigsList, "role", "producer_performance", "entity_id")
        prodLogList        = system_test_utils.get_data_from_list_of_dicts( \
                             testcaseConfigsList, "entity_id", prodEntityIdList[0], "log_filename")
        prodLogPathname    = testcasePathName + "/logs/" + prodLogList[0]
        prodCfgList        = system_test_utils.get_data_from_list_of_dicts( \
                             testcaseConfigsList, "entity_id", prodEntityIdList[0], "config_filename")
        prodCfgPathname    = testcasePathName + "/config/" + prodCfgList[0]
    except:
        logger.error("Failed to initialize entity config/log path names: possibly mismatched " \
                    + "number of entities in cluster_config.json & testcase_n_properties.json", extra=d)
        raise

    testcaseEnv.userDefinedEnvVarDict["consumerLogPathName"]    = consLogPathname
    testcaseEnv.userDefinedEnvVarDict["consumerConfigPathName"] = consCfgPathname
    testcaseEnv.userDefinedEnvVarDict["producerLogPathName"]    = prodLogPathname
    testcaseEnv.userDefinedEnvVarDict["producerConfigPathName"] = prodCfgPathname


def copy_file_with_dict_values(srcFile, destFile, dictObj, keyValToAddDict):
    infile  = open(srcFile, "r")
    inlines = infile.readlines()
    infile.close()

    outfile = open(destFile, 'w')
    for line in inlines:
        for key in dictObj.keys():
            if (line.startswith(key + "=")):
                line = key + "=" + dictObj[key] + "\n"
        outfile.write(line)

    if (keyValToAddDict is not None):
        for key in sorted(keyValToAddDict.iterkeys()):
            line = key + "=" + keyValToAddDict[key] + "\n"
            outfile.write(line)

    outfile.close()

def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv):
    logger.info("calling generate_properties_files", extra=d)

    clusterConfigsList = systemTestEnv.clusterEntityConfigDictList
    tcPathname    = testcaseEnv.testCaseBaseDir
    tcConfigsList = testcaseEnv.testcaseConfigsList

    cfgTemplatePathname = os.path.abspath(testsuitePathname + "/config")
    cfgDestPathname     = os.path.abspath(tcPathname + "/config")
    logger.info("config template (source) pathname : " + cfgTemplatePathname, extra=d)
    logger.info("testcase config (dest)   pathname : " + cfgDestPathname, extra=d)

    # loop through all zookeepers (if more than 1) to retrieve host and clientPort
    # to construct a zk.connect str for broker in the form of:
    # zk.connect=<host1>:<port1>,<host2>:<port2>,...
    testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]        = ""
    testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]        = ""
    testcaseEnv.userDefinedEnvVarDict["sourceZkEntityIdList"]      = []
    testcaseEnv.userDefinedEnvVarDict["targetZkEntityIdList"]      = []
    testcaseEnv.userDefinedEnvVarDict["sourceZkHostPortDict"]      = {}
    testcaseEnv.userDefinedEnvVarDict["targetZkHostPortDict"]      = {}
    testcaseEnv.userDefinedEnvVarDict["sourceBrokerEntityIdList"]  = []
    testcaseEnv.userDefinedEnvVarDict["targetBrokerEntityIdList"]  = []
    testcaseEnv.userDefinedEnvVarDict["sourceBrokerList"]          = ""
    testcaseEnv.userDefinedEnvVarDict["targetBrokerList"]          = ""

    # update zookeeper cluster info into "testcaseEnv.userDefinedEnvVarDict"
    zkDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper")

    for zkDict in zkDictList:
        entityID       = zkDict["entity_id"]
        hostname       = zkDict["hostname"]
        clusterName    = zkDict["cluster_name"]
        clientPortList = system_test_utils.get_data_from_list_of_dicts(tcConfigsList, "entity_id", entityID, "clientPort")
        clientPort     = clientPortList[0]

        if clusterName == "source":
            # update source cluster zookeeper entities
            testcaseEnv.userDefinedEnvVarDict["sourceZkEntityIdList"].append(entityID)
            if ( len(testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]) == 0 ):
                testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] = hostname + ":" + clientPort
            else:
                testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] += "," + hostname + ":" + clientPort

            # generate these strings for zookeeper config:
            # server.1=host1:2180:2182
            # server.2=host2:2180:2182
            zkClusterSize = len(testcaseEnv.userDefinedEnvVarDict["sourceZkHostPortDict"])
            zkClusterId   = str(zkClusterSize + 1)
            key           = "server." + zkClusterId
            val           = hostname + ":" + str(int(clientPort) - 1) + ":" + str(int(clientPort) + 1)
            testcaseEnv.userDefinedEnvVarDict["sourceZkHostPortDict"][key] = val

        elif clusterName == "target":
            # update target cluster zookeeper entities
            testcaseEnv.userDefinedEnvVarDict["targetZkEntityIdList"].append(entityID)
            if ( len(testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]) == 0 ):
                testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] = hostname + ":" + clientPort
            else:
                testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] += "," + hostname + ":" + clientPort

            # generate these strings for zookeeper config:
            # server.1=host1:2180:2182
            # server.2=host2:2180:2182
            zkClusterSize = len(testcaseEnv.userDefinedEnvVarDict["targetZkHostPortDict"])
            zkClusterId   = str(zkClusterSize + 1)
            key           = "server." + zkClusterId
            val           = hostname + ":" + str(int(clientPort) - 1) + ":" + str(int(clientPort) + 1)
            testcaseEnv.userDefinedEnvVarDict["targetZkHostPortDict"][key] = val

        else:
            logger.error("Invalid cluster name: " + clusterName, extra=d)
            raise Exception("Invalid cluster name : " + clusterName)
            sys.exit(1)

    # update broker cluster info into "testcaseEnv.userDefinedEnvVarDict"
    brokerDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "broker")
    for brokerDict in brokerDictList:
        entityID       = brokerDict["entity_id"]
        hostname       = brokerDict["hostname"]
        clusterName    = brokerDict["cluster_name"]
        portList       = system_test_utils.get_data_from_list_of_dicts(tcConfigsList, "entity_id", entityID, "port")
        port           = portList[0]

        if clusterName == "source":
            if ( len(testcaseEnv.userDefinedEnvVarDict["sourceBrokerList"]) == 0 ):
                testcaseEnv.userDefinedEnvVarDict["sourceBrokerList"] = hostname + ":" + port
            else:
                testcaseEnv.userDefinedEnvVarDict["sourceBrokerList"] += "," + hostname + ":" + port
        elif clusterName == "target":
            if ( len(testcaseEnv.userDefinedEnvVarDict["targetBrokerList"]) == 0 ):
                testcaseEnv.userDefinedEnvVarDict["targetBrokerList"] = hostname + ":" + port
            else:
                testcaseEnv.userDefinedEnvVarDict["targetBrokerList"] += "," + hostname + ":" + port
        else:
            logger.error("Invalid cluster name: " + clusterName, extra=d)
            raise Exception("Invalid cluster name : " + clusterName)
            sys.exit(1)

    # for each entity in the cluster config
    for clusterCfg in clusterConfigsList:
        cl_entity_id = clusterCfg["entity_id"]

        # loop through testcase config list 'tcConfigsList' for a matching cluster entity_id
        for tcCfg in tcConfigsList:
            if (tcCfg["entity_id"] == cl_entity_id):

                # copy the associated .properties template, update values, write to testcase_<xxx>/config

                if ( clusterCfg["role"] == "broker" ):
                    if clusterCfg["cluster_name"] == "source":
                        tcCfg["zk.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
                    elif clusterCfg["cluster_name"] == "target":
                        tcCfg["zk.connect"] = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]
                    else:
                        logger.error("Unknown cluster name: " + clusterName, extra=d)
                        sys.exit(1)

                    addedCSVConfig = {}
                    addedCSVConfig["kafka.csv.metrics.dir"] = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", clusterCfg["entity_id"], "metrics") 
                    addedCSVConfig["kafka.metrics.polling.interval.secs"] = "5" 
                    addedCSVConfig["kafka.metrics.reporters"] = "kafka.metrics.KafkaCSVMetricsReporter" 
                    addedCSVConfig["kafka.csv.metrics.reporter.enabled"] = "true"
                    copy_file_with_dict_values(cfgTemplatePathname + "/server.properties",
                        cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, addedCSVConfig)

                elif ( clusterCfg["role"] == "zookeeper"):
                    if clusterCfg["cluster_name"] == "source":
                        copy_file_with_dict_values(cfgTemplatePathname + "/zookeeper.properties",
                            cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg,
                            testcaseEnv.userDefinedEnvVarDict["sourceZkHostPortDict"])
                    elif clusterCfg["cluster_name"] == "target":
                        copy_file_with_dict_values(cfgTemplatePathname + "/zookeeper.properties",
                            cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg,
                            testcaseEnv.userDefinedEnvVarDict["targetZkHostPortDict"])
                    else:
                        logger.error("Unknown cluster name: " + clusterName, extra=d)
                        sys.exit(1)

                elif ( clusterCfg["role"] == "mirror_maker"):
                    tcCfg["broker.list"] = testcaseEnv.userDefinedEnvVarDict["targetBrokerList"]
                    copy_file_with_dict_values(cfgTemplatePathname + "/mirror_producer.properties",
                        cfgDestPathname + "/" + tcCfg["mirror_producer_config_filename"], tcCfg, None)

                    # update zk.connect with the zk entities specified in cluster_config.json
                    tcCfg["zk.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
                    copy_file_with_dict_values(cfgTemplatePathname + "/mirror_consumer.properties",
                        cfgDestPathname + "/" + tcCfg["mirror_consumer_config_filename"], tcCfg, None)
                
                else:
                    logger.debug("UNHANDLED role " + clusterCfg["role"], extra=d)

    # scp updated config files to remote hosts
    scp_file_to_remote_host(clusterConfigsList, testcaseEnv)


def scp_file_to_remote_host(clusterEntityConfigDictList, testcaseEnv):

    testcaseConfigsList = testcaseEnv.testcaseConfigsList

    for clusterEntityConfigDict in clusterEntityConfigDictList:
        hostname         = clusterEntityConfigDict["hostname"]
        kafkaHome        = clusterEntityConfigDict["kafka_home"]
        localTestcasePathName  = testcaseEnv.testCaseBaseDir
        remoteTestcasePathName = localTestcasePathName

        if hostname != "localhost":
            remoteTestcasePathName = replace_kafka_home(localTestcasePathName, kafkaHome)

        cmdStr = "scp " + localTestcasePathName + "/config/* " + hostname + ":" + remoteTestcasePathName + "/config"
        logger.debug("executing command [" + cmdStr + "]", extra=d)
        system_test_utils.sys_call(cmdStr)


def start_zookeepers(systemTestEnv, testcaseEnv):
    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList

    zkEntityIdList = system_test_utils.get_data_from_list_of_dicts( 
        clusterEntityConfigDictList, "role", "zookeeper", "entity_id")

    for zkEntityId in zkEntityIdList:
        configPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "zookeeper", zkEntityId, "config")
        configFile     = system_test_utils.get_data_by_lookup_keyval(
                             testcaseEnv.testcaseConfigsList, "entity_id", zkEntityId, "config_filename")
        clientPort     = system_test_utils.get_data_by_lookup_keyval(
                             testcaseEnv.testcaseConfigsList, "entity_id", zkEntityId, "clientPort")
        dataDir        = system_test_utils.get_data_by_lookup_keyval(
                             testcaseEnv.testcaseConfigsList, "entity_id", zkEntityId, "dataDir")
        hostname       = system_test_utils.get_data_by_lookup_keyval(
                             clusterEntityConfigDictList, "entity_id", zkEntityId, "hostname")
        minusOnePort   = str(int(clientPort) - 1)
        plusOnePort    = str(int(clientPort) + 1)

        # read configFile to find out the id of the zk and create the file "myid"
        infile  = open(configPathName + "/" + configFile, "r")
        inlines = infile.readlines()
        infile.close()

        for line in inlines:
            if line.startswith("server.") and hostname + ":" + minusOnePort + ":" + plusOnePort in line:
                # server.1=host1:2187:2189
                matchObj    = re.match("server\.(.*?)=.*", line)
                zkServerId  = matchObj.group(1)

        cmdStr = "ssh " + hostname + " 'mkdir -p " + dataDir + "; echo " + zkServerId + " > " + dataDir + "/myid'"
        logger.debug("executing command [" + cmdStr + "]", extra=d)
        subproc = system_test_utils.sys_call_return_subproc(cmdStr)
        for line in subproc.stdout.readlines():
            pass    # dummy loop to wait until producer is completed

        time.sleep(2)
        start_entity_in_background(systemTestEnv, testcaseEnv, zkEntityId)

def start_brokers(systemTestEnv, testcaseEnv):
    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList

    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( 
        clusterEntityConfigDictList, "role", "broker", "entity_id")

    for brokerEntityId in brokerEntityIdList:
        start_entity_in_background(systemTestEnv, testcaseEnv, brokerEntityId)


def start_mirror_makers(systemTestEnv, testcaseEnv, onlyThisEntityId=None):

    if onlyThisEntityId is not None:
        start_entity_in_background(systemTestEnv, testcaseEnv, onlyThisEntityId)
    else:
        clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
        brokerEntityIdList          = system_test_utils.get_data_from_list_of_dicts( 
                                      clusterEntityConfigDictList, "role", "mirror_maker", "entity_id")

        for brokerEntityId in brokerEntityIdList:
            start_entity_in_background(systemTestEnv, testcaseEnv, brokerEntityId)


def get_broker_shutdown_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict):

    logger.info("looking up broker shutdown...", extra=d)

    # keep track of broker related data in this dict such as broker id,
    # entity id and timestamp and return it to the caller function
    shutdownBrokerDict = {} 

    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( 
                             clusterEntityConfigDictList, "role", "broker", "entity_id")

    for brokerEntityId in brokerEntityIdList:

        hostname   = system_test_utils.get_data_by_lookup_keyval( 
                         clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname")
        logFile    = system_test_utils.get_data_by_lookup_keyval( 
                         testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename")

        logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
        cmdStrList = ["ssh " + hostname,
                      "\"grep -i -h '" + leaderAttributesDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] + "' ",
                      logPathName + "/" + logFile + " | ",
                      "sort | tail -1\""]
        cmdStr     = " ".join(cmdStrList)

        logger.debug("executing command [" + cmdStr + "]", extra=d)
        subproc = system_test_utils.sys_call_return_subproc(cmdStr)
        for line in subproc.stdout.readlines():

            line = line.rstrip('\n')

            if leaderAttributesDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] in line:
                logger.debug("found the log line : " + line, extra=d)
                try:
                    matchObj    = re.match(leaderAttributesDict["REGX_BROKER_SHUT_DOWN_COMPLETED_PATTERN"], line)
                    datetimeStr = matchObj.group(1)
                    datetimeObj = datetime.strptime(datetimeStr, "%Y-%m-%d %H:%M:%S,%f")
                    unixTs = time.mktime(datetimeObj.timetuple()) + 1e-6*datetimeObj.microsecond
                    #print "{0:.3f}".format(unixTs)

                    # update shutdownBrokerDict when
                    # 1. shutdownBrokerDict has no logline entry
                    # 2. shutdownBrokerDict has existing logline enty but found another logline with more recent timestamp
                    if (len(shutdownBrokerDict) > 0 and shutdownBrokerDict["timestamp"] < unixTs) or (len(shutdownBrokerDict) == 0):
                        shutdownBrokerDict["timestamp"] = unixTs
                        shutdownBrokerDict["brokerid"]  = matchObj.group(2)
                        shutdownBrokerDict["hostname"]  = hostname
                        shutdownBrokerDict["entity_id"] = brokerEntityId
                    logger.debug("brokerid: [" + shutdownBrokerDict["brokerid"] + \
                        "] entity_id: [" + shutdownBrokerDict["entity_id"] + "]", extra=d)
                except:
                    logger.error("ERROR [unable to find matching leader details: Has the matching pattern changed?]", extra=d)
                    raise

    return shutdownBrokerDict


def get_leader_elected_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict):

    logger.debug("looking up leader...", extra=d)

    # keep track of leader related data in this dict such as broker id,
    # entity id and timestamp and return it to the caller function
    leaderDict = {} 

    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
                             clusterEntityConfigDictList, "role", "broker", "entity_id")

    for brokerEntityId in brokerEntityIdList:

        hostname   = system_test_utils.get_data_by_lookup_keyval( \
                         clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname")
        kafkaHome  = system_test_utils.get_data_by_lookup_keyval( \
                         clusterEntityConfigDictList, "entity_id", brokerEntityId, "kafka_home")
        logFile    = system_test_utils.get_data_by_lookup_keyval( \
                         testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename")

        logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")

        if hostname != "localhost":
            logPathName = replace_kafka_home(logPathName, kafkaHome)

        cmdStrList = ["ssh " + hostname,
                      "\"grep -i -h '" + leaderAttributesDict["LEADER_ELECTION_COMPLETED_MSG"] + "' ",
                      logPathName + "/" + logFile + " | ",
                      "sort | tail -1\""]
        cmdStr     = " ".join(cmdStrList)

        logger.debug("executing command [" + cmdStr + "]", extra=d)
        subproc = system_test_utils.sys_call_return_subproc(cmdStr)
        for line in subproc.stdout.readlines():

            line = line.rstrip('\n')

            if leaderAttributesDict["LEADER_ELECTION_COMPLETED_MSG"] in line:
                logger.debug("found the log line : " + line, extra=d)
                try:
                    matchObj    = re.match(leaderAttributesDict["REGX_LEADER_ELECTION_PATTERN"], line)
                    datetimeStr = matchObj.group(1)
                    datetimeObj = datetime.strptime(datetimeStr, "%Y-%m-%d %H:%M:%S,%f")
                    unixTs = time.mktime(datetimeObj.timetuple()) + 1e-6*datetimeObj.microsecond
                    #print "{0:.3f}".format(unixTs)

                    # update leaderDict when
                    # 1. leaderDict has no logline entry
                    # 2. leaderDict has existing logline entry but found another logline with more recent timestamp
                    if (len(leaderDict) > 0 and leaderDict["timestamp"] < unixTs) or (len(leaderDict) == 0):
                        leaderDict["timestamp"] = unixTs
                        leaderDict["brokerid"]  = matchObj.group(2)
                        leaderDict["topic"]     = matchObj.group(3)
                        leaderDict["partition"] = matchObj.group(4)
                        leaderDict["entity_id"] = brokerEntityId
                        leaderDict["hostname"]  = hostname
                    logger.debug("brokerid: [" + leaderDict["brokerid"] + "] entity_id: [" + leaderDict["entity_id"] + "]", extra=d)
                except:
                    logger.error("ERROR [unable to find matching leader details: Has the matching pattern changed?]", extra=d)
                    raise
            #else:
            #    logger.debug("unmatched line found [" + line + "]", extra=d)

    return leaderDict


def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):

    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList

    # cluster configurations:
    hostname  = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname")
    role      = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "role")
    kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
    javaHome  = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "java_home")
    jmxPort   = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "jmx_port")
    clusterName = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "cluster_name")

    # testcase configurations:
    testcaseConfigsList = testcaseEnv.testcaseConfigsList
    clientPort = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "clientPort")
    configFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "config_filename")
    logFile    = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "log_filename")

    mmConsumerConfigFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId,
                           "mirror_consumer_config_filename")
    mmProducerConfigFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId,
                           "mirror_producer_config_filename")

    logger.info("starting " + role + " in host [" + hostname + "] on client port [" + clientPort + "]", extra=d)

    configPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "config")
    logPathName    = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "default")

    if hostname != "localhost":
        configPathName = replace_kafka_home(configPathName, kafkaHome)
        logPathName    = replace_kafka_home(logPathName, kafkaHome)

    if role == "zookeeper":
        cmdList = ["ssh " + hostname,
                  "'JAVA_HOME=" + javaHome,
                  "JMX_PORT=" + jmxPort,
                  kafkaHome + "/bin/zookeeper-server-start.sh ",
                  configPathName + "/" + configFile + " &> ",
                  logPathName + "/" + logFile + " & echo pid:$! > ",
                  logPathName + "/entity_" + entityId + "_pid'"]

    elif role == "broker":
        cmdList = ["ssh " + hostname,
                  "'JAVA_HOME=" + javaHome,
                 "JMX_PORT=" + jmxPort,
                  kafkaHome + "/bin/kafka-run-class.sh kafka.Kafka",
                  configPathName + "/" + configFile + " >> ",
                  logPathName + "/" + logFile + " & echo pid:$! > ",
                  logPathName + "/entity_" + entityId + "_pid'"]

    elif role == "mirror_maker":
        cmdList = ["ssh " + hostname,
                  "'JAVA_HOME=" + javaHome,
                 "JMX_PORT=" + jmxPort,
                  kafkaHome + "/bin/kafka-run-class.sh kafka.tools.MirrorMaker",
                  "--consumer.config " + configPathName + "/" + mmConsumerConfigFile,
                  "--producer.config " + configPathName + "/" + mmProducerConfigFile,
                  "--whitelist=\".*\" >> ",
                  logPathName + "/" + logFile + " & echo pid:$! > ",
                  logPathName + "/entity_" + entityId + "_pid'"]

    cmdStr = " ".join(cmdList)

    logger.debug("executing command: [" + cmdStr + "]", extra=d)
    system_test_utils.async_sys_call(cmdStr)
    time.sleep(5)

    pidCmdStr = "ssh " + hostname + " 'cat " + logPathName + "/entity_" + entityId + "_pid' 2> /dev/null"
    logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
    subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)

    # keep track of the remote entity pid in a dictionary
    for line in subproc.stdout.readlines():
        if line.startswith("pid"):
            line = line.rstrip('\n')
            logger.debug("found pid line: [" + line + "]", extra=d)
            tokens = line.split(':')
            if role == "zookeeper":
                testcaseEnv.entityZkParentPidDict[entityId] = tokens[1]
            elif role == "broker":
                testcaseEnv.entityBrokerParentPidDict[entityId] = tokens[1]
            elif role == "mirror_maker":
                testcaseEnv.entityMirrorMakerParentPidDict[entityId] = tokens[1]


def start_console_consumer(systemTestEnv, testcaseEnv):

    clusterList = systemTestEnv.clusterEntityConfigDictList

    consumerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterList, "role", "console_consumer")
    for consumerConfig in consumerConfigList:
        host              = consumerConfig["hostname"]
        entityId          = consumerConfig["entity_id"]
        jmxPort           = consumerConfig["jmx_port"] 
        role              = consumerConfig["role"]
        clusterName       = consumerConfig["cluster_name"] 
        kafkaHome         = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "kafka_home")
        javaHome          = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "java_home")
        jmxPort           = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "jmx_port")
        kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"

        logger.info("starting console consumer", extra=d)

        consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default")
        metricsDir      = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "metrics"),

        if host != "localhost":
            consumerLogPath = replace_kafka_home(consumerLogPath, kafkaHome)
            #metricsDir      = replace_kafka_home(metricsDir, kafkaHome)

        consumerLogPathName = consumerLogPath + "/console_consumer.log"

        testcaseEnv.userDefinedEnvVarDict["consumerLogPathName"] = consumerLogPathName

        # testcase configurations:
        testcaseList = testcaseEnv.testcaseConfigsList
        topic     = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "topic")
        timeoutMs = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "consumer-timeout-ms")

        formatterOption = ""
        try:
            formatterOption = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "formatter")
        except:
            pass

        if len(formatterOption) > 0:
            formatterOption = " --formatter " + formatterOption + " "

        zkConnectStr = ""
        if clusterName == "source":
            zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
        elif clusterName == "target":
            zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]
        else:
            logger.error("Invalid cluster name : " + clusterName, extra=d)
            sys.exit(1)

        cmdList = ["ssh " + host,
                   "'JAVA_HOME=" + javaHome,
                   "JMX_PORT=" + jmxPort,
                   kafkaRunClassBin + " kafka.consumer.ConsoleConsumer",
                   "--zookeeper " + zkConnectStr,
                   "--topic " + topic,
                   "--consumer-timeout-ms " + timeoutMs,
                   "--csv-reporter-enabled",
                   #"--metrics-dir " + metricsDir,
                   formatterOption,
                   "--from-beginning ",
                   " >> " + consumerLogPathName,
                   " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"]

        cmdStr = " ".join(cmdList)

        logger.debug("executing command: [" + cmdStr + "]", extra=d)
        system_test_utils.async_sys_call(cmdStr)

        pidCmdStr = "ssh " + host + " 'cat " + consumerLogPath + "/entity_" + entityId + "_pid'"
        logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
        subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)

        # keep track of the remote entity pid in a dictionary
        for line in subproc.stdout.readlines():
            if line.startswith("pid"):
                line = line.rstrip('\n')
                logger.debug("found pid line: [" + line + "]", extra=d)
                tokens = line.split(':')
                testcaseEnv.consumerHostParentPidDict[host] = tokens[1]

def start_producer_performance(systemTestEnv, testcaseEnv, kafka07Client):

    entityConfigList     = systemTestEnv.clusterEntityConfigDictList
    testcaseConfigsList  = testcaseEnv.testcaseConfigsList
    brokerListStr = ""

    # construct "broker-list" for producer
    for entityConfig in entityConfigList:
        entityRole = entityConfig["role"]
        if entityRole == "broker":
            hostname = entityConfig["hostname"]
            entityId = entityConfig["entity_id"]
            port     = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "port")

    producerConfigList = system_test_utils.get_dict_from_list_of_dicts(entityConfigList, "role", "producer_performance")
    for producerConfig in producerConfigList:
        host              = producerConfig["hostname"]
        entityId          = producerConfig["entity_id"]
        jmxPort           = producerConfig["jmx_port"] 
        role              = producerConfig["role"] 

        thread.start_new_thread(start_producer_in_thread, (testcaseEnv, entityConfigList, producerConfig, kafka07Client))
        testcaseEnv.lock.acquire()
        testcaseEnv.numProducerThreadsRunning += 1
        logger.debug("testcaseEnv.numProducerThreadsRunning : " + str(testcaseEnv.numProducerThreadsRunning), extra=d)
        time.sleep(1)
        testcaseEnv.lock.release()

def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafka07Client):
    host              = producerConfig["hostname"]
    entityId          = producerConfig["entity_id"]
    jmxPort           = producerConfig["jmx_port"] 
    role              = producerConfig["role"]
    clusterName       = producerConfig["cluster_name"]
    kafkaHome         = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "kafka_home")
    javaHome          = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "java_home")
    jmxPort           = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "jmx_port")
    kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"

    # testcase configurations:
    testcaseConfigsList = testcaseEnv.testcaseConfigsList
    topic          = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "topic")
    threads        = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "threads")
    compCodec      = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "compression-codec")
    messageSize    = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "message-size")
    noMsgPerBatch  = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "message")
    requestNumAcks = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "request-num-acks")
    syncMode       = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "sync")
    retryBackoffMs = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "producer-retry-backoff-ms")
    numOfRetries   = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "producer-num-retries")

    # for optional properties in testcase_xxxx_properties.json,
    # check the length of returned value for those properties:
    if len(retryBackoffMs) == 0:      # no setting for "producer-retry-backoff-ms"
        retryBackoffMs     =  "100"   # default
    if len(numOfRetries)   == 0:      # no setting for "producer-num-retries"
        numOfRetries       =  "3"     # default

    brokerListStr  = ""
    if clusterName == "source":
        brokerListStr  = testcaseEnv.userDefinedEnvVarDict["sourceBrokerList"]
    elif clusterName == "target":
        brokerListStr  = testcaseEnv.userDefinedEnvVarDict["targetBrokerList"]
    else:
        logger.error("Unknown cluster name: " + clusterName, extra=d)
        sys.exit(1)

    logger.info("starting producer preformance", extra=d)

    producerLogPath  = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default")
    metricsDir       = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "metrics")

    if host != "localhost":
        producerLogPath = replace_kafka_home(producerLogPath, kafkaHome)
        metricsDir      = replace_kafka_home(metricsDir, kafkaHome)

    producerLogPathName = producerLogPath + "/producer_performance.log"

    testcaseEnv.userDefinedEnvVarDict["producerLogPathName"] = producerLogPathName

    counter = 0
    producerSleepSec = int(testcaseEnv.testcaseArgumentsDict["sleep_seconds_between_producer_calls"])

    boolArgumentsStr = ""
    if syncMode.lower() == "true":
        boolArgumentsStr = boolArgumentsStr + " --sync"

    # keep calling producer until signaled to stop by:
    # testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"]
    while 1:
        logger.debug("calling testcaseEnv.lock.acquire()", extra=d)
        testcaseEnv.lock.acquire()
        if not testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"]:
            initMsgId = counter * int(noMsgPerBatch)

            logger.info("#### [producer thread] status of stopBackgroundProducer : [False] => producing [" \
                + str(noMsgPerBatch) + "] messages with starting message id : [" + str(initMsgId) + "]", extra=d)

            cmdList = ["ssh " + host,
                       "'JAVA_HOME=" + javaHome,
                       "JMX_PORT=" + jmxPort,
                       kafkaRunClassBin + " kafka.perf.ProducerPerformance",
                       "--broker-list " + brokerListStr,
                       "--initial-message-id " + str(initMsgId),
                       "--messages " + noMsgPerBatch,
                       "--topics " + topic,
                       "--threads " + threads,
                       "--compression-codec " + compCodec,
                       "--message-size " + messageSize,
                       "--request-num-acks " + requestNumAcks,
                       "--producer-retry-backoff-ms " + retryBackoffMs,
                       "--producer-num-retries " + numOfRetries,
                       "--csv-reporter-enabled",
                       "--metrics-dir " + metricsDir,
                       boolArgumentsStr,
                       " >> " + producerLogPathName,
                       " & echo pid:$! > " + producerLogPath + "/entity_" + entityId + "_pid'"]

            if kafka07Client:
                cmdList[:] = []

                brokerInfoStr = ""
                tokenList = brokerListStr.split(',')
                index = 1
                for token in tokenList:
                    if len(brokerInfoStr) == 0:
                        brokerInfoStr = str(index) + ":" + token
                    else:
                        brokerInfoStr += "," + str(index) + ":" + token
                    index += 1

                brokerInfoStr = "broker.list=" + brokerInfoStr

                cmdList = ["ssh " + host,
                       "'JAVA_HOME=" + javaHome,
                       "JMX_PORT=" + jmxPort,
                       kafkaRunClassBin + " kafka.perf.ProducerPerformance",
                       "--brokerinfo " + brokerInfoStr,
                       "--messages " + noMsgPerBatch,
                       "--topic " + topic,
                       "--threads " + threads,
                       "--compression-codec " + compCodec,
                       "--message-size " + messageSize,
                       "--vary-message-size --async",
                       " >> " + producerLogPathName,
                       " & echo pid:$! > " + producerLogPath + "/entity_" + entityId + "_pid'"]

            cmdStr = " ".join(cmdList)
            logger.debug("executing command: [" + cmdStr + "]", extra=d)

            subproc = system_test_utils.sys_call_return_subproc(cmdStr)
            for line in subproc.stdout.readlines():
                pass    # dummy loop to wait until producer is completed
        else:
            testcaseEnv.numProducerThreadsRunning -= 1
            logger.debug("testcaseEnv.numProducerThreadsRunning : " + str(testcaseEnv.numProducerThreadsRunning), extra=d)
            testcaseEnv.lock.release()
            break

        counter += 1
        logger.debug("calling testcaseEnv.lock.release()", extra=d)
        testcaseEnv.lock.release()
        time.sleep(int(producerSleepSec))

    # wait until other producer threads also stops and
    # let the main testcase know all producers have stopped
    while 1:
        testcaseEnv.lock.acquire()
        time.sleep(1)
        if testcaseEnv.numProducerThreadsRunning == 0:
            testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = True
            testcaseEnv.lock.release()
            break
        else:
            logger.debug("waiting for TRUE of testcaseEnv.userDefinedEnvVarDict['backgroundProducerStopped']", extra=d)
            testcaseEnv.lock.release()
        time.sleep(1)

def stop_remote_entity(systemTestEnv, entityId, parentPid, signalType="SIGTERM"):
    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList

    hostname  = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname")
    pidStack  = system_test_utils.get_remote_child_processes(hostname, parentPid)

    logger.debug("terminating (" + signalType + ") process id: " + parentPid + " in host: " + hostname, extra=d)

    if signalType.lower() == "sigterm":
        system_test_utils.sigterm_remote_process(hostname, pidStack)
    elif signalType.lower() == "sigkill":
        system_test_utils.sigkill_remote_process(hostname, pidStack)
    else:
        logger.error("Invalid signal type: " + signalType, extra=d)
        raise Exception("Invalid signal type: " + signalType)


def force_stop_remote_entity(systemTestEnv, entityId, parentPid):
    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList

    hostname  = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname")
    pidStack  = system_test_utils.get_remote_child_processes(hostname, parentPid)

    logger.debug("terminating process id: " + parentPid + " in host: " + hostname, extra=d)
    system_test_utils.sigkill_remote_process(hostname, pidStack)


def create_topic(systemTestEnv, testcaseEnv):
    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList

    prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")

    for prodPerfCfg in prodPerfCfgList:
        topicsStr       = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfg["entity_id"], "topic")
        zkEntityId      = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
        zkHost          = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "hostname")
        kafkaHome       = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "kafka_home")
        javaHome        = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home")
        createTopicBin  = kafkaHome + "/bin/kafka-create-topic.sh"

        logger.debug("zkEntityId     : " + zkEntityId, extra=d)
        logger.debug("createTopicBin : " + createTopicBin, extra=d)

        zkConnectStr = ""
        topicsList   = topicsStr.split(',')

        if len(testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]) > 0:
            zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
        elif len(testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]) > 0:
            zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]
        else:
            raise Exception("Empty zkConnectStr found")

        testcaseBaseDir = testcaseEnv.testCaseBaseDir

        if zkHost != "localhost":
            testcaseBaseDir = replace_kafka_home(testcaseBaseDir, kafkaHome)

        for topic in topicsList:
            logger.info("creating topic: [" + topic + "] at: [" + zkConnectStr + "]", extra=d) 
            cmdList = ["ssh " + zkHost,
                       "'JAVA_HOME=" + javaHome,
                       createTopicBin,
                       " --topic "     + topic,
                       " --zookeeper " + zkConnectStr,
                       " --replica "   + testcaseEnv.testcaseArgumentsDict["replica_factor"],
                       " --partition " + testcaseEnv.testcaseArgumentsDict["num_partition"] + " >> ",
                       testcaseBaseDir + "/logs/create_source_cluster_topic.log'"]
    
            cmdStr = " ".join(cmdList)
            logger.debug("executing command: [" + cmdStr + "]", extra=d)
            subproc = system_test_utils.sys_call_return_subproc(cmdStr)


def get_message_id(logPathName, topic=""):
    logLines      = open(logPathName, "r").readlines()
    messageIdList = []

    for line in logLines:
        if not "MessageID" in line:
            continue
        else:
            matchObj = re.match('.*Topic:(.*?):.*:MessageID:(.*?):', line)
            if len(topic) == 0:
                messageIdList.append( matchObj.group(2) )
            else:
                if topic == matchObj.group(1):
                    messageIdList.append( matchObj.group(2) )

    return messageIdList

def get_message_checksum(logPathName):
    logLines = open(logPathName, "r").readlines()
    messageChecksumList = []

    for line in logLines:
        if not "checksum:" in line:
            continue
        else:
            matchObj = re.match('.*checksum:(\d*).*', line)
            if matchObj is not None:
                checksum = matchObj.group(1)
                messageChecksumList.append( checksum )
            else:
                logger.error("unexpected log line : " + line, extra=d)

    return messageChecksumList


def validate_data_matched(systemTestEnv, testcaseEnv):
    validationStatusDict        = testcaseEnv.validationStatusDict
    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList

    prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
    consumerCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer")

    for prodPerfCfg in prodPerfCfgList:
        producerEntityId = prodPerfCfg["entity_id"]
        topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
        acks  = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks")

        consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
                           clusterEntityConfigDictList, "role", "console_consumer", "entity_id")

        matchingConsumerEntityId = None
        for consumerEntityId in consumerEntityIdList:
            consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
            if consumerTopic in topic:
                matchingConsumerEntityId = consumerEntityId
                break

        if matchingConsumerEntityId is None:
            break

        msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( \
                           testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") + "/msg_id_missing_in_consumer.log"
        producerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", producerEntityId, "default")
        producerLogPathName = producerLogPath + "/producer_performance.log"

        consumerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", matchingConsumerEntityId, "default")
        consumerLogPathName = consumerLogPath + "/console_consumer.log"

        producerMsgIdList  = get_message_id(producerLogPathName)
        consumerMsgIdList  = get_message_id(consumerLogPathName)
        producerMsgIdSet   = set(producerMsgIdList)
        consumerMsgIdSet   = set(consumerMsgIdList)

        missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet

        outfile = open(msgIdMissingInConsumerLogPathName, "w")
        for id in missingMsgIdInConsumer:
            outfile.write(id + "\n")
        outfile.close()

        logger.info("no. of unique messages on topic [" + topic + "] sent from publisher  : " + str(len(producerMsgIdSet)), extra=d)
        logger.info("no. of unique messages on topic [" + topic + "] received by consumer : " + str(len(consumerMsgIdSet)), extra=d)
        validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgIdSet))
        validationStatusDict["Unique messages from consumer on [" + topic + "]"] = str(len(consumerMsgIdSet))

        if ( len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 0 ):
            validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
        elif (acks == "1"):
            missingPercentage = len(missingMsgIdInConsumer) * 100 / len(producerMsgIdSet)
            print "#### missing Percent : ", missingPercentage
            if missingPercentage <= 1:
                validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
                logger.warn("Test case passes with less than 1% data loss : [" + str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d)
        else:
            validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED"
            logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d)


def validate_leader_election_successful(testcaseEnv, leaderDict, validationStatusDict):

    if ( len(leaderDict) > 0 ):
        try:
            leaderBrokerId = leaderDict["brokerid"]
            leaderEntityId = leaderDict["entity_id"]
            leaderPid      = testcaseEnv.entityBrokerParentPidDict[leaderEntityId]
            hostname       = leaderDict["hostname"]

            logger.info("found leader in entity [" + leaderEntityId + "] with brokerid [" + \
                leaderBrokerId + "] for partition [" + leaderDict["partition"] + "]", extra=d)
            validationStatusDict["Validate leader election successful"] = "PASSED"
            return True
        except Exception, e:
            logger.error("leader info not completed: {0}".format(e), extra=d)
            traceback.print_exc()
            print leaderDict
            traceback.print_exc()
            validationStatusDict["Validate leader election successful"] = "FAILED"
            return False
    else:
        validationStatusDict["Validate leader election successful"] = "FAILED"
        return False


def cleanup_data_at_remote_hosts(systemTestEnv, testcaseEnv):

    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
    testcaseConfigsList         = testcaseEnv.testcaseConfigsList
    testCaseBaseDir             = testcaseEnv.testCaseBaseDir

    # clean up the following directories in localhost
    #     system_test/<xxxx_testsuite>/testcase_xxxx/config
    #     system_test/<xxxx_testsuite>/testcase_xxxx/dashboards
    #     system_test/<xxxx_testsuite>/testcase_xxxx/logs
    logger.info("cleaning up test case dir: [" + testCaseBaseDir + "]", extra=d)

    if "system_test" not in testCaseBaseDir:
        logger.warn("possible destructive command [" + cmdStr + "]", extra=d)
        logger.warn("check config file: system_test/cluster_config.properties", extra=d)
        logger.warn("aborting test...", extra=d)
        sys.exit(1)
    else:
        system_test_utils.sys_call("rm -rf " + testCaseBaseDir + "/config/*")
        system_test_utils.sys_call("rm -rf " + testCaseBaseDir + "/dashboards/*")
        system_test_utils.sys_call("rm -rf " + testCaseBaseDir + "/logs/*")

    for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:

        hostname         = clusterEntityConfigDict["hostname"]
        entityId         = clusterEntityConfigDict["entity_id"]
        role             = clusterEntityConfigDict["role"]
        kafkaHome        = clusterEntityConfigDict["kafka_home"]
        cmdStr           = ""
        dataDir          = ""

        if hostname == "localhost":
            remoteTestCaseBaseDir = testCaseBaseDir
        else:
            remoteTestCaseBaseDir = replace_kafka_home(testCaseBaseDir, kafkaHome)

        logger.info("cleaning up data dir on host: [" + hostname + "]", extra=d)

        if role == 'zookeeper':
            dataDir = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "dataDir")
        elif role == 'broker':
            dataDir = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "log.dir")
        else:
            logger.info("skipping role [" + role + "] on host : [" + hostname + "]", extra=d)
            continue

        cmdStr  = "ssh " + hostname + " 'rm -rf " + dataDir + "'"

        if not dataDir.startswith("/tmp"):
            logger.warn("possible destructive command [" + cmdStr + "]", extra=d)
            logger.warn("check config file: system_test/cluster_config.properties", extra=d)
            logger.warn("aborting test...", extra=d)
            sys.exit(1)

        # ============================
        # cleaning data dir
        # ============================
        logger.debug("executing command [" + cmdStr + "]", extra=d)
        system_test_utils.sys_call(cmdStr)

        # ============================
        # cleaning log/metrics/svg, ...
        # ============================
        if system_test_utils.remote_host_file_exists(hostname, kafkaHome + "/bin/kafka-run-class.sh"):
            # so kafkaHome is a real kafka installation
            cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*.log' | xargs rm 2> /dev/null\""
            logger.debug("executing command [" + cmdStr + "]", extra=d)
            system_test_utils.sys_call(cmdStr)

            cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*_pid' | xargs rm 2> /dev/null\""
            logger.debug("executing command [" + cmdStr + "]", extra=d)
            system_test_utils.sys_call(cmdStr)

            cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*.csv' | xargs rm 2> /dev/null\""
            logger.debug("executing command [" + cmdStr + "]", extra=d)
            system_test_utils.sys_call(cmdStr)

            cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*.svg' | xargs rm 2> /dev/null\""
            logger.debug("executing command [" + cmdStr + "]", extra=d)
            system_test_utils.sys_call(cmdStr)

            cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*.html' | xargs rm 2> /dev/null\""
            logger.debug("executing command [" + cmdStr + "]", extra=d)
            system_test_utils.sys_call(cmdStr)

def replace_kafka_home(systemTestSubDirPath, kafkaHome):
    matchObj = re.match(".*(\/system_test\/.*)$", systemTestSubDirPath)
    relativeSubDirPath = matchObj.group(1)
    return kafkaHome + relativeSubDirPath

def get_entity_log_directory(testCaseBaseDir, entity_id, role):
    return testCaseBaseDir + "/logs/" + role + "-" + entity_id

def get_entities_for_role(clusterConfig, role):
    return filter(lambda entity: entity['role'] == role, clusterConfig)

def stop_consumer():
    system_test_utils.sys_call("ps -ef | grep ConsoleConsumer | grep -v grep | tr -s ' ' | cut -f2 -d' ' | xargs kill -15")

def ps_grep_terminate_running_entity(systemTestEnv):
    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
    username = getpass.getuser()

    for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
        hostname = clusterEntityConfigDict["hostname"]
        cmdList  = ["ssh " + hostname,
                    "\"ps auxw | grep -v grep | grep -v Bootstrap | grep -v vim | grep ^" + username,
                    "| grep -i 'java\|server\-start\|run\-\|producer\|consumer\|jmxtool' | grep kafka",
                    "| tr -s ' ' | cut -f2 -d ' ' | xargs kill -9" + "\""]

        cmdStr = " ".join(cmdList)
        logger.debug("executing command [" + cmdStr + "]", extra=d)

        system_test_utils.sys_call(cmdStr) 

def get_reelection_latency(systemTestEnv, testcaseEnv, leaderDict, leaderAttributesDict):
    leaderEntityId = None
    leaderBrokerId = None
    leaderPPid     = None
    shutdownLeaderTimestamp = None
    leaderReElectionLatency = -1

    if testcaseEnv.validationStatusDict["Validate leader election successful"] == "FAILED":
        # leader election is not successful - something is wrong => so skip this testcase
        return None
    else:
        # leader elected => stop leader
        try:
            leaderEntityId = leaderDict["entity_id"]
            leaderBrokerId = leaderDict["brokerid"]
            leaderPPid     = testcaseEnv.entityBrokerParentPidDict[leaderEntityId]
        except:
            logger.info("leader details unavailable", extra=d)
            raise

        logger.info("stopping leader in entity "+leaderEntityId+" with pid "+leaderPPid, extra=d)
        signalType = None
        try:
            signalType = testcaseEnv.testcaseArgumentsDict["signal_type"]
        except:
            pass

        if signalType is None or signalType.lower() == "sigterm":
            stop_remote_entity(systemTestEnv, leaderEntityId, leaderPPid)
        elif signalType.lower() == "sigkill":
            stop_remote_entity(systemTestEnv, leaderEntityId, leaderPPid, "SIGKILL")
        else:
            logger.error("Unsupported signal type: " + signalType, extra=d)
            raise Exception("Unsupported signal type: " + signalType)

    logger.info("sleeping for 10s for leader re-election to complete", extra=d)
    time.sleep(10)

    # get broker shut down completed timestamp
    shutdownBrokerDict = get_broker_shutdown_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict)
    shutdownTimestamp  = -1

    try:
        shutdownTimestamp = shutdownBrokerDict["timestamp"]
        logger.debug("unix timestamp of shut down completed: " + str("{0:.6f}".format(shutdownTimestamp)), extra=d)
    except:
        logger.warn("unable to find broker shut down timestamp", extra=d)

    logger.info("looking up new leader", extra=d)
    leaderDict2 = get_leader_elected_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict)
    logger.debug("unix timestamp of new elected leader: " + str("{0:.6f}".format(leaderDict2["timestamp"])), extra=d)

    if shutdownTimestamp > 0:
        leaderReElectionLatency = float(leaderDict2["timestamp"]) - float(shutdownTimestamp)
        logger.info("leader Re-election Latency: " + str(leaderReElectionLatency) + " sec", extra=d)
 
    return leaderReElectionLatency


def stop_all_remote_running_processes(systemTestEnv, testcaseEnv):

    entityConfigs = systemTestEnv.clusterEntityConfigDictList

    for hostname, producerPPid in testcaseEnv.producerHostParentPidDict.items():
        producerEntityId = system_test_utils.get_data_by_lookup_keyval(entityConfigs, "hostname", hostname, "entity_id")
        stop_remote_entity(systemTestEnv, producerEntityId, producerPPid)

    for hostname, consumerPPid in testcaseEnv.consumerHostParentPidDict.items():
        consumerEntityId = system_test_utils.get_data_by_lookup_keyval(entityConfigs, "hostname", hostname, "entity_id")
        stop_remote_entity(systemTestEnv, consumerEntityId, consumerPPid)

    for entityId, jmxParentPidList in testcaseEnv.entityJmxParentPidDict.items():
        for jmxParentPid in jmxParentPidList:
            stop_remote_entity(systemTestEnv, entityId, jmxParentPid)

    for entityId, mirrorMakerParentPid in testcaseEnv.entityMirrorMakerParentPidDict.items():
        stop_remote_entity(systemTestEnv, entityId, mirrorMakerParentPid)

    for entityId, brokerParentPid in testcaseEnv.entityBrokerParentPidDict.items():
        stop_remote_entity(systemTestEnv, entityId, brokerParentPid)

    for entityId, zkParentPid in testcaseEnv.entityZkParentPidDict.items():
        stop_remote_entity(systemTestEnv, entityId, zkParentPid)


def start_migration_tool(systemTestEnv, testcaseEnv, onlyThisEntityId=None):
    clusterConfigList = systemTestEnv.clusterEntityConfigDictList
    migrationToolConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigList, "role", "migration_tool")

    for migrationToolConfig in migrationToolConfigList:

        entityId = migrationToolConfig["entity_id"]

        if onlyThisEntityId is None or entityId == onlyThisEntityId:

            host              = migrationToolConfig["hostname"]
            jmxPort           = migrationToolConfig["jmx_port"] 
            role              = migrationToolConfig["role"] 
            kafkaHome         = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "kafka_home")
            javaHome          = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "java_home")
            jmxPort           = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "jmx_port")
            kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"

            logger.info("starting kafka migration tool", extra=d)
            migrationToolLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "migration_tool", entityId, "default")
            migrationToolLogPathName = migrationToolLogPath + "/migration_tool.log"
            testcaseEnv.userDefinedEnvVarDict["migrationToolLogPathName"] = migrationToolLogPathName

            testcaseConfigsList = testcaseEnv.testcaseConfigsList
            numProducers    = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "num.producers")
            numStreams      = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "num.streams")
            producerConfig  = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "producer.config")
            consumerConfig  = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "consumer.config")
            zkClientJar     = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "zkclient.01.jar")
            kafka07Jar      = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "kafka.07.jar")
            whiteList       = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "whitelist")
            logFile         = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "log_filename")

            cmdList = ["ssh " + host,
                       "'JAVA_HOME=" + javaHome,
                       "JMX_PORT=" + jmxPort,
                       kafkaRunClassBin + " kafka.tools.KafkaMigrationTool",
                       "--whitelist="        + whiteList,
                       "--num.producers="    + numProducers,
                       "--num.streams="      + numStreams,
                       "--producer.config="  + systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + producerConfig,
                       "--consumer.config="  + systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + consumerConfig,
                       "--zkclient.01.jar="  + systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + zkClientJar,
                       "--kafka.07.jar="     + systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + kafka07Jar,
                       " &> " + migrationToolLogPath + "/migrationTool.log",
                       " & echo pid:$! > " + migrationToolLogPath + "/entity_" + entityId + "_pid'"]

            cmdStr = " ".join(cmdList)
            logger.debug("executing command: [" + cmdStr + "]", extra=d)
            system_test_utils.async_sys_call(cmdStr)
            time.sleep(5)

            pidCmdStr = "ssh " + host + " 'cat " + migrationToolLogPath + "/entity_" + entityId + "_pid' 2> /dev/null"
            logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
            subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)

            # keep track of the remote entity pid in a dictionary
            for line in subproc.stdout.readlines():
                if line.startswith("pid"):
                    line = line.rstrip('\n')
                    logger.debug("found pid line: [" + line + "]", extra=d)
                    tokens = line.split(':')
                    testcaseEnv.entityMigrationToolParentPidDict[entityId] = tokens[1]


def validate_07_08_migrated_data_matched(systemTestEnv, testcaseEnv):
    validationStatusDict        = testcaseEnv.validationStatusDict
    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList

    prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
    consumerCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer")

    for prodPerfCfg in prodPerfCfgList:
        producerEntityId = prodPerfCfg["entity_id"]
        topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")

        consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( 
                               clusterEntityConfigDictList, "role", "console_consumer", "entity_id")

        matchingConsumerEntityId = None
        for consumerEntityId in consumerEntityIdList:
            consumerTopic = system_test_utils.get_data_by_lookup_keyval(
                            testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
            if consumerTopic in topic:
                matchingConsumerEntityId = consumerEntityId
                break

        if matchingConsumerEntityId is None:
            break

        msgChecksumMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( 
                                                  testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") \
                                                  + "/msg_checksum_missing_in_consumer.log"
        producerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", producerEntityId, "default")
        producerLogPathName = producerLogPath + "/producer_performance.log"

        consumerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", matchingConsumerEntityId, "default")
        consumerLogPathName = consumerLogPath + "/console_consumer.log"

        producerMsgChecksumList      = get_message_checksum(producerLogPathName)
        consumerMsgChecksumList      = get_message_checksum(consumerLogPathName)
        producerMsgChecksumSet       = set(producerMsgChecksumList)
        consumerMsgChecksumSet       = set(consumerMsgChecksumList)
        producerMsgChecksumUniqList  = list(producerMsgChecksumSet)
        consumerMsgChecksumUniqList  = list(consumerMsgChecksumSet)

        missingMsgChecksumInConsumer = producerMsgChecksumSet - consumerMsgChecksumSet

        logger.debug("size of producerMsgChecksumList      : " + str(len(producerMsgChecksumList)), extra=d)
        logger.debug("size of consumerMsgChecksumList      : " + str(len(consumerMsgChecksumList)), extra=d)
        logger.debug("size of producerMsgChecksumSet       : " + str(len(producerMsgChecksumSet)), extra=d)
        logger.debug("size of consumerMsgChecksumSet       : " + str(len(consumerMsgChecksumSet)), extra=d)
        logger.debug("size of producerMsgChecksumUniqList  : " + str(len(producerMsgChecksumUniqList)), extra=d)
        logger.debug("size of consumerMsgChecksumUniqList  : " + str(len(consumerMsgChecksumUniqList)), extra=d)
        logger.debug("size of missingMsgChecksumInConsumer : " + str(len(missingMsgChecksumInConsumer)), extra=d)

        outfile = open(msgChecksumMissingInConsumerLogPathName, "w")
        for id in missingMsgChecksumInConsumer:
            outfile.write(id + "\n")
        outfile.close()

        logger.info("no. of messages on topic [" + topic + "] sent from producer          : " + str(len(producerMsgChecksumList)), extra=d)
        logger.info("no. of messages on topic [" + topic + "] received by consumer        : " + str(len(consumerMsgChecksumList)), extra=d)
        logger.info("no. of unique messages on topic [" + topic + "] sent from producer   : " + str(len(producerMsgChecksumUniqList)),  extra=d)
        logger.info("no. of unique messages on topic [" + topic + "] received by consumer : " + str(len(consumerMsgChecksumUniqList)),  extra=d)
        validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(list(producerMsgChecksumSet)))
        validationStatusDict["Unique messages from consumer on [" + topic + "]"] = str(len(list(consumerMsgChecksumSet)))

        if ( len(producerMsgChecksumList) > 0 and len(list(producerMsgChecksumSet)) == len(list(consumerMsgChecksumSet))):
            validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
        else:
            validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED"
            logger.info("See " + msgChecksumMissingInConsumerLogPathName + " for missing MessageID", extra=d)

def validate_broker_log_segment_checksum(systemTestEnv, testcaseEnv, clusterName="source"):

    anonLogger.info("================================================")
    anonLogger.info("validating merged broker log segment checksums")
    anonLogger.info("================================================")

    brokerLogCksumDict   = {}
    testCaseBaseDir      = testcaseEnv.testCaseBaseDir
    tcConfigsList        = testcaseEnv.testcaseConfigsList
    validationStatusDict = testcaseEnv.validationStatusDict
    clusterConfigList    = systemTestEnv.clusterEntityConfigDictList
    #brokerEntityIdList   = system_test_utils.get_data_from_list_of_dicts(clusterConfigList, "role", "broker", "entity_id")
    allBrokerConfigList  = system_test_utils.get_dict_from_list_of_dicts(clusterConfigList, "role", "broker")
    brokerEntityIdList   = system_test_utils.get_data_from_list_of_dicts(allBrokerConfigList, "cluster_name", clusterName, "entity_id")

    # loop through all brokers
    for brokerEntityId in brokerEntityIdList:
        logCksumDict   = {}
        # remoteLogSegmentPathName : /tmp/kafka_server_4_logs
        # => remoteLogSegmentDir   : kafka_server_4_logs
        remoteLogSegmentPathName = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", brokerEntityId, "log.dir")
        remoteLogSegmentDir      = os.path.basename(remoteLogSegmentPathName)
        logPathName              = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
        localLogSegmentPath      = logPathName + "/" + remoteLogSegmentDir

        # localLogSegmentPath :
        # .../system_test/mirror_maker_testsuite/testcase_5002/logs/broker-4/kafka_server_4_logs
        #   |- test_1-0
        #        |- 00000000000000000000.index
        #        |- 00000000000000000000.log
        #        |- 00000000000000000020.index
        #        |- 00000000000000000020.log
        #        |- . . .
        #   |- test_1-1
        #        |- 00000000000000000000.index
        #        |- 00000000000000000000.log
        #        |- 00000000000000000020.index
        #        |- 00000000000000000020.log
        #        |- . . .

        # loop through all topicPartition directories such as : test_1-0, test_1-1, ... 
        for topicPartition in os.listdir(localLogSegmentPath):
            # found a topic-partition directory
            if os.path.isdir(localLogSegmentPath + "/" + topicPartition):
                # md5 hasher
                m = hashlib.md5()

                # logSegmentKey is like this : kafka_server_9_logs:test_1-0 (delimited by ':')
                logSegmentKey = remoteLogSegmentDir + ":" + topicPartition

                # log segment files are located in : localLogSegmentPath + "/" + topicPartition
                # sort the log segment files under each topic-partition and get the md5 checksum
                for logFile in sorted(os.listdir(localLogSegmentPath + "/" + topicPartition)):
                    # only process log file: *.log
                    if logFile.endswith(".log"):
                        # read the log segment file as binary
                        offsetLogSegmentPathName = localLogSegmentPath + "/" + topicPartition + "/" + logFile
                        fin = file(offsetLogSegmentPathName, 'rb')
                        # keep reading 64K max at a time
                        while True:
                            data = fin.read(65536)
                            if not data:
                                fin.close()
                                break
                            # update it into the hasher
                            m.update(data)

                # update the md5 checksum into brokerLogCksumDict with the corresponding key
                brokerLogCksumDict[logSegmentKey] = m.hexdigest()

    # print it out to the console for reference
    pprint.pprint(brokerLogCksumDict)

    # brokerLogCksumDict will look like this:
    # {
    #   'kafka_server_1_logs:tests_1-0': 'd41d8cd98f00b204e9800998ecf8427e',
    #   'kafka_server_1_logs:tests_1-1': 'd41d8cd98f00b204e9800998ecf8427e',
    #   'kafka_server_1_logs:tests_2-0': 'd41d8cd98f00b204e9800998ecf8427e',
    #   'kafka_server_1_logs:tests_2-1': 'd41d8cd98f00b204e9800998ecf8427e',
    #   'kafka_server_2_logs:tests_1-0': 'd41d8cd98f00b204e9800998ecf8427e',
    #   'kafka_server_2_logs:tests_1-1': 'd41d8cd98f00b204e9800998ecf8427e',
    #   'kafka_server_2_logs:tests_2-0': 'd41d8cd98f00b204e9800998ecf8427e',
    #   'kafka_server_2_logs:tests_2-1': 'd41d8cd98f00b204e9800998ecf8427e'
    # }

    checksumDict = {}
    # organize the checksum according to their topic-partition and checksumDict will look like this:
    # {
    #   'test_1-0' : ['d41d8cd98f00b204e9800998ecf8427e','d41d8cd98f00b204e9800998ecf8427e'],
    #   'test_1-1' : ['d41d8cd98f00b204e9800998ecf8427e','d41d8cd98f00b204e9800998ecf8427e'],
    #   'test_2-0' : ['d41d8cd98f00b204e9800998ecf8427e','d41d8cd98f00b204e9800998ecf8427e'],
    #   'test_2-1' : ['d41d8cd98f00b204e9800998ecf8427e','d41d8cd98f00b204e9800998ecf8427e']
    # }
  
    for brokerTopicPartitionKey, md5Checksum in brokerLogCksumDict.items():
        tokens = brokerTopicPartitionKey.split(":")
        brokerKey      = tokens[0]
        topicPartition = tokens[1]
        if topicPartition in checksumDict:
            # key already exist
            checksumDict[topicPartition].append(md5Checksum)
        else:
            # new key => create a new list to store checksum
            checksumDict[topicPartition] = []
            checksumDict[topicPartition].append(md5Checksum)

    failureCount = 0

    # loop through checksumDict: the checksums should be the same inside each
    # topic-partition's list. Otherwise, checksum mismatched is detected
    for topicPartition, checksumList in checksumDict.items():
        checksumSet = frozenset(checksumList)
        if len(checksumSet) > 1:
            failureCount += 1
            logger.error("merged log segment checksum in " + topicPartition + " mismatched", extra=d)
        elif len(checksumSet) == 1:
            logger.debug("merged log segment checksum in " + topicPartition + " matched", extra=d)
        else:
            logger.error("unexpected error in " + topicPartition, extra=d)
            
    if failureCount == 0:
        validationStatusDict["Validate for merged log segment checksum in cluster [" + clusterName + "]"] = "PASSED"
    else:
        validationStatusDict["Validate for merged log segment checksum in cluster [" + clusterName + "]"] = "FAILED"

def start_simple_consumer(systemTestEnv, testcaseEnv, minStartingOffsetDict=None):

    clusterList        = systemTestEnv.clusterEntityConfigDictList
    consumerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterList, "role", "console_consumer")
    for consumerConfig in consumerConfigList:
        host              = consumerConfig["hostname"]
        entityId          = consumerConfig["entity_id"]
        jmxPort           = consumerConfig["jmx_port"] 
        clusterName       = consumerConfig["cluster_name"] 
        kafkaHome         = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "kafka_home")
        javaHome          = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "java_home")
        kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"
        consumerLogPath   = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default")

        if host != "localhost":
            consumerLogPath = replace_kafka_home(consumerLogPath, kafkaHome)

        # testcase configurations:
        testcaseList = testcaseEnv.testcaseConfigsList
        topic = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "topic")

        brokerListStr = ""
        if clusterName == "source":
            brokerListStr  = testcaseEnv.userDefinedEnvVarDict["sourceBrokerList"]
        elif clusterName == "target":
            brokerListStr  = testcaseEnv.userDefinedEnvVarDict["targetBrokerList"]
        else:
            logger.error("Invalid cluster name : " + clusterName, extra=d)
            raise Exception("Invalid cluster name : " + clusterName)

        if len(brokerListStr) == 0:
            logger.error("Empty broker list str", extra=d)
            raise Exception("Empty broker list str")

        numPartitions = None
        try:
            numPartitions = testcaseEnv.testcaseArgumentsDict["num_partition"]
        except:
            pass

        if numPartitions is None:
            logger.error("Invalid no. of partitions: " + numPartitions, extra=d)
            raise Exception("Invalid no. of partitions: " + numPartitions)
        else:
            numPartitions = int(numPartitions)

        replicaIndex   = 1
        startingOffset = -2
        brokerPortList = brokerListStr.split(',')
        for brokerPort in brokerPortList:

            partitionId = 0
            while (partitionId < numPartitions):
                logger.info("starting debug consumer for replica on [" + brokerPort + "] partition [" + str(partitionId) + "]", extra=d)

                if minStartingOffsetDict is not None:
                    topicPartition = topic + "-" + str(partitionId)
                    startingOffset = minStartingOffsetDict[topicPartition]

                outputFilePathName = consumerLogPath + "/simple_consumer_" + topic + "-" + str(partitionId) + "_r" + str(replicaIndex) + ".log"
                brokerPortLabel = brokerPort.replace(":", "_")
                cmdList = ["ssh " + host,
                           "'JAVA_HOME=" + javaHome,
                           kafkaRunClassBin + " kafka.tools.SimpleConsumerShell",
                           "--broker-list " + brokerListStr,
                           "--topic " + topic,
                           "--partition " + str(partitionId),
                           "--replica " + str(replicaIndex),
                           "--offset " + str(startingOffset),
                           "--no-wait-at-logend ",
                           " > " + outputFilePathName,
                           " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"]
    
                cmdStr = " ".join(cmdList)
    
                logger.debug("executing command: [" + cmdStr + "]", extra=d)
                subproc_1 = system_test_utils.sys_call_return_subproc(cmdStr)
                # dummy for-loop to wait until the process is completed
                for line in subproc_1.stdout.readlines():
                    pass 
                time.sleep(1)
   
                partitionId += 1
            replicaIndex += 1

def validate_simple_consumer_data_matched(systemTestEnv, testcaseEnv):
    validationStatusDict        = testcaseEnv.validationStatusDict
    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList

    prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
    consumerCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer")

    mismatchCount = 0

    for prodPerfCfg in prodPerfCfgList:
        producerEntityId = prodPerfCfg["entity_id"]
        topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
        acks  = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks")
        logger.debug("request-num-acks [" + acks + "]", extra=d)

        consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
                           clusterEntityConfigDictList, "role", "console_consumer", "entity_id")

        matchingConsumerEntityId = None
        for consumerEntityId in consumerEntityIdList:
            consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
            if consumerTopic in topic:
                matchingConsumerEntityId = consumerEntityId
                break

        if matchingConsumerEntityId is None:
            break

        producerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", producerEntityId, "default")
        producerLogPathName = producerLogPath + "/producer_performance.log"
        producerMsgIdList   = get_message_id(producerLogPathName)
        producerMsgIdSet    = set(producerMsgIdList)
        logger.info("no. of unique messages on topic [" + topic + "] sent from publisher  : " + str(len(producerMsgIdSet)), extra=d)
        validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgIdSet))

        consumerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", matchingConsumerEntityId, "default")
        for logFile in sorted(os.listdir(consumerLogPath)):
            # only process log file: *.log
            if logFile.endswith(".log"):
                consumerLogPathName = consumerLogPath + "/" + logFile
                consumerMsgIdList   = get_message_id(consumerLogPathName)
                consumerMsgIdSet   = set(consumerMsgIdList)
                missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet
                msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( 
                    testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") + \
                    "/" + logFile + "_msg_id_missing_in_consumer.log"

                outfile = open(msgIdMissingInConsumerLogPathName, "w")
                for id in missingMsgIdInConsumer:
                    outfile.write(id + "\n")
                outfile.close()

                logger.info("no. of unique messages on topic [" + topic + "] at " + logFile + " : " + str(len(consumerMsgIdSet)), extra=d)
                validationStatusDict["Unique messages from consumer on [" + topic + "] at " + logFile] = str(len(consumerMsgIdSet))

                if acks == "-1" and len(missingMsgIdInConsumer) > 0:
                    mismatchCount += 1
                elif acks == "1" and len(missingMsgIdInConsumer) > 0:
                    missingPercentage = len(missingMsgIdInConsumer) * 100 / len(producerMsgIdSet)
                    logger.debug("missing percentage [" + str(missingPercentage) + "]", extra=d)
                    if missingPercentage <= 1:
                        logger.warn("Test case (acks == 1) passes with < 1% data loss : [" + \
                            str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d)
                    else:
                        mismatchCount += 1

        if mismatchCount == 0:
            validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
        else:
            validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED"

def get_controller_attributes(systemTestEnv, testcaseEnv):

    logger.info("Querying Zookeeper for Controller info ...", extra=d)

    # keep track of controller data in this dict such as broker id & entity id
    controllerDict = {} 

    clusterConfigsList = systemTestEnv.clusterEntityConfigDictList
    tcConfigsList      = testcaseEnv.testcaseConfigsList

    zkDictList         = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper")
    firstZkDict        = zkDictList[0]
    hostname           = firstZkDict["hostname"]
    zkEntityId         = firstZkDict["entity_id"]
    clientPort         = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", zkEntityId, "clientPort")
    kafkaHome          = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "kafka_home")
    javaHome           = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "java_home")
    kafkaRunClassBin   = kafkaHome + "/bin/kafka-run-class.sh"

    cmdStrList = ["ssh " + hostname,
                  "\"JAVA_HOME=" + javaHome,
                  kafkaRunClassBin + " org.apache.zookeeper.ZooKeeperMain",
                  "-server " + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"],
                  "'get /controller' 2> /dev/null | tail -1\""]

    cmdStr = " ".join(cmdStrList)
    logger.debug("executing command [" + cmdStr + "]", extra=d)
    subproc = system_test_utils.sys_call_return_subproc(cmdStr)
    for line in subproc.stdout.readlines():
        brokerid = line.rstrip('\n')
        controllerDict["brokerid"]  = brokerid
        controllerDict["entity_id"] = system_test_utils.get_data_by_lookup_keyval(
                                          tcConfigsList, "brokerid", brokerid, "entity_id")
    return controllerDict

def getMinCommonStartingOffset(systemTestEnv, testcaseEnv, clusterName="source"):

    brokerLogStartOffsetDict = {}
    minCommonStartOffsetDict = {}

    tcConfigsList        = testcaseEnv.testcaseConfigsList
    clusterConfigList    = systemTestEnv.clusterEntityConfigDictList
    allBrokerConfigList  = system_test_utils.get_dict_from_list_of_dicts(clusterConfigList, "role", "broker")
    brokerEntityIdList   = system_test_utils.get_data_from_list_of_dicts(allBrokerConfigList, "cluster_name", clusterName, "entity_id")

    # loop through all brokers
    for brokerEntityId in sorted(brokerEntityIdList):
        # remoteLogSegmentPathName : /tmp/kafka_server_4_logs
        # => remoteLogSegmentDir   : kafka_server_4_logs
        remoteLogSegmentPathName = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", brokerEntityId, "log.dir")
        remoteLogSegmentDir      = os.path.basename(remoteLogSegmentPathName)
        logPathName              = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
        localLogSegmentPath      = logPathName + "/" + remoteLogSegmentDir

        # loop through all topicPartition directories such as : test_1-0, test_1-1, ... 
        for topicPartition in sorted(os.listdir(localLogSegmentPath)):
            # found a topic-partition directory
            if os.path.isdir(localLogSegmentPath + "/" + topicPartition):

                # startingOffsetKey : <brokerEntityId>:<topicPartition>  (eg. 1:test_1-0)
                startingOffsetKey = brokerEntityId + ":" + topicPartition

                # log segment files are located in : localLogSegmentPath + "/" + topicPartition
                # sort the log segment files under each topic-partition
                for logFile in sorted(os.listdir(localLogSegmentPath + "/" + topicPartition)):

                    # logFile is located at:
                    # system_test/xxxx_testsuite/testcase_xxxx/logs/broker-1/kafka_server_1_logs/test_1-0/00000000000000003800.log
                    if logFile.endswith(".log"):
                        matchObj = re.match("0*(.*)\.log", logFile)    # remove the leading zeros & the file extension
                        startingOffset = matchObj.group(1)             # this is the starting offset from the file name
                        if len(startingOffset) == 0:                   # when log filename is: 00000000000000000000.log
                            startingOffset = "0"

                        # starting offset of a topic-partition can be retrieved from the filename of the first log segment
                        # => break out of this innest for-loop after processing the first log segment file
                        brokerLogStartOffsetDict[startingOffsetKey] = startingOffset
                        break

    # brokerLogStartOffsetDict is like this:
    # {u'1:test_1-0': u'400',
    #  u'1:test_1-1': u'400',
    #  u'1:test_2-0': u'200',
    #  u'1:test_2-1': u'200',
    #  u'2:test_1-0': u'400',
    #  u'2:test_1-1': u'400',
    #  u'2:test_2-0': u'200',
    #  u'2:test_2-1': u'200',
    #  u'3:test_1-0': '0',
    #  u'3:test_1-1': '0',
    #  u'3:test_2-0': '0',
    #  u'3:test_2-1': '0'}

    # loop through brokerLogStartOffsetDict to get the min common starting offset for each topic-partition    
    for brokerTopicPartition in sorted(brokerLogStartOffsetDict.iterkeys()):
        topicPartition = brokerTopicPartition.split(':')[1]

        if topicPartition in minCommonStartOffsetDict:
            # key exists => if the new value is greater, replace the existing value with new
            if minCommonStartOffsetDict[topicPartition] < brokerLogStartOffsetDict[brokerTopicPartition]:
                minCommonStartOffsetDict[topicPartition] = brokerLogStartOffsetDict[brokerTopicPartition]
        else:
            # key doesn't exist => add it to the dictionary
            minCommonStartOffsetDict[topicPartition] = brokerLogStartOffsetDict[brokerTopicPartition]

    # returning minCommonStartOffsetDict which is like this:
    # {u'test_1-0': u'400',
    #  u'test_1-1': u'400',
    #  u'test_2-0': u'200',
    #  u'test_2-1': u'200'}
    return minCommonStartOffsetDict

def validate_simple_consumer_data_matched_across_replicas(systemTestEnv, testcaseEnv):
    validationStatusDict        = testcaseEnv.validationStatusDict
    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
    consumerEntityIdList        = system_test_utils.get_data_from_list_of_dicts(
                                  clusterEntityConfigDictList, "role", "console_consumer", "entity_id")
    replicaFactor               = testcaseEnv.testcaseArgumentsDict["replica_factor"]
    numPartition                = testcaseEnv.testcaseArgumentsDict["num_partition"]

    # Unique messages from producer on [test_1]  :  1500
    # Unique messages from consumer on [test_1]  :  1500

    # Unique messages from consumer on [test_1] at simple_consumer_test_1-0_r1.log  :  750
    # Unique messages from consumer on [test_1] at simple_consumer_test_1-0_r2.log  :  750
    # Unique messages from consumer on [test_1] at simple_consumer_test_1-0_r3.log  :  0

    # Unique messages from consumer on [test_1] at simple_consumer_test_1-1_r1.log  :  0
    # Unique messages from consumer on [test_1] at simple_consumer_test_1-1_r2.log  :  750
    # Unique messages from consumer on [test_1] at simple_consumer_test_1-1_r3.log  :  750

    # ==================================================

    # Unique messages from producer on [test_2]  :  1000
    # Unique messages from consumer on [test_2]  :  1000

    # Unique messages from consumer on [test_2] at simple_consumer_test_2-0_r1.log  :  500
    # Unique messages from consumer on [test_2] at simple_consumer_test_2-0_r2.log  :  0
    # Unique messages from consumer on [test_2] at simple_consumer_test_2-0_r3.log  :  500

    # Unique messages from consumer on [test_2] at simple_consumer_test_2-1_r1.log  :  500
    # Unique messages from consumer on [test_2] at simple_consumer_test_2-1_r2.log  :  500
    # Unique messages from consumer on [test_2] at simple_consumer_test_2-1_r3.log  :  0

    mismatchCounter = 0
    for consumerEntityId in consumerEntityIdList:

        topic           = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
        consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", consumerEntityId, "default")

        replicaIdxMsgCountDictList = []
        # replicaIdxMsgCountDictList is being used as follows:
        #
        # the above replica message count will be organized as follows:
        # index of the list would map to the partitionId
        # each element in the list maps to the replicaIdx-MessageCount
        # to validate that :
        # 1. there should be "no. of broker" of non-zero message count and they are equal
        # 2. there should be "no. of broker - replication factor" of zero count
        # [{"1": "750", "2": "750", "3": "0"  },
        #  {"1": "0"  , "2": "750", "3": "750"}]

        j = 0
        while j < int(numPartition):
            newDict = {}
            replicaIdxMsgCountDictList.append(newDict)
            j += 1

        for logFile in sorted(os.listdir(consumerLogPath)):

            if logFile.startswith("simple_consumer_") and logFile.endswith(".log"):
                matchObj    = re.match("simple_consumer_"+topic+"-(\d*)_r(\d*)\.log" , logFile)
                partitionId = int(matchObj.group(1))
                replicaIdx  = int(matchObj.group(2))

                consumerLogPathName   = consumerLogPath + "/" + logFile
                consumerMsgIdList     = get_message_id(consumerLogPathName)
                consumerMsgIdSet      = set(consumerMsgIdList)

                replicaIdxMsgCountDictList[partitionId][replicaIdx] = len(consumerMsgIdSet)

                logger.info("no. of unique messages on topic [" + topic + "] at " + logFile + " : " + str(len(consumerMsgIdSet)), extra=d)
                validationStatusDict["Unique messages from consumer on [" + topic + "] at " + logFile] = str(len(consumerMsgIdSet))

        pprint.pprint(replicaIdxMsgCountDictList)

        partitionId = 0
        while partitionId < int(numPartition):
            zeroMsgCounter    = 0
            nonZeroMsgCounter = 0
            nonZeroMsgValue   = -1

            for replicaIdx in sorted(replicaIdxMsgCountDictList[partitionId].iterkeys()):
                if replicaIdxMsgCountDictList[partitionId][int(replicaIdx)] == 0:
                    zeroMsgCounter += 1
                else:
                    if nonZeroMsgValue == -1:
                        nonZeroMsgValue = replicaIdxMsgCountDictList[partitionId][int(replicaIdx)]
                    else:
                        if nonZeroMsgValue != replicaIdxMsgCountDictList[partitionId][int(replicaIdx)]:
                            mismatchCounter += 1
                    nonZeroMsgCounter += 1
            partitionId += 1

            logger.info("topic " + topic + " : no. of brokers with zero msg count     : " + str(zeroMsgCounter), extra=d)
            logger.info("topic " + topic + " : no. of brokers with non-zero msg count : " + str(nonZeroMsgCounter), extra=d)
            logger.info("topic " + topic + " : non-zero brokers msg count             : " + str(nonZeroMsgValue), extra=d)

        if mismatchCounter == 0 and nonZeroMsgCounter > 0:
            validationStatusDict["Validate for data matched on topic [" + topic + "] across replicas"] = "PASSED"
        else:
            validationStatusDict["Validate for data matched on topic [" + topic + "] across replicas"] = "FAILED"


def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTestEnv, testcaseEnv):
    validationStatusDict        = testcaseEnv.validationStatusDict
    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList

    prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
    consumerCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer")

    for prodPerfCfg in prodPerfCfgList:
        producerEntityId = prodPerfCfg["entity_id"]
        topicStr = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
        acks     = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks")

        consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer", "entity_id")

        matchingConsumerEntityId = None
        for consumerEntityId in consumerEntityIdList:
            consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
            if consumerTopic in topicStr:
                matchingConsumerEntityId = consumerEntityId
                break

        if matchingConsumerEntityId is None:
            break

        producerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", producerEntityId, "default")
        producerLogPathName = producerLogPath + "/producer_performance.log"

        consumerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", matchingConsumerEntityId, "default")
        consumerLogPathName = consumerLogPath + "/console_consumer.log"

        topicList = topicStr.split(',')
        for topic in topicList:
            msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( 
                                                testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") \
                                                + "/msg_id_missing_in_consumer_" + topic + ".log"
            producerMsgIdList  = get_message_id(producerLogPathName, topic)
            consumerMsgIdList  = get_message_id(consumerLogPathName, topic)
            producerMsgIdSet   = set(producerMsgIdList)
            consumerMsgIdSet   = set(consumerMsgIdList)

            missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet

            outfile = open(msgIdMissingInConsumerLogPathName, "w")
            for id in missingMsgIdInConsumer:
                outfile.write(id + "\n")
            outfile.close()

            logger.info("no. of unique messages on topic [" + topic + "] sent from publisher  : " + str(len(producerMsgIdSet)), extra=d)
            logger.info("no. of unique messages on topic [" + topic + "] received by consumer : " + str(len(consumerMsgIdSet)), extra=d)
            validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgIdSet))
            validationStatusDict["Unique messages from consumer on [" + topic + "]"] = str(len(consumerMsgIdSet))

            if ( len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 0 ):
                validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
            elif (acks == "1"):
                missingPercentage = len(missingMsgIdInConsumer) * 100 / len(producerMsgIdSet)
                print "#### missing Percent : ", missingPercentage
                if missingPercentage <= 1:
                    validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
                    logger.warn("Test case passes with less than 1% data loss : [" + str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d)
            else:
                validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED"
                logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d)



