spark源码分析StatCounter以及用法

StatCounter
这是用于统计的一个类,在org.apache.spark.util包中
如果是RDD[Double]可以通过隐式转化DoubleRDDFunctions来获得一些额外的功能,就比如能产生这个对象的.stats

  def stats(): StatCounter = self.withScope {
    self.mapPartitions(nums => Iterator(StatCounter(nums))).reduce((a, b) => a.merge(b))
  }

其他的就不拓展讲了,主要介绍一下StatCounter这个类

功能
这个类的描述是

 * A class for tracking the statistics of a set of numbers (count, mean and variance) in a
 * numerically robust way. Includes support for merging two StatCounters. Based on Welford
 * and Chan's [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance algorithms]]
 * for running variance.

通过描述我们可以知道这个类主要用于对一组数字进行追踪,主要用来统计级数、平均数和方差,其中包括对合并两个StatCounters的支持

构造器:

class StatCounter(values: TraversableOnce[Double]) extends Serializable {

}

其中参数类型为TraversableOnce,这是Traversable和Iterator的公共父类,这个类的描述是

 *  This trait exists primarily to eliminate code duplication between
 *  `Iterator` and `Traversable

所以这里我们可以传迭代器进来

属性

  private var n: Long = 0     // Running count of our values  count计数
  private var mu: Double = 0  // Running mean of our values  平均值
  private var m2: Double = 0  // Running variance numerator (sum of (x - mean)^2) 离差平方之和,除以计数则是方差
  private var maxValue: Double = Double.NegativeInfinity // Running max of our values 最大值
  private var minValue: Double = Double.PositiveInfinity // Running min of our values 最小值

  def count: Long = n //返回n 计数

  def mean: Double = mu //返回mu 平均值

  def sum: Double = n * mu //sum,求和,平均值与个数乘积

  def max: Double = maxValue //最大值

  def min: Double = minValue //最小值

初始化


 /** Initialize the StatCounter with no values.
    没有value情况下的构造器
  */
  def this() = this(Nil)

  /** Add a value into this StatCounter, updating the internal statistics.
    更新状态主要靠merge函数,这个函数接收了一个Double参数,
   */
  def merge(value: Double): StatCounter = {
    val delta = value - mu //离差或者差量: 该数与平均值的差值
    n += 1 //总数加1
    mu += delta / n //  原mu我们叫做mu1,新的叫mu2,那么mu1 = sum / count ,mu2 = (sum + value) / count + 1 ,所以两者之间的差为: (count * value - count * mu1) / (count * (count + 1)),进一步化简: (value - mu1) / (count + 1),而value - mu1 = delta,所以可以得到上述公式
    m2 += delta * (value - mu) // 此时注意的是mu在上一步中完成更新,其推导过程类似上一步,这里就不再展开推导了
    maxValue = math.max(maxValue, value) 
    minValue = math.min(minValue, value)
    //maxValue =  if(maxValue > value) maxValue else value
    //minValue = if(minValue < value) minVa  /** Clone this StatCounter */
  def copy(): StatCounter = {
    val other = new StatCounter
    other.n = n
    other.mu = mu
    other.m2 = m2
    other.maxValue = maxValue
    other.minValue = minValue
    other
  }lue else value
    this //返回本身
  }

针对TraversableOnce对象,则有这样的merge方法:

/** Add multiple values into this StatCounter, updating the internal statistics.
多个元素会先调用foreach,然后分别取更新状态
*/
def merge(values: TraversableOnce[Double]): StatCounter = {
values.foreach(v => merge(v))
this
}

而这个merge方法会在类的初始化的时候被调用:

merge(values)
1

与其他StatCounter的merge:
  /** Merge another StatCounter into this one, adding up the internal statistics.
  合并多个StatCounter
   */
  def merge(other: StatCounter): StatCounter = {
    if (other == this) {//如果两个完全一致
      merge(other.copy())  // Avoid overwriting fields in a weird order
    } else {
      if (n == 0) {//如果该StatCounter没有元素,则直接将另一个的拷贝过来来覆盖这些参数
        mu = other.mu
        m2 = other.m2
        n = other.n
        maxValue = other.maxValue
        minValue = other.minValue
      } else if (other.n != 0) {//如果另一个的计数不等于0,此时两个Counter都有数据
        val delta = other.mu - mu //两个Counter平均值之差
        if (other.n * 10 < n) { // 此时比较两个Counter计数结果,本质就是求两组数据的平方差之和,但需要看以谁为基准求
          mu = mu + (delta * other.n) / (n + other.n) //这个推导就是将第一个mu看成 (mu * n) / n ,然后合并同类项,最终可以得到最后那个else分支的版本
        } else if (n * 10 < other.n) {
          mu = other.mu - (delta * n) / (n + other.n) 
        } else {
          mu = (mu * n + other.mu * other.n) / (n + other.n) // 两个sum相加并将两个count相加,求平均值
        }
        m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
        n += other.n
        maxValue = math.max(maxValue, other.maxValue)
        minValue = math.min(minValue, other.minValue)
      }
      this
    }
  }


 /** Clone this StatCounter 
 完成了对该StatCounter的拷贝*/
  def copy(): StatCounter = {
    val other = new StatCounter
    other.n = n
    other.mu = mu
    other.m2 = m2
    other.maxValue = maxValue
    other.minValue = minValue
    other
  }

其他
  /** Return the variance of the values. 方差,就是离差平方和除以计数,前提是n不为0*/
  def variance: Double = {
    if (n == 0) {
      Double.NaN
    } else {
      m2 / n
    }
  }

  /**
   * Return the sample variance, which corrects for bias in estimating the variance by dividing
   * by N-1 instead of N. 样本方差
   */
  def sampleVariance: Double = {
    if (n <= 1) {
      Double.NaN
    } else {
      m2 / (n - 1)
    }
  }

  /** Return the standard deviation of the values. 标准差*/
  def stdev: Double = math.sqrt(variance)

  /**
   * Return the sample standard deviation of the values, which corrects for bias in estimating the
   * variance by dividing by N-1 instead of N. 样本标准差
   */
  def sampleStdev: Double = math.sqrt(sampleVariance)

  /**
  重写toString方法
  **/
  override def toString: String = {
    "(count: %d, mean: %f, stdev: %f, max: %f, min: %f)".format(count, mean, stdev, max, min)
  }

伴生对象
object StatCounter {
  /** Build a StatCounter from a list of values. 这边是以TraversableOnce为参数类型*/
  def apply(values: TraversableOnce[Double]): StatCounter = new StatCounter(values)

  /** Build a StatCounter from a list of values passed as variable-length arguments. 这边是Double的List */
  def apply(values: Double*): StatCounter = new StatCounter(values)
}

代码中使用:

println(sc.parallelize(Seq(1.0,2.2,3.1)).stats())
1
输出:
(count: 3, mean: 2.100000, stdev: 0.860233, max: 3.100000, min: 1.000000)


©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,295评论 6 512
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,928评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,682评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,209评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,237评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,965评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,586评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,487评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,016评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,136评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,271评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,948评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,619评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,139评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,252评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,598评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,267评论 2 358

推荐阅读更多精彩内容