From 8fe327b24d62e852d168d162a159536b1b10aac7 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 25 May 2024 12:54:47 +0800 Subject: [PATCH] Make greptime buildable with opendal 0.46 (#5) Signed-off-by: Xuanwo --- Cargo.lock | 1 + src/common/datasource/src/file_format.rs | 5 +- src/common/datasource/src/file_format/csv.rs | 9 +- src/common/datasource/src/file_format/json.rs | 9 +- src/common/datasource/src/file_format/orc.rs | 14 +- .../datasource/src/file_format/parquet.rs | 29 ++- src/common/datasource/src/test_util.rs | 4 +- src/datanode/src/store.rs | 84 +++++---- src/file-engine/src/manifest.rs | 3 +- src/mito2/src/cache/file_cache.rs | 19 +- src/mito2/src/cache/write_cache.rs | 25 ++- src/mito2/src/manifest/storage.rs | 106 ++++++----- src/mito2/src/sst/index/applier.rs | 24 ++- src/mito2/src/sst/index/store.rs | 26 ++- src/mito2/src/sst/parquet/helper.rs | 4 +- src/mito2/src/sst/parquet/metadata.rs | 6 +- src/mito2/src/worker/handle_drop.rs | 3 +- src/object-store/src/layers/lru_cache.rs | 20 +- .../src/layers/lru_cache/read_cache.rs | 176 +++++++++--------- src/object-store/src/lib.rs | 5 +- src/object-store/tests/object_store_test.rs | 15 +- src/operator/Cargo.toml | 1 + src/operator/src/statement/copy_table_from.rs | 37 +++- 23 files changed, 375 insertions(+), 250 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a0096047c3cf..df3157204336 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6769,6 +6769,7 @@ dependencies = [ "substrait 0.8.0", "table", "tokio", + "tokio-util", "tonic 0.11.0", ] diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs index 6f80590d26e1..5bb9258ad3d0 100644 --- a/src/common/datasource/src/file_format.rs +++ b/src/common/datasource/src/file_format.rs @@ -36,6 +36,7 @@ use datafusion::physical_plan::SendableRecordBatchStream; use futures::StreamExt; use object_store::ObjectStore; use snafu::ResultExt; +use tokio_util::compat::FuturesAsyncWriteCompatExt; use self::csv::CsvFormat; use self::json::JsonFormat; @@ -146,7 +147,8 @@ pub fn open_with_decoder DataFusionResult>( let reader = object_store .reader(&path) .await - .map_err(|e| DataFusionError::External(Box::new(e)))?; + .map_err(|e| DataFusionError::External(Box::new(e)))? + .into_bytes_stream(..); let mut upstream = compression_type.convert_stream(reader).fuse(); @@ -203,6 +205,7 @@ pub async fn stream_to_file T>( .writer_with(&path) .concurrent(concurrency) .await + .map(|v| v.into_futures_async_write().compat_write()) .context(error::WriteObjectSnafu { path }) }); diff --git a/src/common/datasource/src/file_format/csv.rs b/src/common/datasource/src/file_format/csv.rs index 4cf2b9e1336d..ade4e5409e42 100644 --- a/src/common/datasource/src/file_format/csv.rs +++ b/src/common/datasource/src/file_format/csv.rs @@ -29,6 +29,7 @@ use datafusion::physical_plan::SendableRecordBatchStream; use derive_builder::Builder; use object_store::ObjectStore; use snafu::ResultExt; +use tokio_util::compat::FuturesAsyncReadCompatExt; use tokio_util::io::SyncIoBridge; use super::stream_to_file; @@ -164,10 +165,16 @@ impl FileOpener for CsvOpener { #[async_trait] impl FileFormat for CsvFormat { async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result { + let meta = store + .stat(path) + .await + .context(error::ReadObjectSnafu { path })?; let reader = store .reader(path) .await - .context(error::ReadObjectSnafu { path })?; + .context(error::ReadObjectSnafu { path })? 
+ .into_futures_async_read(0..meta.content_length()) + .compat(); let decoded = self.compression_type.convert_async_read(reader); diff --git a/src/common/datasource/src/file_format/json.rs b/src/common/datasource/src/file_format/json.rs index 77fde3fddb74..97057f836200 100644 --- a/src/common/datasource/src/file_format/json.rs +++ b/src/common/datasource/src/file_format/json.rs @@ -31,6 +31,7 @@ use datafusion::error::{DataFusionError, Result as DataFusionResult}; use datafusion::physical_plan::SendableRecordBatchStream; use object_store::ObjectStore; use snafu::ResultExt; +use tokio_util::compat::FuturesAsyncReadCompatExt; use tokio_util::io::SyncIoBridge; use super::stream_to_file; @@ -82,10 +83,16 @@ impl Default for JsonFormat { #[async_trait] impl FileFormat for JsonFormat { async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result { + let meta = store + .stat(path) + .await + .context(error::ReadObjectSnafu { path })?; let reader = store .reader(path) .await - .context(error::ReadObjectSnafu { path })?; + .context(error::ReadObjectSnafu { path })? + .into_futures_async_read(0..meta.content_length()) + .compat(); let decoded = self.compression_type.convert_async_read(reader); diff --git a/src/common/datasource/src/file_format/orc.rs b/src/common/datasource/src/file_format/orc.rs index 23e0589c99e9..11db189e9fa0 100644 --- a/src/common/datasource/src/file_format/orc.rs +++ b/src/common/datasource/src/file_format/orc.rs @@ -25,6 +25,7 @@ use orc_rust::arrow_reader::ArrowReaderBuilder; use orc_rust::async_arrow_reader::ArrowStreamReader; use snafu::ResultExt; use tokio::io::{AsyncRead, AsyncSeek}; +use tokio_util::compat::FuturesAsyncReadCompatExt; use crate::error::{self, Result}; use crate::file_format::FileFormat; @@ -32,6 +33,7 @@ use crate::file_format::FileFormat; #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] pub struct OrcFormat; +/// TODO: it's better to avoid AsyncRead + AsyncSeek, use range based read instead. pub async fn new_orc_stream_reader( reader: R, ) -> Result> { @@ -51,10 +53,16 @@ pub async fn infer_orc_schema #[async_trait] impl FileFormat for OrcFormat { async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result { + let meta = store + .stat(path) + .await + .context(error::ReadObjectSnafu { path })?; let reader = store .reader(path) .await - .context(error::ReadObjectSnafu { path })?; + .context(error::ReadObjectSnafu { path })? + .into_futures_async_read(0..meta.content_length()) + .compat(); let schema = infer_orc_schema(reader).await?; @@ -100,7 +108,9 @@ impl FileOpener for OrcOpener { let reader = object_store .reader(meta.location().to_string().as_str()) .await - .map_err(|e| DataFusionError::External(Box::new(e)))?; + .map_err(|e| DataFusionError::External(Box::new(e)))? 
+ .into_futures_async_read(0..meta.object_meta.size as u64) + .compat(); let stream_reader = new_orc_stream_reader(reader) .await diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs index 651d5904c874..2e887ac2f7c3 100644 --- a/src/common/datasource/src/file_format/parquet.rs +++ b/src/common/datasource/src/file_format/parquet.rs @@ -29,10 +29,11 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion::physical_plan::SendableRecordBatchStream; use futures::future::BoxFuture; use futures::StreamExt; -use object_store::{ObjectStore, Reader, Writer}; +use object_store::{FuturesAsyncReader, ObjectStore}; use parquet::basic::{Compression, ZstdLevel}; use parquet::file::properties::WriterProperties; use snafu::ResultExt; +use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt}; use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder, LazyBufferedWriter}; use crate::error::{self, Result}; @@ -45,10 +46,16 @@ pub struct ParquetFormat {} #[async_trait] impl FileFormat for ParquetFormat { async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result { + let meta = store + .stat(path) + .await + .context(error::ReadObjectSnafu { path })?; let mut reader = store .reader(path) .await - .context(error::ReadObjectSnafu { path })?; + .context(error::ReadObjectSnafu { path })? + .into_futures_async_read(0..meta.content_length()) + .compat(); let metadata = reader .get_metadata() @@ -98,7 +105,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { pub struct LazyParquetFileReader { object_store: ObjectStore, - reader: Option, + reader: Option>, path: String, } @@ -114,7 +121,13 @@ impl LazyParquetFileReader { /// Must initialize the reader, or throw an error from the future. async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> { if self.reader.is_none() { - let reader = self.object_store.reader(&self.path).await?; + let meta = self.object_store.stat(&self.path).await?; + let reader = self + .object_store + .reader(&self.path) + .await? 
+ .into_futures_async_read(0..meta.content_length()) + .compat(); self.reader = Some(reader); } @@ -167,16 +180,17 @@ pub struct BufferedWriter { } type InnerBufferedWriter = LazyBufferedWriter< - object_store::Writer, + Compat, ArrowWriter, - impl Fn(String) -> BoxFuture<'static, Result>, + impl Fn(String) -> BoxFuture<'static, Result>>, >; impl BufferedWriter { fn make_write_factory( store: ObjectStore, concurrency: usize, - ) -> impl Fn(String) -> BoxFuture<'static, Result> { + ) -> impl Fn(String) -> BoxFuture<'static, Result>> + { move |path| { let store = store.clone(); Box::pin(async move { @@ -184,6 +198,7 @@ impl BufferedWriter { .writer_with(&path) .concurrent(concurrency) .await + .map(|v| v.into_futures_async_write().compat_write()) .context(error::WriteObjectSnafu { path }) }) } diff --git a/src/common/datasource/src/test_util.rs b/src/common/datasource/src/test_util.rs index 8f1af59c90e7..d3a24a23d24a 100644 --- a/src/common/datasource/src/test_util.rs +++ b/src/common/datasource/src/test_util.rs @@ -120,7 +120,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi let written = tmp_store.read(&output_path).await.unwrap(); let origin = store.read(origin_path).await.unwrap(); - assert_eq_lines(written, origin); + assert_eq_lines(written.to_vec(), origin.to_vec()); } pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usize) -> usize) { @@ -158,7 +158,7 @@ pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usiz let written = tmp_store.read(&output_path).await.unwrap(); let origin = store.read(origin_path).await.unwrap(); - assert_eq_lines(written, origin); + assert_eq_lines(written.to_vec(), origin.to_vec()); } // Ignore the CRLF difference across operating systems. diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index 8b778306c466..198c0830f97f 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -20,16 +20,14 @@ mod gcs; mod oss; mod s3; -use std::sync::Arc; -use std::time::Duration; -use std::{env, path}; +use std::path; use common_base::readable_size::ReadableSize; use common_telemetry::info; use object_store::layers::{LruCacheLayer, RetryLayer}; use object_store::services::Fs; use object_store::util::{join_dir, normalize_dir, with_instrument_layers}; -use object_store::{HttpClient, ObjectStore, ObjectStoreBuilder}; +use object_store::{HttpClient, ObjectStore}; use snafu::prelude::*; use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; @@ -107,13 +105,14 @@ async fn create_object_store_with_cache( if let Some(path) = cache_path { let atomic_temp_dir = join_dir(path, ".tmp/"); clean_temp_dir(&atomic_temp_dir)?; - let cache_store = Fs::default() - .root(path) - .atomic_write_dir(&atomic_temp_dir) - .build() - .context(error::InitBackendSnafu)?; - - let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize) + let mut builder = Fs::default(); + builder.root(path); + builder.atomic_write_dir(&atomic_temp_dir); + let cache_store = ObjectStore::new(builder) + .context(error::InitBackendSnafu)? + .finish(); + + let cache_layer = LruCacheLayer::new(cache_store, cache_capacity.0 as usize) .await .context(error::InitBackendSnafu)?; @@ -138,35 +137,38 @@ pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> { Ok(()) } +/// FIXME: we need to use reqwest 0.12 here. 
pub(crate) fn build_http_client() -> Result { - let http_builder = { - let mut builder = reqwest::ClientBuilder::new(); - - // Pool max idle per host controls connection pool size. - // Default to no limit, set to `0` for disable it. - let pool_max_idle_per_host = env::var("_GREPTIMEDB_HTTP_POOL_MAX_IDLE_PER_HOST") - .ok() - .and_then(|v| v.parse::().ok()) - .unwrap_or(usize::MAX); - builder = builder.pool_max_idle_per_host(pool_max_idle_per_host); - - // Connect timeout default to 30s. - let connect_timeout = env::var("_GREPTIMEDB_HTTP_CONNECT_TIMEOUT") - .ok() - .and_then(|v| v.parse::().ok()) - .unwrap_or(30); - builder = builder.connect_timeout(Duration::from_secs(connect_timeout)); - - // Pool connection idle timeout default to 90s. - let idle_timeout = env::var("_GREPTIMEDB_HTTP_POOL_IDLE_TIMEOUT") - .ok() - .and_then(|v| v.parse::().ok()) - .unwrap_or(90); - - builder = builder.pool_idle_timeout(Duration::from_secs(idle_timeout)); - - builder - }; - - HttpClient::build(http_builder).context(error::InitBackendSnafu) + // let http_builder = { + // let mut builder = reqwest::ClientBuilder::new(); + // + // // Pool max idle per host controls connection pool size. + // // Default to no limit, set to `0` for disable it. + // let pool_max_idle_per_host = env::var("_GREPTIMEDB_HTTP_POOL_MAX_IDLE_PER_HOST") + // .ok() + // .and_then(|v| v.parse::().ok()) + // .unwrap_or(usize::MAX); + // builder = builder.pool_max_idle_per_host(pool_max_idle_per_host); + // + // // Connect timeout default to 30s. + // let connect_timeout = env::var("_GREPTIMEDB_HTTP_CONNECT_TIMEOUT") + // .ok() + // .and_then(|v| v.parse::().ok()) + // .unwrap_or(30); + // builder = builder.connect_timeout(Duration::from_secs(connect_timeout)); + // + // // Pool connection idle timeout default to 90s. + // let idle_timeout = env::var("_GREPTIMEDB_HTTP_POOL_IDLE_TIMEOUT") + // .ok() + // .and_then(|v| v.parse::().ok()) + // .unwrap_or(90); + // + // builder = builder.pool_idle_timeout(Duration::from_secs(idle_timeout)); + // + // builder + // }; + // + // HttpClient::build(http_builder).context(error::InitBackendSnafu) + + HttpClient::new().context(error::InitBackendSnafu) } diff --git a/src/file-engine/src/manifest.rs b/src/file-engine/src/manifest.rs index d2e8255301b1..6310c3ccb912 100644 --- a/src/file-engine/src/manifest.rs +++ b/src/file-engine/src/manifest.rs @@ -71,7 +71,8 @@ impl FileRegionManifest { let bs = object_store .read(path) .await - .context(LoadRegionManifestSnafu { region_id })?; + .context(LoadRegionManifestSnafu { region_id })? + .to_vec(); Self::decode(bs.as_slice()) } diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 0afbf5b6695e..931e5062693a 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -112,6 +112,10 @@ impl FileCache { self.memory_index.insert(key, value).await; } + pub(crate) async fn get(&self, key: IndexKey) -> Option { + self.memory_index.get(&key).await + } + /// Reads a file from the cache. pub(crate) async fn reader(&self, key: IndexKey) -> Option { // We must use `get()` to update the estimator of the cache. @@ -372,7 +376,6 @@ fn parse_index_key(name: &str) -> Option { #[cfg(test)] mod tests { use common_test_util::temp_dir::create_temp_dir; - use futures::AsyncReadExt; use object_store::services::Fs; use super::*; @@ -451,10 +454,9 @@ mod tests { .await; // Read file content. 
- let mut reader = cache.reader(key).await.unwrap(); - let mut buf = String::new(); - reader.read_to_string(&mut buf).await.unwrap(); - assert_eq!("hello", buf); + let reader = cache.reader(key).await.unwrap(); + let buf = reader.read(..).await.unwrap().to_vec(); + assert_eq!("hello", String::from_utf8(buf).unwrap()); // Get weighted size. cache.memory_index.run_pending_tasks().await; @@ -549,10 +551,9 @@ mod tests { for (i, file_id) in file_ids.iter().enumerate() { let key = IndexKey::new(region_id, *file_id, file_type); - let mut reader = cache.reader(key).await.unwrap(); - let mut buf = String::new(); - reader.read_to_string(&mut buf).await.unwrap(); - assert_eq!(i.to_string(), buf); + let reader = cache.reader(key).await.unwrap(); + let buf = reader.read(..).await.unwrap().to_vec(); + assert_eq!(i.to_string(), String::from_utf8(buf).unwrap()); } } diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 23a84194695d..f7a5af339b48 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -19,6 +19,7 @@ use std::time::Duration; use common_base::readable_size::ReadableSize; use common_telemetry::{debug, info}; +use futures::AsyncWriteExt; use object_store::manager::ObjectStoreManagerRef; use object_store::ObjectStore; use snafu::ResultExt; @@ -175,19 +176,27 @@ impl WriteCache { }]) .start_timer(); + let cached_value = self + .file_cache + .local_store() + .stat(&cache_path) + .await + .context(error::OpenDalSnafu)?; let reader = self .file_cache .local_store() .reader(&cache_path) .await - .context(error::OpenDalSnafu)?; + .context(error::OpenDalSnafu)? + .into_futures_async_read(0..cached_value.content_length()); let mut writer = remote_store .writer_with(upload_path) - .buffer(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize) + .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize) .concurrent(DEFAULT_WRITE_CONCURRENCY) .await - .context(error::OpenDalSnafu)?; + .context(error::OpenDalSnafu)? + .into_futures_async_write(); let bytes_written = futures::io::copy(reader, &mut writer) @@ -199,7 +208,11 @@ impl WriteCache { })?; // Must close to upload all data. - writer.close().await.context(error::OpenDalSnafu)?; + writer.close().await.context(error::UploadSnafu { + region_id, + file_id, + file_type, + })?; UPLOAD_BYTES_TOTAL.inc_by(bytes_written); @@ -315,7 +328,7 @@ mod tests { .read(&write_cache.file_cache.cache_file_path(key)) .await .unwrap(); - assert_eq!(remote_data, cache_data); + assert_eq!(remote_data.to_vec(), cache_data.to_vec()); // Check write cache contains the index key let index_key = IndexKey::new(region_id, file_id, FileType::Puffin); @@ -326,7 +339,7 @@ mod tests { .read(&write_cache.file_cache.cache_file_path(index_key)) .await .unwrap(); - assert_eq!(remote_index_data, cache_index_data); + assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec()); } #[tokio::test] diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index fc9467f65fbb..a942f80fcd12 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -244,7 +244,8 @@ impl ManifestObjectStore { .object_store .read(entry.path()) .await - .context(OpenDalSnafu)?; + .context(OpenDalSnafu)? + .to_vec(); let data = compress_type .decode(bytes) .await @@ -420,58 +421,59 @@ impl ManifestObjectStore { let path = self.checkpoint_file_path(version); // Due to backward compatibility, it is possible that the user's checkpoint not compressed, // so if we don't find file by compressed type. 
fall back to checkpoint not compressed find again. - let checkpoint_data = - match self.object_store.read(&path).await { - Ok(checkpoint) => { - let checkpoint_size = checkpoint.len(); - let decompress_data = self.compress_type.decode(checkpoint).await.context( - DecompressObjectSnafu { - compress_type: self.compress_type, - path, - }, - )?; - verify_checksum(&decompress_data, metadata.checksum)?; - // set the checkpoint size - self.set_checkpoint_file_size(version, checkpoint_size as u64); - Ok(Some(decompress_data)) - } - Err(e) => { - if e.kind() == ErrorKind::NotFound { - if self.compress_type != FALL_BACK_COMPRESS_TYPE { - let fall_back_path = gen_path( - &self.path, - &checkpoint_file(version), - FALL_BACK_COMPRESS_TYPE, - ); - debug!( - "Failed to load checkpoint from path: {}, fall back to path: {}", - path, fall_back_path - ); - match self.object_store.read(&fall_back_path).await { - Ok(checkpoint) => { - let checkpoint_size = checkpoint.len(); - let decompress_data = FALL_BACK_COMPRESS_TYPE - .decode(checkpoint) - .await - .context(DecompressObjectSnafu { - compress_type: FALL_BACK_COMPRESS_TYPE, - path, - })?; - verify_checksum(&decompress_data, metadata.checksum)?; - self.set_checkpoint_file_size(version, checkpoint_size as u64); - Ok(Some(decompress_data)) - } - Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), - Err(e) => Err(e).context(OpenDalSnafu), + let checkpoint_data = match self.object_store.read(&path).await { + Ok(checkpoint) => { + let checkpoint_size = checkpoint.len(); + let decompress_data = self + .compress_type + .decode(checkpoint.to_vec()) + .await + .context(DecompressObjectSnafu { + compress_type: self.compress_type, + path, + })?; + verify_checksum(&decompress_data, metadata.checksum)?; + // set the checkpoint size + self.set_checkpoint_file_size(version, checkpoint_size as u64); + Ok(Some(decompress_data)) + } + Err(e) => { + if e.kind() == ErrorKind::NotFound { + if self.compress_type != FALL_BACK_COMPRESS_TYPE { + let fall_back_path = gen_path( + &self.path, + &checkpoint_file(version), + FALL_BACK_COMPRESS_TYPE, + ); + debug!( + "Failed to load checkpoint from path: {}, fall back to path: {}", + path, fall_back_path + ); + match self.object_store.read(&fall_back_path).await { + Ok(checkpoint) => { + let checkpoint_size = checkpoint.len(); + let decompress_data = FALL_BACK_COMPRESS_TYPE + .decode(checkpoint.to_vec()) + .await + .context(DecompressObjectSnafu { + compress_type: FALL_BACK_COMPRESS_TYPE, + path, + })?; + verify_checksum(&decompress_data, metadata.checksum)?; + self.set_checkpoint_file_size(version, checkpoint_size as u64); + Ok(Some(decompress_data)) } - } else { - Ok(None) + Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), + Err(e) => Err(e).context(OpenDalSnafu), } } else { - Err(e).context(OpenDalSnafu) + Ok(None) } + } else { + Err(e).context(OpenDalSnafu) } - }?; + } + }?; Ok(checkpoint_data.map(|data| (version, data))) } @@ -489,7 +491,7 @@ impl ManifestObjectStore { } }; - let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?; + let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data.to_vec())?; debug!( "Load checkpoint in path: {}, metadata: {:?}", @@ -501,7 +503,11 @@ impl ManifestObjectStore { #[cfg(test)] pub async fn read_file(&self, path: &str) -> Result> { - self.object_store.read(path).await.context(OpenDalSnafu) + self.object_store + .read(path) + .await + .context(OpenDalSnafu) + .map(|v| v.to_vec()) } #[cfg(test)] diff --git a/src/mito2/src/sst/index/applier.rs 
b/src/mito2/src/sst/index/applier.rs index f14251afb9d5..eb4e42cd47bf 100644 --- a/src/mito2/src/sst/index/applier.rs +++ b/src/mito2/src/sst/index/applier.rs @@ -121,9 +121,17 @@ impl SstIndexApplier { return Ok(None); }; + let Some(indexed_value) = file_cache + .get(IndexKey::new(self.region_id, file_id, FileType::Puffin)) + .await + else { + return Ok(None); + }; + Ok(file_cache .reader(IndexKey::new(self.region_id, file_id, FileType::Puffin)) .await + .map(|v| v.into_futures_async_read(0..indexed_value.file_size as u64)) .map(PuffinFileReader::new)) } @@ -190,7 +198,13 @@ mod tests { let region_dir = "region_dir".to_string(); let path = location::index_file_path(®ion_dir, file_id); - let mut puffin_writer = PuffinFileWriter::new(object_store.writer(&path).await.unwrap()); + let mut puffin_writer = PuffinFileWriter::new( + object_store + .writer(&path) + .await + .unwrap() + .into_futures_async_write(), + ); puffin_writer .add_blob(Blob { blob_type: INDEX_BLOB_TYPE.to_string(), @@ -236,7 +250,13 @@ mod tests { let region_dir = "region_dir".to_string(); let path = location::index_file_path(®ion_dir, file_id); - let mut puffin_writer = PuffinFileWriter::new(object_store.writer(&path).await.unwrap()); + let mut puffin_writer = PuffinFileWriter::new( + object_store + .writer(&path) + .await + .unwrap() + .into_futures_async_write(), + ); puffin_writer .add_blob(Blob { blob_type: "invalid_blob_type".to_string(), diff --git a/src/mito2/src/sst/index/store.rs b/src/mito2/src/sst/index/store.rs index 23893cc861fe..9d26118366ad 100644 --- a/src/mito2/src/sst/index/store.rs +++ b/src/mito2/src/sst/index/store.rs @@ -26,6 +26,8 @@ use crate::error::{OpenDalSnafu, Result}; /// A wrapper around [`ObjectStore`] that adds instrumentation for monitoring /// metrics such as bytes read, bytes written, and the number of seek operations. +/// +/// TODO: Consider refactor InstrumentedStore to use async in trait instead of AsyncRead. #[derive(Clone)] pub(crate) struct InstrumentedStore { /// The underlying object store. @@ -58,8 +60,14 @@ impl InstrumentedStore { read_byte_count: &'a IntCounter, read_count: &'a IntCounter, seek_count: &'a IntCounter, - ) -> Result> { - let reader = self.object_store.reader(path).await.context(OpenDalSnafu)?; + ) -> Result> { + let meta = self.object_store.stat(path).await.context(OpenDalSnafu)?; + let reader = self + .object_store + .reader(path) + .await + .context(OpenDalSnafu)? + .into_futures_async_read(0..meta.content_length()); Ok(InstrumentedAsyncRead::new( reader, read_byte_count, @@ -77,15 +85,21 @@ impl InstrumentedStore { write_byte_count: &'a IntCounter, write_count: &'a IntCounter, flush_count: &'a IntCounter, - ) -> Result> { + ) -> Result> { let writer = match self.write_buffer_size { Some(size) => self .object_store .writer_with(path) - .buffer(size) + .chunk(size) + .await + .context(OpenDalSnafu)? + .into_futures_async_write(), + None => self + .object_store + .writer(path) .await - .context(OpenDalSnafu)?, - None => self.object_store.writer(path).await.context(OpenDalSnafu)?, + .context(OpenDalSnafu)? 
+ .into_futures_async_write(), }; Ok(InstrumentedAsyncWrite::new( writer, diff --git a/src/mito2/src/sst/parquet/helper.rs b/src/mito2/src/sst/parquet/helper.rs index 34196df7c002..7b64062b272f 100644 --- a/src/mito2/src/sst/parquet/helper.rs +++ b/src/mito2/src/sst/parquet/helper.rs @@ -121,7 +121,7 @@ async fn fetch_ranges_seq( .read_with(&file_path) .range(range.start..range.end) .call()?; - Ok::<_, object_store::Error>(Bytes::from(data)) + Ok::<_, object_store::Error>(data.to_bytes()) }) .collect::>>() }; @@ -141,7 +141,7 @@ async fn fetch_ranges_concurrent( let future_read = object_store.read_with(file_path); handles.push(async move { let data = future_read.range(range.start..range.end).await?; - Ok::<_, object_store::Error>(Bytes::from(data)) + Ok::<_, object_store::Error>(data.to_bytes()) }); } let results = futures::future::try_join_all(handles).await?; diff --git a/src/mito2/src/sst/parquet/metadata.rs b/src/mito2/src/sst/parquet/metadata.rs index e0db7b40b889..3cf5a85cf893 100644 --- a/src/mito2/src/sst/parquet/metadata.rs +++ b/src/mito2/src/sst/parquet/metadata.rs @@ -85,7 +85,8 @@ impl<'a> MetadataLoader<'a> { .read_with(path) .range(buffer_start..file_size) .await - .context(error::OpenDalSnafu)?; + .context(error::OpenDalSnafu)? + .to_vec(); let buffer_len = buffer.len(); let mut footer = [0; 8]; @@ -129,7 +130,8 @@ impl<'a> MetadataLoader<'a> { .read_with(path) .range(metadata_start..(file_size - FOOTER_SIZE as u64)) .await - .context(error::OpenDalSnafu)?; + .context(error::OpenDalSnafu)? + .to_vec(); let metadata = decode_metadata(&data).map_err(|e| { error::InvalidParquetSnafu { diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index 814d48bd1b01..c307e437c502 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -16,6 +16,7 @@ use std::time::Duration; +use bytes::Bytes; use common_telemetry::{error, info, warn}; use futures::TryStreamExt; use object_store::util::join_path; @@ -50,7 +51,7 @@ impl RegionWorkerLoop { region .access_layer .object_store() - .write(&marker_path, vec![]) + .write(&marker_path, Bytes::new()) .await .context(OpenDalSnafu) .inspect_err(|e| { diff --git a/src/object-store/src/layers/lru_cache.rs b/src/object-store/src/layers/lru_cache.rs index 6df9b2566dd4..bcea36603ca6 100644 --- a/src/object-store/src/layers/lru_cache.rs +++ b/src/object-store/src/layers/lru_cache.rs @@ -19,21 +19,21 @@ use opendal::raw::{ Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, RpWrite, }; -use opendal::Result; +use opendal::{Operator, Result}; mod read_cache; use common_telemetry::info; use read_cache::ReadCache; /// An opendal layer with local LRU file cache supporting. #[derive(Clone)] -pub struct LruCacheLayer { +pub struct LruCacheLayer { // The read cache - read_cache: ReadCache, + read_cache: ReadCache, } -impl LruCacheLayer { +impl LruCacheLayer { /// Create a `[LruCacheLayer]` with local file cache and capacity in bytes. 
- pub async fn new(file_cache: Arc, capacity: usize) -> Result { + pub async fn new(file_cache: Operator, capacity: usize) -> Result { let read_cache = ReadCache::new(file_cache, capacity); let (entries, bytes) = read_cache.recover_cache().await?; @@ -56,8 +56,8 @@ impl LruCacheLayer { } } -impl Layer for LruCacheLayer { - type LayeredAccess = LruCacheAccess; +impl Layer for LruCacheLayer { + type LayeredAccess = LruCacheAccess; fn layer(&self, inner: I) -> Self::LayeredAccess { LruCacheAccess { @@ -68,12 +68,12 @@ impl Layer for LruCacheLayer { } #[derive(Debug)] -pub struct LruCacheAccess { +pub struct LruCacheAccess { inner: I, - read_cache: ReadCache, + read_cache: ReadCache, } -impl LayeredAccess for LruCacheAccess { +impl LayeredAccess for LruCacheAccess { type Inner = I; type Reader = Arc; type BlockingReader = I::BlockingReader; diff --git a/src/object-store/src/layers/lru_cache/read_cache.rs b/src/object-store/src/layers/lru_cache/read_cache.rs index e6a3725c7a1d..6831ccf6a1b2 100644 --- a/src/object-store/src/layers/lru_cache/read_cache.rs +++ b/src/object-store/src/layers/lru_cache/read_cache.rs @@ -15,12 +15,12 @@ use std::sync::Arc; use common_telemetry::debug; -use futures::FutureExt; +use futures::{FutureExt, StreamExt}; use moka::future::Cache; use moka::notification::ListenerFuture; -use opendal::raw::oio::{List, Read, ReadDyn, Reader, Write}; -use opendal::raw::{Access, OpDelete, OpList, OpRead, OpStat, OpWrite, RpRead}; -use opendal::{Error as OpendalError, ErrorKind, Result}; +use opendal::raw::oio::{Read, ReadDyn, Reader}; +use opendal::raw::{Access, BytesRange, OpRead, RpRead}; +use opendal::{Buffer, Error as OpendalError, ErrorKind, Operator, Result}; use crate::metrics::{ OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT, @@ -52,26 +52,22 @@ fn can_cache(path: &str) -> bool { } /// Generate an unique cache key for the read path and range. -fn read_cache_key(path: &str, args: &OpRead) -> String { - format!( - "{:x}.cache-{}", - md5::compute(path), - args.range().to_header() - ) +fn read_cache_key(path: &str, range: BytesRange) -> String { + format!("{:x}.cache-{}", md5::compute(path), range.to_header()) } /// Local read cache for files in object storage #[derive(Clone, Debug)] -pub(crate) struct ReadCache { +pub(crate) struct ReadCache { /// Local file cache backend - file_cache: Arc, + file_cache: Operator, /// Local memory cache to track local cache files mem_cache: Cache, } -impl ReadCache { +impl ReadCache { /// Create a [`ReadCache`] with capacity in bytes. - pub(crate) fn new(file_cache: Arc, capacity: usize) -> Self { + pub(crate) fn new(file_cache: Operator, capacity: usize) -> Self { let file_cache_cloned = file_cache.clone(); let eviction_listener = move |read_key: Arc, read_result: ReadResult, cause| -> ListenerFuture { @@ -83,7 +79,7 @@ impl ReadCache { if let ReadResult::Success(size) = read_result { OBJECT_STORE_LRU_CACHE_BYTES.sub(size as i64); - let result = file_cache_cloned.delete(&read_key, OpDelete::new()).await; + let result = file_cache_cloned.delete(&read_key).await; debug!( "Deleted local cache file `{}`, result: {:?}, cause: {:?}.", read_key, result, cause @@ -133,17 +129,17 @@ impl ReadCache { /// Recover existing cache items from `file_cache` to `mem_cache`. /// Return entry count and total approximate entry size in bytes. 
pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> { - let (_, mut pager) = self.file_cache.list("/", OpList::default()).await?; + let mut pager = self.file_cache.lister("/").await?; - while let Some(entry) = pager.next().await? { + while let Some(entry) = pager.next().await.transpose()? { let read_key = entry.path(); // We can't retrieve the metadata from `[opendal::raw::oio::Entry]` directly, // because it's private field. let size = { - let stat = self.file_cache.stat(read_key, OpStat::default()).await?; + let stat = self.file_cache.stat(read_key).await?; - stat.into_metadata().content_length() + stat.content_length() }; OBJECT_STORE_LRU_CACHE_ENTRIES.inc(); @@ -159,8 +155,7 @@ impl ReadCache { /// Returns true when the read cache contains the specific file. pub(crate) async fn contains_file(&self, path: &str) -> bool { self.mem_cache.run_pending_tasks().await; - self.mem_cache.contains_key(path) - && self.file_cache.stat(path, OpStat::default()).await.is_ok() + self.mem_cache.contains_key(path) && self.file_cache.stat(path).await.is_ok() } /// Read from a specific path using the OpRead operation. @@ -181,78 +176,35 @@ impl ReadCache { return inner.read(path, args).await.map(to_output_reader); } - let read_key = read_cache_key(path, &args); - - let read_result = self - .mem_cache - .try_get_with( - read_key.clone(), - self.read_remote(inner, &read_key, path, args.clone()), - ) - .await - .map_err(|e| OpendalError::new(e.kind(), &e.to_string()))?; - - match read_result { - ReadResult::Success(_) => { - // There is a concurrent issue here, the local cache may be purged - // while reading, we have to fallback to remote read - match self.file_cache.read(&read_key, OpRead::default()).await { - Ok(ret) => { - OBJECT_STORE_LRU_CACHE_HIT - .with_label_values(&["success"]) - .inc(); - Ok(to_output_reader(ret)) - } - Err(_) => { - OBJECT_STORE_LRU_CACHE_MISS.inc(); - inner.read(path, args).await.map(to_output_reader) - } - } - } - ReadResult::NotFound => { - OBJECT_STORE_LRU_CACHE_HIT - .with_label_values(&["not_found"]) - .inc(); - - Err(OpendalError::new( - ErrorKind::NotFound, - &format!("File not found: {path}"), - )) - } - } + let (rp, reader) = inner.read(path, args).await?; + let reader: ReadCacheReader = ReadCacheReader { + path: Arc::new(path.to_string()), + inner_reader: reader, + file_cache: self.file_cache.clone(), + mem_cache: self.mem_cache.clone(), + }; + Ok((rp, Arc::new(reader))) } +} - async fn try_write_cache(&self, mut reader: I::Reader, read_key: &str) -> Result - where - I: Access, - { - let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?; - let mut total = 0; - while let Some(bytes) = reader.next().await { - let bytes = &bytes?; - total += bytes.len(); - writer.write(bytes).await?; - } - // Call `close` to ensure data is written. - writer.close().await?; - Ok(total) - } +pub struct ReadCacheReader { + /// Path of the file + path: Arc, + /// Remote file reader. + inner_reader: I::Reader, + /// Local file cache backend + file_cache: Operator, + /// Local memory cache to track local cache files + mem_cache: Cache, +} - /// Read the file from remote storage. If success, write the content into local cache. - async fn read_remote( - &self, - inner: &I, - read_key: &str, - path: &str, - args: OpRead, - ) -> Result - where - I: Access, - { +impl ReadCacheReader { + /// TODO: we can return the Buffer directly to avoid another read from cache. 
+ async fn read_remote(&self, offset: u64, limit: usize) -> Result { OBJECT_STORE_LRU_CACHE_MISS.inc(); - let (_, reader) = inner.read(path, args).await?; - let result = self.try_write_cache::(reader, read_key).await; + let buf = self.inner_reader.read_at(offset, limit).await?; + let result = self.try_write_cache(buf, offset, limit).await; match result { Ok(read_bytes) => { @@ -279,6 +231,54 @@ impl ReadCache { } } } + + async fn try_write_cache(&self, buf: Buffer, offset: u64, limit: usize) -> Result { + let size = buf.len(); + let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(limit as _))); + self.file_cache.write(&read_key, buf).await?; + Ok(size) + } +} + +impl Read for ReadCacheReader { + async fn read_at(&self, offset: u64, limit: usize) -> Result { + let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(limit as _))); + + let read_result = self + .mem_cache + .try_get_with(read_key.clone(), self.read_remote(offset, limit)) + .await + .map_err(|e| OpendalError::new(e.kind(), &e.to_string()))?; + + match read_result { + ReadResult::Success(_) => { + // There is a concurrent issue here, the local cache may be purged + // while reading, we have to fallback to remote read + match self.file_cache.read(&read_key).await { + Ok(ret) => { + OBJECT_STORE_LRU_CACHE_HIT + .with_label_values(&["success"]) + .inc(); + Ok(ret) + } + Err(_) => { + OBJECT_STORE_LRU_CACHE_MISS.inc(); + self.inner_reader.read_at(offset, limit).await + } + } + } + ReadResult::NotFound => { + OBJECT_STORE_LRU_CACHE_HIT + .with_label_values(&["not_found"]) + .inc(); + + Err(OpendalError::new( + ErrorKind::NotFound, + &format!("File not found: {}", self.path), + )) + } + } + } } fn to_output_reader(input: (RpRead, R)) -> (RpRead, Reader) { diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index a26a9bda64ac..ae561d4bbcd6 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -14,8 +14,9 @@ pub use opendal::raw::{normalize_path as raw_normalize_path, HttpClient}; pub use opendal::{ - services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Lister, Metakey, - Operator as ObjectStore, Reader, Result, Writer, + services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, + FuturesAsyncReader, FuturesAsyncWriter, Lister, Metakey, Operator as ObjectStore, Reader, + Result, Writer, }; pub mod layers; diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index a7dc47182295..c3ed4931b43b 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -22,7 +22,6 @@ use object_store::layers::LruCacheLayer; use object_store::services::{Fs, S3}; use object_store::test_util::TempFolder; use object_store::{ObjectStore, ObjectStoreBuilder}; -use opendal::raw::Access; use opendal::services::{Azblob, Gcs, Oss}; use opendal::{EntryMode, Operator, OperatorBuilder}; @@ -236,11 +235,9 @@ async fn test_file_backend_with_lru_cache() -> Result<()> { let _ = builder .root(&cache_dir.path().to_string_lossy()) .atomic_write_dir(&cache_dir.path().to_string_lossy()); - let file_cache = Arc::new(builder.build().unwrap()); + let file_cache = Operator::new(builder).unwrap().finish(); - LruCacheLayer::new(Arc::new(file_cache.clone()), 32) - .await - .unwrap() + LruCacheLayer::new(file_cache, 32).await.unwrap() }; let store = store.layer(cache_layer.clone()); @@ -253,7 +250,7 @@ async fn test_file_backend_with_lru_cache() -> Result<()> { Ok(()) } 
-async fn assert_lru_cache(cache_layer: &LruCacheLayer, file_names: &[&str]) { +async fn assert_lru_cache(cache_layer: &LruCacheLayer, file_names: &[&str]) { for file_name in file_names { assert!(cache_layer.contains_file(file_name).await); } @@ -309,9 +306,7 @@ async fn test_object_store_cache_policy() -> Result<()> { let cache_store = OperatorBuilder::new(file_cache.clone()).finish(); // create operator for cache dir to verify cache file - let cache_layer = LruCacheLayer::new(Arc::new(file_cache.clone()), 38) - .await - .unwrap(); + let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).await.unwrap(); let store = store.layer(cache_layer.clone()); // create several object handler. @@ -439,7 +434,7 @@ async fn test_object_store_cache_policy() -> Result<()> { drop(cache_layer); // Test recover - let cache_layer = LruCacheLayer::new(Arc::new(file_cache), 38).await.unwrap(); + let cache_layer = LruCacheLayer::new(cache_store, 38).await.unwrap(); // The p2 `NotFound` cache will not be recovered assert_eq!(cache_layer.read_cache_stat().await, (3, 34)); diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml index f99ea1c63b5e..ef545c72e79f 100644 --- a/src/operator/Cargo.toml +++ b/src/operator/Cargo.toml @@ -57,6 +57,7 @@ substrait.workspace = true table.workspace = true tokio.workspace = true tonic.workspace = true +tokio-util.workspace = true [dev-dependencies] common-test-util.workspace = true diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs index 52880d700e29..5b0453634db8 100644 --- a/src/operator/src/statement/copy_table_from.rs +++ b/src/operator/src/statement/copy_table_from.rs @@ -46,6 +46,7 @@ use session::context::QueryContextRef; use snafu::ResultExt; use table::requests::{CopyTableRequest, InsertRequest}; use table::table_reference::TableReference; +use tokio_util::compat::FuturesAsyncReadCompatExt; use crate::error::{self, IntoVectorsSnafu, Result}; use crate::statement::StatementExecutor; @@ -146,10 +147,16 @@ impl StatementExecutor { path, }), Format::Parquet(_) => { + let meta = object_store + .stat(&path) + .await + .context(error::ReadObjectSnafu { path: &path })?; let mut reader = object_store .reader(&path) .await - .context(error::ReadObjectSnafu { path: &path })?; + .context(error::ReadObjectSnafu { path: &path })? + .into_futures_async_read(0..meta.content_length()) + .compat(); let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default()) .await .context(error::ReadParquetMetadataSnafu)?; @@ -161,10 +168,16 @@ impl StatementExecutor { }) } Format::Orc(_) => { + let meta = object_store + .stat(&path) + .await + .context(error::ReadObjectSnafu { path: &path })?; let reader = object_store .reader(&path) .await - .context(error::ReadObjectSnafu { path: &path })?; + .context(error::ReadObjectSnafu { path: &path })? + .into_futures_async_read(0..meta.content_length()) + .compat(); let schema = infer_orc_schema(reader) .await @@ -279,11 +292,17 @@ impl StatementExecutor { ))) } FileMetadata::Parquet { metadata, path, .. } => { + let meta = object_store + .stat(path) + .await + .context(error::ReadObjectSnafu { path })?; let reader = object_store .reader_with(path) - .buffer(DEFAULT_READ_BUFFER) + .chunk(DEFAULT_READ_BUFFER) .await - .context(error::ReadObjectSnafu { path })?; + .context(error::ReadObjectSnafu { path })? 
+ .into_futures_async_read(0..meta.content_length()) + .compat(); let builder = ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone()); let stream = builder @@ -302,11 +321,17 @@ impl StatementExecutor { ))) } FileMetadata::Orc { path, .. } => { + let meta = object_store + .stat(path) + .await + .context(error::ReadObjectSnafu { path })?; let reader = object_store .reader_with(path) - .buffer(DEFAULT_READ_BUFFER) + .chunk(DEFAULT_READ_BUFFER) .await - .context(error::ReadObjectSnafu { path })?; + .context(error::ReadObjectSnafu { path })? + .into_futures_async_read(0..meta.content_length()) + .compat(); let stream = new_orc_stream_reader(reader) .await .context(error::ReadOrcSnafu)?;
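
Taken together, the hunks above apply one recurring opendal 0.46 migration: `reader()` no longer hands back an `AsyncRead`, so call sites first `stat()` the object for its length, convert the reader with `into_futures_async_read(range)`, and bridge to tokio through the `tokio_util` compat adapters; writers go through `into_futures_async_write()` and `compat_write()` in the same way, the old `.buffer()` writer option is renamed to `.chunk()`, and whole-object `read()` now returns an opendal `Buffer` rather than `Vec<u8>`. A minimal sketch of that pattern, assuming the crate's `object_store::ObjectStore` re-export of `opendal::Operator`; the function name, destination path, chunk size, and concurrency below are illustrative and not taken from this patch:

use std::error::Error;

use object_store::ObjectStore; // re-export of `opendal::Operator` (see src/object-store/src/lib.rs)
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};

async fn copy_object(store: &ObjectStore, src: &str, dst: &str) -> Result<(), Box<dyn Error>> {
    // Read side: stat first, because the 0.46 reader is driven by an explicit byte range.
    let meta = store.stat(src).await?;
    let mut reader = store
        .reader(src)
        .await?
        .into_futures_async_read(0..meta.content_length()) // opendal Reader -> futures AsyncRead
        .compat(); // futures AsyncRead -> tokio AsyncRead
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await?;

    // Write side: `.buffer()` became `.chunk()`, and the writer needs the same two-step conversion.
    let mut writer = store
        .writer_with(dst)
        .chunk(8 * 1024 * 1024) // illustrative chunk size
        .concurrent(8) // illustrative concurrency
        .await?
        .into_futures_async_write() // opendal Writer -> futures AsyncWrite
        .compat_write(); // futures AsyncWrite -> tokio AsyncWrite
    writer.write_all(&buf).await?;
    // Data is only persisted once the writer is closed; tokio's `shutdown` maps to `close`.
    writer.shutdown().await?;

    // Whole-object reads now return an opendal `Buffer`; convert when `Vec<u8>` is needed.
    let bytes: Vec<u8> = store.read(dst).await?.to_vec();
    assert_eq!(bytes.len() as u64, meta.content_length());
    Ok(())
}

Reading over an explicit `0..content_length()` range is what lets the new reader drop seek support, which is why the ORC, Parquet, and COPY FROM call sites above all gain a `stat()` call before opening the file.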