Skip to content

Commit

Permalink
Merge 6edcef0 into ec1d588
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Sep 9, 2024
2 parents ec1d588 + 6edcef0 commit 821bbcf
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 41 deletions.
79 changes: 74 additions & 5 deletions ydb/core/formats/arrow/process_columns.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,34 @@
namespace NKikimr::NArrow {

namespace {
template <class TDataContainer, class TStringImpl>

template <class TValue>
class TColumnNameAccessor {
public:
    // Generic case: the container element itself already is the column name.
    // Intended for std::string-like values; any other type must bind to
    // const std::string& without producing a temporary (TODO confirm for TString).
    static const std::string& GetFieldName(const TValue& val) {
        return val;
    }
};

template <>
class TColumnNameAccessor<std::shared_ptr<arrow::Field>> {
public:
static const std::string& GetFieldName(const std::shared_ptr<arrow::Field>& val) {
return val->name();
}
};

template <class TDataContainer, class TStringContainer>
std::shared_ptr<TDataContainer> ExtractColumnsValidateImpl(
const std::shared_ptr<TDataContainer>& srcBatch, const std::vector<TStringImpl>& columnNames) {
const std::shared_ptr<TDataContainer>& srcBatch, const std::vector<TStringContainer>& columnNames) {
std::vector<std::shared_ptr<arrow::Field>> fields;
fields.reserve(columnNames.size());
std::vector<std::shared_ptr<typename NAdapter::TDataBuilderPolicy<TDataContainer>::TColumn>> columns;
columns.reserve(columnNames.size());

auto srcSchema = srcBatch->schema();
for (auto& name : columnNames) {
const int pos = srcSchema->GetFieldIndex(name);
const int pos = srcSchema->GetFieldIndex(TColumnNameAccessor<TStringContainer>::GetFieldName(name));
if (Y_LIKELY(pos > -1)) {
fields.push_back(srcSchema->field(pos));
columns.push_back(srcBatch->column(pos));
Expand Down Expand Up @@ -70,9 +87,9 @@ TConclusion<std::shared_ptr<TDataContainer>> AdaptColumnsImpl(
return NAdapter::TDataBuilderPolicy<TDataContainer>::Build(std::make_shared<arrow::Schema>(fields), std::move(columns), srcBatch->num_rows());
}

template <class TDataContainer, class TStringType>
template <class TDataContainer, class TStringContainer>
std::shared_ptr<TDataContainer> ExtractImpl(const TColumnOperator::EExtractProblemsPolicy& policy,
const std::shared_ptr<TDataContainer>& incoming, const std::vector<TStringType>& columnNames) {
const std::shared_ptr<TDataContainer>& incoming, const std::vector<TStringContainer>& columnNames) {
AFL_VERIFY(incoming);
AFL_VERIFY(columnNames.size());
auto result = ExtractColumnsValidateImpl(incoming, columnNames);
Expand Down Expand Up @@ -123,6 +140,16 @@ std::shared_ptr<arrow::Table> TColumnOperator::Extract(
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
}

std::shared_ptr<arrow::Table> TColumnOperator::Extract(
    const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::shared_ptr<arrow::Field>>& columns) {
    // Field-based overload: each column is looked up by the field's name in the
    // shared template implementation, under the configured absent-column policy.
    auto extracted = ExtractImpl(AbsentColumnPolicy, incoming, columns);
    return extracted;
}

std::shared_ptr<arrow::RecordBatch> TColumnOperator::Extract(
    const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::shared_ptr<arrow::Field>>& columns) {
    // RecordBatch counterpart of the field-based Extract overload; delegates to
    // the shared template implementation with the configured absent-column policy.
    auto extracted = ExtractImpl(AbsentColumnPolicy, incoming, columns);
    return extracted;
}

std::shared_ptr<arrow::RecordBatch> TColumnOperator::Extract(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames) {
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
Expand Down Expand Up @@ -171,5 +198,47 @@ NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(
const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames) {
return ReorderImpl(incoming, columnNames);
}
namespace {
template <class TDataContainer, class TSchemaImpl>
TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(
    const std::shared_ptr<TDataContainer>& srcBatch, const std::shared_ptr<TSchemaImpl>& dstSchema) {
    // Builds the set of dstSchema field indexes that the source batch covers.
    // Source columns must appear in the same relative order as in the destination
    // schema; the destination may contain additional columns between them.
    AFL_VERIFY(srcBatch);
    AFL_VERIFY(dstSchema);
    if (dstSchema->num_fields() < srcBatch->schema()->num_fields()) {
        AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "incorrect columns set: destination must been wider than source")(
            "source", srcBatch->schema()->ToString())("destination", dstSchema->ToString());
        return TConclusionStatus::Fail("incorrect columns set: destination must been wider than source");
    }
    const auto& srcFields = srcBatch->schema()->fields();
    const auto& dstFields = dstSchema->fields();
    std::set<ui32> fieldIdx;
    size_t srcPos = 0;
    size_t dstPos = 0;
    // Two-cursor sequential match: advance the destination cursor until it lines
    // up with the current source field, then require the field types to be equal.
    while (srcPos < srcFields.size() && dstPos < dstFields.size()) {
        if (srcFields[srcPos]->name() != dstFields[dstPos]->name()) {
            ++dstPos;
            continue;
        }
        fieldIdx.emplace(dstPos);
        if (!dstFields[dstPos]->Equals(srcFields[srcPos])) {
            AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
                "column_type", dstFields[dstPos]->ToString(true))("incoming_type", srcFields[srcPos]->ToString(true));
            return TConclusionStatus::Fail("incompatible column types");
        }
        ++dstPos;
        ++srcPos;
    }
    // Destination exhausted while source fields remain: the source order cannot
    // be matched sequentially against the destination schema.
    if (dstPos == dstFields.size() && srcPos < srcFields.size()) {
        AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "incorrect columns order in source set")("source", srcBatch->schema()->ToString())(
            "destination", dstSchema->ToString());
        return TConclusionStatus::Fail("incorrect columns order in source set");
    }
    return TSchemaSubset(fieldIdx, dstSchema->num_fields());
}
} // namespace

TConclusion<TSchemaSubset> TColumnOperator::BuildSequentialSubset(
    const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema) {
    // Public entry point: forwards to the file-local template implementation.
    auto subset = BuildSequentialSubsetImpl(incoming, dstSchema);
    return subset;
}

} // namespace NKikimr::NArrow
7 changes: 7 additions & 0 deletions ydb/core/formats/arrow/process_columns.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,16 @@ class TColumnOperator {
std::shared_ptr<arrow::RecordBatch> Extract(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames);
std::shared_ptr<arrow::Table> Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames);
std::shared_ptr<arrow::Table> Extract(
const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::shared_ptr<arrow::Field>>& columns);
std::shared_ptr<arrow::RecordBatch> Extract(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::shared_ptr<arrow::Field>>& columns);
std::shared_ptr<arrow::RecordBatch> Extract(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames);
std::shared_ptr<arrow::Table> Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames);

TConclusion<TSchemaSubset> BuildSequentialSubset(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema);

TConclusion<std::shared_ptr<arrow::RecordBatch>> Adapt(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);
TConclusion<std::shared_ptr<arrow::Table>> Adapt(
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/size_calcer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ NKikimr::NArrow::TSerializedBatch TSerializedBatch::Build(std::shared_ptr<arrow:
if (context.GetFieldsForSpecialKeys().size()) {
specialKeys = TFirstLastSpecialKeys(batch, context.GetFieldsForSpecialKeys()).SerializeToString();
}
return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(),
return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(),
NArrow::GetBatchDataSize(batch), specialKeys);
}

Expand Down
31 changes: 15 additions & 16 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,36 @@ namespace NKikimr::NColumnShard {

using namespace NTabletFlatExecutor;

void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, const ui64 cookie,
void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteMeta& writeMeta, const ui64 writeSize, const ui64 cookie,
std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
switch (overloadReason) {
case EOverloadStatus::Disk:
Counters.OnWriteOverloadDisk();
break;
case EOverloadStatus::InsertTable:
Counters.OnWriteOverloadInsertTable(writeData.GetSize());
Counters.OnWriteOverloadInsertTable(writeSize);
break;
case EOverloadStatus::OverloadMetadata:
Counters.OnWriteOverloadMetadata(writeData.GetSize());
Counters.OnWriteOverloadMetadata(writeSize);
break;
case EOverloadStatus::ShardTxInFly:
Counters.OnWriteOverloadShardTx(writeData.GetSize());
Counters.OnWriteOverloadShardTx(writeSize);
break;
case EOverloadStatus::ShardWritesInFly:
Counters.OnWriteOverloadShardWrites(writeData.GetSize());
Counters.OnWriteOverloadShardWrites(writeSize);
break;
case EOverloadStatus::ShardWritesSizeInFly:
Counters.OnWriteOverloadShardWritesSize(writeData.GetSize());
Counters.OnWriteOverloadShardWritesSize(writeSize);
break;
case EOverloadStatus::None:
Y_ABORT("invalid function usage");
}

AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "write_overload")("size", writeData.GetSize())(
"path_id", writeData.GetWriteMeta().GetTableId())("reason", overloadReason);
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "write_overload")("size", writeSize)("path_id", writeMeta.GetTableId())(
"reason", overloadReason);

ctx.Send(writeData.GetWriteMeta().GetSource(), event.release(), 0, cookie);
ctx.Send(writeMeta.GetSource(), event.release(), 0, cookie);
}

TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded(const ui64 tableId) const {
Expand Down Expand Up @@ -240,7 +240,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
if (overloadStatus != EOverloadStatus::None) {
std::unique_ptr<NActors::IEventBase> result = std::make_unique<TEvColumnShard::TEvWriteResult>(
TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED);
OverloadWriteFail(overloadStatus, writeData, cookie, std::move(result), ctx);
OverloadWriteFail(overloadStatus, writeData.GetWriteMeta(), writeData.GetSize(), cookie, std::move(result), ctx);
Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::Overload);
} else {
if (ui64 writeId = (ui64)HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) {
Expand Down Expand Up @@ -538,10 +538,9 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor

auto overloadStatus = CheckOverloaded(tableId);
if (overloadStatus != EOverloadStatus::None) {
NEvWrite::TWriteData writeData(NEvWrite::TWriteMeta(0, tableId, source, {}), arrowData, nullptr, nullptr);
std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error");
OverloadWriteFail(overloadStatus, writeData, cookie, std::move(result), ctx);
OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, tableId, source, {}), arrowData->GetSize(), cookie, std::move(result), ctx);
return;
}

Expand All @@ -554,11 +553,11 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor

ui64 lockId = 0;
if (behaviour == EOperationBehaviour::NoTxWrite) {
static TAtomicCounter Counter = 0;
const ui64 shift = (ui64)1 << 47;
lockId = shift + Counter.Inc();
lockId = BuildEphemeralTxId();
} else if (behaviour == EOperationBehaviour::InTxWrite) {
lockId = record.GetTxId();
} else {
lockId = (behaviour == EOperationBehaviour::InTxWrite) ? record.GetTxId() : record.GetLockTxId();
lockId = record.GetLockTxId();
}

OperationsManager->RegisterLock(lockId, Generation());
Expand Down
11 changes: 10 additions & 1 deletion ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ class TColumnShard
void OnTieringModified(const std::optional<ui64> pathId = {});

public:
ui64 BuildEphemeralTxId() {
    // Ephemeral tx ids live in a dedicated range: bit 47 is always set, and the
    // process-wide atomic counter makes each returned id unique within this
    // process. NOTE(review): assumes ordinary tx/lock ids stay below 2^47 so the
    // ranges cannot collide — confirm against id allocation elsewhere.
    static TAtomicCounter EphemeralTxCounter = 0;
    static constexpr ui64 EphemeralRangeBit = ui64(1) << 47;
    return EphemeralRangeBit | EphemeralTxCounter.Inc();
}

enum class EOverloadStatus {
ShardTxInFly /* "shard_tx" */,
ShardWritesInFly /* "shard_writes" */,
Expand Down Expand Up @@ -320,7 +326,7 @@ class TColumnShard
}

private:
void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, const ui64 cookie, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx);
void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteMeta& writeMeta, const ui64 writeSize, const ui64 cookie, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx);
EOverloadStatus CheckOverloaded(const ui64 tableId) const;

protected:
Expand Down Expand Up @@ -534,6 +540,9 @@ class TColumnShard
public:
ui64 TabletTxCounter = 0;

bool HasLongTxWrites(const TInsertWriteId insertWriteId) const {
    // True when this shard still tracks a long-tx write for the given insert write id.
    return LongTxWrites.find(insertWriteId) != LongTxWrites.end();
}
void EnqueueProgressTx(const TActorContext& ctx, const std::optional<ui64> continueTxId);
NOlap::TSnapshot GetLastTxSnapshot() const {
return NOlap::TSnapshot(LastPlannedStep, LastPlannedTxId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ TConclusionStatus TReadMetadata::Init(
if (LockId) {
for (auto&& i : CommittedBlobs) {
if (auto writeId = i.GetWriteIdOptional()) {
auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(*writeId);
AddWriteIdToCheck(*writeId, op->GetLockId());
if (owner->HasLongTxWrites(*writeId)) {
} else {
auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(*writeId);
AddWriteIdToCheck(*writeId, op->GetLockId());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ TConclusion<std::shared_ptr<arrow::RecordBatch>> ISnapshotSchema::PrepareForModi

const std::shared_ptr<NArrow::TSchemaLite> dstSchema = GetIndexInfo().ArrowSchema();

auto batch = NArrow::TColumnOperator().SkipIfAbsent().Extract(incomingBatch, dstSchema->field_names());
auto batch = NArrow::TColumnOperator().SkipIfAbsent().Extract(incomingBatch, dstSchema->fields());

for (auto&& i : batch->schema()->fields()) {
const ui32 columnId = GetIndexInfo().GetColumnIdVerified(i->name());
Expand Down Expand Up @@ -218,8 +218,9 @@ TConclusion<std::shared_ptr<arrow::RecordBatch>> ISnapshotSchema::BuildDefaultBa
const std::vector<std::shared_ptr<arrow::Field>>& fields, const ui32 rowsCount, const bool force) const {
std::vector<std::shared_ptr<arrow::Array>> columns;
for (auto&& i : fields) {
auto defaultValue = GetExternalDefaultValueVerified(i->name());
if (!defaultValue && !GetIndexInfo().IsNullableVerified(i->name())) {
const ui32 columnId = GetColumnIdVerified(i->name());
auto defaultValue = GetExternalDefaultValueVerified(columnId);
if (!defaultValue && !GetIndexInfo().IsNullableVerified(columnId)) {
if (force) {
defaultValue = NArrow::DefaultScalar(i->type());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ TConclusionStatus TBuildBatchesTask::DoExecute(const std::shared_ptr<ITask>& /*t
return TConclusionStatus::Fail("cannot prepare incoming batch: " + preparedConclusion.GetErrorMessage());
}
auto batch = preparedConclusion.DetachResult();
const std::vector<std::shared_ptr<arrow::Field>> defaultFields = ActualSchema->GetAbsentFields(batch->schema());
std::shared_ptr<IMerger> merger;
switch (WriteData.GetWriteMeta().GetModificationType()) {
case NEvWrite::EModificationType::Upsert: {
const std::vector<std::shared_ptr<arrow::Field>> defaultFields = ActualSchema->GetAbsentFields(batch->schema());
if (defaultFields.empty()) {
std::shared_ptr<NConveyor::ITask> task =
std::make_shared<NOlap::TBuildSlicesTask>(TabletId, ParentActorId, BufferActorId, std::move(WriteData), batch, ActualSchema);
Expand Down
17 changes: 9 additions & 8 deletions ydb/core/tx/columnshard/operations/slice_builder/builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,17 @@ TConclusionStatus TBuildSlicesTask::DoExecute(const std::shared_ptr<ITask>& /*ta
return TConclusionStatus::Fail("no data in batch");
}
const auto& indexSchema = ActualSchema->GetIndexInfo().ArrowSchema();
NArrow::TSchemaSubset subset;
auto reorderConclusion = NArrow::TColumnOperator().Adapt(OriginalBatch, indexSchema, &subset);
if (reorderConclusion.IsFail()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "unadaptable schemas")("index", indexSchema->ToString())("problem", reorderConclusion.GetErrorMessage());
ReplyError("cannot reorder schema: " + reorderConclusion.GetErrorMessage(),
auto subsetConclusion = NArrow::TColumnOperator().BuildSequentialSubset(OriginalBatch, indexSchema);
if (subsetConclusion.IsFail()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "unadaptable schemas")("index", indexSchema->ToString())(
"problem", subsetConclusion.GetErrorMessage());
ReplyError(
"unadaptable schema: " + subsetConclusion.GetErrorMessage(),
NColumnShard::TEvPrivate::TEvWriteBlobsResult::EErrorClass::Internal);
return TConclusionStatus::Fail("cannot reorder schema: " + reorderConclusion.GetErrorMessage());
} else {
OriginalBatch = reorderConclusion.DetachResult();
return TConclusionStatus::Fail("cannot reorder schema: " + subsetConclusion.GetErrorMessage());
}
NArrow::TSchemaSubset subset = subsetConclusion.DetachResult();

if (OriginalBatch->num_columns() != indexSchema->num_fields()) {
AFL_VERIFY(OriginalBatch->num_columns() < indexSchema->num_fields())("original", OriginalBatch->num_columns())(
"index", indexSchema->num_fields());
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tx/tx_proxy/rpc_long_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl> {

public:
TLongTxWriteBase(const TString& databaseName, const TString& path, const TString& token, const TLongTxId& longTxId, const TString& dedupId)
: TBase()
, DatabaseName(databaseName)
: DatabaseName(databaseName)
, Path(path)
, DedupId(dedupId)
, LongTxId(longTxId)
Expand All @@ -41,8 +40,8 @@ class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl> {
}
}

~TLongTxWriteBase() {
MemoryInFlight.Sub(InFlightSize);
virtual ~TLongTxWriteBase() {
AFL_VERIFY(MemoryInFlight.Sub(InFlightSize) >= 0);
}

protected:
Expand All @@ -66,6 +65,7 @@ class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl> {
}

auto accessor = ExtractDataAccessor();
AFL_VERIFY(!InFlightSize);
InFlightSize = accessor->GetSize();
const i64 sizeInFlight = MemoryInFlight.Add(InFlightSize);
if (TLimits::MemoryInFlightWriting < (ui64)sizeInFlight && sizeInFlight != InFlightSize) {
Expand Down

0 comments on commit 821bbcf

Please sign in to comment.