Rabbitmq打怪升级之路(十三)Rpc-远程调用模式

简书:亚武de小文 【原创:转载请注明出处】

远程调用模式(RPC)

LengToo上学.png
RabbitMQ有以下几种工作模式 :
  • Work queues
  • Publish/Subscribe
  • Routing
  • Topic
  • Headers
  • RPC

RPC

模型图
[亚武de小文]Rpc模型图.png
  • 名词解释:
    Client:RPC客户端
    Server:RPC服务端
    reply_to: 设置并指定回调队列
    correlation_id: 唯一标识,每一个请求都会设置为一个具有唯一性的值,请求发送到rpc_queue队列

  • 流程详解:

  1. 启动客户端(Client)后,创建一个匿名独占的异步回调队列
  2. 客户端消息设置属性:reply_to、correlation_id,发送消息到rpc_queue队列
  3. 服务端(Server)在rpc_queue队列上等待消息。待收到消息进行处理,然后将处理结果封装成消息发送到reply_to指定的队列上,并且此消息携带correlation_id属性
  4. 客户端在reply_to队列上等待消息,当收到消息后,它会检查收到消息的correlation_id。如果值和自己之前发送的一样,则将响应(当前值)返回给程序
参考代码
客户端
  • RpcClient.java
    package com.yawu.xiaowen.rpc;
    
    import com.rabbitmq.client.*;
    import com.yawu.xiaowen.header.Producer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.util.UUID;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    /**
     * Rpc客户端
     *
     * @author yawu
     * @date 2019.07.02
     */
    public class RpcClient {
        private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
        private static final String RPC_QUEUE_NAME = "mq_rpc";
    
        public static void execute(String message) {
            try {
                // RabbitMQ建立连接的管理器
                ConnectionFactory factory = new ConnectionFactory();
                // 设置服务器地址
                factory.setHost("127.0.0.1");
                factory.setUsername("guest");
                factory.setPassword("guest");
    
                // 创建一个连接
                Connection connection = factory.newConnection();
                // 创建一个信道
                Channel channel = connection.createChannel()
    
                // 定义临时队列,并返回生成的队列名称
                String replyQueueName = channel.queueDeclare().getQueue();
    
                // 本次请求唯一标志
                String correlation_id = UUID.randomUUID().toString();
                // 生成发送消息的属性
                AMQP.BasicProperties props = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(correlation_id)
                        // 设置指定回调队列
                        .replyTo(replyQueueName)
                        .build();
                // 发送消息,发送到默认交换机
                channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8"));
    
                // 阻塞队列,用于存放回调结果
                final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
                // 定义消息的回退方法
                channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        if (properties.getCorrelationId().equals(correlation_id)) {
                            response.offer(new String(body, "UTF-8"));
                        }
                    }
                });
                // 获取回调的结果
                String result = response.take();
                System.out.println(" [Rpc客户端] 调用结果:'" + result + "'");
    
                LOGGER.info("Rpc客户端消息发送:{}", message);
                channel.close();
                connection.close();
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
服务端
  • RpcServer.java
    package com.yawu.xiaowen.rpc;
    
    import com.rabbitmq.client.*;
    import com.yawu.xiaowen.header.Producer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
    /**
     * Rpc服务端
     *
     * @author yawu
     * @date 2019.07.02
     */
    public class RpcServer {
        private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
        private static final String RPC_QUEUE_NAME = "mq_rpc";
    
        public static void execute() {
            Connection connection = null;
            try {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("127.0.0.1");
                connection = factory.newConnection();
                Channel channel = connection.createChannel();
    
                // 声明一个rpc_queue队列
                channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
                // 设置该信道同时最多只能获取一个消息
                channel.basicQos(1);
    
                System.out.println(" [RpcServer]等待Rpc请求");
    
                // 定义消息的回调处理类
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        // 生成返回的结果,关键是设置correlationId值
                        AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                                .Builder()
                                .correlationId(properties.getCorrelationId())
                                .build();
                        // 生成返回
                        String response = generateResponse(body);
                        // 回复消息,通知已经收到请求
                        channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                        // 对消息进行应答
                        channel.basicAck(envelope.getDeliveryTag(), false);
                        // 唤醒正在消费的所有的线程
                        synchronized (this) {
                            this.notify();
                        }
                    }
                };
                // 消费消息
                channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
                // 在收到消息前,本线程进入等待状态
                while (true) {
                    synchronized (consumer) {
                        try {
                            consumer.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            } catch (Exception e) {
                LOGGER.error("an exception was occurred , caused by :{}", e.getMessage());
            } finally {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
        }
    
        /**
         * 暂停10s,并返回结果
         *
         * @param body
         * @return
         */
        private static String generateResponse(byte[] body) {
            System.out.println(" [RpcServer]接收到的请求: " + new String(body));
            try {
                Thread.sleep(1000 * 1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "响应结果:" + new String(body) + "-" + System.currentTimeMillis();
        }
    }
    
Rpc测试代码
  • RpcTest.java
    package com.yawu.xiaowen.rpc;
    
    import org.junit.Test;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Rpc测试类
     *
     * @author yawu
     * @date 2019.07.02
     */
    public class RpcTest {
    
        private ExecutorService executorService = Executors.newFixedThreadPool(10);
    
        @Test
        public void rpc() throws InterruptedException {
    
            // Rpc服务端
            executorService.submit(() -> {
                RpcServer.execute();
            });
    
            // Rpc客户端
            executorService.submit(() -> {
                RpcClient.execute("RPC远程调用-发送信息");
            });
    
            // sleep 10s
            Thread.sleep(10 * 1000);
        }
    }
    
  • 运行测试类,结果如下


    Rpc运行结果.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容