store模块阅读22:HA(1):HAConnection

简介

HAConnection用于描述master和slave用于同步数据的连接
两个service如下
  ReadSocketService :读来自 Slave节点 的数据。
  WriteSocketService :写到往 Slave节点 的数据。
重要方法
  ReadSocketService#run
  ReadSocketService#processReadEvent
  WriteSocketService#run
  WriteSocketService#transferData

部分涉及HAService的代码后面再讲

master和slave同步数据的协议如下


参照refer

属性

    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private final HAService haService;//上层ha服务
    private final SocketChannel socketChannel;
    private final String clientAddr;//slave地址
    private WriteSocketService writeSocketService;//内部类
    private ReadSocketService readSocketService;//内部类

    private volatile long slaveRequestOffset = -1;//slave第一次请求的offset
    private volatile long slaveAckOffset = -1;//确认slave的最大位置

函数

构造函数和状态处理函数比较简单

    public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
        this.haService = haService;
        this.socketChannel = socketChannel;
        this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
        this.socketChannel.configureBlocking(false);
        this.socketChannel.socket().setSoLinger(false, -1);
        this.socketChannel.socket().setTcpNoDelay(true);
        this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
        this.socketChannel.socket().setSendBufferSize(1024 * 64);
        this.writeSocketService = new WriteSocketService(this.socketChannel);
        this.readSocketService = new ReadSocketService(this.socketChannel);
        this.haService.getConnectionCount().incrementAndGet();//ha服务连接数+1
    }

    public void start() {
        this.readSocketService.start();
        this.writeSocketService.start();
    }

    public void shutdown() {
        this.writeSocketService.shutdown(true);
        this.readSocketService.shutdown(true);
        this.close();
    }

    public void close() {
        if (this.socketChannel != null) {
            try {
                this.socketChannel.close();
            } catch (IOException e) {
                HAConnection.log.error("", e);
            }
        }
    }

    public SocketChannel getSocketChannel() {
        return socketChannel;
    }

内部类

ReadSocketService

读socket服务

属性以及构造函数

        private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
        private final Selector selector;
        private final SocketChannel socketChannel;
        private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);//读的buffer,分配1M
        private int processPostion = 0;//byteBufferRead处理到的位置
        private volatile long lastReadTimestamp = System.currentTimeMillis();//记录最后读取的时间,20s超时

        public ReadSocketService(final SocketChannel socketChannel) throws IOException {
            this.selector = RemotingUtil.openSelector();
            this.socketChannel = socketChannel;
            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
            this.thread.setDaemon(true);
        }

run方法

代码,注释如下

        /**
         * while循环中
         *  1.调用processReadEvent,如果出错则break
         *  2.如果读的间隔超过了指定时间(默认20s)则break
         * 退出了break
         *  1.读写service stop
         *  2.ha服务移除当前HAConnection记录
         *  3.selector,channel关闭,清理
         */
        @Override
        public void run() {
            HAConnection.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    this.selector.select(1000);
                    boolean ok = this.processReadEvent();//处理读事件
                    if (!ok) {//处理错误
                        HAConnection.log.error("processReadEvent error");
                        break;
                    }

                    long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
                    if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {//心跳间隔超过了指定间隔,默认20s
                        log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                        break;
                    }
                } catch (Exception e) {
                    HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                    break;
                }
            }

            this.makeStop();//读线程stop

            writeSocketService.makeStop();//写线程stop

            //ha服务移除当前HAConnection记录
            haService.removeConnection(HAConnection.this);

            //ha服务的连接数-1
            HAConnection.this.haService.getConnectionCount().decrementAndGet();

            //取消注册的key
            SelectionKey sk = this.socketChannel.keyFor(this.selector);
            if (sk != null) {
                sk.cancel();
            }
            //关闭socket以及channel
            try {
                this.selector.close();
                this.socketChannel.close();
            } catch (IOException e) {
                HAConnection.log.error("", e);
            }

            HAConnection.log.info(this.getServiceName() + " service end");
        }

processReadEvent

        /**
         * 处理读事件
         * 1.如果byteBufferRead写满了,就flip准备重新写,更新processPostion
         * 2.只要buffer还能写
         *  从socketChannel内容写到byteBufferRead中
         *  更新lastReadTimestamp
         *  找到pos之前最近%8==0的位置(每次传输的是一个long,刚好8个字节)
         *  读取之前8个字节,记录slaveAckOffset,代表slave发送过来的offset,即slave同步到的offset
         *  notifyTransferSome。master通知slave: "已经知道slave同步到slaveAckOffset这个位置了"
         */
        private boolean processReadEvent() {
            int readSizeZeroTimes = 0;

            if (!this.byteBufferRead.hasRemaining()) {//没有什么要读了
                this.byteBufferRead.flip();//从头读
                this.processPostion = 0;
            }

            while (this.byteBufferRead.hasRemaining()) {
                try {
                    int readSize = this.socketChannel.read(this.byteBufferRead);//channel中的内容读到byteBufferRead中
                    if (readSize > 0) {
                        readSizeZeroTimes = 0;
                        //最后读取时间
                        this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                        if ((this.byteBufferRead.position() - this.processPostion) >= 8) {//buffer最新的位置离处理的位置超过了8字节(一个long占的位置)
                            int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);//找到对应的整
                            long readOffset = this.byteBufferRead.getLong(pos - 8);//读取这个long
                            this.processPostion = pos;//记录处理到的位置
                            // 设置已确认的最大位置
                            HAConnection.this.slaveAckOffset = readOffset;
                            if (HAConnection.this.slaveRequestOffset < 0) {//初始请求的位置,代表是slave第一次请求
                                HAConnection.this.slaveRequestOffset = readOffset;
                                log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                            }
                            // 通知目前Slave进度
                            HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                        }
                    } else if (readSize == 0) {
                        if (++readSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                        return false;
                    }
                } catch (IOException e) {
                    log.error("processReadEvent exception", e);
                    return false;
                }
            }

            return true;
        }

WriteSocketService

写socket服务,完成master向slave发送数据

属性以及构造函数

        private final Selector selector;
        private final SocketChannel socketChannel;
        
        private final int headerSize = 8 + 4;//头部信息 包含 physical offset(long) + bodySize(int)
        private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize);//master给slave同步数据时的头部buffer
        private long nextTransferFromWhere = -1;//传递数据从哪开始
        private SelectMappedBufferResult selectMappedBufferResult;//master给slave同步数据的内容,即body
        private boolean lastWriteOver = true;
        private long lastWriteTimestamp = System.currentTimeMillis();//最后一次写的时间

        public WriteSocketService(final SocketChannel socketChannel) throws IOException {
            this.selector = RemotingUtil.openSelector();
            this.socketChannel = socketChannel;
            this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
            this.thread.setDaemon(true);
        }

run方法

        /**
         * 正常情况下while循环
         *  1.等到slave请求,读取服务中更新slaveRequestOffset
         *  2.计算nextTransferFromWhere
         *      2.1如果为首次连接,那么slave传递的offset为0, master收到为0的offset后,从最后一个mappedFile开始复制
         *      2.2否则为slave同步过来的slaveRequestOffset
         *  3.如果之前发送给slave内容没有完成,那么一直接着发送
         *  4.如果之前发送给slave内容发完了,而且过了心跳时间(默认5s),那么只传递一个header过去(header中记录的bodySize为0)
         *  5.根据nextTransferFromWhere去mappedFile找合适大小的mappedFile内容,记录在selectMappedBufferResult
         *  6.调用transferData把头部byteBufferHeader以及body内容selectMappedBufferResult发送过去
         * 如果遇到异常,break处while循环
         *  1.读写service stop
         *  2.haService移除当前记录
         *  3.socket,channel关闭,清理
         *
         */
        @Override
        public void run() {
            HAConnection.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    this.selector.select(1000);

                    if (-1 == HAConnection.this.slaveRequestOffset) {//slave还未请求过
                        Thread.sleep(10);
                        continue;
                    }

                    if (-1 == this.nextTransferFromWhere) {
                        //如果为首次连接,那么offset为0, master收到为0的offset后,从最后一个mappedFile开始复制
                        if (0 == HAConnection.this.slaveRequestOffset) {
                            long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                            masterOffset =
                                masterOffset
                                    - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                                    .getMapedFileSizeCommitLog());

                            if (masterOffset < 0) {
                                masterOffset = 0;
                            }
                            this.nextTransferFromWhere = masterOffset;
                        } else {//否则从指定的offset开始同步数据给slave
                            this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;//如果不是首次连接,就从确认的slave的offset开始
                        }

                        log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                            + "], and slave request " + HAConnection.this.slaveRequestOffset);
                    }

                    if (this.lastWriteOver) {//上次写完了

                        long interval =
                            HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                        //5s一次心跳,如果超时,记录的bodySize为0
                        if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                            .getHaSendHeartbeatInterval()) {

                            // Build Header
                            this.byteBufferHeader.position(0);
                            this.byteBufferHeader.limit(headerSize);
                            this.byteBufferHeader.putLong(this.nextTransferFromWhere);//头部信息 phyOffset为nextTransferFromWhere
                            this.byteBufferHeader.putInt(0);//头部信息 bodySize为0
                            this.byteBufferHeader.flip();

                            this.lastWriteOver = this.transferData();
                            if (!this.lastWriteOver)
                                continue;
                        }
                    } else {
                        this.lastWriteOver = this.transferData();
                        if (!this.lastWriteOver)
                            continue;
                    }

                    //获取指定位置之后的buffer
                    SelectMappedBufferResult selectResult =
                        HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
                    if (selectResult != null) {
                        int size = selectResult.getSize();
                        //同步数据不得超过默认32k
                        if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                            size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                        }

                        long thisOffset = this.nextTransferFromWhere;
                        this.nextTransferFromWhere += size;//下次传输的位置

                        selectResult.getByteBuffer().limit(size);
                        this.selectMappedBufferResult = selectResult;

                        // Build Header
                        this.byteBufferHeader.position(0);
                        this.byteBufferHeader.limit(headerSize);//头部12字节
                        this.byteBufferHeader.putLong(thisOffset);//开始位置
                        this.byteBufferHeader.putInt(size);//bodySize大小
                        this.byteBufferHeader.flip();

                        this.lastWriteOver = this.transferData();
                    } else {
                        // 没新的消息,挂起等待
                        HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
                    }
                } catch (Exception e) {

                    HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                    break;
                }
            }
            //break出来,代表异常
            if (this.selectMappedBufferResult != null) {
                this.selectMappedBufferResult.release();
            }
            //停止读写service
            this.makeStop();

            readSocketService.makeStop();

            haService.removeConnection(HAConnection.this);//移除该记录

            SelectionKey sk = this.socketChannel.keyFor(this.selector);
            if (sk != null) {
                sk.cancel();
            }

            try {
                this.selector.close();
                this.socketChannel.close();
            } catch (IOException e) {
                HAConnection.log.error("", e);
            }

            HAConnection.log.info(this.getServiceName() + " service end");
        }

transferData

master给slave写数据

        /**
         * 传输数据
         * 1.写头部
         * 2.写body
         */
        private boolean transferData() throws Exception {
            int writeSizeZeroTimes = 0;//长度为0的写的次数
            // Write Header,写头部
            while (this.byteBufferHeader.hasRemaining()) {
                int writeSize = this.socketChannel.write(this.byteBufferHeader);
                if (writeSize > 0) {
                    writeSizeZeroTimes = 0;
                    //更新写时间
                    this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                } else if (writeSize == 0) {
                    if (++writeSizeZeroTimes >= 3) {
                        break;
                    }
                } else {
                    throw new Exception("ha master write header error < 0");
                }
            }

            if (null == this.selectMappedBufferResult) {
                return !this.byteBufferHeader.hasRemaining();
            }

            writeSizeZeroTimes = 0;

            // Write Body,写内容
            if (!this.byteBufferHeader.hasRemaining()) {//头部写完了
                while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                    int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());//把master的内容传递给slave
                    if (writeSize > 0) {
                        writeSizeZeroTimes = 0;//清空次数
                        this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                    } else if (writeSize == 0) {
                        if (++writeSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        throw new Exception("ha master write body error < 0");
                    }
                }
            }

            boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();

            if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                this.selectMappedBufferResult.release();
                this.selectMappedBufferResult = null;//发送完了之后置为null
            }

            return result;
        }

思考

两个线程run方法的流程,思路

都写在注释上了

processReadEvent时如果byteBufferRead一次读入了大量数据怎么办

源码上


image.png

如果一下子读的内容比processPostion 大了几十几百,会怎么办,为什么只读最后8个字节就行
这里应该是HAClient的发送内容保证的,最后发送的offset一定是最大的,因此之前的offset,即使收到了没有处理,也没有关系

WriteSocketService和ReadSocketService的异同

同:都继承ServiceThread,都记录一个最后读(写)时间,出现问题时都需要关闭两个Service,处理socket等
异:

1.同步协议不一样
读数据只要读一个long,代表maxPhyOffset
写要有一个12字节的header,后面是同步的内容
2.检测的时长不一样
读数据是20s没有读算超时(houseKeep)
写是5s没有写算超时(heartBeat)

问题

processReadEvent代码重复

image.png

我并不知道为啥要这样写

备注

思考中第二个对于HAClient的讲解,源码还没看

refer

http://blog.csdn.net/quhongwei_zhanqiu/article/details/39144469
http://technoboy.iteye.com/blog/2368458

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容