1. Building from source
- Download
On GitHub, select the flume-1.9 branch (https://github.com/apache/flume/tree/flume-1.9), or clone that branch directly:
git clone -b flume-1.9 git@github.com:apache/flume.git
- Configure the Maven repository
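After downloading the source, import it into IntelliJ IDEA as a Maven project, then configure Maven's dependency repository.
If you have a proxy for reaching servers outside China, no extra configuration is needed. Otherwise, point Maven at a mirror inside China, such as the Aliyun Maven mirror (a <mirror> entry inside <mirrors> in Maven's settings.xml):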
<mirror>
<id>nexus-aliyun</id>
<mirrorOf>*</mirrorOf>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
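<!-- The Aliyun mirror configuration above was taken from the web and has not been verified by the author; if you run into problems, please look up a solution yourself. -->
I used my company's internal Maven repository, which has a fairly complete set of dependencies, so the build went smoothly and I did not hit the compilation failures others report online. Readers will need to sort out this configuration themselves; plenty of solutions are available online.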
- Build the source
Build the source with the following command:
mvn clean install -DskipTests
When the build completes, each module has its own target directory containing the compiled jar.
[INFO] Reactor Summary:
[INFO]
[INFO] Build Support ...................................... SUCCESS [ 1.854 s]
[INFO] Apache Flume ....................................... SUCCESS [ 11.992 s]
[INFO] Flume NG SDK ....................................... SUCCESS [ 16.187 s]
[INFO] Flume NG Hadoop Credential Store Config Filter ..... SUCCESS [ 0.135 s]
[INFO] Flume NG Config Filters API ........................ SUCCESS [ 2.549 s]
[INFO] Flume NG Configuration ............................. SUCCESS [ 5.963 s]
[INFO] Flume Auth ......................................... SUCCESS [ 6.558 s]
[INFO] Flume NG Core ...................................... SUCCESS [ 18.504 s]
[INFO] Flume NG Sinks ..................................... SUCCESS [ 0.145 s]
[INFO] Flume NG HDFS Sink ................................. SUCCESS [ 8.566 s]
[INFO] Flume NG IRC Sink .................................. SUCCESS [ 3.347 s]
[INFO] Flume NG Channels .................................. SUCCESS [ 0.115 s]
[INFO] Flume NG JDBC channel .............................. SUCCESS [ 5.174 s]
[INFO] Flume NG file-based channel ........................ SUCCESS [ 11.072 s]
[INFO] Flume NG Spillable Memory channel .................. SUCCESS [ 3.804 s]
[INFO] Flume NG Node ...................................... SUCCESS [ 10.520 s]
[INFO] Flume NG Embedded Agent ............................ SUCCESS [ 3.632 s]
[INFO] Flume NG HBase Sink ................................ SUCCESS [ 7.410 s]
[INFO] Flume NG HBase2 Sink ............................... SUCCESS [ 8.654 s]
[INFO] Flume NG ElasticSearch Sink ........................ SUCCESS [ 4.994 s]
[INFO] Flume NG Morphline Solr Sink ....................... SUCCESS [ 6.844 s]
[INFO] Flume Shared Utils ................................. SUCCESS [ 0.067 s]
[INFO] Flume Shared Kafka ................................. SUCCESS [ 2.755 s]
[INFO] Flume Shared Kafka Test Utils ...................... SUCCESS [ 3.389 s]
[INFO] Flume Kafka Sink ................................... SUCCESS [ 3.689 s]
[INFO] Flume HTTP/S Sink .................................. SUCCESS [ 3.416 s]
[INFO] Flume NG Kite Dataset Sink ......................... SUCCESS [ 5.341 s]
[INFO] Flume NG Hive Sink ................................. SUCCESS [ 4.872 s]
[INFO] Flume Sources ...................................... SUCCESS [ 0.091 s]
[INFO] Flume Scribe Source ................................ SUCCESS [ 4.130 s]
[INFO] Flume JMS Source ................................... SUCCESS [ 3.664 s]
[INFO] Flume Twitter Source ............................... SUCCESS [ 3.274 s]
[INFO] Flume Kafka Source ................................. SUCCESS [ 4.323 s]
[INFO] Flume Taildir Source ............................... SUCCESS [ 4.337 s]
[INFO] flume-kafka-channel ................................ SUCCESS [ 3.949 s]
[INFO] Flume legacy Sources ............................... SUCCESS [ 0.050 s]
[INFO] Flume legacy Avro source ........................... SUCCESS [ 3.007 s]
[INFO] Flume legacy Thrift Source ......................... SUCCESS [ 4.780 s]
[INFO] Flume NG Environment Variable Config Filter ........ SUCCESS [ 2.054 s]
[INFO] flume-ng-hadoop-credential-store-config-filter ..... SUCCESS [ 2.773 s]
[INFO] Flume NG External Process Config Filter ............ SUCCESS [ 2.402 s]
[INFO] Flume NG Clients ................................... SUCCESS [ 0.059 s]
[INFO] Flume NG Log4j Appender ............................ SUCCESS [ 6.521 s]
[INFO] Flume NG Tools ..................................... SUCCESS [ 2.914 s]
[INFO] Flume NG distribution .............................. SUCCESS [ 11.804 s]
[INFO] Flume NG Integration Tests ......................... SUCCESS [ 3.099 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 03:45 min
[INFO] Finished at: 2019-01-21T11:54:33+08:00
[INFO] Final Memory: 252M/1584M
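2. Debugging Flume
2.1 Using Flume
Let us first briefly review how to use Flume, with the simplest possible example.
The flume-ng command must be given a command name; the available ones include help, agent, avro-client, tool and version. The format for starting an agent is: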
bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
First, create an agent configuration file, conf/example.conf:
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Then run the following command:
bin/flume-ng agent --conf conf --conf-file ./conf/example.conf --name a1 -Dflume.root.logger=DEBUG,console
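Here --conf can be shortened to -c, --conf-file to -f, and --name to -n.
The script's main function run_flume() executes set -x by default (when CLEAN_FLAG is non-zero), so after starting Flume you can see in the output the command the script finally executed: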
exec /usr/lib/jvm/java-8-oracle/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/home/caolch/Downloads/software_store/apache-flume-1.9.0-bin/conf:/home/caolch/Downloads/software_store/apache-flume-1.9.0-bin/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file ./conf/example.conf --name a1
Use telnet to connect to the Flume source and send a string; once Flume receives the data, the logger sink prints it to the console.
$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK
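The telnet check can also be scripted. Below is a minimal Java sketch of the same session, assuming the netcat source from example.conf is listening on localhost:44444 and answers each received line with an OK line, as in the transcript above. NetcatSourceClient and sendLine are hypothetical names, not part of Flume.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Flume): sends one line to a netcat-style source
// and returns the first line of the reply, e.g. "OK".
class NetcatSourceClient {
    static String sendLine(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port);
             Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            // The netcat source turns each newline-terminated line into one event.
            out.write(line + "\n");
            out.flush();
            // Block until the acknowledgement line arrives (null if the peer closes first).
            return in.readLine();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumes the agent from example.conf is running on this machine.
        System.out.println(sendLine("localhost", 44444, "Hello world!"));
    }
}
```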
2.2 Program entry point
The flume-ng script:
################################
# constants
################################
FLUME_AGENT_CLASS="org.apache.flume.node.Application"
FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient"
FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo"
FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"
...
run_flume() {
local FLUME_APPLICATION_CLASS
if [ "$#" -gt 0 ]; then
FLUME_APPLICATION_CLASS=$1
shift
else
error "Must specify flume application class" 1
fi
if [ ${CLEAN_FLAG} -ne 0 ]; then
set -x
fi
$EXEC $JAVA_HOME/bin/java $JAVA_OPTS $FLUME_JAVA_OPTS "${arr_java_props[@]}" -cp "$FLUME_CLASSPATH" \
-Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*
}
...
# finally, invoke the appropriate command
if [ -n "$opt_agent" ] ; then
run_flume $FLUME_AGENT_CLASS $args
elif [ -n "$opt_avro_client" ] ; then
run_flume $FLUME_AVRO_CLIENT_CLASS $args
elif [ -n "${opt_version}" ] ; then
run_flume $FLUME_VERSION_CLASS $args
elif [ -n "${opt_tool}" ] ; then
run_flume $FLUME_TOOLS_CLASS $args
else
error "This message should never appear" 1
fi
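The script shows that org.apache.flume.node.Application (FLUME_AGENT_CLASS, used for the agent command) is the program's main entry point.
In IDEA, press Alt+7 to open the Structure view of this class.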
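2.3 Remote debugging
To make it easier to step through the code, here is how to set up remote debugging for Flume. There are two steps: first, modify the Flume startup script; second, add a Remote configuration to IDEA's debug configurations.
(1) Modify the Flume startup script
Open the flume-ng startup script, find "JAVA_OPTS=", and add the debug options as follows: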
JAVA_OPTS="-Xmx20m -Xdebug -Xrunjdwp:transport=dt_socket,address=5005,server=y,suspend=y"
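(2) Add a Remote configuration in IDEA
In IDEA, click "Run" -> "Debug..." -> "Edit Configurations", click the plus sign in the upper-left corner and add a Remote configuration. IDEA's default port is 5005; this port must match the address configured for Flume.
Once configured, start Flume. Because of suspend=y, the JVM reports that it is listening on port 5005 and waits for a debugger to attach before running main; now start the debug session in IDEA.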
+ exec /usr/lib/jvm/java-8-oracle/bin/java -Xmx20m -Xdebug -Xrunjdwp:transport=dt_socket,address=5005,server=y,suspend=y -Dflume.root.logger=DEBUG,console -cp '/home/caolch/Downloads/software_store/apache-flume-1.9.0-bin/conf:/home/caolch/Downloads/software_store/apache-flume-1.9.0-bin/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file ./conf/example.conf --name a1
Listening for transport dt_socket at address: 5005
2.4 Code analysis
We start the analysis from the main function.
The first line initializes the global SSL parameters:
public static void main(String[] args) {
try {
/* Initialize the global SSL parameters via System.getenv() and System.setProperty(); Flume can use SSL for encrypted transport */
SSLUtil.initGlobalSSLParameters();
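Inside this method, the work is done by calls to initSysPropFromEnvVar, which copies SSL-related environment variables into Java system properties unless the property is already set. For a comparison of System.getenv() and System.getProperty(), see https://www.jianshu.com/p/dbe4795b61ac .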
private static void initSysPropFromEnvVar(String sysPropName, String envVarName,
String description) {
if (System.getProperty(sysPropName) != null) {
LOGGER.debug("Global SSL " + description + " has been initialized from system property.");
} else {
String envVarValue = System.getenv(envVarName);
if (envVarValue != null) {
System.setProperty(sysPropName, envVarValue);
LOGGER.debug("Global SSL " + description +
" has been initialized from environment variable.");
} else {
LOGGER.debug("No global SSL " + description + " specified.");
}
}
}
The Flume startup log also shows this SSL parameter initialization:
2019-02-24 12:05:55,036 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL keystore path specified.
2019-02-24 12:05:55,040 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL keystore password specified.
2019-02-24 12:05:55,041 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL keystore type specified.
2019-02-24 12:05:55,041 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL truststore path specified.
2019-02-24 12:05:55,046 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL truststore password specified.
2019-02-24 12:05:55,046 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL truststore type specified.
2019-02-24 12:05:55,046 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL include protocols specified.
2019-02-24 12:05:55,046 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL exclude protocols specified.
2019-02-24 12:05:55,046 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL include cipher suites specified.
2019-02-24 12:05:55,047 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL exclude cipher suites specified.
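The precedence rule in initSysPropFromEnvVar (an already-set system property wins, otherwise the environment variable is copied in) can be exercised in isolation. Because Java cannot set environment variables at runtime, this sketch takes the environment as a Map. SslPropInit and its Source enum are hypothetical; pairing javax.net.ssl.keyStore with FLUME_SSL_KEYSTORE_PATH in main is an assumption about Flume's naming that should be checked against SSLUtil.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical re-implementation of the rule in SSLUtil.initSysPropFromEnvVar:
// an existing system property wins; otherwise the environment variable is copied in.
class SslPropInit {
    enum Source { SYSTEM_PROPERTY, ENVIRONMENT, NONE }

    static Source initFromEnv(Properties sysProps, Map<String, String> env,
                              String sysPropName, String envVarName) {
        if (sysProps.getProperty(sysPropName) != null) {
            return Source.SYSTEM_PROPERTY;            // e.g. already set with -D on the command line
        }
        String value = env.get(envVarName);
        if (value != null) {
            sysProps.setProperty(sysPropName, value); // copy the env var into the property table
            return Source.ENVIRONMENT;
        }
        return Source.NONE;                           // matches the "No global SSL ... specified." log line
    }

    public static void main(String[] args) {
        // Uses the real process state; the env var name is an assumption.
        Source s = initFromEnv(System.getProperties(), System.getenv(),
                "javax.net.ssl.keyStore", "FLUME_SSL_KEYSTORE_PATH");
        System.out.println("keystore path source: " + s);
    }
}
```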
Continuing with main, the next part is argument parsing. First, the code:
/* Parse command-line arguments */
Options options = new Options();
Option option = new Option("n", "name", true, "the name of this agent");
option.setRequired(true);
options.addOption(option);
option = new Option("f", "conf-file", true,
"specify a config file (required if -z missing)");
option.setRequired(false);
options.addOption(option);
option = new Option(null, "no-reload-conf", false,
"do not reload config file if changed");
options.addOption(option);
// Options for Zookeeper
option = new Option("z", "zkConnString", true,
"specify the ZooKeeper connection to use (required if -f missing)");
option.setRequired(false);
options.addOption(option);
option = new Option("p", "zkBasePath", true,
"specify the base path in ZooKeeper for agent configs");
option.setRequired(false);
options.addOption(option);
option = new Option("h", "help", false, "display help text");
options.addOption(option);
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);
if (commandLine.hasOption('h')) {
new HelpFormatter().printHelp("flume-ng agent", options, true);
return;
}
String agentName = commandLine.getOptionValue('n');
boolean reload = !commandLine.hasOption("no-reload-conf");
boolean isZkConfigured = false;
if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
isZkConfigured = true;
}
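While debugging, you can easily see that when the command from section 2.1 is run, the arguments main receives are --conf-file and --name (the flume-ng script consumes --conf itself):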
args: --conf-file ./conf/example.conf --name a1
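Flume can obtain its configuration in two ways: from ZooKeeper or from a file. Each supports two modes: automatically reloading the configuration (restarting all components) or not reloading it.
When zkConnString is given, the configuration is read from ZooKeeper; otherwise it is read from the file. If no-reload-conf is passed, Flume does not reload the configuration automatically. By default (without this option), Flume watches the configuration for changes and uses an EventBus to trigger re-reading the configuration file and restarting all components.
Taking the case where the configuration comes from a file and a change to it automatically restarts all components, Flume's call sequence is as follows: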
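The two flags derived from the arguments, isZkConfigured and reload, select one of the four combinations described above. The following sketch is a hand-rolled stand-in for the Commons CLI code (it is not how Application parses arguments) and handles only -n, -f, -z and --no-reload-conf; AgentArgs, valueAfter and describe() are hypothetical.

```java
// Hypothetical stand-in for the Commons CLI parsing in Application.main (not Flume code).
// Only -n/--name, -f/--conf-file, -z/--zkConnString and --no-reload-conf are handled.
class AgentArgs {
    String agentName;
    String confFile;
    String zkConnString;
    boolean reload = true;   // default: watch the configuration and reload on change

    static String valueAfter(String[] args, int i) {
        if (i + 1 >= args.length) throw new IllegalArgumentException(args[i] + " needs a value");
        return args[i + 1];
    }

    static AgentArgs parse(String[] args) {
        AgentArgs a = new AgentArgs();
        for (int i = 0; i < args.length; i++) {
            switch (args[i]) {
                case "-n": case "--name":         a.agentName = valueAfter(args, i); i++; break;
                case "-f": case "--conf-file":    a.confFile = valueAfter(args, i); i++; break;
                case "-z": case "--zkConnString": a.zkConnString = valueAfter(args, i); i++; break;
                case "--no-reload-conf":          a.reload = false; break;
                default: throw new IllegalArgumentException("unknown option: " + args[i]);
            }
        }
        if (a.agentName == null) throw new IllegalArgumentException("-n/--name is required");
        if (a.confFile == null && a.zkConnString == null) {
            throw new IllegalArgumentException("either -f or -z is required");
        }
        return a;
    }

    boolean isZkConfigured() {
        return zkConnString != null;
    }

    // Names which of the four source/reload combinations applies.
    String describe() {
        String source = isZkConfigured() ? "ZooKeeper" : "file " + confFile;
        return source + (reload ? ", watched and reloaded on change" : ", loaded once");
    }
}
```

With the arguments from section 2.1, the sketch reports a file source with reloading enabled, which is the path the rest of this section follows.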
// ... (lines omitted)
boolean reload = !commandLine.hasOption("no-reload-conf");
// ... (lines omitted)
List<LifecycleAware> components = Lists.newArrayList();
if (reload) {
EventBus eventBus = new EventBus(agentName + "-event-bus");
PollingPropertiesFileConfigurationProvider configurationProvider =
new PollingPropertiesFileConfigurationProvider(
agentName, configurationFile, eventBus, 30);
components.add(configurationProvider);
application = new Application(components);
eventBus.register(application);
}
// ... (lines omitted)
application.start();
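If no-reload-conf is absent, reload is true; if it is present, reload is false.
The code above uses Guava's EventBus, an elegant implementation of the observer pattern; publishing and subscribing to events through an EventBus saves a lot of work. For how Guava EventBus works and how to use it, see https://www.jianshu.com/p/f8ba312904f4 . An EventBus observer (event subscriber) handles the events sent by publishers in methods annotated with @Subscribe, and EventBus.register() registers the observer.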
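To make the @Subscribe / register() / post() contract concrete, here is a toy synchronous event bus built on reflection. It sketches the observer idea only and is not Guava's implementation: the @Subscribe annotation below is a hypothetical local one, and post() simply invokes every registered method whose single parameter type accepts the event.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical local annotation playing the role of Guava's @Subscribe.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Subscribe {}

// Toy synchronous event bus: the same observer idea as Guava's EventBus, not its code.
class MiniEventBus {
    private static final class Handler {
        final Object target;
        final Method method;
        Handler(Object target, Method method) { this.target = target; this.method = method; }
    }

    private final List<Handler> handlers = new ArrayList<>();

    // Like register(): remember every one-argument method annotated with @Subscribe.
    void register(Object subscriber) {
        for (Method m : subscriber.getClass().getDeclaredMethods()) {
            if (m.isAnnotationPresent(Subscribe.class) && m.getParameterCount() == 1) {
                m.setAccessible(true);
                handlers.add(new Handler(subscriber, m));
            }
        }
    }

    // Like post(): call each handler whose parameter type accepts the event, on this thread.
    void post(Object event) {
        for (Handler h : handlers) {
            if (h.method.getParameterTypes()[0].isInstance(event)) {
                try {
                    h.method.invoke(h.target, event);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

class EventBusDemo {
    // Stands in for Application and its handleConfigurationEvent method.
    static class ConfigListener {
        @Subscribe
        void handleConfigurationEvent(String conf) {
            System.out.println("reloading with " + conf);
        }
    }

    public static void main(String[] args) {
        MiniEventBus bus = new MiniEventBus();
        bus.register(new ConfigListener());  // like eventBus.register(application)
        bus.post("conf-v1");                 // prints "reloading with conf-v1"
    }
}
```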
In the Application class we find the event handler handleConfigurationEvent(MaterializedConfiguration conf).
/* In Guava EventBus, @Subscribe marks this method as an event handler */
@Subscribe
public void handleConfigurationEvent(MaterializedConfiguration conf) {
try {
lifecycleLock.lockInterruptibly();
stopAllComponents();
startAllComponents(conf);
} catch (InterruptedException e) {
logger.info("Interrupted while trying to handle configuration event");
return;
} finally {
// If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock
if (lifecycleLock.isHeldByCurrentThread()) {
lifecycleLock.unlock();
}
}
}
This method restarts all components by calling stopAllComponents() and startAllComponents(conf).
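The finally block in handleConfigurationEvent guards unlock() with isHeldByCurrentThread() because lockInterruptibly() can throw before the lock is acquired, and ReentrantLock.unlock() throws IllegalMonitorStateException when the caller does not hold the lock. This reduction of the pattern (LockPatternDemo is hypothetical; the restarts counter stands in for stopAllComponents()/startAllComponents(conf)) shows both paths.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical reduction of the locking pattern in Application.handleConfigurationEvent.
class LockPatternDemo {
    private final ReentrantLock lifecycleLock = new ReentrantLock();
    int restarts = 0;

    // Returns true if the restart ran, false if the thread was interrupted while waiting.
    boolean handleConfigurationEvent() {
        try {
            lifecycleLock.lockInterruptibly();
            restarts++;          // stands in for stopAllComponents(); startAllComponents(conf);
            return true;
        } catch (InterruptedException e) {
            return false;        // the lock was never acquired
        } finally {
            // unlock() without owning the lock would throw IllegalMonitorStateException,
            // so release it only if this thread actually holds it.
            if (lifecycleLock.isHeldByCurrentThread()) {
                lifecycleLock.unlock();
            }
        }
    }
}
```

Like Flume's handler, this sketch does not restore the interrupt flag after catching InterruptedException; a caller that needs it would call Thread.currentThread().interrupt() in the catch block.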
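We have found the event-handling logic, but where is the publisher that posts events to the EventBus?
With that question in mind, go back to the code above: right after creating the EventBus object, it creates a PollingPropertiesFileConfigurationProvider object. This class implements the LifecycleAware interface, which every Flume component implements.
The PollingPropertiesFileConfigurationProvider object is then added to the list List<LifecycleAware> components, which is passed to the Application constructor and stored in its components field: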
public Application(List<LifecycleAware> components) {
this.components = components;
supervisor = new LifecycleSupervisor();
}
Next, Application's start() method is called to start the components.
public void start() {
lifecycleLock.lock();
try {
for (LifecycleAware component : components) {
supervisor.supervise(component,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
}
} finally {
lifecycleLock.unlock();
}
}
To see how each component is actually started, look inside LifecycleSupervisor.supervise():
MonitorRunnable monitorRunnable = new MonitorRunnable();
monitorRunnable.lifecycleAware = lifecycleAware;
monitorRunnable.supervisoree = process;
monitorRunnable.monitorService = monitorService;
supervisedProcesses.put(lifecycleAware, process);
ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
monitorRunnable, 0, 3, TimeUnit.SECONDS);
monitorFutures.put(lifecycleAware, future);
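scheduleWithFixedDelay runs the monitorRunnable task immediately (initial delay 0), and after each run finishes it waits 3 s before running it again.
The call lifecycleAware.start() in MonitorRunnable's run method shows that the supervised component's start() method is invoked: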
switch (supervisoree.status.desiredState) {
case START:
try {
lifecycleAware.start();
Back in PollingPropertiesFileConfigurationProvider, its start() method creates a single-threaded scheduler with Executors.newSingleThreadScheduledExecutor() and runs a FileWatcherRunnable task every 30 s (interval = 30, passed in from Application.main).
public PollingPropertiesFileConfigurationProvider(String agentName,
File file, EventBus eventBus, int interval) {
super(agentName, file);
this.eventBus = eventBus;
this.file = file;
this.interval = interval;
counterGroup = new CounterGroup();
lifecycleState = LifecycleState.IDLE;
}
@Override
public void start() {
LOGGER.info("Configuration provider starting");
Preconditions.checkState(file != null,
"The parameter file must not be null");
executorService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
.build());
/* Run on a new thread: when a file change is detected, post getConfiguration() to the eventBus. An event on the eventBus invokes the
* method annotated with @Subscribe in Application, i.e. public void handleConfigurationEvent(MaterializedConfiguration conf);
* eventBus.post(getConfiguration()) passes the conf object over the bus to handleConfigurationEvent */
FileWatcherRunnable fileWatcherRunnable =
new FileWatcherRunnable(file, counterGroup);
executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
TimeUnit.SECONDS);
lifecycleState = LifecycleState.START;
LOGGER.debug("Configuration provider started");
}
The FileWatcherRunnable task monitors the configuration file for changes:
long lastModified = file.lastModified();
if (lastModified > lastChange) {
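// ... (lines omitted)
If the configuration file has changed, it calls eventBus.post(getConfiguration()) to post the event to the EventBus, which then invokes the observer's (Application's) handler method handleConfigurationEvent(MaterializedConfiguration conf) to process it.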
public class FileWatcherRunnable implements Runnable {
private final File file;
private final CounterGroup counterGroup;
private long lastChange;
public FileWatcherRunnable(File file, CounterGroup counterGroup) {
super();
this.file = file;
this.counterGroup = counterGroup;
this.lastChange = 0L;
}
@Override
public void run() {
LOGGER.debug("Checking file:{} for changes", file);
counterGroup.incrementAndGet("file.checks");
long lastModified = file.lastModified();
if (lastModified > lastChange) {
LOGGER.info("Reloading configuration file:{}", file);
counterGroup.incrementAndGet("file.loads");
lastChange = lastModified;
try {
eventBus.post(getConfiguration());
} catch (Exception e) {
LOGGER.error("Failed to load configuration data. Exception follows.",
e);
} catch (NoClassDefFoundError e) {
LOGGER.error("Failed to start agent because dependencies were not " +
"found in classpath. Error follows.", e);
} catch (Throwable t) {
// caught because the caller does not handle or log Throwables
LOGGER.error("Unhandled error", t);
}
}
}
}
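That completes Flume's startup process. You may ask: only PollingPropertiesFileConfigurationProvider has been started here, not Flume's channels, sources and sinks. In fact, on the first check lastChange is still 0, so the file's lastModified is greater than lastChange: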
// ... (lines omitted)
this.lastChange = 0L;
// ... (lines omitted)
if (lastModified > lastChange) {
// ... (lines omitted)
so the EventBus is triggered and handleConfigurationEvent is called. handleConfigurationEvent contains the statement startAllComponents(conf), which starts the channels, sources and sinks; this is covered in the next article.
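The polling logic, including why the very first check fires, fits in a small sketch. FileWatcherSketch is hypothetical and hands the changed file to a callback instead of calling eventBus.post(getConfiguration()). It also catches RuntimeException from the callback, for a reason that applies to FileWatcherRunnable's broad catch as well: under scheduleWithFixedDelay, an exception escaping run() suppresses all later executions.

```java
import java.io.File;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical simplification of FileWatcherRunnable: a callback replaces eventBus.post(...).
class FileWatcherSketch implements Runnable {
    private final File file;
    private final Consumer<File> onChange;
    private long lastChange = 0L;   // starting at 0 makes the first check always fire

    FileWatcherSketch(File file, Consumer<File> onChange) {
        this.file = file;
        this.onChange = onChange;
    }

    @Override
    public void run() {
        long lastModified = file.lastModified();   // 0L if the file does not exist
        if (lastModified > lastChange) {
            lastChange = lastModified;
            try {
                onChange.accept(file);
            } catch (RuntimeException e) {
                // Swallow and log: an exception escaping run() would cancel future polls.
                System.err.println("reload failed: " + e);
            }
        }
    }

    public static void main(String[] args) {
        File conf = new File(args.length > 0 ? args[0] : "./conf/example.conf");
        // Single-threaded poller, every 30 s, like PollingPropertiesFileConfigurationProvider.start().
        ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();
        poller.scheduleWithFixedDelay(
                new FileWatcherSketch(conf, f -> System.out.println("reload " + f)),
                0, 30, TimeUnit.SECONDS);
    }
}
```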
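2.5 Recap
Let us summarize Flume's overall call sequence:
Application -> LifecycleSupervisor -(every 3 s)-> MonitorRunnable -> PollingPropertiesFileConfigurationProvider -(every 30 s)-> FileWatcherRunnable -> EventBus -> Application
Along the way, one scheduler schedules another, each firing every few seconds. Why are no duplicate task instances started?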
supervisor.supervise(component,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
Note that LifecycleSupervisor is given the value LifecycleState.START when scheduling; this value is the desiredState in the following code (MonitorRunnable's run method):
// ... (lines omitted)
if (!lifecycleAware.getLifecycleState().equals(
supervisoree.status.desiredState)) {
logger.debug("Want to transition {} from {} to {} (failures:{})",
new Object[] { lifecycleAware, supervisoree.status.lastSeenState,
supervisoree.status.desiredState,
supervisoree.status.failures });
switch (supervisoree.status.desiredState) {
case START:
try {
lifecycleAware.start();
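// ... (lines omitted)
PollingPropertiesFileConfigurationProvider, which implements the LifecycleAware interface, sets lifecycleState to START the first time its start() method is called: lifecycleState = LifecycleState.START;
So on later runs of the scheduler, the condition if (!lifecycleAware.getLifecycleState().equals(supervisoree.status.desiredState)) is false, and start() is not called again. PollingPropertiesFileConfigurationProvider is therefore started only once, and because FileWatcherRunnable is scheduled by a single-threaded scheduler, only one FileWatcherRunnable instance runs as well. By the same logic, no duplicate channel, source or sink instances are started.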
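The de-duplication argument can be checked with a miniature supervisor. SupervisorSketch, with its nested LifecycleAware interface and LifecycleState enum, is a hypothetical simplification of the Flume types of the same names: the monitor is scheduled with a fixed delay, but it calls start() only while the component's state differs from desiredState.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical miniature of LifecycleSupervisor + MonitorRunnable (not Flume's classes).
class SupervisorSketch {
    enum LifecycleState { IDLE, START, STOP }

    interface LifecycleAware {
        void start();
        LifecycleState getLifecycleState();
    }

    // Plays the role of MonitorRunnable: acts only if the current state differs from desiredState.
    static final class Monitor implements Runnable {
        private final LifecycleAware component;
        private final LifecycleState desiredState;

        Monitor(LifecycleAware component, LifecycleState desiredState) {
            this.component = component;
            this.desiredState = desiredState;
        }

        @Override
        public void run() {
            if (!component.getLifecycleState().equals(desiredState)) {
                switch (desiredState) {
                    case START:
                        component.start();
                        break;
                    default:
                        break; // STOP handling omitted in this sketch
                }
            }
        }
    }

    // Like PollingPropertiesFileConfigurationProvider, start() records the new state itself.
    static final class CountingComponent implements LifecycleAware {
        volatile int starts = 0;
        private volatile LifecycleState state = LifecycleState.IDLE;

        @Override
        public void start() {
            starts++;
            state = LifecycleState.START;
        }

        @Override
        public LifecycleState getLifecycleState() {
            return state;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountingComponent component = new CountingComponent();
        ScheduledExecutorService monitorService = Executors.newSingleThreadScheduledExecutor();
        // Same kind of call as in supervise(), but every 100 ms instead of every 3 s.
        monitorService.scheduleWithFixedDelay(
                new Monitor(component, LifecycleState.START), 0, 100, TimeUnit.MILLISECONDS);
        Thread.sleep(550);   // roughly six monitor runs
        monitorService.shutdownNow();
        monitorService.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("start() calls: " + component.starts); // expected: 1
    }
}
```

Removing the state comparison so that every tick calls start() would make the count grow with each run, which is exactly the duplication the reasoning above rules out.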