# Note: This file is managed by Puppet. <% # @hosts is a hash of the form # { # 'hostA' => { 'id' => 1, 'port' => 12345 }, # 'hostB' => { 'id' => 2 } # } # 'port' is not required to be specified in the hash, # so we fetch it with a default of 9092 # # Sort hosts and loop through the resulting array # to get a list of ['hostname1:port1', 'hostname2:port2', ...] # for the broker.list setting. broker_list=[] @brokers.sort.each do |broker_config| hostname = broker_config[0] port = broker_config[1].fetch('port', 9092).to_s broker_list += ["#{hostname}:#{port}"] end -%> # # see kafka.producer.ProducerConfig for more details # ############################# Producer Basics ############################# # list of brokers used for bootstrapping # format: host1:port1,host2:port2 ... broker.list=<%= broker_list.join(',') %> # specifies whether the messages are sent asynchronously (async) or synchronously (sync) producer.type=<%= @producer_type %> # name of the partitioner class for partitioning events; default partition spreads data randomly #partitioner.class= # specify the compression codec for all data generated: none , gzip, snappy. # the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectivally compression.codec=none # message encoder serializer.class=kafka.serializer.StringEncoder # allow topic level compression #compressed.topics= ############################# Async Producer ############################# # maximum time, in milliseconds, for buffering data on the producer queue #queue.buffering.max.ms= # the maximum size of the blocking queue for buffering on the producer #queue.buffering.max.messages= # Timeout for event enqueue: # 0: events will be enqueued immediately or dropped if the queue is full # -ve: enqueue will block indefinitely if the queue is full # +ve: enqueue will block up to this many milliseconds if the queue is full #queue.enqueue.timeout.ms= # the number of messages batched at the producer batch.num.messages=<%= @producer_batch_num_messages %>