activemq Artemis笔记

参考:https://activemq.apache.org/components/artemis/documentation/

activemq 是什么

ActiveMQ是开源的,支持多种协议(CORE,AMQP,MQTT,JMS...),
基于java的消息系统(或消息中间件)。
它支持业界标准协议,这样有利于客户端的选择(从c,c++,python,.net等)

当期有2个版本的ActiveMQ

  • ActiveMQ 5 "Classic"
  • ActiveMQ Artemis

因为Artemis是下一代的ActiveMQ,所以后面介绍基于Artemis。

ActiveMQ Artemis

下一代高性能,非阻塞架构,基于事件驱动的消息系统

特性:

  • 提供实现JMS 1.1 & 2.0 的客户端,包含JNDI
  • 通过共享存储和网络复制提供高可用
  • 简单&强大的协议无感知的寻址模型(addressing model)
  • 灵活的集群用于分布式负载
  • 基于日志的低延迟持久化
  • 方便从ActiveMQ 5迁移

使用

下载Artemis,并解压

//目录结构如下
         |___ bin //二进制和脚本
         |
         |___ examples //各自demo
         |      |___ common
         |      |___ features
         |      |___ perf
         |      |___ protocols
         |
         |___ lib //jar和libraries 
         |      |___ client
         |
         |___ schema //用于校验Artemis配置
         |
         |___ web //web上下文
                |___ api //api文档
                |___ hacking-guide
                |___ migration-guide //迁移指南
                |___ user-manual //用户手册

创建Broker Instance

执行  ${ARTEMIS_HOME}/bin/artemis create mybroker
${ARTEMIS_HOME}下载解压的目录
mybroker名字自定义
image.png

可以看到生成了一个目录mybroker,里面有各种文件。所有Broker Instance实际是一个包含所有配置文件和运行时数据的目录。

  • bin: 可执行脚本
  • etc: 配置
  • data: 消息持久化保存
  • log: 日志
  • tmp: 临时文件,可以安全删除
特别注意的是2个文件
//启动配置
etc/bootstrap.xml
//核心配置
etc/broker.xml

bootstrap.xml 内部包含了broker.xml,几乎所有的配置都在broker.xml进行,Artemis提供了大部分默认配置


//启动服务(使用了默认的etc/bootstrap.xml配置)
mybroker/bin/artemis run 

//指定配置启动服务(使用自定义bootstrap.xml配置)
mybroker/bin/artemis run -- xml:path/to/bootstrap.xml


//关闭服务
mybroker/bin/artemis stop
//生产者
public class Producer {

    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.2.98:61616");
        Connection connection = factory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("TEST.QUEUE");
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i= 0; i < 100; i++) {
            TextMessage message = session.createTextMessage("hello world! " + i);
            producer.send(message);
            System.out.println(message);
        }
        producer.close();
    }
}
//消费者
public class Consumer {

    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.2.98:61616");
        Connection connection = factory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("TEST.QUEUE");
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                System.out.println(message);
            }
        });

        connection.start();
    }
}

架构

image.png

Protocal

前面说过activemq支持多协议,(CORE,AMQP,MQTT,JMS...),对应图中Core Protocol,ProtocolA...,ProtocolA,B,C可以是MQTT等对应的协议。比较特殊的JMS Protocol 从图中看是基于Core Protocol,所以Core Protocol,MQTT等协议是直接和protocalManager交互,而JMS protocol是通过Core Protocol和protocalManager交互的。

ProtocalManager

ProtocalManager 负责协议到Artemis Server 核心模型的转换,即不管外部是什么协议,到Artemis Server都统一成一个概念。这样增加协议不会涉及Artemis Server的核心改动。这个统一的概念(核心模型),叫Addressing Model

Addressing Model

Addressing Model是Artemis 特有的,强大,灵活,高性能。它包含三个核心概念

  • Address
  • Queue
  • Routing Types

Address,Queue,Routing Types 三者的概念和联系代表了Artemis 消息流转的核心。

Address

Address 表示一个消息的终端(messaging endpoint),类似其它系统的topic概念,Address可以绑定0个或多个queue 和 一个routing type.

Queue

Queue 是和Address关联的,Address对Queue是一对多的关系,当对应于一个Address的消息到来时,消息讲发送给Address的一个或多个Queue(取决于Routing Types的配置),Queue同时可以配置成自动创建和删除。
比如当Producter给一个Address发送消息时,如果Address不存在,Artemis会自动创建该Address。

Routing Types

Routing Types 决定了消息是如何发送给Address的Queue。
Routing Types 有2种类型。

  • Anycast:发送给单个queue,point-to-point
  • Multicast:发送给每个queue publish-subscribe

注意:可以为一个address指定多个Routing Type,但不建议这么做。

介绍几种常见的Address配置

1:Point-to-Point Messaging
image.png

address.foo(Address)的Routing Types是Anycast
消息{1,2,3,4,5}依次发送到address.foo的Q1,Consumer 1 & Consumer 2 共同消息Q1的一部分消息,Consumer 1 & Consumer 2之间的负载取决于Artemis的负责策略。

配置:
<!--/etc/broker.xml-->
<addresses>
   <address name="address.foo">
      <anycast>
         <queue name="anycast"/>
      </anycast>
   </address>
</addresses>
2:Publish-Subscribe Messaging
image.png

消息会同时发给pubsub.foo的2个Queue,Consumer 1 & Consumer 2消费相同消息

配置:
<!--/etc/broker.xml-->
<addresses>
   <address name="topic.foo">
      <multicast/>
   </address>
</addresses>

下面这种配置没有必要,因为artemis可以自动创建queue

<!--/etc/broker.xml-->
<addresses>
   <address name="topic.foo">
      <multicast>
         <queue name="client123.topic.foo"/>
         <queue name="client456.topic.foo"/>
      </multicast>
   </address>
</addresses>
3:Point-to-Point Address multiple Queues
image.png
配置:
<!--/etc/broker.xml-->
<addresses>
   <address name="address.foo">
      <anycast>
         <queue name="q1"/>
         <queue name="q2"/>
      </anycast>
   </address>
</addresses>
4:Point-to-Point Address multiple Queues
image.png
配置:
<!--/etc/broker.xml-->
<addresses>
   <address name="foo.orders">
      <anycast>
         <queue name="orders"/>
      </anycast>
      <multicast/>
   </address>
</addresses>

Persistence(持久化)

消息系统高可用,要在crash或者restart的情况下消息不丢失。Artemis 提供了2种持久化方式,以便系统恢复后消息重新投递。

  • 文件日志
  • JDBC

文件日志

磁盘写操作包含了移动磁头以定位到要写的位置,散落在磁盘的不同位置的文件意味着频繁的磁头移动,导致性能很差,所以磁盘的顺序写可以最大的利用磁盘的性能。类似于kafka的文件日志也是基于此。

Artemis的文件日志是只追加的,原因如上所述。Artemis的文件日志由多个文件组成,每个文件都是固定大小,预先创建。

add message, update message, delete message都会记录到文件日志,当一个文件满了之后,记录到下一个日志文件。

随着删除记录被添加到日志中 Artemis有一个复杂的文件垃圾收集算法,它可以确定是否还需要某个特定的日志文件——也就是说,它的所有数据已被删除,如果是,则可以回收和重用文件

Artemis同时有个压缩算法用于压缩日志文件(删除无用的数据,并压缩)

文件日志的写入方式

  • Java NIO
  • Linux Asynchronous IO
  • Memory mapped

日志分类

  • bindings journal
  • message journal
  • large message data
  • duplicate id caches
  • paging data

bindings journal

保存了绑定相关数据:包含了queue的信息和属性,id序列计数器,始终是Java NIO方式,因为比起message 日志,bindings日志量级低

message journal

保存了消息相关数据:包含消息本身,重复Id缓存。
默认使用 AIO,如果不支持 AIO 回退到 Java NIO

零持久化(Zero Persistence)

意味着以下数据都没有持久化

  • bindings data
  • message data
  • large message data
  • duplicate id caches
  • paging data
//表示不做持久化,如下设置
persistence-enabled in broker.xml to false.

传输

Artemis 的底层传输涉及2个重要概念

  • Acceptors
  • Connectors

Acceptors

我们从配置看起

<!--broker.xml-->
<acceptors>
    <acceptor name="netty">tcp://localhost:61617</acceptor>
    <!--可以有多个acceptor-->
    <acceptor>...</acceptor>
</acceptors>

acceptor表示Artemis 服务的一个接收器,用于接收客户端请求。
上面的配置定义了一个接收器,底层使用了netty,端口是61617。

可以为acceptor指定一些配置,方法是在url后面设置

<acceptor name="netty">tcp://localhost:61617?sslEnabled=true&keyStorePath=/path</acceptor>

Connectors

如同Acceptors用于服务器如何接收请求,Connectors是用于指定如何请求服务器

<!--broker.xml-->
<connector name="netty">tcp://localhost:61617</connector>

上面的配置主要用于:

  • 当服务bridged另一个服务(涉及bridged概念)
  • 当服务是cluster的一部分(涉及cluster的概念)

客户端直连服务器

服务器端无需配置Connectors

ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61617");

ClientSessionFactory sessionFactory = locator.createClientSessionFactory();

ClientSession session = sessionFactory.createSession(...);

Netty传输配置

<!--指定协议-->
<acceptor name="netty">tcp://localhost:61617?protocols=CORE,AMQP</acceptor>

发送&提交的保证

事务消息

当commit 或 rollback 请求发送到服务器时,客户端阻塞直到服务器执行后返回结果。

服务器会把commit 或 rollback 提交到文件日志中。
journal-sync-transactional配置影响这个过程。
当journal-sync-transactional 为 false时,服务器在响应客户端后某个时间写入。如果设置为true服务器会确保写入文件日志后再返回可客户端。

非事务消息发送

blockOnDurableSend 配置

如果设置为true,当发送持久化的非事务消息时,客户端会阻塞直到消息到达服务,并返回响应。默认是true

blockOnNonDurableSend 配置

如果设置为true,当发送非持久化非事务消息时,客户端会阻塞直到消息到达服务,并返回响应。默认是false

设置block on sends 为true 会降低系统性能,因为发送响应后下个发送才能执行。意味着系统的性能被网络往返时间限制,而不是带宽。
为了更好的性能,建议使用事务消息批量发送 或者 使用
Artemis 推荐的 ‘asynchronous send acknowledgements’ 特性

journal-sync-non-transactional设置

如果设置为true,当系统受到持久化非事务消息时,消息会routed 给 至少一个queue,然后系统持久化该消息,之后才发送响应给客户端。

BlockOnAcknowledge 设置

If this is set to true then all calls to acknowledge on non transacted sessions will block until the acknowledge has reached the server, and a response has been sent back. You might want to set this to true if you want to implement a strict at most once delivery policy. The default value is false
这里文档描述的意思不是特别理解,下面是从源代码注释看该设置表达的意思

//org.apache.activemq.artemis.api.core.client.ClientSession
/**
    * Returns whether the ClientConsumer created by the session will <em>block</em> when they acknowledge a message.
    *
    * @return <code>true</code> if the session's ClientConsumer block when they acknowledge a message, <code>false</code> else
    */
   boolean isBlockOnAcknowledge();

Asynchronous Send Acknowledgements

如果使用非事务session,又要保证消息发送,就如‘非务消息’发送里说的,需要牺牲性能。

作为改进,Artemis 提供了新的特性:'Asynchronous Send Acknowledgements',通过这个特性,Artemis 可以配置为发送消息无需阻塞等待,而是异步接收消息的acknowledgement 响应。确保消息到达的同时提高系统吞吐量。

Session.setSendAcknowledgementHandler(new MySendAcknowledgementsHandler());
class MySendAcknowledgementsHandler implements SendAcknowledgementHandler {

            int count = 0;

            @Override
            public void sendAcknowledged(final Message message) {
               System.out.println("Received send acknowledgement for message " + count++);
            }
}

Artemis Plugin Support

Artemis 被设计成支持插件,所有插件在同一个时间注册,并连接在一起,按照注册的时间顺序依次执行。

实现插件步骤

<broker-plugins>
   <broker-plugin class-name="some.plugin.UserPlugin">
      <property key="property1" value="val_1" />
      <property key="property2" value="val_2" />
   </broker-plugin>
</broker-plugins>
...

Configuration config = new ConfigurationImpl();
...

config.registerBrokerPlugin(new UserPlugin());

Libaio Native Libraries

Thread management

Server-Side Thread Management

Artemis 内部维护一个标准线程用于一般性的任务,另一个scheduled 线程池用于scheduled 任务

code read

send_deliver

image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,875评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,569评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,475评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,459评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,537评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,563评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,580评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,326评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,773评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,086评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,252评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,921评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,566评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,190评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,435评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,129评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,125评论 2 352

推荐阅读更多精彩内容