Skip to content

Commit

Permalink
Prevent SQLite from writing incomplete data on errors (datafusion-con…
Browse files Browse the repository at this point in the history
  • Loading branch information
sgrebnov authored and hozan23 committed Sep 19, 2024
1 parent 1bd4ed3 commit 2a92a77
Showing 1 changed file with 20 additions and 0 deletions.
20 changes: 20 additions & 0 deletions src/sqlite/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ impl DataSink for SqliteDataSink {
_context: &Arc<TaskContext>,
) -> datafusion::common::Result<u64> {
let (batch_tx, mut batch_rx) = tokio::sync::mpsc::channel::<RecordBatch>(1);

// The main task/stream can be dropped or can fail partway through, so a oneshot channel is used to signal that all data has been received and the transaction should be committed.
let (notify_commit_transaction, mut on_commit_transaction) =
tokio::sync::oneshot::channel();

let mut db_conn = self.sqlite.connect().await.map_err(to_datafusion_error)?;
let sqlite_conn = Sqlite::sqlite_conn(&mut db_conn).map_err(to_datafusion_error)?;

Expand All @@ -144,6 +149,15 @@ impl DataSink for SqliteDataSink {
})?;
}

if notify_commit_transaction.send(()).is_err() {
return Err(DataFusionError::Execution(
"Unable to send message to commit transaction to SQLite writer.".to_string(),
));
};

// Drop the sender to signal the receiver that no more data is coming
drop(batch_tx);

Ok::<_, DataFusionError>(num_rows)
});

Expand All @@ -165,6 +179,12 @@ impl DataSink for SqliteDataSink {
}
}

if on_commit_transaction.try_recv().is_err() {
return Err(tokio_rusqlite::Error::Other(
"No message to commit transaction has been received.".into(),
));
}

transaction.commit()?;

Ok(())
Expand Down

0 comments on commit 2a92a77

Please sign in to comment.