Flink中的Local-cluster(本地集群)模式,主要用于测试、学习,可帮助我们快速入门flink。
1)local-cluster模式配置
local-cluster模式基本属于零配置。
配置步骤为:
1.上传Flink的安装包flink-1.12.0-bin-scala_2.11.tgz到hadoop162
2.解压
tar -zxvf flink-1.12.0-bin-scala_2.11.tgz -C /opt/module
3.进入目录/opt/module, 把刚刚解压出来的文件夹 flink-1.12.0 复制为 flink-local :
cd /opt/module
cp -r flink-1.12.0 flink-local
2)local-cluster模式下运行无界的WordCount
代码:
package com.evscn;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WC_UnBoundStream {
public static void main(String[] args) throws Exception {
// 获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从文件中获取流
DataStreamSource<String> stream = env.socketTextStream("hadoop162", 9999);
// 做转换
SingleOutputStreamOperator<Tuple2<String, Long>> tmp = stream.flatMap((String value, Collector<Tuple2<String, Long>> out) -> {
String[] arr = value.split(" ");
for (String s : arr) {
out.collect(Tuple2.of(s, 1L));
}
}).returns(Types.TUPLE(Types.STRING, Types.LONG));
KeyedStream<Tuple2<String, Long>, String> keyedStream = tmp.keyBy(value -> value.f0);
SingleOutputStreamOperator<Tuple2<String, Long>> ans = keyedStream.sum(1);
ans.print("sum: ");
// 启动执行环境
env.execute();
}
}
打包并上传:

image.png
提交任务:

image.png
测试输入:

图片.png
如下图,观察 slot数、CPU核数 等信息,可见CPU核数是8核(虚拟8核,等价于8核),而flink的local-cluster模式下,只分配了1个核(即slot)(此时可简单认为1个slot对应1个核)给潜在的任务(Jobs)——这是因为,代码中设定了此任务的并行度为1【env.setParallelism(1);】:

图片.png
如下两图,可观察TM(TaskManager)的相关信息,留意心跳时间会不断被更新,每次心跳发生时,TaskManager得以与其他组件沟通:

图片.png

图片.png
在浏览器中查看程序的执行情况:

图片.png