分布式矩阵(Distributed Matrix)

矩阵RowMatrix是最基础的分布式矩阵类型。每行是一个本地向量,行索引无实际意义(即无法直接使用)。数据存储在一个由行组成的RDD中,其中每一行都使用一个本地向量来进行存储。由于行是通过本地向量来实现的,故列数(即行的维度)被限制在普通整型(integer)的范围内。在实际使用时,由于单机处理本地向量的存储和通信代价,行维度更是需要被控制在一个更小的范围之内。RowMatrix可通过一个RDD[Vector]的实例来创建,如下代码所示:

scala> import org.apache.spark.rdd.RDD

import org.apache.spark.rdd.RDD

scala> import org.apache.spark.mllib.linalg.{Vector,Vectors}

import org.apache.spark.mllib.linalg.{Vector,Vectors}

scala> import org.apache.spark.mllib.linalg.distributed.RowMatrix

import org.apache.spark.mllib.linalg.distributed.RowMatrix

// 创建两个本地向量dv1 dv2

scala> val dv1 : Vector = Vectors.dense(1.0,2.0,3.0)

dv1: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0]

scala> val dv2 : Vector = Vectors.dense(2.0,3.0,4.0)

dv2: org.apache.spark.mllib.linalg.Vector = [2.0,3.0,4.0]

// 使用两个本地向量创建一个RDD[Vector]

scala> val rows : RDD[Vector] = sc.parallelize(Array(dv1,dv2))

rows: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = ParallelCollectionRDD[13] at parallelize at :38

// 通过RDD[Vector]创建一个行矩阵

scala> val mat : RowMatrix = new RowMatrix(rows)

mat: org.apache.spark.mllib.linalg.distributed.RowMatrix = org.apache.spark.mllib.linalg.distributed.RowMatrix@76fc0fa

//可以使用numRows()和numCols()方法得到行数和列数

scala> mat.numRows()

res0: Long = 2

scala> mat.numCols()

res1: Long = 3

scala> mat.rows.foreach(println)

[1.0,2.0,3.0]

[2.0,3.0,4.0]

在获得RowMatrix的实例后,我们可以通过其自带的computeColumnSummaryStatistics()方法获取该矩阵的一些统计摘要信息,并可以对其进行QR分解SVD分解PCA分解,这一部分内容将在特征降维的章节详细解说,这里不再叙述。

统计摘要信息的获取如下代码段所示(接上代码段):

// 通过computeColumnSummaryStatistics()方法获取统计摘要

scala> val summary = mat.computeColumnSummaryStatistics()

// 可以通过summary实例来获取矩阵的相关统计信息,例如行数

scala> summary.count

res2: Long = 2

// 最大向量

scala> summary.max

res3: org.apache.spark.mllib.linalg.Vector = [2.0,3.0,4.0]

// 方差向量

scala> summary.variance

res4: org.apache.spark.mllib.linalg.Vector = [0.5,0.5,0.5]

// 平均向量

scala> summary.mean

res5: org.apache.spark.mllib.linalg.Vector = [1.5,2.5,3.5]

// L1范数向量

scala> summary.normL1

res6: org.apache.spark.mllib.linalg.Vector = [3.0,5.0,7.0]

(二)索引行矩阵(IndexedRowMatrix)

索引行矩阵IndexedRowMatrix与RowMatrix相似,但它的每一行都带有一个有意义的行索引值,这个索引值可以被用来识别不同行,或是进行诸如join之类的操作。其数据存储在一个由IndexedRow组成的RDD里,即每一行都是一个带长整型索引的本地向量。

与RowMatrix类似,IndexedRowMatrix的实例可以通过RDD[IndexedRow]实例来创建。如下代码段所示(接上例):

scala>import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

// 通过本地向量dv1 dv2来创建对应的IndexedRow

// 在创建时可以给定行的索引值,如这里给dv1的向量赋索引值1,dv2赋索引值2

scala> val idxr1 = IndexedRow(1,dv1)

idxr1: org.apache.spark.mllib.linalg.distributed.IndexedRow = IndexedRow(1,[1.0,2.0,3.0])

scala> val idxr2 = IndexedRow(2,dv2)

idxr2: org.apache.spark.mllib.linalg.distributed.IndexedRow = IndexedRow(2,[2.0,3.0,4.0])

// 通过IndexedRow创建RDD[IndexedRow]

scala> val idxrows = sc.parallelize(Array(idxr1,idxr2))

idxrows: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.IndexedRow] = ParallelCollectionRDD[14] at parallelize at :45

// 通过RDD[IndexedRow]创建一个索引行矩阵

scala> val idxmat: IndexedRowMatrix = new IndexedRowMatrix(idxrows)

idxmat: org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix = org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix@532887bc

//打印

scala> idxmat.rows.foreach(println)

IndexedRow(1,[1.0,2.0,3.0])

IndexedRow(2,[2.0,3.0,4.0])

(三)坐标矩阵(Coordinate Matrix)

坐标矩阵CoordinateMatrix是一个基于矩阵项构成的RDD的分布式矩阵。每一个矩阵项MatrixEntry都是一个三元组:(i: Long, j: Long, value: Double),其中i是行索引,j是列索引,value是该位置的值。坐标矩阵一般在矩阵的两个维度都很大,且矩阵非常稀疏的时候使用。

CoordinateMatrix实例可通过RDD[MatrixEntry]实例来创建,其中每一个矩阵项都是一个(rowIndex, colIndex, elem)的三元组:

scala> import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

// 创建两个矩阵项ent1和ent2,每一个矩阵项都是由索引和值构成的三元组

scala> val ent1 = new MatrixEntry(0,1,0.5)

ent1: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(0,1,0.5)

scala> val ent2 = new MatrixEntry(2,2,1.8)

ent2: org.apache.spark.mllib.linalg.distributed.MatrixEntry = MatrixEntry(2,2,1.8)

// 创建RDD[MatrixEntry]

scala> val entries : RDD[MatrixEntry] = sc.parallelize(Array(ent1,ent2))

entries: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.MatrixEntry] = ParallelCollectionRDD[15] at parallelize at :42

// 通过RDD[MatrixEntry]创建一个坐标矩阵

scala> val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)

coordMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@25b2d465

//打印

scala> coordMat.entries.foreach(println)

MatrixEntry(0,1,0.5)

MatrixEntry(2,2,1.8)

坐标矩阵可以通过transpose()方法对矩阵进行转置操作,并可以通过自带的toIndexedRowMatrix()方法转换成索引行矩阵IndexedRowMatrix。但目前暂不支持CoordinateMatrix的其他计算操作。

// 将coordMat进行转置

scala> val transMat: CoordinateMatrix = coordMat.transpose()

transMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@c1ee50

scala> transMat.entries.foreach(println)

MatrixEntry(1,0,0.5)

MatrixEntry(2,2,1.8)

// 将坐标矩阵转换成一个索引行矩阵

scala> val indexedRowMatrix = transMat.toIndexedRowMatrix()

indexedRowMatrix: org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix = org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix@7ee7e1bb

scala> indexedRowMatrix.rows.foreach(println)

IndexedRow(1,(3,[0],[0.5]))

IndexedRow(2,(3,[2],[1.8]))

####(四)分块矩阵(Block Matrix)

分块矩阵是基于矩阵块MatrixBlock构成的RDD的分布式矩阵,其中每一个矩阵块MatrixBlock都是一个元组((Int, Int), Matrix),其中(Int, Int)是块的索引,而Matrix则是在对应位置的子矩阵(sub-matrix),其尺寸由rowsPerBlock和colsPerBlock决定,默认值均为1024。分块矩阵支持和另一个分块矩阵进行加法操作和乘法操作,并提供了一个支持方法validate()来确认分块矩阵是否创建成功。

分块矩阵可由索引行矩阵IndexedRowMatrix或坐标矩阵CoordinateMatrix调用toBlockMatrix()方法来进行转换,该方法将矩阵划分成尺寸默认为1024x1024的分块,可以在调用toBlockMatrix(rowsPerBlock, colsPerBlock)方法时传入参数来调整分块的尺寸。

下面以矩阵A(如图)为例,先利用矩阵项MatrixEntry将其构造成坐标矩阵,再转化成如图所示的4个分块矩阵,最后对矩阵A与其转置进行乘法运算:

scala> import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

scala> import org.apache.spark.mllib.linalg.distributed.BlockMatrix

import org.apache.spark.mllib.linalg.distributed.BlockMatrix

// 创建8个矩阵项,每一个矩阵项都是由索引和值构成的三元组

scala> val ent1 = new MatrixEntry(0,0,1)

...

scala> val ent2 = new MatrixEntry(1,1,1)

...

scala> val ent3 = new MatrixEntry(2,0,-1)

...

scala> val ent4 = new MatrixEntry(2,1,2)

...

scala> val ent5 = new MatrixEntry(2,2,1)

...

scala> val ent6 = new MatrixEntry(3,0,1)

...

scala> val ent7 = new MatrixEntry(3,1,1)

...

scala> val ent8 = new MatrixEntry(3,3,1)

...

// 创建RDD[MatrixEntry]

scala> val entries : RDD[MatrixEntry] = sc.parallelize(Array(ent1,ent2,ent3,ent4,ent5,ent6,ent7,ent8))

entries: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.MatrixEntry] = ParallelCollectionRDD[21] at parallelize at :57

// 通过RDD[MatrixEntry]创建一个坐标矩阵

scala> val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)

coordMat: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@31c5fb43

// 将坐标矩阵转换成2x2的分块矩阵并存储,尺寸通过参数传入

val matA: BlockMatrix = coordMat.toBlockMatrix(2,2).cache()

matA: org.apache.spark.mllib.linalg.distributed.BlockMatrix = org.apache.spark.mllib.linalg.distributed.BlockMatrix@26b1df2c

// 可以用validate()方法判断是否分块成功

matA.validate()

构建成功后,可通过toLocalMatrix转换成本地矩阵,并查看其分块情况:

scala> matA.toLocalMatrix

res31: org.apache.spark.mllib.linalg.Matrix =

1.0  0.0  0.0  0.0

0.0  1.0  0.0  0.0

-1.0  2.0  1.0  0.0

1.0  1.0  0.0  1.0

// 查看其分块情况

scala> matA.numColBlocks

res12: Int = 2

scala> matA.numRowBlocks

res13: Int = 2

// 计算矩阵A和其转置矩阵的积矩阵

scala> val ata = matA.transpose.multiply(matA)

ata: org.apache.spark.mllib.linalg.distributed.BlockMatrix = org.apache.spark.mllib.linalg.distributed.BlockMatrix@3644e451

scala> ata.toLocalMatrix

res1: org.apache.spark.mllib.linalg.Matrix =

3.0  -1.0  -1.0  1.0

-1.0  6.0  2.0  1.0

-1.0  2.0  1.0  0.0

1.0  1.0  0.0  1.0

分块矩阵BlockMatrix将矩阵分成一系列矩阵块,底层由矩阵块构成的RDD来进行数据存储。值得指出的是,用于生成分布式矩阵的底层RDD必须是已经确定(Deterministic)的,因为矩阵的尺寸将被存储下来,所以使用未确定的RDD将会导致错误。而且,不同类型的分布式矩阵之间的转换需要进行一个全局的shuffle操作,非常耗费资源。所以,根据数据本身的性质和应用需求来选取恰当的分布式矩阵存储类型是非常重要的。

参考路径:http://dblab.xmu.edu.cn/blog/1175/

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,313评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,369评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,916评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,333评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,425评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,481评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,491评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,268评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,719评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,004评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,179评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,832评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,510评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,153评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,402评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,045评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,071评论 2 352