Skip to content

Commit

Permalink
Add ability to disable DiskManager
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Nov 22, 2022
1 parent 209c266 commit 60b17a9
Showing 1 changed file with 35 additions and 18 deletions.
53 changes: 35 additions & 18 deletions datafusion/core/src/execution/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ pub enum DiskManagerConfig {
/// Create a new [DiskManager] that creates temporary files within
/// the specified directories
NewSpecified(Vec<PathBuf>),

/// Disable disk manager, attempts to create temporary files will error
Disabled,
}

impl Default for DiskManagerConfig {
Expand Down Expand Up @@ -68,9 +71,11 @@ impl DiskManagerConfig {
/// while processing dataset larger than available memory.
#[derive(Debug)]
pub struct DiskManager {
/// TempDirs to put temporary files in. A new OS specified
/// temporary directory will be created if this list is empty.
local_dirs: Mutex<Vec<TempDir>>,
/// TempDirs to put temporary files in.
///
/// If `Some(vec![])` a new OS specified temporary directory will be created
/// If `None` an error will be returned
local_dirs: Mutex<Option<Vec<TempDir>>>,
}

impl DiskManager {
Expand All @@ -79,7 +84,7 @@ impl DiskManager {
match config {
DiskManagerConfig::Existing(manager) => Ok(manager),
DiskManagerConfig::NewOs => Ok(Arc::new(Self {
local_dirs: Mutex::new(vec![]),
local_dirs: Mutex::new(Some(vec![])),
})),
DiskManagerConfig::NewSpecified(conf_dirs) => {
let local_dirs = create_local_dirs(conf_dirs)?;
Expand All @@ -88,15 +93,23 @@ impl DiskManager {
local_dirs
);
Ok(Arc::new(Self {
local_dirs: Mutex::new(local_dirs),
local_dirs: Mutex::new(Some(local_dirs)),
}))
}
DiskManagerConfig::Disabled => Ok(Arc::new(Self {
local_dirs: Mutex::new(None),
})),
}
}

/// Return a temporary file from a randomized choice in the configured locations
pub fn create_tmp_file(&self) -> Result<NamedTempFile> {
let mut local_dirs = self.local_dirs.lock();
let mut guard = self.local_dirs.lock();
let local_dirs = guard.as_mut().ok_or_else(|| {
DataFusionError::ResourcesExhausted(
"Cannot spill to temporary file as DiskManager is disabled".to_string(),
)
})?;

// Create a temporary directory if needed
if local_dirs.is_empty() {
Expand All @@ -110,7 +123,10 @@ impl DiskManager {
local_dirs.push(tempdir);
}

create_tmp_file(&local_dirs)
let dir_index = thread_rng().gen_range(0..local_dirs.len());
Builder::new()
.tempfile_in(&local_dirs[dir_index])
.map_err(DataFusionError::IoError)
}
}

Expand All @@ -127,17 +143,6 @@ fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<TempDir>> {
.collect()
}

fn create_tmp_file(local_dirs: &[TempDir]) -> Result<NamedTempFile> {
let dir_index = thread_rng().gen_range(0..local_dirs.len());
let dir = local_dirs.get(dir_index).ok_or_else(|| {
DataFusionError::Internal("No directories available to DiskManager".into())
})?;

Builder::new()
.tempfile_in(dir)
.map_err(DataFusionError::IoError)
}

#[cfg(test)]
mod tests {
use std::path::Path;
Expand Down Expand Up @@ -171,6 +176,7 @@ mod tests {
dm.local_dirs
.lock()
.iter()
.flatten()
.map(|p| p.path().into())
.collect()
}
Expand All @@ -194,6 +200,17 @@ mod tests {
Ok(())
}

#[test]
fn test_disabled_disk_manager() {
let config = DiskManagerConfig::Disabled;
let manager = DiskManager::try_new(config).unwrap();
let e = manager.create_tmp_file().unwrap_err().to_string();
assert_eq!(
e,
"Resources exhausted: Cannot spill to temporary file as DiskManager is disabled"
)
}

/// Asserts that `file_path` is found anywhere in any of `dir` directories
fn assert_path_in_dirs<'a>(
file_path: &'a Path,
Expand Down

0 comments on commit 60b17a9

Please sign in to comment.