diff --git a/manifests/broker/service.pp b/manifests/broker/service.pp index fa59c84..36e6abc 100644 --- a/manifests/broker/service.pp +++ b/manifests/broker/service.pp @@ -1,71 +1,70 @@ # @summary # This class handles the Kafka (broker) service. # # @api private # class kafka::broker::service ( Boolean $manage_service = $kafka::broker::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::broker::service_ensure, String[1] $service_name = $kafka::broker::service_name, String[1] $user_name = $kafka::broker::user_name, String[1] $group_name = $kafka::broker::group_name, Stdlib::Absolutepath $config_dir = $kafka::broker::config_dir, Stdlib::Absolutepath $log_dir = $kafka::broker::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::broker::bin_dir, Array[String[1]] $service_requires = $kafka::broker::service_requires, Optional[String[1]] $limit_nofile = $kafka::broker::limit_nofile, Optional[String[1]] $limit_core = $kafka::broker::limit_core, Optional[String[1]] $timeout_stop = $kafka::broker::timeout_stop, Boolean $exec_stop = $kafka::broker::exec_stop, Boolean $daemon_start = $kafka::broker::daemon_start, Hash $env = $kafka::broker::env, String[1] $heap_opts = $kafka::broker::heap_opts, String[1] $jmx_opts = $kafka::broker::jmx_opts, String[1] $log4j_opts = $kafka::broker::log4j_opts, String[0] $opts = $kafka::broker::opts, ) { assert_private() if $manage_service { $env_defaults = { 'KAFKA_HEAP_OPTS' => $heap_opts, 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, 'KAFKA_OPTS' => $opts, 'LOG_DIR' => $log_dir, } $environment = deep_merge($env_defaults, $env) if $facts['service_provider'] == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] - ~> Exec['systemctl-daemon-reload'] - -> Service[$service_name] + ~> Service[$service_name] } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/consumer/service.pp b/manifests/consumer/service.pp index e96facd..a225856 100644 --- a/manifests/consumer/service.pp +++ b/manifests/consumer/service.pp @@ -1,71 +1,70 @@ # @summary # This class handles the Kafka (consumer) service. # # @api private # class kafka::consumer::service ( Boolean $manage_service = $kafka::consumer::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::consumer::service_ensure, String[1] $service_name = $kafka::consumer::service_name, String[1] $user_name = $kafka::consumer::user_name, String[1] $group_name = $kafka::consumer::group_name, Stdlib::Absolutepath $config_dir = $kafka::consumer::config_dir, Stdlib::Absolutepath $log_dir = $kafka::consumer::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::consumer::bin_dir, Array[String[1]] $service_requires = $kafka::consumer::service_requires, Optional[String[1]] $limit_nofile = $kafka::consumer::limit_nofile, Optional[String[1]] $limit_core = $kafka::consumer::limit_core, Hash $env = $kafka::consumer::env, String[1] $jmx_opts = $kafka::consumer::jmx_opts, String[1] $log4j_opts = $kafka::consumer::log4j_opts, Hash[String[1],String[1]] $service_config = $kafka::consumer::service_config, ) { assert_private() if $manage_service { if $service_config['topic'] == '' { fail('[Consumer] You need to specify a value for topic') } if $service_config['zookeeper'] == '' { fail('[Consumer] You need to specify a value for zookeeper') } $env_defaults = { 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, } $environment = deep_merge($env_defaults, $env) if $facts['service_provider'] == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] - ~> Exec['systemctl-daemon-reload'] - -> Service[$service_name] + ~> Service[$service_name] } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/mirror/service.pp b/manifests/mirror/service.pp index 91e9cd8..39c1c01 100644 --- a/manifests/mirror/service.pp +++ b/manifests/mirror/service.pp @@ -1,68 +1,67 @@ # @summary # This class handles the Kafka (mirror) service. # # @api private # class kafka::mirror::service ( Boolean $manage_service = $kafka::mirror::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::mirror::service_ensure, String[1] $service_name = $kafka::mirror::service_name, String[1] $user_name = $kafka::mirror::user_name, String[1] $group_name = $kafka::mirror::group_name, Stdlib::Absolutepath $config_dir = $kafka::mirror::config_dir, Stdlib::Absolutepath $log_dir = $kafka::mirror::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::mirror::bin_dir, Array[String[1]] $service_requires = $kafka::mirror::service_requires, Optional[String[1]] $limit_nofile = $kafka::mirror::limit_nofile, Optional[String[1]] $limit_core = $kafka::mirror::limit_core, Hash $env = $kafka::mirror::env, Hash[String[1],String[1]] $consumer_config = $kafka::mirror::consumer_config, Hash[String[1],String[1]] $producer_config = $kafka::mirror::producer_config, Hash[String[1],String[1]] $service_config = $kafka::mirror::service_config, String[1] $heap_opts = $kafka::mirror::heap_opts, String[1] $jmx_opts = $kafka::mirror::jmx_opts, String[1] $log4j_opts = $kafka::mirror::log4j_opts, ) { assert_private() if $manage_service { $env_defaults = { 'KAFKA_HEAP_OPTS' => $heap_opts, 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, } $environment = deep_merge($env_defaults, $env) if $facts['service_provider'] == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] - ~> Exec['systemctl-daemon-reload'] - -> Service[$service_name] + ~> Service[$service_name] } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/metadata.json b/metadata.json index f7c0780..dce8ab4 100644 --- a/metadata.json +++ b/metadata.json @@ -1,79 +1,79 @@ { "name": "puppet-kafka", "version": "7.0.1-rc0", "author": "Vox Pupuli", "summary": "Puppet module for Kafka", "license": "MIT", "source": "https://github.com/voxpupuli/puppet-kafka", "project_page": "https://github.com/voxpupuli/puppet-kafka", "issues_url": "https://github.com/voxpupuli/puppet-kafka/issues", "dependencies": [ { "name": "puppet/archive", "version_requirement": ">= 1.0.0 < 5.0.0" }, { "name": "puppetlabs/java", "version_requirement": ">= 1.4.2 < 6.0.0" }, { "name": "puppetlabs/stdlib", "version_requirement": ">= 4.22.0 < 7.0.0" }, { "name": "deric/zookeeper", "version_requirement": ">= 0.5.1 < 2.0.0" }, { "name": "camptocamp/systemd", - "version_requirement": ">= 0.4.0 < 3.0.0" + "version_requirement": ">= 0.4.0 < 4.0.0" } ], "operatingsystem_support": [ { "operatingsystem": "CentOS", "operatingsystemrelease": [ "7" ] }, { "operatingsystem": "RedHat", "operatingsystemrelease": [ "7" ] }, { "operatingsystem": "Ubuntu", "operatingsystemrelease": [ "16.04", "18.04" ] }, { "operatingsystem": "Debian", "operatingsystemrelease": [ "8", "9", "10" ] }, { "operatingsystem": "SLES", "operatingsystemrelease": [ "11", "12", "15" ] } ], "requirements": [ { "name": "puppet", "version_requirement": ">= 5.5.8 < 7.0.0" } ], "tags": [ "kafka", "pubsub" ] } diff --git a/spec/classes/broker_spec.rb b/spec/classes/broker_spec.rb index d248678..a1eed6b 100644 --- a/spec/classes/broker_spec.rb +++ b/spec/classes/broker_spec.rb @@ -1,92 +1,90 @@ require 'spec_helper' require 'shared_examples_param_validation' describe 'kafka::broker', type: :class do on_supported_os.each do |os, os_facts| context "on #{os}" do let(:facts) do os_facts end let :params do { config: { 'zookeeper.connect' => 'localhost:2181' } } end it { is_expected.to contain_class('kafka::broker::install').that_comes_before('Class[kafka::broker::config]') } it { is_expected.to contain_class('kafka::broker::config').that_comes_before('Class[kafka::broker::service]') } it { is_expected.to contain_class('kafka::broker::service').that_comes_before('Class[kafka::broker]') } it { is_expected.to contain_class('kafka::broker') } describe 'kafka::broker::install' do context 'defaults' do it { is_expected.to contain_class('kafka') } end end describe 'kafka::broker::config' do context 'defaults' do it { is_expected.to contain_file('/opt/kafka/config/server.properties') } end end describe 'kafka::broker::service' do context 'manage_service false' do let(:params) { super().merge(manage_service: false) } it { is_expected.not_to contain_file('/etc/init.d/kafka') } it { is_expected.not_to contain_file('/etc/systemd/system/kafka.service') } it { is_expected.not_to contain_service('kafka') } end context 'defaults' do if os_facts[:service_provider] == 'systemd' it { is_expected.to contain_file('/etc/init.d/kafka').with_ensure('absent') } - it { is_expected.to contain_file('/etc/systemd/system/kafka.service').that_notifies('Exec[systemctl-daemon-reload]') } it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^After=network\.target syslog\.target$} } it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^Wants=network\.target syslog\.target$} } it { is_expected.not_to contain_file('/etc/systemd/system/kafka.service').with_content %r{^LimitNOFILE=} } it { is_expected.not_to contain_file('/etc/systemd/system/kafka.service').with_content %r{^LimitCORE=} } - it { is_expected.to contain_exec('systemctl-daemon-reload').that_comes_before('Service[kafka]') } else it { is_expected.to contain_file('/etc/init.d/kafka') } end it { is_expected.to contain_service('kafka') } end context 'limit_nofile set' do let(:params) { super().merge(limit_nofile: '65536') } if os_facts[:service_provider] == 'systemd' it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^LimitNOFILE=65536$} } else it { is_expected.to contain_file('/etc/init.d/kafka').with_content %r{ulimit -n 65536$} } end end context 'limit_core set' do let(:params) { super().merge(limit_core: 'infinity') } if os_facts[:service_provider] == 'systemd' it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^LimitCORE=infinity$} } else it { is_expected.to contain_file('/etc/init.d/kafka').with_content %r{ulimit -c infinity$} } end end context 'service_requires set', if: os_facts[:service_provider] == 'systemd' do let(:params) { super().merge(service_requires: ['dummy.target']) } it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^After=dummy\.target$} } it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^Wants=dummy\.target$} } end end it_validates_parameter 'mirror_url' end end end diff --git a/spec/classes/consumer_spec.rb b/spec/classes/consumer_spec.rb index 1ca6b1d..81e6882 100644 --- a/spec/classes/consumer_spec.rb +++ b/spec/classes/consumer_spec.rb @@ -1,54 +1,52 @@ require 'spec_helper' require 'shared_examples_param_validation' describe 'kafka::consumer', type: :class do on_supported_os.each do |os, os_facts| context "on #{os}" do let(:facts) do os_facts end let :params do { service_config: { 'topic' => 'demo', 'zookeeper' => 'localhost:2181' } } end it { is_expected.to contain_class('kafka::consumer::install').that_comes_before('Class[kafka::consumer::config]') } it { is_expected.to contain_class('kafka::consumer::config').that_comes_before('Class[kafka::consumer::service]') } it { is_expected.to contain_class('kafka::consumer::service').that_comes_before('Class[kafka::consumer]') } it { is_expected.to contain_class('kafka::consumer') } describe 'kafka::consumer::install' do context 'defaults' do it { is_expected.to contain_class('kafka') } end end describe 'kafka::consumer::config' do context 'defaults' do it { is_expected.to contain_file('/opt/kafka/config/consumer.properties') } end end describe 'kafka::consumer::service' do context 'defaults' do if os_facts[:service_provider] == 'systemd' it { is_expected.to contain_file('/etc/init.d/kafka-consumer').with_abent('absent') } - it { is_expected.to contain_file('/etc/systemd/system/kafka-consumer.service').that_notifies('Exec[systemctl-daemon-relad]') } - it { is_expected.to contain_exec('systemctl-daemon-reload').that_comes_before('Service[kafka-consumer]') } else it { is_expected.to contain_file('/etc/init.d/kafka-consumer') } end it { is_expected.to contain_service('kafka-consumer') } end end it_validates_parameter 'mirror_url' end end end diff --git a/spec/classes/mirror_spec.rb b/spec/classes/mirror_spec.rb index a4351f1..d27aceb 100644 --- a/spec/classes/mirror_spec.rb +++ b/spec/classes/mirror_spec.rb @@ -1,60 +1,58 @@ require 'spec_helper' require 'shared_examples_param_validation' describe 'kafka::mirror', type: :class do on_supported_os.each do |os, os_facts| context "on #{os}" do let(:facts) do os_facts end let :params do { consumer_config: { 'group.id' => 'kafka-mirror', 'zookeeper.connect' => 'localhost:2181' }, producer_config: { 'bootstrap.servers' => 'localhost:9092' } } end it { is_expected.to contain_class('kafka::mirror::install').that_comes_before('Class[kafka::mirror::config]') } it { is_expected.to contain_class('kafka::mirror::config').that_comes_before('Class[kafka::mirror::service]') } it { is_expected.to contain_class('kafka::mirror::service').that_comes_before('Class[kafka::mirror]') } it { is_expected.to contain_class('kafka::mirror') } describe 'kafka::mirror::install' do context 'defaults' do it { is_expected.to contain_class('kafka') } end end describe 'kafka::mirror::config' do context 'defaults' do it { is_expected.to contain_class('kafka::consumer::config') } it { is_expected.to contain_class('kafka::producer::config') } end end describe 'kafka::mirror::service' do context 'defaults' do if os_facts[:service_provider] == 'systemd' it { is_expected.to contain_file('/etc/init.d/kafka-mirror').with_ensure('absent') } - it { is_expected.to contain_file('/etc/systemd/system/kafka-mirror.service').that_notifies('Exec[systemctl-daemon-reload]') } it { is_expected.to contain_file('/etc/systemd/system/kafka-mirror.service').with_content %r{/opt/kafka/config/(?=.*consumer)|(?=.*producer).propertie} } - it { is_expected.to contain_exec('systemctl-daemon-reload').that_comes_before('Service[kafka-mirror]') } else it { is_expected.to contain_file('/etc/init.d/kafka-mirror') } it { is_expected.to contain_file('/etc/init.d/kafka-mirror').with_content %r{/opt/kafka/config/(?=.*consumer)|(?=.*producer).properties} } end it { is_expected.to contain_service('kafka-mirror') } end end it_validates_parameter 'mirror_url' end end end