Skip to content

Commit

Permalink
feat(data type): Support Semi-structured array, object data type
Browse files Browse the repository at this point in the history
  • Loading branch information
b41sh committed Mar 25, 2022
1 parent 00e90e0 commit 724a5b9
Show file tree
Hide file tree
Showing 16 changed files with 297 additions and 8 deletions.
2 changes: 2 additions & 0 deletions common/datavalues/src/columns/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ where A: AsRef<dyn Array>
Struct => Arc::new(StructColumn::from_arrow_array(self.as_ref())),
String => Arc::new(StringColumn::from_arrow_array(self.as_ref())),
Variant => Arc::new(JsonColumn::from_arrow_array(self.as_ref())),
VariantArray => Arc::new(JsonColumn::from_arrow_array(self.as_ref())),
VariantObject => Arc::new(JsonColumn::from_arrow_array(self.as_ref())),
}
}

Expand Down
4 changes: 4 additions & 0 deletions common/datavalues/src/types/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ pub fn from_arrow_type(dt: &ArrowType) -> DataTypePtr {
}
ArrowType::Extension(custom_name, _, _) => match custom_name.as_str() {
"Variant" => Arc::new(VariantType::default()),
"VariantArray" => Arc::new(VariantArrayType::default()),
"VariantObject" => Arc::new(VariantObjectType::default()),
_ => unimplemented!("data_type: {:?}", dt),
},

Expand Down Expand Up @@ -166,6 +168,8 @@ pub fn from_arrow_field(f: &ArrowField) -> DataTypePtr {
},
"Interval" => return IntervalType::arc(metadata.unwrap().into()),
"Variant" => return VariantType::arc(),
"VariantArray" => return VariantArrayType::arc(),
"VariantObject" => return VariantObjectType::arc(),
_ => {}
}
}
Expand Down
3 changes: 2 additions & 1 deletion common/datavalues/src/types/eq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ pub fn equal(lhs: &dyn DataType, rhs: &dyn DataType) -> bool {
use crate::prelude::TypeID::*;
match lhs.data_type_id() {
Boolean | UInt8 | UInt16 | UInt32 | UInt64 | Int8 | Int16 | Int32 | Int64 | Float32
| Float64 | String | Date16 | Date32 | Interval | DateTime32 | Null | Variant => true,
| Float64 | String | Date16 | Date32 | Interval | DateTime32 | Null | Variant
| VariantArray | VariantObject => true,

DateTime64 => {
let lhs: &DateTime64Type = lhs.as_any().downcast_ref().unwrap();
Expand Down
4 changes: 4 additions & 0 deletions common/datavalues/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub mod type_string;
pub mod type_struct;
pub mod type_traits;
pub mod type_variant;
pub mod type_variant_array;
pub mod type_variant_object;

pub mod eq;
pub mod type_id;
Expand Down Expand Up @@ -61,3 +63,5 @@ pub use type_string::*;
pub use type_struct::*;
pub use type_traits::*;
pub use type_variant::*;
pub use type_variant_array::*;
pub use type_variant_object::*;
2 changes: 2 additions & 0 deletions common/datavalues/src/types/type_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ static TYPE_FACTORY: Lazy<Arc<TypeFactory>> = Lazy::new(|| {
type_factory.register(DateTime32Type::arc(None));
type_factory.register(DateTime64Type::arc(3, None));
type_factory.register(VariantType::arc());
type_factory.register(VariantArrayType::arc());
type_factory.register(VariantObjectType::arc());

type_factory.add_array_wrapper();
type_factory.add_nullable_wrapper();
Expand Down
4 changes: 3 additions & 1 deletion common/datavalues/src/types/type_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ pub enum TypeID {
/// Variant is a tagged universal type, which can store values of any other type,
/// including Object and Array, up to a maximum size of 16 MB.
Variant,
VariantArray,
VariantObject,
}

impl TypeID {
Expand Down Expand Up @@ -216,7 +218,7 @@ impl TypeID {
String => PhysicalTypeID::String,
Array => PhysicalTypeID::Array,
Struct => PhysicalTypeID::Struct,
Variant => PhysicalTypeID::Variant,
Variant | VariantArray | VariantObject => PhysicalTypeID::Variant,
}
}
}
Expand Down
101 changes: 101 additions & 0 deletions common/datavalues/src/types/type_variant_array.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::Arc;

use common_arrow::arrow::datatypes::DataType as ArrowType;
use common_exception::Result;
use serde_json::Value as JsonValue;

use super::data_type::DataType;
use super::data_type::ARROW_EXTENSION_NAME;
use super::type_id::TypeID;
use crate::prelude::*;

/// Field-less marker type whose `DataType` implementation reports
/// `TypeID::VariantArray`.
#[derive(Default, Clone, serde::Deserialize, serde::Serialize)]
pub struct VariantArrayType {}

impl VariantArrayType {
    /// Builds a new instance and returns it as a shared `DataTypePtr`.
    pub fn arc() -> DataTypePtr {
        let this = VariantArrayType {};
        Arc::new(this)
    }
}

/// `DataType` implementation for `VariantArrayType`.
///
/// Columns are built from `serde_json::Value` items; serialization and
/// deserialization are delegated to `VariantSerializer` /
/// `VariantDeserializer`.
#[typetag::serde]
impl DataType for VariantArrayType {
    /// Identifies this type as `TypeID::VariantArray`.
    fn data_type_id(&self) -> TypeID {
        TypeID::VariantArray
    }

    #[inline]
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Name reported for this type (also what its `Debug` output prints).
    fn name(&self) -> &str {
        "Array"
    }

    /// Default value: a string value holding the bytes of `"Null"`.
    fn default_value(&self) -> DataValue {
        DataValue::String(b"Null".to_vec())
    }

    /// Converts `data` into a JSON value and wraps the resulting one-element
    /// column in a `ConstColumn` built with `size`.
    ///
    /// Returns the conversion error if `data` cannot be turned into a JSON
    /// value.
    fn create_constant_column(&self, data: &DataValue, size: usize) -> Result<ColumnRef> {
        let json: JsonValue = DFTryFrom::try_from(data)?;
        let single = Series::from_data(vec![json]);
        let constant = ConstColumn::new(single, size);
        Ok(Arc::new(constant))
    }

    /// Converts every entry of `data` into a JSON value and builds a column
    /// from them, stopping at (and returning) the first conversion error.
    fn create_column(&self, data: &[DataValue]) -> Result<ColumnRef> {
        let mut values: Vec<JsonValue> = Vec::with_capacity(data.len());
        for item in data {
            let json: JsonValue = DFTryFrom::try_from(item)?;
            values.push(json);
        }

        Ok(Series::from_data(values))
    }

    /// Arrow representation: an extension type named `"VariantArray"` over
    /// `LargeBinary`, with no extension metadata.
    fn arrow_type(&self) -> ArrowType {
        let storage = Box::new(ArrowType::LargeBinary);
        ArrowType::Extension(String::from("VariantArray"), storage, None)
    }

    /// Field metadata containing a single entry that maps
    /// `ARROW_EXTENSION_NAME` to `"VariantArray"`.
    fn custom_arrow_meta(&self) -> Option<BTreeMap<String, String>> {
        let mut meta = BTreeMap::new();
        let key = ARROW_EXTENSION_NAME.to_string();
        let type_name = String::from("VariantArray");
        meta.insert(key, type_name);
        Some(meta)
    }

    /// Returns a boxed `VariantSerializer`.
    fn create_serializer(&self) -> Box<dyn TypeSerializer> {
        let serializer = VariantSerializer {};
        Box::new(serializer)
    }

    /// Returns a boxed `VariantDeserializer` created with `capacity`.
    fn create_deserializer(&self, capacity: usize) -> Box<dyn TypeDeserializer> {
        let deserializer = VariantDeserializer::with_capacity(capacity);
        Box::new(deserializer)
    }

    /// Returns a boxed mutable JSON object column created with `capacity`.
    fn create_mutable(&self, capacity: usize) -> Box<dyn MutableColumn> {
        let builder = MutableObjectColumn::<JsonValue>::with_capacity(capacity);
        Box::new(builder)
    }
}

/// Debug output is just the type's `name()`.
impl std::fmt::Debug for VariantArrayType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.name())
    }
}
104 changes: 104 additions & 0 deletions common/datavalues/src/types/type_variant_object.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::Arc;

use common_arrow::arrow::datatypes::DataType as ArrowType;
use common_exception::Result;
use serde_json::Value as JsonValue;

use super::data_type::DataType;
use super::data_type::ARROW_EXTENSION_NAME;
use super::type_id::TypeID;
use crate::prelude::*;

/// Field-less marker type whose `DataType` implementation reports
/// `TypeID::VariantObject`.
#[derive(Default, Clone, serde::Deserialize, serde::Serialize)]
pub struct VariantObjectType {}

impl VariantObjectType {
    /// Builds a new instance and returns it as a shared `DataTypePtr`.
    pub fn arc() -> DataTypePtr {
        let this = VariantObjectType {};
        Arc::new(this)
    }
}

/// `DataType` implementation for `VariantObjectType`.
///
/// Columns are built from `serde_json::Value` items; serialization and
/// deserialization are delegated to `VariantSerializer` /
/// `VariantDeserializer`.
#[typetag::serde]
impl DataType for VariantObjectType {
    /// Identifies this type as `TypeID::VariantObject`.
    fn data_type_id(&self) -> TypeID {
        TypeID::VariantObject
    }

    #[inline]
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Name reported for this type (also what its `Debug` output prints).
    fn name(&self) -> &str {
        "Object"
    }

    /// Default value: a string value holding the bytes of `"Null"`.
    fn default_value(&self) -> DataValue {
        DataValue::String(b"Null".to_vec())
    }

    /// Converts `data` into a JSON value and wraps the resulting one-element
    /// column in a `ConstColumn` built with `size`.
    ///
    /// Returns the conversion error if `data` cannot be turned into a JSON
    /// value.
    fn create_constant_column(&self, data: &DataValue, size: usize) -> Result<ColumnRef> {
        let json: JsonValue = DFTryFrom::try_from(data)?;
        let single = Series::from_data(vec![json]);
        let constant = ConstColumn::new(single, size);
        Ok(Arc::new(constant))
    }

    /// Converts every entry of `data` into a JSON value and builds a column
    /// from them, stopping at (and returning) the first conversion error.
    fn create_column(&self, data: &[DataValue]) -> Result<ColumnRef> {
        let mut values: Vec<JsonValue> = Vec::with_capacity(data.len());
        for item in data {
            let json: JsonValue = DFTryFrom::try_from(item)?;
            values.push(json);
        }

        Ok(Series::from_data(values))
    }

    /// Arrow representation: an extension type named `"VariantObject"` over
    /// `LargeBinary`, with no extension metadata.
    fn arrow_type(&self) -> ArrowType {
        let storage = Box::new(ArrowType::LargeBinary);
        ArrowType::Extension(String::from("VariantObject"), storage, None)
    }

    /// Field metadata containing a single entry that maps
    /// `ARROW_EXTENSION_NAME` to `"VariantObject"`.
    fn custom_arrow_meta(&self) -> Option<BTreeMap<String, String>> {
        let mut meta = BTreeMap::new();
        let key = ARROW_EXTENSION_NAME.to_string();
        let type_name = String::from("VariantObject");
        meta.insert(key, type_name);
        Some(meta)
    }

    /// Returns a boxed `VariantSerializer`.
    fn create_serializer(&self) -> Box<dyn TypeSerializer> {
        let serializer = VariantSerializer {};
        Box::new(serializer)
    }

    /// Returns a boxed `VariantDeserializer` created with `capacity`.
    fn create_deserializer(&self, capacity: usize) -> Box<dyn TypeDeserializer> {
        let deserializer = VariantDeserializer::with_capacity(capacity);
        Box::new(deserializer)
    }

    /// Returns a boxed mutable JSON object column created with `capacity`.
    fn create_mutable(&self, capacity: usize) -> Box<dyn MutableColumn> {
        let builder = MutableObjectColumn::<JsonValue>::with_capacity(capacity);
        Box::new(builder)
    }
}

/// Debug output is just the type's `name()`.
impl std::fmt::Debug for VariantObjectType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.name())
    }
}
14 changes: 14 additions & 0 deletions common/datavalues/tests/it/types/create_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,20 @@ fn test_create_constant() -> Result<()> {
size: 2,
column_expected: Series::from_data(vec![json!("hello"), json!("hello")]),
},
Test {
name: "variant_array",
data_type: VariantArrayType::arc(),
value: DataValue::Json(json!([1, 2, 3])),
size: 2,
column_expected: Series::from_data(vec![json!([1, 2, 3]), json!([1, 2, 3])]),
},
Test {
name: "variant_object",
data_type: VariantObjectType::arc(),
value: DataValue::Json(json!({"a":1,"b":2})),
size: 2,
column_expected: Series::from_data(vec![json!({"a":1,"b":2}), json!({"a":1,"b":2})]),
},
];

for test in tests {
Expand Down
30 changes: 28 additions & 2 deletions common/functions/src/scalars/expressions/cast_with_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,31 @@ pub fn cast_with_type(
pub fn cast_to_variant(
column: &ColumnRef,
from_type: &DataTypePtr,
data_type: &DataTypePtr,
) -> Result<(ColumnRef, Option<Bitmap>)> {
if from_type.data_type_id() == TypeID::Variant {
if data_type.data_type_id() == TypeID::VariantArray {
let col: &JsonColumn = Series::check_get(column)?;
for v in col.iter() {
if !v.is_array() {
return Err(ErrorCode::BadDataValueType(format!(
"Failed to cast variant value {} to ARRAY",
v
)));
}
}
}
if data_type.data_type_id() == TypeID::VariantObject {
let col: &JsonColumn = Series::check_get(column)?;
for v in col.iter() {
if !v.is_object() {
return Err(ErrorCode::BadDataValueType(format!(
"Failed to cast variant value {} to OBJECT",
v
)));
}
}
}
return Ok((column.clone(), None));
}

Expand Down Expand Up @@ -225,8 +248,11 @@ pub fn arrow_cast_compute(
data_type: &DataTypePtr,
cast_options: &CastOptions,
) -> Result<(ColumnRef, Option<Bitmap>)> {
if data_type.data_type_id() == TypeID::Variant {
return cast_to_variant(column, from_type);
if data_type.data_type_id() == TypeID::Variant
|| data_type.data_type_id() == TypeID::VariantArray
|| data_type.data_type_id() == TypeID::VariantObject
{
return cast_to_variant(column, from_type, data_type);
}

let arrow_array = column.as_arrow_array();
Expand Down
10 changes: 8 additions & 2 deletions common/functions/src/scalars/semi_structureds/parse_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ impl<const SUPPRESS_PARSE_ERROR: bool> Function for ParseJsonFunctionImpl<SUPPRE
if column.data_type_id() == TypeID::Null {
return NullType::arc().create_constant_column(&DataValue::Null, input_rows);
}
if column.data_type_id() == TypeID::Variant {
if column.data_type_id() == TypeID::Variant
|| column.data_type_id() == TypeID::VariantArray
|| column.data_type_id() == TypeID::VariantObject
{
return Ok(column.arc());
}

Expand Down Expand Up @@ -133,7 +136,10 @@ impl<const SUPPRESS_PARSE_ERROR: bool> Function for ParseJsonFunctionImpl<SUPPRE
if column.data_type_id() == TypeID::Null {
return NullType::arc().create_constant_column(&DataValue::Null, input_rows);
}
if column.data_type_id() == TypeID::Variant {
if column.data_type_id() == TypeID::Variant
|| column.data_type_id() == TypeID::VariantArray
|| column.data_type_id() == TypeID::VariantObject
{
return Ok(column.arc());
}

Expand Down
Loading

0 comments on commit 724a5b9

Please sign in to comment.