diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
index 1d531233..0cf969f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
@@ -1,266 +1,266 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.FilteredIterator;
import org.apache.kafka.streams.kstream.internals.WindowSupport;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.Stamped;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
public class SlidingWindowSupplier<K, V> implements WindowSupplier<K, V> {
private final String name;
private final long duration;
private final int maxCount;
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
public SlidingWindowSupplier(
String name,
long duration,
int maxCount,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
this.name = name;
this.duration = duration;
this.maxCount = maxCount;
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
}
@Override
public String name() {
return name;
}
@Override
public Window<K, V> get() {
return new SlidingWindow();
}
public class SlidingWindow extends WindowSupport implements Window<K, V> {
private final Object lock = new Object();
private ProcessorContext context;
private int partition;
private int slotNum; // used as a key for Kafka log compaction
private LinkedList<K> list = new LinkedList<K>();
private HashMap<K, ValueList<V>> map = new HashMap<>();
@Override
public void init(ProcessorContext context) {
this.context = context;
- this.partition = context.id();
+ this.partition = context.id().partition;
SlidingWindowRegistryCallback restoreFunc = new SlidingWindowRegistryCallback();
context.register(this, restoreFunc);
for (ValueList<V> valueList : map.values()) {
valueList.clearDirtyValues();
}
this.slotNum = restoreFunc.slotNum;
}
@Override
public Iterator<V> findAfter(K key, final long timestamp) {
return find(key, timestamp, timestamp + duration);
}
@Override
public Iterator<V> findBefore(K key, final long timestamp) {
return find(key, timestamp - duration, timestamp);
}
@Override
public Iterator<V> find(K key, final long timestamp) {
return find(key, timestamp - duration, timestamp + duration);
}
/*
* finds items in the window between startTime and endTime (both inclusive)
*/
private Iterator<V> find(K key, final long startTime, final long endTime) {
final ValueList<V> values = map.get(key);
if (values == null) {
return Collections.emptyIterator();
} else {
return new FilteredIterator<V, Value<V>>(values.iterator()) {
@Override
protected V filter(Value<V> item) {
if (startTime <= item.timestamp && item.timestamp <= endTime)
return item.value;
else
return null;
}
};
}
}
@Override
public void put(K key, V value, long timestamp) {
synchronized (lock) {
slotNum++;
list.offerLast(key);
ValueList<V> values = map.get(key);
if (values == null) {
values = new ValueList<>();
map.put(key, values);
}
values.add(slotNum, value, timestamp);
}
evictExcess();
evictExpired(timestamp - duration);
}
private void evictExcess() {
while (list.size() > maxCount) {
K oldestKey = list.pollFirst();
ValueList<V> values = map.get(oldestKey);
values.removeFirst();
if (values.isEmpty()) map.remove(oldestKey);
}
}
private void evictExpired(long cutoffTime) {
while (true) {
K oldestKey = list.peekFirst();
ValueList<V> values = map.get(oldestKey);
Stamped<V> oldestValue = values.first();
if (oldestValue.timestamp < cutoffTime) {
list.pollFirst();
values.removeFirst();
if (values.isEmpty()) map.remove(oldestKey);
} else {
break;
}
}
}
@Override
public String name() {
return name;
}
@Override
public void flush() {
IntegerSerializer intSerializer = new IntegerSerializer();
ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
for (Map.Entry<K, ValueList<V>> entry : map.entrySet()) {
ValueList<V> values = entry.getValue();
if (values.hasDirtyValues()) {
K key = entry.getKey();
byte[] keyBytes = keySerializer.serialize(name, key);
Iterator<Value<V>> iterator = values.dirtyValueIterator();
while (iterator.hasNext()) {
Value<V> dirtyValue = iterator.next();
byte[] slot = intSerializer.serialize("", dirtyValue.slotNum);
byte[] valBytes = valueSerializer.serialize(name, dirtyValue.value);
byte[] combined = new byte[8 + 4 + keyBytes.length + 4 + valBytes.length];
int offset = 0;
offset += putLong(combined, offset, dirtyValue.timestamp);
offset += puts(combined, offset, keyBytes);
offset += puts(combined, offset, valBytes);
if (offset != combined.length)
throw new IllegalStateException("serialized length does not match");
collector.send(new ProducerRecord<>(name, partition, slot, combined), byteArraySerializer, byteArraySerializer);
}
values.clearDirtyValues();
}
}
}
@Override
public void close() {
// TODO
}
@Override
public boolean persistent() {
// TODO: should not be persistent, right?
return false;
}
private class SlidingWindowRegistryCallback implements StateRestoreCallback {
final IntegerDeserializer intDeserializer;
int slotNum = 0;
SlidingWindowRegistryCallback() {
intDeserializer = new IntegerDeserializer();
}
@Override
public void restore(byte[] slot, byte[] bytes) {
slotNum = intDeserializer.deserialize("", slot);
int offset = 0;
// timestamp
long timestamp = getLong(bytes, offset);
offset += 8;
// key
int length = getInt(bytes, offset);
offset += 4;
K key = deserialize(bytes, offset, length, name, keyDeserializer);
offset += length;
// value
length = getInt(bytes, offset);
offset += 4;
V value = deserialize(bytes, offset, length, name, valueDeserializer);
put(key, value, timestamp);
}
}
}
}
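
For reference, the flush()/restore() pair above round-trips each dirty value through a changelog record keyed by the 4-byte slot number, whose value is an 8-byte timestamp followed by a length-prefixed key and a length-prefixed value. Below is a minimal standalone sketch of that layout, assuming the putLong/puts/getLong/getInt helpers in WindowSupport use the same big-endian encoding as java.nio.ByteBuffer; the class name and sample data are made up.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SlidingWindowRecordLayoutSketch {

    // Pack timestamp + key + value the way SlidingWindow.flush() lays out "combined".
    static byte[] pack(long timestamp, byte[] keyBytes, byte[] valBytes) {
        ByteBuffer buf = ByteBuffer.allocate(8 + 4 + keyBytes.length + 4 + valBytes.length);
        buf.putLong(timestamp);        // putLong(combined, offset, timestamp)
        buf.putInt(keyBytes.length);   // puts(...) writes a 4-byte length prefix...
        buf.put(keyBytes);             // ...followed by the raw bytes
        buf.putInt(valBytes.length);
        buf.put(valBytes);
        return buf.array();
    }

    // Unpack the same layout the way SlidingWindowRegistryCallback.restore() walks the offsets.
    static void unpack(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long timestamp = buf.getLong();
        byte[] key = new byte[buf.getInt()];
        buf.get(key);
        byte[] value = new byte[buf.getInt()];
        buf.get(value);
        System.out.println(timestamp + " " + new String(key, StandardCharsets.UTF_8)
                + " -> " + new String(value, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] record = pack(1000L,
                "k1".getBytes(StandardCharsets.UTF_8),
                "v1".getBytes(StandardCharsets.UTF_8));
        unpack(record); // prints: 1000 k1 -> v1
    }
}
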
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index f87cfa81..7d2188a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -1,97 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
public class DefaultPartitionGrouper extends PartitionGrouper {
- public Map<Integer, List<TopicPartition>> partitionGroups(Cluster metadata) {
- Map<Integer, List<TopicPartition>> groups = new HashMap<>();
- List<List<String>> sortedTopicGroups = sort(topicGroups);
+ public Map<TaskId, Set<TopicPartition>> partitionGroups(Cluster metadata) {
+ Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
+
+ for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
+ Integer topicGroupId = entry.getKey();
+ Set<String> topicGroup = entry.getValue();
- int taskId = 0;
- for (List<String> topicGroup : sortedTopicGroups) {
int maxNumPartitions = maxNumPartitions(metadata, topicGroup);
for (int partitionId = 0; partitionId < maxNumPartitions; partitionId++) {
- List<TopicPartition> group = new ArrayList<>(topicGroup.size());
+ Set<TopicPartition> group = new HashSet<>(topicGroup.size());
for (String topic : topicGroup) {
if (partitionId < metadata.partitionsForTopic(topic).size()) {
group.add(new TopicPartition(topic, partitionId));
}
}
- groups.put(taskId++, group);
+ groups.put(new TaskId(topicGroupId, partitionId), Collections.unmodifiableSet(group));
}
}
- // make the data unmodifiable, then return
- Map<Integer, List<TopicPartition>> unmodifiableGroups = new HashMap<>();
- for (Map.Entry<Integer, List<TopicPartition>> entry : groups.entrySet()) {
- unmodifiableGroups.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
- }
- return Collections.unmodifiableMap(unmodifiableGroups);
+ return Collections.unmodifiableMap(groups);
}
- protected int maxNumPartitions(Cluster metadata, List<String> topics) {
+ protected int maxNumPartitions(Cluster metadata, Set<String> topics) {
int maxNumPartitions = 0;
for (String topic : topics) {
List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
if (infos == null)
throw new KafkaException("topic not found :" + topic);
int numPartitions = infos.size();
if (numPartitions > maxNumPartitions)
maxNumPartitions = numPartitions;
}
return maxNumPartitions;
}
- protected List<List<String>> sort(Collection<Set<String>> topicGroups) {
- TreeMap<String, String[]> sortedMap = new TreeMap<>();
-
- for (Set<String> group : topicGroups) {
- String[] arr = group.toArray(new String[group.size()]);
- Arrays.sort(arr);
- sortedMap.put(arr[0], arr);
- }
-
- ArrayList<List<String>> list = new ArrayList(sortedMap.size());
- for (String[] arr : sortedMap.values()) {
- list.add(Arrays.asList(arr));
- }
-
- return list;
- }
-
}
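
The rewritten partitionGroups() above keys each task by TaskId(topicGroupId, partitionId) instead of a running integer, so a task keeps the same id regardless of how many other topic groups exist. A hedged, self-contained sketch of the grouping loop with hypothetical topic and partition counts (topicA has 3 partitions, topicB has 2):

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PartitionGroupingSketch {
    public static void main(String[] args) {
        // Hypothetical cluster metadata.
        Map<String, Integer> partitionCounts = new HashMap<>();
        partitionCounts.put("topicA", 3);
        partitionCounts.put("topicB", 2);

        int topicGroupId = 0;
        Set<String> topicGroup = new HashSet<>(Arrays.asList("topicA", "topicB"));

        // maxNumPartitions(metadata, topicGroup)
        int maxNumPartitions = 0;
        for (String topic : topicGroup)
            maxNumPartitions = Math.max(maxNumPartitions, partitionCounts.get(topic));

        // One task per partition id, named <topicGroupId>_<partitionId> like TaskId.toString().
        for (int partitionId = 0; partitionId < maxNumPartitions; partitionId++) {
            Set<String> group = new HashSet<>();
            for (String topic : topicGroup) {
                if (partitionId < partitionCounts.get(topic))
                    group.add(topic + "-" + partitionId);
            }
            System.out.println(topicGroupId + "_" + partitionId + " -> " + group);
        }
        // Task 0_2 contains only topicA-2, since topicB has no partition 2 (set ordering may vary).
    }
}
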
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
index 82bb36a2..026ec89f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
@@ -1,55 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor;
-import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.Set;
public abstract class PartitionGrouper {
- protected Collection<Set<String>> topicGroups;
+ protected Map<Integer, Set<String>> topicGroups;
private KafkaStreamingPartitionAssignor partitionAssignor = null;
/**
* Returns a map of task ids to groups of partitions.
*
* @param metadata
* @return a map of task ids to groups of partitions
*/
- public abstract Map<Integer, List<TopicPartition>> partitionGroups(Cluster metadata);
+ public abstract Map<TaskId, Set<TopicPartition>> partitionGroups(Cluster metadata);
- public void topicGroups(Collection<Set<String>> topicGroups) {
+ public void topicGroups(Map<Integer, Set<String>> topicGroups) {
this.topicGroups = topicGroups;
}
public void partitionAssignor(KafkaStreamingPartitionAssignor partitionAssignor) {
this.partitionAssignor = partitionAssignor;
}
- public Set<Integer> taskIds(TopicPartition partition) {
+ public Set<TaskId> taskIds(TopicPartition partition) {
return partitionAssignor.taskIds(partition);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index e7cf2573..88ac64e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -1,101 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamingMetrics;
import java.io.File;
public interface ProcessorContext {
/**
* Returns the task id
*
* @return the task id
*/
- int id();
+ TaskId id();
/**
* Returns the key serializer
*
* @return the key serializer
*/
Serializer<?> keySerializer();
/**
* Returns the value serializer
*
* @return the value serializer
*/
Serializer<?> valueSerializer();
/**
* Returns the key deserializer
*
* @return the key deserializer
*/
Deserializer<?> keyDeserializer();
/**
* Returns the value deserializer
*
* @return the value deserializer
*/
Deserializer<?> valueDeserializer();
/**
* Returns the state directory for the partition.
*
* @return the state directory
*/
File stateDir();
/**
* Returns Metrics instance
*
* @return StreamingMetrics
*/
StreamingMetrics metrics();
/**
* Registers and possibly restores the specified storage engine.
*
* @param store the storage engine
*/
void register(StateStore store, StateRestoreCallback stateRestoreCallback);
StateStore getStateStore(String name);
void schedule(long interval);
<K, V> void forward(K key, V value);
<K, V> void forward(K key, V value, int childIndex);
void commit();
String topic();
int partition();
long offset();
long timestamp();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
new file mode 100644
index 00000000..3d474fe4
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+public class TaskId {
+
+ public final int topicGroupId;
+ public final int partition;
+
+ public TaskId(int topicGroupId, int partition) {
+ this.topicGroupId = topicGroupId;
+ this.partition = partition;
+ }
+
+ public String toString() {
+ return topicGroupId + "_" + partition;
+ }
+
+ public static TaskId parse(String string) {
+ int index = string.indexOf('_');
+ if (index <= 0 || index + 1 >= string.length()) throw new TaskIdFormatException();
+
+ try {
+ int topicGroupId = Integer.parseInt(string.substring(0, index));
+ int partition = Integer.parseInt(string.substring(index + 1));
+
+ return new TaskId(topicGroupId, partition);
+ } catch (Exception e) {
+ throw new TaskIdFormatException();
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof TaskId) {
+ TaskId other = (TaskId) o;
+ return other.topicGroupId == this.topicGroupId && other.partition == this.partition;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ long n = ((long) topicGroupId << 32) | (long) partition;
+ return (int) (n % 0xFFFFFFFFL);
+ }
+
+ public static class TaskIdFormatException extends RuntimeException {
+ }
+}
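
TaskId's string form, "<topicGroupId>_<partition>", is what the rest of this patch reuses for state directory names and log messages, and parse() is its inverse. A quick usage sketch, assuming the TaskId class added above is on the classpath (the sketch's own class name is made up):

import org.apache.kafka.streams.processor.TaskId;

public class TaskIdSketch {
    public static void main(String[] args) {
        TaskId id = new TaskId(2, 5);

        String encoded = id.toString();          // "2_5"
        TaskId decoded = TaskId.parse(encoded);  // back to topicGroupId=2, partition=5

        System.out.println(encoded);
        System.out.println(decoded.equals(id));  // true: equals() compares both fields

        try {
            TaskId.parse("not-a-task-id");       // no '_' separator
        } catch (TaskId.TaskIdFormatException e) {
            System.out.println("rejected malformed id");
        }
    }
}
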
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index a475e1ec..077489c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -1,371 +1,398 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.QuickUnion;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* A component that is used to build a {@link ProcessorTopology}. A topology contains an acyclic graph of sources, processors,
* and sinks. A {@link SourceNode source} is a node in the graph that consumes one or more Kafka topics and forwards them to
* its child nodes. A {@link Processor processor} is a node in the graph that receives input messages from upstream nodes,
* processes those messages, and optionally forwards new messages to one or all of its children. Finally, a {@link SinkNode sink}
* is a node in the graph that receives messages from upstream nodes and writes them to a Kafka topic. This builder allows you
* to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link KafkaStreaming} instance
* that will then {@link KafkaStreaming#start() begin consuming, processing, and producing messages}.
*/
public class TopologyBuilder {
// list of node factories in a topological order
private final ArrayList<NodeFactory> nodeFactories = new ArrayList<>();
private final Set<String> nodeNames = new HashSet<>();
private final Set<String> sourceTopicNames = new HashSet<>();
- private final QuickUnion<String> nodeGroups = new QuickUnion<>();
+ private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
private final List<Set<String>> copartitionSourceGroups = new ArrayList<>();
private final HashMap<String, String[]> nodeToTopics = new HashMap<>();
+ private Map<Integer, Set<String>> nodeGroups = null;
private interface NodeFactory {
ProcessorNode build();
}
private class ProcessorNodeFactory implements NodeFactory {
public final String[] parents;
private final String name;
private final ProcessorSupplier supplier;
public ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier supplier) {
this.name = name;
this.parents = parents.clone();
this.supplier = supplier;
}
@Override
public ProcessorNode build() {
return new ProcessorNode(name, supplier.get());
}
}
private class SourceNodeFactory implements NodeFactory {
public final String[] topics;
private final String name;
private Deserializer keyDeserializer;
private Deserializer valDeserializer;
private SourceNodeFactory(String name, String[] topics, Deserializer keyDeserializer, Deserializer valDeserializer) {
this.name = name;
this.topics = topics.clone();
this.keyDeserializer = keyDeserializer;
this.valDeserializer = valDeserializer;
}
@Override
public ProcessorNode build() {
return new SourceNode(name, keyDeserializer, valDeserializer);
}
}
private class SinkNodeFactory implements NodeFactory {
public final String[] parents;
public final String topic;
private final String name;
private Serializer keySerializer;
private Serializer valSerializer;
private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer) {
this.name = name;
this.parents = parents.clone();
this.topic = topic;
this.keySerializer = keySerializer;
this.valSerializer = valSerializer;
}
@Override
public ProcessorNode build() {
return new SinkNode(name, topic, keySerializer, valSerializer);
}
}
/**
* Create a new builder.
*/
public TopologyBuilder() {}
/**
* Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
* The source will use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and
* {@link StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
* {@link StreamingConfig streaming configuration}.
*
* @param name the unique name of the source used to reference this node when
* {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
* @param topics the name of one or more Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
public final TopologyBuilder addSource(String name, String... topics) {
return addSource(name, (Deserializer) null, (Deserializer) null, topics);
}
/**
* Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
* The source will use the specified key and value deserializers.
*
* @param name the unique name of the source used to reference this node when
* {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
* @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source
* should use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
* {@link StreamingConfig streaming configuration}
* @param valDeserializer the {@link Deserializer value deserializer} used when consuming messages; may be null if the source
* should use the {@link StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
* {@link StreamingConfig streaming configuration}
* @param topics the name of one or more Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
if (nodeNames.contains(name))
throw new TopologyException("Processor " + name + " is already added.");
for (String topic : topics) {
if (sourceTopicNames.contains(topic))
throw new TopologyException("Topic " + topic + " has already been registered by another source.");
sourceTopicNames.add(topic);
}
nodeNames.add(name);
nodeFactories.add(new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer));
nodeToTopics.put(name, topics.clone());
- nodeGroups.add(name);
+ nodeGrouper.add(name);
return this;
}
/**
* Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
* The sink will use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
* {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
* {@link StreamingConfig streaming configuration}.
*
* @param name the unique name of the sink
* @param topic the name of the Kafka topic to which this sink should write its messages
* @return this builder instance so methods can be chained together; never null
*/
public final TopologyBuilder addSink(String name, String topic, String... parentNames) {
return addSink(name, topic, (Serializer) null, (Serializer) null, parentNames);
}
/**
* Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
* The sink will use the specified key and value serializers.
*
* @param name the unique name of the sink
* @param topic the name of the Kafka topic to which this sink should write its messages
* @param keySerializer the {@link Serializer key serializer} used when writing messages to the topic; may be null if the sink
* should use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
* {@link StreamingConfig streaming configuration}
* @param valSerializer the {@link Serializer value serializer} used when writing messages to the topic; may be null if the sink
* should use the {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
* {@link StreamingConfig streaming configuration}
* @param parentNames the name of one or more source or processor nodes whose output messages this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
*/
public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
if (nodeNames.contains(name))
throw new TopologyException("Processor " + name + " is already added.");
if (parentNames != null) {
for (String parent : parentNames) {
if (parent.equals(name)) {
throw new TopologyException("Processor " + name + " cannot be a parent of itself.");
}
if (!nodeNames.contains(parent)) {
throw new TopologyException("Parent processor " + parent + " is not added yet.");
}
}
}
nodeNames.add(name);
nodeFactories.add(new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer));
return this;
}
/**
* Add a new processor node that receives and processes messages output by one or more parent source or processor nodes.
* Any new messages output by this processor will be forwarded to its child processor or sink nodes.
* @param name the unique name of the processor node
* @param supplier the supplier used to obtain this node's {@link Processor} instance
* @param parentNames the name of one or more source or processor nodes whose output messages this processor should receive
* and process
* @return this builder instance so methods can be chained together; never null
*/
public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) {
if (nodeNames.contains(name))
throw new TopologyException("Processor " + name + " is already added.");
if (parentNames != null) {
for (String parent : parentNames) {
if (parent.equals(name)) {
throw new TopologyException("Processor " + name + " cannot be a parent of itself.");
}
if (!nodeNames.contains(parent)) {
throw new TopologyException("Parent processor " + parent + " is not added yet.");
}
}
}
nodeNames.add(name);
nodeFactories.add(new ProcessorNodeFactory(name, parentNames, supplier));
- nodeGroups.add(name);
- nodeGroups.unite(name, parentNames);
+ nodeGrouper.add(name);
+ nodeGrouper.unite(name, parentNames);
return this;
}
/**
- * Returns the topic groups.
+ * Returns the map of topic groups keyed by the group id.
* A topic group is a group of topics in the same task.
*
* @return groups of topic names
*/
- public Collection<Set<String>> topicGroups() {
- List<Set<String>> topicGroups = new ArrayList<>();
+ public Map<Integer, Set<String>> topicGroups() {
+ Map<Integer, Set<String>> topicGroups = new HashMap<>();
- for (Set<String> nodeGroup : generateNodeGroups(nodeGroups)) {
+ if (nodeGroups == null) {
+ nodeGroups = nodeGroups();
+ } else if (!nodeGroups.equals(nodeGroups())) {
+ throw new TopologyException("topology has mutated");
+ }
+
+ for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
Set<String> topicGroup = new HashSet<>();
- for (String node : nodeGroup) {
+ for (String node : entry.getValue()) {
String[] topics = nodeToTopics.get(node);
if (topics != null)
topicGroup.addAll(Arrays.asList(topics));
}
- topicGroups.add(Collections.unmodifiableSet(topicGroup));
+ topicGroups.put(entry.getKey(), Collections.unmodifiableSet(topicGroup));
}
- return Collections.unmodifiableList(topicGroups);
+ return Collections.unmodifiableMap(topicGroups);
}
- private Collection<Set<String>> generateNodeGroups(QuickUnion<String> grouping) {
- HashMap<String, Set<String>> nodeGroupMap = new HashMap<>();
+ private Map<Integer, Set<String>> nodeGroups() {
+ HashMap<Integer, Set<String>> nodeGroups = new HashMap<>();
+ HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();
- for (String nodeName : nodeNames) {
- String root = grouping.root(nodeName);
- Set<String> nodeGroup = nodeGroupMap.get(root);
+ int nodeGroupId = 0;
+
+ // Go through source nodes first. This makes the group id assignment easy to predict in tests
+ for (String nodeName : Utils.sorted(nodeToTopics.keySet())) {
+ String root = nodeGrouper.root(nodeName);
+ Set<String> nodeGroup = rootToNodeGroup.get(root);
if (nodeGroup == null) {
nodeGroup = new HashSet<>();
- nodeGroupMap.put(root, nodeGroup);
+ rootToNodeGroup.put(root, nodeGroup);
+ nodeGroups.put(nodeGroupId++, nodeGroup);
}
nodeGroup.add(nodeName);
}
- return nodeGroupMap.values();
+ // Go through non-source nodes
+ for (String nodeName : Utils.sorted(nodeNames)) {
+ if (!nodeToTopics.containsKey(nodeName)) {
+ String root = nodeGrouper.root(nodeName);
+ Set<String> nodeGroup = rootToNodeGroup.get(root);
+ if (nodeGroup == null) {
+ nodeGroup = new HashSet<>();
+ rootToNodeGroup.put(root, nodeGroup);
+ nodeGroups.put(nodeGroupId++, nodeGroup);
+ }
+ nodeGroup.add(nodeName);
+ }
+ }
+
+ return nodeGroups;
}
/**
* Asserts that the streams of the specified source nodes must be copartitioned.
*
* @param sourceNodes a set of source node names
*/
public void copartitionSources(Collection<String> sourceNodes) {
copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
}
/**
* Returns the copartition groups.
* A copartition group is a group of topics that are required to be copartitioned.
*
* @return groups of topic names
*/
public Collection<Set<String>> copartitionGroups() {
List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size());
for (Set<String> nodeNames : copartitionSourceGroups) {
Set<String> copartitionGroup = new HashSet<>();
for (String node : nodeNames) {
String[] topics = nodeToTopics.get(node);
if (topics != null)
copartitionGroup.addAll(Arrays.asList(topics));
}
list.add(Collections.unmodifiableSet(copartitionGroup));
}
return Collections.unmodifiableList(list);
}
/**
* Build the topology. This is typically called automatically when passing this builder into the
* {@link KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)} constructor.
*
* @see KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)
*/
@SuppressWarnings("unchecked")
public ProcessorTopology build() {
List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
Map<String, ProcessorNode> processorMap = new HashMap<>();
Map<String, SourceNode> topicSourceMap = new HashMap<>();
try {
// create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
for (NodeFactory factory : nodeFactories) {
ProcessorNode node = factory.build();
processorNodes.add(node);
processorMap.put(node.name(), node);
if (factory instanceof ProcessorNodeFactory) {
for (String parent : ((ProcessorNodeFactory) factory).parents) {
processorMap.get(parent).addChild(node);
}
} else if (factory instanceof SourceNodeFactory) {
for (String topic : ((SourceNodeFactory) factory).topics) {
topicSourceMap.put(topic, (SourceNode) node);
}
} else if (factory instanceof SinkNodeFactory) {
for (String parent : ((SinkNodeFactory) factory).parents) {
processorMap.get(parent).addChild(node);
}
} else {
throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
}
}
} catch (Exception e) {
throw new KafkaException("ProcessorNode construction failed: this should not happen.");
}
return new ProcessorTopology(processorNodes, topicSourceMap);
}
/**
* Get the names of topics that are to be consumed by the source nodes created by this builder.
* @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null
*/
public Set<String> sourceTopics() {
return Collections.unmodifiableSet(sourceTopicNames);
}
}
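
With topicGroups() now returning a map keyed by a deterministic group id (source nodes are visited in sorted name order), callers such as DefaultPartitionGrouper can rely on stable ids. A hedged sketch of how the builder might be exercised, restricted to the addSource() overloads shown here; the node and topic names are made up:

import java.util.Map;
import java.util.Set;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class TopicGroupsSketch {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        // Three sources that share no processors, so each forms its own node group.
        builder.addSource("source1", "orders");
        builder.addSource("source2", "payments");
        builder.addSource("source3", "clicks", "impressions");

        // Group ids follow the sorted source-node order:
        // 0 -> {orders}, 1 -> {payments}, 2 -> {clicks, impressions}
        Map<Integer, Set<String>> topicGroups = builder.topicGroups();
        System.out.println(topicGroups);

        System.out.println(builder.sourceTopics()); // all registered source topics
    }
}
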
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
index ee5bb93a..f7b14ad3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
@@ -1,133 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.processor.PartitionGrouper;
+import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Configurable {
private static final Logger log = LoggerFactory.getLogger(KafkaStreamingPartitionAssignor.class);
private PartitionGrouper partitionGrouper;
- private Map<TopicPartition, Set<Integer>> partitionToTaskIds;
+ private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
@Override
public void configure(Map<String, ?> configs) {
Object o = configs.get(StreamingConfig.InternalConfig.PARTITION_GROUPER_INSTANCE);
if (o == null)
throw new KafkaException("PartitionGrouper is not specified");
if (!PartitionGrouper.class.isInstance(o))
throw new KafkaException(o.getClass().getName() + " is not an instance of " + PartitionGrouper.class.getName());
partitionGrouper = (PartitionGrouper) o;
partitionGrouper.partitionAssignor(this);
}
@Override
public String name() {
return "streaming";
}
@Override
public Subscription subscription(Set<String> topics) {
return new Subscription(new ArrayList<>(topics));
}
@Override
public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
- Map<Integer, List<TopicPartition>> partitionGroups = partitionGrouper.partitionGroups(metadata);
+ Map<TaskId, Set<TopicPartition>> partitionGroups = partitionGrouper.partitionGroups(metadata);
String[] clientIds = subscriptions.keySet().toArray(new String[subscriptions.size()]);
- Integer[] taskIds = partitionGroups.keySet().toArray(new Integer[partitionGroups.size()]);
+ TaskId[] taskIds = partitionGroups.keySet().toArray(new TaskId[partitionGroups.size()]);
Map<String, Assignment> assignment = new HashMap<>();
for (int i = 0; i < clientIds.length; i++) {
List<TopicPartition> partitions = new ArrayList<>();
- List<Integer> ids = new ArrayList<>();
+ List<TaskId> ids = new ArrayList<>();
for (int j = i; j < taskIds.length; j += clientIds.length) {
- Integer taskId = taskIds[j];
+ TaskId taskId = taskIds[j];
for (TopicPartition partition : partitionGroups.get(taskId)) {
partitions.add(partition);
ids.add(taskId);
}
}
- ByteBuffer buf = ByteBuffer.allocate(4 + ids.size() * 4);
+ ByteBuffer buf = ByteBuffer.allocate(4 + ids.size() * 8);
//version
buf.putInt(1);
// encode task ids
- for (Integer id : ids) {
- buf.putInt(id);
+ for (TaskId id : ids) {
+ buf.putInt(id.topicGroupId);
+ buf.putInt(id.partition);
}
buf.rewind();
assignment.put(clientIds[i], new Assignment(partitions, buf));
}
return assignment;
}
@Override
public void onAssignment(Assignment assignment) {
List<TopicPartition> partitions = assignment.partitions();
ByteBuffer data = assignment.userData();
data.rewind();
- Map<TopicPartition, Set<Integer>> partitionToTaskIds = new HashMap<>();
+ Map<TopicPartition, Set<TaskId>> partitionToTaskIds = new HashMap<>();
// check version
int version = data.getInt();
if (version == 1) {
for (TopicPartition partition : partitions) {
- Set<Integer> taskIds = partitionToTaskIds.get(partition);
+ Set<TaskId> taskIds = partitionToTaskIds.get(partition);
if (taskIds == null) {
taskIds = new HashSet<>();
partitionToTaskIds.put(partition, taskIds);
}
// decode a task id
- taskIds.add(data.getInt());
+ taskIds.add(new TaskId(data.getInt(), data.getInt()));
}
} else {
KafkaException ex = new KafkaException("unknown assignment data version: " + version);
log.error(ex.getMessage(), ex);
throw ex;
}
this.partitionToTaskIds = partitionToTaskIds;
}
- public Set<Integer> taskIds(TopicPartition partition) {
+ public Set<TaskId> taskIds(TopicPartition partition) {
return partitionToTaskIds.get(partition);
}
}
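
The assignment userData now carries four bytes of version followed by eight bytes per task id (topicGroupId, then partition), which is why the buffer grows from ids.size() * 4 to ids.size() * 8. A hedged round-trip sketch of that encoding (the class name is made up, and the real onAssignment() reads one id per assigned partition instead of draining the buffer):

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.streams.processor.TaskId;

public class AssignmentUserDataSketch {

    // Encode task ids the way assign() does.
    static ByteBuffer encode(List<TaskId> ids) {
        ByteBuffer buf = ByteBuffer.allocate(4 + ids.size() * 8);
        buf.putInt(1);                   // version
        for (TaskId id : ids) {
            buf.putInt(id.topicGroupId);
            buf.putInt(id.partition);
        }
        buf.rewind();
        return buf;
    }

    // Decode the way onAssignment() does, minus the per-partition bookkeeping.
    static void decode(ByteBuffer data) {
        data.rewind();
        int version = data.getInt();
        if (version != 1)
            throw new IllegalArgumentException("unknown assignment data version: " + version);
        while (data.hasRemaining())
            System.out.println(new TaskId(data.getInt(), data.getInt()));
    }

    public static void main(String[] args) {
        decode(encode(Arrays.asList(new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0))));
        // prints 0_0, 0_1, 1_0
    }
}
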
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index dfc838ce..3c1e0595 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -1,180 +1,180 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
public class ProcessorContextImpl implements ProcessorContext, RecordCollector.Supplier {
private static final Logger log = LoggerFactory.getLogger(ProcessorContextImpl.class);
- private final int id;
+ private final TaskId id;
private final StreamTask task;
private final StreamingMetrics metrics;
private final RecordCollector collector;
private final ProcessorStateManager stateMgr;
private final Serializer<?> keySerializer;
private final Serializer<?> valSerializer;
private final Deserializer<?> keyDeserializer;
private final Deserializer<?> valDeserializer;
private boolean initialized;
@SuppressWarnings("unchecked")
- public ProcessorContextImpl(int id,
+ public ProcessorContextImpl(TaskId id,
StreamTask task,
StreamingConfig config,
RecordCollector collector,
ProcessorStateManager stateMgr,
StreamingMetrics metrics) {
this.id = id;
this.task = task;
this.metrics = metrics;
this.collector = collector;
this.stateMgr = stateMgr;
this.keySerializer = config.getConfiguredInstance(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
this.valSerializer = config.getConfiguredInstance(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
this.keyDeserializer = config.getConfiguredInstance(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
this.valDeserializer = config.getConfiguredInstance(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
this.initialized = false;
}
@Override
public RecordCollector recordCollector() {
return this.collector;
}
public void initialized() {
this.initialized = true;
}
- @Override
- public int id() {
+ public TaskId id() {
return id;
}
@Override
public Serializer<?> keySerializer() {
return this.keySerializer;
}
@Override
public Serializer<?> valueSerializer() {
return this.valSerializer;
}
@Override
public Deserializer<?> keyDeserializer() {
return this.keyDeserializer;
}
@Override
public Deserializer<?> valueDeserializer() {
return this.valDeserializer;
}
@Override
public File stateDir() {
return stateMgr.baseDir();
}
@Override
public StreamingMetrics metrics() {
return metrics;
}
@Override
public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
if (initialized)
throw new KafkaException("Can only create state stores during initialization.");
stateMgr.register(store, stateRestoreCallback);
}
@Override
public StateStore getStateStore(String name) {
return stateMgr.getStore(name);
}
@Override
public String topic() {
if (task.record() == null)
throw new IllegalStateException("this should not happen as topic() should only be called while a record is processed");
return task.record().topic();
}
@Override
public int partition() {
if (task.record() == null)
throw new IllegalStateException("this should not happen as partition() should only be called while a record is processed");
return task.record().partition();
}
@Override
public long offset() {
if (this.task.record() == null)
throw new IllegalStateException("this should not happen as offset() should only be called while a record is processed");
return this.task.record().offset();
}
@Override
public long timestamp() {
if (task.record() == null)
throw new IllegalStateException("this should not happen as timestamp() should only be called while a record is processed");
return task.record().timestamp;
}
@Override
public <K, V> void forward(K key, V value) {
task.forward(key, value);
}
@Override
public <K, V> void forward(K key, V value, int childIndex) {
task.forward(key, value, childIndex);
}
@Override
public void commit() {
task.needCommit();
}
@Override
public void schedule(long interval) {
task.schedule(interval);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 1de6f9bd..d83d721a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -1,369 +1,370 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* A StreamTask is associated with a {@link PartitionGroup}, and is assigned to a StreamThread for processing.
*/
public class StreamTask implements Punctuator {
private static final Logger log = LoggerFactory.getLogger(StreamTask.class);
- private final int id;
+ private final TaskId id;
private final int maxBufferedSize;
private final Consumer consumer;
private final PartitionGroup partitionGroup;
private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
private final PunctuationQueue punctuationQueue;
private final ProcessorContextImpl processorContext;
private final ProcessorTopology topology;
private final Map<TopicPartition, Long> consumedOffsets;
private final RecordCollector recordCollector;
private final ProcessorStateManager stateMgr;
private boolean commitRequested = false;
private boolean commitOffsetNeeded = false;
private StampedRecord currRecord = null;
private ProcessorNode currNode = null;
private boolean requiresPoll = true;
/**
* Create {@link StreamTask} with its assigned partitions
*
* @param id the ID of this task
* @param consumer the instance of {@link Consumer}
* @param producer the instance of {@link Producer}
* @param restoreConsumer the instance of {@link Consumer} used when restoring state
* @param partitions the collection of assigned {@link TopicPartition}
* @param topology the instance of {@link ProcessorTopology}
* @param config the {@link StreamingConfig} specified by the user
* @param metrics the {@link StreamingMetrics} created by the thread
*/
- public StreamTask(int id,
+ public StreamTask(TaskId id,
Consumer<byte[], byte[]> consumer,
Producer<byte[], byte[]> producer,
Consumer<byte[], byte[]> restoreConsumer,
Collection<TopicPartition> partitions,
ProcessorTopology topology,
StreamingConfig config,
StreamingMetrics metrics) {
this.id = id;
this.consumer = consumer;
this.punctuationQueue = new PunctuationQueue();
this.maxBufferedSize = config.getInt(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
this.topology = topology;
// create queues for each assigned partition and associate them
// to corresponding source nodes in the processor topology
Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>();
for (TopicPartition partition : partitions) {
SourceNode source = topology.source(partition.topic());
RecordQueue queue = createRecordQueue(partition, source);
partitionQueues.put(partition, queue);
}
TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
// initialize the consumed and produced offset cache
this.consumedOffsets = new HashMap<>();
// create the record recordCollector that maintains the produced offsets
this.recordCollector = new RecordCollector(producer);
log.info("Creating restoration consumer client for stream task #" + id());
// create the processor state manager
try {
- File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), Integer.toString(id));
- this.stateMgr = new ProcessorStateManager(id, stateFile, restoreConsumer);
+ File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
+ this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer);
} catch (IOException e) {
throw new KafkaException("Error while creating the state manager", e);
}
// initialize the topology with its own context
this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics);
// initialize the task by initializing all its processor nodes in the topology
for (ProcessorNode node : this.topology.processors()) {
this.currNode = node;
try {
node.init(this.processorContext);
} finally {
this.currNode = null;
}
}
this.processorContext.initialized();
}
- public int id() {
+ public TaskId id() {
return id;
}
public Set<TopicPartition> partitions() {
return this.partitionGroup.partitions();
}
/**
* Adds records to queues
*
* @param partition the partition
* @param records the records
*/
@SuppressWarnings("unchecked")
public void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
int queueSize = partitionGroup.addRawRecords(partition, records);
// if after adding these records, its partition queue's buffered size has been
// increased beyond the threshold, we can then pause the consumption for this partition
if (queueSize > this.maxBufferedSize) {
consumer.pause(partition);
}
}
/**
* Process one record
*
* @return number of records left in the buffer of this task's partition group after the processing is done
*/
@SuppressWarnings("unchecked")
public int process() {
synchronized (this) {
// get the next record to process
StampedRecord record = partitionGroup.nextRecord(recordInfo);
// if there is no record to process, return immediately
if (record == null) {
requiresPoll = true;
return 0;
}
requiresPoll = false;
try {
// process the record by passing to the source node of the topology
this.currRecord = record;
this.currNode = recordInfo.node();
TopicPartition partition = recordInfo.partition();
log.debug("Start processing one record [" + currRecord + "]");
this.currNode.process(currRecord.key(), currRecord.value());
log.debug("Completed processing one record [" + currRecord + "]");
// update the consumed offset map after processing is done
consumedOffsets.put(partition, currRecord.offset());
commitOffsetNeeded = true;
// after processing this record, if its partition queue's buffered size has been
// decreased to the threshold, we can then resume the consumption on this partition
if (partitionGroup.numBuffered(partition) == this.maxBufferedSize) {
consumer.resume(partition);
requiresPoll = true;
}
if (partitionGroup.topQueueSize() <= this.maxBufferedSize) {
requiresPoll = true;
}
} finally {
this.currRecord = null;
this.currNode = null;
}
return partitionGroup.numBuffered();
}
}
public boolean requiresPoll() {
return requiresPoll;
}
/**
* Possibly trigger registered punctuation functions if
* current time has reached the defined stamp
*
* @param timestamp
*/
public boolean maybePunctuate(long timestamp) {
return punctuationQueue.mayPunctuate(timestamp, this);
}
@Override
public void punctuate(ProcessorNode node, long timestamp) {
if (currNode != null)
throw new IllegalStateException("Current node is not null");
currNode = node;
try {
node.processor().punctuate(timestamp);
} finally {
currNode = null;
}
}
public StampedRecord record() {
return this.currRecord;
}
public ProcessorNode node() {
return this.currNode;
}
public ProcessorTopology topology() {
return this.topology;
}
/**
* Commit the current task state
*/
public void commit() {
// 1) flush local state
stateMgr.flush();
// 2) flush produced records in the downstream and change logs of local states
recordCollector.flush();
// 3) commit consumed offsets if it is dirty already
if (commitOffsetNeeded) {
Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
for (Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
consumedOffsetsAndMetadata.put(entry.getKey(), new OffsetAndMetadata(entry.getValue()));
}
consumer.commitSync(consumedOffsetsAndMetadata);
commitOffsetNeeded = false;
}
commitRequested = false;
}
/**
* Whether or not a request has been made to commit the current state
*/
public boolean commitNeeded() {
return this.commitRequested;
}
/**
* Request committing the current task's state
*/
public void needCommit() {
this.commitRequested = true;
}
/**
* Schedules a punctuation for the processor
*
* @param interval the interval in milliseconds
*/
public void schedule(long interval) {
if (currNode == null)
throw new IllegalStateException("Current node is null");
punctuationQueue.schedule(new PunctuationSchedule(currNode, interval));
}
public void close() {
this.partitionGroup.close();
this.consumedOffsets.clear();
// close the processors
// make sure close() is called for each node even when there is a RuntimeException
RuntimeException exception = null;
for (ProcessorNode node : this.topology.processors()) {
currNode = node;
try {
node.close();
} catch (RuntimeException e) {
exception = e;
} finally {
currNode = null;
}
}
if (exception != null)
throw exception;
try {
stateMgr.close(recordCollector.offsets());
} catch (IOException e) {
throw new KafkaException("Error while closing the state manager in processor context", e);
}
}
private RecordQueue createRecordQueue(TopicPartition partition, SourceNode source) {
return new RecordQueue(partition, source);
}
@SuppressWarnings("unchecked")
public <K, V> void forward(K key, V value) {
ProcessorNode thisNode = currNode;
for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
currNode = childNode;
try {
childNode.process(key, value);
} finally {
currNode = thisNode;
}
}
}
@SuppressWarnings("unchecked")
public <K, V> void forward(K key, V value, int childIndex) {
ProcessorNode thisNode = currNode;
ProcessorNode childNode = (ProcessorNode<K, V>) thisNode.children().get(childIndex);
currNode = childNode;
try {
childNode.process(key, value);
} finally {
currNode = thisNode;
}
}
public ProcessorContext context() {
return processorContext;
}
}
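Aside (not part of the patch): the pause/resume logic in addRecords() and process() above implements per-partition back-pressure. The following self-contained sketch shows the same rule in isolation, assuming a plain in-memory queue in place of the real PartitionGroup and a boolean flag in place of consumer.pause()/resume().

import java.util.ArrayDeque;
import java.util.Deque;

// Simplified stand-in for one partition queue of a StreamTask: pause when adding
// records pushes the queue past maxBuffered, resume once processing drains it back
// to exactly maxBuffered (mirroring the == check in process() above).
public class BackPressureSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    private final int maxBuffered;
    private boolean paused = false;

    public BackPressureSketch(int maxBuffered) {
        this.maxBuffered = maxBuffered;
    }

    public void add(String record) {
        queue.addLast(record);
        if (queue.size() > maxBuffered)
            paused = true;              // consumer.pause(partition) in the real code
    }

    public String processOne() {
        String record = queue.pollFirst();
        if (queue.size() == maxBuffered)
            paused = false;             // consumer.resume(partition) in the real code
        return record;
    }

    public static void main(String[] args) {
        BackPressureSketch partition = new BackPressureSketch(3);
        for (int i = 0; i < 4; i++)
            partition.add("record-" + i);          // 4 > 3, so the partition pauses
        System.out.println(partition.paused);       // true
        partition.processOne();                     // 3 buffered again, so it resumes
        System.out.println(partition.paused);       // false
    }
}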
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e3803a10..abc5c5dd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1,576 +1,577 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.PartitionGrouper;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileLock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class StreamThread extends Thread {
private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
private static final AtomicInteger STREAMING_THREAD_ID_SEQUENCE = new AtomicInteger(1);
private final AtomicBoolean running;
protected final StreamingConfig config;
protected final TopologyBuilder builder;
protected final PartitionGrouper partitionGrouper;
protected final Producer<byte[], byte[]> producer;
protected final Consumer<byte[], byte[]> consumer;
protected final Consumer<byte[], byte[]> restoreConsumer;
- private final Map<Integer, StreamTask> tasks;
+ private final Map<TaskId, StreamTask> tasks;
private final String clientId;
private final Time time;
private final File stateDir;
private final long pollTimeMs;
private final long cleanTimeMs;
private final long commitTimeMs;
private final long totalRecordsToProcess;
private final StreamingMetricsImpl sensors;
private long lastClean;
private long lastCommit;
private long recordsProcessed;
final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
addPartitions(assignment);
lastClean = time.milliseconds(); // start the cleaning cycle
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
commitAll();
removePartitions();
lastClean = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
}
};
public StreamThread(TopologyBuilder builder,
StreamingConfig config,
String clientId,
Metrics metrics,
Time time) throws Exception {
this(builder, config, null, null, null, clientId, metrics, time);
}
StreamThread(TopologyBuilder builder,
StreamingConfig config,
Producer<byte[], byte[]> producer,
Consumer<byte[], byte[]> consumer,
Consumer<byte[], byte[]> restoreConsumer,
String clientId,
Metrics metrics,
Time time) throws Exception {
super("StreamThread-" + STREAMING_THREAD_ID_SEQUENCE.getAndIncrement());
this.config = config;
this.builder = builder;
this.clientId = clientId;
this.partitionGrouper = config.getConfiguredInstance(StreamingConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
this.partitionGrouper.topicGroups(builder.topicGroups());
// set the producer and consumer clients
this.producer = (producer != null) ? producer : createProducer();
this.consumer = (consumer != null) ? consumer : createConsumer();
this.restoreConsumer = (restoreConsumer != null) ? restoreConsumer : createRestoreConsumer();
// initialize the task list
this.tasks = new HashMap<>();
// read in task specific config values
this.stateDir = new File(this.config.getString(StreamingConfig.STATE_DIR_CONFIG));
this.stateDir.mkdir();
this.pollTimeMs = config.getLong(StreamingConfig.POLL_MS_CONFIG);
this.commitTimeMs = config.getLong(StreamingConfig.COMMIT_INTERVAL_MS_CONFIG);
this.cleanTimeMs = config.getLong(StreamingConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
this.totalRecordsToProcess = config.getLong(StreamingConfig.TOTAL_RECORDS_TO_PROCESS);
this.lastClean = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment
this.lastCommit = time.milliseconds();
this.recordsProcessed = 0;
this.time = time;
this.sensors = new StreamingMetricsImpl(metrics);
this.running = new AtomicBoolean(true);
}
private Producer<byte[], byte[]> createProducer() {
log.info("Creating producer client for stream thread [" + this.getName() + "]");
return new KafkaProducer<>(config.getProducerConfigs(),
new ByteArraySerializer(),
new ByteArraySerializer());
}
private Consumer<byte[], byte[]> createConsumer() {
log.info("Creating consumer client for stream thread [" + this.getName() + "]");
return new KafkaConsumer<>(config.getConsumerConfigs(partitionGrouper),
new ByteArrayDeserializer(),
new ByteArrayDeserializer());
}
private Consumer<byte[], byte[]> createRestoreConsumer() {
log.info("Creating restore consumer client for stream thread [" + this.getName() + "]");
return new KafkaConsumer<>(config.getConsumerConfigs(),
new ByteArrayDeserializer(),
new ByteArrayDeserializer());
}
/**
* Execute the stream processors
*/
@Override
public void run() {
log.info("Starting stream thread [" + this.getName() + "]");
try {
runLoop();
} catch (RuntimeException e) {
log.error("Uncaught error during processing in thread [" + this.getName() + "]: ", e);
throw e;
} finally {
shutdown();
}
}
/**
* Shutdown this streaming thread.
*/
public void close() {
running.set(false);
}
- public Map<Integer, StreamTask> tasks() {
+ public Map<TaskId, StreamTask> tasks() {
return Collections.unmodifiableMap(tasks);
}
private void shutdown() {
log.info("Shutting down stream thread [" + this.getName() + "]");
// Exceptions should not prevent this call from going through all shutdown steps.
try {
commitAll();
} catch (Throwable e) {
// already logged in commitAll()
}
try {
producer.close();
} catch (Throwable e) {
log.error("Failed to close producer in thread [" + this.getName() + "]: ", e);
}
try {
consumer.close();
} catch (Throwable e) {
log.error("Failed to close consumer in thread [" + this.getName() + "]: ", e);
}
try {
restoreConsumer.close();
} catch (Throwable e) {
log.error("Failed to close restore consumer in thread [" + this.getName() + "]: ", e);
}
try {
removePartitions();
} catch (Throwable e) {
// already logged in removePartitions()
}
log.info("Stream thread shutdown complete [" + this.getName() + "]");
}
private void runLoop() {
try {
int totalNumBuffered = 0;
boolean requiresPoll = true;
ensureCopartitioning(builder.copartitionGroups());
consumer.subscribe(new ArrayList<>(builder.sourceTopics()), rebalanceListener);
while (stillRunning()) {
// try to fetch some records if necessary
if (requiresPoll) {
long startPoll = time.milliseconds();
ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0);
if (!records.isEmpty()) {
for (StreamTask task : tasks.values()) {
for (TopicPartition partition : task.partitions()) {
task.addRecords(partition, records.records(partition));
}
}
}
long endPoll = time.milliseconds();
sensors.pollTimeSensor.record(endPoll - startPoll);
}
// try to process one record from each task
totalNumBuffered = 0;
requiresPoll = false;
for (StreamTask task : tasks.values()) {
long startProcess = time.milliseconds();
totalNumBuffered += task.process();
requiresPoll = requiresPoll || task.requiresPoll();
sensors.processTimeSensor.record(time.milliseconds() - startProcess);
}
maybePunctuate();
maybeClean();
maybeCommit();
}
} catch (Exception e) {
throw new KafkaException(e);
}
}
private boolean stillRunning() {
if (!running.get()) {
log.debug("Shutting down at user request.");
return false;
}
if (totalRecordsToProcess >= 0 && recordsProcessed >= totalRecordsToProcess) {
log.debug("Shutting down as we've reached the user configured limit of {} records to process.", totalRecordsToProcess);
return false;
}
return true;
}
private void maybePunctuate() {
for (StreamTask task : tasks.values()) {
try {
long now = time.milliseconds();
if (task.maybePunctuate(now))
sensors.punctuateTimeSensor.record(time.milliseconds() - now);
} catch (Exception e) {
log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
throw e;
}
}
}
protected void maybeCommit() {
long now = time.milliseconds();
if (commitTimeMs >= 0 && lastCommit + commitTimeMs < now) {
log.trace("Committing processor instances because the commit interval has elapsed.");
commitAll();
lastCommit = now;
} else {
for (StreamTask task : tasks.values()) {
try {
if (task.commitNeeded())
commitOne(task, time.milliseconds());
} catch (Exception e) {
log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
throw e;
}
}
}
}
/**
* Commit the states of all its tasks
*/
private void commitAll() {
for (StreamTask task : tasks.values()) {
try {
commitOne(task, time.milliseconds());
} catch (Exception e) {
log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
throw e;
}
}
}
/**
* Commit the state of a task
*/
private void commitOne(StreamTask task, long now) {
try {
task.commit();
} catch (Exception e) {
log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
throw e;
}
sensors.commitTimeSensor.record(time.milliseconds() - now);
}
/**
* Clean up any state of tasks that have been removed from this thread
*/
protected void maybeClean() {
long now = time.milliseconds();
if (now > lastClean + cleanTimeMs) {
File[] stateDirs = stateDir.listFiles();
if (stateDirs != null) {
for (File dir : stateDirs) {
try {
- int id = Integer.parseInt(dir.getName());
+ TaskId id = TaskId.parse(dir.getName());
// try to acquire the exclusive lock on the state directory
FileLock directoryLock = null;
try {
directoryLock = ProcessorStateManager.lockStateDirectory(dir);
if (directoryLock != null) {
log.info("Deleting obsolete state directory {} after delayed {} ms.", dir.getAbsolutePath(), cleanTimeMs);
Utils.delete(dir);
}
} catch (IOException e) {
log.error("Failed to lock the state directory due to an unexpected exception", e);
} finally {
if (directoryLock != null) {
try {
directoryLock.release();
} catch (IOException e) {
log.error("Failed to release the state directory lock");
}
}
}
- } catch (NumberFormatException e) {
+ } catch (TaskId.TaskIdFormatException e) {
// there may be unknown files that sit in the same directory;
// we should ignore such files instead of trying to delete them as well
}
}
}
lastClean = now;
}
}
- protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) {
+ protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
sensors.taskCreationSensor.record();
return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, builder.build(), config, sensors);
}
private void addPartitions(Collection<TopicPartition> assignment) {
- HashMap<Integer, Set<TopicPartition>> partitionsForTask = new HashMap<>();
+ HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>();
for (TopicPartition partition : assignment) {
- Set<Integer> taskIds = partitionGrouper.taskIds(partition);
- for (Integer taskId : taskIds) {
+ Set<TaskId> taskIds = partitionGrouper.taskIds(partition);
+ for (TaskId taskId : taskIds) {
Set<TopicPartition> partitions = partitionsForTask.get(taskId);
if (partitions == null) {
partitions = new HashSet<>();
partitionsForTask.put(taskId, partitions);
}
partitions.add(partition);
}
}
// create the tasks
- for (Integer taskId : partitionsForTask.keySet()) {
+ for (TaskId taskId : partitionsForTask.keySet()) {
try {
tasks.put(taskId, createStreamTask(taskId, partitionsForTask.get(taskId)));
} catch (Exception e) {
log.error("Failed to create a task #" + taskId + " in thread [" + this.getName() + "]: ", e);
throw e;
}
}
lastClean = time.milliseconds();
}
private void removePartitions() {
// TODO: change this task-clearing behavior
for (StreamTask task : tasks.values()) {
log.info("Removing task {}", task.id());
try {
task.close();
} catch (Exception e) {
log.error("Failed to close a task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
throw e;
}
sensors.taskDestructionSensor.record();
}
tasks.clear();
}
public PartitionGrouper partitionGrouper() {
return partitionGrouper;
}
private void ensureCopartitioning(Collection<Set<String>> copartitionGroups) {
for (Set<String> copartitionGroup : copartitionGroups) {
ensureCopartitioning(copartitionGroup);
}
}
private void ensureCopartitioning(Set<String> copartitionGroup) {
int numPartitions = -1;
for (String topic : copartitionGroup) {
List<PartitionInfo> infos = consumer.partitionsFor(topic);
if (infos == null)
throw new KafkaException("topic not found: " + topic);
if (numPartitions == -1) {
numPartitions = infos.size();
} else if (numPartitions != infos.size()) {
String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
Arrays.sort(topics);
throw new KafkaException("topics not copartitioned: [" + Utils.mkString(Arrays.asList(topics), ",") + "]");
}
}
}
private class StreamingMetricsImpl implements StreamingMetrics {
final Metrics metrics;
final String metricGrpName;
final Map<String, String> metricTags;
final Sensor commitTimeSensor;
final Sensor pollTimeSensor;
final Sensor processTimeSensor;
final Sensor punctuateTimeSensor;
final Sensor taskCreationSensor;
final Sensor taskDestructionSensor;
public StreamingMetricsImpl(Metrics metrics) {
this.metrics = metrics;
this.metricGrpName = "streaming-metrics";
this.metricTags = new LinkedHashMap<>();
this.metricTags.put("client-id", clientId + "-" + getName());
this.commitTimeSensor = metrics.sensor("commit-time");
this.commitTimeSensor.add(new MetricName("commit-time-avg", metricGrpName, "The average commit time in ms", metricTags), new Avg());
this.commitTimeSensor.add(new MetricName("commit-time-max", metricGrpName, "The maximum commit time in ms", metricTags), new Max());
this.commitTimeSensor.add(new MetricName("commit-calls-rate", metricGrpName, "The average per-second number of commit calls", metricTags), new Rate(new Count()));
this.pollTimeSensor = metrics.sensor("poll-time");
this.pollTimeSensor.add(new MetricName("poll-time-avg", metricGrpName, "The average poll time in ms", metricTags), new Avg());
this.pollTimeSensor.add(new MetricName("poll-time-max", metricGrpName, "The maximum poll time in ms", metricTags), new Max());
this.pollTimeSensor.add(new MetricName("poll-calls-rate", metricGrpName, "The average per-second number of record-poll calls", metricTags), new Rate(new Count()));
this.processTimeSensor = metrics.sensor("process-time");
this.processTimeSensor.add(new MetricName("process-time-avg-ms", metricGrpName, "The average process time in ms", metricTags), new Avg());
this.processTimeSensor.add(new MetricName("process-time-max-ms", metricGrpName, "The maximum process time in ms", metricTags), new Max());
this.processTimeSensor.add(new MetricName("process-calls-rate", metricGrpName, "The average per-second number of process calls", metricTags), new Rate(new Count()));
this.punctuateTimeSensor = metrics.sensor("punctuate-time");
this.punctuateTimeSensor.add(new MetricName("punctuate-time-avg", metricGrpName, "The average punctuate time in ms", metricTags), new Avg());
this.punctuateTimeSensor.add(new MetricName("punctuate-time-max", metricGrpName, "The maximum punctuate time in ms", metricTags), new Max());
this.punctuateTimeSensor.add(new MetricName("punctuate-calls-rate", metricGrpName, "The average per-second number of punctuate calls", metricTags), new Rate(new Count()));
this.taskCreationSensor = metrics.sensor("task-creation");
this.taskCreationSensor.add(new MetricName("task-creation-rate", metricGrpName, "The average per-second number of newly created tasks", metricTags), new Rate(new Count()));
this.taskDestructionSensor = metrics.sensor("task-destruction");
this.taskDestructionSensor.add(new MetricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count()));
}
@Override
public void recordLatency(Sensor sensor, long startNs, long endNs) {
sensor.record((endNs - startNs) / 1000000, endNs);
}
@Override
public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
// extract the additional tags if there are any
Map<String, String> tagMap = new HashMap<>(this.metricTags);
if ((tags.length % 2) != 0)
throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
for (int i = 0; i < tags.length; i += 2)
tagMap.put(tags[i], tags[i + 1]);
// first add the global operation metrics if they have not been added yet, with the global tags only
Sensor parent = metrics.sensor(operationName);
addLatencyMetrics(this.metricGrpName, parent, "all", operationName, this.metricTags);
// add the store operation metrics with additional tags
Sensor sensor = metrics.sensor(entityName + "-" + operationName, parent);
addLatencyMetrics("streaming-" + scopeName + "-metrics", sensor, entityName, operationName, tagMap);
return sensor;
}
private void addLatencyMetrics(String metricGrpName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
maybeAddMetric(sensor, new MetricName(opName + "-avg-latency-ms", metricGrpName,
"The average latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Avg());
maybeAddMetric(sensor, new MetricName(opName + "-max-latency-ms", metricGrpName,
"The max latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Max());
maybeAddMetric(sensor, new MetricName(opName + "-qps", metricGrpName,
"The average number of occurrence of " + entityName + " " + opName + " operation per second.", tags), new Rate(new Count()));
}
private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
if (!metrics.metrics().containsKey(name))
sensor.add(name, stat);
}
}
}
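Aside (not part of the patch): the TaskId class that replaces the plain integer task ids above is not included in these hunks. The sketch below is only inferred from its call sites here (new TaskId(topicGroupId, partition), TaskId.parse(dir.getName()), TaskId.TaskIdFormatException, and the .partition field used elsewhere in this patch); the underscore-separated string form and the exact member names are assumptions.

// Hypothetical reconstruction of TaskId, for reading this patch only; the real class
// may differ. It pairs a topic group id with a partition and can be round-tripped
// through the state directory name that maybeClean() parses.
public class TaskId {
    public final int topicGroupId;
    public final int partition;

    public TaskId(int topicGroupId, int partition) {
        this.topicGroupId = topicGroupId;
        this.partition = partition;
    }

    // Assumed directory-name format: "<topicGroupId>_<partition>".
    public static TaskId parse(String string) {
        int index = string.indexOf('_');
        if (index <= 0 || index + 1 >= string.length())
            throw new TaskIdFormatException();
        try {
            return new TaskId(Integer.parseInt(string.substring(0, index)),
                              Integer.parseInt(string.substring(index + 1)));
        } catch (NumberFormatException e) {
            throw new TaskIdFormatException();
        }
    }

    @Override
    public String toString() {
        return topicGroupId + "_" + partition;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof TaskId)) return false;
        TaskId that = (TaskId) other;
        return that.topicGroupId == topicGroupId && that.partition == partition;
    }

    @Override
    public int hashCode() {
        return 31 * topicGroupId + partition;
    }

    // Thrown for directory names that are not task ids; caught and ignored in maybeClean().
    public static class TaskIdFormatException extends RuntimeException {
    }
}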
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index 779bc75d..a7f4c12e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -1,270 +1,270 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;
import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
protected final KeyValueStore<K, V> inner;
protected final Serdes<K, V> serialization;
private final Time time;
private final Sensor putTime;
private final Sensor getTime;
private final Sensor deleteTime;
private final Sensor putAllTime;
private final Sensor allTime;
private final Sensor rangeTime;
private final Sensor flushTime;
private final Sensor restoreTime;
private final StreamingMetrics metrics;
private final String topic;
private final int partition;
private final Set<K> dirty;
private final Set<K> removed;
private final int maxDirty;
private final int maxRemoved;
private final ProcessorContext context;
// always wrap the logged store with the metered store
public MeteredKeyValueStore(final String name, final KeyValueStore<K, V> inner, ProcessorContext context,
Serdes<K, V> serialization, String metricGrp, Time time) {
this.inner = inner;
this.serialization = serialization;
this.time = time;
this.metrics = context.metrics();
this.putTime = this.metrics.addLatencySensor(metricGrp, name, "put", "store-name", name);
this.getTime = this.metrics.addLatencySensor(metricGrp, name, "get", "store-name", name);
this.deleteTime = this.metrics.addLatencySensor(metricGrp, name, "delete", "store-name", name);
this.putAllTime = this.metrics.addLatencySensor(metricGrp, name, "put-all", "store-name", name);
this.allTime = this.metrics.addLatencySensor(metricGrp, name, "all", "store-name", name);
this.rangeTime = this.metrics.addLatencySensor(metricGrp, name, "range", "store-name", name);
this.flushTime = this.metrics.addLatencySensor(metricGrp, name, "flush", "store-name", name);
this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name", name);
this.topic = name;
- this.partition = context.id();
+ this.partition = context.id().partition;
this.context = context;
this.dirty = new HashSet<K>();
this.removed = new HashSet<K>();
this.maxDirty = 100; // TODO: this needs to be configurable
this.maxRemoved = 100; // TODO: this needs to be configurable
// register and possibly restore the state from the logs
long startNs = time.nanoseconds();
try {
final Deserializer<K> keyDeserializer = serialization.keyDeserializer();
final Deserializer<V> valDeserializer = serialization.valueDeserializer();
context.register(this, new StateRestoreCallback() {
@Override
public void restore(byte[] key, byte[] value) {
inner.put(keyDeserializer.deserialize(topic, key),
valDeserializer.deserialize(topic, value));
}
});
} finally {
this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
}
}
@Override
public String name() {
return inner.name();
}
@Override
public boolean persistent() {
return inner.persistent();
}
@Override
public V get(K key) {
long startNs = time.nanoseconds();
try {
return this.inner.get(key);
} finally {
this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
}
}
@Override
public void put(K key, V value) {
long startNs = time.nanoseconds();
try {
this.inner.put(key, value);
this.dirty.add(key);
this.removed.remove(key);
maybeLogChange();
} finally {
this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
}
}
@Override
public void putAll(List<Entry<K, V>> entries) {
long startNs = time.nanoseconds();
try {
this.inner.putAll(entries);
for (Entry<K, V> entry : entries) {
K key = entry.key();
this.dirty.add(key);
this.removed.remove(key);
}
maybeLogChange();
} finally {
this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
}
}
@Override
public V delete(K key) {
long startNs = time.nanoseconds();
try {
V value = this.inner.delete(key);
this.dirty.remove(key);
this.removed.add(key);
maybeLogChange();
return value;
} finally {
this.metrics.recordLatency(this.deleteTime, startNs, time.nanoseconds());
}
}
/**
* Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
* store other than {@link #delete(Object)}.
*
* @param key the key for the entry that the inner store removed
*/
protected void removed(K key) {
this.dirty.remove(key);
this.removed.add(key);
maybeLogChange();
}
@Override
public KeyValueIterator<K, V> range(K from, K to) {
return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime);
}
@Override
public KeyValueIterator<K, V> all() {
return new MeteredKeyValueIterator<K, V>(this.inner.all(), this.allTime);
}
@Override
public void close() {
inner.close();
}
@Override
public void flush() {
long startNs = time.nanoseconds();
try {
this.inner.flush();
logChange();
} finally {
this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
}
}
private void maybeLogChange() {
if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
logChange();
}
private void logChange() {
RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
if (collector != null) {
Serializer<K> keySerializer = serialization.keySerializer();
Serializer<V> valueSerializer = serialization.valueSerializer();
for (K k : this.removed) {
collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
}
for (K k : this.dirty) {
V v = this.inner.get(k);
collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
}
this.removed.clear();
this.dirty.clear();
}
}
private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> {
private final KeyValueIterator<K1, V1> iter;
private final Sensor sensor;
private final long startNs;
public MeteredKeyValueIterator(KeyValueIterator<K1, V1> iter, Sensor sensor) {
this.iter = iter;
this.sensor = sensor;
this.startNs = time.nanoseconds();
}
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public Entry<K1, V1> next() {
return iter.next();
}
@Override
public void remove() {
iter.remove();
}
@Override
public void close() {
try {
iter.close();
} finally {
metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
}
}
}
}
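Aside (not part of the patch): MeteredKeyValueStore's maybeLogChange()/logChange() pair buffers modified keys in the dirty set and deleted keys in the removed set, then drains both to the change-log topic. The sketch below shows that bookkeeping in isolation; the ChangeLog interface is a hypothetical stand-in for RecordCollector.send(...) on the store's topic and partition.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Conceptual sketch only: writes are buffered as "dirty" keys, deletes as "removed"
// keys, and both sets are drained to a change log once either grows past a threshold
// or on flush().
public class ChangeLoggingSketch<K, V> {
    interface ChangeLog<K1, V1> {
        void send(K1 key, V1 value); // value == null signals a delete
    }

    private final Map<K, V> inner = new HashMap<>();
    private final Set<K> dirty = new HashSet<>();
    private final Set<K> removed = new HashSet<>();
    private final int maxDirty;
    private final ChangeLog<K, V> changeLog;

    public ChangeLoggingSketch(int maxDirty, ChangeLog<K, V> changeLog) {
        this.maxDirty = maxDirty;
        this.changeLog = changeLog;
    }

    public void put(K key, V value) {
        inner.put(key, value);
        dirty.add(key);
        removed.remove(key);
        maybeLogChange();
    }

    public void delete(K key) {
        inner.remove(key);
        dirty.remove(key);
        removed.add(key);
        maybeLogChange();
    }

    public void flush() {
        logChange();
    }

    private void maybeLogChange() {
        if (dirty.size() > maxDirty || removed.size() > maxDirty)
            logChange();
    }

    private void logChange() {
        for (K k : removed)
            changeLog.send(k, null);           // tombstone for deleted keys
        for (K k : dirty)
            changeLog.send(k, inner.get(k));   // latest value for dirty keys
        removed.clear();
        dirty.clear();
    }
}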
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
index 7393bb10..1de345e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
@@ -1,284 +1,284 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
import org.rocksdb.FlushOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteOptions;
import java.io.File;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
/**
* A {@link KeyValueStore} that stores all entries in a local RocksDB database.
*
* @param <K> the type of keys
* @param <V> the type of values
*
* @see Stores#create(String, ProcessorContext)
*/
public class RocksDBKeyValueStore<K, V> extends MeteredKeyValueStore<K, V> {
protected RocksDBKeyValueStore(String name, ProcessorContext context, Serdes<K, V> serdes, Time time) {
super(name, new RocksDBStore<K, V>(name, context, serdes), context, serdes, "rocksdb-state", time != null ? time : new SystemTime());
}
private static class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private static final int TTL_NOT_USED = -1;
// TODO: these values should be configurable
private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
private static final long BLOCK_SIZE = 4096L;
private static final int TTL_SECONDS = TTL_NOT_USED;
private static final int MAX_WRITE_BUFFERS = 3;
private static final String DB_FILE_DIR = "rocksdb";
private final Serdes<K, V> serdes;
private final String topic;
private final int partition;
private final ProcessorContext context;
private final Options options;
private final WriteOptions wOptions;
private final FlushOptions fOptions;
private final String dbName;
private final String dirName;
private RocksDB db;
public RocksDBStore(String name, ProcessorContext context, Serdes<K, V> serdes) {
this.topic = name;
- this.partition = context.id();
+ this.partition = context.id().partition;
this.context = context;
this.serdes = serdes;
// initialize the rocksdb options
BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
tableConfig.setBlockSize(BLOCK_SIZE);
options = new Options();
options.setTableFormatConfig(tableConfig);
options.setWriteBufferSize(WRITE_BUFFER_SIZE);
options.setCompressionType(COMPRESSION_TYPE);
options.setCompactionStyle(COMPACTION_STYLE);
options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
options.setCreateIfMissing(true);
options.setErrorIfExists(false);
wOptions = new WriteOptions();
wOptions.setDisableWAL(true);
fOptions = new FlushOptions();
fOptions.setWaitForFlush(true);
dbName = this.topic + "." + this.partition;
dirName = this.context.stateDir() + File.separator + DB_FILE_DIR;
db = openDB(new File(dirName, dbName), this.options, TTL_SECONDS);
}
private RocksDB openDB(File dir, Options options, int ttl) {
try {
if (ttl == TTL_NOT_USED) {
dir.getParentFile().mkdirs();
return RocksDB.open(options, dir.toString());
} else {
throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based.");
// TODO: support TTL with change log?
// return TtlDB.open(options, dir.toString(), ttl, false);
}
} catch (RocksDBException e) {
// TODO: this needs to be handled more accurately
throw new KafkaException("Error opening store " + this.topic + " at location " + dir.toString(), e);
}
}
@Override
public String name() {
return this.topic;
}
@Override
public boolean persistent() {
return false;
}
@Override
public V get(K key) {
try {
return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
} catch (RocksDBException e) {
// TODO: this needs to be handled more accurately
throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e);
}
}
@Override
public void put(K key, V value) {
try {
if (value == null) {
db.remove(wOptions, serdes.rawKey(key));
} else {
db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
}
} catch (RocksDBException e) {
// TODO: this needs to be handled more accurately
throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.topic, e);
}
}
@Override
public void putAll(List<Entry<K, V>> entries) {
for (Entry<K, V> entry : entries)
put(entry.key(), entry.value());
}
@Override
public V delete(K key) {
V value = get(key);
put(key, null);
return value;
}
@Override
public KeyValueIterator<K, V> range(K from, K to) {
return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
}
@Override
public KeyValueIterator<K, V> all() {
RocksIterator innerIter = db.newIterator();
innerIter.seekToFirst();
return new RocksDbIterator<K, V>(innerIter, serdes);
}
@Override
public void flush() {
try {
db.flush(fOptions);
} catch (RocksDBException e) {
// TODO: this needs to be handled more accurately
throw new KafkaException("Error while executing flush from store " + this.topic, e);
}
}
@Override
public void close() {
flush();
db.close();
}
private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
private final RocksIterator iter;
private final Serdes<K, V> serdes;
public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
this.iter = iter;
this.serdes = serdes;
}
protected byte[] peekRawKey() {
return iter.key();
}
protected Entry<K, V> getEntry() {
return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
}
@Override
public boolean hasNext() {
return iter.isValid();
}
@Override
public Entry<K, V> next() {
if (!hasNext())
throw new NoSuchElementException();
Entry<K, V> entry = this.getEntry();
iter.next();
return entry;
}
@Override
public void remove() {
throw new UnsupportedOperationException("RocksDB iterator does not support remove");
}
@Override
public void close() {
}
}
private static class LexicographicComparator implements Comparator<byte[]> {
@Override
public int compare(byte[] left, byte[] right) {
for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
int leftByte = left[i] & 0xff;
int rightByte = right[j] & 0xff;
if (leftByte != rightByte) {
return leftByte - rightByte;
}
}
return left.length - right.length;
}
}
private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
// RocksDB's JNI interface does not expose getters/setters that allow the
// comparator to be pluggable, and the default is lexicographic, so it's
// safe to just force lexicographic comparator here for now.
private final Comparator<byte[]> comparator = new LexicographicComparator();
byte[] rawToKey;
public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
K from, K to) {
super(iter, serdes);
iter.seek(serdes.rawKey(from));
this.rawToKey = serdes.rawKey(to);
}
@Override
public boolean hasNext() {
return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
}
}
}
}
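Aside (not part of the patch): RocksDBRangeIterator assumes unsigned, lexicographic byte ordering, which is why LexicographicComparator masks every byte with 0xff instead of comparing signed Java bytes. A standalone illustration:

import java.util.Arrays;
import java.util.Comparator;

// Nothing here touches RocksDB; it only shows the difference between signed byte
// comparison and the unsigned ordering the range iterator above relies on.
public class LexicographicOrderSketch {
    static final Comparator<byte[]> UNSIGNED_LEXICOGRAPHIC = new Comparator<byte[]>() {
        @Override
        public int compare(byte[] left, byte[] right) {
            for (int i = 0; i < left.length && i < right.length; i++) {
                int cmp = (left[i] & 0xff) - (right[i] & 0xff);
                if (cmp != 0)
                    return cmp;
            }
            return left.length - right.length;
        }
    };

    public static void main(String[] args) {
        byte[] a = {0x01};
        byte[] b = {(byte) 0x80}; // negative as a signed byte, but 128 unsigned
        System.out.println(Byte.compare(a[0], b[0]) > 0);            // true: signed order puts b first
        System.out.println(UNSIGNED_LEXICOGRAPHIC.compare(a, b) < 0); // true: unsigned order puts a first
        System.out.println(Arrays.toString(a) + " sorts before " + Arrays.toString(b));
    }
}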
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
index 388955e4..d43fc531 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
@@ -1,76 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import static org.apache.kafka.common.utils.Utils.mkList;
import static org.apache.kafka.common.utils.Utils.mkSet;
import org.junit.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import static org.junit.Assert.assertEquals;
public class DefaultPartitionGrouperTest {
private List<PartitionInfo> infos = Arrays.asList(
new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0])
);
private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos);
@Test
public void testGrouping() {
PartitionGrouper grouper = new DefaultPartitionGrouper();
- int taskId;
- Map<Integer, List<TopicPartition>> expected;
+ int topicGroupId;
+ Map<TaskId, Set<TopicPartition>> expected;
+ Map<Integer, Set<String>> topicGroups;
- grouper.topicGroups(mkList(mkSet("topic1"), mkSet("topic2")));
+ topicGroups = new HashMap<>();
+ topicGroups.put(0, mkSet("topic1"));
+ topicGroups.put(1, mkSet("topic2"));
+ grouper.topicGroups(topicGroups);
expected = new HashMap<>();
- taskId = 0;
- expected.put(taskId++, mkList(new TopicPartition("topic1", 0)));
- expected.put(taskId++, mkList(new TopicPartition("topic1", 1)));
- expected.put(taskId++, mkList(new TopicPartition("topic1", 2)));
- expected.put(taskId++, mkList(new TopicPartition("topic2", 0)));
- expected.put(taskId, mkList(new TopicPartition("topic2", 1)));
+ topicGroupId = 0;
+ expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0)));
+ expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1)));
+ expected.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2)));
+ topicGroupId++;
+ expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic2", 0)));
+ expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic2", 1)));
assertEquals(expected, grouper.partitionGroups(metadata));
- grouper.topicGroups(mkList(mkSet("topic1", "topic2")));
+ topicGroups = new HashMap<>();
+ topicGroups.put(0, mkSet("topic1", "topic2"));
+ grouper.topicGroups(topicGroups);
expected = new HashMap<>();
- taskId = 0;
- expected.put(taskId++, mkList(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)));
- expected.put(taskId++, mkList(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1)));
- expected.put(taskId, mkList(new TopicPartition("topic1", 2)));
+ topicGroupId = 0;
+ expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)));
+ expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1)));
+ expected.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2)));
assertEquals(expected, grouper.partitionGroups(metadata));
}
}
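Aside (not part of the patch): DefaultPartitionGrouper itself is not shown in this diff, so the rule below is only inferred from the expectations in the test above: each topic group g yields one task (g, p) for every partition index p up to the largest partition count among the group's topics, and each task gets the matching partition of every topic in the group that has it. TaskId is replaced by a plain (groupId, partition) pair and the Cluster metadata by a simple partition-count map.

import java.util.*;

// Inferred grouping rule, reconstructed only from the test expectations above.
public class GroupingRuleSketch {
    public static Map<List<Integer>, Set<String>> partitionGroups(
            Map<Integer, Set<String>> topicGroups, Map<String, Integer> partitionCounts) {
        Map<List<Integer>, Set<String>> groups = new HashMap<>();
        for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
            int maxPartitions = 0;
            for (String topic : entry.getValue())
                maxPartitions = Math.max(maxPartitions, partitionCounts.get(topic));
            for (int p = 0; p < maxPartitions; p++) {
                Set<String> partitions = new HashSet<>();
                for (String topic : entry.getValue())
                    if (p < partitionCounts.get(topic))
                        partitions.add(topic + "-" + p);
                // The key stands in for TaskId(topicGroupId, partition).
                groups.put(Arrays.asList(entry.getKey(), p), partitions);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<Integer, Set<String>> topicGroups = new HashMap<>();
        topicGroups.put(0, new HashSet<>(Arrays.asList("topic1", "topic2")));
        Map<String, Integer> counts = new HashMap<>();
        counts.put("topic1", 3);
        counts.put("topic2", 2);
        // Matches the second expectation in the test: tasks (0,0) and (0,1) cover both
        // topics, while task (0,2) covers only topic1 partition 2.
        System.out.println(partitionGroups(topicGroups, counts));
    }
}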
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 05d24d39..b77c253a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -1,138 +1,153 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;
import static org.junit.Assert.assertEquals;
import static org.apache.kafka.common.utils.Utils.mkSet;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public class TopologyBuilderTest {
@Test(expected = TopologyException.class)
public void testAddSourceWithSameName() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source", "topic-1");
builder.addSource("source", "topic-2");
}
@Test(expected = TopologyException.class)
public void testAddSourceWithSameTopic() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source", "topic-1");
builder.addSource("source-2", "topic-1");
}
@Test(expected = TopologyException.class)
public void testAddProcessorWithSameName() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source", "topic-1");
builder.addProcessor("processor", new MockProcessorSupplier(), "source");
builder.addProcessor("processor", new MockProcessorSupplier(), "source");
}
@Test(expected = TopologyException.class)
public void testAddProcessorWithWrongParent() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addProcessor("processor", new MockProcessorSupplier(), "source");
}
@Test(expected = TopologyException.class)
public void testAddProcessorWithSelfParent() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addProcessor("processor", new MockProcessorSupplier(), "processor");
}
@Test(expected = TopologyException.class)
public void testAddSinkWithSameName() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source", "topic-1");
builder.addSink("sink", "topic-2", "source");
builder.addSink("sink", "topic-3", "source");
}
@Test(expected = TopologyException.class)
public void testAddSinkWithWrongParent() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSink("sink", "topic-2", "source");
}
@Test(expected = TopologyException.class)
public void testAddSinkWithSelfParent() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSink("sink", "topic-2", "sink");
}
@Test
public void testSourceTopics() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source-1", "topic-1");
builder.addSource("source-2", "topic-2");
builder.addSource("source-3", "topic-3");
assertEquals(3, builder.sourceTopics().size());
}
@Test
public void testTopicGroups() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source-1", "topic-1", "topic-1x");
builder.addSource("source-2", "topic-2");
builder.addSource("source-3", "topic-3");
builder.addSource("source-4", "topic-4");
builder.addSource("source-5", "topic-5");
builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
builder.copartitionSources(list("source-1", "source-2"));
builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
- Collection<Set<String>> topicGroups = builder.topicGroups();
+ Map<Integer, Set<String>> topicGroups = builder.topicGroups();
+
+ Map<Integer, Set<String>> expectedTopicGroups = new HashMap<>();
+ expectedTopicGroups.put(0, set("topic-1", "topic-1x", "topic-2"));
+ expectedTopicGroups.put(1, set("topic-3", "topic-4"));
+ expectedTopicGroups.put(2, set("topic-5"));
assertEquals(3, topicGroups.size());
- assertEquals(mkSet(mkSet("topic-1", "topic-1x", "topic-2"), mkSet("topic-3", "topic-4"), mkSet("topic-5")), new HashSet<>(topicGroups));
+ assertEquals(expectedTopicGroups, topicGroups);
Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
assertEquals(mkSet(mkSet("topic-1", "topic-1x", "topic-2")), new HashSet<>(copartitionGroups));
}
+ private <T> Set<T> set(T... items) {
+ Set<T> set = new HashSet<>();
+ for (T item : items) {
+ set.add(item);
+ }
+ return set;
+ }
+
private <T> List<T> list(T... elems) {
return Arrays.asList(elems);
}
}
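Aside (not part of the patch): with this change topicGroups() returns a Map keyed by an integer group id instead of a bare collection of topic sets, so callers can line the ids up with TaskId.topicGroupId. A small usage sketch, assuming (based on the test above) that sources not connected through any processor land in separate groups; the topic names are placeholders.

import java.util.Map;
import java.util.Set;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class TopicGroupsSketch {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source-1", "orders");
        builder.addSource("source-2", "payments");
        // Two unconnected sources are expected to end up in two separate topic groups.
        Map<Integer, Set<String>> topicGroups = builder.topicGroups();
        for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet())
            System.out.println("group " + entry.getKey() + " -> " + entry.getValue());
    }
}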
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 92b86846..0b828f71 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1,206 +1,207 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.test.MockSourceNode;
import org.junit.Test;
import org.junit.Before;
import java.io.File;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class StreamTaskTest {
private final Serializer<Integer> intSerializer = new IntegerSerializer();
private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
private final TopicPartition partition1 = new TopicPartition("topic1", 1);
private final TopicPartition partition2 = new TopicPartition("topic2", 1);
private final HashSet<TopicPartition> partitions = new HashSet<>(Arrays.asList(partition1, partition2));
private final MockSourceNode source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
private final MockSourceNode source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
private final ProcessorTopology topology = new ProcessorTopology(
Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2),
new HashMap<String, SourceNode>() {
{
put("topic1", source1);
put("topic2", source2);
}
});
private StreamingConfig createConfig(final File baseDir) throws Exception {
return new StreamingConfig(new Properties() {
{
setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
}
});
}
private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
private final byte[] recordValue = intSerializer.serialize(null, 10);
private final byte[] recordKey = intSerializer.serialize(null, 1);
@Before
public void setup() {
consumer.assign(Arrays.asList(partition1, partition2));
}
@SuppressWarnings("unchecked")
@Test
public void testProcessOrder() throws Exception {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamingConfig config = createConfig(baseDir);
- StreamTask task = new StreamTask(0, consumer, producer, restoreStateConsumer, partitions, topology, config, null);
+ StreamTask task = new StreamTask(new TaskId(0, 0), consumer, producer, restoreStateConsumer, partitions, topology, config, null);
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey, recordValue)
));
task.addRecords(partition2, records(
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue)
));
assertEquals(task.process(), 5);
assertEquals(source1.numReceived, 1);
assertEquals(source2.numReceived, 0);
assertEquals(task.process(), 4);
assertEquals(source1.numReceived, 1);
assertEquals(source2.numReceived, 1);
assertEquals(task.process(), 3);
assertEquals(source1.numReceived, 2);
assertEquals(source2.numReceived, 1);
assertEquals(task.process(), 2);
assertEquals(source1.numReceived, 3);
assertEquals(source2.numReceived, 1);
assertEquals(task.process(), 1);
assertEquals(source1.numReceived, 3);
assertEquals(source2.numReceived, 2);
assertEquals(task.process(), 0);
assertEquals(source1.numReceived, 3);
assertEquals(source2.numReceived, 3);
task.close();
} finally {
Utils.delete(baseDir);
}
}
@SuppressWarnings("unchecked")
@Test
public void testPauseResume() throws Exception {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamingConfig config = createConfig(baseDir);
- StreamTask task = new StreamTask(1, consumer, producer, restoreStateConsumer, partitions, topology, config, null);
+ StreamTask task = new StreamTask(new TaskId(1, 1), consumer, producer, restoreStateConsumer, partitions, topology, config, null);
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey, recordValue)
));
task.addRecords(partition2, records(
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 55, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, recordKey, recordValue)
));
assertEquals(task.process(), 5);
assertEquals(source1.numReceived, 1);
assertEquals(source2.numReceived, 0);
assertEquals(consumer.paused().size(), 1);
assertTrue(consumer.paused().contains(partition2));
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, recordKey, recordValue)
));
assertEquals(consumer.paused().size(), 2);
assertTrue(consumer.paused().contains(partition1));
assertTrue(consumer.paused().contains(partition2));
assertEquals(task.process(), 7);
assertEquals(source1.numReceived, 1);
assertEquals(source2.numReceived, 1);
assertEquals(consumer.paused().size(), 1);
assertTrue(consumer.paused().contains(partition1));
assertEquals(task.process(), 6);
assertEquals(source1.numReceived, 2);
assertEquals(source2.numReceived, 1);
assertEquals(consumer.paused().size(), 0);
task.close();
} finally {
Utils.delete(baseDir);
}
}
private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
return Arrays.asList(recs);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index cbb2558c..d5011a30 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1,460 +1,461 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.processor.PartitionGrouper;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.Test;
import java.io.File;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
public class StreamThreadTest {
private TopicPartition t1p1 = new TopicPartition("topic1", 1);
private TopicPartition t1p2 = new TopicPartition("topic1", 2);
private TopicPartition t2p1 = new TopicPartition("topic2", 1);
private TopicPartition t2p2 = new TopicPartition("topic2", 2);
private TopicPartition t3p1 = new TopicPartition("topic3", 1);
private TopicPartition t3p2 = new TopicPartition("topic3", 2);
private List<PartitionInfo> infos = Arrays.asList(
new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0])
);
private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos);
PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(Arrays.asList("topic1", "topic2", "topic3"));
// task0 is unused
- private final int task1 = 1;
- private final int task2 = 2;
- // task3 is unused
- private final int task4 = 4;
- private final int task5 = 5;
+ private final TaskId task1 = new TaskId(0, 1);
+ private final TaskId task2 = new TaskId(0, 2);
+ private final TaskId task3 = new TaskId(0, 3);
+ private final TaskId task4 = new TaskId(1, 1);
+ private final TaskId task5 = new TaskId(1, 2);
private Properties configProps() {
return new Properties() {
{
setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
}
};
}
private static class TestStreamTask extends StreamTask {
public boolean committed = false;
- public TestStreamTask(int id,
+ public TestStreamTask(TaskId id,
Consumer<byte[], byte[]> consumer,
Producer<byte[], byte[]> producer,
Consumer<byte[], byte[]> restoreConsumer,
Collection<TopicPartition> partitions,
ProcessorTopology topology,
StreamingConfig config) {
super(id, consumer, producer, restoreConsumer, partitions, topology, config, null);
}
@Override
public void commit() {
super.commit();
committed = true;
}
}
private ByteArraySerializer serializer = new ByteArraySerializer();
@SuppressWarnings("unchecked")
@Test
public void testPartitionAssignmentChange() throws Exception {
StreamingConfig config = new StreamingConfig(configProps());
MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
final MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source1", "topic1");
builder.addSource("source2", "topic2");
builder.addSource("source3", "topic3");
builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), new SystemTime()) {
@Override
- protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) {
+ protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config);
}
};
initPartitionGrouper(thread);
ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
assertTrue(thread.tasks().isEmpty());
List<TopicPartition> revokedPartitions;
List<TopicPartition> assignedPartitions;
Set<TopicPartition> expectedGroup1;
Set<TopicPartition> expectedGroup2;
revokedPartitions = Collections.emptyList();
assignedPartitions = Collections.singletonList(t1p1);
expectedGroup1 = new HashSet<>(Arrays.asList(t1p1));
rebalanceListener.onPartitionsRevoked(revokedPartitions);
rebalanceListener.onPartitionsAssigned(assignedPartitions);
assertTrue(thread.tasks().containsKey(task1));
assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
assertEquals(1, thread.tasks().size());
revokedPartitions = assignedPartitions;
assignedPartitions = Collections.singletonList(t1p2);
expectedGroup2 = new HashSet<>(Arrays.asList(t1p2));
rebalanceListener.onPartitionsRevoked(revokedPartitions);
rebalanceListener.onPartitionsAssigned(assignedPartitions);
assertTrue(thread.tasks().containsKey(task2));
assertEquals(expectedGroup2, thread.tasks().get(task2).partitions());
assertEquals(1, thread.tasks().size());
revokedPartitions = assignedPartitions;
assignedPartitions = Arrays.asList(t1p1, t1p2);
expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
expectedGroup2 = new HashSet<>(Collections.singleton(t1p2));
rebalanceListener.onPartitionsRevoked(revokedPartitions);
rebalanceListener.onPartitionsAssigned(assignedPartitions);
assertTrue(thread.tasks().containsKey(task1));
assertTrue(thread.tasks().containsKey(task2));
assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
assertEquals(expectedGroup2, thread.tasks().get(task2).partitions());
assertEquals(2, thread.tasks().size());
revokedPartitions = assignedPartitions;
assignedPartitions = Arrays.asList(t2p1, t2p2, t3p1, t3p2);
expectedGroup1 = new HashSet<>(Arrays.asList(t2p1, t3p1));
expectedGroup2 = new HashSet<>(Arrays.asList(t2p2, t3p2));
rebalanceListener.onPartitionsRevoked(revokedPartitions);
rebalanceListener.onPartitionsAssigned(assignedPartitions);
assertTrue(thread.tasks().containsKey(task4));
assertTrue(thread.tasks().containsKey(task5));
assertEquals(expectedGroup1, thread.tasks().get(task4).partitions());
assertEquals(expectedGroup2, thread.tasks().get(task5).partitions());
assertEquals(2, thread.tasks().size());
revokedPartitions = assignedPartitions;
assignedPartitions = Arrays.asList(t1p1, t2p1, t3p1);
expectedGroup1 = new HashSet<>(Arrays.asList(t1p1));
expectedGroup2 = new HashSet<>(Arrays.asList(t2p1, t3p1));
rebalanceListener.onPartitionsRevoked(revokedPartitions);
rebalanceListener.onPartitionsAssigned(assignedPartitions);
assertTrue(thread.tasks().containsKey(task1));
assertTrue(thread.tasks().containsKey(task4));
assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
assertEquals(2, thread.tasks().size());
revokedPartitions = assignedPartitions;
assignedPartitions = Collections.emptyList();
rebalanceListener.onPartitionsRevoked(revokedPartitions);
rebalanceListener.onPartitionsAssigned(assignedPartitions);
assertTrue(thread.tasks().isEmpty());
}
@Test
public void testMaybeClean() throws Exception {
File baseDir = Files.createTempDirectory("test").toFile();
try {
final long cleanupDelay = 1000L;
Properties props = configProps();
props.setProperty(StreamingConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.toString(cleanupDelay));
props.setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
StreamingConfig config = new StreamingConfig(props);
- File stateDir1 = new File(baseDir, "1");
- File stateDir2 = new File(baseDir, "2");
- File stateDir3 = new File(baseDir, "3");
+ File stateDir1 = new File(baseDir, task1.toString());
+ File stateDir2 = new File(baseDir, task2.toString());
+ File stateDir3 = new File(baseDir, task3.toString());
File extraDir = new File(baseDir, "X");
stateDir1.mkdir();
stateDir2.mkdir();
stateDir3.mkdir();
extraDir.mkdir();
MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
final MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
MockTime mockTime = new MockTime();
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source1", "topic1");
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), mockTime) {
@Override
public void maybeClean() {
super.maybeClean();
}
@Override
- protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) {
+ protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config);
}
};
initPartitionGrouper(thread);
ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
assertTrue(thread.tasks().isEmpty());
mockTime.sleep(cleanupDelay);
// all directories exist since an assignment didn't happen
assertTrue(stateDir1.exists());
assertTrue(stateDir2.exists());
assertTrue(stateDir3.exists());
assertTrue(extraDir.exists());
List<TopicPartition> revokedPartitions;
List<TopicPartition> assignedPartitions;
Map<Integer, StreamTask> prevTasks;
//
// Assign t1p1 and t1p2. This should create task1 & task2
//
revokedPartitions = Collections.emptyList();
assignedPartitions = Arrays.asList(t1p1, t1p2);
prevTasks = new HashMap(thread.tasks());
rebalanceListener.onPartitionsRevoked(revokedPartitions);
rebalanceListener.onPartitionsAssigned(assignedPartitions);
// there shouldn't be any previous task
assertTrue(prevTasks.isEmpty());
// task 1 & 2 are created
assertEquals(2, thread.tasks().size());
// all directories should still exist before the cleanup delay time
mockTime.sleep(cleanupDelay - 10L);
thread.maybeClean();
assertTrue(stateDir1.exists());
assertTrue(stateDir2.exists());
assertTrue(stateDir3.exists());
assertTrue(extraDir.exists());
// all state directories except for task1 & task2 will be removed. the extra directory should still exist
mockTime.sleep(11L);
thread.maybeClean();
assertTrue(stateDir1.exists());
assertTrue(stateDir2.exists());
assertFalse(stateDir3.exists());
assertTrue(extraDir.exists());
//
// Revoke t1p1 and t1p2. This should remove task1 & task2
//
revokedPartitions = assignedPartitions;
assignedPartitions = Collections.emptyList();
prevTasks = new HashMap(thread.tasks());
rebalanceListener.onPartitionsRevoked(revokedPartitions);
rebalanceListener.onPartitionsAssigned(assignedPartitions);
// previous tasks should be committed
assertEquals(2, prevTasks.size());
for (StreamTask task : prevTasks.values()) {
assertTrue(((TestStreamTask) task).committed);
((TestStreamTask) task).committed = false;
}
// no task
assertTrue(thread.tasks().isEmpty());
// all state directories for task1 & task2 still exist before the cleanup delay time
mockTime.sleep(cleanupDelay - 10L);
thread.maybeClean();
assertTrue(stateDir1.exists());
assertTrue(stateDir2.exists());
assertFalse(stateDir3.exists());
assertTrue(extraDir.exists());
// all state directories for task1 & task2 are removed
mockTime.sleep(11L);
thread.maybeClean();
assertFalse(stateDir1.exists());
assertFalse(stateDir2.exists());
assertFalse(stateDir3.exists());
assertTrue(extraDir.exists());
} finally {
Utils.delete(baseDir);
}
}
@Test
public void testMaybeCommit() throws Exception {
File baseDir = Files.createTempDirectory("test").toFile();
try {
final long commitInterval = 1000L;
Properties props = configProps();
props.setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
props.setProperty(StreamingConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval));
StreamingConfig config = new StreamingConfig(props);
MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
final MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
MockTime mockTime = new MockTime();
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source1", "topic1");
StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), mockTime) {
@Override
public void maybeCommit() {
super.maybeCommit();
}
@Override
- protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) {
+ protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config);
}
};
initPartitionGrouper(thread);
ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
List<TopicPartition> revokedPartitions;
List<TopicPartition> assignedPartitions;
//
// Assign t1p1 and t1p2. This should create Task 1 & 2
//
revokedPartitions = Collections.emptyList();
assignedPartitions = Arrays.asList(t1p1, t1p2);
rebalanceListener.onPartitionsRevoked(revokedPartitions);
rebalanceListener.onPartitionsAssigned(assignedPartitions);
assertEquals(2, thread.tasks().size());
// no task is committed before the commit interval
mockTime.sleep(commitInterval - 10L);
thread.maybeCommit();
for (StreamTask task : thread.tasks().values()) {
assertFalse(((TestStreamTask) task).committed);
}
// all tasks are committed after the commit interval
mockTime.sleep(11L);
thread.maybeCommit();
for (StreamTask task : thread.tasks().values()) {
assertTrue(((TestStreamTask) task).committed);
((TestStreamTask) task).committed = false;
}
// no task is committed before the commit interval, again
mockTime.sleep(commitInterval - 10L);
thread.maybeCommit();
for (StreamTask task : thread.tasks().values()) {
assertFalse(((TestStreamTask) task).committed);
}
// all tasks are committed after the commit interval, again
mockTime.sleep(11L);
thread.maybeCommit();
for (StreamTask task : thread.tasks().values()) {
assertTrue(((TestStreamTask) task).committed);
((TestStreamTask) task).committed = false;
}
} finally {
Utils.delete(baseDir);
}
}
private void initPartitionGrouper(StreamThread thread) {
PartitionGrouper partitionGrouper = thread.partitionGrouper();
KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
partitionAssignor.configure(
Collections.singletonMap(StreamingConfig.InternalConfig.PARTITION_GROUPER_INSTANCE, partitionGrouper)
);
Map<String, PartitionAssignor.Assignment> assignments =
partitionAssignor.assign(metadata, Collections.singletonMap("client", subscription));
partitionAssignor.onAssignment(assignments.get("client"));
}
}
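
testMaybeClean above depends on state-directory names that can be recognized as task IDs: directories for tasks that are no longer assigned are deleted once the cleanup delay has elapsed, while the unrecognizable extra directory ("X") is left untouched. The StreamThread implementation is not part of this diff, so the sketch below only illustrates that rule; the looksLikeTaskDir helper and the "groupId_partition" directory naming are assumptions of the sketch, not the real code.

import java.io.File;
import java.util.Set;

// Sketch of the cleanup rule exercised by testMaybeClean; NOT the real StreamThread code.
public final class StateDirCleanerSketch {

    /** Delete task directories that are no longer assigned and are older than cleanupDelayMs. */
    public static void maybeClean(File stateBaseDir, Set<String> assignedTaskNames,
                                  long nowMs, long cleanupDelayMs) {
        File[] dirs = stateBaseDir.listFiles();
        if (dirs == null) return;
        for (File dir : dirs) {
            // skip anything that does not look like a task directory, e.g. the extra "X" dir
            if (!dir.isDirectory() || !looksLikeTaskDir(dir.getName())) continue;
            boolean assigned = assignedTaskNames.contains(dir.getName());
            boolean expired = nowMs - dir.lastModified() > cleanupDelayMs;
            if (!assigned && expired) {
                delete(dir);
            }
        }
    }

    // Hypothetical helper: accepts names of the assumed "<int>_<int>" form.
    private static boolean looksLikeTaskDir(String name) {
        return name.matches("\\d+_\\d+");
    }

    // Recursively delete a directory and its contents.
    private static void delete(File file) {
        File[] children = file.listFiles();
        if (children != null) {
            for (File child : children) delete(child);
        }
        file.delete();
    }
}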
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 4dfa9c21..7e1512af 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -1,441 +1,442 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.test.MockProcessorContext;
import java.io.File;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* A component that provides a {@link #context() ProcessingContext} that can be supplied to a {@link KeyValueStore} so that
* all entries written to the Kafka topic by the store during {@link KeyValueStore#flush()} are captured for testing purposes.
* This class simplifies testing of various {@link KeyValueStore} instances, especially those that use
* {@link MeteredKeyValueStore} to monitor and write their entries to the Kafka topic.
* <p>
* <h2>Basic usage</h2>
* This component can be used to help test a {@link KeyValueStore}'s ability to read and write entries.
*
* <pre>
* // Create the test driver ...
* KeyValueStoreTestDriver&lt;Integer, String> driver = KeyValueStoreTestDriver.create();
* KeyValueStore&lt;Integer, String> store = Stores.create("my-store", driver.context())
* .withIntegerKeys().withStringValues()
* .inMemory().build();
*
* // Verify that the store reads and writes correctly ...
* store.put(0, "zero");
* store.put(1, "one");
* store.put(2, "two");
* store.put(4, "four");
* store.put(5, "five");
* assertEquals(5, driver.sizeOf(store));
* assertEquals("zero", store.get(0));
* assertEquals("one", store.get(1));
* assertEquals("two", store.get(2));
* assertEquals("four", store.get(4));
* assertEquals("five", store.get(5));
* assertNull(store.get(3));
* store.delete(5);
*
* // Flush the store and verify all current entries were properly flushed ...
* store.flush();
* assertEquals("zero", driver.flushedEntryStored(0));
* assertEquals("one", driver.flushedEntryStored(1));
* assertEquals("two", driver.flushedEntryStored(2));
* assertEquals("four", driver.flushedEntryStored(4));
* assertEquals(null, driver.flushedEntryStored(5));
*
* assertEquals(false, driver.flushedEntryRemoved(0));
* assertEquals(false, driver.flushedEntryRemoved(1));
* assertEquals(false, driver.flushedEntryRemoved(2));
* assertEquals(false, driver.flushedEntryRemoved(4));
* assertEquals(true, driver.flushedEntryRemoved(5));
* </pre>
*
* <p>
* <h2>Restoring a store</h2>
* This component can be used to test whether a {@link KeyValueStore} implementation properly
* {@link ProcessorContext#register(StateStore, StateRestoreCallback) registers itself} with the {@link ProcessorContext}, so that
* the persisted contents of a store are properly restored from the flushed entries when the store instance is started.
* <p>
* To do this, create an instance of this driver component, {@link #addEntryToRestoreLog(Object, Object) add entries} that will be
* passed to the store upon creation (simulating the entries that were previously flushed to the topic), and then create the store
* using this driver's {@link #context() ProcessorContext}:
*
* <pre>
* // Create the test driver ...
* KeyValueStoreTestDriver&lt;Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
*
* // Add any entries that will be restored to any store that uses the driver's context ...
* driver.addRestoreEntry(0, "zero");
* driver.addRestoreEntry(1, "one");
* driver.addRestoreEntry(2, "two");
* driver.addRestoreEntry(4, "four");
*
* // Create the store, which should register with the context and automatically
* // receive the restore entries ...
* KeyValueStore&lt;Integer, String> store = Stores.create("my-store", driver.context())
* .withIntegerKeys().withStringValues()
* .inMemory().build();
*
* // Verify that the store's contents were properly restored ...
* assertEquals(0, driver.checkForRestoredEntries(store));
*
* // and there are no other entries ...
* assertEquals(4, driver.sizeOf(store));
* </pre>
*
* @param <K> the type of keys placed in the store
* @param <V> the type of values placed in the store
*/
public class KeyValueStoreTestDriver<K, V> {
private static <T> Serializer<T> unusableSerializer() {
return new Serializer<T>() {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, T data) {
throw new UnsupportedOperationException("This serializer should not be used");
}
@Override
public void close() {
}
};
};
private static <T> Deserializer<T> unusableDeserializer() {
return new Deserializer<T>() {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public T deserialize(String topic, byte[] data) {
throw new UnsupportedOperationException("This deserializer should not be used");
}
@Override
public void close() {
}
};
};
/**
* Create a driver object that will have a {@link #context()} that records messages
* {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides <em>unusable</em> default key and
* value serializers and deserializers. This can be used when the actual serializers and deserializers are supplied to the
* store during creation, which should eliminate the need for a store to depend on the ProcessorContext's default key and
* value serializers and deserializers.
*
* @return the test driver; never null
*/
public static <K, V> KeyValueStoreTestDriver<K, V> create() {
Serializer<K> keySerializer = unusableSerializer();
Deserializer<K> keyDeserializer = unusableDeserializer();
Serializer<V> valueSerializer = unusableSerializer();
Deserializer<V> valueDeserializer = unusableDeserializer();
Serdes<K, V> serdes = new Serdes<K, V>("unexpected", keySerializer, keyDeserializer, valueSerializer, valueDeserializer);
return new KeyValueStoreTestDriver<K, V>(serdes);
}
/**
* Create a driver object that will have a {@link #context()} that records messages
* {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides default serializers and
* deserializers for the given built-in key and value types (e.g., {@code String.class}, {@code Integer.class},
* {@code Long.class}, and {@code byte[].class}). This can be used when the store is created to rely upon the
* ProcessorContext's default key and value serializers and deserializers.
*
* @param keyClass the class for the keys; must be one of {@code String.class}, {@code Integer.class},
* {@code Long.class}, or {@code byte[].class}
* @param valueClass the class for the values; must be one of {@code String.class}, {@code Integer.class},
* {@code Long.class}, or {@code byte[].class}
* @return the test driver; never null
*/
public static <K, V> KeyValueStoreTestDriver<K, V> create(Class<K> keyClass, Class<V> valueClass) {
Serdes<K, V> serdes = Serdes.withBuiltinTypes("unexpected", keyClass, valueClass);
return new KeyValueStoreTestDriver<K, V>(serdes);
}
/**
* Create a driver object that will have a {@link #context()} that records messages
* {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides the specified serializers and
* deserializers. This can be used when the store is created to rely upon the ProcessorContext's default key and value serializers
* and deserializers.
*
* @param keySerializer the key serializer for the {@link ProcessorContext}; may not be null
* @param keyDeserializer the key deserializer for the {@link ProcessorContext}; may not be null
* @param valueSerializer the value serializer for the {@link ProcessorContext}; may not be null
* @param valueDeserializer the value deserializer for the {@link ProcessorContext}; may not be null
* @return the test driver; never null
*/
public static <K, V> KeyValueStoreTestDriver<K, V> create(Serializer<K> keySerializer,
Deserializer<K> keyDeserializer,
Serializer<V> valueSerializer,
Deserializer<V> valueDeserializer) {
Serdes<K, V> serdes = new Serdes<K, V>("unexpected", keySerializer, keyDeserializer, valueSerializer, valueDeserializer);
return new KeyValueStoreTestDriver<K, V>(serdes);
}
private final Serdes<K, V> serdes;
private final Map<K, V> flushedEntries = new HashMap<>();
private final Set<K> flushedRemovals = new HashSet<>();
private final List<Entry<K, V>> restorableEntries = new LinkedList<>();
private final MockProcessorContext context;
private final Map<String, StateStore> storeMap = new HashMap<>();
private final StreamingMetrics metrics = new StreamingMetrics() {
@Override
public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
return null;
}
@Override
public void recordLatency(Sensor sensor, long startNs, long endNs) {
}
};
private final RecordCollector recordCollector;
private File stateDir = new File("build/data").getAbsoluteFile();
protected KeyValueStoreTestDriver(Serdes<K, V> serdes) {
this.serdes = serdes;
ByteArraySerializer rawSerializer = new ByteArraySerializer();
Producer<byte[], byte[]> producer = new MockProducer<byte[], byte[]>(true, rawSerializer, rawSerializer);
this.recordCollector = new RecordCollector(producer) {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
recordFlushed(record.key(), record.value());
}
};
this.context = new MockProcessorContext(null, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(),
serdes.valueDeserializer(), recordCollector) {
@Override
- public int id() {
- return 1;
+ public TaskId id() {
+ return new TaskId(0, 1);
}
@Override
public <K1, V1> void forward(K1 key, V1 value, int childIndex) {
forward(key, value);
}
@Override
public void register(StateStore store, StateRestoreCallback func) {
storeMap.put(store.name(), store);
restoreEntries(func);
}
@Override
public StateStore getStateStore(String name) {
return storeMap.get(name);
}
@Override
public StreamingMetrics metrics() {
return metrics;
}
@Override
public File stateDir() {
if (stateDir == null) {
stateDir = StateUtils.tempDir();
}
stateDir.mkdirs();
return stateDir;
}
};
}
/**
* Set the directory that should be used by the store for local disk storage.
*
* @param dir the directory; may be null if no local storage is allowed
*/
public void useStateDir(File dir) {
this.stateDir = dir;
}
@SuppressWarnings("unchecked")
protected <K1, V1> void recordFlushed(K1 key, V1 value) {
K k = (K) key;
if (value == null) {
// This is a removal ...
flushedRemovals.add(k);
flushedEntries.remove(k);
} else {
// This is a normal add
flushedEntries.put(k, (V) value);
flushedRemovals.remove(k);
}
}
private void restoreEntries(StateRestoreCallback func) {
for (Entry<K, V> entry : restorableEntries) {
if (entry != null) {
byte[] rawKey = serdes.rawKey(entry.key());
byte[] rawValue = serdes.rawValue(entry.value());
func.restore(rawKey, rawValue);
}
}
}
/**
* This method adds an entry to the "restore log" for the {@link KeyValueStore}, and is used <em>only</em> when testing the
* restore functionality of a {@link KeyValueStore} implementation.
* <p>
* To create such a test, create the test driver, call this method one or more times, and then create the
* {@link KeyValueStore}. Your tests can then check whether the store contains the entries from the log.
*
* <pre>
* // Set up the driver and pre-populate the log ...
* KeyValueStoreTestDriver&lt;Integer, String> driver = KeyValueStoreTestDriver.create();
* driver.addRestoreEntry(1,"value1");
* driver.addRestoreEntry(2,"value2");
* driver.addRestoreEntry(3,"value3");
*
* // Create the store using the driver's context ...
* ProcessorContext context = driver.context();
* KeyValueStore&lt;Integer, String> store = ...
*
* // Verify that the store's contents were properly restored from the log ...
* assertEquals(0, driver.checkForRestoredEntries(store));
*
* // and there are no other entries ...
* assertEquals(3, driver.sizeOf(store));
* </pre>
*
* @param key the key for the entry
* @param value the value for the entry
* @see #checkForRestoredEntries(KeyValueStore)
*/
public void addEntryToRestoreLog(K key, V value) {
restorableEntries.add(new Entry<K, V>(key, value));
}
/**
* Get the context that should be supplied to a {@link KeyValueStore}'s constructor. This context records any messages
* written by the store to the Kafka topic, making them available via the {@link #flushedEntryStored(Object)} and
* {@link #flushedEntryRemoved(Object)} methods.
* <p>
* If the {@link KeyValueStore}'s contents are to be restored upon its startup, be sure to {@link #addEntryToRestoreLog(Object, Object)
* add the restore entries} before creating the store with the {@link ProcessorContext} returned by this method.
*
* @return the processing context; never null
* @see #addEntryToRestoreLog(Object, Object)
*/
public ProcessorContext context() {
return context;
}
/**
* Get the entries that are restored to a KeyValueStore when it is constructed with this driver's {@link #context()
* ProcessorContext}.
*
* @return the restore entries; never null but possibly empty
*/
public Iterable<Entry<K, V>> restoredEntries() {
return restorableEntries;
}
/**
* Utility method that will count the number of {@link #addEntryToRestoreLog(Object, Object) restore entries} missing from the
* supplied store.
*
* @param store the store that is to have all of the {@link #restoredEntries() restore entries}
* @return the number of restore entries missing from the store, or 0 if all restore entries were found
* @see #addEntryToRestoreLog(Object, Object)
*/
public int checkForRestoredEntries(KeyValueStore<K, V> store) {
int missing = 0;
for (Entry<K, V> entry : restorableEntries) {
if (entry != null) {
V value = store.get(entry.key());
if (!Objects.equals(value, entry.value())) {
++missing;
}
}
}
return missing;
}
/**
* Utility method to compute the number of entries within the store.
*
* @param store the key value store using this {@link #context()}.
* @return the number of entries
*/
public int sizeOf(KeyValueStore<K, V> store) {
int size = 0;
for (KeyValueIterator<K, V> iterator = store.all(); iterator.hasNext();) {
iterator.next();
++size;
}
return size;
}
/**
* Retrieve the value that the store {@link KeyValueStore#flush() flushed} with the given key.
*
* @param key the key
* @return the value that was flushed with the key, or {@code null} if no such key was flushed or if the entry with this
* key was {@link #flushedEntryRemoved(Object) removed} upon flush
*/
public V flushedEntryStored(K key) {
return flushedEntries.get(key);
}
/**
* Determine whether the store {@link KeyValueStore#flush() flushed} the removal of the given key.
*
* @param key the key
* @return {@code true} if the entry with the given key was removed when flushed, or {@code false} if the entry was not
* removed when last flushed
*/
public boolean flushedEntryRemoved(K key) {
return flushedRemovals.contains(key);
}
/**
* Remove all {@link #flushedEntryStored(Object) flushed entries}, {@link #flushedEntryRemoved(Object) flushed removals}, and {@link #restoredEntries() restore entries}.
*/
public void clear() {
restorableEntries.clear();
flushedEntries.clear();
flushedRemovals.clear();
}
}
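
For convenience, the "Basic usage" fragments from the class javadoc above can be assembled into a single self-contained JUnit test. The Stores.create(...).withIntegerKeys().withStringValues().inMemory().build() chain is taken from that javadoc and is assumed here rather than verified by this diff; the test class name is hypothetical.

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
import org.apache.kafka.streams.state.Stores;
import org.junit.Test;

// Sketch assembling the javadoc's "Basic usage" example into one runnable test.
// The Stores builder chain is assumed to exist exactly as described in the javadoc above.
public class InMemoryKeyValueStoreUsageSketch {

    @Test
    public void shouldPutGetFlushAndDelete() {
        KeyValueStoreTestDriver<Integer, String> driver =
                KeyValueStoreTestDriver.create(Integer.class, String.class);
        KeyValueStore<Integer, String> store = Stores.create("my-store", driver.context())
                .withIntegerKeys().withStringValues()
                .inMemory().build();

        // Write a few entries and remove one ...
        store.put(0, "zero");
        store.put(1, "one");
        store.delete(1);

        assertEquals("zero", store.get(0));
        assertNull(store.get(1));

        // Flushing pushes the current entries and removals into the driver's captured log ...
        store.flush();
        assertEquals("zero", driver.flushedEntryStored(0));
        assertTrue(driver.flushedEntryRemoved(1));
    }
}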
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 16df9c55..40f11a0a 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -1,172 +1,173 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.test;
import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
public class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
private final KStreamTestDriver driver;
private final Serializer keySerializer;
private final Serializer valueSerializer;
private final Deserializer keyDeserializer;
private final Deserializer valueDeserializer;
private final RecordCollector.Supplier recordCollectorSupplier;
private Map<String, StateStore> storeMap = new HashMap<>();
long timestamp = -1L;
public MockProcessorContext(KStreamTestDriver driver, Serializer<?> serializer, Deserializer<?> deserializer) {
this(driver, serializer, deserializer, serializer, deserializer, (RecordCollector.Supplier) null);
}
public MockProcessorContext(KStreamTestDriver driver, Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
final RecordCollector collector) {
this(driver, keySerializer, keyDeserializer, valueSerializer, valueDeserializer,
collector == null ? null : new RecordCollector.Supplier() {
@Override
public RecordCollector recordCollector() {
return collector;
}
});
}
public MockProcessorContext(KStreamTestDriver driver, Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
RecordCollector.Supplier collectorSupplier) {
this.driver = driver;
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
this.recordCollectorSupplier = collectorSupplier;
}
@Override
public RecordCollector recordCollector() {
if (recordCollectorSupplier == null) {
throw new UnsupportedOperationException("No RecordCollector specified");
}
return recordCollectorSupplier.recordCollector();
}
public void setTime(long timestamp) {
this.timestamp = timestamp;
}
@Override
- public int id() {
- return 0;
+ public TaskId id() {
+ return new TaskId(0, 0);
}
@Override
public Serializer<?> keySerializer() {
return keySerializer;
}
@Override
public Serializer<?> valueSerializer() {
return valueSerializer;
}
@Override
public Deserializer<?> keyDeserializer() {
return keyDeserializer;
}
@Override
public Deserializer<?> valueDeserializer() {
return valueDeserializer;
}
@Override
public File stateDir() {
throw new UnsupportedOperationException("stateDir() not supported.");
}
@Override
public StreamingMetrics metrics() {
throw new UnsupportedOperationException("metrics() not supported.");
}
@Override
public void register(StateStore store, StateRestoreCallback func) {
if (func != null) throw new UnsupportedOperationException("StateRestoreCallback not supported.");
storeMap.put(store.name(), store);
}
@Override
public StateStore getStateStore(String name) {
return storeMap.get(name);
}
@Override
public void schedule(long interval) {
throw new UnsupportedOperationException("schedule() not supported");
}
@Override
@SuppressWarnings("unchecked")
public <K, V> void forward(K key, V value) {
driver.forward(key, value);
}
@Override
@SuppressWarnings("unchecked")
public <K, V> void forward(K key, V value, int childIndex) {
driver.forward(key, value, childIndex);
}
@Override
public void commit() {
throw new UnsupportedOperationException("commit() not supported.");
}
@Override
public String topic() {
throw new UnsupportedOperationException("topic() not supported.");
}
@Override
public int partition() {
throw new UnsupportedOperationException("partition() not supported.");
}
@Override
public long offset() {
throw new UnsupportedOperationException("offset() not supported.");
}
@Override
public long timestamp() {
return this.timestamp;
}
}
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 8eb2c624..0c4b1a26 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -1,336 +1,337 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.test;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
/**
* This class makes it easier to write tests to verify the behavior of topologies created with a {@link TopologyBuilder}.
* You can test simple topologies that have a single processor, or very complex topologies that have multiple sources, processors,
* and sinks. And because it starts with a {@link TopologyBuilder}, you can create topologies specific to your tests or you
* can use and test code you already have that uses a builder to create topologies. Best of all, the class works without a real
* Kafka broker, so the tests execute very quickly with very little overhead.
* <p>
* Using the ProcessorTopologyTestDriver in tests is easy: simply instantiate the driver with a {@link StreamingConfig} and a
* TopologyBuilder, use the driver to supply an input message to the topology, and then use the driver to read and verify any
* messages output by the topology.
* <p>
* Although the driver doesn't use a real Kafka broker, it does simulate Kafka {@link org.apache.kafka.clients.consumer.Consumer}s
* and {@link org.apache.kafka.clients.producer.Producer}s that read and write raw {@code byte[]} messages. You can either deal
* with messages that have {@code byte[]} keys and values, or you can supply the {@link Serializer}s and {@link Deserializer}s
* that the driver can use to convert the keys and values into objects.
- *
+ *
* <h2>Driver setup</h2>
* <p>
* In order to create a ProcessorTopologyTestDriver instance, you need a TopologyBuilder and a {@link StreamingConfig}. The
* configuration needs to be representative of what you'd supply to the real topology, so that means including several key
* properties. For example, the following code fragment creates a configuration that specifies a local Kafka broker list
* (which is needed but not used), a timestamp extractor, and default serializers and deserializers for string keys and values:
- *
+ *
* <pre>
* StringSerializer strSerializer = new StringSerializer();
* StringDeserializer strDeserializer = new StringDeserializer();
* Properties props = new Properties();
* props.setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
* props.setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
* props.setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
* props.setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
* props.setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
* props.setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
* StreamingConfig config = new StreamingConfig(props);
* TopologyBuilder builder = ...
* ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);
* </pre>
- *
+ *
* <h2>Processing messages</h2>
* <p>
* Your test can supply new input records on any of the topics that the topology's sources consume. Here's an example of an
* input message on the topic named {@code input-topic}:
- *
+ *
* <pre>
* driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);
* </pre>
- *
+ *
* Immediately, the driver will pass the input message through to the appropriate source that consumes the named topic,
* and will invoke the processor(s) downstream of the source. If your topology's processors forward messages to sinks,
* your test can then consume these output messages to verify they match the expected outcome. For example, if our topology
* should have generated 2 messages on {@code output-topic-1} and 1 message on {@code output-topic-2}, then our test can
* obtain these messages using the {@link #readOutput(String, Deserializer, Deserializer)} method:
- *
+ *
* <pre>
* ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
* ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
* ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);
* </pre>
- *
+ *
* Again, our example topology generates messages with string keys and values, so we supply our string deserializer instance
* for use on both the keys and values. Your test logic can then verify whether these output records are correct.
* <p>
* Finally, when completed, make sure your tests {@link #close()} the driver to release all resources and
* {@link org.apache.kafka.streams.processor.Processor}s.
- *
+ *
* <h2>Processor state</h2>
* <p>
* Some processors use Kafka {@link StateStore state storage}, so this driver class provides the {@link #getStateStore(String)}
* and {@link #getKeyValueStore(String)} methods so that your tests can check the underlying state store(s) used by your
* topology's processors. In our previous example, after we supplied a single input message and checked the three output messages,
* our test could also check the key value store to verify the processor correctly added, removed, or updated internal state.
* Or, our test might have pre-populated some state <em>before</em> submitting the input message, and verified afterward that the
* processor(s) correctly updated the state.
*/
public class ProcessorTopologyTestDriver {
private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
- private final int id;
+ private final TaskId id;
private final ProcessorTopology topology;
private final StreamTask task;
private final MockConsumer<byte[], byte[]> consumer;
private final MockProducer<byte[], byte[]> producer;
private final MockConsumer<byte[], byte[]> restoreStateConsumer;
private final Map<String, TopicPartition> partitionsByTopic = new HashMap<>();
private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = new HashMap<>();
private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<>();
/**
* Create a new test driver instance.
* @param config the streaming configuration for the topology
* @param builder the topology builder that will be used to create the topology instance
* @param storeNames the optional names of the state stores that are used by the topology
*/
public ProcessorTopologyTestDriver(StreamingConfig config, TopologyBuilder builder, String... storeNames) {
- id = 0;
+ id = new TaskId(0, 0);
topology = builder.build();
// Set up the consumer and producer ...
consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
producer = new MockProducer<>(true, bytesSerializer, bytesSerializer);
restoreStateConsumer = createRestoreConsumer(id, storeNames);
// Set up all of the topic+partition information and subscribe the consumer to each ...
for (String topic : topology.sourceTopics()) {
TopicPartition tp = new TopicPartition(topic, 1);
consumer.assign(Collections.singletonList(tp));
partitionsByTopic.put(topic, tp);
offsetsByTopicPartition.put(tp, new AtomicLong());
}
task = new StreamTask(id,
consumer,
producer,
restoreStateConsumer,
partitionsByTopic.values(),
topology,
config,
new StreamingMetrics() {
@Override
public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
return null;
}
@Override
public void recordLatency(Sensor sensor, long startNs, long endNs) {
// do nothing
}
});
}
/**
* Send an input message with the given key and value on the specified topic to the topology, and then commit the messages.
- *
+ *
* @param topicName the name of the topic on which the message is to be sent
* @param key the raw message key
* @param value the raw message value
*/
public void process(String topicName, byte[] key, byte[] value) {
TopicPartition tp = partitionsByTopic.get(topicName);
if (tp == null) {
throw new IllegalArgumentException("Unexpected topic: " + topicName);
}
// Add the record ...
long offset = offsetsByTopicPartition.get(tp).incrementAndGet();
task.addRecords(tp, records(new ConsumerRecord<byte[], byte[]>(tp.topic(), tp.partition(), offset, key, value)));
producer.clear();
// Process the record ...
task.process();
task.commit();
// Capture all the records sent to the producer ...
for (ProducerRecord<byte[], byte[]> record : producer.history()) {
Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic());
if (outputRecords == null) {
outputRecords = new LinkedList<>();
outputRecordsByTopic.put(record.topic(), outputRecords);
}
outputRecords.add(record);
}
}
/**
* Send an input message with the given key and value on the specified topic to the topology.
- *
+ *
* @param topicName the name of the topic on which the message is to be sent
* @param key the raw message key
* @param value the raw message value
* @param keySerializer the serializer for the key
* @param valueSerializer the serializer for the value
*/
public <K, V> void process(String topicName, K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
process(topicName, keySerializer.serialize(topicName, key), valueSerializer.serialize(topicName, value));
}
/**
* Read the next record from the given topic. These records were output by the topology during the previous calls to
* {@link #process(String, byte[], byte[])}.
- *
+ *
* @param topic the name of the topic
* @return the next record on that topic, or null if there is no record available
*/
public ProducerRecord<byte[], byte[]> readOutput(String topic) {
Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topic);
if (outputRecords == null) return null;
return outputRecords.poll();
}
/**
* Read the next record from the given topic. These records were output by the topology during the previous calls to
* {@link #process(String, byte[], byte[])}.
- *
+ *
* @param topic the name of the topic
* @param keyDeserializer the deserializer for the key type
* @param valueDeserializer the deserializer for the value type
* @return the next record on that topic, or null if there is no record available
*/
public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
ProducerRecord<byte[], byte[]> record = readOutput(topic);
if (record == null) return null;
K key = keyDeserializer.deserialize(record.topic(), record.key());
V value = valueDeserializer.deserialize(record.topic(), record.value());
return new ProducerRecord<K, V>(record.topic(), record.partition(), key, value);
}
private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]> record) {
return Collections.singleton(record);
}
/**
* Get the {@link StateStore} with the given name. The name should have been supplied via
* {@link #ProcessorTopologyTestDriver(StreamingConfig, TopologyBuilder, String...) this object's constructor}, and is
* presumed to be used by a Processor within the topology.
* <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link #process(String, byte[], byte[]) process an input message}, and/or to check the store afterward.
- *
+ *
* @param name the name of the store
* @return the state store, or null if no store has been registered with the given name
* @see #getKeyValueStore(String)
*/
public StateStore getStateStore(String name) {
return task.context().getStateStore(name);
}
/**
* Get the {@link KeyValueStore} with the given name. The name should have been supplied via
* {@link #ProcessorTopologyTestDriver(StreamingConfig, TopologyBuilder, String...) this object's constructor}, and is
* presumed to be used by a Processor within the topology.
* <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link #process(String, byte[], byte[]) process an input message}, and/or to check the store afterward.
* <p>
- *
+ *
* @param name the name of the store
* @return the key value store, or null if no {@link KeyValueStore} has been registered with the given name
* @see #getStateStore(String)
*/
@SuppressWarnings("unchecked")
public <K, V> KeyValueStore<K, V> getKeyValueStore(String name) {
StateStore store = getStateStore(name);
return store instanceof KeyValueStore ? (KeyValueStore<K, V>) getStateStore(name) : null;
}
/**
* Close the driver, its topology, and all processors.
*/
public void close() {
task.close();
}
/**
* Utility method that creates the {@link MockConsumer} used for restoring state, which should not be done by this
* driver object unless this method is overridden with a functional consumer.
- *
+ *
* @param id the ID of the stream task
* @param storeNames the names of the state stores that are used by the topology
* @return the mock consumer; never null
*/
- protected MockConsumer<byte[], byte[]> createRestoreConsumer(int id, String... storeNames) {
+ protected MockConsumer<byte[], byte[]> createRestoreConsumer(TaskId id, String... storeNames) {
MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
@Override
public synchronized void seekToEnd(TopicPartition... partitions) {
// do nothing ...
}
@Override
public synchronized void seekToBeginning(TopicPartition... partitions) {
// do nothing ...
}
@Override
public synchronized long position(TopicPartition partition) {
// do nothing ...
return 0L;
}
};
// For each store name ...
for (String storeName : storeNames) {
String topicName = storeName;
// Set up the restore-state topic ...
// consumer.subscribe(new TopicPartition(topicName, 1));
// Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
List<PartitionInfo> partitionInfos = new ArrayList<>();
- partitionInfos.add(new PartitionInfo(topicName, id, null, null, null));
+ partitionInfos.add(new PartitionInfo(topicName, id.partition, null, null, null));
consumer.updatePartitions(topicName, partitionInfos);
- consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, id), 0L));
+ consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, id.partition), 0L));
}
return consumer;
}
-}
\ No newline at end of file
+}