数据提取
package cn.spark.uitls;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
/**
 * Extracts review tags from a JSON record using fastjson.
 */
public class ExtractTags {

    /**
     * Collects the {@code values} of every {@code extInfoList} entry whose
     * {@code title} is {@code "contentTags"} and joins them with commas.
     *
     * <p>Parse failures raised by fastjson are not caught here and propagate
     * to the caller.
     *
     * @param info JSON text of a single record
     * @return the comma-separated tags, or an empty string when the parser
     *         yields no object, {@code extInfoList} is absent, or no tags
     *         are found
     */
    public static String extract(String info) {
        JSONObject root = JSONObject.parseObject(info);
        if (root == null) {
            return "";
        }
        JSONArray extInfoList = root.getJSONArray("extInfoList");
        if (extInfoList == null) {
            return "";
        }

        StringBuilder joined = new StringBuilder();
        // Tracked across ALL matching entries so that tags coming from a second
        // "contentTags" entry are still separated from the previous ones.
        boolean first = true;
        for (int i = 0; i < extInfoList.size(); i++) {
            JSONObject entry = extInfoList.getJSONObject(i);
            // Constant-first equals: a missing or null title simply does not match.
            if (entry == null || !"contentTags".equals(entry.getString("title"))) {
                continue;
            }
            JSONArray values = entry.getJSONArray("values");
            if (values == null) {
                continue;
            }
            for (int j = 0; j < values.size(); j++) {
                if (!first) {
                    joined.append(',');
                }
                joined.append(values.get(j));
                first = false;
            }
        }
        return joined.toString();
    }
}
排序比较器
package cn.spark.uitls;
import java.util.Comparator;
import scala.Tuple2;
/**
 * Orders (tag, count) pairs by count, highest first.
 *
 * <p>Pairs with the same count are further ordered by tag text (a null tag
 * sorts first), so the comparator only returns 0 for pairs with the same tag
 * and count. This matters when it backs a sorted set or map: a comparator
 * that returns 0 for different tags with equal counts would make the
 * collection treat them as duplicates and keep only one.
 */
public class TupleComparator implements Comparator<Tuple2<String, Integer>> {

    /**
     * @param o1 first (tag, count) pair
     * @param o2 second (tag, count) pair
     * @return a negative value if {@code o1} should come before {@code o2},
     *         a positive value if after, 0 if tag and count are both equal
     */
    @Override
    public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
        // Integer.compare avoids the overflow that "o2._2 - o1._2" can hit.
        int byCountDesc = Integer.compare(o2._2, o1._2);
        if (byCountDesc != 0) {
            return byCountDesc;
        }
        String left = o1._1;
        String right = o2._1;
        if (left == null) {
            return right == null ? 0 : -1;
        }
        if (right == null) {
            return 1;
        }
        return left.compareTo(right);
    }
}
生成标签
package cn.spark.core.tags;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import cn.spark.uitls.ExtractTags;
import cn.spark.uitls.TupleComparator;
import scala.Tuple2;
/**
 * Generates group-buying tags.
 *
 * <p>Reads tab-separated lines whose first field is a key (presumably a deal
 * or shop id -- confirm against the data) and whose second field is a JSON
 * record, extracts the review tags from the JSON, counts how often each tag
 * occurs per key, and prints the first 10 keys with their tags formatted as
 * {@code tag:count|tag:count|...}, most frequent first.
 */
public class TagsGenerator {

    /** Input used when no path is given on the command line. */
    private static final String DEFAULT_INPUT = "file:///tags.txt";

    /**
     * Runs the job on a local 4-thread Spark master.
     *
     * @param args optional; {@code args[0]} overrides the default input path
     */
    public static void main(String[] args) {
        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT;

        SparkConf conf = new SparkConf()
                .setAppName("TagsGenerator")
                .setMaster("local[4]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // try/finally so the context is closed even when a stage fails.
        try {
            JavaRDD<String> lines = sc.textFile(inputPath);

            // Extract the tags of each line and turn it into (key, "tag1,tag2,...").
            JavaPairRDD<String, String> pairTags = lines.mapToPair(
                    new PairFunction<String, String, String>() {
                        private static final long serialVersionUID = 2704806555244127991L;

                        @Override
                        public Tuple2<String, String> call(String line) throws Exception {
                            String[] fields = line.split("\\t");
                            // Lines without a key or a JSON column get an empty value
                            // and are dropped by the filter below instead of failing
                            // the task with an ArrayIndexOutOfBoundsException.
                            String key = fields.length > 0 ? fields[0] : "";
                            String tags = fields.length > 1 ? ExtractTags.extract(fields[1]) : "";
                            return new Tuple2<String, String>(key, tags);
                        }
                    });

            // Drop records without any tag.
            JavaPairRDD<String, String> validPairTags = pairTags.filter(
                    new Function<Tuple2<String, String>, Boolean>() {
                        private static final long serialVersionUID = 2341022888467424541L;

                        @Override
                        public Boolean call(Tuple2<String, String> tuple) throws Exception {
                            // Compare by content; "==" on strings compares references.
                            return tuple._2 != null && !tuple._2.isEmpty();
                        }
                    });

            // Split the comma-separated value into a list of tags.
            JavaPairRDD<String, List<String>> validPairArrayTags = validPairTags.mapToPair(
                    new PairFunction<Tuple2<String, String>, String, List<String>>() {
                        private static final long serialVersionUID = 4427355754150194235L;

                        @Override
                        public Tuple2<String, List<String>> call(Tuple2<String, String> tuple)
                                throws Exception {
                            String[] tags = tuple._2.split(",");
                            return new Tuple2<String, List<String>>(tuple._1, Arrays.asList(tags));
                        }
                    });

            // Flatten: one (key, tag) pair per tag.
            JavaPairRDD<String, String> flatPairTags = validPairArrayTags.flatMapValues(
                    new Function<List<String>, Iterable<String>>() {
                        private static final long serialVersionUID = -83392454411970840L;

                        @Override
                        public Iterable<String> call(List<String> list) throws Exception {
                            return list;
                        }
                    });

            // Attach a count of 1 to every (key, tag) pair.
            JavaPairRDD<Tuple2<String, String>, Integer> flatPairTagsOne = flatPairTags.mapToPair(
                    new PairFunction<Tuple2<String, String>, Tuple2<String, String>, Integer>() {
                        private static final long serialVersionUID = -4286078108295302852L;

                        @Override
                        public Tuple2<Tuple2<String, String>, Integer> call(
                                Tuple2<String, String> tuple) throws Exception {
                            return new Tuple2<Tuple2<String, String>, Integer>(tuple, 1);
                        }
                    });

            // Sum the counts per (key, tag).
            JavaPairRDD<Tuple2<String, String>, Integer> tagCounts = flatPairTagsOne.reduceByKey(
                    new Function2<Integer, Integer, Integer>() {
                        private static final long serialVersionUID = 5247246198728620599L;

                        @Override
                        public Integer call(Integer v1, Integer v2) throws Exception {
                            return v1 + v2;
                        }
                    });

            // Re-key by the record key: (key, (tag, count)).
            JavaPairRDD<String, Tuple2<String, Integer>> countsByKey = tagCounts.mapToPair(
                    new PairFunction<Tuple2<Tuple2<String, String>, Integer>,
                            String, Tuple2<String, Integer>>() {
                        private static final long serialVersionUID = 3487125940408716180L;

                        @Override
                        public Tuple2<String, Tuple2<String, Integer>> call(
                                Tuple2<Tuple2<String, String>, Integer> tuple) throws Exception {
                            return new Tuple2<String, Tuple2<String, Integer>>(
                                    tuple._1._1,
                                    new Tuple2<String, Integer>(tuple._1._2, tuple._2));
                        }
                    });

            // Wrap every (tag, count) in a single-element list so lists can be merged.
            JavaPairRDD<String, List<Tuple2<String, Integer>>> singletonLists =
                    countsByKey.mapToPair(
                            new PairFunction<Tuple2<String, Tuple2<String, Integer>>,
                                    String, List<Tuple2<String, Integer>>>() {
                                private static final long serialVersionUID = 4033983426111053118L;

                                @Override
                                public Tuple2<String, List<Tuple2<String, Integer>>> call(
                                        Tuple2<String, Tuple2<String, Integer>> tuple)
                                        throws Exception {
                                    List<Tuple2<String, Integer>> list = new ArrayList<>();
                                    list.add(tuple._2());
                                    return new Tuple2<String, List<Tuple2<String, Integer>>>(
                                            tuple._1, list);
                                }
                            });

            // Merge the lists of the same key into one list.
            JavaPairRDD<String, List<Tuple2<String, Integer>>> listsByKey =
                    singletonLists.reduceByKey(
                            new Function2<List<Tuple2<String, Integer>>,
                                    List<Tuple2<String, Integer>>,
                                    List<Tuple2<String, Integer>>>() {
                                private static final long serialVersionUID = -4811643037731110953L;

                                @Override
                                public List<Tuple2<String, Integer>> call(
                                        List<Tuple2<String, Integer>> v1,
                                        List<Tuple2<String, Integer>> v2) throws Exception {
                                    // Build a new list rather than mutating an input.
                                    List<Tuple2<String, Integer>> merged =
                                            new ArrayList<>(v1.size() + v2.size());
                                    merged.addAll(v1);
                                    merged.addAll(v2);
                                    return merged;
                                }
                            });

            // Sort each key's tags by count and render them as one string.
            JavaPairRDD<String, String> result = listsByKey.mapToPair(
                    new PairFunction<Tuple2<String, List<Tuple2<String, Integer>>>,
                            String, String>() {
                        private static final long serialVersionUID = -2848536030175892531L;

                        @Override
                        public Tuple2<String, String> call(
                                Tuple2<String, List<Tuple2<String, Integer>>> tuple)
                                throws Exception {
                            return new Tuple2<String, String>(tuple._1, formatTags(tuple._2));
                        }
                    });

            System.out.println(result.take(10));
        } finally {
            sc.close();
        }
    }

    /**
     * Renders (tag, count) pairs as {@code tag:count|tag:count|...}, ordered
     * by {@link TupleComparator}. Null or empty tags are skipped.
     *
     * @param tagCounts the (tag, count) pairs of one key; not modified
     * @return the formatted string, empty when nothing is left to print
     */
    private static String formatTags(List<Tuple2<String, Integer>> tagCounts) {
        // Sort a copy instead of using a TreeSet: a sorted set discards every
        // element the comparator reports as equal to one already present,
        // which would drop distinct tags that happen to share a count.
        List<Tuple2<String, Integer>> sorted = new ArrayList<>(tagCounts);
        Collections.sort(sorted, new TupleComparator());

        StringBuilder out = new StringBuilder();
        for (Tuple2<String, Integer> tagCount : sorted) {
            String tag = tagCount._1;
            if (tag == null || tag.isEmpty()) {
                continue;
            }
            // Separator goes before every entry but the first, so skipped tags
            // can never leave a dangling "|".
            if (out.length() > 0) {
                out.append('|');
            }
            out.append(tag).append(':').append(tagCount._2);
        }
        return out.toString();
    }
}