连接建立起来之后,我们就可以通过UDT Socket进行数据的收发了。先来看用来发送数据的几个函数。UDT提供了如下的几个函数用于不同目的下的数据发送:
|
|
send()用来进行流式的数据发送;sendmsg()用来进行数据报式的数据发送;sendfile()与sendfile2()用来执行文件的发送,流式发送,这两者基本一样,仅有的差异在于,前者接收文件的流来发送,而后者则接收文件的路径。
UDT sendmsg()
这里先来看UDT::sendmsg():
|
|
这个API的实现结构与之前看到的listen()、bind()这些略微有点区别,在CUDT的API层,会调用CUDT 对应的实现函数,调用过程:UDT::sendmsg() -> CUDT::sendmsg(UDTSOCKET u, const char buf, int len, int ttl, bool inorder) -> CUDT::sendmsg(const char data, int len, int msttl, bool inorder)。直接来看CUDT::sendmsg(const char* data, int len, int msttl, bool inorder)(src/core.cpp):
|
|
CUDT::sendmsg()主要做了如下这样的一些事情:
检查UDT Socket的类型SockType,若为UDT_STREAM则直接抛异常退出,否则继续执行。
检查CUDT的状态,若UDT Socket不处于Connected状态,就抛异常退出,否则继续执行。
检查传入的参数,主要是数据的长度,既不能太大也不能太小。数据长度太小是指,小于等于0;太大是指,超出了CUDT发送缓冲区的最大大小。若参数无效,就返回,否则继续执行。在默认不通过UDT::setsockopt()修改m_iSndBufSize和m_iMSS这些选项的情况下,m_iSndBufSize为8192,m_iPayloadSize为1456,也就是大概12MB。
当CUDT发送缓冲区的已用大小为0时,会将m_ullLastRspTime更新为当前时间。
检查CUDT发送缓冲区中可用大小,若可用大小不足消息的长度时,执行waitBlockingSending()等待有足够大小可用。在waitBlockingSending()中可以看到,它主要处理了这样3中情况:
(1). UDT Socket不是处于同步发送模式。抛异常,结束发送的整个流程。
(2). 发送的超时时间m_iSndTimeOut为一个小于0的无效值,则永久等待,直到UDT Socket被关掉。
(3). 发送的超时时间m_iSndTimeOut为一个大于等于0的有效值,则等待m_iSndTimeOut个ms或UDT Socket被关闭。在CUDT的构造函数中,iSndTimeOut默认是被设置为-1的,但可以通过UDT::setsockopt()进行设置。
对于后两种情况里,若等待过程是由于CUDT状态变得无效而终止,则还将抛出异常以结束发送过程。
- 检查CUDT发送缓冲区中可用大小,若可用大小不足消息的长度时,则说明waitBlockingSending()可能是因如下的几种情况中的一种出现而结束:
(1). CUDT处于非同步发送模式而直接结束。
(2). CUDT处于同步模式,m_iSndTimeOut为一个有效值,但超市时间到来时仍然没有等到发送缓冲区中有足够的空间。
对于第一种情况的处理是直接返回0。对于第二种情况的处理则是抛出异常。waitBlockingSending()等待过程终结由于UDT Socket而造成的情况,waitBlockingSending()自己会抛出异常的,而不会走到这一步。
当发送缓冲区的已用大小为0时,更新m_llSndDurationCounter为当前时间。
执行m_pSndBuffer->addBuffer(data, len, msttl, inorder)将要发送的数据放入发送缓冲区。
将这个socket加入发送队列SndQueue的发送列表m_pSndUList中。
返回数据长度,也即发送的数据长度。
这也是一个生产与消费的故事。在这里发起发送的线程为生产者,真正将数据发送到网络上的的发送队列RcvQueue的worker线程则为消费者。在UDT::sendmsg()的执行过程中,我们同样只能看到这个故事的一半,生产的那一半。后面会再来分析故事的另一半。
UDT send()
然后来看UDT::send()(src/api.cpp):
|
|
调用用过程:UDT::send() -> CUDT::send(UDTSOCKET u, const char buf, int len, int) -> CUDT::send(const char data, int len)。直接来看CUDT::send(const char* data, int len)(src/core.cpp):
|
|
这个函数与CUDT::sendmsg(const char* data, int len, int msttl, bool inorder)的执行过程极为相似,但还是有如下这样的一些区别:
这个函数的执行要求UDT Socket的类型必须为UDT_STREAM,而不是UDT_DGRAM,这一点与CUDT::sendmsg()正好相反。
要发送的数据的长度,只要大于0就可以了。而在CUDT::sendmsg()中会限制要发送的数据的大小不能超过发送缓冲区的最大大小。
在CUDT::sendmsg()中对数据发送的规则执行的是,要么全部发送,要么一点也不发送。而在这里则是,只要发送缓冲区还没有满,则会将数据尽可能多的加进发送缓冲区以待发送。
其它则完全一样。
发送缓冲区CSndBuffer
来看一下CUDT中用来管理待发送数据的发送缓冲区CSndBuffer,先来看它的定义:
|
|
在这个结构中,似乎在用几个单链表来管理发送缓冲区的数据存储区,struct Buffer的链表,和struct Block的链表,但这些结构究竟如何组织,每个链表的意义是什么,还是要看下几个成员函数的定义。首先是构造函数:
|
|
在这个构造函数中会做如下这样一些事情:
- 在成员初始化列表中初始化所有的成员变量。注意,m_iNextMsgNo被初始化为了1,表示已用blocks数量的m_iCount被初始化为了0。
初始化物理buffer,即分配一个Buffer结构m_pBuffer,为这个Buffer结构分配一块大小为m_iSize * m_iMSS的内存,初始化Buffer结构的m_iSize为m_iSize。m_iSize和m_iMSS的值来自于传入的两个参数size和mss。在connect成功(无论是主动发起连接的UDT Socket,还是由Listening的UDT Socket创建的都一样),创建CSndBuffer时,size值为32,而mss值为m_iPayloadSize。在前面 UDT协议实现分析——连接的建立 一文中我们有仔细地分析过m_iPayloadSize这个值的计算过程。
创建一个Block结构的循环链表,m_pBlock指向链表头。链表的长度同样为m_iSize,也就是32。
初始化前一步创建的链表。使每个Block结构的m_pcData指向m_pBuffer->m_pcData的不同位置,相邻的两个Block结构,所指位置的距离为m_iMSS。
由此不难猜测,Buffer用于实际保存要发送的数据。而Block结构则用于将Buffer的数据缓冲区分段管理。
将m_pFirstBlock、m_pCurrBlock和m_pLastBlock的值都初始化为m_pBlock的值。
初始化用于同步缓冲区操作的m_BufLock。
这里我们弄清了struct Buffer和struct Block,但还是有许多问题还没有弄清楚。m_pBlock,m_pFirstBlock,m_pCurrBlock和m_pLastBlock这几个指针的含义是什么?struct Buffer链表的扩展与收缩,MsgNo的意义等。
接着就来看一下CSndBuffer的其它一些成员函数。来看向CSndBuffer中添加数据的CSndBuffer::addBuffer():
|
|
这个函数的执行过程大体如下:
向CSndBuffer中添加数据总是以整块Block为单位的,对于最后不满整块Block的数据仍然占用整块Block。在这个函数中做的第一件事情就是,计算添加所有的数据需要的Block的个数。
如果CSndBuffer中的可用空间不足,则扩展CSndBuffer空间的大小,直到能满足添加的数据的需求为止。
计算inorder。
通过一个循环将数据复制到CSndBuffer中。
在这段code中,我们不难想到m_pLastBlock指向的是最后一块已用Block之后的那块Block。
关于UDT的Msg,Msg指的是一次UDT::send()或UDT::sendmsg()所发的全部数据。一个Msg中所有的Block共用了相同的一个m_iNextMsgNo。
这里可以看到UDT中Msg的开始与结束的表示方法:Block的m_iMsgNo的最高两位被用来指示Msg的开始和结束,最高位为1表示Msg的开始,第二高位为1则表示Msg的结束。
更新表示一用Block数的m_iCount。
更新m_iNextMsgNo,达到最大值时会重新回到1。
再来看一个与CSndBuffer::addBuffer()类似的函数CSndBuffer::addBufferFromFile():
|
|
这个函数的执行过程,与CSndBuffer::addBuffer()的执行过程极为相似,仅有的差别在于,在这个函数,是将文件的数据逐步的read进CSndBuffer,而在addBuffer()中则是memcpy。
在CSndBuffer::addBuffer()与CSndBuffer::addBufferFromFile()中有这么多的重复code,实在是很有进一步进行抽象的空间。在循环中拷贝数据时,每次计算剩余了多少字节,然后和m_iMSS进行比较以确定到底要复制多少数据。这种计算和比较在大多数情况下都比较多余,只有在循环的最后一次执行时才真正需要这样的操作。可以通过特殊处理最后一块数据的复制,并将循环的退出条件值改为(size -1)来提升性能。
我们前面提到了CSndBuffer容量的扩展,那这里就来看一下执行单次扩展的increase():
|
|
可以看下执行单次容量扩充的含义及过程:
获取CSndBuffer中已有的Block的总数量unitsize。
创建一个Buffer结构,并为它分配数据缓冲区,数据缓冲区的大小为unitsize个Block。由此可见,每一次CSndBuffer的扩展,都会使它的容量加倍。
将前一步创建的Buffer插入到已有的Buffer单链表的尾部。
为前面创建的Buffer,创建对应的Block结构链表,nblk指向这个单链表的头节点,而pb指向这个单链表的尾节点。
如我们前面提到的CSndBuffer中的Block结构是一个循环单向链表,这一步即是将前一步创建的Block单向链表插入CSndBuffer的Block循环链表中。
设置前面创建的所有Block,使它们指向前面创建的Buffer的数据部分的适当位置。
更新表示CSndBuffer中总的Block大小的m_iSize。
如我们前面了解到的,发送数据也是一个生产与消费的故事,UDT::send()和UDT::sendmsg()讲的是生产的故事,至此我们基本上将生产的故事都理清了,接着就来看下消费的故事,也就是CSndQueue中的实际数据发送。
发送队列CSndQueue中数据的实际发送
接着来讲数据发送这个生产-消费故事的另一半,也就是消费的那一半,发送队列CSndQueue中实际的数据发送。先来瞅一下CSndQueue的定义(src/queue.h):
|
|
这个class,不管是成员变量,还是成员函数,看上去基本都还比较亲切,其作用不会让人完全无感,但唯独一个成员变量,也就是CSndUList的m_pSndUList。因而这里就先来看一下这个类。先来看CSndUList的定义:
|
|
从这个类的定义中,我们大概能感觉到这是一个CSNode的容器,但容器的许多组织的细节则仍然是一头雾水,那就从它的构造函数和成员函数中来厘清这些细节。来看它的构造函数(src/queue.cpp):
|
|
在这个函数中做的事情就是分配了一个CSNode指针的数组,并初始化了一个mutex m_ListLock,但这似乎也无法透漏出太多的讯息。然后来看向其中插入元素的insert():
|
|
m_iLastEntry是数组中已经被占用的最后一个位置。在这个函数中,做了两件事:
检查m_iLastEntry是否等于m_iArrayLength - 1,若是,则表明数组中所有的位置都被占用了此时则需要扩充容量,这里的做法就是创建一个长度为之前数组长度2倍的新数组,将之前数组中的内容拷贝到新数组里,更新m_iArrayLength,删除之前的数组,更新m_pHeap只想新数组。
执行CSndUList::insert_()进行实际的插入动作。
再来看CSndUList::insert_():
|
|
在这个函数中主要做了这样一些事:
检查要插入的元素是否已经插入了,主要是根据CUDT的CSNode n的HeapLoc字段是否大于0来判断的。若已经插入,则直接返回。
将CSNode放在数组的尾部。
调整CSNode n在数组中的位置,以使它处于适当的位置。要厘清这个地方的调整过程,可能要复习一下我们曾经学习过的堆数据结构了。可以将堆理解为一个用数组表示的二叉树,如下图所示:
如上图,每个框框中的数字表示该节点在数组中的位置。可以看到一个节点的位置与它的两个子节点及父节点的位置之间的关系:
假设一个节点的位置,也就是该节点在数组中的index为n,则它的父节点的位置为((n -1)/2),而它的两个子几点的位置分别为(2×N+1)和(2×n+2)。
回到CSndUList::insert_()的节点CSNode n的位置调整过程。可以看到,这个过程主要是根据CSNode n的m_llTimeStamp值,若CSNode n的m_llTimeStamp值比它的父节点的m_llTimeStamp小的话就把CSNode n往二叉树的上层浮,而把它的父节点向二叉树的下层沉,依次类推,直到找到某个位置,其父节点的m_llTimeStamp值比它的m_llTimeStamp值小,或者浮到二叉树的最顶层。
由此可见m_pHeap是一个根据CSNode的m_llTimeStamp值建的堆,越往上层,该值越小。而m_pHeap[0]则是整个堆中所有CSNode元素m_llTimeStamp值最小的那个。
更新CSNode n的m_iHeapLoc指向它在数组m_pHeap中的索引。
CSNode n被浮到堆的顶部时,唤醒发送队列的worker线程。
插入的如果是堆中的第一个元素的话,唤醒等待在m_pWindowCond上的线程。
看完了插入元素,再来看移除元素也就是CSndUList::remove():
|
|
CSndUList::remove()是直接调用了CSndUList::remove(),而在CSndUList::remove()主要做了如下这样一些事情:
先检查要移除的CSNode n是否存在与堆中,主要根据CSNode n的HeapLoc字段是否大于0来判断的。若没有插入,则跳到第5步,否则继续执行。
将m_pHeap中的最末尾的元素放进原本由要移除的CSNode n所占用的位置,更新m_iLastEntry,及被改变了位置的原来的最末尾元素CSNode的m_iHeapLoc指向它当前被放置的位置。
如果被调整了位置的CSNode的新位置不是最末尾的位置,则调整该节点的位置。这里主要是在这个节点的m_llTimeStamp值比它的子节点的m_llTimeStamp值更大时,将这个节点向二叉树层次结构的下层沉,而将它的字节点向上浮的过程。一个节点总是有两个字节点,也就是两棵子树,那它又会向哪一棵那边沉呢?可以看到是字节点中m_llTimeStamp值更小的那一边。
总觉得这里应该再有一个节点上浮的过程。如果移除的节点是最末尾节点的直系父节点,这当然没有问题,但如果不是,则关系还是有许多不确定的地方。
更新CSNode n的m_iHeapLoc指向-1。
如果m_iLastEntry为0,即表示堆中不再有元素了,则还会执行唤醒。
回头看我们前面分析的CUDT::send()和CUDT::sendmsg(),它们都通过m_pSndQueue->m_pSndUList->update(this, false)将CUDT插入CSndUList m_pSndUList,这里再来看一下CSndUList::update():
|
|
reschedule参数表示的是,如果之前的已经有发送任务,且还没有执行完,则将新的发送任务放在老的之后。
可以看到在这个函数中,如果CSNode还没有被插入堆中,则尽可能将CSNode放如堆的顶部以便于发送任务尽快执行。若已经插入,且reschedule为false,则直接返回。若已经插入,且reschedule为true,则检查一下CSNode当前是否已经在堆的顶部了,若是就退出。若不是,则先将CSNode从堆中移除,然后再尽可能将CSNode插入堆的顶部。
对发送队列CSndQueue所用的数据结构做这么多分析之后,我们再来看它的worker线程,也就是CSndQueue::worker():
|
|
CSndQueue::worker()中的while循环几乎就是CSndQueue的worker线程执行的全部任务了,这个循环的循环体中主要做了如下这样的一些事情:
- 调用self->m_pSndUList->getNextProcTime(),来查看最近的一次发送任务所需要执行的时间ts。这里来看一下CSndUList::getNextProcTime()的定义:
|
|
若返回值小于等于0,就表示当前没有需要发送的数据,没有需要执行的发送任务。则进入等待状态。在CSndQueue::init()的定义中,CSndUList m_pSndUList的m_pWindowLock和m_pWindowCond会分别指向CSndQueue的m_WindowLock和mWindowCond,因而可见,这个地方的等待,是被CSndUList::insert()唤醒的。
若返回值大于0,则表明存在着需要执行的发送任务。此时则执行下一步。
获取当前的时间currtime。比较currtime与ts,若前者较小,则表明最近需要执行的发送任务它请求的执行时间还没到,则休眠等待直到ts的到来。如我们前面的分析,并根据CSndQueue::init()的定义可见,向m_pSndUList中插入、移除或更新元素,都可能会唤醒这里的等待。若前者较大,或者等待的时刻到了则执行下一步。
执行self->m_pSndUList->pop(addr, pkt),从CSndUList m_pSndUList中抓一个CPacket出来。若没抓到,则进入下一次循环,否则发送抓到的CPacket。来看CSndUList::pop():
int CSndUList::pop(sockaddr*& addr, CPacket& pkt) {
CGuard listguard(m_ListLock);
if (-1 == m_iLastEntry)
return -1;
// no pop until the next schedulled time
uint64_t ts;
CTimer::rdtsc(ts);
if (ts < m_pHeap[0]->m_llTimeStamp)
return -1;
CUDT* u = m_pHeap[0]->m_pUDT;
remove_(u);
if (!u->m_bConnected || u->m_bBroken)
return -1;
// pack a packet from the socket
if (u->packData(pkt, ts) <= 0)
return -1;
addr = u->m_pPeerAddr;
// insert a new entry, ts is the next processing time
if (ts > 0)
insert_(ts, u);
return 1;
}
可以看到CSndUList::pop()做了这样一些事情:
检查m_iLastEntry是否为-1,若为-1,表明没有要执行发送的任务,因而直接返回-1。否则继续执行。
检查当前时间是否小于堆顶元素的m_llTimeStamp,若小于则表明,最近一次发送任务的发送时间还没到,则返回-1,否则继续执行。
获取堆顶元素的CUDT对象u,也就是发送任务的请求者,并将堆顶元素先从对中移除。
检查u的状态,若不处于有效的连接状态,则返回-1,否则继续执行。
执行u->packData(pkt, ts)打出一个数据包来。
使传进来的addr只想CUDT u的PeerAddr,CSndQueue会将这个地址作为包的发送目的地址。
将CUDT u重新插入堆中,时间戳为当前时间,这也就意味着,如果可以的话,就在下一个循环中继续发送CUDT u的数据。
返回1。
在发送队列CSndQueue的worker线程中,最最需要的是尽可能快的获得最近需要执行的发送任务的一些信息。CSndUList::getNextProcTime()和CSndUList::pop()中获取的最主要的信息也是堆顶元素的一些信息。一个严格依照请求的执行时间进行排列的列表意义并不是很大,为了保证这种有序性反倒可能需要消耗不少的时间,采用堆却可以想CSndQueue的worker线程尽可能快的返回最近将要执行的发送任务请求执行的时间。
Done。