Java消息中间件

为什么使用消息中间件

消息中间件作用:解耦服务调用。松耦合。 使用中间件,不用等调用的服务处理完才返回结果。提高效率。

042.jpg

消息中间件解决服务调用之间的耦合

043.jpg

消息中间件带来的好处

  • 解耦
  • 异步
  • 横向扩展
  • 安全可靠
  • 顺序保证
  • 等等。。。

什么是中间件:非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件。

什么是消息中间件:关注于数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统

什么是JMS:Java消息服务(Java Message Service)即JMS,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

什么是AMQP:AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不接受客户端/中间件不同产品,不同开发语言等条件的限制。

044.png
常用消息中间件对比
  • ActiveMQ

    • ActiveMQ是Apache出品的,最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊地位。
    • 多种语言和协议编写客户端,语言:Java、C、C++、C#、Ruby、Python、PHP。应用协议:OpenWire、Stomp REST、WS Notification、XMPP、AMQP
    • 完全支持JMS1.1和J2EE1.4规范(持久化,XA消息,事务)
    • 虚拟主题,组合目的,镜像队列
  • RabbitMQ

    • RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
    • 支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript等
    • AMQP的完整实现(vhost、Exchange、Binding、Routing Key等)
    • 事务支持/发布确认
    • 消息持久化
  • Kafka

    • Kafka是一种高吞吐量的分布式发布订阅消息系统,是一个分布式的、分区的、可靠的分布式日志存储服务。它通过一种独一无二的设计提供了一个消息系统的功能。
    • 通过O(1)的磁盘结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能
    • 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
    • Partition、Consumer Group
045.png
JMS相关概念
  • 提供者:实现JMS规范的消息中间件服务器

  • 客户端:发送或接收消息的应用程序

  • 生产者/发布者:创建并发送消息的客户端

  • 消费者和订阅者:接收并处理消息的客户端

  • 消息:应用程序之间传递的数据内容

  • 消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式

  • 队列模式

    • 客户端包括生产者和消费者
    • 队列中的消息只能被一个消息者消费
    • 消费者可以随时消费队列中的消息
046.png
  • 主体模型
    • 客户端包括发布者和订阅者
    • 主题中的消息被所有订阅者消费
    • 消费者不能消费订阅之前就发送到主题中的消息
047.png
  • JMS编码接口
    • ConnectionFactory:用于创建连接到消息中间件的连接工厂
    • Connection:代表了应用程序和消息服务器之间的通信链路
    • Destination:指消息发布和接收的地点,包括队列和主题
    • Session:表示一个单线程的上下文,用于发送和接收消息
    • MessageConsumer:由会话创建,用于接收发送到目标的消息
    • MessageProducer:由会话创建,用于发送消息到目标
    • Message:是在消费者和生产者之间传递的对象,消息头,一组消息属性,一个消息体
048.png
win安装activemq

%activeMQ%\bin\win64:windows64位启动目录

  • activemq.bat:启动activemq

  • InstallService.bat:安装activemq服务到系统服务

  • 启动完,访问localhost:8161

  • 点击Manage ActiveMQ broker,用户名和密码:admin/admin

队列模式的消息演示

pom.xml

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.9.0</version>
</dependency>

AppProducer.java

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class AppProducer {

    private static final String REMOTE_URL = "tcp://127.0.0.1:61616";
    private static final String QUEUE_NAME = "queue-test";
    
    public static void main(String[] args) throws Exception {
        // 1、创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(REMOTE_URL) ;

        // 2、创建Connection
        Connection connection = connectionFactory.createConnection();

        // 3、启动连接
        connection.start();

        // 4、创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  // 是否在事务中处理,应答模式

        // 5、创建一个目标(队列)
        Destination destination = session.createQueue(QUEUE_NAME);
        // 6、创建生产者
        MessageProducer producer = session.createProducer(destination);


        for(int i = 0; i < 10; i++){
            // 7、创建消息
            TextMessage message = session.createTextMessage("create message " + i);
            // 8、发布消息
            producer.send(message);
            System.out.println("消息已发送 :" + message.getText());
        }
        // 9、关闭连接
        connection.close();
    }

}

AppConsumer.java

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @author Jas
 * @create 2018-04-13 15:27
 **/
public class AppConsumer {

    private static final String REMOTE_URL = "tcp://127.0.0.1:61616";
    private static final String QUEUE_NAME = "queue-test";

    public static void main(String[] args) throws Exception {

        // 1、创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(REMOTE_URL);

        // 2、创建Connection
        Connection connection = connectionFactory.createConnection();

        // 3、启动连接
        connection.start();

        // 4、创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  // 是否在事务中处理,应答模式

        // 5、创建一个目标(队列)
        Destination destination = session.createQueue(QUEUE_NAME);

        // 6、创建一个消费者
        MessageConsumer consumer = session.createConsumer(destination);

        // 7、创建一个监听器
        /*  Lambda表达式
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage message1 = (TextMessage) message;
                try {
                    System.out.println("消费者接收到消息:" +  message1.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        */
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage message1 = (TextMessage) message;
                try {
                    System.out.println("消费者接收到消息:" + message1.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        // 8、关闭连接
        // connection.close();  消息接收是异步的过程,所以关闭连接则接收不到消息
    }
}

启动两次AppConsumer监听消息发布

启动AppProducer发布消息,两个AppConsumer监听接收到的消息分别为:

消费者接收到消息:create message 0
消费者接收到消息:create message 2
消费者接收到消息:create message 4
消费者接收到消息:create message 6
消费者接收到消息:create message 8
消费者接收到消息:create message 1
消费者接收到消息:create message 3
消费者接收到消息:create message 5
消费者接收到消息:create message 7
消费者接收到消息:create message 9
主题模式的消息演示

AppProducer.java

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class AppProducer {
    private static final String REMOTE_URL = "tcp://127.0.0.1:61616";
    private static final String TOCPI_NAME = "topic-test";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory(REMOTE_URL);
        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建主题
        Destination destination = session.createTopic(TOCPI_NAME);

        MessageProducer producer = session.createProducer(destination);
        for(int i = 0; i < 10; i++){
            TextMessage message = session.createTextMessage("create message " + i);
            producer.send(message);
            System.out.println("消息已发送 :" + message.getText());
        }

        connection.close();
    }
}

AppConsumer.java

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class AppConsumer {

    private static final String REMOTE_URL = "tcp://127.0.0.1:61616";
    private static final String TOCPI_NAME = "topic-test";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory(REMOTE_URL);
        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic(TOCPI_NAME);

        MessageConsumer consumer = session.createConsumer(destination);

        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage message1 = (TextMessage) message;
                try {
                    System.out.println("消费者接收到消息:" +  message1.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        
    }
}

运行AppProducer后,再运行AppConsumer,监听不到消息发布;两个AppConsumer监听,会全部接收到AppProducer发布的消息。

使用Spring集成JMS连接ActiveMQ

  • ConnectionFactory:用于管理连接的连接工厂
    • 一个Spring为我们提供的连接池
    • JmsTemplate每次发消息都会重新创建连接,会话和productor
    • spring提供了SingleConnectionFactory和CachingConnectionFactory
  • JmsTemplate:用于发送和接收消息的模版类
    • 是spring提供的,只需向spring容器内注册这个类就可以使用JmsTemplate方便的操作jms
    • JmsTemplate类是线程安全的,可以在整个应用范围使用
  • MessageListerner:消息监听器
    • 实现一个onMessage方法,该方法只接收一个Message参数

pom.xml

<properties>
    <spring-version>4.3.9.RELEASE</spring-version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring-version}</version>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring-version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-core</artifactId>
        <version>5.7.0</version>
        <!-- 排除 ActiveMQ 自身依赖的 Spring -->
        <exclusions>
            <exclusion>
                <artifactId>spring-context</artifactId>
                <groupId>org.springframework</groupId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

ProducerService.java

public interface ProducerService {
    /**
     * 生产者发送消息
     * @param message
     */
    void sendMessage(String message);
}

common.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans.xsd 
       http://www.springframework.org/schema/context 
       http://www.springframework.org/schema/context/spring-context.xsd">
    
    <context:annotation-config/>

    <!-- ActiveMQ 提供的ConnectionFactory -->
    <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <!-- 配置 brokerURL,这里为你自己开启 ActiveMQ 服务的地址-->
        <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
    </bean>

    <!-- Spring jms为我们 提供的连接池 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="activeMQConnectionFactory"/>
    </bean>

    <!-- 
        点对点或队列模型
        配置队列的目的地 
    -->
    <bean id="activeMQQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg name="name" value="spring-jms-queue"/>
    </bean>

    <!-- 
        发布者/订阅者模型
        配置主题的目的地 
    -->
    <bean id="activeMQTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg name="name" value="spring-jms-topic"/>
    </bean>
</beans>

producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- 导入公共配置 -->
    <import resource="common.xml"/>
    
    <!-- 配置 JmsTemplate -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
    
    <!-- 把 ProducerServiceImpl 交给Spring IoC 容器管理-->
    <bean class="com.jas.jms.producer.ProducerServiceImpl"/>
</beans>

ProducerServiceImpl.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import javax.annotation.Resource;
import javax.jms.*;

public class ProducerServiceImpl implements ProducerService {

    @Autowired
    JmsTemplate jmsTemplate;
    /**
     * 这里以 @Resource 方式注入目的地对象
     * 如果是发布者/订阅者模式,只选要修改 name 中的值为“activeMQTopic”即可
     */
    @Resource(name = "activeMQQueue")
    Destination destination;

    @Override
    public void sendMessage(final String message) {
       jmsTemplate.send(destination, new MessageCreator() {
           @Override
           public Message createMessage(Session session) throws JMSException {
               TextMessage textMessage = session.createTextMessage(message);
               return textMessage;
           }
       });
        System.out.println("消息已发送:" + message);
    }
}

Producer.java

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Producer {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
        ProducerService producerService = context.getBean(ProducerService.class);

        for (int i = 0; i < 10; i++) {
            producerService.sendMessage("test message:" + i);
        }
        
        // 关闭 IoC 容器
        context.close();
    }
}

consumer.xml

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- 导入公共配置 -->
    <import resource="common.xml"/>

    <!-- 配置自定义消费者消息监听器 -->
    <bean id="consumerMessageListener" class="com.jas.jms.consumer.ConsumerMessageListener"/>
    
    <!-- 配置消息监听器的容器 -->
    <bean id="container" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="activeMQQueue"/>
        <!--
            配置发布者/订阅者模型的目的地
            <property name="destination" ref="activeMQTopic"/>
         -->
        <property name="messageListener" ref="consumerMessageListener"/>
    </bean>
</beans>

ConsumerMessageListener.java

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ConsumerMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;

        try {
            System.out.println("接收已接收:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Consumer.java

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Consumer {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
    }
}

ActiveMQ集群配置

  • 为什么要对消息中间件集群?

    • 实现高可用,以排除单点故障引起的服务中断
    • 实现负载均衡,以提升效率为更多客户提供服务
  • 集群方式

    • 客户端集群:让多个消费者消费同一队列
    • Broker clusters:多个Broker之间同步消息
    • Master Slave:实现高可用
  • ActiveMQ失效转义(failover)

    • 允许当中一台消息服务器宕机时,客户端在传输层上重新连接到其它消息服务器。
    • 语法:failover:(uri,...,uriN)?transportOptions
  • transportPotions参数说明

    • randomize默认为true,表示在URI列表中选择URI连接时是否采用随机策略
    • initialReconnectDelay默认为10,单位毫秒,表示第一次尝试重连之间等待的时间
    • maxReconnectDelay默认30000,单位毫秒,最长重连的时间间隔
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,287评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,346评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,277评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,132评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,147评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,106评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,019评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,862评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,301评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,521评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,682评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,405评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,996评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,651评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,803评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,674评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,563评论 2 352

推荐阅读更多精彩内容

  • 中间件 非底层操作系统软件,非业务应用软件,不能直接给最终用户使用和带来价值的软件。 消息中间件 关注于数据的发送...
    wch853阅读 1,019评论 0 0
  • 什么是消息中间件? 关注于数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统。 什么是JMS? Jav...
    勿念及时雨阅读 947评论 0 1
  • 大型系统的演变必然的发展方向是分布式,而在分布式系统中应用与应用之间互相连接越来越紧密,在应用之间的消息传递就像家...
    谜00016阅读 1,512评论 0 5
  • 本篇文章是SpringCloud的统一化配置学习之前的最后一个番外篇,工欲善其事必先利其器,基础牢靠了,才能学的更...
    青衣敖王侯阅读 910评论 0 4
  • 一、简介 1.1 什么是 JMS JMS 即 Java 消息服务(Java Message Service)应用程...
    AaronSimon阅读 1,193评论 0 7