一、环境准备
1. 在VMware上开启三台CentOS7虚拟机;
2. 通过Xshell连接这三台虚拟机,开启zookeeper集群以及kafka;
3. 使用IDEA创建maven项目。
二、概念说明
(一)Flink流处理任务的逻辑视图
同Spark Streaming、Storm等流计算引擎一样,Flink的数据处理组件也被分为三类:数据输入(source)、数据处理(transformation)和数据输出(sink)。Flink程序实际执行时,会映射到流数据流(streaming dataflow),每个数据流起始于一个或多个source,并终止于一个或多个sink。
(二)kafka的简单介绍
1.生产者(Producer)
顾名思义,生产者就是生产消息的组件,它的主要工作就是源源不断地生产出消息,然后发送给消息队列。生产者是消息队列的数据源,只有通过生产者持续不断地向消息队列发送消息,消息队列才能不断处理消息。
2.消费者(Consumer)
所谓消费者,指的是不断消费(获取)消息的组件,它获取消息的来源就是消息队列(即Kafka本身)。换句话说,生产者不断向消息队列发送消息,而消费者则不断从消息队列中获取消息。
3.主题(Topic)
主题是一个逻辑上的概念,它用于从逻辑上来归类与存储消息本身。多个生产者可以向一个Topic发送消息,同时也可以有多个消费者消费一个Topic中的消息。Topic与消息这两个概念之间密切相关,Kafka中的每一条消息都归属于某一个Topic,而一个Topic下面可以有任意数量的消息。
(三)Flink-Connector-Kafka作用
1. Kafka中的partition机制和Flink的并行度机制结合,实现数据恢复;
2. Kafka可以作为Flink的source和sink;
3. 任务失败,通过设置kafka的offset来恢复应用。
三、简单实战应用
(一)应用说明
某手机销售网站,希望看到某一时间段内实时的手机销量情况。翻译一下,可以是每隔5秒钟输出最近10分钟的手机销量的wordcount。
(二)逻辑视图
(三)代码实现
0. pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zj</groupId>
<artifactId>flkafka</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>jfree</groupId>
<artifactId>jfreechart</artifactId>
<version>1.0.13</version>
</dependency>
<dependency>
<groupId>jfree</groupId>
<artifactId>jcommon</artifactId>
<version>1.0.16</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<!--<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>flink.KafkaDemo1</mainClass>
</transformer>-->
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
1. 模拟产生购买手机的流数据:
将kafka作为Flink的sink,编写代码实现一个发送器向kafka中指定的topic发送模拟购买手机的数据。
KafkaProducer.java(向kafka写入数据):
/**
向Kafka发消息模拟购买事件
*/
package zj666;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
public class KafkaProducer {
public static void main(String[] args) throws Exception{
//首先要创建StreamExecutionEnvironment,这是流计算执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//source是程序的数据源输入,通过addSource(sourceFunction)来为程序添加一个source
//flink提供了大量的已经实现好的source方法,可以自定义source
//这里是通过实现sourceFunction接口来自定义一个并行度为1的source
DataStreamSource<String> text = env.addSource(new FlinkSource()).setParallelism(1);
//Properties类用于设置配置文件中的参数
Properties properties = new Properties();
//调用setProperty方法将设置的参数保存到配置文件中
//这里是设置了kafka的bootstrap.servers参数,为启动kafka的主机ip和端口
properties.setProperty("bootstrap.servers", "10.6.6.2:9092");
//创建生产者producer,指定消息要发送到的topic以及添加上面的properties
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("test",new SimpleStringSchema(),properties);
//自定义的接收器,接收producer产生的消息
text.addSink(producer);
//execute()方法触发程序执行
env.execute();
}
}
FlinkSource.java(自定义实现的发送器):
/**
自定义实现的一个发送器,用来向kafka发送数据:
*/
package zj666;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class FlinkSource implements SourceFunction<String> {
private boolean isRunning = true;
/**
* 在run方法中实现一个循环,模拟产生购物数据
*/
@Override
public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
while(isRunning){
//模拟产生购买手机的数据
//创建一个数组队列phone,在里面添加5类手机品牌
List<String> phone = new ArrayList<>();
phone.add("Samsung");
phone.add("华为");
phone.add("iPhone");
phone.add("小米");
phone.add("OPPO");
//产生0-4随机索引值,通过该索引值来访问phone中的元素
int i = new Random().nextInt(5);
ctx.collect(phone.get(i));
//每2毫秒产生一条数据
Thread.sleep(200);
}
}
//取消一个cancel的时候会调用的方法
@Override
public void cancel() {
isRunning = false;
}
}
2.实时count
将kafka作为作为Flink的source,从kafka中接收指定的topic的消息作为输入。再使用Flink的滑动窗口机制(Sliding Windows),按10分钟的窗口大小,每5秒钟统计一次,做滑动窗口聚合,最后输出聚合结果。
滑动窗口原理:
滚动窗口原理:
TopN.java:
/**
每隔5秒钟,计算过去十分钟手机销量情况
*/
package zj666;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.util.Comparator;
import java.util.Properties;
import java.util.TreeMap;
public class TopN {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度为1
env.setParallelism(1);
//ProcessingTime是以operator处理的时间为准,它使用的是机器的系统时间来作为data stream的时间
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "10.6.6.2:9092");
//从kafka中接收指定的topic的消息作为输入input
FlinkKafkaConsumer<String> input = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
//设置消费模式为setStartFromEarliest,说明是从最早的记录开始消费
input.setStartFromEarliest();
//将input作为数据流stream的source
DataStream<String> stream = env
.addSource(input);
//将输入语句split成一个一个单词并初始化count值为1的Tuple2<String, Integer>类型
DataStream<Tuple2<String, Integer>> ds = stream
.flatMap(new LineSplitter());
DataStream<Tuple2<String, Integer>> wcount = ds
.keyBy(0)//将同一Key的数据放到同一个分区
.window(SlidingProcessingTimeWindows.of(Time.seconds(600), Time.seconds(5)))
//keyBy之后的元素进入一个总时间长度为600s,每5s向后滑动一次的滑动窗口
.sum(1);// 将相同key的元素的count值相加
wcount
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//(手机1, xx) (手机2,xx)....
//windowAll定义一个全局窗口,所有key元素进入一个5s长的滚动窗口(选5秒是因为上游的滑动窗口每5s计算一轮数据,topN窗口一次计算只统计一个窗口时间内的变化)
.process(new TopNAllFunction(5))
.print();
env.execute();
}//
private static final class LineSplitter implements
FlatMapFunction<String, Tuple2<String, Integer>> {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
//(手机1,1) (手机2,1) (手机3,1)
out.collect(new Tuple2<String, Integer>(value, 1));
}
}
private static class TopNAllFunction
extends
ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow> {
private int topSize;
public TopNAllFunction(int topSize) {
this.topSize = topSize;
}
public void process(
ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow>.Context arg0,
Iterable<Tuple2<String, Integer>> input,
Collector<String> out) throws Exception {
TreeMap<Integer, String> treemap = new TreeMap<Integer, String>(
new Comparator<Integer>() {
@Override
public int compare(Integer y, Integer x) {
return (x < y) ? -1 : 1;
}
}); //treemap按照key降序排列,相同count值不覆盖
int[] pdata=new int[5];
for (Tuple2<String, Integer> element : input) {
System.out.println(element);
if(element.f0.equals("华为")){
pdata[0] = element.f1;
}
if(element.f0.equals("小米")){
pdata[1] = element.f1;
}
if(element.f0.equals("OPPO")){
pdata[2] = element.f1;
}
if(element.f0.equals("Samsung")){
pdata[3] = element.f1;
}
if(element.f0.equals("iPhone")){
pdata[4] = element.f1;
}
treemap.put(element.f1, element.f0);
if (treemap.size() > topSize) { //只保留前面TopN个元素
treemap.pollLastEntry();
}
}
//通过柱形图来实现可视化
BarChart.main((new Timestamp(System.currentTimeMillis())).toString(),pdata[0],pdata[1],pdata[2],pdata[3],pdata[4]);
//每隔5s输出实时手机销量
out.collect("=================\n" + new Timestamp(System.currentTimeMillis()) + "\n" + "实时手机销量:\n" + treemap.toString());
}
}
}
3.可视化
将每一次统计的结果以柱形图的形式可视化,每更新一次数据,统计图的数据也相应更新。
BarChart.java(实现柱形图):
/**
实现柱形图
*/
package zj666;
import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartPanel;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.axis.CategoryAxis;
import org.jfree.chart.axis.ValueAxis;
import org.jfree.chart.labels.StandardCategoryItemLabelGenerator;
import org.jfree.chart.plot.CategoryPlot;
import org.jfree.chart.plot.PlotOrientation;
import org.jfree.chart.renderer.category.BarRenderer3D;
import org.jfree.data.category.CategoryDataset;
import org.jfree.data.category.DefaultCategoryDataset;
import javax.swing.*;
import java.awt.*;
public class BarChart {
//1-写一个ChartPanel变量
ChartPanel jframe;
//2-BarChart的无参数的构造方法
public BarChart(String time,int a,int b,int c,int d,int e) {
DefaultCategoryDataset data=(DefaultCategoryDataset) getDataSet(a,b,c,d,e);
JFreeChart chart= ChartFactory.createBarChart3D(
time + "手机实时销售数据统计", //图表标题
"手机品牌",//目录轴的显示标签
"数量",//数值轴的显示标签
data,
PlotOrientation.VERTICAL, //图表方向 水平 垂直
true, //是否显示图例(对于简单的图表建议显示图例)
false,//是否生成工具
false); //是否生成网址链接
//字体设置
//获得图表区域对象
CategoryPlot plot=chart.getCategoryPlot();
//水平底部列表
CategoryAxis domain =plot.getDomainAxis();
//垂直标题字体设置
domain.setTickLabelFont(new Font("黑体", Font.BOLD, 16));
//水平底部标题设置
domain.setLabelFont(new Font("黑体", Font.BOLD, 20));
//获取柱状体
ValueAxis rangeAxis=plot.getRangeAxis();
rangeAxis.setLabelFont(new Font("黑体", Font.BOLD, 16));
//设置lengend字体
chart.getLegend().setItemFont(new Font("黑体", Font.BOLD, 16));
chart.getTitle().setFont(new Font("黑体", Font.BOLD, 16));
//显示数值
BarRenderer3D renderer = new BarRenderer3D();//3D属性修改
renderer.setBaseItemLabelGenerator(new StandardCategoryItemLabelGenerator());
renderer.setBaseItemLabelsVisible(true);
renderer.setBaseItemLabelPaint(Color.BLUE);//设置数值颜色,默认黑色
plot.setRenderer(renderer);
//初始化Jframe
jframe=new ChartPanel(chart);
}
//3-图表数据设置
public static CategoryDataset getDataSet(int a,int b,int c,int d,int e) {
DefaultCategoryDataset data=new DefaultCategoryDataset();
//设置数据
data.setValue(a, "国产", "华为");
data.setValue(b, "国产", "小米");
data.setValue(c, "国产", "OPPO");
data.setValue(d, "国外", "Samsung");
data.setValue(e, "国外", "iPhone");
return data;
}
//4-返回一个ChartPanel
public ChartPanel getPanel() {
return jframe;
}
public static void main(String time,int a,int b,int c, int d,int e) throws InterruptedException {
JFrame j=new JFrame();
JDialog jd=new JDialog();
jd.setBounds(250, 250, 600, 600);
jd.add(new BarChart(time,a,b,c,d,e).getPanel());
jd.setVisible(true);
jd.setDefaultCloseOperation(javax.swing.WindowConstants.DISPOSE_ON_CLOSE);
}
}
四、操作步骤
1. 在kafka创建一个topic test
:
2. 运行
KafkaProducer.java
,向kafka发送消息:IDEA控制台输出:
kafka输出:
3. 运行
TopN.java
,实时显示销售数量: