kudu增删改查(orm方式)

由于通过 Presto 向 Kudu 插入数据过慢,而直接使用 kudu-client 写入可达每秒约 10 万条,因此这里对 kudu-client 做了一层封装。

<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-client</artifactId>
    <version>1.10.0</version>
</dependency>

kudu util工具类

/**
 * ORM-style helper around the Kudu Java client.
 *
 * <p>Entities are plain classes annotated with {@code @LyKuduTable} (Kudu table name) whose
 * fields carry MyBatis-Plus {@code @TableField} / {@code @TableId} annotations. The annotation
 * value, lower-cased, is used as the Kudu column name and the field value is read through the
 * field's public getter.
 *
 * <p>NOTE(review): {@link #tables} is a plain {@code HashMap} with no synchronisation; if these
 * static methods are called from several threads the cache should be guarded — confirm usage.
 *
 * <p>NOTE(review): the values returned by {@code apply()} / {@code flush()} are discarded, so
 * per-row errors that Kudu reports through those responses (rather than by throwing) are not
 * surfaced to callers — confirm whether that is intended.
 */
public class KuduUtil {

    /** Shared client, looked up from the Spring context under the bean name {@code kuduClient}. */
    public static KuduClient client = SpringUtil.getBean("kuduClient", KuduClient.class);

    /** Cache of already opened tables, keyed by Kudu table name. */
    public static HashMap<String, KuduTable> tables = new HashMap<>();

    /** Timeout applied to every session created by this class. */
    private static final int SESSION_TIMEOUT_MILLIS = 60000;

    /** {@link #saveBatch(List)} flushes once more than this many rows are buffered. */
    private static final int FLUSH_THRESHOLD = 60000;

    /**
     * Creates a MANUAL_FLUSH session whose mutation buffer holds {@code batchSize + 10} operations.
     *
     * @param batchSize number of operations the caller intends to buffer before flushing
     * @return a new session; the caller is responsible for flushing and closing it
     */
    public static KuduSession getBatchSession(int batchSize) {
        KuduSession session = client.newSession();
        session.setTimeoutMillis(SESSION_TIMEOUT_MILLIS);
        session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
        session.setMutationBufferSpace(batchSize + 10);
        return session;
    }

    /**
     * Creates an AUTO_FLUSH_SYNC session for single-row operations.
     *
     * @return a new session; the caller is responsible for closing it
     */
    public static KuduSession getCommSession() {
        KuduSession session = client.newSession();
        session.setTimeoutMillis(SESSION_TIMEOUT_MILLIS);
        session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
        return session;
    }

    /**
     * Copies every mapped field of {@code instance} into {@code row}.
     *
     * @param row      target row of the operation being built
     * @param fields   declared fields of the entity class
     * @param instance entity whose getters supply the values
     * @throws RuntimeException if a field is not mapped, has an unsupported type, or its getter fails
     */
    private static <T> void dealRow(PartialRow row, Field[] fields, T instance) {
        for (Field field : fields) {
            String key = resolveColumnName(field);
            if (key == null) {
                continue;
            }
            Object fieldVal = getFieldValueByName(instance, field.getName());
            if (fieldVal == null) {
                row.setNull(key);
            } else {
                addValue(row, key, field.getType(), fieldVal);
            }
        }
    }

    /**
     * Resolves the lower-cased Kudu column name of a field; {@code @TableField} wins over
     * {@code @TableId} when both are present.
     *
     * @return the column name, or {@code null} when the field is not a column
     *         ({@code serialVersionUID}, or any other un-annotated static/synthetic field)
     * @throws RuntimeException if an instance field carries neither annotation
     */
    private static String resolveColumnName(Field field) {
        TableField column = field.getAnnotation(TableField.class);
        TableId idColumn = field.getAnnotation(TableId.class);
        String key;
        if (column != null) {
            key = column.value();
        } else if (idColumn != null) {
            key = idColumn.value();
        } else if ("serialVersionUID".equals(field.getName())
                || field.isSynthetic()
                || java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
            // Un-annotated static or compiler/tool generated fields are never columns.
            return null;
        } else {
            throw new RuntimeException(field + " filed should contain com.baomidou.mybatisplus." +
                    "annotation.TableField/TableId Annotation");
        }
        // Locale.ROOT keeps the column name stable regardless of the JVM default locale.
        return key.toLowerCase(java.util.Locale.ROOT);
    }

    /**
     * Writes one non-null value into {@code row}, choosing the setter from the declared field type.
     *
     * @param row   target row
     * @param key   lower-cased column name
     * @param type  declared type of the entity field
     * @param value non-null value to write
     * @throws RuntimeException if {@code type} is not one of the supported types
     */
    private static void addValue(PartialRow row, String key, Class<?> type, Object value) {
        if (type.equals(String.class)) {
            row.addString(key, (String) value);
        } else if (type.equals(Long.class)) {
            row.addLong(key, (Long) value);
        } else if (type.equals(Integer.class)) {
            row.addInt(key, (Integer) value);
        } else if (type.equals(BigDecimal.class)) {
            row.addDecimal(key, (BigDecimal) value);
        } else if (type.equals(Date.class)) {
            // Date.getTime() is epoch milliseconds; * 1000 converts to microseconds,
            // the unit Kudu uses for timestamp columns.
            row.addLong(key, ((Date) value).getTime() * 1000);
        } else if (type.equals(LocalDateTime.class)) {
            // Interpreted in the JVM default zone, then converted milliseconds -> microseconds.
            row.addLong(key, ((LocalDateTime) value).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() * 1000);
        } else if (type.equals(Double.class) || type.equals(Float.class)) {
            // Goes through the decimal string form; assumes the target column is DECIMAL — TODO confirm.
            row.addDecimal(key, new BigDecimal(String.valueOf(value)));
        } else {
            throw new RuntimeException("error Type");
        }
    }

    /**
     * Returns the table with the given name, opening and caching it on first use.
     *
     * @throws RuntimeException if the table cannot be opened
     */
    private static KuduTable getKuduTable(String kuduTable) {
        KuduTable table = tables.get(kuduTable);
        if (table != null) {
            return table;
        }
        try {
            table = client.openTable(kuduTable);
        } catch (KuduException e) {
            throw new RuntimeException("error open table " + kuduTable, e);
        }
        if (table == null) {
            throw new RuntimeException("error open table " + kuduTable);
        }
        tables.put(kuduTable, table);
        return table;
    }

    /**
     * Resolves the Kudu table of an entity class from its {@code @LyKuduTable} annotation.
     *
     * @throws RuntimeException if the class is not annotated or the table cannot be opened
     */
    private static KuduTable tableFor(Class<?> entityClass) {
        LyKuduTable ano = entityClass.getAnnotation(LyKuduTable.class);
        if (ano == null) {
            throw new RuntimeException(entityClass.getName() + " should contain LyKuduTable Annotation");
        }
        return getKuduTable(ano.value());
    }

    /**
     * Closes a session without leaking it. When the caller's work already failed, a secondary
     * failure from {@code close()} is dropped so the original exception is the one reported.
     *
     * <p>NOTE(review): per the Kudu client documentation {@code close()} also flushes buffered
     * operations, so rows applied to a MANUAL_FLUSH session before a failure may still be written.
     */
    private static void closeSession(KuduSession session, boolean succeeded) throws KuduException {
        if (succeeded) {
            session.close();
            return;
        }
        try {
            session.close();
        } catch (KuduException | RuntimeException ignored) {
            // The caller's exception is already propagating and is the more useful one.
        }
    }

    /**
     * Upserts a single entity through an AUTO_FLUSH_SYNC session. Does nothing when
     * {@code CommonUtil.isNull(object)} is true.
     *
     * @param object entity annotated with {@code @LyKuduTable}
     * @throws RuntimeException on mapping problems or when the Kudu client throws
     */
    public static <T> void save(T object) {
        if (CommonUtil.isNull(object)) {
            return;
        }
        Field[] fields = object.getClass().getDeclaredFields();
        KuduTable table = tableFor(object.getClass());
        // Build the row before opening a session so a mapping failure has nothing to clean up.
        Upsert upsert = table.newUpsert();
        dealRow(upsert.getRow(), fields, object);
        try {
            KuduSession kuduSession = getCommSession();
            boolean succeeded = false;
            try {
                kuduSession.apply(upsert);
                succeeded = true;
            } finally {
                closeSession(kuduSession, succeeded);
            }
        } catch (KuduException e) {
            throw new RuntimeException("kudu error", e);
        }
    }

    /**
     * Upserts all entities with one flush at the end. The entity class (table and fields) is
     * taken from the first element. Does nothing for a null or empty list.
     *
     * @param objects entities of one {@code @LyKuduTable}-annotated class
     * @throws RuntimeException on mapping problems or when the Kudu client throws
     */
    public static <T> void save(List<T> objects) {
        if (objects == null || objects.isEmpty() || CommonUtil.isNull(objects)) {
            return;
        }
        try {
            upsertAll(objects, false);
        } catch (KuduException e) {
            throw new RuntimeException("kudu error", e);
        }
    }

    /**
     * Shared upsert loop of {@link #save(List)} and {@link #saveBatch(List)}.
     *
     * @param objects            non-empty list of entities
     * @param flushPeriodically  when true, flush whenever more than {@link #FLUSH_THRESHOLD}
     *                           rows are buffered; a final flush always happens
     */
    private static <T> void upsertAll(List<T> objects, boolean flushPeriodically) throws KuduException {
        Class<?> entityClass = objects.get(0).getClass();
        Field[] fields = entityClass.getDeclaredFields();
        KuduTable table = tableFor(entityClass);
        KuduSession kuduSession = getBatchSession(objects.size());
        boolean succeeded = false;
        try {
            int unCommit = 0;
            for (T instance : objects) {
                Upsert upsert = table.newUpsert();
                dealRow(upsert.getRow(), fields, instance);
                kuduSession.apply(upsert);
                unCommit++;
                if (flushPeriodically && unCommit > FLUSH_THRESHOLD) {
                    kuduSession.flush();
                    unCommit = 0;
                }
            }
            kuduSession.flush();
            succeeded = true;
        } finally {
            closeSession(kuduSession, succeeded);
        }
    }

    /**
     * Reads a field through its public getter ({@code get} + capitalised field name).
     *
     * @throws RuntimeException wrapping any reflection failure
     */
    private static <T> Object getFieldValueByName(T instance, String fieldName) {
        try {
            String getter = "get" + fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1);
            Method method = instance.getClass().getMethod(getter);
            return method.invoke(instance);
        } catch (Exception e) {
            // Keep the original message text but also chain the cause for the stack trace.
            throw new RuntimeException("获取对象值失败" + e.toString(), e);
        }
    }

    /**
     * Deletes the row whose primary key equals {@code id}. Does nothing when
     * {@code CommonUtil.isNull(id)} is true.
     *
     * @param classs entity class annotated with {@code @LyKuduTable}, with one {@code @TableId} field
     * @param id     key value; must match the declared type of the {@code @TableId} field
     * @throws RuntimeException on mapping problems or when the Kudu client throws
     */
    public static <T> void deleteById(Class<T> classs, Object id) {
        if (CommonUtil.isNull(id)) {
            return;
        }
        Field[] fields = classs.getDeclaredFields();
        KuduTable table = tableFor(classs);
        // Build the key before opening a session so a mapping failure has nothing to clean up.
        Delete delete = table.newDelete();
        dealDeleteRow(delete.getRow(), fields, id);
        try {
            KuduSession kuduSession = getCommSession();
            boolean succeeded = false;
            try {
                kuduSession.apply(delete);
                succeeded = true;
            } finally {
                closeSession(kuduSession, succeeded);
            }
        } catch (KuduException e) {
            throw new RuntimeException("kudu error", e);
        }
    }

    /**
     * Misspelled legacy name kept so existing callers keep compiling;
     * behaves exactly like {@link #deleteById(Class, Object)}.
     */
    public static <T> void deteteById(Class<T> classs, Object id) {
        deleteById(classs, id);
    }

    /**
     * Sets the primary-key column of a delete row from the first {@code @TableId} field.
     *
     * @throws RuntimeException if no field carries {@code @TableId} or its type is unsupported
     */
    private static <T> void dealDeleteRow(PartialRow row, Field[] fields, Object fieldVal) {
        for (Field field : fields) {
            TableId idColumn = field.getAnnotation(TableId.class);
            if (idColumn != null) {
                String key = idColumn.value().toLowerCase(java.util.Locale.ROOT);
                addValue(row, key, field.getType(), fieldVal);
                return;
            }
        }
        // Without a key column the delete could never address a row; fail with a clear message.
        throw new RuntimeException("no field annotated with com.baomidou.mybatisplus.annotation.TableId");
    }

    /**
     * Upserts all entities, flushing whenever more than {@link #FLUSH_THRESHOLD} rows are
     * buffered, and logs the elapsed time (or the Kudu error) through {@code SkyLoggerClient}.
     * Does nothing for a null or empty list.
     *
     * @param objects entities of one {@code @LyKuduTable}-annotated class
     * @throws RuntimeException on mapping problems or when the Kudu client throws
     */
    public static <T> void saveBatch(List<T> objects) {
        if (objects == null || objects.isEmpty() || CommonUtil.isNull(objects)) {
            return;
        }
        long startTime = System.currentTimeMillis();
        try {
            upsertAll(objects, true);
            SkyLoggerClient.info("ERP导入KUDU", "ERP导入KUDU", "导入kudu完成,耗时" + (System.currentTimeMillis() - startTime) + "ms");
        } catch (KuduException e) {
            SkyLoggerClient.error("ERP导入KUDU", "ERP导入KUDU", e.getMessage());
            throw new RuntimeException("kudu error", e);
        }
    }

}

自定义kudu注解

import java.lang.annotation.*;

/**
 * Marks an entity class with the name of the Kudu table it maps to.
 * Retained at runtime so the name can be read reflectively.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LyKuduTable {

    /** Kudu table name, e.g. {@code "presto::erprf.order"}. */
    String value();
}

实体类

import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableField;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import xxxx.java.datacenter.common.annotation.LyKuduTable;

/**
 * Example order entity (abridged: the remaining fields are elided in this excerpt).
 *
 * <p>{@code @LyKuduTable} carries the Kudu table name that KuduUtil opens for this class.
 * Each field's {@code @TableId} / {@code @TableField} value is lower-cased by KuduUtil to
 * obtain the Kudu column name (e.g. {@code OrderId} -> {@code orderid}).
 *
 * <p>KuduUtil reads values through {@code getXxx()} getters; presumably these come from
 * Lombok's {@code @Data} — verify when adding fields with unusual names.
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("[Order]")
@ApiModel(value="ErpOrder对象", description="订单表")
@LyKuduTable("presto::erprf.order")
public class ErpOrder extends Model<ErpOrder> {

    // Left without a column annotation; KuduUtil skips this field by name.
    private static final long serialVersionUID = 1L;

    // Order id; the @TableId field is the one KuduUtil uses as the key when deleting by id.
    @ApiModelProperty(value = "订单Id")
    @TableId("OrderId")
    private String OrderId;

    // Order id for multi-level distribution (per the Swagger description below).
    @ApiModelProperty(value = "多级分销订单ID")
    @TableField("OrderFullPathId")
    private String OrderFullPathId;

    // Order number.
    @ApiModelProperty(value = "订单号")
    @TableField("OrderCode")
    private String OrderCode;

     .....
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容