Skip to content

Commit

Permalink
use into()
Browse files Browse the repository at this point in the history
  • Loading branch information
aersam committed Mar 12, 2024
1 parent f131eb1 commit a3d5585
Show file tree
Hide file tree
Showing 17 changed files with 97 additions and 106 deletions.
6 changes: 2 additions & 4 deletions crates/benchmarks/src/bin/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub async fn convert_tpcds_web_returns(input_path: String, table_path: String) -
DeltaOps::try_from_uri(table_path)
.await
.unwrap()
.write(deltalake_core::operations::write::WriteData::Vecs(tbl))
.write(tbl.into())
.with_partition_columns(vec!["wr_returned_date_sk"])
.await
.unwrap();
Expand Down Expand Up @@ -568,9 +568,7 @@ async fn main() {
DeltaOps::try_from_uri(output)
.await
.unwrap()
.write(deltalake_core::operations::write::WriteData::Vecs(vec![
batch,
]))
.write(batch.into())
.with_save_mode(SaveMode::Append)
.await
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1757,7 +1757,7 @@ mod tests {
.unwrap();
// write some data
let table = crate::DeltaOps(table)
.write(WriteData::Vecs(vec![batch.clone()]))
.write(batch.clone().into())
.with_save_mode(crate::protocol::SaveMode::Append)
.await
.unwrap();
Expand Down Expand Up @@ -1821,7 +1821,7 @@ mod tests {
.unwrap();
// write some data
let table = crate::DeltaOps::new_in_memory()
.write(WriteData::Vecs(vec![batch.clone()]))
.write(batch.clone().into())
.with_save_mode(crate::protocol::SaveMode::Append)
.await
.unwrap();
Expand Down
21 changes: 9 additions & 12 deletions crates/core/src/operations/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ mod tests {
async fn add_constraint_with_invalid_data() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(WriteData::Vecs(vec![batch.clone()]))
.write(batch.clone().into())
.await?;
let table = DeltaOps(write);

Expand All @@ -288,7 +288,7 @@ mod tests {
async fn add_valid_constraint() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(WriteData::Vecs(vec![batch.clone()]))
.write(batch.clone().into())
.await?;
let table = DeltaOps(write);

Expand All @@ -313,7 +313,7 @@ mod tests {
// Add constraint by providing a datafusion expression.
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(WriteData::Vecs(vec![batch.clone()]))
.write(batch.clone().into())
.await?;
let table = DeltaOps(write);

Expand Down Expand Up @@ -356,10 +356,7 @@ mod tests {
)
.unwrap();

let table = DeltaOps::new_in_memory()
.write(WriteData::Vecs(vec![batch]))
.await
.unwrap();
let table = DeltaOps::new_in_memory().write(batch.into()).await.unwrap();

let mut table = DeltaOps(table)
.add_constraint()
Expand All @@ -382,7 +379,7 @@ mod tests {
async fn add_conflicting_named_constraint() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(WriteData::Vecs(vec![batch.clone()]))
.write(batch.clone().into())
.await?;
let table = DeltaOps(write);

Expand All @@ -404,7 +401,7 @@ mod tests {
async fn write_data_that_violates_constraint() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(WriteData::Vecs(vec![batch.clone()]))
.write(batch.clone().into())
.await?;

let table = DeltaOps(write)
Expand All @@ -418,7 +415,7 @@ mod tests {
Arc::new(StringArray::from(vec!["2021-02-02"])),
];
let batch = RecordBatch::try_new(get_arrow_schema(&None), invalid_values)?;
let err = table.write(WriteData::Vecs(vec![batch])).await;
let err = table.write(batch.into()).await;
assert!(err.is_err());
Ok(())
}
Expand All @@ -427,11 +424,11 @@ mod tests {
async fn write_data_that_does_not_violate_constraint() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(WriteData::Vecs(vec![batch.clone()]))
.write(batch.clone().into())
.await?;
let table = DeltaOps(write);

let err = table.write(WriteData::Vecs(vec![batch])).await;
let err = table.write(batch.into()).await;

assert!(err.is_ok());
Ok(())
Expand Down
17 changes: 7 additions & 10 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ mod tests {
.unwrap();
// write some data
let table = DeltaOps(table)
.write(WriteData::Vecs(vec![batch]))
.write(batch.into())
.with_save_mode(SaveMode::Append)
.await
.unwrap();
Expand Down Expand Up @@ -464,7 +464,7 @@ mod tests {

// write some data
let table = DeltaOps(table)
.write(WriteData::Vecs(vec![batch]))
.write(batch.into())
.with_save_mode(SaveMode::Append)
.await
.unwrap();
Expand All @@ -488,7 +488,7 @@ mod tests {

// write some data
let table = DeltaOps(table)
.write(WriteData::Vecs(vec![batch]))
.write(batch.into())
.with_save_mode(SaveMode::Append)
.await
.unwrap();
Expand Down Expand Up @@ -555,10 +555,7 @@ mod tests {
)
.unwrap();

DeltaOps::new_in_memory()
.write(WriteData::Vecs(vec![batch]))
.await
.unwrap()
DeltaOps::new_in_memory().write(batch.into()).await.unwrap()
}

// Validate behaviour of greater than
Expand Down Expand Up @@ -647,7 +644,7 @@ mod tests {

// write some data
let table = DeltaOps(table)
.write(WriteData::Vecs(vec![batch]))
.write(batch.into())
.with_save_mode(SaveMode::Append)
.await
.unwrap();
Expand Down Expand Up @@ -705,7 +702,7 @@ mod tests {

// write some data
let table = DeltaOps(table)
.write(WriteData::Vecs(vec![batch]))
.write(batch.into())
.with_save_mode(SaveMode::Append)
.await
.unwrap();
Expand Down Expand Up @@ -775,7 +772,7 @@ mod tests {
let batches = vec![RecordBatch::try_new(schema.clone(), data).unwrap()];

let table = DeltaOps::new_in_memory()
.write(WriteData::Vecs(batches))
.write(batches.into())
.await
.unwrap();

Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/operations/drop_constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ mod tests {
async fn drop_valid_constraint() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(WriteData::Vecs(vec![batch.clone()]))
.write(batch.clone().into())
.await?;
let table = DeltaOps(write);

Expand All @@ -171,7 +171,7 @@ mod tests {
async fn drop_invalid_constraint_not_existing() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(WriteData::Vecs(vec![batch.clone()]))
.write(batch.clone().into())
.await?;

let table = DeltaOps(write)
Expand All @@ -187,7 +187,7 @@ mod tests {
async fn drop_invalid_constraint_ignore() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(WriteData::Vecs(vec![batch.clone()]))
.write(batch.clone().into())
.await?;

let version = write.version();
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/operations/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ mod tests {
async fn test_write_load() -> TestResult {
let batch = get_record_batch(None, false);
let table = DeltaOps::new_in_memory()
.write(WriteData::Vecs(vec![batch.clone()]))
.write(batch.clone().into())
.await?;

let (_table, stream) = DeltaOps(table).load().await?;
Expand Down Expand Up @@ -149,7 +149,7 @@ mod tests {
async fn test_load_with_columns() -> TestResult {
let batch = get_record_batch(None, false);
let table = DeltaOps::new_in_memory()
.write(WriteData::Vecs(vec![batch.clone()]))
.write(batch.clone().into())
.await?;

let (_table, stream) = DeltaOps(table).load().with_columns(["id", "value"]).await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1610,7 +1610,7 @@ mod tests {
.unwrap();
// write some data
DeltaOps(table)
.write(WriteData::Vecs(vec![batch.clone()]))
.write(batch.clone().into())
.with_save_mode(SaveMode::Append)
.await
.unwrap()
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1358,7 +1358,7 @@ pub(super) mod zorder {
.unwrap();
// write some data
let table = crate::DeltaOps::new_in_memory()
.write(WriteData::Vecs(vec![batch.clone()]))
.write(batch.clone().into())
.with_save_mode(crate::protocol::SaveMode::Append)
.await
.unwrap();
Expand Down
5 changes: 1 addition & 4 deletions crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,10 +526,7 @@ mod tests {
)
.unwrap();

DeltaOps::new_in_memory()
.write(WriteData::Vecs(vec![batch]))
.await
.unwrap()
DeltaOps::new_in_memory().write(batch.into()).await.unwrap()
}

#[tokio::test]
Expand Down
Loading

0 comments on commit a3d5585

Please sign in to comment.