数据算法 Hadoop/Spark大数据处理---第七章

本章欲解决的问题

购物篮问题是一个流行的数据挖掘计算,常用这个技术来揭示不用商品或商品组之间的相似度

本章共有三种实现方式

  1. 基于传统mapreduce实现
  2. 基于spark实现--能求得相关数据的依懒性
  3. 基于传统Scala实现

基于传统mapreduce实现

//在MBADriver中都是一般的定义,主要亮点在Combiner
job.setMapperClass(MBAMapper.class);
//Combiner用于在map端将相用的key进行分组,之后再传到reduce,减少网络传输的数量
job.setCombinerClass(MBAReducer.class);
job.setReducerClass(MBAReducer.class);


//map函数
 @Override
   public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException {

      // input line
      String line = value.toString();
      //convertItemsToList将每一个行转换成list<String>
      List<String> items = convertItemsToList(line);
      if ((items == null) || (items.isEmpty())) {
         // no mapper output will be generated
         return;
      }
      //获得按照指定数目的已排序的输出
      generateMapperOutput(numberOfPairs, items, context);
   }
   
   private static List<String> convertItemsToList(String line) {
      if ((line == null) || (line.length() == 0)) {
         // no mapper output will be generated
         return null;
      }      
      String[] tokens = StringUtils.split(line, ",");   
      if ( (tokens == null) || (tokens.length == 0) ) {
         return null;
      }
      List<String> items = new ArrayList<String>();         
      for (String token : tokens) {
         if (token != null) {
             items.add(token.trim());
         }         
      }         
      return items;
   }
   
   
   private void generateMapperOutput(int numberOfPairs, List<String> items, Context context) 
      throws IOException, InterruptedException {
      //items为源数据,numberOfPairs为取多少个
      List<List<String>> sortedCombinations = Combination.findSortedCombinations(items, numberOfPairs);
      for (List<String> itemList: sortedCombinations) {
         System.out.println("itemlist="+itemList.toString());
         reducerKey.set(itemList.toString());
         context.write(reducerKey, NUMBER_ONE);
      }   
   }

//reduce为普通的wordCount
@Override
   public void reduce(Text key, Iterable<IntWritable> values, Context context) 
      throws IOException, InterruptedException {
      int sum = 0; // total items paired
      for (IntWritable value : values) {
         sum += value.get();
      }
      context.write(key, new IntWritable(sum));
   }


//findSortedCombinations函数用于在指定element获取N个
public static <T extends Comparable<? super T>> List<List<T>> findSortedCombinations(Collection<T> elements, int n) {
        List<List<T>> result = new ArrayList<List<T>>();
        
        if (n == 0) {
            result.add(new ArrayList<T>());
            return result;
        }
        
        List<List<T>> combinations = findSortedCombinations(elements, n - 1);
        for (List<T> combination: combinations) {
            for (T element: elements) {
                if (combination.contains(element)) {
                    continue;
                }
                
                List<T> list = new ArrayList<T>();
                list.addAll(combination);
                
                if (list.contains(element)) {
                    continue;
                }
                
                list.add(element);
                //sort items not to duplicate the items
                //   example: (a, b, c) and (a, c, b) might become  
                //   different items to be counted if not sorted   
                Collections.sort(list);
                
                if (result.contains(list)) {
                    continue;
                }
                
                result.add(list);
            }
        }
        
        return result;
    }

基于spark实现--能求得相关数据的依懒性

public static void main(String[] args) throws Exception {
      // STEP-1: handle input parameters
      if (args.length < 1) {
         System.err.println("Usage: FindAssociationRules <transactions>");
         System.exit(1);
      }
      String transactionsFileName =  args[0];

      // STEP-2: create a Spark context object
      JavaSparkContext ctx = new JavaSparkContext();
       
      // STEP-3: read all transactions from HDFS and create the first RDD 
      JavaRDD<String> transactions = ctx.textFile(transactionsFileName, 1);
      transactions.saveAsTextFile("/rules/output/1");

      // STEP-4: generate frequent patterns
      // PairFlatMapFunction<T, K, V>     
      // T => Iterable<Tuple2<K, V>>

       /**
        * 输出格式为([a],1),([b],1)([a,b],1)([a,b,c],1)([b,d],1)等
        */
       JavaPairRDD<List<String>,Integer> patterns =
         transactions.flatMapToPair(new PairFlatMapFunction<
                                                             String,        // T
                                                             List<String>,  // K
                                                             Integer        // V
                                                           >() {
         @Override
         public Iterator<Tuple2<List<String>,Integer>> call(String transaction) {
            List<String> list = Util.toList(transaction);
            //传入findSortedCombinations时不指定获取的参数N,则取得全部
            List<List<String>> combinations = Combination.findSortedCombinations(list);
            List<Tuple2<List<String>,Integer>> result = new ArrayList<Tuple2<List<String>,Integer>>();
            for (List<String> combList : combinations) {
                 if (combList.size() > 0) {
                     //把全部的组合赋上一次
                   result.add(new Tuple2<List<String>,Integer>(combList, 1));
                 }
            }
            return result.iterator();
         }
      });    
      patterns.saveAsTextFile("/rules/output/2");
    
      // 对key相同的进行聚合
      JavaPairRDD<List<String>, Integer> combined = patterns.reduceByKey(new Function2<Integer, Integer, Integer>() {
         @Override
         public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
         }
      });    
      combined.saveAsTextFile("/rules/output/3");
    
      // now, we have: patterns(K,V)
      //      K = pattern as List<String>
      //      V = frequency of pattern
      // now given (K,V) as (List<a,b,c>, 2) we will 
      // generate the following (K2,V2) pairs:
      //
      //   (List<a,b,c>, T2(null, 2))
      //   (List<a,b>,   T2(List<a,b,c>, 2))
      //   (List<a,c>,   T2(List<a,b,c>, 2))
      //   (List<b,c>,   T2(List<a,b,c>, 2))


      // STEP-6: generate all sub-patterns
      // PairFlatMapFunction<T, K, V>     
      // T => Iterable<Tuple2<K, V>>
       /**
        * 输出的类型大概:([a,b],(null,2))([b],([a,b],2)) ([a],([a,b],2))
        * ([a,b,d],(null,1)) ([b,d],([a,b,d],1)) ([a,d],([a,b,d],1)) ([a,b],([a,b,d],1))
        */
      JavaPairRDD<List<String>,Tuple2<List<String>,Integer>> subpatterns = 
         combined.flatMapToPair(new PairFlatMapFunction<
          Tuple2<List<String>, Integer>,   // T
          List<String>,                    // K
          Tuple2<List<String>,Integer>     // V
        >() {
       @Override
       public Iterator<Tuple2<List<String>,Tuple2<List<String>,Integer>>> 
          call(Tuple2<List<String>, Integer> pattern) {
            List<Tuple2<List<String>,Tuple2<List<String>,Integer>>> result = 
               new ArrayList<Tuple2<List<String>,Tuple2<List<String>,Integer>>>();
            List<String> list = pattern._1;
            Integer frequency = pattern._2;
            result.add(new Tuple2(list, new Tuple2(null,frequency)));
            if (list.size() == 1) {
               return result.iterator();
            }
            
            // pattern has more than one items
            // result.add(new Tuple2(list, new Tuple2(null,size)));
            for (int i=0; i < list.size(); i++) {
                //removeOneItem用于删除掉list中的一个值并返回list
               List<String> sublist = Util.removeOneItem(list, i);
               result.add(new Tuple2(sublist, new Tuple2(list, frequency)));
            }
            return result.iterator();
        }
      });
      subpatterns.saveAsTextFile("/rules/output/4");
        
      // 将key进行分组
       /**
        * 输出的格式为:([a,c],[([a,b,c],1),(null,1)]) --- key:[a,c]
        *               ([a,b,c],[(null,1)])            --- key:[a,b,c]
        *               ([b,c],[(null,3),([a,b,c],1)])  --- key:[b,c]
        */
      JavaPairRDD<List<String>,Iterable<Tuple2<List<String>,Integer>>> rules = subpatterns.groupByKey();       
      rules.saveAsTextFile("/rules/output/5");

      // STEP-7: generate association rules      
      // Now, use (K=List<String>, V=Iterable<Tuple2<List<String>,Integer>>) 
      // to generate association rules
      // JavaRDD<R> map(Function<T,R> f)
      // Return a new RDD by applying a function to all elements of this RDD.
       /**
        * 输出的格式为:[([a,b],[d],0.5),([a,b],[c],0.5)]
        *              []
        *              [([c],[b],1.0),([c],[a],0.33333)]
        */
      JavaRDD<List<Tuple3<List<String>,List<String>,Double>>> assocRules = rules.map(new Function<
          Tuple2<List<String>,Iterable<Tuple2<List<String>,Integer>>>,     // T: input 
          List<Tuple3<List<String>,List<String>,Double>>                   // R: ( ac => b, 1/3): T3(List(a,c), List(b),  0.33)
                                                                           //    ( ad => c, 1/3): T3(List(a,d), List(c),  0.33)
         >() {
        @Override
        public List<Tuple3<List<String>,List<String>,Double>> call(Tuple2<List<String>,Iterable<Tuple2<List<String>,Integer>>> in) {
            List<Tuple3<List<String>,List<String>,Double>> result = 
               new ArrayList<Tuple3<List<String>,List<String>,Double>>();
            //([a,c],[([a,b,c],1),(null,1)])中的[a,c]
            List<String> fromList = in._1;
            // //([a,c],[([a,b,c],1),(null,1)])中的[([a,b,c],1),(null,1)
            Iterable<Tuple2<List<String>,Integer>> to = in._2;
            List<Tuple2<List<String>,Integer>> toList = new ArrayList<Tuple2<List<String>,Integer>>();
            Tuple2<List<String>,Integer> fromCount = null;
            for (Tuple2<List<String>,Integer> t2 : to) {
               // find the "count" object
               if (t2._1 == null) {
                   //fromCount用于记录总的值count
                    fromCount = t2;
               }
               else {
                  toList.add(t2);
               }
            }
            
            // Now, we have the required objects for generating association rules:
            //  "fromList", "fromCount", and "toList"
            if (toList.isEmpty()) {
               // no output generated, but since Spark does not like null objects, we will fake a null object
               return result; // an empty list
            } 
            
            // now using 3 objects: "from", "fromCount", and "toList",
            // create association rules:
            for (Tuple2<List<String>,Integer>  t2 : toList) {
               double confidence = (double) t2._2 / (double) fromCount._2;
               List<String> t2List = new ArrayList<String>(t2._1);
               //把 t2List中关于fromList的元素全部的删除
               t2List.removeAll(fromList);
               //定义输出格式
               result.add(new Tuple3(fromList, t2List, confidence));
            }
          return result;
        }
      });   
      assocRules.saveAsTextFile("/rules/output/6");

      // done
      ctx.close(); 
      
      System.exit(0);
   }

基于Scala实现

//基于Scala的实现方式
  def main(args: Array[String]): Unit = {

    if (args.size < 2) {
      println("Usage: FindAssociationRules <input-path> <output-path>")
      sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("market-basket-analysis")
    val sc = new SparkContext(sparkConf)

    val input = args(0)
    val output = args(1)

    val transactions = sc.textFile(input)

    val patterns = transactions.flatMap(line => {
      val items = line.split(",").toList
      //对每一条读取的transactions根据逗号分隔之后赋值上1
      (0 to items.size) flatMap items.combinations filter (xs => !xs.isEmpty)
    }).map((_, 1))
    //对key进行组合
    val combined = patterns.reduceByKey(_ + _)
    //生成所有的子模式
    val subpatterns = combined.flatMap(pattern => {
      val result = ListBuffer.empty[Tuple2[List[String], Tuple2[List[String], Int]]]
      //第一个赋值为(null,count)
      result += ((pattern._1, (Nil, pattern._2)))

      val sublist = for {
        i <- 0 until pattern._1.size
        //获取比pattern._1.size少一个的list
        xs = pattern._1.take(i) ++ pattern._1.drop(i + 1)
        if xs.size > 0
      } yield (xs, (pattern._1, pattern._2))
      result ++= sublist
      result.toList
    })
    //对组合之后的可以进行分组
    val rules = subpatterns.groupByKey()

    val assocRules = rules.map(in => {
      //获得总数count
      val fromCount = in._2.find(p => p._1 == Nil).get
      val toList = in._2.filter(p => p._1 != Nil).toList
      if (toList.isEmpty) Nil
      else {
        val result =
          for {
            t2 <- toList
          //获得比率
            confidence = t2._2.toDouble / fromCount._2.toDouble
          //t2._1去点与in._1相同的element
            difference = t2._1 diff in._1
          } yield (((in._1, difference, confidence)))
        result
      }
    })

    // Formatting the result just for easy reading.
    val formatResult = assocRules.flatMap(f => {
      f.map(s => (s._1.mkString("[", ",", "]"), s._2.mkString("[", ",", "]"), s._3))
    })
    formatResult.saveAsTextFile(output)

    // done!
    sc.stop()
  }

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354

推荐阅读更多精彩内容