背景
接入方需在小程序中集成错误收集SDK,SDK采集到错误后需上报数据,为保证数据的实时更新,平台接收到数据后需进行如下处理:
- 根据收集到的错误数据以一定规则生成
errId
; - 将 收集到的上报错误 +
errId
入库(基础错误库); - 对收集到的错误数据进行分析,
errId
+ 分析后的数据入库(错误类别库);
基础错误库 & 错误类别库 E-R图如下:(仅标注部分属性)
Q1:为什么还需要将基础错误入库?
分析后的错误数据存储在错误类别库中,在错误列表页根据当前小程序的appkey
获取对应的错误数据,展示错误列表。
错误类别数据和基础错误数据通过errId
进行关联,用户可在错误列表页面点击某一类别错误进入详情查看错误具体信息。
- 在接入方进行错误排查的时候,需要用到上报的具体错误信息,即基础错误库来定位问题并修复;
- Kafka集群消息默认保留2天;
存在问题
按照上述流程,在正常情况下上报一条错误的平均响应时间在 2.5秒 !
2019.09 ~ 2020.02 试运行期间,上报错误总量达200万条,日均上报错误达40000条。按照试运行期间的数据量,预测随着后期接入方的增加,日均上报量会更大,高峰期甚至会超过1000条/秒。
目前集群上有2台服务器,配置MaxClient为1000个,那么系统的理论峰值QPS为:2*1000/2.5=800(800QPS)
,1秒钟可处理完成800个请求!在高并发的情况下,平台的性能会受到很大影响,具体如下:
- 高并发情况下,所有的请求直接怼到数据库,会造成数据库连接异常;
- 平台读写不分离,高并发情况下,数据查询效率也会降低直接影响平台数据展示;
- 高并发情况下,机器处于高负载状态,错误上报平均响应时间也会大大增加;
为了优化体验,避免超量错误上报时大量数据操作影响平台性能,尝试引入消息中间件(Kafka)优化数据处理流程。
Q2:什么是消息中间件?
中间件位于系统软件和应用软件之间的部分,连接着系统各部分。中间件分为很多种:
- 通信处理(消息)中间件
- 事务处理(交易)中间件
- 数据存取管理中间件
- Web服务器中间件
- 安全中间件
- 跨平台和架构的中间件
- 专用平台中间件
- 网络中间件
其中消息中间件是用于接收消息,并且将消息传递给需要消息的软件系统,是中间件中唯一不可缺少的,也是需求量最大的中间件产品。常见的消息中间件产品有:Kafka、RocketMQ、RabbitMQ、ZeroMQ、ActiveMQ、MetaMQ、Redis等。
Q3:Kafka是什么?
Kafka 是一个高吞吐量、分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
最初由Linkedln公司开发,后来成为Apache项目的一部分。核心模块使用Scala语言开发,支持多语言(如Java、C/C++、Python、Go、Erlang、Node.js等)客户端。
Q4:使用消息队列的好处?
-
解耦
将消息写入消息队列,需要消息的系统可自己从队列中订阅,系统间不会相互影响,可独立扩展和修改处理过程。 - 可恢复性
消息队列降低了进程间的耦合度,所以即使有一个处理消息的进程挂掉,加入队列的消息仍然可以在系统恢复后被处理。 - 缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。 - 灵活性 & 峰值处理能力(削峰填谷)
在访问量剧增的情况下,应用仍然需要继续发挥作用。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 -
异步通信
消息队列提供了异步处理机制,允许用户 把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要 的时候再去处理它们。
Q5:削峰填谷?
MQ的一个典型应用场景是缓冲流量,削峰填谷。下游消息接收方无法控制到达自己的流量,如果调用方不限速,很可能把下游服务压垮。对于下游服务来说,请求的到来可能是没有规律的。
例如,某应用的处理能力是每秒 10 个请求。在某一秒,突然到来了 30 个请求,而接下来两秒,都没有请求到达。在这种情况下,如果直接拒绝 20 个请求,应用在接下来的两秒就会空闲。所以,需要把请求突刺均摊到一段时间内,让系统负载保持在请求处理水位之内,同时尽可能地处理更多请求,从而起到“削峰填谷”的效果。
解决方案
引入消息中间件 Kafka 。
将上报的错误写入消息队列,非必要的业务逻辑(错误分析)以异步的方式运行,加快响应速度。优化错误分析流程如下:
- 监控平台收集到错误后仅打印错误日志,直接返回响应结果;
- 通过Flume实时采集日志至Kafka集群中;
- Kafka消费者消费错误日志的消息,处理并存储入库(基础库和错误类别库);
效果
上报错误平台响应时间从 2.5s 降低到 24ms ,响应时间提升99%,在基础设施不变的情况下,系统理论峰值QPS达到8万,QPS有100倍的提升,效果明显。
上报错误后消费者实时分析,前台展示数据实时更新:
Kafka接入
必须知道的Kafka的基本概念:
Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
- Producer:消息生产者,就是向 kafka broker 发消息的客户端;
- Consumer :消息消费者,向 kafka broker 取消息的客户端;
- Consumer Group (CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负 责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所 有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
- Kafka集群(Kafka Cluster)中有一个核心的概念Broker,是多个节点,一个服务器就是一个Broker
- Broker中具体存数据的叫Topic,一个一个的主题(为了将消息分类存储),生产者和消费者面向的都是一个topic
- Topic有分区的概念,也有副本的概念:Leader、Follower(备份用的)
Q6:为什么需要 Kafka topic ?
Kafka将一组消息抽象归纳为一个主题(Topic)。一个主题就是对消息的一个分类,生产者将消费发送到特定主题(云平台通过Flume实时采集日志到Kafka,需要配置Topic),消费者订阅主题或主题的某些分区进行消费。
Q7:为什么需要 Consumer Group ?
在 Kafka 中每一个消费者都属于一个特定消费组,我们可以为每个消费者指定一个消费组,以 groupId 代表消费组名称,通过 group.id 配置设置。如果不指定消费组,则该消费者属于默认消费组 test-consumer-group。
假设我们有系统A、系统B都需要消费Kafka的消息,在不申请 Consumer Group的情况下,系统A和系统B中的消费者都会被归属在默认消费者组中,这样的话我们在排查消费情况的时候难以区分,因此我们需要申请groupId。
同一主题的一条消息只能被同一个消费组下的某一个消费者消费消息组是Kafka用来实现对一个主题消息进行广播和单播的手段,实现消息广播只需指定各消费者均属于不同的消费组,消息单播则只需让各消费者属于同一消费组。
Q8:为什么选择 Kafka ?
- Kafka能够很好地处理大量积压的数据,以便能够周期性地加载离线数据进行处理(数据分析模块有离线任务的需求);
- Kafka能够支持分区、分布式,实时地处理消息,同时具有容错保障机制;
- Kafka可以为外部系统提供一种持久性日志等分布式系统。可以很方便与HDFS和Flume进行整合,方便将Kafka采集的数据持久化。
- Kafka-node 客户端完善,社区活跃,前端接入成本相对较低;
数据采集(生产者)
监控平台收集到上报的错误数据后只执行打印日志的流程,通过Flume实时采集日志至Kafka集群中。
Q9:为什么采用Flume采集而不是自己写Producer?
- 系统解耦
如果自己实现生产者,平台将依赖kafka,平台(主要用于展示)和数据处理耦合严重,如果kafka生产出现问题可能会影响到平台数据展示。 - 实时性保障
Flume采集至Kafka延迟在秒级别,秒级别的延迟是可以接受的。 - 开发成本
通过Flume采集降低前端同学理解&开发成本。
数据分析(消费者)
为降低数据展示平台和数据分析模块的耦合度,将数据统计模块单独抽离部署,主要完成消费逻辑的处理、离线任务的执行、匹配告警等相关数据分析的处理,具体架构如下:
数据展示
Node.js如何链接Kafka
安装 Kafka-node 客户端 链接公司Kafka 创建消费者消费,使用消费者消费的时候需要注意偏移量的控制,👇分享一个我遇到的情况
当我们的任务开始的时候,如果之前消费过某个topic,Kafka会保留当前消费的offset,我们一般会去获取这个offset来继续从上次结束的地方继续消费。
第一次执行任务完成后offset为67,然后中断消费,2天后公司Kafka集群清理了日志,集群中消息被清理,这时再启动任务,消费会从上一次的offset67开始消费消息,自然而然你的offset肯定已经不在有效范围内,所以就报OffsetOutOfRangeException
,这个异常可以通过监听消费者的offsetOutOfRange
处理。(分享这个例子是为了说明 offset偏移量 控制的重要性,当然正常情况下,线上不会发生这种情况)
Q10:为什么需要第二次会从上次消费的地方继续消费 ?
默认情况下,当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。在初始化ConsumerGroup的时候我们设置 fromOffset
值为 latest
。fromOffset属性可设置值有以下几种:
- earliest:当各分区下已有提交的offset时,从提交的offset开始消费; 无提交的offset时,从头开始消费。
- latest:当各分区下有已提交当offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。
- none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。
简单的消费者组消费Demo如下:
import * as Kafka from 'kafka-node';
export default class Consumer {
constructor(option) {
this.init({ ...option });
}
/**
* 消费者组消费
* @param clientId 客户端id
* @param topic 主题id
* @param groupId 消费者组id
* @param kafkaHost kafaka集群
*/
init({ clientId, topic, groupId, kafkaHost }) {
const consumerGroup = new Kafka.ConsumerGroup({
id: clientId,
groupId,
kafkaHost,
sessionTimeout: 15000,
fromOffset: 'latest', // 设置当前偏移量为最新 不重复消费
}, [topic]);
consumerGroup.on('error', (err) => {
console.log(err);
});
consumerGroup.on('message', (message) => {
// 消息处理
});
}
}
好文推荐: