在 前文 中,我们有看到,数据发送的过程,大体是发送者CUDT将要发送的数据放进它的CSndBuffer m_pSndBuffer,并将它自己添加进它的CSndQueue m_pSndQueue的CSndUList m_pSndUList的堆里,后面CSndQueue m_pSndQueue的worker线程会通过CSndUList::pop()从CSndUList m_pSndUList的堆顶CUDT中获取一个要发送的包来发送,包的获取主要是通过CUDT::packData()来完成,而这个函数正是UDT中包发送的执行中心。
CUDT::packData()
这里就来看一下CUDT::packData()的定义(src/core.cpp):
|
|
在这个函数中,处理了两大类packet的读取,一是丢失的packet,二是正常的顺序传输的包。来看一下这个函数具体的执行过程:
- 读取当前的时间entertime。
- 更新m_ullTimeDiff。在UDT中,包发送会有一个随着网络状况调整的一个发送周期,也就是m_ullInterval值。在每一次发送包时,都会根据m_ullInterval值计算下一次包发送的理想时间,并记录在m_ullTargetTime中。而m_ullTimeDiff则被用来记录当前的这次包发送想对于理想的发送时间的延滞值,这个值会被用于计算下一次包发送的理想时间。UDT正是通过这样的修正来尽可能使的包发送周期能够保持在m_ullInterval值附近。
- 从丢失包列表m_pSndLossList中获取一个丢失的包的SeqNo,并赋值给packet.m_iSeqNo。这个丢失包列表中的包可能是来源于Timer,比如一个包超过了正常时间还没有得到响应,也有可能来源于发送端发回的NACK消息,后面会在来研究这个问题。
- 前一步中获取的SeqNo大于等于0,这表明存在丢失了需要重传的包,则读取丢失的包的内容:
(1). 计算丢失的包的SeqNo与SndLastDataAck的差值offset。
(2). 检查前一步计算出来的offset值,若小于0,表明发送窗口已经滑过了,则直接返回,否则继续执行。
(3). 根据前面计算的offset值,通过m_pSndBuffer->readData()把数据读入packet中。packet的m_iMsgNo会被更新为packet的MsgNo,msglen也会在packet过期时被更新。
(4). m_pSndBuffer->readData()返回0,表明读取的packet的数据长度为0。这样的packet没有实际发送的必要,直接返回,无需进行后续的步骤。
(5). m_pSndBuffer->readData()返回-1,表明要读取的packet已经过期,同样没有发送这个数据包本身的必要。
但此时会发送一个DropMsgRequest给数据接收端。
然后将过期的packet从SndLossList中移除出去。
m_iSndCurrSeqNo的值为最近一次发送的packet的SeqNo,这里还会在必要的时候更新m_iSndCurrSeqNo,以跳过所有被丢弃的packets。必要指的是,m_iSndCurrSeqNo的值小于等于要丢弃的这个Msg的最后一个packet的SeqNo。这也就意味着,要丢弃的这个Msg直到过期被丢弃都没有发完。
这个地方有一点比较奇怪,简化来看,seqpair[0] == packet.m_iSeqNo,seqpair[1] == seqpair[0] + msglen,也就是说seqpair[1]的值为要被丢弃Msg的最后一个packet的SeqNo加1,但在判断是否要更新m_iSndCurrSeqNo时,却是拿m_iSndCurrSeqNo和(seqpair[1] + 1),也就是要被丢弃的Msg的最后一个packet的SeqNo加2在比较,而更新也是被设置为这个值。但实际上,将m_iSndCurrSeqNo设置为被丢弃Msg的最后一个packet的SeqNo已经可以跳过整个Msg的发送了,因为下次要用m_iSndCurrSeqNo来获得SeqNo,会先将这个值加1的。这个地方的逻辑疑似存在bug。
返回0,向调用者表明暂时没有数据要发送。
(6). m_pSndBuffer->readData()返回大于0的值,表明有一个丢失的包需要重新发送,则更新m_iTraceRetrans和m_iRetransTotal,这两个值分别表示一次trace重发的总次数,和此UDT Socket总的重发次数,两者的区别在于前者在被读取之后会被重置为0(CUDT::sample()),而后者则不会。此时需要继续执行后面的第6步。 - 在第3步中读取的SeqNo小于0,表明没有丢失的packet。此时则:
(1). 根据m_iFlowWindowSize和m_dCongestionWindow的值计算cwnd发送窗口的大小。发送窗口大小取这两个值中较小的那个,默认情况下,前者为8192(来自于Handshke消息的m_iFlightFlagSize字段,而m_iFlightFlagSize则根据m_iRcvBufSize和m_iFlightFlagSize得出),后者为16(来自于CC的m_dCWndSize字段,在CUDTCC::init()中该值被初始化为16。)。
(2). 检查发送窗口是否已满。若已满,则将m_ullTargetTime和m_ullTimeDiff重置为0,将ts置为0,然后返回0,向调用者表明没有数据要发送。否则继续执行。
m_iSndLastAck的值为下一次Ack应该确认的packet的SeqNo,CSeqNo::seqlen()计算的是包含两个端点在内的区间的长度。此处对CSeqNo::seqlen()的调用被用来计算,下个packet发送之后,发送窗口中所有的packet的个数。
(3). 读取下一个需要发送的packet,并检查返回的payload值。若payload值为0,表明数据缓冲区中所有的数据都已经发送了,无需再进行实际的发送,则将m_ullTargetTime和m_ullTimeDiff重置为0,将ts置为0,返回0,向调用者表明没有数据要发送。否则继续执行。
(4). 主要是更新m_iSndCurrSeqNo,并设置packet的SeqNo字段。如果SeqNo为16的整数倍,还会设置probe为true。 - 设置packet的m_iTimeStamp,m_iID,及数据长度。
- 更新m_llTraceSent和m_llSentTotal,其中前者表示CUDT这次Trace的过程中发送的总的packet数量,这个值会在CUDT::sample()获取trace数据之后被重置为0,而后者则表示发送的总共的packet数量,不会在CUDT::sample()获取trace数据之后被重置。
- 根据probe的值,更新ts值等。配 合CUDT::packData()的调用者CSndUList::pop()一起看,可知ts是理想中该CUDT下次发送数据的时间点。
probe设置为true,就是表明,当前的这个packet被发送结束之后立即发送下一个packet。即使probe的值不为true,也有可能要立即发送下一个packet,比如延滞时间已经超过了理想的发生周期。存在延滞时间,但该延滞时间又没有超出理想的发送周期的,则下个packet的发送时间具体本次packet的发送时间会小于理想的packet发送周期。
总之这里是希望能够保持packet以接近理想的速率发送。 - 更新m_ullTargetTime为ts。ts这个下次发包的理想时间点还需要m_ullTargetTime进行记录。
- 返回payload值,也就是读取的packet的大小。
这里顺便来看下CSeqNo的设计与实现。这个类被用来帮助进行与SeqNo有关的一些计算(src/common.h):
|
|
连接发起端在执行CUDT::connect(const sockaddr* serv_addr)时,会计算一个随机的值作为m_iISN,也即是发送的首个数据packet的SeqNo,而在连接建立过程中,这个值会被同步给Peer端的Socket。
这里可以看到,UDT Packet的SeqNo是[0, 0x7FFFFFFF]区间中的一个值。每发送一个packet,m_iSndCurrSeqNo都会被递增。通过class CSeqNo可以看到这个递增的规则,即SeqNo超出0x7FFFFFFF时会被归0。也正是由于0是一个合法的SeqNo,在incseq(int32_t seq, int32_t inc)中,SeqNo超出最大值时的计算里能看到有额外的减1,在seqlen()里,SeqNo超出最大值时的计算里能看到加2。
由seqcmp()和seqoff()这两个函数可见,同一时刻同时有效的两个SeqNo seq1和seq2之间的距离不能超过m_iSeqNoTH 0x3FFFFFFF,若超过则表明一定有一个SeqNo越过了最大值0x7FFFFFFF,也即较小的那个值越过了最大值。
这里还可以再来看一下CSndBuffer::readData():
|
|
CSndBuffer::readData(char** data, int32_t& msgno)读取当前的Block。基本上就是读取Block,然后将指向当前Block的指针m_pCurrBlock向后移一个Block。
而CSndBuffer::readData(char** data, const int offset, int32_t& msgno, int& msglen)则是读取距未响应的Block中最旧一块Block offset个单位的Block。在这个函数中,首先是移动到要读取的目标Block,如果要读取的Block已过期,则使m_pCurrBlock跳过该packet所属的Msg的所有Packet,然后返回-1退出。目标Block没有过期,则读取Block后返回数据长度。
总结在CUDT::packData()中对发送过程的控制。
丢失的包具有最高的发送优先级,这也是发送可靠性的保障方法。所有丢失的packet都会被放进SndLossList,这个List中的包可能来源于超时未得到响应,也可能来源于消息接收端发回的NACK。
对于正常的顺序packet发送的控制主要在于两个方面,一是发送窗口的大小,也就是某个时刻已经发送但未得到相应的packet的最大个数,这一点主要由m_dCongestionWindow和m_iFlowWindowSize来表示;二是控制两个包发送的时间间隔,也就是包的发送速率,这一点则主要用m_ullInterval来表示。所有的发送控制机制主要通过影响这几个变量来控制发送过程。
SndLossList
先来看一下CSndLossList这个数据结构。这个Class的定义如下(src/list.h):
|
|
这是一个不可复制容器。提供的接口不是很多,配合注释,都没有太多难以理解的地方。这是一个用数组实现的链表。接着来看这个class的构造和析构(src/list.cpp):
|
|
CUDT::connect()中创建CSndLossList时,size值为m_iFlowWindowSize * 2,也即8192 × 2 == 16384。如果不用数组,而用常规一点的方法来实现的话,链表的节点定义可能是这样的:
|
|
然后来看这个class最关键的函数之一CSndLossList::insert()的定义:
|
|
- 这个函数首先处理了最简单的向空链表中插入元素的case。
这种情况下,m_iHead被赋予0值。m_piData1[m_iHead]会被赋值为要插入的这段丢失packet范围的起始SeqNo。
如果起始SeqNo和结束SeqNo的值不同,m_piData2[m_iHead]还会被赋值为结束SeqNo;如果相同,则m_piData2[m_iHead]将仍然保持构造函数中初始化的-1,以表示这段丢失packet范围只有一个元素。
m_piNext[m_iHead]被赋值为-1以表示这是链表中的最后一个元素。m_iLastInsertPos用来记录上一次插入的位置,这里会被赋值为m_iHead。m_iLength表示CSndLossList中记录的丢失packet的总格数,这里会被设置为这段packet的长度。然后返回m_iLength。
可见m_iHead指向链表的头部。m_piData1,m_piData2和m_piNext这三个数组中相同位置的元素共同表示一个链表节点,它们分别表示一个丢失packet范围的起始SeqNo,结束SeqNo和该节点在链表中next节点的位置。 - 链表中已经有元素了,则将m_iLength保存在origlen中。计算要插入的这段丢失packet的起始SeqNo与链表中原有的头节点的起始SeqNo字段的差值offset。然后计算要插入的这段丢失packet范围的的可能的位置loc,这个可能的位置主要由这段丢失packet范围的起始SeqNo与链表中原有的头节点的起始SeqNo字段的差值决定。
由loc的计算方法可见,数组中的空间是被循环利用的。比如要插入的节点是向CSndLossList中插入的第二个节点,则此时m_iHead仍然为0,而要插入的这个丢失packet范围的起始位置小于原有的头节点的起始SeqNo字段,则新插入的节点将被绕回到数组的尾部。 - 处理offset小于0的情况。这表示插入的这个丢失packet范围的起始位置小于原有的头节点的起始SeqNo字段值,此时则会在loc位置插入一个新的节点以描述这段丢失packet范围。更新m_iHead和m_iLastInsertPos指向新插入的这个节点。并更新m_iLength以体现新加入的这个丢失packet范围。
向单向链表的头部插入元素总是比较简单。由此我们也看到,这个链表是以节点的起始SeqNo字段值的升序排列的有序链表。
但这个地方貌似没有处理新插入的这个丢失packet范围与原有头节点表示的丢失packet范围存在交叉的情况?没错,是没有处理,这种情况会在处理完所有的插入情况之后再统一来做。 - 处理offset大于0的情况。这又分为两种情况:
(1). seqno1 == m_piData1[loc],表明新节点的目标插入位置中原有节点保存的丢失Packet范围的起始SeqNo与要插入的这个丢失Packets范围的起始SeqNo相同。则此时会首先更新m_iLastInsertPos为loc,还需要处理这样的几种case,
case 1:原有的范围中只有一个元素,要插入的这个范围有多个元素。
case 2:原有的范围中有多个元素,要插入的这个范围有一个元素。
case 3:原有的范围和要插入的范围都只有一个元素。
case 4:原有的范围和要插入的范围中都有多个元素,但新插入的范围完全包含原有的范围。
case 5:原有的范围和要插入的范围中都有多个元素,但原有的范围完全包含新插入的范围。
case 6:原有的范围和要插入的范围中都有多个元素,且完全相同。
这些case包含的packet范围的相对关系可以用下图来简单表示:
代码的具体写法不同,这些case中的一些可能会以不同的方式被合并成一个处理,而有些case则不需要对原有的链表进行任何的调整。
if block中处理的是case1和case3,else-if block中处理的是case 4,else block中处理的是case 2,case 5和case 6。其中case 3,case 2,case 5和case 6都不需要对链表做出调整。
以此来看,在第一个if block的内部,应该再加一个else block来直接返回0会比较好一点。
那段代码的一种等价实现形式:
|
|
(2). seqno1与m_piData1[loc]不相等。这其实主要有两种可能,一是loc位置已经有了其它的节点,但该节点所表示的范围的起始SeqNo与要插入的这个范围的起始SeqNo不同;二是loc位置还没有被插入节点。但第一种可能应该是不会出现的,因而这里实际要处理的也就是loc位置还没有节点插入的情况。
对于这种情况,节点的插入位置完全不是问题,关键的问题是调整链表中一些节点的关系。可以看到这里的处理过程:
查找新插入节点前面的那个节点。该查找过程的开始位置由m_piData1[m_iLastInsertPos]与seqno1的相对大小决定,如果前者较小,则从m_iLastInsertPos开始,否则,从m_iHead开始。这大概主要是想要利用空间局部性原理来提高查找的效率。然后就是通过一个循环找到新插入节点前面的那个节点。
找到的这前面的节点所表示的丢失packet范围与要插入的节点所要表示的范围之间的关系又有这样的几种case:
case 1:前面的节点表示的范围只有一个packet。
case 2:前面的节点表示的范围含有多个packet,但它的结束SeqNo仍然小于要插入的范围的起始SeqNo。
case 3:前面的节点表示的范围含有多个packet,但它的结束SeqNo大于等于要插入的范围的起始SeqNo。
前两个case表明两个范围不相交,而case 3则表明两个范围是相交的。对于前两个case,则在前面的那个节点之后插入一个节点,链表中节点的连接关系做适当的调整即可。对于case 3,则需要将插入的这个范围合并入前面的那个节点。如果前面的那个节点包含的范围完全覆盖了要插入的范围,则什麽都不做,如果不是则需要对结束SeqNo字段做一些调整。
- offset值等于0,表明要插入的这个范围的起始SeqNo与Head节点表示的范围的起始SeqNo相同,这个过程则与offset大于0时,seqno1 == m_piData1[loc]中的处理基本一致。
- 从插入新节点的位置开始,合并链表中与新插入的这个丢失packet范围相交,或被包含或紧紧相邻的节点。
- 返回新加入CSndLossList的丢失packet的总个数。
有这个函数的整个执行过程不难看出,头节点中将包含最老的丢失packets。
看完了插入,自然不能不再来看一下CSndLossList::remove():
|
|
先来回忆下CSndLossList的类定义中,对于这个函数的语义的说明:移除所有不大于参数值的SeqNo。这个函数的主要执行过程如下:
- 检查m_iLength是否为0,若为0,说明CSndLossList还没有加入任何SeqNo,则直接返回,否则继续执行。
- 计算头节点所表示的丢失packet范围的起始SeqNo与参数seqno的offset,及可能以seqno作为起始SeqNo字段的节点的位置loc。
根据这个函数的语义,我们知道,其实是不需要处理offset小于的case的。如我们前面所了解的,头节点所表示的SeqNo范围是CSndLossList中SeqNo值最小的一个范围,而如果seqno小于这个范围的起始SeqNo的话,则说明不大于seqno的所有SeqNo都已经不存在了。 - 处理offset == 0的情况。offset == 0,表明seqno是包含于头节点所表示的范围,而且还是这个范围的起始SeqNo。此时又主要分两种情况来处理:
(1). 头节点表示的这个范围只有一个SeqNo。
(2). 头节点表示的范围包含多个SeqNo。
对于情况(1),则头节点将向后滑动一个节点,原来头节点的存放位置会被复位。对于情况(2),为了保证两个节点的相对位置等于节点所表示的丢失packet范围的起始SeqNo的差值这样的一种节点间关系依然成立,需要将头节点保存的位置后移一个位置。对于情况(2),还会再分为两种情况来处理,一是原来的头节点中只包含2个SeqNo,则在移除seqno后只剩下了一个,此时要保持m_piData2[loc]为-1,若包含3个及以上SeqNo的,则要复制原来的头节点的结束SeqNo到新的位置。
其它就是适当地更新m_iLastInsertPos,m_iHead和m_iLength了。这里似乎补一个
|
|
要更好一点。
- 处理offset大于0的情况。对于这种情况,比较麻烦的是找到seqno具体包含在哪个节点中。处理过程大致为:
(1). 检查一下,seqno与m_piData1[loc]是否相等,若相等,则要找的节点已经是找到了,且seqno为目标节点表示的丢失packet范围的起始seqno。此时则会再分为两种情况来处理,一是目标节点中只包含一个SeqNo,则使链表头指向目标节点的下一个节点。
二是目标节点中包含多个SeqNo,则需要将seqno排除在目标节点范围之外新建一个节点,将新节点保存在目标节点后面相邻的位置,若目标节点中包含2个节点,则需要设置新节点的结束SeqNo字段为-1,若大于等于3,则复制此字段的值,然后将原来的目标节点改造为只包含seqno的节点,并使它指向新建的这个节点。总之就是将原来的一个节点拆分成了两个节点,一个节点只包含seqno,另一个则包含原来的目标节点中其余的SeqNo。
(2). seqno与m_piData1[loc]不相等的情况,则需要先找到起始SeqNo字段小于seqno的SeqNo值最大的那个节点。这又可以分为3种case来处理,
case 1:找到的节点只包含一个SeqNo,此时则使链表头指向找到的节点的next节点。
case 2:找到的节点包含多个SeqNo,且seqno小于找到的节点的结束SeqNo字段,此时则需要将找到的节点裂为链表中的两个节点,一个包含的范围为[原节点的起始SeqNo,seqno],另一个包含的范围为[seqno + 1, 原节点的结束SeqNo]。同时链表头应该指向后者。
case 3:找到的节点包含多个SeqNo,且seqno大于等于找到的节点的结束SeqNo字段,此时则同样使链表头指向找到的节点的next节点。 - 移除新找到的头节点之前所有的节点。适当地更新m_iLength等。
这个地方移除的操作看起来好罗嗦。必须要计算出offset值,而不是简单的用比较符号,是由SeqNo的递增规则决定的。但对于offset大于等于0的情况,则可以使用统一的过程来处理:找到起始SeqNo字段不大于seqno的 SeqNo值最大的那个节点,然后根据seqno与这个节点描述的范围的4种关系,即seqno等于找到的节点的起始SeqNo值,seqno大于起始SeqNo值但小于结束SeqNo值,seqno等于结束SeqNo值,seqno大于结束SeqNo值,及找到的节点所描述的范围的大小,来分类处理即可。
还有我们前面在CUDT::packData()中看到的CSndLossList::getLostSeq():
|
|
这个函数读取首个丢失packet的SeqNo。
这个函数会取链表头节点中最小的SeqNo,返回给调用者,然后将这个SeqNo从链表头节点中移除。移除的时候又可以分为2种情况:1. 头节点描述的packet范围只包含一个SeqNo,2. 包含多个SeqNo。
对于情况1,则将链表头节点向后移动一个节点,然后移除原来的头节点。对于情况2,则将原来的头节点分裂为两个节点,一个只包含首个丢失packet的SeqNo,另一个包含原来头节点中其余的SeqNo,令链表头节点指向后一个节点,并移除前一个节点。
可见,通过CSndLossList::getLostSeq()返回给调用者的Packet是会被从CSndLossList中移除出去的。
最后可以再来看一下,经过了这些操作的蹂躏之后,这个链表可能的样子。比如有4段丢失的packet,其SeqNo范围及被插入的顺序为[8, 9],[3, 5],[12,12],[15, 19],假设缓冲区大小为20,则看起来可能为:
UDT中,丢失packet列表大体如此。
Done。