时空数据处理与组织-基于SparkML的决策树分类

数据集:下载Adult数据集(http://archive.ics.uci.edu/ml/datasets/Adult)。数据从美国1994年人口普查数据库抽取而来,可用来预测居民收入是否超过50K/year。该数据集类变量为年收入是否超过50k,属性变量包含年龄、工种、学历、职业等重要信息。
内容:

  1. 从文件中导入数据,并转化为DataFrame。
  2. 训练决策树模型,用于预测居民收入是否超过50K;
  3. 对Test数据集进行验证,输出模型的准确率。
    代码:
    !!!注意,import findspark和findspark.init()要写在from pyspark import...的前面!
    头文件
import findspark
findspark.init()
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
import numpy
from pyspark.ml.classification import DecisionTreeClassificationModel
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline,PipelineModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vector,Vectors
from pyspark.sql import Row
from pyspark.ml.feature import IndexToString,StringIndexer,VectorIndexer

conf=SparkConf().setAppName("myApp1").setMaster("local[*]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

读入文件:

    dfData=sc.textFile("file:///C:/Users/98275/Desktop/1.txt")
    trainDf=dataProduce(dfData)
    dfTest=sc.textFile("file:///C:/Users/98275/Desktop/2.txt")
    testDf=dataProduce(dfTest)

其中dataProduce函数

def getFeature(df,i):
    F=df.map(lambda line: (line[i],1)).reduceByKey(lambda a,b:a+b)
    F=F.map(lambda line:line[0]).zipWithIndex()
    return F

def s(x,f):
    return f[x]

def t(x):
    if int(x)>0:
        return 1
    else:
        return 0

def fs(x,f):
     rel = {}
     #print("000")
     rel['features']=Vectors. \
     dense(int(x[0]),s(x[1],f[1]),s(x[3],f[3]),int(int(x[4])/3),s(x[5],f[5]),s(x[6],f[6]),s(x[7],f[7]),s(x[8],f[8]),s(x[9],f[9]),t(x[10]),t(x[11]),int(int(x[12])/5),s(x[13],f[13]))
     rel['label'] = s(x[14],f[14])
     return rel

def dataProduce(dfData):
    df = dfData.map(lambda line: line.split(','))
    print(df.take(5))
    f= {}
    f[1] = getFeature(df,1).collectAsMap()
    f[3] = getFeature(df, 3).collectAsMap()
    f[5] = getFeature(df, 5).collectAsMap()
    f[6] = getFeature(df, 6).collectAsMap()
    f[7] = getFeature(df, 7).collectAsMap()
    f[8] = getFeature(df, 8).collectAsMap()
    f[9]= getFeature(df, 9).collectAsMap()
    f[13] = getFeature(df, 13).collectAsMap()
    f[14] = getFeature(df, 14).collectAsMap()


    #tst=fff(df.first(),f)
    #print(tst)
    df=df.map(lambda p: Row(**fs(p,f))).toDF()
    df.show()
    return df

因为所给的特征多数是文字的,所以需要将这些特征统计出来,转化为数字类型的特征向量(用了简单粗暴的办法,直接挨个编号,比如工作有私立和公立单位,编为1和2)。显然这样一来在合并某些特征时会出错,所以建议先手动合并特征再编号。
训练模型

dfAll=trainDf.union(testDf)
    dfAll.show()
    dtPipeline=getPipe(dfAll)
    dtPipelineModel = dtPipeline.fit(trainDf)

其中getPipe函数

def getPipe(df):
    labelIndexer = StringIndexer(). \
        setInputCol("label"). \
        setOutputCol("indexedLabel"). \
        fit(df)
    featureIndexer = VectorIndexer(). \
        setInputCol("features"). \
        setOutputCol("indexedFeatures"). \
        setMaxCategories(20). \
        fit(df)
    labelConverter = IndexToString(). \
        setInputCol("prediction"). \
        setOutputCol("predictedLabel"). \
        setLabels(labelIndexer.labels)
    dtClassifier = DecisionTreeClassifier(). \
        setLabelCol("indexedLabel"). \
        setFeaturesCol("indexedFeatures")
    dtPipeline = Pipeline(). \
        setStages([labelIndexer, featureIndexer, dtClassifier, labelConverter])
    return dtPipeline

预测:

 #根据训练出的模型预测label
    dtPredictions = dtPipelineModel.transform(testDf)
    dtPredictions.select("predictedLabel", "label", "features").show()
    #统计准确率
    evaluator = MulticlassClassificationEvaluator(). \
        setLabelCol("indexedLabel"). \
        setPredictionCol("prediction")
    dtAccuracy = evaluator.evaluate(dtPredictions)
    print(dtAccuracy)
    sc.stop()

最后准确率只有44%,和特征合并有关。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容