From 6e81fbe50b1f889626bb0dbc8385658b7c5f6eab Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Tue, 2 Jul 2024 10:45:41 +0300 Subject: [PATCH] correct extract column methods (#6133) --- ydb/core/formats/arrow/arrow_helpers.cpp | 141 +----------------- ydb/core/formats/arrow/arrow_helpers.h | 28 +--- ydb/core/formats/arrow/common/adapter.h | 6 + ydb/core/formats/arrow/permutations.cpp | 5 +- ydb/core/formats/arrow/process_columns.cpp | 141 ++++++++++++++++++ ydb/core/formats/arrow/process_columns.h | 48 ++++++ ydb/core/formats/arrow/program.cpp | 2 +- ydb/core/formats/arrow/special_keys.cpp | 13 +- ydb/core/formats/arrow/ya.make | 1 + ydb/core/io_formats/arrow/csv_arrow.cpp | 4 +- .../engines/changes/indexation.cpp | 4 +- .../engines/portions/read_with_blobs.cpp | 4 +- .../reader/plain_reader/iterator/merge.cpp | 6 +- .../reader/sys_view/abstract/iterator.h | 4 +- .../scheme/versions/abstract_scheme.cpp | 2 +- .../operations/batch_builder/restore.cpp | 2 +- .../operations/slice_builder/builder.cpp | 10 +- .../ut_rw/ut_columnshard_read_write.cpp | 22 +-- ydb/core/tx/program/program.h | 2 +- .../tx/tx_proxy/upload_rows_common_impl.h | 2 +- ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp | 20 +-- 21 files changed, 246 insertions(+), 221 deletions(-) create mode 100644 ydb/core/formats/arrow/process_columns.cpp create mode 100644 ydb/core/formats/arrow/process_columns.h diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp index 6445c8b4e56f..ed690f314c98 100644 --- a/ydb/core/formats/arrow/arrow_helpers.cpp +++ b/ydb/core/formats/arrow/arrow_helpers.cpp @@ -152,141 +152,6 @@ std::shared_ptr MakeEmptyBatch(const std::shared_ptr -std::shared_ptr ExtractColumnsImpl(const std::shared_ptr& srcBatch, - const std::vector& columnNames) { - std::vector> fields; - fields.reserve(columnNames.size()); - std::vector::TColumn>> columns; - columns.reserve(columnNames.size()); - - auto srcSchema = srcBatch->schema(); - for (auto& name : columnNames) { - int pos = srcSchema->GetFieldIndex(name); - if (pos < 0) { - return {}; - } - fields.push_back(srcSchema->field(pos)); - columns.push_back(srcBatch->column(pos)); - } - - return NAdapter::TDataBuilderPolicy::Build(std::move(fields), std::move(columns), srcBatch->num_rows()); -} -} - -std::shared_ptr ExtractColumns(const std::shared_ptr& srcBatch, - const std::vector& columnNames) { - return ExtractColumnsImpl(srcBatch, columnNames); -} - -std::shared_ptr ExtractColumns(const std::shared_ptr& srcBatch, - const std::vector& columnNames) { - return ExtractColumnsImpl(srcBatch, columnNames); -} - -std::shared_ptr ExtractColumns(const std::shared_ptr& srcBatch, - const std::vector& columnNames) { - return ExtractColumnsImpl(srcBatch, columnNames); -} - -std::shared_ptr ExtractColumns(const std::shared_ptr& srcBatch, - const std::vector& columnNames) { - return ExtractColumnsImpl(srcBatch, columnNames); -} - -namespace { -template -std::shared_ptr ExtractColumnsValidateImpl(const std::shared_ptr& srcBatch, - const std::vector& columnNames, const bool necessaryColumns) { - if (!srcBatch) { - return srcBatch; - } - if (columnNames.empty()) { - return nullptr; - } - std::vector> fields; - fields.reserve(columnNames.size()); - std::vector::TColumn>> columns; - columns.reserve(columnNames.size()); - - auto srcSchema = srcBatch->schema(); - for (auto& name : columnNames) { - const int pos = srcSchema->GetFieldIndex(name); - if (necessaryColumns) { - AFL_VERIFY(pos >= 0)("field_name", name)("names", JoinSeq(",", columnNames))("fields", JoinSeq(",", srcBatch->schema()->field_names())); - } else if (pos == -1) { - continue; - } - fields.push_back(srcSchema->field(pos)); - columns.push_back(srcBatch->column(pos)); - } - - return NAdapter::TDataBuilderPolicy::Build(std::move(fields), std::move(columns), srcBatch->num_rows()); -} -} - -std::shared_ptr ExtractColumnsValidate(const std::shared_ptr& srcBatch, - const std::vector& columnNames) { - return ExtractColumnsValidateImpl(srcBatch, columnNames, true); -} - -std::shared_ptr ExtractColumnsValidate(const std::shared_ptr& srcBatch, - const std::vector& columnNames) { - return ExtractColumnsValidateImpl(srcBatch, columnNames, true); -} - -std::shared_ptr ExtractColumnsOptional(const std::shared_ptr& srcBatch, - const std::vector& columnNames) { - return ExtractColumnsValidateImpl(srcBatch, columnNames, false); -} - -std::shared_ptr ExtractColumnsOptional(const std::shared_ptr& srcBatch, - const std::vector& columnNames) { - return ExtractColumnsValidateImpl(srcBatch, columnNames, false); -} - -std::shared_ptr ExtractColumnsOptional(const std::shared_ptr& srcBatch, - const std::vector& columnNames) { - return ExtractColumnsValidateImpl(srcBatch, columnNames, false); -} - -std::shared_ptr ExtractColumnsOptional(const std::shared_ptr& srcBatch, - const std::vector& columnNames) { - return ExtractColumnsValidateImpl(srcBatch, columnNames, false); -} - -std::shared_ptr ExtractColumns(const std::shared_ptr& srcBatch, - const std::shared_ptr& dstSchema) { - Y_ABORT_UNLESS(srcBatch); - Y_ABORT_UNLESS(dstSchema); - std::vector> columns; - columns.reserve(dstSchema->num_fields()); - - for (auto& field : dstSchema->fields()) { - const int index = srcBatch->schema()->GetFieldIndex(field->name()); - if (index == -1) { - AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "not_found_column")("column", field->name()) - ("column_type", field->type()->ToString())("columns", JoinSeq(",", srcBatch->schema()->field_names())); - return nullptr; - } else { - columns.push_back(srcBatch->column(index)); - auto srcField = srcBatch->schema()->field(index); - if (!field->Equals(srcField)) { - AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")("column", field->name()) - ("column_type", field->ToString(true))("incoming_type", srcField->ToString(true)); - return nullptr; - } - } - - AFL_VERIFY(columns.back()->type()->Equals(field->type()))("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")("column", field->name()) - ("column_type", field->type()->ToString())("incoming_type", columns.back()->type()->ToString()); - } - - return arrow::RecordBatch::Make(dstSchema, srcBatch->num_rows(), columns); -} - std::shared_ptr CombineBatches(const std::vector>& batches) { if (batches.empty()) { return nullptr; @@ -427,7 +292,7 @@ void DedupSortedBatch(const std::shared_ptr& batch, Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, sortingKey)); - auto keyBatch = ExtractColumns(batch, sortingKey); + auto keyBatch = TColumnOperator().Adapt(batch, sortingKey).DetachResult(); auto& keyColumns = keyBatch->columns(); bool same = false; @@ -487,7 +352,7 @@ static bool IsSelfSorted(const std::shared_ptr& batch) { bool IsSorted(const std::shared_ptr& batch, const std::shared_ptr& sortingKey, bool desc) { - auto keyBatch = ExtractColumns(batch, sortingKey); + auto keyBatch = TColumnOperator().Adapt(batch, sortingKey).DetachResult(); if (desc) { return IsSelfSorted(keyBatch); } else { @@ -497,7 +362,7 @@ bool IsSorted(const std::shared_ptr& batch, bool IsSortedAndUnique(const std::shared_ptr& batch, const std::shared_ptr& sortingKey, bool desc) { - auto keyBatch = ExtractColumns(batch, sortingKey); + auto keyBatch = TColumnOperator().Adapt(batch, sortingKey).DetachResult(); if (desc) { return IsSelfSorted(keyBatch); } else { diff --git a/ydb/core/formats/arrow/arrow_helpers.h b/ydb/core/formats/arrow/arrow_helpers.h index c6503ccb5a7d..f6f4fd0c18a0 100644 --- a/ydb/core/formats/arrow/arrow_helpers.h +++ b/ydb/core/formats/arrow/arrow_helpers.h @@ -1,5 +1,6 @@ #pragma once #include "switch_type.h" +#include "process_columns.h" #include #include #include @@ -56,33 +57,6 @@ std::shared_ptr DeserializeBatch(const TString& blob, std::shared_ptr MakeEmptyBatch(const std::shared_ptr& schema, const ui32 rowsCount = 0); std::shared_ptr ToTable(const std::shared_ptr& batch); -std::shared_ptr ExtractColumns(const std::shared_ptr& srcBatch, - const std::vector& columnNames); -std::shared_ptr ExtractColumns(const std::shared_ptr& srcBatch, - const std::vector& columnNames); -std::shared_ptr ExtractColumns(const std::shared_ptr& srcBatch, - const std::vector& columnNames); -std::shared_ptr ExtractColumns(const std::shared_ptr& srcBatch, - const std::vector& columnNames); -std::shared_ptr ExtractColumnsValidate(const std::shared_ptr& srcBatch, - const std::vector& columnNames); -std::shared_ptr ExtractColumnsValidate(const std::shared_ptr& srcBatch, - const std::vector& columnNames); - -std::vector ConvertStrings(const std::vector& input); -std::vector ConvertStrings(const std::vector& input); - -std::shared_ptr ExtractColumnsOptional(const std::shared_ptr& srcBatch, - const std::vector& columnNames); -std::shared_ptr ExtractColumnsOptional(const std::shared_ptr& srcBatch, - const std::vector& columnNames); -std::shared_ptr ExtractColumnsOptional(const std::shared_ptr& srcBatch, - const std::vector& columnNames); -std::shared_ptr ExtractColumnsOptional(const std::shared_ptr& srcBatch, - const std::vector& columnNames); -std::shared_ptr ExtractColumns(const std::shared_ptr& srcBatch, - const std::shared_ptr& dstSchema); - std::shared_ptr ToBatch(const std::shared_ptr& combinedTable, const bool combine); std::shared_ptr CombineBatches(const std::vector>& batches); std::shared_ptr MergeColumns(const std::vector>& rb); diff --git a/ydb/core/formats/arrow/common/adapter.h b/ydb/core/formats/arrow/common/adapter.h index f3552019a20d..543e78511146 100644 --- a/ydb/core/formats/arrow/common/adapter.h +++ b/ydb/core/formats/arrow/common/adapter.h @@ -34,6 +34,9 @@ class TDataBuilderPolicy { [[nodiscard]] static std::shared_ptr Build(std::vector>&& fields, std::vector>&& columns, const ui32 count) { return arrow::RecordBatch::Make(std::make_shared(std::move(fields)), count, std::move(columns)); } + [[nodiscard]] static std::shared_ptr Build(const std::shared_ptr& schema, std::vector>&& columns, const ui32 count) { + return arrow::RecordBatch::Make(schema, count, std::move(columns)); + } [[nodiscard]] static std::shared_ptr ApplyArrowFilter(const std::shared_ptr& batch, const std::shared_ptr& filter) { auto res = arrow::compute::Filter(batch, filter); Y_VERIFY_S(res.ok(), res.status().message()); @@ -54,6 +57,9 @@ class TDataBuilderPolicy { [[nodiscard]] static std::shared_ptr Build(std::vector>&& fields, std::vector>&& columns, const ui32 count) { return arrow::Table::Make(std::make_shared(std::move(fields)), std::move(columns), count); } + [[nodiscard]] static std::shared_ptr Build(const std::shared_ptr& schema, std::vector>&& columns, const ui32 count) { + return arrow::Table::Make(schema, std::move(columns), count); + } [[nodiscard]] static std::shared_ptr AddColumn(const std::shared_ptr& batch, const std::shared_ptr& field, const std::shared_ptr& extCol) { return TStatusValidator::GetValid(batch->AddColumn(batch->num_columns(), field, std::make_shared(extCol))); } diff --git a/ydb/core/formats/arrow/permutations.cpp b/ydb/core/formats/arrow/permutations.cpp index c63965a91625..623bc15c9221 100644 --- a/ydb/core/formats/arrow/permutations.cpp +++ b/ydb/core/formats/arrow/permutations.cpp @@ -38,10 +38,7 @@ std::shared_ptr MakePermutation(const int size, const bool r } std::shared_ptr MakeSortPermutation(const std::shared_ptr& batch, const std::shared_ptr& sortingKey, const bool andUnique) { - auto keyBatch = ExtractColumns(batch, sortingKey); - AFL_VERIFY(batch); - AFL_VERIFY(sortingKey); - AFL_VERIFY(!!keyBatch)("problem", "cannot_find_columns")("schema", batch->schema()->ToString())("columns", sortingKey->ToString()); + auto keyBatch = TColumnOperator().VerifyIfAbsent().Adapt(batch, sortingKey).DetachResult(); auto keyColumns = std::make_shared(keyBatch->columns()); std::vector points; points.reserve(keyBatch->num_rows()); diff --git a/ydb/core/formats/arrow/process_columns.cpp b/ydb/core/formats/arrow/process_columns.cpp new file mode 100644 index 000000000000..d8795e188055 --- /dev/null +++ b/ydb/core/formats/arrow/process_columns.cpp @@ -0,0 +1,141 @@ +#include "process_columns.h" +#include "common/adapter.h" + +#include + +namespace NKikimr::NArrow { + +namespace { +template +std::shared_ptr ExtractColumnsValidateImpl(const std::shared_ptr& srcBatch, + const std::vector& columnNames) { + std::vector> fields; + fields.reserve(columnNames.size()); + std::vector::TColumn>> columns; + columns.reserve(columnNames.size()); + + auto srcSchema = srcBatch->schema(); + for (auto& name : columnNames) { + const int pos = srcSchema->GetFieldIndex(name); + if (Y_LIKELY(pos > -1)) { + fields.push_back(srcSchema->field(pos)); + columns.push_back(srcBatch->column(pos)); + } + } + + return NAdapter::TDataBuilderPolicy::Build(std::move(fields), std::move(columns), srcBatch->num_rows()); +} + +template +TConclusion> AdaptColumnsImpl(const std::shared_ptr& srcBatch, + const std::shared_ptr& dstSchema) { + AFL_VERIFY(srcBatch); + AFL_VERIFY(dstSchema); + std::vector::TColumn>> columns; + columns.reserve(dstSchema->num_fields()); + + for (auto& field : dstSchema->fields()) { + const int index = srcBatch->schema()->GetFieldIndex(field->name()); + if (index > -1) { + columns.push_back(srcBatch->column(index)); + auto srcField = srcBatch->schema()->field(index); + if (field->Equals(srcField)) { + AFL_VERIFY(columns.back()->type()->Equals(field->type()))("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")("column", field->name()) + ("column_type", field->type()->ToString())("incoming_type", columns.back()->type()->ToString()); + } else { + AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")("column", field->name()) + ("column_type", field->ToString(true))("incoming_type", srcField->ToString(true)); + return TConclusionStatus::Fail("incompatible column types"); + } + } else { + AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "not_found_column")("column", field->name()) + ("column_type", field->type()->ToString())("columns", JoinSeq(",", srcBatch->schema()->field_names())); + return TConclusionStatus::Fail("not found column '" + field->name() + "'"); + } + } + + return NAdapter::TDataBuilderPolicy::Build(dstSchema, std::move(columns), srcBatch->num_rows()); +} + +template +std::shared_ptr ExtractImpl(const TColumnOperator::EExtractProblemsPolicy& policy, + const std::shared_ptr& incoming, const std::vector& columnNames) { + AFL_VERIFY(incoming); + AFL_VERIFY(columnNames.size()); + auto result = ExtractColumnsValidateImpl(incoming, columnNames); + switch (policy) { + case TColumnOperator::EExtractProblemsPolicy::Verify: + AFL_VERIFY((ui32)result->num_columns() == columnNames.size())("schema", incoming->schema()->ToString())("required", JoinSeq(",", columnNames)); + break; + case TColumnOperator::EExtractProblemsPolicy::Null: + if ((ui32)result->num_columns() != columnNames.size()) { + return nullptr; + } + break; + case TColumnOperator::EExtractProblemsPolicy::Skip: + break; + } + return result; +} + +template +TConclusion> ReorderImpl(const std::shared_ptr& incoming, const std::vector& columnNames) { + AFL_VERIFY(!!incoming); + AFL_VERIFY(columnNames.size()); + if ((ui32)incoming->num_columns() < columnNames.size()) { + return TConclusionStatus::Fail("not enough columns for exact reordering"); + } + if ((ui32)incoming->num_columns() > columnNames.size()) { + return TConclusionStatus::Fail("need extraction before reorder call"); + } + auto result = ExtractColumnsValidateImpl(incoming, columnNames); + AFL_VERIFY(result); + if ((ui32)result->num_columns() != columnNames.size()) { + return TConclusionStatus::Fail("not enough fields for exact reordering"); + } + return result; +} + +} + +std::shared_ptr TColumnOperator::Extract(const std::shared_ptr& incoming, const std::vector& columnNames) { + return ExtractImpl(AbsentColumnPolicy, incoming, columnNames); +} + +std::shared_ptr TColumnOperator::Extract(const std::shared_ptr& incoming, const std::vector& columnNames) { + return ExtractImpl(AbsentColumnPolicy, incoming, columnNames); +} + +std::shared_ptr TColumnOperator::Extract(const std::shared_ptr& incoming, const std::vector& columnNames) { + return ExtractImpl(AbsentColumnPolicy, incoming, columnNames); +} + +std::shared_ptr TColumnOperator::Extract(const std::shared_ptr& incoming, const std::vector& columnNames) { + return ExtractImpl(AbsentColumnPolicy, incoming, columnNames); +} + +NKikimr::TConclusion> TColumnOperator::Adapt(const std::shared_ptr& incoming, const std::shared_ptr& dstSchema) { + return AdaptColumnsImpl(incoming, dstSchema); +} + +NKikimr::TConclusion> TColumnOperator::Adapt(const std::shared_ptr& incoming, const std::shared_ptr& dstSchema) { + return AdaptColumnsImpl(incoming, dstSchema); +} + +NKikimr::TConclusion> TColumnOperator::Reorder(const std::shared_ptr& incoming, const std::vector& columnNames) { + return ReorderImpl(incoming, columnNames); +} + +NKikimr::TConclusion> TColumnOperator::Reorder(const std::shared_ptr& incoming, const std::vector& columnNames) { + return ReorderImpl(incoming, columnNames); +} + +NKikimr::TConclusion> TColumnOperator::Reorder(const std::shared_ptr& incoming, const std::vector& columnNames) { + return ReorderImpl(incoming, columnNames); +} + +NKikimr::TConclusion> TColumnOperator::Reorder(const std::shared_ptr& incoming, const std::vector& columnNames) { + return ReorderImpl(incoming, columnNames); +} + +} \ No newline at end of file diff --git a/ydb/core/formats/arrow/process_columns.h b/ydb/core/formats/arrow/process_columns.h new file mode 100644 index 000000000000..d07b106231d2 --- /dev/null +++ b/ydb/core/formats/arrow/process_columns.h @@ -0,0 +1,48 @@ +#pragma once +#include + +#include + +namespace NKikimr::NArrow { + +class TColumnOperator { +public: + enum class EExtractProblemsPolicy { + Null, + Verify, + Skip + }; +private: + EExtractProblemsPolicy AbsentColumnPolicy = EExtractProblemsPolicy::Verify; + +public: + TColumnOperator& NullIfAbsent() { + AbsentColumnPolicy = EExtractProblemsPolicy::Null; + return *this; + } + + TColumnOperator& VerifyIfAbsent() { + AbsentColumnPolicy = EExtractProblemsPolicy::Verify; + return *this; + } + + TColumnOperator& SkipIfAbsent() { + AbsentColumnPolicy = EExtractProblemsPolicy::Skip; + return *this; + } + + std::shared_ptr Extract(const std::shared_ptr& incoming, const std::vector& columnNames); + std::shared_ptr Extract(const std::shared_ptr& incoming, const std::vector& columnNames); + std::shared_ptr Extract(const std::shared_ptr& incoming, const std::vector& columnNames); + std::shared_ptr Extract(const std::shared_ptr& incoming, const std::vector& columnNames); + + TConclusion> Adapt(const std::shared_ptr& incoming, const std::shared_ptr& dstSchema); + TConclusion> Adapt(const std::shared_ptr& incoming, const std::shared_ptr& dstSchema); + + TConclusion> Reorder(const std::shared_ptr& incoming, const std::vector& columnNames); + TConclusion> Reorder(const std::shared_ptr& incoming, const std::vector& columnNames); + TConclusion> Reorder(const std::shared_ptr& incoming, const std::vector& columnNames); + TConclusion> Reorder(const std::shared_ptr& incoming, const std::vector& columnNames); +}; + +} \ No newline at end of file diff --git a/ydb/core/formats/arrow/program.cpp b/ydb/core/formats/arrow/program.cpp index 8ccea11e1f76..1203f1802098 100644 --- a/ydb/core/formats/arrow/program.cpp +++ b/ydb/core/formats/arrow/program.cpp @@ -862,7 +862,7 @@ arrow::Status TProgramStep::ApplyProjection(std::shared_ptr& return arrow::Status::Invalid("Wrong projection column '" + column.GetColumnName() + "'."); } } - batch = NArrow::ExtractColumns(batch, std::make_shared(std::move(fields))); + batch = NArrow::TColumnOperator().Adapt(batch, std::make_shared(std::move(fields))).DetachResult(); return arrow::Status::OK(); } diff --git a/ydb/core/formats/arrow/special_keys.cpp b/ydb/core/formats/arrow/special_keys.cpp index 3209fcff13a9..0b97fb3f25ed 100644 --- a/ydb/core/formats/arrow/special_keys.cpp +++ b/ydb/core/formats/arrow/special_keys.cpp @@ -48,7 +48,7 @@ TFirstLastSpecialKeys::TFirstLastSpecialKeys(const std::shared_ptrnum_rows()); std::shared_ptr keyBatch = batch; if (columnNames.size()) { - keyBatch = NArrow::ExtractColumns(batch, columnNames); + keyBatch = NArrow::TColumnOperator().VerifyIfAbsent().Extract(batch, columnNames); } std::vector indexes = {0}; if (batch->num_rows() > 1) { @@ -90,7 +90,7 @@ TMinMaxSpecialKeys::TMinMaxSpecialKeys(std::shared_ptr batch columnNamesString.emplace_back(i); } - auto dataBatch = NArrow::ExtractColumns(batch, columnNamesString); + auto dataBatch = NArrow::TColumnOperator().VerifyIfAbsent().Extract(batch, columnNamesString); Data = NArrow::CopyRecords(dataBatch, indexes); Y_ABORT_UNLESS(Data->num_rows() == 1 || Data->num_rows() == 2); } @@ -102,8 +102,7 @@ TFirstLastSpecialKeys::TFirstLastSpecialKeys(const TString& data) } std::shared_ptr TFirstLastSpecialKeys::BuildAccordingToSchemaVerified(const std::shared_ptr& schema) const { - auto newData = NArrow::ExtractColumns(Data, schema); - AFL_VERIFY(newData); + auto newData = NArrow::TColumnOperator().Adapt(Data, schema).DetachResult(); return std::make_shared(newData); } @@ -115,10 +114,8 @@ TMinMaxSpecialKeys::TMinMaxSpecialKeys(const TString& data) } std::shared_ptr TMinMaxSpecialKeys::BuildAccordingToSchemaVerified(const std::shared_ptr& schema) const { - auto newData = NArrow::ExtractColumns(Data, schema); - AFL_VERIFY(newData); - std::shared_ptr result(new TMinMaxSpecialKeys(newData)); - return result; + auto newData = NArrow::TColumnOperator().Adapt(Data, schema).DetachResult(); + return std::shared_ptr(new TMinMaxSpecialKeys(newData)); } } diff --git a/ydb/core/formats/arrow/ya.make b/ydb/core/formats/arrow/ya.make index d55ccb5799e5..d4bf1f8529c2 100644 --- a/ydb/core/formats/arrow/ya.make +++ b/ydb/core/formats/arrow/ya.make @@ -52,6 +52,7 @@ SRCS( ssa_program_optimizer.cpp special_keys.cpp simple_arrays_cache.cpp + process_columns.cpp ) END() diff --git a/ydb/core/io_formats/arrow/csv_arrow.cpp b/ydb/core/io_formats/arrow/csv_arrow.cpp index 36113047b92f..d73f5edaaeff 100644 --- a/ydb/core/io_formats/arrow/csv_arrow.cpp +++ b/ydb/core/io_formats/arrow/csv_arrow.cpp @@ -227,8 +227,8 @@ std::shared_ptr TArrowCSV::ReadNext(const TString& csv, TStr return {}; } - if (batch && !ResultColumns.empty()) { - batch = NArrow::ExtractColumns(batch, ResultColumns); + if (batch && ResultColumns.size()) { + batch = NArrow::TColumnOperator().NullIfAbsent().Extract(batch, ResultColumns); if (!batch) { errString = ErrorPrefix() + "not all result columns present"; } diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index 434fea56cfed..e34fa411a347 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -226,9 +226,7 @@ std::shared_ptr TInsertColumnEngineChanges::AddSpecials(cons const TIndexInfo& indexInfo, const TInsertedData& inserted) const { auto batch = IIndexInfo::AddSnapshotColumns(srcBatch, inserted.GetSnapshot()); batch = IIndexInfo::AddDeleteFlagsColumn(batch, inserted.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete); - auto result = NArrow::ExtractColumns(batch, indexInfo.ArrowSchemaWithSpecials()); - AFL_VERIFY(result); - return result; + return NArrow::TColumnOperator().Adapt(batch, indexInfo.ArrowSchemaWithSpecials()).DetachResult(); } NColumnShard::ECumulativeCounters TInsertColumnEngineChanges::GetCounterIndex(const bool isSuccess) const { diff --git a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp index 3c7e700e7b4d..ca5a1505d620 100644 --- a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp +++ b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp @@ -31,9 +31,7 @@ std::shared_ptr TReadPortionInfoWithBlobs::GetBatch(const IS for (auto&& i : columnNames) { columnNamesString.emplace_back(i.data(), i.size()); } - auto result = NArrow::ExtractColumns(*CachedBatch, columnNamesString); - Y_ABORT_UNLESS(result); - return result; + return NArrow::TColumnOperator().VerifyIfAbsent().Extract(*CachedBatch, columnNamesString); } else { auto filteredSchema = std::make_shared(data, columnNames); THashMap blobs; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp index cffd348c7996..0b7bc55dffec 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp @@ -30,8 +30,7 @@ void TBaseMergeTask::PrepareResultBatch() { return; } { - ResultBatch = NArrow::ExtractColumns(ResultBatch, Context->GetProgramInputColumns()->GetColumnNamesVector()); - AFL_VERIFY(ResultBatch); + ResultBatch = NArrow::TColumnOperator().VerifyIfAbsent().Extract(ResultBatch, Context->GetProgramInputColumns()->GetColumnNamesVector()); AFL_VERIFY((ui32)ResultBatch->num_columns() == Context->GetProgramInputColumns()->GetColumnNamesVector().size()); NArrow::TStatusValidator::Validate(Context->GetReadMetadata()->GetProgram().ApplyProgram(ResultBatch)); } @@ -74,8 +73,7 @@ bool TStartMergeTask::DoExecute() { if (container && container->num_rows()) { ResultBatch = container->BuildTable(); LastPK = Sources.begin()->second->GetLastPK(); - ResultBatch = NArrow::ExtractColumnsValidate(ResultBatch, Context->GetProgramInputColumns()->GetColumnNamesVector()); - AFL_VERIFY(ResultBatch)("info", Context->GetProgramInputColumns()->GetSchema()->ToString()); + ResultBatch = NArrow::TColumnOperator().VerifyIfAbsent().Extract(ResultBatch, Context->GetProgramInputColumns()->GetColumnNamesVector()); Context->GetCommonContext()->GetCounters().OnNoScanInterval(ResultBatch->num_rows()); if (Context->GetCommonContext()->IsReverse()) { ResultBatch = NArrow::ReverseRecords(ResultBatch); diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/iterator.h b/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/iterator.h index aea0425815bf..08f4d78e0c08 100644 --- a/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/iterator.h +++ b/ydb/core/tx/columnshard/engines/reader/sys_view/abstract/iterator.h @@ -40,7 +40,7 @@ class TStatsIteratorBase: public TScanIteratorBase { if (originalBatch->num_rows() == 0) { continue; } - auto keyBatch = NArrow::ExtractColumns(originalBatch, KeySchema); + auto keyBatch = NArrow::TColumnOperator().VerifyIfAbsent().Adapt(originalBatch, KeySchema).DetachResult(); auto lastKey = keyBatch->Slice(keyBatch->num_rows() - 1, 1); { @@ -49,7 +49,7 @@ class TStatsIteratorBase: public TScanIteratorBase { } // Leave only requested columns - auto resultBatch = NArrow::ExtractColumns(originalBatch, ResultSchema); + auto resultBatch = NArrow::TColumnOperator().Adapt(originalBatch, ResultSchema).DetachResult(); NArrow::TStatusValidator::Validate(ReadMetadata->GetProgram().ApplyProgram(resultBatch)); if (resultBatch->num_rows() == 0) { continue; diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp index 786a76a7f822..300952cc3227 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp @@ -76,7 +76,7 @@ TConclusion> ISnapshotSchema::PrepareForModi const std::shared_ptr dstSchema = GetIndexInfo().ArrowSchema(); - auto batch = NArrow::ExtractColumnsOptional(incomingBatch, dstSchema->field_names()); + auto batch = NArrow::TColumnOperator().SkipIfAbsent().Extract(incomingBatch, dstSchema->field_names()); for (auto&& i : batch->schema()->fields()) { AFL_VERIFY(GetIndexInfo().HasColumnName(i->name())); diff --git a/ydb/core/tx/columnshard/operations/batch_builder/restore.cpp b/ydb/core/tx/columnshard/operations/batch_builder/restore.cpp index f2794d36ac0b..6ad24b426dc0 100644 --- a/ydb/core/tx/columnshard/operations/batch_builder/restore.cpp +++ b/ydb/core/tx/columnshard/operations/batch_builder/restore.cpp @@ -9,7 +9,7 @@ std::unique_ptr TModificationRestoreTa auto request = std::make_unique(LocalPathId); request->ReadToSnapshot = Snapshot; request->RangesFilter = std::make_shared(false); - auto pkData = NArrow::ExtractColumns(IncomingData, ActualSchema->GetPKColumnNames()); + auto pkData = NArrow::TColumnOperator().VerifyIfAbsent().Extract(IncomingData, ActualSchema->GetPKColumnNames()); for (ui32 i = 0; i < pkData->num_rows(); ++i) { auto batch = pkData->Slice(i, 1); auto pFrom = std::make_shared(NKernels::EOperation::GreaterEqual, batch); diff --git a/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp index 8d2d9f2baedd..ba8c51c63b2e 100644 --- a/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp +++ b/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp @@ -43,11 +43,13 @@ bool TBuildSlicesTask::DoExecute() { return true; } const auto& indexSchema = ActualSchema->GetIndexInfo().ArrowSchema(); - OriginalBatch = NArrow::ExtractColumnsOptional(OriginalBatch, indexSchema->field_names()); - if (!OriginalBatch) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "unadaptable schemas")("index", indexSchema->ToString()); - ReplyError("cannot adapt schema"); + auto reorderConclusion = NArrow::TColumnOperator().Reorder(OriginalBatch, indexSchema->field_names()); + if (reorderConclusion.IsFail()) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "unadaptable schemas")("index", indexSchema->ToString())("problem", reorderConclusion.GetErrorMessage()); + ReplyError("cannot reorder schema: " + reorderConclusion.GetErrorMessage()); return true; + } else { + OriginalBatch = reorderConclusion.DetachResult(); } if (!OriginalBatch->schema()->Equals(indexSchema)) { AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "unequal schemas")("batch", OriginalBatch->schema()->ToString()) diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index e1fa8bb1eb03..700ed7157f15 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -603,7 +603,7 @@ void TestWriteReadLongTxDup() { UNIT_ASSERT(reader.IsCorrectlyFinished()); UNIT_ASSERT(rb); UNIT_ASSERT(rb->num_rows()); - NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema)); + Y_UNUSED(NArrow::TColumnOperator().VerifyIfAbsent().Extract(rb, TTestSchema::ExtractNames(ydbSchema))); UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); UNIT_ASSERT(CheckOrdered(rb)); UNIT_ASSERT(DataHas({rb}, portion, true)); @@ -712,7 +712,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema)); auto rb = reader.ReadAll(); UNIT_ASSERT(rb); - NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema)); + Y_UNUSED(NArrow::TColumnOperator().VerifyIfAbsent().Extract(rb, TTestSchema::ExtractNames(ydbSchema))); UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); UNIT_ASSERT(rb->num_rows()); UNIT_ASSERT(reader.IsCorrectlyFinished()); @@ -727,7 +727,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString reader.SetReplyColumnIds({1}); auto rb = reader.ReadAll(); UNIT_ASSERT(rb); - NArrow::ExtractColumnsValidate(rb, {"timestamp"}); + Y_UNUSED(NArrow::TColumnOperator().VerifyIfAbsent().Extract(rb, std::vector({ "timestamp" }))); UNIT_ASSERT((ui32)rb->num_columns() == 1); UNIT_ASSERT(rb->num_rows()); UNIT_ASSERT(reader.IsCorrectlyFinished()); @@ -742,7 +742,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString reader.SetReplyColumns({"timestamp", "message"}); auto rb = reader.ReadAll(); UNIT_ASSERT(rb); - NArrow::ExtractColumnsValidate(rb, {"timestamp", "message"}); + Y_UNUSED(NArrow::TColumnOperator().VerifyIfAbsent().Extract(rb, std::vector({ "timestamp", "message" }))); UNIT_ASSERT((ui32)rb->num_columns() == 2); UNIT_ASSERT(rb->num_rows()); UNIT_ASSERT(reader.IsCorrectlyFinished()); @@ -791,7 +791,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString auto rb = reader.ReadAll(); UNIT_ASSERT(rb); UNIT_ASSERT(reader.IsCorrectlyFinished()); - NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema)); + Y_UNUSED(NArrow::TColumnOperator().VerifyIfAbsent().Extract(rb, TTestSchema::ExtractNames(ydbSchema))); UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); UNIT_ASSERT(rb->num_rows()); UNIT_ASSERT(CheckOrdered(rb)); @@ -808,7 +808,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString auto rb = reader.ReadAll(); UNIT_ASSERT(rb); UNIT_ASSERT(reader.IsCorrectlyFinished()); - NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema)); + Y_UNUSED(NArrow::TColumnOperator().VerifyIfAbsent().Extract(rb, TTestSchema::ExtractNames(ydbSchema))); UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); UNIT_ASSERT(rb->num_rows()); UNIT_ASSERT(CheckOrdered(rb)); @@ -838,7 +838,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString auto rb = reader.ReadAll(); UNIT_ASSERT(rb); UNIT_ASSERT(reader.IsCorrectlyFinished()); - NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema)); + Y_UNUSED(NArrow::TColumnOperator().VerifyIfAbsent().Extract(rb, TTestSchema::ExtractNames(ydbSchema))); UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); UNIT_ASSERT(rb->num_rows()); UNIT_ASSERT(CheckOrdered(rb)); @@ -863,7 +863,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString auto rb = reader.ReadAll(); UNIT_ASSERT(rb); UNIT_ASSERT(reader.IsCorrectlyFinished()); - NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema)); + Y_UNUSED(NArrow::TColumnOperator().VerifyIfAbsent().Extract(rb, TTestSchema::ExtractNames(ydbSchema))); UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); UNIT_ASSERT(rb->num_rows()); UNIT_ASSERT(CheckOrdered(rb)); @@ -910,7 +910,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString auto rb = reader.ReadAll(); UNIT_ASSERT(rb); UNIT_ASSERT(reader.IsCorrectlyFinished()); - NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema)); + Y_UNUSED(NArrow::TColumnOperator().VerifyIfAbsent().Extract(rb, TTestSchema::ExtractNames(ydbSchema))); UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); UNIT_ASSERT(rb->num_rows()); UNIT_ASSERT(CheckOrdered(rb)); @@ -927,7 +927,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString auto rb = reader.ReadAll(); UNIT_ASSERT(rb); UNIT_ASSERT(reader.IsCorrectlyFinished()); - NArrow::ExtractColumnsValidate(rb, TTestSchema::ExtractNames(ydbSchema)); + Y_UNUSED(NArrow::TColumnOperator().VerifyIfAbsent().Extract(rb, TTestSchema::ExtractNames(ydbSchema))); UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); UNIT_ASSERT(rb->num_rows()); UNIT_ASSERT(CheckOrdered(rb)); @@ -1292,7 +1292,7 @@ void TestReadWithProgram(const TestTableDescription& table = {}) case 1: UNIT_ASSERT(rb); UNIT_ASSERT(rb->num_rows()); - NArrow::ExtractColumnsValidate(rb, {"level", "timestamp"}); + Y_UNUSED(NArrow::TColumnOperator().VerifyIfAbsent().Extract(rb, std::vector({ "level", "timestamp" }))); UNIT_ASSERT(rb->num_columns() == 2); UNIT_ASSERT(DataHas({rb}, {0, 100}, true)); break; diff --git a/ydb/core/tx/program/program.h b/ydb/core/tx/program/program.h index baf10a9cdd2b..76fbe8702488 100644 --- a/ydb/core/tx/program/program.h +++ b/ydb/core/tx/program/program.h @@ -95,7 +95,7 @@ class TProgramContainer { if (Program) { return Program->ApplyTo(batch, NArrow::GetCustomExecContext()); } else if (OverrideProcessingColumnsVector) { - batch = NArrow::ExtractColumnsValidate(batch, *OverrideProcessingColumnsVector); + batch = NArrow::TColumnOperator().VerifyIfAbsent().Extract(batch, *OverrideProcessingColumnsVector); } return arrow::Status::OK(); } diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index b0b8798c816f..599f9984b8ab 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -786,7 +786,7 @@ class TUploadRowsBase : public TActorBootstrappedschema()->GetFieldIndex(columnName) < 0) { diff --git a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp index 813608c3ad1e..2dcd3ecf7d35 100644 --- a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp +++ b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp @@ -181,7 +181,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { { // Less columns std::vector wrongColumns = columns; wrongColumns.resize(columns.size() - 1); - auto wrongBatch = NArrow::ExtractColumns(srcBatch, wrongColumns); + auto wrongBatch = NArrow::TColumnOperator().VerifyIfAbsent().Extract(srcBatch, wrongColumns); strBatch = NArrow::SerializeBatchNoCompression(wrongBatch); auto res = client.BulkUpsert(tablePath, @@ -194,7 +194,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { { // Reordered columns (it leads to wrong types) std::vector wrongColumns = columns; std::sort(wrongColumns.begin(), wrongColumns.end()); - auto wrongBatch = NArrow::ExtractColumns(srcBatch, wrongColumns); + auto wrongBatch = NArrow::TColumnOperator().VerifyIfAbsent().Extract(srcBatch, wrongColumns); strBatch = NArrow::SerializeBatchNoCompression(wrongBatch); auto res = client.BulkUpsert(tablePath, @@ -627,7 +627,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { { // Less columns std::vector wrongColumns = columns; wrongColumns.resize(columns.size() - 1); - csv = TTestOlap::ToCSV(NArrow::ExtractColumns(sampleBatch, wrongColumns)); + csv = TTestOlap::ToCSV(NArrow::TColumnOperator().VerifyIfAbsent().Extract(sampleBatch, wrongColumns)); auto res = client.BulkUpsert(tablePath, NYdb::NTable::EDataFormat::CSV, csv).GetValueSync(); @@ -639,7 +639,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { { // Reordered columns (it leads to wrong types) std::vector wrongColumns = columns; std::sort(wrongColumns.begin(), wrongColumns.end()); - csv = TTestOlap::ToCSV(NArrow::ExtractColumns(sampleBatch, wrongColumns)); + csv = TTestOlap::ToCSV(NArrow::TColumnOperator().VerifyIfAbsent().Extract(sampleBatch, wrongColumns)); auto res = client.BulkUpsert(tablePath, NYdb::NTable::EDataFormat::CSV, csv).GetValueSync(); @@ -651,7 +651,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { { // Reordered columns with header std::vector wrongColumns = columns; std::sort(wrongColumns.begin(), wrongColumns.end()); - csv = TTestOlap::ToCSV(NArrow::ExtractColumns(sampleBatch, wrongColumns), true); + csv = TTestOlap::ToCSV(NArrow::TColumnOperator().VerifyIfAbsent().Extract(sampleBatch, wrongColumns), true); NYdb::NTable::TBulkUpsertSettings upsertSettings; { @@ -790,7 +790,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { { // Less columns std::vector wrongColumns = columns; wrongColumns.resize(columns.size() - 1); - auto wrongBatch = NArrow::ExtractColumns(srcBatch, wrongColumns); + auto wrongBatch = NArrow::TColumnOperator().VerifyIfAbsent().Extract(srcBatch, wrongColumns); strBatch = NArrow::SerializeBatchNoCompression(wrongBatch); auto res = client.BulkUpsert(tablePath, @@ -803,7 +803,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { { // Reordered columns (it leads to wrong types) std::vector wrongColumns = columns; std::sort(wrongColumns.begin(), wrongColumns.end()); - auto wrongBatch = NArrow::ExtractColumns(srcBatch, wrongColumns); + auto wrongBatch = NArrow::TColumnOperator().VerifyIfAbsent().Extract(srcBatch, wrongColumns); strBatch = NArrow::SerializeBatchNoCompression(wrongBatch); auto res = client.BulkUpsert(tablePath, @@ -895,7 +895,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { { // Less columns std::vector wrongColumns = columns; wrongColumns.resize(columns.size() - 1); - csv = TTestOlap::ToCSV(NArrow::ExtractColumns(sampleBatch, wrongColumns)); + csv = TTestOlap::ToCSV(NArrow::TColumnOperator().VerifyIfAbsent().Extract(sampleBatch, wrongColumns)); auto res = client.BulkUpsert(tablePath, NYdb::NTable::EDataFormat::CSV, csv).GetValueSync(); @@ -907,7 +907,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { { // Reordered columns (it leads to wrong types) std::vector wrongColumns = columns; std::sort(wrongColumns.begin(), wrongColumns.end()); - csv = TTestOlap::ToCSV(NArrow::ExtractColumns(sampleBatch, wrongColumns)); + csv = TTestOlap::ToCSV(NArrow::TColumnOperator().VerifyIfAbsent().Extract(sampleBatch, wrongColumns)); auto res = client.BulkUpsert(tablePath, NYdb::NTable::EDataFormat::CSV, csv).GetValueSync(); @@ -919,7 +919,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { { // Reordered columns with header std::vector wrongColumns = columns; std::sort(wrongColumns.begin(), wrongColumns.end()); - csv = TTestOlap::ToCSV(NArrow::ExtractColumns(sampleBatch, wrongColumns), true); + csv = TTestOlap::ToCSV(NArrow::TColumnOperator().VerifyIfAbsent().Extract(sampleBatch, wrongColumns), true); NYdb::NTable::TBulkUpsertSettings upsertSettings; {