消息中间件Producer通用化封装思路

消息中间件就我目前接触过的主要有ActiveMQ,Kafka,RabbitMQ,IBM MQ,RocketMQ。目前ActiveMQ,Kafka,RabbitMQ作为老牌的开源中间件,已经被各个需要消息中间件服务的公司广泛接受并研究,RocketMQ做成了阿里的消息平台对外提供云端消息服务,IBM MQ作为一个商业软件也有大量市场应用。

消息中间件,作为一个提供异步,系统解耦,模块间数据传输的软件,对客户端提供的功能都是类似的,但每个消息中间件的API却不尽相同,甚至可以说天差地别。拿ActiveMQ的JMS协议和Kafka的仿AMQP的Producer为例。

AMQ的发送一条消息的流程为:

  1. 建立连接工厂 ConnectionFactory
ConnectionFactory cf = new ActiveMQConnectionFactory("admin","admin",url);
  1. 通过连接工厂建立连接并启动 Connection
Connection conn = cf.createConnection();
conn.start
  1. 通过连接建立会话 Session
Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
  1. 通过会话建立目的地 Destination
Destination dest = session.createQueue("test");
  1. 在会话中指定目的地建立生产者 Producer
Producer producer = session.createProducer(dest)
  1. 在会话中建立一条消息 Message
TextMessage message = session.createTextMessage("S");
  1. 使用Producer发送Message
producer.send(message);

至此完成从连接建立到发送消息的过程。当然后续如果需要继续发送消息,直接使用producer.send即可。如果需要往不同的队列发消息,则可以通过session建立不同的producer(从第4步开始),或者在connection中建立新的session(从第3步开始)。

而Kafka的消息发送流程为:

  1. 实例化一个Properties类
Properties props = new Properties();
  1. 往Properties中填入服务器地址,编码器,ack等属性
 props.put("bootstrap.servers", "localhost:4242");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  1. 将Properties作为入参实例化一个KafkaProducer
Producer<String, String> producer = new KafkaProducer<>(props);
  1. 建立一个消息ProducerRecord
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value);
  1. 调用Producer的send方法发送ProducerRecord
producer.send(record);

可以看到目的地是固化在消息属性里的。Kafka发送消息无需建立目的地,而是使用producer直接发送消息。可对于用户来说,需要知道kafka只能发送byte[]数组,上文第4步中的key和value其实是byte[]类,因此如果发送字符串,需要调用getByte()方法。

byte[] key = "key".getBytes();
byte[] value = "value".getBytes();

但是实际上客户端其实并不需要知道自己能发送什么。

设计一个通用的API。

站在一个系统开发者的角度,如果我要用消息中间件,那最好发送消息,消费消息可以封装成一个服务,我只需要输入必要的参数,就可以进行消息的发送/接收了,由API来负责帮我进行一般的系统设置,最后的整个过程应该是极简的。

这件事有点像是在设计一个消息中间件规范化的接口(就像是micro USB口)。以后如果需要替换消息中间件,只需要技术人员更新server端,应用更新一下配置文件就可以完成所有的工作了。更甚一步,配置文件做好容错的机制,那只需要在配置文件里配置好所有消息中间件的配置参数,替换一个消息中间件产品对应用可以是透明的。再进一步,如果以后有一个配置中心,应用直接连接配置中心获取MQ的连接配置,连配置文件都不需要写了。

从第一步开始,所有的过程都封装在一个客户端类MQClient中。

对比ActiveMQ和Kafka,我们可以看到Kafka的设计其实是比较简单的。但秉持着可配置的东西尽量都做成配置文件的思路。第一步应该在实例化客户端类时,从配置文件读取所有的配置。

  1. 实例化一个MQClient对象
MQClient mc = new MQClient("mqclient.properties");

对应AMQ的1,2两步,Kafka的1,2两步
在实例化时,从mqclient.properties文件中读取出所有的配置,并建立连接。

  1. 建立一个消息
MQMessage msg = new MQMessage(string);

对应AMQ的6步,Kafka的第4步
对于一个消息来说,在建立消息时只需要知道消息的消息体,比如string,当然可以用重写来定义多个类型,比如AMQ支持ObjectMessage,BlobMessage,MapMessage等。在MQMessage的构造函数中,根据其给出的消息不同而建立不同的消息。�

  1. 发送消息
mc.send(destination,msg)

对应AMQ的3,4,5,7步,Kafka的3,5步
在这个方法中,需要封装生产者的建立和消息的发送两步。从destination可以得到目的地,因此AMQ可以通过第2步中建立的session来建立生产者。Kafka直接建立生产者就可以了。而后使用send方法来发送消息。

在API的设计中,需要着重考虑的注意点是:

  • AMQ的Connection、Session和Producer等资源的复用和及时关闭
  • 异常处理时抛出的异常需要细化
  • 配置文件根据不同的产品配置不同,需要有容错机制

下一步研究Consumer的通用化,由于Kafka较为复杂,作为普通用户暂时只考虑High Level。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,752评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,100评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,244评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,099评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,210评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,307评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,346评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,133评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,546评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,849评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,019评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,702评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,331评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,030评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,260评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,871评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,898评论 2 351

推荐阅读更多精彩内容