官网翻译:基于batch的、用于消费Kafka消息的接口
class KafkaRDD[
K: ClassTag,
V: ClassTag,
U <: Decoder[_]: ClassTag,
T <: Decoder[_]: ClassTag,
R: ClassTag] private[spark] (
sc: SparkContext,
kafkaParams: Map[String, String],
val offsetRanges: Array[OffsetRange],
leaders: Map[TopicAndPartition, (String, Int)],
messageHandler: MessageAndMetadata[K, V] => R
) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
/**
 * Builds one RDD partition per entry of `offsetRanges`, in the same order.
 *
 * Each partition carries its index, the topic/partition and offset bounds of the
 * range, and the (host, port) looked up in `leaders` for that topic and partition.
 * The lookup uses `leaders(...)`, so a missing entry fails the call.
 */
override def getPartitions: Array[Partition] =
  Array.tabulate[Partition](offsetRanges.length) { index =>
    val range = offsetRanges(index)
    val (host, port) = leaders(TopicAndPartition(range.topic, range.partition))
    new KafkaRDDPartition(
      index, range.topic, range.partition, range.fromOffset, range.untilOffset, host, port)
  }
KafkaRDD的入参:sc(SparkContext)、kafkaParams、offsetRanges、leaders、messageHandler
上面的getPartitions负责生成partition;下面主要看一下每个partition的数据是如何计算出来的,即compute方法
/**
 * Produces the records of a single partition.
 *
 * @param thePart partition to compute; cast to `KafkaRDDPartition`
 * @param context task context, handed on to the `KafkaRDDIterator`
 * @return an empty iterator when the offset range is empty, otherwise a
 *         `KafkaRDDIterator` over the partition
 */
override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  // The range must not be inverted.
  assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
  val hasMessages = part.fromOffset != part.untilOffset
  if (hasMessages) {
    new KafkaRDDIterator(part, context)
  } else {
    // Start and end offsets are equal: nothing to read, so skip this partition.
    log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
      s"skipping ${part.topic} ${part.partition}")
    Iterator.empty
  }
}
再看下KafkaRDDIterator,首先看下getNext方法
/**
 * Returns the next handled message, fetching a new batch when the current one
 * is missing or exhausted.
 *
 * When there is nothing more to return, sets `finished = true` and returns a
 * null placeholder (presumably ignored by the enclosing `NextIterator` once
 * `finished` is set -- confirm against `NextIterator`).
 */
override def getNext(): R = {
  // No batch yet, or the current batch is used up: fetch the next one.
  val needsFetch = iter == null || !iter.hasNext
  if (needsFetch) {
    iter = fetchBatch
  }
  if (iter.hasNext) {
    val item = iter.next()
    if (item.offset < part.untilOffset) {
      // Still inside the range: remember where to resume and hand the message on.
      requestOffset = item.nextOffset
      messageHandler(new MessageAndMetadata(
        part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
    } else {
      // Reached the end of the range; going past untilOffset is an error.
      assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
      finished = true
      null.asInstanceOf[R]
    }
  } else {
    // The fresh batch is empty: that is only valid if the whole range was consumed.
    assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
    finished = true
    null.asInstanceOf[R]
  }
}
继续看fetchBatch
/**
 * Fetches one batch of messages for this topic/partition starting at
 * `requestOffset`, limited to `kc.config.fetchMessageMaxBytes`.
 *
 * The response is passed to `handleFetchErr` before being read.
 *
 * @return an iterator over the fetched messages with any leading messages
 *         below `requestOffset` dropped
 */
private def fetchBatch: Iterator[MessageAndOffset] = {
  val request = new FetchRequestBuilder()
    .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
    .build()
  val response = consumer.fetch(request)
  handleFetchErr(response)
  val messages = response.messageSet(part.topic, part.partition).iterator
  // Kafka may return a batch that starts before the requested offset.
  messages.dropWhile(message => message.offset < requestOffset)
}
fetchBatch通过FetchRequestBuilder构造FetchRequest,再由consumer发送给Kafka
consumer是在构造KafkaRDDIterator(即开始计算某个partition)的时候通过connectLeader创建的
private class KafkaRDDIterator(
part: KafkaRDDPartition,
context: TaskContext) extends NextIterator[R] {
// Register a task-completion listener that calls closeIfNeeded().
context.addTaskCompletionListener{ context => closeIfNeeded() }
log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
s"offsets ${part.fromOffset} -> ${part.untilOffset}")
// Cluster helper built from the RDD's kafkaParams; supplies the config and connections used below.
val kc = new KafkaCluster(kafkaParams)
// Key decoder: instantiate U reflectively via its VerifiableProperties constructor,
// passing the cluster config's props.
val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(kc.config.props)
.asInstanceOf[Decoder[K]]
// Value decoder: same reflective construction, for T.
val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(kc.config.props)
.asInstanceOf[Decoder[V]]
// The consumer is opened once, when this iterator is constructed.
val consumer = connectLeader
// Offset used for the next fetch request; starts at the beginning of the range.
var requestOffset = part.fromOffset
// Iterator over the most recently fetched batch; null until the first fetch.
var iter: Iterator[MessageAndOffset] = null
/**
 * Opens the consumer used to read this partition.
 *
 * The idea is to use the provided preferred host, except on task retry attempts,
 * to minimize number of kafka metadata requests.
 *
 * On a retry the leader is obtained through `kc.connectLeader`, and a
 * `SparkException` listing the errors is thrown if that fails.
 */
private def connectLeader: SimpleConsumer = {
  val isFirstAttempt = !(context.attemptNumber > 0)
  if (isFirstAttempt) {
    // Connect straight to the host recorded on the partition instead of looking up
    // the leader, which reduces requests for kafka metadata.
    kc.connect(part.host, part.port)
  } else {
    val leader = kc.connectLeader(part.topic, part.partition)
    leader.fold(
      errs => throw new SparkException(
        s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
          errs.mkString("\n")),
      leaderConsumer => leaderConsumer
    )
  }
}