累加器
提供了将工作节点中的值聚合到驱动器程序中的简单语法。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。 假设我们在从文件中读取呼号列表对应的日志, 同时也想知道输入文件中有多少空行,下面的python程序使用累加器完成了这一点
file = sc.textFile(inputFile)
# 创建Accumulator[Int]并初始化为0
blankLines = sc.accumulator(0)
def extractCallSigns(line):
global blankLines # 访问全局变量
if (line == ""):
blankLines += 1
return line.split(" ")
callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
print "Blank lines: %d" % blankLines.value
上面的例子中创建了一个叫作 blankLines 的 Accumulator[Int] 对象,然后在输入
中看到一个空行时就对其加 1。执行完转化操作之后, 就打印出累加器中的值。注意,只有在运行 saveAsTextFile() 行动操作后才能看到正确的计数,因为行动操作前的转化操作flatMap() 是惰性的,所以作为计算副产品的累加器只有在惰性的转化操作 flatMap() 被saveAsTextFile() 行动操作强制触发时才会开始求值。
- 首先通过在驱动器中调用 SparkContext.accumulator(initialValue) 方法,创建出存有初始值的累加器。 返回值为 org.apache.spark.Accumulator[T] 对象,其中 T 是初始值initialValue 的类型。
- Spark 闭包里的执行器代码可以使用累加器的 += 方法(在 Java 中是 add)增加累加器的值。
- 驱动器程序可以调用累加器的 value 属性(在 Java 中使用 value() 或 setValue())来访问累加器的值
注意工作节点上的任务不能访问累加器的值(即不能读取累加器的值)。从这些任务的角度来看,累加器是一个只写变量。
下面的示例来验证呼号器:
内容:
{"address":"address here", "band":"40m","callsign":"KK6JLK","city":"SUNNYVALE",
"contactlat":"37.384733","contactlong":"-122.032164",
"county":"Santa Clara","dxcc":"291","fullname":"MATTHEW McPherrin",
"id":57779,"mode":"FM","mylat":"37.751952821","mylong":"-122.4208688735",...}
代码如下:
# 创建用来验证呼号的累加器
validSignCount = sc.accumulator(0)
invalidSignCount = sc.accumulator(0)
def validateSign(sign):
global validSignCount, invalidSignCount
if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z", sign):
validSignCount += 1
return True
else:
invalidSignCount += 1
return False
# 对与每个呼号的联系次数进行计数
validSigns = callSigns.filter(validateSign)
contactCount = validSigns.map(lambda sign: (sign, 1)).reduceByKey(lambda (x, y): x
+ y)
# 强制求值计算计数
contactCount.count()
if invalidSignCount.value < 0.1 * validSignCount.value:
contactCount.saveAsTextFile(outputDir + "/contactCount")
else:
print "Too many errors: %d in %d" % (invalidSignCount.value, validSignCount.
value)
累加器的容错性:
- 对于要在行动操作中使用的累加器, Spark只会把每个任务对各累加器的修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在 foreach() 这样的行动操作中。
- 转化操作中使用的累加器, 就不能保证有这种情况了。转化操作中累加器可能会发生不止一次更新。 举个例子,当一个被缓存下来但是没有经常使用的 RDD 在第一次从 LRU 缓存中被移除并又被重新用到时,这种非预期的多次更新就会发生。这会强制RDD 根据其谱系进行重算, 而副作用就是这也会使得谱系中的转化操作里的累加器进行更新,并再次发送到驱动器中。在转化操作中,累加器通常只用于调试目的。
广播变量
- 可以让程序高效地向所有工作节点发送一个较大的只读值, 以供一个或多个 Spark 操作使用。下面是一个广播变量的使用例子:
# 查询RDD contactCounts中的呼号的对应位置。将呼号前缀
# 读取为国家代码来进行查询
signPrefixes = sc.broadcast(loadCallSignTable())
def processSignCount(sign_count, signPrefixes):
country = lookupCountry(sign_count[0], signPrefixes.value)
count = sign_count[1]
return (country, count)
countryContactCounts = (contactCounts
.map(processSignCount)
.reduceByKey((lambda x, y: x+ y)))
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
主要的使用步骤为:
- 通过对一个类型 T 的对象调用 SparkContext.broadcast 创建出一个 Broadcast[T] 对象。
任何可序列化的类型都可以这么实现。 - 通过 value 属性访问该对象的值(在 Java 中为 value() 方法)。
- 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)广播变量其实就是类型为 spark.broadcast.Broadcast[T] 的一个对象,其中存放着类型为 T 的值。可以在任务中通过对Broadcast 对象调用 value 来获取该对象的值。这个值只会被发送到各节点一次,使用的是一种高效的类似 BitTorrent 的通信机制
广播的优化主要通过选择既快又好的序列化格式来实现。你可以使用 spark.serializer 属性选择另一个序列化库来优化序列化过程。
基于分区的操作
基于分区对数据进行操作可以让我们避免为每个数据元素进行重复的配置工作。诸如打开数据库连接或创建随机数生成器等操作, 都是我们应当尽量避免为每个元素都配置一次的工作。 Spark 提供基于分区的 map 和 foreach,让你的部分代码只对 RDD 的每个分区运行一次,这样可以帮助降低这些操作的代价。
下面的例子有一个在线的业余电台呼号数据库,可以用这个数据库查询日志中记录过的联系人呼号列表。 通过使用基于分区的操作,可以在每个分区内共享一个数据库连接池, 来避免建立太多连接,同时还可以重用 JSON 解析器。
def processCallSigns(signs):
"""使用连接池查询呼号"""
# 创建一个连接池
http = urllib3.PoolManager()
# 与每条呼号记录相关联的URL
urls = map(lambda x: "http://73s.com/qsos/%s.json" % x, signs)
# 创建请求(非阻塞)
requests = map(lambda x: (x, http.request('GET', x)), urls)
# 获取结果
result = map(lambda x: (x[0], json.loads(x[1].data)), requests)
# 删除空的结果并返回
return filter(lambda x: x[1] is not None, result)
def fetchCallSigns(input):
"""获取呼号"""
return input.mapPartitions(lambda callSigns : processCallSigns(callSigns))
contactsContactList = fetchCallSigns(validSigns)
主要有三种按分区执行的操作符
mapPartitions() 避免创建对象的开销。有时需要创建一个对象来将不同类型的数据聚合起来。当计算平均值时,一种方法是将数值 RDD 转为二元组 RDD,以在归约过程中追踪所处理的元素个数。现在,可以为每个分区只创建一次二元组,而不用为每个元素都执行这个操作。下面举一个例子:
首先是 不使用mapPartitions()来求平均值。
def combineCtrs(c1, c2):
return (c1[0] + c2[0], c1[1] + c2[1])
def basicAvg(nums):
"""计算平均值"""
nums.map(lambda num: (num, 1)).reduce(combineCtrs)
第二个是使用其来求平均值:
def partitionCtr(nums):
"""计算分区的sumCounter"""
sumCount = [0, 0]
for num in nums:
sumCount[0] += num
sumCount[1] += 1
return [sumCount]
def fastAvg(nums):
"""计算平均值"""
sumCount = nums.mapPartitions(partitionCtr).reduce(combineCtrs)
return sumCount[0] / float(sumCount[1])
管道
Spark 的 pipe() 方法可以让我们使用任意一种语言实现 Spark 作业中的部分逻辑,只要它能读写 Unix 标准流就行。通过 pipe(),你可以将 RDD 中的各元素从标准输入流中以字符串形式读出,并对这些元素执行任何你需要的操作,然后把结果以字符串的形式写入标准输出——这个过程就是 RDD 的转化操作过程。这种接口和编程模型有较大的局限性, 但是有时候这恰恰是你想要的,比如在 map 或filter 操作中使用某些语言原生的函数。
数值RDD操作
Spark对包含数据的RDD提供了描述性的统计操作。Spark 的数值操作是通过流式算法实现的, 允许以每次一个元素的方式构建出模型。这些统计数据都会在调用 stats() 时通过一次遍历数据计算出来,并以 StatsCounter 对象返回。下面对这些方法进行了介绍。
下面使用汇总统计来从数据中移除一些异常值。由于我们会两次使用同一个 RDD(一次用来计算汇总统计数据,另一次用来移除异常值),因此应该把这个 RDD 缓存下来。这里移除掉了较远的联系点。
# 要把String类型RDD转为数字数据,这样才能
# 使用统计函数并移除异常值
distanceNumerics = distances.map(lambda string: float(string))
stats = distanceNumerics.stats()
stddev = std.stdev()
mean = stats.mean()
reasonableDistances = distanceNumerics.filter(
lambda x: math.fabs(x - mean) < 3 * stddev)
print reasonableDistances.collect()