日志收集项目架构设计及Kafka
目前主流的日志收集方案ELK
ELK问题:增加一个日志收集项,需要手动修改配置。
部署的时候麻烦,没一个filebeat都需要配置一个配置文件
我们的架构图:
学习到的技能:
服务端agent开发
后端服务组件开发
Kafka和zookeeper的使用
ES和Kibana的使用
etcd的使用
消息队列的通信模型
点对点模式:生产者发送到queue中,消费者取queue。
kafka
总结面试:
Kafka
- Kafka集群的架构
1. broker
2. topic
3. particition分区:把同一个topic分成不同的分区,提高负载
1. leader:分区主节点
2. follower:分区的从节点- Consumer Group
- 生产者往Kafka发送数据的流程(6步)
- 先从Kafka集群中获取leader信息。
- 生产者将消息发送给leader。
- leader将消息写入本地磁盘。
- follower从leader拉取消息数据。
- follower将消息写入本地磁盘后向leader发送ACK。
- leader收到所有的follower的ACK之后向生产者发送ACK。
- Kafka选择分区模式(3种)
- 指定往哪个分区写
- 指定key,Kafka根据key做hash然后决定写哪个分区
- 轮询方式
- 生产者往Kafka发送数据的模式(3种)
- 0:把数据发给leader就成功,效率最高,安全性最低。
- 1:把数据发送给leader,等待leader回ACK。
- all:把数据发送给leader,确保follwer从leader拉取数据回复ack给leader,然后leader再回复ACK;安全性高。
- 分区存储文件的原理
- 为什么Kafka快?因为他把记录每个数据在物理磁盘上的位置,偏移,把非连续的数据变成连续数据。
- 消费者组消费数据的原理
项目安装流程:
安装JDK,配置环境变量JAVA_HOME=D:\java\jdk-12.0.1(安装目录)
配置到path下。%JAVA_HOME%\bin
下载Kafka不要下载带有src的文件。
将Kafka 解压完之后,进入config文件配置dataDir=/tmp/zookeeper。
启动命令:zookeeper-server-start.bat config\zookeeper.properties(如果不行试试这个bin\windowszookeeper-server-start.bat config\zookeeper.properties)
LogAgent的工作流程:
-
读日志--talif第三方库
go get github.com/hpcloud/tail
tailf的用法:
package main import ( "fmt" "time" "github.com/hpcloud/tail" ) // tailf的用法示例 func main() { fileName := "./my.log" config := tail.Config{ ReOpen: true, // 重新打开 Follow: true, // 是否跟随 Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // 从文件那个位置开始读 MustExist: false, // 文件不存在报错 Poll: true, // } tails, err := tail.TailFile(fileName, config) if err != nil { fmt.Printf("tail file failed, err:%v\n", err) return } var ( line *tail.Line ok bool ) for { line, ok = <-tails.Lines if !ok { fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename) time.Sleep(time.Second) continue } fmt.Println("line:", line.Text) } }
-
往Kafka写日志--sarama第三方库
windows下下载1.19之前的版本,因为1.20后版本增加了ztcd算法,windows出现缺少gcc的错误。
首先创建module :go mod init
进入 go.mod
-
添加
require ( github.com/Shopify/sarama v1.19 )
- 执行 go mod download 或者 go get
- 如果还不行,用goland的话,设置GOMODULE的代理。
package main import ( "fmt" "github.com/Shopify/sarama" ) // 基于sarama第三方库开发的kafka client func main() { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认 config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回 // 构造一个消息 msg := &sarama.ProducerMessage{} msg.Topic = "web_log" msg.Value = sarama.StringEncoder("this is a test log") // 连接kafka client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) if err != nil { fmt.Println("producer closed, err:", err) return } defer client.Close() // 发送消息 pid, offset, err := client.SendMessage(msg) if err != nil { fmt.Println("send msg failed, err:", err) return } fmt.Printf("pid:%v offset:%v\n", pid, offset) }
go build:会自动下载依赖
然后打开cmd,启动zookeeper,输入命令:bin\windowszookeeper-server-start.bat config\zookeeper.properties
在重新启动一个cmd,启动kafka,输入命令:kafka-server-start.bat D:\kafka_2.13-2.4.0\config\server.properties
go mod tidy //纠正一下mod版本
在Kafka显示消费:bin\windows\kafka-console-consumer.bat --bootstrap-server=127.0.0.1:9092 --topic=web_log --from-beginning
把gopkg.in/fsnotify.v1 v1.4.7 // indirect 改成1.4.7版本,解决这个包下不下来问题,不知道1.4.9为什么不行。
消费终端console中文乱码问题待解决。
把main中的固定参数写成配置文件:建立一个conf目录,创建config.ini。
写入:
[kafka]
address=127.0.0.1:9092
topic=web_log
[taillog]
path=./my.log
第三方ini:"gopkg.in/ini.v1"
项目原因:
ELK问题:增加一个日志收集项,需要手动修改配置。
部署的时候麻烦,没一个filebeat都需要配置一个配置文件
使用etcd管理被收集的日志项。
Kafka:消息队列。
tailf:从文件里读日志
go-int:解析ini文件
etcd
使用etcd优化日志收集项目
查:
Raft协议:
1. 选举
2. 日志复制机制
3. 异常处理(脑裂)
4. zookeeper的zad协议的区别。
etcd
etcd架构
etcd.exe启动etcd,默认在2379端口监听客户端通信,在2380端口监听节点间通信。
默认etcdctrl为v2版本,如果要改成v3版本:
windows:
set ETCDCTL_API=3
Linux&MAC:
export ETCDCTL_API=3
put:设置键值对
etcdctl --endpoints=http://127.0.0.1:2379 put boao "vj"
get:获取
etcdctl --endpoints=http://127.0.0.1:2379 get boao
delete:删除
etcdctl --endpoints=http://127.0.0.1:2379 del boao
go语言操作etcd。
操作ETCD
这里使用官方的etcd/clientv3包来连接etcd并进行相关操作。
安装
go get go.etcd.io/etcd/clientv3
put命令用来设置键值对数据,get命令用来根据key获取值。
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
// etcd client put/get demo
// use etcd/clientv3
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
// put
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "lmh", "lmh")
cancel()
if err != nil {
fmt.Printf("put to etcd failed, err:%v\n", err)
return
}
// get
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "lmh")
cancel()
if err != nil {
fmt.Printf("get from etcd failed, err:%v\n", err)
return
}
for _, ev := range resp.Kvs {
fmt.Printf("%s:%s\n", ev.Key, ev.Value)
}
}
watch操作
watch用来获取未来更改的通知。
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
// watch demo
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
// watch key:lmh change
rch := cli.Watch(context.Background(), "lmh") // <-chan WatchResponse
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}
将上面的代码保存编译执行,此时程序就会等待etcd中lmh这个key的变化。
例如:我们打开终端执行以下命令修改、删除、设置lmh这个key。
etcd> etcdctl.exe --endpoints=http://127.0.0.1:2379 put lmh "lmh1"
OK
etcd> etcdctl.exe --endpoints=http://127.0.0.1:2379 del lmh
1
etcd> etcdctl.exe --endpoints=http://127.0.0.1:2379 put lmh "lmh2"
OK
上面的程序都能收到如下通知。
watch>watch.exe
connect to etcd success
Type: PUT Key:lmh Value:lmh1
Type: DELETE Key:lmh Value:
Type: PUT Key:lmh Value:lmh2
查etcd的watch底层实现给客户端发通知的
Logtransfer:从Kafka里面把日志取处理,写入ES,使用Kibana做可视化的展示。
Elastic search :开源搜索引擎
kibana:图形化界面展示
系统监控:gopsutil做系统监控信息的采集,写入influxDB,使用grafana做展示。
prometheus监控:采集性能指标数据,保存起来,使用grafana做展示。
kafka消费:根据topic找所有的分区,每一个分区去消费数据。
logTransfer实现:
项目名称(想):
项目总结:
- 项目架构(图)
- 为什么不用ELK
- logAgent里面如何保证日志不丢/重启之后继续收集日志(记录读取文件的offset)。
- Kafka
- etcd的watch原理。
- es相关知识点
找工作:
- 开发:算法和数据结构
- 简历好好写。