store模块阅读14:Index子模块3:IndexService

说明

这个类就是索引服务,提供查询key,根据key构建索引,加载IndexFile到内存,destroy过期的IndexFile等功能,UML图如下

IndexService UML图

属性

    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    /**
     * Maximum times to attempt index file creation.
     */
    private static final int MAX_TRY_IDX_CREATE = 3;//尝试创建IndexFile,最多3次
    private final DefaultMessageStore defaultMessageStore;
    private final int hashSlotNum;//每个索引文件的槽数,默认500w
    private final int indexNum;//索引文件最多记录的索引个数,默认2千万
    private final String storePath;//索引文件存储路径,默认 user.home/store/index
    private final ArrayList<IndexFile> indexFileList = new ArrayList<IndexFile>();//索引文件列表
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

函数

destroy函数不介绍,start,shutdown是空函数

构造函数

    public IndexService(final DefaultMessageStore store) {
        this.defaultMessageStore = store;
        this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
        this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
        this.storePath =
            StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
    }

默认值参考 属性中的讲解

load

加载函数,将持久化文件的相关信息,读到内存中
加载所有IndexFile,根据lastExitOK判断是否要destroy不合理的记录,更新indexFileList

    public boolean load(final boolean lastExitOK) {
        File dir = new File(this.storePath);
        File[] files = dir.listFiles();
        if (files != null) {
            // ascending order
            Arrays.sort(files);
            for (File file : files) {
                try {
                    IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);
                    f.load();//读取IndexFile头信息,加载到IndexHeader的内存中

                    if (!lastExitOK) {
                        if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()
                            .getIndexMsgTimestamp()) {
                            //上次不ok,且时间比store记录的检查点要新的话,就无效
                            f.destroy(0);
                            continue;
                        }
                    }

                    log.info("load index file OK, " + f.getFileName());
                    this.indexFileList.add(f);
                } catch (IOException e) {
                    log.error("load file {} error", file, e);
                    return false;
                } catch (NumberFormatException e) {
                    log.error("load file {} error", file, e);
                }
            }
        }

        return true;
    }

deleteExpiredFile函数

分为deleteExpiredFile(long offset) 以及 deleteExpiredFile(List<IndexFile> files)

deleteExpiredFile(long offset)就是找到IndexFile中,EndPhyOffset<offset的,调用deleteExpiredFile(fileList)删掉
deleteExpiredFile(List<IndexFile> files),就是destroy所有"过期"的IndexFile

    /**
     * 找到IndexFile中,EndPhyOffset<offset的
     * 调用deleteExpiredFile(fileList)删掉
     */
    public void deleteExpiredFile(long offset) {
        Object[] files = null;
        try {
            this.readWriteLock.readLock().lock();
            if (this.indexFileList.isEmpty()) {
                return;
            }

            long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();
            if (endPhyOffset < offset) {
                files = this.indexFileList.toArray();
            }
        } catch (Exception e) {
            log.error("destroy exception", e);
        } finally {
            this.readWriteLock.readLock().unlock();
        }

        if (files != null) {
            List<IndexFile> fileList = new ArrayList<IndexFile>();
            for (int i = 0; i < (files.length - 1); i++) {
                IndexFile f = (IndexFile) files[i];
                if (f.getEndPhyOffset() < offset) {
                    fileList.add(f);
                } else {
                    break;
                }
            }

            this.deleteExpiredFile(fileList);
        }
    }

    //destroy所有"过期"的IndexFile
    private void deleteExpiredFile(List<IndexFile> files) {
        if (!files.isEmpty()) {
            try {
                this.readWriteLock.writeLock().lock();
                for (IndexFile file : files) {
                    boolean destroyed = file.destroy(3000);
                    destroyed = destroyed && this.indexFileList.remove(file);
                    if (!destroyed) {
                        log.error("deleteExpiredFile remove failed.");
                        break;
                    }
                }
            } catch (Exception e) {
                log.error("deleteExpiredFile has exception.", e);
            } finally {
                this.readWriteLock.writeLock().unlock();
            }
        }
    }

queryOffset

根据topic和key作为关键字,在所有IndexFile中找到匹配的记录,且存储时间在[begin,end]时间内,最多记录maxNum条

    public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
        List<Long> phyOffsets = new ArrayList<Long>(maxNum);

        long indexLastUpdateTimestamp = 0;
        long indexLastUpdatePhyoffset = 0;
        maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());//默认64
        try {
            this.readWriteLock.readLock().lock();
            if (!this.indexFileList.isEmpty()) {
                for (int i = this.indexFileList.size(); i > 0; i--) {//从新往旧搜索
                    IndexFile f = this.indexFileList.get(i - 1);
                    boolean lastFile = i == this.indexFileList.size();
                    if (lastFile) {
                        indexLastUpdateTimestamp = f.getEndTimestamp();//最新IndexFile的最后时间以及物理偏移
                        indexLastUpdatePhyoffset = f.getEndPhyOffset();
                    }

                    if (f.isTimeMatched(begin, end)) {//IndexFile的开始结束时间和[begin,end]是否有重叠
                        //根据topic key在IndexFile找到匹配的记录,记录在phyOffsets中
                        f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile);
                    }

                    if (f.getBeginTimestamp() < begin) {
                        break;
                    }

                    if (phyOffsets.size() >= maxNum) {
                        break;
                    }
                }
            }
        } catch (Exception e) {
            log.error("queryMsg exception", e);
        } finally {
            this.readWriteLock.readLock().unlock();
        }
        //返回QueryOffsetResult
        return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset);
    }

buildKey

这个是个辅助的,就是把topic和key拼起来

    private String buildKey(final String topic, final String key) {
        return topic + "#" + key;
    }

构建索引相关

buildIndex

根据DispatchRequest构建索引
如果getSysFlag是 ROLLBACK的,就返回,不建立索引(也不会删除旧索引)
根据 UniqKey 以及 keys分别建立索引

    public void buildIndex(DispatchRequest req) {
        IndexFile indexFile = retryGetAndCreateIndexFile();//尝试三次,获取或者创建一个最新的可写的,IndexFile
        if (indexFile != null) {
            long endPhyOffset = indexFile.getEndPhyOffset();
            DispatchRequest msg = req;
            String topic = msg.getTopic();
            String keys = msg.getKeys();
            if (msg.getCommitLogOffset() < endPhyOffset) {//如果是老记录
                return;
            }

            final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    break;
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE://rollback就不建立索引
                    return;
            }
            //根据req.getUniqKey 构建索引
            if (req.getUniqKey() != null) {
                indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
                if (indexFile == null) {
                    log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                    return;
                }
            }
            //根据msg.getKeys()按照" "来split,再构建索引
            if (keys != null && keys.length() > 0) {
                String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
                for (int i = 0; i < keyset.length; i++) {
                    String key = keyset[i];
                    if (key.length() > 0) {
                        indexFile = putKey(indexFile, msg, buildKey(topic, key));
                        if (indexFile == null) {
                            log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                            return;
                        }
                    }
                }
            }
        } else {
            log.error("build index error, stop building index");
        }
    }

里面调用了retryGetAndCreateIndexFile,以及putKey如下

retryGetAndCreateIndexFile

尝试3次获取或者创建一个最新的,可写的IndexFile
如果原来最新的没写满就返回原来最新的
否则创建最新的

    public IndexFile retryGetAndCreateIndexFile() {
        IndexFile indexFile = null;

        for (int times = 0; null == indexFile && times < MAX_TRY_IDX_CREATE; times++) {
            indexFile = this.getAndCreateLastIndexFile();
            if (null != indexFile)
                break;

            try {
                log.info("Tried to create index file " + times + " times");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
            }
        }

        if (null == indexFile) {
            this.defaultMessageStore.getAccessRights().makeIndexFileError();
            log.error("Mark index file cannot build flag");
        }

        return indexFile;
    }

里面调用了getAndCreateLastIndexFile,如下

getAndCreateLastIndexFile

获取,创建最新的,可写的IndexFile
IndexFileList里面最新的IndexFile没写满,就用最新的
如果写满了,就以时间戳作为filename创建一个新的IndexFile,并且flush上一个IndexFile,将最后的时间戳记录在storeCheckPoint中

public IndexFile getAndCreateLastIndexFile() {
        IndexFile indexFile = null;
        IndexFile prevIndexFile = null;
        long lastUpdateEndPhyOffset = 0;
        long lastUpdateIndexTimestamp = 0;

        {
            this.readWriteLock.readLock().lock();
            if (!this.indexFileList.isEmpty()) {//索引文件列表非空
                IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
                if (!tmp.isWriteFull()) {
                    indexFile = tmp;//如果最新的索引文件没写满,就用
                } else {
                    lastUpdateEndPhyOffset = tmp.getEndPhyOffset();//获取上一个写满了的文件的物理偏移 以及 最后更新时间
                    lastUpdateIndexTimestamp = tmp.getEndTimestamp();
                    prevIndexFile = tmp;
                }
            }

            this.readWriteLock.readLock().unlock();
        }

        if (indexFile == null) {//最新一个文件写满了,indexFile为空
            try {
                String fileName =
                    this.storePath + File.separator
                        + UtilAll.timeMillisToHumanString(System.currentTimeMillis());//文件名是时间戳
                indexFile =
                    new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
                        lastUpdateIndexTimestamp);//标记新文件开始的物理偏移,时间为上一个文件的结束偏移,时间
                this.readWriteLock.writeLock().lock();
                this.indexFileList.add(indexFile);//加入列表
            } catch (Exception e) {
                log.error("getLastIndexFile exception ", e);
            } finally {
                this.readWriteLock.writeLock().unlock();
            }

            if (indexFile != null) {
                final IndexFile flushThisFile = prevIndexFile;
                Thread flushThread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        IndexService.this.flush(flushThisFile);//上一个文件flush
                    }
                }, "FlushIndexFileThread");

                flushThread.setDaemon(true);
                flushThread.start();
            }
        }

        return indexFile;
    }

里面调用了flush,如下

flush

flush一个文件,如果写满了,获取文件最后的存储时间,记录在StoreCheckpoint中

    public void flush(final IndexFile f) {
        if (null == f)
            return;

        long indexMsgTimestamp = 0;

        if (f.isWriteFull()) {//写满了就获取最后时间
            indexMsgTimestamp = f.getEndTimestamp();
        }

        f.flush();

        if (indexMsgTimestamp > 0) {
            this.defaultMessageStore.getStoreCheckpoint().setIndexMsgTimestamp(indexMsgTimestamp);//更新storeCheckPoint
            this.defaultMessageStore.getStoreCheckpoint().flush();
        }
    }

putKey

建立索引 将key,物理偏移,存储时间放入IndexFile
如果一次放失败了,就重新获取最新的可写的IndexFile再放

    private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
        for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
            log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");

            indexFile = retryGetAndCreateIndexFile();
            if (null == indexFile) {
                return null;
            }

            ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
        }

        return indexFile;
    }

思考

IndexFile的文件名是什么

时间戳,getAndCreateLastIndexFile函数中注明了

哪些消息可以进入IndexFile进行记录

设置了keys或者UniqKey的消息,都会进入IndexFile
buildIndex函数说明了

重要函数

函数名 作用
load(final boolean lastExitOK) 加载所有IndexFile,根据lastExitOK判断是否要destroy不合理的记录,更新indexFileList
deleteExpiredFile(long offset) 找到IndexFile中,EndPhyOffset<offset的进行destroy
queryOffset 根据topic和key作为关键字,在所有IndexFile中找到匹配的记录,且存储时间在[begin,end]时间内,最多记录maxNum条
buildIndex(DispatchRequest req) 根据DispatchRequest 的 UniqKey 以及 keys分别建立索引

吐槽

构建索引的代码嵌套复杂了点,要反复保证,IndexFile是最新的,可写的

refer

http://blog.csdn.net/meilong_whpu/article/details/76921583

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容

  • 概述 IndexFile由几个部分构成: IndexHeader 文件头(40字节)SlotTable 槽表(4字...
    赤子心_d709阅读 341评论 0 0
  • HBase存储架构图 HBase Master 为Region server分配region 负责Region s...
    kimibob阅读 5,580评论 0 52
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,315评论 1 15
  • 第一章 Nginx简介 Nginx是什么 没有听过Nginx?那么一定听过它的“同行”Apache吧!Ngi...
    JokerW阅读 32,664评论 24 1,002
  • 1 这是竹听欢第六次越狱了。 第一次竹听欢趁着夜色开了锁,偷偷溜了出去;第二次竹听欢策动两个狱友,直...
    小渔歌儿阅读 291评论 12 8