Skip to content

Commit

Permalink
shard
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 committed Sep 9, 2024
1 parent c2f0d40 commit 96f1725
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 13 deletions.
29 changes: 25 additions & 4 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const
if (pathId.OwnerId() != 0) {
auto table = txCtx.TableByIdMap.FindPtr(pathId);
if (!table) {
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
}
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << *table);
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << "`" << *table << "`");
} else {
// Olap tables don't return SchemeShard in locks, thus we use tableId here.
for (const auto& [pathId, table] : txCtx.TableByIdMap) {
if (pathId.TableId() == pathId.TableId()) {
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << table);
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << "`" << table << "`");
}
}
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
}
}

Expand All @@ -36,6 +36,27 @@ TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TKqpT
invalidatedLock.GetPathId()));
}

NYql::TIssue GetLocksInvalidatedIssue(const TShardIdToTableInfo& shardIdToTableInfo, const ui64& shardId) {
TStringBuilder message;
message << "Transaction locks invalidated.";

if (auto it = shardIdToTableInfo.find(shardId); it != std::end(shardIdToTableInfo)) {
message << " Tables: ";
bool first = true;
for (const auto& path : it->second.Pathes) {
if (!first) {
message << ", ";
first = false;
}
message << "`" << path << "`";
}
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message);
} else {
message << " Unknown table.";
}
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message);
}

std::pair<bool, std::vector<TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value,
TKqpTransactionContext& txCtx)
{
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ struct TTxId {

struct TTableInfo {
bool IsOlap = false;
TString Path;
THashSet<TString> Pathes;
};

using TShardIdToTableInfo = THashMap<ui64, TTableInfo>;
Expand Down Expand Up @@ -441,6 +441,7 @@ class TTransactionsCache {
};

NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const NYql::TKikimrPathId& pathId);
NYql::TIssue GetLocksInvalidatedIssue(const TShardIdToTableInfo& shardIdToTableInfo, const ui64& shardId);
std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type,
const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx);

Expand Down
16 changes: 9 additions & 7 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
void Finalize() {
YQL_ENSURE(!AlreadyReplied);
if (LocksBroken) {
return ReplyErrorAndDie(
Ydb::StatusIds::ABORTED,
YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks invalidated. Unknown table."));
YQL_ENSURE(ResponseEv->BrokenLockShardId);
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
}

ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS);
Expand All @@ -227,7 +226,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
const auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
auto& info = (*ShardIdToTableInfo)[lock.GetDataShard()];
info.IsOlap = (stageInfo.Meta.TableKind == ETableKind::Olap);
info.Path = stageInfo.Meta.TablePath;
info.Pathes.insert(stageInfo.Meta.TablePath);
}
} else if (data.GetData().template Is<NKikimrKqp::TEvKqpOutputActorResultInfo>()) {
NKikimrKqp::TEvKqpOutputActorResultInfo info;
Expand All @@ -239,7 +238,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
const auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
auto& info = (*ShardIdToTableInfo)[lock.GetDataShard()];
info.IsOlap = (stageInfo.Meta.TableKind == ETableKind::Olap);
info.Path = stageInfo.Meta.TablePath;
info.Pathes.insert(stageInfo.Meta.TablePath);
}
}
};
Expand Down Expand Up @@ -1206,6 +1205,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
shardState->State = TShardState::EState::Finished;
Counters->TxProxyMon->TxResultAborted->Inc();
LocksBroken = true;
ResponseEv->BrokenLockShardId = shardId;

if (!res->Record.GetTxLocks().empty()) {
ResponseEv->BrokenLockPathId = NYql::TKikimrPathId(
res->Record.GetTxLocks(0).GetSchemeShard(),
Expand Down Expand Up @@ -1268,9 +1269,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

Counters->TxProxyMon->TxResultAborted->Inc(); // TODO: dedicated counter?
LocksBroken = true;
ResponseEv->BrokenLockShardId = shardId; // todo: without responseEv

if (!res->Record.GetTxLocks().empty()) {
ResponseEv->BrokenLockPathId = TKikimrPathId(
ResponseEv->BrokenLockPathId = NYql::TKikimrPathId(
res->Record.GetTxLocks(0).GetSchemeShard(),
res->Record.GetTxLocks(0).GetPathId());
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
Expand Down Expand Up @@ -1979,7 +1981,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

auto& info = (*ShardIdToTableInfo)[task.Meta.ShardId];
info.IsOlap = (stageInfo.Meta.TableKind == ETableKind::Olap);
info.Path = stageInfo.Meta.TablePath;
info.Pathes.insert(stageInfo.Meta.TablePath);
} else if (stageInfo.Meta.IsSysView()) {
computeTasks.emplace_back(task.Id);
} else {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ struct TEvKqpExecuter {
NLWTrace::TOrbit Orbit;
IKqpGateway::TKqpSnapshot Snapshot;
std::optional<NYql::TKikimrPathId> BrokenLockPathId;
//TShardIdToTableInfoPtr ShardIdToTableInfo;
std::optional<ui64> BrokenLockShardId;

ui64 ResultRowsCount = 0;
ui64 ResultRowsBytes = 0;

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1472,6 +1472,8 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
case Ydb::StatusIds::ABORTED: {
if (ev->BrokenLockPathId) {
issues.AddIssue(GetLocksInvalidatedIssue(*QueryState->TxCtx, *ev->BrokenLockPathId));
} else if (ev->BrokenLockShardId) {
issues.AddIssue(GetLocksInvalidatedIssue(*QueryState->ShardIdToTableInfo, *ev->BrokenLockShardId));
}
break;
}
Expand Down

0 comments on commit 96f1725

Please sign in to comment.