DAEMON_OPTS="<% @consumer_service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%>=<%= v.is_a?(Array) ? v.join(',') : v %> <% end -%><% end -%>"
<%- when 'kafka-mirror' -%>
PGREP_PATTERN=kafka.tools.MirrorMaker
DAEMON="<%= @bin_dir %>/kafka-run-class.sh"
-DAEMON_OPTS="kafka.tools.MirrorMaker --consumer.config <%= @config_dir %>/consumer.properties --num.streams <%= @num_streams -%> --producer.config <%= @config_dir %>/producer.properties<%- if (scope.function_versioncmp([scope.lookupvar('kafka::version'), '0.9.0.0']) < 0) -%> --num.producers <%= @num_producers -%><%- end -%><%- if !@whitelist.eql?('') -%> --whitelist='<%= @whitelist -%>'<%- end %><%- if !@blacklist.eql?('') -%> --blacklist='<%= @blacklist -%>'<%- end -%> <%= @abort_on_send_failure_opt %>"
DAEMON_OPTS="<% @producer_service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%>=<%= v.is_a?(Array) ? v.join(',') : v %> <% end -%><% end -%>"
PRODUCER_INPUT="<%= @input %>"
<%- end -%>
# Source /etc/default/kafka into this shell when it exists as a regular file;
# when the file is absent nothing is sourced and the script simply continues.
if [ -f /etc/default/kafka ]; then
. /etc/default/kafka
fi
start() {
ulimit -n 65536
ulimit -s 10240
ulimit -c unlimited
if [ -f "$PID_FILE" ]; then
PID=`cat "$PID_FILE"`
if [ `ps -p "$PID" -o pid= || echo 1` -eq `pgrep -f "$PGREP_PATTERN" || echo 2` ] ; then
echo "$PID_FILE exists, process is already running"
exit 0
fi
echo "$PID_FILE exists but the process is not running. Deleting $PID_FILE and re-trying"
rm -f -- "$PID_FILE"
start
return $?
fi
/bin/su "$KAFKA_USER" -c "KAFKA_JMX_OPTS=\"$KAFKA_JMX_OPTS\" $DAEMON $DAEMON_OPTS<%- if @service_name == 'kafka-producer' -%> $PRODUCER_INPUT<%- end -%> >/dev/null 2>&1 &"
ExecStart=<%= @bin_dir %>/kafka-console-consumer.sh <% @consumer_service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%>=<%= v.is_a?(Array) ? v.join(',') : v %> <% end -%><% end -%>
<%- when 'kafka-mirror' -%>
-ExecStart=<%= @bin_dir %>/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config <%= @config_dir %>/consumer.properties --num.streams <%= @num_streams -%> --producer.config <%= @config_dir %>/producer.properties<%- if (scope.function_versioncmp([scope.lookupvar('kafka::version'), '0.9.0.0']) < 0) -%> --num.producers <%= @num_producers -%><%- end -%><%- if !@whitelist.eql?('') -%> --whitelist='<%= @whitelist -%>'<%- end %><%- if !@blacklist.eql?('') -%> --blacklist='<%= @blacklist -%>'<%- end -%> <%= @abort_on_send_failure_opt %>
+ExecStart=<%= @bin_dir %>/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config <%= @config_dir %>/consumer.properties --num.streams <%= @num_streams -%> --producer.config <%= @config_dir %>/producer.properties<%- if (scope.function_versioncmp([scope.lookupvar('kafka::version'), '0.9.0.0']) < 0) -%> --num.producers <%= @num_producers -%><%- end -%><%- if !@whitelist.eql?('') -%><%- if @consumer_config['bootstrap.servers'] -%> --whitelist '<%= @whitelist -%>'<%- else -%> --whitelist='<%= @whitelist -%>'<%- end -%><%- end -%><%- if !@blacklist.eql?('') -%><%- if @consumer_config['bootstrap.servers'] -%> --blacklist '<%= @blacklist -%>'<%- else -%> --blacklist='<%= @blacklist -%>'<%- end -%><%- end -%> <%= @abort_on_send_failure_opt %>
<%- when 'kafka-producer' -%>
ExecStart=<%= @bin_dir %>/kafka-console-producer.sh <% @producer_service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%>=<%= v.is_a?(Array) ? v.join(',') : v %> <% end -%><% end -%> <%= @input %>