去年写过一篇tbnet的分析文章,主要介绍tbnet框架结构及基本使用方法,最近又重读了下源码,有一些新的收获。
连接是核心
tbnet框架围绕网络连接(对应Connection类)展开,Connection主要包含如下成员。
class Connection {
bool _isServer; // 区分自己是server还是client
IPacketHandler *_defaultPacketHandler; // 客户端收到包的回调
IServerAdapter *_serverAdapter; // server收到包时的回调
Socket *_socket; // 底层socke的封装
IPacketStreamer *_streamer; // 对包网络数据进行编解码的接口
PacketQueue _outputQueue; // 输出队列
PacketQueue _inputQueue; // 输入队列
ChannelPool _channelPool; // ChannelPool,后面介绍
};
Connection是客户端与服务器之间建立的网络连接的抽象,tbnet默认使用长连接,如果不主动关闭会一直保持,直到超过一定阈值时间(默认15分钟)连接上没有传输过数据,会将连接关闭掉,这么做主要是为了防止系统资源被大量的无用连接耗尽。
区分server与client
大部分的应用场景,server和client在处理连接的时候行为都不大相同,tbnet通过给Connection设置标记来区分server和client,并允许server和client设置不同的回调接口(接收到网络包后如何处理),分别为_serverAdapter和_defaultPacketHandler,IServerAdapter和IPacketHandler都是抽象接口,基于tbnet进行开发时,开发者需要自己实现这两个回调接口。
class IServerAdapter {
virtual ~IServerAdapter();
virtual IPacketHandler::HPRetCode handlePacket(Connection *connection, Packet *packet) = 0;
// 这个用于扩展server批量处理网络请求
virtual bool handleBatchPacket(Connection *connection, PacketQueue &packetQueue) {
return false;
}
};
class IPacketHandler {
virtual ~IPacketHandler() {}
virtual HPRetCode handlePacket(Packet *packet, void *args) = 0;
};
网络包
tbnet将网络上传输的数据抽象为网络包,对用Packet类,每个packet都拥有一个头部,头部是定长的,包含标识网络包的id,网络包的长度等信息,每个网络包都有一个encode和decode的接口,分别用于将网络包转化为二进制数据、以及将二进制数据转化为网络包。上层如果需要定义新的网络包,只需要继承Packet并实现编解码接口即可。
class PacketHeader {
uint32_t _chid; // channel id
int _pcode; // 网络包id
int _dataLen; // 网络包长度
};
class Packet {
virtual ~Packet();
virtual bool encode(DataBuffer *output) = 0;
virtual bool decode(DataBuffer *input, PacketHeader *header) = 0;
PacketHeader _packetHeader; // 包头
int64_t _expireTime; // 超时时间
Channel *_channel; // packet对应的Channel
};
class Channel {
uint32_t _id; // channel id
void *_args; // extra args
IPacketHandler *_handler; // 回调函数
int64_t _expireTime; // channel超时时间
};
上面已经多次提到Channel,channel在tbnet里的作用适用于”关联”其请求包和反馈包的,比如client发送一个请求A给server,server恢复一个B包给client,client如何得知B包是对A包的回复呢。tbnet的实现方式是在网络包发送时为其分配一个Channel,每个Channel有一个id唯一标识,这个id也将作为网络包头部的一部分被发送到server端,server在回包时,将接受到的请求包里的channel id设置为反馈包的channel id,client在接收到反馈包时,从channel id就知道这个包是对应哪个请求包的。
Chnanel里除了包含id外,还有hander和args成员,前者是发包是为这个包设置的回调函数(接收到针对这个包的反馈包时如何处理),后者是发包时指定的额外参数指针,在接收到反馈包时这个指针会被传递给IPacketHandler::handlePacket。
输入输出队列
每个连接对应一个网络包输入队列和一个输出队列_inputQueue,_outputQueue,tbnet抽象出的网络包处理逻辑如下:
- 在连接上发包,直接将包添加到连接的输出队列,后续工作交给eventLoop处理(后面介绍)
- 在连接上收到包,如果是server,调用IServerAdapter::handlePacket;如果是client,调用IPacketHandler::handlePacket处理网络包。如果server设置了batch模式,则收到的网络包会直接加入到连接的输入队列,当输入队列里包数超过某阈值时,调用IServerAdapter::handleBatchPacket批量处理输入队列里的网络包。
发送网络包由Connection::postPacket接口完成。
bool Connection::postPacket(Packet *packet, IPacketHandler *packetHandler, void *args, bool noblocking) {
// 如果是client, 并且有queue长度的限制
_outputCond.lock();
_queueTotalSize = _outputQueue.size() + _channelPool.getUseListCount() + _myQueue.size();
if (!_isServer && _queueLimit > 0 && noblocking && _queueTotalSize >= _queueLimit) {
_outputCond.unlock();
return false;
}
_outputCond.unlock();
Channel *channel = NULL;
packet->setExpireTime(_queueTimeout); // 设置超时
if (_streamer->existPacketHeader()) { // 存在包头
uint32_t chid = packet->getChannelId(); // 从packet中取
if (_isServer) {
assert(chid != 0); // server回包不能为空
} else {
channel = _channelPool.allocChannel(); // 客户端每次分配新的Channel
channel->setHandler(packetHandler); // 设置收到反馈包时
channel->setArgs(args); // 将发包时的参数存在Channel里
packet->setChannel(channel); // 设置回去
}
}
_outputCond.lock();
// 写入到outputqueue中
_outputQueue.push(packet);
if (_iocomponent != NULL && _outputQueue.size() == 1U) {
_iocomponent->enableWrite(true);
}
_outputCond.unlock();
return true;
}
接受网络包由Connection::handlePacket接口完成。
bool Connection::handlePacket(DataBuffer *input, PacketHeader *header) {
Packet *packet;
IPacketHandler::HPRetCode rc;
void *args = NULL;
Channel *channel = NULL;
IPacketHandler *packetHandler = NULL;
if (_streamer->existPacketHeader() && !_isServer) { // 存在包头
uint32_t chid = header->_chid; // 从header中取
chid = (chid & 0xFFFFFFF);
channel = _channelPool.offerChannel(chid);
// channel没找到,说明请求已经超时了,Channel已经被移除
if (channel == NULL) {
input->drainData(header->_dataLen);
TBSYS_LOG(WARN, "没找到channel, id: %u, %s", chid, tbsys::CNetUtil::addrToString(getServerId()).c_str());
return false;
}
packetHandler = channel->getHandler();
args = channel->getArgs();
}
// 将接收到的数据解码为Packet
packet = _streamer->decode(input, header); // 实际的编解码工作由DefaultPacketStreamer完成
if (packet == NULL) {
packet = &ControlPacket::BadPacket;
} else {
packet->setPacketHeader(header);
// server端批量处理模式, 直接放入queue, 返回
if (_isServer && _serverAdapter->_batchPushPacket) {
if (_iocomponent) _iocomponent->addRef();
_inputQueue.push(packet);
if (_inputQueue.size() >= 15) { // 大于15个packet就调用一次
_serverAdapter->handleBatchPacket(this, _inputQueue);
_inputQueue.clear();
}
return true;
}
}
// 回调处理接收到的网络包
if (_isServer) {
rc = _serverAdapter->handlePacket(this, packet); // server端的回调
} else {
rc = packetHandler->handlePacket(packet, args); // client的回调
}
return true;
}
两个核心线程
上面介绍了tbnet在网络包层面上的处理逻辑,发送一个网络包,最终要将该网络包通过OS底层接口发送到tcp队列,这里就开始涉及tbnet的事件模型,tbnet默认使用epoll来处理读写事件。
eventLoop线程不断获取ready的读写事件进行处理,timeout线程将超时的请求从请求队列里移除掉(发了请求,但超过指定时间没有收到回复),timeout线程的处理逻辑比较简单,这里主要介绍eventLoop的具体实现。
void Transport::eventLoop(SocketEvent *socketEvent) {
IOEvent events[MAX_SOCKET_EVENTS];
while (!_stop) {
// 检查是否有事件发生
int cnt = socketEvent->getEvents(1000, events, MAX_SOCKET_EVENTS);
// 循环处理每个事件
for (int i = 0; i < cnt; i++) {
IOComponent *ioc = events[i]._ioc;
ioc->addRef();
// 读写
bool rc = true;
if (events[i]._readOccurred) {
rc = ioc->handleReadEvent(); // 可读,处理读事件
}
if (rc && events[i]._writeOccurred) {
rc = ioc->handleWriteEvent(); // 可写,处理写事件
}
ioc->subRef();
if (!rc) {
removeComponent(ioc);
}
}
}
}
上面代码里的IOComponent是对Socket、Event和Connection的封装,eventLoop的工作就是不断的通过SocketEvent(封装epoll实现)来获取可读或者可写的事件,分别调用对应IOComponent的handleReadEvent和handleWriteEvent接口。
处理读写事件时,监听文件描述符和普通连接的文件描述符的处理是不同的,他们的行为分别在TcpAcceptor和TcpComponent两个类里实现。
先来看看针对监听描述符上的读写是如何处理的,实际上监听文件描述符上只会有读事件发生。
bool TCPAcceptor::handleReadEvent() {
Socket *socket;
while ((socket = ((ServerSocket*)_socket)->accept()) != NULL) {
TCPComponent *component = new TCPComponent(_owner, socket, _streamer, _serverAdapter);
if (!component->init(true)) {
delete component;
return true;
}
// 加入到iocomponents中,及注册可读到socketevent中
_owner->addComponent(component, true, false);
return true;
}
再看TcpComponent是如何处理读写事件的,最终TcpConnection的readData和writeData被调用。
bool TCPComponent::handleWriteEvent() {
_lastUseTime = tbsys::CTimeUtil::getTime();
bool rc = true;
if (_state == TBNET_CONNECTED) {
rc = _connection->writeData(); // 调用TcpConnection的writeData()
}
return rc;
}
bool TCPComponent::handleReadEvent() {
_lastUseTime = tbsys::CTimeUtil::getTime();
bool rc = false;
if (_state == TBNET_CONNECTED) {
rc = _connection->readData(); // 调用TcpConnection的readData()
}
return rc;
}
TcpConnection::writeData的处理比较简单,直接将输出队列里的网络包,逐个编码为二进制数据,然后调用write接口写到tcp层。
bool TCPConnection::writeData() {
// 将输出队列里的包移到临时队列
_outputCond.lock();
_outputQueue.moveTo(&_myQueue);
if (_myQueue.size() == 0 && _output.getDataLen() == 0) { // 返回
_iocomponent->enableWrite(false);
_outputCond.unlock();
return true;
}
_outputCond.unlock();
Packet *packet;
int ret;
int writeCnt = 0;
int myQueueSize = _myQueue.size();
do {
while (_output.getDataLen() < READ_WRITE_SIZE) {
if (myQueueSize == 0)
break;
packet = _myQueue.pop(); // 从队列取出网络包
myQueueSize --;
_streamer->encode(packet, &_output); // 将网络包编码为二进制数据
_channelPool.setExpireTime(packet->getChannel(), packet->getExpireTime());
packet->free();
TBNET_COUNT_PACKET_WRITE(1);
}
if (_output.getDataLen() == 0) {
break;
}
// write data
ret = _socket->write(_output.getData(), _output.getDataLen()); // 实际发送数据
if (ret > 0) {
_output.drainData(ret);
}
writeCnt ++;
} while (ret > 0 && _output.getDataLen() == 0 && myQueueSize>0 && writeCnt < 10);
return true;
}
TcpConnection::readData的处理稍微复杂一些,主要是要考虑到网络包边界问题,比如上层在实现一个网络包的编解码协议时,编码的时候错误的多编码了几个字节,而在解码时,并没有将多余的字节从接受buffer里移除,如果不做任何处理,这多余的几个字节就被当成下一个网络包的包头部分处理,这样就会导致接下来所有的网络包都会解析错误。tbnet为了解决这个问题,每个数据包先添加一个魔数,代表一个网络包的开始,然后再是包头和实际的数据。在解析数据包时,只有遇到魔数才会认为是一个读到网络包头。(如果很不幸,多编码的数据刚好有跟魔数相等的,网络包的解析还是会乱掉)
bool TCPConnection::readData() {
_input.ensureFree(READ_WRITE_SIZE);
int ret = _socket->read(_input.getFree(), _input.getFreeLen());
int readCnt = 0;
int freeLen = 0;
bool broken = false;
while (ret > 0) {
_input.pourData(ret);
freeLen = _input.getFreeLen();
while (1) {
if (!_gotHeader) {
_gotHeader = _streamer->getPacketInfo(&_input, &_packetHeader, &broken);
if (broken) break;
}
// 如果有足够的数据, decode, 并且调用handlepacket
if (_gotHeader && _input.getDataLen() >= _packetHeader._dataLen) {
handlePacket(&_input, &_packetHeader); // 调用Connection::handlePacket
_gotHeader = false;
_packetHeader._dataLen = 0;
TBNET_COUNT_PACKET_READ(1);
} else {
break;
}
}
if (broken || freeLen > 0 || readCnt >= 10) {
break;
}
if (_packetHeader._dataLen - _input.getDataLen() > READ_WRITE_SIZE) {
_input.ensureFree(_packetHeader._dataLen - _input.getDataLen());
} else {
_input.ensureFree(READ_WRITE_SIZE);
}
ret = _socket->read(_input.getFree(), _input.getFreeLen());
readCnt++;
}
return !broken;
}
连接管理
客户端要发送数据时先通过Transport::connect建立到server的连接,然后就可以调用postPacekt发包了,为了达到长连接复用的目的,tbnet封装了ConnectionMananger,每次客户端建立都某server的连接,就将serverid与Connection加入到一个map中,下次如果需要像同一个server发包,直接根据serverid从map中取出Connection使用。
ref:
2013-04-19 14:14
blog.yunnotes.net