Flink入门案例-WordCount流处理

一、maven项目的pom.xml中的依赖

   <properties>
        <flink.version>1.9.1</flink.version>
    </properties>
    <!--引入flink依赖-->
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

二、测试数据

input file path:./hello.txt

hello world
hello flink
hello spark
hello scala
how are you
fine thank you
and you

三、Flink WordCount Java版

package com.cn.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(2);
        String inputPath = "C:\\workstation\\maven_project\\flink_wordcount\\src\\main\\resources\\hello.txt";

        // DataStreamSource 继承SingleOutputStreamOperator,其继承DataStream
        // 从文件中读取数据并模仿流式数据
        DataStreamSource<String> inputDataStream = env.readTextFile(inputPath);

        // 从socket(localhost:9000 可自己定义)文本流读取数据
        // DataStreamSource<String> inputDataStream2 = env.socketTextStream("localhost", 9000);

        // 基于数据流进行转换操作
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = inputDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = s.split(" ");
                for (String word : words) {
                    collector.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        })
        .keyBy(0) // 针对相同的word 合并,批处理groupby,流处理keyby
        .sum(1);
        result.print(); // 到这里不会输出
        // 触发流执行任务
        env.execute();
    }
}

四、运行结果

注:随着流的不断的触发任务会不断更新结果。
2> (how,1)
1> (hello,1)
2> (you,1)
2> (fine,1)
1> (hello,2)
2> (you,2)
1> (hello,3)
2> (and,1)
1> (spark,1)
1> (hello,4)
2> (you,3)
1> (scala,1)
2> (world,1)
1> (are,1)
1> (thank,1)
2> (flink,1)

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 今天感恩节哎,感谢一直在我身边的亲朋好友。感恩相遇!感恩不离不弃。 中午开了第一次的党会,身份的转变要...
    迷月闪星情阅读 10,619评论 0 11
  • 彩排完,天已黑
    刘凯书法阅读 4,325评论 1 3
  • 没事就多看看书,因为腹有诗书气自华,读书万卷始通神。没事就多出去旅游,别因为没钱而找借口,因为只要你省吃俭用,来...
    向阳之心阅读 4,828评论 3 11
  • 表情是什么,我认为表情就是表现出来的情绪。表情可以传达很多信息。高兴了当然就笑了,难过就哭了。两者是相互影响密不可...
    Persistenc_6aea阅读 126,186评论 2 7