原生 ActiveMQ

ActiveMq是一个apche开源的,基于生产者(producer)-消费者(consumer)模型的消息中间件,通常用于系统间的消息传递。生产者产生消息,将消息发送至消息服务器;消费者通过监听消息服务器中指定的消息进行消费(获取并使用)。

它支持一对一(point-to-point)队列式的消息和一对多(publish/subscribe)广播式的消息。参见下图:

producer_consumer.PNG
  • setp 1:下载ActiveMq,在远程或者本地,启动ActiveMq服务(需要jre环境)。
    通过访问ip + 8161端口可以访问消息服务器的后台管理页面
  • setp 2:引入jar包依赖
    <dependencies>
        <!--ActiveMq-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.11.2</version>
        </dependency>
        <!--单元测试-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>
  • setp 3:创建生产者
@SuppressWarnings({"Duplicates", "UnusedAssignment"})
public class Producer {

    @Test
    public void produceMessage() {

        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;

        try {
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.25.128:61616");
            // 获得连接
            connection = factory.createConnection();
            // 开启连接
            connection.start();
            // 创建会话
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建队列(点对点),指定队列名称;当消费者监听该队列,消息才能被消费
            Queue raw_mq_queue = session.createQueue("RAW_MQ_QUEUE");
            // 创建提供者
            producer = session.createProducer(raw_mq_queue);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 设置消息持久化
            // 创建消息,指定消息内容
            // TextMessage textMessage = session.createTextMessage("the textMessage content");
            TextMessage textMessage = session.createTextMessage();
            textMessage.setText("message test");
            // 发送消息
            producer.send(textMessage);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // 释放资源(producer,session,connection),略
            
        }

    }

}
  • step 4 :创建消费者
@SuppressWarnings({"Duplicates", "UnusedAssignment"})
public class Consumer {

    @Test
    public void consumeMessage() {

        Connection connection = null;
        Session session = null;
        MessageConsumer messageConsumer = null;

        try {
            // 创建连接工厂
            String username = "admin";
            String password = "admin";
            String url = "tcp://192.168.25.128:61616";
            ConnectionFactory factory = new ActiveMQConnectionFactory(url);
            // 创建连接
            connection = factory.createConnection(username, password);
            // 开启连接
            connection.start();
            // 获得会话
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建目的地,Destination是Queue和Topic的父接口
            Destination destination = session.createQueue("RAW_MQ_QUEUE");// 指定监听的队列
            // 创建消费者
            messageConsumer = session.createConsumer(destination);
            // 设置消息监听
            /**不使用匿名内部类也可以单独创建一个类,实现MessageListener接口,重写onMessage方法*/
            messageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    try {
                        TextMessage textMessage = (TextMessage) message;
                        String text = textMessage.getText();
                        System.out.println("consumer has recieved the message:" + text);

                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            System.in.read();// 只是为了不让消费线程死亡,可以持续监听消息。
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            // 释放资源
          
        }

    }
}

至此,简单的案例就完成了。

代码地址 : https://github.com/Getthrough/ActiveMq_Raw_DemoCode

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,115评论 19 139
  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 5,513评论 0 34
  • 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O...
    高广超阅读 12,910评论 8 167
  • 1.UIView概述 UIView是IOS中负责显示视图的,并且负责处理视图的点击交互绘画等功能,起码从表面...
    WRFranky阅读 313评论 1 1
  • 一定又是若溪那个老妖婆在念叨她。天青雪猛地打了一个喷嚏。 真倒霉,明明是华衍师兄自己的问题,掌门糟老头居然怪到了她...
    霜月落花阅读 362评论 0 0