diff --git a/crates/deltalake-core/src/operations/vacuum.rs b/crates/deltalake-core/src/operations/vacuum.rs index 7b321400e6..c8f6d66b47 100644 --- a/crates/deltalake-core/src/operations/vacuum.rs +++ b/crates/deltalake-core/src/operations/vacuum.rs @@ -32,6 +32,7 @@ use object_store::Error; use object_store::{path::Path, ObjectStore}; use serde::Serialize; use serde_json::Value; +use url::Url; use super::transaction::commit; use crate::crate_version; @@ -185,7 +186,6 @@ impl VacuumBuilder { }; let expired_tombstones = get_stale_files(&self.snapshot, retention_period, now_millis); - let valid_files = self.snapshot.file_paths_iter().collect::>(); let mut files_to_delete = vec![]; let mut file_sizes = vec![]; @@ -200,12 +200,58 @@ impl VacuumBuilder { .ok_or(DeltaTableError::NoMetadata)? .partition_columns; + // A set containing the absolute paths to managed files + let managed_files = self + .snapshot + .files() + .iter() + .map(|a| { + if is_absolute_path(&a.path) { + a.path.clone() + } else { + format!("{}{}", self.log_store.root_uri(), a.path) + } + }) + .chain(self.snapshot.all_tombstones().iter().map(|r| { + if is_absolute_path(&r.path) { + r.path.clone() + } else { + format!("{}{}", self.log_store.root_uri(), r.path) + } + })) + .chain(self.snapshot.files().iter().filter_map(|a| { + return if let Some(deletion_vector) = &a.deletion_vector { + if let Ok(parent) = + &Url::parse(&format!("file://{}", self.log_store.root_uri().as_str())) + { + if let Ok(dv_absolute_path) = deletion_vector.absolute_path(parent) { + Some(dv_absolute_path?.path().to_string()) + } else { + None + } + } else { + None + } + } else { + None + }; + })) + .collect::>(); + while let Some(obj_meta) = all_files.next().await { // TODO should we allow NotFound here in case we have a temporary commit file in the list let obj_meta = obj_meta.map_err(DeltaTableError::from)?; - if valid_files.contains(&obj_meta.location) // file is still being tracked in table - || !expired_tombstones.contains(obj_meta.location.as_ref()) // file is not an expired tombstone - || is_hidden_directory(partition_columns, &obj_meta.location)? + + if is_hidden_file(partition_columns, &obj_meta.location)? { + continue; + } + + if self.is_file_managed(&managed_files, &obj_meta.location) { + if !expired_tombstones.contains(obj_meta.location.as_ref()) { + continue; + } + } else if now_millis - retention_period.num_milliseconds() + < obj_meta.last_modified.timestamp_millis() { continue; } @@ -222,6 +268,16 @@ impl VacuumBuilder { specified_retention_millis: Some(retention_period.num_milliseconds()), }) } + + /// Whether a file is contained within the set of managed files. + fn is_file_managed(&self, managed_files: &HashSet, file: &Path) -> bool { + return if is_absolute_path(file.as_ref()) { + managed_files.contains(file.as_ref()) + } else { + let path = format!("{}{}", self.log_store.root_uri(), file.as_ref()); + managed_files.contains(&path) + }; + } } impl std::future::IntoFuture for VacuumBuilder { @@ -254,6 +310,11 @@ impl std::future::IntoFuture for VacuumBuilder { } } +fn is_absolute_path(path: &str) -> bool { + let path = std::path::Path::new(path); + path.is_absolute() +} + /// Encapsulate which files are to be deleted and the parameters used to make that decision struct VacuumPlan { /// What files are to be deleted @@ -367,11 +428,15 @@ impl VacuumPlan { /// Names of the form partitionCol=[value] are partition directories, and should be /// deleted even if they'd normally be hidden. The _db_index directory contains (bloom filter) /// indexes and these must be deleted when the data they are tied to is deleted. -fn is_hidden_directory(partition_columns: &[String], path: &Path) -> Result { - let path_name = path.to_string(); - Ok((path_name.starts_with('.') || path_name.starts_with('_')) - && !path_name.starts_with("_delta_index") - && !path_name.starts_with("_change_data") +fn is_hidden_file(partition_columns: &[String], path: &Path) -> Result { + let path_name = path.as_ref(); + let skip = path_name.starts_with("_delta_index") || path_name.starts_with("_change_data"); + let is_hidden = path + .parts() + .skip(skip as usize) + .any(|p| p.as_ref().starts_with('.') || p.as_ref().starts_with('_')); + + Ok(is_hidden && !partition_columns .iter() .any(|partition_column| path_name.starts_with(partition_column))) @@ -451,4 +516,19 @@ mod tests { assert_eq!(result.files_deleted, empty); } + + #[tokio::test] + async fn vacuum_table_with_dv_small() { + let table = open_table("./tests/data/table-with-dv-small") + .await + .unwrap(); + + let (_table, result) = VacuumBuilder::new(table.log_store, table.state) + .with_dry_run(true) + .await + .unwrap(); + + let empty: Vec = Vec::new(); + assert_eq!(result.files_deleted, empty); + } } diff --git a/crates/deltalake-core/tests/command_vacuum.rs b/crates/deltalake-core/tests/command_vacuum.rs index 51ff3217b3..24644159fe 100644 --- a/crates/deltalake-core/tests/command_vacuum.rs +++ b/crates/deltalake-core/tests/command_vacuum.rs @@ -216,7 +216,6 @@ async fn test_partitions_included() { ); } -#[ignore] #[tokio::test] // files that are not managed by the delta log and have a last_modified greater // than the retention period should be deleted. Unmanaged files and directories @@ -276,7 +275,7 @@ async fn test_non_managed_files() { // Validate unmanaged files are deleted after the retention period let res = { - clock.tick(Duration::hours(1)); + clock.tick(Duration::days(7)); let (_, metrics) = DeltaOps(table) .vacuum() .with_clock(Arc::new(clock.clone()))