Message Serialization
Data travels over the network as bytes, yet Kafka lets us send messages of many types — strings, numbers, arrays, objects. A serializer therefore converts each message into a byte array before it is sent, and a deserializer converts the byte array back into the corresponding object on receipt.
Kafka ships with serializers for common types, such as StringSerializer, LongSerializer, IntegerSerializer, and BytesSerializer. For objects we define ourselves, we need a custom serializer.
Let's look at how to write one.
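A custom serializer implements org.apache.kafka.common.serialization.Serializer. For reference, in kafka-clients 2.0.0 the interface declares three methods — reproduced here standalone, purely for illustration:

```java
import java.io.Closeable;
import java.util.Map;

// The shape of org.apache.kafka.common.serialization.Serializer in kafka-clients 2.0.0:
public interface Serializer<T> extends Closeable {
    // called once with the producer's config; isKey says whether this
    // instance serializes keys or values
    void configure(Map<String, ?> configs, boolean isKey);

    // convert data into the bytes that go on the wire; the topic name
    // is provided in case serialization depends on it
    byte[] serialize(String topic, T data);

    // release any resources when the producer is closed
    void close();
}
```

Our UserSerializer below fills in all three methods.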
Custom Serializer
- Create the topic
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1
- Create the project
$ mvn archetype:generate -DgroupId=com.kafkademo -DartifactId=KafkaSerializer -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
- Add the dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.7</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.4</version>
    <scope>provided</scope>
</dependency>
<!-- compile with JDK 1.8 -->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
- Define the custom object
package com.kafkademo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.ToString;
@Data
@AllArgsConstructor
@ToString
public class User {
    private String firstName;
    private String lastName;
    private int age;
    private String address;
}
- A serializer for the User object
package com.kafkademo;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.databind.ObjectMapper;
public class UserSerializer implements Serializer<User> {
    private ObjectMapper objectMapper;
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        objectMapper = new ObjectMapper();
    }
    @Override
    public byte[] serialize(String topic, User data) {
        byte[] ret = null;
        try {
            // serialize the User to a JSON string, then encode it as UTF-8 bytes
            ret = objectMapper.writeValueAsString(data).getBytes(StandardCharsets.UTF_8);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return ret;
    }
    @Override
    public void close() {
    }
}
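The receiving side needs a matching deserializer to turn those bytes back into a User. The original walkthrough does not include one; a sketch, assuming the same Jackson dependency and that User also gets a Lombok @NoArgsConstructor (Jackson needs a no-args constructor to instantiate it):

```java
package com.kafkademo;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import com.fasterxml.jackson.databind.ObjectMapper;

public class UserDeserializer implements Deserializer<User> {
    private ObjectMapper objectMapper;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        objectMapper = new ObjectMapper();
    }

    @Override
    public User deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // tombstone or absent value
        }
        try {
            // parse the UTF-8 JSON bytes written by UserSerializer back into a User
            return objectMapper.readValue(new String(data, StandardCharsets.UTF_8), User.class);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public void close() {
    }
}
```

A consumer would then set value.deserializer to com.kafkademo.UserDeserializer, mirroring how the producer below sets value.serializer.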
- Send a message
package com.kafkademo;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ProducerTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // use our custom UserSerializer for the message value
        props.put("value.serializer", "com.kafkademo.UserSerializer");
        String topic = "test";
        Producer<String, User> producer = new KafkaProducer<String, User>(props);
        User user = new User("Zhao", "Si", 33, "Beijing, China");
        ProducerRecord<String, User> record = new ProducerRecord<>(topic, user);
        // block on get() so the send completes before the producer closes
        producer.send(record).get();
        producer.close();
    }
}
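Besides the console consumer used for verification below, the messages can be read back programmatically. A minimal sketch — the group id user-consumer is an arbitrary choice, and the built-in StringDeserializer is enough here because UserSerializer wrote plain UTF-8 JSON text:

```java
package com.kafkademo;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "user-consumer");
        // read from the beginning of the topic on first run
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            // one bounded poll is enough for this demo; a real consumer would loop
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
```

With the producer above having run, each printed value is the JSON form of a User.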
- Run
$ mvn compile
$ mvn exec:java -Dexec.mainClass="com.kafkademo.ProducerTest"
- Verify
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# output
{"firstName":"Zhao","lastName":"Si","age":33,"address":"Beijing, China"}
The console consumer prints the User object serialized as JSON.