业务详解
处理数据主要包括的是从互联网上采集来的数据,包括常见的新闻、微博、论坛、贴吧、博客、微博、微信等信源,对原始格式进行实时响应处理,以期望满足业务系统使用,这便是当前数据流中数据处理的服务宗旨。
技术点概览
利用广播变量动态更新规则
- 业务场景
数据流中会动态实时加载规则表,而这种实时性要求并不强烈,可以是分钟级别、小时级别;所以没有必要浪费大量资源对于每一条数据都进行一次数据库请求。所以适合的应用场景是以固定的频率更新各个算子中的规则,当然这种规则所占用的内存消耗是比较小的。 - 技术组件 - BroadcastVariable
前置条件: Flink 中多并行度的情况下,每个算子或者不同算子运行所在的 Slot 不一致,这就导致它们不会共享同一个内存,也就不可以通过静态变量的方式去获取这些共享变量值。
BroadcastVariable: 可以理解是一个公共的共享变量(可能是固定不变的数据集合,也可能是动态变化的数据集合),在作业中将该共享变量广播出去,然后下游的所有任务都可以获取到该共享变量,这样就可以不用将这个变量拷贝到下游的每个任务中。
使用方式:
1: 动态读取规则
继承 RichSourceFunction,对应以下实现:
public class GetHostSourceMappingFunction extends RichSourceFunction<Set<HostSourceMappingEntity>> {
private static final Logger logger =
LoggerFactory.getLogger(GetHostSourceMappingFunction.class);
private volatile boolean isRunning = true;
private volatile String key = null;
private volatile String connector = null;
BaseConfigEntity configEntity;
// 这里的实例不可以是单例,该算子会有自己的生命周期
private JedisClusterUtil jedisClusterUtil;
public static final String [] TYPE_ARRAY = {
FieldConstants.NEWS,
FieldConstants.MEDIA
};
public GetHostSourceMappingFunction(BaseConfigEntity configEntity) {
this.configEntity = configEntity;
}
@Override
public void open(Configuration parameters) throws Exception{
connector = configEntity.getRedis().getConnector();
key = JedisClusterUtil.combineKey(configEntity.getRedis().getHostSourceMappingKey(),configEntity.getRedis());
jedisClusterUtil = new JedisClusterUtil(configEntity.getRedis());
}
@Override
public void run(SourceContext<Set<HostSourceMappingEntity>> sourceContext) throws Exception {
while (isRunning) {
Set<HostSourceMappingEntity> set = new CopyOnWriteArraySet<>();
long start = System.currentTimeMillis();
for (String type:TYPE_ARRAY) {
try {
String result = jedisClusterUtil.getValue(key + connector + type);
if(StringUtils.isNotEmpty(result)){
set.addAll(JSONArray.parseArray(result,HostSourceMappingEntity.class));
}
}catch(Exception e){
logger.error("Maybe occur NNllException ,because the type does not exist,and ignore it",e);
}
}
logger.info("select host source mapping from redis cluster, the rule size :{}, " +
"the used time(ms) is :{}",
set.size(),(System.currentTimeMillis() - start));
// 往下游发送数据集
sourceContext.collect(set);
//以固定的频率进行规则的动态更新
Thread.sleep(configEntity.getRedis().getInterval());
}
}
@Override
public void cancel() {
try{
super.close();
// 进行连接的释放
jedisClusterUtil.close();
}catch (Exception e) {
logger.error("runException:{}", e);
}
isRunning = false;
}
}
- 主数据关联规则数据
DataStreamSource<Set<HostSourceMappingEntity>> hostSourceMappingDataStream = env.addSource(new GetHostSourceMappingFunction(configEntity)).
setParallelism(configEntity.getFlink().getRedisSourceParallelism());
// main stream connect rule stream;可以认为是大数据流和小数据流的join操作
SingleOutputStreamOperator<JSONObject> hostSourceFormatDataStream = HostSourceMappingConnectOperator.
formatSourceConnectOperator(blackFilterDataStream, hostSourceMappingDataStream).
setParallelism(configEntity.getFlink().getMediumParallelism());
- 数据处理
- processElement 事件处理方法
- processBroadcastElement 广播处理方法
- 广播状态中的事件顺序可能因任务而异:尽管广播流的数据元保证所有数据元将(最终)转到所有下游任务,但数据元可能以不同的顺序到达每个任务。因此,每个传入数据元的状态更新不得取决于传入事件的顺序。言外之意,1和2方法没有先后执行之分, 在 processElement方法中对于获取的广播变量要做好异常值判定,因为很可能processBroadcastElement中还没有进行初始化操作。详情代码如下:
public class HostSourceMappingConnectOperator {
/** logger */
private static final Logger logger = LoggerFactory.getLogger(HostSourceMappingConnectOperator.class);
private static String HOST_SOURCE_MAPPING_NAME = "host_source_mapping_name";
final static MapStateDescriptor<String, Set<HostSourceMappingEntity>> HOST_SOURCE_MAPPING;
static {
HOST_SOURCE_MAPPING = new MapStateDescriptor<>(
HOST_SOURCE_MAPPING_NAME,
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Set<HostSourceMappingEntity>>(){}));
}
public static SingleOutputStreamOperator<JSONObject> formatSourceConnectOperator(DataStream<JSONObject> jsonObjectDataStream,
DataStreamSource<Set<HostSourceMappingEntity>> hostSourceMappingStream){
SingleOutputStreamOperator<JSONObject> filterDataConnectStream = jsonObjectDataStream.connect(hostSourceMappingStream.broadcast(HOST_SOURCE_MAPPING))
.process(new BroadcastProcessFunction<JSONObject, Set<HostSourceMappingEntity>, JSONObject>() {
/**
main stream 处理
*/
@Override
public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
ReadOnlyBroadcastState<String, Set<HostSourceMappingEntity>> broadcastState = ctx.getBroadcastState(HOST_SOURCE_MAPPING);
Set<HostSourceMappingEntity> hostSourceSet = broadcastState.get(HOST_SOURCE_MAPPING_NAME);
if (!FieldConstants.WEI_BO.equals(value.getString(FieldConstants.DOC_TYPE)) &&
!FieldConstants.WEI_XIN.equals(value.getString(FieldConstants.DOC_TYPE)) &&
!CollectionUtils.isEmpty(hostSourceSet)) {
HostSourceMappingEntity targetEntity = null;
for (HostSourceMappingEntity hostSourceMappingEntity : hostSourceSet) {
if (value.getString(FieldConstants.URL).contains(hostSourceMappingEntity.getHost())
&& hostSourceMappingEntity.getHost().length() > (targetEntity == null ? 0 : targetEntity.getHost().length())) {
targetEntity = hostSourceMappingEntity;
}
}
if (targetEntity != null) {
logger.info("[target host mapping]" + targetEntity);
value.put(FieldConstants.SOURCE, targetEntity.getTargetSource());
value.put(FieldConstants.DOC_TYPE, targetEntity.getTargetDocType());
}
}else{
if(CollectionUtils.isEmpty(hostSourceSet)) {
logger.warn("[hostSource format] broadcast load lazy !!!!!!");
}
}
out.collect(value);
}
/**
处理广播变量数据
*/
@Override
public void processBroadcastElement(Set<HostSourceMappingEntity> value, Context ctx, Collector<JSONObject> out) throws Exception {
if (CollectionUtil.isNullOrEmpty(value)) {
return;
}
BroadcastState<String, Set<HostSourceMappingEntity>> hostSourceMappingBroadcastState = ctx.getBroadcastState(HOST_SOURCE_MAPPING);
hostSourceMappingBroadcastState.put(HOST_SOURCE_MAPPING_NAME, value);
}
}).name(FLINK_OPERATOR_HOST_SOURCE_MAPPING_FROM_REDIS);
return filterDataConnectStream;
}
}
-
整体的数据流图片段
参数的解析和配置文件规范化设置
- 关于配置文件,相信大家很少再去使用properties,取而代之的是YAML,其良好的层次结构,以及和Java Bean的映射,让大家爱不释手。举例如下:
#kafka
kafka:
kafkaZk: "172.24.4.18:2181,172.24.4.19:2181,172.24.4.20:2181"
kafkaTopic: "mf-dev-hl"
kafkaBrokerList: "172.24.2.78:9092,172.24.2.79:9092,172.24.2.80:9092"
groupId: "mf-dev-hl"
autoOffsetReset: "earliest"
fetchMessageMaxBytes: 4194304
#redis cluster
redis:
ipPort: "172.24.4.18:7000,172.24.4.18:7001,172.24.4.18:7002,172.24.4.19:7000,172.24.4.19:7001,172.24.4.19:7002"
businessId: mf
spamWebSiteKey: spam_website
connector: _
areaCategoryMappingKey: area_mapping_dict
mediaRegionMappingKey: media_region
#es cluster
es:
clusterName: cluster_index
nodes: "192.168.x.x,192.168.x.x"
tcpPorts: "9300,9300"
httpPorts: "9200"
flink:
parallelism: 1
appName: Test1
- 关于命令行解析的使用
Flink 官方提供ParameterTool应具有较好的实用性,在官方最佳实践篇也具有较好的解释;refer:https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/best_practices.html
在这里我使用的是: apache.commons.cli,也具有极好的使用性和易用性;代码片段如下:
private static Options initOptions(){
Options options = new Options();
options.addOption(Option.builder(ARGS_TYPE).hasArg(true).required(true).desc("the logistic type in [hl,zhxg,bfd]").build());
options.addOption(Option.builder(ARGS_ENV).hasArg(true).required(true).desc("the env in [dev,test,prod]").build());
return options;
}
public static CommandLine getCommandLineWithCheck(String[] args){
CommandLine cmd = null;
try {
CommandLineParser parser = new DefaultParser();
cmd = parser.parse( initOptions(), args);
} catch (ParseException e) {
logger.error("args parse exception, and invoke system exit function",e);
}finally {
if(cmd == null){
logger.error("参数解析异常,退出系统");
System.exit(-1);
}
if(!ARGS_TYPE_VALUES.contains(cmd.getOptionValue(ARGS_TYPE))){
logger.error("the type must in " + ARGS_TYPE_VALUES);
System.exit(-1);
}
if(!ARGS_ENV_VALUES.contains(cmd.getOptionValue(ARGS_ENV))){
logger.error("the env must in " + ARGS_ENV_VALUES);
System.exit(-1);
}
}
return cmd;
}
关于TaskManger、slot(task slot)、parallelism的认知
- TaskManager: 独立的JVM进程,其并行能力由slot个数决定。配置文件中默认如下:taskmanager.numberOfTaskSlots: 1, 每个Taskmanager由一个slot组成,即其并行度为1。该 slot独享内存空间,如果有多个slot,那么均分Taskmanager内存空间。
-
slot:在flink中, slot是资源组的含义。那么他有以下几个特性:
- TaskManager 是从 JobManager 处接收需要部署的 Task,任务能配置的最大并行度由 TaskManager 上可用的 Slot 决定。
- 每个任务代表分配给任务槽的一组资源,Slot 在 Flink 里面可以认为是资源组,Flink 将每个任务分成子任务并且将这些子任务分配到 Slot 中,这样就可以并行的执行程序。
- TaskManager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,注意 Slot 只对内存隔离,没有对 CPU 隔离。
- 默认情况下,Flink 允许子任务共享 Slot,即使它们是不同 Task 的 subtask,只要它们来自相同的 Job,这种共享模式可以大大的提高资源利用率。
parallelism: 对应的算子或者Task的并发执行能力。
-
slot 和 parallelism的区别
-
Slot 是指 TaskManager 最大能并发执行的能力
上图表示每个 TaskManager 中含有3个slot,在 Flink on yarn 中的Per Job模式下, taskManager的个数是动态计算出来的,依据operator的最大并行度(max(p));计算公式:ceil( max(p) / numberOfTaskSlots);
-
-
parallelism 是指 TaskManager 实际使用的并发能力
在并行度为1条件下,只会使用其中一个 slot。接下来以实验来作此说明。
- 实验环境
参数说明:-ys:每个taskManger对应的Slot个数
-ytm: 单个TaskMananger占用的内存大小
-yn: --yarncontainer Number of YARN container to allocate
(=Number of Task Managers);对于Per Job模式是不起作用的,为啦作区分度,下述的日志以 -yn=80
程序启动日志显示:
2020-02-20 00:02:47,157 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - The argument yn is deprecated in will be ignored.
2020-02-20 00:02:47,279 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=4096, numberTaskManagers=80, slotsPerTaskManager=5}
ok,相信这一块已经讲的很明白啦,有啦这个基础,便可以进行以下的内容。
关于Operator Chains的理解
为了更高效地分布式执行,Flink会尽可能地将operator的subtask链接(chain)在一起形成task。每个task在一个线程中执行。将operators链接成task是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。
下图便是进行啦一次算子的合并:
那么算子合并的条件是什么哪?
- 上下游的并行度一致
- 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
- 上下游节点都在同一个 slot group 中(下面会解释 slot group)
- 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
- 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
- 两个节点间数据分区方式是 forward(参考理解数据流的分区)
- 用户没有禁用 chain
可以认为效果等同于将所有operator的实现都封装于一个大的方法体中串行执行,的确是提供啦极大的灵活性。一套代码即实现啦算子的合并,也可实现算子的拆分。 这里有一篇不错的文章可以参照原理实现:http://wuchong.me/blog/2016/05/09/flink-internals-understanding-execution-resources/#
BackPressure的原理
一旦提起storm、spark streaming、Flink三种常用的数据流框架,最惹人吐槽的便是storm的反压机制,storm是通过监控bolt的负载,暴力的在spout端进行数据的暂停消费。而Flink的反压机制这是基于由下游往上游动态传递反压信息。
未完待续......