Flink序列化和反序列化DTO数据

Kafka中的数据平常大多是JSON、CSV或普通字符串,这类数据使用SimpleStringSchema就可以轻松地进行序列化和反序列化。
但有时需要传输特殊结构的数据(DTO),这时就需要根据实际的数据结构自定义序列化和反序列化。
下面以自定义的Student结构为例,演示如何使用FlinkSerializationSchema进行序列化和反序列化。

一、创建Maven工程

pom.xml文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Plain Maven project for the Flink 1.13 Kafka DTO (de)serialization demo. -->
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.qogo8</groupId>
    <artifactId>flink-kfk-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>flink-kfk-demo</name>
    <description>flink consume/produce demo</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- All org.apache.flink artifacts below share this version. -->
        <flink.version>1.13.5</flink.version>
        <!-- Scala suffix for the Scala-dependent Flink artifacts (_2.11). -->
        <scala.binary.version>2.11</scala.binary.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.5.1</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <!--
            flink-shaded-jackson is intentionally NOT declared here.
            The previous explicit pin (2.15.3-19.0) belongs to a much newer flink-shaded release than
            Flink 1.13.5 was built against. Because Maven's dependency mediation picks the nearest
            declaration, that pin would override the version the Flink modules (e.g. flink-json) bring
            in and get bundled into the fat jar. Nothing in this project uses Jackson directly, so the
            transitively resolved version is the correct one.
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <!--
            provided: not packaged into the fat jar; the Flink runtime is expected to supply it.
            When running main() directly from an IDE, provided-scope dependencies may need to be
            added to the run classpath.
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Build a runnable fat jar (jar-with-dependencies) during the package phase. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.5</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

二、创建自定义数据结构的类

1、创建Scores类

package com.qogo8.flink_kfk_demo.Student;

/**
 * The score obtained in a single subject.
 *
 * <p>Mutable bean: it has a public no-arg constructor plus a getter/setter pair for every field.
 */
public class Scores {

    /** Subject name. */
    private String subject;

    /** Score obtained in {@link #subject}. */
    private int scores;

    /** Creates an empty instance: {@code subject} is {@code null} and {@code scores} is 0. */
    public Scores() {
    }

    /**
     * Creates an instance for the given subject.
     *
     * @param scores  the score obtained
     * @param subject the subject name
     */
    public Scores(int scores, String subject) {
        this.subject = subject;
        this.scores = scores;
    }

    /** @return the subject name, possibly {@code null} */
    public String getSubject() {
        return subject;
    }

    /** @param subject the subject name */
    public void setSubject(String subject) {
        this.subject = subject;
    }

    /** @return the score for this subject */
    public int getScores() {
        return scores;
    }

    /** @param scores the score for this subject */
    public void setScores(int scores) {
        this.scores = scores;
    }
}

2、创建Student类

package com.qogo8.flink_kfk_demo.Student;

/**
 * Student DTO that the Kafka demo jobs serialize and deserialize.
 *
 * <p>Mutable bean with a public no-arg constructor and a getter/setter pair for every field.
 * {@link #getScores()} may return {@code null}, for example after the no-arg constructor.
 */
public class Student {
    private String name;   // student name
    private int age;       // student age
    private int grade;     // grade (year)
    private int Sclass;    // class number
    private String hobby;  // hobby / interest
    private Scores scores; // subject and score; may be null

    /** Creates an empty student; {@code scores} stays {@code null}. */
    public Student() {
    }

    /**
     * Creates a student with the default score: subject "Chinese" and score 22.
     *
     * @param name   student name
     * @param age    student age
     * @param grade  grade
     * @param Sclass class number
     * @param hobby  hobby
     */
    public Student(String name, int age, int grade, int Sclass, String hobby) {
        this(name, age, grade, Sclass, hobby, "Chinese", 22);
    }

    /**
     * Creates a fully populated student with a fresh {@link Scores} instance.
     *
     * @param name    student name
     * @param age     student age
     * @param grade   grade
     * @param Sclass  class number
     * @param hobby   hobby
     * @param subject subject of the score
     * @param scores  score for {@code subject}
     */
    public Student(String name, int age, int grade, int Sclass, String hobby, String subject, int scores) {
        this.name = name;
        this.age = age;
        this.grade = grade;
        this.Sclass = Sclass;
        this.hobby = hobby;
        this.scores = new Scores();
        this.scores.setSubject(subject);
        this.scores.setScores(scores);
    }

    /**
     * Creates a student with only name, age and score set. Grade and class stay 0 and hobby stays
     * {@code null}.
     *
     * @param name   student name
     * @param age    student age
     * @param scores score object, stored as-is (may be {@code null})
     */
    public Student(String name, int age, Scores scores) {
        this.name = name;
        this.age = age;
        this.scores = scores;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return this.age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public int getGrade() {
        return this.grade;
    }

    public void setGrade(int grade) {
        this.grade = grade;
    }

    public int getSclass() {
        return this.Sclass;
    }

    public void setSclass(int Sclass) {
        this.Sclass = Sclass;
    }

    public String getHobby() {
        return this.hobby;
    }

    public void setHobby(String hobby) {
        this.hobby = hobby;
    }

    public Scores getScores() {
        return this.scores;
    }

    public void setScores(Scores scores) {
        this.scores = scores;
    }

    /**
     * Renders all fields on one line.
     *
     * <p>Null-safe: when {@code scores} is {@code null}, subject and score are printed as "null"
     * instead of throwing a NullPointerException.
     */
    @Override
    public String toString() {
        String subjectText = this.scores == null ? null : this.scores.getSubject();
        String scoreText = this.scores == null ? "null" : String.valueOf(this.scores.getScores());
        return "Student [name=" + this.name + ",age=" + this.age + ",grade=" + this.grade
                + ",Sclass=" + this.Sclass + ",hobby=" + this.hobby
                + ",subject=" + subjectText + ",scores=" + scoreText + "]";
    }
}

三、定义序列化和反序列化类FlinkSerializationSchema

package com.qogo8.flink_kfk_demo.Serialization;

import org.apache.commons.lang3.SerializationException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import com.qogo8.flink_kfk_demo.Student.Student;

import java.io.IOException;
import java.nio.ByteBuffer;

// 使用Flink 1.13.5版本序列化数据(因为本公司是使用的这个Flink版本^_^)
public class FlinkSerializationSchema implements DeserializationSchema<Student>, SerializationSchema<Student> {

    /**
     * 将Student对象序列化为字节数组
     *
     * @param o 要序列化的Student对象
     * @return 序列化后的字节数组
     * @throws SerializationException 如果序列化过程中出现错误,则抛出此异常
     */
    @Override
    public byte[] serialize(Student o) {
        try {
            // 将传入的对象强制转换为Student类型
            Student data = (Student) o;
            if (data == null) {
                // 如果对象为null,则返回null
                return null;
            } else {
                // 定义用于存储序列化长度的变量
                int serializedLength;
                // 定义用于存储序列化后名称的字节数组
                byte[] serializedName;
                // 定义用于存储序列化后爱好的字节数组
                byte[] serializedHobby;
                // 定义用于存储序列化后科目的字节数组
                byte[] serializedSubject;

                // 如果学生名称不为null
                if (data.getName() != null) {
                    // 将学生名称转换为UTF-8编码的字节数组,并获取长度
                    serializedName = data.getName().getBytes("UTF-8");
                    serializedLength = serializedName.length;
                } else {
                    // 如果学生名称为null,则初始化为长度为0的字节数组,长度也为0
                    serializedName = new byte[0];
                    serializedLength = 0;
                }

                // 将学生爱好转换为UTF-8编码的字节数组
                serializedHobby = data.getHobby().getBytes("UTF-8");
                // 将学生科目转换为UTF-8编码的字节数组
                serializedSubject = data.getScores().getSubject().getBytes("UTF-8");

                // 分配一个ByteBuffer,大小为固定值28加上各个序列化字段的长度之和
                //一个int是4个字节
                ByteBuffer buffer = ByteBuffer.allocate(28 + serializedLength + serializedHobby.length + serializedSubject.length);
                // 依次将学生年龄、年级、班级、名称长度、名称、爱好长度、爱好、成绩、科目长度、科目序列化到ByteBuffer中
                buffer.putInt(serializedLength);
                buffer.put(serializedName);
                buffer.putInt(data.getAge());
                buffer.putInt(data.getGrade());
                buffer.putInt(data.getSclass());
                buffer.putInt(serializedHobby.length);
                buffer.put(serializedHobby);
                buffer.putInt(serializedSubject.length);
                buffer.put(serializedSubject);
                buffer.putInt(data.getScores().getScores());
                
                // 返回ByteBuffer的字节数组
                return buffer.array();
            }
        } catch (Exception e) {
            // 捕获异常并抛出SerializationException异常
            throw new SerializationException("error when serializing..." + e);
        }
    }

     /**
      * 从字节数组反序列化一个学生对象
      *
      * @param data 包含学生对象序列化数据的字节数组
      * @return 反序列化后的学生对象
      * @throws IOException 如果发生IO异常
      */
     @Override
    public Student deserialize(byte[] data) throws IOException {
        try {
             // 如果数据为空,则返回null
             if (data == null) {
                 return null;
             } else if (data.length < 28) {
                 // 如果数据长度小于28,则抛出异常
                throw new SerializationException("Size of data received by IntegerDeserializer is shorter than expected...");
            } else {
                 // 使用ByteBuffer包装数据
                ByteBuffer buffer = ByteBuffer.wrap(data);
                // 读取名字长度
                int nameLength = buffer.getInt();
                 // 根据名字长度创建字节数组
                byte[] nameBytes = new byte[nameLength];
                 // 从缓冲区读取名字字节数据
                buffer.get(nameBytes);
                 // 将名字字节数据转换为字符串
                String name = new String(nameBytes, "UTF-8");
                 // 读取年龄
                int age = buffer.getInt();
                 // 读取年级
                int grade = buffer.getInt();
                 // 读取班级
                int Sclass = buffer.getInt();
                 // 读取爱好长度
                int hobbyLength = buffer.getInt();
                 // 根据爱好长度创建字节数组
                byte[] hobbyBytes = new byte[hobbyLength];
                 // 从缓冲区读取爱好字节数据
                buffer.get(hobbyBytes);
                 // 将爱好字节数据转换为字符串
                String hobby = new String(hobbyBytes, "UTF-8");
                 // 读取科目长度
                int subjectLength = buffer.getInt();
                 // 根据科目长度创建字节数组
                byte[] subjectBytes = new byte[subjectLength];
                 // 从缓冲区读取科目字节数据
                buffer.get(subjectBytes);
                 // 将科目字节数据转换为字符串
                String subject = new String(subjectBytes, "UTF-8");
                // 读取分数
                int scores = buffer.getInt();
                 // 创建并返回学生对象
                 return new Student(name, age, grade, Sclass, hobby, subject, scores);
            }
        } catch (Exception e) {
             // 如果反序列化过程中发生异常,则抛出异常
            throw new SerializationException("error when deserializing..." + e);
             // return new Student("name", 20, 1, 3, "good");
        }
    }

    @Override
    public boolean isEndOfStream(Student o) {
        return false;
    }

    @Override
    public TypeInformation getProducedType() {
        return TypeInformation.of(Student.class);
    }

    public Class<?> getDTOClass() {
        return Student.class;
    }
}

四、定义kafka生产者

上面已经定义好序列化和反序列化的类了,下面就来使用序列化类和反序列化类
定义FlinkKafkaProducer_DTO_Serializer类,用于序列化数据,同时往kafka里写数据

package com.qogo8.flink_kfk_demo.client;

import java.security.SecureRandom;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.CommonClientConfigs;
import com.qogo8.flink_kfk_demo.Serialization.FlinkSerializationSchema;
import com.qogo8.flink_kfk_demo.Student.Student;

public class FlinkKafkaProducer_DTO_Serializer {

    /**
     * Generates random Student records and writes them to Kafka using FlinkSerializationSchema.
     *
     * <p>Required arguments: {@code --broker <bootstrap servers>} and {@code --topic <topic>}.
     *
     * @param args command-line arguments parsed by ParameterTool
     * @throws RuntimeException if {@code broker} or {@code topic} is missing
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String broker = parameterTool.get("broker");
        String topic = parameterTool.get("topic");
        // Validate before building the job so misconfiguration fails fast.
        if (broker == null) {
            throw new RuntimeException("broker为空");
        }
        if (topic == null) {
            throw new RuntimeException("topic为空");
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 600000 ms = one checkpoint every 10 minutes.
        env.enableCheckpointing(600000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000); // checkpoint timeout: 60 seconds
        env.setRestartStrategy(RestartStrategies.fallBackRestart());

        DataStream<Student> ds = env.addSource(new StudentGenerator()).setParallelism(1).name("gen-data");

        Properties props = new Properties();
        props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, broker);

        // NOTE(review): this constructor takes no delivery-semantic argument. Check which guarantee it
        // defaults to; EXACTLY_ONCE checkpointing alone does not make the Kafka sink transactional.
        FlinkKafkaProducer<Student> producer = new FlinkKafkaProducer<Student>(topic,
                new FlinkSerializationSchema(),
                props);
        // FlinkFixedPartitioner maps each Flink sink subtask to a fixed Kafka partition. It is the
        // default for this constructor, so setting it explicitly is optional:
        // producer.setCustomPartitioner(new FlinkFixedPartitioner<>());
        // Sink parallelism 4 assumes the topic has 4 partitions, so each subtask writes to one.
        ds.addSink(producer).setParallelism(4).name("kafka-producer");
        env.execute("flink_kfk_demo");
    }

    /**
     * Source that emits one random Student every 3 seconds until cancelled.
     * The age field is a running counter starting at 1.
     */
    private static final class StudentGenerator implements SourceFunction<Student> {
        private static final long serialVersionUID = 1L;

        private static final List<String> INTERESTS = Arrays.asList(
            "阅读", "健身", "烹饪", "旅行", "摄影",
            "编程", "绘画", "音乐", "登山", "骑行",
            "舞蹈", "桌游", "露营", "钓鱼", "手工DIY"
        );

        private static final List<String> SUBJECTS = Arrays.asList(
            "语文", "数学", "英语", "地理", "历史",
            "生物", "科学", "物理", "化学", "美术"
        );

        private static final String[] SURNAMES = {
            "赵", "钱", "孙", "李", "周", "吴", "郑", "王", "冯", "陈",
            "褚", "卫", "蒋", "沈", "韩", "杨", "朱", "秦", "尤", "许"
        };

        private static final String[] GIVEN_NAMES = {
            "伟", "芳", "娜", "秀英", "敏静", "丽", "强", "军", "磊", "超",
            "杰", "婷婷", "鹏", "雪", "慧", "倩", "宇", "晨", "欣", "明"
        };

        /** Cleared by {@link #cancel()}; volatile because cancel() is called from another thread. */
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Student> ctx) throws Exception {
            SecureRandom rand = new SecureRandom();
            int i = 1;
            while (running) {
                String surname = SURNAMES[rand.nextInt(SURNAMES.length)];
                String givenName = GIVEN_NAMES[rand.nextInt(GIVEN_NAMES.length)];
                Student student = new Student(surname + givenName, i, 1, 3,
                        INTERESTS.get(rand.nextInt(INTERESTS.size())),
                        SUBJECTS.get(rand.nextInt(SUBJECTS.size())),
                        rand.nextInt(155) + 1);
                // Emit under the checkpoint lock so the emission is not interleaved with a checkpoint.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(student);
                }
                System.out.println(student.toString());
                i++;
                TimeUnit.SECONDS.sleep(3);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

五、定义kafka消费者

定义FlinkKafkaConsumer_DTO_DeSerializer类,用于反序列化数据,同时从kafka里读数据

package com.qogo8.flink_kfk_demo.client;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import com.qogo8.flink_kfk_demo.Serialization.FlinkSerializationSchema;
import com.qogo8.flink_kfk_demo.Student.Student;
import java.util.Properties;
import org.apache.flink.util.Collector;

/**
 * Demo job that consumes Student records from Kafka, deserializing them with
 * FlinkSerializationSchema, and prints them.
 *
 * <p>Required arguments: {@code --broker <bootstrap servers>} and {@code --topic <topic>}.
 */
public class FlinkKafkaConsumer_DTO_DeSerializer {
    public FlinkKafkaConsumer_DTO_DeSerializer() {
    }

    /**
     * Builds and runs the consumer job.
     *
     * @param args command-line arguments parsed by ParameterTool
     * @throws RuntimeException if {@code broker} or {@code topic} is missing
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String broker = parameterTool.get("broker");
        String topic = parameterTool.get("topic");
        // Validate both required arguments before building the job.
        if (broker == null) {
            throw new RuntimeException("broker为空");
        }
        if (topic == null) {
            throw new RuntimeException("topic为空");
        }

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, broker);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "student-consumer-group1");
        // Start from the latest offset when the group has no committed offset.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // Kafka source that decodes each record value into a Student.
        FlinkKafkaConsumer<Student> consumer = new FlinkKafkaConsumer<Student>(topic, new FlinkSerializationSchema(), props);
        DataStream<Student> stream = env.addSource(consumer);

        // Turn each Student into its string form and run the per-record business hook.
        DataStream<String> words = stream.flatMap(new FlatMapFunction<Student, String>() {
            @Override
            public void flatMap(Student value, Collector<String> out) {
                out.collect(value.toString());
                processStudent(value);
            }
        });

        // Print the rendered records to stdout.
        words.print();

        env.execute("Flink Kafka Consumer Job");
    }

    /** Placeholder for per-record business logic; currently only prints the student's name. */
    private static void processStudent(Student student) {
        System.out.println("学生: " + student.getName());
    }
}
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容