IO(2)
我之前画的一个图,表示RPC的通信过程。
上一次说了server端是怎么接受请求并处理的,这一次看看客户端这边的处理。
我们在通过Stub调用RPC的时候,会调用到channel.CallMethod中
void EchoService_Stub::Echo(::google::protobuf::RpcController* controller,
const ::EchoRequest* request,
::EchoResponse* response,
::google::protobuf::Closure* done) {
channel_->CallMethod(descriptor()->method(0),
controller, request, response, done);
}
这个是protobuf生成的代码里的片段。
在channel.cpp中有CallMethod的实现
最后这块的判断表示是同步还是异步的调用。同步调用我们就会等待RPC返回。异步调用则会通过done->Run来处理返回值。
而上面的cntl->IssueRPC
是发送RPC的具体地点。
我们会根据服务器的配置来创建一个连接服务器的socket。比如single server的话就直接向他连接。否则的话就通过LoadBalancer去选择一个服务器。
这时候我们的tmp_sock里就包含了需要连接的服务器的信息。
然后根据连接的种类来创建socket
单连接的话进程内共享一个连接,所以我们将tmp sock传给sending sock
否则的话就是短连接或者连接池,这里GetPooledSocket和GetShortSocket就会根据tmp sock创建对应的socket,并作为sending sock使用。
然后就是验证权限,根据协议相关的方法打包数据,并通过socket的Write写入到socket中
写入消息
写入消息在文档中有专门的描述。即由于我们可能存在多个线程同时向一个fd发消息,所以我们需要保证不会出现竞争的情况,并且还希望达到高性能。
到Write里面,他会从object_pool中拿一个WriteRequest,把数据放进去,并将下一个节点设为UNCONNECTED,然后进入到StartWrite中
首先把当前的request和write head交换。如果之前的write head是空的话,我们则获得写资格。否则的话说明有人正在写入这个fd。我们会把当前的request放到链表头。
写入的时候首先建立链接,通过ConnectIfNot。如果返回是1的话,表示我们正在建立连接,此时我们会直接返回,并让Callback替我们写。也就是wait-free的
socket已经连接了就返回0
否则的话我们就获取这个socket的指针,将KeepWriteIfConnected作为回调传给Connect。我们在链接建立的时候就会调用这个函数。
当前线程只写一次。如果没写完的话就通过keep write线程继续写
通过IsWriteComplete来判断是否写完。如果有新的request,我们反转链表。并返回false。如果没有新的request,判断当前request是否写完。
看一下IsWriteComplete
这里的desired就是我们希望赋给write head的值。
可以看到当data非空的时候,说明没写完,我们需要把write head变成old header。
然后当write head还是old head的时候,说明没有新的请求,我们就返回结果就行。对应了上面注释中的If No这一段
在上面的compare_exchange_strong失败了之后,new head会被赋成write head
这时候我们会把从old head到new head这一段链表反转。这里有一个等待
while(p->next == WriteRequest::UNCONNECTED) {
sched_yield();
}
当下一个节点为UNCONNECTED的时候,我们会用让出cpu。这是因为当我们把新的request换进来,在他更新next之前被调度出去了。导致链表没能接上。所以这里终止条件的判断是p != old_head
而非p == WriteRequest::UNCONNECTED
最后把old head和前面反转的链表连起来。
那么一个问题就是如果在我们反转链表的时候,新的write head出现了怎么办呢?
那么新的write head则不会被处理。因为我们只会处理old head到new head之间的请求
最后返回以后
我们会开启一个bthread,执行后续的写操作。
这里ReAddress和他下面那一行其实已经见到几次了。这里的作用就是把当前socket的指针通过ReAddress传给ptr for keep write,然后再传给req
类似的操作在ConnectIfNot中也有
只不过在这里,如果连接失败的话我们就会释放这个socket
然后来到KeepWrite中。由于之前我们已经反转了链表,所以keep write就一直写就行。
如果之前已经写完的话就跳过这个请求,并把WriteRequest归还给object pool
否则的话就DoWrite,这里就是调用之前看到的
在写入一个消息的地方有用到
找到链表的尾部。继续调用IsWriteComplete来检查是否写完。如果全部写完的话,我们就可以返回,并且要把req返回给object pool
否则的话在IsWriteComplete中我们会把新的请求反转,并接到cur tail后面,等待后续继续写入。并且把新的tail赋给cur tail,用来后续的遍历。因为只有第一次tail为null的时候我们需要自己找到尾部。后续的情况下可以在IsWriteComplete中帮我们找到链表尾部。
回忆一下这个写操作的过程是怎么实现wait-free的:
* 第一个写操作会拥有写权限。他会写入一条消息,然后返回
* 后续的写操作会把请求接到链表中,然后返回
* 未完成的写操作会被放到后台的bthread中持续写入(类似一个IO线程?)
* 进行写入的时候会判断是否建立了连接,如果建立了就直接写,没建立连接则通过回调写入。(省去了等待建立连接的过程)
* 没有选择全通过IO线程写,而是第一个线程写一条消息,应该是一个tradeoff。我们不希望让第一个线程长时间的写,会阻塞。同时我们也不希望全都丢给IO线程,局部性比较差。所以用了这样一个策略,第一个消息原地写,后续的通过IO线程写。
可以看到线程基本不会被阻塞,并且也保证了一定的缓存局部性。
通过这个以及上一次的内容可以看出来:
* 不同的channel可能并用一个fd,写出消息是通过一个socket完成的。
* 不同的socket之间可以并发,我们就有了Concurrency between fd
* 服务器端中,fd内部也有并发,因为我们会有一个线程做parse,生成n个消息,然后开启n - 1个线程去处理这些消息,最后一个原地处理。(其实也可以发现,对于一个fd的写和读还是需要一个线程来完成的,只不过写请求和处理数据可以有多个线程,对应了多线程写fd和多线程Process Request)图中的体现就是一个keep write thread写,一个parse thread读。
文章评论