mirror.pp
# == Define kafka::mirror
# Sets up a Kafka MirrorMaker instance and ensures that it is running.
# You must declare your kafka::mirror::consumers before this define.
#
# NOTE: This does not work without systemd. Make sure you are including
# this define on a node that supports systemd.
#
# TODO: support ensure => 'absent'
#
# == Usage
#
#   # Mirror the 'main' and 'secondary' Kafka clusters
#   # to the 'aggregate' Kafka cluster.
#   kafka::mirror::consumer { 'main':
#       mirror_name   => 'aggregate',
#       zookeeper_url => 'zk:2181/kafka/main',
#   }
#   kafka::mirror::consumer { 'secondary':
#       mirror_name   => 'aggregate',
#       zookeeper_url => 'zk:2181/kafka/secondary',
#   }
#
#   kafka::mirror { 'aggregate':
#       destination_brokers => ['ka01:9092','ka02:9092'],
#       ...,
#   }
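#
#   # A sketch of a blacklist-based mirror (host and topic names here
#   # are illustrative). $whitelist defaults to '.*', so it must be
#   # unset explicitly, since exactly one of the two may be given:
#   kafka::mirror { 'backup':
#       destination_brokers => ['kb01:9092'],
#       whitelist           => undef,
#       blacklist           => '__.*',
#   }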
#
# == Parameters
#
# $destination_brokers          - Array of Kafka broker hosts in your
#                                 destination cluster. These brokers
#                                 will be used for bootstrapping the
#                                 producer's configs and metadata.
#
# $enabled                      - If false, the Kafka MirrorMaker service
#                                 will not be started. Default: true
#
# $whitelist                    - Java regex matching topics to mirror.
#                                 You must set exactly one of this or
#                                 $blacklist. Default: '.*'
#
# $blacklist                    - Java regex matching topics to not
#                                 mirror. You must set exactly one of
#                                 this or $whitelist. Default: undef
#
# $num_producers                - Number of producer threads. Default: 1
#
# $num_streams                  - Number of consumer threads. Default: 1
#
# $queue_size                   - Size of the intermediate consumer ->
#                                 producer queue. Note that this is
#                                 different from
#                                 $queue_buffering_max_messages, which is
#                                 the queue size of messages in async
#                                 producers. Default: 10000
#
# $heap_opts                    - Heap options to pass to the JVM on
#                                 startup. Default: undef
#
# $request_required_acks        - Required number of acks for a produce
#                                 request. Default: -1 (all replicas)
#
# $producer_type                - sync or async. Default: async
#
# $compression_codec            - none, gzip, or snappy. Default: snappy
#
# $batch_num_messages           - If using the async producer, the number
#                                 of messages to batch together in a
#                                 single produce request. Default: 200
#
# $queue_buffering_max_ms       - Maximum time to buffer data when using
#                                 async mode. For example, a setting of
#                                 100 will try to batch together 100ms of
#                                 messages to send at once. Default: 5000
#
# $queue_buffering_max_messages - The maximum number of unsent messages
#                                 that can be queued up by the producer
#                                 when using async mode before either the
#                                 producer must be blocked or data must
#                                 be dropped. Default: 10000
#
# $queue_enqueue_timeout_ms     - The amount of time to block before
#                                 dropping messages when running in async
#                                 mode and the buffer has reached
#                                 queue.buffering.max.messages. If set to
#                                 0, events will be enqueued immediately
#                                 or dropped if the queue is full (the
#                                 producer send call will never block).
#                                 If set to -1, the producer will block
#                                 indefinitely and never willingly drop a
#                                 send. Default: -1
#
# $jmx_port                     - Port on which to expose MirrorMaker
#                                 JMX metrics. Default: 9998
#
define kafka::mirror(
    $destination_brokers,
    $enabled                      = true,
    $whitelist                    = '.*',
    $blacklist                    = undef,
    $num_producers                = 1,
    $num_streams                  = 1,
    $queue_size                   = 10000,
    $heap_opts                    = undef,
    # Producer Settings
    $request_required_acks        = -1,
    $producer_type                = 'async',
    $compression_codec            = 'snappy',
    # Async Producer Settings
    $batch_num_messages           = 200,
    $queue_buffering_max_ms       = 5000,
    $queue_buffering_max_messages = 10000,
    $queue_enqueue_timeout_ms     = -1,
    $jmx_port                     = 9998,
    $producer_properties_template = 'kafka/mirror/producer.properties.erb',
    $systemd_service_template     = 'kafka/mirror/kafka-mirror.systemd.erb',
    $default_template             = 'kafka/mirror/kafka-mirror.default.erb',
    $log4j_properties_template    = 'kafka/log4j.properties.erb',
)
{
    if (!$whitelist and !$blacklist) or ($whitelist and $blacklist) {
        fail('Must set exactly one of $whitelist or $blacklist.')
    }

    include kafka::mirror::init

    $mirror_name = $title

    file { "/etc/default/kafka-mirror-${mirror_name}":
        content => template($default_template),
        require => Package['kafka-mirror'],
    }
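
    # Config directory for this MirrorMaker instance. With recurse and
    # purge set, Puppet removes any unmanaged files here, so stale
    # consumer*.properties from removed consumers do not linger.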
file { "/etc/kafka/mirror/${mirror_name}":
ensure => 'directory',
recurse => true,
purge => true,
}

    # Log to a custom log file for this MirrorMaker instance.
    $kafka_log_file = "/var/log/kafka/kafka-mirror-${mirror_name}.log"

    file { "/etc/kafka/mirror/${mirror_name}/log4j.properties":
        content => template($log4j_properties_template),
    }

    file { "/etc/kafka/mirror/${mirror_name}/producer.properties":
        content => template($producer_properties_template),
        require => Package['kafka-mirror'],
    }

    # Realize all consumer properties files for this MirrorMaker instance.
    # --consumer.config will be passed a wildcard matching all of the
    # files in /etc/kafka/mirror/${mirror_name}/consumer*.properties.
    File <| tag == "kafka-mirror-${mirror_name}-consumer" |>

    # Render a systemd service unit file.
    file { "/lib/systemd/system/kafka-mirror-${mirror_name}.service":
        content => template($systemd_service_template),
        require => Package['kafka-mirror'],
    }

    # systemd needs a daemon-reload to pick up changes to this file.
    exec { "systemd-reload-for-kafka-mirror-${mirror_name}":
        command     => '/bin/systemctl daemon-reload',
        refreshonly => true,
        subscribe   => File["/lib/systemd/system/kafka-mirror-${mirror_name}.service"],
    }

    # Start the Kafka MirrorMaker daemon.
    # We don't subscribe to the config files here: it is better to
    # restart Kafka MirrorMaker manually when the config files change.
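    # (For a mirror named 'aggregate', that would be:
    # `systemctl restart kafka-mirror-aggregate`.)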
    $service_ensure = $enabled ? {
        false   => 'stopped',
        default => 'running',
    }
service { "kafka-mirror-${mirror_name}":
ensure => $service_ensure,
require => [
File["/etc/kafka/mirror/${mirror_name}/producer.properties"],
Exec["systemd-reload-for-kafka-mirror-${mirror_name}"],
],
hasrestart => true,
hasstatus => true,
}
}
