Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump prost and tonic dependency to 0.13.1 and 0.12.1 respectively #1748

Merged
merged 5 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 149 additions & 66 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 8 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,10 @@ opentelemetry_sdk = { version = "0.22.1" }
parking_lot = { version = "0.12" }
paste = "1.0"
pin-project = "1.0"
prost = "0.12.1"
prost-build = "0.12.1"
prost = { version = "0.13.1" }
prost-build = { version = "0.13.1" }
prost-dto = { version = "0.0.2" }
prost-types = "0.12.1"
prost-types = { version = "0.13.1" }
rand = "0.8.5"
rayon = { version = "1.10" }
regress = { version = "0.10" }
Expand Down Expand Up @@ -152,10 +152,11 @@ thiserror = "1.0"
tokio = { version = "1.29", default-features = false, features = ["rt-multi-thread", "signal", "macros", ] }
tokio-stream = "0.1.14"
tokio-util = { version = "0.7.10" }
tonic = { version = "0.10.2", default-features = false }
tonic-reflection = { version = "0.10.2" }
tonic-health = "0.10.2"
tonic-build = "0.11.0"
tonic = { version = "0.12.1", default-features = false }
tonic-0-10 = { package = "tonic", version = "0.10.2", default-features = false }
tonic-reflection = { version = "0.12.1" }
tonic-health = { version = "0.12.1" }
tonic-build = { version = "0.12.1" }
tower = "0.4"
tower-http = { version = "0.5.2", default-features = false }
tracing = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion crates/admin-rest-model/src/deployments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl From<DeploymentMetadata> for Deployment {
protocol_type,
http_version,
} => Self::Http {
uri: address.to_string().parse().unwrap(),
uri: address,
protocol_type,
http_version,
additional_headers: value.delivery_options.additional_headers.into(),
Expand Down
1 change: 1 addition & 0 deletions crates/admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ serde_with = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true, features = ["transport", "codegen", "prost", "gzip"] }
tonic-0-10 = { workspace = true }
tower = { workspace = true, features = ["load-shed", "limit"] }
tracing = { workspace = true }

Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/rest_api/deployments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub async fn create_deployment<V>(
} => (
DiscoverEndpoint::new(
Endpoint::Http(
uri.to_string().parse().unwrap(),
uri,
if use_http_11 {
http::Version::HTTP_11
} else {
Expand Down
45 changes: 42 additions & 3 deletions crates/admin/src/storage_query/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// by the Apache License, Version 2.0.

use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};

Expand All @@ -31,11 +32,12 @@ use futures::{ready, Stream, StreamExt, TryStreamExt};
use http_body::Frame;
use http_body_util::StreamBody;
use okapi_operation::*;
use restate_core::network::protobuf::node_svc::StorageQueryRequest;
use schemars::JsonSchema;
use serde::Deserialize;
use serde_with::serde_as;

use restate_core::network::protobuf::node_svc::StorageQueryRequest;
use tonic::metadata::{KeyAndValueRef, MetadataMap};
use tonic::Status;

use super::error::StorageQueryError;
use crate::state::QueryServiceState;
Expand Down Expand Up @@ -79,7 +81,7 @@ pub async fn query(
data_body: response.data,
..FlightData::default()
})
.map_err(FlightError::from),
.map_err(|status| FlightError::from(tonic_status_012_to_010(status))),
);

// create a stream without LargeUtf8 or LargeBinary columns as JS doesn't support these yet
Expand Down Expand Up @@ -268,3 +270,40 @@ impl Stream for ConvertRecordBatchStream {
}
}
}

// todo: Remove once arrow-flight works with tonic 0.12
fn tonic_status_012_to_010(status: Status) -> tonic_0_10::Status {
let code = tonic_0_10::Code::from(status.code() as i32);
let message = status.message().to_owned();
let details = Bytes::copy_from_slice(status.details());
let metadata = tonic_metadata_map_012_to_010(status.metadata());
tonic_0_10::Status::with_details_and_metadata(code, message, details, metadata)
}

// todo: Remove once arrow-flight works with tonic 0.12
fn tonic_metadata_map_012_to_010(metadata_map: &MetadataMap) -> tonic_0_10::metadata::MetadataMap {
let mut resulting_metadata_map =
tonic_0_10::metadata::MetadataMap::with_capacity(metadata_map.len());
for key_value in metadata_map.iter() {
match key_value {
KeyAndValueRef::Ascii(key, value) => {
// ignore metadata map entries if conversion fails
if let Ok(value) =
tonic_0_10::metadata::MetadataValue::from_str(value.to_str().unwrap_or(""))
{
if let Ok(key) = tonic_0_10::metadata::MetadataKey::from_str(key.as_str()) {
resulting_metadata_map.insert(key, value);
}
}
}
KeyAndValueRef::Binary(key, value) => {
if let Ok(key) = tonic_0_10::metadata::MetadataKey::from_bytes(key.as_ref()) {
let value = tonic_0_10::metadata::MetadataValue::from_bytes(value.as_ref());
resulting_metadata_map.insert_bin(key, value);
}
}
}
}

resulting_metadata_map
}
4 changes: 2 additions & 2 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ enum-map = { workspace = true }
enumset = { workspace = true }
futures = { workspace = true }
hostname = { workspace = true }
http_0_2 = { package = "http", version = "0.2" }
hyper_0_14 = { package = "hyper", version = "0.14" }
http = { workspace = true }
humantime = { workspace = true }
hyper = { workspace = true }
hyper-util = { workspace = true }
metrics = { workspace = true }
once_cell = { workspace = true }
pin-project = { workspace = true }
Expand Down
15 changes: 6 additions & 9 deletions crates/core/src/network/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ use restate_types::{GenerationalNodeId, NodeId, PlainNodeId};

use super::connection::{Connection, ConnectionSender};
use super::error::{NetworkError, ProtocolError};
use super::grpc_util::create_grpc_channel_from_advertised_address;
use super::handshake::{negotiate_protocol_version, wait_for_hello, wait_for_welcome};
use super::metric_definitions::{
self, CONNECTION_DROPPED, INCOMING_CONNECTION, MESSAGE_PROCESSING_DURATION, MESSAGE_RECEIVED,
ONGOING_DRAIN, OUTGOING_CONNECTION,
};
use super::protobuf::node_svc::node_svc_client::NodeSvcClient;
use super::{Handler, MessageRouter};
use crate::network::net_util::create_tonic_channel_from_advertised_address;
use crate::{cancellation_watcher, current_task_id, task_center, TaskId, TaskKind};
use crate::{Metadata, TargetVersion};

Expand Down Expand Up @@ -142,12 +142,9 @@ impl ConnectionManager {
let nodes_config = self.metadata.nodes_config_ref();
let my_node_id = self.metadata.my_node_id();
// NodeId **must** be generational at this layer
let peer_node_id = hello
.my_node_id
.clone()
.ok_or(ProtocolError::HandshakeFailed(
"NodeId is not set in the Hello message",
))?;
let peer_node_id = hello.my_node_id.ok_or(ProtocolError::HandshakeFailed(
"NodeId is not set in the Hello message",
))?;

if peer_node_id.generation() == 0 {
return Err(
Expand Down Expand Up @@ -230,7 +227,7 @@ impl ConnectionManager {
self.metadata
.sync(
MetadataKind::NodesConfiguration,
TargetVersion::from(header.my_nodes_config_version.clone().map(Into::into)),
TargetVersion::from(header.my_nodes_config_version.map(Into::into)),
)
.await?;
nodes_config = self.metadata.nodes_config_ref();
Expand Down Expand Up @@ -291,7 +288,7 @@ impl ConnectionManager {
let channel = {
let mut guard = self.inner.lock().unwrap();
if let hash_map::Entry::Vacant(entry) = guard.channel_cache.entry(address.clone()) {
let channel = create_grpc_channel_from_advertised_address(address)
let channel = create_tonic_channel_from_advertised_address(address)
.map_err(|e| NetworkError::BadNodeAddress(node_id.into(), e))?;
entry.insert(channel.clone());
channel
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/network/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub enum NetworkError {
#[error("operation aborted, node is shutting down")]
Shutdown(#[from] ShutdownError),
#[error("node {0} address is bad: {1}")]
BadNodeAddress(NodeId, http_0_2::Error),
BadNodeAddress(NodeId, http::Error),
#[error("timeout: {0}")]
Timeout(&'static str),
#[error("protocol error: {0}")]
Expand Down
179 changes: 0 additions & 179 deletions crates/core/src/network/grpc_util.rs

This file was deleted.

2 changes: 1 addition & 1 deletion crates/core/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
mod connection;
mod connection_manager;
mod error;
pub mod grpc_util;
mod handshake;
mod message_router;
pub(crate) mod metric_definitions;
pub mod net_util;
mod network_sender;
mod networking;
pub mod protobuf;
Expand Down
Loading
Loading