sample
官方文档描述:
Return a sampled subset of this RDD.
返回抽样的样本的子集。
函数原型:
- withReplacement can elements be sampled multiple times (replaced when sampled out)
- fraction expected size of the sample as a fraction of this RDD's size
- without replacement: probability that each element is chosen; fraction must be [0, 1]
- with replacement: expected number of times each element is chosen; fraction must be >= 0
def sample(withReplacement: Boolean, fraction: Double): JavaRDD[T]
- withReplacement can elements be sampled multiple times (replaced when sampled out)
- fraction expected size of the sample as a fraction of this RDD's size
- without replacement: probability that each element is chosen; fraction must be [0, 1]
- with replacement: expected number of times each element is chosen; fraction must be >= 0
- seed seed for the random number generator
def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaRDD[T]
**
第一函数是基于第二个实现的,在第一个函数中seed为Utils.random.nextLong;其中,withReplacement是建立不同的采样器;fraction为采样比例;seed为随机生成器的种子
**
源码分析:
def sample(withReplacement: Boolean, fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T] = withScope {
require(fraction >= 0.0, "Negative fraction value: " + fraction)
if (withReplacement) {
new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
} else {
new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
}
}
**
sample函数中,首先对fraction进行验证;再次建立PartitionwiseSampledRDD,依据withReplacement的值分别建立柏松采样器或伯努利采样器。
**
实例:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
//false 是伯努利分布 (元素可以多次采样);0.2 采样比例;100 随机数生成器的种子
JavaRDD<Integer> sampleRDD = javaRDD.sample(false,0.2,100);
System.out.println("sampleRDD~~~~~~~~~~~~~~~~~~~~~~~~~~" + sampleRDD.collect());
//true 是柏松分布;0.2 采样比例;100 随机数生成器的种子
JavaRDD<Integer> sampleRDD1 = javaRDD.sample(false,0.2,100);
System.out.println("sampleRDD1~~~~~~~~~~~~~~~~~~~~~~~~~~" + sampleRDD1.collect());
randomSplit
官方文档描述:
Randomly splits this RDD with the provided weights.
依据所提供的权重对该RDD进行随机划分
函数原型:
- weights for splits, will be normalized if they don't sum to 1
- random seed
- return split RDDs in an array
def randomSplit(weights: Array[Double], seed: Long): Array[JavaRDD[T]]
- weights for splits, will be normalized if they don't sum to 1
- return split RDDs in an array
def randomSplit(weights: Array[Double]): Array[JavaRDD[T]]
源码分析:
def randomSplit(weights: Array[Double],
seed: Long = Utils.random.nextLong): Array[RDD[T]] = withScope {
val sum = weights.sum
val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
normalizedCumWeights.sliding(2).map { x =>
randomSampleWithRange(x(0), x(1), seed)
}.toArray
}
def randomSampleWithRange(lb: Double, ub: Double, seed: Long): RDD[T] = {
this.mapPartitionsWithIndex( { (index, partition) =>
val sampler = new BernoulliCellSampler[T](lb, ub)
sampler.setSeed(seed + index)
sampler.sample(partition)
}, preservesPartitioning = true)
}
**
从源码中可以看到randomSPlit先是对权重数组进行0-1正则化;再利用randomSampleWithRange函数,对RDD进行划分;而在该函数中调用mapPartitionsWithIndex(上一节有具体说明),建立伯努利采样器对RDD进行划分。
**
实例:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
double [] weights = {0.1,0.2,0.7};
//依据所提供的权重对该RDD进行随机划分
JavaRDD<Integer> [] randomSplitRDDs = javaRDD.randomSplit(weights);
System.out.println("randomSplitRDDs of size~~~~~~~~~~~~~~" + randomSplitRDDs.length);
int i = 0;
for(JavaRDD<Integer> item:randomSplitRDDs)
System.out.println(i++ + " randomSplitRDDs of item~~~~~~~~~~~~~~~~" + item.collect());