kafka中的数据,平常大多是json、csv、String,这类数据使用SimpleStringSchema就可以轻松对数据进行序列化和反序列化。
但有时候又需要特殊结构的数据进行传输(DTO),需要根据实际的数据结构进行序列化和反序列化。
下面以自定义的Student结构体为例,演示如何通过自定义的FlinkSerializationSchema类完成序列化和反序列化。
一、创建spring boot工程
pom.xml文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qogo8</groupId>
<artifactId>flink-kfk-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>flink-kfk-demo</name>
<description>flink consume/produce demo</description>
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<!-- Build-wide settings: Java 8 source/target level, UTF-8 sources, and the Flink version
     and Scala binary suffix that the dependency declarations reference through
     ${flink.version} and ${scala.binary.version}. -->
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.5</flink.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
<!-- Flink and Kafka libraries used by the demo. Unless noted otherwise, the Flink artifacts
     follow ${flink.version}. -->
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<!-- NOTE(review): the Kafka client version is pinned here independently of the Flink Kafka
     connector declared below; confirm the two are compatible. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<!-- NOTE(review): this artifact carries its own version instead of ${flink.version};
     confirm that 2.15.3-19.0 is the release intended for use with Flink 1.13.5. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>
<version>2.15.3-19.0</version>
<scope>compile</scope>
</dependency>
<!-- Kafka source/sink connector; the artifact name carries the Scala binary suffix. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<!-- Declared with scope "provided", unlike the other dependencies in this list.
     NOTE(review): presumably the cluster supplies this at runtime; confirm for local runs. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- No explicit scope is given for this dependency. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Package a fat jar: the assembly plugin's "single" goal is bound to the package phase
     and uses the predefined jar-with-dependencies descriptor. No main class is configured
     in this plugin section.
     NOTE(review): presumably the entry class is supplied when the job is submitted; confirm. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.5</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
二、创建自定义数据结构的类
1、创建Scores类
package com.qogo8.flink_kfk_demo.Student;
/**
 * Mutable value holder for one exam result: a subject name and the score achieved in it.
 */
public class Scores {

    /** Score achieved in {@link #subject}. */
    private int scores;

    /** Name of the subject. */
    private String subject;

    /** Creates an empty result; the subject is {@code null} and the score is {@code 0}. */
    public Scores() {
    }

    /**
     * Creates a fully populated result.
     *
     * @param scores  score achieved in the subject
     * @param subject name of the subject
     */
    public Scores(int scores, String subject) {
        this.subject = subject;
        this.scores = scores;
    }

    /** @return the subject name, possibly {@code null} */
    public String getSubject() {
        return subject;
    }

    /** @param subject the new subject name */
    public void setSubject(String subject) {
        this.subject = subject;
    }

    /** @return the score achieved in the subject */
    public int getScores() {
        return scores;
    }

    /** @param scores the new score */
    public void setScores(int scores) {
        this.scores = scores;
    }
}
2、创建Student类
package com.qogo8.flink_kfk_demo.Student;
/**
 * Data transfer object describing a student together with one subject result.
 *
 * <p>The {@code scores} field may be {@code null}: the no-argument constructor leaves it unset
 * and {@link #Student(String, int, Scores)} accepts whatever the caller passes.
 */
public class Student {
    private String name;   // student name
    private int age;       // student age
    private int grade;     // grade
    private int Sclass;    // class within the grade
    private String hobby;  // hobby
    private Scores scores; // subject result, may be null

    /** Creates an empty student; all reference fields are {@code null}. */
    public Student() {
    }

    /**
     * Creates a student with the default subject result ("Chinese", 22).
     *
     * @param name   student name
     * @param age    student age
     * @param grade  grade
     * @param Sclass class within the grade
     * @param hobby  hobby
     */
    public Student(String name, int age, int grade, int Sclass, String hobby) {
        this(name, age, grade, Sclass, hobby, "Chinese", 22);
    }

    /**
     * Creates a student with an explicit subject result.
     *
     * @param name    student name
     * @param age     student age
     * @param grade   grade
     * @param Sclass  class within the grade
     * @param hobby   hobby
     * @param subject subject name stored in the new {@link Scores}
     * @param scores  score stored in the new {@link Scores}
     */
    public Student(String name, int age, int grade, int Sclass, String hobby, String subject, int scores) {
        this.name = name;
        this.age = age;
        this.grade = grade;
        this.Sclass = Sclass;
        this.hobby = hobby;
        this.scores = new Scores();
        this.scores.setSubject(subject);
        this.scores.setScores(scores);
    }

    /**
     * Creates a student from a name, an age and an existing result object.
     *
     * @param name   student name
     * @param age    student age
     * @param scores subject result, stored as given (may be {@code null})
     */
    public Student(String name, int age, Scores scores) {
        this.name = name;
        this.age = age;
        this.scores = scores;
    }

    public void setGrade(int grade) {
        this.grade = grade;
    }

    public int getGrade() {
        return this.grade;
    }

    public void setSclass(int Sclass) {
        this.Sclass = Sclass;
    }

    public int getSclass() {
        return this.Sclass;
    }

    public void setHobby(String hobby) {
        this.hobby = hobby;
    }

    public String getHobby() {
        return this.hobby;
    }

    public Scores getScores() {
        return this.scores;
    }

    public void setScores(Scores scores) {
        this.scores = scores;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return this.age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    /**
     * Renders all fields on one line.
     *
     * @return the textual form; {@code subject} and {@code scores} are printed as
     *         {@code null} when no result object is set
     */
    @Override
    public String toString() {
        // scores can legitimately be null, so it must not be dereferenced unconditionally.
        String subjectText = this.scores == null ? null : this.scores.getSubject();
        String scoreText = this.scores == null ? "null" : String.valueOf(this.scores.getScores());
        return "Student [name=" + this.name + ",age=" + this.age + ",grade=" + this.grade
                + ",Sclass=" + this.Sclass + ",hobby=" + this.hobby
                + ",subject=" + subjectText + ",scores=" + scoreText + "]";
    }
}
三、定义序列化和反序列化类FlinkSerializationSchema
package com.qogo8.flink_kfk_demo.Serialization;
import com.qogo8.flink_kfk_demo.Student.Student;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.commons.lang3.SerializationException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
// 使用Flink 1.13.5版本序列化数据(因为本公司是使用的这个Flink版本^_^)
public class FlinkSerializationSchema implements DeserializationSchema<Student>, SerializationSchema<Student> {
/**
* 将Student对象序列化为字节数组
*
* @param o 要序列化的Student对象
* @return 序列化后的字节数组
* @throws SerializationException 如果序列化过程中出现错误,则抛出此异常
*/
@Override
public byte[] serialize(Student o) {
try {
// 将传入的对象强制转换为Student类型
Student data = (Student) o;
if (data == null) {
// 如果对象为null,则返回null
return null;
} else {
// 定义用于存储序列化长度的变量
int serializedLength;
// 定义用于存储序列化后名称的字节数组
byte[] serializedName;
// 定义用于存储序列化后爱好的字节数组
byte[] serializedHobby;
// 定义用于存储序列化后科目的字节数组
byte[] serializedSubject;
// 如果学生名称不为null
if (data.getName() != null) {
// 将学生名称转换为UTF-8编码的字节数组,并获取长度
serializedName = data.getName().getBytes("UTF-8");
serializedLength = serializedName.length;
} else {
// 如果学生名称为null,则初始化为长度为0的字节数组,长度也为0
serializedName = new byte[0];
serializedLength = 0;
}
// 将学生爱好转换为UTF-8编码的字节数组
serializedHobby = data.getHobby().getBytes("UTF-8");
// 将学生科目转换为UTF-8编码的字节数组
serializedSubject = data.getScores().getSubject().getBytes("UTF-8");
// 分配一个ByteBuffer,大小为固定值28加上各个序列化字段的长度之和
//一个int是4个字节
ByteBuffer buffer = ByteBuffer.allocate(28 + serializedLength + serializedHobby.length + serializedSubject.length);
// 依次将学生年龄、年级、班级、名称长度、名称、爱好长度、爱好、成绩、科目长度、科目序列化到ByteBuffer中
buffer.putInt(serializedLength);
buffer.put(serializedName);
buffer.putInt(data.getAge());
buffer.putInt(data.getGrade());
buffer.putInt(data.getSclass());
buffer.putInt(serializedHobby.length);
buffer.put(serializedHobby);
buffer.putInt(serializedSubject.length);
buffer.put(serializedSubject);
buffer.putInt(data.getScores().getScores());
// 返回ByteBuffer的字节数组
return buffer.array();
}
} catch (Exception e) {
// 捕获异常并抛出SerializationException异常
throw new SerializationException("error when serializing..." + e);
}
}
/**
* 从字节数组反序列化一个学生对象
*
* @param data 包含学生对象序列化数据的字节数组
* @return 反序列化后的学生对象
* @throws IOException 如果发生IO异常
*/
@Override
public Student deserialize(byte[] data) throws IOException {
try {
// 如果数据为空,则返回null
if (data == null) {
return null;
} else if (data.length < 28) {
// 如果数据长度小于28,则抛出异常
throw new SerializationException("Size of data received by IntegerDeserializer is shorter than expected...");
} else {
// 使用ByteBuffer包装数据
ByteBuffer buffer = ByteBuffer.wrap(data);
// 读取名字长度
int nameLength = buffer.getInt();
// 根据名字长度创建字节数组
byte[] nameBytes = new byte[nameLength];
// 从缓冲区读取名字字节数据
buffer.get(nameBytes);
// 将名字字节数据转换为字符串
String name = new String(nameBytes, "UTF-8");
// 读取年龄
int age = buffer.getInt();
// 读取年级
int grade = buffer.getInt();
// 读取班级
int Sclass = buffer.getInt();
// 读取爱好长度
int hobbyLength = buffer.getInt();
// 根据爱好长度创建字节数组
byte[] hobbyBytes = new byte[hobbyLength];
// 从缓冲区读取爱好字节数据
buffer.get(hobbyBytes);
// 将爱好字节数据转换为字符串
String hobby = new String(hobbyBytes, "UTF-8");
// 读取科目长度
int subjectLength = buffer.getInt();
// 根据科目长度创建字节数组
byte[] subjectBytes = new byte[subjectLength];
// 从缓冲区读取科目字节数据
buffer.get(subjectBytes);
// 将科目字节数据转换为字符串
String subject = new String(subjectBytes, "UTF-8");
// 读取分数
int scores = buffer.getInt();
// 创建并返回学生对象
return new Student(name, age, grade, Sclass, hobby, subject, scores);
}
} catch (Exception e) {
// 如果反序列化过程中发生异常,则抛出异常
throw new SerializationException("error when deserializing..." + e);
// return new Student("name", 20, 1, 3, "good");
}
}
@Override
public boolean isEndOfStream(Student o) {
return false;
}
@Override
public TypeInformation getProducedType() {
return TypeInformation.of(Student.class);
}
public Class<?> getDTOClass() {
return Student.class;
}
}
四、定义kafka生产者
上面已经定义好序列化和反序列化的类了,下面就来使用序列化类和反序列化类
定义FlinkKafkaProducer_DTO_Serializer类,用于序列化数据,同时往kafka里写数据
package com.qogo8.flink_kfk_demo.client;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.CommonClientConfigs;
import com.qogo8.flink_kfk_demo.Serialization.FlinkSerializationSchema;
import com.qogo8.flink_kfk_demo.Student.Student;
/**
 * Demo job that generates random {@link Student} records and writes them to a Kafka topic
 * using {@link FlinkSerializationSchema}.
 *
 * <p>Program arguments (parsed by {@link ParameterTool}): {@code --broker} for the Kafka
 * bootstrap servers and {@code --topic} for the target topic. Both are required.
 */
public class FlinkKafkaProducer_DTO_Serializer {
    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String broker = parameterTool.get("broker");
        String topic = parameterTool.get("topic");
        // String token = parameterTool.get("token");
        // String clusterName = parameterTool.get("clusterName");

        // Validate the arguments before building the job so a bad invocation fails immediately.
        if (broker == null) {
            throw new RuntimeException("broker为空");
        }
        if (topic == null) {
            throw new RuntimeException("topic为空");
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(600000); // checkpoint interval: 600000 ms = 10 minutes
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000); // checkpoint timeout: 60 seconds
        env.setRestartStrategy(RestartStrategies.fallBackRestart());

        DataStream<Student> ds = env.addSource(new SourceFunction<Student>() {
            // Cleared by cancel() so the generator loop in run() can terminate.
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Student> ctx) throws Exception {
                int i = 1;
                SecureRandom rand = new SecureRandom();
                final List<String> interests = Arrays.asList(
                        "阅读", "健身", "烹饪", "旅行", "摄影",
                        "编程", "绘画", "音乐", "登山", "骑行",
                        "舞蹈", "桌游", "露营", "钓鱼", "手工DIY"
                );
                final List<String> subjects = Arrays.asList(
                        "语文", "数学", "英语", "地理", "历史",
                        "生物", "科学", "物理", "化学", "美术"
                );
                final String[] surnames = {
                        "赵", "钱", "孙", "李", "周", "吴", "郑", "王", "冯", "陈",
                        "褚", "卫", "蒋", "沈", "韩", "杨", "朱", "秦", "尤", "许"
                };
                final String[] givenNames = {
                        "伟", "芳", "娜", "秀英", "敏静", "丽", "强", "军", "磊", "超",
                        "杰", "婷婷", "鹏", "雪", "慧", "倩", "宇", "晨", "欣", "明"
                };
                // Emit one random student every 3 seconds; the running counter is used as age.
                while (running) {
                    String surname = surnames[rand.nextInt(surnames.length)];
                    String givenName = givenNames[rand.nextInt(givenNames.length)];
                    Student student = new Student(
                            surname + givenName,
                            i,
                            1,
                            3,
                            interests.get(rand.nextInt(interests.size())),
                            subjects.get(rand.nextInt(subjects.size())),
                            rand.nextInt(155) + 1);
                    ctx.collect(student);
                    System.out.println(student.toString());
                    i++;
                    TimeUnit.SECONDS.sleep(3);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }).setParallelism(1).name("gen-data");

        Properties props = new Properties();
        props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, broker);

        // NOTE(review): this constructor takes no delivery-semantic argument; confirm that the
        // producer's guarantee is what is wanted alongside the EXACTLY_ONCE checkpoint mode.
        FlinkKafkaProducer<Student> producer = new FlinkKafkaProducer<Student>(topic,
                new FlinkSerializationSchema(),
                props);
        // Per the original author's note, FlinkFixedPartitioner is the default partitioner and
        // pins each sink subtask to one Kafka partition. Optional explicit configuration:
        // producer.setCustomPartitioner(new FlinkFixedPartitioner<>());

        // Assumes the topic has 4 partitions, so 4 sink subtasks can write in parallel.
        ds.addSink(producer).setParallelism(4).name("kafka-producer");
        env.execute("flink_kfk_demo");
    }
}
五、定义kafka消费者
定义FlinkKafkaConsumer_DTO_DeSerializer类,用于反序列化数据,同时从kafka里读数据
package com.qogo8.flink_kfk_demo.client;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import com.qogo8.flink_kfk_demo.Serialization.FlinkSerializationSchema;
import com.qogo8.flink_kfk_demo.Student.Student;
import java.util.Properties;
import org.apache.flink.util.Collector;
/**
 * Demo job that reads {@link Student} records from a Kafka topic, decoding them with the
 * custom DTO schema {@link FlinkSerializationSchema}, and prints them.
 *
 * <p>Program arguments (parsed by {@link ParameterTool}): {@code --broker} for the Kafka
 * bootstrap servers and {@code --topic} for the topic to read. Both are required.
 */
public class FlinkKafkaConsumer_DTO_DeSerializer {
    public FlinkKafkaConsumer_DTO_DeSerializer() {
    }

    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String broker = parameterTool.get("broker");
        String topic = parameterTool.get("topic");

        // Validate the arguments before building the job so a bad invocation fails immediately.
        if (broker == null) {
            throw new RuntimeException("broker为空");
        }
        if (topic == null) {
            throw new RuntimeException("topic为空");
        }

        Properties props = new Properties();
        props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, broker);
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "student-consumer-group1");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create the Kafka consumer and register it as the job's source.
        FlinkKafkaConsumer<Student> consumer =
                new FlinkKafkaConsumer<Student>(topic, new FlinkSerializationSchema(), props);
        DataStream<Student> stream = env.addSource(consumer);

        // Turn every Student into its text form and run the per-record business hook.
        DataStream<String> rendered = stream.flatMap(new FlatMapFunction<Student, String>() {
            @Override
            public void flatMap(Student value, Collector<String> out) {
                out.collect(value.toString());
                // Business logic for each record goes here.
                processStudent(value);
            }
        });

        // Print the rendered records to the console.
        rendered.print();

        // Start the job.
        env.execute("Flink Kafka Consumer Job");
    }

    /**
     * Business-logic hook for a single record; this demo only prints the student's name.
     *
     * @param student the decoded record
     */
    private static void processStudent(Student student) {
        System.out.println("学生: " + student.getName());
    }
}