/** SMOTE-style oversampling: for each vector, find its k nearest neighbours
 *  within the same group, then synthesize N new samples per vector by random
 *  interpolation toward a randomly chosen neighbour.
 *
 *  @param data (groupId, vector) pairs; neighbours are searched only within a group
 *  @param k    number of nearest neighbours to keep per vector
 *  @param N    number of synthetic samples to generate per vector
 *  @return RDD of synthetic vectors
 */
def generateSamples1(data: RDD[(Long, Vector)], k: Int, N: Int): RDD[Vector] = {
  // 1. k-neighbours: group first, then take the cartesian product inside each
  //    group, excluding self-pairs to reduce computation.
  val groupedRDD = data.groupBy(_._1)
  val vecAndNeis: RDD[(Vector, Array[Vector])] = groupedRDD.flatMap { case (id, iter) =>
    val vecArr = iter.toArray.map(_._2)
    // 2. All ordered pairs within the group, dropping self-pairs.
    //    NOTE(review): `!=` compares vectors by value, so exact duplicate
    //    vectors also exclude each other from the neighbour lists — confirm
    //    this is intended for data containing duplicates.
    val cartesianArr: Array[(Vector, Vector)] = vecArr.flatMap { vec1 =>
      vecArr.map(vec2 => (vec1, vec2))
    }.filter(pair => pair._1 != pair._2)
    cartesianArr.groupBy(_._1).map { case (vec, pairs) =>
      // Keep at most k nearest neighbours by squared Euclidean distance.
      (vec, pairs.sortBy(p => Vectors.sqdist(p._1, p._2)).take(k).map(_._2))
    }
  }
  // 3. Pick a random neighbour as the base and generate N new samples:
  //    newVec = vec + rand * (neighbour - vec)
  vecAndNeis.flatMap { case (vec, neighbours) =>
    (1 to N).map { _ =>
      // BUG FIX: index by the actual neighbour count, not k. A group with
      // fewer than k+1 members yields fewer than k neighbours (take(k) on a
      // shorter array), so Random.nextInt(k) could throw
      // ArrayIndexOutOfBoundsException. A single-member group emits nothing
      // upstream, so `neighbours` is always non-empty here.
      val rn = neighbours(Random.nextInt(neighbours.length))
      val diff = rn.copy
      BLASUDF.axpy(-1.0, vec, diff)                   // diff = rn - vec
      val newVec = vec.copy
      BLASUDF.axpy(Random.nextDouble(), diff, newVec) // newVec = vec + r * diff
      newVec
    }.iterator
  }
}
2 欠采样
2.1 算法思想
即从多数类S中随机选择一些样本组成样本集E,
然后将样本集E从S中移除,得到新的数据集。
2.2 spark实现
// 1. Split by class: fraud (minority, Class=1) vs. normal (majority, Class=0).
val faudDF = creaditDf.where("Class=1")
val faudNumber = BigDecimal(faudDF.count())
val normalDF = creaditDf.where("Class=0")
val normalNumber = BigDecimal(normalDF.count())
// 2. Randomly down-sample the majority class to roughly the minority size
//    (sampling without replacement, fixed seed 10 for reproducibility),
//    then union with the minority class.
val ratio = faudNumber / normalNumber
val sampleNormalDF = normalDF.sample(false, ratio.toDouble, 10)
val underDF = faudDF.union(sampleNormalDF)
// 3. Standardize the Amount column: assemble it into a vector column, then
//    scale to unit standard deviation (mean is not centered).
val assembler = new VectorAssembler()
  .setInputCols(Array("Amount"))
  .setOutputCol("vectorAmount")
val vecUnder = assembler.transform(underDF)
val scaler = new StandardScaler()
  .setInputCol("vectorAmount")
  .setOutputCol("scaledAmount")
  .setWithStd(true)
  .setWithMean(false)
val scalerModel = scaler.fit(vecUnder)
val scaledUnder = scalerModel.transform(vecUnder).drop("vectorAmount")