Hive-ObjectInspector

Serde

Serde是什么:Serde实现数据序列化和反序列化以及提供一个辅助类ObjectInspector帮助使用者访问需要序列化或者反序列化的对象。

Serde层构建在数据存储和执行引擎之间,实现数据存储+中间数据存储和执行引擎的解耦。

public abstract class AbstractSerDe implements SerDe {
    public abstract void initialize(Configuration paramConfiguration, Properties paramProperties) throws SerDeException;

    public abstract Class<? extends Writable> getSerializedClass();

    public abstract Writable serialize(Object paramObject, ObjectInspector paramObjectInspector) throws SerDeException;

    public abstract SerDeStats getSerDeStats();

    public abstract Object deserialize(Writable paramWritable) throws SerDeException;

    public abstract ObjectInspector getObjectInspector() throws SerDeException;
}

这里为什么提到数据存储和中间数据存储两个概念,因为数据序列化和反序列化不仅仅用在对目标文件的读取和结果数据写入,还需要实现中间结果保存和传输,Hive最终会将SQL转化为mapreduce程序,而mapreduce程序需要读取原始数据,并将最终的结果数据写入存储介质,Serde一方面用在针对inputformat中RecordReader读取数据的解析和最终结果的保存,另一方面,在map和reduce之间有一层shuffle,中间结果由Hadoop完成shuffle后也需要读取并反序列化成内部的object,这个object实际上通常是一个Array或者list,但hive会提供一个StandardStructObjectInspector给用户进行该Object的访问。

上一个Operator的输出是下一个Operator的输入,每一个Operator处理后,数据已经发生变化,比如SelectOperator会将用户没有选择的列去掉(如果数据读取端没有按列存储,一般将读取所有列数据上来),再比如经过groupby算子(map阶段被下推的group by)的数据,key已经变为聚类的列,value已经变成局部聚类的结果,所以每一个operator必须知道上一个operator结果数据的格式,我们先看下operator对象的一个重要方法:

void forward(Object row, ObjectInspector rowInspector);其实rowInspector这个参数已经没用了。

每一个operator处理完数据后就通过该方法将结果推给下一个operator,但是针对ReduceSinkOperator是个例外,它需要把中间结果序列化交给hadoop框架进行数据shuffle,在reduce时,再反序列化回来(通过上文提到的Serde),继续遍历Operator树进行处理。

为什么说这里的rowInspector参数已经没用了呢,我们先看ExecMapper的configure方法,ExecMapper是唯一一个map实现类,针对reducer还有一个ExecReducer:这里只列出了相关内容的代码。

public void configure(JobConf job){
    //实例化一个MapOperator作为map执行的起点,保存一些上下文对象
    mo = new MapOperator();
    //这里的初始化方法其实调用的是Operator的初始化方法,最终调用到了Operator的initializeOp方法。
    mo.initialize(jc, null);
}

每个Operator在initializeOp方法中实例化自己的outputObjInspector对象,因为只有自己知道自己需要产生的数据是什么结构,并将该outputObjInspector最为参数调用自己childOperator的initialize(Configuration hconf, ObjectInspector inputOI, int parentId)方法,这样自己的childOperator就拿到了上一个Operator生成结果对象的ObjectOperator,也就是说,上一个Operator把数据对象推下来后,自己已经知道这个对象是什么结构了,至此完成了整个operator树的初始化。

ObjectInspector

public abstract interface ObjectInspector extends Cloneable {
    public abstract String getTypeName();

    public abstract Category getCategory();

    public static enum Category {
        PRIMITIVE, LIST, MAP, STRUCT, UNION;
    }
}

通过getCategory方法可以知道该ObjectInspector是什么类型,我们通过一个实例了解该对象的使用,通过ObjectInspector帮助我们序列化对象。

    static void serialize(OutputByteBuffer buffer, Object o, 
            ObjectInspector oi, boolean invert){
        switch (oi.getCategory()) {
        case PRIMITIVE:{
            PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
            switch (poi.getPrimitiveCategory()) {
            case BOOLEAN:
                boolean v=((BooleanObjectInspector)poi).get(o);
                buffer.write((byte) (v ? 2 : 1), invert);
                break;          
            default:
                break;
            }
        }
        case STRUCT:{
            StructObjectInspector soi=(StructObjectInspector)oi;
            List<? extends StructField> fields = soi.getAllStructFieldRefs();
            for (int i = 0; i < fields.size(); i++) {
                serialize(buffer, soi.getStructFieldData(o, fields.get(i)), 
                        fields.get(i).getFieldObjectInspector(), invert);
            }
            return;
        }
        default:
            break;
        }
    }

primitiveObjectInspector

继承关系如下:


首先查看primitiveObjectInspector接口如下:

public abstract interface PrimitiveObjectInspector extends ObjectInspector {
    public abstract PrimitiveTypeInfo getTypeInfo();

    public abstract PrimitiveCategory getPrimitiveCategory();

    public abstract Class<?> getPrimitiveWritableClass();

    public abstract Object getPrimitiveWritableObject(Object paramObject);

    public abstract Class<?> getJavaPrimitiveClass();

    public abstract Object getPrimitiveJavaObject(Object paramObject);

    public abstract Object copyObject(Object paramObject);

    public abstract boolean preferWritable();

    public abstract int precision();

    public abstract int scale();

    public static enum PrimitiveCategory {
        VOID, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING, DATE, TIMESTAMP, BINARY, DECIMAL, VARCHAR, CHAR, UNKNOWN;
    }
}

可以根据PrimitiveCategory得知具体是哪个基本类型。DoubleObjectInspector在原接口基础上提供了get方法。public abstract double get(Object paramObject);SettableDoubleObjectInspector接口提供了
set和create方法。

WritableDoubleObjectInspector实现如下:

public class WritableDoubleObjectInspector extends AbstractPrimitiveWritableObjectInspector
        implements SettableDoubleObjectInspector {
    WritableDoubleObjectInspector() {
        super(TypeInfoFactory.doubleTypeInfo);
    }

    public double get(Object o) {
        return ((DoubleWritable) o).get();
    }

    public Object copyObject(Object o) {
        return new DoubleWritable(((DoubleWritable) o).get());
    }

    public Object getPrimitiveJavaObject(Object o) {
        return ((o == null) ? null : Double.valueOf(((DoubleWritable) o).get()));
    }

    public Object create(double value) {
        return new DoubleWritable(value);
    }

    public Object set(Object o, double value) {
        ((DoubleWritable) o).set(value);
        return o;
    }
}

注意这里面返回的都是writable形式的,如DoubleWritable。不是java中基本类型。

ListObjectInspector

list结构使用ListObjectInspector类型来表示。
层次关系如下:

接口如下:

public abstract interface ListObjectInspector extends ObjectInspector {
    public abstract ObjectInspector getListElementObjectInspector();//获取list中element的类型

    public abstract Object getListElement(Object paramObject, int paramInt);

    public abstract int getListLength(Object paramObject);

    public abstract List<?> getList(Object paramObject);
}

SettableListObjectInspector,继承ListObjectInspector接口,提供接口如下:

public abstract interface SettableListObjectInspector extends ListObjectInspector {
    public abstract Object create(int paramInt);

    public abstract Object set(Object paramObject1, int paramInt, Object paramObject2);

    public abstract Object resize(Object paramObject, int paramInt);
}

StandardListObjectInspector是一个标准实现。实现见代码。比较简单。

/*** Eclipse Class Decompiler plugin, copyright (c) 2016 Chen Chao (cnfree2000@hotmail.com) ***/
package org.apache.hadoop.hive.serde2.objectinspector;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StandardListObjectInspector implements SettableListObjectInspector {
    private ObjectInspector listElementObjectInspector;//元素类型

    protected StandardListObjectInspector() {
    }

    protected StandardListObjectInspector(ObjectInspector listElementObjectInspector) {
        this.listElementObjectInspector = listElementObjectInspector;
    }

    public final ObjectInspector.Category getCategory() {
        return ObjectInspector.Category.LIST;
    }

    public ObjectInspector getListElementObjectInspector() {
        return this.listElementObjectInspector;
    }

    public Object getListElement(Object data, int index) {
        if (data == null) {
            return null;
        }

        boolean isArray = !(data instanceof List);
        if (isArray) {
            Object[] list = (Object[]) (Object[]) data;
            if ((index < 0) || (index >= list.length)) {
                return null;
            }
            return list[index];
        }
        List list = (List) data;
        if ((index < 0) || (index >= list.size())) {
            return null;
        }
        return list.get(index);
    }

    public int getListLength(Object data) {
        if (data == null) {
            return -1;
        }

        boolean isArray = !(data instanceof List);
        if (isArray) {
            Object[] list = (Object[]) (Object[]) data;
            return list.length;
        }
        List list = (List) data;
        return list.size();
    }

    public List<?> getList(Object data) {
        if (data == null) {
            return null;
        }

        if (!(data instanceof List)) {
            data = Arrays.asList((Object[]) (Object[]) data);
        }
        List list = (List) data;
        return list;
    }

    public String getTypeName() {
        return "array<" + this.listElementObjectInspector.getTypeName() + ">";
    }

    public Object create(int size) {
        List a = new ArrayList(size);
        for (int i = 0; i < size; ++i) {
            a.add(null);
        }
        return a;
    }

    public Object resize(Object list, int newSize) {
        List a = (List) list;
        while (a.size() < newSize) {
            a.add(null);
        }
        while (a.size() > newSize) {
            a.remove(a.size() - 1);
        }
        return a;
    }

    public Object set(Object list, int index, Object element) {
        List a = (List) list;
        a.set(index, element);
        return a;
    }
}

MapObjectInspector

继承关系类似list。

MapObjectInspector接口主要定义了key和value类型的获取方法,获取map元素,获取一个map对象,及map大小等通用接口。

public abstract interface MapObjectInspector extends ObjectInspector {
    public abstract ObjectInspector getMapKeyObjectInspector();

    public abstract ObjectInspector getMapValueObjectInspector();

    public abstract Object getMapValueElement(Object paramObject1, Object paramObject2);

    public abstract Map<?, ?> getMap(Object paramObject);

    public abstract int getMapSize(Object paramObject);
}

SettableMapObjectInspector接口主要提供了添加 移除 创建等接口、

public abstract interface SettableMapObjectInspector extends MapObjectInspector {
    public abstract Object create();

    public abstract Object put(Object paramObject1, Object paramObject2, Object paramObject3);

    public abstract Object remove(Object paramObject1, Object paramObject2);

    public abstract Object clear(Object paramObject);
}

StandardMapObjectInspector一个标准实现,实现如下:

public class StandardMapObjectInspector implements SettableMapObjectInspector {
    private ObjectInspector mapKeyObjectInspector;
    private ObjectInspector mapValueObjectInspector;

    protected StandardMapObjectInspector() {
    }

    protected StandardMapObjectInspector(ObjectInspector mapKeyObjectInspector,
            ObjectInspector mapValueObjectInspector) {
        this.mapKeyObjectInspector = mapKeyObjectInspector;
        this.mapValueObjectInspector = mapValueObjectInspector;
    }

    public ObjectInspector getMapKeyObjectInspector() {
        return this.mapKeyObjectInspector;
    }

    public ObjectInspector getMapValueObjectInspector() {
        return this.mapValueObjectInspector;
    }

    public Object getMapValueElement(Object data, Object key) {
        if ((data == null) || (key == null)) {
            return null;
        }
        Map map = (Map) data;
        return map.get(key);
    }

    public int getMapSize(Object data) {
        if (data == null) {
            return -1;
        }
        Map map = (Map) data;
        return map.size();
    }

    public Map<?, ?> getMap(Object data) {
        if (data == null) {
            return null;
        }
        Map map = (Map) data;
        return map;
    }

    public final ObjectInspector.Category getCategory() {
        return ObjectInspector.Category.MAP;
    }

    public String getTypeName() {
        return "map<" + this.mapKeyObjectInspector.getTypeName() + "," + this.mapValueObjectInspector.getTypeName()
                + ">";
    }

    public Object create() {
        Map m = new HashMap();
        return m;
    }

    public Object clear(Object map) {
        Map m = (HashMap) map;
        m.clear();
        return m;
    }

    public Object put(Object map, Object key, Object value) {
        Map m = (HashMap) map;
        m.put(key, value);
        return m;
    }

    public Object remove(Object map, Object key) {
        Map m = (HashMap) map;
        m.remove(key);
        return m;
    }
}

StructObjectInspector

StructObjectInspector是一个抽象类。

public abstract class StructObjectInspector implements ObjectInspector {
    public abstract List<? extends StructField> getAllStructFieldRefs();

    public abstract StructField getStructFieldRef(String paramString);

    public abstract Object getStructFieldData(Object paramObject, StructField paramStructField);

    public abstract List<Object> getStructFieldsDataAsList(Object paramObject);

    public boolean isSettable() {
        return false;
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        List fields = getAllStructFieldRefs();
        sb.append(super.getClass().getName());
        sb.append("<");
        for (int i = 0; i < fields.size(); ++i) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append(((StructField) fields.get(i)).getFieldObjectInspector().toString());
        }
        sb.append(">");
        return sb.toString();
    }
}

我们来看一个标准实现。StandardStructObjectInspector。
其中 包含一个静态类MyField,继承StructField。定义了fieldname 和fieldcoment 以及fieldobjectInspector等属性。
主要实现如下:

protected static class MyField implements StructField {
        protected int fieldID;
        protected String fieldName;
        protected ObjectInspector fieldObjectInspector;
        protected String fieldComment;

        protected MyField() {
        }

        public MyField(int fieldID, String fieldName, ObjectInspector fieldObjectInspector) {
            this.fieldID = fieldID;
            this.fieldName = fieldName.toLowerCase();
            this.fieldObjectInspector = fieldObjectInspector;
        }

        public MyField(int fieldID, String fieldName, ObjectInspector fieldObjectInspector, String fieldComment) {
            this(fieldID, fieldName, fieldObjectInspector);
            this.fieldComment = fieldComment;
        }
    }

StandardStructObjectInspector中定义了一个list对象,来存储field。protected List<MyField> fields;

通过init方法来初始化。

protected StandardStructObjectInspector(List<String> structFieldNames,
            List<ObjectInspector> structFieldObjectInspectors) {
        init(structFieldNames, structFieldObjectInspectors, null);
    }

    protected StandardStructObjectInspector(List<String> structFieldNames,
            List<ObjectInspector> structFieldObjectInspectors, List<String> structFieldComments) {
        init(structFieldNames, structFieldObjectInspectors, structFieldComments);
    }

    protected void init(List<String> structFieldNames, List<ObjectInspector> structFieldObjectInspectors,
            List<String> structFieldComments) {
        assert (structFieldNames.size() == structFieldObjectInspectors.size());
        assert ((structFieldComments == null) || (structFieldNames.size() == structFieldComments.size()));

        this.fields = new ArrayList(structFieldNames.size());
        for (int i = 0; i < structFieldNames.size(); ++i)
            this.fields.add(new MyField(i, (String) structFieldNames.get(i),
                    (ObjectInspector) structFieldObjectInspectors.get(i),
                    (structFieldComments == null) ? null : (String) structFieldComments.get(i)));
    }

    protected StandardStructObjectInspector(List<StructField> fields) {
        init(fields);
    }

    protected void init(List<StructField> fields) {
        this.fields = new ArrayList(fields.size());
        for (int i = 0; i < fields.size(); ++i)
            this.fields.add(new MyField(i, ((StructField) fields.get(i)).getFieldName(),
                    ((StructField) fields.get(i)).getFieldObjectInspector()));
    }

获取某一个fieldname对应的fieldcomment方法实现如下:

public Object getStructFieldData(Object data, StructField fieldRef) {
        if (data == null) {
            return null;
        }

        boolean isArray = !(data instanceof List);
        if ((!(isArray)) && (!(data instanceof List))) {
            return data;
        }
        int listSize = (isArray) ? ((Object[]) (Object[]) data).length : ((List) data).size();

        MyField f = (MyField) fieldRef;
        if ((this.fields.size() != listSize) && (!(this.warned))) {
            this.warned = true;
            LOG.warn("Trying to access " + this.fields.size() + " fields inside a list of " + listSize + " elements: "
                    + ((isArray) ? Arrays.asList((Object[]) (Object[]) data) : (List) data));

            LOG.warn("ignoring similar errors.");
        }
        int fieldID = f.getFieldID();
        assert ((fieldID >= 0) && (fieldID < this.fields.size()));

        if (fieldID >= listSize)
            return null;
        if (isArray) {
            return ((Object[]) (Object[]) data)[fieldID];
        }
        return ((List) data).get(fieldID);
    }

其他方法:

public String getTypeName() {
        return ObjectInspectorUtils.getStandardStructTypeName(this);
    }

    public final ObjectInspector.Category getCategory() {
        return ObjectInspector.Category.STRUCT;
    }

    public StructField getStructFieldRef(String fieldName) {
        return ObjectInspectorUtils.getStandardStructFieldRef(fieldName, this.fields);
    }

这里面使用了一个工具类ObjectInspectorUtils。

ObjectInspectorUtils

主要提供一些工具方法、

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,711评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,079评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,194评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,089评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,197评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,306评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,338评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,119评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,541评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,846评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,014评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,694评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,322评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,026评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,257评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,863评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,895评论 2 351

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,605评论 18 399
  • 从三月份找实习到现在,面了一些公司,挂了不少,但最终还是拿到小米、百度、阿里、京东、新浪、CVTE、乐视家的研发岗...
    时芥蓝阅读 42,221评论 11 349
  • 1. 曾经一度,情商在“不识愁滋味”的少年那里,是贬义词。 你我都暗自发过誓:我要做那个出淤泥而不染的人,我不允许...
    钱饭饭阅读 3,566评论 9 41
  • 作为一个好学生,从小的应试教育让我成为一个勤奋刻苦(死记硬背)的人。 举个例子,高中时数学总是错题,搞不明白,老师...
    张严心阅读 673评论 0 0