摘要:Sqoop
,MySQL
,Hive
,Impala
在Spark跑批到Hive的任务后面加入Sqoop任务,将数据从Hive导入MySQL提供在线查询服务,记录一下Shell脚本,主要是Shell常用语法,Impala命令参数,Sqoop命令参数
导数需求
有一张Parquet格式的Hive分区表test.sqoop_test,用Impala查看表结构,其中dt是分区字段
[cloudera01:21000] > desc sqoop_test;
Query: describe sqoop_test
+---------------+--------+---------+
| name | type | comment |
+---------------+--------+---------+
| industry_code | string | |
| rank | string | |
| inc | string | |
| dt | string | |
+---------------+--------+---------+
需要导入MySQL库中,每天导入HIve表中最新dt分区的数据,根据Industry_code覆盖更新导入,其他两个字段是MySQL JSON类型,MySQL使用的是8.0.25版本,默认区分大小写,所以字段大小写要一致
mysql> desc sqoop_test;
+---------------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+---------------+-------------+------+-----+---------+-------+
| industry_code | varchar(20) | NO | PRI | NULL | |
| rank | json | YES | | NULL | |
| inc | json | YES | | NULL | |
+---------------+-------------+------+-----+---------+-------+
3 rows in set (0.64 sec)
导出方式
先将数据从Hive表导出到HDFS,或者导入到一张新的Hive表,再从HDFS将数据导入到MySQL。虽然比直接导出多了一步操作,但是可以实现对数据的更精准的操作,主要体现在
- 在从Hive表导出到HDFS时,可以进一步对数据进行字段筛选、字段加工、数据过滤操作,从而使得HDFS上的数据更“接近”或等于将来实际要导入MySQL表的数据
- 在从HDFS导入MySQL时,也是将一个“小数据集”与目标表中的数据做对比,会提高导出速度
导数流程如下
导出到Hive中间表的方式
将原始Hive表sqoop_test进列筛选,行和分区过滤之后导入中间表sqoop_test_push,再导入MySQL,先建立一张Hive中间表,作为存储一个指定dt版本的中间数据
hive> create table sqoop_test_push (
> `industry_code` string,
> `rank` string,
> `inc` string)
> stored as PARQUET;
下一步使用Impala同步元数据,并且使用Impala shell传入本地脚本文件获得Hive表的最大分区dt,shell脚本如下
sql="invalidate metadata test.sqoop_test; select max(dt) as cnt from test.sqoop_test"
max_partition=$(impala-shell -l --auth_creds_ok_in_clear -u cdh_dev --ldap_password_cmd="sh /home/etl/impala.sh" -i cloudera03 -d test -B -q "$sql")
max_partition变量被impala-shell执行的返回值赋值,impala-shell的参数包括
-
-l
:使用LDAP向Impala进行身份验证。必须将Impala配置为允许LDAP身份验证 auth_creds_ok_in_clear
-
--ldap_password_cmd
:启动命令带有检索到的密码,传入一个获得密码的shell命令,比如--ldap_password_cmd="echo -n 123456" -
-i
:要连接的impala host和port,默认port是21000 -
-d
:设置数据库 -
-B
:去除格式化,查询大数据量时可以提高性能 -
-q
:执行一个query语句
执行完毕得到最大分区
[root@ubuntu ~]# echo $max_partition
20210317
下一步操作impala-shell将Hive表过滤分区和字段,写入中间表,先指定一个INSERT OVERWRITE导入Hive表SQL语句文件,位置在/home/push_test.sql
use ${var:impala_database};
invalidate metadata ${var:impala_table_push};
insert overwrite table ${var:impala_table_push}
select industry_code,
rank,
inc
from ${var:impala_table} where dt='${var:max_partition}';
以上SQL语句先对创建的中间表做impala元数据同步,然后使用insert overwrite
以过滤后的原表数据直接覆盖中间表,接下来使用impala-shell执行以上SQL语句文件
impala-shell -l --auth_creds_ok_in_clear \
-u cdh_dev \
--ldap_password_cmd="sh /home/etl/impala.sh" \
-i cloudera03 \
-d test \
-f "/home/push_test.sql" \
--var=max_partition="$max_partition" \
--var=impala_table="sqoop_test" \
--var=impala_table_push="sqoop_test_push" \
--var=impala_database="test"
其中
-
-f
:表示执行一个query文件,其中以分号;作为语句分隔条件 -
--var
:自定义impala上下文变量,可以多次使用,必须指定key=value的格式,在定义的时候${var:KEY}
,赋值的时候--var=KEY=VALUE
执行成功后中间表sqoop_test_push就有了需要导入MySQL的数据,并且和要求的最终数据是一致的
下一步使用sqoop进行导数,由于目标MySQL版本是8.0.025的,sqoop需要高版本的mysql-connect驱动,否则报错无法建立链接,先在maven仓库上下载高版本的驱动mysql-connector-java-8.0.25.jar
,放在sqoop的lib目录下,然后启动sqoop开始导数
导入方式为allowinsert
覆盖插入,即无则插入,有则根据主键更新
sudo -u hdfs sqoop export \
--connect "jdbc:mysql://192.168.67.72:3306/test" \
--username "root" \
--password "123456" \
--table "sqoop_test" \
--update-mode allowinsert \
--update-key "industry_code" \
--hcatalog-database "test" \
--hcatalog-table "sqoop_test_push" \
--null-string '\\N' \
--null-non-string '\\N' \
-m 1
相关参数如下
-
export
:从hdfs导出数据到关系型数据库 -
--connect
:指定jdbc连接字符串 -
--username
:数据库用户 -
--password
:数据库密码 -
--table
:导出的数据库表名称 -
--update-mode
:指定更新策略,包括updateonly
,allowinsert
,updateonly是默认模式,仅仅更新已存在的数据记录,不会插入新纪录,allowinsert有则更新,无则插入 -
--update-key
:更新参考的列名称,多个列用逗号,隔开 -
--hcatalog-database
:hive数据库,parquet格式的hive表使用hcatalog -
--hcatalog-table
:hive数据库表名 -
--null-string
:针对string类型的字段,当Value是NULL,替换成指定的字符 -
--null-non-string
:针对非string类型的字段,当Value是NULL,替换成指定字符 -
-m
:并行化,使用n个map任务并行导出
再看MySQL已经成功导入20条数据
mysql> select count(*) from sqoop_test;
+----------+
| count(*) |
+----------+
| 20 |
+----------+
1 row in set (0.19 sec)
完整Shell脚本
完成的shell脚本如下,挂在Spark入库Hive的作业后面执行
[root@ubuntu ~]# vim push_rank.sh
#!/bin/bash
mysql_url="jdbc:mysql://192.168.67.72/test"
mysql_username="root"
mysql_password="123456"
mysql_table="sqoop_test"
impala_username="cdh_dev"
impalad_host="cloudera03"
impala_database='test'
impala_table='sqoop_test'
impala_table_push='sqoop_test_push'
push_sql_path='/home/push_rank.sql'
sql="invalidate metadata $impala_database.$impala_table; select max(dt) as cnt from $impala_database.$impala_table"
max_partition=$(impala-shell -l --auth_creds_ok_in_clear -u $impala_username --ldap_password_cmd="sh /home/etl/impala.sh" -i $impalad_host -d test -B -q "$sql")
if [ $? -eq 0 ]; then
echo "Impala SQL执行成功!"
echo "最大分区为${max_partition}"
else
echo "Impala SQL执行失败!"
exit 1
fi
impala-shell -l --auth_creds_ok_in_clear -u $impala_username --ldap_password_cmd="sh /home/etl/impala.sh" -i $impalad_host -d $impala_database -f $push_sql_path --var=max_partition="$max_partition" --var=impala_table=$impala_table --var=impala_table_push=$impala_table_push --var=impala_database=$impala_database
if [ $? -eq 0 ]; then
echo "impala执行成功,开始向mysql推数"
sudo -u hdfs sqoop export --connect ${mysql_url} --username ${mysql_username} --password ${mysql_password} --table ${mysql_table} --update-mode allowinsert --update-key "industry_code" --hcatalog-database ${impala_database} --hcatalog-table ${impala_table_push} --null-string '\\N' --null-non-string '\\N' -m 1;
if [ $? -eq 0 ]
then
echo "sqoop export success !"
else
echo "sqoop export failed !"
exit 1
fi
else
echo "impala执行失败"
exit 1
fi
其中/home/push_rank.sql如下
use ${var:impala_database};
invalidate metadata ${var:impala_table_push};
insert overwrite table ${var:impala_table_push}
select industry_code,
rank,
inc
from ${var:impala_table} where dt='${var:max_partition}';