[xactor]学习笔记--2--addr

打杂的我,又闲下来有时间继续看代码了。
人狠话不多,上代码。我们先看actor。
actor.rs 前面部分引用的东西

use crate::addr::ActorEvent;
use crate::runtime::spawn;
use crate::{Addr, Context};
use futures::channel::mpsc::UnboundedReceiver;
use futures::channel::oneshot;
use futures::lock::Mutex;
use futures::{FutureExt, StreamExt};
use std::sync::Arc;
use anyhow::Result;

这个时候看到第一个addr:ActorEvent.

type ExecFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

pub(crate) type ExecFn<A> =
    Box<dyn FnOnce(Arc<Mutex<A>>, Arc<Context<A>>) -> ExecFuture + Send + 'static>;

pub(crate) enum ActorEvent<A> {
    Exec(ExecFn<A>),
    Stop(Option<Error>),
}

我们看到
ExecFuture 是定义了一个Type, 把Future 放到一个Box里面 ,再放到Pin中。
ExecFn 是定义了一个 执行函数的类型
ActorEvent 是枚举 Actor的事件有哪些。

接着还有use crate::{Addr, Context};
Actor模型定义来说,Addr对应的就是mailbox.
我们看下定义

pub struct Addr<A> {
    pub(crate) actor_id: u64,
    pub(crate) tx: mpsc::UnboundedSender<ActorEvent<A>>,
    pub(crate) rx_exit: Option<Shared<oneshot::Receiver<()>>>,
}

核心的东西来了 ,他来了。
actor_id 是定义了id,是u64
tx 是发送通道, , 用的是std的 mpsc,发送的内容是Actor的事件。
rx_exit 是定义接收通道,用的是oneshot::Receiver.

接着我们就转到先看addr.rs

//这些代码主要是完成Addr的Clone,PartialEq ,Hash 

impl<A> Clone for Addr<A> {
    fn clone(&self) -> Self {
        Self {
            actor_id: self.actor_id,
            tx: self.tx.clone(),
            rx_exit: self.rx_exit.clone(),
        }
    }
}

// 
impl<A> PartialEq for Addr<A> {
    fn eq(&self, other: &Self) -> bool {
        self.actor_id == other.actor_id
    }
}

impl<A> Hash for Addr<A> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.actor_id.hash(state)
    }
}

完成对Addr的主要接口实现
impl<A: Actor> Addr<A> {
    /// Returns the id of the actor.
  //返回actor的ID
    pub fn actor_id(&self) -> u64 {
        self.actor_id
    }

    /// Stop the actor. 停止actor.通过发送事件,进行停止
    pub fn stop(&mut self, err: Option<Error>) -> Result<()> {
        self.tx.start_send(ActorEvent::Stop(err))?;
        Ok(())
    }

    /// Send a message `msg` to the actor and wait for the return value.
  ///  发送一个消息给actor 并且等待返回值
    pub async fn call<T: Message>(&mut self, msg: T) -> Result<T::Result>
    where
        A: Handler<T>,
    {
        let (tx, rx) = oneshot::channel();
        self.tx
            .start_send(ActorEvent::Exec(Box::new(move |actor, ctx| {
                Box::pin(async move {
                    let mut actor = actor.lock().await;
                    let res = Handler::handle(&mut *actor, &ctx, msg).await;
                    let _ = tx.send(res);
                })
            })))?;

        Ok(rx.await?)
    }

    /// Send a message `msg` to the actor without waiting for the return value.
    // 发送一个消息给actor 不等待返回值
    pub fn send<T: Message<Result = ()>>(&mut self, msg: T) -> Result<()>
    where
        A: Handler<T>,
    {
        self.tx
            .start_send(ActorEvent::Exec(Box::new(move |actor, ctx| {
                Box::pin(async move {
                    let mut actor = actor.lock().await;
                    Handler::handle(&mut *actor, &ctx, msg).await;
                })
            })))?;
        Ok(())
    }

    /// Create a `Caller<T>` for a specific message type
   /// 为特定的消息类型创建一个“Caller”
    pub fn caller<T: Message>(&self) -> Caller<T>
    where
        A: Handler<T>,
    {
        let addr = self.clone();
        Caller(Box::new(move |msg| {
            let mut addr = addr.clone();
            Box::pin(async move { addr.call(msg).await })
        }))
    }

    /// Create a `Sender<T>` for a specific message type
  /// 为特定的消息类型创建一个“Sender”
    pub fn sender<T: Message<Result = ()>>(&self) -> Sender<T>
    where
        A: Handler<T>,
    {
        let addr = self.clone();
        Sender(Box::new(move |msg| {
            let mut addr = addr.clone();
            addr.send(msg)
        }))
    }

    /// Wait for an actor to finish, and if the actor has finished, the function returns immediately.
  ///等待actor 完成,如果actor 已经完成,函数立即返回。
    pub async fn wait_for_stop(self) {
        if let Some(rx_exit) = self.rx_exit {
            rx_exit.await.ok();
        } else {
            futures::future::pending::<()>().await;
        }
    }
}

首先我们先看第一个

pub async fn call<T: Message>(&mut self, msg: T) -> Result<T::Result>
    where
        A: Handler<T>,

代码中我们又看到使用了Message,Handler的类型。
继续翻找;

/// Represents a message that can be handled by the actor.
///表示可以由 actor 处理的消息。
pub trait Message: 'static + Send {
    /// The return value type of the message
    /// This type can be set to () if the message does not return a value, or if it is a notification message
    type Result: 'static + Send;
}

/// Describes how to handle messages of a specific type.描述如何处理特定类型的消息。
/// Implementing Handler is a general way to handle incoming messages.实现Handler是处理传入消息的一种常用方法。
/// The type T is a message which can be handled by the actor.类型T是可以由actor处理的消息。
#[async_trait::async_trait]
pub trait Handler<T: Message>: Actor {
    /// Method is called for every message received by this Actor.
    async fn handle(&mut self, ctx: &Context<Self>, msg: T) -> T::Result;
}

然后我们继续看下一个。

pub fn caller<T: Message>(&self) -> Caller<T>
    where
        A: Handler<T>,

有一个新的类型 Caller

use crate::{Message, Result};
use std::future::Future;
use std::pin::Pin;

pub(crate) type CallerFn<T> = Box<
    dyn Fn(T) -> Pin<Box<dyn Future<Output = Result<<T as Message>::Result>> + Send + 'static>>
        + 'static,
>;

pub(crate) type SenderFn<T> = Box<dyn Fn(T) -> Result<()> + 'static + Send>;

/// Caller of a specific message type
pub struct Caller<T: Message>(pub(crate) CallerFn<T>);

impl<T: Message> Caller<T> {
    pub async fn call(&mut self, msg: T) -> Result<T::Result> {
        self.0(msg).await
    }
}

/// Sender of a specific message type
pub struct Sender<T: Message>(pub(crate) SenderFn<T>);

impl<T: Message<Result = ()>> Sender<T> {
    pub fn send(&mut self, msg: T) -> Result<()> {
        self.0(msg)
    }
}

其中我们三个文件都看到了一个引用

use crate:: Result;

我们在lib.rs 中看到定义

/// Alias of anyhow::Result
pub type Result<T> = anyhow::Result<T>;

/// Alias of anyhow::Error
pub type Error = anyhow::Error;

看到actor的Result 和Error都是使用 anyhow。

从代码上我们知道,actor 的 addr 具有的属性和函数。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351