Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GraphQL] Add a separate timeout field in ServiceConfig for mutations #17742

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -3063,7 +3063,14 @@ type ServiceConfig {
"""
maxPageSize: Int!
"""
Maximum time in milliseconds that will be spent to serve one request.
Maximum time in milliseconds spent waiting for a response from fullnode after issuing a
a transaction to execute. Note that the transaction may still succeed even in the case of a
timeout. Transactions are idempotent, so a transaction that times out should be resubmitted
until the network returns a definite response (success or failure, not timeout).
"""
mutationTimeoutMs: Int!
"""
Maximum time in milliseconds that will be spent to serve one query request.
"""
requestTimeoutMs: Int!
"""
Expand Down
26 changes: 25 additions & 1 deletion crates/sui-graphql-rpc/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ const MAX_MOVE_VALUE_DEPTH: u32 = 128;

pub(crate) const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 40_000;

/// The time to wait for a transaction to be executed such that the effects can be returned to the
/// GraphQL query. If the transaction takes longer than this time to execute, the query will return
/// a timeout error, but the transaction will continue its execution.
///
/// It's the sum of pre+post quorum timeouts from [`sui_core::authority_aggregator::TimeoutConfig`]
/// plus a small buffer of 10% rounded up: 60 + 7 => round_up(67 * 1.1) = 74
/// <https://github.com/MystenLabs/sui/blob/eaf05fe5d293c06e3a2dfc22c87ba2aef419d8ea/crates/sui-core/src/authority_aggregator.rs#L84-L85>
pub(crate) const DEFAULT_MUTATION_TIMEOUT_MS: u64 = 74_000;

const DEFAULT_IDE_TITLE: &str = "Sui GraphQL IDE";

pub(crate) const RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD: Duration = Duration::from_millis(10_000);
Expand Down Expand Up @@ -126,6 +135,8 @@ pub struct Limits {
#[serde(default)]
pub max_page_size: u64,
#[serde(default)]
pub mutation_timeout_ms: u64,
#[serde(default)]
pub request_timeout_ms: u64,
#[serde(default)]
pub max_type_argument_depth: u32,
Expand Down Expand Up @@ -300,7 +311,15 @@ impl ServiceConfig {
self.limits.max_page_size
}

/// Maximum time in milliseconds that will be spent to serve one request.
/// Maximum time in milliseconds spent waiting for a response from fullnode after issuing a
/// a transaction to execute. Note that the transaction may still succeed even in the case of a
/// timeout. Transactions are idempotent, so a transaction that times out should be resubmitted
/// until the network returns a definite response (success or failure, not timeout).
async fn mutation_timeout_ms(&self) -> u64 {
self.limits.mutation_timeout_ms
}

/// Maximum time in milliseconds that will be spent to serve one query request.
async fn request_timeout_ms(&self) -> u64 {
self.limits.request_timeout_ms
}
Expand Down Expand Up @@ -483,6 +502,7 @@ impl Default for Limits {
max_db_query_cost: MAX_DB_QUERY_COST,
default_page_size: DEFAULT_PAGE_SIZE,
max_page_size: MAX_PAGE_SIZE,
mutation_timeout_ms: DEFAULT_MUTATION_TIMEOUT_MS,
request_timeout_ms: DEFAULT_REQUEST_TIMEOUT_MS,
max_type_argument_depth: MAX_TYPE_ARGUMENT_DEPTH,
max_type_argument_width: MAX_TYPE_ARGUMENT_WIDTH,
Expand Down Expand Up @@ -537,6 +557,7 @@ mod tests {
max-db-query-cost = 50
default-page-size = 20
max-page-size = 50
mutation-timeout-ms = 74000
request-timeout-ms = 27000
max-type-argument-depth = 32
max-type-argument-width = 64
Expand All @@ -555,6 +576,7 @@ mod tests {
max_db_query_cost: 50,
default_page_size: 20,
max_page_size: 50,
mutation_timeout_ms: 74_000,
request_timeout_ms: 27_000,
max_type_argument_depth: 32,
max_type_argument_width: 64,
Expand Down Expand Up @@ -617,6 +639,7 @@ mod tests {
max-db-query-cost = 20
default-page-size = 10
max-page-size = 20
mutation-timeout-ms = 74000
request-timeout-ms = 30000
max-type-argument-depth = 32
max-type-argument-width = 64
Expand All @@ -638,6 +661,7 @@ mod tests {
max_db_query_cost: 20,
default_page_size: 10,
max_page_size: 20,
mutation_timeout_ms: 74_000,
request_timeout_ms: 30_000,
max_type_argument_depth: 32,
max_type_argument_width: 64,
Expand Down
45 changes: 34 additions & 11 deletions crates/sui-graphql-rpc/src/extensions/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@

use async_graphql::{
extensions::{Extension, ExtensionContext, ExtensionFactory, NextExecute, NextParseQuery},
parser::types::ExecutableDocument,
parser::types::{ExecutableDocument, OperationType},
Response, ServerError, ServerResult,
};
use async_graphql_value::Variables;
use std::sync::Mutex;
use std::sync::{
atomic::{AtomicBool, Ordering},
Mutex,
};
use std::time::Duration;
use std::{net::SocketAddr, sync::Arc};
use tokio::time::timeout;
Expand All @@ -22,12 +25,14 @@ pub(crate) struct Timeout;
#[derive(Debug, Default)]
struct TimeoutExt {
pub query: Mutex<Option<String>>,
pub is_mutation: AtomicBool,
}

impl ExtensionFactory for Timeout {
fn create(&self) -> Arc<dyn Extension> {
Arc::new(TimeoutExt {
query: Mutex::new(None),
is_mutation: AtomicBool::new(false),
})
}
}
Expand All @@ -42,7 +47,12 @@ impl Extension for TimeoutExt {
next: NextParseQuery<'_>,
) -> ServerResult<ExecutableDocument> {
let document = next.run(ctx, query, variables).await?;
*self.query.lock().unwrap() = Some(ctx.stringify_execute_doc(&document, variables));
let is_mutation = document
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another noob q: parse_query and validation work on the entire request document right? Wouldn't the logic below set any query to is_mutation = True as long as one of the top-level operations is a mutation?

Copy link
Contributor Author

@stefan-mysten stefan-mysten May 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great question which made me think again about this. At least in the IDE, you cannot run both a query request and a mutation at the same time, so for mutations you have to specify mutation {executeTransactionBlock(args) {fields}}, so this would return True, and for a query { chainIdentifier } or just { chainIdentifier } would return false because the OperationType is Query.

.operations
.iter()
.any(|(_, operation)| operation.node.ty == OperationType::Mutation);
self.is_mutation.store(is_mutation, Ordering::Relaxed);

Ok(document)
}

Expand All @@ -52,10 +62,18 @@ impl Extension for TimeoutExt {
operation_name: Option<&str>,
next: NextExecute<'_>,
) -> Response {
let cfg = ctx
.data::<ServiceConfig>()
let cfg: &ServiceConfig = ctx
.data()
.expect("No service config provided in schema data");
let request_timeout = Duration::from_millis(cfg.limits.request_timeout_ms);

// increase the timeout if the request is a mutation
let is_mutation = self.is_mutation.load(Ordering::Relaxed);
let request_timeout = if is_mutation {
Duration::from_millis(cfg.limits.mutation_timeout_ms)
} else {
Duration::from_millis(cfg.limits.request_timeout_ms)
};

timeout(request_timeout, next.run(ctx, operation_name))
.await
.unwrap_or_else(|_| {
Expand All @@ -74,13 +92,18 @@ impl Extension for TimeoutExt {
%error_code,
%query
);
Response::from_errors(vec![ServerError::new(
let error_msg = if is_mutation {
format!(
"Request timed out. Limit: {}s",
"Mutation request timed out. Limit: {}s",
request_timeout.as_secs_f32()
),
None,
)])
)
} else {
format!(
"Query request timed out. Limit: {}s",
request_timeout.as_secs_f32()
)
};
Response::from_errors(vec![ServerError::new(error_msg, None)])
})
}
}
72 changes: 64 additions & 8 deletions crates/sui-graphql-rpc/src/server/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,15 +577,16 @@ pub mod tests {
use crate::{
config::{ConnectionConfig, Limits, ServiceConfig, Version},
context_data::db_data_provider::PgManager,
extensions::query_limits_checker::QueryLimitsChecker,
extensions::timeout::Timeout,
extensions::{query_limits_checker::QueryLimitsChecker, timeout::Timeout},
};
use async_graphql::{
extensions::{Extension, ExtensionContext, NextExecute},
Response,
};
use std::sync::Arc;
use std::time::Duration;
use sui_sdk::{wallet_context::WalletContext, SuiClient};
use sui_types::transaction::TransactionData;
use uuid::Uuid;

/// Prepares a schema for tests dealing with extensions. Returns a `ServerBuilder` that can be
Expand Down Expand Up @@ -641,7 +642,7 @@ pub mod tests {
Uuid::new_v4()
}

pub async fn test_timeout_impl() {
pub async fn test_timeout_impl(wallet: WalletContext) {
struct TimedExecuteExt {
pub min_req_delay: Duration,
}
Expand All @@ -667,37 +668,92 @@ pub mod tests {
}
}

async fn test_timeout(delay: Duration, timeout: Duration) -> Response {
async fn test_timeout(
delay: Duration,
timeout: Duration,
query: &str,
sui_client: &SuiClient,
) -> Response {
let mut cfg = ServiceConfig::default();
cfg.limits.request_timeout_ms = timeout.as_millis() as u64;
cfg.limits.mutation_timeout_ms = timeout.as_millis() as u64;

let schema = prep_schema(None, Some(cfg))
.context_data(Some(sui_client.clone()))
.extension(Timeout)
.extension(TimedExecuteExt {
min_req_delay: delay,
})
.build_schema();

schema.execute("{ chainIdentifier }").await
schema.execute(query).await
}

let query = "{ chainIdentifier }";
let timeout = Duration::from_millis(1000);
let delay = Duration::from_millis(100);
let sui_client = wallet.get_client().await.unwrap();

test_timeout(delay, timeout)
test_timeout(delay, timeout, query, &sui_client)
.await
.into_result()
.expect("Should complete successfully");

// Should timeout
let errs: Vec<_> = test_timeout(delay, delay)
let errs: Vec<_> = test_timeout(delay, delay, query, &sui_client)
.await
.into_result()
.unwrap_err()
.into_iter()
.map(|e| e.message)
.collect();
let exp = format!("Request timed out. Limit: {}s", delay.as_secs_f32());
let exp = format!("Query request timed out. Limit: {}s", delay.as_secs_f32());
assert_eq!(errs, vec![exp]);

// Should timeout for mutation
// Create a transaction and sign it, and use the tx_bytes + signatures for the GraphQL
// executeTransactionBlock mutation call.
let addresses = wallet.get_addresses();
let gas = wallet
.get_one_gas_object_owned_by_address(addresses[0])
.await
.unwrap();
let tx_data = TransactionData::new_transfer_sui(
addresses[1],
addresses[0],
Some(1000),
gas.unwrap(),
1_000_000,
wallet.get_reference_gas_price().await.unwrap(),
);

let tx = wallet.sign_transaction(&tx_data);
let (tx_bytes, signatures) = tx.to_tx_bytes_and_signatures();

let signature_base64 = &signatures[0];
let query = format!(
r#"
mutation {{
executeTransactionBlock(txBytes: "{}", signatures: "{}") {{
effects {{
status
}}
}}
}}"#,
tx_bytes.encoded(),
signature_base64.encoded()
);
let errs: Vec<_> = test_timeout(delay, delay, &query, &sui_client)
.await
.into_result()
.unwrap_err()
.into_iter()
.map(|e| e.message)
.collect();
let exp = format!(
"Mutation request timed out. Limit: {}s",
delay.as_secs_f32()
);
assert_eq!(errs, vec![exp]);
}

Expand Down
14 changes: 13 additions & 1 deletion crates/sui-graphql-rpc/tests/e2e_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,19 @@ mod tests {
#[tokio::test]
#[serial]
async fn test_timeout() {
test_timeout_impl().await;
let _guard = telemetry_subscribers::TelemetryConfig::new()
.with_env()
.init();
let connection_config = ConnectionConfig::ci_integration_test_cfg();
let cluster =
sui_graphql_rpc::test_infra::cluster::start_cluster(connection_config, None).await;
cluster
.wait_for_checkpoint_catchup(0, Duration::from_secs(10))
.await;
// timeout test includes mutation timeout, which requies a [SuiClient] to be able to run
// the test, and a transaction. [WalletContext] gives access to everything that's needed.
let wallet = cluster.validator_fullnode_handle.wallet;
test_timeout_impl(wallet).await;
}

#[tokio::test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3067,7 +3067,14 @@ type ServiceConfig {
"""
maxPageSize: Int!
"""
Maximum time in milliseconds that will be spent to serve one request.
Maximum time in milliseconds spent waiting for a response from fullnode after issuing a
a transaction to execute. Note that the transaction may still succeed even in the case of a
timeout. Transactions are idempotent, so a transaction that times out should be resubmitted
until the network returns a definite response (success or failure, not timeout).
"""
mutationTimeoutMs: Int!
"""
Maximum time in milliseconds that will be spent to serve one query request.
"""
requestTimeoutMs: Int!
"""
Expand Down Expand Up @@ -4213,3 +4220,4 @@ schema {
query: Query
mutation: Mutation
}

Loading