使用 Kafka 和 MongoDB 进行 Go 异步处理

在这个示例中,我将数据的保存和 MongoDB 分离,并创建另一个微服务去处理它。我还添加了 Kafka 为消息层服务,这样微服务就可以异步处理它自己关心的东西了。

下面是这个使用了两个微服务的简单的异步处理示例的上层架构图。

微服务 1 —— 是一个 REST 式微服务,它从一个 /POST http 调用中接收数据。接收到请求之后,它从 http 请求中检索数据,并将它保存到 Kafka。保存之后,它通过 /POST 发送相同的数据去响应调用者。

微服务 2 —— 是一个订阅了 Kafka 中的一个主题的微服务,微服务 1 的数据保存在该主题。一旦消息被微服务消费之后,它接着保存数据到 MongoDB 中。

我们开始吧!

首先,启动 Kafka,在你运行 Kafka 服务器之前,你需要运行 Zookeeper。下面是示例:

$ cd /<download path>/kafka_2.11-1.1.0

$ bin/zookeeper-server-start.sh config/zookeeper.properties

接着运行 Kafka —— 我使用 9092 端口连接到 Kafka。如果你需要改变端口,只需要在 config/server.properties 中配置即可。如果你像我一样是个新手,我建议你现在还是使用默认端口。

$ bin/kafka-server-start.sh config/server.properties

Kafka 跑起来之后,我们需要 MongoDB。它很简单,只需要使用这个 docker-compose.yml 即可。

version: '3'

services:

  mongodb:

    image: mongo

    ports:

      - "27017:27017"

    volumes:

      - "mongodata:/data/db"

    networks:

      - network1

volumes:

  mongodata:

networks:

  network1:

使用 Docker Compose 去运行 MongoDB docker 容器。

docker-compose up

这里是微服务 1 的相关代码。我只是修改了我前面的示例去保存到 Kafka 而不是 MongoDB:

rest-to-kafka/rest-kafka-sample.go

func jobsPostHandler(w http.ResponseWriter, r *http.Request) {

    //Retrieve body from http request

    b, err := ioutil.ReadAll(r.Body)

    defer r.Body.Close()

    if err != nil {

        panic(err)

    }

    //Save data into Job struct

    var _job Job

    err = json.Unmarshal(b, &_job)

    if err != nil {

        http.Error(w, err.Error(), 500)

        return

    }

    saveJobToKafka(_job)

    //Convert job struct into json

    jsonString, err := json.Marshal(_job)

    if err != nil {

        http.Error(w, err.Error(), 500)

        return

    }

    //Set content-type http header

    w.Header().Set("content-type", "application/json")

    //Send back data as response

    w.Write(jsonString)

}

func saveJobToKafka(job Job) {

    fmt.Println("save to kafka")

    jsonString, err := json.Marshal(job)

    jobString := string(jsonString)

    fmt.Print(jobString)

    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})

    if err != nil {

        panic(err)

    }

    // Produce messages to topic (asynchronously)

    topic := "jobs-topic1"

    for _, word := range []string{string(jobString)} {

        p.Produce(&kafka.Message{

            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},

            Value:          []byte(word),

        }, nil)

    }

}

这里是微服务 2 的代码。在这个代码中最重要的东西是从 Kafka 中消费数据,保存部分我已经在前面的博客文章中讨论过了。这里代码的重点部分是从 Kafka 中消费数据:

kafka-to-mongo/kafka-mongo-sample.go

func main() {

    //Create MongoDB session

    session := initialiseMongo()

    mongoStore.session = session

    receiveFromKafka()

}

func receiveFromKafka() {

    fmt.Println("Start receiving from Kafka")

    c, err := kafka.NewConsumer(&kafka.ConfigMap{

        "bootstrap.servers": "localhost:9092",

        "group.id":          "group-id-1",

        "auto.offset.reset": "earliest",

    })

    if err != nil {

        panic(err)

    }

    c.SubscribeTopics([]string{"jobs-topic1"}, nil)

    for {

        msg, err := c.ReadMessage(-1)

        if err == nil {

            fmt.Printf("Received from Kafka %s: %s\n", msg.TopicPartition, string(msg.Value))

            job := string(msg.Value)

            saveJobToMongo(job)

        } else {

            fmt.Printf("Consumer error: %v (%v)\n", err, msg)

            break

        }

    }

    c.Close()

}

func saveJobToMongo(jobString string) {

    fmt.Println("Save to MongoDB")

    col := mongoStore.session.DB(database).C(collection)

    //Save data into Job struct

    var _job Job

    b := []byte(jobString)

    err := json.Unmarshal(b, &_job)

    if err != nil {

        panic(err)

    }

    //Insert job into MongoDB

    errMongo := col.Insert(_job)

    if errMongo != nil {

        panic(errMongo)

    }

    fmt.Printf("Saved to MongoDB : %s", jobString)

}

我们来演示一下,运行微服务 1。确保 Kafka 已经运行了。

$ go run rest-kafka-sample.go

我使用 Postman 向微服务 1 发送数据。


这里是日志,你可以在微服务 1 中看到。当你看到这些的时候,说明已经接收到了来自 Postman 发送的数据,并且已经保存到了 Kafka。

因为我们尚未运行微服务 2,数据被微服务 1 只保存在了 Kafka。我们来消费它并通过运行的微服务 2 来将它保存到 MongoDB。

$ go run kafka-mongo-sample.go

现在,你将在微服务 2 上看到消费的数据,并将它保存到了 MongoDB。

检查一下数据是否保存到了 MongoDB。如果有数据,我们成功了!

欢迎工作一到五年的Java工程师朋友们加入Java架构开发: 855835163

群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,313评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,369评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,916评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,333评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,425评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,481评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,491评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,268评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,719评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,004评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,179评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,832评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,510评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,153评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,402评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,045评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,071评论 2 352

推荐阅读更多精彩内容

  • 关于Mongodb的全面总结 MongoDB的内部构造《MongoDB The Definitive Guide》...
    中v中阅读 31,916评论 2 89
  • 第一章五个程序 都很好!但是初学编程/没有其他语言基础的不容易看懂。 记一遍不熟悉的东西: who = strin...
    暗黑破坏球嘿哈阅读 1,425评论 0 10
  • fmt格式化字符串 格式:%[旗标][宽度][.精度][arg索引]动词旗标有以下几种:+: 对于数值类型总是输出...
    皮皮v阅读 1,095评论 0 3
  • 三月,十里桃花,伊人在水一方。 晨曦里 夜幕下 等你涉水而来 哪怕烟雨湿了青衫 …… 觉得美,就点赞吧!呵呵……
    一峪百川阅读 439评论 10 16
  • 面向对象概述 Object-Oriented Programming,在面向对象中,算法和数据结构看做一个整体,成...
    EchoNeil阅读 102评论 0 0