Skip to content

Commit

Permalink
Feat: Closing an existing port forward (#165)
Browse files Browse the repository at this point in the history
* Implement cancel_port_forwarding for process-mux

* Update doc comment

* Fix test

* Fix test

* Ignore cancel tests for native-mux

* Fix local forwarding test

* Update cfg

* Update cfg

* Update cfg

* Update cfg

* Update cfg

* Update cfg

* Rename API and to `read` 0 test in remote forward test

* Fix native mux impl

* Try `UnixStream::connect`

* What is getting read in?

* Extra LF by echo?

* `try_read` would block

* Update `openssh-mux-client` in Cargo.toml
  • Loading branch information
jaywonchung authored Sep 10, 2024
1 parent 94b63e6 commit a3fc969
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 5 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ tokio = { version = "1.36.0", features = [ "process", "io-util", "macros", "net"

once_cell = "1.8.0"

openssh-mux-client = { version = "0.17.0", optional = true }
openssh-mux-client = { version = "0.17.6", optional = true }

libc = "0.2.137"

Expand Down
18 changes: 18 additions & 0 deletions src/native_mux_impl/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,24 @@ impl Session {
Ok(())
}

pub(crate) async fn close_port_forward(
&self,
forward_type: crate::ForwardType,
listen_socket: crate::Socket<'_>,
connect_socket: crate::Socket<'_>,
) -> Result<(), Error> {
Connection::connect(&self.ctl)
.await?
.close_port_forward(
forward_type.into(),
&listen_socket.into(),
&connect_socket.into(),
)
.await?;

Ok(())
}

async fn close_impl(&self) -> Result<(), Error> {
Connection::connect(&self.ctl)
.await?
Expand Down
37 changes: 37 additions & 0 deletions src/process_impl/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,43 @@ impl Session {
}
}

pub(crate) async fn close_port_forward(
&self,
forward_type: ForwardType,
listen_socket: Socket<'_>,
connect_socket: Socket<'_>,
) -> Result<(), Error> {
let flag = match forward_type {
ForwardType::Local => OsStr::new("-L"),
ForwardType::Remote => OsStr::new("-R"),
};

let mut forwarding = listen_socket.as_os_str().into_owned();
forwarding.push(":");
forwarding.push(connect_socket.as_os_str());

let port_forwarding = self
.new_cmd(&[OsStr::new("-O"), OsStr::new("cancel"), flag, &*forwarding])
.output()
.await
.map_err(Error::Ssh)?;

if port_forwarding.status.success() {
Ok(())
} else {
let exit_err = String::from_utf8_lossy(&port_forwarding.stderr);
let err = exit_err.trim();

if err.is_empty() {
if let Some(master_error) = self.discover_master_error() {
return Err(master_error);
}
}

Err(Error::Ssh(io::Error::new(io::ErrorKind::Other, err)))
}
}

async fn close_impl(&self) -> Result<(), Error> {
let exit = self
.new_cmd(&["-O", "exit"])
Expand Down
22 changes: 19 additions & 3 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,9 +471,6 @@ impl Session {
///
/// Otherwise, `listen_socket` on the remote machine will be forwarded to `connect_socket`
/// on the local machine.
///
/// Currently, there is no way of stopping a port forwarding due to the fact that
/// openssh multiplex server/master does not support this.
pub async fn request_port_forward(
&self,
forward_type: impl Into<ForwardType>,
Expand All @@ -490,6 +487,25 @@ impl Session {
})
}

/// Close a previously established local/remote port forwarding.
///
/// The same set of arguments should be passed as when the port forwarding was requested.
pub async fn close_port_forward(
&self,
forward_type: impl Into<ForwardType>,
listen_socket: impl Into<Socket<'_>>,
connect_socket: impl Into<Socket<'_>>,
) -> Result<(), Error> {
delegate!(&self.0, imp, {
imp.close_port_forward(
forward_type.into(),
listen_socket.into(),
connect_socket.into(),
)
.await
})
}

/// Terminate the remote connection.
///
/// This destructor terminates the ssh multiplex server
Expand Down
22 changes: 21 additions & 1 deletion tests/openssh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,7 @@ async fn remote_socket_forward() {

eprintln!("Creating remote process");
let cmd = format!(
"echo -e '0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n' | nc localhost {} >/dev/stderr",
"echo -e '0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n10' | nc localhost {} >/dev/stderr",
port
);
let child = session
Expand All @@ -851,6 +851,16 @@ async fn remote_socket_forward() {

assert_eq!(DATA, &buffer);

eprintln!("Canceling port forward");
session
.close_port_forward(ForwardType::Remote, (loopback(), *port), &*unix_socket)
.await
.unwrap();

eprintln!("Trying to connect again");
let e = output.try_read(&mut buffer).unwrap_err();
assert_eq!(e.kind(), io::ErrorKind::WouldBlock);

drop(output);
drop(output_listener);

Expand Down Expand Up @@ -902,6 +912,16 @@ async fn local_socket_forward() {

drop(output);

eprintln!("Closing port forward");
session
.close_port_forward(ForwardType::Local, &*unix_socket, (loopback(), port))
.await
.unwrap();

eprintln!("Trying to connect again");
let e = UnixStream::connect(&unix_socket).await.unwrap_err();
assert_eq!(e.kind(), io::ErrorKind::ConnectionRefused);

eprintln!("Waiting for session to end");
let output = child.wait_with_output().await.unwrap();
eprintln!("local_socket_forward: {:#?}", output);
Expand Down

0 comments on commit a3fc969

Please sign in to comment.