Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

客户端流式trpc问题 #177

Open
ugenehan opened this issue Sep 2, 2024 · 8 comments
Open

客户端流式trpc问题 #177

ugenehan opened this issue Sep 2, 2024 · 8 comments

Comments

@ugenehan
Copy link

ugenehan commented Sep 2, 2024

客户端同步流式中,如果发送的数据量比较大,客户端不停的发送数据,当客户端显示发送数据结束时,实际上服务端还在接收数据,也就是客户端发送数据快,服务端接收数据慢。
客户端发送完成后(此时服务端还在不停接收数据),调用WriteDone(),会失败(返回码:354),一直打印filber_client_connection_handler.cc:59,MessageWriteDone request_id:0,正常调用该函数打印的request_id不是0。
在WriteDone()之前如果等待一段时间,就会是正常。

@weimch
Copy link
Contributor

weimch commented Sep 9, 2024

354错误是读超时的错误,客户端流的场景,客户端调用WriteDone,会读服务端的响应,而服务端又在处理接收到数据,没法在超时时间内返回(默认是3s超时),所以会得到WriteDone返回码354读超时的错误;

流在收发消息时,不会设置request_id,这是支持连接复用协议在一应一答场景里,用于对应请求和响应的。

而流在一个连接里区分不同流的包用的是另外一个字段stream_id,不会在fiber_client_connection_handler打印的;

具体到这个问题,因服务端处理时间过长,导致在调用WriteDone的预期时间3s内,服务端没法回响应,可以有下面两种方法:

  1. 调大一个客户端默认读超时的配置stream_read_timeout来解决(目前暂时没有支持)
  2. 使用流控,这个默认会开启,窗口是65535字节,只有服务端消费到窗口的1/3数据,客户端才能继续发数据;

按理,你数据量很大的话,可以从流控限制,不知道你的一个数据包有多大呢?单位时间内,客户端发送了多少流包,服务端又处理了多少流包?

@ugenehan
Copy link
Author

每个数据包5MB,客户端每秒40左右,服务端每秒20左右个数据包。

@ugenehan
Copy link
Author

high_percentile_priority_impl.cc的164行,std::max_element直接赋值给了double,没做取值

@weimch
Copy link
Contributor

weimch commented Sep 11, 2024

每个数据包5MB,客户端每秒40左右,服务端每秒20左右个数据包。

客户端每秒发送40个数据包?

@weimch
Copy link
Contributor

weimch commented Sep 11, 2024

high_percentile_priority_impl.cc的164行,std::max_element直接赋值给了double,没做取值

这个在修正了

@ugenehan
Copy link
Author

每个数据包5MB,客户端每秒40左右,服务端每秒20左右个数据包。

客户端每秒发送40个数据包?

是的。大概发送到480个左右就返回错误了

@weimch
Copy link
Contributor

weimch commented Sep 20, 2024

每个数据包5MB,客户端每秒40左右,服务端每秒20左右个数据包。

客户端每秒发送40个数据包?

是的。大概发送到480个左右就返回错误了

有用来测试的代码吗?可以提供下链接以及编译运行命令

@ugenehan
Copy link
Author

ugenehan commented Sep 20, 2024

测试代码用的trpc_stream中的CallClientStreamSayHello改的,从文件中读取数据,大概4GB,将这些数据发送至服务端,每次发送5MB左右。
`
bool CallClientStreamSayHello(const StreamGreeterServiceProxyPtr& proxy, int request_count) {
auto context = ::trpc::MakeClientContext(proxy);
::trpc::test::helloworld::HelloReply reply;
int send_count{0};
int send_bytes{0};
::trpc::Status status{0, 0, "OK"};
bool ok{true};
do {
auto stream = proxy->ClientStreamSayHello(context, &reply);
if (!stream.GetStatus().OK()) {
std::cerr << "stream error:" << stream.GetStatus().ToString() << std::endl;
ok = false;
break;
}
//msg是从文件中读取的数据4GB大小
long long left = msg.size();
const char* p = msg.data();

while(left > 0){
  ::trpc::test::helloworld::HelloRequest request;
  request.set_msg(p,5242880);
  if (status.OK()) {
    ++send_count;
    send_bytes += request.msg().size();
   p += 5242880;
   left -= 5242880;
   continue;
  }
  break;
}

// Check: last write is ok.
if (status.OK()) {
  status = stream.WriteDone();
  if (status.OK()) {
    // Waits the final status of the RPC calling.
    status = stream.Finish();
  } else {
    std::cerr << "write done error: " << status.ToString() << std::endl;
    ok = false;
  }
} else {
  std::cerr << "write error: " << status.ToString() << std::endl;
  ok = false;
}

if (status.OK()) {
  std::cout << "stream rpc succeed, send count: " << send_count << ", send bytes: " << send_bytes
            << ", reply: " << reply.msg() << std::endl;
} else {
  std::cerr << "stream rpc failed:" << status.ToString() << std::endl;
  ok = false;
}

} while (0);
return ok;
}
`

用flow_control限流好像也没起作用。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants