在nsqd中,消息存放在消息队列中。每个topic和topic下的每个channel都维护一个消息队列。消息分为两种,瞬时消息和普通消息。以"#ephemeral"开头的topic和channel,都是瞬时的,采用的队列为newDummyBackendQueue,而普通消息使用的是diskqueue。这两种消息队列都实现了BackendQueue这个接口,利用接口实现了多态。
// nsqd/backend_queue.go
type BackendQueue interface {
Put([]byte) error
ReadChan() chan []byte // this is expected to be an *unbuffered* channel
Close() error
Delete() error
Depth() int64
Empty() error
}
newDummyBackendQueue不做消息的持久化,实现的BackendQueue中的方法基本没做什么事。如果channel阻塞了,消息就会被丢弃。
diskqueue则是实现了消息的持久化。diskqueue来自另一个github项目,go-diskqueue。
ioLoop是diskqueue实现持久化队列的主体方法。diskqueue通过writeChan,readchan和外界交换信息。外界通过writeChan写入消息,通过readChan读消息。而diskqueue会将消息写入文件,并从文件中读取。readOne和writeOne就是从消息数据文件中读和写一个消息的方法。通过记录读和写的位置,判断和控制读和写的进度。
// go-diskqueue/disk_queue.go
func (d *diskQueue) ioLoop() {
var dataRead []byte
var err error
var count int64
var r chan []byte
syncTicker := time.NewTicker(d.syncTimeout)
for {
// dont sync all the time :)
if count == d.syncEvery {
d.needSync = true
}
if d.needSync {
err = d.sync()
if err != nil {
d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
}
count = 0
}
// 检查是否有消息没有被读
if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
// 检查上一次读的消息是否消费
if d.nextReadPos == d.readPos {
dataRead, err = d.readOne()
if err != nil {
d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s",
d.name, d.readPos, d.fileName(d.readFileNum), err)
d.handleReadError()
continue
}
}
r = d.readChan
} else {
r = nil
}
select {
// the Go channel spec dictates that nil channel operations (read or write)
// in a select are skipped, we set r to d.readChan only when there is data to read
case r <- dataRead:
count++
// moveForward sets needSync flag if a file is removed
d.moveForward()
case <-d.emptyChan:
d.emptyResponseChan <- d.deleteAllFiles()
count = 0
case dataWrite := <-d.writeChan:
count++
d.writeResponseChan <- d.writeOne(dataWrite)
case <-syncTicker.C:
if count == 0 {
// avoid sync when there's no activity
continue
}
d.needSync = true
case <-d.exitChan:
goto exit
}
}
exit:
d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name)
syncTicker.Stop()
d.exitSyncChan <- 1
}
diskqueue也做了数据的持久化,方便重启。也支持文件按照大小自动分割的功能。这里不做赘述。