ActiveMQ学习

  • JMS

JMS(Java Message Service):java消息服务,它是一套API,定义了Java程序访问消息中间件的接口,包括了消息的创捷,发送和接收.大名鼎鼎的ActiveMQ便是Apache出品的关于JMS的一种实现,还有阿里巴巴的RocketMQ.
在java消息服务中,一共有三种角色,producer(生产者),broker(中间件),consumer(消费者),生产者生产消息到中间件,消费者获取消息进行消费.其示意图如下:


JMS示意图
  • JMS的规范

既然说JMS是一套规范,一套接口,那么我们可以在javax.jms包里面找到这些接口


jms

这套规范定义了连接工厂(ConnectionFactory),连接(Connection),会话(Session),目的地(Destination),生产者(Producer),消费者(Consumer)等对象


JMS

连接工厂(ConnectionFactory)
客户端使用一个连接工厂对象连接到JMS服务提供者,它创建了JMS服务提供者和客户端之间的连接。

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROCKER_URL);

JMS连接(Connection)
连接对象封装了与生产者与JMS中间件之间的虚拟连接,使用该连接,客户端能够与目的地通讯,往队列或话题发送/接收消息,JMS客户端(如发送者或接受者)会在JNDI名字空间中搜索并获取该连接。

Connection connection = connectionFactory.createConnection();
//使用的时候一定要先打开它,
connection.start();
//使用中
...
//创建完连接后,需要在程序使用结束后关闭它:
connection.close();

JMS 会话(Session)
Session是一个单线程上下文,用于生产和消费消息,可以创建出消息生产者和消息消费者。
Session对象实现了Session接口,在创建完连接后,我们可以使用它创建Session。

Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

createSession中第一个参数表示是否支持事务,true为支持,false为不支持。
第二个参数表示事务类型,当为AUTO_ACKNOWLEDGE时 ,接收到消息(receive 或 onMessage运行完毕,成功返回时),即为消费成功,然后broker从队列里移除该数据。不关心该数据有没有正确被处理成我们想要的结果;Session设置为CLIENT_ACKNOWLEDGE 时,必须手动调用acknowledge 方法才为消费成功,然后从队列里移除该条数据。

目的地(Destination)
目的地指明消息被发送的目的地以及客户端接收消息的来源。JMS使用两种目的地,队列和话题。如下代码指定了一个队列和话题。

//创建一个topic
Destination destination = session.createTopic("GiftMsg");
//创建一个queue
Destination destination = session.createQueue("GiftMsg");

JMS消息生产者
消息生产者由Session创建,用于往目的地发送消息。生产者实现MessageProducer接口,我们可以为目的地、队列或话题创建生产者;

//创建生产者
MessageProducer messageProducer = session.createProducer(destination);
//生产者发送消息
TextMessage textMessage = session.createTextMessage("Hello World");
messageProducer.send(textMessage);

JMS消息消费者
消息消费者由Session创建,用于接受目的地发送的消息。消费者实现MessageConsumer接口,我们可以为目的地、队列或话题创建消费者;

//创建消费者
MessageConsumer consumer  = session.createConsumer(destination);
//消费者用receive消费消息
TextMessage textMessage = (TextMessage) consumer.receive();
//消费者用监听器消费信息
 consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage)message;
                }
 });
  • Queue模式 和 Topic模式

我们可以分为点对点模式(Queue)与订阅模式(Topic),二者比较如下:

类型 点对点模式 订阅模型
消费消息的消费者数量 无论有多少消费者监听消息,当中只有一个消费者能处理消息 所有监听消息的消费者都能处理该消息
获取消息方式 pull(消费者先发个请求询问broker是否有消息,如果有消息broker会把消息发给中间件) push(无须消费者询问,broker会主动把消息发送订阅者)
持久化 消息会被保存到DB中,broker重启,消息犹在,如果没有消费者消费,消息就不会消失.只有当消息被消费,broker才会对消息进行进一步处理,比如删除等 消息不会落地,一到broker就会被发送出去,随后消息消失
完整性 每一条信息都能被消费 如果没有监听者进行监听,消息就会消失
接受策略 一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。 一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器
  • receive 和 onMessage

如果为false,表示使用同步。当consumer使用receive()获取消息时,那么session将会把消息添加到consumer的本地queue,然后唤醒receive等待;当consumer使用messageListener异步侦听消息时,将会调用其onMessage()方法直到方法执行完毕,然后返回。

  • 同步和异步

JMS本身就是异步机制,即生产者发送消息以后,可以不管消费者是否接收到消息,从而继续工作,但是根据消费者接收信息的方式,可以分成同步接收和异步接收.
同步接收(pull)
消费者主动接收消息,即不断从中间件中拉取和轮询消息,如果消息队列为空,消费者会发生阻塞.
异步接收(push)
消费者设置监听器,当中间件有消息的时候,会发广播给各个消费者,而不用消费者定期去拉取。消费者在消费消息的过程中,还可以去做其他事,不会发生阻塞。

以下有些东西纯属臆想:

  • ack机制

不管是queue还是topic,每个消费者启动后,都会在中间件那里注册一个“会话”,用来存放与该消费者相关的数据(消费者与各个中间件的“会话”保持联系)。消费者消费完消息以后,会发送一个ack给中间件,告诉它我已经消费完这个消息了,然后中间件就会删掉总表或者会话当中对应的消息。当中间件收到消息的时候,这里我猜,queue肯定是会放在一个总表里面,然后由总表再随机发放到queue的某个“会话当中”,而topic我感觉会放在各个topic的“会话”中。当消息被push或者pull出去以后,消息的状态可能就会发生变化,比如由原来的0变成了1,当消费者返回ack后,这个消息就会被删掉。如果是queue,还会把总表的消息状态改变一下。

optimizeAcknowledge
String brokerUrl = "tcp://localhost:61616?" +   
                   "jms.optimizeAcknowledge=true" +   
                   "&jms.optimizeAcknowledgeTimeOut=30000" +   
                   "&jms.redeliveryPolicy.maximumRedeliveries=6";  
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);  

optimizeAcknowledge是针对消费者而言的,当optimizeAcknowledge=false的时候,表示逐个确认,中间件会发放一系列的消息给消费者,消费者每消费完一个就会发送ack给中间件;如果是等于true,即批量确认,消费者会消费满一定量的消息以后,再批量确认这批消息。

prefetchSize

那么这个一定量是多少呢?那就是prefetchSize,prefetchSize的值可以如下设置:

String queueName = "test-queue?customer.prefetchSize=100";  
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
Destination queue = session.createQueue(queueName);  

prefetchSize的值不仅针对消费者,也针对中间件当针对消费者时,即上面谈到的消费满prefetchSize个消息后发送确认;当针对中间件的时候,中间件会推送消息给消费者,当推送出去且还没消费的消息满prefetchSize个的时候,中间件便不会再向消费者推送消息,当prefetchSize = 0的时候,broker不会向消费者推送消息,即消费者想要获取消息,有且只能通过pull来拉取。那么客户端是怎么判断是否达到prefetchSize的呢?我想这就跟我们之前说的“会话”,只要中间件检测一下会话中的状态=1的消息的的数量即可。

  • activeMQ的参数

alwaysSyncSend/useAsyncSend

jms.alwaysSyncSend=false&jms.useAsyncSend=true
用来设定producer(或者session)发送消息的方式:同步/异步。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容