Skip to content

Commit

Permalink
Merge 59d0cae into b324f1b
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Sep 9, 2024
2 parents b324f1b + 59d0cae commit a613b6f
Show file tree
Hide file tree
Showing 12 changed files with 148 additions and 44 deletions.
1 change: 1 addition & 0 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication
ydb/core/kqp/ut/scheme KqpScheme.QueryWithAlter
ydb/core/kqp/ut/service [*/*]*
ydb/core/kqp/ut/service KqpQueryService.TableSink_Htap
ydb/core/kqp/ut/service KqpQueryService.ExecuteQueryPgTableSelect
ydb/core/kqp/ut/service KqpQueryService.QueryOnClosedSession
ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.EnableCreateTableAs = serviceConfig.GetEnableCreateTableAs();
kqpConfig.EnableOlapSink = serviceConfig.GetEnableOlapSink();
kqpConfig.EnableOltpSink = serviceConfig.GetEnableOltpSink();
kqpConfig.EnableHtapTx = serviceConfig.GetEnableHtapTx();
kqpConfig.BlockChannelsMode = serviceConfig.GetBlockChannelsMode();
kqpConfig.IdxLookupJoinsPrefixPointLimit = serviceConfig.GetIdxLookupJoinPointsLimit();
kqpConfig.EnableSpillingGenericQuery = serviceConfig.GetEnableQueryServiceSpilling();
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
bool enableColumnsWithDefault = TableServiceConfig.GetEnableColumnsWithDefault();
bool enableOlapSink = TableServiceConfig.GetEnableOlapSink();
bool enableOltpSink = TableServiceConfig.GetEnableOltpSink();
bool enableHtapTx = TableServiceConfig.GetEnableHtapTx();
bool enableCreateTableAs = TableServiceConfig.GetEnableCreateTableAs();
auto blockChannelsMode = TableServiceConfig.GetBlockChannelsMode();

Expand Down Expand Up @@ -556,6 +557,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
TableServiceConfig.GetEnableColumnsWithDefault() != enableColumnsWithDefault ||
TableServiceConfig.GetEnableOlapSink() != enableOlapSink ||
TableServiceConfig.GetEnableOltpSink() != enableOltpSink ||
TableServiceConfig.GetEnableHtapTx() != enableHtapTx ||
TableServiceConfig.GetEnableCreateTableAs() != enableCreateTableAs ||
TableServiceConfig.GetBlockChannelsMode() != blockChannelsMode ||
TableServiceConfig.GetExtractPredicateRangesLimit() != rangesLimit ||
Expand Down
17 changes: 6 additions & 11 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
const bool useEvWrite, ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
const TGUCSettings::TPtr& GUCSettings)
: TBase(std::move(request), database, userToken, counters, tableServiceConfig,
userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult)
, AsyncIoFactory(std::move(asyncIoFactory))
, EnableOlapSink(enableOlapSink)
, UseEvWrite(useEvWrite)
, FederatedQuerySetup(federatedQuerySetup)
, GUCSettings(GUCSettings)
Expand Down Expand Up @@ -1891,9 +1890,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

const bool hasOlapSink = HasOlapSink(stage, tx.Body->GetTables());
if ((stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage))
|| (!EnableOlapSink && hasOlapSink)) {
if ((stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage))) {
auto error = TStringBuilder() << "Data manipulation queries do not support column shard tables.";
LOG_E(error);
ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED,
Expand Down Expand Up @@ -2404,9 +2401,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
// cause interconnect overload and reduce throughput however,
// so we don't want to use a crossover value that is too high.
const size_t minArbiterMeshSize = 5; // TODO: make configurable?
if (VolatileTx &&
if ((VolatileTx &&
receivingShardsSet.size() >= minArbiterMeshSize &&
AppData()->FeatureFlags.GetEnableVolatileTransactionArbiters())
AppData()->FeatureFlags.GetEnableVolatileTransactionArbiters()))
{
std::vector<ui64> candidates;
candidates.reserve(receivingShardsSet.size());
Expand Down Expand Up @@ -2787,7 +2784,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

private:
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
bool EnableOlapSink = false;
bool UseEvWrite = false;
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
const TGUCSettings::TPtr GUCSettings;
Expand Down Expand Up @@ -2832,13 +2828,12 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, const bool useEvWrite, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings)
{
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, tableServiceConfig,
std::move(asyncIoFactory), creator, userRequestContext,
enableOlapSink, useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings);
useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings);
}

} // namespace NKqp
Expand Down
9 changes: 8 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
namespace NKikimr {
namespace NKqp {

struct TTableInfo {
bool IsOlap = false;
TString Path;
};
using TShardIdToTableInfoPtr = std::shared_ptr<THashMap<ui64, std::shared_ptr<TTableInfo>>>;

struct TEvKqpExecuter {
struct TEvTxRequest : public TEventPB<TEvTxRequest, NKikimrKqp::TEvExecuterTxRequest,
TKqpExecuterEvents::EvTxRequest> {};
Expand All @@ -27,6 +33,7 @@ struct TEvKqpExecuter {
NLWTrace::TOrbit Orbit;
IKqpGateway::TKqpSnapshot Snapshot;
std::optional<NYql::TKikimrPathId> BrokenLockPathId;
TShardIdToTableInfoPtr ShardIdToTableInfo;
ui64 ResultRowsCount = 0;
ui64 ResultRowsBytes = 0;

Expand Down Expand Up @@ -102,7 +109,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const NKikimrConfig::TTableServiceConfig tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex,
const bool useEvWrite, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);

IActor* CreateKqpSchemeExecuter(
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/provider/yql_kikimr_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
ui64 IdxLookupJoinsPrefixPointLimit = 1;
bool EnableOlapSink = false;
bool EnableOltpSink = false;
bool EnableHtapTx = false;
NKikimrConfig::TTableServiceConfig_EBlockChannelsMode BlockChannelsMode;
bool EnableSpillingGenericQuery = false;
ui32 DefaultCostBasedOptimizationLevel = 4;
Expand Down
51 changes: 31 additions & 20 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
RuntimeError(
TStringBuilder() << "Got UNSPECIFIED for table `"
<< SchemeEntry->TableId.PathId.ToString() << "`.",
TStringBuilder() << "Unspecified error for table `"
<< SchemeEntry->TableId.PathId.ToString() << "`. "
<< getIssues().ToOneLineString(),
NYql::NDqProto::StatusIds::UNSPECIFIED,
getIssues());
return;
Expand All @@ -411,8 +412,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
RuntimeError(
TStringBuilder() << "Got ABORTED for table `"
<< SchemeEntry->TableId.PathId.ToString() << "`.",
TStringBuilder() << "Aborted for table `"
<< SchemeEntry->TableId.PathId.ToString() << "`. "
<< getIssues().ToOneLineString(),
NYql::NDqProto::StatusIds::ABORTED,
getIssues());
return;
Expand All @@ -430,8 +432,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
RetryResolveTable();
} else {
RuntimeError(
TStringBuilder() << "Got INTERNAL ERROR for table `"
<< SchemeEntry->TableId.PathId.ToString() << "`.",
TStringBuilder() << "Internal error for table `"
<< SchemeEntry->TableId.PathId.ToString() << "`. "
<< getIssues().ToOneLineString(),
NYql::NDqProto::StatusIds::INTERNAL_ERROR,
getIssues());
}
Expand All @@ -445,8 +448,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
<< getIssues().ToOneLineString());

RuntimeError(
TStringBuilder() << "Got DISK_SPACE_EXHAUSTED for table `"
<< SchemeEntry->TableId.PathId.ToString() << "`.",
TStringBuilder() << "Disk space exhausted for table `"
<< SchemeEntry->TableId.PathId.ToString() << "`. "
<< getIssues().ToOneLineString(),
NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
getIssues());
return;
Expand All @@ -461,8 +465,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
// TODO: support waiting
if (!InconsistentTx) {
RuntimeError(
TStringBuilder() << "Got OVERLOADED for table `"
<< SchemeEntry->TableId.PathId.ToString() << "`.",
TStringBuilder() << "Tablet " << ev->Get()->Record.GetOrigin() << " is overloaded. Table `"
<< SchemeEntry->TableId.PathId.ToString() << "`. "
<< getIssues().ToOneLineString(),
NYql::NDqProto::StatusIds::OVERLOADED,
getIssues());
}
Expand All @@ -475,8 +480,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
RuntimeError(
TStringBuilder() << "Got CANCELLED for table `"
<< SchemeEntry->TableId.PathId.ToString() << "`.",
TStringBuilder() << "Cancelled request to table `"
<< SchemeEntry->TableId.PathId.ToString() << "`."
<< getIssues().ToOneLineString(),
NYql::NDqProto::StatusIds::CANCELLED,
getIssues());
return;
Expand All @@ -488,8 +494,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
RuntimeError(
TStringBuilder() << "Got BAD REQUEST for table `"
<< SchemeEntry->TableId.PathId.ToString() << "`.",
TStringBuilder() << "Bad request. Table `"
<< SchemeEntry->TableId.PathId.ToString() << "`. "
<< getIssues().ToOneLineString(),
NYql::NDqProto::StatusIds::BAD_REQUEST,
getIssues());
return;
Expand All @@ -505,8 +512,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
RetryResolveTable();
} else {
RuntimeError(
TStringBuilder() << "Got SCHEME CHANGED for table `"
<< SchemeEntry->TableId.PathId.ToString() << "`.",
TStringBuilder() << "Scheme changed. Table `"
<< SchemeEntry->TableId.PathId.ToString() << "`. "
<< getIssues().ToOneLineString(),
NYql::NDqProto::StatusIds::SCHEME_ERROR,
getIssues());
}
Expand All @@ -519,8 +527,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
RuntimeError(
TStringBuilder() << "Got LOCKS BROKEN for table `"
<< SchemeEntry->TableId.PathId.ToString() << "`.",
TStringBuilder() << "Transaction locks invalidated. Table `"
<< SchemeEntry->TableId.PathId.ToString() << "`. "
<< getIssues().ToOneLineString(),
NYql::NDqProto::StatusIds::ABORTED,
getIssues());
return;
Expand All @@ -545,7 +554,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
if (!LocksInfo[ev->Get()->Record.GetOrigin()].AddAndCheckLock(lock)) {
RuntimeError(
TStringBuilder() << "Got LOCKS BROKEN for table `"
TStringBuilder() << "Transaction locks invalidated. Table `"
<< SchemeEntry->TableId.PathId.ToString() << "`.",
NYql::NDqProto::StatusIds::ABORTED,
NYql::TIssues{});
Expand Down Expand Up @@ -713,7 +722,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
RetryShard(ev->Get()->TabletId, std::nullopt);
} else {
RuntimeError(
TStringBuilder() << "Error while delivering message to tablet " << ev->Get()->TabletId,
TStringBuilder()
<< "Error writing to table `" << SchemeEntry->TableId.PathId.ToString() << "`"
<< ": can't deliver message to tablet " << ev->Get()->TabletId << ".",
NYql::NDqProto::StatusIds::UNAVAILABLE);
}
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ class TKqpQueryState : public TNonCopyable {
ui32 StatementResultIndex = 0;
ui32 StatementResultSize = 0;

bool HasOlapTable = false;
bool HasOltpTable = false;
bool HasTableWrite = false;

TMaybe<TString> CommandTagName;

NKikimrKqp::EQueryAction GetAction() const {
Expand Down
30 changes: 18 additions & 12 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -854,10 +854,10 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}

const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
HasOlapTable |= ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery);
HasOltpTable |= ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
HasTableWrite |= ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
if (HasOlapTable && HasOltpTable && HasTableWrite) {
QueryState->HasOlapTable |= ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery);
QueryState->HasOltpTable |= ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
QueryState->HasTableWrite |= ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
if (QueryState->HasOlapTable && QueryState->HasOltpTable && QueryState->HasTableWrite && !Settings.TableService.GetEnableHtapTx()) {
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
"Write transactions between column and row tables are disabled at current time.");
return false;
Expand Down Expand Up @@ -1292,18 +1292,28 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
request.ResourceManager_ = ResourceManager_;
LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize());

const bool useEvWrite = ((HasOlapTable && Settings.TableService.GetEnableOlapSink()) || (!HasOlapTable && Settings.TableService.GetEnableOltpSink()))
const bool useEvWrite = (
(QueryState->HasOlapTable // olap only
&& !QueryState->HasOltpTable
&& Settings.TableService.GetEnableOlapSink())
|| (QueryState->HasOltpTable // oltp only
&& !QueryState->HasOlapTable
&& Settings.TableService.GetEnableOltpSink())
|| (QueryState->HasOlapTable // htap
&& QueryState->HasOltpTable
&& Settings.TableService.GetEnableOlapSink()
&& Settings.TableService.GetEnableHtapTx()))
&& (request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_UNDEFINED
|| request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY
|| request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY
|| (!HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_DML)
|| (!HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_PREPARED_DML));
|| (!QueryState->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_DML)
|| (!QueryState->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_PREPARED_DML));
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(),
RequestCounters, Settings.TableService,
AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, SelfId(),
QueryState ? QueryState->UserRequestContext : MakeIntrusive<TUserRequestContext>("", Settings.Database, SessionId),
Settings.TableService.GetEnableOlapSink(), useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings);
useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings);

auto exId = RegisterWithSameMailbox(executerActor);
LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback);
Expand Down Expand Up @@ -2593,10 +2603,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
TActorId KqpTempTablesAgentActor;
std::shared_ptr<std::atomic<bool>> CompilationCookie;

bool HasOlapTable = false;
bool HasOltpTable = false;
bool HasTableWrite = false;

TGUCSettings::TPtr GUCSettings;
};

Expand Down
Loading

0 comments on commit a613b6f

Please sign in to comment.