Skip to content

Commit

Permalink
fix orc compile error
Browse files Browse the repository at this point in the history
  • Loading branch information
chenxu14 committed Jul 5, 2023
1 parent 65e073c commit 443f7f6
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 118 deletions.
156 changes: 81 additions & 75 deletions velox/dwio/dwrf/reader/ColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1024,77 +1024,6 @@ class StringDictionaryColumnReader : public ColumnReader {

void ensureInitialized();

void init(StripeStreams& stripe) {
auto format = stripe.format();
EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence};

RleVersion rleVersion;
DwrfStreamIdentifier dataId;
DwrfStreamIdentifier lenId;
DwrfStreamIdentifier dictionaryId;
if (format == DwrfFormat::kDwrf) {
rleVersion = convertRleVersion(stripe.getEncoding(encodingKey).kind());
dictionaryCount = stripe.getEncoding(encodingKey).dictionarysize();
dataId = encodingKey.forKind(proto::Stream_Kind_DATA);
lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH);
dictionaryId = encodingKey.forKind(proto::Stream_Kind_DICTIONARY_DATA);

// handle in dictionary stream
std::unique_ptr<dwio::common::SeekableInputStream> inDictStream =
stripe.getStream(
encodingKey.forKind(proto::Stream_Kind_IN_DICTIONARY), false);
if (inDictStream) {
inDictionaryReader =
createBooleanRleDecoder(std::move(inDictStream), encodingKey);

// stride dictionary only exists if in dictionary exists
strideDictStream = stripe.getStream(
encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY), true);
DWIO_ENSURE_NOT_NULL(strideDictStream, "Stride dictionary is missing");

indexStream_ = stripe.getStream(
encodingKey.forKind(proto::Stream_Kind_ROW_INDEX), true);
DWIO_ENSURE_NOT_NULL(indexStream_, "String index is missing");

const auto strideDictLenId =
encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY_LENGTH);
bool strideLenVInt = stripe.getUseVInts(strideDictLenId);
strideDictLengthDecoder = createRleDecoder</*isSigned*/ false>(
stripe.getStream(strideDictLenId, true),
rleVersion,
memoryPool_,
strideLenVInt,
dwio::common::INT_BYTE_SIZE);
}
} else {
VELOX_CHECK(format == DwrfFormat::kOrc);
rleVersion = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind());
dictionaryCount = stripe.getEncodingOrc(encodingKey).dictionarysize();
dataId = encodingKey.forKind(proto::orc::Stream_Kind_DATA);
lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH);
dictionaryId =
encodingKey.forKind(proto::orc::Stream_Kind_DICTIONARY_DATA);
}

bool dictVInts = stripe.getUseVInts(dataId);
dictIndex = createRleDecoder</*isSigned*/ false>(
stripe.getStream(dataId, true),
rleVersion,
memoryPool_,
dictVInts,
dwio::common::INT_BYTE_SIZE);

bool lenVInts = stripe.getUseVInts(lenId);
lengthDecoder = createRleDecoder</*isSigned*/ false>(
stripe.getStream(lenId, false),
rleVersion,
memoryPool_,
lenVInts,
dwio::common::INT_BYTE_SIZE);

blobStream = stripe.getStream(dictionaryId, false);
}

public:
StringDictionaryColumnReader(
std::shared_ptr<const dwio::common::TypeWithId> nodeType,
Expand Down Expand Up @@ -1122,7 +1051,80 @@ StringDictionaryColumnReader::StringDictionaryColumnReader(
lastStrideIndex(-1),
provider(stripe.getStrideIndexProvider()),
returnFlatVector_(stripe.getRowReaderOptions().getReturnFlatVector()) {
init(stripe);
auto format = stripe.format();
EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence};

RleVersion rleVersion;
DwrfStreamIdentifier dataId;
DwrfStreamIdentifier lenId;
DwrfStreamIdentifier dictionaryId;
if (format == DwrfFormat::kDwrf) {
rleVersion = convertRleVersion(stripe.getEncoding(encodingKey).kind());
dictionaryCount = stripe.getEncoding(encodingKey).dictionarysize();
dataId = encodingKey.forKind(proto::Stream_Kind_DATA);
lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH);
dictionaryId = encodingKey.forKind(proto::Stream_Kind_DICTIONARY_DATA);

// handle in dictionary stream
std::unique_ptr<dwio::common::SeekableInputStream> inDictStream =
stripe.getStream(
encodingKey.forKind(proto::Stream_Kind_IN_DICTIONARY),
streamLabels.label(),
false);
if (inDictStream) {
inDictionaryReader =
createBooleanRleDecoder(std::move(inDictStream), encodingKey);

// stride dictionary only exists if in dictionary exists
strideDictStream = stripe.getStream(
encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY),
streamLabels.label(),
true);
DWIO_ENSURE_NOT_NULL(strideDictStream, "Stride dictionary is missing");

indexStream_ = stripe.getStream(
encodingKey.forKind(proto::Stream_Kind_ROW_INDEX),
streamLabels.label(),
true);
DWIO_ENSURE_NOT_NULL(indexStream_, "String index is missing");

const auto strideDictLenId =
encodingKey.forKind(proto::Stream_Kind_STRIDE_DICTIONARY_LENGTH);
bool strideLenVInt = stripe.getUseVInts(strideDictLenId);
strideDictLengthDecoder = createRleDecoder</*isSigned*/ false>(
stripe.getStream(strideDictLenId, streamLabels.label(), true),
rleVersion,
memoryPool_,
strideLenVInt,
dwio::common::INT_BYTE_SIZE);
}
} else {
VELOX_CHECK(format == DwrfFormat::kOrc);
rleVersion = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind());
dictionaryCount = stripe.getEncodingOrc(encodingKey).dictionarysize();
dataId = encodingKey.forKind(proto::orc::Stream_Kind_DATA);
lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH);
dictionaryId =
encodingKey.forKind(proto::orc::Stream_Kind_DICTIONARY_DATA);
}

bool dictVInts = stripe.getUseVInts(dataId);
dictIndex = createRleDecoder</*isSigned*/ false>(
stripe.getStream(dataId, streamLabels.label(), true),
rleVersion,
memoryPool_,
dictVInts,
dwio::common::INT_BYTE_SIZE);

bool lenVInts = stripe.getUseVInts(lenId);
lengthDecoder = createRleDecoder</*isSigned*/ false>(
stripe.getStream(lenId, streamLabels.label(), false),
rleVersion,
memoryPool_,
lenVInts,
dwio::common::INT_BYTE_SIZE);

blobStream = stripe.getStream(dictionaryId, streamLabels.label(), false);
}

uint64_t StringDictionaryColumnReader::skip(uint64_t numValues) {
Expand Down Expand Up @@ -1569,15 +1571,19 @@ StringDirectColumnReader::StringDirectColumnReader(
rleVersion = convertRleVersion(stripe.getEncoding(encodingKey).kind());
lenId = encodingKey.forKind(proto::Stream_Kind_LENGTH);

blobStream =
stripe.getStream(encodingKey.forKind(proto::Stream_Kind_DATA), true);
blobStream = stripe.getStream(
encodingKey.forKind(proto::Stream_Kind_DATA),
streamLabels.label(),
true);
} else {
VELOX_CHECK(stripe.format() == DwrfFormat::kOrc);
rleVersion = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind());
lenId = encodingKey.forKind(proto::orc::Stream_Kind_LENGTH);

blobStream = stripe.getStream(
encodingKey.forKind(proto::orc::Stream_Kind_DATA), true);
encodingKey.forKind(proto::orc::Stream_Kind_DATA),
streamLabels.label(),
true);
}

length = createRleDecoder</*isSigned*/ false>(
Expand Down
39 changes: 13 additions & 26 deletions velox/dwio/dwrf/reader/DwrfData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@
*/

#include "velox/dwio/dwrf/reader/DwrfData.h"

#include "velox/dwio/common/BufferUtil.h"

namespace facebook::velox::dwrf {

void DwrfData::init(StripeStreams& stripe) {
DwrfData::DwrfData(
std::shared_ptr<const dwio::common::TypeWithId> nodeType,
StripeStreams& stripe,
const StreamLabels& streamLabels,
FlatMapContext flatMapContext)
: memoryPool_(stripe.getMemoryPool()),
nodeType_(std::move(nodeType)),
flatMapContext_(std::move(flatMapContext)),
rowsPerRowGroup_{stripe.rowsPerRowGroup()} {
auto format = stripe.format();
EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence};

Expand All @@ -34,36 +41,16 @@ void DwrfData::init(StripeStreams& stripe) {
presentStream = encodingKey.forKind(proto::orc::Stream_Kind_PRESENT);
rowIndexStream = encodingKey.forKind(proto::orc::Stream_Kind_ROW_INDEX);
}
}

void DwrfData::initOrc(StripeStreams& stripe) {
EncodingKey encodingKey{nodeType_->id, flatMapContext_.sequence};

std::unique_ptr<dwio::common::SeekableInputStream> stream = stripe.getStream(
encodingKey.forKind(proto::orc::Stream_Kind_PRESENT), false);
std::unique_ptr<dwio::common::SeekableInputStream> stream =
stripe.getStream(presentStream, streamLabels.label(), false);
if (stream) {
notNullDecoder_ = createBooleanRleDecoder(std::move(stream), encodingKey);
}

// We always initialize indexStream_ because indices are needed as
// soon as there is a single filter that can trigger row group skips
// anywhere in the reader tree. This is not known at construct time
// because the first filter can come from a hash join or other run
// time pushdown.
indexStream_ = stripe.getStream(rowIndexStream, false);
}

DwrfData::DwrfData(
std::shared_ptr<const dwio::common::TypeWithId> nodeType,
StripeStreams& stripe,
FlatMapContext flatMapContext)
: memoryPool_(stripe.getMemoryPool()),
nodeType_(std::move(nodeType)),
flatMapContext_(std::move(flatMapContext)),
rowsPerRowGroup_{stripe.rowsPerRowGroup()} {
init(stripe);
indexStream_ = stripe.getStream(
rowIndexStream,
streamLabels.label(),
false);
}

uint64_t DwrfData::skipNulls(uint64_t numValues, bool /*nullsOnly*/) {
Expand Down
2 changes: 0 additions & 2 deletions velox/dwio/dwrf/reader/DwrfData.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ class DwrfData : public dwio::common::FormatData {
entry.positions().begin(), entry.positions().end());
}

void init(StripeStreams& stripe);

memory::MemoryPool& memoryPool_;
const std::shared_ptr<const dwio::common::TypeWithId> nodeType_;
FlatMapContext flatMapContext_;
Expand Down
5 changes: 3 additions & 2 deletions velox/dwio/dwrf/reader/SelectiveLongDecimalColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ class SelectiveLongDecimalColumnReader
version_ = convertRleVersion(encodingKind);

valueDecoder_ = createDirectDecoder<true>(
stripe.getStream(values, true), valuesVInts, sizeof(int128_t));
stripe.getStream(values, params.streamLabels().label(), true),
valuesVInts, sizeof(int128_t));

scaleDecoder_ = createRleDecoder<true>(
stripe.getStream(scales, true),
stripe.getStream(scales, params.streamLabels().label(), true),
version_,
params.pool(),
scalesVInts,
Expand Down
4 changes: 2 additions & 2 deletions velox/dwio/dwrf/reader/SelectiveShortDecimalColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ class SelectiveShortDecimalColumnReader
version_ = convertRleVersion(encodingKind);

valueDecoder_ = createDirectDecoder<true>(
stripe.getStream(values, true),
stripe.getStream(values, params.streamLabels().label(), true),
valuesVInts,
facebook::velox::dwio::common::LONG_BYTE_SIZE);

scaleDecoder_ = createRleDecoder<true>(
stripe.getStream(scales, true),
stripe.getStream(scales, params.streamLabels().label(), true),
version_,
params.pool(),
scalesVInts,
Expand Down
8 changes: 2 additions & 6 deletions velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@ void SelectiveStringDirectColumnReader::init(DwrfParams& params) {
auto format = params.stripeStreams().format();
EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence};
auto& stripe = params.stripeStreams();
}

void SelectiveStringDirectColumnReader::initOrc(DwrfParams& params) {
EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence};
auto& stripe = params.stripeStreams();
DwrfStreamIdentifier lenId;
DwrfStreamIdentifier dataId;
RleVersion rleVersion;
Expand All @@ -45,12 +41,12 @@ void SelectiveStringDirectColumnReader::initOrc(DwrfParams& params) {

bool lenVInts = stripe.getUseVInts(lenId);
lengthDecoder_ = createRleDecoder</*isSigned*/ false>(
stripe.getStream(lenId, true),
stripe.getStream(lenId, params.streamLabels().label(), true),
rleVersion,
memoryPool_,
lenVInts,
dwio::common::INT_BYTE_SIZE);
blobStream_ = stripe.getStream(dataId, true);
blobStream_ = stripe.getStream(dataId, params.streamLabels().label(), true);
}

SelectiveStringDirectColumnReader::SelectiveStringDirectColumnReader(
Expand Down
5 changes: 0 additions & 5 deletions velox/dwio/dwrf/reader/SelectiveTimestampColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,7 @@ void SelectiveTimestampColumnReader::init(DwrfParams& params) {
auto format = params.stripeStreams().format();
EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence};
auto& stripe = params.stripeStreams();
}

void SelectiveTimestampColumnReader::initOrc(DwrfParams& params) {
EncodingKey encodingKey{nodeType_->id, params.flatMapContext().sequence};
auto& stripe = params.stripeStreams();
version = convertRleVersion(stripe.getEncodingOrc(encodingKey).kind());
DwrfStreamIdentifier dataId;
DwrfStreamIdentifier nanoDataId;
if (format == DwrfFormat::kDwrf) {
Expand Down

0 comments on commit 443f7f6

Please sign in to comment.