Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplify DeltaTableState #1877

Merged
merged 8 commits into from
Dec 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ impl<'a> DeltaScanBuilder<'a> {

let table_partition_cols = &self
.snapshot
.current_metadata()
.metadata()
.ok_or(DeltaTableError::NoMetadata)?
.partition_columns;

Expand Down Expand Up @@ -1457,9 +1457,7 @@ pub async fn find_files<'a>(
state: &SessionState,
predicate: Option<Expr>,
) -> DeltaResult<FindFiles> {
let current_metadata = snapshot
.current_metadata()
.ok_or(DeltaTableError::NoMetadata)?;
let current_metadata = snapshot.metadata().ok_or(DeltaTableError::NoMetadata)?;

match &predicate {
Some(predicate) => {
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async fn excute_non_empty_expr(
let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;

let table_partition_cols = snapshot
.current_metadata()
.metadata()
.ok_or(DeltaTableError::NoMetadata)?
.partition_columns
.clone();
Expand Down
4 changes: 1 addition & 3 deletions crates/deltalake-core/src/operations/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,9 +665,7 @@ async fn execute(
let mut metrics = MergeMetrics::default();
let exec_start = Instant::now();

let current_metadata = snapshot
.current_metadata()
.ok_or(DeltaTableError::NoMetadata)?;
let current_metadata = snapshot.metadata().ok_or(DeltaTableError::NoMetadata)?;

// TODO: Given the join predicate, remove any expression that involve the
// source table and keep expressions that only involve the target table.
Expand Down
6 changes: 3 additions & 3 deletions crates/deltalake-core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ pub fn create_merge_plan(
let target_size = target_size.unwrap_or_else(|| snapshot.table_config().target_file_size());

let partitions_keys = &snapshot
.current_metadata()
.metadata()
.ok_or(DeltaTableError::NoMetadata)?
.partition_columns;

Expand All @@ -788,7 +788,7 @@ pub fn create_merge_plan(
&Arc::new(
<ArrowSchema as TryFrom<&crate::kernel::StructType>>::try_from(
&snapshot
.current_metadata()
.metadata()
.ok_or(DeltaTableError::NoMetadata)?
.schema,
)?,
Expand Down Expand Up @@ -940,7 +940,7 @@ fn build_zorder_plan(
)));
}
let field_names = snapshot
.current_metadata()
.metadata()
.unwrap()
.schema
.fields()
Expand Down
12 changes: 6 additions & 6 deletions crates/deltalake-core/src/operations/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,12 @@ async fn execute(
Protocol {
min_reader_version: table.get_min_reader_version(),
min_writer_version: table.get_min_writer_version(),
writer_features: if snapshot.min_writer_version() < 7 {
writer_features: if snapshot.protocol().min_writer_version < 7 {
None
} else {
table.get_writer_features().cloned()
},
reader_features: if snapshot.min_reader_version() < 3 {
reader_features: if snapshot.protocol().min_reader_version < 3 {
None
} else {
table.get_reader_features().cloned()
Expand All @@ -224,14 +224,14 @@ async fn execute(
Protocol {
min_reader_version: max(
table.get_min_reader_version(),
snapshot.min_reader_version(),
snapshot.protocol().min_reader_version,
),
min_writer_version: max(
table.get_min_writer_version(),
snapshot.min_writer_version(),
snapshot.protocol().min_writer_version,
),
writer_features: snapshot.writer_features().cloned(),
reader_features: snapshot.reader_features().cloned(),
writer_features: snapshot.protocol().writer_features.clone(),
reader_features: snapshot.protocol().reader_features.clone(),
}
};
actions.push(Action::Protocol(protocol));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,11 +398,11 @@ impl<'a> ConflictChecker<'a> {
for p in self.winning_commit_summary.protocol() {
let (win_read, curr_read) = (
p.min_reader_version,
self.txn_info.read_snapshot.min_reader_version(),
self.txn_info.read_snapshot.protocol().min_reader_version,
);
let (win_write, curr_write) = (
p.min_writer_version,
self.txn_info.read_snapshot.min_writer_version(),
self.txn_info.read_snapshot.protocol().min_writer_version,
);
if curr_read < win_read || win_write < curr_write {
return Err(CommitConflictError::ProtocolChanged(
Expand Down Expand Up @@ -475,7 +475,7 @@ impl<'a> ConflictChecker<'a> {
let partition_columns = &self
.txn_info
.read_snapshot
.current_metadata()
.metadata()
.ok_or(CommitConflictError::NoMetadata)?
.partition_columns;
AddContainer::new(&added_files_to_check, partition_columns, arrow_schema)
Expand Down
16 changes: 9 additions & 7 deletions crates/deltalake-core/src/operations/transaction/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ impl ProtocolChecker {
/// Check if delta-rs can read form the given delta table.
pub fn can_read_from(&self, snapshot: &DeltaTableState) -> Result<(), TransactionError> {
let required_features: Option<&HashSet<ReaderFeatures>> =
match snapshot.min_reader_version() {
match snapshot.protocol().min_reader_version {
0 | 1 => None,
2 => Some(&READER_V2),
_ => snapshot.reader_features(),
_ => snapshot.protocol().reader_features.as_ref(),
};
if let Some(features) = required_features {
let mut diff = features.difference(&self.reader_features).peekable();
Expand All @@ -101,14 +101,14 @@ impl ProtocolChecker {
self.can_read_from(snapshot)?;

let required_features: Option<&HashSet<WriterFeatures>> =
match snapshot.min_writer_version() {
match snapshot.protocol().min_writer_version {
0 | 1 => None,
2 => Some(&WRITER_V2),
3 => Some(&WRITER_V3),
4 => Some(&WRITER_V4),
5 => Some(&WRITER_V5),
6 => Some(&WRITER_V6),
_ => snapshot.writer_features(),
_ => snapshot.protocol().writer_features.as_ref(),
};

if let Some(features) = required_features {
Expand All @@ -130,13 +130,15 @@ impl ProtocolChecker {
self.can_write_to(snapshot)?;

// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#append-only-tables
let append_only_enabled = if snapshot.min_writer_version() < 2 {
let append_only_enabled = if snapshot.protocol().min_writer_version < 2 {
false
} else if snapshot.min_writer_version() < 7 {
} else if snapshot.protocol().min_writer_version < 7 {
snapshot.table_config().append_only()
} else {
snapshot
.writer_features()
.protocol()
.writer_features
.as_ref()
.ok_or(TransactionError::WriterFeaturesRequired)?
.contains(&WriterFeatures::AppendOnly)
&& snapshot.table_config().append_only()
Expand Down
8 changes: 4 additions & 4 deletions crates/deltalake-core/src/operations/transaction/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl DeltaTableState {
}

fn _arrow_schema(&self, wrap_partitions: bool) -> DeltaResult<ArrowSchemaRef> {
let meta = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?;
let meta = self.metadata().ok_or(DeltaTableError::NoMetadata)?;
let fields = meta
.schema
.fields()
Expand Down Expand Up @@ -299,7 +299,7 @@ impl PruningStatistics for DeltaTableState {
/// return the minimum values for the named column, if known.
/// Note: the returned array must contain `num_containers()` rows
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
let partition_columns = &self.current_metadata()?.partition_columns;
let partition_columns = &self.metadata()?.partition_columns;
let container =
AddContainer::new(self.files(), partition_columns, self.arrow_schema().ok()?);
container.min_values(column)
Expand All @@ -308,7 +308,7 @@ impl PruningStatistics for DeltaTableState {
/// return the maximum values for the named column, if known.
/// Note: the returned array must contain `num_containers()` rows.
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
let partition_columns = &self.current_metadata()?.partition_columns;
let partition_columns = &self.metadata()?.partition_columns;
let container =
AddContainer::new(self.files(), partition_columns, self.arrow_schema().ok()?);
container.max_values(column)
Expand All @@ -325,7 +325,7 @@ impl PruningStatistics for DeltaTableState {
///
/// Note: the returned array must contain `num_containers()` rows.
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
let partition_columns = &self.current_metadata()?.partition_columns;
let partition_columns = &self.metadata()?.partition_columns;
let container =
AddContainer::new(self.files(), partition_columns, self.arrow_schema().ok()?);
container.null_counts(column)
Expand Down
4 changes: 1 addition & 3 deletions crates/deltalake-core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,7 @@ async fn execute(
})
.collect::<Result<HashMap<Column, Expr>, _>>()?;

let current_metadata = snapshot
.current_metadata()
.ok_or(DeltaTableError::NoMetadata)?;
let current_metadata = snapshot.metadata().ok_or(DeltaTableError::NoMetadata)?;
let table_partition_cols = current_metadata.partition_columns.clone();

let scan_start = Instant::now();
Expand Down
9 changes: 7 additions & 2 deletions crates/deltalake-core/src/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,12 @@ impl VacuumBuilder {

/// Determine which files can be deleted. Does not actually peform the deletion
async fn create_vacuum_plan(&self) -> Result<VacuumPlan, VacuumError> {
let min_retention = Duration::milliseconds(self.snapshot.tombstone_retention_millis());
let min_retention = Duration::milliseconds(
self.snapshot
.table_config()
.deleted_file_retention_duration()
.as_millis() as i64,
);
let retention_period = self.retention_period.unwrap_or(min_retention);
let enforce_retention_duration = self.enforce_retention_duration;

Expand Down Expand Up @@ -191,7 +196,7 @@ impl VacuumBuilder {
.map_err(DeltaTableError::from)?;
let partition_columns = &self
.snapshot
.current_metadata()
.metadata()
.ok_or(DeltaTableError::NoMetadata)?
.partition_columns;

Expand Down
6 changes: 3 additions & 3 deletions crates/deltalake-core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ pub(crate) async fn write_execution_plan(
overwrite_schema: bool,
) -> DeltaResult<Vec<Add>> {
let invariants = snapshot
.current_metadata()
.metadata()
.and_then(|meta| meta.schema.get_invariants().ok())
.unwrap_or_default();

Expand Down Expand Up @@ -380,7 +380,7 @@ impl std::future::IntoFuture for WriteBuilder {

let active_partitions = this
.snapshot
.current_metadata()
.metadata()
.map(|meta| meta.partition_columns.clone());

// validate partition columns
Expand Down Expand Up @@ -498,7 +498,7 @@ impl std::future::IntoFuture for WriteBuilder {
if schema != table_schema {
let mut metadata = this
.snapshot
.current_metadata()
.metadata()
.ok_or(DeltaTableError::NoMetadata)?
.clone();
metadata.schema = schema.clone().try_into()?;
Expand Down
22 changes: 15 additions & 7 deletions crates/deltalake-core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,12 @@ pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), ProtocolError>
/// Delete expires log files before given version from table. The table log retention is based on
/// the `logRetentionDuration` property of the Delta Table, 30 days by default.
pub async fn cleanup_metadata(table: &DeltaTable) -> Result<usize, ProtocolError> {
let log_retention_timestamp =
Utc::now().timestamp_millis() - table.get_state().log_retention_millis();
let log_retention_timestamp = Utc::now().timestamp_millis()
- table
.get_state()
.table_config()
.log_retention_duration()
.as_millis() as i64;
cleanup_expired_logs_for(
table.version(),
table.log_store.as_ref(),
Expand All @@ -105,8 +109,12 @@ pub async fn create_checkpoint_from_table_uri_and_cleanup(
.map_err(|err| ProtocolError::Generic(err.to_string()))?;
create_checkpoint_for(version, table.get_state(), table.log_store.as_ref()).await?;

let enable_expired_log_cleanup =
cleanup.unwrap_or_else(|| table.get_state().enable_expired_log_cleanup());
let enable_expired_log_cleanup = cleanup.unwrap_or_else(|| {
table
.get_state()
.table_config()
.enable_expired_log_cleanup()
});

if table.version() >= 0 && enable_expired_log_cleanup {
let deleted_log_num = cleanup_metadata(&table).await?;
Expand Down Expand Up @@ -213,7 +221,7 @@ pub async fn cleanup_expired_logs_for(
fn parquet_bytes_from_state(
state: &DeltaTableState,
) -> Result<(CheckPoint, bytes::Bytes), ProtocolError> {
let current_metadata = state.current_metadata().ok_or(ProtocolError::NoMetaData)?;
let current_metadata = state.metadata().ok_or(ProtocolError::NoMetaData)?;

let partition_col_data_types = current_metadata.get_partition_col_data_types();

Expand Down Expand Up @@ -247,8 +255,8 @@ fn parquet_bytes_from_state(

// protocol
let jsons = std::iter::once(Action::Protocol(Protocol {
min_reader_version: state.min_reader_version(),
min_writer_version: state.min_writer_version(),
min_reader_version: state.protocol().min_reader_version,
min_writer_version: state.protocol().min_writer_version,
writer_features: None,
reader_features: None,
}))
Expand Down
20 changes: 9 additions & 11 deletions crates/deltalake-core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,9 +766,7 @@ impl DeltaTable {

/// Returns the metadata associated with the loaded state.
pub fn get_metadata(&self) -> Result<&DeltaTableMetaData, DeltaTableError> {
self.state
.current_metadata()
.ok_or(DeltaTableError::NoMetadata)
self.state.metadata().ok_or(DeltaTableError::NoMetadata)
}

/// Returns a vector of active tombstones (i.e. `Remove` actions present in the current delta log).
Expand All @@ -784,23 +782,23 @@ impl DeltaTable {
/// Returns the minimum reader version supported by the DeltaTable based on the loaded
/// metadata.
pub fn get_min_reader_version(&self) -> i32 {
self.state.min_reader_version()
self.state.protocol().min_reader_version
}

/// Returns the minimum writer version supported by the DeltaTable based on the loaded
/// metadata.
pub fn get_min_writer_version(&self) -> i32 {
self.state.min_writer_version()
self.state.protocol().min_writer_version
}

/// Returns current supported reader features by this table
pub fn get_reader_features(&self) -> Option<&HashSet<ReaderFeatures>> {
self.state.reader_features()
self.state.protocol().reader_features.as_ref()
}

/// Returns current supported writer features by this table
pub fn get_writer_features(&self) -> Option<&HashSet<WriterFeatures>> {
self.state.writer_features()
self.state.protocol().writer_features.as_ref()
}

/// Return table schema parsed from transaction log. Return None if table hasn't been loaded or
Expand All @@ -819,7 +817,7 @@ impl DeltaTable {
pub fn get_configurations(&self) -> Result<&HashMap<String, Option<String>>, DeltaTableError> {
Ok(self
.state
.current_metadata()
.metadata()
.ok_or(DeltaTableError::NoMetadata)?
.get_configuration())
}
Expand Down Expand Up @@ -869,7 +867,7 @@ impl fmt::Display for DeltaTable {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
writeln!(f, "DeltaTable({})", self.table_uri())?;
writeln!(f, "\tversion: {}", self.version())?;
match self.state.current_metadata() {
match self.state.metadata() {
Some(metadata) => {
writeln!(f, "\tmetadata: {metadata}")?;
}
Expand All @@ -880,8 +878,8 @@ impl fmt::Display for DeltaTable {
writeln!(
f,
"\tmin_version: read={}, write={}",
self.state.min_reader_version(),
self.state.min_writer_version()
self.state.protocol().min_reader_version,
self.state.protocol().min_writer_version
)?;
writeln!(f, "\tfiles count: {}", self.state.files().len())
}
Expand Down
Loading
Loading