1.在第一次搭建的基础上改良,环境已有:hadoop2.8.3、spark3.0.2、scala2.12、jdk8、delta lake0.8.0
2.mysql配置binlog,日志模式设置为mixed,可以直接读取sql,监听binlog日志,实时传送修改消息,然后进行增量的批量更新delta表
delta表对字段的大小写以及顺序有要求,并且spark sql不支持insert语句中的列列表
3.配置kafka,(当然也可以用其他消息中间件取代),用于传递binlog产生的消息
4.消费kafka的消息,更新delta数据
5.配置canal,用于贯通mysql和kafka,并且实时传送消息,并且带有端点消息的消费功能
canal内部配置不同类型的数据库以及消息中间件,十分便捷
6.spark可以实时监听数据源的更新执行sql
./spark-submit --class main.Main /root/graduate/gold/target/gold-1.0-SNAPSHOT.jar
提交任务
随后kafka从canal接收消息,spark从kafka消费,这里本该有消费的截图,但是spark修改delta表的执行过程会直接把消费记录刷掉
7.tableau连接spark,可以自动更新数据,制作仪表盘以及工作表,bi形式呈现数据,并且可以自助分析
mysql执行会在一定延迟内更新到tableau
=======================================后续预期:
1.非结构化数据的迁移
2.delta表格的输出,尝试用csv格式输出
3.用tableau做一些数据分析结论