Kafka 需要将要发送的消息序列化为字节数组才能发送给 Broker,Kafka Client 自带了几种序列化方式:String、ByteArray、ByteBuffer、Bytes、Double、Long。但是如果想使用自定义对象序列化的话,我们就需要构建一个自定义的序列化器。自定义的序列化器需要实现 org.apache.kafka.common.serialization.Serializer 接口。
1.首先创建一个自定义对象
/**
 * Plain data holder with a company name and address, used as the payload
 * type for the custom serializer example.
 *
 * <p>No members are hand-written: the accessors ({@code getName()},
 * {@code getAddress()}), the {@code builder()} and the constructors used by
 * the other snippets are expected to come from the Lombok annotations below.
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Company {
// Company name; may be null.
private String name;
// Company address; may be null.
private String address;
}
2.实现Serializer 接口
/**
 * Kafka {@code Serializer} that turns a {@code Company} into a byte array.
 *
 * <p>Wire layout (ints written through {@link ByteBuffer#putInt(int)} in the
 * buffer's default byte order):
 * <pre>
 *   [int nameLength][name UTF-8 bytes][int addressLength][address UTF-8 bytes]
 * </pre>
 *
 * <p>A {@code null} field is written with length 0, so on the wire it is
 * indistinguishable from an empty string.
 */
public class CompanySerializer implements Serializer<Company> {

    /** No configuration is needed; intentionally a no-op. */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    /**
     * Serializes {@code data} into the length-prefixed layout described on the class.
     *
     * @param topic topic the record is being sent to (unused)
     * @param data  the object to serialize; may be {@code null}
     * @return the encoded bytes, or {@code null} when {@code data} is {@code null}
     */
    @Override
    public byte[] serialize(String topic, Company data) {
        if (data == null) {
            return null;
        }
        byte[] name = utf8OrEmpty(data.getName());
        byte[] address = utf8OrEmpty(data.getAddress());

        // Two 4-byte length prefixes plus both payloads.
        ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4 + name.length + address.length);
        byteBuffer.putInt(name.length);
        byteBuffer.put(name);
        byteBuffer.putInt(address.length);
        byteBuffer.put(address);
        return byteBuffer.array();
    }

    /** Nothing to release; intentionally a no-op. */
    @Override
    public void close() {
    }

    /**
     * Encodes {@code value} as UTF-8, mapping {@code null} to an empty array.
     *
     * <p>Uses the {@code Charset} overload so no checked
     * {@code UnsupportedEncodingException} has to be caught (and silently
     * turned into an empty message, as the charset-name overload required).
     */
    private static byte[] utf8OrEmpty(String value) {
        return value == null
                ? new byte[0]
                : value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
}
此时,自定义序列化器已经做好了,我们就可以使用了。
/**
 * Sends a single {@code Company} record through a producer configured with
 * {@code CompanySerializer} as the value serializer and waits for the send
 * to complete.
 *
 * @throws ExecutionException   if the send fails
 * @throws InterruptedException if the thread is interrupted while waiting for the send
 */
@Test
public void testSerializer() throws ExecutionException, InterruptedException {
    Properties properties = new Properties();
    // Key serializer: keys are plain strings.
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Value serializer: the custom Company serializer.
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName());
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

    Company company = Company.builder().name("hiddenkafka").address("China").build();
    ProducerRecord<String, Company> record = new ProducerRecord<>(topic, company);

    // try-with-resources closes the producer even when send() or get() throws.
    try (KafkaProducer<String, Company> producer = new KafkaProducer<>(properties)) {
        // get() blocks until the send has completed, surfacing any send failure.
        producer.send(record).get();
    }
}