Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(client): Use Stream instead of TrySteam for client calls #61

Merged
merged 9 commits into from
Oct 10, 2019
4 changes: 2 additions & 2 deletions tonic-build/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ fn generate_client_streaming(method: &Method, proto: &str, path: String) -> Toke
quote! {
pub async fn #ident<S>(&mut self, request: tonic::Request<S>)
-> Result<tonic::Response<#response>, tonic::Status>
where S: Stream<Item = Result<#request, tonic::Status>> + Send + 'static,
where S: Stream<Item = #request> + Send + 'static,
{
self.ready().await?;
let codec = tonic::codec::ProstCodec::new();
Expand All @@ -151,7 +151,7 @@ fn generate_streaming(method: &Method, proto: &str, path: String) -> TokenStream
quote! {
pub async fn #ident<S>(&mut self, request: tonic::Request<S>)
-> Result<tonic::Response<tonic::codec::Streaming<#response>>, tonic::Status>
where S: Stream<Item = Result<#request, tonic::Status>> + Send + 'static,
where S: Stream<Item = #request> + Send + 'static,
{
self.ready().await?;
let codec = tonic::codec::ProstCodec::new();
Expand Down
2 changes: 1 addition & 1 deletion tonic-examples/src/routeguide/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

println!("FEATURE = {:?}", response);

let outbound = async_stream::try_stream! {
let outbound = async_stream::stream! {
let mut interval = Interval::new_interval(Duration::from_secs(1));

while let Some(time) = interval.next().await {
Expand Down
19 changes: 7 additions & 12 deletions tonic-interop/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,10 @@ pub async fn large_unary(client: &mut TestClient, assertions: &mut Vec<TestAsser
// }

pub async fn client_streaming(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
let requests = REQUEST_LENGTHS
.iter()
.map(|len| StreamingInputCallRequest {
payload: Some(crate::client_payload(*len as usize)),
..Default::default()
})
.map(|v| Ok(v));
let requests = REQUEST_LENGTHS.iter().map(|len| StreamingInputCallRequest {
payload: Some(crate::client_payload(*len as usize)),
..Default::default()
});

let stream = stream::iter(requests);

Expand Down Expand Up @@ -154,9 +151,7 @@ pub async fn ping_pong(client: &mut TestClient, assertions: &mut Vec<TestAsserti
let (mut tx, rx) = mpsc::unbounded_channel();
tx.try_send(make_ping_pong_request(0)).unwrap();

let result = client
.full_duplex_call(Request::new(rx.map(|s| Ok(s))))
.await;
let result = client.full_duplex_call(Request::new(rx)).await;

assertions.push(test_assert!(
"call must be successful",
Expand Down Expand Up @@ -272,7 +267,7 @@ pub async fn status_code_and_message(client: &mut TestClient, assertions: &mut V
let result = client.unary_call(Request::new(simple_req)).await;
validate_response(result, assertions);

let stream = stream::iter(vec![Ok(duplex_req)]);
let stream = stream::iter(vec![duplex_req]);
let result = match client.full_duplex_call(Request::new(stream)).await {
Ok(response) => {
let stream = response.into_inner();
Expand Down Expand Up @@ -359,7 +354,7 @@ pub async fn custom_metadata(client: &mut TestClient, assertions: &mut Vec<TestA
req_unary.metadata_mut().insert(key1, value1.clone());
req_unary.metadata_mut().insert_bin(key2, value2.clone());

let stream = stream::iter(vec![Ok(make_ping_pong_request(0))]);
let stream = stream::iter(vec![make_ping_pong_request(0)]);
let mut req_stream = Request::new(stream);
req_stream.metadata_mut().insert(key1, value1.clone());
req_stream.metadata_mut().insert_bin(key2, value2.clone());
Expand Down
8 changes: 4 additions & 4 deletions tonic/src/client/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl<T> Grpc<T> {
M1: Send + 'static,
M2: Send + 'static,
{
let request = request.map(|m| stream::once(future::ok(m)));
let request = request.map(|m| stream::once(future::ready(m)));
self.client_streaming(request, path, codec).await
}

Expand All @@ -81,7 +81,7 @@ impl<T> Grpc<T> {
T::ResponseBody: Body + HttpBody + Send + 'static,
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
<T::ResponseBody as HttpBody>::Data: Into<Bytes>,
S: Stream<Item = Result<M1, Status>> + Send + 'static,
S: Stream<Item = M1> + Send + 'static,
C: Codec<Encode = M1, Decode = M2>,
M1: Send + 'static,
M2: Send + 'static,
Expand Down Expand Up @@ -118,7 +118,7 @@ impl<T> Grpc<T> {
M1: Send + 'static,
M2: Send + 'static,
{
let request = request.map(|m| stream::once(future::ok(m)));
let request = request.map(|m| stream::once(future::ready(m)));
self.streaming(request, path, codec).await
}

Expand All @@ -134,7 +134,7 @@ impl<T> Grpc<T> {
T::ResponseBody: Body + HttpBody + Send + 'static,
<T::ResponseBody as HttpBody>::Data: Into<Bytes>,
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
S: Stream<Item = Result<M1, Status>> + Send + 'static,
S: Stream<Item = M1> + Send + 'static,
C: Codec<Encode = M1, Decode = M2>,
M1: Send + 'static,
M2: Send + 'static,
Expand Down
4 changes: 2 additions & 2 deletions tonic/src/codec/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ pub(crate) fn encode_client<T, U>(
) -> EncodeBody<impl Stream<Item = Result<BytesBuf, Status>>>
where
T: Encoder<Error = Status>,
U: Stream<Item = Result<T::Item, Status>>,
U: Stream<Item = T::Item>,
{
let stream = encode(encoder, source).into_stream();
let stream = encode(encoder, source.map(|x| Ok(x))).into_stream();
EncodeBody::new_client(stream)
}

Expand Down