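Prepare four Vagrant machines: one NameServer and three brokers.
Vagrant configuration directory layout: a Vagrantfile, a setup.sh script, and a labs directory that is synced into the VM.
The Vagrantfile is shown below; change the IPs to whatever you have chosen. Only one machine is listed in boxes here, so add one entry per additional machine.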
# -*- mode: ruby -*-
# vi: set ft=ruby :
Vagrant.require_version ">= 1.6.0"
boxes = [
  {
    :name => "mq-broken-1",
    :eth1 => "192.168.210.11",
    :mem => "2048",
    :cpu => "2"
  }
]
Vagrant.configure(2) do |config|
  config.vm.box = "centos/7"
  boxes.each do |opts|
    config.vm.define opts[:name] do |config|
      config.vm.hostname = opts[:name]
      config.vm.provider "vmware_fusion" do |v|
        v.vmx["memsize"] = opts[:mem]
        v.vmx["numvcpus"] = opts[:cpu]
      end
      config.vm.provider "virtualbox" do |v|
        v.customize ["modifyvm", :id, "--memory", opts[:mem]]
        v.customize ["modifyvm", :id, "--cpus", opts[:cpu]]
      end
      config.vm.network :private_network, ip: opts[:eth1]
    end
  end
  config.vm.synced_folder "./labs", "/home/vagrant/labs"
  config.vm.provision "shell", privileged: true, path: "./setup.sh"
end
Contents of setup.sh
#!/bin/sh
# install some tools
sudo yum install -y git vim gcc glibc-static telnet net-tools
Start the machines
vagrant up
Log in to the virtual machine
vagrant ssh
To switch to the root user, run the command below; the default root password is also vagrant
su root
Clone and build the DLedger code (I cloned and built the code on the machine itself, so a JDK, Maven, and Git must be installed there)
git clone https://github.com/openmessaging/openmessaging-storage-dledger.git
cd openmessaging-storage-dledger
mvn clean install -DskipTests
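The build above needs a JDK, Maven, and Git on the PATH. A small check along the following lines can confirm that before starting a long build. It is only a sketch: the function name missing_tools is made up for illustration.

```shell
#!/bin/sh
# Print the name of every listed tool that is not found on the PATH.
# missing_tools is a hypothetical helper, not part of any of the projects used here.
missing_tools() {
    for tool in "$@"; do
        command -v "$tool" >/dev/null 2>&1 || echo "$tool"
    done
}

missing=$(missing_tools java mvn git)
if [ -n "$missing" ]; then
    echo "install these before building:" $missing
else
    echo "java, mvn and git are all available"
fi
```

On CentOS 7, anything reported as missing can then be installed with yum or unpacked by hand before running mvn.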
Clone and build the RocketMQ code
git clone https://gitee.com/mirrors/rocketmq.git
cd rocketmq
git checkout -b store_with_dledger origin/store_with_dledger
mvn -Prelease-all -DskipTests clean install -U
Then change into the following directory
cd distribution/target/apache-rocketmq
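Three files in this directory need editing: bin/runserver.sh, bin/runbroker.sh, and bin/tools.sh
In each of them, find the following three lines, comment out the second and third, and change the value on the first line to your JDK home directory (the lines are shown here after the edit)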
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/local/jdk
#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
#[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
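The same edit can be scripted instead of being made by hand in each file. The following is a minimal sketch, assuming the three lines start exactly as shown above. The function name patch_java_home, the JDK_HOME variable, and the .bak backup files are illustrative choices, not part of RocketMQ.

```shell
#!/bin/sh
# Rewrite the first JAVA_HOME fallback line to point at the given JDK and
# comment out the later lines that start with the same test.
# patch_java_home is a hypothetical helper; it keeps a copy as <file>.bak.
patch_java_home() {
    file=$1
    jdk_home=$2
    cp "$file" "$file.bak"
    awk -v jdk="$jdk_home" '
        index($0, "[ ! -e \"$JAVA_HOME/bin/java\" ]") == 1 {
            n++
            if (n == 1) {
                print "[ ! -e \"$JAVA_HOME/bin/java\" ] && JAVA_HOME=" jdk
            } else {
                print "#" $0
            }
            next
        }
        { print }
    ' "$file.bak" > "$file"
}

# Assumption: run from distribution/target/apache-rocketmq; files that are absent are skipped.
for f in bin/runserver.sh bin/runbroker.sh bin/tools.sh; do
    if [ -f "$f" ]; then
        patch_java_home "$f" "${JDK_HOME:-/usr/local/jdk}"
    fi
done
```

Lines that are already commented out are left untouched, so running the script a second time does not change the files again.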
Start the NameServer
cd distribution/target/apache-rocketmq
nohup sh ./bin/mqnamesrv -n 192.168.210.9:9876 &
Start the Broker
vim conf/dledger/broker-n1.conf
brokerIP1=192.168.210.12
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30921
namesrvAddr=192.168.210.9:9876
storePathRootDir=/tmp/rmqstore/node01
storePathCommitLog=/tmp/rmqstore/node01/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-192.168.210.11:40911;n1-192.168.210.12:40912;n2-192.168.210.13:40913
## must be unique
dLegerSelfId=n1
sendMessageThreadPoolNums=16
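Here brokerIP1 is the IP of this machine
dLegerPeers lists the IPs and ports of the three brokers
namesrvAddr is the address (IP and port) of the NameServer
The three brokers form one group and must all use the same brokerName. If there is a second group, give it a different name, for example RaftNode01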
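Since dLegerSelfId and brokerIP1 have to agree with one of the entries in dLegerPeers, it can help to check a config file before starting the broker. The sketch below assumes the key=value layout of the file shown above; check_dledger_conf is a made-up helper name.

```shell
#!/bin/sh
# Check that "<dLegerSelfId>-<brokerIP1>:<port>" appears among the dLegerPeers entries.
# check_dledger_conf is a hypothetical helper, not a RocketMQ tool.
check_dledger_conf() {
    conf=$1
    self=$(sed -n 's/^dLegerSelfId *= *//p' "$conf")
    ip=$(sed -n 's/^brokerIP1 *= *//p' "$conf")
    peers=$(sed -n 's/^dLegerPeers *= *//p' "$conf")
    # Wrap the list in ';' so every entry is delimited the same way.
    case ";$peers;" in
        *";$self-$ip:"*)
            echo "ok: $self is $ip in dLegerPeers" ;;
        *)
            echo "mismatch: $self-$ip not found in dLegerPeers" >&2
            return 1 ;;
    esac
}

# Assumption: run from the apache-rocketmq directory; skipped if the file is absent.
if [ -f conf/dledger/broker-n1.conf ]; then
    check_dledger_conf conf/dledger/broker-n1.conf
fi
```

A mismatch here usually means the file was copied from another node without updating dLegerSelfId or brokerIP1.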
nohup sh bin/mqbroker -c conf/dledger/broker-n1.conf autoCreateTopicEnable=true &
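The other two brokers need their own config files, which differ from broker-n1.conf mainly in brokerIP1 and dLegerSelfId. One way to produce all three at once is sketched below. The values for n1 are the ones shown above; the listenPort values and store paths for n0 and n2 (30911/30931, node00/node02) are assumptions that simply extend the same pattern, and OUT_DIR is a hypothetical variable. Note that this overwrites any existing broker-n0.conf to broker-n2.conf in the output directory.

```shell
#!/bin/sh
# Sketch: generate broker-n0.conf, broker-n1.conf and broker-n2.conf.
# OUT_DIR, the n0/n2 listen ports and the node00/node02 paths are assumptions.
out_dir=${OUT_DIR:-conf/dledger}
mkdir -p "$out_dir"

write_broker_conf() {
    id=$1   # peer index: 0, 1 or 2
    ip=$2   # private-network IP of that broker
    cat > "$out_dir/broker-n$id.conf" <<EOF
brokerIP1=$ip
brokerClusterName=RaftCluster
brokerName=RaftNode00
listenPort=309$((id + 1))1
namesrvAddr=192.168.210.9:9876
storePathRootDir=/tmp/rmqstore/node0$id
storePathCommitLog=/tmp/rmqstore/node0$id/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-192.168.210.11:40911;n1-192.168.210.12:40912;n2-192.168.210.13:40913
## must be unique
dLegerSelfId=n$id
sendMessageThreadPoolNums=16
EOF
}

write_broker_conf 0 192.168.210.11
write_broker_conf 1 192.168.210.12
write_broker_conf 2 192.168.210.13
```

Each broker is then started on its own machine with the matching file, for example -c conf/dledger/broker-n0.conf on 192.168.210.11.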
Then test the cluster with some code
The producer code is as follows
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // Producer group name; the address must match namesrvAddr in the broker config
        DefaultMQProducer producer = new DefaultMQProducer("nanawoaini");
        producer.setNamesrvAddr("192.168.210.9:9876");
        producer.start();
        // Send a single message synchronously and print the result
        for (int i = 0; i < 1; i++) {
            Message msg = new Message("TopicTest", "TagA",
                    ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}
The consumer code is as follows
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("nanawoaini123");
        consumer.setNamesrvAddr("192.168.210.9:9876");
        // Start from the earliest available offset and subscribe to every tag of TopicTest
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext ctx) {
                System.out.println(Thread.currentThread().getName() + " received new message: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // The consumer keeps running after start() and receives messages on its own threads
        consumer.start();
    }
}