flink的local-cluster模式

Flink中的Local-cluster(本地集群)模式,主要用于测试、学习,可帮助我们快速入门flink。

1)local-cluster模式配置

local-cluster模式基本属于零配置。
配置步骤为:

1.上传Flink的安装包flink-1.12.0-bin-scala_2.11.tgz到hadoop162

2.解压
tar -zxvf flink-1.12.0-bin-scala_2.11.tgz -C /opt/module

3.进入目录/opt/module, 把刚刚解压出来的文件夹 flink-1.12.0 复制为 flink-local :
cd /opt/module
cp -r flink-1.12.0 flink-local

2)local-cluster模式下运行无界的WordCount

代码:

package com.evscn;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class WC_UnBoundStream {
    public static void main(String[] args) throws Exception {
        // 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从文件中获取流
        DataStreamSource<String> stream = env.socketTextStream("hadoop162", 9999);

        // 做转换
        SingleOutputStreamOperator<Tuple2<String, Long>> tmp = stream.flatMap((String value, Collector<Tuple2<String, Long>> out) -> {
            String[] arr = value.split(" ");
            for (String s : arr) {
                out.collect(Tuple2.of(s, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        KeyedStream<Tuple2<String, Long>, String> keyedStream = tmp.keyBy(value -> value.f0);
        SingleOutputStreamOperator<Tuple2<String, Long>> ans = keyedStream.sum(1);
        ans.print("sum: ");

        // 启动执行环境
        env.execute();
    }
}

打包并上传:

image.png

提交任务:


image.png

测试输入:

图片.png

如下图,观察 slot数、CPU核数 等信息,可见CPU核数是8核(虚拟8核,等价于8核),而flink的local-cluster模式下,只分配了1个核(即slot)(此时可简单认为1个slot对应1个核)给潜在的任务(Jobs)——这是因为,代码中设定了此任务的并行度为1【env.setParallelism(1);】:

图片.png

如下两图,可观察TM(TaskManager)的相关信息,留意心跳时间会不断被更新,每次心跳发生时,TaskManager得以与其他组件沟通:

图片.png
图片.png

在浏览器中查看程序的执行情况:

图片.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容