HDFS架构师3.1-元数据管理流程2(日志同步及fsimage互传)

19} 20} 21} 22}

元数据源码剖析.png

1、内存里面的元数据刷盘

image.png

19}
接上期

——1.1 .1.1》 FSEditLog.logSync()
——1.1 .1.1.1》 EditLogOutputStream.flush()

//第一次:FileJouranlManager -> EditLogFileOutputStream
//第二次:QuorumJounalManager -> QuorumOutputStream

        flushAndSync(durable);
                         ↓ 先看这个

——1.1 .1.1.1》EditLogFileOutputStream# flushAndSync(durable)
//TODO 涮写磁盘
doubleBuf.flushTo(fp);
↓ 再看这个
——1.1 .1.1.2》QuorumOutputStream#flushAndSync
——1.1 .1.1.2.1》AsyncLoggerSet#sendEdits()
//往journalnode去发送日志。
logger.sendEdits(segmentTxId, firstTxnId, numTxns, data);

2、内存里面的元数据刷盘和journalnode 19},20} 10分钟

3、standby的 namenode 同步 元数据 by journalnode

21} 40分钟 22} 18 分钟

元数据管理.png

/* EditLogTailer是一个后台线程,启动了以后会周期性的去journalnode集群上面去

  • 读取元数据日志,然后再把这些元数据日志应用到自己的元数据里面(内存+磁盘)
    */
    EditLogTailer类
    EditLogTailer.EditLogTailerThread.run()方法
    ——1》EditLogTailer.EditLogTailerThread.doWork()
    //TODO 重要的代码
    doTailEdits();
    //TODO 每隔60秒 StandByNameNode 去Journalnode获取一下日志
    Thread.sleep(sleepTimeMs);

    ——1》EditLogTailer.doTailEdits()

    //TODO 加载当前自己的元数据日志
    FSImage image = namesystem.getFSImage();
    //TODO StandByNamenoe 获取当前的元数据日志的最后一条日志的事务ID是多少
    long lastTxnId = image.getLastAppliedTxId();
    //这个地方是重要的代码
    //需要去journlanode上面去读取元数据
    //现在的事务id 1000,所以我去journlanode上面去读取
    //日志的时候,只需要去读取 1001后面的日志就可以。
    //TODO 设置获取Journalnode获取日志的流
    streams = editLog.selectInputStreams(lastTxnId + 1, 0, null, false);

//TODO 去Journalnode加载日志
editsLoaded = image.loadEdits(streams, namesystem);

——1.1》FSImage.loadEdits()
                                         ▼

//TODO 加载日志
//1000
//1001
//2000
loader.loadFSEdits(editIn, lastAppliedTxId + 1, startOpt, recovery);

//TODO 记录最后的一个事务ID
//1000 -> 2000
lastAppliedTxId = loader.getLastAppliedTxId();

   ——1.1.1》FSEditLogLoader.loadFSEdits()

//TODO 重要代码
long numEdits = loadEditRecords(edits, false,

       ——1.1.1》FSEditLogLoader.loadEditRecords()
                                                         ▼

//TODO 把获取到的元数据作用到自己的内存元数据里
long inodeId = applyEditLogOp(op, fsDir, startOpt,
in.getVersion(true), lastInodeId);

         ——1.1.1.1》FSEditLogLoader.applyEditLogOp()
                                                         ▼

//TODO 创建目录的日志
case OP_MKDIR: {
//根据匹配规则我们这次的日志
//应该是一个创建目录的日志。
MkdirOp mkdirOp = (MkdirOp)op;
inodeId = getAndUpdateLastInodeId(mkdirOp.inodeId, logVersion,
lastInodeId);
//TODO 把数据作用于自己的元数据里面。
FSDirMkdirOp.mkdirForEditLog(fsDir, inodeId,
renameReservedPathsOnUpgrade(mkdirOp.path, logVersion),
mkdirOp.permissions, mkdirOp.aclEntries, mkdirOp.timestamp);
break;

              ——1.1.1.1》FSDirMkdirOp.mkdirForEditLog()
                                  //TODO 重要代码
unprotectedMkdir(fsd, inodeId, existing, localName, permissions, 

                   ——1.1.1.1》 FSDirMkdirOp.unprotectedMkdir()
                                                                ▼

//TODO 封装成一个目录
final INodeDirectory dir = new INodeDirectory(inodeId, name, permission,
//TODO 往文件目录树 该添加目录的地方添加节点
INodesInPath iip = fsd.addLastINode(parent, dir, true);
▲回到 ——1.1.1》FSEditLogLoader.loadEditRecords

——1.1.1》FSEditLogLoader.loadEditRecords()

try {
/**
* 读取元数据日志(到了journalnode)
* 至于是如何读取的,我们等一下。
* 2.7.0
*/
op = in.readOp();

——1.1.1.1》EditLogInputStream.readOp()
——1.1.1.1》EditLogInputStream. nextOp()

——1.1.1.1.1》EditLogFileInputStream. nextOp()
★——1.1.1.1.1》EditLogFileInputStream.nextOpImpl()

//TODO 核心方法
init(true);
——1.1.1.1.1》EditLogFileInputStream.init()
/**
* TODO 这儿使用了装饰模式
*/
reader = new FSEditLogOp.Reader(dataIn, tracker, logVersion);
▲回到 ——1.1.1.1.1》FSEditLogLoader.nextOpImpl
//TODO 通过reader读取日志
op = reader.readOp(skipBrokenEdits); 21} 0:28:

注释: reader 在 init里面初始化 fStream = log.getInputStream();//log是URLLog
回到 ——1.1.1.1.1》EditLogFileInputStream.init()
//所以找URLlog的getInpustream()的方法
fStream = log.getInputStream();

——1.1.1.1.1.1》EditLogFileInputStream.URLLog.getInputStream();

//创建了HttpURLConnetcion
//如果我们这儿发送的是HTTP的请求,读取的Journalndoe那儿的日志
//说明journalndoe启动起来的时候肯定会有一个JournalnodeHttpServer
//NameNode: NameNodeRpcServer NameNodeHttperServer
//DataNode: RpcServer Httpserver
//JournalNode: JournalnodeRpcServer JournalnodeHttpserver
//TODO 真相大白,我们创建了一个HttpURLConnection对象
connection = (HttpURLConnection)
connectionFactory.openConnection(url, isSpnegoEnabled);
//通过这个对象获取到了输入流
return connection.getInputStream(); 21} 0:30

                                       ↓    JournalNode服务器接受读取editlog请求 

       ——1.1.1.1.1.2》 JournalNodeHttpServer.start()

//TODO 绑定了一个servlet /getJournal
httpServer.addInternalServlet("getJournal", "/getJournal",
GetJournalEditServlet.class, true);
//TODO 启动服务
httpServer.start();

——1.1.1.1.1.2.1》 GetJournalEditServlet.doGet()

//TODO journalndoe读取数据流
//就是我们平常普通的操作
editFileIn = new FileInputStream(editFile);

              //TODO 流对烤
              //editFileIn 这个输入流读取的是journalnode这儿的日志
              //response.getOutputStream() 把数据写到这个输出流里面             
        TransferFsImage.copyFileToStream(response.getOutputStream(), editFile,  editFileIn, throttler);

                                ↓  //StandbyCheckpointer类做checkpoint

——1.1.1.1.1.2.1》StandbyCheckpointer.CheckpointerThread.run()

  • 命名空间 = 元数据信息 = 目录树 = fsimage *
  • StandbyCheckpointer 是一个运行在standBynamenode上的一个线程。
  • 他会周期性的对命名空间做checkpoint的操作(说白了就是把 内存里面目录树的信息持久化到磁盘上面)
  • 并且会把这个份数据上传到active namenode(用来替换 active namednoe上面的fsimage)

——1.1.1.1.1.2.1》StandbyCheckpointer.CheckpointerThread.dowork()

//TODO 每隔60检查以下是否需要做checkpoint
Thread.sleep(checkPeriod);
//TODO checkpoint条件一 数量 10000
//这儿是计算以下,我们上次checkpoint 现在最新的数据差了多少?
//或者说大概的意思就是说我们现在有多少条日志没有checkpoint了。
final long uncheckpointed = countUncheckpointedTxns();
//TODO checkpoint条件二
//当前时间 - 上一次checkpoint的时间。
//说白了这个变量代表的意思就是 已经有多久没有做checkpoint了。
final long secsSinceLast = (now - lastCheckpointTime) / 1000;

//TODO 执行checkpoint
doCheckpoint();

——1.1.1.1.1.2.1.1》StandbyCheckpointer.doCheckpoint()

//TODO 把元数据持久化到磁盘上面
img.saveNamespace(namesystem, imageType, canceler);
//开启了一个异步的线程
ExecutorService executor =
Executors.newSingleThreadExecutor(uploadThreadFactory);

//这个操作就要把刚刚从内存里面的元数据持持久化到磁盘上面的 那个份数据 上传到 active的namenode上面去。
TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem.getFSImage().getStorage(), imageType, txid, canceler);

4、standby的 namenode 发送 fsimage 到 主namenode 22} 0:13

流程图

image.png

——1.1.1.1.1.2.1.1.1》TransferFsImage.uploadImageFromStorage()
——1.1.1.1.1.2.1.1.1》TransferFsImage.uploadImage()
——1.1.1.1.1.2.1.1.1》TransferFsImage.writeFileToPutRequest()

//通过http方式获取的流
OutputStream output = connection.getOutputStream();
//输入流肯定是自己这儿的,不断读自己的数据
FileInputStream input = new FileInputStream(imageFile);
try {
//这儿没有什么特别的,就是一个流对烤
//然后把数据网output 输出流里面去写。
copyFileToStream(output, imageFile, input,
ImageServlet.getThrottler(conf), canceler);

                                ↓  //NameNodeHttpServer类做上传   22}

——1.1.1.1.1.2.1.1.2》NameNodeHttpServer.start()
——1.1.1.1.1.2.1.1.2》NameNodeHttpServer. setupServlets()

//TODO 上传元数据的请求
//SecondaryNameNode/StandByNamenode合并出来的FSImage需要替换Active NameNode的fsimage
//发送的就是http的请求,请求就会转发给这个servlet
httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
Image'Servlet.class, true);

          ——1.1.1.1.1.2.1.1.3》ImageServlet.doPut()
                                                              ▼   
           //TODO 步骤一:
            // 针对请求获取到一个输入流,不断的把数据读取过来
            InputStream stream = request.getInputStream();
            try {
              long start = monotonicNow();
              //TODO 步骤二:
              MD5Hash downloadImageDigest = TransferFsImage
                  .handleUploadImageRequest(request, txid,
                      nnImage.getStorage(), stream,
                      parsedParams.getFileSize(), getThrottler(conf));
              //TODO 步骤三:
              // 会把接收过来的元数据 替换 现在已有的fsimage文件。
              //对文件进行重命名
              nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
                  downloadImageDigest);
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容