Skip to content

Commit

Permalink
refactor: refactor arrow conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Sep 19, 2024
1 parent 97a3293 commit 14d50a9
Show file tree
Hide file tree
Showing 31 changed files with 303 additions and 292 deletions.
383 changes: 189 additions & 194 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 0 additions & 3 deletions src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ normal = ["workspace-hack"]

[dependencies]
anyhow = "1"
arrow-array = { workspace = true }
arrow-array-iceberg = { workspace = true }
arrow-schema = { workspace = true }
assert_matches = "1"
async-recursion = "1"
async-trait = "0.1"
Expand Down
12 changes: 4 additions & 8 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,14 @@ normal = ["workspace-hack"]
ahash = "0.8"
anyhow = "1"
arc-swap = "1"
arrow-array = { workspace = true }
arrow-52-array = { workspace = true }
arrow-52-buffer = { workspace = true }
arrow-52-cast = { workspace = true }
arrow-52-schema = { workspace = true }
arrow-array-deltalake = { workspace = true }
arrow-array-iceberg = { workspace = true }
arrow-buffer = { workspace = true }
arrow-buffer-deltalake = { workspace = true }
arrow-buffer-iceberg = { workspace = true }
arrow-cast = { workspace = true }
arrow-cast-deltalake = { workspace = true }
arrow-cast-iceberg = { workspace = true }
arrow-schema = { workspace = true }
arrow-schema-deltalake = { workspace = true }
arrow-schema-iceberg = { workspace = true }
async-trait = "0.1"
auto_enums = { workspace = true }
auto_impl = "1"
Expand Down
47 changes: 47 additions & 0 deletions src/common/src/array/arrow/arrow_52.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#[path = "./arrow_impl.rs"]
mod arrow_impl;
use arrow_buffer::IntervalMonthDayNano as ArrowIntervalType;
pub use arrow_impl::{FromArrow, ToArrow};
use {
arrow_52_array as arrow_array, arrow_52_buffer as arrow_buffer, arrow_52_cast as arrow_cast,
arrow_52_schema as arrow_schema,
};

use crate::array::Interval;

/// Conversions between the arrow `IntervalMonthDayNano` value and [`Interval`].
///
/// Both directions read/write the `months`, `days` and `nanoseconds` fields
/// directly rather than going through
/// `arrow_array::types::IntervalMonthDayNanoType::{to_parts, make_value}`.
/// XXX: the arrow-rs encoding/decoding done by those helpers is incorrect.
impl super::ArrowIntervalTypeTrait for ArrowIntervalType {
    /// Builds an [`Interval`] from the arrow value, dividing the nanosecond
    /// component by 1000 (integer division) to obtain microseconds.
    fn to_interval(self) -> Interval {
        let Self {
            months,
            days,
            nanoseconds,
        } = self;
        let usecs = nanoseconds / 1000;
        Interval::from_month_day_usec(months, days, usecs)
    }

    /// Builds the arrow value from an [`Interval`], multiplying the microsecond
    /// component by 1000 to obtain nanoseconds.
    fn from_interval(value: Interval) -> Self {
        let months = value.months();
        let days = value.days();
        // TODO: this multiplication may overflow and we need `try_into`
        let nanoseconds = value.usecs() * 1000;
        Self {
            months,
            days,
            nanoseconds,
        }
    }
}
2 changes: 1 addition & 1 deletion src/common/src/array/arrow/arrow_deltalake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::Arc;

use arrow_array::ArrayRef;
use num_traits::abs;
use {
pub use {
arrow_array_deltalake as arrow_array, arrow_buffer_deltalake as arrow_buffer,
arrow_cast_deltalake as arrow_cast, arrow_schema_deltalake as arrow_schema,
};
Expand Down
47 changes: 8 additions & 39 deletions src/common/src/array/arrow/arrow_iceberg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,16 @@ use std::collections::HashMap;
use std::ops::{Div, Mul};
use std::sync::Arc;

use arrow_array_iceberg::{self as arrow_array, ArrayRef};
use arrow_buffer_iceberg::IntervalMonthDayNano as ArrowIntervalType;
use arrow_array::ArrayRef;
use num_traits::abs;
use {
arrow_buffer_iceberg as arrow_buffer, arrow_cast_iceberg as arrow_cast,
arrow_schema_iceberg as arrow_schema,
pub use {
arrow_52_array as arrow_array, arrow_52_buffer as arrow_buffer, arrow_52_cast as arrow_cast,
arrow_52_schema as arrow_schema,
};

pub use super::arrow_52::{FromArrow, ToArrow};
use crate::array::{Array, ArrayError, ArrayImpl, DataChunk, DataType, DecimalArray};
use crate::types::{Interval, StructType};

impl ArrowIntervalTypeTrait for ArrowIntervalType {
fn to_interval(self) -> Interval {
// XXX: the arrow-rs decoding is incorrect
// let (months, days, ns) = arrow_array::types::IntervalMonthDayNanoType::to_parts(value);
Interval::from_month_day_usec(self.months, self.days, self.nanoseconds / 1000)
}

fn from_interval(value: Interval) -> Self {
// XXX: the arrow-rs encoding is incorrect
// arrow_array::types::IntervalMonthDayNanoType::make_value(
// self.months(),
// self.days(),
// // TODO: this may overflow and we need `try_into`
// self.usecs() * 1000,
// )
Self {
months: value.months(),
days: value.days(),
nanoseconds: value.usecs() * 1000,
}
}
}

#[path = "./arrow_impl.rs"]
mod arrow_impl;

use arrow_impl::{FromArrow, ToArrow};

use crate::array::arrow::ArrowIntervalTypeTrait;
use crate::types::StructType;

pub struct IcebergArrowConvert;

Expand Down Expand Up @@ -261,10 +231,9 @@ impl ToArrow for IcebergCreateTableArrowConvert {
mod test {
use std::sync::Arc;

use arrow_array_iceberg::{ArrayRef, Decimal128Array};
use arrow_schema_iceberg::DataType;

use super::arrow_array::{ArrayRef, Decimal128Array};
use super::arrow_impl::ToArrow;
use super::arrow_schema::DataType;
use super::IcebergArrowConvert;
use crate::array::{Decimal, DecimalArray};

Expand Down
2 changes: 1 addition & 1 deletion src/common/src/array/arrow/arrow_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@

use std::fmt::Write;

use arrow_array::array;
use arrow_array::cast::AsArray;
use arrow_array_iceberg::array;
use arrow_buffer::OffsetBuffer;
use chrono::{DateTime, NaiveDateTime, NaiveTime};
use itertools::Itertools;
Expand Down
12 changes: 5 additions & 7 deletions src/common/src/array/arrow/arrow_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,14 @@

use std::sync::Arc;

pub use arrow_impl::{FromArrow, ToArrow};
use {arrow_array, arrow_buffer, arrow_cast, arrow_schema};
type ArrowIntervalType = i128;
pub use {
arrow_52_array as arrow_array, arrow_52_buffer as arrow_buffer, arrow_52_cast as arrow_cast,
arrow_52_schema as arrow_schema,
};

pub use super::arrow_52::{FromArrow, ToArrow};
use crate::array::{ArrayError, ArrayImpl, DataType, DecimalArray, JsonbArray};

#[expect(clippy::duplicate_mod)]
#[path = "./arrow_impl.rs"]
mod arrow_impl;

/// Arrow conversion for UDF.
#[derive(Default, Debug)]
pub struct UdfArrowConvert {
Expand Down
17 changes: 16 additions & 1 deletion src/common/src/array/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,29 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod arrow_52;
mod arrow_deltalake;
mod arrow_iceberg;
mod arrow_udf;

pub use arrow_deltalake::DeltaLakeConvert;
pub use arrow_iceberg::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
pub use arrow_udf::{FromArrow, ToArrow, UdfArrowConvert};

pub use reexport::*;
/// Re-exports the arrow crates used by each conversion module under
/// suffixed aliases (`*_deltalake`, `*_iceberg`, `*_udf`).
///
/// Each sibling module publicly exposes `arrow_array`, `arrow_buffer`,
/// `arrow_cast` and `arrow_schema`; the suffix records which conversion
/// module the alias was taken from. The parent module glob-imports
/// everything declared here via `pub use reexport::*`.
mod reexport {
// Arrow crates as exposed by the `arrow_deltalake` conversion module.
pub use super::arrow_deltalake::{
arrow_array as arrow_array_deltalake, arrow_buffer as arrow_buffer_deltalake,
arrow_cast as arrow_cast_deltalake, arrow_schema as arrow_schema_deltalake,
};
// Arrow crates as exposed by the `arrow_iceberg` conversion module.
pub use super::arrow_iceberg::{
arrow_array as arrow_array_iceberg, arrow_buffer as arrow_buffer_iceberg,
arrow_cast as arrow_cast_iceberg, arrow_schema as arrow_schema_iceberg,
};
// Arrow crates as exposed by the `arrow_udf` conversion module.
pub use super::arrow_udf::{
arrow_array as arrow_array_udf, arrow_buffer as arrow_buffer_udf,
arrow_cast as arrow_cast_udf, arrow_schema as arrow_schema_udf,
};
}
use crate::types::Interval;

trait ArrowIntervalTypeTrait {
Expand Down
6 changes: 0 additions & 6 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@ normal = ["workspace-hack"]
[dependencies]
anyhow = "1"
apache-avro = { workspace = true }
arrow-array = { workspace = true }
arrow-array-iceberg = { workspace = true }
arrow-row = { workspace = true }
arrow-schema = { workspace = true }
arrow-schema-iceberg = { workspace = true }
arrow-select = { workspace = true }
assert_matches = "1"
async-compression = { version = "0.4.5", features = ["gzip", "tokio"] }
async-nats = "0.35"
Expand Down
5 changes: 3 additions & 2 deletions src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ def_anyhow_newtype! {
icelake::Error => "Iceberg error",
iceberg::Error => "IcebergV2 error",
redis::RedisError => "Redis error",
arrow_schema::ArrowError => "Arrow error",
arrow_schema_iceberg::ArrowError => "Arrow error",
// currently, the following two are the same type
// risingwave_common::array::arrow::arrow_schema_udf::ArrowError => "Arrow error",
risingwave_common::array::arrow::arrow_schema_iceberg::ArrowError => "Arrow error",
google_cloud_pubsub::client::google_cloud_auth::error::Error => "Google Cloud error",
rumqttc::tokio_rustls::rustls::Error => "TLS error",
rumqttc::v5::ClientError => "MQTT error",
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/parser/parquet_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
use std::future::IntoFuture;
use std::sync::Arc;

use arrow_array_iceberg::RecordBatch;
use deltalake::parquet::arrow::async_reader::AsyncFileReader;
use futures_async_stream::try_stream;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert};
use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk};
use risingwave_common::bail;
use risingwave_common::types::{Datum, ScalarImpl};
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/file_sink/opendal_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use std::marker::PhantomData;
use std::sync::Arc;

use anyhow::anyhow;
use arrow_schema_iceberg::SchemaRef;
use async_trait::async_trait;
use opendal::{FuturesAsyncWriter, Operator, Writer as OpendalWriter};
use parquet::arrow::AsyncArrowWriter;
use parquet::file::properties::WriterProperties;
use risingwave_common::array::arrow::arrow_schema_iceberg::{self, SchemaRef};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ use std::ops::Deref;
use std::sync::Arc;

use anyhow::{anyhow, Context};
use arrow_schema_iceberg::{
DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef,
};
use async_trait::async_trait;
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg::spec::TableMetadata;
Expand All @@ -46,6 +43,9 @@ use icelake::transaction::Transaction;
use icelake::types::{data_file_from_json, data_file_to_json, Any, DataFile};
use icelake::{Table, TableIdentifier};
use itertools::Itertools;
use risingwave_common::array::arrow::arrow_schema_iceberg::{
self, DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef,
};
use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use arrow_array_iceberg::RecordBatch;
use arrow_schema_iceberg::SchemaRef;
use icelake::io_v2::{
BaseFileWriter, BaseFileWriterBuilder, BaseFileWriterMetrics, CurrentFileStatus, FileWriter,
FileWriterBuilder,
};
use icelake::Result;
use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
use risingwave_common::array::arrow::arrow_schema_iceberg::SchemaRef;
use risingwave_common::metrics::LabelGuardedIntGauge;

#[derive(Clone)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use arrow_schema_iceberg::SchemaRef;
use icelake::io_v2::{
FanoutPartitionedWriter, FanoutPartitionedWriterBuilder, FanoutPartitionedWriterMetrics,
IcebergWriter, IcebergWriterBuilder,
};
use icelake::Result;
use risingwave_common::array::arrow::arrow_array_iceberg;
use risingwave_common::array::arrow::arrow_schema_iceberg::SchemaRef;
use risingwave_common::metrics::LabelGuardedIntGauge;

#[derive(Clone)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use icelake::io_v2::{
PositionDeleteMetrics, PositionDeleteWriter, PositionDeleteWriterBuilder,
};
use icelake::Result;
use risingwave_common::array::arrow::arrow_schema_iceberg;
use risingwave_common::metrics::LabelGuardedIntGauge;

#[derive(Clone)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use arrow_array_iceberg::RecordBatch;
use arrow_schema_iceberg::SchemaRef;
use async_trait::async_trait;
use icelake::io_v2::{IcebergWriter, IcebergWriterBuilder};
use icelake::Result;
use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
use risingwave_common::array::arrow::arrow_schema_iceberg::SchemaRef;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};

#[derive(Clone)]
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/pulsar/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::collections::HashMap;

use anyhow::Context;
use arrow_array_iceberg::{Int32Array, Int64Array, RecordBatch};
use async_trait::async_trait;
use futures::StreamExt;
use futures_async_stream::try_stream;
Expand All @@ -27,6 +26,7 @@ use itertools::Itertools;
use pulsar::consumer::InitialPosition;
use pulsar::message::proto::MessageIdData;
use pulsar::{Consumer, ConsumerBuilder, ConsumerOptions, Pulsar, SubType, TokioExecutor};
use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, Int64Array, RecordBatch};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::ROWID_PREFIX;
Expand Down
2 changes: 0 additions & 2 deletions src/expr/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ normal = ["workspace-hack", "ctor"]

[dependencies]
anyhow = "1"
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
async-trait = "0.1"
auto_impl = "1"
await-tree = { workspace = true }
Expand Down
10 changes: 7 additions & 3 deletions src/expr/core/src/aggregate/user_defined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
use std::sync::Arc;

use anyhow::Context;
use arrow_array::ArrayRef;
use arrow_schema::{Field, Fields, Schema, SchemaRef};
use risingwave_common::array::arrow::arrow_array_udf::ArrayRef;
use risingwave_common::array::arrow::arrow_schema_udf::{Field, Fields, Schema, SchemaRef};
use risingwave_common::array::arrow::{FromArrow, ToArrow, UdfArrowConvert};
use risingwave_common::array::Op;
use risingwave_common::bitmap::Bitmap;
Expand Down Expand Up @@ -154,7 +154,11 @@ pub fn new_user_defined(

Ok(Box::new(UserDefinedAggregateFunction {
return_field: arrow_convert.to_arrow_field("", return_type)?,
state_field: Field::new("state", arrow_schema::DataType::Binary, true),
state_field: Field::new(
"state",
risingwave_common::array::arrow::arrow_schema_udf::DataType::Binary,
true,
),
return_type: return_type.clone(),
arg_schema,
runtime,
Expand Down
Loading

0 comments on commit 14d50a9

Please sign in to comment.