- exchange的声明和删除
- 声明: exchangeDeclare方法详解
* Declare an exchange, via an interface that allows the complete set of
* arguments.
* @see com.rabbitmq.client.AMQP.Exchange.Declare
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
* @param exchange the name of the exchange
* @param type the exchange type
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
* @param autoDelete true if the server should delete the exchange when it is no longer in use
* @param internal true if the exchange is internal, i.e. can't be directly
* published to by a client.
* @param arguments other properties (construction arguments) for the exchange
* @return a declaration-confirm method to indicate the exchange was successfully declared
* @throws java.io.IOException if an error is encountered
Exchange.DeclareOk exchangeDeclare(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
exchange: 交换器名称
type: 交换器类型,上节提到的direct fanout headers和topic
durable: 是否持久化 持久话将会将交换器存盘,服务器重启时不会丢失
autoDelete: 是否自动删除. 自动删除的前提是至少有一个队列或者交换器与这个交换器绑定.之后所有的交换器和队列与这个交换器解绑后,该交换器会自动删除.不能错误的理解为连接断开后交换器会删除
internal: 是否内置的 这个参数为true时表示这个交换器只能由其他交换器发送消息.不能与客户端直连
arguments: 其他一些结构化参数
删除交换器: exchangeDelete
* Delete an exchange
* @see com.rabbitmq.client.AMQP.Exchange.Delete
* @see com.rabbitmq.client.AMQP.Exchange.DeleteOk
* @param exchange the name of the exchange
* @param ifUnused true to indicate that the exchange is only to be deleted if it is unused
* @return a deletion-confirm method to indicate the exchange was successfully deleted
* @throws java.io.IOException if an error is encountered
Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
- queue的声明和删除
* Like {@link Channel#queueDeclare(String, boolean, boolean, boolean, java.util.Map)} but sets nowait
* flag to true and returns no result (as there will be no response from the server).
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @throws java.io.IOException if an error is encountered
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
* Delete a queue
* @see com.rabbitmq.client.AMQP.Queue.Delete
* @see com.rabbitmq.client.AMQP.Queue.DeleteOk
* @param queue the name of the queue
* @param ifUnused true if the queue should be deleted only if not in use
* @param ifEmpty true if the queue should be deleted only if empty
* @return a deletion-confirm method to indicate the queue was successfully deleted
* @throws java.io.IOException if an error is encountered
Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
* Purges the contents of the given queue.
* @see com.rabbitmq.client.AMQP.Queue.Purge
* @see com.rabbitmq.client.AMQP.Queue.PurgeOk
* @param queue the name of the queue
* @return a purge-confirm method if the purge was executed successfully
* @throws java.io.IOException if an error is encountered
Queue.PurgeOk queuePurge(String queue) throws IOException;
- queueBind exchangeBind queueUnbind exchangeUnbind
- queueBind方法详解
* Bind a queue to an exchange.
* @see com.rabbitmq.client.AMQP.Queue.Bind
* @see com.rabbitmq.client.AMQP.Queue.BindOk
* @param queue the name of the queue 要绑定的队列名
* @param exchange the name of the exchange 要绑定的交换器名
* @param routingKey the routing key to use for the binding 绑定时的routingBind
* @param arguments other properties (binding parameters) 绑定参数
* @return a binding-confirm method if the binding was successfully created
* @throws java.io.IOException if an error is encountered
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
- queueUnbind 可以解绑已经绑定的队列和交换器
- 何时创建
- 由生产者和消费者声明
RabbitMQ 官方建议,生产者和消费者都应该尝试创建队列. - 程序上线前在服务器上创建好,比如通过页面管理、RabbitMQ命令或更好的配置中心下发
* Publish a message.
* Publishing to a non-existent exchange will result in a channel-level
* protocol exception, which closes the channel.
* Invocations of <code>Channel#basicPublish</code> will eventually block if a
* <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
* @see com.rabbitmq.client.AMQP.Basic.Publish
* @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
* @param exchange the exchange to publish the message to
* @param routingKey the routing key
* @param mandatory true if the 'mandatory' flag is to be set
* @param immediate true if the 'immediate' flag is to be
* set. Note that the RabbitMQ server does not support this flag.
* @param props other properties for the message - routing headers etc
* @param body the message body
* @throws java.io.IOException if an error is encountered
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;
- exchange: 交换器名称
- RoutingKey: 路由键
- props: 消息的基本属性集 包括14个属性
private String contentType;
private String contentEncoding;
private Map<String,Object> headers;
private Integer deliveryMode;
private Integer priority;
private String correlationId;
private String replyTo;
private String expiration;
private String messageId;
private Date timestamp;
private String type;
private String userId;
private String appId;
private String clusterId;
- byte[] 消息体
- mandatory和immediate下节讲述
* Start a non-nolocal, non-exclusive consumer.
* @param queue the name of the queue 队列名称
* @param autoAck true if the server should consider messages 是否自动确认.建议设置成false
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param consumerTag a client-generated consumer tag to establish context 消费者标签,用来区分不同消费者
* @param callback an interface to the consumer object 回调函数,用来处理推送来的消息
* @return the consumerTag associated with the new consumer
* @throws java.io.IOException if an error is encountered
* @see com.rabbitmq.client.AMQP.Basic.Consume
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;
* Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
* containing the received message being rejected.
* @see com.rabbitmq.client.AMQP.Basic.Reject
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered
* @throws java.io.IOException if an error is encountered
void basicReject(long deliveryTag, boolean requeue) throws IOException;
public void consumeMessage() throws IOException {
Connection connection = ConnectionCreator.getConnection();
Channel channel = connection.createChannel();
boolean autoAck = false;
channel.basicConsume("test_queue", autoAck, "testConsumerTag", new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
long tag = envelope.getDeliveryTag();
if (tag / 2 == 0) {
channel.basicReject(tag, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), true);
RabbitMQUtils.close(connection, channel);
public void closeChannel() throws IOException {
Connection connection = ConnectionCreator.getConnection();
Channel channel = connection.createChannel();
channel.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause) {
- 创建连接:
package com.pctf.basic;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionCreator {
private static final String HOST = "";
private static final int PORT = 5672;
private static final String USER_NAME = "root";
private static final String PASSWORD = "123456";
private static final ConnectionFactory connectionFactory;
static {
connectionFactory = new ConnectionFactory();
public static Connection getConnection() {
try {
return connectionFactory.newConnection();
} catch (IOException e) {
} catch (TimeoutException e) {
return null;
- 关闭连接:
package com.pctf.utils;
import java.io.Closeable;
import java.io.IOException;
public class RabbitMQUtils {
public static void close(AutoCloseable... closeables) {
if (closeables != null) {
for (AutoCloseable c : closeables) {
if (c != null) {
try {
} catch (Exception e) {