From a3d55853655f49effb27fc63d6b3d340dddf0eef Mon Sep 17 00:00:00 2001 From: Adrian Ehrsam Date: Tue, 12 Mar 2024 20:35:16 +0100 Subject: [PATCH] use into() --- crates/benchmarks/src/bin/merge.rs | 6 +- crates/core/src/delta_datafusion/mod.rs | 4 +- crates/core/src/operations/constraints.rs | 21 ++-- crates/core/src/operations/delete.rs | 17 ++- .../core/src/operations/drop_constraints.rs | 6 +- crates/core/src/operations/load.rs | 4 +- crates/core/src/operations/merge/mod.rs | 2 +- crates/core/src/operations/optimize.rs | 2 +- crates/core/src/operations/update.rs | 5 +- crates/core/src/operations/write.rs | 103 ++++++++++-------- crates/core/src/protocol/checkpoints.rs | 9 +- crates/core/src/writer/test_utils.rs | 2 +- crates/core/tests/command_restore.rs | 6 +- crates/core/tests/integration_datafusion.rs | 4 +- crates/deltalake/examples/basic_operations.rs | 8 +- docs/src/rust/check_constraints.rs | 2 +- docs/src/rust/operations.rs | 2 +- 17 files changed, 97 insertions(+), 106 deletions(-) diff --git a/crates/benchmarks/src/bin/merge.rs b/crates/benchmarks/src/bin/merge.rs index 9ab8639ab4..d29ee97ce4 100644 --- a/crates/benchmarks/src/bin/merge.rs +++ b/crates/benchmarks/src/bin/merge.rs @@ -77,7 +77,7 @@ pub async fn convert_tpcds_web_returns(input_path: String, table_path: String) - DeltaOps::try_from_uri(table_path) .await .unwrap() - .write(deltalake_core::operations::write::WriteData::Vecs(tbl)) + .write(tbl.into()) .with_partition_columns(vec!["wr_returned_date_sk"]) .await .unwrap(); @@ -568,9 +568,7 @@ async fn main() { DeltaOps::try_from_uri(output) .await .unwrap() - .write(deltalake_core::operations::write::WriteData::Vecs(vec![ - batch, - ])) + .write(batch.into()) .with_save_mode(SaveMode::Append) .await .unwrap(); diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 9c31e22390..d2cbb514d2 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -1757,7 +1757,7 @@ mod tests { .unwrap(); // write some data let table = crate::DeltaOps(table) - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .with_save_mode(crate::protocol::SaveMode::Append) .await .unwrap(); @@ -1821,7 +1821,7 @@ mod tests { .unwrap(); // write some data let table = crate::DeltaOps::new_in_memory() - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .with_save_mode(crate::protocol::SaveMode::Append) .await .unwrap(); diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs index fb532ae82a..51d3eae6d0 100644 --- a/crates/core/src/operations/constraints.rs +++ b/crates/core/src/operations/constraints.rs @@ -272,7 +272,7 @@ mod tests { async fn add_constraint_with_invalid_data() -> DeltaResult<()> { let batch = get_record_batch(None, false); let write = DeltaOps(create_bare_table()) - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .await?; let table = DeltaOps(write); @@ -288,7 +288,7 @@ mod tests { async fn add_valid_constraint() -> DeltaResult<()> { let batch = get_record_batch(None, false); let write = DeltaOps(create_bare_table()) - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .await?; let table = DeltaOps(write); @@ -313,7 +313,7 @@ mod tests { // Add constraint by providing a datafusion expression. let batch = get_record_batch(None, false); let write = DeltaOps(create_bare_table()) - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .await?; let table = DeltaOps(write); @@ -356,10 +356,7 @@ mod tests { ) .unwrap(); - let table = DeltaOps::new_in_memory() - .write(WriteData::Vecs(vec![batch])) - .await - .unwrap(); + let table = DeltaOps::new_in_memory().write(batch.into()).await.unwrap(); let mut table = DeltaOps(table) .add_constraint() @@ -382,7 +379,7 @@ mod tests { async fn add_conflicting_named_constraint() -> DeltaResult<()> { let batch = get_record_batch(None, false); let write = DeltaOps(create_bare_table()) - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .await?; let table = DeltaOps(write); @@ -404,7 +401,7 @@ mod tests { async fn write_data_that_violates_constraint() -> DeltaResult<()> { let batch = get_record_batch(None, false); let write = DeltaOps(create_bare_table()) - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .await?; let table = DeltaOps(write) @@ -418,7 +415,7 @@ mod tests { Arc::new(StringArray::from(vec!["2021-02-02"])), ]; let batch = RecordBatch::try_new(get_arrow_schema(&None), invalid_values)?; - let err = table.write(WriteData::Vecs(vec![batch])).await; + let err = table.write(batch.into()).await; assert!(err.is_err()); Ok(()) } @@ -427,11 +424,11 @@ mod tests { async fn write_data_that_does_not_violate_constraint() -> DeltaResult<()> { let batch = get_record_batch(None, false); let write = DeltaOps(create_bare_table()) - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .await?; let table = DeltaOps(write); - let err = table.write(WriteData::Vecs(vec![batch])).await; + let err = table.write(batch.into()).await; assert!(err.is_ok()); Ok(()) diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index 096bbeef50..45719725ab 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -402,7 +402,7 @@ mod tests { .unwrap(); // write some data let table = DeltaOps(table) - .write(WriteData::Vecs(vec![batch])) + .write(batch.into()) .with_save_mode(SaveMode::Append) .await .unwrap(); @@ -464,7 +464,7 @@ mod tests { // write some data let table = DeltaOps(table) - .write(WriteData::Vecs(vec![batch])) + .write(batch.into()) .with_save_mode(SaveMode::Append) .await .unwrap(); @@ -488,7 +488,7 @@ mod tests { // write some data let table = DeltaOps(table) - .write(WriteData::Vecs(vec![batch])) + .write(batch.into()) .with_save_mode(SaveMode::Append) .await .unwrap(); @@ -555,10 +555,7 @@ mod tests { ) .unwrap(); - DeltaOps::new_in_memory() - .write(WriteData::Vecs(vec![batch])) - .await - .unwrap() + DeltaOps::new_in_memory().write(batch.into()).await.unwrap() } // Validate behaviour of greater than @@ -647,7 +644,7 @@ mod tests { // write some data let table = DeltaOps(table) - .write(WriteData::Vecs(vec![batch])) + .write(batch.into()) .with_save_mode(SaveMode::Append) .await .unwrap(); @@ -705,7 +702,7 @@ mod tests { // write some data let table = DeltaOps(table) - .write(WriteData::Vecs(vec![batch])) + .write(batch.into()) .with_save_mode(SaveMode::Append) .await .unwrap(); @@ -775,7 +772,7 @@ mod tests { let batches = vec![RecordBatch::try_new(schema.clone(), data).unwrap()]; let table = DeltaOps::new_in_memory() - .write(WriteData::Vecs(batches)) + .write(batches.into()) .await .unwrap(); diff --git a/crates/core/src/operations/drop_constraints.rs b/crates/core/src/operations/drop_constraints.rs index 91084106ed..f72433f6c4 100644 --- a/crates/core/src/operations/drop_constraints.rs +++ b/crates/core/src/operations/drop_constraints.rs @@ -147,7 +147,7 @@ mod tests { async fn drop_valid_constraint() -> DeltaResult<()> { let batch = get_record_batch(None, false); let write = DeltaOps(create_bare_table()) - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .await?; let table = DeltaOps(write); @@ -171,7 +171,7 @@ mod tests { async fn drop_invalid_constraint_not_existing() -> DeltaResult<()> { let batch = get_record_batch(None, false); let write = DeltaOps(create_bare_table()) - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .await?; let table = DeltaOps(write) @@ -187,7 +187,7 @@ mod tests { async fn drop_invalid_constraint_ignore() -> DeltaResult<()> { let batch = get_record_batch(None, false); let write = DeltaOps(create_bare_table()) - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .await?; let version = write.version(); diff --git a/crates/core/src/operations/load.rs b/crates/core/src/operations/load.rs index 4c803d61a7..d3e9eacd15 100644 --- a/crates/core/src/operations/load.rs +++ b/crates/core/src/operations/load.rs @@ -116,7 +116,7 @@ mod tests { async fn test_write_load() -> TestResult { let batch = get_record_batch(None, false); let table = DeltaOps::new_in_memory() - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .await?; let (_table, stream) = DeltaOps(table).load().await?; @@ -149,7 +149,7 @@ mod tests { async fn test_load_with_columns() -> TestResult { let batch = get_record_batch(None, false); let table = DeltaOps::new_in_memory() - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .await?; let (_table, stream) = DeltaOps(table).load().with_columns(["id", "value"]).await?; diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index b30f4ab2c7..75d44d87ec 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -1610,7 +1610,7 @@ mod tests { .unwrap(); // write some data DeltaOps(table) - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .with_save_mode(SaveMode::Append) .await .unwrap() diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index 22aff1e9ce..0c093124b5 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -1358,7 +1358,7 @@ pub(super) mod zorder { .unwrap(); // write some data let table = crate::DeltaOps::new_in_memory() - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .with_save_mode(crate::protocol::SaveMode::Append) .await .unwrap(); diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index f179569425..72c43de763 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -526,10 +526,7 @@ mod tests { ) .unwrap(); - DeltaOps::new_in_memory() - .write(WriteData::Vecs(vec![batch])) - .await - .unwrap() + DeltaOps::new_in_memory().write(batch.into()).await.unwrap() } #[tokio::test] diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index eef44bd1b2..49080b9250 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -21,7 +21,7 @@ //! let ids = arrow::array::Int32Array::from(vec![1, 2, 3, 4, 5]); //! let batch = RecordBatch::try_new(schema, vec![Arc::new(ids)])?; //! let ops = DeltaOps::try_from_uri("../path/to/empty/dir").await?; -//! let table = ops.write(WriteData::Vecs(vec![batch])).await?; +//! let table = ops.write(batch.into())).await?; //! ```` use std::collections::HashMap; @@ -126,6 +126,28 @@ pub enum WriteData { Vecs(Vec), } +impl From> for WriteData { + /// Convert an execution plan to write data, will basically just execute the plan + fn from(v: Arc) -> Self { + WriteData::DataFusionPlan(v) + } +} + +impl From> for WriteData { + /// Convert a vector of record batches to write data + fn from(v: Vec) -> Self { + WriteData::Vecs(v) + } +} + +impl From for WriteData { + /// Convert a single record batch to write data + fn from(v: RecordBatch) -> Self { + let schema = v.schema().clone(); + WriteData::RecordBatches((Box::new(std::iter::once(v)), schema)) + } +} + /// Configuration for writing data to a DeltaTable pub struct WriteBuilderConfig { /// A snapshot of the to-be-loaded table's state @@ -402,6 +424,9 @@ async fn write_execution_plan_with_predicate( // Write data to disk let mut tasks = vec![]; + + // write_data is a Vec, because we can have multiple streams if we have multiple partitions in DataFusion + // DataFusion Partitions are a performance-related thing and do not match the Delta Table partitions for mut stream in write_data { let inner_schema = target_schema.clone(); @@ -880,7 +905,7 @@ mod tests { let table = write_batch(table, batch.clone()).await; // Overwrite let _err = DeltaOps(table) - .write(WriteData::Vecs(vec![batch])) + .write(batch.into()) .with_save_mode(SaveMode::Overwrite) .await .expect_err("Remove action is included when Delta table is append-only. Should error"); @@ -902,7 +927,7 @@ mod tests { // write some data let metadata = HashMap::from_iter(vec![("k1".to_string(), json!("v1.1"))]); let mut table = DeltaOps(table) - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .with_save_mode(SaveMode::Append) .with_metadata(metadata.clone()) .await @@ -925,7 +950,7 @@ mod tests { let metadata: HashMap = HashMap::from_iter(vec![("k1".to_string(), json!("v1.2"))]); let mut table = DeltaOps(table) - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .with_save_mode(SaveMode::Append) .with_metadata(metadata.clone()) .await @@ -948,7 +973,7 @@ mod tests { let metadata: HashMap = HashMap::from_iter(vec![("k2".to_string(), json!("v2.1"))]); let mut table = DeltaOps(table) - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .with_save_mode(SaveMode::Overwrite) .with_metadata(metadata.clone()) .await @@ -984,10 +1009,7 @@ mod tests { vec![Arc::new(Int32Array::from(vec![Some(0), None]))], ) .unwrap(); - let table = DeltaOps::new_in_memory() - .write(WriteData::Vecs(vec![batch])) - .await - .unwrap(); + let table = DeltaOps::new_in_memory().write(batch.into()).await.unwrap(); let schema = Arc::new(ArrowSchema::new(vec![Field::new( "value", @@ -1007,7 +1029,7 @@ mod tests { // Test cast options let table = DeltaOps::from(table) - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .with_cast_safety(true) .await .unwrap(); @@ -1026,9 +1048,7 @@ mod tests { let actual = get_data(&table).await; assert_batches_sorted_eq!(&expected, &actual); - let res = DeltaOps::from(table) - .write(WriteData::Vecs(vec![batch.clone()])) - .await; + let res = DeltaOps::from(table).write(batch.clone().into()).await; assert!(res.is_err()); // Validate the datetime -> string behavior @@ -1045,10 +1065,7 @@ mod tests { )]))], ) .unwrap(); - let table = DeltaOps::new_in_memory() - .write(WriteData::Vecs(vec![batch])) - .await - .unwrap(); + let table = DeltaOps::new_in_memory().write(batch.into()).await.unwrap(); let schema = Arc::new(ArrowSchema::new(vec![Field::new( "value", @@ -1063,10 +1080,7 @@ mod tests { ) .unwrap(); - let _res = DeltaOps::from(table) - .write(WriteData::Vecs(vec![batch])) - .await - .unwrap(); + let _res = DeltaOps::from(table).write(batch.into()).await.unwrap(); let expected = [ "+--------------------------+", "| value |", @@ -1083,7 +1097,7 @@ mod tests { async fn test_write_nonexistent() { let batch = get_record_batch(None, false); let table = DeltaOps::new_in_memory() - .write(WriteData::Vecs(vec![batch])) + .write(batch.into()) .with_save_mode(SaveMode::ErrorIfExists) .await .unwrap(); @@ -1095,7 +1109,7 @@ mod tests { async fn test_write_partitioned() { let batch = get_record_batch(None, false); let table = DeltaOps::new_in_memory() - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .with_save_mode(SaveMode::ErrorIfExists) .with_partition_columns(["modified"]) .await @@ -1104,7 +1118,7 @@ mod tests { assert_eq!(table.get_files_count(), 2); let table = DeltaOps::new_in_memory() - .write(WriteData::Vecs(vec![batch])) + .write(batch.into()) .with_save_mode(SaveMode::ErrorIfExists) .with_partition_columns(["modified", "id"]) .await @@ -1117,7 +1131,7 @@ mod tests { async fn test_merge_schema() { let batch = get_record_batch(None, false); let table = DeltaOps::new_in_memory() - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .with_save_mode(SaveMode::ErrorIfExists) .await .unwrap(); @@ -1158,7 +1172,7 @@ mod tests { .unwrap(); let mut table = DeltaOps(table) - .write(WriteData::Vecs(vec![new_batch])) + .write(new_batch.into()) .with_save_mode(SaveMode::Append) .with_schema_mode(SchemaMode::Merge) .await @@ -1175,7 +1189,7 @@ mod tests { async fn test_merge_schema_with_partitions() { let batch = get_record_batch(None, false); let table = DeltaOps::new_in_memory() - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .with_partition_columns(vec!["id", "value"]) .with_save_mode(SaveMode::ErrorIfExists) .await @@ -1217,7 +1231,7 @@ mod tests { .unwrap(); println!("new_batch: {:?}", new_batch.schema()); let table = DeltaOps(table) - .write(WriteData::Vecs(vec![new_batch])) + .write(new_batch.into()) .with_save_mode(SaveMode::Append) .with_schema_mode(SchemaMode::Merge) .await @@ -1237,7 +1251,7 @@ mod tests { async fn test_overwrite_schema() { let batch = get_record_batch(None, false); let table = DeltaOps::new_in_memory() - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .with_save_mode(SaveMode::ErrorIfExists) .await .unwrap(); @@ -1278,7 +1292,7 @@ mod tests { .unwrap(); let table = DeltaOps(table) - .write(WriteData::Vecs(vec![new_batch])) + .write(new_batch.into()) .with_save_mode(SaveMode::Append) .with_schema_mode(SchemaMode::Overwrite) .await; @@ -1290,7 +1304,7 @@ mod tests { // If you do not pass a schema mode, we want to check the schema let batch = get_record_batch(None, false); let table = DeltaOps::new_in_memory() - .write(WriteData::Vecs(vec![batch])) + .write(batch.into()) .with_save_mode(SaveMode::ErrorIfExists) .await .unwrap(); @@ -1320,7 +1334,7 @@ mod tests { RecordBatch::try_new(Arc::new(new_schema), vec![Arc::new(inserted_by)]).unwrap(); let table = DeltaOps(table) - .write(WriteData::Vecs(vec![new_batch])) + .write(new_batch.into()) .with_save_mode(SaveMode::Append) .await; assert!(table.is_err()); @@ -1348,10 +1362,7 @@ mod tests { .unwrap(); assert_eq!(table.version(), 0); - let table = DeltaOps(table) - .write(WriteData::Vecs(vec![batch.clone()])) - .await - .unwrap(); + let table = DeltaOps(table).write(batch.clone().into()).await.unwrap(); assert_eq!(table.version(), 1); let schema: StructType = serde_json::from_value(json!({ @@ -1373,9 +1384,7 @@ mod tests { .unwrap(); assert_eq!(table.version(), 0); - let table = DeltaOps(table) - .write(WriteData::Vecs(vec![batch.clone()])) - .await; + let table = DeltaOps(table).write(batch.clone().into()).await; assert!(table.is_err()) } @@ -1392,7 +1401,7 @@ mod tests { assert_eq!(table.version(), 0); let table = DeltaOps(table) - .write(WriteData::Vecs(vec![batch])) + .write(batch.into()) .with_save_mode(SaveMode::Append) .await .unwrap(); @@ -1431,7 +1440,7 @@ mod tests { .unwrap(); let _table = ops - .write(WriteData::Vecs(vec![batch])) + .write(batch.into()) .with_partition_columns(["string"]) .await .unwrap(); @@ -1473,7 +1482,7 @@ mod tests { .unwrap(); let table = DeltaOps::new_in_memory() - .write(WriteData::Vecs(vec![batch])) + .write(batch.into()) .with_save_mode(SaveMode::Append) .await .unwrap(); @@ -1490,7 +1499,7 @@ mod tests { .unwrap(); let table = DeltaOps(table) - .write(WriteData::Vecs(vec![batch_add])) + .write(batch_add.into()) .with_save_mode(SaveMode::Overwrite) .with_replace_where(col("id").eq(lit("C"))) .await @@ -1529,7 +1538,7 @@ mod tests { .unwrap(); let table = DeltaOps::new_in_memory() - .write(WriteData::Vecs(vec![batch])) + .write(batch.into()) .with_save_mode(SaveMode::Append) .await .unwrap(); @@ -1552,7 +1561,7 @@ mod tests { .unwrap(); let table = DeltaOps(table) - .write(WriteData::Vecs(vec![batch_fail])) + .write(batch_fail.into()) .with_save_mode(SaveMode::Overwrite) .with_replace_where(col("id").eq(lit("C"))) .await; @@ -1570,7 +1579,7 @@ mod tests { let batch = get_record_batch(None, false); let table = DeltaOps::new_in_memory() - .write(WriteData::Vecs(vec![batch])) + .write(batch.into()) .with_partition_columns(["id", "value"]) .with_save_mode(SaveMode::Append) .await @@ -1592,7 +1601,7 @@ mod tests { .unwrap(); let table = DeltaOps(table) - .write(WriteData::Vecs(vec![batch_add])) + .write(batch_add.into()) .with_save_mode(SaveMode::Overwrite) .with_replace_where(col("id").eq(lit("A"))) .await diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index de9b48e749..4013f50949 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -815,12 +815,12 @@ mod tests { let batches = vec![RecordBatch::try_new(schema.clone(), data).unwrap()]; let table = DeltaOps::new_in_memory() - .write(WriteData::Vecs(batches.clone())) + .write(batches.clone().into()) .await .unwrap(); DeltaOps(table) - .write(WriteData::Vecs(batches)) + .write(batches.into()) .with_save_mode(crate::protocol::SaveMode::Overwrite) .await .unwrap() @@ -1014,10 +1014,7 @@ mod tests { ("struct_with_list", struct_with_list_array), ]) .unwrap(); - let table = DeltaOps::new_in_memory() - .write(crate::operations::write::WriteData::Vecs(vec![batch])) - .await - .unwrap(); + let table = DeltaOps::new_in_memory().write(batch.into()).await.unwrap(); create_checkpoint(&table).await.unwrap(); } diff --git a/crates/core/src/writer/test_utils.rs b/crates/core/src/writer/test_utils.rs index 06b0b14373..b07a18cef9 100644 --- a/crates/core/src/writer/test_utils.rs +++ b/crates/core/src/writer/test_utils.rs @@ -348,7 +348,7 @@ pub mod datafusion { pub async fn write_batch(table: DeltaTable, batch: RecordBatch) -> DeltaTable { DeltaOps(table) - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .with_save_mode(SaveMode::Append) .await .expect("Failed to append") diff --git a/crates/core/tests/command_restore.rs b/crates/core/tests/command_restore.rs index 9b64abd248..48be2a7197 100644 --- a/crates/core/tests/command_restore.rs +++ b/crates/core/tests/command_restore.rs @@ -46,21 +46,21 @@ async fn setup_test() -> Result> { let batch = get_record_batch(); thread::sleep(Duration::from_secs(1)); let table = DeltaOps(table) - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .with_save_mode(SaveMode::Append) .await .unwrap(); thread::sleep(Duration::from_secs(1)); let table = DeltaOps(table) - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .with_save_mode(SaveMode::Overwrite) .await .unwrap(); thread::sleep(Duration::from_secs(1)); let table = DeltaOps(table) - .write(WriteData::Vecs(vec![batch.clone()])) + .write(batch.clone().into()) .with_save_mode(SaveMode::Append) .await .unwrap(); diff --git a/crates/core/tests/integration_datafusion.rs b/crates/core/tests/integration_datafusion.rs index c41add713d..bb744f44c8 100644 --- a/crates/core/tests/integration_datafusion.rs +++ b/crates/core/tests/integration_datafusion.rs @@ -115,7 +115,7 @@ mod local { for batch in batches { table = DeltaOps(table) - .write(WriteData::Vecs(vec![batch])) + .write(batch.into()) .with_save_mode(save_mode.clone()) .await .unwrap(); @@ -606,7 +606,7 @@ mod local { async fn append_to_table(table: DeltaTable, batch: RecordBatch) -> DeltaTable { DeltaOps(table) - .write(WriteData::Vecs(vec![batch])) + .write(batch.into()) .with_save_mode(SaveMode::Append) .await .unwrap() diff --git a/crates/deltalake/examples/basic_operations.rs b/crates/deltalake/examples/basic_operations.rs index eb5de28d25..ede78ffebe 100644 --- a/crates/deltalake/examples/basic_operations.rs +++ b/crates/deltalake/examples/basic_operations.rs @@ -89,9 +89,7 @@ async fn main() -> Result<(), deltalake::errors::DeltaTableError> { let batch = get_table_batches(); let table = DeltaOps(table) - .write(deltalake::operations::write::WriteData::Vecs(vec![ - batch.clone() - ])) + .write(batch.clone().into()) .with_writer_properties(writer_properties) .await?; @@ -103,9 +101,7 @@ async fn main() -> Result<(), deltalake::errors::DeltaTableError> { // To overwrite instead of append (which is the default), use `.with_save_mode`: let table = DeltaOps(table) - .write(deltalake::operations::write::WriteData::Vecs(vec![ - batch.clone() - ])) + .write(batch.clone().into()) .with_save_mode(SaveMode::Overwrite) .with_writer_properties(writer_properties) .await?; diff --git a/docs/src/rust/check_constraints.rs b/docs/src/rust/check_constraints.rs index a0f9742554..c4d0f6d15c 100644 --- a/docs/src/rust/check_constraints.rs +++ b/docs/src/rust/check_constraints.rs @@ -16,7 +16,7 @@ async fn main() -> Result<(), Box> { Arc::new(Int32Array::from(vec![-10])) ]; let batch = RecordBatch::try_new(schema, invalid_values)?; - table.write(WriteData::Vecs(vec![batch])).await?; + table.write(batch.into()).await?; // --8<-- [end:add_data] Ok(()) diff --git a/docs/src/rust/operations.rs b/docs/src/rust/operations.rs index 2b9f7dee58..05263b0fa2 100644 --- a/docs/src/rust/operations.rs +++ b/docs/src/rust/operations.rs @@ -22,7 +22,7 @@ async fn main() -> Result<(), Box> { let table = deltalake::open_table("/tmp/my_table").await.unwrap(); let table = DeltaOps(table) - .write(WriteData::Vecs(vec![data])) + .write(data.into()) .with_save_mode(SaveMode::Overwrite) .with_replace_where(col("id").eq(lit("1"))) .await;