简介
- avro是一种固定格式(schema),以文件为单位的 数据序列化系统(类似加密解密)
- 支持二进制序列化方式,所以可以快速处理大量数据
- 支持对数据流,javabean等 序列化 反序列化操作,传输等效率高
- 就是由一个定义好的schema来读取的二进制文本文件。
个人业务分析
- 大量数据需要落成文件,存放到HDFS并支持 hive外表关联查询(指定目录)
- 数据入口:kafka
- 数据出口:hive业务库
分析
- kafka 实时接收数据落每日本地文件
- 定时上传每日本地文件到 hive外表目录下
- hive 外部表 关联(指定schema)
注意事项
- avro 文件格式 [https://www.jianshu.com/p/a5c0cbfbf608]
- 一个avro文件 文件由 header 和多个data block 组成,header由 消息包和指定 schema 以及压缩方式等组成
- 所有一个文件只有一个schema,当写入数据前,首先根据当前记录匹配 header 的schema,如果不符合schema时,则会抛异常
- 所以,对于单个文件来言,所有数据必须具有相同的schema
- 写的schema中fields的数量 > 大于读的schema中fields数量时,那么将被忽略
- 读的schema中fields数量 > 写的schema中fields的数量时,如果无默认值,则报错
Schema格式
{
"type": "record",
"name": "LongList",
"aliases": ["LinkedLongs"], // old name for this
"fields" : [
{"name": "value", "type": "long"}, // each element has a long
{"name": "next", "type": ["null", "LongList"]} // optional next element
]
}
代码
//根据传入的json串生成 schema串
public Map buildSchema(String arg) throws Exception {
String prefix = "{\"type\":\"record\",\"name\":\"" + table_name + "\",\"fields\":[";
String suffix = "]}";
if (table_name == null) {
throw new Exception("配置文件读取失败");
}
Map<String, String> hm = new HashMap<String, String>();
JSONObject js_ob;
try {
js_ob = JSONObject.fromObject(arg);
} catch (Exception e) {
throw new Exception("参数有误,检查是否为Json格式");
}
Iterator it = js_ob.keys();
String schemaPj = "";
StringBuffer sbf = new StringBuffer();
String finalSbf = "";
while (it.hasNext()) {
String key = (String) it.next();
String value = js_ob.get(key).toString();
schemaPj = "{\"name\":\"" + key + "\",\"type\":\"string\",\"default\":\"\"},";
sbf.append(schemaPj);
hm.put(key, value);
}
finalSbf = prefix + sbf.toString().substring(0, sbf.length() - 1) + suffix;
schema = Schema.parse(finalSbf);
return hm;
}
/**生成本地 avro 文件
* @param data 数据流入(json 串)
**/
public void buildFile(String data) throws IOException {
if (data != null) {
// 根據入參獲取 schema
HashMap<String, String> hm = null;
try {
hm = (HashMap) buildSchema(data);
} catch (Exception e1) {
e1.printStackTrace();
}
FileSystem fs = null;
FilterOutputStream ps = null;
// 允許 append
conf.setBoolean("dfs.support.append", true);
DataFileWriter<Record> writer = new DataFileWriter<Record>(new GenericDatumWriter<Record>(schema));
InputStream in = null;
try {
Record tab = new GenericData.Record(schema);
String dt = "";
// 业务数据处理 区域-(根据传入数据时间日期生成日期目录,对应hive外部表partition)
for (String key : hm.keySet()) {
if ("time".equals(key.trim())) {
String val = hm.get(key);
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
dt = sdf.format(new Date(Long.valueOf(val)));
tab.put(key, val);
continue;
}
tab.put(key, hm.get(key));
}
// -----------------------------------------------------------
// 本地路径落 avro 定时任务把文件传到 hive外部表路径
String uri = localPath + folderName + dt;
File file = new File(uri);
if (!file.exists()) {
file.mkdir();
}
String file_url = uri + "/" + file_name;
File newFile = new File(file_url);
// File newFile = new File(file_url_new);
// 如果存在路径 append 不存在 则创建
if (!newFile.exists()) {
newFile.createNewFile();
newFile.setWritable(true, false);
writer.create(schema, newFile);
} else {
//文件append 操作
writer.appendTo(newFile);
}
writer.append(tab);
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
IOUtils.closeStream(in);
writer.close();
}
}
}
hive
//建立外部 schema
CREATE EXTERNAL TABLE avro_test1
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/user/tmp' (hdfs路径)
TBLPROPERTIES (
'avro.schema.url'='hdfs:///user/tmp/avsc/student.avsc' (外部 schema文件)
);
hadoop fs -cat /user/tmp/avsc/student.avsc
{
"type": "record",
"name": "student",
"namespace": "com.tiejia.avro",
"fields": [
{ "name":"SID", "type":"string","default":""},
{ "name":"Name", "type":"string","default":""},
{"name":"Dept", "type": "string","default":""},
{ "name":"Phone", "type":"string","default":""},
{"name":"Age", "type": "string","default":""},
{"name":"Date", "type": "string","default":""}
]}
ps:设置默认值,否则如果传入数据为 null 则报错
后记:由于传入数据schema 有七八种之多,原来使用的列式存储 Hbase,一种schema对应一个文件太过于麻烦,最后没有选型avro来实现,欢迎有想法的各位朋友、大神们指导下