diff --git a/spec/acceptance/broker_spec.rb b/spec/acceptance/broker_spec.rb index 8f9973c..e242c32 100644 --- a/spec/acceptance/broker_spec.rb +++ b/spec/acceptance/broker_spec.rb @@ -1,224 +1,234 @@ require 'spec_helper_acceptance' describe 'kafka::broker' do it 'works with no errors' do pp = <<-EOS class { 'kafka::broker': config => { 'zookeeper.connect' => 'localhost:2181', }, } -> kafka::topic { 'demo': ensure => present, zookeeper => 'localhost:2181', } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe 'kafka::broker::install' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::broker': config => { 'zookeeper.connect' => 'localhost:2181', }, } EOS apply_manifest(pp, catch_failures: true) end describe group('kafka') do it { is_expected.to exist } end describe user('kafka') do it { is_expected.to exist } it { is_expected.to belong_to_group 'kafka' } it { is_expected.to have_login_shell '/bin/bash' } end describe file('/var/tmp/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka-2.12-2.4.1') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka') do it { is_expected.to be_linked_to('/opt/kafka-2.12-2.4.1') } end describe file('/opt/kafka/config') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/var/log/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::broker::config' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::broker': config => { 'zookeeper.connect' => 'localhost:2181', }, } EOS apply_manifest(pp, catch_failures: true) end describe file('/opt/kafka/config/server.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } it { is_expected.to contain 'zookeeper.connect=localhost:2181' } end end context 'with custom config dir' do it 'works with no errors' do pp = <<-EOS class { 'kafka::broker': config => { 'zookeeper.connect' => 'localhost:2181', }, config_dir => '/opt/kafka/custom_config' } EOS apply_manifest(pp, catch_failures: true) end describe file('/opt/kafka/custom_config/server.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } it { is_expected.to contain 'zookeeper.connect=localhost:2181' } end end context 'with specific version' do it 'works with no errors' do pp = <<-EOS class { 'kafka::broker': version => '2.4.0', config => { 'zookeeper.connect' => 'localhost:2181', }, } EOS apply_manifest(pp, catch_failures: true) end describe file('/opt/kafka/config/server.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::broker::service' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::broker': config => { 'zookeeper.connect' => 'localhost:2181', }, } EOS apply_manifest(pp, catch_failures: true) end describe file('/etc/init.d/kafka'), if: (fact('operatingsystemmajrelease') =~ %r{(5|6)} && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } end describe file('/etc/systemd/system/kafka.service'), if: (fact('operatingsystemmajrelease') == '7' && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } end describe service('kafka') do it { is_expected.to be_running } it { is_expected.to be_enabled } end end end describe 'kafka::broker::service' do context 'with log4j/jmx parameters' do it 'works with no errors' do pp = <<-EOS exec { 'create log dir': command => '/bin/mkdir -p /some/path/to/logs', creates => '/some/path/to/logs', } -> class { 'kafka::broker': config => { 'zookeeper.connect' => 'localhost:2181', }, heap_opts => '-Xmx512M -Xmx512M', log4j_opts => '-Dlog4j.configuration=file:/tmp/log4j.properties', jmx_opts => '-Dcom.sun.management.jmxremote', opts => '-Djava.security.policy=/some/path/my.policy', log_dir => '/some/path/to/logs' } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe file('/etc/init.d/kafka'), if: (fact('operatingsystemmajrelease') =~ %r{(5|6)} && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } it { is_expected.to contain 'export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote"' } it { is_expected.to contain 'export KAFKA_HEAP_OPTS="-Xmx512M -Xmx512M"' } it { is_expected.to contain 'export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/tmp/log4j.properties"' } end + describe file('/etc/init.d/kafka'), if: (fact('service_provider') == 'upstart' && fact('osfamily') == 'Debian') do + it { is_expected.to be_file } + it { is_expected.to be_owned_by 'root' } + it { is_expected.to be_grouped_into 'root' } + it { is_expected.to contain %r{^# Provides:\s+kafka$} } + it { is_expected.to contain 'export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote"' } + it { is_expected.to contain 'export KAFKA_HEAP_OPTS="-Xmx512M -Xmx512M"' } + it { is_expected.to contain 'export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/tmp/log4j.properties"' } + end + describe file('/etc/systemd/system/kafka.service'), if: (fact('operatingsystemmajrelease') == '7' && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } it { is_expected.to contain "Environment='KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote'" } it { is_expected.to contain "Environment='KAFKA_HEAP_OPTS=-Xmx512M -Xmx512M'" } it { is_expected.to contain "Environment='KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/tmp/log4j.properties'" } it { is_expected.to contain "Environment='KAFKA_OPTS=-Djava.security.policy=/some/path/my.policy'" } it { is_expected.to contain "Environment='LOG_DIR=/some/path/to/logs'" } end describe service('kafka') do it { is_expected.to be_running } it { is_expected.to be_enabled } end end end end diff --git a/spec/acceptance/consumer_spec.rb b/spec/acceptance/consumer_spec.rb index 31c4526..0650a4a 100644 --- a/spec/acceptance/consumer_spec.rb +++ b/spec/acceptance/consumer_spec.rb @@ -1,157 +1,166 @@ require 'spec_helper_acceptance' describe 'kafka::consumer' do it 'works with no errors' do pp = <<-EOS class { 'kafka::consumer': service_config => { topic => 'demo', bootstrap-server => 'localhost:9092', }, } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe 'kafka::consumer::install' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::consumer': service_config => { topic => 'demo', bootstrap-server => 'localhost:9092', }, } EOS apply_manifest(pp, catch_failures: true) end describe group('kafka') do it { is_expected.to exist } end describe user('kafka') do it { is_expected.to exist } it { is_expected.to belong_to_group 'kafka' } it { is_expected.to have_login_shell '/bin/bash' } end describe file('/var/tmp/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka-2.12-2.4.1') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka') do it { is_expected.to be_linked_to('/opt/kafka-2.12-2.4.1') } end describe file('/opt/kafka/config') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/var/log/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::consumer::config' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::consumer': service_config => { topic => 'demo', bootstrap-server => 'localhost:9092', }, } EOS apply_manifest(pp, catch_failures: true) end describe file('/opt/kafka/config/consumer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::consumer::config' do context 'with custom config_dir' do it 'works with no errors' do pp = <<-EOS class { 'kafka::consumer': service_config => { topic => 'demo', bootstrap-server => 'localhost:9092', }, config_dir => '/opt/kafka/custom_config', } EOS apply_manifest(pp, catch_failures: true) end describe file('/opt/kafka/custom_config/consumer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::consumer::service' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::consumer': service_config => { topic => 'demo', bootstrap-server => 'localhost:9092', }, } EOS apply_manifest(pp, catch_failures: true) end describe file('/etc/init.d/kafka-consumer'), if: (fact('operatingsystemmajrelease') =~ %r{(5|6)} && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } it { is_expected.to contain 'export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9993"' } it { is_expected.to contain 'export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/opt/kafka/config/log4j.properties"' } end + describe file('/etc/init.d/kafka-consumer'), if: (fact('service_provider') == 'upstart' && fact('osfamily') == 'Debian') do + it { is_expected.to be_file } + it { is_expected.to be_owned_by 'root' } + it { is_expected.to be_grouped_into 'root' } + it { is_expected.to contain %r{^# Provides:\s+kafka-consumer$} } + it { is_expected.to contain 'export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9993"' } + it { is_expected.to contain 'export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/opt/kafka/config/log4j.properties"' } + end + describe file('/etc/systemd/system/kafka-consumer.service'), if: (fact('operatingsystemmajrelease') == '7' && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } it { is_expected.to contain 'Environment=\'KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9993\'' } it { is_expected.to contain 'Environment=\'KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/opt/kafka/config/log4j.properties\'' } end describe service('kafka-consumer') do it { is_expected.to be_running } it { is_expected.to be_enabled } end end end end diff --git a/spec/acceptance/mirror_spec.rb b/spec/acceptance/mirror_spec.rb index e61102f..b17f4e9 100644 --- a/spec/acceptance/mirror_spec.rb +++ b/spec/acceptance/mirror_spec.rb @@ -1,235 +1,244 @@ require 'spec_helper_acceptance' describe 'kafka::mirror' do it 'works with no errors' do pp = <<-EOS class { 'kafka::mirror': consumer_config => { 'group.id' => 'kafka-mirror', 'bootstrap.servers' => 'localhost:9092', }, producer_config => { 'bootstrap.servers' => 'localhost:9092', }, service_config => { 'whitelist' => '.*', }, } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe 'kafka::mirror::install' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::mirror': consumer_config => { 'group.id' => 'kafka-mirror', 'bootstrap.servers' => 'localhost:9092', }, producer_config => { 'bootstrap.servers' => 'localhost:9092', }, service_config => { 'whitelist' => '.*', }, } EOS apply_manifest(pp, catch_failures: true) end describe group('kafka') do it { is_expected.to exist } end describe user('kafka') do it { is_expected.to exist } it { is_expected.to belong_to_group 'kafka' } it { is_expected.to have_login_shell '/bin/bash' } end describe file('/var/tmp/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka-2.12-2.4.1') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka') do it { is_expected.to be_linked_to('/opt/kafka-2.12-2.4.1') } end describe file('/opt/kafka/config') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/var/log/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::mirror::config' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::mirror': consumer_config => { 'group.id' => 'kafka-mirror', 'bootstrap.servers' => 'localhost:9092', }, producer_config => { 'bootstrap.servers' => 'localhost:9092', }, service_config => { 'whitelist' => '.*', }, } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe file('/opt/kafka/config/consumer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka/config/producer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end context 'with custom config_dir' do it 'works with no errors' do pp = <<-EOS class { 'kafka::mirror': consumer_config => { 'group.id' => 'kafka-mirror', 'bootstrap.servers' => 'localhost:9092', }, producer_config => { 'bootstrap.servers' => 'localhost:9092', }, service_config => { 'whitelist' => '.*', }, config_dir => '/opt/kafka/custom_config', } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe file('/opt/kafka/custom_config/consumer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka/custom_config/producer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end context 'with specific version' do it 'works with no errors' do pp = <<-EOS class { 'kafka::mirror': version => '2.4.0', consumer_config => { 'group.id' => 'kafka-mirror', 'bootstrap.servers' => 'localhost:9092', }, producer_config => { 'bootstrap.servers' => 'localhost:9092', }, service_config => { 'whitelist' => '.*', }, } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe file('/opt/kafka/config/consumer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka/config/producer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::mirror::service' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::mirror': consumer_config => { 'group.id' => 'kafka-mirror', 'bootstrap.servers' => 'localhost:9092', }, producer_config => { 'bootstrap.servers' => 'localhost:9092', }, service_config => { 'whitelist' => '.*', }, } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe file('/etc/init.d/kafka-mirror'), if: (fact('operatingsystemmajrelease') =~ %r{(5|6)} && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } it { is_expected.to contain 'export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9991"' } it { is_expected.to contain 'export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/opt/kafka/config/log4j.properties"' } end + describe file('/etc/init.d/kafka-mirror'), if: (fact('service_provider') == 'upstart' && fact('osfamily') == 'Debian') do + it { is_expected.to be_file } + it { is_expected.to be_owned_by 'root' } + it { is_expected.to be_grouped_into 'root' } + it { is_expected.to contain %r{^# Provides:\s+kafka-mirror$} } + it { is_expected.to contain 'export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9991"' } + it { is_expected.to contain 'export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/opt/kafka/config/log4j.properties"' } + end + describe file('/etc/systemd/system/kafka-mirror.service'), if: (fact('operatingsystemmajrelease') == '7' && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } it { is_expected.to contain 'Environment=\'KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9991\'' } it { is_expected.to contain 'Environment=\'KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/opt/kafka/config/log4j.properties\'' } end describe service('kafka-mirror') do it { is_expected.to be_running } it { is_expected.to be_enabled } end end end end diff --git a/spec/acceptance/producer_spec.rb b/spec/acceptance/producer_spec.rb index c97d2f3..b2e18f8 100644 --- a/spec/acceptance/producer_spec.rb +++ b/spec/acceptance/producer_spec.rb @@ -1,147 +1,151 @@ require 'spec_helper_acceptance' describe 'kafka::producer', if: (fact('operatingsystemmajrelease') == '6' && fact('osfamily') == 'RedHat') do # systemd systems not supported by kafka::producer::service it 'works with no errors' do pp = <<-EOS exec { 'create fifo': command => '/usr/bin/mkfifo /tmp/kafka-producer', user => 'kafka', creates => '/tmp/kafka-producer', } -> class { 'kafka::producer': service_config => { 'broker-list' => 'localhost:9092', topic => 'demo', }, input => '3<>/tmp/kafka-producer 0>&3', } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe 'kafka::producer::install' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS exec { 'create fifo': command => '/usr/bin/mkfifo /tmp/kafka-producer', user => 'kafka', creates => '/tmp/kafka-producer', } -> class { 'kafka::producer': service_config => { 'broker-list' => 'localhost:9092', topic => 'demo', }, input => '3<>/tmp/kafka-producer 0>&3', } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe group('kafka') do it { is_expected.to exist } end describe user('kafka') do it { is_expected.to exist } it { is_expected.to belong_to_group 'kafka' } it { is_expected.to have_login_shell '/bin/bash' } end describe file('/var/tmp/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka-2.12-2.4.1') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka') do it { is_expected.to be_linked_to('/opt/kafka-2.12-2.4.1') } end describe file('/opt/kafka/config') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/var/log/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::producer::config' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS exec { 'create fifo': command => '/usr/bin/mkfifo /tmp/kafka-producer', user => 'kafka', creates => '/tmp/kafka-producer', } -> class { 'kafka::producer': service_config => { 'broker-list' => 'localhost:9092', topic => 'demo', }, input => '3<>/tmp/kafka-producer 0>&3', } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe file('/opt/kafka/config/producer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::producer::service' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::producer': service_config => { 'broker-list' => 'localhost:9092', topic => 'demo', }, input => '3<>/tmp/kafka-producer 0>&3', } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe file('/etc/init.d/kafka-producer') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } it { is_expected.to contain 'export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9992"' } it { is_expected.to contain 'export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/opt/kafka/config/log4j.properties"' } end + describe file('/etc/init.d/kafka-producer'), if: (fact('service_provider') == 'upstart' && fact('osfamily') == 'Debian') do + it { is_expected.to contain %r{^# Provides:\s+kafka-producer$} } + end + describe service('kafka-producer') do it { is_expected.to be_running } it { is_expected.to be_enabled } end end end end diff --git a/templates/init.erb b/templates/init.erb index cc899ed..aca3aeb 100644 --- a/templates/init.erb +++ b/templates/init.erb @@ -1,161 +1,161 @@ #!/bin/sh # # Init file for Apache Kafka <%= @service_name.split(/-/)[1] and @service_name.split(/-/)[1].capitalize %> # <%- if @osfamily == 'Debian' -%> ### BEGIN INIT INFO -# Provides: <%- @service_name -%> +# Provides: <%= @service_name %> # Required-Start: <%= @service_requires.join(' ') %> # Required-Stop: # Default-Start: 2 3 4 5 # Default-Stop: 0 1 6 # X-Interactive: true # Short-Description: Apache Kafka is a distributed publish-subscribe messaging system ### END INIT INFO <%- else -%> # chkconfig: 35 85 15 # description: Apache Kafka is a distributed publish-subscribe messaging system # pidfile: /var/run/<%= @service_name -%>.pid <%- end -%> NAME=<%= @service_name %> <% @environment.sort.map do |k,v| -%> <% unless v.to_s.strip.empty? -%> export <%= k %>="<%= v %>" <% end -%> <% end -%> PID_FILE="/var/run/$NAME.pid" KAFKA_USER=<%= @user %> <%- case @service_name when 'kafka' -%> PGREP_PATTERN=kafka.Kafka DAEMON="<%= @bin_dir %>/kafka-server-start.sh" DAEMON_OPTS="<%= @config_dir %>/server.properties" <%- when 'kafka-consumer' -%> PGREP_PATTERN=kafka.tools.ConsoleConsumer DAEMON="<%= @bin_dir %>/kafka-console-consumer.sh" DAEMON_OPTS="<% @service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%> '<%= v.is_a?(Array) ? v.join(',') : v %>' <% end -%><% end -%>" <%- when 'kafka-mirror' -%> PGREP_PATTERN=kafka.tools.MirrorMaker DAEMON="<%= @bin_dir %>/kafka-run-class.sh" DAEMON_OPTS="kafka.tools.MirrorMaker --consumer.config <%= @config_dir %>/consumer.properties --producer.config <%= @config_dir %>/producer.properties <% @service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%> '<%= v.is_a?(Array) ? v.join(',') : v %>' <% end -%><% end -%>" <%- when 'kafka-producer' -%> PGREP_PATTERN=kafka.tools.ConsoleProducer DAEMON="<%= @bin_dir %>/kafka-console-producer.sh" DAEMON_OPTS="<% @service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%> '<%= v.is_a?(Array) ? v.join(',') : v %>' <% end -%><% end -%>" PRODUCER_INPUT="<%= @input %>" <%- end -%> if [ -f /etc/default/kafka ]; then . /etc/default/kafka fi start() { <% if @limit_nofile -%> ulimit -n <%= @limit_nofile %> <% end -%> <% if @limit_core -%> ulimit -c <%= @limit_core %> <% end -%> ulimit -s 10240 if [ -f "$PID_FILE" ]; then PID=`cat "$PID_FILE"` if [ `ps -p "$PID" -o pid= || echo 1` -eq `ps ax | grep -i "$PGREP_PATTERN" | grep -v grep | awk '{print $1}' || echo 2` ] ; then echo "$PID_FILE exists, process is already running" exit 0 fi echo "$PID_FILE exists but the process is not running. Deleting $PID_FILE and re-trying" rm -f -- "$PID_FILE" start return $? fi /bin/su "$KAFKA_USER" -c "KAFKA_JMX_OPTS=\"$KAFKA_JMX_OPTS\" $DAEMON $DAEMON_OPTS<%- if @service_name == 'kafka-producer' -%> $PRODUCER_INPUT<%- end -%> >/dev/null 2>&1 &" sleep 2 PID=`ps ax | grep -i "$PGREP_PATTERN" | grep -v grep | awk '{print $1}'` if [ -z "$PID" ]; then echo "$NAME could not be started" exit 1 fi echo "$PID" > "$PID_FILE"; echo "$NAME started" return 0 } stop() { if ! [ -f "$PID_FILE" ]; then echo -n "$PID_FILE does not exist" if PID=`ps ax | grep -i "$PGREP_PATTERN" | grep -v grep | awk '{print $1}'` ; then echo -n ", but process is running" echo "$PID" > "$PID_FILE" else echo -n ", and process is not running" return 1 fi fi PID=`cat $PID_FILE` kill $PID; rm -f -- "$PID_FILE"; # wait until the process is finished RETRIES=0 MAX_RETRIES=10 while [ ! -z `ps ax | grep -i "$PGREP_PATTERN" | grep -v grep | awk '{print $1}'` ]; do sleep 1 RETRIES=$((RETRIES+1)) if [ "$RETRIES" -ge "$MAX_RETRIES" ]; then echo "$NAME service: stop tried $MAX_RETRIES times but process $PID is still running" return 1 fi done echo "$NAME stopped" return 0 } status() { if ! [ -f "$PID_FILE" ]; then echo "$NAME stopped" exit 1 fi PID=`cat "$PID_FILE"` if ! [ `ps -p "$PID" -o pid= || echo 1` -eq `ps ax | grep -i "$PGREP_PATTERN" | grep -v grep | awk '{print $1}' || echo 2` ] ; then echo "$NAME stopped but pid file exists" exit 1 fi echo "$NAME running with pid $PID" exit 0 } case "$1" in status) status ;; start) echo "Starting daemon: $NAME" start ;; stop) echo "Stopping daemon: $NAME" stop ;; restart) echo "Restarting daemon: $NAME" stop sleep 2 start ;; *) echo "Usage: "$1" {status|start|stop|restart}" exit 1 esac exit 0