一、背景
批量处理数据,指代IO密集型任务,因为网络IO有QPS限制,并且又有网络IO消耗,因此如何对数据进行分组尤为重要。
二、具体实现伪代码
valdf= hiveContext.sql(querySQL)
.repartition(5) //控制QPS--控制发送节点数
.mapPartitions( f => f.map(x => x.getAs[Long]("id")).grouped(45) ) //获取要请求的参数,对集合进行分组,每45条数据为一组。进一步控制QPS为每个请求45条数据
.mapPartitions( record => { //此时record就是每一个请求需要的45个id集合
valthriftGet = new ThriftGet() //网络请求伪类
record.flatMap( x => { //因为45个id发送请求,可能返回45条数据,因此是flatMap
validList = x.toList.map( id => id.toString ).toArray
valresult = thriftGet.getRequest(idList)
if(result.isEmpty()) {
None
}
try{
result.asScala
.filter( ) //过滤无效数据
.flatMap( record => {
val bufferResult = scala.collection.mutable.ArrayBuffer.empty[Option[Person]]
//TODO 处理返回的record数据
bufferResult += Some(Person())
})
bufferResult.toList
})
} catch {
case e:Exception=>{
e.printStackTrace()
None
}
}
})
}).filter( !_.isEmpty).map(_.get).cache()