Muduo源码笔记系列:
muduo源码阅读笔记(0、下载编译muduo)
muduo源码阅读笔记(1、同步日志)
muduo源码阅读笔记(2、对C语言原生的线程安全以及同步的API的封装)
muduo源码阅读笔记(3、线程和线程池的封装)
muduo源码阅读笔记(4、异步日志)
muduo源码阅读笔记(5、Channel和Poller)
muduo源码阅读笔记(6、ExevntLoop和Thread)
muduo源码阅读笔记(7、EventLoopThreadPool)
muduo源码阅读笔记(8、定时器TimerQueue)
muduo源码阅读笔记(9、TcpServer)
muduo源码阅读笔记(10、TcpConnection)
muduo源码阅读笔记(11、TcpClient)
前言
本章新涉及的文件有:
TcpClient.h/cc:和TcpServer不同的是,TcpClient位于客户端,主要是对客户发起的连接进行管理,TcpClient只有一个loop,也会和TcpConnection配合,将三次握手连接成功的sockfd交由TcpConnection管理。
Connector.h/cc:Muduo将一个客户端的sock分成了两个阶段,分别是:连接阶段、读写阶段,Connector就是负责fd的连接阶段,当一个sockfd连接成功后,将sockfd传给TcpClient,由TcpClient将sockfd传给TcpConnection进行读写管理,Connector和TcpServer的Acceptor在设计上有这类似的思想,不同的是,Connector是可以针对同一个ip地址进行多次连接,产生不同的sockfd、而Acceptor是去读listen sock来接收连接,产生不同sockfd。
总体来说,TcpClient的实现是严格遵循 TcpServer的实现的,
Connector的实现 提供的接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 class Connector : noncopyable, public std::enable_shared_from_this<Connector>{ public : typedef std::function<void (int sockfd)> NewConnectionCallback; Connector (EventLoop* loop, const InetAddress& serverAddr); ~Connector (); void setNewConnectionCallback (const NewConnectionCallback& cb) { newConnectionCallback_ = cb; } void start () ; void restart () ; void stop () ; const InetAddress& serverAddress () const { return serverAddr_; } private : enum States { kDisconnected, kConnecting, kConnected }; static const int kMaxRetryDelayMs = 30 *1000 ; static const int kInitRetryDelayMs = 500 ; void setState (States s) { state_ = s; } void startInLoop () ; void stopInLoop () ; void connect () ; void connecting (int sockfd) ; void handleWrite () ; void handleError () ; void retry (int sockfd) ; int removeAndResetChannel () ; void resetChannel () ; EventLoop* loop_; InetAddress serverAddr_; bool connect_; States state_; std::unique_ptr<Channel> channel_; NewConnectionCallback newConnectionCallback_; int retryDelayMs_; };
简单记录一下连接阶段启动流程:
调用Connector::start()->
connect_ 赋值为 true。
在loop任务队列追加Connector::startInLoop()回调任务
执行回调任务:Connector::startInLoop()
调用Connector::connect()
创建非阻塞的连接sock
::connect(sock, …)
调用Connector::connecting(int sockfd)
new channel(sockfd)赋值给channel_将Connector::handleWrite()和Connector::handleError()设置给cahnnel的写回调以及错误处理回调
使能Poller开始监听sockfd
当连接成功,会触发sockfd的写事件,从而调用Connector::handleWrite()->
将sockfd和channel_解绑,并将channel_ rest。
调用newConnectionCallback_(也即TcpClient::newConnection)将连接完成的sockfd传给TcpClient处理
感兴趣的读者,可以自行阅读源码,了解连接过程中,stop、retry的流程。
实现的伪代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 void Connector::start () { connect_ = true ; loop_->runInLoop (std::bind (&Connector::startInLoop, this )); } void Connector::startInLoop () { loop_->assertInLoopThread (); assert (state_ == kDisconnected); if (connect_){ connect (); }else { LOG_DEBUG << "do not connect" ; } } void Connector::stop () { connect_ = false ; loop_->queueInLoop (std::bind (&Connector::stopInLoop, this )); } void Connector::stopInLoop () { loop_->assertInLoopThread (); if (state_ == kConnecting){ setState (kDisconnected); int sockfd = removeAndResetChannel (); retry (sockfd); } } void Connector::connect () { int sockfd = sockets::createNonblockingOrDie (serverAddr_.family ()); int ret = sockets::connect (sockfd, serverAddr_.getSockAddr ()); int savedErrno = (ret == 0 ) ? 0 : errno; switch (savedErrno){ case 0 : case EINPROGRESS: case EINTR: case EISCONN: connecting (sockfd); break ; } } void Connector::connecting (int sockfd) { setState (kConnecting); assert (!channel_); channel_.reset (new Channel (loop_, sockfd)); channel_->setWriteCallback ( std::bind (&Connector::handleWrite, this )); channel_->setErrorCallback ( std::bind (&Connector::handleError, this )); channel_->enableWriting (); } int Connector::removeAndResetChannel () { channel_->disableAll (); channel_->remove (); int sockfd = channel_->fd (); loop_->queueInLoop (std::bind (&Connector::resetChannel, this )); return sockfd; } void Connector::resetChannel () { channel_.reset (); } void Connector::handleWrite () { LOG_TRACE << "Connector::handleWrite " << state_; if (state_ == kConnecting){ int sockfd = removeAndResetChannel (); int err = sockets::getSocketError (sockfd); if (err){ LOG_WARN << "Connector::handleWrite - SO_ERROR = " << err << " " << strerror_tl (err); retry (sockfd); }else { setState (kConnected); if (connect_){ newConnectionCallback_ (sockfd); }else { sockets::close (sockfd); } } }else { assert (state_ == kDisconnected); } } void Connector::handleError () { LOG_ERROR << "Connector::handleError state=" << state_; if (state_ == kConnecting){ int sockfd = removeAndResetChannel (); int err = sockets::getSocketError (sockfd); LOG_TRACE << "SO_ERROR = " << err << " " << strerror_tl (err); retry (sockfd); } } void Connector::retry (int sockfd) { sockets::close (sockfd); setState (kDisconnected); if (connect_){ LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort () << " in " << retryDelayMs_ << " milliseconds. " ; loop_->runAfter (retryDelayMs_/1000.0 , std::bind (&Connector::startInLoop, shared_from_this ())); retryDelayMs_ = std::min (retryDelayMs_ * 2 , kMaxRetryDelayMs); }else { LOG_DEBUG << "do not connect" ; } }
TcpClient的实现 提供的接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 class TcpClient : noncopyable{ public : TcpClient (EventLoop* loop, const InetAddress& serverAddr, const string& nameArg); ~TcpClient (); void connect () ; void disconnect () ; void stop () ; TcpConnectionPtr connection () const { MutexLockGuard lock (mutex_) ; return connection_; } EventLoop* getLoop () const { return loop_; } bool retry () const { return retry_; } void enableRetry () { retry_ = true ; } const string& name () const { return name_; } void setConnectionCallback (ConnectionCallback cb) { connectionCallback_ = std::move (cb); } void setMessageCallback (MessageCallback cb) { messageCallback_ = std::move (cb); } void setWriteCompleteCallback (WriteCompleteCallback cb) { writeCompleteCallback_ = std::move (cb); }private : void newConnection (int sockfd) ; void removeConnection (const TcpConnectionPtr& conn) ; EventLoop* loop_; ConnectorPtr connector_; const string name_; ConnectionCallback connectionCallback_; MessageCallback messageCallback_; WriteCompleteCallback writeCompleteCallback_; bool retry_; bool connect_; int nextConnId_; mutable MutexLock mutex_; TcpConnectionPtr connection_ GUARDED_BY (mutex_) ; };
TcpClient核心函数TcpClient::newConnection,该函数会作为连接器的回调,当sockfd连接成功后,该函数被调用,设置必要信息后,为该sockfd产生一个TcpConnection对象,后续该fd的读写,全权交由TcpConnection处理。逻辑比较简单,实现如下:
实现的伪代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 TcpClient::TcpClient (EventLoop* loop, const InetAddress& serverAddr, const string& nameArg) : loop_ (CHECK_NOTNULL (loop)), connector_ (new Connector (loop, serverAddr)), name_ (nameArg), connectionCallback_ (defaultConnectionCallback), messageCallback_ (defaultMessageCallback), retry_ (false ), connect_ (true ), nextConnId_ (1 ){ connector_->setNewConnectionCallback ( std::bind (&TcpClient::newConnection, this , _1)); LOG_INFO << "TcpClient::TcpClient[" << name_ << "] - connector " << get_pointer (connector_); } void TcpClient::connect () { LOG_INFO << "TcpClient::connect[" << name_ << "] - connecting to " << connector_->serverAddress ().toIpPort (); connect_ = true ; connector_->start (); } void TcpClient::disconnect () { connect_ = false ; { MutexLockGuard lock (mutex_) ; if (connection_){ connection_->shutdown (); } } } void TcpClient::stop () { connect_ = false ; connector_->stop (); } void TcpClient::newConnection (int sockfd) { loop_->assertInLoopThread (); InetAddress peerAddr (sockets::getPeerAddr(sockfd)) ; char buf[32 ]; snprintf (buf, sizeof buf, ":%s#%d" , peerAddr.toIpPort ().c_str (), nextConnId_); ++nextConnId_; string connName = name_ + buf; InetAddress localAddr (sockets::getLocalAddr(sockfd)) ; TcpConnectionPtr conn (new TcpConnection(loop_, connName, sockfd, localAddr, peerAddr)) ; conn->setConnectionCallback (connectionCallback_); conn->setMessageCallback (messageCallback_); conn->setWriteCompleteCallback (writeCompleteCallback_); conn->setCloseCallback ( std::bind (&TcpClient::removeConnection, this , _1)); { MutexLockGuard lock (mutex_) ; connection_ = conn; } conn->connectEstablished (); } void TcpClient::removeConnection (const TcpConnectionPtr& conn) { loop_->assertInLoopThread (); assert (loop_ == conn->getLoop ()); { MutexLockGuard lock (mutex_) ; assert (connection_ == conn); connection_.reset (); } loop_->queueInLoop (std::bind (&TcpConnection::connectDestroyed, conn)); if (retry_ && connect_){ LOG_INFO << "TcpClient::connect[" << name_ << "] - Reconnecting to " << connector_->serverAddress ().toIpPort (); connector_->restart (); } }
细节明细: 疑问
在TcpConnection::handleClose()实现当中,为什么没有调用close,关闭sockfd?也看了一下TcpConnection的析构、TcpConnection::connectDestroyed(),没有一个地方调用了close来关闭sockfd
解答
在 TcpConnection 对象析构的时候。TcpConnection 持有一个 Socket 对象,Socket 是一个 RAII handler,它的析构函数会 close(sockfd_)。这样,如果发生 TcpConnection 对象泄漏,那么我们从 /proc/pid/fd/ 就能找到没有关闭的文件描述符,便于查错。
原文链接:https://blog.csdn.net/Solstice/article/details/6208634
总结 Muduo设计的TcpServer和TcpClient代码思想及其统一,一些算法题也是需要这样的抽象思维,所以我认为这也是以后从事it最重要的品质,可以避免很多不必要的bug。