diff --git a/manifests/broker.pp b/manifests/broker.pp index e6b84ca..b52347a 100644 --- a/manifests/broker.pp +++ b/manifests/broker.pp @@ -1,155 +1,155 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::broker # # This class will install kafka with the broker role. # # === Requirements/Dependencies # # Currently requires the puppetlabs/stdlib module on the Puppet Forge in # order to validate much of the the provided configuration. # # === Parameters # # [*kafka_version*] # The version of kafka that should be installed. # # [*scala_version*] # The scala version what kafka was built with. # # [*install_dir*] # The directory to install kafka to. # # [*mirror_url*] # The url where the kafka is downloaded from. # # [*manage_java*] # Install java if it's not already installed. # # [*package_dir*] # The directory to install kafka. # # [*package_name*] # Package name, when installing kafka from a package. # # [*package_ensure*] # Package version (or 'present', 'absent', 'latest'), when installing kafka from a package. # # [*user_name*] # User to run kafka as. # # [*group_name*] # Group to run kafka as. # # [*user_id*] # Create the kafka user with this ID. # # [*group_id*] # Create the kafka group with this ID. # # [*manage_user*] # Create the kafka user if it's not already present. # # [*manage_group*] # Create the kafka group if it's not already present. # # [*config_mode*] # The permissions for the config files. # # [*config_dir*] # The directory to create the kafka config files to. # # [*log_dir*] # The directory for kafka log files. # # [*bin_dir*] # The directory where the kafka scripts are. # # [*service_name*] # Set the name of the service. # # [*manage_service*] # Install the init.d or systemd service. # # [*service_ensure*] # Set the ensure state of the service to 'stopped' or 'running'. # # [*service_restart*] # Whether the configuration files should trigger a service restart. # # [*service_requires*] # Set the list of services required to be running before Kafka. # # [*limit_nofile*] # Set the 'LimitNOFILE' option of the systemd service. # # [*limit_core*] # Set the 'LimitCORE' option of the systemd service. # # [*timeout_stop*] # Set the 'TimeoutStopSec' option of the systemd service. # # [*exec_stop*] # Set the 'ExecStop' option of the systemd service to 'kafka-server-stop.sh'. # # [*daemon_start*] # Use the '-daemon' option when starting Kafka with 'kafka-server-start.sh'. # # [*env*] # A hash of the environment variables to set. # # [*config*] # A hash of the configuration options. # # === Examples # # Create a single broker instance which talks to a local zookeeper instance. # # class { 'kafka::broker': # config => { 'broker.id' => '0', 'zookeeper.connect' => 'localhost:2181' } # } # class kafka::broker ( - String $kafka_version = $kafka::params::kafka_version, - String $scala_version = $kafka::params::scala_version, + String[1] $kafka_version = $kafka::params::kafka_version, + String[1] $scala_version = $kafka::params::scala_version, Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, Boolean $manage_java = $kafka::params::manage_java, Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, - Optional[String] $package_name = $kafka::params::package_name, - String $package_ensure = $kafka::params::package_ensure, - String $user_name = $kafka::params::user_name, - String $group_name = $kafka::params::group_name, + Optional[String[1]] $package_name = $kafka::params::package_name, + String[1] $package_ensure = $kafka::params::package_ensure, + String[1] $user_name = $kafka::params::user_name, + String[1] $group_name = $kafka::params::group_name, Optional[Integer] $user_id = $kafka::params::user_id, Optional[Integer] $group_id = $kafka::params::group_id, Boolean $manage_user = $kafka::params::manage_user, Boolean $manage_group = $kafka::params::manage_group, Stdlib::Filemode $config_mode = $kafka::params::config_mode, Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::params::bin_dir, - String $service_name = 'kafka', + String[1] $service_name = 'kafka', Boolean $manage_service = $kafka::params::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::params::service_ensure, Boolean $service_restart = $kafka::params::service_restart, - Array[String] $service_requires = $kafka::params::service_requires, - Optional[String] $limit_nofile = $kafka::params::limit_nofile, - Optional[String] $limit_core = $kafka::params::limit_core, - Optional[String] $timeout_stop = $kafka::params::timeout_stop, + Array[String[1]] $service_requires = $kafka::params::service_requires, + Optional[String[1]] $limit_nofile = $kafka::params::limit_nofile, + Optional[String[1]] $limit_core = $kafka::params::limit_core, + Optional[String[1]] $timeout_stop = $kafka::params::timeout_stop, Boolean $exec_stop = $kafka::params::exec_stop, Boolean $daemon_start = $kafka::params::daemon_start, Hash $env = {}, - Hash $config = {}, - String $heap_opts = $kafka::params::broker_heap_opts, - String $jmx_opts = $kafka::params::broker_jmx_opts, - String $log4j_opts = $kafka::params::broker_log4j_opts, - $opts = $kafka::params::broker_opts, + Hash[String[1], Any] $config = {}, + String[1] $heap_opts = $kafka::params::broker_heap_opts, + String[1] $jmx_opts = $kafka::params::broker_jmx_opts, + String[1] $log4j_opts = $kafka::params::broker_log4j_opts, + String[0] $opts = $kafka::params::broker_opts, ) inherits kafka::params { class { 'kafka::broker::install': } -> class { 'kafka::broker::config': } -> class { 'kafka::broker::service': } -> Class['kafka::broker'] } diff --git a/manifests/broker/config.pp b/manifests/broker/config.pp index 04f9e47..32b62d0 100644 --- a/manifests/broker/config.pp +++ b/manifests/broker/config.pp @@ -1,39 +1,39 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::broker::config # # This private class is meant to be called from `kafka::broker`. # It manages the broker config files # class kafka::broker::config( Boolean $manage_service = $kafka::broker::manage_service, - String $service_name = $kafka::broker::service_name, + String[1] $service_name = $kafka::broker::service_name, Boolean $service_restart = $kafka::broker::service_restart, - Hash $config = $kafka::broker::config, + Hash[String[1], Any] $config = $kafka::broker::config, Stdlib::Absolutepath $config_dir = $kafka::broker::config_dir, - String $user_name = $kafka::broker::user_name, - String $group_name = $kafka::broker::group_name, + String[1] $user_name = $kafka::broker::user_name, + String[1] $group_name = $kafka::broker::group_name, Stdlib::Filemode $config_mode = $kafka::broker::config_mode, ) { assert_private() if ($manage_service and $service_restart) { $config_notify = Service[$service_name] } else { $config_notify = undef } $doctag = 'brokerconfigs' file { "${config_dir}/server.properties": ensure => present, owner => $user_name, group => $group_name, mode => $config_mode, content => template('kafka/properties.erb'), notify => $config_notify, require => File[$config_dir], } } diff --git a/manifests/broker/service.pp b/manifests/broker/service.pp index 7a021dc..d6e12d3 100644 --- a/manifests/broker/service.pp +++ b/manifests/broker/service.pp @@ -1,77 +1,77 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::broker::service # # This private class is meant to be called from `kafka::broker`. # It manages the kafka service # class kafka::broker::service( Boolean $manage_service = $kafka::broker::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::broker::service_ensure, - String $service_name = $kafka::broker::service_name, - String $user_name = $kafka::broker::user_name, - String $group_name = $kafka::broker::group_name, + String[1] $service_name = $kafka::broker::service_name, + String[1] $user_name = $kafka::broker::user_name, + String[1] $group_name = $kafka::broker::group_name, Stdlib::Absolutepath $config_dir = $kafka::broker::config_dir, Stdlib::Absolutepath $log_dir = $kafka::broker::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::broker::bin_dir, - Array[String] $service_requires = $kafka::broker::service_requires, - Optional[String] $limit_nofile = $kafka::broker::limit_nofile, - Optional[String] $limit_core = $kafka::broker::limit_core, - Optional[String] $timeout_stop = $kafka::broker::timeout_stop, + Array[String[1]] $service_requires = $kafka::broker::service_requires, + Optional[String[1]] $limit_nofile = $kafka::broker::limit_nofile, + Optional[String[1]] $limit_core = $kafka::broker::limit_core, + Optional[String[1]] $timeout_stop = $kafka::broker::timeout_stop, Boolean $exec_stop = $kafka::broker::exec_stop, Boolean $daemon_start = $kafka::broker::daemon_start, Hash $env = $kafka::broker::env, - String $heap_opts = $kafka::broker::heap_opts, - String $jmx_opts = $kafka::broker::jmx_opts, - String $log4j_opts = $kafka::broker::log4j_opts, - $opts = $kafka::broker::opts, + String[1] $heap_opts = $kafka::broker::heap_opts, + String[1] $jmx_opts = $kafka::broker::jmx_opts, + String[1] $log4j_opts = $kafka::broker::log4j_opts, + String[0] $opts = $kafka::broker::opts, ) { assert_private() if $manage_service { $env_defaults = { 'KAFKA_HEAP_OPTS' => $heap_opts, 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, 'KAFKA_OPTS' => $opts, 'LOG_DIR' => $log_dir, } $environment = deep_merge($env_defaults, $env) if $facts['service_provider'] == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] ~> Exec['systemctl-daemon-reload'] -> Service[$service_name] } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/consumer.pp b/manifests/consumer.pp index 682bdda..6c6bd94 100644 --- a/manifests/consumer.pp +++ b/manifests/consumer.pp @@ -1,144 +1,144 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::consumer # # This class will install kafka with the consumer role. # # === Requirements/Dependencies # # Currently requires the puppetlabs/stdlib module on the Puppet Forge in # order to validate much of the the provided configuration. # # === Parameters # # [*kafka_version*] # The version of kafka that should be installed. # # [*scala_version*] # The scala version what kafka was built with. # # [*install_dir*] # The directory to install kafka to. # # [*mirror_url*] # The url where the kafka is downloaded from. # # [*manage_java*] # Install java if it's not already installed. # # [*package_dir*] # The directory to install kafka. # # [*package_name*] # Package name, when installing kafka from a package. # # [*package_ensure*] # Package version (or 'present', 'absent', 'latest'), when installing kafka from a package. # # [*user_name*] # User to run kafka as. # # [*group_name*] # Group to run kafka as. # # [*user_id*] # Create the kafka user with this ID. # # [*group_id*] # Create the kafka group with this ID. # # [*manage_user*] # Create the kafka user if it's not already present. # # [*manage_group*] # Create the kafka group if it's not already present. # # [*config_mode*] # The permissions for the config files. # # [*config_dir*] # The directory to create the kafka config files to. # # [*log_dir*] # The directory for kafka log files. # # [*bin_dir*] # The directory where the kafka scripts are. # # [*service_name*] # Set the name of the service. # # [*manage_service*] # Install the init.d or systemd service. # # [*service_ensure*] # Set the ensure state of the service to 'stopped' or 'running'. # # [*service_restart*] # Whether the configuration files should trigger a service restart. # # [*service_requires*] # Set the list of services required to be running before Kafka. # # [*limit_nofile*] # Set the 'LimitNOFILE' option of the systemd service. # # [*limit_core*] # Set the 'LimitCORE' option of the systemd service. # # [*env*] # A hash of the environment variables to set. # # [*config*] # A hash of the consumer configuration options. # # [*service_config*] # A hash of the `kafka-console-consumer.sh` script options. # # === Examples # # Create the consumer service connecting to a local zookeeper # # class { 'kafka::consumer': # config => { 'client.id' => '0', 'zookeeper.connect' => 'localhost:2181' } # } class kafka::consumer ( - String $kafka_version = $kafka::params::kafka_version, - String $scala_version = $kafka::params::scala_version, + String[1] $kafka_version = $kafka::params::kafka_version, + String[1] $scala_version = $kafka::params::scala_version, Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, Boolean $manage_java = $kafka::params::manage_java, Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, - Optional[String] $package_name = $kafka::params::package_name, - String $package_ensure = $kafka::params::package_ensure, - String $user_name = $kafka::params::user_name, - String $group_name = $kafka::params::group_name, + Optional[String[1]] $package_name = $kafka::params::package_name, + String[1] $package_ensure = $kafka::params::package_ensure, + String[1] $user_name = $kafka::params::user_name, + String[1] $group_name = $kafka::params::group_name, Optional[Integer] $user_id = $kafka::params::user_id, Optional[Integer] $group_id = $kafka::params::group_id, Boolean $manage_user = $kafka::params::manage_user, Boolean $manage_group = $kafka::params::manage_group, Stdlib::Filemode $config_mode = $kafka::params::config_mode, Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::params::bin_dir, - String $service_name = 'kafka-consumer', + String[1] $service_name = 'kafka-consumer', Boolean $manage_service = $kafka::params::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::params::service_ensure, Boolean $service_restart = $kafka::params::service_restart, - Array[String] $service_requires = $kafka::params::service_requires, - Optional[String] $limit_nofile = $kafka::params::limit_nofile, - Optional[String] $limit_core = $kafka::params::limit_core, + Array[String[1]] $service_requires = $kafka::params::service_requires, + Optional[String[1]] $limit_nofile = $kafka::params::limit_nofile, + Optional[String[1]] $limit_core = $kafka::params::limit_core, Hash $env = {}, - Hash $config = {}, - Hash $service_config = {}, - String $jmx_opts = $kafka::params::consumer_jmx_opts, - String $log4j_opts = $kafka::params::consumer_log4j_opts, + Hash[String[1], Any] $config = {}, + Hash[String[1],String[1]] $service_config = {}, + String[1] $jmx_opts = $kafka::params::consumer_jmx_opts, + String[1] $log4j_opts = $kafka::params::consumer_log4j_opts, ) inherits kafka::params { class { 'kafka::consumer::install': } -> class { 'kafka::consumer::config': } -> class { 'kafka::consumer::service': } -> Class['kafka::consumer'] } diff --git a/manifests/consumer/config.pp b/manifests/consumer/config.pp index daa4e33..b2d62ed 100644 --- a/manifests/consumer/config.pp +++ b/manifests/consumer/config.pp @@ -1,37 +1,37 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::consumer::config # # This private class is meant to be called from `kafka::consumer`. # It manages the consumer config files # class kafka::consumer::config( Boolean $manage_service = $kafka::consumer::manage_service, - String $service_name = $kafka::consumer::service_name, + String[1] $service_name = $kafka::consumer::service_name, Boolean $service_restart = $kafka::consumer::service_restart, - Hash $config = $kafka::consumer::config, + Hash[String[1], Any] $config = $kafka::consumer::config, Stdlib::Absolutepath $config_dir = $kafka::consumer::config_dir, - String $user_name = $kafka::consumer::user_name, - String $group_name = $kafka::consumer::group_name, + String[1] $user_name = $kafka::consumer::user_name, + String[1] $group_name = $kafka::consumer::group_name, Stdlib::Filemode $config_mode = $kafka::consumer::config_mode, ) { if ($manage_service and $service_restart) { $config_notify = Service[$service_name] } else { $config_notify = undef } $doctag = 'consumerconfigs' file { "${config_dir}/consumer.properties": ensure => present, owner => $user_name, group => $group_name, mode => $config_mode, content => template('kafka/properties.erb'), notify => $config_notify, require => File[$config_dir], } } diff --git a/manifests/consumer/service.pp b/manifests/consumer/service.pp index 6017cfc..da8e60a 100644 --- a/manifests/consumer/service.pp +++ b/manifests/consumer/service.pp @@ -1,78 +1,78 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::consumer::service # # This private class is meant to be called from `kafka::consumer`. # It manages the kafka-consumer service # class kafka::consumer::service( Boolean $manage_service = $kafka::consumer::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::consumer::service_ensure, - String $service_name = $kafka::consumer::service_name, - String $user_name = $kafka::consumer::user_name, - String $group_name = $kafka::consumer::group_name, + String[1] $service_name = $kafka::consumer::service_name, + String[1] $user_name = $kafka::consumer::user_name, + String[1] $group_name = $kafka::consumer::group_name, Stdlib::Absolutepath $config_dir = $kafka::consumer::config_dir, Stdlib::Absolutepath $log_dir = $kafka::consumer::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::consumer::bin_dir, - Array[String] $service_requires = $kafka::consumer::service_requires, - Optional[String] $limit_nofile = $kafka::consumer::limit_nofile, - Optional[String] $limit_core = $kafka::consumer::limit_core, + Array[String[1]] $service_requires = $kafka::consumer::service_requires, + Optional[String[1]] $limit_nofile = $kafka::consumer::limit_nofile, + Optional[String[1]] $limit_core = $kafka::consumer::limit_core, Hash $env = $kafka::consumer::env, - String $jmx_opts = $kafka::consumer::jmx_opts, - String $log4j_opts = $kafka::consumer::log4j_opts, - Hash $service_config = $kafka::consumer::service_config, + String[1] $jmx_opts = $kafka::consumer::jmx_opts, + String[1] $log4j_opts = $kafka::consumer::log4j_opts, + Hash[String[1],String[1]] $service_config = $kafka::consumer::service_config, ) { assert_private() if $manage_service { if $service_config['topic'] == '' { fail('[Consumer] You need to specify a value for topic') } if $service_config['zookeeper'] == '' { fail('[Consumer] You need to specify a value for zookeeper') } $env_defaults = { 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, } $environment = deep_merge($env_defaults, $env) if $facts['service_provider'] == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] ~> Exec['systemctl-daemon-reload'] -> Service[$service_name] } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/init.pp b/manifests/init.pp index 6bf0f8f..a9e1fa8 100644 --- a/manifests/init.pp +++ b/manifests/init.pp @@ -1,230 +1,230 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka # # This class will install kafka binaries # # === Requirements/Dependencies # # Currently requires the puppetlabs/stdlib module on the Puppet Forge in # order to validate much of the the provided configuration. # # === Parameters # # [*kafka_version*] # The version of kafka that should be installed. # # [*scala_version*] # The scala version what kafka was built with. # # [*install_dir*] # The directory to install kafka to. # # [*mirror_url*] # The url where the kafka is downloaded from. # # [*manage_java*] # Install java if it's not already installed. # # [*package_dir*] # The directory to install kafka. # # [*package_name*] # Package name, when installing kafka from a package. # # [*package_ensure*] # Package version (or 'present', 'absent', 'latest'), when installing kafka from a package. # # [*user_name*] # User to run kafka as. # # [*group_name*] # Group to run kafka as. # # [*user_id*] # Create the kafka user with this ID. # # [*system_user*] # Whether the kafka user is a system user or not. # # [*group_id*] # Create the kafka group with this ID. # # [*system_group*] # Whether the kafka group is a system group or not. # # [*manage_user*] # Create the kafka user if it's not already present. # # [*manage_group*] # Create the kafka group if it's not already present. # # [*config_dir*] # The directory to create the kafka config files to. # # [*log_dir*] # The directory for kafka log files. # # === Examples # # class kafka ( - String $kafka_version = $kafka::params::kafka_version, - String $scala_version = $kafka::params::scala_version, - Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, - Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, - Boolean $manage_java = $kafka::params::manage_java, - Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, - Optional[String] $package_name = $kafka::params::package_name, - Optional[String] $mirror_subpath = $kafka::params::mirror_subpath, - Optional[String] $proxy_server = $kafka::params::proxy_server, - Optional[String] $proxy_port = $kafka::params::proxy_port, - Optional[String] $proxy_host = $kafka::params::proxy_host, - Optional[String] $proxy_type = $kafka::params::proxy_type, - String $package_ensure = $kafka::params::package_ensure, - String $user_name = $kafka::params::user_name, - String $group_name = $kafka::params::group_name, - Boolean $system_user = $kafka::params::system_user, - Boolean $system_group = $kafka::params::system_group, - Optional[Integer] $user_id = $kafka::params::user_id, - Optional[Integer] $group_id = $kafka::params::group_id, - Boolean $manage_user = $kafka::params::manage_user, - Boolean $manage_group = $kafka::params::manage_group, - Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, - Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, - Optional[String] $install_mode = $kafka::params::install_mode, + String[1] $kafka_version = $kafka::params::kafka_version, + String[1] $scala_version = $kafka::params::scala_version, + Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, + Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, + Boolean $manage_java = $kafka::params::manage_java, + Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, + Optional[String[1]] $package_name = $kafka::params::package_name, + Optional[String[1]] $mirror_subpath = $kafka::params::mirror_subpath, + Optional[String[1]] $proxy_server = $kafka::params::proxy_server, + Optional[String[1]] $proxy_port = $kafka::params::proxy_port, + Optional[String[1]] $proxy_host = $kafka::params::proxy_host, + Optional[String[1]] $proxy_type = $kafka::params::proxy_type, + String[1] $package_ensure = $kafka::params::package_ensure, + String[1] $user_name = $kafka::params::user_name, + String[1] $group_name = $kafka::params::group_name, + Boolean $system_user = $kafka::params::system_user, + Boolean $system_group = $kafka::params::system_group, + Optional[Integer] $user_id = $kafka::params::user_id, + Optional[Integer] $group_id = $kafka::params::group_id, + Boolean $manage_user = $kafka::params::manage_user, + Boolean $manage_group = $kafka::params::manage_group, + Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, + Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, + Optional[String[1]] $install_mode = $kafka::params::install_mode, ) inherits kafka::params { if $manage_java { class { 'java': distribution => 'jdk', } } if $manage_group { group { $group_name: ensure => present, gid => $group_id, system => $system_group, } } if $manage_user { user { $user_name: ensure => present, shell => '/bin/bash', require => Group[$group_name], uid => $user_id, system => $system_user, } } file { $config_dir: ensure => directory, owner => $user_name, group => $group_name, } file { $log_dir: ensure => directory, owner => $user_name, group => $group_name, require => [ Group[$group_name], User[$user_name], ], } if $package_name == undef { include archive $mirror_path = $mirror_subpath ? { # if mirror_subpath was not changed, # we adapt it for the version $kafka::params::mirror_subpath => "kafka/${kafka_version}", # else, we just take whatever was supplied: default => $mirror_subpath, } $basefilename = "kafka_${scala_version}-${kafka_version}.tgz" $package_url = "${mirror_url}${mirror_path}/${basefilename}" $source = $mirror_url ?{ /tgz$/ => $mirror_url, default => $package_url, } $install_directory = $install_dir ? { # if install_dir was not changed, # we adapt it for the scala_version and the version $kafka::params::install_dir => "/opt/kafka-${scala_version}-${kafka_version}", # else, we just take whatever was supplied: default => $install_dir, } file { $package_dir: ensure => directory, owner => $user_name, group => $group_name, require => [ Group[$group_name], User[$user_name], ], } file { $install_directory: ensure => directory, owner => $user_name, group => $group_name, mode => $install_mode, require => [ Group[$group_name], User[$user_name], ], } file { '/opt/kafka': ensure => link, target => $install_directory, require => File[$install_directory], } if $proxy_server == undef and $proxy_host != undef and $proxy_port != undef { $final_proxy_server = "${proxy_host}:${proxy_port}" } else { $final_proxy_server = $proxy_server } archive { "${package_dir}/${basefilename}": ensure => present, extract => true, extract_command => 'tar xfz %s --strip-components=1', extract_path => $install_directory, source => $source, creates => "${install_directory}/config", cleanup => true, proxy_server => $final_proxy_server, proxy_type => $proxy_type, user => $user_name, group => $group_name, require => [ File[$package_dir], File[$install_directory], Group[$group_name], User[$user_name], ], before => File[$config_dir], } } else { package { $package_name: ensure => $package_ensure, before => File[$config_dir], } } } diff --git a/manifests/mirror.pp b/manifests/mirror.pp index c503701..939b56e 100644 --- a/manifests/mirror.pp +++ b/manifests/mirror.pp @@ -1,147 +1,147 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::mirror # # This class will install kafka with the mirror role. # # === Requirements/Dependencies # # Currently requires the puppetlabs/stdlib module on the Puppet Forge in # order to validate much of the the provided configuration. # # === Parameters # # [*kafka_version*] # The version of kafka that should be installed. # # [*scala_version*] # The scala version what kafka was built with. # # [*install_dir*] # The directory to install kafka to. # # [*mirror_url*] # The url where the kafka is downloaded from. # # [*manage_java*] # Install java if it's not already installed. # # [*package_dir*] # The directory to install kafka. # # [*package_name*] # Package name, when installing kafka from a package. # # [*package_ensure*] # Package version (or 'present', 'absent', 'latest'), when installing kafka from a package. # # [*user_name*] # User to run kafka as. # # [*group_name*] # Group to run kafka as. # # [*user_id*] # Create the kafka user with this ID. # # [*group_id*] # Create the kafka group with this ID. # # [*manage_user*] # Create the kafka user if it's not already present. # # [*manage_group*] # Create the kafka group if it's not already present. # # [*config_dir*] # The directory to create the kafka config files to. # # [*log_dir*] # The directory for kafka log files. # # [*bin_dir*] # The directory where the kafka scripts are. # # [*service_name*] # Set the name of the service. # # [*manage_service*] # Install the init.d or systemd service. # # [*service_ensure*] # Set the ensure state of the service to 'stopped' or 'running'. # # [*service_restart*] # Whether the configuration files should trigger a service restart. # # [*service_requires*] # Set the list of services required to be running before Kafka. # # [*limit_nofile*] # Set the 'LimitNOFILE' option of the systemd service. # # [*limit_core*] # Set the 'LimitCORE' option of the systemd service. # # [*env*] # A hash of the environment variables to set. # # [*consumer_config*] # A hash of the consumer configuration options. # # [*producer_config*] # A hash of the producer configuration options. # # [*service_config*] # A hash of the mirror script options. # # === Examples # # Create the mirror service connecting to a local zookeeper # # class { 'kafka::mirror': # consumer_config => { 'client.id' => '0', 'zookeeper.connect' => 'localhost:2181' } # } # class kafka::mirror ( - String $kafka_version = $kafka::params::kafka_version, - String $scala_version = $kafka::params::scala_version, + String[1] $kafka_version = $kafka::params::kafka_version, + String[1] $scala_version = $kafka::params::scala_version, Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, Boolean $manage_java = $kafka::params::manage_java, Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, - Optional[String] $package_name = $kafka::params::package_name, - String $package_ensure = $kafka::params::package_ensure, - String $user_name = $kafka::params::user_name, - String $group_name = $kafka::params::group_name, + Optional[String[1]] $package_name = $kafka::params::package_name, + String[1] $package_ensure = $kafka::params::package_ensure, + String[1] $user_name = $kafka::params::user_name, + String[1] $group_name = $kafka::params::group_name, Optional[Integer] $user_id = $kafka::params::user_id, Optional[Integer] $group_id = $kafka::params::group_id, Boolean $manage_user = $kafka::params::manage_user, Boolean $manage_group = $kafka::params::manage_group, Stdlib::Filemode $config_mode = $kafka::params::config_mode, Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::params::bin_dir, - String $service_name = 'kafka-mirror', + String[1] $service_name = 'kafka-mirror', Boolean $manage_service = $kafka::params::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::params::service_ensure, Boolean $service_restart = $kafka::params::service_restart, - Array[String] $service_requires = $kafka::params::service_requires, - Optional[String] $limit_nofile = $kafka::params::limit_nofile, - Optional[String] $limit_core = $kafka::params::limit_core, + Array[String[1]] $service_requires = $kafka::params::service_requires, + Optional[String[1]] $limit_nofile = $kafka::params::limit_nofile, + Optional[String[1]] $limit_core = $kafka::params::limit_core, Hash $env = {}, - Hash $consumer_config = {}, - Hash $producer_config = {}, - Hash $service_config = {}, - String $heap_opts = $kafka::params::mirror_heap_opts, - String $jmx_opts = $kafka::params::mirror_jmx_opts, - String $log4j_opts = $kafka::params::mirror_log4j_opts, + Hash[String[1],String[1]] $consumer_config = {}, + Hash[String[1],String[1]] $producer_config = {}, + Hash[String[1],String[1]] $service_config = {}, + String[1] $heap_opts = $kafka::params::mirror_heap_opts, + String[1] $jmx_opts = $kafka::params::mirror_jmx_opts, + String[1] $log4j_opts = $kafka::params::mirror_log4j_opts, ) inherits kafka::params { class { 'kafka::mirror::install': } -> class { 'kafka::mirror::config': } -> class { 'kafka::mirror::service': } -> Class['kafka::mirror'] } diff --git a/manifests/mirror/config.pp b/manifests/mirror/config.pp index 275c727..076bfe6 100644 --- a/manifests/mirror/config.pp +++ b/manifests/mirror/config.pp @@ -1,55 +1,55 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::mirror::config # # This private class is meant to be called from `kafka::mirror`. # It manages the mirror-maker config files # class kafka::mirror::config( - Boolean $manage_service = $kafka::mirror::manage_service, - String $service_name = $kafka::mirror::service_name, - Boolean $service_restart = $kafka::mirror::service_restart, - Hash $consumer_config = $kafka::mirror::consumer_config, - Hash $producer_config = $kafka::mirror::producer_config, - Stdlib::Absolutepath $config_dir = $kafka::mirror::config_dir, - String $user_name = $kafka::mirror::user_name, - String $group_name = $kafka::mirror::group_name, - Stdlib::Filemode $config_mode = $kafka::mirror::config_mode, + Boolean $manage_service = $kafka::mirror::manage_service, + String[1] $service_name = $kafka::mirror::service_name, + Boolean $service_restart = $kafka::mirror::service_restart, + Hash[String[1],String[1]] $consumer_config = $kafka::mirror::consumer_config, + Hash[String[1],String[1]] $producer_config = $kafka::mirror::producer_config, + Stdlib::Absolutepath $config_dir = $kafka::mirror::config_dir, + String[1] $user_name = $kafka::mirror::user_name, + String[1] $group_name = $kafka::mirror::group_name, + Stdlib::Filemode $config_mode = $kafka::mirror::config_mode, ) { assert_private() if $consumer_config['group.id'] == '' { fail('[Consumer] You need to specify a value for group.id') } if $consumer_config['zookeeper.connect'] == '' { fail('[Consumer] You need to specify a value for zookeeper.connect') } if $producer_config['bootstrap.servers'] == '' { fail('[Producer] You need to specify a value for bootstrap.servers') } class { 'kafka::consumer::config': manage_service => $manage_service, service_name => $service_name, service_restart => $service_restart, config => $consumer_config, config_dir => $config_dir, user_name => $user_name, group_name => $group_name, config_mode => $config_mode, } class { 'kafka::producer::config': manage_service => $manage_service, service_name => $service_name, service_restart => $service_restart, config => $producer_config, config_dir => $config_dir, user_name => $user_name, group_name => $group_name, config_mode => $config_mode, } } diff --git a/manifests/mirror/service.pp b/manifests/mirror/service.pp index 4b86c21..834172e 100644 --- a/manifests/mirror/service.pp +++ b/manifests/mirror/service.pp @@ -1,74 +1,74 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::mirror::service # # This private class is meant to be called from `kafka::mirror`. # It manages the kafka-mirror service # class kafka::mirror::service( Boolean $manage_service = $kafka::mirror::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::mirror::service_ensure, - String $service_name = $kafka::mirror::service_name, - String $user_name = $kafka::mirror::user_name, - String $group_name = $kafka::mirror::group_name, + String[1] $service_name = $kafka::mirror::service_name, + String[1] $user_name = $kafka::mirror::user_name, + String[1] $group_name = $kafka::mirror::group_name, Stdlib::Absolutepath $config_dir = $kafka::mirror::config_dir, Stdlib::Absolutepath $log_dir = $kafka::mirror::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::mirror::bin_dir, - Array[String] $service_requires = $kafka::mirror::service_requires, - Optional[String] $limit_nofile = $kafka::mirror::limit_nofile, - Optional[String] $limit_core = $kafka::mirror::limit_core, + Array[String[1]] $service_requires = $kafka::mirror::service_requires, + Optional[String[1]] $limit_nofile = $kafka::mirror::limit_nofile, + Optional[String[1]] $limit_core = $kafka::mirror::limit_core, Hash $env = $kafka::mirror::env, - Hash $consumer_config = $kafka::mirror::consumer_config, - Hash $producer_config = $kafka::mirror::producer_config, - Hash $service_config = $kafka::mirror::service_config, - String $heap_opts = $kafka::mirror::heap_opts, - String $jmx_opts = $kafka::mirror::jmx_opts, - String $log4j_opts = $kafka::mirror::log4j_opts, + Hash[String[1],String[1]] $consumer_config = $kafka::mirror::consumer_config, + Hash[String[1],String[1]] $producer_config = $kafka::mirror::producer_config, + Hash[String[1],String[1]] $service_config = $kafka::mirror::service_config, + String[1] $heap_opts = $kafka::mirror::heap_opts, + String[1] $jmx_opts = $kafka::mirror::jmx_opts, + String[1] $log4j_opts = $kafka::mirror::log4j_opts, ) { assert_private() if $manage_service { $env_defaults = { 'KAFKA_HEAP_OPTS' => $heap_opts, 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, } $environment = deep_merge($env_defaults, $env) if $::service_provider == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] ~> Exec['systemctl-daemon-reload'] -> Service[$service_name] } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/producer.pp b/manifests/producer.pp index 0c8cb53..7f46b3e 100644 --- a/manifests/producer.pp +++ b/manifests/producer.pp @@ -1,146 +1,146 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::producer # # This class will install kafka with the producer role. # # === Requirements/Dependencies # # Currently requires the puppetlabs/stdlib module on the Puppet Forge in # order to validate much of the the provided configuration. # # === Parameters # # [*kafka_version*] # The version of kafka that should be installed. # # [*scala_version*] # The scala version what kafka was built with. # # [*install_dir*] # The directory to install kafka to. # # [*mirror_url*] # The url where the kafka is downloaded from. # # [*manage_java*] # Install java if it's not already installed. # # [*package_dir*] # The directory to install kafka. # # [*package_name*] # Package name, when installing kafka from a package. # # [*package_ensure*] # Package version (or 'present', 'absent', 'latest'), when installing kafka from a package. # # [*user_name*] # User to run kafka as. # # [*group_name*] # Group to run kafka as. # # [*user_id*] # Create the kafka user with this ID. # # [*group_id*] # Create the kafka group with this ID. # # [*manage_user*] # Create the kafka user if it's not already present. # # [*manage_group*] # Create the kafka group if it's not already present. # # [*config_mode*] # The permissions for the config files. # # [*config_dir*] # The directory to create the kafka config files to. # # [*log_dir*] # The directory for kafka log files. # # [*bin_dir*] # The directory where the kafka scripts are. # # [*service_name*] # Set the name of the service. # # [*manage_service*] # Install the init.d or systemd service. # # [*service_ensure*] # Set the ensure state of the service to 'stopped' or 'running'. # # [*service_restart*] # Whether the configuration files should trigger a service restart. # # [*service_requires*] # Set the list of services required to be running before Kafka. # # [*limit_nofile*] # Set the 'LimitNOFILE' option of the systemd service. # # [*limit_core*] # Set the 'LimitCORE' option of the systemd service. # # [*env*] # A hash of the environment variables to set. # # [*config*] # A hash of the producer configuration options. # # [*service_config*] # A hash of the `kafka-console-producer.sh` script options. # # === Examples # # Create the producer service connecting to a local zookeeper # # class { 'kafka::producer': # config => { 'client.id' => '0', 'zookeeper.connect' => 'localhost:2181' } # } # class kafka::producer ( - $input, - String $kafka_version = $kafka::params::kafka_version, - String $scala_version = $kafka::params::scala_version, + Optional[String[1]] $input, + String[1] $kafka_version = $kafka::params::kafka_version, + String[1] $scala_version = $kafka::params::scala_version, Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, Boolean $manage_java = $kafka::params::manage_java, Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, - Optional[String] $package_name = $kafka::params::package_name, - String $package_ensure = $kafka::params::package_ensure, - String $user_name = $kafka::params::user_name, - String $group_name = $kafka::params::group_name, + Optional[String[1]] $package_name = $kafka::params::package_name, + String[1] $package_ensure = $kafka::params::package_ensure, + String[1] $user_name = $kafka::params::user_name, + String[1] $group_name = $kafka::params::group_name, Optional[Integer] $user_id = $kafka::params::user_id, Optional[Integer] $group_id = $kafka::params::group_id, Boolean $manage_user = $kafka::params::manage_user, Boolean $manage_group = $kafka::params::manage_group, Stdlib::Filemode $config_mode = $kafka::params::config_mode, Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::params::bin_dir, - String $service_name = 'kafka-producer', + String[1] $service_name = 'kafka-producer', Boolean $manage_service = $kafka::params::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::params::service_ensure, Boolean $service_restart = $kafka::params::service_restart, - Array[String] $service_requires = $kafka::params::service_requires, - Optional[String] $limit_nofile = $kafka::params::limit_nofile, - Optional[String] $limit_core = $kafka::params::limit_core, + Array[String[1]] $service_requires = $kafka::params::service_requires, + Optional[String[1]] $limit_nofile = $kafka::params::limit_nofile, + Optional[String[1]] $limit_core = $kafka::params::limit_core, Hash $env = {}, - Hash $config = {}, - Hash $service_config = {}, - String $jmx_opts = $kafka::params::producer_jmx_opts, - String $log4j_opts = $kafka::params::producer_log4j_opts, + Hash[String[1], Any] $config = {}, + Hash[String[1],String[1]] $service_config = {}, + String[1] $jmx_opts = $kafka::params::producer_jmx_opts, + String[1] $log4j_opts = $kafka::params::producer_log4j_opts, ) inherits kafka::params { class { 'kafka::producer::install': } -> class { 'kafka::producer::config': } -> class { 'kafka::producer::service': } -> Class['kafka::producer'] } diff --git a/manifests/producer/config.pp b/manifests/producer/config.pp index 8fc8d13..99b1173 100644 --- a/manifests/producer/config.pp +++ b/manifests/producer/config.pp @@ -1,37 +1,37 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::producer::config # # This private class is meant to be called from `kafka::producer`. # It manages the producer config files # class kafka::producer::config( Boolean $manage_service = $kafka::producer::manage_service, - String $service_name = $kafka::producer::service_name, + String[1] $service_name = $kafka::producer::service_name, Boolean $service_restart = $kafka::producer::service_restart, - Hash $config = $kafka::producer::config, + Hash[String[1], Any] $config = $kafka::producer::config, Stdlib::Absolutepath $config_dir = $kafka::producer::config_dir, - String $user_name = $kafka::producer::user_name, - String $group_name = $kafka::producer::group_name, + String[1] $user_name = $kafka::producer::user_name, + String[1] $group_name = $kafka::producer::group_name, Stdlib::Filemode $config_mode = $kafka::producer::config_mode, ) { if ($manage_service and $service_restart) { $config_notify = Service[$service_name] } else { $config_notify = undef } $doctag = 'producerconfigs' file { "${config_dir}/producer.properties": ensure => present, owner => $user_name, group => $group_name, mode => $config_mode, content => template('kafka/properties.erb'), notify => $config_notify, require => File[$config_dir], } } diff --git a/manifests/producer/service.pp b/manifests/producer/service.pp index ea4819a..18a0956 100644 --- a/manifests/producer/service.pp +++ b/manifests/producer/service.pp @@ -1,64 +1,64 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::producer::service # # This private class is meant to be called from `kafka::producer`. # It manages the kafka-producer service # class kafka::producer::service( Boolean $manage_service = $kafka::producer::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::producer::service_ensure, - String $service_name = $kafka::producer::service_name, - String $user_name = $kafka::producer::user_name, - String $group_name = $kafka::producer::group_name, + String[1] $service_name = $kafka::producer::service_name, + String[1]$user_name = $kafka::producer::user_name, + String[1] $group_name = $kafka::producer::group_name, Stdlib::Absolutepath $config_dir = $kafka::producer::config_dir, Stdlib::Absolutepath $log_dir = $kafka::producer::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::producer::bin_dir, - Array[String] $service_requires = $kafka::producer::service_requires, - Optional[String] $limit_nofile = $kafka::producer::limit_nofile, - Optional[String] $limit_core = $kafka::producer::limit_core, + Array[String[1]] $service_requires = $kafka::producer::service_requires, + Optional[String[1]] $limit_nofile = $kafka::producer::limit_nofile, + Optional[String[1]] $limit_core = $kafka::producer::limit_core, Hash $env = $kafka::producer::env, - $input = $kafka::producer::input, - String $jmx_opts = $kafka::producer::jmx_opts, - String $log4j_opts = $kafka::producer::log4j_opts, - Hash $service_config = $kafka::producer::service_config, + Optional[String[1]] $input = $kafka::producer::input, + String[1] $jmx_opts = $kafka::producer::jmx_opts, + String[1] $log4j_opts = $kafka::producer::log4j_opts, + Hash[String[1],String[1]] $service_config = $kafka::producer::service_config, ) { assert_private() if $manage_service { if $service_config['broker-list'] == '' { fail('[Producer] You need to specify a value for broker-list') } if $service_config['topic'] == '' { fail('[Producer] You need to specify a value for topic') } $env_defaults = { 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, } $environment = deep_merge($env_defaults, $env) if $facts['service_provider'] == 'systemd' { fail('Console Producer is not supported on systemd, because the stdin of the process cannot be redirected') } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/topic.pp b/manifests/topic.pp index 52b1699..61b452e 100644 --- a/manifests/topic.pp +++ b/manifests/topic.pp @@ -1,43 +1,43 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Define: kafka::topic # # This defined type is used to manage the creation of kafka topics. # define kafka::topic( - String $ensure = '', - String $zookeeper = '', - Variant[Integer,String] $replication_factor = 1, - Variant[Integer,String] $partitions = 1, - String $bin_dir = '/opt/kafka/bin', - Optional[Hash[String,String]] $config = undef, + String[1] $ensure = '', + String[1] $zookeeper = '', + Variant[Integer,String[1]] $replication_factor = 1, + Variant[Integer,String[1]] $partitions = 1, + String[1] $bin_dir = '/opt/kafka/bin', + Optional[Hash[String[1],String[1]]] $config = undef, ) { if is_string($replication_factor) { deprication('kafka::topic', 'Please use Integer type, not String, for paramter replication_factor') } if is_string($partitions) { deprication('kafka::topic', 'Please use Integer type, not String, for paramter partitions') } $_zookeeper = "--zookeeper ${zookeeper}" $_replication_factor = "--replication-factor ${replication_factor}" $_partitions = "--partitions ${partitions}" if $config { $_config_array = $config.map |$key, $value| { "--config ${key}=${value}" } $_config = join($_config_array, ' ') } else { $_config = '' } if $ensure == 'present' { exec { "create topic ${name}": path => "/usr/bin:/usr/sbin/:/bin:/sbin:${bin_dir}", command => "kafka-topics.sh --create ${_zookeeper} ${_replication_factor} ${_partitions} --topic ${name} ${_config}", unless => "kafka-topics.sh --list ${_zookeeper} | grep -x ${name}", } } }