maven 依赖
<!-- Flink artifacts used by the examples; both are pinned to the same version (1.10.1). -->
<dependencies>
<!-- Core Flink Java API artifact. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<!-- Flink streaming Java API artifact; the "_2.12" suffix is presumably the Scala binary version (confirm against the Flink release). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
</dependency>
</dependencies>
Flink 批处理能力
/**
 * Batch word count: reads a text file, splits every line into words and
 * prints how many times each word occurs.
 */
public class WordCount {

    /**
     * Builds and runs the batch job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the input data from a file.
        String inputPath = "/you/path/hello.txt";
        DataSet<String> inputDataSet = env.readTextFile(inputPath);

        // 3. Split each line into single words, map them to (word, 1) tuples
        //    and aggregate the counts per word.
        DataSet<Tuple2<String, Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper())
                .groupBy(0) // group by the word at tuple position 0
                .sum(1);    // sum the counts at tuple position 1

        // Print the result.
        resultSet.print();
    }

    /**
     * Custom {@code FlatMapFunction} that turns one line of text into a
     * {@code (word, 1)} tuple for every word on that line.
     */
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {

        /**
         * Splits {@code value} on runs of whitespace and emits {@code (word, 1)}
         * for every non-empty token.
         *
         * @param value one line of input text
         * @param out   collector that receives the emitted tuples
         * @throws Exception declared by the implemented interface method
         */
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Split on any run of whitespace so repeated spaces and tabs do not
            // produce empty "words".
            String[] words = value.split("\\s+");
            for (String word : words) {
                // A blank line or leading whitespace still yields one empty
                // token; skip it so ("", 1) is never counted.
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        }
    }
}
Flink 流处理能力
/**
 * Streaming word count: reads lines from a socket text stream and
 * continuously prints the running count of every word.
 */
public class StreamWordCount {

    /**
     * Builds and runs the streaming job.
     *
     * @param args program arguments; must contain {@code --host <hostname>}
     *             and {@code --port <port>}
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Notes from the original author:
        // - The default parallelism follows the CPU of the current machine; it
        //   can be inspected with env.getParallelism() and overridden with
        //   env.setParallelism(4).
        // - In printed output such as "1> (hello,1)", the leading number is the
        //   index of the parallel thread that produced the record.
        // - A file can be used as the source instead of a socket:
        //   env.readTextFile("/path/to/hello.txt")

        // Extract the configuration from the program's start-up arguments.
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // Fail fast with a descriptive error when --host is missing instead of
        // handing a null hostname to the socket source; getInt already rejects
        // a missing --port.
        String host = parameterTool.getRequired("host");
        int port = parameterTool.getInt("port");

        // Read the input lines from the socket text stream.
        DataStream<String> inputDataStream = env.socketTextStream(host, port);

        // Transform the stream. keyBy only repartitions the records by the hash
        // of the key; it does not perform any computation itself.
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = inputDataStream
                .flatMap(new WordCount.MyFlatMapper())
                .keyBy(0)
                .sum(1);

        // Print the results.
        resultStream.print();

        // Nothing runs until the job is submitted for execution.
        env.execute();
    }
}
2.启动设置
# idea 设置
program arguments : --host localhost --port 7777
- 启动 nc(需在运行程序之前启动,监听 7777 端口,否则程序无法连接到 socket)
nc -lk 7777