Skip to content

Commit

Permalink
heartbeat: kill the TCP connection when no heartbeat from server
Browse files Browse the repository at this point in the history
Specs tell us that we should do that (and not go through the classic connection close procedure, but we already didn't do that)
  • Loading branch information
Keruspe committed Apr 25, 2024
1 parent ffa251a commit 4315481
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 8 deletions.
16 changes: 13 additions & 3 deletions src/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{channels::Channels, ConnectionStatus, Error};
use crate::{channels::Channels, killswitch::KillSwitch, ConnectionStatus, Error};
use executor_trait::FullExecutor;
use parking_lot::Mutex;
use reactor_trait::Reactor;
Expand All @@ -12,6 +12,7 @@ use std::{
pub struct Heartbeat {
connection_status: ConnectionStatus,
channels: Channels,
killswitch: KillSwitch,
executor: Arc<dyn FullExecutor + Send + Sync>,
reactor: Arc<dyn Reactor + Send + Sync>,
inner: Arc<Mutex<Inner>>,
Expand All @@ -24,10 +25,12 @@ impl Heartbeat {
executor: Arc<dyn FullExecutor + Send + Sync>,
reactor: Arc<dyn Reactor + Send + Sync>,
) -> Self {
let killswitch = Default::default();
let inner = Default::default();
Self {
connection_status,
channels,
killswitch,
executor,
reactor,
inner,
Expand All @@ -38,6 +41,10 @@ impl Heartbeat {
self.inner.lock().timeout = Some(timeout);
}

pub(crate) fn killswitch(&self) -> KillSwitch {
self.killswitch.clone()
}

pub(crate) fn start(&self) {
let heartbeat = self.clone();
self.executor.spawn(Box::pin(async move {
Expand All @@ -53,7 +60,9 @@ impl Heartbeat {
return None;
}

self.inner.lock().poll_timeout(&self.channels)
self.inner
.lock()
.poll_timeout(&self.channels, &self.killswitch)
}

pub(crate) fn update_last_write(&self) {
Expand Down Expand Up @@ -92,13 +101,14 @@ impl Default for Inner {
}

impl Inner {
fn poll_timeout(&mut self, channels: &Channels) -> Option<Duration> {
fn poll_timeout(&mut self, channels: &Channels, killswitch: &KillSwitch) -> Option<Duration> {
let timeout = self.timeout?;

// The value stored in timeout is half the configured heartbeat value as the spec recommends to send heartbeats at twice the configured pace.
// The specs tells us to close the connection after twice the configured interval has passed.
if Instant::now().duration_since(self.last_read) > 4 * timeout {
self.timeout = None;
killswitch.kill();
channels.set_connection_error(Error::MissingHeartbeatError);
return None;
}
Expand Down
23 changes: 18 additions & 5 deletions src/io_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
frames::Frames,
heartbeat::Heartbeat,
internal_rpc::InternalRPCHandle,
killswitch::KillSwitch,
protocol::{self, AMQPError, AMQPHardError},
socket_state::{SocketEvent, SocketState},
thread::ThreadHandle,
Expand Down Expand Up @@ -44,6 +45,7 @@ pub struct IoLoop {
connection_io_loop_handle: ThreadHandle,
stream: Pin<Box<dyn AsyncIOHandle + Send>>,
status: Status,
killswitch: KillSwitch,
frame_size: FrameSize,
receive_buffer: Buffer,
send_buffer: Buffer,
Expand All @@ -66,6 +68,7 @@ impl IoLoop {
protocol::constants::FRAME_MIN_SIZE,
configuration.frame_max(),
);
let killswitch = heartbeat.killswitch();

Ok(Self {
connection_status,
Expand All @@ -78,6 +81,7 @@ impl IoLoop {
connection_io_loop_handle,
stream,
status: Status::Initial,
killswitch,
frame_size,
receive_buffer: Buffer::with_capacity(FRAMES_STORAGE * frame_size as usize),
send_buffer: Buffer::with_capacity(FRAMES_STORAGE * frame_size as usize),
Expand Down Expand Up @@ -165,24 +169,33 @@ impl IoLoop {
res = self.critical_error(err);
}
}
self.internal_rpc.stop();
self.heartbeat.cancel();
let internal_rpc = self.internal_rpc.clone();
if self.killswitch.killed() {
internal_rpc.register_internal_future(std::future::poll_fn(move |cx| {
self.stream
.as_mut()
.poll_close(cx)
.map(|res| res.map_err(From::from))
}));
}
internal_rpc.stop();
res
})?,
);
waker.wake();
Ok(())
}

fn poll_socket_events(&mut self) {
self.socket_state.poll_events();
}

fn stop(&mut self) {
self.status = Status::Stop;
self.heartbeat.cancel();
}

fn poll_socket_events(&mut self) {
self.socket_state.poll_events();
}

fn check_connection_state(&mut self) {
if self.connection_status.closed() {
self.stop();
Expand Down
17 changes: 17 additions & 0 deletions src/killswitch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};

#[derive(Default, Clone)]
pub(crate) struct KillSwitch(Arc<AtomicBool>);

impl KillSwitch {
pub(crate) fn kill(&self) {
self.0.store(true, Ordering::SeqCst);
}

pub(crate) fn killed(&self) -> bool {
self.0.load(Ordering::SeqCst)
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ mod frames;
mod id_sequence;
mod internal_rpc;
mod io_loop;
mod killswitch;
mod parsing;
mod queue;
mod registry;
Expand Down

0 comments on commit 4315481

Please sign in to comment.