Flink DataSink

同DataSource一样,flink流处理和批处理也都内置了很多DataSink,可以满足部分应用场景。但平时使用的应用场景比较多,光是靠内置的DataSink完全不满足日常使用。flink也考虑到了这个问题,允许我们实现自定义的DataSink。

1 批处理

最简单的DataSink就是print(),平时我们在编写flink程序时进行简单测试的时候通常都会使用print()在控制台上打印处理后的结果数据。
真正业务应用比较多的还是writeAsCsv(),writeAsText()。还可以通过继承write()中FileOutputFormat类来实现将结果数据输出到其他格式文件中。

    targetDataSet.print()
    targetDataSet.writeAsCsv("file:///xxx/Documents/batch_csv")
    targetDataSet.writeAsText("file:///xxx/Documents/batch_txt")
    targetDataSet.write(
      outputFormat: FileOutputFormat[T],
      filePath: String,
      writeMode: FileSystem.WriteMode = null)
01_batch_sink.png

2 流处理

2.1 kafka connector

kafka connector是flink提供给我们的自定义连接器,可以直接实例化FlinkKafkaProducer对象来将记录存放到kafka中。

object FlinkStreamDataSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val original: DataStream[String] = ...

    // kafka sink properties参数
    val producerConf = new Properties()
    producerConf.setProperty("bootstrap.servers", "172.16.57.101:9092,172.16.57.102:9092,172.16.57.103:9092")
    producerConf.setProperty("zookeeper.connect", "172.16.57.101:2181,172.16.57.102:2181,172.16.57.103:2181")
    producerConf.setProperty("group.id", "leslie")

    // sink结果数据到kafka中
    original.addSink(new FlinkKafkaProducer010[String]("test_1", new SimpleStringSchema(), producerConf))

    env.execute("flink_data_sink")
  }
}
01_stream_sink_kafka.png

2.2 自定义DataSink

自定义DataSink和自定义DataSource一样简单,只需要继承SinkFunction接口并重写其中的invoke()方法。

object FlinkStreamCustomerDataSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // kafka source properties参数
    val props = new Properties()
    props.setProperty("bootstrap.servers", "172.16.57.101:9092,172.16.57.102:9092,172.16.57.103:9092")
    props.setProperty("zookeeper.connect", "172.16.57.101:2181,172.16.57.102:2181,172.16.57.103:2181")
    props.setProperty("group.id", "leslie")
    props.setProperty("auto.offset.reset", "latest")

    val originalStream: DataStream[String] = env
      .addSource(new FlinkKafkaConsumer010[String](
        "test", // 被消费的kafka topic
        new SimpleStringSchema(), // 序列化
        props)) // kafka source properties参数

    val targetStream: DataStream[(String, String, Int)] = originalStream
      .flatMap(_.split(","))
      .map { name =>
        val sex = if (name.contains("e")) "男" else "女"
        val age = if (name.contains("e")) 23 else 18
        (name, sex, age) // 根据名字来构造人物的基本信息
      }

    // 自定义DataSink
    targetStream.addSink(new MysqlDataSink())

    env.execute("fink_consumer_data_sink")
  }
}

下文代码MysqlDataSink类继承RichSinkFunction类,实现将记录存放到mysql的目的。为了避免重复创建和销毁mysql连接,我们和自定义DataSouce一样继承"富函数",在open(),close()方法中实现连接的创建和销毁(前一篇博客Flink DataSouce中有提到open()方法仅在函数类实例化的时候调用一次,close()则是在实例对象销毁前调用一次)。

class MysqlDataSink extends RichSinkFunction[(String, String, Int)] {
  private var pStmt: PreparedStatement = _
  private var conn: Connection = _

  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    val url = "jdbc:mysql://localhost:3306/test_for_mysql?useSSL=false"
    val username = "root"
    val password = "123456"
    conn = DriverManager.getConnection(url, username, password);
    val sql =
      """
        |insert into user (name, sex, age) values (?, ?, ?);
        |""".stripMargin
    pStmt = conn.prepareStatement(sql)
  }

  override def close(): Unit = {
    if (conn != null) conn.close()
    if (pStmt != null) pStmt.close()
  }

  // 主体方法,插入数据到mysql中
  override def invoke(value: (String, String, Int)): Unit = {
    pStmt.setString(1, value._1)
    pStmt.setNString(2, value._2)
    pStmt.setInt(3, value._3)
    pStmt.execute()
  }
}
02_stream_sink_customer.png

最后:
上边就是Flink DataSink的介绍部分。当然DataSink相关的知识并不只有这么一点点,此文只是些基础知识,各位老哥们以后遇到具体的场景具体处理。
下一篇博文将会把这两次提到的富函数给大家说说,也会通过富函数的讲解把flink的有状态编程也说说。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • CocoaPods 开源库的制作过程: 添加私有Pod仓库,用来存储私有Pod库的podspec文件,类似Coco...
    心至靜行至遠阅读 879评论 0 1
  • 大观园里长袖舞,怡红院中尽缠绵。 缘起缘灭缘已尽,花开花落花归尘。 君言天下无我处处皆君, 君言老骥伏枥烈士暮年,...
    昨夜星雨阅读 713评论 3 7
  • 血淋淋的日子总在提醒 ——铭记“九一·八” 文/黄影 这个日子,不敢忘记 又不愿想起 不敢忘记,是...
    黄影诗风阅读 332评论 1 7