1.安装
下载
erlang-19.0.4-1.el7.centos.x86_64.rpm
https://www.rabbitmq.com/releases/erlang/
http://erlang.org/download/
http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
https://www.rabbitmq.com/install-rpm.html#downloads
注意erlang版本对应rabbitmq版本
1.erlang安装
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
yum install -y erlang
elk
2.socat安装
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
3.rabbitmq
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.10/rabbitmq-server-3.8.10-1.el8.noarch.rpm
rpm -ivh rabbitmq-server-3.8.10-1.el7.noarch.rpm
4.开启插件
rabbitmq-plugins enable rabbitmq_management
5.重启
systemctl restart rabbitmq-server.service
6.查看运行状态
systemctl status rabbitmq-server.service
7.控制台访问
http://192.168.0.104:15672/
guest/guest
8.报错error:User can only log in via localhost
添加用户
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin "." "." ".*"
systemctl restart rabbitmq-server
使用admin/admin登陆控制台
安装ok
2.java代码demo
2.1 生产者
public class ConnUtils {
public static ConnectionFactory getConnection() {
ConnectionFactory con = new ConnectionFactory();
con.setPort(5672);
con.setHost("192.168.43.84");
con.setUsername("ems");
con.setPassword("ems");
con.setVirtualHost("/ems");
return con;
}
}
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory con = ConnUtils.getConnection();
// 创建连接对象
Connection connection = con.newConnection();
// 创建通道
Channel channel = connection.createChannel();
/**
* 1. 队列名称
* 2.是否持久化
* 3.是否独占队列
* 4.是否消费完后自动删除队列
* 5.额外参数
*/
channel.queueDeclare("hello", false, false, false, null);
// 1.交换机 2.队列名 3.传递消息额外设置 4.消息具体内容
channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,
"hello word".getBytes(StandardCharsets.UTF_8));
channel.close();
connection.close();
con.clone();
}
2.3.消费者
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory con = ConnUtils.getConnection();
// 创建连接对象
Connection connection = con.newConnection();
// 创建通道
Channel channel = connection.createChannel();
channel.basicPublish("","hello",null,"hello word".getBytes(StandardCharsets.UTF_8));
// 1.消费队列名称 2.开始消息自动确认机制 3.消费时回调接口
channel.basicConsume("hello", true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("消息内容:"+new String(body));
}
});
// channel.close();
// connection.close();
// con.clone();
}