Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VTGate StreamExecute rpc to return session as response #13131

Merged
merged 8 commits into from
May 30, 2023
223 changes: 118 additions & 105 deletions go/vt/proto/vtgate/vtgate.pb.go

Large diffs are not rendered by default.

50 changes: 50 additions & 0 deletions go/vt/proto/vtgate/vtgate_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/vt/vtgate/fakerpcvtgateconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (conn *FakeVTGateConn) ExecuteBatch(ctx context.Context, session *vtgatepb.
}

// StreamExecute please see vtgateconn.Impl.StreamExecute
func (conn *FakeVTGateConn) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVars map[string]*querypb.BindVariable) (sqltypes.ResultStream, error) {
func (conn *FakeVTGateConn) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVars map[string]*querypb.BindVariable, _ func(response *vtgatepb.StreamExecuteResponse)) (sqltypes.ResultStream, error) {
response, ok := conn.execMap[sql]
if !ok {
return nil, fmt.Errorf("no match for: %s", sql)
Expand Down
14 changes: 12 additions & 2 deletions go/vt/vtgate/grpcvtgateconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,16 @@ type streamExecuteAdapter struct {
}

func (a *streamExecuteAdapter) Recv() (*sqltypes.Result, error) {
qr, err := a.recv()
var qr *querypb.QueryResult
var err error
for {
qr, err = a.recv()
if qr != nil || err != nil {
break
}
// we reach here, only when it is the last packet.
// as in the last packet we receive the session and there is no result
}
if err != nil {
return nil, vterrors.FromGRPC(err)
}
Expand All @@ -166,7 +175,7 @@ func (a *streamExecuteAdapter) Recv() (*sqltypes.Result, error) {
return sqltypes.CustomProto3ToResult(a.fields, qr), nil
}

func (conn *vtgateConn) StreamExecute(ctx context.Context, session *vtgatepb.Session, query string, bindVars map[string]*querypb.BindVariable) (sqltypes.ResultStream, error) {
func (conn *vtgateConn) StreamExecute(ctx context.Context, session *vtgatepb.Session, query string, bindVars map[string]*querypb.BindVariable, processResponse func(response *vtgatepb.StreamExecuteResponse)) (sqltypes.ResultStream, error) {
req := &vtgatepb.StreamExecuteRequest{
CallerId: callerid.EffectiveCallerIDFromContext(ctx),
Query: &querypb.BoundQuery{
Expand All @@ -185,6 +194,7 @@ func (conn *vtgateConn) StreamExecute(ctx context.Context, session *vtgatepb.Ses
if err != nil {
return nil, err
}
processResponse(ser)
return ser.Result, nil
},
}, nil
Expand Down
25 changes: 19 additions & 6 deletions go/vt/vtgate/grpcvtgateservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,16 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/callinfo"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate"
"vitess.io/vitess/go/vt/vtgate/vtgateservice"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtgateservicepb "vitess.io/vitess/go/vt/proto/vtgateservice"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate"
"vitess.io/vitess/go/vt/vtgate/vtgateservice"
)

const (
Expand Down Expand Up @@ -192,7 +191,21 @@ func (vtg *VTGate) StreamExecute(request *vtgatepb.StreamExecuteRequest, stream
Result: sqltypes.ResultToProto3(value),
})
})
return vterrors.ToGRPC(vtgErr)

// even if there is an error, session could have been modified.
// So, this needs to be sent back to the client. Session is sent in the last stream response.
lastErr := stream.Send(&vtgatepb.StreamExecuteResponse{
Session: session,
})

var errs []error
if vtgErr != nil {
errs = append(errs, vtgErr)
}
if lastErr != nil {
errs = append(errs, lastErr)
}
return vterrors.ToGRPC(vterrors.Aggregate(errs))
}

// Prepare is the RPC version of vtgateservice.VTGateService method
Expand Down
8 changes: 6 additions & 2 deletions go/vt/vtgate/vtgateconn/vtgateconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,11 @@ func (sn *VTGateSession) StreamExecute(ctx context.Context, query string, bindVa
// StreamExecute is only used for SELECT queries that don't change
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still true?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

	// StreamExecute is only used for SELECT queries that don't change
	// the session. So, the protocol doesn't return an updated session.
	// This may change in the future.

For sure it has changed and this is the future :)

// the session. So, the protocol doesn't return an updated session.
// This may change in the future.
return sn.impl.StreamExecute(ctx, sn.session, query, bindVars)
return sn.impl.StreamExecute(ctx, sn.session, query, bindVars, func(response *vtgatepb.StreamExecuteResponse) {
if response.Session != nil {
sn.session = response.Session
}
})
}

// Prepare performs a VTGate Prepare.
Expand All @@ -168,7 +172,7 @@ type Impl interface {
ExecuteBatch(ctx context.Context, session *vtgatepb.Session, queryList []string, bindVarsList []map[string]*querypb.BindVariable) (*vtgatepb.Session, []sqltypes.QueryResponse, error)

// StreamExecute executes a streaming query on vtgate. This is a V3 function.
StreamExecute(ctx context.Context, session *vtgatepb.Session, query string, bindVars map[string]*querypb.BindVariable) (sqltypes.ResultStream, error)
StreamExecute(ctx context.Context, session *vtgatepb.Session, query string, bindVars map[string]*querypb.BindVariable, processResponse func(*vtgatepb.StreamExecuteResponse)) (sqltypes.ResultStream, error)

// Prepare returns the fields information for the query as part of supporting prepare statements.
Prepare(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, []*querypb.Field, error)
Expand Down
3 changes: 3 additions & 0 deletions proto/vtgate.proto
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ message StreamExecuteResponse {
// The first value contains only Fields information.
// The next values contain the actual rows, a few values per result.
query.QueryResult result = 1;

// session is the updated session information.
Session session = 2;
}

// ResolveTransactionRequest is the payload to ResolveTransaction.
Expand Down