第4章 java操作activemq

我的样例工程:D:\springboot\demo-mq-maven

maven依赖

 <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-all</artifactId>
          <version>5.11.2</version>
 </dependency>

点对点

Producer 生产者

第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。

第二步:使用ConnectionFactory对象创建一个Connection对象。

第三步:开启连接,调用Connection对象的start方法。

第四步:使用Connection对象创建一个Session对象。

第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。

第六步:使用Session对象创建一个Producer对象。

第七步:创建一个Message对象,创建一个TextMessage对象。

第八步:使用Producer对象发送消息。

第九步:关闭资源。

Consumer

消费者有两种消费方法::

1、同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。

2、异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。

 实现MessageListener接口,在MessageListener()方法中实现消息的处理逻辑。

本例演示异步消费

异步消费

消费者:接收消息。

第一步:创建一个ConnectionFactory对象。

第二步:从ConnectionFactory对象中获得一个Connection对象。

第三步:开启连接。调用Connection对象的start方法。

第四步:使用Connection对象创建一个Session对象。

第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。

第六步:使用Session对象创建一个Consumer对象。

第七步:接收消息。

第八步:打印消息。

第九步:关闭资源

点对点示例

package com.neuedu.test;

import java.io.IOException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

public class TestQueue {
@Test
public void QueueProduct()
{
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    
    try {
        Connection conn = connectionFactory.createConnection();
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        Queue q = session.createQueue("test-queue");
        
        MessageProducer p = session.createProducer(q);
        
        TextMessage msg = session.createTextMessage("hello world");
        
        p.send(msg);
        
        p.close();
        session.close();
        conn.close();
        
    } catch (JMSException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
@Test
public void QueueConsumer()
{
       //异步消费
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    
    try {
        Connection conn = connectionFactory.createConnection();
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        Queue q = session.createQueue("test-queue");
        
        MessageConsumer c = session.createConsumer(q);
        
        c.setMessageListener(new MessageListener(){

            @Override
            public void onMessage(Message arg0) {
                
                TextMessage m = (TextMessage)arg0;
                try {
                    String tm = m.getText();
                    
                    System.out.println(tm);
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                
            }
            
        });
        
        try {
            System.in.read();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
        c.close();
        session.close();
        conn.close();
        
    } catch (JMSException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
}

Topic

Producer

Prod使用步骤:

第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。

第二步:使用ConnectionFactory对象创建一个Connection对象。

第三步:开启连接,调用Connection对象的start方法。

第四步:使用Connection对象创建一个Session对象。

第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Topic对象。

第六步:使用Session对象创建一个Producer对象。

第七步:创建一个Message对象,创建一个TextMessage对象。

第八步:使用Producer对象发送消息。

第九步:关闭资源。

Consumer

消费者:接收消息。

第一步:创建一个ConnectionFactory对象。

第二步:从ConnectionFactory对象中获得一个Connection对象。

第三步:开启连接。调用Connection对象的start方法。

第四步:使用Connection对象创建一个Session对象。

第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。

第六步:使用Session对象创建一个Consumer对象。

第七步:接收消息。

第八步:打印消息。

第九步:关闭资源

订阅/发布

package com.neuedu.test;

import java.io.IOException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

public class TestTopic {
@Test
public  void TopicProduct()
{
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    
    try {
        Connection conn = connectionFactory.createConnection();
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        Topic t = session.createTopic("topic-test");
        
        MessageProducer p = session.createProducer(t);
        
        TextMessage msg = session.createTextMessage("hello topic11111");
        
        p.send(msg);
        
        p.close();
        session.close();
        conn.close();
        
    } catch (JMSException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
@Test
public  void TopicConsumer()
{
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    
    try {
        Connection conn = connectionFactory.createConnection();
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        Topic t = session.createTopic("topic-test");
        
        MessageConsumer c = session.createConsumer(t);
        
        c.setMessageListener(new MessageListener(){

            @Override
            public void onMessage(Message arg0) {
                
                TextMessage m = (TextMessage)arg0;
                try {
                    String tm = m.getText();
                    
                    System.out.println(tm);
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                
            }
            
        });
        
        try {
            System.in.read();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
        c.close();
        session.close();
        conn.close();
        
    } catch (JMSException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

spring整合开发(略)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • ActiveMQ 即时通讯服务 浅析http://www.cnblogs.com/hoojo/p/active_m...
    bboymonk阅读 1,516评论 0 11
  • 1. 同步索引库分析 方案一:在taotao-manager中,添加商品的业务逻辑中,添加一个同步索引库的业务逻辑...
    东方舵手阅读 500评论 0 1
  • 什么是ActiveMQ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ ...
    MrRobot_阅读 588评论 0 3
  • ActiveMQ [toc]简书不支持 toc 目录模式,截图一张。 什么是ActiveMQ ActiveMQ 是...
    inke阅读 435评论 0 4
  • 文章大纲 一、消息中间件基础知识二、ActiveMQ介绍三、ActiveMQ下载安装(Windows版本)四、Ja...
    故事爱人c阅读 701评论 0 1