From 443f7f6f002296472cc5d9729fc786644c026975 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=97=AD?= Date: Wed, 5 Jul 2023 14:55:21 +0800 Subject: [PATCH] fix orc compile error --- velox/dwio/dwrf/reader/ColumnReader.cpp | 156 +++++++++--------- velox/dwio/dwrf/reader/DwrfData.cpp | 39 ++--- velox/dwio/dwrf/reader/DwrfData.h | 2 - .../reader/SelectiveLongDecimalColumnReader.h | 5 +- .../SelectiveShortDecimalColumnReader.h | 4 +- .../SelectiveStringDirectColumnReader.cpp | 8 +- .../reader/SelectiveTimestampColumnReader.cpp | 5 - 7 files changed, 101 insertions(+), 118 deletions(-) diff --git a/velox/dwio/dwrf/reader/ColumnReader.cpp b/velox/dwio/dwrf/reader/ColumnReader.cpp index ac319ad71505..26444df74299 100644 --- a/velox/dwio/dwrf/reader/ColumnReader.cpp +++ b/velox/dwio/dwrf/reader/ColumnReader.cpp @@ -1024,77 +1024,6 @@ class StringDictionaryColumnReader : public ColumnReader { void ensureInitialized(); - void init(StripeStreams& stripe) { - auto format = stripe.format(); - EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; - - RleVersion rleVersion; - DwrfStreamIdentifier dataId; - DwrfStreamIdentifier lenId; - DwrfStreamIdentifier dictionaryId; - if (format == DwrfFormat::kDwrf) { - rleVersion = convertRleVersion(stripe.getEncoding(encodingKey).kind()); - dictionaryCount = stripe.getEncoding(encodingKey).dictionarysize(); - dataId = encodingKey.forKind(proto::Stream_Kind_DATA); - lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); - dictionaryId = encodingKey.forKind(proto::Stream_Kind_DICTIONARY_DATA); - - // handle in dictionary stream - std::unique_ptr inDictStream = - stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_IN_DICTIONARY), false); - if (inDictStream) { - inDictionaryReader = - createBooleanRleDecoder(std::move(inDictStream), encodingKey); - - // stride dictionary only exists if in dictionary exists - strideDictStream = stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY), true); - DWIO_ENSURE_NOT_NULL(strideDictStream, "Stride dictionary is missing"); - - indexStream_ = stripe.getStream( - encodingKey.forKind(proto::Stream_Kind_ROW_INDEX), true); - DWIO_ENSURE_NOT_NULL(indexStream_, "String index is missing"); - - const auto strideDictLenId = - encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY_LENGTH); - bool strideLenVInt = stripe.getUseVInts(strideDictLenId); - strideDictLengthDecoder = createRleDecoder( - stripe.getStream(strideDictLenId, true), - rleVersion, - memoryPool_, - strideLenVInt, - dwio::common::INT_BYTE_SIZE); - } - } else { - VELOX_CHECK(format == DwrfFormat::kOrc); - rleVersion = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); - dictionaryCount = stripe.getEncodingOrc(encodingKey).dictionarysize(); - dataId = encodingKey.forKind(proto::orc::Stream_Kind_DATA); - lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); - dictionaryId = - encodingKey.forKind(proto::orc::Stream_Kind_DICTIONARY_DATA); - } - - bool dictVInts = stripe.getUseVInts(dataId); - dictIndex = createRleDecoder( - stripe.getStream(dataId, true), - rleVersion, - memoryPool_, - dictVInts, - dwio::common::INT_BYTE_SIZE); - - bool lenVInts = stripe.getUseVInts(lenId); - lengthDecoder = createRleDecoder( - stripe.getStream(lenId, false), - rleVersion, - memoryPool_, - lenVInts, - dwio::common::INT_BYTE_SIZE); - - blobStream = stripe.getStream(dictionaryId, false); - } - public: StringDictionaryColumnReader( std::shared_ptr nodeType, @@ -1122,7 +1051,80 @@ StringDictionaryColumnReader::StringDictionaryColumnReader( lastStrideIndex(-1), provider(stripe.getStrideIndexProvider()), returnFlatVector_(stripe.getRowReaderOptions().getReturnFlatVector()) { - init(stripe); + auto format = stripe.format(); + EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; + + RleVersion rleVersion; + DwrfStreamIdentifier dataId; + DwrfStreamIdentifier lenId; + DwrfStreamIdentifier dictionaryId; + if (format == DwrfFormat::kDwrf) { + rleVersion = convertRleVersion(stripe.getEncoding(encodingKey).kind()); + dictionaryCount = stripe.getEncoding(encodingKey).dictionarysize(); + dataId = encodingKey.forKind(proto::Stream_Kind_DATA); + lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); + dictionaryId = encodingKey.forKind(proto::Stream_Kind_DICTIONARY_DATA); + + // handle in dictionary stream + std::unique_ptr inDictStream = + stripe.getStream( + encodingKey.forKind(proto::Stream_Kind_IN_DICTIONARY), + streamLabels.label(), + false); + if (inDictStream) { + inDictionaryReader = + createBooleanRleDecoder(std::move(inDictStream), encodingKey); + + // stride dictionary only exists if in dictionary exists + strideDictStream = stripe.getStream( + encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY), + streamLabels.label(), + true); + DWIO_ENSURE_NOT_NULL(strideDictStream, "Stride dictionary is missing"); + + indexStream_ = stripe.getStream( + encodingKey.forKind(proto::Stream_Kind_ROW_INDEX), + streamLabels.label(), + true); + DWIO_ENSURE_NOT_NULL(indexStream_, "String index is missing"); + + const auto strideDictLenId = + encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY_LENGTH); + bool strideLenVInt = stripe.getUseVInts(strideDictLenId); + strideDictLengthDecoder = createRleDecoder( + stripe.getStream(strideDictLenId, streamLabels.label(), true), + rleVersion, + memoryPool_, + strideLenVInt, + dwio::common::INT_BYTE_SIZE); + } + } else { + VELOX_CHECK(format == DwrfFormat::kOrc); + rleVersion = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); + dictionaryCount = stripe.getEncodingOrc(encodingKey).dictionarysize(); + dataId = encodingKey.forKind(proto::orc::Stream_Kind_DATA); + lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); + dictionaryId = + encodingKey.forKind(proto::orc::Stream_Kind_DICTIONARY_DATA); + } + + bool dictVInts = stripe.getUseVInts(dataId); + dictIndex = createRleDecoder( + stripe.getStream(dataId, streamLabels.label(), true), + rleVersion, + memoryPool_, + dictVInts, + dwio::common::INT_BYTE_SIZE); + + bool lenVInts = stripe.getUseVInts(lenId); + lengthDecoder = createRleDecoder( + stripe.getStream(lenId, streamLabels.label(), false), + rleVersion, + memoryPool_, + lenVInts, + dwio::common::INT_BYTE_SIZE); + + blobStream = stripe.getStream(dictionaryId, streamLabels.label(), false); } uint64_t StringDictionaryColumnReader::skip(uint64_t numValues) { @@ -1569,15 +1571,19 @@ StringDirectColumnReader::StringDirectColumnReader( rleVersion = convertRleVersion(stripe.getEncoding(encodingKey).kind()); lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH); - blobStream = - stripe.getStream(encodingKey.forKind(proto::Stream_Kind_DATA), true); + blobStream = stripe.getStream( + encodingKey.forKind(proto::Stream_Kind_DATA), + streamLabels.label(), + true); } else { VELOX_CHECK(stripe.format() == DwrfFormat::kOrc); rleVersion = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH); blobStream = stripe.getStream( - encodingKey.forKind(proto::orc::Stream_Kind_DATA), true); + encodingKey.forKind(proto::orc::Stream_Kind_DATA), + streamLabels.label(), + true); } length = createRleDecoder( diff --git a/velox/dwio/dwrf/reader/DwrfData.cpp b/velox/dwio/dwrf/reader/DwrfData.cpp index 37541ba8f551..c2921b9c4e63 100644 --- a/velox/dwio/dwrf/reader/DwrfData.cpp +++ b/velox/dwio/dwrf/reader/DwrfData.cpp @@ -15,12 +15,19 @@ */ #include "velox/dwio/dwrf/reader/DwrfData.h" - #include "velox/dwio/common/BufferUtil.h" namespace facebook::velox::dwrf { -void DwrfData::init(StripeStreams& stripe) { +DwrfData::DwrfData( + std::shared_ptr nodeType, + StripeStreams& stripe, + const StreamLabels& streamLabels, + FlatMapContext flatMapContext) + : memoryPool_(stripe.getMemoryPool()), + nodeType_(std::move(nodeType)), + flatMapContext_(std::move(flatMapContext)), + rowsPerRowGroup_{stripe.rowsPerRowGroup()} { auto format = stripe.format(); EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; @@ -34,36 +41,16 @@ void DwrfData::init(StripeStreams& stripe) { presentStream = encodingKey.forKind(proto::orc::Stream_Kind_PRESENT); rowIndexStream = encodingKey.forKind(proto::orc::Stream_Kind_ROW_INDEX); } -} - -void DwrfData::initOrc(StripeStreams& stripe) { - EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence}; - std::unique_ptr stream = stripe.getStream( - encodingKey.forKind(proto::orc::Stream_Kind_PRESENT), false); std::unique_ptr stream = stripe.getStream(presentStream, streamLabels.label(), false); if (stream) { notNullDecoder_ = createBooleanRleDecoder(std::move(stream), encodingKey); } - - // We always initialize indexStream_ because indices are needed as - // soon as there is a single filter that can trigger row group skips - // anywhere in the reader tree. This is not known at construct time - // because the first filter can come from a hash join or other run - // time pushdown. - indexStream_ = stripe.getStream(rowIndexStream, false); -} - -DwrfData::DwrfData( - std::shared_ptr nodeType, - StripeStreams& stripe, - FlatMapContext flatMapContext) - : memoryPool_(stripe.getMemoryPool()), - nodeType_(std::move(nodeType)), - flatMapContext_(std::move(flatMapContext)), - rowsPerRowGroup_{stripe.rowsPerRowGroup()} { - init(stripe); + indexStream_ = stripe.getStream( + rowIndexStream, + streamLabels.label(), + false); } uint64_t DwrfData::skipNulls(uint64_t numValues, bool /*nullsOnly*/) { diff --git a/velox/dwio/dwrf/reader/DwrfData.h b/velox/dwio/dwrf/reader/DwrfData.h index 4d7c153cf207..d53bbe583973 100644 --- a/velox/dwio/dwrf/reader/DwrfData.h +++ b/velox/dwio/dwrf/reader/DwrfData.h @@ -96,8 +96,6 @@ class DwrfData : public dwio::common::FormatData { entry.positions().begin(), entry.positions().end()); } - void init(StripeStreams& stripe); - memory::MemoryPool& memoryPool_; const std::shared_ptr nodeType_; FlatMapContext flatMapContext_; diff --git a/velox/dwio/dwrf/reader/SelectiveLongDecimalColumnReader.h b/velox/dwio/dwrf/reader/SelectiveLongDecimalColumnReader.h index dc2a1f32469e..d27024893880 100644 --- a/velox/dwio/dwrf/reader/SelectiveLongDecimalColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveLongDecimalColumnReader.h @@ -59,10 +59,11 @@ class SelectiveLongDecimalColumnReader version_ = convertRleVersion(encodingKind); valueDecoder_ = createDirectDecoder( - stripe.getStream(values, true), valuesVInts, sizeof(int128_t)); + stripe.getStream(values, params.streamLabels().label(), true), + valuesVInts, sizeof(int128_t)); scaleDecoder_ = createRleDecoder( - stripe.getStream(scales, true), + stripe.getStream(scales, params.streamLabels().label(), true), version_, params.pool(), scalesVInts, diff --git a/velox/dwio/dwrf/reader/SelectiveShortDecimalColumnReader.h b/velox/dwio/dwrf/reader/SelectiveShortDecimalColumnReader.h index 2363b032b91d..c3ceb6c51bfb 100644 --- a/velox/dwio/dwrf/reader/SelectiveShortDecimalColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveShortDecimalColumnReader.h @@ -59,12 +59,12 @@ class SelectiveShortDecimalColumnReader version_ = convertRleVersion(encodingKind); valueDecoder_ = createDirectDecoder( - stripe.getStream(values, true), + stripe.getStream(values, params.streamLabels().label(), true), valuesVInts, facebook::velox::dwio::common::LONG_BYTE_SIZE); scaleDecoder_ = createRleDecoder( - stripe.getStream(scales, true), + stripe.getStream(scales, params.streamLabels().label(), true), version_, params.pool(), scalesVInts, diff --git a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp index 8ceb07106cbd..7377012f94c4 100644 --- a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp @@ -24,11 +24,7 @@ void SelectiveStringDirectColumnReader::init(DwrfParams& params) { auto format = params.stripeStreams().format(); EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); -} -void SelectiveStringDirectColumnReader::initOrc(DwrfParams& params) { - EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; - auto& stripe = params.stripeStreams(); DwrfStreamIdentifier lenId; DwrfStreamIdentifier dataId; RleVersion rleVersion; @@ -45,12 +41,12 @@ void SelectiveStringDirectColumnReader::initOrc(DwrfParams& params) { bool lenVInts = stripe.getUseVInts(lenId); lengthDecoder_ = createRleDecoder( - stripe.getStream(lenId, true), + stripe.getStream(lenId, params.streamLabels().label(), true), rleVersion, memoryPool_, lenVInts, dwio::common::INT_BYTE_SIZE); - blobStream_ = stripe.getStream(dataId, true); + blobStream_ = stripe.getStream(dataId, params.streamLabels().label(), true); } SelectiveStringDirectColumnReader::SelectiveStringDirectColumnReader( diff --git a/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp index 425ca460fe6c..c107d35ae112 100644 --- a/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp @@ -26,12 +26,7 @@ void SelectiveTimestampColumnReader::init(DwrfParams& params) { auto format = params.stripeStreams().format(); EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); -} -void SelectiveTimestampColumnReader::initOrc(DwrfParams& params) { - EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence}; - auto& stripe = params.stripeStreams(); - version = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind()); DwrfStreamIdentifier dataId; DwrfStreamIdentifier nanoDataId; if (format == DwrfFormat::kDwrf) {