Muduo源码笔记系列:
muduo源码阅读笔记(0、下载编译muduo)
muduo源码阅读笔记(1、同步日志)
muduo源码阅读笔记(2、对C语言原生的线程安全以及同步的API的封装)
muduo源码阅读笔记(3、线程和线程池的封装)
muduo源码阅读笔记(4、异步日志)
muduo源码阅读笔记(5、Channel和Poller)
muduo源码阅读笔记(6、ExevntLoop和Thread)
muduo源码阅读笔记(7、EventLoopThreadPool)
muduo源码阅读笔记(8、定时器TimerQueue)
muduo源码阅读笔记(9、TcpServer)
muduo源码阅读笔记(10、TcpConnection)
前言
终于到了Muduo网络库最最核心的部分,这里还是建议大家亲自看看源码。源码很好读,博客最多起到辅助作用。因为EventLoop和Thread是绑定的,所以,可能这两部分放在一起更适合。
了解ExevntLoop和Thread后,对One Loop Per Thread思想,就有了一个大体的轮廓,这种设计思想,真的很高效,因为,每个线程都有自己的资源,比如epoll、IO事件处理,定时器、任务队列等。每个线程内部资源都是自我维护的(自己的事情自己做), 除了对线程的任务队列进行操作时有一段极小的临界区需要加锁外,不涉及任何锁的竞争。这里为每个线程设置自己的任务队列的思想特别关键,正是利用每个线程只处理自己任务队列里面的回调任务,实现了将并行任务串行化的效果。 将原本的并发(涉及线程安全,需要加锁)操作,封装成任务(无需加锁)回调,添加到各自的任务队列中,交给线程自己处理。实现了线程的隔离 、无锁化编程 ,巧妙的利用单线程天生串行执行的优势。活该Muduo高性能、高并发。
EventLoop的实现 提供的接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 class EventLoop : noncopyable{public : typedef std::function<void ()> Functor; EventLoop (); ~EventLoop (); void loop () ; void quit () ; void runInLoop (Functor cb) ; void queueInLoop (Functor cb) ; size_t queueSize () const ; void wakeup () ; void updateChannel (Channel* channel) ; void removeChannel (Channel* channel) ; void assertInLoopThread () bool isInLoopThread () const { return threadId_ == CurrentThread::tid (); } static EventLoop* getEventLoopOfCurrentThread () ; private : void abortNotInLoopThread () ; void handleRead () ; void doPendingFunctors () ; typedef std::vector<Channel*> ChannelList; bool looping_; std::atomic<bool > quit_; bool eventHandling_; bool callingPendingFunctors_; int64_t iteration_; const pid_t threadId_; Timestamp pollReturnTime_; std::unique_ptr<Poller> poller_; std::unique_ptr<TimerQueue> timerQueue_; int wakeupFd_; std::unique_ptr<Channel> wakeupChannel_; boost::any context_; ChannelList activeChannels_; Channel* currentActiveChannel_; mutable MutexLock mutex_; std::vector<Functor> pendingFunctors_ GUARDED_BY (mutex_) ; };
Muduo在EventLoop中,使用了Linux系统中EventFd,作为wakeupChannel_的成员。这里主要为了将线程即时唤醒处理回调任务。如果你阅读过sylar的源码应该在这里会有所感知,EventLoop::wakeup()
函数其实作用和sylar中的IOManager::tickle()
类似。
此外,为了和Muduo的EventLoop适配,Muduo定时器的实现也是利用Linux上提供的TimeFd,在TimerQueue构造函数中,也会为该fd构造一个Channel,并将该Channel注册到EventLoop的Poller中,这样极大的方便了定时器的管理与维护。
简单画了一下EventLoop中核心函数EventLoop::loop()
的执行流程图:
实现的伪代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 __thread EventLoop* t_loopInThisThread = 0 ; EventLoop::EventLoop () : threadId_ (CurrentThread::tid ()), poller_ (Poller::newDefaultPoller (this )), timerQueue_ (new TimerQueue (this )), wakeupFd_ (createEventfd ()), wakeupChannel_ (new Channel (this , wakeupFd_)){ LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_; if (t_loopInThisThread){ LOG_FATAL << "Another EventLoop " << t_loopInThisThread << " exists in this thread " << threadId_; }else { t_loopInThisThread = this ; } wakeupChannel_->setReadCallback ( std::bind (&EventLoop::handleRead, this )); wakeupChannel_->enableReading (); } EventLoop::~EventLoop (){ LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_ << " destructs in thread " << CurrentThread::tid (); wakeupChannel_->disableAll (); wakeupChannel_->remove (); ::close (wakeupFd_); t_loopInThisThread = NULL ; } void EventLoop::loop () { assert (!looping_); assertInLoopThread (); looping_ = true ; quit_ = false ; LOG_TRACE << "EventLoop " << this << " start looping" ; while (!quit_){ activeChannels_.clear (); pollReturnTime_ = poller_->poll (kPollTimeMs, &activeChannels_); ++iteration_; if (Logger::logLevel () <= Logger::TRACE){ printActiveChannels (); } eventHandling_ = true ; for (Channel* channel : activeChannels_){ currentActiveChannel_ = channel; currentActiveChannel_->handleEvent (pollReturnTime_); } currentActiveChannel_ = NULL ; eventHandling_ = false ; doPendingFunctors (); } LOG_TRACE << "EventLoop " << this << " stop looping" ; looping_ = false ; } void EventLoop::quit () { quit_ = true ; if (!isInLoopThread ()){ wakeup (); } } void EventLoop::runInLoop (Functor cb) { if (isInLoopThread ()) { cb (); }else { queueInLoop (std::move (cb)); } } void EventLoop::queueInLoop (Functor cb) { { MutexLockGuard lock (mutex_) ; pendingFunctors_.push_back (std::move (cb)); } if (!isInLoopThread () || callingPendingFunctors_){ wakeup (); } } void EventLoop::updateChannel (Channel* channel) { assert (channel->ownerLoop () == this ); assertInLoopThread (); poller_->updateChannel (channel); } void EventLoop::removeChannel (Channel* channel) { assert (channel->ownerLoop () == this ); assertInLoopThread (); if (eventHandling_){ assert (currentActiveChannel_ == channel || std::find (activeChannels_.begin (), activeChannels_.end (), channel) == activeChannels_.end ()); } poller_->removeChannel (channel); } void EventLoop::wakeup () { uint64_t one = 1 ; ssize_t n = sockets::write (wakeupFd_, &one, sizeof one); if (n != sizeof one){ LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8" ; } } void EventLoop::handleRead () { uint64_t one = 1 ; ssize_t n = sockets::read (wakeupFd_, &one, sizeof one); if (n != sizeof one){ LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8" ; } } void EventLoop::doPendingFunctors () { std::vector<Functor> functors; callingPendingFunctors_ = true ; { MutexLockGuard lock (mutex_) ; functors.swap (pendingFunctors_); } for (const Functor& functor : functors){ functor (); } callingPendingFunctors_ = false ; }
细节明细: 在EventLoop类的成员变量的定义顺序中,poller_的定义位于timerQueue_之上,这个定义的顺序很关键,首先在EventLoop在构造时,先按成员变量的定义顺序构造成员变量,再会构造EventLoop本身。在析构的时候,会先析构自身,再会去按成员变量的定义顺序的倒序,去析构成员变量。考虑到timerQueue_的析构是依赖poller_的,Muduo的定义顺序(先析构timerQueue_,再析构poller_),正好规避了这个问题。
EventLoopThread的实现 简单讲,EventLoop作用就是让EventLoop::loop()跑在线程上。
EventLoopThread的实现代码是自解释的,代码量很少,也很容易理解。
提供的接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class EventLoopThread : noncopyable{public : typedef std::function<void (EventLoop*)> ThreadInitCallback; EventLoopThread (const ThreadInitCallback& cb = ThreadInitCallback (), const string& name = string ()); ~EventLoopThread (); EventLoop* startLoop () ; private : void threadFunc () ; EventLoop* loop_ GUARDED_BY (mutex_) ; bool exiting_; Thread thread_; MutexLock mutex_; Condition cond_ GUARDED_BY (mutex_) ; ThreadInitCallback callback_; };
有了EventLoop和EventLoopThread后,结合muduo源码阅读笔记(3、线程和线程池的封装) 一个EventLoopThread线程启动流程如下:
EventLoopThread::startLoop() ->
Thread::start() ->
pthread_create(…, &detail::startThread,…) ->
startThread(void* obj) ->
ThreadData::runInThread() ->
Thread::func_() ->
EventLoopThread::threadFunc() ->
EventLoop::loop()
实现的伪代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 EventLoopThread::EventLoopThread (const ThreadInitCallback& cb, const string& name) : loop_ (NULL ), exiting_ (false ), thread_ (std::bind (&EventLoopThread::threadFunc, this ), name), mutex_ (), cond_ (mutex_), callback_ (cb){ } EventLoopThread::~EventLoopThread (){ exiting_ = true ; if (loop_ != NULL ){ loop_->quit (); thread_.join (); } } EventLoop* EventLoopThread::startLoop () { assert (!thread_.started ()); thread_.start (); EventLoop* loop = NULL ; { MutexLockGuard lock (mutex_) ; while (loop_ == NULL ){ cond_.wait (); } loop = loop_; } return loop; } void EventLoopThread::threadFunc () { EventLoop loop; if (callback_){ callback_ (&loop); } { MutexLockGuard lock (mutex_) ; loop_ = &loop; cond_.notify (); } loop.loop (); MutexLockGuard lock (mutex_) ; loop_ = NULL ; }
总结 在Muduo网络库的设计中,EventLoop 统一使用文件描述符(file descriptor)的方式来处理事件,主要是基于以下一些好处和设计原则:
一致性: 使用文件描述符作为事件的抽象,使得对于不同类型的事件(包括套接字、定时器等)的处理方式一致。这种一致性简化了 EventLoop 内部的设计和实现,使得对于事件的处理更加通用。
多路复用: 文件描述符是多路复用(Multiplexing)机制的核心。通过将多个文件描述符注册到同一个 EventLoop 中,可以使用诸如 select、poll、epoll 等多路复用技术,实现同时监听多个事件并进行有效的事件分发。
高效性: 文件描述符的处理在操作系统层面已经高度优化,使用多路复用机制可以高效地管理和调度大量的事件。这对于实现高性能的网络库尤为重要。
如果读者有阅读其他网络库源码就会不可思议的发现,Muduo的One Loop Per Thread设计思想太精妙了,这种设计几乎不存在锁的竞争!!!
本章完结