Flink结合Kafka简单实战应用

一、环境准备

  1. 在VMware上开启三台CentOS7虚拟机;



  2. 通过Xshell连接这三台虚拟机,开启zookeeper集群以及kafka;



  3. 使用IDEA创建maven项目。

二、概念说明

  (一)Flink流处理任务的逻辑视图

  同Spark Streaming、Storm等流计算引擎一样,Flink的数据处理组件也被分为三类:数据输入(source)、数据处理(transformation)和数据输出(sink)。Flink程序实际执行时,会映射到流数据流(streaming dataflow),每个数据流起始于一个或多个source,并终止于一个或多个sink。


  (二)kafka的简单介绍
  1.生产者(Producer)

  顾名思义,生产者就是生产消息的组件,它的主要工作就是源源不断地生产出消息,然后发送给消息队列。生产者是消息队列的数据源,只有通过生产者持续不断地向消息队列发送消息,消息队列才能不断处理消息。

  2.消费者(Consumer)

  所谓消费者,指的是不断消费(获取)消息的组件,它获取消息的来源就是消息队列(即Kafka本身)。换句话说,生产者不断向消息队列发送消息,而消费者则不断从消息队列中获取消息。

  3.主题(Topic)

  主题是一个逻辑上的概念,它用于从逻辑上来归类与存储消息本身。多个生产者可以向一个Topic发送消息,同时也可以有多个消费者消费一个Topic中的消息。Topic与消息这两个概念之间密切相关,Kafka中的每一条消息都归属于某一个Topic,而一个Topic下面可以有任意数量的消息。

  (三)Flink-Connector-Kafka作用

  1. Kafka中的partition机制和Flink的并行度机制结合,实现数据恢复;
  2. Kafka可以作为Flink的source和sink;
  3. 任务失败,通过设置kafka的offset来恢复应用。


三、简单实战应用

  (一)应用说明

  某手机销售网站,希望看到某一时间段内实时的手机销量情况。翻译一下,可以是每隔5秒钟输出最近10分钟的手机销量的wordcount。

  (二)逻辑视图
  (三)代码实现
  0. pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zj</groupId>
    <artifactId>flkafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.7.0</version>
    </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
            <dependency>
                <groupId>com.typesafe</groupId>
                <artifactId>config</artifactId>
                <version>1.3.2</version>
            </dependency>
        <dependency>
            <groupId>jfree</groupId>
            <artifactId>jfreechart</artifactId>
            <version>1.0.13</version>
        </dependency>

        <dependency>
            <groupId>jfree</groupId>
            <artifactId>jcommon</artifactId>
            <version>1.0.16</version>
        </dependency>
    </dependencies>
    <build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <!--<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>flink.KafkaDemo1</mainClass>
                            </transformer>-->
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>reference.conf</resource>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
    </build>
</project>
  1. 模拟产生购买手机的流数据:

  将kafka作为Flink的sink,编写代码实现一个发送器向kafka中指定的topic发送模拟购买手机的数据。
  KafkaProducer.java(向kafka写入数据):

/**
    向Kafka发消息模拟购买事件
 */
package zj666;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class KafkaProducer {


    public static void main(String[] args) throws Exception{
        //首先要创建StreamExecutionEnvironment,这是流计算执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //source是程序的数据源输入,通过addSource(sourceFunction)来为程序添加一个source
        //flink提供了大量的已经实现好的source方法,可以自定义source
        //这里是通过实现sourceFunction接口来自定义一个并行度为1的source
        DataStreamSource<String> text = env.addSource(new FlinkSource()).setParallelism(1);

        //Properties类用于设置配置文件中的参数
        Properties properties = new Properties();

        //调用setProperty方法将设置的参数保存到配置文件中
        //这里是设置了kafka的bootstrap.servers参数,为启动kafka的主机ip和端口
        properties.setProperty("bootstrap.servers", "10.6.6.2:9092");

        //创建生产者producer,指定消息要发送到的topic以及添加上面的properties
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("test",new SimpleStringSchema(),properties);

        //自定义的接收器,接收producer产生的消息
        text.addSink(producer);

        //execute()方法触发程序执行
        env.execute();
    }
}

  FlinkSource.java(自定义实现的发送器):

/**
    自定义实现的一个发送器,用来向kafka发送数据:
 */
package zj666;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class FlinkSource implements SourceFunction<String> {

    private boolean isRunning = true;

    /**
     * 在run方法中实现一个循环,模拟产生购物数据
*/
    @Override
    public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
        while(isRunning){

            //模拟产生购买手机的数据
            //创建一个数组队列phone,在里面添加5类手机品牌
            List<String> phone = new ArrayList<>();
            phone.add("Samsung");
            phone.add("华为");
            phone.add("iPhone");
            phone.add("小米");
            phone.add("OPPO");

            //产生0-4随机索引值,通过该索引值来访问phone中的元素
            int i = new Random().nextInt(5);
            ctx.collect(phone.get(i));

            //每2毫秒产生一条数据
            Thread.sleep(200);
        }
    }

    //取消一个cancel的时候会调用的方法
    @Override
    public void cancel() {
        isRunning = false;
    }
}
  2.实时count

  将kafka作为作为Flink的source,从kafka中接收指定的topic的消息作为输入。再使用Flink的滑动窗口机制(Sliding Windows),按10分钟的窗口大小,每5秒钟统计一次,做滑动窗口聚合,最后输出聚合结果。
  滑动窗口原理:


  滚动窗口原理:

  TopN.java:

/**
    每隔5秒钟,计算过去十分钟手机销量情况
 */
package zj666;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.util.Comparator;
import java.util.Properties;
import java.util.TreeMap;

public class TopN {
    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        env.setParallelism(1);
        //ProcessingTime是以operator处理的时间为准,它使用的是机器的系统时间来作为data stream的时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.6.6.2:9092");
        //从kafka中接收指定的topic的消息作为输入input
        FlinkKafkaConsumer<String> input = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);

        //设置消费模式为setStartFromEarliest,说明是从最早的记录开始消费
        input.setStartFromEarliest();

        //将input作为数据流stream的source
        DataStream<String> stream = env
                .addSource(input);

        //将输入语句split成一个一个单词并初始化count值为1的Tuple2<String, Integer>类型
        DataStream<Tuple2<String, Integer>> ds = stream
                .flatMap(new LineSplitter());


        DataStream<Tuple2<String, Integer>> wcount = ds
                .keyBy(0)//将同一Key的数据放到同一个分区
                .window(SlidingProcessingTimeWindows.of(Time.seconds(600), Time.seconds(5)))
                //keyBy之后的元素进入一个总时间长度为600s,每5s向后滑动一次的滑动窗口
                .sum(1);// 将相同key的元素的count值相加

        wcount
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//(手机1, xx) (手机2,xx)....
                //windowAll定义一个全局窗口,所有key元素进入一个5s长的滚动窗口(选5秒是因为上游的滑动窗口每5s计算一轮数据,topN窗口一次计算只统计一个窗口时间内的变化)
                .process(new TopNAllFunction(5))
                .print();

        env.execute();
    }//

    private static final class LineSplitter implements
            FlatMapFunction<String, Tuple2<String, Integer>> {

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            //(手机1,1) (手机2,1) (手机3,1)
             out.collect(new Tuple2<String, Integer>(value, 1));
        }

    }

    private static class TopNAllFunction
            extends
            ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow> {

        private int topSize;

        public TopNAllFunction(int topSize) {

            this.topSize = topSize;
        }

        public void process(

                ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow>.Context arg0,
                Iterable<Tuple2<String, Integer>> input,
                Collector<String> out) throws Exception {

            TreeMap<Integer, String> treemap = new TreeMap<Integer, String>(
                    new Comparator<Integer>() {

                        @Override
                        public int compare(Integer y, Integer x) {
                            return (x < y) ? -1 : 1;
                        }

                    }); //treemap按照key降序排列,相同count值不覆盖
            int[] pdata=new int[5];
            for (Tuple2<String, Integer> element : input) {
                System.out.println(element);
                if(element.f0.equals("华为")){
                    pdata[0] = element.f1;
                }
                if(element.f0.equals("小米")){
                    pdata[1] = element.f1;
                }
                if(element.f0.equals("OPPO")){
                    pdata[2] = element.f1;
                }
                if(element.f0.equals("Samsung")){
                    pdata[3] = element.f1;
                }
                if(element.f0.equals("iPhone")){
                    pdata[4] = element.f1;
                }

                treemap.put(element.f1, element.f0);
                if (treemap.size() > topSize) { //只保留前面TopN个元素
                    treemap.pollLastEntry();
                }
            }
            //通过柱形图来实现可视化
            BarChart.main((new Timestamp(System.currentTimeMillis())).toString(),pdata[0],pdata[1],pdata[2],pdata[3],pdata[4]);
            //每隔5s输出实时手机销量
            out.collect("=================\n" + new Timestamp(System.currentTimeMillis()) + "\n" + "实时手机销量:\n" + treemap.toString());

        }
    }

}
  3.可视化

  将每一次统计的结果以柱形图的形式可视化,每更新一次数据,统计图的数据也相应更新。
  BarChart.java(实现柱形图):

/**
 实现柱形图
 */
package zj666;

import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartPanel;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.axis.CategoryAxis;
import org.jfree.chart.axis.ValueAxis;
import org.jfree.chart.labels.StandardCategoryItemLabelGenerator;
import org.jfree.chart.plot.CategoryPlot;
import org.jfree.chart.plot.PlotOrientation;
import org.jfree.chart.renderer.category.BarRenderer3D;
import org.jfree.data.category.CategoryDataset;
import org.jfree.data.category.DefaultCategoryDataset;

import javax.swing.*;
import java.awt.*;

public class BarChart {
    //1-写一个ChartPanel变量
    ChartPanel jframe;

    //2-BarChart的无参数的构造方法
    public BarChart(String time,int a,int b,int c,int d,int e) {
        DefaultCategoryDataset data=(DefaultCategoryDataset) getDataSet(a,b,c,d,e);
        JFreeChart chart= ChartFactory.createBarChart3D(
                time + "手机实时销售数据统计",  //图表标题
                "手机品牌",//目录轴的显示标签
                "数量",//数值轴的显示标签
                data,
                PlotOrientation.VERTICAL,  //图表方向 水平 垂直
                true,  //是否显示图例(对于简单的图表建议显示图例)
                false,//是否生成工具
                false);  //是否生成网址链接
        //字体设置
        //获得图表区域对象
        CategoryPlot plot=chart.getCategoryPlot();
        //水平底部列表
        CategoryAxis domain =plot.getDomainAxis();
        //垂直标题字体设置
        domain.setTickLabelFont(new Font("黑体", Font.BOLD, 16));
        //水平底部标题设置
        domain.setLabelFont(new Font("黑体", Font.BOLD, 20));
        //获取柱状体
        ValueAxis rangeAxis=plot.getRangeAxis();
        rangeAxis.setLabelFont(new Font("黑体", Font.BOLD, 16));
        //设置lengend字体
        chart.getLegend().setItemFont(new Font("黑体", Font.BOLD, 16));
        chart.getTitle().setFont(new Font("黑体", Font.BOLD, 16));
        //显示数值
        BarRenderer3D renderer = new BarRenderer3D();//3D属性修改
        renderer.setBaseItemLabelGenerator(new StandardCategoryItemLabelGenerator());
        renderer.setBaseItemLabelsVisible(true);
        renderer.setBaseItemLabelPaint(Color.BLUE);//设置数值颜色,默认黑色
        plot.setRenderer(renderer);
        //初始化Jframe
        jframe=new ChartPanel(chart);


    }

    //3-图表数据设置
    public static CategoryDataset getDataSet(int a,int b,int c,int d,int e) {
        DefaultCategoryDataset data=new DefaultCategoryDataset();
        //设置数据
        data.setValue(a, "国产", "华为");
        data.setValue(b, "国产", "小米");
        data.setValue(c, "国产", "OPPO");
        data.setValue(d, "国外", "Samsung");
        data.setValue(e, "国外", "iPhone");

        return data;
    }

    //4-返回一个ChartPanel
    public ChartPanel getPanel() {
        return jframe;
    }
    public static void main(String time,int a,int b,int c, int d,int e) throws InterruptedException {
        JFrame j=new JFrame();
        JDialog jd=new JDialog();
        jd.setBounds(250, 250, 600, 600);
        jd.add(new BarChart(time,a,b,c,d,e).getPanel());
        jd.setVisible(true);
        jd.setDefaultCloseOperation(javax.swing.WindowConstants.DISPOSE_ON_CLOSE);
    }
}

四、操作步骤

  1. 在kafka创建一个topic test


  2. 运行KafkaProducer.java,向kafka发送消息:
  IDEA控制台输出:

  kafka输出:

  3. 运行TopN.java,实时显示销售数量:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352