Root-Cause Analysis of KafkaProducer Send Timeouts in Kafka 0.8.2.1
Problem Description
A user in our test environment reported ReadTimeout exceptions when publishing Kafka messages. Below is our analysis.
The user's request log:
"request-start@2018-08-22 10:30:45.399" "192.168.5.116 (172.16.74.67) -> 172.16.52.72:80" "POST //kafka/publish HTTP/1.1" 603 process-req-time:60003 "response-end@2018-08-22 10:31:45.402" status:200 97 http-nio-8080-exec-9 "-" "Java/1.8.0_144" traceId:- topic:payment.abc.msg connection-status:???X???
We found multiple timed-out requests, each taking roughly 60 s, even though the configuration of the topics involved was normal (the topics existed and their leaders were available).
Looking further at the Kafka broker logs, we found ERROR entries indicating, roughly, that requests for an illegal topic name were being handled:
[2018-08-22 11:00:00,174] ERROR [KafkaApi-2] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 6226810; ClientId: producer-30; Topics: fincx.xxx.status (wpk-reactor-web),payment.abc.msg (kafka.server.KafkaApis)
kafka.common.InvalidTopicException: topic name fincx.repayment.status (wpk-reactor-web) is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'
at kafka.common.Topic$.validate(Topic.scala:42)
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
at scala.collection.SetLike$class.map(SetLike.scala:92)
at scala.collection.AbstractSet.map(Set.scala:47)
at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
at java.lang.Thread.run(Thread.java:748)
Problem Analysis
The puzzling part is that sends to a correctly configured topic time out, while the log contains errors for a different topic. Can sends to different topics really interfere with each other?
Why does the presence of one bad topic stop every other, perfectly normal topic from being sent? This is a serious problem, because KafkaProducer is thread-safe and an application typically uses a single KafkaProducer instance to publish to many topics.
Reproducing the Problem
After restarting the service, the freshly initialized KafkaProducer sends to a normal topic successfully with no timeout. We then send to a topic with an illegal name: the send times out and the broker throws kafka.common.InvalidTopicException. From that point on, every send to the previously working topics fails as well.
This shows that a single send to a bad topic renders the KafkaProducer completely unusable.
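A minimal sketch of this reproduction with the 0.8.2.1 Java client (broker address and topic names are placeholders for illustration):

import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class InvalidTopicRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        send(producer, "payment.abc.msg");          // succeeds on a fresh producer
        send(producer, "bad topic (illegal name)"); // blocks for ~60 s, then fails
        send(producer, "payment.abc.msg");          // now also blocks for ~60 s and fails
        producer.close();
    }

    private static void send(KafkaProducer<String, String> producer, String topic) {
        try {
            Future<RecordMetadata> f = producer.send(new ProducerRecord<>(topic, "key", "value"));
            System.out.println("sent to " + topic + ", offset=" + f.get().offset());
        } catch (Exception e) {
            System.err.println("send to " + topic + " failed: " + e);
        }
    }
}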
Code Analysis
The 0.8.2.1 KafkaProducer source
From the exception log the cause is fairly easy to see: before sending a message, KafkaProducer fetches the TopicMetadata from the broker; if the broker finds that the topic does not exist, it calls AdminUtils.createTopic() to auto-create it, and because the topic name is illegal, that call throws. Let's look at how KafkaProducer handles this exception.
KafkaProducer.waitOnMetadata:
private void waitOnMetadata(String topic, long maxWaitMs) {
    if (metadata.fetch().partitionsForTopic(topic) != null) {
        return;
    } else {
        long begin = time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        while (metadata.fetch().partitionsForTopic(topic) == null) {
            log.trace("Requesting metadata update for topic {}.", topic);
            int version = metadata.requestUpdate();
            metadata.add(topic);
            sender.wakeup();
            metadata.awaitUpdate(version, remainingWaitMs);
            long elapsed = time.milliseconds() - begin;
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            remainingWaitMs = maxWaitMs - elapsed;
        }
    }
}
The maxWaitMs argument comes from the producer configuration metadata.fetch.timeout.ms, whose default is 60000 ms, which matches exactly the ~60 s latency in the request log above.
Step 1: metadata.fetch().partitionsForTopic(topic) checks whether the topic's metadata is already cached; if so, it is used directly.
Step 2: otherwise the latest TopicMetadata is fetched. The while loop forces the metadata update to complete within metadata.fetch.timeout.ms, and metadata.requestUpdate() marks the current metadata as needing a forced refresh.
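For reference, this timeout can be tuned on the producer. A minimal configuration sketch, assuming a 0.8.2.1 client (broker address is a placeholder); note that lowering the value only makes the failure surface sooner, it does not fix the underlying bug:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class FastFailProducerFactory {
    public static KafkaProducer<String, String> create() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Default is 60000 ms, which is exactly the ~60 s latency in the request log above.
        // With 5000 ms, waitOnMetadata gives up after 5 s instead of blocking for a full minute.
        props.put("metadata.fetch.timeout.ms", "5000");
        return new KafkaProducer<>(props);
    }
}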
The metadata request is built by NetworkClient.metadataRequest:
private ClientRequest metadataRequest(long now, int node, Set<String> topics) {
    MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
    RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
    return new ClientRequest(now, true, send, null);
}
And in its caller, a fragment of NetworkClient.maybeUpdateMetadata:
Set<String> topics = metadata.topics();   // all topics the producer has ever cached, not just the current one
this.metadataFetchInProgress = true;
ClientRequest metadataRequest = metadataRequest(now, node.id(), topics);
Note that the MetadataRequest carries the entire list of topics cached by the KafkaProducer, not just the topic currently being sent.
At this point the cause of the timeouts is clear: because the KafkaProducer's cached topic list contains one illegal topic, every metadata refresh includes that illegal topic and therefore fails, so sends to every topic time out. This is a flaw in the KafkaProducer's exception handling. Note that the bug is only triggered when auto.create.topics.enable=true; with false the broker never tries to create the topic and no exception is thrown. This is a known upstream bug, KAFKA-1884.
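Since the broker rejects names containing anything other than ASCII alphanumerics, '.', '_' and '-', one practical mitigation is to validate topic names on the client before calling send(). A minimal sketch (the class name is ours; the character rule is taken from the broker error message above, while brokers additionally enforce length limits that vary by version and are omitted here):

import java.util.regex.Pattern;

public class TopicNameCheck {
    // Character rule quoted from the broker error message:
    // ASCII alphanumerics, '.', '_' and '-' only.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    public static boolean isLegal(String topic) {
        return topic != null && !topic.isEmpty() && LEGAL.matcher(topic).matches();
    }

    public static void main(String[] args) {
        System.out.println(isLegal("payment.abc.msg"));                          // true
        System.out.println(isLegal("fincx.repayment.status (wpk-reactor-web)")); // false: contains ' ', '(' and ')'
    }
}

Rejecting such names at the proxy layer keeps an illegal topic from ever entering the producer's metadata cache, which is what poisons every later send.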
About the auto.create.topics.enable parameter
The parameter defaults to true. On the Kafka cluster behind our old proxy it is set to false, so users cannot create topics automatically; the official documentation also discourages enabling it.
In addition, with auto.create.topics.enable=true, starting a consumer against a non-existent topic also creates that topic, leaving a pile of junk topics on the brokers.
Fortunately, automatic topic creation is disabled on all of our production clusters, so this bug never reached the production systems.
Does the 1.0.0 KafkaProducer have the same problem?
Our newly developed messaging system is built on Kafka 1.0.0. Given this obvious design flaw in the KafkaProducer, has it been fixed upstream in 1.0.0?
We verified it locally:
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9080");
        props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<String, byte[]> kafkaProducer = new KafkaProducer<>(props);
        sendMessage(kafkaProducer, "test");
        sendMessage(kafkaProducer, "fincx.repayment.status (wpk-reactor-web)"); // illegal topic name
        sendMessage(kafkaProducer, "test");
    }

    private static void sendMessage(KafkaProducer<String, byte[]> kafkaProducer, String topic) {
        try {
            Future<RecordMetadata> f1 = kafkaProducer.send(new ProducerRecord<String, byte[]>(topic, "12", "12".getBytes()));
            RecordMetadata meta = f1.get(5, TimeUnit.SECONDS);
            System.out.println(meta.offset() + ",topic:" + meta.topic());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
The test shows that the 1.0.0 KafkaProducer has fixed the problem: a bad topic no longer affects sends to the normal topics. The reason is as follows.
KafkaApis.createTopic on the 1.0.0 broker:
private def createTopic(topic: String,
                        numPartitions: Int,
                        replicationFactor: Int,
                        properties: Properties = new Properties()): MetadataResponse.TopicMetadata = {
  try {
    AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
    info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
      .format(topic, numPartitions, replicationFactor))
    new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic),
      java.util.Collections.emptyList())
  } catch {
    case _: TopicExistsException => // let it go, possibly another broker created this topic
      new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic),
        java.util.Collections.emptyList())
    case ex: Throwable => // Catch all to prevent unhandled errors
      new MetadataResponse.TopicMetadata(Errors.forException(ex), topic, isInternal(topic),
        java.util.Collections.emptyList())
  }
}
The corresponding topic-creation call in KafkaApis on the 0.8.2.1 broker:
AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions,
                       offsetsTopicReplicationFactor,
                       offsetManager.offsetsTopicConfig)
The 1.0.0 broker catches the exception raised while creating the topic and returns a TopicMetadata that carries the error code but no partition or leader information (much like the case where the topic simply does not exist). When the client receives such a TopicMetadata, it ignores the entries that carry an error, so a single bad topic can no longer make the whole KafkaProducer unusable.
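To make this isolation visible on the client side, a per-record Callback (a standard API of the 1.0.0 producer) delivers the error for the offending record only, while sends to other topics on the same producer keep working. A minimal sketch, reusing a producer configured as in the test above:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CallbackSend {
    // 'kafkaProducer' is assumed to be a configured 1.0.0 KafkaProducer<String, byte[]>
    static void sendWithCallback(KafkaProducer<String, byte[]> kafkaProducer, String topic) {
        kafkaProducer.send(new ProducerRecord<>(topic, "12", "12".getBytes()), (metadata, exception) -> {
            if (exception != null) {
                // e.g. the broker rejected an illegal topic name; only this record fails
                System.err.println("send to " + topic + " failed: " + exception);
            } else {
                System.out.println(metadata.offset() + ",topic:" + metadata.topic());
            }
        });
    }
}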
The 0.8.2.1 kafka.javaapi.producer.Producer
Before the upgrade we used kafka.javaapi.producer.Producer, and it never showed this problem in either the test environment or production, so let's look at how that producer is implemented.
With auto.create.topics.enable=true on the broker, sending a message to a topic with an illegal name through kafka.javaapi.producer.Producer makes the client throw the following:
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at com.wacai.oldkafka.proxy.ScalaProducerTest.sendMessage(ScalaProducerTest.java:40)
at com.wacai.oldkafka.proxy.ScalaProducerTest.main(ScalaProducerTest.java:31)
The broker throws the following exception:
kafka.common.InvalidTopicException: topic name fincx.repayment.status (wpk-reactor-web) is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'
at kafka.common.Topic$.validate(Topic.scala:42)
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:86)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
at scala.collection.immutable.Set$Set1.scala$collection$SetLike$$super$map(Set.scala:73)
at scala.collection.SetLike$class.map(SetLike.scala:93)
at scala.collection.immutable.Set$Set1.map(Set.scala:73)
at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
at java.lang.Thread.run(Thread.java:748)
The call stack for fetching TopicMetadata
Before sending a message, Kafka first has to fetch the TopicMetadata for that message's topic, which contains the number of partitions, the leader of each partition, and so on.
"main@1" prio=5 tid=0x1 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
at kafka.producer.Producer.send(Producer.scala:77)
- locked <0x68d> (a java.lang.Object)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at com.wacai.oldkafka.proxy.ScalaProducerTest.sendMessage(ScalaProducerTest.java:41)
at com.wacai.oldkafka.proxy.ScalaProducerTest.main(ScalaProducerTest.java:32)
The key code is BrokerPartitionInfo.updateInfo, which pulls the topic metadata from the broker:
def updateInfo(topics: Set[String], correlationId: Int) {
  var topicsMetadata: Seq[TopicMetadata] = Nil
  val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
  topicsMetadata = topicMetadataResponse.topicsMetadata
  // throw partition specific exception
  topicsMetadata.foreach(tmd => {
    trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
    if(tmd.errorCode == ErrorMapping.NoError) {
      topicPartitionInfo.put(tmd.topic, tmd)
    } else
      warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
    tmd.partitionsMetadata.foreach(pmd => {
      if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {
        warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
          ErrorMapping.exceptionFor(pmd.errorCode).getClass))
      } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
    })
  })
  producerPool.updateProducer(topicsMetadata)
}
When the topic is illegal, the returned topicsMetadata carries the error and partitionsMetadata is an empty collection:
TopicMetadata for topic fincx.repayment.status (wpk-reactor-web) ->
No partition metadata for topic fincx.repayment.status (wpk-reactor-web) due to kafka.common.InvalidTopicException
Following the call stack, the client handles every failure to fetch TopicMetadata during a send, and the number of attempts is bounded by message.send.max.retries (default 3). So when kafka.javaapi.producer.Producer hits this exception it fails fast, and its TopicMetadata refresh is not an all-cached-topics update. The design is sound here, and this producer does not have the bug.
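For completeness, a minimal sketch of how the old producer is typically configured (broker address is a placeholder; message.send.max.retries and retry.backoff.ms are standard old-producer settings), matching the "Failed to send messages after 3 tries" behavior in the client stack trace above:

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class OldProducerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "127.0.0.1:9092"); // placeholder broker address
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("message.send.max.retries", "3");  // default 3: bounds how long a bad topic can block
        props.put("retry.backoff.ms", "100");

        Producer<String, String> producer = new Producer<>(new ProducerConfig(props));
        try {
            // An illegal topic name fails fast with FailedToSendMessageException after 3 tries,
            // instead of blocking for metadata.fetch.timeout.ms like the 0.8.2.1 new producer.
            producer.send(new KeyedMessage<String, String>("fincx.repayment.status (wpk-reactor-web)", "12", "12"));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}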
References
Apache JIRA, KAFKA-1884 (New Producer blocks forever for invalid topic names): https://issues.apache.org/jira/browse/KAFKA-1884
[Kafka-dev] [jira] [Commented] (KAFKA-1884) New Producer blocks forever for Invalid topic names: https://webmail.dev411.com/p/kafka/dev/15260xjzjg/jira-commented-kafka-1884-new-producer-blocks-forever-for-invalid-topic-names
Patching the Kafka source and building 0.8.2.2 to work around kafka.common.InvalidTopicException: https://blog.csdn.net/u010670689/article/details/78393214