码迷,mamicode.com
首页 > 其他好文 > 详细

【网络组件】事件和IO复用

时间:2015-03-30 09:38:53      阅读:193      评论:0      收藏:0      [点我收藏+]

标签:

   本节研究事件和IO复用的实现,主要是Event和Epoller并给出C++实现;


线程模型

本网络组件的线程模型是一个master线程和多个IO线程模型;master线程负责accept连接,然后会将该连接派发到IO线程,可以按照轮转的方式来派发到各IO线程;每一个IO线程有一个EventLoop(事件循环),一个TCP连接必须归这个EventLoop来管理,所有的IO都会转移到这个线程,其他线程是无法读写该TCP连接信息的,也是是说TCP连接的fd也只能由这个IO线程去读写;其中EventLoop是要求线程安全的;具体示意图如下:

技术分享



事件循环

(1)每一个IO线程一个EventLoop对象,并拥有它,其主要功能就是loop循环;

(2)Event对象始终负责一个fd的IO事件分发,但是它不拥有该fd,析构时也不会关闭这个fd,Event会根据时间类型的不同分发为不同的回调,如ReadCallback、WriteCallback等;回调对象为C++中的function表示;

(3)Epoller是IO复用(epoll机制)的实现,它是EventLoop对象的成员,只供EventLoop调用;

事件循环时序图如下:

技术分享


事件操作

(1)_update主要包括两个操作,添加和更新事件;最终有epoll机制的epoll_ctl来操作,对应的类型EPOLL_CTL_ADD,EPOLL_CTL_MOD;

(2)_remove主要删除事件操作;最终有epoll机制的epoll_ctl来操作,对应的类型EPOLL_CTL_DEL;

事件操作时序图如下:

技术分享


Event

Event声明

class EventLoop;

class Event final
{
public:
  Event(const Event&) = delete;
  Event& operator=(const Event&) = delete;

  Event(int fd, EventLoop *loop);

  void setReadCallback(const ReadCallback& cb)
  {
    _readCallback = cb;
  }

  void setWriteCallback(const WriteCallback& cb)
  {
    _writeCallback = cb;
  }

  void setErrorCallback(const ErrorCallback& cb)
  {
    _errorCallback = cb;
  }

  void setCloseCallback(const CloseCallback& cb)
  {
    _closeCallback = cb;
  }

  bool isWriting() const
  {
    return _events & _cWriteEvent;
  }

  bool isReading() const
  {
    return _events & _cReadEvent;
  }

  void disableWriting()
  {
    _events &= ~_cWriteEvent;
    _update();
  }

  void enableWriting()
  {
    _events |= _cWriteEvent;
    _update();
  }

  void disableAll()
  {
    _events = 0;
    _update();
  }

  void enableReading()
  {
    _events |= _cReadEvent;
    _update();
  }

  int fd() const
  {
    return _fd;
  }

  uint32_t events() const
  {
    return _events;
  }

  void setRevents(uint32_t revents)
  {
    _revents = revents;
  }

  void handleEvent();

  void remove();
  
private:
  void _update();

  uint32_t _events;
  uint32_t _revents;

  int _fd;
  EventLoop* _loop;

  ReadCallback _readCallback;
  WriteCallback _writeCallback;
  ErrorCallback _errorCallback;
  CloseCallback _closeCallback;

  static uint32_t _cReadEvent;
  static uint32_t _cWriteEvent;
};

说明几点

(1)_events为需要关心的事件类型,而_revents为epoll_wait返回后发生的事件类型,需要根据_revents进行相应的分发操作;

(2)具体的分发操作为  ReadCallback _readCallback、WriteCallback _writeCallback、ErrorCallback _errorCallback、CloseCallback _closeCallback;

Event实现

uint32_t Event::_cReadEvent = EPOLLIN;
uint32_t Event::_cWriteEvent = EPOLLOUT;

Event::Event(int efd, EventLoop *loop):
    _events(0),       //impotant
    _fd(efd),
    _loop(loop)
{

}

void Event::remove()
{
  _loop->removeEvent(this);
}

void Event::_update()
{
  _loop->updateEvent(this);
}

void Event::handleEvent()
{    
  if ((_revents & EPOLLHUP) && !(_revents & EPOLLIN)) 
  //do not handle HUP about socket, we should handle before EPOLLIN,because if readCallback handclose ,the event will destructor, the _revents will random
      {
       LOG_TRACE << "_closeCallback, fd: " << fd();
        if (_closeCallback)
          _closeCallback();
      }
      
  if (_revents & (EPOLLIN | EPOLLRDHUP))
    {
      LOG_TRACE << "_readCallback, fd: " << fd();
      
      if (_readCallback)
        _readCallback(TimeStamp::now());
    }

  if (_revents & EPOLLOUT)
    {
     LOG_TRACE << "_writeCallback, fd: " << fd();
      if (_writeCallback)
        _writeCallback();
    }

  if (_revents & EPOLLERR)
    {
     LOG_TRACE << "_errorCallback, fd: " << fd();
      if (_errorCallback)
        _errorCallback();
    }
}
说明几点:

(1)handleEvent()就是根据_revents不同的事件类型进行相应不同的回调操作;

(2)_update()和remove即为相应的事件操作;


Epoller

Epoller声明

class Event;
class EventLoop;

class Epoller final
{
public:
  Epoller(const Epoller&) = delete;
  Epoller& operator=(const Epoller&) = delete;

  Epoller(EventLoop* loop);
  
  ~Epoller();

  void removeEvent(Event* event);

  void updateEvent(Event* event);

  void pollAndHandleEvent(int seconds);

private:
  int _epollfd;
  EventLoop* _loop;

  std::vector<struct epoll_event>  _activeEvents;
  std::map<int, Event*> _eventMap;

  static const int cMaxEventNumber = 1000;
};

}

说明几点:

(1)pollAndHandleEvent主要为调用epoll_wait和事件处理函数;

(2)removeEvent,updateEvent()分别为删除事件和更新事件;

(3)_eventMap为建立fd和Event建立的map,红黑树高效查找;_activeEvents为内核epoll_wait填写的活动事件列表;


Epoller实现

Epoller::Epoller(EventLoop* loop) :
    _epollfd(epoll_create(100)),
    _loop(loop)
{
  assert(_epollfd >= 0);
  _activeEvents.resize(cMaxEventNumber);
  
  LOG_TRACE << "epollfd fd: " << _epollfd;
}

Epoller::~Epoller()
{
  ::close(_epollfd);
}

void Epoller::updateEvent(Event* event)
{
  struct epoll_event epollEvent;
  epollEvent.data.fd = event->fd();
  epollEvent.events = event->events();

  int err;
  if (_eventMap.find(event->fd()) == _eventMap.end())
    {
      //LOG_TRACE << "fd: [" <<  event->fd() << "] add to epoll";
      err = epoll_ctl(_epollfd, EPOLL_CTL_ADD, event->fd(), &epollEvent);
      _eventMap[event->fd()] = event;
    }
  else
    {
      //LOG_TRACE << "fd: [" <<  event->fd() << "] update from epoll";
      err = epoll_ctl(_epollfd, EPOLL_CTL_MOD, event->fd(), &epollEvent);
    }

  if (err != 0)
    {
      LOG_SYSERR << "epoll_ctl error";
    }
}

void Epoller::removeEvent(Event* event)
{
  if (epoll_ctl(_epollfd, EPOLL_CTL_DEL, event->fd(), NULL) != 0)
    {
      LOG_SYSERR << "epoll_ctl error";
    }

  assert(_eventMap.find(event->fd()) != _eventMap.end());
  if (_eventMap.find(event->fd()) != _eventMap.end())
    {
      _eventMap.erase(event->fd());
    }
}

void Epoller::pollAndHandleEvent(int seconds)
{
  int num = ::epoll_wait(_epollfd, &(*_activeEvents.begin()), _activeEvents.size(), seconds * 1000);

  if (num == static_cast<int>(_activeEvents.size())) {
    _activeEvents.resize(2 * num);
  }
  
  for (int i = 0; i < num; ++i)
    {
      int fd = _activeEvents[i].data.fd;
      Event* event = _eventMap[fd];
      event->setRevents(_activeEvents[i].events);
      event->handleEvent();
    }
}


EventLoop相关

EventLoop和本节相关的函数,完整介绍请见后面的博客;

更新和删除事件

void EventLoop::updateEvent(Event* event)
{
  assertInThreadLoop();
  _epoller->updateEvent(event);
}

void EventLoop::removeEvent(Event* event)  //only invoke by _handclose
{
  assertInThreadLoop();
  _epoller->removeEvent(event);
}


loop中调用_epoller的pollAndHandleEvent

void EventLoop::loop()    //only invoke by loop thread 
{
  assertInThreadLoop();
  
  assert(!_loop);
  _loop = true;
  while (_loop)
    {
      int seconds = _timerQueue->minUpdateSeconds();
      if (seconds == 0) {
        seconds = loopSeconds;
      }
      
      _epoller->pollAndHandleEvent(seconds);

      _doPendingFunctors();
    }
}

【网络组件】事件和IO复用

标签:

原文地址:http://blog.csdn.net/skyuppour/article/details/44727659

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!