用 java 盘 RabbitMq !

一段时间没有更新了,今天来划划水吧!今天准备盘盘 rabbitmq ,首先我们得知道它是个啥!

就不百度百科了,说我理解的吧,rabbitmq是一套开源的消息队列服务,它的同类型产品有Kafka (apache的), ActiveMQ, RocketMQ (阿里的)等。当然这些产品都有自己的特点,没有谁好谁坏,如何选型视场景而定。这里盘rabbitmq,因为中小型企业用的多。

那什么是消息队列呢?我们可以把消息队列比作是一个存放消息的容器,这个容器以队列的形式呈现。队列嘛,跟食堂排队打饭一样,先排队的先打饭。而“消息”指的是在两台计算机间传送的数据单位。比方说你给别人在qq上发送一条数据,这个数据就是一个消息。

你不禁要问,这玩意有啥用?官方回答是给分布式系统解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。可怕,还是完全不知道是干啥的吧。大佬们都喜欢拿这么一张图扯。

同步处理

上面这个图呢就是同步处理的意思(不要被他左上角的字蒙骗。)意思就是你在一个网站上注册一个账号的过程,这个过程包含三件事,注册信息写入数据库、给你邮箱发送注册成功的信息, 给你手机发送短信。发短信必须在发完邮件之后执行。如果这三件事做完后才给你浏览器发送注册成功的页面时,这个过程你会等150ms。你可能觉得这个时间已经很快了,丝毫不影响啥,可勤奋的程序员们老想着把这个时间再缩短,他们认为这还是太慢了,因为可能还有其他情况耗费一些时间,比如网络再故意延迟一会了,那可就让人难受了。下面这个图呢就是能再优化这个时间的异步处理啦。

异步处理

这个图呢把发邮件和短信做成并行的,同时开始执行,就是说发短信不用等发完邮件才执行。

接下来说解耦, 下面这个图就是两个系统耦合了,库存系统直接调用订单系统的接口。这样订单系统接口一变,也要去改库存系统的代码。要是这两个系统分别是两拨人去开发,一方随便改改接口,说不定两拨人能打起来。


耦合

下面就是解耦场景,库存系统直接跟消息队列打交道,要是订单系统随便改接口代码,消息队列会打死他的,放心!就不需要库存系统这帮人出手了,你说要是库存系统和消息队列吵起来了咋整,放心,消息队列是别人牛逼的人写的,它俩吵起来,八成都会是库存系统做错了!


解耦

流量削锋一般在秒杀,团抢活动这些场景出现。双十一都不陌生,突然在这天访问量剧增。使用消息队列来抗住。


image

用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。

异步消息,解耦,流量削锋这些消息队列的作用说完了(当然还有作用,自行百度吧,大佬们),该盘代码了。

使用之前呢需要安装rabbitmq 这里不细说,因为比较简单。给个windows上安装的博客 : windows上rabbitmq安装参考
),唯一注意的是使用rabbitmq需要erlang语言环境,就像java应用需要jdk环境一样。玩linux的敲几句命令就完事了,就不用给教程了吧(手动滑稽)。

接下来会慢慢更新 rabbitmq 五种模式的测试代码。源码存放于junan的码云仓库

第一种 简单队列


简单队列

简单队列存在一个生产者,一个消费者,一个队列。
首先需要建立普通maven项目,导入下依赖,只需要这一个依赖即可。

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.4.3</version>
        </dependency>

然后需要个connection工具类,它作用嘛用于和rabbitmq的建立连接。

package com.junan.rabbitmqTest.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * <p>
 *         rabbitmq 工具类产生 rabbitmq 连接
 * </p>
 *
 * @author junan
 * @version 1.0.0
 * @since 19-5-29
 */
public class ConnectionsUtil {
    //设置 rabbitmq 服务ip地址
    private static String host = "localhost";
    //设置 rabbitmq 服务端口
    private static Integer port = 5672;
    //设置 rabbitmq 服务登录用户名
    private static String username = "admin";
    //设置 rabbitmq 服务虚拟主机名
    private static String virtualHost = "/test";
    //设置 rabbitmq 服务登录密码
    private static String password = "chenjunan";
    //通过这个方法获取连接
    public static Connection getConnection(){

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        try {
            return factory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return null;
    }

}

使用这个工具类是对应修改static属性即可。然后建立生产者。

package com.junan.rabbitmqTest.simple;

import com.junan.rabbitmqTest.utils.ConnectionsUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * <p>
 *      简单消息队列生产者
 * </p>
 *
 * @author junan
 * @version 1.0.0
 * @since 19-5-29
 */
public class Producer {
    //给队列取的名字
    private static final  String QUEUE_NAME = "rabbitmq_simple";

    public static void main(String[] args) {
        //从工具类获取连接
        Connection connection = ConnectionsUtil.getConnection();
        try {
            //从连接获取通道
            Channel channel = connection.createChannel();
            //声明队列(具体参数请查api)
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //从通道发送 字节消息
            channel.basicPublish("", QUEUE_NAME, null, "hello rabbitmq".getBytes());
            //关闭通道和连接
            channel.close();
            connection.close();
            System.out.println("<==  已发送一条消息!  ==>");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

}

接下来是消费者。

package com.junan.rabbitmqTest.simple;

import com.junan.rabbitmqTest.utils.ConnectionsUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * <p>
 *              简单消息队列消费者
 * </p>
 *
 * @author junan
 * @version 1.0.0
 * @since 19-5-29
 */
public class Consumer {

    //给队列取的名字
    private static final String QUEUE_NAME = "rabbitmq_simple";

    public static void main(String[] args) {
        //从工具类获取连接
        Connection connection = ConnectionsUtil.getConnection();
        try {
            //从连接获取通道
            Channel channel = connection.createChannel();
            //声明队列(具体参数请查api)
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //创建消费者(这里只是创建)
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                //重写这个方法:事件模型,当有消息传来时执行这个方法,相当于listener
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    //打印消息内容
                    System.out.println(new String(body));
                }
            };
            //这里开始消费 ,需要把创建的消费者传入
            channel.basicConsume(QUEUE_NAME, true, consumer);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

先运行消费者Consumer的main方法。再运行生产者Producer中的main方法发送一条消息,看Consumer消费者能不能收到,反正我是收到了(嘻嘻)。

producer
consumer

以上代码基本使用了rabbitmq的简单模式用生产者给消费者发送了一条消息。

第二种 work 模式


work

这种模式和简单模式的区别就是可以有多个消费者进行消费,我这里使用两个消费者演示。具体参考代码
这次的生产者一共生产50条消息共两个消费者消费。

package com.junan.rabbitmqTest.work;

import com.junan.rabbitmqTest.utils.ConnectionsUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * <p>
 *          工作消息队列生产者
 * </p>
 *
 * @author junan
 * @version 1.0.0
 * @since 19-5-30
 */
@SuppressWarnings("all")
public class Producer {
    //给队列取的名字
    private static final  String QUEUE_NAME = "rabbitmq_work";

    public static void main(String[] args) {

        Connection connection = ConnectionsUtil.getConnection();
        Channel channel = null;
        try {
            channel = connection.createChannel();
            channel.queueDeclareNoWait(QUEUE_NAME, false, false, false, null);
            //发送50个消息
            for (int i = 0; i < 50; i++) {
                String msg = " hello   " + i;
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                Thread.sleep(100);
            }
            System.out.println("<==========   生产者已发送 50 条消息!   ==========>");
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            try {
                if(channel != null)
                    channel.close();
                if(connection != null)
                    connection.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

以下是两个消费者。

package com.junan.rabbitmqTest.work;

import com.junan.rabbitmqTest.utils.ConnectionsUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * <p>
 *          工作队列消费者一号
 * </p>
 *
 * @author junan
 * @version 1.0.0
 * @since 19-5-30
 */
@SuppressWarnings("all")
public class Consumer1 {
    //给队列取的名字
    private static final String QUEUE_NAME = "rabbitmq_work";

    public static void main(String[] args) {

        Connection connection = ConnectionsUtil.getConnection();
        Channel channel = null;
        try {
            channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            Consumer consumer = new DefaultConsumer(channel) {
                //重写这个方法:事件模型,当有消息传来时执行这个方法,相当于listener
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    System.out.println("<== consumer1 ==>  " + new String(body));
                    try {
                         //这里每次循环休息200ms, 让两个消费者休息时间不同, 看他的运行结果
                        Thread.sleep(200);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            channel.basicConsume(QUEUE_NAME, true, consumer);
            System.out.println("<==========   消费者一号启动!   ==========>");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.junan.rabbitmqTest.work;

import com.junan.rabbitmqTest.utils.ConnectionsUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * <p>
 *          工作队列消费者二号
 * </p>
 *
 * @author junan
 * @version 1.0.0
 * @since 19-5-30
 */
@SuppressWarnings("all")
public class Consumer2 {
    //给队列取的名字
    private static final String QUEUE_NAME = "rabbitmq_work";

    public static void main(String[] args) {

        Connection connection = ConnectionsUtil.getConnection();
        Channel channel = null;
        try {
            channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    System.out.println("<== consumer2 ==>  " + new String(body, "utf-8"));
                    try {
                        //这里每次循环休息400ms, 让两个消费者休息时间不同, 看他的运行结果
                        Thread.sleep(400);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            channel.basicConsume(QUEUE_NAME, true, consumer);
            System.out.println("<==========   消费者二号启动!   ==========>");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

先启动两个消费者等待消息,然后启动生产者给消息队列发送消息。

结果如下:

消费者1号
消费者2号

可以看出我们虽然设置了这两个消费者每次消费休息不同时长。可是从结果看,这两个消费者采用轮询的方式消费这些消息。也就是消息队列给这两个消费者消息很公平,一人一个的给。不管你忙碌还是空闲。这种方式相比简单队列能减轻一部分压力。

第三种 公平分发
这种方式能需要消费者手动确认收到消息,rabbitmq才会给他分发下一条消息。注意对比work模式的代码

生产者 需要对channel设置每次值发送一条消息给消费者

package com.junan.rabbitmqTest.workFair;

import com.junan.rabbitmqTest.utils.ConnectionsUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * <p>
 *           公平分发队列生产者
 * </p>
 *
 * @author junan
 * @version 1.0.0
 * @since 19-5-30
 */
@SuppressWarnings("all")
public class Producer {
    //给队列取的名字
    private static final  String QUEUE_NAME = "rabbitmq_work";

    public static void main(String[] args) {

        Connection connection = ConnectionsUtil.getConnection();
        Channel channel = null;
        try {
            channel = connection.createChannel();
            //限制每次发送一条消息给消费者
            channel.basicQos(1);
            channel.queueDeclareNoWait(QUEUE_NAME, false, false, false, null);
            //发送50个消息
            for (int i = 0; i < 50; i++) {
                String msg = " hello   " + i;
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                Thread.sleep(100);
            }
            System.out.println("<==========   生产者已发送 50 条消息!   ==========>");
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            try {
                if(channel != null)
                    channel.close();
                if(connection != null)
                    connection.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

消费者1号 (二号类似就不粘代码了,) 注意设置限制每次发送一条消息给消费者和设置autoAck为false(basicConsume这个方法的第二个参数)

package com.junan.rabbitmqTest.workFair;

import com.junan.rabbitmqTest.utils.ConnectionsUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * <p>
 *          公平分发消费者一号
 * </p>
 *
 * @author junan
 * @version 1.0.0
 * @since 19-5-30
 */
@SuppressWarnings("all")
public class Consumer1 {
    //给队列取的名字
    private static final String QUEUE_NAME = "rabbitmq_work";

    public static void main(String[] args) {

        Connection connection = ConnectionsUtil.getConnection();
        try {
            final Channel channel = connection.createChannel();
            //限制每次发送一条消息给消费者
            channel.basicQos(1);
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            Consumer consumer = new DefaultConsumer(channel) {
                //重写这个方法:事件模型,当有消息传来时执行这个方法,相当于listener
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    System.out.println("<== consumer1 ==>  " + new String(body));
                    try {
                        //这里每次循环休息200ms, 让两个消费者休息时间不同, 看他的运行结果
                        Thread.sleep(200);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    //设置返回的确认消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            //设置不自动确认消息,当自动应答等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。
            channel.basicConsume(QUEUE_NAME, false, consumer);
            System.out.println("<==========   消费者一号启动!   ==========>");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

接下来看结果

消费者1号
消费者2号

可以看到这个效果就比较满意了,谁的空闲时间多就能多拿到一下消息。能充分利用消费者的能力。
ps: autoAck :这是一个boolean参数,等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。
---------------------------------------------------------------------- 有时间继续更新。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,744评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,505评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,105评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,242评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,269评论 6 389
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,215评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,096评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,939评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,354评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,573评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,745评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,448评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,048评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,683评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,838评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,776评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,652评论 2 354

推荐阅读更多精彩内容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,908评论 2 11
  • 什么叫消息队列? 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复...
    Agile_dev阅读 2,373评论 0 24
  • 应用场景 异步处理 场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种: 1.串行的方式 2.并行的...
    lijun_m阅读 1,826评论 0 3
  • 1.简述RabbitMQ中的几种Exchange的作用。 答:有四种exchange 1)direct excha...
    久伴_不离阅读 2,770评论 0 8
  • 关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时...
    预流阅读 584,650评论 51 786