# == Define kafka::mirror
# Sets up a Kafka MirrorMaker instance and ensures that it is running.
# You must declare your kafka::mirror::consumers before this define.
#
# NOTE: This does not work without systemd. Make sure you are including
# this define on a node that supports systemd.
#
# TODO: support ensure => 'absent'
#
# == Usage
#
# # Mirror the 'main' and 'secondary' Kafka clusters
# # to the 'aggregate' Kafka cluster.
# kafka::mirror::consumer { 'main':
#     mirror_name   => 'aggregate',
#     zookeeper_url => 'zk:2181/kafka/main',
# }
# kafka::mirror::consumer { 'secondary':
#     mirror_name   => 'aggregate',
#     zookeeper_url => 'zk:2181/kafka/secondary',
# }
#
# kafka::mirror { 'aggregate':
#     destination_brokers => ['ka01:9092','ka02:9092'],
#     ...,
# }
#
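# # Sketch only: the same 'aggregate' mirror declared with a blacklist
# # instead of a whitelist. Because $whitelist defaults to '.*' and
# # exactly one of the two may be set, $whitelist has to be overridden.
# # This assumes false is used for that (passing undef would just select
# # the default again) and that the templates treat a false $whitelist
# # as unset. 'internal\..*' is a hypothetical topic pattern.
# kafka::mirror { 'aggregate':
#     destination_brokers => ['ka01:9092','ka02:9092'],
#     whitelist           => false,
#     blacklist           => 'internal\..*',
# }
#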
# == Parameters
#
# $destination_brokers          - Array of Kafka broker hosts in your
#                                 destination cluster. These brokers are
#                                 used to bootstrap the producers'
#                                 configs and metadata.
#
# $enabled                      - If false, the Kafka MirrorMaker service
#                                 will not be started. Default: true
#
# $whitelist                    - Java regex matching topics to mirror.
#                                 Exactly one of $whitelist and
#                                 $blacklist must be set.
#                                 Default: '.*'
#
# $blacklist                    - Java regex matching topics to not mirror.
#                                 Exactly one of $whitelist and
#                                 $blacklist must be set.
#                                 Default: undef
#
# $num_producers                - Number of producer threads. Default: 1
#
# $num_streams                  - Number of consumer threads. Default: 1
#
# $queue_size                   - Size of the intermediate
#                                 consumer -> producer queue. Note that
#                                 this is different from
#                                 $queue_buffering_max_messages, which is
#                                 the queue size of messages in async
#                                 producers. Default: 10000
#
# $heap_opts                    - Heap options to pass to the JVM on
#                                 startup. Default: undef
#
# $request_required_acks        - Required number of acks for a produce
#                                 request. Default: -1 (all replicas)
#
# $producer_type                - sync or async. Default: async
#
# $compression_codec            - none, gzip, or snappy. Default: snappy
#
# $batch_num_messages           - If async producer, the number of
#                                 messages to batch together in a single
#                                 produce request. Default: 200
#
# $queue_buffering_max_ms       - Maximum time to buffer data when using
#                                 async mode. For example, a setting of
#                                 100 will try to batch together 100ms of
#                                 messages to send at once. Default: 5000
#
# $queue_buffering_max_messages - The maximum number of unsent messages
#                                 that can be queued up in the producer
#                                 when using async mode before either the
#                                 producer must be blocked or data must
#                                 be dropped. Default: 10000
#
# $queue_enqueue_timeout_ms     - The amount of time to block before
#                                 dropping messages when running in async
#                                 mode and the buffer has reached
#                                 queue.buffering.max.messages. If set to
#                                 0, events will be enqueued immediately
#                                 or dropped if the queue is full (the
#                                 producer send call will never block).
#                                 If set to -1, the producer will block
#                                 indefinitely and never willingly drop a
#                                 send. Default: -1
#
# $jmx_port                     - Port on which to expose MirrorMaker
#                                 JMX metrics. Default: 9998
#
# $producer_properties_template - Template used to render
#                                 producer.properties. Default:
#                                 'kafka/mirror/producer.properties.erb'
#
# $systemd_service_template     - Template used to render the systemd
#                                 service unit file. Default:
#                                 'kafka/mirror/kafka-mirror.systemd.erb'
#
# $default_template             - Template used to render
#                                 /etc/default/kafka-mirror-$title.
#                                 Default:
#                                 'kafka/mirror/kafka-mirror.default.erb'
#
# $log4j_properties_template    - Template used to render
#                                 log4j.properties. Default:
#                                 'kafka/log4j.properties.erb'
#
define kafka::mirror(
    $destination_brokers,
    $enabled                      = true,
    $whitelist                    = '.*',
    $blacklist                    = undef,
    $num_producers                = 1,
    $num_streams                  = 1,
    $queue_size                   = 10000,
    $heap_opts                    = undef,

    # Producer Settings
    $request_required_acks        = -1,
    $producer_type                = 'async',
    $compression_codec            = 'snappy',

    # Async Producer Settings
    $batch_num_messages           = 200,
    $queue_buffering_max_ms       = 5000,
    $queue_buffering_max_messages = 10000,
    $queue_enqueue_timeout_ms     = -1,

    $jmx_port                     = 9998,

    $producer_properties_template = 'kafka/mirror/producer.properties.erb',
    $systemd_service_template     = 'kafka/mirror/kafka-mirror.systemd.erb',
    $default_template             = 'kafka/mirror/kafka-mirror.default.erb',
    $log4j_properties_template    = 'kafka/log4j.properties.erb',
)
{
    if (!$whitelist and !$blacklist) or ($whitelist and $blacklist) {
        fail('Must set only one of $whitelist or $blacklist.')
    }

    include kafka::mirror::init

    $mirror_name = $title

    file { "/etc/default/kafka-mirror-${mirror_name}":
        content => template($default_template),
        require => Package['kafka-mirror'],
    }

    file { "/etc/kafka/mirror/${mirror_name}":
        ensure  => 'directory',
        recurse => true,
        purge   => true,
    }

    # Log to a custom log file for this MirrorMaker instance.
    $kafka_log_file = "/var/log/kafka/kafka-mirror-${mirror_name}.log"
    file { "/etc/kafka/mirror/${mirror_name}/log4j.properties":
        content => template($log4j_properties_template),
    }

    file { "/etc/kafka/mirror/${mirror_name}/producer.properties":
        content => template($producer_properties_template),
        require => Package['kafka-mirror'],
    }

    # Realize all consumer properties files for this MirrorMaker instance.
    # --consumer.configs will be passed a wildcard matching all of the
    # files in /etc/kafka/mirror/$mirror_name/consumer*.properties
    File <| tag == "kafka-mirror-${mirror_name}-consumer" |>

    # Render a systemd service unit file.
    file { "/lib/systemd/system/kafka-mirror-${mirror_name}.service":
        content => template($systemd_service_template),
        require => Package['kafka-mirror'],
    }

    # systemd needs a reload to pick up changes to this file.
    exec { "systemd-reload-for-kafka-mirror-${mirror_name}":
        command     => '/bin/systemctl daemon-reload',
        refreshonly => true,
        subscribe   => File["/lib/systemd/system/kafka-mirror-${mirror_name}.service"],
    }

    # Start the Kafka MirrorMaker daemon.
    # We don't want to subscribe to the config files here.
    # It is better to restart Kafka MirrorMaker manually
    # when the config files change.
    $service_ensure = $enabled ? {
        false   => 'stopped',
        default => 'running',
    }

    service { "kafka-mirror-${mirror_name}":
        ensure     => $service_ensure,
        require    => [
            File["/etc/kafka/mirror/${mirror_name}/producer.properties"],
            Exec["systemd-reload-for-kafka-mirror-${mirror_name}"],
        ],
        hasrestart => true,
        hasstatus  => true,
    }
}