最近因为项目中有大量的消息分发需求,突然心血来潮决定挑战一下传说中的并发终极武器AKKA。
因为这个项目是spring boot项目,所以加入Scala一点问题没有。
这里去除了项目中的业务代码,只保留最基本的系统结构。
关于如何搭建spring + Scala的混编环境,不论采用Maven或Gradle作为构建工具网上都有大量的文章,也可以参考我的另一篇博客,java与scala混合编程打包(maven构建)。一直没有机会接触纯Scala项目,所以SBT嘛......不会!
AKKA中,Actor对象的管理是由ActorSystem进行管理。可以简单的把它理解为一个容器,提供Actor对象的管理。那么第一步就是在合适的场景中初始化该容器。该容器的作用其实就是创建第一个Actor对象。
import akka.actor.ActorSystem
import akka.actor.Props
import akka.routing.RandomPool
import org.springframework.context.annotation.Bean
import org.springframework.stereotype.Component
@Component
class AkkaConfig {
val system = ActorSystem("ReactiveEnterprise")
val processManagersRef = system.actorOf(Props[ProcessManagers].withDispatcher("my-thread-pool-dispatcher").withRouter(RandomPool(2)),"processManagers")
@Bean
def processManagers = {
processManagersRef
}
}
自定义一个类,将ActorSystem容易作为该类的属性,并对外暴露,同时创建ActorRef对象,并作为一个Spring bean托管于Spring IOC容器。
现在ActorRef对象就是一个Spring IOC容中的对象,注入到依赖方即可。
import org.springframework.stereotype.Service
import org.springframework.beans.factory.annotation.Autowired
import akka.actor.ActorRef
import scala.concurrent.Await
import akka.pattern.ask
import akka.util.Timeout
import java.util.concurrent.TimeUnit
import scala.concurrent.Future
import org.slf4j.LoggerFactory
import com.sam.demo.akka.scala.Message.TextMessage
@Service
class UserService {
@Autowired var processManagers: ActorRef = _
implicit val timeout = Timeout(60, TimeUnit.SECONDS)
val logger = LoggerFactory.getLogger(getClass)
def send(x: String) = {
logger.info("{}", x);
val result = Await.result(ask(processManagers, x).mapTo[TextMessage], timeout.duration)
val resultMap = scala.collection.immutable.Map("value" -> result.msg)
resultMap
}
}
现在processManagers已经被注入到UserService,注意:UserService并不是ActorSystem容器中的对象。
真实向Actor发消息的地址是val result = Await.result(ask(processManagers, x).mapTo[TextMessage], timeout.duration)。简单的发了一个String类型的参数。
如果不关心Actor对象的返回值,则可以更简单的通过processManagers ! x发消息即可。
在Actor ProcessManagers中,当然是通过receive函数接收消息
import akka.actor.Actor
import akka.actor.Props
import akka.routing.RandomPool
import org.slf4j.LoggerFactory
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.Future
class ProcessManagers extends Actor {
val logger = LoggerFactory.getLogger(getClass)
val worker = context.actorOf(Props[Worker].withDispatcher("my-thread-pool-dispatcher").withRouter(RandomPool(4)), "worker")
implicit val timeout = Timeout(60, TimeUnit.SECONDS)
def receive = {
case x: String =>
logger.info("{}", x);
sender ! Await.result(ask(worker, x), timeout.duration)
case x: Int =>
logger.info("{}", x)
}
}
接收到消息后,将该消息再发送给另一个Actor对象,worker。Worker由ProcessManagers在初始化时通过context创建。context是从父类Actor继承而来的ActorContext(不同于ActorSystem)。Worker是ProcessManagers的下级Actor,受ProcessManagers的监控。
import akka.actor.Actor
import org.slf4j.LoggerFactory
import com.sam.demo.BeanFactory
import com.sam.demo.web.controller.DemainService
class Worker extends Actor {
val logger = LoggerFactory.getLogger(getClass)
lazy val demainService = BeanFactory.buildFactory().getBean(classOf[DemainService])
def receive = {
case x: String => {
logger.info("{}", x);
sender ! new Message.TextMessage(demainService.handler)
}
}
}
重点来了,在Worker接收到消息后,需要对消息进行业务处理,这时候往往需要调用传统的Spring IOC容器中的服务对象。但Worker并不受Spring IOC容器的管理,所以自然没法通过Spring IOC进行注入。既然不能通过Spring注入,那么换条路,从Spring BeanFactory中getBean即可。所以lazy val demainService = BeanFactory.buildFactory().getBean(classOf[DemainService])就顺理成章了。
我这里的BeanFactory是自定义的一个Java类。
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class BeanFactory implements ApplicationContextAware {
private ApplicationContext applicationContext;
private static BeanFactory factory;
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
factory = this;
}
public <T> T getBean(Class<T> clazz) {
return (T) applicationContext.getBean(clazz);
}
public Object getBean(String name) {
return applicationContext.getBean(name);
}
public static BeanFactory buildFactory(){
return factory;
}
}
实现ApplicationContextAware,同时提供通用的getBean方法,这里的getBean方法只是简单的从Spring ApplicationContext中获取对象并返回。
object Message {
case class TextMessage(msg: String)
}
回过头看Worker,通过demainService.handler模拟业务方法的调用,并封装为另一个消息对象Message(简单的一个case class)。并通过sender方法将Message消息传递给发送者ProcessManagers
而ProcessManagers在接收到Message消息后再发送给自己的sender(UserService)。
UserService中通过mapTo[TextMessage]将消息转型并得到Message中的msg值,并指定超时时间(timeout)。 至此完成整个调用过程。
绕这么大一圈,好处是什么呢?异步。
UserService-->ProcessManagers-->Worker,
Worker-->ProcessManagers-->UserService。整个调用链和返回链都是不同的线程执行。
通过输出日志可以看到UserService,ProcessManagers,Worker分别由不同的线程执行。避免了手工管理线程池。简化了线程间调用模型。当然,线程安全的问题同样存在,依然需要小心翼翼。
00:51:57 [http-nio-8080-exec-1] INFO com.sam.demo.akka.scala.UserService.send - hello
00:51:57 [ReactiveEnterprise-my-thread-pool-dispatcher-19] INFO com.sam.demo.akka.scala.ProcessManagers$$anonfun$receive$1.applyOrElse - hello
00:51:57 [ReactiveEnterprise-my-thread-pool-dispatcher-20] INFO com.sam.demo.akka.scala.Worker$$anonfun$receive$1.applyOrElse - hello
最后加上application.conf
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
stdout-loglevel = "DEBUG"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
actor {
# 可选
# akka.cluster.ClusterActorRefProvider
# akka.remote.RemoteActorRefProvider
# akka.actor.LocalActorRefProvider
provider = "akka.actor.LocalActorRefProvider"
default-dispatcher {
throughput = 2
}
}
}
my-thread-pool-dispatcher {
# Dispatcher是基于事件的派发器的名称
type = Dispatcher
# 使用何种 ExecutionService
executor = "thread-pool-executor"
# 配置线程池
thread-pool-executor {
# 容纳基于因子的内核数的线程数下限
core-pool-size-min = 1
# 内核线程数 .. ceil(可用CPU数*倍数)
core-pool-size-factor = 2.0
# 容纳基于倍数的并行数量的线程数上限
core-pool-size-max = 200
}
# Throughput 定义了线程切换到下一个actor之前处理的消息数上限
# 设置成1表示尽可能公平.
throughput = 1
}
这里用到的都是LocalActorRefProvider,Akka更强大的地方是RemoteActorRefProvider和ClusterActorRefProvider。后续有机会再补充。
Akka本身是用Scala开发的,也提供Java客户端,不过个人建议还是使用Scala进行Akka开发,至少语法上要简洁很多。
补充:akka的maven依赖
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.4.17</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.11</artifactId>
<version>2.4.17</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster_2.11</artifactId>
<version>2.4.17</version>
</dependency>