序
本文主要研究一下kafka的consumer.timeout.ms属性。
consumer的属性值
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerConfig.scala
/** a string that uniquely identifies a set of consumers within the same consumer group */
val groupId = props.getString("group.id")
/** consumer id: generated automatically if not set.
* Set this explicitly for testing purposes only. */
val consumerId: Option[String] = Option(props.getString("consumer.id", null))
/** the socket timeout for network requests. Its value should be at least fetch.wait.max.ms. */
val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout)
// NOTE(review): fetchWaitMaxMs is declared further down in this listing. If these vals are
// initialized in the order shown, it would still hold its default value (0) when this check
// runs, so the check could not reject a too-small socket timeout — TODO confirm against the full class.
require(fetchWaitMaxMs <= socketTimeoutMs, "socket.timeout.ms should always be at least fetch.wait.max.ms" +
" to prevent unnecessary socket timeouts")
/** the socket receive buffer for network requests */
val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize)
/** the number of bytes of messages to attempt to fetch */
val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
/** the number of threads used to fetch data */
val numConsumerFetchers = props.getInt("num.consumer.fetchers", NumConsumerFetchers)
/** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
val autoCommitEnable = props.getBoolean("auto.commit.enable", AutoCommit)
/** the frequency in ms that the consumer offsets are committed to zookeeper */
val autoCommitIntervalMs = props.getInt("auto.commit.interval.ms", AutoCommitInterval)
/** max number of message chunks buffered for consumption, each chunk can be up to fetch.message.max.bytes */
val queuedMaxMessages = props.getInt("queued.max.message.chunks", MaxQueuedChunks)
/** max number of retries during rebalance */
val rebalanceMaxRetries = props.getInt("rebalance.max.retries", MaxRebalanceRetries)
/** the minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */
val fetchMinBytes = props.getInt("fetch.min.bytes", MinFetchBytes)
/** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes */
val fetchWaitMaxMs = props.getInt("fetch.wait.max.ms", MaxFetchWaitMs)
/** backoff time between retries during rebalance */
val rebalanceBackoffMs = props.getInt("rebalance.backoff.ms", zkSyncTimeMs)
/** backoff time to refresh the leader of a partition after it loses the current leader */
val refreshLeaderBackoffMs = props.getInt("refresh.leader.backoff.ms", RefreshMetadataBackoffMs)
/** backoff time to reconnect the offsets channel or to retry offset fetches/commits */
val offsetsChannelBackoffMs = props.getInt("offsets.channel.backoff.ms", OffsetsChannelBackoffMs)
/** socket timeout to use when reading responses for Offset Fetch/Commit requests. This timeout will also be used for
* the ConsumerMetadata requests that are used to query for the offset coordinator. */
val offsetsChannelSocketTimeoutMs = props.getInt("offsets.channel.socket.timeout.ms", OffsetsChannelSocketTimeoutMs)
/** Retry the offset commit up to this many times on failure. This retry count only applies to offset commits during
* shut-down. It does not apply to commits from the auto-commit thread. It also does not apply to attempts to query
* for the offset coordinator before committing offsets. i.e., if a consumer metadata request fails for any reason,
* it is retried and that retry does not count toward this limit. */
val offsetsCommitMaxRetries = props.getInt("offsets.commit.max.retries", OffsetsCommitMaxRetries)
/** Specify whether offsets should be committed to "zookeeper" (default) or "kafka" */
val offsetsStorage = props.getString("offsets.storage", OffsetsStorage).toLowerCase
/** If you are using "kafka" as offsets.storage, you can dual commit offsets to ZooKeeper (in addition to Kafka). This
* is required during migration from zookeeper-based offset storage to kafka-based offset storage. With respect to any
* given consumer group, it is safe to turn this off after all instances within that group have been migrated to
* the new jar that commits offsets to the broker (instead of directly to ZooKeeper). */
val dualCommitEnabled = props.getBoolean("dual.commit.enabled", if (offsetsStorage == "kafka") true else false)
/* what to do if an offset is out of range.
smallest : automatically reset the offset to the smallest offset
largest : automatically reset the offset to the largest offset
anything else: throw exception to the consumer */
val autoOffsetReset = props.getString("auto.offset.reset", AutoOffsetReset)
/** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs)
/**
* Client id is specified by the kafka consumer client, used to distinguish different clients.
* Falls back to the group id when client.id is not set.
*/
val clientId = props.getString("client.id", groupId)
/** Whether messages from internal topics (such as offsets) should be exposed to the consumer. */
val excludeInternalTopics = props.getBoolean("exclude.internal.topics", ExcludeInternalTopics)
/** Select a strategy for assigning partitions to consumer streams. Possible values: range, roundrobin */
val partitionAssignmentStrategy = props.getString("partition.assignment.strategy", DefaultPartitionAssignmentStrategy)
属性默认值
// Default values used as fallbacks by the property lookups in the ConsumerConfig listing.
// Several of them (e.g. MaxFetchSize, DefaultFetcherBackoffMs, the Mirror* values, DefaultClientId)
// are not referenced by the visible excerpt.
val RefreshMetadataBackoffMs = 200 // default for refresh.leader.backoff.ms
val SocketTimeout = 30 * 1000 // default for socket.timeout.ms
val SocketBufferSize = 64*1024 // default for socket.receive.buffer.bytes
val FetchSize = 1024 * 1024 // default for fetch.message.max.bytes
val MaxFetchSize = 10*FetchSize
val NumConsumerFetchers = 1
val DefaultFetcherBackoffMs = 1000
val AutoCommit = true
val AutoCommitInterval = 60 * 1000 // default for auto.commit.interval.ms
val MaxQueuedChunks = 2
val MaxRebalanceRetries = 4
val AutoOffsetReset = OffsetRequest.LargestTimeString
// Default for consumer.timeout.ms. ConsumerIterator.makeNext treats any negative value as
// "no timeout": it blocks on channel.take instead of polling with a deadline.
val ConsumerTimeoutMs = -1
val MinFetchBytes = 1
val MaxFetchWaitMs = 100 // default for fetch.wait.max.ms
val MirrorTopicsWhitelist = ""
val MirrorTopicsBlacklist = ""
val MirrorConsumerNumThreads = 1
val OffsetsChannelBackoffMs = 1000
val OffsetsChannelSocketTimeoutMs = 10000
val OffsetsCommitMaxRetries = 5
val OffsetsStorage = "zookeeper"
val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
val ExcludeInternalTopics = true
val DefaultPartitionAssignmentStrategy = "range" /* select between "range", and "roundrobin" */
val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
val DefaultClientId = ""
其中consumerTimeoutMs的解释为throw a timeout exception to the consumer if no message is available for consumption after the specified interval,其默认值为-1
默认值-1(任何负值同理)表示如果没有消息就一直阻塞等待;这里的阻塞发生在ConsumerIterator的hasNext方法里,而不是next方法本身(next内部也是先调用hasNext来准备下一个元素)
IteratorTemplate.hasNext
/**
* Reports whether another item is available, computing it on demand.
* Returns false when the state is DONE, true when an item is already READY;
* in any other non-failed state it delegates to maybeComputeNext().
* Throws IllegalStateException when the iterator is in the FAILED state.
*/
def hasNext(): Boolean = {
// maybeComputeNext() sets FAILED before calling makeNext(), so this state is what remains
// when makeNext() throws without the state having been changed again
if(state == FAILED)
throw new IllegalStateException("Iterator is in failed state")
state match {
case DONE => false
case READY => true
// no item prepared yet: try to compute one now (this is where a blocking makeNext() blocks)
case _ => maybeComputeNext()
}
}
/** Produces the next item; implemented by subclasses. */
protected def makeNext(): T
/**
* Computes the next item through makeNext() and stores it in nextItem.
* Returns false if the state is DONE after makeNext() returns; otherwise marks the
* state READY and returns true.
*/
def maybeComputeNext(): Boolean = {
// mark as FAILED up front: if makeNext() throws, the assignments below never run and the
// state stays FAILED unless makeNext() changed it itself before throwing
state = FAILED
nextItem = makeNext()
if(state == DONE) {
false
} else {
state = READY
true
}
}
这里委托给了子类的makeNext方法
ConsumerIterator.makeNext
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerIterator.scala
/**
* Prepares the next message for the iterator.
*
* When the current chunk iterator is missing or exhausted, a new FetchedDataChunk is taken
* from channel: with a negative consumerTimeoutMs it blocks on channel.take, otherwise it
* polls for at most consumerTimeoutMs milliseconds.
*
* Throws ConsumerTimeoutException when the poll returns null (no chunk arrived in time), and
* MessageSizeTooLargeException when a freshly fetched chunk has zero valid bytes.
* Returns allDone when the shutdown command chunk is received.
*/
protected def makeNext(): MessageAndMetadata[K, V] = {
var currentDataChunk: FetchedDataChunk = null
// if we don't have an iterator, get one
var localCurrent = current.get()
if(localCurrent == null || !localCurrent.hasNext) {
// negative timeout: wait indefinitely for the next chunk
if (consumerTimeoutMs < 0)
currentDataChunk = channel.take
else {
// zero or positive timeout: wait at most consumerTimeoutMs milliseconds
currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
if (currentDataChunk == null) {
// reset state to make the iterator re-iterable
resetState()
throw new ConsumerTimeoutException
}
}
// identity comparison (eq) against the shutdown sentinel chunk
if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
debug("Received the shutdown command")
return allDone
} else {
currentTopicInfo = currentDataChunk.topicInfo
val cdcFetchOffset = currentDataChunk.fetchOffset
val ctiConsumeOffset = currentTopicInfo.getConsumeOffset
// the chunk starts after the recorded consume offset: log it and move the consume offset forward
if (ctiConsumeOffset < cdcFetchOffset) {
error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
.format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
}
localCurrent = currentDataChunk.messages.iterator
current.set(localCurrent)
}
// if we just updated the current chunk and it is empty that means the fetch size is too small!
if(currentDataChunk.messages.validBytes == 0)
throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
"%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
.format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
}
var item = localCurrent.next()
// reject the messages that have already been consumed
// (the loop also stops at the last item of the chunk, even if its offset is still below the consume offset)
while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) {
item = localCurrent.next()
}
// remembered here; next() applies it to currentTopicInfo once the item is handed out
consumedOffset = item.nextOffset
item.message.ensureValid() // validate checksum of message to ensure it is valid
new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder)
}
这个方法就是提前准备好nextItem
当consumerTimeoutMs小于0的时候,调用的是channel.take(一直阻塞);大于等于0的时候调用的是channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
而channel是BlockingQueue[FetchedDataChunk]
当channel.poll在consumerTimeoutMs时间内取不到数据(返回null)的时候,先调用resetState()让迭代器可以再次迭代,然后抛出ConsumerTimeoutException
ConsumerIterator.next
/**
* Returns the next message and records its consumption.
* The item comes from super.next(); afterwards the consume offset of the current topic info
* is set to consumedOffset and the per-topic and all-topic message rate meters are marked.
* Throws KafkaException when consumedOffset is negative.
*/
override def next(): MessageAndMetadata[K, V] = {
val item = super.next()
// consumedOffset is assigned in makeNext() from item.nextOffset
if(consumedOffset < 0)
throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
currentTopicInfo.resetConsumeOffset(consumedOffset)
val topic = currentTopicInfo.topic
trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
item
}
next方法首先调用的父类的next方法
kafka_2.10-0.8.2.2-sources.jar!/kafka/utils/IteratorTemplate.scala
/**
* Returns the item prepared by hasNext().
* Throws NoSuchElementException when hasNext() reports no further item, and
* IllegalStateException when an item was expected but nextItem is null.
*/
def next(): T = {
// hasNext() computes the item if none is ready yet, so any blocking happens inside this call
if(!hasNext())
throw new NoSuchElementException()
// the prepared item is being handed out; the following hasNext() must compute a new one
state = NOT_READY
if(nextItem == null)
throw new IllegalStateException("Expected item but none found.")
nextItem
}
而父类的next方法首先调用的是hasNext方法,也就是先准备好下一个元素。
所以不管怎样,阻塞是在hasNext方法里头
验证
// Demo: consume with the old high-level consumer and observe where ConsumerTimeoutException surfaces.
Properties props = new Properties();
props.put("zookeeper.connect", zk);
// props.put("auto.offset.reset","smallest");
props.put("group.id",group);
props.put("zookeeper.session.timeout.ms", "10000");
props.put("zookeeper.sync.time.ms", "2000");
props.put("auto.commit.interval.ms", "10000");
props.put("consumer.timeout.ms","10000"); // timeout for ConsumerIterator.hasNext(); if unset (default -1) it blocks until a new message arrives
props.put(org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props);
ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, consumerCount);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
        .createMessageStreams(topicCountMap);
consumerMap.get(topic).stream().forEach(stream -> {
    pool.submit(new Runnable() {
        @Override
        public void run() {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            // how long it.hasNext() blocks depends on consumer.timeout.ms (default -1)
            try{
                while (it.hasNext()) {
                    System.out.println(Thread.currentThread().getName()+" hello");
                    // the timeout exception is thrown by hasNext(), not by next(); decode with an explicit charset instead of the platform default
                    System.out.println(Thread.currentThread().getName()+":"+new String(it.next().message(), java.nio.charset.StandardCharsets.UTF_8));
                }
            }catch (ConsumerTimeoutException e){
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+" end");
        }
    });
});
- 异常
kafka.consumer.ConsumerTimeoutException
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:69)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
pool-2-thread-3 end
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at com.example.demo.NativeConsumer$1.run(NativeConsumer.java:49)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
pool-2-thread-2 end
pool-2-thread-4 end
pool-2-thread-1 end
kafka.consumer.ConsumerTimeoutException
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:69)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at com.example.demo.NativeConsumer$1.run(NativeConsumer.java:49)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
kafka.consumer.ConsumerTimeoutException
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:69)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at com.example.demo.NativeConsumer$1.run(NativeConsumer.java:49)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
kafka.consumer.ConsumerTimeoutException
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:69)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at com.example.demo.NativeConsumer$1.run(NativeConsumer.java:49)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
这里只复现了直接调用hasNext抛出的ConsumerTimeoutException,可以理解为hasNext这里提前准备了nextItem,然后只要hasNext返回true,则next方法一般是有值的。