Skip to content

Commit

Permalink
Support Semi-structured Variant data type
Browse files Browse the repository at this point in the history
  • Loading branch information
b41sh committed Mar 16, 2022
1 parent 76251a1 commit 4e86354
Show file tree
Hide file tree
Showing 43 changed files with 1,361 additions and 81 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions common/datablocks/src/data_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ impl DataBlock {
.iter()
.zip(schema.fields().iter())
.map(|(col, f)| match f.is_nullable() {
true => col.into_nullable_column(),
false => col.into_column(),
true => col.into_nullable_column_with_field(&f.to_arrow()),
false => col.into_column_with_field(&f.to_arrow()),
})
.collect();

Expand Down
62 changes: 47 additions & 15 deletions common/datavalues/src/columns/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use common_arrow::arrow::array::Array;
use common_arrow::arrow::array::ArrayRef;
use common_arrow::arrow::bitmap::Bitmap;
use common_arrow::arrow::bitmap::MutableBitmap;
use common_arrow::arrow::datatypes::Field as ArrowField;
use common_exception::ErrorCode;
use common_exception::Result;

Expand Down Expand Up @@ -150,14 +151,58 @@ pub trait Column: Send + Sync {
pub trait IntoColumn {
fn into_column(self) -> ColumnRef;
fn into_nullable_column(self) -> ColumnRef;

fn into_column_with_field(self, f: &ArrowField) -> ColumnRef;
fn into_nullable_column_with_field(self, f: &ArrowField) -> ColumnRef;

fn build_column_by_data_type(self, data_type: DataTypePtr) -> ColumnRef;
}

impl<A> IntoColumn for A
where A: AsRef<dyn Array>
{
fn into_column(self) -> ColumnRef {
use TypeID::*;
let data_type: DataTypePtr = from_arrow_type(self.as_ref().data_type());
self.as_ref().build_column_by_data_type(data_type)
}

fn into_nullable_column(self) -> ColumnRef {
let size = self.as_ref().len();
let validity = self.as_ref().validity().cloned();
let column = self.as_ref().into_column();
Arc::new(NullableColumn::new(
column,
validity.unwrap_or_else(|| {
let mut bm = MutableBitmap::with_capacity(size);
bm.extend_constant(size, true);
Bitmap::from(bm)
}),
))
}

fn into_column_with_field(self, f: &ArrowField) -> ColumnRef {
match from_arrow_field_meta(f) {
Some(data_type) => self.as_ref().build_column_by_data_type(data_type),
None => self.as_ref().into_column(),
}
}

fn into_nullable_column_with_field(self, f: &ArrowField) -> ColumnRef {
let size = self.as_ref().len();
let validity = self.as_ref().validity().cloned();
let column = self.as_ref().into_column_with_field(f);
Arc::new(NullableColumn::new(
column,
validity.unwrap_or_else(|| {
let mut bm = MutableBitmap::with_capacity(size);
bm.extend_constant(size, true);
Bitmap::from(bm)
}),
))
}

fn build_column_by_data_type(self, data_type: DataTypePtr) -> ColumnRef {
use TypeID::*;
match data_type.data_type_id() {
// arrow type has no nullable type
Nullable => unimplemented!(),
Expand All @@ -176,22 +221,9 @@ where A: AsRef<dyn Array>
Array => Arc::new(ArrayColumn::from_arrow_array(self.as_ref())),
Struct => Arc::new(StructColumn::from_arrow_array(self.as_ref())),
String => Arc::new(StringColumn::from_arrow_array(self.as_ref())),
Variant => Arc::new(JsonColumn::from_arrow_array(self.as_ref())),
}
}

fn into_nullable_column(self) -> ColumnRef {
let size = self.as_ref().len();
let validity = self.as_ref().validity().cloned();
let column = self.as_ref().into_column();
Arc::new(NullableColumn::new(
column,
validity.unwrap_or_else(|| {
let mut bm = MutableBitmap::with_capacity(size);
bm.extend_constant(size, true);
Bitmap::from(bm)
}),
))
}
}

pub fn display_helper<T: std::fmt::Display, I: IntoIterator<Item = T>>(iter: I) -> Vec<String> {
Expand Down
5 changes: 5 additions & 0 deletions common/datavalues/src/columns/eq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,12 @@ pub fn equal(lhs: &dyn Column, rhs: &dyn Column) -> bool {

lhs.values() == rhs.values()
}
Variant => {
let lhs: &JsonColumn = lhs.as_any().downcast_ref().unwrap();
let rhs: &JsonColumn = rhs.as_any().downcast_ref().unwrap();

lhs.values() == rhs.values()
}
other => with_match_physical_primitive_type_error!(other, |$T| {
let lhs: &PrimitiveColumn<$T> = lhs.as_any().downcast_ref().unwrap();
let rhs: &PrimitiveColumn<$T> = rhs.as_any().downcast_ref().unwrap();
Expand Down
3 changes: 3 additions & 0 deletions common/datavalues/src/columns/group_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,6 @@ impl GroupHash for StringColumn {
Ok(())
}
}

// TODO(b41sh): implement GroupHash for JsonColumn
impl GroupHash for JsonColumn {}
2 changes: 2 additions & 0 deletions common/datavalues/src/columns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod group_hash;
mod mutable;
mod null;
mod nullable;
mod object;
mod primitive;
pub mod series;
mod string;
Expand All @@ -42,6 +43,7 @@ pub use group_hash::GroupHash;
pub use mutable::*;
pub use null::*;
pub use nullable::*;
pub use object::*;
pub use primitive::*;
pub use series::*;
pub use string::*;
Expand Down
68 changes: 68 additions & 0 deletions common/datavalues/src/columns/object/iterator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::iter::TrustedLen;

use crate::prelude::*;

#[derive(Debug, Clone)]
pub struct ObjectValueIter<'a, T: ObjectType> {
column: &'a ObjectColumn<T>,
index: usize,
}

impl<'a, T: ObjectType> ObjectValueIter<'a, T> {
/// Creates a new [`ObjectValueIter`]
pub fn new(column: &'a ObjectColumn<T>) -> Self {
Self { column, index: 0 }
}
}

impl<'a, T> Iterator for ObjectValueIter<'a, T>
where T: Scalar + ObjectType
{
type Item = T::RefType<'a>;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
let index = self.index;
if self.index >= self.column.len() {
return None;
} else {
self.index += 1;
}
self.column.values.get(index).map(|c| c.as_scalar_ref())
}

fn size_hint(&self) -> (usize, Option<usize>) {
(
self.column.len() - self.index,
Some(self.column.len() - self.index),
)
}
}

impl<'a, T: ObjectType> ExactSizeIterator for ObjectValueIter<'a, T> {
fn len(&self) -> usize {
self.column.len() - self.index
}
}

unsafe impl<T: ObjectType> TrustedLen for ObjectValueIter<'_, T> {}

impl<'a, T: ObjectType> ObjectColumn<T> {
pub fn iter(&'a self) -> ObjectValueIter<'a, T> {
ObjectValueIter::new(self)
}
}
Loading

0 comments on commit 4e86354

Please sign in to comment.