- Environment
Before integrating, the server must already have JDK, Hadoop, Scala, Flink, and Maven installed:
JDK 1.8 or later,
Hadoop preferably 3.0 or later.
The Scala and Flink versions are constrained by the Hudi version; I am using Hudi 0.9.0, which pairs with Flink 1.12.2 and Scala 2.11.12. A quick way to sanity-check the toolchain is shown below.
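A minimal sketch of that check, assuming $FLINK_HOME points at your Flink install (the Flink/Scala version is read off the jar names in lib):

# confirm each prerequisite is on the PATH and new enough
java -version        # expect 1.8 or later
hadoop version       # expect 3.0 or later
scala -version       # expect 2.11.x here
mvn -version
ls $FLINK_HOME/lib   # jar names encode the version, e.g. flink-dist_2.11-1.12.2.jar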
- Put hudi-flink-bundle_2.12-0.9.0.jar into Flink's lib directory; the bundle's _2.11/_2.12 suffix must match the Scala version of your Flink distribution. A fetch sketch follows below.
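For reference, the bundle can be pulled straight from Maven Central; the URL below just follows the standard repository layout for the org.apache.hudi:hudi-flink-bundle_2.12:0.9.0 artifact (swap _2.12 for _2.11 if that matches your Flink):

# download the Hudi Flink bundle and put it on Flink's classpath
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink-bundle_2.12/0.9.0/hudi-flink-bundle_2.12-0.9.0.jar
cp hudi-flink-bundle_2.12-0.9.0.jar $FLINK_HOME/lib/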
- Edit flink-conf.yaml under Flink's conf directory: taskmanager.numberOfTaskSlots defaults to 1 and should be raised, for example as shown below.
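For example, in $FLINK_HOME/conf/flink-conf.yaml (4 is the value the Hudi quick start suggests; tune it to your hardware):

# each TaskManager offers this many slots; the default of 1 is too few for the Hudi write pipeline
taskmanager.numberOfTaskSlots: 4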
Start the cluster
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/start-cluster.sh
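To verify the standalone cluster actually came up, jps should list the two Flink daemons (class names as shipped with Flink 1.12), and the web UI should answer on port 8081:

jps
# expected among the output:
#   StandaloneSessionClusterEntrypoint   <- the JobManager
#   TaskManagerRunner                    <- a TaskManager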
Start the SQL client
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/sql-client.sh embedded
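Optionally, inside the client, you can switch the result display to tableau mode so query results print straight into the terminal (supported by the Flink 1.12 SQL client):

-- print results inline instead of opening the interactive result view
SET execution.result-mode=tableau;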
Create a table
CREATE TABLE t1(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)  -- partition is a reserved word in Flink SQL, so it must be backquoted
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop01:9000/tmp/t1',
  'table.type' = 'MERGE_ON_READ'
);
Note the path: this is the HDFS of my own server, configured back when I set up the Hadoop cluster; change it to your own. If you have forgotten your address, check core-site.xml under etc/hadoop in the Hadoop install directory. I had exactly this value wrong, kept hitting errors, and was stuck for several days, so it is a real pitfall.
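The entry to look for is fs.defaultFS; the table's 'path' must use the same URI (hadoop01:9000 is my cluster's value, yours will likely differ):

<!-- core-site.xml: the default filesystem URI the Hudi path must match -->
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://hadoop01:9000</value>
</property>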
Insert data
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
Query the data
SELECT * FROM t1;
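Because the Flink writer's record key field defaults to uuid in Hudi 0.9, inserting a row whose uuid already exists performs an upsert rather than adding a duplicate; this mirrors the update example in the Hudi quick start:

-- re-using uuid 'id1' updates the existing row (age 23 -> 27)
INSERT INTO t1 VALUES ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
-- running SELECT * FROM t1 again should show id1 with age 27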
View the job in the Flink web UI: http://ip:8081/#/job/completed
View the files generated under the HDFS directory
hdfs dfs -ls /tmp/t1
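The listing should show Hudi's metadata directory plus one directory per partition value (an illustrative layout; exact file names vary):

/tmp/t1/.hoodie   # table metadata: timeline and commit files
/tmp/t1/par1      # one directory per `partition` value
/tmp/t1/par2
/tmp/t1/par3
/tmp/t1/par4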