Paste_Image.png
一、方法一
需要两步完成, Mi,kNk,j
1.Map阶段,对于矩阵 M,把列作为 key,对于矩阵 N 把行作为 key
2.Reduce阶段,对于相同 key 的值,M 矩阵和 N 矩阵的值做笛卡尔积,输出 【(M 的行)+ (N 的列值)+ (MN相乘 value 值)】
public static class ReadMapper extends Mapper<Object, Text, Text, Text>{
public void map(Object key ,Text value,Context context) throws IOException, InterruptedException{
String str=value.toString(); String [] strs=str.split(" ");
if(strs[0].equals("M")&&strs.length==4){//如果是 M矩 阵,则以 j 作为 key
context.write(new Text(strs[2]),new Text(strs[0]+" "+strs[1]+" "+strs[3]));
} else if(strs[0].equals("N")&&strs.length==4){
context.write(new Text(strs[1]),new Text(strs[0]+" "+strs[2]+" "+strs[3]));
}
}
}
public static class WriteReducer extends Reducer<Text, Text, Text, Text>{
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
//定义连个 ArrayList 第一个存放 M,第二个存放 N
ArrayList<String> mTrix=new ArrayList<String>();
ArrayList<String> nTrix=new ArrayList<String>();
for(Text value : values){
if((value.toString()).contains("M")){
mTrix.add(value.toString());
} else {
nTrix.add(value.toString());
}
}
String[] mItems, nItems;
//进行合并计算
for(String m : mTrix){
mItems=m.split(" ");
for(String n : nTrix){
nItems=n.split(" ");
}
context.write(new Text(key+"+"+mItems[1]+"+"+nItems[1]+"+"+(Integer.parseInt(nIt ems[2])*Integer.parseInt(mItems[2]))+"+"),new Text(""));
}
}
}
4.map 阶段读取上一阶段 reduce 产生的 i,j,value 值
5.把所有相同 key 的 value 值相加,输出即可
public static class ReadMapper1 extends Mapper<Object, Text, Text, Text>{
public void map(Object key ,Text value,Context context)
throws IOException, InterruptedException{
String str=value.toString();
String [] strs=str.split("\\+");
if(strs.length>=4){
//如果是 M 矩阵,则以 j 作为 key
context.write(new Text(strs[1]+" "+strs[2]),new Text(strs[3]));
}
}
}
public static class WriteReducer1 extends Reducer<Text, Text, Text, Text>{
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
int num=0;
for(Text value : values){
num+=Integer.parseInt(value.toString());
}
context.write(key,new Text(num+""));
}
}
Paste_Image.png
二、方法二
(1)在 Map 阶段,把来自表 M 的元素 Aij,标识成 l 条<key, value>的形式。其中 key= (i,k),k=0,1,..l-1,value=(M,j,aij);把来自表 N 的元素 Bij,标识成 m 条<key,value>形式,其中 key=(i,k),i=0,1,...,m-1,value=(‘N’,j,Bjk)。 于是乎,在Map 阶段,我们实现了这样的战术目的:通过 key,我们把参与计算 Cik 的数据 归为一类。通过 value,我们能区分元素是来自 M 还是 N,以及具体的位置。
(2)在 Reduce阶段,将上一阶段的 map 输出值,找到 MN 矩阵的对应位置相乘计入 result 中,最后输出(i,j,result)
public class SigleStep {
public static final int WIDTH = 2;
public static final int LENGTH = 2;
public static final int MATRIX_K = 3;
public static class MatrixMapper extends Mapper<Object, Text, Text, Text> {
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] values = value.toString().split(" ");
if (values[0].equals("M")) {
for (int i = 0; i < LENGTH; i++) { //把矩阵每个元素转换为 i,k,M,j,v 形式
context.write(new Text(values[1]+" "+i), new Text(values[0]+" "+values[2]+" "+values[3]));
}
} else {
for(int i=0;i<WIDTH;i++){
//把矩阵每个元素转换为 i,k,M,j,v 形式
context.write(new Text(i+" "+values[2]), new Text(values[0]+" "+values[1]+" "+values[3]));
}
}
}
}
public static class MatrixReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//对获得的五元组进行处理
Integer[] m_matrix = new Integer[MATRIX_K];
Integer[] n_matrix = new Integer[MATRIX_K];
int i = 0;
for (Text value : values) {
String temp[] = value.toString().split(" ");
if (temp[0].equals("M")) {//如果是 M 矩阵
m_matrix[Integer.parseInt(temp[1])] = Integer.parseInt(temp[2]
);
} else {
n_matrix[Integer.parseInt(temp[1])] = Integer.parseInt(temp[2]
);
}
}
//对两个矩阵进行相乘相加 int result = 0;
for (i = 0; i < MATRIX_K; i++) {
if (m_matrix[i] != null && n_matrix[i] != null) result += m_matrix[i] * n_matrix[i];
}
System.out.println(key + "+++++++" + result);
context.write(key, new
Text(result + "")
);
}
}
}
二、scala版本的矩阵相乘,第一次写spark程序,可能有问题
import org.apache.spark.{SparkContext, SparkConf}
/**
* Created by chh on 2016/5/21.
*/
object Matrix {
def main(args :Array[String]): Unit = {
//创建一个scala版本的SparkContext
val conf =new SparkConf().setAppName("Matric").setMaster("local")
val sc=new SparkContext(conf)
val input =sc.textFile("F:\\url.txt")
val M=input.filter(line => line.contains("M"))
val N =input.filter(line => line.contains("N"))
//Map阶段,对于矩阵 M,把列作为 key,对于矩阵 N 把行作为 key
val wordsM=M.map(line => {
val words=line.split(" ")
(words(2),words)
})
val wordsN=N.map(line => {
val words=line.split(" ")
Tuple2(words(1),words)
})
// .Reduce阶段,对于相同 key 的值,M 矩阵和 N 矩阵的值做笛卡尔积,
// 输出 【(M 的行)+ (N 的列值)+ (MN相乘 value 值)】
val dwords=wordsM.join(wordsN)
// dwords.values.foreach(println)
val map=dwords.values.map( x=> {
(x._1(1)+" "+ x._2(2),x._1(3).toDouble*x._2(3).toDouble)
})
val reduce=map.reduceByKey((x,y)=>{
x+y
})
reduce.foreach( x => {
println(x._1+" "+x._2 )
})
}
}