DFSOutputStream 实现(pipeline)

create接口.png

DFSOutputStream 扩展自抽象类FSOutputSummer,FSOutputSummer 在 Outputstream 的基础上提供了写数据并计算校验和的功能,DFSOutputStream.write()方法的实现就继承自FSOutputStream类。

packet

再讲DFSOutputStream之前,我们先要说说packet这个东西,DFSOutputStream中使用Packet类来封装一个数据包。每个数据包中都包含若干个校验块,以及校验块对应的校验和。一个完整的数据包结构如图所示。首先是数据包包头, 记录了数据包的概要属性信息,然后是校验和数据,最后是校验块数据。Packet类提供了writeData()以及writeChecksum()方法向数据块中写入校验块数据以及校验和。

DFSOutputStream构造方法

DFSOutputStream构造方法比较简单,它首先调用私有的构造方法初始化一些属性,并且 对shouldSyncBIock (是否在关闭时将数据块持久化到磁盘)属性赋值。然后调用 computePacketChunkSize()方法确定数据包(packet)大小,同时确定一个数据包当中包含多少个校验块(chunk)。接下来会构造streamer线程,这个streamer线程负责建立数据流管道(pipeline),并将数据包发送到数据流管道中的一个Datanode,最后设置了 favoredNodes字段,确认客户端想要在哪些 Datanode上写入数据块。



write 方法

DFSOutputStream.write()方法可以将指定大小的数据写入数据流内部的一个缓冲区中,写入的数据会被切分成多个数据包,每个数据包又由一组校验块和这组校验块对应的校验和组成,默认数据包大小为65536字节,校验块大小为512字节,每个校验和都是校验块的512 字节数据对应的校验值。这里的数据包大小、校验块大小是在computePacketChunkSize()方法中定义的。

当Client写入的字节流数据达到一个数据包的长度时,DFSOutputStream会构造一个 Packet对象保存这个要发送的数据包。如果当前数据块中的所有数据包都发送完毕了, DFSOutputStream会发送一个空的数据包标识数据块发送完毕。新构造的Packet对象会被放到 DFSOutputStream.dataQueue 队列中,由 DFSOutputStream 的内部线程类 DataStreamer 处理。

DataStreamer 线程会从 dataQueue 中取出Packet对象,发送到数据流管道中的第一个Datanode,发送完毕后,将Packet从dataQueue中移除,放入ackQueue中等待下游节点的确认消息。确认消息是由DataStreamer的内部线程类ResponseProcessor 处理的。ResponseProcessor线程等待下游节点的响应ack,判断ack状态码,如果是失败状态,则记录出错Datanode的索引(errorlndex & restartindex),并设置错误状态位(hasError)。如果 ack状态是成功,则将数据包从ack队列中移除,整个数据包发送过程完成。

image.png

如果在数据块发送过程中出现错误,那所有ackQueue队列中等待确认的Packet都会被重新放回dataQueue队列中重新发送。客户端会执行错误处理流程,将出现错误的Datanode从 数据流管道中删除,然后向Namenode申请新的Datanode重建数据流管道。接着DataStreamer 线程会从dataQueue队列中取出Packet重新发送。


DataStreamer

DataStreamer 首先向Namenode申请一个新的数据块,然后建立写这个数据块的数据流管道(pipeline),最后DataStreamer从dataQueue队列中取岀待发送数据包并通过数据流管道发送给Datanode。每个数据包(packet)都有一个与之相关的序列号, 当一个数据块中所有的数据包都发送完毕,并且获得了 ACK消息后,DataStreamer线程就会将当前数据块的数据流管道关闭。

如果DFSOutputStream中还有数据需要发送,则DataStreamer 会再次向Namenode申请分配新的数据块,并且提交上一个数据块。获取了新分配数据块的位置信息后,DataStreamer会再次建立到新分配数据块的数据流管道,然后发送数据。

数据流管道的状态:

DataStreamer类定义了 nodes、storageTypes、storagcIDs以及stage等字段,用于保存当前数据流管道的状态。同时, DataStreamer还定义了 setPipeline()方法用于更新上述字段。

DataStreamer使用stage字段记录了当前数据流管道的状态,数据流管道的状态定义在BlockConstructionStage类中,有如下几种:

  • PIPELINE_SETUP_CREATE:写新文件时,数据流管道的初始状态。
  • PIPELINE_SETUP_APPEND:追加写已有文件时,数据流管道的初始状态。
  • DATA_STREAMING:数据流管道已经建立好,可以传输数据了。
  • PIPELINE_CLOSE:数据块已经写满,数据流管道关闭。

状态转换如图:


pipeline.png

当DFSCIient执行写新文件操作时,数据流管道的初始状态为PIPELINE_SETUP_CREATE;当DFSCIient执行追加写文件操作时,数据流管道的初始状态为 PIPELINE_SETUP_APPEND。

对于写新文件操作,DataStreamer会调用nextBlockOutputStream()方法向Namenode申请 分配新的数据块,然后构造这个新数据块的数据流管道。

对于追加写文件操作,DataStreamer 会调用setupPipelineForAppendOrRecovery()方法打开已有的HDFS文件并返回这个文件最后 一个数据块的位置信息,然后根据最后一个数据块的位置信息初始化数据流管道。

成功构造数据流管道后,DataStreamer会调用initDataStreaming()方法将数据流管道状态改为DATA_STREAMING,并调用setPipeline()记录数据流管道状态,然后就可以通过数据流 管道发送数驱包了。
当数据流管道将当前数据块写满后,会将数据流管道状态设置为PIPELINE_CLOSE,然 后向数据流管道发送一个空的数据包标识数据块已经写完。当DataStreamer确认收到这个空数据包的响应消息后,也就是数据流管道中的所有Datanode都成功地写入了数据块时,会调
endBk)ck()方法关闭数据流管道,并将数据流管道状态设置为PIPELINE_SETUP_CREATE 初始状态。然后DataStreamer会申请新的数据块,建立数据流管道,并写入数据,直到 DataStreamer线程被关闭。

数据流管道的建立

DataStreamer在将数据包发送到(Namespace)中分配数据块,建立写数据块的数据流管道。这些操作都是在DataStreamer.run() 方法中触发的。

首先调用nextBlockOutputStream。方法向Namenode申请分配新的数据块,然后建立到新分配数据块的输出流。接下来调用setPipeline()方法记录数据流管道信息(包括存 储数据的 Datanode,以及它们的 storagelDs)最后调用initDataStreaming。启动 ResponseProcessor线程处理来自Datanode的响应信息,并将数据块构建状态(BlockConstructionStage) 设置为DATA_STREAMING

数据流管道的建立流程如图所示。

  • nextBlockOutputStream

  • setPipeline

  • initDataStreaming

错误处理

DataStreamer错误处理部分,主要是由processDatanodeError()方法实现的。在DataStreamer发送数据包的过程中,由hasErrorerrorindexrestartingNodelndex 这三个变量 记录错误信息,它们分别表明数据流管道是否出现错误、数据流管道中错误的Datanode索引、 数据流管道中需要重启的Datanode索引。

在发送数据包的过程中,可能出现如下错误:

  • 在建立数据流管道的过程中:在createBlockOutputStream()方法中,调用DataTransfer Protocol.writeBlock()请求时,下游Datanode可能出现异常,并随着writeBlock()的响 应带回。在异常处理代码中,对hasError、errorindex和restartingNodelndex这三个 变量赋值。

  • 数据包发送完成后:下游节点会对每个数据包进行ack确认。ack确认消息中就会携 带岀现故障的Datanode信息,也就是在ResponseProcessor.run()方法处理ack消息时, 异常处理代码会对hasError、errorindex和restartingNodelndex这三个变量赋值。

  • 在DataStreamer.run()中,通过底层IO流发送数据包时会出现异常(这个时候,由于 没有下游节点返回的消息,所以直接将数据管道流中的第一个节点标识为错误节点)。

出现错误之后的错误处理是由processDatanodeErrorf)方法实现的,它的流程可以分为以 下三个部分。

  1. 关闭当前10流。
  2. 将ackQueue队列中的元素移动到dataQueue中重新发送。
  3. 重新初始化数据流管道。
  • setupPipelineForAppendOrRecovery()
    主要应用在两种情况下:

    1. append操作时用于创建数据流管道;
    2. 数据流管道写数据出现异常时,进行恢复操作。
      恢复操作有三个目的:
      1. 等待重启的Datanode启动;
      2. 将错误的Datanode从数据流管道中删除,将新的Datanode添加到数据流管道中;
      3. 最后更新Namenode命名空间中数据块 的时间戳,这样异常Datanode ±的过期数据块就可以被删除了;

    setupPipelineForAppendOrRecovery()方法的执行流程分为如下几步:

    1. 处理重启的Datanode,这里的处理方式是线程睡眠一段时间(默认为4秒),等待 Datanode重启。如果睡眠时间过长,超过restartDeadline,那么将这个重启的Datanode 标志为错误节点。

    2.处理异常的Datanode,处理方式是从数据流管道中移除异常的Datanodeo

    1. 如果满足了替换异常Datanode的条件,则调用addDatanode2ExistingPipeline()方法在 数据流管道中添加一个新的Datanodeo

    2. 调用 ClientProtocoLupdatcBlockForPipeline()更新数据块的时间戳,这样异常 Datanode 上的时间戳错误的过期数据块就可以被删除了。

    3. 如果数据流管道恢复成功,则更新Namenode命名空间中数据块的时间戳,同时更 新当前Client侧缓存的数据块信息的时间戳。

在满足什么样的条件下,会用新的Datanode替换数据流管道中异常的Datanode呢?
根据配置,当当前数据节点的数目小于所需要的副本数目除2,以及在appencVhflushed操作下, 当前数据节点的数目小于所需要的副本数目时,调用addDatanode2ExistingPipeline()方法将新 的数据节点添加到当前的数据流管道中。

ResponseProcessor线程

ResponseProcessor线程的处理逻辑比较简单,它从数据流管道下游节点的输入流中读入响应消息。然后判断响应状态,如果下游数据节点执行写入数据包失败,则通过ack消息中的应答码记录错误节点(errorlndex ),并设置错误标志位(hashError )。最后会在
DataStreamer.run()方法中调用processDatanodeEroor()理这个错误信息。如果下游节点写入数据包成功,则把当前数据包信息从ackQueue中移除。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,864评论 6 494
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,175评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,401评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,170评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,276评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,364评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,401评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,179评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,604评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,902评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,070评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,751评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,380评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,077评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,312评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,924评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,957评论 2 351

推荐阅读更多精彩内容