Creating a Custom Sink in Flink Batch Processing

Task description: create a custom Sink in Flink batch processing

Expected result:

Use a custom Sink to save the data to MySQL

Requirements:

1: Develop a custom Sink for Flink batch processing in Scala
2: Avoid creating MySQL database connections frequently

Hints and analysis:

1: Consult the Flink documentation to implement the custom Sink
2: Consider using OutputFormat

Code:
package batch.sink

import org.apache.flink.api.scala.ExecutionEnvironment

object CustomSink {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val mysqlSink = new MysqlSink(
      mysqlUrl = "jdbc:mysql://localhost:3306/test",
      port = 3306,
      user = "root",
      passwd = "111111"
    )

    val text = env.readTextFile("hdfs://hadoop102:8020/zzbzzb.txt")

    import org.apache.flink.api.scala._
    // Process the data: split each line into words and count each word
    text.flatMap(_.split(" "))
      .map((_,1))
      .groupBy(0)
      .sum(1)
      .setParallelism(1)
      .output(mysqlSink)

    env.execute(this.getClass.getName)
  }

}
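The pipeline above is a classic word count. The sketch below runs the same transformation on an in-memory Scala collection, with no Flink involved, so you can see which (word, count) tuples will reach the sink's writeRecord. `WordCountPreview` is a hypothetical name used only for this illustration.

```scala
object WordCountPreview {
  // Mirrors flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1) on a local Seq:
  // split lines into words, pair each word with 1, group by word, sum the 1s
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }
}
```

For example, `WordCountPreview.wordCount(Seq("hello flink", "hello mysql"))` yields `Map("hello" -> 2, "flink" -> 1, "mysql" -> 1)`. Each entry corresponds to one row inserted into the `wordcount` table. In Flink itself, the order in which these tuples arrive at the sink is not guaranteed.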

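Pass an OutputFormat implementation to output(). output() itself returns a DataSink.
MysqlSink implements the OutputFormat interface. Besides configure(), which is left empty here, it implements three methods:
1. open(): called once per parallel sink instance; opens the connection and prepares the statement
2. writeRecord(): called for every record; binds the fields and executes the insert
3. close(): called once at the end; closes the statement and the connection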
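Requirement 2 is met because of the order in which Flink calls these methods. The sketch below models that lifecycle without Flink on the classpath. `SimpleOutputFormat` is a hypothetical stand-in for `org.apache.flink.api.common.io.OutputFormat`; the real interface has the same four methods, but its configure() takes a Flink `Configuration`. `runSinkTask` is a simplified model, not Flink's actual runtime code. It shows one sink instance calling configure and open once, writeRecord once per record, and close once.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Flink's OutputFormat (configure takes a Map here, not a Configuration)
trait SimpleOutputFormat[T] {
  def configure(parameters: Map[String, String]): Unit
  def open(taskNumber: Int, numTasks: Int): Unit
  def writeRecord(record: T): Unit
  def close(): Unit
}

// Records each call so the invocation order is visible
final class LoggingFormat extends SimpleOutputFormat[(String, Int)] {
  val calls: ArrayBuffer[String] = ArrayBuffer.empty[String]
  def configure(parameters: Map[String, String]): Unit = calls += "configure"
  def open(taskNumber: Int, numTasks: Int): Unit = calls += s"open($taskNumber/$numTasks)"
  def writeRecord(record: (String, Int)): Unit = calls += s"write(${record._1},${record._2})"
  def close(): Unit = calls += "close"
}

object SinkLifecycleDemo {
  // Simplified model of one parallel sink instance: configure and open once,
  // writeRecord for every record in its partition, close once at the end
  def runSinkTask[T](format: SimpleOutputFormat[T], records: Iterator[T],
                     taskNumber: Int, numTasks: Int): Unit = {
    format.configure(Map.empty)
    format.open(taskNumber, numTasks)
    try records.foreach(format.writeRecord)
    finally format.close()
  }
}
```

In MysqlSink, DriverManager.getConnection lives in open(). It therefore runs once per sink instance, matching the single "open(...)" entry, not once per "write(...)" entry. With a sink parallelism of n, you should expect n connections.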


package batch.sink

import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.configuration.Configuration

import java.sql.{Connection, DriverManager, PreparedStatement}

// port is not used: the port is already part of mysqlUrl
class MysqlSink(mysqlUrl: String, port: Int, user: String, passwd: String) extends OutputFormat[(String, Int)] {


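  // The sink object is serialized and shipped to the workers; these fields are only
  // populated in open(), so keep them out of serialization
  @transient private var conn: Connection = _
  @transient private var stmt: PreparedStatement = _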


  override def configure(parameters: Configuration): Unit = {}

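  // Initialize: called once per parallel sink instance, so the connection is created once, not per record
  override def open(taskNumber: Int, numTasks: Int): Unit = {
    // Class.forName loads and initializes the driver so it registers itself with DriverManager;
    // a bare classOf[...] expression does not run the class's static initializer
    Class.forName("com.mysql.cj.jdbc.Driver")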
    conn = DriverManager.getConnection(mysqlUrl, user, passwd)
    stmt = conn.prepareStatement("insert into wordcount(word,count) values(?,?)")

  }

  // Bind the record's fields to the prepared statement and execute the insert
  override def writeRecord(record: (String, Int)): Unit = {
    stmt.setString(1,record._1)
    stmt.setInt(2,record._2)
    stmt.execute()
  }

  // Called once when the task finishes: release the statement and the connection
  override def close(): Unit = {
    println("close")
    if (stmt != null) {
      stmt.close()
    }
    if (conn != null) {
      conn.close()
    }
  }
}
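MysqlSink opens only one connection per sink instance, but it still calls stmt.execute() once per record, which means one database round trip per word. A possible refinement is JDBC batching with addBatch()/executeBatch(). This is an assumption on my part, not the author's code. `BatchedInsert` and `batchSize` are hypothetical names. MySQL Connector/J also has a `rewriteBatchedStatements=true` connection property that lets the driver send a batch of inserts as multi-row statements.

```scala
import java.sql.PreparedStatement

// Hypothetical helper (not part of Flink or the original post): queues rows with
// JDBC batching and sends them every `batchSize` rows instead of once per row
final class BatchedInsert(stmt: PreparedStatement, batchSize: Int) {
  require(batchSize > 0, "batchSize must be positive")

  // Rows queued since the last executeBatch()
  private var pending = 0

  // Call from writeRecord: bind the fields and queue the row
  def add(word: String, count: Int): Unit = {
    stmt.setString(1, word)
    stmt.setInt(2, count)
    stmt.addBatch()
    pending += 1
    if (pending >= batchSize) flush()
  }

  // Call from close() before closing stmt/conn so the last partial batch is not lost;
  // per JDBC, executeBatch() also empties the statement's batch
  def flush(): Unit = {
    if (pending > 0) {
      stmt.executeBatch()
      pending = 0
    }
  }
}
```

To use it in MysqlSink, add a `@transient private var batch: BatchedInsert = _` field. Create the helper in open() after preparing stmt, for example with an arbitrary batchSize of 500. Call `batch.add(record._1, record._2)` from writeRecord. In close(), call `batch.flush()` (guarded by a null check) before closing the statement and connection.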
