在<a href="http://www.jianshu.com/p/f786b6ff91c7">之前的文章</a>中介绍了spring boot简化集成Akka,最近通过在项目中的实践,又有的新的想法。
首先定义Akka配置类
import akka.actor.ActorSystem
import akka.actor.Props
import org.springframework.context.annotation.Bean
import org.springframework.stereotype.Component
import com.sam.demo.akka.scala.facade.ProcessManagers
@Component
class AkkaConfig {
val system = ActorSystem("ReactiveEnterprise")
val processManagersRef = system.actorOf(Props[ProcessManagers].withDispatcher("my-thread-pool-dispatcher"), "processManagers")
@Bean
def processManagers = {
processManagersRef
}
@Bean
def worker = {
system.actorSelection("/user/processManagers/worker")
}
}
在配置类中,首先定义了ActorRef对象processManagersRef,作用是作为其他业务相关Actor的监控者,避免在akka user节点下创建过多的Actor。 注意worker,是通过路径查询获取。worker的创建放在processManagers中。当AkkaConfig实例化时,processManagersRef随之实例化。
ProcessManagers如下:
import org.slf4j.LoggerFactory
import akka.actor.Actor
import akka.actor.Props
import com.sam.demo.akka.scala.Worker
class ProcessManagers extends Actor {
val logger = LoggerFactory.getLogger(getClass)
val worker = context.actorOf(Props[Worker].withDispatcher("my-thread-pool-dispatcher"), "worker")
def receive = {
case x: Any =>
}
}
该类无任务业务方法,其作用仅仅是创建了另一个Actor(worker),将来可扩展ProcessManagers作为其他业务Actor的监控者。
在该类实例化过程中,同时也创建了worker的ActorRef
Worker定义如下:
import akka.actor.Actor
import org.slf4j.LoggerFactory
import com.sam.demo.BeanFactory
import com.sam.demo.service.DomainService
class Worker extends Actor {
val logger = LoggerFactory.getLogger(getClass)
lazy val domainService = BeanFactory.buildFactory().getBean(classOf[DomainService])
def receive = {
case x: String => {
logger.info("x:{}", x)
val textMsg = new TextMessage(domainService.toUpper(x))
sender ! textMsg
}
}
}
通过BeanFactory获得spring IOC容器中的业务Bean,从而实现业务方法的调用。
关键点来了,如何向Actor发消息? 给哪个Actor发消息? 之前的做法是发消息给ProcessManagers,ProcessManagers透传消息给Worker,之所以这样做,是因为无法将Worker的ActorRef引用注入到Spring IOC容器中的Bean中。
现在,在AkkaConfig中已经将Worker的引用(注意: 通过system.actorSelection的返回类型是ActorSelection,不是ActorRef)暴露为一个Spring Bean,是需要将Worker的引用注入给调用方即可。
调用方如下:
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.sam.demo.akka.scala.TextMessage;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import akka.util.Timeout;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
@RestController
@RequestMapping(value={"/users"})
public class UserController {
@Autowired
private ActorSelection worker;
@SuppressWarnings({"rawtypes","unchecked"})
@RequestMapping(value={"/find"})
public String find(String id) throws Exception {
String uuid = UUID.randomUUID().toString();
Future future = Patterns.ask(worker, uuid, Timeout.apply(10L, TimeUnit.SECONDS));
TextMessage o = (TextMessage)Await.result(future, Duration.create(10, TimeUnit.SECONDS));
return o.msg();
}
}
现在实现了调用方直接将消息发给业务Actor,由原来2次消息发送变成了1次。
代码其他部分保持不变,其他细节参考<a href="http://www.jianshu.com/p/f786b6ff91c7">Spring非常规集成Akka</a>