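HDFS Write Path: Writing Data (Part 2)

I. Write Data Flowchart

This part follows the client side of a write. The client writes data, and the data is cut into chunks (512 bytes each by default, each with its own checksum). Several chunks are assembled into a packet. Each packet is then placed in a queue (dataQueue), where it waits for the DataStreamer thread to send it to the DataNodes.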

[Figure: write data flowchart]

II. Client-Side Call: FSDataOutputStream.write

1. FSDataOutputStream.write

The call is forwarded to FSOutputSummer.write. DFSOutputStream inherits this method from its parent class, FSOutputSummer.

    @Override
    public void write(byte b[], int off, int len) throws IOException {
      // Step 2: out is the DFSOutputStream; the call runs FSOutputSummer.write, inherited from its parent class
      out.write(b, off, len);
      position += len;                            // update position
      if (statistics != null) {
        statistics.incrementBytesWritten(len);
      }
    }
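The wrapper above forwards the call and then does its own bookkeeping. The following is a minimal sketch of that delegation pattern. It uses a hypothetical PositionTrackingStream built on java.io.FilterOutputStream, not Hadoop's actual FSDataOutputStream classes. The byte counter is a simplified stand-in for FileSystem.Statistics.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical wrapper illustrating the pattern: delegate the write first,
// then advance the logical position and a simplified statistics counter.
class PositionTrackingStream extends FilterOutputStream {
    private long position;
    private long bytesWritten; // simplified stand-in for FileSystem.Statistics

    PositionTrackingStream(OutputStream out, long startPosition) {
        super(out);
        this.position = startPosition;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len); // delegate (in HDFS, the wrapped stream is a DFSOutputStream)
        position += len;        // only advance once the delegate accepted the bytes
        bytesWritten += len;
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        position++;
        bytesWritten++;
    }

    long getPos() { return position; }

    long getBytesWritten() { return bytesWritten; }
}
```

write(int) is overridden as well. Without that override, single-byte writes would reach the wrapped stream through FilterOutputStream.write(int) without updating the position.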

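III. Client-Side DFSOutputStream Write Flow

1. FSOutputSummer.write

FSOutputSummer is the parent class of DFSOutputStream. The object the client actually calls is always a DFSOutputStream instance.

The method first checks that the stream is still open (checkClosed()) and that off and len lie within b. It then calls write1 repeatedly until all len bytes have been consumed.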

 /**
   * Writes <code>len</code> bytes from the specified byte array 
   * starting at offset <code>off</code> and generate a checksum for
   * each data chunk.
   *
   * <p> This method stores bytes from the given array into this
   * stream's buffer before it gets checksumed. The buffer gets checksumed 
   * and flushed to the underlying output stream when all data 
   * in a checksum chunk are in the buffer.  If the buffer is empty and
   * requested length is at least as large as the size of next checksum chunk
   * size, this method will checksum and write the chunk directly 
   * to the underlying output stream.  Thus it avoids unnecessary data copy.
   *
   * @param      b     the data.
   * @param      off   the start offset in the data.
   * @param      len   the number of bytes to write.
   * @exception  IOException  if an I/O error occurs.
   */
  @Override
  public synchronized void write(byte b[], int off, int len)
      throws IOException {
    // Step 3: fail fast if the stream has already been closed
    checkClosed();
    
    if (off < 0 || len < 0 || off > b.length - len) {
      throw new ArrayIndexOutOfBoundsException();
    }
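    // len is the number of bytes the caller asked to write (not the block size);
    // each write1 call consumes part of it and returns how many bytes it took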
    for (int n=0; n<len; n += write1(b, off+n, len-n)) {
    }
  }
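The argument check in FSOutputSummer.write is written as off > b.length - len rather than off + len > b.length. The small sketch below (hypothetical BoundsCheck class) shows why. Once off and len are known to be non-negative, the subtraction cannot overflow. The addition, however, can wrap around to a negative number and let an invalid range through.

```java
// Hypothetical helper comparing the check used in FSOutputSummer.write with a naive one.
final class BoundsCheck {
    private BoundsCheck() {}

    // Form used in FSOutputSummer.write: no addition, so no int overflow.
    static boolean invalid(int arrayLength, int off, int len) {
        return off < 0 || len < 0 || off > arrayLength - len;
    }

    // Naive form: off + len can overflow and wrap to a negative value.
    static boolean invalidNaive(int arrayLength, int off, int len) {
        return off < 0 || len < 0 || off + len > arrayLength;
    }
}
```

For example, with b.length = 5, off = Integer.MAX_VALUE and len = 10, invalid(...) returns true while invalidNaive(...) returns false.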

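2. FSOutputSummer.write1

Every path through this method ends in writeChecksumChunks, which writes the chunk data into the packet. It is reached either directly or through flushBuffer(), which also calls writeChecksumChunks. If the local buffer is empty and the request is at least one full buffer (4608 bytes), the data is checksummed straight from the caller's array. Otherwise it is first copied into buf, which is flushed once it is full.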

/**
   * Write a portion of an array, flushing to the underlying
   * stream at most once if necessary.
   */
  private int write1(byte b[], int off, int len) throws IOException {
    //buf: internal buffer for storing data before it is checksumed
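    // If the local buffer is empty and the incoming data is at least as long as buf
    // (9 checksum chunks, not one), checksum it in place and write it directly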
    if(count==0 && len>=buf.length) {
      // local buffer is empty and user buffer size >= local buffer size, so
      // simply checksum the user buffer and send it directly to the underlying
      // stream
      final int length = buf.length;//4608=512*9=sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS
      // Each call writes exactly one full buffer: 9 chunks of 512 bytes
      writeChecksumChunks(b, off, length);
      return length;
    }
    // Otherwise copy the data into buf first; once buf is full, flushBuffer also calls writeChecksumChunks
    // copy user data to local buffer
    int bytesToCopy = buf.length-count;
    bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
    System.arraycopy(b, off, buf, count, bytesToCopy);
    count += bytesToCopy;
    if (count == buf.length) {
      // local buffer is full
      flushBuffer();
    } 
    return bytesToCopy;
  }
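The following simplified model (hypothetical Write1Model class) shows how write and write1 split a request between the direct path and the local buffer. It assumes bytesPerChecksum = 512 and BUFFER_NUM_CHUNKS = 9, as in the comments above. Instead of computing checksums, it records the length of each writeChecksumChunks call. flushBuffer is reduced to the full-buffer case, in which the real method also writes the whole buffer.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of FSOutputSummer.write/write1 buffering (not the Hadoop class).
class Write1Model {
    static final int BYTES_PER_CHECKSUM = 512;
    static final int BUFFER_NUM_CHUNKS = 9;

    private final byte[] buf = new byte[BYTES_PER_CHECKSUM * BUFFER_NUM_CHUNKS]; // 4608 bytes
    private int count; // bytes currently held in buf
    final List<Integer> checksumChunkCalls = new ArrayList<>();

    void write(byte[] b, int off, int len) {
        if (off < 0 || len < 0 || off > b.length - len) {
            throw new ArrayIndexOutOfBoundsException();
        }
        for (int n = 0; n < len; n += write1(b, off + n, len - n)) {
            // write1 returns how many bytes it consumed
        }
    }

    private int write1(byte[] b, int off, int len) {
        if (count == 0 && len >= buf.length) {
            // Empty buffer and a large request: skip the copy
            writeChecksumChunks(b, off, buf.length);
            return buf.length;
        }
        int bytesToCopy = Math.min(buf.length - count, len);
        System.arraycopy(b, off, buf, count, bytesToCopy);
        count += bytesToCopy;
        if (count == buf.length) {
            flushBuffer(); // buffer full
        }
        return bytesToCopy;
    }

    // Full-buffer case only: count is a multiple of 512, so everything is written
    private void flushBuffer() {
        writeChecksumChunks(buf, 0, count);
        count = 0;
    }

    // Records the call instead of checksumming and writing chunks
    private void writeChecksumChunks(byte[] b, int off, int len) {
        checksumChunkCalls.add(len);
    }

    int buffered() { return count; }
}
```

A single 10000-byte write produces two direct 4608-byte calls and leaves 784 bytes in buf. A following 4000-byte write first tops buf up with 3824 bytes and flushes it, then buffers the remaining 176 bytes.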

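3. FSOutputSummer.writeChecksumChunks

Computes the checksums for the whole range, then splits the data into chunks of bytesPerChecksum bytes. Each chunk is passed to writeChunk together with the offset of its checksum.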

  /** Generate checksums for the given data chunks and output chunks & checksums
   * to the underlying output stream.
   */
  private void writeChecksumChunks(byte b[], int off, int len)
  throws IOException {
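    // len = 4608 here (one full buffer from write1); compute all checksums first
    sum.calculateChunkedSums(b, off, len, checksum, 0);
    // Walk the data one chunk at a time: len = 4608, getBytesPerChecksum() = 512 -> 9 chunks
    for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
      // chunkLen is 512 except possibly for a shorter final chunk. It does not depend on
      // the block size, although blockSize must be a multiple of bytesPerChecksum.
      int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);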
      int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
      writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
    }
  }
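writeChecksumChunks first computes one checksum per chunk. It then hands each chunk to writeChunk along with its checksum offset ckOffset. The sketch below shows both steps under these assumptions: java.util.zip.CRC32 stands in for HDFS's default CRC32C, each checksum is stored as a 4-byte big-endian int, and the class name ChunkedChecksums is hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Sketch of per-chunk checksumming and the chunkLen/ckOffset walk in writeChecksumChunks.
// Assumption: CRC32 is used in place of HDFS's default CRC32C.
final class ChunkedChecksums {
    static final int BYTES_PER_CHECKSUM = 512;
    static final int CHECKSUM_SIZE = 4;

    private ChunkedChecksums() {}

    // One 4-byte checksum per (possibly shorter, final) chunk, stored big-endian
    static byte[] calculateChunkedSums(byte[] b, int off, int len) {
        int numChunks = (len + BYTES_PER_CHECKSUM - 1) / BYTES_PER_CHECKSUM;
        ByteBuffer sums = ByteBuffer.allocate(numChunks * CHECKSUM_SIZE); // big-endian by default
        CRC32 crc = new CRC32();
        for (int i = 0; i < len; i += BYTES_PER_CHECKSUM) {
            int chunkLen = Math.min(BYTES_PER_CHECKSUM, len - i);
            crc.reset();
            crc.update(b, off + i, chunkLen);
            sums.putInt((int) crc.getValue());
        }
        return sums.array();
    }

    // {chunkLen, ckOffset} pairs in the order writeChunk would receive them
    static int[][] chunkLayout(int len) {
        int numChunks = (len + BYTES_PER_CHECKSUM - 1) / BYTES_PER_CHECKSUM;
        int[][] layout = new int[numChunks][];
        for (int i = 0, k = 0; i < len; i += BYTES_PER_CHECKSUM, k++) {
            int chunkLen = Math.min(BYTES_PER_CHECKSUM, len - i);
            int ckOffset = i / BYTES_PER_CHECKSUM * CHECKSUM_SIZE;
            layout[k] = new int[] {chunkLen, ckOffset};
        }
        return layout;
    }
}
```

For len = 4608 this yields nine 512-byte chunks with checksum offsets 0, 4, ..., 32. For len = 1000 it yields a 512-byte chunk and a 488-byte chunk, with offsets 0 and 4.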

4. DFSOutputStream.writeChunk

Opens a trace scope, delegates to writeChunkImpl, and closes the scope in a finally block.

  @Override
  protected synchronized void writeChunk(byte[] b, int offset, int len,
      byte[] checksum, int ckoff, int cklen) throws IOException {
    TraceScope scope =
        dfsClient.getPathTraceScope("DFSOutputStream#writeChunk", src);
    try {
      // len <= bytesPerChecksum (512 in this walkthrough)
      writeChunkImpl(b, offset, len, checksum, ckoff, cklen);
    } finally {
      scope.close();
    }
  }
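The try/finally in writeChunk guarantees that the trace scope is closed whether writeChunkImpl returns or throws. Java's try-with-resources gives the same guarantee. The sketch below uses a hypothetical DemoScope class that only records events; it is not the HTrace TraceScope API.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical scope type; implementing AutoCloseable enables try-with-resources.
final class DemoScope implements AutoCloseable {
    static final List<String> events = new ArrayList<>();
    private final String name;

    DemoScope(String name) {
        this.name = name;
        events.add("open " + name);
    }

    @Override
    public void close() { // declares no checked exception, so callers need no extra catch
        events.add("close " + name);
    }
}

final class TracedWriter {
    private TracedWriter() {}

    // Same guarantee as writeChunk's try/finally: the scope closes on both paths.
    static void writeChunk(boolean fail) throws IOException {
        try (DemoScope scope = new DemoScope("writeChunk")) {
            DemoScope.events.add("writeChunkImpl");
            if (fail) {
                throw new IOException("simulated write failure");
            }
        }
    }
}
```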

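5. DFSOutputStream.writeChunkImpl

This method writes a chunk's checksum and data into the current packet. The packet is put into dataQueue when it holds its maximum number of chunks, or when the block has reached blockSize. There it waits for the DataStreamer thread to send it. When the block is exactly full, an additional empty packet (lastPacketInBlock = true) is queued to tell the DataStreamer that a whole block has been sent.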

  private synchronized void writeChunkImpl(byte[] b, int offset, int len,
          byte[] checksum, int ckoff, int cklen) throws IOException {
    dfsClient.checkOpen();
    checkClosed();

    // len is the chunkLen computed in writeChecksumChunks:
    //   int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
    // A chunk may not exceed bytesPerChecksum (here len = 512, bytesPerChecksum = 512)
    if (len > bytesPerChecksum) {
      throw new IOException("writeChunk() buffer size is " + len +
                            " is larger than supported  bytesPerChecksum " +
                            bytesPerChecksum);
    }
    // The checksum length passed in must match the checksum type's size
    if (cklen != 0 && cklen != getChecksumSize()) {
      throw new IOException("writeChunk() checksum size is supposed to be " +
                            getChecksumSize() + " but found to be " + cklen);
    }
    // No packet is being filled: allocate a new one
    if (currentPacket == null) {
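      // The new DFSPacket's maxChunks is chunksPerPacket: 126 for the first packet, and in this
      // test (blockSize = 65536) 1 for the following ones, since it is recomputed near the block end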
      currentPacket = createPacket(packetSize, chunksPerPacket, 
          bytesCurBlock, currentSeqno++, false);
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
            currentPacket.getSeqno() +
            ", src=" + src +
            ", packetSize=" + packetSize +
            ", chunksPerPacket=" + chunksPerPacket +
            ", bytesCurBlock=" + bytesCurBlock);
      }
    }
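    // Write the checksum into the packet's checksum area
    currentPacket.writeChecksum(checksum, ckoff, cklen);
    // Write the actual data
    currentPacket.writeData(b, offset, len);
    // One more chunk in this packet
    currentPacket.incNumChunks();
    // Advance the offset within the current block by len (512)
    bytesCurBlock += len;
    // If packet is full, enqueue it for transmission:
    // enqueue when the packet holds maxChunks chunks or the block has reached blockSize.
    // Can getNumChunks() skip past getMaxChunks()? No: incNumChunks() adds one at a time,
    // so the == comparison is always hit.
    // blockSize = 65536 in this test
    // Can bytesCurBlock exceed blockSize? This does not appear to happen, since blockSize
    // is a multiple of bytesPerChecksum and each chunk adds at most bytesPerChecksum.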
    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
        bytesCurBlock == blockSize) {
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
            currentPacket.getSeqno() +
            ", src=" + src +
            ", bytesCurBlock=" + bytesCurBlock +
            ", blockSize=" + blockSize +
            ", appendChunk=" + appendChunk);
      }
      // Put currentPacket into dataQueue, where the DataStreamer thread picks it up and sends it
      waitAndQueueCurrentPacket();

      // If the reopened file did not end at chunk boundary and the above
      // write filled up its partial chunk. Tell the summer to generate full 
      // crc chunks from now on.
      // appendChunk is false by default; it is set to true when appending to a file whose
      // last checksum chunk is only partially filled (see the DataStreamer constructor)
      if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
        appendChunk = false;
        resetChecksumBufSize();
      }

      if (!appendChunk) {
        // writePacketSize = dfs.client-write-packet-size, default 65536
        int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
        // Recompute chunksPerPacket and packetSize for the next packet.
        // computePacketChunkSize(psize, csize) does:
        /*
        final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
        final int chunkSize = csize + getChecksumSize();
        chunksPerPacket = Math.max(bodySize/chunkSize, 1);
        packetSize = chunkSize*chunksPerPacket;
        */

        computePacketChunkSize(psize, bytesPerChecksum);
      }
      // If encountering a block boundary, send an empty packet to
      // indicate the end of block and reset bytesCurBlock.
      // When the first packet reaches getMaxChunks(), bytesCurBlock = 64512.
      // dataQueue contents observed in this test:
      //0 = {DFSPacket@7262} "packet seqno: 0 offsetInBlock: 0 lastPacketInBlock: false lastByteOffsetInBlock: 64512"
      //1 = {DFSPacket@7263} "packet seqno: 1 offsetInBlock: 64512 lastPacketInBlock: false lastByteOffsetInBlock: 65024"
      //2 = {DFSPacket@7188} "packet seqno: 2 offsetInBlock: 65024 lastPacketInBlock: false lastByteOffsetInBlock: 65536"
      // When the block is exactly full, send an empty packet
      if (bytesCurBlock == blockSize) {
        currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
        currentPacket.setSyncBlock(shouldSyncBlock);
        waitAndQueueCurrentPacket();
        // Reset for the next block
        bytesCurBlock = 0;
        lastFlushOffset = 0;
      }
    }
  }
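The packet boundaries that writeChunkImpl produces can be reproduced with a short simulation of the same rules:

- A packet is queued when it holds maxChunks chunks or the block is full.
- chunksPerPacket is recomputed from min(blockSize - bytesCurBlock, writePacketSize).
- An empty packet closes the block.

The sketch assumes PKT_MAX_HEADER_LEN = 33, 512-byte chunks with 4-byte checksums, writePacketSize = 65536, and a blockSize that is a multiple of 512. PacketPlanner is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Simulation of packet sizing in writeChunkImpl/computePacketChunkSize (not Hadoop code).
final class PacketPlanner {
    static final int PKT_MAX_HEADER_LEN = 33; // assumed header length
    static final int BYTES_PER_CHECKSUM = 512;
    static final int CHECKSUM_SIZE = 4;
    static final int WRITE_PACKET_SIZE = 65536;

    private PacketPlanner() {}

    // Mirrors computePacketChunkSize: how many chunks fit into a packet body
    static int chunksPerPacket(int psize) {
        int bodySize = psize - PKT_MAX_HEADER_LEN;
        int chunkSize = BYTES_PER_CHECKSUM + CHECKSUM_SIZE;
        return Math.max(bodySize / chunkSize, 1);
    }

    // One "seqno offsetInBlock-lastByteOffset" entry per packet, plus the empty last packet
    static List<String> plan(long blockSize) {
        List<String> packets = new ArrayList<>();
        long seqno = 0;
        long bytesCurBlock = 0;
        int maxChunks = chunksPerPacket(WRITE_PACKET_SIZE);
        while (bytesCurBlock < blockSize) {
            long offsetInBlock = bytesCurBlock;
            int numChunks = 0;
            // Fill until the packet is full or the block boundary is reached
            while (numChunks < maxChunks && bytesCurBlock < blockSize) {
                bytesCurBlock += BYTES_PER_CHECKSUM;
                numChunks++;
            }
            packets.add(seqno + " " + offsetInBlock + "-" + bytesCurBlock);
            seqno++;
            // Recompute the packet size from the space left in the block
            int psize = (int) Math.min(blockSize - bytesCurBlock, WRITE_PACKET_SIZE);
            maxChunks = chunksPerPacket(psize);
        }
        packets.add(seqno + " " + bytesCurBlock + " (empty, lastPacketInBlock)");
        return packets;
    }
}
```

With blockSize = 65536, the simulation gives packets covering 0-64512, 64512-65024 and 65024-65536, followed by the empty end-of-block packet. These are the same offsets as the dataQueue dump in the comments above. Any header length between 5 and 520 bytes gives the same result here, so the assumed 33 is not critical.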

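6. DFSOutputStream.waitAndQueueCurrentPacket

Checks whether the queue is full, meaning that dataQueue.size() + ackQueue.size() exceeds dfs.client.write.max-packets-in-flight (default 80). If it is full, the writer waits on dataQueue; otherwise it calls queueCurrentPacket to add the packet to the queue. If the thread is interrupted while waiting, it restores the interrupt flag and queues the packet anyway.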

private void waitAndQueueCurrentPacket() throws IOException {
    synchronized (dataQueue) {
      try {
      // If queue is full, then wait till we have enough space
        boolean firstWait = true;
        try {
          // The queue counts as full when dataQueue.size() + ackQueue.size() exceeds
          // dfs.client.write.max-packets-in-flight (default 80); wait until space frees up
          while (!isClosed() && dataQueue.size() + ackQueue.size() >
              dfsClient.getConf().writeMaxPackets) {
            if (firstWait) {
              Span span = Trace.currentSpan();
              if (span != null) {
                span.addTimelineAnnotation("dataQueue.wait");
              }
              firstWait = false;
            }
            try {
              // Woken by notifyAll() on dataQueue, e.g. from the DataStreamer thread
              dataQueue.wait();
            } catch (InterruptedException e) {
              // If we get interrupted while waiting to queue data, we still need to get rid
              // of the current packet. This is because we have an invariant that if
              // currentPacket gets full, it will get queued before the next writeChunk.
              //
              // Rather than wait around for space in the queue, we should instead try to
              // return to the caller as soon as possible, even though we slightly overrun
              // the MAX_PACKETS length.
              Thread.currentThread().interrupt();
              break;
            }
          }
        } finally {
          Span span = Trace.currentSpan();
          if ((span != null) && (!firstWait)) {
            span.addTimelineAnnotation("end.wait");
          }
        }
        checkClosed();
        // Add the packet to the queue
        queueCurrentPacket();
      } catch (ClosedChannelException e) {
      }
    }
  }
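waitAndQueueCurrentPacket is one side of a bounded producer/consumer built on the dataQueue monitor. The following is a minimal model of that flow control, with hypothetical names:

- The writer waits while dataQueue.size() + ackQueue.size() exceeds the limit.
- The streamer moves packets from dataQueue to ackQueue.
- The responder removes acknowledged packets.
- Every step calls notifyAll() on dataQueue.

```java
import java.util.LinkedList;

// Hypothetical model of the in-flight window; dataQueue doubles as the lock,
// as in DFSOutputStream. Packets are represented by their seqno.
final class InFlightWindow {
    private final LinkedList<Long> dataQueue = new LinkedList<>();
    private final LinkedList<Long> ackQueue = new LinkedList<>();
    private final int maxPacketsInFlight;

    InFlightWindow(int maxPacketsInFlight) {
        this.maxPacketsInFlight = maxPacketsInFlight;
    }

    // Writer side (like waitAndQueueCurrentPacket + queueCurrentPacket)
    void enqueue(long seqno) {
        synchronized (dataQueue) {
            while (dataQueue.size() + ackQueue.size() > maxPacketsInFlight) {
                try {
                    dataQueue.wait();
                } catch (InterruptedException e) {
                    // As in the original: keep the interrupt flag and queue anyway
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            dataQueue.addLast(seqno);
            dataQueue.notifyAll();
        }
    }

    // Streamer side: "send" the head packet by moving it to ackQueue; null if idle
    Long send() {
        synchronized (dataQueue) {
            Long seqno = dataQueue.pollFirst();
            if (seqno != null) {
                ackQueue.addLast(seqno);
                dataQueue.notifyAll();
            }
            return seqno;
        }
    }

    // Responder side: the oldest outstanding packet was acknowledged
    Long ack() {
        synchronized (dataQueue) {
            Long seqno = ackQueue.pollFirst();
            dataQueue.notifyAll(); // frees a slot for a waiting writer
            return seqno;
        }
    }

    int inFlight() {
        synchronized (dataQueue) {
            return dataQueue.size() + ackQueue.size();
        }
    }
}
```

The loop waits only while the count is strictly greater than the limit. As a result, with maxPacketsInFlight = 2, three enqueue calls succeed in a row. The check against writeMaxPackets in the original has the same shape.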

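7. DFSOutputStream.queueCurrentPacket

Appends the packet to dataQueue (a LinkedList) and records its seqno in lastQueuedSeqno. It then sets currentPacket to null, so the next writeChunk allocates a new packet, and wakes waiting threads with notifyAll().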

  private void queueCurrentPacket() {
    synchronized (dataQueue) {
      if (currentPacket == null) return;
      currentPacket.addTraceParent(Trace.currentSpan());
      // Append to the queue
      dataQueue.addLast(currentPacket);
      lastQueuedSeqno = currentPacket.getSeqno();
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("Queued packet " + currentPacket.getSeqno());
      }
      currentPacket = null;
      dataQueue.notifyAll();
    }
  }
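queueCurrentPacket is the writer's half of a hand-off; the other half is the DataStreamer thread waiting on dataQueue. The sketch below models that hand-off with hypothetical names: QueueHandoff, a String per packet, and "END" standing in for the empty last packet. It is not the actual DataStreamer.run logic.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical model of the writer/streamer hand-off around dataQueue.
final class QueueHandoff {
    private final LinkedList<String> dataQueue = new LinkedList<>();
    private String currentPacket; // only touched by the writer thread
    final List<String> sent = new ArrayList<>();

    void setCurrentPacket(String packet) {
        currentPacket = packet;
    }

    // Like queueCurrentPacket: enqueue, clear currentPacket, wake the streamer
    void queueCurrentPacket() {
        synchronized (dataQueue) {
            if (currentPacket == null) return;
            dataQueue.addLast(currentPacket);
            currentPacket = null; // the next writeChunk must allocate a new packet
            dataQueue.notifyAll();
        }
    }

    // Streamer loop: wait for a packet, "send" it, stop after END
    Thread startStreamer() {
        Thread t = new Thread(() -> {
            while (true) {
                String packet;
                synchronized (dataQueue) {
                    while (dataQueue.isEmpty()) {
                        try {
                            dataQueue.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    packet = dataQueue.removeFirst();
                }
                synchronized (sent) {
                    sent.add(packet);
                }
                if (packet.equals("END")) return;
            }
        });
        t.start();
        return t;
    }
}
```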
© Copyright belongs to the author. For reprints or content collaboration, please contact the author.