diff --git a/manifests/broker.pp b/manifests/broker.pp index f708814..e36f9a3 100644 --- a/manifests/broker.pp +++ b/manifests/broker.pp @@ -1,159 +1,158 @@ # @summary # This class handles the Kafka (broker). # # @example Basic usage # class { 'kafka::broker': # config => { # 'broker.id' => '0', # 'zookeeper.connect' => 'localhost:2181' # } # } # # @param kafka_version # The version of Kafka that should be installed. # # @param scala_version # The scala version what Kafka was built with. # # @param install_dir # The directory to install Kafka to. # # @param mirror_url # The url where the Kafka is downloaded from. # # @param manage_java # Install java if it's not already installed. # # @param package_dir # The directory to install Kafka. # # @param package_name # Package name, when installing Kafka from a package. # # @param package_ensure # Package version or ensure state, when installing Kafka from a package. # # @param user_name # User to run Kafka as. # # @param user_shell # Login shell of the Kafka user. # # @param group_name # Group to run Kafka as. # # @param user_id # Create the Kafka user with this ID. # # @param group_id # Create the Kafka group with this ID. # # @param manage_user # Create the Kafka user if it's not already present. # # @param manage_group # Create the Kafka group if it's not already present. # # @param config_mode # The permissions for the config files. # # @param config_dir # The directory to create the Kafka config files to. # # @param log_dir # The directory for Kafka log files. # # @param bin_dir # The directory where the Kafka scripts are. # # @param service_name # Set the name of the service. # # @param manage_service # Install the init.d or systemd service. # # @param service_ensure # Set the ensure state of the service. # # @param service_restart # Whether the configuration files should trigger a service restart. # # @param service_requires # Set the list of services required to be running before Kafka. # # @param limit_nofile # Set the 'LimitNOFILE' option of the systemd service. # # @param limit_core # Set the 'LimitCORE' option of the systemd service. # # @param timeout_stop # Set the 'TimeoutStopSec' option of the systemd service. # # @param exec_stop # Set the 'ExecStop' option of the systemd service to 'kafka-server-stop.sh'. # # @param daemon_start # Use the '-daemon' option when starting Kafka with 'kafka-server-start.sh'. # # @param env # A hash of the environment variables to set. # # @param config # A hash of the broker configuration options. # # @param heap_opts # Set the Java heap size. # # @param jmx_opts # Set the JMX options. # # @param log4j_opts # Set the Log4j options. # # @param opts # Set the Kafka options. # class kafka::broker ( String[1] $kafka_version = $kafka::params::kafka_version, String[1] $scala_version = $kafka::params::scala_version, Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, Boolean $manage_java = $kafka::params::manage_java, Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, Optional[String[1]] $package_name = $kafka::params::package_name, String[1] $package_ensure = $kafka::params::package_ensure, String[1] $user_name = $kafka::params::user_name, Stdlib::Absolutepath $user_shell = $kafka::params::user_shell, String[1] $group_name = $kafka::params::group_name, Optional[Integer] $user_id = $kafka::params::user_id, Optional[Integer] $group_id = $kafka::params::group_id, Boolean $manage_user = $kafka::params::manage_user, Boolean $manage_group = $kafka::params::manage_group, Stdlib::Filemode $config_mode = $kafka::params::config_mode, Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::params::bin_dir, String[1] $service_name = 'kafka', Boolean $manage_service = $kafka::params::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::params::service_ensure, Boolean $service_restart = $kafka::params::service_restart, Array[String[1]] $service_requires = $kafka::params::service_requires, Optional[String[1]] $limit_nofile = $kafka::params::limit_nofile, Optional[String[1]] $limit_core = $kafka::params::limit_core, Optional[String[1]] $timeout_stop = $kafka::params::timeout_stop, Boolean $exec_stop = $kafka::params::exec_stop, Boolean $daemon_start = $kafka::params::daemon_start, Hash $env = {}, Hash[String[1], Any] $config = {}, String[1] $heap_opts = $kafka::params::broker_heap_opts, String[1] $jmx_opts = $kafka::params::broker_jmx_opts, String[1] $log4j_opts = $kafka::params::broker_log4j_opts, String[0] $opts = $kafka::params::broker_opts, ) inherits kafka::params { - class { 'kafka::broker::install': } -> class { 'kafka::broker::config': } -> class { 'kafka::broker::service': } -> Class['kafka::broker'] } diff --git a/manifests/broker/config.pp b/manifests/broker/config.pp index 1602351..0407ecc 100644 --- a/manifests/broker/config.pp +++ b/manifests/broker/config.pp @@ -1,35 +1,34 @@ # @summary # This class handles the Kafka (broker) config. # # @api private # -class kafka::broker::config( +class kafka::broker::config ( Boolean $manage_service = $kafka::broker::manage_service, String[1] $service_name = $kafka::broker::service_name, Boolean $service_restart = $kafka::broker::service_restart, Hash[String[1], Any] $config = $kafka::broker::config, Stdlib::Absolutepath $config_dir = $kafka::broker::config_dir, String[1] $user_name = $kafka::broker::user_name, String[1] $group_name = $kafka::broker::group_name, Stdlib::Filemode $config_mode = $kafka::broker::config_mode, ) { - assert_private() if ($manage_service and $service_restart) { $config_notify = Service[$service_name] } else { $config_notify = undef } $doctag = 'brokerconfigs' file { "${config_dir}/server.properties": - ensure => present, + ensure => file, owner => $user_name, group => $group_name, mode => $config_mode, content => template('kafka/properties.erb'), notify => $config_notify, require => File[$config_dir], } } diff --git a/manifests/broker/install.pp b/manifests/broker/install.pp index 8870e36..5c66a06 100644 --- a/manifests/broker/install.pp +++ b/manifests/broker/install.pp @@ -1,31 +1,30 @@ # @summary # This class handles the Kafka (broker) package. # # @api private # class kafka::broker::install { - assert_private() if !defined(Class['kafka']) { class { 'kafka': manage_java => $kafka::broker::manage_java, manage_group => $kafka::broker::manage_group, group_id => $kafka::broker::group_id, group_name => $kafka::broker::group_name, manage_user => $kafka::broker::manage_user, user_id => $kafka::broker::user_id, user_name => $kafka::broker::user_name, user_shell => $kafka::broker::user_shell, config_dir => $kafka::broker::config_dir, log_dir => $kafka::broker::log_dir, mirror_url => $kafka::broker::mirror_url, kafka_version => $kafka::broker::kafka_version, scala_version => $kafka::broker::scala_version, install_dir => $kafka::broker::install_dir, package_dir => $kafka::broker::package_dir, package_ensure => $kafka::broker::package_ensure, package_name => $kafka::broker::package_name, } } } diff --git a/manifests/broker/service.pp b/manifests/broker/service.pp index a2f66fa..fa59c84 100644 --- a/manifests/broker/service.pp +++ b/manifests/broker/service.pp @@ -1,72 +1,71 @@ # @summary # This class handles the Kafka (broker) service. # # @api private # -class kafka::broker::service( +class kafka::broker::service ( Boolean $manage_service = $kafka::broker::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::broker::service_ensure, String[1] $service_name = $kafka::broker::service_name, String[1] $user_name = $kafka::broker::user_name, String[1] $group_name = $kafka::broker::group_name, Stdlib::Absolutepath $config_dir = $kafka::broker::config_dir, Stdlib::Absolutepath $log_dir = $kafka::broker::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::broker::bin_dir, Array[String[1]] $service_requires = $kafka::broker::service_requires, Optional[String[1]] $limit_nofile = $kafka::broker::limit_nofile, Optional[String[1]] $limit_core = $kafka::broker::limit_core, Optional[String[1]] $timeout_stop = $kafka::broker::timeout_stop, Boolean $exec_stop = $kafka::broker::exec_stop, Boolean $daemon_start = $kafka::broker::daemon_start, Hash $env = $kafka::broker::env, String[1] $heap_opts = $kafka::broker::heap_opts, String[1] $jmx_opts = $kafka::broker::jmx_opts, String[1] $log4j_opts = $kafka::broker::log4j_opts, String[0] $opts = $kafka::broker::opts, ) { - assert_private() if $manage_service { $env_defaults = { 'KAFKA_HEAP_OPTS' => $heap_opts, 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, 'KAFKA_OPTS' => $opts, 'LOG_DIR' => $log_dir, } $environment = deep_merge($env_defaults, $env) if $facts['service_provider'] == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] ~> Exec['systemctl-daemon-reload'] -> Service[$service_name] } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/consumer.pp b/manifests/consumer.pp index d10394c..06439f9 100644 --- a/manifests/consumer.pp +++ b/manifests/consumer.pp @@ -1,143 +1,142 @@ # @summary # This class handles the Kafka (consumer). # # @example Basic usage # class { 'kafka::consumer': # config => { # 'client.id' => '0', # 'zookeeper.connect' => 'localhost:2181' # } # } # # @param kafka_version # The version of Kafka that should be installed. # # @param scala_version # The scala version what Kafka was built with. # # @param install_dir # The directory to install Kafka to. # # @param mirror_url # The url where the Kafka is downloaded from. # # @param manage_java # Install java if it's not already installed. # # @param package_dir # The directory to install Kafka. # # @param package_name # Package name, when installing Kafka from a package. # # @param package_ensure # Package version or ensure state, when installing Kafka from a package. # # @param user_name # User to run Kafka as. # # @param user_shell # Login shell of the Kafka user. # # @param group_name # Group to run Kafka as. # # @param user_id # Create the Kafka user with this ID. # # @param group_id # Create the Kafka group with this ID. # # @param manage_user # Create the Kafka user if it's not already present. # # @param manage_group # Create the Kafka group if it's not already present. # # @param config_mode # The permissions for the config files. # # @param config_dir # The directory to create the Kafka config files to. # # @param log_dir # The directory for Kafka log files. # # @param bin_dir # The directory where the Kafka scripts are. # # @param service_name # Set the name of the service. # # @param manage_service # Install the init.d or systemd service. # # @param service_ensure # Set the ensure state of the service. # # @param service_restart # Whether the configuration files should trigger a service restart. # # @param service_requires # Set the list of services required to be running before Kafka. # # @param limit_nofile # Set the 'LimitNOFILE' option of the systemd service. # # @param limit_core # Set the 'LimitCORE' option of the systemd service. # # @param env # A hash of the environment variables to set. # # @param config # A hash of the consumer configuration options. # # @param service_config # A hash of the `kafka-console-consumer.sh` script options. # # @param jmx_opts # Set the JMX options. # # @param log4j_opts # Set the Log4j options. # class kafka::consumer ( String[1] $kafka_version = $kafka::params::kafka_version, String[1] $scala_version = $kafka::params::scala_version, Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, Boolean $manage_java = $kafka::params::manage_java, Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, Optional[String[1]] $package_name = $kafka::params::package_name, String[1] $package_ensure = $kafka::params::package_ensure, String[1] $user_name = $kafka::params::user_name, Stdlib::Absolutepath $user_shell = $kafka::params::user_shell, String[1] $group_name = $kafka::params::group_name, Optional[Integer] $user_id = $kafka::params::user_id, Optional[Integer] $group_id = $kafka::params::group_id, Boolean $manage_user = $kafka::params::manage_user, Boolean $manage_group = $kafka::params::manage_group, Stdlib::Filemode $config_mode = $kafka::params::config_mode, Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::params::bin_dir, String[1] $service_name = 'kafka-consumer', Boolean $manage_service = $kafka::params::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::params::service_ensure, Boolean $service_restart = $kafka::params::service_restart, Array[String[1]] $service_requires = $kafka::params::service_requires, Optional[String[1]] $limit_nofile = $kafka::params::limit_nofile, Optional[String[1]] $limit_core = $kafka::params::limit_core, Hash $env = {}, Hash[String[1], Any] $config = {}, Hash[String[1],String[1]] $service_config = {}, String[1] $jmx_opts = $kafka::params::consumer_jmx_opts, String[1] $log4j_opts = $kafka::params::consumer_log4j_opts, ) inherits kafka::params { - class { 'kafka::consumer::install': } -> class { 'kafka::consumer::config': } -> class { 'kafka::consumer::service': } -> Class['kafka::consumer'] } diff --git a/manifests/consumer/config.pp b/manifests/consumer/config.pp index 6e92d9d..b44f096 100644 --- a/manifests/consumer/config.pp +++ b/manifests/consumer/config.pp @@ -1,31 +1,30 @@ # @summary # This class handles the Kafka (consumer) config. # -class kafka::consumer::config( +class kafka::consumer::config ( Boolean $manage_service = $kafka::consumer::manage_service, String[1] $service_name = $kafka::consumer::service_name, Boolean $service_restart = $kafka::consumer::service_restart, Hash[String[1], Any] $config = $kafka::consumer::config, Stdlib::Absolutepath $config_dir = $kafka::consumer::config_dir, String[1] $user_name = $kafka::consumer::user_name, String[1] $group_name = $kafka::consumer::group_name, Stdlib::Filemode $config_mode = $kafka::consumer::config_mode, ) { - if ($manage_service and $service_restart) { $config_notify = Service[$service_name] } else { $config_notify = undef } $doctag = 'consumerconfigs' file { "${config_dir}/consumer.properties": - ensure => present, + ensure => file, owner => $user_name, group => $group_name, mode => $config_mode, content => template('kafka/properties.erb'), notify => $config_notify, require => File[$config_dir], } } diff --git a/manifests/consumer/install.pp b/manifests/consumer/install.pp index 5b98f9f..9a1d63c 100644 --- a/manifests/consumer/install.pp +++ b/manifests/consumer/install.pp @@ -1,31 +1,30 @@ # @summary # This class handles the Kafka (consumer) package. # # @api private # class kafka::consumer::install { - assert_private() if !defined(Class['kafka']) { class { 'kafka': manage_java => $kafka::consumer::manage_java, manage_group => $kafka::consumer::manage_group, group_id => $kafka::consumer::group_id, group_name => $kafka::consumer::group_name, manage_user => $kafka::consumer::manage_user, user_id => $kafka::consumer::user_id, user_name => $kafka::consumer::user_name, user_shell => $kafka::consumer::user_shell, config_dir => $kafka::consumer::config_dir, log_dir => $kafka::consumer::log_dir, mirror_url => $kafka::consumer::mirror_url, kafka_version => $kafka::consumer::kafka_version, scala_version => $kafka::consumer::scala_version, install_dir => $kafka::consumer::install_dir, package_dir => $kafka::consumer::package_dir, package_ensure => $kafka::consumer::package_ensure, package_name => $kafka::consumer::package_name, } } } diff --git a/manifests/consumer/service.pp b/manifests/consumer/service.pp index 287ae32..e96facd 100644 --- a/manifests/consumer/service.pp +++ b/manifests/consumer/service.pp @@ -1,73 +1,71 @@ # @summary # This class handles the Kafka (consumer) service. # # @api private # -class kafka::consumer::service( +class kafka::consumer::service ( Boolean $manage_service = $kafka::consumer::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::consumer::service_ensure, String[1] $service_name = $kafka::consumer::service_name, String[1] $user_name = $kafka::consumer::user_name, String[1] $group_name = $kafka::consumer::group_name, Stdlib::Absolutepath $config_dir = $kafka::consumer::config_dir, Stdlib::Absolutepath $log_dir = $kafka::consumer::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::consumer::bin_dir, Array[String[1]] $service_requires = $kafka::consumer::service_requires, Optional[String[1]] $limit_nofile = $kafka::consumer::limit_nofile, Optional[String[1]] $limit_core = $kafka::consumer::limit_core, Hash $env = $kafka::consumer::env, String[1] $jmx_opts = $kafka::consumer::jmx_opts, String[1] $log4j_opts = $kafka::consumer::log4j_opts, Hash[String[1],String[1]] $service_config = $kafka::consumer::service_config, ) { - assert_private() if $manage_service { - if $service_config['topic'] == '' { fail('[Consumer] You need to specify a value for topic') } if $service_config['zookeeper'] == '' { fail('[Consumer] You need to specify a value for zookeeper') } $env_defaults = { 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, } $environment = deep_merge($env_defaults, $env) if $facts['service_provider'] == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] ~> Exec['systemctl-daemon-reload'] -> Service[$service_name] } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/init.pp b/manifests/init.pp index cd43efa..c8dd525 100644 --- a/manifests/init.pp +++ b/manifests/init.pp @@ -1,236 +1,235 @@ # @summary # This class handles the Kafka requirements. # # @example Basic usage # class { 'kafka': } # # @param kafka_version # The version of Kafka that should be installed. # # @param scala_version # The scala version what Kafka was built with. # # @param install_dir # The directory to install Kafka to. # # @param mirror_url # The url where the Kafka is downloaded from. # # @param manage_java # Install java if it's not already installed. # # @param package_dir # The directory to install Kafka. # # @param package_name # Package name, when installing Kafka from a package. # # @param mirror_subpath # The sub directory where the source is downloaded from. # # @param proxy_server # Set proxy server, when installing Kafka from source. # # @param proxy_port # Set proxy port, when installing Kafka from source. # # @param proxy_host # Set proxy host, when installing Kafka from source. # # @param proxy_type # Set proxy type, when installing Kafka from source. # # @param package_ensure # Package version or ensure state, when installing Kafka from a package. # # @param user_name # User to run Kafka as. # # @param user_shell # Login shell of the Kafka user. # # @param group_name # Group to run Kafka as. # # @param system_user # Whether the Kafka user is a system user or not. # # @param system_group # Whether the Kafka group is a system group or not. # # @param user_id # Create the Kafka user with this ID. # # @param group_id # Create the Kafka group with this ID. # # @param manage_user # Create the Kafka user if it's not already present. # # @param manage_group # Create the Kafka group if it's not already present. # # @param config_dir # The directory to create the Kafka config files to. # # @param log_dir # The directory for Kafka log files. # # @param install_mode # The permissions for the install directory. # class kafka ( String[1] $kafka_version = $kafka::params::kafka_version, String[1] $scala_version = $kafka::params::scala_version, Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, Boolean $manage_java = $kafka::params::manage_java, Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, Optional[String[1]] $package_name = $kafka::params::package_name, Optional[String[1]] $mirror_subpath = $kafka::params::mirror_subpath, Optional[String[1]] $proxy_server = $kafka::params::proxy_server, Optional[String[1]] $proxy_port = $kafka::params::proxy_port, Optional[String[1]] $proxy_host = $kafka::params::proxy_host, Optional[String[1]] $proxy_type = $kafka::params::proxy_type, String[1] $package_ensure = $kafka::params::package_ensure, String[1] $user_name = $kafka::params::user_name, Stdlib::Absolutepath $user_shell = $kafka::params::user_shell, String[1] $group_name = $kafka::params::group_name, Boolean $system_user = $kafka::params::system_user, Boolean $system_group = $kafka::params::system_group, Optional[Integer] $user_id = $kafka::params::user_id, Optional[Integer] $group_id = $kafka::params::group_id, Boolean $manage_user = $kafka::params::manage_user, Boolean $manage_group = $kafka::params::manage_group, Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, Stdlib::Filemode $install_mode = $kafka::params::install_mode, ) inherits kafka::params { - if $manage_java { class { 'java': distribution => 'jdk', } } if $manage_group { group { $group_name: ensure => present, gid => $group_id, system => $system_group, } } if $manage_user { user { $user_name: ensure => present, shell => $user_shell, require => Group[$group_name], uid => $user_id, system => $system_user, } } file { $config_dir: ensure => directory, owner => $user_name, group => $group_name, } file { $log_dir: ensure => directory, owner => $user_name, group => $group_name, require => [ Group[$group_name], User[$user_name], ], } if $package_name == undef { include archive $mirror_path = $mirror_subpath ? { # if mirror_subpath was not changed, # we adapt it for the version $kafka::params::mirror_subpath => "kafka/${kafka_version}", # else, we just take whatever was supplied: default => $mirror_subpath, } $basefilename = "kafka_${scala_version}-${kafka_version}.tgz" $package_url = "${mirror_url}${mirror_path}/${basefilename}" - $source = $mirror_url ?{ + $source = $mirror_url ? { /tgz$/ => $mirror_url, default => $package_url, } $install_directory = $install_dir ? { # if install_dir was not changed, # we adapt it for the scala_version and the version $kafka::params::install_dir => "/opt/kafka-${scala_version}-${kafka_version}", # else, we just take whatever was supplied: default => $install_dir, } file { $package_dir: ensure => directory, owner => $user_name, group => $group_name, require => [ Group[$group_name], User[$user_name], ], } file { $install_directory: ensure => directory, owner => $user_name, group => $group_name, mode => $install_mode, require => [ Group[$group_name], User[$user_name], ], } file { '/opt/kafka': ensure => link, target => $install_directory, require => File[$install_directory], } if $proxy_server == undef and $proxy_host != undef and $proxy_port != undef { $final_proxy_server = "${proxy_host}:${proxy_port}" } else { $final_proxy_server = $proxy_server } archive { "${package_dir}/${basefilename}": ensure => present, extract => true, extract_command => 'tar xfz %s --strip-components=1', extract_path => $install_directory, source => $source, creates => "${install_directory}/config", cleanup => true, proxy_server => $final_proxy_server, proxy_type => $proxy_type, user => $user_name, group => $group_name, require => [ File[$package_dir], File[$install_directory], Group[$group_name], User[$user_name], ], before => File[$config_dir], } } else { package { $package_name: ensure => $package_ensure, before => File[$config_dir], } } } diff --git a/manifests/mirror.pp b/manifests/mirror.pp index 80d156e..0ceddd1 100644 --- a/manifests/mirror.pp +++ b/manifests/mirror.pp @@ -1,157 +1,156 @@ # @summary # This class handles the Kafka (mirror). # # @example Basic usage # class { 'kafka::mirror': # consumer_config => { # 'group.id' => 'kafka-mirror', # 'zookeeper.connect' => 'localhost:2181' # }, # producer_config => { # 'zookeeper.connect' => 'localhost:2181', # }, # service_config => { # 'whitelist' => '.*', # } # } # # @param kafka_version # The version of Kafka that should be installed. # # @param scala_version # The scala version what Kafka was built with. # # @param install_dir # The directory to install Kafka to. # # @param mirror_url # The url where the Kafka is downloaded from. # # @param manage_java # Install java if it's not already installed. # # @param package_dir # The directory to install Kafka. # # @param package_name # Package name, when installing Kafka from a package. # # @param package_ensure # Package version or ensure state, when installing Kafka from a package. # # @param user_name # User to run Kafka as. # # @param user_shell # Login shell of the Kafka user. # # @param group_name # Group to run Kafka as. # # @param user_id # Create the Kafka user with this ID. # # @param group_id # Create the Kafka group with this ID. # # @param manage_user # Create the Kafka user if it's not already present. # # @param manage_group # Create the Kafka group if it's not already present. # # @param config_mode # The permissions for the config files. # # @param config_dir # The directory to create the Kafka config files to. # # @param log_dir # The directory for Kafka log files. # # @param bin_dir # The directory where the Kafka scripts are. # # @param service_name # Set the name of the service. # # @param manage_service # Install the init.d or systemd service. # # @param service_ensure # Set the ensure state of the service. # # @param service_restart # Whether the configuration files should trigger a service restart. # # @param service_requires # Set the list of services required to be running before Kafka. # # @param limit_nofile # Set the 'LimitNOFILE' option of the systemd service. # # @param limit_core # Set the 'LimitCORE' option of the systemd service. # # @param env # A hash of the environment variables to set. # # @param consumer_config # A hash of the consumer configuration options. # # @param producer_config # A hash of the producer configuration options. # # @param service_config # A hash of the mirror script options. # # @param heap_opts # Set the Java heap size. # # @param jmx_opts # Set the JMX options. # # @param log4j_opts # Set the Log4j options. # class kafka::mirror ( String[1] $kafka_version = $kafka::params::kafka_version, String[1] $scala_version = $kafka::params::scala_version, Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, Boolean $manage_java = $kafka::params::manage_java, Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, Optional[String[1]] $package_name = $kafka::params::package_name, String[1] $package_ensure = $kafka::params::package_ensure, String[1] $user_name = $kafka::params::user_name, Stdlib::Absolutepath $user_shell = $kafka::params::user_shell, String[1] $group_name = $kafka::params::group_name, Optional[Integer] $user_id = $kafka::params::user_id, Optional[Integer] $group_id = $kafka::params::group_id, Boolean $manage_user = $kafka::params::manage_user, Boolean $manage_group = $kafka::params::manage_group, Stdlib::Filemode $config_mode = $kafka::params::config_mode, Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::params::bin_dir, String[1] $service_name = 'kafka-mirror', Boolean $manage_service = $kafka::params::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::params::service_ensure, Boolean $service_restart = $kafka::params::service_restart, Array[String[1]] $service_requires = $kafka::params::service_requires, Optional[String[1]] $limit_nofile = $kafka::params::limit_nofile, Optional[String[1]] $limit_core = $kafka::params::limit_core, Hash $env = {}, Hash[String[1],String[1]] $consumer_config = {}, Hash[String[1],String[1]] $producer_config = {}, Hash[String[1],String[1]] $service_config = {}, String[1] $heap_opts = $kafka::params::mirror_heap_opts, String[1] $jmx_opts = $kafka::params::mirror_jmx_opts, String[1] $log4j_opts = $kafka::params::mirror_log4j_opts, ) inherits kafka::params { - class { 'kafka::mirror::install': } -> class { 'kafka::mirror::config': } -> class { 'kafka::mirror::service': } -> Class['kafka::mirror'] } diff --git a/manifests/mirror/config.pp b/manifests/mirror/config.pp index 38a667a..9bd1bd2 100644 --- a/manifests/mirror/config.pp +++ b/manifests/mirror/config.pp @@ -1,51 +1,50 @@ # @summary # This class handles the Kafka (mirror) config. # # @api private # -class kafka::mirror::config( +class kafka::mirror::config ( Boolean $manage_service = $kafka::mirror::manage_service, String[1] $service_name = $kafka::mirror::service_name, Boolean $service_restart = $kafka::mirror::service_restart, Hash[String[1],String[1]] $consumer_config = $kafka::mirror::consumer_config, Hash[String[1],String[1]] $producer_config = $kafka::mirror::producer_config, Stdlib::Absolutepath $config_dir = $kafka::mirror::config_dir, String[1] $user_name = $kafka::mirror::user_name, String[1] $group_name = $kafka::mirror::group_name, Stdlib::Filemode $config_mode = $kafka::mirror::config_mode, ) { - assert_private() if $consumer_config['group.id'] == '' { fail('[Consumer] You need to specify a value for group.id') } if $consumer_config['zookeeper.connect'] == '' { fail('[Consumer] You need to specify a value for zookeeper.connect') } if $producer_config['bootstrap.servers'] == '' { fail('[Producer] You need to specify a value for bootstrap.servers') } class { 'kafka::consumer::config': manage_service => $manage_service, service_name => $service_name, service_restart => $service_restart, config => $consumer_config, config_dir => $config_dir, user_name => $user_name, group_name => $group_name, config_mode => $config_mode, } class { 'kafka::producer::config': manage_service => $manage_service, service_name => $service_name, service_restart => $service_restart, config => $producer_config, config_dir => $config_dir, user_name => $user_name, group_name => $group_name, config_mode => $config_mode, } } diff --git a/manifests/mirror/install.pp b/manifests/mirror/install.pp index d243aa4..f310bda 100644 --- a/manifests/mirror/install.pp +++ b/manifests/mirror/install.pp @@ -1,31 +1,30 @@ # @summary # This class handles the Kafka (mirror) package. # # @api private # class kafka::mirror::install { - assert_private() if !defined(Class['kafka']) { class { 'kafka': manage_java => $kafka::mirror::manage_java, manage_group => $kafka::mirror::manage_group, group_id => $kafka::mirror::group_id, group_name => $kafka::mirror::group_name, manage_user => $kafka::mirror::manage_user, user_id => $kafka::mirror::user_id, user_name => $kafka::mirror::user_name, user_shell => $kafka::mirror::user_shell, config_dir => $kafka::mirror::config_dir, log_dir => $kafka::mirror::log_dir, mirror_url => $kafka::mirror::mirror_url, kafka_version => $kafka::mirror::kafka_version, scala_version => $kafka::mirror::scala_version, install_dir => $kafka::mirror::install_dir, package_dir => $kafka::mirror::package_dir, package_ensure => $kafka::mirror::package_ensure, package_name => $kafka::mirror::package_name, } } } diff --git a/manifests/mirror/service.pp b/manifests/mirror/service.pp index 015c912..91e9cd8 100644 --- a/manifests/mirror/service.pp +++ b/manifests/mirror/service.pp @@ -1,69 +1,68 @@ # @summary # This class handles the Kafka (mirror) service. # # @api private # -class kafka::mirror::service( +class kafka::mirror::service ( Boolean $manage_service = $kafka::mirror::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::mirror::service_ensure, String[1] $service_name = $kafka::mirror::service_name, String[1] $user_name = $kafka::mirror::user_name, String[1] $group_name = $kafka::mirror::group_name, Stdlib::Absolutepath $config_dir = $kafka::mirror::config_dir, Stdlib::Absolutepath $log_dir = $kafka::mirror::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::mirror::bin_dir, Array[String[1]] $service_requires = $kafka::mirror::service_requires, Optional[String[1]] $limit_nofile = $kafka::mirror::limit_nofile, Optional[String[1]] $limit_core = $kafka::mirror::limit_core, Hash $env = $kafka::mirror::env, Hash[String[1],String[1]] $consumer_config = $kafka::mirror::consumer_config, Hash[String[1],String[1]] $producer_config = $kafka::mirror::producer_config, Hash[String[1],String[1]] $service_config = $kafka::mirror::service_config, String[1] $heap_opts = $kafka::mirror::heap_opts, String[1] $jmx_opts = $kafka::mirror::jmx_opts, String[1] $log4j_opts = $kafka::mirror::log4j_opts, ) { - assert_private() if $manage_service { $env_defaults = { 'KAFKA_HEAP_OPTS' => $heap_opts, 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, } $environment = deep_merge($env_defaults, $env) if $facts['service_provider'] == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] ~> Exec['systemctl-daemon-reload'] -> Service[$service_name] } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/producer.pp b/manifests/producer.pp index 6981717..45f7561 100644 --- a/manifests/producer.pp +++ b/manifests/producer.pp @@ -1,147 +1,146 @@ # @summary # This class handles the Kafka (producer). # # @example Basic usage # class { 'kafka::producer': # config => { # 'client.id' => '0', # 'zookeeper.connect' => 'localhost:2181' # } # } # # @param input # Set named pipe as input. # # @param kafka_version # The version of Kafka that should be installed. # # @param scala_version # The scala version what Kafka was built with. # # @param install_dir # The directory to install Kafka to. # # @param mirror_url # The url where the Kafka is downloaded from. # # @param manage_java # Install java if it's not already installed. # # @param package_dir # The directory to install Kafka. # # @param package_name # Package name, when installing Kafka from a package. # # @param package_ensure # Package version or ensure state, when installing Kafka from a package. # # @param user_name # User to run Kafka as. # # @param user_shell # Login shell of the Kafka user. # # @param group_name # Group to run Kafka as. # # @param user_id # Create the Kafka user with this ID. # # @param group_id # Create the Kafka group with this ID. # # @param manage_user # Create the Kafka user if it's not already present. # # @param manage_group # Create the Kafka group if it's not already present. # # @param config_mode # The permissions for the config files. # # @param config_dir # The directory to create the Kafka config files to. # # @param log_dir # The directory for Kafka log files. # # @param bin_dir # The directory where the Kafka scripts are. # # @param service_name # Set the name of the service. # # @param manage_service # Install the init.d or systemd service. # # @param service_ensure # Set the ensure state of the service. # # @param service_restart # Whether the configuration files should trigger a service restart. # # @param service_requires # Set the list of services required to be running before Kafka. # # @param limit_nofile # Set the 'LimitNOFILE' option of the systemd service. # # @param limit_core # Set the 'LimitCORE' option of the systemd service. # # @param env # A hash of the environment variables to set. # # @param config # A hash of the producer configuration options. # # @param service_config # A hash of the `kafka-console-producer.sh` script options. # # @param jmx_opts # Set the JMX options. # # @param log4j_opts # Set the Log4j options. # class kafka::producer ( Optional[String[1]] $input, String[1] $kafka_version = $kafka::params::kafka_version, String[1] $scala_version = $kafka::params::scala_version, Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, Boolean $manage_java = $kafka::params::manage_java, Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, Optional[String[1]] $package_name = $kafka::params::package_name, String[1] $package_ensure = $kafka::params::package_ensure, String[1] $user_name = $kafka::params::user_name, Stdlib::Absolutepath $user_shell = $kafka::params::user_shell, String[1] $group_name = $kafka::params::group_name, Optional[Integer] $user_id = $kafka::params::user_id, Optional[Integer] $group_id = $kafka::params::group_id, Boolean $manage_user = $kafka::params::manage_user, Boolean $manage_group = $kafka::params::manage_group, Stdlib::Filemode $config_mode = $kafka::params::config_mode, Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::params::bin_dir, String[1] $service_name = 'kafka-producer', Boolean $manage_service = $kafka::params::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::params::service_ensure, Boolean $service_restart = $kafka::params::service_restart, Array[String[1]] $service_requires = $kafka::params::service_requires, Optional[String[1]] $limit_nofile = $kafka::params::limit_nofile, Optional[String[1]] $limit_core = $kafka::params::limit_core, Hash $env = {}, Hash[String[1], Any] $config = {}, Hash[String[1],String[1]] $service_config = {}, String[1] $jmx_opts = $kafka::params::producer_jmx_opts, String[1] $log4j_opts = $kafka::params::producer_log4j_opts, ) inherits kafka::params { - class { 'kafka::producer::install': } -> class { 'kafka::producer::config': } -> class { 'kafka::producer::service': } -> Class['kafka::producer'] } diff --git a/manifests/producer/config.pp b/manifests/producer/config.pp index 36a8c13..7869830 100644 --- a/manifests/producer/config.pp +++ b/manifests/producer/config.pp @@ -1,31 +1,30 @@ # @summary # This class handles the Kafka (producer) config. # -class kafka::producer::config( +class kafka::producer::config ( Boolean $manage_service = $kafka::producer::manage_service, String[1] $service_name = $kafka::producer::service_name, Boolean $service_restart = $kafka::producer::service_restart, Hash[String[1], Any] $config = $kafka::producer::config, Stdlib::Absolutepath $config_dir = $kafka::producer::config_dir, String[1] $user_name = $kafka::producer::user_name, String[1] $group_name = $kafka::producer::group_name, Stdlib::Filemode $config_mode = $kafka::producer::config_mode, ) { - if ($manage_service and $service_restart) { $config_notify = Service[$service_name] } else { $config_notify = undef } $doctag = 'producerconfigs' file { "${config_dir}/producer.properties": - ensure => present, + ensure => file, owner => $user_name, group => $group_name, mode => $config_mode, content => template('kafka/properties.erb'), notify => $config_notify, require => File[$config_dir], } } diff --git a/manifests/producer/install.pp b/manifests/producer/install.pp index fbb7a2c..6200791 100644 --- a/manifests/producer/install.pp +++ b/manifests/producer/install.pp @@ -1,31 +1,30 @@ # @summary # This class handles the Kafka (producer) package. # # @api private # class kafka::producer::install { - assert_private() if !defined(Class['kafka']) { class { 'kafka': manage_java => $kafka::producer::manage_java, manage_group => $kafka::producer::manage_group, group_id => $kafka::producer::group_id, group_name => $kafka::producer::group_name, manage_user => $kafka::producer::manage_user, user_id => $kafka::producer::user_id, user_name => $kafka::producer::user_name, user_shell => $kafka::producer::user_shell, config_dir => $kafka::producer::config_dir, log_dir => $kafka::producer::log_dir, mirror_url => $kafka::producer::mirror_url, kafka_version => $kafka::producer::kafka_version, scala_version => $kafka::producer::scala_version, install_dir => $kafka::producer::install_dir, package_dir => $kafka::producer::package_dir, package_ensure => $kafka::producer::package_ensure, package_name => $kafka::producer::package_name, } } } diff --git a/manifests/producer/service.pp b/manifests/producer/service.pp index 7d8b4a9..89f44c9 100644 --- a/manifests/producer/service.pp +++ b/manifests/producer/service.pp @@ -1,60 +1,58 @@ # @summary # This class handles the Kafka (producer) service. # # @api private # -class kafka::producer::service( +class kafka::producer::service ( Boolean $manage_service = $kafka::producer::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::producer::service_ensure, String[1] $service_name = $kafka::producer::service_name, String[1] $user_name = $kafka::producer::user_name, String[1] $group_name = $kafka::producer::group_name, Stdlib::Absolutepath $config_dir = $kafka::producer::config_dir, Stdlib::Absolutepath $log_dir = $kafka::producer::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::producer::bin_dir, Array[String[1]] $service_requires = $kafka::producer::service_requires, Optional[String[1]] $limit_nofile = $kafka::producer::limit_nofile, Optional[String[1]] $limit_core = $kafka::producer::limit_core, Hash $env = $kafka::producer::env, Optional[String[1]] $input = $kafka::producer::input, String[1] $jmx_opts = $kafka::producer::jmx_opts, String[1] $log4j_opts = $kafka::producer::log4j_opts, Hash[String[1],String[1]] $service_config = $kafka::producer::service_config, ) { - assert_private() if $manage_service { - if $service_config['broker-list'] == '' { fail('[Producer] You need to specify a value for broker-list') } if $service_config['topic'] == '' { fail('[Producer] You need to specify a value for topic') } $env_defaults = { 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, } $environment = deep_merge($env_defaults, $env) if $facts['service_provider'] == 'systemd' { fail('Console Producer is not supported on systemd, because the stdin of the process cannot be redirected') } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/topic.pp b/manifests/topic.pp index fc0f8d5..95c5bed 100644 --- a/manifests/topic.pp +++ b/manifests/topic.pp @@ -1,68 +1,67 @@ # @summary # This defined type handles the creation of Kafka topics. # # @example Basic usage # kafka::topic { 'test': # ensure => present, # zookeeper => 'localhost:2181', # replication_factor => 1, # partitions => 1, # } # # @param ensure # Should the topic be created. # # @param zookeeper # The connection string for the ZooKeeper connection in the form host:port. # Multiple hosts can be given to allow fail-over. # # @param replication_factor # The replication factor for each partition in the topic being created. If # not supplied, defaults to the cluster default. # # @param partitions # The number of partitions for the topic being created or altered. If not # supplied for create, defaults to the cluster default. # # @param bin_dir # The directory where the file kafka-topics.sh is located. # # @param config # A topic configuration override for the topic being created or altered. # See the Kafka documentation for full details on the topic configs. # -define kafka::topic( +define kafka::topic ( String[1] $ensure = '', String[1] $zookeeper = '', Variant[Integer,String[1]] $replication_factor = 1, Variant[Integer,String[1]] $partitions = 1, String[1] $bin_dir = '/opt/kafka/bin', Optional[Hash[String[1],String[1]]] $config = undef, ) { - if is_string($replication_factor) { deprecation('kafka::topic', 'Please use Integer type, not String, for paramter replication_factor') } if is_string($partitions) { deprecation('kafka::topic', 'Please use Integer type, not String, for paramter partitions') } $_zookeeper = "--zookeeper ${zookeeper}" $_replication_factor = "--replication-factor ${replication_factor}" $_partitions = "--partitions ${partitions}" if $config { $_config_array = $config.map |$key, $value| { "--config ${key}=${value}" } $_config = join($_config_array, ' ') } else { $_config = '' } if $ensure == 'present' { exec { "create topic ${name}": path => "/usr/bin:/usr/sbin/:/bin:/sbin:${bin_dir}", command => "kafka-topics.sh --create ${_zookeeper} ${_replication_factor} ${_partitions} --topic ${name} ${_config}", unless => "kafka-topics.sh --list ${_zookeeper} | grep -x ${name}", } } }