More than code

More Than Code
The efficiency of your iteration of reading, practicing and thinking decides your understanding of the world.
  1. 首页
  2. 未分类
  3. 正文

brpc-3 IO(2)

2022年6月4日 786点热度 0人点赞 0条评论

IO(2)

20220604083229

我之前画的一个图,表示RPC的通信过程。

上一次说了server端是怎么接受请求并处理的,这一次看看客户端这边的处理。

我们在通过Stub调用RPC的时候,会调用到channel.CallMethod中

void EchoService_Stub::Echo(::google::protobuf::RpcController* controller,
                              const ::EchoRequest* request,
                              ::EchoResponse* response,
                              ::google::protobuf::Closure* done) {
  channel_->CallMethod(descriptor()->method(0),
                       controller, request, response, done);
}

这个是protobuf生成的代码里的片段。

在channel.cpp中有CallMethod的实现

20220604084219

最后这块的判断表示是同步还是异步的调用。同步调用我们就会等待RPC返回。异步调用则会通过done->Run来处理返回值。

而上面的cntl->IssueRPC是发送RPC的具体地点。

20220604091857

20220604092438

我们会根据服务器的配置来创建一个连接服务器的socket。比如single server的话就直接向他连接。否则的话就通过LoadBalancer去选择一个服务器。

这时候我们的tmp_sock里就包含了需要连接的服务器的信息。

20220604092802

然后根据连接的种类来创建socket

单连接的话进程内共享一个连接,所以我们将tmp sock传给sending sock

否则的话就是短连接或者连接池,这里GetPooledSocket和GetShortSocket就会根据tmp sock创建对应的socket,并作为sending sock使用。

然后就是验证权限,根据协议相关的方法打包数据,并通过socket的Write写入到socket中

20220604093737

20220604093757

写入消息

20220604093847

写入消息在文档中有专门的描述。即由于我们可能存在多个线程同时向一个fd发消息,所以我们需要保证不会出现竞争的情况,并且还希望达到高性能。

20220604095100

到Write里面,他会从object_pool中拿一个WriteRequest,把数据放进去,并将下一个节点设为UNCONNECTED,然后进入到StartWrite中

20220604095358

首先把当前的request和write head交换。如果之前的write head是空的话,我们则获得写资格。否则的话说明有人正在写入这个fd。我们会把当前的request放到链表头。

写入的时候首先建立链接,通过ConnectIfNot。如果返回是1的话,表示我们正在建立连接,此时我们会直接返回,并让Callback替我们写。也就是wait-free的

20220604095954

20220604100311

socket已经连接了就返回0

否则的话我们就获取这个socket的指针,将KeepWriteIfConnected作为回调传给Connect。我们在链接建立的时候就会调用这个函数。

20220604101324

当前线程只写一次。如果没写完的话就通过keep write线程继续写

20220604101538

通过IsWriteComplete来判断是否写完。如果有新的request,我们反转链表。并返回false。如果没有新的request,判断当前request是否写完。

20220604103235

看一下IsWriteComplete

这里的desired就是我们希望赋给write head的值。

可以看到当data非空的时候,说明没写完,我们需要把write head变成old header。

然后当write head还是old head的时候,说明没有新的请求,我们就返回结果就行。对应了上面注释中的If No这一段

20220604104829

在上面的compare_exchange_strong失败了之后,new head会被赋成write head

这时候我们会把从old head到new head这一段链表反转。这里有一个等待

while(p->next == WriteRequest::UNCONNECTED) {
    sched_yield();
}

当下一个节点为UNCONNECTED的时候,我们会用让出cpu。这是因为当我们把新的request换进来,在他更新next之前被调度出去了。导致链表没能接上。所以这里终止条件的判断是p != old_head而非p == WriteRequest::UNCONNECTED

最后把old head和前面反转的链表连起来。

那么一个问题就是如果在我们反转链表的时候,新的write head出现了怎么办呢?

那么新的write head则不会被处理。因为我们只会处理old head到new head之间的请求

最后返回以后

20220604105847

我们会开启一个bthread,执行后续的写操作。

这里ReAddress和他下面那一行其实已经见到几次了。这里的作用就是把当前socket的指针通过ReAddress传给ptr for keep write,然后再传给req

类似的操作在ConnectIfNot中也有

20220604110340

只不过在这里,如果连接失败的话我们就会释放这个socket

然后来到KeepWrite中。由于之前我们已经反转了链表,所以keep write就一直写就行。

20220604110820

如果之前已经写完的话就跳过这个请求,并把WriteRequest归还给object pool

否则的话就DoWrite,这里就是调用之前看到的
20220604111015
在写入一个消息的地方有用到

20220604111331

找到链表的尾部。继续调用IsWriteComplete来检查是否写完。如果全部写完的话,我们就可以返回,并且要把req返回给object pool

否则的话在IsWriteComplete中我们会把新的请求反转,并接到cur tail后面,等待后续继续写入。并且把新的tail赋给cur tail,用来后续的遍历。因为只有第一次tail为null的时候我们需要自己找到尾部。后续的情况下可以在IsWriteComplete中帮我们找到链表尾部。

回忆一下这个写操作的过程是怎么实现wait-free的:
* 第一个写操作会拥有写权限。他会写入一条消息,然后返回
* 后续的写操作会把请求接到链表中,然后返回
* 未完成的写操作会被放到后台的bthread中持续写入(类似一个IO线程?)
* 进行写入的时候会判断是否建立了连接,如果建立了就直接写,没建立连接则通过回调写入。(省去了等待建立连接的过程)
* 没有选择全通过IO线程写,而是第一个线程写一条消息,应该是一个tradeoff。我们不希望让第一个线程长时间的写,会阻塞。同时我们也不希望全都丢给IO线程,局部性比较差。所以用了这样一个策略,第一个消息原地写,后续的通过IO线程写。

可以看到线程基本不会被阻塞,并且也保证了一定的缓存局部性。

20220604112848

通过这个以及上一次的内容可以看出来:
* 不同的channel可能并用一个fd,写出消息是通过一个socket完成的。
* 不同的socket之间可以并发,我们就有了Concurrency between fd
* 服务器端中,fd内部也有并发,因为我们会有一个线程做parse,生成n个消息,然后开启n - 1个线程去处理这些消息,最后一个原地处理。(其实也可以发现,对于一个fd的写和读还是需要一个线程来完成的,只不过写请求和处理数据可以有多个线程,对应了多线程写fd和多线程Process Request)图中的体现就是一个keep write thread写,一个parse thread读。

标签: 暂无
最后更新:2022年6月4日

sheep

think again

点赞
< 上一篇
下一篇 >

文章评论

取消回复

COPYRIGHT © 2021 heavensheep.xyz. ALL RIGHTS RESERVED.

THEME KRATOS MADE BY VTROIS