Skip to content

Commit

Permalink
Merge 0774b03 into b82247b
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Sep 12, 2024
2 parents b82247b + 0774b03 commit 1be410d
Show file tree
Hide file tree
Showing 19 changed files with 331 additions and 131 deletions.
4 changes: 0 additions & 4 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ ydb/core/kqp/ut/query KqpLimits.QueryReplySize
ydb/core/kqp/ut/query KqpQuery.QueryTimeout
ydb/core/kqp/ut/service KqpQueryService.TableSink_OlapRWQueries
ydb/core/kqp/ut/service KqpQueryService.TableSink_OltpReplace+HasSecondaryIndex
ydb/core/kqp/ut/query KqpQuery.OlapCreateAsSelect_Complex
ydb/core/kqp/ut/query KqpQuery.OlapCreateAsSelect_Simple
ydb/core/kqp/ut/federated_query/s3 KqpFederatedQuery.CreateTableAsSelectFromExternalDataSource
ydb/core/kqp/ut/federated_query/s3 KqpFederatedQuery.CreateTableAsSelectFromExternalTable
ydb/core/kqp/ut/scan KqpRequestContext.TraceIdInErrorMessage
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
ydb/core/kqp/ut/scheme KqpOlap.OlapRead_GenericQuerys
Expand Down
29 changes: 25 additions & 4 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const
if (pathId.OwnerId() != 0) {
auto table = txCtx.TableByIdMap.FindPtr(pathId);
if (!table) {
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
}
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << *table);
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << "`" << *table << "`");
} else {
// Olap tables don't return SchemeShard in locks, thus we use tableId here.
for (const auto& [pathId, table] : txCtx.TableByIdMap) {
if (pathId.TableId() == pathId.TableId()) {
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << table);
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << "`" << table << "`");
}
}
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
}
}

Expand All @@ -36,6 +36,27 @@ TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TKqpT
invalidatedLock.GetPathId()));
}

NYql::TIssue GetLocksInvalidatedIssue(const TShardIdToTableInfo& shardIdToTableInfo, const ui64& shardId) {
TStringBuilder message;
message << "Transaction locks invalidated.";

if (auto it = shardIdToTableInfo.find(shardId); it != std::end(shardIdToTableInfo)) {
message << " Tables: ";
bool first = true;
for (const auto& path : it->second.Pathes) {
if (!first) {
message << ", ";
first = false;
}
message << "`" << path << "`";
}
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message);
} else {
message << " Unknown table.";
}
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message);
}

std::pair<bool, std::vector<TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value,
TKqpTransactionContext& txCtx)
{
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ struct TDeferredEffects {
friend class TKqpTransactionContext;
};

struct TTableInfo {
bool IsOlap = false;
THashSet<TString> Pathes;
};

using TShardIdToTableInfo = THashMap<ui64, TTableInfo>;
using TShardIdToTableInfoPtr = std::shared_ptr<TShardIdToTableInfo>;

class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
public:
explicit TKqpTransactionContext(bool implicit, const NMiniKQL::IFunctionRegistry* funcRegistry,
Expand Down Expand Up @@ -287,6 +295,12 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
TTxAllocatorState::TPtr TxAlloc;

IKqpGateway::TKqpSnapshotHandle SnapshotHandle;

bool HasOlapTable = false;
bool HasOltpTable = false;
bool HasTableWrite = false;

TShardIdToTableInfoPtr ShardIdToTableInfo = std::make_shared<TShardIdToTableInfo>();
};

struct TTxId {
Expand Down Expand Up @@ -435,6 +449,7 @@ class TTransactionsCache {
};

NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const NYql::TKikimrPathId& pathId);
NYql::TIssue GetLocksInvalidatedIssue(const TShardIdToTableInfo& shardIdToTableInfo, const ui64& shardId);
std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type,
const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx);

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.EnableCreateTableAs = serviceConfig.GetEnableCreateTableAs();
kqpConfig.EnableOlapSink = serviceConfig.GetEnableOlapSink();
kqpConfig.EnableOltpSink = serviceConfig.GetEnableOltpSink();
kqpConfig.EnableHtapTx = serviceConfig.GetEnableHtapTx();
kqpConfig.BlockChannelsMode = serviceConfig.GetBlockChannelsMode();
kqpConfig.IdxLookupJoinsPrefixPointLimit = serviceConfig.GetIdxLookupJoinPointsLimit();
kqpConfig.OldLookupJoinBehaviour = serviceConfig.GetOldLookupJoinBehaviour();
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
bool enableColumnsWithDefault = TableServiceConfig.GetEnableColumnsWithDefault();
bool enableOlapSink = TableServiceConfig.GetEnableOlapSink();
bool enableOltpSink = TableServiceConfig.GetEnableOltpSink();
bool enableHtapTx = TableServiceConfig.GetEnableHtapTx();
bool enableCreateTableAs = TableServiceConfig.GetEnableCreateTableAs();
auto blockChannelsMode = TableServiceConfig.GetBlockChannelsMode();

Expand Down Expand Up @@ -556,6 +557,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
TableServiceConfig.GetEnableColumnsWithDefault() != enableColumnsWithDefault ||
TableServiceConfig.GetEnableOlapSink() != enableOlapSink ||
TableServiceConfig.GetEnableOltpSink() != enableOltpSink ||
TableServiceConfig.GetEnableHtapTx() != enableHtapTx ||
TableServiceConfig.GetEnableCreateTableAs() != enableCreateTableAs ||
TableServiceConfig.GetBlockChannelsMode() != blockChannelsMode ||
TableServiceConfig.GetOldLookupJoinBehaviour() != oldLookupJoinBehaviour ||
Expand Down
Loading

0 comments on commit 1be410d

Please sign in to comment.