PySpark API Walkthrough (Part 1)

PySpark is the Python API for Spark.

Public classes:

SparkContext:

Main entry point for Spark functionality.

RDD:

A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.

Broadcast:

A broadcast variable that gets reused across tasks.

Accumulator:

An "add-only" shared variable that tasks can only add values to.

SparkConf:

For configuring Spark.

SparkFiles:

Access files shipped with jobs.

StorageLevel:

Finer-grained cache persistence levels.

TaskContext:

Information about the currently running task, available on the workers. Currently experimental.

class pyspark.SparkConf(loadDefaults=True, _jvm=None, _jconf=None)[source]

Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.

Most of the time, you would create a SparkConf object with SparkConf(), which will also load values from Java system properties. Any parameters you set directly on the SparkConf object therefore take priority over the system properties.

For unit tests, you can also call SparkConf(false) to skip loading external settings and get the same configuration no matter what the system properties are.

All setter methods in SparkConf support chaining. For example, you can write:

conf.setMaster("local").setAppName("My app")

Note:

Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified by the user.
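
As a quick illustration of chaining and of the lookup methods documented below, a minimal sketch (the master URL, app name and memory value are placeholders):

>>> from pyspark import SparkConf
>>> conf = SparkConf().setMaster("local[2]").setAppName("conf-demo")
>>> conf = conf.set("spark.executor.memory", "1g")   # each setter returns the same SparkConf
>>> conf.get("spark.app.name")
'conf-demo'
>>> conf.contains("spark.executor.memory")
True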

contains(key)[source]

Does this configuration contain a given key?

get(key, defaultValue=None)[source]

Get the configured value for some key, or return a default otherwise.

getAll()[source]

Get all values as a list of key-value pairs.

set(key, value)[source]

Set a configuration property.

setAll(pairs)[source]

Set multiple parameters, passed as a list of key-value pairs.

Parameters:pairs – list of key-value pairs to set

setAppName(value)[source]

Set application name.

setExecutorEnv(key=None, value=None, pairs=None)[source]

Set an environment variable to be passed to executors.

setIfMissing(key, value)[source]

Set a configuration property, if not already set.

setMaster(value)[source]

Set the master URL to connect to.

setSparkHome(value)[source]

Set the path where Spark is installed on worker nodes.

toDebugString()[source]

Return a printable version of the configuration, as a list of key=value pairs, one per line.


class pyspark.SparkContext(master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)[source]

Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs and broadcast variables on that cluster.

PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')

accumulator(value, accum_param=None)[source]

Create an Accumulator with the given initial value, using a given AccumulatorParam helper object to define how values of that type are added together. Default AccumulatorParams are provided for integer and floating-point types if you do not supply one; for other types, a custom AccumulatorParam can be used.
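
A minimal sketch of the default numeric behaviour, assuming an existing SparkContext sc as in the other examples:

>>> acc = sc.accumulator(0)                                     # default AccumulatorParam handles ints/floats
>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: acc.add(x))  # tasks can only add to it
>>> acc.value                                                   # read back on the driver
10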

addFile(path, recursive=False)[source]

Add a file to be downloaded with this Spark job on every node. The path passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI.

To access the file in Spark jobs, use SparkFiles.get(fileName) to find its download location.

A directory can be given if the recursive option is set to True; currently, directories are only supported for Hadoop-supported filesystems.

>>> from pyspark import SparkFiles
>>> path = os.path.join(tempdir, "test.txt")
>>> with open(path, "w") as testFile:
...     _ = testFile.write("100")
>>> sc.addFile(path)
>>> def func(iterator):
...     with open(SparkFiles.get("test.txt")) as testFile:
...         fileVal = int(testFile.readline())
...         return [x * fileVal for x in iterator]
>>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
[100, 200, 300, 400]

addPyFile(path)[source]

Add a .py or .zip dependency for all tasks to be executed on this SparkContext in the future. The path passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI.
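
A hedged sketch: mylib.py is a hypothetical module containing a function myfunc; after addPyFile it becomes importable on the driver and on the executors:

>>> sc.addPyFile("/path/to/mylib.py")                        # hypothetical local path
>>> import mylib                                             # now on the driver's sys.path as well
>>> sc.parallelize([1, 2, 3]).map(mylib.myfunc).collect()    # executors can import it too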

applicationId

A unique identifier for the Spark application. Its format depends on the scheduler implementation.

In the case of a local Spark app: something like 'local-1433865536131'.

In the case of YARN: something like 'application_1433865536131_34483'.

binaryFiles(path, minPartitions=None)[source]

Note


Experimental

Read a directory of binary files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI as a byte array. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.

Note


Small files are preferred; large files are also allowed, but they may cause poor performance.
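
A sketch of reading such a directory; the HDFS path is hypothetical, and each element is a (file path, file content as bytes) pair:

>>> pairs = sc.binaryFiles("hdfs://namenode/data/images")   # hypothetical directory of small files
>>> sizes = pairs.mapValues(len)                            # (path, size in bytes)
>>> sizes.take(2)                                           # e.g. [('hdfs://.../a.png', 4096), ...]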

binaryRecords(path, recordLength)[source]

Note


Experimental

Load data from a flat binary file, assuming each record is a set of numbers with the specified numerical format (see ByteBuffer), and the number of bytes per record is constant.

Parameters:path – Directory to the input data files

recordLength – The length at which to split the records
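
For example, a flat file of packed little-endian 32-bit integers could be read with recordLength=4 and decoded with the struct module; the path is hypothetical:

>>> import struct
>>> recs = sc.binaryRecords("hdfs://namenode/data/ints.bin", 4)   # hypothetical file, 4 bytes per record
>>> ints = recs.map(lambda r: struct.unpack("<i", r)[0])          # one int per record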

broadcast(value)[source]

Broadcast a read-only variable to the cluster, returning a Broadcast object for reading it in distributed functions. The variable will be sent to each cluster node only once.
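
A minimal sketch, assuming an existing SparkContext sc:

>>> lookup = sc.broadcast({"a": 1, "b": 2})   # shipped to each node once
>>> sc.parallelize(["a", "b", "a"]).map(lambda k: lookup.value[k]).collect()
[1, 2, 1]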

cancelAllJobs()[source]

Cancel all jobs that have been scheduled or are running.

cancelJobGroup(groupId)[source]

Cancel active jobs for the specified group. See SparkContext.setJobGroup for more information.

defaultMinPartitions

Default min number of partitions for Hadoop RDDs when not given by user

defaultParallelism

Default level of parallelism to use when not given by user (e.g. for reduce tasks)

dump_profiles(path)[source]

Dump the profile stats into directory path

emptyRDD()[source]

Create an RDD that has no partitions or elements.

getConf()[source]

getLocalProperty(key)[source]

Get a local property set in this thread, or null if it is missing. See setLocalProperty

classmethod getOrCreate(conf=None)[source]

Get or instantiate a SparkContext and register it as a singleton object.

Parameters:conf – SparkConf (optional)
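
A short sketch; the app name is a placeholder. If a SparkContext is already running, the passed conf is ignored and the existing singleton is returned:

>>> from pyspark import SparkConf, SparkContext
>>> sc = SparkContext.getOrCreate(SparkConf().setAppName("reuse-demo"))
>>> sc is SparkContext.getOrCreate()   # later calls return the registered singleton
True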

hadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)[source]

Read an ‘old’ Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is the same as for sc.sequenceFile.

A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java.

Parameters:path – path to Hadoop file

inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapred.TextInputFormat”)

keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)

valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)

keyConverter – (None by default)

valueConverter – (None by default)

conf – Hadoop configuration, passed in as a dict (None by default)

batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
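
A hedged sketch of reading a plain text file through the old-API TextInputFormat, whose keys are LongWritable byte offsets and whose values are Text lines; the path is hypothetical:

>>> rdd = sc.hadoopFile("hdfs://namenode/logs/input.txt",          # hypothetical path
...                     "org.apache.hadoop.mapred.TextInputFormat",
...                     "org.apache.hadoop.io.LongWritable",
...                     "org.apache.hadoop.io.Text")
>>> rdd.take(1)                                                    # e.g. [(0, 'first line')]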

hadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)[source]

Read an ‘old’ Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile.

Parameters:inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapred.TextInputFormat”)

keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)

valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)

keyConverter – (None by default)

valueConverter – (None by default)

conf – Hadoop configuration, passed in as a dict (None by default)

batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)

newAPIHadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)[source]

Read a ‘new API’ Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is the same as for sc.sequenceFile.

A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java

Parameters:path – path to Hadoop file

inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapreduce.lib.input.TextInputFormat”)

keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)

valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)

keyConverter – (None by default)

valueConverter – (None by default)

conf – Hadoop configuration, passed in as a dict (None by default)

batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
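
The new-API variant looks almost identical; the sketch below also passes a Hadoop configuration dict (the path and split-size value are only illustrative):

>>> rdd = sc.newAPIHadoopFile("hdfs://namenode/logs/input.txt",    # hypothetical path
...                           "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
...                           "org.apache.hadoop.io.LongWritable",
...                           "org.apache.hadoop.io.Text",
...                           conf={"mapreduce.input.fileinputformat.split.maxsize": "67108864"})
>>> rdd.values().take(1)                                           # the Text values, i.e. the lines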

newAPIHadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)[source]

Read a ‘new API’ Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile.

Parameters:inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapreduce.lib.input.TextInputFormat”)

keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)

valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)

keyConverter – (None by default)

valueConverter – (None by default)

conf – Hadoop configuration, passed in as a dict (None by default)

batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)

parallelize(c, numSlices=None)[source]

Distribute a local Python collection to form an RDD. Using xrange is recommended if the input represents a range, for performance.

>>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
[[0], [2], [3], [4], [6]]
>>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
[[], [0], [], [2], [4]]

pickleFile(name, minPartitions=None)[source]

Load an RDD previously saved using RDD.saveAsPickleFile method.

>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
>>> sorted(sc.pickleFile(tmpFile.name, 3).collect())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

range(start, end=None, step=1, numSlices=None)[source]

Create a new RDD of int containing elements from start to end (exclusive), increased by step every element. Can be called the same way as python’s built-in range() function. If called with a single argument, the argument is interpreted as end, and start is set to 0.

Parameters:start – the start value

end – the end value (exclusive)

step – the incremental step (default: 1)

numSlices – the number of partitions of the new RDD

Returns:An RDD of int

>>> sc.range(5).collect()
[0, 1, 2, 3, 4]
>>> sc.range(2, 4).collect()
[2, 3]
>>> sc.range(1, 7, 2).collect()
[1, 3, 5]

runJob(rdd, partitionFunc, partitions=None, allowLocal=False)[source]

Executes the given partitionFunc on the specified set of partitions, returning the result as an array of elements.

If ‘partitions’ is not specified, this will run over all partitions.

>>> myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part])
[0, 1, 4, 9, 16, 25]

>>> myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
[0, 1, 16, 25]

sequenceFile(path, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, minSplits=None, batchSize=0)[source]

Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is as follows:

A Java RDD is created from the SequenceFile or other InputFormat, and the key and value Writable classes

Serialization is attempted via Pyrolite pickling

If this fails, the fallback is to call ‘toString’ on each key and value

PickleSerializer is used to deserialize pickled objects on the Python side

Parameters:path – path to sequence file

keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)

valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)

keyConverter –

valueConverter –

minSplits – minimum splits in dataset (default min(2, sc.defaultParallelism))

batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
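
A round-trip sketch, reusing the tempdir convention of the other examples: write a small pair RDD with RDD.saveAsSequenceFile and read it back without specifying the Writable classes:

>>> seqPath = os.path.join(tempdir, "demo-seq")
>>> sc.parallelize([("a", 1), ("b", 2)]).saveAsSequenceFile(seqPath)
>>> sorted(sc.sequenceFile(seqPath).collect())   # [('a', 1), ('b', 2)]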

setCheckpointDir(dirName)[source]

Set the directory under which RDDs are going to be checkpointed. The directory must be an HDFS path if running on a cluster.
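
A local sketch, again using the tempdir from the other examples; on a cluster the directory would be an HDFS path instead:

>>> sc.setCheckpointDir(os.path.join(tempdir, "checkpoints"))
>>> rdd = sc.parallelize(range(5)).map(lambda x: x * x)
>>> rdd.checkpoint()            # mark the RDD for checkpointing
>>> rdd.count()                 # the first action writes the checkpoint
5
>>> rdd.isCheckpointed()
True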

setJobDescription(value)[source]

Set a human readable description of the current job.

setJobGroup(groupId, description, interruptOnCancel=False)[source]

Assigns a group ID to all the jobs started by this thread until the group ID is set to a different value or cleared.

Often, a unit of execution in an application consists of multiple Spark actions or jobs. Application programmers can use this method to group all those jobs together and give a group description. Once set, the Spark web UI will associate such jobs with this group.

The application can use SparkContext.cancelJobGroup to cancel all running jobs in this group.

>>> import threading
>>> from time import sleep
>>> result = "Not Set"
>>> lock = threading.Lock()
>>> def map_func(x):
...     sleep(100)
...     raise Exception("Task should have been cancelled")
>>> def start_job(x):
...     global result
...     try:
...         sc.setJobGroup("job_to_cancel", "some description")
...         result = sc.parallelize(range(x)).map(map_func).collect()
...     except Exception as e:
...         result = "Cancelled"
...     lock.release()
>>> def stop_job():
...     sleep(5)
...     sc.cancelJobGroup("job_to_cancel")
>>> supress = lock.acquire()
>>> supress = threading.Thread(target=start_job, args=(10,)).start()
>>> supress = threading.Thread(target=stop_job).start()
>>> supress = lock.acquire()
>>> print(result)
Cancelled

If interruptOnCancel is set to true for the job group, then job cancellation will result in Thread.interrupt() being called on the job’s executor threads. This is useful to help ensure that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.

setLocalProperty(key, value)[source]

Set a local property that affects jobs submitted from this thread, such as the Spark fair scheduler pool.
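
The typical use is routing jobs from one thread into a fair scheduler pool; the pool name below is a placeholder and only has an effect when spark.scheduler.mode is FAIR:

>>> sc.setLocalProperty("spark.scheduler.pool", "reporting")   # hypothetical pool name
>>> sc.getLocalProperty("spark.scheduler.pool")
'reporting'
>>> _ = sc.parallelize(range(100)).sum()                       # jobs from this thread run in that pool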

setLogLevel(logLevel)[source]

Control our logLevel. This overrides any user-defined log settings. Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
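
For example, to quiet INFO chatter during an interactive session:

>>> sc.setLogLevel("WARN")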

classmethod setSystemProperty(key, value)[source]

Set a Java system property, such as spark.executor.memory. This must be invoked before instantiating SparkContext.
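
A sketch of the intended call order; the memory value and app name are only illustrative, and in a session where sc already exists this would have had to run before that context was created:

>>> from pyspark import SparkContext
>>> SparkContext.setSystemProperty("spark.executor.memory", "2g")   # before any SparkContext exists
>>> sc = SparkContext("local", "sysprop-demo")                      # picks the property up at startup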

show_profiles()[source]

Print the profile stats to stdout

sparkUser()[source]

Get SPARK_USER for user who is running SparkContext.

startTime

Return the epoch time when the Spark Context was started.

statusTracker()[source]

Return StatusTracker object

stop()[source]

Shut down the SparkContext.

textFile(name, minPartitions=None, use_unicode=True)[source]

Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.

If use_unicode is False, the strings will be kept as str (encoding as utf-8), which is faster and smaller than unicode. (Added in Spark 1.2)

>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
...     _ = testFile.write("Hello world!")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
['Hello world!']

uiWebUrl

Return the URL of the SparkUI instance started by this SparkContext

union(rdds)[source]

Build the union of a list of RDDs.

This supports unions() of RDDs with different serialized formats, although this forces them to be reserialized using the default serializer:

>>> path = os.path.join(tempdir, "union-text.txt")
>>> with open(path, "w") as testFile:
...     _ = testFile.write("Hello")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
['Hello']
>>> parallelized = sc.parallelize(["World!"])
>>> sorted(sc.union([textFile, parallelized]).collect())
['Hello', 'World!']

version

The version of Spark on which this application is running.

wholeTextFiles(path, minPartitions=None, use_unicode=True)[source]

Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.

If use_unicode is False, the strings will be kept as str (encoding as utf-8), which is faster and smaller than unicode. (Added in Spark 1.2)

For example, if you have the following files:

hdfs://a-hdfs-path/part-00000
hdfs://a-hdfs-path/part-00001
...
hdfs://a-hdfs-path/part-nnnnn

Do rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path"), then rdd contains:

(a-hdfs-path/part-00000, its content)
(a-hdfs-path/part-00001, its content)
...
(a-hdfs-path/part-nnnnn, its content)

Note


Small files are preferred, as each file will be loaded fully in memory.

>>> dirPath = os.path.join(tempdir, "files")
>>> os.mkdir(dirPath)
>>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
...     _ = file1.write("1")
>>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
...     _ = file2.write("2")
>>> textFiles = sc.wholeTextFiles(dirPath)
>>> sorted(textFiles.collect())
[('.../1.txt', '1'), ('.../2.txt', '2')]

class pyspark.SparkFiles[source]

Resolves paths to files added through SparkContext.addFile().

SparkFiles contains only classmethods; users should not create SparkFiles instances.

classmethod get(filename)[source]

Get the absolute path of a file added through SparkContext.addFile().

classmethod getRootDirectory()[source]

Get the root directory that contains files added through SparkContext.addFile().

SparkFiles addresses the problem of shipping files to Spark: a file added this way is made available on every node, and Spark presumably keeps it in a temporary directory of its own on each node.
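
Tying the two classmethods to the addFile example shown earlier: the resolved path of an added file always lives under the per-application root directory.

>>> from pyspark import SparkFiles
>>> path = os.path.join(tempdir, "lookup.txt")
>>> with open(path, "w") as f:
...     _ = f.write("42")
>>> sc.addFile(path)
>>> SparkFiles.get("lookup.txt").startswith(SparkFiles.getRootDirectory())
True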
