Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9123914
server.pp
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Subscribers
None
server.pp
View Options
# == Class kafka::server
# Sets up a Kafka Broker Server and ensures that it is running.
#
# == Parameters:
# $enabled - If false, Kafka Broker Server will not be
# started. Default: true.
#
# $log_dir - Directory in which the broker will store its
# received log event data.
# (This is log.dir in server.properties).
# Default: /var/spool/kafka
#
# $jmx_port - Port on which to expose JMX metrics. Default: 9999
#
# $num_partitions - The number of logical event partitions per
# topic per server. Default: 1
#
# $num_network_threads - The number of threads handling network
# requests. Default: 2
#
# $num_io_threads - The number of threads doing disk I/O. Default: 2
#
# $socket_send_buffer_bytes - The byte size of the send buffer (SO_SNDBUF)
# used by the socket server. Default: 1048576
#
# $socket_receive_buffer_bytes - The byte size of receive buffer (SO_RCVBUF)
# used by the socket server. Default: 1048576
#
# $socket_request_max_bytes - The maximum size of a request that the socket
# server will accept. Default: 104857600
#
# $log_flush_interval_messages - The number of messages to accept before
# forcing a flush of data to disk. Default 10000
#
# $log_flush_interval_ms - The maximum amount of time a message can sit
# in a log before we force a flush: Default 1000 (1 second)
#
# $log_retention_hours - The minimum age of a log file to be eligible
# for deletion. Default 1 week
#
# $log_retention_size - A size-based retention policy for logs.
# Default: undef (disabled)
#
# $log_segment_bytes - The maximum size of a log segment file. When
# this size is reached a new log segment will
# be created: Default 536870912 (512MB)
#
# $log_cleanup_interval_mins - The interval at which log segments are checked
# to see if they can be deleted according to the
# retention policies. Default: 1
#
# $log_cleanup_policy - The default policy for handling log tails.
# Can be either delete or dedupe. Default: delete
#
# $metrics_dir - Directory in which to store metrics CSVs. Default: undef (metrics disabled)
#
class
kafka
::
server
(
$enabled
=
true
,
$log_dir
=
$kafka::defaults::log_dir,
$jmx_port
=
$kafka::defaults::jmx_port,
$num_partitions
=
$kafka::defaults::num_partitions,
$num_network_threads
=
$kafka::defaults::num_network_threads,
$num_io_threads
=
$kafka::defaults::num_io_threads,
$socket_send_buffer_bytes
=
$kafka::defaults::socket_send_buffer_bytes,
$socket_receive_buffer_bytes
=
$kafka::defaults::socket_receive_buffer_bytes,
$socket_request_max_bytes
=
$kafka::defaults::socket_request_max_bytes,
$log_flush_interval_messages
=
$kafka::defaults::log_flush_interval_messages,
$log_flush_interval_ms
=
$kafka::defaults::log_flush_interval_ms,
$log_retention_hours
=
$kafka::defaults::log_retention_hours,
$log_retention_bytes
=
$kafka::defaults::log_retention_bytes,
$log_segment_bytes
=
$kafka::defaults::log_segment_bytes,
$log_cleanup_interval_mins
=
$kafka::defaults::log_cleanup_interval_mins,
$log_cleanup_policy
=
$kafka::defaults::log_cleanup_policy,
$metrics_dir
=
$kafka::defaults::metrics_dir,
$server_properties_template
=
$kafka::defaults::server_properties_template,
$default_template
=
$kafka::defaults::default_template
)
inherits
kafka
::
defaults
{
# kafka class must be included before kafka::servver
Class
[
'kafka'
]
->
Class
[
'kafka::server'
]
# define local variables from kafka class for use in ERb template.
$zookeeper_hosts
=
$kafka::zookeeper_hosts
$zookeeper_connection_timeout_ms
=
$kafka::zookeeper_connection_timeout_ms
$zookeeper_chroot
=
$kafka::zookeeper_chroot
# Get this broker's id and port out of the $kafka::hosts configuration hash
$broker_id
=
$kafka::hosts[$::fqdn]
[
'id'
]
# Using a conditional assignment selector with a
# Hash value results in a puppet syntax error.
# Using an if/else instead.
if
(
$kafka::hosts[$::fqdn]
[
'port'
])
{
$broker_port
=
$kafka::hosts[$::fqdn]
[
'port'
]
}
else
{
$broker_port
=
$kafka::defaults::default_broker_port
}
file
{
'/etc/default/kafka'
:
content
=>
template
(
$default_template)
}
file
{
'/etc/kafka/server.properties'
:
content
=>
template
(
$server_properties_template),
}
file
{
$log_dir:
ensure
=>
'directory'
,
owner
=>
'kafka'
,
group
=>
'kafka'
,
mode
=>
'0755'
,
}
# If we are using Kafka Metrics Reporter, ensure
# that the $metrics_dir exists.
if
(
$metrics_dir
and
!
defined
(
File
[
$metrics_dir]))
{
file
{
$metrics_dir:
ensure
=>
'directory'
,
owner
=>
'kafka'
,
group
=>
'kafka'
,
mode
=>
'0755'
,
}
}
# Start the Kafka server.
# We don't want to subscribe to the config files here.
# It will be better to manually restart Kafka when
# the config files changes.
$kafka_ensure
=
$enabled
?
{
false
=>
'stopped'
,
default
=>
'running'
,
}
service
{
'kafka'
:
ensure
=>
$kafka_ensure,
require
=>
[
File
[
'/etc/kafka/server.properties'
],
File
[
'/etc/default/kafka'
],
File
[
$log_dir]
],
}
}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Jun 21, 6:21 PM (2 w, 3 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3418862
Attached To
rSPKFK Puppet Kafka module
Event Timeline
Log In to Comment