nsq源码解读之nsqd

nsqd是一个守护进程,用来接收和转发消息。和前文提到的nsqdlookup类似,它同样使用go-svc来管理进程。而在启动服务的时候,不仅支持tcp和http,还支持https。本文主要分析nsqd源码中值得借鉴的点。

1. 加载数据文件

在nsqd的start和stop函数中,程序除了读取配置以后,还涉及数据文件的加载和写入。数据文件中主要包含当前nsqd的topic和channel信息。

  • 数据是以json格式存放的,格式定义为meta。包含topic和channel的信息,以及是否paused
  • 数据存在两个文件中,一个代表当前的,另一个带ID的是用来作回滚的(从注释上看是这样的,但是这个文件可以是符号链接,具体应用场景未知)

有以下几点是可以学习的:

  • 使用atomic原子操作来处理一些线程间共享的数据,避免使用锁,可以简化代码,降低开销
  • 写文件的时候,可以先写一个tmp文件,写成功了再rename为正式的问题。
  • 写文件时,将文件fsync落盘用sync()
  • windows建立symlink需要管理员权限,所以需要重新写一份,而linux可以通过os.Symlink直接建立符号连接,不用写两份文件。
// nsqd/nsqd.go
type meta struct {
    Topics []struct {
        Name     string `json:"name"`
        Paused   bool   `json:"paused"`
        Channels []struct {
            Name   string `json:"name"`
            Paused bool   `json:"paused"`
        } `json:"channels"`
    } `json:"topics"`
}

func writeSyncFile(fn string, data []byte) error {
    f, err := os.OpenFile(fn, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
    if err != nil {
        return err
    }

    _, err = f.Write(data)
    if err == nil {
        err = f.Sync()
    }
    f.Close()
    return err
}

func (n *NSQD) LoadMetadata() error {
    atomic.StoreInt32(&n.isLoading, 1)
    defer atomic.StoreInt32(&n.isLoading, 0)

    fn := newMetadataFile(n.getOpts())
    // old metadata filename with ID, maintained in parallel to enable roll-back
    fnID := oldMetadataFile(n.getOpts())

    // ......
    // 此处省略代码为读取文件的过程
    // ......

    var m meta
    err = json.Unmarshal(data, &m)


    // ......
    // 此处省略代码为恢复数据过程
    // ......

    return nil
}

func (n *NSQD) PersistMetadata() error {

    // ......
    // 此处省略代码为获取数据过程
    // ......
    
    tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())

    err = writeSyncFile(tmpFileName, data)
    if err != nil {
        return err
    }
    err = os.Rename(tmpFileName, fileName)
    if err != nil {
        return err
    }
    // technically should fsync DataPath here

    stat, err := os.Lstat(fileNameID)
    if err == nil && stat.Mode()&os.ModeSymlink != 0 {
        return nil
    }

    // if no symlink (yet), race condition:
    // crash right here may cause next startup to see metadata conflict and abort

    tmpFileNameID := fmt.Sprintf("%s.%d.tmp", fileNameID, rand.Int())

    if runtime.GOOS != "windows" {
        err = os.Symlink(fileName, tmpFileNameID)
    } else {
        // on Windows need Administrator privs to Symlink
        // instead write copy every time
        err = writeSyncFile(tmpFileNameID, data)
    }
    if err != nil {
        return err
    }

    err = os.Rename(tmpFileNameID, fileNameID)
    if err != nil {
        return err
    }
    // technically should fsync DataPath here

    return nil
}
2. 在用flag模块读取命令行配置时,可以使用flag.Var,读取指定类型的配置。
type Value interface {
    String() string
    Set(string) error
}

func (f *FlagSet) Var(value Value, name string, usage string)

Var方法使用指定的名字、使用信息注册一个flag。该flag的类型和值由第一个参数表示,该参数应实现了Value接口。例如:

// nsqd/nsqd.go
type tlsMinVersionOption uint16

func (t *tlsMinVersionOption) Set(s string) error {
    s = strings.ToLower(s)
    switch s {
    case "":
        return nil
    case "ssl3.0":
        *t = tls.VersionSSL30
    case "tls1.0":
        *t = tls.VersionTLS10
    case "tls1.1":
        *t = tls.VersionTLS11
    case "tls1.2":
        *t = tls.VersionTLS12
    default:
        return fmt.Errorf("unknown tlsVersionOption %q", s)
    }
    return nil
}

func (t *tlsMinVersionOption) Get() interface{} { return uint16(*t) }

func (t *tlsMinVersionOption) String() string {
    return strconv.FormatInt(int64(*t), 10)
}

......
......

    tlsRequired := tlsRequiredOption(opts.TLSRequired)
    tlsMinVersion := tlsMinVersionOption(opts.TLSMinVersion)
3. 可以使用一个chan来结束一个线程。原理是close chan之后,chan会输出该类型的零值。
// 结束的时候
// nsqd/nsqd.go
func (n *NSQD) Exit() {
    ......
    close(n.exitChan)
    ......
}

func (n *NSQD) queueScanLoop() {
    ......
    for {
        select {
        ......
        case <-n.exitChan:
            goto exit
        ......
        }
    }
    ......
exit:
    n.logf("QUEUESCAN: closing")
    ......
}

// nsqd/lookup.go
func (n *NSQD) lookupLoop() {
    ......
    for {
        select {
        ......
        case <-n.exitChan:
            goto exit
        ......
        }
    }
    ......
}
4. 最关键的topic的创建和消息的发布,将在接下来的文章中详细分析。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,240评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,328评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,182评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,121评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,135评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,093评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,013评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,854评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,295评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,513评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,398评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,989评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,636评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,657评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,651评论 18 139
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,315评论 1 15
  • 前言 在微服务架构的系统中,我们通常会使用轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务实例都连接上来...
    Chandler_珏瑜阅读 6,574评论 2 39
  • 新员工的学习思路 一个人,首先要确立自己的人生观,价值观,世界观。三...
    ajin1973阅读 212评论 0 0
  • 在落日下 持剑的手是金黄色 长剑一挑 击破 落雪纷纷 泪无痕 人无魂 酒后风采 仗剑多几分 山之巅 手脱剑 脚步如...
    红与黑1830阅读 278评论 0 2