整体的event传输流程如下图:
1. 配置文件的解析和相关组件的加载
通过如下命令即可启动flume进程。
bin/flume-ng agent -n name -c conf -f jobs/a.conf
入口函数是flume-ng-node子项目中Application类的main方法。它先通过Commons CLI解析命令行参数,获取agent名称(-n)和配置文件路径(-f)。
本小节介绍如何根据配置文件生成MaterializedConfiguration对象。
这个过程大体上分为如下步骤:
- 读取配置文件properties
- loadChannels(agentConf, channelComponentMap),利用反射根据name和type创建Channel实例
- loadSources(agentConf, channelComponentMap, sourceRunnerMap),利用反射根据name和type创建Source实例,并创建ChannelSelector和ChannelProcessor,将source关联到channel
- loadSinks(agentConf, channelComponentMap, sinkRunnerMap),利用反射根据name和type创建Sink实例,并将sink关联到channel,创建SinkProcessor和SinkRunner
配置的读取有两种方式:从ZooKeeper读取,或从本地配置文件读取。
从本地配置文件读取时,可以通过选项控制是否在配置文件变更后自动重新加载(reload),代码如下:
// Excerpt, presumably from Application.main (see the text above): decides how the agent
// configuration is obtained. `reload`, `agentName`, `configurationFile` and `application`
// are declared outside this excerpt.
// Components whose lifecycle the Application manages (only populated in the reload branch here).
List<LifecycleAware> components = Lists.newArrayList();
if (reload) {
// Reload mode: a polling provider is created on a dedicated EventBus and handed to the
// Application as a managed component; the Application then registers on the same bus,
// presumably to receive configurations the provider posts when the file changes
// (TODO confirm against PollingPropertiesFileConfigurationProvider).
// NOTE(review): the meaning/unit of the trailing `30` argument is not visible here.
EventBus eventBus = new EventBus(agentName + "-event-bus");
PollingPropertiesFileConfigurationProvider configurationProvider =
new PollingPropertiesFileConfigurationProvider(
agentName, configurationFile, eventBus, 30);
components.add(configurationProvider);
application = new Application(components);
eventBus.register(application);
} else {
// One-shot mode: prepare a provider that reads the configuration file exactly once.
PropertiesFileConfigurationProvider configurationProvider =
new PropertiesFileConfigurationProvider(agentName, configurationFile);
application = new Application();
// configurationProvider.getConfiguration() is where the file is actually parsed;
// handleConfigurationEvent then starts flume according to that configuration.
application.handleConfigurationEvent(configurationProvider.getConfiguration());
}
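解析配置文件的前期准备:PropertiesFileConfigurationProvider以Properties的方式读取配置文件,其父类AbstractConfigurationProvider再根据读取结果生成MaterializedConfiguration。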
/**
 * Configuration provider that reads the agent configuration from a java.util.Properties file.
 */
public class PropertiesFileConfigurationProvider extends AbstractConfigurationProvider {

  /** The properties file this provider loads. */
  private final File file;

  /**
   * @param agentName name of the agent whose configuration section is used
   * @param file properties file to read
   */
  public PropertiesFileConfigurationProvider(String agentName, File file) {
    super(agentName);
    this.file = file;
  }

  /**
   * Loads {@link #file} as a Properties document and wraps it in a FlumeConfiguration.
   *
   * <p>If the file cannot be opened or read, the I/O error is logged and an empty
   * configuration is returned instead of throwing. A failure while closing the reader is
   * only logged as a warning and does not affect the returned value.
   *
   * @return the parsed configuration, or an empty one after an I/O failure
   */
  @Override
  public FlumeConfiguration getFlumeConfiguration() {
    BufferedReader in = null;
    try {
      in = new BufferedReader(new FileReader(file));
      Properties props = new Properties();
      props.load(in);
      return new FlumeConfiguration(toMap(props));
    } catch (IOException ioe) {
      LOGGER.error("Unable to load file:" + file
          + " (I/O failure) - Exception follows.", ioe);
    } finally {
      closeReader(in);
    }
    return new FlumeConfiguration(new HashMap<String, String>());
  }

  // Closes the reader if it was opened; a close failure is logged, never rethrown.
  private void closeReader(BufferedReader in) {
    if (in == null) {
      return;
    }
    try {
      in.close();
    } catch (IOException ioe) {
      LOGGER.warn(
          "Unable to close file reader for file: " + file, ioe);
    }
  }
}
/**
 * Base class for configuration providers: a subclass supplies the raw FlumeConfiguration,
 * and this class turns the section for one agent into channel, source and sink instances.
 */
public abstract class AbstractConfigurationProvider implements ConfigurationProvider {
  /** Name of the agent whose configuration section is materialized. */
  private final String agentName;
  // Factories that instantiate components from their configured "type".
  private final SourceFactory sourceFactory;
  private final SinkFactory sinkFactory;
  private final ChannelFactory channelFactory;
  /**
   * Channel instances indexed by channel class, then by channel name, so that
   * getOrCreateChannel can hand back an existing instance instead of creating a new one.
   */
  private final Map<Class<? extends Channel>, Map<String, Channel>> channelCache;

  /**
   * @param agentName name of the agent to build the configuration for
   */
  public AbstractConfigurationProvider(String agentName) {
    this.agentName = agentName;
    this.sourceFactory = new DefaultSourceFactory();
    this.sinkFactory = new DefaultSinkFactory();
    this.channelFactory = new DefaultChannelFactory();
    this.channelCache = new HashMap<Class<? extends Channel>, Map<String, Channel>>();
  }

  /**
   * Hook for subclasses: reads the raw configuration from wherever the concrete provider
   * keeps it (for example a properties file).
   *
   * @return the parsed configuration
   */
  protected abstract FlumeConfiguration getFlumeConfiguration();
//根据配置文件,生成MaterializedConfiguration
public MaterializedConfiguration getConfiguration() {
MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
FlumeConfiguration fconfig = getFlumeConfiguration();//钩子方法,实现配置文件的读取
AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
if (agentConf != null) {
// SourceRunner,SinkRunner用于对Source和Sink进行驱动
Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
try {
//根据配置信息,装载channel相关数据
loadChannels(agentConf, channelComponentMap);
//根据配置信息和channel数据,装载Source
loadSources(agentConf, channelComponentMap, sourceRunnerMap);
//根据配置信息和channel数据,装载Sink
loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
//遍历channel
for (String channelName : channelNames) {
ChannelComponent channelComponent = channelComponentMap.get(channelName);
if (channelComponent.components.isEmpty()) {
省略一些异常处理...
} else {
//完成channel的映射
conf.addChannel(channelName, channelComponent.channel);
}
}
for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
//完成source和SourceRunner的映射
conf.addSourceRunner(entry.getKey(), entry.getValue());
}
for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
//完成sink和SinkRunner的映射
conf.addSinkRunner(entry.getKey(), entry.getValue());
}
} catch (InstantiationException ex) {
LOGGER.error("Failed to instantiate component", ex);
} finally {
channelComponentMap.clear();
sourceRunnerMap.clear();
sinkRunnerMap.clear();
}
} else {
LOGGER.warn("No configuration found for this host:{}", getAgentName());
}
return conf;
}
1.1 根据配置文件创建Channel实例
Flume内置了几种Channel类型(见下方的ChannelType枚举)。无论type是内置类型的名称(不区分大小写,如memory、file),还是自定义Channel的全限定类名,最终都能得到Channel实现类的全限定类名,再根据类名通过反射创建channel实例。
/**
 * Instantiates and configures every channel declared for this agent and records each one in
 * {@code channelComponentMap} under its component name.
 *
 * <p>Instances come from {@code getOrCreateChannel}, which reuses a cached channel of the same
 * class and name when one exists and removes every name it hands out from
 * {@code channelsNotReused}. Whatever remains in {@code channelsNotReused} afterwards is evicted
 * from {@code channelCache}. Otherwise a channel that is removed from the configuration and
 * re-added later would get back the stale cached instance.
 *
 * <p>A channel whose configuration throws is logged and left out of {@code channelComponentMap}.
 * A FlumeException from resolving or creating a channel is not caught here and propagates.
 *
 * @param agentConf configuration section of this agent
 * @param channelComponentMap receives one entry per successfully configured channel
 * @throws InstantiationException declared for symmetry with the other loaders; this body does
 *     not throw it itself
 */
private void loadChannels(AgentConfiguration agentConf,
    Map<String, ChannelComponent> channelComponentMap)
    throws InstantiationException {
  // Start by assuming no cached channel will be reused; getOrCreateChannel strikes out
  // each channel it returns.
  ListMultimap<Class<? extends Channel>, String> channelsNotReused =
      com.google.common.collect.ArrayListMultimap.create();
  for (Map.Entry<Class<? extends Channel>, Map<String, Channel>> cached
      : channelCache.entrySet()) {
    channelsNotReused.get(cached.getKey()).addAll(cached.getValue().keySet());
  }

  Set<String> channelNames = agentConf.getChannelSet();
  Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
  for (String chName : channelNames) {
    ComponentConfiguration comp = compMap.get(chName);
    if (comp != null) {
      // Create (or reuse) the channel from its configured name and type.
      Channel channel = getOrCreateChannel(channelsNotReused,
          comp.getComponentName(), comp.getType());
      try {
        Configurables.configure(channel, comp);
        channelComponentMap.put(comp.getComponentName(),
            new ChannelComponent(channel));
        LOGGER.info("Created channel " + chName);
      } catch (Exception e) {
        String msg = String.format("Channel %s has been removed due to an " +
            "error during configuration", chName);
        LOGGER.error(msg, e);
      }
    }
  }

  // Evict cached channels that this configuration no longer uses.
  for (Class<? extends Channel> channelClass : channelsNotReused.keySet()) {
    Map<String, Channel> channelMap = channelCache.get(channelClass);
    if (channelMap != null) {
      for (String channelName : channelsNotReused.get(channelClass)) {
        if (channelMap.remove(channelName) != null) {
          LOGGER.info("Removed {} of type {}", channelName, channelClass);
        }
      }
      if (channelMap.isEmpty()) {
        channelCache.remove(channelClass);
      }
    }
  }
}
/**
 * Returns the channel instance to use for the given name and type.
 *
 * <p>Channel classes annotated with {@code Disposable} get a new instance on every call and
 * are never cached. For any other class, an instance with the same class and name is taken
 * from {@code channelCache} when present; otherwise one is created and cached. In that
 * non-disposable case the name is also removed from {@code channelsNotReused} to mark the
 * channel as reused.
 *
 * @param channelsNotReused names of cached channels, per class, not claimed yet
 * @param name channel name, assigned to the returned channel
 * @param type configured channel type, resolved through the channel factory
 * @return a channel whose name has been set to {@code name}
 * @throws FlumeException if the channel factory cannot resolve or create the channel
 */
private Channel getOrCreateChannel(
    ListMultimap<Class<? extends Channel>, String> channelsNotReused,
    String name, String type)
    throws FlumeException {
  Class<? extends Channel> klass = channelFactory.getClass(type);

  if (klass.isAnnotationPresent(Disposable.class)) {
    return createNamedChannel(name, type);
  }

  Map<String, Channel> byName = channelCache.get(klass);
  if (byName == null) {
    byName = new HashMap<String, Channel>();
    channelCache.put(klass, byName);
  }

  Channel result = byName.get(name);
  if (result == null) {
    result = createNamedChannel(name, type);
    byName.put(name, result);
  }

  channelsNotReused.get(klass).remove(name);
  return result;
}

// Creates a channel through the factory and gives it its configured name.
private Channel createNamedChannel(String name, String type) {
  Channel created = channelFactory.create(name, type);
  created.setName(name);
  return created;
}
/**
 * Default ChannelFactory: maps a configured channel type to its implementation class and
 * instantiates it reflectively through the no-argument constructor.
 */
public class DefaultChannelFactory implements ChannelFactory {

  /**
   * Creates a new, unconfigured channel of the given type.
   *
   * @param name channel name; must not be null. It is only used in the error message here,
   *     and the caller is responsible for assigning it to the channel.
   * @param type built-in type name (see ChannelType) or fully-qualified class name; must not
   *     be null
   * @return a fresh channel instance
   * @throws FlumeException if the type cannot be resolved or the instance cannot be created
   */
  @Override
  public Channel create(String name, String type) throws FlumeException {
    Preconditions.checkNotNull(name, "name");
    Preconditions.checkNotNull(type, "type");
    Class<? extends Channel> channelClass = getClass(type);
    try {
      // Class.newInstance() is deprecated: it rethrows checked constructor exceptions
      // undeclared. Going through the Constructor wraps them instead.
      return channelClass.getDeclaredConstructor().newInstance();
    } catch (Exception ex) {
      throw new FlumeException("Unable to create channel: " + name
          + ", type: " + type + ", class: " + channelClass.getName(), ex);
    }
  }

  /**
   * Resolves a configured channel type to its implementation class.
   *
   * <p>The type is first matched, case-insensitively, against the ChannelType constants. If
   * no constant matches, the type is treated as a custom channel, and the type string itself
   * is used as the class name.
   *
   * @param type built-in type name or fully-qualified class name; must not be null
   * @return the channel implementation class
   * @throws FlumeException if the class cannot be loaded or does not implement Channel
   */
  @Override
  public Class<? extends Channel> getClass(String type) throws FlumeException {
    Preconditions.checkNotNull(type, "type");
    String channelClassName = type;
    ChannelType channelType = ChannelType.OTHER;
    try {
      // Match against the enum constant names, which are all upper case.
      channelType = ChannelType.valueOf(type.toUpperCase(Locale.ENGLISH));
    } catch (IllegalArgumentException ex) {
      logger.debug("Channel type {} is a custom type", type);
    }
    if (!channelType.equals(ChannelType.OTHER)) {
      // Built-in type: use its well-known implementation class.
      channelClassName = channelType.getChannelClassName();
    }
    try {
      // asSubclass checks that the loaded class really is a Channel, instead of an unchecked
      // cast that would only fail later, at instantiation time.
      return Class.forName(channelClassName).asSubclass(Channel.class);
    } catch (Exception ex) {
      throw new FlumeException("Unable to load channel type: " + type
          + ", class: " + channelClassName, ex);
    }
  }
}
以下是几种内置的Channel类型;type与其中任何一个都匹配不上时,就视为自定义Channel,此时type本身须是Channel实现类的全限定类名。
/**
 * Built-in channel types and their implementation classes. OTHER stands for a custom channel
 * and carries no class name.
 */
public enum ChannelType {
  /** Custom channel; has no built-in implementation class. */
  OTHER(null),
  FILE("org.apache.flume.channel.file.FileChannel"),
  MEMORY("org.apache.flume.channel.MemoryChannel"),
  JDBC("org.apache.flume.channel.jdbc.JdbcChannel"),
  SPILLABLEMEMORY("org.apache.flume.channel.SpillableMemoryChannel");

  /** Fully-qualified implementation class name, or null for OTHER. */
  private final String className;

  ChannelType(String className) {
    this.className = className;
  }

  /**
   * @return the fully-qualified implementation class name, or {@code null} for OTHER
   */
  public String getChannelClassName() {
    return className;
  }
}
1.2 根据配置文件完成Source加载
Source的实例化方式与Channel相同(根据name和type通过反射创建)。完成Source实例化后,还需要将Source与Channel关联,设置ChannelSelector和ChannelProcessor,并为Source创建对应的SourceRunner。
/**
 * Creates and configures every source declared for this agent. Each source is attached to its
 * channels through a ChannelSelector and ChannelProcessor, and a SourceRunner is registered
 * for it.
 *
 * <p>Channel names that were not loaded are skipped. A source with none of its channels
 * available fails with IllegalStateException. Exceptions thrown while configuring a source are
 * logged per source rather than aborting the whole load. Creating the source itself happens
 * outside that handling, so a failure there propagates.
 *
 * @param agentConf configuration section of this agent
 * @param channelComponentMap loaded channels; each used channel records the source's name
 * @param sourceRunnerMap receives a runner per successfully wired source
 */
private void loadSources(AgentConfiguration agentConf,
    Map<String, ChannelComponent> channelComponentMap,
    Map<String, SourceRunner> sourceRunnerMap)
    throws InstantiationException {
  Set<String> declaredSources = agentConf.getSourceSet();
  Map<String, ComponentConfiguration> sourceConfigs = agentConf.getSourceConfigMap();

  for (String sourceName : declaredSources) {
    ComponentConfiguration comp = sourceConfigs.get(sourceName);
    if (comp == null) {
      continue;
    }
    SourceConfiguration sourceConf = (SourceConfiguration) comp;
    // Reflective creation from name + type, as for channels.
    Source source = sourceFactory.create(comp.getComponentName(), comp.getType());
    try {
      Configurables.configure(source, sourceConf);

      // Resolve the configured channel names to the channels that were actually loaded.
      List<Channel> targets = new ArrayList<Channel>();
      for (String channelName : sourceConf.getChannels()) {
        ChannelComponent loaded = channelComponentMap.get(channelName);
        if (loaded != null) {
          targets.add(loaded.channel);
        }
      }
      // A source without any channel is not allowed.
      if (targets.isEmpty()) {
        throw new IllegalStateException(String.format("Source %s is not connected to a " +
            "channel", sourceName));
      }

      ChannelSelector selector = ChannelSelectorFactory.create(
          targets, sourceConf.getSelectorConfiguration());
      ChannelProcessor processor = new ChannelProcessor(selector);
      Configurables.configure(processor, sourceConf);
      source.setChannelProcessor(processor);
      sourceRunnerMap.put(comp.getComponentName(), SourceRunner.forSource(source));

      // Record this source as a component of each channel it writes to.
      for (Channel target : targets) {
        ChannelComponent owner = Preconditions.checkNotNull(
            channelComponentMap.get(target.getName()),
            String.format("Channel %s", target.getName()));
        owner.components.add(sourceName);
      }
    } catch (Exception e) {
      String msg = String.format("Source %s has been removed due to an " +
          "error during configuration", sourceName);
      LOGGER.error(msg, e);
    }
  }
}
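Source分为两类:PollableSource和EventDrivenSource;相应地也提供了两种SourceRunner:PollableSourceRunner和EventDrivenSourceRunner。SourceRunner.forSource会根据Source的类型选择对应的Runner。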
/**
 * Drives a Source. Use {@link #forSource(Source)} to get the runner variant that matches the
 * kind of source.
 */
public abstract class SourceRunner implements LifecycleAware {
  private Source source;

  /**
   * Picks the runner implementation for a source. A PollableSource gets a PollableSourceRunner,
   * and an EventDrivenSource gets an EventDrivenSourceRunner. The PollableSource check runs
   * first.
   *
   * @param source the source to wrap
   * @return a runner with the source already set
   * @throws IllegalArgumentException if the source is neither kind (including null)
   */
  public static SourceRunner forSource(Source source) {
    if (source instanceof PollableSource) {
      PollableSourceRunner pollingRunner = new PollableSourceRunner();
      pollingRunner.setSource((PollableSource) source);
      return pollingRunner;
    }
    if (source instanceof EventDrivenSource) {
      EventDrivenSourceRunner eventRunner = new EventDrivenSourceRunner();
      eventRunner.setSource((EventDrivenSource) source);
      return eventRunner;
    }
    throw new IllegalArgumentException("No known runner type for source "
        + source);
  }
}
Flume内置了两种ChannelSelector类型:REPLICATING(复制)和MULTIPLEXING(多路分发)。
/**
 * Built-in channel selector types and their implementation classes. OTHER carries no class
 * name.
 */
public enum ChannelSelectorType {
  /** No built-in implementation class. */
  OTHER(null),
  REPLICATING("org.apache.flume.channel.ReplicatingChannelSelector"),
  MULTIPLEXING("org.apache.flume.channel.MultiplexingChannelSelector");

  /** Fully-qualified implementation class name, or null for OTHER. */
  private final String className;

  ChannelSelectorType(String className) {
    this.className = className;
  }

  /**
   * @return the fully-qualified implementation class name, or {@code null} for OTHER
   */
  public String getChannelSelectorClassName() {
    return className;
  }
}
1.3 根据配置文件加载sink
/**
 * Creates and configures every sink declared for this agent, connects it to its channel, and
 * then hands the configured sinks to loadSinkGroups to build the SinkRunners.
 *
 * <p>A sink whose channel was not loaded fails with IllegalStateException. Exceptions thrown
 * while configuring a sink are logged per sink. Creating the sink itself happens outside that
 * handling, so a failure there propagates.
 *
 * @param agentConf configuration section of this agent
 * @param channelComponentMap loaded channels; the used channel records the sink's name
 * @param sinkRunnerMap receives the runners built by loadSinkGroups
 * @throws InstantiationException propagated from loadSinkGroups
 */
private void loadSinks(AgentConfiguration agentConf,
    Map<String, ChannelComponent> channelComponentMap, Map<String, SinkRunner> sinkRunnerMap)
    throws InstantiationException {
  Set<String> declaredSinks = agentConf.getSinkSet();
  Map<String, ComponentConfiguration> sinkConfigs = agentConf.getSinkConfigMap();
  // Successfully configured sinks, keyed by component name.
  Map<String, Sink> configured = new HashMap<String, Sink>();

  for (String sinkName : declaredSinks) {
    ComponentConfiguration comp = sinkConfigs.get(sinkName);
    if (comp == null) {
      continue;
    }
    SinkConfiguration sinkConf = (SinkConfiguration) comp;
    Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
    try {
      Configurables.configure(sink, sinkConf);
      ChannelComponent target = channelComponentMap.get(sinkConf.getChannel());
      if (target == null) {
        throw new IllegalStateException(String.format("Sink %s is not connected to a " +
            "channel", sinkName));
      }
      sink.setChannel(target.channel);
      configured.put(comp.getComponentName(), sink);
      target.components.add(sinkName);
    } catch (Exception e) {
      String msg = String.format("Sink %s has been removed due to an " +
          "error during configuration", sinkName);
      LOGGER.error(msg, e);
    }
  }

  loadSinkGroups(agentConf, configured, sinkRunnerMap);
}
/**
 * Wraps the configured sinks in SinkRunners. Sinks named by a sink group share one runner
 * driven by the group's processor. Every remaining sink gets its own runner backed by a
 * DefaultSinkProcessor.
 *
 * <p>Runners are keyed by name in {@code sinkRunnerMap}. A stand-alone sink whose name clashes
 * with an already registered sink group is logged and skipped, so it never silently replaces
 * the group's runner.
 *
 * @param agentConf configuration section of this agent
 * @param sinks successfully configured sinks keyed by name; sinks claimed by a group are
 *     removed from this map
 * @param sinkRunnerMap receives one runner per sink group and per stand-alone sink
 * @throws InstantiationException if a group names a sink that does not exist, failed to
 *     configure, or already belongs to another group
 */
private void loadSinkGroups(AgentConfiguration agentConf,
    Map<String, Sink> sinks, Map<String, SinkRunner> sinkRunnerMap)
    throws InstantiationException {
  Set<String> sinkGroupNames = agentConf.getSinkgroupSet();
  Map<String, ComponentConfiguration> compMap =
      agentConf.getSinkGroupConfigMap();
  // sink name -> name of the group that claimed it; used for error reporting.
  Map<String, String> usedSinks = new HashMap<String, String>();
  for (String groupName : sinkGroupNames) {
    ComponentConfiguration comp = compMap.get(groupName);
    if (comp != null) {
      SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
      List<Sink> groupSinks = new ArrayList<Sink>();
      for (String sink : groupConf.getSinks()) {
        // Removing the sink from `sinks` claims it for this group.
        Sink s = sinks.remove(sink);
        if (s == null) {
          String sinkUser = usedSinks.get(sink);
          if (sinkUser != null) {
            throw new InstantiationException(String.format(
                "Sink %s of group %s already " +
                "in use by group %s", sink, groupName, sinkUser));
          } else {
            throw new InstantiationException(String.format(
                "Sink %s of group %s does "
                + "not exist or is not properly configured", sink,
                groupName));
          }
        }
        groupSinks.add(s);
        usedSinks.put(sink, groupName);
      }
      try {
        SinkGroup group = new SinkGroup(groupSinks);
        Configurables.configure(group, groupConf);
        sinkRunnerMap.put(comp.getComponentName(),
            new SinkRunner(group.getProcessor()));
      } catch (Exception e) {
        String msg = String.format("SinkGroup %s has been removed due to " +
            "an error during configuration", groupName);
        LOGGER.error(msg, e);
      }
    }
  }
  // Every sink claimed by a group was removed from `sinks` above, so all remaining entries
  // are unassigned and each gets a solo runner. (usedSinks maps sink -> group, so looking a
  // sink name up among its values would compare it against group names.)
  for (Entry<String, Sink> entry : sinks.entrySet()) {
    String sinkName = entry.getKey();
    if (sinkRunnerMap.containsKey(sinkName)) {
      // A sink group with the same name already owns this runner key.
      LOGGER.error(String.format("Sink %s has been removed because a sink group with the "
          + "same name already exists", sinkName));
      continue;
    }
    try {
      // A single-sink processor drives the stand-alone sink.
      SinkProcessor pr = new DefaultSinkProcessor();
      List<Sink> soloSinks = new ArrayList<Sink>();
      soloSinks.add(entry.getValue());
      pr.setSinks(soloSinks);
      Configurables.configure(pr, new Context());
      // Register the sink name with its SinkRunner.
      sinkRunnerMap.put(sinkName, new SinkRunner(pr));
    } catch (Exception e) {
      String msg = String.format("Sink %s has been removed due to an " +
          "error during configuration", sinkName);
      LOGGER.error(msg, e);
    }
  }
}