diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs index 8dea811383..fba8acd313 100644 --- a/crates/deltalake-core/src/delta_datafusion/mod.rs +++ b/crates/deltalake-core/src/delta_datafusion/mod.rs @@ -568,7 +568,7 @@ impl<'a> DeltaScanBuilder<'a> { let table_partition_cols = &self .snapshot - .current_metadata() + .metadata() .ok_or(DeltaTableError::NoMetadata)? .partition_columns; @@ -1457,9 +1457,7 @@ pub async fn find_files<'a>( state: &SessionState, predicate: Option, ) -> DeltaResult { - let current_metadata = snapshot - .current_metadata() - .ok_or(DeltaTableError::NoMetadata)?; + let current_metadata = snapshot.metadata().ok_or(DeltaTableError::NoMetadata)?; match &predicate { Some(predicate) => { diff --git a/crates/deltalake-core/src/operations/delete.rs b/crates/deltalake-core/src/operations/delete.rs index 7ee70a3a63..31486183be 100644 --- a/crates/deltalake-core/src/operations/delete.rs +++ b/crates/deltalake-core/src/operations/delete.rs @@ -139,7 +139,7 @@ async fn excute_non_empty_expr( let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?; let table_partition_cols = snapshot - .current_metadata() + .metadata() .ok_or(DeltaTableError::NoMetadata)? .partition_columns .clone(); diff --git a/crates/deltalake-core/src/operations/merge.rs b/crates/deltalake-core/src/operations/merge.rs index 061c2eb912..433e9cda43 100644 --- a/crates/deltalake-core/src/operations/merge.rs +++ b/crates/deltalake-core/src/operations/merge.rs @@ -665,9 +665,7 @@ async fn execute( let mut metrics = MergeMetrics::default(); let exec_start = Instant::now(); - let current_metadata = snapshot - .current_metadata() - .ok_or(DeltaTableError::NoMetadata)?; + let current_metadata = snapshot.metadata().ok_or(DeltaTableError::NoMetadata)?; // TODO: Given the join predicate, remove any expression that involve the // source table and keep expressions that only involve the target table. diff --git a/crates/deltalake-core/src/operations/optimize.rs b/crates/deltalake-core/src/operations/optimize.rs index b63038ca4f..2b9532b096 100644 --- a/crates/deltalake-core/src/operations/optimize.rs +++ b/crates/deltalake-core/src/operations/optimize.rs @@ -770,7 +770,7 @@ pub fn create_merge_plan( let target_size = target_size.unwrap_or_else(|| snapshot.table_config().target_file_size()); let partitions_keys = &snapshot - .current_metadata() + .metadata() .ok_or(DeltaTableError::NoMetadata)? .partition_columns; @@ -788,7 +788,7 @@ pub fn create_merge_plan( &Arc::new( >::try_from( &snapshot - .current_metadata() + .metadata() .ok_or(DeltaTableError::NoMetadata)? .schema, )?, @@ -940,7 +940,7 @@ fn build_zorder_plan( ))); } let field_names = snapshot - .current_metadata() + .metadata() .unwrap() .schema .fields() diff --git a/crates/deltalake-core/src/operations/restore.rs b/crates/deltalake-core/src/operations/restore.rs index be43bacf5f..db0959e9c7 100644 --- a/crates/deltalake-core/src/operations/restore.rs +++ b/crates/deltalake-core/src/operations/restore.rs @@ -209,12 +209,12 @@ async fn execute( Protocol { min_reader_version: table.get_min_reader_version(), min_writer_version: table.get_min_writer_version(), - writer_features: if snapshot.min_writer_version() < 7 { + writer_features: if snapshot.protocol().min_writer_version < 7 { None } else { table.get_writer_features().cloned() }, - reader_features: if snapshot.min_reader_version() < 3 { + reader_features: if snapshot.protocol().min_reader_version < 3 { None } else { table.get_reader_features().cloned() @@ -224,14 +224,14 @@ async fn execute( Protocol { min_reader_version: max( table.get_min_reader_version(), - snapshot.min_reader_version(), + snapshot.protocol().min_reader_version, ), min_writer_version: max( table.get_min_writer_version(), - snapshot.min_writer_version(), + snapshot.protocol().min_writer_version, ), - writer_features: snapshot.writer_features().cloned(), - reader_features: snapshot.reader_features().cloned(), + writer_features: snapshot.protocol().writer_features.clone(), + reader_features: snapshot.protocol().reader_features.clone(), } }; actions.push(Action::Protocol(protocol)); diff --git a/crates/deltalake-core/src/operations/transaction/conflict_checker.rs b/crates/deltalake-core/src/operations/transaction/conflict_checker.rs index 6cefe848b8..b74d077bf4 100644 --- a/crates/deltalake-core/src/operations/transaction/conflict_checker.rs +++ b/crates/deltalake-core/src/operations/transaction/conflict_checker.rs @@ -398,11 +398,11 @@ impl<'a> ConflictChecker<'a> { for p in self.winning_commit_summary.protocol() { let (win_read, curr_read) = ( p.min_reader_version, - self.txn_info.read_snapshot.min_reader_version(), + self.txn_info.read_snapshot.protocol().min_reader_version, ); let (win_write, curr_write) = ( p.min_writer_version, - self.txn_info.read_snapshot.min_writer_version(), + self.txn_info.read_snapshot.protocol().min_writer_version, ); if curr_read < win_read || win_write < curr_write { return Err(CommitConflictError::ProtocolChanged( @@ -475,7 +475,7 @@ impl<'a> ConflictChecker<'a> { let partition_columns = &self .txn_info .read_snapshot - .current_metadata() + .metadata() .ok_or(CommitConflictError::NoMetadata)? .partition_columns; AddContainer::new(&added_files_to_check, partition_columns, arrow_schema) diff --git a/crates/deltalake-core/src/operations/transaction/protocol.rs b/crates/deltalake-core/src/operations/transaction/protocol.rs index a0ce1df64e..38209fb4aa 100644 --- a/crates/deltalake-core/src/operations/transaction/protocol.rs +++ b/crates/deltalake-core/src/operations/transaction/protocol.rs @@ -79,10 +79,10 @@ impl ProtocolChecker { /// Check if delta-rs can read form the given delta table. pub fn can_read_from(&self, snapshot: &DeltaTableState) -> Result<(), TransactionError> { let required_features: Option<&HashSet> = - match snapshot.min_reader_version() { + match snapshot.protocol().min_reader_version { 0 | 1 => None, 2 => Some(&READER_V2), - _ => snapshot.reader_features(), + _ => snapshot.protocol().reader_features.as_ref(), }; if let Some(features) = required_features { let mut diff = features.difference(&self.reader_features).peekable(); @@ -101,14 +101,14 @@ impl ProtocolChecker { self.can_read_from(snapshot)?; let required_features: Option<&HashSet> = - match snapshot.min_writer_version() { + match snapshot.protocol().min_writer_version { 0 | 1 => None, 2 => Some(&WRITER_V2), 3 => Some(&WRITER_V3), 4 => Some(&WRITER_V4), 5 => Some(&WRITER_V5), 6 => Some(&WRITER_V6), - _ => snapshot.writer_features(), + _ => snapshot.protocol().writer_features.as_ref(), }; if let Some(features) = required_features { @@ -130,13 +130,15 @@ impl ProtocolChecker { self.can_write_to(snapshot)?; // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#append-only-tables - let append_only_enabled = if snapshot.min_writer_version() < 2 { + let append_only_enabled = if snapshot.protocol().min_writer_version < 2 { false - } else if snapshot.min_writer_version() < 7 { + } else if snapshot.protocol().min_writer_version < 7 { snapshot.table_config().append_only() } else { snapshot - .writer_features() + .protocol() + .writer_features + .as_ref() .ok_or(TransactionError::WriterFeaturesRequired)? .contains(&WriterFeatures::AppendOnly) && snapshot.table_config().append_only() diff --git a/crates/deltalake-core/src/operations/transaction/state.rs b/crates/deltalake-core/src/operations/transaction/state.rs index 0fbbd554dc..e64f60d917 100644 --- a/crates/deltalake-core/src/operations/transaction/state.rs +++ b/crates/deltalake-core/src/operations/transaction/state.rs @@ -30,7 +30,7 @@ impl DeltaTableState { } fn _arrow_schema(&self, wrap_partitions: bool) -> DeltaResult { - let meta = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?; + let meta = self.metadata().ok_or(DeltaTableError::NoMetadata)?; let fields = meta .schema .fields() @@ -299,7 +299,7 @@ impl PruningStatistics for DeltaTableState { /// return the minimum values for the named column, if known. /// Note: the returned array must contain `num_containers()` rows fn min_values(&self, column: &Column) -> Option { - let partition_columns = &self.current_metadata()?.partition_columns; + let partition_columns = &self.metadata()?.partition_columns; let container = AddContainer::new(self.files(), partition_columns, self.arrow_schema().ok()?); container.min_values(column) @@ -308,7 +308,7 @@ impl PruningStatistics for DeltaTableState { /// return the maximum values for the named column, if known. /// Note: the returned array must contain `num_containers()` rows. fn max_values(&self, column: &Column) -> Option { - let partition_columns = &self.current_metadata()?.partition_columns; + let partition_columns = &self.metadata()?.partition_columns; let container = AddContainer::new(self.files(), partition_columns, self.arrow_schema().ok()?); container.max_values(column) @@ -325,7 +325,7 @@ impl PruningStatistics for DeltaTableState { /// /// Note: the returned array must contain `num_containers()` rows. fn null_counts(&self, column: &Column) -> Option { - let partition_columns = &self.current_metadata()?.partition_columns; + let partition_columns = &self.metadata()?.partition_columns; let container = AddContainer::new(self.files(), partition_columns, self.arrow_schema().ok()?); container.null_counts(column) diff --git a/crates/deltalake-core/src/operations/update.rs b/crates/deltalake-core/src/operations/update.rs index 45d0306697..b7ff813712 100644 --- a/crates/deltalake-core/src/operations/update.rs +++ b/crates/deltalake-core/src/operations/update.rs @@ -207,9 +207,7 @@ async fn execute( }) .collect::, _>>()?; - let current_metadata = snapshot - .current_metadata() - .ok_or(DeltaTableError::NoMetadata)?; + let current_metadata = snapshot.metadata().ok_or(DeltaTableError::NoMetadata)?; let table_partition_cols = current_metadata.partition_columns.clone(); let scan_start = Instant::now(); diff --git a/crates/deltalake-core/src/operations/vacuum.rs b/crates/deltalake-core/src/operations/vacuum.rs index efdde55347..7b321400e6 100644 --- a/crates/deltalake-core/src/operations/vacuum.rs +++ b/crates/deltalake-core/src/operations/vacuum.rs @@ -163,7 +163,12 @@ impl VacuumBuilder { /// Determine which files can be deleted. Does not actually peform the deletion async fn create_vacuum_plan(&self) -> Result { - let min_retention = Duration::milliseconds(self.snapshot.tombstone_retention_millis()); + let min_retention = Duration::milliseconds( + self.snapshot + .table_config() + .deleted_file_retention_duration() + .as_millis() as i64, + ); let retention_period = self.retention_period.unwrap_or(min_retention); let enforce_retention_duration = self.enforce_retention_duration; @@ -191,7 +196,7 @@ impl VacuumBuilder { .map_err(DeltaTableError::from)?; let partition_columns = &self .snapshot - .current_metadata() + .metadata() .ok_or(DeltaTableError::NoMetadata)? .partition_columns; diff --git a/crates/deltalake-core/src/operations/write.rs b/crates/deltalake-core/src/operations/write.rs index 09c1e12cbc..2a3d7609a6 100644 --- a/crates/deltalake-core/src/operations/write.rs +++ b/crates/deltalake-core/src/operations/write.rs @@ -307,7 +307,7 @@ pub(crate) async fn write_execution_plan( overwrite_schema: bool, ) -> DeltaResult> { let invariants = snapshot - .current_metadata() + .metadata() .and_then(|meta| meta.schema.get_invariants().ok()) .unwrap_or_default(); @@ -380,7 +380,7 @@ impl std::future::IntoFuture for WriteBuilder { let active_partitions = this .snapshot - .current_metadata() + .metadata() .map(|meta| meta.partition_columns.clone()); // validate partition columns @@ -498,7 +498,7 @@ impl std::future::IntoFuture for WriteBuilder { if schema != table_schema { let mut metadata = this .snapshot - .current_metadata() + .metadata() .ok_or(DeltaTableError::NoMetadata)? .clone(); metadata.schema = schema.clone().try_into()?; diff --git a/crates/deltalake-core/src/protocol/checkpoints.rs b/crates/deltalake-core/src/protocol/checkpoints.rs index ef521159d9..7ec06db9fc 100644 --- a/crates/deltalake-core/src/protocol/checkpoints.rs +++ b/crates/deltalake-core/src/protocol/checkpoints.rs @@ -82,8 +82,12 @@ pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), ProtocolError> /// Delete expires log files before given version from table. The table log retention is based on /// the `logRetentionDuration` property of the Delta Table, 30 days by default. pub async fn cleanup_metadata(table: &DeltaTable) -> Result { - let log_retention_timestamp = - Utc::now().timestamp_millis() - table.get_state().log_retention_millis(); + let log_retention_timestamp = Utc::now().timestamp_millis() + - table + .get_state() + .table_config() + .log_retention_duration() + .as_millis() as i64; cleanup_expired_logs_for( table.version(), table.log_store.as_ref(), @@ -105,8 +109,12 @@ pub async fn create_checkpoint_from_table_uri_and_cleanup( .map_err(|err| ProtocolError::Generic(err.to_string()))?; create_checkpoint_for(version, table.get_state(), table.log_store.as_ref()).await?; - let enable_expired_log_cleanup = - cleanup.unwrap_or_else(|| table.get_state().enable_expired_log_cleanup()); + let enable_expired_log_cleanup = cleanup.unwrap_or_else(|| { + table + .get_state() + .table_config() + .enable_expired_log_cleanup() + }); if table.version() >= 0 && enable_expired_log_cleanup { let deleted_log_num = cleanup_metadata(&table).await?; @@ -213,7 +221,7 @@ pub async fn cleanup_expired_logs_for( fn parquet_bytes_from_state( state: &DeltaTableState, ) -> Result<(CheckPoint, bytes::Bytes), ProtocolError> { - let current_metadata = state.current_metadata().ok_or(ProtocolError::NoMetaData)?; + let current_metadata = state.metadata().ok_or(ProtocolError::NoMetaData)?; let partition_col_data_types = current_metadata.get_partition_col_data_types(); @@ -247,8 +255,8 @@ fn parquet_bytes_from_state( // protocol let jsons = std::iter::once(Action::Protocol(Protocol { - min_reader_version: state.min_reader_version(), - min_writer_version: state.min_writer_version(), + min_reader_version: state.protocol().min_reader_version, + min_writer_version: state.protocol().min_writer_version, writer_features: None, reader_features: None, })) diff --git a/crates/deltalake-core/src/table/mod.rs b/crates/deltalake-core/src/table/mod.rs index 33fdea2ad0..4d55d3e462 100644 --- a/crates/deltalake-core/src/table/mod.rs +++ b/crates/deltalake-core/src/table/mod.rs @@ -766,9 +766,7 @@ impl DeltaTable { /// Returns the metadata associated with the loaded state. pub fn get_metadata(&self) -> Result<&DeltaTableMetaData, DeltaTableError> { - self.state - .current_metadata() - .ok_or(DeltaTableError::NoMetadata) + self.state.metadata().ok_or(DeltaTableError::NoMetadata) } /// Returns a vector of active tombstones (i.e. `Remove` actions present in the current delta log). @@ -784,23 +782,23 @@ impl DeltaTable { /// Returns the minimum reader version supported by the DeltaTable based on the loaded /// metadata. pub fn get_min_reader_version(&self) -> i32 { - self.state.min_reader_version() + self.state.protocol().min_reader_version } /// Returns the minimum writer version supported by the DeltaTable based on the loaded /// metadata. pub fn get_min_writer_version(&self) -> i32 { - self.state.min_writer_version() + self.state.protocol().min_writer_version } /// Returns current supported reader features by this table pub fn get_reader_features(&self) -> Option<&HashSet> { - self.state.reader_features() + self.state.protocol().reader_features.as_ref() } /// Returns current supported writer features by this table pub fn get_writer_features(&self) -> Option<&HashSet> { - self.state.writer_features() + self.state.protocol().writer_features.as_ref() } /// Return table schema parsed from transaction log. Return None if table hasn't been loaded or @@ -819,7 +817,7 @@ impl DeltaTable { pub fn get_configurations(&self) -> Result<&HashMap>, DeltaTableError> { Ok(self .state - .current_metadata() + .metadata() .ok_or(DeltaTableError::NoMetadata)? .get_configuration()) } @@ -869,7 +867,7 @@ impl fmt::Display for DeltaTable { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { writeln!(f, "DeltaTable({})", self.table_uri())?; writeln!(f, "\tversion: {}", self.version())?; - match self.state.current_metadata() { + match self.state.metadata() { Some(metadata) => { writeln!(f, "\tmetadata: {metadata}")?; } @@ -880,8 +878,8 @@ impl fmt::Display for DeltaTable { writeln!( f, "\tmin_version: read={}, write={}", - self.state.min_reader_version(), - self.state.min_writer_version() + self.state.protocol().min_reader_version, + self.state.protocol().min_writer_version )?; writeln!(f, "\tfiles count: {}", self.state.files().len()) } diff --git a/crates/deltalake-core/src/table/state.rs b/crates/deltalake-core/src/table/state.rs index 8fa51c55fd..ccff7f6257 100644 --- a/crates/deltalake-core/src/table/state.rs +++ b/crates/deltalake-core/src/table/state.rs @@ -12,10 +12,8 @@ use serde::{Deserialize, Serialize}; use super::config::TableConfig; use crate::errors::DeltaTableError; -use crate::kernel::{ - Action, Add, CommitInfo, DataType, DomainMetadata, ReaderFeatures, Remove, StructType, - WriterFeatures, -}; +use crate::kernel::Protocol; +use crate::kernel::{Action, Add, CommitInfo, DataType, DomainMetadata, Remove, StructType}; use crate::partitions::{DeltaTablePartition, PartitionFilter}; use crate::protocol::ProtocolError; use crate::storage::commit_uri_from_version; @@ -41,17 +39,9 @@ pub struct DeltaTableState { // Domain metadatas provided by the system or user domain_metadatas: Vec, app_transaction_version: HashMap, - min_reader_version: i32, - min_writer_version: i32, - reader_features: Option>, - writer_features: Option>, // table metadata corresponding to current version current_metadata: Option, - // retention period for tombstones in milli-seconds - tombstone_retention_millis: i64, - // retention period for log entries in milli-seconds - log_retention_millis: i64, - enable_expired_log_cleanup: bool, + current_protocol: Option, } impl DeltaTableState { @@ -173,21 +163,6 @@ impl DeltaTableState { &self.commit_infos } - /// Retention of tombstone in milliseconds. - pub fn tombstone_retention_millis(&self) -> i64 { - self.tombstone_retention_millis - } - - /// Retention of logs in milliseconds. - pub fn log_retention_millis(&self) -> i64 { - self.log_retention_millis - } - - /// Whether to clean up expired checkpoints and delta logs. - pub fn enable_expired_log_cleanup(&self) -> bool { - self.enable_expired_log_cleanup - } - /// Full list of tombstones (remove actions) representing files removed from table state). pub fn all_tombstones(&self) -> &HashSet { &self.tombstones @@ -196,7 +171,11 @@ impl DeltaTableState { /// List of unexpired tombstones (remove actions) representing files removed from table state. /// The retention period is set by `deletedFileRetentionDuration` with default value of 1 week. pub fn unexpired_tombstones(&self) -> impl Iterator { - let retention_timestamp = Utc::now().timestamp_millis() - self.tombstone_retention_millis; + let retention_timestamp = Utc::now().timestamp_millis() + - self + .table_config() + .deleted_file_retention_duration() + .as_millis() as i64; self.tombstones .iter() .filter(move |t| t.deletion_timestamp.unwrap_or(0) > retention_timestamp) @@ -223,28 +202,16 @@ impl DeltaTableState { &self.app_transaction_version } - /// The min reader version required by the protocol. - pub fn min_reader_version(&self) -> i32 { - self.min_reader_version - } - - /// The min writer version required by the protocol. - pub fn min_writer_version(&self) -> i32 { - self.min_writer_version - } - - /// Current supported reader features - pub fn reader_features(&self) -> Option<&HashSet> { - self.reader_features.as_ref() - } - - /// Current supported writer features - pub fn writer_features(&self) -> Option<&HashSet> { - self.writer_features.as_ref() + /// The most recent protocol of the table. + pub fn protocol(&self) -> &Protocol { + lazy_static! { + static ref DEFAULT_PROTOCOL: Protocol = Protocol::default(); + } + self.current_protocol.as_ref().unwrap_or(&DEFAULT_PROTOCOL) } /// The most recent metadata of the table. - pub fn current_metadata(&self) -> Option<&DeltaTableMetaData> { + pub fn metadata(&self) -> Option<&DeltaTableMetaData> { self.current_metadata.as_ref() } @@ -299,26 +266,14 @@ impl DeltaTableState { self.files.append(&mut new_state.files); } - if new_state.min_reader_version > 0 { - self.min_reader_version = new_state.min_reader_version; - self.min_writer_version = new_state.min_writer_version; - } - - if new_state.min_writer_version >= 5 { - self.writer_features = new_state.writer_features; - } - - if new_state.min_reader_version >= 3 { - self.reader_features = new_state.reader_features; - } - if new_state.current_metadata.is_some() { - self.tombstone_retention_millis = new_state.tombstone_retention_millis; - self.log_retention_millis = new_state.log_retention_millis; - self.enable_expired_log_cleanup = new_state.enable_expired_log_cleanup; self.current_metadata = new_state.current_metadata.take(); } + if new_state.current_protocol.is_some() { + self.current_protocol = new_state.current_protocol.take(); + } + new_state .app_transaction_version .drain() @@ -359,19 +314,10 @@ impl DeltaTableState { } } Action::Protocol(v) => { - self.min_reader_version = v.min_reader_version; - self.min_writer_version = v.min_writer_version; - self.reader_features = v.reader_features; - self.writer_features = v.writer_features; + self.current_protocol = Some(v); } Action::Metadata(v) => { let md = DeltaTableMetaData::try_from(v)?; - let table_config = TableConfig(&md.configuration); - self.tombstone_retention_millis = - table_config.deleted_file_retention_duration().as_millis() as i64; - self.log_retention_millis = - table_config.log_retention_duration().as_millis() as i64; - self.enable_expired_log_cleanup = table_config.enable_expired_log_cleanup(); self.current_metadata = Some(md); } Action::Txn(v) => { @@ -396,7 +342,7 @@ impl DeltaTableState { &'a self, filters: &'a [PartitionFilter], ) -> Result + '_, DeltaTableError> { - let current_metadata = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?; + let current_metadata = self.metadata().ok_or(DeltaTableError::NoMetadata)?; let nonpartitioned_columns: Vec = filters .iter() @@ -444,14 +390,8 @@ mod tests { commit_infos: vec![], domain_metadatas: vec![], app_transaction_version: Default::default(), - min_reader_version: 0, - min_writer_version: 0, - reader_features: None, - writer_features: None, current_metadata: None, - tombstone_retention_millis: 0, - log_retention_millis: 0, - enable_expired_log_cleanup: false, + current_protocol: None, }; let bytes = serde_json::to_vec(&expected).unwrap(); let actual: DeltaTableState = serde_json::from_slice(&bytes).unwrap(); @@ -471,14 +411,8 @@ mod tests { domain_metadatas: vec![], tombstones: HashSet::new(), current_metadata: None, - min_reader_version: 1, - min_writer_version: 1, - reader_features: None, - writer_features: None, + current_protocol: None, app_transaction_version, - tombstone_retention_millis: 0, - log_retention_millis: 0, - enable_expired_log_cleanup: true, }; let txn_action = Action::Txn(Txn { diff --git a/crates/deltalake-core/src/table/state_arrow.rs b/crates/deltalake-core/src/table/state_arrow.rs index 3dbb460879..5e2565ee08 100644 --- a/crates/deltalake-core/src/table/state_arrow.rs +++ b/crates/deltalake-core/src/table/state_arrow.rs @@ -86,7 +86,7 @@ impl DeltaTableState { (Cow::Borrowed("data_change"), Arc::new(data_change)), ]; - let metadata = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?; + let metadata = self.metadata().ok_or(DeltaTableError::NoMetadata)?; if !metadata.partition_columns.is_empty() { let partition_cols_batch = self.partition_columns_as_batch(flatten)?; @@ -145,7 +145,7 @@ impl DeltaTableState { &self, flatten: bool, ) -> Result { - let metadata = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?; + let metadata = self.metadata().ok_or(DeltaTableError::NoMetadata)?; let column_mapping_mode = self.table_config().column_mapping_mode(); let partition_column_types: Vec = metadata .partition_columns @@ -413,7 +413,7 @@ impl DeltaTableState { .map(|maybe_stat| maybe_stat.as_ref().map(|stat| stat.num_records)) .collect::>>(), ); - let metadata = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?; + let metadata = self.metadata().ok_or(DeltaTableError::NoMetadata)?; let schema = &metadata.schema; #[derive(Debug)]