Flume使用两个独立的事务分别负责从soucrce到channel,以及从channel到sink的event传递。一旦事务中所有的event全部传递到channel且提交成功,那么source就将该文件标记为完成。同理,事务以类似的方式处理从channel到sink的传递过程,如果因为某种原因使得event无法记录,那么事务将会回滚,且所有的event都会保持到channel中,等待重新传递。
Flume的事务机制保证了source产生的每个event都会传送到sink中(如果失败会无限重试),flume采用的是At-least-once的提交方式,这样就造成每个source产生的event至少到达sink一次,这种方式保证了数据的可靠性,但数据可能重复。
Transaction接口定义如下:
public void begin();
public void commit();
public void rollback();
public void close();
以MemoryTransaction介绍介绍下事务机制:
MemoryTransaction是MemoryChannel中的一个内部类,内部有2个阻塞队列putList和takeList,MemoryChannel内部有个queue阻塞队列。putList接收Source交给Channel的event数据,takeList保存Channel交给Sink的event数据。
- 如果Source交给Channel任务完成,进行commit时,会把putList中的所有event放到MemoryChannel中的queue。
- 如果Source交给Channel任务失败,进行rollback时,程序就不会继续走下去,比如KafkaSource需要commitOffsets,如果任务失败就不会commitOffsets。
- 如果Sink处理完Channel带来的event,进行commit的时,会清空takeList中的event数据,因为已经没consume。
- 如果Sink处理Channel带来的event失败,进行rollback的时,会把takeList中的event写回到queue中。
commit的关键代码:
@Override
protected void doCommit() throws InterruptedException {
int puts = putList.size();
int takes = takeList.size();
synchronized(queueLock) {
if(puts > 0 ) {
// 清空putList,丢到外部类MemoryChannel中的queue队列里
while(!putList.isEmpty()) {
// MemoryChannel中的queue队列
if(!queue.offer(putList.removeFirst())) {
throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
}
}
}
putList.clear();
takeList.clear();
}
}
rollback的关键代码
@Override
protected void doRollback() {
int takes = takeList.size();
synchronized(queueLock) {
Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +
"queue to rollback takes. This should never happen, please report");
// 把takeList中的数据放回到queue中
while(!takeList.isEmpty()) {
queue.addFirst(takeList.removeLast());
}
putList.clear();
}
}