diff --git a/manifests/broker/config.pp b/manifests/broker/config.pp index 971ff12..9b7fed9 100644 --- a/manifests/broker/config.pp +++ b/manifests/broker/config.pp @@ -1,39 +1,35 @@ -# Author:: Liam Bennett (mailto:lbennett@opentable.com) -# Copyright:: Copyright (c) 2013 OpenTable Inc -# License:: MIT - -# == Class: kafka::broker::config +# @summary +# This class handles the Kafka (broker) config. # -# This private class is meant to be called from `kafka::broker`. -# It manages the broker config files +# @api private # class kafka::broker::config( Stdlib::Absolutepath $config_dir = $kafka::broker::config_dir, String $service_name = $kafka::broker::service_name, Boolean $service_install = $kafka::broker::service_install, Boolean $service_restart = $kafka::broker::service_restart, Hash $config = $kafka::broker::config, Stdlib::Filemode $config_mode = $kafka::broker::config_mode, String $user = $kafka::broker::user, String $group = $kafka::broker::group, ) { assert_private() if ($service_install and $service_restart) { $config_notify = Service[$service_name] } else { $config_notify = undef } $doctag = 'brokerconfigs' file { "${config_dir}/server.properties": ensure => present, owner => $user, group => $group, mode => $config_mode, content => template('kafka/properties.erb'), notify => $config_notify, require => File[$config_dir], } } diff --git a/manifests/broker/install.pp b/manifests/broker/install.pp index 4856046..498b140 100644 --- a/manifests/broker/install.pp +++ b/manifests/broker/install.pp @@ -1,34 +1,30 @@ -# Author:: Liam Bennett (mailto:lbennett@opentable.com) -# Copyright:: Copyright (c) 2013 OpenTable Inc -# License:: MIT - -# == Class: kafka::broker::install +# @summary +# This class handles the Kafka (broker) package. # -# This private class is meant to be called from `kafka::broker`. -# It downloads the package and installs it. +# @api private # class kafka::broker::install { assert_private() if !defined(Class['kafka']) { class { 'kafka': version => $kafka::broker::version, scala_version => $kafka::broker::scala_version, install_dir => $kafka::broker::install_dir, mirror_url => $kafka::broker::mirror_url, install_java => $kafka::broker::install_java, package_dir => $kafka::broker::package_dir, package_name => $kafka::broker::package_name, package_ensure => $kafka::broker::package_ensure, user => $kafka::broker::user, group => $kafka::broker::group, user_id => $kafka::broker::user_id, group_id => $kafka::broker::group_id, manage_user => $kafka::broker::manage_user, manage_group => $kafka::broker::manage_group, config_dir => $kafka::broker::config_dir, log_dir => $kafka::broker::log_dir, } } } diff --git a/manifests/broker/service.pp b/manifests/broker/service.pp index a3a70e0..3af8bed 100644 --- a/manifests/broker/service.pp +++ b/manifests/broker/service.pp @@ -1,77 +1,72 @@ -# Author:: Liam Bennett (mailto:lbennett@opentable.com) -# Copyright:: Copyright (c) 2013 OpenTable Inc -# License:: MIT - -# == Class: kafka::broker::service +# @summary +# This class handles the Kafka (broker) service. # -# This private class is meant to be called from `kafka::broker`. -# It manages the kafka service +# @api private # class kafka::broker::service( String $user = $kafka::broker::user, String $group = $kafka::broker::group, Stdlib::Absolutepath $config_dir = $kafka::broker::config_dir, Stdlib::Absolutepath $log_dir = $kafka::broker::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::broker::bin_dir, String $service_name = $kafka::broker::service_name, Boolean $service_install = $kafka::broker::service_install, Enum['running', 'stopped'] $service_ensure = $kafka::broker::service_ensure, Array[String] $service_requires = $kafka::broker::service_requires, Optional[String] $limit_nofile = $kafka::broker::limit_nofile, Optional[String] $limit_core = $kafka::broker::limit_core, Optional[String] $timeout_stop = $kafka::broker::timeout_stop, Boolean $exec_stop = $kafka::broker::exec_stop, Boolean $daemon_start = $kafka::broker::daemon_start, Hash $env = $kafka::broker::env, String $heap_opts = $kafka::broker::heap_opts, String $jmx_opts = $kafka::broker::jmx_opts, String $log4j_opts = $kafka::broker::log4j_opts, $opts = $kafka::broker::opts, ) { assert_private() if $service_install { $env_defaults = { 'KAFKA_HEAP_OPTS' => $heap_opts, 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, 'KAFKA_OPTS' => $opts, 'LOG_DIR' => $log_dir, } $environment = deep_merge($env_defaults, $env) if $::service_provider == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] ~> Exec['systemctl-daemon-reload'] -> Service[$service_name] - } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/consumer/config.pp b/manifests/consumer/config.pp index 555834f..eeb9e5f 100644 --- a/manifests/consumer/config.pp +++ b/manifests/consumer/config.pp @@ -1,37 +1,31 @@ -# Author:: Liam Bennett (mailto:lbennett@opentable.com) -# Copyright:: Copyright (c) 2013 OpenTable Inc -# License:: MIT - -# == Class: kafka::consumer::config -# -# This private class is meant to be called from `kafka::consumer`. -# It manages the consumer config files +# @summary +# This class handles the Kafka (consumer) config. # class kafka::consumer::config( Stdlib::Absolutepath $config_dir = $kafka::consumer::config_dir, String $service_name = $kafka::consumer::service_name, Boolean $service_install = $kafka::consumer::service_install, Boolean $service_restart = $kafka::consumer::service_restart, Hash $config = $kafka::consumer::config, Stdlib::Filemode $config_mode = $kafka::consumer::config_mode, String $user = $kafka::consumer::user, String $group = $kafka::consumer::group, ) { if ($service_install and $service_restart) { $config_notify = Service[$service_name] } else { $config_notify = undef } $doctag = 'consumerconfigs' file { "${config_dir}/consumer.properties": ensure => present, owner => $user, group => $group, mode => $config_mode, content => template('kafka/properties.erb'), notify => $config_notify, require => File[$config_dir], } } diff --git a/manifests/consumer/install.pp b/manifests/consumer/install.pp index b0fb26a..e845fcb 100644 --- a/manifests/consumer/install.pp +++ b/manifests/consumer/install.pp @@ -1,34 +1,30 @@ -# Author:: Liam Bennett (mailto:lbennett@opentable.com) -# Copyright:: Copyright (c) 2013 OpenTable Inc -# License:: MIT - -# == Class: kafka::consumer::install +# @summary +# This class handles the Kafka (consumer) package. # -# This private class is meant to be called from `kafka::consumer`. -# It downloads the package and installs it. +# @api private # class kafka::consumer::install { assert_private() if !defined(Class['kafka']) { class { 'kafka': version => $kafka::consumer::version, scala_version => $kafka::consumer::scala_version, install_dir => $kafka::consumer::install_dir, mirror_url => $kafka::consumer::mirror_url, install_java => $kafka::consumer::install_java, package_dir => $kafka::consumer::package_dir, package_name => $kafka::consumer::package_name, package_ensure => $kafka::consumer::package_ensure, user => $kafka::consumer::user, group => $kafka::consumer::group, user_id => $kafka::consumer::user_id, group_id => $kafka::consumer::group_id, manage_user => $kafka::consumer::manage_user, manage_group => $kafka::consumer::manage_group, config_dir => $kafka::consumer::config_dir, log_dir => $kafka::consumer::log_dir, } } } diff --git a/manifests/consumer/service.pp b/manifests/consumer/service.pp index 30df409..7764856 100644 --- a/manifests/consumer/service.pp +++ b/manifests/consumer/service.pp @@ -1,78 +1,72 @@ -# Author:: Liam Bennett (mailto:lbennett@opentable.com) -# Copyright:: Copyright (c) 2013 OpenTable Inc -# License:: MIT - -# == Class: kafka::consumer::service +# @summary +# This class handles the Kafka (consumer) service. # -# This private class is meant to be called from `kafka::consumer`. -# It manages the kafka-consumer service +# @api private # class kafka::consumer::service( String $user = $kafka::consumer::user, String $group = $kafka::consumer::group, Stdlib::Absolutepath $config_dir = $kafka::consumer::config_dir, Stdlib::Absolutepath $log_dir = $kafka::consumer::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::consumer::bin_dir, String $service_name = $kafka::consumer::service_name, Boolean $service_install = $kafka::consumer::service_install, Enum['running', 'stopped'] $service_ensure = $kafka::consumer::service_ensure, Array[String] $service_requires = $kafka::consumer::service_requires, Optional[String] $limit_nofile = $kafka::consumer::limit_nofile, Optional[String] $limit_core = $kafka::consumer::limit_core, Hash $env = $kafka::consumer::env, String $jmx_opts = $kafka::consumer::jmx_opts, String $log4j_opts = $kafka::consumer::log4j_opts, Hash $service_config = $kafka::consumer::service_config, ) { assert_private() if $service_install { - if $service_config['topic'] == '' { fail('[Consumer] You need to specify a value for topic') } if $service_config['zookeeper'] == '' { fail('[Consumer] You need to specify a value for zookeeper') } $env_defaults = { 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, } $environment = deep_merge($env_defaults, $env) if $::service_provider == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] ~> Exec['systemctl-daemon-reload'] -> Service[$service_name] - } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/mirror/config.pp b/manifests/mirror/config.pp index 9916c00..b909bfc 100644 --- a/manifests/mirror/config.pp +++ b/manifests/mirror/config.pp @@ -1,55 +1,51 @@ -# Author:: Liam Bennett (mailto:lbennett@opentable.com) -# Copyright:: Copyright (c) 2013 OpenTable Inc -# License:: MIT - -# == Class: kafka::mirror::config +# @summary +# This class handles the Kafka (mirror) config. # -# This private class is meant to be called from `kafka::mirror`. -# It manages the mirror-maker config files +# @api private # class kafka::mirror::config( Stdlib::Absolutepath $config_dir = $kafka::mirror::config_dir, String $service_name = $kafka::mirror::service_name, Boolean $service_install = $kafka::mirror::service_install, Boolean $service_restart = $kafka::mirror::service_restart, Hash $consumer_config = $kafka::mirror::consumer_config, Hash $producer_config = $kafka::mirror::producer_config, Stdlib::Filemode $config_mode = $kafka::mirror::config_mode, String $user = $kafka::mirror::user, String $group = $kafka::mirror::group, ) { assert_private() if $consumer_config['group.id'] == '' { fail('[Consumer] You need to specify a value for group.id') } if $consumer_config['zookeeper.connect'] == '' { fail('[Consumer] You need to specify a value for zookeeper.connect') } if $producer_config['bootstrap.servers'] == '' { fail('[Producer] You need to specify a value for bootstrap.servers') } class { 'kafka::consumer::config': config_dir => $config_dir, config_mode => $config_mode, service_name => $service_name, service_install => $service_install, service_restart => $service_restart, config => $consumer_config, user => $user, group => $group, } class { 'kafka::producer::config': config_dir => $config_dir, config_mode => $config_mode, service_name => $service_name, service_install => $service_install, service_restart => $service_restart, config => $producer_config, user => $user, group => $group, } } diff --git a/manifests/mirror/install.pp b/manifests/mirror/install.pp index 9d9637a..e69ec25 100644 --- a/manifests/mirror/install.pp +++ b/manifests/mirror/install.pp @@ -1,34 +1,30 @@ -# Author:: Liam Bennett (mailto:lbennett@opentable.com) -# Copyright:: Copyright (c) 2013 OpenTable Inc -# License:: MIT - -# == Class: kafka::mirror::install +# @summary +# This class handles the Kafka (mirror) package. # -# This private class is meant to be called from `kafka::mirror`. -# It downloads the package and installs it. +# @api private # class kafka::mirror::install { assert_private() if !defined(Class['kafka']) { class { 'kafka': version => $kafka::mirror::version, scala_version => $kafka::mirror::scala_version, install_dir => $kafka::mirror::install_dir, mirror_url => $kafka::mirror::mirror_url, install_java => $kafka::mirror::install_java, package_dir => $kafka::mirror::package_dir, package_name => $kafka::mirror::package_name, package_ensure => $kafka::mirror::package_ensure, user => $kafka::mirror::user, group => $kafka::mirror::group, user_id => $kafka::mirror::user_id, group_id => $kafka::mirror::group_id, manage_user => $kafka::mirror::manage_user, manage_group => $kafka::mirror::manage_group, config_dir => $kafka::mirror::config_dir, log_dir => $kafka::mirror::log_dir, } } } diff --git a/manifests/mirror/service.pp b/manifests/mirror/service.pp index bb1333c..76cff20 100644 --- a/manifests/mirror/service.pp +++ b/manifests/mirror/service.pp @@ -1,74 +1,69 @@ -# Author:: Liam Bennett (mailto:lbennett@opentable.com) -# Copyright:: Copyright (c) 2013 OpenTable Inc -# License:: MIT - -# == Class: kafka::mirror::service +# @summary +# This class handles the Kafka (mirror) service. # -# This private class is meant to be called from `kafka::mirror`. -# It manages the kafka-mirror service +# @api private # class kafka::mirror::service( String $user = $kafka::mirror::user, String $group = $kafka::mirror::group, Stdlib::Absolutepath $config_dir = $kafka::mirror::config_dir, Stdlib::Absolutepath $log_dir = $kafka::mirror::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::mirror::bin_dir, String $service_name = $kafka::mirror::service_name, Boolean $service_install = $kafka::mirror::service_install, Enum['running', 'stopped'] $service_ensure = $kafka::mirror::service_ensure, Array[String] $service_requires = $kafka::mirror::service_requires, Optional[String] $limit_nofile = $kafka::mirror::limit_nofile, Optional[String] $limit_core = $kafka::mirror::limit_core, Hash $env = $kafka::mirror::env, Hash $consumer_config = $kafka::mirror::consumer_config, Hash $producer_config = $kafka::mirror::producer_config, Hash $service_config = $kafka::mirror::service_config, String $heap_opts = $kafka::mirror::heap_opts, String $jmx_opts = $kafka::mirror::jmx_opts, String $log4j_opts = $kafka::mirror::log4j_opts, ) { assert_private() if $service_install { $env_defaults = { 'KAFKA_HEAP_OPTS' => $heap_opts, 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, } $environment = deep_merge($env_defaults, $env) if $::service_provider == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] ~> Exec['systemctl-daemon-reload'] -> Service[$service_name] - } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/producer/config.pp b/manifests/producer/config.pp index fad8113..a078658 100644 --- a/manifests/producer/config.pp +++ b/manifests/producer/config.pp @@ -1,37 +1,31 @@ -# Author:: Liam Bennett (mailto:lbennett@opentable.com) -# Copyright:: Copyright (c) 2013 OpenTable Inc -# License:: MIT - -# == Class: kafka::producer::config -# -# This private class is meant to be called from `kafka::producer`. -# It manages the producer config files +# @summary +# This class handles the Kafka (producer) config. # class kafka::producer::config( Stdlib::Absolutepath $config_dir = $kafka::producer::config_dir, String $service_name = $kafka::producer::service_name, Boolean $service_install = $kafka::producer::service_install, Boolean $service_restart = $kafka::producer::service_restart, Hash $config = $kafka::producer::config, Stdlib::Filemode $config_mode = $kafka::producer::config_mode, String $user = $kafka::producer::user, String $group = $kafka::producer::group, ) { if ($service_install and $service_restart) { $config_notify = Service[$service_name] } else { $config_notify = undef } $doctag = 'producerconfigs' file { "${config_dir}/producer.properties": ensure => present, owner => $user, group => $group, mode => $config_mode, content => template('kafka/properties.erb'), notify => $config_notify, require => File[$config_dir], } } diff --git a/manifests/producer/install.pp b/manifests/producer/install.pp index 78e0b6a..b25ee69 100644 --- a/manifests/producer/install.pp +++ b/manifests/producer/install.pp @@ -1,34 +1,30 @@ -# Author:: Liam Bennett (mailto:lbennett@opentable.com) -# Copyright:: Copyright (c) 2013 OpenTable Inc -# License:: MIT - -# == Class: kafka::producer::install +# @summary +# This class handles the Kafka (producer) package. # -# This private class is meant to be called from `kafka::producer`. -# It downloads the package and installs it. +# @api private # class kafka::producer::install { assert_private() if !defined(Class['kafka']) { class { 'kafka': version => $kafka::producer::version, scala_version => $kafka::producer::scala_version, install_dir => $kafka::producer::install_dir, mirror_url => $kafka::producer::mirror_url, install_java => $kafka::producer::install_java, package_dir => $kafka::producer::package_dir, package_name => $kafka::producer::package_name, package_ensure => $kafka::producer::package_ensure, user => $kafka::producer::user, group => $kafka::producer::group, user_id => $kafka::producer::user_id, group_id => $kafka::producer::group_id, manage_user => $kafka::producer::manage_user, manage_group => $kafka::producer::manage_group, config_dir => $kafka::producer::config_dir, log_dir => $kafka::producer::log_dir, } } } diff --git a/manifests/producer/service.pp b/manifests/producer/service.pp index ef2f2dd..76d6cb6 100644 --- a/manifests/producer/service.pp +++ b/manifests/producer/service.pp @@ -1,64 +1,59 @@ -# Author:: Liam Bennett (mailto:lbennett@opentable.com) -# Copyright:: Copyright (c) 2013 OpenTable Inc -# License:: MIT - -# == Class: kafka::producer::service +# @summary +# This class handles the Kafka (producer) service. # -# This private class is meant to be called from `kafka::producer`. -# It manages the kafka-producer service +# @api private # class kafka::producer::service( String $user = $kafka::producer::user, String $group = $kafka::producer::group, Stdlib::Absolutepath $config_dir = $kafka::producer::config_dir, Stdlib::Absolutepath $log_dir = $kafka::producer::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::producer::bin_dir, String $service_name = $kafka::producer::service_name, Boolean $service_install = $kafka::producer::service_install, Enum['running', 'stopped'] $service_ensure = $kafka::producer::service_ensure, Array[String] $service_requires = $kafka::producer::service_requires, Optional[String] $limit_nofile = $kafka::producer::limit_nofile, Optional[String] $limit_core = $kafka::producer::limit_core, Hash $env = $kafka::producer::env, $input = $kafka::producer::input, String $jmx_opts = $kafka::producer::jmx_opts, String $log4j_opts = $kafka::producer::log4j_opts, Hash $service_config = $kafka::producer::service_config, ) { assert_private() if $service_install { - if $service_config['broker-list'] == '' { fail('[Producer] You need to specify a value for broker-list') } if $service_config['topic'] == '' { fail('[Producer] You need to specify a value for topic') } $env_defaults = { 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, } $environment = deep_merge($env_defaults, $env) if $::service_provider == 'systemd' { fail('Console Producer is not supported on systemd, because the stdin of the process cannot be redirected') } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/topic.pp b/manifests/topic.pp index 52b1699..b16ba08 100644 --- a/manifests/topic.pp +++ b/manifests/topic.pp @@ -1,43 +1,68 @@ -# Author:: Liam Bennett (mailto:lbennett@opentable.com) -# Copyright:: Copyright (c) 2013 OpenTable Inc -# License:: MIT - -# == Define: kafka::topic +# @summary +# This defined type handles the creation of Kafka topics. +# +# @example Basic usage +# kafka::topic { 'test': +# ensure => present, +# zookeeper => 'localhost:2181', +# replication_factor => 1, +# partitions => 1, +# } +# +# @param ensure +# Should the topic be created. +# +# @param zookeeper +# The connection string for the ZooKeeper connection in the form host:port. +# Multiple hosts can be given to allow fail-over. +# +# @param replication_factor +# The replication factor for each partition in the topic being created. If +# not supplied, defaults to the cluster default. +# +# @param partitions +# The number of partitions for the topic being created or altered. If not +# supplied for create, defaults to the cluster default. +# +# @param bin_dir +# The directory where the file kafka-topics.sh is located. # -# This defined type is used to manage the creation of kafka topics. +# @param config +# A topic configuration override for the topic being created or altered. +# See the Kafka documentation for full details on the topic configs. # define kafka::topic( String $ensure = '', String $zookeeper = '', Variant[Integer,String] $replication_factor = 1, Variant[Integer,String] $partitions = 1, String $bin_dir = '/opt/kafka/bin', Optional[Hash[String,String]] $config = undef, ) { if is_string($replication_factor) { deprication('kafka::topic', 'Please use Integer type, not String, for paramter replication_factor') } if is_string($partitions) { deprication('kafka::topic', 'Please use Integer type, not String, for paramter partitions') } $_zookeeper = "--zookeeper ${zookeeper}" $_replication_factor = "--replication-factor ${replication_factor}" $_partitions = "--partitions ${partitions}" if $config { $_config_array = $config.map |$key, $value| { "--config ${key}=${value}" } $_config = join($_config_array, ' ') } else { $_config = '' } if $ensure == 'present' { exec { "create topic ${name}": path => "/usr/bin:/usr/sbin/:/bin:/sbin:${bin_dir}", command => "kafka-topics.sh --create ${_zookeeper} ${_replication_factor} ${_partitions} --topic ${name} ${_config}", unless => "kafka-topics.sh --list ${_zookeeper} | grep -x ${name}", } } }