数据集:下载Adult数据集(http://archive.ics.uci.edu/ml/datasets/Adult)。数据从美国1994年人口普查数据库抽取而来,可用来预测居民收入是否超过50K,属性变量包含年龄、工种、学历、职业等重要信息。
内容:
- 从文件中导入数据,并转化为DataFrame。
- 训练决策树模型,用于预测居民收入是否超过50K;
- 对Test数据集进行验证,输出模型的准确率。
代码:
!!!注意,import findspark和findspark.init()要写在from pyspark import...的前面!
头文件
import findspark
findspark.init()
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
import numpy
from pyspark.ml.classification import DecisionTreeClassificationModel
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline,PipelineModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vector,Vectors
from pyspark.sql import Row
from pyspark.ml.feature import IndexToString,StringIndexer,VectorIndexer
conf=SparkConf().setAppName("myApp1").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
读入文件:
dfData=sc.textFile("file:///C:/Users/98275/Desktop/1.txt")
trainDf=dataProduce(dfData)
dfTest=sc.textFile("file:///C:/Users/98275/Desktop/2.txt")
testDf=dataProduce(dfTest)
其中dataProduce函数
def getFeature(df,i):
F=df.map(lambda line: (line[i],1)).reduceByKey(lambda a,b:a+b)
F=F.map(lambda line:line[0]).zipWithIndex()
return F
def s(x,f):
return f[x]
def t(x):
if int(x)>0:
return 1
else:
return 0
def fs(x,f):
rel = {}
#print("000")
rel['features']=Vectors. \
dense(int(x[0]),s(x[1],f[1]),s(x[3],f[3]),int(int(x[4])/3),s(x[5],f[5]),s(x[6],f[6]),s(x[7],f[7]),s(x[8],f[8]),s(x[9],f[9]),t(x[10]),t(x[11]),int(int(x[12])/5),s(x[13],f[13]))
rel['label'] = s(x[14],f[14])
return rel
def dataProduce(dfData):
df = dfData.map(lambda line: line.split(','))
print(df.take(5))
f= {}
f[1] = getFeature(df,1).collectAsMap()
f[3] = getFeature(df, 3).collectAsMap()
f[5] = getFeature(df, 5).collectAsMap()
f[6] = getFeature(df, 6).collectAsMap()
f[7] = getFeature(df, 7).collectAsMap()
f[8] = getFeature(df, 8).collectAsMap()
f[9]= getFeature(df, 9).collectAsMap()
f[13] = getFeature(df, 13).collectAsMap()
f[14] = getFeature(df, 14).collectAsMap()
#tst=fff(df.first(),f)
#print(tst)
df=df.map(lambda p: Row(**fs(p,f))).toDF()
df.show()
return df
因为所给的特征多数是文字的,所以需要将这些特征统计出来,转化为数字类型的特征向量(用了简单粗暴的办法,直接挨个编号,比如工作有私立和公立单位,编为1和2)。显然这样一来在合并某些特征时会出错,所以建议先手动合并特征再编号。
训练模型
dfAll=trainDf.union(testDf)
dfAll.show()
dtPipeline=getPipe(dfAll)
dtPipelineModel = dtPipeline.fit(trainDf)
其中getPipe函数
def getPipe(df):
labelIndexer = StringIndexer(). \
setInputCol("label"). \
setOutputCol("indexedLabel"). \
fit(df)
featureIndexer = VectorIndexer(). \
setInputCol("features"). \
setOutputCol("indexedFeatures"). \
setMaxCategories(20). \
fit(df)
labelConverter = IndexToString(). \
setInputCol("prediction"). \
setOutputCol("predictedLabel"). \
setLabels(labelIndexer.labels)
dtClassifier = DecisionTreeClassifier(). \
setLabelCol("indexedLabel"). \
setFeaturesCol("indexedFeatures")
dtPipeline = Pipeline(). \
setStages([labelIndexer, featureIndexer, dtClassifier, labelConverter])
return dtPipeline
预测:
#根据训练出的模型预测label
dtPredictions = dtPipelineModel.transform(testDf)
dtPredictions.select("predictedLabel", "label", "features").show()
#统计准确率
evaluator = MulticlassClassificationEvaluator(). \
setLabelCol("indexedLabel"). \
setPredictionCol("prediction")
dtAccuracy = evaluator.evaluate(dtPredictions)
print(dtAccuracy)
sc.stop()
最后准确率只有44%,和特征合并有关。