基于kafka,zookeeper实现的日志收集平台搭建

1.项目简介

时间:2022年7月 项目人员:3人

项目环境:centos7(9台,1核2G), kafka(2.12) ,nginx, filebeat, zookeeper(3.6.3), python3.6, mysql

项目描述:分别使用3台Linux服务器来作为nginx集群搭建前端web服务,kafka集群和zookeeper集群,2台Linux机器做反向代理集群。通过filebeat收集前端nginx集群的访问日志,统一存入kafka集群平台,部署zookeeper集群来对kafka进行配置管理,通过python编写消费者对nginx的log日志做数据清洗,获取流量信息存入mysql数据库。

2.项目步骤

1.规划整个项目的拓扑结构和思维导图

2、搭建nginx集群作为前端web服务

3、搭建kafka集群实现日志的统一收集

4.搭建zookeeper集群实现来对kafka集群进行配置管理

5、搭建filebeat,调试生产者和消费者

6、通过python编写消费者,通过IP调用淘宝接口将省份,运营商等信息爬取下来

7.清洗nginx日志,收集带宽信息存入数据库

8、创建分布式任务基于流量进行告警监控

3.项目部署

1.环境准备

1.准备好9台Linux机器(1核2G)

2.配置好静态ip地址(我使用的是桥接模式,在同一个局域网内可以和其他Linux机器通信)

vim/etc/sysconfig/network-scripts/ifcfg-ens33

3.配置好本地DNS服务器(114.114.114.114)

vim/etc/resolv.conf

4.修改主机名(推荐用下面的方式,修改了HOSTNAME变量的值,同时也修改了/etc/hostname文件里的内容,永久生效)

hostnamectlset-hostname  +主机名

5.每一台机器上都写好域名解析(方便后续直接用主机名进行操作)

vim/etc/hosts

ps:DNS解析的顺序

1、浏览器的缓存

2、本地hosts文件 --linux(/etc/hosts)

3、找本地域名服务器 -- linux(/etc/resolv.conf)

6.安装基本软件(wget是用于获取web的数据,chronyd是时间同步服务)

yuminstallwget lsof vim -yyum -yinstallchrony

7.开启chronyd服务,关闭防火墙服务和selinux(防止防火墙影响主机之间的通信)

systemctl enable chronydsystemctlstartchronydsystemctlstopfirewalldsystemctldisablefirewalld

关闭selinux:vim /etc/selinux/config,设置SELINUX=disabled

2.nginx搭建

1.安装好epel源和nginx服务

yuminstallepel-release-yyuminstallnginx -y

2. 启动nginx并设置为开机自启

systemctlstartnginxsystemctlenablenginx

3.编辑配置文件

vim /etc/nginx/nginx.conf将listen80default_server;修改成:listen80;

ps:对于.conf文件的配置做一个简介

全局块:配置影响nginx全局的指令。一般有运行nginx服务器的用户组,nginx进程pid存放路径,日志存放路径,配置文件引入,允许生成worker process数等。

events块:配置影响nginx服务器或与用户的网络连接。有每个进程的最大连接数,选取哪种事件驱动模型处理连接请求,是否允许同时接受多个网路连接,开启多个网络连接序列化等。

http块:可以嵌套多个server,配置代理,缓存,日志定义等绝大多数功能和第三方模块的配置。如文件引入,mime-type定义,日志自定义,是否使用sendfile传输文件,连接超时时间,单连接请求数等。

server块:配置虚拟主机的相关参数,一个http中可以有多个server。

location块:配置请求的路由,以及各种页面的处理情况

4.自定义一个server用来检测

ps:后面搭建filebeat时就是收集

/var/log/nginx/sc/access.log下的日志

vim/etc/nginx/conf.d/sc.confserver {listen80default_server;server_namewww.sc.com;#html源代码文本路径        root        /usr/share/nginx/html;    #访问日志的保存路径    access_log  /var/log/nginx/sc/access.log main;    location  / {    }}

5.语法检测

nginx-t

6.重新加载nginx服务

nginx-s  reload

3.kafka,zookeeper搭建

ps:kafka是一种消息中间件,和其他MQ相比,有着单机10万级高吞吐量,高可用性强,分布式,一个partition多个replica,少数宕机不会丢失数据,一般配合大数据类系统进行实时数据计算,日志分析场景。

broker:kafka的节点。一台服务器相当于一个节点

topic:主题,消息的分类。比如nginx,mysql日志给不同的主题,就是不同的类型。

partition:分区。提高吞吐量,提高并发性。(多个partition会导致消息顺序混乱,如果对消息顺序有要求就只设置一个partition就可以了)

replica: 副本。完整的分区备份。

ps:zookeeper是一种分布式应用协调管理服务,具有配置管理,域名管理,分布式数据存储,集群管理等功能,在本次项目中用于对kafka集群进行配置(topic,partition,replica等)管理

kafka3.0版本已经脱离zookeeper管理,自己实现了zookeeper功能

1.安装基本软件(虽然kafka自带zookeeper软件,但实测不太好用,所以用官方标准的)

#安装javayum install java wget  -y#安装kafkawget  https://mirrors.bfsu.edu.cn/apache/kafka/2.8.1/kafka_2.12-2.8.1.tgz #解压到/opt目录下tar  xf  kafka_2.12-2.8.1.tgz#安装zookeeperwget  https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz#解压到/opt目录下tar  xf  apache-zookeeper-3.6.3-bin.tar.gz

2.配置kafka(修改

/opt/kafka_2.12-2.8.1/config /server.properties文件)

broker.id=x(第一台为1以此类推)

listeners=PLAINTEXT://nginx-kafka01(主机名):9092

zookeeper.connect=192.168.1.213:2181,192.168.1.214:2181,192.168.1.215:2181(三台Linux的IP)

ps:用zookeeper管理多个kafka集群(一个集群一个目录)

zookeeper.connect=192.168.1.213:2181/kafka1(第一组集群放在zookeeper路径/kafka1下面),192.168.1.214:2181/kafka2(第二组集群放在zookeeper路径/kafka2下面),192.168.1.215:2181/kafka3(第三组集群放在zookeeper路径/kafka3下面)等

3.配置zookeeper(3888和4888都是端口,一个用于数据传输,一个用于检验存活性和选举)

cd/opt/apache-zookeeper-3.6.3-bin/confscp zoo_sample.cfg zoo.cfg#修改zoo.cfg, 添加如下三行:server.1=192.168.1.213:3888:4888server.2=192.168.1.214:3888:4888server.3=192.168.1.215:3888:4888

4.创建/tmp/zookeeper目录 ,在目录中添加myid文件(文件内容就是本机指定的zookeeper id)

如:在192.168.1.213机器上

echo 1 > /tmp/zookeeper/myid

5.启动zookeeper(3台)

bin/zkServer.shstart

6.查看zookeeper状态(如下)

bin/zkServer.shstatus

ps:开启zk和kafka的时候,一定是先启动zk,再启动kafka

关闭服务的时候,kafka先关闭,再关闭zk

7.查看zookeeper管理kafka的配置(ls时是以tree形式进行查看--必须从/开始搜索)

cd/opt/apache-zookeeper-3.6.3-bincd bin./zkCli.sh

示例如下:(三个ids对应三台kafka机器)

8.启动kafka(-daemon是启动守护进程)

bin/kafka-server-start.sh -daemonconfig/server.properties

9.创建topic

bin/kafka-topics.sh--create--zookeeper192.168.1.213:2181--replication-factor3--partitions3--topicscbin/kafka-topics.sh--list--zookeeper192.168.1.213:2181

10.创建生产者和消费者(kafka服务中自带用于测试的生产者和消费者)进行测试,检测输入的数据是否可以被消费

#创建生产者bin/kafka-console-producer.sh --broker-list192.168.1.213:9092--topic sc#创建消费者    bin/kafka-console-consumer.sh --bootstrap-server192.168.1.215:9092--topic sc --from-beginning

示例如下:一台作为生产者输入数据,另一台作为消费者接收数据

4.filebeat部署

Filebeat 是使用 Golang 实现的轻量型日志采集器,也是 Elasticsearch stack 里面的一员。本质上是一个 agent ,可以安装在各个节点上,根据配置读取对应位置的日志,并上报到相应的地方去。

Filebeat 由两个主要组件组成:harvester 和 prospector。

采集器 harvester 的主要职责是读取单个文件的内容。读取每个文件,并将内容发送到 the output。 每个文件启动一个 harvester,harvester 负责打开和关闭文件,这意味着在运行时文件描述符保持打开状态。如果文件在读取时被删除或重命名,Filebeat 将继续读取文件。

查找器 prospector 的主要职责是管理 harvester 并找到所有要读取的文件来源。如果输入类型为日志,则查找器将查找路径匹配的所有文件,并为每个文件启动一个 harvester。每个 prospector 都在自己的 Go 协程中运行。

1.安装依赖包

rpm --importhttps://packages.elastic.co/GPG-KEY-elasticsearch

2.编辑/etc/yum.repos.d/fb.repo文件

[elastic-7.x]name=Elastic repositoryfor7.x packagesbaseurl=https://artifacts.elastic.co/packages/7.x/yumgpgcheck=1gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearchenabled=1autorefresh=1type=rpm-md

3.安装filebeat

yuminstall  filebeat -y

4.检查filebeat的安装路径及其相关文件

rpm-qa  |grep filebeatrpm -ql  filebeat

5.修改配置文件(vim

/etc/filebeat/filebeat.yml)

内容如下:

filebeat.inputs:- type:log# Change totrueto enablethisinput configuration.  enabled:true# Paths that should be crawledandfetched. Glob based paths.  paths:    - /var/log/nginx/sc/access.log#==========------------------------------kafka-----------------------------------output.kafka:  hosts: ["192.168.1.213:9092","192.168.1.214:9092","192.168.1.215:9092"]  topic: nginxlog  keep_alive:10s

6.设置开机启动服务,并检查filebeat是否启动

#设置开机自启systemctlenablefilebeat#启动服务:systemctl start  filebeat# 查看fi

7.创建主题和消费者来检测数据

#创建主题nginxlogbin/kafka-topics.sh--create--zookeeper192.168.1.213:2181--

当我访问自己搭建的web服务时,filebeat将access.log的日志输出,消费者成功接收数据,表示filebeat搭建成功!

5.编写python创建消费者并将收集的数据写入数据库

ps:也可以使用pandas去实现

import jsonimport requestsimport timeimport pymysql#连接数据库db = pymysql.connect(    host ="192.168.1.213",      #mysql主机ip    user ="qilin",              #用户名    passwd ="123456",            #密码    database ="weblog2"#数据库)taobao_url ="https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip="#查询ip地址的信息(省份和运营商isp),通过taobao网的接口def resolv_ip(ip):    response = requests.get(taobao_url+ip)ifresponse.status_code ==200:      tmp_dict = json.loads(response.text)      prov = tmp_dict["data"]["region"]      isp = tmp_dict["data"]["isp"]returnprov,ispreturnNone,None#将日志里读取的格式转换为我们指定的格式def trans_time(dt):    #把字符串转成时间格式    timeArray =time.strptime(dt,"%d/%b/%Y:%H:%M:%S")    #timeStamp = int(time.mktime(timeArray))    #把时间格式转成字符串    new_time =time.strftime("%Y-%m-%d %H:%M:%S", timeArray)returnnew_time#从kafka里获取数据,清洗为我们需要的ip,时间,带宽from pykafka import KafkaClientclient = KafkaClient(hosts="192.168.1.213:9092,192.168.1.214:9092,192.168.1.215:9092")topic = client.topics['nginxlog'] balanced_consumer = topic.get_balanced_consumer(  consumer_group='testgroup',    #自动提交offset  auto_commit_enable=True,      zookeeper_connect='nginx-kafka01:2181,nginx-kafka02:2181,nginx-kafka03:2181') #consumer = topic.get_simple_consumer()i =1formessageinbalanced_consumer:ifmessage isnotNone:        line = json.loads(message.value.decode("utf-8"))log= line["message"]      tmp_lst =log.split()      ip = tmp_lst[0]      dt = tmp_lst[3].replace("[","")      bt = tmp_lst[9]      dt = trans_time(dt)      prov, isp = resolv_ip(ip)ifprovandisp:print(dt,prov,isp,bt)          cursor = db.cursor()          try:            cursor.execute(f"insert into mynginxlog values({i},{dt},'{prov}','{isp}',{bt})")            db.commit()            i +=1except Exception as e:print("插入失败",e)              db.rollback()#createtablemynginxlog(# id int primary key auto_increment,# dt datetimenotnull,# prov varchar(20),# isp varchar(20),# bd float# )charset=utf8;#关闭数据库db.close()

效果如下:

6.创建分布式任务基于流量进行告警监控

1.了解Celery

celery 是由python开发的 ,简单、灵活、可靠的分布式任务处理框架

ps:celery 的5个角色

Task:就是任务,有异步任务和定时任务

Broker:中间人,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是Worker。

Worker:执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。

Beat:定时任务调度器,根据配置定时将任务发送给Broker。

Backend:结果存储在redis中

Celery本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。

redis通常在数据库做缓存,也可以作为消息中间件,存储任务及结果

2.安装redis并修改监听ip

ps:redis是一种key-value键值存储的数据库。

redis开启持久化的两种模式:

AOF 全持久化模式 每一次操作日志都会同步到磁盘

RDB 半持久化模式 定时的将内存内容快照写入磁盘

yuminstallepel-release-yyuminstallredis -y

3.修改监听ip并启动服务

#修改监听ipvim /etc/redis.confbind 0.0.0#启动redissystemctl start redis.0

4.安装Celery

pipinstall  celery

5.编辑Celery

config.py

fromcelery.schedulesimportcrontab#配置消息中间件的地址BROKER_URL ="redis://192.168.1.213:6379/1"#配置结果存放地址CELERY_RESULT_BACKND ="redis://192.168.1.213:6379/2"#启动celery时,导入任务CELERY_IMPORTS = {'celery_tasks'}#时区CELERY_TIMEZONE ="Asia/Shanghai"#设置定时任务CELERYBEAT_SCHEDULE = {'log-every-minute':{'task':'celery_tasks.scheduled_task','schedule':crontab(minute='*/1')    }}

app.py

fromceleryimportCelery#实例化对象,传入一个名字celery_app = Celery('celery_app')celery_app.config_from_object('config')

celery_tasks.py

fromappimportcelery_app@celery_app.taskdef scheduled_task(*args,**kwargs):print("this is schedule task")

4.项目心得

1.通过网络拓扑图和思维导图的建立,提高了项目整体的落实和效率

2.对搭建前端web服务,集群的部署更为熟悉

3.对于kafka集群统一收集web日志更为了解,对于zookeeper管理kafka集群有了更深的理解

4.对于脑裂现象的出现和解决有了更加清晰的认识

5.通过团队交流提高了团队协作能力,遇到困难去CSDN博客查阅,加强了自主学习能力和troubleshooting能力

5.常见问题

1.为什么要做反向代理集群,以及怎么实现负载均衡

DNS其实也可以实现负载均衡,www.sc.com可以解析成多个ip地址,对应相应的nginx服务器,一般来说,会以轮询的方式解析成各个ip。但是如果其中一台服务器挂了,DNS不会立马将这个ip地址去掉,还是会解析成挂掉的ip,可能会造成访问失败。虽然客户端会有重试机制,但还是会影响用户体验。而在web应用前面加反向代理,客户端不会直接访问到服务器,而是通过代理服务器访问,这样服务器安全性也会提高,负载均衡控制容易很多。

反向代理机通过keepalived双vip互为主备实现高可用,提高资源利用率。

vip:www.sc.com解析成两个虚拟ip

互为主备:一台作为master,一台backup

2.为什么使用kafka做日志收集

1、可以更直观地排除错误所在,直接将数据导入到mysql里面的话,我们排错需要登录三台nginx服务器查看日志,而吐到kafka里面方便定位故障

2.日志集中管理,后续需要日志的程序直接从kafka获取即可,尽可能减少日志处理对nginx里web服务的影响

3.kafka中leader的作用

leader和follower:一个分区有多个副本,选举一台作为leader,其他作为follower存放在ISR队列中

生产者和消费者只和leader打交道,leader接收数据后,再根据ISR同步到其他follower

生产者跟任何一台broker连接都可以,虽然这个broker可能没有leader部署,但broker会返回当前请求副本leader的信息,最后生产者再跟leader交互

2.kafka如何保证高可用

多个broker+多个partition+多个replica

ISR->in-sync-replica 集合列表(需要同步的follower集合)

比如说5个副本,1个leader,4follower-》ISR

有一条消息来了,leader怎么知道要同步哪些副本呢?根据ISR来。

如果一个follower挂了,那就从这个列表里别除了

如果一个follower卡住或者同步过慢它也会从ISR里删除

如果有一个机器宕机,后续启动之后想要重新加入ISR,必须得同步到HW(最高水位线)值

3.如何保证数据一致性

1、生产者可以通过request.required.acks设置ack可以为0(生产者不需要接收响应,发完就发下一条),1(默认,1 eadert收到就会给生产发送响应),-1(等待ISR列表中的每一个副本都接收到,才给生产者响应)

2、消费者消费数据时,引入了High Water Mark机制。木桶效应,只能消费ISR列表里偏移量最少的副本的消息数量。

4.zookeeper在kafka中的作用

1、保存kafka的元信息,topic,parition,副本信息

2、选举kafka controller (通过抢占的方式来选出controller。选举出的kafka controller管理kafka副本的leader和follwer:同步,选举)

5.zookeeper中leader的选举及数据的同步

一致性算法:少数服从多数原则,票数过半的当选为leader(>=n//2+1)

zookeeper集群中,节点存活数必须过半,集群才能正常使用(若不是这样,49%的正在使用,而之前51%宕机的机器突然启动,这样leader就会混乱,从而导致脑裂现象)

Zk集群节点数一般设置为奇数,方便选举

数据同步:只要过半节点同步完成,就表示数据已经commit。zookeeper不是强一致性,它属于最终一致性

6. 消费者如何知道下次从哪里开始消费

消费者消费的时候,会记录自己的消费偏移量,消费偏移量可以自己保存在本地,也可以提交到kafka的_consumer_offset主题里面保存

kafka日志每个分区保存时按段保存的:segment(一个segment由一个index和log文件组成) /data假设有如下segment

00.log 11.log 22.log

00.log保存的是第一条到11条的日志

11.log保存的是第12条到第22条的日志

22.log保存的是第22条之后的日志

数据的存储目录:

文件夹:<topic name>-<分区号>

分出多个segment便于做数据清理

kafka可以按照两个维度清理数据

1、按大小

2、按时间

任意一个条件满足,都可以触发日志清理

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343

推荐阅读更多精彩内容