1. Parameter Tuning
// batch.size: once the accumulated batch reaches this size, the producer sends it immediately, regardless of linger.ms below
properties.put("batch.size", 128 * 1024); // default is 16384 (16 KB), raised here to 128 KB
properties.put("linger.ms", 10); // wait up to 10 ms before sending: instead of shipping each record immediately, the producer adds a small delay so that other records can be batched into the same request
The two settings above define the batching policy: a batch is sent either when the buffer fills up or when the timed flush fires, whichever comes first.
properties.put("buffer.memory", 128 * 1024 * 1024); // default is 33554432 (32 MB), raised here to 128 MB
properties.put("max.request.size", 10 * 1024 * 1024); // maximum size of a single request sent to the Kafka broker; default is 1 MB, raised here to 10 MB
2. Changing the Sink Partitioning Strategy
Custom partitioner:
import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.util.Preconditions;

// Routes each record to a randomly chosen partition of the target topic.
public class RandomKafkaPartitioner<T> extends FlinkKafkaPartitioner<T> {

    private static final long serialVersionUID = -3785320239953858777L;

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Preconditions.checkArgument(partitions != null && partitions.length > 0,
                "Partitions of the target topic is empty.");
        // Fetch ThreadLocalRandom per call: the partitioner is serialized and may run in
        // several subtask threads, so caching a single instance in a static field would
        // share one thread's generator across threads.
        return partitions[ThreadLocalRandom.current().nextInt(partitions.length)];
    }

    @Override
    public boolean equals(Object o) {
        return this == o || o instanceof RandomKafkaPartitioner;
    }

    @Override
    public int hashCode() {
        return RandomKafkaPartitioner.class.hashCode();
    }
}
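As a quick sanity check (not part of the original; assumes the Flink Kafka 0.11 connector is on the classpath), the partitioner can be exercised locally to confirm that records spread roughly evenly across partitions:

public class RandomKafkaPartitionerCheck {
    public static void main(String[] args) {
        RandomKafkaPartitioner<String> partitioner = new RandomKafkaPartitioner<>();
        int[] partitions = {0, 1, 2, 3};
        int[] counts = new int[partitions.length];
        // Route a large number of dummy records and count how many land on each partition.
        for (int i = 0; i < 1_000_000; i++) {
            int p = partitioner.partition("record-" + i, null, null, "LOGWRITTER_OTHER_TEST", partitions);
            counts[p]++;
        }
        for (int i = 0; i < counts.length; i++) {
            System.out.printf("partition %d -> %d records%n", i, counts[i]);
        }
    }
}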
Creating the sink operator:
FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
        "LOGWRITTER_OTHER_TEST",
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        propertiesResult,
        Optional.of(new RandomKafkaPartitioner<>()),
        FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE,
        10); // size of the internal KafkaProducer pool
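For context, a minimal end-to-end sketch of attaching this producer to a stream (the execution environment, placeholder source, class name, and the ProducerTuningSketch helper from the earlier sketch are assumptions, not part of the original job):

import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class KafkaSinkJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; in the real job this is the upstream log stream.
        DataStream<String> stream = env.fromElements("log-line-1", "log-line-2");

        // Tuning settings from section 1 (hypothetical helper defined in the sketch above).
        Properties propertiesResult = ProducerTuningSketch.buildProducerProps();

        FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
                "LOGWRITTER_OTHER_TEST",
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                propertiesResult,
                Optional.of(new RandomKafkaPartitioner<>()),
                FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE,
                10);

        stream.addSink(producer);
        env.execute("kafka-sink-tuning-sketch");
    }
}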