原理机制自已百度,直接上代码
也可以参考这位大神
点我官网下载windows版,因为windows版与linux版同步更新,学习时为方便使用windows版完全没有问题.
解压并运行\bin\win64\activemq.bat(跟tomcat很像,linux也一样)
然后执行如下程序
原生使用
package gzz.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
/**
* @author www.gaozz.club
* @date 2018-08-26
*/
public class TestMq {
@Test
public void testMQProducerQueue() throws Exception {
// 1、创建工厂连接对象,需要制定ip和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 2、使用连接工厂创建一个连接对象
Connection connection = connectionFactory.createConnection();
// 3、开启连接
connection.start();
// 4、使用连接对象创建会话(session)对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
Queue queue = session.createQueue("test-queue");
// 6、使用会话对象创建生产者对象
MessageProducer producer = session.createProducer(queue);
// 7、使用会话对象创建一个消息对象
TextMessage textMessage = session.createTextMessage("hello!test-queue");
// 8、发送消息
producer.send(textMessage);
// 9、关闭资源
producer.close();
session.close();
connection.close();
}
public void TestMQConsumerQueue() throws Exception {
// 1、创建工厂连接对象,需要制定ip和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 2、使用连接工厂创建一个连接对象
Connection connection = connectionFactory.createConnection();
// 3、开启连接
connection.start();
// 4、使用连接对象创建会话(session)对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
Queue queue = session.createQueue("test-queue");
// 6、使用会话对象创建生产者对象
MessageConsumer consumer = session.createConsumer(queue);
// 7、向consumer对象中设置一个messageListener对象,用来接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收到的消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
// 8、程序等待接收用户消息
System.in.read();
// 9、关闭资源
consumer.close();
session.close();
connection.close();
}
}
springboot整合
配置文件application.yml
spring:
activemq:
broker-url: tcp://localhost:61616
user: admin
password: admin
pool:
enabled: true
max-connections: 100
日志logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d[%F:%L][%p]:%m%n</pattern>
</encoder>
</appender>
<logger name="org.springframework" level="ERROR" />
<logger name="com.netflix" level="ERROR" />
<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.gzz</groupId>
<artifactId>springboot-activemq</artifactId>
<version>1.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<!-- <scope>test</scope> -->
</dependency>
<!-- 整合消息队列ActiveMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- 如果配置线程池则加入 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
生产者
package gzz.activemq;
import javax.annotation.Resource;
import javax.jms.Destination;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
/**
* @author www.gaozz.club
* @date 2018-08-26
*/
@Service
public class Producer {
@Resource
private JmsMessagingTemplate jmsMessagingTemplate;
public void sendMsg(String destinationName, String message) {
System.out.println("==>>发送queue消息 " + message);
Destination destination = new ActiveMQQueue(destinationName);
jmsMessagingTemplate.convertAndSend(destination, message);
}
}
消费者
package gzz.activemq;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
/**
* @author www.gaozz.club
* @date 2018-08-26
*/
@Service
public class Consumer {
@JmsListener(destination = "test.queue")
public void receiveMsg(String text) {
System.out.println("<<==收到消息: " + text);
}
}
发布者
package gzz.activemq;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.Destination;
/**
* @author www.gaozz.club
* @date 2018-08-26
*/
@Service
public class Publisher {
@Resource
private JmsMessagingTemplate jmsMessagingTemplate;
public void publish(String destinationName, String message) {
Destination destination = new ActiveMQTopic(destinationName);
System.out.println("==>>发布topic消息 " + message);
jmsMessagingTemplate.convertAndSend(destination, message);
}
}
订阅者
package gzz.activemq;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
/**
* @author www.gaozz.club
* @date 2018-08-26
*/
@Service
public class Subscriber {
@JmsListener(destination = "test.topic", containerFactory = "myJmsContainerFactory")
public void subscribe(String text) {
System.out.println("<<==收到订阅的消息" + text);
}
}
测试类
package gzz.activemq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author www.gaozz.club
* @date 2018-08-26
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo8ActivemqApplicationTests {
@Autowired
private Producer producer;
@Autowired
private Publisher publisher;
//@Test
public void contextLoads() {
for (int i = 0; i < 10; i++) {
producer.sendMsg("test.queue", "Queue Message " + i);
}
}
@Test
public void test() {
for (int i = 0; i < 10; i++) {
publisher.publish("test.topic", "Topic Message " + i);
}
}
}
主程序
package gzz;
import javax.jms.ConnectionFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
/**
* @author www.gaozz.club
* @date 2018-08-26
*/
@SpringBootApplication
@EnableJms
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public JmsListenerContainerFactory<?> myJmsContainerFactory(ConnectionFactory connectionFactory) {
SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(true);
return factory;
}
}