
Commit

reuse FSDataInputStream for same input file (#592)
Co-authored-by: zhangli20 <[email protected]>
richox and zhangli20 authored Sep 26, 2024
1 parent fa36ce8 commit 324ce05
Showing 5 changed files with 41 additions and 37 deletions.
15 changes: 9 additions & 6 deletions native-engine/datafusion-ext-commons/src/hadoop_fs.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::Arc;
+
 use blaze_jni_bridge::{
     jni_call, jni_call_static, jni_new_direct_byte_buffer, jni_new_global_ref, jni_new_object,
     jni_new_string,
@@ -21,6 +23,7 @@ use jni::objects::{GlobalRef, JObject};
 
 use crate::df_execution_err;
 
+#[derive(Clone)]
 pub struct Fs {
     fs: GlobalRef,
     io_time: Time,
@@ -48,7 +51,7 @@ impl Fs {
         Ok(())
     }
 
-    pub fn open(&self, path: &str) -> Result<FsDataInputStream> {
+    pub fn open(&self, path: &str) -> Result<Arc<FsDataInputStream>> {
         let _timer = self.io_time.timer();
         let path_str = jni_new_string!(path)?;
         let path_uri = jni_new_object!(JavaURI(path_str.as_obj()))?;
@@ -57,13 +60,13 @@
             HadoopFileSystem(self.fs.as_obj()).open(path.as_obj()) -> JObject
         )?;
 
-        Ok(FsDataInputStream {
+        Ok(Arc::new(FsDataInputStream {
             stream: jni_new_global_ref!(fin.as_obj())?,
             io_time: self.io_time.clone(),
-        })
+        }))
     }
 
-    pub fn create(&self, path: &str) -> Result<FsDataOutputStream> {
+    pub fn create(&self, path: &str) -> Result<Arc<FsDataOutputStream>> {
         let _timer = self.io_time.timer();
         let path_str = jni_new_string!(path)?;
         let path_uri = jni_new_object!(JavaURI(path_str.as_obj()))?;
@@ -72,10 +75,10 @@
             HadoopFileSystem(self.fs.as_obj()).create(path.as_obj()) -> JObject
         )?;
 
-        Ok(FsDataOutputStream {
+        Ok(Arc::new(FsDataOutputStream {
             stream: jni_new_global_ref!(fin.as_obj())?,
             io_time: self.io_time.clone(),
-        })
+        }))
     }
 }
 
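Why `Arc`? Returning `Arc<FsDataInputStream>` from `open` lets several readers of the same file hold one handle, and the underlying stream (a JNI global ref in the real code) is only released when the last holder drops it. A minimal sketch of that ownership behavior, using a hypothetical `Stream` type in place of the JNI-backed one:

use std::sync::Arc;

struct Stream(&'static str);

impl Drop for Stream {
    fn drop(&mut self) {
        // In the real code, dropping the stream would release the JNI global ref.
        println!("closing {}", self.0);
    }
}

// Stand-in for Fs::open: hands out a shared, reference-counted stream.
fn open() -> Arc<Stream> {
    Arc::new(Stream("part-0"))
}

fn main() {
    let a = open();
    let b = Arc::clone(&a); // a second reader of the same file
    drop(a);                // stream stays open; b still owns it
    drop(b);                // last owner gone: "closing part-0" prints here
}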
44 changes: 22 additions & 22 deletions
@@ -19,49 +19,49 @@ use std::{ops::Range, sync::Arc};
 
 use base64::{prelude::BASE64_URL_SAFE_NO_PAD, Engine};
 use bytes::Bytes;
-use datafusion::common::DataFusionError;
+use datafusion::common::Result;
 use datafusion_ext_commons::{
     df_execution_err,
-    hadoop_fs::{FsDataInputStream, FsProvider},
+    hadoop_fs::{Fs, FsDataInputStream, FsProvider},
 };
 use object_store::ObjectMeta;
 use once_cell::sync::OnceCell;
 
 #[derive(Clone)]
 pub struct InternalFileReader {
-    fs_provider: Arc<FsProvider>,
+    fs: Fs,
     meta: ObjectMeta,
+    path: String,
     input: OnceCell<Arc<FsDataInputStream>>,
 }
 
 impl InternalFileReader {
-    pub fn new(fs_provider: Arc<FsProvider>, meta: ObjectMeta) -> Self {
-        Self {
-            fs_provider,
+    pub fn try_new(fs_provider: Arc<FsProvider>, meta: ObjectMeta) -> Result<Self> {
+        let path = BASE64_URL_SAFE_NO_PAD
+            .decode(meta.location.filename().expect("missing filename"))
+            .map(|bytes| String::from_utf8_lossy(&bytes).to_string())
+            .or_else(|_| {
+                let filename = meta.location.filename();
+                df_execution_err!("cannot decode filename: {filename:?}")
+            })?;
+        let fs = fs_provider.provide(&path)?;
+
+        Ok(Self {
+            fs,
             meta,
+            path,
             input: OnceCell::new(),
-        }
+        })
     }
 
-    fn get_input(&self) -> datafusion::common::Result<Arc<FsDataInputStream>> {
+    fn get_input(&self) -> Result<Arc<FsDataInputStream>> {
         let input = self
             .input
-            .get_or_try_init(|| {
-                let path = BASE64_URL_SAFE_NO_PAD
-                    .decode(self.meta.location.filename().expect("missing filename"))
-                    .map(|bytes| String::from_utf8_lossy(&bytes).to_string())
-                    .or_else(|_| {
-                        let filename = self.meta.location.filename();
-                        df_execution_err!("cannot decode filename: {filename:?}")
-                    })?;
-                let fs = self.fs_provider.provide(&path)?;
-                Ok(Arc::new(fs.open(&path)?))
-            })
-            .map_err(|e| DataFusionError::External(e))?;
+            .get_or_try_init(|| self.fs.open(&self.path))
+            .or_else(|e| df_execution_err!("cannot get FSDataInputStream: ${e:?}"))?;
         Ok(input.clone())
     }
 
-    pub fn read_fully(&self, range: Range<usize>) -> datafusion::common::Result<Bytes> {
+    pub fn read_fully(&self, range: Range<usize>) -> Result<Bytes> {
         let mut bytes = vec![0u8; range.len()];
         self.get_input()?
             .read_fully(range.start as u64, &mut bytes)?;
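The reuse itself comes from `once_cell`'s `OnceCell::get_or_try_init`: the first `get_input` call opens the stream, and every later call clones the cached `Arc`. A self-contained sketch of the same pattern, with a dummy `Stream` and a string error type standing in for `FsDataInputStream` and the DataFusion error:

use std::sync::Arc;

use once_cell::sync::OnceCell;

struct Stream; // stand-in for FsDataInputStream

struct Reader {
    path: String,
    input: OnceCell<Arc<Stream>>,
}

impl Reader {
    fn get_input(&self) -> Result<Arc<Stream>, String> {
        let input = self
            .input
            // Runs the closure at most once; the sync OnceCell is thread-safe,
            // so concurrent callers all end up with the same cached value.
            .get_or_try_init(|| -> Result<_, String> {
                println!("opening {}", self.path); // printed only on first call
                Ok(Arc::new(Stream))
            })?;
        Ok(Arc::clone(input))
    }
}

fn main() {
    let r = Reader { path: "part-0".into(), input: OnceCell::new() };
    let a = r.get_input().unwrap();
    let b = r.get_input().unwrap(); // no second "opening" line
    assert!(Arc::ptr_eq(&a, &b));   // both handles point at one stream
}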
4 changes: 2 additions & 2 deletions native-engine/datafusion-ext-plans/src/orc_exec.rs
@@ -211,10 +211,10 @@ struct OrcOpener {
 
 impl FileOpener for OrcOpener {
     fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
-        let reader = OrcFileReaderRef(Arc::new(InternalFileReader::new(
+        let reader = OrcFileReaderRef(Arc::new(InternalFileReader::try_new(
             self.fs_provider.clone(),
             file_meta.object_meta.clone(),
-        )));
+        )?));
         let batch_size = self.batch_size;
         let projection = self.projection.clone();
         let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?);
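The `new` to `try_new` rename follows the usual Rust convention for fallible constructors: the base64 filename decode and `fs_provider.provide` now run up front, so a bad path fails at construction (note the added `?` above) rather than at first read. A minimal sketch of the convention, with hypothetical types and an example value:

use base64::{prelude::BASE64_URL_SAFE_NO_PAD, Engine};

struct Reader {
    path: String, // decoded once, stored for every later open
}

impl Reader {
    // Fallible constructor: returns Result instead of panicking later.
    fn try_new(encoded: &str) -> Result<Self, String> {
        let bytes = BASE64_URL_SAFE_NO_PAD
            .decode(encoded)
            .map_err(|e| format!("cannot decode filename: {e}"))?;
        let path = String::from_utf8_lossy(&bytes).to_string();
        Ok(Self { path })
    }
}

fn main() {
    // "cGFydC0w" is URL-safe base64 for "part-0" (hypothetical example value).
    let r = Reader::try_new("cGFydC0w").expect("valid name");
    assert_eq!(r.path, "part-0");
    // An invalid name fails fast, at construction rather than first read.
    assert!(Reader::try_new("?!").is_err());
}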
13 changes: 7 additions & 6 deletions native-engine/datafusion-ext-plans/src/parquet_exec.rs
@@ -365,11 +365,12 @@ impl ParquetFileReaderFactory for FsReaderFactory {
         _metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> Result<Box<dyn AsyncFileReader + Send>> {
+        let internal_reader = Arc::new(InternalFileReader::try_new(
+            self.fs_provider.clone(),
+            file_meta.object_meta.clone(),
+        )?);
         let reader = ParquetFileReaderRef(Arc::new(ParquetFileReader {
-            internal_reader: InternalFileReader::new(
-                self.fs_provider.clone(),
-                file_meta.object_meta.clone(),
-            ),
+            internal_reader,
             metrics: ParquetFileMetrics::new(
                 partition_index,
                 file_meta
@@ -385,7 +386,7 @@
 }
 
 struct ParquetFileReader {
-    internal_reader: InternalFileReader,
+    internal_reader: Arc<InternalFileReader>,
     metrics: ParquetFileMetrics,
 }
 
@@ -397,7 +398,7 @@ impl ParquetFileReader {
         self.internal_reader.get_meta()
     }
 
-    fn get_internal_reader(&self) -> InternalFileReader {
+    fn get_internal_reader(&self) -> Arc<InternalFileReader> {
         self.internal_reader.clone()
     }
 }
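Holding `Arc<InternalFileReader>` rather than `InternalFileReader` is what makes the cache effective across `get_internal_reader` calls: cloning a reader whose `OnceCell` is still empty yields an independent cell (each clone would open its own stream later), while clones of one `Arc` all observe the same cell. A minimal sketch of the difference, with `String` standing in for the stream type:

use std::sync::Arc;

use once_cell::sync::OnceCell;

#[derive(Clone)]
struct Reader {
    input: OnceCell<Arc<String>>, // stand-in for OnceCell<Arc<FsDataInputStream>>
}

fn main() {
    // Old shape: cloning the reader clones the (still empty) cell,
    // so each clone lazily opens its own stream.
    let r = Reader { input: OnceCell::new() };
    let detached = r.clone();
    r.input.set(Arc::new("stream A".into())).ok();
    assert!(detached.input.get().is_none()); // the clone never saw it

    // New shape: clones of one Arc<Reader> share a single cell.
    let shared = Arc::new(Reader { input: OnceCell::new() });
    let alias = Arc::clone(&shared);
    shared.input.set(Arc::new("stream B".into())).ok();
    assert!(alias.input.get().is_some()); // same cell: the stream is reused
}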
2 changes: 1 addition & 1 deletion
@@ -466,7 +466,7 @@ impl PartWriter {
         let fs = parquet_sink_context.fs_provider.provide(&part_file)?;
         let bytes_written = Count::new();
         let rows_written = Count::new();
-        let fout = fs.create(&part_file)?;
+        let fout = Arc::into_inner(fs.create(&part_file)?).expect("Arc::into_inner");
         let data_writer = FSDataWriter::new(fout, &bytes_written);
         let parquet_writer = ArrowWriter::try_new(
             data_writer,
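On the write path the stream is consumed by `FSDataWriter`, so the fresh `Arc` returned by `create` is unwrapped with `Arc::into_inner` (std, Rust 1.70+), which yields the value only while the reference count is exactly one; the `expect` encodes the assumption that the sink is the sole owner. A short sketch of that behavior:

use std::sync::Arc;

fn main() {
    // Sole owner: into_inner succeeds and yields the inner value.
    let sole = Arc::new(5);
    assert_eq!(Arc::into_inner(sole), Some(5));

    // Two owners: into_inner on one handle returns None (and drops that handle)...
    let first = Arc::new(5);
    let second = Arc::clone(&first);
    assert_eq!(Arc::into_inner(first), None);

    // ...after which the remaining handle is the sole owner again.
    assert_eq!(Arc::into_inner(second), Some(5));
}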
