0.8.2.1版本KafkaProducer消息发送超时原因分析

0.8.2.1版本KafkaProducer消息发送超时原因分析

问题描述

我们在测试环境收到用户反馈发送kafka消息出现ReadTimeout异常。对此我们进行了简单的分析
用户的请求日志

"request-start@2018-08-22 10:30:45.399" "192.168.5.116 (172.16.74.67) -> 172.16.52.72:80"            "POST //kafka/publish HTTP/1.1" 603 process-req-time:60003 "response-end@2018-08-22 10:31:45.402" status:200 97            http-nio-8080-exec-9 "-" "Java/1.8.0_144" traceId:- topic:payment.abc.msg connection-status:???X???

我们发现多条请求超时日志,出现大约60s左右的耗时,检查这些topic的配置都是正常的(topic存在,leader节点存在) .
继续查看kafka服务端日志,有部分ERROR日志,大致意思是有不合法的topic请求存在

[2018-08-22 11:00:00,174] ERROR [KafkaApi-2] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 6226810; ClientId: producer-30; Topics: fincx.xxx.status (wpk-reactor-web),payment.abc.msg (kafka.server.KafkaApis)
kafka.common.InvalidTopicException: topic name fincx.repayment.status (wpk-reactor-web) is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'
        at kafka.common.Topic$.validate(Topic.scala:42)
        at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
        at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
        at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
        at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
        at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
        at scala.collection.SetLike$class.map(SetLike.scala:92)
        at scala.collection.AbstractSet.map(Set.scala:47)
        at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
        at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
        at java.lang.Thread.run(Thread.java:748)

问题分析

这个问题困惑的地方在于发送一个配置正常的topic竟然发送超时,日志中含有其他topic的异常错误信息,不同topic之间的消息发送竟然相互干扰?
为什么会因为有一个异常topic存在,导致其他原本正常的topic都无法发送消息,这是个比较严重的问题了。因为KafkaProducer是线程安全,一个应用中往往使用一个KafkaProducer发送多个topic消息。

重现问题

重启服务,新初始化的KafkaProducer发送正常topic的消息发送成功,没有超时,尝试发送topic命名不合法的topic,发送超时,kafka抛出异常kafka.common.InvalidTopicException。此后再发送正常topic的消息均发送失败。
证明KafkaProducer会因为发送一个异常topic消息导致完全不可用。

代码分析

0.8.2.1的KafkaProducer源码分析

根据异常日志我们比较容易分析出异常原因是KafkaProducer发送消息前需要拉取broker端的TopicMetadata;如果发现topic不存在,会调用AdminUtils.createTopic()方法创建,此时topic命名不规范,抛出异常。接下来我们看下KafkaProducer是如何处理这一异常的。

分析KafkaProducer.waitOnMetadata方法:

private void waitOnMetadata(String topic, long maxWaitMs) {
        if (metadata.fetch().partitionsForTopic(topic) != null) {
            return;
        } else {
            long begin = time.milliseconds();
            long remainingWaitMs = maxWaitMs;
            while (metadata.fetch().partitionsForTopic(topic) == null) {
                log.trace("Requesting metadata update for topic {}.", topic);
                int version = metadata.requestUpdate();
                metadata.add(topic);
                sender.wakeup();
                metadata.awaitUpdate(version, remainingWaitMs);
                long elapsed = time.milliseconds() - begin;
                if (elapsed >= maxWaitMs)
                    throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
                remainingWaitMs = maxWaitMs - elapsed;
            }
        }
    }

上述方法maxWaitMs参数取自生产者配置参数metadata.fetch.timeout.ms,官方默认值为60000ms,和前面我们超时日志请求时间60s是完全对的上的。
第一步:metadata.fetch().partitionsForTopic(topic)检查topic的meta信息是否已经有缓存,有的化就可以直接用了
第二步:获取最新TopicMetadata数据,此处while循环,强制在metadata.fetch.timeout.ms时间内完成TopicMetadata的更新操作,metadata.requestUpdate设置当前TopicMetadata需要强制更新

调用NetworkClient.metadataRequest发送消息

private ClientRequest metadataRequest(long now, int node, Set<String> topics) {
        MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
        RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
        return new ClientRequest(now, true, send, null);
    }

NetworkClient.metadataRequest的调用方NetworkClient.maybeUpdateMetadata方法片段

Set<String> topics = metadata.topics();
this.metadataFetchInProgress = true;
ClientRequest metadataRequest = metadataRequest(now, node.id(), topics);

我们看到MetadataRequest的请求是包含了整个KafkaProducer缓存的所有topic列表数据,并不是只对当前发送的topic进行元数据更新。
至此我们大致了解了KafkaProducer发送超时的前因后果:因为KafkaProducer缓存的topic列表中有一个非法的topic,导致每次批量更新元数据时,都包含了非法的topic,每次元数据更新操作都失败。这是KafkaProducer异常处理缺陷导致。不过该BUG只有在auto.create.topics.enable=true才触发,如果为false,就不会创建topic,不会抛出异常。这是一个kafka官方BUG KAFKA-1884

关于auto.create.topics.enable参数

该参数默认为true,老proxy代理的kafka集群该参数配置为false,禁止用户自动创建topic,官方也不建议开启该参数。
此外,auto.create.topics.enable=true时候,当启动一个消费者,消费的topic不存在时,也会导致topic创建,导致服务器上存在一堆垃圾topic。
万幸的是我们线上业务集群都是禁用自动生成topic的,使得这一BUG没有影响到线上系统。

1.0.0的KafkaProducer是否存在同样问题

我们新研发的消息系统是基于Kafka1.0.0版本的,针对KafkaProducer这一明显设计缺陷,1.0.0版本官方是否已经解决?
我们在本地进行了验证

public class KafkaProducerTest {
    public static void main(String[] args) throws Exception {
        Properties props=new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9080");
        props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<String, byte[]> kafkaProducer=new KafkaProducer(props);
        sendMessage(kafkaProducer,"test");
        sendMessage(kafkaProducer,"fincx.repayment.status (wpk-reactor-web)");
        sendMessage(kafkaProducer,"test");
    }

    private static void sendMessage(KafkaProducer kafkaProducer,String topic) {
        try{
            Future<RecordMetadata> f1= kafkaProducer.send(new ProducerRecord<String, byte[]>(topic, "12", "12".getBytes()));
            RecordMetadata meta=f1.get(5, TimeUnit.SECONDS);
            System.out.println(meta.offset()+",topic:"+meta.topic());
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}

测试结果发现1.0.0版本的KafkaProducer已经修复了这一问题,异常的topic不会影响正常topic数据的发送。 具体原因分析如下
1.0.0版本服务端KafkaApis.createTopic方法

private def createTopic(topic: String,
                          numPartitions: Int,
                          replicationFactor: Int,
                          properties: Properties = new Properties()): MetadataResponse.TopicMetadata = {
    try {
      AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
      info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
        .format(topic, numPartitions, replicationFactor))
      new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic),
        java.util.Collections.emptyList())
    } catch {
      case _: TopicExistsException => // let it go, possibly another broker created this topic
        new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic),
          java.util.Collections.emptyList())
      case ex: Throwable  => // Catch all to prevent unhandled errors
        new MetadataResponse.TopicMetadata(Errors.forException(ex), topic, isInternal(topic),
          java.util.Collections.emptyList())
    }
  }

0.8.2.1版本服务端KafkaApis中createTopic方法

AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions,
                                     offsetsTopicReplicationFactor,
                                     offsetManager.offsetsTopicConfig)

1.0.0版本的broker对创建topic的异常进行了捕获,并且返回一个带有异常信息、不包含分区及leader信息的空的TopicMetadata(这种场景和topic不存在类似)。并且在客户端获取到该TopicMetadata时,会自动忽略带有异常信息的TopicMetadata。从而避免一个topic异常导致整个KafkaProducer不可用。

0.8.2.1的kafka.javaapi.producer.Producer分析

未升级前我们使用的是kafka.javaapi.producer.Producer,并且在测试环境和线上使用时均未出现问题,我们可以分析下改Producer的实现。
kafka broker配置auto.create.topics.enable=true。用kafka.javaapi.producer.Producer尝试发送一个topic命名不合法的消息,客户端抛出如下:

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at com.wacai.oldkafka.proxy.ScalaProducerTest.sendMessage(ScalaProducerTest.java:40)
    at com.wacai.oldkafka.proxy.ScalaProducerTest.main(ScalaProducerTest.java:31)

kafka服务端抛出如下异常

kafka.common.InvalidTopicException: topic name fincx.repayment.status (wpk-reactor-web) is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'
    at kafka.common.Topic$.validate(Topic.scala:42)
    at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
    at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
    at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:86)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
    at scala.collection.immutable.Set$Set1.scala$collection$SetLike$$super$map(Set.scala:73)
    at scala.collection.SetLike$class.map(SetLike.scala:93)
    at scala.collection.immutable.Set$Set1.map(Set.scala:73)
    at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
    at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
    at java.lang.Thread.run(Thread.java:748)

分析发送TopicMetadata的调用栈

kafka在发消息前需要先获取该消息的TopicMetadata信息,这些信息包含了分区数,各个分区的leader节点等信息。

"main@1" prio=5 tid=0x1 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
      at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
      at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
      at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
      at kafka.utils.Utils$.swallow(Utils.scala:172)
      at kafka.utils.Logging$class.swallowError(Logging.scala:106)
      at kafka.utils.Utils$.swallowError(Utils.scala:45)
      at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
      at kafka.producer.Producer.send(Producer.scala:77)
      - locked <0x68d> (a java.lang.Object)
      at kafka.javaapi.producer.Producer.send(Producer.scala:33)
      at com.wacai.oldkafka.proxy.ScalaProducerTest.sendMessage(ScalaProducerTest.java:41)
      at com.wacai.oldkafka.proxy.ScalaProducerTest.main(ScalaProducerTest.java:32)

分析关键代码BrokerPartitionInfo.updateInfo。此处从broker拉取topic相关的metadata信息,

def updateInfo(topics: Set[String], correlationId: Int) {
    var topicsMetadata: Seq[TopicMetadata] = Nil
    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
    topicsMetadata = topicMetadataResponse.topicsMetadata
    // throw partition specific exception
    topicsMetadata.foreach(tmd =>{
      trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
      if(tmd.errorCode == ErrorMapping.NoError) {
        topicPartitionInfo.put(tmd.topic, tmd)
      } else
        warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
      tmd.partitionsMetadata.foreach(pmd =>{
        if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {
          warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
            ErrorMapping.exceptionFor(pmd.errorCode).getClass))
        } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
      })
    })
    producerPool.updateProducer(topicsMetadata)
  }

当topic不合法时候,topicsMetadata返回包含错误信息,并且partitionsMetadata为空集。

    TopicMetadata for topic fincx.repayment.status (wpk-reactor-web) -> 
No partition metadata for topic fincx.repayment.status (wpk-reactor-web) due to kafka.common.InvalidTopicException

跟踪调用栈,客户端发送消息时,对拉取TopicMetaData失败的异常均有处理,并且最大重试次数受message.send.max.retries=3参数控制。所以kafka.javaapi.producer.Producer遇到此种异常会快速失败,并且TopicMetaData不是全量更新。设计上也是合理的,不存在改BUG。

相关资料

apache官方bug说明 https://issues.apache.org/jira/browse/KAFKA-1884
[Kafka-dev] [jira] [Commented] (KAFKA-1884) New Producer blocks forever for Invalid topic names
https://webmail.dev411.com/p/kafka/dev/15260xjzjg/jira-commented-kafka-1884-new-producer-blocks-forever-for-invalid-topic-names
修改kafka源码,编译kafka 0.8.2.2,解决bug kafka.common.InvalidTopicException https://blog.csdn.net/u010670689/article/details/78393214

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容

  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,715评论 13 425
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,637评论 18 139
  • Kafka入门经典教程-Kafka-about云开发 http://www.aboutyun.com/threa...
    葡萄喃喃呓语阅读 10,812评论 4 54
  • kafka数据可靠性深度解读 Kafka起初是由LinkedIn公司开发的一个分布式的消息系统,后成为Apache...
    it_zzy阅读 2,004评论 2 20
  • 起风了 「文」水金时 起风了 密闭的房间里也有波动 冷空气袭杀进屋 冲进身体里取暖 今夜 该有一场大雪 匹配这恰如...
    水金时阅读 212评论 4 5