flume - 启动过程分析(1)




Usage: /Users/lebron374/Library/apache-flume-1.8.0-bin/bin/flume-ng <command> [options]...

  help                      display this help text
  agent                     run a Flume agent
  avro-client               run an avro Flume client
  version                   show Flume version info

global options:
  --conf,-c <conf>          use configs in <conf> directory
  --classpath,-C <cp>       append to the classpath
  --dryrun,-d               do not actually start Flume, just print the command
  --plugins-path <dirs>     colon-separated list of plugins.d directories. See the
                            plugins.d section in the user guide for more details.
                            Default: $FLUME_HOME/plugins.d
  -Dproperty=value          sets a Java system property value
  -Xproperty=value          sets a Java -X option

agent options:
  --name,-n <name>          the name of this agent (required)
  --conf-file,-f <file>     specify a config file (required if -z missing)
  --zkConnString,-z <str>   specify the ZooKeeper connection to use (required if -f missing)
  --zkBasePath,-p <path>    specify the base path in ZooKeeper for agent configs
  --no-reload-conf          do not reload config file if changed
  --help,-h                 display help text

avro-client options:
  --rpcProps,-P <file>   RPC client properties file with server connection params
  --host,-H <host>       hostname to which events will be sent
  --port,-p <port>       port of the avro source
  --dirname <dir>        directory to stream to avro source
  --filename,-F <file>   text file to stream to avro source (default: std input)
  --headerFile,-R <file> File containing event headers as key/value pairs on each new line
  --help,-h              display help text

  Either --rpcProps or both --host and --port must be specified.

Note that if <conf> directory is specified, then it is always included first
in the classpath.



# example.conf: A single-node Flume configuration

# 1. Define the names of the three components
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 2. Configure the source (where the source accepts connections)
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = cen-ubuntu
a1.sources.r1.port = 44444

# 3. Configure the sink (mainly used to output log messages)
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog = 1024

# 4. Configure the channel (the storage that acts as the pipe)
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 5. Bind the three components together
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


bin/flume-ng agent --conf conf --name a1 --conf-file conf/a1.conf -Dflume.root.logger=DEBUG,console


 Application是整个flume的启动核心,通过下面的代码我们看出命令行显示帮助可以通过apache commons-cli包来实现,如果以后需要实现类似shell命令中的help提示可以考虑用commons-cli包实现。

/**
 * Agent entry point (abridged listing: closing braces and several statements
 * are omitted, so this excerpt does not compile as shown).
 *
 * Declares the command-line options with Apache Commons CLI, parses the
 * arguments, and then builds an Application either from ZooKeeper-based
 * configuration or from a properties file, with or without reload polling.
 */
public static void main(String[] args) {

    try {

      boolean isZkConfigured = false;
      // NOTE: the command-line help/usage is implemented with the Apache commons-cli jar.
      Options options = new Options();

      Option option = new Option("n", "name", true, "the name of this agent");

      option = new Option("f", "conf-file", true,
          "specify a config file (required if -z missing)");

      option = new Option(null, "no-reload-conf", false,
          "do not reload config file if changed");

      // Options for Zookeeper
      option = new Option("z", "zkConnString", true,
          "specify the ZooKeeper connection to use (required if -f missing)");

      option = new Option("p", "zkBasePath", true,
          "specify the base path in ZooKeeper for agent configs");

      option = new Option("h", "help", false, "display help text");

      CommandLineParser parser = new GnuParser();
      CommandLine commandLine = parser.parse(options, args);

      if (commandLine.hasOption('h')) {
        new HelpFormatter().printHelp("flume-ng agent", options, true);

      String agentName = commandLine.getOptionValue('n');
      // Reloading is on unless --no-reload-conf was passed.
      boolean reload = !commandLine.hasOption("no-reload-conf");

      if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
        isZkConfigured = true;
      Application application = null;
      // NOTE: this branch uses the ZooKeeper-based configuration.
      if (isZkConfigured) {
        // NOTE: non-essential code has been omitted here by the article author.
      } else {
        // NOTE: this branch uses the configuration file given with -f.
        File configurationFile = new File(commandLine.getOptionValue('f'));

        if (reload) {
          EventBus eventBus = new EventBus(agentName + "-event-bus");
          // NOTE: the component added to `components` is this configurationProvider
          // (the statement that adds it is omitted from this excerpt).
          // NOTE(review): the article states the provider's initial lifecycle state is IDLE;
          // that is not visible in this excerpt -- confirm against the provider class.
          PollingPropertiesFileConfigurationProvider configurationProvider =
              new PollingPropertiesFileConfigurationProvider(
                  agentName, configurationFile, eventBus, 30);
          application = new Application(components);
          // NOTE(review): per the article, the Application is registered on the Guava EventBus
          // at this point (internally a reflective call); the register call itself is omitted here.
        } else {
          PropertiesFileConfigurationProvider configurationProvider =
              new PropertiesFileConfigurationProvider(agentName, configurationFile);
          application = new Application();

      // A final copy of the reference so the anonymous shutdown-hook thread can capture it.
      final Application appReference = application;
      Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
        public void run() {



 PollingPropertiesFileConfigurationProvider创建出configurationProvider对象后,该对象被添加到components当中,components再关联到application,并通过application.start()启动。逐步跟进之后,会进入到PollingPropertiesFileConfigurationProvider.start()方法。

/**
 * Starts the polling configuration provider (abridged listing: some
 * continuation lines and the closing brace are omitted).
 *
 * Checks that the file is set, creates a single-threaded scheduled executor,
 * schedules a FileWatcherRunnable on it, and marks the lifecycle state START.
 */
public void start() {
    LOGGER.info("Configuration provider starting");

    Preconditions.checkState(file != null,
        "The parameter file must not be null");

    // NOTE: create a scheduled executor backed by a single thread.
    executorService = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")

    // NOTE: start watching the configuration file.
    FileWatcherRunnable fileWatcherRunnable =
        new FileWatcherRunnable(file, counterGroup);

    // Initial delay 0, then repeated with a fixed delay of `interval`
    // (the time-unit argument is cut off in this excerpt).
    executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,

    // NOTE: set the lifecycle state to START.
    lifecycleState = LifecycleState.START;

    LOGGER.debug("Configuration provider started");


/**
 * Runnable that checks a configuration file for changes (abridged listing:
 * closing braces and the body of the try block are omitted).
 *
 * It remembers the last modification time it acted on and treats a newer
 * timestamp as a change.
 */
public class FileWatcherRunnable implements Runnable {

    private final File file;
    private final CounterGroup counterGroup;

    // Modification time of the file at the last detected change; starts at 0.
    private long lastChange;

    public FileWatcherRunnable(File file, CounterGroup counterGroup) {
      this.file = file;
      this.counterGroup = counterGroup;
      this.lastChange = 0L;

    public void run() {
      LOGGER.debug("Checking file:{} for changes", file);


      long lastModified = file.lastModified();

      // NOTE: a change is detected purely by comparing the file's modification time.
      if (lastModified > lastChange) {
        LOGGER.info("Reloading configuration file:{}", file);


        lastChange = lastModified;

        try {
          // NOTE(review): per the article, the omitted statement here publishes the new
          // configuration on the EventBus so that listeners on other threads are notified.

 这里通过getFlumeConfiguration解析配置文件。稍微多讲一点:解析配置文件之后,我们就会得到source、channel、sink这几类对象。继续跟进getFlumeConfiguration,我们会调用到PropertiesFileConfigurationProvider类,因为PollingPropertiesFileConfigurationProvider继承自PropertiesFileConfigurationProvider类。

/**
 * Builds the materialized configuration for this agent (abridged listing:
 * closing braces and some statements are omitted).
 *
 * Loads channels, sources and sinks from the agent's configuration, then adds
 * each channel that has connected components, plus every source runner and
 * sink runner, to the returned configuration.
 *
 * @return the configuration object that was populated; returned as created
 *         when no configuration exists for this agent name
 */
public MaterializedConfiguration getConfiguration() {
    // NOTE: `conf` collects the channels, source runners and sink runners added below.
    MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
    // NOTE(review): per the article, this call resolves to the implementation in
    // PropertiesFileConfigurationProvider.
    FlumeConfiguration fconfig = getFlumeConfiguration();
    AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
    if (agentConf != null) {
      Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
      Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
      Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
      try {
        // Channels are loaded first; the channel map is then passed to the source and sink loaders.
        loadChannels(agentConf, channelComponentMap);
        loadSources(agentConf, channelComponentMap, sourceRunnerMap);
        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
        // Iterate over a copy of the key set rather than the map's live key view.
        Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
        for (String channelName : channelNames) {
          ChannelComponent channelComponent = channelComponentMap.get(channelName);
          if (channelComponent.components.isEmpty()) {
            LOGGER.warn(String.format("Channel %s has no components connected" +
                " and has been removed.", channelName));
            Map<String, Channel> nameChannelMap =
            if (nameChannelMap != null) {
          } else {
            LOGGER.info(String.format("Channel %s connected to %s",
                channelName, channelComponent.components.toString()));
            conf.addChannel(channelName, channelComponent.channel);
        for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
          conf.addSourceRunner(entry.getKey(), entry.getValue());
        for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
          conf.addSinkRunner(entry.getKey(), entry.getValue());
      } catch (InstantiationException ex) {
        LOGGER.error("Failed to instantiate component", ex);
      } finally {
    return conf;


/**
 * Reads the configuration file into a FlumeConfiguration (abridged listing:
 * continuation lines, the load step, the catch/finally clauses and the
 * closing braces are omitted).
 *
 * The Properties implementation class is looked up by name from the
 * "propertiesImplementation" system property and instantiated reflectively.
 */
public FlumeConfiguration getFlumeConfiguration() {
    BufferedReader reader = null;
    try {
      reader = new BufferedReader(new FileReader(file));
      String resolverClassName = System.getProperty("propertiesImplementation",
      Class<? extends Properties> propsclass = Class.forName(resolverClassName)
      Properties properties = propsclass.newInstance();
      // NOTE: plain java.util.Properties handling -- the properties are converted to a map
      // and wrapped in a FlumeConfiguration, which is returned.
      return new FlumeConfiguration(toMap(properties));


/**
 * Builds the in-memory configuration hierarchy from flat key/value
 * properties (abridged listing: closing braces are omitted).
 *
 * @param properties raw configuration entries; each one is passed to
 *                   addRawProperty, and rejected entries are logged
 */
public FlumeConfiguration(Map<String, String> properties) {
    // NOTE: agentConfigMap is a nested map -- the outer key is the agent name and the
    // value is that agent's AgentConfiguration object.
    // NOTE: an AgentConfiguration is in turn split into maps for sources, channels and sinks.
    // NOTE: each of those maps is keyed by component name; every name has several settings,
    // which are themselves organised as a map.
    agentConfigMap = new HashMap<>();
    errors = new LinkedList<>();
    // Construct the in-memory component hierarchy
    for (Entry<String, String> entry : properties.entrySet()) {
      // NOTE: walk the configuration entries and add each agent's settings to its
      // AgentConfiguration.
      if (!addRawProperty(entry.getKey(), entry.getValue())) {
        LOGGER.warn("Configuration property ignored: {} = {}", entry.getKey(), entry.getValue());


/**
 * Validates one raw property and routes it to the AgentConfiguration of the
 * agent named by the key's first dotted segment (abridged listing: the
 * closing braces of the if-blocks and of the method are omitted).
 *
 * @param rawName  full property key; the part before the first '.' is the agent name
 * @param rawValue property value
 * @return false when the name or value is null, the trimmed value is empty,
 *         the key has no '.', or the agent name or remaining key is empty;
 *         otherwise the result of AgentConfiguration.addProperty
 */
private boolean addRawProperty(String rawName, String rawValue) {
    // Null names and values not supported
    if (rawName == null || rawValue == null) {
      addError("", AGENT_NAME_MISSING, ERROR);
      return false;

    // Remove leading and trailing spaces
    String name = rawName.trim();
    String value = rawValue.trim();

    // Empty values are not supported
    if (value.isEmpty()) {
      addError(name, PROPERTY_VALUE_NULL, ERROR);
      return false;

    int index = name.indexOf('.');

    // All configuration keys must have a prefix defined as agent name
    if (index == -1) {
      addError(name, AGENT_NAME_MISSING, ERROR);
      return false;

    // NOTE: every agent has its own configuration, distinguished by the agent name,
    // e.g. agent1.sources.s1, agent1.channels.c1, agent1.sinks.k1.
    String agentName = name.substring(0, index);

    // Agent name must be specified for all properties
    if (agentName.isEmpty()) {
      addError(name, AGENT_NAME_MISSING, ERROR);
      return false;

      // NOTE: the remainder tells which kind of setting this is (sources, channels, sinks);
      // configKey looks like sources.s1, channels.c1, sinks.k1.
      String configKey = name.substring(index + 1);

    // Configuration key must be specified for every property
    if (configKey.isEmpty()) {
      addError(name, PROPERTY_NAME_NULL, ERROR);
      return false;

    // NOTE: one AgentConfiguration object per agent, looked up by agent name.
    AgentConfiguration aconf = agentConfigMap.get(agentName);

    // NOTE: create and register the AgentConfiguration the first time this agent name is seen.
    if (aconf == null) {
      aconf = new AgentConfiguration(agentName, errors);
      agentConfigMap.put(agentName, aconf);

    // Each configuration key must begin with one of the three prefixes:
    // sources, sinks, or channels.
    return aconf.addProperty(configKey, value);


  • a1.sources = r1
  • a1.sinks = k1
  • a1.channels = c1


  • addAsSourceConfig: 解析source的配置信息
  • addAsChannelValue: 解析channel的配置信息
  • addAsSinkConfig:解析sink的配置信息
  • addAsSinkGroupConfig:解析SinkGroup的配置信息
  • addAsConfigFilterConfig:解析filter的配置信息
/**
 * Stores one agent-level property (abridged listing: closing braces are
 * omitted).
 *
 * The five list keys (config filters, sources, sinks, channels, sink groups)
 * are stored once each; a second definition is logged and rejected. Any other
 * key is offered in turn to the per-component handlers.
 *
 * @param key   property key with the agent name already removed
 * @param value property value
 * @return true when the property was stored; false for a duplicate list or
 *         when no handler accepts the key
 */
private boolean addProperty(String key, String value) {

      // Check for configFilters
      if (CONFIG_CONFIGFILTERS.equals(key)) {
        if (configFilters == null) {
          configFilters = value;
          return true;
        } else {
          LOGGER.warn("Duplicate configfilter list specified for agent: {}", agentName);
          return false;
      // Check for sources; stores the value of e.g. agent1.sources
      if (CONFIG_SOURCES.equals(key)) {
        if (sources == null) {
          sources = value;
          return true;
        } else {
          LOGGER.warn("Duplicate source list specified for agent: {}", agentName);
          return false;

      // Check for sinks; stores the value of e.g. agent1.sinks
      if (CONFIG_SINKS.equals(key)) {
        if (sinks == null) {
          sinks = value;
          LOGGER.info("Added sinks: {} Agent: {}", sinks, agentName);
          return true;
        } else {
          LOGGER.warn("Duplicate sink list specfied for agent: {}", agentName);
          return false;

      // Check for channels; stores the value of e.g. agent1.channels
      if (CONFIG_CHANNELS.equals(key)) {
        if (channels == null) {
          channels = value;

          return true;
        } else {
          LOGGER.warn("Duplicate channel list specified for agent: {}", agentName);
          return false;

      // Check for sinkgroups
      if (CONFIG_SINKGROUPS.equals(key)) {
        if (sinkgroups == null) {
          sinkgroups = value;

          return true;
        } else {
          LOGGER.warn("Duplicate sinkgroup list specfied for agent: {}", agentName);
          return false;

      // NOTE: this is where the actual per-component settings are stored, e.g.
      // agent1.sources.s1.*, agent1.channels.c1.*, agent1.sinks.k1.*.
      // The || chain short-circuits, so the first handler that accepts the key wins.
      if (addAsSourceConfig(key, value)
          || addAsChannelValue(key, value)
          || addAsSinkConfig(key, value)
          || addAsSinkGroupConfig(key, value)
          || addAsConfigFilterConfig(key, value)
      ) {
        return true;

      LOGGER.warn("Invalid property specified: {}", key);
      addError(key, INVALID_PROPERTY, ERROR);
      return false;


    /**
     * Tries to store the property as a source setting by delegating to
     * addComponentConfig with the source prefix and the source context map
     * (abridged listing: the end of the call and the closing brace are omitted).
     */
    private boolean addAsSourceConfig(String key, String value) {
      // NOTE: keys here look like sources.s1.type, sources.s1.bind, sources.s1.port.
      return addComponentConfig(
          key, value, CONFIG_SOURCES_PREFIX, sourceContextMap

    /**
     * Stores one component-level setting in the Context of the component
     * named in the key (abridged listing: closing braces are omitted).
     *
     * @param key          property key with the agent name already removed
     * @param value        property value
     * @param configPrefix prefix that the key must start with
     * @param contextMap   per-component contexts, keyed by component name
     * @return true when the key was parsed and stored, false otherwise
     */
    private boolean addComponentConfig(
        String key, String value, String configPrefix, Map<String, Context> contextMap

    ) {
      // NOTE(review): for sources the prefix is presumably "sources." -- the constant's
      // value is not shown in this excerpt.
      ComponentNameAndConfigKey parsed = parseConfigKey(key, configPrefix);
      if (parsed != null) {
        String name = parsed.getComponentName().trim();
        LOGGER.info("Processing:{}", name);
        // NOTE: each component name has its own Context object.
        Context context = contextMap.get(name);

        // Create and register the Context the first time this component name is seen.
        if (context == null) {
          LOGGER.debug("Created context for {}: {}", name, parsed.getConfigKey());
          context = new Context();
          contextMap.put(name, context);

        // NOTE: the Context is used like a map from this component's setting key to its value.
        context.put(parsed.getConfigKey(), value);
        return true;

      return false;

      /**
       * Splits a key of the form <prefix><component-name>.<config-key> into its
       * component name and config key (abridged listing: closing braces are omitted).
       *
       * @param key    property key with the agent name already removed
       * @param prefix prefix that the key must start with
       * @return the parsed pair, or null when the key does not start with the
       *         prefix, has no '.' after the component name, or either part is empty
       */
      private ComponentNameAndConfigKey parseConfigKey(String key, String prefix) {
      // key must start with prefix; the agent name has already been stripped by the
      // caller chain, so what remains begins with the component-type prefix
      if (!key.startsWith(prefix)) {
        return null;

      // key must have a component name part after the prefix of the format:
      // <prefix><component-name>.<config-key>
      // NOTE: e.g. sources.s1.type, sources.s1.bind, sources.s1.port.
      int index = key.indexOf('.', prefix.length() + 1);

      if (index == -1) {
        return null;

      // NOTE: the component name is the text between the prefix and that '.'.
      String name = key.substring(prefix.length(), index);
      // NOTE: the config key is everything after the component name and its '.'.
      String configKey = key.substring(prefix.length() + name.length() + 1);

      // name and config key must be non-empty
      if (name.isEmpty() || configKey.isEmpty()) {
        return null;

      // NOTE: return the component name together with its config key.
      return new ComponentNameAndConfigKey(name, configKey);

  /**
   * Immutable pair of a component name and a config key (abridged listing:
   * closing braces are omitted).
   */
  public static class ComponentNameAndConfigKey {

    private final String componentName;
    private final String configKey;

    // Private constructor: instances cannot be created from outside the enclosing top-level class.
    private ComponentNameAndConfigKey(String name, String configKey) {
      this.componentName = name;
      this.configKey = configKey;

    /** Returns the component name. */
    public String getComponentName() {
      return componentName;

    /** Returns the config key. */
    public String getConfigKey() {
      return configKey;


/**
 * Configuration of a single agent (abridged listing: only the fields are
 * shown; methods and the closing brace are omitted).
 */
public static class AgentConfiguration {

    private final String agentName;
    // Raw list values, each set at most once by addProperty.
    private String configFilters;
    private String sources;
    private String sinks;
    private String channels;
    private String sinkgroups;

    private final Map<String, ComponentConfiguration> sourceConfigMap;
    private final Map<String, ComponentConfiguration> sinkConfigMap;
    private final Map<String, ComponentConfiguration> channelConfigMap;
    private final Map<String, ComponentConfiguration> sinkgroupConfigMap;
    private final Map<String, ComponentConfiguration> configFilterConfigMap;

    // NOTE: the core maps holding the per-component settings of sources, channels, sinks, etc.
    // The key is the component name; the value is the Context with that component's settings.
    private Map<String, Context> configFilterContextMap;
    private Map<String, Context> sourceContextMap;
    private Map<String, Context> sinkContextMap;
    private Map<String, Context> channelContextMap;
    private Map<String, Context> sinkGroupContextMap;

    private Set<String> sinkSet;
    private Set<String> configFilterSet;
    private Set<String> sourceSet;
    private Set<String> channelSet;
    private Set<String> sinkgroupSet;

    private final List<FlumeConfigurationError> errorList;
    private List<ConfigFilter> configFiltersInstances;
    private Map<String, Pattern> configFilterPatternCache;



  • agentConfigMap作为最外面的map,维护agent的所有配置信息,其中key为agent的name,value为agent的配置,是一个AgentConfiguration对象。
  • AgentConfiguration对象包含sourceContextMap、sinkContextMap、channelContextMap等map,其中key为各自的name,如sourceContextMap中key为source的名字,value为一个Context对象(本质上是一个map),包含这个source下面的所有属性名和属性值。



/**
 * Checks the watched file's modification time and, when it is newer than the
 * last one seen, reloads the configuration (abridged listing: the body of the
 * try block, part of one log call and the closing braces are omitted).
 */
public void run() {
      LOGGER.debug("Checking file:{} for changes", file);


      long lastModified = file.lastModified();

      // NOTE: a change is detected purely by comparing the file's modification time.
      if (lastModified > lastChange) {
        LOGGER.info("Reloading configuration file:{}", file);


        lastChange = lastModified;

        try {
          // NOTE(review): per the article, the omitted statement here publishes the new
          // configuration on the EventBus so that listeners on other threads are notified.
        } catch (Exception e) {
          LOGGER.error("Failed to load configuration data. Exception follows.",
        } catch (NoClassDefFoundError e) {
          LOGGER.error("Failed to start agent because dependencies were not " +
              "found in classpath. Error follows.", e);
        } catch (Throwable t) {
          // caught because the caller does not handle or log Throwables
          LOGGER.error("Unhandled error", t);

 Application对象通过@Subscribe注解订阅了eventBus的事件,从而获取到解析后的配置信息,再通过stopAllComponents和startAllComponents重启各个组件。回想一下我们刚刚提到的动态检测文件变更的思路,就可以理解flume是如何在不重启进程的情况下重新加载配置的,这也为我们日常的需求提供了思路。

/**
 * Excerpt of the Application class showing only the configuration-event
 * handler (abridged listing: the body of the try block, the unlock statement
 * and the closing braces are omitted).
 */
public class Application {

  public void handleConfigurationEvent(MaterializedConfiguration conf) {
    try {
      // NOTE(review): per the article, the omitted statements stop all components and then
      // start them again from `conf`, which is how a configuration update is applied.
    } catch (InterruptedException e) {
      logger.info("Interrupted while trying to handle configuration event");
    } finally {
      // If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock
      if (lifecycleLock.isHeldByCurrentThread()) {


/**
 * Stops the components of the current materialized configuration, if any
 * (abridged listing: the statements that actually stop each component and
 * the closing braces are omitted).
 *
 * The loops run over source runners, then sink runners, then channels; a
 * failure on one component is logged and does not abort the loop.
 */
private void stopAllComponents() {
    if (this.materializedConfiguration != null) {
      logger.info("Shutting down configuration: {}", this.materializedConfiguration);
      for (Entry<String, SourceRunner> entry :
           this.materializedConfiguration.getSourceRunners().entrySet()) {
        try {
          logger.info("Stopping Source " + entry.getKey());
        } catch (Exception e) {
          logger.error("Error while stopping {}", entry.getValue(), e);

      for (Entry<String, SinkRunner> entry :
           this.materializedConfiguration.getSinkRunners().entrySet()) {
        try {
          logger.info("Stopping Sink " + entry.getKey());
        } catch (Exception e) {
          logger.error("Error while stopping {}", entry.getValue(), e);

      for (Entry<String, Channel> entry :
           this.materializedConfiguration.getChannels().entrySet()) {
        try {
          logger.info("Stopping Channel " + entry.getKey());
        } catch (Exception e) {
          logger.error("Error while stopping {}", entry.getValue(), e);
    // NOTE(review): the body of this check is omitted; presumably the monitor server is
    // stopped here -- confirm against the full source.
    if (monitorServer != null) {


/**
 * Starts every component of the given configuration (abridged listing: the
 * beginning of each supervise call, the sleep statement, the comment
 * delimiters around "Wait for all channels to start" and the closing braces
 * are omitted).
 *
 * Order: channels, a wait until each channel is in START state or flagged as
 * in error by the supervisor, then sinks, then sources.
 *
 * @param materializedConfiguration the configuration to start; also stored on this instance
 */
private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
    logger.info("Starting new configuration:{}", materializedConfiguration);

    this.materializedConfiguration = materializedConfiguration;

    // NOTE: start the channels.
    for (Entry<String, Channel> entry :
        materializedConfiguration.getChannels().entrySet()) {
      try {
        logger.info("Starting Channel " + entry.getKey());
            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
        logger.error("Error while starting {}", entry.getValue(), e);

     * Wait for all channels to start.
    for (Channel ch : materializedConfiguration.getChannels().values()) {
      // Keep waiting until the channel reports START or the supervisor marks it as in error.
      while (ch.getLifecycleState() != LifecycleState.START
          && !supervisor.isComponentInErrorState(ch)) {
        try {
          logger.info("Waiting for channel: " + ch.getName() +
              " to start. Sleeping for 500 ms");
        } catch (InterruptedException e) {
          logger.error("Interrupted while waiting for channel to start.", e);

    // NOTE: start the sinks.
    for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) {
      try {
        logger.info("Starting Sink " + entry.getKey());
            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
        logger.error("Error while starting {}", entry.getValue(), e);

    // NOTE: start the sources.
    for (Entry<String, SourceRunner> entry :
         materializedConfiguration.getSourceRunners().entrySet()) {
      try {
        logger.info("Starting Source " + entry.getKey());
            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
        logger.error("Error while starting {}", entry.getValue(), e);

    // NOTE: monitoring is started here (code omitted by the article author).



