public class ConsumerConfig extends AbstractConfig {
private static final ConfigDef CONFIG;
/*
* NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
* THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
*/
/**
* <code>group.id</code>
*/
public static final String GROUP_ID_CONFIG = "group.id";
private static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";
/**
* <code>session.timeout.ms</code>
*/
public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures when using Kafka's group management facilities.";
/**
* <code>bootstrap.servers</code>
*/
public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
/**
* <code>enable.auto.commit</code>
*/
public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
private static final String ENABLE_AUTO_COMMIT_DOC = "If true the consumer's offset will be periodically committed in the background.";
/**
* <code>auto.commit.interval.ms</code>
*/
public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";
private static final String AUTO_COMMIT_INTERVAL_MS_DOC = "The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if <code>enable.auto.commit</code> is set to <code>true</code>.";
/**
* <code>partition.assignment.strategy</code>
*/
public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The friendly name of the partition assignment strategy that the server will use to distribute partition ownership amongst consumer instances when group management is used";
/**
* <code>auto.offset.reset</code>
*/
public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
// Documentation text for auto.offset.reset. It embeds an HTML list, so every <li> item is explicitly closed.
private static final String AUTO_OFFSET_RESET_DOC = "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): <ul><li>smallest: automatically reset the offset to the smallest offset</li><li>largest: automatically reset the offset to the largest offset</li><li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything else: throw exception to the consumer.</li></ul>";
/**
* <code>fetch.min.bytes</code>
*/
public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.";
/**
* <code>fetch.max.wait.ms</code>
*/
public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes.";
/** <code>metadata.max.age.ms</code> */
public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
/**
* <code>max.partition.fetch.bytes</code>
*/
public static final String MAX_PARTITION_FETCH_BYTES_CONFIG = "max.partition.fetch.bytes";
private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be <code>#partitions * max.partition.fetch.bytes</code>. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition.";
/** <code>send.buffer.bytes</code> */
public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
/** <code>receive.buffer.bytes</code> */
public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
/**
* <code>client.id</code>
*/
public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
/**
* <code>reconnect.backoff.ms</code>
*/
public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
/**
* <code>retry.backoff.ms</code>
*/
public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
/**
* <code>metrics.sample.window.ms</code>
*/
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
/**
* <code>metrics.num.samples</code>
*/
public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
/**
* <code>metric.reporters</code>
*/
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
/**
* <code>rebalance.callback.class</code>
*/
public static final String CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG = "rebalance.callback.class";
private static final String CONSUMER_REBALANCE_CALLBACK_CLASS_DOC = "A user-provided callback to execute when partition assignments change.";
/**
* <code>check.crcs</code>
*/
public static final String CHECK_CRCS_CONFIG = "check.crcs";
private static final String CHECK_CRCS_DOC = "Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.";
/** <code>key.deserializer</code> */
public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
private static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>Deserializer</code> interface.";
/** <code>value.deserializer</code> */
public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>Deserializer</code> interface.";
/** <code>connections.max.idle.ms</code> */
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
+ case _ => throw new InvalidConfigException("Wrong value " + strategy + " of partition.assignment.strategy in consumer config; " +
+ "Valid values are 'range' and 'roundrobin'")
+ }
+ }
}
class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
import ConsumerConfig._
/**
 * Auxiliary constructor taking plain java Properties.
 *
 * Wraps `originalProps` in a VerifiableProperties and delegates to the private primary
 * constructor. `props.verify()` is called only after that delegation returns, i.e. after
 * the primary constructor has run.
 * NOTE(review): presumably verify() reports properties that were never read - confirm
 * against VerifiableProperties.
 *
 * @param originalProps the raw consumer properties supplied by the caller
 */
def this(originalProps: Properties) {
this(new VerifiableProperties(originalProps))
props.verify()
}
/** a string that uniquely identifies a set of consumers within the same consumer group */
val groupId = props.getString("group.id")
/** consumer id: generated automatically if not set.
* Set this explicitly for testing purposes only. */
val consumerId: Option[String] = Option(props.getString("consumer.id", null))
/** the socket timeout for network requests. Its value should be at least fetch.wait.max.ms. */
val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout)
/** the socket receive buffer for network requests */
val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize)
/** the number of bytes of messages to attempt to fetch */
val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
/** the number of threads used to fetch data */
val numConsumerFetchers = props.getInt("num.consumer.fetchers", NumConsumerFetchers)
/** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
val autoCommitEnable = props.getBoolean("auto.commit.enable", AutoCommit)
/** the frequency in ms that the consumer offsets are committed to zookeeper */
val autoCommitIntervalMs = props.getInt("auto.commit.interval.ms", AutoCommitInterval)
/** max number of message chunks buffered for consumption; each chunk can be up to fetch.message.max.bytes */
val queuedMaxMessages = props.getInt("queued.max.message.chunks", MaxQueuedChunks)
/** max number of retries during rebalance */
val rebalanceMaxRetries = props.getInt("rebalance.max.retries", MaxRebalanceRetries)
/** the minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */
val fetchMinBytes = props.getInt("fetch.min.bytes", MinFetchBytes)
/** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes */
val fetchWaitMaxMs = props.getInt("fetch.wait.max.ms", MaxFetchWaitMs)
require(fetchWaitMaxMs <= socketTimeoutMs, "socket.timeout.ms should always be at least fetch.wait.max.ms" +
" to prevent unnecessary socket timeouts")
/** backoff time between retries during rebalance */
val rebalanceBackoffMs = props.getInt("rebalance.backoff.ms", zkSyncTimeMs)
/** backoff time to refresh the leader of a partition after it loses the current leader */
val refreshLeaderBackoffMs = props.getInt("refresh.leader.backoff.ms", RefreshMetadataBackoffMs)
/** backoff time to reconnect the offsets channel or to retry offset fetches/commits */
val offsetsChannelBackoffMs = props.getInt("offsets.channel.backoff.ms", OffsetsChannelBackoffMs)
/** socket timeout to use when reading responses for Offset Fetch/Commit requests. This timeout will also be used for
* the ConsumerMetadata requests that are used to query for the offset coordinator. */
val offsetsChannelSocketTimeoutMs = props.getInt("offsets.channel.socket.timeout.ms", OffsetsChannelSocketTimeoutMs)
/** Retry the offset commit up to this many times on failure. This retry count only applies to offset commits during
* shut-down. It does not apply to commits from the auto-commit thread. It also does not apply to attempts to query
* for the offset coordinator before committing offsets. i.e., if a consumer metadata request fails for any reason,
* it is retried and that retry does not count toward this limit. */
val offsetsCommitMaxRetries = props.getInt("offsets.commit.max.retries", OffsetsCommitMaxRetries)
/** Specify whether offsets should be committed to "zookeeper" (default) or "kafka" */
val offsetsStorage = props.getString("offsets.storage", OffsetsStorage).toLowerCase
/** If you are using "kafka" as offsets.storage, you can dual commit offsets to ZooKeeper (in addition to Kafka). This
* is required during migration from zookeeper-based offset storage to kafka-based offset storage. With respect to any
* given consumer group, it is safe to turn this off after all instances within that group have been migrated to
* the new jar that commits offsets to the broker (instead of directly to ZooKeeper).
* When "dual.commit.enabled" is not set, the default is true only if offsets.storage is "kafka". */
val dualCommitEnabled = props.getBoolean("dual.commit.enabled", offsetsStorage == "kafka")
/* what to do if an offset is out of range.
smallest : automatically reset the offset to the smallest offset
largest : automatically reset the offset to the largest offset
anything else: throw exception to the consumer */
val autoOffsetReset = props.getString("auto.offset.reset", AutoOffsetReset)
/** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs)
/**
* Client id is specified by the kafka consumer client, used to distinguish different clients
*/
val clientId = props.getString("client.id", groupId)
/** Whether messages from internal topics (such as offsets) should be excluded from (i.e. not exposed to) the consumer. */
val excludeInternalTopics = props.getBoolean("exclude.internal.topics", ExcludeInternalTopics)
/** Select a strategy for assigning partitions to consumer streams. Possible values: range, roundrobin */
val partitionAssignmentStrategy = props.getString("partition.assignment.strategy", DefaultPartitionAssignmentStrategy)