rocketmq消息中间件搭建安装

RocketMQ介绍

RocketMQ是一个纯java、分布式、队列模型的开源消息中间件,具有以下特点:

能够保证严格的消息顺序

提供丰富的消息拉取模式

高效的订阅者水平扩展能力

实时的消息订阅机制

亿级消息堆积能力

选用理由:

强调集群无单点,可扩展,任意一点高可用,水平可扩展 

海量消息堆积能力,消息堆积后,写入低延迟

支持上万个队列

消息失败重试机制

消息可查询

开源社区活跃

成熟度(经过双十一考验)


RocketMQ的各部分角色介绍

角色名称功能


Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息


Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费

Broker:消息中转角色,负责存储消息,转发消息,一般也称为Server

NameServer:管理中心,一般存储Broker的信息

        RocketMQ这四个角色就相当于我们现实生活中的邮政系统,其中Producer、Consumer、 Broker、NameServer分别代表发信者、收信者、负责暂存和传输的邮局、以及协调各个地方邮局的管理机构。

        启动RocketMQ之前先要启动NameServer,再启动Broker,这时候消息队列已经在开始工作了。如果想要发送消息,就用Producer;接受消息就用Consumer。如果程序中既要接收也要发送,可以启动多个Producer和Consumer。如果想要增加可靠性或者增大吞吐量,防止单点故障也可以在多台机器上部署多个NameServer和Broker,并且每个Broker也可以部署一个或者过个Slave。

        大致了解了基本角色功能后,再介绍两个重要的名词概念Topic(主题)和Message Queue(消息队列)。当一个企业搭建好消息平台后会有多条业务线接入进来,同一个业务也会有不同类型的消息需要投递,如何保证这消息准确地进行,就需要给不同类型的消息加上Topic名称来进行区分。所以在发送消息和接受消息时,需要先创建Topic。有了Topic后,仍然还有性能问题需要考虑。当一个Topic下的消息投递量或者发送量过大怎么办,这就需要在一个Topic下设置一个或者多个Message Queue来提高并行处理速度。有了Message Queue后,消息就可以并行地向各个Message Queue进行分发,从而消费者也可以从多个队列中读取消息,满足性能要求。

RocketMQ单点安装

参照官网:Downloading the Apache RocketMQ Releases - Apache RocketMQ

RocketMQ多级集群部署以及安装

本次先讲如何利用两台物理机,搭建出双主双从无单点故障的高可用RoketMQ集群。假设这两台物理机的ip分别为192.168.218.51和192.168.218.52,端口号默认为9876。

1.启动多个NameServer和Broker

首先按照单点部署,在两台服务器上分别安装RocketMQ,服务地址分别为192.168.218.51:9876和192.168.218.52:9876,然后启动NameServer(nohup sh bin/mqnamesrv &)

启动Broker,每台机器都需启动一个Master角色和一个Slave角色,作为主备。修改的配置文件在安装目录下的conf/2m-2s-sync下。

192.168.218.51机器上的Master Broker配置文件(conf/2m-2s-sync/broker-a.properties)

brokerClusterName=ifind-rocketmq-cluster     所属集群名字,集群比较多可以分成多个Cluster,每个供一个业务使用

brokerName=broker-a    broker名字,注意此处不同的配置文件填写的不一样,2选1

brokerId=1    0表示 Master,>0表示 Slave

namesrvAddr=192.168.216.57:9876;192.168.216.61:9876;192.168.216.58:9876;192.168.216.62:9876    nameServer地址,分号分割

defaultTopicQueueNums=4    在发送消息时,自动创建服务器不存在的topic,默认创建的队列数

autoCreateTopicEnable=true    是否允许 Broker自动创建Topic,建议线下开启,线上关闭

autoCreateSubscriptionGroup=true    是否允许 Broker自动创建订阅组,建议线下开启,线上关闭

listenPort=10911    Broker对外服务的监听端口

deleteWhen=04    删除文件时间点,默认凌晨 4点

fileReservedTime=120    文件保留时间,默认 48小时

mapedFileSizeCommitLog=1073741824    commitLog每个文件的大小默认1G

brokerRole=ASYNC_MASTER    Broker的角色

flushDiskType=ASYNC_FLUSH    刷盘方式    

修改192.168.218.51机器上的Master Broker配置文件(conf/2m-2s-sync/broker-a-s.properties)

修改192.168.218.52机器上的Master Broker配置文件(conf/2m-2s-sync/broker-b.properties)

修改192.168.218.52机器上的Master Broker配置文件(conf/2m-2s-sync/broker-b-s.properties)

几个配置参数的含义

参数名含义

brokerId有三种:SYNCMASTER ASYNCMASTER SLAVE,SYNC表示当Slave和Master消息同步完成 后,再返回发送成功的状态

flushDiskType表示刷盘策略,分为SYNCFLUSH和ASYNCFLUSH两种,代表同步刷盘和异步刷盘。同 步状态下,消息真正写入磁盘才返回成功状态;异步刷盘情况下,消息写入缓存后才返回成功状态

2.发送和接受消息的demo

procucer

public class SyncProducer {

    public static void main(String[] args) throws Exception {

        //实例化一个生产者

        DefaultMQProducer producer = new DefaultMQProducer("please_input_group_name"); //生产组名称

        producer.setNamesrvAddr("192.168.218.51"); //确定服务地址,集群时通过读取配置文件变量赋值

        producer.start();  //生产者开始工作

        //发送消息

        for () {

            //三个参数,第一个topic,第二个tag标识,第三个是消息内容

            Message msg = new Message("test-topic","tag-a","msg");

            SendResult sendResult = producer.send(msg); //生产者发送消息

        }

        producer.shutdown();

    }

}

consumer

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_input_group_name"); //消费组名称

consumer.subscribe("topic","*");    //消费者订阅

consumer.registerMessageListener(new MessageListenerConcurrentliy) {

    public ConsumeConcurrentStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

        //处理的逻辑代码

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

    }

};

consumer.start();


消息队列的协调者

NameServer的功能

NameServer是消息队列中的状态服务器,集群中的各个组件通过它了解全局的信息。同时,各个机器都要定期向NameServer上报自己的状态,如果超时不上报的话,其它组件会把这个机器从列表中移除。NameServer可以部署多个,本身是无状态的,也就是Broker、Topic等状态信息不会持久存储,是由各个角色上报存储到内存的。

集群状态的存储结构

private final HashMap> topicQueue topicQueueTable //这个map的key是Topic的名称,存储了所有topic的信息。Value存储着Broker的名称、读写Queue的数量以及同步标识等 

private final HashMap //这个结构key是BrokerName,value存储着地址信息以及所属Cluster的名称 

private final HashMap //这个结构的key是Broker的ip地址,value为Broker机器的实时状态,包括上次更新状态的时间戳 

private final HashMap> //key为Cluster的名称,set中存储的是Broker的名称,就是集群的BrokerName的集合

 private final HashMap> filterServerTable //key是Broker的地址,value是和这个Broker关联的多个过滤器的地址

以上五个变量的定义,可以清楚的看出各个组件的状态是如何进行存储的,而NameServer的作用便是维护这五个变量中存储的信息。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容