diff --git a/native-engine/datafusion-ext-commons/src/hadoop_fs.rs b/native-engine/datafusion-ext-commons/src/hadoop_fs.rs
index 8fc4c794..239e8bef 100644
--- a/native-engine/datafusion-ext-commons/src/hadoop_fs.rs
+++ b/native-engine/datafusion-ext-commons/src/hadoop_fs.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::Arc;
+
 use blaze_jni_bridge::{
     jni_call, jni_call_static, jni_new_direct_byte_buffer, jni_new_global_ref, jni_new_object,
     jni_new_string,
@@ -21,6 +23,7 @@ use jni::objects::{GlobalRef, JObject};
 
 use crate::df_execution_err;
 
+#[derive(Clone)]
 pub struct Fs {
     fs: GlobalRef,
     io_time: Time,
@@ -48,7 +51,7 @@ impl Fs {
         Ok(())
     }
 
-    pub fn open(&self, path: &str) -> Result<FsDataInputStream> {
+    pub fn open(&self, path: &str) -> Result<Arc<FsDataInputStream>> {
         let _timer = self.io_time.timer();
         let path_str = jni_new_string!(path)?;
         let path_uri = jni_new_object!(JavaURI(path_str.as_obj()))?;
@@ -57,13 +60,13 @@ impl Fs {
             HadoopFileSystem(self.fs.as_obj()).open(path.as_obj()) -> JObject
         )?;
 
-        Ok(FsDataInputStream {
+        Ok(Arc::new(FsDataInputStream {
             stream: jni_new_global_ref!(fin.as_obj())?,
             io_time: self.io_time.clone(),
-        })
+        }))
     }
 
-    pub fn create(&self, path: &str) -> Result<FsDataOutputStream> {
+    pub fn create(&self, path: &str) -> Result<Arc<FsDataOutputStream>> {
         let _timer = self.io_time.timer();
         let path_str = jni_new_string!(path)?;
         let path_uri = jni_new_object!(JavaURI(path_str.as_obj()))?;
@@ -72,10 +75,10 @@ impl Fs {
             HadoopFileSystem(self.fs.as_obj()).create(path.as_obj()) -> JObject
         )?;
 
-        Ok(FsDataOutputStream {
+        Ok(Arc::new(FsDataOutputStream {
             stream: jni_new_global_ref!(fin.as_obj())?,
             io_time: self.io_time.clone(),
-        })
+        }))
     }
 }
diff --git a/native-engine/datafusion-ext-plans/src/common/internal_file_reader.rs b/native-engine/datafusion-ext-plans/src/common/internal_file_reader.rs
index ec46b888..939f8383 100644
--- a/native-engine/datafusion-ext-plans/src/common/internal_file_reader.rs
+++ b/native-engine/datafusion-ext-plans/src/common/internal_file_reader.rs
@@ -19,49 +19,49 @@ use std::{ops::Range, sync::Arc};
 
 use base64::{prelude::BASE64_URL_SAFE_NO_PAD, Engine};
 use bytes::Bytes;
-use datafusion::common::DataFusionError;
+use datafusion::common::Result;
 use datafusion_ext_commons::{
     df_execution_err,
-    hadoop_fs::{FsDataInputStream, FsProvider},
+    hadoop_fs::{Fs, FsDataInputStream, FsProvider},
 };
 use object_store::ObjectMeta;
 use once_cell::sync::OnceCell;
 
-#[derive(Clone)]
 pub struct InternalFileReader {
-    fs_provider: Arc<FsProvider>,
+    fs: Fs,
     meta: ObjectMeta,
+    path: String,
     input: OnceCell<Arc<FsDataInputStream>>,
 }
 
 impl InternalFileReader {
-    pub fn new(fs_provider: Arc<FsProvider>, meta: ObjectMeta) -> Self {
-        Self {
-            fs_provider,
+    pub fn try_new(fs_provider: Arc<FsProvider>, meta: ObjectMeta) -> Result<Self> {
+        let path = BASE64_URL_SAFE_NO_PAD
+            .decode(meta.location.filename().expect("missing filename"))
+            .map(|bytes| String::from_utf8_lossy(&bytes).to_string())
+            .or_else(|_| {
+                let filename = meta.location.filename();
+                df_execution_err!("cannot decode filename: {filename:?}")
+            })?;
+        let fs = fs_provider.provide(&path)?;
+
+        Ok(Self {
+            fs,
             meta,
+            path,
             input: OnceCell::new(),
-        }
+        })
     }
 
-    fn get_input(&self) -> datafusion::common::Result<Arc<FsDataInputStream>> {
+    fn get_input(&self) -> Result<Arc<FsDataInputStream>> {
         let input = self
             .input
-            .get_or_try_init(|| {
-                let path = BASE64_URL_SAFE_NO_PAD
-                    .decode(self.meta.location.filename().expect("missing filename"))
-                    .map(|bytes| String::from_utf8_lossy(&bytes).to_string())
-                    .or_else(|_| {
-                        let filename = self.meta.location.filename();
-                        df_execution_err!("cannot decode filename: {filename:?}")
-                    })?;
-                let fs = self.fs_provider.provide(&path)?;
-                Ok(Arc::new(fs.open(&path)?))
-            })
-            .map_err(|e| DataFusionError::External(e))?;
+            .get_or_try_init(|| self.fs.open(&self.path))
+            .or_else(|e| df_execution_err!("cannot get FSDataInputStream: {e:?}"))?;
         Ok(input.clone())
     }
 
-    pub fn read_fully(&self, range: Range<usize>) -> datafusion::common::Result<Bytes> {
+    pub fn read_fully(&self, range: Range<usize>) -> Result<Bytes> {
         let mut bytes = vec![0u8; range.len()];
         self.get_input()?
             .read_fully(range.start as u64, &mut bytes)?;
diff --git a/native-engine/datafusion-ext-plans/src/orc_exec.rs b/native-engine/datafusion-ext-plans/src/orc_exec.rs
index 8423afd2..bb8b2086 100644
--- a/native-engine/datafusion-ext-plans/src/orc_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/orc_exec.rs
@@ -211,10 +211,10 @@ struct OrcOpener {
 
 impl FileOpener for OrcOpener {
     fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
-        let reader = OrcFileReaderRef(Arc::new(InternalFileReader::new(
+        let reader = OrcFileReaderRef(Arc::new(InternalFileReader::try_new(
             self.fs_provider.clone(),
             file_meta.object_meta.clone(),
-        )));
+        )?));
         let batch_size = self.batch_size;
         let projection = self.projection.clone();
         let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?);
diff --git a/native-engine/datafusion-ext-plans/src/parquet_exec.rs b/native-engine/datafusion-ext-plans/src/parquet_exec.rs
index def5288c..52912186 100644
--- a/native-engine/datafusion-ext-plans/src/parquet_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/parquet_exec.rs
@@ -365,11 +365,12 @@ impl ParquetFileReaderFactory for FsReaderFactory {
         _metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> Result<Box<dyn AsyncFileReader>> {
+        let internal_reader = Arc::new(InternalFileReader::try_new(
+            self.fs_provider.clone(),
+            file_meta.object_meta.clone(),
+        )?);
         let reader = ParquetFileReaderRef(Arc::new(ParquetFileReader {
-            internal_reader: InternalFileReader::new(
-                self.fs_provider.clone(),
-                file_meta.object_meta.clone(),
-            ),
+            internal_reader,
             metrics: ParquetFileMetrics::new(
                 partition_index,
                 file_meta
@@ -385,7 +386,7 @@ impl ParquetFileReaderFactory for FsReaderFactory {
 }
 
 struct ParquetFileReader {
-    internal_reader: InternalFileReader,
+    internal_reader: Arc<InternalFileReader>,
     metrics: ParquetFileMetrics,
 }
@@ -397,7 +398,7 @@ impl ParquetFileReader {
         self.internal_reader.get_meta()
     }
 
-    fn get_internal_reader(&self) -> InternalFileReader {
+    fn get_internal_reader(&self) -> Arc<InternalFileReader> {
         self.internal_reader.clone()
     }
 }
diff --git a/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs b/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs
index 362f3c0e..00ef3f11 100644
--- a/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs
@@ -466,7 +466,7 @@ impl PartWriter {
         let fs = parquet_sink_context.fs_provider.provide(&part_file)?;
         let bytes_written = Count::new();
         let rows_written = Count::new();
-        let fout = fs.create(&part_file)?;
+        let fout = Arc::into_inner(fs.create(&part_file)?).expect("Arc::into_inner");
         let data_writer = FSDataWriter::new(fout, &bytes_written);
         let parquet_writer = ArrowWriter::try_new(
             data_writer,