From b855d0bce40f015fb55328fc64397bad12ede0ca Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 17 Apr 2024 20:48:06 +1000 Subject: [PATCH 1/3] Add new async fn `Sftp::from_session_with_check_connection` Signed-off-by: Jiahao XU --- src/changelog.rs | 3 ++ src/lib.rs | 7 ++-- src/sftp.rs | 2 +- src/sftp/openssh_session.rs | 70 ++++++++++++++++++++++++++++++++----- 4 files changed, 71 insertions(+), 11 deletions(-) diff --git a/src/changelog.rs b/src/changelog.rs index f1ffd58..ab3c5ca 100644 --- a/src/changelog.rs +++ b/src/changelog.rs @@ -1,6 +1,9 @@ #[allow(unused_imports)] use crate::*; +/// # Added +/// - [`Sftp::from_session_with_check_connection`] for checking connection is +/// alive. #[doc(hidden)] pub mod unreleased {} diff --git a/src/lib.rs b/src/lib.rs index f87db05..1e723f3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,11 +61,14 @@ mod unix_timestamp; pub use unix_timestamp::UnixTimeStamp; mod sftp; -#[cfg(feature = "openssh")] -pub use sftp::OpensshSession; use sftp::SftpHandle; +#[cfg(feature = "openssh")] +pub use sftp::{CheckOpensshConnection, OpensshSession}; pub use sftp::{Sftp, SftpAuxiliaryData}; +#[cfg(feature = "openssh")] +pub use openssh; + mod options; pub use options::SftpOptions; diff --git a/src/sftp.rs b/src/sftp.rs index 46b9966..3898c27 100644 --- a/src/sftp.rs +++ b/src/sftp.rs @@ -28,7 +28,7 @@ use tokio_io_utility::assert_send; mod openssh_session; #[cfg(feature = "openssh")] -pub use openssh_session::OpensshSession; +pub use openssh_session::{CheckOpensshConnection, OpensshSession}; #[derive(Debug, destructure)] pub(super) struct SftpHandle(SharedData); diff --git a/src/sftp/openssh_session.rs b/src/sftp/openssh_session.rs index 792eb62..25b7993 100644 --- a/src/sftp/openssh_session.rs +++ b/src/sftp/openssh_session.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{future::Future, pin::Pin, sync::Arc}; use openssh::{ChildStdin, ChildStdout, Error as OpensshError, Session, Stdio}; use tokio::{sync::oneshot, task::JoinHandle}; @@ -9,6 +9,16 @@ use crate::{utils::ErrorExt, Error, Sftp, SftpAuxiliaryData, SftpOptions}; #[derive(Debug)] pub struct OpensshSession(JoinHandle>); +/// Check for openssh connection to be alive +pub trait CheckOpensshConnection { + /// This function should only return on `Err()`. + /// Once the sftp session is closed, the future will be cancelled (dropped). + fn check_connection<'session>( + self: Box, + session: &'session Session, + ) -> Pin> + Send + Sync>>; +} + impl Drop for OpensshSession { fn drop(&mut self) { self.0.abort(); @@ -17,11 +27,12 @@ impl Drop for OpensshSession { #[cfg_attr( feature = "tracing", - tracing::instrument(name = "session_task", skip(tx)) + tracing::instrument(name = "session_task", skip(tx, check_openssh_connection)) )] async fn create_session_task( session: Session, tx: oneshot::Sender>, + check_openssh_connection: Option>, ) -> Option { #[cfg(feature = "tracing")] tracing::info!("Connecting to sftp subsystem, session = {session:?}"); @@ -54,15 +65,48 @@ async fn create_session_task( let stdout = child.stdout().take().unwrap(); tx.send(Ok((stdin, stdout))).unwrap(); // Ok - let original_error = match child.wait().await { - Ok(exit_status) => { - if !exit_status.success() { - Some(Error::SftpServerFailure(exit_status)) + let original_error = { + let check_conn_future = async { + if let Some(checker) = check_openssh_connection { + checker + .check_connection(&session) + .await + .err() + .map(Error::from) } else { None } + }; + + let wait_on_child_future = async { + match child.wait().await { + Ok(exit_status) => { + if !exit_status.success() { + Some(Error::SftpServerFailure(exit_status)) + } else { + None + } + } + Err(err) => Some(err.into()), + } + }; + tokio::pin!(wait_on_child_future); + + tokio::select! { + biased; + + original_error = check_conn_future => { + let occuring_error = wait_on_child_future.await; + match (original_error, occuring_error) { + (Some(original_error), Some(occuring_error)) => { + Some(original_error.error_on_cleanup(occuring_error)) + } + (Some(err), None) | (None, Some(err)) => Some(err), + (None, None) => None, + } + } + original_error = &mut wait_on_child_future => original_error, } - Err(err) => Some(err.into()), }; #[cfg(feature = "tracing")] @@ -99,10 +143,20 @@ impl Sftp { pub async fn from_session( session: openssh::Session, options: SftpOptions, + ) -> Result { + Self::from_session_with_check_connection(session, options, None).await + } + + /// Similar to [`Sftp::from_session`], but takes an additional parameter + /// for checking if the connection is still alive. + pub async fn from_session_with_check_connection( + session: openssh::Session, + options: SftpOptions, + check_openssh_connection: Option>, ) -> Result { let (tx, rx) = oneshot::channel(); - let handle = tokio::spawn(create_session_task(session, tx)); + let handle = tokio::spawn(create_session_task(session, tx, check_openssh_connection)); let msg = "Task failed without sending anything, so it must have panicked"; From 883db63e0fca62b8dae342ae11a678efd52c4344 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 17 Apr 2024 20:54:14 +1000 Subject: [PATCH 2/3] Fix lifetime for `CheckOpensshConnection` Also update changelog Signed-off-by: Jiahao XU --- src/changelog.rs | 3 +-- src/sftp/openssh_session.rs | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/changelog.rs b/src/changelog.rs index ab3c5ca..5bb6b85 100644 --- a/src/changelog.rs +++ b/src/changelog.rs @@ -2,8 +2,7 @@ use crate::*; /// # Added -/// - [`Sftp::from_session_with_check_connection`] for checking connection is -/// alive. +/// - [`Sftp::from_session_with_check_connection`] for checking connection #[doc(hidden)] pub mod unreleased {} diff --git a/src/sftp/openssh_session.rs b/src/sftp/openssh_session.rs index 25b7993..0592c72 100644 --- a/src/sftp/openssh_session.rs +++ b/src/sftp/openssh_session.rs @@ -16,7 +16,7 @@ pub trait CheckOpensshConnection { fn check_connection<'session>( self: Box, session: &'session Session, - ) -> Pin> + Send + Sync>>; + ) -> Pin> + Send + Sync + 'session>>; } impl Drop for OpensshSession { From 2bbb3cc285adf4d86d2f93536f0f5a1eaa4f2f4d Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 17 Apr 2024 21:19:35 +1000 Subject: [PATCH 3/3] Impl `CheckOpensshConnection` for function and add example Signed-off-by: Jiahao XU --- .github/workflows/rust.yml | 121 ++++++++++++++++++------------------ check.sh | 1 + src/sftp/openssh_session.rs | 59 +++++++++++++++++- 3 files changed, 120 insertions(+), 61 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 2b7b234..04ea935 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -17,14 +17,14 @@ on: branches: - main paths-ignore: - - 'README.md' - - 'LICENSE' - - '.gitignore' + - "README.md" + - "LICENSE" + - ".gitignore" pull_request: paths-ignore: - - 'README.md' - - 'LICENSE' - - '.gitignore' + - "README.md" + - "LICENSE" + - ".gitignore" jobs: os-check: @@ -34,7 +34,10 @@ jobs: fail-fast: false matrix: include: - - { target: x86_64-pc-windows-msvc, args: "--exclude-features openssh" } + - { + target: x86_64-pc-windows-msvc, + args: "--exclude-features openssh", + } - { target: x86_64-apple-darwin } - { target: x86_64-unknown-linux-gnu } steps: @@ -57,65 +60,65 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 - with: - submodules: 'recursive' - - name: Install toolchain - run: | - rustup toolchain install stable --component rustfmt,clippy --no-self-update --profile minimal - rustup toolchain install nightly --no-self-update --profile minimal + - uses: actions/checkout@v4 + with: + submodules: "recursive" + - name: Install toolchain + run: | + rustup toolchain install stable --component rustfmt,clippy --no-self-update --profile minimal + rustup toolchain install nightly --no-self-update --profile minimal - - name: Create Cargo.lock for caching - run: cargo update - - uses: Swatinem/rust-cache@v2 + - name: Create Cargo.lock for caching + run: cargo update + - uses: Swatinem/rust-cache@v2 - - run: ./check.sh + - run: ./check.sh build: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 - with: - submodules: 'recursive' - - - name: Install toolchain - run: rustup toolchain install stable --no-self-update --profile minimal - - - name: Create Cargo.lock for caching - run: cargo update - - uses: Swatinem/rust-cache@v2 - - - name: Compile tests - run: cargo test --all-features --workspace --no-run - - - name: Test ssh connectivity - run: | - # Wait for startup of openssh-server - timeout 15 ./wait_for_sshd_start_up.sh - chmod 600 .test-key - mkdir /tmp/openssh-rs - ssh -i .test-key -v -p 2222 -l test-user 127.0.0.1 -o StrictHostKeyChecking=accept-new -o UserKnownHostsFile=/tmp/openssh-rs/known_hosts whoami - - name: Set up ssh-agent - run: | - eval $(ssh-agent) - echo "SSH_AUTH_SOCK=$SSH_AUTH_SOCK" >> $GITHUB_ENV - echo "SSH_AGENT_PID=$SSH_AGENT_PID" >> $GITHUB_ENV - cat .test-key | ssh-add - - - - name: Run tests - run: ./run_tests.sh - env: - XDG_RUNTIME_DIR: /tmp - - - name: ssh container log - run: docker logs $(docker ps | grep openssh-server | awk '{print $1}') - if: ${{ failure() }} - - run: docker exec $(docker ps | grep openssh-server | awk '{print $1}') ls -R /config/logs/ - if: ${{ failure() }} - - run: docker exec $(docker ps | grep openssh-server | awk '{print $1}') cat /config/logs/openssh/current - name: ssh server log - if: ${{ failure() }} + - uses: actions/checkout@v4 + with: + submodules: "recursive" + + - name: Install toolchain + run: rustup toolchain install stable --no-self-update --profile minimal + + - name: Create Cargo.lock for caching + run: cargo update + - uses: Swatinem/rust-cache@v2 + + - name: Compile tests + run: cargo test --all-features --workspace --no-run + + - name: Test ssh connectivity + run: | + # Wait for startup of openssh-server + timeout 15 ./wait_for_sshd_start_up.sh + chmod 600 .test-key + mkdir /tmp/openssh-rs + ssh -i .test-key -v -p 2222 -l test-user 127.0.0.1 -o StrictHostKeyChecking=accept-new -o UserKnownHostsFile=/tmp/openssh-rs/known_hosts whoami + - name: Set up ssh-agent + run: | + eval $(ssh-agent) + echo "SSH_AUTH_SOCK=$SSH_AUTH_SOCK" >> $GITHUB_ENV + echo "SSH_AGENT_PID=$SSH_AGENT_PID" >> $GITHUB_ENV + cat .test-key | ssh-add - + + - name: Run tests + run: ./run_tests.sh + env: + XDG_RUNTIME_DIR: /tmp + + - name: ssh container log + run: docker logs $(docker ps | grep openssh-server | awk '{print $1}') + if: ${{ failure() }} + - run: docker exec $(docker ps | grep openssh-server | awk '{print $1}') ls -R /config/logs/ + if: ${{ failure() }} + - run: docker exec $(docker ps | grep openssh-server | awk '{print $1}') cat /config/logs/openssh/current + name: ssh server log + if: ${{ failure() }} services: openssh: image: linuxserver/openssh-server:amd64-latest diff --git a/check.sh b/check.sh index c2488b7..357afd4 100755 --- a/check.sh +++ b/check.sh @@ -6,6 +6,7 @@ cd "$(dirname "$(realpath "$0")")" cargo fmt --all -- --check cargo clippy --all-features --all --no-deps +cargo test --doc --all-features export RUSTDOCFLAGS="--cfg docsrs" exec cargo +nightly doc \ diff --git a/src/sftp/openssh_session.rs b/src/sftp/openssh_session.rs index 0592c72..04ff91c 100644 --- a/src/sftp/openssh_session.rs +++ b/src/sftp/openssh_session.rs @@ -19,6 +19,22 @@ pub trait CheckOpensshConnection { ) -> Pin> + Send + Sync + 'session>>; } +impl CheckOpensshConnection for F +where + F: for<'session> FnOnce( + &'session Session, + ) -> Pin< + Box> + Send + Sync + 'session>, + >, +{ + fn check_connection<'session>( + self: Box, + session: &'session Session, + ) -> Pin> + Send + Sync + 'session>> { + (self)(session) + } +} + impl Drop for OpensshSession { fn drop(&mut self) { self.0.abort(); @@ -144,12 +160,51 @@ impl Sftp { session: openssh::Session, options: SftpOptions, ) -> Result { - Self::from_session_with_check_connection(session, options, None).await + Self::from_session_with_check_connection_inner(session, options, None).await } /// Similar to [`Sftp::from_session`], but takes an additional parameter /// for checking if the connection is still alive. + /// + /// # Example + /// + /// ```rust,no_run + /// + /// fn check_connection<'session>( + /// session: &'session openssh::Session, + /// ) -> std::pin::Pin> + Send + Sync + 'session>> { + /// Box::pin(async move { + /// loop { + /// tokio::time::sleep(std::time::Duration::from_secs(10)).await; + /// session.check().await?; + /// } + /// Ok(()) + /// }) + /// } + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn main() -> Result<(), openssh_sftp_client::Error> { + /// openssh_sftp_client::Sftp::from_session_with_check_connection( + /// openssh::Session::connect_mux("me@ssh.example.com", openssh::KnownHosts::Strict).await?, + /// openssh_sftp_client::SftpOptions::default(), + /// check_connection, + /// ).await?; + /// # Ok(()) + /// # } + /// ``` pub async fn from_session_with_check_connection( + session: openssh::Session, + options: SftpOptions, + check_openssh_connection: impl CheckOpensshConnection + Send + Sync + 'static, + ) -> Result { + Self::from_session_with_check_connection_inner( + session, + options, + Some(Box::new(check_openssh_connection)), + ) + .await + } + + async fn from_session_with_check_connection_inner( session: openssh::Session, options: SftpOptions, check_openssh_connection: Option>, @@ -165,7 +220,7 @@ impl Sftp { Err(_) => return Err(handle.await.expect_err(msg).into()), }; - Sftp::new_with_auxiliary( + Self::new_with_auxiliary( stdin, stdout, options,