日志收集项目

日志收集项目架构设计及Kafka

目前主流的日志收集方案ELK

360截图20200505170804824.jpg

ELK问题:增加一个日志收集项,需要手动修改配置。
部署的时候麻烦,没一个filebeat都需要配置一个配置文件

我们的架构图:

360截图20200505170932328.jpg

学习到的技能:

  • 服务端agent开发

  • 后端服务组件开发

  • Kafka和zookeeper的使用

  • ES和Kibana的使用

  • etcd的使用

消息队列的通信模型

点对点模式:生产者发送到queue中,消费者取queue。

image.png

kafka

image.png
image.png
image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png
image.png

image.png

image.png

总结面试:

Kafka

  1. Kafka集群的架构
    1. broker
    2. topic
    3. particition分区:把同一个topic分成不同的分区,提高负载
    1. leader:分区主节点
    2. follower:分区的从节点
    1. Consumer Group
  2. 生产者往Kafka发送数据的流程(6步)
    1. 先从Kafka集群中获取leader信息。
    2. 生产者将消息发送给leader。
    3. leader将消息写入本地磁盘。
    4. follower从leader拉取消息数据。
    5. follower将消息写入本地磁盘后向leader发送ACK。
    6. leader收到所有的follower的ACK之后向生产者发送ACK。
  3. Kafka选择分区模式(3种)
    1. 指定往哪个分区写
    2. 指定key,Kafka根据key做hash然后决定写哪个分区
    3. 轮询方式
  4. 生产者往Kafka发送数据的模式(3种)
    1. 0:把数据发给leader就成功,效率最高,安全性最低。
    2. 1:把数据发送给leader,等待leader回ACK。
    3. all:把数据发送给leader,确保follwer从leader拉取数据回复ack给leader,然后leader再回复ACK;安全性高。
  5. 分区存储文件的原理
  6. 为什么Kafka快?因为他把记录每个数据在物理磁盘上的位置,偏移,把非连续的数据变成连续数据。
  7. 消费者组消费数据的原理

项目安装流程:

安装JDK,配置环境变量JAVA_HOME=D:\java\jdk-12.0.1(安装目录)

配置到path下。%JAVA_HOME%\bin

下载Kafka不要下载带有src的文件。

将Kafka 解压完之后,进入config文件配置dataDir=/tmp/zookeeper。

启动命令:zookeeper-server-start.bat config\zookeeper.properties(如果不行试试这个bin\windowszookeeper-server-start.bat config\zookeeper.properties)

image.png


LogAgent的工作流程:

  1. 读日志--talif第三方库

    go get github.com/hpcloud/tail
    

    tailf的用法:

    package main
    
    import (
        "fmt"
        "time"
    
        "github.com/hpcloud/tail"
    )
    
    // tailf的用法示例
    func main() {
        fileName := "./my.log"
        config := tail.Config{
            ReOpen:    true,                                 // 重新打开
            Follow:    true,                                 // 是否跟随
            Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // 从文件那个位置开始读
            MustExist: false,                                // 文件不存在报错
            Poll:      true,                                 //
        }
        tails, err := tail.TailFile(fileName, config)
        if err != nil {
            fmt.Printf("tail file failed, err:%v\n", err)
            return
        }
        var (
            line *tail.Line
            ok   bool
        )
        for {
            line, ok = <-tails.Lines
            if !ok {
                fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename)
                time.Sleep(time.Second)
                continue
            }
            fmt.Println("line:", line.Text)
        }  
    }
    
  2. 往Kafka写日志--sarama第三方库

    1. windows下下载1.19之前的版本,因为1.20后版本增加了ztcd算法,windows出现缺少gcc的错误。

    2. 首先创建module :go mod init

    3. 进入 go.mod

    4. 添加

      require (
      
          github.com/Shopify/sarama v1.19
      )
      
      1. 执行 go mod download 或者 go get
      2. 如果还不行,用goland的话,设置GOMODULE的代理。
      package main
      
      import (
          "fmt"
      
          "github.com/Shopify/sarama"
      )
      
      // 基于sarama第三方库开发的kafka client
      
      func main() {
          config := sarama.NewConfig()
          config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
          config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
          config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回
      
          // 构造一个消息
          msg := &sarama.ProducerMessage{}
          msg.Topic = "web_log"
          msg.Value = sarama.StringEncoder("this is a test log")
          // 连接kafka
          client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
          if err != nil {
              fmt.Println("producer closed, err:", err)
              return
          }
          defer client.Close()
          // 发送消息
          pid, offset, err := client.SendMessage(msg)
          if err != nil {
              fmt.Println("send msg failed, err:", err)
              return
          }
          fmt.Printf("pid:%v offset:%v\n", pid, offset)
      }
      

      go build:会自动下载依赖

    然后打开cmd,启动zookeeper,输入命令:bin\windowszookeeper-server-start.bat config\zookeeper.properties

    在重新启动一个cmd,启动kafka,输入命令:kafka-server-start.bat D:\kafka_2.13-2.4.0\config\server.properties

go mod tidy //纠正一下mod版本

在Kafka显示消费:bin\windows\kafka-console-consumer.bat --bootstrap-server=127.0.0.1:9092 --topic=web_log --from-beginning

把gopkg.in/fsnotify.v1 v1.4.7 // indirect 改成1.4.7版本,解决这个包下不下来问题,不知道1.4.9为什么不行。

消费终端console中文乱码问题待解决。

把main中的固定参数写成配置文件:建立一个conf目录,创建config.ini。

写入:

[kafka]
address=127.0.0.1:9092
topic=web_log

[taillog]
path=./my.log

第三方ini:"gopkg.in/ini.v1"

项目原因:
ELK问题:增加一个日志收集项,需要手动修改配置。
部署的时候麻烦,没一个filebeat都需要配置一个配置文件

使用etcd管理被收集的日志项。

Kafka:消息队列。

tailf:从文件里读日志

go-int:解析ini文件

etcd

使用etcd优化日志收集项目

查:
Raft协议:

1. 选举
2. 日志复制机制
3. 异常处理(脑裂)
4. zookeeper的zad协议的区别。

etcd

image.png

image.png

image.png
image.png

image.png

image.png

image.png

etcd架构

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

etcd.exe启动etcd,默认在2379端口监听客户端通信,在2380端口监听节点间通信。

默认etcdctrl为v2版本,如果要改成v3版本:
windows:
set ETCDCTL_API=3
Linux&MAC:
export ETCDCTL_API=3

put:设置键值对
etcdctl --endpoints=http://127.0.0.1:2379 put boao "vj"

get:获取
etcdctl --endpoints=http://127.0.0.1:2379 get boao

delete:删除
etcdctl --endpoints=http://127.0.0.1:2379 del boao

go语言操作etcd。
操作ETCD
这里使用官方的etcd/clientv3包来连接etcd并进行相关操作。

安装
go get go.etcd.io/etcd/clientv3

put命令用来设置键值对数据,get命令用来根据key获取值。

package main

import (
    "context"
    "fmt"
    "time"

    "go.etcd.io/etcd/clientv3"
)

// etcd client put/get demo
// use etcd/clientv3

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        // handle error!
        fmt.Printf("connect to etcd failed, err:%v\n", err)
        return
    }
    fmt.Println("connect to etcd success")
    defer cli.Close()
    // put
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    _, err = cli.Put(ctx, "lmh", "lmh")
    cancel()
    if err != nil {
        fmt.Printf("put to etcd failed, err:%v\n", err)
        return
    }
    // get
    ctx, cancel = context.WithTimeout(context.Background(), time.Second)
    resp, err := cli.Get(ctx, "lmh")
    cancel()
    if err != nil {
        fmt.Printf("get from etcd failed, err:%v\n", err)
        return
    }
    for _, ev := range resp.Kvs {
        fmt.Printf("%s:%s\n", ev.Key, ev.Value)
    }
}

watch操作
watch用来获取未来更改的通知。

package main

import (
    "context"
    "fmt"
    "time"

    "go.etcd.io/etcd/clientv3"
)

// watch demo

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        fmt.Printf("connect to etcd failed, err:%v\n", err)
        return
    }
    fmt.Println("connect to etcd success")
    defer cli.Close()
    // watch key:lmh change
    rch := cli.Watch(context.Background(), "lmh") // <-chan WatchResponse
    for wresp := range rch {
        for _, ev := range wresp.Events {
            fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
        }
    }
}

将上面的代码保存编译执行,此时程序就会等待etcd中lmh这个key的变化。

例如:我们打开终端执行以下命令修改、删除、设置lmh这个key。

etcd> etcdctl.exe --endpoints=http://127.0.0.1:2379 put lmh "lmh1"
OK

etcd> etcdctl.exe --endpoints=http://127.0.0.1:2379 del lmh
1

etcd> etcdctl.exe --endpoints=http://127.0.0.1:2379 put lmh "lmh2"
OK

上面的程序都能收到如下通知。

watch>watch.exe
connect to etcd success
Type: PUT Key:lmh Value:lmh1
Type: DELETE Key:lmh Value:
Type: PUT Key:lmh Value:lmh2

golang连接etcd失败

查etcd的watch底层实现给客户端发通知的

Logtransfer:从Kafka里面把日志取处理,写入ES,使用Kibana做可视化的展示。

Elastic search :开源搜索引擎

kibana:图形化界面展示

系统监控:gopsutil做系统监控信息的采集,写入influxDB,使用grafana做展示。

prometheus监控:采集性能指标数据,保存起来,使用grafana做展示。

kafka消费:根据topic找所有的分区,每一个分区去消费数据。

logTransfer实现:
项目名称(想):

项目总结:

  1. 项目架构(图)
  2. 为什么不用ELK
  3. logAgent里面如何保证日志不丢/重启之后继续收集日志(记录读取文件的offset)。
  4. Kafka
  5. etcd的watch原理。
  6. es相关知识点

找工作:

  1. 开发:算法和数据结构
  2. 简历好好写。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,313评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,369评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,916评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,333评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,425评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,481评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,491评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,268评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,719评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,004评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,179评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,832评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,510评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,153评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,402评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,045评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,071评论 2 352