Flume源码分析之启动Flume

整体的event传输流程如下图:

flume传输流程.png

1. 配置文件的解析和相关组件的加载

通过如下命令即可启动flume进程。

bin/flume-ng agent -n name -c conf -f jobs/a.conf 

入口函数是flume-ng-node子项目中的Application的main方法。先通过Commons-cli对命令行进行解析,获取name和file,其中file就是配置文件。

本小结是根据配置文件生成MaterializedConfiguration对象

这个过程大体上分为如下步骤:

  • 读取配置文件properties
  • loadChannels(agentConf, channelComponentMap),利用反射根据name和type创建Channel实例
  • loadSources(agentConf, channelComponentMap, sourceRunnerMap),利用反射根据name和type创建Source实例,并创建ChannelSelector和ChannelProcessor,将source关联到channel
  • loadSinks(agentConf, channelComponentMap, sinkRunnerMap),利用反射根据name和type创建Sink实例,并将sink关联到channel,创建SinkProcessor和SinkRunner

配置文件的读取有两种方式:通过ZooKeeper和配置文件。

当读取的是配置文件,有选项控制是否当配置文件变更后,进行reload操作。

        List<LifecycleAware> components = Lists.newArrayList();

        if (reload) {
          EventBus eventBus = new EventBus(agentName + "-event-bus");
          PollingPropertiesFileConfigurationProvider configurationProvider =
              new PollingPropertiesFileConfigurationProvider(
                  agentName, configurationFile, eventBus, 30);
          components.add(configurationProvider);
          application = new Application(components);
          eventBus.register(application);
        } else {
          //解析配置文件的前期准备
          PropertiesFileConfigurationProvider configurationProvider =
              new PropertiesFileConfigurationProvider(agentName, configurationFile);
          application = new Application();

          //configurationProvider.getConfiguration() 真正开始解析配置文件
          //并根据配置文件启动flume
          application.handleConfigurationEvent(configurationProvider.getConfiguration());
        }

解析配置文件的前期准备

public class PropertiesFileConfigurationProvider extends
    AbstractConfigurationProvider {

  private final File file;

  public PropertiesFileConfigurationProvider(String agentName, File file) {
    super(agentName);
    this.file = file;
  }

  //以Properties的方式实现配置文件的读取
  @Override
  public FlumeConfiguration getFlumeConfiguration() {
    BufferedReader reader = null;
    try {
      reader = new BufferedReader(new FileReader(file));
      Properties properties = new Properties();
      properties.load(reader);
      return new FlumeConfiguration(toMap(properties));
    } catch (IOException ex) {
      LOGGER.error("Unable to load file:" + file
          + " (I/O failure) - Exception follows.", ex);
    } finally {
      if (reader != null) {
        try {
          reader.close();
        } catch (IOException ex) {
          LOGGER.warn(
              "Unable to close file reader for file: " + file, ex);
        }
      }
    }
    return new FlumeConfiguration(new HashMap<String, String>());
  }
}
public abstract class AbstractConfigurationProvider implements ConfigurationProvider {

  private final String agentName;
  private final SourceFactory sourceFactory;
  private final SinkFactory sinkFactory;
  private final ChannelFactory channelFactory;

  private final Map<Class<? extends Channel>, Map<String, Channel>> channelCache;

  public AbstractConfigurationProvider(String agentName) {
    super();
    this.agentName = agentName;
    this.sourceFactory = new DefaultSourceFactory();
    this.sinkFactory = new DefaultSinkFactory();
    this.channelFactory = new DefaultChannelFactory();

    channelCache = new HashMap<Class<? extends Channel>, Map<String, Channel>>();
  }

  //由子类具体实现配置文件的读取
  protected abstract FlumeConfiguration getFlumeConfiguration();

  //根据配置文件,生成MaterializedConfiguration
  public MaterializedConfiguration getConfiguration() {
    MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
    FlumeConfiguration fconfig = getFlumeConfiguration();//钩子方法,实现配置文件的读取
    AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
    if (agentConf != null) {
      // SourceRunner,SinkRunner用于对Source和Sink进行驱动
      Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
      Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
      Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
      try {
        //根据配置信息,装载channel相关数据
        loadChannels(agentConf, channelComponentMap);
        //根据配置信息和channel数据,装载Source
        loadSources(agentConf, channelComponentMap, sourceRunnerMap);
        //根据配置信息和channel数据,装载Sink
        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
        
        Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
        //遍历channel
        for (String channelName : channelNames) {
          ChannelComponent channelComponent = channelComponentMap.get(channelName);
          if (channelComponent.components.isEmpty()) {
            省略一些异常处理...
          } else {
            //完成channel的映射
            conf.addChannel(channelName, channelComponent.channel);
          }
        }
        for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
          //完成source和SourceRunner的映射
          conf.addSourceRunner(entry.getKey(), entry.getValue());
        }
        for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
          //完成sink和SinkRunner的映射
          conf.addSinkRunner(entry.getKey(), entry.getValue());
        }
      } catch (InstantiationException ex) {
        LOGGER.error("Failed to instantiate component", ex);
      } finally {
        channelComponentMap.clear();
        sourceRunnerMap.clear();
        sinkRunnerMap.clear();
      }
    } else {
      LOGGER.warn("No configuration found for this host:{}", getAgentName());
    }
    return conf;
  }

1.1 根据配置文件创建Channel实例

有几种默认的Channel类型定义,当type是已知的几种时或者是自定义Channel时,都可以获取到全限定类名,根据类名,通过反射,完成channel实例创建。


  //装载Channel
  private void loadChannels(AgentConfiguration agentConf,
      Map<String, ChannelComponent> channelComponentMap)
          throws InstantiationException {
    Set<String> channelNames = agentConf.getChannelSet();
    Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
    
    for (String chName : channelNames) {
      ComponentConfiguration comp = compMap.get(chName);
      if (comp != null) {
        //根据channel名称和类型创建Channel实例
        Channel channel = getOrCreateChannel(channelsNotReused,
            comp.getComponentName(), comp.getType());
        try {
          Configurables.configure(channel, comp);
          channelComponentMap.put(comp.getComponentName(),
              new ChannelComponent(channel));
          LOGGER.info("Created channel " + chName);
        } catch (Exception e) {
          String msg = String.format("Channel %s has been removed due to an " +
              "error during configuration", chName);
          LOGGER.error(msg, e);
        }
      }
    }
  }

  private Channel getOrCreateChannel(
      ListMultimap<Class<? extends Channel>, String> channelsNotReused,
      String name, String type)
      throws FlumeException {

    //根据Channel的type获取Class
    Class<? extends Channel> channelClass = channelFactory.getClass(type);
    
    if (channelClass.isAnnotationPresent(Disposable.class)) {
      Channel channel = channelFactory.create(name, type);
      channel.setName(name);
      return channel;
    }
    Map<String, Channel> channelMap = channelCache.get(channelClass);
    if (channelMap == null) {
      channelMap = new HashMap<String, Channel>();
      channelCache.put(channelClass, channelMap);
    }
    Channel channel = channelMap.get(name);
    if (channel == null) {
      //根据channel的name和type创建Channel对象
      channel = channelFactory.create(name, type);
      channel.setName(name);
      channelMap.put(name, channel);
    }
    channelsNotReused.get(channelClass).remove(name);
    return channel;
  }


public class DefaultChannelFactory implements ChannelFactory {
  @Override
  public Channel create(String name, String type) throws FlumeException {
    Preconditions.checkNotNull(name, "name");
    Preconditions.checkNotNull(type, "type");

    //根据type获取Channel的class信息
    Class<? extends Channel> channelClass = getClass(type);
    try {
      //创建Channel
      return channelClass.newInstance();
    } catch (Exception ex) {
      throw new FlumeException("Unable to create channel: " + name
          + ", type: " + type + ", class: " + channelClass.getName(), ex);
    }
  }

  @SuppressWarnings("unchecked")
  @Override
  public Class<? extends Channel> getClass(String type) throws FlumeException {
    String channelClassName = type;
    ChannelType channelType = ChannelType.OTHER;
    try {
      //全部转缓存大写 
      channelType = ChannelType.valueOf(type.toUpperCase(Locale.ENGLISH));
    } catch (IllegalArgumentException ex) {
      logger.debug("Channel type {} is a custom type", type);
    }
    if (!channelType.equals(ChannelType.OTHER)) {
      //默认的几种类型
      channelClassName = channelType.getChannelClassName();
    }
    try {
      //自定义type,全路径限制类
      return (Class<? extends Channel>) Class.forName(channelClassName);
    } catch (Exception ex) {
      throw new FlumeException("Unable to load channel type: " + type
          + ", class: " + channelClassName, ex);
    }
  }
}

几种默认的Channel类型,都匹配不上的,就是自定义Channel。

public enum ChannelType {
  OTHER(null),
  FILE("org.apache.flume.channel.file.FileChannel"),
  MEMORY("org.apache.flume.channel.MemoryChannel"),
  JDBC("org.apache.flume.channel.jdbc.JdbcChannel"),
  SPILLABLEMEMORY("org.apache.flume.channel.SpillableMemoryChannel");

  private final String channelClassName;

  private ChannelType(String channelClassName) {
    this.channelClassName = channelClassName;
  }

  public String getChannelClassName() {
    return channelClassName;
  }
}

1.2 根据配置文件完成Source加载

source的实例化部分与channel的相同。完成source实例化后,还需要将Source和Channel进行关联,以及设置

 private void loadSources(AgentConfiguration agentConf,
      Map<String, ChannelComponent> channelComponentMap,
      Map<String, SourceRunner> sourceRunnerMap)
      throws InstantiationException {

    Set<String> sourceNames = agentConf.getSourceSet();
    Map<String, ComponentConfiguration> compMap =
        agentConf.getSourceConfigMap();
    
    //遍历source
    for (String sourceName : sourceNames) {
      ComponentConfiguration comp = compMap.get(sourceName);
      if (comp != null) {
        SourceConfiguration config = (SourceConfiguration) comp;

        //与channel的实例化类似,根据source的name和type,通过反射创建Source实例
        Source source = sourceFactory.create(comp.getComponentName(),
            comp.getType());
        try {

          //获取该source关联的channel信息
          Configurables.configure(source, config);
          Set<String> channelNames = config.getChannels();
          List<Channel> sourceChannels = new ArrayList<Channel>();
          for (String chName : channelNames) {
            ChannelComponent channelComponent = channelComponentMap.get(chName);
            if (channelComponent != null) {
              sourceChannels.add(channelComponent.channel);
            }
          }

          if (sourceChannels.isEmpty()) {
            //不允许source没有channel
            String msg = String.format("Source %s is not connected to a " +
                "channel",  sourceName);
            throw new IllegalStateException(msg);
          }

          ChannelSelectorConfiguration selectorConfig =
              config.getSelectorConfiguration();

          //实例化ChannelSelector,默认是复制策略
          ChannelSelector selector = ChannelSelectorFactory.create(
              sourceChannels, selectorConfig);

            //创建ChannelProcessor
          ChannelProcessor channelProcessor = new ChannelProcessor(selector);
          Configurables.configure(channelProcessor, config);
          source.setChannelProcessor(channelProcessor);
          sourceRunnerMap.put(comp.getComponentName(),
              SourceRunner.forSource(source));//source和SourceRunner进行映射
              
          for (Channel channel : sourceChannels) {
            ChannelComponent channelComponent =
                Preconditions.checkNotNull(channelComponentMap.get(channel.getName()),
                                           String.format("Channel %s", channel.getName()));
            channelComponent.components.add(sourceName);
          }
        } catch (Exception e) {
          String msg = String.format("Source %s has been removed due to an " +
              "error during configuration", sourceName);
          LOGGER.error(msg, e);
        }
      }
    }
  }

有两种提供的Source类型,同理也有两种提供的SourceRunner。

public abstract class SourceRunner implements LifecycleAware {
  private Source source;

  public static SourceRunner forSource(Source source) {
    SourceRunner runner = null;

    if (source instanceof PollableSource) {
      runner = new PollableSourceRunner();
      ((PollableSourceRunner) runner).setSource((PollableSource) source);
    } else if (source instanceof EventDrivenSource) {
      runner = new EventDrivenSourceRunner();
      ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
    } else {
      throw new IllegalArgumentException("No known runner type for source "
          + source);
    }
    return runner;
  }
}

提供了两种ChannelSelector类型,一种是REPLICATING(复制),一种是MULTIPLEXING(多路分发)

public enum ChannelSelectorType {
  OTHER(null),
  REPLICATING("org.apache.flume.channel.ReplicatingChannelSelector"),
  MULTIPLEXING("org.apache.flume.channel.MultiplexingChannelSelector");
  private final String channelSelectorClassName;
  private ChannelSelectorType(String channelSelectorClassName) {
    this.channelSelectorClassName = channelSelectorClassName;
  }
  public String getChannelSelectorClassName() {
    return channelSelectorClassName;
  }
}

1.3 根据配置文件加载sink

private void loadSinks(AgentConfiguration agentConf,
      Map<String, ChannelComponent> channelComponentMap, Map<String, SinkRunner> sinkRunnerMap)
      throws InstantiationException {
    Set<String> sinkNames = agentConf.getSinkSet();
    Map<String, ComponentConfiguration> compMap =
        agentConf.getSinkConfigMap();
    Map<String, Sink> sinks = new HashMap<String, Sink>();

    //遍历sink
    for (String sinkName : sinkNames) {
      ComponentConfiguration comp = compMap.get(sinkName);
      if (comp != null) {
        SinkConfiguration config = (SinkConfiguration) comp;
        //实例化sink
        Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
        try {
          Configurables.configure(sink, config);
          ChannelComponent channelComponent = channelComponentMap.get(config.getChannel());
          if (channelComponent == null) {
            String msg = String.format("Sink %s is not connected to a " +
                "channel",  sinkName);
            throw new IllegalStateException(msg);
          }
          sink.setChannel(channelComponent.channel);
          sinks.put(comp.getComponentName(), sink);
          channelComponent.components.add(sinkName);
        } catch (Exception e) {
          String msg = String.format("Sink %s has been removed due to an " +
              "error during configuration", sinkName);
          LOGGER.error(msg, e);
        }
      }
    }
       
    loadSinkGroups(agentConf, sinks, sinkRunnerMap);
  }

  private void loadSinkGroups(AgentConfiguration agentConf,
      Map<String, Sink> sinks, Map<String, SinkRunner> sinkRunnerMap)
          throws InstantiationException {
    Set<String> sinkGroupNames = agentConf.getSinkgroupSet();
    Map<String, ComponentConfiguration> compMap =
        agentConf.getSinkGroupConfigMap();
    Map<String, String> usedSinks = new HashMap<String, String>();
    for (String groupName: sinkGroupNames) {
      ComponentConfiguration comp = compMap.get(groupName);
      if (comp != null) {
        SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
        List<Sink> groupSinks = new ArrayList<Sink>();
        for (String sink : groupConf.getSinks()) {
          Sink s = sinks.remove(sink);
          if (s == null) {
            String sinkUser = usedSinks.get(sink);
            if (sinkUser != null) {
              throw new InstantiationException(String.format(
                  "Sink %s of group %s already " +
                      "in use by group %s", sink, groupName, sinkUser));
            } else {
              throw new InstantiationException(String.format(
                  "Sink %s of group %s does "
                      + "not exist or is not properly configured", sink,
                      groupName));
            }
          }
          groupSinks.add(s);
          usedSinks.put(sink, groupName);
        }
        try {
          SinkGroup group = new SinkGroup(groupSinks);
          Configurables.configure(group, groupConf);
          sinkRunnerMap.put(comp.getComponentName(),
              new SinkRunner(group.getProcessor()));
        } catch (Exception e) {
          String msg = String.format("SinkGroup %s has been removed due to " +
              "an error during configuration", groupName);
          LOGGER.error(msg, e);
        }
      }
    }
    // add any unassigned sinks to solo collectors
    for (Entry<String, Sink> entry : sinks.entrySet()) {
      if (!usedSinks.containsValue(entry.getKey())) {
        try {

          //创建SinkProcessor
          SinkProcessor pr = new DefaultSinkProcessor();
          List<Sink> sinkMap = new ArrayList<Sink>();
          sinkMap.add(entry.getValue());
          pr.setSinks(sinkMap);
          Configurables.configure(pr, new Context());
          
          //完成Sink和Sink映射
          sinkRunnerMap.put(entry.getKey(), new SinkRunner(pr));
        } catch (Exception e) {
          String msg = String.format("SinkGroup %s has been removed due to " +
              "an error during configuration", entry.getKey());
          LOGGER.error(msg, e);
        }
      }
    }
  }

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351