RabbitMQ是消息中间件:它接收并转发消息。你可以认为它是邮局:当你将邮件放到邮箱中,你可以确定邮递员最终会将你的邮件送到你的收件人手中。类似的是,RabbitMQ就是一个邮箱,一个邮局和一个邮递员。
在RabbitMQ与邮局之间最大的区别就是,RabbitMQ不处理纸质消息。它接收,存储并转发二进制数据消息。
RabbitMQ传送消息 ,通常使用一些术语。
生产者(Producing
)意思是仅用来传送消息。一段发送消息的程序就被成为生产者。
一个队列(queue
)就是在RabbitMQ存在的邮箱的名称。然而消息通过RabbitMQ和你的应用的时候,他们可以被存储在一个队列(queue
)中。一个队列仅仅受限于主机的内存和磁盘,它本质上就是一块很大的消息缓冲区。许多生产者(producers
)可以发送消息到一个队列中,许多的消费者(consumers
)可以试图从一个队列中获取消息。下图就是如何我们用来表示一个队列。
消费者(consumer)和接收方意思差不多。一个消费者就是基本都在等待接收消息的程序。
注意:生产者,消费者,消息中间件没有必要都在同一个主机上;实际上大部分的应用也不会这么做。indeed in most applications they don't.
Hello World
这部分我们将用java写两段程序。一个生产者用于发送单一的消息,一个消费者用于接收消息并打印出来。我们先无视Java API中的一些细节,集中心思在这个入门的非常简单的事情。它就是一条“hello world”的消息传送。
下图中,P就是生产者(producer
),C就是消费者(consumer
).中间那个框就是队列(queue
)-RabbitMQ代表消费者的消息缓冲区。
maven pom
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.2.2</version>
</dependency>
发送消息(Producer
)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello world哈😋";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("send message:" + msg);
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if(channel!=null)
channel.close();
if(connection!=null)
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
接收方( Consumer
)
import com.rabbitmq.client.*;
import java.io.IOException;
public class Receive {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
if(channel!=null)
channel.close();
if(connection!=null)
connection.close();
}
}
启动RabbitMQ
./rabbitmq-server -detached
Send:
send message:hello world哈😋
Receive:
[x] Received 'hello world哈😋'