1.配置JMS连接工厂
@Configuration
@EnableJms
@Slf4j
public class MdxpCreator {
private static String host;
private static Integer port;
private static String queueManager;
private static String channel;
private static String username;
private static String password;
private static int ccsid;
@Value("${mdxp.host}")
public void setHost(String host) {
MdxpCreator.host = host;
}
@Value("${mdxp.port}")
public void setPort(Integer port) {
MdxpCreator.port = port;
}
@Value("${mdxp.queue.manager}")
public void setQueueManager(String queueManager) {
MdxpCreator.queueManager = queueManager;
}
@Value("${mdxp.channel}")
public void setChannel(String channel) {
MdxpCreator.channel = channel;
}
@Value("${mdxp.username}")
public void setUsername(String username) {
MdxpCreator.username = username;
}
@Value("${mdxp.password}")
public void setPassword(String password) {
MdxpCreator.password = password;
}
@Value("${mdxp.ccsid}")
public void setCcsid(int ccsid) {
MdxpCreator.ccsid = ccsid;
}
//配置连接工厂
@Bean
public MQQueueConnectionFactory mqQueueConnectionFactory() {
MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
mqQueueConnectionFactory.setHostName(host);
try {
mqQueueConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
mqQueueConnectionFactory.setCCSID(ccsid);
mqQueueConnectionFactory.setChannel(channel);
mqQueueConnectionFactory.setPort(port);
mqQueueConnectionFactory.setQueueManager(queueManager);
} catch (Exception e) {
log.info(String.valueOf(e)+"配置连接工厂失败!");
}
return mqQueueConnectionFactory;
}
//配置连接认证
@Bean
UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter
(MQQueueConnectionFactory mqQueueConnectionFactory) {
UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter = new UserCredentialsConnectionFactoryAdapter();
userCredentialsConnectionFactoryAdapter.setUsername(username);
userCredentialsConnectionFactoryAdapter.setPassword(password);
userCredentialsConnectionFactoryAdapter.setTargetConnectionFactory(mqQueueConnectionFactory);
return userCredentialsConnectionFactoryAdapter;
}
//配置缓存连接工厂
@Bean
@Primary
public CachingConnectionFactory cachingConnectionFactory
(UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter) {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setTargetConnectionFactory(userCredentialsConnectionFactoryAdapter);
cachingConnectionFactory.setSessionCacheSize(500);
cachingConnectionFactory.setReconnectOnException(true);
return cachingConnectionFactory;
}
//配置DefaultJmsListenerContainerFactory
@Bean(name = "jmsQueueListenerCF")
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory
(CachingConnectionFactory cachingConnectionFactory) {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(cachingConnectionFactory);
factory.setConcurrency("1-1");
factory.setRecoveryInterval(1000L);
return factory;
}
}
2.监听队列消息
/**
 * Receives a message from the configured destination and logs its payload.
 *
 * <p>{@code BytesMessage} bodies are read in full and decoded as UTF-8;
 * {@code TextMessage} bodies are read as text. Any other message type is only
 * reported in the log. Failures while reading a message are logged together
 * with their cause and are not rethrown.
 *
 * @param message the incoming JMS message
 * @throws Exception declared for the message-handling logic that is to be
 *         added at the marked places
 */
@JmsListener(destination = "队列名字", containerFactory = "jmsQueueListenerCF")
public void onTopicMessage(Message message) throws Exception {
    log.info("{}", message);
    if (message instanceof BytesMessage) {
        // Byte payloads have to be decoded before they can be used as text.
        BytesMessage byteMsg = (BytesMessage) message;
        try {
            byte[] buff = new byte[(int) byteMsg.getBodyLength()];
            byteMsg.readBytes(buff);
            // Charset constant instead of a charset name: no checked
            // UnsupportedEncodingException and no lookup by string.
            String data = new String(buff, java.nio.charset.StandardCharsets.UTF_8);
            log.info("接收到Byte类型消息:{}", data);
            // message-handling logic goes here
        } catch (JMSException e) {
            // Route the failure through the logger, keeping the stack trace.
            log.error("接收消息异常", e);
        }
    } else if (message instanceof TextMessage) {
        try {
            TextMessage textMessage = (TextMessage) message;
            String msg = textMessage.getText();
            log.info("接收到Text类型:{}", msg);
            // message-handling logic goes here
        } catch (Exception e) {
            // Pass the exception itself so the cause is not lost.
            log.error("接收消息异常", e);
        }
    } else {
        log.info("既不是byte类型消息也不是text类型消息");
    }
}
可参考 IBM MQ 官方文档:https://www.ibm.com/support/knowledgecenter/zh/SSFKSJ_8.0.0/com.ibm.mq.dev.doc/q032100_.htm