1.使用默认交换机动态注入队列名
连接配置
public class ConnectionManager{
private Logger logger = Logger.getLogger(getClass());
private static ConnectionManager instance = new ConnectionManager();
private Map<String, Connection> connectionTable = Collections.synchronizedMap(new HashMap<>());
private ConnectionFactory connectionFactory;
public static ConnectionManager getInstance() {
return instance;
}
private ConnectionManager(){
initialize();
}
private boolean initialize(){
InputStream inputStream = null;
try {
inputStream = Class.forName(ConnectionManager.class.getName()).getResourceAsStream("/rabbitmq.properties");
} catch (ClassNotFoundException e1) {
e1.printStackTrace();
}
Properties properties = new Properties();
try {
properties.load(inputStream);
String host = properties.getProperty("host");
connectionFactory = new ConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setAutomaticRecoveryEnabled(true);
} catch (IOException e) {
logger.error("load rabbirmq failed.", e);
}
return true;
}
public Connection getAndCreateConnection(String connectionName) {
Connection connection = connectionTable.get(connectionName);
if (connection != null && connection.isOpen()) {
System.err.println("connection: "+connection);
return connection;
}
synchronized (this) {
connection = connectionTable.get(connectionName);
if (connection != null) {
return connection;
}
try {
connection = connectionFactory.newConnection(connectionName);
this.connectionTable.put(connectionName, connection);
} catch (IOException e) {
// TODO 可以发邮件通知消息服务器负责人,不能获取连接
// 增加计数,当获取连接次数达到一定时,可以重启消息服务器
e.printStackTrace();
} catch (TimeoutException e) {
// TODO 可以发邮件通知消息服务器负责人,不能获取连接
e.printStackTrace();
}
}
return connection;
}
public Connection reConnection(String connectionName) {
Connection connection = null;
try {
for(;;){
connection = getAndCreateConnection(connectionName);
if (connection.isOpen()) {
break;
}else {
System.err.println("connection not open");
}
Thread.sleep(100);
}
} catch (Exception e) {
}
return connection;
}
}
//rabbitmq.properties 内容
//host=192.168.1.192
推送消息默认配置
public class MQDefaultPublishServiceImpl implements ShutdownListener{
private final ConnectionManager connectionManager = ConnectionManager.getInstance();
private String queueName = "ivg-default-queue";
private final Object channelLock = new Object();
private Map<String, Channel> channelTable = Collections.synchronizedMap(new HashMap<>());
public MQDefaultPublishServiceImpl() {
}
public void sendMsg(byte[] body) throws IOException {
sendMsg(body, queueName);
}
private Channel getChannel(String queueName) throws IOException {
Channel ch = channelTable.get(queueName);
if (ch == null) {
synchronized (channelLock) {
ch = channelTable.get(queueName);
if (ch == null) {
Connection connection = connectionManager.reConnection(getConnectionName());
if (connection != null) {
ch = connection.createChannel();
ch.queueDeclare(queueName, false, false, false, null);
ch.addShutdownListener(this);
channelTable.put(queueName, ch);
}
}
}
}
return ch;
}
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
if (cause.getReference() instanceof Channel) {
try {
clearChannels();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private void clearChannels() throws IOException, TimeoutException {
try {
for(Channel channel:channelTable.values()){
channel.removeShutdownListener(this);
channel.close();
channel = null;
}
} finally {
channelTable.clear();
}
}
public String getConnectionName() {
return "default-producer";
}
public void sendMsg(byte[] body, String queueName) throws IOException {
// TODO Auto-generated method stub
Channel channel = getChannel(queueName);
try {
channel.basicPublish("", queueName, null, body);
} catch (IOException | AlreadyClosedException e) {
System.err.println(UtilAll.timeMillisToHumanString()+": sendMsg failed");
}
}
}
接收消息配置
public abstract class MQAbstractConsumerService implements MQConsumerService,Consumer,ShutdownListener{
private final ConnectionManager connectionManager = ConnectionManager.getInstance();
protected Channel channel = null;
public MQAbstractConsumerService() {
initialize();
}
public boolean initialize() {
Connection connection = connectionManager.getAndCreateConnection(getConnectionName());
if (connection.isOpen()) {
try {
channel = connection.createChannel();
channel.queueDeclare(getQueueName(), false, false, false, null);
channel.addShutdownListener(this);
channel.basicConsume(getQueueName(), false, this);
} catch (IOException e) {
e.printStackTrace();
return false;
}
}
return false;
}
@Override
public void handleConsumeOk(String consumerTag) {
// TODO Auto-generated method stub
}
@Override
public void handleCancelOk(String consumerTag) {
// TODO Auto-generated method stub
}
@Override
public void handleCancel(String consumerTag) throws IOException {
// TODO Auto-generated method stub
}
@Override
public void handleDelivery(String arg0, Envelope arg1, BasicProperties arg2, byte[] arg3) throws IOException {
// TODO Auto-generated method stub
if (!channel.isOpen()) {
return;
}
}
@Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
// TODO Auto-generated method stub
}
@Override
public void handleRecoverOk(String consumerTag) {
// TODO Auto-generated method stub
}
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
if (cause.getReference() instanceof Channel) {
System.err.println(Thread.currentThread().getName()+": shutdownCompleted");
try {
channel.removeShutdownListener(this);
channel.close();
channel = null;
Connection connection = connectionManager.reConnection(getConnectionName());
channel = connection.createChannel();
channel.queueDeclare(getQueueName(), false, false, false, null);
channel.addShutdownListener(this);
channel.basicConsume(getQueueName(), false, this);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
}
public class MQConsumerServiceImpl extends MQAbstractConsumerService implements MQConsumerService{
ExecutorService executorService = Executors.newFixedThreadPool(5);
private Logger logger = LoggerFactory.getLogger("MQConsumerServiceImpl");
@Override
public void handleDelivery(String arg0, Envelope envelope, BasicProperties arg2, byte[] arg3) throws IOException {
super.handleDelivery(arg0, envelope, arg2, arg3);
Runnable task = new Runnable() {
@Override
public void run() {
String message = null;
try {
message = new String(arg3, "UTF-8");
} catch (UnsupportedEncodingException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
System.out.println(" [x] Received '" + message + "'");
boolean result = false;
try {
result = handleReceiveMsg(message);
} catch (Exception e) {
logger.error("rabbitmq consumer error.",e);
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try {
if (result) {
channel.basicAck(envelope.getDeliveryTag(), false);
}else {
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
executorService.submit(task);
}
protected boolean handleReceiveMsg(String msg) {
return true;
}
@Override
public String getConnectionName() {
return "ivg-consumer";
}
@Override
public String getQueueName() {
return "ivg-default-queue";
}
}
/*之后只要继承MQConsumerServiceImpl 重写
@Override
protected boolean handleReceiveMsg(String msg) {
//这边写接收消息业务
}
*/
2.使用 RabbitTemplate
初始化绑定参数
application.properties
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
rabbitmq.exchange=test_exchange
rabbitmq.routingkey=test_routingkey
@Configuration
public class AmqpProviderConfig {
@Value("${rabbitmq.exchange}")
private String exchange;
@Value("${rabbitmq.routingkey}")
private String routingkey;
/** 固定内部通讯队列 ,直接写死绑定交换机与路由队列 **/
/* @Bean
public Queue serverInlineQueue() {
return new Queue(routingkey);
}*/
/* @Bean
public Binding bindingServerExchange(Queue serverInlineQueue) {
DirectExchange exchangeobj = new DirectExchange(exchange, false, true);
return BindingBuilder.bind(serverInlineQueue).to(exchangeobj).with(routingkey);
}*/
//这边初始化是绑定交换机,延迟绑定(调用发送方法管理界面才显示)
@Bean
TopicExchange exchange() {
return new TopicExchange(exchange);
}
}
发送消息
@Component
public class RabbitMqSend implements RabbitTemplate.ConfirmCallback {
private static final Logger log = LoggerFactory.getLogger(RabbitMqSend.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${rabbitmq.exchange}")
private String exchange;
@Value("${rabbitmq.routingkey}")
private String routingkey;
/**
* 发送方法
*
* @param msg
*/
//这边code不动态注入就直接用写死的routingkey
public void sendMsg(String msg,String code) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//动态注入路由key
rabbitTemplate.convertAndSend(exchange, code, msg, correlationData);
}
/**
* 将消息发送到mq server回调
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// TODO Auto-generated method stub
log.debug("send id:" + correlationData.getId());
if (ack) {// 调用成功
log.warn(correlationData.getId() + ":" + "发送成功.");
} else {
log.warn(correlationData.getId() + ":" + "发送失败.");
}
}
}
-
动态绑定需要自己 在后台管理界面 手动将路由队列绑定到交换机
先创建队列,然后将其绑定到交换机
以上
best wishes