diff --git a/.github/settings.yml b/.github/settings.yml new file mode 100644 index 0000000..2dab8e3 --- /dev/null +++ b/.github/settings.yml @@ -0,0 +1,33 @@ +--- +repository: + private: false + has_issues: true + has_projects: false + has_wiki: false + has_downloads: true + default_branch: master + allow_squash_merge: true + allow_merge_commit: true + allow_rebase_merge: true + archived: false +branches: +- name: master + protection: + required_pull_request_reviews: null + required_status_checks: + strict: true + contexts: [] + enforce_admins: null + restrictions: null + required_signatures: true +- name: modulesync + protection: + required_pull_request_reviews: null + required_status_checks: + strict: true + contexts: [] + enforce_admins: null + restrictions: null + required_signatures: true + allow_force_pushes: true + allow_deletions: true diff --git a/.travis.yml b/.travis.yml index d9ae225..8c52ae2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,103 +1,104 @@ --- +os: linux dist: bionic language: ruby cache: bundler before_install: - yes | gem update --system - bundle --version script: - 'bundle exec rake $CHECK' -matrix: +jobs: fast_finish: true include: - rvm: 2.4.4 bundler_args: --without system_tests development release env: PUPPET_VERSION="~> 5.0" CHECK=test - rvm: 2.5.3 bundler_args: --without system_tests development release env: PUPPET_VERSION="~> 6.0" CHECK=test_with_coveralls - rvm: 2.5.3 bundler_args: --without system_tests development release env: PUPPET_VERSION="~> 6.0" CHECK=rubocop - rvm: 2.4.4 bundler_args: --without system_tests development release env: PUPPET_VERSION="~> 5.0" CHECK=build DEPLOY_TO_FORGE=yes - rvm: 2.5.3 bundler_args: --without development release env: BEAKER_PUPPET_COLLECTION=puppet5 BEAKER_setfile=debian8-64 CHECK=beaker services: docker - rvm: 2.5.3 bundler_args: --without development release env: BEAKER_PUPPET_COLLECTION=puppet6 BEAKER_setfile=debian8-64 CHECK=beaker services: docker - rvm: 2.5.3 bundler_args: --without development release env: BEAKER_PUPPET_COLLECTION=puppet5 BEAKER_setfile=debian9-64 CHECK=beaker services: docker - rvm: 2.5.3 bundler_args: --without development release env: BEAKER_PUPPET_COLLECTION=puppet6 BEAKER_setfile=debian9-64 CHECK=beaker services: docker - rvm: 2.5.3 bundler_args: --without development release env: BEAKER_PUPPET_COLLECTION=puppet5 BEAKER_setfile=debian10-64 CHECK=beaker services: docker - rvm: 2.5.3 bundler_args: --without development release env: BEAKER_PUPPET_COLLECTION=puppet6 BEAKER_setfile=debian10-64 CHECK=beaker services: docker - rvm: 2.5.3 bundler_args: --without development release env: BEAKER_PUPPET_COLLECTION=puppet5 BEAKER_setfile=ubuntu1604-64 CHECK=beaker services: docker - rvm: 2.5.3 bundler_args: --without development release env: BEAKER_PUPPET_COLLECTION=puppet6 BEAKER_setfile=ubuntu1604-64 CHECK=beaker services: docker - rvm: 2.5.3 bundler_args: --without development release env: BEAKER_PUPPET_COLLECTION=puppet5 BEAKER_setfile=ubuntu1804-64 CHECK=beaker services: docker - rvm: 2.5.3 bundler_args: --without development release env: BEAKER_PUPPET_COLLECTION=puppet6 BEAKER_setfile=ubuntu1804-64 CHECK=beaker services: docker - rvm: 2.5.3 bundler_args: --without development release env: BEAKER_PUPPET_COLLECTION=puppet5 BEAKER_setfile=centos6-64 CHECK=beaker services: docker - rvm: 2.5.3 bundler_args: --without development release env: BEAKER_PUPPET_COLLECTION=puppet6 BEAKER_setfile=centos6-64 CHECK=beaker services: docker - rvm: 2.5.3 bundler_args: --without development release env: 
BEAKER_PUPPET_COLLECTION=puppet5 BEAKER_setfile=centos7-64 CHECK=beaker services: docker - rvm: 2.5.3 bundler_args: --without development release env: BEAKER_PUPPET_COLLECTION=puppet6 BEAKER_setfile=centos7-64 CHECK=beaker services: docker branches: only: - master - /^v\d/ notifications: email: false webhooks: https://voxpupu.li/incoming/travis irc: on_success: always on_failure: always channels: - "chat.freenode.org#voxpupuli-notifications" deploy: provider: puppetforge - user: puppet + username: puppet password: secure: "j/Db/NnuJUwyFWGVwZEciC/0Xrhaes647UK49ZnlZTjUppUeTsqY/rKE8Pc4jpiW8DsfeGijCYP1O02tquH+KSKSwiwxIBjbToFjhNNJ6Qgh0DGIR29VZkiyirh5ZkK1yLMx9Ciyn8opwOXHqTRMk6JwAY05Gux1sD2T7Eu2c4w=" on: tags: true # all_branches is required to use tags all_branches: true # Only publish the build marked with "DEPLOY_TO_FORGE" condition: "$DEPLOY_TO_FORGE = yes" diff --git a/CODEOWNERS b/CODEOWNERS new file mode 100644 index 0000000..c4bafc7 --- /dev/null +++ b/CODEOWNERS @@ -0,0 +1 @@ +.github/settings.yml @voxpupuli/project-maintainers diff --git a/Gemfile b/Gemfile index 32c3114..87db4f9 100644 --- a/Gemfile +++ b/Gemfile @@ -1,48 +1,48 @@ source ENV['GEM_SOURCE'] || "https://rubygems.org" def location_for(place, fake_version = nil) if place =~ /^(git[:@][^#]*)#(.*)/ [fake_version, { :git => $1, :branch => $2, :require => false }].compact elsif place =~ /^file:\/\/(.*)/ ['>= 0', { :path => File.expand_path($1), :require => false }] else [place, { :require => false }] end end group :test do - gem 'voxpupuli-test', '>= 1.0.0', :require => false + gem 'voxpupuli-test', '>= 1.4.0', :require => false gem 'coveralls', :require => false gem 'simplecov-console', :require => false end group :development do gem 'travis', :require => false gem 'travis-lint', :require => false gem 'guard-rake', :require => false gem 'overcommit', '>= 0.39.1', :require => false end group :system_tests do gem 'voxpupuli-acceptance', :require => false end group :release do gem 'github_changelog_generator', :require => false, :git => 'https://github.com/voxpupuli/github-changelog-generator', :branch => 'voxpupuli_essential_fixes' gem 'puppet-blacksmith', :require => false gem 'voxpupuli-release', :require => false gem 'puppet-strings', '>= 2.2', :require => false end if facterversion = ENV['FACTER_GEM_VERSION'] gem 'facter', facterversion.to_s, :require => false, :groups => [:test] else gem 'facter', :require => false, :groups => [:test] end ENV['PUPPET_VERSION'].nil? ? puppetversion = '~> 6.0' : puppetversion = ENV['PUPPET_VERSION'].to_s gem 'puppet', puppetversion, :require => false, :groups => [:test] # vim: syntax=ruby diff --git a/manifests/broker.pp b/manifests/broker.pp index fae44a6..b52347a 100644 --- a/manifests/broker.pp +++ b/manifests/broker.pp @@ -1,155 +1,155 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::broker # # This class will install kafka with the broker role. # # === Requirements/Dependencies # # Currently requires the puppetlabs/stdlib module on the Puppet Forge in # order to validate much of the the provided configuration. # # === Parameters # -# [*version*] +# [*kafka_version*] # The version of kafka that should be installed. # # [*scala_version*] # The scala version what kafka was built with. # # [*install_dir*] # The directory to install kafka to. # # [*mirror_url*] # The url where the kafka is downloaded from. # -# [*install_java*] +# [*manage_java*] # Install java if it's not already installed. 
# # [*package_dir*] # The directory to install kafka. # # [*package_name*] # Package name, when installing kafka from a package. # # [*package_ensure*] # Package version (or 'present', 'absent', 'latest'), when installing kafka from a package. # -# [*user*] +# [*user_name*] # User to run kafka as. # -# [*group*] +# [*group_name*] # Group to run kafka as. # # [*user_id*] # Create the kafka user with this ID. # # [*group_id*] # Create the kafka group with this ID. # # [*manage_user*] # Create the kafka user if it's not already present. # # [*manage_group*] # Create the kafka group if it's not already present. # # [*config_mode*] # The permissions for the config files. # # [*config_dir*] # The directory to create the kafka config files to. # # [*log_dir*] # The directory for kafka log files. # # [*bin_dir*] # The directory where the kafka scripts are. # # [*service_name*] # Set the name of the service. # -# [*service_install*] +# [*manage_service*] # Install the init.d or systemd service. # # [*service_ensure*] # Set the ensure state of the service to 'stopped' or 'running'. # # [*service_restart*] # Whether the configuration files should trigger a service restart. # # [*service_requires*] # Set the list of services required to be running before Kafka. # # [*limit_nofile*] # Set the 'LimitNOFILE' option of the systemd service. # # [*limit_core*] # Set the 'LimitCORE' option of the systemd service. # # [*timeout_stop*] # Set the 'TimeoutStopSec' option of the systemd service. # # [*exec_stop*] # Set the 'ExecStop' option of the systemd service to 'kafka-server-stop.sh'. # # [*daemon_start*] # Use the '-daemon' option when starting Kafka with 'kafka-server-start.sh'. # # [*env*] # A hash of the environment variables to set. # # [*config*] # A hash of the configuration options. # # === Examples # # Create a single broker instance which talks to a local zookeeper instance. 
# # class { 'kafka::broker': # config => { 'broker.id' => '0', 'zookeeper.connect' => 'localhost:2181' } # } # class kafka::broker ( - String $version = $kafka::params::version, - String $scala_version = $kafka::params::scala_version, + String[1] $kafka_version = $kafka::params::kafka_version, + String[1] $scala_version = $kafka::params::scala_version, Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, - Boolean $install_java = $kafka::params::install_java, + Boolean $manage_java = $kafka::params::manage_java, Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, - Optional[String] $package_name = $kafka::params::package_name, - String $package_ensure = $kafka::params::package_ensure, - String $user = $kafka::params::user, - String $group = $kafka::params::group, + Optional[String[1]] $package_name = $kafka::params::package_name, + String[1] $package_ensure = $kafka::params::package_ensure, + String[1] $user_name = $kafka::params::user_name, + String[1] $group_name = $kafka::params::group_name, Optional[Integer] $user_id = $kafka::params::user_id, Optional[Integer] $group_id = $kafka::params::group_id, Boolean $manage_user = $kafka::params::manage_user, Boolean $manage_group = $kafka::params::manage_group, Stdlib::Filemode $config_mode = $kafka::params::config_mode, Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::params::bin_dir, - String $service_name = 'kafka', - Boolean $service_install = $kafka::params::service_install, + String[1] $service_name = 'kafka', + Boolean $manage_service = $kafka::params::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::params::service_ensure, Boolean $service_restart = $kafka::params::service_restart, - Array[String] $service_requires = $kafka::params::service_requires, - Optional[String] $limit_nofile = $kafka::params::limit_nofile, - Optional[String] $limit_core = $kafka::params::limit_core, - Optional[String] $timeout_stop = $kafka::params::timeout_stop, + Array[String[1]] $service_requires = $kafka::params::service_requires, + Optional[String[1]] $limit_nofile = $kafka::params::limit_nofile, + Optional[String[1]] $limit_core = $kafka::params::limit_core, + Optional[String[1]] $timeout_stop = $kafka::params::timeout_stop, Boolean $exec_stop = $kafka::params::exec_stop, Boolean $daemon_start = $kafka::params::daemon_start, Hash $env = {}, - Hash $config = {}, - String $heap_opts = $kafka::params::broker_heap_opts, - String $jmx_opts = $kafka::params::broker_jmx_opts, - String $log4j_opts = $kafka::params::broker_log4j_opts, - $opts = $kafka::params::broker_opts, + Hash[String[1], Any] $config = {}, + String[1] $heap_opts = $kafka::params::broker_heap_opts, + String[1] $jmx_opts = $kafka::params::broker_jmx_opts, + String[1] $log4j_opts = $kafka::params::broker_log4j_opts, + String[0] $opts = $kafka::params::broker_opts, ) inherits kafka::params { class { 'kafka::broker::install': } -> class { 'kafka::broker::config': } -> class { 'kafka::broker::service': } -> Class['kafka::broker'] } diff --git a/manifests/broker/config.pp b/manifests/broker/config.pp index 9b7fed9..1602351 100644 --- a/manifests/broker/config.pp +++ b/manifests/broker/config.pp @@ -1,35 +1,35 @@ # @summary # This class handles the Kafka (broker) config. 
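The kafka::broker class above renames several parameters: version becomes kafka_version, install_java becomes manage_java, user and group become user_name and group_name, and service_install becomes manage_service. A minimal sketch of a declaration using the new names; all values and the config hash keys are illustrative, not prescriptions:

  class { 'kafka::broker':
    kafka_version  => '2.4.1',
    scala_version  => '2.12',
    manage_java    => false,
    manage_service => true,
    user_name      => 'kafka',
    group_name     => 'kafka',
    heap_opts      => '-Xmx1G -Xms1G',
    config         => {
      'broker.id'         => '0',
      'zookeeper.connect' => 'localhost:2181',
      'log.dirs'          => '/var/lib/kafka',  # illustrative broker option
    },
  }
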
# # @api private # class kafka::broker::config( - Stdlib::Absolutepath $config_dir = $kafka::broker::config_dir, - String $service_name = $kafka::broker::service_name, - Boolean $service_install = $kafka::broker::service_install, + Boolean $manage_service = $kafka::broker::manage_service, + String[1] $service_name = $kafka::broker::service_name, Boolean $service_restart = $kafka::broker::service_restart, - Hash $config = $kafka::broker::config, + Hash[String[1], Any] $config = $kafka::broker::config, + Stdlib::Absolutepath $config_dir = $kafka::broker::config_dir, + String[1] $user_name = $kafka::broker::user_name, + String[1] $group_name = $kafka::broker::group_name, Stdlib::Filemode $config_mode = $kafka::broker::config_mode, - String $user = $kafka::broker::user, - String $group = $kafka::broker::group, ) { assert_private() - if ($service_install and $service_restart) { + if ($manage_service and $service_restart) { $config_notify = Service[$service_name] } else { $config_notify = undef } $doctag = 'brokerconfigs' file { "${config_dir}/server.properties": ensure => present, - owner => $user, - group => $group, + owner => $user_name, + group => $group_name, mode => $config_mode, content => template('kafka/properties.erb'), notify => $config_notify, require => File[$config_dir], } } diff --git a/manifests/broker/install.pp b/manifests/broker/install.pp index 498b140..b09f511 100644 --- a/manifests/broker/install.pp +++ b/manifests/broker/install.pp @@ -1,30 +1,30 @@ # @summary # This class handles the Kafka (broker) package. # # @api private # class kafka::broker::install { assert_private() if !defined(Class['kafka']) { class { 'kafka': - version => $kafka::broker::version, - scala_version => $kafka::broker::scala_version, - install_dir => $kafka::broker::install_dir, - mirror_url => $kafka::broker::mirror_url, - install_java => $kafka::broker::install_java, - package_dir => $kafka::broker::package_dir, - package_name => $kafka::broker::package_name, - package_ensure => $kafka::broker::package_ensure, - user => $kafka::broker::user, - group => $kafka::broker::group, - user_id => $kafka::broker::user_id, + manage_java => $kafka::broker::manage_java, + manage_group => $kafka::broker::manage_group, group_id => $kafka::broker::group_id, + group_name => $kafka::broker::group_name, manage_user => $kafka::broker::manage_user, - manage_group => $kafka::broker::manage_group, + user_id => $kafka::broker::user_id, + user_name => $kafka::broker::user_name, config_dir => $kafka::broker::config_dir, log_dir => $kafka::broker::log_dir, + mirror_url => $kafka::broker::mirror_url, + kafka_version => $kafka::broker::kafka_version, + scala_version => $kafka::broker::scala_version, + install_dir => $kafka::broker::install_dir, + package_dir => $kafka::broker::package_dir, + package_ensure => $kafka::broker::package_ensure, + package_name => $kafka::broker::package_name, } } } diff --git a/manifests/broker/service.pp b/manifests/broker/service.pp index 3af8bed..a2f66fa 100644 --- a/manifests/broker/service.pp +++ b/manifests/broker/service.pp @@ -1,72 +1,72 @@ # @summary # This class handles the Kafka (broker) service. 
# # @api private # class kafka::broker::service( - String $user = $kafka::broker::user, - String $group = $kafka::broker::group, + Boolean $manage_service = $kafka::broker::manage_service, + Enum['running', 'stopped'] $service_ensure = $kafka::broker::service_ensure, + String[1] $service_name = $kafka::broker::service_name, + String[1] $user_name = $kafka::broker::user_name, + String[1] $group_name = $kafka::broker::group_name, Stdlib::Absolutepath $config_dir = $kafka::broker::config_dir, Stdlib::Absolutepath $log_dir = $kafka::broker::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::broker::bin_dir, - String $service_name = $kafka::broker::service_name, - Boolean $service_install = $kafka::broker::service_install, - Enum['running', 'stopped'] $service_ensure = $kafka::broker::service_ensure, - Array[String] $service_requires = $kafka::broker::service_requires, - Optional[String] $limit_nofile = $kafka::broker::limit_nofile, - Optional[String] $limit_core = $kafka::broker::limit_core, - Optional[String] $timeout_stop = $kafka::broker::timeout_stop, + Array[String[1]] $service_requires = $kafka::broker::service_requires, + Optional[String[1]] $limit_nofile = $kafka::broker::limit_nofile, + Optional[String[1]] $limit_core = $kafka::broker::limit_core, + Optional[String[1]] $timeout_stop = $kafka::broker::timeout_stop, Boolean $exec_stop = $kafka::broker::exec_stop, Boolean $daemon_start = $kafka::broker::daemon_start, Hash $env = $kafka::broker::env, - String $heap_opts = $kafka::broker::heap_opts, - String $jmx_opts = $kafka::broker::jmx_opts, - String $log4j_opts = $kafka::broker::log4j_opts, - $opts = $kafka::broker::opts, + String[1] $heap_opts = $kafka::broker::heap_opts, + String[1] $jmx_opts = $kafka::broker::jmx_opts, + String[1] $log4j_opts = $kafka::broker::log4j_opts, + String[0] $opts = $kafka::broker::opts, ) { assert_private() - if $service_install { + if $manage_service { $env_defaults = { 'KAFKA_HEAP_OPTS' => $heap_opts, 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, 'KAFKA_OPTS' => $opts, 'LOG_DIR' => $log_dir, } $environment = deep_merge($env_defaults, $env) - if $::service_provider == 'systemd' { + if $facts['service_provider'] == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] ~> Exec['systemctl-daemon-reload'] -> Service[$service_name] } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/consumer.pp b/manifests/consumer.pp index eb2c07e..6c6bd94 100644 --- a/manifests/consumer.pp +++ b/manifests/consumer.pp @@ -1,144 +1,144 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::consumer # # This class will install kafka with the consumer role. # # === Requirements/Dependencies # # Currently requires the puppetlabs/stdlib module on the Puppet Forge in # order to validate much of the the provided configuration. # # === Parameters # -# [*version*] +# [*kafka_version*] # The version of kafka that should be installed. # # [*scala_version*] # The scala version what kafka was built with. 
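kafka::broker::service above builds the service environment with deep_merge($env_defaults, $env), so keys supplied in env override the KAFKA_* defaults and new keys are added alongside them. A hedged sketch of overriding the heap setting and adding one extra variable; the variable names and values here are examples only:

  class { 'kafka::broker':
    config => { 'broker.id' => '0', 'zookeeper.connect' => 'localhost:2181' },
    env    => {
      # overrides the KAFKA_HEAP_OPTS default derived from $heap_opts
      'KAFKA_HEAP_OPTS'            => '-Xmx2G -Xms2G',
      # added on top of the defaults; variable and value are illustrative
      'KAFKA_JVM_PERFORMANCE_OPTS' => '-XX:+UseG1GC',
    },
  }
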
# # [*install_dir*] # The directory to install kafka to. # # [*mirror_url*] # The url where the kafka is downloaded from. # -# [*install_java*] +# [*manage_java*] # Install java if it's not already installed. # # [*package_dir*] # The directory to install kafka. # # [*package_name*] # Package name, when installing kafka from a package. # # [*package_ensure*] # Package version (or 'present', 'absent', 'latest'), when installing kafka from a package. # -# [*user*] +# [*user_name*] # User to run kafka as. # -# [*group*] +# [*group_name*] # Group to run kafka as. # # [*user_id*] # Create the kafka user with this ID. # # [*group_id*] # Create the kafka group with this ID. # # [*manage_user*] # Create the kafka user if it's not already present. # # [*manage_group*] # Create the kafka group if it's not already present. # # [*config_mode*] # The permissions for the config files. # # [*config_dir*] # The directory to create the kafka config files to. # # [*log_dir*] # The directory for kafka log files. # # [*bin_dir*] # The directory where the kafka scripts are. # # [*service_name*] # Set the name of the service. # -# [*service_install*] +# [*manage_service*] # Install the init.d or systemd service. # # [*service_ensure*] # Set the ensure state of the service to 'stopped' or 'running'. # # [*service_restart*] # Whether the configuration files should trigger a service restart. # # [*service_requires*] # Set the list of services required to be running before Kafka. # # [*limit_nofile*] # Set the 'LimitNOFILE' option of the systemd service. # # [*limit_core*] # Set the 'LimitCORE' option of the systemd service. # # [*env*] # A hash of the environment variables to set. # # [*config*] # A hash of the consumer configuration options. # # [*service_config*] # A hash of the `kafka-console-consumer.sh` script options. 
# # === Examples # # Create the consumer service connecting to a local zookeeper # # class { 'kafka::consumer': # config => { 'client.id' => '0', 'zookeeper.connect' => 'localhost:2181' } # } class kafka::consumer ( - String $version = $kafka::params::version, - String $scala_version = $kafka::params::scala_version, + String[1] $kafka_version = $kafka::params::kafka_version, + String[1] $scala_version = $kafka::params::scala_version, Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, - Boolean $install_java = $kafka::params::install_java, + Boolean $manage_java = $kafka::params::manage_java, Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, - Optional[String] $package_name = $kafka::params::package_name, - String $package_ensure = $kafka::params::package_ensure, - String $user = $kafka::params::user, - String $group = $kafka::params::group, + Optional[String[1]] $package_name = $kafka::params::package_name, + String[1] $package_ensure = $kafka::params::package_ensure, + String[1] $user_name = $kafka::params::user_name, + String[1] $group_name = $kafka::params::group_name, Optional[Integer] $user_id = $kafka::params::user_id, Optional[Integer] $group_id = $kafka::params::group_id, Boolean $manage_user = $kafka::params::manage_user, Boolean $manage_group = $kafka::params::manage_group, Stdlib::Filemode $config_mode = $kafka::params::config_mode, Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::params::bin_dir, - String $service_name = 'kafka-consumer', - Boolean $service_install = $kafka::params::service_install, + String[1] $service_name = 'kafka-consumer', + Boolean $manage_service = $kafka::params::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::params::service_ensure, Boolean $service_restart = $kafka::params::service_restart, - Array[String] $service_requires = $kafka::params::service_requires, - Optional[String] $limit_nofile = $kafka::params::limit_nofile, - Optional[String] $limit_core = $kafka::params::limit_core, + Array[String[1]] $service_requires = $kafka::params::service_requires, + Optional[String[1]] $limit_nofile = $kafka::params::limit_nofile, + Optional[String[1]] $limit_core = $kafka::params::limit_core, Hash $env = {}, - Hash $config = {}, - Hash $service_config = {}, - String $jmx_opts = $kafka::params::consumer_jmx_opts, - String $log4j_opts = $kafka::params::consumer_log4j_opts, + Hash[String[1], Any] $config = {}, + Hash[String[1],String[1]] $service_config = {}, + String[1] $jmx_opts = $kafka::params::consumer_jmx_opts, + String[1] $log4j_opts = $kafka::params::consumer_log4j_opts, ) inherits kafka::params { class { 'kafka::consumer::install': } -> class { 'kafka::consumer::config': } -> class { 'kafka::consumer::service': } -> Class['kafka::consumer'] } diff --git a/manifests/consumer/config.pp b/manifests/consumer/config.pp index eeb9e5f..6e92d9d 100644 --- a/manifests/consumer/config.pp +++ b/manifests/consumer/config.pp @@ -1,31 +1,31 @@ # @summary # This class handles the Kafka (consumer) config. 
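kafka::consumer carries the same parameter renames, and kafka::consumer::service (further below) rejects an empty 'topic' or 'zookeeper' entry in service_config when the service is managed. A sketch with placeholder names; config feeds consumer.properties while service_config feeds the kafka-console-consumer.sh options:

  class { 'kafka::consumer':
    manage_service => true,
    service_config => {
      'topic'     => 'web-logs',        # illustrative topic name
      'zookeeper' => 'localhost:2181',
    },
    config         => {
      'group.id'          => 'web-logs-consumers',  # illustrative
      'zookeeper.connect' => 'localhost:2181',
    },
  }
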
# class kafka::consumer::config( - Stdlib::Absolutepath $config_dir = $kafka::consumer::config_dir, - String $service_name = $kafka::consumer::service_name, - Boolean $service_install = $kafka::consumer::service_install, + Boolean $manage_service = $kafka::consumer::manage_service, + String[1] $service_name = $kafka::consumer::service_name, Boolean $service_restart = $kafka::consumer::service_restart, - Hash $config = $kafka::consumer::config, + Hash[String[1], Any] $config = $kafka::consumer::config, + Stdlib::Absolutepath $config_dir = $kafka::consumer::config_dir, + String[1] $user_name = $kafka::consumer::user_name, + String[1] $group_name = $kafka::consumer::group_name, Stdlib::Filemode $config_mode = $kafka::consumer::config_mode, - String $user = $kafka::consumer::user, - String $group = $kafka::consumer::group, ) { - if ($service_install and $service_restart) { + if ($manage_service and $service_restart) { $config_notify = Service[$service_name] } else { $config_notify = undef } $doctag = 'consumerconfigs' file { "${config_dir}/consumer.properties": ensure => present, - owner => $user, - group => $group, + owner => $user_name, + group => $group_name, mode => $config_mode, content => template('kafka/properties.erb'), notify => $config_notify, require => File[$config_dir], } } diff --git a/manifests/consumer/install.pp b/manifests/consumer/install.pp index e845fcb..ed9f290 100644 --- a/manifests/consumer/install.pp +++ b/manifests/consumer/install.pp @@ -1,30 +1,30 @@ # @summary # This class handles the Kafka (consumer) package. # # @api private # class kafka::consumer::install { assert_private() if !defined(Class['kafka']) { class { 'kafka': - version => $kafka::consumer::version, - scala_version => $kafka::consumer::scala_version, - install_dir => $kafka::consumer::install_dir, - mirror_url => $kafka::consumer::mirror_url, - install_java => $kafka::consumer::install_java, - package_dir => $kafka::consumer::package_dir, - package_name => $kafka::consumer::package_name, - package_ensure => $kafka::consumer::package_ensure, - user => $kafka::consumer::user, - group => $kafka::consumer::group, - user_id => $kafka::consumer::user_id, + manage_java => $kafka::consumer::manage_java, + manage_group => $kafka::consumer::manage_group, group_id => $kafka::consumer::group_id, + group_name => $kafka::consumer::group_name, manage_user => $kafka::consumer::manage_user, - manage_group => $kafka::consumer::manage_group, + user_id => $kafka::consumer::user_id, + user_name => $kafka::consumer::user_name, config_dir => $kafka::consumer::config_dir, log_dir => $kafka::consumer::log_dir, + mirror_url => $kafka::consumer::mirror_url, + kafka_version => $kafka::consumer::kafka_version, + scala_version => $kafka::consumer::scala_version, + install_dir => $kafka::consumer::install_dir, + package_dir => $kafka::consumer::package_dir, + package_ensure => $kafka::consumer::package_ensure, + package_name => $kafka::consumer::package_name, } } } diff --git a/manifests/consumer/service.pp b/manifests/consumer/service.pp index 7764856..287ae32 100644 --- a/manifests/consumer/service.pp +++ b/manifests/consumer/service.pp @@ -1,72 +1,73 @@ # @summary # This class handles the Kafka (consumer) service. 
# # @api private # class kafka::consumer::service( - String $user = $kafka::consumer::user, - String $group = $kafka::consumer::group, + Boolean $manage_service = $kafka::consumer::manage_service, + Enum['running', 'stopped'] $service_ensure = $kafka::consumer::service_ensure, + String[1] $service_name = $kafka::consumer::service_name, + String[1] $user_name = $kafka::consumer::user_name, + String[1] $group_name = $kafka::consumer::group_name, Stdlib::Absolutepath $config_dir = $kafka::consumer::config_dir, Stdlib::Absolutepath $log_dir = $kafka::consumer::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::consumer::bin_dir, - String $service_name = $kafka::consumer::service_name, - Boolean $service_install = $kafka::consumer::service_install, - Enum['running', 'stopped'] $service_ensure = $kafka::consumer::service_ensure, - Array[String] $service_requires = $kafka::consumer::service_requires, - Optional[String] $limit_nofile = $kafka::consumer::limit_nofile, - Optional[String] $limit_core = $kafka::consumer::limit_core, + Array[String[1]] $service_requires = $kafka::consumer::service_requires, + Optional[String[1]] $limit_nofile = $kafka::consumer::limit_nofile, + Optional[String[1]] $limit_core = $kafka::consumer::limit_core, Hash $env = $kafka::consumer::env, - String $jmx_opts = $kafka::consumer::jmx_opts, - String $log4j_opts = $kafka::consumer::log4j_opts, - Hash $service_config = $kafka::consumer::service_config, + String[1] $jmx_opts = $kafka::consumer::jmx_opts, + String[1] $log4j_opts = $kafka::consumer::log4j_opts, + Hash[String[1],String[1]] $service_config = $kafka::consumer::service_config, ) { assert_private() - if $service_install { + if $manage_service { + if $service_config['topic'] == '' { fail('[Consumer] You need to specify a value for topic') } if $service_config['zookeeper'] == '' { fail('[Consumer] You need to specify a value for zookeeper') } $env_defaults = { 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, } $environment = deep_merge($env_defaults, $env) - if $::service_provider == 'systemd' { + if $facts['service_provider'] == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] ~> Exec['systemctl-daemon-reload'] -> Service[$service_name] } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/init.pp b/manifests/init.pp index fe44a1c..a9e1fa8 100644 --- a/manifests/init.pp +++ b/manifests/init.pp @@ -1,230 +1,230 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka # # This class will install kafka binaries # # === Requirements/Dependencies # # Currently requires the puppetlabs/stdlib module on the Puppet Forge in # order to validate much of the the provided configuration. # # === Parameters # -# [*version*] +# [*kafka_version*] # The version of kafka that should be installed. # # [*scala_version*] # The scala version what kafka was built with. # # [*install_dir*] # The directory to install kafka to. # # [*mirror_url*] # The url where the kafka is downloaded from. 
# -# [*install_java*] +# [*manage_java*] # Install java if it's not already installed. # # [*package_dir*] # The directory to install kafka. # # [*package_name*] # Package name, when installing kafka from a package. # # [*package_ensure*] # Package version (or 'present', 'absent', 'latest'), when installing kafka from a package. # -# [*user*] +# [*user_name*] # User to run kafka as. # -# [*group*] +# [*group_name*] # Group to run kafka as. # # [*user_id*] # Create the kafka user with this ID. # # [*system_user*] # Whether the kafka user is a system user or not. # # [*group_id*] # Create the kafka group with this ID. # # [*system_group*] # Whether the kafka group is a system group or not. # # [*manage_user*] # Create the kafka user if it's not already present. # # [*manage_group*] # Create the kafka group if it's not already present. # # [*config_dir*] # The directory to create the kafka config files to. # # [*log_dir*] # The directory for kafka log files. # # === Examples # # class kafka ( - String $version = $kafka::params::version, - String $scala_version = $kafka::params::scala_version, - Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, - Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, - Boolean $install_java = $kafka::params::install_java, - Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, - Optional[String] $package_name = $kafka::params::package_name, - Optional[String] $mirror_subpath = $kafka::params::mirror_subpath, - Optional[String] $proxy_server = $kafka::params::proxy_server, - Optional[String] $proxy_port = $kafka::params::proxy_port, - Optional[String] $proxy_host = $kafka::params::proxy_host, - Optional[String] $proxy_type = $kafka::params::proxy_type, - String $package_ensure = $kafka::params::package_ensure, - String $user = $kafka::params::user, - String $group = $kafka::params::group, - Boolean $system_user = $kafka::params::system_user, - Boolean $system_group = $kafka::params::system_group, - Optional[Integer] $user_id = $kafka::params::user_id, - Optional[Integer] $group_id = $kafka::params::group_id, - Boolean $manage_user = $kafka::params::manage_user, - Boolean $manage_group = $kafka::params::manage_group, - Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, - Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, - Optional[String] $install_mode = $kafka::params::install_mode, + String[1] $kafka_version = $kafka::params::kafka_version, + String[1] $scala_version = $kafka::params::scala_version, + Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, + Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, + Boolean $manage_java = $kafka::params::manage_java, + Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, + Optional[String[1]] $package_name = $kafka::params::package_name, + Optional[String[1]] $mirror_subpath = $kafka::params::mirror_subpath, + Optional[String[1]] $proxy_server = $kafka::params::proxy_server, + Optional[String[1]] $proxy_port = $kafka::params::proxy_port, + Optional[String[1]] $proxy_host = $kafka::params::proxy_host, + Optional[String[1]] $proxy_type = $kafka::params::proxy_type, + String[1] $package_ensure = $kafka::params::package_ensure, + String[1] $user_name = $kafka::params::user_name, + String[1] $group_name = $kafka::params::group_name, + Boolean $system_user = $kafka::params::system_user, + Boolean $system_group = $kafka::params::system_group, + Optional[Integer] $user_id = $kafka::params::user_id, + Optional[Integer] $group_id = 
$kafka::params::group_id, + Boolean $manage_user = $kafka::params::manage_user, + Boolean $manage_group = $kafka::params::manage_group, + Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, + Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, + Optional[String[1]] $install_mode = $kafka::params::install_mode, ) inherits kafka::params { - if $install_java { + if $manage_java { class { 'java': distribution => 'jdk', } } if $manage_group { - group { $group: + group { $group_name: ensure => present, gid => $group_id, system => $system_group, } } if $manage_user { - user { $user: + user { $user_name: ensure => present, shell => '/bin/bash', - require => Group[$group], + require => Group[$group_name], uid => $user_id, system => $system_user, } } file { $config_dir: ensure => directory, - owner => $user, - group => $group, + owner => $user_name, + group => $group_name, } file { $log_dir: ensure => directory, - owner => $user, - group => $group, + owner => $user_name, + group => $group_name, require => [ - Group[$group], - User[$user], + Group[$group_name], + User[$user_name], ], } if $package_name == undef { include archive $mirror_path = $mirror_subpath ? { # if mirror_subpath was not changed, # we adapt it for the version - $kafka::params::mirror_subpath => "kafka/${version}", + $kafka::params::mirror_subpath => "kafka/${kafka_version}", # else, we just take whatever was supplied: default => $mirror_subpath, } - $basefilename = "kafka_${scala_version}-${version}.tgz" + $basefilename = "kafka_${scala_version}-${kafka_version}.tgz" $package_url = "${mirror_url}${mirror_path}/${basefilename}" $source = $mirror_url ?{ /tgz$/ => $mirror_url, default => $package_url, } $install_directory = $install_dir ? { # if install_dir was not changed, # we adapt it for the scala_version and the version - $kafka::params::install_dir => "/opt/kafka-${scala_version}-${version}", + $kafka::params::install_dir => "/opt/kafka-${scala_version}-${kafka_version}", # else, we just take whatever was supplied: default => $install_dir, } file { $package_dir: ensure => directory, - owner => $user, - group => $group, + owner => $user_name, + group => $group_name, require => [ - Group[$group], - User[$user], + Group[$group_name], + User[$user_name], ], } file { $install_directory: ensure => directory, - owner => $user, - group => $group, + owner => $user_name, + group => $group_name, mode => $install_mode, require => [ - Group[$group], - User[$user], + Group[$group_name], + User[$user_name], ], } file { '/opt/kafka': ensure => link, target => $install_directory, require => File[$install_directory], } if $proxy_server == undef and $proxy_host != undef and $proxy_port != undef { $final_proxy_server = "${proxy_host}:${proxy_port}" } else { $final_proxy_server = $proxy_server } archive { "${package_dir}/${basefilename}": ensure => present, extract => true, extract_command => 'tar xfz %s --strip-components=1', extract_path => $install_directory, source => $source, creates => "${install_directory}/config", cleanup => true, proxy_server => $final_proxy_server, proxy_type => $proxy_type, - user => $user, - group => $group, + user => $user_name, + group => $group_name, require => [ File[$package_dir], File[$install_directory], - Group[$group], - User[$user], + Group[$group_name], + User[$user_name], ], before => File[$config_dir], } } else { package { $package_name: ensure => $package_ensure, before => File[$config_dir], } } } diff --git a/manifests/mirror.pp b/manifests/mirror.pp index 7bc291d..939b56e 100644 --- 
a/manifests/mirror.pp +++ b/manifests/mirror.pp @@ -1,147 +1,147 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::mirror # # This class will install kafka with the mirror role. # # === Requirements/Dependencies # # Currently requires the puppetlabs/stdlib module on the Puppet Forge in # order to validate much of the the provided configuration. # # === Parameters # -# [*version*] +# [*kafka_version*] # The version of kafka that should be installed. # # [*scala_version*] # The scala version what kafka was built with. # # [*install_dir*] # The directory to install kafka to. # # [*mirror_url*] # The url where the kafka is downloaded from. # -# [*install_java*] +# [*manage_java*] # Install java if it's not already installed. # # [*package_dir*] # The directory to install kafka. # # [*package_name*] # Package name, when installing kafka from a package. # # [*package_ensure*] # Package version (or 'present', 'absent', 'latest'), when installing kafka from a package. # -# [*user*] +# [*user_name*] # User to run kafka as. # -# [*group*] +# [*group_name*] # Group to run kafka as. # # [*user_id*] # Create the kafka user with this ID. # # [*group_id*] # Create the kafka group with this ID. # # [*manage_user*] # Create the kafka user if it's not already present. # # [*manage_group*] # Create the kafka group if it's not already present. # # [*config_dir*] # The directory to create the kafka config files to. # # [*log_dir*] # The directory for kafka log files. # # [*bin_dir*] # The directory where the kafka scripts are. # # [*service_name*] # Set the name of the service. # -# [*service_install*] +# [*manage_service*] # Install the init.d or systemd service. # # [*service_ensure*] # Set the ensure state of the service to 'stopped' or 'running'. # # [*service_restart*] # Whether the configuration files should trigger a service restart. # # [*service_requires*] # Set the list of services required to be running before Kafka. # # [*limit_nofile*] # Set the 'LimitNOFILE' option of the systemd service. # # [*limit_core*] # Set the 'LimitCORE' option of the systemd service. # # [*env*] # A hash of the environment variables to set. # # [*consumer_config*] # A hash of the consumer configuration options. # # [*producer_config*] # A hash of the producer configuration options. # # [*service_config*] # A hash of the mirror script options. 
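The base kafka class (manifests/init.pp above) only downloads and extracts the archive when package_name is undef; when a package name is given, the archive, the versioned install directory and the /opt/kafka symlink are skipped and the named package is installed instead. A sketch of the package path, assuming a kafka package exists in the configured repositories (name and version are placeholders):

  class { 'kafka':
    package_name   => 'kafka',      # assumption: package available in your repos
    package_ensure => '2.4.1-1',    # illustrative package version
    user_name      => 'kafka',
    group_name     => 'kafka',
    manage_java    => false,
  }
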
# # === Examples # # Create the mirror service connecting to a local zookeeper # # class { 'kafka::mirror': # consumer_config => { 'client.id' => '0', 'zookeeper.connect' => 'localhost:2181' } # } # class kafka::mirror ( - String $version = $kafka::params::version, - String $scala_version = $kafka::params::scala_version, + String[1] $kafka_version = $kafka::params::kafka_version, + String[1] $scala_version = $kafka::params::scala_version, Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, - Boolean $install_java = $kafka::params::install_java, + Boolean $manage_java = $kafka::params::manage_java, Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, - Optional[String] $package_name = $kafka::params::package_name, - String $package_ensure = $kafka::params::package_ensure, - String $user = $kafka::params::user, - String $group = $kafka::params::group, + Optional[String[1]] $package_name = $kafka::params::package_name, + String[1] $package_ensure = $kafka::params::package_ensure, + String[1] $user_name = $kafka::params::user_name, + String[1] $group_name = $kafka::params::group_name, Optional[Integer] $user_id = $kafka::params::user_id, Optional[Integer] $group_id = $kafka::params::group_id, Boolean $manage_user = $kafka::params::manage_user, Boolean $manage_group = $kafka::params::manage_group, Stdlib::Filemode $config_mode = $kafka::params::config_mode, Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::params::bin_dir, - String $service_name = 'kafka-mirror', - Boolean $service_install = $kafka::params::service_install, + String[1] $service_name = 'kafka-mirror', + Boolean $manage_service = $kafka::params::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::params::service_ensure, Boolean $service_restart = $kafka::params::service_restart, - Array[String] $service_requires = $kafka::params::service_requires, - Optional[String] $limit_nofile = $kafka::params::limit_nofile, - Optional[String] $limit_core = $kafka::params::limit_core, + Array[String[1]] $service_requires = $kafka::params::service_requires, + Optional[String[1]] $limit_nofile = $kafka::params::limit_nofile, + Optional[String[1]] $limit_core = $kafka::params::limit_core, Hash $env = {}, - Hash $consumer_config = {}, - Hash $producer_config = {}, - Hash $service_config = {}, - String $heap_opts = $kafka::params::mirror_heap_opts, - String $jmx_opts = $kafka::params::mirror_jmx_opts, - String $log4j_opts = $kafka::params::mirror_log4j_opts, + Hash[String[1],String[1]] $consumer_config = {}, + Hash[String[1],String[1]] $producer_config = {}, + Hash[String[1],String[1]] $service_config = {}, + String[1] $heap_opts = $kafka::params::mirror_heap_opts, + String[1] $jmx_opts = $kafka::params::mirror_jmx_opts, + String[1] $log4j_opts = $kafka::params::mirror_log4j_opts, ) inherits kafka::params { class { 'kafka::mirror::install': } -> class { 'kafka::mirror::config': } -> class { 'kafka::mirror::service': } -> Class['kafka::mirror'] } diff --git a/manifests/mirror/config.pp b/manifests/mirror/config.pp index b909bfc..38a667a 100644 --- a/manifests/mirror/config.pp +++ b/manifests/mirror/config.pp @@ -1,51 +1,51 @@ # @summary # This class handles the Kafka (mirror) config. 
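kafka::mirror::config (next file) rejects an empty 'group.id' or 'zookeeper.connect' in consumer_config and an empty 'bootstrap.servers' in producer_config, so a MirrorMaker declaration under the new parameter names could look roughly like the following; every endpoint and the whitelist pattern are placeholders:

  class { 'kafka::mirror':
    manage_service  => true,
    consumer_config => {
      'group.id'          => 'mirror-maker',                  # illustrative
      'zookeeper.connect' => 'source-zk.example.com:2181',    # placeholder endpoint
    },
    producer_config => {
      'bootstrap.servers' => 'target-kafka.example.com:9092', # placeholder endpoint
    },
    service_config  => {
      'whitelist' => '.*',   # assumption: mirror every topic
    },
  }
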
# # @api private # class kafka::mirror::config( - Stdlib::Absolutepath $config_dir = $kafka::mirror::config_dir, - String $service_name = $kafka::mirror::service_name, - Boolean $service_install = $kafka::mirror::service_install, - Boolean $service_restart = $kafka::mirror::service_restart, - Hash $consumer_config = $kafka::mirror::consumer_config, - Hash $producer_config = $kafka::mirror::producer_config, - Stdlib::Filemode $config_mode = $kafka::mirror::config_mode, - String $user = $kafka::mirror::user, - String $group = $kafka::mirror::group, + Boolean $manage_service = $kafka::mirror::manage_service, + String[1] $service_name = $kafka::mirror::service_name, + Boolean $service_restart = $kafka::mirror::service_restart, + Hash[String[1],String[1]] $consumer_config = $kafka::mirror::consumer_config, + Hash[String[1],String[1]] $producer_config = $kafka::mirror::producer_config, + Stdlib::Absolutepath $config_dir = $kafka::mirror::config_dir, + String[1] $user_name = $kafka::mirror::user_name, + String[1] $group_name = $kafka::mirror::group_name, + Stdlib::Filemode $config_mode = $kafka::mirror::config_mode, ) { assert_private() if $consumer_config['group.id'] == '' { fail('[Consumer] You need to specify a value for group.id') } if $consumer_config['zookeeper.connect'] == '' { fail('[Consumer] You need to specify a value for zookeeper.connect') } if $producer_config['bootstrap.servers'] == '' { fail('[Producer] You need to specify a value for bootstrap.servers') } class { 'kafka::consumer::config': - config_dir => $config_dir, - config_mode => $config_mode, + manage_service => $manage_service, service_name => $service_name, - service_install => $service_install, service_restart => $service_restart, config => $consumer_config, - user => $user, - group => $group, + config_dir => $config_dir, + user_name => $user_name, + group_name => $group_name, + config_mode => $config_mode, } class { 'kafka::producer::config': - config_dir => $config_dir, - config_mode => $config_mode, + manage_service => $manage_service, service_name => $service_name, - service_install => $service_install, service_restart => $service_restart, config => $producer_config, - user => $user, - group => $group, + config_dir => $config_dir, + user_name => $user_name, + group_name => $group_name, + config_mode => $config_mode, } } diff --git a/manifests/mirror/install.pp b/manifests/mirror/install.pp index e69ec25..1d3f741 100644 --- a/manifests/mirror/install.pp +++ b/manifests/mirror/install.pp @@ -1,30 +1,30 @@ # @summary # This class handles the Kafka (mirror) package. 
# # @api private # class kafka::mirror::install { assert_private() if !defined(Class['kafka']) { class { 'kafka': - version => $kafka::mirror::version, - scala_version => $kafka::mirror::scala_version, - install_dir => $kafka::mirror::install_dir, - mirror_url => $kafka::mirror::mirror_url, - install_java => $kafka::mirror::install_java, - package_dir => $kafka::mirror::package_dir, - package_name => $kafka::mirror::package_name, - package_ensure => $kafka::mirror::package_ensure, - user => $kafka::mirror::user, - group => $kafka::mirror::group, - user_id => $kafka::mirror::user_id, + manage_java => $kafka::mirror::manage_java, + manage_group => $kafka::mirror::manage_group, group_id => $kafka::mirror::group_id, + group_name => $kafka::mirror::group_name, manage_user => $kafka::mirror::manage_user, - manage_group => $kafka::mirror::manage_group, + user_id => $kafka::mirror::user_id, + user_name => $kafka::mirror::user_name, config_dir => $kafka::mirror::config_dir, log_dir => $kafka::mirror::log_dir, + mirror_url => $kafka::mirror::mirror_url, + kafka_version => $kafka::mirror::kafka_version, + scala_version => $kafka::mirror::scala_version, + install_dir => $kafka::mirror::install_dir, + package_dir => $kafka::mirror::package_dir, + package_ensure => $kafka::mirror::package_ensure, + package_name => $kafka::mirror::package_name, } } } diff --git a/manifests/mirror/service.pp b/manifests/mirror/service.pp index 76cff20..b4132cb 100644 --- a/manifests/mirror/service.pp +++ b/manifests/mirror/service.pp @@ -1,69 +1,69 @@ # @summary # This class handles the Kafka (mirror) service. # # @api private # class kafka::mirror::service( - String $user = $kafka::mirror::user, - String $group = $kafka::mirror::group, + Boolean $manage_service = $kafka::mirror::manage_service, + Enum['running', 'stopped'] $service_ensure = $kafka::mirror::service_ensure, + String[1] $service_name = $kafka::mirror::service_name, + String[1] $user_name = $kafka::mirror::user_name, + String[1] $group_name = $kafka::mirror::group_name, Stdlib::Absolutepath $config_dir = $kafka::mirror::config_dir, Stdlib::Absolutepath $log_dir = $kafka::mirror::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::mirror::bin_dir, - String $service_name = $kafka::mirror::service_name, - Boolean $service_install = $kafka::mirror::service_install, - Enum['running', 'stopped'] $service_ensure = $kafka::mirror::service_ensure, - Array[String] $service_requires = $kafka::mirror::service_requires, - Optional[String] $limit_nofile = $kafka::mirror::limit_nofile, - Optional[String] $limit_core = $kafka::mirror::limit_core, + Array[String[1]] $service_requires = $kafka::mirror::service_requires, + Optional[String[1]] $limit_nofile = $kafka::mirror::limit_nofile, + Optional[String[1]] $limit_core = $kafka::mirror::limit_core, Hash $env = $kafka::mirror::env, - Hash $consumer_config = $kafka::mirror::consumer_config, - Hash $producer_config = $kafka::mirror::producer_config, - Hash $service_config = $kafka::mirror::service_config, - String $heap_opts = $kafka::mirror::heap_opts, - String $jmx_opts = $kafka::mirror::jmx_opts, - String $log4j_opts = $kafka::mirror::log4j_opts, + Hash[String[1],String[1]] $consumer_config = $kafka::mirror::consumer_config, + Hash[String[1],String[1]] $producer_config = $kafka::mirror::producer_config, + Hash[String[1],String[1]] $service_config = $kafka::mirror::service_config, + String[1] $heap_opts = $kafka::mirror::heap_opts, + String[1] $jmx_opts = $kafka::mirror::jmx_opts, + String[1] $log4j_opts = 
$kafka::mirror::log4j_opts, ) { assert_private() - if $service_install { + if $manage_service { $env_defaults = { 'KAFKA_HEAP_OPTS' => $heap_opts, 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, } $environment = deep_merge($env_defaults, $env) if $::service_provider == 'systemd' { include systemd file { "/etc/systemd/system/${service_name}.service": ensure => file, mode => '0644', content => template('kafka/unit.erb'), } file { "/etc/init.d/${service_name}": ensure => absent, } File["/etc/systemd/system/${service_name}.service"] ~> Exec['systemctl-daemon-reload'] -> Service[$service_name] } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/params.pp b/manifests/params.pp index 23e9be0..b3f08d9 100644 --- a/manifests/params.pp +++ b/manifests/params.pp @@ -1,73 +1,73 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class kafka::params # # This class is meant to be called from kafka::broker # It sets variables according to platform # class kafka::params { # this is all only tested on Debian and RedHat # params gets included everywhere so we can do the validation here unless $facts['os']['family'] =~ /(RedHat|Debian)/ { warning("${facts['os']['family']} is not supported") } - $version = '2.4.1' + $kafka_version = '2.4.1' $scala_version = '2.12' - $install_dir = "/opt/kafka-${scala_version}-${version}" + $install_dir = "/opt/kafka-${scala_version}-${kafka_version}" $config_dir = '/opt/kafka/config' $bin_dir = '/opt/kafka/bin' $log_dir = '/var/log/kafka' $mirror_url = 'https://www.apache.org/dyn/closer.lua?action=download&filename=' - $mirror_subpath = "kafka/${version}" - $install_java = false + $mirror_subpath = "kafka/${kafka_version}" + $manage_java = false $package_dir = '/var/tmp/kafka' $package_name = undef $proxy_server = undef $proxy_host = undef $proxy_port = undef $proxy_type = undef $package_ensure = 'present' - $user = 'kafka' - $group = 'kafka' + $user_name = 'kafka' + $group_name = 'kafka' $user_id = undef $group_id = undef $system_user = false $system_group = false $manage_user = true $manage_group = true $config_mode = '0644' $install_mode = '0755' - $service_install = true + $manage_service = true $service_ensure = 'running' $service_restart = true $service_requires = $facts['os']['family'] ? 
{ 'RedHat' => ['network.target', 'syslog.target'], default => [], } $limit_nofile = undef $limit_core = undef $timeout_stop = undef $exec_stop = false $daemon_start = false $broker_heap_opts = '-Xmx1G -Xms1G' $broker_jmx_opts = '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false \ -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9990' $broker_log4j_opts = "-Dlog4j.configuration=file:${config_dir}/log4j.properties" $broker_opts = '' $mirror_heap_opts = '-Xmx256M' $mirror_jmx_opts = '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9991' $mirror_log4j_opts = $broker_log4j_opts $producer_jmx_opts = '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9992' $producer_log4j_opts = $broker_log4j_opts $consumer_jmx_opts = '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9993' $consumer_log4j_opts = $broker_log4j_opts } diff --git a/manifests/producer.pp b/manifests/producer.pp index d083824..7f46b3e 100644 --- a/manifests/producer.pp +++ b/manifests/producer.pp @@ -1,146 +1,146 @@ # Author:: Liam Bennett (mailto:lbennett@opentable.com) # Copyright:: Copyright (c) 2013 OpenTable Inc # License:: MIT # == Class: kafka::producer # # This class will install kafka with the producer role. # # === Requirements/Dependencies # # Currently requires the puppetlabs/stdlib module on the Puppet Forge in # order to validate much of the the provided configuration. # # === Parameters # -# [*version*] +# [*kafka_version*] # The version of kafka that should be installed. # # [*scala_version*] # The scala version what kafka was built with. # # [*install_dir*] # The directory to install kafka to. # # [*mirror_url*] # The url where the kafka is downloaded from. # -# [*install_java*] +# [*manage_java*] # Install java if it's not already installed. # # [*package_dir*] # The directory to install kafka. # # [*package_name*] # Package name, when installing kafka from a package. # # [*package_ensure*] # Package version (or 'present', 'absent', 'latest'), when installing kafka from a package. # -# [*user*] +# [*user_name*] # User to run kafka as. # -# [*group*] +# [*group_name*] # Group to run kafka as. # # [*user_id*] # Create the kafka user with this ID. # # [*group_id*] # Create the kafka group with this ID. # # [*manage_user*] # Create the kafka user if it's not already present. # # [*manage_group*] # Create the kafka group if it's not already present. # # [*config_mode*] # The permissions for the config files. # # [*config_dir*] # The directory to create the kafka config files to. # # [*log_dir*] # The directory for kafka log files. # # [*bin_dir*] # The directory where the kafka scripts are. # # [*service_name*] # Set the name of the service. # -# [*service_install*] +# [*manage_service*] # Install the init.d or systemd service. # # [*service_ensure*] # Set the ensure state of the service to 'stopped' or 'running'. # # [*service_restart*] # Whether the configuration files should trigger a service restart. # # [*service_requires*] # Set the list of services required to be running before Kafka. # # [*limit_nofile*] # Set the 'LimitNOFILE' option of the systemd service. # # [*limit_core*] # Set the 'LimitCORE' option of the systemd service. 
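In kafka::params above, install_dir and mirror_subpath are derived from scala_version and kafka_version, and init.pp re-derives them whenever those defaults are left untouched, so overriding only the version parameters keeps the download path and install directory consistent. A sketch (the versions are illustrative):

  class { 'kafka':
    kafka_version => '2.5.0',   # illustrative
    scala_version => '2.13',    # illustrative
    # install_dir stays at its default and resolves to /opt/kafka-2.13-2.5.0
  }
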
# # [*env*] # A hash of the environment variables to set. # # [*config*] # A hash of the producer configuration options. # # [*service_config*] # A hash of the `kafka-console-producer.sh` script options. # # === Examples # # Create the producer service connecting to a local zookeeper # # class { 'kafka::producer': # config => { 'client.id' => '0', 'zookeeper.connect' => 'localhost:2181' } # } # class kafka::producer ( - $input, - String $version = $kafka::params::version, - String $scala_version = $kafka::params::scala_version, + Optional[String[1]] $input, + String[1] $kafka_version = $kafka::params::kafka_version, + String[1] $scala_version = $kafka::params::scala_version, Stdlib::Absolutepath $install_dir = $kafka::params::install_dir, Stdlib::HTTPUrl $mirror_url = $kafka::params::mirror_url, - Boolean $install_java = $kafka::params::install_java, + Boolean $manage_java = $kafka::params::manage_java, Stdlib::Absolutepath $package_dir = $kafka::params::package_dir, - Optional[String] $package_name = $kafka::params::package_name, - String $package_ensure = $kafka::params::package_ensure, - String $user = $kafka::params::user, - String $group = $kafka::params::group, + Optional[String[1]] $package_name = $kafka::params::package_name, + String[1] $package_ensure = $kafka::params::package_ensure, + String[1] $user_name = $kafka::params::user_name, + String[1] $group_name = $kafka::params::group_name, Optional[Integer] $user_id = $kafka::params::user_id, Optional[Integer] $group_id = $kafka::params::group_id, Boolean $manage_user = $kafka::params::manage_user, Boolean $manage_group = $kafka::params::manage_group, Stdlib::Filemode $config_mode = $kafka::params::config_mode, Stdlib::Absolutepath $config_dir = $kafka::params::config_dir, Stdlib::Absolutepath $log_dir = $kafka::params::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::params::bin_dir, - String $service_name = 'kafka-producer', - Boolean $service_install = $kafka::params::service_install, + String[1] $service_name = 'kafka-producer', + Boolean $manage_service = $kafka::params::manage_service, Enum['running', 'stopped'] $service_ensure = $kafka::params::service_ensure, Boolean $service_restart = $kafka::params::service_restart, - Array[String] $service_requires = $kafka::params::service_requires, - Optional[String] $limit_nofile = $kafka::params::limit_nofile, - Optional[String] $limit_core = $kafka::params::limit_core, + Array[String[1]] $service_requires = $kafka::params::service_requires, + Optional[String[1]] $limit_nofile = $kafka::params::limit_nofile, + Optional[String[1]] $limit_core = $kafka::params::limit_core, Hash $env = {}, - Hash $config = {}, - Hash $service_config = {}, - String $jmx_opts = $kafka::params::producer_jmx_opts, - String $log4j_opts = $kafka::params::producer_log4j_opts, + Hash[String[1], Any] $config = {}, + Hash[String[1],String[1]] $service_config = {}, + String[1] $jmx_opts = $kafka::params::producer_jmx_opts, + String[1] $log4j_opts = $kafka::params::producer_log4j_opts, ) inherits kafka::params { class { 'kafka::producer::install': } -> class { 'kafka::producer::config': } -> class { 'kafka::producer::service': } -> Class['kafka::producer'] } diff --git a/manifests/producer/config.pp b/manifests/producer/config.pp index a078658..36a8c13 100644 --- a/manifests/producer/config.pp +++ b/manifests/producer/config.pp @@ -1,31 +1,31 @@ # @summary # This class handles the Kafka (producer) config. 
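kafka::producer above keeps $input as a mandatory parameter (now typed Optional[String[1]]), and kafka::producer::service further below fails under systemd and rejects empty 'broker-list' and 'topic' entries in service_config. On a host using a SysV init provider a declaration might look like this sketch; the fifo path, broker list and topic are assumptions:

  class { 'kafka::producer':
    input          => '/tmp/kafka-producer.fifo',  # assumption: a pipe fed by another process
    manage_service => true,
    service_config => {
      'broker-list' => 'localhost:9092',
      'topic'       => 'web-logs',                 # illustrative topic name
    },
  }
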
# class kafka::producer::config( - Stdlib::Absolutepath $config_dir = $kafka::producer::config_dir, - String $service_name = $kafka::producer::service_name, - Boolean $service_install = $kafka::producer::service_install, + Boolean $manage_service = $kafka::producer::manage_service, + String[1] $service_name = $kafka::producer::service_name, Boolean $service_restart = $kafka::producer::service_restart, - Hash $config = $kafka::producer::config, + Hash[String[1], Any] $config = $kafka::producer::config, + Stdlib::Absolutepath $config_dir = $kafka::producer::config_dir, + String[1] $user_name = $kafka::producer::user_name, + String[1] $group_name = $kafka::producer::group_name, Stdlib::Filemode $config_mode = $kafka::producer::config_mode, - String $user = $kafka::producer::user, - String $group = $kafka::producer::group, ) { - if ($service_install and $service_restart) { + if ($manage_service and $service_restart) { $config_notify = Service[$service_name] } else { $config_notify = undef } $doctag = 'producerconfigs' file { "${config_dir}/producer.properties": ensure => present, - owner => $user, - group => $group, + owner => $user_name, + group => $group_name, mode => $config_mode, content => template('kafka/properties.erb'), notify => $config_notify, require => File[$config_dir], } } diff --git a/manifests/producer/install.pp b/manifests/producer/install.pp index b25ee69..f3cd3ce 100644 --- a/manifests/producer/install.pp +++ b/manifests/producer/install.pp @@ -1,30 +1,30 @@ # @summary # This class handles the Kafka (producer) package. # # @api private # class kafka::producer::install { assert_private() if !defined(Class['kafka']) { class { 'kafka': - version => $kafka::producer::version, - scala_version => $kafka::producer::scala_version, - install_dir => $kafka::producer::install_dir, - mirror_url => $kafka::producer::mirror_url, - install_java => $kafka::producer::install_java, - package_dir => $kafka::producer::package_dir, - package_name => $kafka::producer::package_name, - package_ensure => $kafka::producer::package_ensure, - user => $kafka::producer::user, - group => $kafka::producer::group, - user_id => $kafka::producer::user_id, + manage_java => $kafka::producer::manage_java, + manage_group => $kafka::producer::manage_group, group_id => $kafka::producer::group_id, + group_name => $kafka::producer::group_name, manage_user => $kafka::producer::manage_user, - manage_group => $kafka::producer::manage_group, + user_id => $kafka::producer::user_id, + user_name => $kafka::producer::user_name, config_dir => $kafka::producer::config_dir, log_dir => $kafka::producer::log_dir, + mirror_url => $kafka::producer::mirror_url, + kafka_version => $kafka::producer::kafka_version, + scala_version => $kafka::producer::scala_version, + install_dir => $kafka::producer::install_dir, + package_dir => $kafka::producer::package_dir, + package_ensure => $kafka::producer::package_ensure, + package_name => $kafka::producer::package_name, } } } diff --git a/manifests/producer/service.pp b/manifests/producer/service.pp index 76d6cb6..7d8b4a9 100644 --- a/manifests/producer/service.pp +++ b/manifests/producer/service.pp @@ -1,59 +1,60 @@ # @summary # This class handles the Kafka (producer) service. 
# # @api private # class kafka::producer::service( - String $user = $kafka::producer::user, - String $group = $kafka::producer::group, + Boolean $manage_service = $kafka::producer::manage_service, + Enum['running', 'stopped'] $service_ensure = $kafka::producer::service_ensure, + String[1] $service_name = $kafka::producer::service_name, + String[1] $user_name = $kafka::producer::user_name, + String[1] $group_name = $kafka::producer::group_name, Stdlib::Absolutepath $config_dir = $kafka::producer::config_dir, Stdlib::Absolutepath $log_dir = $kafka::producer::log_dir, Stdlib::Absolutepath $bin_dir = $kafka::producer::bin_dir, - String $service_name = $kafka::producer::service_name, - Boolean $service_install = $kafka::producer::service_install, - Enum['running', 'stopped'] $service_ensure = $kafka::producer::service_ensure, - Array[String] $service_requires = $kafka::producer::service_requires, - Optional[String] $limit_nofile = $kafka::producer::limit_nofile, - Optional[String] $limit_core = $kafka::producer::limit_core, + Array[String[1]] $service_requires = $kafka::producer::service_requires, + Optional[String[1]] $limit_nofile = $kafka::producer::limit_nofile, + Optional[String[1]] $limit_core = $kafka::producer::limit_core, Hash $env = $kafka::producer::env, - $input = $kafka::producer::input, - String $jmx_opts = $kafka::producer::jmx_opts, - String $log4j_opts = $kafka::producer::log4j_opts, - Hash $service_config = $kafka::producer::service_config, + Optional[String[1]] $input = $kafka::producer::input, + String[1] $jmx_opts = $kafka::producer::jmx_opts, + String[1] $log4j_opts = $kafka::producer::log4j_opts, + Hash[String[1],String[1]] $service_config = $kafka::producer::service_config, ) { assert_private() - if $service_install { + if $manage_service { + if $service_config['broker-list'] == '' { fail('[Producer] You need to specify a value for broker-list') } if $service_config['topic'] == '' { fail('[Producer] You need to specify a value for topic') } $env_defaults = { 'KAFKA_JMX_OPTS' => $jmx_opts, 'KAFKA_LOG4J_OPTS' => $log4j_opts, } $environment = deep_merge($env_defaults, $env) - if $::service_provider == 'systemd' { + if $facts['service_provider'] == 'systemd' { fail('Console Producer is not supported on systemd, because the stdin of the process cannot be redirected') } else { file { "/etc/init.d/${service_name}": ensure => file, mode => '0755', content => template('kafka/init.erb'), before => Service[$service_name], } } service { $service_name: ensure => $service_ensure, enable => true, hasstatus => true, hasrestart => true, } } } diff --git a/manifests/topic.pp b/manifests/topic.pp index b16ba08..fc0f8d5 100644 --- a/manifests/topic.pp +++ b/manifests/topic.pp @@ -1,68 +1,68 @@ # @summary # This defined type handles the creation of Kafka topics. # # @example Basic usage # kafka::topic { 'test': # ensure => present, # zookeeper => 'localhost:2181', # replication_factor => 1, # partitions => 1, # } # # @param ensure # Should the topic be created. # # @param zookeeper # The connection string for the ZooKeeper connection in the form host:port. # Multiple hosts can be given to allow fail-over. # # @param replication_factor # The replication factor for each partition in the topic being created. If # not supplied, defaults to the cluster default. # # @param partitions # The number of partitions for the topic being created or altered. If not # supplied for create, defaults to the cluster default. # # @param bin_dir # The directory where the file kafka-topics.sh is located. 
# # @param config # A topic configuration override for the topic being created or altered. # See the Kafka documentation for full details on the topic configs. # define kafka::topic( - String $ensure = '', - String $zookeeper = '', - Variant[Integer,String] $replication_factor = 1, - Variant[Integer,String] $partitions = 1, - String $bin_dir = '/opt/kafka/bin', - Optional[Hash[String,String]] $config = undef, + String[1] $ensure = '', + String[1] $zookeeper = '', + Variant[Integer,String[1]] $replication_factor = 1, + Variant[Integer,String[1]] $partitions = 1, + String[1] $bin_dir = '/opt/kafka/bin', + Optional[Hash[String[1],String[1]]] $config = undef, ) { if is_string($replication_factor) { - deprication('kafka::topic', 'Please use Integer type, not String, for paramter replication_factor') + deprecation('kafka::topic', 'Please use Integer type, not String, for paramter replication_factor') } if is_string($partitions) { - deprication('kafka::topic', 'Please use Integer type, not String, for paramter partitions') + deprecation('kafka::topic', 'Please use Integer type, not String, for paramter partitions') } $_zookeeper = "--zookeeper ${zookeeper}" $_replication_factor = "--replication-factor ${replication_factor}" $_partitions = "--partitions ${partitions}" if $config { $_config_array = $config.map |$key, $value| { "--config ${key}=${value}" } $_config = join($_config_array, ' ') } else { $_config = '' } if $ensure == 'present' { exec { "create topic ${name}": path => "/usr/bin:/usr/sbin/:/bin:/sbin:${bin_dir}", command => "kafka-topics.sh --create ${_zookeeper} ${_replication_factor} ${_partitions} --topic ${name} ${_config}", unless => "kafka-topics.sh --list ${_zookeeper} | grep -x ${name}", } } } diff --git a/metadata.json b/metadata.json index 23a7899..47c81fa 100644 --- a/metadata.json +++ b/metadata.json @@ -1,73 +1,73 @@ { "name": "puppet-kafka", "version": "6.0.1-rc0", "author": "Vox Pupuli", "summary": "Puppet module for Kafka", "license": "MIT", "source": "https://github.com/voxpupuli/puppet-kafka", "project_page": "https://github.com/voxpupuli/puppet-kafka", "issues_url": "https://github.com/voxpupuli/puppet-kafka/issues", "dependencies": [ { "name": "puppet/archive", "version_requirement": ">= 1.0.0 < 5.0.0" }, { "name": "puppetlabs/java", "version_requirement": ">= 1.4.2 < 6.0.0" }, { "name": "puppetlabs/stdlib", "version_requirement": ">= 4.22.0 < 7.0.0" }, { "name": "deric/zookeeper", - "version_requirement": ">= 0.5.1 < 1.0.0" + "version_requirement": ">= 0.5.1 < 2.0.0" }, { "name": "camptocamp/systemd", "version_requirement": ">= 0.4.0 < 3.0.0" } ], "operatingsystem_support": [ { "operatingsystem": "CentOS", "operatingsystemrelease": [ "6", "7" ] }, { "operatingsystem": "RedHat", "operatingsystemrelease": [ "6", "7" ] }, { "operatingsystem": "Ubuntu", "operatingsystemrelease": [ "16.04", "18.04" ] }, { "operatingsystem": "Debian", "operatingsystemrelease": [ "8", "9", "10" ] } ], "requirements": [ { "name": "puppet", "version_requirement": ">= 5.5.8 < 7.0.0" } ], "tags": [ "kafka", "pubsub" ] } diff --git a/spec/acceptance/01_zookeeper_spec.rb b/spec/acceptance/01_zookeeper_spec.rb index d5c7087..acfcfeb 100644 --- a/spec/acceptance/01_zookeeper_spec.rb +++ b/spec/acceptance/01_zookeeper_spec.rb @@ -1,41 +1,41 @@ require 'spec_helper_acceptance' describe 'zookeeper prereq' do zookeeper = <<-EOS if $::osfamily == 'RedHat' { class { 'java' : package => 'java-1.8.0-openjdk-devel', } exec { 'create pid dir': command => '/bin/mkdir -p /var/run/', creates => '/var/run/', } 
file { '/var/run/zookeeper/': ensure => directory, owner => 'zookeeper', group => 'zookeeper', } $zookeeper_service_provider = $facts['os']['release']['major'] ? { '6' => 'redhat', '7' => 'systemd', } class { 'zookeeper': install_method => 'archive', - archive_version => '3.6.0', + archive_version => '3.6.1', service_provider => $zookeeper_service_provider, manage_service_file => true, } } else { include zookeeper } EOS it 'installs zookeeper with no errors' do apply_manifest(zookeeper, catch_failures: true) apply_manifest(zookeeper, catch_changes: true) end end diff --git a/spec/acceptance/broker_spec.rb b/spec/acceptance/broker_spec.rb index 8f9973c..9926af1 100644 --- a/spec/acceptance/broker_spec.rb +++ b/spec/acceptance/broker_spec.rb @@ -1,224 +1,234 @@ require 'spec_helper_acceptance' describe 'kafka::broker' do it 'works with no errors' do pp = <<-EOS class { 'kafka::broker': config => { 'zookeeper.connect' => 'localhost:2181', }, } -> kafka::topic { 'demo': ensure => present, zookeeper => 'localhost:2181', } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe 'kafka::broker::install' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::broker': config => { 'zookeeper.connect' => 'localhost:2181', }, } EOS apply_manifest(pp, catch_failures: true) end describe group('kafka') do it { is_expected.to exist } end describe user('kafka') do it { is_expected.to exist } it { is_expected.to belong_to_group 'kafka' } it { is_expected.to have_login_shell '/bin/bash' } end describe file('/var/tmp/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka-2.12-2.4.1') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka') do it { is_expected.to be_linked_to('/opt/kafka-2.12-2.4.1') } end describe file('/opt/kafka/config') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/var/log/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::broker::config' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::broker': config => { 'zookeeper.connect' => 'localhost:2181', }, } EOS apply_manifest(pp, catch_failures: true) end describe file('/opt/kafka/config/server.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } it { is_expected.to contain 'zookeeper.connect=localhost:2181' } end end context 'with custom config dir' do it 'works with no errors' do pp = <<-EOS class { 'kafka::broker': config => { 'zookeeper.connect' => 'localhost:2181', }, config_dir => '/opt/kafka/custom_config' } EOS apply_manifest(pp, catch_failures: true) end describe file('/opt/kafka/custom_config/server.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } it { is_expected.to contain 'zookeeper.connect=localhost:2181' } end end context 'with specific version' do it 'works with no errors' do pp = <<-EOS class { 'kafka::broker': - version => '2.4.0', - config => { + kafka_version => '2.4.0', + config => { 'zookeeper.connect' => 
'localhost:2181', }, } EOS apply_manifest(pp, catch_failures: true) end describe file('/opt/kafka/config/server.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::broker::service' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::broker': config => { 'zookeeper.connect' => 'localhost:2181', }, } EOS apply_manifest(pp, catch_failures: true) end describe file('/etc/init.d/kafka'), if: (fact('operatingsystemmajrelease') =~ %r{(5|6)} && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } end describe file('/etc/systemd/system/kafka.service'), if: (fact('operatingsystemmajrelease') == '7' && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } end describe service('kafka') do it { is_expected.to be_running } it { is_expected.to be_enabled } end end end describe 'kafka::broker::service' do context 'with log4j/jmx parameters' do it 'works with no errors' do pp = <<-EOS exec { 'create log dir': command => '/bin/mkdir -p /some/path/to/logs', creates => '/some/path/to/logs', } -> class { 'kafka::broker': config => { 'zookeeper.connect' => 'localhost:2181', }, heap_opts => '-Xmx512M -Xmx512M', log4j_opts => '-Dlog4j.configuration=file:/tmp/log4j.properties', jmx_opts => '-Dcom.sun.management.jmxremote', opts => '-Djava.security.policy=/some/path/my.policy', log_dir => '/some/path/to/logs' } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe file('/etc/init.d/kafka'), if: (fact('operatingsystemmajrelease') =~ %r{(5|6)} && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } it { is_expected.to contain 'export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote"' } it { is_expected.to contain 'export KAFKA_HEAP_OPTS="-Xmx512M -Xmx512M"' } it { is_expected.to contain 'export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/tmp/log4j.properties"' } end + describe file('/etc/init.d/kafka'), if: (fact('service_provider') == 'upstart' && fact('osfamily') == 'Debian') do + it { is_expected.to be_file } + it { is_expected.to be_owned_by 'root' } + it { is_expected.to be_grouped_into 'root' } + it { is_expected.to contain %r{^# Provides:\s+kafka$} } + it { is_expected.to contain 'export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote"' } + it { is_expected.to contain 'export KAFKA_HEAP_OPTS="-Xmx512M -Xmx512M"' } + it { is_expected.to contain 'export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/tmp/log4j.properties"' } + end + describe file('/etc/systemd/system/kafka.service'), if: (fact('operatingsystemmajrelease') == '7' && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } it { is_expected.to contain "Environment='KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote'" } it { is_expected.to contain "Environment='KAFKA_HEAP_OPTS=-Xmx512M -Xmx512M'" } it { is_expected.to contain "Environment='KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/tmp/log4j.properties'" } it { is_expected.to contain "Environment='KAFKA_OPTS=-Djava.security.policy=/some/path/my.policy'" } it { is_expected.to contain "Environment='LOG_DIR=/some/path/to/logs'" } end 
describe service('kafka') do it { is_expected.to be_running } it { is_expected.to be_enabled } end end end end diff --git a/spec/acceptance/consumer_spec.rb b/spec/acceptance/consumer_spec.rb index 31c4526..0650a4a 100644 --- a/spec/acceptance/consumer_spec.rb +++ b/spec/acceptance/consumer_spec.rb @@ -1,157 +1,166 @@ require 'spec_helper_acceptance' describe 'kafka::consumer' do it 'works with no errors' do pp = <<-EOS class { 'kafka::consumer': service_config => { topic => 'demo', bootstrap-server => 'localhost:9092', }, } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe 'kafka::consumer::install' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::consumer': service_config => { topic => 'demo', bootstrap-server => 'localhost:9092', }, } EOS apply_manifest(pp, catch_failures: true) end describe group('kafka') do it { is_expected.to exist } end describe user('kafka') do it { is_expected.to exist } it { is_expected.to belong_to_group 'kafka' } it { is_expected.to have_login_shell '/bin/bash' } end describe file('/var/tmp/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka-2.12-2.4.1') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka') do it { is_expected.to be_linked_to('/opt/kafka-2.12-2.4.1') } end describe file('/opt/kafka/config') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/var/log/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::consumer::config' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::consumer': service_config => { topic => 'demo', bootstrap-server => 'localhost:9092', }, } EOS apply_manifest(pp, catch_failures: true) end describe file('/opt/kafka/config/consumer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::consumer::config' do context 'with custom config_dir' do it 'works with no errors' do pp = <<-EOS class { 'kafka::consumer': service_config => { topic => 'demo', bootstrap-server => 'localhost:9092', }, config_dir => '/opt/kafka/custom_config', } EOS apply_manifest(pp, catch_failures: true) end describe file('/opt/kafka/custom_config/consumer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::consumer::service' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::consumer': service_config => { topic => 'demo', bootstrap-server => 'localhost:9092', }, } EOS apply_manifest(pp, catch_failures: true) end describe file('/etc/init.d/kafka-consumer'), if: (fact('operatingsystemmajrelease') =~ %r{(5|6)} && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } it { is_expected.to contain 'export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false 
-Dcom.sun.management.jmxremote.port=9993"' } it { is_expected.to contain 'export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/opt/kafka/config/log4j.properties"' } end + describe file('/etc/init.d/kafka-consumer'), if: (fact('service_provider') == 'upstart' && fact('osfamily') == 'Debian') do + it { is_expected.to be_file } + it { is_expected.to be_owned_by 'root' } + it { is_expected.to be_grouped_into 'root' } + it { is_expected.to contain %r{^# Provides:\s+kafka-consumer$} } + it { is_expected.to contain 'export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9993"' } + it { is_expected.to contain 'export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/opt/kafka/config/log4j.properties"' } + end + describe file('/etc/systemd/system/kafka-consumer.service'), if: (fact('operatingsystemmajrelease') == '7' && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } it { is_expected.to contain 'Environment=\'KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9993\'' } it { is_expected.to contain 'Environment=\'KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/opt/kafka/config/log4j.properties\'' } end describe service('kafka-consumer') do it { is_expected.to be_running } it { is_expected.to be_enabled } end end end end diff --git a/spec/acceptance/init_spec.rb b/spec/acceptance/init_spec.rb index fdd3b92..e74d8db 100644 --- a/spec/acceptance/init_spec.rb +++ b/spec/acceptance/init_spec.rb @@ -1,212 +1,212 @@ require 'spec_helper_acceptance' describe 'kafka' do it 'works with no errors' do pp = <<-EOS class { 'kafka': } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe 'kafka::init' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka': } EOS apply_manifest(pp, catch_failures: true) end describe group('kafka') do it { is_expected.to exist } end describe user('kafka') do it { is_expected.to exist } it { is_expected.to belong_to_group 'kafka' } it { is_expected.to have_login_shell '/bin/bash' } end describe file('/var/tmp/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka-2.12-2.4.1') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka') do it { is_expected.to be_linked_to('/opt/kafka-2.12-2.4.1') } end describe file('/opt/kafka/config') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/var/log/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end context 'with specific kafka version' do it 'works with no errors' do pp = <<-EOS class { 'kafka': - version => '2.4.0', + kafka_version => '2.4.0', } EOS apply_manifest(pp, catch_failures: true) end describe group('kafka') do it { is_expected.to exist } end describe user('kafka') do it { is_expected.to exist } it { is_expected.to belong_to_group 'kafka' } it { is_expected.to have_login_shell '/bin/bash' } end describe file('/var/tmp/kafka') 
do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka-2.12-2.4.0') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka') do it { is_expected.to be_linked_to('/opt/kafka-2.12-2.4.0') } end describe file('/opt/kafka/config') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/var/log/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end context 'with specific scala version' do it 'works with no errors' do pp = <<-EOS class { 'kafka': scala_version => '2.13', } EOS apply_manifest(pp, catch_failures: true) end describe group('kafka') do it { is_expected.to exist } end describe user('kafka') do it { is_expected.to exist } it { is_expected.to belong_to_group 'kafka' } it { is_expected.to have_login_shell '/bin/bash' } end describe file('/var/tmp/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka-2.13-2.4.1') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka') do it { is_expected.to be_linked_to('/opt/kafka-2.13-2.4.1') } end describe file('/opt/kafka/config') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/var/log/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end context 'with specific config dir' do it 'works with no errors' do pp = <<-EOS class { 'kafka': config_dir => '/opt/kafka/custom_config', } EOS apply_manifest(pp, catch_failures: true) end describe group('kafka') do it { is_expected.to exist } end describe user('kafka') do it { is_expected.to exist } it { is_expected.to belong_to_group 'kafka' } it { is_expected.to have_login_shell '/bin/bash' } end describe file('/var/tmp/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka-2.12-2.4.1') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka') do it { is_expected.to be_linked_to('/opt/kafka-2.12-2.4.1') } end describe file('/opt/kafka/custom_config') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/var/log/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end end diff --git a/spec/acceptance/mirror_spec.rb b/spec/acceptance/mirror_spec.rb index e61102f..1301b47 100644 --- a/spec/acceptance/mirror_spec.rb +++ b/spec/acceptance/mirror_spec.rb @@ -1,235 +1,244 @@ require 'spec_helper_acceptance' describe 'kafka::mirror' do it 'works with no errors' do pp = <<-EOS class { 'kafka::mirror': consumer_config => { 'group.id' => 'kafka-mirror', 'bootstrap.servers' => 'localhost:9092', }, producer_config => { 'bootstrap.servers' => 'localhost:9092', }, service_config => { 
'whitelist' => '.*', }, } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe 'kafka::mirror::install' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::mirror': consumer_config => { 'group.id' => 'kafka-mirror', 'bootstrap.servers' => 'localhost:9092', }, producer_config => { 'bootstrap.servers' => 'localhost:9092', }, service_config => { 'whitelist' => '.*', }, } EOS apply_manifest(pp, catch_failures: true) end describe group('kafka') do it { is_expected.to exist } end describe user('kafka') do it { is_expected.to exist } it { is_expected.to belong_to_group 'kafka' } it { is_expected.to have_login_shell '/bin/bash' } end describe file('/var/tmp/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka-2.12-2.4.1') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka') do it { is_expected.to be_linked_to('/opt/kafka-2.12-2.4.1') } end describe file('/opt/kafka/config') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/var/log/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::mirror::config' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::mirror': consumer_config => { 'group.id' => 'kafka-mirror', 'bootstrap.servers' => 'localhost:9092', }, producer_config => { 'bootstrap.servers' => 'localhost:9092', }, service_config => { 'whitelist' => '.*', }, } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe file('/opt/kafka/config/consumer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka/config/producer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end context 'with custom config_dir' do it 'works with no errors' do pp = <<-EOS class { 'kafka::mirror': consumer_config => { 'group.id' => 'kafka-mirror', 'bootstrap.servers' => 'localhost:9092', }, producer_config => { 'bootstrap.servers' => 'localhost:9092', }, service_config => { 'whitelist' => '.*', }, config_dir => '/opt/kafka/custom_config', } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe file('/opt/kafka/custom_config/consumer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka/custom_config/producer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end context 'with specific version' do it 'works with no errors' do pp = <<-EOS class { 'kafka::mirror': - version => '2.4.0', + kafka_version => '2.4.0', consumer_config => { 'group.id' => 'kafka-mirror', 'bootstrap.servers' => 'localhost:9092', }, producer_config => { 'bootstrap.servers' => 'localhost:9092', }, service_config => { 'whitelist' => '.*', }, } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe 
file('/opt/kafka/config/consumer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka/config/producer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::mirror::service' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::mirror': consumer_config => { 'group.id' => 'kafka-mirror', 'bootstrap.servers' => 'localhost:9092', }, producer_config => { 'bootstrap.servers' => 'localhost:9092', }, service_config => { 'whitelist' => '.*', }, } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe file('/etc/init.d/kafka-mirror'), if: (fact('operatingsystemmajrelease') =~ %r{(5|6)} && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } it { is_expected.to contain 'export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9991"' } it { is_expected.to contain 'export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/opt/kafka/config/log4j.properties"' } end + describe file('/etc/init.d/kafka-mirror'), if: (fact('service_provider') == 'upstart' && fact('osfamily') == 'Debian') do + it { is_expected.to be_file } + it { is_expected.to be_owned_by 'root' } + it { is_expected.to be_grouped_into 'root' } + it { is_expected.to contain %r{^# Provides:\s+kafka-mirror$} } + it { is_expected.to contain 'export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9991"' } + it { is_expected.to contain 'export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/opt/kafka/config/log4j.properties"' } + end + describe file('/etc/systemd/system/kafka-mirror.service'), if: (fact('operatingsystemmajrelease') == '7' && fact('osfamily') == 'RedHat') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } it { is_expected.to contain 'Environment=\'KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9991\'' } it { is_expected.to contain 'Environment=\'KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/opt/kafka/config/log4j.properties\'' } end describe service('kafka-mirror') do it { is_expected.to be_running } it { is_expected.to be_enabled } end end end end diff --git a/spec/acceptance/producer_spec.rb b/spec/acceptance/producer_spec.rb index c97d2f3..b2e18f8 100644 --- a/spec/acceptance/producer_spec.rb +++ b/spec/acceptance/producer_spec.rb @@ -1,147 +1,151 @@ require 'spec_helper_acceptance' describe 'kafka::producer', if: (fact('operatingsystemmajrelease') == '6' && fact('osfamily') == 'RedHat') do # systemd systems not supported by kafka::producer::service it 'works with no errors' do pp = <<-EOS exec { 'create fifo': command => '/usr/bin/mkfifo /tmp/kafka-producer', user => 'kafka', creates => '/tmp/kafka-producer', } -> class { 'kafka::producer': service_config => { 'broker-list' => 'localhost:9092', topic => 'demo', }, input => '3<>/tmp/kafka-producer 0>&3', } EOS apply_manifest(pp, catch_failures: true) 
apply_manifest(pp, catch_changes: true) end describe 'kafka::producer::install' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS exec { 'create fifo': command => '/usr/bin/mkfifo /tmp/kafka-producer', user => 'kafka', creates => '/tmp/kafka-producer', } -> class { 'kafka::producer': service_config => { 'broker-list' => 'localhost:9092', topic => 'demo', }, input => '3<>/tmp/kafka-producer 0>&3', } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe group('kafka') do it { is_expected.to exist } end describe user('kafka') do it { is_expected.to exist } it { is_expected.to belong_to_group 'kafka' } it { is_expected.to have_login_shell '/bin/bash' } end describe file('/var/tmp/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka-2.12-2.4.1') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/opt/kafka') do it { is_expected.to be_linked_to('/opt/kafka-2.12-2.4.1') } end describe file('/opt/kafka/config') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end describe file('/var/log/kafka') do it { is_expected.to be_directory } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::producer::config' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS exec { 'create fifo': command => '/usr/bin/mkfifo /tmp/kafka-producer', user => 'kafka', creates => '/tmp/kafka-producer', } -> class { 'kafka::producer': service_config => { 'broker-list' => 'localhost:9092', topic => 'demo', }, input => '3<>/tmp/kafka-producer 0>&3', } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe file('/opt/kafka/config/producer.properties') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'kafka' } it { is_expected.to be_grouped_into 'kafka' } end end end describe 'kafka::producer::service' do context 'with default parameters' do it 'works with no errors' do pp = <<-EOS class { 'kafka::producer': service_config => { 'broker-list' => 'localhost:9092', topic => 'demo', }, input => '3<>/tmp/kafka-producer 0>&3', } EOS apply_manifest(pp, catch_failures: true) apply_manifest(pp, catch_changes: true) end describe file('/etc/init.d/kafka-producer') do it { is_expected.to be_file } it { is_expected.to be_owned_by 'root' } it { is_expected.to be_grouped_into 'root' } it { is_expected.to contain 'export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9992"' } it { is_expected.to contain 'export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/opt/kafka/config/log4j.properties"' } end + describe file('/etc/init.d/kafka-producer'), if: (fact('service_provider') == 'upstart' && fact('osfamily') == 'Debian') do + it { is_expected.to contain %r{^# Provides:\s+kafka-producer$} } + end + describe service('kafka-producer') do it { is_expected.to be_running } it { is_expected.to be_enabled } end end end end diff --git a/spec/classes/broker_spec.rb b/spec/classes/broker_spec.rb index a617f89..ee9017f 100644 --- a/spec/classes/broker_spec.rb +++ b/spec/classes/broker_spec.rb @@ -1,153 +1,173 @@ require 'spec_helper' 
require 'shared_examples_param_validation' describe 'kafka::broker', type: :class do let :facts do { osfamily: 'Debian', os: { family: 'Debian' }, operatingsystem: 'Ubuntu', operatingsystemrelease: '14.04', lsbdistcodename: 'trusty', architecture: 'amd64', service_provider: 'upstart' } end let :common_params do { config: { 'zookeeper.connect' => 'localhost:2181' } } end let :params do common_params end it { is_expected.to contain_class('kafka::broker::install').that_comes_before('Class[kafka::broker::config]') } it { is_expected.to contain_class('kafka::broker::config').that_comes_before('Class[kafka::broker::service]') } it { is_expected.to contain_class('kafka::broker::service').that_comes_before('Class[kafka::broker]') } it { is_expected.to contain_class('kafka::broker') } context 'on Debian' do describe 'kafka::broker::install' do context 'defaults' do it { is_expected.to contain_class('kafka') } end end describe 'kafka::broker::config' do context 'defaults' do it { is_expected.to contain_file('/opt/kafka/config/server.properties') } end end describe 'kafka::broker::service' do - context 'service_install false' do + context 'manage_service false' do let :params do - common_params.merge(service_install: false) + common_params.merge(manage_service: false) end it { is_expected.not_to contain_file('/etc/init.d/kafka') } it { is_expected.not_to contain_service('kafka') } end context 'defaults' do it { is_expected.to contain_file('/etc/init.d/kafka') } + context 'limit_nofile set' do + let :params do + { + limit_nofile: '65536' + } + end + + it { is_expected.to contain_file('/etc/init.d/kafka').with_content %r{ulimit -n 65536$} } + end + + context 'limit_core set' do + let :params do + { + limit_core: 'infinity' + } + end + + it { is_expected.to contain_file('/etc/init.d/kafka').with_content %r{ulimit -c infinity$} } + end + it { is_expected.to contain_service('kafka') } end end end context 'on Centos' do let :facts do { osfamily: 'RedHat', os: { family: 'RedHat' }, operatingsystem: 'CentOS', operatingsystemrelease: '7', operatingsystemmajrelease: '7', architecture: 'amd64', path: '/usr/local/sbin', service_provider: 'systemd' } end describe 'kafka::broker::install' do context 'defaults' do it { is_expected.to contain_class('kafka') } end end describe 'kafka::broker::config' do context 'defaults' do it { is_expected.to contain_file('/opt/kafka/config/server.properties') } end end describe 'kafka::broker::service' do - context 'service_install false' do + context 'manage_service false' do let :params do - common_params.merge(service_install: false) + common_params.merge(manage_service: false) end it { is_expected.not_to contain_file('/etc/systemd/system/kafka.service') } it { is_expected.not_to contain_service('kafka') } end context 'defaults' do it { is_expected.to contain_file('/etc/systemd/system/kafka.service').that_notifies('Exec[systemctl-daemon-reload]') } it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^After=network\.target syslog\.target$} } it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^Wants=network\.target syslog\.target$} } it { is_expected.not_to contain_file('/etc/systemd/system/kafka.service').with_content %r{^LimitNOFILE=} } it { is_expected.not_to contain_file('/etc/systemd/system/kafka.service').with_content %r{^LimitCORE=} } it do is_expected.to contain_file('/etc/init.d/kafka').with( ensure: 'absent' ) end it { is_expected.to 
contain_exec('systemctl-daemon-reload').that_comes_before('Service[kafka]') } it { is_expected.to contain_service('kafka') } end context 'limit_nofile set' do let :params do { limit_nofile: '65536' } end it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^LimitNOFILE=65536$} } end context 'limit_core set' do let :params do { limit_core: 'infinity' } end it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^LimitCORE=infinity$} } end context 'service_requires set' do let :params do { service_requires: ['dummy.target'] } end it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^After=dummy\.target$} } it { is_expected.to contain_file('/etc/systemd/system/kafka.service').with_content %r{^Wants=dummy\.target$} } end end end it_validates_parameter 'mirror_url' end diff --git a/spec/classes/init_spec.rb b/spec/classes/init_spec.rb index 410d440..75851a0 100644 --- a/spec/classes/init_spec.rb +++ b/spec/classes/init_spec.rb @@ -1,62 +1,62 @@ require 'spec_helper' describe 'kafka', type: :class do let :facts do { osfamily: 'Debian', os: { family: 'Debian' }, operatingsystem: 'Ubuntu', operatingsystemrelease: '14.04', lsbdistcodename: 'trusty', architecture: 'amd64', service_provider: 'upstart' } end it { is_expected.to contain_class('kafka::params') } context 'on Debian' do describe 'kafka' do context 'defaults' do it { is_expected.to contain_group('kafka') } it { is_expected.to contain_user('kafka') } it { is_expected.to contain_file('/var/tmp/kafka') } it { is_expected.to contain_file('/opt/kafka-2.12-2.4.1') } it { is_expected.to contain_file('/opt/kafka') } it { is_expected.to contain_file('/opt/kafka/config') } it { is_expected.to contain_file('/var/log/kafka') } end end end context 'on Debian' do describe 'kafka' do context 'all (compatible) parameters' do let :params do { - version: '0.10.0.1', + kafka_version: '0.10.0.1', scala_version: '2.13', install_dir: '/usr/local/kafka', user_id: 9092, group_id: 9092, - user: 'mykafka', - group: 'mykafka', - install_java: false, + user_name: 'mykafka', + group_name: 'mykafka', + manage_java: false, config_dir: '/opt/kafka/custom_config', log_dir: '/var/log/custom_kafka' } end it { is_expected.to contain_group('mykafka').with(gid: 9092) } it { is_expected.to contain_user('mykafka').with(uid: 9092) } it { is_expected.to contain_file('/var/tmp/kafka') } it { is_expected.to contain_file('/opt/kafka') } it { is_expected.to contain_file('/usr/local/kafka') } it { is_expected.to contain_file('/opt/kafka/custom_config') } it { is_expected.to contain_file('/var/log/custom_kafka') } end end end end diff --git a/templates/init.erb b/templates/init.erb index 84459f0..3cdd431 100644 --- a/templates/init.erb +++ b/templates/init.erb @@ -1,153 +1,161 @@ #!/bin/sh # # Init file for Apache Kafka <%= @service_name.split(/-/)[1] and @service_name.split(/-/)[1].capitalize %> # <%- if @osfamily == 'Debian' -%> ### BEGIN INIT INFO -# Provides: <%- @service_name -%> +# Provides: <%= @service_name %> # Required-Start: <%= @service_requires.join(' ') %> # Required-Stop: # Default-Start: 2 3 4 5 # Default-Stop: 0 1 6 # X-Interactive: true # Short-Description: Apache Kafka is a distributed publish-subscribe messaging system ### END INIT INFO <%- else -%> # chkconfig: 35 85 15 # description: Apache Kafka is a distributed publish-subscribe messaging system # pidfile: /var/run/<%= @service_name -%>.pid <%- end -%> NAME=<%= @service_name %> <% @environment.sort.map do |k,v| -%> <% 
unless v.to_s.strip.empty? -%> export <%= k %>="<%= v %>" <% end -%> <% end -%> PID_FILE="/var/run/$NAME.pid" -KAFKA_USER=<%= @user %> +KAFKA_USER=<%= @user_name %> <%- case @service_name when 'kafka' -%> PGREP_PATTERN=kafka.Kafka DAEMON="<%= @bin_dir %>/kafka-server-start.sh" DAEMON_OPTS="<%= @config_dir %>/server.properties" <%- when 'kafka-consumer' -%> PGREP_PATTERN=kafka.tools.ConsoleConsumer DAEMON="<%= @bin_dir %>/kafka-console-consumer.sh" DAEMON_OPTS="<% @service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%> '<%= v.is_a?(Array) ? v.join(',') : v %>' <% end -%><% end -%>" <%- when 'kafka-mirror' -%> PGREP_PATTERN=kafka.tools.MirrorMaker DAEMON="<%= @bin_dir %>/kafka-run-class.sh" DAEMON_OPTS="kafka.tools.MirrorMaker --consumer.config <%= @config_dir %>/consumer.properties --producer.config <%= @config_dir %>/producer.properties <% @service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%> '<%= v.is_a?(Array) ? v.join(',') : v %>' <% end -%><% end -%>" <%- when 'kafka-producer' -%> PGREP_PATTERN=kafka.tools.ConsoleProducer DAEMON="<%= @bin_dir %>/kafka-console-producer.sh" DAEMON_OPTS="<% @service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%> '<%= v.is_a?(Array) ? v.join(',') : v %>' <% end -%><% end -%>" PRODUCER_INPUT="<%= @input %>" <%- end -%> if [ -f /etc/default/kafka ]; then . /etc/default/kafka fi start() { - ulimit -n 65536 + + <% if @limit_nofile -%> + ulimit -n <%= @limit_nofile %> + <% end -%> + + <% if @limit_core -%> + ulimit -c <%= @limit_core %> + <% end -%> + ulimit -s 10240 - ulimit -c unlimited + if [ -f "$PID_FILE" ]; then PID=`cat "$PID_FILE"` if [ `ps -p "$PID" -o pid= || echo 1` -eq `ps ax | grep -i "$PGREP_PATTERN" | grep -v grep | awk '{print $1}' || echo 2` ] ; then echo "$PID_FILE exists, process is already running" exit 0 fi echo "$PID_FILE exists but the process is not running. Deleting $PID_FILE and re-trying" rm -f -- "$PID_FILE" start return $? fi /bin/su "$KAFKA_USER" -c "KAFKA_JMX_OPTS=\"$KAFKA_JMX_OPTS\" $DAEMON $DAEMON_OPTS<%- if @service_name == 'kafka-producer' -%> $PRODUCER_INPUT<%- end -%> >/dev/null 2>&1 &" sleep 2 PID=`ps ax | grep -i "$PGREP_PATTERN" | grep -v grep | awk '{print $1}'` if [ -z "$PID" ]; then echo "$NAME could not be started" exit 1 fi echo "$PID" > "$PID_FILE"; echo "$NAME started" return 0 } stop() { if ! [ -f "$PID_FILE" ]; then echo -n "$PID_FILE does not exist" if PID=`ps ax | grep -i "$PGREP_PATTERN" | grep -v grep | awk '{print $1}'` ; then echo -n ", but process is running" echo "$PID" > "$PID_FILE" else echo -n ", and process is not running" return 1 fi fi PID=`cat $PID_FILE` kill $PID; rm -f -- "$PID_FILE"; # wait until the process is finished RETRIES=0 MAX_RETRIES=10 while [ ! -z `ps ax | grep -i "$PGREP_PATTERN" | grep -v grep | awk '{print $1}'` ]; do sleep 1 RETRIES=$((RETRIES+1)) if [ "$RETRIES" -ge "$MAX_RETRIES" ]; then echo "$NAME service: stop tried $MAX_RETRIES times but process $PID is still running" return 1 fi done echo "$NAME stopped" return 0 } status() { if ! [ -f "$PID_FILE" ]; then echo "$NAME stopped" exit 1 fi PID=`cat "$PID_FILE"` if ! 
[ `ps -p "$PID" -o pid= || echo 1` -eq `ps ax | grep -i "$PGREP_PATTERN" | grep -v grep | awk '{print $1}' || echo 2` ] ; then echo "$NAME stopped but pid file exists" exit 1 fi echo "$NAME running with pid $PID" exit 0 } case "$1" in status) status ;; start) echo "Starting daemon: $NAME" start ;; stop) echo "Stopping daemon: $NAME" stop ;; restart) echo "Restarting daemon: $NAME" stop sleep 2 start ;; *) echo "Usage: "$1" {status|start|stop|restart}" exit 1 esac exit 0 diff --git a/templates/unit.erb b/templates/unit.erb index cefb16d..b4abcef 100644 --- a/templates/unit.erb +++ b/templates/unit.erb @@ -1,45 +1,45 @@ [Unit] Description=Apache Kafka server (<%= (@service_name.split(/-/)[1] and @service_name.split(/-/)[1].capitalize) or 'broker' -%>) Documentation=http://kafka.apache.org/documentation.html <% if !@service_requires.empty? -%> After=<%= @service_requires.join(' ') %> Wants=<%= @service_requires.join(' ') %> <%- end -%> [Service] -User=<%= @user %> -Group=<%= @group %> +User=<%= @user_name %> +Group=<%= @group_name %> SyslogIdentifier=<%= @service_name %> <% @environment.sort.map do |k,v| -%> <% unless v.to_s.strip.empty? -%> Environment='<%= k %>=<%= v %>' <% end -%> <% end -%> <%- case @service_name when 'kafka' -%> Type=<%= @daemon_start ? 'forking' : 'simple' %> ExecStart=<%= @bin_dir %>/kafka-server-start.sh<%- if @daemon_start -%> -daemon<%- end -%> <%= @config_dir %>/server.properties <%- if @exec_stop -%> ExecStop=<%= @bin_dir %>/kafka-server-stop.sh <%- end -%> <%- if @timeout_stop -%> TimeoutStopSec=<%= @timeout_stop %> <%- end -%> <%- when 'kafka-consumer' -%> Type=simple ExecStart=<%= @bin_dir %>/kafka-console-consumer.sh <% @service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%> '<%= v.is_a?(Array) ? v.join(',') : v %>' <% end -%><% end %> <%- when 'kafka-mirror' -%> Type=simple ExecStart=<%= @bin_dir %>/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config <%= @config_dir %>/consumer.properties --producer.config <%= @config_dir %>/producer.properties <% @service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%> '<%= v.is_a?(Array) ? v.join(',') : v %>' <% end -%><% end %> <%- when 'kafka-producer' -%> Type=simple ExecStart=<%= @bin_dir %>/kafka-console-producer.sh <% @service_config.sort.each do |k,v| -%><% unless v.to_s.strip.empty? -%>--<%= k -%> '<%= v.is_a?(Array) ? v.join(',') : v %>' <% end -%><% end -%> <%= @input %> <%- end -%> <%- if @limit_nofile -%> LimitNOFILE=<%= @limit_nofile %> <%- end -%> <%- if @limit_core -%> LimitCORE=<%= @limit_core %> <%- end -%> [Install] WantedBy=multi-user.target
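
The renamed producer interface in this patch (kafka_version, manage_java, user_name, group_name, manage_service) can be hard to picture from the parameter diff alone, so here is a minimal usage sketch. The version, fifo redirection and client.id values are placeholders, not defaults taken from the module, and kafka::producer::service still refuses to run on systemd hosts, so a declaration like this only applies where an init.d/upstart service provider is in use.

  # Console producer using the renamed parameters. 'broker-list' and 'topic'
  # are mandatory service_config keys; the service class fails the catalog if
  # either is empty. The fifo redirection string is illustrative only.
  class { 'kafka::producer':
    kafka_version  => '2.4.1',
    manage_java    => false,
    user_name      => 'kafka',
    group_name     => 'kafka',
    manage_service => true,
    input          => '3<>/tmp/kafka-producer 0>&3',
    config         => {
      'client.id' => '0',
    },
    service_config => {
      'broker-list' => 'localhost:9092',
      'topic'       => 'demo',
    },
  }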
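The kafka::topic changes tighten the parameter types and correct the misspelled deprication() call to deprecation(); string values for replication_factor and partitions still work but now emit a deprecation warning. A sketch of a declaration that satisfies the new types, with an illustrative config hash (the retention and cleanup values are examples, not module defaults):

  # Integer replication_factor/partitions avoid the deprecation warning;
  # each config pair is passed to kafka-topics.sh as "--config key=value".
  kafka::topic { 'demo':
    ensure             => 'present',
    zookeeper          => 'localhost:2181',
    replication_factor => 1,
    partitions         => 4,
    config             => {
      'retention.ms'   => '3600000',
      'cleanup.policy' => 'delete',
    },
  }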
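The limit_nofile and limit_core parameters now reach both service templates: unit.erb renders them as LimitNOFILE/LimitCORE in the systemd unit, while init.erb emits the matching ulimit -n / ulimit -c calls in start(), replacing the previously hard-coded ulimits. A sketch using the values exercised by the broker spec:

  # On systemd hosts this yields LimitNOFILE=65536 / LimitCORE=infinity in
  # /etc/systemd/system/kafka.service; on init.d hosts the generated script
  # runs "ulimit -n 65536" and "ulimit -c infinity" before starting Kafka.
  class { 'kafka::broker':
    config       => { 'zookeeper.connect' => 'localhost:2181' },
    limit_nofile => '65536',
    limit_core   => 'infinity',
  }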