Spark 学习笔记

dataframe

create

创建dataframe

   val training = ss.createDataFrame(Seq(

     (1.0, Vectors.dense(0.0, 1.1, 0.1)),

     (0.0, Vectors.dense(2.0, 1.0, -1.0)),

     (0.0, Vectors.dense(2.0, 1.3, 1.0)),

     (1.0, Vectors.dense(0.0, 1.2, -0.5))

   )).toDF("label", "features")

read

读csv,json格式存储的文件

sc = spark.sparkContext

df = spark.read.json("examples/src/main/resources/people.json")

df = spark.read.option("delimiter", "\t").option("header", "true").csv(path) [读取以\t分隔的文件到dateframe]

df.write.option("header",true)

  .csv("/tmp/spark_output/datacsv")

people = spark.read.parquet("...")

column get

列相关操作

features get: ageCol = people.age

添加现有列:

df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))

df_with_x7 = df_with_x6.withColumn("x7", Rand())

func.col("id")

row: row = Row(name="Alice", age=11)

'wrong_key' in Person

define function [自定义函数]

create function:

from pyspark.sql.functions import udf

@udf

def to_upper(s):

   if s is not None:

       return s.upper()  

df_new2 = df_new.withColumn("topnum", get_front3_value("Country"))

传入固定值:

使用lit

from pyspark.sql.functions import lit

show the content of Data Frame [展示Data Frame的内容]

df.show()

自定义函数并取列

df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age"))

去重

dataFrame.dropDuplicates("id","label")

dataFrame.distinct()

vector

向量操作

dense vector: a = Vectors.dense([1.0, 2.0])

sparse vector: SparseVector(2, [0, 1], [2., 1.])

vector: toArray()

mLlib

import

from pyspark.ml.clustering import Kmeans

initial parameter 初始化模型超参数

kmeans = KMeans(k=100, maxIter=200)

train model 模型训练

model0 = kmeans.fit(input0.limit(200000))

kmeans.clusterCenters()

data split

训练数据和测试数据切分

(trainingData, testData) = data.randomSplit([0.7, 0.3])

Feature scaled

特征级别化(可分桶,标准化)

max min scaler:

featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(data) [max min scale]

scalerModel = scaler.fit(dealfeature)

bucket scaler:

from pyspark.ml.feature import Bucketizer

splits = [float("-inf"), 0,100, 200, 300, 400, 500, 600, 700,

           800, 900, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, float("inf")]

dealinput = dealinput.withColumn("ctr", dealinput["ctr"].cast("float")).withColumn("ctr", dealinput["ctrv3"].cast("float"))

bucketizer = Bucketizer(splits=splits, inputCol="ctr", outputCol="bucketedctr")

dealout = bucketizer.transform(dealinput)

Data Type view and Data Type transformer

数据类型查看: df.dtypes

数据类型转换: dealoutput = dealinput.select(dealinput["Clicks"].cast("float")

train test split

(deal_train, deal_test) = dealfeature.randomSplit([0.9,0.1])

data filter

数据过滤[条件筛选]

outanaly = predres.filter(predres.prediction ==1).select("InClickBinary", "prediction")

metrics

指标评估

from pyspark.mllib.evaluation import MulticlassMetrics

predictionAndLabels = predres.rdd.map(lambda x:(float(x.prediction), float(x.InClickBinary)))

metrics = MulticlassMetrics(predictionAndLabels)

print("Area under accauracy = %s" % metrics.precision(1.0))

print("Area under recall = %s" % metrics.recall(1.0))

column assemble to vector

每列特征集成到一个dense vector

#get the vector feature

assembler = VectorAssembler(

     inputCols = ["Clicks", "BinaryFeature","isMarket", "ctrv2", "ctrv3", "Feature1", "Feature2", "Feature3", "Feature4", "Feature5", "Feature6", "Feature7", "Feature8"],

     outputCol = "features")

dealfeature = assembler.transform(dealoutput)

(deal_train, deal_test) = dealfeature.randomSplit([0.9,0.1])

column to sparse vector

N列特征到sparse vector

featureids = [i for i in range(len(featurekeys))]

featuredict = dict(zip(featurekeys, featureids))

speckeys = ["ctrv2", "ctrv3", "isMarket", "BinaryFeature"]

from pyspark.sql.functions import struct

def get_sparse_vector(x):

   valuedict = {}

   for index, item in enumerate(speckeys):

       valuedict[featuredict[item]] = x[index]

   for index in range(8):

       nindex = 4 + index

       valuedict[featuredict["prediction%s_%s"%(index, x[nindex])]] = 1.0

   return Vectors.sparse(len(featuredict), valuedict)

sparsecreate = udf(get_sparse_vector, VectorUDT())

dealfeature = dealout.withColumn("features", sparsecreate(struct("A", "B", "C", "D", "E", "F", "G", "H","I", "J", "K", "L", "M")))

model train

模型训练

以LogisticRegression为例:

lr = LogisticRegression(featuresCol="features", labelCol="BinaryFeature", maxIter = 200)

lrmodel = lr.fit(deal_train)

predreslr = lrmodel.transform(deal_test)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352

推荐阅读更多精彩内容