Cat消息存储

  1. 消息格式为 应用名-IP-小时正点数-消息递增号 MessageId
  2. 每个 应用 + IP + 整点小时 对应: 一个索引文件 和 一个数据文件
  3. 消息经过编码后,首4字节为该消息的大小,从文件中读消息的时候会用到这个特性

写消息过程

  1. 获取MessageBlock中的MessageTree个数,进行遍历
  2. 获得每个MessageTree的index(索引递增号) 和 每个MessageTree的size(数据大小)
  3. 设置索引文件的起始位置 索引递增号*6
  4. 将该该消息所对应block在数据文件中的起始地址写到索引文件(4字节)
  5. 将该该消息在block中的偏移量写入索引文件(2字节)
  6. 将block的内容长度写入数据文件
  7. 将block的内容写入dataFile
// MessageBlockWriter.java
public synchronized void writeBlock(MessageBlock block) throws IOException {
    // block中消息条数
    int len = block.getBlockSize();
    // block大小
    byte[] data = block.getData();

    // 用于在遍历过程中记录每条消息的偏移量,遍历完成之后,blockSize等于block的大小
    int blockSize = 0;

    ByteBuffer buffer = ByteBuffer.allocate(4 + data.length);
    buffer.order(ByteOrder.BIG_ENDIAN);

    for (int i = 0; i < len; i++) {
        // 消息的递增号
        int seq = block.getIndex(i);
        // 消息的大小
        int size = block.getSize(i);

        // m_indexFile.seek(seq * 6L);
        // 该消息在索引文件的起始位置 递增号*6 ,表示每条消息在索引文件中占6个字节大小
        m_indexChannel.position(seq * 6L);

        // m_indexFile.writeInt(m_blockAddress);
        // m_indexFile.writeShort(blockSize);
        // 用于记录该消息所对应block在数据文件中的起始地址
        buffer.putInt(m_blockAddress);
        // 用于记录该消息在block中的偏移量
        buffer.putShort((short) blockSize);
        buffer.flip();
        // 写入索引文件
        m_indexChannel.write(buffer);

        // 计算下一条消息在该block中的偏移量
        blockSize += size;

        buffer.clear();
    }

    // m_dataFile.writeInt(data.length);
    // m_dataFile.write(data);
    buffer = ByteBuffer.allocate(4 + data.length);
    buffer.order(ByteOrder.BIG_ENDIAN);
    // 先在数据文件中用4个字节记录 block 的大小
    buffer.putInt(data.length);
    // 再将block的内容写入数据文件
    buffer.put(data);
    buffer.flip();
    m_dataChannel.write(buffer);

    // 更新 m_blockAddress 的值,即数据文件下一次写入时的起始位置
    m_blockAddress += data.length + 4;
}

即数据文件中的存储结构为: 【blockSize(4byte)->blockData】=>【blockSize(4byte)->blockData】

索引文件的存储结构为: 【blackAddr(4byte)->messageOffsetInBlock(2byte)】 => 【blackAddr(4byte)->messageOffsetInBlock(2byte)】

读消息过程

对于真正的文件存储,block在这里其实是一个抽象的概念; 如果是直接以Message为单位进行写文件,那这个 block 和 索引文件中的block偏移量 就没有什么意义了。但实际上消息是以block为单位进行写文件,一个block最大为64K,而一个block中又存在多条消息,所以每条消息在它所属的block中有一个偏移量

  1. 根据 索引递增号从索引文件读前4个字节 找到block的地址
  2. 该地址为起始地址,从数据文件中读取一个int类型数据(4个字节)作为该block的长度
  3. 根据该长度读取整个block的内容到byte数组
  4. 根据 索引递增号从索引文件读后2个字节 找到该消息在该block中的偏移地址
  5. 以偏移地址为起始地址,读取一个int类型数据(4个字节)作为该消息的大小(为什么读4字节?这是在对消息编码时决定的,首4字节表示该消息的大小)
  6. 根据偏移地址 和 上一步获取的int类型数据大小 读取Message
// MessageBlockReader.java
private DataInputStream createDataInputStream(byte[] buf) {
    DataInputStream in = null;

    try {
        in = new DataInputStream(new SnappyInputStream(new ByteArrayInputStream(buf)));
    } catch (IOException e) {
        try {
            in = new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(buf)));
        } catch (IOException ioe) {
            Cat.logError(ioe);
        }
    }
    return in;
}

public byte[] readMessage(int index) throws IOException {
    int blockAddress = 0;
    int blockOffset = 0;

    // 索引 在索引文件的起始位置
    m_indexFile.seek(index * 6L);

    // 读出4字节,该值代表block在数据文件的起始位置
    blockAddress = m_indexFile.readInt();
    // 读出2字节 该值代表Message在block中的偏移量
    blockOffset = m_indexFile.readShort() & 0xFFFF;

    // 从数据文件的 blockAddress 地址开始访问数据
    m_dataFile.seek(blockAddress);
    // 4字节里面存的是block块的长度
    byte[] buf = new byte[m_dataFile.readInt()];
    // 从数据文件中读取整个block到buf数组
    m_dataFile.readFully(buf);

    DataInputStream in = createDataInputStream(buf);

    if (in != null) {
        try {
            // 跳到block中的偏移量
            in.skip(blockOffset);
            
            // 该值代表消息长度
            int len = in.readInt();

            byte[] data = new byte[len];
            
            // 从block中读取Message
            in.readFully(data);
            return data;
        } finally {
            try {
                in.close();
            } catch (Exception e) {
                // ignore it
            }
        }
    } else {
        return null;
    }
}

听说还有V2版本,分 以一级索引和二级索引,可我拉代码没看到呀

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342