/**
* 该策略在以下三种情况下滚动处于 In-progress 状态的部分文件(part file):
*
* 它至少包含 10 分钟的数据
* 最近 5 分钟没有收到新的记录
* 文件大小达到 1GB (写入最后一条记录后)
* 部分文件(part file)可以处于以下三种状态之一:
*
* In-progress :当前文件正在写入中
* Pending :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态
* Finished :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态
*/
DefaultRollingPolicy rollingPolicy = DefaultRollingPolicy
.builder()
.withMaxPartSize(1024*1024*1024)// 设置每个文件的最大大小 ,默认是128M。这里设置为1G
.withRolloverInterval(TimeUnit.MINUTES.toMillis(5))// 至少包含10分钟的数据
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))// 60s空闲,就滚动写入新的文件 近 5 分钟没有收到新的记录
.build();
/**
* 输出配置
*/
OutputFileConfig config = OutputFileConfig
.builder()
.withPartPrefix("kefu_client_")//前缀
.withPartSuffix(".log")//后缀
.build();
StreamingFileSink sink = StreamingFileSink
.forRowFormat(new Path(properties.getProperty("etl_hadoop_url")
+ properties.getProperty("etl_storm_kefu_basepath")),new SimpleStringEncoder("UTF-8"))
.withRollingPolicy(rollingPolicy)
.withBucketAssigner(new BasePathBucketAssigner())
.withBucketCheckInterval(1000L)// 桶检查间隔,这里设置为1s
.withOutputFileConfig(config)
.build();