TcpServer供用户直接使用
,生命期由用户控制。用户只需设置好callback,再调用start即可。
TcpServer内部使用Acceptor来获取新连接的fd
。TcpServer会为新连接创建对应的TcpConnection对象。
在TcpServer构造函数中先初始化acceptor成员,acceptor(new Acceptor(loop, listenAddr))
// Acceptor::handleRead函数中会回调用TcpServer::newConnection
// _1对应的是socket文件描述符,_2对应的是对等方的地址(InetAddress)
acceptor_->setNewConnectionCallback(
boost::bind(&TcpServer::newConnection, this, _1, _2));
调用TcpServer::start(),开始Acceptor::listen()。新的连接到达,TcpServer::acceptor.acceptChannel可读,poll返回,调用Channel::handleEvent()处理活动通道,调用Acceptor::handleRead(),函数中调用accept(2)来接受新连接,并回调TcpServer::newConnection()。
void Acceptor::handleRead()
{
loop_->assertInLoopThread();
InetAddress peerAddr(0);
//FIXME loop until no more
int connfd = acceptSocket_.accept(&peerAddr);
if (connfd >= 0)
{
// string hostport = peerAddr.toIpPort();
// LOG_TRACE << "Accepts of " << hostport;
if (newConnectionCallback_)
{
newConnectionCallback_(connfd, peerAddr);
}
else
{
sockets::close(connfd);
}
}
else
{
// Read the section named "The special problem of
// accept()ing when you can't" in libev's doc.
// By Marc Lehmann, author of libev.
if (errno == EMFILE)
{
::close(idleFd_);
idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
::close(idleFd_);
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
}
}
}
TcpServer::newConnection中,创建TcpConnection对象coon,把它加入ConnectionMap,设置好callback,再调用coon->connectEstablished,回调用户提供的connectionCallback_。
//传入connfd
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
......
TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
connections_[connName] = conn; // conn 是TcpConnectionPtr 对象
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->connectEstablished();
}
void TcpConnection::connectEstablished()
{
channel_->enableReading(); // TcpConnection所对应的通道加入到Poller关注
connectionCallback_(shared_from_this());
}
现在一个新连接已建立,对等方发送数据到connfd,内核接收缓冲区不为空,TcpConnection::channel_可读事件发生,poll返回,调用Channel::handleEvent()处理活动通道,调用TcpConnection::handleRead()。
/ 通道可读事件到来的时候,回调TcpConnection::handleRead,_1是事件发生时间
channel_->setReadCallback(
boost::bind(&TcpConnection::handleRead, this, _1));
void TcpConnection::handleRead(Timestamp receiveTime)
{
ssize_t n = ::read(channel_->fd(), buf, sizeof buf);
messageCallback_(shared_from_this(), buf, n);
}
Acceptor TcpServer接收新连接时序图
测试代码
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <stdio.h>
using namespace muduo;
using namespace muduo::net;
void onConnection(const TcpConnectionPtr &conn)
{
if (conn->connected())
{
printf("onConnection(): new connection [%s] from %s\n",
conn->name().c_str(),
conn->peerAddress().toIpPort().c_str());
}
else
{
printf("onConnection(): connection [%s] is down\n",
conn->name().c_str());
}
}
void onMessage(const TcpConnectionPtr &conn,
const char *data,
ssize_t len)
{
printf("onMessage(): received %zd bytes from connection [%s]\n",
len, conn->name().c_str());
}
int main()
{
printf("main(): pid = %d\n", getpid());
InetAddress listenAddr(8888);
EventLoop loop;
TcpServer server(&loop, listenAddr, "TestServer");
server.setConnectionCallback(onConnection);
server.setMessageCallback(onMessage);
server.start();
loop.loop();
}
多线程TcpServer
one loop per thread的思想实现多线程TcpServer的关键步骤是在新建TcpConnection时从evenloop pool里挑一个给TcpConnection用
。ioLoop和loop_间的线程切换都发生在连接建立和断开的时刻
,不影响正常业务的性能。
也就是说多线程TcpServer自己的EvenLoop只用来接受新连接,而新连接会使用其他的EvenLoop来执行IO。
单线程TcpServer的EvenLoop是与TcpConnection共享的。muduo的evenloop pool由EventLoopThreadPool实现。