Flink HA
Flink HA 的HighAvailabilityMode类中定义了是那种高可用性模式枚举:
- NONE:非HA模式
- ZOOKEEPER:基于ZK实现HA
- FACTORY_CLASS:自定义HA工厂类,实现HighAvailabilityServiceFactory接口。
ZooKeeperHaService主要提供了创建LeaderRetrievalService和LeaderElectionService等方法,并给出了各个服务组件使用的ZK节点名称。
Flink Exactly-once实现原理解析
流处理引擎通常为用户的应用程序提供是那种数据处理语义:最多一次、至少一次、精确一次。
- 最多一次: 用户数据只会被处理一次,不管成功还是失败,不会重试也不会重发。
- 至少一次: 系统会保证数据或事件被处理一次。如果中间发生错误或者丢失,就会重发或者重试。
- 精确一次: 每一条数据只会被精确地处理一次,不多也不少。
Flink的快照可以到算子级别,并且对全局数据也可以做快照。
Flink分布式快照的核心元素之一是Barrier,该标记是严格有序的,并随着数据往下流动。
每个流的barrier n到达时间不一致怎么办,这是Flink采取的措施是快流等慢流。
Flink在做存储时,可采用异步方式,每次都是进行的全量checkpoint,是基于上次进行更新的。
快照机制能够保证作业出现fail-over后可以从最新的快照进行恢复,即分布式快照机制可以保证Flink系统内部的精确一次处理。
两阶段处理继承TwoPhaseCommitSinkFunction,需要实现beginTransaction、preCommit、commit、abort方法来实现精确一次的处理语义,
- beginTransaction:在开启事务之前,在目标文件系统的临时目录中创建一个临时文件,后面在处理数据时将数据写入此文件。
- preCommit:在预提交阶段,刷写文件,然后关闭文件,之后就不能写入到文件,为属于下一个检查点的任何后续写入启动新事务。
- commit:在提交阶段,将预提交的文件原子性移动到真正的目标目录中,这会增加输出数据可见性的延迟。
- abort:在终止阶段,删除临时文件。
Kafka-Flink-Kafka过程:
- Flink开始做checkpoint操作, 进入pre-commit阶段,同时Flink JobManager会将检查点Barrier注入数据流中。
- 当所有barrier在算子中成功进行一遍传递,并完成快照后,则pre-commit阶段完成
- 等所有的算子完成预提交,就会发起一个提交动作,但是任何一个预提交失败都会导致Flink回滚到最近的checkpoint;
- pre-commit完成,必须要确保commit也要成功。
如何排查生产环境中的反压问题
不同框架的反压对比:
- Storm:从1.0版本之后引入反压,Storm会主动监控工作节点,工作节点接收数据超过阈值,反压信息会被发送到ZooKeeper,ZooKeeper通知所有的工作节点
进入反压状态,最后数据的生产源头会降低数据的发送速度。- Spark Streaming:RateController组件,利用经典的PID算法,根据消息数量、调度时间、处理时间等计算出来速率,然后进行限速。
- Flink:利用网络传输和动态限流,流中的数据在算子间进行计算和转换时,会被放入分布式的阻塞队列中。当消费者的阻塞队列满时,则会降低生产者的数据生产速度。
Flink Web UI Back Pressure出现数值:
- OK: 0<=Ratio<=0.10,正常;
- LOW:0.10<Ratio<=0.50,一般;
- HIGH: 0.5 < Ratio <=1,严重。
指标名称 | 用途 | 解释 |
---|---|---|
outPoolUsage | 发送端缓冲池的使用率 | 当前Task的数据发送率,如果数值很低,当前节点有可能为反压节点 |
inPoolUsage | 接收端缓冲池的使用率 | Task的接收速度,inPoolUsage很高,outPoolUsage很低,这个节点有可能是反压节点 |
floatingBuffersUsage | 处理节点缓冲池的使用率 | |
exclusiveBuffersUsage | 数据输入方缓冲池的使用率 |
反压问题处理:
- 数据倾斜:使用类似的KeyBy等分组聚合函数导致,需要用户将热点key进行预处理,降低或者消除热点key的影响。
- GC:使用-XX:+PrintGCDetails参数查看GC日志
- 代码本身:查看机器的CPU、内存使用
如何处理生产环境中的数据倾斜问题
两阶段聚合解决KeyBy热点
根据type进行KeyBy时,如果数据的type分布不均匀就会导致大量的数据分配到一个task中,发生数据倾斜。解决的思路为:
- 首先把分组的key打散,比如添加随机后缀;
- 对打散后的数据进行聚合;
- 将打散的key还原为原先的key
- 二次KeyBy进行结果统计,然后输出。
Flink消费Kafka数据时,要保证Kafka的分区数等于Flink Consumer的并行度。如果不一致,需要设置Flink的Redistributing(数据充分配),
Rebalance分区策略,数据会以round-robin的方式对数据进行再次分区,可以全局负载均衡。
Rescale分区策略基于上下游的并行度,会将数据以循环的方式输出到下游的每个实例中。
生产环境中的并行度和资源设置
在Flink集群中,一个TaskManager就是一个JVM进程,并且会用独立的线程来执行task,slot仅仅用来做内存的隔离,对CPU不起作用。
默认情况下,Flink还允许同一个Job的子任务共享slot。
Flink自身会把不同的算子的task连接在一起组成一个新的task。因为task在同一个线程中执行,可以有效减少线程间上下文的切换,减少序列化/反序列化带来的资源消耗,
提高任务的吞吐量。
并行度级别:算子级别、环境级别、客户端级别、集群配置级别。
在生产中,推荐在算子级别显式指定各自的并行度,方便进行显式和精确的资源控制。
环境级别:任务中的所有算子的并行度都是指定的值,生产环境不推荐。
设置并行度的优先级为:算子级别 > 环境级别 > 客户端级别 > 集群级别配置。
Flink如何做维表关联
业务对维表数据关联的时效性要求,有以下几种解决方案:
- 实时查询维表:用户在Flink算子中直接访问外部数据库,这种是同步方式,数据保证是最新的。
- 预加载全量数据:每次启动时,将维表中全部数据加载到内存中。
- LRU缓存:将最近最少使用的数据则被淘汰。
实时查询维表
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class DimSync extends RichMapFunction<String,Order> {
private static final Logger LOGGER = LoggerFactory.getLogger(DimSync.class);
private Connection conn = null;
public void open(Configuration parameters) throws Exception {
super.open(parameters);
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
}
public Order map(String in) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(in);
Integer cityId = jsonObject.getInteger("city_id");
String userName = jsonObject.getString("user_name");
String items = jsonObject.getString("items");
//根据city_id 查询 city_name
PreparedStatement pst = conn.prepareStatement("select city_name from info where city_id = ?");
pst.setInt(1,cityId);
ResultSet resultSet = pst.executeQuery();
String cityName = null;
while (resultSet.next()){
cityName = resultSet.getString(1);
}
pst.close();
return new Order(cityId,userName,items,cityName);
}
public void close() throws Exception {
super.close();
conn.close();
}
}
要保证及时关闭连接池
public class Order {
private Integer cityId;
private String userName;
private String items;
private String cityName;
public Order(Integer cityId, String userName, String items, String cityName) {
this.cityId = cityId;
this.userName = userName;
this.items = items;
this.cityName = cityName;
}
public Order() {
}
public Integer getCityId() {
return cityId;
}
public void setCityId(Integer cityId) {
this.cityId = cityId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getItems() {
return items;
}
public void setItems(String items) {
this.items = items;
}
public String getCityName() {
return cityName;
}
public void setCityName(String cityName) {
this.cityName = cityName;
}
@Override
public String toString() {
return "Order{" +
"cityId=" + cityId +
", userName='" + userName + '\'' +
", items='" + items + '\'' +
", cityName='" + cityName + '\'' +
'}';
}
}
预加载全量数据
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class WholeLoad extends RichMapFunction<String,Order> {
private static final Logger LOGGER = LoggerFactory.getLogger(WholeLoad.class);
ScheduledExecutorService executor = null;
private Map<String,String> cache;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
load();
} catch (Exception e) {
e.printStackTrace();
}
}
},5,5, TimeUnit.MINUTES);
}
@Override
public Order map(String value) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(value);
Integer cityId = jsonObject.getInteger("city_id");
String userName = jsonObject.getString("user_name");
String items = jsonObject.getString("items");
String cityName = cache.get(cityId);
return new Order(cityId,userName,items,cityName);
}
public void load() throws Exception {
Class.forName("com.mysql.jdbc.Driver");
Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
PreparedStatement statement = con.prepareStatement("select city_id,city_name from info");
ResultSet rs = statement.executeQuery();
//全量更新维度数据到内存
while (rs.next()) {
String cityId = rs.getString("city_id");
String cityName = rs.getString("city_name");
cache.put(cityId, cityName);
}
con.close();
}
}
LRU缓存
import com.alibaba.fastjson.JSONObject;
import com.stumbleupon.async.Callback;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.hbase.async.GetRequest;
import org.hbase.async.HBaseClient;
import org.hbase.async.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
public class LRU extends RichAsyncFunction<String,Order> {
private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class);
String table = "info";
Cache<String, String> cache = null;
private HBaseClient client = null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//创建hbase客户端
client = new HBaseClient("127.0.0.1","7071");
cache = CacheBuilder.newBuilder()
//最多存储10000条
.maximumSize(10000)
//过期时间为1分钟
.expireAfterWrite(60, TimeUnit.SECONDS)
.build();
}
@Override
public void asyncInvoke(String input, ResultFuture<Order> resultFuture) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(input);
Integer cityId = jsonObject.getInteger("city_id");
String userName = jsonObject.getString("user_name");
String items = jsonObject.getString("items");
//读缓存
String cacheCityName = cache.getIfPresent(cityId);
//如果缓存获取失败再从hbase获取维度数据
if(cacheCityName != null){
Order order = new Order();
order.setCityId(cityId);
order.setItems(items);
order.setUserName(userName);
order.setCityName(cacheCityName);
resultFuture.complete(Collections.singleton(order));
}else {
client.get(new GetRequest(table,String.valueOf(cityId))).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
for (KeyValue kv : arg) {
String value = new String(kv.value());
Order order = new Order();
order.setCityId(cityId);
order.setItems(items);
order.setUserName(userName);
order.setCityName(value);
resultFuture.complete(Collections.singleton(order));
cache.put(String.valueOf(cityId), value);
}
return null;
});
}
}
}
海量数据去重
Flink中实时去重的方案:
- 基于状态后端
- 基于HyperLogLog
- 基于布隆过滤器
- 基于BitMap
- 基于外部数据库
基于状态后端
状态后端的种类之一是RocksDBStateBackend,它会将正在云心中的状态数据保存在RockDB数据库中,该数据库默认将数据存储在TaskManager运行节点的数据目录下。
计算每天每个商品的访问量:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class MapStateDistinctFunction extends KeyedProcessFunction<String,Tuple2<String,Integer>,Tuple2<String,Integer>> {
private transient ValueState<Integer> counts;
@Override
public void open(Configuration parameters) throws Exception {
//我们设置ValueState的TTL的生命周期为24小时,到期自动清除状态
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(org.apache.flink.api.common.time.Time.minutes(24 * 60))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
//设置ValueState的默认值
ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>("skuNum", Integer.class);
descriptor.enableTimeToLive(ttlConfig);
counts = getRuntimeContext().getState(descriptor);
super.open(parameters);
}
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
String f0 = value.f0;
//如果不存在则新增
if(counts.value() == null){
counts.update(1);
}else{
//如果存在则加1
counts.update(counts.value()+1);
}
out.collect(Tuple2.of(f0, counts.value()));
}
}
基于HyperLogLo
HyperLogLog是一种估计统计算法,被用来统计一饿集合中不同数据的个数。
import net.agkn.hll.HLL;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
public class HyperLogLogDistinct implements AggregateFunction<Tuple2<String,Long>,HLL,Long> {
@Override
public HLL createAccumulator() {
return new HLL(14, 5);
}
@Override
public HLL add(Tuple2<String, Long> value, HLL accumulator) {
//value为购买记录 <商品sku, 用户id>
accumulator.addRaw(value.f1);
return accumulator;
}
@Override
public Long getResult(HLL accumulator) {
long cardinality = accumulator.cardinality();
return cardinality;
}
@Override
public HLL merge(HLL a, HLL b) {
a.union(b);
return a;
}
}
添加相应的pom依赖:
<dependency>
<groupId>net.agkn</groupId>
<artifactId>hll</artifactId>
<version>1.6.0</version>
</dependency>
如果元素是非数值型,需要hash过后才能插入。
基于布隆过滤器
BloomFilter类似于一个HashSet,用于快速判断某个元素是否存在与集合中,其典型的应用场景就是能够快速判断一个key是否存在某个容器中,不存在直接返回。
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class BloomFilterDistinct extends KeyedProcessFunction<Long, String, Long> {
private transient ValueState<BloomFilter> bloomState;
private transient ValueState<Long> countState;
@Override
public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
BloomFilter bloomFilter = bloomState.value();
Long skuCount = countState.value();
if(bloomFilter == null){
BloomFilter.create(Funnels.unencodedCharsFunnel(), 10000000);
}
if(skuCount == null){
skuCount = 0L;
}
if(!bloomFilter.mightContain(value)){
bloomFilter.put(value);
skuCount = skuCount + 1;
}
bloomState.update(bloomFilter);
countState.update(skuCount);
out.collect(countState.value());
}
}
BitMap
HyperLogLog 和BloomFilter虽然减少了存储但是丢失了精度。
BitMap的基本思想是用一个bit位来标记某个元素对应的value,而key即是该元素。
import org.apache.flink.api.common.functions.AggregateFunction;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
public class BitMapDistinct implements AggregateFunction<Long, Roaring64NavigableMap,Long> {
@Override
public Roaring64NavigableMap createAccumulator() {
return new Roaring64NavigableMap();
}
@Override
public Roaring64NavigableMap add(Long value, Roaring64NavigableMap accumulator) {
accumulator.add(value);
return accumulator;
}
@Override
public Long getResult(Roaring64NavigableMap accumulator) {
return accumulator.getLongCardinality();
}
@Override
public Roaring64NavigableMap merge(Roaring64NavigableMap a, Roaring64NavigableMap b) {
return null;
}
}
添加依赖:
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.9.21</version>
</dependency>