7. kafka序列化&反序列化

序列化

kafka序列化消息是在生产端,序列化后,消息才能网络传输。而构造KafkaProducer代码如下:

Properties props = new Properties();
props.put("bootstrap.servers", "10.0.55.229:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProducer = new KafkaProducer<>(props);

属性key.serializervalue.serializer就是key和value指定的序列化方式。无论是key还是value序列化和反序列化实现都是一样的,所以接下来都只以value的序列化和反序列为例。

StringSerializer

StringSerializer是内置的字符串序列化方式,核心源码如下:

/**
 *  String encoding defaults to UTF8 and can be customized by setting the property key.serializer.encoding,
 *  value.serializer.encoding or serializer.encoding. The first two take precedence over the last.
 */
public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";
    ... ...

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            // 如果数据为空,那么直接返回null即可
            if (data == null)
                return null;
            else
                // 否则将String序列化,即转为byte[]即可
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}

自定义序列化

和内置的StringSerializer字符串序列化一样,如果要自定义序列化方式,需要实现接口Serializer。假设每个字段按照下图所示的方式自定义序列化:

image.png

下面是一个简单的自定义实现(假设自定义Order类型的Serializer,Order类中有一些String,Integer,Long,Date类型的属性--其他类型暂不支持,读者可以自行扩展):

/**
 * @author wangzhenfei9
 * @version 1.0.0
 * @since 2018年06月22日
 */
public class OrderSerializer implements Serializer<Order> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to do
    }

    @Override
    public byte[] serialize(String topic, Order data){

        if (data == null) {
            return null;
        }

        Class<? extends Order> clazz = data.getClass();
        Field[] fields = clazz.getDeclaredFields();

        // 计算保存Order所示属性值总计需要多少字节
        int total = 0;

        // 遍历所有属性字段, 目前只支持Integer, Long, String, Date类型
        for (Field field:fields){
            String fieldName = field.getName();
            int fieldLen = fieldName.getBytes().length;
            Object value = getPropertyValue(data, fieldName);
            String type = field.getType().getSimpleName();
            System.out.println("propertyName: "+fieldName +", value: " + value
                    +", len: " + fieldLen+", type: " + type);
            // 每个属性序列化方式: 属性长度(都是4个字节)+属性名称(长度需要计算)+值长度+值
            switch (type){
                case "Long":
                    // 第一个4表示属性名长度需要的空间, 即int的长度;(int类型反序列化后需要4个字节长度,即32位)
                    // 第二个fieldLen表示属性名需要的空间
                    // 第三个4表示属性值长度需要的空间
                    // 第四个8表示属性值需要的空间(Long类型反序列化后需要8个字节长度,即64位)
                    total+=(4+fieldLen+4+8);
                    break;
                case "Date":
                    // +8+8
                    // 如果是日期类型先转成Long类型的timestamp
                    total+=(4+fieldLen+4+8);
                    break;
                case "Integer":
                    total+=(4+fieldLen+4+4);
                    break;
                case "String":
                    try {
                        total+=(4+fieldLen + 4 + value.toString().getBytes("utf-8").length);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                    break;
                // 本次自定义的Serializer只支持Integer, Long, Date, String类型属性, 如果想要支持其他类型属性, 请自行实现
                default:
                    throw new IllegalArgumentException("Unsupported argument type: "+type);
            }
        }

        // 算出需要个byte[]数组长度后, 分配同样大的byte[]中
        ByteBuffer result = ByteBuffer.allocate(total);

        for (Field field:fields){
            String fieldName = field.getName();
            byte[] fieldByte = fieldName.getBytes();
            int fieldLen = fieldByte.length;

            Object value = getPropertyValue(data, fieldName);
            String type = field.getType().getSimpleName();

            // 无论什么类型都要先put属性长度和属性名称
            result.putInt(fieldLen);
            result.put(fieldByte);

            switch (type){
                case "Long":
                    byte[] longByte = SerializeUtil.longSerialize((Long) value);
                    result.putInt(longByte.length);
                    result.put(longByte);
                    break;
                case "Date":
                    // 如果是日期类型先转成Long类型的timestamp
                    byte[] dateByte = SerializeUtil.longSerialize(((Date) value).getTime());
                    result.putInt(dateByte.length);
                    result.put(dateByte);
                    break;
                case "Integer":
                    byte[] integerByte = SerializeUtil.integerSerialize((Integer) value);
                    result.putInt(integerByte.length);
                    result.put(integerByte);
                    break;
                case "String":
                    byte[] stringByte = ((String)value).getBytes();
                    result.putInt(stringByte.length);
                    result.put(stringByte);
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported argument type: "+type);
            }
        }

        // 返回序列化后的结果
        return result.array();
    }


    private Object getPropertyValue(Order order, String propertyName) {
        Method[] methods = order.getClass().getMethods();
        for (Method method : methods) {
            // 这里方法匹配还不够严谨
            if (method.getName().equalsIgnoreCase("get" + propertyName)
                    || method.getName().equalsIgnoreCase("is" + propertyName)) {
                try {
                    return method.invoke(order);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        return null;
    }

    @Override
    public void close() {
        // nothing to do
    }

}

自定义Serializer后,修改属性value.serializer的值为com.afei.kafka.serialization.OrderSerializer,且ProducerRecord申明为ProducerRecord<String, Order>

反序列化

kafka反序列化消息是在消费端。由于网络传输过来的是byte[],只有反序列化后才能得到生产者发送的真实的消息内容。而构造KafkaConsumer代码如下:

Properties props = new Properties();
props.put("bootstrap.servers", "10.0.55.229:9092");
props.put("key.deserializer",   "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

属性key.deserializervalue.deserializer就是key和value指定的反序列化方式。

StringDeserializer

StringDeserializer是内置的字符串反序列化方式,核心源码如下:

/**
 *  String encoding defaults to UTF8 and can be customized by setting the property key.deserializer.encoding,
 *  value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
 */
public class StringDeserializer implements Deserializer<String> {
    private String encoding = "UTF8";
    ... ...

    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            // 如果数据为空,那么直接返回null即可
            if (data == null)
                return null;
            else
                // 否则将byte[]反序列化,即转为String即可
                return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }

    ... ...
}

自定义反序列化

和内置的StringDeserializer字符串反序列化一样,如果要自定义反序列化方式,需要实现接口Deserializer。下面是一个简单的自定义实现(假设自定义Order类型的Deserializer,Order类中有一些String,Integer,Long,Date类型的属性--其他类型暂不支持),反序列化就是根据序列化的方式得到序列化前的内容:

/**
 * @author wangzhenfei9
 * @version 1.0.0
 * @since 2018年06月22日
 */
public class OrderDeserializer implements Deserializer<Order> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to do
    }

    @Override
    public Order deserialize(String topic, byte[] bytes) {

        ByteBuffer data = ByteBuffer.wrap(bytes);

        Field[] declaredFields = new Field[0];
        try {
            declaredFields = Class.forName(Order.class.getCanonicalName()).getDeclaredFields();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }

        int propertyCount = declaredFields.length;

        Order order = new Order();

        while ((propertyCount--)>0){
            int propertyLen = data.getInt();
            byte[] nameByte = new byte[propertyLen];
            data.get(nameByte);
            String propertyName = new String(nameByte);
            //根据属性名称得到属性的类型
            String propertyType = getPropertyType(Order.class, propertyName);

            int valueLen = data.getInt();
            byte[] valueByte = new byte[valueLen];
            data.get(valueByte);

            Object value;

            switch (propertyType ){
                case "Long":
                    value = DeserializeUtil.longDeserialize(valueByte);
                    break;
                case "Date":
                    // 如果是日期类型先反序列化成Long类型, 然后得到Date类型的值
                    // result.putLong(((Date) value).getTime());
                    value = new Date(DeserializeUtil.longDeserialize(valueByte));
                    break;
                case "Integer":
                    value = DeserializeUtil.integerDeserialize(valueByte);
                    break;
                case "String":
                    value = new String(valueByte);
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported argument type: "+propertyType);
            }

            setPropertyValue(order, propertyName, value);
            System.out.println("property = "+propertyLen+", name = "+propertyName
                    +", value = "+valueLen+", name = "+value);
        }
        return order;
    }

    private String getPropertyType(Class<? extends Order> clazz, String propertyName) {
         /*
        * 得到类中的方法
        */
        try {
            Field field = clazz.getDeclaredField(propertyName);
            return field.getType().getSimpleName();
        } catch (NoSuchFieldException e) {

        }
        return null;
    }

    /**
     * 调用Order中属性${propertyName}的setter方法并赋值${propertyValue}
     */
    private void setPropertyValue(Order order, String propertyName, Object propertyValue) {
         /*
        * 得到类中的方法
        */
        Method[] methods = order.getClass().getMethods();
        for (Method method : methods) {
            // 这里方法匹配还不够严谨
            if (method.getName().equalsIgnoreCase("set" + propertyName)) {
                try {
                    method.invoke(order, propertyValue);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void close() {
        // nothing to do
    }

}

自定义Serializer后,修改属性value.serializer的值为com.afei.kafka.serialization.OrderDeserializer,且KafkaConsumer申明为KafkaConsumer<String, Order>

总结

可以看到,自定义Serializer和Deserializer非常痛苦,而且上面还有很多异常情况没有处理,还有很多类型不支持,非常脆弱。复杂类型的支持更是一件痛苦的事情,不同版本之间的兼容性问题更是一个极大的挑战。由于Serializer和Deserializer影响到上下游系统,导致牵一发而动全身。自定义序列化&反序列化实现不是能力的体现,而是逗比的体现。所以强烈不建议自定义实现序列化&反序列化推荐直接使用StringSerializerStringDeserializer,然后使用json作为标准的数据传输格式。站在巨人的肩膀上,事半功倍。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,372评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,368评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,415评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,157评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,171评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,125评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,028评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,887评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,310评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,533评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,690评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,411评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,004评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,812评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,693评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,577评论 2 353

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,651评论 18 139
  • 前言 在WeTest舆情项目中,需要对每天千万级的游戏评论信息进行词频统计,在生产者一端,我们将数据按照每天的拉取...
    生活的探路者阅读 2,067评论 0 6
  • 发行说明 - Kafka - 版本1.0.0 以下是Kafka 1.0.0发行版中解决的JIRA问题的摘要。有关该...
    全能程序猿阅读 2,859评论 2 7
  • 主要内容 理论部分 常见应用 理论部分 iOS中事件(UIEvent)主要是以下几种,本文主要是分析触控事件(UI...
    mtry阅读 483评论 0 0
  • 你的心情自己做主 1 她是我的老师。学习、工作、生活中都是。 2010年9月2号,她在去学校的路上被一场突如其来的...
    我在他城阅读 354评论 0 2