Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MGS: Replace canned SP identification with RFD250-style discovery #934

Merged
merged 2 commits into from
Apr 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions gateway-sp-comms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ http = "0.2.6"
hyper = "0.14.17"
ringbuffer = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_with = "1.12.0"
thiserror = "1.0.30"
tokio-tungstenite = "0.17"
tokio-stream = "0.1.8"
Expand Down
177 changes: 59 additions & 118 deletions gateway-sp-comms/src/communicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,43 +9,33 @@ use crate::error::Error;
use crate::error::SpCommunicationError;
use crate::error::StartupError;
use crate::management_switch::ManagementSwitch;
use crate::management_switch::ManagementSwitchDiscovery;
use crate::management_switch::SpSocket;
use crate::management_switch::SwitchPort;
use crate::recv_handler::RecvHandler;
use crate::Elapsed;
use crate::KnownSps;
use crate::SpIdentifier;
use crate::SwitchConfig;
use crate::Timeout;
use futures::stream::FuturesUnordered;
use futures::Future;
use futures::Stream;
use gateway_messages::version;
use gateway_messages::BulkIgnitionState;
use gateway_messages::DiscoverResponse;
use gateway_messages::IgnitionCommand;
use gateway_messages::IgnitionState;
use gateway_messages::Request;
use gateway_messages::RequestKind;
use gateway_messages::ResponseError;
use gateway_messages::ResponseKind;
use gateway_messages::SerialConsole;
use gateway_messages::SerializedSize;
use gateway_messages::SpComponent;
use gateway_messages::SpState;
use hyper::header;
use hyper::upgrade;
use hyper::Body;
use omicron_common::backoff;
use omicron_common::backoff::Backoff;
use slog::debug;
use slog::info;
use slog::o;
use slog::Logger;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use tokio_tungstenite::tungstenite::handshake;

/// Helper trait that allows us to return an `impl FuturesUnordered<_>` where
Expand All @@ -66,28 +56,30 @@ where

#[derive(Debug)]
pub struct Communicator {
log: Logger,
switch: ManagementSwitch,
request_id: AtomicU32,
recv_handler: Arc<RecvHandler>,
}

impl Communicator {
pub async fn new(
known_sps: KnownSps,
config: SwitchConfig,
discovery_deadline: Instant,
log: &Logger,
) -> Result<Self, StartupError> {
let log = log.new(o!("component" => "SpCommunicator"));
let discovery = ManagementSwitchDiscovery::placeholder_start(
known_sps,
log.clone(),
)
.await?;

let (switch, recv_handler) = RecvHandler::new(discovery, log.clone());
let switch =
ManagementSwitch::new(config, discovery_deadline, log.clone())
.await?;

info!(&log, "started SP communicator");
Ok(Self { log, switch, request_id: AtomicU32::new(0), recv_handler })
Ok(Self { switch })
}

/// Get the name of our location.
///
/// This matches one of the names specified as a possible location in the
/// configuration we were given.
///
/// The name is borrowed from the underlying management switch, so no
/// allocation happens here.
pub fn location_name(&self) -> &str {
    // The switch already hands back a borrowed name; re-borrowing it only
    // creates a reference-to-reference that is immediately auto-dereferenced.
    self.switch.location_name()
}

// convert an identifier to a port number; this is fallible because
Expand All @@ -103,13 +95,36 @@ impl Communicator {
self.switch.switch_port_to_id(port)
}

/// Reports whether the IP address of our local ignition controller has been
/// discovered.
///
/// Intended only for polling during test setup, while waiting for discovery
/// to complete; do not call it outside of tests.
pub fn local_ignition_controller_address_known(&self) -> bool {
    let controller = self.switch.ignition_controller();
    controller.is_some()
}

/// Returns true if we've discovered the IP address of the specified SP.
///
/// This method exists to be polled during test setup (to wait for discovery
/// to happen); it should not be called outside tests.
///
/// # Panics
///
/// Panics instead of returning an error if `sp` describes an SP that isn't
/// known to this communicator.
pub fn address_known(&self, sp: SpIdentifier) -> bool {
    // Deliberately panic on an unknown SP: this is a test-only helper, and an
    // unknown identifier means the test itself is misconfigured.
    let port = self
        .switch
        .switch_port(sp)
        .expect("`sp` is not known to this communicator");
    self.switch.sp_socket(port).is_some()
}

/// Ask the local ignition controller for the ignition state of a given SP.
pub async fn get_ignition_state(
&self,
sp: SpIdentifier,
timeout: Timeout,
) -> Result<IgnitionState, Error> {
let controller = self.switch.ignition_controller();
let controller = self
.switch
.ignition_controller()
.ok_or(Error::LocalIgnitionControllerAddressUnknown)?;
let port = self.id_to_port(sp)?;
let request =
RequestKind::IgnitionState { target: port.as_ignition_target() };
Expand All @@ -129,7 +144,10 @@ impl Communicator {
&self,
timeout: Timeout,
) -> Result<Vec<(SpIdentifier, IgnitionState)>, Error> {
let controller = self.switch.ignition_controller();
let controller = self
.switch
.ignition_controller()
.ok_or(Error::LocalIgnitionControllerAddressUnknown)?;
let request = RequestKind::BulkIgnitionState;

let bulk_state = self
Expand Down Expand Up @@ -170,7 +188,10 @@ impl Communicator {
command: IgnitionCommand,
timeout: Timeout,
) -> Result<(), Error> {
let controller = self.switch.ignition_controller();
let controller = self
.switch
.ignition_controller()
.ok_or(Error::LocalIgnitionControllerAddressUnknown)?;
let target = self.id_to_port(target_sp)?.as_ignition_target();
let request = RequestKind::IgnitionCommand { target, command };

Expand All @@ -196,6 +217,11 @@ impl Communicator {
// via UDP. SPs will continuously broadcast any serial console data, even if
// there is no attached client. Maybe this is fine, since the serial console
// shouldn't be noisy without a corresponding client driving it?
//
// TODO Because this method doesn't contact the target SP, it succeeds even
// if we don't know the IP address of that SP (yet, or possibly ever)! The
// connection will start working if we later discover the address, but this
// is probably not the behavior we want.
pub async fn serial_console_attach(
self: &Arc<Self>,
request: &mut http::Request<Body>,
Expand Down Expand Up @@ -250,7 +276,7 @@ impl Communicator {
.map(|key| handshake::derive_accept_key(key))
.ok_or(Error::BadWebsocketConnection("missing websocket key"))?;

self.recv_handler.serial_console_attach(
self.switch.serial_console_attach(
Arc::clone(self),
port,
component,
Expand Down Expand Up @@ -281,7 +307,7 @@ impl Communicator {
component: &SpComponent,
) -> Result<(), Error> {
let port = self.id_to_port(sp)?;
self.recv_handler.serial_console_detach(port, component)
self.switch.serial_console_detach(port, component)
}

/// Send `packet` to the given SP component's serial console.
Expand Down Expand Up @@ -398,102 +424,17 @@ impl Communicator {
.collect::<FuturesUnordered<_>>()
}

pub(crate) async fn request_response<F, T>(
async fn request_response<F, T>(
&self,
sp: &SpSocket<'_>,
mut kind: RequestKind,
mut map_response_kind: F,
kind: RequestKind,
map_response_kind: F,
timeout: Option<Timeout>,
) -> Result<T, Error>
where
F: FnMut(ResponseKind) -> Result<T, BadResponseType>,
{
// helper to wrap a future in a timeout if we have one
async fn maybe_with_timeout<F, U>(
timeout: Option<Timeout>,
fut: F,
) -> Result<U, Elapsed>
where
F: Future<Output = U>,
{
match timeout {
Some(t) => t.timeout_at(fut).await,
None => Ok(fut.await),
}
}

// We'll use exponential backoff if and only if the SP responds with
// "busy"; any other error will cause the loop below to terminate.
let mut backoff = backoff::internal_service_policy();

loop {
// It would be nicer to use `backoff::retry()` instead of manually
// stepping the backoff policy, but the dance we do with `kind` to
// avoid cloning it is hard to map into `retry()` in a way that
// satisfies the borrow checker. ("The dance we do with `kind` to
// avoid cloning it" being that we move it into `request` below, and
// on a busy response from the SP we move it back out into the
// `kind` local var.)
let duration = backoff
.next_backoff()
.expect("internal backoff policy gave up");
maybe_with_timeout(timeout, tokio::time::sleep(duration))
.await
.map_err(|err| Error::Timeout {
timeout: err.duration(),
sp: self.port_to_id(sp.port()),
})?;

// request IDs will eventually roll over; since we enforce timeouts
// this should be a non-issue in practice. does this need testing?
let request_id = self.request_id.fetch_add(1, Ordering::Relaxed);

// update our recv_handler to expect a response for this request ID
let response_fut =
self.recv_handler.register_request_id(sp.port(), request_id);

// Serialize and send our request. We know `buf` is large enough for
// any `Request`, so unwrapping here is fine.
let request = Request { version: version::V1, request_id, kind };
let mut buf = [0; Request::MAX_SIZE];
let n = gateway_messages::serialize(&mut buf, &request).unwrap();
let serialized_request = &buf[..n];

// Actual communication, guarded by `timeout` if it's not `None`.
let result = maybe_with_timeout(timeout, async {
debug!(&self.log, "sending {:?} to SP {:?}", request, sp);
sp.send(serialized_request).await.map_err(|err| {
SpCommunicationError::UdpSend { addr: sp.addr(), err }
})?;

Ok::<ResponseKind, SpCommunicationError>(response_fut.await?)
})
.await
.map_err(|err| Error::Timeout {
timeout: err.duration(),
sp: self.port_to_id(sp.port()),
})?;

match result {
Ok(response_kind) => {
return map_response_kind(response_kind)
.map_err(SpCommunicationError::from)
.map_err(Error::from)
}
Err(SpCommunicationError::SpError(ResponseError::Busy)) => {
debug!(
&self.log,
"SP busy; sleeping before retrying send";
"sp" => ?sp,
);

// move `kind` back into local var; required to satisfy
// borrow check of this loop
kind = request.kind;
}
Err(err) => return Err(err.into()),
}
}
self.switch.request_response(sp, kind, map_response_kind, timeout).await
}
}

Expand Down
6 changes: 4 additions & 2 deletions gateway-sp-comms/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ pub enum Error {
.0.slot,
)]
SpAddressUnknown(SpIdentifier),
#[error("timeout ({timeout:?}) elapsed communicating with {sp:?}")]
Timeout { timeout: Duration, sp: SpIdentifier },
#[error(
"timeout ({timeout:?}) elapsed communicating with {sp:?} on port {port}"
)]
Timeout { timeout: Duration, port: usize, sp: Option<SpIdentifier> },
#[error("error communicating with SP: {0}")]
SpCommunicationFailed(#[from] SpCommunicationError),
#[error("serial console is already attached")]
Expand Down
10 changes: 4 additions & 6 deletions gateway-sp-comms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@ pub mod error;

pub use communicator::Communicator;
pub use communicator::FuturesUnorderedImpl;
pub use management_switch::LocationConfig;
pub use management_switch::LocationDeterminationConfig;
pub use management_switch::SpIdentifier;
pub use management_switch::SpType;
pub use management_switch::SwitchConfig;
pub use management_switch::SwitchPortConfig;
pub use timeout::Elapsed;
pub use timeout::Timeout;

// TODO these will remain public for a while, but eventually will be removed
// altogether; currently these provide a way to hard-code the rack topology,
// which is not what we want.
pub use management_switch::KnownSp;
pub use management_switch::KnownSps;
Loading