Commit a233ee0

upgrade object store

baojinri committed Jul 12, 2024
1 parent a1869dc
Showing 15 changed files with 1,002 additions and 723 deletions.
369 changes: 304 additions & 65 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions src/analytic_engine/src/setup.rs
@@ -226,9 +226,7 @@ fn open_storage(
                 obkv::ObkvObjectStore::try_new(
                     Arc::new(obkv),
                     obkv_opts.shard_num,
-                    obkv_opts.part_size.0 as usize,
-                    obkv_opts.max_object_size.0 as usize,
                     obkv_opts.upload_parallelism,
                 )
                 .context(OpenObjectStore)?,
             );
2 changes: 1 addition & 1 deletion src/analytic_engine/src/sst/meta_data/cache.rs
@@ -290,7 +290,7 @@ mod tests {

         let bytes = encoding::encode_sst_meta_data(custom_meta_data.clone()).unwrap();
         let meta_path = object_store::Path::from(meta_path);
-        store.put(&meta_path, bytes).await.unwrap();
+        store.put(&meta_path, bytes.into()).await.unwrap();
     }
 
     #[tokio::test]
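
Note: the `bytes.into()` change above reflects upstream object_store 0.10, where `ObjectStore::put` takes a `PutPayload` rather than `Bytes`. Below is a minimal sketch of the call-site migration against the upstream crate's `InMemory` store; the function name and path are illustrative, not this repo's code.

use bytes::Bytes;
use object_store::{memory::InMemory, path::Path, ObjectStore};

async fn put_meta() -> object_store::Result<()> {
    let store = InMemory::new();
    let meta_path = Path::from("sst/meta");
    let bytes = Bytes::from_static(b"encoded sst metadata");
    // object_store 0.5.x accepted Bytes directly:
    //     store.put(&meta_path, bytes).await?;
    // 0.10.x takes a PutPayload; Bytes converts via From/Into:
    store.put(&meta_path, bytes.into()).await?;
    Ok(())
}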
188 changes: 128 additions & 60 deletions src/analytic_engine/src/sst/parquet/writer.rs
@@ -17,21 +17,27 @@

 //! Sst writer implementation based on parquet.
 
-use std::collections::{HashMap, HashSet};
+use std::{
+    collections::{HashMap, HashSet},
+    future::Future,
+    io::Error,
+    pin::Pin,
+    sync::Arc,
+    task::Poll,
+};
 
 use async_trait::async_trait;
 use common_types::{
     datum::DatumKind, record_batch::FetchedRecordBatch, request_id::RequestId, schema::Schema,
     time::TimeRange,
 };
 use datafusion::parquet::basic::Compression;
-use futures::StreamExt;
+use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
 use generic_error::BoxError;
 use logger::{debug, error};
-use object_store::{ObjectStoreRef, Path};
-use parquet::data_type::AsBytes;
+use object_store::{MultipartRef, ObjectStore, ObjectStoreRef, Path, UploadPart};
 use snafu::{OptionExt, ResultExt};
-use tokio::io::{AsyncWrite, AsyncWriteExt};
+use tokio::{io::AsyncWrite, sync::Mutex};
 
 use crate::{
     sst::{
@@ -45,8 +51,8 @@ use crate::{
             },
         },
         writer::{
-            self, BuildParquetFilter, EncodePbData, EncodeRecordBatch, ExpectTimestampColumn, Io,
-            MetaData, PollRecordBatch, RecordBatchStream, Result, SstInfo, SstWriter, Storage,
+            BuildParquetFilter, EncodePbData, EncodeRecordBatch, ExpectTimestampColumn, MetaData,
+            PollRecordBatch, RecordBatchStream, Result, SstInfo, SstWriter, Storage,
         },
     },
     table::sst_util,
@@ -405,67 +411,129 @@ impl<'a> RecordBatchGroupWriter<'a> {
     }
 }
 
-struct ObjectStoreMultiUploadAborter<'a> {
-    location: &'a Path,
-    session_id: String,
-    object_store: &'a ObjectStoreRef,
+struct ObjectStoreMultiUpload {
+    multi_upload: MultipartRef,
+    tasks: FuturesUnordered<UploadPart>,
+    completion_task: Option<BoxFuture<'static, std::result::Result<(), Error>>>,
 }
 
-impl<'a> ObjectStoreMultiUploadAborter<'a> {
-    async fn initialize_upload(
-        object_store: &'a ObjectStoreRef,
-        location: &'a Path,
-    ) -> Result<(
-        ObjectStoreMultiUploadAborter<'a>,
-        Box<dyn AsyncWrite + Unpin + Send>,
-    )> {
-        let (session_id, upload_writer) = object_store
+impl<'a> ObjectStoreMultiUpload {
+    async fn new(object_store: &'a ObjectStoreRef, location: &'a Path) -> Result<Self> {
+        let upload_writer = object_store
             .put_multipart(location)
             .await
             .context(Storage)?;
-        let aborter = Self {
-            location,
-            session_id,
-            object_store,
+
+        let multi_upload = Self {
+            multi_upload: Arc::new(Mutex::new(upload_writer)),
+            tasks: FuturesUnordered::new(),
+            completion_task: None,
         };
-        Ok((aborter, upload_writer))
+
+        Ok(multi_upload)
     }
 
-    async fn abort(self) -> Result<()> {
-        self.object_store
-            .abort_multipart(self.location, &self.session_id)
-            .await
-            .context(Storage)
+    pub fn aborter(&self) -> MultipartRef {
+        self.multi_upload.clone()
     }
+
+    pub fn poll_tasks(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::result::Result<(), object_store::ObjectStoreError> {
+        if self.tasks.is_empty() {
+            return Ok(());
+        }
+        while let Poll::Ready(Some(res)) = self.tasks.poll_next_unpin(cx) {
+            res?;
+        }
+        Ok(())
+    }
 }
 
-async fn write_metadata<W>(
-    mut meta_sink: W,
-    parquet_metadata: ParquetMetaData,
-    meta_path: &object_store::Path,
-) -> writer::Result<usize>
-where
-    W: AsyncWrite + Send + Unpin,
-{
-    let buf = encode_sst_meta_data(parquet_metadata).context(EncodePbData)?;
-    let bytes = buf.as_bytes();
-    let bytes_size = bytes.len();
-    meta_sink.write_all(bytes).await.with_context(|| Io {
-        file: meta_path.clone(),
-    })?;
+impl AsyncWrite for ObjectStoreMultiUpload {
+    // TODO: Currently, the data writing is serial, and data may need to be written
+    // concurrently.
+    fn poll_write(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &[u8],
+    ) -> Poll<std::result::Result<usize, Error>> {
+        let buf_size = buf.len();
+        let multi_upload = self.multi_upload.clone();
+
+        let buf = buf.to_vec();
+        let task = async move { multi_upload.lock().await.put_part(buf.into()).await };
+        self.as_mut().tasks.push(Box::pin(task));
+
+        self.as_mut().poll_tasks(cx)?;
+
+        Poll::Ready(Ok(buf_size))
+    }
+
+    fn poll_flush(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        self.as_mut().poll_tasks(cx)?;
+
+        if self.tasks.is_empty() {
+            return Poll::Ready(Ok(()));
+        }
+        Poll::Pending
+    }
+
+    fn poll_shutdown(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        self.as_mut().poll_tasks(cx)?;
+
+        if !self.tasks.is_empty() {
+            return Poll::Pending;
+        }
+
-    meta_sink.shutdown().await.with_context(|| Io {
-        file: meta_path.clone(),
-    })?;
+        let multi_upload = self.multi_upload.clone();
 
-    Ok(bytes_size)
+        let completion_task = self.completion_task.get_or_insert_with(|| {
+            Box::pin(async move {
+                multi_upload.lock().await.complete().await?;
+                Ok(())
+            })
+        });
+
+        Pin::new(completion_task).poll(cx)
+    }
 }
 
+async fn write_metadata(
+    meta_sink: ObjectStoreMultiUpload,
+    parquet_metadata: ParquetMetaData,
+) -> Result<usize> {
+    let buf = encode_sst_meta_data(parquet_metadata).context(EncodePbData)?;
+    let buf_size = buf.len();
+    meta_sink
+        .multi_upload
+        .lock()
+        .await
+        .put_part(buf.into())
+        .await
+        .context(Storage)?;
+    meta_sink
+        .multi_upload
+        .lock()
+        .await
+        .complete()
+        .await
+        .context(Storage)?;
+    Ok(buf_size)
+}
+
-async fn multi_upload_abort(path: &Path, aborter: ObjectStoreMultiUploadAborter<'_>) {
+async fn multi_upload_abort(aborter: MultipartRef) {
     // The uploading file will be leaked if failed to abort. A repair command will
     // be provided to clean up the leaked files.
-    if let Err(e) = aborter.abort().await {
-        error!("Failed to abort multi-upload for sst:{}, err:{}", path, e);
+    if let Err(e) = aborter.lock().await.abort().await {
+        error!("Failed to abort multi-upload sst, err:{}", e);
     }
 }

@@ -476,7 +544,7 @@ impl<'a> SstWriter for ParquetSstWriter<'a> {
         request_id: RequestId,
         meta: &MetaData,
         input: RecordBatchStream,
-    ) -> writer::Result<SstInfo> {
+    ) -> Result<SstInfo> {
         debug!(
             "Build parquet file, request_id:{}, meta:{:?}, num_rows_per_row_group:{}",
             request_id, meta, self.options.num_rows_per_row_group
@@ -491,28 +559,28 @@ impl<'a> SstWriter for ParquetSstWriter<'a> {
         };
         let group_writer = RecordBatchGroupWriter::new(request_id, input, meta, write_options);
 
-        let (aborter, sink) =
-            ObjectStoreMultiUploadAborter::initialize_upload(self.store, self.path).await?;
+        let sink = ObjectStoreMultiUpload::new(self.store, self.path).await?;
+        let aborter = sink.aborter();
 
         let meta_path = Path::from(sst_util::new_metadata_path(self.path.as_ref()));
 
         let (total_num_rows, parquet_metadata, mut data_encoder) =
             match group_writer.write_all(sink, &meta_path).await {
                 Ok(v) => v,
                 Err(e) => {
-                    multi_upload_abort(self.path, aborter).await;
+                    multi_upload_abort(aborter).await;
                     return Err(e);
                 }
             };
         let time_range = parquet_metadata.time_range;
 
-        let (meta_aborter, meta_sink) =
-            ObjectStoreMultiUploadAborter::initialize_upload(self.store, &meta_path).await?;
-        let meta_size = match write_metadata(meta_sink, parquet_metadata, &meta_path).await {
+        let meta_sink = ObjectStoreMultiUpload::new(self.store, &meta_path).await?;
+        let meta_aborter = meta_sink.aborter();
+        let meta_size = match write_metadata(meta_sink, parquet_metadata).await {
             Ok(v) => v,
             Err(e) => {
-                multi_upload_abort(self.path, aborter).await;
-                multi_upload_abort(&meta_path, meta_aborter).await;
+                multi_upload_abort(aborter).await;
+                multi_upload_abort(meta_aborter).await;
                 return Err(e);
             }
         };
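
For context on the writer changes above: upstream object_store 0.10 replaces the old `(MultipartId, Box<dyn AsyncWrite>)` pair and store-level `abort_multipart` with a single `Box<dyn MultipartUpload>` handle whose `put_part` returns one future per part. A hedged sketch of that flow against the upstream `InMemory` store follows; this repo's `MultipartRef` and `UploadPart` aliases are assumed to wrap these upstream types, and the function name here is illustrative.

use bytes::Bytes;
use object_store::{memory::InMemory, path::Path, MultipartUpload, ObjectStore};

async fn demo_multipart() -> object_store::Result<()> {
    let store = InMemory::new();
    let path = Path::from("sst/data.parquet");

    // One upload handle replaces the old (session id, AsyncWrite sink) pair.
    let mut upload = store.put_multipart(&path).await?;

    // put_part hands back an independent future per part; the writer above
    // queues these in a FuturesUnordered and drives them in poll_tasks.
    let part = upload.put_part(Bytes::from_static(b"part-1").into());
    part.await?;

    // complete() commits the object; on error paths the writer locks the
    // shared handle and calls abort() instead.
    upload.complete().await?;
    Ok(())
}

The `Arc<Mutex<...>>` around the upload handle in `ObjectStoreMultiUpload` serializes `put_part` calls, which matches the in-code TODO noting that part writes are still serial.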
2 changes: 1 addition & 1 deletion src/components/object_store/Cargo.toml
@@ -59,7 +59,7 @@ table_kv = { workspace = true }
 time_ext = { workspace = true }
 tokio = { workspace = true }
 twox-hash = "1.6"
-upstream = { package = "object_store", version = "0.5.6", features = [ "aws" ] }
+upstream = { package = "object_store", version = "0.10.1", features = [ "aws" ] }
 uuid = { version = "1.3.3", features = ["v4"] }
 
 [dev-dependencies]
(Diffs for the remaining 10 changed files are not rendered.)
