巧用广播,Spark向Kafka写入数据

Kafka生产者类不能实例化,需要包装成一个可实例化的类

import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class KafkaSink[K,V](createProducer:()=> KafkaProducer[K,V]) extends Serializable{

  lazy val producer= createProducer()
  def send(topic:String, key:K, value:V):Future[RecordMetadata] =
    producer.send(new ProducerRecord[K,V](topic,key,value))

  def send(topic:String,value:V):Future[RecordMetadata] =
    producer.send(new ProducerRecord[K,V](topic,value))
}

object KafkaSink {
  import scala.collection.JavaConversions._

  def apply[K,V](config: Map[String,Object]):KafkaSink[K,V]= {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K,V](config)
      sys.addShutdownHook{
        producer.close()
      }
      producer
    }

    new KafkaSink(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): KafkaSink[K,V] = apply(config.toMap)
}

注册为广播对象

"spark write kafka" should "be fine" in {

    val kafkaProducer:Broadcast[KafkaSink[String,String]]={
      val kafkaProducerConfig = {
        val p = new Properties();
        p.setProperty("bootstrap.servers","kafka1-c1:9092,kafka2-c1:9092,kafka3-c1:9092")
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        p
      }
      sc.broadcast(KafkaSink[String,String](kafkaProducerConfig))
    }

    val seqList = for(i<- 0 until 100) yield (i.toString,(i*2).toString)

    val rdd = sc.makeRDD(seqList)
    rdd.foreachPartition(f=>{
      f.foreach(record => {
        kafkaProducer.value.send("topic",record._1,record._2)
      })
    })
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,099评论 19 139
  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,779评论 13 425
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,375评论 1 15
  • ​ 小青柑作为最炙手可热的新宠,喜爱小青柑的茶友们自然是囤了不少。可是没多久就发现小青柑上面长了一层“白霜”。 1...
    O2TEA阅读 457评论 0 0
  • 使用STM8L来驱动和读取光照传感器TCS3200时,RGB数据总是不稳定,跳动的幅度很大。 通过单独新建程序,不...
    几度木阅读 365评论 0 0