003 - event_watcher.h 和 event_watcher.cc (续 PipeEventWatcher)

通过002的解释,可以看到EventWatcher是一个含有虚方法的类, 有三个派生的类继承了EventWatcher, 分别是PipeEventWatcher,TimerEventWatcher,SignalEventWatcher,下面分别介绍这三种事件类型。

  • PipeEventWatcher 管道类型的事件,一端写入,另一端读出。
class EVPP_EXPORT PipeEventWatcher : public EventWatcher {
public:
    PipeEventWatcher(EventLoop* loop, const Handler& handler);
    PipeEventWatcher(EventLoop* loop, Handler&& handler);
    ~PipeEventWatcher();

    bool AsyncWait();
    void Notify();
    evpp_socket_t wfd() const { return pipe_[0]; }
private:
    virtual bool DoInit();
    virtual void DoClose();
    static void HandlerFn(evpp_socket_t fd, short which, void* v);

    evpp_socket_t pipe_[2]; // Write to pipe_[0] , Read from pipe_[1]
};

首先看构造函数的部分, EventLoopHandler 作为构造函数的参数, 来初初始化父类内部的protected变量。

PipeEventWatcher::PipeEventWatcher(EventLoop* loop,
                                   const Handler& handler)
    : EventWatcher(loop->event_base(), handler) {
    memset(pipe_, 0, sizeof(pipe_[0]) * 2);
}

PipeEventWatcher::PipeEventWatcher(EventLoop* loop,
                                   Handler&& h)
    : EventWatcher(loop->event_base(), std::move(h)) {
    memset(pipe_, 0, sizeof(pipe_[0]) * 2);
}

在构造函数中, memsetpipe_成员变量进行内存重置。

下面主要看重写的两个函数 DoInit()DoClose()

bool PipeEventWatcher::DoInit() {
    assert(pipe_[0] == 0);

    if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, pipe_) < 0) {
        int err = errno;
        LOG_ERROR << "create socketpair ERROR errno=" << err << " " << strerror(err);
        goto failed;
    }

    if (evutil_make_socket_nonblocking(pipe_[0]) < 0 ||
        evutil_make_socket_nonblocking(pipe_[1]) < 0) {
        goto failed;
    }

    ::event_set(event_, pipe_[1], EV_READ | EV_PERSIST,
                &PipeEventWatcher::HandlerFn, this);
    return true;
failed:
    Close();
    return false;
}

DoInit() 中调用了libevent内部函数 evutil_socketpair 初始化成员变量pipe_. 让pipe_[0]变成写端, pipe_[1]变成读端。 在调用函数 evutil_make_socket_nonblocking将两个socket设置成非租塞模式。用 event_set 设置事件类型和回调函数, 当 pipe_[1] 有数据可以读取的时候就调用 PipeEventWatcher::HandlerFn去执行,并且EV_PERSIST标志可以保证可以重复执行。


void PipeEventWatcher::DoClose() {
    if (pipe_[0] > 0) {
        EVUTIL_CLOSESOCKET(pipe_[0]);
        EVUTIL_CLOSESOCKET(pipe_[1]);
        memset(pipe_, 0, sizeof(pipe_[0]) * 2);
    }
}

当成员变量pipe_[0] 被初始化的时候,调用EVUTIL_CLOSESOCKET 函数关闭socket。在读端合写端都关闭之后,用menset 将成员变量重置。

最重要的是回调函数


void PipeEventWatcher::HandlerFn(evpp_socket_t fd, short /*which*/, void* v) {
    LOG_INFO << "PipeEventWatcher::HandlerFn fd=" << fd << " v=" << v;
    PipeEventWatcher* e = (PipeEventWatcher*)v;
#ifdef H_BENCHMARK_TESTING
    // Every time we only read 1 byte for testing the IO event performance.
    // We use it in the benchmark test program 
    //  1. evpp/benchmark/ioevent/evpp/
    //  1. evpp/benchmark/ioevent/fd_channel_vs_pipe_event_watcher/
    char buf[1];
#else
    char buf[128];
#endif
    int n = 0;

    if ((n = ::recv(e->pipe_[1], buf, sizeof(buf), 0)) > 0) {
        e->handler_();
    }
}

在回调函数中,首先将参数v进行了指针的强制类型转换成PipeEventWatcher指针e, 然后全局recv函数去读取 pipe_[1]的内容, 当读取到有内容的时候,就去执行e中的handler_ 函数。

bool PipeEventWatcher::AsyncWait() {
    return Watch(Duration());
}

void PipeEventWatcher::Notify() {
    char buf[1] = {};

    if (::send(pipe_[0], buf, sizeof(buf), 0) < 0) {
        return;
    }
}

AsyncWait函数内部调用了父类Watch函数,将此类事件安装到evenet_base上。Notify函数触发此类事件类型。

evpp_socket_t wfd() const { return pipe_[0]; }

客户程序可以调用此方法获取写入端。

总结:
这个类主要是实现管道类型的事件处理, 当一端被写入,另一端会自动读取管道的内容,并且异步调用之前已经预设好的函数e->handler_();

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容