Reactor事件处理框架

Reactor模式是事件驱动模型的一个实现。它是一个同步的按照事件到达顺序处理事件的分发器。简单的说就是有一个线程不断轮询事件源,然后将其分发给对应的处理器

餐厅点菜的Reactor模式图
Reactor

EvenLoop循环机制

one loop per thread每个线程只有一个EvenLoop对象,EvenLoop的构造函数会检查当前线程是否已经创建了其他EvenLoop对象。把TimerQueue的成员函数调用移到其IO线程,这样可以不用锁的情况保证线程安全性。

创建了EvenLoop对象的线程是IO线程,其主要功能是运行事件循环EvenLoop::loop()。EvenLoop::loop函数的真正工作内容是,调用Poller::poll获得当前活动事件的Channel列表,然后依次调用每个Channel的handleEvent函数

EvenLoop::loop可以终止事件循环,只要将quit设为true即可。`但quit不是立刻发生的,它会在下一次检查while(!quit)的时候起效。`如果在非当前IO线程调用quit,延迟可以长达数秒,wakeup函数唤醒当前IO线程。如果EvenLoop::loop正阻塞在某个调用中,quit不会立刻生效。
Reactor

EvenLoop可以在线程间调配任务,在它的IO线程内执行某个用户任务回调,即EventLoop::runInLoop(const Functor& cb),如果用户在当前IO线程调用这个函数,回调会同步进行;如果用户在其他IO线程调用,cb会被加入队列,IO线程会被唤醒来调用这个Functor。

由于IO线程平时阻塞在事件循环loop的poll调用中,为了让IO线程能立刻执行用户回调,需要设法唤醒。传统的做法,使用pipe,IO线程始终监视此管道的readable事件。eventfd是一个比pipe更高效的线程间事件通知机制,一方面它比pipe少用一个file descripor,节省了资源;另一方面,eventfd的缓冲区管理也简单得多,全部“buffer” 只有定长8 bytes,不像pipe那样可能有不定长的真正buffer。

事件分发机制

Reactor最核心的是事件分发机制,将IO multiplexing拿到的IO事件分给各个文件描述符的处理函数。

Channel class

每个Channel对象只属于某一个IO线程,只负责一个文件描述符的IO事件分发,但并不拥有这个fd。Channel把不同的IO事件分发为不同的回调,例如ReadCallback,WriteCallback等。

muduo用户一般不直接使用Channel,而会使用更上层的封装,Channel的生命期由其owner class负责管理。用户一般只要setCallback和enable函数。

Channel::handleEvent()是Channel的核心,它由EventLoop::loop()调用,它的功能是根据revents_的值分别调用不同的用户回调

enable*函数中调用update()。Channel::update()会调用EventLoop::updateChannel(),转而调用Poller::updateChannel()。
Reactor

Poller class

poll函数

#include <sys/poll.h> 
int poll(struct pollfd *ufds, unsigned int nfds, int timeout); 

ufds 指向struct pollfd数组 
nfds 指定pollfd 数组元素的个数,也就是要监测几个pollfd

struct pollfd 
{ 
    int fd; 
    short int events; /* fd上,我们感兴趣的事件*/ 
    short int revents; /* Types of events that actually occurred. */ 
};

一个EventLoop对象对应一个Poller成员对象,Poller在muduo中是一个抽象基类,因为muduo同时要支持poll和epoll两种IO multiplexing机制。Poller是EvenLoop的间接成员,只供其owner EventLoop在IO线程调用,无须加锁。其生命期与EventLoop相等。

Poller供EventLoop调用的函数就只有两个,poll和updateChannel。

poll通过fillActiveChannels遍历pollfds_,找出有活动事件的fd,把对应的Channel填入activeChannel。然后在逐一处理handleEvent。值得注意的是,我们不能一边遍历pollfds,一边调用Channel::handleEvent,因为后者会添加或者删除Channel,从而造成pollfds在遍历期间改变大小,这非常危险。

Poller::updateChannel的主要功能是维护和更新pollfds_数组

typedef std::map<int, Channel*> ChannelMap; //fd到Channel*的映射
PollFdList pollfds_; //不会在每次调用poll之前临时构造pollfd数组