diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 7851644c..c57bba08 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1,1173 +1,1173 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Coordinator;
import org.apache.kafka.clients.consumer.internals.DelayedTask;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.common.utils.Utils.min;
/**
* A Kafka client that consumes records from a Kafka cluster.
* <p>
* It will transparently handle the failure of servers in the Kafka cluster, and transparently adapt as partitions of
* data it subscribes to migrate within the cluster. This client also interacts with the server to allow groups of
* consumers to load balance consumption using consumer groups (as described below).
* <p>
* The consumer maintains TCP connections to the necessary brokers to fetch data for the topics it subscribes to.
* Failure to close the consumer after use will leak these connections.
* <p>
* The consumer is not thread-safe. See <a href="#multithreaded">Multi-threaded Processing</a> for more details.
*
* <h3>Offsets and Consumer Position</h3>
* Kafka maintains a numerical offset for each record in a partition. This offset acts as a kind of unique identifier of
* a record within that partition, and also denotes the position of the consumer in the partition. That is, a consumer
* which has position 5 has consumed records with offsets 0 through 4 and will next receive record with offset 5. There
* are actually two notions of position relevant to the user of the consumer.
* <p>
* The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record that will be given
* out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances
* every time the consumer receives messages in a call to {@link #poll(long)}.
* <p>
* The {@link #commit(CommitType) committed position} is the last offset that has been saved securely. Should the
* process fail and restart, this is the offset that it will recover to. The consumer can either automatically commit
* offsets periodically, or it can choose to control this committed position manually by calling
* {@link #commit(CommitType) commit}.
* <p>
* This distinction gives the consumer control over when a record is considered consumed. It is discussed in further
* detail below.
*
* <h3>Consumer Groups</h3>
*
* Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide up the work of consuming and
* processing records. These processes can either be running on the same machine or, as is more likely, they can be
* distributed over many machines to provide additional scalability and fault tolerance for processing.
* <p>
* Each Kafka consumer must specify a consumer group that it belongs to. Kafka will deliver each message in the
* subscribed topics to one process in each consumer group. This is achieved by balancing the partitions in the topic
* over the consumer processes in each group. So if there is a topic with four partitions, and a consumer group with two
* processes, each process would consume from two partitions. This group membership is maintained dynamically: if a
* process fails the partitions assigned to it will be reassigned to other processes in the same group, and if a new
* process joins the group, partitions will be moved from existing consumers to this new process.
* <p>
* So if two processes subscribe to a topic both specifying different groups they will each get all the records in that
* topic; if they both specify the same group they will each get about half the records.
* <p>
* Conceptually you can think of a consumer group as being a single logical subscriber that happens to be made up of
* multiple processes. As a multi-subscriber system, Kafka naturally supports having any number of consumer groups for a
* given topic without duplicating data (additional consumers are actually quite cheap).
* <p>
* This is a slight generalization of the functionality that is common in messaging systems. To get semantics similar to
* a queue in a traditional messaging system all processes would be part of a single consumer group and hence record
* delivery would be balanced over the group like with a queue. Unlike a traditional messaging system, though, you can
* have multiple such groups. To get semantics similar to pub-sub in a traditional messaging system each process would
* have its own consumer group, so each process would subscribe to all the records published to the topic.
* <p>
* In addition, when offsets are committed they are always committed for a given consumer group.
* <p>
* It is also possible for the consumer to manually specify the partitions it subscribes to, which disables this dynamic
* partition balancing.
*
* <h3>Usage Examples</h3>
* The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to
* demonstrate how to use them.
*
* <h4>Simple Processing</h4>
* This example demonstrates the simplest usage of Kafka's consumer API.
*
* <pre>
* Properties props = new Properties();
* props.put("bootstrap.servers", "localhost:9092");
* props.put("group.id", "test");
* props.put("enable.auto.commit", "true");
* props.put("auto.commit.interval.ms", "1000");
* props.put("session.timeout.ms", "30000");
* props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
* props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
* KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
* consumer.subscribe("foo", "bar");
* while (true) {
* ConsumerRecords<String, String> records = consumer.poll(100);
* for (ConsumerRecord<String, String> record : records)
* System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
* }
* </pre>
*
* Setting <code>enable.auto.commit</code> means that offsets are committed automatically with a frequency controlled by
* the config <code>auto.commit.interval.ms</code>.
* <p>
* The connection to the cluster is bootstrapped by specifying a list of one or more brokers to contact using the
* configuration <code>bootstrap.servers</code>. This list is just used to discover the rest of the brokers in the
* cluster and need not be an exhaustive list of servers in the cluster (though you may want to specify more than one in
* case there are servers down when the client is connecting).
* <p>
* In this example the client is subscribing to the topics <i>foo</i> and <i>bar</i> as part of a group of consumers
* called <i>test</i> as described above.
* <p>
* The broker will automatically detect failed processes in the <i>test</i> group by using a heartbeat mechanism. The
* consumer will automatically ping the cluster periodically, which lets the cluster know that it is alive. As long as
* the consumer is able to do this it is considered alive and retains the right to consume from the partitions assigned
* to it. If it stops heartbeating for a period of time longer than <code>session.timeout.ms</code> then it will be
* considered dead and its partitions will be assigned to another process.
* <p>
* The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we
* are saying that our record's key and value will just be simple strings.
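* <p>
* Deserializer instances can also be passed to the constructor directly instead of being named in the configuration;
* in that case their configure() method will not be called by the consumer. A minimal sketch, reusing the properties
* above without the two deserializer entries (null here means the rebalance callback is taken from the configuration):
*
* <pre>
* KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props, null,
*         new StringDeserializer(), new StringDeserializer());
* </pre>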
*
* <h4>Controlling When Messages Are Considered Consumed</h4>
*
* In this example we will consume a batch of records and batch them up in memory; when we have sufficient records
* batched, we will insert them into a database. If we allowed offsets to auto commit as in the previous example, messages
* would be considered consumed after they were given out by the consumer, and it would be possible that our process
* could fail after we have read messages into our in-memory buffer but before they had been inserted into the database.
* To avoid this we will manually commit the offsets only once the corresponding messages have been inserted into the
* database. This gives us exact control of when a message is considered consumed. This raises the opposite possibility:
* the process could fail in the interval after the insert into the database but before the commit (even though this
* would likely just be a few milliseconds, it is a possibility). In this case the process that took over consumption
* would consume from the last committed offset and would repeat the insert of the last batch of data. Used in this way
* Kafka provides what is often called "at-least once delivery" guarantees, as each message will likely be delivered one
* time but in failure cases could be duplicated.
*
* <pre>
* Properties props = new Properties();
* props.put("bootstrap.servers", "localhost:9092");
* props.put("group.id", "test");
* props.put("enable.auto.commit", "false");
* props.put("auto.commit.interval.ms", "1000");
* props.put("session.timeout.ms", "30000");
* props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
* props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
* KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
* consumer.subscribe("foo", "bar");
* int commitInterval = 200;
* List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
* while (true) {
* ConsumerRecords<String, String> records = consumer.poll(100);
* for (ConsumerRecord<String, String> record : records) {
* buffer.add(record);
* if (buffer.size() >= commitInterval) {
* insertIntoDb(buffer);
* consumer.commit(CommitType.SYNC);
* buffer.clear();
* }
* }
* }
* </pre>
*
* <h4>Subscribing To Specific Partitions</h4>
*
* In the previous examples we subscribed to the topics we were interested in and let Kafka give our particular process
* a fair share of the partitions for those topics. This provides a simple load balancing mechanism so multiple
* instances of our program can divide up the work of processing records.
* <p>
* In this mode, where the consumer subscribes to specific partitions directly, it will just get the partitions it
* subscribes to, and if the consumer instance fails no attempt will be made to rebalance partitions to other instances.
* <p>
* There are several cases where this makes sense:
* <ul>
* <li>The first case is if the process is maintaining some kind of local state associated with that partition (like a
* local on-disk key-value store) and hence it should only get records for the partition it is maintaining on disk.
* <li>Another case is if the process itself is highly available and will be restarted if it fails (perhaps using a
* cluster management framework like YARN, Mesos, or AWS facilities, or as part of a stream processing framework). In
* this case there is no need for Kafka to detect the failure and reassign the partition, rather the consuming process
* will be restarted on another machine.
* </ul>
* <p>
* This mode is easy to specify: rather than subscribing to the topic, the consumer just subscribes to particular
* partitions:
*
* <pre>
* String topic = "foo";
* TopicPartition partition0 = new TopicPartition(topic, 0);
* TopicPartition partition1 = new TopicPartition(topic, 1);
* consumer.subscribe(partition0);
* consumer.subscribe(partition1);
* </pre>
*
* The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only
* be changed if the consumer specifies new partitions, and no attempt at failure detection will be made.
* <p>
* It isn't possible to mix both subscription to specific partitions (with no load balancing) and to topics (with load
* balancing) using the same consumer instance.
*
* <h4>Managing Your Own Offsets</h4>
*
* The consumer application need not use Kafka's built-in offset storage; it can store offsets in a store of its own
* choosing. The primary use case for this is allowing the application to store both the offset and the results of the
* consumption in the same system in a way that both the results and offsets are stored atomically. This is not always
* possible, but when it is it will make the consumption fully atomic and give "exactly once" semantics that are
* stronger than the default "at-least once" semantics you get with Kafka's offset commit functionality.
* <p>
* Here are a couple of examples of this type of usage:
* <ul>
* <li>If the results of the consumption are being stored in a relational database, storing the offset in the database
* as well can allow committing both the results and offset in a single transaction. Thus either the transaction will
* succeed and the offset will be updated based on what was consumed or the result will not be stored and the offset
* won't be updated.
* <li>If the results are being stored in a local store it may be possible to store the offset there as well. For
* example a search index could be built by subscribing to a particular partition and storing both the offset and the
* indexed data together. If this is done in a way that is atomic, it is often possible to have it be the case that even
* if a crash occurs that causes unsync'd data to be lost, whatever is left has the corresponding offset stored as well.
* This means that the indexing process that comes back having lost recent updates just resumes indexing
* from what it has, ensuring that no updates are lost.
* </ul>
*
* Each record comes with its own offset, so to manage your own offset you just need to do the following (see the sketch below):
* <ol>
* <li>Configure <code>enable.auto.commit=false</code>
* <li>Use the offset provided with each {@link ConsumerRecord} to save your position.
* <li>On restart restore the position of the consumer using {@link #seek(TopicPartition, long)}.
* </ol>
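* <p>
* A minimal sketch of this pattern is shown below; saveOffsetInStore and readOffsetFromStore are hypothetical
* application helpers that persist the position (the offset of the next record to read) atomically with the
* processed results:
*
* <pre>
* while (true) {
*     ConsumerRecords<String, String> records = consumer.poll(100);
*     for (ConsumerRecord<String, String> record : records) {
*         process(record); // hypothetical processing step
*         saveOffsetInStore(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
*     }
* }
* </pre>
*
* On restart, the saved position would be restored with {@link #seek(TopicPartition, long)} before the first poll.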
*
* This type of usage is simplest when the partition assignment is also done manually (this would be likely in the
* search index use case described above). If the partition assignment is done automatically special care will also be
* needed to handle the case where partition assignments change. This can be handled using a special callback specified
* using <code>rebalance.callback.class</code>, which specifies an implementation of the interface
* {@link ConsumerRebalanceCallback}. When partitions are taken from a consumer the consumer will want to commit its
* offset for those partitions by implementing
* {@link ConsumerRebalanceCallback#onPartitionsRevoked(Consumer, Collection)}. When partitions are assigned to a
* consumer, the consumer will want to look up the offset for those new partitions and correctly initialize the consumer
* to that position by implementing {@link ConsumerRebalanceCallback#onPartitionsAssigned(Consumer, Collection)}.
* <p>
* Another common use for {@link ConsumerRebalanceCallback} is to flush any caches the application maintains for
* partitions that are moved elsewhere.
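* <p>
* A minimal sketch of such a callback is shown below (the parameter types are assumed from the method links above;
* saveOffsetInStore and readOffsetFromStore are the same hypothetical helpers as in the previous example):
*
* <pre>
* public class SaveOffsetsOnRebalance implements ConsumerRebalanceCallback {
*     public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
*         // save the current position before ownership of these partitions is lost
*         for (TopicPartition partition : partitions)
*             saveOffsetInStore(partition, consumer.position(partition));
*     }
*
*     public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
*         // restore the previously saved position for each newly assigned partition
*         for (TopicPartition partition : partitions)
*             consumer.seek(partition, readOffsetFromStore(partition));
*     }
* }
* </pre>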
*
* <h4>Controlling The Consumer's Position</h4>
*
* In most use cases the consumer will simply consume records from beginning to end, periodically committing its
* position (either automatically or manually). However, Kafka allows the consumer to manually control its position,
* moving forward or backwards in a partition at will. This means a consumer can re-consume older records, or skip to
* the most recent records without actually consuming the intermediate records.
* <p>
* There are several instances where manually controlling the consumer's position can be useful.
* <p>
* One case is time-sensitive record processing, where it may make sense for a consumer that falls far enough behind to not
* attempt to catch up processing all records, but rather just skip to the most recent records.
* <p>
* Another use case is for a system that maintains local state as described in the previous section. In such a system
* the consumer will want to initialize its position on start-up to whatever is contained in the local store. Likewise
* if the local state is destroyed (say because the disk is lost) the state may be recreated on a new machine by
* reconsuming all the data and recreating the state (assuming that Kafka is retaining sufficient history).
*
* Kafka allows the consumer to specify a new position using {@link #seek(TopicPartition, long)}. Special
* methods for seeking to the earliest and latest offset the server maintains are also available (
* {@link #seekToBeginning(TopicPartition...)} and {@link #seekToEnd(TopicPartition...)} respectively).
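* <p>
* For example, a consumer that has fallen far behind and wants to restart from the most recent data might do
* something like the following sketch (partition0 as in the earlier example; 1234L stands in for a previously
* recorded offset):
*
* <pre>
* consumer.seekToEnd(partition0);
* // or rewind to an absolute position recorded earlier
* consumer.seek(partition0, 1234L);
* </pre>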
*
*
* <h3><a name="multithreaded">Multi-threaded Processing</a></h3>
*
* The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application
* making the call. It is the responsibility of the user to ensure that multi-threaded access
* is properly synchronized. Un-synchronized access will result in {@link ConcurrentModificationException}.
*
* <p>
* The only exception to this rule is {@link #wakeup()}, which can safely be used from an external thread to
* interrupt an active operation. In this case, a {@link ConsumerWakeupException} will be thrown from the thread
* blocking on the operation. This can be used to shut down the consumer from another thread. The following
* snippet shows the typical pattern:
*
* <pre>
* public class KafkaConsumerRunner implements Runnable {
* private final AtomicBoolean closed = new AtomicBoolean(false);
* private final KafkaConsumer consumer;
*
* public void run() {
* try {
* consumer.subscribe("topic");
* while (!closed.get()) {
* ConsumerRecords records = consumer.poll(10000);
* // Handle new records
* }
* } catch (ConsumerWakeupException e) {
* // Ignore exception if closing
* if (!closed.get()) throw e;
* } finally {
* consumer.close();
* }
* }
*
* // Shutdown hook which can be called from a separate thread
* public void shutdown() {
* closed.set(true);
* consumer.wakeup();
* }
* }
* </pre>
*
* Then in a separate thread, the consumer can be shut down by setting the closed flag and waking up the consumer.
*
* <pre>
* closed.set(true);
* consumer.wakeup();
* </pre>
*
* <p>
* We have intentionally avoided implementing a particular threading model for processing. This leaves several
* options for implementing multi-threaded processing of records.
*
*
* <h4>1. One Consumer Per Thread</h4>
*
* A simple option is to give each thread its own consumer instance. Here are the pros and cons of this approach:
* <ul>
* <li><b>PRO</b>: It is the easiest to implement
* <li><b>PRO</b>: It is often the fastest as no inter-thread co-ordination is needed
* <li><b>PRO</b>: It makes in-order processing on a per-partition basis very easy to implement (each thread just
* processes messages in the order it receives them).
* <li><b>CON</b>: More consumers means more TCP connections to the cluster (one per thread). In general Kafka handles
* connections very efficiently so this is generally a small cost.
* <li><b>CON</b>: Multiple consumers means more requests being sent to the server and slightly less batching of data
* which can cause some drop in I/O throughput.
* <li><b>CON</b>: The number of total threads across all processes will be limited by the total number of partitions.
* </ul>
*
* <h4>2. Decouple Consumption and Processing</h4>
*
* Another alternative is to have one or more consumer threads that do all data consumption and hand off
* {@link ConsumerRecords} instances to a blocking queue consumed by a pool of processor threads that actually handle
* the record processing.
*
* This option likewise has pros and cons:
* <ul>
* <li><b>PRO</b>: This option allows independently scaling the number of consumers and processors. This makes it
* possible to have a single consumer that feeds many processor threads, avoiding any limitation on partitions.
* <li><b>CON</b>: Guaranteeing order across the processors requires particular care as the threads will execute
* independently; an earlier chunk of data may actually be processed after a later chunk of data just due to the luck of
* thread execution timing. For processing that has no ordering requirements this is not a problem.
* <li><b>CON</b>: Manually committing the position becomes harder as it requires that all threads co-ordinate to ensure
* that processing is complete for that partition.
* </ul>
*
* There are many possible variations on this approach. For example, each processor thread can have its own queue, and
* the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify
* commit.
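* <p>
* A minimal sketch of the hand-off between a consumer thread and the processor pool is shown below (the processing
* side, interrupt handling, and commit coordination are omitted; the queue capacity and poll timeout are arbitrary
* illustrative values):
*
* <pre>
* BlockingQueue<ConsumerRecords<String, String>> queue =
*         new ArrayBlockingQueue<ConsumerRecords<String, String>>(10);
*
* // consumer thread: fetch batches and hand them off to the processor threads
* while (true) {
*     queue.put(consumer.poll(100));
* }
*
* // each processor thread: take a batch and handle its records
* ConsumerRecords<String, String> batch = queue.take();
* for (ConsumerRecord<String, String> record : batch)
*     process(record); // hypothetical processing step
* </pre>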
*
*/
public class KafkaConsumer<K, V> implements Consumer<K, V> {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
private static final long NO_CURRENT_THREAD = -1L;
private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private final Coordinator coordinator;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
private final Fetcher<K, V> fetcher;
private final Time time;
private final ConsumerNetworkClient client;
private final Metrics metrics;
private final SubscriptionState subscriptions;
private final Metadata metadata;
private final long retryBackoffMs;
private final boolean autoCommit;
private final long autoCommitIntervalMs;
private boolean closed = false;
// currentThread holds the threadId of the current thread accessing KafkaConsumer
// and is used to prevent multi-threaded access
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
// refcount is used to allow reentrant access by the thread who has acquired currentThread
private final AtomicInteger refcount = new AtomicInteger(0);
// TODO: This timeout controls how long we should wait before retrying a request. We should be able
// to leverage the work of KAFKA-2120 to get this value from configuration.
private long requestTimeoutMs = 5000L;
/**
* A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
* are documented <a href="http://kafka.apache.org/documentation.html#consumerconfigs" >here</a>. Values can be
* either strings or objects of the appropriate type (for example a numeric configuration would accept either the
* string "42" or the integer 42).
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}
*
* @param configs The consumer configs
*/
public KafkaConsumer(Map<String, Object> configs) {
this(configs, null, null, null);
}
/**
* A consumer is instantiated by providing a set of key-value pairs as configuration, a
* {@link ConsumerRebalanceCallback} implementation, and key and value {@link Deserializer}s.
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}
*
* @param configs The consumer configs
* @param callback A callback interface that the user can implement to manage customized offsets on the start and
* end of every rebalance operation.
* @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
* won't be called in the consumer when the deserializer is passed in directly.
* @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
* won't be called in the consumer when the deserializer is passed in directly.
*/
public KafkaConsumer(Map<String, Object> configs,
ConsumerRebalanceCallback callback,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
callback,
keyDeserializer,
valueDeserializer);
}
/**
* A consumer is instantiated by providing a {@link java.util.Properties} object as configuration. Valid
* configuration strings are documented at {@link ConsumerConfig}.
*/
public KafkaConsumer(Properties properties) {
this(properties, null, null, null);
}
/**
* A consumer is instantiated by providing a {@link java.util.Properties} object as configuration, a
* {@link ConsumerRebalanceCallback} implementation, and key and value {@link Deserializer}s.
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}
*
* @param properties The consumer configuration properties
* @param callback A callback interface that the user can implement to manage customized offsets on the start and
* end of every rebalance operation.
* @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
* won't be called in the consumer when the deserializer is passed in directly.
* @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
* won't be called in the consumer when the deserializer is passed in directly.
*/
public KafkaConsumer(Properties properties,
ConsumerRebalanceCallback callback,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)),
callback,
keyDeserializer,
valueDeserializer);
}
@SuppressWarnings("unchecked")
private KafkaConsumer(ConsumerConfig config,
ConsumerRebalanceCallback callback,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
try {
log.debug("Starting the Kafka consumer");
if (callback == null)
callback = config.getConfiguredInstance(ConsumerConfig.CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
ConsumerRebalanceCallback.class);
this.time = new SystemTime();
this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
TimeUnit.MILLISECONDS);
String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
String jmxPrefix = "kafka.consumer";
if (clientId.length() <= 0)
clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
reporters.add(new JmxReporter(jmxPrefix));
this.metrics = new Metrics(metricConfig, reporters, time);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), 0);
String metricGrpPrefix = "consumer";
Map<String, String> metricsTags = new LinkedHashMap<String, String>();
metricsTags.put("client-id", clientId);
NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags),
this.metadata,
clientId,
100, // a fixed large enough value will suffice
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG));
this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
this.subscriptions = new SubscriptionState(offsetResetStrategy);
this.coordinator = new Coordinator(this.client,
config.getString(ConsumerConfig.GROUP_ID_CONFIG),
config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
this.subscriptions,
metrics,
metricGrpPrefix,
metricsTags,
this.time,
requestTimeoutMs,
retryBackoffMs,
wrapRebalanceCallback(callback));
if (keyDeserializer == null) {
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
this.keyDeserializer.configure(config.originals(), true);
} else {
this.keyDeserializer = keyDeserializer;
}
if (valueDeserializer == null) {
this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
this.valueDeserializer.configure(config.originals(), false);
} else {
this.valueDeserializer = valueDeserializer;
}
this.fetcher = new Fetcher<K, V>(this.client,
config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
this.keyDeserializer,
this.valueDeserializer,
this.metadata,
this.subscriptions,
metrics,
metricGrpPrefix,
metricsTags,
this.time,
this.retryBackoffMs);
config.logUnused();
if (autoCommit)
scheduleAutoCommitTask(autoCommitIntervalMs);
log.debug("Kafka consumer created");
} catch (Throwable t) {
// call close methods if internal objects are already constructed
// this is to prevent resource leak. see KAFKA-2121
close(true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka consumer", t);
}
}
/**
* The set of partitions currently assigned to this consumer. If subscription happened by directly subscribing to
* partitions using {@link #subscribe(TopicPartition...)} then this will simply return the list of partitions that
* were subscribed to. If subscription was done by specifying only the topic using {@link #subscribe(String...)}
* then this will give the set of topic partitions currently assigned to the consumer (which may be none if the assignment
* hasn't happened yet, or the partitions are in the process of getting reassigned).
*/
public Set<TopicPartition> subscriptions() {
acquire();
try {
return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
} finally {
release();
}
}
/**
* Incrementally subscribes to the given list of topics and uses the consumer's group management functionality
* <p>
* As part of group management, the consumer will keep track of the list of consumers that belong to a particular
* group and will trigger a rebalance operation if one of the following events occurs:
* <ul>
* <li>The number of partitions changes for any of the subscribed topics
* <li>A topic is created or deleted
* <li>An existing member of the consumer group dies
* <li>A new member is added to an existing consumer group via the join API
* </ul>
*
* @param topics A variable list of topics that the consumer wants to subscribe to
*/
@Override
public void subscribe(String... topics) {
acquire();
try {
log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
for (String topic : topics)
this.subscriptions.subscribe(topic);
metadata.addTopics(topics);
} finally {
release();
}
}
/**
* Incrementally subscribes to a specific topic partition and does not use the consumer's group management
* functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
* metadata change.
* <p>
*
* @param partitions Partitions to incrementally subscribe to
*/
@Override
public void subscribe(TopicPartition... partitions) {
acquire();
try {
log.debug("Subscribed to partitions(s): {}", Utils.join(partitions, ", "));
for (TopicPartition tp : partitions) {
this.subscriptions.subscribe(tp);
metadata.addTopics(tp.topic());
}
} finally {
release();
}
}
/**
* Unsubscribe from the specified topics. This will trigger a rebalance operation and records for these topics will not
* be returned from the next {@link #poll(long) poll()} onwards.
*
* @param topics Topics to unsubscribe from
*/
public void unsubscribe(String... topics) {
acquire();
try {
log.debug("Unsubscribed from topic(s): {}", Utils.join(topics, ", "));
// throw an exception if the topic was never subscribed to
for (String topic : topics)
this.subscriptions.unsubscribe(topic);
} finally {
release();
}
}
/**
* Unsubscribe from the specified topic partitions. Records for these partitions will not be returned from the next
* {@link #poll(long) poll()} onwards.
*
* @param partitions Partitions to unsubscribe from
*/
public void unsubscribe(TopicPartition... partitions) {
acquire();
try {
log.debug("Unsubscribed from partitions(s): {}", Utils.join(partitions, ", "));
// throw an exception if the partition was never subscribed to
for (TopicPartition partition : partitions)
this.subscriptions.unsubscribe(partition);
} finally {
release();
}
}
/**
* Fetches data for the topics or partitions specified using one of the subscribe APIs. It is an error to not have
* subscribed to any topics or partitions before polling for data.
* <p>
* The offset used for fetching the data is governed by whether or not {@link #seek(TopicPartition, long)} is used.
* If {@link #seek(TopicPartition, long)} is used, it will use the specified offsets on startup and on every
* rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed
* offset using {@link #commit(Map, CommitType) commit(offsets, sync)} for the subscribed list of partitions.
*
* @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns
* immediately with any records available now. Must not be negative.
* @return map of topic to records since the last fetch for the subscribed list of topics and partitions
*
* @throws NoOffsetForPartitionException If there is no stored offset for a subscribed partition and no automatic
* offset reset policy has been configured.
*/
@Override
public ConsumerRecords<K, V> poll(long timeout) {
acquire();
try {
if (timeout < 0)
throw new IllegalArgumentException("Timeout must not be negative");
// poll for new data until the timeout expires
long remaining = timeout;
while (remaining >= 0) {
long start = time.milliseconds();
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
long end = time.milliseconds();
if (!records.isEmpty()) {
// if data is available, then return it, but first send off the
// next round of fetches to enable pipelining while the user is
// handling the fetched records.
fetcher.initFetches(metadata.fetch());
client.poll(0);
return new ConsumerRecords<K, V>(records);
}
remaining -= end - start;
// nothing was available, so we should backoff before retrying
if (remaining > 0) {
Utils.sleep(min(remaining, retryBackoffMs));
remaining -= time.milliseconds() - end;
}
}
return ConsumerRecords.empty();
} finally {
release();
}
}
/**
* Do one round of polling. In addition to checking for new data, this does any needed
* heart-beating, auto-commits, and offset updates.
* @param timeout The maximum time to block in the underlying poll
* @return The fetched records (may be empty)
*/
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
// TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
coordinator.ensureCoordinatorKnown();
// ensure we have partitions assigned if we expect to
if (subscriptions.partitionsAutoAssigned())
coordinator.ensurePartitionAssignment();
// fetch positions if we have partitions we're subscribed to that we
// don't know the offset for
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());
// init any new fetches (won't resend pending fetches)
Cluster cluster = this.metadata.fetch();
fetcher.initFetches(cluster);
client.poll(timeout);
return fetcher.fetchedRecords();
}
private void scheduleAutoCommitTask(final long interval) {
DelayedTask task = new DelayedTask() {
public void run(long now) {
commit(CommitType.ASYNC);
client.schedule(this, now + interval);
}
};
client.schedule(task, time.milliseconds() + interval);
}
/**
* Commits the specified offsets for the specified list of topics and partitions to Kafka.
* <p>
* This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used.
* <p>
* Asynchronous commits (i.e. {@link CommitType#ASYNC}) will not block. Any errors encountered during an asynchronous
* commit are silently discarded. If you need to determine the result of an asynchronous commit, you should use
* {@link #commit(Map, CommitType, ConsumerCommitCallback)}. Synchronous commits (i.e. {@link CommitType#SYNC})
* block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown
* to the caller).
*
* @param offsets The list of offsets per partition that should be committed to Kafka.
* @param commitType Control whether the commit is blocking
*/
@Override
public void commit(final Map<TopicPartition, Long> offsets, CommitType commitType) {
commit(offsets, commitType, null);
}
/**
* Commits the specified offsets for the specified list of topics and partitions to Kafka.
* <p>
* This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used.
* <p>
* Asynchronous commits (i.e. {@link CommitType#ASYNC}) will not block. Any errors encountered during an asynchronous
* commit are either passed to the callback (if provided) or silently discarded. Synchronous commits (i.e.
* {@link CommitType#SYNC}) block until either the commit succeeds or an unrecoverable error is encountered. In
* this case, the error is either passed to the callback (if provided) or thrown to the caller.
*
* @param offsets The list of offsets per partition that should be committed to Kafka.
* @param commitType Control whether the commit is blocking
* @param callback Callback to invoke when the commit completes
*/
@Override
public void commit(final Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback) {
acquire();
try {
log.debug("Committing offsets ({}): {} ", commitType.toString().toLowerCase(), offsets);
coordinator.commitOffsets(offsets, commitType, callback);
} finally {
release();
}
}
/**
* Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
* <p>
* This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
* every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used.
* <p>
* Asynchronous commits (i.e. {@link CommitType#ASYNC}) will not block. Any errors encountered during an asynchronous
* commit are either passed to the callback (if provided) or silently discarded. Synchronous commits (i.e.
* {@link CommitType#SYNC}) block until either the commit succeeds or an unrecoverable error is encountered. In
* this case, the error is either passed to the callback (if provided) or thrown to the caller.
*
* @param commitType Whether or not the commit should block until it is acknowledged.
* @param callback Callback to invoke when the commit completes
*/
@Override
public void commit(CommitType commitType, ConsumerCommitCallback callback) {
acquire();
try {
commit(subscriptions.allConsumed(), commitType, callback);
} finally {
release();
}
}
/**
* Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
* <p>
* This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
* every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used.
* <p>
* Asynchronous commits (i.e. {@link CommitType#ASYNC}) will not block. Any errors encountered during an asynchronous
* commit are silently discarded. If you need to determine the result of an asynchronous commit, you should use
* {@link #commit(CommitType, ConsumerCommitCallback)}. Synchronous commits (i.e. {@link CommitType#SYNC})
* block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown
* to the caller).
*
* @param commitType Whether or not the commit should block until it is acknowledged.
*/
@Override
public void commit(CommitType commitType) {
commit(commitType, null);
}
/**
* Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API
* is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
* you may lose data if this API is arbitrarily used in the middle of consumption to reset the fetch offsets.
*/
@Override
public void seek(TopicPartition partition, long offset) {
acquire();
try {
log.debug("Seeking to offset {} for partition {}", offset, partition);
this.subscriptions.seek(partition, offset);
} finally {
release();
}
}
/**
* Seek to the first offset for each of the given partitions
*/
public void seekToBeginning(TopicPartition... partitions) {
acquire();
try {
Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
: Arrays.asList(partitions);
for (TopicPartition tp : parts) {
log.debug("Seeking to beginning of partition {}", tp);
subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
}
} finally {
release();
}
}
/**
* Seek to the last offset for each of the given partitions
*/
public void seekToEnd(TopicPartition... partitions) {
acquire();
try {
Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
: Arrays.asList(partitions);
for (TopicPartition tp : parts) {
log.debug("Seeking to end of partition {}", tp);
subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
}
} finally {
release();
}
}
/**
* Returns the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
*
* @param partition The partition to get the position for
* @return The offset
* @throws NoOffsetForPartitionException If a position hasn't been set for a given partition, and no reset policy is
* available.
*/
public long position(TopicPartition partition) {
acquire();
try {
if (!this.subscriptions.isAssigned(partition))
throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
Long offset = this.subscriptions.consumed(partition);
if (offset == null) {
updateFetchPositions(Collections.singleton(partition));
return this.subscriptions.consumed(partition);
} else {
return offset;
}
} finally {
release();
}
}
/**
* Fetches the last committed offset for the given partition (whether the commit happened by this process or
* another). This offset will be used as the position for the consumer in the event of a failure.
* <p>
* This call may block to do a remote call if the partition in question isn't assigned to this consumer or if the
* consumer hasn't yet initialized its cache of committed offsets.
*
* @param partition The partition to check
- * @return The last committed offset or null if no offset has been committed
+ * @return The last committed offset
* @throws NoOffsetForPartitionException If no offset has ever been committed by any process for the given
* partition.
*/
@Override
public long committed(TopicPartition partition) {
acquire();
try {
Long committed;
if (subscriptions.isAssigned(partition)) {
committed = this.subscriptions.committed(partition);
if (committed == null) {
coordinator.refreshCommittedOffsetsIfNeeded();
committed = this.subscriptions.committed(partition);
}
} else {
Map<TopicPartition, Long> offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition));
committed = offsets.get(partition);
}
if (committed == null)
throw new NoOffsetForPartitionException("No offset has been committed for partition " + partition);
return committed;
} finally {
release();
}
}
/**
* Get the metrics kept by the consumer
*/
@Override
public Map<MetricName, ? extends Metric> metrics() {
return Collections.unmodifiableMap(this.metrics.metrics());
}
/**
* Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it
* does not already have any metadata about the given topic.
*
* @param topic The topic to get partition metadata for
* @return The list of partitions
*/
@Override
public List<PartitionInfo> partitionsFor(String topic) {
acquire();
try {
Cluster cluster = this.metadata.fetch();
List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
if (parts == null) {
metadata.add(topic);
client.awaitMetadataUpdate();
parts = metadata.fetch().partitionsForTopic(topic);
}
return parts;
} finally {
release();
}
}
/**
* Get metadata about partitions for all topics. This method will issue a remote call to the
* server.
*
* @return The map of topics and their partitions
*/
@Override
public Map<String, List<PartitionInfo>> listTopics() {
acquire();
try {
return fetcher.getAllTopics(requestTimeoutMs);
} finally {
release();
}
}
/**
* Suspend fetching from the requested partitions. Future calls to {@link #poll(long)} will not return
* any records from these partitions until they have been resumed using {@link #resume(TopicPartition...)}.
* Note that this method does not affect partition subscription. In particular, it does not cause a group
* rebalance when automatic assignment is used.
* @param partitions The partitions which should be paused
*/
@Override
public void pause(TopicPartition... partitions) {
acquire();
try {
for (TopicPartition partition: partitions) {
log.debug("Pausing partition {}", partition);
subscriptions.pause(partition);
}
} finally {
release();
}
}
/**
* Resume any partitions which have been paused with {@link #pause(TopicPartition...)}. New calls to
* {@link #poll(long)} will return records from these partitions if there are any to be fetched.
* If the partitions were not previously paused, this method is a no-op.
* @param partitions The partitions which should be resumed
*/
@Override
public void resume(TopicPartition... partitions) {
acquire();
try {
for (TopicPartition partition: partitions) {
log.debug("Resuming partition {}", partition);
subscriptions.resume(partition);
}
} finally {
release();
}
}
@Override
public void close() {
acquire();
try {
if (closed) return;
close(false);
} finally {
release();
}
}
/**
* Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll.
* The thread which is blocking in an operation will throw {@link ConsumerWakeupException}.
*/
@Override
public void wakeup() {
this.client.wakeup();
}
private void close(boolean swallowException) {
log.trace("Closing the Kafka consumer.");
AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
this.closed = true;
ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
ClientUtils.closeQuietly(client, "consumer network client", firstException);
ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException);
ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException);
log.debug("The Kafka consumer has closed.");
if (firstException.get() != null && !swallowException) {
throw new KafkaException("Failed to close kafka consumer", firstException.get());
}
}
private Coordinator.RebalanceCallback wrapRebalanceCallback(final ConsumerRebalanceCallback callback) {
return new Coordinator.RebalanceCallback() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
callback.onPartitionsAssigned(KafkaConsumer.this, partitions);
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
callback.onPartitionsRevoked(KafkaConsumer.this, partitions);
}
};
}
/**
* Set the fetch position to the committed position (if there is one)
* or reset it using the offset reset policy the user has configured.
*
* @param partitions The partitions whose fetch positions need updating
* @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
* defined
*/
private void updateFetchPositions(Set<TopicPartition> partitions) {
// refresh commits for all assigned partitions
coordinator.refreshCommittedOffsetsIfNeeded();
// then do any offset lookups in case some positions are not known
fetcher.updateFetchPositions(partitions);
}
/*
* Check that the consumer hasn't been closed.
*/
private void ensureNotClosed() {
if (this.closed)
throw new IllegalStateException("This consumer has already been closed.");
}
/**
* Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
* when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
* supported).
* @throws IllegalStateException if the consumer has been closed
* @throws ConcurrentModificationException if another thread already has the lock
*/
private void acquire() {
ensureNotClosed();
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
refcount.incrementAndGet();
}
/**
* Release the light lock protecting the consumer from multi-threaded access.
*/
private void release() {
if (refcount.decrementAndGet() == 0)
currentThread.set(NO_CURRENT_THREAD);
}
}