标签:
本节研究事件循环EventLoop以及EventLoopPool,并给出C++实现;
本网络组件的线程模型是一个master线程和多个IO线程模型;master线程负责accept连接,然后会将该连接派发到IO线程,可以按照轮转的方式来派发到各IO线程;每一个IO线程有一个EventLoop(事件循环),一个TCP连接必须归这个EventLoop来管理,所有的IO都会转移到这个线程,其他线程是无法读写该TCP连接信息的,也是说TCP连接的fd也只能由这个IO线程去读写;其中EventLoop是要求线程安全的;具体示意图如下:
(1)每一个IO线程一个EventLoop对象,并拥有它;
(2)在loop循环中,将会执行_epoller->pollAndHandleEvent(seconds)来处理到期的事件,以及会执行_doPendingFunctors()来处理提交给本IO线程的回调函数Functor,这些回调函数不应该阻塞当前IO线程,否则会造成当前IO线程无法及时处理到来的IO事件;
事件循环时序图如下:
class Epoller;
class Event;
class TimerQueue;
class EventLoop final
{
public:
EventLoop(const EventLoop&) = delete;
EventLoop& operator=(const EventLoop&) = delete;
EventLoop();
~EventLoop();
bool isInThreadLoop();
void assertInThreadLoop();
void loop();
void stop();
void updateEvent(Event* event);
void removeEvent(Event* event);
typedef std::function<void()> Functor;
void runInLoop(const Functor& cb);
void queueInLoop(const Functor& cb);
typedef std::function<void()> TimerCallback;
void addSingleTimer(const TimerCallback& cb, uint32_t interval);
private:
void _handleRead();
void _wakeup();
void _doPendingFunctors();
int _wakeupFd;
std::unique_ptr<Event> _wakeupEvent;
bool _loop;
std::unique_ptr<Epoller> _epoller;
std::unique_ptr<TimerQueue> _timerQueue;
std::vector<Functor> _functorLists;
pid_t _tid;
Base::Mutex _mutex;
};说明几点:
(1)updateEvent,removeEvent来使用epoller来更新或删除相关事件;
(2)_doPendingFunctors为用户提交的需要处理的回调函数Functor;用户的Functor主要通过queueInLoop和runInLoop来提交;当提交的线程与IO线程不是同一个线程时,为了保持线程安全,runInLoop将会间接调用queueInLoop;_wakeupEvent用来queueInLoop提交Functor后,来唤醒当前的IO线程来及时处理Functor;
(3)_loop变量控制loop循环的退出,通过stop()函数,可以让loop循环退出;stop()主要由其他线程调用;
(4)_epoller,_timerQueue,_wakeupEvent都是EventLoop的成员,生命周期由其控制;
namespace
{
const int loopSeconds = 10;
int createEventFd()
{
int fd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (fd < 0)
{
LOG_SYSERR << "eventfd system error";
}
return fd;
}
}
EventLoop::EventLoop():
_wakeupFd(createEventFd()),
_wakeupEvent(new Event(_wakeupFd, this)),
_loop(false),
_epoller(new Epoller(this)),
_timerQueue(new TimerQueue(this)),
_tid(CurrentThread::tid()),
_mutex()
{
assert(_wakeupFd >= 0);
assert(!_loop);
assertInThreadLoop();
LOG_TRACE << "eventfd fd: " << _wakeupFd;
LOG_INFO << "Thread tid: " << _tid << " run this EventLoop: " << this;
_wakeupEvent->setReadCallback(std::bind(&EventLoop::_handleRead, this));
_wakeupEvent->enableReading();
}
说明几点:
(1)_wakeupEvent->setReadCallback(std::bind(&EventLoop::_handleRead, this));设置读事件的处理回调函数,仅仅会读取_wakeup()写入的数值;
(2)_wakeupEvent->enableReading()会将_wakeupFd加入到本epoll的监听事件中去;
(3)assertInThreadLoop();保证执行的线程为拥有EventLoop的IO线程;
EventLoop::~EventLoop()
{
if (_wakeupFd >= 0)
::close(_wakeupFd);
}void EventLoop::updateEvent(Event* event)
{
assertInThreadLoop();
_epoller->updateEvent(event);
}
void EventLoop::removeEvent(Event* event) //only invoke by _handclose
{
assertInThreadLoop();
_epoller->removeEvent(event);
}
void EventLoop::runInLoop(const Functor& cb) //another thread can invoke
{
if (isInThreadLoop())
{
if (cb)
cb();
}
else
{
queueInLoop(cb);
}
}
void EventLoop::queueInLoop(const Functor& cb) //another thread can invoke
{
{
MutexLockGuard lock(_mutex);
_functorLists.push_back(cb);
}
if (!isInThreadLoop())
{
_wakeup();
}
}
void EventLoop::_wakeup() //other thread wake up loop thread
{
uint64_t value = 1;
auto s = ::write(_wakeupFd, &value, sizeof(uint64_t));
if (s != sizeof(uint64_t))
LOG_SYSERR << "write system error";
}
void EventLoop::_handleRead()
{
assertInThreadLoop();
uint64_t value;
auto s = ::read(_wakeupFd, &value, sizeof(uint64_t));
//when value is non-zero, when write more times, it only read one time by add the read value
if (s != sizeof(uint64_t))
LOG_SYSERR << "read system error";
LOG_TRACE << "eventfd read times: " << value;
}
说明几点:
(1)用户的Functor主要通过queueInLoop和runInLoop来提交;当提交的线程与IO线程不是同一个线程时,为了保持线程安全,runInLoop将会间接调用queueInLoop;_wakeupEvent用来queueInLoop提交Functor后,若不是当前IO线程的提交,将使用 _wakeup()来唤醒当前的IO线程来,及时处理Functor;
(2)_handleRead()为_wakeupEvent读事件的处理回调函数;
(3)queueInLoop要保证线程安全,因此加入到_functorLists,使用互斥量来保护临界区;
void EventLoop::_doPendingFunctors()
{
assertInThreadLoop();
std::vector<Functor> _functors;
{
MutexLockGuard lock(_mutex);
_functors.swap( _functorLists);
}
for (auto& cb : _functors)
{
if (cb)
cb();
}
}
说明几点:
(1)使用_functors栈对象来和当前的_functorLists交换,减小操作_functorLists的时间,这样不会过多的阻塞其他线程向_functorLists提交Functor回调;
bool EventLoop::isInThreadLoop()
{
return _tid == CurrentThread::tid();
}
void EventLoop::assertInThreadLoop()
{
//LOG_TRACE << "Current tid: " << CurrentThread::tid() << " , loop tid: " << _tid;
assert(isInThreadLoop());
}
void EventLoop::loop() //only invoke by loop thread
{
assertInThreadLoop();
assert(!_loop);
_loop = true;
while (_loop)
{
int seconds = _timerQueue->minUpdateSeconds() - TimeStamp::now().secondsSinceEpoch();
if (seconds <= 0) {
seconds = loopSeconds;
}
_epoller->pollAndHandleEvent(seconds);
_doPendingFunctors();
}
}
void EventLoop::stop()
{
_loop = false;
}说明几点:
(1)loop函数中,使用 _epoller->pollAndHandleEvent(seconds)来处理epoll监听的事件,使用 _epoller->pollAndHandleEvent(seconds),其中seconds为需要阻塞的时间,一般是_timerQueue到期的第一个Timer的时间(绝对时间)减去当前的时间,但是如果_timerQueue没有Timer,那么seconds为周期性的时间loopSeconds
(2)_doPendingFunctors()为执行用户提交的Functor;
void EventLoop::addSingleTimer(const TimerCallback& cb, uint32_t interval)
{
_timerQueue->addTimer(cb, interval, false);
}
每一个IO线程都拥有一个EventLoop对象,当主线程派发socket连接时,将使用轮转法从EventLoopPool获得某IO线程的EventLoop对象,该socket连接后面事件的监听将会由该IO线程负责;
class EventLoop;
class EventLoopPool final
{
public:
EventLoopPool(const EventLoopPool&) = delete;
EventLoopPool& operator=(const EventLoopPool&) = delete;
explicit EventLoopPool(size_t loopNums);
~EventLoopPool();
void start();
void stop();
EventLoop* getNextLoop();
private:
void _run();
size_t _loopNums;
bool _running;
size_t _curIndex;
std::vector<EventLoop*> _loops;
std::vector<std::shared_ptr<Base::Thread>> _threads;
Base::Mutex _mutex;
Base::CountDownLatch _countDownLatch;
};说明几点:
(1)master线程调用getNextLoop()可按照轮转法来获得EventLoop对象;
(2)CountDownLatch是倒计时,当所有的IO线程初始化好EventLoop对象后,master线程才会继续执行,防止master线程已经需要派发连接socket,而getNextLoop()时,对应的IO线程尚初始化好EventLoop对象(尚未放入_loops)而造成错误;
EventLoopPool::EventLoopPool(size_t loopNums) :
_loopNums(loopNums),
_running(false),
_curIndex(0),
_mutex(),
_countDownLatch(loopNums)
{
assert(_loopNums > 0);
_loops.reserve(loopNums);
_threads.reserve(loopNums);
}
EventLoopPool::~EventLoopPool()
{
if (_running)
stop();
}void EventLoopPool::start()
{
assert(!_running);
_running = true;
for (size_t i = 0; i < _loopNums; ++i)
{
std::shared_ptr<Thread> thread(new Thread(std::bind(&EventLoopPool::_run, this))); //impotant
_threads.push_back(thread);
thread->start();
}
LOG_TRACE << "wait , Current tid: " << CurrentThread::tid();
_countDownLatch.wait();
}
void EventLoopPool::stop()
{
assert(_running);
_running = false;
for (auto& loop : _loops)
{
loop->stop();
}
for (auto& thread : _threads)
{
thread->join();
}
_loops.clear();
_threads.clear();
}
说明几点:
(1)在master线程start后,产生多个IO线程后,将会执行_countDownLatch.wait()来等待所有的IO线程初始化好EventLoop对象,各个IO线程将会调用_countDownLatch.countDown();
(2)在stop函数中,首先会停止各个EventLoop继续loop,此后这些线程将会退出,此时使用thread->join()来依次等待各个IO线程结束;
EventLoop* EventLoopPool::getNextLoop()
{
_curIndex %= _loopNums;
EventLoop* loop = _loops[_curIndex];
++_curIndex;
return loop;
}
void EventLoopPool::_run()
{
EventLoop loop; //一个loop
std::function<void()> fun = std::bind(&EventLoop::loop, &loop);
{
MutexLockGuard lock(_mutex);
_loops.push_back(&loop); //放入到loop缓冲池中
}
LOG_TRACE << "countDown , Current tid: " << CurrentThread::tid();
_countDownLatch.countDown(); //loop should in the _loops, otherwise getNextLoop will have error
if (fun) {
while (_running)
fun();
}
}
说明几点:
(1)EventLoop为栈对象,当加入到_loops后说明EventLoop已经完整初始化,此时调用_countDownLatch.countDown(),倒计时计数器将会减1;
标签:
原文地址:http://blog.csdn.net/skyuppour/article/details/44781029