diff --git a/spec/classes/broker_spec.rb b/spec/classes/broker_spec.rb index a617f89..f514266 100644 --- a/spec/classes/broker_spec.rb +++ b/spec/classes/broker_spec.rb @@ -1,153 +1,173 @@ require 'spec_helper' require 'shared_examples_param_validation' describe 'kafka::broker', type: :class do let :facts do { osfamily: 'Debian', os: { family: 'Debian' }, operatingsystem: 'Ubuntu', operatingsystemrelease: '14.04', lsbdistcodename: 'trusty', architecture: 'amd64', service_provider: 'upstart' } end let :common_params do { config: { 'zookeeper.connect' => 'localhost:2181' } } end let :params do common_params end it { is_expected.to contain_class('kafka::broker::install').that_comes_before('Class[kafka::broker::config]') } it { is_expected.to contain_class('kafka::broker::config').that_comes_before('Class[kafka::broker::service]') } it { is_expected.to contain_class('kafka::broker::service').that_comes_before('Class[kafka::broker]') } it { is_expected.to contain_class('kafka::broker') } context 'on Debian' do describe 'kafka::broker::install' do context 'defaults' do it { is_expected.to contain_class('kafka') } end end describe 'kafka::broker::config' do context 'defaults' do it { is_expected.to contain_file('/opt/kafka/config/server.properties') } end end describe 'kafka::broker::service' do context 'service_install false' do let :params do common_params.merge(service_install: false) end it { is_expected.not_to contain_file('/etc/init.d/kafka') } it { is_expected.not_to contain_service('kafka') } end context 'defaults' do it { is_expected.to contain_file('/etc/init.d/kafka') } + context 'limit_nofile set' do + let :params do + { + limit_nofile: '65536' + } + end + + it { is_expected.to contain_file('/etc/init.d/kafka').with_content %r{ulimit -n 65536$} } + end + + context 'limit_core set' do + let :params do + { + limit_core: 'infinity' + } + end + + it { is_expected.to contain_file('/etc/init.d/kafka').with_content %r{ulimit -c infinity$} } + end + it { is_expected.to contain_service('kafka') } end end end context 'on Centos' do let :facts do { osfamily: 'RedHat', os: { family: 'RedHat' }, operatingsystem: 'CentOS', operatingsystemrelease: '7', operatingsystemmajrelease: '7', architecture: 'amd64', path: '/usr/local/sbin', service_provider: 'systemd' } end describe 'kafka::broker::install' do context 'defaults' do it { is_expected.to contain_class('kafka') } end end describe 'kafka::broker::config' do context 'defaults' do it { is_expected.to contain_file('/opt/kafka/config/server.properties') } end end describe 'kafka::broker::service' do context 'service_install false' do let :params do common_params.merge(service_install: false) end it { is_expected.not_to contain_file('/etc/systemd/system/kafka.service') } it { is_expected.not_to contain_service('kafka') } end context 'defaults' do it { is_expected.to contain_file('/etc/systemd/system/kafka.service').that_notifies('Exec[systemctl-daemon-reload]') } it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^After=network\.target syslog\.target$} } it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^Wants=network\.target syslog\.target$} } it { is_expected.not_to contain_file('/etc/systemd/system/kafka.service').with_content %r{^LimitNOFILE=} } it { is_expected.not_to contain_file('/etc/systemd/system/kafka.service').with_content %r{^LimitCORE=} } it do is_expected.to contain_file('/etc/init.d/kafka').with( ensure: 'absent' ) end it { is_expected.to contain_exec('systemctl-daemon-reload').that_comes_before('Service[kafka]') } it { is_expected.to contain_service('kafka') } end context 'limit_nofile set' do let :params do { limit_nofile: '65536' } end it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^LimitNOFILE=65536$} } end context 'limit_core set' do let :params do { limit_core: 'infinity' } end it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^LimitCORE=infinity$} } end context 'service_requires set' do let :params do { service_requires: ['dummy.target'] } end it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^After=dummy\.target$} } it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^Wants=dummy\.target$} } end end end it_validates_parameter 'mirror_url' end diff --git a/templates/init.erb b/templates/init.erb index 84459f0..cc899ed 100644 --- a/templates/init.erb +++ b/templates/init.erb @@ -1,153 +1,161 @@ #!/bin/sh # # Init file for Apache Kafka <%= @service_name.split(/-/)[1] and @service_name.split(/-/)[1].capitalize %> # <%- if @osfamily == 'Debian' -%> ### BEGIN INIT INFO # Provides: <%- @service_name -%> # Required-Start: <%= @service_requires.join(' ') %> # Required-Stop: # Default-Start: 2 3 4 5 # Default-Stop: 0 1 6 # X-Interactive: true # Short-Description: Apache Kafka is a distributed publish-subscribe messaging system ### END INIT INFO <%- else -%> # chkconfig: 35 85 15 # description: Apache Kafka is a distributed publish-subscribe messaging system # pidfile: /var/run/<%= @service_name -%>.pid <%- end -%> NAME=<%= @service_name %> <% @environment.sort.map do |k,v| -%> <% unless v.to_s.strip.empty? -%> export <%= k %>="<%= v %>" <% end -%> <% end -%> PID_FILE="/var/run/$NAME.pid" KAFKA_USER=<%= @user %> <%- case @service_name when 'kafka' -%> PGREP_PATTERN=kafka.Kafka DAEMON="<%= @bin_dir %>/kafka-server-start.sh" DAEMON_OPTS="<%= @config_dir %>/server.properties" <%- when 'kafka-consumer' -%> PGREP_PATTERN=kafka.tools.ConsoleConsumer DAEMON="<%= @bin_dir %>/kafka-console-consumer.sh" DAEMON_OPTS="<% @service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%> '<%= v.is_a?(Array) ? v.join(',') : v %>' <% end -%><% end -%>" <%- when 'kafka-mirror' -%> PGREP_PATTERN=kafka.tools.MirrorMaker DAEMON="<%= @bin_dir %>/kafka-run-class.sh" DAEMON_OPTS="kafka.tools.MirrorMaker --consumer.config <%= @config_dir %>/consumer.properties --producer.config <%= @config_dir %>/producer.properties <% @service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%> '<%= v.is_a?(Array) ? v.join(',') : v %>' <% end -%><% end -%>" <%- when 'kafka-producer' -%> PGREP_PATTERN=kafka.tools.ConsoleProducer DAEMON="<%= @bin_dir %>/kafka-console-producer.sh" DAEMON_OPTS="<% @service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%> '<%= v.is_a?(Array) ? v.join(',') : v %>' <% end -%><% end -%>" PRODUCER_INPUT="<%= @input %>" <%- end -%> if [ -f /etc/default/kafka ]; then . /etc/default/kafka fi start() { - ulimit -n 65536 + + <% if @limit_nofile -%> + ulimit -n <%= @limit_nofile %> + <% end -%> + + <% if @limit_core -%> + ulimit -c <%= @limit_core %> + <% end -%> + ulimit -s 10240 - ulimit -c unlimited + if [ -f "$PID_FILE" ]; then PID=`cat "$PID_FILE"` if [ `ps -p "$PID" -o pid= || echo 1` -eq `ps ax | grep -i "$PGREP_PATTERN" | grep -v grep | awk '{print $1}' || echo 2` ] ; then echo "$PID_FILE exists, process is already running" exit 0 fi echo "$PID_FILE exists but the process is not running. Deleting $PID_FILE and re-trying" rm -f -- "$PID_FILE" start return $? fi /bin/su "$KAFKA_USER" -c "KAFKA_JMX_OPTS=\"$KAFKA_JMX_OPTS\" $DAEMON $DAEMON_OPTS<%- if @service_name == 'kafka-producer' -%> $PRODUCER_INPUT<%- end -%> >/dev/null 2>&1 &" sleep 2 PID=`ps ax | grep -i "$PGREP_PATTERN" | grep -v grep | awk '{print $1}'` if [ -z "$PID" ]; then echo "$NAME could not be started" exit 1 fi echo "$PID" > "$PID_FILE"; echo "$NAME started" return 0 } stop() { if ! [ -f "$PID_FILE" ]; then echo -n "$PID_FILE does not exist" if PID=`ps ax | grep -i "$PGREP_PATTERN" | grep -v grep | awk '{print $1}'` ; then echo -n ", but process is running" echo "$PID" > "$PID_FILE" else echo -n ", and process is not running" return 1 fi fi PID=`cat $PID_FILE` kill $PID; rm -f -- "$PID_FILE"; # wait until the process is finished RETRIES=0 MAX_RETRIES=10 while [ ! -z `ps ax | grep -i "$PGREP_PATTERN" | grep -v grep | awk '{print $1}'` ]; do sleep 1 RETRIES=$((RETRIES+1)) if [ "$RETRIES" -ge "$MAX_RETRIES" ]; then echo "$NAME service: stop tried $MAX_RETRIES times but process $PID is still running" return 1 fi done echo "$NAME stopped" return 0 } status() { if ! [ -f "$PID_FILE" ]; then echo "$NAME stopped" exit 1 fi PID=`cat "$PID_FILE"` if ! [ `ps -p "$PID" -o pid= || echo 1` -eq `ps ax | grep -i "$PGREP_PATTERN" | grep -v grep | awk '{print $1}' || echo 2` ] ; then echo "$NAME stopped but pid file exists" exit 1 fi echo "$NAME running with pid $PID" exit 0 } case "$1" in status) status ;; start) echo "Starting daemon: $NAME" start ;; stop) echo "Stopping daemon: $NAME" stop ;; restart) echo "Restarting daemon: $NAME" stop sleep 2 start ;; *) echo "Usage: "$1" {status|start|stop|restart}" exit 1 esac exit 0