Spark累加器及广播变量

累加器

提供了将工作节点中的值聚合到驱动器程序中的简单语法。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。 假设我们在从文件中读取呼号列表对应的日志, 同时也想知道输入文件中有多少空行,下面的python程序使用累加器完成了这一点

file = sc.textFile(inputFile)
# 创建Accumulator[Int]并初始化为0
blankLines = sc.accumulator(0)
def extractCallSigns(line):
  global blankLines # 访问全局变量
  if (line == ""):
    blankLines += 1
  return line.split(" ")
callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
print "Blank lines: %d" % blankLines.value

上面的例子中创建了一个叫作 blankLines 的 Accumulator[Int] 对象,然后在输入
中看到一个空行时就对其加 1。执行完转化操作之后, 就打印出累加器中的值。注意,只有在运行 saveAsTextFile() 行动操作后才能看到正确的计数,因为行动操作前的转化操作flatMap() 是惰性的,所以作为计算副产品的累加器只有在惰性的转化操作 flatMap() 被saveAsTextFile() 行动操作强制触发时才会开始求值。

  • 首先通过在驱动器中调用 SparkContext.accumulator(initialValue) 方法,创建出存有初始值的累加器。 返回值为 org.apache.spark.Accumulator[T] 对象,其中 T 是初始值initialValue 的类型。
  • Spark 闭包里的执行器代码可以使用累加器的 += 方法(在 Java 中是 add)增加累加器的值。
  • 驱动器程序可以调用累加器的 value 属性(在 Java 中使用 value() 或 setValue())来访问累加器的值

注意工作节点上的任务不能访问累加器的值(即不能读取累加器的值)。从这些任务的角度来看,累加器是一个只写变量。
下面的示例来验证呼号器:
内容:

{"address":"address here", "band":"40m","callsign":"KK6JLK","city":"SUNNYVALE",
"contactlat":"37.384733","contactlong":"-122.032164",
"county":"Santa Clara","dxcc":"291","fullname":"MATTHEW McPherrin",
"id":57779,"mode":"FM","mylat":"37.751952821","mylong":"-122.4208688735",...}

代码如下:

# 创建用来验证呼号的累加器
validSignCount = sc.accumulator(0)
invalidSignCount = sc.accumulator(0)
def validateSign(sign):
  global validSignCount, invalidSignCount
  if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z", sign):
    validSignCount += 1
    return True
  else:
    invalidSignCount += 1
    return False
# 对与每个呼号的联系次数进行计数
validSigns = callSigns.filter(validateSign)
contactCount = validSigns.map(lambda sign: (sign, 1)).reduceByKey(lambda (x, y): x
+ y)
# 强制求值计算计数
contactCount.count()
if invalidSignCount.value < 0.1 * validSignCount.value:
    contactCount.saveAsTextFile(outputDir + "/contactCount")
else:
    print "Too many errors: %d in %d" % (invalidSignCount.value, validSignCount.
value)

累加器的容错性

  • 对于要在行动操作中使用的累加器, Spark只会把每个任务对各累加器的修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在 foreach() 这样的行动操作中。
  • 转化操作中使用的累加器, 就不能保证有这种情况了。转化操作中累加器可能会发生不止一次更新。 举个例子,当一个被缓存下来但是没有经常使用的 RDD 在第一次从 LRU 缓存中被移除并又被重新用到时,这种非预期的多次更新就会发生。这会强制RDD 根据其谱系进行重算, 而副作用就是这也会使得谱系中的转化操作里的累加器进行更新,并再次发送到驱动器中。在转化操作中,累加器通常只用于调试目的。

广播变量

  • 可以让程序高效地向所有工作节点发送一个较大的只读值, 以供一个或多个 Spark 操作使用。下面是一个广播变量的使用例子:
# 查询RDD contactCounts中的呼号的对应位置。将呼号前缀
# 读取为国家代码来进行查询
signPrefixes = sc.broadcast(loadCallSignTable())
def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)
countryContactCounts = (contactCounts
    .map(processSignCount)
    .reduceByKey((lambda x, y: x+ y)))
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")

主要的使用步骤为:

  • 通过对一个类型 T 的对象调用 SparkContext.broadcast 创建出一个 Broadcast[T] 对象。
    任何可序列化的类型都可以这么实现。
  • 通过 value 属性访问该对象的值(在 Java 中为 value() 方法)。
  • 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)广播变量其实就是类型为 spark.broadcast.Broadcast[T] 的一个对象,其中存放着类型为 T 的值。可以在任务中通过对Broadcast 对象调用 value 来获取该对象的值。这个值只会被发送到各节点一次,使用的是一种高效的类似 BitTorrent 的通信机制

广播的优化主要通过选择既快又好的序列化格式来实现。你可以使用 spark.serializer 属性选择另一个序列化库来优化序列化过程。

基于分区的操作

基于分区对数据进行操作可以让我们避免为每个数据元素进行重复的配置工作。诸如打开数据库连接或创建随机数生成器等操作, 都是我们应当尽量避免为每个元素都配置一次的工作。 Spark 提供基于分区的 map 和 foreach,让你的部分代码只对 RDD 的每个分区运行一次,这样可以帮助降低这些操作的代价。
下面的例子有一个在线的业余电台呼号数据库,可以用这个数据库查询日志中记录过的联系人呼号列表。 通过使用基于分区的操作,可以在每个分区内共享一个数据库连接池, 来避免建立太多连接,同时还可以重用 JSON 解析器。

def processCallSigns(signs):
"""使用连接池查询呼号"""
# 创建一个连接池
  http = urllib3.PoolManager()
# 与每条呼号记录相关联的URL
  urls = map(lambda x: "http://73s.com/qsos/%s.json" % x, signs)
# 创建请求(非阻塞)
  requests = map(lambda x: (x, http.request('GET', x)), urls)
# 获取结果
  result = map(lambda x: (x[0], json.loads(x[1].data)), requests)
# 删除空的结果并返回
  return filter(lambda x: x[1] is not None, result)
def fetchCallSigns(input):
"""获取呼号"""
  return input.mapPartitions(lambda callSigns : processCallSigns(callSigns))
contactsContactList = fetchCallSigns(validSigns)

主要有三种按分区执行的操作符

mapPartitions() 避免创建对象的开销。有时需要创建一个对象来将不同类型的数据聚合起来。当计算平均值时,一种方法是将数值 RDD 转为二元组 RDD,以在归约过程中追踪所处理的元素个数。现在,可以为每个分区只创建一次二元组,而不用为每个元素都执行这个操作。下面举一个例子:
首先是 不使用mapPartitions()来求平均值。

def combineCtrs(c1, c2):
  return (c1[0] + c2[0], c1[1] + c2[1])
def basicAvg(nums):
  """计算平均值"""
  nums.map(lambda num: (num, 1)).reduce(combineCtrs)

第二个是使用其来求平均值:

def partitionCtr(nums):
  """计算分区的sumCounter"""
  sumCount = [0, 0]
  for num in nums:
    sumCount[0] += num
    sumCount[1] += 1
  return [sumCount]
def fastAvg(nums):
  """计算平均值"""
  sumCount = nums.mapPartitions(partitionCtr).reduce(combineCtrs)
  return sumCount[0] / float(sumCount[1])

管道

Spark 的 pipe() 方法可以让我们使用任意一种语言实现 Spark 作业中的部分逻辑,只要它能读写 Unix 标准流就行。通过 pipe(),你可以将 RDD 中的各元素从标准输入流中以字符串形式读出,并对这些元素执行任何你需要的操作,然后把结果以字符串的形式写入标准输出——这个过程就是 RDD 的转化操作过程。这种接口和编程模型有较大的局限性, 但是有时候这恰恰是你想要的,比如在 map 或filter 操作中使用某些语言原生的函数。

数值RDD操作

Spark对包含数据的RDD提供了描述性的统计操作。Spark 的数值操作是通过流式算法实现的, 允许以每次一个元素的方式构建出模型。这些统计数据都会在调用 stats() 时通过一次遍历数据计算出来,并以 StatsCounter 对象返回。下面对这些方法进行了介绍。



下面使用汇总统计来从数据中移除一些异常值。由于我们会两次使用同一个 RDD(一次用来计算汇总统计数据,另一次用来移除异常值),因此应该把这个 RDD 缓存下来。这里移除掉了较远的联系点。

# 要把String类型RDD转为数字数据,这样才能
# 使用统计函数并移除异常值
distanceNumerics = distances.map(lambda string: float(string))
stats = distanceNumerics.stats()
stddev = std.stdev()
mean = stats.mean()
reasonableDistances = distanceNumerics.filter(
  lambda x: math.fabs(x - mean) < 3 * stddev)
print reasonableDistances.collect()
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容