Skip to content

Commit

Permalink
runc: split Pipe, Io, and PipedIo to async and sync modules
Browse files Browse the repository at this point in the history
Signed-off-by: jiaxiao zhou <[email protected]>
  • Loading branch information
Mossaka committed Nov 16, 2024
1 parent ac89daa commit 8ad0353
Show file tree
Hide file tree
Showing 7 changed files with 314 additions and 176 deletions.
118 changes: 118 additions & 0 deletions crates/runc/src/asynchronous/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

mod pipe;
use std::{fmt::Debug, io::Result, os::fd::AsRawFd};

use log::debug;
pub use pipe::Pipe;
use tokio::io::{AsyncRead, AsyncWrite};

use crate::Command;

pub trait Io: Debug + Send + Sync {
/// Return write side of stdin
#[cfg(feature = "async")]
fn stdin(&self) -> Option<Box<dyn AsyncWrite + Send + Sync + Unpin>> {
None
}

/// Return read side of stdout
#[cfg(feature = "async")]
fn stdout(&self) -> Option<Box<dyn AsyncRead + Send + Sync + Unpin>> {
None
}

/// Return read side of stderr
#[cfg(feature = "async")]
fn stderr(&self) -> Option<Box<dyn AsyncRead + Send + Sync + Unpin>> {
None
}

/// Set IO for passed command.
/// Read side of stdin, write side of stdout and write side of stderr should be provided to command.
fn set(&self, cmd: &mut Command) -> Result<()>;

/// Only close write side (should be stdout/err "from" runc process)
fn close_after_start(&self);
}

#[derive(Debug)]
pub struct PipedIo {
pub stdin: Option<Pipe>,
pub stdout: Option<Pipe>,
pub stderr: Option<Pipe>,
}

impl Io for PipedIo {
fn stdin(&self) -> Option<Box<dyn AsyncWrite + Send + Sync + Unpin>> {
self.stdin.as_ref().and_then(|pipe| {
let fd = pipe.wr.as_raw_fd();
tokio_pipe::PipeWrite::from_raw_fd_checked(fd)
.map(|x| Box::new(x) as Box<dyn AsyncWrite + Send + Sync + Unpin>)
.ok()
})
}

fn stdout(&self) -> Option<Box<dyn AsyncRead + Send + Sync + Unpin>> {
self.stdout.as_ref().and_then(|pipe| {
let fd = pipe.rd.as_raw_fd();
tokio_pipe::PipeRead::from_raw_fd_checked(fd)
.map(|x| Box::new(x) as Box<dyn AsyncRead + Send + Sync + Unpin>)
.ok()
})
}

fn stderr(&self) -> Option<Box<dyn AsyncRead + Send + Sync + Unpin>> {
self.stderr.as_ref().and_then(|pipe| {
let fd = pipe.rd.as_raw_fd();
tokio_pipe::PipeRead::from_raw_fd_checked(fd)
.map(|x| Box::new(x) as Box<dyn AsyncRead + Send + Sync + Unpin>)
.ok()
})
}

// Note that this internally use [`std::fs::File`]'s `try_clone()`.
// Thus, the files passed to commands will be not closed after command exit.
fn set(&self, cmd: &mut Command) -> std::io::Result<()> {
if let Some(p) = self.stdin.as_ref() {
let pr = p.rd.try_clone()?;
cmd.stdin(pr);
}

if let Some(p) = self.stdout.as_ref() {
let pw = p.wr.try_clone()?;
cmd.stdout(pw);
}

if let Some(p) = self.stderr.as_ref() {
let pw = p.wr.try_clone()?;
cmd.stdout(pw);
}

Ok(())
}

fn close_after_start(&self) {
if let Some(p) = self.stdout.as_ref() {
nix::unistd::close(p.wr.as_raw_fd()).unwrap_or_else(|e| debug!("close stdout: {}", e));
}

if let Some(p) = self.stderr.as_ref() {
nix::unistd::close(p.wr.as_raw_fd()).unwrap_or_else(|e| debug!("close stderr: {}", e));
}
}
}
38 changes: 38 additions & 0 deletions crates/runc/src/asynchronous/pipe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

use std::os::unix::io::OwnedFd;

use tokio::net::unix::pipe;

/// Struct to represent a pipe that can be used to transfer stdio inputs and outputs.
///
/// With this Io driver, methods of [crate::Runc] may capture the output/error messages.
/// When one side of the pipe is closed, the state will be represented with [`None`].
#[derive(Debug)]
pub struct Pipe {
pub rd: OwnedFd,
pub wr: OwnedFd,
}

impl Pipe {
pub fn new() -> std::io::Result<Self> {
let (tx, rx) = pipe::pipe()?;
let rd = tx.into_blocking_fd()?;
let wr = rx.into_blocking_fd()?;
Ok(Self { rd, wr })
}
}
178 changes: 3 additions & 175 deletions crates/runc/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,72 +13,19 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
#[cfg(not(feature = "async"))]
use std::io::{Read, Write};

use std::{
fmt::Debug,
fs::{File, OpenOptions},
io::Result,
os::unix::{
fs::OpenOptionsExt,
io::{AsRawFd, OwnedFd},
},
os::unix::{fs::OpenOptionsExt, io::AsRawFd},
process::Stdio,
sync::Mutex,
};

use log::debug;
use nix::unistd::{Gid, Uid};
#[cfg(feature = "async")]
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::unix::pipe;

use crate::Command;

pub trait Io: Debug + Send + Sync {
/// Return write side of stdin
#[cfg(not(feature = "async"))]
fn stdin(&self) -> Option<Box<dyn Write + Send + Sync>> {
None
}

/// Return read side of stdout
#[cfg(not(feature = "async"))]
fn stdout(&self) -> Option<Box<dyn Read + Send>> {
None
}

/// Return read side of stderr
#[cfg(not(feature = "async"))]
fn stderr(&self) -> Option<Box<dyn Read + Send>> {
None
}

/// Return write side of stdin
#[cfg(feature = "async")]
fn stdin(&self) -> Option<Box<dyn AsyncWrite + Send + Sync + Unpin>> {
None
}

/// Return read side of stdout
#[cfg(feature = "async")]
fn stdout(&self) -> Option<Box<dyn AsyncRead + Send + Sync + Unpin>> {
None
}

/// Return read side of stderr
#[cfg(feature = "async")]
fn stderr(&self) -> Option<Box<dyn AsyncRead + Send + Sync + Unpin>> {
None
}

/// Set IO for passed command.
/// Read side of stdin, write side of stdout and write side of stderr should be provided to command.
fn set(&self, cmd: &mut Command) -> Result<()>;

/// Only close write side (should be stdout/err "from" runc process)
fn close_after_start(&self);
}
use crate::{Command, Io, Pipe, PipedIo};

#[derive(Debug, Clone)]
pub struct IOOption {
Expand All @@ -97,32 +44,6 @@ impl Default for IOOption {
}
}

/// Struct to represent a pipe that can be used to transfer stdio inputs and outputs.
///
/// With this Io driver, methods of [crate::Runc] may capture the output/error messages.
/// When one side of the pipe is closed, the state will be represented with [`None`].
#[derive(Debug)]
pub struct Pipe {
rd: OwnedFd,
wr: OwnedFd,
}

#[derive(Debug)]
pub struct PipedIo {
stdin: Option<Pipe>,
stdout: Option<Pipe>,
stderr: Option<Pipe>,
}

impl Pipe {
fn new() -> std::io::Result<Self> {
let (tx, rx) = pipe::pipe()?;
let rd = tx.into_blocking_fd()?;
let wr = rx.into_blocking_fd()?;
Ok(Self { rd, wr })
}
}

impl PipedIo {
pub fn new(uid: u32, gid: u32, opts: &IOOption) -> std::io::Result<Self> {
Ok(Self {
Expand Down Expand Up @@ -156,99 +77,6 @@ impl PipedIo {
}
}

impl Io for PipedIo {
#[cfg(not(feature = "async"))]
fn stdin(&self) -> Option<Box<dyn Write + Send + Sync>> {
self.stdin.as_ref().and_then(|pipe| {
pipe.wr
.try_clone()
.map(|x| Box::new(x) as Box<dyn Write + Send + Sync>)
.ok()
})
}

#[cfg(feature = "async")]
fn stdin(&self) -> Option<Box<dyn AsyncWrite + Send + Sync + Unpin>> {
self.stdin.as_ref().and_then(|pipe| {
let fd = pipe.wr.as_raw_fd();
tokio_pipe::PipeWrite::from_raw_fd_checked(fd)
.map(|x| Box::new(x) as Box<dyn AsyncWrite + Send + Sync + Unpin>)
.ok()
})
}

#[cfg(not(feature = "async"))]
fn stdout(&self) -> Option<Box<dyn Read + Send>> {
self.stdout.as_ref().and_then(|pipe| {
pipe.rd
.try_clone()
.map(|x| Box::new(x) as Box<dyn Read + Send>)
.ok()
})
}

#[cfg(feature = "async")]
fn stdout(&self) -> Option<Box<dyn AsyncRead + Send + Sync + Unpin>> {
self.stdout.as_ref().and_then(|pipe| {
let fd = pipe.rd.as_raw_fd();
tokio_pipe::PipeRead::from_raw_fd_checked(fd)
.map(|x| Box::new(x) as Box<dyn AsyncRead + Send + Sync + Unpin>)
.ok()
})
}

#[cfg(not(feature = "async"))]
fn stderr(&self) -> Option<Box<dyn Read + Send>> {
self.stderr.as_ref().and_then(|pipe| {
pipe.rd
.try_clone()
.map(|x| Box::new(x) as Box<dyn Read + Send>)
.ok()
})
}

#[cfg(feature = "async")]
fn stderr(&self) -> Option<Box<dyn AsyncRead + Send + Sync + Unpin>> {
self.stderr.as_ref().and_then(|pipe| {
let fd = pipe.rd.as_raw_fd();
tokio_pipe::PipeRead::from_raw_fd_checked(fd)
.map(|x| Box::new(x) as Box<dyn AsyncRead + Send + Sync + Unpin>)
.ok()
})
}

// Note that this internally use [`std::fs::File`]'s `try_clone()`.
// Thus, the files passed to commands will be not closed after command exit.
fn set(&self, cmd: &mut Command) -> std::io::Result<()> {
if let Some(p) = self.stdin.as_ref() {
let pr = p.rd.try_clone()?;
cmd.stdin(pr);
}

if let Some(p) = self.stdout.as_ref() {
let pw = p.wr.try_clone()?;
cmd.stdout(pw);
}

if let Some(p) = self.stderr.as_ref() {
let pw = p.wr.try_clone()?;
cmd.stdout(pw);
}

Ok(())
}

fn close_after_start(&self) {
if let Some(p) = self.stdout.as_ref() {
nix::unistd::close(p.wr.as_raw_fd()).unwrap_or_else(|e| debug!("close stdout: {}", e));
}

if let Some(p) = self.stderr.as_ref() {
nix::unistd::close(p.wr.as_raw_fd()).unwrap_or_else(|e| debug!("close stderr: {}", e));
}
}
}

/// IO driver to direct output/error messages to /dev/null.
///
/// With this Io driver, all methods of [crate::Runc] can't capture the output/error messages.
Expand Down
7 changes: 7 additions & 0 deletions crates/runc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,22 @@ use async_trait::async_trait;
use log::debug;
use oci_spec::runtime::{LinuxResources, Process};

#[cfg(feature = "async")]
pub use crate::asynchronous::*;
#[cfg(not(feature = "async"))]
pub use crate::synchronous::*;
use crate::{container::Container, error::Error, options::*, utils::write_value_to_temp_file};

#[cfg(feature = "async")]
pub mod asynchronous;
pub mod container;
pub mod error;
pub mod events;
pub mod io;
#[cfg(feature = "async")]
pub mod monitor;
pub mod options;
pub mod synchronous;
pub mod utils;

pub type Result<T> = std::result::Result<T, crate::error::Error>;
Expand Down
2 changes: 1 addition & 1 deletion crates/runc/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use std::{
time::Duration,
};

use crate::{error::Error, io::Io, utils, DefaultExecutor, LogFormat, Runc, Spawner};
use crate::{error::Error, utils, DefaultExecutor, Io, LogFormat, Runc, Spawner};

// constants for log format
pub const JSON: &str = "json";
Expand Down
Loading

0 comments on commit 8ad0353

Please sign in to comment.