Page MenuHomeSoftware Heritage

server.pp
No OneTemporary

server.pp

# == Class kafka::server
# Sets up a Kafka Broker and ensures that it is running.
#
# == Parameters:
# $enabled - If false, Kafka Broker Server will not be
# started. Default: true.
#
# $brokers - Hash of Kafka Broker configs keyed by
# fqdn of each kafka broker node. This Hash
# should be of the form:
# { 'hostA' => { 'id' => 1, 'port' => 12345 }, 'hostB' => { 'id' => 2 }, ... }
# 'port' is optional, and will default to 9092.
#
# $log_dirs - Array of directories in which the broker will store its
# received log event data.
# (This is log.dir in server.properties).
# Default: [/var/spool/kafka]
#
# $zookeeper_hosts - Array of zookeeper hostname/IP(:port)s.
# Default: ['localhost:2181]
#
# $zookeeper_connection_timeout_ms - Timeout in ms for connecting to Zookeeper.
# Default: 6000
#
# $zookeeper_session_timeout_ms - Timeout in ms for session to Zookeeper.
# Default: 6000
#
# $zookeeper_chroot - Path in zookeeper in which to keep Kafka data.
# Default: undef (the root znode). Note that if you set
# this paramater, the znode will not be created for you.
# You must do so manually yourself. See the README
# for instructions on how to do so.
#
# $java_home - Value for the JAVA_HOME environment variable. Default: undef
#
# $java_opts - Extra Java options. Default: undef
#
# $classpath - Extra classpath entries. Default: undef
#
# $jmx_port - Port on which to expose JMX metrics. Default: 9999
#
# $heap_opts - Heap options to pass to JVM on startup. Default: undef
#
# $nofiles_ulimit - If set, the kafka user's ulimit will be set to this, and the
# kafka server will set this via ulimit -n. You
# will probably need to reboot for this to take effect.
#
# $auto_create_topics_enable - If autocreation of topics is allowed. Default: false
#
# $num_partitions - The default number of partitions per topic.
# Default: size($log_dirs)
#
# $default_replication_factor - The default replication factor for automatically created topics.
# Default: size(keys($brokers))
#
# $replica_lag_time_max_ms - If a follower hasn't sent any fetch requests for this window
# of time, the leader will remove the follower from ISR.
# Default: undef
#
# $replica_lag_max_messages - If a replica falls more than this many messages behind the leader,
# the leader will remove the follower from ISR.
# Default: undef
#
# $replica_socket_timeout_ms - The socket timeout for network requests to the leader for
# replicating data. Default: undef
#
# $replica_socket_receive_buffer_bytes - The socket receive buffer for network requests to the leader
# for replicating data. Default: undef
#
# $num_replica_fetchers - Number of threads used to replicate messages from leaders.
# Default: 1
#
# $replica_fetch_max_bytes - The number of bytes of messages to attempt to fetch for each
# partition in the fetch requests the replicas send to the leader.
# Default: 1024 * 1024
#
# $num_network_threads - The number of threads handling network
# requests. Default: 3
#
# $num_io_threads - The number of threads doing disk I/O. Default: size($log_dirs)
#
# $socket_send_buffer_bytes - The byte size of the send buffer (SO_SNDBUF)
# used by the socket server. Default: 1048576
#
# $socket_receive_buffer_bytes - The byte size of receive buffer (SO_RCVBUF)
# used by the socket server. Default: 1048576
#
# $socket_request_max_bytes - The maximum size of a request that the socket
# server will accept. Default: 104857600
#
# $log_flush_interval_messages - The number of messages to accept before
# forcing a flush of data to disk. Default 10000
#
# $log_flush_interval_ms - The maximum amount of time a message can sit
# in a log before we force a flush: Default 1000 (1 second)
#
# $log_retention_hours - The minimum age of a log file to be eligible
# for deletion. Default 1 week
#
# $log_retention_size - A size-based retention policy for logs.
# Default: undef (disabled)
#
# $log_segment_bytes - The maximum size of a log segment file. When
# this size is reached a new log segment will
# be created: Default 536870912 (512MB)
#
# $log_cleanup_interval_mins - The interval at which log segments are checked
# to see if they can be deleted according to the
# retention policies. Default: 1
#
# $log_cleanup_policy - The default policy for handling log tails.
# Can be either delete or dedupe. Default: delete
#
# $metrics_properties - Config hash of Kafka metrics property key => value pairs.
# Use this for configuring your own metrics reporter classes.
# Default: undef
#
# $kafka_log_file - File in which to write Kafka logs (not event message data).
# Default: /var/log/kafka/kafka.log
#
# $jvm_performance_opts - Value to use for KAFKA_JVM_PERFORMANCE_OPTS in /etc/default/kafka.
# This controls GC settings. Default: undef.
#
class kafka::server(
$enabled = true,
$brokers = $kafka::defaults::brokers,
$log_dirs = $kafka::defaults::log_dirs,
$zookeeper_hosts = $kafka::defaults::zookeeper_hosts,
$zookeeper_connection_timeout_ms = $kafka::defaults::zookeeper_connection_timeout_ms,
$zookeeper_session_timeout_ms = $kafka::defaults::zookeeper_session_timeout_ms,
$zookeeper_chroot = $kafka::defaults::zookeeper_chroot,
$java_home = $kafka::defaults::java_home,
$java_opts = $kafka::defaults::java_opts,
$classpath = $kafka::defaults::classpath,
$jmx_port = $kafka::defaults::jmx_port,
$heap_opts = $kafka::defaults::heap_opts,
$nofiles_ulimit = $kafka::defaults::nofiles_ulimit,
$auto_create_topics_enable = $kafka::defaults::auto_create_topics_enable,
$num_partitions = size($log_dirs),
$default_replication_factor = size(keys($brokers)),
$replica_lag_time_max_ms = $kafka::defaults::replica_lag_time_max_ms,
$replica_lag_max_messages = $kafka::defaults::replica_lag_max_messages,
$replica_socket_timeout_ms = $kafka::defaults::replica_socket_timeout_ms,
$replica_socket_receive_buffer_bytes = $kafka::defaults::replica_socket_receive_buffer_bytes,
$num_replica_fetchers = $kafka::defaults::num_replica_fetchers,
$replica_fetch_max_bytes = $kafka::defaults::replica_fetch_max_bytes,
$num_network_threads = $kafka::defaults::num_network_threads,
$num_io_threads = size($log_dirs),
$socket_send_buffer_bytes = $kafka::defaults::socket_send_buffer_bytes,
$socket_receive_buffer_bytes = $kafka::defaults::socket_receive_buffer_bytes,
$socket_request_max_bytes = $kafka::defaults::socket_request_max_bytes,
$log_flush_interval_messages = $kafka::defaults::log_flush_interval_messages,
$log_flush_interval_ms = $kafka::defaults::log_flush_interval_ms,
$log_retention_hours = $kafka::defaults::log_retention_hours,
$log_retention_bytes = $kafka::defaults::log_retention_bytes,
$log_segment_bytes = $kafka::defaults::log_segment_bytes,
$log_cleanup_interval_mins = $kafka::defaults::log_cleanup_interval_mins,
$log_cleanup_policy = $kafka::defaults::log_cleanup_policy,
$metrics_properties = $kafka::defaults::metrics_properties,
$kafka_log_file = $kafka::defaults::kafka_log_file,
$jvm_performance_opts = $kafka::defaults::jvm_performance_opts,
$server_properties_template = $kafka::defaults::server_properties_template,
$default_template = $kafka::defaults::server_default_template,
$log4j_properties_template = $kafka::defaults::log4j_properties_template
) inherits kafka::defaults
{
# Kafka class must be included before kafka::server.
# Using 'require' here rather than an explicit class dependency
# so that this class can be used without having to manually
# include the base kafka class. This is for elegance only.
# You'd only need to manually include the base kafka class if
# you need to explicitly set the version of the Kafka package
# you want installed.
require ::kafka
package { 'kafka-server':
ensure => $::kafka::version
}
# Get this broker's id and port out of the $kafka::hosts configuration hash
$broker_id = $brokers[$::fqdn]['id']
# Using a conditional assignment selector with a
# Hash value results in a puppet syntax error.
# Using an if/else instead.
if ($brokers[$::fqdn]['port']) {
$broker_port = $brokers[$::fqdn]['port']
}
else {
$broker_port = $kafka::defaults::default_broker_port
}
# Render out Kafka Broker config files.
file { '/etc/default/kafka':
content => template($default_template),
require => Package['kafka-server'],
}
file { '/etc/kafka/server.properties':
content => template($server_properties_template),
require => Package['kafka-server'],
}
# This is the message data directory,
# not to be confused with the $kafka_log_file,
# which contains daemon process logs.
file { $log_dirs:
ensure => 'directory',
owner => 'kafka',
group => 'kafka',
mode => '0755',
require => Package['kafka-server'],
}
# log4j configuration for Kafka daemon
# process logs (this uses $kafka_log_dir).
file { '/etc/kafka/log4j.properties':
content => template($log4j_properties_template),
require => Package['kafka-server'],
}
# Start the Kafka server.
# We don't want to subscribe to the config files here.
# It will be better to manually restart Kafka when
# the config files changes.
$kafka_ensure = $enabled ? {
false => 'stopped',
default => 'running',
}
service { 'kafka':
ensure => 'running',
require => [
File['/etc/kafka/server.properties'],
File['/etc/kafka/log4j.properties'],
File['/etc/default/kafka'],
File[$log_dirs],
],
hasrestart => true,
hasstatus => true,
}
}

File Metadata

Mime Type
text/plain
Expires
Sat, Jun 21, 6:39 PM (1 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3408399

Event Timeline