1、流式聚合
这一节介绍一些实用的优化选项以及流式聚合的内部原理,它们在某些情况下能带来很大的提升。
MiniBatch 聚合
MiniBatch 聚合的核心思想是将一组输入的数据缓存在聚合算子内部的缓冲区中。当输入的数据被触发处理时,每个 key 只需一个操作即可访问状态。这样可以大大减少状态开销并获得更好的吞吐量。但是,这可能会增加一些延迟,因为它会缓冲一些记录而不是立即处理它们。这是吞吐量和延迟之间的权衡。
- 开启mini-batch的优化设置
table.exec.mini-batch.enabled
table.exec.mini-batch.allow-latency
table.exec.mini-batch.size
Local-Global 聚合
Local-Global 聚合是为解决数据倾斜问题提出的,通过将一组聚合分为两个阶段,首先在上游进行本地聚合,然后在下游进行全局聚合,类似于 MapReduce 中的 Combine + Reduce 模式。
- 开启Local-Global的优化设置
// instantiate table environment
TableEnvironment tEnv = ...;
// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation
拆分 distinct 聚合
提升count(distinct )的性能
// instantiate table environment
TableEnvironment tEnv = ...;
tEnv.getConfig()
.set("table.optimizer.distinct-agg.split.enabled", "true"); // enable distinct agg split
在 distinct 聚合上使用 FILTER 修饰符
某些情况下,建议使用 FILTER 语法而不是 CASE WHEN。因为 FILTER 更符合 SQL 标准,并且能获得更多的性能提升。FILTER 是用于聚合函数的修饰符,用于限制聚合中使用的值。将上面的示例替换为 FILTER 修饰符,如下所示:
-- 不好的例子
SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day
-- 好的例子
SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day
Flink SQL 优化器可以识别相同的 distinct key 上的不同过滤器参数。例如,在上面的示例中,三个 COUNT DISTINCT 都在 user_id 一列上。Flink 可以只使用一个共享状态实例,而不是三个状态实例,以减少状态访问和状态大小。在某些工作负载下,可以获得显著的性能提升。
2、数据类型
简单的数据类型就跳过了,记一下构造数据类型。
ARRAY
array 是所有元素都是同一种subtype的数组
声明方式
ARRAY<t>
t ARRAY
t代表的是元素的子类型,如 ARRAY<INT>,
<INT>ARRAY 和 ARRAY<INT>的含义相同
MAP
将键(包括NULL)映射到值(包括NULL值)的关联数组的数据类型。映射不能包含重复的键;每个键最多可以映射到一个值。
声明方式
MAP<kt, vt>
kt是键的数据类型,vt是值得数据类型
MULTISET
允许实例中的不同元素有不同的数据类型
MULTISET<t>
t MULTISET
ROW
(不是很好翻译)Data type of a sequence of fields.A field consists of a field name, field type, and an optional description.
声明方式
ROW<n0 t0, n1 t1, ...>
ROW<n0 t0 'd0', n1 t1 'd1', ...>
ROW(n0 t0, n1 t1, ...>
ROW(n0 t0 'd0', n1 t1 'd1', ...)
n是字段名字,t是字段的类型,d是字段的描述
ROW(myField INT, myOtherField BOOLEAN) 等价于 ROW<myField INT, myOtherField BOOLEAN>