diff --git a/Cargo.toml b/Cargo.toml index a2ea70e2f..f15ca9a42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,7 +45,7 @@ tokio = { version = "1.36.0", features = [ "process", "io-util", "macros", "net" once_cell = "1.8.0" -openssh-mux-client = { version = "0.17.0", optional = true } +openssh-mux-client = { version = "0.17.6", optional = true } libc = "0.2.137" diff --git a/src/native_mux_impl/session.rs b/src/native_mux_impl/session.rs index 23b4e11d4..64f9ce7ed 100644 --- a/src/native_mux_impl/session.rs +++ b/src/native_mux_impl/session.rs @@ -67,6 +67,24 @@ impl Session { Ok(()) } + pub(crate) async fn close_port_forward( + &self, + forward_type: crate::ForwardType, + listen_socket: crate::Socket<'_>, + connect_socket: crate::Socket<'_>, + ) -> Result<(), Error> { + Connection::connect(&self.ctl) + .await? + .close_port_forward( + forward_type.into(), + &listen_socket.into(), + &connect_socket.into(), + ) + .await?; + + Ok(()) + } + async fn close_impl(&self) -> Result<(), Error> { Connection::connect(&self.ctl) .await? diff --git a/src/process_impl/session.rs b/src/process_impl/session.rs index 7188ffc6a..612ba3132 100644 --- a/src/process_impl/session.rs +++ b/src/process_impl/session.rs @@ -139,6 +139,43 @@ impl Session { } } + pub(crate) async fn close_port_forward( + &self, + forward_type: ForwardType, + listen_socket: Socket<'_>, + connect_socket: Socket<'_>, + ) -> Result<(), Error> { + let flag = match forward_type { + ForwardType::Local => OsStr::new("-L"), + ForwardType::Remote => OsStr::new("-R"), + }; + + let mut forwarding = listen_socket.as_os_str().into_owned(); + forwarding.push(":"); + forwarding.push(connect_socket.as_os_str()); + + let port_forwarding = self + .new_cmd(&[OsStr::new("-O"), OsStr::new("cancel"), flag, &*forwarding]) + .output() + .await + .map_err(Error::Ssh)?; + + if port_forwarding.status.success() { + Ok(()) + } else { + let exit_err = String::from_utf8_lossy(&port_forwarding.stderr); + let err = exit_err.trim(); + + if err.is_empty() { + if let Some(master_error) = self.discover_master_error() { + return Err(master_error); + } + } + + Err(Error::Ssh(io::Error::new(io::ErrorKind::Other, err))) + } + } + async fn close_impl(&self) -> Result<(), Error> { let exit = self .new_cmd(&["-O", "exit"]) diff --git a/src/session.rs b/src/session.rs index d1bf891f6..121874c57 100644 --- a/src/session.rs +++ b/src/session.rs @@ -471,9 +471,6 @@ impl Session { /// /// Otherwise, `listen_socket` on the remote machine will be forwarded to `connect_socket` /// on the local machine. - /// - /// Currently, there is no way of stopping a port forwarding due to the fact that - /// openssh multiplex server/master does not support this. pub async fn request_port_forward( &self, forward_type: impl Into, @@ -490,6 +487,25 @@ impl Session { }) } + /// Close a previously established local/remote port forwarding. + /// + /// The same set of arguments should be passed as when the port forwarding was requested. + pub async fn close_port_forward( + &self, + forward_type: impl Into, + listen_socket: impl Into>, + connect_socket: impl Into>, + ) -> Result<(), Error> { + delegate!(&self.0, imp, { + imp.close_port_forward( + forward_type.into(), + listen_socket.into(), + connect_socket.into(), + ) + .await + }) + } + /// Terminate the remote connection. /// /// This destructor terminates the ssh multiplex server diff --git a/tests/openssh.rs b/tests/openssh.rs index 47638ebad..501832447 100644 --- a/tests/openssh.rs +++ b/tests/openssh.rs @@ -829,7 +829,7 @@ async fn remote_socket_forward() { eprintln!("Creating remote process"); let cmd = format!( - "echo -e '0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n' | nc localhost {} >/dev/stderr", + "echo -e '0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n10' | nc localhost {} >/dev/stderr", port ); let child = session @@ -851,6 +851,16 @@ async fn remote_socket_forward() { assert_eq!(DATA, &buffer); + eprintln!("Canceling port forward"); + session + .close_port_forward(ForwardType::Remote, (loopback(), *port), &*unix_socket) + .await + .unwrap(); + + eprintln!("Trying to connect again"); + let e = output.try_read(&mut buffer).unwrap_err(); + assert_eq!(e.kind(), io::ErrorKind::WouldBlock); + drop(output); drop(output_listener); @@ -902,6 +912,16 @@ async fn local_socket_forward() { drop(output); + eprintln!("Closing port forward"); + session + .close_port_forward(ForwardType::Local, &*unix_socket, (loopback(), port)) + .await + .unwrap(); + + eprintln!("Trying to connect again"); + let e = UnixStream::connect(&unix_socket).await.unwrap_err(); + assert_eq!(e.kind(), io::ErrorKind::ConnectionRefused); + eprintln!("Waiting for session to end"); let output = child.wait_with_output().await.unwrap(); eprintln!("local_socket_forward: {:#?}", output);