打杂的我,又闲下来有时间继续看代码了。
人狠话不多,上代码。我们先看actor。
actor.rs 前面部分引用的东西
use crate::addr::ActorEvent;
use crate::runtime::spawn;
use crate::{Addr, Context};
use futures::channel::mpsc::UnboundedReceiver;
use futures::channel::oneshot;
use futures::lock::Mutex;
use futures::{FutureExt, StreamExt};
use std::sync::Arc;
use anyhow::Result;
这个时候看到第一个addr:ActorEvent.
type ExecFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
pub(crate) type ExecFn<A> =
Box<dyn FnOnce(Arc<Mutex<A>>, Arc<Context<A>>) -> ExecFuture + Send + 'static>;
pub(crate) enum ActorEvent<A> {
Exec(ExecFn<A>),
Stop(Option<Error>),
}
我们看到
ExecFuture 是定义了一个Type, 把Future 放到一个Box里面 ,再放到Pin中。
ExecFn 是定义了一个 执行函数的类型
ActorEvent 是枚举 Actor的事件有哪些。
接着还有use crate::{Addr, Context};
Actor模型定义来说,Addr对应的就是mailbox.
我们看下定义
pub struct Addr<A> {
pub(crate) actor_id: u64,
pub(crate) tx: mpsc::UnboundedSender<ActorEvent<A>>,
pub(crate) rx_exit: Option<Shared<oneshot::Receiver<()>>>,
}
核心的东西来了 ,他来了。
actor_id 是定义了id,是u64
tx 是发送通道, , 用的是std的 mpsc,发送的内容是Actor的事件。
rx_exit 是定义接收通道,用的是oneshot::Receiver.
接着我们就转到先看addr.rs
//这些代码主要是完成Addr的Clone,PartialEq ,Hash
impl<A> Clone for Addr<A> {
fn clone(&self) -> Self {
Self {
actor_id: self.actor_id,
tx: self.tx.clone(),
rx_exit: self.rx_exit.clone(),
}
}
}
//
impl<A> PartialEq for Addr<A> {
fn eq(&self, other: &Self) -> bool {
self.actor_id == other.actor_id
}
}
impl<A> Hash for Addr<A> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.actor_id.hash(state)
}
}
完成对Addr的主要接口实现
impl<A: Actor> Addr<A> {
/// Returns the id of the actor.
//返回actor的ID
pub fn actor_id(&self) -> u64 {
self.actor_id
}
/// Stop the actor. 停止actor.通过发送事件,进行停止
pub fn stop(&mut self, err: Option<Error>) -> Result<()> {
self.tx.start_send(ActorEvent::Stop(err))?;
Ok(())
}
/// Send a message `msg` to the actor and wait for the return value.
/// 发送一个消息给actor 并且等待返回值
pub async fn call<T: Message>(&mut self, msg: T) -> Result<T::Result>
where
A: Handler<T>,
{
let (tx, rx) = oneshot::channel();
self.tx
.start_send(ActorEvent::Exec(Box::new(move |actor, ctx| {
Box::pin(async move {
let mut actor = actor.lock().await;
let res = Handler::handle(&mut *actor, &ctx, msg).await;
let _ = tx.send(res);
})
})))?;
Ok(rx.await?)
}
/// Send a message `msg` to the actor without waiting for the return value.
// 发送一个消息给actor 不等待返回值
pub fn send<T: Message<Result = ()>>(&mut self, msg: T) -> Result<()>
where
A: Handler<T>,
{
self.tx
.start_send(ActorEvent::Exec(Box::new(move |actor, ctx| {
Box::pin(async move {
let mut actor = actor.lock().await;
Handler::handle(&mut *actor, &ctx, msg).await;
})
})))?;
Ok(())
}
/// Create a `Caller<T>` for a specific message type
/// 为特定的消息类型创建一个“Caller”
pub fn caller<T: Message>(&self) -> Caller<T>
where
A: Handler<T>,
{
let addr = self.clone();
Caller(Box::new(move |msg| {
let mut addr = addr.clone();
Box::pin(async move { addr.call(msg).await })
}))
}
/// Create a `Sender<T>` for a specific message type
/// 为特定的消息类型创建一个“Sender”
pub fn sender<T: Message<Result = ()>>(&self) -> Sender<T>
where
A: Handler<T>,
{
let addr = self.clone();
Sender(Box::new(move |msg| {
let mut addr = addr.clone();
addr.send(msg)
}))
}
/// Wait for an actor to finish, and if the actor has finished, the function returns immediately.
///等待actor 完成,如果actor 已经完成,函数立即返回。
pub async fn wait_for_stop(self) {
if let Some(rx_exit) = self.rx_exit {
rx_exit.await.ok();
} else {
futures::future::pending::<()>().await;
}
}
}
首先我们先看第一个
pub async fn call<T: Message>(&mut self, msg: T) -> Result<T::Result>
where
A: Handler<T>,
代码中我们又看到使用了Message,Handler的类型。
继续翻找;
/// Represents a message that can be handled by the actor.
///表示可以由 actor 处理的消息。
pub trait Message: 'static + Send {
/// The return value type of the message
/// This type can be set to () if the message does not return a value, or if it is a notification message
type Result: 'static + Send;
}
/// Describes how to handle messages of a specific type.描述如何处理特定类型的消息。
/// Implementing Handler is a general way to handle incoming messages.实现Handler是处理传入消息的一种常用方法。
/// The type T is a message which can be handled by the actor.类型T是可以由actor处理的消息。
#[async_trait::async_trait]
pub trait Handler<T: Message>: Actor {
/// Method is called for every message received by this Actor.
async fn handle(&mut self, ctx: &Context<Self>, msg: T) -> T::Result;
}
然后我们继续看下一个。
pub fn caller<T: Message>(&self) -> Caller<T>
where
A: Handler<T>,
有一个新的类型 Caller
use crate::{Message, Result};
use std::future::Future;
use std::pin::Pin;
pub(crate) type CallerFn<T> = Box<
dyn Fn(T) -> Pin<Box<dyn Future<Output = Result<<T as Message>::Result>> + Send + 'static>>
+ 'static,
>;
pub(crate) type SenderFn<T> = Box<dyn Fn(T) -> Result<()> + 'static + Send>;
/// Caller of a specific message type
pub struct Caller<T: Message>(pub(crate) CallerFn<T>);
impl<T: Message> Caller<T> {
pub async fn call(&mut self, msg: T) -> Result<T::Result> {
self.0(msg).await
}
}
/// Sender of a specific message type
pub struct Sender<T: Message>(pub(crate) SenderFn<T>);
impl<T: Message<Result = ()>> Sender<T> {
pub fn send(&mut self, msg: T) -> Result<()> {
self.0(msg)
}
}
其中我们三个文件都看到了一个引用
use crate:: Result;
我们在lib.rs 中看到定义
/// Alias of anyhow::Result
pub type Result<T> = anyhow::Result<T>;
/// Alias of anyhow::Error
pub type Error = anyhow::Error;
看到actor的Result 和Error都是使用 anyhow。
从代码上我们知道,actor 的 addr 具有的属性和函数。