From bde7b6a355be9da5752f813d3f37d2d3db37f575 Mon Sep 17 00:00:00 2001 From: Jin Chengcheng Date: Wed, 17 May 2023 15:40:11 +0800 Subject: [PATCH] [GLUTEN-1434] Serialize and deserialize RowVector (#250) Support serialize and deserialize RowVector. --- velox/serializers/CMakeLists.txt | 2 +- velox/serializers/SingleSerializer.cpp | 752 ++++++++++++++++++ velox/serializers/SingleSerializer.h | 73 ++ velox/serializers/tests/CMakeLists.txt | 2 +- .../tests/SingleSerializerTest.cpp | 214 +++++ velox/substrait/TypeUtils.h | 5 + velox/vector/VectorStream.h | 13 + 7 files changed, 1059 insertions(+), 2 deletions(-) create mode 100644 velox/serializers/SingleSerializer.cpp create mode 100644 velox/serializers/SingleSerializer.h create mode 100644 velox/serializers/tests/SingleSerializerTest.cpp diff --git a/velox/serializers/CMakeLists.txt b/velox/serializers/CMakeLists.txt index 9fa18e048321..66f914da59db 100644 --- a/velox/serializers/CMakeLists.txt +++ b/velox/serializers/CMakeLists.txt @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -add_library(velox_presto_serializer PrestoSerializer.cpp +add_library(velox_presto_serializer PrestoSerializer.cpp SingleSerializer.cpp UnsafeRowSerializer.cpp) target_link_libraries(velox_presto_serializer velox_vector) diff --git a/velox/serializers/SingleSerializer.cpp b/velox/serializers/SingleSerializer.cpp new file mode 100644 index 000000000000..46e694da1f4f --- /dev/null +++ b/velox/serializers/SingleSerializer.cpp @@ -0,0 +1,752 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/serializers/SingleSerializer.h" +#include "velox/common/base/Crc.h" +#include "velox/common/memory/ByteStream.h" +#include "velox/vector/ComplexVector.h" +#include "velox/vector/FlatVector.h" +#include "velox/vector/VectorTypeUtils.h" + +namespace facebook::velox::serializer { +namespace { +constexpr int8_t kCompressedBitMask = 1; +constexpr int8_t kEncryptedBitMask = 2; +constexpr int8_t kCheckSumBitMask = 4; + +int64_t computeChecksum( + SingleOutputStreamListener* listener, + int codecMarker, + int numRows, + int uncompressedSize) { + auto result = listener->crc(); + result.process_bytes(&codecMarker, 1); + result.process_bytes(&numRows, 4); + result.process_bytes(&uncompressedSize, 4); + return result.checksum(); +} + +int64_t computeChecksum( + ByteStream* source, + int codecMarker, + int numRows, + int uncompressedSize) { + auto offset = source->tellp(); + bits::Crc32 crc32; + + auto remainingBytes = uncompressedSize; + while (remainingBytes > 0) { + auto data = source->nextView(remainingBytes); + crc32.process_bytes(data.data(), data.size()); + remainingBytes -= data.size(); + } + + crc32.process_bytes(&codecMarker, 1); + crc32.process_bytes(&numRows, 4); + crc32.process_bytes(&uncompressedSize, 4); + auto checksum = crc32.checksum(); + + source->seekp(offset); + + return checksum; +} + +char getCodecMarker() { + char marker = 0; + marker |= kCheckSumBitMask; + return marker; +} + +bool isCompressedBitSet(int8_t codec) { + return (codec & kCompressedBitMask) == kCompressedBitMask; +} + +bool isEncryptedBit(int8_t codec) { + return (codec & kEncryptedBitMask) == kEncryptedBitMask; +} + +bool isChecksumBitSet(int8_t codec) { + return (codec & kCheckSumBitMask) == kCheckSumBitMask; +} + +void readColumns( + ByteStream* source, + velox::memory::MemoryPool* pool, + int32_t length, + const std::vector& types, + std::vector& result); + +template +VectorPtr readFlatVector( + ByteStream* source, + int32_t length, + std::shared_ptr type, + velox::memory::MemoryPool* pool); + +VectorPtr readArrayVector( + ByteStream* source, + int32_t length, + std::shared_ptr type, + velox::memory::MemoryPool* pool) { + ArrayVector* arrayVector = nullptr; + std::vector childTypes = {type->childAt(0)}; + std::vector children(1); + if (arrayVector) { + children[0] = arrayVector->elements(); + } + VELOX_UNSUPPORTED("Not support deserialize Array"); +} + +VectorPtr readMapVector( + ByteStream* source, + int32_t length, + std::shared_ptr type, + velox::memory::MemoryPool* pool) { + MapVector* mapVector = nullptr; + std::vector childTypes = {type->childAt(0), type->childAt(1)}; + std::vector children(2); + if (mapVector) { + children[0] = mapVector->mapKeys(); + children[1] = mapVector->mapValues(); + } + // readColumns(source, pool, childTypes, children); + VELOX_UNSUPPORTED("Not support deserialize Map"); +} + +VectorPtr readRowVector( + ByteStream* source, + int32_t length, + std::shared_ptr type, + velox::memory::MemoryPool* pool) { + MapVector* mapVector = nullptr; + std::vector childTypes = {type->childAt(0), type->childAt(1)}; + std::vector children(2); + if (mapVector) { + children[0] = mapVector->mapKeys(); + children[1] = mapVector->mapValues(); + } + // readColumns(source, pool, childTypes, children); + VELOX_UNSUPPORTED("Not support deserialize Row"); +} + +BufferPtr readBuffer(ByteStream* source, velox::memory::MemoryPool* pool) { + auto size = source->read(); + if (size == 0) { + return BufferPtr(nullptr); + } + auto buffer = AlignedBuffer::allocate(size, pool); + source->readBytes(buffer->asMutable(), size); + return buffer; +} + +template +VectorPtr readFlatVector( + ByteStream* source, + int32_t length, + std::shared_ptr type, + velox::memory::MemoryPool* pool) { + auto bodyBufferSize = source->read(); + VELOX_CHECK_EQ(bodyBufferSize, 2); + auto nulls = readBuffer(source, pool); + auto values = readBuffer(source, pool); + auto stringBuffersSize = source->read(); + std::vector stringBuffers; + stringBuffers.reserve(stringBuffersSize); + for (int32_t i = 0; i < stringBuffersSize; i++) { + stringBuffers.emplace_back(std::move(readBuffer(source, pool))); + } + using T = typename TypeTraits::NativeType; + return std::make_shared>( + pool, + type, + std::move(nulls), + length, + std::move(values), + std::move(stringBuffers)); +} + +int128_t readInt128(ByteStream* source) { + // ByteStream does not support reading int128_t values. + auto low = source->read(); + auto high = source->read(); + return buildInt128(high, low); +} + +template +T deserializeVariable(ByteStream* source, velox::memory::MemoryPool* pool) { + if constexpr (std::is_same_v) { + return Timestamp::fromMicros(source->read()); + } else if constexpr (std::is_same_v) { + return Date(source->read()); + } else if constexpr (std::is_same_v) { + int32_t size = source->read(); + auto values = AlignedBuffer::allocate(size, pool); + source->readBytes(values->asMutable(), size); + return StringView(values->asMutable(), size); + } else { + return source->read(); + } +} + +template +VectorPtr deserializeVariableToVector( + ByteStream* source, + int32_t length, + std::shared_ptr type, + velox::memory::MemoryPool* pool) { + using T = typename KindToFlatVector::WrapperType; + T value = deserializeVariable(source, pool); + return std::make_shared>( + pool, length, false, type, std::move(value)); +} + +template +VectorPtr readConstantVector( + ByteStream* source, + int32_t length, + std::shared_ptr type, + velox::memory::MemoryPool* pool) { + using T = typename KindToFlatVector::WrapperType; + auto constantIsVector = source->read(); + auto constantIsNull = source->read(); + if (constantIsVector) { + std::vector childTypes = {type}; + std::vector children(1); + readColumns(source, pool, length, childTypes, children); + VELOX_CHECK_EQ(1, children[0]->size()); + return BaseVector::wrapInConstant(length, 0, children[0]); + } + + if (constantIsNull) { + return BaseVector::createNullConstant(type, length, pool); + } + + if constexpr (std::is_same_v) { + int64_t unscaledValue = source->read(); + return std::make_shared>( + pool, length, false, type, T(unscaledValue)); + } else if constexpr (std::is_same_v) { + return std::make_shared>( + pool, length, false, type, T(readInt128(source))); + } else { + return VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( + deserializeVariableToVector, type->kind(), source, length, type, pool); + } +} + +void readColumns( + ByteStream* source, + velox::memory::MemoryPool* pool, + int32_t numRows, + const std::vector& types, + std::vector& result) { + if (source->atEnd()) { // empty page + return; + } + for (int32_t i = 0; i < types.size(); ++i) { + auto vectorEncoding = + static_cast(source->read()); + switch (vectorEncoding) { + case VectorEncoding::Simple::FLAT: { + auto res = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( + readFlatVector, types[i]->kind(), source, numRows, types[i], pool); + result.emplace_back(std::move(res)); + } break; + case VectorEncoding::Simple::CONSTANT: { + auto res = VELOX_DYNAMIC_TYPE_DISPATCH_ALL( + readConstantVector, + types[i]->kind(), + source, + numRows, + types[i], + pool); + result.emplace_back(std::move(res)); + } break; + case VectorEncoding::Simple::ROW: + result.emplace_back(readRowVector(source, numRows, types[i], pool)); + break; + case VectorEncoding::Simple::ARRAY: + result.emplace_back(readArrayVector(source, numRows, types[i], pool)); + break; + case VectorEncoding::Simple::MAP: + result.emplace_back(readMapVector(source, numRows, types[i], pool)); + break; + default: + VELOX_NYI("{} unsupported", __FUNCTION__); + } + } +} + +void writeInt32(OutputStream* out, int32_t value) { + out->write(reinterpret_cast(&value), sizeof(value)); +} + +void writeInt64(OutputStream* out, int64_t value) { + out->write(reinterpret_cast(&value), sizeof(value)); +} + +void writeBool(OutputStream* out, bool value) { + out->write(reinterpret_cast(&value), sizeof(value)); +} + +// Appendable container for serialized values. To append a value at a +// time, call appendNull or appendNonNull first. Then call +// appendLength if the type has a length. A null value has a length of +// 0. Then call appendValue if the value was not null. +class SingleVectorStream { + public: + SingleVectorStream( + const TypePtr type, + StreamArena* streamArena, + int32_t initialNumRows) + : type_(type), streamArena_(streamArena), constValue_(streamArena) { + if (initialNumRows > 0) { + switch (type_->kind()) { + case TypeKind::ROW: + case TypeKind::ARRAY: + case TypeKind::MAP: + children_.resize(type_->size()); + for (int32_t i = 0; i < type_->size(); ++i) { + children_[i] = std::make_unique( + type_->childAt(i), streamArena, initialNumRows); + } + break; + default: + break; + } + } + } + + void appendBuffers(BufferPtr buffers) { + bodyBuffers_.emplace_back(buffers); + } + + void appendStringBuffers(std::vector buffers) { + stringBuffers_ = buffers; + } + + void setEncoding(VectorEncoding::Simple encoding) { + encoding_ = encoding; + } + + void setConstantIsVector(bool constantIsVector) { + constantIsVector_ = constantIsVector; + } + + void appendConstantNull(bool isNull) { + constantIsNull_ = isNull; + } + + template + void appendOneConst(const T& value) { + if (constValue_.size() == 0) { + constValue_.startWrite(sizeof(T)); + }; + append(folly::Range(&value, 1)); + } + + SingleVectorStream* childAt(int32_t index) { + return children_[index].get(); + } + + void newChild(const TypePtr type, int32_t initialNumRows) { + children_.emplace_back(std::make_unique( + type, streamArena_, initialNumRows)); + } + + template + constexpr typename std::underlying_type::type to_underlying(E e) noexcept { + return static_cast::type>(e); + } + + // similiar as flush + vector_size_t maxSerializedSize() { + vector_size_t size = 0; + size += sizeof(uint8_t); /* encoding */ + if (constantIsVector_.has_value()) { + size += sizeof(bool); + size += sizeof(bool); + if (constValue_.size() > 0) { + size += constValue_.size(); + } + } else { + size += sizeof(int32_t); + for (auto& buffer : bodyBuffers_) { + size += sizeof(int32_t); + if (buffer != nullptr) { + size += buffer->size(); + } + } + size += sizeof(int32_t); + for (const auto& buffer : stringBuffers_) { + size += sizeof(int32_t); + if (buffer != nullptr) { + size += buffer->size(); + } + } + } + + for (const auto& child : children_) { + size += child->maxSerializedSize(); + } + return size; + } + + // Writes out the accumulated contents. Does not change the state. + void flush(OutputStream* out) { + uint8_t encoding = to_underlying(encoding_); + out->write(reinterpret_cast(&encoding), sizeof(encoding)); + + if (constantIsVector_.has_value()) { + writeBool(out, constantIsVector_.value()); + writeBool(out, constantIsNull_); + if (constValue_.size() > 0) { + constValue_.flush(out); + } + } else { + writeInt32(out, bodyBuffers_.size()); + for (auto& buffer : bodyBuffers_) { + if (buffer == nullptr) { + writeInt32(out, 0); + } else { + writeInt32(out, buffer->size()); + out->write(buffer->asMutable(), buffer->size()); + } + } + writeInt32(out, stringBuffers_.size()); + for (const auto& buffer : stringBuffers_) { + if (buffer == nullptr) { + writeInt32(out, 0); + } else { + writeInt32(out, buffer->size()); + out->write(buffer->asMutable(), buffer->size()); + } + } + } + + for (const auto& child : children_) { + child->flush(out); + } + } + + private: + template + void append(folly::Range values) { + constValue_.append(values); + } + + const TypePtr type_; + StreamArena* streamArena_; + VectorEncoding::Simple encoding_; + std::optional constantIsVector_; + bool constantIsNull_; + ByteStream constValue_; + // nulls, values or others + std::vector bodyBuffers_; + std::vector stringBuffers_; + std::vector> children_; +}; + +template <> +inline void SingleVectorStream::append(folly::Range values) { + for (auto& value : values) { + auto size = value.size(); + constValue_.appendOne(size); + constValue_.appendStringPiece(folly::StringPiece(value.data(), size)); + } +} + +void serializeColumn(const BaseVector* vector, SingleVectorStream* stream); + +template +void serializeFlatVector(const BaseVector* vector, SingleVectorStream* stream) { + using T = typename TypeTraits::NativeType; + auto flatVector = dynamic_cast*>(vector); + stream->appendBuffers(flatVector->nulls()); + stream->appendBuffers(flatVector->values()); + stream->appendStringBuffers(flatVector->stringBuffers()); +} + +void serializeRowVector(const BaseVector* vector, SingleVectorStream* stream) { + auto rowVector = dynamic_cast(vector); + for (int32_t i = 0; i < rowVector->childrenSize(); ++i) { + serializeColumn(rowVector->childAt(i).get(), stream->childAt(i)); + } +} + +void serializeArrayVector( + const BaseVector* vector, + SingleVectorStream* stream) { + auto arrayVector = dynamic_cast(vector); + stream->appendBuffers(arrayVector->nulls()); + stream->appendBuffers(arrayVector->offsets()); + stream->appendBuffers(arrayVector->sizes()); + serializeColumn(arrayVector->elements().get(), stream->childAt(0)); +} + +void serializeMapVector(const BaseVector* vector, SingleVectorStream* stream) { + auto mapVector = dynamic_cast(vector); + // Wait to serialize nullCount and sortedKeys + stream->appendBuffers(mapVector->nulls()); + stream->appendBuffers(mapVector->offsets()); + stream->appendBuffers(mapVector->sizes()); + + serializeColumn(mapVector->mapKeys().get(), stream->childAt(0)); + serializeColumn(mapVector->mapValues().get(), stream->childAt(1)); +} + +template +void serializeVariable(const BaseVector* vector, SingleVectorStream* stream) { + using T = typename KindToFlatVector::WrapperType; + auto constVector = dynamic_cast*>(vector); + T value = constVector->valueAtFast(0); + if constexpr (std::is_same_v) { + // It may lost some nanos, same with UnsafeRowSerializer + stream->appendOneConst(value.toMicros()); + } else if constexpr (std::is_same_v) { + stream->appendOneConst(value.days()); + } else { + stream->appendOneConst(value); + } +} + +template +void serializeConstantVector( + const BaseVector* vector, + SingleVectorStream* stream) { + using T = typename KindToFlatVector::WrapperType; + auto constVector = dynamic_cast*>(vector); + + stream->setConstantIsVector(constVector->valueVector() != nullptr); + stream->appendConstantNull(vector->isNullAt(0)); + if (constVector->valueVector()) { + const BaseVector* wrapped = vector->wrappedVector(); + stream->newChild(wrapped->type(), wrapped->size()); + serializeColumn(wrapped, stream->childAt(0)); + return; + } + + if (!vector->isNullAt(0)) { + if constexpr (std::is_same_v) { + T value = constVector->valueAtFast(0); + stream->appendOneConst(value.unscaledValue()); + } else if constexpr (std::is_same_v) { + T value = constVector->valueAtFast(0); + stream->appendOneConst(value.unscaledValue()); + } else { + VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( + serializeVariable, vector->typeKind(), vector, stream); + } + } +} + +void serializeColumn(const BaseVector* vector, SingleVectorStream* stream) { + stream->setEncoding(vector->encoding()); + switch (vector->encoding()) { + case VectorEncoding::Simple::FLAT: + VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( + serializeFlatVector, vector->typeKind(), vector, stream); + break; + case VectorEncoding::Simple::CONSTANT: + VELOX_DYNAMIC_TYPE_DISPATCH_ALL( + serializeConstantVector, vector->typeKind(), vector, stream); + break; + case VectorEncoding::Simple::ROW: + serializeRowVector(vector, stream); + break; + case VectorEncoding::Simple::ARRAY: + serializeArrayVector(vector, stream); + break; + case VectorEncoding::Simple::MAP: + serializeMapVector(vector, stream); + break; + case VectorEncoding::Simple::LAZY: + serializeColumn(vector->loadedVector(), stream); + break; + default: + VELOX_NYI("{} unsupported", __FUNCTION__); + } +} + +class SingleVectorSerializer : public VectorSerializer { + public: + SingleVectorSerializer( + std::shared_ptr rowType, + int32_t numRows, + StreamArena* streamArena) { + auto types = rowType->children(); + auto numTypes = types.size(); + streams_.resize(numTypes); + for (int i = 0; i < numTypes; i++) { + streams_[i] = + std::make_unique(types[i], streamArena, numRows); + } + } + + void append( + const RowVectorPtr& vector, + const folly::Range& /* ranges */) override { + VELOX_CHECK( + numRows_ == 0, + "SingleVectorSerializer can only append RowVector only once"); + if (vector->size() > 0) { + numRows_ += vector->size(); + for (int32_t i = 0; i < vector->childrenSize(); ++i) { + serializeColumn(vector->childAt(i).get(), streams_[i].get()); + } + } + } + + vector_size_t maxSerializedSize() override { + vector_size_t size = 0; + if (numRows_ != 0) { + for (auto& stream : streams_) { + size += stream->maxSerializedSize(); + } + } + size += 25; /* flush header layout size */ + return size; + } + + void flush(OutputStream* out) override { + flushInternal(numRows_, out); + } + + // Writes the contents to 'stream' in wire format + void flushInternal(int32_t numRows, OutputStream* out) { + auto listener = dynamic_cast(out->listener()); + // Reset CRC computation + if (listener) { + listener->reset(); + } + + char codec = 0; + if (listener) { + codec = getCodecMarker(); + } + + int32_t offset = out->tellp(); + + // Pause CRC computation + if (listener) { + listener->pause(); + } + + writeInt32(out, numRows); + out->write(&codec, 1); + + // Make space for uncompressedSizeInBytes & sizeInBytes + writeInt32(out, 0); + writeInt32(out, 0); + writeInt64(out, 0); // Write zero checksum + + // Number of columns and stream content. Unpause CRC. + if (listener) { + listener->resume(); + } + writeInt32(out, streams_.size()); + if (numRows != 0) { + for (auto& stream : streams_) { + stream->flush(out); + } + } + + // Pause CRC computation + if (listener) { + listener->pause(); + } + + // Fill in uncompressedSizeInBytes & sizeInBytes + int32_t size = (int32_t)out->tellp() - offset; + int32_t uncompressedSize = size - kHeaderSize; + int64_t crc = 0; + if (listener) { + crc = computeChecksum(listener, codec, numRows, uncompressedSize); + } + + out->seekp(offset + kSizeInBytesOffset); + writeInt32(out, uncompressedSize); + writeInt32(out, uncompressedSize); + writeInt64(out, crc); + out->seekp(offset + size); + } + + private: + static const int32_t kSizeInBytesOffset{4 + 1}; + static const int32_t kHeaderSize{kSizeInBytesOffset + 4 + 4 + 8}; + + int32_t numRows_{0}; + std::vector> streams_; +}; +} // namespace + +void SingleVectorSerde::estimateSerializedSize( + VectorPtr vector, + const folly::Range& /* ranges */, + vector_size_t** sizes) { + if (sizes == nullptr) { + return; + } + *(sizes[0]) += vector->estimateFlatSize(); +} + +std::unique_ptr SingleVectorSerde::createSerializer( + std::shared_ptr type, + int32_t numRows, + StreamArena* streamArena, + const Options* options) { + return std::make_unique(type, numRows, streamArena); +} + +void SingleVectorSerde::deserialize( + ByteStream* source, + velox::memory::MemoryPool* pool, + std::shared_ptr type, + std::shared_ptr* result, + const Options* options) { + auto numRows = source->read(); + + auto pageCodecMarker = source->read(); + auto uncompressedSize = source->read(); + // skip size in bytes + source->skip(4); + auto checksum = source->read(); + + int64_t actualCheckSum = 0; + if (isChecksumBitSet(pageCodecMarker)) { + actualCheckSum = + computeChecksum(source, pageCodecMarker, numRows, uncompressedSize); + } + + VELOX_CHECK_EQ( + checksum, actualCheckSum, "Received corrupted serialized page."); + + // skip number of columns + source->skip(4); + + std::vector children; + auto childTypes = type->as().children(); + + readColumns(source, pool, numRows, childTypes, children); + *result = std::make_shared( + pool, type, BufferPtr(nullptr), numRows, children); +} + +// static +void SingleVectorSerde::registerVectorSerde() { + velox::registerVectorSerde(std::make_unique()); +} + +} // namespace facebook::velox::serializer diff --git a/velox/serializers/SingleSerializer.h b/velox/serializers/SingleSerializer.h new file mode 100644 index 000000000000..55ffe9989ea4 --- /dev/null +++ b/velox/serializers/SingleSerializer.h @@ -0,0 +1,73 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include "velox/common/base/Crc.h" +#include "velox/vector/VectorStream.h" + +namespace facebook::velox::serializer { +class SingleVectorSerde : public VectorSerde { + public: + // Not use ranges, the first element in sizes is the total size + void estimateSerializedSize( + std::shared_ptr vector, + const folly::Range& ranges, + vector_size_t** sizes) override; + + std::unique_ptr createSerializer( + std::shared_ptr type, + int32_t numRows, + StreamArena* streamArena, + const Options* options) override; + + void deserialize( + ByteStream* source, + velox::memory::MemoryPool* pool, + std::shared_ptr type, + std::shared_ptr* result, + const Options* options) override; + + static void registerVectorSerde(); +}; + +class SingleOutputStreamListener : public OutputStreamListener { + public: + void onWrite(const char* s, std::streamsize count) override { + if (not paused_) { + crc_.process_bytes(s, count); + } + } + + void pause() { + paused_ = true; + } + + void resume() { + paused_ = false; + } + + auto crc() const { + return crc_; + } + + void reset() { + crc_.reset(); + } + + private: + bool paused_{false}; + bits::Crc32 crc_; +}; +} // namespace facebook::velox::serializer diff --git a/velox/serializers/tests/CMakeLists.txt b/velox/serializers/tests/CMakeLists.txt index 03eeb12ac522..316e7d1c3049 100644 --- a/velox/serializers/tests/CMakeLists.txt +++ b/velox/serializers/tests/CMakeLists.txt @@ -14,7 +14,7 @@ add_executable( velox_presto_serializer_test PrestoOutputStreamListenerTest.cpp PrestoSerializerTest.cpp - UnsafeRowSerializerTest.cpp) + SingleSerializerTest.cpp UnsafeRowSerializerTest.cpp) add_test(velox_presto_serializer_test velox_presto_serializer_test) diff --git a/velox/serializers/tests/SingleSerializerTest.cpp b/velox/serializers/tests/SingleSerializerTest.cpp new file mode 100644 index 000000000000..b1bedf312c54 --- /dev/null +++ b/velox/serializers/tests/SingleSerializerTest.cpp @@ -0,0 +1,214 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/serializers/SingleSerializer.h" +#include +#include +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/memory/ByteStream.h" +#include "velox/vector/BaseVector.h" +#include "velox/vector/ComplexVector.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; +using namespace facebook::velox::test; + +class SingleSerializerTest : public ::testing::Test, + public test::VectorTestBase { + protected: + void SetUp() override { + pool_ = memory::addDefaultLeafMemoryPool(); + serde_ = std::make_unique(); + vectorMaker_ = std::make_unique(pool_.get()); + } + + void sanityCheckEstimateSerializedSize( + RowVectorPtr rowVector, + const folly::Range& ranges) { + auto numRows = rowVector->size(); + std::vector rowSizes(numRows, 0); + std::vector rawRowSizes(numRows); + for (auto i = 0; i < numRows; i++) { + rawRowSizes[i] = &rowSizes[i]; + } + serde_->estimateSerializedSize(rowVector, ranges, rawRowSizes.data()); + } + + void serialize( + const RowVectorPtr& rowVector, + std::ostream* output, + const VectorSerde::Options* serdeOptions) { + auto numRows = rowVector->size(); + + std::vector rows(numRows); + for (int i = 0; i < numRows; i++) { + rows[i] = IndexRange{i, 1}; + } + + sanityCheckEstimateSerializedSize( + rowVector, folly::Range(rows.data(), numRows)); + + auto arena = std::make_unique(pool_.get()); + auto rowType = asRowType(rowVector->type()); + auto serializer = + serde_->createSerializer(rowType, numRows, arena.get(), serdeOptions); + + serializer->append(rowVector, folly::Range(rows.data(), numRows)); + vector_size_t size = serializer->maxSerializedSize(); + facebook::velox::serializer::SingleOutputStreamListener listener; + OStreamOutputStream out(output, &listener); + serializer->flush(&out); + ASSERT_EQ(size, out.tellp()); + } + + std::unique_ptr toByteStream(const std::string& input) { + auto byteStream = std::make_unique(); + ByteRange byteRange{ + reinterpret_cast(const_cast(input.data())), + (int32_t)input.length(), + 0}; + byteStream->resetInput({byteRange}); + return byteStream; + } + + RowVectorPtr deserialize( + std::shared_ptr rowType, + const std::string& input, + const VectorSerde::Options* serdeOptions) { + auto byteStream = toByteStream(input); + RowVectorPtr result; + serde_->deserialize( + byteStream.get(), pool_.get(), rowType, &result, serdeOptions); + return result; + } + + RowVectorPtr makeTestVector(vector_size_t size) { + auto a = vectorMaker_->flatVector( + size, [](vector_size_t row) { return row; }); + auto b = vectorMaker_->flatVector( + size, [](vector_size_t row) { return row * 0.1; }); + + std::vector childVectors = {a, b}; + + return vectorMaker_->rowVector(childVectors); + } + + // just test rowvector with flat vector + void testRoundTripRowVector( + RowVectorPtr rowVector, + const VectorSerde::Options* serdeOptions = nullptr) { + std::ostringstream out; + serialize(rowVector, &out, serdeOptions); + + auto rowType = asRowType(rowVector->type()); + auto deserialized = deserialize(rowType, out.str(), serdeOptions); + assertEqualVectors(deserialized, rowVector); + } + + void testRoundTrip( + VectorPtr vector, + const VectorSerde::Options* serdeOptions = nullptr) { + auto rowVector = vectorMaker_->rowVector({vector}); + std::ostringstream out; + serialize(rowVector, &out, serdeOptions); + + auto rowType = asRowType(rowVector->type()); + auto deserialized = deserialize(rowType, out.str(), serdeOptions); + assertEqualVectors(deserialized, rowVector); + } + + std::shared_ptr pool_; + std::unique_ptr serde_; + std::unique_ptr vectorMaker_; +}; + +TEST_F(SingleSerializerTest, basic) { + vector_size_t numRows = 5; + auto rowVector = makeTestVector(numRows); + testRoundTripRowVector(rowVector); + + auto stringVector = makeRowVector({ + makeNullableFlatVector({std::nullopt, 1, 2, 3, 4}), + makeFlatVector({ + "ALGERIA", + "ARGENTINA", + "BRAZIL", + "CANADA", + "EGYPT", + }), + }); + testRoundTripRowVector(stringVector); +} + +TEST_F(SingleSerializerTest, constant) { + auto constVector = makeRowVector({ + makeConstant(100, 5), + makeConstant("ALGERIA", 5), + }); + testRoundTripRowVector(constVector); + + auto rowVector = makeRowVector({ + makeFlatVector({0, 1, 2, 3, 4}), + makeConstant(100, 5), + makeConstant( + UnscaledLongDecimal(10012), 5, DECIMAL(20, 2)), + makeConstant( + UnscaledShortDecimal(10012), 5, DECIMAL(10, 2)), + makeFlatVector({0, 1, 2, 3, 4}), + makeNullConstant(TypeKind::INTEGER, 5), + makeConstant("ALGERIA", 5), + }); + testRoundTripRowVector(rowVector); + + auto timeVector = makeRowVector({ + makeConstant(100, 5), + // If with Timestamp(100, 1), will throw exception + // expected {100, 1970-01-01T00:01:40.000000000, 1970-01-16}, but got + // {100, 1970-01-01T00:01:40.000000001, 1970-01-16} + makeConstant(Timestamp(100, 0), 5), + makeConstant(Date(15), 5), + }); + testRoundTripRowVector(timeVector); +} + +TEST_F(SingleSerializerTest, emptyPage) { + auto rowVector = vectorMaker_->rowVector(ROW({"a"}, {BIGINT()}), 0); + + std::ostringstream out; + serialize(rowVector, &out, nullptr); + + auto rowType = asRowType(rowVector->type()); + auto deserialized = deserialize(rowType, out.str(), nullptr); + assertEqualVectors(deserialized, rowVector); +} + +TEST_F(SingleSerializerTest, unscaledLongDecimal) { + std::vector decimalValues(102); + decimalValues[0] = UnscaledLongDecimal::min().unscaledValue(); + for (int row = 1; row < 101; row++) { + decimalValues[row] = row - 50; + } + decimalValues[101] = UnscaledLongDecimal::max().unscaledValue(); + auto vector = + vectorMaker_->longDecimalFlatVector(decimalValues, DECIMAL(20, 5)); + + testRoundTrip(vector); + + // Add some nulls. + for (auto i = 0; i < 102; i += 7) { + vector->setNull(i, true); + } + testRoundTrip(vector); +} diff --git a/velox/substrait/TypeUtils.h b/velox/substrait/TypeUtils.h index e3add3a6a283..80df64205ccc 100644 --- a/velox/substrait/TypeUtils.h +++ b/velox/substrait/TypeUtils.h @@ -95,6 +95,11 @@ struct RangeTraits { using NativeType = UnscaledShortDecimal; }; +template <> +struct RangeTraits { + using NativeType = UnscaledLongDecimal; +}; + #endif /* RANGETRAITS_H */ } // namespace facebook::velox::substrait diff --git a/velox/vector/VectorStream.h b/velox/vector/VectorStream.h index 80f813408554..7beddef6ea95 100644 --- a/velox/vector/VectorStream.h +++ b/velox/vector/VectorStream.h @@ -39,6 +39,19 @@ class VectorSerializer { const RowVectorPtr& vector, const folly::Range& ranges) = 0; + // Usage + // append(vector, ranges); + // vector_size_t size = serializedSize(); + // OutputStream* stream = allocateBuffer(size); + // flush(); + // + // So we can allocate memory for flush OutputStream size + // The return value is accurate without compress, + // Return the maximum required size with different compress codec + virtual vector_size_t maxSerializedSize() { + VELOX_NYI("{} unsupported", __FUNCTION__); + }; + // Writes the contents to 'stream' in wire format virtual void flush(OutputStream* stream) = 0; };