使用akka作异步任务处理

同步转异步是一种常见的优化手段,最近一次在做调优时便大量使用了这种方式。通常在一个业务场景中会包含多个操作,有些操作的结果需要让用户立马知道,但有些操作则不需要。这些用户不需要等待结果的操作,我们在编程的时候便可以异步处理。这么做最直接的效果就是缩短接口响应速度,提升用户体验。

我此次优化的是下单场景。创建订单时同步操作有: 查询库存,扣款,刷新库存; 可异步的操作有: 通知风控系统,给买家发送扣款邮件和短信,通知卖家,创建一些定时任务。

最初我用的方案是Spring提供的@Async机制。这是一种很轻量的做法,只需要在可异步调用的方法上加上@Async注解即可。但是这种做法也存在两个问题: 1. 不支持类内部方法之间的调用。使用这种方式,我必须要把一些需要异步调用的方法转移到一个新类里,这点让人不爽。2. 当系统crash的时候,缓存的任务就丢了。因此,这个方案并不特别理想。

两年之前用akka做过一个社交应用的后端服务,而且消息模型天生异步,所以自然想到了用akka。但是用akka的话也有一些地方需要注意。第一,Actor是单线程顺序执行,如果任务比较多最好使用actor router。actor router管理多个actor,可以做到一定限度的并行执行。第二,使用有持久化actor,确保任务不会丢失。我会以发push提醒为例描述一下这个方案的实现细节。多数场景中发push提醒都可进行异步调用。

classes.png

下单逻辑都放在OrderService中,下单成功给卖家发送push提醒时,Orderservice会给NotificationActor发送一个消息。

NotificationActor有两个职责:1. 保存接收到的任务;2. 把消息转发给NotificationWorker,当Worker执行成功之后把消息删除。在最新版本的akka中可以使用At-Least-Once Delivery实现这两个功能。

NotificationWorkerRouter仅仅处理发送逻辑。WorkerActor以Router方式进行部署,以实现并行处理,提高处理效率。

下边看一下具体实现细节:

public class NotificationActor extends UntypedPersistentActorWithAtLeastOnceDelivery {
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    private ActorRef notificationWorkers = null;
    private final String uniqueId = UUID.randomUUID().toString();

    @Autowired
    public NotificationActor(final ActorSystemManager actorSystemManager) {
        this.notificationWorkers = actorSystemManager.notificationWorkers;
    }

    @Override public String persistenceId() {
        return "journal:notification-actor:" + uniqueId;
    }

    @Override public void onReceiveRecover(final Object msg) throws Throwable {
        if (msg instanceof NotificationMessage) {
            deliverAckMessage((NotificationMessage) msg);
        }
    }

    @Override public void onReceiveCommand(final Object msg) throws Throwable {
        if (msg instanceof NotificationMessage) {
            persist(msg, m -> { deliverAckMessage((NotificationMessage) m); });
        } else if (msg instanceof Confirm) {
            Confirm confirm = (Confirm) msg;
            confirmMessage(new MsgConfirmed(confirm.deliveryId));
        } else if (msg instanceof UnconfirmedWarning) {
            UnconfirmedWarning warning = (UnconfirmedWarning) msg;
            warning.getUnconfirmedDeliveries().forEach(d -> {
                log.error("[NOTIFICATION-ACTOR] Unconfirmed Messages: {}", d.message());

                confirmMessage(new MsgConfirmed(d.deliveryId()));
            });
        } else {
            unhandled(msg);
        }
    }

    private void deliverAckMessage(NotificationMessage event) {
        deliver(notificationWorkers.path(), (Function<Long, Object>) deliveryId -> new AckMessage(deliveryId, event));
    }

    private void confirmMessage(final MsgConfirmed evt) {
        confirmDelivery(evt.deliveryId);
        deleteMessages(evt.deliveryId);
    }

    public interface NotificationMessage extends Event {}

    public static final @Data class PushMessage implements NotificationMessage {
        private final Long source;
        private final Long target;
        private final String trigger;
        private final ImmutableMap<String, Serializable> payload;
    }
}

public class NotificationWorkerActor extends UntypedActor {
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    private final @NonNull NotificationService notificationService;

    @Autowired
    public NotificationWorkerActor(final NotificationService notificationService) {
        this.notificationService = notificationService;
    }

    @Override public void onReceive(final Object event) throws Throwable {
        if (event instanceof AckMessage) {
            final AckMessage ackMessage = (AckMessage) event;
            NotificationMessage msg = (NotificationMessage) ackMessage.msg;
            log.info("[NOTIFICATION] receive message: {}", msg);

            if (msg instanceof PushMessage) {
                final PushMessage m = (PushMessage) msg;
                log.info("[NOTIFICATION] send push notification from: {} to: {}", m.getSource(), m.getTarget());
                notificationService.notify(m.getSource(), m.getTarget(), m.getTrigger(), m.getPayload());
            }
            sender().tell(new Confirm(ackMessage.deliveryId), self());
        } else {
            unhandled(event);
        }
    }
}

public class OrderService {
    public void createOrder() {
        actorSystemManager.notificationActor.tell(
          new PushMessage(), ActorRef.noSender()
        );
    }
}

最早实施这个方案的时候遇到一个问题,说一下这个问题如何产生的。我们一共有三台服务器,三台服务器都会部署同样的代码,以NotificationActor为例,它会分别部署在三个机器上。actor journal我们使用mysql存储。akka persistent actor内部有一个sequence number用来对接收到的消息进行计数,这个数字是递增的。同时这个数字也会在journal中记录。最初我的persistenceId方法是这样实现的:

@Override public String persistenceId() {
    return "journal:notification-actor";
}

那么,假如server1上的NotificationActor接收了一个消息,那么它的sequence number会变成1,mysql中将会存储的sequence number为1的消息。这时server2上也接收到了一个消息,因为它的最初sequence number是0,所以它也会把现在接收到的消息的sequence number设置为1。但是显然这条消息是不能持久化的,因为它和数据库记录的sequence number冲突了。根本原因是三台服务器上的NotificationActor的persistenceId是一样的。

上边代码中给出了一种方案,把persistenceId变成random的,每次actor启动的时候都会得到不同的persistenceId,这样就解决了上述问题。还有一种方案是引入akka cluster,使用akka singleton。这种方案会在下一篇文章中详细说明。


write on 2017-1-7

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容

  • Actor系统的实体 在Actor系统中,actor之间具有树形的监管结构,并且actor可以跨多个网络节点进行透...
    JasonDing阅读 3,338评论 2 6
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,646评论 18 139
  • 国家电网公司企业标准(Q/GDW)- 面向对象的用电信息数据交换协议 - 报批稿:20170802 前言: 排版 ...
    庭说阅读 10,940评论 6 13
  • 首先说一下Actor Model,作为一种进程或者线程间的通信模型,一般来说有两种选择,一种是CSP,比如Go语言...
    墨弈阅读 3,370评论 0 51
  • 1 基本流处理 让我们首先看看使用akka-stream处理流的真正含义。图1展示了在某个处理节点上,元素是一个个...
    乐言笔记阅读 2,649评论 1 1