最近在基于flink做实时计算平台,准备写博客记录一些日常的工作。
本篇主要是记一下如何调试Flink的源码,以Standalone模式为例。
环境
Flink Standalone cluster环境的搭建看官方文档即可,这里不再赘述,我们讲下本地环境的准备:
IDE: IntelliJ IDEA (需安装Scala plugin 及 sdk)
java version: 1.8.0_92
flink version 1.5.4
打开ide导入工程后如下:
源码调试
通过单测代码调试
调试源码,最简单的莫过于通过单测代码,每个module下都有对应的test代码,这种方式最简单,但是毕竟是模拟的。
远程调试
首先,要启用远程调试,我们需要添加一段jvm启动参数,这个参数我们可以通过下图中找到
即 -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
其中端口号根据时间情况而定,如果要从启动方法里开始调试,则需要设置suspend=y
那么这段启动参数需要添加到flink那个地方呢?
我们先从启动脚本看,从start-cluster.sh找到config.sh,我们可以发现flink会将如下3个配置项添加到指定的jvm启动参数中:
env.java.opts: jar包启动时添加的参数,适用于jobmanager和taskmanager;
env.java.opts.jobmanager: jobmanager的启动参数;
env.java.opts.taskmanager: taskmanager的启动参数;
这3个配置我们均可以在flink-conf.yaml中配置,而我们的远程调试参数应当配置到后两者,并且使用不同的端口,
如果配置在env.java.opts中,启动jobmanager和taskmanager将会出现端口占用冲突。
在flink-conf.yaml中添加:
env.java.opts.jobmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
env.java.opts.taskmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
然后启动集群
./start-cluster.sh
之后在idea中的步骤如下:
最后standalone cluster模式,
jobmanager的入口类是org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint,
taskmanager的入口类是org.apache.flink.runtime.taskexecutor.TaskManagerRunner,
找到main方法打上断点就可以开始调试了。