消息中间件就我目前接触过的主要有ActiveMQ,Kafka,RabbitMQ,IBM MQ,RocketMQ。目前ActiveMQ,Kafka,RabbitMQ作为老牌的开源中间件,已经被各个需要消息中间件服务的公司广泛接受并研究,RocketMQ做成了阿里的消息平台对外提供云端消息服务,IBM MQ作为一个商业软件也有大量市场应用。
消息中间件,作为一个提供异步,系统解耦,模块间数据传输的软件,对客户端提供的功能都是类似的,但每个消息中间件的API却不尽相同,甚至可以说天差地别。拿ActiveMQ的JMS协议和Kafka的仿AMQP的Producer为例。
AMQ的发送一条消息的流程为:
- 建立连接工厂 ConnectionFactory
ConnectionFactory cf = new ActiveMQConnectionFactory("admin","admin",url);
- 通过连接工厂建立连接并启动 Connection
Connection conn = cf.createConnection();
conn.start
- 通过连接建立会话 Session
Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
- 通过会话建立目的地 Destination
Destination dest = session.createQueue("test");
- 在会话中指定目的地建立生产者 Producer
Producer producer = session.createProducer(dest)
- 在会话中建立一条消息 Message
TextMessage message = session.createTextMessage("S");
- 使用Producer发送Message
producer.send(message);
至此完成从连接建立到发送消息的过程。当然后续如果需要继续发送消息,直接使用producer.send即可。如果需要往不同的队列发消息,则可以通过session建立不同的producer(从第4步开始),或者在connection中建立新的session(从第3步开始)。
而Kafka的消息发送流程为:
- 实例化一个Properties类
Properties props = new Properties();
- 往Properties中填入服务器地址,编码器,ack等属性
props.put("bootstrap.servers", "localhost:4242");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- 将Properties作为入参实例化一个KafkaProducer
Producer<String, String> producer = new KafkaProducer<>(props);
- 建立一个消息ProducerRecord
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value);
- 调用Producer的send方法发送ProducerRecord
producer.send(record);
可以看到目的地是固化在消息属性里的。Kafka发送消息无需建立目的地,而是使用producer直接发送消息。可对于用户来说,需要知道kafka只能发送byte[]数组,上文第4步中的key和value其实是byte[]类,因此如果发送字符串,需要调用getByte()方法。
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
但是实际上客户端其实并不需要知道自己能发送什么。
设计一个通用的API。
站在一个系统开发者的角度,如果我要用消息中间件,那最好发送消息,消费消息可以封装成一个服务,我只需要输入必要的参数,就可以进行消息的发送/接收了,由API来负责帮我进行一般的系统设置,最后的整个过程应该是极简的。
这件事有点像是在设计一个消息中间件规范化的接口(就像是micro USB口)。以后如果需要替换消息中间件,只需要技术人员更新server端,应用更新一下配置文件就可以完成所有的工作了。更甚一步,配置文件做好容错的机制,那只需要在配置文件里配置好所有消息中间件的配置参数,替换一个消息中间件产品对应用可以是透明的。再进一步,如果以后有一个配置中心,应用直接连接配置中心获取MQ的连接配置,连配置文件都不需要写了。
从第一步开始,所有的过程都封装在一个客户端类MQClient中。
对比ActiveMQ和Kafka,我们可以看到Kafka的设计其实是比较简单的。但秉持着可配置的东西尽量都做成配置文件的思路。第一步应该在实例化客户端类时,从配置文件读取所有的配置。
- 实例化一个MQClient对象
MQClient mc = new MQClient("mqclient.properties");
对应AMQ的1,2两步,Kafka的1,2两步
在实例化时,从mqclient.properties文件中读取出所有的配置,并建立连接。
- 建立一个消息
MQMessage msg = new MQMessage(string);
对应AMQ的6步,Kafka的第4步
对于一个消息来说,在建立消息时只需要知道消息的消息体,比如string,当然可以用重写来定义多个类型,比如AMQ支持ObjectMessage,BlobMessage,MapMessage等。在MQMessage的构造函数中,根据其给出的消息不同而建立不同的消息。�
- 发送消息
mc.send(destination,msg)
对应AMQ的3,4,5,7步,Kafka的3,5步
在这个方法中,需要封装生产者的建立和消息的发送两步。从destination可以得到目的地,因此AMQ可以通过第2步中建立的session来建立生产者。Kafka直接建立生产者就可以了。而后使用send方法来发送消息。
在API的设计中,需要着重考虑的注意点是:
- AMQ的Connection、Session和Producer等资源的复用和及时关闭
- 异常处理时抛出的异常需要细化
- 配置文件根据不同的产品配置不同,需要有容错机制
下一步研究Consumer的通用化,由于Kafka较为复杂,作为普通用户暂时只考虑High Level。