Hadoop和Spark实现矩阵相乘

Paste_Image.png

一、方法一

需要两步完成, Mi,kNk,j
1.Map阶段,对于矩阵 M,把列作为 key,对于矩阵 N 把行作为 key
2.Reduce阶段,对于相同 key 的值,M 矩阵和 N 矩阵的值做笛卡尔积,输出 【(M 的行)+ (N 的列值)+ (MN相乘 value 值)】

public static class ReadMapper extends Mapper<Object, Text, Text, Text>{
    public void map(Object key ,Text value,Context context) throws IOException, InterruptedException{
        String str=value.toString(); String [] strs=str.split(" ");
    if(strs[0].equals("M")&&strs.length==4){//如果是 M矩 阵,则以 j 作为 key
         context.write(new Text(strs[2]),new Text(strs[0]+" "+strs[1]+" "+strs[3]));
        } else if(strs[0].equals("N")&&strs.length==4){
        context.write(new Text(strs[1]),new Text(strs[0]+" "+strs[2]+" "+strs[3]));
    }

    }
}
    public static class WriteReducer extends Reducer<Text, Text, Text, Text>{
        public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
            //定义连个 ArrayList 第一个存放 M,第二个存放 N
            ArrayList<String> mTrix=new ArrayList<String>();
            ArrayList<String> nTrix=new ArrayList<String>();
            for(Text value : values){
                if((value.toString()).contains("M")){
                    mTrix.add(value.toString());
                } else {
                    nTrix.add(value.toString());
                }
            }
            String[] mItems, nItems;
            //进行合并计算
            for(String m : mTrix){
                mItems=m.split(" ");
                for(String n : nTrix){
                    nItems=n.split(" ");
                }
                context.write(new Text(key+"+"+mItems[1]+"+"+nItems[1]+"+"+(Integer.parseInt(nIt ems[2])*Integer.parseInt(mItems[2]))+"+"),new Text(""));
            }
        }
    }
   

4.map 阶段读取上一阶段 reduce 产生的 i,j,value 值
5.把所有相同 key 的 value 值相加,输出即可

public static class ReadMapper1 extends Mapper<Object, Text, Text, Text>{
    public void map(Object key ,Text value,Context context)
        throws IOException, InterruptedException{
        String str=value.toString();
        String [] strs=str.split("\\+");
        if(strs.length>=4){
        //如果是 M 矩阵,则以 j 作为 key
        context.write(new Text(strs[1]+" "+strs[2]),new Text(strs[3]));
        }
    }
}
public static class WriteReducer1 extends Reducer<Text, Text, Text, Text>{
    public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
        int num=0;
        for(Text value : values){
            num+=Integer.parseInt(value.toString());
        }
        context.write(key,new Text(num+""));
     }
}
Paste_Image.png

二、方法二

(1)在 Map 阶段,把来自表 M 的元素 Aij,标识成 l 条<key, value>的形式。其中 key= (i,k),k=0,1,..l-1,value=(M,j,aij);把来自表 N 的元素 Bij,标识成 m 条<key,value>形式,其中 key=(i,k),i=0,1,...,m-1,value=(‘N’,j,Bjk)。 于是乎,在Map 阶段,我们实现了这样的战术目的:通过 key,我们把参与计算 Cik 的数据 归为一类。通过 value,我们能区分元素是来自 M 还是 N,以及具体的位置。
(2)在 Reduce阶段,将上一阶段的 map 输出值,找到 MN 矩阵的对应位置相乘计入 result 中,最后输出(i,j,result)

public class SigleStep {
    public static final int WIDTH = 2;
    public static final int LENGTH = 2;
    public static final int MATRIX_K = 3;

    public static class MatrixMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] values = value.toString().split(" ");
            if (values[0].equals("M")) {
                for (int i = 0; i < LENGTH; i++) { //把矩阵每个元素转换为 i,k,M,j,v 形式 
                      context.write(new Text(values[1]+" "+i), new Text(values[0]+" "+values[2]+" "+values[3])); 
                  }
             } else {
                  for(int i=0;i<WIDTH;i++){ 
                 //把矩阵每个元素转换为 i,k,M,j,v 形式 
                   context.write(new Text(i+" "+values[2]), new Text(values[0]+" "+values[1]+" "+values[3]));
                }
            }
        }
    }

    public static class MatrixReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            //对获得的五元组进行处理 
            Integer[] m_matrix = new Integer[MATRIX_K];
            Integer[] n_matrix = new Integer[MATRIX_K];
            int i = 0;
            for (Text value : values) {
                String temp[] = value.toString().split(" ");
                if (temp[0].equals("M")) {//如果是 M 矩阵
                    m_matrix[Integer.parseInt(temp[1])] = Integer.parseInt(temp[2]
                    );
                } else {
                    n_matrix[Integer.parseInt(temp[1])] = Integer.parseInt(temp[2]
                    );
                }
            }
            //对两个矩阵进行相乘相加 int result = 0;
            for (i = 0; i < MATRIX_K; i++) {
                if (m_matrix[i] != null && n_matrix[i] != null) result += m_matrix[i] * n_matrix[i];
            }
            System.out.println(key + "+++++++" + result);
            context.write(key, new
                    Text(result + "")
            );
        }
    }
}

二、scala版本的矩阵相乘,第一次写spark程序,可能有问题

import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by chh on 2016/5/21.
  */
object Matrix {
  def main(args :Array[String]): Unit = {
    //创建一个scala版本的SparkContext
    val conf =new SparkConf().setAppName("Matric").setMaster("local")
    val sc=new SparkContext(conf)
    val input =sc.textFile("F:\\url.txt")
    val M=input.filter(line => line.contains("M"))
    val N =input.filter(line => line.contains("N"))
    //Map阶段,对于矩阵 M,把列作为 key,对于矩阵 N 把行作为 key
    val wordsM=M.map(line => {
      val words=line.split(" ")
      (words(2),words)
    })
    val wordsN=N.map(line => {
      val words=line.split(" ")
      Tuple2(words(1),words)
    })
     // .Reduce阶段,对于相同 key 的值,M 矩阵和 N 矩阵的值做笛卡尔积,
    // 输出 【(M 的行)+ (N 的列值)+ (MN相乘 value 值)】
    val dwords=wordsM.join(wordsN)
   // dwords.values.foreach(println)
   val map=dwords.values.map( x=> {
       (x._1(1)+" "+ x._2(2),x._1(3).toDouble*x._2(3).toDouble)
   })
    val reduce=map.reduceByKey((x,y)=>{
        x+y
    })
    reduce.foreach( x => {
      println(x._1+" "+x._2 )
    })
  }

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容