diff --git a/manifests/broker/config.pp b/manifests/broker/config.pp index eab8f36..cad01b2 100644 --- a/manifests/broker/config.pp +++ b/manifests/broker/config.pp @@ -1,40 +1,38 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::broker::config # # This private class is meant to be called from `kafka::broker`. # It manages the broker config files # class kafka::broker::config( Stdlib::Absolutepath $config_dir = $kafka::broker::config_dir, String $service_name = $kafka::broker::service_name, Boolean $service_install = $kafka::broker::service_install, Boolean $service_restart = $kafka::broker::service_restart, Hash $config = $kafka::broker::config, Stdlib::Filemode $config_mode = $kafka::broker::config_mode, String $group = $kafka::broker::group, ) { - if ($caller_module_name != $module_name) { - fail("Use of private class ${name} by ${caller_module_name}") - } + assert_private() if ($service_install and $service_restart) { $config_notify = Service[$service_name] } else { $config_notify = undef } $doctag = 'brokerconfigs' file { "${config_dir}/server.properties": ensure => present, owner => 'root', group => $group, mode => $config_mode, content => template('kafka/properties.erb'), notify => $config_notify, require => File[$config_dir], } } diff --git a/manifests/broker/install.pp b/manifests/broker/install.pp index 454e9e1..4856046 100644 --- a/manifests/broker/install.pp +++ b/manifests/broker/install.pp @@ -1,36 +1,34 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::broker::install # # This private class is meant to be called from `kafka::broker`. # It downloads the package and installs it. # class kafka::broker::install { - if $caller_module_name != $module_name { - fail("Use of private class ${name} by ${caller_module_name}") - } + assert_private() if !defined(Class['kafka']) { class { 'kafka': version => $kafka::broker::version, scala_version => $kafka::broker::scala_version, install_dir => $kafka::broker::install_dir, mirror_url => $kafka::broker::mirror_url, install_java => $kafka::broker::install_java, package_dir => $kafka::broker::package_dir, package_name => $kafka::broker::package_name, package_ensure => $kafka::broker::package_ensure, user => $kafka::broker::user, group => $kafka::broker::group, user_id => $kafka::broker::user_id, group_id => $kafka::broker::group_id, manage_user => $kafka::broker::manage_user, manage_group => $kafka::broker::manage_group, config_dir => $kafka::broker::config_dir, log_dir => $kafka::broker::log_dir, } } } diff --git a/manifests/broker/service.pp b/manifests/broker/service.pp index defe4ee..a3a70e0 100644 --- a/manifests/broker/service.pp +++ b/manifests/broker/service.pp @@ -1,79 +1,77 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::broker::service # # This private class is meant to be called from `kafka::broker`. # It manages the kafka service # class kafka::broker::service( String $user = $kafka::broker::user, String $group = $kafka::broker::group, Stdlib::Absolutepath $config_dir = $kafka::broker::config_dir, Stdlib::Absolutepath $log_dir = $kafka::broker::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::broker::bin_dir, String $service_name = $kafka::broker::service_name, Boolean $service_install = $kafka::broker::service_install, Enum['running', 'stopped'] $service_ensure = $kafka::broker::service_ensure, Array[String] $service_requires = $kafka::broker::service_requires, Optional[String] $limit_nofile = $kafka::broker::limit_nofile, Optional[String] $limit_core = $kafka::broker::limit_core, Optional[String] $timeout_stop = $kafka::broker::timeout_stop, Boolean $exec_stop = $kafka::broker::exec_stop, Boolean $daemon_start = $kafka::broker::daemon_start, Hash $env = $kafka::broker::env, String $heap_opts = $kafka::broker::heap_opts, String $jmx_opts = $kafka::broker::jmx_opts, String $log4j_opts = $kafka::broker::log4j_opts, $opts = $kafka::broker::opts, ) { - if $caller_module_name != $module_name { - fail("Use of private class ${name} by ${caller_module_name}") - } + assert_private() if $service_install { $env_defaults = { 'KAFKA_HEAP_OPTS' => $heap_opts, 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, 'KAFKA_OPTS' => $opts, 'LOG_DIR' => $log_dir, } $environment = deep_merge($env_defaults, $env) if $::service_provider == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] ~> Exec['systemctl-daemon-reload'] -> Service[$service_name] } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/consumer/install.pp b/manifests/consumer/install.pp index 8b2c7ea..b0fb26a 100644 --- a/manifests/consumer/install.pp +++ b/manifests/consumer/install.pp @@ -1,36 +1,34 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::consumer::install # # This private class is meant to be called from `kafka::consumer`. # It downloads the package and installs it. # class kafka::consumer::install { - if $caller_module_name != $module_name { - fail("Use of private class ${name} by ${caller_module_name}") - } + assert_private() if !defined(Class['kafka']) { class { 'kafka': version => $kafka::consumer::version, scala_version => $kafka::consumer::scala_version, install_dir => $kafka::consumer::install_dir, mirror_url => $kafka::consumer::mirror_url, install_java => $kafka::consumer::install_java, package_dir => $kafka::consumer::package_dir, package_name => $kafka::consumer::package_name, package_ensure => $kafka::consumer::package_ensure, user => $kafka::consumer::user, group => $kafka::consumer::group, user_id => $kafka::consumer::user_id, group_id => $kafka::consumer::group_id, manage_user => $kafka::consumer::manage_user, manage_group => $kafka::consumer::manage_group, config_dir => $kafka::consumer::config_dir, log_dir => $kafka::consumer::log_dir, } } } diff --git a/manifests/consumer/service.pp b/manifests/consumer/service.pp index 5fff83b..30df409 100644 --- a/manifests/consumer/service.pp +++ b/manifests/consumer/service.pp @@ -1,80 +1,78 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::consumer::service # # This private class is meant to be called from `kafka::consumer`. # It manages the kafka-consumer service # class kafka::consumer::service( String $user = $kafka::consumer::user, String $group = $kafka::consumer::group, Stdlib::Absolutepath $config_dir = $kafka::consumer::config_dir, Stdlib::Absolutepath $log_dir = $kafka::consumer::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::consumer::bin_dir, String $service_name = $kafka::consumer::service_name, Boolean $service_install = $kafka::consumer::service_install, Enum['running', 'stopped'] $service_ensure = $kafka::consumer::service_ensure, Array[String] $service_requires = $kafka::consumer::service_requires, Optional[String] $limit_nofile = $kafka::consumer::limit_nofile, Optional[String] $limit_core = $kafka::consumer::limit_core, Hash $env = $kafka::consumer::env, String $jmx_opts = $kafka::consumer::jmx_opts, String $log4j_opts = $kafka::consumer::log4j_opts, Hash $service_config = $kafka::consumer::service_config, ) { - if $caller_module_name != $module_name { - fail("Use of private class ${name} by ${caller_module_name}") - } + assert_private() if $service_install { if $service_config['topic'] == '' { fail('[Consumer] You need to specify a value for topic') } if $service_config['zookeeper'] == '' { fail('[Consumer] You need to specify a value for zookeeper') } $env_defaults = { 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, } $environment = deep_merge($env_defaults, $env) if $::service_provider == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] ~> Exec['systemctl-daemon-reload'] -> Service[$service_name] } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/mirror/config.pp b/manifests/mirror/config.pp index 91a4406..7610c7f 100644 --- a/manifests/mirror/config.pp +++ b/manifests/mirror/config.pp @@ -1,54 +1,52 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::mirror::config # # This private class is meant to be called from `kafka::mirror`. # It manages the mirror-maker config files # class kafka::mirror::config( Stdlib::Absolutepath $config_dir = $kafka::mirror::config_dir, String $service_name = $kafka::mirror::service_name, Boolean $service_install = $kafka::mirror::service_install, Boolean $service_restart = $kafka::mirror::service_restart, Hash $consumer_config = $kafka::mirror::consumer_config, Hash $producer_config = $kafka::mirror::producer_config, Stdlib::Filemode $config_mode = $kafka::mirror::config_mode, String $group = $kafka::mirror::group, ) { - if $caller_module_name != $module_name { - fail("Use of private class ${name} by ${caller_module_name}") - } + assert_private() if $consumer_config['group.id'] == '' { fail('[Consumer] You need to specify a value for group.id') } if $consumer_config['zookeeper.connect'] == '' { fail('[Consumer] You need to specify a value for zookeeper.connect') } if $producer_config['bootstrap.servers'] == '' { fail('[Producer] You need to specify a value for bootstrap.servers') } class { 'kafka::consumer::config': config_dir => $config_dir, config_mode => $config_mode, service_name => $service_name, service_install => $service_install, service_restart => $service_restart, config => $consumer_config, group => $group, } class { 'kafka::producer::config': config_dir => $config_dir, config_mode => $config_mode, service_name => $service_name, service_install => $service_install, service_restart => $service_restart, config => $producer_config, group => $group, } } diff --git a/manifests/mirror/install.pp b/manifests/mirror/install.pp index 90b2e4a..9d9637a 100644 --- a/manifests/mirror/install.pp +++ b/manifests/mirror/install.pp @@ -1,36 +1,34 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::mirror::install # # This private class is meant to be called from `kafka::mirror`. # It downloads the package and installs it. # class kafka::mirror::install { - if $caller_module_name != $module_name { - fail("Use of private class ${name} by ${caller_module_name}") - } + assert_private() if !defined(Class['kafka']) { class { 'kafka': version => $kafka::mirror::version, scala_version => $kafka::mirror::scala_version, install_dir => $kafka::mirror::install_dir, mirror_url => $kafka::mirror::mirror_url, install_java => $kafka::mirror::install_java, package_dir => $kafka::mirror::package_dir, package_name => $kafka::mirror::package_name, package_ensure => $kafka::mirror::package_ensure, user => $kafka::mirror::user, group => $kafka::mirror::group, user_id => $kafka::mirror::user_id, group_id => $kafka::mirror::group_id, manage_user => $kafka::mirror::manage_user, manage_group => $kafka::mirror::manage_group, config_dir => $kafka::mirror::config_dir, log_dir => $kafka::mirror::log_dir, } } } diff --git a/manifests/mirror/service.pp b/manifests/mirror/service.pp index ff58b5c..bb1333c 100644 --- a/manifests/mirror/service.pp +++ b/manifests/mirror/service.pp @@ -1,76 +1,74 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::mirror::service # # This private class is meant to be called from `kafka::mirror`. # It manages the kafka-mirror service # class kafka::mirror::service( String $user = $kafka::mirror::user, String $group = $kafka::mirror::group, Stdlib::Absolutepath $config_dir = $kafka::mirror::config_dir, Stdlib::Absolutepath $log_dir = $kafka::mirror::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::mirror::bin_dir, String $service_name = $kafka::mirror::service_name, Boolean $service_install = $kafka::mirror::service_install, Enum['running', 'stopped'] $service_ensure = $kafka::mirror::service_ensure, Array[String] $service_requires = $kafka::mirror::service_requires, Optional[String] $limit_nofile = $kafka::mirror::limit_nofile, Optional[String] $limit_core = $kafka::mirror::limit_core, Hash $env = $kafka::mirror::env, Hash $consumer_config = $kafka::mirror::consumer_config, Hash $producer_config = $kafka::mirror::producer_config, Hash $service_config = $kafka::mirror::service_config, String $heap_opts = $kafka::mirror::heap_opts, String $jmx_opts = $kafka::mirror::jmx_opts, String $log4j_opts = $kafka::mirror::log4j_opts, ) { - if $caller_module_name != $module_name { - fail("Use of private class ${name} by ${caller_module_name}") - } + assert_private() if $service_install { $env_defaults = { 'KAFKA_HEAP_OPTS' => $heap_opts, 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, } $environment = deep_merge($env_defaults, $env) if $::service_provider == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] ~> Exec['systemctl-daemon-reload'] -> Service[$service_name] } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/producer/install.pp b/manifests/producer/install.pp index 4615db8..78e0b6a 100644 --- a/manifests/producer/install.pp +++ b/manifests/producer/install.pp @@ -1,36 +1,34 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::producer::install # # This private class is meant to be called from `kafka::producer`. # It downloads the package and installs it. # class kafka::producer::install { - if $caller_module_name != $module_name { - fail("Use of private class ${name} by ${caller_module_name}") - } + assert_private() if !defined(Class['kafka']) { class { 'kafka': version => $kafka::producer::version, scala_version => $kafka::producer::scala_version, install_dir => $kafka::producer::install_dir, mirror_url => $kafka::producer::mirror_url, install_java => $kafka::producer::install_java, package_dir => $kafka::producer::package_dir, package_name => $kafka::producer::package_name, package_ensure => $kafka::producer::package_ensure, user => $kafka::producer::user, group => $kafka::producer::group, user_id => $kafka::producer::user_id, group_id => $kafka::producer::group_id, manage_user => $kafka::producer::manage_user, manage_group => $kafka::producer::manage_group, config_dir => $kafka::producer::config_dir, log_dir => $kafka::producer::log_dir, } } } diff --git a/manifests/producer/service.pp b/manifests/producer/service.pp index 04d5ec3..ef2f2dd 100644 --- a/manifests/producer/service.pp +++ b/manifests/producer/service.pp @@ -1,66 +1,64 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::producer::service # # This private class is meant to be called from `kafka::producer`. # It manages the kafka-producer service # class kafka::producer::service( String $user = $kafka::producer::user, String $group = $kafka::producer::group, Stdlib::Absolutepath $config_dir = $kafka::producer::config_dir, Stdlib::Absolutepath $log_dir = $kafka::producer::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::producer::bin_dir, String $service_name = $kafka::producer::service_name, Boolean $service_install = $kafka::producer::service_install, Enum['running', 'stopped'] $service_ensure = $kafka::producer::service_ensure, Array[String] $service_requires = $kafka::producer::service_requires, Optional[String] $limit_nofile = $kafka::producer::limit_nofile, Optional[String] $limit_core = $kafka::producer::limit_core, Hash $env = $kafka::producer::env, $input = $kafka::producer::input, String $jmx_opts = $kafka::producer::jmx_opts, String $log4j_opts = $kafka::producer::log4j_opts, Hash $service_config = $kafka::producer::service_config, ) { - if $caller_module_name != $module_name { - fail("Use of private class ${name} by ${caller_module_name}") - } + assert_private() if $service_install { if $service_config['broker-list'] == '' { fail('[Producer] You need to specify a value for broker-list') } if $service_config['topic'] == '' { fail('[Producer] You need to specify a value for topic') } $env_defaults = { 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, } $environment = deep_merge($env_defaults, $env) if $::service_provider == 'systemd' { fail('Console Producer is not supported on systemd, because the stdin of the process cannot be redirected') } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } }