Skip to content

Commit

Permalink
cancellation is now necessary
Browse files Browse the repository at this point in the history
  • Loading branch information
jackkleeman committed Jul 23, 2024
1 parent a59d217 commit 4bc2968
Showing 1 changed file with 49 additions and 44 deletions.
93 changes: 49 additions & 44 deletions cli/src/commands/cloud/environments/tunnel/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::error::Error;
use std::fmt::{Display, Formatter};
use std::future::Future;

use std::sync::{Arc};
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
Expand All @@ -23,9 +23,10 @@ use hyper::Response;
use hyper_rustls::HttpsConnector;
use hyper_util::client::legacy::connect::HttpConnector;
use reqwest::Body;
use tokio::sync::{OwnedRwLockWriteGuard, RwLock};
use restate_cli_util::CliContext;
use restate_types::retries::RetryPolicy;
use tokio::sync::{OwnedRwLockWriteGuard, RwLock};
use tokio_util::sync::CancellationToken;
use tower::Service as _;
use tracing::{error, info};
use url::Url;
Expand Down Expand Up @@ -279,53 +280,49 @@ where

let this = Arc::new(RwLock::new(self));

let token = CancellationToken::new();
{

hyper::server::conn::http2::Builder::new(hyper_util::rt::TokioExecutor::new())
let server =
hyper::server::conn::http2::Builder::new(hyper_util::rt::TokioExecutor::new())
.serve_connection(
io,
hyper::service::service_fn(|req| {
let this = this.clone();
async {
loop {
let guard = this.read().await;
match &guard.status {
HandlerStatus::AwaitingStart => {
// slow path
drop(guard);
let guard = this.write_owned().await;
match guard.status {
// won the race; process start
HandlerStatus::AwaitingStart => return Self::process_start(guard, req),
// lost the race
_ => {
let guard = guard.downgrade();
match &guard.status {
HandlerStatus::Proxying => {
return guard.proxy(req).await
}
HandlerStatus::Failed(err) => {
return Err(err.clone())
}
HandlerStatus::AwaitingStart => unreachable!("cannot re-enter awaiting start state"),
}
},
let token = token.clone();
async move {
let guard = this.read().await;
match &guard.status {
HandlerStatus::AwaitingStart => {
drop(guard);
let guard = this.write_owned().await;
match &guard.status {
// won the race; process start
HandlerStatus::AwaitingStart => {
Self::process_start(guard, req, token)
}
// lost the race to someone that failed
HandlerStatus::Failed(err) => Err(err.clone()),
// lost the race but they succeeded; treat this as a normal proxy request
HandlerStatus::Proxying => {
let guard = guard.downgrade();
guard.proxy(req).await
}
},
HandlerStatus::Proxying => {
return guard.proxy(req).await
}
HandlerStatus::Failed(err) => {
return Err(err.clone())
}
}
HandlerStatus::Proxying => guard.proxy(req).await,
HandlerStatus::Failed(err) => Err(err.clone()),
}
}
}),
).await?;
);

tokio::select! {
server_result = server => server_result?,
_ = token.cancelled() => {},
}
}

let this = this.read().await;
let this = this.read().await;

if let HandlerStatus::Failed(err) = &this.status {
Err(err.clone().into())
Expand All @@ -340,7 +337,11 @@ where
Proxy: Fn(Arc<HandlerInner>, reqwest::Request) -> ProxyFut + Send + Sync + 'static,
ProxyFut: Future<Output = Result<Response<Body>, StartError>> + Send + 'static,
{
fn process_start(mut this: OwnedRwLockWriteGuard<Self>, req: Request<Incoming>) -> Result<Response<Body>, StartError> {
fn process_start(
mut this: OwnedRwLockWriteGuard<Self>,
req: Request<Incoming>,
token: CancellationToken,
) -> Result<Response<Body>, StartError> {
let body = req.into_body();

let resp = Response::builder()
Expand All @@ -358,18 +359,22 @@ where

// keep holding the lock until this is complete; no other requests should be processed
tokio::task::spawn(async move {
match tokio::time::timeout(Duration::from_secs(5), process_start(
this.inner.clone(),
body,
)).await {
match tokio::time::timeout(
Duration::from_secs(5),
process_start(this.inner.clone(), body),
)
.await
{
Ok(Ok(())) => {
this.status = HandlerStatus::Proxying;
},
}
Ok(Err(err)) => {
this.status = HandlerStatus::Failed(err)
this.status = HandlerStatus::Failed(err);
token.cancel();
}
Err(_timeout) => {
this.status = HandlerStatus::Failed(StartError::Timeout)
this.status = HandlerStatus::Failed(StartError::Timeout);
token.cancel();
}
}
});
Expand Down

0 comments on commit 4bc2968

Please sign in to comment.