常用消息中间件有activeMQ,RabbitMQ,Kafka,ZeroMQ,MetaMQ,RocketMQ,本文这里只介绍第一个。
一、linux上安装
下载tar.gz包 解压tar -zxvf
在MQ目录下bin/linux-x86-64下 ./activemq start ./activemq stop
http://IP:8161/admin
默认用户名密码 admin/admin
三、发布和消费消息(这里以P2P为例)
1,写一个发消息服务如下:
@Service("notifyService")
public class NotifyServiceImpl implements INotifyService{
private static final Logger logger = LoggerFactory.getLogger(NotifyServiceImpl.class);
@Resource(name="jmsTemplate")
private JmsTemplate jmsTemplate;
@Resource(name = "awardMsgDestinationQueue")
private Destination awardMsgDestinationQueue;
@Override
public void sendAwardMsg(final String msg) {
try {
Destination destination = this.awardMsgDestinationQueue;
jmsTemplate.send(destination,new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage();
textMessage.setText(msg);
return textMessage;
}
});
} catch (Exception ex) {
ex.printStackTrace();
logger.error("向默认队列发送消息失败");
}
}
}
2,写一个消费监听器
public class AwardMsgQueueListener implements MessageListener{
@Override
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
msg = tm.getText();
} catch (JMSException e1) {
logger.error("从消息队列获取消息出现异常,请检查");
e1.printStackTrace();
return;
}
......//根据实际业务拿到msg并处理
}
}
由上可以看出,我们发布消息注入了jms模板和queue,这就需要我们先进行配置
二、整合spring
基础配置文件如下:
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL"><value>tcp://192.168.28.2:61616?</value></property>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>
<!--使用缓存可以提升效率-->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="jmsFactory"/>
<property name="sessionCacheSize" value="100"/>
</bean>
<!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
<!-- 定义推送中奖消息队列(Queue) -->
<bean id="awardMsgDestinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg name="name" value="awardMsgDestinationQueue"/>
</bean>
<!-- 配置监听者(Queue) -->
<bean id="awardMsgQueueListener" class="com.latech.notify.consumer.AwardMsgQueueListener" />
<!-- 配置多个消息监听容器,配置连接工厂,监听的目标是defaultDestinationQueue,监听器是上面定义的监听器 -->
<bean id="queueListenerContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="awardMsgDestinationQueue" />
<property name="messageListener" ref="awardMsgQueueListener" />
</bean>