Skip to content

Commit

Permalink
DX-64328 Array types for Gandiva (#58)
Browse files Browse the repository at this point in the history
Add List input and output types for Gandiva functions. Add new reference implementations for array_contains and array_remove, tested via integration with Dremio. int32, int64, double and float list types have been tested.

Support List types in function specification and llvm code generation.
Pass back function type information through the expression registry.
See 1p here: https://docs.google.com/document/d/1exwXdUUnk5FqZLzVZyTdhqgwxTk0u9bL54aLVNM5Tas/edit
  • Loading branch information
lriggs authored and DenisTarasyuk committed Dec 19, 2023
1 parent 11fe7af commit 801e8a6
Show file tree
Hide file tree
Showing 42 changed files with 2,285 additions and 127 deletions.
21 changes: 19 additions & 2 deletions cpp/src/arrow/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -508,10 +508,27 @@ class ARROW_EXPORT ResizableBuffer : public MutableBuffer {
return Reserve(sizeof(T) * new_nb_elements);
}

public:
uint8_t* offsetBuffer;
int64_t offsetCapacity;
uint8_t* validityBuffer;
uint8_t* outerValidityBuffer;

protected:
ResizableBuffer(uint8_t* data, int64_t size) : MutableBuffer(data, size) {}
ResizableBuffer(uint8_t* data, int64_t size) : MutableBuffer(data, size) {
offsetBuffer = nullptr;
offsetCapacity = 0;
validityBuffer = nullptr;
outerValidityBuffer = nullptr;

}
ResizableBuffer(uint8_t* data, int64_t size, std::shared_ptr<MemoryManager> mm)
: MutableBuffer(data, size, std::move(mm)) {}
: MutableBuffer(data, size, std::move(mm)) {
offsetBuffer = nullptr;
offsetCapacity = 0;
validityBuffer = nullptr;
outerValidityBuffer = nullptr;
}
};

/// \defgroup buffer-allocation-functions Functions for allocating buffers
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/gandiva/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ set(SRC_FILES
expression_registry.cc
exported_funcs_registry.cc
filter.cc
array_ops.cc
function_ir_builder.cc
function_registry.cc
function_registry_arithmetic.cc
function_registry_array.cc
function_registry_datetime.cc
function_registry_hash.cc
function_registry_math_ops.cc
Expand Down Expand Up @@ -247,7 +249,8 @@ add_gandiva_test(internals-test
random_generator_holder_test.cc
hash_utils_test.cc
gdv_function_stubs_test.cc
interval_holder_test.cc)
interval_holder_test.cc
array_ops_test.cc)

add_subdirectory(precompiled)
add_subdirectory(tests)
86 changes: 78 additions & 8 deletions cpp/src/gandiva/annotator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,27 @@ FieldDescriptorPtr Annotator::MakeDesc(FieldPtr field, bool is_output) {
int data_idx = buffer_count_++;
int validity_idx = buffer_count_++;
int offsets_idx = FieldDescriptor::kInvalidIdx;
int child_offsets_idx = FieldDescriptor::kInvalidIdx;
if (arrow::is_binary_like(field->type()->id())) {
offsets_idx = buffer_count_++;
}

if (field->type()->id() == arrow::Type::LIST) {
offsets_idx = buffer_count_++;
if (arrow::is_binary_like(field->type()->field(0)->type()->id())) {
child_offsets_idx = buffer_count_++;
}
}
int data_buffer_ptr_idx = FieldDescriptor::kInvalidIdx;
if (is_output) {
data_buffer_ptr_idx = buffer_count_++;
}
int child_valid_buffer_ptr_idx = FieldDescriptor::kInvalidIdx;
if (field->type()->id() == arrow::Type::LIST) {
child_valid_buffer_ptr_idx = buffer_count_++;
}
return std::make_shared<FieldDescriptor>(field, data_idx, validity_idx, offsets_idx,
data_buffer_ptr_idx);
data_buffer_ptr_idx, child_offsets_idx, child_valid_buffer_ptr_idx);
}

int Annotator::AddHolderPointer(void* holder) {
Expand All @@ -80,17 +92,76 @@ void Annotator::PrepareBuffersForField(const FieldDescriptor& desc,
if (desc.HasOffsetsIdx()) {
uint8_t* offsets_buf = const_cast<uint8_t*>(array_data.buffers[buffer_idx]->data());
eval_batch->SetBuffer(desc.offsets_idx(), offsets_buf, array_data.offset);
++buffer_idx;

if (desc.HasChildOffsetsIdx()) {
if (is_output) {
// if list field is output field, we should put buffer pointer into eval batch
// for resizing
uint8_t* child_offsets_buf = reinterpret_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[buffer_idx].get());
eval_batch->SetBuffer(desc.child_data_offsets_idx(), child_offsets_buf,
array_data.child_data.at(0)->offset);

uint8_t* child_valid_buf = reinterpret_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[0].get());
eval_batch->SetBuffer(desc.child_data_validity_idx(), child_valid_buf,
array_data.child_data.at(0)->offset);

} else {
// if list field is input field, just put buffer data into eval batch
uint8_t* child_offsets_buf = const_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[buffer_idx]->data());
eval_batch->SetBuffer(desc.child_data_offsets_idx(), child_offsets_buf,
array_data.child_data.at(0)->offset);

uint8_t* child_valid_buf = const_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[0]->data());
eval_batch->SetBuffer(desc.child_data_offsets_idx(), child_valid_buf,
array_data.child_data.at(0)->offset);
}
}
if (array_data.type->id() != arrow::Type::LIST ||
arrow::is_binary_like(array_data.type->field(0)->type()->id())) {
// primitive type list data buffer index is 1
// binary like type list data buffer index is 2
++buffer_idx;
}
}

int const childDataIndex = 0;
if (array_data.type->id() != arrow::Type::LIST) {
uint8_t* data_buf = const_cast<uint8_t*>(array_data.buffers[buffer_idx]->data());
eval_batch->SetBuffer(desc.data_idx(), data_buf, array_data.offset);
} else {
uint8_t* data_buf =
const_cast<uint8_t*>(array_data.child_data.at(childDataIndex)->buffers[buffer_idx]->data());
eval_batch->SetBuffer(desc.data_idx(), data_buf, array_data.child_data.at(0)->offset);

int const childDataBufferIndex = 0;
if (array_data.child_data.at(childDataIndex)->buffers[childDataBufferIndex] ) {
uint8_t* child_valid_buf = const_cast<uint8_t*>(
array_data.child_data.at(childDataIndex)->buffers[childDataBufferIndex]->data());
eval_batch->SetBuffer(desc.child_data_validity_idx(), child_valid_buf, 0);
}

}

uint8_t* data_buf = const_cast<uint8_t*>(array_data.buffers[buffer_idx]->data());
eval_batch->SetBuffer(desc.data_idx(), data_buf, array_data.offset);
if (is_output) {
// pass in the Buffer object for output data buffers. Can be used for resizing.
uint8_t* data_buf_ptr =
reinterpret_cast<uint8_t*>(array_data.buffers[buffer_idx].get());
eval_batch->SetBuffer(desc.data_buffer_ptr_idx(), data_buf_ptr, array_data.offset);

if (array_data.type->id() != arrow::Type::LIST) {
uint8_t* data_buf_ptr =
reinterpret_cast<uint8_t*>(array_data.buffers[buffer_idx].get());
eval_batch->SetBuffer(desc.data_buffer_ptr_idx(), data_buf_ptr, array_data.offset);
} else {
// list data buffer is in child data buffer
uint8_t* data_buf_ptr = reinterpret_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[buffer_idx].get());
eval_batch->SetBuffer(desc.data_buffer_ptr_idx(), data_buf_ptr,
array_data.child_data.at(0)->offset);
}
}

}

EvalBatchPtr Annotator::PrepareEvalBatch(const arrow::RecordBatch& record_batch,
Expand All @@ -106,7 +177,6 @@ EvalBatchPtr Annotator::PrepareEvalBatch(const arrow::RecordBatch& record_batch,
// skip columns not involved in the expression.
continue;
}

PrepareBuffersForField(*(found->second), *(record_batch.column_data(i)),
eval_batch.get(), false /*is_output*/);
}
Expand Down
Loading

0 comments on commit 801e8a6

Please sign in to comment.