目录
- 简单使用示例
- kafka生产者总体架构
- 配置模块
- 拦截器模块
- 序列化模块
- 分区模块
- RecordAccumulator模块
- Sender发送模块
- kafka生产者配置对应源码部分
- 设计模式学习
简单使用示例
public class Test {
private static String topicName;
private static int msgNum;
public static void main(String[] args) {
// 配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
/*props.put("interceptor.classes", "kafka.productor.ProducerInterceptorDemo,kafka.productor.ProducerInterceptorDemo");*/
topicName = "test1";
// 发送的消息数
msgNum = 10;
// KafkaProducer线程安全类
Producer<String, String> producer = new KafkaProducer<>(props);
//发送带有回调的消息
for (int i = 0; i < msgNum; i++) {
String msg = i + " This is seeger's msg." + System.currentTimeMillis();
producer.send( new ProducerRecord<>(topicName, msg), (metadata, exception) -> {
if (exception != null) {
System.out.println("进行异常处理" + exception.getMessage());
} else {
System.out.printf("topic=%s, partition=%d, offset=%s \n",
metadata.topic(), metadata.partition(), metadata.offset());
}
});
}
producer.close();
}
}
kafka生产者总体架构
-
盗用《apache Kafka源码剖析》的一张图
- 从图中可以学习到:
- 使用KafkaProducer发送消息时只是发送到RecordAccumulator缓存保存。
- 真正将消息发送到kafka服务器的是Send线程,这里相当于nio的selector。
- 利用RecordAccumulator中缓存队列 + send线程来解决主线程发送请求跟实际发送请求到服务器的速率不一致问题,以达到提高效率。也完成了主线程发送请求与实际发送请求到服务器的解耦。相同的做法在- 2000万条数据迁移从几天到几个小时这篇文章也有体现。
- 各个图标含义
- 首先获取配置信息根据配置不同1-11会有不同表现
- 拦截器拦截
- 序列化Key Value
- 选择合适的分区
- RecordAccumulator收集消息批量发送
- send线程从RecordAccumulator获取消息
- 构造ClientRequest
- 将ClientRequest交给NetWorkClient
- NetWorkClient将请求放入KafkaChannel
- 执行真正的网络io
- 收到响应调用ClentRequest回调函数
- 调用RecordBatch回调函数,最终调用每个消息上注册的回调函数。
配置模块
-
参考kafka手动写一个配置模块,时序图
main方法
public class Test {
public static void main(String[] args) {
Properties props = new Properties();
props.put(MockProducerConfig.RETRIES_CONFIG, "2");
props.put(MockProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9091");
props.put("key.serializer", "java.util.HashMap");
MockKafkaProducer mockKafkaProducer = new MockKafkaProducer(props);
}
}
// 测试用,所以简单写
public class MockKafkaProducer {
private final Integer retries;
private final List<String> addresses;
//测试用所以这么写
private final HashMap keySerializer;
public MockKafkaProducer(Properties properties) {
this(new MockProducerConfig(properties));
}
@SuppressWarnings("unchecked")
private MockKafkaProducer(MockProducerConfig config) {
this.retries = config.getInt(MockProducerConfig.RETRIES_CONFIG);
System.out.println("配置已生效retries: " + retries);
addresses = config.getList(MockProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
addresses.stream().forEach((s -> System.out.println("配置已生效addresses :" + s)));
this.keySerializer = config.getConfiguredInstance(MockProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
HashMap.class);
System.out.println("配置已生效keySerializer: " + keySerializer.toString());
}
}
import java.util.Map;
import kafka.productor.config.learn.MockConfigDef.MockType;
/**
* kafka 生产者配置
*/
public class MockProducerConfig extends MockAbstractConfig {
private static final MockConfigDef CONFIG;
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
public static final String RETRIES_CONFIG = "retries";
public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
static {
CONFIG = new MockConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, MockType.LIST, null)
.define(RETRIES_CONFIG, MockType.INT, 0)
.define(KEY_SERIALIZER_CLASS_CONFIG, MockType.CLASS, null);
}
public MockProducerConfig(Map<?, ?> props) {
super(CONFIG, props);
}
}
public class MockAbstractConfig {
/**
* 用户原始配置
*/
private final Map<String, ?> originals;
/**
* 解析后配置
*/
private final Map<String, Object> values;
@SuppressWarnings("unchecked")
public MockAbstractConfig(MockConfigDef definition, Map<?, ?> originals) {
// 校验key是否都属于String
Optional keyString = originals.keySet().stream().filter(key -> !(key instanceof String)).findFirst();
if (keyString.isPresent()) {
throw new MockConfigException(keyString.get().toString(), originals.get(keyString.get()), "Key must be a string.");
}
this.originals = (Map<String, ?>) originals;
this.values = definition.parse(this.originals);
}
public Integer getInt(String key) {
return (Integer) get(key);
}
@SuppressWarnings("unchecked")
public List<String> getList(String key) {
return (List<String>) get(key);
}
protected Object get(String key) {
if (!values.containsKey(key)) {
throw new MockConfigException(String.format("Unknown configuration '%s'", key));
}
return values.get(key);
}
/**
* 利用反射实例化,很好的设计值得借鉴,key.serializer, value.serializer都直接复用这个接口了
*/
public <T> T getConfiguredInstance(String key, Class<T> t) {
Class<?> c = getClass(key);
if (c == null) {
return null;
}
Object o = Utils.newInstance(c);
if (!t.isInstance(o)) {
throw new MockKafkaException(c.getName() + " is not an instance of " + t.getName());
}
return t.cast(o);
}
public Class<?> getClass(String key) {
return (Class<?>) get(key);
}
}
public class MockConfigDef {
public static final String NO_DEFAULT_VALUE = "";
private final Map<String, MockConfigKey> configKeys = new HashMap<>();
public static class MockConfigKey {
public final String name;
public final MockType type;
public final Object defaultValue;
public MockConfigKey(String name, MockType type, Object defaultValue) {
this.name = name;
this.type = type;
this.defaultValue = defaultValue;
}
public boolean hasDefault() {
return this.defaultValue != NO_DEFAULT_VALUE;
}
}
/**
* 配置类型
*/
public enum MockType {
BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD
}
public MockConfigDef define(String name, MockType type, Object defaultValue) {
if (configKeys.containsKey(name)) {
throw new MockConfigException("Configuration " + name + " is defined twice.");
}
Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
configKeys.put(name, new MockConfigKey(name, type, parsedDefault));
return this;
}
public Map<String, Object> parse(Map<?, ?> props) {
Map<String, Object> values = new HashMap<>();
for (MockConfigKey key : configKeys.values()) {
Object value;
// props map contains setting - assign ConfigKey value
if (props.containsKey(key.name)) {
value = parseType(key.name, props.get(key.name), key.type);
// props map doesn't contain setting, the key is required because no default value specified - its an error
} else if (key.defaultValue == NO_DEFAULT_VALUE) {
throw new MockConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
} else {
// otherwise assign setting its default value
value = key.defaultValue;
}
// 当然还需要校验传入参数有效性,这里省略
values.put(key.name, value);
}
return values;
}
private Object parseType(String name, Object value, MockType type) {
try {
if (value == null) {
return null;
}
String trimmed = null;
if (value instanceof String) {
trimmed = ((String) value).trim();
}
switch (type) {
case STRING:
if (value instanceof String) {
return trimmed;
} else {
throw new MockConfigException(name, value, "Expected value to be a string, but it was a " + value.getClass().getName());
}
case INT:
if (value instanceof Integer) {
return (Integer) value;
} else if (value instanceof String) {
return Integer.parseInt(trimmed);
} else {
throw new MockConfigException(name, value, "Expected value to be an number.");
}
case LIST:
if (value instanceof List) {
return (List<?>) value;
} else if (value instanceof String) {
if (trimmed.isEmpty()) {
return Collections.emptyList();
} else {
return Arrays.asList(trimmed.split("\\s*,\\s*", -1));
}
} else {
throw new MockConfigException(name, value, "Expected a comma separated list.");
}
case CLASS:
if (value instanceof Class) {
return (Class<?>) value;
} else if (value instanceof String) {
return Class.forName(trimmed, true, MockUtils.getContextOrMockKafkaClassLoader());
} else {
throw new MockConfigException(name, value, "Expected a Class instance or class name.");
}
default:
throw new IllegalStateException("Unknown type.");
}
} catch (NumberFormatException e) {
throw new MockConfigException(name, value, "Not a number of type " + type);
} catch (ClassNotFoundException e) {
throw new MockConfigException(name, value, "Class " + value + " could not be found.");
}
}
}
public class MockConfigException extends MockKafkaException {
private static final long serialVersionUID = 1L;
public MockConfigException(String message) {
super(message);
}
public MockConfigException(String name, Object value) {
this(name, value, null);
}
public MockConfigException(String name, Object value, String message) {
super("Invalid value " + value + " for configuration " + name + (message == null ? "" : ": " + message));
}
}
public class MockKafkaException extends RuntimeException {
private final static long serialVersionUID = 1L;
public MockKafkaException(String message, Throwable cause) {
super(message, cause);
}
public MockKafkaException(String message) {
super(message);
}
public MockKafkaException(Throwable cause) {
super(cause);
}
public MockKafkaException() {
super();
}
}
public class MockUtils {
public static ClassLoader getContextOrMockKafkaClassLoader() {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
return cl == null ? getMockKafkaClassLoader() : cl;
}
public static ClassLoader getMockKafkaClassLoader() {
return MockUtils.class.getClassLoader();
}
public static <T> T newInstance(Class<T> c) {
try {
return c.newInstance();
} catch (IllegalAccessException e) {
throw new MockKafkaException("Could not instantiate class " + c.getName(), e);
} catch (InstantiationException e) {
throw new MockKafkaException("Could not instantiate class " + c.getName() + " Does it have a public no-argument constructor?", e);
} catch (NullPointerException e) {
throw new MockKafkaException("Requested class was null", e);
}
}
}
拦截器模块
- todo 图
- 在配置里面添加interceptor.classes,可加入自定义实现ProducerInterceptor的拦截器
- 配置晚后初始化KafKaProducer时,会自动帮初始化interceptors属性,改属性是ProducerInterceptors类,持有我们拓展的ProducerInterceptor集合
- 发送时调用拦截器
序列化模块
- todo图
- 在配置里面可以指定key, value指定的序列化实现类,也可以自定义
- 通过不同配置,调用不同实现,相当于是策略模式,由客户端指定序列化方式
分区模块
- 优先根据指定partation序号分区
- 如果没指定那么,Partitioner有默认实现类DefaultPartitioner, 如果消息没有key会根据count与分区个数取模来决定分配到哪个分区,count(AtomicInteger类型保证线程安全)会不断自增。如果存在key会对key进行hash,使用murmur底碰撞Hash算法,然后与分区数取模来达到负载均衡。根据分区数来的,所以分区数一定确定不能改变,否则hash会出现不一致
-
小技巧,使用接口来达到配置项与对外提供的配置项提供统一的参数
RecordAccumulator模块
总览之并发安全设计
- 队列对象因为读多写少所以用cow,key是TopPartition,所以写少。
- kafka同步发送如何实现,同步发送在send()结果加个get()即可实现,本质上同步是调用FutureRecordMetadata.get方法, 实际是使用CountDownLatch实现。
- NIO需要有Buffer,而创建和销毁Buff比较耗时,所以Kafka弄了个个BufferPool,BufferPool线程安全是仿造AQS实现
- 有些不可变类的设计保证线程安全
- append大体流程(这里只是提取关键信息,详细可看后文)
5.1 步骤2 加锁,以实现非线程安全队列的插入
5.2 步骤5 追加失败 则向Buffer对象池中申请对象,Buffer是线程安全的设计
5.3 向Buffer对象池申请到对象后,再加锁,重试插入必须要重试,否则有可能产生内存碎片。
数据结构
- batches: 队列,cow类型
- batchSize: 队列中RecordBatch底层ByteBuffer大小
- compression:压缩类型
- incomplete:未发送完成的RecordBatch集合
- free: BufferPool对象
- KafkaProducer的send方法最终会调用RecordAccumulator.append方法添加到RecordAccumulator中。
前置知识之队列对象
- RecordAccumulator持有一个队列对象,这个队列对象读多写少,所以用了COW, 而不是用ConcurrentHashMap。TopicPartition 作为Map的key被设计成不可变类,不可变用来put再查找才没问题,否则设计成可变可能查找不到。重写equals一定要重写hashcode 。否则两对象相同如果hashcode不同,put map里面的时候可能存在放到不同桶里面,这就违反了单一数据原则。hashcode相同equals可以不同,map里面有体现。
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
public final class TopicPartition implements Serializable {
private int hash = 0;
private final int partition;
private final String topic;
public TopicPartition(String topic, int partition) {
this.partition = partition;
this.topic = topic;
}
public int partition() {
return partition;
}
public String topic() {
return topic;
}
@Override
public int hashCode() {
if (hash != 0)
return hash;
final int prime = 31;
int result = 1;
result = prime * result + partition;
result = prime * result + ((topic == null) ? 0 : topic.hashCode());
this.hash = result;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
TopicPartition other = (TopicPartition) obj;
if (partition != other.partition)
return false;
if (topic == null) {
if (other.topic != null)
return false;
} else if (!topic.equals(other.topic))
return false;
return true;
}
@Override
public String toString() {
return topic + "-" + partition;
}
}
前置知识之MemoryRecords
- 队列对象的RecordBatch持有MemoryRecords对象引用,这里是真是保存数据的地方,
MemoryRecords持有Compressor对象,用来进行压缩处理。Compressor对象有几个比较重要的属性。如图装饰器模式典型应用展示,bufferStream通过添加自动扩容给ByteBuffer 添加功能,appendStream通过添加压缩功能对bufferStream进行功能添加。往recordBatch插入数据时会判断MemoryRecords引用的ByteBuffer是否还可插入,可以的话插入,不可以就new一个新的recordBatch
MemoryRecords:
private final Compressor compressor;
private ByteBuffer buffer;
Compressor:
private final CompressionType type;
private final DataOutputStream appendStream;
private final ByteBufferOutputStream bufferStream;
- 看下Compressor如何对ByteBuffer实现装饰
public Compressor(ByteBuffer buffer, CompressionType type) {
// 通过kafka配置设置的压缩类型
this.type = type;
appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
}
static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
try {
switch (type) {
case NONE:
return new DataOutputStream(buffer);
// 装饰器模式GZIPOutputStream jdk自带
case GZIP:
return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
// snappy压缩方式,可以使用反射获取,这种方式在不额外依赖jar包的情况下,可以不用额外依赖
case SNAPPY:
try {
OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
return new DataOutputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
}
case LZ4:
try {
OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
return new DataOutputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
}
default:
throw new IllegalArgumentException("Unknown compression type: " + type);
}
} catch (IOException e) {
throw new KafkaException(e);
}
}
前置知识之RecordBatch
- kafka同步发送如何实现,同步发送在send()结果加个get()即可实现,也就是说同步是调用FutureRecordMetadata.get方法, 实际是使用CountDownLatch。
FutureRecordMetadata:
private final ProduceRequestResult result;
public RecordMetadata get() throws InterruptedException, ExecutionException {
this.result.await();
return valueOrError();
}
ProduceRequestResult:
private final CountDownLatch latch = new CountDownLatch(1);
public void await() throws InterruptedException {
latch.await();
}
-
插入记录的RecordBatch和插入后返回的FutureRecordMetadata共同持有同一个ProduceRequestResult, 实现的手段也比较简单,就是讲返回 FutureRecordMetadata调用插入的RecordBatch中方法,再将RecordBatch对应属性回传给FutureRecordMetadata。
- 当RecordBatch发送完成时会调用ProduceRequestResult的done方法。
public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
this.topicPartition = topicPartition;
this.baseOffset = baseOffset;
this.error = error;
// 这里实现了,get同步的结束
this.latch.countDown();
}
前置知识之BufferPool
-
ByteBuffer分配和释放比较耗资源,Kafka实现了一个缓存池BufferPool。
- 主要看下分配和释放资源的代码,分配资源:
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
//加锁同步
this.lock.lock();
try {
// 请求的是指定大小而且free有空闲的则直接从空闲队列返回
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
// free队列都是固定poolableSize大小的
int freeListSize = this.free.size() * this.poolableSize;
if (this.availableMemory + freeListSize >= size) {
// 为了让availableMemory > size,freeUp方法会从free队列不断释放ByteBuff
freeUp(size);
this.availableMemory -= size;
lock.unlock();
// new一个Heap的ByteBuff
return ByteBuffer.allocate(size);
} else {
// 没有足够空间只能阻塞
int accumulated = 0;
ByteBuffer buffer = null;
Condition moreMemory = this.lock.newCondition();
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
//放到等待队列
this.waiters.addLast(moreMemory);
// 循环等待
while (accumulated < size) {
long startWaitNs = time.nanoseconds();
long timeNs;
boolean waitingTimeElapsed;
try {
// 阻塞等待, await会释放锁
waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
// 异常则移除阻塞队列
this.waiters.remove(moreMemory);
throw e;
} finally {
// 时间统计
long endWaitNs = time.nanoseconds();
timeNs = Math.max(0L, endWaitNs - startWaitNs);
this.waitTime.record(timeNs, time.milliseconds());
}
// 超时报错
if (waitingTimeElapsed) {
this.waiters.remove(moreMemory);
throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
}
remainingTimeToBlockNs -= timeNs;
// 有空闲了
if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
// just grab a buffer from the free list
buffer = this.free.pollFirst();
accumulated = size;
} else {
// 先分配一部分等更多的
freeUp(size - accumulated);
int got = (int) Math.min(size - accumulated, this.availableMemory);
this.availableMemory -= got;
accumulated += got;
}
}
// 移除等待队列
Condition removed = this.waiters.removeFirst();
if (removed != moreMemory)
throw new IllegalStateException("Wrong condition: this shouldn't happen.");
// 还是有空闲唤醒下一个线程
if (this.availableMemory > 0 || !this.free.isEmpty()) {
if (!this.waiters.isEmpty())
this.waiters.peekFirst().signal();
}
// 解锁
lock.unlock();
if (buffer == null)
return ByteBuffer.allocate(size);
else
return buffer;
}
} finally {
if (lock.isHeldByCurrentThread())
lock.unlock();
}
}
- 看下释放空间的
public void deallocate(ByteBuffer buffer, int size) {
lock.lock();
try {
// 释放的是poolableSize大小的,则放入free队列管理
if (size == this.poolableSize && size == buffer.capacity()) {
buffer.clear();
this.free.add(buffer);
} else {
//释放的不是poolableSize大小,仅仅修改availableMemory的值
this.availableMemory += size;
}
// 唤醒一个因空间不足的线程
Condition moreMem = this.waiters.peekFirst();
if (moreMem != null)
moreMem.signal();
} finally {
lock.unlock();
}
}
append实现逻辑
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// 统计正在向RecordAccmulator中追加的线程数
appendsInProgress.incrementAndGet();
try {
// 步骤1 查找TopPartition对应的Deque
Deque<RecordBatch> dq = getOrCreateDeque(tp);
// 步骤2 加锁,synchronized不把free.allocate一起加进来的原因是:
// 减少锁的持有时间,free.allocate会阻塞,假设线程1消息比较大,线程2消息比较小
// 线程1不能插入现有的RecordBatch需要new一个,线程2可以插入,此时如果free.allocate也在同步代码块
// 如果线程2这样的线程比较多,则会造成多个线程阻塞。
synchronized (dq) {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordBatch last = dq.peekLast();
if (last != null) {
// 步骤3 追加record
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
if (future != null)
// 步骤4 成功则直接返回
return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
}
}
// 步骤5 追加失败 则向Buffer对象池中申请对象
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
// 这里加锁是为了避免内存碎片,如果不加锁多线程可能会创建多个RecordBatch,后续线程插入只插入最后一个
// 那么,有几个已经RecordBatch就不会被插入。
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordBatch last = dq.peekLast();
if (last != null) {
// 步骤7 追加record,重试的原因为: 此时如果有多个线程阻塞在free.allocate,某个线程成功了,则新建RecordBatch
// 其他线程插入此时的RecordBatch即可
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
if (future != null) {
// 步骤8 追加成功需释放资源并返回,因为只有new的RecordBatch才需要申请新的Buffer
free.deallocate(buffer);
return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
}
}
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
// 步骤9 在新建的RecordBatch中新增record
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
dq.addLast(batch);
// 步骤10 添加到incomplete中
incomplete.add(batch);
return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
}
} finally {
appendsInProgress.decrementAndGet();
}
}
Sender发送模块
- kafka是封装了java的nio,所以如果没有nio基础需要先学习Nio以便更好理解sender发送模块
发送时序图
- 这里主要讲send和poll方法,因为这里是真正出发请求发送到Node的源码
send和poll主体流程
- 这里send只是将需要发送的请求标记为SelectionKey.OP_WRITE,真正发送的主体还是poll流程
poll流程
public void poll(long timeout) throws IOException {
/* check ready keys */
int numReadyKeys = select(timeout);
if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
pollSelectionKeys(readyKeys, false, endSelect);
// 清除所有SelectionKey,避免下一次在进行处理
readyKeys.clear();
//处理发起连接时,马上就建立连接的请求,这种一般只在broker和client在同一台机器上才存在
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
immediatelyConnectedKeys.clear();
}
//将暂存起来的网络响应添加到已完成网络响应集合里面
addToCompletedReceives();
}
void pollSelectionKeys(Set<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos) {
for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
KafkaChannel channel = channel(key);
boolean sendFailed = false;
//READ事件
if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
&& !explicitlyMutedChannels.contains(channel)) {
NetworkReceive networkReceive;
//read方法会从channel中将数据读取到Buffer中(还是通过KafkaChannel中的transportLayer),
while ((networkReceive = channel.read()) != null) {
if (!stagedReceives.containsKey(channel))
stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
//将读到的请求存起来
Deque<NetworkReceive> deque = stagedReceives.get(channel);
deque.add(receive);
}
}
//写事件
if (channel.ready() && key.isWritable()) {
//从buffer中写入数据到Channel(KafkaChannel中的transportLayer)
Send send = channel.write();
}
}
- 为何没注册 读也可以读key.interestOps可以多熟悉
finish connect的时候这么做, 写入之后有写回就可以读了
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
- 时序性保证
- 应用在 Server 端时,Server 为了保证消息的时序性,在 Selector 中提供了两个方法:mute(String id) 和 unmute(String id),对该 KafkaChannel 做标记来保证同时只能处理这个 Channel 的一个 request(可以理解为排它锁)。当 Server 端接收到 request 后,先将其放入 stagedReceives 集合中,此时该 Channel 还未 mute,这个 Receive 会被放入 completedReceives 集合中。Server 在对 completedReceives 集合中的 request 进行处理时,会先对该 Channel mute,处理后的 response 发送完成后再对该 Channel unmute,然后才能处理该 Channel 其他的请求
- 应用在 Client 端时,Client 并不会调用 Selector 的 mute() 和 unmute() 方法,client 发送消息的时序性而是通过 InFlightRequests(保存了max.in.flight.requests.per.connection参数的值) 和 RecordAccumulator 的 mutePartition 来保证的,因此对于 Client 端而言,这里接收到的所有 Receive 都会被放入到 completedReceives 的集合中等待后续处理。
- 传输过程中tcp能保证消息按顺序到达
- 总结下发送流程
- sender 线程第一次调用 poll() 方法时,初始化与 node 的连接;
- sender 线程第二次调用 poll() 方法时,发送 Metadata 请求;
- sender 线程第三次调用 poll() 方法时,获取 metadataResponse,并更新 metadata。
kafka生产者配置对应源码部分
参数名称 | 说明 | 默认值 |
---|---|---|
acks | 用于设置在什么情况一条才被认为已经发送成功了。acks=0:msg 只要被 producer 发送出去就认为已经发送完成了;acks=1:如果 leader 接收到消息并发送 ack (不会等会该 msg 是否同步到其他副本)就认为 msg 发送成功了; acks=all或者-1:leader 接收到 msg 并从所有 isr 接收到 ack 后再向 producer 发送 ack,这样才认为 msg 发送成功了,这是最高级别的可靠性保证。 | 1 |
buffer.memory | producer 可以使用的最大内存,如果超过这个值,producer 将会 block max.block.ms 之后抛出异常 | 32mb |
compression.type | Producer 数据的压缩格式,可以选择 none、gzip、snappy、lz4, 策略模式实现 | none |
retries | msg 发送失败后重试的次数,允许重试,如果 max.in.flight.requests.per.connection 设置不为1,可能会导致乱序. 实现原理是可重试的异常重新丢入队列 | 0 |
batch.size | producer 向 partition 发送数据时,是以 batch 形式的发送数据,当 batch 的大小超过 batch.size 或者时间达到 linger.ms 就会发送 batch | 16kb |
linger.ms | 在一个 batch 达不到 batch.size 时,这个 batch 最多将会等待 linger.ms 时间,超过这个时间这个 batch 就会被发送 | 0 |
max.in.flight.requests.per.connection | 对一个 connection,同时发送最大请求数,不为1时,不能保证顺序性。 | 5 |
设计模式学习
- 配置时配置TransportLayer的实现类,运行时根据配置参数决定具体实现类
- Builder模式,解析配置时使用, 效果类似连续. append().append()
- 装饰器模式 new DatOp(new BuyyteOp)