muduo库Acceptor TcpServer接收新连接

TcpServer供用户直接使用,生命期由用户控制。用户只需设置好callback,再调用start即可。TcpServer内部使用Acceptor来获取新连接的fd。TcpServer会为新连接创建对应的TcpConnection对象。

在TcpServer构造函数中先初始化acceptor成员,acceptor(new Acceptor(loop, listenAddr))

// Acceptor::handleRead函数中会回调用TcpServer::newConnection
// _1对应的是socket文件描述符,_2对应的是对等方的地址(InetAddress)
acceptor_->setNewConnectionCallback(
    boost::bind(&TcpServer::newConnection, this, _1, _2));

调用TcpServer::start(),开始Acceptor::listen()。新的连接到达,TcpServer::acceptor.acceptChannel可读,poll返回,调用Channel::handleEvent()处理活动通道,调用Acceptor::handleRead(),函数中调用accept(2)来接受新连接,并回调TcpServer::newConnection()。

void Acceptor::handleRead()
{
    loop_->assertInLoopThread();
    InetAddress peerAddr(0);
    //FIXME loop until no more
    int connfd = acceptSocket_.accept(&peerAddr);
    if (connfd >= 0)
    {
        // string hostport = peerAddr.toIpPort();
        // LOG_TRACE << "Accepts of " << hostport;
        if (newConnectionCallback_)
        {
            newConnectionCallback_(connfd, peerAddr);
        }
        else
        {
            sockets::close(connfd);
        }
    }
    else
    {
        // Read the section named "The special problem of
        // accept()ing when you can't" in libev's doc.
        // By Marc Lehmann, author of libev.
        if (errno == EMFILE)
        {
            ::close(idleFd_);
            idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
            ::close(idleFd_);
            idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
        }
    }
}

TcpServer::newConnection中,创建TcpConnection对象coon,把它加入ConnectionMap,设置好callback,再调用coon->connectEstablished,回调用户提供的connectionCallback_。

//传入connfd
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
    ......
    TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));

    connections_[connName] = conn; // conn 是TcpConnectionPtr 对象
    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->connectEstablished();
}

void TcpConnection::connectEstablished()
{
    channel_->enableReading();  // TcpConnection所对应的通道加入到Poller关注
    connectionCallback_(shared_from_this());
}

现在一个新连接已建立,对等方发送数据到connfd,内核接收缓冲区不为空,TcpConnection::channel_可读事件发生,poll返回,调用Channel::handleEvent()处理活动通道,调用TcpConnection::handleRead()。

/ 通道可读事件到来的时候,回调TcpConnection::handleRead,_1是事件发生时间
channel_->setReadCallback(
    boost::bind(&TcpConnection::handleRead, this, _1));

void TcpConnection::handleRead(Timestamp receiveTime)
{
    ssize_t n = ::read(channel_->fd(), buf, sizeof buf);
    messageCallback_(shared_from_this(), buf, n);
}

Acceptor TcpServer接收新连接时序图
Reactor

测试代码

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>

#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

void onConnection(const TcpConnectionPtr &conn)
{
    if (conn->connected())
    {
        printf("onConnection(): new connection [%s] from %s\n",
               conn->name().c_str(),
               conn->peerAddress().toIpPort().c_str());
    }
    else
    {
        printf("onConnection(): connection [%s] is down\n",
               conn->name().c_str());
    }
}

void onMessage(const TcpConnectionPtr &conn,
               const char *data,
               ssize_t len)
{
    printf("onMessage(): received %zd bytes from connection [%s]\n",
           len, conn->name().c_str());
}

int main()
{
    printf("main(): pid = %d\n", getpid());

    InetAddress listenAddr(8888);
    EventLoop loop;

    TcpServer server(&loop, listenAddr, "TestServer");
    server.setConnectionCallback(onConnection);
    server.setMessageCallback(onMessage);
    server.start();

    loop.loop();
}

多线程TcpServer

one loop per thread的思想实现多线程TcpServer的关键步骤是在新建TcpConnection时从evenloop pool里挑一个给TcpConnection用。ioLoop和loop_间的线程切换都发生在连接建立和断开的时刻,不影响正常业务的性能。

也就是说多线程TcpServer自己的EvenLoop只用来接受新连接,而新连接会使用其他的EvenLoop来执行IO。单线程TcpServer的EvenLoop是与TcpConnection共享的。muduo的evenloop pool由EventLoopThreadPool实现。