Skip to content

Commit

Permalink
Minor logging improvements around ingress rpcs
Browse files (browse the repository at this point in the history)
  • Loading branch information
tillrohrmann committed Nov 15, 2024
1 parent 29e09f8 commit bb522b7
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 5 deletions.
4 changes: 3 additions & 1 deletion crates/core/src/partitions/mod.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -202,7 +202,9 @@ impl PartitionRoutingRefresher {
},
);
self.inflight_refresh_task = task.ok();
};
} else {
trace!("Skipping refresh as a refresh task is already in progress");
}
}
}

Expand Down
20 changes: 18 additions & 2 deletions crates/worker/src/ingress_integration.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -17,12 +17,13 @@ use restate_core::network::partition_processor_rpc_client::{
};
use restate_core::network::TransportConnect;
use restate_ingress_http::{RequestDispatcher, RequestDispatcherError};
use restate_types::identifiers::PartitionProcessorRpcRequestId;
use restate_types::identifiers::{PartitionProcessorRpcRequestId, WithInvocationId};
use restate_types::invocation::{InvocationQuery, InvocationRequest, InvocationResponse};
use restate_types::net::partition_processor::{InvocationOutput, SubmittedInvocationNotification};
use restate_types::retries::RetryPolicy;
use std::future::Future;
use std::time::Duration;
use tracing::{debug_span, trace, Instrument};

pub struct RpcRequestDispatcher<C> {
partition_processor_rpc_client: PartitionProcessorRpcClient<C>,
Expand Down Expand Up @@ -59,7 +60,17 @@ impl<C> RpcRequestDispatcher<C> {
Ok(self
.retry_policy
.clone()
.retry_if(operation, |e| is_idempotent || e.is_safe_to_retry())
.retry_if(operation, |e| {
let retry = is_idempotent || e.is_safe_to_retry();

if retry {
trace!("Retrying rpc because of error: {e}.");
} else {
trace!("Rpc failed: {e}");
}

retry
})
.await
.map_err(|e| anyhow!("Error when trying to route the request internally: {e}"))?)
}
Expand All @@ -82,6 +93,7 @@ where
invocation_request.clone(),
)
})
.instrument(debug_span!("send invocation", %request_id, invocation_id = %invocation_request.invocation_id()))
.await
}

Expand All @@ -95,6 +107,7 @@ where
self.partition_processor_rpc_client
.append_invocation_and_wait_output(request_id, invocation_request.clone())
})
.instrument(debug_span!("call invocation", %request_id, invocation_id = %invocation_request.invocation_id()))
.await
}

Expand All @@ -107,6 +120,7 @@ where
self.partition_processor_rpc_client
.attach_invocation(request_id, invocation_query.clone())
})
.instrument(debug_span!("attach to invocation", %request_id, invocation_id = %invocation_query.to_invocation_id()))
.await
}

Expand All @@ -119,6 +133,7 @@ where
self.partition_processor_rpc_client
.get_invocation_output(request_id, invocation_query.clone())
})
.instrument(debug_span!("get invocation output", %request_id, invocation_id = %invocation_query.to_invocation_id()))
.await
}

Expand All @@ -131,6 +146,7 @@ where
self.partition_processor_rpc_client
.append_invocation_response(request_id, invocation_response.clone())
})
.instrument(debug_span!("send invocation response", %request_id, invocation_id = %invocation_response.id))
.await
}
}
13 changes: 11 additions & 2 deletions crates/worker/src/partition/leadership.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -493,7 +493,9 @@ where

async fn become_follower(&mut self) {
match &mut self.state {
State::Follower => {}
State::Follower => {
// nothing to do :-)
}
State::Candidate { appender_task, .. } => {
appender_task.abort();
}
Expand Down Expand Up @@ -530,7 +532,11 @@ where
}

// Reply to all awaiting RPCs with a lost-leadership error
for (_, reciprocal) in awaiting_rpc_actions.drain() {
for (request_id, reciprocal) in awaiting_rpc_actions.drain() {
trace!(
%request_id,
"Failing rpc because I lost leadership",
);
respond_to_rpc(
&self.task_center,
reciprocal.prepare(Err(PartitionProcessorRpcError::LostLeadership(
Expand Down Expand Up @@ -656,6 +662,8 @@ where
},
))),
);
} else {
debug!(%request_id, "Ignoring sending ingress response because there is no awaiting rpc");
}
}
Action::IngressSubmitNotification {
Expand Down Expand Up @@ -819,6 +827,7 @@ where
// In this case, someone already proposed this command, so we
// replace the reciprocal and fail the old one to avoid leaving it dangling.
let old_reciprocal = o.remove();
trace!(%request_id, "Replacing rpc with newer request");
respond_to_rpc(
&self.task_center,
old_reciprocal.prepare(Err(PartitionProcessorRpcError::Internal(
Expand Down

0 comments on commit bb522b7

Please sign in to comment.