The problem this chapter addresses
Market Basket Analysis is a popular data-mining computation, commonly used to reveal the affinity between different products or groups of products.
This chapter presents three implementations (a small worked example follows the list):
- A classic MapReduce implementation
- A Spark implementation, which additionally derives the association (dependency) rules between items
- A Scala (Spark) implementation
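For illustration (hypothetical sample data), suppose the transactions file contains the comma-separated lines:
a,b,c
a,b,d
b,c
The MapReduce version counts how often every sorted item combination of a given size occurs, e.g. [a, b] -> 2 and [b, c] -> 2. The Spark and Scala versions go one step further and turn those counts into association rules with a confidence value, e.g. ([a],[b],1.0): every transaction containing a also contains b, since confidence = count(a,b) / count(a) = 2 / 2.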
Classic MapReduce implementation
//The definitions in MBADriver are mostly standard; the highlight is the Combiner
job.setMapperClass(MBAMapper.class);
//The Combiner groups values for the same key on the map side before they are sent to the reducers, cutting down the amount of data transferred over the network; because the reduce logic is a plain sum (associative and commutative), MBAReducer can safely double as the Combiner
job.setCombinerClass(MBAReducer.class);
job.setReducerClass(MBAReducer.class);
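Only the three setter calls above are listed in the original. A minimal sketch of what the rest of MBADriver might look like, assuming Text/IntWritable output types and a hypothetical "number.of.pairs" configuration key used to pass numberOfPairs to the mapper (the original driver may differ):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MBADriver {
    public static void main(String[] args) throws Exception {
        // args: <input-path> <output-path> <number-of-pairs>
        Configuration conf = new Configuration();
        // hypothetical key; the mapper would read it in setup() to initialize numberOfPairs
        conf.setInt("number.of.pairs", Integer.parseInt(args[2]));
        Job job = Job.getInstance(conf, "market-basket-analysis");
        job.setJarByClass(MBADriver.class);
        job.setMapperClass(MBAMapper.class);
        // the Combiner pre-aggregates identical keys on the map side
        job.setCombinerClass(MBAReducer.class);
        job.setReducerClass(MBAReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}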
//map function
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// input line
String line = value.toString();
//convertItemsToList turns each input line into a List<String>
List<String> items = convertItemsToList(line);
if ((items == null) || (items.isEmpty())) {
// no mapper output will be generated
return;
}
//emit the sorted combinations of the requested size
generateMapperOutput(numberOfPairs, items, context);
}
private static List<String> convertItemsToList(String line) {
if ((line == null) || (line.length() == 0)) {
// no mapper output will be generated
return null;
}
String[] tokens = StringUtils.split(line, ",");
if ( (tokens == null) || (tokens.length == 0) ) {
return null;
}
List<String> items = new ArrayList<String>();
for (String token : tokens) {
if (token != null) {
items.add(token.trim());
}
}
return items;
}
private void generateMapperOutput(int numberOfPairs, List<String> items, Context context)
throws IOException, InterruptedException {
//items is the source data; numberOfPairs is the size of each combination to emit
List<List<String>> sortedCombinations = Combination.findSortedCombinations(items, numberOfPairs);
for (List<String> itemList: sortedCombinations) {
System.out.println("itemlist="+itemList.toString());
reducerKey.set(itemList.toString());
context.write(reducerKey, NUMBER_ONE);
}
}
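As an illustration (hypothetical input), with numberOfPairs = 2 the line a,b,c yields the sorted pairs [a, b], [a, c] and [b, c], so the mapper emits ([a, b], 1), ([a, c], 1) and ([b, c], 1); the reducer key is simply the List.toString() form of each combination.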
//reduce is a plain word count
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0; // total items paired
for (IntWritable value : values) {
sum += value.get();
}
context.write(key, new IntWritable(sum));
}
//findSortedCombinations returns every sorted combination of exactly n elements drawn from the given collection
public static <T extends Comparable<? super T>> List<List<T>> findSortedCombinations(Collection<T> elements, int n) {
List<List<T>> result = new ArrayList<List<T>>();
if (n == 0) {
result.add(new ArrayList<T>());
return result;
}
List<List<T>> combinations = findSortedCombinations(elements, n - 1);
for (List<T> combination: combinations) {
for (T element: elements) {
if (combination.contains(element)) {
continue;
}
List<T> list = new ArrayList<T>();
list.addAll(combination);
if (list.contains(element)) {
continue;
}
list.add(element);
//sort items not to duplicate the items
// example: (a, b, c) and (a, c, b) might become
// different items to be counted if not sorted
Collections.sort(list);
if (result.contains(list)) {
continue;
}
result.add(list);
}
}
return result;
}
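A quick, illustrative way to exercise findSortedCombinations (this demo class is not part of the original code and assumes Combination is on the classpath):
import java.util.Arrays;
import java.util.List;

public class CombinationDemo {
    public static void main(String[] args) {
        List<String> items = Arrays.asList("b", "a", "c");
        // prints [[a, b], [b, c], [a, c]]: the three size-2 combinations, each internally sorted
        System.out.println(Combination.findSortedCombinations(items, 2));
    }
}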
Spark implementation: derives the association (dependency) rules between items
public static void main(String[] args) throws Exception {
// STEP-1: handle input parameters
if (args.length < 1) {
System.err.println("Usage: FindAssociationRules <transactions>");
System.exit(1);
}
String transactionsFileName = args[0];
// STEP-2: create a Spark context object
JavaSparkContext ctx = new JavaSparkContext();
// STEP-3: read all transactions from HDFS and create the first RDD
JavaRDD<String> transactions = ctx.textFile(transactionsFileName, 1);
transactions.saveAsTextFile("/rules/output/1");
// STEP-4: generate frequent patterns
// PairFlatMapFunction<T, K, V>
// T => Iterable<Tuple2<K, V>>
/**
 * Output format: ([a],1), ([b],1), ([a,b],1), ([a,b,c],1), ([b,d],1), and so on
 */
JavaPairRDD<List<String>,Integer> patterns =
transactions.flatMapToPair(new PairFlatMapFunction<
String, // T
List<String>, // K
Integer // V
>() {
@Override
public Iterator<Tuple2<List<String>,Integer>> call(String transaction) {
List<String> list = Util.toList(transaction);
//when findSortedCombinations is called without the size argument N, it returns combinations of every size
List<List<String>> combinations = Combination.findSortedCombinations(list);
List<Tuple2<List<String>,Integer>> result = new ArrayList<Tuple2<List<String>,Integer>>();
for (List<String> combList : combinations) {
if (combList.size() > 0) {
//emit every combination with a count of 1
result.add(new Tuple2<List<String>,Integer>(combList, 1));
}
}
return result.iterator();
}
});
patterns.saveAsTextFile("/rules/output/2");
// aggregate the counts of identical keys
JavaPairRDD<List<String>, Integer> combined = patterns.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
combined.saveAsTextFile("/rules/output/3");
// now, we have: patterns(K,V)
// K = pattern as List<String>
// V = frequency of pattern
// now given (K,V) as (List<a,b,c>, 2) we will
// generate the following (K2,V2) pairs:
//
// (List<a,b,c>, T2(null, 2))
// (List<a,b>, T2(List<a,b,c>, 2))
// (List<a,c>, T2(List<a,b,c>, 2))
// (List<b,c>, T2(List<a,b,c>, 2))
// STEP-6: generate all sub-patterns
// PairFlatMapFunction<T, K, V>
// T => Iterable<Tuple2<K, V>>
/**
 * Output looks like: ([a,b],(null,2)) ([b],([a,b],2)) ([a],([a,b],2))
 * ([a,b,d],(null,1)) ([b,d],([a,b,d],1)) ([a,d],([a,b,d],1)) ([a,b],([a,b,d],1))
 */
JavaPairRDD<List<String>,Tuple2<List<String>,Integer>> subpatterns =
combined.flatMapToPair(new PairFlatMapFunction<
Tuple2<List<String>, Integer>, // T
List<String>, // K
Tuple2<List<String>,Integer> // V
>() {
@Override
public Iterator<Tuple2<List<String>,Tuple2<List<String>,Integer>>>
call(Tuple2<List<String>, Integer> pattern) {
List<Tuple2<List<String>,Tuple2<List<String>,Integer>>> result =
new ArrayList<Tuple2<List<String>,Tuple2<List<String>,Integer>>>();
List<String> list = pattern._1;
Integer frequency = pattern._2;
result.add(new Tuple2(list, new Tuple2(null,frequency)));
if (list.size() == 1) {
return result.iterator();
}
// the pattern has more than one item; emit all sub-patterns with one item removed
for (int i=0; i < list.size(); i++) {
//removeOneItem returns a copy of the list with the element at index i removed
List<String> sublist = Util.removeOneItem(list, i);
result.add(new Tuple2(sublist, new Tuple2(list, frequency)));
}
return result.iterator();
}
});
subpatterns.saveAsTextFile("/rules/output/4");
// group the sub-patterns by key
/**
 * Output format: ([a,c],[([a,b,c],1),(null,1)]) --- key: [a,c]
 * ([a,b,c],[(null,1)]) --- key: [a,b,c]
 * ([b,c],[(null,3),([a,b,c],1)]) --- key: [b,c]
 */
JavaPairRDD<List<String>,Iterable<Tuple2<List<String>,Integer>>> rules = subpatterns.groupByKey();
rules.saveAsTextFile("/rules/output/5");
// STEP-7: generate association rules
// Now, use (K=List<String>, V=Iterable<Tuple2<List<String>,Integer>>)
// to generate association rules
// JavaRDD<R> map(Function<T,R> f)
// Return a new RDD by applying a function to all elements of this RDD.
/**
 * Output format: [([a,b],[d],0.5),([a,b],[c],0.5)]
 * []
 * [([c],[b],1.0),([c],[a],0.33333)]
 */
JavaRDD<List<Tuple3<List<String>,List<String>,Double>>> assocRules = rules.map(new Function<
Tuple2<List<String>,Iterable<Tuple2<List<String>,Integer>>>, // T: input
List<Tuple3<List<String>,List<String>,Double>> // R: ( ac => b, 1/3): T3(List(a,c), List(b), 0.33)
// ( ad => c, 1/3): T3(List(a,d), List(c), 0.33)
>() {
@Override
public List<Tuple3<List<String>,List<String>,Double>> call(Tuple2<List<String>,Iterable<Tuple2<List<String>,Integer>>> in) {
List<Tuple3<List<String>,List<String>,Double>> result =
new ArrayList<Tuple3<List<String>,List<String>,Double>>();
//fromList is the [a,c] part of ([a,c],[([a,b,c],1),(null,1)])
List<String> fromList = in._1;
//to is the [([a,b,c],1),(null,1)] part of ([a,c],[([a,b,c],1),(null,1)])
Iterable<Tuple2<List<String>,Integer>> to = in._2;
List<Tuple2<List<String>,Integer>> toList = new ArrayList<Tuple2<List<String>,Integer>>();
Tuple2<List<String>,Integer> fromCount = null;
for (Tuple2<List<String>,Integer> t2 : to) {
// find the "count" object
if (t2._1 == null) {
//fromCount records the total count for the key itself (the tuple whose list is null)
fromCount = t2;
}
else {
toList.add(t2);
}
}
// Now, we have the required objects for generating association rules:
// "fromList", "fromCount", and "toList"
if (toList.isEmpty()) {
// no output generated, but since Spark does not like null objects, we will fake a null object
return result; // an empty list
}
// now using 3 objects: "from", "fromCount", and "toList",
// create association rules:
for (Tuple2<List<String>,Integer> t2 : toList) {
double confidence = (double) t2._2 / (double) fromCount._2;
List<String> t2List = new ArrayList<String>(t2._1);
//remove every element of fromList from t2List
t2List.removeAll(fromList);
//build the output tuple: (antecedent, consequent, confidence)
result.add(new Tuple3(fromList, t2List, confidence));
}
return result;
}
});
assocRules.saveAsTextFile("/rules/output/6");
// done
ctx.close();
System.exit(0);
}
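The Spark code above calls two small helpers, Util.toList() and Util.removeOneItem(), whose source is not listed here. A minimal sketch consistent with how they are used above (the original implementation may differ):
import java.util.ArrayList;
import java.util.List;

public class Util {
    // split a comma-separated transaction line into a list of trimmed items
    public static List<String> toList(String transaction) {
        String[] tokens = transaction.split(",");
        List<String> list = new ArrayList<String>();
        for (String token : tokens) {
            list.add(token.trim());
        }
        return list;
    }

    // return a copy of the list with the element at position i removed
    public static List<String> removeOneItem(List<String> list, int i) {
        if ((list == null) || list.isEmpty()) {
            return list;
        }
        if ((i < 0) || (i >= list.size())) {
            return list;
        }
        List<String> cloned = new ArrayList<String>(list);
        cloned.remove(i);
        return cloned;
    }
}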
Scala implementation
//Scala version of the same association-rule computation
def main(args: Array[String]): Unit = {
if (args.size < 2) {
println("Usage: FindAssociationRules <input-path> <output-path>")
sys.exit(1)
}
val sparkConf = new SparkConf().setAppName("market-basket-analysis")
val sc = new SparkContext(sparkConf)
val input = args(0)
val output = args(1)
val transactions = sc.textFile(input)
val patterns = transactions.flatMap(line => {
val items = line.split(",").toList
//generate every non-empty combination of the items; each combination is then paired with a count of 1
(0 to items.size) flatMap items.combinations filter (xs => !xs.isEmpty)
}).map((_, 1))
//aggregate the counts of identical keys
val combined = patterns.reduceByKey(_ + _)
//generate all sub-patterns
val subpatterns = combined.flatMap(pattern => {
val result = ListBuffer.empty[Tuple2[List[String], Tuple2[List[String], Int]]]
//the first entry is (Nil, count), carrying the total count of the pattern itself
result += ((pattern._1, (Nil, pattern._2)))
val sublist = for {
i <- 0 until pattern._1.size
//build a sublist with the i-th element removed (one item shorter than pattern._1)
xs = pattern._1.take(i) ++ pattern._1.drop(i + 1)
if xs.size > 0
} yield (xs, (pattern._1, pattern._2))
result ++= sublist
result.toList
})
//group the sub-patterns by key
val rules = subpatterns.groupByKey()
val assocRules = rules.map(in => {
//find the total count (the entry whose list is Nil)
val fromCount = in._2.find(p => p._1 == Nil).get
val toList = in._2.filter(p => p._1 != Nil).toList
if (toList.isEmpty) Nil
else {
val result =
for {
t2 <- toList
//compute the confidence
confidence = t2._2.toDouble / fromCount._2.toDouble
//remove from t2._1 the elements that also appear in in._1
difference = t2._1 diff in._1
} yield (in._1, difference, confidence)
result
}
})
// Formatting the result just for easy reading.
val formatResult = assocRules.flatMap(f => {
f.map(s => (s._1.mkString("[", ",", "]"), s._2.mkString("[", ",", "]"), s._3))
})
formatResult.saveAsTextFile(output)
// done!
sc.stop()
}
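With the final formatting step, each line written by saveAsTextFile is a plain tuple such as ([a,b],[c],0.5) (hypothetical values), read as: of the transactions that contain a and b, 50% also contain c.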