介绍
RabbitMQ是一种消息中间件。它的核心思想很简单,接收并传递消息。你可以把RabbitMQ想象成邮局:当你把信扔进信箱后,你十分确信邮递员会准确的把信交给收件人。在这个比喻里,RabbitMQ就是邮箱、邮局和邮递员的集合。
RabbitMQ和邮局主要的区别在于,RabbitMQ处理的不是纸质邮件,而是二进制的数据(Messages)
接下来用较为专业的术语解释RabbitMQ以及消息传递。
Producing指的是只做发送操作,其余什么也不干。自然而然,Producer就是只发消息的程序。我们用P指代Producer。
Queue等同于邮箱的意思,它存在于RabbitMQ当中。当消息穿过RabbitMQ到达你的应用程序期间,它们全部都保存在Queue当中。Queue的大小没有限制,你想存多少就存多少-它基本等同于一个容量无限的缓存。大量Producer往里发送消息,大量Consumer从同一个队列里取消息。我们用个图来展示下。
Consuming与接收的含义非常接近。Consumer这类程序主要功能就是接收消息。我们画个C。
注意Producer,Consumer以及Broker可能不在同一台主机中;实际大多数情况下,它们都分布在不同的主机中。
"Hello World"
Using the Java Client
在这部分内容中,我们会写两个JAVA程序;
第一个如下:
Producer发送一条消息,Consumer接收消息并打印。让我们暂时忽视JAVA API的实现细节,集中注意力从简单的调用开始,发一条“Hello World”的消息。
在下方的图中,“P”代表Producer,“C”代表Consumer,中间的红色方块代表Queue(Rabbit为Consumer持有的消息缓存)
Java版RabbitMQ客户端
RabbitMQ使用多种协议。在本部分示例中,采用的是AMQP 0-9-1协议,它是一种开放的、多功能的消息传递协议。同时,RabbitMQ客户端的实现语言也种类繁多,在这里我们选用JAVA版本。
下载安装包,检查签名,解压到你指定的路径 巴拉巴拉~~~
现在我们开始写代码。
Sending
我们称消息发送者为Send,接收者为Recv。Send会连接RabbitMQ,发送一条简短的消息,然后退出。
我们需要导入以下class文件
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
建立类文件并且命名队列。
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException {
...
}
}
然后我们创建一个到服务器的连接:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Connection掩盖了套接字实现的细节,让我们能专注于选择协议版本和认证以及其他重要的事情上。我们连接本机的broker,下文中我们简称为localhost。我们只需要修改IP地址就能简单的连上其他主机上的broker。
接下来我们创建一个channel,它包含大量我们常用的API。
为了发送消息,我们需要声明一个队列;然后我们向队列中发送消息。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
声明队列是幂等的,只有当队列不存在时,它才会被创建(吐槽,不就是单例嘛。。。)。消息内容是字节数组,你可以随心所以编码它。
最后,别忘记关掉连接。
channel.close();
connection.close();
以上就是整个Send.java的内容。
发送无效怎么办?
如果你第一次使用RabbitMQ并且没有看见发送的消息,你肯定会对这种不知所措的感觉印象深刻。也许是broker没有足够的磁盘空间(默认需要1G)导致拒绝接收消息。通过检查broker的日志文件来判断是否需要降低磁盘需求。 configuration file documentation会教你怎样设置disk_free_limit。
Receving
接下来是reciver,它被RabbitMQ塞入消息。同时,reciver实现起来也比sender复杂,我们需要它监听消息,直到接收并打印出来。
像写sender一样,这里我们也需要引入很多依赖的代码。
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
引入的代码中,DefaultConsumer是一个实现了Consumer接口的类,我们会用它来保存RabbitMQ推送的消息。
类似Sender一样构建代码;我们打开Connection和Channel,声明将要消费的队列。注意队列名称需要与发送的队列相同。
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
...
}
}
注意,我们在这里也许要声明队列,是因为我们也许在启动sender前就启动了reciver。我们需要确保在消费消息前,队列已经存在。
接下来我们要告诉RabbitMQ,让它把队列中的消息发给我们。由于它通过异步的方式推送消息,我们在形式上先用一个变量保存消息直到我们实际使用它。这也是DefaultConsumer子类所做的工作。
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
以上就是整个Recv.java的代码。
Putting it all together
你可以将这两段代码同rabbitMQ客户端代码一起编译。
$ javac -cp rabbitmq-client.jar Send.java Recv.java
为了运行他们,你需要rabbitmq-client.jar并且它依赖于classpath。在终端上,运行sender:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send
然后,运行receiver;
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv
在Windows环境下,在classpath中用分号代替冒号分离它们
当receiver从RabbitMQ中获取消息后会将其打印出来。recevier会一直运行并等待新的消息,因此我们在另一个终端中启动sender。
如果你想检查队列,尝试使用 rabbitmqctl list_queues
hello
是时候看看Part2,构建一个简单的工作队列。
小秘密:
为了减少打字,你可以为classpath设置环境变量,例如
$ export CP=.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
$ java -cp $CP Send
windows环境下
> set CP=.;commons-io-1.2.jar;commons-cli-1.1.jar;rabbitmq-client.jar
> java -cp %CP% Send