RecordAccumulator.java

public final class RecordAccumulator {

    // Sender线程准备关闭
    private volatile boolean closed;
    // 批次大小
    private final int batchSize;
    // 缓存了发往对应topicPartition的N个消息批次
    // Deque是非线程安全,所以出入队列需要加锁
    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
    private final BufferPool free;
    // 未发送完成的批次集合,底层通过Set<RecordBatch>实现
    private final IncompleteRecordBatches incomplete;
    // 使用drain方法批量导出批次,为了防止饥饿,使用drainIndex记录上次发送停止时的位置,下次发送继续从该位置开始
    private int drainIndex;

    // 发送追加消息
    public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // 通过topic-partition查找map里指定的消息队列
        Deque<RecordBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            // 尝试往Dequeue的最后一个批次里追加消息
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null)
                // 追加成功返回
                return appendResult;
        }

        // 追加失败, 尝试申请缓存
        // 注意这里走出synchronized代码块,释放dq锁了,可能堵塞在申请缓存空间上,把发送消息让给其他需要更少空间的发送消息线程
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null) {
                // appendResult不为null说明第2次重试tryAppend成功了
                // 说明在synchronized释放申请缓存的时候,有其他线程释放了空间
                // 所以这里要把申请的缓存还回去
                free.deallocate(buffer);
                return appendResult;
            }
            // 申请到新空间后创建一个新批次
            // 之前从free那申请的缓存分配给MemoryRecords
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

            // 新建的batch加到Dequeue和incomplete里
            dq.addLast(batch);
            incomplete.add(batch);
            // 返回result,return后面会根据条件判断是否要唤醒Sender线程
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        }
    }

    // 获取集群中符合发送消息条件的节点集合
    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        Set<Node> readyNodes = new HashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        boolean unknownLeadersExist = false;

        // waiter队列里有成员在排队,说明有其他线程在等待BufferPool释放空间,需要发送消息释放空间
        boolean exhausted = this.free.queued() > 0;
        // 遍历每个topicPartition
        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();
            Deque<RecordBatch> deque = entry.getValue();

            // 获取parition的leader节点
            Node leader = cluster.leaderFor(part);
            if (leader == null) {
                // 标记需要更新Metadata
                unknownLeadersExist = true;
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                synchronized (deque) {
                    // 加锁从队列里取出第一个批次
                    RecordBatch batch = deque.peekFirst();
                    if (batch != null) {
                        boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                        long waitedTimeMs = nowMs - batch.lastAttemptMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // Dequeue中有多个批次或者第一个批次已经满了
                        // 如果缓存已用容量position超过writeLimit,表示批次已满
                        boolean full = deque.size() > 1 || batch.records.isFull();
                        // 超时
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        // flushInProgress有线程正在等待flush操作完成
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            // 满足符合发送消息的节点
                            readyNodes.add(leader);
                        } else {
                            // 下次调用ready方法的间隔, 本质是控制selector.select()堵塞时参考的等待时长要素之一
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }

        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
    }

    // 将partition-批次集合 转成 节点-批次集合 映射
     public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {

        Map<Integer, List<RecordBatch>> batches = new HashMap<>();
        // 遍历需要发送消息的节点
        for (Node node : nodes) {
            int size = 0;
            // node上的partition集合
            List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
            // 待发送消息批次的集合
            List<RecordBatch> ready = new ArrayList<>();
            // drainIndex就是断电续传的书签
            int start = drainIndex = drainIndex % parts.size();
            do {
                PartitionInfo part = parts.get(drainIndex);
                TopicPartition tp = new TopicPartition(part.topic(), part.partition());
                if (!muted.contains(tp)) {
                    // 获取该partion的消息批次集合
                    Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                    if (deque != null) {
                        synchronized (deque) {
                            // 获取第一条消息批次
                            RecordBatch first = deque.peekFirst();
                            if (first != null) {
                                boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                                if (!backoff) {
                                    if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                        // 要发送的集合大小超过配置的单个请求的maxSize,跳出循环
                                        // 因为外层循环是node节点,所以单个请求只是node粒度的
                                        break;
                                    } else {
                                        // 每个node的partition只取一个消息批次,后面会drainIndex+1寻找下一个partition
                                        // 防止只有一个partition在发送,其他partition处于饥饿状态
                                        RecordBatch batch = deque.pollFirst();
                                        // 关闭输出流
                                        batch.records.close();
                                        size += batch.records.sizeInBytes();
                                        ready.add(batch);
                                        batch.drainedMs = now;
                                    }
                                }
                            }
                        }
                    }
                }
                this.drainIndex = (this.drainIndex + 1) % parts.size();
            } while (start != drainIndex);
            batches.put(node.id(), ready);
        }
        return batches;
    }
}

public final class BufferPool {
    // 整个pool的大小
    private final long totalMemory;
    // 指定free队列里每个byteBuffer的大小,等于batchSize
    private final int poolableSize;
    // 控制多线程并发分配和回收ByteBuffer
    private final ReentrantLock lock;
    // 缓存byteBuffer的队列,缓冲池
    // 池化队列free,避免ByteBuffer.allocate创建开销
    private final Deque<ByteBuffer> free;
    // 记录因申请不到足够的空间而阻塞的线程,实际记录阻塞线程对应的Condition对象
    private final Deque<Condition> waiters;
    // totalMemory - free
    private long availableMemory;

    // 追加消息时候会调用
    // 从缓冲池free里申请ByteBuffer,缓冲池空间不足就会阻塞调用线程
    // size是申请空间大小,maxTimeToBlockMs申请最大等待时间
    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        // 加锁
        this.lock.lock();
        try {
            if (size == poolableSize && !this.free.isEmpty())
                // 申请大小正好等于一个ByteBuffer对象的大小,直接从队列里取出
                // 所以消息的长度不要超过batchSize
                return this.free.pollFirst();

            int freeListSize = this.free.size() * this.poolableSize;
            // 缓冲池+剩余可用足够申请
            if (this.availableMemory + freeListSize >= size) {
                // 先从free里一块块划走ByteBuffer缓存到availableMemory,直到满足size
                freeUp(size);
                // 然后availableMemory里扣除size部分的缓存
                this.availableMemory -= size;
                lock.unlock();
                return ByteBuffer.allocate(size);
            } else {
                int accumulated = 0;
                ByteBuffer buffer = null;
                // Condition阻塞等待有足够的缓存
                Condition moreMemory = this.lock.newCondition();
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                // 加入waiters等待队列
                this.waiters.addLast(moreMemory);
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    // 阻塞Condition等待
                    boolean waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);

                    if (waitingTimeElapsed) {
                        // 表示等待超时,抛出异常结束
                        this.waiters.remove(moreMemory);
                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                    }

                    // 可能有空间申请,但还不够size,所以减去时间后继续等待
                    remainingTimeToBlockNs -= timeNs;
                    // 正好size是一个ByteBuffer
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        buffer = this.free.pollFirst();
                        accumulated = size;
                    } else {
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.availableMemory);
                        this.availableMemory -= got;
                        accumulated += got;
                    }
                }

                // 等待队列是先进先出的
                Condition removed = this.waiters.removeFirst();
                // 要是还有剩余空间唤醒下一个阻塞等待的其他线程
                if (this.availableMemory > 0 || !this.free.isEmpty()) {
                    if (!this.waiters.isEmpty())
                        this.waiters.peekFirst().signal();
                }

                lock.unlock();
                if (buffer == null)
                    return ByteBuffer.allocate(size);
                else
                    return buffer;
            }
        } finally {
            if (lock.isHeldByCurrentThread())
                lock.unlock();
        }
    }

    // 释放缓存归还
    public void deallocate(ByteBuffer buffer, int size) {
        lock.lock();
        try {
            // 正好等于单个ByteBuffer就放回free队列
            if (size == this.poolableSize && size == buffer.capacity()) {
                buffer.clear();
                this.free.add(buffer);
            } else {
                // 不等于就加到availableMemory,不放回free队列?
                this.availableMemory += size;
            }
            Condition moreMem = this.waiters.peekFirst();
            if (moreMem != null)
                // 唤醒正在堵塞等待可用空间的线程
                moreMem.signal();
        } finally {
            lock.unlock();
        }
    }
}

public final class RecordBatch {
    // 消息个数
    public int recordCount = 0;
    // 最大的消息的字节数
    public int maxRecordSize = 0;
    // 尝试发送当前批次的次数
    public volatile int attempts = 0;
    // 最后一次尝试发送的时间戳
    public long lastAttemptMs;
    // 消息最终存放的地方
    public final MemoryRecords records;
    // 当前批次发送给哪个topic的partition
    public final TopicPartition topicPartition;
    // 标志RecordBatch状态的Future对象
    public final ProduceRequestResult produceFuture;
    // 最后一次往批次里追加消息的时间戳
    public long lastAppendTime;
    private final List<Thunk> thunks;
    // 记录某消息在批次中的offset
    private long offsetCounter = 0L;
    // 是否正在重试
    private boolean retry;

    // 追加消息
    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
        if (!this.records.hasRoomFor(key, value)) {
            // buffer没有空间返回null
            return null;
        } else {
            // 委托给MemoryRecords追加消息到缓存
            long checksum = this.records.append(offsetCounter++, timestamp, key, value);
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            this.lastAppendTime = now;
            // 通过produceFuture的latchDown实现异步future的特性
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length);
            if (callback != null)
                // thunks就是批次里所有消息callback的队列集合
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }

    // 当批次成功收到正常响应、或超时、或关闭producer时,调用done方法
    public void done(long baseOffset, long timestamp, RuntimeException exception) {
        // 回调所有消息的callback
        for (int i = 0; i < this.thunks.size(); i++) {
            Thunk thunk = this.thunks.get(i);
            if (exception == null) {
                // thunk.future即服务端返回的结果,封装成RecordMetadata
                RecordMetadata metadata = new RecordMetadata(this.topicPartition,  baseOffset, thunk.future.relativeOffset(),
                                                             timestamp == Record.NO_TIMESTAMP ? thunk.future.timestamp() : timestamp,
                                                             thunk.future.checksum(),
                                                             thunk.future.serializedKeySize(),
                                                             thunk.future.serializedValueSize());
                thunk.callback.onCompletion(metadata, null);
            } else {
                thunk.callback.onCompletion(null, exception);
            }
        }
        // 这里会调用latchDown.donw()
        this.produceFuture.done(topicPartition, baseOffset, exception);
    }

    final private static class Thunk {
        // 对应消息的callback
        final Callback callback;
        final FutureRecordMetadata future;
    }
}

// 类名字面意思就是包含RecordMetadata的Future对象
public final class FutureRecordMetadata implements Future<RecordMetadata> {

    // 类似指针,指向消息所在批次RecordBatch的produceFuture
    private final ProduceRequestResult result;
    // 消息在批次中的offset
    private final long relativeOffset;

    // 实现future接口方法
    @Override
    public RecordMetadata get() throws InterruptedException, ExecutionException {
        // 本质是委托produceFuture的latchDown.await()
        this.result.await();
        return valueOrError();
    }

    RecordMetadata valueOrError() throws ExecutionException {
        if (this.result.error() != null)
            throw new ExecutionException(this.result.error());
        else
            return value();
    }

    // 当producer收到消息批次的响应时,get方法返回RecordMetadata对象,即消息的元数据
    RecordMetadata value() {
        return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset,
                                  this.timestamp, this.checksum, this.serializedKeySize, this.serializedValueSize);
    }

    // 在sender处理服务端返回的响应,回调callback时会被调用
    public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
        this.topicPartition = topicPartition;
        this.baseOffset = baseOffset;
        this.error = error;
        // 通知future有结果了
        this.latch.countDown();
    }
}

public final class ProduceRequestResult {
    // 没有实现Future接口,通过CountDownLatch实现了类似Future的功能
    private final CountDownLatch latch = new CountDownLatch(1);
    // 服务端为批次RecordBatch中第一条消息分配的offset
    // 这个批次下每个消息根据baseOffset能够计算出自己再服务端分区中的偏移量
    private volatile long baseOffset = -1L;
    private volatile RuntimeException error;

    // RecordBatch会调用该方法,通知消息被正常响应
    public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
        this.topicPartition = topicPartition;
        this.baseOffset = baseOffset;
        // 区分消息是正常响应还是异常响应
        this.error = error;
        // 唤醒阻塞在latch.await()的thread
        this.latch.countDown();
    }
}

public class MemoryRecords implements Records {

    // 压缩后的消息输出到buffer(压缩场景主要是瓶颈在网络带宽)
    private final Compressor compressor;
    // buffer最多可以写入多少个字节的数据
    private final int writeLimit;
    // 保存消息数据
    private ByteBuffer buffer;
    // 在MemoryRecords发送前会设置成false-只读
    private boolean writable;

    // 私有构造方法
    private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int writeLimit) {
        ...
    }

    // 只能通过emptyRecords创建MemoryRecords实例
    public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type, int writeLimit) {
        return new MemoryRecords(buffer, type, true, writeLimit);
    }

    // 追加消息
    public void append(long offset, Record record) {
        // 判断是否只读
        if (!writable)
            throw new IllegalStateException("Memory records is not writable");

        // 调用compressor将消息写入ByteBuffer
        int size = record.size();
        compressor.putLong(offset);
        compressor.putInt(size);
        compressor.put(record.buffer());
        compressor.recordWritten(size + Records.LOG_OVERHEAD);
        record.buffer().rewind();
    }

    // 估算MemoryRecords剩余空间是否足够写入消息,主要是压缩后的消息
    // 如果估算不准,会导致底层ByteBuffer扩容
    public boolean hasRoomFor(byte[] key, byte[] value) {
        if (!this.writable)
            return false;

        return this.compressor.numRecordsWritten() == 0 ?
            this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
            this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
    }

}

public class Compressor {
    // 添加了压缩功能
    private final DataOutputStream appendStream;
    // 封装了ByteBuffer,当写入数据超出ByteBuffer容量时会ByteBufferOutputStream会自动扩容
    private final ByteBufferOutputStream bufferStream;

    public Compressor(ByteBuffer buffer, CompressionType type) {
        // KafkaProducer传入的压缩类型
        this.type = type;
        // create the stream
        bufferStream = new ByteBufferOutputStream(buffer);
        // 根据压缩类型创建合适的压缩流,默认是NONE不压缩
        // 装饰器模式,在ByteBufferOutputStream上装饰一层DataOutputStream
        // 最终压缩流: compressor.put -> appendStream -> bufferStream -> buffer
        // 最后数据都写到MemoryRecords的buffer缓存里
        appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
    }

    // 构造压缩流
    public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
        switch (type) {
            case NONE:
                return new DataOutputStream(buffer);
            case GZIP:
                // GZIPOutputStream是JDK自带的,不需要反射
                return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
            case SNAPPY:
                // snappy需要引入额外的依赖包,缺省不适用snappy压缩的时候,不引入依赖包,为了编译通过这里就用反射动态创建
                OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                return new DataOutputStream(stream);
            case LZ4:
                OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
                return new DataOutputStream(stream);
        }
    }

    // 写消息到缓存里,实质调用Record消息静态类的write放大
    private void putRecord(final long crc, final byte attributes, final long timestamp, final byte[] key, final byte[] value, final int valueOffset, final int valueSize) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        Record.write(this, crc, attributes, timestamp, key, value, valueOffset, valueSize);
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354

推荐阅读更多精彩内容