基于Flink的实时数据流构建

业务详解

处理数据主要包括的是从互联网上采集来的数据,包括常见的新闻、微博、论坛、贴吧、博客、微博、微信等信源,对原始格式进行实时响应处理,以期望满足业务系统使用,这便是当前数据流中数据处理的服务宗旨。

技术点概览

利用广播变量动态更新规则

  • 业务场景
    数据流中会动态实时加载规则表,而这种实时性要求并不强烈,可以是分钟级别、小时级别;所以没有必要浪费大量资源对于每一条数据都进行一次数据库请求。所以适合的应用场景是以固定的频率更新各个算子中的规则,当然这种规则所占用的内存消耗是比较小的。
  • 技术组件 - BroadcastVariable
    前置条件: Flink 中多并行度的情况下,每个算子或者不同算子运行所在的 Slot 不一致,这就导致它们不会共享同一个内存,也就不可以通过静态变量的方式去获取这些共享变量值。
    BroadcastVariable: 可以理解是一个公共的共享变量(可能是固定不变的数据集合,也可能是动态变化的数据集合),在作业中将该共享变量广播出去,然后下游的所有任务都可以获取到该共享变量,这样就可以不用将这个变量拷贝到下游的每个任务中。
    使用方式:
    1: 动态读取规则
    继承 RichSourceFunction,对应以下实现:
public class GetHostSourceMappingFunction extends RichSourceFunction<Set<HostSourceMappingEntity>> {
    private static    final Logger logger = 
   LoggerFactory.getLogger(GetHostSourceMappingFunction.class);
    private volatile  boolean isRunning = true;
    private volatile  String key = null;
    private volatile  String connector = null;
    BaseConfigEntity configEntity;
    // 这里的实例不可以是单例,该算子会有自己的生命周期
    private JedisClusterUtil jedisClusterUtil; 
    public static final String [] TYPE_ARRAY = {
            FieldConstants.NEWS,
            FieldConstants.MEDIA
    };
    public GetHostSourceMappingFunction(BaseConfigEntity configEntity) {
        this.configEntity = configEntity;
    }
    @Override
    public void open(Configuration parameters) throws Exception{
        connector = configEntity.getRedis().getConnector();
        key = JedisClusterUtil.combineKey(configEntity.getRedis().getHostSourceMappingKey(),configEntity.getRedis());
        jedisClusterUtil = new JedisClusterUtil(configEntity.getRedis());
    }
    @Override
    public void run(SourceContext<Set<HostSourceMappingEntity>> sourceContext) throws Exception {
        while (isRunning) {
           Set<HostSourceMappingEntity> set = new CopyOnWriteArraySet<>();
            long start = System.currentTimeMillis();
            for (String type:TYPE_ARRAY) {
                try {
                    String result = jedisClusterUtil.getValue(key + connector + type);
                    if(StringUtils.isNotEmpty(result)){
                        set.addAll(JSONArray.parseArray(result,HostSourceMappingEntity.class));
                    }
                }catch(Exception e){
                    logger.error("Maybe occur NNllException ,because the type does not exist,and ignore it",e);
                }
            }
           logger.info("select host source mapping  from redis cluster, the rule size :{}, " +
                   "the used time(ms) is :{}",
                   set.size(),(System.currentTimeMillis() - start));
           // 往下游发送数据集
           sourceContext.collect(set);
           //以固定的频率进行规则的动态更新
           Thread.sleep(configEntity.getRedis().getInterval());
        }
    }
    @Override
    public void cancel() {
        try{
            super.close();
            // 进行连接的释放
            jedisClusterUtil.close();
        }catch (Exception e) {
            logger.error("runException:{}", e);
        }
        isRunning = false;
    }
}
  1. 主数据关联规则数据
 DataStreamSource<Set<HostSourceMappingEntity>> hostSourceMappingDataStream = env.addSource(new GetHostSourceMappingFunction(configEntity)).
                setParallelism(configEntity.getFlink().getRedisSourceParallelism());
 // main stream connect rule stream;可以认为是大数据流和小数据流的join操作 
 SingleOutputStreamOperator<JSONObject> hostSourceFormatDataStream = HostSourceMappingConnectOperator.
                formatSourceConnectOperator(blackFilterDataStream, hostSourceMappingDataStream).
                setParallelism(configEntity.getFlink().getMediumParallelism());
  1. 数据处理
    1. processElement 事件处理方法
    2. processBroadcastElement 广播处理方法
    3. 广播状态中的事件顺序可能因任务而异:尽管广播流的数据元保证所有数据元将(最终)转到所有下游任务,但数据元可能以不同的顺序到达每个任务。因此,每个传入数据元的状态更新不得取决于传入事件的顺序。言外之意,1和2方法没有先后执行之分, 在 processElement方法中对于获取的广播变量要做好异常值判定,因为很可能processBroadcastElement中还没有进行初始化操作。详情代码如下:
public class HostSourceMappingConnectOperator {
    /** logger */
    private static final Logger logger = LoggerFactory.getLogger(HostSourceMappingConnectOperator.class);
    private static String  HOST_SOURCE_MAPPING_NAME = "host_source_mapping_name";
    final static MapStateDescriptor<String, Set<HostSourceMappingEntity>> HOST_SOURCE_MAPPING;

    static {
        HOST_SOURCE_MAPPING = new MapStateDescriptor<>(
                HOST_SOURCE_MAPPING_NAME,
                BasicTypeInfo.STRING_TYPE_INFO,
                TypeInformation.of(new TypeHint<Set<HostSourceMappingEntity>>(){}));
    }

    public static SingleOutputStreamOperator<JSONObject> formatSourceConnectOperator(DataStream<JSONObject> jsonObjectDataStream,
                                                                                   DataStreamSource<Set<HostSourceMappingEntity>> hostSourceMappingStream){
        SingleOutputStreamOperator<JSONObject> filterDataConnectStream = jsonObjectDataStream.connect(hostSourceMappingStream.broadcast(HOST_SOURCE_MAPPING))
                .process(new BroadcastProcessFunction<JSONObject, Set<HostSourceMappingEntity>, JSONObject>() {
                    /**
                     main stream 处理
                     */
                    @Override
                    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
                        ReadOnlyBroadcastState<String, Set<HostSourceMappingEntity>> broadcastState = ctx.getBroadcastState(HOST_SOURCE_MAPPING);
                        Set<HostSourceMappingEntity> hostSourceSet = broadcastState.get(HOST_SOURCE_MAPPING_NAME);
                        if (!FieldConstants.WEI_BO.equals(value.getString(FieldConstants.DOC_TYPE)) &&
                                !FieldConstants.WEI_XIN.equals(value.getString(FieldConstants.DOC_TYPE)) &&
                                !CollectionUtils.isEmpty(hostSourceSet)) {
                                HostSourceMappingEntity targetEntity = null;
                                for (HostSourceMappingEntity hostSourceMappingEntity : hostSourceSet) {
                                    if (value.getString(FieldConstants.URL).contains(hostSourceMappingEntity.getHost())
                                            && hostSourceMappingEntity.getHost().length() > (targetEntity == null ? 0 : targetEntity.getHost().length())) {
                                        targetEntity = hostSourceMappingEntity;
                                    }
                                }
                                if (targetEntity != null) {
                                    logger.info("[target host mapping]" + targetEntity);
                                    value.put(FieldConstants.SOURCE, targetEntity.getTargetSource());
                                    value.put(FieldConstants.DOC_TYPE, targetEntity.getTargetDocType());
                                }

                        }else{
                            if(CollectionUtils.isEmpty(hostSourceSet)) {
                                logger.warn("[hostSource format] broadcast load lazy !!!!!!");
                            }
                        }
                        out.collect(value);
                    }
                     /**
                       处理广播变量数据
                     */
                    @Override
                    public void processBroadcastElement(Set<HostSourceMappingEntity> value, Context ctx, Collector<JSONObject> out) throws Exception {

                        if (CollectionUtil.isNullOrEmpty(value)) {
                            return;
                        }
                        BroadcastState<String, Set<HostSourceMappingEntity>> hostSourceMappingBroadcastState = ctx.getBroadcastState(HOST_SOURCE_MAPPING);
                        hostSourceMappingBroadcastState.put(HOST_SOURCE_MAPPING_NAME, value);
                    }
                }).name(FLINK_OPERATOR_HOST_SOURCE_MAPPING_FROM_REDIS);
        return filterDataConnectStream;
    }
}
  1. 整体的数据流图片段


    广播变量的应用

参数的解析和配置文件规范化设置

  1. 关于配置文件,相信大家很少再去使用properties,取而代之的是YAML,其良好的层次结构,以及和Java Bean的映射,让大家爱不释手。举例如下:
#kafka
kafka:
  kafkaZk: "172.24.4.18:2181,172.24.4.19:2181,172.24.4.20:2181"
  kafkaTopic: "mf-dev-hl"
  kafkaBrokerList: "172.24.2.78:9092,172.24.2.79:9092,172.24.2.80:9092"
  groupId: "mf-dev-hl"
  autoOffsetReset: "earliest"
  fetchMessageMaxBytes: 4194304
#redis cluster
redis:
  ipPort: "172.24.4.18:7000,172.24.4.18:7001,172.24.4.18:7002,172.24.4.19:7000,172.24.4.19:7001,172.24.4.19:7002"
  businessId: mf
  spamWebSiteKey: spam_website
  connector: _
  areaCategoryMappingKey: area_mapping_dict
  mediaRegionMappingKey: media_region

#es cluster
es:
  clusterName: cluster_index
  nodes: "192.168.x.x,192.168.x.x"
  tcpPorts: "9300,9300"
  httpPorts: "9200"
flink:
  parallelism: 1
  appName: Test1
  1. 关于命令行解析的使用
    Flink 官方提供ParameterTool应具有较好的实用性,在官方最佳实践篇也具有较好的解释;refer:https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/best_practices.html
    在这里我使用的是: apache.commons.cli,也具有极好的使用性和易用性;代码片段如下:
private static Options initOptions(){
        Options options = new Options();
        options.addOption(Option.builder(ARGS_TYPE).hasArg(true).required(true).desc("the logistic type in [hl,zhxg,bfd]").build());
        options.addOption(Option.builder(ARGS_ENV).hasArg(true).required(true).desc("the env  in [dev,test,prod]").build());
        return options;
    }

    public static CommandLine getCommandLineWithCheck(String[] args){
        CommandLine cmd = null;
        try {
            CommandLineParser parser = new DefaultParser();
            cmd =  parser.parse( initOptions(), args);
        } catch (ParseException e) {
            logger.error("args parse exception, and invoke system exit function",e);
        }finally {
            if(cmd == null){
                logger.error("参数解析异常,退出系统");
                System.exit(-1);
            }
            if(!ARGS_TYPE_VALUES.contains(cmd.getOptionValue(ARGS_TYPE))){
                logger.error("the type must in " + ARGS_TYPE_VALUES);
                System.exit(-1);
            }

            if(!ARGS_ENV_VALUES.contains(cmd.getOptionValue(ARGS_ENV))){
                logger.error("the env must in " + ARGS_ENV_VALUES);
                System.exit(-1);
            }
        }
        return cmd;
    }

关于TaskManger、slot(task slot)、parallelism的认知

  • TaskManager: 独立的JVM进程,其并行能力由slot个数决定。配置文件中默认如下:taskmanager.numberOfTaskSlots: 1, 每个Taskmanager由一个slot组成,即其并行度为1。该 slot独享内存空间,如果有多个slot,那么均分Taskmanager内存空间。
  • slot:在flink中, slot是资源组的含义。那么他有以下几个特性:


    Flink内部图.png
  1. TaskManager 是从 JobManager 处接收需要部署的 Task,任务能配置的最大并行度由 TaskManager 上可用的 Slot 决定。
  2. 每个任务代表分配给任务槽的一组资源,Slot 在 Flink 里面可以认为是资源组,Flink 将每个任务分成子任务并且将这些子任务分配到 Slot 中,这样就可以并行的执行程序。
  3. TaskManager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,注意 Slot 只对内存隔离,没有对 CPU 隔离。
  4. 默认情况下,Flink 允许子任务共享 Slot,即使它们是不同 Task 的 subtask,只要它们来自相同的 Job,这种共享模式可以大大的提高资源利用率。
  • parallelism: 对应的算子或者Task的并发执行能力。

  • slot 和 parallelism的区别

    1. Slot 是指 TaskManager 最大能并发执行的能力


      slot.png

    上图表示每个 TaskManager 中含有3个slot,在 Flink on yarn 中的Per Job模式下, taskManager的个数是动态计算出来的,依据operator的最大并行度(max(p));计算公式:ceil( max(p) / numberOfTaskSlots);

  1. parallelism 是指 TaskManager 实际使用的并发能力


    parallelism.png

    在并行度为1条件下,只会使用其中一个 slot。接下来以实验来作此说明。

  2. 实验环境
    参数说明:-ys:每个taskManger对应的Slot个数
    -ytm: 单个TaskMananger占用的内存大小
    -yn: --yarncontainer Number of YARN container to allocate
    (=Number of Task Managers);对于Per Job模式是不起作用的,为啦作区分度,下述的日志以 -yn=80
    程序启动日志显示:

2020-02-20 00:02:47,157 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - The argument yn is deprecated in will be ignored.
2020-02-20 00:02:47,279 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=4096, numberTaskManagers=80, slotsPerTaskManager=5}

1.png

2.png

ok,相信这一块已经讲的很明白啦,有啦这个基础,便可以进行以下的内容。

关于Operator Chains的理解

为了更高效地分布式执行,Flink会尽可能地将operator的subtask链接(chain)在一起形成task。每个task在一个线程中执行。将operators链接成task是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。
下图便是进行啦一次算子的合并:


operator chain.png

那么算子合并的条件是什么哪?

  1. 上下游的并行度一致
  2. 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
  3. 上下游节点都在同一个 slot group 中(下面会解释 slot group)
  4. 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
  5. 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
  6. 两个节点间数据分区方式是 forward(参考理解数据流的分区
  7. 用户没有禁用 chain

可以认为效果等同于将所有operator的实现都封装于一个大的方法体中串行执行,的确是提供啦极大的灵活性。一套代码即实现啦算子的合并,也可实现算子的拆分。 这里有一篇不错的文章可以参照原理实现:http://wuchong.me/blog/2016/05/09/flink-internals-understanding-execution-resources/#

BackPressure的原理

一旦提起storm、spark streaming、Flink三种常用的数据流框架,最惹人吐槽的便是storm的反压机制,storm是通过监控bolt的负载,暴力的在spout端进行数据的暂停消费。而Flink的反压机制这是基于由下游往上游动态传递反压信息。
未完待续......

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,372评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,368评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,415评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,157评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,171评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,125评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,028评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,887评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,310评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,533评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,690评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,411评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,004评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,812评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,693评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,577评论 2 353

推荐阅读更多精彩内容

  • 本文为《Flink大数据项目实战》学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推荐学习...
    大数据研习社阅读 2,307评论 0 2
  • 一、整体架构 Flink整体由JobManager和TaskManager组成,遵循主从设计原则,JobManag...
    寇寇寇先森阅读 1,750评论 0 2
  • Flink系统组成 Flink是一个分层系统,从下到上分为:系统部署层、任务运行层、API层以及基于API开发的通...
    零度沸腾_yjz阅读 1,657评论 0 8
  • 简介 Flink运行时主要角色有两个:JobManager和TaskManager,无论是standalone集群...
    香山上的麻雀阅读 23,335评论 0 15
  • 满眼的繁星 霸占了整个天空 几千米大脑回路 都是你的身影 月儿丢了 玉儿远了 牛郎星很亮 你的织女星 也很亮吗 我...
    田园听雨阅读 285评论 2 1