Flume-ng源码解析之Channel组件

如果还没看过Flume-ng源码解析之启动流程,可以点击 Flume-ng源码解析之启动流程 查看

1 接口介绍

组件的分析顺序是按照上一篇中启动顺序来分析的,首先是Channel,然后是Sink,最后是Source,在开始看组件源码之前我们先来看一下两个重要的接口,一个是LifecycleAware ,另一个是NamedComponent

1.1 LifecycleAware

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface LifecycleAware {

  public void start();

  public void stop();

  public LifecycleState getLifecycleState();

}

非常简单就是三个方法,start()、stop()和getLifecycleState,这个接口是flume好多类都要实现的接口,包括 Flume-ng源码解析之启动流程
所中提到PollingPropertiesFileConfigurationProvider(),只要涉及到生命周期的都会实现该接口,当然组件们也是要实现的!

1.2 NamedComponent

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface NamedComponent {

  public void setName(String name);

  public String getName();

}

这个没什么好讲的,就是用来设置名字的。

2 Channel

作为Flume三大核心组件之一的Channel,我们有必要来看看它的构成:

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Channel extends LifecycleAware, NamedComponent {

  public void put(Event event) throws ChannelException;
  public Event take() throws ChannelException;
  public Transaction getTransaction();
}

那么从上面的接口中我们可以看到Channel的主要功能就是put()和take(),那么我们就来看一下它的具体实现。这里我们选择MemoryChannel作为例子,但是MemoryChannel太长了,我们就截取一小段来看看

public class MemoryChannel extends BasicChannelSemantics {
    private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
    private static final Integer defaultCapacity = Integer.valueOf(100);
    private static final Integer defaultTransCapacity = Integer.valueOf(100);
    
    public MemoryChannel() {
    }

    ...
}

我们又看到它继承了BasicChannelSemantics ,从名字我们可以看出它是一个基础的Channel,我们继续看看看它的实现

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class BasicChannelSemantics extends AbstractChannel {

  private ThreadLocal<BasicTransactionSemantics> currentTransaction
      = new ThreadLocal<BasicTransactionSemantics>();

  private boolean initialized = false;

  protected void initialize() {}

  protected abstract BasicTransactionSemantics createTransaction();

  @Override
  public void put(Event event) throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    transaction.put(event);
  }

  @Override
  public Event take() throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    return transaction.take();
  }

  @Override
  public Transaction getTransaction() {

    if (!initialized) {
      synchronized (this) {
        if (!initialized) {
          initialize();
          initialized = true;
        }
      }
    }

    BasicTransactionSemantics transaction = currentTransaction.get();
    if (transaction == null || transaction.getState().equals(
            BasicTransactionSemantics.State.CLOSED)) {
      transaction = createTransaction();
      currentTransaction.set(transaction);
    }
    return transaction;
  }
}

找了许久,终于发现了put()和take(),但是仔细一看,它们内部调用的是BasicTransactionSemantics 的put()和take(),有点失望,继续来看看BasicTransactionSemantics

public abstract class BasicTransactionSemantics implements Transaction {

  private State state;
  private long initialThreadId;

  protected void doBegin() throws InterruptedException {}
  protected abstract void doPut(Event event) throws InterruptedException;
  protected abstract Event doTake() throws InterruptedException;
  protected abstract void doCommit() throws InterruptedException;
  protected abstract void doRollback() throws InterruptedException;
  protected void doClose() {}

  protected BasicTransactionSemantics() {
    state = State.NEW;
    initialThreadId = Thread.currentThread().getId();
  }

  protected void put(Event event) {
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
        "put() called from different thread than getTransaction()!");
    Preconditions.checkState(state.equals(State.OPEN),
        "put() called when transaction is %s!", state);
    Preconditions.checkArgument(event != null,
        "put() called with null event!");

    try {
      doPut(event);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new ChannelException(e.toString(), e);
    }
  }

  protected Event take() {
    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
        "take() called from different thread than getTransaction()!");
    Preconditions.checkState(state.equals(State.OPEN),
        "take() called when transaction is %s!", state);

    try {
      return doTake();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  protected State getState() {
    return state;
  }

  ...//我们这里只是讨论put和take,所以一些暂时不涉及的方法就被我干掉,有兴趣恩典朋友可以自行阅读

  protected static enum State {
    NEW, OPEN, COMPLETED, CLOSED
  }
}

又是一个抽象类,put()和take()内部调用的还是抽象方法doPut()和doTake(),看到这里,我相信没有耐心的同学已经崩溃了,但是就差最后一步了,既然是抽象类,那么最终Channel所使用的肯定是它的一个实现类,这时候我们可以回到一开始使用的MemoryChannel,到里面找找有没有线索,一看,MemoryChannel中就藏着个内部类

private class MemoryTransaction extends BasicTransactionSemantics {
    private LinkedBlockingDeque<Event> takeList;
    private LinkedBlockingDeque<Event> putList;
    private final ChannelCounter channelCounter;
    private int putByteCounter = 0;
    private int takeByteCounter = 0;

    public MemoryTransaction(int transCapacity, ChannelCounter counter) {
      putList = new LinkedBlockingDeque<Event>(transCapacity);
      takeList = new LinkedBlockingDeque<Event>(transCapacity);

      channelCounter = counter;
    }

    @Override
    protected void doPut(Event event) throws InterruptedException {
      channelCounter.incrementEventPutAttemptCount();
      int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);

      if (!putList.offer(event)) {
        throw new ChannelException(
            "Put queue for MemoryTransaction of capacity " +
            putList.size() + " full, consider committing more frequently, " +
            "increasing capacity or increasing thread count");
      }
      putByteCounter += eventByteSize;
    }

    @Override
    protected Event doTake() throws InterruptedException {
      channelCounter.incrementEventTakeAttemptCount();
      if (takeList.remainingCapacity() == 0) {
        throw new ChannelException("Take list for MemoryTransaction, capacity " +
            takeList.size() + " full, consider committing more frequently, " +
            "increasing capacity, or increasing thread count");
      }
      if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
        return null;
      }
      Event event;
      synchronized (queueLock) {
        event = queue.poll();
      }
      Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
          "signalling existence of entry");
      takeList.put(event);

      int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
      takeByteCounter += eventByteSize;

      return event;
    }

   //...依然删除暂时不需要的方法

  }

在这个类中我们可以看到doPut()和doTake()的实现方法,也明白MemoryChannel的put()和take()最终调用的是MemoryTransaction 的doPut()和doTake()。

有朋友看到这里以为这次解析就要结束了,其实好戏还在后头,Channel中还有两个重要的类ChannelProcessor和ChannelSelector,耐心地听我慢慢道来。

3 ChannelProcessor

ChannelProcessor 的作用就是执行put操作,将数据放到channel里面。每个ChannelProcessor实例都会配备一个ChannelSelector来决定event要put到那个channl当中

public class ChannelProcessor implements Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelProcessor.class);
    private final ChannelSelector selector;
    private final InterceptorChain interceptorChain;

    public ChannelProcessor(ChannelSelector selector) {
        this.selector = selector;
        this.interceptorChain = new InterceptorChain();
    }

    public void initialize() {
        this.interceptorChain.initialize();
    }

    public void close() {
        this.interceptorChain.close();
    }

    public void configure(Context context) {
        this.configureInterceptors(context);
    }

    private void configureInterceptors(Context context) {
        //配置拦截器
    }

    public ChannelSelector getSelector() {
        return this.selector;
    }

    public void processEventBatch(List<Event> events) {
        ...
        while(i$.hasNext()) {
            Event optChannel = (Event)i$.next();
            List tx = this.selector.getRequiredChannels(optChannel);

            ...//将event放到Required队列

            t1 = this.selector.getOptionalChannels(optChannel);

            Object eventQueue;
            ...//将event放到Optional队列
           
        }

        ...//event的分配操作

    }

    public void processEvent(Event event) {
        event = this.interceptorChain.intercept(event);
        if(event != null) {
            List requiredChannels = this.selector.getRequiredChannels(event);
            Iterator optionalChannels = requiredChannels.iterator();

            ...//event的分配操作

            List optionalChannels1 = this.selector.getOptionalChannels(event);
            Iterator i$1 = optionalChannels1.iterator();

            ...//event的分配操作
        }
    }
}

为了简化代码,我进行了一些删除,只保留需要讲解的部分,说白了Channel中的两个写入方法,都是需要从作为参数传入的selector中获取对应的channel来执行event的put操作。接下来我们来看看ChannelSelector

4 ChannelSelector

ChannelSelector是一个接口,我们可以通过ChannelSelectorFactory来创建它的子类,Flume提供了两个实现类MultiplexingChannelSelector和ReplicatingChannelSelector。

public interface ChannelSelector extends NamedComponent, Configurable {
    void setChannels(List<Channel> var1);

    List<Channel> getRequiredChannels(Event var1);

    List<Channel> getOptionalChannels(Event var1);

    List<Channel> getAllChannels();
}

通过ChannelSelectorFactory 的create来创建,create中调用getSelectorForType来获得一个selector,通过配置文件中的type来创建相应的子类

public class ChannelSelectorFactory {

  private static final Logger LOGGER = LoggerFactory.getLogger(
      ChannelSelectorFactory.class);

  public static ChannelSelector create(List<Channel> channels,
      Map<String, String> config) {

      ...
  }

  public static ChannelSelector create(List<Channel> channels,
      ChannelSelectorConfiguration conf) {
    String type = ChannelSelectorType.REPLICATING.toString();
    if (conf != null) {
      type = conf.getType();
    }
    ChannelSelector selector = getSelectorForType(type);
    selector.setChannels(channels);
    Configurables.configure(selector, conf);
    return selector;
  }

  private static ChannelSelector getSelectorForType(String type) {
    if (type == null || type.trim().length() == 0) {
      return new ReplicatingChannelSelector();
    }

    String selectorClassName = type;
    ChannelSelectorType  selectorType = ChannelSelectorType.OTHER;

    try {
      selectorType = ChannelSelectorType.valueOf(type.toUpperCase(Locale.ENGLISH));
    } catch (IllegalArgumentException ex) {
      LOGGER.debug("Selector type {} is a custom type", type);
    }

    if (!selectorType.equals(ChannelSelectorType.OTHER)) {
      selectorClassName = selectorType.getChannelSelectorClassName();
    }

    ChannelSelector selector = null;

    try {
      @SuppressWarnings("unchecked")
      Class<? extends ChannelSelector> selectorClass =
          (Class<? extends ChannelSelector>) Class.forName(selectorClassName);
      selector = selectorClass.newInstance();
    } catch (Exception ex) {
      throw new FlumeException("Unable to load selector type: " + type
          + ", class: " + selectorClassName, ex);
    }

    return selector;
  }

}

对于这两种Selector简单说一下:

1)MultiplexingChannelSelector
下面是一个channel selector 配置文件

agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

MultiplexingChannelSelector类中定义了三个属性,用于存储不同类型的channel

    private Map<String, List<Channel>> channelMapping;
    private Map<String, List<Channel>> optionalChannels;
    private List<Channel> defaultChannels;

那么具体分配原则如下:

  • 如果设置了maping,那么会event肯定会给指定的channel,如果同时设置了optional,也会发送给optionalchannel
  • 如果没有设置maping,设置default,那么event会发送给defaultchannel,如果还同时设置了optional,那么也会发送给optionalchannel
  • 如果maping和default都没指定,如果有指定option,那么会发送给optionalchannel,但是发送给optionalchannel不会进行失败重试

2)ReplicatingChannelSelector

分配原则比较简单

  • 如果是replicating的话,那么如果没有指定optional,那么全部channel都有,如果某个channel指定为option的话,那么就要从requiredChannel移除,只发送给optionalchannel

5 总结:

作为一个承上启下的组件,Channel的作用就是将source来的数据通过自己流向sink,那么ChannelProcessor就起到将event put到分配好的channel中,而分配的规则是由selector决定的,flume提供的selector有multiplexing和replicating两种。所以ChannelProcessor一般都是在Source中被调用。那么Channel的take()肯定是在Sink中调用的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352

推荐阅读更多精彩内容