Skip to content

Commit

Permalink
support trailing generic query responses
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit committed Feb 2, 2024
1 parent 8ce7edb commit d12fb13
Show file tree
Hide file tree
Showing 12 changed files with 266 additions and 91 deletions.
59 changes: 40 additions & 19 deletions ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ using TEvExecuteQueryRequest = TGrpcRequestNoOperationCall<Ydb::Query::ExecuteQu
struct TProducerState {
TMaybe<ui64> LastSeqNo;
ui64 AckedFreeSpaceBytes = 0;
TActorId ActorId;
};

class TRpcFlowControlState {
Expand Down Expand Up @@ -244,8 +245,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
const auto traceId = Request_->GetTraceId();

NYql::TIssues issues;
NKikimrKqp::EQueryAction queryAction;
if (!ParseQueryAction(*req, queryAction, issues)) {
if (!ParseQueryAction(*req, QueryAction, issues)) {
return ReplyFinishStream(Ydb::StatusIds::BAD_REQUEST, std::move(issues));
}

Expand Down Expand Up @@ -274,7 +274,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
cachePolicy->set_keep_in_cache(true);

auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
queryAction,
QueryAction,
queryType,
SelfId(),
Request_,
Expand All @@ -288,7 +288,8 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
nullptr, // operationParams
false, // keepSession
false, // useCancelAfter
syntax);
syntax,
true);

if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) {
NYql::TIssues issues;
Expand Down Expand Up @@ -322,23 +323,24 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {

ui64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();

for (auto& pair : StreamProducers_) {
const auto& producerId = pair.first;
auto& producer = pair.second;
for (auto& pair : StreamChannels_) {
const auto& channelId = pair.first;
auto& channel = pair.second;

if (freeSpaceBytes > 0 && producer.LastSeqNo && producer.AckedFreeSpaceBytes == 0) {
if (freeSpaceBytes > 0 && channel.LastSeqNo && channel.AckedFreeSpaceBytes == 0) {
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Resume execution, "
<< ", producer: " << producerId
<< ", seqNo: " << producer.LastSeqNo
<< ", channel: " << channelId
<< ", seqNo: " << channel.LastSeqNo
<< ", freeSpace: " << freeSpaceBytes);

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(*producer.LastSeqNo);
resp->Record.SetSeqNo(*channel.LastSeqNo);
resp->Record.SetFreeSpace(freeSpaceBytes);
resp->Record.SetChannelId(channelId);

ctx.Send(producerId, resp.Release());
ctx.Send(channel.ActorId, resp.Release());

producer.AckedFreeSpaceBytes = freeSpaceBytes;
channel.AckedFreeSpaceBytes = freeSpaceBytes;
}
}

Expand All @@ -358,9 +360,10 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {

Request_->SendSerializedResult(std::move(out), Ydb::StatusIds::SUCCESS);

auto& producer = StreamProducers_[ev->Sender];
producer.LastSeqNo = ev->Get()->Record.GetSeqNo();
producer.AckedFreeSpaceBytes = freeSpaceBytes;
auto& channel = StreamChannels_[ev->Get()->Record.GetChannelId()];
channel.ActorId = ev->Sender;
channel.LastSeqNo = ev->Get()->Record.GetSeqNo();
channel.AckedFreeSpaceBytes = freeSpaceBytes;

LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Send stream data ack"
<< ", seqNo: " << ev->Get()->Record.GetSeqNo()
Expand All @@ -371,8 +374,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
resp->Record.SetFreeSpace(freeSpaceBytes);
resp->Record.SetChannelId(ev->Get()->Record.GetChannelId());

ctx.Send(ev->Sender, resp.Release());
ctx.Send(channel.ActorId, resp.Release());
}

void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) {
Expand All @@ -381,14 +385,30 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
const auto& issueMessage = record.GetResponse().GetQueryIssues();

bool hasTrailingMessage = false;


auto& kqpResponse = record.GetResponse();
if (kqpResponse.GetYdbResults().size() > 1) {
auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
"Unexpected trailing message with multiple result sets.");
ReplyFinishStream(Ydb::StatusIds::INTERNAL_ERROR, issue);
return;
}

if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
Request_->SetRuHeader(record.GetConsumedRu());

auto& kqpResponse = record.GetResponse();

Ydb::Query::ExecuteQueryResponsePart response;

if (QueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE) {
for(int i = 0; i < kqpResponse.GetYdbResults().size(); i++) {
hasTrailingMessage = true;
response.set_result_set_index(i);
response.mutable_result_set()->Swap(record.MutableResponse()->MutableYdbResults(i));
}
}

AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), response);

if (kqpResponse.HasTxMeta()) {
Expand Down Expand Up @@ -492,8 +512,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
private:
std::shared_ptr<TEvExecuteQueryRequest> Request_;

NKikimrKqp::EQueryAction QueryAction;
TRpcFlowControlState FlowControl_;
TMap<TActorId, TProducerState> StreamProducers_;
TMap<ui64, TProducerState> StreamChannels_;
};

} // namespace
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/kqp/common/events/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
const ::Ydb::Operations::OperationParams* operationParams,
bool keepSession = false,
bool useCancelAfter = true,
const ::Ydb::Query::Syntax syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED);
const ::Ydb::Query::Syntax syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED,
bool supportsStreamTrailingResult = false);

TEvQueryRequest() = default;

Expand Down Expand Up @@ -285,6 +286,10 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
ProgressStatsPeriod = progressStatsPeriod;
}

bool GetSupportsStreamTrailingResult() const {
return SupportsStreamTrailingResult;
}

TDuration GetProgressStatsPeriod() const {
return ProgressStatsPeriod;
}
Expand Down Expand Up @@ -317,6 +322,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
const ::Ydb::Query::Syntax Syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED;
TIntrusivePtr<TUserRequestContext> UserRequestContext;
TDuration ProgressStatsPeriod;
bool SupportsStreamTrailingResult = false;
};

struct TEvDataQueryStreamPart: public TEventPB<TEvDataQueryStreamPart,
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/common/kqp_event_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest(
const ::Ydb::Operations::OperationParams* operationParams,
bool keepSession,
bool useCancelAfter,
const ::Ydb::Query::Syntax syntax)
const ::Ydb::Query::Syntax syntax,
bool supportsStreamTrailingResult)
: RequestCtx(ctx)
, RequestActorId(requestActorId)
, Database(CanonizePath(ctx->GetDatabaseName().GetOrElse("")))
Expand All @@ -36,6 +37,7 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest(
, HasOperationParams(operationParams)
, KeepSession(keepSession)
, Syntax(syntax)
, SupportsStreamTrailingResult(supportsStreamTrailingResult)
{
if (HasOperationParams) {
OperationTimeout = GetDuration(operationParams->operation_timeout());
Expand Down
45 changes: 5 additions & 40 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink)
: TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation,
maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter"
maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter", streamResult
)
, AsyncIoFactory(std::move(asyncIoFactory))
, StreamResult(streamResult)
, EnableOlapSink(enableOlapSink)
{
Target = creator;
Expand Down Expand Up @@ -347,7 +346,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
hFunc(TEvPersQueue::TEvProposeTransactionResult, HandlePrepare);
hFunc(TEvPrivate::TEvReattachToShard, HandleExecute);
hFunc(TEvDqCompute::TEvState, HandlePrepare); // from CA
hFunc(TEvDqCompute::TEvChannelData, HandleExecute); // from CA
hFunc(TEvDqCompute::TEvChannelData, HandleChannelData); // from CA
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
hFunc(TEvPipeCache::TEvDeliveryProblem, HandlePrepare);
hFunc(TEvKqp::TEvAbortExecution, HandlePrepare);
hFunc(TEvents::TEvUndelivered, HandleUndelivered);
Expand Down Expand Up @@ -935,7 +935,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute);
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
hFunc(TEvDqCompute::TEvChannelData, HandleExecute);
hFunc(NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData);
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
hFunc(TEvKqp::TEvAbortExecution, HandleExecute);
IgnoreFunc(TEvInterconnect::TEvNodeConnected);
default:
Expand Down Expand Up @@ -1286,41 +1287,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

void HandleExecute(TEvDqCompute::TEvChannelData::TPtr& ev) {
auto& record = ev->Get()->Record;
auto& channelData = record.GetChannelData();

TDqSerializedBatch batch;
batch.Proto = std::move(*record.MutableChannelData()->MutableData());
if (batch.Proto.HasPayloadId()) {
batch.Payload = ev->Get()->GetPayload(batch.Proto.GetPayloadId());
}

auto& channel = TasksGraph.GetChannel(channelData.GetChannelId());
YQL_ENSURE(channel.DstTask == 0);
auto shardId = TasksGraph.GetTask(channel.SrcTask).Meta.ShardId;

if (Stats) {
Stats->ResultBytes += batch.Size();
Stats->ResultRows += batch.RowCount();
}

LOG_T("Got result, channelId: " << channel.Id << ", shardId: " << shardId
<< ", inputIndex: " << channel.DstInputIndex << ", from: " << ev->Sender
<< ", finished: " << channelData.GetFinished());

ResponseEv->TakeResult(channel.DstInputIndex, std::move(batch));
{
LOG_T("Send ack to channelId: " << channel.Id << ", seqNo: " << record.GetSeqNo() << ", to: " << ev->Sender);

auto ackEv = MakeHolder<TEvDqCompute::TEvChannelDataAck>();
ackEv->Record.SetSeqNo(record.GetSeqNo());
ackEv->Record.SetChannelId(channel.Id);
ackEv->Record.SetFreeSpace(50_MB);
Send(ev->Sender, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channel.Id);
}
}

private:
bool IsReadOnlyTx() const {
if (Request.TopicOperations.HasOperations()) {
Expand Down Expand Up @@ -2417,7 +2383,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

private:
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
bool StreamResult = false;
bool EnableOlapSink = false;

bool HasExternalSources = false;
Expand Down
Loading

0 comments on commit d12fb13

Please sign in to comment.