前言
之前写过一篇文章 Delta的真正用处和价值,你可知道,该项目开源的那天我就集到MLSQL了。不过当时只是尝鲜性质,主要原因是因为我一直觉得delta缺了Compaction功能。很多公司其实都有小文件的困扰,而Delta这个问题会更严重。不过近期Delta团队应该就会发布新版本了,届时有可能相关的功能都会补上。不过MLSQL现在也自己实现了一个Compaction的功能,并且对delta做了一定的集成和增强。
写入delta数据
下面的例子都会使用这个数据:
set data='''
{"key":"a","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}
{"key":"a","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:01.002","timestampType":0}
{"key":"b","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"b","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"b","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"b","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
''';
流写入样例:
-- the stream name, should be uniq.
set streamName="streamExample";
-- load data as table
load jsonStr.`data` as datasource;
-- convert table as stream source
load mockStream.`datasource` options
stepSizeRange="0-3"
as newkafkatable1;
select * from newkafkatable1
as table21;
-- output the the result to console.
save append table21
as rate.`/tmp/delta/rate-1-table`
options mode="Append"
and duration="10"
and checkpointLocation="/tmp/rate-1" partitionBy key;
这里注意一下是流里面delta叫rate。
批写入示例:
load jsonStr.`data` as datasource;
save append datasource
as delta.`/tmp/delta/rate-2-table`
partitionBy key;
Delta 工具集
!delta history /tmp/delta/rate-2-table;
Compaction前置条件
Compaction 实现参看这里Github。只有一个文件,用户兼容0.1.0版本,用户可以直接拷贝的自己项目里。
使用Compaction的前提有如下几个:
- delta表至少发生了一次checkpoint.默认是有十次提交就会产生一个新的checkpoint.
- 批/流都需要为append 模式往表里写数据
Compaction使用范例
!delta compact /tmp/delta/rate-2-table 8 1;
也可以使用
!delta help;
查看这些命令怎么使用。
前面表示对第八个版本之前的所有数据都进行合并,每个目录(分区)都合并成一个文件。
我们看下合并前每个分区下面文件情况:
合并后文件情况:
一个Compaction也是一次提交:
我们删除了16个文件,生成了两个新文件。另外在compaction的时候,并不影响读和写。所以是非常有用的。
总结
Delta解决了几个痛点问题:
- 数据版本问题
- 并发读写问题(多个进程或者单个进程多线程)
- 流批共享表
这几个问题对于数仓是经常会遇到的。
而对于机器学习而言,譬如在MLSQL里,用户是可以将自己的训练参数保存成版本的,但是他们使用的数据目前只能存多份(实际使用时用户需要手动修改存储目录,每次训练的时候)。而使用了delta之后,意味着机器学习的完整实验版本都可以被追踪,什么样的数据通过什么算法,以及配合什么参数得到了什么样的结果,都可以得到保留。