UDT Server在执行UDT::listen()之后,就可以接受其它节点的连接请求了。这里我们研究一下UDT连接建立的过程。
连接的发起
来看连接的发起方。如前面我们看到的那样,UDT Client创建一个Socket,可以将该Socket绑定到某个端口,也可以不绑定,然后就可以调用UDT::connect()将这个Socket连接到UDT Server了。来看UDT::connect()的定义(src/api.cpp):
UDT::connect() API实现的结构跟其它的API没有太大的区别,不再赘述,直接来分析CUDTUnited::connect():
调用CUDTUnited::locate(),查找UDT Socket对应的CUDTSocket结构。若找不到,则抛出异常直接返回;否则,继续执行。
根据UDT Socket的IP版本,检查目标地址的有效性。若无效,则退出,否则继续执行。
检查UDT Socket的状态。确保只有处于INIT或OPENED状态的UDT Socket才可以执行connect()操作。新创建的UDT Socket处于INIT状态,bind之后UDT Socket处于OPENED状态。如果UDT Socket处于INIT状态,且不是Rendezvous模式,还会执行s->m_pUDT->open(),将UDT Socket与多路复用器CMultiplexer,然后将状态置为OPENED。
前面我们在bind的执行过程中有看到将UDT Socket与多路复用器CMultiplexer关联的过程CUDTUnited::updateMux()。但这里执行的updateMux()的不同之处在于,它既没有传递有效的系统UDP socket,也没有传递有效的本地端口地址。回想updateMux()的实现,这两个参数主要决定了CMultiplexer的CChannel将与哪个端口关联。来看两个CChannel::open()的实现(src/channel.cpp):
|
|
可以看到CChannel::open()主要是把UDT的CChannel与一个系统的UDP socket关联起来,它们总共处理了3中情况,一是调用者已经创建并绑定到了目标端口的系统UDP socket,这种最简单,直接将传递进来的UDPSOCKET赋值给CChannel的m_iSocket,然后设置系统UDP socket的选项;二是传递进来了一个有效的本地端口地址,此时CChannel会自己先创建一个系统UDP socket,并将该socket绑定到传进来的目标端口地址,一、二两种情况正是UDT的两个bind API的情况;三是既没有有效的系统UDP socket,又没有有效的本地端口地址传进来,则会在创建了系统UDP socket之后,先再找一个可用的端口地址,然后将该socket绑定到找到的端口地址,这也就是UDT Socket没有bind,直接connect的情况。
将UDT Socket的状态置为CONNECTING。
执行s->m_pUDT->connect(name),连接UDT Server。如果连接失败,有异常抛出,UDT Socket的状态会退回到OPENED状态,然后返回。在这个函数中会完成建立连接整个的网络消息交互过程。
将连接的目标地址复制到UDT Socket的Peer Address。然后返回0表示成功结束。
在仔细地分析连接建立过程中的数据包交互之前,可以先粗略地看一下这个过程收发了几个包,及各个包收发的顺序。我们知道在UDT中,所有数据包的收发都是通过CChannel完成的,我们可以在CChannel::sendto()和CChannel::recvfrom()中加log来track这一过程。通过UDT提供的demo程序appserver和appclient(在app/目录下)来研究。先在一个终端下执行appserver:
改造appclient,使得它只发送一个比较小的数据包就结束,编译后在另一个终端下执行,可以看到有如下的logs吐出来:
在appclient运行的这段时间,在运行appserver的终端下的可以看到有如下的logs输出:
可以看到,UDT Client端先发送了一个消息MSG1给UDT Server;UDT Server端收到消息MSG1之后,回了一个消息MSG2给UDT Client;UDT Client收到消息MSG2,又回了一个消息MSG3给UDT Server;UDT Server收到消息MSG3后又回了一个消息MSG4给UDT Client,然后从UDT::accept()返回,自此UDT Server认为一个连接已经成功建立;UDT Client则在收到消息MSG4后,从UDT::connect()返回,并自此认为连接已成功建立,可以进行数据的收发了。用一幅图来描述这个过程:
至于MSG1、2、3、4的具体格式及内容,则留待我们后面来具体分析了。
接着来看连接建立过程消息交互具体的实现,也就是CUDT::connect()函数:
|
|
可以看到,在这个函数中主要完成了如下的这样一些事情:
检查CUDT的状态。确保只有已经与多路复用器关联,即处于OPENED状态的UDT Socket才能执行CUDT::connect()操作。如前面看到的,bind操作可以使UDT Socket进入OPENED状态。对于没有进行过bind的UDT Socket,CUDTUnited::connect()会做这样的保证。
拷贝目标网络地址为UDT Socket的PeerAddr。
执行m_pRcvQueue->registerConnector()向接收队列注册Connector。来看这个函数的执行过程(src/queue.cpp):
|
|
可以看到,在这个函数中,主要是向接收队列CRcvQueue的CRendezvousQueue m_pRendezvousQueue中插入了一个CRL结构。那CRendezvousQueue又是个什么东西呢?来看它的定义(src/queue.h):
|
|
可以看到,它就是一个简单的容器,提供的操作也是常规的插入、移除及检索等操作:
|
|
那接收队列CRcvQueue是用这个队列来做什么的呢?这主要与接收队列CRcvQueue的消息dispatch机制有关。在接收队列CRcvQueue的worker线程中,接收到一条消息之后,它会根据消息的目标SocketID,及发送端的地址等信息,将消息以不同的方式进行dispatch,m_pRendezvousQueue中的CUDT是其中的一类dispatch目标。后面我们在研究消息接收时,会再来仔细研究接收队列CRcvQueue的worker线程及m_pRendezvousQueue。
- 构造 连接请求 消息CHandShake m_ConnReq。可以看一下CHandShake的定义(src/packet.h):
|
|
CHandShake的m_iID为发起端UDT Socket的SocketID,请求类型m_iReqType将被设置为了1,还设置了m_iMSS用于协商MSS值。CHandShake的构造函数会初始化所有的字段(src/packet.cpp):
可以看到m_iCookie被初始化为了0。但注意在这里,CHandShake m_ConnReq的构造过程中,m_iCookie并没有被赋予新值。
随机初始化序列号Sequence Number。
创建一个CPacket结构request,为它创建大小为m_iPayloadSize的缓冲区,将该缓冲区pack进CPacket结构,并专门把request.m_iID,也就是这个包发送的目的UDT SocketID,设置为0。
m_iPayloadSize的值根据UDT Socket创建者的不同,在不同的地方设置。由应用程序创建的UDT Socket在CUDT::open()中设置,比如Listening的UDT Socket在bind时会执行CUDT::open(),或者连接UDT Server但没有执行过bind操作的UDT Socket会在CUDTUnited::connect()中执行CUDT::open();UDT Server中由Listening的UDT Socket收到连接请求时创建的UDT Socket,在CUDT::connect(const sockaddr peer, CHandShake hs)中初设置;发起连接的UDT Socket还会在CUDT::connect(const CPacket& response)中再次更新这个值。但这个值总是被设置为m_iPktSize - CPacket::m_iPktHdrSize,CPacket::m_iPktHdrSize为固定的UDT Packet Header大小16。
m_iPktSize总是与m_iPayloadSize在相同的地方设置,被设置为m_iMSS - 28。m_iMSS,MSS(Maximum Segment Size,最大报文长度),这里是UDT协议定义的一个选项,用于在UDT连接建立时,收发双方协商通信时每一个报文段所能承载的最大数据长度。在CUDT对象创建时被初始化为1500,但可以通过UDT::setsockopt()进行设置。
这里先来看一下CPacket的结构(src/packet.h):
|
|
它的数据成员是有4个uint32_t元素的数组m_nHeader,描述UDT Packet的Header,和有两个元素的iovec数组m_PacketVector。另外的几个引用则主要是为了方便对这些数据成员的访问,看下CPacket的构造函数就一目了然了(src/packet.cpp):
|
|
注意m_PacketVector的第一个元素指向了m_nHeader。
在CPacket::pack()中:
|
|
在CPacket::pack()中,首先将m_nHeader[0],也就是m_iSeqNo的bit-0设为1表示这是一个控制包,将bit-1~15设置为消息的类型,然后根据消息的不同类型进行不同的处理。对于Handshake消息,其pkttype为0,这里主要关注pkttype为0的case。可见它就是让m_PacketVector[1]指向前面创建的缓冲区。
- 将Handshake消息m_ConnReq序列化进前面创建的缓冲区,并正确地设置CPacket request的长度:
|
|
序列化时,会将Handshake消息m_ConnReq全部的内容拷贝进缓冲区。略感奇怪,这个地方竟然完全没有顾及字节序的问题。
- 调用发送队列的sendto()函数,向目标地址发送消息:
|
|
CSndQueue的sendto()函数直接调用了CChannel::sendto():
|
|
在CChannel::sendto()中会处理Header的字节序问题。
这里总结一下,UDT Client向UDT Server发送的连接建立请求消息的内容:消息主要分为两个部分一个是消息的Header,一个是消息的Content。Header为4个uint32_t类型变量,从前到后这4个变量的含义分别为sequence number,message number,timestamp和目标SocketID。就Handshake而言,sequence number的最高位,也就是bit-0为1,表示这是一个控制消息,bit-1~15为pkttype 0,其它位为0;message number及timestamp均为0,目标SocketID为0。
Content部分,总共48个字节,主要用于进行连接的协商,如MSS等,具体可以看CHandShake。
- 检查是否是同步接收模式。如果不是的话,则delete掉前面为request CPacket的CHandShake创建的缓冲区并退出。后面与UDT Server端进一步的消息交互会有接收队列等帮忙异步地推动。否则继续执行。值得一提的是,CUDT在其构造函数中,会将m_bSynRecving置为true,但在拷贝构造函数中,则会继承传入的值。但这个值如同MSS值一样,也可以通过UDT::setOpt()设置。也就是说由应用程序创建的UDT Socket默认处于同步接收模式,比如Listening的UDT Socket和发起连接的UDT Socket,但可以自行设置,由Listening的UDT Socket在接收到连接建立请求时创建的UDT Socket,则会继承Listening UDT Socket的对应值。
我们暂时先看SynRecving模式,也就是默认模式下的UDT Socket的行为。
创建一个CPacket response,同样为它创建一个大小为m_iPayloadSize的缓冲区以存放数据,并将缓冲区pack进response中。这个CPacket response会被用来存放从UDT Server发回的相应的信息。
进入一个循环执行后续的握手动作,及消息的超时重传等动作。可以将这个循环看做由3个部分组成。
循环开始的地方是一段发送消息的代码,在这段代码中,其实做了两个事情,或者说可能会发送两种类型的消息,一是第一个握手消息的超时重传,二是第二个握手消息的发送及超时重传。看上去发送的都是CHandShake m_ConnReq,但在接收到第一个握手消息的响应之后,这个结构的某些成员会根据响应而被修改。注意,发送第一个握手消息之后,首次进入循环,将会跳过这个部分。
之后的第二部分,主要用于接收响应,第一个握手消息的响应及第二个握手消息的响应。来看CRcvQueue::recvfrom()(src/queue.cpp):
|
|
这也是一个生产者-消费者模型,在这里就如同listen的过程一样,也只能看到这个生产与消费的故事的一半,即消费的那一半。生产者也是RcvQueue的worker线程。这个地方会等待着消息的到来,但也不会无限制的等待,可以看到,这里接收消息的等待时间大概为1s。这里是在等待一个CPacket队列的出现,也就是m_mBuffer中目标UDT Socket的CPacket队列。这里会从这个队列中取出第一个packet返回给调用者。如果队列被取空了,会直接将这个队列从m_mBuffer中移除出去。
循环的第三部分是整个连接建立消息交互过程的超时处理,可以看到,非Rendezvous模式下超时时间为3s,Rendezvous模式下,超时时间则会延长十倍。
CUDT::connect()执行到接收第一个握手消息的相应时,连接建立请求的发起也算是基本完成了。下面来看UDT Server端收到这个消息时是如何处理的。
UDT Server对首个Handshake消息的处理
来看UDT Server端收到这个消息时是如何处理的。如我们前面在 UDT协议实现分析——bind、listen与accept 一文中了解到的,Listening的UDT Socket会在UDT::accept()中等待连接请求进来,那是一个生产者与消费者的故事,UDT::accept()是生产者,接收队列RcvQueue的worker线程是消费者。
我们这就来仔细地看一下RcvQueue的worker线程,当然重点会关注对于Handshake消息,也就是目标SocketID为0,pkttype为0的packet的处理(src/queue.cpp):
|
|
这个函数,首先创建了一个sockaddr,用于保存发送端的地址。
然后就进入了一个循环,不断地接收UDP消息。
循环内的第一行是执行Timer的tick(),这个是UDT自己的定时器Timer机制的一部分。
接下来的这个子循环也主要与RcvQueue的worker线程中消息的dispatch机制有关。
然后是取一个CUnit,用来接收其它端点发送过来的消息。如果取不到,则接收UDP包并丢弃。然后跳过后面消息dispatch的过程。这个地方的m_UnitQueue用来做缓存,也用来防止收到过多的包消耗过多的资源。完整的CUnitQueue机制暂时先不去仔细分析。
然后就是取到了CUnit的情况,则先通过CChannel接收一个包,并根据包的内容进行包的dispatch。不能跑偏了,这里主要关注目标SocketID为0,pkttype为0的包的dispatch。可以看到,在Listener存在的情况下,是dispatch给了listener,也就是Listening的UDT Socket的CUDT的listen()函数,否则会dispatch给通道上处于Rendezvous模式的UDT Socket。(在 UDT协议实现分析——bind、listen与accept 一文中关于listen的部分有具体理过这个listener的设置过程。)可以看到,对于相同的通道CChannel,也就是同一个端口上,Rendezvous模式下的UDT Socket和Listening的UDT Socket不能共存,或者说同时存在时,Rendezvous的行为可能不是预期的,但多个处于Rendezvous模式下的UDT Socket可以共存。
接收队列CRcvQueue的worker()线程做的其它事情,暂时先不去仔细看。这里先来理一下Listening的UDT Socket在接收到Handshake消息的处理过程,也就是CUDT::listen(sockaddr* addr, CPacket& packet)(src/core.cpp):
|
|
在这个函数中主要做了这样的一些事情:
检查UDT Socket的状态,如果处于Closing状态下,就返回,否则继续执行。
检查包的数据部分长度。若长度不为CHandShake::m_iContentSize 48字节,则说明这不是一个有效的Handshake,则返回,否则继续执行。
创建一个CHandShake hs,并将传入的packet的数据部分反序列化进这个CHandShake。这里来扫一眼这个CHandShake::deserialize()(src/packet.cpp):
123456789101112131415161718int CHandShake::deserialize(const char* buf, int size) {if (size < m_iContentSize)return -1;int32_t* p = (int32_t*) buf;m_iVersion = *p++;m_iType = *p++;m_iISN = *p++;m_iMSS = *p++;m_iFlightFlagSize = *p++;m_iReqType = *p++;m_iID = *p++;m_iCookie = *p++;for (int i = 0; i < 4; ++i)m_piPeerIP[i] = *p++;return 0;}
这个函数如同它的反函数serialize()一样没有处理字节序的问题。
计算cookie值。所谓cookie值,即由连接发起端的网络地址(包括IP地址与端口号)及时间戳组成的字符串计算出来的16个字节长度的MD5值。时间戳精确到分钟值。用于计算MD5值的字符串类似127.0.0.1:49033:0。
计算出来cookie值之后的部分,应该被分成两个部分。一部分处理连接发起端发送的地一个握手包,也就是hs.m_iReqType == 1的block,在CUDT::connect()中构造m_ConnReq的部分我们有看到这个值要被设为1的;另一部分则处理连接发起端发送的第二个握手消息。这里我们先来看hs.m_iReqType == 1的block。
它取前一步计算的cookie的前4个字节,直接将其强转为一个int值,赋给前面反序列化的CHandShake的m_iCookie。这个地方竟然顾及字节序的问题,也没有顾及不同平台的差异,即int类型的长度在不同的机器上可能不同,这个地方用int32_t似乎要更安全一点。将CHandShake的m_iID,如我们在CUDT::connect()中构造m_ConnReq的部分我们有看到的,为连接发起端UDT Socket的SocketID,设置给packet的m_iID,也就是包的目标SocketID。再将hs重新序列化进packet。通过发送队列SndQueue发送经过了这一番修改的packet。然后返回。
总结一下UDT Server中Listening的UDT Socket接收到第一个HandShake包时,对于这个包的处理过程:
计算一个cookie值,设置给接收到的HandShake的cookie字段,修改包的目标SocketID字段为发起连接的UDT Socket的SocketID,包的其它部分原封不动,最后将这个包重新发回给连接发起端。
UDT Client发送第二个HandShake消息
UDT Server接收到第一个HandShake消息,回给UDT Client一个HandShake消息。这样球就又被踢回给了UDT Client端。接着来看在UDT Client端接收到首个HandShake包的响应后会做什么样的处理。
我们知道在CUDT::connect(const sockaddr* serv_addr)中,发送首个HandShake包之后,会调用CRcvQueue::recvfrom()来等着接收UDT Server的响应,消费者焦急地等待着食物的到来。在消息到来时,CUDT::connect()会被生产者,也就是CRcvQueue的worker线程唤醒。这里就来具体看一下这个生产与消费的故事的另一半,生产的故事,也就是CRcvQueue的worker线程的消息dispatch。
在CRcvQueue::worker()中包dispatch的部分可以看到:
|
|
我们知道UDT Server回复的消息中是设置了目标SocketID了的。因而会走id > 0的block。
在CUDT::connect( const sockaddr* serv_addr )中有看到调用m_pRcvQueue->registerConnector()将CUDT添加进RcvQueue的m_pRendezvousQueue中,因而这里会执行id > 0 block中下面的那个block。
如果前面对于m_bSynRecving的分析,默认情况为true。因而这个地方会执行CRcvQueue::storePkt()来存储包。来看这个函数的实现:
|
|
在这个函数中会保存接收到的packet,并在必要的时候唤醒等待接收消息的线程。(对应CRcvQueue::recvfrom()的逻辑来看。)
然后来看CUDT::connect(const sockaddr* serv_addr)在收到第一个HandShake消息的响应之后会做什么样的处理,也就是CUDT::connect(const CPacket& response)(src/core.cpp):
这个函数会处理第一个HandShake的响应,也会处理第二个HandShake的响应,这里先来关注第一个HandShake的响应的处理,因而只列出它的一部分的代码。
这个函数先是检查了CUDT的状态,检查了packet的有效性,然后就是将接收到的包的数据部分反序列化至CHandShake m_ConnRes中。我们不关注对于Rendezvous模式的处理。
接着会检查m_ConnRes的m_iReqType,若为1,则设置m_ConnReq.m_iReqType为-1,设置m_ConnReq.m_iCookie为m_ConnRes.m_iCookie用以标识m_ConnReq为一个合法的第二个HandShake packet;同时设置m_llLastReqTime为0,如我们前面对CUDT::connect(const sockaddr* serv_addr)的分析,以便于此刻保存于m_ConnReq中的第二个HandShake能够被发送出去as soon as possible。
这第二个HandShake,与第一个HandShake的差异仅仅在于有了有效的Cookie值,且请求类型ReqType为-1。其它则完全一样。
UDT Server对第二个HandShake的处理
UDT Client对于m_ConnReq的改变并不足以改变接收队列中worker线程对这个包的dispatch规则,因而直接来看CUDT::listen(sockaddr* addr, CPacket& packet)中对于这第二个HandShake消息的处理。
接着前面对于这个函数的分析,接前面的第4步。
对于这第二个HandShake,它的ReqType自然不再是1了,而是-1。因而在计算完了cookie值之后,它会先验证一下HandShake包中的cookie值是否是有效的,如果无效,则直接返回。根据这个地方的逻辑,可以看到cookie的有效时间最长为2分钟。
检查包的Flag和Type,如果不是HandShake包,则直接返回,否则继续执行。
检查连接发起端IP的版本及Socket类型SockType与本地Listen的UDT Socket是否匹配。若不匹配,则将错误码1002放在发过来的HandShanke的ReqType字段中,设置packet的目标SocketID为发起连接的SocketID,然后将这个包重新发回给UDT Client。
检查之后,发现完全匹配的情况。调用CUDTUnited::newConnection()创建一个新的UDT Socket。若创建过程执行失败,则将错误码1002放在发过来的HandShanke的ReqType字段中。若创建成功,会设置发过来的packet的目标SocketID为适当的值,然后将同一个包再发送回UDT Client。CUDTUnited::newConnection()会适当地修改HandShake packet的一些字段。若失败在执行s_UDTUnited.m_EPoll.update_events()。
返回hs.m_iReqType。
然后来看在CUDTUnited::newConnection()中是如何新建Socket的:
在这个函数中做了如下这样的一些事情:
找到listening的UDT Socket的CUDTSocket结构,若找不到则直接返回-1。否则继续执行。
检查相同的连接请求是否已经处理过了。在CUDTUnited有一个专门的缓冲区m_PeerRec,用来存放由Listening的Socket创建的UDT Socket,这里主要是通过在这个缓冲区中查找是否已经有connection请求对应的socket来判断:
|
|
如果已经为这个connection请求创建了UDT Socket,又分为两种情况:
(1). 为connection请求创建的UDT Socket还是好的,可用的,则根据之前创建的UDT Socket的一些字段设置接收到的HandShake,m_iReqType会被设置为-1,m_iID会被设置为UDT Socket的SocketID。然后返回0。如我们前面在CUDTUnited::newConnection()中看到的,这样返回之后,CUDTUnited::newConnection()会发送一个响应消息给UDT Client。
(2). 为connection请求创建的UDT Socket已经烂掉了,不可用了,此时则主要会将其状态设置为CLOSED,设置时间戳,将其从m_pQueuedSockets和m_pAcceptSockets中移除出去。然后执行后续的新建UDT Socket的流程。
但对于一个由Listening Socket创建的UDT Socket而言,又会是什么原因导致它处于broken状态呢?此处这样的检查是否真有必要呢?后面会再来研究。
检查m_pQueuedSockets的大小是否超出了为Listening的UDT Socket设置的backlog大小,若超出,则返回-1,否则继续执行。
创建一个CUDTSocket对象。创建一个CUDT对象,这里创建的CUDT对象会继承Listening的UDT Socket的许多属性(src/api.cpp):
|
|
为SelfAddr分配内存。
为PeerAddr分配内存。
拷贝发送端地址到PeerAddr。
设置SocketID。等等。
- 执行ns->m_pUDT->open()完成打开动作。然后执行updateMux(ns, ls),将新建的这个UDT Socket绑定到Listening的UDT Socket所绑定的多路复用器:
|
|
- 执行 ns->m_pUDT->connect(peer, hs):
|
|
这个函数里会根据HandShake包设置非常多的成员。但主要来关注m_pRcvQueue->setNewEntry(this),这个调用也是与RcvQueue的worker线程的消息dispatch机制有关。后面我们会再来仔细地了解这个函数。
这个函数会在最后发送响应给UDT Client。
将UDT Socket的状态置为CONNECTED。拷贝Channel的地址到PeerAddr。
将创建的CUDTSocket放进m_Sockets中,同时放进m_PeerRec中。
将创建的UDT Socket放进m_pQueuedSockets中。这正是Listening UDT Socket accept那个生产-消费故事的另一半,这里是生产者。
将等待在accept()的线程唤醒。至此在UDT Server端,accept()返回一个UDT Socket,UDT Server认为一个连接成功建立。
UDT Client从UDT::connect()返回
如我们前面看到的,CUDT::connect(const sockaddr* serv_addr)在发送了第二个Handshake消息之后,它就会开是等待UDT Server的第二次响应。UDT Server发送第二个Handshake消息的相应之后,UDT Client端将会返回并处理它。这个消息的dispatch过程与第一个HandShake的响应消息的处理过程一致,这里不再赘述。这里来看这第二个HandShake的响应消息的处理,同样是在CUDT::connect(const CPacket& response)中:
|
|
- 这里做的第一件事就是调用m_pRcvQueue->removeConnector(m_SocketID)将自己从RevQueue的RendezvousQueue中移除,以表示自己将不再接收Rendezvous消息(src/queue.cpp):123456789101112131415void CRcvQueue::removeConnector(const UDTSOCKET& id) {m_pRendezvousQueue->remove(id);CGuard bufferlock(m_PassLock);map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);if (i != m_mBuffer.end()) {while (!i->second.empty()) {delete[] i->second.front()->m_pcData;delete i->second.front();i->second.pop();}m_mBuffer.erase(i);}}
这个函数执行完之后,RcvQueue暂时将无法向UDT Socket dispatch包。
根据协商的值重新做配置。这里我们可以再来看一下UDT的协商指的是什么。纵览连接建立的整个过程,我们并没有看到针对这些需要协商的值UDT本身有什么特殊的算法来计算,因而所谓的协商则主要是UDT Client端和UDT Server端,针对这些选项,不同应用程序层不同设置的同步协调。
准备所有的数据缓冲区。
设置CUDT的状态,m_bConnecting为false,m_bConnected为true。
执行m_pRcvQueue->setNewEntry(this),注册socket来接收数据包。这里来看一下CRcvQueue::setNewEntry(CUDT* u):
|
|
这个操作本身非常简单。但把CUDT结构放进CRcvQueue之后,又会发生什么呢?回忆我们前面看到的CRcvQueue::worker(void* param)函数中循环开始部分的这段代码:
对照这段代码中用到的几个函数的实现:
可以了解到,在 执行m_pRcvQueue->setNewEntry(this),注册socket之后,CRcvQueue的worker线程会将这个CUDT结构从它的m_vNewEntry中移到另外的两个容器m_pRcvUList和m_pHash中。那然后呢?在CRcvQueue::worker(void* param)中不是还有下面这段吗:
就是这样,可以说,在CUDT::connect(const CPacket& response)中是完成了一次UDT Socket消息接收方式的转变。
- 执行s_UDTUnited.connect_complete(m_SocketID)结束整个的connect()过程:
|
|
UDT Socket至此进入CONNECTED状态。
Done。