Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support trailing generic query responses #1441

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 40 additions & 19 deletions ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ using TEvExecuteQueryRequest = TGrpcRequestNoOperationCall<Ydb::Query::ExecuteQu
struct TProducerState {
TMaybe<ui64> LastSeqNo;
ui64 AckedFreeSpaceBytes = 0;
TActorId ActorId;
};

class TRpcFlowControlState {
Expand Down Expand Up @@ -244,8 +245,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
const auto traceId = Request_->GetTraceId();

NYql::TIssues issues;
NKikimrKqp::EQueryAction queryAction;
if (!ParseQueryAction(*req, queryAction, issues)) {
if (!ParseQueryAction(*req, QueryAction, issues)) {
return ReplyFinishStream(Ydb::StatusIds::BAD_REQUEST, std::move(issues));
}

Expand Down Expand Up @@ -274,7 +274,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
cachePolicy->set_keep_in_cache(true);

auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
queryAction,
QueryAction,
queryType,
SelfId(),
Request_,
Expand All @@ -288,7 +288,8 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
nullptr, // operationParams
false, // keepSession
false, // useCancelAfter
syntax);
syntax,
true);

if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) {
NYql::TIssues issues;
Expand Down Expand Up @@ -322,23 +323,24 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {

ui64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();

for (auto& pair : StreamProducers_) {
const auto& producerId = pair.first;
auto& producer = pair.second;
for (auto& pair : StreamChannels_) {
const auto& channelId = pair.first;
auto& channel = pair.second;

if (freeSpaceBytes > 0 && producer.LastSeqNo && producer.AckedFreeSpaceBytes == 0) {
if (freeSpaceBytes > 0 && channel.LastSeqNo && channel.AckedFreeSpaceBytes == 0) {
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Resume execution, "
<< ", producer: " << producerId
<< ", seqNo: " << producer.LastSeqNo
<< ", channel: " << channelId
<< ", seqNo: " << channel.LastSeqNo
<< ", freeSpace: " << freeSpaceBytes);

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(*producer.LastSeqNo);
resp->Record.SetSeqNo(*channel.LastSeqNo);
resp->Record.SetFreeSpace(freeSpaceBytes);
resp->Record.SetChannelId(channelId);

ctx.Send(producerId, resp.Release());
ctx.Send(channel.ActorId, resp.Release());

producer.AckedFreeSpaceBytes = freeSpaceBytes;
channel.AckedFreeSpaceBytes = freeSpaceBytes;
}
}

Expand All @@ -358,9 +360,10 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {

Request_->SendSerializedResult(std::move(out), Ydb::StatusIds::SUCCESS);

auto& producer = StreamProducers_[ev->Sender];
producer.LastSeqNo = ev->Get()->Record.GetSeqNo();
producer.AckedFreeSpaceBytes = freeSpaceBytes;
auto& channel = StreamChannels_[ev->Get()->Record.GetChannelId()];
channel.ActorId = ev->Sender;
channel.LastSeqNo = ev->Get()->Record.GetSeqNo();
channel.AckedFreeSpaceBytes = freeSpaceBytes;

LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Send stream data ack"
<< ", seqNo: " << ev->Get()->Record.GetSeqNo()
Expand All @@ -371,8 +374,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
resp->Record.SetFreeSpace(freeSpaceBytes);
resp->Record.SetChannelId(ev->Get()->Record.GetChannelId());

ctx.Send(ev->Sender, resp.Release());
ctx.Send(channel.ActorId, resp.Release());
}

void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) {
Expand All @@ -381,14 +385,30 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
const auto& issueMessage = record.GetResponse().GetQueryIssues();

bool hasTrailingMessage = false;


auto& kqpResponse = record.GetResponse();
if (kqpResponse.GetYdbResults().size() > 1) {
auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
"Unexpected trailing message with multiple result sets.");
ReplyFinishStream(Ydb::StatusIds::INTERNAL_ERROR, issue);
return;
}

if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
Request_->SetRuHeader(record.GetConsumedRu());

auto& kqpResponse = record.GetResponse();

Ydb::Query::ExecuteQueryResponsePart response;

if (QueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE) {
for(int i = 0; i < kqpResponse.GetYdbResults().size(); i++) {
gridnevvvit marked this conversation as resolved.
Show resolved Hide resolved
hasTrailingMessage = true;
gridnevvvit marked this conversation as resolved.
Show resolved Hide resolved
response.set_result_set_index(i);
response.mutable_result_set()->Swap(record.MutableResponse()->MutableYdbResults(i));
}
}

AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), response);

if (kqpResponse.HasTxMeta()) {
Expand Down Expand Up @@ -492,8 +512,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
private:
std::shared_ptr<TEvExecuteQueryRequest> Request_;

NKikimrKqp::EQueryAction QueryAction;
TRpcFlowControlState FlowControl_;
TMap<TActorId, TProducerState> StreamProducers_;
TMap<ui64, TProducerState> StreamChannels_;
};

} // namespace
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/kqp/common/events/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
const ::Ydb::Operations::OperationParams* operationParams,
bool keepSession = false,
bool useCancelAfter = true,
const ::Ydb::Query::Syntax syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED);
const ::Ydb::Query::Syntax syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED,
bool supportsStreamTrailingResult = false);
gridnevvvit marked this conversation as resolved.
Show resolved Hide resolved

TEvQueryRequest() = default;

Expand Down Expand Up @@ -285,6 +286,10 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
ProgressStatsPeriod = progressStatsPeriod;
}

bool GetSupportsStreamTrailingResult() const {
return SupportsStreamTrailingResult;
}

TDuration GetProgressStatsPeriod() const {
return ProgressStatsPeriod;
}
Expand Down Expand Up @@ -317,6 +322,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
const ::Ydb::Query::Syntax Syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED;
TIntrusivePtr<TUserRequestContext> UserRequestContext;
TDuration ProgressStatsPeriod;
bool SupportsStreamTrailingResult = false;
};

struct TEvDataQueryStreamPart: public TEventPB<TEvDataQueryStreamPart,
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/common/kqp_event_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest(
const ::Ydb::Operations::OperationParams* operationParams,
bool keepSession,
bool useCancelAfter,
const ::Ydb::Query::Syntax syntax)
const ::Ydb::Query::Syntax syntax,
bool supportsStreamTrailingResult)
: RequestCtx(ctx)
, RequestActorId(requestActorId)
, Database(CanonizePath(ctx->GetDatabaseName().GetOrElse("")))
Expand All @@ -36,6 +37,7 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest(
, HasOperationParams(operationParams)
, KeepSession(keepSession)
, Syntax(syntax)
, SupportsStreamTrailingResult(supportsStreamTrailingResult)
{
if (HasOperationParams) {
OperationTimeout = GetDuration(operationParams->operation_timeout());
Expand Down
45 changes: 5 additions & 40 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink)
: TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation,
maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter"
maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter", streamResult
)
, AsyncIoFactory(std::move(asyncIoFactory))
, StreamResult(streamResult)
, EnableOlapSink(enableOlapSink)
{
Target = creator;
Expand Down Expand Up @@ -347,7 +346,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
hFunc(TEvPersQueue::TEvProposeTransactionResult, HandlePrepare);
hFunc(TEvPrivate::TEvReattachToShard, HandleExecute);
hFunc(TEvDqCompute::TEvState, HandlePrepare); // from CA
hFunc(TEvDqCompute::TEvChannelData, HandleExecute); // from CA
hFunc(TEvDqCompute::TEvChannelData, HandleChannelData); // from CA
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
hFunc(TEvPipeCache::TEvDeliveryProblem, HandlePrepare);
hFunc(TEvKqp::TEvAbortExecution, HandlePrepare);
hFunc(TEvents::TEvUndelivered, HandleUndelivered);
Expand Down Expand Up @@ -935,7 +935,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute);
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
hFunc(TEvDqCompute::TEvChannelData, HandleExecute);
hFunc(NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData);
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
hFunc(TEvKqp::TEvAbortExecution, HandleExecute);
IgnoreFunc(TEvInterconnect::TEvNodeConnected);
default:
Expand Down Expand Up @@ -1286,41 +1287,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

void HandleExecute(TEvDqCompute::TEvChannelData::TPtr& ev) {
auto& record = ev->Get()->Record;
auto& channelData = record.GetChannelData();

TDqSerializedBatch batch;
batch.Proto = std::move(*record.MutableChannelData()->MutableData());
if (batch.Proto.HasPayloadId()) {
batch.Payload = ev->Get()->GetPayload(batch.Proto.GetPayloadId());
}

auto& channel = TasksGraph.GetChannel(channelData.GetChannelId());
YQL_ENSURE(channel.DstTask == 0);
auto shardId = TasksGraph.GetTask(channel.SrcTask).Meta.ShardId;

if (Stats) {
Stats->ResultBytes += batch.Size();
Stats->ResultRows += batch.RowCount();
}

LOG_T("Got result, channelId: " << channel.Id << ", shardId: " << shardId
<< ", inputIndex: " << channel.DstInputIndex << ", from: " << ev->Sender
<< ", finished: " << channelData.GetFinished());

ResponseEv->TakeResult(channel.DstInputIndex, std::move(batch));
{
LOG_T("Send ack to channelId: " << channel.Id << ", seqNo: " << record.GetSeqNo() << ", to: " << ev->Sender);

auto ackEv = MakeHolder<TEvDqCompute::TEvChannelDataAck>();
ackEv->Record.SetSeqNo(record.GetSeqNo());
ackEv->Record.SetChannelId(channel.Id);
ackEv->Record.SetFreeSpace(50_MB);
Send(ev->Sender, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channel.Id);
}
}

private:
bool IsReadOnlyTx() const {
if (Request.TopicOperations.HasOperations()) {
Expand Down Expand Up @@ -2417,7 +2383,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

private:
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
bool StreamResult = false;
bool EnableOlapSink = false;

bool HasExternalSources = false;
Expand Down
Loading
Loading