1 Hadoop中矩阵相乘原理
2 Spark中的矩阵相乘原理
/**
 * Multiplies this block matrix by `other` and returns the product as a new BlockMatrix.
 *
 * Outline of the visible branch (colsPerBlock == other.rowsPerBlock):
 *  1. simulateMultiply computes, on the driver, which result partitions need each input block.
 *  2. Every A block and every B block is replicated once per destination partition, keyed by
 *     that partition id.
 *  3. The two replicated RDDs are co-grouped with the result partitioner. Inside each partition,
 *     every A(i, k) is multiplied with every B(k, j) whose row index matches k.
 *  4. Partial products with the same result coordinate (i, j) are summed with reduceByKey.
 *
 * @param other the right-hand operand
 * @return the product this * other, built with rowsPerBlock rows and other.colsPerBlock
 *         columns per block
 * @throws SparkException (raised inside the Spark task) if a right-hand block is neither a
 *         DenseMatrix nor a SparseMatrix
 *
 * NOTE(review): the else branch (block sizes do not match) is elided as `.......` in this
 * excerpt. Several closing braces are also missing as pasted: after the flatA and flatB
 * flatMap lambdas, after the `match`, and after the inner map/flatMap lambdas. The snippet
 * does not compile verbatim.
 */
def multiply(other: BlockMatrix): BlockMatrix = {
if (colsPerBlock == other.rowsPerBlock) {
// The result is laid out on a numRowBlocks x other.numColBlocks grid of blocks.
// The third argument is the larger of the two inputs' partition counts, presumably
// the requested number of partitions. Confirm against GridPartitioner.apply.
val resultPartitioner = GridPartitioner(numRowBlocks, other.numColBlocks,
math.max(blocks.partitions.length, other.blocks.partitions.length))
// leftDestinations / rightDestinations map each block coordinate (row, col) of the left /
// right matrix to the set of result-partition ids that need that block. They are used below
// as Map[(Int, Int), Set[Int]]. In other words, this works out in advance which partition(s)
// each block of A and B will be shuffled to.
val (leftDestinations, rightDestinations) = simulateMultiply(other, resultPartitioner)
// Each block of A must be multiplied with the corresponding blocks in the columns of B.
// Emit one copy of the block per destination partition, keyed by partition id j.
// A block with no destinations (Set.empty) is dropped entirely.
val flatA = blocks.flatMap { case ((blockRowIndex, blockColIndex), block) =>
val destinations = leftDestinations.getOrElse((blockRowIndex, blockColIndex), Set.empty)
destinations.map(j => (j, (blockRowIndex, blockColIndex, block)))
// Each block of B must be multiplied with the corresponding blocks in each row of A.
// Same replication as for A, using rightDestinations.
val flatB = other.blocks.flatMap { case ((blockRowIndex, blockColIndex), block) =>
val destinations = rightDestinations.getOrElse((blockRowIndex, blockColIndex), Set.empty)
destinations.map(j => (j, (blockRowIndex, blockColIndex, block)))
// cogroup on resultPartitioner brings together, in partition pId, every A block and every
// B block that was tagged with pId. That is all the inputs needed for the result blocks
// this partition owns. The cogroup itself shuffles flatA and flatB.
// NOTE(review): pairing every A(i, k) with every B(k, j) present in the partition computes
// each product only once overall only if every GridPartitioner partition covers a rectangle
// of result blocks (a row range x a column range). Confirm against GridPartitioner.
val newBlocks = flatA.cogroup(flatB, resultPartitioner).flatMap { case (pId, (a, b)) =>
a.flatMap { case(leftRowIndex, leftColIndex, leftBlock) =>
// Keep only the B blocks whose row index equals this A block's column index (inner dim).
b.filter(_._1 == leftColIndex).map { case (rightRowIndex, rightColIndex, rightBlock) =>
// Sparse right blocks are converted with toDense before multiplying. Presumably
// leftBlock.multiply only accepts a DenseMatrix; confirm.
val C = rightBlock match{
case dense: DenseMatrix => leftBlock.multiply(dense)
case sparse: SparseMatrix => leftBlock.multiply(sparse.toDense)
case _ =>
throw new SparkException(s"Unrecognized matrix type ${rightBlock.getClass}.")
// Key each partial product by its result-block coordinate (i, j). Converting to Breeze
// lets partial products be summed with `+` in reduceByKey.
((leftRowIndex, rightColIndex), C.toBreeze)
// Sum the partial products of each result block, then convert back to a Spark Matrix.
// NOTE(review): flatMap does not keep the cogroup's partitioner, so reduceByKey still runs
// its own shuffle. It benefits from map-side combining, but it is not shuffle-free.
}.reduceByKey(resultPartitioner, (a, b) => a + b).mapValues(Matrices.fromBreeze)
// TODO: Try to use aggregateByKey instead of reduceByKey to get rid of intermediate matrices
new BlockMatrix(newBlocks, rowsPerBlock, other.colsPerBlock, numRows(), other.numCols())
} else {.......
/**
 * Simulates the block-level product `this * other` on the driver to decide which result
 * partitions each input block must be sent to. Only block coordinates are used, never
 * block contents.
 *
 * Left block A(i, k) is needed by every result block C(i, j) for which B(k, j) exists.
 * Right block B(k, j) is needed by every C(i, j) for which A(i, k) exists.
 *
 * A block with no counterpart on the other side maps to an empty set. It is therefore never
 * replicated, and no product with a missing (all-zero) block is ever computed.
 *
 * Because the same output block C(i, j) is summed from several partial products, routing
 * all of them to the same partition lets the later reduceByKey combine them locally.
 *
 * @param other       the right-hand matrix of the product
 * @param partitioner partitioner of the result matrix; maps a result block coordinate
 *                    (row, col) to a partition id
 * @return (leftDestinations, rightDestinations): for every block coordinate of `this` and of
 *         `other` respectively, the set of result partition ids that need that block
 */
private[distributed] def simulateMultiply(
    other: BlockMatrix,
    partitioner: GridPartitioner): (BlockDestinations, BlockDestinations) = {
  // Only the (row, col) keys are collected to the driver.
  val leftMatrix = blockInfo.keys.collect() // blockInfo should already be cached
  val rightMatrix = other.blocks.keys.collect()

  // Index each side by the shared (inner) block dimension once. The previous version
  // rescanned the whole opposite side for every block, which was O(|A| * |B|).
  val rightColsByRow: Map[Int, Array[Int]] =
    rightMatrix.groupBy(_._1).map { case (row, coords) => row -> coords.map(_._2) }
  val leftRowsByCol: Map[Int, Array[Int]] =
    leftMatrix.groupBy(_._2).map { case (col, coords) => col -> coords.map(_._1) }

  // A(i, k) goes to the partition of every C(i, j) with an existing B(k, j).
  val leftDestinations = leftMatrix.map { case (rowIndex, colIndex) =>
    val rightCols = rightColsByRow.getOrElse(colIndex, Array.emptyIntArray)
    val partitions = rightCols.map(rightCol => partitioner.getPartition((rowIndex, rightCol)))
    ((rowIndex, colIndex), partitions.toSet)
  }.toMap

  // B(k, j) goes to the partition of every C(i, j) with an existing A(i, k).
  val rightDestinations = rightMatrix.map { case (rowIndex, colIndex) =>
    val leftRows = leftRowsByCol.getOrElse(rowIndex, Array.emptyIntArray)
    val partitions = leftRows.map(leftRow => partitioner.getPartition((leftRow, colIndex)))
    ((rowIndex, colIndex), partitions.toSet)
  }.toMap

  (leftDestinations, rightDestinations)
}