Skip to content

Commit

Permalink
Folder: row, serializers
Browse files Browse the repository at this point in the history
relative pr:

Serialize and deserialize RowVector oap-project#250
  • Loading branch information
zhejiangxiaomai committed Jul 11, 2023
1 parent 4de772f commit 500d279
Show file tree
Hide file tree
Showing 7 changed files with 1,146 additions and 8 deletions.
2 changes: 1 addition & 1 deletion velox/row/UnsafeRow.h
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ FOLLY_ALWAYS_INLINE size_t serializedSizeInBytes<TypeKind::VARBINARY>() {
/// Returns the number of bytes needed to serialize a fixed-width type. Throws
/// if 'type' is not fixed-width.
FOLLY_ALWAYS_INLINE size_t serializedSizeInBytes(const TypePtr& type) {
return VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH(
return VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
serializedSizeInBytes, type->kind());
}

Expand Down
111 changes: 106 additions & 5 deletions velox/row/UnsafeRowDeserializers.h
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,11 @@ struct UnsafeRowDeserializer {
std::vector<VectorPtr> columnVectors(numFields);
for (size_t i = 0; i < numFields; ++i) {
columnVectors[i] = deserialize(
StructBatchIteratorPtr->nextColumnBatch(), rowType.childAt(i), pool);
StructBatchIteratorPtr->nextColumnBatch(),
rowType.childAt(i),
pool,
numFields,
i);
}

return std::make_shared<RowVector>(
Expand All @@ -704,6 +708,75 @@ struct UnsafeRowDeserializer {
* @return a FlatVector
*/
// Decimal-specific variant of the flat-vector conversion.
//
// Builds a vector of 'type' with one entry per row of 'dataIterator':
//  - when the in-memory type is int64_t, the value is read straight from the
//    row's fixed-width slot;
//  - when the in-memory type is int128_t, the slot holds a packed
//    (offset << 32 | length) descriptor and the value bytes live elsewhere in
//    the row. Those bytes are byte-reversed and sign-extended before being
//    copied into the int128_t (presumably the source is big-endian two's
//    complement and the host is little-endian -- TODO confirm).
//
// @param dataIterator iterator over the column batch; must be a
//        PrimitiveBatchIterator.
// @param type the element type of the resulting vector.
// @param pool memory pool used to allocate the vector.
// @param numFields number of fields in the enclosing row; used to compute the
//        width of the row's null bitset.
// @param fieldsIdx position of this column within the enclosing row.
// @return the populated vector with its null count set.
template <TypeKind Kind>
static VectorPtr createDecimalFlatVector(
    const DataBatchIteratorPtr& dataIterator,
    const TypePtr& type,
    memory::MemoryPool* pool,
    int32_t numFields,
    int32_t fieldsIdx) {
  using TypeTraits = ScalarTraits<Kind>;
  using InMemoryType = typename TypeTraits::InMemoryType;

  auto iterator =
      std::dynamic_pointer_cast<PrimitiveBatchIterator>(dataIterator);
  // A failed cast would otherwise be dereferenced on the next line.
  assert(iterator != nullptr);

  const size_t size = iterator->numRows();
  auto vector = BaseVector::create(type, size, pool);
  auto* flatResult = vector->asFlatVector<InMemoryType>();

  size_t nullCount = 0;
  for (int32_t i = 0; static_cast<size_t>(i) < size; ++i) {
    if (iterator->isNull(i)) {
      vector->setNull(i, true);
      // Consume the entry so the iterator stays aligned with the row index.
      iterator->next();
      ++nullCount;
      continue;
    }
    vector->setNull(i, false);

    if constexpr (std::is_same_v<InMemoryType, int64_t>) {
      int64_t val =
          UnsafeRowPrimitiveBatchDeserializer::deserializeFixedWidth<int64_t>(
              iterator->next().value());
      TypeTraits::set(flatResult, i, int64_t(val));
    } else if constexpr (std::is_same_v<InMemoryType, int128_t>) {
      // Capacity of the scratch buffer. Fixed at compile time so that no
      // variable-length array (a non-standard extension) sized by input data
      // is needed.
      constexpr int32_t kMaxEncodedBytes = 32;

      const auto* slot = iterator->next().value().data();
      int64_t offsetAndSize;
      memcpy(&offsetAndSize, slot, sizeof(int64_t));
      int32_t length = int32_t(offsetAndSize);
      const int32_t wordOffset = int32_t(offsetAndSize >> 32);

      // 'length' comes from the serialized data. Reject corrupt values in
      // debug builds and clamp them otherwise so that the copy below can
      // never run past the scratch buffer.
      // NOTE(review): consider replacing the assert + clamp with the
      // project's checked-exception facility so release builds fail loudly.
      assert(length >= 0 && length <= kMaxEncodedBytes);
      if (length < 0) {
        length = 0;
      } else if (length > kMaxEncodedBytes) {
        length = kMaxEncodedBytes;
      }

      // 'slot' points at this field's fixed-width slot, which sits
      // 'fieldOffset' bytes after the null bitset starts; subtracting it
      // rebases 'wordOffset' (presumably relative to the row start -- TODO
      // confirm against the serializer).
      const int64_t nullBitsetWidthInBytes = ((numFields + 63) / 64) * 8;
      const int64_t fieldOffset =
          nullBitsetWidthInBytes + int64_t{8} * fieldsIdx;
      const auto* encoded = slot + wordOffset - fieldOffset;

      // Reverse the byte order into a zero-initialized buffer.
      uint8_t reversed[kMaxEncodedBytes]{};
      for (int32_t k = 0; k < length; ++k) {
        reversed[length - 1 - k] = static_cast<uint8_t>(encoded[k]);
      }
      // Sign-extend when the leading source byte has its top bit set.
      if (length > 0 && static_cast<int8_t>(encoded[0]) < 0) {
        for (int32_t k = length; k < kMaxEncodedBytes; ++k) {
          reversed[k] = 0xFF;
        }
      }

      int128_t val;
      memcpy(&val, reversed, sizeof(int128_t));
      TypeTraits::set(flatResult, i, val);
    }
  }
  vector->setNullCount(nullCount);
  return vector;
}

/**
* Converts a list of UnsafeRowPrimitiveBatchIterators to a FlatVector
* @tparam Kind the element's type kind.
* @param dataIterator iterator that points to the dataIterator over the
* whole column batch of data.
* @param type The element type
* @param pool
* @return a FlatVector
*/
template <TypeKind Kind>
static VectorPtr createFlatVector(
const DataBatchIteratorPtr& dataIterator,
const TypePtr& type,
Expand Down Expand Up @@ -751,21 +824,29 @@ struct UnsafeRowDeserializer {
*/
// Converts a primitive-typed batch iterator into a vector by dispatching on
// the iterator's type kind. 'numFields' and 'fieldsIdx' are only forwarded to
// the HUGEINT path; every other kind ignores them.
static VectorPtr convertPrimitiveIteratorsToVectors(
const DataBatchIteratorPtr& dataIterator,
memory::MemoryPool* pool) {
memory::MemoryPool* pool,
int32_t numFields = 1,
int32_t fieldsIdx = 0) {
const TypePtr& type = dataIterator->type();
assert(type->isPrimitiveType());

// HUGEINT is handled explicitly by createDecimalFlatVector, which needs the
// field count and field position, instead of going through the scalar
// dispatch macro below.
if (type->kind() == TypeKind::HUGEINT) {
return createDecimalFlatVector<TypeKind::HUGEINT>(
dataIterator, type, pool, numFields, fieldsIdx);
}
// All remaining primitive kinds are converted by createFlatVector.
return VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH(
createFlatVector, type->kind(), dataIterator, type, pool);
}

static VectorPtr convertToVectors(
const DataBatchIteratorPtr& dataIterator,
memory::MemoryPool* pool) {
memory::MemoryPool* pool,
int32_t numFields = 1,
int32_t fieldsIdx = 0) {
const TypePtr& type = dataIterator->type();

if (type->isPrimitiveType()) {
return convertPrimitiveIteratorsToVectors(dataIterator, pool);
return convertPrimitiveIteratorsToVectors(
dataIterator, pool, numFields, fieldsIdx);
} else if (type->isRow()) {
return convertStructIteratorsToVectors(dataIterator, pool);
} else if (type->isArray()) {
Expand All @@ -777,6 +858,16 @@ struct UnsafeRowDeserializer {
}
}

// Single-value convenience overload: wraps 'data' in a one-element batch and
// forwards to the batch overload together with the field count and the
// position of this field.
static VectorPtr deserialize(
    std::optional<std::string_view> data,
    TypePtr type,
    memory::MemoryPool* pool,
    int32_t numFields,
    int32_t fieldsIdx) {
  std::vector<std::optional<std::string_view>> singleElementBatch(1, data);
  return deserialize(singleElementBatch, type, pool, numFields, fieldsIdx);
}

/**
* Deserializes a complex element type to its Vector representation.
* @param data A string_view over a given element in the UnsafeRow.
Expand Down Expand Up @@ -808,6 +899,16 @@ struct UnsafeRowDeserializer {
memory::MemoryPool* pool) {
return convertToVectors(getBatchIteratorPtr(data, type), pool);
}

// Batch overload that carries the field count and field position: builds a
// batch iterator over 'data' for 'type' and converts it to a vector,
// forwarding 'numFields' and 'fieldsIdx' to the conversion.
static VectorPtr deserialize(
    const std::vector<std::optional<std::string_view>>& data,
    const TypePtr& type,
    memory::MemoryPool* pool,
    int32_t numFields,
    int32_t fieldsIdx) {
  auto batchIterator = getBatchIteratorPtr(data, type);
  return convertToVectors(batchIterator, pool, numFields, fieldsIdx);
}
};

} // namespace facebook::velox::row
2 changes: 1 addition & 1 deletion velox/serializers/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
add_library(velox_presto_serializer PrestoSerializer.cpp
add_library(velox_presto_serializer PrestoSerializer.cpp SingleSerializer.cpp
UnsafeRowSerializer.cpp)

target_link_libraries(velox_presto_serializer velox_vector)
Expand Down
Loading

0 comments on commit 500d279

Please sign in to comment.