Skip to content

Commit

Permalink
Merge 657ef37 into f11c0b2
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Sep 10, 2024
2 parents f11c0b2 + 657ef37 commit 2cae56a
Show file tree
Hide file tree
Showing 17 changed files with 262 additions and 108 deletions.
1 change: 1 addition & 0 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication
ydb/core/kqp/ut/scheme KqpScheme.QueryWithAlter
ydb/core/kqp/ut/service [*/*]*
ydb/core/kqp/ut/service KqpQueryService.TableSink_Htap
ydb/core/kqp/ut/service KqpQueryService.ExecuteQueryPgTableSelect
ydb/core/kqp/ut/service KqpQueryService.QueryOnClosedSession
ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad
Expand Down
29 changes: 25 additions & 4 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const
if (pathId.OwnerId() != 0) {
auto table = txCtx.TableByIdMap.FindPtr(pathId);
if (!table) {
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
}
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << *table);
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << "`" << *table << "`");
} else {
// Olap tables don't return SchemeShard in locks, thus we use tableId here.
for (const auto& [pathId, table] : txCtx.TableByIdMap) {
if (pathId.TableId() == pathId.TableId()) {
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << table);
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << "`" << table << "`");
}
}
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
}
}

Expand All @@ -36,6 +36,27 @@ TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TKqpT
invalidatedLock.GetPathId()));
}

NYql::TIssue GetLocksInvalidatedIssue(const TShardIdToTableInfo& shardIdToTableInfo, const ui64& shardId) {
TStringBuilder message;
message << "Transaction locks invalidated.";

if (auto it = shardIdToTableInfo.find(shardId); it != std::end(shardIdToTableInfo)) {
message << " Tables: ";
bool first = true;
for (const auto& path : it->second.Pathes) {
if (!first) {
message << ", ";
first = false;
}
message << "`" << path << "`";
}
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message);
} else {
message << " Unknown table.";
}
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message);
}

std::pair<bool, std::vector<TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value,
TKqpTransactionContext& txCtx)
{
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ struct TDeferredEffects {
friend class TKqpTransactionContext;
};

struct TTableInfo {
bool IsOlap = false;
THashSet<TString> Pathes;
};

using TShardIdToTableInfo = THashMap<ui64, TTableInfo>;
using TShardIdToTableInfoPtr = std::shared_ptr<TShardIdToTableInfo>;

class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
public:
explicit TKqpTransactionContext(bool implicit, const NMiniKQL::IFunctionRegistry* funcRegistry,
Expand Down Expand Up @@ -285,6 +293,12 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
TTxAllocatorState::TPtr TxAlloc;

IKqpGateway::TKqpSnapshotHandle SnapshotHandle;

bool HasOlapTable = false;
bool HasOltpTable = false;
bool HasTableWrite = false;

TShardIdToTableInfoPtr ShardIdToTableInfo = std::make_shared<TShardIdToTableInfo>();
};

struct TTxId {
Expand Down Expand Up @@ -433,6 +447,7 @@ class TTransactionsCache {
};

NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const NYql::TKikimrPathId& pathId);
NYql::TIssue GetLocksInvalidatedIssue(const TShardIdToTableInfo& shardIdToTableInfo, const ui64& shardId);
std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type,
const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx);

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.EnableCreateTableAs = serviceConfig.GetEnableCreateTableAs();
kqpConfig.EnableOlapSink = serviceConfig.GetEnableOlapSink();
kqpConfig.EnableOltpSink = serviceConfig.GetEnableOltpSink();
kqpConfig.EnableHtapTx = serviceConfig.GetEnableHtapTx();
kqpConfig.BlockChannelsMode = serviceConfig.GetBlockChannelsMode();
kqpConfig.IdxLookupJoinsPrefixPointLimit = serviceConfig.GetIdxLookupJoinPointsLimit();
kqpConfig.EnableSpillingGenericQuery = serviceConfig.GetEnableQueryServiceSpilling();
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
bool enableColumnsWithDefault = TableServiceConfig.GetEnableColumnsWithDefault();
bool enableOlapSink = TableServiceConfig.GetEnableOlapSink();
bool enableOltpSink = TableServiceConfig.GetEnableOltpSink();
bool enableHtapTx = TableServiceConfig.GetEnableHtapTx();
bool enableCreateTableAs = TableServiceConfig.GetEnableCreateTableAs();
auto blockChannelsMode = TableServiceConfig.GetBlockChannelsMode();

Expand Down Expand Up @@ -556,6 +557,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
TableServiceConfig.GetEnableColumnsWithDefault() != enableColumnsWithDefault ||
TableServiceConfig.GetEnableOlapSink() != enableOlapSink ||
TableServiceConfig.GetEnableOltpSink() != enableOltpSink ||
TableServiceConfig.GetEnableHtapTx() != enableHtapTx ||
TableServiceConfig.GetEnableCreateTableAs() != enableCreateTableAs ||
TableServiceConfig.GetBlockChannelsMode() != blockChannelsMode ||
TableServiceConfig.GetExtractPredicateRangesLimit() != rangesLimit ||
Expand Down
70 changes: 43 additions & 27 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
const TGUCSettings::TPtr& GUCSettings)
const bool useEvWrite, ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo)
: TBase(std::move(request), database, userToken, counters, tableServiceConfig,
userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult)
, AsyncIoFactory(std::move(asyncIoFactory))
, EnableOlapSink(enableOlapSink)
, UseEvWrite(useEvWrite)
, FederatedQuerySetup(federatedQuerySetup)
, GUCSettings(GUCSettings)
, ShardIdToTableInfo(shardIdToTableInfo)
{
Target = creator;

Expand Down Expand Up @@ -208,42 +208,53 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
void Finalize() {
YQL_ENSURE(!AlreadyReplied);
if (LocksBroken) {
return ReplyErrorAndDie(
Ydb::StatusIds::ABORTED,
YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks invalidated. Unknown table."));
YQL_ENSURE(ResponseEv->BrokenLockShardId);
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
}

ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS);
Counters->TxProxyMon->ReportStatusOK->Inc();

auto addLocks = [this](const auto& data) {
auto addLocks = [this](const ui64 taskId, const auto& data) {
if (data.GetData().template Is<NKikimrTxDataShard::TEvKqpInputActorResultInfo>()) {
NKikimrTxDataShard::TEvKqpInputActorResultInfo info;
YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
for (auto& lock : info.GetLocks()) {
Locks.push_back(lock);

const auto& task = TasksGraph.GetTask(taskId);
const auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
auto& info = (*ShardIdToTableInfo)[lock.GetDataShard()];
info.IsOlap = (stageInfo.Meta.TableKind == ETableKind::Olap);
info.Pathes.insert(stageInfo.Meta.TablePath);
}
} else if (data.GetData().template Is<NKikimrKqp::TEvKqpOutputActorResultInfo>()) {
NKikimrKqp::TEvKqpOutputActorResultInfo info;
YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
for (auto& lock : info.GetLocks()) {
Locks.push_back(lock);

const auto& task = TasksGraph.GetTask(taskId);
const auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
auto& info = (*ShardIdToTableInfo)[lock.GetDataShard()];
info.IsOlap = (stageInfo.Meta.TableKind == ETableKind::Olap);
info.Pathes.insert(stageInfo.Meta.TablePath);
}
}
};

for (auto& [_, data] : ExtraData) {
for (const auto& source : data.GetSourcesExtraData()) {
addLocks(source);
for (auto& [_, extraData] : ExtraData) {
for (const auto& source : extraData.Data.GetSourcesExtraData()) {
addLocks(extraData.TaskId, source);
}
for (const auto& transform : data.GetInputTransformsData()) {
addLocks(transform);
for (const auto& transform : extraData.Data.GetInputTransformsData()) {
addLocks(extraData.TaskId, transform);
}
for (const auto& sink : data.GetSinksExtraData()) {
addLocks(sink);
for (const auto& sink : extraData.Data.GetSinksExtraData()) {
addLocks(extraData.TaskId, sink);
}
if (data.HasComputeExtraData()) {
addLocks(data.GetComputeExtraData());
if (extraData.Data.HasComputeExtraData()) {
addLocks(extraData.TaskId, extraData.Data.GetComputeExtraData());
}
}

Expand Down Expand Up @@ -497,6 +508,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
YQL_ENSURE(shardState->State == TShardState::EState::Preparing);
Counters->TxProxyMon->TxResultAborted->Inc();
LocksBroken = true;
ResponseEv->BrokenLockShardId = shardId;

if (!res->Record.GetTxLocks().empty()) {
ResponseEv->BrokenLockPathId = NYql::TKikimrPathId(
Expand Down Expand Up @@ -1194,6 +1206,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
shardState->State = TShardState::EState::Finished;
Counters->TxProxyMon->TxResultAborted->Inc();
LocksBroken = true;
ResponseEv->BrokenLockShardId = shardId;

if (!res->Record.GetTxLocks().empty()) {
ResponseEv->BrokenLockPathId = NYql::TKikimrPathId(
res->Record.GetTxLocks(0).GetSchemeShard(),
Expand Down Expand Up @@ -1256,9 +1270,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

Counters->TxProxyMon->TxResultAborted->Inc(); // TODO: dedicated counter?
LocksBroken = true;
ResponseEv->BrokenLockShardId = shardId; // todo: without responseEv

if (!res->Record.GetTxLocks().empty()) {
ResponseEv->BrokenLockPathId = TKikimrPathId(
ResponseEv->BrokenLockPathId = NYql::TKikimrPathId(
res->Record.GetTxLocks(0).GetSchemeShard(),
res->Record.GetTxLocks(0).GetPathId());
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
Expand Down Expand Up @@ -1895,9 +1910,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

const bool hasOlapSink = HasOlapSink(stage, tx.Body->GetTables());
if ((stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage))
|| (!EnableOlapSink && hasOlapSink)) {
if ((stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage))) {
auto error = TStringBuilder() << "Data manipulation queries do not support column shard tables.";
LOG_E(error);
ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED,
Expand Down Expand Up @@ -1966,6 +1979,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
if (task.Meta.ShardId && (task.Meta.Reads || task.Meta.Writes)) {
NYql::NDqProto::TDqTask* protoTask = ArenaSerializeTaskToProto(TasksGraph, task, true);
datashardTasks[task.Meta.ShardId].emplace_back(protoTask);

auto& info = (*ShardIdToTableInfo)[task.Meta.ShardId];
info.IsOlap = (stageInfo.Meta.TableKind == ETableKind::Olap);
info.Pathes.insert(stageInfo.Meta.TablePath);
} else if (stageInfo.Meta.IsSysView()) {
computeTasks.emplace_back(task.Id);
} else {
Expand Down Expand Up @@ -2408,9 +2425,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
// cause interconnect overload and reduce throughput however,
// so we don't want to use a crossover value that is too high.
const size_t minArbiterMeshSize = 5; // TODO: make configurable?
if (VolatileTx &&
if ((VolatileTx &&
receivingShardsSet.size() >= minArbiterMeshSize &&
AppData()->FeatureFlags.GetEnableVolatileTransactionArbiters())
AppData()->FeatureFlags.GetEnableVolatileTransactionArbiters()))
{
std::vector<ui64> candidates;
candidates.reserve(receivingShardsSet.size());
Expand Down Expand Up @@ -2791,10 +2808,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

private:
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
bool EnableOlapSink = false;
bool UseEvWrite = false;
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
const TGUCSettings::TPtr GUCSettings;
TShardIdToTableInfoPtr ShardIdToTableInfo;

bool HasExternalSources = false;
bool SecretSnapshotRequired = false;
Expand Down Expand Up @@ -2836,13 +2853,12 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings)
const TIntrusivePtr<TUserRequestContext>& userRequestContext, const bool useEvWrite, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo)
{
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, tableServiceConfig,
std::move(asyncIoFactory), creator, userRequestContext,
enableOlapSink, useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings);
useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo);
}

} // namespace NKqp
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_executer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <library/cpp/lwtrace/shuttle.h>
#include <ydb/core/kqp/common/kqp_tx.h>
#include <ydb/core/kqp/common/kqp_event_ids.h>
#include <ydb/core/kqp/common/kqp_user_request_context.h>
#include <ydb/core/kqp/query_data/kqp_query_data.h>
Expand All @@ -27,6 +28,8 @@ struct TEvKqpExecuter {
NLWTrace::TOrbit Orbit;
IKqpGateway::TKqpSnapshot Snapshot;
std::optional<NYql::TKikimrPathId> BrokenLockPathId;
std::optional<ui64> BrokenLockShardId;

ui64 ResultRowsCount = 0;
ui64 ResultRowsBytes = 0;

Expand Down Expand Up @@ -102,8 +105,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const NKikimrConfig::TTableServiceConfig tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
const bool useEvWrite, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo);

IActor* CreateKqpSchemeExecuter(
TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,
Expand Down
Loading

0 comments on commit 2cae56a

Please sign in to comment.