简书:亚武de小文 【原创:转载请注明出处】
远程调用模式(RPC)
RabbitMQ有以下几种工作模式 :
- Work queues
- Publish/Subscribe
- Routing
- Topic
- Headers
- RPC
RPC
模型图
名词解释:
Client:RPC客户端
Server:RPC服务端
reply_to: 设置并指定回调队列
correlation_id: 唯一标识,每一个请求都会设置为一个具有唯一性的值,请求发送到rpc_queue队列
流程详解:
- 启动客户端(Client)后,创建一个匿名独占的异步回调队列
- 客户端消息设置属性:reply_to、correlation_id,发送消息到rpc_queue队列
- 服务端(Server)在rpc_queue队列上等待消息。待收到消息进行处理,然后将处理结果封装成消息发送到reply_to指定的队列上,并且此消息携带correlation_id属性
- 客户端在reply_to队列上等待消息,当收到消息后,它会检查收到消息的correlation_id。如果值和自己之前发送的一样,则将响应(当前值)返回给程序
参考代码
客户端
- RpcClient.java
package com.yawu.xiaowen.rpc; import com.rabbitmq.client.*; import com.yawu.xiaowen.header.Producer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * Rpc客户端 * * @author yawu * @date 2019.07.02 */ public class RpcClient { private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); private static final String RPC_QUEUE_NAME = "mq_rpc"; public static void execute(String message) { try { // RabbitMQ建立连接的管理器 ConnectionFactory factory = new ConnectionFactory(); // 设置服务器地址 factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); // 创建一个连接 Connection connection = factory.newConnection(); // 创建一个信道 Channel channel = connection.createChannel() // 定义临时队列,并返回生成的队列名称 String replyQueueName = channel.queueDeclare().getQueue(); // 本次请求唯一标志 String correlation_id = UUID.randomUUID().toString(); // 生成发送消息的属性 AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(correlation_id) // 设置指定回调队列 .replyTo(replyQueueName) .build(); // 发送消息,发送到默认交换机 channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8")); // 阻塞队列,用于存放回调结果 final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1); // 定义消息的回退方法 channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(correlation_id)) { response.offer(new String(body, "UTF-8")); } } }); // 获取回调的结果 String result = response.take(); System.out.println(" [Rpc客户端] 调用结果:'" + result + "'"); LOGGER.info("Rpc客户端消息发送:{}", message); channel.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } } }
服务端
- RpcServer.java
package com.yawu.xiaowen.rpc; import com.rabbitmq.client.*; import com.yawu.xiaowen.header.Producer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * Rpc服务端 * * @author yawu * @date 2019.07.02 */ public class RpcServer { private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); private static final String RPC_QUEUE_NAME = "mq_rpc"; public static void execute() { Connection connection = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个rpc_queue队列 channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); // 设置该信道同时最多只能获取一个消息 channel.basicQos(1); System.out.println(" [RpcServer]等待Rpc请求"); // 定义消息的回调处理类 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 生成返回的结果,关键是设置correlationId值 AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(properties.getCorrelationId()) .build(); // 生成返回 String response = generateResponse(body); // 回复消息,通知已经收到请求 channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8")); // 对消息进行应答 channel.basicAck(envelope.getDeliveryTag(), false); // 唤醒正在消费的所有的线程 synchronized (this) { this.notify(); } } }; // 消费消息 channel.basicConsume(RPC_QUEUE_NAME, false, consumer); // 在收到消息前,本线程进入等待状态 while (true) { synchronized (consumer) { try { consumer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } catch (Exception e) { LOGGER.error("an exception was occurred , caused by :{}", e.getMessage()); } finally { try { connection.close(); } catch (Exception e) { e.printStackTrace(); } } } /** * 暂停10s,并返回结果 * * @param body * @return */ private static String generateResponse(byte[] body) { System.out.println(" [RpcServer]接收到的请求: " + new String(body)); try { Thread.sleep(1000 * 1); } catch (InterruptedException e) { e.printStackTrace(); } return "响应结果:" + new String(body) + "-" + System.currentTimeMillis(); } }
Rpc测试代码
- RpcTest.java
package com.yawu.xiaowen.rpc; import org.junit.Test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Rpc测试类 * * @author yawu * @date 2019.07.02 */ public class RpcTest { private ExecutorService executorService = Executors.newFixedThreadPool(10); @Test public void rpc() throws InterruptedException { // Rpc服务端 executorService.submit(() -> { RpcServer.execute(); }); // Rpc客户端 executorService.submit(() -> { RpcClient.execute("RPC远程调用-发送信息"); }); // sleep 10s Thread.sleep(10 * 1000); } }
-
运行测试类,结果如下