Effective PySpark(PySpark 常见问题)

构建PySpark环境

首先确保安装了python 2.7 ,强烈建议你使用Virtualenv方便python环境的管理。之后通过pip 安装pyspark

pip install pyspark

文件比较大,大约180多M,有点耐心。

下载 spark 2.2.0,然后解压到特定目录,设置SPARK_HOME即可。

其实如果通过spark-submit 提交程序,并不会需要额外安装pyspark, 这里通过pip安装的主要目的是为了让你的IDE能有代码提示。

PySpark worker启动机制

PySpark的工作原理是通过Spark里的PythonRDD启动一个(或者多个,以pythonExec, 和envVars为key)Python deamon进程,然后一旦有task过来了,就通过python deamon进程fork一个新的python worker。 python worker是可以复用的,并不会用完就立马销毁。一个task过来的流程为, 看看worker里有清闲的么,如果有,就直接返回。没有就fork一个新的worker.

PySpark 如何实现某个worker 里的变量单例

从前面PySpark worker启动机制里,我们可以看到,一个Python worker是可以反复执行任务的。在NLP任务中,我们经常要加载非常多的字典,我们希望字典只会加载一次。这个时候就需要做些额外处理了。做法如下:

class DictLoader(object):
    clf = None

   def __init__(self, baseDir, archive_auto_extract, zipResources):
        if not DictLoader.is_loaded():            
            DictLoader.load_dic(baseDir)

    @staticmethod
    def load_dic(baseDir):
        globPath = baseDir + "/dic/*.dic"
        dicts = glob.glob(globPath)
        for dictFile in dicts:
            temp = dictFile if os.path.exists(dictFile) else SparkFiles.get(dictFile)
            jieba.load_userdict(temp)
        jieba.cut("nice to meet you")
        DictLoader.clf = "SUCCESS"

    @staticmethod
    def is_loaded():
        return DictLoader.clf is not None

定义一个cls对象,并且使用staicmethod annotation,这样就可以模拟类似Java的静态方法了。之后你可以随心所欲的loader = DictLoader ()

如何加载资源文件

在NLP处理了,字典是少不了,前面我们避免了一个worker多次加载字典,现在还有一个问题,就是程序如何加载字典。通常我们希望能够把字典打成一个zip包,代码也打成一个zip包,然后通过下面的命令进行提交:

./bin/spark-submit \
--py-files dist/jobs.zip \
--files dist/dics.zip \
--master "local[*]"  python/src/batch.py

自己开发的模块可以打包成jobs.zip,对应的spark任务单独成一个batch.py文件,然后字典打包成dics.zip.

那么程序中如何读取dics.zip里的文件呢? 在Spark standalone 和 local模式下,dics.zip在各个worker的工作目录里并不会被解压,所以需要额外处理下:

   def __init__(self, baseDir, archive_auto_extract, zipResources):
        if not DictLoader.is_loaded():  
            for zr in zipResources:
                if not archive_auto_extract:
                    with zipfile.ZipFile(SparkFiles.getRootDirectory() + '/' + zr, 'r') as f:
                        f.extractall(".")          
            DictLoader(baseDir)

archive_auto_extract 判定是不是会自动解压(yarn模式下回自动解压),判断的方法为:

archive_auto_extract = spark.conf.get("spark.master").lower().startswith("yarn")

zipResources 则是所有需要解压的zip包的名字,对应获取的方法为:

zipfiles = [f.split("/")[-1] for f in spark.conf.get("spark.files").split(",") if f.endswith(".zip")]

对应的zipfiles所在的目录你可以这样拼接:

SparkFiles.getRootDirectory() + '/' + zfilename

所以如果你不是运行在yarn模式的情况下,你需要先解压,然后进行加载。获取路径的方式建议如下:

temp = dictFile if os.path.exists(dictFile) else SparkFiles.get(dictFile)

这样可以兼容IDE里运行,local/standalone/yarn 模式运行。

前面的jobs.zip文件里面全部是python文件,并不需要压缩就可以直接读到。

主动定义schema,避免spark auto inference schema

我之前写过这么一段代码:

oldr = df.rdd.map(
    lambda row: Row(ids=row['ids'], mainId=row["mainId"].item(), tags=row["tags"]))

然后我需要把oldr 变回为rdd,这个时候我这么用:

resultDf = spark.createDataFrame(oldr)
resultDf.mode("overwrite").format(...).save(...

这会导致oldr被执行两次,一次是为了做schema推测,一次是为了做实际的计算。
我们可以这么写:

from pyspark.sql.types import StructType, IntegerType, ArrayType, StructField, StringType, MapType

fields = [StructField("ids", ArrayType(IntegerType())), StructField("mainId", IntegerType()),
          StructField("tags", MapType(StringType(), IntegerType()))]
resultDf = spark.createDataFrame(resultRdd, StructType(fields=fields)

这样显示的为rdd定义schema,就可以避免额外的推测了。

lambda 和 函数的选择

lambda可以定义匿名函数,但是表现力有限:

.map(
    lambda row: Row(ids=row['ids'], mainId=row["mainId"].item(), tags=row["tags"]))

我们也可以定义函数:

def create_new_row(row):
    return Row(ids=row['ids'], mainId=row["mainId"].item(), tags=row["tags"])

然后直接使用:

.map(create_new_row).....

如何定义udf函数/如何避免使用Python UDF函数

先定义一个常规的python函数:

# 自定义split函数
def split_sentence(s):
    return s.split(" ")

转化为udf函数并且使用。

from pyspark.sql.functions import udf
from pyspark.sql.types import *

ss = udf(split_sentence, ArrayType(StringType()))
documentDF.select(ss("text").alias("text_array")).show()

唯一麻烦的是,定义好udf函数时,你需要指定返回值的类型。

使用Python 的udf函数,显然效率是会受到损伤的,我们建议使用标准库的函数,具体这么用:

from pyspark.sql import functions as f
documentDF.select(f.split("text", "\\s+").alias("text_array")).show()

pyspark.sql. functions 引用的都是spark的实现,所以效率会更高。

另外,在使用UDF函数的时候,发现列是NoneType 或者null,那么有两种可能:

在PySpark里,有时候会发现udf函数返回的值总为null,可能的原因有:

  1. 忘了写return
def abc(c):
    "yes"
  1. 返回的类型不匹配。

比如你明明是一个FloatType,但是你定义的时候说是一个ArrayType,这个时候似乎不会报错,而是udf函数执行会是null.
这个问题之前在处理二进制字段时遇到了。我们理所当然的认为二进制应该是类型 ArrayType(Byte(),True) ,但实际上是BinaryType.

dataframe.show 问题

详细问题可参看: https://stackoverflow.com/questions/39662384/pyspark-unicodeencodeerror-ascii-codec-cant-encode-character

主要是python方面的问题。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352