第十五篇:使用ActiveMQ

前言:
前面我们可以利用Solr集群来实现我们的搜索功能,但是有没有发现在我们每次添加一个新的商品的时候都要重新导入一次索引库,效率非常低。这就需要我们优化一下我们的方案。我们想到最好就是我们添加商品的时候可以单独将该商品同步到索引库。那么我们可以想象有以下几种方案:
方案一:在taotao-manager中,添加商品的业务逻辑中,添加一个同步索引库的业务逻辑。
缺点:业务逻辑耦合度高,业务拆分不明确
方案二:业务逻辑在taotao-search中实现,调用服务在taotao-manager实现。业务逻辑分开。
缺点:服务之间的耦合度变高。服务的启动有先后顺序。
方案三:使用消息队列。MQ是一个消息中间件。如图


image.png

怎么理解消息中间件呢?我们可以把它理解为一个秘书,消息的发布者就是大老板,大老板下午三点要开个会,他只需跟秘书说一声,下午三点,我要开个会,就行了,老板不用管秘书是怎样通知各项目经理的,也不用管项目经理要带什么材料,他所做的只是告诉秘书一声而已。秘书负责与各个项目经理联系,告诉各个项目经理应该准备什么。MQ便相当于"秘书"这个角色。当添加一个商品时,商品服务只需要告诉消息中间件MQ,MQ便去通知其它服务做各自该做的事情,比如通知搜索服务去同步索引库,通知redis服务去同步缓存,通知生成静态页面等等。
常见的作为MQ中间件的有:ActiveMQ、RabbitMQ、Kafka。

1.什么是ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
主要特点:

  1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
  3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
  4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
  5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持通过JDBC和journal提供高速的消息持久化
  7. 从设计上保证了高性能的集群,客户端-服务器,点对点
  8. 支持Ajax
  9. 支持与Axis的整合
  10. 可以很容易得调用内嵌JMS provider,进行测试

2.ActiveMQ的消息形式

对于消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应;
另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。我们用的最多的就是TextMessage而已。

  • StreamMessage -- Java原始值的数据流
  • MapMessage--一套名称-值对
  • TextMessage--一个字符串对象
  • ObjectMessage--一个序列化的 Java对象
  • BytesMessage--一个字节的数据流


    image.png

3.ActiveMQ的安装

1.进入http://activemq.apache.org/下载ActiveMQ
使用的版本是5.12.0
2.安装环境
1.需要jdk
2.Linux虚拟机
3.安装步骤
第一步:先把ActiveMQ的压缩包上传到Linux系统
第二步:解压
第三步:启动ActiveMQ
[root@localhost bin]# ./activemq start
关闭:
[root@localhost bin]# ./activemq stop
查看状态:
[root@localhost bin]# ./activemq status
4.进入管理后台:
(http://192.168.208.40:8161/admin)
用户名:admin
密码:admin

image.png

有时候我们可能会遇到503错误的问题
解决方法:
1、查看机器名
[root@itcast168 bin]# cat /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=itcast168
2、修改host文件
[root@itcast168 bin]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 itcast168
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
[root@itcast168 bin]#
3、重启Activemq服务

4.测试ActiveMQ

4.1 Queue

4.1.1生产者:生产消息,发送端

1.把jar包添加到依赖中,使用5.11.2版本的包(为什么用这个版本的包?因为5.12的话里面有spring的集成,会影响我们本来的架构)

 <!--ActivqMq组件-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
        </dependency>

步骤:
第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象创建一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
第六步:使用Session对象创建一个Producer对象。
第七步:创建一个Message对象,创建一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。

//测试消息队列的发送者
    @Test
    public void testActiveMqProducer() throws Exception{
        //1.创建一个连接的工厂对象
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.208.40:61616");
        //2.使用ConnectionFactory创建一个Connection对象
        Connection connection = connectionFactory.createConnection();
        //3.开启连接
        connection.start();
        //4.使用Connection对象创建一个Session对象
        //第一个参数:是否开启事务(分布式事务)。true:开启事务,第二个参数忽略。
        //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.使用Session对象创建一个Destination对象,(topic、queue),此处创建一个Queue对象
        //topic的话就是
//        Destination topic = session.createTopic();
        Destination queue = session.createQueue("test-queue");
        //6.使用Session对象创建一个Producer对象。
        MessageProducer producer = session.createProducer(queue);
        //7.创建一个Message对象,创建一个TextMessage对象
        TextMessage  textMessage  = session.createTextMessage("hello activeMq,this is my first 1");
        //8.使用生产者发送信息
        producer.send(textMessage);
        //9.关闭资源
        producer.close();
        session.close();
        connection.close();

    }

4.1.2 消费者:接收信息

第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源

//测试消息的接受者
    @Test
    public void testQueueConsumer() throws Exception{
        //1.创建一个连接的工厂对象
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.208.40:61616");
        //2.使用ConnectionFactory创建一个Connection对象
        Connection connection = connectionFactory.createConnection();
        //3.开启连接
        connection.start();
        //4.使用Connection对象创建一个Session对象
        //第一个参数:是否开启事务(分布式事务)。true:开启事务,第二个参数忽略。
        //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.使用Session对象创建一个Destination对象,(topic、queue),此处创建一个Queue对象
        Destination queue = session.createQueue("test-queue");
        //6.使用Session对象创建一个Consumer对象。
        MessageConsumer consumer = session.createConsumer(queue);
        //7.接收消息
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {

                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = null;
                    text = textMessage.getText();
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }

            }
        });
        System.in.read();
        //9.关闭资源
        consumer.close();
        session.close();
        connection.close();
    }

image.png

运行生产者


image.png

运行消费者


image.png

我们可以观察到数目发生了变化
image.png

4.2 Topic

4.2.1 Producer

消费者:接收消息。
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源

    @Test
    public void testTopicProducer() throws JMSException{
        //1.创建一个连接工厂对象ConnectionFactory对象。需要指定mq服务的ip及端口号。注意参数brokerURL的开头是
        //tcp://而不是我们通常的http://,端口是61616而不是我们访问activemq后台管理页面所使用的8161
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.208.40:61616");
        //2.使用ConnectionFactory创建一个连接Connection对象
        Connection connection = connectionFactory.createConnection();
        //3.开启连接。调用Connection对象的start方法
        connection.start();
        //4.使用Connection对象创建一个Session对象
        //第一个参数是是否开启事务,一般不使用分布式事务,因为它特别消耗性能,而且顾客体验特别差,现在互联网的
        //做法是保证数据的最终一致(也就是允许暂时数据不一致),比如顾客下单购买东西,一旦订单生成完就立刻响应给用户
        //下单成功。至于下单后一系列的操作,比如通知会计记账、通知物流发货、商品数量同步等等都先不用管,只需要
        //发送一条消息到消息队列,消息队列来告知各模块进行相应的操作,一次告知不行就两次,直到完成所有相关操作为止,这
        //也就做到了数据的最终一致性。如果第一个参数为true,那么第二个参数将会被忽略掉。如果第一个参数为false,那么
        //第二个参数为消息的应答模式,常见的有手动和自动两种模式,我们一般使用自动模式。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.使用Session对象创建一个Destination对象,两种形式queue、topic。现在我们使用topic
        //参数就是消息队列的名称
        Topic topic = session.createTopic("test-topic");
        //6.使用Session对象创建一个Producer对象
        MessageProducer producer = session.createProducer(topic);
        //7.创建一个TextMessage对象
        //有两种方式,第一种方式:
//      TextMessage textMessage = new ActiveMQTextMessage();
//      textMessage.setText("hello,activemq!!!");
        //第二种方式:
        TextMessage textMessage = session.createTextMessage("hello,activemq topic");
        //8.发送消息
        producer.send(textMessage);
        //9.关闭资源
        producer.close();
        session.close();
        connection.close();
    }

4.2.2 消费者

@Test
    public void testTopicConsumer() throws Exception{
        //1.创建一个连接工厂对象ConnectionFactory对象。需要指定mq服务的ip及端口号。注意参数brokerURL的开头是
        //tcp://而不是我们通常的http://,端口是61616而不是我们访问activemq后台管理页面所使用的8161
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.30:61616");
        //2.使用ConnectionFactory创建一个连接Connection对象
        Connection connection = connectionFactory.createConnection();
        //3.开启连接。调用Connection对象的start方法
        connection.start();
        //4.使用Connection对象创建一个Session对象
        //第一个参数是是否开启事务,一般不使用分布式事务,因为它特别消耗性能,而且顾客体验特别差,现在互联网的
        //做法是保证数据的最终一致(也就是允许暂时数据不一致),比如顾客下单购买东西,一旦订单生成完就立刻响应给用户
        //下单成功。至于下单后一系列的操作,比如通知会计记账、通知物流发货、商品数量同步等等都先不用管,只需要
        //发送一条消息到消息队列,消息队列来告知各模块进行相应的操作,一次告知不行就两次,直到完成所有相关操作为止,这
        //也就做到了数据的最终一致性。如果第一个参数为true,那么第二个参数将会被忽略掉。如果第一个参数为false,那么
        //第二个参数为消息的应答模式,常见的有手动和自动两种模式,我们一般使用自动模式。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.使用Session对象创建一个Destination对象,两种形式queue、topic。现在我们使用queue
        //参数就是消息队列的名称
        Topic topic = session.createTopic("test-topic");
        //6.使用Session对象创建一个Consumer对象
        MessageConsumer consumer = session.createConsumer(topic);
        //7.向Consumer对象中设置一个MessageListener对象,用来接收消息
        consumer.setMessageListener(new MessageListener() {
            
            @Override
            public void onMessage(Message message) {
                if(message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage)message;
                    try {
                        String text = textMessage.getText();
                        System.out.println(text);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        //8.程序等待接收用户结束操作
        //程序自己并不知道什么时候有消息,也不知道什么时候不再发送消息了,这就需要手动干预,
        //当我们想停止接收消息时,可以在控制台输入任意键,然后回车即可结束接收操作(也可以直接按回车)。
        System.out.println("topic消费者1111。。。。。");
        System.in.read();
        //9.关闭资源
        consumer.close();
        session.close();
        connection.close();
    }

topic形式的话会存在一个问题就是不能持久化,也就是如果消息发送者发送消息的时候,如果没有消费者运行的话,它将无法消费者条消息。(即就算消费者没有及时消费的话,就算再启动也无法获得生产者传过来的消息)。如何解决的话我们可以考虑将数据保存在磁盘这样就不会丢失了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,417评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,921评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,850评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,945评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,069评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,188评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,239评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,994评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,409评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,735评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,898评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,578评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,205评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,916评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,156评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,722评论 2 363
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,781评论 2 351

推荐阅读更多精彩内容