由于presto对kudu进行数据插入过慢,采用kudu-client之后写入可达每秒10w条,所以对kudu-client进行封装
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.10.0</version>
</dependency>
kudu util工具类
public class KuduUtil {
public static KuduClient client = SpringUtil.getBean("kuduClient", KuduClient.class);
public static HashMap<String, KuduTable> tables = new HashMap<>();
public static KuduSession getBatchSession(int batchSize) {
KuduSession session = client.newSession();
session.setTimeoutMillis(60000);
SessionConfiguration.FlushMode mode = SessionConfiguration.FlushMode.MANUAL_FLUSH;
session.setFlushMode(mode);
session.setMutationBufferSpace(batchSize + 10);
return session;
}
public static KuduSession getCommSession() {
KuduSession session = client.newSession();
session.setTimeoutMillis(60000);
SessionConfiguration.FlushMode mode = SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC;
session.setFlushMode(mode);
return session;
}
private static <T> void dealRow(PartialRow row, Field[] fields, T instance) {
for (Field field : fields) {
TableField coulmn = field.getAnnotation(TableField.class);
TableId idColumn = field.getAnnotation(TableId.class);
Class type = field.getType();
String key = "";
if (coulmn != null) {
key = coulmn.value();
} else if (idColumn != null) {
key = idColumn.value();
} else if (coulmn == null && idColumn == null) {
if ("serialVersionUID".equals(field.getName())) {
continue;
} else {
throw new RuntimeException(field + " filed should contain com.baomidou.mybatisplus." +
"annotation.TableField/TableId Annotation");
}
}
key = key.toLowerCase();
Object fieldVal = getFieldValueByName(instance, field.getName());
if (fieldVal == null) {
row.setNull(key);
} else if (type.equals(String.class)) {
row.addString(key, (String) fieldVal);
} else if (type.equals(Long.class)) {
row.addLong(key, (Long) fieldVal);
} else if (type.equals(Integer.class)) {
row.addInt(key, (Integer) fieldVal);
} else if (type.equals(BigDecimal.class)) {
row.addDecimal(key, (BigDecimal) fieldVal);
} else if (type.equals(Date.class)) {
//java Date.getTime()返回的是毫秒
//kudu中时间精确到纳秒,所以要*1000
row.addLong(key, ((Date) fieldVal).getTime() * 1000);
} else if (type.equals(LocalDateTime.class)) {
row.addLong(key, ((LocalDateTime) fieldVal).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() * 1000);
} else if (type.equals(Double.class) || (type.equals(Float.class))) {
row.addDecimal(key, new BigDecimal(String.valueOf(fieldVal)));
} else {
throw new RuntimeException("error Type");
}
}
}
private static KuduTable getKuduTable(String kuduTable) {
KuduTable table = null;
try {
if (tables.containsKey(kuduTable)) {
table = tables.get(kuduTable);
} else {
table = client.openTable(kuduTable);
tables.put(kuduTable, table);
}
} catch (KuduException e) {
e.printStackTrace();
throw new RuntimeException("error open table " + kuduTable, e);
}
if (table == null) {
throw new RuntimeException("error open table " + kuduTable);
}
return table;
}
public static <T> void save(T object) {
if (CommonUtil.isNull(object)) {
return;
}
Field[] fields = object.getClass().getDeclaredFields();
LyKuduTable ano = object.getClass().getAnnotation(LyKuduTable.class);
KuduTable table = getKuduTable(ano.value());
try {
Upsert upsert = table.newUpsert();
KuduSession kuduSession = getCommSession();
PartialRow row = upsert.getRow();
dealRow(row, fields, object);
kuduSession.apply(upsert);
kuduSession.close();
} catch (KuduException e) {
throw new RuntimeException("kudu error", e);
}
}
public static <T> void save(List<T> objects) {
if (CommonUtil.isNull(objects)) {
return;
}
Field[] fields = objects.get(0).getClass().getDeclaredFields();
LyKuduTable ano = objects.get(0).getClass().getAnnotation(LyKuduTable.class);
KuduTable table = getKuduTable(ano.value());
try {
KuduSession kuduSession = getBatchSession(objects.size());
for (T instance : objects) {
Upsert upsert = table.newUpsert();
PartialRow row = upsert.getRow();
dealRow(row, fields, instance);
kuduSession.apply(upsert);
}
kuduSession.flush();
kuduSession.close();
} catch (KuduException e) {
throw new RuntimeException("kudu error", e);
}
}
private static <T> Object getFieldValueByName(T instance, String fieldName) {
Object val = null;
try {
String firstLetter = fieldName.substring(0, 1).toUpperCase();
String getter = "get" + firstLetter + fieldName.substring(1);
Method method = instance.getClass().getMethod(getter);
val = method.invoke(instance);
} catch (Exception e) {
throw new RuntimeException("获取对象值失败" + e.toString());
}
return val;
}
public static <T> void deteteById(Class<T> classs, Object id) {
if (CommonUtil.isNull(id)) {
return;
}
Field[] fields = classs.getDeclaredFields();
LyKuduTable ano = classs.getAnnotation(LyKuduTable.class);
KuduTable table = getKuduTable(ano.value());
try {
KuduSession kuduSession = getCommSession();
Delete delete = table.newDelete();
PartialRow row = delete.getRow();
dealDeleteRow(row, fields, id);
kuduSession.apply(delete);
kuduSession.close();
} catch (KuduException e) {
throw new RuntimeException("kudu error", e);
}
}
private static <T> void dealDeleteRow(PartialRow row, Field[] fields, Object fieldVal) {
for (Field field : fields) {
TableId idColumn = field.getAnnotation(TableId.class);
Class type = field.getType();
String key = "";
if (idColumn != null) {
key = idColumn.value();
key = key.toLowerCase();
if (type.equals(String.class)) {
row.addString(key, (String) fieldVal);
} else if (type.equals(Long.class)) {
row.addLong(key, (Long) fieldVal);
} else if (type.equals(Integer.class)) {
row.addInt(key, (Integer) fieldVal);
} else if (type.equals(BigDecimal.class)) {
row.addDecimal(key, (BigDecimal) fieldVal);
} else if (type.equals(Date.class)) {
//java Date.getTime()返回的是毫秒
//kudu中时间精确到纳秒,所以要*1000
row.addLong(key, ((Date) fieldVal).getTime() * 1000);
} else if (type.equals(LocalDateTime.class)) {
row.addLong(key, ((LocalDateTime) fieldVal).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() * 1000);
} else if (type.equals(Double.class) || (type.equals(Float.class))) {
row.addDecimal(key, new BigDecimal(String.valueOf(fieldVal)));
} else {
throw new RuntimeException("error Type");
}
break;
}
}
}
public static <T> void saveBatch(List<T> objects) {
if (CommonUtil.isNull(objects)) {
return;
}
Field[] fields = objects.get(0).getClass().getDeclaredFields();
LyKuduTable ano = objects.get(0).getClass().getAnnotation(LyKuduTable.class);
KuduTable table = getKuduTable(ano.value());
try {
KuduSession kuduSession = getBatchSession(objects.size());
int unCommit = 0;
long startTime = System.currentTimeMillis();
for (T instance : objects) {
Upsert upsert = table.newUpsert();
PartialRow row = upsert.getRow();
dealRow(row, fields, instance);
kuduSession.apply(upsert);
unCommit = unCommit + 1;
if (unCommit > 60000) {
kuduSession.flush();
unCommit = 0;
}
}
kuduSession.flush();
kuduSession.close();
SkyLoggerClient.info("ERP导入KUDU", "ERP导入KUDU", "导入kudu完成,耗时" + (System.currentTimeMillis() - startTime) + "ms");
} catch (KuduException e) {
SkyLoggerClient.error("ERP导入KUDU", "ERP导入KUDU", e.getMessage());
throw new RuntimeException("kudu error", e);
}
}
}
自定义kudu注解
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface LyKuduTable {
String value() ;
}
实体类
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableField;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import xxxx.java.datacenter.common.annotation.LyKuduTable;
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("[Order]")
@ApiModel(value="ErpOrder对象", description="订单表")
@LyKuduTable("presto::erprf.order")
public class ErpOrder extends Model<ErpOrder> {
private static final long serialVersionUID = 1L;
@ApiModelProperty(value = "订单Id")
@TableId("OrderId")
private String OrderId;
@ApiModelProperty(value = "多级分销订单ID")
@TableField("OrderFullPathId")
private String OrderFullPathId;
@ApiModelProperty(value = "订单号")
@TableField("OrderCode")
private String OrderCode;
.....
}