题目描述:在Flink批处理中创建自定义Sink
效果:
通过自定义Sink实现将数据保存到MySQL中
任务要求:
1:使用Scala代码开发Flink批处理中的自定义Sink
2:避免频繁创建MySQL数据库连接
任务提示、思路分析:
1:查阅Flink相关资料,实现自定义Sink的开发
2:考虑使用OutputFormat
代码:
package batch.sink
import org.apache.flink.api.scala.ExecutionEnvironment
/**
 * Batch word-count job: reads a text file, counts the occurrences of each
 * space-separated token and hands the `(word, count)` pairs to a [[MysqlSink]].
 */
object CustomSink {

  /**
   * Builds the batch pipeline and runs it.
   *
   * @param args command-line arguments (not read by this job)
   */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Brings the implicit TypeInformation instances needed by the
    // transformations below into scope.
    import org.apache.flink.api.scala._

    // Source: one record per line of the input file.
    val lines = env.readTextFile("hdfs://hadoop102:8020/zzbzzb.txt")

    // Output format that receives the aggregated pairs.
    val sink = new MysqlSink(
      mysqlUrl = "jdbc:mysql://localhost:3306/test",
      port = 3306,
      user = "root",
      passwd = "111111"
    )

    // Tokenise each line, pair every token with 1, then sum the ones per token.
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map((_, 1))
    // setParallelism(1) is invoked on the result of sum(1), as in the original chain.
    val counts = pairs.groupBy(0).sum(1).setParallelism(1)

    counts.output(sink)

    // The job name is the runtime class name of this object.
    env.execute(this.getClass.getName)
  }
}
向output()传入一个OutputFormat对象(output()的返回值才是DataSink)
自定义的Sink类实现OutputFormat接口,需要实现四个方法
1.configure()
2.open()
3.writeRecord()
4.close()
image.png
package batch.sink
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.configuration.Configuration
import java.sql.{Connection, DriverManager, PreparedStatement}
/**
 * Flink batch `OutputFormat` that inserts `(word, count)` tuples into the
 * MySQL table `wordcount`.
 *
 * One JDBC connection and one prepared statement are created in [[open]] and
 * reused for every record until [[close]], so no connection is opened per record.
 *
 * @param mysqlUrl JDBC URL passed to `DriverManager.getConnection`
 * @param port     not read by this class (the URL is used as given); kept so
 *                 existing callers that pass it keep compiling
 * @param user     database user name
 * @param passwd   database password
 */
class MysqlSink(mysqlUrl: String, port: Int, user: String, passwd: String) extends OutputFormat[(String, Int)] {

  // JDBC handles are created in open() on the worker; @transient keeps them
  // out of the serialized form of this format instance.
  @transient private var conn: Connection = _
  @transient private var stmt: PreparedStatement = _

  /** No configuration is read; all settings come from the constructor. */
  override def configure(parameters: Configuration): Unit = {}

  /**
   * Opens the database connection and prepares the insert statement.
   *
   * @param taskNumber index of this parallel instance (unused)
   * @param numTasks   number of parallel instances (unused)
   */
  override def open(taskNumber: Int, numTasks: Int): Unit = {
    // A bare classOf[...] literal does not run the driver's static initializer;
    // Class.forName loads AND initializes the class, which is what registers
    // the driver with DriverManager.
    Class.forName("com.mysql.cj.jdbc.Driver")
    conn = DriverManager.getConnection(mysqlUrl, user, passwd)
    stmt = conn.prepareStatement("insert into wordcount(word,count) values(?,?)")
  }

  /**
   * Binds one `(word, count)` tuple to the prepared statement and executes it.
   *
   * @param record the word and its count
   */
  override def writeRecord(record: (String, Int)): Unit = {
    val (word, count) = record
    stmt.setString(1, word)
    stmt.setInt(2, count)
    stmt.executeUpdate()
  }

  /**
   * Releases the statement and the connection.
   *
   * The connection is closed even if closing the statement throws, and both
   * fields are reset so that calling this method again is a no-op apart from
   * the log line.
   */
  override def close(): Unit = {
    println("close")
    try {
      if (stmt != null) {
        stmt.close()
      }
    } finally {
      stmt = null
      try {
        if (conn != null) {
          conn.close()
        }
      } finally {
        conn = null
      }
    }
  }
}