02 使用Flink的流处理完成词频统计

本节将阐述如何使用本地模式的flink完成流处理的词频统计。

1 系统、软件以及前提约束

2 操作

  • 1 在idea中创建一个maven项目
  • 2 修改该maven项目的pom.xml中的依赖
   <dependencies>
        <dependency>
            <!--spark依赖-->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <!--scala依赖-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>

        <!--storm依赖-->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>log4j-over-slf4j</artifactId>
                </exclusion>
            </exclusions>
            <version>1.2.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.5.1</version>
        </dependency>
    </dependencies>
  • 3 在src/main/java中添加SocketWindowWordCountWithFlink.java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCountWithFlink {
    public static void main(String[] args) throws Exception {
        // final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // local模式
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        DataStream<String> text = env.socketTextStream("localhost", 9999, "\n");
        @SuppressWarnings("serial")
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word").timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });
        windowCounts.print().setParallelism(1);
        env.execute("Socket Window WordCount(zyl_test)");
    }

    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
  • 4 测试
    (1)打开windows命令行,执行以下命令:
nc -l -p 9999

(2)在idea中执行SocketWindowWordCountWithFlink.java
(3)在nc窗口输入字符串,观察idea中的控制台,会有统计结果打印。
以上就是使用Flink的本地模式进行的流处理词频统计过程,在本实验中,我们通过人输入字符串来模拟源源不断到来的数据流。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 前面我们已经安装了storm,storm有两种模式,一是本地模式,主要用于学习和测试,另一个是集群模式,实际生产中...
    张力的程序园阅读 1,465评论 0 1
  • 本文为《Flink大数据项目实战》学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推荐学习...
    大数据研习社阅读 1,042评论 0 0
  • 《遇见王沥川》满足了我对爱情的所有幻想,也唤醒了我沉睡已久的少女心,往事的美好又清晰地展现,回味曾经的柔软,...
    残照当楼阅读 3,034评论 0 4
  • 我叫八刀,我这样告诉自己。 我是个杀手,职业杀手。我杀过很多人,也用过很多刀。或是因为爱情,或是亲情,抑或其它,我...
    左丨天阅读 693评论 0 2
  • 没有记录,就没有发生 我的三个标签: 1、12岁男孩的妈妈 2、萱妍堂护肤代理商 3、运动爱好者,易效能践行者 【...
    薇_f50b阅读 259评论 0 0

友情链接更多精彩内容