Data Algorithms: Hadoop/Spark Big Data Processing --- Chapter 4

This chapter covers the left outer join (LEFT OUTER JOIN).

Six implementations are presented:

  1. The classic MapReduce approach
  2. The classic Spark approach
  3. Spark's leftOuterJoin() method
  4. A classic Scala implementation
  5. Scala's leftOuterJoin() method
  6. An efficient Scala DataFrame implementation

Input, output, and target result for this chapter:

  • Input users table: (user_id, location_id)
  • Input transactions table: (transaction_id, product_id, user_id, quantity, amount)
  • Output: (product_id, {distinct<location_id> as L, L.size})

The result expressed in SQL (a small made-up example follows below):

  • SELECT product_id,location_id FROM transactions LEFT OUTER JOIN users ON transactions.user_ID = users.user_ID
  • SELECT product_id,count(distinct location_id) FROM transactions LEFT OUTER JOIN users ON transactions.user_ID = users.user_ID group by product_id
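
To make the target concrete, here is a small hypothetical example (all user, location, and product values below are made up for illustration and are not the book's data set):

users (user_id<TAB>location_id):
  u1  UT
  u2  GA
  u3  GA

transactions (transaction_id<TAB>product_id<TAB>user_id<TAB>quantity<TAB>amount):
  t1  p3  u1  1  300
  t2  p1  u2  1  100
  t3  p1  u3  1  100
  t4  p1  u1  3  300

expected output (product_id, distinct locations, count):
  p1  {GA, UT}  2
  p3  {UT}      1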

++The classic MapReduce approach++

Class | Role
------|-----
LeftJoinDriver | Driver that submits the phase-1 job
LeftJoinReducer | Left join reducer
LeftJoinTransactionMapper | Left join transaction mapper
LeftJoinUserMapper | Left join user mapper
SecondarySortPartitioner | Partitions on the natural key
SecondarySortGroupComparator | Groups on the natural key
LocationCountDriver | Driver that submits the phase-2 job
LocationCountMapper | Defines map() for the location count
LocationCountReducer | Defines reduce() for the location count
First, the two mappers. The mapping rule can be thought of as attaching a tag field: the user mapper emits key-value pairs of the form ((user_id, "1"), ("L", location_id)), and the transaction mapper emits ((user_id, "2"), ("P", product_id)).
//UserMapper 
 public void map(LongWritable key, Text value, Context context) 
      throws java.io.IOException, InterruptedException {
      String[] tokens = StringUtils.split(value.toString(), "\t");
      if (tokens.length == 2) {
         // tokens[0] = user_id
         // tokens[1] = location_id
         // to make sure location arrives before products
         outputKey.set(tokens[0], "1");    // set user_id
         outputValue.set("L", tokens[1]);  // set location_id
         context.write(outputKey, outputValue);
      }
   }
 
 //TransactionMapper 
 public void map(LongWritable key, Text value, Context context) 
      throws java.io.IOException, InterruptedException {
      String[] tokens = StringUtils.split(value.toString(), "\t");
      String productID = tokens[1];
      String userID = tokens[2];
      // make sure products arrive at a reducer after location
      outputKey.set(userID, "2");
      outputValue.set("P", productID);
      context.write(outputKey, outputValue);
   }
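
Both mapper snippets above reference outputKey and outputValue fields that the excerpt does not show; presumably they are reusable PairOfStrings instances declared at class level, roughly like this (a sketch, not necessarily the book's exact code):

   // assumed field declarations in LeftJoinUserMapper / LeftJoinTransactionMapper (not shown above)
   private PairOfStrings outputKey = new PairOfStrings();
   private PairOfStrings outputValue = new PairOfStrings();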

The composite keys are first partitioned, then grouped by userID; the group comparator below compares only the natural key (a sketch of the partitioner follows after it).
 @Override
    public int compare(PairOfStrings first, PairOfStrings second) {
       return first.getLeftElement().compareTo(second.getLeftElement());
    }
    
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2 ) {
        DataInputBuffer buffer = new DataInputBuffer();
        PairOfStrings a = new PairOfStrings();
        PairOfStrings b = new PairOfStrings();
        try {
            buffer.reset(b1, s1, l1);
            a.readFields(buffer);
            buffer.reset(b2, s2, l2);
            b.readFields(buffer);
            return compare(a,b);  
        } 
        catch(Exception ex) {
            return -1;
        }  
    }
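
The partitioner class itself (SecondarySortPartitioner in the table above) is not listed. A minimal sketch, assuming it partitions solely on the natural key (userID) and uses the Cloud9 PairOfStrings type seen elsewhere in this chapter:

// a sketch of SecondarySortPartitioner (assumption: partition only on the natural key)
import org.apache.hadoop.mapreduce.Partitioner;
import edu.umd.cloud9.io.pair.PairOfStrings;

public class SecondarySortPartitioner
   extends Partitioner<PairOfStrings, PairOfStrings> {

   @Override
   public int getPartition(PairOfStrings key, PairOfStrings value, int numberOfPartitions) {
      // ignore the "1"/"2" tag so a user's location and products meet in the same reducer
      return (key.getLeftElement().hashCode() & Integer.MAX_VALUE) % numberOfPartitions;
   }
}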

The reduce function then performs the join, which completes phase 1. According to the book, because "1" < "2" in the composite key, each userID's location_id reaches the reducer before that user's product_ids; the value list therefore looks like {location_id, product_id, product_id, ...}, and the reducer emits one (product_id, location_id) pair per product, i.e. the products that share the same location.

 @Override
   public void reduce(PairOfStrings key, Iterable<PairOfStrings> values, Context context) 
      throws java.io.IOException, InterruptedException {
      
      Iterator<PairOfStrings> iterator = values.iterator();
      if (iterator.hasNext()) {
         // firstPair must be location pair
         PairOfStrings firstPair = iterator.next(); 
         System.out.println("firstPair="+firstPair.toString());
         if (firstPair.getLeftElement().equals("L")) {
            locationID.set(firstPair.getRightElement());
         }
      }      
                 
      while (iterator.hasNext()) {
         // the remaining elements must be product pair
         PairOfStrings productPair = iterator.next(); 
         System.out.println("productPair="+productPair.toString());
         productID.set(productPair.getRightElement());
         context.write(productID, locationID);
      }
   }
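
Like the mappers, the reducer refers to productID and locationID fields that the excerpt does not declare; presumably they are reusable Text instances, with locationID given a default for users that have no location record (an assumption, not necessarily the book's exact code):

   // assumed field declarations in LeftJoinReducer (not shown above)
   private Text productID = new Text();
   private Text locationID = new Text("UNKNOWN");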
In phase 2 the map is an ordinary map (a minimal sketch follows after the reducer below), and the reduce counts distinct locations, word-count style, using a HashSet:
 @Override
    public void reduce(Text productID, Iterable<Text> locations, Context context)
        throws  IOException, InterruptedException {
        // use a Set to keep only the distinct locations
        Set<String> set = new HashSet<String>();
        // add every location; the Set drops duplicates
        for (Text location: locations) {
           set.add(location.toString());
        }
        // emit the product ID and the number of distinct locations
        context.write(productID, new LongWritable(set.size()));
    }
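
The phase-2 mapper is described above as an ordinary map. A minimal sketch, assuming phase 1 wrote its output with TextOutputFormat as product_id<TAB>location_id lines:

// a sketch of LocationCountMapper (assumption: phase-1 output is product_id<TAB>location_id per line)
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LocationCountMapper
   extends Mapper<LongWritable, Text, Text, Text> {

   @Override
   public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
      String[] tokens = value.toString().split("\t");
      if (tokens.length == 2) {
         // emit (product_id, location_id) so the reducer can count unique locations
         context.write(new Text(tokens[0]), new Text(tokens[1]));
      }
   }
}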

In the driver, both mappers are attached to the same job (the remaining job wiring is sketched below):

      MultipleInputs.addInputPath(job, transactions, TextInputFormat.class, LeftJoinTransactionMapper.class);
      MultipleInputs.addInputPath(job, users, TextInputFormat.class, LeftJoinUserMapper.class);
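
The rest of the phase-1 job configuration is not shown in the excerpt. A sketch of how it might be wired up, assuming the class names from the table above and that job, users, transactions, and a hypothetical phase1Output path have already been created in LeftJoinDriver:

      // sketch only: job, users, transactions and phase1Output are assumed to exist already
      job.setPartitionerClass(SecondarySortPartitioner.class);            // partition on the natural key (userID)
      job.setGroupingComparatorClass(SecondarySortGroupComparator.class); // group values with the same userID together
      job.setReducerClass(LeftJoinReducer.class);
      job.setMapOutputKeyClass(PairOfStrings.class);
      job.setMapOutputValueClass(PairOfStrings.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
      FileOutputFormat.setOutputPath(job, phase1Output);                  // phase 2 reads this directory
      System.exit(job.waitForCompletion(true) ? 0 : 1);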


++The classic Spark approach++

The JavaRDD.union() function returns the union of two RDDs; here it is used to merge the users RDD and the transactions RDD.

public static void main(String[] args) throws Exception {
    // read the input arguments
    if (args.length < 2) {
       System.err.println("Usage: SparkLeftOuterJoin <users> <transactions>");
       System.exit(1);
    }
    String usersInputFile = args[0];
    String transactionsInputFile = args[1];
    System.out.println("users="+ usersInputFile);
    System.out.println("transactions="+ transactionsInputFile);
       
    JavaSparkContext ctx = new JavaSparkContext();
    // create an RDD from the users input file
    JavaRDD<String> users = ctx.textFile(usersInputFile, 1);

    // map each user line to a pair of the form (user_id, ("L", location))
    // PairFunction<T, K, V>    
    // T => Tuple2<K, V>
    JavaPairRDD<String,Tuple2<String,String>> usersRDD = 
          users.mapToPair(new PairFunction<
                                           String,                // T 
                                           String,                // K
                                           Tuple2<String,String>  // V
                                          >() {
      @Override
      public Tuple2<String,Tuple2<String,String>> call(String s) {
        String[] userRecord = s.split("\t");
        // build the tagged Tuple2 value
        Tuple2<String,String> location = new Tuple2<String,String>("L", userRecord[1]);
        return new Tuple2<String,Tuple2<String,String>>(userRecord[0], location);
      }
    });
    
    // create an RDD from the transactions input file
    JavaRDD<String> transactions = ctx.textFile(transactionsInputFile, 1);

    // PairFunction<T, K, V>    
    // T => Tuple2<K, V>
    // produce pairs of the form (user_id, ("P", product))
    JavaPairRDD<String,Tuple2<String,String>> transactionsRDD = 
          transactions.mapToPair(new PairFunction<String, String, Tuple2<String,String>>() {
      @Override
      public Tuple2<String,Tuple2<String,String>> call(String s) {
        String[] transactionRecord = s.split("\t");
        Tuple2<String,String> product = new Tuple2<String,String>("P", transactionRecord[1]);
        return new Tuple2<String,Tuple2<String,String>>(transactionRecord[2], product);
      }
    });
    
    // union() merges the two RDDs
    JavaPairRDD<String,Tuple2<String,String>> allRDD = transactionsRDD.union(usersRDD);
    
    // group by userID
    // the result looks like <userID, List[T2("L", location), T2("P", p1), T2("P", p2), T2("P", p3), ...]>
    JavaPairRDD<String, Iterable<Tuple2<String,String>>> groupedRDD = allRDD.groupByKey(); 
   
    // PairFlatMapFunction<T, K, V> 
    // call(T) returns Iterator<Tuple2<K, V>> (Spark 2.x signature)
    JavaPairRDD<String,String> productLocationsRDD = 
         //                                               T                                                K       V 
         groupedRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<Tuple2<String,String>>>, String, String>() {
      @Override
      public Iterator<Tuple2<String,String>> call(Tuple2<String, Iterable<Tuple2<String,String>>> s) {
        // String userID = s._1;  // NOT Needed
        Iterable<Tuple2<String,String>> pairs = s._2;
        String location = "UNKNOWN";
        // collect this user's products; the location comes from the "L" entry
        List<String> products = new ArrayList<String>();
        for (Tuple2<String,String> t2 : pairs) {
            if (t2._1.equals("L")) {
                location = t2._2;
            }
            else {
                // t2._1.equals("P")
                products.add(t2._2);
            }
        }
        
        // now emit (K, V) pairs
        List<Tuple2<String,String>> kvList = new ArrayList<Tuple2<String,String>>();
        for (String product : products) {
            kvList.add(new Tuple2<String, String>(product, location));
        }
        return kvList.iterator();
      }
    });
    
    // the emitted (product, location) pairs now need to be grouped by product
    JavaPairRDD<String, Iterable<String>> productByLocations = productLocationsRDD.groupByKey();    
    
    // debug3
    List<Tuple2<String, Iterable<String>>> debug3 = productByLocations.collect();
    System.out.println("--- debug3 begin ---");
    for (Tuple2<String, Iterable<String>> t2 : debug3) {
      System.out.println("debug3 t2._1="+t2._1);
      System.out.println("debug3 t2._2="+t2._2);
    }
    System.out.println("--- debug3 end ---");
    
    
    // transform only the grouped values; mapValues keeps the keys unchanged
    JavaPairRDD<String, Tuple2<Set<String>, Integer>> productByUniqueLocations = 
          productByLocations.mapValues(new Function< Iterable<String>,                   // input
                                                     Tuple2<Set<String>, Integer>        // output
                                                   >() {
      @Override
      public Tuple2<Set<String>, Integer> call(Iterable<String> s) {
        Set<String> uniqueLocations = new HashSet<String>();
        for (String location : s) {
            uniqueLocations.add(location);
        }
        return new Tuple2<Set<String>, Integer>(uniqueLocations, uniqueLocations.size());
      }
    });    
    
    // print the final result
    System.out.println("=== Unique Locations and Counts ===");
    List<Tuple2<String, Tuple2<Set<String>, Integer>>>  debug4 = productByUniqueLocations.collect();
    System.out.println("--- debug4 begin ---");
    for (Tuple2<String, Tuple2<Set<String>, Integer>> t2 : debug4) {
      System.out.println("debug4 t2._1="+t2._1);
      System.out.println("debug4 t2._2="+t2._2);
    }
    System.out.println("--- debug4 end ---");
    //productByUniqueLocations.saveAsTextFile("/left/output");
    System.exit(0);
  }

++Spark's leftOuterJoin() method++

  • Joining users and transactions by hand with JavaPairRDD.union() is expensive
  • It also requires introducing the custom "L" and "P" tags

Using leftOuterJoin() instead produces the result efficiently: transactionsRDD is the left side of the join and usersRDD is the right side.
It yields pairs such as (u4, (p4, Optional.of(CA))),
(u5, (p4, Optional.of(GT))), and so on.

// Step 8: leftOuterJoin() joins the two RDDs on their common userID, producing (k, (v, Optional(w)))
JavaPairRDD<String,Tuple2<String,Optional<String>>> joined = transactionsRDD.leftOuterJoin(usersRDD);

// Step 9: create (product, location) pairs
JavaPairRDD<String,String> products = joined.mapToPair(
    new PairFunction<Tuple2<String,Tuple2<String,Optional<String>>>, String, String>() {
  @Override
  public Tuple2<String,String> call(Tuple2<String,Tuple2<String,Optional<String>>> t) {
    Tuple2<String,Optional<String>> value = t._2;
    // value._1 = product; value._2 = Optional<location>, absent when the user has no location
    String location = value._2.isPresent() ? value._2.get() : "UNKNOWN";
    return new Tuple2<String,String>(value._1, location);
  }
});

// Step 10: a grouping step like the groupByKey() above then yields <k = product, V = Set<location>>

Step 10 above can also be implemented with combineByKey().

Compared with reduceByKey(), which can only reduce values of type V into another value of type V,
combineByKey() can combine them into a different type C; for example, it can turn integer values (V) into a set of integers (Set<Integer>).


// The general signature takes three functions:
// createCombiner: turns a V into a C (e.g. creates a one-element collection)
// mergeValue: merges a V into a C (e.g. adds it to the end of the collection)
// mergeCombiners: merges two Cs into a new C
public <C> JavaPairRDD<K,C> combineByKey(Function<V,C> createCombiner, Function2<C,V,C> mergeValue, Function2<C,C,C> mergeCombiners)

In this example, the goal is to build a Set<String> for each key, i.e. to go from String to Set<String>.

// implementations of the three functions
Function<String,Set<String>> createCombiner = new Function<String,Set<String>>() {
    @Override
    public Set<String> call(String x) {
        Set<String> set = new HashSet<String>();
        set.add(x);
        return set;
    }
};

Function2<Set<String>,String,Set<String>> mergeValue = new Function2<Set<String>,String,Set<String>>() {
    @Override
    public Set<String> call(Set<String> set, String x) {
        set.add(x);
        return set;
    }
};

Function2<Set<String>,Set<String>,Set<String>> mergeCombiners = new Function2<Set<String>,Set<String>,Set<String>>() {
    @Override
    public Set<String> call(Set<String> a, Set<String> b) {
        a.addAll(b);
        return a;
    }
};

// combineByKey() then performs step 10 in one pass
JavaPairRDD<String,Set<String>> productUniqueLocations = products.combineByKey(createCombiner, mergeValue, mergeCombiners);
// collect the result to the driver as a Map
Map<String,Set<String>> productMap = productUniqueLocations.collectAsMap();
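
As a small usage example (a sketch based on the productMap collected above), each product can then be printed with its distinct locations and their count:

// sketch: print every product with its distinct locations and their count
for (Map.Entry<String, Set<String>> entry : productMap.entrySet()) {
    System.out.println(entry.getKey() + " -> " + entry.getValue() + " (" + entry.getValue().size() + ")");
}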


++A classic Scala implementation++

def main(args: Array[String]): Unit = {
    if (args.size < 3) {
      println("Usage: LeftOuterJoin <users-data-path> <transactions-data-path> <output-path>")
      sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("LeftOuterJoin")
    val sc = new SparkContext(sparkConf)

    val usersInputFile = args(0)
    val transactionsInputFile = args(1)
    val output = args(2)

    val usersRaw = sc.textFile(usersInputFile)
    val transactionsRaw = sc.textFile(transactionsInputFile)
    // tag each user record as (user_id, ("L", location_id))
    val users = usersRaw.map(line => {
      val tokens = line.split("\t")
      (tokens(0), ("L", tokens(1))) // Tagging Locations with L
    })

    // tag each transaction record as (user_id, ("P", product_id))
    val transactions = transactionsRaw.map(line => {
      val tokens = line.split("\t")
      (tokens(2), ("P", tokens(1))) // Tagging Products with P
    })

    // union merges the two RDDs
    val all = users union transactions
    // group records that share the same key (userID)
    val grouped = all.groupByKey()

    val productLocations = grouped.flatMap {
      case (userId, iterable) =>
        // span splits the iterable at the first element that fails the predicate and returns two iterables
        val (location, products) = iterable span (_._1 == "L")
        // take the location ("UNKNOWN" if the user has no location record)
        val loc = location.headOption.getOrElse(("L", "UNKNOWN"))
        // toSet removes duplicate (product, location) pairs for this user
        products.filter(_._1 == "P").map(p => (p._2, loc._2)).toSet
    }
    // group the (product, location) pairs by product
    val productByLocations = productLocations.groupByKey()
    // converting to a Set drops locations duplicated across users; its size is the distinct count
    val result = productByLocations.map(t => (t._1, t._2.toSet.size)) // Return (product, distinct location count) tuple

    result.saveAsTextFile(output) // Saves output to the file.

    // done
    sc.stop()
  }

++Scala's leftOuterJoin() method++

def main(args: Array[String]): Unit = {
    if (args.size < 3) {
      println("Usage: SparkLeftOuterJoin <users> <transactions> <output>")
      sys.exit(1)
    }
    
    val sparkConf = new SparkConf().setAppName("SparkLeftOuterJoin")
    val sc = new SparkContext(sparkConf)

    val usersInputFile = args(0)
    val transactionsInputFile = args(1)
    val output = args(2)

    val usersRaw = sc.textFile(usersInputFile)
    val transactionsRaw = sc.textFile(transactionsInputFile)

    val users = usersRaw.map(line => {
      val tokens = line.split("\t")
      (tokens(0), tokens(1))
    })

    val transactions = transactionsRaw.map(line => {
      val tokens = line.split("\t")
      (tokens(2), tokens(1))
    })

    val joined =  transactions leftOuterJoin users

    // the join returns (k, (v, Option(w))); here we only operate on the values
    val productLocations = joined.values.map(f => (f._1, f._2.getOrElse("unknown"))) 
  
    val productByLocations = productLocations.groupByKey()

    // convert each value collection to a Set to drop duplicate locations
    val productWithUniqueLocations = productByLocations.mapValues(_.toSet) // Converting toSet removes duplicates.
    
    val result = productWithUniqueLocations.map(t => (t._1, t._2.size)) // Return (product, location count) tuple.
    
    result.saveAsTextFile(output) // Saves output to the file.

    // done
    sc.stop()
  }

++An efficient Scala DataFrame implementation++

def main(args: Array[String]): Unit = {
    if (args.size < 3) {
      println("Usage: DataFrameLeftOuterJoin <users-data-path> <transactions-data-path> <output-path>")
      sys.exit(1)
    }

    val usersInputFile = args(0)
    val transactionsInputFile = args(1)
    val output = args(2)

    val sparkConf = new SparkConf()

    // Use for Spark 1.6.2 or below
    // val sc = new SparkContext(sparkConf)
    // val spark = new SQLContext(sc) 

    // Use below for Spark 2.0.0
    val spark = SparkSession
      .builder()
      .appName("DataFram LeftOuterJoin")
      .config(sparkConf)
      .getOrCreate()

    // Use below for Spark 2.0.0
    val sc = spark.sparkContext

    import spark.implicits._
    import org.apache.spark.sql.types._

    // define the users schema
    val userSchema = StructType(Seq(
      StructField("userId", StringType, false),
      StructField("location", StringType, false)))

    // define the transactions schema
    val transactionSchema = StructType(Seq(
      StructField("transactionId", StringType, false),
      StructField("productId", StringType, false),
      StructField("userId", StringType, false),
      StructField("quantity", IntegerType, false),
      StructField("price", DoubleType, false)))

    def userRows(line: String): Row = {
      val tokens = line.split("\t")
      Row(tokens(0), tokens(1))
    }

    def transactionRows(line: String): Row = {
      val tokens = line.split("\t")
      Row(tokens(0), tokens(1), tokens(2), tokens(3).toInt, tokens(4).toDouble)
    }

    val usersRaw = sc.textFile(usersInputFile) // Loading user data
    val userRDDRows = usersRaw.map(userRows(_)) // Converting to RDD[org.apache.spark.sql.Row]
    // build the users DataFrame from the Row RDD and the schema
    val users = spark.createDataFrame(userRDDRows, userSchema) // obtaining DataFrame from RDD

    val transactionsRaw = sc.textFile(transactionsInputFile) // Loading transactions data
    val transactionsRDDRows = transactionsRaw.map(transactionRows(_)) // Converting to  RDD[org.apache.spark.sql.Row]
    // build the transactions DataFrame from the Row RDD and the schema
    val transactions = spark.createDataFrame(transactionsRDDRows, transactionSchema) // obtaining DataFrame from RDD

    //
    // Approach 1 using DataFrame API
    // join the two tables on userId; a left outer join keeps transactions whose user has no match
    val joined = transactions.join(users, transactions("userId") === users("userId"), "left_outer") // performing left outer join on userId
    joined.printSchema() // Prints schema on the console
    // from the joined rows keep only productId and location
    val product_location = joined.select(joined.col("productId"), joined.col("location")) // Selecting only productId and location
    val product_location_distinct = product_location.distinct // Getting only distinct values
    // group by productId and count the distinct (productId, location) rows
    val products = product_location_distinct.groupBy("productId").count()
    products.show() // Print first 20 records on the console
    products.write.save(output + "/approach1") // Saves output in compressed Parquet format, recommended for large projects.
    products.rdd.saveAsTextFile(output + "/approach1_textFormat") // Converts the DataFrame to RDD[Row] and saves it as text. To see the output use cat, e.g. cat output/approach1_textFormat/part-00*
    // Approach 1 ends

    //
    // Approach 2 using plain old SQL query
    // 
    // Use below for Spark 1.6.2 or below
    // users.registerTempTable("users") // Register as table (temporary) so that query can be performed on the table
    // transactions.registerTempTable("transactions") // Register as table (temporary) so that query can be performed on the table
    
    // Approach 2: register the two DataFrames as temporary views
    users.createOrReplaceTempView("users") // Register as table (temporary) so that query can be performed on the table
    transactions.createOrReplaceTempView("transactions") // Register as table (temporary) so that query can be performed on the table

    import spark.sql

    // run the same query in plain SQL
    val sqlResult = sql("SELECT productId, count(distinct location) locCount FROM transactions LEFT OUTER JOIN users ON transactions.userId = users.userId group by productId")
    sqlResult.show() // Print first 20 records on the console
    sqlResult.write.save(output + "/approach2") // Saves output in compressed Parquet format, recommended for large projects.
    sqlResult.rdd.saveAsTextFile(output + "/approach2_textFormat") // Converts the DataFrame to RDD[Row] and saves it as text. To see the output use cat, e.g. cat output/approach2_textFormat/part-00*
    // Approach 2 ends

    // done
    spark.stop()
  }
