partition的实现

在简书的第1篇文章,写得不好多多包涵哦

partition的作用是把环形缓冲区中的map输出分区存储,以便分配给不同的reducer。
把内部的实现写下来,作为一个学习笔记

  1. 在map函数,调用context.write()时,会去调用分区函数,得到分区号,把分区号一块写进keyvalue的元数据。
  2. 当环形缓冲区达到溢写磁盘时
    • a) 对每个分区内的数据进行排序
    • b) 把每个分区内的数据写到磁盘

下面通过代码来说明

1

context.write(K,V) -> MapTask.NewOutputCollector.write(K, V) -> MapOutputBuffer.collect(K, V, partion)

void MapTask.NewOutputCollector.write(K key, V value) {
      collector.collect(key, value,
                        partitioner.getPartition(key, value, partitions));        // 调用分区函数
    }

MapOutputBuffer.collect(K, V, partion) {
    ...
    kvmeta.put(kvindex + PARTITION, partition);        // 把分区号一块写进keyvalue元数据
    ...
}

2-a)

MapTask.MapOutputBuffer.flush()->MapTask.MapOutputBuffer.sortAndSpill()->IndexedSortable.compare(final int mi, final int mj)


void MapTask.MapOutputBuffer.sortAndSpill() {
    ...
    sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);        // 对数据进行排序,默认采用快速排序。调用了下面的compare()方法
    ...
}

// 比较 mi和mj所对应的两个key,这个方法先比较分区号,如果分区号相同,才有必要比较key,实现了按各个分区内的key进行排序
public int MapTask.MapOutputBuffer.compare(final int mi, final int mj) {
      final int kvi = offsetFor(mi % maxRec);
      final int kvj = offsetFor(mj % maxRec);
      final int kvip = kvmeta.get(kvi + PARTITION);        // 从keyvalue元数据取出mi的分区号
      final int kvjp = kvmeta.get(kvj + PARTITION);        // 从keyvalue元数据取出mj的分区号
      // sort by partition
      if (kvip != kvjp) {           // 如果分区号不相同,直接比较分区号:分区号的大小决定了写磁盘时的先后顺序
        return kvip - kvjp;
      }
      // sort by key               // 分区号相同,再比较key,这个方法调用RawComparator.compare(buffer, s1, l1, s2, l2);
      return comparator.compare(kvbuffer,                    
          kvmeta.get(kvi + KEYSTART),                                            // key1的开始位置
          kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART),               // key1的结束位置
          kvbuffer,
          kvmeta.get(kvj + KEYSTART),                                            //key2的开始位置
          kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));              // key2的开始位置
    }

2-b)

a和b都是在sortAndSpill()中


void MapTask.MapOutputBuffer.sortAndSpill() {
    ...
    sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);        // 对数据进行排序,默认采用快速排序。调用了下面的compare()方法
    ...

   // 按分区号从小到大,一个分区一个分区写进磁盘
   for (int i = 0; i < partitions; ++i) {                           
    ...
    while (spindex < mend &&
            kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {  // 从元数据读出kv分区号,如果是当前正在写磁盘的分区号,就把这个kv写到磁盘
        final int kvoff = offsetFor(spindex % maxRec);
            int keystart = kvmeta.get(kvoff + KEYSTART);
            int valstart = kvmeta.get(kvoff + VALSTART);
        key.reset(kvbuffer, keystart, valstart - keystart);
        getVBytesForOffset(kvoff, value);
        writer.append(key, value);                                // 把kv写到磁盘
        ++spindex;
    }
    }
    
    ...
}

经过上面这些步骤,环形缓冲区内的kv,就按分区写到磁盘,并且每个分区内的数据是有序的。
当然,这并不能保证同一个分区内,先后溢写的数据是有序的。后面使用归并排序对磁盘上的分区数据再做一轮排序,这个以后再做分析。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • MySQL技术内幕:InnoDB存储引擎(第2版) 姜承尧 第1章 MySQL体系结构和存储引擎 >> 在上述例子...
    沉默剑士阅读 12,146评论 0 16
  • Ubuntu的发音 Ubuntu,源于非洲祖鲁人和科萨人的语言,发作 oo-boon-too 的音。了解发音是有意...
    萤火虫de梦阅读 99,914评论 9 468
  • 从三月份找实习到现在,面了一些公司,挂了不少,但最终还是拿到小米、百度、阿里、京东、新浪、CVTE、乐视家的研发岗...
    时芥蓝阅读 42,460评论 11 349
  • 定義 使多個對象都有機會處理請求,將這些對象連成一條鏈,並沿著這條鏈傳遞該請求,直到有一個對象處理它為止。 要點 ...
    JohnSmith阅读 1,293评论 1 1
  • 打开页面,突然发现那些要讲的都不重要了。在几天之后,工作上的那些纷纷扰扰,好像突然之间就离我而去了。不过浮云。 昨...
    释放自我阅读 1,196评论 0 1