Skip to content

Commit

Permalink
Buffer Pages in ArrowWriter instead of RecordBatch (apache#3871)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed May 25, 2023
1 parent 56437cc commit 780b094
Show file tree
Hide file tree
Showing 7 changed files with 450 additions and 402 deletions.
57 changes: 5 additions & 52 deletions parquet/src/arrow/arrow_writer/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,21 @@
// specific language governing permissions and limitations
// under the License.

use crate::arrow::arrow_writer::levels::LevelInfo;
use crate::basic::Encoding;
use crate::bloom_filter::Sbbf;
use crate::column::page::PageWriter;
use crate::column::writer::encoder::{
ColumnValueEncoder, DataPageValues, DictionaryPage,
};
use crate::column::writer::GenericColumnWriter;
use crate::data_type::{AsBytes, ByteArray, Int32Type};
use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
use crate::encodings::rle::RleEncoder;
use crate::errors::{ParquetError, Result};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr, WriterVersion};
use crate::file::writer::OnCloseColumnChunk;
use crate::file::properties::{WriterProperties, WriterVersion};
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
use arrow_array::{
Array, ArrayAccessor, ArrayRef, BinaryArray, DictionaryArray, LargeBinaryArray,
Array, ArrayAccessor, BinaryArray, DictionaryArray, LargeBinaryArray,
LargeStringArray, StringArray,
};
use arrow_schema::DataType;
Expand Down Expand Up @@ -94,49 +90,6 @@ macro_rules! downcast_op {
};
}

/// A writer for byte array types
pub(super) struct ByteArrayWriter<'a> {
writer: GenericColumnWriter<'a, ByteArrayEncoder>,
on_close: Option<OnCloseColumnChunk<'a>>,
}

impl<'a> ByteArrayWriter<'a> {
/// Returns a new [`ByteArrayWriter`]
pub fn new(
descr: ColumnDescPtr,
props: WriterPropertiesPtr,
page_writer: Box<dyn PageWriter + 'a>,
on_close: OnCloseColumnChunk<'a>,
) -> Result<Self> {
Ok(Self {
writer: GenericColumnWriter::new(descr, props, page_writer),
on_close: Some(on_close),
})
}

pub fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()> {
self.writer.write_batch_internal(
array,
Some(levels.non_null_indices()),
levels.def_levels(),
levels.rep_levels(),
None,
None,
None,
)?;
Ok(())
}

pub fn close(self) -> Result<()> {
let r = self.writer.close()?;

if let Some(on_close) = self.on_close {
on_close(r)?;
}
Ok(())
}
}

/// A fallback encoder, i.e. non-dictionary, for [`ByteArray`]
struct FallbackEncoder {
encoder: FallbackEncoderImpl,
Expand Down Expand Up @@ -427,7 +380,7 @@ impl DictEncoder {
}
}

struct ByteArrayEncoder {
pub struct ByteArrayEncoder {
fallback: FallbackEncoder,
dict_encoder: Option<DictEncoder>,
min_value: Option<ByteArray>,
Expand All @@ -437,11 +390,11 @@ struct ByteArrayEncoder {

impl ColumnValueEncoder for ByteArrayEncoder {
type T = ByteArray;
type Values = ArrayRef;
type Values = dyn Array;

fn min_max(
&self,
values: &ArrayRef,
values: &dyn Array,
value_indices: Option<&[usize]>,
) -> Option<(Self::T, Self::T)> {
match value_indices {
Expand Down
Loading

0 comments on commit 780b094

Please sign in to comment.