diff --git a/datafusion/core/src/execution/disk_manager.rs b/datafusion/core/src/execution/disk_manager.rs index c4fe6b4160fa..80d594341875 100644 --- a/datafusion/core/src/execution/disk_manager.rs +++ b/datafusion/core/src/execution/disk_manager.rs @@ -39,6 +39,9 @@ pub enum DiskManagerConfig { /// Create a new [DiskManager] that creates temporary files within /// the specified directories NewSpecified(Vec), + + /// Disable disk manager, attempts to create temporary files will error + Disabled, } impl Default for DiskManagerConfig { @@ -68,9 +71,11 @@ impl DiskManagerConfig { /// while processing dataset larger than available memory. #[derive(Debug)] pub struct DiskManager { - /// TempDirs to put temporary files in. A new OS specified - /// temporary directory will be created if this list is empty. - local_dirs: Mutex>, + /// TempDirs to put temporary files in. + /// + /// If `Some(vec![])` a new OS specified temporary directory will be created + /// If `None` an error will be returned + local_dirs: Mutex>>, } impl DiskManager { @@ -79,7 +84,7 @@ impl DiskManager { match config { DiskManagerConfig::Existing(manager) => Ok(manager), DiskManagerConfig::NewOs => Ok(Arc::new(Self { - local_dirs: Mutex::new(vec![]), + local_dirs: Mutex::new(Some(vec![])), })), DiskManagerConfig::NewSpecified(conf_dirs) => { let local_dirs = create_local_dirs(conf_dirs)?; @@ -88,15 +93,23 @@ impl DiskManager { local_dirs ); Ok(Arc::new(Self { - local_dirs: Mutex::new(local_dirs), + local_dirs: Mutex::new(Some(local_dirs)), })) } + DiskManagerConfig::Disabled => Ok(Arc::new(Self { + local_dirs: Mutex::new(None), + })), } } /// Return a temporary file from a randomized choice in the configured locations pub fn create_tmp_file(&self) -> Result { - let mut local_dirs = self.local_dirs.lock(); + let mut guard = self.local_dirs.lock(); + let local_dirs = guard.as_mut().ok_or_else(|| { + DataFusionError::ResourcesExhausted( + "Cannot spill to temporary file as DiskManager is disabled".to_string(), + ) + })?; // Create a temporary directory if needed if local_dirs.is_empty() { @@ -110,7 +123,10 @@ impl DiskManager { local_dirs.push(tempdir); } - create_tmp_file(&local_dirs) + let dir_index = thread_rng().gen_range(0..local_dirs.len()); + Builder::new() + .tempfile_in(&local_dirs[dir_index]) + .map_err(DataFusionError::IoError) } } @@ -127,17 +143,6 @@ fn create_local_dirs(local_dirs: Vec) -> Result> { .collect() } -fn create_tmp_file(local_dirs: &[TempDir]) -> Result { - let dir_index = thread_rng().gen_range(0..local_dirs.len()); - let dir = local_dirs.get(dir_index).ok_or_else(|| { - DataFusionError::Internal("No directories available to DiskManager".into()) - })?; - - Builder::new() - .tempfile_in(dir) - .map_err(DataFusionError::IoError) -} - #[cfg(test)] mod tests { use std::path::Path; @@ -171,6 +176,7 @@ mod tests { dm.local_dirs .lock() .iter() + .flatten() .map(|p| p.path().into()) .collect() } @@ -194,6 +200,17 @@ mod tests { Ok(()) } + #[test] + fn test_disabled_disk_manager() { + let config = DiskManagerConfig::Disabled; + let manager = DiskManager::try_new(config).unwrap(); + let e = manager.create_tmp_file().unwrap_err().to_string(); + assert_eq!( + e, + "Resources exhausted: Cannot spill to temporary file as DiskManager is disabled" + ) + } + /// Asserts that `file_path` is found anywhere in any of `dir` directories fn assert_path_in_dirs<'a>( file_path: &'a Path,