使用flink 自带的datagen作为数据源,测试StarRocks 的 主键模型(primary key)的更新性能。
环境准备
StarRocks 版本:2.0.1
StarRocks 服务器
机器 | 3台 阿里云主机 |
---|---|
CPU | 16core |
内存 | 64GB |
网络带宽 | 5 Gbits/s |
磁盘 | 系统盘ESSD 40G 数据盘ESSD高效云盘 100GB |
Flink 版本:1.13.5
Flink 服务器
机器 | 1台 阿里云主机 |
---|---|
CPU | 4core |
内存 | 16GB |
网络带宽 | 5 Gbits/s |
磁盘 | 系统盘ESSD 40G 数据盘ESSD高效云盘 100GB |
测试步骤
- 创建StarRocks相关表结构
create database db1;
CREATE TABLE `t1` (
`id` int(11) NOT NULL COMMENT "",
`c1` varchar(65533) NULL COMMENT "",
`c2` varchar(65533) NULL COMMENT "",
`c3` varchar(65533) NULL COMMENT "",
`c4` varchar(65533) NULL COMMENT "",
`c5` varchar(65533) NULL COMMENT "",
`c6` varchar(65533) NULL COMMENT "",
`c7` varchar(65533) NULL COMMENT "",
`c8` varchar(65533) NULL COMMENT "",
`c9` varchar(65533) NULL COMMENT "",
`version` int(11) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`id`) --使用主键模型,相同id的数据会被新的数据覆盖
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 5
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);
- 下载StarRocks flink connector(注意下载对应版本的connector,并确保flink lib 目录只有一个StarRocks flink connector jar包)
wget https://github.com/StarRocks/flink-connector-starrocks/releases/download/v1.1.15/flink-connector-starrocks-1.1.15_flink-1.13_2.12.jar
- 将flink-connector-starrocks-1.1.15_flink-1.13_2.12.jar放到flink lib目录下
- 启动flink-sql
sh sql-client.sh
- 创建source表
CREATE TABLE src (
id int,
c1 string ,
c2 string ,
c3 string ,
c4 string ,
c5 string ,
c6 string ,
c7 string ,
c8 string ,
c9 string ,
version int
) WITH (
'connector' = 'datagen',
'rows-per-second'='10000', -- 每秒产生1万条记录
'number-of-rows'='1000000', -- 共产生100万条记录
'fields.id.min'='1', -- id只有20个,记录20个id在每秒1000条的update操作
'fields.id.max'='20',
'fields.version.kind'='sequence',
'fields.version.start'='1',
'fields.version.end'='1000000'
);
- 创建目标表
CREATE TABLE t2(
id int,
c1 string ,
c2 string ,
c3 string ,
c4 string ,
c5 string ,
c6 string ,
c7 string ,
c8 string ,
c9 string ,
version int,
primary key (id) not enforced
) WITH (
'connector' = 'starrocks',
'jdbc-url'='jdbc:mysql://xxx.xxx.xxx.xxx:9030',
'load-url'='xxx.xxx.xxx.xxx:8030',
'database-name' = 'db1',
'table-name' = 't1',
'username' = 'root',
'password' = ''
);
- 设置checkpoint强制写入(生成环境不推荐,checkpoint会造成阻塞。可以根据实际情况调整sink.buffer-flush.max-bytes、sink.buffer-flush.max-rows、sink.buffer-flush.interval-ms参数,控制刷新频率)
set 'execution.checkpointing.interval'='1s';
- 写入数据
insert into t1 select * from src;
测试结论
StarRocks 的主键模型(primary key)使用官方提供的flink connector,在单个flink写入的任务下,能够每秒对1万条数据进行更新操作。