diff --git a/manifests/broker.pp b/manifests/broker.pp index fae44a6..e6b84ca 100644 --- a/manifests/broker.pp +++ b/manifests/broker.pp @@ -1,155 +1,155 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::broker # # This class will install kafka with the broker role. # # === Requirements/Dependencies # # Currently requires the puppetlabs/stdlib module on the Puppet Forge in # order to validate much of the the provided configuration. # # === Parameters # -# [*version*] +# [*kafka_version*] # The version of kafka that should be installed. # # [*scala_version*] # The scala version what kafka was built with. # # [*install_dir*] # The directory to install kafka to. # # [*mirror_url*] # The url where the kafka is downloaded from. # -# [*install_java*] +# [*manage_java*] # Install java if it's not already installed. # # [*package_dir*] # The directory to install kafka. # # [*package_name*] # Package name, when installing kafka from a package. # # [*package_ensure*] # Package version (or 'present', 'absent', 'latest'), when installing kafka from a package. # -# [*user*] +# [*user_name*] # User to run kafka as. # -# [*group*] +# [*group_name*] # Group to run kafka as. # # [*user_id*] # Create the kafka user with this ID. # # [*group_id*] # Create the kafka group with this ID. # # [*manage_user*] # Create the kafka user if it's not already present. # # [*manage_group*] # Create the kafka group if it's not already present. # # [*config_mode*] # The permissions for the config files. # # [*config_dir*] # The directory to create the kafka config files to. # # [*log_dir*] # The directory for kafka log files. # # [*bin_dir*] # The directory where the kafka scripts are. # # [*service_name*] # Set the name of the service. # -# [*service_install*] +# [*manage_service*] # Install the init.d or systemd service. # # [*service_ensure*] # Set the ensure state of the service to 'stopped' or 'running'. # # [*service_restart*] # Whether the configuration files should trigger a service restart. # # [*service_requires*] # Set the list of services required to be running before Kafka. # # [*limit_nofile*] # Set the 'LimitNOFILE' option of the systemd service. # # [*limit_core*] # Set the 'LimitCORE' option of the systemd service. # # [*timeout_stop*] # Set the 'TimeoutStopSec' option of the systemd service. # # [*exec_stop*] # Set the 'ExecStop' option of the systemd service to 'kafka-server-stop.sh'. # # [*daemon_start*] # Use the '-daemon' option when starting Kafka with 'kafka-server-start.sh'. # # [*env*] # A hash of the environment variables to set. # # [*config*] # A hash of the configuration options. # # === Examples # # Create a single broker instance which talks to a local zookeeper instance. # # class { 'kafka::broker': # config => { 'broker.id' => '0', 'zookeeper.connect' => 'localhost:2181' } # } # class kafka::broker ( - String $version = $kafka::params::version, + String $kafka_version = $kafka::params::kafka_version, String $scala_version = $kafka::params::scala_version, Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, - Boolean $install_java = $kafka::params::install_java, + Boolean $manage_java = $kafka::params::manage_java, Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, Optional[String] $package_name = $kafka::params::package_name, String $package_ensure = $kafka::params::package_ensure, - String $user = $kafka::params::user, - String $group = $kafka::params::group, + String $user_name = $kafka::params::user_name, + String $group_name = $kafka::params::group_name, Optional[Integer] $user_id = $kafka::params::user_id, Optional[Integer] $group_id = $kafka::params::group_id, Boolean $manage_user = $kafka::params::manage_user, Boolean $manage_group = $kafka::params::manage_group, Stdlib::Filemode $config_mode = $kafka::params::config_mode, Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::params::bin_dir, String $service_name = 'kafka', - Boolean $service_install = $kafka::params::service_install, + Boolean $manage_service = $kafka::params::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::params::service_ensure, Boolean $service_restart = $kafka::params::service_restart, Array[String] $service_requires = $kafka::params::service_requires, Optional[String] $limit_nofile = $kafka::params::limit_nofile, Optional[String] $limit_core = $kafka::params::limit_core, Optional[String] $timeout_stop = $kafka::params::timeout_stop, Boolean $exec_stop = $kafka::params::exec_stop, Boolean $daemon_start = $kafka::params::daemon_start, Hash $env = {}, Hash $config = {}, String $heap_opts = $kafka::params::broker_heap_opts, String $jmx_opts = $kafka::params::broker_jmx_opts, String $log4j_opts = $kafka::params::broker_log4j_opts, $opts = $kafka::params::broker_opts, ) inherits kafka::params { class { 'kafka::broker::install': } -> class { 'kafka::broker::config': } -> class { 'kafka::broker::service': } -> Class['kafka::broker'] } diff --git a/manifests/broker/config.pp b/manifests/broker/config.pp index 971ff12..04f9e47 100644 --- a/manifests/broker/config.pp +++ b/manifests/broker/config.pp @@ -1,39 +1,39 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::broker::config # # This private class is meant to be called from `kafka::broker`. # It manages the broker config files # class kafka::broker::config( - Stdlib::Absolutepath $config_dir = $kafka::broker::config_dir, + Boolean $manage_service = $kafka::broker::manage_service, String $service_name = $kafka::broker::service_name, - Boolean $service_install = $kafka::broker::service_install, Boolean $service_restart = $kafka::broker::service_restart, Hash $config = $kafka::broker::config, + Stdlib::Absolutepath $config_dir = $kafka::broker::config_dir, + String $user_name = $kafka::broker::user_name, + String $group_name = $kafka::broker::group_name, Stdlib::Filemode $config_mode = $kafka::broker::config_mode, - String $user = $kafka::broker::user, - String $group = $kafka::broker::group, ) { assert_private() - if ($service_install and $service_restart) { + if ($manage_service and $service_restart) { $config_notify = Service[$service_name] } else { $config_notify = undef } $doctag = 'brokerconfigs' file { "${config_dir}/server.properties": ensure => present, - owner => $user, - group => $group, + owner => $user_name, + group => $group_name, mode => $config_mode, content => template('kafka/properties.erb'), notify => $config_notify, require => File[$config_dir], } } diff --git a/manifests/broker/install.pp b/manifests/broker/install.pp index 4856046..6c15e0f 100644 --- a/manifests/broker/install.pp +++ b/manifests/broker/install.pp @@ -1,34 +1,34 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::broker::install # # This private class is meant to be called from `kafka::broker`. # It downloads the package and installs it. # class kafka::broker::install { assert_private() if !defined(Class['kafka']) { class { 'kafka': - version => $kafka::broker::version, - scala_version => $kafka::broker::scala_version, - install_dir => $kafka::broker::install_dir, - mirror_url => $kafka::broker::mirror_url, - install_java => $kafka::broker::install_java, - package_dir => $kafka::broker::package_dir, - package_name => $kafka::broker::package_name, - package_ensure => $kafka::broker::package_ensure, - user => $kafka::broker::user, - group => $kafka::broker::group, - user_id => $kafka::broker::user_id, + manage_java => $kafka::broker::manage_java, + manage_group => $kafka::broker::manage_group, group_id => $kafka::broker::group_id, + group_name => $kafka::broker::group_name, manage_user => $kafka::broker::manage_user, - manage_group => $kafka::broker::manage_group, + user_id => $kafka::broker::user_id, + user_name => $kafka::broker::user_name, config_dir => $kafka::broker::config_dir, log_dir => $kafka::broker::log_dir, + mirror_url => $kafka::broker::mirror_url, + kafka_version => $kafka::broker::kafka_version, + scala_version => $kafka::broker::scala_version, + install_dir => $kafka::broker::install_dir, + package_dir => $kafka::broker::package_dir, + package_ensure => $kafka::broker::package_ensure, + package_name => $kafka::broker::package_name, } } } diff --git a/manifests/broker/service.pp b/manifests/broker/service.pp index a3a70e0..7a021dc 100644 --- a/manifests/broker/service.pp +++ b/manifests/broker/service.pp @@ -1,77 +1,77 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::broker::service # # This private class is meant to be called from `kafka::broker`. # It manages the kafka service # class kafka::broker::service( - String $user = $kafka::broker::user, - String $group = $kafka::broker::group, + Boolean $manage_service = $kafka::broker::manage_service, + Enum['running', 'stopped'] $service_ensure = $kafka::broker::service_ensure, + String $service_name = $kafka::broker::service_name, + String $user_name = $kafka::broker::user_name, + String $group_name = $kafka::broker::group_name, Stdlib::Absolutepath $config_dir = $kafka::broker::config_dir, Stdlib::Absolutepath $log_dir = $kafka::broker::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::broker::bin_dir, - String $service_name = $kafka::broker::service_name, - Boolean $service_install = $kafka::broker::service_install, - Enum['running', 'stopped'] $service_ensure = $kafka::broker::service_ensure, Array[String] $service_requires = $kafka::broker::service_requires, Optional[String] $limit_nofile = $kafka::broker::limit_nofile, Optional[String] $limit_core = $kafka::broker::limit_core, Optional[String] $timeout_stop = $kafka::broker::timeout_stop, Boolean $exec_stop = $kafka::broker::exec_stop, Boolean $daemon_start = $kafka::broker::daemon_start, Hash $env = $kafka::broker::env, String $heap_opts = $kafka::broker::heap_opts, String $jmx_opts = $kafka::broker::jmx_opts, String $log4j_opts = $kafka::broker::log4j_opts, $opts = $kafka::broker::opts, ) { assert_private() - if $service_install { + if $manage_service { $env_defaults = { 'KAFKA_HEAP_OPTS' => $heap_opts, 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, 'KAFKA_OPTS' => $opts, 'LOG_DIR' => $log_dir, } $environment = deep_merge($env_defaults, $env) - if $::service_provider == 'systemd' { + if $facts['service_provider'] == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] ~> Exec['systemctl-daemon-reload'] -> Service[$service_name] } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/consumer.pp b/manifests/consumer.pp index eb2c07e..682bdda 100644 --- a/manifests/consumer.pp +++ b/manifests/consumer.pp @@ -1,144 +1,144 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::consumer # # This class will install kafka with the consumer role. # # === Requirements/Dependencies # # Currently requires the puppetlabs/stdlib module on the Puppet Forge in # order to validate much of the the provided configuration. # # === Parameters # -# [*version*] +# [*kafka_version*] # The version of kafka that should be installed. # # [*scala_version*] # The scala version what kafka was built with. # # [*install_dir*] # The directory to install kafka to. # # [*mirror_url*] # The url where the kafka is downloaded from. # -# [*install_java*] +# [*manage_java*] # Install java if it's not already installed. # # [*package_dir*] # The directory to install kafka. # # [*package_name*] # Package name, when installing kafka from a package. # # [*package_ensure*] # Package version (or 'present', 'absent', 'latest'), when installing kafka from a package. # -# [*user*] +# [*user_name*] # User to run kafka as. # -# [*group*] +# [*group_name*] # Group to run kafka as. # # [*user_id*] # Create the kafka user with this ID. # # [*group_id*] # Create the kafka group with this ID. # # [*manage_user*] # Create the kafka user if it's not already present. # # [*manage_group*] # Create the kafka group if it's not already present. # # [*config_mode*] # The permissions for the config files. # # [*config_dir*] # The directory to create the kafka config files to. # # [*log_dir*] # The directory for kafka log files. # # [*bin_dir*] # The directory where the kafka scripts are. # # [*service_name*] # Set the name of the service. # -# [*service_install*] +# [*manage_service*] # Install the init.d or systemd service. # # [*service_ensure*] # Set the ensure state of the service to 'stopped' or 'running'. # # [*service_restart*] # Whether the configuration files should trigger a service restart. # # [*service_requires*] # Set the list of services required to be running before Kafka. # # [*limit_nofile*] # Set the 'LimitNOFILE' option of the systemd service. # # [*limit_core*] # Set the 'LimitCORE' option of the systemd service. # # [*env*] # A hash of the environment variables to set. # # [*config*] # A hash of the consumer configuration options. # # [*service_config*] # A hash of the `kafka-console-consumer.sh` script options. # # === Examples # # Create the consumer service connecting to a local zookeeper # # class { 'kafka::consumer': # config => { 'client.id' => '0', 'zookeeper.connect' => 'localhost:2181' } # } class kafka::consumer ( - String $version = $kafka::params::version, + String $kafka_version = $kafka::params::kafka_version, String $scala_version = $kafka::params::scala_version, Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, - Boolean $install_java = $kafka::params::install_java, + Boolean $manage_java = $kafka::params::manage_java, Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, Optional[String] $package_name = $kafka::params::package_name, String $package_ensure = $kafka::params::package_ensure, - String $user = $kafka::params::user, - String $group = $kafka::params::group, + String $user_name = $kafka::params::user_name, + String $group_name = $kafka::params::group_name, Optional[Integer] $user_id = $kafka::params::user_id, Optional[Integer] $group_id = $kafka::params::group_id, Boolean $manage_user = $kafka::params::manage_user, Boolean $manage_group = $kafka::params::manage_group, Stdlib::Filemode $config_mode = $kafka::params::config_mode, Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::params::bin_dir, String $service_name = 'kafka-consumer', - Boolean $service_install = $kafka::params::service_install, + Boolean $manage_service = $kafka::params::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::params::service_ensure, Boolean $service_restart = $kafka::params::service_restart, Array[String] $service_requires = $kafka::params::service_requires, Optional[String] $limit_nofile = $kafka::params::limit_nofile, Optional[String] $limit_core = $kafka::params::limit_core, Hash $env = {}, Hash $config = {}, Hash $service_config = {}, String $jmx_opts = $kafka::params::consumer_jmx_opts, String $log4j_opts = $kafka::params::consumer_log4j_opts, ) inherits kafka::params { class { 'kafka::consumer::install': } -> class { 'kafka::consumer::config': } -> class { 'kafka::consumer::service': } -> Class['kafka::consumer'] } diff --git a/manifests/consumer/config.pp b/manifests/consumer/config.pp index 555834f..daa4e33 100644 --- a/manifests/consumer/config.pp +++ b/manifests/consumer/config.pp @@ -1,37 +1,37 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::consumer::config # # This private class is meant to be called from `kafka::consumer`. # It manages the consumer config files # class kafka::consumer::config( - Stdlib::Absolutepath $config_dir = $kafka::consumer::config_dir, + Boolean $manage_service = $kafka::consumer::manage_service, String $service_name = $kafka::consumer::service_name, - Boolean $service_install = $kafka::consumer::service_install, Boolean $service_restart = $kafka::consumer::service_restart, Hash $config = $kafka::consumer::config, + Stdlib::Absolutepath $config_dir = $kafka::consumer::config_dir, + String $user_name = $kafka::consumer::user_name, + String $group_name = $kafka::consumer::group_name, Stdlib::Filemode $config_mode = $kafka::consumer::config_mode, - String $user = $kafka::consumer::user, - String $group = $kafka::consumer::group, ) { - if ($service_install and $service_restart) { + if ($manage_service and $service_restart) { $config_notify = Service[$service_name] } else { $config_notify = undef } $doctag = 'consumerconfigs' file { "${config_dir}/consumer.properties": ensure => present, - owner => $user, - group => $group, + owner => $user_name, + group => $group_name, mode => $config_mode, content => template('kafka/properties.erb'), notify => $config_notify, require => File[$config_dir], } } diff --git a/manifests/consumer/install.pp b/manifests/consumer/install.pp index b0fb26a..724baaa 100644 --- a/manifests/consumer/install.pp +++ b/manifests/consumer/install.pp @@ -1,34 +1,34 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::consumer::install # # This private class is meant to be called from `kafka::consumer`. # It downloads the package and installs it. # class kafka::consumer::install { assert_private() if !defined(Class['kafka']) { class { 'kafka': - version => $kafka::consumer::version, - scala_version => $kafka::consumer::scala_version, - install_dir => $kafka::consumer::install_dir, - mirror_url => $kafka::consumer::mirror_url, - install_java => $kafka::consumer::install_java, - package_dir => $kafka::consumer::package_dir, - package_name => $kafka::consumer::package_name, - package_ensure => $kafka::consumer::package_ensure, - user => $kafka::consumer::user, - group => $kafka::consumer::group, - user_id => $kafka::consumer::user_id, + manage_java => $kafka::consumer::manage_java, + manage_group => $kafka::consumer::manage_group, group_id => $kafka::consumer::group_id, + group_name => $kafka::consumer::group_name, manage_user => $kafka::consumer::manage_user, - manage_group => $kafka::consumer::manage_group, + user_id => $kafka::consumer::user_id, + user_name => $kafka::consumer::user_name, config_dir => $kafka::consumer::config_dir, log_dir => $kafka::consumer::log_dir, + mirror_url => $kafka::consumer::mirror_url, + kafka_version => $kafka::consumer::kafka_version, + scala_version => $kafka::consumer::scala_version, + install_dir => $kafka::consumer::install_dir, + package_dir => $kafka::consumer::package_dir, + package_ensure => $kafka::consumer::package_ensure, + package_name => $kafka::consumer::package_name, } } } diff --git a/manifests/consumer/service.pp b/manifests/consumer/service.pp index 30df409..6017cfc 100644 --- a/manifests/consumer/service.pp +++ b/manifests/consumer/service.pp @@ -1,78 +1,78 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::consumer::service # # This private class is meant to be called from `kafka::consumer`. # It manages the kafka-consumer service # class kafka::consumer::service( - String $user = $kafka::consumer::user, - String $group = $kafka::consumer::group, + Boolean $manage_service = $kafka::consumer::manage_service, + Enum['running', 'stopped'] $service_ensure = $kafka::consumer::service_ensure, + String $service_name = $kafka::consumer::service_name, + String $user_name = $kafka::consumer::user_name, + String $group_name = $kafka::consumer::group_name, Stdlib::Absolutepath $config_dir = $kafka::consumer::config_dir, Stdlib::Absolutepath $log_dir = $kafka::consumer::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::consumer::bin_dir, - String $service_name = $kafka::consumer::service_name, - Boolean $service_install = $kafka::consumer::service_install, - Enum['running', 'stopped'] $service_ensure = $kafka::consumer::service_ensure, Array[String] $service_requires = $kafka::consumer::service_requires, Optional[String] $limit_nofile = $kafka::consumer::limit_nofile, Optional[String] $limit_core = $kafka::consumer::limit_core, Hash $env = $kafka::consumer::env, String $jmx_opts = $kafka::consumer::jmx_opts, String $log4j_opts = $kafka::consumer::log4j_opts, Hash $service_config = $kafka::consumer::service_config, ) { assert_private() - if $service_install { + if $manage_service { if $service_config['topic'] == '' { fail('[Consumer] You need to specify a value for topic') } if $service_config['zookeeper'] == '' { fail('[Consumer] You need to specify a value for zookeeper') } $env_defaults = { 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, } $environment = deep_merge($env_defaults, $env) - if $::service_provider == 'systemd' { + if $facts['service_provider'] == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] ~> Exec['systemctl-daemon-reload'] -> Service[$service_name] } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/init.pp b/manifests/init.pp index fe44a1c..6bf0f8f 100644 --- a/manifests/init.pp +++ b/manifests/init.pp @@ -1,230 +1,230 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka # # This class will install kafka binaries # # === Requirements/Dependencies # # Currently requires the puppetlabs/stdlib module on the Puppet Forge in # order to validate much of the the provided configuration. # # === Parameters # -# [*version*] +# [*kafka_version*] # The version of kafka that should be installed. # # [*scala_version*] # The scala version what kafka was built with. # # [*install_dir*] # The directory to install kafka to. # # [*mirror_url*] # The url where the kafka is downloaded from. # -# [*install_java*] +# [*manage_java*] # Install java if it's not already installed. # # [*package_dir*] # The directory to install kafka. # # [*package_name*] # Package name, when installing kafka from a package. # # [*package_ensure*] # Package version (or 'present', 'absent', 'latest'), when installing kafka from a package. # -# [*user*] +# [*user_name*] # User to run kafka as. # -# [*group*] +# [*group_name*] # Group to run kafka as. # # [*user_id*] # Create the kafka user with this ID. # # [*system_user*] # Whether the kafka user is a system user or not. # # [*group_id*] # Create the kafka group with this ID. # # [*system_group*] # Whether the kafka group is a system group or not. # # [*manage_user*] # Create the kafka user if it's not already present. # # [*manage_group*] # Create the kafka group if it's not already present. # # [*config_dir*] # The directory to create the kafka config files to. # # [*log_dir*] # The directory for kafka log files. # # === Examples # # class kafka ( - String $version = $kafka::params::version, + String $kafka_version = $kafka::params::kafka_version, String $scala_version = $kafka::params::scala_version, Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, - Boolean $install_java = $kafka::params::install_java, + Boolean $manage_java = $kafka::params::manage_java, Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, Optional[String] $package_name = $kafka::params::package_name, Optional[String] $mirror_subpath = $kafka::params::mirror_subpath, Optional[String] $proxy_server = $kafka::params::proxy_server, Optional[String] $proxy_port = $kafka::params::proxy_port, Optional[String] $proxy_host = $kafka::params::proxy_host, Optional[String] $proxy_type = $kafka::params::proxy_type, String $package_ensure = $kafka::params::package_ensure, - String $user = $kafka::params::user, - String $group = $kafka::params::group, + String $user_name = $kafka::params::user_name, + String $group_name = $kafka::params::group_name, Boolean $system_user = $kafka::params::system_user, Boolean $system_group = $kafka::params::system_group, Optional[Integer] $user_id = $kafka::params::user_id, Optional[Integer] $group_id = $kafka::params::group_id, Boolean $manage_user = $kafka::params::manage_user, Boolean $manage_group = $kafka::params::manage_group, Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, Optional[String] $install_mode = $kafka::params::install_mode, ) inherits kafka::params { - if $install_java { + if $manage_java { class { 'java': distribution => 'jdk', } } if $manage_group { - group { $group: + group { $group_name: ensure => present, gid => $group_id, system => $system_group, } } if $manage_user { - user { $user: + user { $user_name: ensure => present, shell => '/bin/bash', - require => Group[$group], + require => Group[$group_name], uid => $user_id, system => $system_user, } } file { $config_dir: ensure => directory, - owner => $user, - group => $group, + owner => $user_name, + group => $group_name, } file { $log_dir: ensure => directory, - owner => $user, - group => $group, + owner => $user_name, + group => $group_name, require => [ - Group[$group], - User[$user], + Group[$group_name], + User[$user_name], ], } if $package_name == undef { include archive $mirror_path = $mirror_subpath ? { # if mirror_subpath was not changed, # we adapt it for the version - $kafka::params::mirror_subpath => "kafka/${version}", + $kafka::params::mirror_subpath => "kafka/${kafka_version}", # else, we just take whatever was supplied: default => $mirror_subpath, } - $basefilename = "kafka_${scala_version}-${version}.tgz" + $basefilename = "kafka_${scala_version}-${kafka_version}.tgz" $package_url = "${mirror_url}${mirror_path}/${basefilename}" $source = $mirror_url ?{ /tgz$/ => $mirror_url, default => $package_url, } $install_directory = $install_dir ? { # if install_dir was not changed, # we adapt it for the scala_version and the version - $kafka::params::install_dir => "/opt/kafka-${scala_version}-${version}", + $kafka::params::install_dir => "/opt/kafka-${scala_version}-${kafka_version}", # else, we just take whatever was supplied: default => $install_dir, } file { $package_dir: ensure => directory, - owner => $user, - group => $group, + owner => $user_name, + group => $group_name, require => [ - Group[$group], - User[$user], + Group[$group_name], + User[$user_name], ], } file { $install_directory: ensure => directory, - owner => $user, - group => $group, + owner => $user_name, + group => $group_name, mode => $install_mode, require => [ - Group[$group], - User[$user], + Group[$group_name], + User[$user_name], ], } file { '/opt/kafka': ensure => link, target => $install_directory, require => File[$install_directory], } if $proxy_server == undef and $proxy_host != undef and $proxy_port != undef { $final_proxy_server = "${proxy_host}:${proxy_port}" } else { $final_proxy_server = $proxy_server } archive { "${package_dir}/${basefilename}": ensure => present, extract => true, extract_command => 'tar xfz %s --strip-components=1', extract_path => $install_directory, source => $source, creates => "${install_directory}/config", cleanup => true, proxy_server => $final_proxy_server, proxy_type => $proxy_type, - user => $user, - group => $group, + user => $user_name, + group => $group_name, require => [ File[$package_dir], File[$install_directory], - Group[$group], - User[$user], + Group[$group_name], + User[$user_name], ], before => File[$config_dir], } } else { package { $package_name: ensure => $package_ensure, before => File[$config_dir], } } } diff --git a/manifests/mirror.pp b/manifests/mirror.pp index 7bc291d..c503701 100644 --- a/manifests/mirror.pp +++ b/manifests/mirror.pp @@ -1,147 +1,147 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::mirror # # This class will install kafka with the mirror role. # # === Requirements/Dependencies # # Currently requires the puppetlabs/stdlib module on the Puppet Forge in # order to validate much of the the provided configuration. # # === Parameters # -# [*version*] +# [*kafka_version*] # The version of kafka that should be installed. # # [*scala_version*] # The scala version what kafka was built with. # # [*install_dir*] # The directory to install kafka to. # # [*mirror_url*] # The url where the kafka is downloaded from. # -# [*install_java*] +# [*manage_java*] # Install java if it's not already installed. # # [*package_dir*] # The directory to install kafka. # # [*package_name*] # Package name, when installing kafka from a package. # # [*package_ensure*] # Package version (or 'present', 'absent', 'latest'), when installing kafka from a package. # -# [*user*] +# [*user_name*] # User to run kafka as. # -# [*group*] +# [*group_name*] # Group to run kafka as. # # [*user_id*] # Create the kafka user with this ID. # # [*group_id*] # Create the kafka group with this ID. # # [*manage_user*] # Create the kafka user if it's not already present. # # [*manage_group*] # Create the kafka group if it's not already present. # # [*config_dir*] # The directory to create the kafka config files to. # # [*log_dir*] # The directory for kafka log files. # # [*bin_dir*] # The directory where the kafka scripts are. # # [*service_name*] # Set the name of the service. # -# [*service_install*] +# [*manage_service*] # Install the init.d or systemd service. # # [*service_ensure*] # Set the ensure state of the service to 'stopped' or 'running'. # # [*service_restart*] # Whether the configuration files should trigger a service restart. # # [*service_requires*] # Set the list of services required to be running before Kafka. # # [*limit_nofile*] # Set the 'LimitNOFILE' option of the systemd service. # # [*limit_core*] # Set the 'LimitCORE' option of the systemd service. # # [*env*] # A hash of the environment variables to set. # # [*consumer_config*] # A hash of the consumer configuration options. # # [*producer_config*] # A hash of the producer configuration options. # # [*service_config*] # A hash of the mirror script options. # # === Examples # # Create the mirror service connecting to a local zookeeper # # class { 'kafka::mirror': # consumer_config => { 'client.id' => '0', 'zookeeper.connect' => 'localhost:2181' } # } # class kafka::mirror ( - String $version = $kafka::params::version, + String $kafka_version = $kafka::params::kafka_version, String $scala_version = $kafka::params::scala_version, Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, - Boolean $install_java = $kafka::params::install_java, + Boolean $manage_java = $kafka::params::manage_java, Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, Optional[String] $package_name = $kafka::params::package_name, String $package_ensure = $kafka::params::package_ensure, - String $user = $kafka::params::user, - String $group = $kafka::params::group, + String $user_name = $kafka::params::user_name, + String $group_name = $kafka::params::group_name, Optional[Integer] $user_id = $kafka::params::user_id, Optional[Integer] $group_id = $kafka::params::group_id, Boolean $manage_user = $kafka::params::manage_user, Boolean $manage_group = $kafka::params::manage_group, Stdlib::Filemode $config_mode = $kafka::params::config_mode, Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::params::bin_dir, String $service_name = 'kafka-mirror', - Boolean $service_install = $kafka::params::service_install, + Boolean $manage_service = $kafka::params::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::params::service_ensure, Boolean $service_restart = $kafka::params::service_restart, Array[String] $service_requires = $kafka::params::service_requires, Optional[String] $limit_nofile = $kafka::params::limit_nofile, Optional[String] $limit_core = $kafka::params::limit_core, Hash $env = {}, Hash $consumer_config = {}, Hash $producer_config = {}, Hash $service_config = {}, String $heap_opts = $kafka::params::mirror_heap_opts, String $jmx_opts = $kafka::params::mirror_jmx_opts, String $log4j_opts = $kafka::params::mirror_log4j_opts, ) inherits kafka::params { class { 'kafka::mirror::install': } -> class { 'kafka::mirror::config': } -> class { 'kafka::mirror::service': } -> Class['kafka::mirror'] } diff --git a/manifests/mirror/config.pp b/manifests/mirror/config.pp index 9916c00..275c727 100644 --- a/manifests/mirror/config.pp +++ b/manifests/mirror/config.pp @@ -1,55 +1,55 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::mirror::config # # This private class is meant to be called from `kafka::mirror`. # It manages the mirror-maker config files # class kafka::mirror::config( - Stdlib::Absolutepath $config_dir = $kafka::mirror::config_dir, + Boolean $manage_service = $kafka::mirror::manage_service, String $service_name = $kafka::mirror::service_name, - Boolean $service_install = $kafka::mirror::service_install, Boolean $service_restart = $kafka::mirror::service_restart, Hash $consumer_config = $kafka::mirror::consumer_config, Hash $producer_config = $kafka::mirror::producer_config, + Stdlib::Absolutepath $config_dir = $kafka::mirror::config_dir, + String $user_name = $kafka::mirror::user_name, + String $group_name = $kafka::mirror::group_name, Stdlib::Filemode $config_mode = $kafka::mirror::config_mode, - String $user = $kafka::mirror::user, - String $group = $kafka::mirror::group, ) { assert_private() if $consumer_config['group.id'] == '' { fail('[Consumer] You need to specify a value for group.id') } if $consumer_config['zookeeper.connect'] == '' { fail('[Consumer] You need to specify a value for zookeeper.connect') } if $producer_config['bootstrap.servers'] == '' { fail('[Producer] You need to specify a value for bootstrap.servers') } class { 'kafka::consumer::config': - config_dir => $config_dir, - config_mode => $config_mode, + manage_service => $manage_service, service_name => $service_name, - service_install => $service_install, service_restart => $service_restart, config => $consumer_config, - user => $user, - group => $group, + config_dir => $config_dir, + user_name => $user_name, + group_name => $group_name, + config_mode => $config_mode, } class { 'kafka::producer::config': - config_dir => $config_dir, - config_mode => $config_mode, + manage_service => $manage_service, service_name => $service_name, - service_install => $service_install, service_restart => $service_restart, config => $producer_config, - user => $user, - group => $group, + config_dir => $config_dir, + user_name => $user_name, + group_name => $group_name, + config_mode => $config_mode, } } diff --git a/manifests/mirror/install.pp b/manifests/mirror/install.pp index 9d9637a..cbc030b 100644 --- a/manifests/mirror/install.pp +++ b/manifests/mirror/install.pp @@ -1,34 +1,34 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::mirror::install # # This private class is meant to be called from `kafka::mirror`. # It downloads the package and installs it. # class kafka::mirror::install { assert_private() if !defined(Class['kafka']) { class { 'kafka': - version => $kafka::mirror::version, - scala_version => $kafka::mirror::scala_version, - install_dir => $kafka::mirror::install_dir, - mirror_url => $kafka::mirror::mirror_url, - install_java => $kafka::mirror::install_java, - package_dir => $kafka::mirror::package_dir, - package_name => $kafka::mirror::package_name, - package_ensure => $kafka::mirror::package_ensure, - user => $kafka::mirror::user, - group => $kafka::mirror::group, - user_id => $kafka::mirror::user_id, + manage_java => $kafka::mirror::manage_java, + manage_group => $kafka::mirror::manage_group, group_id => $kafka::mirror::group_id, + group_name => $kafka::mirror::group_name, manage_user => $kafka::mirror::manage_user, - manage_group => $kafka::mirror::manage_group, + user_id => $kafka::mirror::user_id, + user_name => $kafka::mirror::user_name, config_dir => $kafka::mirror::config_dir, log_dir => $kafka::mirror::log_dir, + mirror_url => $kafka::mirror::mirror_url, + kafka_version => $kafka::mirror::kafka_version, + scala_version => $kafka::mirror::scala_version, + install_dir => $kafka::mirror::install_dir, + package_dir => $kafka::mirror::package_dir, + package_ensure => $kafka::mirror::package_ensure, + package_name => $kafka::mirror::package_name, } } } diff --git a/manifests/mirror/service.pp b/manifests/mirror/service.pp index bb1333c..4b86c21 100644 --- a/manifests/mirror/service.pp +++ b/manifests/mirror/service.pp @@ -1,74 +1,74 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::mirror::service # # This private class is meant to be called from `kafka::mirror`. # It manages the kafka-mirror service # class kafka::mirror::service( - String $user = $kafka::mirror::user, - String $group = $kafka::mirror::group, + Boolean $manage_service = $kafka::mirror::manage_service, + Enum['running', 'stopped'] $service_ensure = $kafka::mirror::service_ensure, + String $service_name = $kafka::mirror::service_name, + String $user_name = $kafka::mirror::user_name, + String $group_name = $kafka::mirror::group_name, Stdlib::Absolutepath $config_dir = $kafka::mirror::config_dir, Stdlib::Absolutepath $log_dir = $kafka::mirror::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::mirror::bin_dir, - String $service_name = $kafka::mirror::service_name, - Boolean $service_install = $kafka::mirror::service_install, - Enum['running', 'stopped'] $service_ensure = $kafka::mirror::service_ensure, Array[String] $service_requires = $kafka::mirror::service_requires, Optional[String] $limit_nofile = $kafka::mirror::limit_nofile, Optional[String] $limit_core = $kafka::mirror::limit_core, Hash $env = $kafka::mirror::env, Hash $consumer_config = $kafka::mirror::consumer_config, Hash $producer_config = $kafka::mirror::producer_config, Hash $service_config = $kafka::mirror::service_config, String $heap_opts = $kafka::mirror::heap_opts, String $jmx_opts = $kafka::mirror::jmx_opts, String $log4j_opts = $kafka::mirror::log4j_opts, ) { assert_private() - if $service_install { + if $manage_service { $env_defaults = { 'KAFKA_HEAP_OPTS' => $heap_opts, 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, } $environment = deep_merge($env_defaults, $env) if $::service_provider == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] ~> Exec['systemctl-daemon-reload'] -> Service[$service_name] } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/params.pp b/manifests/params.pp index 23e9be0..b3f08d9 100644 --- a/manifests/params.pp +++ b/manifests/params.pp @@ -1,73 +1,73 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class kafka::params # # This class is meant to be called from kafka::broker # It sets variables according to platform # class kafka::params { # this is all only tested on Debian and RedHat # params gets included everywhere so we can do the validation here unless $facts['os']['family'] =~ /(RedHat|Debian)/ { warning("${facts['os']['family']} is not supported") } - $version = '2.4.1' + $kafka_version = '2.4.1' $scala_version = '2.12' - $install_dir = "/opt/kafka-${scala_version}-${version}" + $install_dir = "/opt/kafka-${scala_version}-${kafka_version}" $config_dir = '/opt/kafka/config' $bin_dir = '/opt/kafka/bin' $log_dir = '/var/log/kafka' $mirror_url = 'https://www.apache.org/dyn/closer.lua?action=download&filename=' - $mirror_subpath = "kafka/${version}" - $install_java = false + $mirror_subpath = "kafka/${kafka_version}" + $manage_java = false $package_dir = '/var/tmp/kafka' $package_name = undef $proxy_server = undef $proxy_host = undef $proxy_port = undef $proxy_type = undef $package_ensure = 'present' - $user = 'kafka' - $group = 'kafka' + $user_name = 'kafka' + $group_name = 'kafka' $user_id = undef $group_id = undef $system_user = false $system_group = false $manage_user = true $manage_group = true $config_mode = '0644' $install_mode = '0755' - $service_install = true + $manage_service = true $service_ensure = 'running' $service_restart = true $service_requires = $facts['os']['family'] ? { 'RedHat' => ['network.target', 'syslog.target'], default => [], } $limit_nofile = undef $limit_core = undef $timeout_stop = undef $exec_stop = false $daemon_start = false $broker_heap_opts = '-Xmx1G -Xms1G' $broker_jmx_opts = '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false \ -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9990' $broker_log4j_opts = "-Dlog4j.configuration=file:${config_dir}/log4j.properties" $broker_opts = '' $mirror_heap_opts = '-Xmx256M' $mirror_jmx_opts = '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9991' $mirror_log4j_opts = $broker_log4j_opts $producer_jmx_opts = '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9992' $producer_log4j_opts = $broker_log4j_opts $consumer_jmx_opts = '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9993' $consumer_log4j_opts = $broker_log4j_opts } diff --git a/manifests/producer.pp b/manifests/producer.pp index d083824..0c8cb53 100644 --- a/manifests/producer.pp +++ b/manifests/producer.pp @@ -1,146 +1,146 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::producer # # This class will install kafka with the producer role. # # === Requirements/Dependencies # # Currently requires the puppetlabs/stdlib module on the Puppet Forge in # order to validate much of the the provided configuration. # # === Parameters # -# [*version*] +# [*kafka_version*] # The version of kafka that should be installed. # # [*scala_version*] # The scala version what kafka was built with. # # [*install_dir*] # The directory to install kafka to. # # [*mirror_url*] # The url where the kafka is downloaded from. # -# [*install_java*] +# [*manage_java*] # Install java if it's not already installed. # # [*package_dir*] # The directory to install kafka. # # [*package_name*] # Package name, when installing kafka from a package. # # [*package_ensure*] # Package version (or 'present', 'absent', 'latest'), when installing kafka from a package. # -# [*user*] +# [*user_name*] # User to run kafka as. # -# [*group*] +# [*group_name*] # Group to run kafka as. # # [*user_id*] # Create the kafka user with this ID. # # [*group_id*] # Create the kafka group with this ID. # # [*manage_user*] # Create the kafka user if it's not already present. # # [*manage_group*] # Create the kafka group if it's not already present. # # [*config_mode*] # The permissions for the config files. # # [*config_dir*] # The directory to create the kafka config files to. # # [*log_dir*] # The directory for kafka log files. # # [*bin_dir*] # The directory where the kafka scripts are. # # [*service_name*] # Set the name of the service. # -# [*service_install*] +# [*manage_service*] # Install the init.d or systemd service. # # [*service_ensure*] # Set the ensure state of the service to 'stopped' or 'running'. # # [*service_restart*] # Whether the configuration files should trigger a service restart. # # [*service_requires*] # Set the list of services required to be running before Kafka. # # [*limit_nofile*] # Set the 'LimitNOFILE' option of the systemd service. # # [*limit_core*] # Set the 'LimitCORE' option of the systemd service. # # [*env*] # A hash of the environment variables to set. # # [*config*] # A hash of the producer configuration options. # # [*service_config*] # A hash of the `kafka-console-producer.sh` script options. # # === Examples # # Create the producer service connecting to a local zookeeper # # class { 'kafka::producer': # config => { 'client.id' => '0', 'zookeeper.connect' => 'localhost:2181' } # } # class kafka::producer ( $input, - String $version = $kafka::params::version, + String $kafka_version = $kafka::params::kafka_version, String $scala_version = $kafka::params::scala_version, Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, - Boolean $install_java = $kafka::params::install_java, + Boolean $manage_java = $kafka::params::manage_java, Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, Optional[String] $package_name = $kafka::params::package_name, String $package_ensure = $kafka::params::package_ensure, - String $user = $kafka::params::user, - String $group = $kafka::params::group, + String $user_name = $kafka::params::user_name, + String $group_name = $kafka::params::group_name, Optional[Integer] $user_id = $kafka::params::user_id, Optional[Integer] $group_id = $kafka::params::group_id, Boolean $manage_user = $kafka::params::manage_user, Boolean $manage_group = $kafka::params::manage_group, Stdlib::Filemode $config_mode = $kafka::params::config_mode, Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::params::bin_dir, String $service_name = 'kafka-producer', - Boolean $service_install = $kafka::params::service_install, + Boolean $manage_service = $kafka::params::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::params::service_ensure, Boolean $service_restart = $kafka::params::service_restart, Array[String] $service_requires = $kafka::params::service_requires, Optional[String] $limit_nofile = $kafka::params::limit_nofile, Optional[String] $limit_core = $kafka::params::limit_core, Hash $env = {}, Hash $config = {}, Hash $service_config = {}, String $jmx_opts = $kafka::params::producer_jmx_opts, String $log4j_opts = $kafka::params::producer_log4j_opts, ) inherits kafka::params { class { 'kafka::producer::install': } -> class { 'kafka::producer::config': } -> class { 'kafka::producer::service': } -> Class['kafka::producer'] } diff --git a/manifests/producer/config.pp b/manifests/producer/config.pp index fad8113..8fc8d13 100644 --- a/manifests/producer/config.pp +++ b/manifests/producer/config.pp @@ -1,37 +1,37 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::producer::config # # This private class is meant to be called from `kafka::producer`. # It manages the producer config files # class kafka::producer::config( - Stdlib::Absolutepath $config_dir = $kafka::producer::config_dir, + Boolean $manage_service = $kafka::producer::manage_service, String $service_name = $kafka::producer::service_name, - Boolean $service_install = $kafka::producer::service_install, Boolean $service_restart = $kafka::producer::service_restart, Hash $config = $kafka::producer::config, + Stdlib::Absolutepath $config_dir = $kafka::producer::config_dir, + String $user_name = $kafka::producer::user_name, + String $group_name = $kafka::producer::group_name, Stdlib::Filemode $config_mode = $kafka::producer::config_mode, - String $user = $kafka::producer::user, - String $group = $kafka::producer::group, ) { - if ($service_install and $service_restart) { + if ($manage_service and $service_restart) { $config_notify = Service[$service_name] } else { $config_notify = undef } $doctag = 'producerconfigs' file { "${config_dir}/producer.properties": ensure => present, - owner => $user, - group => $group, + owner => $user_name, + group => $group_name, mode => $config_mode, content => template('kafka/properties.erb'), notify => $config_notify, require => File[$config_dir], } } diff --git a/manifests/producer/install.pp b/manifests/producer/install.pp index 78e0b6a..23d5c9b 100644 --- a/manifests/producer/install.pp +++ b/manifests/producer/install.pp @@ -1,34 +1,34 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::producer::install # # This private class is meant to be called from `kafka::producer`. # It downloads the package and installs it. # class kafka::producer::install { assert_private() if !defined(Class['kafka']) { class { 'kafka': - version => $kafka::producer::version, - scala_version => $kafka::producer::scala_version, - install_dir => $kafka::producer::install_dir, - mirror_url => $kafka::producer::mirror_url, - install_java => $kafka::producer::install_java, - package_dir => $kafka::producer::package_dir, - package_name => $kafka::producer::package_name, - package_ensure => $kafka::producer::package_ensure, - user => $kafka::producer::user, - group => $kafka::producer::group, - user_id => $kafka::producer::user_id, + manage_java => $kafka::producer::manage_java, + manage_group => $kafka::producer::manage_group, group_id => $kafka::producer::group_id, + group_name => $kafka::producer::group_name, manage_user => $kafka::producer::manage_user, - manage_group => $kafka::producer::manage_group, + user_id => $kafka::producer::user_id, + user_name => $kafka::producer::user_name, config_dir => $kafka::producer::config_dir, log_dir => $kafka::producer::log_dir, + mirror_url => $kafka::producer::mirror_url, + kafka_version => $kafka::producer::kafka_version, + scala_version => $kafka::producer::scala_version, + install_dir => $kafka::producer::install_dir, + package_dir => $kafka::producer::package_dir, + package_ensure => $kafka::producer::package_ensure, + package_name => $kafka::producer::package_name, } } } diff --git a/manifests/producer/service.pp b/manifests/producer/service.pp index ef2f2dd..ea4819a 100644 --- a/manifests/producer/service.pp +++ b/manifests/producer/service.pp @@ -1,64 +1,64 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::producer::service # # This private class is meant to be called from `kafka::producer`. # It manages the kafka-producer service # class kafka::producer::service( - String $user = $kafka::producer::user, - String $group = $kafka::producer::group, + Boolean $manage_service = $kafka::producer::manage_service, + Enum['running', 'stopped'] $service_ensure = $kafka::producer::service_ensure, + String $service_name = $kafka::producer::service_name, + String $user_name = $kafka::producer::user_name, + String $group_name = $kafka::producer::group_name, Stdlib::Absolutepath $config_dir = $kafka::producer::config_dir, Stdlib::Absolutepath $log_dir = $kafka::producer::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::producer::bin_dir, - String $service_name = $kafka::producer::service_name, - Boolean $service_install = $kafka::producer::service_install, - Enum['running', 'stopped'] $service_ensure = $kafka::producer::service_ensure, Array[String] $service_requires = $kafka::producer::service_requires, Optional[String] $limit_nofile = $kafka::producer::limit_nofile, Optional[String] $limit_core = $kafka::producer::limit_core, Hash $env = $kafka::producer::env, $input = $kafka::producer::input, String $jmx_opts = $kafka::producer::jmx_opts, String $log4j_opts = $kafka::producer::log4j_opts, Hash $service_config = $kafka::producer::service_config, ) { assert_private() - if $service_install { + if $manage_service { if $service_config['broker-list'] == '' { fail('[Producer] You need to specify a value for broker-list') } if $service_config['topic'] == '' { fail('[Producer] You need to specify a value for topic') } $env_defaults = { 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, } $environment = deep_merge($env_defaults, $env) - if $::service_provider == 'systemd' { + if $facts['service_provider'] == 'systemd' { fail('Console Producer is not supported on systemd, because the stdin of the process cannot be redirected') } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/spec/acceptance/broker_spec.rb b/spec/acceptance/broker_spec.rb index e242c32..9926af1 100644 --- a/spec/acceptance/broker_spec.rb +++ b/spec/acceptance/broker_spec.rb @@ -1,234 +1,234 @@ require 'spec_helper_acceptance' describe 'kafka::broker' do it 'works with no errors' do pp = <<-EOS class { 'kafka::broker': config => { 'zookeeper.connect' => 'localhost:2181', }, } -> kafka::topic { 'demo': ensure => present, zookeeper => 'localhost:2181', } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe 'kafka::broker::install' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::broker': config => { 'zookeeper.connect' => 'localhost:2181', }, } EOS apply_manifest(pp, catch_failures: true) end describe group('kafka') do it { is_expected.to exist } end describe user('kafka') do it { is_expected.to exist } it { is_expected.to belong_to_group 'kafka' } it { is_expected.to have_login_shell '/bin/bash' } end describe file('/var/tmp/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka-2.12-2.4.1') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka') do it { is_expected.to be_linked_to('/opt/kafka-2.12-2.4.1') } end describe file('/opt/kafka/config') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/var/log/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::broker::config' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::broker': config => { 'zookeeper.connect' => 'localhost:2181', }, } EOS apply_manifest(pp, catch_failures: true) end describe file('/opt/kafka/config/server.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } it { is_expected.to contain 'zookeeper.connect=localhost:2181' } end end context 'with custom config dir' do it 'works with no errors' do pp = <<-EOS class { 'kafka::broker': config => { 'zookeeper.connect' => 'localhost:2181', }, config_dir => '/opt/kafka/custom_config' } EOS apply_manifest(pp, catch_failures: true) end describe file('/opt/kafka/custom_config/server.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } it { is_expected.to contain 'zookeeper.connect=localhost:2181' } end end context 'with specific version' do it 'works with no errors' do pp = <<-EOS class { 'kafka::broker': - version => '2.4.0', - config => { + kafka_version => '2.4.0', + config => { 'zookeeper.connect' => 'localhost:2181', }, } EOS apply_manifest(pp, catch_failures: true) end describe file('/opt/kafka/config/server.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::broker::service' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::broker': config => { 'zookeeper.connect' => 'localhost:2181', }, } EOS apply_manifest(pp, catch_failures: true) end describe file('/etc/init.d/kafka'), if: (fact('operatingsystemmajrelease') =~ %r{(5|6)} && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } end describe file('/etc/systemd/system/kafka.service'), if: (fact('operatingsystemmajrelease') == '7' && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } end describe service('kafka') do it { is_expected.to be_running } it { is_expected.to be_enabled } end end end describe 'kafka::broker::service' do context 'with log4j/jmx parameters' do it 'works with no errors' do pp = <<-EOS exec { 'create log dir': command => '/bin/mkdir -p /some/path/to/logs', creates => '/some/path/to/logs', } -> class { 'kafka::broker': config => { 'zookeeper.connect' => 'localhost:2181', }, heap_opts => '-Xmx512M -Xmx512M', log4j_opts => '-Dlog4j.configuration=file:/tmp/log4j.properties', jmx_opts => '-Dcom.sun.management.jmxremote', opts => '-Djava.security.policy=/some/path/my.policy', log_dir => '/some/path/to/logs' } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe file('/etc/init.d/kafka'), if: (fact('operatingsystemmajrelease') =~ %r{(5|6)} && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } it { is_expected.to contain 'export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote"' } it { is_expected.to contain 'export KAFKA_HEAP_OPTS="-Xmx512M -Xmx512M"' } it { is_expected.to contain 'export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/tmp/log4j.properties"' } end describe file('/etc/init.d/kafka'), if: (fact('service_provider') == 'upstart' && fact('osfamily') == 'Debian') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } it { is_expected.to contain %r{^# Provides:\s+kafka$} } it { is_expected.to contain 'export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote"' } it { is_expected.to contain 'export KAFKA_HEAP_OPTS="-Xmx512M -Xmx512M"' } it { is_expected.to contain 'export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/tmp/log4j.properties"' } end describe file('/etc/systemd/system/kafka.service'), if: (fact('operatingsystemmajrelease') == '7' && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } it { is_expected.to contain "Environment='KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote'" } it { is_expected.to contain "Environment='KAFKA_HEAP_OPTS=-Xmx512M -Xmx512M'" } it { is_expected.to contain "Environment='KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/tmp/log4j.properties'" } it { is_expected.to contain "Environment='KAFKA_OPTS=-Djava.security.policy=/some/path/my.policy'" } it { is_expected.to contain "Environment='LOG_DIR=/some/path/to/logs'" } end describe service('kafka') do it { is_expected.to be_running } it { is_expected.to be_enabled } end end end end diff --git a/spec/acceptance/init_spec.rb b/spec/acceptance/init_spec.rb index fdd3b92..e74d8db 100644 --- a/spec/acceptance/init_spec.rb +++ b/spec/acceptance/init_spec.rb @@ -1,212 +1,212 @@ require 'spec_helper_acceptance' describe 'kafka' do it 'works with no errors' do pp = <<-EOS class { 'kafka': } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe 'kafka::init' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka': } EOS apply_manifest(pp, catch_failures: true) end describe group('kafka') do it { is_expected.to exist } end describe user('kafka') do it { is_expected.to exist } it { is_expected.to belong_to_group 'kafka' } it { is_expected.to have_login_shell '/bin/bash' } end describe file('/var/tmp/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka-2.12-2.4.1') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka') do it { is_expected.to be_linked_to('/opt/kafka-2.12-2.4.1') } end describe file('/opt/kafka/config') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/var/log/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end context 'with specific kafka version' do it 'works with no errors' do pp = <<-EOS class { 'kafka': - version => '2.4.0', + kafka_version => '2.4.0', } EOS apply_manifest(pp, catch_failures: true) end describe group('kafka') do it { is_expected.to exist } end describe user('kafka') do it { is_expected.to exist } it { is_expected.to belong_to_group 'kafka' } it { is_expected.to have_login_shell '/bin/bash' } end describe file('/var/tmp/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka-2.12-2.4.0') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka') do it { is_expected.to be_linked_to('/opt/kafka-2.12-2.4.0') } end describe file('/opt/kafka/config') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/var/log/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end context 'with specific scala version' do it 'works with no errors' do pp = <<-EOS class { 'kafka': scala_version => '2.13', } EOS apply_manifest(pp, catch_failures: true) end describe group('kafka') do it { is_expected.to exist } end describe user('kafka') do it { is_expected.to exist } it { is_expected.to belong_to_group 'kafka' } it { is_expected.to have_login_shell '/bin/bash' } end describe file('/var/tmp/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka-2.13-2.4.1') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka') do it { is_expected.to be_linked_to('/opt/kafka-2.13-2.4.1') } end describe file('/opt/kafka/config') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/var/log/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end context 'with specific config dir' do it 'works with no errors' do pp = <<-EOS class { 'kafka': config_dir => '/opt/kafka/custom_config', } EOS apply_manifest(pp, catch_failures: true) end describe group('kafka') do it { is_expected.to exist } end describe user('kafka') do it { is_expected.to exist } it { is_expected.to belong_to_group 'kafka' } it { is_expected.to have_login_shell '/bin/bash' } end describe file('/var/tmp/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka-2.12-2.4.1') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka') do it { is_expected.to be_linked_to('/opt/kafka-2.12-2.4.1') } end describe file('/opt/kafka/custom_config') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/var/log/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end end diff --git a/spec/acceptance/mirror_spec.rb b/spec/acceptance/mirror_spec.rb index b17f4e9..1301b47 100644 --- a/spec/acceptance/mirror_spec.rb +++ b/spec/acceptance/mirror_spec.rb @@ -1,244 +1,244 @@ require 'spec_helper_acceptance' describe 'kafka::mirror' do it 'works with no errors' do pp = <<-EOS class { 'kafka::mirror': consumer_config => { 'group.id' => 'kafka-mirror', 'bootstrap.servers' => 'localhost:9092', }, producer_config => { 'bootstrap.servers' => 'localhost:9092', }, service_config => { 'whitelist' => '.*', }, } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe 'kafka::mirror::install' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::mirror': consumer_config => { 'group.id' => 'kafka-mirror', 'bootstrap.servers' => 'localhost:9092', }, producer_config => { 'bootstrap.servers' => 'localhost:9092', }, service_config => { 'whitelist' => '.*', }, } EOS apply_manifest(pp, catch_failures: true) end describe group('kafka') do it { is_expected.to exist } end describe user('kafka') do it { is_expected.to exist } it { is_expected.to belong_to_group 'kafka' } it { is_expected.to have_login_shell '/bin/bash' } end describe file('/var/tmp/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka-2.12-2.4.1') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka') do it { is_expected.to be_linked_to('/opt/kafka-2.12-2.4.1') } end describe file('/opt/kafka/config') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/var/log/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::mirror::config' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::mirror': consumer_config => { 'group.id' => 'kafka-mirror', 'bootstrap.servers' => 'localhost:9092', }, producer_config => { 'bootstrap.servers' => 'localhost:9092', }, service_config => { 'whitelist' => '.*', }, } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe file('/opt/kafka/config/consumer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka/config/producer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end context 'with custom config_dir' do it 'works with no errors' do pp = <<-EOS class { 'kafka::mirror': consumer_config => { 'group.id' => 'kafka-mirror', 'bootstrap.servers' => 'localhost:9092', }, producer_config => { 'bootstrap.servers' => 'localhost:9092', }, service_config => { 'whitelist' => '.*', }, config_dir => '/opt/kafka/custom_config', } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe file('/opt/kafka/custom_config/consumer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka/custom_config/producer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end context 'with specific version' do it 'works with no errors' do pp = <<-EOS class { 'kafka::mirror': - version => '2.4.0', + kafka_version => '2.4.0', consumer_config => { 'group.id' => 'kafka-mirror', 'bootstrap.servers' => 'localhost:9092', }, producer_config => { 'bootstrap.servers' => 'localhost:9092', }, service_config => { 'whitelist' => '.*', }, } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe file('/opt/kafka/config/consumer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka/config/producer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::mirror::service' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::mirror': consumer_config => { 'group.id' => 'kafka-mirror', 'bootstrap.servers' => 'localhost:9092', }, producer_config => { 'bootstrap.servers' => 'localhost:9092', }, service_config => { 'whitelist' => '.*', }, } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe file('/etc/init.d/kafka-mirror'), if: (fact('operatingsystemmajrelease') =~ %r{(5|6)} && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } it { is_expected.to contain 'export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9991"' } it { is_expected.to contain 'export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/opt/kafka/config/log4j.properties"' } end describe file('/etc/init.d/kafka-mirror'), if: (fact('service_provider') == 'upstart' && fact('osfamily') == 'Debian') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } it { is_expected.to contain %r{^# Provides:\s+kafka-mirror$} } it { is_expected.to contain 'export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9991"' } it { is_expected.to contain 'export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/opt/kafka/config/log4j.properties"' } end describe file('/etc/systemd/system/kafka-mirror.service'), if: (fact('operatingsystemmajrelease') == '7' && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } it { is_expected.to contain 'Environment=\'KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9991\'' } it { is_expected.to contain 'Environment=\'KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/opt/kafka/config/log4j.properties\'' } end describe service('kafka-mirror') do it { is_expected.to be_running } it { is_expected.to be_enabled } end end end end diff --git a/spec/classes/broker_spec.rb b/spec/classes/broker_spec.rb index f514266..ee9017f 100644 --- a/spec/classes/broker_spec.rb +++ b/spec/classes/broker_spec.rb @@ -1,173 +1,173 @@ require 'spec_helper' require 'shared_examples_param_validation' describe 'kafka::broker', type: :class do let :facts do { osfamily: 'Debian', os: { family: 'Debian' }, operatingsystem: 'Ubuntu', operatingsystemrelease: '14.04', lsbdistcodename: 'trusty', architecture: 'amd64', service_provider: 'upstart' } end let :common_params do { config: { 'zookeeper.connect' => 'localhost:2181' } } end let :params do common_params end it { is_expected.to contain_class('kafka::broker::install').that_comes_before('Class[kafka::broker::config]') } it { is_expected.to contain_class('kafka::broker::config').that_comes_before('Class[kafka::broker::service]') } it { is_expected.to contain_class('kafka::broker::service').that_comes_before('Class[kafka::broker]') } it { is_expected.to contain_class('kafka::broker') } context 'on Debian' do describe 'kafka::broker::install' do context 'defaults' do it { is_expected.to contain_class('kafka') } end end describe 'kafka::broker::config' do context 'defaults' do it { is_expected.to contain_file('/opt/kafka/config/server.properties') } end end describe 'kafka::broker::service' do - context 'service_install false' do + context 'manage_service false' do let :params do - common_params.merge(service_install: false) + common_params.merge(manage_service: false) end it { is_expected.not_to contain_file('/etc/init.d/kafka') } it { is_expected.not_to contain_service('kafka') } end context 'defaults' do it { is_expected.to contain_file('/etc/init.d/kafka') } context 'limit_nofile set' do let :params do { limit_nofile: '65536' } end it { is_expected.to contain_file('/etc/init.d/kafka').with_content %r{ulimit -n 65536$} } end context 'limit_core set' do let :params do { limit_core: 'infinity' } end it { is_expected.to contain_file('/etc/init.d/kafka').with_content %r{ulimit -c infinity$} } end it { is_expected.to contain_service('kafka') } end end end context 'on Centos' do let :facts do { osfamily: 'RedHat', os: { family: 'RedHat' }, operatingsystem: 'CentOS', operatingsystemrelease: '7', operatingsystemmajrelease: '7', architecture: 'amd64', path: '/usr/local/sbin', service_provider: 'systemd' } end describe 'kafka::broker::install' do context 'defaults' do it { is_expected.to contain_class('kafka') } end end describe 'kafka::broker::config' do context 'defaults' do it { is_expected.to contain_file('/opt/kafka/config/server.properties') } end end describe 'kafka::broker::service' do - context 'service_install false' do + context 'manage_service false' do let :params do - common_params.merge(service_install: false) + common_params.merge(manage_service: false) end it { is_expected.not_to contain_file('/etc/systemd/system/kafka.service') } it { is_expected.not_to contain_service('kafka') } end context 'defaults' do it { is_expected.to contain_file('/etc/systemd/system/kafka.service').that_notifies('Exec[systemctl-daemon-reload]') } it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^After=network\.target syslog\.target$} } it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^Wants=network\.target syslog\.target$} } it { is_expected.not_to contain_file('/etc/systemd/system/kafka.service').with_content %r{^LimitNOFILE=} } it { is_expected.not_to contain_file('/etc/systemd/system/kafka.service').with_content %r{^LimitCORE=} } it do is_expected.to contain_file('/etc/init.d/kafka').with( ensure: 'absent' ) end it { is_expected.to contain_exec('systemctl-daemon-reload').that_comes_before('Service[kafka]') } it { is_expected.to contain_service('kafka') } end context 'limit_nofile set' do let :params do { limit_nofile: '65536' } end it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^LimitNOFILE=65536$} } end context 'limit_core set' do let :params do { limit_core: 'infinity' } end it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^LimitCORE=infinity$} } end context 'service_requires set' do let :params do { service_requires: ['dummy.target'] } end it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^After=dummy\.target$} } it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^Wants=dummy\.target$} } end end end it_validates_parameter 'mirror_url' end diff --git a/spec/classes/init_spec.rb b/spec/classes/init_spec.rb index 410d440..75851a0 100644 --- a/spec/classes/init_spec.rb +++ b/spec/classes/init_spec.rb @@ -1,62 +1,62 @@ require 'spec_helper' describe 'kafka', type: :class do let :facts do { osfamily: 'Debian', os: { family: 'Debian' }, operatingsystem: 'Ubuntu', operatingsystemrelease: '14.04', lsbdistcodename: 'trusty', architecture: 'amd64', service_provider: 'upstart' } end it { is_expected.to contain_class('kafka::params') } context 'on Debian' do describe 'kafka' do context 'defaults' do it { is_expected.to contain_group('kafka') } it { is_expected.to contain_user('kafka') } it { is_expected.to contain_file('/var/tmp/kafka') } it { is_expected.to contain_file('/opt/kafka-2.12-2.4.1') } it { is_expected.to contain_file('/opt/kafka') } it { is_expected.to contain_file('/opt/kafka/config') } it { is_expected.to contain_file('/var/log/kafka') } end end end context 'on Debian' do describe 'kafka' do context 'all (compatible) parameters' do let :params do { - version: '0.10.0.1', + kafka_version: '0.10.0.1', scala_version: '2.13', install_dir: '/usr/local/kafka', user_id: 9092, group_id: 9092, - user: 'mykafka', - group: 'mykafka', - install_java: false, + user_name: 'mykafka', + group_name: 'mykafka', + manage_java: false, config_dir: '/opt/kafka/custom_config', log_dir: '/var/log/custom_kafka' } end it { is_expected.to contain_group('mykafka').with(gid: 9092) } it { is_expected.to contain_user('mykafka').with(uid: 9092) } it { is_expected.to contain_file('/var/tmp/kafka') } it { is_expected.to contain_file('/opt/kafka') } it { is_expected.to contain_file('/usr/local/kafka') } it { is_expected.to contain_file('/opt/kafka/custom_config') } it { is_expected.to contain_file('/var/log/custom_kafka') } end end end end diff --git a/templates/init.erb b/templates/init.erb index aca3aeb..3cdd431 100644 --- a/templates/init.erb +++ b/templates/init.erb @@ -1,161 +1,161 @@ #!/bin/sh # # Init file for Apache Kafka <%= @service_name.split(/-/)[1] and @service_name.split(/-/)[1].capitalize %> # <%- if @osfamily == 'Debian' -%> ### BEGIN INIT INFO # Provides: <%= @service_name %> # Required-Start: <%= @service_requires.join(' ') %> # Required-Stop: # Default-Start: 2 3 4 5 # Default-Stop: 0 1 6 # X-Interactive: true # Short-Description: Apache Kafka is a distributed publish-subscribe messaging system ### END INIT INFO <%- else -%> # chkconfig: 35 85 15 # description: Apache Kafka is a distributed publish-subscribe messaging system # pidfile: /var/run/<%= @service_name -%>.pid <%- end -%> NAME=<%= @service_name %> <% @environment.sort.map do |k,v| -%> <% unless v.to_s.strip.empty? -%> export <%= k %>="<%= v %>" <% end -%> <% end -%> PID_FILE="/var/run/$NAME.pid" -KAFKA_USER=<%= @user %> +KAFKA_USER=<%= @user_name %> <%- case @service_name when 'kafka' -%> PGREP_PATTERN=kafka.Kafka DAEMON="<%= @bin_dir %>/kafka-server-start.sh" DAEMON_OPTS="<%= @config_dir %>/server.properties" <%- when 'kafka-consumer' -%> PGREP_PATTERN=kafka.tools.ConsoleConsumer DAEMON="<%= @bin_dir %>/kafka-console-consumer.sh" DAEMON_OPTS="<% @service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%> '<%= v.is_a?(Array) ? v.join(',') : v %>' <% end -%><% end -%>" <%- when 'kafka-mirror' -%> PGREP_PATTERN=kafka.tools.MirrorMaker DAEMON="<%= @bin_dir %>/kafka-run-class.sh" DAEMON_OPTS="kafka.tools.MirrorMaker --consumer.config <%= @config_dir %>/consumer.properties --producer.config <%= @config_dir %>/producer.properties <% @service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%> '<%= v.is_a?(Array) ? v.join(',') : v %>' <% end -%><% end -%>" <%- when 'kafka-producer' -%> PGREP_PATTERN=kafka.tools.ConsoleProducer DAEMON="<%= @bin_dir %>/kafka-console-producer.sh" DAEMON_OPTS="<% @service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%> '<%= v.is_a?(Array) ? v.join(',') : v %>' <% end -%><% end -%>" PRODUCER_INPUT="<%= @input %>" <%- end -%> if [ -f /etc/default/kafka ]; then . /etc/default/kafka fi start() { <% if @limit_nofile -%> ulimit -n <%= @limit_nofile %> <% end -%> <% if @limit_core -%> ulimit -c <%= @limit_core %> <% end -%> ulimit -s 10240 if [ -f "$PID_FILE" ]; then PID=`cat "$PID_FILE"` if [ `ps -p "$PID" -o pid= || echo 1` -eq `ps ax | grep -i "$PGREP_PATTERN" | grep -v grep | awk '{print $1}' || echo 2` ] ; then echo "$PID_FILE exists, process is already running" exit 0 fi echo "$PID_FILE exists but the process is not running. Deleting $PID_FILE and re-trying" rm -f -- "$PID_FILE" start return $? fi /bin/su "$KAFKA_USER" -c "KAFKA_JMX_OPTS=\"$KAFKA_JMX_OPTS\" $DAEMON $DAEMON_OPTS<%- if @service_name == 'kafka-producer' -%> $PRODUCER_INPUT<%- end -%> >/dev/null 2>&1 &" sleep 2 PID=`ps ax | grep -i "$PGREP_PATTERN" | grep -v grep | awk '{print $1}'` if [ -z "$PID" ]; then echo "$NAME could not be started" exit 1 fi echo "$PID" > "$PID_FILE"; echo "$NAME started" return 0 } stop() { if ! [ -f "$PID_FILE" ]; then echo -n "$PID_FILE does not exist" if PID=`ps ax | grep -i "$PGREP_PATTERN" | grep -v grep | awk '{print $1}'` ; then echo -n ", but process is running" echo "$PID" > "$PID_FILE" else echo -n ", and process is not running" return 1 fi fi PID=`cat $PID_FILE` kill $PID; rm -f -- "$PID_FILE"; # wait until the process is finished RETRIES=0 MAX_RETRIES=10 while [ ! -z `ps ax | grep -i "$PGREP_PATTERN" | grep -v grep | awk '{print $1}'` ]; do sleep 1 RETRIES=$((RETRIES+1)) if [ "$RETRIES" -ge "$MAX_RETRIES" ]; then echo "$NAME service: stop tried $MAX_RETRIES times but process $PID is still running" return 1 fi done echo "$NAME stopped" return 0 } status() { if ! [ -f "$PID_FILE" ]; then echo "$NAME stopped" exit 1 fi PID=`cat "$PID_FILE"` if ! [ `ps -p "$PID" -o pid= || echo 1` -eq `ps ax | grep -i "$PGREP_PATTERN" | grep -v grep | awk '{print $1}' || echo 2` ] ; then echo "$NAME stopped but pid file exists" exit 1 fi echo "$NAME running with pid $PID" exit 0 } case "$1" in status) status ;; start) echo "Starting daemon: $NAME" start ;; stop) echo "Stopping daemon: $NAME" stop ;; restart) echo "Restarting daemon: $NAME" stop sleep 2 start ;; *) echo "Usage: "$1" {status|start|stop|restart}" exit 1 esac exit 0 diff --git a/templates/unit.erb b/templates/unit.erb index cefb16d..b4abcef 100644 --- a/templates/unit.erb +++ b/templates/unit.erb @@ -1,45 +1,45 @@ [Unit] Description=Apache Kafka server (<%= (@service_name.split(/-/)[1] and @service_name.split(/-/)[1].capitalize) or 'broker' -%>) Documentation=http://kafka.apache.org/documentation.html <% if !@service_requires.empty? -%> After=<%= @service_requires.join(' ') %> Wants=<%= @service_requires.join(' ') %> <%- end -%> [Service] -User=<%= @user %> -Group=<%= @group %> +User=<%= @user_name %> +Group=<%= @group_name %> SyslogIdentifier=<%= @service_name %> <% @environment.sort.map do |k,v| -%> <% unless v.to_s.strip.empty? -%> Environment='<%= k %>=<%= v %>' <% end -%> <% end -%> <%- case @service_name when 'kafka' -%> Type=<%= @daemon_start ? 'forking' : 'simple' %> ExecStart=<%= @bin_dir %>/kafka-server-start.sh<%- if @daemon_start -%> -daemon<%- end -%> <%= @config_dir %>/server.properties <%- if @exec_stop -%> ExecStop=<%= @bin_dir %>/kafka-server-stop.sh <%- end -%> <%- if @timeout_stop -%> TimeoutStopSec=<%= @timeout_stop %> <%- end -%> <%- when 'kafka-consumer' -%> Type=simple ExecStart=<%= @bin_dir %>/kafka-console-consumer.sh <% @service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%> '<%= v.is_a?(Array) ? v.join(',') : v %>' <% end -%><% end %> <%- when 'kafka-mirror' -%> Type=simple ExecStart=<%= @bin_dir %>/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config <%= @config_dir %>/consumer.properties --producer.config <%= @config_dir %>/producer.properties <% @service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%> '<%= v.is_a?(Array) ? v.join(',') : v %>' <% end -%><% end %> <%- when 'kafka-producer' -%> Type=simple ExecStart=<%= @bin_dir %>/kafka-console-producer.sh <% @service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%> '<%= v.is_a?(Array) ? v.join(',') : v %>' <% end -%><% end -%> <%= @input %> <%- end -%> <%- if @limit_nofile -%> LimitNOFILE=<%= @limit_nofile %> <%- end -%> <%- if @limit_core -%> LimitCORE=<%= @limit_core %> <%- end -%> [Install] WantedBy=multi-user.target