标签:
本节研究事件和IO复用的实现,主要是Event和Epoller并给出C++实现;
本网络组件的线程模型是一个master线程和多个IO线程模型;master线程负责accept连接,然后会将该连接派发到IO线程,可以按照轮转的方式来派发到各IO线程;每一个IO线程有一个EventLoop(事件循环),一个TCP连接必须归这个EventLoop来管理,所有的IO都会转移到这个线程,其他线程是无法读写该TCP连接信息的,也是是说TCP连接的fd也只能由这个IO线程去读写;其中EventLoop是要求线程安全的;具体示意图如下:
(1)每一个IO线程一个EventLoop对象,并拥有它,其主要功能就是loop循环;
(2)Event对象始终负责一个fd的IO事件分发,但是它不拥有该fd,析构时也不会关闭这个fd,Event会根据时间类型的不同分发为不同的回调,如ReadCallback、WriteCallback等;回调对象为C++中的function表示;
(3)Epoller是IO复用(epoll机制)的实现,它是EventLoop对象的成员,只供EventLoop调用;
事件循环时序图如下:
(1)_update主要包括两个操作,添加和更新事件;最终有epoll机制的epoll_ctl来操作,对应的类型EPOLL_CTL_ADD,EPOLL_CTL_MOD;
(2)_remove主要删除事件操作;最终有epoll机制的epoll_ctl来操作,对应的类型EPOLL_CTL_DEL;
事件操作时序图如下:
class EventLoop;
class Event final
{
public:
Event(const Event&) = delete;
Event& operator=(const Event&) = delete;
Event(int fd, EventLoop *loop);
void setReadCallback(const ReadCallback& cb)
{
_readCallback = cb;
}
void setWriteCallback(const WriteCallback& cb)
{
_writeCallback = cb;
}
void setErrorCallback(const ErrorCallback& cb)
{
_errorCallback = cb;
}
void setCloseCallback(const CloseCallback& cb)
{
_closeCallback = cb;
}
bool isWriting() const
{
return _events & _cWriteEvent;
}
bool isReading() const
{
return _events & _cReadEvent;
}
void disableWriting()
{
_events &= ~_cWriteEvent;
_update();
}
void enableWriting()
{
_events |= _cWriteEvent;
_update();
}
void disableAll()
{
_events = 0;
_update();
}
void enableReading()
{
_events |= _cReadEvent;
_update();
}
int fd() const
{
return _fd;
}
uint32_t events() const
{
return _events;
}
void setRevents(uint32_t revents)
{
_revents = revents;
}
void handleEvent();
void remove();
private:
void _update();
uint32_t _events;
uint32_t _revents;
int _fd;
EventLoop* _loop;
ReadCallback _readCallback;
WriteCallback _writeCallback;
ErrorCallback _errorCallback;
CloseCallback _closeCallback;
static uint32_t _cReadEvent;
static uint32_t _cWriteEvent;
};
说明几点
(1)_events为需要关心的事件类型,而_revents为epoll_wait返回后发生的事件类型,需要根据_revents进行相应的分发操作;
(2)具体的分发操作为 ReadCallback _readCallback、WriteCallback _writeCallback、ErrorCallback _errorCallback、CloseCallback _closeCallback;
uint32_t Event::_cReadEvent = EPOLLIN;
uint32_t Event::_cWriteEvent = EPOLLOUT;
Event::Event(int efd, EventLoop *loop):
_events(0), //impotant
_fd(efd),
_loop(loop)
{
}
void Event::remove()
{
_loop->removeEvent(this);
}
void Event::_update()
{
_loop->updateEvent(this);
}
void Event::handleEvent()
{
if ((_revents & EPOLLHUP) && !(_revents & EPOLLIN))
//do not handle HUP about socket, we should handle before EPOLLIN,because if readCallback handclose ,the event will destructor, the _revents will random
{
LOG_TRACE << "_closeCallback, fd: " << fd();
if (_closeCallback)
_closeCallback();
}
if (_revents & (EPOLLIN | EPOLLRDHUP))
{
LOG_TRACE << "_readCallback, fd: " << fd();
if (_readCallback)
_readCallback(TimeStamp::now());
}
if (_revents & EPOLLOUT)
{
LOG_TRACE << "_writeCallback, fd: " << fd();
if (_writeCallback)
_writeCallback();
}
if (_revents & EPOLLERR)
{
LOG_TRACE << "_errorCallback, fd: " << fd();
if (_errorCallback)
_errorCallback();
}
}说明几点:
(1)handleEvent()就是根据_revents不同的事件类型进行相应不同的回调操作;
(2)_update()和remove即为相应的事件操作;
class Event;
class EventLoop;
class Epoller final
{
public:
Epoller(const Epoller&) = delete;
Epoller& operator=(const Epoller&) = delete;
Epoller(EventLoop* loop);
~Epoller();
void removeEvent(Event* event);
void updateEvent(Event* event);
void pollAndHandleEvent(int seconds);
private:
int _epollfd;
EventLoop* _loop;
std::vector<struct epoll_event> _activeEvents;
std::map<int, Event*> _eventMap;
static const int cMaxEventNumber = 1000;
};
}
说明几点:
(1)pollAndHandleEvent主要为调用epoll_wait和事件处理函数;
(2)removeEvent,updateEvent()分别为删除事件和更新事件;
(3)_eventMap为建立fd和Event建立的map,红黑树高效查找;_activeEvents为内核epoll_wait填写的活动事件列表;
Epoller::Epoller(EventLoop* loop) :
_epollfd(epoll_create(100)),
_loop(loop)
{
assert(_epollfd >= 0);
_activeEvents.resize(cMaxEventNumber);
LOG_TRACE << "epollfd fd: " << _epollfd;
}
Epoller::~Epoller()
{
::close(_epollfd);
}
void Epoller::updateEvent(Event* event)
{
struct epoll_event epollEvent;
epollEvent.data.fd = event->fd();
epollEvent.events = event->events();
int err;
if (_eventMap.find(event->fd()) == _eventMap.end())
{
//LOG_TRACE << "fd: [" << event->fd() << "] add to epoll";
err = epoll_ctl(_epollfd, EPOLL_CTL_ADD, event->fd(), &epollEvent);
_eventMap[event->fd()] = event;
}
else
{
//LOG_TRACE << "fd: [" << event->fd() << "] update from epoll";
err = epoll_ctl(_epollfd, EPOLL_CTL_MOD, event->fd(), &epollEvent);
}
if (err != 0)
{
LOG_SYSERR << "epoll_ctl error";
}
}
void Epoller::removeEvent(Event* event)
{
if (epoll_ctl(_epollfd, EPOLL_CTL_DEL, event->fd(), NULL) != 0)
{
LOG_SYSERR << "epoll_ctl error";
}
assert(_eventMap.find(event->fd()) != _eventMap.end());
if (_eventMap.find(event->fd()) != _eventMap.end())
{
_eventMap.erase(event->fd());
}
}
void Epoller::pollAndHandleEvent(int seconds)
{
int num = ::epoll_wait(_epollfd, &(*_activeEvents.begin()), _activeEvents.size(), seconds * 1000);
if (num == static_cast<int>(_activeEvents.size())) {
_activeEvents.resize(2 * num);
}
for (int i = 0; i < num; ++i)
{
int fd = _activeEvents[i].data.fd;
Event* event = _eventMap[fd];
event->setRevents(_activeEvents[i].events);
event->handleEvent();
}
}
EventLoop和本节相关的函数,完整介绍请见后面的博客;
更新和删除事件
void EventLoop::updateEvent(Event* event)
{
assertInThreadLoop();
_epoller->updateEvent(event);
}
void EventLoop::removeEvent(Event* event) //only invoke by _handclose
{
assertInThreadLoop();
_epoller->removeEvent(event);
}
loop中调用_epoller的pollAndHandleEvent
void EventLoop::loop() //only invoke by loop thread
{
assertInThreadLoop();
assert(!_loop);
_loop = true;
while (_loop)
{
int seconds = _timerQueue->minUpdateSeconds();
if (seconds == 0) {
seconds = loopSeconds;
}
_epoller->pollAndHandleEvent(seconds);
_doPendingFunctors();
}
}标签:
原文地址:http://blog.csdn.net/skyuppour/article/details/44727659