From 5203b737b0100f5d7339b4cbd09058fea4dd7a72 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 25 May 2023 23:09:23 +0530 Subject: [PATCH] added e2e test for grpc api for stream and non-stream for transaction Signed-off-by: Harshit Gangal --- go/test/endtoend/cluster/cluster_util.go | 15 ++ go/test/endtoend/vtgate/grpc_api/acl_test.go | 110 +++++++++ .../endtoend/vtgate/grpc_api/execute_test.go | 114 +++++++++ .../acls_test.go => grpc_api/main_test.go} | 84 +------ .../grpc_server_auth_static/main_test.go | 216 ------------------ go/vt/vtgate/vtgateconn/vtgateconn.go | 4 +- test/config.json | 13 +- 7 files changed, 251 insertions(+), 305 deletions(-) create mode 100644 go/test/endtoend/vtgate/grpc_api/acl_test.go create mode 100644 go/test/endtoend/vtgate/grpc_api/execute_test.go rename go/test/endtoend/vtgate/{grpc_server_acls/acls_test.go => grpc_api/main_test.go} (50%) delete mode 100644 go/test/endtoend/vtgate/grpc_server_auth_static/main_test.go diff --git a/go/test/endtoend/cluster/cluster_util.go b/go/test/endtoend/cluster/cluster_util.go index 59327ff6503..138e464984f 100644 --- a/go/test/endtoend/cluster/cluster_util.go +++ b/go/test/endtoend/cluster/cluster_util.go @@ -26,6 +26,11 @@ import ( "testing" "time" + "google.golang.org/grpc" + + "vitess.io/vitess/go/vt/grpcclient" + "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn" + "github.com/buger/jsonparser" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -446,3 +451,13 @@ func WaitForHealthyShard(vtctldclient *VtctldClientProcess, keyspace, shard stri time.Sleep(defaultRetryDelay) } } + +// DialVTGate returns a VTGate grpc connection. +func DialVTGate(ctx context.Context, name, addr, username, password string) (*vtgateconn.VTGateConn, error) { + clientCreds := &grpcclient.StaticAuthClientCreds{Username: username, Password: password} + creds := grpc.WithPerRPCCredentials(clientCreds) + dialerFunc := grpcvtgateconn.Dial(creds) + dialerName := name + vtgateconn.RegisterDialer(dialerName, dialerFunc) + return vtgateconn.DialProtocol(ctx, dialerName, addr) +} diff --git a/go/test/endtoend/vtgate/grpc_api/acl_test.go b/go/test/endtoend/vtgate/grpc_api/acl_test.go new file mode 100644 index 00000000000..2819a3e41d1 --- /dev/null +++ b/go/test/endtoend/vtgate/grpc_api/acl_test.go @@ -0,0 +1,110 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpc_api + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/vt/callerid" +) + +// TestEffectiveCallerIDWithAccess verifies that an authenticated gRPC static user with an effectiveCallerID that has ACL access can execute queries +func TestEffectiveCallerIDWithAccess(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "some_other_user", "test_password") + require.NoError(t, err) + defer vtgateConn.Close() + + session := vtgateConn.Session(keyspaceName+"@primary", nil) + query := "SELECT id FROM test_table" + ctx = callerid.NewContext(ctx, callerid.NewEffectiveCallerID("user_with_access", "", ""), nil) + _, err = session.Execute(ctx, query, nil) + assert.NoError(t, err) +} + +// TestEffectiveCallerIDWithNoAccess verifies that an authenticated gRPC static user without an effectiveCallerID that has ACL access cannot execute queries +func TestEffectiveCallerIDWithNoAccess(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "another_unrelated_user", "test_password") + require.NoError(t, err) + defer vtgateConn.Close() + + session := vtgateConn.Session(keyspaceName+"@primary", nil) + query := "SELECT id FROM test_table" + ctx = callerid.NewContext(ctx, callerid.NewEffectiveCallerID("user_no_access", "", ""), nil) + _, err = session.Execute(ctx, query, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "Select command denied to user") + assert.Contains(t, err.Error(), "for table 'test_table' (ACL check error)") +} + +// TestAuthenticatedUserWithAccess verifies that an authenticated gRPC static user with ACL access can execute queries +func TestAuthenticatedUserWithAccess(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "user_with_access", "test_password") + require.NoError(t, err) + defer vtgateConn.Close() + + session := vtgateConn.Session(keyspaceName+"@primary", nil) + query := "SELECT id FROM test_table" + _, err = session.Execute(ctx, query, nil) + assert.NoError(t, err) +} + +// TestAuthenticatedUserNoAccess verifies that an authenticated gRPC static user with no ACL access cannot execute queries +func TestAuthenticatedUserNoAccess(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "user_no_access", "test_password") + require.NoError(t, err) + defer vtgateConn.Close() + + session := vtgateConn.Session(keyspaceName+"@primary", nil) + query := "SELECT id FROM test_table" + _, err = session.Execute(ctx, query, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "Select command denied to user") + assert.Contains(t, err.Error(), "for table 'test_table' (ACL check error)") +} + +// TestUnauthenticatedUser verifies that an unauthenticated gRPC user cannot execute queries +func TestUnauthenticatedUser(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "", "") + require.NoError(t, err) + defer vtgateConn.Close() + + session := vtgateConn.Session(keyspaceName+"@primary", nil) + query := "SELECT id FROM test_table" + _, err = session.Execute(ctx, query, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "invalid credentials") +} diff --git a/go/test/endtoend/vtgate/grpc_api/execute_test.go b/go/test/endtoend/vtgate/grpc_api/execute_test.go new file mode 100644 index 00000000000..da9a507244c --- /dev/null +++ b/go/test/endtoend/vtgate/grpc_api/execute_test.go @@ -0,0 +1,114 @@ +package grpc_api + +import ( + "context" + "fmt" + "io" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" + querypb "vitess.io/vitess/go/vt/proto/query" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + "vitess.io/vitess/go/vt/vtgate/vtgateconn" +) + +func TestTransctionsWithGRPCAPI(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "user_with_access", "test_password") + require.NoError(t, err) + defer vtgateConn.Close() + + vtSession := vtgateConn.Session(keyspaceName, nil) + workload := []string{"OLTP", "OLAP"} + for i := 0; i < 4; i++ { // running all switch combinations. + index := i % len(workload) + _, session, err := exec(ctx, vtSession, fmt.Sprintf("set workload = %s", workload[index]), nil) + require.NoError(t, err) + + require.Equal(t, workload[index], session.Options.Workload.String()) + execTest(ctx, t, workload[index], vtSession) + } + +} + +func execTest(ctx context.Context, t *testing.T, workload string, vtSession *vtgateconn.VTGateSession) { + tcases := []struct { + query string + + expRowCount int + expRowAffected int + expInTransaction bool + }{{ + query: "select id, val from test_table", + }, { + query: "begin", + expInTransaction: true, + }, { + query: "insert into test_table(id, val) values (1, 'A')", + expRowAffected: 1, + expInTransaction: true, + }, { + query: "select id, val from test_table", + expRowCount: 1, + expInTransaction: true, + }, { + query: "commit", + }, { + query: "select id, val from test_table", + expRowCount: 1, + }, { + query: "delete from test_table", + expRowAffected: 1, + }} + + for _, tc := range tcases { + t.Run(workload+":"+tc.query, func(t *testing.T) { + qr, session, err := exec(ctx, vtSession, tc.query, nil) + require.NoError(t, err) + + assert.Len(t, qr.Rows, tc.expRowCount) + assert.EqualValues(t, tc.expRowAffected, qr.RowsAffected) + assert.EqualValues(t, tc.expInTransaction, session.InTransaction) + }) + } +} + +func exec(ctx context.Context, conn *vtgateconn.VTGateSession, sql string, bv map[string]*querypb.BindVariable) (*sqltypes.Result, *vtgatepb.Session, error) { + options := conn.SessionPb().GetOptions() + if options != nil && options.Workload == querypb.ExecuteOptions_OLAP { + return streamExec(ctx, conn, sql, bv) + } + res, err := conn.Execute(ctx, sql, bv) + return res, conn.SessionPb(), err +} + +func streamExec(ctx context.Context, conn *vtgateconn.VTGateSession, sql string, bv map[string]*querypb.BindVariable) (*sqltypes.Result, *vtgatepb.Session, error) { + stream, err := conn.StreamExecute(ctx, sql, bv) + if err != nil { + return nil, conn.SessionPb(), err + } + result := &sqltypes.Result{} + for { + res, err := stream.Recv() + if err != nil { + if err == io.EOF { + return result, conn.SessionPb(), nil + } + return nil, conn.SessionPb(), err + } + result.Rows = append(result.Rows, res.Rows...) + result.RowsAffected += res.RowsAffected + if res.InsertID != 0 { + result.InsertID = res.InsertID + } + if res.Fields != nil { + result.Fields = res.Fields + } + } +} diff --git a/go/test/endtoend/vtgate/grpc_server_acls/acls_test.go b/go/test/endtoend/vtgate/grpc_api/main_test.go similarity index 50% rename from go/test/endtoend/vtgate/grpc_server_acls/acls_test.go rename to go/test/endtoend/vtgate/grpc_api/main_test.go index 5344a2c847e..3aad26d5a50 100644 --- a/go/test/endtoend/vtgate/grpc_server_acls/acls_test.go +++ b/go/test/endtoend/vtgate/grpc_api/main_test.go @@ -1,39 +1,13 @@ -/* -Copyright 2023 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package grpc_server_acls +package grpc_api import ( - "context" "flag" "fmt" "os" "path" "testing" - "vitess.io/vitess/go/vt/callerid" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "google.golang.org/grpc" - "vitess.io/vitess/go/test/endtoend/cluster" - "vitess.io/vitess/go/vt/grpcclient" - "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn" - "vitess.io/vitess/go/vt/vtgate/vtgateconn" ) var ( @@ -58,6 +32,14 @@ var ( { "Username": "another_unrelated_user", "Password": "test_password" + }, + { + "Username": "user_with_access", + "Password": "test_password" + }, + { + "Username": "user_no_access", + "Password": "test_password" } ] ` @@ -77,7 +59,6 @@ var ( ) func TestMain(m *testing.M) { - defer cluster.PanicHandler(nil) flag.Parse() @@ -144,53 +125,6 @@ func TestMain(m *testing.M) { os.Exit(exitcode) } -// TestEffectiveCallerIDWithAccess verifies that an authenticated gRPC static user with an effectiveCallerID that has ACL access can execute queries -func TestEffectiveCallerIDWithAccess(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - vtgateConn, err := dialVTGate(ctx, t, "some_other_user", "test_password") - if err != nil { - t.Fatal(err) - } - defer vtgateConn.Close() - - session := vtgateConn.Session(keyspaceName+"@primary", nil) - query := "SELECT id FROM test_table" - ctx = callerid.NewContext(ctx, callerid.NewEffectiveCallerID("user_with_access", "", ""), nil) - _, err = session.Execute(ctx, query, nil) - assert.NoError(t, err) -} - -// TestEffectiveCallerIDWithNoAccess verifies that an authenticated gRPC static user without an effectiveCallerID that has ACL access cannot execute queries -func TestEffectiveCallerIDWithNoAccess(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - vtgateConn, err := dialVTGate(ctx, t, "another_unrelated_user", "test_password") - if err != nil { - t.Fatal(err) - } - defer vtgateConn.Close() - - session := vtgateConn.Session(keyspaceName+"@primary", nil) - query := "SELECT id FROM test_table" - ctx = callerid.NewContext(ctx, callerid.NewEffectiveCallerID("user_no_access", "", ""), nil) - _, err = session.Execute(ctx, query, nil) - require.Error(t, err) - assert.Contains(t, err.Error(), "Select command denied to user") - assert.Contains(t, err.Error(), "for table 'test_table' (ACL check error)") -} - -func dialVTGate(ctx context.Context, t *testing.T, username string, password string) (*vtgateconn.VTGateConn, error) { - clientCreds := &grpcclient.StaticAuthClientCreds{Username: username, Password: password} - creds := grpc.WithPerRPCCredentials(clientCreds) - dialerFunc := grpcvtgateconn.Dial(creds) - dialerName := t.Name() - vtgateconn.RegisterDialer(dialerName, dialerFunc) - return vtgateconn.DialProtocol(ctx, dialerName, vtgateGrpcAddress) -} - func createFile(path string, contents string) error { f, err := os.Create(path) if err != nil { diff --git a/go/test/endtoend/vtgate/grpc_server_auth_static/main_test.go b/go/test/endtoend/vtgate/grpc_server_auth_static/main_test.go deleted file mode 100644 index 348401ed85d..00000000000 --- a/go/test/endtoend/vtgate/grpc_server_auth_static/main_test.go +++ /dev/null @@ -1,216 +0,0 @@ -/* -Copyright 2023 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package grpcserverauthstatic - -import ( - "context" - "flag" - "fmt" - "os" - "path" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "google.golang.org/grpc" - - "vitess.io/vitess/go/test/endtoend/cluster" - "vitess.io/vitess/go/vt/grpcclient" - "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn" - "vitess.io/vitess/go/vt/vtgate/vtgateconn" -) - -var ( - clusterInstance *cluster.LocalProcessCluster - vtgateGrpcAddress string - hostname = "localhost" - keyspaceName = "ks" - cell = "zone1" - sqlSchema = ` - create table test_table ( - id bigint, - val varchar(128), - primary key(id) - ) Engine=InnoDB; -` - grpcServerAuthStaticJSON = ` - [ - { - "Username": "user_with_access", - "Password": "test_password" - }, - { - "Username": "user_no_access", - "Password": "test_password" - } - ] -` - tableACLJSON = ` - { - "table_groups": [ - { - "name": "default", - "table_names_or_prefixes": ["%"], - "readers": ["user_with_access"], - "writers": ["user_with_access"], - "admins": ["user_with_access"] - } - ] - } -` -) - -func TestMain(m *testing.M) { - defer cluster.PanicHandler(nil) - flag.Parse() - - exitcode := func() int { - clusterInstance = cluster.NewCluster(cell, hostname) - defer clusterInstance.Teardown() - - // Start topo server - if err := clusterInstance.StartTopo(); err != nil { - return 1 - } - - // Directory for authn / authz config files - authDirectory := path.Join(clusterInstance.TmpDirectory, "auth") - if err := os.Mkdir(authDirectory, 0700); err != nil { - return 1 - } - - // Create grpc_server_auth_static.json file - grpcServerAuthStaticPath := path.Join(authDirectory, "grpc_server_auth_static.json") - if err := createFile(grpcServerAuthStaticPath, grpcServerAuthStaticJSON); err != nil { - return 1 - } - - // Create table_acl.json file - tableACLPath := path.Join(authDirectory, "table_acl.json") - if err := createFile(tableACLPath, tableACLJSON); err != nil { - return 1 - } - - // Configure vtgate to use static auth - clusterInstance.VtGateExtraArgs = []string{ - "--grpc_auth_mode", "static", - "--grpc_auth_static_password_file", grpcServerAuthStaticPath, - "--grpc-use-static-authentication-callerid", - } - - // Configure vttablet to use table ACL - clusterInstance.VtTabletExtraArgs = []string{ - "--enforce-tableacl-config", - "--queryserver-config-strict-table-acl", - "--table-acl-config", tableACLPath, - } - - // Start keyspace - keyspace := &cluster.Keyspace{ - Name: keyspaceName, - SchemaSQL: sqlSchema, - } - if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false); err != nil { - return 1 - } - - // Start vtgate - if err := clusterInstance.StartVtgate(); err != nil { - clusterInstance.VtgateProcess = cluster.VtgateProcess{} - return 1 - } - vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort) - - return m.Run() - }() - os.Exit(exitcode) -} - -// TestAuthenticatedUserWithAccess verifies that an authenticated gRPC static user with ACL access can execute queries -func TestAuthenticatedUserWithAccess(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - vtgateConn, err := dialVTGate(ctx, t, "user_with_access", "test_password") - if err != nil { - t.Fatal(err) - } - defer vtgateConn.Close() - - session := vtgateConn.Session(keyspaceName+"@primary", nil) - query := "SELECT id FROM test_table" - _, err = session.Execute(ctx, query, nil) - assert.NoError(t, err) -} - -// TestAuthenticatedUserNoAccess verifies that an authenticated gRPC static user with no ACL access cannot execute queries -func TestAuthenticatedUserNoAccess(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - vtgateConn, err := dialVTGate(ctx, t, "user_no_access", "test_password") - if err != nil { - t.Fatal(err) - } - defer vtgateConn.Close() - - session := vtgateConn.Session(keyspaceName+"@primary", nil) - query := "SELECT id FROM test_table" - _, err = session.Execute(ctx, query, nil) - require.Error(t, err) - assert.Contains(t, err.Error(), "Select command denied to user") - assert.Contains(t, err.Error(), "for table 'test_table' (ACL check error)") -} - -// TestUnauthenticatedUser verifies that an unauthenticated gRPC user cannot execute queries -func TestUnauthenticatedUser(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - vtgateConn, err := dialVTGate(ctx, t, "", "") - if err != nil { - t.Fatal(err) - } - defer vtgateConn.Close() - - session := vtgateConn.Session(keyspaceName+"@primary", nil) - query := "SELECT id FROM test_table" - _, err = session.Execute(ctx, query, nil) - require.Error(t, err) - assert.Contains(t, err.Error(), "invalid credentials") -} - -func dialVTGate(ctx context.Context, t *testing.T, username string, password string) (*vtgateconn.VTGateConn, error) { - clientCreds := &grpcclient.StaticAuthClientCreds{Username: username, Password: password} - creds := grpc.WithPerRPCCredentials(clientCreds) - dialerFunc := grpcvtgateconn.Dial(creds) - dialerName := t.Name() - vtgateconn.RegisterDialer(dialerName, dialerFunc) - return vtgateconn.DialProtocol(ctx, dialerName, vtgateGrpcAddress) -} - -func createFile(path string, contents string) error { - f, err := os.Create(path) - if err != nil { - return err - } - _, err = f.WriteString(contents) - if err != nil { - return err - } - return f.Close() -} diff --git a/go/vt/vtgate/vtgateconn/vtgateconn.go b/go/vt/vtgate/vtgateconn/vtgateconn.go index ad5792d63c9..5a6c5ae6b94 100644 --- a/go/vt/vtgate/vtgateconn/vtgateconn.go +++ b/go/vt/vtgate/vtgateconn/vtgateconn.go @@ -141,9 +141,7 @@ func (sn *VTGateSession) ExecuteBatch(ctx context.Context, query []string, bindV // error. Then you can pull values from the ResultStream until io.EOF, // or another error. func (sn *VTGateSession) StreamExecute(ctx context.Context, query string, bindVars map[string]*querypb.BindVariable) (sqltypes.ResultStream, error) { - // StreamExecute is only used for SELECT queries that don't change - // the session. So, the protocol doesn't return an updated session. - // This may change in the future. + // passing in the function that will update the session when received on the stream. return sn.impl.StreamExecute(ctx, sn.session, query, bindVars, func(response *vtgatepb.StreamExecuteResponse) { if response.Session != nil { sn.session = response.Session diff --git a/test/config.json b/test/config.json index 511e632e4ad..a212ebb7793 100644 --- a/test/config.json +++ b/test/config.json @@ -873,18 +873,9 @@ "RetryMax": 1, "Tags": [] }, - "vtgate_grpc_server_auth_static": { + "vtgate_grpc_api": { "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/grpc_server_auth_static"], - "Command": [], - "Manual": false, - "Shard": "vtgate_general_heavy", - "RetryMax": 1, - "Tags": [] - }, - "vtgate_grpc_server_acls": { - "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/grpc_server_acls"], + "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/grpc_api"], "Command": [], "Manual": false, "Shard": "vtgate_general_heavy",