Skip to content

Commit

Permalink
Merge 642a42a into ffe45cd
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Sep 16, 2024
2 parents ffe45cd + 642a42a commit 2a78401
Show file tree
Hide file tree
Showing 31 changed files with 648 additions and 186 deletions.
29 changes: 25 additions & 4 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const
if (pathId.OwnerId() != 0) {
auto table = txCtx.TableByIdMap.FindPtr(pathId);
if (!table) {
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
}
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << *table);
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << "`" << *table << "`");
} else {
// Olap tables don't return SchemeShard in locks, thus we use tableId here.
for (const auto& [pathId, table] : txCtx.TableByIdMap) {
if (pathId.TableId() == pathId.TableId()) {
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << table);
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << "`" << table << "`");
}
}
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
}
}

Expand All @@ -36,6 +36,27 @@ TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TKqpT
invalidatedLock.GetPathId()));
}

NYql::TIssue GetLocksInvalidatedIssue(const TShardIdToTableInfo& shardIdToTableInfo, const ui64& shardId) {
TStringBuilder message;
message << "Transaction locks invalidated.";

if (auto tableInfoPtr = shardIdToTableInfo.GetPtr(shardId); tableInfoPtr) {
message << " Tables: ";
bool first = true;
for (const auto& path : tableInfoPtr->Pathes) {
if (!first) {
message << ", ";
first = false;
}
message << "`" << path << "`";
}
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message);
} else {
message << " Unknown table.";
}
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message);
}

std::pair<bool, std::vector<TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value,
TKqpTransactionContext& txCtx)
{
Expand Down
51 changes: 51 additions & 0 deletions ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,50 @@ struct TDeferredEffects {
friend class TKqpTransactionContext;
};

struct TTableInfo {
bool IsOlap = false;
THashSet<TStringBuf> Pathes;
};


class TShardIdToTableInfo {
public:
const TTableInfo& Get(ui64 shardId) const {
const auto* result = GetPtr(shardId);
AFL_ENSURE(result);
return *result;
}

const TTableInfo* GetPtr(ui64 shardId) const {
auto it = ShardIdToInfo.find(shardId);
return it != std::end(ShardIdToInfo)
? &it->second
: nullptr;
}

void Add(ui64 shardId, bool isOlap, const TString& path) {
const auto [stringsIter, _] = Strings.insert(path);
const TStringBuf pathBuf = *stringsIter;
auto infoIter = ShardIdToInfo.find(shardId);
if (infoIter != std::end(ShardIdToInfo)) {
AFL_ENSURE(infoIter->second.IsOlap == isOlap);
infoIter->second.Pathes.insert(pathBuf);
} else {
ShardIdToInfo.emplace(
shardId,
TTableInfo{
.IsOlap = isOlap,
.Pathes = {pathBuf},
});
}
}

private:
THashMap<ui64, TTableInfo> ShardIdToInfo;
std::unordered_set<TString> Strings;// Pointers aren't invalidated.
};
using TShardIdToTableInfoPtr = std::shared_ptr<TShardIdToTableInfo>;

class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
public:
explicit TKqpTransactionContext(bool implicit, const NMiniKQL::IFunctionRegistry* funcRegistry,
Expand Down Expand Up @@ -287,6 +331,12 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
TTxAllocatorState::TPtr TxAlloc;

IKqpGateway::TKqpSnapshotHandle SnapshotHandle;

bool HasOlapTable = false;
bool HasOltpTable = false;
bool HasTableWrite = false;

TShardIdToTableInfoPtr ShardIdToTableInfo = std::make_shared<TShardIdToTableInfo>();
};

struct TTxId {
Expand Down Expand Up @@ -435,6 +485,7 @@ class TTransactionsCache {
};

NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const NYql::TKikimrPathId& pathId);
NYql::TIssue GetLocksInvalidatedIssue(const TShardIdToTableInfo& shardIdToTableInfo, const ui64& shardId);
std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type,
const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx);

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.EnableCreateTableAs = serviceConfig.GetEnableCreateTableAs();
kqpConfig.EnableOlapSink = serviceConfig.GetEnableOlapSink();
kqpConfig.EnableOltpSink = serviceConfig.GetEnableOltpSink();
kqpConfig.EnableHtapTx = serviceConfig.GetEnableHtapTx();
kqpConfig.BlockChannelsMode = serviceConfig.GetBlockChannelsMode();
kqpConfig.IdxLookupJoinsPrefixPointLimit = serviceConfig.GetIdxLookupJoinPointsLimit();
kqpConfig.OldLookupJoinBehaviour = serviceConfig.GetOldLookupJoinBehaviour();
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
bool enableColumnsWithDefault = TableServiceConfig.GetEnableColumnsWithDefault();
bool enableOlapSink = TableServiceConfig.GetEnableOlapSink();
bool enableOltpSink = TableServiceConfig.GetEnableOltpSink();
bool enableHtapTx = TableServiceConfig.GetEnableHtapTx();
bool enableCreateTableAs = TableServiceConfig.GetEnableCreateTableAs();
auto blockChannelsMode = TableServiceConfig.GetBlockChannelsMode();

Expand Down Expand Up @@ -558,6 +559,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
TableServiceConfig.GetEnableColumnsWithDefault() != enableColumnsWithDefault ||
TableServiceConfig.GetEnableOlapSink() != enableOlapSink ||
TableServiceConfig.GetEnableOltpSink() != enableOltpSink ||
TableServiceConfig.GetEnableHtapTx() != enableHtapTx ||
TableServiceConfig.GetEnableCreateTableAs() != enableCreateTableAs ||
TableServiceConfig.GetBlockChannelsMode() != blockChannelsMode ||
TableServiceConfig.GetOldLookupJoinBehaviour() != oldLookupJoinBehaviour ||
Expand Down
Loading

0 comments on commit 2a78401

Please sign in to comment.