AxonFramework命令总线

命令总线是将命令分发给各自的命令Handler的机制。每个命令总是被精确发送到一个命令Handler。如果没有可用的命令Handler为已分发的命令,将会抛出NoHandlerForCommandException异常。对同一命令类型订阅多个命令Handler将导致订阅互相取代。在这种情况下,最后一个订阅获胜。

分发命令

CommandBus提供了两个方法去分发命令到它们各自的Handler:dispatch(commandMessage,callback)和dispatch(commandMessage)。第一个参数是一个包含要发送的实际命令的消息。第二个可选的参数接收一个回调,允许在命令处理完成时通知分发组件。这个回调有两个方法:onSuccess()和onFailure(),分别会在命令处理返回后被调用,或者当它抛出一个异常时调用。

调用组件可能不采取在分发命令的同一线程中调用回调。如果调用线程在继续之前依赖于结果,你可以使用FutureCallback。这是一个Future(在java.concurrent包中定义)和Axon的CommandCallback的组合。或者,考虑使用命令网关。

如果一个应用程序不直接对命令的结果感兴趣,可以使用dispatch(commandMessage) 方法。

SimpleCommandBus

SimpleCommandBus,顾名思义,最简单的实现。它在分发它们的线程中简单的处理命令。命令处理后,修改后的聚合在同一线程被保存和发布生成的事件。在大多数情况下,如web应用程序,该实现将符合你的需求。配置API中SimpleCommandBus是默认使用的实现。

像大多数CommandBus实现一样,SimpleCommandBus允许拦截器进行配置。在命令总线上分发一个命令后调用CommandDispatchInterceptors。在实际的命令handler 方法之前调用CommandHandlerInterceptors,允许你修改或阻塞命令。有关更多信息,请参考命令处理器拦截器。

因为所有命令处理都在同一线程中完成,这个实现仅限于JVM的边界。这个实现的性能是很好的,但不超凡。跨JVM边界,或使你的CPU cycles发挥最大的功效,看看其他CommandBus实现。

AsynchronousCommandBus

顾名思义,AsynchronousCommandBus实现从分发它们的线程异步执行命令。它使用一个Executor在不同的线程来执行实际的处理逻辑。

默认情况下,AsynchronousCommandBus使用一个unbounded缓存的线程池。这意味着分发一个命令时会创建线程。完成处理命令的线程将被重新用于新命令。如果60秒线程没有处理命令,则会停止线程。

或者,Executor实例可以提供不同的线程策略配置。

注意,应用程序停止时应该关闭AsynchronousCommandBus,以确保任何等待线程正确关闭。关闭,调用shutdown()方法。这也将关闭任何Executor实例,如果它实现ExecutorService接口的话。

DisruptorCommandBus

SimpleCommandBus具有合理的性能特性,特别是当你经历了性能调优技巧。事实上,SimpleCommandBus需要锁来防止多个线程并发访问同一聚合,导致处理开销和锁争用。

DisruptorCommandBus采用不同的方法进行多线程处理。不是多个线程每个都执行同样的处理,而是有多个线程,每个负责一件处理。DisruptorCommandBus使用Disruptor,一个小的并发编程框架,通过不同的方法对多线程进行处理来实现更好的性能。任务不是在调用线程中进行处理,而是将任务移交给两组线程进行处理,每组线程负责一部分处理。第一组的线程将执行命令handler,更改一个聚合的状态。第二组将存储并将事件发布到事件存储。

虽然DisruptorCommandBus轻易优于SimpleCommandBus 4倍(!),但有一些限制:

  • DisruptorCommandBus仅支持事件溯源聚合。这个命令总线充当由Disruptor处理聚合的存储库。获取一个存储库的引用,使用createRepository(AggregateFactory)。
  • 一个命令只能导致一个聚合实例状态变化。
  • 当使用缓存时,它只允许给定的标识符为单个聚合。这意味着它是不可能有两个具有相同的标识符的不同类型的聚合。
    命令一般不会引发需要回滚工作单元的故障。当发生回滚时,DisruptorCommandBus不能保证命令按照它们被分发的顺序进行处理。此外,它需要重试其他命令,从而造成不必要的计算。
  • 在创建一个新的聚合实例时,命令更新所创建实例可能并不完全按照所提供的顺序进行。一旦创建了聚合,所有命令将按照它们被分发顺序执行。为了确保顺序,在创建命令上使用回调去等待正在创建的聚合。它不应该耗时超过几毫秒。

构建一个DisruptorCommandBus实例,你需要一个EventStore。该组件在Repositories and Event Stores中有解释。
或者,你可以提供一个DisruptorConfiguration实例,它允许你调整配置优化你的特定环境下的性能:

  • Buffer size:在ringBuffer上注册传入命令的槽数。更高的值可能会增加吞吐量,但也导致更高的延迟。必须是2的次方数,默认为4096。
  • ProducerType: 表示条目是由单线程或多线程生成的。默认为多线程。
  • WaitStrategy:当处理器线程(三个线程负责的实际处理)需要等待对方时使用的策略。最好的WaitStrategy取决于机器上可用的处理器数量,和正在运行的其他进程的数量。如果低延迟是至关重要的,DisruptorCommandBus可以自己认领内核,你可以使用BusySpinWaitStrategy。为了使命令总线索取更少的CPU并且允许其他线程处理,使用YieldingWaitStrategy。最后,你可以使用SleepingWaitStrategy和BlockingWaitStrategy允许其他进程共享CPU。如果命令总线不需要进行专职处理,则后者是合适的。默认为BlockingWaitStrategy。
  • Executor:设置Executor为DisruptorCommandBus提供线程。这个Executor必须能够提供至少4个线程。其中的3个线程,由DisruptorCommandBus的处理组件认领。额外的线程用于调用回调函数,并计划重试以防检测到错误的聚合状态。默认是CachedThreadPool提供线程从一个称为“DisruptorCommandBus”的线程组中。
  • TransactionManager:定义了事务管理器,应该确保存储和事件发布以事务的方式执行。
  • InvokerInterceptors:定义了在调用处理中使用的CommandHandlerInterceptors。这个处理调用实际的命令处理器方法。
  • PublisherInterceptors:定义了在发布处理中使用的CommandHandlerInterceptors。这个发布处理存储和发布生成的事件。
  • RollbackConfiguration:定义工作单元应该回滚的异常。默认配置为回滚未经检查的异常。
  • RescheduleCommandsOnCorruptState:指示已经执行过命令但损坏的聚合(如:因为一个工作单元是回滚)是否应该重新计划。如果为假,回调的onFailure()方法将被调用。如果为的(默认),命令将被重新计划。
  • CoolingDownPeriod:设置等待的秒数,以确保所有命令被处理。在冷却期间,不接受新命令,但是现有的命令仍然处理,并在必要时重新计划。冷却期间确保线程可供重新安排命令和调用回调之用。默认为1000(1秒)。
    缓存:设置缓存存储从Event Store中恢复的聚合实例。缓存用disruptor存储不活跃的聚合实例。
  • InvokerThreadCount:给命令处理器的调用分配线程的数量。一个好的起始点是机器内核数量的一半。
  • PublisherThreadCount:用于发布事件的线程数量。一个好的起始点是一半的内核数量,如果IO上花费大量的时间,可以增加。
  • SerializerThreadCount:使用pre-serialize事件的线程数量。默认为1,但如果没有配置序列化器将被忽略。
  • Serializer:用于执行pre-serialization的序列化器。当配置序列化器时,DisruptorCommandBus将包装所有生成的事件在一个SerializationAware消息上。附加有效负载和元数据的序列化形式,在发布到事件存储之前。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,752评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,100评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,244评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,099评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,210评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,307评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,346评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,133评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,546评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,849评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,019评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,702评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,331评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,030评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,260评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,871评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,898评论 2 351

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,917评论 25 707
  • 我生产的经历不是太顺利,到了预产期一直没动静,医生说超过一周就得去住院催产了。 盼啊盼啊,好不容易见了红,凌晨打车...
    IceLi87阅读 821评论 0 1
  • 白浅太过思念夜华,白真便带着她到凡间去游历,这青丘狐狸洞里又只剩下凤九与迷谷二人。所以也只有迷谷知道凤九这几年是怎...
    一叶明月阅读 2,303评论 1 32
  • 前言综述 项目中人员或组织的组织结构对日后的项目影响深远,简而言之有三种类型:项目型、职能型、矩阵型权利: 项目型...
    jeffleefree阅读 764评论 0 1