频繁项集挖掘的基本概念请直接看我参考的文章,我这里只给出频繁项集单机版和分布式版本的实现。
通过单机版本熟悉算法的基本思想,分布式版本则是用于工业界实践
参考文章:https://www.jianshu.com/p/fba9e41334a8
单机版本(apriori算法)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Aug 22 10:00:39 2019
定义:
支持度(support):support(A=>B) = P(A∪B),表示A和B同时出现的概率。
置信度(confidence):confidence(A=>B)=support(A∪B) / support(A),表示A和B同时出现的概率占A出现概率的比值。
频繁项集:在项集中频繁出现并满足最小支持度阈值的集合,例如{牛奶,面包}、{手机,手机壳}等。
强关联规则:满足最小支持度和最小至此年度的关联规则
算法步骤:
从记录中计算所有的候选1项集,并计算频繁1项集及支持度。
由频繁1项集生成k项候选集,并由k项候选集计算k项频繁集。
用k项频繁集生成所有关联规则,计算生成规则置信度,筛选符合最小置信度的关联规则。
@author: xxxx
"""
data_set = [[1, 3, 4], [2, 3, 5], [1, 2, 3, 5], [2, 5]]
#item_set = c1 = {frozenset({2}),frozenset({3}),frozenset({1}),frozenset({5}),frozenset({4})}
#generate_freq_supports(data_set, c1, min_support)
def generate_freq_supports(data_set, item_set, min_support):
""" Input:原始的data_set, n-1频繁项集,以及最小支持度
Return: n频繁项集以及对应的支持度
"""
freq_set = set() # 保存频繁项集元素
item_count = {} # 保存元素频次,用于计算支持度
supports = {} # 保存支持度
# 如果项集中元素在数据集中则计数
for record in data_set:
for item in item_set:
if item.issubset(record):
if item not in item_count:
item_count[item] = 1
else:
item_count[item] += 1
data_len = float(len(data_set))
# 计算项集支持度
for item in item_count:
if (item_count[item] / data_len) >= min_support:
freq_set.add(item)
supports[item] = item_count[item] / data_len
return freq_set, supports
def generate_new_combinations(freq_set, k):
new_combinations = set() # 保存新组合
sets_len = len(freq_set) # 集合含有元素个数,用于遍历求得组合
freq_set_list = list(freq_set) # 集合转为列表用于索引
for i in range(sets_len):
for j in range(i + 1, sets_len):
l1 = list(freq_set_list[i])
l2 = list(freq_set_list[j])
l1.sort()
l2.sort()
# 项集若有相同的父集则合并项集
if l1[0:k-2] == l2[0:k-2]:
freq_item = freq_set_list[i] | freq_set_list[j]
new_combinations.add(freq_item)
return new_combinations
data_set = [[1, 3, 4], [2, 3, 5], [1, 2, 3, 5], [2, 5]]
L, support_data = apriori(data, min_support=0.5)
def apriori(data_set, min_support, max_len=None):
max_items = 2 # 初始项集元素个数,开始应该由 1—>2 的频繁项集
freq_sets = [] # 保存所有频繁项集
supports = {} # 保存所有支持度
# 候选项1项集
c1 = set()
for items in data_set:
for item in items:
item_set = frozenset([item])
c1.add(item_set)
# 频繁项1项集及其支持度
l1, support1 = generate_freq_supports(data_set, c1, min_support)
freq_sets.append(l1)
supports.update(support1)
if max_len is None:
max_len = float('inf')
while max_items and max_items <= max_len:
ci = generate_new_combinations(freq_sets[-1], max_items) # 生成候选集
li, support = generate_freq_supports(data_set, ci, min_support) # 生成频繁项集和支持度
# 如果有频繁项集则进入下个循环
if li:
freq_sets.append(li)
supports.update(support)
max_items += 1
else:
max_items = 0
return freq_sets, supports
def association_rules(freq_sets, supports, min_conf):
rules = []
max_len = len(freq_sets)
# 生成关联规则,筛选符合规则的频繁集计算置信度,满足最小置信度的关联规则添加到列表
# k代表K项集
for k in range(max_len - 1):
for freq_set in freq_sets[k]:
for sub_set in freq_sets[k + 1]:
if freq_set.issubset(sub_set):
conf = supports[sub_set] / supports[freq_set]
rule = (freq_set, sub_set - freq_set, conf)
if conf >= min_conf:
rules.append(rule)
return rules
if __name__ == '__main__':
data_set = [[1, 3, 4], [2, 3, 5], [1, 2, 3, 5], [2, 5]]
L, support_data = apriori(data_set, min_support=0.5)
association_rules = association_rules(L, support_data, min_conf=0.7)
分布式版本(基于spark)
import org.apache.spark.ml.fpm.FPGrowth
import org.apache.spark.ml.fpm.FPGrowthModel
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
// 模型参数配置
val fpgrowth = new FPGrowth().setItemsCol("photo_set").
setMinSupport(0.1).
setMinConfidence(0.9).
setNumPartitions(2000)
val fpmodel = fpgrowth.fit(photo_set)
fpmodel.write.overwrite().save(outPath + "/fpModel")
val fpModel = FPGrowthModel.load(outPath + "/fpModel")
// 获取频繁项集
val freItemSeqDF_ = fpModel.freqItemsets
值得注意的是,当数据量非常大的时候会出现堆溢出的问题,这个时候需要在启动spark的时候分配更大的堆内存
executor_jvm_argu='-XX:+UseG1GC -Xss81280k -Djava.net.preferIPv4Stack=true -XX:ParallelGCThreads=3 -verbose:gc -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintHeapAtGC'
--conf spark.executor.extraJavaOptions="${executor_jvm_argu}"