上一篇: Go消息中间件Nsq系列(六)------Message结构
通过此次源码阅读, 可以学习到
- 结合go select io多路复用 实现文件队列的思路
1. go-diskqueue
- 它是一个提供文件系统支持的FIFO(先进先出)队列库
- 在nsq中,当channel缓冲区超过
mem_queue_size = 10000
,其他消息往文件队列里面写入
2. 注意go中channel select的特性
- 当channel 为nil的时候, 将跳过select操作
- 当close channel的时候, select也会响应,避免错误,可以判断是否close
3. diskqueue源码分析
3.1 diskQueue的定义
diskQueue 实现了 BackendQueue
接口,
// BackendQueue 接口
type Interface interface {
// 加锁检测 标志位是否退出, 如果否 则继续往文件写入数据并等待结果
Put([]byte) error
// 读取文件数据 返回chan 可用于多消费者并发读取
ReadChan() chan []byte
// 等待ioloop结束, 正常关闭 并保存元数据
Close() error
// 等待ioloop结束, 直接关闭io流
Delete() error
// 未读消息积压量
Depth() int64
// 清空消息, 删除文件
Empty() error
}
type diskQueue struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
// run-time state (also persisted to disk)
// 运行时的数据保存, 也会保存到文件
readPos int64 // 文件读取的位置
writePos int64 // 文件写入的位置
readFileNum int64 // 读取文件编号
writeFileNum int64 // 写入文件编号
depth int64 // 未读消息积压量
sync.RWMutex // 读写锁
// instantiation time metadata
name string // 队列实例名称
dataPath string // 数据文件存储路径
maxBytesPerFile int64 // 文件最大长度为 100 * 1024 * 1024
minMsgSize int32 // 最小消息长度 MsgIDLength + 8 + 2 =26//Id + Timestamp + Attempts
maxMsgSize int32 // 最大消息长度 1024 * 1024
syncEvery int64 // 文件同步 count 累计间隔 2500 出发
syncTimeout time.Duration // 同步定时2s触发
exitFlag int32 // 退出标志位
needSync bool // 需要同步
// keeps track of the position where we have read
// (but not yet sent over readChan)
nextReadPos int64 // 下一次读取的位置
nextReadFileNum int64 // 下一次读取对应的文件编号
readFile *os.File // 读取的文件
writeFile *os.File // 写入的文件
reader *bufio.Reader // 缓冲读取
writeBuf bytes.Buffer // 缓冲写入
// exposed via ReadChan()
readChan chan []byte // 读取的数据,可以多消费者进行通信消费
// internal channels
writeChan chan []byte // 写入通道
writeResponseChan chan error // 写入结果反馈通道
emptyChan chan int // 清空队列通道
emptyResponseChan chan error // 清空反馈通道
exitChan chan int // 结束信号通道
exitSyncChan chan int // 退出同步通道
logf AppLogFunc // 日志记录封装
}
3.2 diskqueue 初始化流程
diskQueue在New初始化的时候会取回之前持久化的元数据
// ... 前面一堆根据参数初始化diskqueue
// 取回之前持久化的元数据
err := d.retrieveMetaData()
// .... 省略
go d.ioLoop()
其保存的元数据格式为实例名称.diskqueue.meta.dat
, 保存的数据为
65 // depth 未读消息积压量
52,0 // 52为当前读取文件编号 , 0 为当前读取文件位置
53,0 // 53位当前写入文件编号, 0 为当前写入文件位置
以下是对应的方法d.retrieveMetaData()
和 persistMetaData()
具体实现,前者在初始化取回之前保存的state, 后者在sync同步的保存当前的state
// 从本地文件取回元数据
func (d *diskQueue) retrieveMetaData() error {
var f *os.File
var err error
// 存储路径.diskqueue.meta.dat
fileName := d.metaDataFileName()
f, err = os.OpenFile(fileName, os.O_RDONLY, 0600)
if err != nil {
return err
}
defer f.Close()
var depth int64
_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
&depth, // 待读取消息数量
&d.readFileNum, // 待读取文件编号
&d.readPos, // 待读取文件位置
&d.writeFileNum, // 待写入文件编号
&d.writePos ,// 待写入文件位置
)
if err != nil {
return err
}
// 原子更新未读消息
atomic.StoreInt64(&d.depth, depth)
// 更新读取位置和文件编号
d.nextReadFileNum = d.readFileNum
d.nextReadPos = d.readPos
return nil
}
// 同步元数据到本地文件
func (d *diskQueue) persistMetaData() error {
var f *os.File
var err error
// metdat 临时文件
fileName := d.metaDataFileName()
tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())
// write to tmp file
f, err = os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return err
}
_, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",
atomic.LoadInt64(&d.depth),
d.readFileNum, d.readPos,
d.writeFileNum, d.writePos)
if err != nil {
f.Close()
return err
}
f.Sync()
f.Close()
// 成功往临时文件写入数据, 在进行替换源文件
// atomically rename
return os.Rename(tmpFileName, fileName)
}
接下来就是diskqueue的核心所在,也就是ioLoop() 函数
func (d *diskQueue) ioLoop() {
var dataRead []byte
var err error
var count int64
var r chan []byte
// 定时器
syncTicker := time.NewTicker(d.syncTimeout)
for {
// dont sync all the time :)
// 累计读取次数超过阈值 进行同步
if count == d.syncEvery {
d.needSync = true
}
// 需要同步的时候同步, 并重置阈值
if d.needSync {
err = d.sync()
if err != nil {
d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
}
count = 0
}
// 如果可以读, 下次要读的消息位置等于已经读取的消息地址,才进行读取
if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
if d.nextReadPos == d.readPos {
dataRead, err = d.readOne()
if err != nil {
d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s",
d.name, d.readPos, d.fileName(d.readFileNum), err)
// 坏的数据文件
d.handleReadError()
continue
}
}
// 赋值channel
r = d.readChan
} else {
r = nil
}
select {
// the Go channel spec dictates that nil channel operations (read or write)
// in a select are skipped, we set r to d.readChan only when there is data to read
// 如果channel 为nil 进行读写, select会跳过
case r <- dataRead:
count++
// moveForward sets needSync flag if a file is removed
d.moveForward()
case <-d.emptyChan:
// 删除所有文件响应
d.emptyResponseChan <- d.deleteAllFiles()
count = 0
case dataWrite := <-d.writeChan:
count++
// 写入消息到磁盘
d.writeResponseChan <- d.writeOne(dataWrite)
case <-syncTicker.C:
// 没有活动避免同步
if count == 0 {
// avoid sync when there's no activity
continue
}
d.needSync = true
case <-d.exitChan:
// 退出程序
goto exit
}
}
exit:
d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name)
syncTicker.Stop()
d.exitSyncChan <- 1
}
ioLoop() 的实现是通过for 轮询, 加 select 多路复用监听多个通道, 具体如下
- count的阈值是通过syncEvery 设定的, 默认2500, count的值为每次读取或写入时递增, 达到阈值触发
如果触发同步的,needSync
为true的情况下,进行同步,并重置count
ticker定时时间为2s, 触发同步 -
(d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos)
也就是说 当前rn < wn 或者 rp<wp 的时候, 并且下次读取的位置要等于当前读取的位置(代表上条数据已经读取了,设置的nextReadPos就是新的读取位置), 接着调用readOne()
去读取一条数据, 如果发生错误,在handleReadError()
将当前读取文件标记为bad文件,重新修改读写位置
, 有可读取的数据时 会将readChan赋值, 否则置为nil, 这是本文开头所说的channel select特性,讲跳过select case - select多路复用监听了以下通道
3.1r <- dataRead
读取到一条数据时,moveForward()
消费数据处理读取位置,并检测已经读取完的文件会做删除处理
3.2<-d.emptyChan
删除从读取-写入的文件编号, 删除元数据文件, 下次写入将会创建新的读写文件, 不会停止程序
3.3dataWrite := <-d.writeChan
调用writeOne()
写入数据到文件
3.4<-syncTicker.C
定时同步
3.5<-d.exitChan:
程序结束, 关闭定时器,给 d.exitSyncChan <- 1发送通知i
readOne
读取一条数据的具体实现:
// readOne performs a low level filesystem read for a single []byte
// while advancing read positions and rolling files, if necessary
func (d *diskQueue) readOne() ([]byte, error) {
var err error
var msgSize int32
// 如果没有初始化 则先初始化
if d.readFile == nil {
curFileName := d.fileName(d.readFileNum)
d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)
if err != nil {
return nil, err
}
d.logf(INFO, "DISKQUEUE(%s): readOne() opened %s", d.name, curFileName)
// 恢复到上次读取的位置
if d.readPos > 0 {
_, err = d.readFile.Seek(d.readPos, 0)
if err != nil {
d.readFile.Close()
d.readFile = nil
return nil, err
}
}
// 使用缓冲区来读取
d.reader = bufio.NewReader(d.readFile)
}
// 使用大字节序方式读取4个字节 的消息包大小
err = binary.Read(d.reader, binary.BigEndian, &msgSize)
if err != nil {
d.readFile.Close()
d.readFile = nil
return nil, err
}
// 无效的 消息包大小
if msgSize < d.minMsgSize || msgSize > d.maxMsgSize {
// this file is corrupt and we have no reasonable guarantee on
// where a new message should begin
d.readFile.Close()
d.readFile = nil
return nil, fmt.Errorf("invalid message read size (%d)", msgSize)
}
// 根据消息包大小 读取消息内容
readBuf := make([]byte, msgSize)
_, err = io.ReadFull(d.reader, readBuf)
if err != nil {
d.readFile.Close()
d.readFile = nil
return nil, err
}
// 总长度 消息包大小+消息长度
totalBytes := int64(4 + msgSize)
// we only advance next* because we have not yet sent this to consumers
// (where readFileNum, readPos will actually be advanced)
// 移动位置
d.nextReadPos = d.readPos + totalBytes
d.nextReadFileNum = d.readFileNum
// TODO: each data file should embed the maxBytesPerFile
// as the first 8 bytes (at creation time) ensuring that
// the value can change without affecting runtime
// 文件大小超过设定阈值 则进行自增
if d.nextReadPos > d.maxBytesPerFile {
if d.readFile != nil {
d.readFile.Close()
d.readFile = nil
}
d.nextReadFileNum++
d.nextReadPos = 0
}
return readBuf, nil
}
writeOne()
写入一条数据的具体实现:
// writeOne performs a low level filesystem write for a single []byte
// while advancing write positions and rolling files, if necessary
func (d *diskQueue) writeOne(data []byte) error {
var err error
if d.writeFile == nil {
curFileName := d.fileName(d.writeFileNum)
d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return err
}
d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)
// 有没有上次写入文件位置, 有着跳转到之前的位置
if d.writePos > 0 {
_, err = d.writeFile.Seek(d.writePos, 0)
if err != nil {
d.writeFile.Close()
d.writeFile = nil
return err
}
}
}
// 消息包大小
dataLen := int32(len(data))
if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize)
}
// 重置buf 然后大字节序写入大小到buf, 然后在写入数据包
d.writeBuf.Reset()
err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
if err != nil {
return err
}
_, err = d.writeBuf.Write(data)
if err != nil {
return err
}
// only write to the file once
// 写入到文章
_, err = d.writeFile.Write(d.writeBuf.Bytes())
if err != nil {
d.writeFile.Close()
d.writeFile = nil
return err
}
// 移动读取位置, 并消息量加1
totalBytes := int64(4 + dataLen)
d.writePos += totalBytes
atomic.AddInt64(&d.depth, 1)
// 写入的文件大于切片大小, 则新建文件
if d.writePos > d.maxBytesPerFile {
d.writeFileNum++
d.writePos = 0
// sync every time we start writing to a new file
// 将之前的文件同步到磁盘
err = d.sync()
if err != nil {
d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
}
if d.writeFile != nil {
d.writeFile.Close()
d.writeFile = nil
}
}
return err
}
其他的一些函数如下:
ReadChan() // 下面的例子 监听readChan多并发消费
exit() // channel 特性 close 也会走select的选项
deleteAllFiles()
skipToNextRWFile() // 删除当前所有的文件,继续新的开始
sync() // 同步数据(含元数据)
metaDataFileName() // 元数据保存文件名
fileName() // 数据文件名
checkTailCorruption() // 检查数据丢失之类的不规范操作
moveForward() // 消费操作,判断是否判断旧文件
handleReadError() // 处理读取数据异常
下面是使用的例子
func Logf( f string, args ...interface{}) {
log.Output(3, fmt.Sprintf("diskqueue: "+f, args...))
}
func main(){
dqLogf := func(level LogLevel, f string, args ...interface{}) {
Logf( f, args...)
}
dq := New(
"test_dq",
"D:/GoWorkspace/src/Examples/nsq_dq_test",
1*1024,
10,
100,
100,
2 * time.Second,
dqLogf,
)
go func() {
var err error
for {
// 110毫秒写入一条数据
err = dq.Put([]byte("测试数据"))
if err == nil{
log.Println("写入完成后 消息积压量",dq.Depth())
}
time.Sleep(time.Millisecond*110)
}
}()
go func() {
a := time.Tick(time.Millisecond*50)
for {
select {
case <-a:
msg := <- dq.ReadChan()
log.Println("读取数据:",string(msg),dq.Depth())
}
}
}()
// 多消费者进行消费
go func() {
a := time.Tick(2*time.Second)
for {
select {
case <-a:
msg := <- dq.ReadChan()
log.Println("读取数据2:",string(msg),dq.Depth())
}
}
}()
for {
}
}
// 执行结果
2019/08/20 18:25:39 diskqueue: DISKQUEUE(test_dq): writeOne() opened D:/GoWorkspace/src/Examples/nsq_dq_test/test_dq.diskqueue.000000.dat
2019/08/20 18:25:39 写入完成后 消息积压量 1
2019/08/20 18:25:39 diskqueue: DISKQUEUE(test_dq): readOne() opened D:/GoWorkspace/src/Examples/nsq_dq_test/test_dq.diskqueue.000000.dat
2019/08/20 18:25:39 读取数据: 测试数据 1
2019/08/20 18:25:39 读取数据: 测试数据 0
2019/08/20 18:25:39 写入完成后 消息积压量 0
2019/08/20 18:25:39 读取数据: 测试数据 0
2019/08/20 18:25:39 写入完成后 消息积压量 0
2019/08/20 18:25:39 读取数据: 测试数据 0
2019/08/20 18:25:39 写入完成后 消息积压量 0