Parquet是一种新型列存储格式,它可以兼容Hadoop生态圈中大多数计算框架(Hadoop、Spark等),被多种查询引擎支持(Hive、Impala、Drill等),并且它是语言和平台无关的。Parquet最初是由Twitter和Cloudera(由于Impala的缘故)合作开发完成并开源,2015年5月从Apache的孵化器里毕业成为Apache顶级项目[1][2]。
用Java读写Parquet格式文件需要以下maven依赖:
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-encoding</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.8.2</version>
</dependency>
下面是写入parquet文件说明和关键代码:
Date、Timestamp类型需要保存为int96,否则impala读取出错。
int96为12字节,前8字节表示时间戳对应当天已过去的纳秒数,后4字节表示时间戳当天距离儒略历起始日已过去的天数。
注意前8字节和后4字节都是小端字节序,如果写入时使用大端序将导致读取失败[3][4]。
// schema定义
...
required int96 timestamp_field;
...
public static byte[] getBytes(int i) {
byte[] bytes=new byte[4];
bytes[0]=(byte)((i >> 24) & 0xFF);
bytes[1]=(byte)((i >> 16) & 0xFF);
bytes[2]=(byte)((i >> 8) & 0xFF);
bytes[3]=(byte)(i & 0xFF);
return bytes;
}
public static byte[] getBytes(long i) {
byte[] bytes=new byte[8];
bytes[0]=(byte)((i >> 56) & 0xFF);
bytes[1]=(byte)((i >> 48) & 0xFF);
bytes[2]=(byte)((i >> 40) & 0xFF);
bytes[3]=(byte)((i >> 32) & 0xFF);
bytes[4]=(byte)((i >> 24) & 0xFF);
bytes[5]=(byte)((i >> 16) & 0xFF);
bytes[6]=(byte)((i >> 8) & 0xFF);
bytes[7]=(byte)(i & 0xFF);
return bytes;
}
// 调转字节数组
public static void flip(byte[] bytes) {
for(int i=0,j=bytes.length-1;i<j;i++,j--) {
byte t=bytes[i];
bytes[i]=bytes[j];
bytes[j]=t;
}
}
// 每天的纳秒数
private static final long NANO_SECONDS_PER_DAY = 86400_000_000_000L;
// 儒略历起始日(儒略历的公元前4713年1月1日中午12点,在格里历是公元前4714年11月24日)距离1970-01-01的天数
private static final long JULIAN_EPOCH_OFFSET_DAYS = 2440588;
// 写入数据,此处预先存在一个Date对象(可由时间戳转换得到)
Date date = ...
// 转换成距1970-01-01 00:00:00的纳秒数
long nano = date.getTime() * 1000_000;
// 转换成距儒略历起始日的天数
int julianDays = (int) ((nano / NANO_SECONDS_PER_DAY) + JULIAN_EPOCH_OFFSET_DAYS);
byte[] julianDaysBytes = getBytes(julianDays);
flip(julianDaysBytes);
// 当前时间戳距离当天已过去的纳秒数
long lastDayNanos = nano % NANO_SECONDS_PER_DAY;
byte[] lastDayNanosBytes = getBytes(lastDayNanos);
flip(lastDayNanosBytes);
byte[] dst = new byte[12];
// 前8字节表示时间戳对应当天已过去的纳秒数
System.arraycopy(lastDayNanosBytes, 0, dst, 0, 8);
// 后4字节表示时间戳当天距离儒略历起始日已过去的天数
System.arraycopy(julianDaysBytes, 0, dst, 8, 4);
// Group group = factory.newGroup();
group.append("timestamp_field", Binary.fromConstantByteArray(dst));
这样写入parquet文件后,Impala将可以正确读取对应字段的内容。
参考:
[1] https://parquet.apache.org/
[2] Parquet格式详解:https://blog.csdn.net/yu616568/article/details/50993491
[3] NanosecondsToImpalaTimestamp函数:https://github.com/apache/parquet-cpp/blob/master/src/parquet/arrow/writer.h
[4] Github关于INT96的讨论:https://github.com/apache/parquet-format/pull/49