diff --git a/changelog/17.0/17.0.0/summary.md b/changelog/17.0/17.0.0/summary.md index 66ebb01744d..032ffda5fb6 100644 --- a/changelog/17.0/17.0.0/summary.md +++ b/changelog/17.0/17.0.0/summary.md @@ -24,10 +24,13 @@ - [VTTablet: Initializing all replicas with super_read_only](#vttablet-initialization) - **[VReplication](#VReplication)** - [Support for the `noblob` binlog row image mode](#noblob) + - **[VTGate](#vtgate) + - [StreamExecute GRPC API](#stream-execute) - **[Deprecations and Deletions](#deprecations-and-deletions)** - [Deprecated Flags](#deprecated-flags) - [Deprecated Stats](#deprecated-stats) + ## Major Changes ### Breaking Changes @@ -349,6 +352,17 @@ would not work. Given the criticality of VReplication workflows within Vitess, t We have addressed this issue in [PR #12950](https://github.com/vitessio/vitess/pull/12950) by adding support for processing the compressed transaction events in VReplication, without any (known) limitations. +### VTGate + +#### Modified StreamExecute GRPC API +Earlier VTGate grpc api for `StreamExecute` did not return the session in the response. +Even though the underlying implementation supported transactions and other features that requires session persistence. +With [PR #13131](https://github.com/vitessio/vitess/pull/13131) VTGate will return the session to the client +so that it can be persisted with the client and sent back to VTGate on the next api call. + +This does not impact anyone using the mysql client library to connect to VTGate. +This could be a breaking change for grpc api users based on how they have implemented their grpc clients. + ### Deprecations and Deletions * The deprecated `automation` and `automationservice` protobuf definitions and associated client and server packages have been removed. diff --git a/go/cmd/vtgateclienttest/services/callerid.go b/go/cmd/vtgateclienttest/services/callerid.go index 54893f3bb07..41c80a80582 100644 --- a/go/cmd/vtgateclienttest/services/callerid.go +++ b/go/cmd/vtgateclienttest/services/callerid.go @@ -93,9 +93,9 @@ func (c *callerIDClient) ExecuteBatch(ctx context.Context, session *vtgatepb.Ses return c.fallbackClient.ExecuteBatch(ctx, session, sqlList, bindVariablesList) } -func (c *callerIDClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error { +func (c *callerIDClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) (*vtgatepb.Session, error) { if ok, err := c.checkCallerID(ctx, sql); ok { - return err + return session, err } return c.fallbackClient.StreamExecute(ctx, session, sql, bindVariables, callback) } diff --git a/go/cmd/vtgateclienttest/services/echo.go b/go/cmd/vtgateclienttest/services/echo.go index 2181009be47..5b7f5177ade 100644 --- a/go/cmd/vtgateclienttest/services/echo.go +++ b/go/cmd/vtgateclienttest/services/echo.go @@ -110,7 +110,7 @@ func (c *echoClient) Execute(ctx context.Context, session *vtgatepb.Session, sql return c.fallbackClient.Execute(ctx, session, sql, bindVariables) } -func (c *echoClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error { +func (c *echoClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) (*vtgatepb.Session, error) { if strings.HasPrefix(sql, EchoPrefix) { callback(echoQueryResult(map[string]any{ "callerId": callerid.EffectiveCallerIDFromContext(ctx), @@ -118,7 +118,7 @@ func (c *echoClient) StreamExecute(ctx context.Context, session *vtgatepb.Sessio "bindVars": bindVariables, "session": session, })) - return nil + return session, nil } return c.fallbackClient.StreamExecute(ctx, session, sql, bindVariables, callback) } diff --git a/go/cmd/vtgateclienttest/services/errors.go b/go/cmd/vtgateclienttest/services/errors.go index 9a4a5e39366..b8eed8ac6ca 100644 --- a/go/cmd/vtgateclienttest/services/errors.go +++ b/go/cmd/vtgateclienttest/services/errors.go @@ -133,9 +133,9 @@ func (c *errorClient) ExecuteBatch(ctx context.Context, session *vtgatepb.Sessio return c.fallbackClient.ExecuteBatch(ctx, session, sqlList, bindVariablesList) } -func (c *errorClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error { +func (c *errorClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) (*vtgatepb.Session, error) { if err := requestToError(sql); err != nil { - return err + return session, err } return c.fallbackClient.StreamExecute(ctx, session, sql, bindVariables, callback) } diff --git a/go/cmd/vtgateclienttest/services/fallback.go b/go/cmd/vtgateclienttest/services/fallback.go index 02f9239260b..401d5986175 100644 --- a/go/cmd/vtgateclienttest/services/fallback.go +++ b/go/cmd/vtgateclienttest/services/fallback.go @@ -48,7 +48,7 @@ func (c fallbackClient) ExecuteBatch(ctx context.Context, session *vtgatepb.Sess return c.fallback.ExecuteBatch(ctx, session, sqlList, bindVariablesList) } -func (c fallbackClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error { +func (c fallbackClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) (*vtgatepb.Session, error) { return c.fallback.StreamExecute(ctx, session, sql, bindVariables, callback) } diff --git a/go/cmd/vtgateclienttest/services/terminal.go b/go/cmd/vtgateclienttest/services/terminal.go index 85fa664c2c2..72b17879e31 100644 --- a/go/cmd/vtgateclienttest/services/terminal.go +++ b/go/cmd/vtgateclienttest/services/terminal.go @@ -58,8 +58,8 @@ func (c *terminalClient) ExecuteBatch(ctx context.Context, session *vtgatepb.Ses return session, nil, errTerminal } -func (c *terminalClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error { - return errTerminal +func (c *terminalClient) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) (*vtgatepb.Session, error) { + return session, errTerminal } func (c *terminalClient) Prepare(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, []*querypb.Field, error) { diff --git a/go/test/endtoend/cluster/cluster_util.go b/go/test/endtoend/cluster/cluster_util.go index 07a9e09df45..0e3cc2d0c95 100644 --- a/go/test/endtoend/cluster/cluster_util.go +++ b/go/test/endtoend/cluster/cluster_util.go @@ -26,6 +26,11 @@ import ( "testing" "time" + "google.golang.org/grpc" + + "vitess.io/vitess/go/vt/grpcclient" + "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn" + "github.com/buger/jsonparser" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -456,3 +461,13 @@ func WaitForHealthyShard(vtctldclient *VtctldClientProcess, keyspace, shard stri time.Sleep(defaultRetryDelay) } } + +// DialVTGate returns a VTGate grpc connection. +func DialVTGate(ctx context.Context, name, addr, username, password string) (*vtgateconn.VTGateConn, error) { + clientCreds := &grpcclient.StaticAuthClientCreds{Username: username, Password: password} + creds := grpc.WithPerRPCCredentials(clientCreds) + dialerFunc := grpcvtgateconn.Dial(creds) + dialerName := name + vtgateconn.RegisterDialer(dialerName, dialerFunc) + return vtgateconn.DialProtocol(ctx, dialerName, addr) +} diff --git a/go/test/endtoend/vtgate/grpc_api/acl_test.go b/go/test/endtoend/vtgate/grpc_api/acl_test.go new file mode 100644 index 00000000000..2819a3e41d1 --- /dev/null +++ b/go/test/endtoend/vtgate/grpc_api/acl_test.go @@ -0,0 +1,110 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpc_api + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/vt/callerid" +) + +// TestEffectiveCallerIDWithAccess verifies that an authenticated gRPC static user with an effectiveCallerID that has ACL access can execute queries +func TestEffectiveCallerIDWithAccess(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "some_other_user", "test_password") + require.NoError(t, err) + defer vtgateConn.Close() + + session := vtgateConn.Session(keyspaceName+"@primary", nil) + query := "SELECT id FROM test_table" + ctx = callerid.NewContext(ctx, callerid.NewEffectiveCallerID("user_with_access", "", ""), nil) + _, err = session.Execute(ctx, query, nil) + assert.NoError(t, err) +} + +// TestEffectiveCallerIDWithNoAccess verifies that an authenticated gRPC static user without an effectiveCallerID that has ACL access cannot execute queries +func TestEffectiveCallerIDWithNoAccess(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "another_unrelated_user", "test_password") + require.NoError(t, err) + defer vtgateConn.Close() + + session := vtgateConn.Session(keyspaceName+"@primary", nil) + query := "SELECT id FROM test_table" + ctx = callerid.NewContext(ctx, callerid.NewEffectiveCallerID("user_no_access", "", ""), nil) + _, err = session.Execute(ctx, query, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "Select command denied to user") + assert.Contains(t, err.Error(), "for table 'test_table' (ACL check error)") +} + +// TestAuthenticatedUserWithAccess verifies that an authenticated gRPC static user with ACL access can execute queries +func TestAuthenticatedUserWithAccess(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "user_with_access", "test_password") + require.NoError(t, err) + defer vtgateConn.Close() + + session := vtgateConn.Session(keyspaceName+"@primary", nil) + query := "SELECT id FROM test_table" + _, err = session.Execute(ctx, query, nil) + assert.NoError(t, err) +} + +// TestAuthenticatedUserNoAccess verifies that an authenticated gRPC static user with no ACL access cannot execute queries +func TestAuthenticatedUserNoAccess(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "user_no_access", "test_password") + require.NoError(t, err) + defer vtgateConn.Close() + + session := vtgateConn.Session(keyspaceName+"@primary", nil) + query := "SELECT id FROM test_table" + _, err = session.Execute(ctx, query, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "Select command denied to user") + assert.Contains(t, err.Error(), "for table 'test_table' (ACL check error)") +} + +// TestUnauthenticatedUser verifies that an unauthenticated gRPC user cannot execute queries +func TestUnauthenticatedUser(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "", "") + require.NoError(t, err) + defer vtgateConn.Close() + + session := vtgateConn.Session(keyspaceName+"@primary", nil) + query := "SELECT id FROM test_table" + _, err = session.Execute(ctx, query, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "invalid credentials") +} diff --git a/go/test/endtoend/vtgate/grpc_api/execute_test.go b/go/test/endtoend/vtgate/grpc_api/execute_test.go new file mode 100644 index 00000000000..b1a5f3b8d80 --- /dev/null +++ b/go/test/endtoend/vtgate/grpc_api/execute_test.go @@ -0,0 +1,132 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpc_api + +import ( + "context" + "fmt" + "io" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" + querypb "vitess.io/vitess/go/vt/proto/query" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + "vitess.io/vitess/go/vt/vtgate/vtgateconn" +) + +// TestTransactionsWithGRPCAPI test the transaction queries through vtgate grpc apis. +// It is done through both streaming api and non-streaming api. +func TestTransactionsWithGRPCAPI(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "user_with_access", "test_password") + require.NoError(t, err) + defer vtgateConn.Close() + + vtSession := vtgateConn.Session(keyspaceName, nil) + workload := []string{"OLTP", "OLAP"} + for i := 0; i < 4; i++ { // running all switch combinations. + index := i % len(workload) + _, session, err := exec(ctx, vtSession, fmt.Sprintf("set workload = %s", workload[index]), nil) + require.NoError(t, err) + + require.Equal(t, workload[index], session.Options.Workload.String()) + execTest(ctx, t, workload[index], vtSession) + } + +} + +func execTest(ctx context.Context, t *testing.T, workload string, vtSession *vtgateconn.VTGateSession) { + tcases := []struct { + query string + + expRowCount int + expRowAffected int + expInTransaction bool + }{{ + query: "select id, val from test_table", + }, { + query: "begin", + expInTransaction: true, + }, { + query: "insert into test_table(id, val) values (1, 'A')", + expRowAffected: 1, + expInTransaction: true, + }, { + query: "select id, val from test_table", + expRowCount: 1, + expInTransaction: true, + }, { + query: "commit", + }, { + query: "select id, val from test_table", + expRowCount: 1, + }, { + query: "delete from test_table", + expRowAffected: 1, + }} + + for _, tc := range tcases { + t.Run(workload+":"+tc.query, func(t *testing.T) { + qr, session, err := exec(ctx, vtSession, tc.query, nil) + require.NoError(t, err) + + assert.Len(t, qr.Rows, tc.expRowCount) + assert.EqualValues(t, tc.expRowAffected, qr.RowsAffected) + assert.EqualValues(t, tc.expInTransaction, session.InTransaction) + }) + } +} + +func exec(ctx context.Context, conn *vtgateconn.VTGateSession, sql string, bv map[string]*querypb.BindVariable) (*sqltypes.Result, *vtgatepb.Session, error) { + options := conn.SessionPb().GetOptions() + if options != nil && options.Workload == querypb.ExecuteOptions_OLAP { + return streamExec(ctx, conn, sql, bv) + } + res, err := conn.Execute(ctx, sql, bv) + return res, conn.SessionPb(), err +} + +func streamExec(ctx context.Context, conn *vtgateconn.VTGateSession, sql string, bv map[string]*querypb.BindVariable) (*sqltypes.Result, *vtgatepb.Session, error) { + stream, err := conn.StreamExecute(ctx, sql, bv) + if err != nil { + return nil, conn.SessionPb(), err + } + result := &sqltypes.Result{} + for { + res, err := stream.Recv() + if err != nil { + if err == io.EOF { + return result, conn.SessionPb(), nil + } + return nil, conn.SessionPb(), err + } + result.Rows = append(result.Rows, res.Rows...) + result.RowsAffected += res.RowsAffected + if res.InsertID != 0 { + result.InsertID = res.InsertID + } + if res.Fields != nil { + result.Fields = res.Fields + } + } +} diff --git a/go/test/endtoend/vtgate/grpc_server_acls/acls_test.go b/go/test/endtoend/vtgate/grpc_api/main_test.go similarity index 60% rename from go/test/endtoend/vtgate/grpc_server_acls/acls_test.go rename to go/test/endtoend/vtgate/grpc_api/main_test.go index 5344a2c847e..a51c6d9e6f2 100644 --- a/go/test/endtoend/vtgate/grpc_server_acls/acls_test.go +++ b/go/test/endtoend/vtgate/grpc_api/main_test.go @@ -14,26 +14,16 @@ See the License for the specific language governing permissions and limitations under the License. */ -package grpc_server_acls +package grpc_api import ( - "context" "flag" "fmt" "os" "path" "testing" - "vitess.io/vitess/go/vt/callerid" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "google.golang.org/grpc" - "vitess.io/vitess/go/test/endtoend/cluster" - "vitess.io/vitess/go/vt/grpcclient" - "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn" - "vitess.io/vitess/go/vt/vtgate/vtgateconn" ) var ( @@ -58,6 +48,14 @@ var ( { "Username": "another_unrelated_user", "Password": "test_password" + }, + { + "Username": "user_with_access", + "Password": "test_password" + }, + { + "Username": "user_no_access", + "Password": "test_password" } ] ` @@ -77,7 +75,6 @@ var ( ) func TestMain(m *testing.M) { - defer cluster.PanicHandler(nil) flag.Parse() @@ -144,53 +141,6 @@ func TestMain(m *testing.M) { os.Exit(exitcode) } -// TestEffectiveCallerIDWithAccess verifies that an authenticated gRPC static user with an effectiveCallerID that has ACL access can execute queries -func TestEffectiveCallerIDWithAccess(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - vtgateConn, err := dialVTGate(ctx, t, "some_other_user", "test_password") - if err != nil { - t.Fatal(err) - } - defer vtgateConn.Close() - - session := vtgateConn.Session(keyspaceName+"@primary", nil) - query := "SELECT id FROM test_table" - ctx = callerid.NewContext(ctx, callerid.NewEffectiveCallerID("user_with_access", "", ""), nil) - _, err = session.Execute(ctx, query, nil) - assert.NoError(t, err) -} - -// TestEffectiveCallerIDWithNoAccess verifies that an authenticated gRPC static user without an effectiveCallerID that has ACL access cannot execute queries -func TestEffectiveCallerIDWithNoAccess(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - vtgateConn, err := dialVTGate(ctx, t, "another_unrelated_user", "test_password") - if err != nil { - t.Fatal(err) - } - defer vtgateConn.Close() - - session := vtgateConn.Session(keyspaceName+"@primary", nil) - query := "SELECT id FROM test_table" - ctx = callerid.NewContext(ctx, callerid.NewEffectiveCallerID("user_no_access", "", ""), nil) - _, err = session.Execute(ctx, query, nil) - require.Error(t, err) - assert.Contains(t, err.Error(), "Select command denied to user") - assert.Contains(t, err.Error(), "for table 'test_table' (ACL check error)") -} - -func dialVTGate(ctx context.Context, t *testing.T, username string, password string) (*vtgateconn.VTGateConn, error) { - clientCreds := &grpcclient.StaticAuthClientCreds{Username: username, Password: password} - creds := grpc.WithPerRPCCredentials(clientCreds) - dialerFunc := grpcvtgateconn.Dial(creds) - dialerName := t.Name() - vtgateconn.RegisterDialer(dialerName, dialerFunc) - return vtgateconn.DialProtocol(ctx, dialerName, vtgateGrpcAddress) -} - func createFile(path string, contents string) error { f, err := os.Create(path) if err != nil { diff --git a/go/test/endtoend/vtgate/grpc_server_auth_static/main_test.go b/go/test/endtoend/vtgate/grpc_server_auth_static/main_test.go deleted file mode 100644 index 348401ed85d..00000000000 --- a/go/test/endtoend/vtgate/grpc_server_auth_static/main_test.go +++ /dev/null @@ -1,216 +0,0 @@ -/* -Copyright 2023 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package grpcserverauthstatic - -import ( - "context" - "flag" - "fmt" - "os" - "path" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "google.golang.org/grpc" - - "vitess.io/vitess/go/test/endtoend/cluster" - "vitess.io/vitess/go/vt/grpcclient" - "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn" - "vitess.io/vitess/go/vt/vtgate/vtgateconn" -) - -var ( - clusterInstance *cluster.LocalProcessCluster - vtgateGrpcAddress string - hostname = "localhost" - keyspaceName = "ks" - cell = "zone1" - sqlSchema = ` - create table test_table ( - id bigint, - val varchar(128), - primary key(id) - ) Engine=InnoDB; -` - grpcServerAuthStaticJSON = ` - [ - { - "Username": "user_with_access", - "Password": "test_password" - }, - { - "Username": "user_no_access", - "Password": "test_password" - } - ] -` - tableACLJSON = ` - { - "table_groups": [ - { - "name": "default", - "table_names_or_prefixes": ["%"], - "readers": ["user_with_access"], - "writers": ["user_with_access"], - "admins": ["user_with_access"] - } - ] - } -` -) - -func TestMain(m *testing.M) { - defer cluster.PanicHandler(nil) - flag.Parse() - - exitcode := func() int { - clusterInstance = cluster.NewCluster(cell, hostname) - defer clusterInstance.Teardown() - - // Start topo server - if err := clusterInstance.StartTopo(); err != nil { - return 1 - } - - // Directory for authn / authz config files - authDirectory := path.Join(clusterInstance.TmpDirectory, "auth") - if err := os.Mkdir(authDirectory, 0700); err != nil { - return 1 - } - - // Create grpc_server_auth_static.json file - grpcServerAuthStaticPath := path.Join(authDirectory, "grpc_server_auth_static.json") - if err := createFile(grpcServerAuthStaticPath, grpcServerAuthStaticJSON); err != nil { - return 1 - } - - // Create table_acl.json file - tableACLPath := path.Join(authDirectory, "table_acl.json") - if err := createFile(tableACLPath, tableACLJSON); err != nil { - return 1 - } - - // Configure vtgate to use static auth - clusterInstance.VtGateExtraArgs = []string{ - "--grpc_auth_mode", "static", - "--grpc_auth_static_password_file", grpcServerAuthStaticPath, - "--grpc-use-static-authentication-callerid", - } - - // Configure vttablet to use table ACL - clusterInstance.VtTabletExtraArgs = []string{ - "--enforce-tableacl-config", - "--queryserver-config-strict-table-acl", - "--table-acl-config", tableACLPath, - } - - // Start keyspace - keyspace := &cluster.Keyspace{ - Name: keyspaceName, - SchemaSQL: sqlSchema, - } - if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false); err != nil { - return 1 - } - - // Start vtgate - if err := clusterInstance.StartVtgate(); err != nil { - clusterInstance.VtgateProcess = cluster.VtgateProcess{} - return 1 - } - vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort) - - return m.Run() - }() - os.Exit(exitcode) -} - -// TestAuthenticatedUserWithAccess verifies that an authenticated gRPC static user with ACL access can execute queries -func TestAuthenticatedUserWithAccess(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - vtgateConn, err := dialVTGate(ctx, t, "user_with_access", "test_password") - if err != nil { - t.Fatal(err) - } - defer vtgateConn.Close() - - session := vtgateConn.Session(keyspaceName+"@primary", nil) - query := "SELECT id FROM test_table" - _, err = session.Execute(ctx, query, nil) - assert.NoError(t, err) -} - -// TestAuthenticatedUserNoAccess verifies that an authenticated gRPC static user with no ACL access cannot execute queries -func TestAuthenticatedUserNoAccess(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - vtgateConn, err := dialVTGate(ctx, t, "user_no_access", "test_password") - if err != nil { - t.Fatal(err) - } - defer vtgateConn.Close() - - session := vtgateConn.Session(keyspaceName+"@primary", nil) - query := "SELECT id FROM test_table" - _, err = session.Execute(ctx, query, nil) - require.Error(t, err) - assert.Contains(t, err.Error(), "Select command denied to user") - assert.Contains(t, err.Error(), "for table 'test_table' (ACL check error)") -} - -// TestUnauthenticatedUser verifies that an unauthenticated gRPC user cannot execute queries -func TestUnauthenticatedUser(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - vtgateConn, err := dialVTGate(ctx, t, "", "") - if err != nil { - t.Fatal(err) - } - defer vtgateConn.Close() - - session := vtgateConn.Session(keyspaceName+"@primary", nil) - query := "SELECT id FROM test_table" - _, err = session.Execute(ctx, query, nil) - require.Error(t, err) - assert.Contains(t, err.Error(), "invalid credentials") -} - -func dialVTGate(ctx context.Context, t *testing.T, username string, password string) (*vtgateconn.VTGateConn, error) { - clientCreds := &grpcclient.StaticAuthClientCreds{Username: username, Password: password} - creds := grpc.WithPerRPCCredentials(clientCreds) - dialerFunc := grpcvtgateconn.Dial(creds) - dialerName := t.Name() - vtgateconn.RegisterDialer(dialerName, dialerFunc) - return vtgateconn.DialProtocol(ctx, dialerName, vtgateGrpcAddress) -} - -func createFile(path string, contents string) error { - f, err := os.Create(path) - if err != nil { - return err - } - _, err = f.WriteString(contents) - if err != nil { - return err - } - return f.Close() -} diff --git a/go/vt/proto/vtgate/vtgate.pb.go b/go/vt/proto/vtgate/vtgate.pb.go index 6880318d244..4ddb1a415c3 100644 --- a/go/vt/proto/vtgate/vtgate.pb.go +++ b/go/vt/proto/vtgate/vtgate.pb.go @@ -909,6 +909,8 @@ type StreamExecuteResponse struct { // The first value contains only Fields information. // The next values contain the actual rows, a few values per result. Result *query.QueryResult `protobuf:"bytes,1,opt,name=result,proto3" json:"result,omitempty"` + // session is the updated session information. + Session *Session `protobuf:"bytes,2,opt,name=session,proto3" json:"session,omitempty"` } func (x *StreamExecuteResponse) Reset() { @@ -950,6 +952,13 @@ func (x *StreamExecuteResponse) GetResult() *query.QueryResult { return nil } +func (x *StreamExecuteResponse) GetSession() *Session { + if x != nil { + return x.Session + } + return nil +} + // ResolveTransactionRequest is the payload to ResolveTransaction. type ResolveTransactionRequest struct { state protoimpl.MessageState @@ -1792,94 +1801,97 @@ var file_vtgate_proto_rawDesc = []byte{ 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x4a, 0x04, 0x08, 0x03, 0x10, 0x04, 0x4a, 0x04, 0x08, 0x04, 0x10, 0x05, 0x4a, 0x04, 0x08, 0x05, 0x10, 0x06, - 0x22, 0x43, 0x0a, 0x15, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, + 0x22, 0x6e, 0x0a, 0x15, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06, 0x72, - 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x5d, 0x0a, 0x19, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, - 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, - 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, - 0x12, 0x12, 0x0a, 0x04, 0x64, 0x74, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, - 0x64, 0x74, 0x69, 0x64, 0x22, 0x1c, 0x0a, 0x1a, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x54, - 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0xec, 0x01, 0x0a, 0x0c, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x46, 0x6c, - 0x61, 0x67, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x6d, 0x69, 0x6e, 0x69, 0x6d, 0x69, 0x7a, 0x65, 0x5f, - 0x73, 0x6b, 0x65, 0x77, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x6d, 0x69, 0x6e, 0x69, - 0x6d, 0x69, 0x7a, 0x65, 0x53, 0x6b, 0x65, 0x77, 0x12, 0x2d, 0x0a, 0x12, 0x68, 0x65, 0x61, 0x72, - 0x74, 0x62, 0x65, 0x61, 0x74, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0d, 0x52, 0x11, 0x68, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x49, - 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x12, 0x26, 0x0a, 0x0f, 0x73, 0x74, 0x6f, 0x70, 0x5f, - 0x6f, 0x6e, 0x5f, 0x72, 0x65, 0x73, 0x68, 0x61, 0x72, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, - 0x52, 0x0d, 0x73, 0x74, 0x6f, 0x70, 0x4f, 0x6e, 0x52, 0x65, 0x73, 0x68, 0x61, 0x72, 0x64, 0x12, - 0x14, 0x0a, 0x05, 0x63, 0x65, 0x6c, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, - 0x63, 0x65, 0x6c, 0x6c, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x63, 0x65, 0x6c, 0x6c, 0x5f, 0x70, 0x72, - 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, - 0x63, 0x65, 0x6c, 0x6c, 0x50, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x12, 0x21, - 0x0a, 0x0c, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x5f, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x18, 0x06, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x4f, 0x72, 0x64, 0x65, - 0x72, 0x22, 0xf6, 0x01, 0x0a, 0x0e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, - 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, - 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, - 0x49, 0x64, 0x12, 0x35, 0x0a, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x5f, 0x74, 0x79, 0x70, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x74, 0x6f, 0x70, 0x6f, 0x64, 0x61, - 0x74, 0x61, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0a, 0x74, - 0x61, 0x62, 0x6c, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x27, 0x0a, 0x05, 0x76, 0x67, 0x74, - 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, - 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x47, 0x74, 0x69, 0x64, 0x52, 0x05, 0x76, 0x67, 0x74, - 0x69, 0x64, 0x12, 0x2a, 0x0a, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, - 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x52, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x2a, - 0x0a, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, - 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x46, 0x6c, - 0x61, 0x67, 0x73, 0x52, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x22, 0x3d, 0x0a, 0x0f, 0x56, 0x53, - 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, - 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, - 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x45, 0x76, 0x65, 0x6e, - 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x92, 0x01, 0x0a, 0x0e, 0x50, 0x72, - 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, + 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, + 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, + 0x22, 0x5d, 0x0a, 0x19, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, + 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, + 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, + 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x64, + 0x74, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x74, 0x69, 0x64, 0x22, + 0x1c, 0x0a, 0x1a, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xec, 0x01, + 0x0a, 0x0c, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x46, 0x6c, 0x61, 0x67, 0x73, 0x12, 0x23, + 0x0a, 0x0d, 0x6d, 0x69, 0x6e, 0x69, 0x6d, 0x69, 0x7a, 0x65, 0x5f, 0x73, 0x6b, 0x65, 0x77, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x6d, 0x69, 0x6e, 0x69, 0x6d, 0x69, 0x7a, 0x65, 0x53, + 0x6b, 0x65, 0x77, 0x12, 0x2d, 0x0a, 0x12, 0x68, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, + 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, + 0x11, 0x68, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, + 0x61, 0x6c, 0x12, 0x26, 0x0a, 0x0f, 0x73, 0x74, 0x6f, 0x70, 0x5f, 0x6f, 0x6e, 0x5f, 0x72, 0x65, + 0x73, 0x68, 0x61, 0x72, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0d, 0x73, 0x74, 0x6f, + 0x70, 0x4f, 0x6e, 0x52, 0x65, 0x73, 0x68, 0x61, 0x72, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x65, + 0x6c, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x63, 0x65, 0x6c, 0x6c, 0x73, + 0x12, 0x27, 0x0a, 0x0f, 0x63, 0x65, 0x6c, 0x6c, 0x5f, 0x70, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, + 0x6e, 0x63, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x63, 0x65, 0x6c, 0x6c, 0x50, + 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x62, + 0x6c, 0x65, 0x74, 0x5f, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x22, 0xf6, 0x01, 0x0a, + 0x0e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, + 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x35, 0x0a, + 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x74, 0x6f, 0x70, 0x6f, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x54, 0x61, + 0x62, 0x6c, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0a, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, + 0x54, 0x79, 0x70, 0x65, 0x12, 0x27, 0x0a, 0x05, 0x76, 0x67, 0x74, 0x69, 0x64, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, + 0x2e, 0x56, 0x47, 0x74, 0x69, 0x64, 0x52, 0x05, 0x76, 0x67, 0x74, 0x69, 0x64, 0x12, 0x2a, 0x0a, + 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, + 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x46, 0x69, 0x6c, 0x74, 0x65, + 0x72, 0x52, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x2a, 0x0a, 0x05, 0x66, 0x6c, 0x61, + 0x67, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, + 0x65, 0x2e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x46, 0x6c, 0x61, 0x67, 0x73, 0x52, 0x05, + 0x66, 0x6c, 0x61, 0x67, 0x73, 0x22, 0x3d, 0x0a, 0x0f, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x06, 0x65, 0x76, 0x65, 0x6e, + 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, + 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x06, 0x65, 0x76, + 0x65, 0x6e, 0x74, 0x73, 0x22, 0x92, 0x01, 0x0a, 0x0e, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, + 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, + 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, + 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, + 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, + 0x12, 0x27, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x11, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x42, 0x6f, 0x75, 0x6e, 0x64, 0x51, 0x75, 0x65, + 0x72, 0x79, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x22, 0x89, 0x01, 0x0a, 0x0f, 0x50, 0x72, + 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, + 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, + 0x74, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x50, 0x43, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, + 0x72, 0x72, 0x6f, 0x72, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, + 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, + 0x24, 0x0a, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x0c, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x06, 0x66, + 0x69, 0x65, 0x6c, 0x64, 0x73, 0x22, 0x6e, 0x0a, 0x13, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, + 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, - 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x27, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x42, 0x6f, 0x75, - 0x6e, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x22, 0x89, - 0x01, 0x0a, 0x0f, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x50, 0x43, 0x45, 0x72, 0x72, - 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, - 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, - 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, - 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x24, 0x0a, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x18, 0x03, - 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x46, 0x69, 0x65, - 0x6c, 0x64, 0x52, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x22, 0x6e, 0x0a, 0x13, 0x43, 0x6c, - 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, - 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, - 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, - 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x3d, 0x0a, 0x14, 0x43, 0x6c, - 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x50, 0x43, 0x45, 0x72, 0x72, - 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x2a, 0x44, 0x0a, 0x0f, 0x54, 0x72, 0x61, - 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x0f, 0x0a, 0x0b, - 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0a, 0x0a, - 0x06, 0x53, 0x49, 0x4e, 0x47, 0x4c, 0x45, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x4d, 0x55, 0x4c, - 0x54, 0x49, 0x10, 0x02, 0x12, 0x09, 0x0a, 0x05, 0x54, 0x57, 0x4f, 0x50, 0x43, 0x10, 0x03, 0x2a, - 0x3c, 0x0a, 0x0b, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x0a, - 0x0a, 0x06, 0x4e, 0x4f, 0x52, 0x4d, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x50, 0x52, - 0x45, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x50, 0x4f, 0x53, 0x54, 0x10, 0x02, 0x12, 0x0e, 0x0a, - 0x0a, 0x41, 0x55, 0x54, 0x4f, 0x43, 0x4f, 0x4d, 0x4d, 0x49, 0x54, 0x10, 0x03, 0x42, 0x36, 0x0a, - 0x0f, 0x69, 0x6f, 0x2e, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x5a, 0x23, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, - 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, - 0x74, 0x67, 0x61, 0x74, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x3d, 0x0a, 0x14, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, + 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, + 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, + 0x74, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x50, 0x43, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, + 0x72, 0x72, 0x6f, 0x72, 0x2a, 0x44, 0x0a, 0x0f, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, + 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x49, 0x4e, 0x47, + 0x4c, 0x45, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x10, 0x02, 0x12, + 0x09, 0x0a, 0x05, 0x54, 0x57, 0x4f, 0x50, 0x43, 0x10, 0x03, 0x2a, 0x3c, 0x0a, 0x0b, 0x43, 0x6f, + 0x6d, 0x6d, 0x69, 0x74, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x0a, 0x0a, 0x06, 0x4e, 0x4f, 0x52, + 0x4d, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x50, 0x52, 0x45, 0x10, 0x01, 0x12, 0x08, + 0x0a, 0x04, 0x50, 0x4f, 0x53, 0x54, 0x10, 0x02, 0x12, 0x0e, 0x0a, 0x0a, 0x41, 0x55, 0x54, 0x4f, + 0x43, 0x4f, 0x4d, 0x4d, 0x49, 0x54, 0x10, 0x03, 0x42, 0x36, 0x0a, 0x0f, 0x69, 0x6f, 0x2e, 0x76, + 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x23, 0x76, 0x69, 0x74, + 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, + 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1967,31 +1979,32 @@ var file_vtgate_proto_depIdxs = []int32{ 28, // 25: vtgate.StreamExecuteRequest.query:type_name -> query.BoundQuery 2, // 26: vtgate.StreamExecuteRequest.session:type_name -> vtgate.Session 30, // 27: vtgate.StreamExecuteResponse.result:type_name -> query.QueryResult - 27, // 28: vtgate.ResolveTransactionRequest.caller_id:type_name -> vtrpc.CallerID - 27, // 29: vtgate.VStreamRequest.caller_id:type_name -> vtrpc.CallerID - 32, // 30: vtgate.VStreamRequest.tablet_type:type_name -> topodata.TabletType - 33, // 31: vtgate.VStreamRequest.vgtid:type_name -> binlogdata.VGtid - 34, // 32: vtgate.VStreamRequest.filter:type_name -> binlogdata.Filter - 13, // 33: vtgate.VStreamRequest.flags:type_name -> vtgate.VStreamFlags - 35, // 34: vtgate.VStreamResponse.events:type_name -> binlogdata.VEvent - 27, // 35: vtgate.PrepareRequest.caller_id:type_name -> vtrpc.CallerID - 2, // 36: vtgate.PrepareRequest.session:type_name -> vtgate.Session - 28, // 37: vtgate.PrepareRequest.query:type_name -> query.BoundQuery - 29, // 38: vtgate.PrepareResponse.error:type_name -> vtrpc.RPCError - 2, // 39: vtgate.PrepareResponse.session:type_name -> vtgate.Session - 36, // 40: vtgate.PrepareResponse.fields:type_name -> query.Field - 27, // 41: vtgate.CloseSessionRequest.caller_id:type_name -> vtrpc.CallerID - 2, // 42: vtgate.CloseSessionRequest.session:type_name -> vtgate.Session - 29, // 43: vtgate.CloseSessionResponse.error:type_name -> vtrpc.RPCError - 37, // 44: vtgate.Session.ShardSession.target:type_name -> query.Target - 38, // 45: vtgate.Session.ShardSession.tablet_alias:type_name -> topodata.TabletAlias - 39, // 46: vtgate.Session.UserDefinedVariablesEntry.value:type_name -> query.BindVariable - 3, // 47: vtgate.Session.PrepareStatementEntry.value:type_name -> vtgate.PrepareData - 48, // [48:48] is the sub-list for method output_type - 48, // [48:48] is the sub-list for method input_type - 48, // [48:48] is the sub-list for extension type_name - 48, // [48:48] is the sub-list for extension extendee - 0, // [0:48] is the sub-list for field type_name + 2, // 28: vtgate.StreamExecuteResponse.session:type_name -> vtgate.Session + 27, // 29: vtgate.ResolveTransactionRequest.caller_id:type_name -> vtrpc.CallerID + 27, // 30: vtgate.VStreamRequest.caller_id:type_name -> vtrpc.CallerID + 32, // 31: vtgate.VStreamRequest.tablet_type:type_name -> topodata.TabletType + 33, // 32: vtgate.VStreamRequest.vgtid:type_name -> binlogdata.VGtid + 34, // 33: vtgate.VStreamRequest.filter:type_name -> binlogdata.Filter + 13, // 34: vtgate.VStreamRequest.flags:type_name -> vtgate.VStreamFlags + 35, // 35: vtgate.VStreamResponse.events:type_name -> binlogdata.VEvent + 27, // 36: vtgate.PrepareRequest.caller_id:type_name -> vtrpc.CallerID + 2, // 37: vtgate.PrepareRequest.session:type_name -> vtgate.Session + 28, // 38: vtgate.PrepareRequest.query:type_name -> query.BoundQuery + 29, // 39: vtgate.PrepareResponse.error:type_name -> vtrpc.RPCError + 2, // 40: vtgate.PrepareResponse.session:type_name -> vtgate.Session + 36, // 41: vtgate.PrepareResponse.fields:type_name -> query.Field + 27, // 42: vtgate.CloseSessionRequest.caller_id:type_name -> vtrpc.CallerID + 2, // 43: vtgate.CloseSessionRequest.session:type_name -> vtgate.Session + 29, // 44: vtgate.CloseSessionResponse.error:type_name -> vtrpc.RPCError + 37, // 45: vtgate.Session.ShardSession.target:type_name -> query.Target + 38, // 46: vtgate.Session.ShardSession.tablet_alias:type_name -> topodata.TabletAlias + 39, // 47: vtgate.Session.UserDefinedVariablesEntry.value:type_name -> query.BindVariable + 3, // 48: vtgate.Session.PrepareStatementEntry.value:type_name -> vtgate.PrepareData + 49, // [49:49] is the sub-list for method output_type + 49, // [49:49] is the sub-list for method input_type + 49, // [49:49] is the sub-list for extension type_name + 49, // [49:49] is the sub-list for extension extendee + 0, // [0:49] is the sub-list for field type_name } func init() { file_vtgate_proto_init() } diff --git a/go/vt/proto/vtgate/vtgate_vtproto.pb.go b/go/vt/proto/vtgate/vtgate_vtproto.pb.go index b2fc97ca831..efc50b1600f 100644 --- a/go/vt/proto/vtgate/vtgate_vtproto.pb.go +++ b/go/vt/proto/vtgate/vtgate_vtproto.pb.go @@ -860,6 +860,16 @@ func (m *StreamExecuteResponse) MarshalToSizedBufferVT(dAtA []byte) (int, error) i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Session != nil { + size, err := m.Session.MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0x12 + } if m.Result != nil { size, err := m.Result.MarshalToSizedBufferVT(dAtA[:i]) if err != nil { @@ -1717,6 +1727,10 @@ func (m *StreamExecuteResponse) SizeVT() (n int) { l = m.Result.SizeVT() n += 1 + l + sov(uint64(l)) } + if m.Session != nil { + l = m.Session.SizeVT() + n += 1 + l + sov(uint64(l)) + } n += len(m.unknownFields) return n } @@ -4277,6 +4291,42 @@ func (m *StreamExecuteResponse) UnmarshalVT(dAtA []byte) error { return err } iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Session", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Session == nil { + m.Session = &Session{} + } + if err := m.Session.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skip(dAtA[iNdEx:]) diff --git a/go/vt/vitessdriver/driver_test.go b/go/vt/vitessdriver/driver_test.go index 5438aa0c75d..b1bdd2a833f 100644 --- a/go/vt/vitessdriver/driver_test.go +++ b/go/vt/vitessdriver/driver_test.go @@ -725,3 +725,41 @@ func TestSessionToken(t *testing.T) { t.Fatal(err) } } + +// TestStreamExec tests that different kinds of query present in `execMap` can run through streaming api +func TestStreamExec(t *testing.T) { + db, err := OpenForStreaming(testAddress, "@rdonly") + require.NoError(t, err) + defer db.Close() + + for k, v := range execMap { + t.Run(k, func(t *testing.T) { + s, err := db.Prepare(k) + require.NoError(t, err) + defer s.Close() + + r, err := s.Query(0) + require.NoError(t, err) + defer r.Close() + + fields, err := r.Columns() + require.NoError(t, err) + require.Equal(t, colList(v.result.Fields), fields) + + for r.Next() { + require.NoError(t, r.Err()) + } + }) + } +} + +func colList(fields []*querypb.Field) []string { + if fields == nil { + return nil + } + cols := make([]string, 0, len(fields)) + for _, field := range fields { + cols = append(cols, field.Name) + } + return cols +} diff --git a/go/vt/vitessdriver/fakeserver_test.go b/go/vt/vitessdriver/fakeserver_test.go index eefa2abd285..9b66b705e61 100644 --- a/go/vt/vitessdriver/fakeserver_test.go +++ b/go/vt/vitessdriver/fakeserver_test.go @@ -100,10 +100,10 @@ func (f *fakeVTGateService) ExecuteBatch(ctx context.Context, session *vtgatepb. } // StreamExecute is part of the VTGateService interface -func (f *fakeVTGateService) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error { +func (f *fakeVTGateService) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) (*vtgatepb.Session, error) { execCase, ok := execMap[sql] if !ok { - return fmt.Errorf("no match for: %s", sql) + return session, fmt.Errorf("no match for: %s", sql) } query := &queryExecute{ SQL: sql, @@ -111,25 +111,25 @@ func (f *fakeVTGateService) StreamExecute(ctx context.Context, session *vtgatepb Session: session, } if !query.Equal(execCase.execQuery) { - return fmt.Errorf("request mismatch: got %+v, want %+v", query, execCase.execQuery) + return session, fmt.Errorf("request mismatch: got %+v, want %+v", query, execCase.execQuery) } if execCase.result != nil { result := &sqltypes.Result{ Fields: execCase.result.Fields, } if err := callback(result); err != nil { - return err + return execCase.session, err } for _, row := range execCase.result.Rows { result := &sqltypes.Result{ Rows: [][]sqltypes.Value{row}, } if err := callback(result); err != nil { - return err + return execCase.session, err } } } - return nil + return execCase.session, nil } // Prepare is part of the VTGateService interface diff --git a/go/vt/vtgate/fakerpcvtgateconn/conn.go b/go/vt/vtgate/fakerpcvtgateconn/conn.go index fb711820912..442c8997979 100644 --- a/go/vt/vtgate/fakerpcvtgateconn/conn.go +++ b/go/vt/vtgate/fakerpcvtgateconn/conn.go @@ -109,7 +109,7 @@ func (conn *FakeVTGateConn) ExecuteBatch(ctx context.Context, session *vtgatepb. } // StreamExecute please see vtgateconn.Impl.StreamExecute -func (conn *FakeVTGateConn) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVars map[string]*querypb.BindVariable) (sqltypes.ResultStream, error) { +func (conn *FakeVTGateConn) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVars map[string]*querypb.BindVariable, _ func(response *vtgatepb.StreamExecuteResponse)) (sqltypes.ResultStream, error) { response, ok := conn.execMap[sql] if !ok { return nil, fmt.Errorf("no match for: %s", sql) diff --git a/go/vt/vtgate/grpcvtgateconn/conn.go b/go/vt/vtgate/grpcvtgateconn/conn.go index 4988f44b392..a681e3661cd 100644 --- a/go/vt/vtgate/grpcvtgateconn/conn.go +++ b/go/vt/vtgate/grpcvtgateconn/conn.go @@ -156,7 +156,16 @@ type streamExecuteAdapter struct { } func (a *streamExecuteAdapter) Recv() (*sqltypes.Result, error) { - qr, err := a.recv() + var qr *querypb.QueryResult + var err error + for { + qr, err = a.recv() + if qr != nil || err != nil { + break + } + // we reach here, only when it is the last packet. + // as in the last packet we receive the session and there is no result + } if err != nil { return nil, vterrors.FromGRPC(err) } @@ -166,7 +175,7 @@ func (a *streamExecuteAdapter) Recv() (*sqltypes.Result, error) { return sqltypes.CustomProto3ToResult(a.fields, qr), nil } -func (conn *vtgateConn) StreamExecute(ctx context.Context, session *vtgatepb.Session, query string, bindVars map[string]*querypb.BindVariable) (sqltypes.ResultStream, error) { +func (conn *vtgateConn) StreamExecute(ctx context.Context, session *vtgatepb.Session, query string, bindVars map[string]*querypb.BindVariable, processResponse func(response *vtgatepb.StreamExecuteResponse)) (sqltypes.ResultStream, error) { req := &vtgatepb.StreamExecuteRequest{ CallerId: callerid.EffectiveCallerIDFromContext(ctx), Query: &querypb.BoundQuery{ @@ -185,6 +194,7 @@ func (conn *vtgateConn) StreamExecute(ctx context.Context, session *vtgatepb.Ses if err != nil { return nil, err } + processResponse(ser) return ser.Result, nil }, }, nil diff --git a/go/vt/vtgate/grpcvtgateconn/suite_test.go b/go/vt/vtgate/grpcvtgateconn/suite_test.go index 02d73dc0a81..b679a4b3ba5 100644 --- a/go/vt/vtgate/grpcvtgateconn/suite_test.go +++ b/go/vt/vtgate/grpcvtgateconn/suite_test.go @@ -156,13 +156,13 @@ func (f *fakeVTGateService) ExecuteBatch(ctx context.Context, session *vtgatepb. } // StreamExecute is part of the VTGateService interface -func (f *fakeVTGateService) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error { +func (f *fakeVTGateService) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) (*vtgatepb.Session, error) { if f.panics { panic(fmt.Errorf("test forced panic")) } execCase, ok := execMap[sql] if !ok { - return fmt.Errorf("no match for: %s", sql) + return session, fmt.Errorf("no match for: %s", sql) } f.checkCallerID(ctx, "StreamExecute") query := &queryExecute{ @@ -172,32 +172,32 @@ func (f *fakeVTGateService) StreamExecute(ctx context.Context, session *vtgatepb } if !query.equal(execCase.execQuery) { f.t.Errorf("StreamExecute:\n%+v, want\n%+v", query, execCase.execQuery) - return nil + return session, nil } if execCase.result != nil { result := &sqltypes.Result{ Fields: execCase.result.Fields, } if err := callback(result); err != nil { - return err + return execCase.outSession, err } if f.hasError { // wait until the client has the response, since all streaming implementation may not // send previous messages if an error has been triggered. <-f.errorWait f.errorWait = make(chan struct{}) // for next test - return errTestVtGateError + return execCase.outSession, errTestVtGateError } for _, row := range execCase.result.Rows { result := &sqltypes.Result{ Rows: [][]sqltypes.Value{row}, } if err := callback(result); err != nil { - return err + return execCase.outSession, err } } } - return nil + return execCase.outSession, nil } // Prepare is part of the VTGateService interface diff --git a/go/vt/vtgate/grpcvtgateservice/server.go b/go/vt/vtgate/grpcvtgateservice/server.go index 0ebe829ac4d..7b87b6ed708 100644 --- a/go/vt/vtgate/grpcvtgateservice/server.go +++ b/go/vt/vtgate/grpcvtgateservice/server.go @@ -28,17 +28,16 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/callinfo" - "vitess.io/vitess/go/vt/servenv" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vtgate" - "vitess.io/vitess/go/vt/vtgate/vtgateservice" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtgateservicepb "vitess.io/vitess/go/vt/proto/vtgateservice" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate" + "vitess.io/vitess/go/vt/vtgate/vtgateservice" ) const ( @@ -185,14 +184,28 @@ func (vtg *VTGate) StreamExecute(request *vtgatepb.StreamExecuteRequest, stream session = &vtgatepb.Session{Autocommit: true} } - vtgErr := vtg.server.StreamExecute(ctx, session, request.Query.Sql, request.Query.BindVariables, func(value *sqltypes.Result) error { + session, vtgErr := vtg.server.StreamExecute(ctx, session, request.Query.Sql, request.Query.BindVariables, func(value *sqltypes.Result) error { // Send is not safe to call concurrently, but vtgate // guarantees that it's not. return stream.Send(&vtgatepb.StreamExecuteResponse{ Result: sqltypes.ResultToProto3(value), }) }) - return vterrors.ToGRPC(vtgErr) + + // even if there is an error, session could have been modified. + // So, this needs to be sent back to the client. Session is sent in the last stream response. + lastErr := stream.Send(&vtgatepb.StreamExecuteResponse{ + Session: session, + }) + + var errs []error + if vtgErr != nil { + errs = append(errs, vtgErr) + } + if lastErr != nil { + errs = append(errs, lastErr) + } + return vterrors.ToGRPC(vterrors.Aggregate(errs)) } // Prepare is the RPC version of vtgateservice.VTGateService method diff --git a/go/vt/vtgate/plugin_mysql_server.go b/go/vt/vtgate/plugin_mysql_server.go index 1dbd9074485..c7d4c53785c 100644 --- a/go/vt/vtgate/plugin_mysql_server.go +++ b/go/vt/vtgate/plugin_mysql_server.go @@ -29,24 +29,21 @@ import ( "syscall" "time" + "github.com/google/uuid" "github.com/spf13/pflag" - "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/callinfo" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/servenv" - "vitess.io/vitess/go/vt/vttls" - - "github.com/google/uuid" - querypb "vitess.io/vitess/go/vt/proto/query" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttls" ) var ( @@ -237,8 +234,12 @@ func (vh *vtgateHandler) ComQuery(c *mysql.Conn, query string, callback func(*sq }() if session.Options.Workload == querypb.ExecuteOptions_OLAP { - err := vh.vtg.StreamExecute(ctx, session, query, make(map[string]*querypb.BindVariable), callback) - return mysql.NewSQLErrorFromError(err) + session, err := vh.vtg.StreamExecute(ctx, session, query, make(map[string]*querypb.BindVariable), callback) + if err != nil { + return mysql.NewSQLErrorFromError(err) + } + fillInTxStatusFlags(c, session) + return nil } session, result, err := vh.vtg.Execute(ctx, session, query, make(map[string]*querypb.BindVariable)) @@ -340,13 +341,16 @@ func (vh *vtgateHandler) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareDat }() if session.Options.Workload == querypb.ExecuteOptions_OLAP { - err := vh.vtg.StreamExecute(ctx, session, prepare.PrepareStmt, prepare.BindVars, callback) - return mysql.NewSQLErrorFromError(err) + _, err := vh.vtg.StreamExecute(ctx, session, prepare.PrepareStmt, prepare.BindVars, callback) + if err != nil { + return mysql.NewSQLErrorFromError(err) + } + fillInTxStatusFlags(c, session) + return nil } _, qr, err := vh.vtg.Execute(ctx, session, prepare.PrepareStmt, prepare.BindVars) if err != nil { - err = mysql.NewSQLErrorFromError(err) - return err + return mysql.NewSQLErrorFromError(err) } fillInTxStatusFlags(c, session) diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index e10471ef63f..02b4dc2ef26 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -502,18 +502,18 @@ func (vtg *VTGate) ExecuteBatch(ctx context.Context, session *vtgatepb.Session, // StreamExecute executes a streaming query. This is a V3 function. // Note we guarantee the callback will not be called concurrently // by multiple go routines. -func (vtg *VTGate) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error { +func (vtg *VTGate) StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) (*vtgatepb.Session, error) { // In this context, we don't care if we can't fully parse destination destKeyspace, destTabletType, _, _ := vtg.executor.ParseDestinationTarget(session.TargetString) statsKey := []string{"StreamExecute", destKeyspace, topoproto.TabletTypeLString(destTabletType)} defer vtg.timings.Record(statsKey, time.Now()) + safeSession := NewSafeSession(session) var err error if bvErr := sqltypes.ValidateBindVariables(bindVariables); bvErr != nil { err = vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "%v", bvErr) } else { - safeSession := NewSafeSession(session) err = vtg.executor.StreamExecute( ctx, "StreamExecute", @@ -533,9 +533,9 @@ func (vtg *VTGate) StreamExecute(ctx context.Context, session *vtgatepb.Session, "BindVariables": bindVariables, "Session": session, } - return recordAndAnnotateError(err, statsKey, query, vtg.logStreamExecute) + return safeSession.Session, recordAndAnnotateError(err, statsKey, query, vtg.logStreamExecute) } - return nil + return safeSession.Session, nil } // CloseSession closes the session, rolling back any implicit transactions. This has the diff --git a/go/vt/vtgate/vtgate_test.go b/go/vt/vtgate/vtgate_test.go index 9417edafb17..74fef53f8b3 100644 --- a/go/vt/vtgate/vtgate_test.go +++ b/go/vt/vtgate/vtgate_test.go @@ -286,7 +286,7 @@ func TestVTGateStreamExecute(t *testing.T) { hcVTGateTest.Reset() sbc := hcVTGateTest.AddTestTablet("aa", "1.1.1.1", 1001, ks, shard, topodatapb.TabletType_PRIMARY, true, 1, nil) var qrs []*sqltypes.Result - err := rpcVTGate.StreamExecute( + _, err := rpcVTGate.StreamExecute( context.Background(), &vtgatepb.Session{ TargetString: "@primary", @@ -343,7 +343,8 @@ func TestVTGateBindVarError(t *testing.T) { }, { name: "StreamExecute", f: func() error { - return rpcVTGate.StreamExecute(ctx, session, "", bindVars, func(_ *sqltypes.Result) error { return nil }) + _, err := rpcVTGate.StreamExecute(ctx, session, "", bindVars, func(_ *sqltypes.Result) error { return nil }) + return err }, }} for _, tcase := range tcases { @@ -381,7 +382,7 @@ func testErrorPropagation(t *testing.T, sbcs []*sandboxconn.SandboxConn, before for _, sbc := range sbcs { before(sbc) } - err = rpcVTGate.StreamExecute( + _, err = rpcVTGate.StreamExecute( context.Background(), primarySession, "select id from t1", diff --git a/go/vt/vtgate/vtgateconn/vtgateconn.go b/go/vt/vtgate/vtgateconn/vtgateconn.go index eca75c7d865..5a6c5ae6b94 100644 --- a/go/vt/vtgate/vtgateconn/vtgateconn.go +++ b/go/vt/vtgate/vtgateconn/vtgateconn.go @@ -141,10 +141,12 @@ func (sn *VTGateSession) ExecuteBatch(ctx context.Context, query []string, bindV // error. Then you can pull values from the ResultStream until io.EOF, // or another error. func (sn *VTGateSession) StreamExecute(ctx context.Context, query string, bindVars map[string]*querypb.BindVariable) (sqltypes.ResultStream, error) { - // StreamExecute is only used for SELECT queries that don't change - // the session. So, the protocol doesn't return an updated session. - // This may change in the future. - return sn.impl.StreamExecute(ctx, sn.session, query, bindVars) + // passing in the function that will update the session when received on the stream. + return sn.impl.StreamExecute(ctx, sn.session, query, bindVars, func(response *vtgatepb.StreamExecuteResponse) { + if response.Session != nil { + sn.session = response.Session + } + }) } // Prepare performs a VTGate Prepare. @@ -168,7 +170,7 @@ type Impl interface { ExecuteBatch(ctx context.Context, session *vtgatepb.Session, queryList []string, bindVarsList []map[string]*querypb.BindVariable) (*vtgatepb.Session, []sqltypes.QueryResponse, error) // StreamExecute executes a streaming query on vtgate. This is a V3 function. - StreamExecute(ctx context.Context, session *vtgatepb.Session, query string, bindVars map[string]*querypb.BindVariable) (sqltypes.ResultStream, error) + StreamExecute(ctx context.Context, session *vtgatepb.Session, query string, bindVars map[string]*querypb.BindVariable, processResponse func(*vtgatepb.StreamExecuteResponse)) (sqltypes.ResultStream, error) // Prepare returns the fields information for the query as part of supporting prepare statements. Prepare(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, []*querypb.Field, error) diff --git a/go/vt/vtgate/vtgateservice/interface.go b/go/vt/vtgate/vtgateservice/interface.go index 3615ab3c431..f38e25a4726 100644 --- a/go/vt/vtgate/vtgateservice/interface.go +++ b/go/vt/vtgate/vtgateservice/interface.go @@ -35,7 +35,7 @@ type VTGateService interface { // V3 API Execute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, *sqltypes.Result, error) ExecuteBatch(ctx context.Context, session *vtgatepb.Session, sqlList []string, bindVariablesList []map[string]*querypb.BindVariable) (*vtgatepb.Session, []sqltypes.QueryResponse, error) - StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error + StreamExecute(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) (*vtgatepb.Session, error) // Prepare statement support Prepare(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, []*querypb.Field, error) diff --git a/proto/vtgate.proto b/proto/vtgate.proto index bd5b16af7ad..c9ce25e1bca 100644 --- a/proto/vtgate.proto +++ b/proto/vtgate.proto @@ -268,6 +268,9 @@ message StreamExecuteResponse { // The first value contains only Fields information. // The next values contain the actual rows, a few values per result. query.QueryResult result = 1; + + // session is the updated session information. + Session session = 2; } // ResolveTransactionRequest is the payload to ResolveTransaction. diff --git a/test/config.json b/test/config.json index 946cc3f3a77..0bf5c2ab31a 100644 --- a/test/config.json +++ b/test/config.json @@ -873,18 +873,9 @@ "RetryMax": 1, "Tags": [] }, - "vtgate_grpc_server_auth_static": { + "vtgate_grpc_api": { "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/grpc_server_auth_static"], - "Command": [], - "Manual": false, - "Shard": "vtgate_general_heavy", - "RetryMax": 1, - "Tags": [] - }, - "vtgate_grpc_server_acls": { - "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/grpc_server_acls"], + "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/grpc_api"], "Command": [], "Manual": false, "Shard": "vtgate_general_heavy",