- Optimizing Spark Streaming writes to Kafka
In a real project, the streaming job that writes to Kafka is often written like this:
resDS.foreachRDD(rdd => {
  // A new KafkaSink is built on the driver for every batch and then
  // serialized into each task's closure.
  val kafkaSink = KafkaSink[String, Array[Byte]]()
  rdd.foreach(r => {
    r.foreach(s => kafkaSink.send(retryTopic, s.getBytes()))
  })
})
...
import java.util.Properties
import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  // The key idea that works around NotSerializableException: serialize a
  // factory function instead of the producer itself, and build the producer
  // lazily, i.e. on first use inside the executor JVM.
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

  def close(): Unit = producer.close()
}
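For context on the comment above: KafkaProducer is not serializable, so capturing one directly in a Spark closure fails when the task is serialized. A minimal sketch of the failure mode this wrapper avoids (illustrative only, not from the original job):

// Anti-pattern: the producer is built on the driver and captured directly by
// the foreach closure, so Spark fails with "Task not serializable", caused by
// java.io.NotSerializableException on the KafkaProducer.
val producer = new KafkaProducer[String, Array[Byte]](KafkaSink.defaultKafkaProducerConfig)
rdd.foreach(r => r.foreach(s => producer.send(new ProducerRecord[String, Array[Byte]](retryTopic, s.getBytes()))))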
object KafkaSink {
  import scala.collection.JavaConverters._

  val WRITE_BROKERS = SubsConfig("xxx").toString

  val defaultKafkaProducerConfig = {
    val p = new Properties()
    p.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, WRITE_BROKERS)
    p.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    p.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
    p.setProperty(ProducerConfig.ACKS_CONFIG, "1")
    p
  }

  def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config.asJava)
      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }
      producer
    }
    new KafkaSink[K, V](createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): KafkaSink[K, V] =
    apply[K, V](config.asScala.toMap)

  def apply[K, V](): KafkaSink[K, V] = apply[K, V](defaultKafkaProducerConfig)
}
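The apply overloads mean callers are not tied to the default config. As a hypothetical example (the broker addresses are made up), a sink with stronger delivery guarantees via acks=all:

val strictProps = new Properties()
strictProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092") // hypothetical brokers
strictProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
strictProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
strictProps.setProperty(ProducerConfig.ACKS_CONFIG, "all") // wait for all in-sync replicas
val strictSink = KafkaSink[String, Array[Byte]](strictProps)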
The problem with the version above is that the KafkaSink is rebuilt on the driver for every batch and re-serialized into every task, so each partition of each batch instantiates its own KafkaProducer. That connection churn can cause performance problems and slow the writes down. One optimization is to wrap the sink in a broadcast variable: it is shipped to each executor once, and every task on that executor reuses the same producer. The code looks like this:
val streamingContext = new StreamingContext(sparkContext, Seconds(seconds))
// Broadcast once: each executor deserializes a single KafkaSink, and the lazy
// producer inside it is created only on first use in that executor JVM.
val kafkaSink = streamingContext.sparkContext.broadcast(KafkaSink[String, Array[Byte]]())
...
resDS.foreachRDD(rdd => {
  rdd.foreach(r => {
    r.foreach(s => kafkaSink.value.send(retryTopic, s.getBytes()))
  })
})
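As a further refinement (a sketch, not part of the original code), rdd.foreachPartition resolves the broadcast value once per partition instead of once per record; the producer itself is still shared per executor:

resDS.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    val sink = kafkaSink.value // one broadcast lookup per partition
    partition.foreach(r => r.foreach(s => sink.send(retryTopic, s.getBytes())))
  })
})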
Questions and discussion from anyone interested in this topic are welcome. ^^