Kafka源码分析(二)Arthas调试Kafka

在查看Kafka的方法调用时,断点调试与日志打印会显得非常重,如果学会了用Arthas这个武器,会让你探索Kafka的源码变得相对容易。

场景一

我知道接下来,kafka一定会执行到以下的方法: KafkaApis#handleProduceRequest,但是我想知道下一次具体的执行分支

// kafka.server.KafkaApis#handleProduceRequest
def handleProduceRequest(request: RequestChannel.Request) {
    val produceRequest = request.body[ProduceRequest]
    val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes
    ...
}

用trace命令来跟踪内部每一个执行到的方法

[arthas@51687]$ trace kafka.server.KafkaApis handleProduceRequest
Press Q or Ctrl+C to abort.
Affect(class-cnt:1 , method-cnt:1) cost in 360 ms.
`---ts=2019-10-23 13:20:46;thread_name=kafka-request-handler-4;id=37;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@764c12b6
    `---[3.661559ms] kafka.server.KafkaApis:handleProduceRequest()
        +---[0.010396ms] scala.reflect.ClassTag$:apply() #367
        +---[9.6E-4ms] scala.Predef$$eq$colon$eq$:tpEquals() #367
        +---[5.53E-4ms] kafka.utils.NotNothing$:notNothingEvidence() #367
        +---[5.49E-4ms] kafka.network.RequestChannel$Request:body() #367
        +---[4.53E-4ms] kafka.network.RequestChannel$Request:header() #368
        +---[0.008506ms] org.apache.kafka.common.requests.RequestHeader:toStruct() #368
        +---[0.001567ms] org.apache.kafka.common.protocol.types.Struct:sizeOf() #368
        +---[0.003086ms] kafka.network.RequestChannel$Request:sizeOfBodyInBytes() #368
        +---[0.002294ms] org.apache.kafka.common.requests.ProduceRequest:hasTransactionalRecords() #370
        +---[0.002079ms] org.apache.kafka.common.requests.ProduceRequest:hasIdempotentRecords() #379
        +---[0.009205ms] scala.collection.mutable.Map$:apply() #384
        +---[9.72E-4ms] scala.collection.mutable.Map$:apply() #385
        +---[9.21E-4ms] scala.collection.mutable.Map$:apply() #386
        +---[0.00206ms] org.apache.kafka.common.requests.ProduceRequest:partitionRecordsOrFail() #388
        +---[7.84E-4ms] scala.collection.JavaConverters$:mapAsScalaMapConverter() #388
        +---[0.001015ms] scala.collection.convert.Decorators$AsScala:asScala() #388
        +---[0.003071ms] kafka.server.KafkaApis$$anonfun$handleProduceRequest$1:<init>() #388
        +---[0.001735ms] scala.collection.TraversableLike:withFilter() #388
        +---[0.005349ms] kafka.server.KafkaApis$$anonfun$handleProduceRequest$2:<init>() #388
        +---[0.013075ms] scala.collection.generic.FilterMonadic:foreach() #388
        +---[0.006955ms] scala.collection.mutable.Map:isEmpty() #454
        +---[4.05E-4ms] kafka.network.RequestChannel$Request:header() #457
        +---[5.25E-4ms] org.apache.kafka.common.requests.RequestHeader:clientId() #457
        +---[0.024344ms] kafka.admin.AdminUtils$:AdminClientId() #457
        +---[8.02E-4ms] java.lang.Object:equals() #457
        +---[0.002155ms] org.apache.kafka.common.requests.ProduceRequest:timeout() #461
        +---[0.009585ms] org.apache.kafka.common.requests.ProduceRequest:acks() #462
        +---[0.00303ms] kafka.server.KafkaApis$$anonfun$12:<init>() #466
        +---[0.002418ms] kafka.server.KafkaApis$$anonfun$13:<init>() #467
        +---[min=5.63E-4ms,max=7.45E-4ms,total=0.001308ms,count=2] kafka.server.KafkaApis:replicaManager() #460
        +---[0.0033ms] kafka.server.ReplicaManager:appendRecords$default$7() #460
        +---[3.247819ms] kafka.server.ReplicaManager:appendRecords() #460
        `---[0.004697ms] org.apache.kafka.common.requests.ProduceRequest:clearPartitionRecords() #471

场景二

我知道kafka一定会执行到以下的方法: LogSegment#append,但是我想知道它是怎么调用到这个方法的

[arthas@51687]$ stack kafka.log.LogSegment append
Press Q or Ctrl+C to abort.
Affect(class-cnt:1 , method-cnt:1) cost in 69 ms.
ts=2019-10-23 13:27:26;thread_name=kafka-request-handler-4;id=37;is_daemon=true;priority=5;TCCL=sun.misc.Launcher$AppClassLoader@764c12b6
    @kafka.log.Log$$anonfun$append$2.apply()
        at kafka.log.Log$$anonfun$append$2.apply(Log.scala:626)
        at kafka.log.Log.maybeHandleIOException(Log.scala:1691)
        at kafka.log.Log.append(Log.scala:626)
        at kafka.log.Log.appendAsLeader(Log.scala:597)
        at kafka.cluster.Partition$$anonfun$13.apply(Partition.scala:533)
        at kafka.cluster.Partition$$anonfun$13.apply(Partition.scala:521)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
        at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:223)
        at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:520)
        at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:719)
        at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:703)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:703)
        at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:453)
        at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:460)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
        at java.lang.Thread.run(Thread.java:748)

场景三

查看现在kafka接受的都是些什么请求,也就是查看def handle(request: RequestChannel.Request) 的Request是什么类型的

[arthas@12587]$ watch kafka.server.KafkaApis handle params[0].header.apiKey
Press Q or Ctrl+C to abort.
Affect(class-cnt:1 , method-cnt:1) cost in 98 ms.
ts=2019-10-30 20:39:45; [cost=0.901428ms] result=@[API_VERSIONS]
ts=2019-10-30 20:39:45; [cost=0.947624ms] result=@[API_VERSIONS]
ts=2019-10-30 20:39:45; [cost=2.340937ms] result=@ApiKeys[METADATA]
ts=2019-10-30 20:39:45; [cost=1.204087ms] result=@ApiKeys[FIND_COORDINATOR]
ts=2019-10-30 20:39:45; [cost=0.402322ms] result=@ApiKeys[METADATA]
ts=2019-10-30 20:39:45; [cost=0.880728ms] result=@ApiKeys[FIND_COORDINATOR]
ts=2019-10-30 20:39:45; [cost=0.201719ms] result=@[API_VERSIONS]
ts=2019-10-30 20:39:45; [cost=0.116254ms] result=@[API_VERSIONS]
ts=2019-10-30 20:39:45; [cost=2.556618ms] result=@ApiKeys[JOIN_GROUP]
ts=2019-10-30 20:39:45; [cost=10.631961ms] result=@ApiKeys[SYNC_GROUP]
ts=2019-10-30 20:39:45; [cost=1.172244ms] result=@ApiKeys[OFFSET_FETCH]
ts=2019-10-30 20:39:45; [cost=0.508714ms] result=@ApiKeys[JOIN_GROUP]
ts=2019-10-30 20:39:45; [cost=5.21916ms] result=@ApiKeys[SYNC_GROUP]
ts=2019-10-30 20:39:45; [cost=0.269761ms] result=@ApiKeys[OFFSET_FETCH]
ts=2019-10-30 20:39:45; [cost=0.139089ms] result=@[API_VERSIONS]
ts=2019-10-30 20:39:45; [cost=0.235017ms] result=@[API_VERSIONS]
ts=2019-10-30 20:39:45; [cost=2.174569ms] result=@ApiKeys[FETCH]
ts=2019-10-30 20:39:45; [cost=1.251027ms] result=@ApiKeys[FETCH]
ts=2019-10-30 20:39:48; [cost=1.538105ms] result=@ApiKeys[OFFSET_COMMIT]
ts=2019-10-30 20:39:48; [cost=1.54736ms] result=@ApiKeys[OFFSET_COMMIT]

但是,如果想查看的是kafka.server.KafkaApis#authorize,如果在arthas输入以下内容,会报错,提示也很明显,这个方法不存在

[arthas@12587]$ watch kafka.server.KafkaApis authorize params[0]
No class or method is affected, try:
1. sm CLASS_NAME METHOD_NAME to make sure the method you are tracing actually exists (it might be in your parent class).
2. reset CLASS_NAME and try again, your method body might be too large.
3. check arthas log: /Users/ericfei/logs/arthas/arthas.log
4. visit https://github.com/alibaba/arthas/issues/47 for more details.

按照提示,可以用sm查看下authorize这个方法到底叫什么名字

[arthas@12587]$ sm kafka.server.KafkaApis
kafka.server.KafkaApis kafka$server$KafkaApis$$authorize(Lkafka/network/RequestChannel$Session;Lkafka/security/auth/Operation;Lkafka/security/auth/Resource;)Z

知道了该方法具体的名字,就可以watch它了

[arthas@12587]$ watch kafka.server.KafkaApis kafka$server$KafkaApis$$authorize params[0]
Press Q or Ctrl+C to abort.
Affect(class-cnt:1 , method-cnt:1) cost in 61 ms.
ts=2019-10-30 20:46:01; [cost=0.187904ms] result=@Session[
    principal=@KafkaPrincipal[User:ANONYMOUS],
    clientAddress=@Inet4Address[/172.16.115.163],
    sanitizedUser=@String[ANONYMOUS],
]
ts=2019-10-30 20:46:01; [cost=0.056613ms] result=@Session[
    principal=@KafkaPrincipal[User:ANONYMOUS],
    clientAddress=@Inet4Address[/172.16.115.163],
    sanitizedUser=@String[ANONYMOUS],
]
ts=2019-10-30 20:46:01; [cost=0.038524ms] result=@Session[
    principal=@KafkaPrincipal[User:ANONYMOUS],
    clientAddress=@Inet4Address[/172.16.115.163],
    sanitizedUser=@String[ANONYMOUS],
]
ts=2019-10-30 20:46:01; [cost=0.060588ms] result=@Session[
    principal=@KafkaPrincipal[User:ANONYMOUS],
    clientAddress=@Inet4Address[/172.16.115.163],
    sanitizedUser=@String[ANONYMOUS],
]
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,692评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,482评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,995评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,223评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,245评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,208评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,091评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,929评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,346评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,570评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,739评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,437评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,037评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,677评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,833评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,760评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,647评论 2 354