Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to disable DiskManager #4330

Merged
merged 1 commit into from
Nov 23, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 35 additions & 18 deletions datafusion/core/src/execution/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ pub enum DiskManagerConfig {
/// Create a new [DiskManager] that creates temporary files within
/// the specified directories
NewSpecified(Vec<PathBuf>),

/// Disable disk manager, attempts to create temporary files will error
Disabled,
}

impl Default for DiskManagerConfig {
Expand Down Expand Up @@ -68,9 +71,11 @@ impl DiskManagerConfig {
/// while processing dataset larger than available memory.
#[derive(Debug)]
pub struct DiskManager {
/// TempDirs to put temporary files in. A new OS specified
/// temporary directory will be created if this list is empty.
local_dirs: Mutex<Vec<TempDir>>,
/// TempDirs to put temporary files in.
///
/// If `Some(vec![])` a new OS specified temporary directory will be created
/// If `None` an error will be returned
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// If `None` an error will be returned
/// If `None` an error will be returned (configured not to spill)

local_dirs: Mutex<Option<Vec<TempDir>>>,
}

impl DiskManager {
Expand All @@ -79,7 +84,7 @@ impl DiskManager {
match config {
DiskManagerConfig::Existing(manager) => Ok(manager),
DiskManagerConfig::NewOs => Ok(Arc::new(Self {
local_dirs: Mutex::new(vec![]),
local_dirs: Mutex::new(Some(vec![])),
})),
DiskManagerConfig::NewSpecified(conf_dirs) => {
let local_dirs = create_local_dirs(conf_dirs)?;
Expand All @@ -88,15 +93,23 @@ impl DiskManager {
local_dirs
);
Ok(Arc::new(Self {
local_dirs: Mutex::new(local_dirs),
local_dirs: Mutex::new(Some(local_dirs)),
}))
}
DiskManagerConfig::Disabled => Ok(Arc::new(Self {
local_dirs: Mutex::new(None),
})),
}
}

/// Return a temporary file from a randomized choice in the configured locations
pub fn create_tmp_file(&self) -> Result<NamedTempFile> {
let mut local_dirs = self.local_dirs.lock();
let mut guard = self.local_dirs.lock();
let local_dirs = guard.as_mut().ok_or_else(|| {
DataFusionError::ResourcesExhausted(
"Cannot spill to temporary file as DiskManager is disabled".to_string(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a user sees this it may not be clear what is happening or what they can do about it.

What do you think about changing the message to something like

Suggested change
"Cannot spill to temporary file as DiskManager is disabled".to_string(),
"Memory Exhausted (DiskManager is disabled, cannot spill to temporary file)".to_string(),

As a follow up (which I will do), I think it would be excellent to pass in a context string to create_tmp_file to make the error more specific.

    pub fn create_tmp_file(&self, context: &str) -> Result<NamedTempFile> {

)
})?;

// Create a temporary directory if needed
if local_dirs.is_empty() {
Expand All @@ -110,7 +123,10 @@ impl DiskManager {
local_dirs.push(tempdir);
}

create_tmp_file(&local_dirs)
let dir_index = thread_rng().gen_range(0..local_dirs.len());
Builder::new()
.tempfile_in(&local_dirs[dir_index])
.map_err(DataFusionError::IoError)
}
}

Expand All @@ -127,17 +143,6 @@ fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<TempDir>> {
.collect()
}

fn create_tmp_file(local_dirs: &[TempDir]) -> Result<NamedTempFile> {
let dir_index = thread_rng().gen_range(0..local_dirs.len());
let dir = local_dirs.get(dir_index).ok_or_else(|| {
DataFusionError::Internal("No directories available to DiskManager".into())
})?;

Builder::new()
.tempfile_in(dir)
.map_err(DataFusionError::IoError)
}

#[cfg(test)]
mod tests {
use std::path::Path;
Expand Down Expand Up @@ -171,6 +176,7 @@ mod tests {
dm.local_dirs
.lock()
.iter()
.flatten()
.map(|p| p.path().into())
.collect()
}
Expand All @@ -194,6 +200,17 @@ mod tests {
Ok(())
}

#[test]
fn test_disabled_disk_manager() {
let config = DiskManagerConfig::Disabled;
let manager = DiskManager::try_new(config).unwrap();
let e = manager.create_tmp_file().unwrap_err().to_string();
assert_eq!(
e,
"Resources exhausted: Cannot spill to temporary file as DiskManager is disabled"
)
}

/// Asserts that `file_path` is found anywhere in any of `dir` directories
fn assert_path_in_dirs<'a>(
file_path: &'a Path,
Expand Down