事件总线(event-bus)使用指南

概览

  • 案例
  • 事件发布
    • 前置条件
    • IDL定义
    • 关键性配置
    • 发布任务服务实现
    • 事件发布
  • 事件订阅
    • 依赖
    • 作为订阅者
  • 既是事件发送者,也是订阅者?
  • 示例项目

案例

  • 假设一个 A 服务为事件发送方,B 服务为事件订阅方
  • 假设 A 服务中的 register 接口入库操作后,会发送 RegisteredEvent
  • 假设 B 服务订阅了该事件消息,由订阅者自行处理订阅到的消息

事件发布(生产者,Producer)

前置条件

1.需要发送消息的项目依赖jar包

  • sbt项目在build.sbt里加入如下依赖
"com.today" % "event-bus_2.12" % "0.1-SNAPSHOT"
  • maven项目在pom.xml中加入如下依赖:
<dependency>
    <groupId>com.today</groupId>
    <artifactId>event-bus_2.12</artifactId>
    <version>0.1-SNAPSHOT</version>
</dependency>

2.数据库存储支持,需在业务数据库中加入此表

SET FOREIGN_KEY_CHECKS = 0;

DROP TABLE IF EXISTS `dp_common_event`;
CREATE TABLE `dp_common_event` (
  `id` bigint(20) NOT NULL COMMENT '事件id,全局唯一, 可用于幂等操作',
  `event_type` varchar(255) DEFAULT NULL COMMENT '事件类型',
  `event_binary` blob DEFAULT NULL COMMENT '事件内容',
  `updated_at` timestamp NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
--  Table structure for `event_lock`
-- ----------------------------
DROP TABLE IF EXISTS `dp_event_lock`;
CREATE TABLE `dp_event_lock` (
  `id` int(11) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
--  Records of `event_lock`
-- ----------------------------
BEGIN;
INSERT INTO `dp_event_lock` VALUES ('1', 'event_lock');
COMMIT;

SET FOREIGN_KEY_CHECKS = 1;

3.IDL定义

  • 以事件双方约定的消息内容定义IDL结构体
  • 规定必须为每个事件定义事件ID,以便消费者做消息幂等

==> events.thrift

namespace java com.github.dapeng.user.events

/**
* 注册成功事件, 由于需要消费者做幂等,故加上事件Id
**/
struct RegisteredEvent {
    /**
    * 事件Id
    **/
    1: i64 id,
    /**
    * 用户id
    **/
    2: i64 userId
}

...more

4.IDL服务接口事件声明

  • 接口可能会触发一个或多个事件

== >user_service.thrift

namespace java com.github.dapeng.user.service

include "user_domain.thrift"
include "events.thrift"

/**
* 事件发送端业务服务
**/
service UserService{
/**
# 用户注册
## 事件
    注册成功事件,激活事件
**/
    string register(user_domain.User user)
    (events="events.RegisteredEvent,events.ActivedEvent")
    
    ...more
    
}(group="EventTest")

关键性配置(定时任务)

在spring的配置文件spring/services.xml进行定义,注意init-method指定startScheduled

这里采用的是同步模式,当然eventbus也支持异步模式

<!--messageScheduled 定时发送消息bean-->
<bean id="messageTask" class="com.today.eventbus.scheduler.MsgPublishTask" init-method="startScheduled">
    <constructor-arg name="topic" value="${kafka_topic}"/>
    <constructor-arg name="kafkaHost" value="${kafka_producer_host}"/>
    <constructor-arg name="tidPrefix" value="${kafka_tid_prefix}"/>
    <constructor-arg name="dataSource" ref="tx_demo_dataSource"/>
</bean>
  • topic kafka消息topic,领域区分(建议:领域_版本号_event)
  • kafkaHost kafka集群地址(如:127.0.0.1:9091,127.0.0.1:9092)
  • tidPrefix kafka事务id前缀,领域区分
  • dataSource 使用业务的 dataSource

==>config_user_service.properties

# event config
kafka_topic=user_1.0.0_event
kafka_producer_host=127.0.0.1:9092
kafka_tid_prefix=user_1.0.0

在dapeng.properties中配置:



soa.eventbus.publish.period=500 //代表轮询数据库消息库时间,如果对消息及时性很高,请将此配置调低,建议最低为100ms,默认配置是1000ms


事件触发

  • 在做事件触发前,你需要实现 AbstractEventBus ,并将其交由spring托管,来做自定义的本地监听分发

==>commons/EventBus.scala

object EventBus extends AbstractEventBus {

  /**
    * 事件在触发后,可能存在本地的监听者,以及跨领域的订阅者
    * 本地监听者可以通过实现该方法进行分发
    * 同时,也会将事件发送到其他领域的事件消息订阅者
    * @param event
    */
  override def dispatchEvent(event: Any): Unit = {
    event match {
      case e:RegisteredEvent =>
        // do somthing 
      case _ =>
        LOGGER.info(" nothing ")
    }
  }
  override def getInstance: EventBus.this.type = this
}
  • 当本地无任何监听时==>
override def dispatchEvent(event: Any): Unit = {}

==> spring/services.xml

<bean id="eventBus" class="com.github.dapeng.service.commons.EventBus" factory-method="getInstance">
    <property name="dataSource" ref="tx_demo_dataSource"/>
</bean>
  • 事件发布
EventBus.fireEvent(RegisteredEvent(event_id,user.id))

事件定时发布修改:

在dapeng.properties加入环境变量配置

//每次轮询间隔事件为100ms
soa.eventbus.publish.period=100


在业务系统的services.xml中配置,指定初始化方法,即定时轮询任务的方法:

<bean id="messageTask" class="com.today.eventbus.scheduler.MsgPublishTask" init-method="startScheduled">
    <constructor-arg name="topic" value="${KAFKA_TOPIC}"/>
    <constructor-arg name="kafkaHost" value="${KAFKA_PRODUCER_HOST}"/>
    <constructor-arg name="tidPrefix" value="${KAFKA_TID_PREFIX}"/>
    <constructor-arg name="dataSource" ref="tx_demo_dataSource"/>
</bean>


重点: 配置轮询发布消息的时间间隔,以ms为单位,在dapeng.properties中配置

soa.eventbus.publish.period=500 //代表500ms


生产方因为轮询数据库发布消息,如果间隔很短,会产生大量的日志,需要修改级别,在logback下进行如下配置:

<!--将eventbus包下面的日志都放入单独的日志文件里 dapeng-eventbus.%d{yyyy-MM-dd}.log-->
<appender name="eventbus" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <prudent>true</prudent>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                    注意: 这里detail-  后面 加自己系统的名字。 例如这里的 goods
        <fileNamePattern>${soa.base}/logs/detail-goods-eventbus.%d{yyyy-MM-dd}.log</fileNamePattern>
        <maxHistory>30</maxHistory>
    </rollingPolicy>
    <encoder>
        <pattern>%d{MM-dd HH:mm:ss SSS} %t %p - %m%n</pattern>
    </encoder>
</appender>

<!-- additivity:是否向上级(root)传递日志信息, -->
<!--com.today.eventbus包下的日志都放在上面配置的单独的日志文件里-->
<logger name="com.today.eventbus" level="DEBUG" additivity="false">
    <appender-ref ref="eventbus"/>
</logger>

<!--sql 日志显示级别-->
<logger name="druid.sql" level="OFF"/>
<logger name="wangzx.scala_commons.sql" level="DEBUG"/>
<logger name="org.apache.kafka.clients.consumer.KafkaConsumer" level="INFO"/>
<logger name="org.springframework.jdbc.datasource.DataSourceUtils" level="INFO"/>



事件订阅 (消费者 Consumer)

依赖

除需要向上面生产者一样依赖eventbus的jar包外,还需要依赖生产者端的api jar包

<!--事件发送方api-->
<dependency>
    <groupId>com.today</groupId>
    <artifactId>user-api_2.12</artifactId>
    <version>0.1-SNAPSHOT</version>
</dependency>

<!--if => sbt project--> 
"com.today" % "event-bus_2.12" % "0.1-SNAPSHOT",
"com.today" % "user-api_2.12" % "0.1-SNAPSHOT"

注解支持配置:

<bean id="postProcessor" class="com.today.eventbus.spring.MsgAnnotationBeanPostProcessor"/>

附(kafka日志级别调整):
==>logback.xml

<logger name="org.apache.kafka.clients.consumer" level="INFO"/>

作为一个订阅者

// java

@KafkaConsumer(groupId = "eventConsumer1", topic = "user_1.0.0_event",kafkaHostKey = "kafka.consumer.host"))
public class EventConsumer {
    @KafkaListener(serializer = RegisteredEventSerializer.class)
    public void subscribeRegisteredEvent(RegisteredEvent event){
        LOGGER.info("Subscribed RegisteredEvent ==> {}",event.toString());
    }
    ...
}

注意: 订阅方在消费消息时,如果处理消息可能会抛出业务异常(就是业务有关的异常,如前置检查不通过,等等),在消费消息时,需要捕获业务系统。

@KafkaListener(serializer = classOf[ModifySkuBuyingPriceEventSerializer])
def modifySkuBuyingPriceEvent(event: ModifySkuBuyingPriceEvent): Unit = {
  // 重点
 try {
    logger.info(s"=====> ModifySkuBuyingPriceEvent")
    val ModifySkuBuyingPriceItemList = event.modifySkuBuyingPriceEventItems.map(
      x => build[ModifySkuBuyingPriceConsumer](x)()
    )
    val result = consumer.modifySkuBuyingPrice(ModifySkuBuyingPriceItemList) // 收到事件后调用业务接口示例
    logger.info(s"收到消息$event =>成功修改sku进价, ${result} ")
  } catch {
  //logger的写法自己定义
    case e: SoaException => logger.error("业务抛出的异常,消息不会重试", e)
  }

}

//scala

serializer = classOf[RegisteredEventSerializer]

@KafkaConsumer

  • groupId 订阅者领域区分
  • topic 订阅的 kafka 消息 topic
  • kafkaHostKey 可自行配置的kafka地址,默认值为dapeng.kafka.consumer.host。可以自定义以覆盖默认值
    • 用户只要负责把这些配置放到env或者properties里面
    • 如:System.setProperty("kafka.consumer.host","127.0.0.1:9092");

@KafkaListener

  • serializer 事件消息解码器,由事件发送方提供.

既是消费者也是订阅者?

如果服务既有接口会触发事件,也存在订阅其他领域的事件情况。只要增加缺少的配置即可

重点可以看如下发布者demo

https://github.com/leihuazhe/publish-demo

示例项目

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 226,197评论 6 524
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 97,254评论 3 410
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 173,718评论 0 370
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 61,801评论 1 305
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 70,732评论 6 404
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 54,200评论 1 318
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 42,389评论 3 433
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 41,484评论 0 282
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 48,024评论 1 328
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 40,013评论 3 352
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 42,125评论 1 359
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 37,698评论 5 353
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 43,407评论 3 342
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 33,795评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 34,996评论 1 278
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 50,724评论 3 384
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 47,150评论 2 368

推荐阅读更多精彩内容