ActiveMQ实践

ActiveMQ安装

下载地址: apache-activemq-5.15.12-bin.tar.gz
解压和启动

tar -zxvf apache-activemq-5.15.12-bin.tar.gz
cd apache-activemq-5.15.12/bin
./activemq start

# 查看启动端口
netstat -anp|grep 61616

#关闭
./activemq stop
image.png

服务器防火墙配置

1、ECS服务器需要先开启8161(web管理页面端口)、61616(activemq服务监控端口) 两个端口


image.png

2、打开linux防火墙端口(我这里是Centos7)

firewall-cmd --zone=public --add-port=8161/tcp --permanent
firewall-cmd --zone=public --add-port=61616/tcp --permanent

# 重新载入
firewall-cmd --reload
# 查看
# firewall-cmd --zone=public --query-port=8161/tcp
# 删除
# firewall-cmd --zone=public --remove-port=61616/tcp --permanent

测试启动

访问地址:http://ip:8161/admin
认证:admin/admin

image.png

消息发送至MQ测试

1.引入依赖

  <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.15.12</version>
  </dependency>

2.编写逻辑

package com.liuyun;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import sun.security.util.Password;

import javax.jms.*;

/**
 * Class
 * Created by wwx193433
 * 2020-04-16 18:39
 */
public class Producer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    private static final String BROKEURL = "failover://tcp://ip:61616";
//    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        //1.连接MQ
        ConnectionFactory connectionFactory = null;
        Connection connection = null;
        Session session = null;
        Destination destination = null;//消息目的地
        MessageProducer messageProducer = null;//消息生产者
        try{
            System.out.println(BROKEURL);
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("消息发送");
            messageProducer = session.createProducer(destination);

            //2.发送消息
            for(int i = 0;i<10;i++){
                String txt = "Hello World, this is "+i;
                TextMessage textMessage = session.createTextMessage(txt);
                messageProducer.send(textMessage);
            }
            session.commit();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //3.断开
            try {
                messageProducer.close();
                connection.close();
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
  1. 发送验证


    image.png

    image.png

消息消费

 package com.liuyun;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Class
 * Created by wwx193433
 * 2020-04-16 18:39
 */
public class Consumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    private static final String BROKEURL = "failover://tcp://ip:61616";
//    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        //1.连接MQ
        ConnectionFactory connectionFactory = null;
        Connection connection = null;
        Session session = null;
        Destination destination = null;//消息目的地
        MessageConsumer messageConsumer = null;//消息消费者
        try{
            System.out.println(BROKEURL);
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("消息发送");
            messageConsumer = session.createConsumer(destination);

            //2.发送消息
            for(int i = 0;i<5;i++){
                TextMessage textMessage = (TextMessage) messageConsumer.receive();
                System.out.println(textMessage.getText());
            }
            session.commit();

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //3.断开
            try {
                messageConsumer.close();
                connection.close();
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
image.png

image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 1. 固定IP 界面配置 2. 关闭SELINUX 修改如下: :wq! # 保存退出 3. 配置hosts 4....
    企业信息化架构阅读 685评论 0 0
  • ActiveMQ 即时通讯服务 浅析http://www.cnblogs.com/hoojo/p/active_m...
    bboymonk阅读 1,569评论 0 11
  • 一、什么是MQ? MQ,中文名字叫做消息中间件。既然是中间件,那么就说明它左边有东西,右边也有东西。那么左边是什么...
    贪挽懒月阅读 844评论 3 5
  • 1.简述:回想老王打电话讲故事案例. 2.优势:解耦,异步,横向扩展,顺序保障,安全可靠... 3.JMS(jav...
    老汉健身阅读 1,029评论 0 0
  • 工作地方显压力 回家地方无压力 迷茫目标有很多 想要实现无动力 时刻觉得好愚蠢 做事动手不动脑 每次都被领导骂 净...
    酸一样的柠檬阅读 250评论 0 0

友情链接更多精彩内容