repartition和coalesce源码解析
点击进入repartition的源码:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
可以看到直接调用coalesce方法,传入numPartitions和设置shuffle=true。
接着进入coalesce方法:
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](
mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}
可以看到,分为两种情况,一种是shuffle为true的情况,一种是shuffle为false的情况。
repartition默认设置shuffle为true,故比较适合增加分区数;coalesce设置shuffle为false时,适合减少分区数。
Kafka的副本机制
基于领导者(Leader-based)的副本机制,副本分为领导者副本(Leader Replica)和追随者副本(Follewer Replica)。所有的请求都必须由领导者副本来处理,追随者副本不对外提供服务,它的唯一任务就是从领导者副本异步拉取消息。
这样设计的好处:1.方便实现实现“Read-your-writes”;2.方便实现单调读(Monotonic Reads)
In-sync Replicas(ISR)
即所谓的ISR副本集合,ISR中的副本都是与Leader同步的副本。通过设定broker端参数replica.lag.time.max.ms参数值来判断follower是否与Leader同步。
Unclean领导者选举
ISR为空,那说明Leader副本也挂掉了,可以从不在ISR中的存活副本中选取,不过有可能出现数据丢失。选举这种副本的过程称之为Unclean领导者选举,Broker端参数unclean.leader.election.enable控制是否允许领导者选举。
kafka的副本机制充分体现了CAP理论中的选择C或A的权利。一致性(Consistency)、可用性(Available)、分区容错性(Partition tolerance)
高水位(High Watermark)
Kafka的HW是用消息位移来表征的,定义了消息的可见性(标识分区下哪些消息是可以被消费者消费的)和帮助kafka完成消息同步。在分区HW以下的消息被认为是已提交的消息,反之就是未提交的消息,HW以上的消息是不能被消费者消费的。分区的高水位就是Leader副本的高水位。
日志末端位移(Log End Offset):标识副本写入下一条消息的位移值。
通过HW和LEO来完成消息同步,具体过程参考如下:
高水位更新机制
Broker 0上保存了某个分区的Leader副本和所有Follower副本的LEO值,而Broker 1上仅仅保存了该分区的某个follower副本。Kafka把Broker 0上保存的这些Follower副本又称为远程副本,来帮助Leader副本确定其高水位。
记一次线上kafka一直rebalance故障
https://www.jianshu.com/p/271f88f06eb3
解决方案:
1.增加max.poll.interval.ms处理时长
2.设置分区拉取阈值(max.poll.records = 50)
3.poll到的消息,处理完一条就提交一条,当出现提交失败时,马上跳出循环,这时候kafka就会进行rebalance,下一次会继续从当前offset进行消费。
{"status":200,"data":{"goodsSendRes":{"status":400,"info":"指>定商品送没有可用的营销活动--老pos机"},"fullAmountSendRes":{"status":400,"info":"满额送没有可用的营销活动--老pos机"}},"info":"发券流程执
行成功"}, event:com.today.api.member.events.ConsumeFullEvent, url:https://wechat-lite.today36524.com/api/dapeng/subscribe/index,event内
容:{"id":36306061,"score":3,"orderPrice":3.0,"payTime":1533775482000,"thirdTransId":"420000016320180809