PARQUET-947: Account for Arrow library consolidation in ARROW-795, API changes in ARROW-782

Author: Wes McKinney <[email protected]>

Closes apache#292 from wesm/PARQUET-947 and squashes the following commits:

2d68d5b [Wes McKinney] Fix typo
35feebc [Wes McKinney] Update to Arrow HEAD
7fa2b1b [Wes McKinney] Account for API changes in ARROW-782
8d6c50d [Wes McKinney] Update Arrow version
7b2016f [Wes McKinney] Remove arrow_io library after ARROW-795

Change-Id: I69ae35e9b0995684ca01f0fd6fea473ecca5420a
wesm committed Apr 10, 2017
1 parent 0207dcf commit 06db161
Showing 5 changed files with 30 additions and 33 deletions.
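
The ARROW-782 part of this change is mechanical: arrow::Field and arrow::DataType moved from public data members to accessor methods, so field->nullable, field->type and type->type become field->nullable(), field->type() and type->id(). A minimal sketch of the new calling convention (assuming an Arrow C++ build with the post-ARROW-782 API; this snippet is illustrative and not taken from the parquet-cpp sources):

#include <iostream>
#include <memory>

#include "arrow/api.h"  // arrow::field, arrow::int32, arrow::Type

int main() {
  // Post-ARROW-782 style: Field exposes accessor methods instead of members.
  std::shared_ptr<arrow::Field> f =
      arrow::field("x", arrow::int32(), /*nullable=*/true);

  // Previously: f->nullable and f->type->type
  bool nullable = f->nullable();
  arrow::Type::type id = f->type()->id();

  std::cout << f->name() << " nullable=" << nullable
            << " id=" << static_cast<int>(id) << std::endl;
  return 0;
}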
3 changes: 0 additions & 3 deletions cpp/src/parquet/arrow/CMakeLists.txt
@@ -31,7 +31,6 @@ add_library(parquet_arrow_objlib OBJECT
 # Add dependencies so ExternalProjects are built beforehand
 add_dependencies(parquet_arrow_objlib
   arrow_static
-  arrow_io_static
   parquet_static)
 
 # SET_TARGET_PROPERTIES(parquet_arrow PROPERTIES LINKER_LANGUAGE CXX)
@@ -47,7 +46,6 @@ if (PARQUET_BUILD_SHARED)
     SOVERSION "${PARQUET_SO_VERSION}")
   target_link_libraries(parquet_arrow_shared
     arrow
-    arrow_io
     parquet_shared)
   if (PARQUET_RPATH_ORIGIN)
     if (APPLE)
@@ -77,7 +75,6 @@ if (PARQUET_BUILD_STATIC)
     OUTPUT_NAME "parquet_arrow")
   target_link_libraries(parquet_arrow_static
     arrow_static
-    arrow_io_static
     parquet_static)
   install(TARGETS parquet_arrow_static
     ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
2 changes: 1 addition & 1 deletion cpp/src/parquet/arrow/parquet-arrow.pc.in
@@ -24,4 +24,4 @@ Description: Apache Parquet Apache arrow adapter provides Arrow IPC modules for
 Version: @PARQUET_VERSION@
 Libs: -L${libdir} -lparquet_arrow
 Cflags: -I${includedir}
-Requires: parquet arrow-io
+Requires: parquet arrow
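
ARROW-795 folded the former arrow_io library into the main arrow library, which is why the CMake targets above drop arrow_io/arrow_io_static and the pkg-config Requires line changes from "parquet arrow-io" to "parquet arrow". A hedged sketch of what that means for a downstream consumer; the file name and compile command are illustrative, not from this repository:

// After ARROW-795 the former libarrow_io is part of libarrow, so a program
// that uses both Arrow types and Arrow I/O links a single library, e.g.
// (hypothetical command line):
//   g++ example.cc -std=c++11 -lparquet_arrow -lparquet -larrow
#include <cstdint>
#include <memory>

#include "arrow/api.h"        // core types and buffers
#include "arrow/io/memory.h"  // I/O classes now ship in the same library

int main() {
  static const uint8_t data[] = {1, 2, 3, 4};
  auto buffer = std::make_shared<arrow::Buffer>(data, sizeof(data));

  // BufferReader implements the random-access-file interface used by the
  // Parquet Arrow reader; no separate arrow_io library is required.
  arrow::io::BufferReader reader(buffer);
  return 0;
}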
28 changes: 14 additions & 14 deletions cpp/src/parquet/arrow/reader.cc
@@ -724,23 +724,23 @@ Status ColumnReader::Impl::WrapIntoListArray(const int16_t* def_levels,
   std::vector<bool> nullable;
   std::vector<std::shared_ptr<::arrow::Int32Builder>> offset_builders;
   std::vector<std::shared_ptr<::arrow::BooleanBuilder>> valid_bits_builders;
-  nullable.push_back(current_field->nullable);
-  while (current_field->type->num_children() > 0) {
-    if (current_field->type->num_children() > 1) {
+  nullable.push_back(current_field->nullable());
+  while (current_field->type()->num_children() > 0) {
+    if (current_field->type()->num_children() > 1) {
       return Status::NotImplemented(
           "Fields with more than one child are not supported.");
     } else {
-      if (current_field->type->type != ::arrow::Type::LIST) {
+      if (current_field->type()->id() != ::arrow::Type::LIST) {
         return Status::NotImplemented(
             "Currently only nesting with Lists is supported.");
       }
-      current_field = current_field->type->child(0);
+      current_field = current_field->type()->child(0);
     }
     offset_builders.emplace_back(
         std::make_shared<::arrow::Int32Builder>(pool_, ::arrow::int32()));
     valid_bits_builders.emplace_back(
         std::make_shared<::arrow::BooleanBuilder>(pool_, ::arrow::boolean()));
-    nullable.push_back(current_field->nullable);
+    nullable.push_back(current_field->nullable());
   }
 
   int64_t list_depth = offset_builders.size();
@@ -860,12 +860,12 @@ Status ColumnReader::Impl::TypedReadBatch(int batch_size, std::shared_ptr<Array>
         ::arrow::BitUtil::CeilByte(valid_bits_idx_) / 8, false));
     }
     *out = std::make_shared<ArrayType<ArrowType>>(
-        field_->type, valid_bits_idx_, data_buffer_, valid_bits_buffer_, null_count_);
+        field_->type(), valid_bits_idx_, data_buffer_, valid_bits_buffer_, null_count_);
     // Relase the ownership as the Buffer is now part of a new Array
     valid_bits_buffer_.reset();
   } else {
     *out = std::make_shared<ArrayType<ArrowType>>(
-        field_->type, valid_bits_idx_, data_buffer_);
+        field_->type(), valid_bits_idx_, data_buffer_);
   }
   // Relase the ownership as the Buffer is now part of a new Array
   data_buffer_.reset();
@@ -934,12 +934,12 @@ Status ColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
       valid_bits_buffer_ = valid_bits_buffer;
     }
     *out = std::make_shared<BooleanArray>(
-        field_->type, valid_bits_idx_, data_buffer_, valid_bits_buffer_, null_count_);
+        field_->type(), valid_bits_idx_, data_buffer_, valid_bits_buffer_, null_count_);
     // Relase the ownership
     data_buffer_.reset();
     valid_bits_buffer_.reset();
   } else {
-    *out = std::make_shared<BooleanArray>(field_->type, valid_bits_idx_, data_buffer_);
+    *out = std::make_shared<BooleanArray>(field_->type(), valid_bits_idx_, data_buffer_);
     data_buffer_.reset();
   }
 
@@ -1028,7 +1028,7 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out
     return Status::OK();
   }
 
-  switch (field_->type->type) {
+  switch (field_->type()->id()) {
     TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
     TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
     TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
@@ -1045,8 +1045,8 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out
     TYPED_BATCH_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
     case ::arrow::Type::TIMESTAMP: {
       ::arrow::TimestampType* timestamp_type =
-          static_cast<::arrow::TimestampType*>(field_->type.get());
-      switch (timestamp_type->unit) {
+          static_cast<::arrow::TimestampType*>(field_->type().get());
+      switch (timestamp_type->unit()) {
         case ::arrow::TimeUnit::MILLI:
           return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out);
           break;
@@ -1060,7 +1060,7 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out
     }
     default:
       std::stringstream ss;
-      ss << "No support for reading columns of type " << field_->type->ToString();
+      ss << "No support for reading columns of type " << field_->type()->ToString();
       return Status::NotImplemented(ss.str());
   }
 }
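
The reader.cc changes all follow the same pattern: dispatch on field_->type()->id() instead of field_->type->type, and read the timestamp unit through TimestampType::unit(). A small illustrative helper in the same style (DescribeField is a hypothetical name, not part of parquet-cpp):

#include <memory>
#include <string>

#include "arrow/api.h"

// Mirrors how NextBatch() dispatches on the Arrow type id after the API
// change; illustrative only.
std::string DescribeField(const std::shared_ptr<arrow::Field>& field) {
  switch (field->type()->id()) {  // was: field->type->type
    case arrow::Type::BOOL:
      return "boolean column";
    case arrow::Type::TIMESTAMP: {
      auto ts_type =
          std::static_pointer_cast<arrow::TimestampType>(field->type());
      return ts_type->unit() == arrow::TimeUnit::MILLI  // was: ts_type->unit
                 ? "millisecond timestamps"
                 : "timestamps in an unsupported unit";
    }
    default:
      return "unhandled type: " + field->type()->ToString();
  }
}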
18 changes: 9 additions & 9 deletions cpp/src/parquet/arrow/schema.cc
@@ -327,10 +327,10 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
   LogicalType::type logical_type = LogicalType::NONE;
   ParquetType::type type;
   Repetition::type repetition =
-      field->nullable ? Repetition::OPTIONAL : Repetition::REQUIRED;
+      field->nullable() ? Repetition::OPTIONAL : Repetition::REQUIRED;
   int length = -1;
 
-  switch (field->type->type) {
+  switch (field->type()->id()) {
     // TODO:
     // case ArrowType::NA:
     //   break;
@@ -393,8 +393,8 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
       logical_type = LogicalType::DATE;
       break;
     case ArrowType::TIMESTAMP: {
-      auto timestamp_type = static_cast<::arrow::TimestampType*>(field->type.get());
-      if (timestamp_type->unit != ::arrow::TimestampType::Unit::MILLI) {
+      auto timestamp_type = static_cast<::arrow::TimestampType*>(field->type().get());
+      if (timestamp_type->unit() != ::arrow::TimestampType::Unit::MILLI) {
        return Status::NotImplemented(
            "Other timestamp units than millisecond are not yet support with parquet.");
      }
@@ -410,18 +410,18 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
      logical_type = LogicalType::TIME_MICROS;
      break;
    case ArrowType::STRUCT: {
-      auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type);
-      return StructToNode(struct_type, field->name, field->nullable, properties, out);
+      auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type());
+      return StructToNode(struct_type, field->name(), field->nullable(), properties, out);
    } break;
    case ArrowType::LIST: {
-      auto list_type = std::static_pointer_cast<::arrow::ListType>(field->type);
-      return ListToNode(list_type, field->name, field->nullable, properties, out);
+      auto list_type = std::static_pointer_cast<::arrow::ListType>(field->type());
+      return ListToNode(list_type, field->name(), field->nullable(), properties, out);
    } break;
    default:
      // TODO: LIST, DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR
      return Status::NotImplemented("unhandled type");
  }
-  *out = PrimitiveNode::Make(field->name, repetition, type, logical_type, length);
+  *out = PrimitiveNode::Make(field->name(), repetition, type, logical_type, length);
   return Status::OK();
 }
 
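
In schema.cc the logic of FieldToNode is unchanged; only the nullability and name lookups go through accessors. A minimal sketch of the nullable-to-repetition mapping, assuming parquet-cpp's Repetition enum from parquet/types.h (RepetitionFor is an illustrative name):

#include <memory>

#include "arrow/api.h"
#include "parquet/types.h"  // parquet::Repetition

// The Arrow field's nullability decides whether the Parquet node is
// OPTIONAL or REQUIRED, now read via field->nullable(). Illustrative only.
parquet::Repetition::type RepetitionFor(
    const std::shared_ptr<arrow::Field>& field) {
  return field->nullable() ? parquet::Repetition::OPTIONAL
                           : parquet::Repetition::REQUIRED;
}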
12 changes: 6 additions & 6 deletions cpp/src/parquet/arrow/writer.cc
@@ -61,7 +61,7 @@ class LevelBuilder : public ::arrow::ArrayVisitor {
     array_offsets_.push_back(array.offset()); \
     valid_bitmaps_.push_back(array.null_bitmap_data()); \
     null_counts_.push_back(array.null_count()); \
-    values_type_ = array.type_enum(); \
+    values_type_ = array.type_id(); \
     values_array_ = &array; \
     return Status::OK(); \
   }
@@ -125,15 +125,15 @@ class LevelBuilder : public ::arrow::ArrayVisitor {
 
   // Walk downwards to extract nullability
   std::shared_ptr<Field> current_field = field;
-  nullable_.push_back(current_field->nullable);
-  while (current_field->type->num_children() > 0) {
-    if (current_field->type->num_children() > 1) {
+  nullable_.push_back(current_field->nullable());
+  while (current_field->type()->num_children() > 0) {
+    if (current_field->type()->num_children() > 1) {
       return Status::NotImplemented(
           "Fields with more than one child are not supported.");
     } else {
-      current_field = current_field->type->child(0);
+      current_field = current_field->type()->child(0);
     }
-    nullable_.push_back(current_field->nullable);
+    nullable_.push_back(current_field->nullable());
   }
 
   // Generate the levels.
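
On the write path the only rename is arrow::Array::type_enum() to arrow::Array::type_id(). A hedged example that builds a tiny array and inspects its type id; the default-constructed builder is assumed from a recent Arrow C++ API and may differ from the 2017-era signature:

#include <memory>

#include "arrow/api.h"

int main() {
  arrow::Int32Builder builder;
  // Ignoring Status returns for brevity; real code should check them.
  (void)builder.Append(42);

  std::shared_ptr<arrow::Array> array;
  (void)builder.Finish(&array);

  // Previously array->type_enum(); after ARROW-782 it is type_id().
  bool is_int32 = array->type_id() == arrow::Type::INT32;
  return is_int32 ? 0 : 1;
}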
