server.pp

# == Class kafka::server
# Sets up a Kafka Broker Server and ensures that it is running.
#
# == Parameters:
# $enabled                      - If false, the Kafka Broker Server will not
#                                 be started. Default: true
#
# $log_dir                      - Directory in which the broker will store its
#                                 received log event data.
#                                 (This is log.dir in server.properties.)
#                                 Default: /var/spool/kafka
#
# $jmx_port                     - Port on which to expose JMX metrics.
#                                 Default: 9999
#
# $num_partitions               - The number of logical event partitions per
#                                 topic per server. Default: 1
#
# $num_network_threads          - The number of threads handling network
#                                 requests. Default: 2
#
# $num_io_threads               - The number of threads doing disk I/O.
#                                 Default: 2
#
# $socket_send_buffer_bytes     - The byte size of the send buffer (SO_SNDBUF)
#                                 used by the socket server. Default: 1048576
#
# $socket_receive_buffer_bytes  - The byte size of the receive buffer
#                                 (SO_RCVBUF) used by the socket server.
#                                 Default: 1048576
#
# $socket_request_max_bytes     - The maximum size of a request that the
#                                 socket server will accept.
#                                 Default: 104857600
#
# $log_flush_interval_messages  - The number of messages to accept before
#                                 forcing a flush of data to disk.
#                                 Default: 10000
#
# $log_flush_interval_ms        - The maximum amount of time a message can
#                                 sit in a log before we force a flush.
#                                 Default: 1000 (1 second)
#
# $log_retention_hours          - The minimum age of a log file to be eligible
#                                 for deletion. Default: 1 week
#
# $log_retention_bytes          - A size-based retention policy for logs.
#                                 Default: undef (disabled)
#
# $log_segment_bytes            - The maximum size of a log segment file.
#                                 When this size is reached, a new log segment
#                                 will be created. Default: 536870912 (512 MB)
#
# $log_cleanup_interval_mins    - The interval at which log segments are
#                                 checked to see if they can be deleted
#                                 according to the retention policies.
#                                 Default: 1
#
# $log_cleanup_policy           - The default policy for handling log tails.
#                                 Can be either delete or dedupe.
#                                 Default: delete
#
# $metrics_dir                  - Directory in which to store metrics CSVs.
#                                 Default: undef (metrics disabled)
class kafka::server(
    $enabled                     = true,
    $log_dir                     = $kafka::defaults::log_dir,
    $jmx_port                    = $kafka::defaults::jmx_port,
    $num_partitions              = $kafka::defaults::num_partitions,
    $num_network_threads         = $kafka::defaults::num_network_threads,
    $num_io_threads              = $kafka::defaults::num_io_threads,
    $socket_send_buffer_bytes    = $kafka::defaults::socket_send_buffer_bytes,
    $socket_receive_buffer_bytes = $kafka::defaults::socket_receive_buffer_bytes,
    $socket_request_max_bytes    = $kafka::defaults::socket_request_max_bytes,
    $log_flush_interval_messages = $kafka::defaults::log_flush_interval_messages,
    $log_flush_interval_ms       = $kafka::defaults::log_flush_interval_ms,
    $log_retention_hours         = $kafka::defaults::log_retention_hours,
    $log_retention_bytes         = $kafka::defaults::log_retention_bytes,
    $log_segment_bytes           = $kafka::defaults::log_segment_bytes,
    $log_cleanup_interval_mins   = $kafka::defaults::log_cleanup_interval_mins,
    $log_cleanup_policy          = $kafka::defaults::log_cleanup_policy,
    $metrics_dir                 = $kafka::defaults::metrics_dir,
    $server_properties_template  = $kafka::defaults::server_properties_template,
    $default_template            = $kafka::defaults::default_template
) inherits kafka::defaults
{
    # The kafka class must be included before kafka::server.
    Class['kafka'] -> Class['kafka::server']

    # Define local variables from the kafka class for use in the ERb template.
    $zookeeper_hosts                 = $kafka::zookeeper_hosts
    $zookeeper_connection_timeout_ms = $kafka::zookeeper_connection_timeout_ms
    $zookeeper_chroot                = $kafka::zookeeper_chroot

    # Get this broker's id and port out of the $kafka::hosts configuration hash.
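    # The expected shape of $kafka::hosts is a hash keyed by fqdn, e.g.
    # (hostnames below are illustrative assumptions; 'id' is required and
    # 'port' is optional, falling back to the default broker port):
    #
    #   {
    #       'kafka1.example.org' => { 'id' => 1, 'port' => 9092 },
    #       'kafka2.example.org' => { 'id' => 2 },
    #   }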
    $broker_id = $kafka::hosts[$::fqdn]['id']

    # Using a conditional assignment selector with a Hash value
    # results in a puppet syntax error.  Using an if/else instead.
    if ($kafka::hosts[$::fqdn]['port']) {
        $broker_port = $kafka::hosts[$::fqdn]['port']
    }
    else {
        $broker_port = $kafka::defaults::default_broker_port
    }
    file { '/etc/default/kafka':
        content => template($default_template),
    }

    file { '/etc/kafka/server.properties':
        content => template($server_properties_template),
    }
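    # For reference, the rendered server.properties is expected to contain
    # broker settings along these lines (an illustrative sketch only; the
    # actual keys depend on the ERb template passed in
    # $server_properties_template):
    #
    #   broker.id=<broker_id>
    #   port=<broker_port>
    #   log.dir=<log_dir>
    #   num.partitions=<num_partitions>
    #   zookeeper.connect=<zookeeper_hosts><zookeeper_chroot>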
    file { $log_dir:
        ensure => 'directory',
        owner  => 'kafka',
        group  => 'kafka',
        mode   => '0755',
    }
    # If we are using Kafka Metrics Reporter, ensure
    # that the $metrics_dir exists.
    if ($metrics_dir and !defined(File[$metrics_dir])) {
        file { $metrics_dir:
            ensure => 'directory',
            owner  => 'kafka',
            group  => 'kafka',
            mode   => '0755',
        }
    }
    # Start the Kafka server.
    # We deliberately do not subscribe to the config files here:
    # it is better to restart Kafka manually when the config files change.
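    # (If automatic restarts were wanted instead, the kafka service below
    # could subscribe to the config file resources, e.g.
    #   subscribe => [File['/etc/kafka/server.properties'], File['/etc/default/kafka']],
    # This class intentionally leaves restarts to the operator.)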
    $kafka_ensure = $enabled ? {
        false   => 'stopped',
        default => 'running',
    }

    service { 'kafka':
        ensure  => $kafka_ensure,
        require => [
            File['/etc/kafka/server.properties'],
            File['/etc/default/kafka'],
            File[$log_dir],
        ],
    }
}
